Files
smartmate/backend/shared/events/active_schedule.go
Losita ba23ebd201 Version: 0.9.62.dev.260502
后端:
1. 主动调度补齐 `unfinished_feedback` 定位闭环——用户补充信息先在滚动窗口内定位到可校验的日程块,定位失败则继续 ask_user,不再硬猜 target_id 或直接跑 graph。
2. 聊天占管重跑链路加并发保护——`waiting_user_reply -> rerunning` 改为 DB CAS 抢占,重复补充只返回可见等待提示,避免并发生成多份 preview。
3. rerun 结果回写继续收口——新 preview_id 同步回 trigger 审计指针,session 只在拿到新 preview 时更新当前预览,ready_preview 后清空追问状态并释放回普通聊天。
4. 主动调度事件校验放宽 unfinished_feedback 的空 target 场景,允许先触发、后定位,再进入 graph + preview 主链路。
2026-05-02 12:41:50 +08:00

137 lines
4.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package events
import (
"encoding/json"
"errors"
"strconv"
"strings"
"time"
)
const (
ActiveScheduleTriggeredEventType = "active_schedule.triggered"
ActiveScheduleTriggeredEventVersion = "1"
)
const (
ActiveScheduleTriggerTypeImportantUrgentTask = "important_urgent_task"
ActiveScheduleTriggerTypeUnfinishedFeedback = "unfinished_feedback"
ActiveScheduleSourceWorkerDueJob = "worker_due_job"
ActiveScheduleSourceAPITrigger = "api_trigger"
ActiveScheduleSourceAPIDryRun = "api_dry_run"
ActiveScheduleSourceUserFeedback = "user_feedback"
ActiveScheduleTargetTypeTaskPool = "task_pool"
ActiveScheduleTargetTypeScheduleEvent = "schedule_event"
ActiveScheduleTargetTypeTaskItem = "task_item"
)
// ActiveScheduleTriggeredPayload 是 active_schedule.triggered 的事件载荷。
//
// 职责边界:
// 1. 只描述“主动调度链路需要处理一个触发信号”这一事件事实;
// 2. 不复用 GORM model也不承载候选生成、预览写入或通知投递逻辑
// 3. Payload 只保留触发源补充 JSON消费方需要按自身 DTO 再解析。
type ActiveScheduleTriggeredPayload struct {
TriggerID string `json:"trigger_id"`
UserID int `json:"user_id"`
TriggerType string `json:"trigger_type"`
Source string `json:"source"`
TargetType string `json:"target_type"`
TargetID int `json:"target_id"`
FeedbackID string `json:"feedback_id,omitempty"`
IdempotencyKey string `json:"idempotency_key,omitempty"`
DedupeKey string `json:"dedupe_key,omitempty"`
MockNow *time.Time `json:"mock_now,omitempty"`
IsMockTime bool `json:"is_mock_time"`
RequestedAt time.Time `json:"requested_at"`
Payload json.RawMessage `json:"payload,omitempty"`
TraceID string `json:"trace_id,omitempty"`
}
// Validate 校验事件契约必填字段与第一版枚举范围。
//
// 职责边界:
// 1. 只做协议级基础校验,避免无效事件进入 worker
// 2. 不检查 target 是否存在、是否归属用户,这些属于业务读模型责任;
// 3. dry-run 不应发布该事件,因此 source=api_dry_run 会被拒绝。
func (p ActiveScheduleTriggeredPayload) Validate() error {
if strings.TrimSpace(p.TriggerID) == "" {
return errors.New("trigger_id 不能为空")
}
if p.UserID <= 0 {
return errors.New("user_id 必须大于 0")
}
if !isAllowedActiveScheduleTriggerType(p.TriggerType) {
return errors.New("trigger_type 不在主动调度第一版允许范围内")
}
if !isAllowedActiveScheduleSource(p.Source) {
return errors.New("source 不在主动调度第一版允许范围内")
}
if p.Source == ActiveScheduleSourceAPIDryRun {
return errors.New("api_dry_run 不允许发布 active_schedule.triggered")
}
if !isAllowedActiveScheduleTargetType(p.TargetType) {
return errors.New("target_type 不在主动调度第一版允许范围内")
}
if p.TargetID <= 0 && strings.TrimSpace(p.TriggerType) != ActiveScheduleTriggerTypeUnfinishedFeedback {
return errors.New("target_id 必须大于 0")
}
if p.RequestedAt.IsZero() {
return errors.New("requested_at 不能为空")
}
if p.MockNow != nil && !p.IsMockTime {
return errors.New("mock_now 非空时必须标记 is_mock_time=true")
}
return nil
}
// MessageKey 返回 outbox/Kafka 消息键。
//
// 说明:
// 1. 按文档约定使用 user_id便于同一用户事件在消费侧保持局部有序
// 2. 只做字符串构造,不访问数据库。
func (p ActiveScheduleTriggeredPayload) MessageKey() string {
if p.UserID <= 0 {
return ""
}
return strconv.Itoa(p.UserID)
}
// AggregateID 返回事件聚合 ID。
//
// 说明:
// 1. active_schedule.triggered 的聚合主键是 trigger_id
// 2. 若 trigger_id 为空,返回空字符串,由发布方在 Validate 前发现问题。
func (p ActiveScheduleTriggeredPayload) AggregateID() string {
return strings.TrimSpace(p.TriggerID)
}
func isAllowedActiveScheduleTriggerType(value string) bool {
switch strings.TrimSpace(value) {
case ActiveScheduleTriggerTypeImportantUrgentTask, ActiveScheduleTriggerTypeUnfinishedFeedback:
return true
default:
return false
}
}
func isAllowedActiveScheduleSource(value string) bool {
switch strings.TrimSpace(value) {
case ActiveScheduleSourceWorkerDueJob, ActiveScheduleSourceAPITrigger, ActiveScheduleSourceAPIDryRun, ActiveScheduleSourceUserFeedback:
return true
default:
return false
}
}
func isAllowedActiveScheduleTargetType(value string) bool {
switch strings.TrimSpace(value) {
case ActiveScheduleTargetTypeTaskPool, ActiveScheduleTargetTypeScheduleEvent, ActiveScheduleTargetTypeTaskItem:
return true
default:
return false
}
}