Version: 0.7.2.dev.260322

feat(schedule-plan):  重构智能排程链路并修复粗排双节对齐问题

-  新增“对话级排程状态持久化”能力:引入 `agent_schedule_states` 模型/DAO,并接入启动迁移
-  智能排程图升级:补齐小幅微调(quick refine)分支,完善预算/并发/状态字段流转
-  预览链路增强:完善排程预览服务读写与桥接逻辑,新增本地预览页 `infra/schedule_preview_viewer.html`
- ♻️ 缓存治理统一:将相关缓存处理收口到 DAO + `cache_deleter` 联动清理,移除旧散落逻辑
- 🐛 修复粗排核心 bug:禁止单节降级,强制双节并按 `1-2/3-4/...` 对齐;修复结束日扫描边界问题
-  新增粗排回归测试:覆盖孤立单节、偶数起点双节、Filler 对齐等关键场景
This commit is contained in:
Losita
2026-03-22 13:50:10 +08:00
parent f3f9902e93
commit e5b27df80d
20 changed files with 1961 additions and 166 deletions

View File

@@ -18,8 +18,8 @@ type AgentService = agentsvc.AgentService
// 说明:
// 1) 外部调用签名不变,新增排程依赖通过可选方式注入(见 NewAgentServiceWithSchedule
// 2) 真实构造逻辑已下沉到 service/agentsvc 包。
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService {
return agentsvc.NewAgentService(aiHub, repo, taskRepo, agentRedis, eventPublisher)
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService {
return agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, eventPublisher)
}
// NewAgentServiceWithSchedule 在基础 AgentService 上注入排程依赖。
@@ -32,11 +32,12 @@ func NewAgentServiceWithSchedule(
aiHub *inits.AIHub,
repo *dao.AgentDAO,
taskRepo *dao.TaskDAO,
cacheDAO *dao.CacheDAO,
agentRedis *dao.AgentCache,
eventPublisher outboxinfra.EventPublisher,
scheduleSvc *ScheduleService,
) *AgentService {
svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, agentRedis, eventPublisher)
svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, eventPublisher)
// 注入排程依赖:将 service 层方法包装为函数闭包,避免循环依赖。
if scheduleSvc != nil {

View File

@@ -25,6 +25,7 @@ type AgentService struct {
AIHub *inits.AIHub
repo *dao.AgentDAO
taskRepo *dao.TaskDAO
cacheDAO *dao.CacheDAO
agentCache *dao.AgentCache
eventPublisher outboxinfra.EventPublisher
@@ -49,7 +50,7 @@ type AgentService struct {
// NewAgentService 构造 AgentService。
// 这里通过依赖注入把“模型、仓储、缓存、异步持久化通道”统一交给服务层管理,
// 便于后续在单测中替换实现,或在启动流程中按环境切换配置。
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService {
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService {
// 全局注册一次 token 采集 callback
// 1. 只注册一次,避免重复处理;
// 2. 只有带 RequestTokenMeter 的请求上下文才会真正累加。
@@ -59,6 +60,7 @@ func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskD
AIHub: aiHub,
repo: repo,
taskRepo: taskRepo,
cacheDAO: cacheDAO,
agentCache: agentRedis,
eventPublisher: eventPublisher,
}

View File

@@ -51,17 +51,34 @@ func (s *AgentService) runSchedulePlanFlow(
// 2.1.2 先读可让本轮在内存中复用上轮 HybridEntries。
// 2.2 清理旧 key 仍然保留,避免前端在本轮进行中误读到旧结果。
var previousPreview *model.SchedulePlanPreviewCache
if s.agentCache != nil {
preview, getErr := s.agentCache.GetSchedulePlanPreview(ctx, userID, chatID)
if s.cacheDAO != nil {
preview, getErr := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, chatID)
if getErr != nil {
log.Printf("读取上一版排程预览失败 chat_id=%s: %v", chatID, getErr)
} else {
previousPreview = preview
}
if delErr := s.agentCache.DeleteSchedulePlanPreview(ctx, userID, chatID); delErr != nil {
if delErr := s.cacheDAO.DeleteSchedulePlanPreviewFromCache(ctx, userID, chatID); delErr != nil {
log.Printf("清理旧排程预览失败 chat_id=%s: %v", chatID, delErr)
}
}
// 2.3 Redis miss 时回落 MySQL 快照:
// 2.3.1 目的:即使 Redis TTL 过期,也能延续同会话微调语境;
// 2.3.2 回填:命中 DB 后尝试回填 Redis提高后续读取命中率
// 2.3.3 失败策略DB 读取异常只打日志,链路继续按“无历史快照”执行。
if previousPreview == nil && s.repo != nil {
snapshot, snapshotErr := s.repo.GetScheduleStateSnapshot(ctx, userID, chatID)
if snapshotErr != nil {
log.Printf("从 MySQL 读取排程快照失败 chat_id=%s: %v", chatID, snapshotErr)
} else if snapshot != nil {
previousPreview = snapshotToSchedulePlanPreviewCache(snapshot)
if s.cacheDAO != nil && previousPreview != nil {
if setErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, chatID, previousPreview); setErr != nil {
log.Printf("回填排程预览缓存失败 chat_id=%s: %v", chatID, setErr)
}
}
}
}
// 3. 读取对话历史:先快后稳。
// 3.1 先查 Redis命中则避免回源 DB降低请求时延。
@@ -99,6 +116,7 @@ func (s *AgentService) runSchedulePlanFlow(
state.PreviousTaskClassIDs = append([]int(nil), previousPreview.TaskClassIDs...)
state.PreviousHybridEntries = cloneHybridEntries(previousPreview.HybridEntries)
state.PreviousAllocatedItems = cloneTaskClassItems(previousPreview.AllocatedItems)
state.PreviousCandidatePlans = cloneWeekSchedules(previousPreview.CandidatePlans)
}
finalState, runErr := scheduleplan.RunSchedulePlanGraph(ctx, scheduleplan.SchedulePlanGraphRunInput{
Model: selectedModel,

View File

@@ -19,8 +19,8 @@ import (
// 2. 负责以“失败不阻断聊天主链路”的策略执行写入;
// 3. 不负责 SSE 返回协议,不负责数据库落库。
func (s *AgentService) saveSchedulePlanPreview(ctx context.Context, userID int, chatID string, finalState *scheduleplan.SchedulePlanState) {
// 1. 基础前置校验:任何关键依赖缺失都直接返回,避免产生无意义错误日志
if s == nil || s.agentCache == nil || finalState == nil {
// 1. 基础前置校验:state 为空时直接返回,避免写入半成品快照
if s == nil || finalState == nil {
return
}
normalizedChatID := strings.TrimSpace(chatID)
@@ -48,11 +48,24 @@ func (s *AgentService) saveSchedulePlanPreview(ctx context.Context, userID int,
GeneratedAt: time.Now(),
}
// 3. 尝试写入缓存:
// 3.1 写入失败仅打日志,不上抛错误,保证聊天接口协议与可用性不受影响
// 3.2 兜底策略是“用户仍可收到文本摘要”,只是暂时无法通过新接口拉取结构化预览
if err := s.agentCache.SetSchedulePlanPreview(ctx, userID, normalizedChatID, preview); err != nil {
log.Printf("写入排程预览缓存失败 chat_id=%s: %v", normalizedChatID, err)
// 3. 调用目的:先写 Redis 预览,保证前端查询接口能快速读取结构化结果。
// 3.1 Redis 是“快路径”;失败只记录日志,不中断主链路
// 3.2 失败兜底由后续 MySQL 快照承接
if s.cacheDAO != nil {
if err := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedChatID, preview); err != nil {
log.Printf("写入排程预览缓存失败 chat_id=%s: %v", normalizedChatID, err)
}
}
// 4. 调用目的:同步写 MySQL 状态快照,保证 Redis 失效后仍可连续微调。
// 4.1 这里采用“同步写库”而不是 outbox因为下一轮微调要强实时读取
// 4.2 快照写入失败只打日志,不阻断本轮用户回复,避免体验抖动;
// 4.3 revision 自增由 DAO 的 upsert 冲突更新负责。
if s.repo != nil {
snapshot := buildSchedulePlanSnapshotFromState(userID, normalizedChatID, finalState)
if err := s.repo.UpsertScheduleStateSnapshot(ctx, snapshot); err != nil {
log.Printf("写入排程状态快照失败 chat_id=%s: %v", normalizedChatID, err)
}
}
}
@@ -68,37 +81,58 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
if normalizedChatID == "" {
return nil, respond.MissingParam
}
if s == nil || s.agentCache == nil {
return nil, errors.New("agent cache is not initialized")
if s == nil {
return nil, errors.New("agent service is not initialized")
}
// 2. 查询缓存并校验归属:
// 2.1 缓存未命中:统一返回“预览不存在/已过期”;
// 2.2 命中但 user_id 不一致:按未命中处理,避免泄露他人会话信息;
// 2.3 失败兜底:缓存读异常直接上抛,由 API 层统一错误处理。
preview, err := s.agentCache.GetSchedulePlanPreview(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
if preview == nil {
return nil, respond.SchedulePlanPreviewNotFound
}
if preview.UserID > 0 && preview.UserID != userID {
return nil, respond.SchedulePlanPreviewNotFound
if s.cacheDAO != nil {
preview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
if preview != nil {
if preview.UserID > 0 && preview.UserID != userID {
return nil, respond.SchedulePlanPreviewNotFound
}
plans := cloneWeekSchedules(preview.CandidatePlans)
if plans == nil {
plans = make([]model.UserWeekSchedule, 0)
}
return &model.GetSchedulePlanPreviewResponse{
ConversationID: normalizedChatID,
TraceID: strings.TrimSpace(preview.TraceID),
Summary: strings.TrimSpace(preview.Summary),
CandidatePlans: plans,
GeneratedAt: preview.GeneratedAt,
}, nil
}
}
// 3. 映射响应结构,保证输出字段稳定。
plans := cloneWeekSchedules(preview.CandidatePlans)
if plans == nil {
plans = make([]model.UserWeekSchedule, 0)
// 3. Redis 未命中时回落 MySQL 快照:
// 3.1 读取成功后直接返回,避免用户看到“预览不存在”的假阴性;
// 3.2 若本次命中 DB 且缓存可用,则顺手回填 Redis提升后续命中率
// 3.3 DB 也未命中时再返回 not found。
if s.repo != nil {
snapshot, err := s.repo.GetScheduleStateSnapshot(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
if snapshot != nil {
response := snapshotToSchedulePlanPreviewResponse(snapshot)
if s.cacheDAO != nil {
cachePreview := snapshotToSchedulePlanPreviewCache(snapshot)
if setErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedChatID, cachePreview); setErr != nil {
log.Printf("回填排程预览缓存失败 chat_id=%s: %v", normalizedChatID, setErr)
}
}
return response, nil
}
}
return &model.GetSchedulePlanPreviewResponse{
ConversationID: normalizedChatID,
TraceID: strings.TrimSpace(preview.TraceID),
Summary: strings.TrimSpace(preview.Summary),
CandidatePlans: plans,
GeneratedAt: preview.GeneratedAt,
}, nil
return nil, respond.SchedulePlanPreviewNotFound
}
// cloneWeekSchedules 对周视图排程结果做深拷贝,避免切片引用共享。
@@ -160,3 +194,84 @@ func cloneTaskClassItems(src []model.TaskClassItem) []model.TaskClassItem {
}
return dst
}
// buildSchedulePlanSnapshotFromState 把 graph 运行结果映射成可持久化快照 DTO。
//
// 职责边界:
// 1. 负责字段映射与深拷贝,避免跨层共享可变切片;
// 2. 负责补齐 state_version 默认值;
// 3. 不负责数据库写入(写入由 DAO 承担)。
func buildSchedulePlanSnapshotFromState(userID int, conversationID string, st *scheduleplan.SchedulePlanState) *model.SchedulePlanStateSnapshot {
if st == nil {
return nil
}
return &model.SchedulePlanStateSnapshot{
UserID: userID,
ConversationID: conversationID,
StateVersion: model.SchedulePlanStateVersionV1,
TaskClassIDs: append([]int(nil), st.TaskClassIDs...),
Constraints: append([]string(nil), st.Constraints...),
HybridEntries: cloneHybridEntries(st.HybridEntries),
AllocatedItems: cloneTaskClassItems(st.AllocatedItems),
CandidatePlans: cloneWeekSchedules(st.CandidatePlans),
UserIntent: strings.TrimSpace(st.UserIntent),
Strategy: strings.TrimSpace(st.Strategy),
AdjustmentScope: strings.TrimSpace(st.AdjustmentScope),
RestartRequested: st.RestartRequested,
FinalSummary: strings.TrimSpace(st.FinalSummary),
Completed: st.Completed,
TraceID: strings.TrimSpace(st.TraceID),
}
}
// snapshotToSchedulePlanPreviewCache 把 MySQL 快照转换为 Redis 预览缓存结构。
func snapshotToSchedulePlanPreviewCache(snapshot *model.SchedulePlanStateSnapshot) *model.SchedulePlanPreviewCache {
if snapshot == nil {
return nil
}
summary := strings.TrimSpace(snapshot.FinalSummary)
if summary == "" {
summary = "排程流程已完成,但未生成结果摘要。"
}
generatedAt := snapshot.UpdatedAt
if generatedAt.IsZero() {
generatedAt = time.Now()
}
return &model.SchedulePlanPreviewCache{
UserID: snapshot.UserID,
ConversationID: snapshot.ConversationID,
TraceID: strings.TrimSpace(snapshot.TraceID),
Summary: summary,
CandidatePlans: cloneWeekSchedules(snapshot.CandidatePlans),
TaskClassIDs: append([]int(nil), snapshot.TaskClassIDs...),
HybridEntries: cloneHybridEntries(snapshot.HybridEntries),
AllocatedItems: cloneTaskClassItems(snapshot.AllocatedItems),
GeneratedAt: generatedAt,
}
}
// snapshotToSchedulePlanPreviewResponse 把 MySQL 快照转换为查询接口响应。
func snapshotToSchedulePlanPreviewResponse(snapshot *model.SchedulePlanStateSnapshot) *model.GetSchedulePlanPreviewResponse {
if snapshot == nil {
return nil
}
plans := cloneWeekSchedules(snapshot.CandidatePlans)
if plans == nil {
plans = make([]model.UserWeekSchedule, 0)
}
summary := strings.TrimSpace(snapshot.FinalSummary)
if summary == "" {
summary = "排程流程已完成,但未生成结果摘要。"
}
generatedAt := snapshot.UpdatedAt
if generatedAt.IsZero() {
generatedAt = time.Now()
}
return &model.GetSchedulePlanPreviewResponse{
ConversationID: snapshot.ConversationID,
TraceID: strings.TrimSpace(snapshot.TraceID),
Summary: summary,
CandidatePlans: plans,
GeneratedAt: generatedAt,
}
}