Files
smartmate/backend/newAgent/graph/common_graph.go
Losita 6d22acb270 Version: 0.8.4.dev.260329
后端:
1.新建newAgent文件夹,是的你没听错,刚刚搬迁完的旧结构又准备推翻了:因为通用性太差,用户需求复杂一点就招架不了。最新的架构已经在路上,这应该是这个项目的正确路线了,目前正在搭骨架。

前端:
无改动

全仓库:
无改动
2026-03-29 22:12:23 +08:00

293 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package graph
import (
"context"
"errors"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/cloudwego/eino/compose"
)
const (
GraphName = "agent_loop"
NodeChat = "chat"
NodePlan = "plan"
NodeConfirm = "confirm"
NodeExecute = "execute"
NodeInterrupt = "interrupt"
NodeDeliver = "deliver"
)
func RunAgentGraph(ctx context.Context, state *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if state == nil {
return nil, errors.New("agent graph: state is nil")
}
flowState := state.EnsureCommonState()
if flowState == nil {
return nil, errors.New("agent graph: common state is nil")
}
g := compose.NewGraph[*newagentmodel.AgentRuntimeState, *newagentmodel.AgentRuntimeState]()
// --- 注册节点 ---
if err := g.AddLambdaNode(NodeChat, compose.InvokableLambda(chatNode)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodePlan, compose.InvokableLambda(planNode)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeConfirm, compose.InvokableLambda(confirmNode)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeExecute, compose.InvokableLambda(executeNode)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeInterrupt, compose.InvokableLambda(interruptNode)); err != nil {
return nil, err
}
if err := g.AddLambdaNode(NodeDeliver, compose.InvokableLambda(deliverNode)); err != nil {
return nil, err
}
// --- 连边 ---
// 1. 所有请求统一先过 chat 入口,这样普通聊天、首次任务、恢复执行都走同一入口。
// 2. chat 不再负责旧式“多业务图路由”,只负责决定后续应该进入哪个统一节点。
if err := g.AddEdge(compose.START, NodeChat); err != nil {
return nil, err
}
// Chat → END(普通聊天) / Plan / Confirm / Execute / Deliver / Interrupt
if err := g.AddBranch(NodeChat, compose.NewGraphBranch(
branchAfterChat,
map[string]bool{
NodePlan: true,
NodeConfirm: true,
NodeExecute: true,
NodeDeliver: true,
NodeInterrupt: true,
compose.END: true,
},
)); err != nil {
return nil, err
}
// Plan → Plan(继续规划) / Confirm(规划完成) / Interrupt(需要追问用户)
if err := g.AddBranch(NodePlan, compose.NewGraphBranch(
branchAfterPlan,
map[string]bool{
NodePlan: true,
NodeConfirm: true,
NodeInterrupt: true,
},
)); err != nil {
return nil, err
}
// Confirm → Plan(用户拒绝或重规划) / Execute(确认后继续执行) / Interrupt(产出确认中断并等待外部回调)
if err := g.AddBranch(NodeConfirm, compose.NewGraphBranch(
branchAfterConfirm,
map[string]bool{
NodePlan: true,
NodeExecute: true,
NodeInterrupt: true,
},
)); err != nil {
return nil, err
}
// Execute → Execute(继续 ReAct) / Confirm(写操作待确认) / Deliver(完成) / Interrupt(需要追问用户)
if err := g.AddBranch(NodeExecute, compose.NewGraphBranch(
branchAfterExecute,
map[string]bool{
NodeExecute: true,
NodeConfirm: true,
NodeDeliver: true,
NodeInterrupt: true,
},
)); err != nil {
return nil, err
}
// Interrupt → END当前连接必须在这里收口等待用户输入或确认回调恢复。
if err := g.AddEdge(NodeInterrupt, compose.END); err != nil {
return nil, err
}
// Deliver → END
if err := g.AddEdge(NodeDeliver, compose.END); err != nil {
return nil, err
}
// --- 编译运行 ---
maxSteps := flowState.MaxRounds + 10
runnable, err := g.Compile(ctx,
compose.WithGraphName(GraphName),
compose.WithMaxRunSteps(maxSteps),
compose.WithNodeTriggerMode(compose.AnyPredecessor),
)
if err != nil {
return nil, err
}
return runnable.Invoke(ctx, state)
}
// --- 占位节点,后续由 node 层替换 ---
func chatNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("chat node: state is nil")
}
st.EnsureCommonState()
// TODO:
// 1. 识别当前请求是普通聊天、首次任务进入,还是从 pending interaction 恢复。
// 2. 若只是普通聊天,则生成回复并把 Phase 设为 PhaseChatting后续直接 END。
// 3. 若识别到任务意图,则把 Phase 切到 planning / waiting_confirm / executing 对应阶段。
// 4. 若本轮是恢复请求,则这里只负责吞掉用户最新输入并准备恢复,不再重复输出闲聊回复。
return st, nil
}
func planNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("plan node: state is nil")
}
st.EnsureCommonState()
// TODO:
// 1. 每轮把“完整 plan + 当前步骤 + 置顶上下文”注入给 LLM让模型只补一步规划。
// 2. 若缺少关键信息,则调用 st.OpenAskUserInteraction(...) 打开 ask_user 中断。
// 3. 若规划已经完整,则调用 st.FinishPlan(steps),把流程切到 waiting_confirm。
// 4. 若规划未完成,则保持 PhasePlanning分支回到 plan 继续循环。
return st, nil
}
func confirmNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("confirm node: state is nil")
}
st.EnsureCommonState()
// TODO:
// 1. 这里不再做“confirm 节点内自循环等待”,而是统一走中断恢复模式。
// 2. 节点职责是生成确认事件、固化待执行工具快照,并调用 st.OpenConfirmInteraction(...)。
// 3. 当前连接随后会流向 interrupt 节点收口;用户确认/取消后,由外部回调恢复到 executing 或 planning。
// 4. 这里不要直接执行写工具,必须先把待执行工具调用固化为 pending snapshot。
return st, nil
}
func executeNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("execute node: state is nil")
}
flowState := st.EnsureCommonState()
// TODO:
// 1. 让 LLM 在“当前步骤”约束下做一轮 ReAct思考 → 调工具/观察 → reflection。
// 2. 若执行中发现缺少关键用户信息,则调用 st.OpenAskUserInteraction(...) 并走 interrupt。
// 3. 若命中写工具确认闸门:
// 3.1 若走同连接确认,则把 Phase 置为 waiting_confirm 并跳到 confirm
// 3.2 若走短连接恢复,则调用 st.OpenConfirmInteraction(...) 并走 interrupt。
// 4. 若当前步骤已完成,则由 node 层决定是 AdvanceStep() 继续,还是 Done() 进入交付。
flowState.NextRound()
return st, nil
}
func interruptNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("interrupt node: state is nil")
}
st.EnsureCommonState()
// TODO:
// 1. 若 PendingInteraction.Type=ask_user则像普通聊天一样流式吐出问题文本。
// 2. 若 PendingInteraction.Type=confirm则推送前端可识别的确认事件并把待执行工具调用一起带上。
// 3. 输出完成后,立刻把 AgentRuntimeState 快照持久化到 Redis + MySQL形成后续恢复点。
// 4. 当前节点结束后必须断开连接,等待用户聊天回复或确认回调重新进入 graph。
return st, nil
}
func deliverNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) {
if st == nil {
return nil, errors.New("deliver node: state is nil")
}
flowState := st.EnsureCommonState()
// TODO: 将执行结果推给用户,并在所有外部落库完成后再标记 done。
flowState.Done()
return st, nil
}
// --- 分支函数 ---
func branchAfterChat(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) {
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureCommonState()
switch flowState.Phase {
case newagentmodel.PhasePlanning:
return NodePlan, nil
case newagentmodel.PhaseWaitingConfirm:
return NodeConfirm, nil
case newagentmodel.PhaseExecuting:
return NodeExecute, nil
case newagentmodel.PhaseDone:
return NodeDeliver, nil
default:
// 普通聊天场景,回复已在 chatNode 生成,当前请求可直接结束。
return compose.END, nil
}
}
func branchAfterPlan(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) {
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureCommonState()
if flowState.Phase == newagentmodel.PhaseWaitingConfirm {
return NodeConfirm, nil
}
return NodePlan, nil
}
func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) {
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureCommonState()
switch flowState.Phase {
case newagentmodel.PhaseExecuting:
return NodeExecute, nil
case newagentmodel.PhaseWaitingConfirm:
// 1. confirm 节点产出确认请求后,当前连接必须进入 interrupt 收口。
// 2. 真正的用户确认结果应由外部回调写回状态,再重新进入 graph。
return NodeInterrupt, nil
default:
return NodePlan, nil
}
}
func branchAfterExecute(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) {
if nextNode, interrupted := branchIfInterrupted(st); interrupted {
return nextNode, nil
}
flowState := st.EnsureCommonState()
if flowState.Phase == newagentmodel.PhaseWaitingConfirm {
return NodeConfirm, nil
}
if flowState.Phase == newagentmodel.PhaseDone || flowState.Exhausted() {
return NodeDeliver, nil
}
return NodeExecute, nil
}
func branchIfInterrupted(st *newagentmodel.AgentRuntimeState) (string, bool) {
if st == nil {
return "", false
}
if st.HasPendingInteraction() {
return NodeInterrupt, true
}
return "", false
}