Files
smartmate/backend/service/agentsvc/agent_schedule_refine.go
Losita 525a8b32cb Version: 0.7.3.dev.260322
♻️ refactor(schedule-refine): [WIP] 重构 Plan-and-Execute ReAct 链路,并增强 JSON 解析兜底能力

- 🧩 重构 `schedulerefine` 主流程,引入 `Planner` / `Replan` 机制,以及执行预算与轮次状态管理
- 🧠 扩展状态与观察上下文,补充工具结果、失败签名、连续失败计数与后置反思策略等信息
- 🔧 增强工具层能力与参数兼容性,补齐 `Query` / `Move` / `Swap` / `BatchMove` / `Verify` 等行为及约束校验
- 🛡️ 提升解析鲁棒性,支持从代码块或混杂文本中提取首个 JSON 对象,并增加单次解析重试机制
- 👀 增强可观测性,补充 `debug raw` 阶段输出与分片透传能力
- ✍️ 优化提示词近端约束,将严格 JSON 输出协议追加到各节点 `userPrompt` 末尾

- 🚧 备注:当前链路仍处于持续调优阶段,稳定性与可用性仍需进一步验证
2026-03-22 22:38:51 +08:00

155 lines
5.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package agentsvc
import (
"context"
"errors"
"log"
"strings"
"github.com/LoveLosita/smartflow/backend/agent/scheduleplan"
"github.com/LoveLosita/smartflow/backend/agent/schedulerefine"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/respond"
"github.com/cloudwego/eino-ext/components/model/ark"
)
// runScheduleRefineFlow 执行“连续对话微调排程”分支。
//
// 职责边界:
// 1. 负责读取“上一版排程预览快照”(优先 Redis缺失再回源 MySQL
// 2. 负责调用独立 schedulerefine 图链路完成本轮微调;
// 3. 负责把微调结果回写预览缓存与状态快照,供后续继续微调;
// 4. 不负责聊天消息持久化(消息持久化由 AgentChat 主链路统一处理)。
func (s *AgentService) runScheduleRefineFlow(
ctx context.Context,
selectedModel *ark.ChatModel,
userMessage string,
userID int,
chatID string,
traceID string,
emitStage func(stage, detail string),
outChan chan<- string,
modelName string,
) (string, error) {
_ = outChan
_ = modelName
// 1. 依赖预检:模型为空时无法执行任何节点,直接失败避免空指针。
if selectedModel == nil {
return "", errors.New("schedule refine model is nil")
}
emitStage("schedule_refine.context.loading", "正在加载上一版排程上下文。")
// 2. 先查 Redis 预览快照,保证热路径低延迟。
// 2.1 如果 Redis 未命中,再回源 MySQL 快照兜底;
// 2.2 如果两者都没有,说明当前会话没有可微调基础,直接返回业务错误。
preview := s.loadSchedulePreviewContext(ctx, userID, chatID)
if preview == nil {
return "", respond.SchedulePlanPreviewNotFound
}
// 3. 初始化微调状态并运行独立图。
state := schedulerefine.NewScheduleRefineState(traceID, userID, chatID, userMessage, preview)
finalState, runErr := schedulerefine.RunScheduleRefineGraph(ctx, schedulerefine.ScheduleRefineGraphRunInput{
Model: selectedModel,
State: state,
EmitStage: emitStage,
})
if runErr != nil {
return "", runErr
}
if finalState == nil {
return "", errors.New("schedule refine graph returned nil state")
}
// 4. 调用目的:
// 4.1 saveSchedulePlanPreview 目前是“预览缓存 + MySQL 快照”的统一写入口;
// 4.2 这里把 refine state 映射为 scheduleplan state复用已有落盘链路
// 4.3 这样可以保证 create/refine 两条链路写入口径一致,便于后续统一维护。
s.saveSchedulePlanPreview(ctx, userID, chatID, convertRefineStateToPlanState(finalState))
reply := strings.TrimSpace(finalState.FinalSummary)
if reply == "" {
reply = "微调已完成,但本轮未生成总结文案。"
}
return reply, nil
}
// loadSchedulePreviewContext 读取“可用于连续微调”的排程上下文快照。
//
// 步骤化说明:
// 1. 先查 Redis命中则直接返回时延最小
// 2. Redis miss 再查 MySQL保证缓存过期后仍可继续微调
// 3. 若 MySQL 命中且 Redis 可用,顺便回填 Redis提升后续命中率
// 4. 任一步失败仅打日志,不 panic由上层根据返回 nil 做统一处理。
func (s *AgentService) loadSchedulePreviewContext(ctx context.Context, userID int, chatID string) *model.SchedulePlanPreviewCache {
normalizedChatID := strings.TrimSpace(chatID)
if normalizedChatID == "" || userID <= 0 {
return nil
}
if s.cacheDAO != nil {
preview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("读取排程预览缓存失败 chat_id=%s: %v", normalizedChatID, err)
} else if preview != nil {
return preview
}
}
if s.repo == nil {
return nil
}
snapshot, err := s.repo.GetScheduleStateSnapshot(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("读取排程状态快照失败 chat_id=%s: %v", normalizedChatID, err)
return nil
}
if snapshot == nil {
return nil
}
preview := snapshotToSchedulePlanPreviewCache(snapshot)
if preview != nil && s.cacheDAO != nil {
if setErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedChatID, preview); setErr != nil {
log.Printf("回填排程预览缓存失败 chat_id=%s: %v", normalizedChatID, setErr)
}
}
return preview
}
// convertRefineStateToPlanState 把 schedulerefine 状态映射为 scheduleplan 状态。
//
// 设计意图:
// 1. 复用现有 saveSchedulePlanPreview 写入链路,减少重复落盘代码;
// 2. 仅映射“预览持久化必须字段”,避免把 refine 运行期临时字段带入存储层;
// 3. 后续如要扩展 refine 专属快照字段,可在该映射处集中演进。
func convertRefineStateToPlanState(st *schedulerefine.ScheduleRefineState) *scheduleplan.SchedulePlanState {
if st == nil {
return nil
}
adjustmentScope := "medium"
if st.Contract.Strategy == "keep" {
adjustmentScope = "small"
}
return &scheduleplan.SchedulePlanState{
TraceID: strings.TrimSpace(st.TraceID),
UserID: st.UserID,
ConversationID: strings.TrimSpace(st.ConversationID),
UserIntent: strings.TrimSpace(st.UserIntent),
Constraints: append([]string(nil), st.Constraints...),
TaskClassIDs: append([]int(nil), st.TaskClassIDs...),
Strategy: "steady",
AdjustmentScope: adjustmentScope,
IsAdjustment: true,
HybridEntries: append([]model.HybridScheduleEntry(nil), st.HybridEntries...),
AllocatedItems: cloneTaskClassItems(st.AllocatedItems),
CandidatePlans: cloneWeekSchedules(st.CandidatePlans),
FinalSummary: strings.TrimSpace(st.FinalSummary),
Completed: st.Completed,
}
}