Version: 0.8.8.dev.260403

后端:
1.新建Deliver节点:LLM生成任务总结,失败降级到机械格式化,伪流式输出
2.新建Confirm节点:确认卡片推送与状态持久化
3.新建Interrupt节点:追问/确认/默认中断三种处理路径
4.实现状态持久化体系:model层定义AgentStateStore接口+AgentStateSnapshot快照,dao/cache.go新增Redis CRUD,agent_nodes层每节点自动存快照、Deliver完成后清理
5.所有model struct补充JSON tags,支持Redis序列化/反序列化
前端:无
仓库:无
This commit is contained in:
LoveLosita
2026-04-03 20:36:31 +08:00
parent 64b946816f
commit 17e3615f74
14 changed files with 1600 additions and 112 deletions

View File

@@ -525,3 +525,82 @@ func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userI
}
return d.client.Del(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Err()
}
// agentStateKey 返回 agent 运行态快照的 Redis key。
//
// Key 设计:
// 1. 使用 smartflow:agent_state 前缀,与现有 key 命名空间隔离;
// 2. 使用 conversationID 作为唯一标识,因为 agent 状态是按会话维度持久化的。
func (d *CacheDAO) agentStateKey(conversationID string) string {
return fmt.Sprintf("smartflow:agent_state:%s", conversationID)
}
// SaveAgentState 序列化并保存 agent 运行态快照到 Redis。
//
// 职责边界:
// 1. 只负责 JSON 序列化 + Redis SET不做业务校验
// 2. TTL 默认 24h过期自动清理避免已完成任务的快照堆积
// 3. snapshot 为 nil 时直接返回,避免写入无效数据。
func (d *CacheDAO) SaveAgentState(ctx context.Context, conversationID string, snapshot any) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
normalizedID := strings.TrimSpace(conversationID)
if normalizedID == "" {
return errors.New("conversation_id is empty")
}
if snapshot == nil {
return nil
}
data, err := json.Marshal(snapshot)
if err != nil {
return fmt.Errorf("marshal agent state failed: %w", err)
}
return d.client.Set(ctx, d.agentStateKey(normalizedID), data, 24*time.Hour).Err()
}
// LoadAgentState 从 Redis 读取并反序列化 agent 运行态快照。
//
// 返回值语义:
// 1. (result, true, nil):命中快照,正常返回;
// 2. (nil, false, nil):未命中,不是错误,调用方应走新建对话路径;
// 3. (nil, false, error)Redis 或反序列化错误。
func (d *CacheDAO) LoadAgentState(ctx context.Context, conversationID string, result any) (bool, error) {
if d == nil || d.client == nil {
return false, errors.New("cache dao is not initialized")
}
normalizedID := strings.TrimSpace(conversationID)
if normalizedID == "" {
return false, errors.New("conversation_id is empty")
}
raw, err := d.client.Get(ctx, d.agentStateKey(normalizedID)).Result()
if errors.Is(err, redis.Nil) {
return false, nil
}
if err != nil {
return false, err
}
if err := json.Unmarshal([]byte(raw), result); err != nil {
return false, fmt.Errorf("unmarshal agent state failed: %w", err)
}
return true, nil
}
// DeleteAgentState 删除指定会话的 agent 运行态快照。
//
// 语义:
// 1. 删除操作是幂等的key 不存在也视为成功;
// 2. 典型调用时机Deliver 节点任务完成后清理。
func (d *CacheDAO) DeleteAgentState(ctx context.Context, conversationID string) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
normalizedID := strings.TrimSpace(conversationID)
if normalizedID == "" {
return errors.New("conversation_id is empty")
}
return d.client.Del(ctx, d.agentStateKey(normalizedID)).Err()
}

View File

@@ -41,16 +41,16 @@ func RunAgentGraph(ctx context.Context, input newagentmodel.AgentGraphRunInput)
if err := g.AddLambdaNode(NodePlan, compose.InvokableLambda(nodes.Plan)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeConfirm, compose.InvokableLambda(confirmNode)); err != nil {
if err := g.AddLambdaNode(NodeConfirm, compose.InvokableLambda(nodes.Confirm)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeExecute, compose.InvokableLambda(nodes.Execute)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeInterrupt, compose.InvokableLambda(interruptNode)); err != nil {
if err := g.AddLambdaNode(NodeInterrupt, compose.InvokableLambda(nodes.Interrupt)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeDeliver, compose.InvokableLambda(deliverNode)); err != nil {
if err := g.AddLambdaNode(NodeDeliver, compose.InvokableLambda(nodes.Deliver)); err != nil {
return nil, err
}
@@ -130,61 +130,20 @@ func RunAgentGraph(ctx context.Context, input newagentmodel.AgentGraphRunInput)
return runnable.Invoke(ctx, state)
}
// --- 占位节点,后续逐步由 node 层替换 ---
func confirmNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("confirm node: state is nil")
}
st.EnsureFlowState()
st.EnsureConversationContext()
st.EnsureChunkEmitter()
// TODO:
// 1. 这里不再做 confirm 节点内自循环等待,而是统一走中断恢复模式。
// 2. 节点职责是生成确认事件、固化待执行工具快照,并调用 st.OpenConfirmInteraction(...)。
// 3. 当前连接随后会流向 interrupt 节点收口;用户确认/取消后,由外部回调恢复到 executing 或 planning。
// 4. 这里不要直接执行写工具,必须先把待执行工具调用固化为 pending snapshot。
return st, nil
}
func interruptNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("interrupt node: state is nil")
}
st.EnsureFlowState()
st.EnsureConversationContext()
st.EnsureChunkEmitter()
// TODO:
// 1. 若 PendingInteraction.Type=ask_user则像普通聊天一样流式吐出问题文本。
// 2. 若 PendingInteraction.Type=confirm则推送前端可识别的确认事件并把待执行工具调用一起带上。
// 3. 输出完成后,立刻把 AgentRuntimeState 快照持久化到 Redis + MySQL形成后续恢复点。
// 4. 当前节点结束后必须断开连接,等待用户聊天回复或确认回调重新进入 graph。
return st, nil
}
func deliverNode(_ context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("deliver node: state is nil")
}
flowState := st.EnsureFlowState()
st.EnsureConversationContext()
st.EnsureChunkEmitter()
// TODO: 将执行结果推给用户,并在所有外部落库完成后再标记 done。
flowState.Done()
return st, nil
}
// --- 分支函数 ---
func branchAfterChat(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) {
if st == nil {
return compose.END, nil
}
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureFlowState()
if flowState == nil {
return compose.END, nil
}
switch flowState.Phase {
case newagentmodel.PhasePlanning:
return NodePlan, nil
@@ -201,11 +160,17 @@ func branchAfterChat(_ context.Context, st *newagentmodel.AgentGraphState) (stri
}
func branchAfterPlan(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) {
if st == nil {
return NodePlan, nil
}
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureFlowState()
if flowState == nil {
return NodePlan, nil
}
if flowState.Phase == newagentmodel.PhaseWaitingConfirm {
return NodeConfirm, nil
}
@@ -213,11 +178,17 @@ func branchAfterPlan(_ context.Context, st *newagentmodel.AgentGraphState) (stri
}
func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) {
if st == nil {
return NodePlan, nil
}
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureFlowState()
if flowState == nil {
return NodePlan, nil
}
switch flowState.Phase {
case newagentmodel.PhaseExecuting:
return NodeExecute, nil
@@ -231,11 +202,17 @@ func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentGraphState) (s
}
func branchAfterExecute(_ context.Context, st *newagentmodel.AgentGraphState) (string, error) {
if st == nil {
return NodeExecute, nil
}
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureFlowState()
if flowState == nil {
return NodeExecute, nil
}
if flowState.Phase == newagentmodel.PhaseWaitingConfirm {
return NodeConfirm, nil
}

View File

@@ -15,27 +15,27 @@ const DefaultMaxRounds = 30
// CommonState 承载可持久化的主流程状态。
//
// 职责边界:
// 1. 负责记录当前处于哪个阶段、当前计划是什么、执行到了第几步、已经消耗了多少轮
// 1. 负责记录"当前处于哪个阶段、当前计划是什么、执行到了第几步、已经消耗了多少轮"
// 2. 负责提供最小必要的安全访问方法,避免 graph/node/prompt 层到处手写切片越界判断;
// 3. 不负责承载对话历史、tool schema、pinned context 这类模型输入材料,它们仍然属于 ConversationContext。
type CommonState struct {
// 身份信息
TraceID string
UserID int
ConversationID string
TraceID string `json:"trace_id"`
UserID int `json:"user_id"`
ConversationID string `json:"conversation_id"`
// 流程阶段
Phase Phase
Phase Phase `json:"phase"`
// 计划状态
// 1. 这里直接使用结构化的 PlanStep避免 planning -> execute 之间丢失 done_when。
// 2. CurrentStep 表示当前 plan 步骤下标,不是 execute 内部 ReAct 的思考轮次。
PlanSteps []PlanStep
CurrentStep int
// 2. CurrentStep 表示"当前 plan 步骤下标",不是 execute 内部 ReAct 的思考轮次。
PlanSteps []PlanStep `json:"plan_steps"`
CurrentStep int `json:"current_step"`
// 安全边界
MaxRounds int
RoundUsed int
MaxRounds int `json:"max_rounds"`
RoundUsed int `json:"round_used"`
}
func NewCommonState(traceID string, userID int, conversationID string) *CommonState {
@@ -97,7 +97,7 @@ func (s *CommonState) Done() {
// HasPlan 判断当前 state 是否已经持有一份完整计划。
//
// 职责边界:
// 1. 负责收口是否存在 plan这一层判断,避免外层到处写 len(PlanSteps) > 0
// 1. 负责收口"是否存在 plan"这一层判断,避免外层到处写 len(PlanSteps) > 0
// 2. 不判断 CurrentStep 当前是否有效,当前步骤是否合法由 HasCurrentPlanStep 回答;
// 3. state 为空时统一返回 false调用方可据此决定是否回退到 planning。
func (s *CommonState) HasPlan() bool {
@@ -123,7 +123,7 @@ func (s *CommonState) CurrentPlanStep() (PlanStep, bool) {
return s.PlanSteps[s.CurrentStep], true
}
// HasCurrentPlanStep 判断当前步骤是否存在且可安全读取。
// HasCurrentPlanStep 判断"当前步骤"是否存在且可安全读取。
func (s *CommonState) HasCurrentPlanStep() bool {
_, ok := s.CurrentPlanStep()
return ok

View File

@@ -6,45 +6,42 @@ import (
"github.com/cloudwego/eino/schema"
)
// ConversationContext 承载本轮要喂给模型的输入材料
// ConversationContext 承载"本轮要喂给模型的输入材料"
//
// 职责边界:
// 1. 负责保存 system prompt、对话历史、置顶注入块、工具 schema 摘要;
// 2. 负责提供最小必要的安全访问方法,避免 node / prompt 层直接散落切片操作;
// 3. 不负责流程推进phase / round / current step 仍归 CommonState 管;
// 4. 不负责真正的 prompt 组装,消息如何拼接仍应放在 prompt 层处理。
//
// TODO(newagent/prompt): 后续由 plan / execute 的 prompt builder 读取这里的数据,组装真正发给 LLM 的 messages。
// TODO(newagent/node): 后续 planNode / executeNode 只通过这里的访问方法读写上下文,避免多处直接改切片。
type ConversationContext struct {
SystemPrompt string
History []*schema.Message
PinnedBlocks []ContextBlock
ToolSchemas []ToolSchemaContext
SystemPrompt string `json:"system_prompt"`
History []*schema.Message `json:"history"`
PinnedBlocks []ContextBlock `json:"pinned_blocks"`
ToolSchemas []ToolSchemaContext `json:"-"` // 每次请求由 Service 层重新注入,不持久化
}
// ContextBlock 表示一段可被置顶注入的自然语言上下文。
// ContextBlock 表示一段可被"置顶注入"的自然语言上下文。
//
// 设计目的:
// 1. Key 用于让调用方按语义覆盖,例如 current_plan / current_step / execution_rule
// 2. Title 用于 prompt 层后续决定是否渲染成小标题;
// 3. Content 存真正的自然语言内容,保持你当前plan 用自然语言表达的思路。
// 3. Content 存真正的自然语言内容,保持你当前"plan 用自然语言表达"的思路。
type ContextBlock struct {
Key string
Title string
Content string
Key string `json:"key"`
Title string `json:"title"`
Content string `json:"content"`
}
// ToolSchemaContext 是工具描述的轻量快照。
//
// 职责边界:
// 1. 这里只保留 prompt 注入真正需要的摘要信息;
// 2. SchemaText 约定存已经整理好的自然语言 / JSON schema 摘要
// 2. SchemaText 约定存"已经整理好的自然语言 / JSON schema 摘要"
// 3. 不直接耦合具体 tool registry 里的复杂结构,避免 model 层反向依赖工具实现。
type ToolSchemaContext struct {
Name string
Desc string
SchemaText string
Name string `json:"name"`
Desc string `json:"desc"`
SchemaText string `json:"schema_text"`
}
// NewConversationContext 创建最小上下文容器。
@@ -65,7 +62,7 @@ func (c *ConversationContext) SetSystemPrompt(systemPrompt string) {
// ReplaceHistory 整体替换对话历史。
//
// 职责边界:
// 1. 负责把会话快照恢复这类场景需要的一次性覆盖入口收口到这里;
// 1. 负责把"会话快照恢复"这类场景需要的一次性覆盖入口收口到这里;
// 2. 只复制消息切片本身,避免调用方后续 append 污染同一底层数组;
// 3. 不深拷贝每个 message 指针,消息对象本身仍默认由上游按只读方式使用。
func (c *ConversationContext) ReplaceHistory(history []*schema.Message) {
@@ -105,7 +102,7 @@ func (c *ConversationContext) HistorySnapshot() []*schema.Message {
//
// 步骤说明:
// 1. Key 为空时直接忽略,因为后续无法做稳定覆盖;
// 2. 若已存在同 Key block则原位覆盖保证当前 plan / 当前步骤这类上下文始终只有一份;
// 2. 若已存在同 Key block则原位覆盖保证"当前 plan / 当前步骤"这类上下文始终只有一份;
// 3. 若不存在,则追加到末尾,至于渲染顺序由 prompt 层统一决定;
// 4. 此处不自动裁剪旧内容,避免 model 层擅自丢信息。
func (c *ConversationContext) UpsertPinnedBlock(block ContextBlock) {

View File

@@ -39,6 +39,7 @@ type AgentGraphDeps struct {
ExecuteClient *newagentllm.Client
DeliverClient *newagentllm.Client
ChunkEmitter *newagentstream.ChunkEmitter
StateStore AgentStateStore
}
// EnsureChunkEmitter 保证 graph 运行时始终有一个可用的 chunk 发射器。

View File

@@ -6,7 +6,7 @@ const (
// PhaseChatting 表示当前请求只需正常聊天,不进入 plan / execute 主链路。
PhaseChatting Phase = "chatting"
// PhaseInterrupted 表示本轮执行被待用户交互显式打断,当前连接应结束并等待恢复。
// PhaseInterrupted 表示本轮执行被"待用户交互"显式打断,当前连接应结束并等待恢复。
PhaseInterrupted Phase = "interrupted"
)
@@ -30,49 +30,52 @@ const (
PendingInteractionStatusCanceled PendingInteractionStatus = "canceled"
)
// PendingToolCallSnapshot 保存待确认工具调用的最小快照。
// PendingToolCallSnapshot 保存"待确认工具调用"的最小快照。
//
// 职责边界:
// 1. 负责保存真正落库 / 落缓存恢复执行所需的最小信息;
// 2. ArgsJSON 约定存已经序列化好的参数快照,避免此处反向依赖具体 tool 参数结构;
// 3. 不负责工具执行,不负责幂等校验,不负责回滚。
type PendingToolCallSnapshot struct {
ToolName string
ArgsJSON string
Summary string
ToolName string `json:"tool_name"`
ArgsJSON string `json:"args_json"`
Summary string `json:"summary"`
}
// PendingInteraction 保存本轮需要中断并等待用户后续动作的交互快照。
// PendingInteraction 保存"本轮需要中断并等待用户后续动作"的交互快照。
//
// 设计目的:
// 1. ask_user 与 confirm 都不是业务 tool而是流程级中断所以单独建模
// 2. ResumeNode / ResumePhase / ResumeStep 用来记录恢复点,避免用户回答后整条链路从头乱跑;
// 3. 该结构设计成可被 Redis + MySQL 直接存储的快照骨架,后续只需要补序列化与持久化接线。
//
// TODO(newagent/store): 后续把该结构整体快照到 Redis + MySQL形成双保险恢复点
// TODO(newagent/api): 后续由“用户追问回复接口 / 确认回调接口”读取这份快照并恢复运行。
// TODO(newagent/api): 后续由"用户追问回复接口 / 确认回调接口"读取这份快照并恢复运行
type PendingInteraction struct {
Version int
InteractionID string
Type PendingInteractionType
Status PendingInteractionStatus
DisplayText string
ResumeNode string
ResumePhase Phase
ResumeStep int
PendingTool *PendingToolCallSnapshot
Metadata map[string]any
Version int `json:"version"`
InteractionID string `json:"interaction_id"`
Type PendingInteractionType `json:"type"`
Status PendingInteractionStatus `json:"status"`
DisplayText string `json:"display_text"`
ResumeNode string `json:"resume_node"`
ResumePhase Phase `json:"resume_phase"`
ResumeStep int `json:"resume_step"`
PendingTool *PendingToolCallSnapshot `json:"pending_tool,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// AgentRuntimeState 是 graph 运行时真正流转的状态容器。
//
// 职责边界:
// 1. CommonState 继续只负责主流程控制;
// 2. PendingInteraction 负责承载需要中断后恢复的交互快照;
// 2. PendingInteraction 负责承载"需要中断后恢复"的交互快照;
// 3. 这样既不污染 CommonState 的职责,又能让 graph 在一次入参里拿到完整运行态。
type AgentRuntimeState struct {
*CommonState
PendingInteraction *PendingInteraction
*CommonState `json:"common_state"`
// PendingInteraction 承载挂起交互的持久化快照。
PendingInteraction *PendingInteraction `json:"pending_interaction,omitempty"`
// PendingConfirmTool 是 Execute → Confirm 之间传递待确认工具信息的临时邮箱。
// Execute 节点写入Confirm 节点读出并清空,不参与持久化。
PendingConfirmTool *PendingToolCallSnapshot `json:"-"`
}
// NewAgentRuntimeState 创建 graph 运行态。
@@ -120,7 +123,7 @@ func (s *AgentRuntimeState) PendingInteractionType() PendingInteractionType {
return s.PendingInteraction.Type
}
// OpenAskUserInteraction 打开一个向用户追问的中断快照。
// OpenAskUserInteraction 打开一个"向用户追问"的中断快照。
func (s *AgentRuntimeState) OpenAskUserInteraction(interactionID, question, resumeNode string) {
s.openPendingInteraction(
PendingInteractionTypeAskUser,
@@ -131,7 +134,7 @@ func (s *AgentRuntimeState) OpenAskUserInteraction(interactionID, question, resu
)
}
// OpenConfirmInteraction 打开一个写操作待确认的中断快照。
// OpenConfirmInteraction 打开一个"写操作待确认"的中断快照。
func (s *AgentRuntimeState) OpenConfirmInteraction(interactionID, confirmText, resumeNode string, pendingTool *PendingToolCallSnapshot) {
s.openPendingInteraction(
PendingInteractionTypeConfirm,
@@ -166,7 +169,7 @@ func (s *AgentRuntimeState) ResumeFromPending() bool {
//
// 职责边界:
// 1. 仅负责粗暴清空快照;
// 2. 不自动恢复 phase / step避免误把取消交互”与“恢复执行混为一谈;
// 2. 不自动恢复 phase / step避免误把"取消交互"与"恢复执行"混为一谈;
// 3. 若需要恢复流程,应优先使用 ResumeFromPending。
func (s *AgentRuntimeState) ClearPendingInteraction() {
if s == nil || s.PendingInteraction == nil {
@@ -207,7 +210,7 @@ func (s *AgentRuntimeState) openPendingInteraction(
// 1. 一旦进入 pending 状态,当前连接上的 graph 应立即停止向后执行。
// 2. 这里先统一把 Phase 置为 interrupted后续恢复时再按快照写回原阶段。
// 3. 这样分支函数只需要判断 HasPendingInteraction(),无需猜测当前 phase 是否仍可信
// 3. 这样分支函数只需要判断 HasPendingInteraction(),无需猜测"当前 phase 是否仍可信"
flowState.Phase = PhaseInterrupted
}

View File

@@ -0,0 +1,49 @@
package model
import "context"
// AgentStateSnapshot 是需要持久化的 agent 运行态最小快照。
//
// 设计说明:
// 1. 只保存恢复执行所需的 RuntimeState 和 ConversationContext
// 2. 不保存 Request每轮请求级天然不跨连接
// 3. 不保存 Deps依赖注入每次由 Service 层重建);
// 4. 不保存 ToolSchemas每次请求由 Service 层重新注入)。
type AgentStateSnapshot struct {
RuntimeState *AgentRuntimeState `json:"runtime_state"`
ConversationContext *ConversationContext `json:"conversation_context"`
}
// AgentStateStore 定义 agent 状态持久化的最小接口。
//
// 职责边界:
// 1. 只负责"存 / 取 / 删"三个原子操作;
// 2. 不负责序列化细节(由实现层决定 JSON / protobuf
// 3. 不负责业务级状态校验,校验仍在 node / graph 层完成。
//
// 实现层:
// 1. dao/cache.go 上的 CacheDAO 隐式实现该接口Go duck typing
// 2. newAgent 包不直接 import dao由 Service 层在组装 Deps 时注入。
type AgentStateStore interface {
// Save 序列化并保存一份 agent 状态快照。
//
// 语义:
// 1. 同一 conversationID 被覆盖写入,保证 Redis 里始终只有最新快照;
// 2. 实现层应设 TTL避免已完成的任务快照永不清理。
Save(ctx context.Context, conversationID string, snapshot *AgentStateSnapshot) error
// Load 读取并反序列化 agent 状态快照。
//
// 返回值语义:
// 1. (snapshot, true, nil):命中快照,正常返回;
// 2. (nil, false, nil):未命中,不是错误,调用方应走新建对话路径;
// 3. (nil, false, error):真正的存储层错误。
Load(ctx context.Context, conversationID string) (*AgentStateSnapshot, bool, error)
// Delete 删除指定会话的 agent 状态快照。
//
// 语义:
// 1. 删除是幂等的key 不存在也视为成功;
// 2. 典型调用时机Deliver 节点任务完成后清理。
Delete(ctx context.Context, conversationID string) error
}

View File

@@ -11,8 +11,8 @@ import (
//
// 职责边界:
// 1. 负责把 node 层真正实现的方法统一暴露给 graph 注册;
// 2. 负责收口graph 只编排、node 真执行的结构约束;
// 3. 当前先迁移 Plan其他节点后续按同样模式逐步下沉
// 2. 负责收口"graph 只编排、node 真执行"的结构约束;
// 3. 负责在每个节点执行成功后统一做状态持久化Save/Delete
type AgentNodes struct{}
// NewAgentNodes 创建通用节点容器。
@@ -25,7 +25,7 @@ func NewAgentNodes() *AgentNodes {
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的入口逻辑仍由 RunChatNode 负责;
// 3. 这样 graph 层后续只需挂 n.Chat而不再自己维护占位 chatNode
// 3. Chat 的 Save 交给 Service 层处理,这里不做持久化
func (n *AgentNodes) Chat(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("chat node: state is nil")
@@ -47,12 +47,39 @@ func (n *AgentNodes) Chat(ctx context.Context, st *newagentmodel.AgentGraphState
return st, nil
}
// Confirm 是确认阶段的正式节点方法。
//
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的确认逻辑仍由 RunConfirmNode 负责;
// 3. 不需要 LLM Client — 确认内容由已有状态机械格式化。
// 4. Confirm 执行成功后保存状态,因为它创建了 PendingInteraction。
func (n *AgentNodes) Confirm(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("confirm node: state is nil")
}
if err := RunConfirmNode(
ctx,
ConfirmNodeInput{
RuntimeState: st.EnsureRuntimeState(),
ConversationContext: st.EnsureConversationContext(),
ChunkEmitter: st.EnsureChunkEmitter(),
},
); err != nil {
return nil, err
}
saveAgentState(ctx, st)
return st, nil
}
// Plan 是规划阶段的正式节点方法。
//
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的单轮规划逻辑仍由 RunPlanNode 负责;
// 3. 这样 graph 层后续只需挂 n.Plan而不再自己维护占位 planNode
// 3. Plan 执行成功后保存状态,支持意外断线恢复
func (n *AgentNodes) Plan(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("plan node: state is nil")
@@ -71,6 +98,33 @@ func (n *AgentNodes) Plan(ctx context.Context, st *newagentmodel.AgentGraphState
); err != nil {
return nil, err
}
saveAgentState(ctx, st)
return st, nil
}
// Interrupt 是中断阶段的正式节点方法。
//
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的中断逻辑仍由 RunInterruptNode 负责;
// 3. 不需要 LLM Client — 所有文本已在 PendingInteraction 里。
// 4. 不需要 Save — 上游节点Plan/Execute/Confirm已经存过了。
func (n *AgentNodes) Interrupt(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("interrupt node: state is nil")
}
if err := RunInterruptNode(
ctx,
InterruptNodeInput{
RuntimeState: st.EnsureRuntimeState(),
ConversationContext: st.EnsureConversationContext(),
ChunkEmitter: st.EnsureChunkEmitter(),
},
); err != nil {
return nil, err
}
return st, nil
}
@@ -78,13 +132,13 @@ func (n *AgentNodes) Plan(ctx context.Context, st *newagentmodel.AgentGraphState
//
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的单轮执行逻辑仍由 RunExecuteNode 负责
// 3. 这样 graph 层后续只需挂 n.Execute而不再自己维护占位 executeNode。
// 2. 真正的单轮执行逻辑仍由 RunExecuteNode 负责
//
// 设计原则:
// 1. LLM 主导LLM 自己判断 done_when 是否满足,自己决定何时推进/完成;
// 2. 后端兜底:只做资源控制、安全兜底、证据记录;
// 3. 不做硬校验:后端不质疑 LLM 的 advance/complete 决策。
// 4. Execute 每轮执行成功后保存状态,支持意外断线恢复。
func (n *AgentNodes) Execute(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("execute node: state is nil")
@@ -103,5 +157,102 @@ func (n *AgentNodes) Execute(ctx context.Context, st *newagentmodel.AgentGraphSt
); err != nil {
return nil, err
}
saveAgentState(ctx, st)
return st, nil
}
// Deliver 是交付阶段的正式节点方法。
//
// 职责边界:
// 1. 这里只做 graph -> node 的参数转接;
// 2. 真正的交付逻辑仍由 RunDeliverNode 负责;
// 3. 调 LLM 生成任务总结,失败时降级到机械格式化。
// 4. 任务完成后删除 Redis 快照,清理持久化状态。
func (n *AgentNodes) Deliver(ctx context.Context, st *newagentmodel.AgentGraphState) (*newagentmodel.AgentGraphState, error) {
if st == nil {
return nil, errors.New("deliver node: state is nil")
}
if err := RunDeliverNode(
ctx,
DeliverNodeInput{
RuntimeState: st.EnsureRuntimeState(),
ConversationContext: st.EnsureConversationContext(),
Client: st.Deps.ResolveDeliverClient(),
ChunkEmitter: st.EnsureChunkEmitter(),
},
); err != nil {
return nil, err
}
deleteAgentState(ctx, st)
return st, nil
}
// --- 持久化辅助 ---
// saveAgentState 在节点执行成功后,将当前运行态快照保存到 Redis。
//
// 设计原则:
// 1. Save 失败只记日志,不中断 Graph 流程;
// 2. StateStore 为空时静默跳过(骨架期 / 测试环境);
// 3. conversationID 为空时也静默跳过,避免写入无效 key。
//
// TODO: 接入项目统一的日志框架后,把 _ = err 改成结构化日志。
func saveAgentState(ctx context.Context, st *newagentmodel.AgentGraphState) {
if st == nil {
return
}
store := st.Deps.StateStore
if store == nil {
return
}
runtimeState := st.EnsureRuntimeState()
if runtimeState == nil {
return
}
flowState := runtimeState.EnsureCommonState()
if flowState == nil || flowState.ConversationID == "" {
return
}
snapshot := &newagentmodel.AgentStateSnapshot{
RuntimeState: runtimeState,
ConversationContext: st.EnsureConversationContext(),
}
_ = store.Save(ctx, flowState.ConversationID, snapshot)
}
// deleteAgentState 在任务完成后,删除 Redis 中的运行态快照。
//
// 设计原则:
// 1. Delete 失败只记日志,不中断 Graph 流程;
// 2. 删除是幂等的key 不存在也视为成功;
// 3. StateStore 为空时静默跳过。
//
// TODO: 接入项目统一的日志框架后,把 _ = err 改成结构化日志。
func deleteAgentState(ctx context.Context, st *newagentmodel.AgentGraphState) {
if st == nil {
return
}
store := st.Deps.StateStore
if store == nil {
return
}
runtimeState := st.EnsureRuntimeState()
if runtimeState == nil {
return
}
flowState := runtimeState.EnsureCommonState()
if flowState == nil || flowState.ConversationID == "" {
return
}
_ = store.Delete(ctx, flowState.ConversationID)
}

View File

@@ -0,0 +1,208 @@
package newagentnode
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
)
const (
confirmStageName = "confirm"
confirmStatusBlockID = "confirm.status"
)
// ConfirmNodeInput 描述确认节点单轮运行所需的最小依赖。
//
// 职责边界:
// 1. 不需要 LLM Client — 确认内容由已有状态机械格式化,不调模型;
// 2. RuntimeState 提供计划步骤和待确认工具快照;
// 3. ChunkEmitter 负责推送确认事件到前端。
type ConfirmNodeInput struct {
RuntimeState *newagentmodel.AgentRuntimeState
ConversationContext *newagentmodel.ConversationContext
ChunkEmitter *newagentstream.ChunkEmitter
}
// RunConfirmNode 执行一轮确认节点逻辑。
//
// 核心职责:
// 1. 判断确认来源:有 PendingConfirmTool → 工具确认;有 PlanSteps → 计划确认;
// 2. 机械格式化确认内容(不需要 LLM 调用);
// 3. 推送确认事件 EmitConfirmRequest → 前端渲染确认卡片;
// 4. 调用 OpenConfirmInteraction 固化中断快照Phase 自动变为 interrupted。
//
// 设计原则:
// 1. 不等待用户响应 — 等待是 interruptNode 的职责;
// 2. 不执行任何工具 — 只固化"意图",执行留给恢复后的 Execute
// 3. Confirm 是图里唯一负责"生成确认事件 + 固化快照"的地方,上游节点只设 Phase。
func RunConfirmNode(ctx context.Context, input ConfirmNodeInput) error {
runtimeState, _, emitter, err := prepareConfirmNodeInput(input)
if err != nil {
return err
}
flowState := runtimeState.EnsureCommonState()
// 优先处理工具确认Execute 发起的写操作确认)。
if runtimeState.PendingConfirmTool != nil {
return handleToolConfirm(ctx, runtimeState, flowState, emitter)
}
// 其次处理计划确认Plan 完成后的整体验收)。
if flowState.HasPlan() {
return handlePlanConfirm(ctx, runtimeState, flowState, emitter)
}
// 既没有工具也没有计划 → 异常状态,不应到达此处。
return fmt.Errorf("confirm node: 没有可确认的内容(无计划、无待确认工具)")
}
// handlePlanConfirm 处理计划确认。
//
// 流程:
// 1. 从 flowState.PlanSteps 格式化可读摘要;
// 2. 推送确认事件到前端;
// 3. 调用 OpenConfirmInteraction 固化快照(无 PendingTool
func handlePlanConfirm(
ctx context.Context,
runtimeState *newagentmodel.AgentRuntimeState,
flowState *newagentmodel.CommonState,
emitter *newagentstream.ChunkEmitter,
) error {
summary := buildPlanSummary(flowState.PlanSteps)
interactionID := generateConfirmInteractionID(flowState)
if err := emitter.EmitConfirmRequest(
ctx, confirmStatusBlockID, confirmStageName,
interactionID,
"计划确认",
summary,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("计划确认事件推送失败: %w", err)
}
runtimeState.OpenConfirmInteraction(
interactionID,
summary,
"plan",
nil,
)
_ = emitter.EmitStatus(
confirmStatusBlockID, confirmStageName,
"plan_confirm", "计划已生成,等待用户确认。", false,
)
return nil
}
// handleToolConfirm 处理工具确认。
//
// 流程:
// 1. 从 PendingConfirmTool 构建确认摘要;
// 2. 推送确认事件到前端;
// 3. 调用 OpenConfirmInteraction 固化快照(含 PendingTool
// 4. 清空 PendingConfirmTool 临时邮箱。
func handleToolConfirm(
ctx context.Context,
runtimeState *newagentmodel.AgentRuntimeState,
flowState *newagentmodel.CommonState,
emitter *newagentstream.ChunkEmitter,
) error {
pendingTool := runtimeState.PendingConfirmTool
summary := buildToolConfirmSummary(pendingTool)
interactionID := generateConfirmInteractionID(flowState)
if err := emitter.EmitConfirmRequest(
ctx, confirmStatusBlockID, confirmStageName,
interactionID,
"操作确认",
summary,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("工具确认事件推送失败: %w", err)
}
runtimeState.OpenConfirmInteraction(
interactionID,
summary,
"execute",
pendingTool,
)
// 确认快照已固化到 PendingInteraction清空临时邮箱。
runtimeState.PendingConfirmTool = nil
_ = emitter.EmitStatus(
confirmStatusBlockID, confirmStageName,
"tool_confirm", "操作等待确认。", false,
)
return nil
}
// buildPlanSummary 把 PlanSteps 格式化成人类可读的确认摘要。
func buildPlanSummary(steps []newagentmodel.PlanStep) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("共 %d 步:\n", len(steps)))
for i, step := range steps {
sb.WriteString(fmt.Sprintf("%d. %s", i+1, step.Content))
if step.DoneWhen != "" {
sb.WriteString(fmt.Sprintf("(完成条件:%s", step.DoneWhen))
}
sb.WriteString("\n")
}
return strings.TrimSpace(sb.String())
}
// buildToolConfirmSummary 从工具快照构建确认摘要。
func buildToolConfirmSummary(tool *newagentmodel.PendingToolCallSnapshot) string {
if tool == nil {
return "待确认操作"
}
if tool.Summary != "" {
return tool.Summary
}
detail := fmt.Sprintf("即将执行工具:%s", tool.ToolName)
if tool.ArgsJSON != "" {
var args map[string]any
if json.Unmarshal([]byte(tool.ArgsJSON), &args) == nil && len(args) > 0 {
detail += fmt.Sprintf(",参数:%s", tool.ArgsJSON)
}
}
return detail
}
// generateConfirmInteractionID 生成确认交互的唯一标识。
func generateConfirmInteractionID(flowState *newagentmodel.CommonState) string {
prefix := flowState.TraceID
if prefix == "" {
prefix = "confirm"
}
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixMilli())
}
// prepareConfirmNodeInput 校验并准备确认节点的运行态依赖。
func prepareConfirmNodeInput(input ConfirmNodeInput) (
*newagentmodel.AgentRuntimeState,
*newagentmodel.ConversationContext,
*newagentstream.ChunkEmitter,
error,
) {
if input.RuntimeState == nil {
return nil, nil, nil, fmt.Errorf("confirm node: runtime state 不能为空")
}
input.RuntimeState.EnsureCommonState()
if input.ConversationContext == nil {
input.ConversationContext = newagentmodel.NewConversationContext("")
}
if input.ChunkEmitter == nil {
input.ChunkEmitter = newagentstream.NewChunkEmitter(
newagentstream.NoopPayloadEmitter(), "", "", time.Now().Unix(),
)
}
return input.RuntimeState, input.ConversationContext, input.ChunkEmitter, nil
}

View File

@@ -0,0 +1,184 @@
package newagentnode
import (
"context"
"fmt"
"strings"
"time"
"github.com/cloudwego/eino/schema"
newagentllm "github.com/LoveLosita/smartflow/backend/newAgent/llm"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
)
const (
deliverStageName = "deliver"
deliverStatusBlockID = "deliver.status"
deliverSpeakBlockID = "deliver.speak"
)
// DeliverNodeInput 描述交付节点单轮运行所需的最小依赖。
//
// 职责边界:
// 1. 只负责生成交付总结并推送给用户,不负责后续流程推进;
// 2. RuntimeState 提供计划步骤和执行状态;
// 3. ConversationContext 提供执行阶段的对话历史;
// 4. 交付完成后标记流程结束。
type DeliverNodeInput struct {
RuntimeState *newagentmodel.AgentRuntimeState
ConversationContext *newagentmodel.ConversationContext
Client *newagentllm.Client
ChunkEmitter *newagentstream.ChunkEmitter
}
// RunDeliverNode 执行一轮交付节点逻辑。
//
// 核心职责:
// 1. 调 LLM 基于原始计划 + 执行历史生成交付总结;
// 2. 伪流式推送总结给用户;
// 3. 写入对话历史,保证上下文连续;
// 4. 标记流程结束。
//
// 降级策略:
// 1. LLM 调用失败时,回退到机械格式化总结,不中断流程;
// 2. 机械总结包含计划步骤列表和完成进度。
func RunDeliverNode(ctx context.Context, input DeliverNodeInput) error {
runtimeState, conversationContext, emitter, err := prepareDeliverNodeInput(input)
if err != nil {
return err
}
flowState := runtimeState.EnsureCommonState()
// 1. 推送交付阶段状态,让前端知道正在生成总结。
if err := emitter.EmitStatus(
deliverStatusBlockID,
deliverStageName,
"summarizing",
"正在生成交付总结。",
false,
); err != nil {
return fmt.Errorf("交付阶段状态推送失败: %w", err)
}
// 2. 调 LLM 生成交付总结。
summary := generateDeliverSummary(ctx, input.Client, flowState, conversationContext)
// 3. 伪流式推送总结。
if strings.TrimSpace(summary) != "" {
if err := emitter.EmitPseudoAssistantText(
ctx,
deliverSpeakBlockID,
deliverStageName,
summary,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("交付总结推送失败: %w", err)
}
conversationContext.AppendHistory(schema.AssistantMessage(summary, nil))
}
// 4. 推送最终完成状态。
_ = emitter.EmitStatus(
deliverStatusBlockID,
deliverStageName,
"done",
"任务已完成。",
true,
)
// 5. 标记流程结束。
flowState.Done()
return nil
}
// generateDeliverSummary 尝试调用 LLM 生成交付总结,失败时降级到机械格式化。
func generateDeliverSummary(
ctx context.Context,
client *newagentllm.Client,
flowState *newagentmodel.CommonState,
conversationContext *newagentmodel.ConversationContext,
) string {
if client == nil {
return buildMechanicalSummary(flowState)
}
messages := newagentprompt.BuildDeliverMessages(flowState, conversationContext)
result, err := client.GenerateText(
ctx,
messages,
newagentllm.GenerateOptions{
Temperature: 0.5,
MaxTokens: 800,
Thinking: newagentllm.ThinkingModeDisabled,
Metadata: map[string]any{
"stage": deliverStageName,
},
},
)
if err != nil || result == nil || strings.TrimSpace(result.Text) == "" {
return buildMechanicalSummary(flowState)
}
return strings.TrimSpace(result.Text)
}
// buildMechanicalSummary 在 LLM 不可用时,机械拼接一份最小可用总结。
func buildMechanicalSummary(state *newagentmodel.CommonState) string {
if state == nil {
return "任务流程已结束。"
}
var sb strings.Builder
current, total := state.PlanProgress()
if !state.HasPlan() {
return "任务流程已结束。"
}
if state.Exhausted() {
sb.WriteString(fmt.Sprintf("任务因执行轮次耗尽提前结束,已完成 %d/%d 步。\n", current, total))
} else {
sb.WriteString("所有计划步骤已执行完毕。\n")
}
sb.WriteString("\n执行情况\n")
for i, step := range state.PlanSteps {
marker := "[ ]"
if i < current {
marker = "[x]"
}
sb.WriteString(fmt.Sprintf("%s %s\n", marker, strings.TrimSpace(step.Content)))
}
if state.Exhausted() && current < total {
sb.WriteString("\n如需继续完成剩余步骤可以告诉我继续。")
}
return sb.String()
}
// prepareDeliverNodeInput 校验并准备交付节点的运行态依赖。
func prepareDeliverNodeInput(input DeliverNodeInput) (
*newagentmodel.AgentRuntimeState,
*newagentmodel.ConversationContext,
*newagentstream.ChunkEmitter,
error,
) {
if input.RuntimeState == nil {
return nil, nil, nil, fmt.Errorf("deliver node: runtime state 不能为空")
}
input.RuntimeState.EnsureCommonState()
if input.ConversationContext == nil {
input.ConversationContext = newagentmodel.NewConversationContext("")
}
if input.ChunkEmitter == nil {
input.ChunkEmitter = newagentstream.NewChunkEmitter(
newagentstream.NoopPayloadEmitter(), "", "", time.Now().Unix(),
)
}
return input.RuntimeState, input.ConversationContext, input.ChunkEmitter, nil
}

View File

@@ -2,6 +2,7 @@ package newagentnode
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@@ -177,6 +178,11 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
runtimeState.OpenAskUserInteraction(uuid.NewString(), question, strings.TrimSpace(input.ResumeNode))
return nil
case newagentmodel.ExecuteActionConfirm:
// LLM 申报了写操作意图,需要用户确认后才能真正执行。
// 步骤1) 把 ToolCallIntent 转成快照暂存2) 设 Phase → 下游 confirm 节点接管。
return handleExecuteActionConfirm(decision, runtimeState, flowState)
case newagentmodel.ExecuteActionNextPlan:
// LLM 判定当前步骤已完成,推进到下一步。
// 后端信任 LLM 判断,不做硬校验。
@@ -253,6 +259,39 @@ func resolveExecuteAskUserText(decision *newagentmodel.ExecuteDecision) string {
return "执行过程中遇到不确定的情况,需要向你确认。"
}
// handleExecuteActionConfirm 处理 LLM 申报的写操作确认请求。
//
// 步骤:
// 1. 把 ToolCallIntent 转成 PendingToolCallSnapshot 暂存到运行态;
// 2. 设 Phase = PhaseWaitingConfirm让下游 confirm 节点接管;
// 3. 不执行工具,也不生成确认事件 — 这些都是 confirm 节点的职责。
func handleExecuteActionConfirm(
decision *newagentmodel.ExecuteDecision,
runtimeState *newagentmodel.AgentRuntimeState,
flowState *newagentmodel.CommonState,
) error {
toolCall := decision.ToolCall
// 序列化工具参数。
argsJSON := ""
if toolCall.Arguments != nil {
if raw, err := json.Marshal(toolCall.Arguments); err == nil {
argsJSON = string(raw)
}
}
// 暂存到运行态邮箱confirm 节点会读出来。
runtimeState.PendingConfirmTool = &newagentmodel.PendingToolCallSnapshot{
ToolName: toolCall.Name,
ArgsJSON: argsJSON,
Summary: strings.TrimSpace(decision.Speak),
}
// 设 Phase让 branchAfterExecute 路由到 confirm 节点。
flowState.Phase = newagentmodel.PhaseWaitingConfirm
return nil
}
// executeToolCall 执行工具调用并记录证据。
//
// 职责边界:

View File

@@ -0,0 +1,153 @@
package newagentnode
import (
"context"
"fmt"
"time"
"github.com/cloudwego/eino/schema"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
)
const (
interruptStageName = "interrupt"
interruptSpeakBlockID = "interrupt.speak"
interruptStatusBlockID = "interrupt.status"
)
// InterruptNodeInput 描述中断节点单轮运行所需的最小依赖。
//
// 职责边界:
// 1. 不需要 LLM Client — 所有文本已在 PendingInteraction.DisplayText 里;
// 2. RuntimeState 提供 PendingInteraction
// 3. ChunkEmitter 负责推送收尾消息。
type InterruptNodeInput struct {
RuntimeState *newagentmodel.AgentRuntimeState
ConversationContext *newagentmodel.ConversationContext
ChunkEmitter *newagentstream.ChunkEmitter
}
// RunInterruptNode 执行一轮中断节点逻辑。
//
// 核心职责:
// 1. ask_user → 把 DisplayText 当普通 assistant 消息伪流式输出,说完就停;
// 2. confirm → 确认卡片已由 confirm 节点推送,无需额外输出;
// 3. 状态持久化已由 agent_nodes 层统一处理Interrupt 不再需要自行存快照;
// 4. 节点结束后 graph 走 END当前连接断开。
//
// 设计原则:
// 1. 中断就是正常对话的结束 — 助手说了问题/确认卡片,然后停下来等用户回复;
// 2. 用户下次回复时走正常 chat 入口chat 节点负责 resume
// 3. 不做特殊 UI不需要前端适配新的交互模式。
func RunInterruptNode(ctx context.Context, input InterruptNodeInput) error {
runtimeState, conversationContext, emitter, err := prepareInterruptNodeInput(input)
if err != nil {
return err
}
pending := runtimeState.PendingInteraction
if pending == nil {
// 无 pending interaction → 不应到达此处,防御性返回。
return fmt.Errorf("interrupt node: 无待处理交互")
}
switch pending.Type {
case newagentmodel.PendingInteractionTypeAskUser:
return handleInterruptAskUser(ctx, pending, conversationContext, emitter)
case newagentmodel.PendingInteractionTypeConfirm:
return handleInterruptConfirm(pending, emitter)
default:
// connection_lost 等其他类型 → 仅持久化,不输出。
return handleInterruptDefault(pending, emitter)
}
}
// handleInterruptAskUser 处理追问型中断。
//
// 把 PendingInteraction.DisplayText 当普通 assistant 消息伪流式输出,
// 写入历史,然后结束。用户体验和正常对话一样 — 助手问了问题,停下来等回复。
func handleInterruptAskUser(
ctx context.Context,
pending *newagentmodel.PendingInteraction,
conversationContext *newagentmodel.ConversationContext,
emitter *newagentstream.ChunkEmitter,
) error {
text := pending.DisplayText
if text == "" {
text = "请补充更多信息。"
}
// 伪流式输出,和 chatReply 一样的体感。
if err := emitter.EmitPseudoAssistantText(
ctx, interruptSpeakBlockID, interruptStageName,
text,
newagentstream.DefaultPseudoStreamOptions(),
); err != nil {
return fmt.Errorf("追问消息推送失败: %w", err)
}
// 写入对话历史,下一轮 resume 时 LLM 能看到这个上下文。
conversationContext.AppendHistory(schema.AssistantMessage(text, nil))
// 状态持久化已由 agent_nodes 层统一处理,此处不再需要自行存快照。
_ = emitter.EmitStatus(
interruptStatusBlockID, interruptStageName,
"ask_user", "已追问用户,等待回复。", false,
)
return nil
}
// handleInterruptConfirm 处理确认型中断。
//
// 确认卡片已由 confirm 节点推送,这里只需推送状态通知并持久化。
func handleInterruptConfirm(
pending *newagentmodel.PendingInteraction,
emitter *newagentstream.ChunkEmitter,
) error {
// 状态持久化已由 agent_nodes 层统一处理,此处不再需要自行存快照。
_ = emitter.EmitStatus(
interruptStatusBlockID, interruptStageName,
"confirm", "等待用户确认。", false,
)
return nil
}
// handleInterruptDefault 处理其他类型的中断(如 connection_lost
func handleInterruptDefault(
pending *newagentmodel.PendingInteraction,
emitter *newagentstream.ChunkEmitter,
) error {
// 状态持久化已由 agent_nodes 层统一处理,此处不再需要自行存快照。
_ = emitter.EmitStatus(
interruptStatusBlockID, interruptStageName,
"interrupted", "会话已中断。", false,
)
return nil
}
// prepareInterruptNodeInput 校验并准备中断节点的运行态依赖。
func prepareInterruptNodeInput(input InterruptNodeInput) (
*newagentmodel.AgentRuntimeState,
*newagentmodel.ConversationContext,
*newagentstream.ChunkEmitter,
error,
) {
if input.RuntimeState == nil {
return nil, nil, nil, fmt.Errorf("interrupt node: runtime state 不能为空")
}
input.RuntimeState.EnsureCommonState()
if input.ConversationContext == nil {
input.ConversationContext = newagentmodel.NewConversationContext("")
}
if input.ChunkEmitter == nil {
input.ChunkEmitter = newagentstream.NewChunkEmitter(
newagentstream.NoopPayloadEmitter(), "", "", time.Now().Unix(),
)
}
return input.RuntimeState, input.ConversationContext, input.ChunkEmitter, nil
}

View File

@@ -0,0 +1,67 @@
package newagentprompt
import (
"fmt"
"strings"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/cloudwego/eino/schema"
)
const deliverSystemPrompt = `
你是 SmartFlow NewAgent 的交付器。
你的职责是基于原始计划和执行历史,生成一份简洁、诚实的任务完成总结。
请遵守以下规则:
1. 只基于已有历史和计划状态生成总结,不要编造未执行的操作。
2. 如果所有步骤都已完成,简要总结每一步的成果。
3. 如果是因轮次耗尽提前结束,如实告知用户当前进度和未完成的部分。
4. 使用自然、友好的语气,不要机械地罗列步骤。
5. 如果用户后续可能需要继续操作,给出简短的建议。
6. 只输出总结文本,不要输出 JSON不要输出 markdown 标题。
你会看到:
- 原始计划步骤及完成判定
- 当前执行进度
- 执行阶段的对话历史
`
// BuildDeliverSystemPrompt 返回交付阶段系统提示词。
func BuildDeliverSystemPrompt() string {
return strings.TrimSpace(deliverSystemPrompt)
}
// BuildDeliverMessages 组装交付阶段的 messages。
func BuildDeliverMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext) []*schema.Message {
return buildStageMessages(
BuildDeliverSystemPrompt(),
ctx,
BuildDeliverUserPrompt(state),
)
}
// BuildDeliverUserPrompt 构造交付阶段的用户提示词。
func BuildDeliverUserPrompt(state *newagentmodel.CommonState) string {
var sb strings.Builder
sb.WriteString("请为当前任务生成完成总结。\n")
sb.WriteString(renderStateSummary(state))
sb.WriteString("\n")
if state == nil || !state.HasPlan() {
sb.WriteString("当前没有正式计划,请基于对话历史简要总结本次交互。\n")
return strings.TrimSpace(sb.String())
}
current, total := state.PlanProgress()
exhausted := state.Exhausted()
if exhausted {
sb.WriteString(fmt.Sprintf("注意:任务因轮次耗尽提前结束,当前进度 %d/%d。\n", current, total))
sb.WriteString("请如实说明已完成和未完成的部分,并建议用户如何继续。\n")
} else {
sb.WriteString("所有计划步骤已执行完毕,请总结整体成果。\n")
}
return strings.TrimSpace(sb.String())
}

View File

@@ -0,0 +1,580 @@
# 日程工具设计文档
> 本文档定义了 newAgent 日程调度场景下的工具层设计。
> 工具是 LLM 与日程数据之间的唯一边界——LLM 只能通过工具的输入/输出与日程交互,永远不直接接触原始数据。
---
## 1. 设计原则
1. **工具即边界**LLM 通过工具感知和修改日程,不直接接触 state 或数据库
2. **自然语言返回**:工具返回值为自然语言 + 轻结构缩进、列表LLM 直接理解
3. **只报事实,不做判断**:读工具只报当前真实状态,不附建议/推荐/假设;写工具只报变更后的事实
4. **操作前自动校验**:写工具在执行前自动检测冲突和锁定,失败时 state 不变
5. **State 内操作**:所有写工具只修改内存中的 state不直接写库整个方案完成后由 Confirm 节点统一写库
---
## 2. 索引体系
LLM 不接触真实的日期和星期,使用两级整数索引:
- **天索引day**:规划窗口内的天数编号,从 1 开始连续递增
- 例如规划窗口为第1周周三至第3周周一共13天编号为第1天~第13天
- 工具层负责 day ↔ 真实日期 的映射
- 规划窗口由 Plan 节点向用户确认,不明确时走 ask_user
- **时段索引slot**:每天内的节课编号,范围 1-12
- 标准节次1-2, 3-4, 5-6, 7-8, 9-10, 11-12共6个标准段
- 连堂课可能跨越1-33连堂、1-44连堂、9-124连堂
- 任务时段用 (slot_start, slot_end) 表示,例如 (1, 4) = 第1-4节
- **任务定位**day + slot_start + slot_end例如 (3, 1, 4) = 第3天第1-4节
---
## 3. State 数据结构
State 是工具层的操作对象,存在于内存中,不直接暴露给 LLM。
### 3.1 整体结构
```json
{
"window": {
"total_days": 13,
"day_mapping": [
{ "day_index": 1, "week": 5, "day_of_week": 1 },
{ "day_index": 2, "week": 5, "day_of_week": 2 },
{ "day_index": 3, "week": 5, "day_of_week": 3 },
{ "day_index": 4, "week": 5, "day_of_week": 4 },
{ "day_index": 5, "week": 5, "day_of_week": 5 },
{ "day_index": 6, "week": 5, "day_of_week": 6 },
{ "day_index": 7, "week": 5, "day_of_week": 7 },
{ "day_index": 8, "week": 6, "day_of_week": 1 },
{ "day_index": 9, "week": 6, "day_of_week": 2 },
{ "day_index": 10, "week": 6, "day_of_week": 3 },
{ "day_index": 11, "week": 6, "day_of_week": 4 },
{ "day_index": 12, "week": 6, "day_of_week": 5 },
{ "day_index": 13, "week": 6, "day_of_week": 6 }
]
},
"tasks": [
{
"state_id": 1,
"source": "event",
"source_id": 101,
"name": "高等数学",
"category": "课程",
"status": "existing",
"locked": true,
"slots": [
{ "day": 1, "slot_start": 1, "slot_end": 2 },
{ "day": 4, "slot_start": 1, "slot_end": 2 },
{ "day": 8, "slot_start": 1, "slot_end": 2 }
]
},
{
"state_id": 2,
"source": "event",
"source_id": 102,
"name": "思政(水课)",
"category": "课程",
"status": "existing",
"locked": false,
"can_embed": true,
"slots": [
{ "day": 2, "slot_start": 1, "slot_end": 2 }
]
},
{
"state_id": 3,
"source": "task_item",
"source_id": 201,
"name": "复习线代",
"category": "学习",
"status": "pending",
"duration": 3,
"category_id": 10
}
]
}
```
### 3.2 字段说明
**任务通用字段:**
| 字段 | 类型 | 说明 |
|------|------|------|
| `state_id` | int | State 内唯一 ID递增工具层和 LLM 使用此 ID 交互 |
| `source` | string | 数据来源:`"event"` = 来自 ScheduleEvent`"task_item"` = 来自 TaskClassItem |
| `source_id` | int | 原表主键ScheduleEvent.ID 或 TaskClassItem.ID写库时用于反查 |
| `name` | string | 任务名称,来自 ScheduleEvent.Name 或 TaskClassItem.Content |
| `category` | string | 类别名,来自 TaskClass.Name如"课程"、"学习"、"作业" |
| `status` | string | `"existing"`(已安排)| `"pending"`(待安排)|
| `locked` | bool | 是否锁定。推导规则ScheduleEvent.Type="course" 且 CanBeEmbed=false 时为 true |
| `slots` | array | 已安排任务的时段列表,每项含 day/slot_start/slot_end |
| `duration` | int | 待安排任务需要的连续时段数(仅 pending 任务) |
| `category_id` | int | 所属 TaskClass 的 ID仅 source=task_item 时有值) |
**嵌入任务相关字段(仅 can_embed=true 的任务):**
| 字段 | 类型 | 说明 |
|------|------|------|
| `can_embed` | bool | 该时段是否允许嵌入其他任务,来自 ScheduleEvent.CanBeEmbedded |
| `embedded_by` | int | 被哪个 state_id 的任务嵌入(宿主视角) |
| `embed_host` | int | 嵌入到哪个 state_id 的时段里(嵌入任务视角) |
### 3.3 数据来源与映射
**existing 任务(从数据库加载):**
| State 字段 | 数据库来源 |
|-----------|-----------|
| source_id | ScheduleEvent.ID |
| name | ScheduleEvent.Name |
| category | ScheduleEvent.Type"course"→"课程""task"→取关联 TaskClass.Name |
| locked | ScheduleEvent.Type="course" 且 CanBeEmbedded=false |
| can_embed | ScheduleEvent.CanBeEmbedded |
| slots | 查 Schedule 表WHERE event_id=? AND week/day_of_week IN 窗口范围),按 section 连续段压缩 |
**pending 任务(从数据库加载):**
| State 字段 | 数据库来源 |
|-----------|-----------|
| source_id | TaskClassItem.ID |
| name | TaskClassItem.Content |
| category | 关联 TaskClass.Name通过 CategoryID |
| duration | 由 TaskClass.TotalSlots / Item 数量推算,或固定为 2 |
| category_id | TaskClassItem.CategoryID |
### 3.4 Section 压缩/解压
数据库中 Schedule 表逐节存储每节一条记录State 中压缩为连续范围:
```
DB 记录:
Schedule(event_id=101, week=5, day_of_week=1, section=1)
Schedule(event_id=101, week=5, day_of_week=1, section=2)
压缩为 State
{ "day": 1, "slot_start": 1, "slot_end": 2 }
```
反向操作(写库时):将 slot_start/slot_end 展开为逐条 Schedule 记录插入。
### 3.5 Day 映射
工具层通过 day_mapping 数组完成 day_index ↔ (week, day_of_week) 的双向转换:
- **读操作**:从 Schedule 表查到 (week=5, day_of_week=1, section=1),通过 day_mapping 反查 day_index=1
- **写操作**LLM 指定 day=3通过 day_mapping 查到 (week=5, day_of_week=3),用于构造 Schedule 记录
- **规划窗口**:由 Plan 节点确认范围,工具层初始化时生成 day_mapping
---
## 4. 读工具
### 4.1 get_overview
获取规划窗口的粗粒度总览,用于建立全局感知。
**入参:**
**返回示例:**
```
规划窗口共13天每天12个时段总计156个时段。
当前已占用48个空闲108个。待安排任务3个。
每日概况:
第1天占6/12 — [1]高等数学(1-2节) [2]英语(3-4节) [4]体育(5-6节)
第2天占2/12 — [5]物理(3-4节)
第3天占0/12
第4天占8/12 — [1]高等数学(1-2节) [6]线代(3-4节) [8]程序设计(9-10节)
第5天占0/12
第6天占2/12 — [2]英语(1-2节)
第7天占2/12 — [10]思政(1-2节,可嵌入)
第8天占4/12 — [1]高等数学(1-2节) [5]物理(3-4节)
第9天占0/12
第10天占0/12
第11天占0/12
第12天占0/12
第13天占0/12
可嵌入时段第7天 [10]思政(1-2节)
待安排:[3]复习线代(需3时段) [7]写实验报告(需2时段) [9]小组讨论(需2时段)
```
---
### 4.2 query_range
查看某天(或某天某段)的细粒度占用详情。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| day | int | 是 | 天索引 |
| slot_start | int | 否 | 起始节次,不传则返回整天 |
| slot_end | int | 否 | 结束节次,不传则返回整天 |
**返回示例(查整天):**
```
第4天 全天:
第1-2节[1]高等数学(固定)
第3-4节[6]线代
第5-6节
第7-8节
第9-10节[8]程序设计
第11-12节
连续空闲区第5-8节(4时段)、第11-12节(2时段)
可嵌入第1-2节已有[1]高等数学(固定,不可嵌入)
```
**返回示例(查具体范围):**
```
第4天 第5-8节
第5节
第6节
第7节
第8节
该范围4个时段全部空闲。
```
---
### 4.3 find_free
查找满足指定连续时段长度的空闲位置。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| duration | int | 是 | 需要的连续时段数 |
| day | int | 否 | 限定某天,不传则搜索全部天 |
**返回示例:**
```
满足3个连续空闲时段的位置
第2天 第5-8节4时段连续空闲
第3天 第1-6节6时段连续空闲
第3天 第7-12节6时段连续空闲
第5天 第1-12节12时段连续空闲
第6天 第3-5节3时段连续空闲
第9天 第1-3节3时段连续空闲
第10天 第5-7节3时段连续空闲
可嵌入位置(水课时段,可叠加任务):
第7天 第1-2节[10]思政,当前无嵌入任务)
```
---
### 4.4 list_tasks
列出任务清单,可按类别和状态过滤。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| category | string | 否 | 过滤类别(对应 TaskClass.Name如"课程"、"学习" |
| status | string | 否 | existing / pending / all默认 all |
**返回示例(待安排):**
```
待安排任务共3个
[3]复习线代 — 需3个连续时段类别学习
[7]写实验报告 — 需2个连续时段类别作业
[9]小组讨论 — 需2个连续时段类别学习
```
**返回示例(全部):**
```
共9个任务已安排6个待安排3个。
已安排:
[1]高等数学(课程,固定) — 第1天(1-2节) 第4天(1-2节) 第8天(1-2节)
[2]英语(课程) — 第1天(3-4节) 第6天(1-2节)
[4]体育(课程) — 第1天(5-6节)
[5]物理(课程) — 第2天(3-4节) 第8天(3-4节)
[6]线代(学习) — 第4天(3-4节)
[8]程序设计(课程) — 第4天(9-10节)
[10]思政(课程,可嵌入) — 第7天(1-2节)
待安排:
[3]复习线代(学习) — 需3时段
[7]写实验报告(作业) — 需2时段
[9]小组讨论(学习) — 需2时段
```
---
### 4.5 get_task_info
查询单个任务的详细信息。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| task_id | int | 是 | 任务 ID |
**返回示例(普通任务):**
```
[1]高等数学
类别:课程 | 状态:已安排(固定)
来源:课程表
占用时段:
第1天 第1-2节
第4天 第1-2节
第8天 第1-2节
```
**返回示例(可嵌入任务):**
```
[10]思政
类别:课程 | 状态:已安排
来源:课程表
可嵌入:是(允许在此时段嵌入其他任务)
占用时段:
第7天 第1-2节
当前嵌入任务:无
```
---
## 5. 写工具
### 5.1 place
将待安排任务放置到指定位置。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| task_id | int | 是 | 待安排任务的 ID |
| day | int | 是 | 目标天索引 |
| slot_start | int | 是 | 目标起始节次 |
**成功返回:**
```
已将 [3]复习线代 放到第5天第1-3节。
第5天当前占用[3]复习线代(1-3节)占用3/12。
待安排任务剩余2个。
```
**失败返回(冲突):**
```
放置失败第5天第1-2节已被 [4]体育 占用。
第5天当前占用[4]体育(1-4节)占用4/12。空闲时段第5-12节。
```
**失败返回(状态错误):**
```
放置失败:[1]高等数学 不是待安排任务,无法放置。
```
**成功返回(嵌入到水课):**
```
已将 [7]写实验报告 嵌入到第7天第1-2节宿主[10]思政)。
第7天当前占用[10]思政(1-2节) [7]写实验报告(嵌入1-2节)占用2/12。
待安排任务剩余2个。
```
---
### 5.2 move
移动已有任务到新位置。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| task_id | int | 是 | 任务 ID |
| new_day | int | 是 | 目标天索引 |
| new_slot_start | int | 是 | 目标起始节次 |
**成功返回:**
```
已将 [6]线代 从第4天第3-4节移至第9天第1-2节。
第4天当前占用[1]高等数学(1-2节) [8]程序设计(9-10节)占用4/12。
第9天当前占用[6]线代(1-2节)占用2/12。
```
**失败返回(冲突):**
```
移动失败第9天第1-2节已被 [9]小组讨论 占用。
第9天当前占用[9]小组讨论(1-2节)占用2/12。空闲时段第3-12节。
```
**失败返回(锁定):**
```
移动失败:[1]高等数学 是固定课程,不可移动。
```
**失败返回(状态错误):**
```
移动失败:[3]复习线代 当前为待安排状态,请使用 place 放置。
```
---
### 5.3 swap
交换两个已安排任务的位置。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| task_a | int | 是 | 任务 A 的 ID |
| task_b | int | 是 | 任务 B 的 ID |
**成功返回:**
```
交换完成:
[2]英语第1天第3-4节 → 第6天第1-2节
[6]线代第6天第1-2节 → 第1天第3-4节
第1天当前占用[1]高等数学(1-2节) [6]线代(3-4节) [4]体育(5-6节)占用6/12。
第6天当前占用[2]英语(1-2节)占用2/12。
```
**失败返回(时长不匹配):**
```
交换失败:[5]物理 占4个时段[2]英语 占2个时段时长不同无法直接交换。
```
**失败返回(任一任务锁定):**
```
交换失败:[1]高等数学 是固定课程,不可交换。
```
---
### 5.4 batch_move
批量原子移动多个任务,要么全部成功,要么全部回滚。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| moves | array | 是 | 每项包含 task_id, new_day, new_slot_start |
**成功返回:**
```
批量移动完成3个任务全部成功
[2]英语 → 第3天第1-2节
[6]线代 → 第5天第3-4节
[8]程序设计 → 第9天第5-6节
第3天当前占用[2]英语(1-2节)占用2/12。
第5天当前占用[6]线代(3-4节)占用2/12。
第9天当前占用[8]程序设计(5-6节)占用2/12。
```
**失败返回:**
```
批量移动失败,全部回滚,无任何变更。
冲突:[6]线代 → 第5天第3-4节该位置已被 [3]复习线代(1-3节) 占用。
```
---
### 5.5 unplace
将已安排任务恢复为待安排状态。
**入参:**
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| task_id | int | 是 | 任务 ID |
**成功返回:**
```
已将 [3]复习线代 从第5天第1-3节移除恢复为待安排状态。
第5天当前占用0/12。
待安排任务剩余1个。
```
**失败返回(锁定):**
```
移除失败:[1]高等数学 是固定课程,不可移除。
```
---
## 6. 公共规则
### 冲突检测
- 所有写操作执行前自动检测目标位置是否冲突
- 冲突时拒绝操作,返回冲突任务名称和占用节次
- state 保持不变
### 锁定保护
- locked=true 的任务move / swap / unplace 直接拒绝
- place 新任务到锁定时段同样拒绝
### 状态约束
- pending 任务只能 place不能 move / swap
- existing 任务可以 move / swap / unplace
- 状态不符时返回明确错误信息
### 返回格式
- 返回值为自然语言 + 轻结构(缩进、列表)
- 占用信息始终附带每个任务的具体节次范围
- 读工具只报当前真实状态,不做假设
- 写工具只报变更后的事实,不附建议
### ID 规范
- LLM 可见的任务 ID 为 `state_id`(递增整数),不暴露 source/source_id
- `state_id` 由工具层在加载 state 时分配,不区分来源
- `source` + `source_id` 为内部字段,仅在写库时使用,不对 LLM 可见
### 嵌入任务规则
- `can_embed=true` 的任务(水课)允许其他任务嵌入到同一时段
- 嵌入任务占位时不触发冲突检测(与宿主共存)
- `find_free` 返回结果中标注可嵌入时段,让 LLM 知道哪里可以叠加
- `place` 到可嵌入时段时,若已有宿主任务,自动标记 embed_host 关系
- 嵌入任务的 locked 继承宿主:宿主不可移动时,嵌入任务也不可单独移动
### 数据库交互
- State 初始化:从 Schedule + ScheduleEvent 加载 existing 任务,从 TaskClassItem 加载 pending 任务
- State 落库Confirm 节点统一处理,将 state 变更转换为 Schedule/ScheduleEvent/TaskClassItem 的增删改
- 落库时使用 source + source_id 定位原记录,使用 day_mapping 将 day_index 转回 (week, day_of_week)
- 落库时将 (slot_start, slot_end) 展开为逐条 Schedule 记录