Version: 0.7.3.dev.260322

♻️ refactor(schedule-refine): [WIP] 重构 Plan-and-Execute ReAct 链路,并增强 JSON 解析兜底能力

- 🧩 重构 `schedulerefine` 主流程,引入 `Planner` / `Replan` 机制,以及执行预算与轮次状态管理
- 🧠 扩展状态与观察上下文,补充工具结果、失败签名、连续失败计数与后置反思策略等信息
- 🔧 增强工具层能力与参数兼容性,补齐 `Query` / `Move` / `Swap` / `BatchMove` / `Verify` 等行为及约束校验
- 🛡️ 提升解析鲁棒性,支持从代码块或混杂文本中提取首个 JSON 对象,并增加单次解析重试机制
- 👀 增强可观测性,补充 `debug raw` 阶段输出与分片透传能力
- ✍️ 优化提示词近端约束,将严格 JSON 输出协议追加到各节点 `userPrompt` 末尾

- 🚧 备注:当前链路仍处于持续调优阶段,稳定性与可用性仍需进一步验证
This commit is contained in:
Losita
2026-03-22 22:38:51 +08:00
parent e5b27df80d
commit 525a8b32cb
12 changed files with 3809 additions and 100 deletions

View File

@@ -393,7 +393,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
}
// 3.6 schedule_plan执行智能排程 graph。
if routing.Action == route.ActionSchedulePlan {
if routing.Action == route.ActionSchedulePlanCreate {
reply, planErr := s.runSchedulePlanFlow(requestCtx, selectedModel, userMessage, userID, chatID, traceID, extra, progress.Emit, outChan, resolvedModelName)
if planErr != nil {
log.Printf("智能排程 graph 执行失败,回退普通聊天 trace_id=%s chat_id=%s err=%v", traceID, chatID, planErr)
@@ -412,7 +412,26 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
return
}
// 3.7 未知 action 兜底:走普通聊天,保证可用性
// 3.7 schedule_plan_refine执行“连续微调排程”graph
if routing.Action == route.ActionSchedulePlanRefine {
reply, refineErr := s.runScheduleRefineFlow(requestCtx, selectedModel, userMessage, userID, chatID, traceID, progress.Emit, outChan, resolvedModelName)
if refineErr != nil {
// 连续微调失败不再回落普通聊天,直接上报错误。
pushErrNonBlocking(errChan, refineErr)
return
}
if emitErr := emitSingleAssistantCompletion(outChan, resolvedModelName, reply); emitErr != nil {
pushErrNonBlocking(errChan, emitErr)
return
}
requestTotalTokens := snapshotRequestTokenMeter(requestCtx).TotalTokens
s.persistChatAfterReply(requestCtx, userID, chatID, userMessage, reply, 0, requestTotalTokens, errChan)
s.ensureConversationTitleAsync(userID, chatID)
return
}
// 3.8 未知 action 兜底:走普通聊天,保证可用性。
s.runNormalChatFlow(requestCtx, selectedModel, resolvedModelName, userMessage, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan)
}()

View File

@@ -0,0 +1,154 @@
package agentsvc
import (
"context"
"errors"
"log"
"strings"
"github.com/LoveLosita/smartflow/backend/agent/scheduleplan"
"github.com/LoveLosita/smartflow/backend/agent/schedulerefine"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/respond"
"github.com/cloudwego/eino-ext/components/model/ark"
)
// runScheduleRefineFlow runs the "continuous conversational schedule refinement" branch.
//
// Responsibilities:
//  1. Load the previous schedule preview through loadSchedulePreviewContext.
//  2. Run the standalone schedulerefine graph for the current round.
//  3. Hand the refined state to saveSchedulePlanPreview so later rounds can
//     keep refining on top of it.
//  4. It does NOT persist chat messages; per the original design note that is
//     handled by the AgentChat main flow.
//
// Parameters:
//   - selectedModel: chat model passed to the graph; must be non-nil.
//   - userMessage, userID, chatID, traceID: used to seed the refine state.
//   - emitStage: progress callback; a nil value is tolerated and treated as a no-op.
//   - outChan, modelName: accepted for signature parity but currently unused.
//
// It returns the trimmed FinalSummary of the final state (or a fixed fallback
// sentence when the summary is empty). An error is returned when the model is
// nil, when no preview is available (respond.SchedulePlanPreviewNotFound),
// when the graph fails, or when the graph yields a nil state.
func (s *AgentService) runScheduleRefineFlow(
	ctx context.Context,
	selectedModel *ark.ChatModel,
	userMessage string,
	userID int,
	chatID string,
	traceID string,
	emitStage func(stage, detail string),
	outChan chan<- string,
	modelName string,
) (string, error) {
	// Explicitly mark the parameters this flow does not consume yet.
	_ = outChan
	_ = modelName

	// 1. Dependency pre-check: no graph node can run without a model, so fail
	// fast instead of hitting a nil pointer later.
	if selectedModel == nil {
		return "", errors.New("schedule refine model is nil")
	}
	// Calling a nil func value panics; fall back to a no-op so that a caller
	// without a progress sink cannot crash the request.
	if emitStage == nil {
		emitStage = func(string, string) {}
	}

	emitStage("schedule_refine.context.loading", "正在加载上一版排程上下文。")

	// 2. Load the previous preview. A nil result means this conversation has
	// nothing to refine yet, which is reported as a business error.
	preview := s.loadSchedulePreviewContext(ctx, userID, chatID)
	if preview == nil {
		return "", respond.SchedulePlanPreviewNotFound
	}

	// 3. Build the refine state and run the standalone graph.
	state := schedulerefine.NewScheduleRefineState(traceID, userID, chatID, userMessage, preview)
	finalState, runErr := schedulerefine.RunScheduleRefineGraph(ctx, schedulerefine.ScheduleRefineGraphRunInput{
		Model:     selectedModel,
		State:     state,
		EmitStage: emitStage,
	})
	if runErr != nil {
		return "", runErr
	}
	if finalState == nil {
		return "", errors.New("schedule refine graph returned nil state")
	}

	// 4. Persist the result. The refine state is mapped onto a scheduleplan
	// state so the existing saveSchedulePlanPreview path is reused; the
	// original author's intent is to keep the create and refine flows writing
	// through one shared entry point.
	s.saveSchedulePlanPreview(ctx, userID, chatID, convertRefineStateToPlanState(finalState))

	reply := strings.TrimSpace(finalState.FinalSummary)
	if reply == "" {
		reply = "微调已完成,但本轮未生成总结文案。"
	}
	return reply, nil
}
// loadSchedulePreviewContext fetches the schedule preview that a refinement
// round can build on.
//
// Lookup order:
//  1. Return nil immediately when chatID is blank after trimming or userID is
//     not positive.
//  2. Ask cacheDAO first (when configured); a non-nil hit is returned as-is.
//  3. On a cache miss or cache error, read the state snapshot from repo (when
//     configured) and convert it with snapshotToSchedulePlanPreviewCache.
//  4. When the converted preview is non-nil and cacheDAO is configured, write
//     it back to the cache.
//
// Failures are only logged; the function never panics on its own and signals
// "nothing available" by returning nil so the caller can handle it uniformly.
func (s *AgentService) loadSchedulePreviewContext(ctx context.Context, userID int, chatID string) *model.SchedulePlanPreviewCache {
	conversationID := strings.TrimSpace(chatID)
	if userID <= 0 || conversationID == "" {
		return nil
	}

	// Cache first: a read error is logged and treated like a miss.
	if s.cacheDAO != nil {
		cached, cacheErr := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, conversationID)
		switch {
		case cacheErr != nil:
			log.Printf("读取排程预览缓存失败 chat_id=%s: %v", conversationID, cacheErr)
		case cached != nil:
			return cached
		}
	}

	// Fall back to the persisted snapshot.
	if s.repo == nil {
		return nil
	}
	stored, repoErr := s.repo.GetScheduleStateSnapshot(ctx, userID, conversationID)
	if repoErr != nil {
		log.Printf("读取排程状态快照失败 chat_id=%s: %v", conversationID, repoErr)
		return nil
	}
	if stored == nil {
		return nil
	}

	rebuilt := snapshotToSchedulePlanPreviewCache(stored)
	if rebuilt == nil || s.cacheDAO == nil {
		return rebuilt
	}

	// Back-fill the cache; a write failure is logged and does not affect the result.
	if fillErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, conversationID, rebuilt); fillErr != nil {
		log.Printf("回填排程预览缓存失败 chat_id=%s: %v", conversationID, fillErr)
	}
	return rebuilt
}
// convertRefineStateToPlanState maps a schedulerefine state onto a
// scheduleplan state so the existing saveSchedulePlanPreview path can be reused.
//
// Mapping rules visible here:
//   - A nil input yields nil.
//   - Strategy is always "steady" and IsAdjustment is always true.
//   - AdjustmentScope is "small" when st.Contract.Strategy is exactly "keep",
//     otherwise "medium".
//   - String fields are whitespace-trimmed.
//   - Constraints, TaskClassIDs and HybridEntries are copied into fresh slices
//     (element values are copied shallowly); AllocatedItems and CandidatePlans
//     go through cloneTaskClassItems / cloneWeekSchedules, which are defined
//     elsewhere (presumably copies as well; verify against those helpers).
//
// Design intent (from the original author): only the fields required for
// preview persistence are carried over, and refine-specific snapshot fields
// should be added here if they are ever needed.
func convertRefineStateToPlanState(st *schedulerefine.ScheduleRefineState) *scheduleplan.SchedulePlanState {
	if st == nil {
		return nil
	}

	// Start from the fixed values, then fill in what comes from the refine state.
	planState := &scheduleplan.SchedulePlanState{
		Strategy:        "steady",
		AdjustmentScope: "medium",
		IsAdjustment:    true,
	}
	if st.Contract.Strategy == "keep" {
		planState.AdjustmentScope = "small"
	}

	planState.TraceID = strings.TrimSpace(st.TraceID)
	planState.UserID = st.UserID
	planState.ConversationID = strings.TrimSpace(st.ConversationID)
	planState.UserIntent = strings.TrimSpace(st.UserIntent)

	// Appending onto a typed nil slice allocates new backing storage, so the
	// plan state does not alias the refine state's slices.
	planState.Constraints = append([]string(nil), st.Constraints...)
	planState.TaskClassIDs = append([]int(nil), st.TaskClassIDs...)
	planState.HybridEntries = append([]model.HybridScheduleEntry(nil), st.HybridEntries...)
	planState.AllocatedItems = cloneTaskClassItems(st.AllocatedItems)
	planState.CandidatePlans = cloneWeekSchedules(st.CandidatePlans)

	planState.FinalSummary = strings.TrimSpace(st.FinalSummary)
	planState.Completed = st.Completed
	return planState
}