Skip to content

Commit ff1b628

Browse files
authored
Merge pull request #481 from LydiaCai1203/feat-summary-v3
feat: long task summary
2 parents 7c6456b + f024c77 commit ff1b628

3 files changed

Lines changed: 96 additions & 44 deletions

File tree

backend/biz/task/service/tasksummary.go

Lines changed: 94 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"log/slog"
10+
"slices"
1011
"strings"
1112
"sync"
1213
"time"
@@ -261,9 +262,14 @@ func (s *TaskSummaryService) handleJob(ctx context.Context, job *delayqueue.Job[
261262
return nil
262263
}
263264

264-
// fetchConversation 从 Loki 获取历史对话,返回消息数组
265+
// fetchConversation 从 Loki 获取历史对话,只保留最近 N 轮对话(user-input / reply-question 及其对应的 agent 回复)。
266+
// 使用倒序查询,从最新的日志往前查,收集到足够轮用户消息后立即停止,避免遍历全部历史。
265267
func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID string, createdAt time.Time) ([]llm.Message, error) {
266-
var messages []llm.Message
268+
maxRounds := s.cfg.TaskSummary.MaxRounds
269+
if maxRounds <= 0 {
270+
maxRounds = 3
271+
}
272+
const pageSize = 200
267273

268274
taskUUID, err := uuid.Parse(taskID)
269275
if err != nil {
@@ -274,64 +280,107 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin
274280
if err != nil {
275281
return nil, fmt.Errorf("failed to get task: %w", err)
276282
}
277-
messages = append(messages, llm.Message{Role: "user", Content: t.Content})
278283

279-
agentMsg := []string{}
280-
_, err = s.loki.History(ctx, taskID, createdAt, func(entries []loki.LogEntry) {
284+
// 从后往前分页查 Loki,收集到足够的用户轮次后停止
285+
start := createdAt
286+
end := time.Now()
287+
var tailEntries []loki.LogEntry
288+
userRoundCount := 0
289+
290+
for {
291+
entries, err := s.loki.QueryByTaskID(ctx, taskID, start, end, pageSize, "backward")
292+
if err != nil {
293+
return nil, fmt.Errorf("failed to fetch loki history: %w", err)
294+
}
295+
296+
done := false
281297
for _, entry := range entries {
298+
tailEntries = append(tailEntries, entry)
299+
282300
if entry.Line == "" {
283301
continue
284302
}
285-
286-
s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line)
287-
288303
var lokiEnt lokiEntry
289304
if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil {
290-
s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err)
291305
continue
292306
}
293-
294-
if lokiEnt.Data == "" {
295-
continue
307+
if lokiEnt.Event == "user-input" || lokiEnt.Event == "reply-question" {
308+
userRoundCount++
309+
if userRoundCount >= maxRounds {
310+
done = true
311+
break
312+
}
296313
}
314+
}
297315

298-
decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data)
299-
if err != nil {
300-
s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err)
301-
continue
302-
}
316+
if done || len(entries) < pageSize {
317+
break
318+
}
319+
// 向更早的时间翻页
320+
end = entries[len(entries)-1].Timestamp.Add(-time.Nanosecond)
321+
}
303322

304-
switch lokiEnt.Event {
305-
case "user-input", "reply-question":
306-
var userInputText string
307-
var ur userReply
308-
if err := json.Unmarshal(decoded, &ur); err != nil {
309-
userInputText = string(decoded)
310-
} else {
311-
userInputText = ur.AnswersJSON
312-
}
323+
// 反转为时间正序
324+
slices.Reverse(tailEntries)
313325

314-
if len(agentMsg) > 0 {
315-
agentContent := strings.Join(agentMsg, "")
316-
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
317-
agentMsg = []string{}
318-
}
326+
// 按正序解析为 messages
327+
var messages []llm.Message
328+
// 如果 Loki 中用户轮次不足 3 轮,补上初始任务内容
329+
if userRoundCount < maxRounds {
330+
messages = append(messages, llm.Message{Role: "user", Content: t.Content})
331+
}
332+
333+
agentMsg := []string{}
334+
for _, entry := range tailEntries {
335+
if entry.Line == "" {
336+
continue
337+
}
319338

320-
messages = append(messages, llm.Message{Role: "user", Content: userInputText})
339+
s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line)
321340

322-
case "task-running":
323-
var taskMsg wsData
324-
if err := json.Unmarshal(decoded, &taskMsg); err != nil {
325-
continue
326-
}
327-
if taskMsg.Update.SessionUpdate == "agent_message_chunk" {
328-
agentMsg = append(agentMsg, taskMsg.Update.Content.Text)
329-
}
341+
var lokiEnt lokiEntry
342+
if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil {
343+
s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err)
344+
continue
345+
}
346+
347+
if lokiEnt.Data == "" {
348+
continue
349+
}
350+
351+
decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data)
352+
if err != nil {
353+
s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err)
354+
continue
355+
}
356+
357+
switch lokiEnt.Event {
358+
case "user-input", "reply-question":
359+
var userInputText string
360+
var ur userReply
361+
if err := json.Unmarshal(decoded, &ur); err != nil {
362+
userInputText = string(decoded)
363+
} else {
364+
userInputText = ur.AnswersJSON
365+
}
366+
367+
if len(agentMsg) > 0 {
368+
agentContent := strings.Join(agentMsg, "")
369+
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
370+
agentMsg = []string{}
371+
}
372+
373+
messages = append(messages, llm.Message{Role: "user", Content: userInputText})
374+
375+
case "task-running":
376+
var taskMsg wsData
377+
if err := json.Unmarshal(decoded, &taskMsg); err != nil {
378+
continue
379+
}
380+
if taskMsg.Update.SessionUpdate == "agent_message_chunk" {
381+
agentMsg = append(agentMsg, taskMsg.Update.Content.Text)
330382
}
331383
}
332-
})
333-
if err != nil {
334-
return nil, fmt.Errorf("failed to fetch loki history: %w", err)
335384
}
336385

337386
if len(messages) == 0 {
@@ -343,7 +392,7 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin
343392
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
344393
}
345394

346-
s.logger.DebugContext(ctx, "conversation", "messages_count", messages)
395+
s.logger.DebugContext(ctx, "conversation", "messages_count", len(messages), "messages", messages)
347396
return messages, nil
348397
}
349398

@@ -367,6 +416,7 @@ func (s *TaskSummaryService) generateSummary(ctx context.Context, conversation [
367416
- 如果是开发任务:说明做的是什么应用/功能(如"开发五子棋游戏")
368417
- 如果是问问题:说明问的是什么问题(如"React Hooks 如何管理状态")
369418
- 如果是修 bug:说明修的是什么问题(如"修复用户登录失败问题")
419+
- 中英文之间要加空格(如"修复 React 组件的 bug"而不是"修复React组件的bug")
370420
- 如果对话无实质内容,就用最近一条用户输入作为标题`, maxChars)
371421

372422
messages := []llm.Message{

backend/biz/task/service/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,4 @@ type wsContent struct {
3535
Content string `json:"content"`
3636
Message string `json:"message"`
3737
}
38+

backend/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ type TaskSummary struct {
110110
InterfaceType string `mapstructure:"interface_type"` // API 接口类型(openai_chat/openai_responses/anthropic)
111111
Delay int `mapstructure:"delay"` // 延迟时间(秒),默认 3600
112112
MaxChars int `mapstructure:"max_chars"` // 摘要最大字符数,默认 300
113+
MaxRounds int `mapstructure:"max_rounds"` // 最近对话轮数,默认 3
113114
MaxWorkers int `mapstructure:"max_workers"` // 最大消费者数量,默认 5
114115
}
115116

0 commit comments

Comments
 (0)