diff --git a/backend/model/agent.go b/backend/model/agent.go
index b0439b6..8304216 100644
--- a/backend/model/agent.go
+++ b/backend/model/agent.go
@@ -58,7 +58,7 @@ type UserSendMessageRequest struct {
ConversationID string `json:"conversation_id,omitempty"`
Message string `json:"message" binding:"required"`
Model string `json:"model,omitempty"`
- Thinking bool `json:"thinking,omitempty"`
+ Thinking string `json:"thinking,omitempty"`
Extra map[string]any `json:"extra,omitempty"`
}
diff --git a/backend/newAgent/model/chat_contract.go b/backend/newAgent/model/chat_contract.go
index 90fbc2c..00ff2b8 100644
--- a/backend/newAgent/model/chat_contract.go
+++ b/backend/newAgent/model/chat_contract.go
@@ -26,18 +26,19 @@ const (
//
// 职责边界:
// 1. Route 决定后续处理路径;
-// 2. Speak 始终填写:给用户看的话;
-// 3. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true;
-// 4. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效;
-// 5. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true;
-// 6. Reason 给后端和日志看。
+// 2. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true;
+// 3. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效;
+// 4. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true;
+// 5. Thinking 表示下游 Execute 节点是否应开启深度思考;
+// 6. Raw 保留控制码原文,供日志排查;
+// 7. 用户可见内容(speak)由流式输出自然产出,不由本结构承载。
type ChatRoutingDecision struct {
- Route ChatRoute `json:"route"`
- Speak string `json:"speak,omitempty"`
- NeedsRoughBuild bool `json:"needs_rough_build,omitempty"`
- NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"`
- AllowReorder bool `json:"allow_reorder,omitempty"`
- Reason string `json:"reason,omitempty"`
+ Route ChatRoute
+ NeedsRoughBuild bool
+ NeedsRefineAfterRoughBuild bool
+ AllowReorder bool
+ Thinking bool
+ Raw string
}
// Normalize 统一清洗路由决策中的字符串字段。
@@ -46,8 +47,7 @@ func (d *ChatRoutingDecision) Normalize() {
return
}
d.Route = ChatRoute(strings.TrimSpace(string(d.Route)))
- d.Speak = strings.TrimSpace(d.Speak)
- d.Reason = strings.TrimSpace(d.Reason)
+ d.Raw = strings.TrimSpace(d.Raw)
}
// Validate 校验路由决策的最小合法性。
@@ -67,16 +67,12 @@ func (d *ChatRoutingDecision) Validate() error {
return fmt.Errorf("未知 route: %s", d.Route)
}
- // direct_reply 必须有 speak。
- if d.Route == ChatRouteDirectReply && d.Speak == "" {
- return fmt.Errorf("direct_reply 必须携带 speak")
- }
-
// 非 execute 路由不应携带粗排和粗排后微调标记,统一归一化为 false。
if d.Route != ChatRouteExecute {
d.NeedsRoughBuild = false
d.NeedsRefineAfterRoughBuild = false
d.AllowReorder = false
+ d.Thinking = false
}
// 只有 needs_rough_build=true 时,needs_refine_after_rough_build 才有语义。
if !d.NeedsRoughBuild {
diff --git a/backend/newAgent/model/common_state.go b/backend/newAgent/model/common_state.go
index 2e9a661..ae15fda 100644
--- a/backend/newAgent/model/common_state.go
+++ b/backend/newAgent/model/common_state.go
@@ -30,7 +30,7 @@ const (
FlowTerminalStatusExhausted FlowTerminalStatus = "exhausted"
)
-// FlowTerminalOutcome 保存“流程为什么结束”的最终结果快照。
+// FlowTerminalOutcome 保存"流程为什么结束"的最终结果快照。
//
// 职责边界:
// 1. Stage 说明终止发生在哪个阶段,便于 graph/deliver/debug 统一收口;
@@ -97,7 +97,7 @@ type CommonState struct {
// NeedsRoughBuild 由 Plan 节点在 plan_done 时写入,标记 Confirm 后是否需要走粗排节点。
// 粗排节点执行完毕后会将此字段重置为 false。
NeedsRoughBuild bool `json:"needs_rough_build,omitempty"`
- // NeedsRefineAfterRoughBuild 表示“粗排完成后是否需要立即进入微调”。
+ // NeedsRefineAfterRoughBuild 表示"粗排完成后是否需要立即进入微调"。
//
// 说明:
// 1. 该标记主要用于 chat->execute 的直执行链路;
@@ -105,13 +105,21 @@ type CommonState struct {
// 3. false 表示用户仅要求完成排入,粗排成功后可直接收口,等待后续再优化。
NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"`
// AllowReorder 表示本轮是否允许打乱 suggested 任务的相对顺序。
- // 默认 false,只有用户明确说明“可以打乱顺序/顺序不重要”才会为 true。
+ // 默认 false,只有用户明确说明"可以打乱顺序/顺序不重要"才会为 true。
AllowReorder bool `json:"allow_reorder,omitempty"`
- // SuggestedOrderBaseline 保存“本轮 execute 启动前”的 suggested 任务相对顺序基线。
+ // SuggestedOrderBaseline 保存"本轮 execute 启动前"的 suggested 任务相对顺序基线。
// OrderGuard 节点会基于该基线判断微调是否破坏顺序约束。
SuggestedOrderBaseline []int `json:"suggested_order_baseline,omitempty"`
- // TerminalOutcome 保存“本轮流程最终如何结束”的统一收口结果。
+ // ExecuteThinking 由 Chat 路由决策传入,表示 Execute 节点是否应开启深度思考。
+ // 预埋字段,当前阶段 Execute 节点可自行决定是否读取。
+ ExecuteThinking bool `json:"execute_thinking,omitempty"`
+
+ // ThinkingMode 由前端传入,控制所有下游 LLM 调用的 thinking 行为。
+ // "true" 强制开启,"false" 强制关闭,"auto"(默认)交给路由决策。
+ ThinkingMode string `json:"thinking_mode,omitempty"`
+
+ // TerminalOutcome 保存"本轮流程最终如何结束"的统一收口结果。
// 第二轮开始,rough_build / execute / deliver 都应围绕这份快照判断收口语义。
TerminalOutcome *FlowTerminalOutcome `json:"terminal_outcome,omitempty"`
}
@@ -184,12 +192,12 @@ func (s *CommonState) RejectPlan() {
s.ClearTerminalOutcome()
}
-// ResetForNextRun 在“上一轮已经收口,且本轮准备开始新请求”时重置执行期临时状态。
+// ResetForNextRun 在"上一轮已经收口,且本轮准备开始新请求"时重置执行期临时状态。
//
// 职责边界:
// 1. 负责清理会污染新一轮执行的临时字段(轮次、修正计数、计划游标、粗排开关、顺序基线、终止结果);
// 2. 不负责清理会话身份与跨轮共享数据(ConversationID/UserID/TaskClassIDs/TaskClasses/历史上下文/ScheduleState);
-// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在“加载兜底 + chat 入口”双保险场景下复用。
+// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在"加载兜底 + chat 入口"双保险场景下复用。
func (s *CommonState) ResetForNextRun() {
if s == nil {
return
@@ -237,7 +245,7 @@ func (s *CommonState) Done() {
}
}
-// Abort 将当前流程标记为“业务语义上的主动终止”。
+// Abort 将当前流程标记为"业务语义上的主动终止"。
//
// 步骤说明:
// 1. 统一写入 PhaseDone,保证 graph 后续直接进入 deliver 收口;
@@ -255,7 +263,7 @@ func (s *CommonState) Abort(stage, code, userMessage, internalReason string) {
s.TerminalOutcome.Normalize()
}
-// Exhaust 将当前流程标记为“安全边界触发的被动停止”。
+// Exhaust 将当前流程标记为"安全边界触发的被动停止"。
func (s *CommonState) Exhaust(stage, userMessage, internalReason string) {
s.Phase = PhaseDone
s.TerminalOutcome = &FlowTerminalOutcome{
@@ -289,17 +297,17 @@ func (s *CommonState) TerminalStatus() FlowTerminalStatus {
return s.TerminalOutcome.Status
}
-// IsCompleted 判断当前是否属于“正常完成”。
+// IsCompleted 判断当前是否属于"正常完成"。
func (s *CommonState) IsCompleted() bool {
return s.TerminalStatus() == FlowTerminalStatusCompleted
}
-// IsAborted 判断当前是否属于“主动中止”。
+// IsAborted 判断当前是否属于"主动中止"。
func (s *CommonState) IsAborted() bool {
return s.TerminalStatus() == FlowTerminalStatusAborted
}
-// IsExhaustedTerminal 判断当前是否属于“轮次耗尽收口”。
+// IsExhaustedTerminal 判断当前是否属于"轮次耗尽收口"。
func (s *CommonState) IsExhaustedTerminal() bool {
return s.TerminalStatus() == FlowTerminalStatusExhausted
}
diff --git a/backend/newAgent/node/chat.go b/backend/newAgent/node/chat.go
index 967286b..2204920 100644
--- a/backend/newAgent/node/chat.go
+++ b/backend/newAgent/node/chat.go
@@ -3,15 +3,18 @@ package newagentnode
import (
"context"
"fmt"
+ "io"
"log"
"strings"
"time"
"github.com/cloudwego/eino/schema"
+ "github.com/google/uuid"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
+ newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
)
@@ -21,7 +24,7 @@ const (
chatSpeakBlockID = "chat.speak"
// chatHistoryKindKey 用于在 history 中打运行态标记,供 prompt 层做上下文分层。
chatHistoryKindKey = "newagent_history_kind"
- // chatHistoryKindExecuteLoopClosed 表示“上一轮 execute loop 已正常收口”。
+ // chatHistoryKindExecuteLoopClosed 表示"上一轮 execute loop 已正常收口"。
// prompt 侧会据此把旧 loop 归档到 msg1,而不是继续占用 msg2 窗口。
chatHistoryKindExecuteLoopClosed = "execute_loop_closed"
)
@@ -75,9 +78,9 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error {
if !runtimeState.HasPendingInteraction() && flowState.Phase == newagentmodel.PhaseDone {
terminalBefore := flowState.TerminalStatus()
roundBefore := flowState.RoundUsed
- // 1. 只有“正常完成(completed)”才打 loop 收口标记:
- // 1.1 这样下一轮进入 execute 时,msg2 会只保留“当前活跃循环”窗口;
- // 1.2 异常收口(exhausted/aborted)不打标记,允许后续“继续”时沿用上一轮 loop 轨迹。
+ // 1. 只有"正常完成(completed)"才打 loop 收口标记:
+ // 1.1 这样下一轮进入 execute 时,msg2 会只保留"当前活跃循环"窗口;
+ // 1.2 异常收口(exhausted/aborted)不打标记,允许后续"继续"时沿用上一轮 loop 轨迹。
if terminalBefore == newagentmodel.FlowTerminalStatusCompleted {
appendExecuteLoopClosedMarker(conversationContext)
}
@@ -89,86 +92,28 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error {
terminalBefore,
)
}
- messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState)
+ nonce := uuid.NewString()
+ messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState, nonce)
- decision, rawResult, err := infrallm.GenerateJSON[newagentmodel.ChatRoutingDecision](
- ctx,
- input.Client,
- messages,
- infrallm.GenerateOptions{
- Temperature: 0.1,
- MaxTokens: 500,
- Thinking: infrallm.ThinkingModeDisabled,
- Metadata: map[string]any{
- "stage": chatStageName,
- "phase": "routing",
- },
+ reader, err := input.Client.Stream(ctx, messages, infrallm.GenerateOptions{
+ Temperature: 0.7,
+ Thinking: infrallm.ThinkingModeDisabled,
+ Metadata: map[string]any{
+ "stage": chatStageName,
+ "phase": "routing",
},
- )
-
- rawText := ""
- if rawResult != nil {
- rawText = strings.TrimSpace(rawResult.Text)
- }
-
+ })
if err != nil {
- // 路由失败 → 保守:走 plan。
- log.Printf("[WARN] chat routing LLM failed chat=%s raw=%s err=%v",
- flowState.ConversationID, rawText, err)
+ log.Printf("[WARN] chat routing stream failed chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
- if validateErr := decision.Validate(); validateErr != nil {
- log.Printf("[WARN] chat routing decision invalid chat=%s raw=%s err=%v",
- flowState.ConversationID, rawText, validateErr)
- flowState.Phase = newagentmodel.PhasePlanning
- return nil
- }
-
- // 1. 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求“重新粗排”,
- // 则强制关闭 needs_rough_build,避免“微调请求被误判成再次粗排”。
- // 2. 该闸门只收紧粗排开关,不改路由 route,确保 execute 微调链路仍可继续。
- // 3. 一旦用户明确表达“从头重排/重新粗排”,仍允许 needs_rough_build=true 生效。
- if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) {
- decision.NeedsRoughBuild = false
- decision.NeedsRefineAfterRoughBuild = false
- }
-
- log.Printf(
- "[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v has_rough_build_done=%v task_class_count=%d reason=%s",
- flowState.ConversationID,
- decision.Route,
- decision.NeedsRoughBuild,
- decision.NeedsRefineAfterRoughBuild,
- decision.AllowReorder,
- hasRoughBuildDoneMarker(conversationContext),
- len(flowState.TaskClassIDs),
- decision.Reason,
- )
- flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder)
-
- // 3. 按路由决策推进。
- switch decision.Route {
- case newagentmodel.ChatRouteDirectReply:
- return handleDirectReply(ctx, decision, conversationContext, emitter, flowState)
-
- case newagentmodel.ChatRouteExecute:
- return handleRouteExecute(decision, emitter, flowState)
-
- case newagentmodel.ChatRouteDeepAnswer:
- return handleDeepAnswer(ctx, input, decision, conversationContext, emitter, flowState)
-
- case newagentmodel.ChatRoutePlan:
- return handleRoutePlan(decision, emitter, flowState)
-
- default:
- flowState.Phase = newagentmodel.PhasePlanning
- return nil
- }
+ parser := newagentrouter.NewStreamRouteParser(nonce)
+ return streamAndDispatch(ctx, reader, parser, input, emitter, flowState, conversationContext)
}
-// appendExecuteLoopClosedMarker 在 history 中写入“execute loop 已正常收口”标记。
+// appendExecuteLoopClosedMarker 在 history 中写入"execute loop 已正常收口"标记。
//
// 职责边界:
// 1. 只负责写一个轻量 marker,供 prompt 分层;
@@ -207,51 +152,254 @@ func isExecuteLoopClosedMarker(msg *schema.Message) bool {
return strings.TrimSpace(kind) == chatHistoryKindExecuteLoopClosed
}
-// handleDirectReply 处理简单任务:直接输出回复。
-func handleDirectReply(
+// streamAndDispatch 是流式路由分发的核心循环。
+//
+// 步骤说明:
+// 1. 从 StreamReader 逐 chunk 读取,喂给 StreamRouteParser 增量解析控制码;
+// 2. 控制码解析完成后,根据 route 进入对应的流式处理分支;
+// 3. 控制码解析超时或流异常结束 → fallback 到 plan。
+func streamAndDispatch(
ctx context.Context,
- decision *newagentmodel.ChatRoutingDecision,
- conversationContext *newagentmodel.ConversationContext,
+ reader infrallm.StreamReader,
+ parser *newagentrouter.StreamRouteParser,
+ input ChatNodeInput,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
+ conversationContext *newagentmodel.ConversationContext,
) error {
- if strings.TrimSpace(decision.Speak) != "" {
- if err := emitter.EmitPseudoAssistantText(
- ctx, chatSpeakBlockID, chatStageName,
- decision.Speak,
- newagentstream.DefaultPseudoStreamOptions(),
- ); err != nil {
- return fmt.Errorf("闲聊回复推送失败: %w", err)
+ for {
+ chunk, err := reader.Recv()
+ if err == io.EOF {
+ if !parser.RouteReady() {
+ log.Printf("[WARN] chat stream ended before route resolved chat=%s", flowState.ConversationID)
+ flowState.Phase = newagentmodel.PhasePlanning
+ return nil
+ }
+ break
}
- conversationContext.AppendHistory(schema.AssistantMessage(decision.Speak, nil))
+ if err != nil {
+ log.Printf("[WARN] chat stream recv error chat=%s err=%v", flowState.ConversationID, err)
+ flowState.Phase = newagentmodel.PhasePlanning
+ return nil
+ }
+
+ content := ""
+ if chunk != nil {
+ content = chunk.Content
+ }
+
+ visible, routeReady, _ := parser.Feed(content)
+ if !routeReady {
+ continue
+ }
+
+ // 控制码解析完成,进入路由分发。
+ decision := parser.Decision()
+
+ // 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求"重新粗排",
+ // 则强制关闭 needs_rough_build,避免"微调请求被误判成再次粗排"。
+ if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) {
+ decision.NeedsRoughBuild = false
+ decision.NeedsRefineAfterRoughBuild = false
+ }
+
+ log.Printf(
+ "[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v thinking=%v has_rough_build_done=%v task_class_count=%d raw=%s",
+ flowState.ConversationID,
+ decision.Route,
+ decision.NeedsRoughBuild,
+ decision.NeedsRefineAfterRoughBuild,
+ decision.AllowReorder,
+ decision.Thinking,
+ hasRoughBuildDoneMarker(conversationContext),
+ len(flowState.TaskClassIDs),
+ decision.Raw,
+ )
+
+ flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder)
+ effectiveThinking := resolveEffectiveThinking(flowState.ThinkingMode, decision.Thinking)
+
+ switch decision.Route {
+ case newagentmodel.ChatRouteDirectReply:
+ return handleDirectReplyStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking, visible)
+
+ case newagentmodel.ChatRouteExecute:
+ return handleRouteExecuteStream(reader, emitter, flowState, decision, input.UserInput, effectiveThinking, visible)
+
+ case newagentmodel.ChatRouteDeepAnswer:
+ return handleDeepAnswerStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking)
+
+ case newagentmodel.ChatRoutePlan:
+ return handleRoutePlanStream(reader, emitter, flowState, effectiveThinking, visible)
+
+ default:
+ flowState.Phase = newagentmodel.PhasePlanning
+ return nil
+ }
+ }
+ return nil
+}
+
+// resolveEffectiveThinking 根据前端 ThinkingMode 和路由决策合并出最终 thinking 状态。
+//
+// 规则:
+// - "true" 强制开启;
+// - "false" 强制关闭;
+// - "auto"/"" 交给路由决策的 decisionThinking。
+func resolveEffectiveThinking(mode string, decisionThinking bool) bool {
+ switch strings.TrimSpace(strings.ToLower(mode)) {
+ case "true":
+ return true
+ case "false":
+ return false
+ default:
+ return decisionThinking
+ }
+}
+
+// handleDirectReplyStream 处理闲聊回复。
+//
+// 两种模式:
+// 1. thinking=false:同一流续传,逐 chunk 推送;
+// 2. thinking=true:关闭路由流,发起第二次 thinking 流式调用。
+func handleDirectReplyStream(
+ ctx context.Context,
+ reader infrallm.StreamReader,
+ input ChatNodeInput,
+ emitter *newagentstream.ChunkEmitter,
+ conversationContext *newagentmodel.ConversationContext,
+ flowState *newagentmodel.CommonState,
+ effectiveThinking bool,
+ firstVisible string,
+) error {
+ if effectiveThinking {
+ return handleThinkingReplyStream(ctx, reader, input, emitter, conversationContext, flowState)
+ }
+ return handleDirectReplyContinueStream(ctx, reader, emitter, conversationContext, flowState, firstVisible)
+}
+
+// handleThinkingReplyStream 处理需要思考的回复:关闭路由流 → 第二次 thinking 流式调用。
+func handleThinkingReplyStream(
+ ctx context.Context,
+ reader infrallm.StreamReader,
+ input ChatNodeInput,
+ emitter *newagentstream.ChunkEmitter,
+ conversationContext *newagentmodel.ConversationContext,
+ flowState *newagentmodel.CommonState,
+) error {
+ _ = reader.Close()
+
+ deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput)
+ deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{
+ Temperature: 0.5,
+ MaxTokens: 2000,
+ Thinking: infrallm.ThinkingModeEnabled,
+ Metadata: map[string]any{
+ "stage": chatStageName,
+ "phase": "direct_reply_thinking",
+ },
+ })
+ if err != nil {
+ log.Printf("[WARN] thinking reply stream failed chat=%s err=%v", flowState.ConversationID, err)
+ flowState.Phase = newagentmodel.PhaseChatting
+ return nil
+ }
+
+ deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName)
+ _ = deepReader.Close()
+ if err != nil {
+ log.Printf("[WARN] thinking reply emit error chat=%s err=%v", flowState.ConversationID, err)
+ flowState.Phase = newagentmodel.PhaseChatting
+ return nil
+ }
+
+ deepText = strings.TrimSpace(deepText)
+ if deepText != "" {
+ conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil))
}
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
-// handleRouteExecute 处理中等任务:推送简短确认,设 PhaseExecuting。
+// handleDirectReplyContinueStream 处理无思考的闲聊:同一流续传。
+func handleDirectReplyContinueStream(
+ ctx context.Context,
+ reader infrallm.StreamReader,
+ emitter *newagentstream.ChunkEmitter,
+ conversationContext *newagentmodel.ConversationContext,
+ flowState *newagentmodel.CommonState,
+ firstVisible string,
+) error {
+ var fullText strings.Builder
+ fullText.WriteString(firstVisible)
+
+ // 推送控制码之后的第一段内容。
+ if strings.TrimSpace(firstVisible) != "" {
+ if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, firstVisible, true); err != nil {
+ return fmt.Errorf("闲聊回复推送失败: %w", err)
+ }
+ }
+
+ firstChunk := firstVisible == ""
+ // 继续读同一个流,逐 chunk 推送。
+ for {
+ chunk, err := reader.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Printf("[WARN] direct_reply stream error chat=%s err=%v", flowState.ConversationID, err)
+ break
+ }
+ if chunk == nil || chunk.Content == "" {
+ continue
+ }
+ if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, chunk.Content, firstChunk); err != nil {
+ return fmt.Errorf("闲聊回复推送失败: %w", err)
+ }
+ fullText.WriteString(chunk.Content)
+ firstChunk = false
+ }
+
+ text := fullText.String()
+ if strings.TrimSpace(text) != "" {
+ conversationContext.AppendHistory(schema.AssistantMessage(text, nil))
+ }
+
+ flowState.Phase = newagentmodel.PhaseChatting
+ return nil
+}
+
+// handleRouteExecuteStream 处理工具调用路由:推送状态确认 → 设 PhaseExecuting。
//
-// 不把 speak 写入 history,因为真正的回复由 Execute 节点产出。
-func handleRouteExecute(
- decision *newagentmodel.ChatRoutingDecision,
+// 说明:
+// 1. 关闭路由流(后续内容不需要);
+// 2. 推送轻量状态通知;
+// 3. 设置流程状态,进入 Execute 或 RoughBuild。
+func handleRouteExecuteStream(
+ reader infrallm.StreamReader,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
+ decision *newagentmodel.ChatRoutingDecision,
+ userInput string,
+ effectiveThinking bool,
+ speak string,
) error {
- speak := strings.TrimSpace(decision.Speak)
- if speak == "" {
+ // 关闭路由流。
+ _ = reader.Close()
+
+ if strings.TrimSpace(speak) == "" {
speak = "好的,我来处理。"
}
- // 推送轻量状态通知,让前端知道请求已接收。
+ // 推送轻量状态通知。
_ = emitter.EmitStatus(chatStatusBlockID, chatStageName, "accepted", speak, false)
- // 清空旧 PlanSteps 并设 PhaseExecuting,避免上一次任务残留的步骤被 HasPlan() 误判。
+ // 清空旧 PlanSteps 并设 PhaseExecuting。
flowState.StartDirectExecute()
- // 1. 默认不走粗排与粗排后微调,避免沿用上轮遗留标记。
- // 2. 只有 route 判定为“需要粗排”且确实有 task_class_ids 时,才打开粗排开关。
- // 3. 粗排后是否立即进入微调,完全由路由决策显式标记控制。
+ // 粗排开关逻辑。
flowState.NeedsRoughBuild = false
flowState.NeedsRefineAfterRoughBuild = false
if decision.NeedsRoughBuild && len(flowState.TaskClassIDs) > 0 {
@@ -259,15 +407,17 @@ func handleRouteExecute(
flowState.NeedsRefineAfterRoughBuild = decision.NeedsRefineAfterRoughBuild
}
+ flowState.ExecuteThinking = effectiveThinking
+
return nil
}
-// resolveAllowReorder 统一计算“本轮是否允许打乱顺序”。
+// resolveAllowReorder 统一计算"本轮是否允许打乱顺序"。
//
// 步骤化说明:
// 1. 后端先做显式语义判定:用户明确允许/明确禁止时,直接以后端判定为准;
// 2. 若后端未识别到显式语义,再回退到路由模型的 allow_reorder 字段;
-// 3. 默认返回 false,确保“保持顺序”是系统默认行为。
+// 3. 默认返回 false,确保"保持顺序"是系统默认行为。
func resolveAllowReorder(userInput string, modelAllowReorder bool) bool {
switch detectReorderPreference(userInput) {
case reorderAllow:
@@ -279,11 +429,11 @@ func resolveAllowReorder(userInput string, modelAllowReorder bool) bool {
}
}
-// detectReorderPreference 识别用户是否“明确授权打乱顺序”。
+// detectReorderPreference 识别用户是否"明确授权打乱顺序"。
//
// 职责边界:
// 1. 只负责关键词级别的显式意图识别,不做复杂语义推理;
-// 2. 若同时命中“允许”与“禁止”,优先按“禁止”处理,避免误放开顺序约束;
+// 2. 若同时命中"允许"与"禁止",优先按"禁止"处理,避免误放开顺序约束;
// 3. 未命中显式表达时返回 unknown,交给上层兜底策略。
func detectReorderPreference(userInput string) reorderPreference {
text := strings.ToLower(strings.TrimSpace(userInput))
@@ -332,12 +482,12 @@ func containsAnyPhrase(text string, phrases []string) bool {
return false
}
-// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭“再次粗排”。
+// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭"再次粗排"。
//
// 判定规则:
// 1. 当前决策未请求粗排时,直接不干预;
// 2. 上下文不存在 rough_build_done 时,不干预(首次粗排仍可走);
-// 3. 若用户未明确要求“重新粗排/从头重排”,则关闭粗排开关,避免误触发。
+// 3. 若用户未明确要求"重新粗排/从头重排",则关闭粗排开关,避免误触发。
func shouldDisableRoughBuildForRefine(
conversationContext *newagentmodel.ConversationContext,
userInput string,
@@ -364,7 +514,7 @@ func hasRoughBuildDoneMarker(conversationContext *newagentmodel.ConversationCont
return false
}
-// isExplicitRoughBuildRequest 识别用户是否明确要求“重新粗排/从头重排”。
+// isExplicitRoughBuildRequest 识别用户是否明确要求"重新粗排/从头重排"。
func isExplicitRoughBuildRequest(userInput string) bool {
text := strings.ToLower(strings.TrimSpace(userInput))
if text == "" {
@@ -388,80 +538,81 @@ func isExplicitRoughBuildRequest(userInput string) bool {
return containsAnyPhrase(text, keywords)
}
-// handleDeepAnswer 处理复杂问答:推送过渡语 → 原地开 thinking 再调一次 LLM → 输出深度回答。
-func handleDeepAnswer(
+// handleDeepAnswerStream 处理复杂问答:关闭路由流 → 第二次流式调用。
+//
+// 步骤说明:
+// 1. 关闭第一个路由流;
+// 2. 发起第二次流式 LLM 调用(thinking 由 effectiveThinking 控制);
+// 3. 真流式推送 reasoning + 正文;
+// 4. 完整回复写入 history。
+func handleDeepAnswerStream(
ctx context.Context,
+ reader infrallm.StreamReader,
input ChatNodeInput,
- decision *newagentmodel.ChatRoutingDecision,
- conversationContext *newagentmodel.ConversationContext,
emitter *newagentstream.ChunkEmitter,
+ conversationContext *newagentmodel.ConversationContext,
flowState *newagentmodel.CommonState,
+ effectiveThinking bool,
) error {
- // 1. 推送过渡语。
- briefSpeak := strings.TrimSpace(decision.Speak)
- if briefSpeak == "" {
- briefSpeak = "让我想想。"
- }
- if err := emitter.EmitPseudoAssistantText(
- ctx, chatSpeakBlockID, chatStageName,
- briefSpeak,
- newagentstream.DefaultPseudoStreamOptions(),
- ); err != nil {
- return fmt.Errorf("过渡文案推送失败: %w", err)
- }
+ // 1. 关闭第一个路由流。
+ _ = reader.Close()
- // 2. 第二次 LLM 调用:开 thinking,深度回答。
+ // 2. 第二次流式调用。
+ thinkingOpt := infrallm.ThinkingModeDisabled
+ if effectiveThinking {
+ thinkingOpt = infrallm.ThinkingModeEnabled
+ }
deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput)
- deepResult, err := input.Client.GenerateText(ctx, deepMessages, infrallm.GenerateOptions{
+ deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{
Temperature: 0.5,
MaxTokens: 2000,
- Thinking: infrallm.ThinkingModeEnabled,
+ Thinking: thinkingOpt,
Metadata: map[string]any{
"stage": chatStageName,
"phase": "deep_answer",
},
})
-
- if err != nil || deepResult == nil {
- // 深度回答失败 → 降级,只保留过渡语。
- log.Printf("[WARN] deep answer LLM failed chat=%s err=%v", flowState.ConversationID, err)
- conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil))
+ if err != nil {
+ // 深度回答失败 → 降级返回。
+ log.Printf("[WARN] deep answer stream failed chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
- // 3. 输出深度回答。
- deepText := strings.TrimSpace(deepResult.Text)
+ // 3. 真流式推送 reasoning + 正文。
+ deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName)
+ _ = deepReader.Close()
+ if err != nil {
+ log.Printf("[WARN] deep answer stream emit error chat=%s err=%v", flowState.ConversationID, err)
+ flowState.Phase = newagentmodel.PhaseChatting
+ return nil
+ }
+
+ deepText = strings.TrimSpace(deepText)
if deepText == "" {
- conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil))
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
- if err := emitter.EmitPseudoAssistantText(
- ctx, chatSpeakBlockID, chatStageName,
- deepText,
- newagentstream.DefaultPseudoStreamOptions(),
- ); err != nil {
- return fmt.Errorf("深度回答推送失败: %w", err)
- }
-
- // 将完整回复(过渡语 + 深度回答)写入 history。
- fullReply := briefSpeak + "\n\n" + deepText
- conversationContext.AppendHistory(schema.AssistantMessage(fullReply, nil))
+ // 4. 完整回复写入 history。
+ conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil))
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
-// handleRoutePlan 处理复杂规划:推送确认语,设 PhasePlanning。
-func handleRoutePlan(
- decision *newagentmodel.ChatRoutingDecision,
+// handleRoutePlanStream 处理规划路由:推送状态确认 → 设 PhasePlanning。
+func handleRoutePlanStream(
+ reader infrallm.StreamReader,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
+ effectiveThinking bool,
+ speak string,
) error {
- speak := strings.TrimSpace(decision.Speak)
- if speak == "" {
+ // 关闭路由流。
+ _ = reader.Close()
+
+ if strings.TrimSpace(speak) == "" {
speak = "好的,让我来规划一下。"
}
diff --git a/backend/newAgent/prompt/chat.go b/backend/newAgent/prompt/chat.go
index 6c33635..1dbe63f 100644
--- a/backend/newAgent/prompt/chat.go
+++ b/backend/newAgent/prompt/chat.go
@@ -3,62 +3,71 @@ package newagentprompt
import (
"fmt"
"strings"
+ "time"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/cloudwego/eino/schema"
)
const chatRoutingSystemPrompt = `
-你是 SmartFlow 的智能路由器。你的职责是判断用户意图的复杂度,并决定后续处理路径。
-
-你会看到:
-- 历史对话
-- 用户本轮输入
-- 当前可用工具摘要(如有)
-- 本次排课涉及的任务类约束(如有)
-
-请遵守以下规则:
-1. 只输出严格 JSON,不要输出 markdown,不要输出额外解释。
-2. 根据用户意图判断复杂度并选择路由。
-3. speak 字段始终填写:给用户看的话。
+你是 SmartFlow 的智能路由器。你的回复必须以路由控制码开头,控制码后紧跟用户可见的内容。
路由规则:
-- direct_reply:纯闲聊、简单问答、打招呼、感谢等。speak 直接写你的完整回复。
-- execute:需要用工具处理的请求(查询日程、移动课程、排课等),但不需要先制定计划。speak 写简短确认。
-- deep_answer:复杂问题但不需要工具(如分析建议、深度解释等),需要深度思考后直接回答。speak 写过渡语(如"让我想想")。
-- plan:用户明确要求先制定计划,或涉及多阶段复杂规划。speak 写确认语。
+- direct_reply:纯闲聊、简单问答、打招呼、感谢等。控制码后直接输出完整回复。
+- execute:需要用工具处理的请求(查询日程、移动课程、排课等),但不需要先制定计划。控制码后输出简短确认。
+- deep_answer:复杂问题但不需要工具(如分析建议、深度解释等),需要深度思考后回答。控制码后输出过渡语(如"让我想想")。
+- plan:用户明确要求先制定计划,或涉及多阶段复杂规划。控制码后输出简短确认。
-粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 needs_rough_build=true。
+粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 rough_build=true。
二次粗排约束(强约束):
-- 若上下文已出现 rough_build_done,且用户未明确要求“重新粗排/从头重排”,必须设置 needs_rough_build=false。
-- “移动/微调/优化/均匀化/调顺序”等请求默认视为 refine,不得再次触发 rough build。
+- 若上下文已出现 rough_build_done,且用户未明确要求"重新粗排/从头重排",必须设置 rough_build=false。
+- "移动/微调/优化/均匀化/调顺序"等请求默认视为 refine,不得再次触发 rough build。
粗排后微调判断:
-- 仅当 needs_rough_build=true 时才判断 needs_refine_after_rough_build。
-- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 needs_refine_after_rough_build=true。
-- 若用户只要求"先排进去/给初稿",未提出微调目标,设 needs_refine_after_rough_build=false。
+- 仅当 rough_build=true 时才判断 refine。
+- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 refine=true。
+- 若用户只要求"先排进去/给初稿",未提出微调目标,设 refine=false。
顺序授权判断:
-- allow_reorder 仅在用户明确说明“允许打乱顺序/顺序不重要”时才为 true。
-- 用户明确要求“保持顺序/不要打乱”时必须为 false。
+- reorder 仅在用户明确说明"允许打乱顺序/顺序不重要"时才为 true。
+- 用户明确要求"保持顺序/不要打乱"时必须为 false。
- 若用户未明确提及顺序,一律为 false。
+深度思考判断:
+- thinking 仅在 route=execute 时有效。
+- 当用户请求涉及复杂推理、多条件约束、需要深度分析后才能执行的操作时,设 thinking=true。
+- 简单查询、单步操作设 thinking=false。
-输出协议(严格 JSON):
-{"route":"direct_reply / execute / deep_answer / plan","speak":"给用户看的话","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false,"reason":"简短判断依据"}
+输出格式(严格两段式):
+第一段(控制码,用户不可见,后端会截取):
+
+第二段(紧接控制码之后,用户可见):
+根据路由输出对应内容。
+
+属性说明(仅 route=execute 时有效,其余路由省略这些属性):
+- rough_build:是否需要粗排
+- refine:粗排后是否需要微调
+- reorder:是否允许打乱顺序
+- thinking:后续执行阶段是否需要深度思考
合法示例:
-{"route":"direct_reply","speak":"你好!我是 SmartFlow 助手,有什么可以帮你的?","reason":"用户打招呼"}
+
+你好!我是 SmartFlow 助手,有什么可以帮你的?
-{"route":"execute","speak":"好的,我来帮你看看今天的安排。","reason":"需要调用工具查询日程","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false}
+
+好的,我来帮你看看今天的安排。
-{"route":"execute","speak":"好的,我来帮你排课。","reason":"批量排课需求,有任务类 ID,未给微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":false,"allow_reorder":false}
+
+好的,我来帮你排课。
-{"route":"execute","speak":"好的,我来帮你排课并按你的偏好做微调。","reason":"批量排课需求,有任务类 ID,且给出明确微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":true,"allow_reorder":false}
+
+好的,我来帮你排课并按你的偏好做微调。
-{"route":"execute","speak":"好的,我按你的要求重排。","reason":"用户明确允许打乱顺序","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":true}
+
+这是个好问题,让我仔细想想。
-{"route":"deep_answer","speak":"这是个好问题,让我仔细想想。","reason":"需要深度分析但不需要工具"}
+
+明白,我来帮你制定一个完整的学习计划。
-{"route":"plan","speak":"明白,我来帮你制定一个完整的学习计划。","reason":"用户明确要求制定计划"}
+禁止输出任何 JSON、markdown 代码块或额外解释。nonce 必须精确使用给定值。
`
// BuildChatRoutingSystemPrompt 返回路由阶段的系统提示词。
@@ -67,22 +76,21 @@ func BuildChatRoutingSystemPrompt() string {
}
// BuildChatRoutingMessages 组装路由阶段的 messages。
-func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) []*schema.Message {
+func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) []*schema.Message {
return buildStageMessages(
BuildChatRoutingSystemPrompt(),
ctx,
- BuildChatRoutingUserPrompt(ctx, userInput, state),
+ BuildChatRoutingUserPrompt(ctx, userInput, state, nonce),
)
}
// BuildChatRoutingUserPrompt 构造路由阶段的用户提示词。
-func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) string {
+func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) string {
var sb strings.Builder
- sb.WriteString("请判断用户本轮意图的复杂度,并选择最合适的路由。\n")
- sb.WriteString("若 route=execute 且 needs_rough_build=true,请同时判断 needs_refine_after_rough_build:")
- sb.WriteString("只有用户明确提出微调目标时才为 true。\n")
- sb.WriteString("请同时输出 allow_reorder:只有用户明确授权打乱顺序时才为 true,默认 false。\n")
+ sb.WriteString(fmt.Sprintf("nonce=%s\n", nonce))
+ sb.WriteString(fmt.Sprintf("当前时间=%s\n", time.Now().In(time.Local).Format("2006-01-02 15:04")))
+ sb.WriteString("\n请判断用户本轮意图的复杂度,选择最合适的路由,并输出控制码和对应内容。\n")
// 注入任务类上下文(供粗排判断参考)。
if state != nil && len(state.TaskClassIDs) > 0 {
diff --git a/backend/newAgent/router/chat_route.go b/backend/newAgent/router/chat_route.go
new file mode 100644
index 0000000..265eb5b
--- /dev/null
+++ b/backend/newAgent/router/chat_route.go
@@ -0,0 +1,164 @@
+package newagentrouter
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+
+ newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
+)
+
+var (
+ // chatRouteHeaderRegex 从模型流式输出中解析 SMARTFLOW_ROUTE 控制码头部。
+ //
+ // 格式示例:
+ //
+ //
+ // 属性说明:
+ // 1. nonce:防注入校验,必须与调用方传入的 nonce 精确匹配;
+ // 2. route:路由目标(direct_reply / execute / deep_answer / plan);
+ // 3. rough_build:可选,仅 route=execute 时有效,默认 false;
+ // 4. refine:可选,仅 rough_build=true 时有效,默认 false;
+ // 5. reorder:可选,仅 route=execute 时有效,默认 false;
+ // 6. thinking:可选,仅 route=execute 时有效,默认 false。
+ chatRouteHeaderRegex = regexp.MustCompile(
+ `(?is)<\s*SMARTFLOW_ROUTE\b` +
+ `[^>]*\bnonce\s*=\s*["']?([a-zA-Z0-9\-]+)["']?` +
+ `[^>]*\broute\s*=\s*["']?(direct_reply|execute|deep_answer|plan)["']?` +
+ `(?:[^>]*\brough_build\s*=\s*["']?(true|false)["']?)?` +
+ `(?:[^>]*\brefine\s*=\s*["']?(true|false)["']?)?` +
+ `(?:[^>]*\breorder\s*=\s*["']?(true|false)["']?)?` +
+ `(?:[^>]*\bthinking\s*=\s*["']?(true|false)["']?)?` +
+ `[^>]*/\s*>`)
+)
+
+// StreamRouteParser 从 LLM 流式输出中增量提取路由决策。
+//
+// 协议约定:模型输出以 SMARTFLOW_ROUTE 控制码标签开头,标签结束后是用户可见内容。
+// 例如:你好!很高兴见到你...
+//
+// 职责边界:
+// 1. 只负责从流式 chunk 中提取控制码并解析为 ChatRoutingDecision;
+// 2. 不负责推送 SSE chunk,不负责决定后续走哪条链路;
+// 3. 控制码解析失败时标记 fallback,由上层决定降级策略。
+type StreamRouteParser struct {
+ buf strings.Builder
+ nonce string
+ routeFound bool
+ decision *newagentmodel.ChatRoutingDecision
+}
+
+// NewStreamRouteParser 创建流式路由解析器。
+func NewStreamRouteParser(nonce string) *StreamRouteParser {
+ return &StreamRouteParser{
+ nonce: strings.ToLower(strings.TrimSpace(nonce)),
+ }
+}
+
+// Feed 写入一段 chunk content。
+//
+// 返回值:
+// - visible:控制码标签之后的内容(用户可见文本);
+// - routeReady:路由决策是否已确定;
+// - err:解析错误。
+//
+// 调用方应在 routeReady=true 后调用 Decision() 获取路由决策,
+// 并根据 route 进入对应分支处理 visible 及后续 chunk。
+func (p *StreamRouteParser) Feed(content string) (visible string, routeReady bool, err error) {
+ if p.routeFound {
+ // 路由已解析,后续 chunk 直接透传。
+ return content, true, nil
+ }
+
+ p.buf.WriteString(content)
+
+ text := p.buf.String()
+ match := chatRouteHeaderRegex.FindStringSubmatchIndex(text)
+ if match == nil {
+ // 控制码尚未完整,检查是否应该 fallback。
+ if len(text) > 500 {
+ // 超过 500 字符仍未匹配到控制码 -> fallback 到 plan。
+ p.routeFound = true
+ p.decision = &newagentmodel.ChatRoutingDecision{
+ Route: newagentmodel.ChatRoutePlan,
+ Raw: text,
+ }
+ return text, true, fmt.Errorf("控制码解析超时,fallback 到 plan")
+ }
+ return "", false, nil
+ }
+
+ // 提取匹配到的子组。
+ groups := chatRouteHeaderRegex.FindStringSubmatch(text)
+ if len(groups) < 3 {
+ return "", false, fmt.Errorf("控制码正则子组不足: %d", len(groups))
+ }
+
+ // nonce 校验。
+ parsedNonce := strings.ToLower(strings.TrimSpace(groups[1]))
+ if parsedNonce != p.nonce {
+ return "", false, fmt.Errorf("nonce 不匹配: got=%s expected=%s", parsedNonce, p.nonce)
+ }
+
+ // 解析 route。
+ route := newagentmodel.ChatRoute(strings.TrimSpace(groups[2]))
+
+ // 解析可选布尔属性(默认 false)。
+ roughBuild := parseOptionalBool(groups, 3)
+ refine := parseOptionalBool(groups, 4)
+ reorder := parseOptionalBool(groups, 5)
+ thinking := parseOptionalBool(groups, 6)
+
+ p.decision = &newagentmodel.ChatRoutingDecision{
+ Route: route,
+ NeedsRoughBuild: roughBuild,
+ NeedsRefineAfterRoughBuild: refine,
+ AllowReorder: reorder,
+ Thinking: thinking,
+ Raw: groups[0],
+ }
+
+ // 归一化与校验。
+ if validateErr := p.decision.Validate(); validateErr != nil {
+ // 校验失败 -> fallback 到 plan。
+ p.decision.Route = newagentmodel.ChatRoutePlan
+ p.decision.NeedsRoughBuild = false
+ p.decision.NeedsRefineAfterRoughBuild = false
+ p.decision.AllowReorder = false
+ p.decision.Thinking = false
+ }
+
+ p.routeFound = true
+
+ // 控制码标签之后的文本作为 visible 返回。
+ fullMatch := groups[0]
+ tagEndIdx := strings.Index(text, fullMatch)
+ if tagEndIdx >= 0 {
+ afterTag := text[tagEndIdx+len(fullMatch):]
+ // 去掉标签后紧跟的换行符(如果有)。
+ afterTag = strings.TrimPrefix(afterTag, "\r\n")
+ afterTag = strings.TrimPrefix(afterTag, "\n")
+ return afterTag, true, nil
+ }
+
+ return "", true, nil
+}
+
+// RouteReady 返回路由决策是否已确定。
+func (p *StreamRouteParser) RouteReady() bool {
+ return p.routeFound
+}
+
+// Decision 返回已解析的路由决策(RouteReady=true 后可用)。
+func (p *StreamRouteParser) Decision() *newagentmodel.ChatRoutingDecision {
+ return p.decision
+}
+
+// parseOptionalBool 从正则子组中解析可选布尔值。
+// 如果子组不存在或为空,返回 false。
+func parseOptionalBool(groups []string, index int) bool {
+ if index >= len(groups) {
+ return false
+ }
+ return strings.TrimSpace(groups[index]) == "true"
+}
diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go
index a53d5d2..95a6ab4 100644
--- a/backend/newAgent/stream/emitter.go
+++ b/backend/newAgent/stream/emitter.go
@@ -3,19 +3,22 @@ package newagentstream
import (
"context"
"fmt"
+ "io"
"strings"
"time"
+
+ infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
)
// PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。
//
// 说明:
// 1. 这里刻意不用 chan/string 绑死实现;
-// 2. 上层既可以传“写 channel”的函数,也可以传“写 gin stream”的函数;
+// 2. 上层既可以传"写 channel"的函数,也可以传"写 gin stream"的函数;
// 3. 只要签名是 `func(string) error`,都能接进来。
type PayloadEmitter func(payload string) error
-// StageEmitter 是 graph/node 对“当前阶段”进行推送的兼容接口。
+// StageEmitter 是 graph/node 对"当前阶段"进行推送的兼容接口。
//
// 设计说明:
// 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留;
@@ -23,7 +26,7 @@ type PayloadEmitter func(payload string) error
// 3. 这样能兼顾当前兼容性和后续协议升级空间。
type StageEmitter func(stage, detail string)
-// PseudoStreamOptions 描述“整段文字伪流式输出”的切块与节奏配置。
+// PseudoStreamOptions 描述"整段文字伪流式输出"的切块与节奏配置。
//
// 字段语义:
// 1. MinChunkRunes:达到该最小长度后,若命中标点/换行等边界,可提前切块;
@@ -51,7 +54,7 @@ func DefaultPseudoStreamOptions() PseudoStreamOptions {
// ChunkEmitter 是 newAgent 统一的 SSE chunk 发射器。
//
// 职责边界:
-// 1. 负责把“正文 / 思考 / 工具事件 / 确认请求 / 中断提示”统一转换成 OpenAI 兼容 payload;
+// 1. 负责把"正文 / 思考 / 工具事件 / 确认请求 / 中断提示"统一转换成 OpenAI 兼容 payload;
// 2. 负责在必要时把结构化事件附带成 extra,同时给当前前端提供可读的降级文本;
// 3. 不负责决定什么时候发什么,也不负责持久化状态。
type ChunkEmitter struct {
@@ -365,7 +368,92 @@ func (e *ChunkEmitter) EmitDone() error {
return e.emit("[DONE]")
}
-// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端。
+// EmitStreamAssistantText 从 StreamReader 逐 chunk 读取并实时推送 assistant 正文。
+//
+// 职责边界:
+// 1. 负责把 StreamReader 的每个 chunk 实时转换为 SSE payload 推送;
+// 2. 负责累计完整文本并返回,供调用方写入 history;
+// 3. 不负责打开/关闭 StreamReader,调用方负责生命周期管理。
+func (e *ChunkEmitter) EmitStreamAssistantText(
+ ctx context.Context,
+ reader infrallm.StreamReader,
+ blockID, stage string,
+) (string, error) {
+ if e == nil || reader == nil {
+ return "", nil
+ }
+
+ var fullText strings.Builder
+ firstChunk := true
+
+ for {
+ chunk, err := reader.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return fullText.String(), err
+ }
+
+ // 推送 reasoning content。
+ if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
+ if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil {
+ return fullText.String(), emitErr
+ }
+ firstChunk = false
+ }
+
+ // 推送 assistant 正文。
+ if chunk != nil && chunk.Content != "" {
+ if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil {
+ return fullText.String(), emitErr
+ }
+ fullText.WriteString(chunk.Content)
+ firstChunk = false
+ }
+ }
+
+ return fullText.String(), nil
+}
+
+// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取并实时推送 reasoning 文字。
+//
+// 与 EmitStreamAssistantText 结构相同,但只推送 ReasoningContent,不推送 Content。
+// 用于只需展示思考过程而无需展示正文的场景。
+func (e *ChunkEmitter) EmitStreamReasoningText(
+ ctx context.Context,
+ reader infrallm.StreamReader,
+ blockID, stage string,
+) (string, error) {
+ if e == nil || reader == nil {
+ return "", nil
+ }
+
+ var fullText strings.Builder
+ firstChunk := true
+
+ for {
+ chunk, err := reader.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return fullText.String(), err
+ }
+
+ if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
+ if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil {
+ return fullText.String(), emitErr
+ }
+ fullText.WriteString(chunk.ReasoningContent)
+ firstChunk = false
+ }
+ }
+
+ return fullText.String(), nil
+}
+
+// EmitStageAsReasoning 把"阶段提示"伪装成 reasoning chunk 推给前端。
//
// 兼容说明:
// 1. 保留旧函数签名,方便当前旧链路直接复用;
@@ -378,7 +466,7 @@ func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, crea
// EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。
//
// 注意:
-// 1. 这里保持“整段发”,不主动切块;
+// 1. 这里保持"整段发",不主动切块;
// 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText;
// 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。
func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error {
@@ -493,7 +581,7 @@ func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options
return nil
}
-// SplitPseudoStreamText 按“标点优先、长度兜底”的策略切分整段文本。
+// SplitPseudoStreamText 按"标点优先、长度兜底"的策略切分整段文本。
//
// 步骤说明:
// 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅;
diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go
index 7a4c032..5c1cb6a 100644
--- a/backend/service/agentsvc/agent.go
+++ b/backend/service/agentsvc/agent.go
@@ -90,6 +90,12 @@ func normalizeConversationID(chatID string) string {
return trimmed
}
+// thinkingModeToBool 将前端传入的 thinking 模式转换为旧链路所需的 bool 值。
+// 仅 "true" 返回 true,其余("false"/"auto"/"")均返回 false。
+func thinkingModeToBool(mode string) bool {
+ return strings.TrimSpace(strings.ToLower(mode)) == "true"
+}
+
// pickChatModel 根据请求选择模型。
// 当前约定:
// - strategist:策略模型;
@@ -569,7 +575,7 @@ func (s *AgentService) runNormalChatFlow(
s.ensureConversationTitleAsync(userID, chatID)
}
-func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) {
+func (s *AgentService) AgentChat(ctx context.Context, userMessage string, thinkingMode string, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) {
requestStart := time.Now()
traceID := uuid.NewString()
@@ -578,7 +584,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
go func() {
defer close(outChan)
- s.runNewAgentGraph(ctx, userMessage, ifThinking, modelName, userID, chatID, extra, traceID, requestStart, outChan, errChan)
+ s.runNewAgentGraph(ctx, userMessage, thinkingMode, modelName, userID, chatID, extra, traceID, requestStart, outChan, errChan)
}()
return outChan, errChan
@@ -586,7 +592,8 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
// agentChatOld 是旧路由逻辑的备份,暂时保留供回滚使用。
// TODO: 新 graph 稳定后删除。
-func (s *AgentService) agentChatOld(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) {
+func (s *AgentService) agentChatOld(ctx context.Context, userMessage string, thinkingMode string, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) {
+ ifThinking := thinkingModeToBool(thinkingMode)
requestStart := time.Now()
traceID := uuid.NewString()
diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go
index 26c49e1..c55a4d3 100644
--- a/backend/service/agentsvc/agent_newagent.go
+++ b/backend/service/agentsvc/agent_newagent.go
@@ -44,7 +44,7 @@ const (
func (s *AgentService) runNewAgentGraph(
ctx context.Context,
userMessage string,
- ifThinking bool,
+ thinkingMode string,
modelName string,
userID int,
chatID string,
@@ -113,9 +113,11 @@ func (s *AgentService) runNewAgentGraph(
// 5.1.2 检索失败只降级为“本轮不注入记忆”,不阻断主链路。
s.injectMemoryContext(requestCtx, conversationContext, userID, chatID, userMessage)
- // 5.5 若 extra 携带 task_class_ids,校验后写入 CommonState(仅首轮/尚未设置时生效,跨轮持久化)。
- // 校验:通过 LoadTaskClassMetas → GetCompleteTaskClassesByIDs 检查所有 ID 是否存在且属于当前用户;
- // 校验失败时向 errChan 推送 WrongTaskClassID(code=40040),前端收到 SSE 错误事件。
+ // 5.5 将前端传入的 thinkingMode 写入 CommonState,供 ChatNode 及下游节点读取。
+ cs := runtimeState.EnsureCommonState()
+ cs.ThinkingMode = thinkingMode
+
+ // 5.6 若 extra 携带 task_class_ids,校验后写入 CommonState(仅首轮/尚未设置时生效,跨轮持久化)。
if taskClassIDs := readAgentExtraIntSlice(extra, "task_class_ids"); len(taskClassIDs) > 0 {
cs := runtimeState.EnsureCommonState()
if len(cs.TaskClassIDs) == 0 {
@@ -186,7 +188,7 @@ func (s *AgentService) runNewAgentGraph(
pushErrNonBlocking(errChan, fmt.Errorf("graph 执行失败: %w", graphErr))
// Graph 出错时回退普通聊天,保证可用性。
- s.runNormalChatFlow(requestCtx, s.AIHub.Worker, resolvedModelName, userMessage, "", nil, retryMeta, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan)
+ s.runNormalChatFlow(requestCtx, s.AIHub.Worker, resolvedModelName, userMessage, "", nil, retryMeta, thinkingModeToBool(thinkingMode), userID, chatID, traceID, requestStart, outChan, errChan)
return
}