diff --git a/backend/newAgent/graph/common_graph.go b/backend/newAgent/graph/common_graph.go index a42c179..b83720d 100644 --- a/backend/newAgent/graph/common_graph.go +++ b/backend/newAgent/graph/common_graph.go @@ -5,6 +5,7 @@ import ( "errors" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + newagentnode "github.com/LoveLosita/smartflow/backend/newAgent/node" "github.com/cloudwego/eino/compose" ) @@ -19,23 +20,25 @@ const ( NodeDeliver = "deliver" ) -func RunAgentGraph(ctx context.Context, state *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func RunAgentGraph(ctx context.Context, input newagentmodel.AgentGraphRunInput) (*newagentmodel.AgentGraphState, error) { + state := newagentmodel.NewAgentGraphState(input) if state == nil { - return nil, errors.New("agent graph: state is nil") + return nil, errors.New("agent graph: graph state is nil") } - flowState := state.EnsureCommonState() + flowState := state.EnsureFlowState() if flowState == nil { - return nil, errors.New("agent graph: common state is nil") + return nil, errors.New("agent graph: flow state is nil") } - g := compose.NewGraph[*newagentmodel.AgentRuntimeState, *newagentmodel.AgentRuntimeState]() + nodes := newagentnode.NewAgentNodes() + g := compose.NewGraph[*newagentmodel.AgentGraphState, *newagentmodel.AgentGraphState]() // --- 注册节点 --- if err := g.AddLambdaNode(NodeChat, compose.InvokableLambda(chatNode)); err != nil { return nil, err } - if err := g.AddLambdaNode(NodePlan, compose.InvokableLambda(planNode)); err != nil { + if err := g.AddLambdaNode(NodePlan, compose.InvokableLambda(nodes.Plan)); err != nil { return nil, err } if err := g.AddLambdaNode(NodeConfirm, compose.InvokableLambda(confirmNode)); err != nil { @@ -127,41 +130,32 @@ func RunAgentGraph(ctx context.Context, state *newagentmodel.AgentRuntimeState) return runnable.Invoke(ctx, state) } -// --- 占位节点,后续由 node 层替换 --- +// --- 占位节点,后续逐步由 node 层替换 --- -func chatNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func chatNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { if st == nil { return nil, errors.New("chat node: state is nil") } - st.EnsureCommonState() + st.EnsureFlowState() + st.EnsureConversationContext() + st.EnsureChunkEmitter() // TODO: // 1. 识别当前请求是普通聊天、首次任务进入,还是从 pending interaction 恢复。 // 2. 若只是普通聊天,则生成回复并把 Phase 设为 PhaseChatting,后续直接 END。 // 3. 若识别到任务意图,则把 Phase 切到 planning / waiting_confirm / executing 对应阶段。 // 4. 若本轮是恢复请求,则这里只负责吞掉用户最新输入并准备恢复,不再重复输出闲聊回复。 + // 5. 后续 chatNode 可直接读取 st.Request.UserInput、st.ConversationContext 与 st.Deps.ResolveChatClient()。 return st, nil } -func planNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { - if st == nil { - return nil, errors.New("plan node: state is nil") - } - st.EnsureCommonState() - - // TODO: - // 1. 每轮把“完整 plan + 当前步骤 + 置顶上下文”注入给 LLM,让模型只补一步规划。 - // 2. 若缺少关键信息,则调用 st.OpenAskUserInteraction(...) 打开 ask_user 中断。 - // 3. 若规划已经完整,则调用 st.FinishPlan(steps),把流程切到 waiting_confirm。 - // 4. 若规划未完成,则保持 PhasePlanning,分支回到 plan 继续循环。 - return st, nil -} - -func confirmNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func confirmNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { if st == nil { return nil, errors.New("confirm node: state is nil") } - st.EnsureCommonState() + st.EnsureFlowState() + st.EnsureConversationContext() + st.EnsureChunkEmitter() // TODO: // 1. 这里不再做“confirm 节点内自循环等待”,而是统一走中断恢复模式。 @@ -171,14 +165,18 @@ func confirmNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newag return st, nil } -func executeNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func executeNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { if st == nil { return nil, errors.New("execute node: state is nil") } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() + st.EnsureConversationContext() + st.EnsureChunkEmitter() // TODO: // 1. 让 LLM 在“当前步骤”约束下做一轮 ReAct:思考 → 调工具/观察 → reflection。 + // 1.1 执行阶段所需上下文应直接从 st.ConversationContext 读取。 + // 1.2 执行阶段模型依赖应通过 st.Deps.ResolveExecuteClient() 获取。 // 2. 若执行中发现缺少关键用户信息,则调用 st.OpenAskUserInteraction(...) 并走 interrupt。 // 3. 若命中写工具确认闸门: // 3.1 若走同连接确认,则把 Phase 置为 waiting_confirm 并跳到 confirm; @@ -188,11 +186,13 @@ func executeNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newag return st, nil } -func interruptNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func interruptNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { if st == nil { return nil, errors.New("interrupt node: state is nil") } - st.EnsureCommonState() + st.EnsureFlowState() + st.EnsureConversationContext() + st.EnsureChunkEmitter() // TODO: // 1. 若 PendingInteraction.Type=ask_user,则像普通聊天一样流式吐出问题文本。 @@ -202,11 +202,13 @@ func interruptNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*new return st, nil } -func deliverNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { +func deliverNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { if st == nil { return nil, errors.New("deliver node: state is nil") } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() + st.EnsureConversationContext() + st.EnsureChunkEmitter() // TODO: 将执行结果推给用户,并在所有外部落库完成后再标记 done。 flowState.Done() @@ -215,12 +217,12 @@ func deliverNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newag // --- 分支函数 --- -func branchAfterChat(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { +func branchAfterChat(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) { if nextNode, interrupted := branchIfInterrupted(st); interrupted { return nextNode, nil } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() switch flowState.Phase { case newagentmodel.PhasePlanning: return NodePlan, nil @@ -236,24 +238,24 @@ func branchAfterChat(_ context.Context, st *newagentmodel.AgentRuntimeState) (st } } -func branchAfterPlan(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { +func branchAfterPlan(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) { if nextNode, interrupted := branchIfInterrupted(st); interrupted { return nextNode, nil } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() if flowState.Phase == newagentmodel.PhaseWaitingConfirm { return NodeConfirm, nil } return NodePlan, nil } -func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { +func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) { if nextNode, interrupted := branchIfInterrupted(st); interrupted { return nextNode, nil } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() switch flowState.Phase { case newagentmodel.PhaseExecuting: return NodeExecute, nil @@ -266,12 +268,12 @@ func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentRuntimeState) } } -func branchAfterExecute(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { +func branchAfterExecute(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) { if nextNode, interrupted := branchIfInterrupted(st); interrupted { return nextNode, nil } - flowState := st.EnsureCommonState() + flowState := st.EnsureFlowState() if flowState.Phase == newagentmodel.PhaseWaitingConfirm { return NodeConfirm, nil } @@ -281,11 +283,12 @@ func branchAfterExecute(_ context.Context, st *newagentmodel.AgentRuntimeState) return NodeExecute, nil } -func branchIfInterrupted(st *newagentmodel.AgentRuntimeState) (string, bool) { +func branchIfInterrupted(st *newagentmodel.AgentGraphState) (string, bool) { if st == nil { return "", false } - if st.HasPendingInteraction() { + runtimeState := st.EnsureRuntimeState() + if runtimeState != nil && runtimeState.HasPendingInteraction() { return NodeInterrupt, true } return "", false diff --git a/backend/newAgent/model/common_state.go b/backend/newAgent/model/common_state.go index 1433adc..e350927 100644 --- a/backend/newAgent/model/common_state.go +++ b/backend/newAgent/model/common_state.go @@ -1,6 +1,6 @@ package model -// Phase 表示 agent 循环图当前所处的阶段。 +// Phase 表示 agent 主循环当前所处的大阶段。 type Phase string const ( @@ -12,8 +12,14 @@ const ( const DefaultMaxRounds = 30 +// CommonState 承载可持久化的主流程状态。 +// +// 职责边界: +// 1. 负责记录“当前处于哪个阶段、当前计划是什么、执行到了第几步、已经消耗了多少轮”; +// 2. 负责提供最小必要的安全访问方法,避免 graph/node/prompt 层到处手写切片越界判断; +// 3. 不负责承载对话历史、tool schema、pinned context 这类模型输入材料,它们仍然属于 ConversationContext。 type CommonState struct { - // 身份 + // 身份信息 TraceID string UserID int ConversationID string @@ -21,8 +27,10 @@ type CommonState struct { // 流程阶段 Phase Phase - // Plan - PlanSteps []string + // 计划状态 + // 1. 这里直接使用结构化的 PlanStep,避免 planning -> execute 之间丢失 done_when。 + // 2. CurrentStep 表示“当前 plan 步骤下标”,不是 execute 内部 ReAct 的思考轮次。 + PlanSteps []PlanStep CurrentStep int // 安全边界 @@ -40,53 +48,58 @@ func NewCommonState(traceID string, userID int, conversationID string) *CommonSt } } -// NextRound 消耗一轮并返回是否还有余量。 +// NextRound 消耗一轮预算,并返回当前是否仍在允许范围内。 func (s *CommonState) NextRound() bool { s.RoundUsed++ return s.RoundUsed <= s.MaxRounds } -// Exhausted 判断是否已耗尽轮次。 +// Exhausted 判断是否已经耗尽轮次预算。 func (s *CommonState) Exhausted() bool { return s.RoundUsed >= s.MaxRounds } -// FinishPlan 标记 plan 完成,进入等待确认阶段。 -func (s *CommonState) FinishPlan(steps []string) { +// FinishPlan 在 planning 完成后固化完整计划,并推进到待确认阶段。 +// +// 步骤说明: +// 1. 直接保存完整的 []PlanStep,避免 execute 阶段再去依赖 pinned context 回捞完成判定; +// 2. 统一把 CurrentStep 重置到第 0 步,保证后续 confirm/execute 都从计划开头进入; +// 3. 这里只负责状态切换,不负责刷新 ConversationContext 中的置顶 plan 文本。 +func (s *CommonState) FinishPlan(steps []PlanStep) { s.PlanSteps = steps s.CurrentStep = 0 s.Phase = PhaseWaitingConfirm } -// ConfirmPlan 用户确认后进入执行阶段。 +// ConfirmPlan 表示用户已确认计划,流程进入执行阶段。 func (s *CommonState) ConfirmPlan() { s.Phase = PhaseExecuting } -// RejectPlan 用户拒绝,回到规划阶段。 +// RejectPlan 表示用户拒绝当前计划,清空计划并回退到 planning。 func (s *CommonState) RejectPlan() { s.PlanSteps = nil s.CurrentStep = 0 s.Phase = PhasePlanning } -// AdvanceStep 推进到下一个 plan 步骤,返回是否还有剩余步骤。 +// AdvanceStep 推进到下一个计划步骤,并返回是否仍有剩余步骤。 func (s *CommonState) AdvanceStep() bool { s.CurrentStep++ return s.CurrentStep < len(s.PlanSteps) } -// Done 标记整个流程结束。 +// Done 标记整个任务流程已经结束。 func (s *CommonState) Done() { s.Phase = PhaseDone } -// HasPlan 判断当前 state 是否已经持有一份可执行的 plan。 +// HasPlan 判断当前 state 是否已经持有一份完整计划。 // // 职责边界: -// 1. 负责把“外部直接判断 len(PlanSteps) > 0”的零散逻辑收口到 state 内部; -// 2. 只回答“是否存在 plan”,不判断当前索引是否有效; -// 3. 当 state 为空时返回 false,调用方可据此决定是否回退到重新规划。 +// 1. 负责收口“是否存在 plan”这一层判断,避免外层到处写 len(PlanSteps) > 0; +// 2. 不判断 CurrentStep 当前是否有效,当前步骤是否合法由 HasCurrentPlanStep 回答; +// 3. state 为空时统一返回 false,调用方可据此决定是否回退到 planning。 func (s *CommonState) HasPlan() bool { if s == nil { return false @@ -94,40 +107,35 @@ func (s *CommonState) HasPlan() bool { return len(s.PlanSteps) > 0 } -// CurrentPlanStep 返回当前正在执行的 plan 步骤文本。 +// CurrentPlanStep 返回当前正在执行的结构化计划步骤。 // // 职责边界: -// 1. 负责根据 CurrentStep 安全读取 PlanSteps,避免调用方重复写切片越界判断; -// 2. 当 state 为空、plan 为空、或当前索引越界时,统一返回 ("", false); +// 1. 负责根据 CurrentStep 安全读取 PlanSteps,避免 graph/node/prompt 层重复写越界判断; +// 2. 若 state 为空、plan 为空、或当前索引越界,则统一返回 (PlanStep{}, false); // 3. 不负责推进步骤,也不负责修正 CurrentStep 的取值。 -func (s *CommonState) CurrentPlanStep() (string, bool) { +func (s *CommonState) CurrentPlanStep() (PlanStep, bool) { if s == nil { - return "", false + return PlanStep{}, false } if s.CurrentStep < 0 || s.CurrentStep >= len(s.PlanSteps) { - return "", false + return PlanStep{}, false } return s.PlanSteps[s.CurrentStep], true } // HasCurrentPlanStep 判断“当前步骤”是否存在且可安全读取。 -// -// 职责边界: -// 1. 负责给 graph / node 层提供一个更直白的布尔判断入口; -// 2. 内部复用 CurrentPlanStep,避免两处维护相同的索引边界逻辑; -// 3. 不返回步骤内容,只回答“当前是否还有可注入的步骤”。 func (s *CommonState) HasCurrentPlanStep() bool { _, ok := s.CurrentPlanStep() return ok } -// PlanProgress 返回当前 plan 的执行进度。 +// PlanProgress 返回当前计划的执行进度。 // // 输出语义: -// 1. current 使用对人类更友好的 1-based 序号,适合直接写入 prompt 或日志; -// 2. total 表示当前 plan 总步数; -// 3. 若尚未生成 plan,则返回 (0, 0); -// 4. 若 CurrentStep 已越过末尾,则 current 会被收敛到 total,避免上层出现 total+1 这类噪音值。 +// 1. current 使用更适合给用户看的 1-based 序号; +// 2. total 表示当前计划的总步数; +// 3. 若当前还没有计划,则返回 (0, 0); +// 4. 若 CurrentStep 已越界到末尾之后,则把 current 收敛到 total,避免出现 total+1 这种噪音值。 func (s *CommonState) PlanProgress() (current int, total int) { if s == nil { return 0, 0 diff --git a/backend/newAgent/model/graph_run_state.go b/backend/newAgent/model/graph_run_state.go new file mode 100644 index 0000000..6522381 --- /dev/null +++ b/backend/newAgent/model/graph_run_state.go @@ -0,0 +1,193 @@ +package model + +import ( + "strings" + + newagentllm "github.com/LoveLosita/smartflow/backend/newAgent/llm" + newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" +) + +// AgentGraphRequest 描述一次 agent graph 运行的请求级输入。 +// +// 职责边界: +// 1. 这里只放“当前这次请求”天然携带的轻量数据,例如用户本轮输入; +// 2. 不负责承载可持久化流程状态,流程状态仍归 AgentRuntimeState; +// 3. 不负责承载 LLM / emitter / store 等依赖,这些统一放进 AgentGraphDeps。 +type AgentGraphRequest struct { + UserInput string +} + +// Normalize 统一清洗请求级输入中的字符串字段。 +func (r *AgentGraphRequest) Normalize() { + if r == nil { + return + } + r.UserInput = strings.TrimSpace(r.UserInput) +} + +// AgentGraphDeps 描述 graph/node 层运行时真正依赖的可插拔能力。 +// +// 设计目的: +// 1. 让 graph 不再只拿到“裸状态”,而是能拿到上下文、模型和输出能力; +// 2. Chat/Plan/Execute/Deliver 允许分别挂不同 client,但也允许先复用同一个 client; +// 3. ChunkEmitter 统一承接阶段提示、正文、工具事件、确认请求等 SSE 输出。 +type AgentGraphDeps struct { + ChatClient *newagentllm.Client + PlanClient *newagentllm.Client + ExecuteClient *newagentllm.Client + DeliverClient *newagentllm.Client + ChunkEmitter *newagentstream.ChunkEmitter +} + +// EnsureChunkEmitter 保证 graph 运行时始终有一个可用的 chunk 发射器。 +// +// 步骤说明: +// 1. 依赖为空时回退到 Noop emitter,避免骨架期因为没接前端而到处判空; +// 2. 这里只兜底“能安全调用”,不负责填充真实 request_id / model_name; +// 3. 后续 service 层一旦接上真实 emitter,会自然覆盖这里的空实现。 +func (d *AgentGraphDeps) EnsureChunkEmitter() *newagentstream.ChunkEmitter { + if d == nil { + return newagentstream.NewChunkEmitter(newagentstream.NoopPayloadEmitter(), "", "", 0) + } + if d.ChunkEmitter == nil { + d.ChunkEmitter = newagentstream.NewChunkEmitter(newagentstream.NoopPayloadEmitter(), "", "", 0) + } + return d.ChunkEmitter +} + +// ResolveChatClient 返回 chat 阶段可用的模型客户端。 +func (d *AgentGraphDeps) ResolveChatClient() *newagentllm.Client { + if d == nil { + return nil + } + return d.ChatClient +} + +// ResolvePlanClient 返回 planning 阶段可用的模型客户端。 +// +// 兜底策略: +// 1. 优先使用显式注入的 PlanClient; +// 2. 若未单独注入,则回退到 ChatClient; +// 3. 这样在骨架期可先用一套 client 跑通,再按需拆分 strategist / worker。 +func (d *AgentGraphDeps) ResolvePlanClient() *newagentllm.Client { + if d == nil { + return nil + } + if d.PlanClient != nil { + return d.PlanClient + } + return d.ChatClient +} + +// ResolveExecuteClient 返回 execute 阶段可用的模型客户端。 +func (d *AgentGraphDeps) ResolveExecuteClient() *newagentllm.Client { + if d == nil { + return nil + } + if d.ExecuteClient != nil { + return d.ExecuteClient + } + if d.PlanClient != nil { + return d.PlanClient + } + return d.ChatClient +} + +// ResolveDeliverClient 返回 deliver 阶段可用的模型客户端。 +func (d *AgentGraphDeps) ResolveDeliverClient() *newagentllm.Client { + if d == nil { + return nil + } + if d.DeliverClient != nil { + return d.DeliverClient + } + if d.ExecuteClient != nil { + return d.ExecuteClient + } + if d.PlanClient != nil { + return d.PlanClient + } + return d.ChatClient +} + +// AgentGraphRunInput 是执行 newAgent 通用 graph 所需的完整入口参数。 +// +// 字段说明: +// 1. RuntimeState:可持久化流程状态与 pending interaction; +// 2. ConversationContext:本轮喂给模型的上下文材料; +// 3. Request:当前这次请求的轻量输入; +// 4. Deps:graph/node 层真正依赖的可插拔能力。 +type AgentGraphRunInput struct { + RuntimeState *AgentRuntimeState + ConversationContext *ConversationContext + Request AgentGraphRequest + Deps AgentGraphDeps +} + +// AgentGraphState 是 graph 内部真正流转的运行态容器。 +// +// 职责边界: +// 1. 负责把“流程状态 + 对话上下文 + 请求输入 + 运行依赖”收口到同一个对象; +// 2. 负责给 graph 分支和 node 提供最小必要的兜底访问方法; +// 3. 不负责持久化,不负责真正业务执行。 +type AgentGraphState struct { + RuntimeState *AgentRuntimeState + ConversationContext *ConversationContext + Request AgentGraphRequest + Deps AgentGraphDeps +} + +// NewAgentGraphState 把入口参数整理成 graph 内部状态。 +func NewAgentGraphState(input AgentGraphRunInput) *AgentGraphState { + st := &AgentGraphState{ + RuntimeState: input.RuntimeState, + ConversationContext: input.ConversationContext, + Request: input.Request, + Deps: input.Deps, + } + st.Request.Normalize() + st.EnsureRuntimeState() + st.EnsureConversationContext() + st.Deps.EnsureChunkEmitter() + return st +} + +// EnsureRuntimeState 保证 graph 内部始终持有一份可用的运行态。 +func (s *AgentGraphState) EnsureRuntimeState() *AgentRuntimeState { + if s == nil { + return nil + } + if s.RuntimeState == nil { + s.RuntimeState = NewAgentRuntimeState(nil) + } + s.RuntimeState.EnsureCommonState() + return s.RuntimeState +} + +// EnsureFlowState 返回可持久化的主流程状态。 +func (s *AgentGraphState) EnsureFlowState() *CommonState { + runtimeState := s.EnsureRuntimeState() + if runtimeState == nil { + return nil + } + return runtimeState.EnsureCommonState() +} + +// EnsureConversationContext 保证 graph 内部始终持有一份可用的会话上下文。 +func (s *AgentGraphState) EnsureConversationContext() *ConversationContext { + if s == nil { + return nil + } + if s.ConversationContext == nil { + s.ConversationContext = NewConversationContext("") + } + return s.ConversationContext +} + +// EnsureChunkEmitter 返回 graph 可安全调用的 chunk 发射器。 +func (s *AgentGraphState) EnsureChunkEmitter() *newagentstream.ChunkEmitter { + if s == nil { + return newagentstream.NewChunkEmitter(newagentstream.NoopPayloadEmitter(), "", "", 0) + } + return s.Deps.EnsureChunkEmitter() +} diff --git a/backend/newAgent/model/plan_contract.go b/backend/newAgent/model/plan_contract.go new file mode 100644 index 0000000..275b8e5 --- /dev/null +++ b/backend/newAgent/model/plan_contract.go @@ -0,0 +1,121 @@ +package model + +import ( + "fmt" + "strings" +) + +// PlanAction 表示规划阶段单轮决策的动作类型。 +// +// 设计原则: +// 1. 规划阶段只关心“继续规划 / 追问用户 / 规划完成”这三类动作; +// 2. 这里先不把工具调用塞进 contract,避免过早把 plan loop 复杂化; +// 3. 规划层产出的是“自然语言计划”,不是执行层的工具动作。 +type PlanAction string + +const ( + // PlanActionContinue 表示当前信息已足够,继续规划下一轮。 + PlanActionContinue PlanAction = "continue" + + // PlanActionAskUser 表示当前规划缺少关键信息,需要中断并追问用户。 + PlanActionAskUser PlanAction = "ask_user" + + // PlanActionDone 表示规划已经完成,可以进入 confirm 或下一阶段。 + PlanActionDone PlanAction = "plan_done" +) + +// PlanDecision 是 plan prompt 单轮产出的统一决策结构。 +// +// 职责边界: +// 1. Speak 是本轮先对用户说的话;若 action=ask_user,通常这里会承载要追问的问题; +// 2. Action 是规划阶段的下一步动作类型; +// 3. Reason 是给后端和日志看的简短解释; +// 4. PlanSteps 只在 plan_done 时要求返回,表示本轮最终确认下来的完整自然语言计划。 +type PlanDecision struct { + Speak string `json:"speak,omitempty"` + Action PlanAction `json:"action"` + Reason string `json:"reason,omitempty"` + PlanSteps []PlanStep `json:"plan_steps,omitempty"` +} + +// Normalize 统一清洗规划决策中的字符串字段。 +func (d *PlanDecision) Normalize() { + if d == nil { + return + } + d.Speak = strings.TrimSpace(d.Speak) + d.Action = PlanAction(strings.TrimSpace(string(d.Action))) + d.Reason = strings.TrimSpace(d.Reason) + for i := range d.PlanSteps { + d.PlanSteps[i].Normalize() + } +} + +// Validate 校验规划决策的最小合法性。 +// +// 校验原则: +// 1. 这里只校验“协议是否自洽”,不校验规划内容是否聪明、是否足够好; +// 2. 只有 plan_done 允许返回完整 plan_steps; +// 3. 真正的规划质量判断仍留给后续 node 层和用户确认环节。 +func (d *PlanDecision) Validate() error { + if d == nil { + return fmt.Errorf("plan decision 不能为空") + } + + d.Normalize() + if d.Action == "" { + return fmt.Errorf("plan decision.action 不能为空") + } + + switch d.Action { + case PlanActionContinue, PlanActionAskUser: + if len(d.PlanSteps) > 0 { + return fmt.Errorf("%s 动作不应携带 plan_steps", d.Action) + } + return nil + case PlanActionDone: + if len(d.PlanSteps) == 0 { + return fmt.Errorf("plan_done 动作必须携带完整 plan_steps") + } + for i := range d.PlanSteps { + if err := d.PlanSteps[i].Validate(); err != nil { + return fmt.Errorf("plan_steps[%d] 非法: %w", i, err) + } + } + return nil + default: + return fmt.Errorf("未知 plan action: %s", d.Action) + } +} + +// PlanStep 表示规划阶段产出的一条自然语言步骤。 +// +// 设计说明: +// 1. Content 是步骤正文,后续可直接落到 CommonState.PlanSteps; +// 2. DoneWhen 是可选的完成判定描述,用来给 execute 阶段提供最小退出条件; +// 3. 这里仍然保持“自然语言优先”,不把 plan step 过度结构化。 +type PlanStep struct { + Content string `json:"content"` + DoneWhen string `json:"done_when,omitempty"` +} + +// Normalize 统一清洗 plan step 中的字符串字段。 +func (s *PlanStep) Normalize() { + if s == nil { + return + } + s.Content = strings.TrimSpace(s.Content) + s.DoneWhen = strings.TrimSpace(s.DoneWhen) +} + +// Validate 校验单条 plan step 的最小合法性。 +func (s *PlanStep) Validate() error { + if s == nil { + return fmt.Errorf("plan step 不能为空") + } + s.Normalize() + if s.Content == "" { + return fmt.Errorf("plan step.content 不能为空") + } + return nil +} diff --git a/backend/newAgent/node/agent_nodes.go b/backend/newAgent/node/agent_nodes.go new file mode 100644 index 0000000..19da54b --- /dev/null +++ b/backend/newAgent/node/agent_nodes.go @@ -0,0 +1,48 @@ +package newagentnode + +import ( + "context" + "errors" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" +) + +// AgentNodes 是 newAgent 通用图的节点容器。 +// +// 职责边界: +// 1. 负责把 node 层真正实现的方法统一暴露给 graph 注册; +// 2. 负责收口“graph 只编排、node 真执行”的结构约束; +// 3. 当前先迁移 Plan,其他节点后续按同样模式逐步下沉。 +type AgentNodes struct{} + +// NewAgentNodes 创建通用节点容器。 +func NewAgentNodes() *AgentNodes { + return &AgentNodes{} +} + +// Plan 是规划阶段的正式节点方法。 +// +// 职责边界: +// 1. 这里只做 graph -> node 的参数转接; +// 2. 真正的单轮规划逻辑仍由 RunPlanNode 负责; +// 3. 这样 graph 层后续只需挂 n.Plan,而不再自己维护占位 planNode。 +func (n *AgentNodes) Plan(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) { + if st == nil { + return nil, errors.New("plan node: state is nil") + } + + if err := RunPlanNode( + ctx, + PlanNodeInput{ + RuntimeState: st.EnsureRuntimeState(), + ConversationContext: st.EnsureConversationContext(), + UserInput: st.Request.UserInput, + Client: st.Deps.ResolvePlanClient(), + ChunkEmitter: st.EnsureChunkEmitter(), + ResumeNode: "plan", + }, + ); err != nil { + return nil, err + } + return st, nil +} diff --git a/backend/newAgent/node/plan.go b/backend/newAgent/node/plan.go new file mode 100644 index 0000000..0bcbad6 --- /dev/null +++ b/backend/newAgent/node/plan.go @@ -0,0 +1,205 @@ +package newagentnode + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + + newagentllm "github.com/LoveLosita/smartflow/backend/newAgent/llm" + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" + newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + "github.com/cloudwego/eino/schema" +) + +const ( + planStageName = "plan" + planStatusBlockID = "plan.status" + planSpeakBlockID = "plan.speak" + planPinnedKey = "current_plan" + planCurrentStepKey = "current_step" + planCurrentStepTitle = "当前步骤" + planFullPlanTitle = "当前完整计划" +) + +// PlanNodeInput 描述单轮规划节点执行所需的最小依赖。 +type PlanNodeInput struct { + RuntimeState *newagentmodel.AgentRuntimeState + ConversationContext *newagentmodel.ConversationContext + UserInput string + Client *newagentllm.Client + ChunkEmitter *newagentstream.ChunkEmitter + ResumeNode string +} + +// RunPlanNode 执行一轮规划节点逻辑。 +// +// 步骤说明: +// 1. 先校验最小依赖,并推送一条“正在规划”的状态,避免用户空等; +// 2. 再用 prompt/plan.go 组装 messages,请模型严格输出 PlanDecision JSON; +// 3. 若模型先对用户说了话,则先把 speak 伪流式推给前端,并写回 history; +// 4. 最后按 action 推进流程: +// 4.1 continue:继续停留在 planning; +// 4.2 ask_user:打开 pending interaction,后续交给 interrupt 收口; +// 4.3 plan_done:固化完整计划,刷新 pinned context,并进入 waiting_confirm。 +func RunPlanNode(ctx context.Context, input PlanNodeInput) error { + runtimeState, conversationContext, emitter, err := preparePlanNodeInput(input) + if err != nil { + return err + } + flowState := runtimeState.EnsureCommonState() + + // 1. 先发一条阶段状态,让前端知道当前已经进入规划环节。 + if err := emitter.EmitStatus( + planStatusBlockID, + planStageName, + "planning", + "正在梳理目标并补全执行计划。", + false, + ); err != nil { + return fmt.Errorf("规划阶段状态推送失败: %w", err) + } + + // 2. 构造本轮规划输入,并要求模型输出结构化 PlanDecision。 + messages := newagentprompt.BuildPlanMessages(flowState, conversationContext, input.UserInput) + decision, rawResult, err := newagentllm.GenerateJSON[newagentmodel.PlanDecision]( + ctx, + input.Client, + messages, + newagentllm.GenerateOptions{ + Temperature: 0.2, + MaxTokens: 1600, + Thinking: newagentllm.ThinkingModeEnabled, + Metadata: map[string]any{ + "stage": planStageName, + }, + }, + ) + if err != nil { + if rawResult != nil && strings.TrimSpace(rawResult.Text) != "" { + return fmt.Errorf("规划输出解析失败,原始输出=%s,错误=%w", strings.TrimSpace(rawResult.Text), err) + } + return fmt.Errorf("规划阶段模型调用失败: %w", err) + } + if err := decision.Validate(); err != nil { + return fmt.Errorf("规划决策不合法: %w", err) + } + + // 3. 若模型先对用户说了话,则先以伪流式推送,再写回 history,保证上下文连续。 + if strings.TrimSpace(decision.Speak) != "" { + if err := emitter.EmitPseudoAssistantText( + ctx, + planSpeakBlockID, + planStageName, + decision.Speak, + newagentstream.DefaultPseudoStreamOptions(), + ); err != nil { + return fmt.Errorf("规划文案推送失败: %w", err) + } + conversationContext.AppendHistory(schema.AssistantMessage(decision.Speak, nil)) + } + + // 4. 按规划动作推进流程状态。 + switch decision.Action { + case newagentmodel.PlanActionContinue: + flowState.Phase = newagentmodel.PhasePlanning + return nil + case newagentmodel.PlanActionAskUser: + question := resolvePlanAskUserText(decision) + runtimeState.OpenAskUserInteraction(uuid.NewString(), question, strings.TrimSpace(input.ResumeNode)) + return nil + case newagentmodel.PlanActionDone: + // 4.1 直接把结构化 PlanStep 固化到 CommonState,避免 state 层丢失 done_when。 + // 4.2 再把完整自然语言计划写入 pinned context,保证后续 execute 优先看到。 + // 4.3 最后进入 waiting_confirm,等待用户确认整体计划。 + flowState.FinishPlan(decision.PlanSteps) + writePlanPinnedBlocks(conversationContext, decision.PlanSteps) + return nil + default: + return fmt.Errorf("未支持的规划动作: %s", decision.Action) + } +} + +func preparePlanNodeInput(input PlanNodeInput) (*newagentmodel.AgentRuntimeState, *newagentmodel.ConversationContext, *newagentstream.ChunkEmitter, error) { + if input.RuntimeState == nil { + return nil, nil, nil, fmt.Errorf("plan node: runtime state 不能为空") + } + if input.Client == nil { + return nil, nil, nil, fmt.Errorf("plan node: plan client 未注入") + } + + input.RuntimeState.EnsureCommonState() + if input.ConversationContext == nil { + input.ConversationContext = newagentmodel.NewConversationContext("") + } + if input.ChunkEmitter == nil { + input.ChunkEmitter = newagentstream.NewChunkEmitter(newagentstream.NoopPayloadEmitter(), "", "", time.Now().Unix()) + } + return input.RuntimeState, input.ConversationContext, input.ChunkEmitter, nil +} + +func resolvePlanAskUserText(decision *newagentmodel.PlanDecision) string { + if decision == nil { + return "我还缺一点关键信息,想先向你确认一下。" + } + if strings.TrimSpace(decision.Speak) != "" { + return strings.TrimSpace(decision.Speak) + } + if strings.TrimSpace(decision.Reason) != "" { + return strings.TrimSpace(decision.Reason) + } + return "我还缺一点关键信息,想先向你确认一下。" +} + +func writePlanPinnedBlocks(ctx *newagentmodel.ConversationContext, steps []newagentmodel.PlanStep) { + if ctx == nil { + return + } + + fullPlanText := buildPinnedPlanText(steps) + if strings.TrimSpace(fullPlanText) != "" { + ctx.UpsertPinnedBlock(newagentmodel.ContextBlock{ + Key: planPinnedKey, + Title: planFullPlanTitle, + Content: fullPlanText, + }) + } + + if len(steps) == 0 { + return + } + + firstStep := strings.TrimSpace(steps[0].Content) + if strings.TrimSpace(steps[0].DoneWhen) != "" { + firstStep = fmt.Sprintf("%s\n完成判定:%s", firstStep, strings.TrimSpace(steps[0].DoneWhen)) + } + ctx.UpsertPinnedBlock(newagentmodel.ContextBlock{ + Key: planCurrentStepKey, + Title: planCurrentStepTitle, + Content: firstStep, + }) +} + +func buildPinnedPlanText(steps []newagentmodel.PlanStep) string { + if len(steps) == 0 { + return "" + } + + lines := make([]string, 0, len(steps)) + for i, step := range steps { + content := strings.TrimSpace(step.Content) + if content == "" { + continue + } + + line := fmt.Sprintf("%d. %s", i+1, content) + if strings.TrimSpace(step.DoneWhen) != "" { + line += fmt.Sprintf("\n完成判定:%s", strings.TrimSpace(step.DoneWhen)) + } + lines = append(lines, line) + } + return strings.TrimSpace(strings.Join(lines, "\n\n")) +} diff --git a/backend/newAgent/prompt/execute.go b/backend/newAgent/prompt/execute.go index 9861f02..8748ae3 100644 --- a/backend/newAgent/prompt/execute.go +++ b/backend/newAgent/prompt/execute.go @@ -8,25 +8,18 @@ import ( ) const ( - // ExecuteNextPlanSignal 表示“当前 plan 步骤已经完成,可以进入下一个步骤”。 - // - // TODO(newagent/node): 后续 executeNode 识别到该信号后,调用 state.AdvanceStep() 或决定进入交付阶段。 + // ExecuteNextPlanSignal 表示“当前 plan step 已完成,可以推进到下一个步骤”。 ExecuteNextPlanSignal = "[NEXT_PLAN]" - // ExecuteDoneSignal 表示“整个任务已经完成,可以结束执行链路”。 - // - // TODO(newagent/node): 后续 executeNode 识别到该信号后,调用 state.Done() 并进入 deliver。 + // ExecuteDoneSignal 表示“整个任务已经完成,可以进入最终交付”。 ExecuteDoneSignal = "[DONE]" - // ExecuteAskUserSignal 表示“执行阶段缺关键信息,需要向用户追问”。 - // - // TODO(newagent/node): 后续若你决定支持 ask_user,这里可作为统一控制信号继续扩展。 + // ExecuteAskUserSignal 表示“执行当前步骤缺少关键信息,需要向用户追问”。 ExecuteAskUserSignal = "[ASK_USER]" ) const executeSystemPrompt = ` 你是 SmartFlow NewAgent 的执行器。 - 你的职责是在“当前 plan 步骤”的约束下,进行思考、执行、观察,再决定下一步动作。 请遵守以下规则: @@ -52,13 +45,6 @@ func BuildExecuteSystemPrompt() string { } // BuildExecuteMessages 组装执行阶段的 messages。 -// -// 职责边界: -// 1. 负责收敛执行阶段需要的 system / pinned / history / runtime prompt; -// 2. 负责把“完整 plan + 当前步骤 + 控制信号”显式告知模型; -// 3. 不负责解析模型输出,也不负责真正调用工具。 -// -// TODO(newagent/node): 后续 executeNode 应直接复用这个方法,而不是在节点内手拼执行提示词。 func BuildExecuteMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext) []*schema.Message { return buildStageMessages( BuildExecuteSystemPrompt(), @@ -94,8 +80,13 @@ func BuildExecuteUserPrompt(state *newagentmodel.CommonState) string { sb.WriteString(ExecuteAskUserSignal) sb.WriteString("。\n") sb.WriteString("\n当前步骤正文:\n") - sb.WriteString(currentStep) + sb.WriteString(strings.TrimSpace(currentStep.Content)) sb.WriteString("\n") + if strings.TrimSpace(currentStep.DoneWhen) != "" { + sb.WriteString("\n当前步骤完成判定:\n") + sb.WriteString(strings.TrimSpace(currentStep.DoneWhen)) + sb.WriteString("\n") + } } else { sb.WriteString("当前 plan 已存在,但当前步骤索引无效;请不要擅自执行其他步骤。\n") } diff --git a/backend/newAgent/prompt/plan.go b/backend/newAgent/prompt/plan.go index 0de1936..65ad3e9 100644 --- a/backend/newAgent/prompt/plan.go +++ b/backend/newAgent/prompt/plan.go @@ -8,24 +8,17 @@ import ( "github.com/cloudwego/eino/schema" ) -const ( - // PlanDoneSignal 表示“规划阶段结束,可以进入 confirm 或下一阶段”。 - // - // TODO(newagent/node): 后续由 planNode 读取模型输出时识别这个信号,并据此调用 state.FinishPlan(...)。 - PlanDoneSignal = "[PLAN_DONE]" -) - const planSystemPrompt = ` 你是 SmartFlow NewAgent 的规划器。 - -你的职责不是直接执行任务,而是先把用户意图拆成一组清晰、稳定、可逐步执行的自然语言计划。 +你的职责不是直接执行任务,而是先把用户意图拆成一组清晰、稳定、可逐步执行的自然语言计划,并严格按后端约定的 JSON 协议输出。 请遵守以下规则: 1. 只负责规划,不要假装已经调用了工具,也不要伪造执行结果。 -2. 每一轮只推进一步规划;如果信息不足,可以明确指出缺口。 +2. 每一轮只推进一步规划;如果信息不足,应明确转成 ask_user,而不是继续硬猜。 3. 若当前计划仍不完整,就继续围绕当前任务补全计划,不要跳去执行细节。 -4. 若你认为计划已经完整可执行,请在输出中显式带上 ` + "`" + `[` + `PLAN_DONE` + `]` + "`" + ` 信号。 -5. 计划必须使用自然语言,便于后端将完整 plan 重新注入到后续上下文顶部。 +4. 若你认为计划已经完整可执行,请返回 action=plan_done,并附带完整 plan_steps。 +5. plan_steps 必须使用自然语言,便于后端将完整 plan 重新注入到后续上下文顶部。 +6. 只输出 JSON,不要输出 markdown,不要输出额外解释,不要在 JSON 外再补文字。 你会看到: - 当前阶段与轮次信息 @@ -47,10 +40,8 @@ func BuildPlanSystemPrompt() string { // // 职责边界: // 1. 负责把 state + context 收敛成规划阶段模型输入; -// 2. 负责把“置顶上下文”和“工具摘要”放到 history 前面,降低模型跑偏概率; -// 3. 不负责解析模型输出,不负责判断是否真的完成规划。 -// -// TODO(newagent/node): 后续 planNode 直接复用这个入口,不要在节点里散落拼 message 的逻辑。 +// 2. 负责把置顶上下文和工具摘要放在 history 前面,降低模型跑偏概率; +// 3. 不负责解析模型输出,也不负责判断规划质量。 func BuildPlanMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext, userInput string) []*schema.Message { return buildStageMessages( BuildPlanSystemPrompt(), @@ -60,21 +51,15 @@ func BuildPlanMessages(state *newagentmodel.CommonState, ctx *newagentmodel.Conv } // BuildPlanUserPrompt 构造规划阶段的用户提示词。 -// -// 设计目标: -// 1. 把当前阶段、轮次、既有 plan、当前步骤等控制信息显式写给模型; -// 2. 保持自然语言风格,方便你后续继续改成自己想要的控制协议; -// 3. 用户原始输入单独放在末尾,避免被系统拼装信息淹没。 func BuildPlanUserPrompt(state *newagentmodel.CommonState, userInput string) string { var sb strings.Builder sb.WriteString("请继续当前任务的规划阶段。\n") sb.WriteString(renderStateSummary(state)) sb.WriteString("\n") - sb.WriteString("本轮目标:围绕当前任务继续规划,直到形成一份稳定、可执行的自然语言 plan。\n") - sb.WriteString("如果计划已经完整,请显式输出 ") - sb.WriteString(PlanDoneSignal) - sb.WriteString("。\n") + sb.WriteString("本轮目标:围绕当前任务继续规划,直到形成一份稳定、可执行的自然语言 plan,或在信息不足时明确追问用户。\n\n") + sb.WriteString(BuildPlanDecisionContractText()) + sb.WriteString("\n") trimmedInput := strings.TrimSpace(userInput) if trimmedInput != "" { @@ -86,10 +71,61 @@ func BuildPlanUserPrompt(state *newagentmodel.CommonState, userInput string) str return strings.TrimSpace(sb.String()) } +// BuildPlanDecisionContractText 返回规划阶段的输出协议说明。 +func BuildPlanDecisionContractText() string { + return strings.TrimSpace(fmt.Sprintf(` +输出协议(严格 JSON): +- speak:给用户看的话;若 action=%s,这里通常就是要追问用户的问题 +- action:只能是 %s / %s / %s +- reason:给后端和日志看的简短说明 +- plan_steps:仅当 action=%s 时允许返回;返回时必须是完整计划,不是增量 +- plan_steps[].content:步骤正文,必填 +- plan_steps[].done_when:可选,建议写“什么情况下算这一步做完” + +合法示例: +{ + "speak": "我先把计划再收束一下。", + "action": "%s", + "reason": "当前信息已足够继续规划" +} + +{ + "speak": "你更希望我优先安排今天,还是按整周来规划?", + "action": "%s", + "reason": "当前时间范围仍不明确" +} + +{ + "speak": "计划已经整理好了,我先给你确认一下。", + "action": "%s", + "reason": "当前计划已具备执行条件", + "plan_steps": [ + { + "content": "先确认本周可用时间范围", + "done_when": "拿到明确的可用时间段列表" + }, + { + "content": "基于可用时间生成执行安排", + "done_when": "得到一份用户可确认的安排方案" + } + ] +} +`, + newagentmodel.PlanActionAskUser, + newagentmodel.PlanActionContinue, + newagentmodel.PlanActionAskUser, + newagentmodel.PlanActionDone, + newagentmodel.PlanActionDone, + newagentmodel.PlanActionContinue, + newagentmodel.PlanActionAskUser, + newagentmodel.PlanActionDone, + )) +} + // buildStageMessages 组装某个阶段通用的 messages。 // // 步骤说明: -// 1. 先合并 context 自带 system prompt 与阶段 prompt,保证通用约束和阶段约束都能生效; +// 1. 先合并 context 自带 system prompt 与阶段 prompt,保证通用约束和阶段约束都生效; // 2. 再把置顶上下文块和工具摘要补成 system message,尽量顶在 history 前面; // 3. 最后追加历史消息与本轮 user prompt,保持“新约束在前、历史在后”的稳定顺序。 func buildStageMessages(stageSystemPrompt string, ctx *newagentmodel.ConversationContext, runtimeUserPrompt string) []*schema.Message { @@ -123,10 +159,10 @@ func buildStageMessages(stageSystemPrompt string, ctx *newagentmodel.Conversatio return messages } -// renderStateSummary 将当前流程状态渲染成简洁文本。 +// renderStateSummary 把当前流程状态渲染成简洁文本。 func renderStateSummary(state *newagentmodel.CommonState) string { if state == nil { - return "当前状态:state 缺失,请先进行兜底处理。" + return "当前状态:state 缺失,请先做兜底处理。" } var sb strings.Builder @@ -142,22 +178,30 @@ func renderStateSummary(state *newagentmodel.CommonState) string { sb.WriteString("当前完整 plan:\n") for i, step := range state.PlanSteps { - sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, strings.TrimSpace(step))) + sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, strings.TrimSpace(step.Content))) + if strings.TrimSpace(step.DoneWhen) != "" { + sb.WriteString(fmt.Sprintf(" 完成判定:%s\n", strings.TrimSpace(step.DoneWhen))) + } } if step, ok := state.CurrentPlanStep(); ok { sb.WriteString(fmt.Sprintf("当前步骤进度:%d/%d\n", current, total)) sb.WriteString("当前步骤内容:\n") - sb.WriteString(step) + sb.WriteString(strings.TrimSpace(step.Content)) sb.WriteString("\n") + if strings.TrimSpace(step.DoneWhen) != "" { + sb.WriteString("当前步骤完成判定:\n") + sb.WriteString(strings.TrimSpace(step.DoneWhen)) + sb.WriteString("\n") + } } else { - sb.WriteString("当前步骤进度:暂无有效当前步骤。\n") + sb.WriteString("当前步骤进度:暂时无有效当前步骤。\n") } return sb.String() } -// renderPinnedBlocks 将 ConversationContext 中的置顶块渲染成一段独立的 system 内容。 +// renderPinnedBlocks 把 ConversationContext 中的置顶块渲染成独立的 system 文本。 func renderPinnedBlocks(ctx *newagentmodel.ConversationContext) string { if ctx == nil { return "" @@ -186,7 +230,7 @@ func renderPinnedBlocks(ctx *newagentmodel.ConversationContext) string { return strings.TrimSpace(sb.String()) } -// renderToolSchemas 将工具摘要渲染成独立文本块。 +// renderToolSchemas 把工具摘要渲染成独立文本块。 func renderToolSchemas(ctx *newagentmodel.ConversationContext) string { if ctx == nil { return "" diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index 1e62a89..36b404b 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -1,8 +1,10 @@ package newagentstream import ( + "context" "fmt" "strings" + "time" ) // PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。 @@ -13,9 +15,52 @@ import ( // 3. 只要签名是 `func(string) error`,都能接进来。 type PayloadEmitter func(payload string) error -// StageEmitter 是 graph/node 对“当前阶段”进行推送的最小接口。 +// StageEmitter 是 graph/node 对“当前阶段”进行推送的兼容接口。 +// +// 设计说明: +// 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留; +// 2. 新的结构化事件能力会通过 ChunkEmitter 补齐,而不是继续扩展这个函数签名; +// 3. 这样能兼顾当前兼容性和后续协议升级空间。 type StageEmitter func(stage, detail string) +// PseudoStreamOptions 描述“整段文字伪流式输出”的切块与节奏配置。 +// +// 字段语义: +// 1. MinChunkRunes:达到该最小长度后,若命中标点/换行等边界,可提前切块; +// 2. MaxChunkRunes:单块最大 rune 数,超过后强制切块,避免一次性发太长; +// 3. ChunkInterval:块与块之间的等待时间;为 0 时表示只做切块,不做人为延迟。 +type PseudoStreamOptions struct { + MinChunkRunes int + MaxChunkRunes int + ChunkInterval time.Duration +} + +const ( + defaultPseudoStreamMinChunkRunes = 8 + defaultPseudoStreamMaxChunkRunes = 24 +) + +// DefaultPseudoStreamOptions 返回一份适合中文短句展示的默认伪流式配置。 +func DefaultPseudoStreamOptions() PseudoStreamOptions { + return PseudoStreamOptions{ + MinChunkRunes: defaultPseudoStreamMinChunkRunes, + MaxChunkRunes: defaultPseudoStreamMaxChunkRunes, + } +} + +// ChunkEmitter 是 newAgent 统一的 SSE chunk 发射器。 +// +// 职责边界: +// 1. 负责把“正文 / 思考 / 工具事件 / 确认请求 / 中断提示”统一转换成 OpenAI 兼容 payload; +// 2. 负责在必要时把结构化事件附带成 extra,同时给当前前端提供可读的降级文本; +// 3. 不负责决定什么时候发什么,也不负责持久化状态。 +type ChunkEmitter struct { + emit PayloadEmitter + RequestID string + ModelName string + Created int64 +} + // NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。 func NoopPayloadEmitter() PayloadEmitter { return func(string) error { return nil } @@ -34,69 +79,320 @@ func WrapStageEmitter(fn func(stage, detail string)) StageEmitter { return fn } -// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端。 +// NewChunkEmitter 创建统一 chunk 发射器。 // -// 设计背景: -// 1. 你当前 Apifox 只认思考块和正文块,因此阶段提示需要先借 reasoning_content 走通; -// 2. 这样后续真正前端上线时,只需要在这一层换协议,而不必回到各 skill 重改 graph; -// 3. 这里不拼花哨格式,只给出稳定、可读、可 grep 的文本。 -func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error { +// 兜底策略: +// 1. emit 为空时回退到 Noop,避免骨架期到处判空; +// 2. modelName 为空时回填 worker,保持 OpenAI 兼容字段稳定; +// 3. created <= 0 时用当前时间兜底,避免上层还没决定时间戳就无法复用。 +func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created int64) *ChunkEmitter { if emit == nil { + emit = NoopPayloadEmitter() + } + + modelName = strings.TrimSpace(modelName) + if modelName == "" { + modelName = "worker" + } + if created <= 0 { + created = time.Now().Unix() + } + + return &ChunkEmitter{ + emit: emit, + RequestID: strings.TrimSpace(requestID), + ModelName: modelName, + Created: created, + } +} + +// EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。 +func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error { + if e == nil || e.emit == nil { return nil } - text := BuildStageReasoningText(stage, detail) - payload, err := ToOpenAIReasoningChunk(requestID, modelName, created, text, includeRole) + text = strings.TrimSpace(text) + if text == "" { + return nil + } + + payload, err := ToOpenAIReasoningChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + text, + includeRole, + NewReasoningTextExtra(blockID, stage), + ) if err != nil { return err } if payload == "" { return nil } - return emit(payload) + return e.emit(payload) +} + +// EmitAssistantText 输出一段 assistant 正文,并附带 assistant_text extra。 +func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRole bool) error { + if e == nil || e.emit == nil { + return nil + } + + text = strings.TrimSpace(text) + if text == "" { + return nil + } + + payload, err := ToOpenAIAssistantChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + text, + includeRole, + NewAssistantTextExtra(blockID, stage), + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) +} + +// EmitPseudoReasoningText 把整段 reasoning 文本按伪流式方式逐块推出。 +func (e *ChunkEmitter) EmitPseudoReasoningText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error { + return e.emitPseudoText( + ctx, + text, + options, + func(chunk string, includeRole bool) error { + return e.EmitReasoningText(blockID, stage, chunk, includeRole) + }, + ) +} + +// EmitPseudoAssistantText 把整段 assistant 文本按伪流式方式逐块推出。 +func (e *ChunkEmitter) EmitPseudoAssistantText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error { + return e.emitPseudoText( + ctx, + text, + options, + func(chunk string, includeRole bool) error { + return e.EmitAssistantText(blockID, stage, chunk, includeRole) + }, + ) +} + +// EmitStatus 输出一条阶段状态事件。 +// +// 当前兼容策略: +// 1. extra 用 status 表达结构化语义; +// 2. reasoning_content 里同时放一份可读降级文本,保证旧前端也能看到。 +func (e *ChunkEmitter) EmitStatus(blockID, stage, code, summary string, includeRole bool) error { + if e == nil || e.emit == nil { + return nil + } + + text := BuildStageReasoningText(stage, summary) + payload, err := ToOpenAIReasoningChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + text, + includeRole, + NewStatusExtra(blockID, stage, code, summary), + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) +} + +// EmitToolCallStart 输出一次工具调用开始事件。 +func (e *ChunkEmitter) EmitToolCallStart(blockID, stage, toolName, summary, argumentsPreview string, includeRole bool) error { + if e == nil || e.emit == nil { + return nil + } + + text := BuildToolCallReasoningText(toolName, summary, argumentsPreview) + payload, err := ToOpenAIReasoningChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + text, + includeRole, + NewToolCallExtra(blockID, stage, toolName, "start", summary, argumentsPreview), + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) +} + +// EmitToolCallResult 输出一次工具调用结果事件。 +func (e *ChunkEmitter) EmitToolCallResult(blockID, stage, toolName, summary, argumentsPreview string, includeRole bool) error { + if e == nil || e.emit == nil { + return nil + } + + text := BuildToolResultReasoningText(toolName, summary) + payload, err := ToOpenAIReasoningChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + text, + includeRole, + NewToolResultExtra(blockID, stage, toolName, "done", summary, argumentsPreview), + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) +} + +// EmitConfirmRequest 输出一次待确认事件。 +// +// 当前展示策略: +// 1. 对旧前端,confirm 文案通过 assistant content 直接可见; +// 2. 对新前端,extra.confirm 可直接驱动确认卡片或按钮; +// 3. 默认使用伪流式,避免确认文案整块砸下来太生硬。 +func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, interactionID, title, summary string, options PseudoStreamOptions) error { + if e == nil || e.emit == nil { + return nil + } + + text := BuildConfirmAssistantText(title, summary) + extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary) + return e.emitPseudoText( + ctx, + text, + options, + func(chunk string, includeRole bool) error { + payload, err := ToOpenAIAssistantChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + chunk, + includeRole, + extra, + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) + }, + ) +} + +// EmitInterruptMessage 输出一次中断提示。 +// +// 适用场景: +// 1. ask_user 追问; +// 2. 告知用户当前会话已进入等待状态; +// 3. 后续 connection_lost 恢复若需要对用户补一句解释,也可复用这一入口。 +func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, interactionID, interactionType, summary string, options PseudoStreamOptions) error { + if e == nil || e.emit == nil { + return nil + } + + text := BuildInterruptAssistantText(interactionType, summary) + extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary) + return e.emitPseudoText( + ctx, + text, + options, + func(chunk string, includeRole bool) error { + payload, err := ToOpenAIAssistantChunkWithExtra( + e.RequestID, + e.ModelName, + e.Created, + chunk, + includeRole, + extra, + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) + }, + ) +} + +// EmitFinish 统一输出 stop 结束块,并带上 finish extra。 +func (e *ChunkEmitter) EmitFinish(blockID, stage string) error { + if e == nil || e.emit == nil { + return nil + } + + payload, err := ToOpenAIFinishStreamWithExtra( + e.RequestID, + e.ModelName, + e.Created, + NewFinishExtra(blockID, stage), + ) + if err != nil { + return err + } + if payload == "" { + return nil + } + return e.emit(payload) +} + +// EmitDone 统一输出 OpenAI 兼容流式结束标记。 +func (e *ChunkEmitter) EmitDone() error { + if e == nil || e.emit == nil { + return nil + } + return e.emit("[DONE]") +} + +// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端。 +// +// 兼容说明: +// 1. 保留旧函数签名,方便当前旧链路直接复用; +// 2. 实际实现已升级为统一的 ChunkEmitter + status extra; +// 3. 这样后续新链路可以直接跳过这个兼容函数,转用结构化方法。 +func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error { + return NewChunkEmitter(emit, requestID, modelName, created).EmitStatus(stage, stage, stage, detail, includeRole) } // EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。 // // 注意: -// 1. 这里是“整段发”,不是把文本强行拆碎; -// 2. 这样后续如果某条链路不需要真流式,也可以复用统一出口; -// 3. 真正按 token/chunk 细粒度流式输出,应由 llm.Stream + 上层循环处理。 +// 1. 这里保持“整段发”,不主动切块; +// 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText; +// 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。 func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error { - if emit == nil { - return nil - } - payload, err := ToOpenAIAssistantChunk(requestID, modelName, created, content, includeRole) - if err != nil { - return err - } - if payload == "" { - return nil - } - return emit(payload) + return NewChunkEmitter(emit, requestID, modelName, created).EmitAssistantText("", "", content, includeRole) } // EmitFinish 统一输出 stop 结束块。 func EmitFinish(emit PayloadEmitter, requestID, modelName string, created int64) error { - if emit == nil { - return nil - } - payload, err := ToOpenAIFinishStream(requestID, modelName, created) - if err != nil { - return err - } - if payload == "" { - return nil - } - return emit(payload) + return NewChunkEmitter(emit, requestID, modelName, created).EmitFinish("", "") } // EmitDone 统一输出 OpenAI 兼容流式结束标记。 func EmitDone(emit PayloadEmitter) error { - if emit == nil { - return nil - } - return emit("[DONE]") + return NewChunkEmitter(emit, "", "", 0).EmitDone() } // BuildStageReasoningText 生成统一阶段提示文本。 @@ -113,3 +409,187 @@ func BuildStageReasoningText(stage, detail string) string { return detail } } + +// BuildToolCallReasoningText 生成“工具调用开始”时的可读提示文本。 +func BuildToolCallReasoningText(toolName, summary, argumentsPreview string) string { + toolName = strings.TrimSpace(toolName) + summary = strings.TrimSpace(summary) + argumentsPreview = strings.TrimSpace(argumentsPreview) + + lines := make([]string, 0, 3) + if toolName != "" { + lines = append(lines, fmt.Sprintf("正在调用工具:%s", toolName)) + } + if summary != "" { + lines = append(lines, summary) + } + if argumentsPreview != "" { + lines = append(lines, fmt.Sprintf("参数摘要:%s", argumentsPreview)) + } + return strings.TrimSpace(strings.Join(lines, "\n")) +} + +// BuildToolResultReasoningText 生成“工具调用结果”时的可读提示文本。 +func BuildToolResultReasoningText(toolName, summary string) string { + toolName = strings.TrimSpace(toolName) + summary = strings.TrimSpace(summary) + + switch { + case toolName != "" && summary != "": + return fmt.Sprintf("工具结果:%s\n%s", toolName, summary) + case toolName != "": + return fmt.Sprintf("工具结果:%s", toolName) + default: + return summary + } +} + +// BuildConfirmAssistantText 生成给用户看的确认文案。 +func BuildConfirmAssistantText(title, summary string) string { + title = strings.TrimSpace(title) + summary = strings.TrimSpace(summary) + + switch { + case title != "" && summary != "": + return fmt.Sprintf("%s\n%s", title, summary) + case title != "": + return title + default: + return summary + } +} + +// BuildInterruptAssistantText 生成给用户看的中断文案。 +func BuildInterruptAssistantText(interactionType, summary string) string { + interactionType = strings.TrimSpace(interactionType) + summary = strings.TrimSpace(summary) + + switch { + case interactionType != "" && summary != "": + return fmt.Sprintf("当前进入 %s 阶段。\n%s", interactionType, summary) + case summary != "": + return summary + default: + return interactionType + } +} + +func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options PseudoStreamOptions, emitChunk func(chunk string, includeRole bool) error) error { + if emitChunk == nil { + return nil + } + text = strings.TrimSpace(text) + if text == "" { + return nil + } + + chunks := SplitPseudoStreamText(text, options) + for i, chunk := range chunks { + if err := emitChunk(chunk, i == 0); err != nil { + return err + } + if i < len(chunks)-1 { + if err := waitPseudoStreamInterval(ctx, options.ChunkInterval); err != nil { + return err + } + } + } + return nil +} + +// SplitPseudoStreamText 按“标点优先、长度兜底”的策略切分整段文本。 +// +// 步骤说明: +// 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅; +// 2. 若长时间遇不到合适边界,则在 MaxChunkRunes 处强制切块,避免整段卡太久; +// 3. 对中文文本优先按 rune 长度处理,避免多字节字符被截断。 +func SplitPseudoStreamText(text string, options PseudoStreamOptions) []string { + text = strings.TrimSpace(text) + if text == "" { + return nil + } + + options = normalizePseudoStreamOptions(options) + runes := []rune(text) + if len(runes) <= options.MaxChunkRunes { + return []string{text} + } + + chunks := make([]string, 0, len(runes)/options.MinChunkRunes+1) + start := 0 + size := 0 + for i, r := range runes { + size++ + + shouldFlush := false + if size >= options.MaxChunkRunes { + shouldFlush = true + } + if size >= options.MinChunkRunes && isPseudoStreamBoundary(r) { + shouldFlush = true + } + if !shouldFlush { + continue + } + + chunk := strings.TrimSpace(string(runes[start : i+1])) + if chunk != "" { + chunks = append(chunks, chunk) + } + start = i + 1 + size = 0 + } + + if start < len(runes) { + chunk := strings.TrimSpace(string(runes[start:])) + if chunk != "" { + chunks = append(chunks, chunk) + } + } + + if len(chunks) == 0 { + return []string{text} + } + return chunks +} + +func normalizePseudoStreamOptions(options PseudoStreamOptions) PseudoStreamOptions { + if options.MinChunkRunes <= 0 { + options.MinChunkRunes = defaultPseudoStreamMinChunkRunes + } + if options.MaxChunkRunes <= 0 { + options.MaxChunkRunes = defaultPseudoStreamMaxChunkRunes + } + if options.MaxChunkRunes < options.MinChunkRunes { + options.MaxChunkRunes = options.MinChunkRunes + } + return options +} + +func isPseudoStreamBoundary(r rune) bool { + switch r { + case '。', '!', '?', ';', ':', ',', '.', '!', '?', ';', ':', ',', '\n': + return true + default: + return false + } +} + +func waitPseudoStreamInterval(ctx context.Context, interval time.Duration) error { + if interval <= 0 { + return nil + } + if ctx == nil { + ctx = context.Background() + } + + timer := time.NewTimer(interval) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} diff --git a/backend/newAgent/stream/openai.go b/backend/newAgent/stream/openai.go index 2810250..3b0d601 100644 --- a/backend/newAgent/stream/openai.go +++ b/backend/newAgent/stream/openai.go @@ -8,16 +8,17 @@ import ( // OpenAIChunkResponse 是 OpenAI 兼容的流式 chunk DTO。 // -// 之所以单独放到 Agent/stream: -// 1. 未来无论 quicknote、taskquery 还是 schedule,只要需要 SSE 都会复用这套协议壳; -// 2. 这样 node/graph 层只关注“我要推什么内容”,不再自己拼 JSON; -// 3. 后续如果前端协议升级,也能在这里集中改。 +// 设计说明: +// 1. 外层继续保持 OpenAI 兼容壳,避免前端和调试工具一次性大改; +// 2. 新增顶层 Extra 字段,用来承载“工具调用 / 确认请求 / 中断恢复”等结构化事件; +// 3. 这样旧前端仍可继续读取 delta.content / delta.reasoning_content,新前端则可渐进消费 extra。 type OpenAIChunkResponse struct { ID string `json:"id"` Object string `json:"object"` Created int64 `json:"created"` Model string `json:"model"` - Choices []OpenAIChunkChoice `json:"choices"` + Choices []OpenAIChunkChoice `json:"choices,omitempty"` + Extra *OpenAIChunkExtra `json:"extra,omitempty"` } // OpenAIChunkChoice 对应 OpenAI choices[0]。 @@ -34,13 +35,87 @@ type OpenAIChunkDelta struct { ReasoningContent string `json:"reasoning_content,omitempty"` } +// StreamExtraKind 表示当前 chunk 在业务语义上属于哪类事件。 +type StreamExtraKind string + +const ( + StreamExtraKindReasoningText StreamExtraKind = "reasoning_text" + StreamExtraKindAssistantText StreamExtraKind = "assistant_text" + StreamExtraKindStatus StreamExtraKind = "status" + StreamExtraKindToolCall StreamExtraKind = "tool_call" + StreamExtraKindToolResult StreamExtraKind = "tool_result" + StreamExtraKindConfirm StreamExtraKind = "confirm_request" + StreamExtraKindInterrupt StreamExtraKind = "interrupt" + StreamExtraKindFinish StreamExtraKind = "finish" +) + +// StreamDisplayMode 表示前端更适合如何展示该结构化事件。 +type StreamDisplayMode string + +const ( + StreamDisplayModeAppend StreamDisplayMode = "append" + StreamDisplayModeReplace StreamDisplayMode = "replace" + StreamDisplayModeCard StreamDisplayMode = "card" +) + +// OpenAIChunkExtra 是挂在 OpenAI 兼容壳上的结构化扩展字段。 +// +// 职责边界: +// 1. Kind / Stage / BlockID 提供前端排版和分组所需的最小元信息; +// 2. Status / Tool / Confirm / Interrupt 只存展示层真正需要的摘要,不直接耦合后端完整状态对象; +// 3. Meta 留给后续做灰度扩展,避免每加一种小字段都要立刻改 DTO 结构。 +type OpenAIChunkExtra struct { + Kind StreamExtraKind `json:"kind,omitempty"` + BlockID string `json:"block_id,omitempty"` + Stage string `json:"stage,omitempty"` + DisplayMode StreamDisplayMode `json:"display_mode,omitempty"` + Status *StreamStatusExtra `json:"status,omitempty"` + Tool *StreamToolExtra `json:"tool,omitempty"` + Confirm *StreamConfirmExtra `json:"confirm,omitempty"` + Interrupt *StreamInterruptExtra `json:"interrupt,omitempty"` + Meta map[string]any `json:"meta,omitempty"` +} + +// StreamStatusExtra 表示普通阶段状态或提示性事件。 +type StreamStatusExtra struct { + Code string `json:"code,omitempty"` + Summary string `json:"summary,omitempty"` +} + +// StreamToolExtra 表示一次工具调用相关事件。 +type StreamToolExtra struct { + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` + Summary string `json:"summary,omitempty"` + ArgumentsPreview string `json:"arguments_preview,omitempty"` +} + +// StreamConfirmExtra 表示一次待确认事件的展示摘要。 +type StreamConfirmExtra struct { + InteractionID string `json:"interaction_id,omitempty"` + Title string `json:"title,omitempty"` + Summary string `json:"summary,omitempty"` +} + +// StreamInterruptExtra 表示一次中断事件的展示摘要。 +type StreamInterruptExtra struct { + InteractionID string `json:"interaction_id,omitempty"` + Type string `json:"type,omitempty"` + Summary string `json:"summary,omitempty"` +} + // ToOpenAIStream 把 Eino message 转成 OpenAI 兼容 chunk。 +func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) { + return ToOpenAIStreamWithExtra(chunk, requestID, modelName, created, includeRole, nil) +} + +// ToOpenAIStreamWithExtra 把 Eino message 转成带 extra 的 OpenAI 兼容 chunk。 // // 职责边界: // 1. 负责把 chunk.Content / chunk.ReasoningContent 映射到协议字段; -// 2. 负责按 includeRole 决定是否在首块带上 assistant 角色; +// 2. 负责挂载可选 extra,供前端识别工具调用、确认请求等结构化事件; // 3. 不负责发送,也不负责决定“这个 chunk 该不该推”。 -func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) { +func ToOpenAIStreamWithExtra(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool, extra *OpenAIChunkExtra) (string, error) { delta := OpenAIChunkDelta{} if includeRole { delta.Role = "assistant" @@ -49,50 +124,177 @@ func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created delta.Content = chunk.Content delta.ReasoningContent = chunk.ReasoningContent } - return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra) } // ToOpenAIReasoningChunk 直接构造一个 reasoning chunk。 func ToOpenAIReasoningChunk(requestID, modelName string, created int64, reasoning string, includeRole bool) (string, error) { + return ToOpenAIReasoningChunkWithExtra(requestID, modelName, created, reasoning, includeRole, nil) +} + +// ToOpenAIReasoningChunkWithExtra 直接构造一个带 extra 的 reasoning chunk。 +func ToOpenAIReasoningChunkWithExtra(requestID, modelName string, created int64, reasoning string, includeRole bool, extra *OpenAIChunkExtra) (string, error) { delta := OpenAIChunkDelta{ReasoningContent: reasoning} if includeRole { delta.Role = "assistant" } - return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra) } // ToOpenAIAssistantChunk 直接构造一个正文 chunk。 func ToOpenAIAssistantChunk(requestID, modelName string, created int64, content string, includeRole bool) (string, error) { + return ToOpenAIAssistantChunkWithExtra(requestID, modelName, created, content, includeRole, nil) +} + +// ToOpenAIAssistantChunkWithExtra 直接构造一个带 extra 的正文 chunk。 +func ToOpenAIAssistantChunkWithExtra(requestID, modelName string, created int64, content string, includeRole bool, extra *OpenAIChunkExtra) (string, error) { delta := OpenAIChunkDelta{Content: content} if includeRole { delta.Role = "assistant" } - return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra) } // ToOpenAIFinishStream 生成流式结束 chunk(finish_reason=stop)。 func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) { - stop := "stop" - return buildOpenAIChunkPayload(requestID, modelName, created, OpenAIChunkDelta{}, &stop) + return ToOpenAIFinishStreamWithExtra(requestID, modelName, created, nil) } -func buildOpenAIChunkPayload(requestID, modelName string, created int64, delta OpenAIChunkDelta, finishReason *string) (string, error) { - // 1. 若既没有 role,也没有正文/思考,也没有 finish_reason,则视为“空块”,直接跳过。 - // 2. 这样可以避免上层每次都自己写一遍空块判断。 - if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" && finishReason == nil { +// ToOpenAIFinishStreamWithExtra 生成带 extra 的流式结束 chunk。 +func ToOpenAIFinishStreamWithExtra(requestID, modelName string, created int64, extra *OpenAIChunkExtra) (string, error) { + stop := "stop" + return buildOpenAIChunkPayload(requestID, modelName, created, OpenAIChunkDelta{}, &stop, extra) +} + +// NewReasoningTextExtra 创建“思考文字”事件的 extra。 +func NewReasoningTextExtra(blockID, stage string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindReasoningText, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeAppend, + } +} + +// NewAssistantTextExtra 创建“正文文字”事件的 extra。 +func NewAssistantTextExtra(blockID, stage string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindAssistantText, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeAppend, + } +} + +// NewStatusExtra 创建普通状态事件的 extra。 +func NewStatusExtra(blockID, stage, code, summary string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindStatus, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeCard, + Status: &StreamStatusExtra{ + Code: code, + Summary: summary, + }, + } +} + +// NewToolCallExtra 创建“工具调用开始/中间态”事件的 extra。 +func NewToolCallExtra(blockID, stage, toolName, status, summary, argumentsPreview string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindToolCall, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeCard, + Tool: &StreamToolExtra{ + Name: toolName, + Status: status, + Summary: summary, + ArgumentsPreview: argumentsPreview, + }, + } +} + +// NewToolResultExtra 创建“工具结果”事件的 extra。 +func NewToolResultExtra(blockID, stage, toolName, status, summary, argumentsPreview string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindToolResult, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeCard, + Tool: &StreamToolExtra{ + Name: toolName, + Status: status, + Summary: summary, + ArgumentsPreview: argumentsPreview, + }, + } +} + +// NewConfirmRequestExtra 创建“待确认”事件的 extra。 +func NewConfirmRequestExtra(blockID, stage, interactionID, title, summary string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindConfirm, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeCard, + Confirm: &StreamConfirmExtra{ + InteractionID: interactionID, + Title: title, + Summary: summary, + }, + } +} + +// NewInterruptExtra 创建“中断”事件的 extra。 +func NewInterruptExtra(blockID, stage, interactionID, interactionType, summary string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindInterrupt, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeCard, + Interrupt: &StreamInterruptExtra{ + InteractionID: interactionID, + Type: interactionType, + Summary: summary, + }, + } +} + +// NewFinishExtra 创建“收尾完成”事件的 extra。 +func NewFinishExtra(blockID, stage string) *OpenAIChunkExtra { + return &OpenAIChunkExtra{ + Kind: StreamExtraKindFinish, + BlockID: blockID, + Stage: stage, + DisplayMode: StreamDisplayModeReplace, + } +} + +func buildOpenAIChunkPayload(requestID, modelName string, created int64, delta OpenAIChunkDelta, finishReason *string, extra *OpenAIChunkExtra) (string, error) { + // 1. 若既没有 role,也没有正文/思考,也没有 finish_reason,且也没有 extra,则视为“空块”,直接跳过。 + // 2. 这样后续 emitter 即使拆成“结构化事件 + 文本事件”双轨,也能复用统一的空块兜底。 + if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" && finishReason == nil && !hasStreamExtra(extra) { return "", nil } + choices := make([]OpenAIChunkChoice, 0, 1) + if delta.Role != "" || delta.Content != "" || delta.ReasoningContent != "" || finishReason != nil { + choices = append(choices, OpenAIChunkChoice{ + Index: 0, + Delta: delta, + FinishReason: finishReason, + }) + } + dto := OpenAIChunkResponse{ ID: requestID, Object: "chat.completion.chunk", Created: created, Model: modelName, - Choices: []OpenAIChunkChoice{{ - Index: 0, - Delta: delta, - FinishReason: finishReason, - }}, + Choices: choices, + Extra: extra, } data, err := json.Marshal(dto) if err != nil { @@ -100,3 +302,18 @@ func buildOpenAIChunkPayload(requestID, modelName string, created int64, delta O } return string(data), nil } + +func hasStreamExtra(extra *OpenAIChunkExtra) bool { + if extra == nil { + return false + } + return extra.Kind != "" || + extra.BlockID != "" || + extra.Stage != "" || + extra.DisplayMode != "" || + extra.Status != nil || + extra.Tool != nil || + extra.Confirm != nil || + extra.Interrupt != nil || + len(extra.Meta) > 0 +}