From 6d22acb27036cd146557b41aff835f8351eebe7b Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Sun, 29 Mar 2026 22:12:23 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.8.4.dev.260329=20=E5=90=8E?= =?UTF-8?q?=E7=AB=AF=EF=BC=9A=201.=E6=96=B0=E5=BB=BAnewAgent=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=A4=B9=EF=BC=8C=E6=98=AF=E7=9A=84=E4=BD=A0=E6=B2=A1?= =?UTF-8?q?=E5=90=AC=E9=94=99=EF=BC=8C=E5=88=9A=E5=88=9A=E6=90=AC=E8=BF=81?= =?UTF-8?q?=E5=AE=8C=E7=9A=84=E6=97=A7=E7=BB=93=E6=9E=84=E5=8F=88=E5=87=86?= =?UTF-8?q?=E5=A4=87=E6=8E=A8=E7=BF=BB=E4=BA=86=EF=BC=9A=E5=9B=A0=E4=B8=BA?= =?UTF-8?q?=E9=80=9A=E7=94=A8=E6=80=A7=E5=A4=AA=E5=B7=AE=EF=BC=8C=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E9=9C=80=E6=B1=82=E5=A4=8D=E6=9D=82=E4=B8=80=E7=82=B9?= =?UTF-8?q?=E5=B0=B1=E6=8B=9B=E6=9E=B6=E4=B8=8D=E4=BA=86=E3=80=82=E6=9C=80?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E6=9E=B6=E6=9E=84=E5=B7=B2=E7=BB=8F=E5=9C=A8?= =?UTF-8?q?=E8=B7=AF=E4=B8=8A=EF=BC=8C=E8=BF=99=E5=BA=94=E8=AF=A5=E6=98=AF?= =?UTF-8?q?=E8=BF=99=E4=B8=AA=E9=A1=B9=E7=9B=AE=E7=9A=84=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E8=B7=AF=E7=BA=BF=E4=BA=86=EF=BC=8C=E7=9B=AE=E5=89=8D=E6=AD=A3?= =?UTF-8?q?=E5=9C=A8=E6=90=AD=E9=AA=A8=E6=9E=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 前端: 无改动 全仓库: 无改动 --- backend/agent/model/schedule_refine.go | 100 +++--- backend/model/agent.go | 166 +++++++++- backend/newAgent/graph/common_graph.go | 292 ++++++++++++++++++ backend/newAgent/llm/ark.go | 83 +++++ backend/newAgent/llm/client.go | 216 +++++++++++++ backend/newAgent/llm/json.go | 112 +++++++ backend/newAgent/model/common_state.go | 147 +++++++++ .../newAgent/model/conversation_context.go | 215 +++++++++++++ backend/newAgent/model/execute_contract.go | 205 ++++++++++++ backend/newAgent/model/pending_interaction.go | 220 +++++++++++++ backend/newAgent/prompt/execute.go | 104 +++++++ backend/newAgent/prompt/plan.go | 244 +++++++++++++++ backend/newAgent/shared/retry.go | 85 +++++ backend/newAgent/shared/time.go | 49 +++ backend/newAgent/stream/emitter.go | 115 +++++++ backend/newAgent/stream/openai.go | 102 ++++++ newagent-roadmap.md | 70 +++++ 17 files changed, 2474 insertions(+), 51 deletions(-) create mode 100644 backend/newAgent/graph/common_graph.go create mode 100644 backend/newAgent/llm/ark.go create mode 100644 backend/newAgent/llm/client.go create mode 100644 backend/newAgent/llm/json.go create mode 100644 backend/newAgent/model/common_state.go create mode 100644 backend/newAgent/model/conversation_context.go create mode 100644 backend/newAgent/model/execute_contract.go create mode 100644 backend/newAgent/model/pending_interaction.go create mode 100644 backend/newAgent/prompt/execute.go create mode 100644 backend/newAgent/prompt/plan.go create mode 100644 backend/newAgent/shared/retry.go create mode 100644 backend/newAgent/shared/time.go create mode 100644 backend/newAgent/stream/emitter.go create mode 100644 backend/newAgent/stream/openai.go create mode 100644 newagent-roadmap.md diff --git a/backend/agent/model/schedule_refine.go b/backend/agent/model/schedule_refine.go index efd2df3..56482ce 100644 --- a/backend/agent/model/schedule_refine.go +++ b/backend/agent/model/schedule_refine.go @@ -29,7 +29,7 @@ const ( defaultRepairReserve = ScheduleRefineDefaultRepairReserve ) -// RefineContract 琛ㄧず鏈疆寰皟鎰忓浘濂戠害銆? +// RefineContract 表示本轮微调意图契约。 type RefineContract struct { Intent string `json:"intent"` Strategy string `json:"strategy"` @@ -39,13 +39,13 @@ type RefineContract struct { OrderScope string `json:"order_scope"` } -// RefineAssertion 琛ㄧず鍙敱鍚庣鐩存帴鍒ゅ畾鐨勭粨鏋勫寲纭柇瑷€銆? +// RefineAssertion 表示可由后端直接判定的结构化硬断言。 // -// 瀛楁璇存槑锛? -// 1. Metric锛氭柇瑷€鎸囨爣鍚嶏紝渚嬪 source_move_ratio_percent锛? -// 2. Operator锛氭瘮杈冩搷浣滅锛屾敮鎸?== / <= / >= / between锛? -// 3. Value/Min/Max锛氶槇鍊硷紱 -// 4. Week/TargetWeek锛氬彲閫夊懆娆′笂涓嬫枃銆? +// 字段说明: +// 1. Metric:断言指标名,例如 source_move_ratio_percent; +// 2. Operator:比较操作符,支持 == / <= / >= / between; +// 3. Value/Min/Max:阈值; +// 4. Week/TargetWeek:可选周次上下文。 type RefineAssertion struct { Metric string `json:"metric"` Operator string `json:"operator"` @@ -56,7 +56,7 @@ type RefineAssertion struct { TargetWeek int `json:"target_week,omitempty"` } -// HardCheckReport 琛ㄧず缁堝纭牎楠岀粨鏋溿€? +// HardCheckReport 表示终审硬校验结果。 type HardCheckReport struct { PhysicsPassed bool `json:"physics_passed"` PhysicsIssues []string `json:"physics_issues,omitempty"` @@ -71,7 +71,7 @@ type HardCheckReport struct { RepairTried bool `json:"repair_tried"` } -// ReactRoundObservation 璁板綍姣忚疆 ReAct 鐨勫叧閿瀵熴€? +// ReactRoundObservation 记录每轮 ReAct 的关键观察。 type ReactRoundObservation struct { Round int `json:"round"` GoalCheck string `json:"goal_check,omitempty"` @@ -84,13 +84,13 @@ type ReactRoundObservation struct { Reflect string `json:"reflect,omitempty"` } -// PlannerPlan 琛ㄧず Planner 鐢熸垚鐨勯樁娈垫墽琛岃鍒掋€? +// PlannerPlan 表示 Planner 生成的阶段执行计划。 type PlannerPlan struct { Summary string `json:"summary"` Steps []string `json:"steps,omitempty"` } -// RefineSlicePlan 琛ㄧず鍒囩墖鑺傜偣杈撳嚭銆? +// RefineSlicePlan 表示切片节点输出。 type RefineSlicePlan struct { WeekFilter []int `json:"week_filter,omitempty"` SourceDays []int `json:"source_days,omitempty"` @@ -99,12 +99,12 @@ type RefineSlicePlan struct { Reason string `json:"reason,omitempty"` } -// RefineObjective 琛ㄧず鈥滃彲鎵ц涓斿彲鏍¢獙鈥濈殑鐩爣绾︽潫銆? +// RefineObjective 表示"可执行且可校验"的目标约束。 // -// 璁捐璇存槑锛? -// 1. 鐢?contract/slice 浠庤嚜鐒惰瑷€缂栬瘧寰楀埌锛? -// 2. 鎵ц闃舵锛坉one 鏀跺彛锛変笌缁堝闃舵锛坔ard_check锛夊叡鐢ㄥ悓涓€浠界害鏉燂紱 -// 3. 閬垮厤鈥滄墽琛岄€昏緫涓庣粓瀹¢€昏緫鍚勮鍚勮瘽鈥濄€? +// 设计说明: +// 1. 由 contract/slice 从自然语言编译得到; +// 2. 执行阶段(done 收口)与终审阶段(hard_check)共用同一份约束; +// 3. 避免"执行逻辑与终审逻辑各说各话"。 type RefineObjective struct { Mode string `json:"mode,omitempty"` // none | move_all | move_ratio @@ -122,9 +122,9 @@ type RefineObjective struct { Reason string `json:"reason,omitempty"` } -// ScheduleRefineState 鏄繛缁井璋冨浘鐨勭粺涓€鐘舵€併€? +// ScheduleRefineState 是连续微调图的统一状态。 type ScheduleRefineState struct { - // 1) 璇锋眰涓婁笅鏂? + // 1) 请求上下文 TraceID string UserID int ConversationID string @@ -132,19 +132,19 @@ type ScheduleRefineState struct { RequestNow time.Time RequestNowText string - // 2) 缁ф壙鑷瑙堝揩鐓х殑鏁版嵁 + // 2) 继承自预览快照的数据 TaskClassIDs []int Constraints []string - // InitialHybridEntries 淇濆瓨鏈疆寰皟寮€濮嬪墠鐨勫熀绾匡紝鐢ㄤ簬缁堝鍋氣€滃墠鍚庡姣斺€濄€? - // 璇存槑锛? - // 1. 鍙璇箟锛屼笉鍙備笌鎵ц鏈熸敼鍐欙紱 - // 2. 缁堝鍙熀浜庡畠鍒ゆ柇鈥滄潵婧愪换鍔℃槸鍚︾湡姝h縼绉诲埌鐩爣鍖哄煙鈥濄€? + // InitialHybridEntries 保存本轮微调开始前的基线,用于终审做"前后对比"。 + // 说明: + // 1. 只读语义,不参与执行期改写; + // 2. 终审只基于它判断"来源任务是否真正迁移到目标区域"。 InitialHybridEntries []model.HybridScheduleEntry HybridEntries []model.HybridScheduleEntry AllocatedItems []model.TaskClassItem CandidatePlans []model.UserWeekSchedule - // 3) 鏈疆鎵ц鐘舵€? + // 3) 本轮执行状态 UserIntent string Contract RefineContract @@ -152,7 +152,7 @@ type ScheduleRefineState struct { PerTaskBudget int ExecuteMax int ReplanMax int - // CompositeRetryMax 琛ㄧず澶嶅悎璺敱澶辫触鍚庣殑鏈€澶ч噸璇曟鏁帮紙涓嶅惈棣栨灏濊瘯锛夈€? + // CompositeRetryMax 表示复合路由失败后的最大重试次数(不含首次尝试)。 CompositeRetryMax int PlanUsed int @@ -169,27 +169,27 @@ type ScheduleRefineState struct { CurrentPlan PlannerPlan BatchMoveAllowed bool - // DisableCompositeTools=true 琛ㄧず宸茶繘鍏?ReAct 鍏滃簳锛岀姝㈠啀璋冪敤澶嶅悎宸ュ叿銆? + // DisableCompositeTools=true 表示已进入 ReAct 兜底,禁止再调用复合工具。 DisableCompositeTools bool - // CompositeRouteTried 鏍囪鏄惁灏濊瘯杩団€滃鍚堟壒澶勭悊璺敱鈥濄€? + // CompositeRouteTried 标记是否尝试过"复合批处理路由"。 CompositeRouteTried bool - // CompositeRouteSucceeded 鏍囪澶嶅悎鎵瑰鐞嗚矾鐢辨槸鍚﹀凡瀹屾垚鈥滃鍚堝垎鏀嚭绔欌€濄€? + // CompositeRouteSucceeded 标记复合批处理路由是否已完成"复合分支出站"。 // - // 璇存槑锛? - // 1. true 琛ㄧず褰撳墠閾捐矾鍙互璺宠繃 ReAct 鍏滃簳锛岀洿鎺ヨ繘鍏?hard_check锛? - // 2. 瀹冧笉绛変环浜庘€滅粓瀹″凡閫氳繃鈥濓紝缁堝鏄惁閫氳繃浠嶄互鍚庣画 HardCheck 缁撴灉涓哄噯锛? - // 3. 杩欐牱鍖哄垎鏄负浜嗛伩鍏嶁€滃鍚堝伐鍏峰凡鎴愬姛鎵ц锛屼絾涓氬姟鐩爣瑕佺瓑缁堝瑁佸喅鈥濇椂琚鍒や负澶辫触銆? + // 说明: + // 1. true 表示当前链路可以跳过 ReAct 兜底,直接进入 hard_check; + // 2. 它不等价于"终审已通过",终审是否通过仍以后续 HardCheck 结果为准; + // 3. 这样区分是为了避免"复合工具已成功执行,但业务目标要等终审裁决"时被误判为失败。 CompositeRouteSucceeded bool TaskActionUsed map[int]int EntriesVersion int SeenSlotQueries map[string]struct{} - // RequiredCompositeTool 琛ㄧず鏈疆绛栫暐瑕佹眰鈥滃繀椤昏嚦灏戞垚鍔熶竴娆♀€濈殑澶嶅悎宸ュ叿銆? - // 鍙栧€肩害瀹氾細"" | "SpreadEven" | "MinContextSwitch"銆? + // RequiredCompositeTool 表示本轮策略要求"必须至少成功一次"的复合工具。 + // 取值约定:"" | "SpreadEven" | "MinContextSwitch"。 RequiredCompositeTool string - // CompositeToolCalled 璁板綍澶嶅悎宸ュ叿鏄惁鑷冲皯璋冪敤杩囦竴娆★紙涓嶅尯鍒嗘垚鍔熷け璐ワ級銆? + // CompositeToolCalled 记录复合工具是否至少调用过一次(不区分成功失败)。 CompositeToolCalled map[string]bool - // CompositeToolSuccess 璁板綍澶嶅悎宸ュ叿鏄惁鑷冲皯鎴愬姛杩囦竴娆°€? + // CompositeToolSuccess 记录复合工具是否至少成功过一次。 CompositeToolSuccess map[string]bool SlicePlan RefineSlicePlan @@ -202,20 +202,20 @@ type ScheduleRefineState struct { LastFailedCallSignature string OriginOrderMap map[int]int - // 4) 缁堝鐘舵€? + // 4) 终审状态 HardCheck HardCheckReport - // 5) 鏈€缁堣緭鍑? + // 5) 最终输出 FinalSummary string Completed bool } -// NewScheduleRefineState 鍩轰簬涓婁竴鐗堥瑙堝揩鐓у垵濮嬪寲鐘舵€併€? +// NewScheduleRefineState 基于上一版预览快照初始化状态。 // -// 鑱岃矗杈圭晫锛? -// 1. 璐熻矗鍒濆鍖栭绠椼€佷笂涓嬫枃瀛楁涓庡彲鍙樼姸鎬佸鍣紱 -// 2. 璐熻矗鎷疯礉 preview 鏁版嵁锛岄伩鍏嶈法璇锋眰寮曠敤姹℃煋锛? -// 3. 涓嶈礋璐e仛浠讳綍璋冨害鍔ㄤ綔銆? +// 职责边界: +// 1. 负责初始化预算、上下文字段与可变状态容器; +// 2. 负责拷贝 preview 数据,避免跨请求引用污染; +// 3. 不负责做任何调度动作。 func NewScheduleRefineState(traceID string, userID int, conversationID string, userMessage string, preview *model.SchedulePlanPreviewCache) *ScheduleRefineState { now := nowToMinute() st := &ScheduleRefineState{ @@ -249,7 +249,7 @@ func NewScheduleRefineState(traceID string, userID int, conversationID string, u Summary: "initialized, waiting for planner output", }, SlicePlan: RefineSlicePlan{ - Reason: "灏氭湭鍒囩墖", + Reason: "尚未切片", }, } if preview == nil { @@ -285,7 +285,7 @@ func cloneWeekSchedules(src []model.UserWeekSchedule) []model.UserWeekSchedule { return agentshared.CloneWeekSchedules(src) } -// buildOriginOrderMap 鏋勫缓 suggested 浠诲姟鐨勫垵濮嬮『搴忓熀绾匡紙task_item_id -> rank锛夈€? +// buildOriginOrderMap 构建 suggested 任务的初始顺序基线(task_item_id -> rank)。 func buildOriginOrderMap(entries []model.HybridScheduleEntry) map[int]int { orderMap := make(map[int]int) if len(entries) == 0 { @@ -320,12 +320,12 @@ func buildOriginOrderMap(entries []model.HybridScheduleEntry) map[int]int { return orderMap } -// FinalHardCheckPassed 鍒ゆ柇鈥滄渶缁堢粓瀹♀€濇槸鍚︽暣浣撻€氳繃銆? +// FinalHardCheckPassed 判断"最终终审"是否整体通过。 // -// 鑱岃矗杈圭晫锛? -// 1. 璐熻矗鑱氬悎 physics/order/intent 涓夌被纭牎楠岀粨鏋滐紝缁欐湇鍔″眰涓庢€荤粨闃舵缁熶竴澶嶇敤锛? -// 2. 涓嶈礋璐hЕ鍙戠粓瀹★紝涔熶笉璐熻矗鎺ㄥ淇鍔ㄤ綔锛? -// 3. nil state 瑙嗕负鏈€氳繃锛岄伩鍏嶄笂灞傛妸缂哄け缁撴灉璇垽涓烘垚鍔熴€? +// 职责边界: +// 1. 负责聚合 physics/order/intent 三类硬校验结果,给服务层与总结阶段统一复用; +// 2. 不负责触发终审,也不负责推送修复动作; +// 3. nil state 视为未通过,避免上层把缺失结果误判为成功。 func FinalHardCheckPassed(st *ScheduleRefineState) bool { if st == nil { return false diff --git a/backend/model/agent.go b/backend/model/agent.go index 84e31be..314919a 100644 --- a/backend/model/agent.go +++ b/backend/model/agent.go @@ -1,6 +1,58 @@ package model -import "time" +import ( + "encoding/json" + "fmt" + "strings" + "time" +) + +// AgentResumeType 表示本轮请求想恢复哪一类挂起交互。 +type AgentResumeType string + +const ( + AgentResumeTypeAskUser AgentResumeType = "ask_user" + AgentResumeTypeConfirm AgentResumeType = "confirm" + AgentResumeTypeConnectionRecover AgentResumeType = "connection_recover" +) + +// AgentResumeAction 表示用户这次恢复请求携带的动作类型。 +type AgentResumeAction string + +const ( + AgentResumeActionReply AgentResumeAction = "reply" + AgentResumeActionApprove AgentResumeAction = "approve" + AgentResumeActionReject AgentResumeAction = "reject" + AgentResumeActionCancel AgentResumeAction = "cancel" + AgentResumeActionResume AgentResumeAction = "resume" +) + +// AgentResumeRequest 是 extra.resume 的统一结构。 +// +// 设计目的: +// 1. 继续复用现有聊天入口,不再额外新增一条“确认专用接口”; +// 2. 前端只提交“我要恢复哪次交互、这次动作是什么”,不直接改后端 state; +// 3. 后端进入聊天主链路前,先读取这份结构,再决定走 confirm / ask_user / connection_recover 哪条恢复路径。 +// +// 推荐前端请求形态: +// +// { +// "message": "", +// "extra": { +// "resume": { +// "interaction_id": "xxx", +// "type": "confirm", +// "action": "approve" +// } +// } +// } +// +// TODO(newagent/api): 进入聊天主流程前,优先调用 req.ResumeRequest();若命中恢复协议,则不要把本轮请求按普通聊天处理。 +type AgentResumeRequest struct { + InteractionID string `json:"interaction_id"` + Type AgentResumeType `json:"type,omitempty"` + Action AgentResumeAction `json:"action"` +} type UserSendMessageRequest struct { ConversationID string `json:"conversation_id,omitempty"` @@ -10,6 +62,118 @@ type UserSendMessageRequest struct { Extra map[string]any `json:"extra,omitempty"` } +// ResumeRequest 从 extra.resume 中解析结构化恢复请求。 +// +// 步骤说明: +// 1. 若 extra 或 extra.resume 不存在,则直接返回 nil,表示本轮是普通聊天请求; +// 2. 先把任意 map/struct 形态统一转成 JSON,再反序列化到强类型结构,避免入口层到处手写断言; +// 3. 解析成功后先做 Normalize,再做最小必要校验,防止后续业务层拿到脏协议继续流转; +// 4. 这里只负责协议解析与基本校验,不负责真正恢复状态,也不负责改 Redis/MySQL。 +func (r *UserSendMessageRequest) ResumeRequest() (*AgentResumeRequest, error) { + if r == nil || len(r.Extra) == 0 { + return nil, nil + } + + rawResume, ok := r.Extra["resume"] + if !ok || rawResume == nil { + return nil, nil + } + + data, err := json.Marshal(rawResume) + if err != nil { + return nil, fmt.Errorf("序列化 extra.resume 失败: %w", err) + } + + var resume AgentResumeRequest + if err := json.Unmarshal(data, &resume); err != nil { + return nil, fmt.Errorf("解析 extra.resume 失败: %w", err) + } + + resume.Normalize() + if err := resume.Validate(); err != nil { + return nil, err + } + return &resume, nil +} + +// Normalize 统一清洗恢复协议中的字符串字段。 +func (r *AgentResumeRequest) Normalize() { + if r == nil { + return + } + r.InteractionID = strings.TrimSpace(r.InteractionID) + r.Type = AgentResumeType(strings.TrimSpace(string(r.Type))) + r.Action = AgentResumeAction(strings.TrimSpace(string(r.Action))) +} + +// Validate 校验恢复协议的最小合法性。 +// +// 职责边界: +// 1. 只校验“是否像一份合法的恢复协议”,不校验 interaction_id 是否真实存在; +// 2. confirm / ask_user / connection_recover 共用一条入口,但动作集合不同,所以这里做显式分流校验; +// 3. 对于 ask_user 回复,真正的回答正文仍建议优先放在顶层 message,这里不强制要求额外 answer 字段。 +func (r *AgentResumeRequest) Validate() error { + if r == nil { + return nil + } + if r.InteractionID == "" { + return fmt.Errorf("extra.resume.interaction_id 不能为空") + } + if r.Action == "" { + return fmt.Errorf("extra.resume.action 不能为空") + } + + switch r.Type { + case "", AgentResumeTypeConfirm: + switch r.Action { + case AgentResumeActionApprove, AgentResumeActionReject, AgentResumeActionCancel: + return nil + default: + return fmt.Errorf("confirm 恢复动作非法: %s", r.Action) + } + case AgentResumeTypeAskUser: + switch r.Action { + case AgentResumeActionReply, AgentResumeActionCancel: + return nil + default: + return fmt.Errorf("ask_user 恢复动作非法: %s", r.Action) + } + case AgentResumeTypeConnectionRecover: + switch r.Action { + case AgentResumeActionResume, AgentResumeActionCancel: + return nil + default: + return fmt.Errorf("connection_recover 恢复动作非法: %s", r.Action) + } + default: + return fmt.Errorf("extra.resume.type 非法: %s", r.Type) + } +} + +// IsConfirmResume 判断当前恢复请求是否属于 confirm 分支。 +func (r *AgentResumeRequest) IsConfirmResume() bool { + if r == nil { + return false + } + return r.Type == "" || r.Type == AgentResumeTypeConfirm +} + +// IsAskUserResume 判断当前恢复请求是否属于 ask_user 分支。 +func (r *AgentResumeRequest) IsAskUserResume() bool { + if r == nil { + return false + } + return r.Type == AgentResumeTypeAskUser +} + +// IsConnectionRecoverResume 判断当前恢复请求是否属于 connection_recover 分支。 +func (r *AgentResumeRequest) IsConnectionRecoverResume() bool { + if r == nil { + return false + } + return r.Type == AgentResumeTypeConnectionRecover +} + type ChatHistoryPersistPayload struct { UserID int `json:"user_id"` ConversationID string `json:"conversation_id"` diff --git a/backend/newAgent/graph/common_graph.go b/backend/newAgent/graph/common_graph.go new file mode 100644 index 0000000..a42c179 --- /dev/null +++ b/backend/newAgent/graph/common_graph.go @@ -0,0 +1,292 @@ +package graph + +import ( + "context" + "errors" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + "github.com/cloudwego/eino/compose" +) + +const ( + GraphName = "agent_loop" + + NodeChat = "chat" + NodePlan = "plan" + NodeConfirm = "confirm" + NodeExecute = "execute" + NodeInterrupt = "interrupt" + NodeDeliver = "deliver" +) + +func RunAgentGraph(ctx context.Context, state *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if state == nil { + return nil, errors.New("agent graph: state is nil") + } + + flowState := state.EnsureCommonState() + if flowState == nil { + return nil, errors.New("agent graph: common state is nil") + } + + g := compose.NewGraph[*newagentmodel.AgentRuntimeState, *newagentmodel.AgentRuntimeState]() + + // --- 注册节点 --- + if err := g.AddLambdaNode(NodeChat, compose.InvokableLambda(chatNode)); err != nil { + return nil, err + } + if err := g.AddLambdaNode(NodePlan, compose.InvokableLambda(planNode)); err != nil { + return nil, err + } + if err := g.AddLambdaNode(NodeConfirm, compose.InvokableLambda(confirmNode)); err != nil { + return nil, err + } + if err := g.AddLambdaNode(NodeExecute, compose.InvokableLambda(executeNode)); err != nil { + return nil, err + } + if err := g.AddLambdaNode(NodeInterrupt, compose.InvokableLambda(interruptNode)); err != nil { + return nil, err + } + if err := g.AddLambdaNode(NodeDeliver, compose.InvokableLambda(deliverNode)); err != nil { + return nil, err + } + + // --- 连边 --- + // 1. 所有请求统一先过 chat 入口,这样普通聊天、首次任务、恢复执行都走同一入口。 + // 2. chat 不再负责旧式“多业务图路由”,只负责决定后续应该进入哪个统一节点。 + if err := g.AddEdge(compose.START, NodeChat); err != nil { + return nil, err + } + // Chat → END(普通聊天) / Plan / Confirm / Execute / Deliver / Interrupt + if err := g.AddBranch(NodeChat, compose.NewGraphBranch( + branchAfterChat, + map[string]bool{ + NodePlan: true, + NodeConfirm: true, + NodeExecute: true, + NodeDeliver: true, + NodeInterrupt: true, + compose.END: true, + }, + )); err != nil { + return nil, err + } + // Plan → Plan(继续规划) / Confirm(规划完成) / Interrupt(需要追问用户) + if err := g.AddBranch(NodePlan, compose.NewGraphBranch( + branchAfterPlan, + map[string]bool{ + NodePlan: true, + NodeConfirm: true, + NodeInterrupt: true, + }, + )); err != nil { + return nil, err + } + // Confirm → Plan(用户拒绝或重规划) / Execute(确认后继续执行) / Interrupt(产出确认中断并等待外部回调) + if err := g.AddBranch(NodeConfirm, compose.NewGraphBranch( + branchAfterConfirm, + map[string]bool{ + NodePlan: true, + NodeExecute: true, + NodeInterrupt: true, + }, + )); err != nil { + return nil, err + } + // Execute → Execute(继续 ReAct) / Confirm(写操作待确认) / Deliver(完成) / Interrupt(需要追问用户) + if err := g.AddBranch(NodeExecute, compose.NewGraphBranch( + branchAfterExecute, + map[string]bool{ + NodeExecute: true, + NodeConfirm: true, + NodeDeliver: true, + NodeInterrupt: true, + }, + )); err != nil { + return nil, err + } + // Interrupt → END:当前连接必须在这里收口,等待用户输入或确认回调恢复。 + if err := g.AddEdge(NodeInterrupt, compose.END); err != nil { + return nil, err + } + // Deliver → END + if err := g.AddEdge(NodeDeliver, compose.END); err != nil { + return nil, err + } + + // --- 编译运行 --- + maxSteps := flowState.MaxRounds + 10 + runnable, err := g.Compile(ctx, + compose.WithGraphName(GraphName), + compose.WithMaxRunSteps(maxSteps), + compose.WithNodeTriggerMode(compose.AnyPredecessor), + ) + if err != nil { + return nil, err + } + return runnable.Invoke(ctx, state) +} + +// --- 占位节点,后续由 node 层替换 --- + +func chatNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("chat node: state is nil") + } + st.EnsureCommonState() + + // TODO: + // 1. 识别当前请求是普通聊天、首次任务进入,还是从 pending interaction 恢复。 + // 2. 若只是普通聊天,则生成回复并把 Phase 设为 PhaseChatting,后续直接 END。 + // 3. 若识别到任务意图,则把 Phase 切到 planning / waiting_confirm / executing 对应阶段。 + // 4. 若本轮是恢复请求,则这里只负责吞掉用户最新输入并准备恢复,不再重复输出闲聊回复。 + return st, nil +} + +func planNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("plan node: state is nil") + } + st.EnsureCommonState() + + // TODO: + // 1. 每轮把“完整 plan + 当前步骤 + 置顶上下文”注入给 LLM,让模型只补一步规划。 + // 2. 若缺少关键信息,则调用 st.OpenAskUserInteraction(...) 打开 ask_user 中断。 + // 3. 若规划已经完整,则调用 st.FinishPlan(steps),把流程切到 waiting_confirm。 + // 4. 若规划未完成,则保持 PhasePlanning,分支回到 plan 继续循环。 + return st, nil +} + +func confirmNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("confirm node: state is nil") + } + st.EnsureCommonState() + + // TODO: + // 1. 这里不再做“confirm 节点内自循环等待”,而是统一走中断恢复模式。 + // 2. 节点职责是生成确认事件、固化待执行工具快照,并调用 st.OpenConfirmInteraction(...)。 + // 3. 当前连接随后会流向 interrupt 节点收口;用户确认/取消后,由外部回调恢复到 executing 或 planning。 + // 4. 这里不要直接执行写工具,必须先把待执行工具调用固化为 pending snapshot。 + return st, nil +} + +func executeNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("execute node: state is nil") + } + flowState := st.EnsureCommonState() + + // TODO: + // 1. 让 LLM 在“当前步骤”约束下做一轮 ReAct:思考 → 调工具/观察 → reflection。 + // 2. 若执行中发现缺少关键用户信息,则调用 st.OpenAskUserInteraction(...) 并走 interrupt。 + // 3. 若命中写工具确认闸门: + // 3.1 若走同连接确认,则把 Phase 置为 waiting_confirm 并跳到 confirm; + // 3.2 若走短连接恢复,则调用 st.OpenConfirmInteraction(...) 并走 interrupt。 + // 4. 若当前步骤已完成,则由 node 层决定是 AdvanceStep() 继续,还是 Done() 进入交付。 + flowState.NextRound() + return st, nil +} + +func interruptNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("interrupt node: state is nil") + } + st.EnsureCommonState() + + // TODO: + // 1. 若 PendingInteraction.Type=ask_user,则像普通聊天一样流式吐出问题文本。 + // 2. 若 PendingInteraction.Type=confirm,则推送前端可识别的确认事件,并把待执行工具调用一起带上。 + // 3. 输出完成后,立刻把 AgentRuntimeState 快照持久化到 Redis + MySQL,形成后续恢复点。 + // 4. 当前节点结束后必须断开连接,等待用户聊天回复或确认回调重新进入 graph。 + return st, nil +} + +func deliverNode(_ context.Context, st *newagentmodel.AgentRuntimeState) (*newagentmodel.AgentRuntimeState, error) { + if st == nil { + return nil, errors.New("deliver node: state is nil") + } + flowState := st.EnsureCommonState() + + // TODO: 将执行结果推给用户,并在所有外部落库完成后再标记 done。 + flowState.Done() + return st, nil +} + +// --- 分支函数 --- + +func branchAfterChat(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { + if nextNode, interrupted := branchIfInterrupted(st); interrupted { + return nextNode, nil + } + + flowState := st.EnsureCommonState() + switch flowState.Phase { + case newagentmodel.PhasePlanning: + return NodePlan, nil + case newagentmodel.PhaseWaitingConfirm: + return NodeConfirm, nil + case newagentmodel.PhaseExecuting: + return NodeExecute, nil + case newagentmodel.PhaseDone: + return NodeDeliver, nil + default: + // 普通聊天场景,回复已在 chatNode 生成,当前请求可直接结束。 + return compose.END, nil + } +} + +func branchAfterPlan(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { + if nextNode, interrupted := branchIfInterrupted(st); interrupted { + return nextNode, nil + } + + flowState := st.EnsureCommonState() + if flowState.Phase == newagentmodel.PhaseWaitingConfirm { + return NodeConfirm, nil + } + return NodePlan, nil +} + +func branchAfterConfirm(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { + if nextNode, interrupted := branchIfInterrupted(st); interrupted { + return nextNode, nil + } + + flowState := st.EnsureCommonState() + switch flowState.Phase { + case newagentmodel.PhaseExecuting: + return NodeExecute, nil + case newagentmodel.PhaseWaitingConfirm: + // 1. confirm 节点产出确认请求后,当前连接必须进入 interrupt 收口。 + // 2. 真正的用户确认结果应由外部回调写回状态,再重新进入 graph。 + return NodeInterrupt, nil + default: + return NodePlan, nil + } +} + +func branchAfterExecute(_ context.Context, st *newagentmodel.AgentRuntimeState) (string, error) { + if nextNode, interrupted := branchIfInterrupted(st); interrupted { + return nextNode, nil + } + + flowState := st.EnsureCommonState() + if flowState.Phase == newagentmodel.PhaseWaitingConfirm { + return NodeConfirm, nil + } + if flowState.Phase == newagentmodel.PhaseDone || flowState.Exhausted() { + return NodeDeliver, nil + } + return NodeExecute, nil +} + +func branchIfInterrupted(st *newagentmodel.AgentRuntimeState) (string, bool) { + if st == nil { + return "", false + } + if st.HasPendingInteraction() { + return NodeInterrupt, true + } + return "", false +} diff --git a/backend/newAgent/llm/ark.go b/backend/newAgent/llm/ark.go new file mode 100644 index 0000000..0e5c98a --- /dev/null +++ b/backend/newAgent/llm/ark.go @@ -0,0 +1,83 @@ +package newagentllm + +import ( + "context" + "errors" + "strings" + + "github.com/cloudwego/eino-ext/components/model/ark" + einoModel "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/schema" + arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" +) + +// ArkCallOptions 是基于 ark.ChatModel 的通用调用选项。 +// +// 设计目的: +// 1. 当前 route / quicknote 都还直接持有 *ark.ChatModel; +// 2. 在它们完全收敛到更抽象的 Client 前,先把重复的 ark 调用样板抽成公共层; +// 3. 这样本轮就能先删除 route/quicknote 里那几份重复的 Generate 样板代码。 +type ArkCallOptions struct { + Temperature float64 + MaxTokens int + Thinking ThinkingMode +} + +// CallArkText 调用 ark 模型并返回纯文本。 +// +// 职责边界: +// 1. 负责拼 system + user 两段消息; +// 2. 负责统一配置 thinking / temperature / maxTokens; +// 3. 负责拦截空响应; +// 4. 不负责 JSON 解析,不负责业务字段校验。 +func CallArkText(ctx context.Context, chatModel *ark.ChatModel, systemPrompt, userPrompt string, options ArkCallOptions) (string, error) { + if chatModel == nil { + return "", errors.New("ark model is nil") + } + + messages := []*schema.Message{ + schema.SystemMessage(systemPrompt), + schema.UserMessage(userPrompt), + } + resp, err := chatModel.Generate(ctx, messages, buildArkOptions(options)...) + if err != nil { + return "", err + } + if resp == nil { + return "", errors.New("模型返回为空") + } + + text := strings.TrimSpace(resp.Content) + if text == "" { + return "", errors.New("模型返回内容为空") + } + return text, nil +} + +// CallArkJSON 调用 ark 模型并直接解析 JSON。 +func CallArkJSON[T any](ctx context.Context, chatModel *ark.ChatModel, systemPrompt, userPrompt string, options ArkCallOptions) (*T, string, error) { + raw, err := CallArkText(ctx, chatModel, systemPrompt, userPrompt, options) + if err != nil { + return nil, "", err + } + parsed, err := ParseJSONObject[T](raw) + if err != nil { + return nil, raw, err + } + return parsed, raw, nil +} + +func buildArkOptions(options ArkCallOptions) []einoModel.Option { + thinkingType := arkModel.ThinkingTypeDisabled + if options.Thinking == ThinkingModeEnabled { + thinkingType = arkModel.ThinkingTypeEnabled + } + opts := []einoModel.Option{ + ark.WithThinking(&arkModel.Thinking{Type: thinkingType}), + einoModel.WithTemperature(float32(options.Temperature)), + } + if options.MaxTokens > 0 { + opts = append(opts, einoModel.WithMaxTokens(options.MaxTokens)) + } + return opts +} diff --git a/backend/newAgent/llm/client.go b/backend/newAgent/llm/client.go new file mode 100644 index 0000000..f656d4e --- /dev/null +++ b/backend/newAgent/llm/client.go @@ -0,0 +1,216 @@ +package newagentllm + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/cloudwego/eino/schema" +) + +// ThinkingMode 描述本次模型调用对 thinking 的期望。 +// +// 职责边界: +// 1. 这里只表达“调用方希望怎样配置推理模式”; +// 2. 不直接绑定某个具体模型厂商的参数枚举; +// 3. 真正如何把它翻译成 ark / OpenAI / 其他 provider 的 option,由后续适配层负责。 +type ThinkingMode string + +const ( + ThinkingModeDefault ThinkingMode = "default" + ThinkingModeEnabled ThinkingMode = "enabled" + ThinkingModeDisabled ThinkingMode = "disabled" +) + +// GenerateOptions 是 Agent 内部统一的模型调用选项。 +// +// 设计目的: +// 1. 先把“每个 skill 都会反复传的参数”收敛成一份结构; +// 2. 让 node 层以后只表达“我要什么”,不再自己重复组织 option; +// 3. 暂时不追求覆盖所有 provider 参数,先把最常用的几个公共位抽出来。 +type GenerateOptions struct { + Temperature float64 + MaxTokens int + Thinking ThinkingMode + Metadata map[string]any +} + +// TextResult 是统一文本生成结果。 +// +// 职责边界: +// 1. Text 保存模型最终返回的纯文本; +// 2. Usage 保存本次调用的 token 使用量,供后续统一统计; +// 3. 不负责 JSON 解析,不负责业务字段映射。 +type TextResult struct { + Text string + Usage *schema.TokenUsage +} + +// StreamReader 抽象了“可逐块 Recv 的流式返回器”。 +// +// 之所以不直接依赖某个具体 SDK 的 reader 类型,是因为 Agent 现在还在建骨架阶段, +// 后续接 ark、OpenAI 兼容层还是别的 provider,都可以往这个最小接口上适配。 +type StreamReader interface { + Recv() (*schema.Message, error) + Close() error +} + +// TextGenerateFunc 是文本生成的统一适配函数签名。 +type TextGenerateFunc func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) + +// StreamGenerateFunc 是流式生成的统一适配函数签名。 +type StreamGenerateFunc func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) + +// Client 是 Agent 里的统一模型客户端门面。 +// +// 职责边界: +// 1. 负责把 node 层的“模型调用意图”收敛到统一入口; +// 2. 负责统一参数校验、空响应防御、GenerateJSON 复用; +// 3. 不负责写 prompt,不负责业务 fallback,也不直接持有具体厂商 SDK 细节。 +type Client struct { + generateText TextGenerateFunc + streamText StreamGenerateFunc +} + +// NewClient 创建统一模型客户端。 +func NewClient(generateText TextGenerateFunc, streamText StreamGenerateFunc) *Client { + return &Client{ + generateText: generateText, + streamText: streamText, + } +} + +// GenerateText 执行一次统一文本生成。 +// +// 职责边界: +// 1. 负责做最小必要的入参校验; +// 2. 负责统一拦截“模型空响应”这类公共问题; +// 3. 不负责业务 prompt 拼接,也不负责把文本再映射成业务结构。 +func (c *Client) GenerateText(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) { + if c == nil || c.generateText == nil { + return nil, errors.New("agent llm client is not ready") + } + if len(messages) == 0 { + return nil, errors.New("llm messages is empty") + } + + result, err := c.generateText(ctx, messages, options) + if err != nil { + return nil, err + } + if result == nil { + return nil, errors.New("llm result is nil") + } + if strings.TrimSpace(result.Text) == "" { + return nil, errors.New("llm returned empty text") + } + return result, nil +} + +// GenerateJSON 先走统一文本生成,再走统一 JSON 解析。 +// +// 设计说明: +// 1. 旧 agent 里每个 skill 都各自写了一份“Generate -> 提取 JSON -> 反序列化”; +// 2. 这里先把这一整段收敛成公共链路,后续 quicknote/taskquery/schedule 都直接复用; +// 3. 返回 parsed + rawResult,方便上层既能拿结构化字段,也能在打点/回退时保留原文。 +// 4. 这里做成泛型函数而不是方法,是因为 Go 不支持“方法自带类型参数”。 +func GenerateJSON[T any](ctx context.Context, client *Client, messages []*schema.Message, options GenerateOptions) (*T, *TextResult, error) { + result, err := client.GenerateText(ctx, messages, options) + if err != nil { + return nil, nil, err + } + + parsed, err := ParseJSONObject[T](result.Text) + if err != nil { + return nil, result, err + } + return parsed, result, nil +} + +// Stream 打开统一流式调用入口。 +// +// 职责边界: +// 1. 只负责把“流式生成能力”暴露给上层; +// 2. 不负责 chunk 到 OpenAI 协议的转换,那部分应放在 stream/; +// 3. 不负责累计全文,也不负责 token 统计落库。 +func (c *Client) Stream(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) { + if c == nil || c.streamText == nil { + return nil, errors.New("agent llm stream client is not ready") + } + if len(messages) == 0 { + return nil, errors.New("llm messages is empty") + } + return c.streamText(ctx, messages, options) +} + +// BuildSystemUserMessages 构造最常见的“system + history + user”消息列表。 +// +// 设计说明: +// 1. 这是旧 agent 中高频重复片段,几乎每个 skill 都会拼一次; +// 2. 这里先把最稳定的消息编排方式沉淀下来,减少 node 层样板代码; +// 3. 只做消息切片装配,不做 prompt 生成。 +func BuildSystemUserMessages(systemPrompt string, history []*schema.Message, userPrompt string) []*schema.Message { + messages := make([]*schema.Message, 0, len(history)+2) + if strings.TrimSpace(systemPrompt) != "" { + messages = append(messages, schema.SystemMessage(systemPrompt)) + } + if len(history) > 0 { + messages = append(messages, history...) + } + if strings.TrimSpace(userPrompt) != "" { + messages = append(messages, schema.UserMessage(userPrompt)) + } + return messages +} + +// CloneUsage 深拷贝 token usage,避免后续多处累加时共享同一指针。 +func CloneUsage(usage *schema.TokenUsage) *schema.TokenUsage { + if usage == nil { + return nil + } + copied := *usage + return &copied +} + +// MergeUsage 合并两段 usage。 +// +// 合并策略: +// 1. 对“同一次调用不同流分片”的场景,取更大值作为最终值; +// 2. 对“多次独立调用累计”的场景,应由上层显式做加法,而不是用这个函数; +// 3. 该函数只适用于“同一次调用的分块 usage 收敛”。 +func MergeUsage(base *schema.TokenUsage, incoming *schema.TokenUsage) *schema.TokenUsage { + if incoming == nil { + return CloneUsage(base) + } + if base == nil { + return CloneUsage(incoming) + } + + merged := *base + if incoming.PromptTokens > merged.PromptTokens { + merged.PromptTokens = incoming.PromptTokens + } + if incoming.CompletionTokens > merged.CompletionTokens { + merged.CompletionTokens = incoming.CompletionTokens + } + if incoming.TotalTokens > merged.TotalTokens { + merged.TotalTokens = incoming.TotalTokens + } + if incoming.PromptTokenDetails.CachedTokens > merged.PromptTokenDetails.CachedTokens { + merged.PromptTokenDetails.CachedTokens = incoming.PromptTokenDetails.CachedTokens + } + if incoming.CompletionTokensDetails.ReasoningTokens > merged.CompletionTokensDetails.ReasoningTokens { + merged.CompletionTokensDetails.ReasoningTokens = incoming.CompletionTokensDetails.ReasoningTokens + } + return &merged +} + +// FormatEmptyResponseError 统一生成“模型返回空结果”的错误文案。 +func FormatEmptyResponseError(scene string) error { + scene = strings.TrimSpace(scene) + if scene == "" { + scene = "unknown" + } + return fmt.Errorf("模型在 %s 场景返回空结果", scene) +} diff --git a/backend/newAgent/llm/json.go b/backend/newAgent/llm/json.go new file mode 100644 index 0000000..f41691e --- /dev/null +++ b/backend/newAgent/llm/json.go @@ -0,0 +1,112 @@ +package newagentllm + +import ( + "encoding/json" + "errors" + "fmt" + "strings" +) + +// ParseJSONObject 解析模型返回中的 JSON 对象。 +// +// 职责边界: +// 1. 负责处理“模型输出前后夹杂解释文字 / markdown 代码块”的常见情况; +// 2. 负责提取最外层 JSON object 并反序列化为目标结构; +// 3. 不负责业务字段合法性校验,例如 priority 是否在 1~4,应由上层 node 再校验。 +func ParseJSONObject[T any](raw string) (*T, error) { + clean := strings.TrimSpace(raw) + if clean == "" { + return nil, errors.New("模型返回为空,无法解析 JSON") + } + + objectText := ExtractJSONObject(clean) + if objectText == "" { + return nil, fmt.Errorf("模型返回中未找到 JSON 对象: %s", truncateForError(clean)) + } + + var out T + if err := json.Unmarshal([]byte(objectText), &out); err != nil { + return nil, fmt.Errorf("JSON 解析失败: %w", err) + } + return &out, nil +} + +// ExtractJSONObject 从混合文本里提取第一个完整 JSON 对象。 +// +// 设计说明: +// 1. LLM 很容易输出“这里是结果:{...}”这种半结构化文本; +// 2. 这里用括号计数而不是正则,避免嵌套对象一多就误截断; +// 3. 目前只提取 object,不提取 array,因为当前 agent 的路由/规划契约基本都是对象。 +func ExtractJSONObject(text string) string { + clean := trimMarkdownCodeFence(strings.TrimSpace(text)) + if clean == "" { + return "" + } + + start := strings.Index(clean, "{") + if start < 0 { + return "" + } + + depth := 0 + inString := false + escaped := false + for idx := start; idx < len(clean); idx++ { + ch := clean[idx] + + if escaped { + escaped = false + continue + } + if ch == '\\' && inString { + escaped = true + continue + } + if ch == '"' { + inString = !inString + continue + } + if inString { + continue + } + + switch ch { + case '{': + depth++ + case '}': + depth-- + if depth == 0 { + return clean[start : idx+1] + } + } + } + return "" +} + +func trimMarkdownCodeFence(text string) string { + trimmed := strings.TrimSpace(text) + if !strings.HasPrefix(trimmed, "```") { + return trimmed + } + + lines := strings.Split(trimmed, "\n") + if len(lines) == 0 { + return trimmed + } + + // 1. 去掉首行 ```json / ```; + // 2. 若末行是 ```,一并去掉; + // 3. 中间正文保持原样,避免破坏 JSON 的换行结构。 + body := lines[1:] + if len(body) > 0 && strings.TrimSpace(body[len(body)-1]) == "```" { + body = body[:len(body)-1] + } + return strings.TrimSpace(strings.Join(body, "\n")) +} + +func truncateForError(text string) string { + if len(text) <= 160 { + return text + } + return text[:160] + "..." +} diff --git a/backend/newAgent/model/common_state.go b/backend/newAgent/model/common_state.go new file mode 100644 index 0000000..1433adc --- /dev/null +++ b/backend/newAgent/model/common_state.go @@ -0,0 +1,147 @@ +package model + +// Phase 表示 agent 循环图当前所处的阶段。 +type Phase string + +const ( + PhasePlanning Phase = "planning" + PhaseWaitingConfirm Phase = "waiting_confirm" + PhaseExecuting Phase = "executing" + PhaseDone Phase = "done" +) + +const DefaultMaxRounds = 30 + +type CommonState struct { + // 身份 + TraceID string + UserID int + ConversationID string + + // 流程阶段 + Phase Phase + + // Plan + PlanSteps []string + CurrentStep int + + // 安全边界 + MaxRounds int + RoundUsed int +} + +func NewCommonState(traceID string, userID int, conversationID string) *CommonState { + return &CommonState{ + TraceID: traceID, + UserID: userID, + ConversationID: conversationID, + Phase: PhasePlanning, + MaxRounds: DefaultMaxRounds, + } +} + +// NextRound 消耗一轮并返回是否还有余量。 +func (s *CommonState) NextRound() bool { + s.RoundUsed++ + return s.RoundUsed <= s.MaxRounds +} + +// Exhausted 判断是否已耗尽轮次。 +func (s *CommonState) Exhausted() bool { + return s.RoundUsed >= s.MaxRounds +} + +// FinishPlan 标记 plan 完成,进入等待确认阶段。 +func (s *CommonState) FinishPlan(steps []string) { + s.PlanSteps = steps + s.CurrentStep = 0 + s.Phase = PhaseWaitingConfirm +} + +// ConfirmPlan 用户确认后进入执行阶段。 +func (s *CommonState) ConfirmPlan() { + s.Phase = PhaseExecuting +} + +// RejectPlan 用户拒绝,回到规划阶段。 +func (s *CommonState) RejectPlan() { + s.PlanSteps = nil + s.CurrentStep = 0 + s.Phase = PhasePlanning +} + +// AdvanceStep 推进到下一个 plan 步骤,返回是否还有剩余步骤。 +func (s *CommonState) AdvanceStep() bool { + s.CurrentStep++ + return s.CurrentStep < len(s.PlanSteps) +} + +// Done 标记整个流程结束。 +func (s *CommonState) Done() { + s.Phase = PhaseDone +} + +// HasPlan 判断当前 state 是否已经持有一份可执行的 plan。 +// +// 职责边界: +// 1. 负责把“外部直接判断 len(PlanSteps) > 0”的零散逻辑收口到 state 内部; +// 2. 只回答“是否存在 plan”,不判断当前索引是否有效; +// 3. 当 state 为空时返回 false,调用方可据此决定是否回退到重新规划。 +func (s *CommonState) HasPlan() bool { + if s == nil { + return false + } + return len(s.PlanSteps) > 0 +} + +// CurrentPlanStep 返回当前正在执行的 plan 步骤文本。 +// +// 职责边界: +// 1. 负责根据 CurrentStep 安全读取 PlanSteps,避免调用方重复写切片越界判断; +// 2. 当 state 为空、plan 为空、或当前索引越界时,统一返回 ("", false); +// 3. 不负责推进步骤,也不负责修正 CurrentStep 的取值。 +func (s *CommonState) CurrentPlanStep() (string, bool) { + if s == nil { + return "", false + } + if s.CurrentStep < 0 || s.CurrentStep >= len(s.PlanSteps) { + return "", false + } + return s.PlanSteps[s.CurrentStep], true +} + +// HasCurrentPlanStep 判断“当前步骤”是否存在且可安全读取。 +// +// 职责边界: +// 1. 负责给 graph / node 层提供一个更直白的布尔判断入口; +// 2. 内部复用 CurrentPlanStep,避免两处维护相同的索引边界逻辑; +// 3. 不返回步骤内容,只回答“当前是否还有可注入的步骤”。 +func (s *CommonState) HasCurrentPlanStep() bool { + _, ok := s.CurrentPlanStep() + return ok +} + +// PlanProgress 返回当前 plan 的执行进度。 +// +// 输出语义: +// 1. current 使用对人类更友好的 1-based 序号,适合直接写入 prompt 或日志; +// 2. total 表示当前 plan 总步数; +// 3. 若尚未生成 plan,则返回 (0, 0); +// 4. 若 CurrentStep 已越过末尾,则 current 会被收敛到 total,避免上层出现 total+1 这类噪音值。 +func (s *CommonState) PlanProgress() (current int, total int) { + if s == nil { + return 0, 0 + } + + total = len(s.PlanSteps) + if total == 0 { + return 0, 0 + } + if s.CurrentStep < 0 { + return 0, total + } + if s.CurrentStep >= total { + return total, total + } + return s.CurrentStep + 1, total +} diff --git a/backend/newAgent/model/conversation_context.go b/backend/newAgent/model/conversation_context.go new file mode 100644 index 0000000..b70ac62 --- /dev/null +++ b/backend/newAgent/model/conversation_context.go @@ -0,0 +1,215 @@ +package model + +import ( + "strings" + + "github.com/cloudwego/eino/schema" +) + +// ConversationContext 承载“本轮要喂给模型的输入材料”。 +// +// 职责边界: +// 1. 负责保存 system prompt、对话历史、置顶注入块、工具 schema 摘要; +// 2. 负责提供最小必要的安全访问方法,避免 node / prompt 层直接散落切片操作; +// 3. 不负责流程推进,phase / round / current step 仍归 CommonState 管; +// 4. 不负责真正的 prompt 组装,消息如何拼接仍应放在 prompt 层处理。 +// +// TODO(newagent/prompt): 后续由 plan / execute 的 prompt builder 读取这里的数据,组装真正发给 LLM 的 messages。 +// TODO(newagent/node): 后续 planNode / executeNode 只通过这里的访问方法读写上下文,避免多处直接改切片。 +type ConversationContext struct { + SystemPrompt string + History []*schema.Message + PinnedBlocks []ContextBlock + ToolSchemas []ToolSchemaContext +} + +// ContextBlock 表示一段可被“置顶注入”的自然语言上下文。 +// +// 设计目的: +// 1. Key 用于让调用方按语义覆盖,例如 current_plan / current_step / execution_rule; +// 2. Title 用于 prompt 层后续决定是否渲染成小标题; +// 3. Content 存真正的自然语言内容,保持你当前“plan 用自然语言表达”的思路。 +type ContextBlock struct { + Key string + Title string + Content string +} + +// ToolSchemaContext 是工具描述的轻量快照。 +// +// 职责边界: +// 1. 这里只保留 prompt 注入真正需要的摘要信息; +// 2. SchemaText 约定存“已经整理好的自然语言 / JSON schema 摘要”; +// 3. 不直接耦合具体 tool registry 里的复杂结构,避免 model 层反向依赖工具实现。 +type ToolSchemaContext struct { + Name string + Desc string + SchemaText string +} + +// NewConversationContext 创建最小上下文容器。 +func NewConversationContext(systemPrompt string) *ConversationContext { + return &ConversationContext{ + SystemPrompt: strings.TrimSpace(systemPrompt), + } +} + +// SetSystemPrompt 更新系统提示词。 +func (c *ConversationContext) SetSystemPrompt(systemPrompt string) { + if c == nil { + return + } + c.SystemPrompt = strings.TrimSpace(systemPrompt) +} + +// ReplaceHistory 整体替换对话历史。 +// +// 职责边界: +// 1. 负责把“会话快照恢复”这类场景需要的一次性覆盖入口收口到这里; +// 2. 只复制消息切片本身,避免调用方后续 append 污染同一底层数组; +// 3. 不深拷贝每个 message 指针,消息对象本身仍默认由上游按只读方式使用。 +func (c *ConversationContext) ReplaceHistory(history []*schema.Message) { + if c == nil { + return + } + c.History = cloneMessageSlice(history) +} + +// AppendHistory 追加对话历史。 +// +// 处理策略: +// 1. 跳过 nil message,避免后续 prompt 拼装时出现空指针; +// 2. 仅负责顺序追加,不做去重,不做裁剪; +// 3. 历史裁剪策略属于后续 prompt / memory 层能力,此处先不下沉。 +func (c *ConversationContext) AppendHistory(messages ...*schema.Message) { + if c == nil || len(messages) == 0 { + return + } + for _, msg := range messages { + if msg == nil { + continue + } + c.History = append(c.History, msg) + } +} + +// HistorySnapshot 返回历史消息的浅拷贝切片。 +func (c *ConversationContext) HistorySnapshot() []*schema.Message { + if c == nil { + return nil + } + return cloneMessageSlice(c.History) +} + +// UpsertPinnedBlock 按 Key 写入或覆盖一段置顶上下文。 +// +// 步骤说明: +// 1. Key 为空时直接忽略,因为后续无法做稳定覆盖; +// 2. 若已存在同 Key block,则原位覆盖,保证“当前 plan / 当前步骤”这类上下文始终只有一份; +// 3. 若不存在,则追加到末尾,至于渲染顺序由 prompt 层统一决定; +// 4. 此处不自动裁剪旧内容,避免 model 层擅自丢信息。 +func (c *ConversationContext) UpsertPinnedBlock(block ContextBlock) { + if c == nil { + return + } + + key := strings.TrimSpace(block.Key) + if key == "" { + return + } + block.Key = key + block.Title = strings.TrimSpace(block.Title) + block.Content = strings.TrimSpace(block.Content) + + for i := range c.PinnedBlocks { + if c.PinnedBlocks[i].Key == key { + c.PinnedBlocks[i] = block + return + } + } + c.PinnedBlocks = append(c.PinnedBlocks, block) +} + +// RemovePinnedBlock 删除指定 Key 的置顶上下文。 +func (c *ConversationContext) RemovePinnedBlock(key string) bool { + if c == nil { + return false + } + + key = strings.TrimSpace(key) + if key == "" { + return false + } + + for i := range c.PinnedBlocks { + if c.PinnedBlocks[i].Key != key { + continue + } + c.PinnedBlocks = append(c.PinnedBlocks[:i], c.PinnedBlocks[i+1:]...) + return true + } + return false +} + +// PinnedBlockByKey 按 Key 读取指定的置顶上下文。 +func (c *ConversationContext) PinnedBlockByKey(key string) (ContextBlock, bool) { + if c == nil { + return ContextBlock{}, false + } + + key = strings.TrimSpace(key) + if key == "" { + return ContextBlock{}, false + } + + for i := range c.PinnedBlocks { + if c.PinnedBlocks[i].Key == key { + return c.PinnedBlocks[i], true + } + } + return ContextBlock{}, false +} + +// PinnedBlocksSnapshot 返回置顶上下文块的浅拷贝切片。 +func (c *ConversationContext) PinnedBlocksSnapshot() []ContextBlock { + if c == nil { + return nil + } + result := make([]ContextBlock, len(c.PinnedBlocks)) + copy(result, c.PinnedBlocks) + return result +} + +// SetToolSchemas 整体替换工具 schema 摘要。 +func (c *ConversationContext) SetToolSchemas(schemas []ToolSchemaContext) { + if c == nil { + return + } + c.ToolSchemas = cloneToolSchemaSlice(schemas) +} + +// ToolSchemasSnapshot 返回工具 schema 摘要的浅拷贝切片。 +func (c *ConversationContext) ToolSchemasSnapshot() []ToolSchemaContext { + if c == nil { + return nil + } + return cloneToolSchemaSlice(c.ToolSchemas) +} + +func cloneMessageSlice(messages []*schema.Message) []*schema.Message { + if len(messages) == 0 { + return nil + } + result := make([]*schema.Message, len(messages)) + copy(result, messages) + return result +} + +func cloneToolSchemaSlice(schemas []ToolSchemaContext) []ToolSchemaContext { + if len(schemas) == 0 { + return nil + } + result := make([]ToolSchemaContext, len(schemas)) + copy(result, schemas) + return result +} diff --git a/backend/newAgent/model/execute_contract.go b/backend/newAgent/model/execute_contract.go new file mode 100644 index 0000000..283f51a --- /dev/null +++ b/backend/newAgent/model/execute_contract.go @@ -0,0 +1,205 @@ +package model + +import ( + "fmt" + "strings" +) + +// ExecuteAction 表示 execute 阶段单轮决策的动作类型。 +// +// 设计原则: +// 1. LLM 只负责“申报本轮想做什么”,不直接推进状态; +// 2. 后端只围绕这些有限动作做流程校验、证据校验、安全校验; +// 3. 动作枚举保持收敛,避免 execute 节点后续再次长成“自由文本协议”。 +type ExecuteAction string + +const ( + // ExecuteActionContinue 表示当前步骤尚未完成,需要继续本步骤的 ReAct 循环。 + ExecuteActionContinue ExecuteAction = "continue" + + // ExecuteActionAskUser 表示当前步骤缺少外部信息,需要中断并追问用户。 + ExecuteActionAskUser ExecuteAction = "ask_user" + + // ExecuteActionConfirm 表示当前步骤准备执行写操作,但必须先进入确认闸门。 + ExecuteActionConfirm ExecuteAction = "confirm" + + // ExecuteActionNextPlan 表示当前步骤已完成,可以推进到下一个 plan 步骤。 + ExecuteActionNextPlan ExecuteAction = "next_plan" + + // ExecuteActionDone 表示整个任务已完成,可以进入最终交付。 + ExecuteActionDone ExecuteAction = "done" +) + +// ExecuteDecision 是 execute prompt 单轮产出的统一决策结构。 +// +// 职责边界: +// 1. Speak 是这轮先对用户说的话,适合在真正调工具前流式吐给前端; +// 2. Action 是模型申报的“下一步动作类型”; +// 3. Reason 是给后端和日志看的简短解释,不直接等价于完成证明; +// 4. ToolCall 只是“意图”,不代表工具已经真正执行成功。 +type ExecuteDecision struct { + Speak string `json:"speak,omitempty"` + Action ExecuteAction `json:"action"` + Reason string `json:"reason,omitempty"` + ToolCall *ToolCallIntent `json:"tool_call,omitempty"` +} + +// Normalize 统一清洗 execute 决策中的字符串字段。 +func (d *ExecuteDecision) Normalize() { + if d == nil { + return + } + d.Speak = strings.TrimSpace(d.Speak) + d.Action = ExecuteAction(strings.TrimSpace(string(d.Action))) + d.Reason = strings.TrimSpace(d.Reason) + if d.ToolCall != nil { + d.ToolCall.Normalize() + } +} + +// Validate 校验 execute 决策的最小合法性。 +// +// 校验原则: +// 1. 这里只校验“协议是否自洽”,不校验工具是否真实存在,也不校验当前步骤是否真的完成; +// 2. 只允许少量动作与 tool_call 共存,避免后续 node 层收到含糊决策; +// 3. 真正的三类最小校验应放在执行层,这里只做第一道轻量门禁。 +func (d *ExecuteDecision) Validate() error { + if d == nil { + return fmt.Errorf("execute decision 不能为空") + } + + d.Normalize() + if d.Action == "" { + return fmt.Errorf("execute decision.action 不能为空") + } + + switch d.Action { + case ExecuteActionContinue: + if d.ToolCall != nil { + return d.ToolCall.Validate() + } + return nil + case ExecuteActionAskUser: + if d.ToolCall != nil { + return fmt.Errorf("ask_user 动作不应携带 tool_call") + } + return nil + case ExecuteActionConfirm: + if d.ToolCall == nil { + return fmt.Errorf("confirm 动作必须携带待确认的 tool_call") + } + return d.ToolCall.Validate() + case ExecuteActionNextPlan, ExecuteActionDone: + if d.ToolCall != nil { + return fmt.Errorf("%s 动作不应携带 tool_call", d.Action) + } + return nil + default: + return fmt.Errorf("未知 execute action: %s", d.Action) + } +} + +// ToolCallIntent 表示 execute 阶段申报的工具调用意图。 +// +// 设计目的: +// 1. 这里只描述“模型想调用什么工具、传什么参数”,不代表调用已经发生; +// 2. Arguments 暂时保留 map 结构,方便 prompt 输出原生 JSON 对象; +// 3. 是否需要 confirm 不应由模型决定,后续应由工具注册表或后端策略判定。 +type ToolCallIntent struct { + Name string `json:"name"` + Arguments map[string]any `json:"arguments,omitempty"` +} + +// Normalize 清洗工具调用意图中的稳定字段。 +func (t *ToolCallIntent) Normalize() { + if t == nil { + return + } + t.Name = strings.TrimSpace(t.Name) +} + +// Validate 校验工具调用意图的最小合法性。 +func (t *ToolCallIntent) Validate() error { + if t == nil { + return fmt.Errorf("tool_call 不能为空") + } + t.Normalize() + if t.Name == "" { + return fmt.Errorf("tool_call.name 不能为空") + } + return nil +} + +// ExecuteEvidenceSource 表示“当前步骤完成证明”来自哪里。 +type ExecuteEvidenceSource string + +const ( + // ExecuteEvidenceSourceToolObservation 表示来自读工具或分析工具的真实 observation。 + ExecuteEvidenceSourceToolObservation ExecuteEvidenceSource = "tool_observation" + + // ExecuteEvidenceSourceWriteReceipt 表示来自写工具成功执行后的回执。 + ExecuteEvidenceSourceWriteReceipt ExecuteEvidenceSource = "write_receipt" + + // ExecuteEvidenceSourceUserReply 表示来自用户补充回答的外部事实。 + ExecuteEvidenceSourceUserReply ExecuteEvidenceSource = "user_reply" +) + +// ExecuteEvidenceReceipt 表示“一条可被后端认可的最小事实证据”。 +// +// 职责边界: +// 1. StepIndex 用来绑定这条证据属于哪个 plan 步骤,避免旧 observation 污染新步骤; +// 2. Source / Name / Success 描述“这条证据是怎么来的、是否真的发生了”; +// 3. Summary 只用于日志、调试和交付串联,不替代原始 observation 本身; +// 4. 这里不做语义推理,只负责记录事实。 +type ExecuteEvidenceReceipt struct { + StepIndex int `json:"step_index"` + Source ExecuteEvidenceSource `json:"source"` + Name string `json:"name,omitempty"` + ArgumentsDigest string `json:"arguments_digest,omitempty"` + Success bool `json:"success"` + Summary string `json:"summary,omitempty"` +} + +// Normalize 清洗证据回执中的稳定字段。 +func (r *ExecuteEvidenceReceipt) Normalize() { + if r == nil { + return + } + r.Source = ExecuteEvidenceSource(strings.TrimSpace(string(r.Source))) + r.Name = strings.TrimSpace(r.Name) + r.ArgumentsDigest = strings.TrimSpace(r.ArgumentsDigest) + r.Summary = strings.TrimSpace(r.Summary) +} + +// Validate 校验证据回执是否具备最小可用信息。 +func (r *ExecuteEvidenceReceipt) Validate() error { + if r == nil { + return fmt.Errorf("evidence receipt 不能为空") + } + + r.Normalize() + if r.StepIndex < 0 { + return fmt.Errorf("evidence receipt.step_index 不能小于 0") + } + switch r.Source { + case ExecuteEvidenceSourceToolObservation, ExecuteEvidenceSourceWriteReceipt, ExecuteEvidenceSourceUserReply: + default: + return fmt.Errorf("未知 evidence source: %s", r.Source) + } + return nil +} + +// ExecuteValidationResult 保存 execute 单轮的三类最小校验结果。 +// +// 三类校验语义: +// 1. FlowPassed:当前动作在流程上是否合法,例如 done 是否允许直接发生; +// 2. EvidencePassed:当前动作是否有最小事实证据支撑; +// 3. SafetyPassed:当前动作是否触发了安全兜底,例如超轮次、重复空转、待确认未完成。 +type ExecuteValidationResult struct { + FlowPassed bool `json:"flow_passed"` + FlowReason string `json:"flow_reason,omitempty"` + EvidencePassed bool `json:"evidence_passed"` + EvidenceReason string `json:"evidence_reason,omitempty"` + SafetyPassed bool `json:"safety_passed"` + SafetyReason string `json:"safety_reason,omitempty"` +} diff --git a/backend/newAgent/model/pending_interaction.go b/backend/newAgent/model/pending_interaction.go new file mode 100644 index 0000000..b3e10b6 --- /dev/null +++ b/backend/newAgent/model/pending_interaction.go @@ -0,0 +1,220 @@ +package model + +import "strings" + +const ( + // PhaseChatting 表示当前请求只需正常聊天,不进入 plan / execute 主链路。 + PhaseChatting Phase = "chatting" + + // PhaseInterrupted 表示本轮执行被“待用户交互”显式打断,当前连接应结束并等待恢复。 + PhaseInterrupted Phase = "interrupted" +) + +const PendingInteractionSnapshotVersion = 1 + +// PendingInteractionType 表示当前挂起交互的类型。 +type PendingInteractionType string + +const ( + PendingInteractionTypeAskUser PendingInteractionType = "ask_user" + PendingInteractionTypeConfirm PendingInteractionType = "confirm" + PendingInteractionTypeConnectionLost PendingInteractionType = "connection_lost" +) + +// PendingInteractionStatus 表示挂起交互的生命周期状态。 +type PendingInteractionStatus string + +const ( + PendingInteractionStatusOpen PendingInteractionStatus = "open" + PendingInteractionStatusResolved PendingInteractionStatus = "resolved" + PendingInteractionStatusCanceled PendingInteractionStatus = "canceled" +) + +// PendingToolCallSnapshot 保存“待确认工具调用”的最小快照。 +// +// 职责边界: +// 1. 负责保存真正落库 / 落缓存恢复执行所需的最小信息; +// 2. ArgsJSON 约定存已经序列化好的参数快照,避免此处反向依赖具体 tool 参数结构; +// 3. 不负责工具执行,不负责幂等校验,不负责回滚。 +type PendingToolCallSnapshot struct { + ToolName string + ArgsJSON string + Summary string +} + +// PendingInteraction 保存“本轮需要中断并等待用户后续动作”的交互快照。 +// +// 设计目的: +// 1. ask_user 与 confirm 都不是业务 tool,而是流程级中断,所以单独建模; +// 2. ResumeNode / ResumePhase / ResumeStep 用来记录恢复点,避免用户回答后整条链路从头乱跑; +// 3. 该结构设计成可被 Redis + MySQL 直接存储的快照骨架,后续只需要补序列化与持久化接线。 +// +// TODO(newagent/store): 后续把该结构整体快照到 Redis + MySQL,形成双保险恢复点。 +// TODO(newagent/api): 后续由“用户追问回复接口 / 确认回调接口”读取这份快照并恢复运行。 +type PendingInteraction struct { + Version int + InteractionID string + Type PendingInteractionType + Status PendingInteractionStatus + DisplayText string + ResumeNode string + ResumePhase Phase + ResumeStep int + PendingTool *PendingToolCallSnapshot + Metadata map[string]any +} + +// AgentRuntimeState 是 graph 运行时真正流转的状态容器。 +// +// 职责边界: +// 1. CommonState 继续只负责主流程控制; +// 2. PendingInteraction 负责承载“需要中断后恢复”的交互快照; +// 3. 这样既不污染 CommonState 的职责,又能让 graph 在一次入参里拿到完整运行态。 +type AgentRuntimeState struct { + *CommonState + PendingInteraction *PendingInteraction +} + +// NewAgentRuntimeState 创建 graph 运行态。 +func NewAgentRuntimeState(state *CommonState) *AgentRuntimeState { + rt := &AgentRuntimeState{CommonState: state} + rt.EnsureCommonState() + return rt +} + +// EnsureCommonState 保证运行态里始终有一份可用的流程状态。 +// +// 步骤说明: +// 1. 若 CommonState 为空,则补一份最小默认值,避免 graph / node 层空指针; +// 2. 若 Phase 尚未设置,则默认回到 planning,保持当前主链路的保守起点; +// 3. 若 MaxRounds 未设置,则回填默认值,避免编译后运行时无上限循环。 +func (s *AgentRuntimeState) EnsureCommonState() *CommonState { + if s == nil { + return nil + } + if s.CommonState == nil { + s.CommonState = &CommonState{} + } + if s.CommonState.Phase == "" { + s.CommonState.Phase = PhasePlanning + } + if s.CommonState.MaxRounds <= 0 { + s.CommonState.MaxRounds = DefaultMaxRounds + } + return s.CommonState +} + +// HasPendingInteraction 判断当前是否存在待恢复交互。 +func (s *AgentRuntimeState) HasPendingInteraction() bool { + if s == nil || s.PendingInteraction == nil { + return false + } + return s.PendingInteraction.Status == PendingInteractionStatusOpen +} + +// PendingInteractionType 返回当前挂起交互类型。 +func (s *AgentRuntimeState) PendingInteractionType() PendingInteractionType { + if !s.HasPendingInteraction() { + return "" + } + return s.PendingInteraction.Type +} + +// OpenAskUserInteraction 打开一个“向用户追问”的中断快照。 +func (s *AgentRuntimeState) OpenAskUserInteraction(interactionID, question, resumeNode string) { + s.openPendingInteraction( + PendingInteractionTypeAskUser, + interactionID, + question, + resumeNode, + nil, + ) +} + +// OpenConfirmInteraction 打开一个“写操作待确认”的中断快照。 +func (s *AgentRuntimeState) OpenConfirmInteraction(interactionID, confirmText, resumeNode string, pendingTool *PendingToolCallSnapshot) { + s.openPendingInteraction( + PendingInteractionTypeConfirm, + interactionID, + confirmText, + resumeNode, + pendingTool, + ) +} + +// ResumeFromPending 从挂起交互恢复主流程。 +// +// 步骤说明: +// 1. 仅当存在 open 状态的 pending interaction 时才执行恢复; +// 2. 恢复时回写之前快照下来的 phase / step,确保继续跑的是原任务位置而不是新分支; +// 3. 恢复成功后清空挂起快照,避免同一份 pending 被重复消费。 +func (s *AgentRuntimeState) ResumeFromPending() bool { + if !s.HasPendingInteraction() { + return false + } + + flowState := s.EnsureCommonState() + pending := s.PendingInteraction + flowState.Phase = pending.ResumePhase + flowState.CurrentStep = pending.ResumeStep + pending.Status = PendingInteractionStatusResolved + s.PendingInteraction = nil + return true +} + +// ClearPendingInteraction 直接清空挂起交互。 +// +// 职责边界: +// 1. 仅负责粗暴清空快照; +// 2. 不自动恢复 phase / step,避免误把“取消交互”与“恢复执行”混为一谈; +// 3. 若需要恢复流程,应优先使用 ResumeFromPending。 +func (s *AgentRuntimeState) ClearPendingInteraction() { + if s == nil || s.PendingInteraction == nil { + return + } + s.PendingInteraction.Status = PendingInteractionStatusCanceled + s.PendingInteraction = nil +} + +func (s *AgentRuntimeState) openPendingInteraction( + interactionType PendingInteractionType, + interactionID string, + displayText string, + resumeNode string, + pendingTool *PendingToolCallSnapshot, +) { + if s == nil { + return + } + + flowState := s.EnsureCommonState() + resumePhase := flowState.Phase + if resumePhase == "" { + resumePhase = PhasePlanning + } + + s.PendingInteraction = &PendingInteraction{ + Version: PendingInteractionSnapshotVersion, + InteractionID: strings.TrimSpace(interactionID), + Type: interactionType, + Status: PendingInteractionStatusOpen, + DisplayText: strings.TrimSpace(displayText), + ResumeNode: strings.TrimSpace(resumeNode), + ResumePhase: resumePhase, + ResumeStep: flowState.CurrentStep, + PendingTool: clonePendingToolCallSnapshot(pendingTool), + } + + // 1. 一旦进入 pending 状态,当前连接上的 graph 应立即停止向后执行。 + // 2. 这里先统一把 Phase 置为 interrupted,后续恢复时再按快照写回原阶段。 + // 3. 这样分支函数只需要判断 HasPendingInteraction(),无需猜测“当前 phase 是否仍可信”。 + flowState.Phase = PhaseInterrupted +} + +func clonePendingToolCallSnapshot(snapshot *PendingToolCallSnapshot) *PendingToolCallSnapshot { + if snapshot == nil { + return nil + } + copied := *snapshot + return &copied +} diff --git a/backend/newAgent/prompt/execute.go b/backend/newAgent/prompt/execute.go new file mode 100644 index 0000000..9861f02 --- /dev/null +++ b/backend/newAgent/prompt/execute.go @@ -0,0 +1,104 @@ +package newagentprompt + +import ( + "strings" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + "github.com/cloudwego/eino/schema" +) + +const ( + // ExecuteNextPlanSignal 表示“当前 plan 步骤已经完成,可以进入下一个步骤”。 + // + // TODO(newagent/node): 后续 executeNode 识别到该信号后,调用 state.AdvanceStep() 或决定进入交付阶段。 + ExecuteNextPlanSignal = "[NEXT_PLAN]" + + // ExecuteDoneSignal 表示“整个任务已经完成,可以结束执行链路”。 + // + // TODO(newagent/node): 后续 executeNode 识别到该信号后,调用 state.Done() 并进入 deliver。 + ExecuteDoneSignal = "[DONE]" + + // ExecuteAskUserSignal 表示“执行阶段缺关键信息,需要向用户追问”。 + // + // TODO(newagent/node): 后续若你决定支持 ask_user,这里可作为统一控制信号继续扩展。 + ExecuteAskUserSignal = "[ASK_USER]" +) + +const executeSystemPrompt = ` +你是 SmartFlow NewAgent 的执行器。 + +你的职责是在“当前 plan 步骤”的约束下,进行思考、执行、观察,再决定下一步动作。 + +请遵守以下规则: +1. 只围绕当前步骤行动,不要擅自跳到其他 plan 步骤。 +2. 只有当你确认当前步骤已经完成时,才输出 ` + "`" + `[NEXT_PLAN]` + "`" + `。 +3. 只有当你确认整个任务已经完成时,才输出 ` + "`" + `[DONE]` + "`" + `。 +4. 如果执行当前步骤缺少关键上下文,且无法通过已有历史或工具补齐,可以输出 ` + "`" + `[ASK_USER]` + "`" + `。 +5. 不要伪造工具结果;如果尚未真正拿到观察结果,就不要假装已经完成。 + +你会看到: +- 当前完整 plan +- 当前步骤 +- 置顶上下文块 +- 工具摘要 +- 历史对话与历史观察 + +请把注意力聚焦在“当前步骤是否完成,以及下一步最合理的执行动作”上。 +` + +// BuildExecuteSystemPrompt 返回执行阶段系统提示词。 +func BuildExecuteSystemPrompt() string { + return strings.TrimSpace(executeSystemPrompt) +} + +// BuildExecuteMessages 组装执行阶段的 messages。 +// +// 职责边界: +// 1. 负责收敛执行阶段需要的 system / pinned / history / runtime prompt; +// 2. 负责把“完整 plan + 当前步骤 + 控制信号”显式告知模型; +// 3. 不负责解析模型输出,也不负责真正调用工具。 +// +// TODO(newagent/node): 后续 executeNode 应直接复用这个方法,而不是在节点内手拼执行提示词。 +func BuildExecuteMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext) []*schema.Message { + return buildStageMessages( + BuildExecuteSystemPrompt(), + ctx, + BuildExecuteUserPrompt(state), + ) +} + +// BuildExecuteUserPrompt 构造执行阶段的用户提示词。 +func BuildExecuteUserPrompt(state *newagentmodel.CommonState) string { + var sb strings.Builder + + sb.WriteString("请继续当前任务的执行阶段。\n") + sb.WriteString(renderStateSummary(state)) + sb.WriteString("\n") + + if state == nil || !state.HasPlan() { + sb.WriteString("当前没有可执行的完整 plan,请不要盲目进入执行;如有需要请回退到规划阶段。\n") + return strings.TrimSpace(sb.String()) + } + + if currentStep, ok := state.CurrentPlanStep(); ok { + sb.WriteString("执行要求:\n") + sb.WriteString("1. 始终围绕下面这个当前步骤行动。\n") + sb.WriteString("2. 若当前步骤未完成,请继续思考-执行-观察循环。\n") + sb.WriteString("3. 若当前步骤已完成,请输出 ") + sb.WriteString(ExecuteNextPlanSignal) + sb.WriteString("。\n") + sb.WriteString("4. 若整个任务已完成,请输出 ") + sb.WriteString(ExecuteDoneSignal) + sb.WriteString("。\n") + sb.WriteString("5. 若缺少关键用户信息且现有上下文无法补足,请输出 ") + sb.WriteString(ExecuteAskUserSignal) + sb.WriteString("。\n") + sb.WriteString("\n当前步骤正文:\n") + sb.WriteString(currentStep) + sb.WriteString("\n") + } else { + sb.WriteString("当前 plan 已存在,但当前步骤索引无效;请不要擅自执行其他步骤。\n") + } + + return strings.TrimSpace(sb.String()) +} diff --git a/backend/newAgent/prompt/plan.go b/backend/newAgent/prompt/plan.go new file mode 100644 index 0000000..0de1936 --- /dev/null +++ b/backend/newAgent/prompt/plan.go @@ -0,0 +1,244 @@ +package newagentprompt + +import ( + "fmt" + "strings" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + "github.com/cloudwego/eino/schema" +) + +const ( + // PlanDoneSignal 表示“规划阶段结束,可以进入 confirm 或下一阶段”。 + // + // TODO(newagent/node): 后续由 planNode 读取模型输出时识别这个信号,并据此调用 state.FinishPlan(...)。 + PlanDoneSignal = "[PLAN_DONE]" +) + +const planSystemPrompt = ` +你是 SmartFlow NewAgent 的规划器。 + +你的职责不是直接执行任务,而是先把用户意图拆成一组清晰、稳定、可逐步执行的自然语言计划。 + +请遵守以下规则: +1. 只负责规划,不要假装已经调用了工具,也不要伪造执行结果。 +2. 每一轮只推进一步规划;如果信息不足,可以明确指出缺口。 +3. 若当前计划仍不完整,就继续围绕当前任务补全计划,不要跳去执行细节。 +4. 若你认为计划已经完整可执行,请在输出中显式带上 ` + "`" + `[` + `PLAN_DONE` + `]` + "`" + ` 信号。 +5. 计划必须使用自然语言,便于后端将完整 plan 重新注入到后续上下文顶部。 + +你会看到: +- 当前阶段与轮次信息 +- 已有完整 plan(如果之前已经规划过) +- 当前步骤(如果已存在) +- 置顶上下文块 +- 可用工具摘要 +- 历史对话 + +请基于这些输入继续规划,而不是重复忽略既有 plan。 +` + +// BuildPlanSystemPrompt 返回规划阶段系统提示词。 +func BuildPlanSystemPrompt() string { + return strings.TrimSpace(planSystemPrompt) +} + +// BuildPlanMessages 组装规划阶段的 messages。 +// +// 职责边界: +// 1. 负责把 state + context 收敛成规划阶段模型输入; +// 2. 负责把“置顶上下文”和“工具摘要”放到 history 前面,降低模型跑偏概率; +// 3. 不负责解析模型输出,不负责判断是否真的完成规划。 +// +// TODO(newagent/node): 后续 planNode 直接复用这个入口,不要在节点里散落拼 message 的逻辑。 +func BuildPlanMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext, userInput string) []*schema.Message { + return buildStageMessages( + BuildPlanSystemPrompt(), + ctx, + BuildPlanUserPrompt(state, userInput), + ) +} + +// BuildPlanUserPrompt 构造规划阶段的用户提示词。 +// +// 设计目标: +// 1. 把当前阶段、轮次、既有 plan、当前步骤等控制信息显式写给模型; +// 2. 保持自然语言风格,方便你后续继续改成自己想要的控制协议; +// 3. 用户原始输入单独放在末尾,避免被系统拼装信息淹没。 +func BuildPlanUserPrompt(state *newagentmodel.CommonState, userInput string) string { + var sb strings.Builder + + sb.WriteString("请继续当前任务的规划阶段。\n") + sb.WriteString(renderStateSummary(state)) + sb.WriteString("\n") + sb.WriteString("本轮目标:围绕当前任务继续规划,直到形成一份稳定、可执行的自然语言 plan。\n") + sb.WriteString("如果计划已经完整,请显式输出 ") + sb.WriteString(PlanDoneSignal) + sb.WriteString("。\n") + + trimmedInput := strings.TrimSpace(userInput) + if trimmedInput != "" { + sb.WriteString("\n用户本轮输入:\n") + sb.WriteString(trimmedInput) + sb.WriteString("\n") + } + + return strings.TrimSpace(sb.String()) +} + +// buildStageMessages 组装某个阶段通用的 messages。 +// +// 步骤说明: +// 1. 先合并 context 自带 system prompt 与阶段 prompt,保证通用约束和阶段约束都能生效; +// 2. 再把置顶上下文块和工具摘要补成 system message,尽量顶在 history 前面; +// 3. 最后追加历史消息与本轮 user prompt,保持“新约束在前、历史在后”的稳定顺序。 +func buildStageMessages(stageSystemPrompt string, ctx *newagentmodel.ConversationContext, runtimeUserPrompt string) []*schema.Message { + messages := make([]*schema.Message, 0, 4) + + mergedSystemPrompt := mergeSystemPrompts(ctx, stageSystemPrompt) + if mergedSystemPrompt != "" { + messages = append(messages, schema.SystemMessage(mergedSystemPrompt)) + } + + if pinnedText := renderPinnedBlocks(ctx); pinnedText != "" { + messages = append(messages, schema.SystemMessage(pinnedText)) + } + + if toolText := renderToolSchemas(ctx); toolText != "" { + messages = append(messages, schema.SystemMessage(toolText)) + } + + if ctx != nil { + history := ctx.HistorySnapshot() + if len(history) > 0 { + messages = append(messages, history...) + } + } + + runtimeUserPrompt = strings.TrimSpace(runtimeUserPrompt) + if runtimeUserPrompt != "" { + messages = append(messages, schema.UserMessage(runtimeUserPrompt)) + } + + return messages +} + +// renderStateSummary 将当前流程状态渲染成简洁文本。 +func renderStateSummary(state *newagentmodel.CommonState) string { + if state == nil { + return "当前状态:state 缺失,请先进行兜底处理。" + } + + var sb strings.Builder + current, total := state.PlanProgress() + + sb.WriteString(fmt.Sprintf("当前阶段:%s\n", state.Phase)) + sb.WriteString(fmt.Sprintf("当前轮次:%d/%d\n", state.RoundUsed, state.MaxRounds)) + + if !state.HasPlan() { + sb.WriteString("当前完整 plan:暂无。\n") + return sb.String() + } + + sb.WriteString("当前完整 plan:\n") + for i, step := range state.PlanSteps { + sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, strings.TrimSpace(step))) + } + + if step, ok := state.CurrentPlanStep(); ok { + sb.WriteString(fmt.Sprintf("当前步骤进度:%d/%d\n", current, total)) + sb.WriteString("当前步骤内容:\n") + sb.WriteString(step) + sb.WriteString("\n") + } else { + sb.WriteString("当前步骤进度:暂无有效当前步骤。\n") + } + + return sb.String() +} + +// renderPinnedBlocks 将 ConversationContext 中的置顶块渲染成一段独立的 system 内容。 +func renderPinnedBlocks(ctx *newagentmodel.ConversationContext) string { + if ctx == nil { + return "" + } + + blocks := ctx.PinnedBlocksSnapshot() + if len(blocks) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("以下是后端置顶注入的上下文,请优先遵守:\n") + for _, block := range blocks { + title := strings.TrimSpace(block.Title) + if title == "" { + title = strings.TrimSpace(block.Key) + } + if title != "" { + sb.WriteString("【") + sb.WriteString(title) + sb.WriteString("】\n") + } + sb.WriteString(strings.TrimSpace(block.Content)) + sb.WriteString("\n") + } + return strings.TrimSpace(sb.String()) +} + +// renderToolSchemas 将工具摘要渲染成独立文本块。 +func renderToolSchemas(ctx *newagentmodel.ConversationContext) string { + if ctx == nil { + return "" + } + + schemas := ctx.ToolSchemasSnapshot() + if len(schemas) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("以下是当前可用工具摘要,仅供你在规划时参考能力边界:\n") + for _, item := range schemas { + name := strings.TrimSpace(item.Name) + desc := strings.TrimSpace(item.Desc) + schemaText := strings.TrimSpace(item.SchemaText) + + if name != "" { + sb.WriteString("- 工具名:") + sb.WriteString(name) + sb.WriteString("\n") + } + if desc != "" { + sb.WriteString(" 说明:") + sb.WriteString(desc) + sb.WriteString("\n") + } + if schemaText != "" { + sb.WriteString(" 参数摘要:") + sb.WriteString(schemaText) + sb.WriteString("\n") + } + } + + return strings.TrimSpace(sb.String()) +} + +func mergeSystemPrompts(ctx *newagentmodel.ConversationContext, stageSystemPrompt string) string { + base := "" + if ctx != nil { + base = strings.TrimSpace(ctx.SystemPrompt) + } + stageSystemPrompt = strings.TrimSpace(stageSystemPrompt) + + switch { + case base == "" && stageSystemPrompt == "": + return "" + case base == "": + return stageSystemPrompt + case stageSystemPrompt == "": + return base + default: + return base + "\n\n" + stageSystemPrompt + } +} diff --git a/backend/newAgent/shared/retry.go b/backend/newAgent/shared/retry.go new file mode 100644 index 0000000..c091723 --- /dev/null +++ b/backend/newAgent/shared/retry.go @@ -0,0 +1,85 @@ +package newagentshared + +import ( + "context" + "time" +) + +// RetryOptions 描述公共重试策略。 +// +// 职责边界: +// 1. 这里只定义“是否重试、最多几次、间隔多久”; +// 2. 不关心具体业务是工具调用失败、模型 JSON 失败还是 DB 暂时不可用; +// 3. 真正的业务兜底文案仍应由上层 node 决定。 +type RetryOptions struct { + MaxAttempts int + Interval time.Duration + ShouldRetry func(err error) bool + OnRetry func(attempt int, err error) +} + +// Do 执行一个只返回 error 的重试任务。 +// +// 执行规则: +// 1. 第一次执行也算一次 attempt; +// 2. 任意一次成功即立即返回; +// 3. 上下文取消、达到最大次数、或 ShouldRetry=false 时立即停止。 +func Do(ctx context.Context, options RetryOptions, fn func(attempt int) error) error { + _, err := DoValue[struct{}](ctx, options, func(attempt int) (struct{}, error) { + return struct{}{}, fn(attempt) + }) + return err +} + +// DoValue 执行一个带返回值的通用重试任务。 +// +// 设计说明: +// 1. 旧 agent 里后续很多地方都会出现“失败重试 2~3 次”的模式; +// 2. 这里先把循环骨架统一,避免每个 skill 自己写 for + sleep + ctx.Done; +// 3. 上层只需关心“本轮失败要不要继续”,而不是重复造轮子。 +func DoValue[T any](ctx context.Context, options RetryOptions, fn func(attempt int) (T, error)) (T, error) { + var zero T + + maxAttempts := options.MaxAttempts + if maxAttempts <= 0 { + maxAttempts = 1 + } + + for attempt := 1; attempt <= maxAttempts; attempt++ { + if err := ctx.Err(); err != nil { + return zero, err + } + + value, err := fn(attempt) + if err == nil { + return value, nil + } + + // 1. 到最后一次了,直接返回原错误,避免无意义等待。 + if attempt >= maxAttempts { + return zero, err + } + // 2. 业务显式声明“不值得重试”时,立刻停止。 + if options.ShouldRetry != nil && !options.ShouldRetry(err) { + return zero, err + } + // 3. 把重试钩子留给上层,用于打点或阶段提示。 + if options.OnRetry != nil { + options.OnRetry(attempt, err) + } + // 4. 没有配置间隔则马上下一轮;配置了则等待,同时尊重 ctx 取消。 + if options.Interval <= 0 { + continue + } + + timer := time.NewTimer(options.Interval) + select { + case <-ctx.Done(): + timer.Stop() + return zero, ctx.Err() + case <-timer.C: + } + } + + return zero, nil +} diff --git a/backend/newAgent/shared/time.go b/backend/newAgent/shared/time.go new file mode 100644 index 0000000..8ddee0a --- /dev/null +++ b/backend/newAgent/shared/time.go @@ -0,0 +1,49 @@ +package newagentshared + +import ( + "sync" + "time" +) + +const ( + // MinuteLayout 是 Agent 内部统一的分钟级时间文本格式。 + // + // 设计原因: + // 1. agent 里大量场景只需要精确到分钟; + // 2. 秒级精度会增加提示词噪声,也容易让“同一请求内的当前时间”出现抖动; + // 3. 先统一成一份常量,后续 quicknote / schedule 都直接复用。 + MinuteLayout = "2006-01-02 15:04" +) + +var ( + shanghaiLocOnce sync.Once + shanghaiLoc *time.Location +) + +// ShanghaiLocation 返回 Agent 内部统一使用的东八区时区。 +func ShanghaiLocation() *time.Location { + shanghaiLocOnce.Do(func() { + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + // 兜底使用固定东八区,避免极端环境下因为系统时区文件缺失导致整个链路失败。 + loc = time.FixedZone("CST", 8*3600) + } + shanghaiLoc = loc + }) + return shanghaiLoc +} + +// NowToMinute 返回当前北京时间,并截断到分钟级。 +func NowToMinute() time.Time { + return time.Now().In(ShanghaiLocation()).Truncate(time.Minute) +} + +// NormalizeToMinute 把任意时间统一到北京时间分钟粒度。 +func NormalizeToMinute(t time.Time) time.Time { + return t.In(ShanghaiLocation()).Truncate(time.Minute) +} + +// FormatMinute 把时间格式化为统一分钟级文本。 +func FormatMinute(t time.Time) string { + return NormalizeToMinute(t).Format(MinuteLayout) +} diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go new file mode 100644 index 0000000..1e62a89 --- /dev/null +++ b/backend/newAgent/stream/emitter.go @@ -0,0 +1,115 @@ +package newagentstream + +import ( + "fmt" + "strings" +) + +// PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。 +// +// 说明: +// 1. 这里刻意不用 chan/string 绑死实现; +// 2. 上层既可以传“写 channel”的函数,也可以传“写 gin stream”的函数; +// 3. 只要签名是 `func(string) error`,都能接进来。 +type PayloadEmitter func(payload string) error + +// StageEmitter 是 graph/node 对“当前阶段”进行推送的最小接口。 +type StageEmitter func(stage, detail string) + +// NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。 +func NoopPayloadEmitter() PayloadEmitter { + return func(string) error { return nil } +} + +// NoopStageEmitter 返回一个空实现,避免 graph 在没有接前端时处处判空。 +func NoopStageEmitter() StageEmitter { + return func(stage, detail string) {} +} + +// WrapStageEmitter 把可空函数包装成稳定的 StageEmitter。 +func WrapStageEmitter(fn func(stage, detail string)) StageEmitter { + if fn == nil { + return NoopStageEmitter() + } + return fn +} + +// EmitStageAsReasoning 把“阶段提示”伪装成 reasoning chunk 推给前端。 +// +// 设计背景: +// 1. 你当前 Apifox 只认思考块和正文块,因此阶段提示需要先借 reasoning_content 走通; +// 2. 这样后续真正前端上线时,只需要在这一层换协议,而不必回到各 skill 重改 graph; +// 3. 这里不拼花哨格式,只给出稳定、可读、可 grep 的文本。 +func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error { + if emit == nil { + return nil + } + + text := BuildStageReasoningText(stage, detail) + payload, err := ToOpenAIReasoningChunk(requestID, modelName, created, text, includeRole) + if err != nil { + return err + } + if payload == "" { + return nil + } + return emit(payload) +} + +// EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。 +// +// 注意: +// 1. 这里是“整段发”,不是把文本强行拆碎; +// 2. 这样后续如果某条链路不需要真流式,也可以复用统一出口; +// 3. 真正按 token/chunk 细粒度流式输出,应由 llm.Stream + 上层循环处理。 +func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error { + if emit == nil { + return nil + } + payload, err := ToOpenAIAssistantChunk(requestID, modelName, created, content, includeRole) + if err != nil { + return err + } + if payload == "" { + return nil + } + return emit(payload) +} + +// EmitFinish 统一输出 stop 结束块。 +func EmitFinish(emit PayloadEmitter, requestID, modelName string, created int64) error { + if emit == nil { + return nil + } + payload, err := ToOpenAIFinishStream(requestID, modelName, created) + if err != nil { + return err + } + if payload == "" { + return nil + } + return emit(payload) +} + +// EmitDone 统一输出 OpenAI 兼容流式结束标记。 +func EmitDone(emit PayloadEmitter) error { + if emit == nil { + return nil + } + return emit("[DONE]") +} + +// BuildStageReasoningText 生成统一阶段提示文本。 +func BuildStageReasoningText(stage, detail string) string { + stage = strings.TrimSpace(stage) + detail = strings.TrimSpace(detail) + + switch { + case stage != "" && detail != "": + return fmt.Sprintf("阶段:%s\n%s", stage, detail) + case stage != "": + return fmt.Sprintf("阶段:%s", stage) + default: + return detail + } +} diff --git a/backend/newAgent/stream/openai.go b/backend/newAgent/stream/openai.go new file mode 100644 index 0000000..2810250 --- /dev/null +++ b/backend/newAgent/stream/openai.go @@ -0,0 +1,102 @@ +package newagentstream + +import ( + "encoding/json" + + "github.com/cloudwego/eino/schema" +) + +// OpenAIChunkResponse 是 OpenAI 兼容的流式 chunk DTO。 +// +// 之所以单独放到 Agent/stream: +// 1. 未来无论 quicknote、taskquery 还是 schedule,只要需要 SSE 都会复用这套协议壳; +// 2. 这样 node/graph 层只关注“我要推什么内容”,不再自己拼 JSON; +// 3. 后续如果前端协议升级,也能在这里集中改。 +type OpenAIChunkResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []OpenAIChunkChoice `json:"choices"` +} + +// OpenAIChunkChoice 对应 OpenAI choices[0]。 +type OpenAIChunkChoice struct { + Index int `json:"index"` + Delta OpenAIChunkDelta `json:"delta"` + FinishReason *string `json:"finish_reason"` +} + +// OpenAIChunkDelta 是真正承载 role/content/reasoning 的位置。 +type OpenAIChunkDelta struct { + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + ReasoningContent string `json:"reasoning_content,omitempty"` +} + +// ToOpenAIStream 把 Eino message 转成 OpenAI 兼容 chunk。 +// +// 职责边界: +// 1. 负责把 chunk.Content / chunk.ReasoningContent 映射到协议字段; +// 2. 负责按 includeRole 决定是否在首块带上 assistant 角色; +// 3. 不负责发送,也不负责决定“这个 chunk 该不该推”。 +func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) { + delta := OpenAIChunkDelta{} + if includeRole { + delta.Role = "assistant" + } + if chunk != nil { + delta.Content = chunk.Content + delta.ReasoningContent = chunk.ReasoningContent + } + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) +} + +// ToOpenAIReasoningChunk 直接构造一个 reasoning chunk。 +func ToOpenAIReasoningChunk(requestID, modelName string, created int64, reasoning string, includeRole bool) (string, error) { + delta := OpenAIChunkDelta{ReasoningContent: reasoning} + if includeRole { + delta.Role = "assistant" + } + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) +} + +// ToOpenAIAssistantChunk 直接构造一个正文 chunk。 +func ToOpenAIAssistantChunk(requestID, modelName string, created int64, content string, includeRole bool) (string, error) { + delta := OpenAIChunkDelta{Content: content} + if includeRole { + delta.Role = "assistant" + } + return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil) +} + +// ToOpenAIFinishStream 生成流式结束 chunk(finish_reason=stop)。 +func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) { + stop := "stop" + return buildOpenAIChunkPayload(requestID, modelName, created, OpenAIChunkDelta{}, &stop) +} + +func buildOpenAIChunkPayload(requestID, modelName string, created int64, delta OpenAIChunkDelta, finishReason *string) (string, error) { + // 1. 若既没有 role,也没有正文/思考,也没有 finish_reason,则视为“空块”,直接跳过。 + // 2. 这样可以避免上层每次都自己写一遍空块判断。 + if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" && finishReason == nil { + return "", nil + } + + dto := OpenAIChunkResponse{ + ID: requestID, + Object: "chat.completion.chunk", + Created: created, + Model: modelName, + Choices: []OpenAIChunkChoice{{ + Index: 0, + Delta: delta, + FinishReason: finishReason, + }}, + } + data, err := json.Marshal(dto) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/newagent-roadmap.md b/newagent-roadmap.md new file mode 100644 index 0000000..30c7704 --- /dev/null +++ b/newagent-roadmap.md @@ -0,0 +1,70 @@ +# NewAgent 改造路线 + +## 核心架构 + +砍掉路由和多张业务 graph,换成一张通用循环图,用 eino compose 搭建。能力通过 tool 横向扩展,图本身不再变。 + +### 循环图结构 + +``` +START → Plan Loop(循环,直到 PLAN_DONE) → Confirm Plan → Execute Loop(ReAct + Reflection) → 交付 → END +``` + +- Plan Loop:每轮后端注入上下文(当前阶段、已确定步骤、已收集信息),LLM 每次只想一步,可调感知类 tool 收集信息,输出 [PLAN_DONE] 进入下一阶段 +- Confirm:plan 完成后推给用户确认,不 ok 回 Plan 重来 +- Execute Loop:按 plan 逐步调 tool,每步完 reflection,发现 plan 有问题可自行修正 +- 交付:执行结果推给用户检查 + +### 关键设计决策 + +1. **不需要路由**:整个 agent 就是一个 tool-use 循环,LLM 自己判断聊天还是干活,上下文理解能力本身就是最好的路由器 +2. **写操作前必须 confirm**:所有会改动日程的 tool 执行前自动触发用户确认,读操作不需要 +3. **Thinking 策略**:首轮理解意图时开,后续 tool 循环轮次关掉 +4. **Graph 的角色**:从业务流程编排降级为基础设施,编排 agent loop 本身(LLM → tool → LLM 循环),业务逻辑下沉到 tools + +## 工具设计原则 + +工具做计算,LLM 做决策。LLM 不碰原始时间数据,只看自然语言级别的信息。 + +### 感知类(让 LLM "看"时间) +- `get_free_slots(date_range, min_duration)` — 返回空闲时段 +- `get_conflicts(proposed_event)` — 返回冲突信息 +- `get_day_summary(date)` — 某天负载概况 +- `get_task_context(task_id)` — 任务完整上下文 + +### 操作类(让 LLM "动手") +- `create_event` / `update_event` / `delete_event` — 基础 CRUD +- `batch_create_events(events[])` — 批量创建 +- `swap_events(event_a, event_b)` — 交换时间段 +- `reschedule_event(event_id, constraints)` — 自动找合适时段重排 + +### 分析类(让 LLM "想") +- `estimate_workload(date_range)` — 工作量分布 +- `find_best_slot(duration, deadline, preferences)` — 给定约束算最优时段 +- `check_feasibility(task_list, deadline)` — 可行性检测 + +## 状态设计 + +State 和 Context 分离: + +- `AgentState`:阶段标记、plan 步骤、confirm 状态、tool 调用记录、轮次计数(流程控制) +- `ConversationContext`:消息历史、system prompt、tool schemas、注入的阶段上下文(对话管理) + +两者通过 traceID / session 关联,数据结构分开。 + +## 落地方式 + +- `newagent/` 复制型搬迁,分层结构延续 +- 已搬迁:`llm/`(client、ark、json)、`stream/`(emitter、openai)、`shared/`(time、retry) +- 包名统一为 `newagent` 前缀(newagentllm、newagentstream、newagentshared) +- 新增 `tool/` 目录存放可插拔工具 +- 老 agent 保留对照,跑通再删 + +## 优先级 + +P0(先做):循环图 + 感知类工具 + 基础 CRUD + ask_user 多轮交互 + confirm 机制 +P1(后续):记忆机制、websearch、skills、分析类工具 + +## 协作模式 + +我(人类)搭函数框架 + 写清楚注释,Codex 负责填充实现。架构决策权在人类手里。