Version: 0.9.37.dev.260423

后端:
1. Plan / Execute / Deliver 三节点真流式输出——替换 GenerateJSON/GenerateText 为 Client.Stream + 两阶段流式解析
- newAgent/router/decision_parser.go:新增 StreamDecisionParser,从 LLM 流中增量提取 <SMARTFLOW_DECISION> 标签内 JSON,标签后文本作为用户可见正文逐 token 返回;含 9 项单测覆盖正常提取、跨 chunk 拆分、fallback、解析失败、空正文等场景
- newAgent/node/deliver.go:GenerateText 替换为 Client.Stream + EmitStreamAssistantText 真流式推送,降级/机械路径仍走伪流式
- newAgent/node/plan.go:GenerateJSON 替换为 Client.Stream + DecisionParser 两阶段流式,thinking 内容独立推流,speak 正文逐 token 推送
- newAgent/node/execute.go:同上两阶段流式改造,保留完整 correction 机制(ConsecutiveCorrections / tool_call 数组检测 / 空文本回退),speak 推送段删除  EmitPseudoAssistantText
- newAgent/prompt/plan.go + execute.go:系统提示词与输出协议从"只输出严格 JSON"改为 SMARTFLOW_DECISION 两阶段格式(标签内 JSON + 标签后自然语言正文),移除 speak 字段

2. 前端零改动——EmitAssistantText 产出的 SSE chunk 格式与伪流式完全一致,前端无需适配
This commit is contained in:
LoveLosita
2026-04-23 16:28:45 +08:00
parent 3c2f3c0b71
commit 7b37db64eb
6 changed files with 556 additions and 277 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"regexp"
"strconv"
@@ -13,6 +14,7 @@ import (
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools"
"github.com/LoveLosita/smartflow/backend/newAgent/tools/schedule"
@@ -192,13 +194,14 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
})
logNodeLLMContext(executeStageName, "decision", flowState, messages)
decision, rawResult, err := infrallm.GenerateJSON[newagentmodel.ExecuteDecision](
// 两阶段流式执行:从 LLM 流中先提取 <SMARTFLOW_DECISION> 决策标签,再流式推送 speak 正文。
reader, err := input.Client.Stream(
ctx,
input.Client,
messages,
infrallm.GenerateOptions{
Temperature: 1.0, // thinking 模式强制要求 temperature=1
MaxTokens: 16000, // 需为 thinking chain 留出足够预算
Temperature: 1.0,
MaxTokens: 16000,
Thinking: resolveThinkingMode(input.ThinkingEnabled),
Metadata: map[string]any{
"stage": executeStageName,
@@ -207,14 +210,48 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
},
},
)
// 提前捕获原始文本,用于日志和 correction。
rawText := ""
if rawResult != nil {
rawText = strings.TrimSpace(rawResult.Text)
if err != nil {
return fmt.Errorf("执行阶段 Stream 调用失败: %w", err)
}
if err != nil {
if rawText != "" {
parser := newagentrouter.NewStreamDecisionParser()
firstChunk := true
var decision *newagentmodel.ExecuteDecision
var fullText strings.Builder
rawText := ""
// 阶段一:解析决策标签。
for {
chunk, recvErr := reader.Recv()
if recvErr == io.EOF {
break
}
if recvErr != nil {
log.Printf("[WARN] execute stream recv error chat=%s err=%v", flowState.ConversationID, recvErr)
break
}
if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if emitErr := emitter.EmitReasoningText(executeSpeakBlockID, executeStageName, chunk.ReasoningContent, firstChunk); emitErr != nil {
return fmt.Errorf("执行 thinking 推送失败: %w", emitErr)
}
firstChunk = false
}
content := ""
if chunk != nil {
content = chunk.Content
}
visible, ready, _ := parser.Feed(content)
if !ready {
continue
}
result := parser.Result()
rawText = result.RawBuffer
if result.Fallback || result.ParseFailed {
log.Printf("[DEBUG] execute LLM 输出解析失败 chat=%s round=%d raw=%s",
flowState.ConversationID, flowState.RoundUsed, rawText)
flowState.ConsecutiveCorrections++
@@ -222,23 +259,71 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
return fmt.Errorf("连续 %d 次输出非 JSON终止执行: 原始输出=%s",
flowState.ConsecutiveCorrections, rawText)
}
// 区分两种常见失败:
// 1. tool_call 是数组LLM 想批量调工具)→ 告知只能单次调用,保留已有上下文;
// 2. 真正的 JSON 格式损坏 → 要求重新输出合法 JSON。
var errorDesc, optionHint string
if strings.Contains(rawText, `"tool_call": [`) || strings.Contains(rawText, `"tool_call":[`) {
errorDesc = "你在 tool_call 字段传入了数组,但每轮只能调用一个工具,不支持批量格式。"
optionHint = "请把多个工具调用拆开,每轮只调一个,拿到结果后再继续下一步。示例:{\"speak\":\"...\",\"action\":\"continue\",\"reason\":\"...\",\"tool_call\":{\"name\":\"get_task_info\",\"arguments\":{\"task_id\":1}}}"
optionHint = "请把多个工具调用拆开,每轮只调一个,拿到结果后再继续下一步。"
} else {
errorDesc = "你的输出不是合法 JSON,无法解析。"
optionHint = "你必须输出严格的 JSON 格式。合法格式示例:{\"speak\":\"...\",\"action\":\"continue\",\"reason\":\"...\",\"tool_call\":{\"name\":\"工具名\",\"arguments\":{}}}"
errorDesc = "你的输出不包含合法的 SMARTFLOW_DECISION 标签,无法解析。"
optionHint = "你必须输出 <SMARTFLOW_DECISION>{JSON}</SMARTFLOW_DECISION>,然后在标签后输出正文。"
}
AppendLLMCorrectionWithHint(conversationContext, rawText, errorDesc, optionHint)
return nil
}
// 模型返回空文本(常见原因:上下文过长、模型异常),走 correction 重试而非直接 fatal。
if strings.Contains(err.Error(), "empty text") {
var parseErr error
decision, parseErr = infrallm.ParseJSONObject[newagentmodel.ExecuteDecision](result.DecisionJSON)
if parseErr != nil {
log.Printf("[DEBUG] execute LLM JSON 解析失败 chat=%s round=%d json=%s raw=%s",
flowState.ConversationID, flowState.RoundUsed, result.DecisionJSON, rawText)
flowState.ConsecutiveCorrections++
if flowState.ConsecutiveCorrections >= maxConsecutiveCorrections {
return fmt.Errorf("连续 %d 次输出非 JSON终止执行: 原始输出=%s",
flowState.ConsecutiveCorrections, rawText)
}
AppendLLMCorrectionWithHint(conversationContext, rawText,
"决策标签内的 JSON 格式不合法。",
"请确保 <SMARTFLOW_DECISION> 标签内是合法 JSON然后用标签后输出正文。")
return nil
}
// 阶段二:流式推送 speak同一 reader 继续读取)。
if visible != "" {
if emitErr := emitter.EmitAssistantText(executeSpeakBlockID, executeStageName, visible, firstChunk); emitErr != nil {
return fmt.Errorf("执行文案推送失败: %w", emitErr)
}
fullText.WriteString(visible)
firstChunk = false
}
for {
chunk2, recvErr2 := reader.Recv()
if recvErr2 == io.EOF {
break
}
if recvErr2 != nil {
log.Printf("[WARN] execute speak stream error chat=%s err=%v", flowState.ConversationID, recvErr2)
break
}
if chunk2 == nil {
continue
}
if strings.TrimSpace(chunk2.ReasoningContent) != "" {
_ = emitter.EmitReasoningText(executeSpeakBlockID, executeStageName, chunk2.ReasoningContent, false)
}
if chunk2.Content != "" {
if emitErr := emitter.EmitAssistantText(executeSpeakBlockID, executeStageName, chunk2.Content, firstChunk); emitErr != nil {
return fmt.Errorf("执行文案推送失败: %w", emitErr)
}
fullText.WriteString(chunk2.Content)
firstChunk = false
}
}
break
}
// 流结束但未找到决策标签。
if decision == nil {
if strings.TrimSpace(rawText) == "" {
log.Printf("[WARN] execute LLM 返回空文本 chat=%s round=%d consecutive=%d/%d",
flowState.ConversationID, flowState.RoundUsed,
flowState.ConsecutiveCorrections+1, maxConsecutiveCorrections)
@@ -250,15 +335,16 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
conversationContext,
"",
"模型没有返回任何内容。",
"请重新输出合法 JSON 格式的执行决策。",
"请重新输出 <SMARTFLOW_DECISION>{JSON}</SMARTFLOW_DECISION> 格式的执行决策。",
)
return nil
}
return fmt.Errorf("执行阶段模型调用失败: %w", err)
return fmt.Errorf("执行阶段流结束但未提取到决策标签")
}
// 调试日志:输出 LLM 原始返回和解析后的决策,方便排查。
decision.Speak = fullText.String()
// 调试日志:输出解析后的决策,方便排查。
log.Printf("[DEBUG] execute LLM 响应 chat=%s round=%d action=%s speak_len=%d raw_len=%d raw_preview=%.200s",
flowState.ConversationID, flowState.RoundUsed,
decision.Action, len(decision.Speak), len(rawText), rawText)
@@ -338,34 +424,19 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
}
}
// 6. speak 推流与历史写入。
//
// AlwaysExecute=true 时confirm 动作不走确认卡片speak 和 continue 一样直接推流;
// AlwaysExecute=false 时confirm 的 speak 不推流(由确认卡片展示),但仍写入历史,
// 防止 LLM 下一轮忘记自己的计划,形成重复确认循环。
speakText := decision.Speak // 已由 normalizeSpeak 处理,末尾含 \n
// 6. speak 已在流式循环中推送,此处仅做持久化与历史写入。
speakText := decision.Speak
if speakText != "" {
isConfirmWithCard := decision.Action == newagentmodel.ExecuteActionConfirm && !input.AlwaysExecute
isAskUser := decision.Action == newagentmodel.ExecuteActionAskUser
isAbort := decision.Action == newagentmodel.ExecuteActionAbort
if !isConfirmWithCard && !isAskUser && !isAbort {
// 推流给前端
msg := schema.AssistantMessage(speakText, nil)
if err := emitter.EmitPseudoAssistantText(
ctx,
executeSpeakBlockID,
executeStageName,
speakText,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("执行文案推送失败: %w", err)
}
persistVisibleAssistantMessage(ctx, input.PersistVisibleMessage, flowState, msg)
}
// 1. confirm / ask_user 的 speak 仍写入历史,避免下一轮 LLM 丢失自己的执行上下文
// 2. abort 不在这里写历史,避免先输出中间 speak再在 deliver 收到第二份终止文案。
// 3. ask_user 只是不在这里伪流式推送,真正的对外展示仍由 PendingInteraction.DisplayText 承担。
// confirm / ask_user 的 speak 仍写入历史,避免下一轮 LLM 丢失上下文
// abort 不写历史,避免 deliver 终止文案冲突
if !isAbort {
conversationContext.AppendHistory(&schema.Message{
Role: schema.Assistant,