diff --git a/backend/cmd/start.go b/backend/cmd/start.go index e5776b2..f5542f7 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -135,6 +135,9 @@ func Start() { if err = eventsvc.RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, manager); err != nil { log.Fatalf("Failed to register agent state snapshot event handler: %v", err) } + if err = eventsvc.RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo); err != nil { + log.Fatalf("Failed to register agent timeline persist event handler: %v", err) + } if err = eventsvc.RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule); err != nil { log.Fatalf("Failed to register memory extract event handler: %v", err) } diff --git a/backend/model/agent_timeline.go b/backend/model/agent_timeline.go index 700232f..75f0f6c 100644 --- a/backend/model/agent_timeline.go +++ b/backend/model/agent_timeline.go @@ -1,13 +1,16 @@ package model -import "time" +import ( + "strings" + "time" +) // AgentTimelineKind 定义会话时间线事件类型。 // // 说明: // 1. 这些类型面向前端渲染,要求语义稳定,不随节点内部实现细节频繁变化; // 2. 文本消息和卡片事件共用一条时间线,前端只按 seq 顺序渲染; -// 3. token 统计仍以 chat_histories / agent_chats 为准,时间线只做展示顺序与结构承载。 +// 3. token 统计仍以 chat_histories / agent_chats 为准,时间线只负责展示顺序与结构承载。 const ( AgentTimelineKindUserText = "user_text" AgentTimelineKindAssistantText = "assistant_text" @@ -16,6 +19,7 @@ const ( AgentTimelineKindConfirmRequest = "confirm_request" AgentTimelineKindBusinessCard = "business_card" AgentTimelineKindScheduleCompleted = "schedule_completed" + AgentTimelineKindThinkingSummary = "thinking_summary" ) // AgentTimelineEvent 表示会话里“可展示事件”的统一持久化记录。 @@ -33,13 +37,18 @@ type AgentTimelineEvent struct { Role *string `gorm:"column:role;type:varchar(32);comment:消息角色"` Content *string `gorm:"column:content;type:text;comment:正文内容"` Payload *string `gorm:"column:payload;type:json;comment:结构化负载"` - TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:该事件关联token,默认0"` + TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:该事件关联 token,默认 0"` CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime;index:idx_timeline_user_chat_created,priority:3"` } func (AgentTimelineEvent) TableName() string { return "agent_timeline_events" } // ChatTimelinePersistPayload 定义时间线单条事件落库输入。 +// +// 职责边界: +// 1. 只表达一次“写入 agent_timeline_events”的最小字段集合; +// 2. Content 面向纯文本类事件,结构化事件更多依赖 PayloadJSON; +// 3. thinking_summary 事件要求 PayloadJSON 内只保留 detail_summary 与必要 metadata。 type ChatTimelinePersistPayload struct { UserID int `json:"user_id"` ConversationID string `json:"conversation_id"` @@ -51,6 +60,73 @@ type ChatTimelinePersistPayload struct { TokensConsumed int `json:"tokens_consumed"` } +// Normalize 负责收敛时间线持久化载荷的基础口径。 +// +// 职责边界: +// 1. 只做字符串 trim 和非负数兜底; +// 2. 不负责 thinking_summary 的业务裁剪; +// 3. 返回副本,避免调用方意外修改原对象。 +func (p ChatTimelinePersistPayload) Normalize() ChatTimelinePersistPayload { + p.ConversationID = strings.TrimSpace(p.ConversationID) + p.Kind = strings.TrimSpace(p.Kind) + p.Role = strings.TrimSpace(p.Role) + p.Content = strings.TrimSpace(p.Content) + p.PayloadJSON = strings.TrimSpace(p.PayloadJSON) + if p.Seq < 0 { + p.Seq = 0 + } + if p.TokensConsumed < 0 { + p.TokensConsumed = 0 + } + return p +} + +// HasValidIdentity 判断 payload 是否具备最小可持久化主键语义。 +func (p ChatTimelinePersistPayload) HasValidIdentity() bool { + normalized := p.Normalize() + return normalized.UserID > 0 && + normalized.ConversationID != "" && + normalized.Seq > 0 && + normalized.Kind != "" +} + +// MatchesStoredEvent 判断 payload 与库中事件是否可视为“同一条业务事件”。 +// +// 说明: +// 1. 主要用于 outbox 重放时识别“唯一键冲突但其实已经成功落库”的场景; +// 2. 只比较持久化字段,不比较 created_at / id 这类存储侧派生值; +// 3. 返回 true 时,上层可以把 seq 冲突视为幂等成功。 +func (p ChatTimelinePersistPayload) MatchesStoredEvent(event AgentTimelineEvent) bool { + normalized := p.Normalize() + return event.UserID == normalized.UserID && + strings.TrimSpace(event.ChatID) == normalized.ConversationID && + event.Seq == normalized.Seq && + strings.TrimSpace(event.Kind) == normalized.Kind && + trimTimelinePointerString(event.Role) == normalized.Role && + trimTimelinePointerString(event.Content) == normalized.Content && + trimTimelinePointerString(event.Payload) == normalized.PayloadJSON && + event.TokensConsumed == normalized.TokensConsumed +} + +// IsTimelineSeqConflictError 判断 error 是否属于时间线 seq 唯一键冲突。 +// +// 说明: +// 1. MySQL / PostgreSQL / SQLite 的重复键报错文案并不完全一致,这里用宽松文本匹配; +// 2. 该函数只用于“是否进入幂等/补 seq 分支”的判断,不承担精确错误分类职责; +// 3. 若未来统一抽数据库错误码适配层,应优先替换这里而不是继续复制判断逻辑。 +func IsTimelineSeqConflictError(err error) bool { + if err == nil { + return false + } + lower := strings.ToLower(err.Error()) + return strings.Contains(lower, "duplicate entry") || + strings.Contains(lower, "duplicate key") || + strings.Contains(lower, "unique constraint") || + strings.Contains(lower, "unique violation") || + strings.Contains(lower, "error 1062") || + strings.Contains(lower, "uk_timeline_user_chat_seq") +} + // GetConversationTimelineItem 定义前端读取时间线接口的单条返回项。 type GetConversationTimelineItem struct { ID int64 `json:"id,omitempty"` @@ -62,3 +138,10 @@ type GetConversationTimelineItem struct { TokensConsumed int `json:"tokens_consumed,omitempty"` CreatedAt *time.Time `json:"created_at,omitempty"` } + +func trimTimelinePointerString(value *string) string { + if value == nil { + return "" + } + return strings.TrimSpace(*value) +} diff --git a/backend/newAgent/node/execute/action_router.go b/backend/newAgent/node/execute/action_router.go index 018c441..d628a58 100644 --- a/backend/newAgent/node/execute/action_router.go +++ b/backend/newAgent/node/execute/action_router.go @@ -56,6 +56,15 @@ func collectExecuteDecisionFromLLM( parser := newagentrouter.NewStreamDecisionParser() output := &executeDecisionStreamOutput{firstChunk: true} var fullText strings.Builder + reasoningDigestor, digestorErr := emitter.NewReasoningDigestor(ctx, executeSpeakBlockID, executeStageName) + if digestorErr != nil { + return nil, fmt.Errorf("执行 thinking 摘要器初始化失败: %w", digestorErr) + } + defer func() { + if reasoningDigestor != nil { + _ = reasoningDigestor.Close(ctx) + } + }() for { chunk, recvErr := reader.Recv() @@ -68,15 +77,9 @@ func collectExecuteDecisionFromLLM( } if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if emitErr := emitter.EmitReasoningText( - executeSpeakBlockID, - executeStageName, - chunk.ReasoningContent, - output.firstChunk, - ); emitErr != nil { - return nil, fmt.Errorf("执行 thinking 推送失败: %w", emitErr) + if reasoningDigestor != nil { + reasoningDigestor.Append(chunk.ReasoningContent) } - output.firstChunk = false } content := "" @@ -148,6 +151,9 @@ func collectExecuteDecisionFromLLM( output.decision = decision if visible != "" { + if reasoningDigestor != nil { + reasoningDigestor.MarkContentStarted() + } if emitErr := emitter.EmitAssistantText( executeSpeakBlockID, executeStageName, @@ -174,9 +180,14 @@ func collectExecuteDecisionFromLLM( continue } if strings.TrimSpace(chunk2.ReasoningContent) != "" { - _ = emitter.EmitReasoningText(executeSpeakBlockID, executeStageName, chunk2.ReasoningContent, false) + if reasoningDigestor != nil { + reasoningDigestor.Append(chunk2.ReasoningContent) + } } if chunk2.Content != "" { + if reasoningDigestor != nil { + reasoningDigestor.MarkContentStarted() + } if emitErr := emitter.EmitAssistantText( executeSpeakBlockID, executeStageName, diff --git a/backend/newAgent/node/plan.go b/backend/newAgent/node/plan.go index 66b6238..aaf8e8b 100644 --- a/backend/newAgent/node/plan.go +++ b/backend/newAgent/node/plan.go @@ -106,6 +106,15 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { parser := newagentrouter.NewStreamDecisionParser() firstChunk := true speakStreamed := false + reasoningDigestor, digestorErr := emitter.NewReasoningDigestor(ctx, planSpeakBlockID, planStageName) + if digestorErr != nil { + return fmt.Errorf("规划 thinking 摘要器初始化失败: %w", digestorErr) + } + defer func() { + if reasoningDigestor != nil { + _ = reasoningDigestor.Close(ctx) + } + }() // 3.1 阶段一:解析决策标签。 for { @@ -118,12 +127,11 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { break } - // thinking 内容独立推流。 + // thinking 内容只进入摘要器,不再把 raw reasoning_content 透传给前端。 if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if emitErr := emitter.EmitReasoningText(planSpeakBlockID, planStageName, chunk.ReasoningContent, firstChunk); emitErr != nil { - return fmt.Errorf("规划 thinking 推送失败: %w", emitErr) + if reasoningDigestor != nil { + reasoningDigestor.Append(chunk.ReasoningContent) } - firstChunk = false } content := "" @@ -152,6 +160,9 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { // 3.2 阶段二:流式推送 speak(同一 reader 继续读取)。 var fullText strings.Builder if visible != "" { + if reasoningDigestor != nil { + reasoningDigestor.MarkContentStarted() + } if emitErr := emitter.EmitAssistantText(planSpeakBlockID, planStageName, visible, firstChunk); emitErr != nil { return fmt.Errorf("规划文案推送失败: %w", emitErr) } @@ -172,9 +183,14 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { continue } if strings.TrimSpace(chunk2.ReasoningContent) != "" { - _ = emitter.EmitReasoningText(planSpeakBlockID, planStageName, chunk2.ReasoningContent, false) + if reasoningDigestor != nil { + reasoningDigestor.Append(chunk2.ReasoningContent) + } } if chunk2.Content != "" { + if reasoningDigestor != nil { + reasoningDigestor.MarkContentStarted() + } if emitErr := emitter.EmitAssistantText(planSpeakBlockID, planStageName, chunk2.Content, firstChunk); emitErr != nil { return fmt.Errorf("规划文案推送失败: %w", emitErr) } diff --git a/backend/newAgent/prompt/reasoning_summary.go b/backend/newAgent/prompt/reasoning_summary.go new file mode 100644 index 0000000..4926f1f --- /dev/null +++ b/backend/newAgent/prompt/reasoning_summary.go @@ -0,0 +1,128 @@ +package newagentprompt + +import ( + "encoding/json" + "fmt" + "strings" + "unicode/utf8" + + "github.com/cloudwego/eino/schema" +) + +const ( + reasoningSummaryMaxFullRunes = 6000 + reasoningSummaryMaxDeltaRunes = 1800 +) + +// ReasoningSummaryPromptInput 描述一次“思考摘要”模型调用所需的最小输入。 +// +// 职责边界: +// 1. 只承载摘要模型需要看的文本与运行态,不绑定 stream 包的 DTO,避免 prompt 层反向依赖输出协议; +// 2. FullReasoning 会在构造 prompt 时只保留尾部,避免长时间思考把便宜模型上下文撑爆; +// 3. PreviousSummary 只作为连续摘要的参考,不要求模型逐字继承。 +type ReasoningSummaryPromptInput struct { + FullReasoning string + DeltaReasoning string + PreviousSummary string + CandidateSeq int + Final bool + DurationSeconds float64 +} + +type reasoningSummaryPromptPayload struct { + CandidateSeq int `json:"candidate_seq"` + Final bool `json:"final"` + DurationSeconds float64 `json:"duration_seconds"` + PreviousSummary string `json:"previous_summary,omitempty"` + RecentReasoning string `json:"recent_reasoning,omitempty"` + DeltaReasoning string `json:"delta_reasoning,omitempty"` + SourceTextRunes int `json:"source_text_runes,omitempty"` + MaxDetailSummaryRunes int `json:"max_detail_summary_runes,omitempty"` +} + +// BuildReasoningSummaryMessages 构造思考摘要模型调用的 messages。 +// +// 步骤说明: +// 1. system prompt 明确“只做用户可见摘要”,禁止复述原始思考链和内部推理细节; +// 2. user prompt 使用 JSON 承载输入,便于后续扩展字段且减少模型误读; +// 3. 长文本只保留尾部窗口,保证异步摘要请求稳定、便宜、可控。 +func BuildReasoningSummaryMessages(input ReasoningSummaryPromptInput) []*schema.Message { + recentReasoning := trimRunesFromEnd(input.FullReasoning, reasoningSummaryMaxFullRunes) + deltaReasoning := trimRunesFromEnd(input.DeltaReasoning, reasoningSummaryMaxDeltaRunes) + payload := reasoningSummaryPromptPayload{ + CandidateSeq: input.CandidateSeq, + Final: input.Final, + DurationSeconds: input.DurationSeconds, + PreviousSummary: strings.TrimSpace(input.PreviousSummary), + RecentReasoning: recentReasoning, + DeltaReasoning: deltaReasoning, + SourceTextRunes: reasoningSummarySourceRunes(recentReasoning, deltaReasoning), + MaxDetailSummaryRunes: ReasoningSummaryDetailRuneLimit(input.FullReasoning, input.DeltaReasoning), + } + + raw, err := json.MarshalIndent(payload, "", " ") + if err != nil { + raw = []byte(fmt.Sprintf(`{"recent_reasoning":%q}`, trimRunesFromEnd(input.FullReasoning, reasoningSummaryMaxFullRunes))) + } + + return []*schema.Message{ + schema.SystemMessage(buildReasoningSummarySystemPrompt()), + schema.UserMessage("请把下面的模型思考内容整理成用户可见的进度摘要。\n输入:\n" + string(raw)), + } +} + +func buildReasoningSummarySystemPrompt() string { + return strings.TrimSpace(`你是 SmartMate 的“思考摘要器”。你的任务是把模型内部 reasoning 整理成用户可见的进度摘要。 + +输出必须是严格 JSON 对象: +{ + "short_summary": "8到18个汉字的短摘要", + "detail_summary": "不超过 max_detail_summary_runes 个字的展开摘要" +} + +规则: +1. 只描述“正在做什么”和“目前推进到哪一步”,不要复述、引用或暴露原始思考链。 +2. 不输出 markdown,不输出代码块,不解释 JSON 以外的内容。 +3. short_summary 要短、稳定、适合前端几秒刷新一次。 +4. detail_summary 不按固定句数限制,而按输入长度控制:字数必须小于等于 max_detail_summary_runes;不需要凑满上限,信息密度优先。 +5. detail_summary 仍然面向用户,不写内部推理细节、隐含假设链、逐步演算。 +6. 若输入为空或噪声较多,用保守摘要,例如“正在整理思路”“正在核对可用信息”。 +7. final=true 时,detail_summary 用完成态语气,说明思考已收拢到下一步答复或动作。`) +} + +// ReasoningSummaryDetailRuneLimit 返回 detail_summary 的最大字数。 +// +// 职责边界: +// 1. 与 BuildReasoningSummaryMessages 使用同一套输入窗口,避免 prompt 提示和服务端兜底口径不一致; +// 2. 上限取“提供给摘要模型的主要文本段”的一半,并向上取整,适配极短文本; +// 3. 返回 0 表示没有有效输入文本,调用方不应做硬裁剪。 +func ReasoningSummaryDetailRuneLimit(fullReasoning, deltaReasoning string) int { + recentReasoning := trimRunesFromEnd(fullReasoning, reasoningSummaryMaxFullRunes) + delta := trimRunesFromEnd(deltaReasoning, reasoningSummaryMaxDeltaRunes) + sourceRunes := reasoningSummarySourceRunes(recentReasoning, delta) + if sourceRunes <= 0 { + return 0 + } + return (sourceRunes + 1) / 2 +} + +func reasoningSummarySourceRunes(recentReasoning, deltaReasoning string) int { + recentReasoning = strings.TrimSpace(recentReasoning) + if recentReasoning != "" { + return utf8.RuneCountInString(recentReasoning) + } + return utf8.RuneCountInString(strings.TrimSpace(deltaReasoning)) +} + +func trimRunesFromEnd(text string, maxRunes int) string { + text = strings.TrimSpace(text) + if text == "" || maxRunes <= 0 { + return "" + } + + runes := []rune(text) + if len(runes) <= maxRunes { + return text + } + return string(runes[len(runes)-maxRunes:]) +} diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index 7cccde2..b0fb041 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "strings" + "sync" "time" infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" @@ -62,6 +63,18 @@ type ChunkEmitter struct { RequestID string ModelName string Created int64 + + // thinkingGateMu 是“正文门卫”的轻量保护。 + // 1. 它只保护 thinking_summary 是否还能发,不串行化全部 SSE; + // 2. 正文一旦开始,对应 block 的门会被关闭,后续同 block 摘要直接丢弃; + // 3. 这样既避免摘要 goroutine 在正文之后补发旧思考,又不误杀后续节点的新一轮思考。 + thinkingGateMu sync.Mutex + thinkingClosedBlocks map[string]bool + // reasoningSummaryFunc 用于把原始 reasoning 压成用户可见摘要。 + // 1. 该函数由 service 层注入,stream 包只负责调度,不负责选择模型; + // 2. 未注入时模型 reasoning 只会被静默丢弃,不再回退成 raw reasoning_content; + // 3. 正文一旦开始,ReasoningDigestor 和 ChunkEmitter 会同时关门,迟到结果不会再发给前端。 + reasoningSummaryFunc ReasoningSummaryFunc // extraEventHook 用于把关键结构化事件同步给上层做持久化。 // 1. hook 失败不能影响 SSE 主链路; // 2. hook 只接收 extra 结构,避免 emitter 反向依赖业务层; @@ -122,6 +135,40 @@ func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) { e.extraEventHook = hook } +// SetReasoningSummaryFunc 设置 reasoning 摘要模型调用函数。 +// +// 职责边界: +// 1. 这里只保存函数引用,不立即调用模型; +// 2. 摘要触发频率、单飞、正文闸门由 ReasoningDigestor 负责; +// 3. 传 nil 表示关闭摘要能力,后续 reasoning chunk 会被静默丢弃。 +func (e *ChunkEmitter) SetReasoningSummaryFunc(fn ReasoningSummaryFunc) { + if e == nil { + return + } + e.reasoningSummaryFunc = fn +} + +// NewReasoningDigestor 为当前 block 创建一个 reasoning 摘要器。 +// +// 步骤说明: +// 1. 若未注入摘要函数,返回 nil,调用方只需跳过 raw reasoning 推送; +// 2. 摘要结果先经过 ChunkEmitter 的正文门卫,再走统一 extra/hook 链路; +// 3. Digestor 自身仍负责单飞、水位线和正文开始后的 in-flight 结果丢弃。 +func (e *ChunkEmitter) NewReasoningDigestor(ctx context.Context, blockID, stage string) (*ReasoningDigestor, error) { + if e == nil || e.reasoningSummaryFunc == nil { + return nil, nil + } + e.openThinkingSummaryGate(blockID, stage) + return NewReasoningDigestor(ReasoningDigestorOptions{ + SummaryFunc: e.reasoningSummaryFunc, + SummarySink: func(summary StreamThinkingSummaryExtra) { + _ = e.EmitThinkingSummary(blockID, stage, summary) + }, + BaseContext: ctx, + SummaryTimeout: 8 * time.Second, + }) +} + // EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。 func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error { if e == nil || e.emit == nil { @@ -160,6 +207,7 @@ func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRol if text == "" { return nil } + e.closeThinkingSummaryGate(blockID, stage) payload, err := ToOpenAIAssistantChunkWithExtra( e.RequestID, @@ -178,6 +226,66 @@ func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRol return e.emit(payload) } +// EmitThinkingSummary 输出一次“流式思考摘要”事件。 +// +// 协议约束: +// 1. 该事件只走 extra.thinking_summary,不回写 delta.content / delta.reasoning_content; +// 2. 仍复用现有 extra hook,让上层在不依赖 emitter 细节的前提下同步持久化; +// 3. includeRole 不再需要,因为 thinking_summary 本身就是纯结构化事件。 +func (e *ChunkEmitter) EmitThinkingSummary(blockID, stage string, summary StreamThinkingSummaryExtra) error { + if e == nil || e.emit == nil { + return nil + } + if e.isThinkingSummaryGateClosed(blockID, stage) { + return nil + } + return e.emitExtraOnly(NewThinkingSummaryExtra(blockID, stage, summary)) +} + +func (e *ChunkEmitter) openThinkingSummaryGate(blockID, stage string) { + if e == nil { + return + } + e.thinkingGateMu.Lock() + if e.thinkingClosedBlocks != nil { + delete(e.thinkingClosedBlocks, thinkingSummaryGateKey(blockID, stage)) + } + e.thinkingGateMu.Unlock() +} + +func (e *ChunkEmitter) closeThinkingSummaryGate(blockID, stage string) { + if e == nil { + return + } + e.thinkingGateMu.Lock() + if e.thinkingClosedBlocks == nil { + e.thinkingClosedBlocks = make(map[string]bool) + } + e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] = true + e.thinkingGateMu.Unlock() +} + +func (e *ChunkEmitter) isThinkingSummaryGateClosed(blockID, stage string) bool { + if e == nil { + return true + } + e.thinkingGateMu.Lock() + defer e.thinkingGateMu.Unlock() + return e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] +} + +func thinkingSummaryGateKey(blockID, stage string) string { + blockID = strings.TrimSpace(blockID) + stage = strings.TrimSpace(stage) + if blockID != "" { + return blockID + } + if stage != "" { + return stage + } + return "__default__" +} + // EmitPseudoReasoningText 把整段 reasoning 文本按伪流式方式逐块推出。 func (e *ChunkEmitter) EmitPseudoReasoningText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error { return e.emitPseudoText( @@ -304,6 +412,9 @@ func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, i text := buildConfirmAssistantText(title, summary) extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary) e.emitExtraEventHook(extra) + if strings.TrimSpace(text) != "" { + e.closeThinkingSummaryGate(blockID, stage) + } return e.emitPseudoText( ctx, text, @@ -341,6 +452,9 @@ func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, text := buildInterruptAssistantText(interactionType, summary) extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary) + if strings.TrimSpace(text) != "" { + e.closeThinkingSummaryGate(blockID, stage) + } return e.emitPseudoText( ctx, text, @@ -435,6 +549,15 @@ func (e *ChunkEmitter) EmitStreamAssistantText( var fullText strings.Builder firstChunk := true + digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage) + if digestorErr != nil { + return "", digestorErr + } + defer func() { + if digestor != nil { + _ = digestor.Close(ctx) + } + }() for { chunk, err := reader.Recv() @@ -445,16 +568,19 @@ func (e *ChunkEmitter) EmitStreamAssistantText( return fullText.String(), err } - // 推送 reasoning content。 + // 1. reasoning content 只喂给摘要器,不再透传给前端。 + // 2. 未注入摘要能力时直接丢弃,避免 raw reasoning_content 泄漏到 SSE。 if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil { - return fullText.String(), emitErr + if digestor != nil { + digestor.Append(chunk.ReasoningContent) } - firstChunk = false } // 推送 assistant 正文。 if chunk != nil && chunk.Content != "" { + if digestor != nil { + digestor.MarkContentStarted() + } if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil { return fullText.String(), emitErr } @@ -466,9 +592,9 @@ func (e *ChunkEmitter) EmitStreamAssistantText( return fullText.String(), nil } -// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取并实时推送 reasoning 文字。 +// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取 reasoning,并转成低频 thinking_summary。 // -// 与 EmitStreamAssistantText 结构相同,但只推送 ReasoningContent,不推送 Content。 +// 与 EmitStreamAssistantText 结构相同,但不再输出 raw ReasoningContent。 // 用于只需展示思考过程而无需展示正文的场景。 func (e *ChunkEmitter) EmitStreamReasoningText( ctx context.Context, @@ -480,7 +606,15 @@ func (e *ChunkEmitter) EmitStreamReasoningText( } var fullText strings.Builder - firstChunk := true + digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage) + if digestorErr != nil { + return "", digestorErr + } + defer func() { + if digestor != nil { + _ = digestor.Close(ctx) + } + }() for { chunk, err := reader.Recv() @@ -492,11 +626,10 @@ func (e *ChunkEmitter) EmitStreamReasoningText( } if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil { - return fullText.String(), emitErr + if digestor != nil { + digestor.Append(chunk.ReasoningContent) } fullText.WriteString(chunk.ReasoningContent) - firstChunk = false } } diff --git a/backend/newAgent/stream/openai.go b/backend/newAgent/stream/openai.go index 3c45f26..e038f53 100644 --- a/backend/newAgent/stream/openai.go +++ b/backend/newAgent/stream/openai.go @@ -40,6 +40,7 @@ type StreamExtraKind string const ( StreamExtraKindReasoningText StreamExtraKind = "reasoning_text" + StreamExtraKindThinkingSummary StreamExtraKind = "thinking_summary" StreamExtraKindAssistantText StreamExtraKind = "assistant_text" StreamExtraKindStatus StreamExtraKind = "status" StreamExtraKindToolCall StreamExtraKind = "tool_call" @@ -67,16 +68,31 @@ const ( // 2. Status / Tool / Confirm / Interrupt / BusinessCard 只存展示层真正需要的摘要,不直接耦合后端完整状态对象; // 3. Meta 留给后续做灰度扩展,避免每加一种小字段都要立刻改 DTO 结构。 type OpenAIChunkExtra struct { - Kind StreamExtraKind `json:"kind,omitempty"` - BlockID string `json:"block_id,omitempty"` - Stage string `json:"stage,omitempty"` - DisplayMode StreamDisplayMode `json:"display_mode,omitempty"` - Status *StreamStatusExtra `json:"status,omitempty"` - Tool *StreamToolExtra `json:"tool,omitempty"` - Confirm *StreamConfirmExtra `json:"confirm,omitempty"` - Interrupt *StreamInterruptExtra `json:"interrupt,omitempty"` - BusinessCard *StreamBusinessCardExtra `json:"business_card,omitempty"` - Meta map[string]any `json:"meta,omitempty"` + Kind StreamExtraKind `json:"kind,omitempty"` + BlockID string `json:"block_id,omitempty"` + Stage string `json:"stage,omitempty"` + DisplayMode StreamDisplayMode `json:"display_mode,omitempty"` + ThinkingSummary *StreamThinkingSummaryExtra `json:"thinking_summary,omitempty"` + Status *StreamStatusExtra `json:"status,omitempty"` + Tool *StreamToolExtra `json:"tool,omitempty"` + Confirm *StreamConfirmExtra `json:"confirm,omitempty"` + Interrupt *StreamInterruptExtra `json:"interrupt,omitempty"` + BusinessCard *StreamBusinessCardExtra `json:"business_card,omitempty"` + Meta map[string]any `json:"meta,omitempty"` +} + +// StreamThinkingSummaryExtra 表示“流式思考摘要”事件。 +// +// 职责边界: +// 1. short_summary 仅用于 SSE 端快速展示短句,不要求与持久化内容完全一致; +// 2. detail_summary 作为更完整的摘要正文,后续持久化层可直接复用; +// 3. summary_seq / final / duration_seconds 由摘要调度层补充运行态信息,前端可据此去重和排序。 +type StreamThinkingSummaryExtra struct { + SummarySeq int `json:"summary_seq,omitempty"` + ShortSummary string `json:"short_summary,omitempty"` + DetailSummary string `json:"detail_summary,omitempty"` + Final bool `json:"final,omitempty"` + DurationSeconds float64 `json:"duration_seconds,omitempty"` } // StreamStatusExtra 表示普通阶段状态或提示性事件。 @@ -195,6 +211,17 @@ func NewReasoningTextExtra(blockID, stage string) *OpenAIChunkExtra { } } +// NewThinkingSummaryExtra 创建“流式思考摘要”事件的 extra。 +func NewThinkingSummaryExtra(blockID, stage string, summary StreamThinkingSummaryExtra) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindThinkingSummary, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeAppend, + ThinkingSummary: &summary, + } +} + // NewAssistantTextExtra 创建“正文文字”事件的 extra。 func NewAssistantTextExtra(blockID, stage string) *OpenAIChunkExtra { return &OpenAIChunkExtra{ @@ -367,6 +394,7 @@ func hasStreamExtra(extra *OpenAIChunkExtra) bool { extra.BlockID != "" || extra.Stage != "" || extra.DisplayMode != "" || + extra.ThinkingSummary != nil || extra.Status != nil || extra.Tool != nil || extra.Confirm != nil || diff --git a/backend/newAgent/stream/reasoning_digestor.go b/backend/newAgent/stream/reasoning_digestor.go new file mode 100644 index 0000000..2c65ce0 --- /dev/null +++ b/backend/newAgent/stream/reasoning_digestor.go @@ -0,0 +1,599 @@ +package newagentstream + +import ( + "context" + "errors" + "strings" + "sync" + "time" + "unicode" + "unicode/utf8" +) + +const ( + defaultReasoningDigestMinNewRunes = 120 + defaultReasoningDigestMinNewTokens = 80 + defaultReasoningDigestMinInterval = 3 * time.Second +) + +// ReasoningSummaryFunc 负责真正调用摘要模型。 +// +// 职责边界: +// 1. 该函数只负责“把输入整理成一份摘要结果”,不负责调度、节流、正文闸门和结果丢弃; +// 2. 返回值里的 short/detail 由模型或适配层填写; +// 3. summary_seq / final / duration_seconds 由 ReasoningDigestor 统一补齐,避免上层重复维护运行态字段。 +type ReasoningSummaryFunc func(ctx context.Context, input ReasoningSummaryInput) (StreamThinkingSummaryExtra, error) + +// ReasoningSummarySink 负责消费一条已经通过闸门校验的摘要结果。 +// +// 职责边界: +// 1. 常见用法是把结果交给 ChunkEmitter.EmitThinkingSummary; +// 2. 该回调不参与单飞、重试、水位线判断; +// 3. 回调为 nil 时,Digestor 仍会维护 LatestSummary,方便调用方按需主动拉取。 +type ReasoningSummarySink func(summary StreamThinkingSummaryExtra) + +// ReasoningSummaryInput 是注入给摘要模型调用方的统一输入。 +// +// 职责边界: +// 1. FullReasoning 提供完整 reasoning 缓冲区,适合做“全量重摘要”; +// 2. DeltaReasoning + PreviousSummary 提供增量上下文,适合做“旧摘要续写”; +// 3. CandidateSeq / Final / DurationSeconds 仅表达调度层意图,不要求模型原样回填。 +type ReasoningSummaryInput struct { + FullReasoning string `json:"full_reasoning,omitempty"` + DeltaReasoning string `json:"delta_reasoning,omitempty"` + PreviousSummary *StreamThinkingSummaryExtra `json:"previous_summary,omitempty"` + CandidateSeq int `json:"candidate_seq,omitempty"` + Final bool `json:"final,omitempty"` + DurationSeconds float64 `json:"duration_seconds,omitempty"` +} + +// ReasoningDigestorOptions 描述 reasoning 摘要器的调度参数。 +type ReasoningDigestorOptions struct { + SummaryFunc ReasoningSummaryFunc + SummarySink ReasoningSummarySink + BaseContext context.Context + MinNewRunes int + MinNewTokens int + MinInterval time.Duration + SummaryTimeout time.Duration + Now func() time.Time +} + +// ReasoningDigestor 负责把流式 reasoning 文本整理成“低频摘要事件”。 +// +// 职责边界: +// 1. 只负责缓冲、单飞、水位线、正文闸门、Flush/Close,不直接依赖 AgentService; +// 2. 只通过 SummaryFunc / SummarySink 两个函数注入模型调用与结果消费,不在这里选模型; +// 3. 一旦正文开始或显式关闸,后续摘要结果即使返回成功也必须丢弃,避免前端和持久化出现越界数据。 +type ReasoningDigestor struct { + summaryFunc ReasoningSummaryFunc + summarySink ReasoningSummarySink + baseContext context.Context + minNewRunes int + minNewTokens int + minInterval time.Duration + summaryTimeout time.Duration + now func() time.Time + + mu sync.Mutex + cond *sync.Cond + buffer strings.Builder + deltaBuffer strings.Builder + startedAt time.Time + lastRequestAt time.Time + pendingRunes int + pendingTokens int + summarySeq int + latestSummary *StreamThinkingSummaryExtra + finalEmitted bool + inFlight bool + gateClosed bool + contentStarted bool + closed bool + timer *time.Timer + timerArmed bool + currentCancel context.CancelFunc +} + +type reasoningDigestCall struct { + ctx context.Context + stop context.CancelFunc + input ReasoningSummaryInput + final bool +} + +// NewReasoningDigestor 创建一个只关注“流式思考摘要调度”的核心对象。 +// +// 步骤说明: +// 1. 先校验 SummaryFunc;它是唯一必填项,因为 Digestor 不在本文件里选择模型; +// 2. 再补齐默认水位线和最小时间间隔,让调用方即使只传核心依赖也能启动; +// 3. 最后只初始化并发控制原语,不在构造阶段启动常驻主循环,避免引入额外 goroutine 生命周期负担。 +func NewReasoningDigestor(options ReasoningDigestorOptions) (*ReasoningDigestor, error) { + if options.SummaryFunc == nil { + return nil, errors.New("reasoning digestor: SummaryFunc 不能为空") + } + + if options.MinNewRunes < 0 { + options.MinNewRunes = 0 + } + if options.MinNewTokens < 0 { + options.MinNewTokens = 0 + } + if options.MinNewRunes == 0 && options.MinNewTokens == 0 { + options.MinNewRunes = defaultReasoningDigestMinNewRunes + options.MinNewTokens = defaultReasoningDigestMinNewTokens + } + if options.MinInterval <= 0 { + options.MinInterval = defaultReasoningDigestMinInterval + } + if options.BaseContext == nil { + options.BaseContext = context.Background() + } + if options.Now == nil { + options.Now = time.Now + } + + digestor := &ReasoningDigestor{ + summaryFunc: options.SummaryFunc, + summarySink: options.SummarySink, + baseContext: options.BaseContext, + minNewRunes: options.MinNewRunes, + minNewTokens: options.MinNewTokens, + minInterval: options.MinInterval, + summaryTimeout: options.SummaryTimeout, + now: options.Now, + } + digestor.cond = sync.NewCond(&digestor.mu) + return digestor, nil +} + +// Append 追加一段 reasoning chunk,并按水位线决定是否后台触发摘要。 +// +// 步骤说明: +// 1. 先把原始 reasoning 文本写入 full buffer,保证 Flush/Close 可以拿到全量上下文; +// 2. 再把本轮新增文本记入 deltaBuffer 与 rune/token 水位线,用于“最小新增量”判断; +// 3. 若正文闸门已关闭,则只保留缓冲快照,不再调度摘要; +// 4. 若当前已有摘要请求在飞,则只更新 dirty/latest,不排队第二个请求,等单飞请求返回后再决定是否补一次。 +func (d *ReasoningDigestor) Append(reasoning string) { + if d == nil || reasoning == "" { + return + } + + var call reasoningDigestCall + var shouldStart bool + + d.mu.Lock() + if d.closed { + d.mu.Unlock() + return + } + + if d.startedAt.IsZero() { + d.startedAt = d.now() + } + d.buffer.WriteString(reasoning) + + if d.gateClosed || d.contentStarted { + d.mu.Unlock() + return + } + + d.deltaBuffer.WriteString(reasoning) + d.pendingRunes += utf8.RuneCountInString(reasoning) + d.pendingTokens += estimateReasoningTokens(reasoning) + d.finalEmitted = false + + call, shouldStart = d.prepareSummaryLocked(d.baseContext, false, false) + d.mu.Unlock() + + if shouldStart { + go d.runSummary(call) + } +} + +// MarkContentStarted 标记“正文已经开始输出”。 +// +// 职责边界: +// 1. 该方法会直接关闭摘要闸门; +// 2. 它不回收旧摘要结果,但会丢弃后续任何尚未完成的摘要调用; +// 3. 调用后即使继续 Append reasoning,也只保留缓冲,不再触发新摘要。 +func (d *ReasoningDigestor) MarkContentStarted() { + d.closeGate(true) +} + +// CloseGate 显式关闭摘要闸门,但不额外声明正文已经开始。 +func (d *ReasoningDigestor) CloseGate() { + d.closeGate(false) +} + +// Flush 在正文尚未开始时尝试补发最后一次摘要。 +// +// 步骤说明: +// 1. 先等待当前单飞请求结束,避免 Flush 与后台自动摘要并发跑两次; +// 2. 若正文已经开始或闸门已关,则直接返回,不再补摘要; +// 3. 若此前已经发过 final 且没有新增 reasoning,则跳过,避免重复 final 事件; +// 4. 其余场景会强制走一次摘要,即使新增量还没达到自动触发水位线。 +func (d *ReasoningDigestor) Flush(ctx context.Context) error { + if d == nil { + return nil + } + + call, shouldStart := d.prepareFlushCall(ctx) + if !shouldStart { + return nil + } + return d.runSummary(call) +} + +// Close 结束摘要器生命周期。 +// +// 步骤说明: +// 1. 若正文还未开始,先尝试 Flush 一次 final 摘要; +// 2. 再关闭闸门、停止等待中的定时器,并取消正在进行的摘要调用; +// 3. 最后等待单飞调用完全退出,避免遗留后台 goroutine 持续写结果。 +func (d *ReasoningDigestor) Close(ctx context.Context) error { + if d == nil { + return nil + } + + if err := d.Flush(ctx); err != nil { + return err + } + + d.mu.Lock() + if d.closed { + d.mu.Unlock() + return nil + } + + d.closed = true + d.gateClosed = true + d.stopTimerLocked() + if d.currentCancel != nil { + d.currentCancel() + } + for d.inFlight { + d.cond.Wait() + } + d.mu.Unlock() + return nil +} + +// LatestSummary 返回最近一次通过闸门校验并成功发布的摘要。 +func (d *ReasoningDigestor) LatestSummary() (StreamThinkingSummaryExtra, bool) { + if d == nil { + return StreamThinkingSummaryExtra{}, false + } + + d.mu.Lock() + defer d.mu.Unlock() + + if d.latestSummary == nil { + return StreamThinkingSummaryExtra{}, false + } + return *cloneThinkingSummaryExtra(d.latestSummary), true +} + +func (d *ReasoningDigestor) closeGate(markContentStarted bool) { + if d == nil { + return + } + + d.mu.Lock() + if markContentStarted { + d.contentStarted = true + } + d.gateClosed = true + d.pendingRunes = 0 + d.pendingTokens = 0 + d.deltaBuffer.Reset() + d.stopTimerLocked() + if d.currentCancel != nil { + d.currentCancel() + } + d.mu.Unlock() +} + +func (d *ReasoningDigestor) prepareFlushCall(ctx context.Context) (reasoningDigestCall, bool) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.closed || d.gateClosed || d.contentStarted { + return reasoningDigestCall{}, false + } + + d.stopTimerLocked() + for d.inFlight { + d.cond.Wait() + if d.closed || d.gateClosed || d.contentStarted { + return reasoningDigestCall{}, false + } + } + + if strings.TrimSpace(d.buffer.String()) == "" { + return reasoningDigestCall{}, false + } + if d.finalEmitted && d.pendingRunes == 0 && d.pendingTokens == 0 { + return reasoningDigestCall{}, false + } + return d.prepareSummaryLocked(ctx, true, true) +} + +func (d *ReasoningDigestor) prepareSummaryLocked(parent context.Context, force bool, final bool) (reasoningDigestCall, bool) { + if d.closed || d.gateClosed || d.contentStarted || d.inFlight { + return reasoningDigestCall{}, false + } + + fullReasoning := d.buffer.String() + if strings.TrimSpace(fullReasoning) == "" { + return reasoningDigestCall{}, false + } + + // 1. 自动摘要必须同时满足“新增量水位线 + 最小时间间隔”。 + // 2. 若新增量不足,则直接等待后续 Append,不做空转请求。 + // 3. 若时间间隔未到,则只挂一个定时器做兜底唤醒,避免排队多个请求。 + if !force { + if !d.reachedWatermarkLocked() { + return reasoningDigestCall{}, false + } + wait := d.nextAllowedIntervalLocked() + if wait > 0 { + d.armTimerLocked(wait) + return reasoningDigestCall{}, false + } + } + + callCtx, stop := d.newCallContext(parent) + call := reasoningDigestCall{ + ctx: callCtx, + stop: stop, + input: ReasoningSummaryInput{ + FullReasoning: strings.Clone(fullReasoning), + DeltaReasoning: strings.Clone(d.deltaBuffer.String()), + PreviousSummary: cloneThinkingSummaryExtra(d.latestSummary), + CandidateSeq: d.summarySeq + 1, + Final: final, + DurationSeconds: d.durationSecondsLocked(), + }, + final: final, + } + + d.stopTimerLocked() + d.inFlight = true + d.lastRequestAt = d.now() + d.pendingRunes = 0 + d.pendingTokens = 0 + d.deltaBuffer.Reset() + d.currentCancel = stop + + return call, true +} + +func (d *ReasoningDigestor) runSummary(call reasoningDigestCall) error { + if call.stop == nil { + return nil + } + defer call.stop() + + summary, err := d.summaryFunc(call.ctx, call.input) + if err != nil { + // 1. 摘要失败时不把错误扩散回主流式链路,避免 reasoning 展示被摘要能力反向拖垮。 + // 2. 若失败期间又追加了新 reasoning,则仍按单飞规则尝试补下一次;否则等待后续 Append/Flush 兜底。 + _, _, nextCall, shouldStart := d.finishSummary(call.final, nil) + if shouldStart { + go d.runSummary(nextCall) + } + return err + } + + normalized := normalizeThinkingSummary(summary, call.input.Final, call.input.DurationSeconds) + emittedSummary, sink, nextCall, shouldStart := d.finishSummary(call.final, &normalized) + if emittedSummary != nil && sink != nil { + sink(*emittedSummary) + } + if shouldStart { + go d.runSummary(nextCall) + } + return nil +} + +func (d *ReasoningDigestor) finishSummary(final bool, summary *StreamThinkingSummaryExtra) (*StreamThinkingSummaryExtra, ReasoningSummarySink, reasoningDigestCall, bool) { + d.mu.Lock() + defer d.mu.Unlock() + + d.inFlight = false + d.currentCancel = nil + d.cond.Broadcast() + + var emittedSummary *StreamThinkingSummaryExtra + var sink ReasoningSummarySink + + // 1. 先判断正文闸门;正文一旦开始,所有晚到结果都必须丢弃。 + // 2. 再补齐 summary_seq/final/duration,并缓存 LatestSummary 供上层读取。 + // 3. 若当前请求期间又积累了新 reasoning,则只启动下一次单飞摘要,不排队多次。 + if summary != nil && !d.closed && !d.gateClosed && !d.contentStarted { + normalized := *summary + d.summarySeq++ + normalized.SummarySeq = d.summarySeq + normalized.Final = final + if normalized.DurationSeconds <= 0 { + normalized.DurationSeconds = d.durationSecondsLocked() + } + d.latestSummary = cloneThinkingSummaryExtra(&normalized) + d.finalEmitted = final + emittedSummary = cloneThinkingSummaryExtra(&normalized) + sink = d.summarySink + } + + if d.closed || d.gateClosed || d.contentStarted || final { + return emittedSummary, sink, reasoningDigestCall{}, false + } + + nextCall, shouldStart := d.prepareSummaryLocked(d.baseContext, false, false) + return emittedSummary, sink, nextCall, shouldStart +} + +func (d *ReasoningDigestor) reachedWatermarkLocked() bool { + return reachedReasoningWatermark(d.pendingRunes, d.pendingTokens, d.minNewRunes, d.minNewTokens) +} + +func (d *ReasoningDigestor) nextAllowedIntervalLocked() time.Duration { + if d.lastRequestAt.IsZero() { + return 0 + } + wait := d.minInterval - d.now().Sub(d.lastRequestAt) + if wait < 0 { + return 0 + } + return wait +} + +func (d *ReasoningDigestor) armTimerLocked(wait time.Duration) { + if wait <= 0 || d.closed || d.gateClosed || d.contentStarted { + return + } + if d.timer == nil { + d.timer = time.AfterFunc(wait, d.onTimer) + d.timerArmed = true + return + } + if d.timerArmed { + d.timer.Reset(wait) + return + } + d.timer.Reset(wait) + d.timerArmed = true +} + +func (d *ReasoningDigestor) stopTimerLocked() { + if d.timer == nil { + return + } + if d.timer.Stop() { + d.timerArmed = false + return + } + d.timerArmed = false +} + +func (d *ReasoningDigestor) onTimer() { + if d == nil { + return + } + + var call reasoningDigestCall + var shouldStart bool + + d.mu.Lock() + d.timerArmed = false + call, shouldStart = d.prepareSummaryLocked(d.baseContext, false, false) + d.mu.Unlock() + + if shouldStart { + go d.runSummary(call) + } +} + +func (d *ReasoningDigestor) newCallContext(parent context.Context) (context.Context, context.CancelFunc) { + if parent == nil { + parent = d.baseContext + } + if parent == nil { + parent = context.Background() + } + + baseCtx, baseCancel := context.WithCancel(parent) + if d.summaryTimeout <= 0 { + return baseCtx, baseCancel + } + + timeoutCtx, timeoutCancel := context.WithTimeout(baseCtx, d.summaryTimeout) + return timeoutCtx, func() { + timeoutCancel() + baseCancel() + } +} + +func (d *ReasoningDigestor) durationSecondsLocked() float64 { + if d.startedAt.IsZero() { + return 0 + } + duration := d.now().Sub(d.startedAt) + if duration <= 0 { + return 0 + } + return float64(duration.Milliseconds()) / 1000 +} + +func reachedReasoningWatermark(pendingRunes, pendingTokens, minRunes, minTokens int) bool { + if minRunes > 0 && pendingRunes >= minRunes { + return true + } + if minTokens > 0 && pendingTokens >= minTokens { + return true + } + return false +} + +func normalizeThinkingSummary(summary StreamThinkingSummaryExtra, final bool, durationSeconds float64) StreamThinkingSummaryExtra { + summary.ShortSummary = strings.TrimSpace(summary.ShortSummary) + summary.DetailSummary = strings.TrimSpace(summary.DetailSummary) + + // 1. 短摘要只是实时展示兜底,允许从长摘要压一个默认值。 + // 2. 反过来不能把短摘要补成 detail_summary,否则会绕过“短摘要不持久化”的产品语义。 + // 3. 若模型没有给 detail_summary,timeline 层会跳过持久化,仅保留本次 SSE 展示。 + if summary.ShortSummary == "" { + summary.ShortSummary = summary.DetailSummary + } + summary.Final = final + if summary.DurationSeconds <= 0 { + summary.DurationSeconds = durationSeconds + } + return summary +} + +func cloneThinkingSummaryExtra(src *StreamThinkingSummaryExtra) *StreamThinkingSummaryExtra { + if src == nil { + return nil + } + clone := *src + return &clone +} + +func estimateReasoningTokens(text string) int { + text = strings.TrimSpace(text) + if text == "" { + return 0 + } + + asciiRunes := 0 + totalTokens := 0 + for _, r := range text { + switch { + case unicode.IsSpace(r): + if asciiRunes > 0 { + totalTokens += compactASCIITokens(asciiRunes) + asciiRunes = 0 + } + case r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r)): + asciiRunes++ + default: + if asciiRunes > 0 { + totalTokens += compactASCIITokens(asciiRunes) + asciiRunes = 0 + } + totalTokens++ + } + } + if asciiRunes > 0 { + totalTokens += compactASCIITokens(asciiRunes) + } + return totalTokens +} + +func compactASCIITokens(asciiRunes int) int { + if asciiRunes <= 0 { + return 0 + } + return max(1, (asciiRunes+3)/4) +} diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 7c7605c..63435a3 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -353,12 +353,11 @@ func (s *AgentService) runNormalChatFlow( // 6. 执行真正的流式聊天。 // fullText 用于后续写 Redis/持久化,outChan 用于把流片段实时推给前端。 - fullText, reasoningText, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt) + fullText, _, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt, userID, chatID) if streamErr != nil { pushErrNonBlocking(errChan, streamErr) return } - assistantReasoning := mergeAgentReasoningText(assistantReasoningPrefix, reasoningText) // 6.1 流式 usage 并入请求级 token 统计器: // 6.1.1 route/quicknote/taskquery 等 Generate 调用由 callback 自动累加; @@ -413,7 +412,7 @@ func (s *AgentService) runNormalChatFlow( // 8. 后置持久化(助手消息): // 8.1 先写 Redis,保证下一轮上下文可见; // 8.2 再异步可靠落库,失败通过 errChan 回传给上层。 - assistantMsg := &schema.Message{Role: schema.Assistant, Content: fullText, ReasoningContent: assistantReasoning} + assistantMsg := &schema.Message{Role: schema.Assistant, Content: fullText} if reasoningDurationSeconds > 0 { assistantMsg.Extra = map[string]any{"reasoning_duration_seconds": reasoningDurationSeconds} } @@ -426,7 +425,7 @@ func (s *AgentService) runNormalChatFlow( ConversationID: chatID, Role: "assistant", Message: fullText, - ReasoningContent: assistantReasoning, + ReasoningContent: "", ReasoningDurationSeconds: reasoningDurationSeconds, // 口径B:助手消息记录“本轮请求总 token”。 TokensConsumed: requestTotalTokens, @@ -434,9 +433,6 @@ func (s *AgentService) runNormalChatFlow( pushErrNonBlocking(errChan, saveErr) } else { assistantTimelinePayload := map[string]any{} - if strings.TrimSpace(assistantReasoning) != "" { - assistantTimelinePayload["reasoning_content"] = strings.TrimSpace(assistantReasoning) - } if reasoningDurationSeconds > 0 { assistantTimelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds } diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go index ef7b00b..a5c9d25 100644 --- a/backend/service/agentsvc/agent_newagent.go +++ b/backend/service/agentsvc/agent_newagent.go @@ -182,10 +182,12 @@ func (s *AgentService) runNewAgentGraph( planClient := infrallm.WrapArkClient(s.AIHub.Max) executeClient := infrallm.WrapArkClient(s.AIHub.Max) deliverClient := infrallm.WrapArkClient(s.AIHub.Pro) + summaryClient := infrallm.WrapArkClient(s.AIHub.Lite) // 8. 适配 SSE emitter。 sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan) chunkEmitter := newagentstream.NewChunkEmitter(sseEmitter, traceID, resolvedModelName, requestStart.Unix()) + chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(summaryClient)) // 关键卡片事件走统一时间线持久化,保证刷新后可重建。 chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) { s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra) @@ -449,9 +451,11 @@ func (s *AgentService) persistNewAgentConversationMessage( } persistMsg := &schema.Message{ - Role: msg.Role, - Content: content, - ReasoningContent: strings.TrimSpace(msg.ReasoningContent), + Role: msg.Role, + Content: content, + // 可见消息持久化只保存正文;模型 raw reasoning 改由 thinking_summary 生成用户可见摘要, + // 避免历史接口或时间线刷新时重新暴露内部思考文本。 + ReasoningContent: "", } if len(msg.Extra) > 0 { persistMsg.Extra = make(map[string]any, len(msg.Extra)) @@ -498,9 +502,6 @@ func (s *AgentService) persistNewAgentConversationMessage( timelineKind = model.AgentTimelineKindAssistantText } timelinePayload := map[string]any{} - if persistPayload.ReasoningContent != "" { - timelinePayload["reasoning_content"] = persistPayload.ReasoningContent - } if reasoningDurationSeconds > 0 { timelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds } diff --git a/backend/service/agentsvc/agent_stream_fallback.go b/backend/service/agentsvc/agent_stream_fallback.go index 01c9171..38c5ef6 100644 --- a/backend/service/agentsvc/agent_stream_fallback.go +++ b/backend/service/agentsvc/agent_stream_fallback.go @@ -6,6 +6,7 @@ import ( "strings" "time" + infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" "github.com/cloudwego/eino-ext/components/model/ark" @@ -25,6 +26,8 @@ func (s *AgentService) streamChatFallback( chatHistory []*schema.Message, outChan chan<- string, reasoningStartAt *time.Time, + userID int, + chatID string, ) (string, string, int, *schema.TokenUsage, error) { messages := make([]*schema.Message, 0, len(chatHistory)+2) messages = append(messages, schema.SystemMessage(newagentprompt.SystemPrompt)) @@ -46,6 +49,24 @@ func (s *AgentService) streamChatFallback( requestID := "chatcmpl-" + uuid.NewString() created := time.Now().Unix() firstChunk := true + chunkEmitter := newagentstream.NewChunkEmitter(newagentstream.NewSSEPayloadEmitter(outChan), requestID, modelName, created) + chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(infrallm.WrapArkClient(s.AIHub.Lite))) + chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) { + s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra) + }) + reasoningDigestor, digestorErr := chunkEmitter.NewReasoningDigestor(ctx, "fallback.speak", "fallback") + if digestorErr != nil { + return "", "", 0, nil, digestorErr + } + digestorClosed := false + closeDigestor := func() { + if reasoningDigestor == nil || digestorClosed { + return + } + digestorClosed = true + _ = reasoningDigestor.Close(ctx) + } + defer closeDigestor() var localReasoningStartAt *time.Time if reasoningStartAt != nil && !reasoningStartAt.IsZero() { @@ -61,7 +82,6 @@ func (s *AgentService) streamChatFallback( defer reader.Close() var fullText strings.Builder - var reasoningText strings.Builder var tokenUsage *schema.TokenUsage for { chunk, recvErr := reader.Recv() @@ -85,26 +105,31 @@ func (s *AgentService) streamChatFallback( now := time.Now() reasoningEndAt = &now } - fullText.WriteString(chunk.Content) - reasoningText.WriteString(chunk.ReasoningContent) - } - - payload, payloadErr := newagentstream.ToOpenAIStream(chunk, requestID, modelName, created, firstChunk) - if payloadErr != nil { - return "", "", 0, nil, payloadErr - } - if payload != "" { - outChan <- payload - firstChunk = false + // 1. fallback 链路同样不能透传 raw reasoning_content; + // 2. 只把 reasoning 喂给摘要器,正文出现时立即关门丢弃后续摘要。 + if strings.TrimSpace(chunk.ReasoningContent) != "" && reasoningDigestor != nil { + reasoningDigestor.Append(chunk.ReasoningContent) + } + if chunk.Content != "" { + if reasoningDigestor != nil { + reasoningDigestor.MarkContentStarted() + } + if emitErr := chunkEmitter.EmitAssistantText("fallback.speak", "fallback", chunk.Content, firstChunk); emitErr != nil { + return "", "", 0, nil, emitErr + } + fullText.WriteString(chunk.Content) + firstChunk = false + } } } + closeDigestor() - finishChunk, finishErr := newagentstream.ToOpenAIFinishStream(requestID, modelName, created) - if finishErr != nil { + if finishErr := chunkEmitter.EmitFinish("fallback.speak", "fallback"); finishErr != nil { return "", "", 0, nil, finishErr } - outChan <- finishChunk - outChan <- "[DONE]" + if doneErr := chunkEmitter.EmitDone(); doneErr != nil { + return "", "", 0, nil, doneErr + } reasoningDurationSeconds := 0 if localReasoningStartAt != nil { @@ -117,5 +142,5 @@ func (s *AgentService) streamChatFallback( } } - return fullText.String(), reasoningText.String(), reasoningDurationSeconds, tokenUsage, nil + return fullText.String(), "", reasoningDurationSeconds, tokenUsage, nil } diff --git a/backend/service/agentsvc/agent_timeline.go b/backend/service/agentsvc/agent_timeline.go index 793db15..fe25e87 100644 --- a/backend/service/agentsvc/agent_timeline.go +++ b/backend/service/agentsvc/agent_timeline.go @@ -10,6 +10,7 @@ import ( "github.com/LoveLosita/smartflow/backend/model" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + eventsvc "github.com/LoveLosita/smartflow/backend/service/events" "gorm.io/gorm" ) @@ -63,13 +64,13 @@ func (s *AgentService) GetConversationTimeline(ctx context.Context, userID int, return normalizeConversationTimelineItems(items), nil } -// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + MySQL。 +// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + outbox。 // // 步骤化说明: -// 1. 先从 Redis INCR 分配 seq,若 Redis 异常则回退 DB MAX(seq)+1; -// 2. 再写 MySQL,保证刷新时至少有权威持久化; -// 3. 最后追加 Redis 时间线列表,失败只记日志,不影响主链路返回; -// 4. 返回分配到的 seq,便于后续扩展在 SSE meta 回传顺序号。 +// 1. 先分配同会话内单调递增的 seq,优先走 Redis,Redis 不可用时回退 DB; +// 2. 再把事件同步追加到 Redis timeline cache,保证刷新前的用户体验连续; +// 3. 最后发布 outbox 事件异步落 MySQL,与 chat history 的可靠落库方式对齐; +// 4. 未注入 eventPublisher 时走同步 MySQL fallback,方便本地极简环境启动。 func (s *AgentService) appendConversationTimelineEvent( ctx context.Context, userID int, @@ -95,86 +96,260 @@ func (s *AgentService) appendConversationTimelineEvent( return 0, errors.New("invalid timeline event identity") } + normalizedContent, normalizedPayload, shouldPersist := normalizeConversationTimelinePersistMaterial(normalizedKind, normalizedContent, payload) + if !shouldPersist { + return 0, nil + } + seq, err := s.nextConversationTimelineSeq(ctx, userID, normalizedChatID) if err != nil { return 0, err } - payloadJSON := marshalTimelinePayloadJSON(payload) - persistPayload := model.ChatTimelinePersistPayload{ + persistPayload := (model.ChatTimelinePersistPayload{ UserID: userID, ConversationID: normalizedChatID, Seq: seq, Kind: normalizedKind, Role: normalizedRole, Content: normalizedContent, - PayloadJSON: payloadJSON, + PayloadJSON: marshalTimelinePayloadJSON(normalizedPayload), TokensConsumed: tokensConsumed, + }).Normalize() + if s.eventPublisher != nil { + now := time.Now() + + // 1. 先写 Redis timeline cache,让刷新前的本地态和下一轮上下文都能立即看到这条事件。 + // 2. 再发布 outbox 事件,与 chat history 保持相同的“入队成功即返回”语义。 + // 3. 若 outbox 发布失败,这里返回 error 交给上层处理,不在本方法里偷偷回退成同步写库。 + s.appendConversationTimelineCacheNonBlocking( + ctx, + userID, + normalizedChatID, + buildConversationTimelineCacheItem(0, seq, normalizedKind, normalizedRole, normalizedContent, normalizedPayload, tokensConsumed, &now), + ) + if err := eventsvc.PublishAgentTimelinePersistRequested(ctx, s.eventPublisher, persistPayload); err != nil { + return 0, err + } + return seq, nil } + return s.appendConversationTimelineEventSync(ctx, userID, normalizedChatID, persistPayload, normalizedPayload) +} + +// appendConversationTimelineEventSync 在未启用 outbox 时同步写 MySQL。 +// +// 步骤化说明: +// 1. 本方法只作为 eventPublisher 为空时的降级路径,保证本地环境不依赖总线; +// 2. 若 seq 唯一键冲突,读取 DB 最大 seq 后补一个新序号,语义与 outbox 消费者保持一致; +// 3. MySQL 写入成功后再追加 Redis cache,让缓存拿到数据库生成的 id/created_at。 +func (s *AgentService) appendConversationTimelineEventSync( + ctx context.Context, + userID int, + chatID string, + persistPayload model.ChatTimelinePersistPayload, + payload map[string]any, +) (int64, error) { eventID, eventCreatedAt, err := s.repo.SaveConversationTimelineEvent(ctx, persistPayload) if err != nil { - // 1. 并发极端场景下(例如 Redis seq 分配失败后 DB 兜底)可能产生重复 seq; - // 2. 这里做一次“读取最新 MAX(seq)+1”的重试,避免主链路直接失败; - // 3. 重试仍失败则返回错误,让调用方感知真实落库失败。 - if !isTimelineSeqConflictError(err) { + // 1. 这里的冲突通常来自 Redis seq key 过期或落后于 DB。 + // 2. 由于当前是同步写库链路,可以直接读取 DB 当前最大 seq 并补一个新序号。 + // 3. 若重试后仍失败,则把数据库错误原样抛给上层,避免悄悄吞掉真实问题。 + if !model.IsTimelineSeqConflictError(err) { return 0, err } - maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID) + maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID) if seqErr != nil { - return 0, err + return 0, seqErr } persistPayload.Seq = maxSeq + 1 - var retryErr error - eventID, eventCreatedAt, retryErr = s.repo.SaveConversationTimelineEvent(ctx, persistPayload) - if retryErr != nil { - return 0, retryErr + eventID, eventCreatedAt, err = s.repo.SaveConversationTimelineEvent(ctx, persistPayload) + if err != nil { + return 0, err } - seq = persistPayload.Seq if s.cacheDAO != nil { - if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, seq); setErr != nil { - log.Printf("时间线 seq 冲突重试后回写 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, seq, setErr) + if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, persistPayload.Seq); setErr != nil { + log.Printf("回填时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, persistPayload.Seq, setErr) } } } + s.appendConversationTimelineCacheNonBlocking( + ctx, + userID, + chatID, + buildConversationTimelineCacheItem( + eventID, + persistPayload.Seq, + persistPayload.Kind, + persistPayload.Role, + persistPayload.Content, + payload, + persistPayload.TokensConsumed, + eventCreatedAt, + ), + ) + return persistPayload.Seq, nil +} + +// appendConversationTimelineCacheNonBlocking 尽力把单条 timeline 事件追加到 Redis。 +// +// 步骤化说明: +// 1. 缓存失败不能反向影响主链路,因为 MySQL/outbox 才是最终可靠写入; +// 2. 这里统一记录错误日志,方便排查 Redis 不可用或 payload 序列化问题; +// 3. item 由调用方提前标准化,本方法不再二次裁剪业务字段。 +func (s *AgentService) appendConversationTimelineCacheNonBlocking( + ctx context.Context, + userID int, + chatID string, + item model.GetConversationTimelineItem, +) { + if s.cacheDAO == nil { + return + } + if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, chatID, item); err != nil { + log.Printf("追加时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, chatID, item.Seq, item.Kind, err) + } +} + +// nextConversationTimelineSeq 负责分配一条新的 timeline seq。 +// +// 步骤化说明: +// 1. 优先走 Redis INCR,避免所有事件都串行依赖 MySQL; +// 2. 再用 DB MAX(seq) 做一次自检,尽量把“Redis key 过期/落后”在写入前提前修正; +// 3. 若 Redis 不可用,则直接回退到 DB MAX(seq)+1,并把结果尽力回填回 Redis。 +func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) { + if s == nil || s.repo == nil { + return 0, errors.New("agent service is not initialized") + } + if ctx == nil { + ctx = context.Background() + } + + normalizedChatID := strings.TrimSpace(chatID) + if userID <= 0 || normalizedChatID == "" { + return 0, errors.New("invalid timeline seq identity") + } + + if s.cacheDAO == nil { + return s.nextConversationTimelineSeqFromDB(ctx, userID, normalizedChatID) + } + + candidateSeq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, normalizedChatID) + if err != nil { + log.Printf("分配时间线 seq 时 Redis INCR 失败,回退 DB user=%d chat=%s err=%v", userID, normalizedChatID, err) + return s.nextConversationTimelineSeqFromDB(ctx, userID, normalizedChatID) + } + + // 1. Redis key 缺失时,INCR 常会从 1 重新开始,容易和已有 DB 记录撞 seq。 + // 2. 这里额外对照一次 DB 最大 seq,把明显落后的顺序号提前修正,降低 outbox 消费时的补 seq 概率。 + // 3. 该自检不会看到“尚未消费到 MySQL 的新 outbox 事件”,因此真正的极端并发兜底仍由消费者承担。 + maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID) + if err != nil { + return 0, err + } + if candidateSeq > maxSeq { + return candidateSeq, nil + } + + repairedSeq := maxSeq + 1 + if err = s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, repairedSeq); err != nil { + log.Printf("修正时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, repairedSeq, err) + } + return repairedSeq, nil +} + +func (s *AgentService) nextConversationTimelineSeqFromDB(ctx context.Context, userID int, chatID string) (int64, error) { + maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID) + if err != nil { + return 0, err + } + nextSeq := maxSeq + 1 if s.cacheDAO != nil { - now := time.Now() - item := model.GetConversationTimelineItem{ - ID: eventID, - Seq: seq, - Kind: normalizedKind, - Role: normalizedRole, - Content: normalizedContent, - Payload: cloneTimelinePayload(payload), - TokensConsumed: tokensConsumed, - } - if eventCreatedAt != nil { - item.CreatedAt = eventCreatedAt - } else { - item.CreatedAt = &now - } - if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, normalizedChatID, item); err != nil { - log.Printf("追加会话时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, normalizedChatID, seq, normalizedKind, err) + if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, nextSeq); setErr != nil { + log.Printf("回填时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, nextSeq, setErr) } } - return seq, nil + return nextSeq, nil } -func isTimelineSeqConflictError(err error) bool { - if err == nil { - return false +// normalizeConversationTimelinePersistMaterial 负责把 timeline 原始输入收敛成“可缓存 + 可持久化”的口径。 +// +// 职责边界: +// 1. 对普通事件只做浅拷贝,避免调用方后续继续改 map 影响已入队 payload; +// 2. 对 thinking_summary 只保留 detail_summary 与必要 metadata,明确剔除 short_summary; +// 3. 若 thinking_summary 最终没有 detail_summary,则返回 shouldPersist=false,仅保留实时 SSE 展示,不进入 timeline。 +func normalizeConversationTimelinePersistMaterial(kind string, content string, payload map[string]any) (string, map[string]any, bool) { + normalizedKind := strings.ToLower(strings.TrimSpace(kind)) + normalizedContent := strings.TrimSpace(content) + if normalizedKind != model.AgentTimelineKindThinkingSummary { + return normalizedContent, cloneTimelinePayload(payload), true } - text := strings.ToLower(err.Error()) - return strings.Contains(text, "duplicate") && strings.Contains(text, "uk_timeline_user_chat_seq") + return sanitizeThinkingSummaryPersistMaterial(normalizedContent, payload) } -// persistNewAgentTimelineExtraEvent 把 SSE extra 卡片事件写入时间线。 +func sanitizeThinkingSummaryPersistMaterial(content string, payload map[string]any) (string, map[string]any, bool) { + detailSummary := readTimelinePayloadString(payload, "detail_summary") + if detailSummary == "" { + detailSummary = strings.TrimSpace(content) + } + if detailSummary == "" { + return "", nil, false + } + + sanitized := make(map[string]any) + copyTrimmedTimelinePayloadField(payload, sanitized, "stage") + copyTrimmedTimelinePayloadField(payload, sanitized, "block_id") + copyTrimmedTimelinePayloadField(payload, sanitized, "display_mode") + copyTimelinePayloadFieldIfPresent(payload, sanitized, "summary_seq") + copyTimelinePayloadFieldIfPresent(payload, sanitized, "final") + copyTimelinePayloadFieldIfPresent(payload, sanitized, "duration_seconds") + sanitized["detail_summary"] = detailSummary + + return detailSummary, sanitized, true +} + +func copyTrimmedTimelinePayloadField(src map[string]any, dst map[string]any, key string) { + if len(src) == 0 || dst == nil { + return + } + value, ok := src[key] + if !ok { + return + } + text, ok := value.(string) + if !ok { + return + } + trimmed := strings.TrimSpace(text) + if trimmed == "" { + return + } + dst[key] = trimmed +} + +func copyTimelinePayloadFieldIfPresent(src map[string]any, dst map[string]any, key string) { + if len(src) == 0 || dst == nil { + return + } + value, ok := src[key] + if !ok || value == nil { + return + } + dst[key] = value +} + +// persistNewAgentTimelineExtraEvent 把 SSE extra 里的结构化事件写入时间线。 // // 说明: -// 1. 只持久化真正需要刷新后重建的卡片事件; -// 2. status/reasoning/finish 等临时过程信号不落时间线; -// 3. 失败只记日志,不中断当前 SSE 输出。 -func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, userID int, chatID string, extra *newagentstream.OpenAIChunkExtra) { +// 1. 只持久化刷新后仍需重建的业务事件; +// 2. short_summary 这类临时展示信息会在 appendConversationTimelineEvent 内被过滤掉; +// 3. 失败只记日志,不反向打断当前 SSE 输出。 +func (s *AgentService) persistNewAgentTimelineExtraEvent( + ctx context.Context, + userID int, + chatID string, + extra *newagentstream.OpenAIChunkExtra, +) { kind, ok := mapTimelineKindFromStreamExtra(extra) if !ok { return @@ -193,30 +368,33 @@ func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, us buildTimelinePayloadFromStreamExtra(extra), 0, ); err != nil { - log.Printf("写入 newAgent 卡片时间线失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err) + log.Printf("写入 newAgent 时间线事件失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err) } } -func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) { - if s.cacheDAO != nil { - seq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, chatID) - if err == nil { - return seq, nil - } - log.Printf("会话时间线 seq Redis 分配失败,回退 DB user=%d chat=%s err=%v", userID, chatID, err) +func buildConversationTimelineCacheItem( + eventID int64, + seq int64, + kind string, + role string, + content string, + payload map[string]any, + tokensConsumed int, + createdAt *time.Time, +) model.GetConversationTimelineItem { + item := model.GetConversationTimelineItem{ + ID: eventID, + Seq: seq, + Kind: kind, + Role: role, + Content: content, + Payload: cloneTimelinePayload(payload), + TokensConsumed: tokensConsumed, } - - maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID) - if err != nil { - return 0, err + if createdAt != nil { + item.CreatedAt = createdAt } - seq := maxSeq + 1 - if s.cacheDAO != nil { - if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, seq); err != nil { - log.Printf("会话时间线 seq 回填 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, seq, err) - } - } - return seq, nil + return item } func buildConversationTimelineItemsFromDB(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem { @@ -296,7 +474,8 @@ func canonicalizeTimelineKind(kind string, role string) string { model.AgentTimelineKindToolResult, model.AgentTimelineKindConfirmRequest, model.AgentTimelineKindBusinessCard, - model.AgentTimelineKindScheduleCompleted: + model.AgentTimelineKindScheduleCompleted, + model.AgentTimelineKindThinkingSummary: return normalizedKind case "text", "message", "query": if normalizedRole == "user" { @@ -337,6 +516,9 @@ func mapTimelineKindFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) (str if extra == nil { return "", false } + if isThinkingSummaryStreamExtra(extra) { + return model.AgentTimelineKindThinkingSummary, true + } switch extra.Kind { case newagentstream.StreamExtraKindToolCall: return model.AgentTimelineKindToolCall, true @@ -357,6 +539,9 @@ func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) if extra == nil { return nil } + if isThinkingSummaryStreamExtra(extra) { + return buildThinkingSummaryTimelinePayload(extra) + } payload := map[string]any{ "stage": strings.TrimSpace(extra.Stage), "block_id": strings.TrimSpace(extra.BlockID), @@ -400,6 +585,67 @@ func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) return payload } +func isThinkingSummaryStreamExtra(extra *newagentstream.OpenAIChunkExtra) bool { + if extra == nil { + return false + } + return strings.EqualFold(strings.TrimSpace(string(extra.Kind)), model.AgentTimelineKindThinkingSummary) +} + +func buildThinkingSummaryTimelinePayload(extra *newagentstream.OpenAIChunkExtra) map[string]any { + payload := map[string]any{ + "stage": strings.TrimSpace(extra.Stage), + "block_id": strings.TrimSpace(extra.BlockID), + "display_mode": string(extra.DisplayMode), + } + + if extra.ThinkingSummary != nil { + summary := extra.ThinkingSummary + payload["summary_seq"] = summary.SummarySeq + payload["final"] = summary.Final + payload["duration_seconds"] = summary.DurationSeconds + if detailSummary := strings.TrimSpace(summary.DetailSummary); detailSummary != "" { + payload["detail_summary"] = detailSummary + } + return payload + } + + if detailSummary := readTimelineExtraMetaString(extra.Meta, "detail_summary"); detailSummary != "" { + payload["detail_summary"] = detailSummary + } + return payload +} + +func readTimelineExtraMetaString(meta map[string]any, key string) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta[key] + if !ok { + return "" + } + text, ok := raw.(string) + if !ok { + return "" + } + return strings.TrimSpace(text) +} + +func readTimelinePayloadString(payload map[string]any, key string) string { + if len(payload) == 0 { + return "" + } + raw, ok := payload[key] + if !ok { + return "" + } + text, ok := raw.(string) + if !ok { + return "" + } + return strings.TrimSpace(text) +} + func cloneStreamBusinessCard(card *newagentstream.StreamBusinessCardExtra) map[string]any { if card == nil { return nil diff --git a/backend/service/agentsvc/reasoning_summary.go b/backend/service/agentsvc/reasoning_summary.go new file mode 100644 index 0000000..b2b1588 --- /dev/null +++ b/backend/service/agentsvc/reasoning_summary.go @@ -0,0 +1,112 @@ +package agentsvc + +import ( + "context" + "errors" + "log" + "strings" + + infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" + newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" +) + +const reasoningSummaryMaxTokens = 700 + +type reasoningSummaryLLMResponse struct { + ShortSummary string `json:"short_summary"` + DetailSummary string `json:"detail_summary"` +} + +// makeReasoningSummaryFunc 把便宜模型封装成 stream 层可注入的摘要函数。 +// +// 职责边界: +// 1. service 层负责选择模型与 prompt,stream 层只负责调度和闸门; +// 2. 这里不持久化摘要,持久化统一走 ChunkEmitter 的 extra hook; +// 3. 摘要失败时返回 error,由 ReasoningDigestor 吞掉并等待下一次水位线/Flush 兜底。 +func (s *AgentService) makeReasoningSummaryFunc(client *infrallm.Client) newagentstream.ReasoningSummaryFunc { + if client == nil { + return nil + } + + return func(ctx context.Context, input newagentstream.ReasoningSummaryInput) (newagentstream.StreamThinkingSummaryExtra, error) { + previousSummary := "" + if input.PreviousSummary != nil { + previousSummary = input.PreviousSummary.DetailSummary + if strings.TrimSpace(previousSummary) == "" { + previousSummary = input.PreviousSummary.ShortSummary + } + } + + messages := newagentprompt.BuildReasoningSummaryMessages(newagentprompt.ReasoningSummaryPromptInput{ + FullReasoning: input.FullReasoning, + DeltaReasoning: input.DeltaReasoning, + PreviousSummary: previousSummary, + CandidateSeq: input.CandidateSeq, + Final: input.Final, + DurationSeconds: input.DurationSeconds, + }) + + resp, rawResult, err := infrallm.GenerateJSON[reasoningSummaryLLMResponse]( + ctx, + client, + messages, + infrallm.GenerateOptions{ + Temperature: 0.1, + MaxTokens: reasoningSummaryMaxTokens, + Thinking: infrallm.ThinkingModeDisabled, + Metadata: map[string]any{ + "stage": "reasoning_summary", + "candidate_seq": input.CandidateSeq, + "final": input.Final, + }, + }, + ) + if err != nil { + log.Printf("[WARN] reasoning 摘要模型调用失败 seq=%d final=%v err=%v raw=%s", + input.CandidateSeq, + input.Final, + err, + truncateReasoningSummaryRaw(rawResult), + ) + return newagentstream.StreamThinkingSummaryExtra{}, err + } + + summary := newagentstream.StreamThinkingSummaryExtra{ + ShortSummary: strings.TrimSpace(resp.ShortSummary), + DetailSummary: limitReasoningDetailSummary( + resp.DetailSummary, + newagentprompt.ReasoningSummaryDetailRuneLimit(input.FullReasoning, input.DeltaReasoning), + ), + } + if summary.ShortSummary == "" && summary.DetailSummary == "" { + return newagentstream.StreamThinkingSummaryExtra{}, errors.New("reasoning 摘要模型返回空摘要") + } + return summary, nil + } +} + +func limitReasoningDetailSummary(text string, maxRunes int) string { + text = strings.TrimSpace(text) + if text == "" || maxRunes <= 0 { + return text + } + + runes := []rune(text) + if len(runes) <= maxRunes { + return text + } + return string(runes[:maxRunes]) +} + +func truncateReasoningSummaryRaw(raw *infrallm.TextResult) string { + if raw == nil { + return "" + } + text := strings.TrimSpace(raw.Text) + runes := []rune(text) + if len(runes) <= 200 { + return text + } + return string(runes[:200]) + "..." +} diff --git a/backend/service/events/agent_timeline_persist.go b/backend/service/events/agent_timeline_persist.go new file mode 100644 index 0000000..c20bb4c --- /dev/null +++ b/backend/service/events/agent_timeline_persist.go @@ -0,0 +1,326 @@ +package events + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "strings" + + "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" +) + +const EventTypeAgentTimelinePersistRequested = "agent.timeline.persist.requested" + +// RegisterAgentTimelinePersistHandler 注册“会话时间线持久化”消费者处理器。 +// +// 职责边界: +// 1. 只负责 timeline 事件,不处理 chat_history 等其他业务消息; +// 2. 只负责注册 handler,不负责总线启停; +// 3. 通过 outbox 通用消费事务,把“时间线写库 + consumed 推进”放进同一事务; +// 4. 若遇到 seq 唯一键冲突,会先判定是否属于重放幂等,再决定是否补新 seq 并回填 Redis。 +func RegisterAgentTimelinePersistHandler( + bus *outboxinfra.EventBus, + outboxRepo *outboxinfra.Repository, + agentRepo *dao.AgentDAO, + cacheDAO *dao.CacheDAO, +) error { + // 1. 依赖校验:缺少任一关键依赖都无法安全消费消息。 + if bus == nil { + return errors.New("event bus is nil") + } + if outboxRepo == nil { + return errors.New("outbox repository is nil") + } + if agentRepo == nil { + return errors.New("agent repo is nil") + } + + handler := func(ctx context.Context, envelope kafkabus.Envelope) error { + var payload model.ChatTimelinePersistPayload + if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { + // 1. payload 无法反序列化属于不可恢复错误,直接标 dead,避免无意义重试。 + _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error()) + return nil + } + + payload = payload.Normalize() + if !payload.HasValidIdentity() { + // 2. 这里只校验“能否唯一定位一条 timeline 记录”的最小字段集合。 + // 3. content / payload_json 是否为空由事件类型自行决定,不在这里一刀切限制。 + _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法") + return nil + } + + refreshCache := false + finalSeq := payload.Seq + + // 4. 统一走 outbox 消费事务入口,保证“业务写入成功 -> consumed”原子一致。 + err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + finalPayload, repaired, persistErr := persistConversationTimelineEventInTx(ctx, tx, agentRepo.WithTx(tx), payload) + if persistErr != nil { + return persistErr + } + refreshCache = repaired + finalSeq = finalPayload.Seq + return nil + }) + if err != nil { + return err + } + + // 5. 只有发生“seq 冲突且补了新 seq”时,才需要重建 Redis timeline。 + // 5.1 原因:主链路已经先写过 Redis,常规成功无需重复回写。 + // 5.2 若发生补 seq,不重建会留下旧 seq 的缓存残影,刷新后顺序会错乱。 + // 5.3 缓存重建失败只记日志,不能反向把已 consumed 的 outbox 回滚。 + if refreshCache { + if refreshErr := rebuildConversationTimelineCache(ctx, agentRepo, cacheDAO, payload.UserID, payload.ConversationID, finalSeq); refreshErr != nil { + log.Printf("重建时间线缓存失败 user=%d chat=%s seq=%d err=%v", payload.UserID, payload.ConversationID, finalSeq, refreshErr) + } + } + return nil + } + + return bus.RegisterEventHandler(EventTypeAgentTimelinePersistRequested, handler) +} + +// PublishAgentTimelinePersistRequested 发布“会话时间线持久化请求”事件。 +// +// 设计目的: +// 1. 让业务层只传 DTO,不重复拼事件元数据; +// 2. 统一以 conversation_id 作为 MessageKey / AggregateID,尽量降低同会话乱序概率; +// 3. 发布失败显式返回 error,由调用方决定是否中断主链路。 +func PublishAgentTimelinePersistRequested( + ctx context.Context, + publisher outboxinfra.EventPublisher, + payload model.ChatTimelinePersistPayload, +) error { + if publisher == nil { + return errors.New("event publisher is nil") + } + + payload = payload.Normalize() + if !payload.HasValidIdentity() { + return errors.New("invalid timeline persist payload") + } + + return publisher.Publish(ctx, outboxinfra.PublishRequest{ + EventType: EventTypeAgentTimelinePersistRequested, + EventVersion: outboxinfra.DefaultEventVersion, + MessageKey: payload.ConversationID, + AggregateID: payload.ConversationID, + Payload: payload, + }) +} + +// persistConversationTimelineEventInTx 负责在单个事务里完成 timeline 事件写库。 +// +// 步骤化说明: +// 1. 先按 payload 原始 seq 尝试写入; +// 2. 若命中 seq 唯一键冲突,先查询同 seq 记录,判断是否属于“重放同一事件”; +// 3. 若不是重放,而是 Redis seq 漂移导致的新旧事件撞 seq,则用 max(seq)+1 重新分配; +// 4. 最多修复 3 次,避免异常数据把消费者拖进无限循环。 +func persistConversationTimelineEventInTx( + ctx context.Context, + tx *gorm.DB, + agentRepo *dao.AgentDAO, + payload model.ChatTimelinePersistPayload, +) (model.ChatTimelinePersistPayload, bool, error) { + if tx == nil { + return payload, false, errors.New("transaction is nil") + } + if agentRepo == nil { + return payload, false, errors.New("agent repo is nil") + } + + working := payload.Normalize() + repaired := false + + for attempt := 0; attempt < 3; attempt++ { + if _, _, err := agentRepo.SaveConversationTimelineEvent(ctx, working); err == nil { + return working, repaired, nil + } else if !model.IsTimelineSeqConflictError(err) { + return working, repaired, err + } + + // 1. 先判断是否属于“同一条事件被重复消费”。 + // 2. 若库里已有记录且字段完全一致,说明前一次其实已经成功落库,本次可视为幂等成功。 + // 3. 若字段不一致,再进入“补新 seq”分支,避免把真正的新事件吞掉。 + existing, findErr := findConversationTimelineEventBySeq(ctx, tx, working.UserID, working.ConversationID, working.Seq) + if findErr == nil && working.MatchesStoredEvent(existing) { + return working, repaired, nil + } + if findErr != nil && !errors.Is(findErr, gorm.ErrRecordNotFound) { + return working, repaired, findErr + } + + maxSeq, maxErr := loadConversationTimelineMaxSeq(ctx, tx, working.UserID, working.ConversationID) + if maxErr != nil { + return working, repaired, maxErr + } + working.Seq = maxSeq + 1 + repaired = true + } + + return working, repaired, fmt.Errorf("timeline seq repair exceeded limit user=%d chat=%s", working.UserID, working.ConversationID) +} + +func findConversationTimelineEventBySeq( + ctx context.Context, + tx *gorm.DB, + userID int, + conversationID string, + seq int64, +) (model.AgentTimelineEvent, error) { + var event model.AgentTimelineEvent + err := tx.WithContext(ctx). + Where("user_id = ? AND chat_id = ? AND seq = ?", userID, strings.TrimSpace(conversationID), seq). + Take(&event).Error + return event, err +} + +func loadConversationTimelineMaxSeq( + ctx context.Context, + tx *gorm.DB, + userID int, + conversationID string, +) (int64, error) { + var maxSeq int64 + err := tx.WithContext(ctx). + Model(&model.AgentTimelineEvent{}). + Where("user_id = ? AND chat_id = ?", userID, strings.TrimSpace(conversationID)). + Select("COALESCE(MAX(seq), 0)"). + Scan(&maxSeq).Error + if err != nil { + return 0, err + } + return maxSeq, nil +} + +// rebuildConversationTimelineCache 在“补新 seq”后重建 Redis timeline 缓存。 +// +// 说明: +// 1. 这里只在缓存存在时执行;未接 Redis 的环境直接跳过即可; +// 2. 需要整表重建而不是只 append 一条,因为旧缓存里已经存在错误 seq 的事件; +// 3. 这里不抽到 agentsvc 复用,是因为 events 不能反向依赖 service,否则会形成循环依赖。 +func rebuildConversationTimelineCache( + ctx context.Context, + agentRepo *dao.AgentDAO, + cacheDAO *dao.CacheDAO, + userID int, + conversationID string, + finalSeq int64, +) error { + if cacheDAO == nil || agentRepo == nil { + return nil + } + + events, err := agentRepo.ListConversationTimelineEvents(ctx, userID, conversationID) + if err != nil { + return err + } + items := buildConversationTimelineCacheItems(events) + if err = cacheDAO.SetConversationTimelineToCache(ctx, userID, conversationID, items); err != nil { + return err + } + + if len(items) > 0 { + finalSeq = items[len(items)-1].Seq + } + return cacheDAO.SetConversationTimelineSeq(ctx, userID, conversationID, finalSeq) +} + +func buildConversationTimelineCacheItems(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem { + if len(events) == 0 { + return make([]model.GetConversationTimelineItem, 0) + } + + items := make([]model.GetConversationTimelineItem, 0, len(events)) + for _, event := range events { + item := model.GetConversationTimelineItem{ + ID: event.ID, + Seq: event.Seq, + Kind: strings.TrimSpace(event.Kind), + TokensConsumed: event.TokensConsumed, + CreatedAt: event.CreatedAt, + } + if event.Role != nil { + item.Role = strings.TrimSpace(*event.Role) + } + if event.Content != nil { + item.Content = strings.TrimSpace(*event.Content) + } + if event.Payload != nil { + var payload map[string]any + if err := json.Unmarshal([]byte(strings.TrimSpace(*event.Payload)), &payload); err == nil && len(payload) > 0 { + item.Payload = payload + } + } + items = append(items, item) + } + return normalizeConversationTimelineCacheItems(items) +} + +func normalizeConversationTimelineCacheItems(items []model.GetConversationTimelineItem) []model.GetConversationTimelineItem { + if len(items) == 0 { + return make([]model.GetConversationTimelineItem, 0) + } + + normalized := make([]model.GetConversationTimelineItem, 0, len(items)) + for _, item := range items { + role := strings.ToLower(strings.TrimSpace(item.Role)) + kind := canonicalizeConversationTimelineKind(item.Kind, role) + + if kind == "" { + switch role { + case "user": + kind = model.AgentTimelineKindUserText + case "assistant": + kind = model.AgentTimelineKindAssistantText + } + } + if role == "" { + switch kind { + case model.AgentTimelineKindUserText: + role = "user" + case model.AgentTimelineKindAssistantText: + role = "assistant" + } + } + + item.Kind = kind + item.Role = role + normalized = append(normalized, item) + } + return normalized +} + +func canonicalizeConversationTimelineKind(kind string, role string) string { + normalizedKind := strings.ToLower(strings.TrimSpace(kind)) + normalizedRole := strings.ToLower(strings.TrimSpace(role)) + + switch normalizedKind { + case model.AgentTimelineKindUserText, + model.AgentTimelineKindAssistantText, + model.AgentTimelineKindToolCall, + model.AgentTimelineKindToolResult, + model.AgentTimelineKindConfirmRequest, + model.AgentTimelineKindBusinessCard, + model.AgentTimelineKindScheduleCompleted, + model.AgentTimelineKindThinkingSummary: + return normalizedKind + case "text", "message", "query": + if normalizedRole == "user" { + return model.AgentTimelineKindUserText + } + if normalizedRole == "assistant" { + return model.AgentTimelineKindAssistantText + } + } + return normalizedKind +} diff --git a/docs/frontend/newagent_thinking_summary_对接说明.md b/docs/frontend/newagent_thinking_summary_对接说明.md new file mode 100644 index 0000000..ec9dfe8 --- /dev/null +++ b/docs/frontend/newagent_thinking_summary_对接说明.md @@ -0,0 +1,389 @@ +# NewAgent 思考摘要前端对接说明 + +## 背景 + +后端已经不再把模型原始 `reasoning_content` 直接透传给前端。新的展示入口是 SSE 顶层 `extra.kind = "thinking_summary"` 事件。 + +目标体验: + +- 用户等待模型深度思考时,前端每隔几秒收到一条短摘要,作为当前思考状态的轻量提示。 +- 展开后展示稍长的 `detail_summary`,多条按时间追加。 +- 模型开始输出正文后,当前思考摘要停止更新。 +- 刷新会话后,只恢复长摘要,不恢复短摘要。 + +## 实时 SSE 协议 + +聊天接口仍然是: + +```http +POST /api/v1/agent/chat +Content-Type: application/json +Accept: text/event-stream +``` + +SSE 每个业务包仍是标准格式: + +```text +data: {json} + +data: [DONE] +``` + +后端保活心跳是 SSE 注释行: + +```text +: ping +``` + +前端按现有逻辑忽略不能 JSON.parse 的块即可。 + +## thinking_summary 事件 + +实时思考摘要事件没有 `delta.content`,也没有 `delta.reasoning_content`。前端应从顶层 `extra.thinking_summary` 读取。 + +示例: + +```json +{ + "id": "trace-id", + "object": "chat.completion.chunk", + "created": 1777399000, + "model": "pro", + "extra": { + "kind": "thinking_summary", + "block_id": "plan.speak", + "stage": "plan", + "display_mode": "append", + "thinking_summary": { + "summary_seq": 1, + "short_summary": "正在梳理计划", + "detail_summary": "正在把用户目标拆成可执行步骤,并检查是否需要补充约束。", + "duration_seconds": 3.214 + } + } +} +``` + +字段说明: + +| 字段 | 说明 | +| --- | --- | +| `extra.kind` | 固定为 `thinking_summary`。 | +| `extra.block_id` | 当前摘要所属展示块,例如 `plan.speak`、`execute.speak`、`fallback.speak`。建议作为分组 key 的一部分。 | +| `extra.stage` | 当前节点阶段,例如 `plan`、`execute`、`fallback`。 | +| `extra.display_mode` | 当前固定为 `append`,表示长摘要按条追加。 | +| `thinking_summary.summary_seq` | 同一个摘要器内递增,用于忽略重复或乱序摘要。不要当作全局 timeline seq。 | +| `thinking_summary.short_summary` | 实时短摘要,只用于当前流式展示,不持久化。 | +| `thinking_summary.detail_summary` | 展开态长摘要,按 append 语义追加;刷新后也只恢复这个字段。 | +| `thinking_summary.duration_seconds` | 从首次收到 reasoning 到生成该摘要的耗时秒数,可能是小数。 | +| `thinking_summary.final` | 可选。若出现 `true`,表示该摘要器在没有正文打断的情况下自然收口。不要依赖它一定出现。 | + +已删除字段: + +- `state` 已从协议、prompt、timeline 持久化里删除,前端不要再依赖或展示。 + +## 前端处理建议 + +建议把思考摘要作为 assistant 消息内的一个子结构,而不是普通正文。 + +推荐 key: + +```ts +const key = extra.block_id || extra.stage || 'thinking' +``` + +推荐类型: + +```ts +export interface ThinkingSummaryPayload { + summary_seq?: number + short_summary?: string + detail_summary?: string + final?: boolean + duration_seconds?: number +} + +export interface ThinkingSummaryBlock { + key: string + stage?: string + blockId?: string + latestSeq: number + latestShort: string + details: Array<{ + seq: number + text: string + durationSeconds?: number + final?: boolean + }> + active: boolean + collapsed: boolean +} +``` + +实时处理伪代码: + +```ts +function handleThinkingSummary(extra: StreamExtra, message: AssistantMessage) { + if (extra.kind !== 'thinking_summary') return false + + const summary = extra.thinking_summary + if (!summary) return true + + const key = extra.block_id || extra.stage || 'thinking' + const block = ensureThinkingSummaryBlock(message, key, { + stage: extra.stage, + blockId: extra.block_id, + }) + + const seq = summary.summary_seq ?? block.latestSeq + 1 + if (seq <= block.latestSeq) return true + + block.latestSeq = seq + block.active = summary.final !== true + + if (summary.short_summary?.trim()) { + block.latestShort = summary.short_summary.trim() + } + + if (summary.detail_summary?.trim()) { + block.details.push({ + seq, + text: summary.detail_summary.trim(), + durationSeconds: summary.duration_seconds, + final: summary.final, + }) + } + + return true +} +``` + +正文开始时的处理: + +```ts +function handleAssistantContentStart(message: AssistantMessage) { + // 后端正文一出现就会停止当前 block 的摘要; + // 前端这里也可以把活跃思考块收口,避免动效继续闪。 + message.thinkingSummaryBlocks?.forEach(block => { + block.active = false + }) +} +``` + +注意: + +- 收到 `thinking_summary` 时,不要追加到 `assistantMessage.content`。 +- 收到 `thinking_summary` 时,不要写入旧的 `assistantMessage.reasoning`。 +- 若仍收到旧链路 `delta.reasoning_content`,可以保留兼容,但新样式应优先使用 `thinking_summary`。 +- `summary_seq` 只在同一个 `block_id/stage` 下去重;不同 block 不要互相比较。 + +## 展示语义 + +短摘要: + +- 展示最新一条 `short_summary`。 +- 适合放在折叠态标题、胶囊、加载条旁边。 +- 不要持久化到本地历史,也不要在刷新恢复后强行补出来。 + +长摘要: + +- 每次收到非空 `detail_summary` 就追加一条。 +- 展开态展示 `details` 列表。 +- 如果你想做得更像 Gemini/豆包,可以折叠态只露最新短摘要,展开态按时间展示长摘要列表。 + +收口条件: + +- 收到第一段 `delta.content`:关闭当前 assistant 消息里的活跃思考态。 +- 收到 `finish_reason` 或 `[DONE]`:关闭所有活跃思考态。 +- 收到 `thinking_summary.final === true`:可以关闭对应 block,但不要依赖它总会出现。 + +## 历史 timeline 恢复 + +刷新会话时读取: + +```http +GET /api/v1/agent/conversation-timeline?conversation_id={conversation_id} +``` + +统一响应仍是: + +```json +{ + "status": "0", + "info": "success", + "data": [] +} +``` + +`thinking_summary` timeline item 示例: + +```json +{ + "id": 123, + "seq": 8, + "kind": "thinking_summary", + "content": "正在把用户目标拆成可执行步骤,并检查是否需要补充约束。", + "payload": { + "stage": "plan", + "block_id": "plan.speak", + "display_mode": "append", + "summary_seq": 1, + "detail_summary": "正在把用户目标拆成可执行步骤,并检查是否需要补充约束。", + "duration_seconds": 3.214 + }, + "created_at": "2026-04-28T21:00:00+08:00" +} +``` + +历史恢复规则: + +- 只恢复 `detail_summary`,没有 `short_summary`。 +- 按 timeline item 的 `seq` 排序渲染即可,后端已升序返回。 +- 可用 `payload.block_id || payload.stage || "thinking"` 归组到对应 assistant 消息附近。 +- 如果当前前端还没做跨事件归组,可以先把它渲染为 assistant 消息里的“思考摘要条目”,位置按 timeline 顺序插入。 + +建议更新现有前端类型: + +```ts +export interface TimelineThinkingSummaryPayload { + stage?: string + block_id?: string + display_mode?: 'append' + summary_seq?: number + detail_summary?: string + duration_seconds?: number + final?: boolean +} + +export interface TimelineEvent { + id: number + seq: number + kind: + | 'user_text' + | 'assistant_text' + | 'tool_call' + | 'tool_result' + | 'confirm_request' + | 'schedule_completed' + | 'business_card' + | 'thinking_summary' + role?: 'user' | 'assistant' + content?: string + payload?: { + stage?: string + block_id?: string + display_mode?: 'append' | 'replace' | 'card' + thinking_summary?: never + detail_summary?: string + summary_seq?: number + duration_seconds?: number + final?: boolean + tool?: TimelineToolPayload + confirm?: TimelineConfirmPayload + business_card?: TimelineBusinessCardPayload + } + tokens_consumed?: number + created_at?: string +} +``` + +## 与正文/工具卡片的关系 + +同一轮流里可能出现: + +1. `thinking_summary` +2. `tool_call` / `tool_result` +3. `assistant_text` 或 `delta.content` +4. `finish` +5. `[DONE]` + +前端建议: + +- `thinking_summary` 是“等待过程”组件。 +- `tool_call` / `tool_result` 继续走现有工具卡片。 +- `delta.content` 继续追加到 assistant 正文。 +- `finish` / `[DONE]` 只负责收尾,不需要生成可见消息。 + +## 测试用例 + +### 1. 只有摘要,还没正文 + +输入事件: + +```json +{ + "extra": { + "kind": "thinking_summary", + "block_id": "plan.speak", + "stage": "plan", + "display_mode": "append", + "thinking_summary": { + "summary_seq": 1, + "short_summary": "正在理解需求", + "detail_summary": "正在识别用户的目标、约束和需要补充的信息。", + "duration_seconds": 2.1 + } + } +} +``` + +预期: + +- 折叠态显示“正在理解需求”。 +- 展开态新增一条 detail。 +- 正文区域不新增文字。 + +### 2. 多条摘要追加 + +输入 `summary_seq=1,2,3`。 + +预期: + +- `latestShort` 使用第 3 条短摘要。 +- `details` 有 3 条,按收到顺序或 seq 升序展示。 + +### 3. 乱序或重复摘要 + +已处理到 `summary_seq=3` 后,又收到 `summary_seq=2`。 + +预期: + +- 忽略旧事件,不回退短摘要,不追加 detail。 + +### 4. 正文开始 + +收到: + +```json +{ + "choices": [ + { + "delta": { "content": "我整理好了,下面是建议:" } + } + ] +} +``` + +预期: + +- 当前活跃思考块停止 loading 动效。 +- 正文正常追加。 +- 后续若仍意外收到同 block 摘要,可按 seq 处理,但 UI 上建议不再重新激活。 + +### 5. 历史恢复 + +timeline 返回 `kind=thinking_summary`。 + +预期: + +- 只展示 `payload.detail_summary || content`。 +- 不展示短摘要占位。 +- 不需要显示 `state`,协议里已经没有这个字段。 + +## 最小改动清单 + +1. `StreamEventPayload.extra` 增加 `thinking_summary` 字段。 +2. `TimelineEvent.kind` 增加 `thinking_summary`。 +3. SSE 解析里在 `handleStreamExtraEvent` 增加 `extra.kind === "thinking_summary"` 分支。 +4. 收到正文 `delta.content` 时,把当前思考摘要块置为非活跃。 +5. 历史 timeline 恢复时支持 `kind === "thinking_summary"`,只恢复长摘要。 diff --git a/frontend/src/api/schedule_agent.ts b/frontend/src/api/schedule_agent.ts index 15233c1..ca11da6 100644 --- a/frontend/src/api/schedule_agent.ts +++ b/frontend/src/api/schedule_agent.ts @@ -71,6 +71,16 @@ export interface TaskRecordCardData { export type BusinessCardType = 'task_query' | 'task_record' export type TaskRecordSource = 'quick_note' | 'create_task' +export interface TimelineThinkingSummaryPayload { + stage?: string + block_id?: string + display_mode?: 'append' + summary_seq?: number + detail_summary?: string + duration_seconds?: number + final?: boolean +} + export interface TimelineBusinessCardPayload { card_type: BusinessCardType title?: string @@ -92,16 +102,21 @@ export interface TimelineEvent { | 'interrupt' | 'status' | 'business_card' + | 'thinking_summary' role?: 'user' | 'assistant' content?: string payload?: { reasoning_content?: string stage?: string block_id?: string - display_mode?: 'card' + display_mode?: 'card' | 'append' tool?: TimelineToolPayload confirm?: TimelineConfirmPayload business_card?: TimelineBusinessCardPayload + summary_seq?: number + detail_summary?: string + duration_seconds?: number + final?: boolean } tokens_consumed?: number created_at: string diff --git a/frontend/src/components/dashboard/AssistantPanel.vue b/frontend/src/components/dashboard/AssistantPanel.vue index 236e60a..9d21c14 100644 --- a/frontend/src/components/dashboard/AssistantPanel.vue +++ b/frontend/src/components/dashboard/AssistantPanel.vue @@ -256,6 +256,7 @@ const unavailableHistoryMap = reactive>({}) const thinkingMessageMap = reactive>({}) const reasoningCollapsedMap = reactive>({}) const reasoningStartedAtMap = reactive>({}) +const reasoningCurrentShortSummaryMap = reactive>({}) const reasoningDurationMap = reactive>({}) const confirmOnlyStreamMap = reactive>({}) const confirmVisiblePrefixMap = reactive>({}) @@ -931,8 +932,8 @@ function appendAssistantReasoningChunk(messageId: string, chunk: string) { // 记录块级别的起始时间和初始折叠状态 reasoningStartedAtMap[blockId] = Date.now() - reasoningCollapsedMap[blockId] = false - + reasoningCollapsedMap[blockId] = true + assistantTimelineLastKindMap[messageId] = 'reasoning' } @@ -1130,6 +1131,7 @@ function cleanupHiddenAssistantMessageState(messageId: string) { delete thinkingMessageMap[messageId] delete reasoningCollapsedMap[messageId] delete reasoningStartedAtMap[messageId] + delete reasoningCurrentShortSummaryMap[messageId] delete reasoningDurationMap[messageId] delete confirmOnlyStreamMap[messageId] delete confirmVisiblePrefixMap[messageId] @@ -1318,8 +1320,24 @@ function syncConversationListItemFromMeta( } } -function renderMessageMarkdown(content: string) { - return renderMarkdown(content) +function renderMessageMarkdown(content: string, isStreaming = false) { + let html = renderMarkdown(content) + if (isStreaming) { + const dotHtml = '' + // 1. 找到最后一个能容纳行内文本的闭合标签(如

、、 等), + // 并在该标签之前插入圆点,这样圆点就始终位于文字流的末端。 + // 2. 需要从后往前搜索,避免匹配到中间段落的闭合标签。 + // 3. 如果找不到(纯文本无标签),则直接追加到末尾。 + // 4. code 属于行内文本容器,pre 属于外层包裹容器,保证代码块场景下圆点深入到代码内部。 + const inlineContainerPattern = /<\/(p|li|td|th|h[1-6]|code)>\s*(<\/(ol|ul|table|div|blockquote|pre)>\s*)*$/i + const match = html.match(inlineContainerPattern) + if (match && match.index !== undefined) { + html = html.substring(0, match.index) + dotHtml + html.substring(match.index) + } else { + html += dotHtml + } + } + return html } function isStreamingMessage(message: AssistantMessage) { @@ -1431,6 +1449,11 @@ function markReasoningFinished(blockId: string, messageId: string) { reasoningDurationMap[blockId] = Math.max(1, Math.round((Date.now() - startedAt) / 1000)) } thinkingMessageMap[messageId] = false + + // 若被展开,则思考完毕后自动闭合 + if (reasoningCollapsedMap[blockId] === false) { + reasoningCollapsedMap[blockId] = true + } } function getReasoningDurationSeconds(blockId: string) { @@ -1448,13 +1471,15 @@ function getReasoningDurationSeconds(blockId: string) { } function getReasoningStatusLabel(block: DisplayAssistantBlock) { - const durationSeconds = getReasoningDurationSeconds(block.id) - if (durationSeconds > 0) { - return `已思考(用时 ${durationSeconds} 秒)` + const isThinking = block.sourceId === activeStreamingMessageId.value && thinkingMessageMap[block.sourceId] + + if (isThinking) { + // 状态栏显示当前阶段的短摘要 + return reasoningCurrentShortSummaryMap[block.id] || '正在思考...' } - const isThinking = block.sourceId === activeStreamingMessageId.value && thinkingMessageMap[block.sourceId] - return isThinking ? '思考中' : '已思考' + // 思考结束后,状态栏显示固定文案 + return '已完成深度思考' } /** @@ -1635,16 +1660,25 @@ function getDisplayAssistantBlocks(dm: DisplayMessage): DisplayAssistantBlock[] }) } - if (shouldShowDisplayAnsweringIndicator(dm)) { - const maxSeq = blocks.length > 0 ? Math.max(...blocks.map((item) => item.seq)) : 0 - blocks.push({ - id: `${dm.id}:content-indicator`, - type: 'content_indicator', - seq: maxSeq + 1, - }) + if (shouldShowDisplayAnsweringIndicator(dm) && blocks.length === 0) { + const maxSeq = blocks.length > 0 ? Math.max(...blocks.map((item) => item.seq)) : 0 + blocks.push({ + id: `${dm.id}:content-indicator`, + type: 'content_indicator', + seq: maxSeq + 1, + } as any) + } + + const sortedBlocks = blocks.sort((left, right) => left.seq - right.seq) + + // 核心修复:确保全消息流中只有一个点。 + // 只有当整个 DisplayMessage 处于流式状态,且当前块是最后一块时,才标记为 isStreaming。 + if (isDisplayStreaming(dm) && sortedBlocks.length > 0) { + const lastBlock = sortedBlocks[sortedBlocks.length - 1] as any + lastBlock.isStreaming = true } - return blocks.sort((left, right) => left.seq - right.seq) + return sortedBlocks } function getToolTraceStateLabel(state: ToolTraceState): string { @@ -1661,9 +1695,8 @@ function getToolTraceStateLabel(state: ToolTraceState): string { } function shouldShowDisplayAnsweringIndicator(dm: DisplayMessage): boolean { - return isDisplayStreaming(dm) && - dm.sources.every(m => thinkingMessageMap[m.id] !== true) && - !dm.content.trim() + // 基础判断:处于流式,且还没有任何实质性内容(包括推理和正文) + return isDisplayStreaming(dm) && !dm.content.trim() } function getDisplayReasoningStatusLabel(dm: DisplayMessage): string { @@ -2534,7 +2567,7 @@ function prepareAssistantMessageForStreaming(message: AssistantMessage, createdA message.reasoning = '' message.createdAt = createdAt thinkingMessageMap[message.id] = isManualThinkingEnabled(selectedThinkingMode.value) - reasoningCollapsedMap[message.id] = false + reasoningCollapsedMap[message.id] = true delete reasoningStartedAtMap[message.id] delete reasoningDurationMap[message.id] clearToolTraceState(message.id) @@ -2914,7 +2947,7 @@ async function sendMessageInternal(options: SendMessageOptions = {}) { }) thinkingMessageMap[assistantMessage.id] = isManualThinkingEnabled(selectedThinkingMode.value) - reasoningCollapsedMap[assistantMessage.id] = false + reasoningCollapsedMap[assistantMessage.id] = true activeStreamingMessageId.value = assistantMessage.id messageInput.value = '' @@ -2966,7 +2999,7 @@ async function sendMessageInternal(options: SendMessageOptions = {}) { } ElMessage.error(error instanceof Error ? error.message : '发送消息失败,请稍后重试') } - reasoningCollapsedMap[assistantMessage.id] = false + reasoningCollapsedMap[assistantMessage.id] = true } finally { streamAbortController.value = null activeStreamingMessageId.value = '' @@ -3181,7 +3214,7 @@ onBeforeUnmount(() => { -
+
- {{ formatMessageTime(dm.createdAt) }} +
@@ -3245,7 +3278,10 @@ onBeforeUnmount(() => {
-
+
{ {{ getReasoningStatusLabel(block) }} +
-
-
-
-
-
- 正在思考 + +
+
+
+
-
+
-
-
+
+
-
- 正在思考 -
+
@@ -3344,7 +3386,7 @@ onBeforeUnmount(() => {
- {{ formatMessageTime(dm.createdAt) }} +
@@ -3727,6 +3769,7 @@ onBeforeUnmount(() => { filter: blur(8px); } + .assistant-shell { height: 100%; min-height: 0; @@ -4891,6 +4934,7 @@ onBeforeUnmount(() => { align-items: center; gap: 8px; color: #5a6577; + position: relative; } /* --- Tooling & Selector Beautification --- */ @@ -4966,8 +5010,8 @@ onBeforeUnmount(() => { } .chat-message__reasoning-status { - font-size: 13px; - font-weight: 600; + font-size: 15px; + font-weight: 500; line-height: 1.35; } @@ -4975,7 +5019,7 @@ onBeforeUnmount(() => { width: 16px; height: 16px; display: inline-flex; - color: #4f76ea; + color: #94a3b8; } .chat-message__reasoning-icon-svg { @@ -4998,8 +5042,8 @@ onBeforeUnmount(() => { } .chat-message__reasoning-toggle:hover { - background: rgba(79, 118, 234, 0.1); - color: #4f76ea; + background: rgba(148, 163, 184, 0.1); + color: #64748b; } .chat-message__reasoning-chevron { @@ -5020,7 +5064,7 @@ onBeforeUnmount(() => { .chat-message__reasoning-body { margin: 10px 0 10px 7px; padding-left: 16px; - border-left: 2px dashed rgba(59, 130, 246, 0.3); /* 改为虚线,更具“思考中”的科技感 */ + border-left: 2px dashed rgba(148, 163, 184, 0.4); /* 灰色虚线(同 debug 页) */ font-style: italic; color: #64748b; } @@ -5171,6 +5215,19 @@ onBeforeUnmount(() => { font-size: 11px; } +/* 消息流式输出时的右侧呼吸圆点(直接嵌入 HTML) */ +:deep(.thinking-dot-inline) { + display: inline-block; + width: 8px; + height: 8px; + background-color: #94a3b8; + border-radius: 50%; + margin-left: 8px; + vertical-align: middle; + animation: thinking-pulse 1.5s infinite ease-in-out; + flex-shrink: 0; +} + .assistant-actions { flex-wrap: wrap; gap: 8px; @@ -5368,28 +5425,19 @@ onBeforeUnmount(() => { border-color: rgba(15, 23, 42, 0.08); } -.thinking-indicator { - display: inline-flex; - align-items: center; +.thinking-dot { + width: 8px; + height: 8px; + background-color: #94a3b8; + border-radius: 50%; + margin: 0; + animation: thinking-pulse 1.5s infinite ease-in-out; } -.thinking-indicator__text { - font-size: 15px; - font-weight: 600; - color: #64748b; - background: linear-gradient( - 90deg, - #64748b 0%, - #64748b 25%, - #e2e8f0 50%, - #64748b 75%, - #64748b 100% - ); - background-size: 200% 100%; - -webkit-background-clip: text; - -webkit-text-fill-color: transparent; - background-clip: text; - animation: thinking-shimmer 2s infinite linear; +@keyframes thinking-pulse { + 0% { transform: scale(0.8); opacity: 0.5; } + 50% { transform: scale(1.2); opacity: 1; } + 100% { transform: scale(0.8); opacity: 0.5; } } @keyframes thinking-shimmer { @@ -5543,6 +5591,51 @@ onBeforeUnmount(() => { max-height: 260px; } } + +/* 扫光动效:位于标题上的白色光线从左到右划过 */ +.chat-message__reasoning-title--shimmering { + overflow: hidden; +} + +.chat-message__reasoning-title--shimmering::after { + content: ""; + position: absolute; + top: 0; + left: -100%; + width: 100%; + height: 100%; + background: linear-gradient( + 90deg, + transparent, + rgba(255, 255, 255, 0.9), + transparent + ); + transform: skewX(-20deg); + animation: shimmer-sweep 1.2s infinite linear; + pointer-events: none; +} + +@keyframes shimmer-sweep { + from { left: -150%; } + to { left: 150%; } +} + +/* 推理框展开收起弹性动效 */ +.reasoning-bounce-enter-active { + transition: all 0.4s cubic-bezier(0.34, 1.56, 0.64, 1); + transform-origin: top center; +} + +.reasoning-bounce-leave-active { + transition: all 0.2s ease; + transform-origin: top center; +} + +.reasoning-bounce-enter-from, +.reasoning-bounce-leave-to { + opacity: 0; + transform: translateY(-15px); +} + diff --git a/frontend/src/views/debug/AssistantReasoningDebug.vue b/frontend/src/views/debug/AssistantReasoningDebug.vue new file mode 100644 index 0000000..5ba0e25 --- /dev/null +++ b/frontend/src/views/debug/AssistantReasoningDebug.vue @@ -0,0 +1,25 @@ + + + + +