diff --git a/backend/api/agent.go b/backend/api/agent.go index d8a99fc..928bfda 100644 --- a/backend/api/agent.go +++ b/backend/api/agent.go @@ -52,6 +52,11 @@ func (api *AgentHandler) ChatAgent(c *gin.Context) { // 3) 规范化会话 ID conversationID := strings.TrimSpace(req.ConversationID) if conversationID == "" { + // confirm_action 需要关联已存在的会话状态,缺少 conversation_id 直接报错。 + if _, ok := req.Extra["confirm_action"]; ok { + c.JSON(http.StatusBadRequest, respond.MissingConversationID) + return + } conversationID = uuid.NewString() } c.Writer.Header().Set("X-Conversation-ID", conversationID) diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 194f8a5..8375899 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -6,11 +6,13 @@ import ( "log" "github.com/LoveLosita/smartflow/backend/api" + "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/middleware" + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/routers" "github.com/LoveLosita/smartflow/backend/service" @@ -100,6 +102,12 @@ func Start() { scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo) agentService := service.NewAgentServiceWithSchedule(aiHub, agentRepo, taskRepo, cacheRepo, agentCacheRepo, eventBus, scheduleService) + // newAgent 依赖接线。 + agentService.SetAgentStateStore(dao.NewAgentStateStoreAdapter(cacheRepo)) + agentService.SetToolRegistry(newagenttools.NewDefaultRegistry()) + agentService.SetScheduleProvider(conv.NewScheduleProvider(scheduleRepo, taskClassRepo)) + agentService.SetSchedulePersistor(conv.NewSchedulePersistorAdapter(manager)) + // API 层初始化。 userApi := api.NewUserHandler(userService) taskApi := api.NewTaskHandler(taskSv) diff --git a/backend/conv/schedule_persist.go b/backend/conv/schedule_persist.go new file mode 100644 index 0000000..d5b83ec --- /dev/null +++ b/backend/conv/schedule_persist.go @@ -0,0 +1,174 @@ +package conv + +import ( + "context" + "fmt" + + "github.com/LoveLosita/smartflow/backend/dao" + "github.com/LoveLosita/smartflow/backend/model" + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" +) + +// SchedulePersistorAdapter 实现 model.SchedulePersistor 接口。 +// 组合 RepoManager,调用 PersistScheduleChanges 持久化变更。 +type SchedulePersistorAdapter struct { + manager *dao.RepoManager +} + +// NewSchedulePersistorAdapter 创建持久化适配器。 +func NewSchedulePersistorAdapter(manager *dao.RepoManager) *SchedulePersistorAdapter { + return &SchedulePersistorAdapter{manager: manager} +} + +// PersistScheduleChanges 实现 model.SchedulePersistor 接口。 +func (a *SchedulePersistorAdapter) PersistScheduleChanges(ctx context.Context, original, modified *newagenttools.ScheduleState, userID int) error { + return PersistScheduleChanges(ctx, a.manager, original, modified, userID) +} + +// PersistScheduleChanges 将内存中的 ScheduleState 变更持久化到数据库。 +// +// 职责边界: +// 1. 调用 DiffScheduleState 计算变更; +// 2. 在事务中逐个应用变更到数据库; +// 3. 全部成功或全部回滚,保证原子性。 +func PersistScheduleChanges( + ctx context.Context, + manager *dao.RepoManager, + original *newagenttools.ScheduleState, + modified *newagenttools.ScheduleState, + userID int, +) error { + changes := DiffScheduleState(original, modified) + if len(changes) == 0 { + return nil + } + + return manager.Transaction(ctx, func(txM *dao.RepoManager) error { + for _, change := range changes { + if err := applyScheduleChange(ctx, txM, change, userID); err != nil { + return fmt.Errorf("应用变更失败 [%s %s]: %w", change.Type, change.Name, err) + } + } + return nil + }) +} + +// applyScheduleChange 应用单个变更到数据库。 +func applyScheduleChange(ctx context.Context, manager *dao.RepoManager, change ScheduleChange, userID int) error { + switch change.Type { + case ChangePlace: + return applyPlaceChange(ctx, manager, change, userID) + case ChangeMove: + return applyMoveChange(ctx, manager, change, userID) + case ChangeUnplace: + return applyUnplaceChange(ctx, manager, change, userID) + default: + return fmt.Errorf("未知变更类型: %s", change.Type) + } +} + +// applyPlaceChange 应用放置变更。 +func applyPlaceChange(ctx context.Context, manager *dao.RepoManager, change ScheduleChange, userID int) error { + // Place:pending → placed,为现有 Event 创建 Schedule + // 前提:Event 已经存在(SourceID 是 ScheduleEvent.ID) + // NewCoords 包含所有需要放置的位置(可能多天/多节) + + if len(change.NewCoords) == 0 { + return fmt.Errorf("place 变更缺少目标位置") + } + + if change.Source != "event" || change.SourceID == 0 { + return fmt.Errorf("place 变更需要有效的 event source") + } + + // 按周天分组,压缩成 slot ranges + groups := groupCoordsByWeekDay(change.NewCoords) + for week, dayGroups := range groups { + for dayOfWeek, coords := range dayGroups { + startSection, endSection := minMaxSection(coords) + + // 创建 schedule 记录(event 已存在,只创建 schedule) + schedules := make([]model.Schedule, endSection-startSection+1) + for sec := startSection; sec <= endSection; sec++ { + schedules[sec-startSection] = model.Schedule{ + UserID: userID, + Week: week, + DayOfWeek: dayOfWeek, + Section: sec, + EventID: change.SourceID, + } + } + + // 批量创建 + _, err := manager.Schedule.AddSchedules(schedules) + if err != nil { + return fmt.Errorf("创建 schedule 失败: %w", err) + } + } + } + return nil +} + +// applyMoveChange 应用移动变更。 +func applyMoveChange(ctx context.Context, manager *dao.RepoManager, change ScheduleChange, userID int) error { + // Move:已有 schedule,只更新位置 + // 需要删除旧位置的 schedule,在新位置创建新 schedule + + // 1. 删除旧位置 + if change.Source == "event" && change.SourceID != 0 { + if err := manager.Schedule.DeleteScheduleEventAndSchedule(ctx, change.SourceID, userID); err != nil { + return fmt.Errorf("删除旧位置失败: %w", err) + } + } + + // 2. 创建新位置(复用 place 逻辑) + return applyPlaceChange(ctx, manager, change, userID) +} + +// applyUnplaceChange 应用移除变更。 +func applyUnplaceChange(ctx context.Context, manager *dao.RepoManager, change ScheduleChange, userID int) error { + // Unplace:删除 schedule,任务恢复为 pending + if change.Source == "event" && change.SourceID != 0 { + return manager.Schedule.DeleteScheduleEventAndSchedule(ctx, change.SourceID, userID) + } + return fmt.Errorf("unplace 变更的 source 不是 event: %s", change.Source) +} + +// ==================== 辅助函数 ==================== + +// intPtr 返回 int 指针,零值返回 nil。 +func intPtr(v int) *int { + if v == 0 { + return nil + } + return &v +} + +// groupCoordsByWeekDay 按周天分组坐标。 +func groupCoordsByWeekDay(coords []SlotCoord) map[int]map[int][]SlotCoord { + result := make(map[int]map[int][]SlotCoord) + for _, coord := range coords { + if result[coord.Week] == nil { + result[coord.Week] = make(map[int][]SlotCoord) + } + result[coord.Week][coord.DayOfWeek] = append(result[coord.Week][coord.DayOfWeek], coord) + } + return result +} + +// minMaxSection 返回坐标列表中的最小和最大节次。 +func minMaxSection(coords []SlotCoord) (min, max int) { + if len(coords) == 0 { + return 0, 0 + } + min, max = coords[0].Section, coords[0].Section + for _, c := range coords[1:] { + if c.Section < min { + min = c.Section + } + if c.Section > max { + max = c.Section + } + } + return +} diff --git a/backend/conv/schedule_provider.go b/backend/conv/schedule_provider.go new file mode 100644 index 0000000..beecb06 --- /dev/null +++ b/backend/conv/schedule_provider.go @@ -0,0 +1,112 @@ +package conv + +import ( + "context" + "fmt" + "time" + + "github.com/LoveLosita/smartflow/backend/dao" + "github.com/LoveLosita/smartflow/backend/model" + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" +) + +// ScheduleProvider 实现 model.ScheduleStateProvider 接口。 +// 通过 DAO 层加载用户的日程和任务数据,调用 LoadScheduleState 构建内存状态。 +// +// 职责边界: +// 1. 只负责"从 DB 查数据 + 调 LoadScheduleState 转换",不含业务逻辑; +// 2. 不负责缓存(由上层 Service 决定是否缓存); +// 3. 不负责 Diff 和持久化(由 Confirm 流程负责)。 +type ScheduleProvider struct { + scheduleDAO *dao.ScheduleDAO + taskClassDAO *dao.TaskClassDAO +} + +// NewScheduleProvider 创建 ScheduleProvider。 +func NewScheduleProvider(scheduleDAO *dao.ScheduleDAO, taskClassDAO *dao.TaskClassDAO) *ScheduleProvider { + return &ScheduleProvider{ + scheduleDAO: scheduleDAO, + taskClassDAO: taskClassDAO, + } +} + +// LoadScheduleState 实现 model.ScheduleStateProvider 接口。 +// 加载用户当前周的日程和所有待安排任务,构建 ScheduleState。 +func (p *ScheduleProvider) LoadScheduleState(ctx context.Context, userID int) (*newagenttools.ScheduleState, error) { + // 1. 确定当前周。 + now := time.Now() + week, _, err := RealDateToRelativeDate(now.Format(DateFormat)) + if err != nil { + return nil, fmt.Errorf("解析当前日期失败: %w", err) + } + + // 2. 加载当前周的所有日程(含 Event + EmbeddedTask 预加载)。 + schedules, err := p.scheduleDAO.GetUserWeeklySchedule(ctx, userID, week) + if err != nil { + return nil, fmt.Errorf("加载用户周日程失败: %w", err) + } + + // 3. 加载用户所有任务类(含 Items 预加载)。 + // 两步:先拿 ID 列表,再批量获取完整数据(含 Items)。 + taskClasses, err := p.loadCompleteTaskClasses(ctx, userID) + if err != nil { + return nil, err + } + + // 4. 构建 WindowDay 列表(当前周 7 天)。 + windowDays := make([]WindowDay, 7) + for i := 0; i < 7; i++ { + windowDays[i] = WindowDay{Week: week, DayOfWeek: i + 1} + } + + // 5. 构建额外 item category 映射(已加载全部 taskClass,通常为空)。 + extraItemCategories := buildExtraItemCategories(schedules, taskClasses) + + // 6. 调用已有的 LoadScheduleState 构建内存状态。 + return LoadScheduleState(schedules, taskClasses, extraItemCategories, windowDays), nil +} + +// loadCompleteTaskClasses 批量加载用户所有任务类(含 Items 预加载)。 +func (p *ScheduleProvider) loadCompleteTaskClasses(ctx context.Context, userID int) ([]model.TaskClass, error) { + basicClasses, err := p.taskClassDAO.GetUserTaskClasses(userID) + if err != nil { + return nil, fmt.Errorf("加载用户任务类失败: %w", err) + } + if len(basicClasses) == 0 { + return nil, nil + } + + ids := make([]int, len(basicClasses)) + for i, tc := range basicClasses { + ids[i] = tc.ID + } + + complete, err := p.taskClassDAO.GetCompleteTaskClassesByIDs(ctx, userID, ids) + if err != nil { + return nil, fmt.Errorf("加载完整任务类失败: %w", err) + } + return complete, nil +} + +// buildExtraItemCategories 从已有日程中提取不属于给定 taskClasses 的 task event 的 category 映射。 +// 当加载全部 taskClass 时,通常返回空 map。 +func buildExtraItemCategories(schedules []model.Schedule, taskClasses []model.TaskClass) map[int]string { + knownItemIDs := make(map[int]bool) + for _, tc := range taskClasses { + for _, item := range tc.Items { + knownItemIDs[item.ID] = true + } + } + + categories := make(map[int]string) + for _, s := range schedules { + if s.Event == nil || s.Event.Type != "task" || s.Event.RelID == nil { + continue + } + itemID := *s.Event.RelID + if !knownItemIDs[itemID] { + categories[itemID] = "任务" + } + } + return categories +} diff --git a/backend/dao/agent_state_store_adapter.go b/backend/dao/agent_state_store_adapter.go new file mode 100644 index 0000000..8ff25af --- /dev/null +++ b/backend/dao/agent_state_store_adapter.go @@ -0,0 +1,53 @@ +package dao + +import ( + "context" + "errors" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" +) + +// AgentStateStoreAdapter 将 CacheDAO 适配为 newAgent 的 AgentStateStore 接口。 +// +// 职责边界: +// 1. CacheDAO 的 LoadAgentState 使用 out-parameter 模式,需要适配到返回值模式; +// 2. CacheDAO 的 SaveAgentState 接受 any,需要适配到 *AgentStateSnapshot; +// 3. DeleteAgentState 签名已匹配,直接转发。 +type AgentStateStoreAdapter struct { + cache *CacheDAO +} + +// NewAgentStateStoreAdapter 创建适配器。 +func NewAgentStateStoreAdapter(cache *CacheDAO) *AgentStateStoreAdapter { + return &AgentStateStoreAdapter{cache: cache} +} + +// Save 序列化并保存 agent 状态快照。 +func (a *AgentStateStoreAdapter) Save(ctx context.Context, conversationID string, snapshot *newagentmodel.AgentStateSnapshot) error { + if a == nil || a.cache == nil { + return errors.New("agent state store adapter is not initialized") + } + return a.cache.SaveAgentState(ctx, conversationID, snapshot) +} + +// Load 读取并反序列化 agent 状态快照。 +func (a *AgentStateStoreAdapter) Load(ctx context.Context, conversationID string) (*newagentmodel.AgentStateSnapshot, bool, error) { + if a == nil || a.cache == nil { + return nil, false, errors.New("agent state store adapter is not initialized") + } + + var snapshot newagentmodel.AgentStateSnapshot + ok, err := a.cache.LoadAgentState(ctx, conversationID, &snapshot) + if err != nil || !ok { + return nil, ok, err + } + return &snapshot, true, nil +} + +// Delete 删除 agent 状态快照。 +func (a *AgentStateStoreAdapter) Delete(ctx context.Context, conversationID string) error { + if a == nil || a.cache == nil { + return errors.New("agent state store adapter is not initialized") + } + return a.cache.DeleteAgentState(ctx, conversationID) +} diff --git a/backend/newAgent/ARCHITECTURE.md b/backend/newAgent/ARCHITECTURE.md new file mode 100644 index 0000000..ec80e57 --- /dev/null +++ b/backend/newAgent/ARCHITECTURE.md @@ -0,0 +1,681 @@ +# NewAgent 架构全景 + +> 本文档帮助读者建立对 newAgent 的完整心智模型,从宏观到微观逐层展开。 + +--- + +## 一、一句话概括 + +newAgent 是一个 **状态机驱动的有向图**:用户消息进入 Chat 节点,经过意图分类、计划生成、用户确认、工具执行、最终交付,每一步由 Phase 和 PendingInteraction 驱动路由。 + +--- + +## 二、宏观架构 + +### 2.1 目录结构 + +``` +newAgent/ +├── graph/ 图骨架:节点注册、边连线、分支路由 +├── model/ 数据模型:状态、合约、接口定义 +├── node/ 节点实现:每个节点的业务逻辑 +├── prompt/ 提示词:每个阶段的 system prompt 和用户 prompt 构造 +├── llm/ LLM 客户端:文本生成、JSON 解析、流式适配 +├── stream/ SSE 输出:伪流式推送、OpenAI 兼容格式 +├── tools/ 工具层:10 个排程工具 + 注册表 +├── shared/ 公共工具:重试、时区 +├── router/ 路由(当前为空,路由逻辑在 graph/ 中) +├── ROADMAP.md 改造计划 +└── ARCHITECTURE.md 本文档 +``` + +### 2.2 图结构 + +``` +START + │ + v + Chat ──────┬── 意图=chat ────→ END(直接回复) + │ │ + │ └── 意图=task ──→ Plan ──┬── continue ──→ Plan(继续规划) + │ │ │ + │ │ ├── ask_user ──→ Interrupt ──→ END + │ │ │ + │ │ └── plan_done ──→ Confirm + │ │ │ + │ │ ├── 有计划 → 确认卡片 + │ │ │ │ + │ │ │ v + │ │ │ Interrupt ──→ END + │ │ │ + │ │ └── 有 PendingConfirmTool → 确认卡片 + │ │ + │ v + │ Interrupt ──→ END + │ + │ [用户确认后重新进入图] + │ + v + Chat(resume) ──┬── accept → 恢复 PendingConfirmTool → Execute + │ + └── reject → 回到 planning 或 executing + +Execute ──┬── continue(读工具) ──→ Execute(继续 ReAct) + ├── continue(无工具) ──→ Execute + ├── confirm(写工具) ──→ Confirm ──→ Interrupt ──→ END + ├── ask_user ──→ Interrupt ──→ END + ├── next_plan ──┬── 有剩余步骤 → Execute + │ └── 无剩余步骤 → Deliver + ├── done ──→ Deliver + └── 轮次耗尽 ──→ Deliver(强制) + +Deliver ──→ END(最终总结,清理持久化快照) +``` + +### 2.3 一次完整排课的请求序列 + +``` +请求1: 用户发 "帮我安排下周的复习" + → Chat(intent=task) → Plan(plan_done, 2步计划) → Confirm → Interrupt → END + 前端展示确认卡片 + +请求2: 用户点 "确认" + → Chat(resume, accept) → 确认通过 → Execute(读工具 get_overview) + → Execute(读工具 find_free) + → Execute(写工具 place → confirm) → Confirm → Interrupt → END + 前端展示写操作确认卡片 + +请求3: 用户点 "确认" + → Chat(resume, accept) → Execute(执行 pending tool place) → 持久化 + → Execute(next_plan → 下一步) + → Execute(done) → Deliver → END + 前端展示最终总结 +``` + +--- + +## 三、状态模型 + +理解 newAgent 的关键在于理解 **什么东西在什么时机变化**。 + +### 3.1 三个核心状态对象 + +``` +┌─────────────────────┐ 持久化到 Redis +│ AgentRuntimeState │ ← StateStore.Save/Load +│ │ +│ ┌───────────────┐ │ +│ │ CommonState │ │ ← 每个节点都可能修改 +│ │ - Phase │ │ +│ │ - PlanSteps │ │ +│ │ - CurrentStep │ │ +│ │ - RoundUsed │ │ +│ └───────────────┘ │ +│ │ +│ PendingInteraction │ ← 确认/追问 的交互快照 +│ PendingConfirmTool │ ← Execute→Confirm 的临时邮箱 +└─────────────────────┘ + +┌─────────────────────────┐ 不持久化,每次请求重建 +│ ConversationContext │ +│ - SystemPrompt │ ← 各节点 prompt 函数构造 +│ - History []*Message │ ← 对话历史(assistant+tool 配对) +│ - PinnedBlocks │ ← 置顶上下文(计划、工具摘要) +│ - ToolSchemas │ ← 工具 schema 注入 +└─────────────────────────┘ + +┌─────────────────────────┐ 懒加载,首次 Execute 时读取 +│ ScheduleState │ +│ - Window (天数+映射) │ +│ - Tasks []ScheduleTask │ ← 工具操作的数据源 +└─────────────────────────┘ +``` + +### 3.2 Phase 状态转换 + +``` +PhasePlanning Plan 节点 plan_done + 用户确认后 + │ + v +PhaseExecuting Execute 节点执行中 + │ + ├──→ PhaseWaitingConfirm Execute 输出 action=confirm + │ │ + │ v 用户确认 + │ PhaseExecuting 恢复继续执行 + │ + ├──→ PhaseDone Execute 输出 done 或所有步骤完成 + │ + └──→ PhaseInterrupted 被中断(追问/确认等待用户输入) +``` + +### 3.3 PendingInteraction 生命周期 + +``` +场景 A: 计划确认 + Plan → plan_done → Confirm 节点 → OpenConfirmInteraction(type="confirm") + → Interrupt 展示 → END + → 用户确认 → Chat(resume) → ResumeFromPending() → Phase=executing + +场景 B: 写操作确认 + Execute → action=confirm → 设置 PendingConfirmTool → Confirm 节点 + → OpenConfirmInteraction(type="confirm", PendingTool=快照) + → Interrupt 展示 → END + → 用户确认 → Chat(resume) → PendingConfirmTool 从快照恢复 → Execute 执行工具 + +场景 C: 追问 + Execute → action=ask_user → OpenAskUserInteraction(question) + → Interrupt 展示 → END + → 用户回复 → Chat(resume) → Phase 回到 executing +``` + +### 3.4 PendingConfirmTool 临时邮箱 + +这个字段 **不持久化**,只在单次图运行中存在: + +``` +Execute(action=confirm) + → PendingConfirmTool = {ToolName, ArgsJSON, Summary} + → Phase = waiting_confirm + → Confirm 节点读取 → 转入 PendingInteraction.PendingTool + → PendingConfirmTool 被清空 + +用户确认后重新进入图: + → Chat(resume) → 从 PendingInteraction.PendingTool 恢复到 PendingConfirmTool + → Execute 发现 PendingConfirmTool 非空 → 直接执行工具 → 清空 +``` + +--- + +## 四、各节点详解 + +### 4.1 Chat 节点 (`node/chat.go`) + +**职责**:入口分流 + 中断恢复 + +**两条路径**: +1. **首次进入**:调 LLM 做意图分类("chat" / "task"),chat 直接回复,task 转到 Plan +2. **中断恢复**:读取 PendingInteraction,根据类型(ask_user / confirm)走不同恢复路径 + +**关键逻辑**: +``` +if HasPendingInteraction(): + handleChatResume() // 不调 LLM +else: + chatIntentDecision() // 调 LLM 做意图分类 +``` + +**confirm resume 的 accept/reject 处理**: +- accept:从 PendingInteraction.PendingTool 恢复 PendingConfirmTool,Phase=executing +- reject(有 PendingTool):不恢复 PendingConfirmTool,Phase=executing(LLM 换方案) +- reject(无 PendingTool):调用 RejectPlan(),Phase=planning(回到规划) + +### 4.2 Plan 节点 (`node/plan.go`) + +**职责**:LLM 生成结构化计划 + +**两阶段 LLM 调用**: +1. **Phase 1 快速评估**:temperature=0.2, max_tokens=1600, thinking=关闭 + - 输出 PlanDecision,判断 Complexity(simple/moderate/complex) +2. **Phase 2 深度规划**(仅 complex 任务触发):thinking=开启, max_tokens=3200 + - 生成更详细的 PlanStep 列表(含 DoneWhen 完成判定条件) + +**三种 action**: +- `continue`:继续规划(多轮对话中补充信息) +- `ask_user`:追问用户 +- `plan_done`:规划完成,输出 PlanSteps + +**计划写入 PinnedBlocks**:用 `UpsertPinnedBlock` 把计划文本注入 ConversationContext,后续 Execute 阶段自动带入。 + +### 4.3 Confirm 节点 (`node/confirm.go`) + +**职责**:创建确认卡片,不调 LLM + +**两种确认**: +1. **计划确认**(Phase=waiting_confirm, PendingConfirmTool 为空):格式化计划摘要,创建 PendingInteraction +2. **工具确认**(PendingConfirmTool 非空):格式化工具操作摘要,把 PendingTool 快照转入 PendingInteraction + +**关键**:Confirm 节点执行后,PendingConfirmTool 被清空(数据已转移到 PendingInteraction.PendingTool)。 + +### 4.4 Execute 节点 (`node/execute.go`) + +**职责**:LLM 主导的 ReAct 循环,这是最复杂的节点。 + +**入口判断优先级**: +``` +1. PendingConfirmTool 非空 → executePendingTool() → 结束 +2. 无有效 PlanStep → 报错 +3. 正常 ReAct → 调 LLM → 处理决策 +``` + +**LLM 调用参数**:temperature=0.3, max_tokens=1200, thinking=开启 + +**JSON 解析失败处理**(correction 机制): +``` +LLM 输出非 JSON: + → ConsecutiveCorrections++ + → 追加修正消息到历史 + → return nil(图循环回来,LLM 看到修正消息后重试) + → 连续 3 次失败 → 返回硬错误,终止 +``` + +**五种 action 处理**: +| action | 行为 | 工具执行? | +|--------|------|-----------| +| continue + tool_call | 读工具直接执行 | 是,executeToolCall() | +| continue 无 tool | 仅说话,继续循环 | 否 | +| confirm | 暂存 PendingConfirmTool | 否,等用户确认 | +| ask_user | 打开追问 | 否 | +| next_plan | 推进步骤 | 否 | +| done | 结束所有步骤 | 否 | + +**工具执行后历史消息格式**: +``` +assistant message: {Role: "assistant", ToolCalls: [{ID, Function: {Name, Arguments}}]} +tool message: {Role: "tool", ToolCallID: <匹配ID>, Content: "工具结果"} +``` +这对消息必须配对,否则 OpenAI 兼容 API 会拒绝请求。 + +**轮次预算**:MaxRounds 默认 30,耗尽强制进入 Deliver。 + +### 4.5 Interrupt 节点 (`node/interrupt.go`) + +**职责**:向用户展示消息后暂停图执行 + +**三种类型**: +- ask_user:伪流式展示 DisplayText +- confirm:展示确认状态 +- 默认:展示通用中断信息 + +### 4.6 Deliver 节点 (`node/deliver.go`) + +**职责**:生成最终总结 + +- 调 LLM(temperature=0.5, max_tokens=800)生成总结 +- 失败时降级到机械格式化(逐条列出步骤 + 完成标记) +- 完成后调用 deleteAgentState() 清理 Redis 快照 + +--- + +## 五、LLM 交互模式 + +### 5.1 统一 JSON 协议 + +所有 LLM 输出都是严格 JSON,不是纯文本。每个阶段有自己的合约: + +**Plan 合约** (`model/plan_contract.go`): +```json +{ + "speak": "...", + "action": "continue|ask_user|plan_done", + "reason": "...", + "complexity": "simple|moderate|complex", + "need_thinking": false, + "plan_steps": [{"content": "...", "done_when": "..."}] +} +``` + +**Execute 合约** (`model/execute_contract.go`): +```json +{ + "speak": "...", + "action": "continue|ask_user|confirm|next_plan|done", + "reason": "...", + "goal_check": "(next_plan/done 时必填)", + "tool_call": {"name": "工具名", "arguments": {...}} +} +``` + +### 5.2 JSON 解析容错 + +`llm/json.go` 的 `ParseJSONObject` 能处理: +- LLM 在 JSON 前后附带文字 → 提取中间的 JSON 对象 +- Markdown 代码块包裹(```json ... ```)→ 剥离 +- 嵌套对象(大括号配对计数) + +### 5.3 Correction 循环 + +当 LLM 输出非法 JSON 时: +``` +1. 原始输出作为 assistant 消息追加到历史 +2. 修正提示作为 user 消息追加到历史 +3. return nil → 图循环回来 +4. LLM 看到修正消息,下一轮输出合法 JSON +5. ConsecutiveCorrections 重置为 0 +6. 连续 3 次失败 → 硬错误终止 +``` + +--- + +## 六、工具系统 + +### 6.1 数据模型 + +`ScheduleState` 是工具操作的唯一数据源: + +``` +ScheduleState +├── Window 时间窗口 +│ ├── TotalDays 总天数(如 5 或 7) +│ └── DayMapping[] day_index → (week, day_of_week) 映射 +└── Tasks[] 扁平任务列表 + ├── source="event" 来自日程表的已有课程/任务 + │ ├── Slots[] 压缩的时段范围 + │ ├── CanEmbed 是否允许嵌入 + │ └── Locked 是否锁定(不可移动) + └── source="task_item" 来自任务类的待安排任务 + ├── Duration 需要的连续时段数 + ├── CategoryID 所属 TaskClass.ID + └── Status="pending" 待安排 +``` + +### 6.2 10 个工具 + +**读工具(直接执行,不需要确认)**: + +| 工具 | 用途 | 典型调用时机 | +|------|------|------------| +| `get_overview` | 全局概览:天数、占用统计、可嵌入、待安排 | LLM 需要了解全局 | +| `query_range` | 查询指定天/时段的详情 | LLM 需要具体位置信息 | +| `find_free` | 查找连续空闲时段 | LLM 需要找空位放任务 | +| `list_tasks` | 按条件列出任务 | LLM 需要筛选任务 | +| `get_task_info` | 单个任务详情(含嵌入关系) | LLM 需要具体任务信息 | + +**写工具(需用户确认)**: + +| 工具 | 用途 | 关键逻辑 | +|------|------|---------| +| `place` | 放置 pending 任务到时段 | 自动检测嵌入(CanEmbed=true 的宿主) | +| `move` | 移动已有任务到新位置 | 冲突检测(排除自身) | +| `swap` | 交换两个等时长任务的时段 | 冲突时自动回滚 | +| `batch_move` | 批量移动多个任务 | 原子性:任一冲突全部回滚 | +| `unplace` | 取消放置,恢复 pending | 清理双向嵌入关系 | + +### 6.3 工具执行流程 + +**读工具**(action=continue + tool_call): +``` +Execute → executeToolCall() → registry.Execute() → 追加 assistant+tool 消息对 → return nil → 图循环 +``` + +**写工具**(action=confirm): +``` +Execute → handleExecuteActionConfirm() → 暂存 PendingConfirmTool +→ Confirm 节点 → Interrupt → 用户确认 +→ Chat(resume) → 恢复 PendingConfirmTool +→ Execute → executePendingTool() → registry.Execute() + persistor + 追加消息对 +``` + +### 6.4 持久化路径 + +``` +工具执行成功 + → DiffScheduleState(original, modified) → []ScheduleChange + → PersistScheduleChanges(事务) + → applyPlaceChange / applyMoveChange / applyUnplaceChange +``` + +**当前限制**:`applyPlaceChange` 只处理 `source="event"`,`source="task_item"` 会报错。详见 ROADMAP.md P0 缺口。 + +--- + +## 七、SSE 输出系统 + +### 7.1 ChunkEmitter + +所有节点通过 `ChunkEmitter` 向前端推送事件: + +``` +EmitPseudoAssistantText() → 伪流式文本(分段推送,模拟打字效果) +EmitStatus() → 状态推送("正在执行第2步") +EmitConfirmRequest() → 确认卡片 +EmitFinish() / EmitDone() → 结束标记 +``` + +### 7.2 伪流式 + +LLM 的一次性文本输出通过 `SplitPseudoStreamText` 拆分成多个 chunk: +- 按中英文标点断句 +- 每个 chunk 8~24 个字符 +- 间隔 40ms 推送 + +### 7.3 OpenAI 兼容格式 + +`stream/openai.go` 定义了 OpenAI 兼容的 SSE 格式,通过 `ext` 字段扩展: +- `reasoning_text`:思考过程 +- `assistant_text`:正文 +- `status`:状态更新 +- `tool_call` / `tool_result`:工具调用 +- `confirm_request`:确认卡片 +- `interrupt`:中断消息 + +--- + +## 八、持久化模型 + +### 8.1 三个持久化层次 + +| 层级 | 机制 | 何时触发 | 存什么 | +|------|------|---------|--------| +| 快照 | AgentStateStore (Redis) | Plan/Confirm/Execute 节点后 | AgentRuntimeState + ConversationContext | +| 变更 | SchedulePersistor (MySQL) | 写工具执行后 | ScheduleState 的 diff | +| 历史 | Redis + MySQL | 图运行完成后 | 完整对话历史 | + +### 8.2 快照恢复流程 + +``` +用户发送新消息(图需要从中断恢复) + → loadOrCreateRuntimeState() + → StateStore.Load(conversationID) + → 如果存在:恢复 RuntimeState + ConversationContext + → 如果不存在:创建全新状态 +``` + +快照在 Deliver 后被 `deleteAgentState()` 清理。 + +--- + +## 九、Prompt 体系 + +### 9.1 prompt 构造模式 + +所有阶段共享 `buildStageMessages()` 函数: + +``` +System Prompt(节点专属) + │ + v +Pinned Blocks(置顶上下文块,作为独立 system 消息注入) + │ + v +Tool Schemas(工具 schema,作为独立 system 消息注入) + │ + v +History(对话历史,Tool 消息降级为 User 消息以兼容 API) + │ + v +User Prompt(节点专属用户提示) +``` + +### 9.2 各阶段 prompt 要点 + +| 阶段 | 核心指令 | 关键约束 | +|------|---------|---------| +| Chat | 分类意图:chat vs task | 保守默认为 task | +| Plan | 两阶段:快速评估 + 深度规划 | 简单任务不开启 thinking | +| Execute | ReAct:思考→执行→观察 | goal_check 为 next_plan/done 必填 | +| Deliver | 总结计划执行结果 | 失败降级到机械格式化 | + +### 9.3 置顶上下文块 + +``` +PinnedBlocks 是跨节点共享的上下文,通过 Key 去重: + +execution_context ← Execute 节点注入(当前步骤、完成判定等) +plan ← Plan 节点注入(完整计划文本) +tool_summary ← Execute 节点注入(可用工具摘要) +``` + +--- + +## 十、图路由逻辑 (`graph/common_graph.go`) + +路由函数是图的核心控制逻辑,决定了每步之后走向哪个节点: + +### branchAfterChat +``` +if PendingInteraction → Interrupt +else switch Phase: + planning → Plan + executing → Execute + done → Deliver + chatting → END +``` + +### branchAfterPlan +``` +if PendingInteraction → Interrupt +else switch Phase: + waiting_confirm → Confirm + planning → Plan(continue,继续规划) + executing → Execute(不应该发生,但防御性路由) +``` + +### branchAfterConfirm +``` +if PendingInteraction → Interrupt +else → Execute(确认通过) +``` + +### branchAfterExecute +``` +if PendingInteraction → Interrupt +else switch Phase: + executing → Execute(继续循环) + done → Deliver + waiting_confirm → Confirm(不应该发生,防御性路由) +``` + +### 关键保护机制 + +所有分支函数都以 `branchIfInterrupted()` 开头: +```go +func branchIfInterrupted(st *AgentGraphState) string { + if st.RuntimeState.HasPendingInteraction() { + return "interrupt" + } + return "" +} +``` +这确保任何节点设置了 PendingInteraction 后,图都会走向 Interrupt 节点展示给用户。 + +--- + +## 十一、Service 集成层 (`service/agentsvc/agent_newagent.go`) + +### 入口函数:runNewAgentGraph + +``` +1. 规范化 conversationID, modelName +2. 确保会话存在(Redis 缓存 → DB) +3. 构建重试元数据 +4. 加载或创建 RuntimeState(从 Redis 快照恢复) +5. 构建 AgentGraphRequest(ConfirmAction 从 extra 取) +6. 包装 Ark 客户端 +7. 创建 SSE 适配器 + ChunkEmitter +8. 组装 AgentGraphDeps(注入所有依赖) +9. 调用 RunAgentGraph() +10. 持久化对话历史到 Redis + MySQL +11. 发送 [DONE] 标记,触发异步标题生成 +``` + +### 依赖注入 + +``` +cmd/start.go: + → NewScheduleProvider(scheduleDAO, taskClassDAO) → SetScheduleProvider() + → NewSchedulePersistorAdapter(repoManager) → SetSchedulePersistor() + → NewDefaultRegistry() → SetToolRegistry() + → NewRedisStateStore(cacheDAO) → SetAgentStateStore() +``` + +--- + +## 十二、如何调试 + +### 12.1 日志关键字 + +| 搜索关键字 | 含义 | +|-----------|------| +| `[DEBUG] execute LLM` | Execute 节点的 LLM 原始输出和解析结果 | +| `[DEBUG] plan LLM` | Plan 节点的 LLM 输出 | +| `[WARN] execute 决策不合法` | LLM 输出合法 JSON 但 action 不合法 | +| `[DEBUG] execute LLM 输出解析失败` | JSON 解析失败,触发 correction | +| `PersistScheduleChanges` | 持久化调用 | +| `loadOrCreateRuntimeState` | 状态恢复/创建 | + +### 12.2 常见问题排查 + +**SSE 断开**: +1. 检查 `[DEBUG] execute LLM` 日志,看 LLM 输出是否为合法 JSON +2. 如果输出 `[NEXT_PLAN]` 等纯文本 → prompt 问题(已修复,参考 execute.go 的 correction 机制) +3. 如果输出合法 JSON 但 action 不对 → 检查 prompt 的合约文本 + +**工具不执行**: +1. 检查 PendingConfirmTool 是否被正确设置和恢复 +2. 检查 ScheduleState 是否为 nil(可能 ScheduleProvider 未注入) +3. 检查 history 中 assistant+tool 消息是否配对(ToolCallID 是否匹配) + +**图循环不退出**: +1. 检查 ConsecutiveCorrections 计数(可能 LLM 反复输出非法 JSON) +2. 检查 RoundUsed 是否耗尽(MaxRounds 默认 30) +3. 检查 Phase 是否卡在某个状态 + +### 12.3 单元测试 + +``` +node/execute_confirm_flow_test.go → 7 个测试,覆盖完整 confirm 回路 +node/llm_tool_orchestration_test.go → 5 个测试,覆盖真实排课场景 +``` + +测试使用 mock LLM(预定义 JSON 响应序列)和 mock 工具注册表,不依赖外部服务。 + +--- + +## 十三、关键设计决策及理由 + +| 决策 | 理由 | +|------|------| +| Phase 驱动路由而非硬编码序列 | 同一个图支持多种流程(直接聊天、排课、追问恢复),Phase 是最小状态信号 | +| PendingInteraction 作为中断快照 | 图是无状态的(每次请求重新运行),需要一种机制跨请求传递"等用户回复"的上下文 | +| PendingConfirmTool 作为临时邮箱 | Execute 和 Confirm 之间不能直接传参(中间隔了 Interrupt+END+Chat),用运行态字段传递 | +| JSON 协议而非文本标记 | LLM 输出结构化数据,后端用泛型解析,避免正则匹配的不确定性 | +| Correction 机制 | LLM 不是 100% 可靠,需要给修正机会,但限制最大连续次数避免死循环 | +| 伪流式而非真流式 | LLM API 的一次性返回更适合分段推送,真流式实现复杂且收益低 | +| 工具操作扁平 ScheduleState | 避免嵌套数据结构,工具只需关心"在哪里放什么" | +| Diff 持久化 | 只持久化变更部分,减少 DB 操作,支持原子性 | +| PinnedBlocks 注入上下文 | 计划、工具摘要等信息不需要每轮都重复,用置顶块注入一次即可 | + +--- + +## 十四、关键文件速查 + +| 想了解... | 看这个文件 | +|----------|----------| +| 图怎么连的 | `graph/common_graph.go` | +| 每个节点怎么被调用的 | `node/agent_nodes.go` | +| Chat 怎么分类意图的 | `prompt/chat.go` + `node/chat.go` | +| Plan 怎么生成计划的 | `prompt/plan.go` + `node/plan.go` | +| Execute 的 ReAct 循环 | `node/execute.go` | +| confirm 回路怎么转的 | `node/confirm.go` + `node/chat.go`(handleConfirmResume) | +| LLM 输出什么格式 | `model/plan_contract.go` + `model/execute_contract.go` | +| JSON 解析怎么容错的 | `llm/json.go` | +| correction 怎么追回的 | `node/correction.go` | +| 工具怎么注册和执行的 | `tools/registry.go` | +| 工具操作什么数据 | `tools/state.go` | +| SSE 输出什么格式 | `stream/openai.go` | +| 状态怎么持久化的 | `model/state_store.go` + `conv/schedule_persist.go` | +| 日程数据怎么加载的 | `conv/schedule_provider.go` + `conv/schedule_state.go` | +| Service 怎么组装的 | `service/agentsvc/agent_newagent.go` | +| API 怎么调用的 | `api/agent.go` | +| 距离全链路还差什么 | `ROADMAP.md` | diff --git a/backend/newAgent/ROADMAP.md b/backend/newAgent/ROADMAP.md new file mode 100644 index 0000000..ebfded8 --- /dev/null +++ b/backend/newAgent/ROADMAP.md @@ -0,0 +1,300 @@ +# NewAgent 全链路改造计划 + +> 本文档面向后续 coding agent,描述当前 newAgent 的架构现状、改造计划,以及距离"会主动问用户问题、生成多个任务类、自己 ReAct 排日程的智能体"还差什么。 + +--- + +## 一、目标智能体行为 + +用户说:"帮我安排下周的复习计划"。 + +期望的完整链路: + +``` +1. Chat 节点:LLM 主动追问 + - "这周有几门考试?" + - "复习强度偏好?均匀分布还是集中在前几天?" + - "有没有要排除的时段?" + +2. Plan 节点:LLM 生成结构化计划 + - 识别意图为"批量安排任务类到日程" + - 输出 needs_rough_build=true + task_class_ids + - 或者:LLM 调用 create_task_class 工具创建新的任务类 + +3. Confirm 节点:展示计划,用户确认 + +4. RoughBuild 节点(新增):确定性粗排 + - 调用 SmartPlanningRawItemsMulti() 算法 + - 结果写入 ScheduleState(pending tasks 预填 suggested slots) + +5. Execute 节点:LLM 用读写工具微调 + - 查看 get_overview,发现粗排结果 + - 用 move/swap 调整不合理的安排 + - 用 place 处理粗排未能安排的任务 + - 每次写操作经 confirm 流程 + +6. Deliver 节点:生成最终总结 + - 变更持久化到 DB + - 向用户展示排课结果 +``` + +--- + +## 二、当前架构 + +### 图结构 + +``` +Chat → Plan → Confirm → Execute(ReAct) → Deliver +``` + +### 已实现的能力 + +| 模块 | 文件 | 状态 | +|------|------|------| +| 图骨架 | `node/agent_nodes.go` | 已实现,6 个节点 | +| Chat 节点 | `node/chat.go` | 已实现,支持 confirm resume | +| Plan 节点 | `node/plan.go` + `prompt/plan.go` | 已实现,LLM 生成结构化 PlanStep | +| Confirm 节点 | `node/confirm.go` | 已实现,创建 PendingInteraction | +| Execute 节点 | `node/execute.go` + `prompt/execute.go` | 已实现,ReAct + correction + confirm 流 | +| Deliver 节点 | `node/deliver.go` | 已实现,LLM 生成总结 | +| 10 个读写工具 | `tools/read_tools.go` + `tools/write_tools.go` | 已实现,5 读 5 写 | +| 工具注册表 | `tools/registry.go` | 已实现 | +| ScheduleState 加载 | `conv/schedule_provider.go` + `conv/schedule_state.go` | 已实现,从 DB 加载日程+任务类 | +| Confirm 回路测试 | `node/execute_confirm_flow_test.go` | 7 个测试全通过 | +| 端到端排课测试 | `node/llm_tool_orchestration_test.go` | 5 个测试全通过 | +| JSON 协议修正 | `prompt/execute.go` | 已修复,LLM 输出严格 JSON | + +### 已有数据流 + +``` +ScheduleProvider.LoadScheduleState(userID) + → ScheduleDAO.GetUserWeeklySchedule() // 现有日程 + → TaskClassDAO.GetCompleteTaskClassesByIDs() // 任务类(含 Items) + → LoadScheduleState() // 合并为 ScheduleState + - existing tasks (source="event") + - pending tasks (source="task_item", status="pending") +``` + +--- + +## 三、缺口分析 + +### 按优先级排列 + +#### P0:粗排接入(核心能力) + +**问题**:新 agent 没有 `SmartPlanningRawItemsMulti` 的调用路径。让 LLM 一个个 place 效率极低且全局最优性差。 + +**改造内容**: + +1. **Plan 节点输出扩展** + - 文件:`model/plan_contract.go`(或 PlanDecision 所在文件) + - PlanDecision 增加 `NeedsRoughBuild bool` 和 `TaskClassIDs []int` + - `prompt/plan.go` 引导 LLM 判断意图: + - 用户意图为"批量安排/智能排课/把任务类排进日程" → `needs_rough_build: true` + - 从前端 `extra` 或对话中提取 `task_class_ids` + - 其他意图 → `needs_rough_build: false` + +2. **新增 RoughBuild 图节点** + - 新文件:`node/rough_build.go` + - 不调 LLM,纯确定性逻辑: + ``` + 输入:task_class_ids, userID + 步骤: + 1. 调 ScheduleService.HybridScheduleWithPlanMulti(ctx, userID, taskClassIDs) + (内部调用 SmartPlanningRawItemsMulti 粗排) + 2. 将粗排结果写入 ScheduleState: + - pending tasks 的 Slots 字段填入 suggested 位置 + - pending tasks 的 Status 保持 "pending"(LLM 可调整,也可以改为 "suggested" 区分) + 3. 推送状态给前端 + 输出:ScheduleState 已填充粗排结果 + ``` + - 路由:`needs_rough_build=true` 时 Confirm 之后走 RoughBuild,否则跳过 + +3. **图路由修改** + - 文件:`node/agent_nodes.go` + - Confirm 之后、Execute 之前插入条件分支: + ```go + func (n *AgentNodes) branchAfterConfirm(st *AgentGraphState) string { + plan := st.RuntimeState.PlanDecision + if plan != nil && plan.NeedsRoughBuild { + return "rough_build" + } + return "execute" + } + ``` + +4. **依赖注入** + - 文件:`model/graph_run_state.go` AgentGraphDeps + - 新增 `RoughBuildFunc` 闭包(和旧 agent 的 `HybridScheduleWithPlanMultiFunc` 同理) + - 文件:`service/agentsvc/agent_newagent.go` + - 注入 `ScheduleService.HybridScheduleWithPlanMulti` 到 AgentGraphDeps + +**参考实现**:旧 agent 的 `agent/node/schedule_plan.go` 的 `runRoughBuildNode()` 函数。 + +--- + +#### P0:持久化 task_item 放置(必须) + +**问题**:`conv/schedule_persist.go` 的 `applyPlaceChange` 只处理 `source="event"`,当 LLM place 一个 `source="task_item"` 的 pending 任务时会报错。 + +**改造内容**: + +- 文件:`conv/schedule_persist.go` +- `applyPlaceChange` 增加 `source="task_item"` 分支: + ``` + if source == "task_item": + 1. 创建 ScheduleEvent(type="task", rel_id=SourceID, name=change.Name) + 2. 为每个 NewCoord 创建 Schedule 记录 + 3. 如果有嵌入关系(EmbedHost 非空): + - 找到宿主 schedule 记录 + - 设置 schedules.embedded_task_id = SourceID + 4. 更新 task_items.embedded_time(调用 TaskClassDAO.UpdateTaskClassItemEmbeddedTime) + 5. 更新 task_items.status = 2 (applied) + ``` +- `applyUnplaceChange` 也需要增加 `source="task_item"` 分支(反向清理) + +**参考实现**:旧 agent 的 `service/task-class.go` 的 `BatchApplyPlans()` 函数(第 327-536 行)。 + +--- + +#### P1:TaskClass 约束元数据暴露给 LLM + +**问题**:LLM 看到的 pending task 只有 name/category/duration,缺少 TaskClass 级别的调度约束。 + +**改造内容**: + +1. **扩展 ScheduleState 结构** + - 文件:`newAgent/tools/state.go` + - 新增: + ```go + type TaskClassMeta struct { + ID int `json:"id"` + Name string `json:"name"` + Strategy string `json:"strategy"` // "steady" | "rapid" + ExcludedSlots []int `json:"excluded_slots"` // 排除的半天时段索引 + AllowFillerCourse bool `json:"allow_filler_course"` // 是否允许嵌入水课 + TotalSlots int `json:"total_slots"` // 总时间预算 + } + ``` + - `ScheduleState` 增加 `TaskClasses []TaskClassMeta` + +2. **LoadScheduleState 填充元数据** + - 文件:`conv/schedule_state.go` + - 在 Step 4(处理 pending task items)同时填充 `state.TaskClasses` + +3. **读工具输出约束信息** + - 文件:`tools/read_tools.go` + - `get_overview` 输出中增加任务类约束描述 + - 示例: + ``` + 任务类约束: + [学习] 策略=均匀分布, 总预算=12节, 允许嵌入水课=是, 排除时段=[3,4] + ``` + +--- + +#### P2:任务类 ID 从前端传入 + +**问题**:前端请求需要在 `extra` 中传递 `task_class_ids`,后端需要接收并传递到图内部。 + +**改造内容**: + +1. **API 层**:`api/agent.go` 的请求结构增加 `Extra map[string]any` +2. **Service 层**:`service/agentsvc/agent_newagent.go` 从 `extra` 中提取 `task_class_ids`,存入 RuntimeState 或 AgentGraphRequest +3. **Plan 节点**:从 RuntimeState/Request 中读取 `task_class_ids`,合并 LLM 从对话中提取的 IDs + +**参考实现**:旧 agent 的 `agent/node/schedule_plan.go` 的 `normalizeTaskClassIDs()` 函数。 + +--- + +#### P3:LLM 主动追问能力增强 + +**问题**:当前 Chat 节点主要做"接收用户消息 + confirm resume",缺少"LLM 主动收集排课需求"的能力。 + +**改造内容**: + +- Chat 节点的 prompt 增强: + - 引导 LLM 在信息不足时主动追问 + - 追问内容:考试科目、复习偏好、时段排除、强度偏好 + - 追问方式:通过 `ask_user` action 或直接在 speak 中提问 +- 可能需要新增 ConversationContext 的"收集到的需求"字段 +- 收集到的需求在 Plan 节点中被使用 + +--- + +#### P4:LLM 创建任务类工具(锦上添花) + +**问题**:用户说"帮我安排复习",但系统里没有对应的 TaskClass,LLM 无法创建。 + +**改造内容**: + +1. **新增工具**:`create_task_class` + - 参数:`name`, `strategy`, `total_slots`, `allow_filler_course`, `items: [{content, duration}]` + - 行为:调用 TaskClassDAO 在 DB 中创建 TaskClass + Items + - 返回:创建结果 + 新的 task_class_id + +2. **新增工具**:`update_task_class`(可选) + - 修改已有任务类的参数 + +3. **ScheduleState 动态刷新** + - 创建后需要重新加载 ScheduleState 以反映新的 pending tasks + +--- + +## 四、改造顺序建议 + +``` +Phase 1(打通核心链路) + ├── P0: 粗排接入(RoughBuild 节点 + 图路由 + 依赖注入) + ├── P0: 持久化 task_item 放置 + └── P2: task_class_ids 从前端传入 + +Phase 2(提升排课质量) + ├── P1: TaskClass 约束元数据暴露 + └── P1: 读工具输出优化(携带约束信息) + +Phase 3(智能化) + ├── P3: Chat 节点追问能力增强 + └── P4: create_task_class 工具 +``` + +--- + +## 五、关键设计决策记录 + +1. **粗排由图节点驱动,不由 LLM 驱动**:粗排是确定性算法,浪费 LLM 调用不划算。 +2. **粗排通过 Plan 节点的 `needs_rough_build` 标签触发**:不是所有请求都需要粗排,LLM 判断意图后打标签。 +3. **粗排结果写入 ScheduleState 的 Slots 字段**:LLM 在 Execute 阶段看到的是"已粗排、可调整"的状态,用 move/swap 微调。 +4. **task_class_ids 来源**:前端 `extra` 传入为主,LLM 从对话提取为辅。 +5. **持久化用 Diff 模式**:对比 original 和 modified ScheduleState,只持久化变更部分。 + +--- + +## 六、关键文件索引 + +| 用途 | 文件 | +|------|------| +| 图骨架与路由 | `newAgent/node/agent_nodes.go` | +| Chat 节点 | `newAgent/node/chat.go` | +| Plan 节点 | `newAgent/node/plan.go` | +| Confirm 节点 | `newAgent/node/confirm.go` | +| Execute 节点 | `newAgent/node/execute.go` | +| Deliver 节点 | `newAgent/node/deliver.go` | +| Execute prompt | `newAgent/prompt/execute.go` | +| Plan prompt | `newAgent/prompt/plan.go` | +| 工具状态模型 | `newAgent/tools/state.go` | +| 读工具 | `newAgent/tools/read_tools.go` | +| 写工具 | `newAgent/tools/write_tools.go` | +| 工具注册表 | `newAgent/tools/registry.go` | +| 图运行态 | `newAgent/model/graph_run_state.go` | +| 公共状态 | `newAgent/model/common_state.go` | +| 日程状态加载 | `conv/schedule_provider.go` | +| DB→State 转换 | `conv/schedule_state.go` | +| Diff 算法 | `conv/schedule_state.go` (DiffScheduleState) | +| 持久化 | `conv/schedule_persist.go` | +| Service 集成 | `service/agentsvc/agent_newagent.go` | +| 粗排算法(旧,复用) | `logic/smart_planning.go` | +| 旧 agent 粗排节点(参考) | `agent/node/schedule_plan.go` | +| 旧 agent 批量应用(参考) | `service/task-class.go` BatchApplyPlans | diff --git a/backend/newAgent/llm/ark.go b/backend/newAgent/llm/ark.go index 0e5c98a..dab8bea 100644 --- a/backend/newAgent/llm/ark.go +++ b/backend/newAgent/llm/ark.go @@ -1,3 +1,13 @@ +// 过渡期遗留文件。 +// +// 这里的 CallArkText / CallArkJSON 是为了让旧 agent 代码(route/quicknote 等) +// 在迁移到统一 Client 之前能继续直接持有 *ark.ChatModel。 +// +// 替代路径: +// - CallArkText → WrapArkClient(arkModel) + client.GenerateText(...) +// - CallArkJSON → WrapArkClient(arkModel) + GenerateJSON[T](...) +// +// 待旧 agent 代码全部收敛到 Client 接口后,本文件可整体删除。 package newagentllm import ( diff --git a/backend/newAgent/llm/ark_adapter.go b/backend/newAgent/llm/ark_adapter.go new file mode 100644 index 0000000..ef5017f --- /dev/null +++ b/backend/newAgent/llm/ark_adapter.go @@ -0,0 +1,99 @@ +package newagentllm + +import ( + "context" + "errors" + "io" + + "github.com/cloudwego/eino-ext/components/model/ark" + einoModel "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/schema" + arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" +) + +// WrapArkClient 将 ark.ChatModel 适配为 newAgent 的统一 Client。 +// +// 职责边界: +// 1. generateText:调用 ark.ChatModel.Generate(非流式),供 GenerateJSON 使用; +// 2. streamText:调用 ark.ChatModel.Stream(流式),供 EmitPseudoAssistantText 等使用; +// 3. 两者共用 buildArkStreamOptions 统一构造调用选项。 +func WrapArkClient(arkChatModel *ark.ChatModel) *Client { + if arkChatModel == nil { + return nil + } + + // 非流式文本生成,供 GenerateJSON / GenerateText 调用路径使用。 + generateFunc := func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) { + arkOpts := buildArkStreamOptions(options) + msg, err := arkChatModel.Generate(ctx, messages, arkOpts...) + if err != nil { + return nil, err + } + if msg == nil { + return nil, errors.New("ark model returned nil message") + } + return &TextResult{Text: msg.Content}, nil + } + + // 流式文本生成。 + streamFunc := func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) { + arkOpts := buildArkStreamOptions(options) + reader, err := arkChatModel.Stream(ctx, messages, arkOpts...) + if err != nil { + return nil, err + } + return &arkStreamReaderAdapter{reader: reader}, nil + } + + return NewClient(generateFunc, streamFunc) +} + +// buildArkStreamOptions 将 newAgent 的 GenerateOptions 转换为 ark 的流式调用选项。 +func buildArkStreamOptions(options GenerateOptions) []einoModel.Option { + // Thinking + thinkingType := arkModel.ThinkingTypeDisabled + if options.Thinking == ThinkingModeEnabled { + thinkingType = arkModel.ThinkingTypeEnabled + } + + opts := []einoModel.Option{ + ark.WithThinking(&arkModel.Thinking{Type: thinkingType}), + } + + // Temperature + if options.Temperature > 0 { + opts = append(opts, einoModel.WithTemperature(float32(options.Temperature))) + } + + // MaxTokens + if options.MaxTokens > 0 { + opts = append(opts, einoModel.WithMaxTokens(options.MaxTokens)) + } + + return opts +} + +// arkStreamReaderAdapter 适配 ark.ChatModel.Stream 返回的 reader。 +// ark.Stream 返回 schema.StreamReader[*schema.Message],其 Close() 方法无返回值 +// 而我们的 StreamReader 接口要求 Close() error +type arkStreamReaderAdapter struct { + reader *schema.StreamReader[*schema.Message] +} + +// Recv 转发到 ark reader 的 Recv 方法。 +func (r *arkStreamReaderAdapter) Recv() (*schema.Message, error) { + if r == nil || r.reader == nil { + return nil, io.EOF + } + return r.reader.Recv() +} + +// Close 转发到 ark reader 的 Close 方法。 +// ark 的 Close() 无返回值,我们适配为返回 nil +func (r *arkStreamReaderAdapter) Close() error { + if r == nil || r.reader == nil { + return nil + } + r.reader.Close() + return nil +} diff --git a/backend/newAgent/model/common_state.go b/backend/newAgent/model/common_state.go index 9d60510..c86c053 100644 --- a/backend/newAgent/model/common_state.go +++ b/backend/newAgent/model/common_state.go @@ -36,6 +36,9 @@ type CommonState struct { // 安全边界 MaxRounds int `json:"max_rounds"` RoundUsed int `json:"round_used"` + + // 连续修正计数:LLM 连续输出不合法决策的次数,超过阈值后强制终止避免死循环。 + ConsecutiveCorrections int `json:"consecutive_corrections"` } func NewCommonState(traceID string, userID int, conversationID string) *CommonState { diff --git a/backend/newAgent/model/graph_run_state.go b/backend/newAgent/model/graph_run_state.go index acbcfd8..7b956dc 100644 --- a/backend/newAgent/model/graph_run_state.go +++ b/backend/newAgent/model/graph_run_state.go @@ -32,18 +32,19 @@ func (r *AgentGraphRequest) Normalize() { // AgentGraphDeps 描述 graph/node 层运行时真正依赖的可插拔能力。 // // 设计目的: -// 1. 让 graph 不再只拿到“裸状态”,而是能拿到上下文、模型和输出能力; +// 1. 让 graph 不再只拿到”裸状态”,而是能拿到上下文、模型和输出能力; // 2. Chat/Plan/Execute/Deliver 允许分别挂不同 client,但也允许先复用同一个 client; // 3. ChunkEmitter 统一承接阶段提示、正文、工具事件、确认请求等 SSE 输出。 type AgentGraphDeps struct { - ChatClient *newagentllm.Client - PlanClient *newagentllm.Client - ExecuteClient *newagentllm.Client - DeliverClient *newagentllm.Client - ChunkEmitter *newagentstream.ChunkEmitter - StateStore AgentStateStore - ToolRegistry *newagenttools.ToolRegistry - ScheduleProvider ScheduleStateProvider // 按 DAO 注入,Execute 节点按需加载 ScheduleState + ChatClient *newagentllm.Client + PlanClient *newagentllm.Client + ExecuteClient *newagentllm.Client + DeliverClient *newagentllm.Client + ChunkEmitter *newagentstream.ChunkEmitter + StateStore AgentStateStore + ToolRegistry *newagenttools.ToolRegistry + ScheduleProvider ScheduleStateProvider // 按 DAO 注入,Execute 节点按需加载 ScheduleState + SchedulePersistor SchedulePersistor // 按 DAO 注入,用于写工具执行后持久化变更 } // EnsureChunkEmitter 保证 graph 运行时始终有一个可用的 chunk 发射器。 @@ -131,19 +132,6 @@ type AgentGraphRunInput struct { Deps AgentGraphDeps } -// AgentGraphState 是 graph 内部真正流转的运行态容器。 -// -// 职责边界: -// 1. 负责把“流程状态 + 对话上下文 + 请求输入 + 运行依赖”收口到同一个对象; -// 2. 负责给 graph 分支和 node 提供最小必要的兜底访问方法; -// 3. 不负责持久化,不负责真正业务执行。 -// ScheduleStateProvider 定义加载 ScheduleState 的接口。 -// 由 DAO 层或 Service 层实现,注入到 AgentGraphDeps 中。 -// 使用接口而非具体 DAO 类型,避免 model → dao 的循环依赖。 -type ScheduleStateProvider interface { - LoadScheduleState(ctx context.Context, userID int) (*newagenttools.ScheduleState, error) -} - // AgentGraphState 是 graph 内部真正流转的运行态容器。 // // 职责边界: @@ -151,11 +139,12 @@ type ScheduleStateProvider interface { // 2. 负责给 graph 分支和 node 提供最小必要的兜底访问方法; // 3. 不负责持久化,不负责真正业务执行。 type AgentGraphState struct { - RuntimeState *AgentRuntimeState - ConversationContext *ConversationContext - Request AgentGraphRequest - Deps AgentGraphDeps - ScheduleState *newagenttools.ScheduleState // 工具操作的内存数据源,Execute 节点按需加载 + RuntimeState *AgentRuntimeState + ConversationContext *ConversationContext + Request AgentGraphRequest + Deps AgentGraphDeps + ScheduleState *newagenttools.ScheduleState // 工具操作的内存数据源,Execute 节点按需加载 + OriginalScheduleState *newagenttools.ScheduleState // 首次加载时的原始快照,供 diff 用 } // NewAgentGraphState 把入口参数整理成 graph 内部状态。 @@ -239,5 +228,7 @@ func (s *AgentGraphState) EnsureScheduleState(ctx context.Context) (*newagenttoo return nil, err } s.ScheduleState = state + // 保存原始快照,供后续 diff 使用。 + s.OriginalScheduleState = state.Clone() return state, nil } diff --git a/backend/newAgent/model/state_store.go b/backend/newAgent/model/state_store.go index b703ffd..33fb0b8 100644 --- a/backend/newAgent/model/state_store.go +++ b/backend/newAgent/model/state_store.go @@ -1,6 +1,10 @@ package model -import "context" +import ( + "context" + + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" +) // AgentStateSnapshot 是需要持久化的 agent 运行态最小快照。 // @@ -47,3 +51,17 @@ type AgentStateStore interface { // 2. 典型调用时机:Deliver 节点任务完成后清理。 Delete(ctx context.Context, conversationID string) error } + +// ScheduleStateProvider 定义加载 ScheduleState 的接口。 +// 由 DAO 层或 Service 层实现,注入到 AgentGraphDeps 中。 +// 使用接口而非具体 DAO 类型,避免 model → dao 的循环依赖。 +type ScheduleStateProvider interface { + LoadScheduleState(ctx context.Context, userID int) (*newagenttools.ScheduleState, error) +} + +// SchedulePersistor 定义持久化 ScheduleState 变更的接口。 +// 由 Service 层或 DAO 层实现,注入到 AgentGraphDeps 中。 +// 使用接口而非具体 DAO 类型,避免 model → dao 的循环依赖。 +type SchedulePersistor interface { + PersistScheduleChanges(ctx context.Context, original, modified *newagenttools.ScheduleState, userID int) error +} diff --git a/backend/newAgent/node/agent_nodes.go b/backend/newAgent/node/agent_nodes.go index a57a94a..15a447a 100644 --- a/backend/newAgent/node/agent_nodes.go +++ b/backend/newAgent/node/agent_nodes.go @@ -3,6 +3,7 @@ package newagentnode import ( "context" "errors" + "fmt" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" @@ -147,21 +148,39 @@ func (n *AgentNodes) Execute(ctx context.Context, st *newagentmodel.AgentGraphSt // 按需加载 ScheduleState(首次执行时从 DB 加载,后续复用内存中的 state)。 var scheduleState *newagenttools.ScheduleState - if ss, _ := st.EnsureScheduleState(ctx); ss != nil { + if ss, loadErr := st.EnsureScheduleState(ctx); loadErr != nil { + return nil, fmt.Errorf("execute node: 加载日程状态失败: %w", loadErr) + } else if ss != nil { scheduleState = ss } + // 注入工具 schema 到 ConversationContext,让 LLM 能看到可用工具列表。 + if st.Deps.ToolRegistry != nil { + schemas := st.Deps.ToolRegistry.Schemas() + toolSchemas := make([]newagentmodel.ToolSchemaContext, len(schemas)) + for i, s := range schemas { + toolSchemas[i] = newagentmodel.ToolSchemaContext{ + Name: s.Name, + Desc: s.Desc, + SchemaText: s.SchemaText, + } + } + st.EnsureConversationContext().SetToolSchemas(toolSchemas) + } + if err := RunExecuteNode( ctx, ExecuteNodeInput{ - RuntimeState: st.EnsureRuntimeState(), - ConversationContext: st.EnsureConversationContext(), - UserInput: st.Request.UserInput, - Client: st.Deps.ResolveExecuteClient(), - ChunkEmitter: st.EnsureChunkEmitter(), - ResumeNode: "execute", - ToolRegistry: st.Deps.ToolRegistry, - ScheduleState: scheduleState, + RuntimeState: st.EnsureRuntimeState(), + ConversationContext: st.EnsureConversationContext(), + UserInput: st.Request.UserInput, + Client: st.Deps.ResolveExecuteClient(), + ChunkEmitter: st.EnsureChunkEmitter(), + ResumeNode: "execute", + ToolRegistry: st.Deps.ToolRegistry, + ScheduleState: scheduleState, + SchedulePersistor: st.Deps.SchedulePersistor, + OriginalScheduleState: st.OriginalScheduleState, }, ); err != nil { return nil, err diff --git a/backend/newAgent/node/chat.go b/backend/newAgent/node/chat.go index 2b532b1..836e836 100644 --- a/backend/newAgent/node/chat.go +++ b/backend/newAgent/node/chat.go @@ -176,7 +176,14 @@ func handleConfirmResume( switch action { case "accept": + // 恢复前保存待执行工具,Execute 节点需要它。 + pendingTool := pending.PendingTool runtimeState.ResumeFromPending() + // 将待执行工具放回临时邮箱,供 Execute 节点执行。 + if pendingTool != nil { + copied := *pendingTool + runtimeState.PendingConfirmTool = &copied + } flowState.Phase = newagentmodel.PhaseExecuting _ = emitter.EmitStatus( chatStatusBlockID, chatStageName, diff --git a/backend/newAgent/node/execute.go b/backend/newAgent/node/execute.go index ff9aa34..b578dc1 100644 --- a/backend/newAgent/node/execute.go +++ b/backend/newAgent/node/execute.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "strings" "time" @@ -30,16 +31,20 @@ const ( // 2. RuntimeState 提供 plan 步骤与轮次预算; // 3. ConversationContext 提供历史对话与置顶上下文; // 4. ToolRegistry 提供工具注册表; -// 5. ScheduleState 提供工具操作的内存数据源(可为 nil,由调用方按需加载)。 +// 5. ScheduleState 提供工具操作的内存数据源(可为 nil,由调用方按需加载); +// 6. SchedulePersistor 用于写工具执行后持久化变更; +// 7. OriginalScheduleState 是首次加载时的原始快照,用于 diff。 type ExecuteNodeInput struct { - RuntimeState *newagentmodel.AgentRuntimeState - ConversationContext *newagentmodel.ConversationContext - UserInput string - Client *newagentllm.Client - ChunkEmitter *newagentstream.ChunkEmitter - ResumeNode string - ToolRegistry *newagenttools.ToolRegistry - ScheduleState *newagenttools.ScheduleState // 工具操作的内存数据源,由调用方从 AgentGraphState 注入 + RuntimeState *newagentmodel.AgentRuntimeState + ConversationContext *newagentmodel.ConversationContext + UserInput string + Client *newagentllm.Client + ChunkEmitter *newagentstream.ChunkEmitter + ResumeNode string + ToolRegistry *newagenttools.ToolRegistry + ScheduleState *newagenttools.ScheduleState + SchedulePersistor newagentmodel.SchedulePersistor + OriginalScheduleState *newagenttools.ScheduleState } // ExecuteRoundObservation 记录执行阶段每轮的关键观察。 @@ -85,6 +90,11 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error { } flowState := runtimeState.EnsureCommonState() + // 1.5. 确认执行分支:如果用户已确认写操作,直接执行工具。 + if runtimeState.PendingConfirmTool != nil { + return executePendingTool(ctx, runtimeState, conversationContext, input.ToolRegistry, input.ScheduleState, input.SchedulePersistor, input.OriginalScheduleState, emitter) + } + // 2. 检查是否有可执行的 plan 步骤。 if !flowState.HasCurrentPlanStep() { return fmt.Errorf("execute node: 当前无有效 plan 步骤,无法执行") @@ -127,20 +137,69 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error { }, }, ) + const maxConsecutiveCorrections = 3 + + // 提前捕获原始文本,用于日志和 correction。 + rawText := "" + if rawResult != nil { + rawText = strings.TrimSpace(rawResult.Text) + } + if err != nil { - if rawResult != nil && strings.TrimSpace(rawResult.Text) != "" { - return fmt.Errorf("执行决策解析失败,原始输出=%s,错误=%w", strings.TrimSpace(rawResult.Text), err) + if rawText != "" { + log.Printf("[DEBUG] execute LLM 输出解析失败 chat=%s round=%d raw=%s", + flowState.ConversationID, flowState.RoundUsed, rawText) + flowState.ConsecutiveCorrections++ + if flowState.ConsecutiveCorrections >= maxConsecutiveCorrections { + return fmt.Errorf("连续 %d 次输出非 JSON,终止执行: 原始输出=%s", + flowState.ConsecutiveCorrections, rawText) + } + AppendLLMCorrectionWithHint( + conversationContext, + rawText, + "你的输出不是合法 JSON,无法解析。", + "你必须输出严格的 JSON 格式,不要使用 [NEXT_PLAN] 等纯文本标记。合法格式示例:{\"speak\":\"...\",\"action\":\"next_plan\",\"goal_check\":\"...\",\"reason\":\"...\"}", + ) + return nil } return fmt.Errorf("执行阶段模型调用失败: %w", err) } + + // 调试日志:输出 LLM 原始返回和解析后的决策,方便排查。 + log.Printf("[DEBUG] execute LLM 响应 chat=%s round=%d action=%s speak_len=%d raw_len=%d raw_preview=%.200s", + flowState.ConversationID, flowState.RoundUsed, + decision.Action, len(decision.Speak), len(rawText), rawText) + if err := decision.Validate(); err != nil { - return fmt.Errorf("执行决策不合法: %w", err) + flowState.ConsecutiveCorrections++ + log.Printf("[WARN] execute 决策不合法 chat=%s round=%d consecutive=%d/%d err=%s", + flowState.ConversationID, flowState.RoundUsed, + flowState.ConsecutiveCorrections, maxConsecutiveCorrections, err.Error()) + if flowState.ConsecutiveCorrections >= maxConsecutiveCorrections { + return fmt.Errorf("连续 %d 次决策不合法,终止执行: %s (原始输出: %s)", + flowState.ConsecutiveCorrections, err.Error(), rawText) + } + // 给 LLM 修正机会。 + AppendLLMCorrectionWithHint( + conversationContext, + rawText, + fmt.Sprintf("你的执行决策不合法:%s", err.Error()), + "合法的 action 包括:continue(继续当前步骤)、ask_user(追问用户)、confirm(写操作确认)、next_plan(推进到下一步)、done(任务完成)。", + ) + return nil } + // 决策合法,重置连续修正计数。 + flowState.ConsecutiveCorrections = 0 + // 自省校验:next_plan / done 必须附带 goal_check,否则不推进,追加修正让 LLM 重试。 if decision.Action == newagentmodel.ExecuteActionNextPlan || decision.Action == newagentmodel.ExecuteActionDone { if strings.TrimSpace(decision.GoalCheck) == "" { + flowState.ConsecutiveCorrections++ + if flowState.ConsecutiveCorrections >= maxConsecutiveCorrections { + return fmt.Errorf("连续 %d 次 goal_check 为空,终止执行", flowState.ConsecutiveCorrections) + } AppendLLMCorrectionWithHint( conversationContext, decision.Speak, @@ -346,12 +405,131 @@ func executeToolCall( // 2. 执行工具。 result := registry.Execute(scheduleState, toolName, toolCall.Arguments) - // 3. 将工具结果追加到对话历史,让 LLM 下一轮能看到。 + // 3. 将工具调用和结果以合法的 assistant+tool 消息对追加到对话历史。 + // + // 修复说明: + // 旧实现直接追加裸 Tool 消息(无 ToolCallID、无前置 assistant tool_calls), + // 违反 OpenAI 兼容 API 消息格式约束,导致 API 拒绝请求、连接断开。 + // 正确做法:先追加带 ToolCalls 的 assistant 消息,再追加带匹配 ToolCallID 的 tool 消息。 + toolCallID := uuid.NewString() + + argsJSON := "{}" + if toolCall.Arguments != nil { + if raw, err := json.Marshal(toolCall.Arguments); err == nil { + argsJSON = string(raw) + } + } + conversationContext.AppendHistory(&schema.Message{ - Role: schema.Tool, - Content: result, + Role: schema.Assistant, + Content: "", + ToolCalls: []schema.ToolCall{ + { + ID: toolCallID, + Type: "function", + Function: schema.FunctionCall{ + Name: toolName, + Arguments: argsJSON, + }, + }, + }, }) + conversationContext.AppendHistory(&schema.Message{ + Role: schema.Tool, + Content: result, + ToolCallID: toolCallID, + ToolName: toolName, + }) + + return nil +} + +// executePendingTool 执行用户已确认的写工具。 +// +// 职责边界: +// 1. 从 PendingConfirmTool 读取工具名和参数(已序列化); +// 2. 反序列化参数后调用工具执行; +// 3. 将结果追加到历史,清空 PendingConfirmTool; +// 4. 执行成功后调用 persistor 持久化变更; +// 5. 不调用 LLM,直接返回让下一轮继续。 +func executePendingTool( + ctx context.Context, + runtimeState *newagentmodel.AgentRuntimeState, + conversationContext *newagentmodel.ConversationContext, + registry *newagenttools.ToolRegistry, + scheduleState *newagenttools.ScheduleState, + persistor newagentmodel.SchedulePersistor, + originalState *newagenttools.ScheduleState, + emitter *newagentstream.ChunkEmitter, +) error { + pending := runtimeState.PendingConfirmTool + if pending == nil { + return nil + } + + // 1. 反序列化参数。 + var args map[string]any + if err := json.Unmarshal([]byte(pending.ArgsJSON), &args); err != nil { + return fmt.Errorf("解析工具参数失败: %w", err) + } + + // 2. 推送状态。 + if err := emitter.EmitStatus( + executeStatusBlockID, + executeStageName, + "tool_call", + fmt.Sprintf("正在执行工具:%s", pending.ToolName), + false, + ); err != nil { + return fmt.Errorf("工具调用状态推送失败: %w", err) + } + + // 3. 校验依赖:写工具必须持有有效的日程状态。 + if scheduleState == nil { + return fmt.Errorf("日程状态未加载,无法执行已确认的写工具 %s", pending.ToolName) + } + + // 4. 执行工具。 + result := registry.Execute(scheduleState, pending.ToolName, args) + + // 5. 将工具调用和结果以合法的 assistant+tool 消息对追加到历史。 + // + // 修复说明:同 executeToolCall,需要配对的 assistant+tool 消息。 + toolCallID := uuid.NewString() + + conversationContext.AppendHistory(&schema.Message{ + Role: schema.Assistant, + Content: "", + ToolCalls: []schema.ToolCall{ + { + ID: toolCallID, + Type: "function", + Function: schema.FunctionCall{ + Name: pending.ToolName, + Arguments: pending.ArgsJSON, + }, + }, + }, + }) + + conversationContext.AppendHistory(&schema.Message{ + Role: schema.Tool, + Content: result, + ToolCallID: toolCallID, + ToolName: pending.ToolName, + }) + + // 6. 清空临时邮箱,避免重复执行。 + runtimeState.PendingConfirmTool = nil + + // 7. 持久化变更(如果有 persistor)。 + if persistor != nil && originalState != nil { + if err := persistor.PersistScheduleChanges(ctx, originalState, scheduleState, runtimeState.UserID); err != nil { + return fmt.Errorf("持久化日程变更失败: %w", err) + } + } + return nil } diff --git a/backend/newAgent/prompt/base.go b/backend/newAgent/prompt/base.go new file mode 100644 index 0000000..033fcd2 --- /dev/null +++ b/backend/newAgent/prompt/base.go @@ -0,0 +1,185 @@ +package newagentprompt + +import ( + "fmt" + "strings" + + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + "github.com/cloudwego/eino/schema" +) + +// buildStageMessages 组装某个阶段通用的 messages。 +// +// 步骤说明: +// 1. 先合并 context 自带 system prompt 与阶段 prompt,保证通用约束和阶段约束都生效; +// 2. 再把置顶上下文块和工具摘要补成 system message,尽量顶在 history 前面; +// 3. 最后追加历史消息与本轮 user prompt,保持"新约束在前、历史在后"的稳定顺序。 +func buildStageMessages(stageSystemPrompt string, ctx *newagentmodel.ConversationContext, runtimeUserPrompt string) []*schema.Message { + messages := make([]*schema.Message, 0, 4) + + mergedSystemPrompt := mergeSystemPrompts(ctx, stageSystemPrompt) + if mergedSystemPrompt != "" { + messages = append(messages, schema.SystemMessage(mergedSystemPrompt)) + } + + if pinnedText := renderPinnedBlocks(ctx); pinnedText != "" { + messages = append(messages, schema.SystemMessage(pinnedText)) + } + + if toolText := renderToolSchemas(ctx); toolText != "" { + messages = append(messages, schema.SystemMessage(toolText)) + } + + if ctx != nil { + history := ctx.HistorySnapshot() + if len(history) > 0 { + // 兼容旧快照:裸 Tool 消息(无 ToolCallID)违反 OpenAI 兼容 API 格式约束, + // 会触发 API 拒绝请求导致连接断开。 + // 这里将裸 Tool 消息降级为 User 消息,保证向后兼容。 + for i, msg := range history { + if msg.Role == schema.Tool && msg.ToolCallID == "" { + history[i] = &schema.Message{ + Role: schema.User, + Content: fmt.Sprintf("[工具执行结果]\n%s", msg.Content), + } + } + } + messages = append(messages, history...) + } + } + + runtimeUserPrompt = strings.TrimSpace(runtimeUserPrompt) + if runtimeUserPrompt != "" { + messages = append(messages, schema.UserMessage(runtimeUserPrompt)) + } + + return messages +} + +// renderStateSummary 把当前流程状态渲染成简洁文本。 +func renderStateSummary(state *newagentmodel.CommonState) string { + if state == nil { + return "当前状态:state 缺失,请先做兜底处理。" + } + + var sb strings.Builder + current, total := state.PlanProgress() + + sb.WriteString(fmt.Sprintf("当前阶段:%s\n", state.Phase)) + sb.WriteString(fmt.Sprintf("当前轮次:%d/%d\n", state.RoundUsed, state.MaxRounds)) + + if !state.HasPlan() { + sb.WriteString("当前完整 plan:暂无。\n") + return sb.String() + } + + sb.WriteString("当前完整 plan:\n") + for i, step := range state.PlanSteps { + sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, strings.TrimSpace(step.Content))) + if strings.TrimSpace(step.DoneWhen) != "" { + sb.WriteString(fmt.Sprintf(" 完成判定:%s\n", strings.TrimSpace(step.DoneWhen))) + } + } + + if step, ok := state.CurrentPlanStep(); ok { + sb.WriteString(fmt.Sprintf("当前步骤进度:%d/%d\n", current, total)) + sb.WriteString("当前步骤内容:\n") + sb.WriteString(strings.TrimSpace(step.Content)) + sb.WriteString("\n") + if strings.TrimSpace(step.DoneWhen) != "" { + sb.WriteString("当前步骤完成判定:\n") + sb.WriteString(strings.TrimSpace(step.DoneWhen)) + sb.WriteString("\n") + } + } else { + sb.WriteString("当前步骤进度:暂时无有效当前步骤。\n") + } + + return sb.String() +} + +// renderPinnedBlocks 把 ConversationContext 中的置顶块渲染成独立的 system 文本。 +func renderPinnedBlocks(ctx *newagentmodel.ConversationContext) string { + if ctx == nil { + return "" + } + + blocks := ctx.PinnedBlocksSnapshot() + if len(blocks) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("以下是后端置顶注入的上下文,请优先遵守:\n") + for _, block := range blocks { + title := strings.TrimSpace(block.Title) + if title == "" { + title = strings.TrimSpace(block.Key) + } + if title != "" { + sb.WriteString("【") + sb.WriteString(title) + sb.WriteString("】\n") + } + sb.WriteString(strings.TrimSpace(block.Content)) + sb.WriteString("\n") + } + return strings.TrimSpace(sb.String()) +} + +// renderToolSchemas 把工具摘要渲染成独立文本块。 +func renderToolSchemas(ctx *newagentmodel.ConversationContext) string { + if ctx == nil { + return "" + } + + schemas := ctx.ToolSchemasSnapshot() + if len(schemas) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("以下是当前可用工具摘要,仅供你在规划时参考能力边界:\n") + for _, item := range schemas { + name := strings.TrimSpace(item.Name) + desc := strings.TrimSpace(item.Desc) + schemaText := strings.TrimSpace(item.SchemaText) + + if name != "" { + sb.WriteString("- 工具名:") + sb.WriteString(name) + sb.WriteString("\n") + } + if desc != "" { + sb.WriteString(" 说明:") + sb.WriteString(desc) + sb.WriteString("\n") + } + if schemaText != "" { + sb.WriteString(" 参数摘要:") + sb.WriteString(schemaText) + sb.WriteString("\n") + } + } + + return strings.TrimSpace(sb.String()) +} + +func mergeSystemPrompts(ctx *newagentmodel.ConversationContext, stageSystemPrompt string) string { + base := "" + if ctx != nil { + base = strings.TrimSpace(ctx.SystemPrompt) + } + stageSystemPrompt = strings.TrimSpace(stageSystemPrompt) + + switch { + case base == "" && stageSystemPrompt == "": + return "" + case base == "": + return stageSystemPrompt + case stageSystemPrompt == "": + return base + default: + return base + "\n\n" + stageSystemPrompt + } +} diff --git a/backend/newAgent/prompt/chat.go b/backend/newAgent/prompt/chat.go index 106acab..9fe74d3 100644 --- a/backend/newAgent/prompt/chat.go +++ b/backend/newAgent/prompt/chat.go @@ -47,9 +47,16 @@ func BuildChatIntentMessages(conversationContext *newagentmodel.ConversationCont } } + // 只在 history 末尾还没有当前用户消息时才追加, + // 避免与 loadConversationContext 的预追加产生重复。 trimmedInput := strings.TrimSpace(userInput) if trimmedInput != "" { - messages = append(messages, schema.UserMessage(trimmedInput)) + alreadyLast := len(messages) > 0 && + messages[len(messages)-1].Role == schema.User && + messages[len(messages)-1].Content == trimmedInput + if !alreadyLast { + messages = append(messages, schema.UserMessage(trimmedInput)) + } } return messages diff --git a/backend/newAgent/prompt/execute.go b/backend/newAgent/prompt/execute.go index eda6a17..1edca95 100644 --- a/backend/newAgent/prompt/execute.go +++ b/backend/newAgent/prompt/execute.go @@ -1,34 +1,25 @@ package newagentprompt import ( + "fmt" "strings" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" "github.com/cloudwego/eino/schema" ) -const ( - // ExecuteNextPlanSignal 表示“当前 plan step 已完成,可以推进到下一个步骤”。 - ExecuteNextPlanSignal = "[NEXT_PLAN]" - - // ExecuteDoneSignal 表示“整个任务已经完成,可以进入最终交付”。 - ExecuteDoneSignal = "[DONE]" - - // ExecuteAskUserSignal 表示“执行当前步骤缺少关键信息,需要向用户追问”。 - ExecuteAskUserSignal = "[ASK_USER]" -) - const executeSystemPrompt = ` 你是 SmartFlow NewAgent 的执行器。 你的职责是在“当前 plan 步骤”的约束下,进行思考、执行、观察,再决定下一步动作。 请遵守以下规则: 1. 只围绕当前步骤行动,不要擅自跳到其他 plan 步骤。 -2. 只有当你确认当前步骤已经完成时,才输出 ` + "`" + `[NEXT_PLAN]` + "`" + `,且必须在 goal_check 中逐条对照 done_when 说明完成依据。 -3. 只有当你确认整个任务已经完成时,才输出 ` + "`" + `[DONE]` + "`" + `,且必须在 goal_check 中总结整体完成证据。 -4. 如果执行当前步骤缺少关键上下文,且无法通过已有历史或工具补齐,可以输出 ` + "`" + `[ASK_USER]` + "`" + `。 -5. 不要伪造工具结果;如果尚未真正拿到观察结果,就不要假装已经完成。 -6. goal_check 是你输出 next_plan / done 时的强制字段,禁止为空;必须显式地逐条对照 done_when,说明"哪些条件已满足、依据是什么"。 +2. 只输出严格 JSON,不要输出 markdown,不要输出额外解释,不要在 JSON 外再补文字。 +3. 只有当你确认当前步骤已经完成时,才输出 action=next_plan,且必须在 goal_check 中逐条对照 done_when 说明完成依据。 +4. 只有当你确认整个任务已经完成时,才输出 action=done,且必须在 goal_check 中总结整体完成证据。 +5. 如果执行当前步骤缺少关键上下文,且无法通过已有历史或工具补齐,输出 action=ask_user。 +6. 不要伪造工具结果;如果尚未真正拿到观察结果,就不要假装已经完成。 +7. goal_check 是你输出 next_plan / done 时的强制字段,禁止为空;必须显式地逐条对照 done_when,说明”哪些条件已满足、依据是什么”。 你会看到: - 当前完整 plan @@ -37,7 +28,7 @@ const executeSystemPrompt = ` - 工具摘要 - 历史对话与历史观察 -请把注意力聚焦在“当前步骤是否完成,以及下一步最合理的执行动作”上。 +请把注意力聚焦在”当前步骤是否完成,以及下一步最合理的执行动作”上。 ` // BuildExecuteSystemPrompt 返回执行阶段系统提示词。 @@ -45,6 +36,56 @@ func BuildExecuteSystemPrompt() string { return strings.TrimSpace(executeSystemPrompt) } +// BuildExecuteDecisionContractText 返回执行阶段的输出协议说明。 +func BuildExecuteDecisionContractText() string { + return strings.TrimSpace(fmt.Sprintf(` +输出协议(严格 JSON): +- speak:给用户看的话 +- action:只能是 %s / %s / %s / %s / %s +- reason:给后端和日志看的简短说明 +- goal_check:输出 %s 或 %s 时必填,对照 done_when 逐条验证 +- tool_call:输出 %s 时可附带写工具意图(需 confirm),输出 %s 时可附带读工具调用 +- tool_call 格式:{"name": "工具名", "arguments": {...}} + +合法示例: +{ + "speak": "我来查一下本周的安排。", + "action": "%s", + "reason": "需要先调用 get_overview 获取当前数据", + "tool_call": { + "name": "get_overview", + "arguments": {} + } +} + +{ + "speak": "查询完成。", + "action": "%s", + "reason": "已拿到当前周课程列表", + "goal_check": "已通过 get_overview 确认本周课程列表,满足完成条件" +} + +{ + "speak": "", + "action": "%s", + "reason": "整个任务已完成" +} +`, + newagentmodel.ExecuteActionContinue, + newagentmodel.ExecuteActionAskUser, + newagentmodel.ExecuteActionConfirm, + newagentmodel.ExecuteActionNextPlan, + newagentmodel.ExecuteActionDone, + newagentmodel.ExecuteActionNextPlan, + newagentmodel.ExecuteActionDone, + newagentmodel.ExecuteActionConfirm, + newagentmodel.ExecuteActionContinue, + newagentmodel.ExecuteActionContinue, + newagentmodel.ExecuteActionNextPlan, + newagentmodel.ExecuteActionDone, + )) +} + // BuildExecuteMessages 组装执行阶段的 messages。 func BuildExecuteMessages(state *newagentmodel.CommonState, ctx *newagentmodel.ConversationContext) []*schema.Message { return buildStageMessages( @@ -71,16 +112,12 @@ func BuildExecuteUserPrompt(state *newagentmodel.CommonState) string { sb.WriteString("执行要求:\n") sb.WriteString("1. 始终围绕下面这个当前步骤行动。\n") sb.WriteString("2. 若当前步骤未完成,请继续思考-执行-观察循环。\n") - sb.WriteString("3. 若当前步骤已完成,请输出 ") - sb.WriteString(ExecuteNextPlanSignal) - sb.WriteString(",并填写 goal_check 说明完成依据。\n") - sb.WriteString("4. 若整个任务已完成,请输出 ") - sb.WriteString(ExecuteDoneSignal) - sb.WriteString(",并填写 goal_check 总结整体证据。\n") - sb.WriteString("5. 若缺少关键用户信息且现有上下文无法补足,请输出 ") - sb.WriteString(ExecuteAskUserSignal) - sb.WriteString("。\n") + sb.WriteString("3. 若当前步骤已完成,请输出 action=next_plan,并填写 goal_check 说明完成依据。\n") + sb.WriteString("4. 若整个任务已完成,请输出 action=done,并填写 goal_check 总结整体证据。\n") + sb.WriteString("5. 若缺少关键用户信息且现有上下文无法补足,请输出 action=ask_user。\n") sb.WriteString("6. 输出 next_plan 或 done 时,goal_check 不能为空,必须对照 done_when 逐条验证。\n") + sb.WriteString("\n") + sb.WriteString(BuildExecuteDecisionContractText()) sb.WriteString("\n当前步骤正文:\n") sb.WriteString(strings.TrimSpace(currentStep.Content)) sb.WriteString("\n") diff --git a/backend/newAgent/prompt/plan.go b/backend/newAgent/prompt/plan.go index 0f20f00..9a44fd5 100644 --- a/backend/newAgent/prompt/plan.go +++ b/backend/newAgent/prompt/plan.go @@ -131,168 +131,3 @@ func BuildPlanDecisionContractText() string { newagentmodel.PlanActionDone, )) } - -// buildStageMessages 组装某个阶段通用的 messages。 -// -// 步骤说明: -// 1. 先合并 context 自带 system prompt 与阶段 prompt,保证通用约束和阶段约束都生效; -// 2. 再把置顶上下文块和工具摘要补成 system message,尽量顶在 history 前面; -// 3. 最后追加历史消息与本轮 user prompt,保持“新约束在前、历史在后”的稳定顺序。 -func buildStageMessages(stageSystemPrompt string, ctx *newagentmodel.ConversationContext, runtimeUserPrompt string) []*schema.Message { - messages := make([]*schema.Message, 0, 4) - - mergedSystemPrompt := mergeSystemPrompts(ctx, stageSystemPrompt) - if mergedSystemPrompt != "" { - messages = append(messages, schema.SystemMessage(mergedSystemPrompt)) - } - - if pinnedText := renderPinnedBlocks(ctx); pinnedText != "" { - messages = append(messages, schema.SystemMessage(pinnedText)) - } - - if toolText := renderToolSchemas(ctx); toolText != "" { - messages = append(messages, schema.SystemMessage(toolText)) - } - - if ctx != nil { - history := ctx.HistorySnapshot() - if len(history) > 0 { - messages = append(messages, history...) - } - } - - runtimeUserPrompt = strings.TrimSpace(runtimeUserPrompt) - if runtimeUserPrompt != "" { - messages = append(messages, schema.UserMessage(runtimeUserPrompt)) - } - - return messages -} - -// renderStateSummary 把当前流程状态渲染成简洁文本。 -func renderStateSummary(state *newagentmodel.CommonState) string { - if state == nil { - return "当前状态:state 缺失,请先做兜底处理。" - } - - var sb strings.Builder - current, total := state.PlanProgress() - - sb.WriteString(fmt.Sprintf("当前阶段:%s\n", state.Phase)) - sb.WriteString(fmt.Sprintf("当前轮次:%d/%d\n", state.RoundUsed, state.MaxRounds)) - - if !state.HasPlan() { - sb.WriteString("当前完整 plan:暂无。\n") - return sb.String() - } - - sb.WriteString("当前完整 plan:\n") - for i, step := range state.PlanSteps { - sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, strings.TrimSpace(step.Content))) - if strings.TrimSpace(step.DoneWhen) != "" { - sb.WriteString(fmt.Sprintf(" 完成判定:%s\n", strings.TrimSpace(step.DoneWhen))) - } - } - - if step, ok := state.CurrentPlanStep(); ok { - sb.WriteString(fmt.Sprintf("当前步骤进度:%d/%d\n", current, total)) - sb.WriteString("当前步骤内容:\n") - sb.WriteString(strings.TrimSpace(step.Content)) - sb.WriteString("\n") - if strings.TrimSpace(step.DoneWhen) != "" { - sb.WriteString("当前步骤完成判定:\n") - sb.WriteString(strings.TrimSpace(step.DoneWhen)) - sb.WriteString("\n") - } - } else { - sb.WriteString("当前步骤进度:暂时无有效当前步骤。\n") - } - - return sb.String() -} - -// renderPinnedBlocks 把 ConversationContext 中的置顶块渲染成独立的 system 文本。 -func renderPinnedBlocks(ctx *newagentmodel.ConversationContext) string { - if ctx == nil { - return "" - } - - blocks := ctx.PinnedBlocksSnapshot() - if len(blocks) == 0 { - return "" - } - - var sb strings.Builder - sb.WriteString("以下是后端置顶注入的上下文,请优先遵守:\n") - for _, block := range blocks { - title := strings.TrimSpace(block.Title) - if title == "" { - title = strings.TrimSpace(block.Key) - } - if title != "" { - sb.WriteString("【") - sb.WriteString(title) - sb.WriteString("】\n") - } - sb.WriteString(strings.TrimSpace(block.Content)) - sb.WriteString("\n") - } - return strings.TrimSpace(sb.String()) -} - -// renderToolSchemas 把工具摘要渲染成独立文本块。 -func renderToolSchemas(ctx *newagentmodel.ConversationContext) string { - if ctx == nil { - return "" - } - - schemas := ctx.ToolSchemasSnapshot() - if len(schemas) == 0 { - return "" - } - - var sb strings.Builder - sb.WriteString("以下是当前可用工具摘要,仅供你在规划时参考能力边界:\n") - for _, item := range schemas { - name := strings.TrimSpace(item.Name) - desc := strings.TrimSpace(item.Desc) - schemaText := strings.TrimSpace(item.SchemaText) - - if name != "" { - sb.WriteString("- 工具名:") - sb.WriteString(name) - sb.WriteString("\n") - } - if desc != "" { - sb.WriteString(" 说明:") - sb.WriteString(desc) - sb.WriteString("\n") - } - if schemaText != "" { - sb.WriteString(" 参数摘要:") - sb.WriteString(schemaText) - sb.WriteString("\n") - } - } - - return strings.TrimSpace(sb.String()) -} - -func mergeSystemPrompts(ctx *newagentmodel.ConversationContext, stageSystemPrompt string) string { - base := "" - if ctx != nil { - base = strings.TrimSpace(ctx.SystemPrompt) - } - stageSystemPrompt = strings.TrimSpace(stageSystemPrompt) - - switch { - case base == "" && stageSystemPrompt == "": - return "" - case base == "": - return stageSystemPrompt - case stageSystemPrompt == "": - return base - default: - return base + "\n\n" + stageSystemPrompt - } -} diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index 36b404b..be98915 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -196,7 +196,7 @@ func (e *ChunkEmitter) EmitStatus(blockID, stage, code, summary string, includeR return nil } - text := BuildStageReasoningText(stage, summary) + text := buildStageReasoningText(stage, summary) payload, err := ToOpenAIReasoningChunkWithExtra( e.RequestID, e.ModelName, @@ -220,7 +220,7 @@ func (e *ChunkEmitter) EmitToolCallStart(blockID, stage, toolName, summary, argu return nil } - text := BuildToolCallReasoningText(toolName, summary, argumentsPreview) + text := buildToolCallReasoningText(toolName, summary, argumentsPreview) payload, err := ToOpenAIReasoningChunkWithExtra( e.RequestID, e.ModelName, @@ -244,7 +244,7 @@ func (e *ChunkEmitter) EmitToolCallResult(blockID, stage, toolName, summary, arg return nil } - text := BuildToolResultReasoningText(toolName, summary) + text := buildToolResultReasoningText(toolName, summary) payload, err := ToOpenAIReasoningChunkWithExtra( e.RequestID, e.ModelName, @@ -273,7 +273,7 @@ func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, i return nil } - text := BuildConfirmAssistantText(title, summary) + text := buildConfirmAssistantText(title, summary) extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary) return e.emitPseudoText( ctx, @@ -310,7 +310,7 @@ func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, return nil } - text := BuildInterruptAssistantText(interactionType, summary) + text := buildInterruptAssistantText(interactionType, summary) extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary) return e.emitPseudoText( ctx, @@ -395,8 +395,7 @@ func EmitDone(emit PayloadEmitter) error { return NewChunkEmitter(emit, "", "", 0).EmitDone() } -// BuildStageReasoningText 生成统一阶段提示文本。 -func BuildStageReasoningText(stage, detail string) string { +func buildStageReasoningText(stage, detail string) string { stage = strings.TrimSpace(stage) detail = strings.TrimSpace(detail) @@ -410,8 +409,7 @@ func BuildStageReasoningText(stage, detail string) string { } } -// BuildToolCallReasoningText 生成“工具调用开始”时的可读提示文本。 -func BuildToolCallReasoningText(toolName, summary, argumentsPreview string) string { +func buildToolCallReasoningText(toolName, summary, argumentsPreview string) string { toolName = strings.TrimSpace(toolName) summary = strings.TrimSpace(summary) argumentsPreview = strings.TrimSpace(argumentsPreview) @@ -429,8 +427,7 @@ func BuildToolCallReasoningText(toolName, summary, argumentsPreview string) stri return strings.TrimSpace(strings.Join(lines, "\n")) } -// BuildToolResultReasoningText 生成“工具调用结果”时的可读提示文本。 -func BuildToolResultReasoningText(toolName, summary string) string { +func buildToolResultReasoningText(toolName, summary string) string { toolName = strings.TrimSpace(toolName) summary = strings.TrimSpace(summary) @@ -444,8 +441,7 @@ func BuildToolResultReasoningText(toolName, summary string) string { } } -// BuildConfirmAssistantText 生成给用户看的确认文案。 -func BuildConfirmAssistantText(title, summary string) string { +func buildConfirmAssistantText(title, summary string) string { title = strings.TrimSpace(title) summary = strings.TrimSpace(summary) @@ -459,8 +455,7 @@ func BuildConfirmAssistantText(title, summary string) string { } } -// BuildInterruptAssistantText 生成给用户看的中断文案。 -func BuildInterruptAssistantText(interactionType, summary string) string { +func buildInterruptAssistantText(interactionType, summary string) string { interactionType = strings.TrimSpace(interactionType) summary = strings.TrimSpace(summary) diff --git a/backend/newAgent/stream/sse_adapter.go b/backend/newAgent/stream/sse_adapter.go new file mode 100644 index 0000000..62b4c45 --- /dev/null +++ b/backend/newAgent/stream/sse_adapter.go @@ -0,0 +1,38 @@ +package newagentstream + +import ( + "fmt" +) + +// NewSSEPayloadEmitter 创建将 chunk 事件写入 outChan 的 emitter。 +// +// 职责边界: +// 1. 接收 outChan(SSE 输出通道),返回 PayloadEmitter 函数; +// 2. 只把原始 JSON payload 写入通道,不添加 "data: " 前缀和 "\n\n" 后缀; +// 3. SSE 格式化("data: " + payload + "\n\n")由 API 层的 writeSSEData 统一处理; +// 4. 发送失败时返回 error,但不关闭通道(通道由调用方管理)。 +// +// 使用示例: +// +// emitter := NewSSEPayloadEmitter(outChan) +// chunkEmitter := NewChunkEmitter(emitter, requestID, modelName, created) +// chunkEmitter.EmitAssistantText("", "", "hello", true) +func NewSSEPayloadEmitter(outChan chan<- string) PayloadEmitter { + return func(payload string) error { + if outChan == nil { + return nil + } + + if payload == "" { + return nil + } + + select { + case outChan <- payload: + return nil + default: + // 通道已满或已关闭:不阻塞,直接返回错误。 + return fmt.Errorf("outChan full or closed") + } + } +} diff --git a/backend/newAgent/tools/args.go b/backend/newAgent/tools/args.go new file mode 100644 index 0000000..dd29da2 --- /dev/null +++ b/backend/newAgent/tools/args.go @@ -0,0 +1,87 @@ +package newagenttools + +import "fmt" + +// ==================== 参数解析辅助 ==================== +// 这些函数专门用于从 LLM 输出的 map[string]any 中提取工具参数。 +// JSON 反序列化后数字默认为 float64,字符串为 string,需要类型断言。 + +// argsInt 从 map 中提取 int 值。支持 float64(JSON 反序列化的默认类型)。 +func argsInt(args map[string]any, key string) (int, bool) { + v, ok := args[key] + if !ok { + return 0, false + } + switch n := v.(type) { + case float64: + return int(n), true + case int: + return n, true + } + return 0, false +} + +// argsString 从 map 中提取 string 值。 +func argsString(args map[string]any, key string) (string, bool) { + v, ok := args[key] + if !ok { + return "", false + } + s, ok := v.(string) + return s, ok +} + +// argsIntPtr 从 map 中提取可选 int 值,不存在返回 nil。 +func argsIntPtr(args map[string]any, key string) *int { + v, ok := argsInt(args, key) + if !ok { + return nil + } + return &v +} + +// argsStringPtr 从 map 中提取可选 string 值,不存在返回 nil。 +func argsStringPtr(args map[string]any, key string) *string { + v, ok := argsString(args, key) + if !ok { + return nil + } + return &v +} + +// argsMoveList 从 map 中提取 batch_move 的 moves 数组。 +func argsMoveList(args map[string]any) ([]MoveRequest, error) { + v, ok := args["moves"] + if !ok { + return nil, fmt.Errorf("缺少 moves 参数") + } + arr, ok := v.([]any) + if !ok { + return nil, fmt.Errorf("moves 参数必须是数组") + } + moves := make([]MoveRequest, 0, len(arr)) + for i, item := range arr { + m, ok := item.(map[string]any) + if !ok { + return nil, fmt.Errorf("moves[%d] 不是有效对象", i) + } + taskID, ok := argsInt(m, "task_id") + if !ok { + return nil, fmt.Errorf("moves[%d].task_id 缺失或无效", i) + } + newDay, ok := argsInt(m, "new_day") + if !ok { + return nil, fmt.Errorf("moves[%d].new_day 缺失或无效", i) + } + newSlotStart, ok := argsInt(m, "new_slot_start") + if !ok { + return nil, fmt.Errorf("moves[%d].new_slot_start 缺失或无效", i) + } + moves = append(moves, MoveRequest{ + TaskID: taskID, + NewDay: newDay, + NewSlotStart: newSlotStart, + }) + } + return moves, nil +} diff --git a/backend/newAgent/tools/read_tools.go b/backend/newAgent/tools/read_tools.go index c48fb9f..ac57939 100644 --- a/backend/newAgent/tools/read_tools.go +++ b/backend/newAgent/tools/read_tools.go @@ -148,6 +148,7 @@ func queryRangeSpecific(state *ScheduleState, day, startSlot, endSlot int) strin var sb strings.Builder sb.WriteString(fmt.Sprintf("第%d天 第%s:\n\n", day, formatSlotRange(startSlot, endSlot))) + total := endSlot - startSlot + 1 freeCount := 0 for s := startSlot; s <= endSlot; s++ { occupant := slotOccupiedBy(state, day, s) @@ -159,21 +160,9 @@ func queryRangeSpecific(state *ScheduleState, day, startSlot, endSlot int) strin } } - total := endSlot - startSlot + 1 - sb.WriteString(fmt.Sprintf("\n该范围%d个时段全部空闲。\n", total)) - if freeCount < total { - // 替换"全部空闲"为实际空闲数 - sb.Reset() - // 重新构建(非全部空闲的情况不需要"该范围全部空闲") - sb.WriteString(fmt.Sprintf("第%d天 第%s:\n\n", day, formatSlotRange(startSlot, endSlot))) - for s := startSlot; s <= endSlot; s++ { - occupant := slotOccupiedBy(state, day, s) - if occupant == nil { - sb.WriteString(fmt.Sprintf("第%d节:空\n", s)) - } else { - sb.WriteString(fmt.Sprintf("第%d节:[%d]%s\n", s, occupant.StateID, occupant.Name)) - } - } + if freeCount == total { + sb.WriteString(fmt.Sprintf("\n该范围%d个时段全部空闲。\n", total)) + } else { sb.WriteString(fmt.Sprintf("\n该范围%d个时段中,%d个空闲,%d个被占用。\n", total, freeCount, total-freeCount)) } diff --git a/backend/newAgent/tools/registry.go b/backend/newAgent/tools/registry.go index ae27722..fb16001 100644 --- a/backend/newAgent/tools/registry.go +++ b/backend/newAgent/tools/registry.go @@ -85,88 +85,6 @@ func (r *ToolRegistry) IsWriteTool(name string) bool { return writeTools[name] } -// ==================== 参数解析辅助 ==================== - -// argsInt 从 map 中提取 int 值。支持 float64(JSON 反序列化的默认类型)。 -func argsInt(args map[string]any, key string) (int, bool) { - v, ok := args[key] - if !ok { - return 0, false - } - switch n := v.(type) { - case float64: - return int(n), true - case int: - return n, true - } - return 0, false -} - -// argsString 从 map 中提取 string 值。 -func argsString(args map[string]any, key string) (string, bool) { - v, ok := args[key] - if !ok { - return "", false - } - s, ok := v.(string) - return s, ok -} - -// argsIntPtr 从 map 中提取可选 int 值,不存在返回 nil。 -func argsIntPtr(args map[string]any, key string) *int { - v, ok := argsInt(args, key) - if !ok { - return nil - } - return &v -} - -// argsStringPtr 从 map 中提取可选 string 值,不存在返回 nil。 -func argsStringPtr(args map[string]any, key string) *string { - v, ok := argsString(args, key) - if !ok { - return nil - } - return &v -} - -// argsMoveList 从 map 中提取 batch_move 的 moves 数组。 -func argsMoveList(args map[string]any) ([]MoveRequest, error) { - v, ok := args["moves"] - if !ok { - return nil, fmt.Errorf("缺少 moves 参数") - } - arr, ok := v.([]any) - if !ok { - return nil, fmt.Errorf("moves 参数必须是数组") - } - moves := make([]MoveRequest, 0, len(arr)) - for i, item := range arr { - m, ok := item.(map[string]any) - if !ok { - return nil, fmt.Errorf("moves[%d] 不是有效对象", i) - } - taskID, ok := argsInt(m, "task_id") - if !ok { - return nil, fmt.Errorf("moves[%d].task_id 缺失或无效", i) - } - newDay, ok := argsInt(m, "new_day") - if !ok { - return nil, fmt.Errorf("moves[%d].new_day 缺失或无效", i) - } - newSlotStart, ok := argsInt(m, "new_slot_start") - if !ok { - return nil, fmt.Errorf("moves[%d].new_slot_start 缺失或无效", i) - } - moves = append(moves, MoveRequest{ - TaskID: taskID, - NewDay: newDay, - NewSlotStart: newSlotStart, - }) - } - return moves, nil -} - // ==================== 写工具名集合 ==================== var writeTools = map[string]bool{ diff --git a/backend/newAgent/tools/write_helpers.go b/backend/newAgent/tools/write_helpers.go index 083acfa..bebf74f 100644 --- a/backend/newAgent/tools/write_helpers.go +++ b/backend/newAgent/tools/write_helpers.go @@ -2,12 +2,11 @@ package newagenttools import ( "fmt" + "sort" "strings" ) // ==================== 写工具专用辅助函数 ==================== -// 复用 read_helpers.go 中的:formatSlotRange, formatTaskLabel, slotOccupiedBy, -// findFreeRangesOnDay, getTasksOnDay, countDayOccupied, taskOnDay, freeRange // ==================== 校验函数 ==================== @@ -129,6 +128,63 @@ func countPending(state *ScheduleState) int { return count } +// ==================== 任务时段辅助 ==================== + +// formatTaskSlotsBrief 将任务的时段列表格式化为简短描述。 +// 如 "第1天(1-2节) 第4天(3-4节)"。 +func formatTaskSlotsBrief(slots []TaskSlot) string { + parts := make([]string, 0, len(slots)) + for _, slot := range slots { + parts = append(parts, fmt.Sprintf("第%d天第%s", slot.Day, formatSlotRange(slot.SlotStart, slot.SlotEnd))) + } + return strings.Join(parts, " ") +} + +// collectAffectedDays 从旧位置和新位置中收集所有涉及的天(去重排序)。 +func collectAffectedDays(oldSlots, newSlots []TaskSlot) []int { + days := make(map[int]bool) + for _, s := range oldSlots { + days[s.Day] = true + } + for _, s := range newSlots { + days[s.Day] = true + } + return sortedKeys(days) +} + +// collectAffectedDaysFromSlots 从单个 slot 列表中收集涉及的天。 +func collectAffectedDaysFromSlots(slots []TaskSlot) []int { + days := make(map[int]bool) + for _, s := range slots { + days[s.Day] = true + } + return sortedKeys(days) +} + +// sortedKeys 将 map 的 key 排序后返回。 +func sortedKeys(m map[int]bool) []int { + keys := make([]int, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Ints(keys) + return keys +} + +// uniqueSorted 对 int 切片去重并排序。 +func uniqueSorted(s []int) []int { + seen := make(map[int]bool) + result := make([]int, 0, len(s)) + for _, v := range s { + if !seen[v] { + seen[v] = true + result = append(result, v) + } + } + sort.Ints(result) + return result +} + // ==================== 输出格式化 ==================== // formatDayOccupancy 格式化某天的占用摘要。 diff --git a/backend/newAgent/tools/write_tools.go b/backend/newAgent/tools/write_tools.go index 7d6d9cc..770c43b 100644 --- a/backend/newAgent/tools/write_tools.go +++ b/backend/newAgent/tools/write_tools.go @@ -2,7 +2,6 @@ package newagenttools import ( "fmt" - "sort" "strings" ) @@ -366,14 +365,14 @@ func Unplace(state *ScheduleState, taskID int) string { if task.EmbeddedBy != nil { guest := state.TaskByStateID(*task.EmbeddedBy) if guest != nil { + // 先从嵌入时设置的 Slots 推算 Duration,再清空。 + // Place 嵌入时 guest.Slots 被设置为实际占用范围,这里从中恢复时长。 + if len(guest.Slots) > 0 { + guest.Duration = taskDuration(*guest) + } guest.EmbedHost = nil guest.Slots = nil guest.Status = "pending" - // 恢复客人的 Duration:从原始数据推断。 - // 嵌入客人只占一个 slot range,取其长度作为 duration。 - if len(oldSlots) > 0 { - // 客人被嵌入到宿主的 slot 里,客人自己的 slot 在嵌入时被设置了 - } } task.EmbeddedBy = nil } @@ -394,60 +393,3 @@ func Unplace(state *ScheduleState, taskID int) string { sb.WriteString(fmt.Sprintf("待安排任务剩余:%d个。", countPending(state))) return sb.String() } - -// ==================== 内部辅助函数 ==================== - -// formatTaskSlotsBrief 将任务的时段列表格式化为简短描述。 -// 如 "第1天(1-2节) 第4天(3-4节)"。 -func formatTaskSlotsBrief(slots []TaskSlot) string { - parts := make([]string, 0, len(slots)) - for _, slot := range slots { - parts = append(parts, fmt.Sprintf("第%d天第%s", slot.Day, formatSlotRange(slot.SlotStart, slot.SlotEnd))) - } - return strings.Join(parts, " ") -} - -// collectAffectedDays 从旧位置和新位置中收集所有涉及的天(去重排序)。 -func collectAffectedDays(oldSlots, newSlots []TaskSlot) []int { - days := make(map[int]bool) - for _, s := range oldSlots { - days[s.Day] = true - } - for _, s := range newSlots { - days[s.Day] = true - } - return sortedKeys(days) -} - -// collectAffectedDaysFromSlots 从单个 slot 列表中收集涉及的天。 -func collectAffectedDaysFromSlots(slots []TaskSlot) []int { - days := make(map[int]bool) - for _, s := range slots { - days[s.Day] = true - } - return sortedKeys(days) -} - -// sortedKeys 将 map 的 key 排序后返回。 -func sortedKeys(m map[int]bool) []int { - keys := make([]int, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - sort.Ints(keys) - return keys -} - -// uniqueSorted 对 int 切片去重并排序。 -func uniqueSorted(s []int) []int { - seen := make(map[int]bool) - result := make([]int, 0, len(s)) - for _, v := range s { - if !seen[v] { - seen[v] = true - result = append(result, v) - } - } - sort.Ints(result) - return result -} diff --git a/backend/respond/respond.go b/backend/respond/respond.go index a70df89..a17bb7b 100644 --- a/backend/respond/respond.go +++ b/backend/respond/respond.go @@ -334,6 +334,11 @@ var ( //请求相关的响应 Info: "schedule plan preview not found", } + MissingConversationID = Response{ //确认/恢复请求缺少会话ID + Status: "40054", + Info: "conversation_id is required when confirm_action is present", + } + RouteControlInternalError = Response{ //路由控制码内部错误 Status: "50001", Info: "route control failed", diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 0cae26c..5a5a1cc 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -16,6 +16,8 @@ import ( outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/model" + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/respond" eventsvc "github.com/LoveLosita/smartflow/backend/service/events" @@ -42,12 +44,18 @@ type AgentService struct { // 1. 负责把“多任务类粗排结果 + 既有日程”合并成 HybridEntries; // 2. daily/weekly ReAct 全部基于这个结果继续优化。 HybridScheduleWithPlanMultiFunc func(ctx context.Context, userID int, taskClassIDs []int) ([]model.HybridScheduleEntry, []model.TaskClassItem, error) - // ResolvePlanningWindowFunc 负责把 task_class_ids 解析成“全局排程窗口”的相对周/天边界。 + // ResolvePlanningWindowFunc 负责把 task_class_ids 解析成”全局排程窗口”的相对周/天边界。 // // 作用: // 1. 给周级 Move 增加硬边界,避免首尾不足一周时移出有效日期范围; - // 2. 该函数只做“窗口解析”,不负责粗排与混排计算。 + // 2. 该函数只做”窗口解析”,不负责粗排与混排计算。 ResolvePlanningWindowFunc func(ctx context.Context, userID int, taskClassIDs []int) (startWeek, startDay, endWeek, endDay int, err error) + + // ── newAgent 依赖(由 cmd/start.go 通过 Set* 方法注入)── + toolRegistry *newagenttools.ToolRegistry + scheduleProvider newagentmodel.ScheduleStateProvider + schedulePersistor newagentmodel.SchedulePersistor + agentStateStore newagentmodel.AgentStateStore } // NewAgentService 构造 AgentService。 @@ -522,13 +530,27 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin requestStart := time.Now() traceID := uuid.NewString() - // 1. 每个请求都返回两个通道: - // - outChan:推送流式输出片段; - // - errChan:推送异步阶段错误(非阻塞上报)。 outChan := make(chan string, 8) errChan := make(chan error, 1) - // 0. 初始化“请求级 token 统计器”,用于聚合本次请求所有模型开销。 + go func() { + defer close(outChan) + s.runNewAgentGraph(ctx, userMessage, ifThinking, modelName, userID, chatID, extra, traceID, requestStart, outChan, errChan) + }() + + return outChan, errChan +} + +// agentChatOld 是旧路由逻辑的备份,暂时保留供回滚使用。 +// TODO: 新 graph 稳定后删除。 +func (s *AgentService) agentChatOld(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string, extra map[string]any) (<-chan string, <-chan error) { + requestStart := time.Now() + traceID := uuid.NewString() + + outChan := make(chan string, 8) + errChan := make(chan error, 1) + + // 0. 初始化”请求级 token 统计器”,用于聚合本次请求所有模型开销。 requestCtx, _ := withRequestTokenMeter(ctx) // 1) 规范会话 ID,选择模型。 diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go new file mode 100644 index 0000000..aeee2a3 --- /dev/null +++ b/backend/service/agentsvc/agent_newagent.go @@ -0,0 +1,399 @@ +package agentsvc + +import ( + "context" + "fmt" + "log" + "strings" + "time" + + newagentgraph "github.com/LoveLosita/smartflow/backend/newAgent/graph" + newagentllm "github.com/LoveLosita/smartflow/backend/newAgent/llm" + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" + newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" + "github.com/cloudwego/eino/schema" + + agentchat "github.com/LoveLosita/smartflow/backend/agent/chat" + "github.com/LoveLosita/smartflow/backend/conv" + "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/pkg" +) + +// runNewAgentGraph 运行 newAgent 通用 graph,直接替换旧 agent 路由逻辑。 +// +// 职责边界: +// 1. 负责构造 AgentGraphRunInput(RuntimeState、ConversationContext、Request、Deps); +// 2. 负责将 outChan 适配为 ChunkEmitter; +// 3. 负责调用 graph.RunAgentGraph; +// 4. 负责持久化聊天历史(复用现有逻辑)。 +// +// 设计原则: +// 1. 直接走 newAgent graph,不再经过旧的 agentrouter 路由决策; +// 2. 所有任务类型(chat、task、quick_note)都由 graph 内部 LLM 决策; +// 3. 状态恢复、工具执行、确认流程全部由 graph 节点处理。 +func (s *AgentService) runNewAgentGraph( + ctx context.Context, + userMessage string, + ifThinking bool, + modelName string, + userID int, + chatID string, + extra map[string]any, + traceID string, + requestStart time.Time, + outChan chan<- string, + errChan chan error, +) { + requestCtx, _ := withRequestTokenMeter(ctx) + + // 1. 规范会话 ID 和模型选择。 + chatID = normalizeConversationID(chatID) + _, resolvedModelName := s.pickChatModel(modelName) + + // 2. 确保会话存在(优先缓存,必要时回源 DB)。 + result, err := s.agentCache.GetConversationStatus(requestCtx, chatID) + if err != nil { + pushErrNonBlocking(errChan, err) + return + } + if !result { + innerResult, ifErr := s.repo.IfChatExists(requestCtx, userID, chatID) + if ifErr != nil { + pushErrNonBlocking(errChan, ifErr) + return + } + if !innerResult { + if _, err = s.repo.CreateNewChat(userID, chatID); err != nil { + pushErrNonBlocking(errChan, err) + return + } + } + if err = s.agentCache.SetConversationStatus(requestCtx, chatID); err != nil { + log.Printf("设置会话状态缓存失败 chat=%s: %v", chatID, err) + } + } + + // 3. 构建重试元数据。 + retryMeta, err := s.buildChatRetryMeta(requestCtx, userID, chatID, extra) + if err != nil { + pushErrNonBlocking(errChan, err) + return + } + + // 4. 从 StateStore 加载或创建 RuntimeState。 + // 恢复场景(confirm/ask_user)同时拿到快照中保存的 ConversationContext, + // 其中包含工具调用/结果等中间消息,保证后续 LLM 调用的消息链完整。 + runtimeState, savedConversationContext := s.loadOrCreateRuntimeState(requestCtx, chatID, userID) + + // 5. 构造 ConversationContext。 + // 优先使用快照中恢复的 ConversationContext(含工具调用/结果), + // 无快照时从 Redis LLM 历史缓存加载。 + var conversationContext *newagentmodel.ConversationContext + if savedConversationContext != nil { + conversationContext = savedConversationContext + // 把用户本轮输入追加到恢复的上下文中(与 loadConversationContext 行为一致)。 + if strings.TrimSpace(userMessage) != "" { + conversationContext.AppendHistory(schema.UserMessage(userMessage)) + } + } else { + conversationContext = s.loadConversationContext(requestCtx, chatID, userMessage) + } + + // 6. 构造 AgentGraphRequest。 + var confirmAction string + if len(extra) > 0 { + confirmAction = readAgentExtraString(extra, "confirm_action") + } + graphRequest := newagentmodel.AgentGraphRequest{ + UserInput: userMessage, + ConfirmAction: confirmAction, + } + graphRequest.Normalize() + + // 7. 适配 LLM clients(从 AIHub 的 ark.ChatModel 转换为 newAgent LLM Client)。 + chatClient := newagentllm.WrapArkClient(s.AIHub.Worker) + planClient := newagentllm.WrapArkClient(s.AIHub.Worker) + executeClient := newagentllm.WrapArkClient(s.AIHub.Worker) + deliverClient := newagentllm.WrapArkClient(s.AIHub.Worker) + + // 8. 适配 SSE emitter。 + sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan) + chunkEmitter := newagentstream.NewChunkEmitter(sseEmitter, traceID, resolvedModelName, requestStart.Unix()) + + // 9. 构造 AgentGraphDeps(由 cmd/start.go 注入的依赖)。 + deps := newagentmodel.AgentGraphDeps{ + ChatClient: chatClient, + PlanClient: planClient, + ExecuteClient: executeClient, + DeliverClient: deliverClient, + ChunkEmitter: chunkEmitter, + StateStore: s.agentStateStore, + ToolRegistry: s.toolRegistry, + ScheduleProvider: s.scheduleProvider, + SchedulePersistor: s.schedulePersistor, + } + + // 10. 构造 AgentGraphRunInput 并运行 graph。 + runInput := newagentmodel.AgentGraphRunInput{ + RuntimeState: runtimeState, + ConversationContext: conversationContext, + Request: graphRequest, + Deps: deps, + } + + finalState, graphErr := newagentgraph.RunAgentGraph(requestCtx, runInput) + if graphErr != nil { + log.Printf("[ERROR] newAgent graph 执行失败 trace=%s chat=%s: %v", traceID, chatID, graphErr) + pushErrNonBlocking(errChan, fmt.Errorf("graph 执行失败: %w", graphErr)) + + // Graph 出错时回退普通聊天,保证可用性。 + s.runNormalChatFlow(requestCtx, s.AIHub.Worker, resolvedModelName, userMessage, "", nil, retryMeta, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan) + return + } + + // 11. 持久化聊天历史(用户消息 + 助手回复)。 + s.persistChatAfterGraph(requestCtx, userID, chatID, userMessage, finalState, retryMeta, requestStart, outChan, errChan) + + // 12. 发送 OpenAI 兼容的流式结束标记,告知客户端 stream 已完成。 + _ = chunkEmitter.EmitDone() + + // 13. 异步生成会话标题。 + s.ensureConversationTitleAsync(userID, chatID) +} + +// loadOrCreateRuntimeState 从 StateStore 加载或创建新的 RuntimeState。 +// +// 返回值: +// - RuntimeState:可持久化流程状态; +// - ConversationContext:快照中保存的完整对话上下文(含工具调用/结果), +// 仅在恢复已有快照时非 nil,新建会话时为 nil。 +// +// 设计说明: +// 1. 快照中的 ConversationContext 包含 graph 执行期间的完整中间消息(工具调用、工具结果等), +// 这些消息不会出现在 Redis LLM 历史缓存中; +// 2. 恢复场景(confirm/ask_user)必须使用快照中的 ConversationContext,否则工具结果丢失, +// 导致后续 LLM 调用收到非法的裸 Tool 消息,API 拒绝请求、连接断开。 +func (s *AgentService) loadOrCreateRuntimeState(ctx context.Context, chatID string, userID int) (*newagentmodel.AgentRuntimeState, *newagentmodel.ConversationContext) { + newRT := func() (*newagentmodel.AgentRuntimeState, *newagentmodel.ConversationContext) { + rt := newagentmodel.NewAgentRuntimeState(nil) + cs := rt.EnsureCommonState() + cs.UserID = userID + cs.ConversationID = chatID // saveAgentState 依赖此字段决定是否持久化 + return rt, nil + } + + if s.agentStateStore == nil { + return newRT() + } + + snapshot, ok, err := s.agentStateStore.Load(ctx, chatID) + log.Printf("[DEBUG] loadOrCreateRuntimeState chatID=%s ok=%v err=%v hasRuntime=%v hasPending=%v hasCtx=%v", + chatID, ok, err, + snapshot != nil && snapshot.RuntimeState != nil, + snapshot != nil && snapshot.RuntimeState != nil && snapshot.RuntimeState.HasPendingInteraction(), + snapshot != nil && snapshot.ConversationContext != nil, + ) + if err != nil { + log.Printf("加载 agent 状态失败 chat=%s: %v", chatID, err) + return newRT() + } + if ok && snapshot != nil && snapshot.RuntimeState != nil { + // 恢复运行态,确保身份信息与当前请求一致。 + cs := snapshot.RuntimeState.EnsureCommonState() + cs.UserID = userID + cs.ConversationID = chatID + return snapshot.RuntimeState, snapshot.ConversationContext + } + return newRT() +} + +// loadConversationContext 加载对话历史,构造 ConversationContext。 +func (s *AgentService) loadConversationContext(ctx context.Context, chatID, userMessage string) *newagentmodel.ConversationContext { + // 从 Redis 加载历史。 + history, err := s.agentCache.GetHistory(ctx, chatID) + if err != nil { + log.Printf("加载历史失败 chat=%s: %v", chatID, err) + history = nil + } + + // 缓存未命中时回源 DB。 + if history == nil { + histories, hisErr := s.repo.GetUserChatHistories(ctx, 0, pkg.HistoryFetchLimitByModel("worker"), chatID) + if hisErr != nil { + log.Printf("从 DB 加载历史失败 chat=%s: %v", chatID, hisErr) + } else { + history = conv.ToEinoMessages(histories) + // 回填到 Redis。 + if backfillErr := s.agentCache.BackfillHistory(ctx, chatID, history); backfillErr != nil { + log.Printf("回填历史到 Redis 失败 chat=%s: %v", chatID, backfillErr) + } + } + } + + // 构造 ConversationContext。 + conversationContext := newagentmodel.NewConversationContext(agentchat.SystemPrompt) + if history != nil { + conversationContext.ReplaceHistory(history) + } + + // 把用户本轮输入追加到历史(供 graph 使用)。 + if strings.TrimSpace(userMessage) != "" { + conversationContext.AppendHistory(schema.UserMessage(userMessage)) + } + + return conversationContext +} + +// persistChatAfterGraph graph 执行完成后持久化聊天历史。 +func (s *AgentService) persistChatAfterGraph( + ctx context.Context, + userID int, + chatID string, + userMessage string, + finalState *newagentmodel.AgentGraphState, + retryMeta *chatRetryMeta, + requestStart time.Time, + outChan chan<- string, + errChan chan error, +) { + if finalState == nil { + return + } + + // 1. 持久化用户消息:先写 LLM 上下文 Redis,再落 DB,最后更新 UI 历史缓存。 + userMsg := &schema.Message{Role: schema.User, Content: userMessage} + if retryExtra := retryMeta.CacheExtra(); len(retryExtra) > 0 { + userMsg.Extra = retryExtra + } + if err := s.agentCache.PushMessage(ctx, chatID, userMsg); err != nil { + log.Printf("写入用户消息到 LLM 上下文 Redis 失败 chat=%s: %v", chatID, err) + } + + userPayload := model.ChatHistoryPersistPayload{ + UserID: userID, + ConversationID: chatID, + Role: "user", + Message: userMessage, + ReasoningContent: "", + ReasoningDurationSeconds: 0, + RetryGroupID: retryMeta.GroupIDPtr(), + RetryIndex: retryMeta.IndexPtr(), + RetryFromUserMessageID: retryMeta.FromUserMessageIDPtr(), + RetryFromAssistantMessageID: retryMeta.FromAssistantMessageIDPtr(), + TokensConsumed: 0, + } + if err := s.PersistChatHistory(ctx, userPayload); err != nil { + pushErrNonBlocking(errChan, err) + } + userCreatedAt := time.Now() + s.appendConversationHistoryCacheOptimistically( + context.Background(), + userID, + chatID, + buildOptimisticConversationHistoryItem("user", userMessage, "", 0, retryMeta, userCreatedAt), + ) + + // 2. 从 ConversationContext 提取助手回复(最后一条 assistant 消息)。 + conversationContext := finalState.ConversationContext + if conversationContext == nil || len(conversationContext.History) == 0 { + return + } + + var lastAssistantMsg *schema.Message + for i := len(conversationContext.History) - 1; i >= 0; i-- { + msg := conversationContext.History[i] + if msg.Role == schema.Assistant { + lastAssistantMsg = msg + break + } + } + + if lastAssistantMsg == nil { + return + } + + assistantReply := lastAssistantMsg.Content + reasoningContent := lastAssistantMsg.ReasoningContent + var reasoningDurationSeconds int + if lastAssistantMsg.Extra != nil { + if dur, ok := lastAssistantMsg.Extra["reasoning_duration_seconds"].(float64); ok { + reasoningDurationSeconds = int(dur) + } + } + + // 3. 持久化助手消息:先写 LLM 上下文 Redis,再落 DB,最后更新 UI 历史缓存。 + assistantMsg := &schema.Message{ + Role: schema.Assistant, + Content: assistantReply, + ReasoningContent: reasoningContent, + } + if reasoningDurationSeconds > 0 { + assistantMsg.Extra = map[string]any{"reasoning_duration_seconds": reasoningDurationSeconds} + } + if retryExtra := retryMeta.CacheExtra(); len(retryExtra) > 0 { + if assistantMsg.Extra == nil { + assistantMsg.Extra = make(map[string]any) + } + for k, v := range retryExtra { + assistantMsg.Extra[k] = v + } + } + if err := s.agentCache.PushMessage(context.Background(), chatID, assistantMsg); err != nil { + log.Printf("写入助手消息到 LLM 上下文 Redis 失败 chat=%s: %v", chatID, err) + } + + requestTotalTokens := snapshotRequestTokenMeter(ctx).TotalTokens + assistantPayload := model.ChatHistoryPersistPayload{ + UserID: userID, + ConversationID: chatID, + Role: "assistant", + Message: assistantReply, + ReasoningContent: reasoningContent, + ReasoningDurationSeconds: reasoningDurationSeconds, + RetryGroupID: retryMeta.GroupIDPtr(), + RetryIndex: retryMeta.IndexPtr(), + RetryFromUserMessageID: retryMeta.FromUserMessageIDPtr(), + RetryFromAssistantMessageID: retryMeta.FromAssistantMessageIDPtr(), + TokensConsumed: requestTotalTokens, + } + if err := s.PersistChatHistory(ctx, assistantPayload); err != nil { + pushErrNonBlocking(errChan, err) + } else { + s.appendConversationHistoryCacheOptimistically( + context.Background(), + userID, + chatID, + buildOptimisticConversationHistoryItem( + "assistant", + assistantReply, + reasoningContent, + reasoningDurationSeconds, + retryMeta, + time.Now(), + ), + ) + } +} + +// --- 依赖注入字段 --- + +// toolRegistry 由 cmd/start.go 注入 +func (s *AgentService) SetToolRegistry(registry *newagenttools.ToolRegistry) { + s.toolRegistry = registry +} + +// scheduleProvider 由 cmd/start.go 注入 +func (s *AgentService) SetScheduleProvider(provider newagentmodel.ScheduleStateProvider) { + s.scheduleProvider = provider +} + +// schedulePersistor 由 cmd/start.go 注入 +func (s *AgentService) SetSchedulePersistor(persistor newagentmodel.SchedulePersistor) { + s.schedulePersistor = persistor +} + +// agentStateStore 由 cmd/start.go 注入 +func (s *AgentService) SetAgentStateStore(store newagentmodel.AgentStateStore) { + s.agentStateStore = store +}