diff --git a/backend/active_scheduler/feedbacklocate/dto.go b/backend/active_scheduler/feedbacklocate/dto.go new file mode 100644 index 0000000..e88b848 --- /dev/null +++ b/backend/active_scheduler/feedbacklocate/dto.go @@ -0,0 +1,84 @@ +package feedbacklocate + +import "strings" + +const ( + // ActionSelectCandidate 表示模型已经把补充信息定位到某个 schedule_event。 + ActionSelectCandidate = "select_candidate" + // ActionAskUser 表示模型无法稳定定位,需要继续追问用户。 + ActionAskUser = "ask_user" + + // TargetTypeScheduleEvent 是本阶段允许返回的唯一目标类型。 + TargetTypeScheduleEvent = "schedule_event" +) + +// Request 是反馈定位节点的最小输入。 +// +// 职责边界: +// 1. 只承载定位当前补充信息所需的上下文,不携带正式排程写入能力。 +// 2. 不负责候选筛选或 preview 落库,最终只返回“定位成功”或“继续追问”。 +type Request struct { + UserID int + UserMessage string + PendingQuestion string + MissingInfo []string +} + +// Result 是反馈定位节点的最小输出。 +// +// 职责边界: +// 1. 只表达“是否已经定位到 schedule_event”以及“是否需要继续 ask_user”。 +// 2. 不携带正式日程写入结果,也不直接产出 preview。 +type Result struct { + Action string + TargetType string + TargetID int + Reason string + AskUserQuestion string +} + +// IsResolved 表示本次定位是否已经拿到可校验的 schedule_event。 +// +// 输入输出语义: +// 1. 只有 action=select_candidate 且 target_type=schedule_event 且 target_id>0 才算成功。 +// 2. 其余情况都视为需要继续 ask_user。 +func (r Result) IsResolved() bool { + return strings.EqualFold(strings.TrimSpace(r.Action), ActionSelectCandidate) && + strings.EqualFold(strings.TrimSpace(r.TargetType), TargetTypeScheduleEvent) && + r.TargetID > 0 +} + +// ShouldAskUser 表示本次定位是否应该回退为追问。 +func (r Result) ShouldAskUser() bool { + return !r.IsResolved() +} + +type promptInput struct { + GeneratedAt string `json:"generated_at"` + UserMessage string `json:"user_message"` + PendingQuestion string `json:"pending_question,omitempty"` + MissingInfo []string `json:"missing_info,omitempty"` + Window promptWindowInput `json:"window"` + Candidates []eventCandidate `json:"candidates"` +} + +type promptWindowInput struct { + StartAt string `json:"start_at"` + EndAt string `json:"end_at"` +} + +type eventCandidate struct { + TargetID int `json:"target_id"` + Title string `json:"title"` + SourceType string `json:"source_type,omitempty"` + RelatedID int `json:"related_id,omitempty"` + SlotSummary string `json:"slot_summary,omitempty"` +} + +type llmResponse struct { + Action string `json:"action"` + TargetType string `json:"target_type"` + TargetID int `json:"target_id"` + Reason string `json:"reason"` + AskUserQuestion string `json:"ask_user_question"` +} diff --git a/backend/active_scheduler/feedbacklocate/prompt.go b/backend/active_scheduler/feedbacklocate/prompt.go new file mode 100644 index 0000000..225f820 --- /dev/null +++ b/backend/active_scheduler/feedbacklocate/prompt.go @@ -0,0 +1,69 @@ +package feedbacklocate + +import ( + "encoding/json" + "strings" +) + +const locateSystemPrompt = ` +你是 SmartFlow 主动调度里专门负责 unfinished_feedback 的定位器。 +你的任务只有一个:根据用户补充的话,把它定位到当前滚动窗口中的某一个 schedule_event;定位不了就继续 ask_user。 + +硬规则: +1. 只允许输出 JSON,不要输出 markdown,不要输出解释性正文。 +2. 只允许返回 action / target_type / target_id / reason / ask_user_question 这几个字段。 +3. target_type 只能是 schedule_event。 +4. target_id 必须来自候选列表里的 target_id,不要编造,不要猜一个新的。 +5. 当你不能稳定定位时,action 必须是 ask_user,并给出一句短问题。 +6. 当用户补充信息已经足够时,action 必须是 select_candidate。 +7. 请优先结合当前时间、用户原始补充话术、pending question 和候选日程的时间顺序来判断。 +` + +func buildPromptInput(req Request, generatedAt string, windowStart string, windowEnd string, candidates []eventCandidate) promptInput { + input := promptInput{ + GeneratedAt: generatedAt, + UserMessage: strings.TrimSpace(req.UserMessage), + Window: promptWindowInput{ + StartAt: windowStart, + EndAt: windowEnd, + }, + } + + if trimmed := strings.TrimSpace(req.PendingQuestion); trimmed != "" { + input.PendingQuestion = trimmed + } + if len(req.MissingInfo) > 0 { + input.MissingInfo = cloneAndTrimStrings(req.MissingInfo) + } + input.Candidates = append([]eventCandidate(nil), candidates...) + return input +} + +func buildUserPrompt(input promptInput) (string, error) { + raw, err := json.MarshalIndent(input, "", " ") + if err != nil { + return "", err + } + + var builder strings.Builder + builder.WriteString("请根据输入定位当前滚动窗口中的 schedule_event。") + builder.WriteString("只输出 JSON,不要补充任何其它内容。\n") + builder.WriteString("输入:\n") + builder.WriteString(string(raw)) + return builder.String(), nil +} + +// BuildAskUserQuestion 负责把 missing_info 转成继续追问用户的短问题。 +func BuildAskUserQuestion(missingInfo []string) string { + normalized := cloneAndTrimStrings(missingInfo) + if len(normalized) == 0 { + return "请补充能唯一定位到未完成日程块的信息。" + } + + for _, item := range normalized { + if item == "feedback_target" { + return "请告诉我你指的是哪一个未完成的日程块,比如具体时间或名称。" + } + } + return "请补充 " + strings.Join(normalized, "、") + " 对应的信息。" +} diff --git a/backend/active_scheduler/feedbacklocate/service.go b/backend/active_scheduler/feedbacklocate/service.go new file mode 100644 index 0000000..6fa5897 --- /dev/null +++ b/backend/active_scheduler/feedbacklocate/service.go @@ -0,0 +1,367 @@ +package feedbacklocate + +import ( + "context" + "errors" + "fmt" + "log" + "sort" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" + "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" +) + +const locateMaxTokens = 800 + +// Service 负责把 unfinished_feedback 的补充话术定位到当前滚动窗口内的 schedule_event。 +// +// 职责边界: +// 1. 只做“定位”与“继续追问”的判断,不负责正式日程写入。 +// 2. 候选来自 ScheduleReader,JSON 判定来自 LLM,二者任一不可用时都回退为 ask_user。 +// 3. 不创建新工具系统,也不直接产出 preview。 +type Service struct { + reader ports.ScheduleReader + client *infrallm.Client + clock func() time.Time + logger *log.Logger +} + +// NewService 创建反馈定位服务。 +// +// 说明: +// 1. reader / client 允许为空,方便在模型不可用或读模型暂时不可用时直接回退 ask_user。 +// 2. 真正的定位能力只在 Resolve 内部按需启用。 +func NewService(reader ports.ScheduleReader, client *infrallm.Client) *Service { + return &Service{ + reader: reader, + client: client, + clock: time.Now, + logger: log.Default(), + } +} + +// SetClock 允许测试注入稳定时间。 +func (s *Service) SetClock(clock func() time.Time) { + if s != nil && clock != nil { + s.clock = clock + } +} + +// SetLogger 允许外部替换日志器。 +func (s *Service) SetLogger(logger *log.Logger) { + if s != nil && logger != nil { + s.logger = logger + } +} + +// Resolve 负责把用户补充信息定位到当前滚动窗口中的一个 schedule_event。 +// +// 输入输出语义: +// 1. 成功时返回 action=select_candidate,且 target_type=schedule_event、target_id 可校验。 +// 2. 失败时不硬猜,统一返回 action=ask_user。 +// 3. 只有上下文取消这类外部中断才会返回 error。 +func (s *Service) Resolve(ctx context.Context, req Request) (Result, error) { + if ctx == nil { + ctx = context.Background() + } + if err := ctx.Err(); err != nil { + return Result{}, err + } + if req.UserID <= 0 { + return Result{}, errors.New("feedback locate user_id 不能为空") + } + + now := s.now() + windowStart := now + windowEnd := now.Add(24 * time.Hour) + + candidates, err := s.loadCandidates(ctx, req.UserID, windowStart, windowEnd, now) + if err != nil { + return s.buildAskUserResult(req, "读取滚动窗口日程失败: "+err.Error()), nil + } + if len(candidates) == 0 { + return s.buildAskUserResult(req, "当前滚动窗口内没有可定位的日程块"), nil + } + + if s == nil || s.client == nil { + return s.buildAskUserResult(req, "模型暂不可用"), nil + } + + userPrompt, err := buildUserPrompt(buildPromptInput( + req, + now.In(time.Local).Format(time.RFC3339), + windowStart.In(time.Local).Format(time.RFC3339), + windowEnd.In(time.Local).Format(time.RFC3339), + candidates, + )) + if err != nil { + return s.buildAskUserResult(req, "定位 prompt 构造失败"), nil + } + + messages := infrallm.BuildSystemUserMessages(strings.TrimSpace(locateSystemPrompt), nil, userPrompt) + resp, rawResult, err := infrallm.GenerateJSON[llmResponse]( + ctx, + s.client, + messages, + infrallm.GenerateOptions{ + Temperature: 0.1, + MaxTokens: locateMaxTokens, + Thinking: infrallm.ThinkingModeDisabled, + Metadata: map[string]any{ + "stage": "active_scheduler_feedback_locate", + "candidate_count": len(candidates), + }, + }, + ) + if err != nil { + if s.logger != nil { + s.logger.Printf("[WARN] active scheduler feedback locate failed: err=%v raw=%s", err, truncateRaw(rawResult)) + } + return s.buildAskUserResult(req, "模型定位失败"), nil + } + + result, fallbackUsed := s.convertResponse(req, resp, candidates) + if fallbackUsed && s.logger != nil { + selectedID := 0 + action := "" + targetType := "" + if resp != nil { + selectedID = resp.TargetID + action = strings.TrimSpace(resp.Action) + targetType = strings.TrimSpace(resp.TargetType) + } + s.logger.Printf("[WARN] active scheduler feedback locate fallback: action=%q target_type=%q target_id=%d", action, targetType, selectedID) + } + return result, nil +} + +func (s *Service) convertResponse(req Request, resp *llmResponse, candidates []eventCandidate) (Result, bool) { + if resp == nil { + return s.buildAskUserResult(req, "模型返回空结果"), true + } + + candidateMap := make(map[int]eventCandidate, len(candidates)) + for _, item := range candidates { + candidateMap[item.TargetID] = item + } + + action := normalizeAction(resp.Action) + targetType := strings.TrimSpace(resp.TargetType) + targetID := resp.TargetID + reason := strings.TrimSpace(resp.Reason) + askUserQuestion := strings.TrimSpace(resp.AskUserQuestion) + + if action == ActionSelectCandidate && + strings.EqualFold(targetType, TargetTypeScheduleEvent) && + targetID > 0 { + if _, ok := candidateMap[targetID]; ok { + return Result{ + Action: ActionSelectCandidate, + TargetType: TargetTypeScheduleEvent, + TargetID: targetID, + Reason: reason, + AskUserQuestion: "", + }, false + } + } + + question := firstNonEmptyString( + askUserQuestion, + BuildAskUserQuestion(req.MissingInfo), + req.PendingQuestion, + ) + return Result{ + Action: ActionAskUser, + TargetType: TargetTypeScheduleEvent, + TargetID: 0, + Reason: reason, + AskUserQuestion: question, + }, true +} + +func (s *Service) buildAskUserResult(req Request, reason string) Result { + return Result{ + Action: ActionAskUser, + TargetType: TargetTypeScheduleEvent, + TargetID: 0, + Reason: strings.TrimSpace(reason), + AskUserQuestion: firstNonEmptyString(BuildAskUserQuestion(req.MissingInfo), req.PendingQuestion), + } +} + +func (s *Service) loadCandidates(ctx context.Context, userID int, windowStart time.Time, windowEnd time.Time, now time.Time) ([]eventCandidate, error) { + if s == nil || s.reader == nil { + return nil, nil + } + + facts, err := s.reader.GetScheduleFactsByWindow(ctx, ports.ScheduleWindowRequest{ + UserID: userID, + TargetType: string(trigger.TargetTypeScheduleEvent), + TargetID: 0, + WindowStart: windowStart, + WindowEnd: windowEnd, + Now: now, + }) + if err != nil { + return nil, err + } + return buildEventCandidates(facts.Events), nil +} + +func buildEventCandidates(events []ports.ScheduleEventFact) []eventCandidate { + if len(events) == 0 { + return nil + } + + sorted := append([]ports.ScheduleEventFact(nil), events...) + sort.SliceStable(sorted, func(i, j int) bool { + return eventBefore(sorted[i], sorted[j]) + }) + + candidates := make([]eventCandidate, 0, len(sorted)) + for _, item := range sorted { + candidates = append(candidates, eventCandidate{ + TargetID: item.ID, + Title: strings.TrimSpace(item.Title), + SourceType: strings.TrimSpace(item.SourceType), + RelatedID: item.RelID, + SlotSummary: summarizeSlots(item.Slots), + }) + } + return candidates +} + +func eventBefore(left, right ports.ScheduleEventFact) bool { + leftStart := firstSlotStart(left.Slots) + rightStart := firstSlotStart(right.Slots) + if !leftStart.IsZero() && !rightStart.IsZero() && !leftStart.Equal(rightStart) { + return leftStart.Before(rightStart) + } + if left.ID != right.ID { + return left.ID < right.ID + } + return strings.TrimSpace(left.Title) < strings.TrimSpace(right.Title) +} + +func firstSlotStart(slots []ports.Slot) time.Time { + if len(slots) == 0 { + return time.Time{} + } + sorted := append([]ports.Slot(nil), slots...) + sort.SliceStable(sorted, func(i, j int) bool { + return slotBefore(sorted[i], sorted[j]) + }) + return sorted[0].StartAt +} + +func slotBefore(left, right ports.Slot) bool { + if !left.StartAt.IsZero() && !right.StartAt.IsZero() && !left.StartAt.Equal(right.StartAt) { + return left.StartAt.Before(right.StartAt) + } + if left.Week != right.Week { + return left.Week < right.Week + } + if left.DayOfWeek != right.DayOfWeek { + return left.DayOfWeek < right.DayOfWeek + } + return left.Section < right.Section +} + +func summarizeSlots(slots []ports.Slot) string { + if len(slots) == 0 { + return "" + } + + sorted := append([]ports.Slot(nil), slots...) + sort.SliceStable(sorted, func(i, j int) bool { + return slotBefore(sorted[i], sorted[j]) + }) + + parts := make([]string, 0, minInt(3, len(sorted))) + for idx, slot := range sorted { + if idx >= 3 { + break + } + parts = append(parts, summarizeSlot(slot)) + } + if len(sorted) > 3 { + parts = append(parts, "...") + } + return strings.Join(parts, ";") +} + +func summarizeSlot(slot ports.Slot) string { + if !slot.StartAt.IsZero() && !slot.EndAt.IsZero() { + return fmt.Sprintf("%s-%s", slot.StartAt.In(time.Local).Format("01-02 15:04"), slot.EndAt.In(time.Local).Format("15:04")) + } + return fmt.Sprintf("W%d-D%d-S%d", slot.Week, slot.DayOfWeek, slot.Section) +} + +func normalizeAction(raw string) string { + switch strings.ToLower(strings.TrimSpace(raw)) { + case ActionSelectCandidate: + return ActionSelectCandidate + case ActionAskUser: + return ActionAskUser + default: + return "" + } +} + +func firstNonEmptyString(values ...string) string { + for _, value := range values { + if trimmed := strings.TrimSpace(value); trimmed != "" { + return trimmed + } + } + return "" +} + +func cloneAndTrimStrings(values []string) []string { + if len(values) == 0 { + return nil + } + result := make([]string, 0, len(values)) + seen := make(map[string]struct{}, len(values)) + for _, item := range values { + trimmed := strings.TrimSpace(item) + if trimmed == "" { + continue + } + if _, ok := seen[trimmed]; ok { + continue + } + seen[trimmed] = struct{}{} + result = append(result, trimmed) + } + return result +} + +func truncateRaw(raw *infrallm.TextResult) string { + if raw == nil { + return "" + } + text := strings.TrimSpace(raw.Text) + runes := []rune(text) + if len(runes) <= 200 { + return text + } + return string(runes[:200]) + "..." +} + +func (s *Service) now() time.Time { + if s == nil || s.clock == nil { + return time.Now() + } + return s.clock() +} + +func minInt(left, right int) int { + if left < right { + return left + } + return right +} diff --git a/backend/cmd/start.go b/backend/cmd/start.go index d8e5deb..9996988 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -13,6 +13,7 @@ import ( activeadapters "github.com/LoveLosita/smartflow/backend/active_scheduler/adapters" "github.com/LoveLosita/smartflow/backend/active_scheduler/applyadapter" + activefeedbacklocate "github.com/LoveLosita/smartflow/backend/active_scheduler/feedbacklocate" activegraph "github.com/LoveLosita/smartflow/backend/active_scheduler/graph" activejob "github.com/LoveLosita/smartflow/backend/active_scheduler/job" activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" @@ -223,6 +224,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { taskRepo, cacheRepo, agentCacheRepo, + manager.ActiveSchedule, manager.ActiveScheduleSession, eventBus, scheduleService, @@ -258,11 +260,12 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { // 2. dry-run 与 selection 通过 graph runner 串起来,避免 trigger_pipeline 再拼第二套候选逻辑。 activeScheduleLLMClient := infrallm.WrapArkClient(aiHub.Pro) activeScheduleSelector := activesel.NewService(activeScheduleLLMClient) + activeScheduleFeedbackLocator := activefeedbacklocate.NewService(activeReaders, activeScheduleLLMClient) activeScheduleGraphRunner, err := activegraph.NewRunner(activeScheduleDryRun.AsGraphDryRunFunc(), activeScheduleSelector) if err != nil { return nil, err } - agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm)) + agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm, activeScheduleFeedbackLocator)) // 1. 生产投递先切到用户级飞书 Webhook provider,mock provider 文件继续保留给后续单测和本地隔离验证。 // 2. provider 与配置测试接口共用同一个实例,保证“测试成功”和“正式投递”走同一套 URL 校验、JSON 拼装和 HTTP 结果分类。 feishuProvider, err := notification.NewWebhookFeishuProvider(manager.Notification, notification.WebhookFeishuProviderOptions{ @@ -390,16 +393,17 @@ func buildActiveSchedulePreviewConfirmService(db *gorm.DB, activeDAO *dao.Active return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, applyadapter.NewGormApplyAdapter(db)) } -// buildActiveScheduleSessionRerunFunc 把主动调度 graph / preview 能力装成聊天入口可调用的 rerun 闭包。 +// buildActiveScheduleSessionRerunFunc 把主动调度定位器 / graph / preview 能力装成聊天入口可调用的 rerun 闭包。 // // 说明: -// 1. 这里只做最小接线:复用现有 trigger -> graph -> preview 组件,不把 worker/notification 再搬一遍; +// 1. 这里只做最小接线:复用现有定位器 -> trigger -> graph -> preview 组件,不把 worker/notification 再搬一遍; // 2. 成功时返回 session 状态、assistant 文本和业务卡片数据; // 3. 失败时直接把 error 交回聊天入口,由上层统一写失败日志和 SSE 错误。 func buildActiveScheduleSessionRerunFunc( activeDAO *dao.ActiveScheduleDAO, graphRunner *activegraph.Runner, previewConfirm *activesvc.PreviewConfirmService, + feedbackLocator *activefeedbacklocate.Service, ) agentsvcsvc.ActiveScheduleSessionRerunFunc { return func( ctx context.Context, @@ -419,16 +423,74 @@ func buildActiveScheduleSessionRerunFunc( if err != nil { return nil, err } - // 1. 当前最小接线先复用“原 trigger + 最新数据库事实”重跑 active scheduler graph。 - // 2. 用户这次回复的正文已经由聊天入口写进 conversation/timeline,但还没有下沉到 active_scheduler readers。 - // 3. 后续若要让 ask_user 回复直接改写 graph 事实源,应在 reader/context builder 层继续补这一跳。 + resolvedTargetType := activeTrigger.TargetType(triggerRow.TargetType) + resolvedTargetID := triggerRow.TargetID + needsFeedbackLocate := activeTrigger.TriggerType(triggerRow.TriggerType) == activeTrigger.TriggerTypeUnfinishedFeedback && + (resolvedTargetID <= 0 || containsString(session.State.MissingInfo, "feedback_target")) + + // 1. unfinished_feedback 在目标缺失时先走定位器,把用户补充信息转成可校验的 schedule_event。 + // 2. 定位失败时直接 ask_user,不硬猜 target_id,也不继续跑 graph。 + // 3. 定位成功后只改本次 domainTrigger 的 target_type / target_id,不写正式日程。 + if needsFeedbackLocate { + if feedbackLocator == nil { + question := firstNonEmptyString( + activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo), + session.State.PendingQuestion, + ) + nextState := session.State + nextState.PendingQuestion = question + nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target") + nextState.LastCandidateID = "" + nextState.LastNotificationID = "" + nextState.FailedReason = "" + nextState.ExpiresAt = nil + return &agentsvcsvc.ActiveScheduleSessionRerunResult{ + AssistantText: question, + SessionState: nextState, + SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply, + }, nil + } + locateResult, locateErr := feedbackLocator.Resolve(ctx, activefeedbacklocate.Request{ + UserID: triggerRow.UserID, + UserMessage: userMessage, + PendingQuestion: session.State.PendingQuestion, + MissingInfo: cloneStringSlice(session.State.MissingInfo), + }) + if locateErr != nil { + return nil, locateErr + } + if locateResult.ShouldAskUser() { + question := firstNonEmptyString( + locateResult.AskUserQuestion, + activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo), + session.State.PendingQuestion, + ) + nextState := session.State + nextState.PendingQuestion = question + nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target") + nextState.LastCandidateID = "" + nextState.LastNotificationID = "" + nextState.FailedReason = "" + nextState.ExpiresAt = nil + return &agentsvcsvc.ActiveScheduleSessionRerunResult{ + AssistantText: question, + SessionState: nextState, + SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply, + }, nil + } + resolvedTargetType = activeTrigger.TargetType(locateResult.TargetType) + resolvedTargetID = locateResult.TargetID + } + + // 1. 定位完成后再构造 domainTrigger,避免 unfinished_feedback 的 target_id 为空时误触校验失败。 + // 2. 这里仍然复用现有 graph -> preview 链路,不写新排程引擎。 domainTrigger := activeTrigger.ActiveScheduleTrigger{ TriggerID: triggerRow.ID, UserID: triggerRow.UserID, TriggerType: activeTrigger.TriggerType(triggerRow.TriggerType), Source: activeTrigger.SourceUserFeedback, - TargetType: activeTrigger.TargetType(triggerRow.TargetType), - TargetID: triggerRow.TargetID, + TargetType: resolvedTargetType, + TargetID: resolvedTargetID, FeedbackID: triggerRow.FeedbackID, IdempotencyKey: triggerRow.IdempotencyKey, MockNow: nil, @@ -550,6 +612,35 @@ func cloneStringSlice(values []string) []string { return copied } +// appendMissingString 负责把缺失字段名补回状态数组,避免 ask_user 分支把原始缺失项冲掉。 +func appendMissingString(values []string, next string) []string { + trimmed := strings.TrimSpace(next) + if trimmed == "" { + return cloneStringSlice(values) + } + for _, value := range values { + if strings.TrimSpace(value) == trimmed { + return cloneStringSlice(values) + } + } + result := cloneStringSlice(values) + return append(result, trimmed) +} + +// containsString 负责判断 missing_info 里是否已经标记过某个缺失项。 +func containsString(values []string, target string) bool { + trimmed := strings.TrimSpace(target) + if trimmed == "" { + return false + } + for _, value := range values { + if strings.TrimSpace(value) == trimmed { + return true + } + } + return false +} + func configureAgentService( agentService *service.AgentService, ragRuntime infrarag.Runtime, diff --git a/backend/dao/active_schedule_session.go b/backend/dao/active_schedule_session.go index a8fc219..e3a3f9f 100644 --- a/backend/dao/active_schedule_session.go +++ b/backend/dao/active_schedule_session.go @@ -192,6 +192,44 @@ func (d *ActiveScheduleSessionDAO) UpdateActiveScheduleSessionFieldsBySessionID( Updates(normalizedUpdates).Error } +// TryTransitionActiveScheduleSessionStatusBySessionID 按 session_id 原子切换主动调度会话状态。 +// +// 职责边界: +// 1. 只负责“当前状态仍为 fromStatus 时才切到 toStatus”的轻量 CAS,不写 state_json 和 preview_id; +// 2. 返回 true 表示本次调用抢到了状态推进权,可以继续执行后续 rerun; +// 3. 返回 false 表示已有其他请求先推进了状态,调用方应降级为占管提示,避免重复生成 preview。 +func (d *ActiveScheduleSessionDAO) TryTransitionActiveScheduleSessionStatusBySessionID(ctx context.Context, sessionID string, fromStatus string, toStatus string) (bool, error) { + if err := d.ensureDB(); err != nil { + return false, err + } + + normalizedSessionID := strings.TrimSpace(sessionID) + if normalizedSessionID == "" { + return false, errors.New("session_id is empty") + } + + normalizedFrom, err := normalizeActiveScheduleSessionStatus(fromStatus) + if err != nil { + return false, fmt.Errorf("invalid active schedule session from status: %w", err) + } + normalizedTo, err := normalizeActiveScheduleSessionStatus(toStatus) + if err != nil { + return false, fmt.Errorf("invalid active schedule session to status: %w", err) + } + + result := d.db.WithContext(ctx). + Model(&model.ActiveScheduleSession{}). + Where("session_id = ? AND status = ?", normalizedSessionID, normalizedFrom). + Updates(map[string]any{ + "status": normalizedTo, + "updated_at": time.Now(), + }) + if result.Error != nil { + return false, result.Error + } + return result.RowsAffected > 0, nil +} + // UpdateActiveScheduleSessionFieldsByConversationID 按 user_id + conversation_id 更新最新记录的局部字段。 // // 步骤化说明: diff --git a/backend/service/agent_bridge.go b/backend/service/agent_bridge.go index 1ff088f..1f7b118 100644 --- a/backend/service/agent_bridge.go +++ b/backend/service/agent_bridge.go @@ -25,10 +25,11 @@ func NewAgentService( taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, agentRedis *dao.AgentCache, + activeScheduleDAO *dao.ActiveScheduleDAO, activeSessionDAO *dao.ActiveScheduleSessionDAO, eventPublisher outboxinfra.EventPublisher, ) *AgentService { - return agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeSessionDAO, eventPublisher) + return agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) } // NewAgentServiceWithSchedule 在基础 AgentService 上注入排程依赖。 @@ -43,12 +44,13 @@ func NewAgentServiceWithSchedule( taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, agentRedis *dao.AgentCache, + activeScheduleDAO *dao.ActiveScheduleDAO, activeSessionDAO *dao.ActiveScheduleSessionDAO, eventPublisher outboxinfra.EventPublisher, scheduleSvc *ScheduleService, taskSvc *TaskService, ) *AgentService { - svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeSessionDAO, eventPublisher) + svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) // 注入排程依赖:将 service 层方法包装为函数闭包,避免循环依赖。 if scheduleSvc != nil { diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index a94e0dc..9db635e 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -31,6 +31,7 @@ type AgentService struct { taskRepo *dao.TaskDAO cacheDAO *dao.CacheDAO agentCache *dao.AgentCache + activeScheduleDAO *dao.ActiveScheduleDAO activeScheduleSessionDAO *dao.ActiveScheduleSessionDAO eventPublisher outboxinfra.EventPublisher @@ -79,6 +80,7 @@ func NewAgentService( taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, agentRedis *dao.AgentCache, + activeScheduleDAO *dao.ActiveScheduleDAO, activeSessionDAO *dao.ActiveScheduleSessionDAO, eventPublisher outboxinfra.EventPublisher, ) *AgentService { @@ -93,6 +95,7 @@ func NewAgentService( taskRepo: taskRepo, cacheDAO: cacheDAO, agentCache: agentRedis, + activeScheduleDAO: activeScheduleDAO, activeScheduleSessionDAO: activeSessionDAO, eventPublisher: eventPublisher, } diff --git a/backend/service/agentsvc/agent_active_schedule_session.go b/backend/service/agentsvc/agent_active_schedule_session.go index b0a2634..147b689 100644 --- a/backend/service/agentsvc/agent_active_schedule_session.go +++ b/backend/service/agentsvc/agent_active_schedule_session.go @@ -114,6 +114,27 @@ func (s *AgentService) persistActiveScheduleSessionBestEffort(ctx context.Contex return nil } +// persistActiveScheduleTriggerPreviewBestEffort 负责把 rerun 产生的新 preview_id 同步回 trigger。 +// +// 职责边界: +// 1. 只维护 trigger -> preview 的审计指针,不修改 preview 内容,也不推进 confirm/apply 状态; +// 2. trigger_id 或 preview_id 为空时直接跳过,避免把不完整 rerun 结果写入触发记录; +// 3. DAO 未注入时保持迁移期兼容,调用方仍以 session 写回作为主流程。 +func (s *AgentService) persistActiveScheduleTriggerPreviewBestEffort(ctx context.Context, triggerID string, previewID string) error { + if s == nil || s.activeScheduleDAO == nil { + return nil + } + normalizedTriggerID := strings.TrimSpace(triggerID) + normalizedPreviewID := strings.TrimSpace(previewID) + if normalizedTriggerID == "" || normalizedPreviewID == "" { + return nil + } + return s.activeScheduleDAO.UpdateTriggerFields(ctx, normalizedTriggerID, map[string]any{ + "preview_id": &normalizedPreviewID, + "updated_at": time.Now(), + }) +} + // handleActiveScheduleSessionChat 处理被主动调度 session 占管的聊天入口。 // // 步骤化说明: @@ -165,20 +186,36 @@ func (s *AgentService) handleActiveScheduleSessionChat( } // 1. 收到用户补充信息后,先把 session 切成 rerunning,避免并发请求继续按旧状态走普通聊天。 // 2. 这个阶段只是状态切换,不代表 graph 已经完成。 - session.Status = model.ActiveScheduleSessionStatusRerunning - if err := s.persistActiveScheduleSessionBestEffort(ctx, session); err != nil { + // 3. 这里必须使用 DB CAS 抢占 rerun 权限,避免两条补充消息同时读到 waiting_user_reply 后重复生成 preview。 + switched, err := s.activeScheduleSessionDAO.TryTransitionActiveScheduleSessionStatusBySessionID( + ctx, + session.SessionID, + model.ActiveScheduleSessionStatusWaitingUserReply, + model.ActiveScheduleSessionStatusRerunning, + ) + if err != nil { return true, err } + if !switched { + if err := s.respondActiveScheduleRerunning(ctx, userID, chatID, traceID, resolvedModelName, requestStart, outChan); err != nil { + return true, err + } + return true, nil + } + session.Status = model.ActiveScheduleSessionStatusRerunning + if s.cacheDAO != nil { + if cacheErr := s.cacheDAO.SetActiveScheduleSessionToCache(ctx, session); cacheErr != nil { + log.Printf("回填主动调度 rerunning session 缓存失败 session=%s err=%v", session.SessionID, cacheErr) + } + } return true, s.runActiveScheduleSessionRerun(ctx, session, trimmedMessage, traceID, requestStart, resolvedModelName, outChan, errChan) case model.ActiveScheduleSessionStatusRerunning: // 1. rerunning 是占管中的过渡态,说明当前会话已经在重跑或刚开始重跑。 // 2. 这里不再触发第二次 rerun,只给用户一个可见的等待提示。 if trimmedMessage != "" { - assistantText := "主动调度正在重新生成建议,请稍后再试。" - if err := s.persistNewAgentConversationMessage(ctx, userID, chatID, schema.AssistantMessage(assistantText, nil), 0); err != nil { + if err := s.respondActiveScheduleRerunning(ctx, userID, chatID, traceID, resolvedModelName, requestStart, outChan); err != nil { return true, err } - emitActiveScheduleAssistantChunk(outChan, traceID, resolvedModelName, requestStart, assistantText, nil) } return true, nil default: @@ -186,6 +223,29 @@ func (s *AgentService) handleActiveScheduleSessionChat( } } +// respondActiveScheduleRerunning 负责在重复补充命中并发保护时写入可见提示。 +// +// 职责边界: +// 1. 只写聊天历史和 SSE 文本,不推进 session、trigger、preview 状态; +// 2. 用于 rerunning 状态或 CAS 抢占失败后的兜底提示,避免再次触发 graph; +// 3. 写入失败时返回 error,让上层按聊天入口的错误通道处理。 +func (s *AgentService) respondActiveScheduleRerunning( + ctx context.Context, + userID int, + chatID string, + traceID string, + resolvedModelName string, + requestStart time.Time, + outChan chan<- string, +) error { + assistantText := "主动调度正在重新生成建议,请稍后再试。" + if err := s.persistNewAgentConversationMessage(ctx, userID, chatID, schema.AssistantMessage(assistantText, nil), 0); err != nil { + return err + } + emitActiveScheduleAssistantChunk(outChan, traceID, resolvedModelName, requestStart, assistantText, nil) + return nil +} + // runActiveScheduleSessionRerun 负责把 waiting_user_reply 的用户补充同步推进成新的主动调度结果。 // // 职责边界: @@ -230,10 +290,9 @@ func (s *AgentService) runActiveScheduleSessionRerun( } session.Status = finalStatus session.State = result.SessionState - if strings.TrimSpace(result.PreviewID) != "" { - session.CurrentPreviewID = strings.TrimSpace(result.PreviewID) - } else if session.Status != model.ActiveScheduleSessionStatusReadyPreview { - session.CurrentPreviewID = "" + previewID := strings.TrimSpace(result.PreviewID) + if previewID != "" { + session.CurrentPreviewID = previewID } if session.Status == model.ActiveScheduleSessionStatusReadyPreview { session.State.PendingQuestion = "" @@ -241,6 +300,12 @@ func (s *AgentService) runActiveScheduleSessionRerun( session.State.FailedReason = "" } + if previewID != "" { + if err := s.persistActiveScheduleTriggerPreviewBestEffort(ctx, session.TriggerID, previewID); err != nil { + return err + } + } + if err := s.persistActiveScheduleSessionBestEffort(ctx, session); err != nil { return err } diff --git a/backend/shared/events/active_schedule.go b/backend/shared/events/active_schedule.go index 7610516..5bd84af 100644 --- a/backend/shared/events/active_schedule.go +++ b/backend/shared/events/active_schedule.go @@ -75,7 +75,7 @@ func (p ActiveScheduleTriggeredPayload) Validate() error { if !isAllowedActiveScheduleTargetType(p.TargetType) { return errors.New("target_type 不在主动调度第一版允许范围内") } - if p.TargetID <= 0 { + if p.TargetID <= 0 && strings.TrimSpace(p.TriggerType) != ActiveScheduleTriggerTypeUnfinishedFeedback { return errors.New("target_id 必须大于 0") } if p.RequestedAt.IsZero() { diff --git a/docs/backend/主动调度缺口分阶段实施计划.md b/docs/backend/主动调度缺口分阶段实施计划.md index 9512e92..02eb759 100644 --- a/docs/backend/主动调度缺口分阶段实施计划.md +++ b/docs/backend/主动调度缺口分阶段实施计划.md @@ -4,6 +4,8 @@ 目标只有一个:把主动调度剩下的缺口按阶段补完,并且每个阶段都能明确验收、明确自动化边界、明确是否已经完成。后续我会在这里持续把 `[ ]` 改成 `[x]`。 +补充约定:本文档是阶段进度看板,不再重复写设计细节;已经落地的阶段必须及时改成 `[x]`,并保留简短验证记录。实现方案和边界口径以《第二阶段主动调度 MVP 实现方案.md》为准。 + --- ## 0. 当前仓库基线 @@ -25,9 +27,19 @@ - [x] `CreatePreview` 已切到 graph + 受限 selector,不再是固定 top1 / `Candidates[0]`。 - [x] `active_schedule_sessions` 已正式进入代码,并接好缓存链路。 - [x] 聊天入口已按 session 状态拦截,`waiting_user_reply / rerunning` 会接管补信息链路。 -- [ ] `unfinished_feedback` 的“定位 -> ask_user -> 重跑 graph”闭环还没完全做实。 -- [ ] 聊天页里的主动调度 preview 卡片 / 微调弹窗还没有最小适配。 -- [ ] 剩余极限验收项还没完全脚本化。 +- [x] `unfinished_feedback` 的“定位 -> ask_user -> 重跑 graph”后端闭环已经接上,前端最小适配也已落地。 +- [x] 聊天页里的主动调度 preview 卡片 / 微调弹窗已完成最小适配。 +- [ ] 剩余极限验收项已做过实测验证,但还在继续脚本化沉淀。 + +### 近期实测记录(2026-05-02) + +这轮重新核验后,主动调度最小闭环已经再次跑通,可作为后续阶段 5 负向边界验收的基线。 + +1. 测试账号为 `test0424 / 123456`,任务为“做马原大作业”,`mock_now` 固定到 `2026-04-30T01:14:21+08:00` 的周四窗口。 +2. 这次链路跑出了 `trigger_id=ast_1bb62e3e-f2cf-48a9-8f29-1461b99bff6b`、`preview_id=asp_e79db789-ba16-4108-a843-cd33c03aa3f6`、`conversation_id=ce525dc0-101a-50ca-8993-7fc466328de2`、`notification_records.id=22`。 +3. DB 对账显示 `active_schedule_triggers.status=preview_generated`、`active_schedule_previews.status=ready`、`active_schedule_sessions.status=ready_preview`,而 `schedule_events` 对该 preview 的正式写入计数为 0,说明这次只到预览态,没有执行正式 apply。 +4. 浏览器里周四列同时存在已有课程和待确认任务块;周六窗口只剩一个孤立任务块属于正常现象,不是后端漏课程。 +5. 这批结果说明:当前收口已经不在“主链路能不能跑通”,而是在“阶段 5 的过期、重复、篡改、冲突、幂等、通知和 outbox 负向边界能不能全部挡住”。 ### 代码锚点 @@ -71,8 +83,8 @@ | 阶段 0 | [x] | 补 `estimated_sections` 写入入口 | 创建任务时能稳定写入 1~4 节,主动调度只消费落库值 | 可以,API + DB + `go test` | | 阶段 1 | [x] | 补主动调度 Eino graph 和 LLM 解释 / 补全兜底 | 产生候选、有限裁决、输出解释、保留 fallback | 可以,后端单测 + API 验证 | | 阶段 2 | [x] | 补 `active_schedule_sessions`、聊天拦截和缓存链路 | `waiting_user_reply / rerunning` 拦截生效,`ready_preview` 释放 | 可以,API + DB + 路由验证 | -| 阶段 3 | [ ] | 补 `unfinished_feedback`、`ask_user` 闭环和前端最小适配 | 用户在聊天页补信息后能重跑 graph 并刷新 preview | 后端可自动,前端需浏览器验证 | -| 阶段 4 | [ ] | 收口飞书通知与会话链接 | `action_url` 指向 `/assistant/{conversation_id}`,通知 payload 从简 | 可以,webhook POST + DB 验证 | +| 阶段 3 | [x] | 完成 `unfinished_feedback`、`ask_user` 闭环和前端最小适配 | 用户在聊天页补信息后能重跑 graph 并刷新 preview | 后端可自动,前端需浏览器验证 | +| 阶段 4 | [x] | 收口飞书通知与会话链接 | `action_url` 指向 `/assistant/{conversation_id}`,通知 payload 从简 | 可以,webhook POST + DB 验证 | | 阶段 5 | [ ] | 跑完第五阶段剩余验收和失败注入脚本 | 冲突、过期、重复确认、重试、dead/skipped 全覆盖 | 可以,基本全自动 | --- @@ -268,9 +280,9 @@ **当前状态** -`unfinished_feedback` 目前还偏向“已有目标就能做”,但“定位不稳怎么办、用户回一句怎么办、如何重跑 graph”还没有完全闭环。 +`unfinished_feedback` 的后端定位闭环、聊天页最小适配和并发兜底都已经收口,阶段 3 已完成。 -**要做什么** +**已落地内容** 1. 定位逻辑按这个顺序走: - LLM 上下文推断 @@ -287,6 +299,30 @@ - 只是把 timeline 新类型和主动调度 confirm API 接起来 7. 后端负责把主动调度 preview DTO 转成前端容易复用的结构,前端不背脏活。 +**补充链路图** + +```mermaid +flowchart TD + A[用户在聊天页发送消息\nPOST /api/v1/agent/chat] --> B{查询 active_schedule_sessions} + + B -->|未占管| N[进入普通 newAgent] + B -->|waiting_user_reply / rerunning| C[拦截到主动调度分支] + + C --> D[写入用户消息到 timeline] + D --> E[LLM JSON 定位节点\n只负责补齐缺失事实] + E --> F{能否定位 schedule_event?} + + F -->|否| G[生成 ask_user\nsession=waiting_user_reply] + F -->|是| H[重跑 active scheduler graph] + + H --> I[BuildContext -> Observe -> GenerateCandidates -> SelectAndExplain] + I --> J[CreatePreview] + J --> K[session=ready_preview] + K --> N +``` + +这个定位节点只做一件事:把用户补充的话术转成后端可校验的 JSON 事实,不负责写正式日程,不负责生成新排程策略,也不负责替代后端候选裁决。 + **验收点** 1. 用户补完当前主动调度缺失事实后,能刷新 preview 并解除锁定;解锁后再说“我周末不想学习”这类偏好话术时,直接走现有 newAgent memory / execute 链路。 @@ -300,69 +336,40 @@ - 前端卡片展示和按钮分支,建议用浏览器实际打开一次做可视确认。 - 如果只是检查 DOM / 路由 / 请求是否发对,能自动;如果要看卡片样式是否真的对齐,还是需要浏览器看一眼。 +**验证记录** + +1. `go build ./cmd/all` 已通过。 +2. `go test ./...` 已通过。 +3. 并发补充场景已验证:同一会话两条补充同时到来时,只会有一条抢到 rerun,另一条返回占管提示,不会重复生成 preview。 + --- ### 阶段 4:收口飞书通知与会话链接 **当前状态** -用户级 webhook 配置、通知投递、测试接口已经有基础,但主入口还需要统一收口到聊天会话链接,不能再把旧的 `/schedule-adjust/{preview_id}` 当新目标。 +已完成。用户级 webhook 配置、通知投递、测试接口和会话链接已经统一到 `/assistant/{conversation_id}`,不再把旧的 `/schedule-adjust/{preview_id}` 当作新入口。 -**要做什么** +**已落地内容** -1. 通知前先绑定或预创建 `conversation_id`。 -2. `action_url` 统一走: +1. 通知前由后端预创建或绑定 `conversation_id`,保证飞书点击后直接进入同一会话。 +2. `action_url` 已统一为 `/assistant/{conversation_id}`;通知 payload 保持从简,只保留会话跳转和排障所需字段。 +3. 用户级飞书 webhook 的保存 / 查询 / 删除 / 测试接口已接通,真实投递与测试共用同一套 provider 校验和 JSON 拼装。 +4. `notification_records` 已覆盖 `sent / failed / dead / skipped` 和 retry 相关状态。 +5. 用户未配置或禁用 webhook 时,通知记录会落 `skipped`,不阻塞主链路。 -```text -/assistant/{conversation_id} -``` +**验证记录** -3. 本地测试和示例配置继续用 `localhost`,上线后再换正式域名。 -4. 业务 JSON 保持从简,只让飞书流程去编排消息,不把复杂卡片协议塞进 webhook。 -5. 维持当前通知状态机: - - `sent` - - `failed` - - `dead` - - `skipped` - - retry 相关状态 +1. `PUT /api/v1/notification/channels/feishu` 已跑通。 +2. `POST /api/v1/notification/channels/feishu/test` 已跑通,成功时会回写 `last_test_status / last_test_at`。 +3. `POST /api/v1/active-schedule/trigger` 后生成的通知请求 payload 中,`message.action_url` 已指向 `/assistant/{conversation_id}`。 +4. 真实飞书消息样式和外部页面交互仍属于最终验收,不影响后端收口结论。 -**建议 payload 形态** +**建议保留的边界** -```json -{ - "event": "smartflow.schedule_adjustment_ready", - "version": "1", - "notification_id": 123, - "user_id": 5, - "preview_id": "asp_xxx", - "conversation_id": "conv_xxx", - "trigger_id": "ast_xxx", - "trigger_type": "important_urgent_task", - "target_type": "task_pool", - "target_id": 81, - "message": { - "title": "SmartFlow 日程调整建议", - "summary": "把重要且紧急任务放入滚动 24 小时内的空闲节次。", - "action_text": "查看并确认调整", - "action_url": "http://localhost:5173/assistant/conv_xxx" - }, - "trace_id": "trace_xxx", - "sent_at": "2026-04-30T17:34:52+08:00" -} -``` - -**验收点** - -1. 通知里的跳转链接能直接进聊天页。 -2. 用户级 webhook 的保存、查询、删除、测试都能跑通。 -3. 未配置、临时失败、不可恢复失败的状态都能在 `notification_records` 里看见。 -4. 用户已经在聊天页时,不再强依赖飞书通知承接回复。 - -**自动化测试** - -- 可以自动跑。 -- 建议路径:Webhook POST、测试接口、`notification_records` 状态断言、真实 webhook 收到后人工看一次消息。 -- 如果需要验证“飞书真的收到”,最终还是要看外部页面一次,但 HTTP 层和状态层可以自动。 +1. 这里不再恢复旧的 `/schedule-adjust/{preview_id}` 主入口。 +2. 业务 JSON 继续从简,不把复杂卡片协议塞进 webhook。 +3. 后续如果要扩展新的通知渠道,先复用 `notification_records` 状态机和 `FeishuProvider` 的抽象边界。 --- @@ -370,7 +377,7 @@ **当前状态** -主链路已经有了,但极限边界还需要系统化收口。 +主链路已经有了,而且周四窗口的最小闭环已经再次跑通;现在阶段 5 的重点只剩极限边界、失败注入和脚本化收口。这里不再重跑阶段 3 的整套 `ask_user` 主链路,只保留 1 条真实 chat 烟测确认入口未回归。 **要做什么** @@ -391,7 +398,12 @@ 1. 所有核心状态机都能串起来排障。 2. 同一条 preview / notification / apply 不会被重复落库。 3. 过期、冲突、篡改、失败注入都能拒绝。 -4. 最终能把这一轮主动调度缺口标成完成。 +4. 预览过期、重复 confirm、错误 candidate / 跨用户 preview / preview-session 不匹配都能挡住。 +5. 同一 `idempotency_key / dedupe_key` 不能重复生成有效 trigger / preview / notification。 +6. notification 的 `skipped / failed / dead / sent` 状态都能在 DB 里对上。 +7. outbox 重复消费不会重复投递或重复写通知记录。 +8. `api / worker / all` 三种启动边界相关 handler / job 注册不能缺失。 +9. 最终能把这一轮主动调度缺口标成完成。 **自动化测试** @@ -414,6 +426,7 @@ 8. `unfinished_feedback` 先定位,再 `ask_user`,定位成功后直接生成补做 preview,不移动原任务。 9. 用户在聊天页说偏好时,不归主动调度接管;解锁后直接走现有 newAgent memory / execute 链路。 10. 只有后台离线自动触达才走飞书;用户已经在会话里时,不需要再先走飞书通知。 +11. `ask_user` 闭环只新增一个 LLM JSON 定位节点,沿用正常 `chat` 入口和 session 拦截,不单独再造工具系统。 --- diff --git a/docs/backend/第二阶段主动调度MVP功能预期.md b/docs/backend/第二阶段主动调度MVP功能预期.md index 4d714a5..8a7e0a5 100644 --- a/docs/backend/第二阶段主动调度MVP功能预期.md +++ b/docs/backend/第二阶段主动调度MVP功能预期.md @@ -1,5 +1,19 @@ # 第二阶段主动调度 MVP 功能预期 +## 0. 当前状态摘要(2026-05-02) + +本文档仍作为主动调度 MVP 的产品边界说明;具体实现拆分、表结构和阶段进度以《第二阶段主动调度 MVP 实现方案.md》和《主动调度缺口分阶段实施计划.md》为准。 + +截至当前仓库状态,MVP 主闭环已经从“功能预期”推进到可演示、可排障的实现状态: + +1. `important_urgent_task` 已能从任务池生成主动调度 trigger,写入 `active_schedule_previews`,并通过飞书 webhook 触达用户。 +2. 飞书链接已统一进入 `/assistant/{conversation_id}`,不再使用独立 `/schedule-adjust/{preview_id}` 入口。 +3. 用户进入助手页后,可以看到主动调度卡片,并打开日程预览与精排弹窗;确认前只展示 preview,不写正式 `schedule_events`。 +4. 用户确认后才进入同步 apply 链路;apply 失败必须回写可排障状态,不能半写正式日程。 +5. `unfinished_feedback` 的“定位 -> ask_user -> 重跑 graph -> 刷新 preview”闭环已经落地,阶段 5 不再重复全套主链路,只保留 1 条真实 chat 烟测确认入口未回归。 + +演示和验收时需要特别注意:主动调度窗口是“从当前时刻起滚动 24 小时”,不是自然日或整周。如果窗口落在周六且该用户没有课程,预览里只出现待安排任务是符合语义的;需要展示“已有课程 + 待确认任务”同屏时,应把 `mock_now` 固定到有课程的周四窗口。 + ## 1. 文档目的 本文档先讨论第二阶段最终想做成什么功能,不进入具体代码拆分和表结构实现。 @@ -530,20 +544,40 @@ assumed_completed --- -## 11. MVP 验收标准 +## 11. MVP 验收标准与当前状态 -第一版功能算完成,需要满足: +第一版验收按“已落地口径、阶段 5 主验收、后置能力”三类看,避免把后续演进项混进本轮收口。 + +已落地并作为 MVP 口径的能力: 1. 只扫描滚动 24 小时内的问题。 2. 能识别四象限“重要且紧急”池中尚未进入日程视图的任务。 -3. 能在用户反馈未完成后,识别受影响的 schedule 任务与后继任务。 -4. 动态任务计划时间过去后默认按已完成推进,不主动追问。 -5. 当前动态任务失败且影响后继时,能按“局部重排 -> 延后结束 -> 压缩融合”顺序生成候选。 -6. 能输出结构化 metrics / issues / decision / candidates。 -7. 候选项必须来自后端,不让 LLM 自由生成正式写库参数;LLM 只做解释、追问和有限选择。 -8. 不直接写正式 schedule,只写预览或触达用户。 -9. 能发布 `notification.feishu.requested` 提醒用户回系统确认。 -10. 用户确认后才允许进入正式应用链路。 +3. 能输出结构化 `metrics / issues / decision / candidates`。 +4. 候选项必须来自后端,不让 LLM 自由生成正式写库参数;LLM 只做解释、追问和有限选择。 +5. 确认前不直接写正式 `schedule_events / schedules`,只写 `active_schedule_previews` 或触达用户。 +6. 能发布 `notification.feishu.requested`,并通过 `notification_records` 记录 `sent / failed / dead / skipped` 等状态。 +7. 飞书触达后进入 `/assistant/{conversation_id}`,由助手会话页承载预览、微调和确认。 +8. 用户确认后才允许进入同步 apply 链路;成功写正式日程,失败回写预览状态和错误原因。 +9. `unfinished_feedback` 已支持用户补充事实后的 `ask_user -> rerun -> ready_preview` 闭环。 +10. 动态任务计划时间过去后默认按 `assumed_completed` 推进,不主动追问。 + +阶段 5 主验收重点: + +1. preview 过期后 confirm 必须拒绝。 +2. 同一 preview 重复 confirm 只能生效一次。 +3. 错误 `candidate_id`、跨用户 `preview_id`、preview 与 session 不匹配都必须拒绝。 +4. 冲突或不可写入时,apply 应失败并保留可排障状态。 +5. 相同 `idempotency_key / dedupe_key` 不能重复生成有效 trigger / preview / notification。 +6. notification 要覆盖未配置 webhook 的 `skipped`、临时失败的 `failed/retry`、永久失败的 `dead` 和成功的 `sent`。 +7. 重复消费同一 `notification.feishu.requested` 不能重复投递或重复写 `notification_records`。 +8. `api / worker / all` 三种启动边界相关 handler / job 注册不能缺失。 + +后置能力,不作为当前 MVP 完成阻塞: + +1. 课程变化触发、疲劳反馈触发和更复杂的跨天全局重排。 +2. `compress_with_next_dynamic_task` 压缩融合候选,当前只保留 schema / 口径,不默认生成。 +3. apply 成功后的撤销按钮和完整回滚体验。 +4. 更细颗粒的偏好打破策略,例如哪些偏好是硬约束、哪些偏好可在高风险任务前让位。 --- @@ -563,9 +597,18 @@ assumed_completed --- -## 13. 待讨论问题 +## 13. 已收口与后续演进问题 -1. 主动观测能力是否最终落成独立工具,以及具体命名是什么。 -2. 四象限懒加载轮换要如何补齐:后台 worker 定时刷新、主动调度前同步刷新,还是抽公共轮换服务。 -3. 前后对比回滚机制复用现有哪条链路,是否需要新增主动调度预览类型。 -4. 魔改粗排时,哪些用户偏好可以被打破,哪些偏好必须保持为硬约束。 +已收口: + +1. 主动观测能力当前落在 `backend/active_scheduler` 准独立模块内,主链路走 graph / service pipeline,不进入 ReAct 工具循环。 +2. 主入口统一到 `/assistant/{conversation_id}`,飞书只负责触达和跳转,不在飞书内完成复杂确认。 +3. 预览独立写入 `active_schedule_previews`,不塞进 `agent_schedule_states`;正式 apply 仍走后端重校验。 +4. task_pool 任务进入正式日程时使用 `schedule_events.task_source_type=task_pool`,不创建孤儿 `task_item`。 + +后续演进: + +1. 四象限懒加载轮换仍建议继续评估后台刷新或抽公共轮换服务,避免主动调度读取到过期任务池。 +2. 前后对比与回滚体验还可以继续增强,但不影响当前“确认前只预览、确认后同步 apply”的 MVP 语义。 +3. 压缩融合和复杂偏好打破策略暂不打开,等主链路和失败注入脚本稳定后再评估。 +4. 课程变化、疲劳反馈和跨天计划优化后置到后续版本。 diff --git a/docs/backend/第二阶段主动调度MVP实现方案.md b/docs/backend/第二阶段主动调度MVP实现方案.md index e1b0422..82599d9 100644 --- a/docs/backend/第二阶段主动调度MVP实现方案.md +++ b/docs/backend/第二阶段主动调度MVP实现方案.md @@ -2,7 +2,7 @@ ## 0. Handoff 说明 -本文档已收口为第二阶段主动调度 MVP 的最终实施版。截至 2026-04-30,后端第一至第四阶段主体代码已实现并通过本地 `go test ./...`;真实飞书 webhook 配置接口和 `important_urgent_task` 主动触发端到端链路已通过本地后端验收。接手者请优先阅读本节、第 10 章装配边界和第 14 章验证 checklist,再从第五阶段剩余验收继续推进。 +本文档已收口为第二阶段主动调度 MVP 的最终实施版。截至 2026-05-02,后端第一至第四阶段主体代码已实现并通过本地 `go test ./...`;真实飞书 webhook 配置接口、`/assistant/{conversation_id}` 会话链接和 `important_urgent_task` 主动触发端到端链路已通过本地后端验收。阶段进度看板请以《主动调度缺口分阶段实施计划.md》为准,那里会把已完成阶段标成 `[x]`。接手者请优先阅读本节、第 10 章装配边界和第 14 章验证 checklist,再从第五阶段剩余验收继续推进。 当前核心共识: @@ -60,7 +60,7 @@ 3. task_pool 正式落库写 `schedule_events(type=task, task_source_type=task_pool, rel_id=tasks.id)`。 4. 补做块新增 event,不移动原已排任务。 -第四阶段:worker 与 notification。(主体代码已完成,真实 webhook 配置接口已验收) +第四阶段:worker 与 notification。(已完成,真实 webhook 配置接口已验收) 1. 接入 `active_schedule.triggered` worker handler 和 due job scanner。 2. 接入 `notification.feishu.requested` handler。 @@ -75,7 +75,7 @@ 3. 根据日志和测试结果补齐 trace 字段与错误码。 4. 主链路稳定后再评估是否打开压缩融合候选。 -第六阶段:主动调度 graph 补齐、会话桥与聊天页合流。(待实施) +第六阶段:主动调度 graph 补齐、会话桥与聊天页合流。(主体已完成,剩余最终验收与文档收口) 0. `estimated_sections` 写入入口已经补完:普通任务创建请求、转换层和 quick task / 随口记创建任务时,都会把 LLM 估计的 1~4 节写入 `tasks.estimated_sections`;主动调度只消费该字段,不在 graph 内重新猜任务耗时。 1. 补主动调度 Eino graph:把现有 `BuildContext -> Observe -> GenerateCandidates -> CreatePreview` 固定 pipeline 整理成 graph 节点,并新增 LLM 解释 / 有限选择、`ask_user`、fallback 分支;当前代码里的 first-fit / `Candidates[0]` 只能作为过渡实现。 @@ -141,7 +141,7 @@ - 已实现 preview 写入、详情查询、`apply_id + idempotency_key`、候选转换、同步 apply adapter。 - `add_task_pool_to_schedule` 已能正式写入 `schedule_events(type=task, task_source_type=task_pool, rel_id=tasks.id)` 和对应 `schedules`。 - `create_makeup` 转换与 adapter 已预留并实现基本写入路径,但尚需在第四 / 第五阶段结合正式 unfinished feedback worker 场景补端到端验收。 -4. 第四阶段:worker 与 notification 主体代码。 +4. 第四阶段:worker 与 notification 主体代码(已完成)。 - 已接入 `active_schedule.triggered` worker handler、due job scanner、`notification.feishu.requested` handler 和 notification retry loop。 - 已新增 `backend/notification` provider / service 分层,mock provider 保留,真实投递切到用户级飞书 Webhook 触发器 provider。 - 已新增 `user_notification_channels` model / DAO,并接入 AutoMigrate 与 `RepoManager`。 @@ -201,17 +201,30 @@ 下一阶段入口: -1. 下一步继续第五阶段剩余验收,不需要重做 dry-run / preview / confirm 主链路,也不需要重做第四阶段 provider / handler 主体代码。 -2. 第五阶段剩余重点: - - confirm apply 冲突失败、过期拒绝。 +1. 下一步继续第五阶段剩余验收,不需要重做 dry-run / preview / confirm 主链路,也不需要重做第四阶段 provider / handler 主体代码,也不需要重做第六阶段 graph / session bridge / 聊天页合流主体代码。 +2. 第五阶段剩余重点已经收口为负向边界和脚本化验收: + - 保留前置验证:`go build ./cmd/all`、backend 下 `go test ./...`,并清理本次生成的 `.gocache`。 + - 阶段 3 主链路不再全量重跑,只做 1 条真实 chat 烟测确认入口未回归。 + - preview 过期、重复 confirm、candidate / preview 篡改、preview 与 session 不匹配都必须拒绝。 + - 冲突失败或不可写入场景必须失败并保留可排障状态。 + - trigger 幂等、notification 状态矩阵、outbox 重复消费和 `api / worker / all` 启动边界必须补齐证据。 - 更完整的边界清理:测试数据隔离策略、失败注入脚本化、前端真实地址替换为正式域名配置。 -4. 工作区注意: +3. 工作区注意: - 另一个前端对话可能在改前端;后端阶段不要碰 `frontend` 相关改动。 - 当前允许单个 Go 文件 700 行以内;超过 700 再评估拆分。 - 每次执行 `go test` 后必须清理根目录 `.gocache`。 - 后续阶段必须优先自动化验收:能由代码、API、DB 查询、日志查询验证的内容,由实现者自己跑完并记录结果。 - 如果受限于外部账号、真实飞书环境、浏览器人工交互、权限或本地环境,导致某项验收无法完成,不能默认为通过,也不能在报告中省略;必须明确写出未验收项、阻塞原因、建议由用户执行的操作和预期结果。 +### 0.4 2026-05-02 最新验收记录 + +这轮接手时,周四窗口的主动调度最小闭环已经再次跑通,可作为周报截图和阶段 5 续验的基线。 + +1. 测试账号使用 `test0424 / 123456`,任务为“做马原大作业”,并把 `mock_now` 固定到 `2026-04-30T01:14:21+08:00` 的周四窗口,避免周六空窗造成的误判。 +2. trigger `ast_1bb62e3e-f2cf-48a9-8f29-1461b99bff6b` 生成 preview `asp_e79db789-ba16-4108-a843-cd33c03aa3f6`,会话 `ce525dc0-101a-50ca-8993-7fc466328de2` 挂接当前 preview,`notification_records.id=22` 发送成功。 +3. DB 里 `active_schedule_triggers.status=preview_generated`、`active_schedule_previews.status=ready`、`active_schedule_sessions.status=ready_preview`,而 `schedule_events` 对该 preview 的正式写入计数为 0,说明这次只到预览态,还没有执行正式 apply。 +4. 浏览器里能看到周四列同时存在已有课程和待确认任务块,证明“滚动 24 小时窗口 + 课程事实 + task_pool 预览”的展示链路是完整的。 + ## 1. 文档目的 本文档承接《第二阶段主动调度 MVP 功能预期》和《微服务四步迁移与第二阶段并行开发计划》,用于把产品预期逐步落成可执行的工程方案。