Version: 0.9.18.dev.260415

后端:
1. ChatNode 路由从 GenerateJSON 重构为流式控制码路由
- 新建 backend/newAgent/router/chat_route.go:流式增量控制码解析器 StreamRouteParser,复用 agent 的 <SMARTFLOW_ROUTE> 正则模式
- 更新 backend/newAgent/node/chat.go:RunChatNode 从 GenerateJSON(阻塞等完整 JSON)改为 Stream + 控制码解析 + 分支流式处理
- streamAndDispatch 核心循环:逐 chunk 喂解析器,控制码解析后按 route 分发
- handleDirectReplyStream:thinking=false 同一流续传,thinking=true 关流后二次 thinking 调用
- handleDeepAnswerStream:移除"让我想想"过渡语,直接关流后发起第二次流式调用(thinking 由 effectiveThinking 控制)
- handleRouteExecuteStream / handleRoutePlanStream:关流 → 推送 status → 设 Phase
- 更新 backend/newAgent/prompt/chat.go:路由 prompt 从 JSON 格式改为控制码标签格式
- 更新 backend/newAgent/model/chat_contract.go:ChatRoutingDecision 新增 Thinking / Raw 字段,移除 Speak / Reason
2. Thinking 参数从 bool 扩展为 string 三态
- 更新 backend/model/agent.go:UserSendMessageRequest.Thinking 从 bool 改为 string
- 更新 backend/service/agentsvc/agent.go:AgentChat / runNormalChatFlow 适配 string 类型,新增 thinkingModeToBool 兼容旧链路
- 更新 backend/service/agentsvc/agent_newagent.go:runNewAgentGraph 接收 thinkingMode string 并注入 CommonState
3. CommonState 新增 ThinkingMode / ExecuteThinking 字段
- 更新 backend/newAgent/model/common_state.go:ThinkingMode 控制下游 thinking 行为("true" 强开 / "false" 强关 / "auto"交路由决策)
- ChatNode 通过 resolveEffectiveThinking 合并前端偏好与路由决策,传递给所有下游处理函数
4. 新增真流式推送方法
- 更新 backend/newAgent/stream/emitter.go:新增 EmitStreamAssistantText / EmitStreamReasoningText,桥接 StreamReader → SSE chunk
前端:无
仓库:无
This commit is contained in:
LoveLosita
2026-04-15 11:04:27 +08:00
parent b72e202822
commit 21eed5af75
9 changed files with 658 additions and 234 deletions

View File

@@ -26,18 +26,19 @@ const (
//
// 职责边界:
// 1. Route 决定后续处理路径;
// 2. Speak 始终填写:给用户看的话
// 3. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true
// 4. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效
// 5. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true
// 6. Reason 给后端和日志看。
// 2. NeedsRoughBuild 仅在 route=execute 且满足粗排条件时为 true
// 3. NeedsRefineAfterRoughBuild 仅在 needs_rough_build=true 时有效
// 4. AllowReorder 表示是否允许打乱 suggested 任务顺序,仅用户明确授权时应为 true
// 5. Thinking 表示下游 Execute 节点是否应开启深度思考
// 6. Raw 保留控制码原文,供日志排查;
// 7. 用户可见内容speak由流式输出自然产出不由本结构承载。
type ChatRoutingDecision struct {
Route ChatRoute `json:"route"`
Speak string `json:"speak,omitempty"`
NeedsRoughBuild bool `json:"needs_rough_build,omitempty"`
NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"`
AllowReorder bool `json:"allow_reorder,omitempty"`
Reason string `json:"reason,omitempty"`
Route ChatRoute
NeedsRoughBuild bool
NeedsRefineAfterRoughBuild bool
AllowReorder bool
Thinking bool
Raw string
}
// Normalize 统一清洗路由决策中的字符串字段。
@@ -46,8 +47,7 @@ func (d *ChatRoutingDecision) Normalize() {
return
}
d.Route = ChatRoute(strings.TrimSpace(string(d.Route)))
d.Speak = strings.TrimSpace(d.Speak)
d.Reason = strings.TrimSpace(d.Reason)
d.Raw = strings.TrimSpace(d.Raw)
}
// Validate 校验路由决策的最小合法性。
@@ -67,16 +67,12 @@ func (d *ChatRoutingDecision) Validate() error {
return fmt.Errorf("未知 route: %s", d.Route)
}
// direct_reply 必须有 speak。
if d.Route == ChatRouteDirectReply && d.Speak == "" {
return fmt.Errorf("direct_reply 必须携带 speak")
}
// 非 execute 路由不应携带粗排和粗排后微调标记,统一归一化为 false。
if d.Route != ChatRouteExecute {
d.NeedsRoughBuild = false
d.NeedsRefineAfterRoughBuild = false
d.AllowReorder = false
d.Thinking = false
}
// 只有 needs_rough_build=true 时needs_refine_after_rough_build 才有语义。
if !d.NeedsRoughBuild {

View File

@@ -30,7 +30,7 @@ const (
FlowTerminalStatusExhausted FlowTerminalStatus = "exhausted"
)
// FlowTerminalOutcome 保存流程为什么结束的最终结果快照。
// FlowTerminalOutcome 保存"流程为什么结束"的最终结果快照。
//
// 职责边界:
// 1. Stage 说明终止发生在哪个阶段,便于 graph/deliver/debug 统一收口;
@@ -97,7 +97,7 @@ type CommonState struct {
// NeedsRoughBuild 由 Plan 节点在 plan_done 时写入,标记 Confirm 后是否需要走粗排节点。
// 粗排节点执行完毕后会将此字段重置为 false。
NeedsRoughBuild bool `json:"needs_rough_build,omitempty"`
// NeedsRefineAfterRoughBuild 表示粗排完成后是否需要立即进入微调
// NeedsRefineAfterRoughBuild 表示"粗排完成后是否需要立即进入微调"
//
// 说明:
// 1. 该标记主要用于 chat->execute 的直执行链路;
@@ -105,13 +105,21 @@ type CommonState struct {
// 3. false 表示用户仅要求完成排入,粗排成功后可直接收口,等待后续再优化。
NeedsRefineAfterRoughBuild bool `json:"needs_refine_after_rough_build,omitempty"`
// AllowReorder 表示本轮是否允许打乱 suggested 任务的相对顺序。
// 默认 false只有用户明确说明可以打乱顺序/顺序不重要才会为 true。
// 默认 false只有用户明确说明"可以打乱顺序/顺序不重要"才会为 true。
AllowReorder bool `json:"allow_reorder,omitempty"`
// SuggestedOrderBaseline 保存本轮 execute 启动前的 suggested 任务相对顺序基线。
// SuggestedOrderBaseline 保存"本轮 execute 启动前"的 suggested 任务相对顺序基线。
// OrderGuard 节点会基于该基线判断微调是否破坏顺序约束。
SuggestedOrderBaseline []int `json:"suggested_order_baseline,omitempty"`
// TerminalOutcome 保存“本轮流程最终如何结束”的统一收口结果
// ExecuteThinking 由 Chat 路由决策传入,表示 Execute 节点是否应开启深度思考
// 预埋字段,当前阶段 Execute 节点可自行决定是否读取。
ExecuteThinking bool `json:"execute_thinking,omitempty"`
// ThinkingMode 由前端传入,控制所有下游 LLM 调用的 thinking 行为。
// "true" 强制开启,"false" 强制关闭,"auto"(默认)交给路由决策。
ThinkingMode string `json:"thinking_mode,omitempty"`
// TerminalOutcome 保存"本轮流程最终如何结束"的统一收口结果。
// 第二轮开始rough_build / execute / deliver 都应围绕这份快照判断收口语义。
TerminalOutcome *FlowTerminalOutcome `json:"terminal_outcome,omitempty"`
}
@@ -184,12 +192,12 @@ func (s *CommonState) RejectPlan() {
s.ClearTerminalOutcome()
}
// ResetForNextRun 在上一轮已经收口,且本轮准备开始新请求时重置执行期临时状态。
// ResetForNextRun 在"上一轮已经收口,且本轮准备开始新请求"时重置执行期临时状态。
//
// 职责边界:
// 1. 负责清理会污染新一轮执行的临时字段(轮次、修正计数、计划游标、粗排开关、顺序基线、终止结果);
// 2. 不负责清理会话身份与跨轮共享数据ConversationID/UserID/TaskClassIDs/TaskClasses/历史上下文/ScheduleState
// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在加载兜底 + chat 入口双保险场景下复用。
// 3. 该方法是幂等操作:重复调用不会引入额外副作用,便于在"加载兜底 + chat 入口"双保险场景下复用。
func (s *CommonState) ResetForNextRun() {
if s == nil {
return
@@ -237,7 +245,7 @@ func (s *CommonState) Done() {
}
}
// Abort 将当前流程标记为业务语义上的主动终止
// Abort 将当前流程标记为"业务语义上的主动终止"
//
// 步骤说明:
// 1. 统一写入 PhaseDone保证 graph 后续直接进入 deliver 收口;
@@ -255,7 +263,7 @@ func (s *CommonState) Abort(stage, code, userMessage, internalReason string) {
s.TerminalOutcome.Normalize()
}
// Exhaust 将当前流程标记为安全边界触发的被动停止
// Exhaust 将当前流程标记为"安全边界触发的被动停止"
func (s *CommonState) Exhaust(stage, userMessage, internalReason string) {
s.Phase = PhaseDone
s.TerminalOutcome = &FlowTerminalOutcome{
@@ -289,17 +297,17 @@ func (s *CommonState) TerminalStatus() FlowTerminalStatus {
return s.TerminalOutcome.Status
}
// IsCompleted 判断当前是否属于正常完成
// IsCompleted 判断当前是否属于"正常完成"
func (s *CommonState) IsCompleted() bool {
return s.TerminalStatus() == FlowTerminalStatusCompleted
}
// IsAborted 判断当前是否属于主动中止
// IsAborted 判断当前是否属于"主动中止"
func (s *CommonState) IsAborted() bool {
return s.TerminalStatus() == FlowTerminalStatusAborted
}
// IsExhaustedTerminal 判断当前是否属于轮次耗尽收口
// IsExhaustedTerminal 判断当前是否属于"轮次耗尽收口"
func (s *CommonState) IsExhaustedTerminal() bool {
return s.TerminalStatus() == FlowTerminalStatusExhausted
}

View File

@@ -3,15 +3,18 @@ package newagentnode
import (
"context"
"fmt"
"io"
"log"
"strings"
"time"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
)
@@ -21,7 +24,7 @@ const (
chatSpeakBlockID = "chat.speak"
// chatHistoryKindKey 用于在 history 中打运行态标记,供 prompt 层做上下文分层。
chatHistoryKindKey = "newagent_history_kind"
// chatHistoryKindExecuteLoopClosed 表示上一轮 execute loop 已正常收口
// chatHistoryKindExecuteLoopClosed 表示"上一轮 execute loop 已正常收口"
// prompt 侧会据此把旧 loop 归档到 msg1而不是继续占用 msg2 窗口。
chatHistoryKindExecuteLoopClosed = "execute_loop_closed"
)
@@ -75,9 +78,9 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error {
if !runtimeState.HasPendingInteraction() && flowState.Phase == newagentmodel.PhaseDone {
terminalBefore := flowState.TerminalStatus()
roundBefore := flowState.RoundUsed
// 1. 只有正常完成(completed)才打 loop 收口标记:
// 1.1 这样下一轮进入 execute 时msg2 会只保留当前活跃循环窗口;
// 1.2 异常收口exhausted/aborted不打标记允许后续继续时沿用上一轮 loop 轨迹。
// 1. 只有"正常完成(completed)"才打 loop 收口标记:
// 1.1 这样下一轮进入 execute 时msg2 会只保留"当前活跃循环"窗口;
// 1.2 异常收口exhausted/aborted不打标记允许后续"继续"时沿用上一轮 loop 轨迹。
if terminalBefore == newagentmodel.FlowTerminalStatusCompleted {
appendExecuteLoopClosedMarker(conversationContext)
}
@@ -89,86 +92,28 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error {
terminalBefore,
)
}
messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState)
nonce := uuid.NewString()
messages := newagentprompt.BuildChatRoutingMessages(conversationContext, input.UserInput, flowState, nonce)
decision, rawResult, err := infrallm.GenerateJSON[newagentmodel.ChatRoutingDecision](
ctx,
input.Client,
messages,
infrallm.GenerateOptions{
Temperature: 0.1,
MaxTokens: 500,
Thinking: infrallm.ThinkingModeDisabled,
Metadata: map[string]any{
"stage": chatStageName,
"phase": "routing",
},
reader, err := input.Client.Stream(ctx, messages, infrallm.GenerateOptions{
Temperature: 0.7,
Thinking: infrallm.ThinkingModeDisabled,
Metadata: map[string]any{
"stage": chatStageName,
"phase": "routing",
},
)
rawText := ""
if rawResult != nil {
rawText = strings.TrimSpace(rawResult.Text)
}
})
if err != nil {
// 路由失败 → 保守:走 plan。
log.Printf("[WARN] chat routing LLM failed chat=%s raw=%s err=%v",
flowState.ConversationID, rawText, err)
log.Printf("[WARN] chat routing stream failed chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
if validateErr := decision.Validate(); validateErr != nil {
log.Printf("[WARN] chat routing decision invalid chat=%s raw=%s err=%v",
flowState.ConversationID, rawText, validateErr)
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
// 1. 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求“重新粗排”,
// 则强制关闭 needs_rough_build避免“微调请求被误判成再次粗排”。
// 2. 该闸门只收紧粗排开关,不改路由 route确保 execute 微调链路仍可继续。
// 3. 一旦用户明确表达“从头重排/重新粗排”,仍允许 needs_rough_build=true 生效。
if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) {
decision.NeedsRoughBuild = false
decision.NeedsRefineAfterRoughBuild = false
}
log.Printf(
"[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v has_rough_build_done=%v task_class_count=%d reason=%s",
flowState.ConversationID,
decision.Route,
decision.NeedsRoughBuild,
decision.NeedsRefineAfterRoughBuild,
decision.AllowReorder,
hasRoughBuildDoneMarker(conversationContext),
len(flowState.TaskClassIDs),
decision.Reason,
)
flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder)
// 3. 按路由决策推进。
switch decision.Route {
case newagentmodel.ChatRouteDirectReply:
return handleDirectReply(ctx, decision, conversationContext, emitter, flowState)
case newagentmodel.ChatRouteExecute:
return handleRouteExecute(decision, emitter, flowState)
case newagentmodel.ChatRouteDeepAnswer:
return handleDeepAnswer(ctx, input, decision, conversationContext, emitter, flowState)
case newagentmodel.ChatRoutePlan:
return handleRoutePlan(decision, emitter, flowState)
default:
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
parser := newagentrouter.NewStreamRouteParser(nonce)
return streamAndDispatch(ctx, reader, parser, input, emitter, flowState, conversationContext)
}
// appendExecuteLoopClosedMarker 在 history 中写入execute loop 已正常收口标记。
// appendExecuteLoopClosedMarker 在 history 中写入"execute loop 已正常收口"标记。
//
// 职责边界:
// 1. 只负责写一个轻量 marker供 prompt 分层;
@@ -207,51 +152,254 @@ func isExecuteLoopClosedMarker(msg *schema.Message) bool {
return strings.TrimSpace(kind) == chatHistoryKindExecuteLoopClosed
}
// handleDirectReply 处理简单任务:直接输出回复
func handleDirectReply(
// streamAndDispatch 是流式路由分发的核心循环
//
// 步骤说明:
// 1. 从 StreamReader 逐 chunk 读取,喂给 StreamRouteParser 增量解析控制码;
// 2. 控制码解析完成后,根据 route 进入对应的流式处理分支;
// 3. 控制码解析超时或流异常结束 → fallback 到 plan。
func streamAndDispatch(
ctx context.Context,
decision *newagentmodel.ChatRoutingDecision,
conversationContext *newagentmodel.ConversationContext,
reader infrallm.StreamReader,
parser *newagentrouter.StreamRouteParser,
input ChatNodeInput,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
conversationContext *newagentmodel.ConversationContext,
) error {
if strings.TrimSpace(decision.Speak) != "" {
if err := emitter.EmitPseudoAssistantText(
ctx, chatSpeakBlockID, chatStageName,
decision.Speak,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("闲聊回复推送失败: %w", err)
for {
chunk, err := reader.Recv()
if err == io.EOF {
if !parser.RouteReady() {
log.Printf("[WARN] chat stream ended before route resolved chat=%s", flowState.ConversationID)
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
break
}
conversationContext.AppendHistory(schema.AssistantMessage(decision.Speak, nil))
if err != nil {
log.Printf("[WARN] chat stream recv error chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
content := ""
if chunk != nil {
content = chunk.Content
}
visible, routeReady, _ := parser.Feed(content)
if !routeReady {
continue
}
// 控制码解析完成,进入路由分发。
decision := parser.Decision()
// 二次粗排硬闸门:若上下文已存在 rough_build_done 且用户未明确要求"重新粗排"
// 则强制关闭 needs_rough_build避免"微调请求被误判成再次粗排"。
if shouldDisableRoughBuildForRefine(conversationContext, input.UserInput, decision) {
decision.NeedsRoughBuild = false
decision.NeedsRefineAfterRoughBuild = false
}
log.Printf(
"[DEBUG] chat routing chat=%s route=%s needs_rough_build=%v needs_refine_after_rough_build=%v allow_reorder=%v thinking=%v has_rough_build_done=%v task_class_count=%d raw=%s",
flowState.ConversationID,
decision.Route,
decision.NeedsRoughBuild,
decision.NeedsRefineAfterRoughBuild,
decision.AllowReorder,
decision.Thinking,
hasRoughBuildDoneMarker(conversationContext),
len(flowState.TaskClassIDs),
decision.Raw,
)
flowState.AllowReorder = resolveAllowReorder(input.UserInput, decision.AllowReorder)
effectiveThinking := resolveEffectiveThinking(flowState.ThinkingMode, decision.Thinking)
switch decision.Route {
case newagentmodel.ChatRouteDirectReply:
return handleDirectReplyStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking, visible)
case newagentmodel.ChatRouteExecute:
return handleRouteExecuteStream(reader, emitter, flowState, decision, input.UserInput, effectiveThinking, visible)
case newagentmodel.ChatRouteDeepAnswer:
return handleDeepAnswerStream(ctx, reader, input, emitter, conversationContext, flowState, effectiveThinking)
case newagentmodel.ChatRoutePlan:
return handleRoutePlanStream(reader, emitter, flowState, effectiveThinking, visible)
default:
flowState.Phase = newagentmodel.PhasePlanning
return nil
}
}
return nil
}
// resolveEffectiveThinking 根据前端 ThinkingMode 和路由决策合并出最终 thinking 状态。
//
// 规则:
// - "true" 强制开启;
// - "false" 强制关闭;
// - "auto"/"" 交给路由决策的 decisionThinking。
func resolveEffectiveThinking(mode string, decisionThinking bool) bool {
switch strings.TrimSpace(strings.ToLower(mode)) {
case "true":
return true
case "false":
return false
default:
return decisionThinking
}
}
// handleDirectReplyStream 处理闲聊回复。
//
// 两种模式:
// 1. thinking=false同一流续传逐 chunk 推送;
// 2. thinking=true关闭路由流发起第二次 thinking 流式调用。
func handleDirectReplyStream(
ctx context.Context,
reader infrallm.StreamReader,
input ChatNodeInput,
emitter *newagentstream.ChunkEmitter,
conversationContext *newagentmodel.ConversationContext,
flowState *newagentmodel.CommonState,
effectiveThinking bool,
firstVisible string,
) error {
if effectiveThinking {
return handleThinkingReplyStream(ctx, reader, input, emitter, conversationContext, flowState)
}
return handleDirectReplyContinueStream(ctx, reader, emitter, conversationContext, flowState, firstVisible)
}
// handleThinkingReplyStream 处理需要思考的回复:关闭路由流 → 第二次 thinking 流式调用。
func handleThinkingReplyStream(
ctx context.Context,
reader infrallm.StreamReader,
input ChatNodeInput,
emitter *newagentstream.ChunkEmitter,
conversationContext *newagentmodel.ConversationContext,
flowState *newagentmodel.CommonState,
) error {
_ = reader.Close()
deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput)
deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{
Temperature: 0.5,
MaxTokens: 2000,
Thinking: infrallm.ThinkingModeEnabled,
Metadata: map[string]any{
"stage": chatStageName,
"phase": "direct_reply_thinking",
},
})
if err != nil {
log.Printf("[WARN] thinking reply stream failed chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName)
_ = deepReader.Close()
if err != nil {
log.Printf("[WARN] thinking reply emit error chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
deepText = strings.TrimSpace(deepText)
if deepText != "" {
conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil))
}
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
// handleRouteExecute 处理中等任务:推送简短确认,设 PhaseExecuting
// handleDirectReplyContinueStream 处理无思考的闲聊:同一流续传
func handleDirectReplyContinueStream(
ctx context.Context,
reader infrallm.StreamReader,
emitter *newagentstream.ChunkEmitter,
conversationContext *newagentmodel.ConversationContext,
flowState *newagentmodel.CommonState,
firstVisible string,
) error {
var fullText strings.Builder
fullText.WriteString(firstVisible)
// 推送控制码之后的第一段内容。
if strings.TrimSpace(firstVisible) != "" {
if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, firstVisible, true); err != nil {
return fmt.Errorf("闲聊回复推送失败: %w", err)
}
}
firstChunk := firstVisible == ""
// 继续读同一个流,逐 chunk 推送。
for {
chunk, err := reader.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("[WARN] direct_reply stream error chat=%s err=%v", flowState.ConversationID, err)
break
}
if chunk == nil || chunk.Content == "" {
continue
}
if err := emitter.EmitAssistantText(chatSpeakBlockID, chatStageName, chunk.Content, firstChunk); err != nil {
return fmt.Errorf("闲聊回复推送失败: %w", err)
}
fullText.WriteString(chunk.Content)
firstChunk = false
}
text := fullText.String()
if strings.TrimSpace(text) != "" {
conversationContext.AppendHistory(schema.AssistantMessage(text, nil))
}
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
// handleRouteExecuteStream 处理工具调用路由:推送状态确认 → 设 PhaseExecuting。
//
// 不把 speak 写入 history因为真正的回复由 Execute 节点产出。
func handleRouteExecute(
decision *newagentmodel.ChatRoutingDecision,
// 说明:
// 1. 关闭路由流(后续内容不需要);
// 2. 推送轻量状态通知;
// 3. 设置流程状态,进入 Execute 或 RoughBuild。
func handleRouteExecuteStream(
reader infrallm.StreamReader,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
decision *newagentmodel.ChatRoutingDecision,
userInput string,
effectiveThinking bool,
speak string,
) error {
speak := strings.TrimSpace(decision.Speak)
if speak == "" {
// 关闭路由流。
_ = reader.Close()
if strings.TrimSpace(speak) == "" {
speak = "好的,我来处理。"
}
// 推送轻量状态通知,让前端知道请求已接收
// 推送轻量状态通知。
_ = emitter.EmitStatus(chatStatusBlockID, chatStageName, "accepted", speak, false)
// 清空旧 PlanSteps 并设 PhaseExecuting,避免上一次任务残留的步骤被 HasPlan() 误判
// 清空旧 PlanSteps 并设 PhaseExecuting。
flowState.StartDirectExecute()
// 1. 默认不走粗排与粗排后微调,避免沿用上轮遗留标记
// 2. 只有 route 判定为“需要粗排”且确实有 task_class_ids 时,才打开粗排开关。
// 3. 粗排后是否立即进入微调,完全由路由决策显式标记控制。
// 粗排开关逻辑
flowState.NeedsRoughBuild = false
flowState.NeedsRefineAfterRoughBuild = false
if decision.NeedsRoughBuild && len(flowState.TaskClassIDs) > 0 {
@@ -259,15 +407,17 @@ func handleRouteExecute(
flowState.NeedsRefineAfterRoughBuild = decision.NeedsRefineAfterRoughBuild
}
flowState.ExecuteThinking = effectiveThinking
return nil
}
// resolveAllowReorder 统一计算本轮是否允许打乱顺序
// resolveAllowReorder 统一计算"本轮是否允许打乱顺序"
//
// 步骤化说明:
// 1. 后端先做显式语义判定:用户明确允许/明确禁止时,直接以后端判定为准;
// 2. 若后端未识别到显式语义,再回退到路由模型的 allow_reorder 字段;
// 3. 默认返回 false确保保持顺序是系统默认行为。
// 3. 默认返回 false确保"保持顺序"是系统默认行为。
func resolveAllowReorder(userInput string, modelAllowReorder bool) bool {
switch detectReorderPreference(userInput) {
case reorderAllow:
@@ -279,11 +429,11 @@ func resolveAllowReorder(userInput string, modelAllowReorder bool) bool {
}
}
// detectReorderPreference 识别用户是否明确授权打乱顺序
// detectReorderPreference 识别用户是否"明确授权打乱顺序"
//
// 职责边界:
// 1. 只负责关键词级别的显式意图识别,不做复杂语义推理;
// 2. 若同时命中允许”与“禁止,优先按禁止处理,避免误放开顺序约束;
// 2. 若同时命中"允许"与"禁止",优先按"禁止"处理,避免误放开顺序约束;
// 3. 未命中显式表达时返回 unknown交给上层兜底策略。
func detectReorderPreference(userInput string) reorderPreference {
text := strings.ToLower(strings.TrimSpace(userInput))
@@ -332,12 +482,12 @@ func containsAnyPhrase(text string, phrases []string) bool {
return false
}
// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭再次粗排
// shouldDisableRoughBuildForRefine 判断是否应在 chat 路由阶段关闭"再次粗排"
//
// 判定规则:
// 1. 当前决策未请求粗排时,直接不干预;
// 2. 上下文不存在 rough_build_done 时,不干预(首次粗排仍可走);
// 3. 若用户未明确要求重新粗排/从头重排,则关闭粗排开关,避免误触发。
// 3. 若用户未明确要求"重新粗排/从头重排",则关闭粗排开关,避免误触发。
func shouldDisableRoughBuildForRefine(
conversationContext *newagentmodel.ConversationContext,
userInput string,
@@ -364,7 +514,7 @@ func hasRoughBuildDoneMarker(conversationContext *newagentmodel.ConversationCont
return false
}
// isExplicitRoughBuildRequest 识别用户是否明确要求重新粗排/从头重排
// isExplicitRoughBuildRequest 识别用户是否明确要求"重新粗排/从头重排"
func isExplicitRoughBuildRequest(userInput string) bool {
text := strings.ToLower(strings.TrimSpace(userInput))
if text == "" {
@@ -388,80 +538,81 @@ func isExplicitRoughBuildRequest(userInput string) bool {
return containsAnyPhrase(text, keywords)
}
// handleDeepAnswer 处理复杂问答:推送过渡语 → 原地开 thinking 再调一次 LLM → 输出深度回答
func handleDeepAnswer(
// handleDeepAnswerStream 处理复杂问答:关闭路由流 → 第二次流式调用
//
// 步骤说明:
// 1. 关闭第一个路由流;
// 2. 发起第二次流式 LLM 调用thinking 由 effectiveThinking 控制);
// 3. 真流式推送 reasoning + 正文;
// 4. 完整回复写入 history。
func handleDeepAnswerStream(
ctx context.Context,
reader infrallm.StreamReader,
input ChatNodeInput,
decision *newagentmodel.ChatRoutingDecision,
conversationContext *newagentmodel.ConversationContext,
emitter *newagentstream.ChunkEmitter,
conversationContext *newagentmodel.ConversationContext,
flowState *newagentmodel.CommonState,
effectiveThinking bool,
) error {
// 1. 推送过渡语
briefSpeak := strings.TrimSpace(decision.Speak)
if briefSpeak == "" {
briefSpeak = "让我想想。"
}
if err := emitter.EmitPseudoAssistantText(
ctx, chatSpeakBlockID, chatStageName,
briefSpeak,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("过渡文案推送失败: %w", err)
}
// 1. 关闭第一个路由流
_ = reader.Close()
// 2. 第二次 LLM 调用:开 thinking深度回答
// 2. 第二次流式调用
thinkingOpt := infrallm.ThinkingModeDisabled
if effectiveThinking {
thinkingOpt = infrallm.ThinkingModeEnabled
}
deepMessages := newagentprompt.BuildDeepAnswerMessages(conversationContext, input.UserInput)
deepResult, err := input.Client.GenerateText(ctx, deepMessages, infrallm.GenerateOptions{
deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{
Temperature: 0.5,
MaxTokens: 2000,
Thinking: infrallm.ThinkingModeEnabled,
Thinking: thinkingOpt,
Metadata: map[string]any{
"stage": chatStageName,
"phase": "deep_answer",
},
})
if err != nil || deepResult == nil {
// 深度回答失败 → 降级,只保留过渡语。
log.Printf("[WARN] deep answer LLM failed chat=%s err=%v", flowState.ConversationID, err)
conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil))
if err != nil {
// 深度回答失败 → 降级返回。
log.Printf("[WARN] deep answer stream failed chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
// 3. 输出深度回答
deepText := strings.TrimSpace(deepResult.Text)
// 3. 真流式推送 reasoning + 正文
deepText, err := emitter.EmitStreamAssistantText(ctx, deepReader, chatSpeakBlockID, chatStageName)
_ = deepReader.Close()
if err != nil {
log.Printf("[WARN] deep answer stream emit error chat=%s err=%v", flowState.ConversationID, err)
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
deepText = strings.TrimSpace(deepText)
if deepText == "" {
conversationContext.AppendHistory(schema.AssistantMessage(briefSpeak, nil))
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
if err := emitter.EmitPseudoAssistantText(
ctx, chatSpeakBlockID, chatStageName,
deepText,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("深度回答推送失败: %w", err)
}
// 将完整回复(过渡语 + 深度回答)写入 history。
fullReply := briefSpeak + "\n\n" + deepText
conversationContext.AppendHistory(schema.AssistantMessage(fullReply, nil))
// 4. 完整回复写入 history。
conversationContext.AppendHistory(schema.AssistantMessage(deepText, nil))
flowState.Phase = newagentmodel.PhaseChatting
return nil
}
// handleRoutePlan 处理复杂规划:推送确认语,设 PhasePlanning。
func handleRoutePlan(
decision *newagentmodel.ChatRoutingDecision,
// handleRoutePlanStream 处理规划路由:推送状态确认 → 设 PhasePlanning。
func handleRoutePlanStream(
reader infrallm.StreamReader,
emitter *newagentstream.ChunkEmitter,
flowState *newagentmodel.CommonState,
effectiveThinking bool,
speak string,
) error {
speak := strings.TrimSpace(decision.Speak)
if speak == "" {
// 关闭路由流。
_ = reader.Close()
if strings.TrimSpace(speak) == "" {
speak = "好的,让我来规划一下。"
}

View File

@@ -3,62 +3,71 @@ package newagentprompt
import (
"fmt"
"strings"
"time"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/cloudwego/eino/schema"
)
const chatRoutingSystemPrompt = `
你是 SmartFlow 的智能路由器。你的职责是判断用户意图的复杂度,并决定后续处理路径
你会看到:
- 历史对话
- 用户本轮输入
- 当前可用工具摘要(如有)
- 本次排课涉及的任务类约束(如有)
请遵守以下规则:
1. 只输出严格 JSON不要输出 markdown不要输出额外解释。
2. 根据用户意图判断复杂度并选择路由。
3. speak 字段始终填写:给用户看的话。
你是 SmartFlow 的智能路由器。你的回复必须以路由控制码开头,控制码后紧跟用户可见的内容
路由规则:
- direct_reply纯闲聊、简单问答、打招呼、感谢等。speak 直接写你的完整回复。
- execute需要用工具处理的请求查询日程、移动课程、排课等但不需要先制定计划。speak 写简短确认。
- deep_answer复杂问题但不需要工具如分析建议、深度解释等需要深度思考后直接回答。speak 写过渡语(如"让我想想")。
- plan用户明确要求先制定计划或涉及多阶段复杂规划。speak 写确认
- direct_reply纯闲聊、简单问答、打招呼、感谢等。控制码后直接输出完整回复。
- execute需要用工具处理的请求查询日程、移动课程、排课等但不需要先制定计划。控制码后输出简短确认。
- deep_answer复杂问题但不需要工具如分析建议、深度解释等需要深度思考后回答。控制码后输出过渡语(如"让我想想")。
- plan用户明确要求先制定计划或涉及多阶段复杂规划。控制码后输出简短确认。
粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 needs_rough_build=true。
粗排判断:当用户意图包含"批量安排/排课/把任务类排进日程",且上下文中有任务类 ID 时,设置 rough_build=true。
二次粗排约束(强约束):
- 若上下文已出现 rough_build_done且用户未明确要求重新粗排/从头重排,必须设置 needs_rough_build=false。
- 移动/微调/优化/均匀化/调顺序等请求默认视为 refine不得再次触发 rough build。
- 若上下文已出现 rough_build_done且用户未明确要求"重新粗排/从头重排",必须设置 rough_build=false。
- "移动/微调/优化/均匀化/调顺序"等请求默认视为 refine不得再次触发 rough build。
粗排后微调判断:
- 仅当 needs_rough_build=true 时才判断 needs_refine_after_rough_build
- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 needs_refine_after_rough_build=true。
- 若用户只要求"先排进去/给初稿",未提出微调目标,设 needs_refine_after_rough_build=false。
- 仅当 rough_build=true 时才判断 refine
- 若用户明确提出优化目标/偏好(如"尽量均衡""周三别太满""某门课往后挪"),设 refine=true。
- 若用户只要求"先排进去/给初稿",未提出微调目标,设 refine=false。
顺序授权判断:
- allow_reorder 仅在用户明确说明允许打乱顺序/顺序不重要时才为 true。
- 用户明确要求保持顺序/不要打乱时必须为 false。
- reorder 仅在用户明确说明"允许打乱顺序/顺序不重要"时才为 true。
- 用户明确要求"保持顺序/不要打乱"时必须为 false。
- 若用户未明确提及顺序,一律为 false。
深度思考判断:
- thinking 仅在 route=execute 时有效。
- 当用户请求涉及复杂推理、多条件约束、需要深度分析后才能执行的操作时,设 thinking=true。
- 简单查询、单步操作设 thinking=false。
输出协议(严格 JSON
{"route":"direct_reply / execute / deep_answer / plan","speak":"给用户看的话","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false,"reason":"简短判断依据"}
输出格式(严格两段式
第一段(控制码,用户不可见,后端会截取):
<SMARTFLOW_ROUTE nonce="给定nonce" route="direct_reply|execute|deep_answer|plan" rough_build="false" refine="false" reorder="false" thinking="false"/>
第二段(紧接控制码之后,用户可见):
根据路由输出对应内容。
属性说明(仅 route=execute 时有效,其余路由省略这些属性):
- rough_build是否需要粗排
- refine粗排后是否需要微调
- reorder是否允许打乱顺序
- thinking后续执行阶段是否需要深度思考
合法示例:
{"route":"direct_reply","speak":"你好!我是 SmartFlow 助手,有什么可以帮你的?","reason":"用户打招呼"}
<SMARTFLOW_ROUTE nonce="给定nonce" route="direct_reply"/>
你好!我是 SmartFlow 助手,有什么可以帮你的?
{"route":"execute","speak":"好的,我来帮你看看今天的安排。","reason":"需要调用工具查询日程","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":false}
<SMARTFLOW_ROUTE nonce="给定nonce" route="execute"/>
好的,我来帮你看看今天的安排。
{"route":"execute","speak":"好的,我来帮你排课。","reason":"批量排课需求,有任务类 ID未给微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":false,"allow_reorder":false}
<SMARTFLOW_ROUTE nonce="给定nonce" route="execute" rough_build="true" refine="false" reorder="false" thinking="false"/>
好的,我来帮你排课。
{"route":"execute","speak":"好的,我来帮你排课并按你的偏好做微调。","reason":"批量排课需求,有任务类 ID且给出明确微调偏好","needs_rough_build":true,"needs_refine_after_rough_build":true,"allow_reorder":false}
<SMARTFLOW_ROUTE nonce="给定nonce" route="execute" rough_build="true" refine="true" reorder="false" thinking="true"/>
好的,我来帮你排课并按你的偏好做微调。
{"route":"execute","speak":"好的,我按你的要求重排。","reason":"用户明确允许打乱顺序","needs_rough_build":false,"needs_refine_after_rough_build":false,"allow_reorder":true}
<SMARTFLOW_ROUTE nonce="给定nonce" route="deep_answer"/>
这是个好问题,让我仔细想想。
{"route":"deep_answer","speak":"这是个好问题,让我仔细想想。","reason":"需要深度分析但不需要工具"}
<SMARTFLOW_ROUTE nonce="给定nonce" route="plan"/>
明白,我来帮你制定一个完整的学习计划。
{"route":"plan","speak":"明白,我来帮你制定一个完整的学习计划。","reason":"用户明确要求制定计划"}
禁止输出任何 JSON、markdown 代码块或额外解释。nonce 必须精确使用给定值。
`
// BuildChatRoutingSystemPrompt 返回路由阶段的系统提示词。
@@ -67,22 +76,21 @@ func BuildChatRoutingSystemPrompt() string {
}
// BuildChatRoutingMessages 组装路由阶段的 messages。
func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) []*schema.Message {
func BuildChatRoutingMessages(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) []*schema.Message {
return buildStageMessages(
BuildChatRoutingSystemPrompt(),
ctx,
BuildChatRoutingUserPrompt(ctx, userInput, state),
BuildChatRoutingUserPrompt(ctx, userInput, state, nonce),
)
}
// BuildChatRoutingUserPrompt 构造路由阶段的用户提示词。
func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState) string {
func BuildChatRoutingUserPrompt(ctx *newagentmodel.ConversationContext, userInput string, state *newagentmodel.CommonState, nonce string) string {
var sb strings.Builder
sb.WriteString("请判断用户本轮意图的复杂度,并选择最合适的路由。\n")
sb.WriteString("若 route=execute 且 needs_rough_build=true请同时判断 needs_refine_after_rough_build")
sb.WriteString("只有用户明确提出微调目标时才为 true。\n")
sb.WriteString("请同时输出 allow_reorder只有用户明确授权打乱顺序时才为 true默认 false。\n")
sb.WriteString(fmt.Sprintf("nonce=%s\n", nonce))
sb.WriteString(fmt.Sprintf("当前时间=%s\n", time.Now().In(time.Local).Format("2006-01-02 15:04")))
sb.WriteString("\n请判断用户本轮意图的复杂度选择最合适的路由并输出控制码和对应内容。\n")
// 注入任务类上下文(供粗排判断参考)。
if state != nil && len(state.TaskClassIDs) > 0 {

View File

@@ -0,0 +1,164 @@
package newagentrouter
import (
"fmt"
"regexp"
"strings"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
)
var (
// chatRouteHeaderRegex 从模型流式输出中解析 SMARTFLOW_ROUTE 控制码头部。
//
// 格式示例:
// <SMARTFLOW_ROUTE nonce="abc" route="execute" rough_build="true" refine="false" reorder="false" thinking="true"/>
//
// 属性说明:
// 1. nonce防注入校验必须与调用方传入的 nonce 精确匹配;
// 2. route路由目标direct_reply / execute / deep_answer / plan
// 3. rough_build可选仅 route=execute 时有效,默认 false
// 4. refine可选仅 rough_build=true 时有效,默认 false
// 5. reorder可选仅 route=execute 时有效,默认 false
// 6. thinking可选仅 route=execute 时有效,默认 false。
chatRouteHeaderRegex = regexp.MustCompile(
`(?is)<\s*SMARTFLOW_ROUTE\b` +
`[^>]*\bnonce\s*=\s*["']?([a-zA-Z0-9\-]+)["']?` +
`[^>]*\broute\s*=\s*["']?(direct_reply|execute|deep_answer|plan)["']?` +
`(?:[^>]*\brough_build\s*=\s*["']?(true|false)["']?)?` +
`(?:[^>]*\brefine\s*=\s*["']?(true|false)["']?)?` +
`(?:[^>]*\breorder\s*=\s*["']?(true|false)["']?)?` +
`(?:[^>]*\bthinking\s*=\s*["']?(true|false)["']?)?` +
`[^>]*/\s*>`)
)
// StreamRouteParser 从 LLM 流式输出中增量提取路由决策。
//
// 协议约定:模型输出以 SMARTFLOW_ROUTE 控制码标签开头,标签结束后是用户可见内容。
// 例如:<SMARTFLOW_ROUTE nonce="abc" route="direct_reply"/>你好!很高兴见到你...
//
// 职责边界:
// 1. 只负责从流式 chunk 中提取控制码并解析为 ChatRoutingDecision
// 2. 不负责推送 SSE chunk不负责决定后续走哪条链路
// 3. 控制码解析失败时标记 fallback由上层决定降级策略。
type StreamRouteParser struct {
buf strings.Builder
nonce string
routeFound bool
decision *newagentmodel.ChatRoutingDecision
}
// NewStreamRouteParser 创建流式路由解析器。
func NewStreamRouteParser(nonce string) *StreamRouteParser {
return &StreamRouteParser{
nonce: strings.ToLower(strings.TrimSpace(nonce)),
}
}
// Feed 写入一段 chunk content。
//
// 返回值:
// - visible控制码标签之后的内容用户可见文本
// - routeReady路由决策是否已确定
// - err解析错误。
//
// 调用方应在 routeReady=true 后调用 Decision() 获取路由决策,
// 并根据 route 进入对应分支处理 visible 及后续 chunk。
func (p *StreamRouteParser) Feed(content string) (visible string, routeReady bool, err error) {
if p.routeFound {
// 路由已解析,后续 chunk 直接透传。
return content, true, nil
}
p.buf.WriteString(content)
text := p.buf.String()
match := chatRouteHeaderRegex.FindStringSubmatchIndex(text)
if match == nil {
// 控制码尚未完整,检查是否应该 fallback。
if len(text) > 500 {
// 超过 500 字符仍未匹配到控制码 -> fallback 到 plan。
p.routeFound = true
p.decision = &newagentmodel.ChatRoutingDecision{
Route: newagentmodel.ChatRoutePlan,
Raw: text,
}
return text, true, fmt.Errorf("控制码解析超时fallback 到 plan")
}
return "", false, nil
}
// 提取匹配到的子组。
groups := chatRouteHeaderRegex.FindStringSubmatch(text)
if len(groups) < 3 {
return "", false, fmt.Errorf("控制码正则子组不足: %d", len(groups))
}
// nonce 校验。
parsedNonce := strings.ToLower(strings.TrimSpace(groups[1]))
if parsedNonce != p.nonce {
return "", false, fmt.Errorf("nonce 不匹配: got=%s expected=%s", parsedNonce, p.nonce)
}
// 解析 route。
route := newagentmodel.ChatRoute(strings.TrimSpace(groups[2]))
// 解析可选布尔属性(默认 false
roughBuild := parseOptionalBool(groups, 3)
refine := parseOptionalBool(groups, 4)
reorder := parseOptionalBool(groups, 5)
thinking := parseOptionalBool(groups, 6)
p.decision = &newagentmodel.ChatRoutingDecision{
Route: route,
NeedsRoughBuild: roughBuild,
NeedsRefineAfterRoughBuild: refine,
AllowReorder: reorder,
Thinking: thinking,
Raw: groups[0],
}
// 归一化与校验。
if validateErr := p.decision.Validate(); validateErr != nil {
// 校验失败 -> fallback 到 plan。
p.decision.Route = newagentmodel.ChatRoutePlan
p.decision.NeedsRoughBuild = false
p.decision.NeedsRefineAfterRoughBuild = false
p.decision.AllowReorder = false
p.decision.Thinking = false
}
p.routeFound = true
// 控制码标签之后的文本作为 visible 返回。
fullMatch := groups[0]
tagEndIdx := strings.Index(text, fullMatch)
if tagEndIdx >= 0 {
afterTag := text[tagEndIdx+len(fullMatch):]
// 去掉标签后紧跟的换行符(如果有)。
afterTag = strings.TrimPrefix(afterTag, "\r\n")
afterTag = strings.TrimPrefix(afterTag, "\n")
return afterTag, true, nil
}
return "", true, nil
}
// RouteReady 返回路由决策是否已确定。
func (p *StreamRouteParser) RouteReady() bool {
return p.routeFound
}
// Decision 返回已解析的路由决策RouteReady=true 后可用)。
func (p *StreamRouteParser) Decision() *newagentmodel.ChatRoutingDecision {
return p.decision
}
// parseOptionalBool 从正则子组中解析可选布尔值。
// 如果子组不存在或为空,返回 false。
func parseOptionalBool(groups []string, index int) bool {
if index >= len(groups) {
return false
}
return strings.TrimSpace(groups[index]) == "true"
}

View File

@@ -3,19 +3,22 @@ package newagentstream
import (
"context"
"fmt"
"io"
"strings"
"time"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
)
// PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。
//
// 说明:
// 1. 这里刻意不用 chan/string 绑死实现;
// 2. 上层既可以传写 channel的函数,也可以传写 gin stream的函数;
// 2. 上层既可以传"写 channel"的函数,也可以传"写 gin stream"的函数;
// 3. 只要签名是 `func(string) error`,都能接进来。
type PayloadEmitter func(payload string) error
// StageEmitter 是 graph/node 对当前阶段进行推送的兼容接口。
// StageEmitter 是 graph/node 对"当前阶段"进行推送的兼容接口。
//
// 设计说明:
// 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留;
@@ -23,7 +26,7 @@ type PayloadEmitter func(payload string) error
// 3. 这样能兼顾当前兼容性和后续协议升级空间。
type StageEmitter func(stage, detail string)
// PseudoStreamOptions 描述整段文字伪流式输出的切块与节奏配置。
// PseudoStreamOptions 描述"整段文字伪流式输出"的切块与节奏配置。
//
// 字段语义:
// 1. MinChunkRunes达到该最小长度后若命中标点/换行等边界,可提前切块;
@@ -51,7 +54,7 @@ func DefaultPseudoStreamOptions() PseudoStreamOptions {
// ChunkEmitter 是 newAgent 统一的 SSE chunk 发射器。
//
// 职责边界:
// 1. 负责把正文 / 思考 / 工具事件 / 确认请求 / 中断提示统一转换成 OpenAI 兼容 payload
// 1. 负责把"正文 / 思考 / 工具事件 / 确认请求 / 中断提示"统一转换成 OpenAI 兼容 payload
// 2. 负责在必要时把结构化事件附带成 extra同时给当前前端提供可读的降级文本
// 3. 不负责决定什么时候发什么,也不负责持久化状态。
type ChunkEmitter struct {
@@ -365,7 +368,92 @@ func (e *ChunkEmitter) EmitDone() error {
return e.emit("[DONE]")
}
// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端
// EmitStreamAssistantText 从 StreamReader 逐 chunk 读取并实时推送 assistant 正文
//
// 职责边界:
// 1. 负责把 StreamReader 的每个 chunk 实时转换为 SSE payload 推送;
// 2. 负责累计完整文本并返回,供调用方写入 history
// 3. 不负责打开/关闭 StreamReader调用方负责生命周期管理。
func (e *ChunkEmitter) EmitStreamAssistantText(
ctx context.Context,
reader infrallm.StreamReader,
blockID, stage string,
) (string, error) {
if e == nil || reader == nil {
return "", nil
}
var fullText strings.Builder
firstChunk := true
for {
chunk, err := reader.Recv()
if err != nil {
if err == io.EOF {
break
}
return fullText.String(), err
}
// 推送 reasoning content。
if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil {
return fullText.String(), emitErr
}
firstChunk = false
}
// 推送 assistant 正文。
if chunk != nil && chunk.Content != "" {
if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil {
return fullText.String(), emitErr
}
fullText.WriteString(chunk.Content)
firstChunk = false
}
}
return fullText.String(), nil
}
// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取并实时推送 reasoning 文字。
//
// 与 EmitStreamAssistantText 结构相同,但只推送 ReasoningContent不推送 Content。
// 用于只需展示思考过程而无需展示正文的场景。
func (e *ChunkEmitter) EmitStreamReasoningText(
ctx context.Context,
reader infrallm.StreamReader,
blockID, stage string,
) (string, error) {
if e == nil || reader == nil {
return "", nil
}
var fullText strings.Builder
firstChunk := true
for {
chunk, err := reader.Recv()
if err != nil {
if err == io.EOF {
break
}
return fullText.String(), err
}
if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if emitErr := e.EmitReasoningText(blockID, stage, chunk.ReasoningContent, firstChunk); emitErr != nil {
return fullText.String(), emitErr
}
fullText.WriteString(chunk.ReasoningContent)
firstChunk = false
}
}
return fullText.String(), nil
}
// EmitStageAsReasoning 把"阶段提示"伪装成 reasoning chunk 推给前端。
//
// 兼容说明:
// 1. 保留旧函数签名,方便当前旧链路直接复用;
@@ -378,7 +466,7 @@ func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, crea
// EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。
//
// 注意:
// 1. 这里保持整段发,不主动切块;
// 1. 这里保持"整段发",不主动切块;
// 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText
// 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。
func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error {
@@ -493,7 +581,7 @@ func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options
return nil
}
// SplitPseudoStreamText 按标点优先、长度兜底的策略切分整段文本。
// SplitPseudoStreamText 按"标点优先、长度兜底"的策略切分整段文本。
//
// 步骤说明:
// 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅;