Files
smartmate/backend/agent/scheduleplan/graph.go
Losita e5b27df80d Version: 0.7.2.dev.260322
feat(schedule-plan):  重构智能排程链路并修复粗排双节对齐问题

-  新增“对话级排程状态持久化”能力:引入 `agent_schedule_states` 模型/DAO,并接入启动迁移
-  智能排程图升级:补齐小幅微调(quick refine)分支,完善预算/并发/状态字段流转
-  预览链路增强:完善排程预览服务读写与桥接逻辑,新增本地预览页 `infra/schedule_preview_viewer.html`
- ♻️ 缓存治理统一:将相关缓存处理收口到 DAO + `cache_deleter` 联动清理,移除旧散落逻辑
- 🐛 修复粗排核心 bug:禁止单节降级,强制双节并按 `1-2/3-4/...` 对齐;修复结束日扫描边界问题
-  新增粗排回归测试:覆盖孤立单节、偶数起点双节、Filler 对齐等关键场景
2026-03-22 13:50:10 +08:00

211 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduleplan
import (
"context"
"errors"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
const (
// 图节点:意图识别与约束提取
schedulePlanGraphNodePlan = "schedule_plan_plan"
// 图节点:粗排构建(替代旧 preview + hybridBuild
schedulePlanGraphNodeRoughBuild = "schedule_plan_rough_build"
// 图节点:提前退出
schedulePlanGraphNodeExit = "schedule_plan_exit"
// 图节点:按天拆分并注入上下文标签
schedulePlanGraphNodeDailySplit = "schedule_plan_daily_split"
// 图节点:小改动快速微调(用于 small scope
schedulePlanGraphNodeQuickRefine = "schedule_plan_quick_refine"
// 图节点:并发日内优化
schedulePlanGraphNodeDailyRefine = "schedule_plan_daily_refine"
// 图节点:合并日内优化结果
schedulePlanGraphNodeMerge = "schedule_plan_merge"
// 图节点:周级配平优化(单步动作模式,输出阶段状态)
schedulePlanGraphNodeWeeklyRefine = "schedule_plan_weekly_refine"
// 图节点:终审校验
schedulePlanGraphNodeFinalCheck = "schedule_plan_final_check"
// 图节点:返回预览结果(不落库)
schedulePlanGraphNodeReturnPreview = "schedule_plan_return_preview"
)
// SchedulePlanGraphRunInput 是执行“智能排程 graph”所需输入。
//
// 字段说明:
// 1. Extra前端附加参数重点是 task_class_ids
// 2. ChatHistory支持连续对话微调
// 3. OutChan/ModelName保留兼容字段当前 weekly refine 主要输出阶段状态);
// 4. DailyRefineConcurrency/WeeklyAdjustBudget可选运行参数覆盖。
type SchedulePlanGraphRunInput struct {
Model *ark.ChatModel
State *SchedulePlanState
Deps SchedulePlanToolDeps
UserMessage string
Extra map[string]any
ChatHistory []*schema.Message
EmitStage func(stage, detail string)
OutChan chan<- string
ModelName string
DailyRefineConcurrency int
WeeklyAdjustBudget int
}
// RunSchedulePlanGraph 执行“智能排程”图编排。
//
// 当前链路:
// START
// -> plan
// -> roughBuild
// -> (len(task_class_ids)>=2 ? dailySplit -> dailyRefine -> merge : weeklyRefine)
// -> finalCheck
// -> returnPreview
// -> END
//
// 说明:
// 1. exit 分支可从 plan/roughBuild 直接提前终止;
// 2. 本文件只负责“连线与分支”,节点内业务都在 nodes/daily/weekly 文件中。
func RunSchedulePlanGraph(ctx context.Context, input SchedulePlanGraphRunInput) (*SchedulePlanState, error) {
// 1. 启动前硬校验。
if input.Model == nil {
return nil, errors.New("schedule plan graph: model is nil")
}
if input.State == nil {
return nil, errors.New("schedule plan graph: state is nil")
}
if err := input.Deps.validate(); err != nil {
return nil, err
}
// 2. 注入运行时配置(可选覆盖)。
if input.DailyRefineConcurrency > 0 {
input.State.DailyRefineConcurrency = input.DailyRefineConcurrency
}
if input.WeeklyAdjustBudget > 0 {
input.State.WeeklyAdjustBudget = input.WeeklyAdjustBudget
}
emitStage := func(stage, detail string) {
if input.EmitStage != nil {
input.EmitStage(stage, detail)
}
}
runner := newSchedulePlanRunner(
input.Model,
input.Deps,
emitStage,
input.UserMessage,
input.Extra,
input.ChatHistory,
input.OutChan,
input.ModelName,
input.State.DailyRefineConcurrency,
)
graph := compose.NewGraph[*SchedulePlanState, *SchedulePlanState]()
// 3. 注册节点。
if err := graph.AddLambdaNode(schedulePlanGraphNodePlan, compose.InvokableLambda(runner.planNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeRoughBuild, compose.InvokableLambda(runner.roughBuildNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeExit, compose.InvokableLambda(runner.exitNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeDailySplit, compose.InvokableLambda(runner.dailySplitNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeQuickRefine, compose.InvokableLambda(runner.quickRefineNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeDailyRefine, compose.InvokableLambda(runner.dailyRefineNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeMerge, compose.InvokableLambda(runner.mergeNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeWeeklyRefine, compose.InvokableLambda(runner.weeklyRefineNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeFinalCheck, compose.InvokableLambda(runner.finalCheckNode)); err != nil {
return nil, err
}
if err := graph.AddLambdaNode(schedulePlanGraphNodeReturnPreview, compose.InvokableLambda(runner.returnPreviewNode)); err != nil {
return nil, err
}
// 4. 连线START -> plan
if err := graph.AddEdge(compose.START, schedulePlanGraphNodePlan); err != nil {
return nil, err
}
// 5. plan 分支roughBuild | exit
if err := graph.AddBranch(schedulePlanGraphNodePlan, compose.NewGraphBranch(
runner.nextAfterPlan,
map[string]bool{
schedulePlanGraphNodeRoughBuild: true,
schedulePlanGraphNodeExit: true,
},
)); err != nil {
return nil, err
}
// 6. roughBuild 分支dailySplit | weeklyRefine | exit
if err := graph.AddBranch(schedulePlanGraphNodeRoughBuild, compose.NewGraphBranch(
runner.nextAfterRoughBuild,
map[string]bool{
schedulePlanGraphNodeDailySplit: true,
schedulePlanGraphNodeQuickRefine: true,
schedulePlanGraphNodeWeeklyRefine: true,
schedulePlanGraphNodeExit: true,
},
)); err != nil {
return nil, err
}
// 7. 固定边quickRefine -> weeklyRefinedailySplit -> dailyRefine -> merge -> weeklyRefine -> finalCheck -> returnPreview -> END
if err := graph.AddEdge(schedulePlanGraphNodeQuickRefine, schedulePlanGraphNodeWeeklyRefine); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeDailySplit, schedulePlanGraphNodeDailyRefine); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeDailyRefine, schedulePlanGraphNodeMerge); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeMerge, schedulePlanGraphNodeWeeklyRefine); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeWeeklyRefine, schedulePlanGraphNodeFinalCheck); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeFinalCheck, schedulePlanGraphNodeReturnPreview); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeReturnPreview, compose.END); err != nil {
return nil, err
}
if err := graph.AddEdge(schedulePlanGraphNodeExit, compose.END); err != nil {
return nil, err
}
// 8. 编译并执行。
// 路径最多约 8~9 个节点,保守预留 20 步避免误判。
runnable, err := graph.Compile(ctx,
compose.WithGraphName("SchedulePlanGraph"),
compose.WithMaxRunSteps(20),
compose.WithNodeTriggerMode(compose.AnyPredecessor),
)
if err != nil {
return nil, err
}
return runnable.Invoke(ctx, input.State)
}