From 21eed5af750c662260e0fb46612b24ffe22a2f83 Mon Sep 17 00:00:00 2001 From: LoveLosita <2810873701@qq.com> Date: Wed, 15 Apr 2026 11:04:27 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.9.18.dev.260415=20=E5=90=8E?= =?UTF-8?q?=E7=AB=AF=EF=BC=9A=201.=20ChatNode=20=E8=B7=AF=E7=94=B1?= =?UTF-8?q?=E4=BB=8E=20GenerateJSON=20=E9=87=8D=E6=9E=84=E4=B8=BA=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E6=8E=A7=E5=88=B6=E7=A0=81=E8=B7=AF=E7=94=B1=20-=20?= =?UTF-8?q?=E6=96=B0=E5=BB=BA=20backend/newAgent/router/chat=5Froute.go?= =?UTF-8?q?=EF=BC=9A=E6=B5=81=E5=BC=8F=E5=A2=9E=E9=87=8F=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E7=A0=81=E8=A7=A3=E6=9E=90=E5=99=A8=20StreamRouteParser?= =?UTF-8?q?=EF=BC=8C=E5=A4=8D=E7=94=A8=20agent=20=E7=9A=84=20=20=E6=AD=A3=E5=88=99=E6=A8=A1=E5=BC=8F=20-=20=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=20backend/newAgent/node/chat.go=EF=BC=9ARunChatNode?= =?UTF-8?q?=20=E4=BB=8E=20GenerateJSON=EF=BC=88=E9=98=BB=E5=A1=9E=E7=AD=89?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=20JSON=EF=BC=89=E6=94=B9=E4=B8=BA=20Stream?= =?UTF-8?q?=20+=20=E6=8E=A7=E5=88=B6=E7=A0=81=E8=A7=A3=E6=9E=90=20+=20?= =?UTF-8?q?=E5=88=86=E6=94=AF=E6=B5=81=E5=BC=8F=E5=A4=84=E7=90=86=20-=20st?= =?UTF-8?q?reamAndDispatch=20=E6=A0=B8=E5=BF=83=E5=BE=AA=E7=8E=AF=EF=BC=9A?= =?UTF-8?q?=E9=80=90=20chunk=20=E5=96=82=E8=A7=A3=E6=9E=90=E5=99=A8?= =?UTF-8?q?=EF=BC=8C=E6=8E=A7=E5=88=B6=E7=A0=81=E8=A7=A3=E6=9E=90=E5=90=8E?= =?UTF-8?q?=E6=8C=89=20route=20=E5=88=86=E5=8F=91=20-=20handleDirectReplyS?= =?UTF-8?q?tream=EF=BC=9Athinking=3Dfalse=20=E5=90=8C=E4=B8=80=E6=B5=81?= =?UTF-8?q?=E7=BB=AD=E4=BC=A0=EF=BC=8Cthinking=3Dtrue=20=E5=85=B3=E6=B5=81?= =?UTF-8?q?=E5=90=8E=E4=BA=8C=E6=AC=A1=20thinking=20=E8=B0=83=E7=94=A8=20-?= =?UTF-8?q?=20handleDeepAnswerStream=EF=BC=9A=E7=A7=BB=E9=99=A4"=E8=AE=A9?= =?UTF-8?q?=E6=88=91=E6=83=B3=E6=83=B3"=E8=BF=87=E6=B8=A1=E8=AF=AD?= =?UTF-8?q?=EF=BC=8C=E7=9B=B4=E6=8E=A5=E5=85=B3=E6=B5=81=E5=90=8E=E5=8F=91?= =?UTF-8?q?=E8=B5=B7=E7=AC=AC=E4=BA=8C=E6=AC=A1=E6=B5=81=E5=BC=8F=E8=B0=83?= =?UTF-8?q?=E7=94=A8=EF=BC=88thinking=20=E7=94=B1=20effectiveThinking=20?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=EF=BC=89=20-=20handleRouteExecuteStream=20/?= =?UTF-8?q?=20handleRoutePlanStream=EF=BC=9A=E5=85=B3=E6=B5=81=20=E2=86=92?= =?UTF-8?q?=20=E6=8E=A8=E9=80=81=20status=20=E2=86=92=20=E8=AE=BE=20Phase?= =?UTF-8?q?=20-=20=E6=9B=B4=E6=96=B0=20backend/newAgent/prompt/chat.go?= =?UTF-8?q?=EF=BC=9A=E8=B7=AF=E7=94=B1=20prompt=20=E4=BB=8E=20JSON=20?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E6=94=B9=E4=B8=BA=E6=8E=A7=E5=88=B6=E7=A0=81?= =?UTF-8?q?=E6=A0=87=E7=AD=BE=E6=A0=BC=E5=BC=8F=20-=20=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=20backend/newAgent/model/chat=5Fcontract.go=EF=BC=9AChatRoutin?= =?UTF-8?q?gDecision=20=E6=96=B0=E5=A2=9E=20Thinking=20/=20Raw=20=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=EF=BC=8C=E7=A7=BB=E9=99=A4=20Speak=20/=20Reason=202.?= =?UTF-8?q?=20Thinking=20=E5=8F=82=E6=95=B0=E4=BB=8E=20bool=20=E6=89=A9?= =?UTF-8?q?=E5=B1=95=E4=B8=BA=20string=20=E4=B8=89=E6=80=81=20-=20?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=20backend/model/agent.go=EF=BC=9AUserSendMes?= =?UTF-8?q?sageRequest.Thinking=20=E4=BB=8E=20bool=20=E6=94=B9=E4=B8=BA=20?= =?UTF-8?q?string=20-=20=E6=9B=B4=E6=96=B0=20backend/service/agentsvc/agen?= =?UTF-8?q?t.go=EF=BC=9AAgentChat=20/=20runNormalChatFlow=20=E9=80=82?= =?UTF-8?q?=E9=85=8D=20string=20=E7=B1=BB=E5=9E=8B=EF=BC=8C=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20thinkingModeToBool=20=E5=85=BC=E5=AE=B9=E6=97=A7?= =?UTF-8?q?=E9=93=BE=E8=B7=AF=20-=20=E6=9B=B4=E6=96=B0=20backend/service/a?= =?UTF-8?q?gentsvc/agent=5Fnewagent.go=EF=BC=9ArunNewAgentGraph=20?= =?UTF-8?q?=E6=8E=A5=E6=94=B6=20thinkingMode=20string=20=E5=B9=B6=E6=B3=A8?= =?UTF-8?q?=E5=85=A5=20CommonState=203.=20CommonState=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20ThinkingMode=20/=20ExecuteThinking=20=E5=AD=97=E6=AE=B5=20-?= =?UTF-8?q?=20=E6=9B=B4=E6=96=B0=20backend/newAgent/model/common=5Fstate.g?= =?UTF-8?q?o=EF=BC=9AThinkingMode=20=E6=8E=A7=E5=88=B6=E4=B8=8B=E6=B8=B8?= =?UTF-8?q?=20thinking=20=E8=A1=8C=E4=B8=BA=EF=BC=88"true"=20=E5=BC=BA?= =?UTF-8?q?=E5=BC=80=20/=20"false"=20=E5=BC=BA=E5=85=B3=20/=20"auto"?= =?UTF-8?q?=E4=BA=A4=E8=B7=AF=E7=94=B1=E5=86=B3=E7=AD=96=EF=BC=89=20-=20Ch?= =?UTF-8?q?atNode=20=E9=80=9A=E8=BF=87=20resolveEffectiveThinking=20?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=89=8D=E7=AB=AF=E5=81=8F=E5=A5=BD=E4=B8=8E?= =?UTF-8?q?=E8=B7=AF=E7=94=B1=E5=86=B3=E7=AD=96=EF=BC=8C=E4=BC=A0=E9=80=92?= =?UTF-8?q?=E7=BB=99=E6=89=80=E6=9C=89=E4=B8=8B=E6=B8=B8=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=87=BD=E6=95=B0=204.=20=E6=96=B0=E5=A2=9E=E7=9C=9F=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E6=8E=A8=E9=80=81=E6=96=B9=E6=B3=95=20-=20=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=20backend/newAgent/stream/emitter.go=EF=BC=9A?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=20EmitStreamAssistantText=20/=20EmitStreamRe?= =?UTF-8?q?asoningText=EF=BC=8C=E6=A1=A5=E6=8E=A5=20StreamReader=20?= =?UTF-8?q?=E2=86=92=20SSE=20chunk=20=E5=89=8D=E7=AB=AF=EF=BC=9A=E6=97=A0?= =?UTF-8?q?=20=E4=BB=93=E5=BA=93=EF=BC=9A=E6=97=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/agent.go | 2 +- backend/newAgent/model/chat_contract.go | 32 +- backend/newAgent/model/common_state.go | 32 +- backend/newAgent/node/chat.go | 447 ++++++++++++++------- backend/newAgent/prompt/chat.go | 88 ++-- backend/newAgent/router/chat_route.go | 164 ++++++++ backend/newAgent/stream/emitter.go | 102 ++++- backend/service/agentsvc/agent.go | 13 +- backend/service/agentsvc/agent_newagent.go | 12 +- 9 files changed, 658 insertions(+), 234 deletions(-) create mode 100644 backend/newAgent/router/chat_route.go diff --git a/backend/model/agent.go b/backend/model/agent.go index b0439b6..8304216 100644 --- a/backend/model/agent.go +++ b/backend/model/agent.go @@ -58,7 +58,7 @@ type UserSendMessageRequest struct { ConversationID string `json:"conversation_id,omitempty"` Message string `json:"message" binding:"required"` Model string `json:"model,omitempty"` - Thinking bool `json:"thinking,omitempty"` + Thinking string `json:"thinking,omitempty"` Extra map[string]any `json:"extra,omitempty"` } diff --git a/backend/newAgent/model/chat_contract.go b/backend/newAgent/model/chat_contract.go index 90fbc2c..00ff2b8 100644 --- a/backend/newAgent/model/chat_contract.go +++ b/backend/newAgent/model/chat_contract.go @@ -26,18 +26,19 @@ const ( // // 职责边界: // 1. Route 决定后续处理路径; -// 2. Speak 始终填写:给用户看的话; -// 3. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true; -// 4. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效; -// 5. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true; -// 6. Reason 给后端和日志看。 +// 2. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true; +// 3. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效; +// 4. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true; +// 5. Thinking 表示下游 Execute 节点是否应开启深度思考; +// 6. Raw 保留控制码原文,供日志排查; +// 7. 用户可见内容(speak)由流式输出自然产出,不由本结构承载。 type ChatRoutingDecision struct { - Route ChatRoute `json:"route"` - Speak string `json:"speak,omitempty"` - NeedsRoughBuild bool `json:"needs_rough_build,omitempty"` - NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"` - AllowReorder bool `json:"allow_reorder,omitempty"` - Reason string `json:"reason,omitempty"` + Route ChatRoute + NeedsRoughBuild bool + NeedsRefineAfterRoughBuild bool + AllowReorder bool + Thinking bool + Raw string } // Normalize 统一清洗路由决策中的字符串字段。 @@ -46,8 +47,7 @@ func (d *ChatRoutingDecision) Normalize() { return } d.Route = ChatRoute(strings.TrimSpace(string(d.Route))) - d.Speak = strings.TrimSpace(d.Speak) - d.Reason = strings.TrimSpace(d.Reason) + d.Raw = strings.TrimSpace(d.Raw) } // Validate 校验路由决策的最小合法性。 @@ -67,16 +67,12 @@ func (d *ChatRoutingDecision) Validate() error { return fmt.Errorf("未知 route: %s", d.Route) } - // direct_reply 必须有 speak。 - if d.Route == ChatRouteDirectReply && d.Speak == "" { - return fmt.Errorf("direct_reply 必须携带 speak") - } - // 非 execute 路由不应携带粗排和粗排后微调标记,统一归一化为 false。 if d.Route != ChatRouteExecute { d.NeedsRoughBuild = false d.NeedsRefineAfterRoughBuild = false d.AllowReorder = false + d.Thinking = false } // 只有 needs_rough_build=true 时,needs_refine_after_rough_build 才有语义。 if !d.NeedsRoughBuild { diff --git a/backend/newAgent/model/common_state.go b/backend/newAgent/model/common_state.go index 2e9a661..ae15fda 100644 --- a/backend/newAgent/model/common_state.go +++ b/backend/newAgent/model/common_state.go @@ -30,7 +30,7 @@ const ( FlowTerminalStatusExhausted FlowTerminalStatus = "exhausted" ) -// FlowTerminalOutcome 保存“流程为什么结束”的最终结果快照。 +// FlowTerminalOutcome 保存"流程为什么结束"的最终结果快照。 // // 职责边界: // 1. Stage 说明终止发生在哪个阶段,便于 graph/deliver/debug 统一收口; @@ -97,7 +97,7 @@ type CommonState struct { // NeedsRoughBuild 由 Plan 节点在 plan_done 时写入,标记 Confirm 后是否需要走粗排节点。 // 粗排节点执行完毕后会将此字段重置为 false。 NeedsRoughBuild bool `json:"needs_rough_build,omitempty"` - // NeedsRefineAfterRoughBuild 表示“粗排完成后是否需要立即进入微调”。 + // NeedsRefineAfterRoughBuild 表示"粗排完成后是否需要立即进入微调"。 // // 说明: // 1. 该标记主要用于 chat->execute 的直执行链路; @@ -105,13 +105,21 @@ type CommonState struct { // 3. false 表示用户仅要求完成排入,粗排成功后可直接收口,等待后续再优化。 NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"` // AllowReorder 表示本轮是否允许打乱 suggested 任务的相对顺序。 - // 默认 false,只有用户明确说明“可以打乱顺序/顺序不重要”才会为 true。 + // 默认 false,只有用户明确说明"可以打乱顺序/顺序不重要"才会为 true。 AllowReorder bool `json:"allow_reorder,omitempty"` - // SuggestedOrderBaseline 保存“本轮 execute 启动前”的 suggested 任务相对顺序基线。 + // SuggestedOrderBaseline 保存"本轮 execute 启动前"的 suggested 任务相对顺序基线。 // OrderGuard 节点会基于该基线判断微调是否破坏顺序约束。 SuggestedOrderBaseline []int `json:"suggested_order_baseline,omitempty"` - // TerminalOutcome 保存“本轮流程最终如何结束”的统一收口结果。 + // ExecuteThinking 由 Chat 路由决策传入,表示 Execute 节点是否应开启深度思考。 + // 预埋字段,当前阶段 Execute 节点可自行决定是否读取。 + ExecuteThinking bool `json:"execute_thinking,omitempty"` + + // ThinkingMode 由前端传入,控制所有下游 LLM 调用的 thinking 行为。 + // "true" 强制开启,"false" 强制关闭,"auto"(默认)交给路由决策。 + ThinkingMode string `json:"thinking_mode,omitempty"` + + // TerminalOutcome 保存"本轮流程最终如何结束"的统一收口结果。 // 第二轮开始,rough_build / execute / deliver 都应围绕这份快照判断收口语义。 TerminalOutcome *FlowTerminalOutcome `json:"terminal_outcome,omitempty"` } @@ -184,12 +192,12 @@ func (s *CommonState) RejectPlan() { s.ClearTerminalOutcome() } -// ResetForNextRun 在“上一轮已经收口,且本轮准备开始新请求”时重置执行期临时状态。 +// ResetForNextRun 在"上一轮已经收口,且本轮准备开始新请求"时重置执行期临时状态。 // // 职责边界: // 1. 负责清理会污染新一轮执行的临时字段(轮次、修正计数、计划游标、粗排开关、顺序基线、终止结果); // 2. 不负责清理会话身份与跨轮共享数据(ConversationID/UserID/TaskClassIDs/TaskClasses/历史上下文/ScheduleState); -// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在“加载兜底 + chat 入口”双保险场景下复用。 +// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在"加载兜底 + chat 入口"双保险场景下复用。 func (s *CommonState) ResetForNextRun() { if s == nil { return @@ -237,7 +245,7 @@ func (s *CommonState) Done() { } } -// Abort 将当前流程标记为“业务语义上的主动终止”。 +// Abort 将当前流程标记为"业务语义上的主动终止"。 // // 步骤说明: // 1. 统一写入 PhaseDone,保证 graph 后续直接进入 deliver 收口; @@ -255,7 +263,7 @@ func (s *CommonState) Abort(stage, code, userMessage, internalReason string) { s.TerminalOutcome.Normalize() } -// Exhaust 将当前流程标记为“安全边界触发的被动停止”。 +// Exhaust 将当前流程标记为"安全边界触发的被动停止"。 func (s *CommonState) Exhaust(stage, userMessage, internalReason string) { s.Phase = PhaseDone s.TerminalOutcome = &FlowTerminalOutcome{ @@ -289,17 +297,17 @@ func (s *CommonState) TerminalStatus() FlowTerminalStatus { return s.TerminalOutcome.Status } -// IsCompleted 判断当前是否属于“正常完成”。 +// IsCompleted 判断当前是否属于"正常完成"。 func (s *CommonState) IsCompleted() bool { return s.TerminalStatus() == FlowTerminalStatusCompleted } -// IsAborted 判断当前是否属于“主动中止”。 +// IsAborted 判断当前是否属于"主动中止"。 func (s *CommonState) IsAborted() bool { return s.TerminalStatus() == FlowTerminalStatusAborted } -// IsExhaustedTerminal 判断当前是否属于“轮次耗尽收口”。 +// IsExhaustedTerminal 判断当前是否属于"轮次耗尽收口"。 func (s *CommonState) IsExhaustedTerminal() bool { return s.TerminalStatus() == FlowTerminalStatusExhausted } diff --git a/backend/newAgent/node/chat.go b/backend/newAgent/node/chat.go index 967286b..2204920 100644 --- a/backend/newAgent/node/chat.go +++ b/backend/newAgent/node/chat.go @@ -3,15 +3,18 @@ package newagentnode import ( "context" "fmt" + "io" "log" "strings" "time" "github.com/cloudwego/eino/schema" + "github.com/google/uuid" infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" + newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" ) @@ -21,7 +24,7 @@ const ( chatSpeakBlockID = "chat.speak" // chatHistoryKindKey 用于在 history 中打运行态标记,供 prompt 层做上下文分层。 chatHistoryKindKey = "newagent_history_kind" - // chatHistoryKindExecuteLoopClosed 表示“上一轮 execute loop 已正常收口”。 + // chatHistoryKindExecuteLoopClosed 表示"上一轮 execute loop 已正常收口"。 // prompt 侧会据此把旧 loop 归档到 msg1,而不是继续占用 msg2 窗口。 chatHistoryKindExecuteLoopClosed = "execute_loop_closed" ) @@ -75,9 +78,9 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error { if !runtimeState.HasPendingInteraction() && flowState.Phase == newagentmodel.PhaseDone { terminalBefore := flowState.TerminalStatus() roundBefore := flowState.RoundUsed - // 1. 只有“正常完成(completed)”才打 loop 收口标记: - // 1.1 这样下一轮进入 execute 时,msg2 会只保留“当前活跃循环”窗口; - // 1.2 异常收口(exhausted/aborted)不打标记,允许后续“继续”时沿用上一轮 loop 轨迹。 + // 1. 只有"正常完成(completed)"才打 loop 收口标记: + // 1.1 这样下一轮进入 execute 时,msg2 会只保留"当前活跃循环"窗口; + // 1.2 异常收口(exhausted/aborted)不打标记,允许后续"继续"时沿用上一轮 loop 轨迹。 if terminalBefore == newagentmodel.FlowTerminalStatusCompleted { appendExecuteLoopClosedMarker(conversationContext) } @@ -89,86 +92,28 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error { terminalBefore, ) } - messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState) + nonce := uuid.NewString() + messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState, nonce) - decision, rawResult, err := infrallm.GenerateJSON[newagentmodel.ChatRoutingDecision]( - ctx, - input.Client, - messages, - infrallm.GenerateOptions{ - Temperature: 0.1, - MaxTokens: 500, - Thinking: infrallm.ThinkingModeDisabled, - Metadata: map[string]any{ - "stage": chatStageName, - "phase": "routing", - }, + reader, err := input.Client.Stream(ctx, messages, infrallm.GenerateOptions{ + Temperature: 0.7, + Thinking: infrallm.ThinkingModeDisabled, + Metadata: map[string]any{ + "stage": chatStageName, + "phase": "routing", }, - ) - - rawText := "" - if rawResult != nil { - rawText = strings.TrimSpace(rawResult.Text) - } - + }) if err != nil { - // 路由失败 → 保守:走 plan。 - log.Printf("[WARN] chat routing LLM failed chat=%s raw=%s err=%v", - flowState.ConversationID, rawText, err) + log.Printf("[WARN] chat routing stream failed chat=%s err=%v", flowState.ConversationID, err) flowState.Phase = newagentmodel.PhasePlanning return nil } - if validateErr := decision.Validate(); validateErr != nil { - log.Printf("[WARN] chat routing decision invalid chat=%s raw=%s err=%v", - flowState.ConversationID, rawText, validateErr) - flowState.Phase = newagentmodel.PhasePlanning - return nil - } - - // 1. 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求“重新粗排”, - // 则强制关闭 needs_rough_build,避免“微调请求被误判成再次粗排”。 - // 2. 该闸门只收紧粗排开关,不改路由 route,确保 execute 微调链路仍可继续。 - // 3. 一旦用户明确表达“从头重排/重新粗排”,仍允许 needs_rough_build=true 生效。 - if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) { - decision.NeedsRoughBuild = false - decision.NeedsRefineAfterRoughBuild = false - } - - log.Printf( - "[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v has_rough_build_done=%v task_class_count=%d reason=%s", - flowState.ConversationID, - decision.Route, - decision.NeedsRoughBuild, - decision.NeedsRefineAfterRoughBuild, - decision.AllowReorder, - hasRoughBuildDoneMarker(conversationContext), - len(flowState.TaskClassIDs), - decision.Reason, - ) - flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder) - - // 3. 按路由决策推进。 - switch decision.Route { - case newagentmodel.ChatRouteDirectReply: - return handleDirectReply(ctx, decision, conversationContext, emitter, flowState) - - case newagentmodel.ChatRouteExecute: - return handleRouteExecute(decision, emitter, flowState) - - case newagentmodel.ChatRouteDeepAnswer: - return handleDeepAnswer(ctx, input, decision, conversationContext, emitter, flowState) - - case newagentmodel.ChatRoutePlan: - return handleRoutePlan(decision, emitter, flowState) - - default: - flowState.Phase = newagentmodel.PhasePlanning - return nil - } + parser := newagentrouter.NewStreamRouteParser(nonce) + return streamAndDispatch(ctx, reader, parser, input, emitter, flowState, conversationContext) } -// appendExecuteLoopClosedMarker 在 history 中写入“execute loop 已正常收口”标记。 +// appendExecuteLoopClosedMarker 在 history 中写入"execute loop 已正常收口"标记。 // // 职责边界: // 1. 只负责写一个轻量 marker,供 prompt 分层; @@ -207,51 +152,254 @@ func isExecuteLoopClosedMarker(msg *schema.Message) bool { return strings.TrimSpace(kind) == chatHistoryKindExecuteLoopClosed } -// handleDirectReply 处理简单任务:直接输出回复。 -func handleDirectReply( +// streamAndDispatch 是流式路由分发的核心循环。 +// +// 步骤说明: +// 1. 从 StreamReader 逐 chunk 读取,喂给 StreamRouteParser 增量解析控制码; +// 2. 控制码解析完成后,根据 route 进入对应的流式处理分支; +// 3. 控制码解析超时或流异常结束 → fallback 到 plan。 +func streamAndDispatch( ctx context.Context, - decision *newagentmodel.ChatRoutingDecision, - conversationContext *newagentmodel.ConversationContext, + reader infrallm.StreamReader, + parser *newagentrouter.StreamRouteParser, + input ChatNodeInput, emitter *newagentstream.ChunkEmitter, flowState *newagentmodel.CommonState, + conversationContext *newagentmodel.ConversationContext, ) error { - if strings.TrimSpace(decision.Speak) != "" { - if err := emitter.EmitPseudoAssistantText( - ctx, chatSpeakBlockID, chatStageName, - decision.Speak, - newagentstream.DefaultPseudoStreamOptions(), - ); err != nil { - return fmt.Errorf("闲聊回复推送失败: %w", err) + for { + chunk, err := reader.Recv() + if err == io.EOF { + if !parser.RouteReady() { + log.Printf("[WARN] chat stream ended before route resolved chat=%s", flowState.ConversationID) + flowState.Phase = newagentmodel.PhasePlanning + return nil + } + break } - conversationContext.AppendHistory(schema.AssistantMessage(decision.Speak, nil)) + if err != nil { + log.Printf("[WARN] chat stream recv error chat=%s err=%v", flowState.ConversationID, err) + flowState.Phase = newagentmodel.PhasePlanning + return nil + } + + content := "" + if chunk != nil { + content = chunk.Content + } + + visible, routeReady, _ := parser.Feed(content) + if !routeReady { + continue + } + + // 控制码解析完成,进入路由分发。 + decision := parser.Decision() + + // 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求"重新粗排", + // 则强制关闭 needs_rough_build,避免"微调请求被误判成再次粗排"。 + if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) { + decision.NeedsRoughBuild = false + decision.NeedsRefineAfterRoughBuild = false + } + + log.Printf( + "[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v thinking=%v has_rough_build_done=%v task_class_count=%d raw=%s", + flowState.ConversationID, + decision.Route, + decision.NeedsRoughBuild, + decision.NeedsRefineAfterRoughBuild, + decision.AllowReorder, + decision.Thinking, + hasRoughBuildDoneMarker(conversationContext), + len(flowState.TaskClassIDs), + decision.Raw, + ) + + flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder) + effectiveThinking := resolveEffectiveThinking(flowState.ThinkingMode, decision.Thinking) + + switch decision.Route { + case newagentmodel.ChatRouteDirectReply: + return handleDirectReplyStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking, visible) + + case newagentmodel.ChatRouteExecute: + return handleRouteExecuteStream(reader, emitter, flowState, decision, input.UserInput, effectiveThinking, visible) + + case newagentmodel.ChatRouteDeepAnswer: + return handleDeepAnswerStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking) + + case newagentmodel.ChatRoutePlan: + return handleRoutePlanStream(reader, emitter, flowState, effectiveThinking, visible) + + default: + flowState.Phase = newagentmodel.PhasePlanning + return nil + } + } + return nil +} + +// resolveEffectiveThinking 根据前端 ThinkingMode 和路由决策合并出最终 thinking 状态。 +// +// 规则: +// - "true" 强制开启; +// - "false" 强制关闭; +// - "auto"/"" 交给路由决策的 decisionThinking。 +func resolveEffectiveThinking(mode string, decisionThinking bool) bool { + switch strings.TrimSpace(strings.ToLower(mode)) { + case "true": + return true + case "false": + return false + default: + return decisionThinking + } +} + +// handleDirectReplyStream 处理闲聊回复。 +// +// 两种模式: +// 1. thinking=false:同一流续传,逐 chunk 推送; +// 2. thinking=true:关闭路由流,发起第二次 thinking 流式调用。 +func handleDirectReplyStream( + ctx context.Context, + reader infrallm.StreamReader, + input ChatNodeInput, + emitter *newagentstream.ChunkEmitter, + conversationContext *newagentmodel.ConversationContext, + flowState *newagentmodel.CommonState, + effectiveThinking bool, + firstVisible string, +) error { + if effectiveThinking { + return handleThinkingReplyStream(ctx, reader, input, emitter, conversationContext, flowState) + } + return handleDirectReplyContinueStream(ctx, reader, emitter, conversationContext, flowState, firstVisible) +} + +// handleThinkingReplyStream 处理需要思考的回复:关闭路由流 → 第二次 thinking 流式调用。 +func handleThinkingReplyStream( + ctx context.Context, + reader infrallm.StreamReader, + input ChatNodeInput, + emitter *newagentstream.ChunkEmitter, + conversationContext *newagentmodel.ConversationContext, + flowState *newagentmodel.CommonState, +) error { + _ = reader.Close() + + deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput) + deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{ + Temperature: 0.5, + MaxTokens: 2000, + Thinking: infrallm.ThinkingModeEnabled, + Metadata: map[string]any{ + "stage": chatStageName, + "phase": "direct_reply_thinking", + }, + }) + if err != nil { + log.Printf("[WARN] thinking reply stream failed chat=%s err=%v", flowState.ConversationID, err) + flowState.Phase = newagentmodel.PhaseChatting + return nil + } + + deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName) + _ = deepReader.Close() + if err != nil { + log.Printf("[WARN] thinking reply emit error chat=%s err=%v", flowState.ConversationID, err) + flowState.Phase = newagentmodel.PhaseChatting + return nil + } + + deepText = strings.TrimSpace(deepText) + if deepText != "" { + conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil)) } flowState.Phase = newagentmodel.PhaseChatting return nil } -// handleRouteExecute 处理中等任务:推送简短确认,设 PhaseExecuting。 +// handleDirectReplyContinueStream 处理无思考的闲聊:同一流续传。 +func handleDirectReplyContinueStream( + ctx context.Context, + reader infrallm.StreamReader, + emitter *newagentstream.ChunkEmitter, + conversationContext *newagentmodel.ConversationContext, + flowState *newagentmodel.CommonState, + firstVisible string, +) error { + var fullText strings.Builder + fullText.WriteString(firstVisible) + + // 推送控制码之后的第一段内容。 + if strings.TrimSpace(firstVisible) != "" { + if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, firstVisible, true); err != nil { + return fmt.Errorf("闲聊回复推送失败: %w", err) + } + } + + firstChunk := firstVisible == "" + // 继续读同一个流,逐 chunk 推送。 + for { + chunk, err := reader.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Printf("[WARN] direct_reply stream error chat=%s err=%v", flowState.ConversationID, err) + break + } + if chunk == nil || chunk.Content == "" { + continue + } + if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, chunk.Content, firstChunk); err != nil { + return fmt.Errorf("闲聊回复推送失败: %w", err) + } + fullText.WriteString(chunk.Content) + firstChunk = false + } + + text := fullText.String() + if strings.TrimSpace(text) != "" { + conversationContext.AppendHistory(schema.AssistantMessage(text, nil)) + } + + flowState.Phase = newagentmodel.PhaseChatting + return nil +} + +// handleRouteExecuteStream 处理工具调用路由:推送状态确认 → 设 PhaseExecuting。 // -// 不把 speak 写入 history,因为真正的回复由 Execute 节点产出。 -func handleRouteExecute( - decision *newagentmodel.ChatRoutingDecision, +// 说明: +// 1. 关闭路由流(后续内容不需要); +// 2. 推送轻量状态通知; +// 3. 设置流程状态,进入 Execute 或 RoughBuild。 +func handleRouteExecuteStream( + reader infrallm.StreamReader, emitter *newagentstream.ChunkEmitter, flowState *newagentmodel.CommonState, + decision *newagentmodel.ChatRoutingDecision, + userInput string, + effectiveThinking bool, + speak string, ) error { - speak := strings.TrimSpace(decision.Speak) - if speak == "" { + // 关闭路由流。 + _ = reader.Close() + + if strings.TrimSpace(speak) == "" { speak = "好的,我来处理。" } - // 推送轻量状态通知,让前端知道请求已接收。 + // 推送轻量状态通知。 _ = emitter.EmitStatus(chatStatusBlockID, chatStageName, "accepted", speak, false) - // 清空旧 PlanSteps 并设 PhaseExecuting,避免上一次任务残留的步骤被 HasPlan() 误判。 + // 清空旧 PlanSteps 并设 PhaseExecuting。 flowState.StartDirectExecute() - // 1. 默认不走粗排与粗排后微调,避免沿用上轮遗留标记。 - // 2. 只有 route 判定为“需要粗排”且确实有 task_class_ids 时,才打开粗排开关。 - // 3. 粗排后是否立即进入微调,完全由路由决策显式标记控制。 + // 粗排开关逻辑。 flowState.NeedsRoughBuild = false flowState.NeedsRefineAfterRoughBuild = false if decision.NeedsRoughBuild && len(flowState.TaskClassIDs) > 0 { @@ -259,15 +407,17 @@ func handleRouteExecute( flowState.NeedsRefineAfterRoughBuild = decision.NeedsRefineAfterRoughBuild } + flowState.ExecuteThinking = effectiveThinking + return nil } -// resolveAllowReorder 统一计算“本轮是否允许打乱顺序”。 +// resolveAllowReorder 统一计算"本轮是否允许打乱顺序"。 // // 步骤化说明: // 1. 后端先做显式语义判定:用户明确允许/明确禁止时,直接以后端判定为准; // 2. 若后端未识别到显式语义,再回退到路由模型的 allow_reorder 字段; -// 3. 默认返回 false,确保“保持顺序”是系统默认行为。 +// 3. 默认返回 false,确保"保持顺序"是系统默认行为。 func resolveAllowReorder(userInput string, modelAllowReorder bool) bool { switch detectReorderPreference(userInput) { case reorderAllow: @@ -279,11 +429,11 @@ func resolveAllowReorder(userInput string, modelAllowReorder bool) bool { } } -// detectReorderPreference 识别用户是否“明确授权打乱顺序”。 +// detectReorderPreference 识别用户是否"明确授权打乱顺序"。 // // 职责边界: // 1. 只负责关键词级别的显式意图识别,不做复杂语义推理; -// 2. 若同时命中“允许”与“禁止”,优先按“禁止”处理,避免误放开顺序约束; +// 2. 若同时命中"允许"与"禁止",优先按"禁止"处理,避免误放开顺序约束; // 3. 未命中显式表达时返回 unknown,交给上层兜底策略。 func detectReorderPreference(userInput string) reorderPreference { text := strings.ToLower(strings.TrimSpace(userInput)) @@ -332,12 +482,12 @@ func containsAnyPhrase(text string, phrases []string) bool { return false } -// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭“再次粗排”。 +// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭"再次粗排"。 // // 判定规则: // 1. 当前决策未请求粗排时,直接不干预; // 2. 上下文不存在 rough_build_done 时,不干预(首次粗排仍可走); -// 3. 若用户未明确要求“重新粗排/从头重排”,则关闭粗排开关,避免误触发。 +// 3. 若用户未明确要求"重新粗排/从头重排",则关闭粗排开关,避免误触发。 func shouldDisableRoughBuildForRefine( conversationContext *newagentmodel.ConversationContext, userInput string, @@ -364,7 +514,7 @@ func hasRoughBuildDoneMarker(conversationContext *newagentmodel.ConversationCont return false } -// isExplicitRoughBuildRequest 识别用户是否明确要求“重新粗排/从头重排”。 +// isExplicitRoughBuildRequest 识别用户是否明确要求"重新粗排/从头重排"。 func isExplicitRoughBuildRequest(userInput string) bool { text := strings.ToLower(strings.TrimSpace(userInput)) if text == "" { @@ -388,80 +538,81 @@ func isExplicitRoughBuildRequest(userInput string) bool { return containsAnyPhrase(text, keywords) } -// handleDeepAnswer 处理复杂问答:推送过渡语 → 原地开 thinking 再调一次 LLM → 输出深度回答。 -func handleDeepAnswer( +// handleDeepAnswerStream 处理复杂问答:关闭路由流 → 第二次流式调用。 +// +// 步骤说明: +// 1. 关闭第一个路由流; +// 2. 发起第二次流式 LLM 调用(thinking 由 effectiveThinking 控制); +// 3. 真流式推送 reasoning + 正文; +// 4. 完整回复写入 history。 +func handleDeepAnswerStream( ctx context.Context, + reader infrallm.StreamReader, input ChatNodeInput, - decision *newagentmodel.ChatRoutingDecision, - conversationContext *newagentmodel.ConversationContext, emitter *newagentstream.ChunkEmitter, + conversationContext *newagentmodel.ConversationContext, flowState *newagentmodel.CommonState, + effectiveThinking bool, ) error { - // 1. 推送过渡语。 - briefSpeak := strings.TrimSpace(decision.Speak) - if briefSpeak == "" { - briefSpeak = "让我想想。" - } - if err := emitter.EmitPseudoAssistantText( - ctx, chatSpeakBlockID, chatStageName, - briefSpeak, - newagentstream.DefaultPseudoStreamOptions(), - ); err != nil { - return fmt.Errorf("过渡文案推送失败: %w", err) - } + // 1. 关闭第一个路由流。 + _ = reader.Close() - // 2. 第二次 LLM 调用:开 thinking,深度回答。 + // 2. 第二次流式调用。 + thinkingOpt := infrallm.ThinkingModeDisabled + if effectiveThinking { + thinkingOpt = infrallm.ThinkingModeEnabled + } deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput) - deepResult, err := input.Client.GenerateText(ctx, deepMessages, infrallm.GenerateOptions{ + deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{ Temperature: 0.5, MaxTokens: 2000, - Thinking: infrallm.ThinkingModeEnabled, + Thinking: thinkingOpt, Metadata: map[string]any{ "stage": chatStageName, "phase": "deep_answer", }, }) - - if err != nil || deepResult == nil { - // 深度回答失败 → 降级,只保留过渡语。 - log.Printf("[WARN] deep answer LLM failed chat=%s err=%v", flowState.ConversationID, err) - conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil)) + if err != nil { + // 深度回答失败 → 降级返回。 + log.Printf("[WARN] deep answer stream failed chat=%s err=%v", flowState.ConversationID, err) flowState.Phase = newagentmodel.PhaseChatting return nil } - // 3. 输出深度回答。 - deepText := strings.TrimSpace(deepResult.Text) + // 3. 真流式推送 reasoning + 正文。 + deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName) + _ = deepReader.Close() + if err != nil { + log.Printf("[WARN] deep answer stream emit error chat=%s err=%v", flowState.ConversationID, err) + flowState.Phase = newagentmodel.PhaseChatting + return nil + } + + deepText = strings.TrimSpace(deepText) if deepText == "" { - conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil)) flowState.Phase = newagentmodel.PhaseChatting return nil } - if err := emitter.EmitPseudoAssistantText( - ctx, chatSpeakBlockID, chatStageName, - deepText, - newagentstream.DefaultPseudoStreamOptions(), - ); err != nil { - return fmt.Errorf("深度回答推送失败: %w", err) - } - - // 将完整回复(过渡语 + 深度回答)写入 history。 - fullReply := briefSpeak + "\n\n" + deepText - conversationContext.AppendHistory(schema.AssistantMessage(fullReply, nil)) + // 4. 完整回复写入 history。 + conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil)) flowState.Phase = newagentmodel.PhaseChatting return nil } -// handleRoutePlan 处理复杂规划:推送确认语,设 PhasePlanning。 -func handleRoutePlan( - decision *newagentmodel.ChatRoutingDecision, +// handleRoutePlanStream 处理规划路由:推送状态确认 → 设 PhasePlanning。 +func handleRoutePlanStream( + reader infrallm.StreamReader, emitter *newagentstream.ChunkEmitter, flowState *newagentmodel.CommonState, + effectiveThinking bool, + speak string, ) error { - speak := strings.TrimSpace(decision.Speak) - if speak == "" { + // 关闭路由流。 + _ = reader.Close() + + if strings.TrimSpace(speak) == "" { speak = "好的,让我来规划一下。" } diff --git a/backend/newAgent/prompt/chat.go b/backend/newAgent/prompt/chat.go index 6c33635..1dbe63f 100644 --- a/backend/newAgent/prompt/chat.go +++ b/backend/newAgent/prompt/chat.go @@ -3,62 +3,71 @@ package newagentprompt import ( "fmt" "strings" + "time" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" "github.com/cloudwego/eino/schema" ) const chatRoutingSystemPrompt = ` -你是 SmartFlow 的智能路由器。你的职责是判断用户意图的复杂度,并决定后续处理路径。 - -你会看到: -- 历史对话 -- 用户本轮输入 -- 当前可用工具摘要(如有) -- 本次排课涉及的任务类约束(如有) - -请遵守以下规则: -1. 只输出严格 JSON,不要输出 markdown,不要输出额外解释。 -2. 根据用户意图判断复杂度并选择路由。 -3. speak 字段始终填写:给用户看的话。 +你是 SmartFlow 的智能路由器。你的回复必须以路由控制码开头,控制码后紧跟用户可见的内容。 路由规则: -- direct_reply:纯闲聊、简单问答、打招呼、感谢等。speak 直接写你的完整回复。 -- execute:需要用工具处理的请求(查询日程、移动课程、排课等),但不需要先制定计划。speak 写简短确认。 -- deep_answer:复杂问题但不需要工具(如分析建议、深度解释等),需要深度思考后直接回答。speak 写过渡语(如"让我想想")。 -- plan:用户明确要求先制定计划,或涉及多阶段复杂规划。speak 写确认语。 +- direct_reply:纯闲聊、简单问答、打招呼、感谢等。控制码后直接输出完整回复。 +- execute:需要用工具处理的请求(查询日程、移动课程、排课等),但不需要先制定计划。控制码后输出简短确认。 +- deep_answer:复杂问题但不需要工具(如分析建议、深度解释等),需要深度思考后回答。控制码后输出过渡语(如"让我想想")。 +- plan:用户明确要求先制定计划,或涉及多阶段复杂规划。控制码后输出简短确认。 -粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 needs_rough_build=true。 +粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 rough_build=true。 二次粗排约束(强约束): -- 若上下文已出现 rough_build_done,且用户未明确要求“重新粗排/从头重排”,必须设置 needs_rough_build=false。 -- “移动/微调/优化/均匀化/调顺序”等请求默认视为 refine,不得再次触发 rough build。 +- 若上下文已出现 rough_build_done,且用户未明确要求"重新粗排/从头重排",必须设置 rough_build=false。 +- "移动/微调/优化/均匀化/调顺序"等请求默认视为 refine,不得再次触发 rough build。 粗排后微调判断: -- 仅当 needs_rough_build=true 时才判断 needs_refine_after_rough_build。 -- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 needs_refine_after_rough_build=true。 -- 若用户只要求"先排进去/给初稿",未提出微调目标,设 needs_refine_after_rough_build=false。 +- 仅当 rough_build=true 时才判断 refine。 +- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 refine=true。 +- 若用户只要求"先排进去/给初稿",未提出微调目标,设 refine=false。 顺序授权判断: -- allow_reorder 仅在用户明确说明“允许打乱顺序/顺序不重要”时才为 true。 -- 用户明确要求“保持顺序/不要打乱”时必须为 false。 +- reorder 仅在用户明确说明"允许打乱顺序/顺序不重要"时才为 true。 +- 用户明确要求"保持顺序/不要打乱"时必须为 false。 - 若用户未明确提及顺序,一律为 false。 +深度思考判断: +- thinking 仅在 route=execute 时有效。 +- 当用户请求涉及复杂推理、多条件约束、需要深度分析后才能执行的操作时,设 thinking=true。 +- 简单查询、单步操作设 thinking=false。 -输出协议(严格 JSON): -{"route":"direct_reply / execute / deep_answer / plan","speak":"给用户看的话","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false,"reason":"简短判断依据"} +输出格式(严格两段式): +第一段(控制码,用户不可见,后端会截取): + +第二段(紧接控制码之后,用户可见): +根据路由输出对应内容。 + +属性说明(仅 route=execute 时有效,其余路由省略这些属性): +- rough_build:是否需要粗排 +- refine:粗排后是否需要微调 +- reorder:是否允许打乱顺序 +- thinking:后续执行阶段是否需要深度思考 合法示例: -{"route":"direct_reply","speak":"你好!我是 SmartFlow 助手,有什么可以帮你的?","reason":"用户打招呼"} + +你好!我是 SmartFlow 助手,有什么可以帮你的? -{"route":"execute","speak":"好的,我来帮你看看今天的安排。","reason":"需要调用工具查询日程","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false} + +好的,我来帮你看看今天的安排。 -{"route":"execute","speak":"好的,我来帮你排课。","reason":"批量排课需求,有任务类 ID,未给微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":false,"allow_reorder":false} + +好的,我来帮你排课。 -{"route":"execute","speak":"好的,我来帮你排课并按你的偏好做微调。","reason":"批量排课需求,有任务类 ID,且给出明确微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":true,"allow_reorder":false} + +好的,我来帮你排课并按你的偏好做微调。 -{"route":"execute","speak":"好的,我按你的要求重排。","reason":"用户明确允许打乱顺序","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":true} + +这是个好问题,让我仔细想想。 -{"route":"deep_answer","speak":"这是个好问题,让我仔细想想。","reason":"需要深度分析但不需要工具"} + +明白,我来帮你制定一个完整的学习计划。 -{"route":"plan","speak":"明白,我来帮你制定一个完整的学习计划。","reason":"用户明确要求制定计划"} +禁止输出任何 JSON、markdown 代码块或额外解释。nonce 必须精确使用给定值。 ` // BuildChatRoutingSystemPrompt 返回路由阶段的系统提示词。 @@ -67,22 +76,21 @@ func BuildChatRoutingSystemPrompt() string { } // BuildChatRoutingMessages 组装路由阶段的 messages。 -func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) []*schema.Message { +func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) []*schema.Message { return buildStageMessages( BuildChatRoutingSystemPrompt(), ctx, - BuildChatRoutingUserPrompt(ctx, userInput, state), + BuildChatRoutingUserPrompt(ctx, userInput, state, nonce), ) } // BuildChatRoutingUserPrompt 构造路由阶段的用户提示词。 -func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) string { +func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) string { var sb strings.Builder - sb.WriteString("请判断用户本轮意图的复杂度,并选择最合适的路由。\n") - sb.WriteString("若 route=execute 且 needs_rough_build=true,请同时判断 needs_refine_after_rough_build:") - sb.WriteString("只有用户明确提出微调目标时才为 true。\n") - sb.WriteString("请同时输出 allow_reorder:只有用户明确授权打乱顺序时才为 true,默认 false。\n") + sb.WriteString(fmt.Sprintf("nonce=%s\n", nonce)) + sb.WriteString(fmt.Sprintf("当前时间=%s\n", time.Now().In(time.Local).Format("2006-01-02 15:04"))) + sb.WriteString("\n请判断用户本轮意图的复杂度,选择最合适的路由,并输出控制码和对应内容。\n") // 注入任务类上下文(供粗排判断参考)。 if state != nil && len(state.TaskClassIDs) > 0 { diff --git a/backend/newAgent/router/chat_route.go b/backend/newAgent/router/chat_route.go new file mode 100644 index 0000000..265eb5b --- /dev/null +++ b/backend/newAgent/router/chat_route.go @@ -0,0 +1,164 @@ +package newagentrouter + +import ( + "fmt" + "regexp" + "strings" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" +) + +var ( + // chatRouteHeaderRegex 从模型流式输出中解析 SMARTFLOW_ROUTE 控制码头部。 + // + // 格式示例: + // + // + // 属性说明: + // 1. nonce:防注入校验,必须与调用方传入的 nonce 精确匹配; + // 2. route:路由目标(direct_reply / execute / deep_answer / plan); + // 3. rough_build:可选,仅 route=execute 时有效,默认 false; + // 4. refine:可选,仅 rough_build=true 时有效,默认 false; + // 5. reorder:可选,仅 route=execute 时有效,默认 false; + // 6. thinking:可选,仅 route=execute 时有效,默认 false。 + chatRouteHeaderRegex = regexp.MustCompile( + `(?is)<\s*SMARTFLOW_ROUTE\b` + + `[^>]*\bnonce\s*=\s*["']?([a-zA-Z0-9\-]+)["']?` + + `[^>]*\broute\s*=\s*["']?(direct_reply|execute|deep_answer|plan)["']?` + + `(?:[^>]*\brough_build\s*=\s*["']?(true|false)["']?)?` + + `(?:[^>]*\brefine\s*=\s*["']?(true|false)["']?)?` + + `(?:[^>]*\breorder\s*=\s*["']?(true|false)["']?)?` + + `(?:[^>]*\bthinking\s*=\s*["']?(true|false)["']?)?` + + `[^>]*/\s*>`) +) + +// StreamRouteParser 从 LLM 流式输出中增量提取路由决策。 +// +// 协议约定:模型输出以 SMARTFLOW_ROUTE 控制码标签开头,标签结束后是用户可见内容。 +// 例如:你好!很高兴见到你... +// +// 职责边界: +// 1. 只负责从流式 chunk 中提取控制码并解析为 ChatRoutingDecision; +// 2. 不负责推送 SSE chunk,不负责决定后续走哪条链路; +// 3. 控制码解析失败时标记 fallback,由上层决定降级策略。 +type StreamRouteParser struct { + buf strings.Builder + nonce string + routeFound bool + decision *newagentmodel.ChatRoutingDecision +} + +// NewStreamRouteParser 创建流式路由解析器。 +func NewStreamRouteParser(nonce string) *StreamRouteParser { + return &StreamRouteParser{ + nonce: strings.ToLower(strings.TrimSpace(nonce)), + } +} + +// Feed 写入一段 chunk content。 +// +// 返回值: +// - visible:控制码标签之后的内容(用户可见文本); +// - routeReady:路由决策是否已确定; +// - err:解析错误。 +// +// 调用方应在 routeReady=true 后调用 Decision() 获取路由决策, +// 并根据 route 进入对应分支处理 visible 及后续 chunk。 +func (p *StreamRouteParser) Feed(content string) (visible string, routeReady bool, err error) { + if p.routeFound { + // 路由已解析,后续 chunk 直接透传。 + return content, true, nil + } + + p.buf.WriteString(content) + + text := p.buf.String() + match := chatRouteHeaderRegex.FindStringSubmatchIndex(text) + if match == nil { + // 控制码尚未完整,检查是否应该 fallback。 + if len(text) > 500 { + // 超过 500 字符仍未匹配到控制码 -> fallback 到 plan。 + p.routeFound = true + p.decision = &newagentmodel.ChatRoutingDecision{ + Route: newagentmodel.ChatRoutePlan, + Raw: text, + } + return text, true, fmt.Errorf("控制码解析超时,fallback 到 plan") + } + return "", false, nil + } + + // 提取匹配到的子组。 + groups := chatRouteHeaderRegex.FindStringSubmatch(text) + if len(groups) < 3 { + return "", false, fmt.Errorf("控制码正则子组不足: %d", len(groups)) + } + + // nonce 校验。 + parsedNonce := strings.ToLower(strings.TrimSpace(groups[1])) + if parsedNonce != p.nonce { + return "", false, fmt.Errorf("nonce 不匹配: got=%s expected=%s", parsedNonce, p.nonce) + } + + // 解析 route。 + route := newagentmodel.ChatRoute(strings.TrimSpace(groups[2])) + + // 解析可选布尔属性(默认 false)。 + roughBuild := parseOptionalBool(groups, 3) + refine := parseOptionalBool(groups, 4) + reorder := parseOptionalBool(groups, 5) + thinking := parseOptionalBool(groups, 6) + + p.decision = &newagentmodel.ChatRoutingDecision{ + Route: route, + NeedsRoughBuild: roughBuild, + NeedsRefineAfterRoughBuild: refine, + AllowReorder: reorder, + Thinking: thinking, + Raw: groups[0], + } + + // 归一化与校验。 + if validateErr := p.decision.Validate(); validateErr != nil { + // 校验失败 -> fallback 到 plan。 + p.decision.Route = newagentmodel.ChatRoutePlan + p.decision.NeedsRoughBuild = false + p.decision.NeedsRefineAfterRoughBuild = false + p.decision.AllowReorder = false + p.decision.Thinking = false + } + + p.routeFound = true + + // 控制码标签之后的文本作为 visible 返回。 + fullMatch := groups[0] + tagEndIdx := strings.Index(text, fullMatch) + if tagEndIdx >= 0 { + afterTag := text[tagEndIdx+len(fullMatch):] + // 去掉标签后紧跟的换行符(如果有)。 + afterTag = strings.TrimPrefix(afterTag, "\r\n") + afterTag = strings.TrimPrefix(afterTag, "\n") + return afterTag, true, nil + } + + return "", true, nil +} + +// RouteReady 返回路由决策是否已确定。 +func (p *StreamRouteParser) RouteReady() bool { + return p.routeFound +} + +// Decision 返回已解析的路由决策(RouteReady=true 后可用)。 +func (p *StreamRouteParser) Decision() *newagentmodel.ChatRoutingDecision { + return p.decision +} + +// parseOptionalBool 从正则子组中解析可选布尔值。 +// 如果子组不存在或为空,返回 false。 +func parseOptionalBool(groups []string, index int) bool { + if index >= len(groups) { + return false + } + return strings.TrimSpace(groups[index]) == "true" +} diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index a53d5d2..95a6ab4 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -3,19 +3,22 @@ package newagentstream import ( "context" "fmt" + "io" "strings" "time" + + infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" ) // PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。 // // 说明: // 1. 这里刻意不用 chan/string 绑死实现; -// 2. 上层既可以传“写 channel”的函数,也可以传“写 gin stream”的函数; +// 2. 上层既可以传"写 channel"的函数,也可以传"写 gin stream"的函数; // 3. 只要签名是 `func(string) error`,都能接进来。 type PayloadEmitter func(payload string) error -// StageEmitter 是 graph/node 对“当前阶段”进行推送的兼容接口。 +// StageEmitter 是 graph/node 对"当前阶段"进行推送的兼容接口。 // // 设计说明: // 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留; @@ -23,7 +26,7 @@ type PayloadEmitter func(payload string) error // 3. 这样能兼顾当前兼容性和后续协议升级空间。 type StageEmitter func(stage, detail string) -// PseudoStreamOptions 描述“整段文字伪流式输出”的切块与节奏配置。 +// PseudoStreamOptions 描述"整段文字伪流式输出"的切块与节奏配置。 // // 字段语义: // 1. MinChunkRunes:达到该最小长度后,若命中标点/换行等边界,可提前切块; @@ -51,7 +54,7 @@ func DefaultPseudoStreamOptions() PseudoStreamOptions { // ChunkEmitter 是 newAgent 统一的 SSE chunk 发射器。 // // 职责边界: -// 1. 负责把“正文 / 思考 / 工具事件 / 确认请求 / 中断提示”统一转换成 OpenAI 兼容 payload; +// 1. 负责把"正文 / 思考 / 工具事件 / 确认请求 / 中断提示"统一转换成 OpenAI 兼容 payload; // 2. 负责在必要时把结构化事件附带成 extra,同时给当前前端提供可读的降级文本; // 3. 不负责决定什么时候发什么,也不负责持久化状态。 type ChunkEmitter struct { @@ -365,7 +368,92 @@ func (e *ChunkEmitter) EmitDone() error { return e.emit("[DONE]") } -// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端。 +// EmitStreamAssistantText 从 StreamReader 逐 chunk 读取并实时推送 assistant 正文。 +// +// 职责边界: +// 1. 负责把 StreamReader 的每个 chunk 实时转换为 SSE payload 推送; +// 2. 负责累计完整文本并返回,供调用方写入 history; +// 3. 不负责打开/关闭 StreamReader,调用方负责生命周期管理。 +func (e *ChunkEmitter) EmitStreamAssistantText( + ctx context.Context, + reader infrallm.StreamReader, + blockID, stage string, +) (string, error) { + if e == nil || reader == nil { + return "", nil + } + + var fullText strings.Builder + firstChunk := true + + for { + chunk, err := reader.Recv() + if err != nil { + if err == io.EOF { + break + } + return fullText.String(), err + } + + // 推送 reasoning content。 + if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { + if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil { + return fullText.String(), emitErr + } + firstChunk = false + } + + // 推送 assistant 正文。 + if chunk != nil && chunk.Content != "" { + if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil { + return fullText.String(), emitErr + } + fullText.WriteString(chunk.Content) + firstChunk = false + } + } + + return fullText.String(), nil +} + +// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取并实时推送 reasoning 文字。 +// +// 与 EmitStreamAssistantText 结构相同,但只推送 ReasoningContent,不推送 Content。 +// 用于只需展示思考过程而无需展示正文的场景。 +func (e *ChunkEmitter) EmitStreamReasoningText( + ctx context.Context, + reader infrallm.StreamReader, + blockID, stage string, +) (string, error) { + if e == nil || reader == nil { + return "", nil + } + + var fullText strings.Builder + firstChunk := true + + for { + chunk, err := reader.Recv() + if err != nil { + if err == io.EOF { + break + } + return fullText.String(), err + } + + if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { + if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil { + return fullText.String(), emitErr + } + fullText.WriteString(chunk.ReasoningContent) + firstChunk = false + } + } + + return fullText.String(), nil +} + +// EmitStageAsReasoning 把"阶段提示"伪装成 reasoning chunk 推给前端。 // // 兼容说明: // 1. 保留旧函数签名,方便当前旧链路直接复用; @@ -378,7 +466,7 @@ func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, crea // EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。 // // 注意: -// 1. 这里保持“整段发”,不主动切块; +// 1. 这里保持"整段发",不主动切块; // 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText; // 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。 func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error { @@ -493,7 +581,7 @@ func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options return nil } -// SplitPseudoStreamText 按“标点优先、长度兜底”的策略切分整段文本。 +// SplitPseudoStreamText 按"标点优先、长度兜底"的策略切分整段文本。 // // 步骤说明: // 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅; diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 7a4c032..5c1cb6a 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -90,6 +90,12 @@ func normalizeConversationID(chatID string) string { return trimmed } +// thinkingModeToBool 将前端传入的 thinking 模式转换为旧链路所需的 bool 值。 +// 仅 "true" 返回 true,其余("false"/"auto"/"")均返回 false。 +func thinkingModeToBool(mode string) bool { + return strings.TrimSpace(strings.ToLower(mode)) == "true" +} + // pickChatModel 根据请求选择模型。 // 当前约定: // - strategist:策略模型; @@ -569,7 +575,7 @@ func (s *AgentService) runNormalChatFlow( s.ensureConversationTitleAsync(userID, chatID) } -func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) { +func (s *AgentService) AgentChat(ctx context.Context, userMessage string, thinkingMode string, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) { requestStart := time.Now() traceID := uuid.NewString() @@ -578,7 +584,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin go func() { defer close(outChan) - s.runNewAgentGraph(ctx, userMessage, ifThinking, modelName, userID, chatID, extra, traceID, requestStart, outChan, errChan) + s.runNewAgentGraph(ctx, userMessage, thinkingMode, modelName, userID, chatID, extra, traceID, requestStart, outChan, errChan) }() return outChan, errChan @@ -586,7 +592,8 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin // agentChatOld 是旧路由逻辑的备份,暂时保留供回滚使用。 // TODO: 新 graph 稳定后删除。 -func (s *AgentService) agentChatOld(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) { +func (s *AgentService) agentChatOld(ctx context.Context, userMessage string, thinkingMode string, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) { + ifThinking := thinkingModeToBool(thinkingMode) requestStart := time.Now() traceID := uuid.NewString() diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go index 26c49e1..c55a4d3 100644 --- a/backend/service/agentsvc/agent_newagent.go +++ b/backend/service/agentsvc/agent_newagent.go @@ -44,7 +44,7 @@ const ( func (s *AgentService) runNewAgentGraph( ctx context.Context, userMessage string, - ifThinking bool, + thinkingMode string, modelName string, userID int, chatID string, @@ -113,9 +113,11 @@ func (s *AgentService) runNewAgentGraph( // 5.1.2 检索失败只降级为“本轮不注入记忆”,不阻断主链路。 s.injectMemoryContext(requestCtx, conversationContext, userID, chatID, userMessage) - // 5.5 若 extra 携带 task_class_ids,校验后写入 CommonState(仅首轮/尚未设置时生效,跨轮持久化)。 - // 校验:通过 LoadTaskClassMetas → GetCompleteTaskClassesByIDs 检查所有 ID 是否存在且属于当前用户; - // 校验失败时向 errChan 推送 WrongTaskClassID(code=40040),前端收到 SSE 错误事件。 + // 5.5 将前端传入的 thinkingMode 写入 CommonState,供 ChatNode 及下游节点读取。 + cs := runtimeState.EnsureCommonState() + cs.ThinkingMode = thinkingMode + + // 5.6 若 extra 携带 task_class_ids,校验后写入 CommonState(仅首轮/尚未设置时生效,跨轮持久化)。 if taskClassIDs := readAgentExtraIntSlice(extra, "task_class_ids"); len(taskClassIDs) > 0 { cs := runtimeState.EnsureCommonState() if len(cs.TaskClassIDs) == 0 { @@ -186,7 +188,7 @@ func (s *AgentService) runNewAgentGraph( pushErrNonBlocking(errChan, fmt.Errorf("graph 执行失败: %w", graphErr)) // Graph 出错时回退普通聊天,保证可用性。 - s.runNormalChatFlow(requestCtx, s.AIHub.Worker, resolvedModelName, userMessage, "", nil, retryMeta, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan) + s.runNormalChatFlow(requestCtx, s.AIHub.Worker, resolvedModelName, userMessage, "", nil, retryMeta, thinkingModeToBool(thinkingMode), userID, chatID, traceID, requestStart, outChan, errChan) return }