Version: 0.9.69.dev.260504

后端:
1. 阶段 4 active-scheduler 服务边界落地,新增 `cmd/active-scheduler`、`services/active_scheduler`、`shared/contracts/activescheduler` 和 active-scheduler port,迁移 dry-run、trigger、preview、confirm zrpc 能力
2. active-scheduler outbox consumer、relay、retry loop 和 due job scanner 迁入独立服务入口,gateway `/active-schedule/*` 改为通过 zrpc client 调用
3. gateway 目录收口为 `gateway/api` + `gateway/client`,统一归档 userauth、notification、active-scheduler 的 HTTP 门面和 zrpc client
4. 将旧 `backend/active_scheduler` 领域核心下沉到 `services/active_scheduler/core`,清退旧根目录活跃实现,并补充 active-scheduler 启动期跨域依赖表检查
5. 调整单体启动与 outbox 归属,`cmd/all` 不再启动 active-scheduler workflow、scanner 或 handler

文档:
1. 更新微服务迁移计划,将阶段 4 active-scheduler 标记为首轮收口完成,并明确下一阶段进入 schedule / task / course / task-class
This commit is contained in:
Losita
2026-05-04 21:01:00 +08:00
parent abe3b4960e
commit 4d9a5c4d30
66 changed files with 2048 additions and 466 deletions

View File

@@ -0,0 +1,92 @@
package service
import (
"context"
"errors"
"time"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate"
schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
)
// DryRunResult 是 API dry-run / worker 测试入口可直接消费的同步结果。
type DryRunResult struct {
Context *schedulercontext.ActiveScheduleContext
Observation observe.Result
Candidates []candidate.Candidate
}
// DryRunService 编排主动调度 dry-run 主链路。
//
// 职责边界:
// 1. 固定执行 BuildContext -> Observe -> GenerateCandidates
// 2. 不调用 LLM、不写 preview、不发 notification、不正式写日程
// 3. 后续 API / worker 应复用该入口,避免出现第二套 dry-run 诊断逻辑。
type DryRunService struct {
builder *schedulercontext.Builder
analyzer *observe.Analyzer
generator *candidate.Generator
}
// NewDryRunService 创建主动调度 dry-run 服务。
func NewDryRunService(readers ports.Readers) (*DryRunService, error) {
builder, err := schedulercontext.NewBuilder(readers)
if err != nil {
return nil, err
}
return &DryRunService{
builder: builder,
analyzer: observe.NewAnalyzer(),
generator: candidate.NewGenerator(),
}, nil
}
// SetClock 注入测试时钟。
func (s *DryRunService) SetClock(clock func() time.Time) {
if s != nil && s.builder != nil {
s.builder.SetClock(clock)
}
}
// DryRun 执行主动调度同步诊断。
func (s *DryRunService) DryRun(ctx context.Context, trig trigger.ActiveScheduleTrigger) (*DryRunResult, error) {
if s == nil || s.builder == nil || s.analyzer == nil || s.generator == nil {
return nil, errors.New("DryRunService 尚未正确初始化")
}
// 1. 构造上下文:读取 task / schedule / feedback 的只读事实快照。
activeContext, err := s.builder.BuildContext(ctx, trig)
if err != nil {
return nil, err
}
// 2. 主动观测:生成 metrics、issues 和初步裁决,不生成正式变更。
observation := s.analyzer.Observe(activeContext)
// 3. 候选生成:只枚举第一版允许的确定性候选,压缩融合保持关闭。
candidates := s.generator.GenerateCandidates(activeContext, observation)
fallbackCandidateID := ""
if len(candidates) > 0 {
fallbackCandidateID = candidates[0].CandidateID
}
observation = s.analyzer.FinalizeDecision(observation, len(applicableCandidates(candidates)), fallbackCandidateID)
return &DryRunResult{
Context: activeContext,
Observation: observation,
Candidates: candidates,
}, nil
}
func applicableCandidates(candidates []candidate.Candidate) []candidate.Candidate {
result := make([]candidate.Candidate, 0, len(candidates))
for _, item := range candidates {
if item.CandidateType == candidate.TypeAddTaskPoolToSchedule || item.CandidateType == candidate.TypeCreateMakeup {
result = append(result, item)
}
}
return result
}

View File

@@ -0,0 +1,31 @@
package service
import (
"context"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
)
// AsGraphDryRunFunc 把现有 dry-run service 适配成 graph runner 可用的入口。
//
// 职责边界:
// 1. 只做 service.Result -> graph.DryRunData 的轻量转换;
// 2. 不改写 dry-run 行为,不引入额外候选逻辑;
// 3. 让 graph runner 可以复用现有 BuildContext -> Observe -> GenerateCandidates 链路。
func (s *DryRunService) AsGraphDryRunFunc() activegraph.DryRunFunc {
if s == nil {
return nil
}
return func(ctx context.Context, trig trigger.ActiveScheduleTrigger) (*activegraph.DryRunData, error) {
result, err := s.DryRun(ctx, trig)
if err != nil {
return nil, err
}
return &activegraph.DryRunData{
Context: result.Context,
Observation: result.Observation,
Candidates: result.Candidates,
}, nil
}
}

View File

@@ -0,0 +1,366 @@
package service
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
"github.com/LoveLosita/smartflow/backend/model"
activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
"gorm.io/gorm"
)
// PreviewConfirmService 编排第三阶段的预览生成、查询和确认应用。
//
// 职责边界:
// 1. 复用 dry-run 结果写 preview不重新实现候选生成
// 2. confirm 时只负责 preview 状态、幂等和 apply port 调用编排;
// 3. 正式 schedule 写入仍由 applyadapter 在事务中完成。
type PreviewConfirmService struct {
dryRun *DryRunService
preview *activepreview.Service
activeDAO *dao.ActiveScheduleDAO
applyAdapter *applyadapter.GormApplyAdapter
clock func() time.Time
}
func NewPreviewConfirmService(dryRun *DryRunService, previewService *activepreview.Service, activeDAO *dao.ActiveScheduleDAO, applyAdapter *applyadapter.GormApplyAdapter) (*PreviewConfirmService, error) {
if dryRun == nil {
return nil, errors.New("dry-run service 不能为空")
}
if previewService == nil {
return nil, errors.New("preview service 不能为空")
}
if activeDAO == nil {
return nil, errors.New("active schedule dao 不能为空")
}
if applyAdapter == nil {
return nil, errors.New("apply adapter 不能为空")
}
return &PreviewConfirmService{
dryRun: dryRun,
preview: previewService,
activeDAO: activeDAO,
applyAdapter: applyAdapter,
clock: time.Now,
}, nil
}
func (s *PreviewConfirmService) SetClock(clock func() time.Time) {
if s != nil && clock != nil {
s.clock = clock
}
}
func (s *PreviewConfirmService) CreatePreviewFromDryRun(ctx context.Context, req activepreview.CreatePreviewRequest) (*activepreview.CreatePreviewResponse, error) {
if s == nil || s.preview == nil {
return nil, errors.New("preview confirm service 未初始化")
}
return s.preview.CreatePreview(ctx, req)
}
func (s *PreviewConfirmService) GetPreview(ctx context.Context, userID int, previewID string) (*activepreview.ActiveSchedulePreviewDetail, error) {
if s == nil || s.preview == nil {
return nil, errors.New("preview confirm service 未初始化")
}
return s.preview.GetPreview(ctx, userID, previewID)
}
// ConfirmPreview 同步确认并应用主动调度预览。
//
// 步骤化说明:
// 1. 先读取 preview 并做同用户校验,避免跨用户确认;
// 2. 对已应用且命中同一幂等键的请求直接返回历史结果,避免重复写日程;
// 3. 转换 candidate/edited_changes 为 apply 请求;
// 4. 先把 preview 标记 applying再调用正式 apply adapter
// 5. 成功或失败都回写 preview保证接口返回后可排障。
func (s *PreviewConfirmService) ConfirmPreview(ctx context.Context, req activeapply.ConfirmRequest) (*activeapply.ConfirmResult, error) {
if s == nil || s.activeDAO == nil || s.applyAdapter == nil {
return nil, errors.New("preview confirm service 未初始化")
}
now := s.now()
if req.RequestedAt.IsZero() {
req.RequestedAt = now
}
previewRow, err := s.activeDAO.GetPreviewByID(ctx, req.PreviewID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, activeapply.NewApplyError(activeapply.ErrorCodeTargetNotFound, "预览不存在或已被删除", err)
}
return nil, err
}
if previewRow.UserID != req.UserID {
return nil, activeapply.NewApplyError(activeapply.ErrorCodeForbidden, "预览不属于当前用户", nil)
}
if previewRow.ApplyStatus == model.ActiveScheduleApplyStatusApplied {
if previewRow.ApplyIdempotencyKey == req.IdempotencyKey {
return alreadyAppliedResult(*previewRow), nil
}
return nil, activeapply.NewApplyError(activeapply.ErrorCodeAlreadyApplied, "预览已经应用,不能使用新的幂等键重复确认", nil)
}
applyReq, err := activeapply.ConvertConfirmToApplyRequest(*previewRow, req, now)
if err != nil {
_ = s.markApplyFailed(ctx, previewRow.ID, "", err)
return nil, err
}
if len(applyReq.Commands) == 0 {
return s.markNoopApplied(ctx, *applyReq)
}
if err = s.markApplying(ctx, *applyReq); err != nil {
return nil, err
}
adapterReq := toAdapterRequest(*applyReq)
adapterResult, err := s.applyAdapter.ApplyActiveScheduleChanges(ctx, adapterReq)
if err != nil {
classifiedErr := classifyAdapterApplyError(err)
_ = s.markApplyFailed(ctx, previewRow.ID, applyReq.ApplyID, classifiedErr)
return nil, classifiedErr
}
result := activeapply.ApplyActiveScheduleResult{
ApplyID: applyReq.ApplyID,
ApplyStatus: activeapply.ApplyStatusApplied,
AppliedEventIDs: adapterResult.AppliedEventIDs,
AppliedScheduleIDs: adapterResult.AppliedScheduleIDs,
AppliedChanges: applyReq.Changes,
SkippedChanges: applyReq.SkippedChanges,
RequestHash: applyReq.RequestHash,
NormalizedChangeHash: applyReq.NormalizedChangesHash,
}
if err = s.markApplied(ctx, *applyReq, result); err != nil {
return nil, err
}
return &activeapply.ConfirmResult{
PreviewID: applyReq.PreviewID,
ApplyID: applyReq.ApplyID,
ApplyStatus: activeapply.ApplyStatusApplied,
CandidateID: applyReq.CandidateID,
RequestHash: applyReq.RequestHash,
RequestBodyHash: applyReq.RequestBodyHash,
ApplyRequest: applyReq,
ApplyResult: &result,
SkippedChanges: applyReq.SkippedChanges,
}, nil
}
func (s *PreviewConfirmService) markApplying(ctx context.Context, req activeapply.ApplyActiveScheduleRequest) error {
return s.activeDAO.UpdatePreviewFields(ctx, req.PreviewID, map[string]any{
"apply_id": req.ApplyID,
"apply_status": model.ActiveScheduleApplyStatusApplying,
"apply_candidate_id": req.CandidateID,
"apply_idempotency_key": req.IdempotencyKey,
"apply_request_hash": req.RequestHash,
})
}
// markNoopApplied 处理 notify_only / ask_user / close 这类“确认成功但不写正式日程”的候选。
//
// 职责边界:
// 1. 只把 preview 标记为已处理,并保留幂等字段,便于同 key 重试直接命中历史结果;
// 2. 不调用 apply adapter因为这些 change 在转换阶段已经被归类为 skipped_changes
// 3. 失败时直接返回数据库错误,调用方应按系统错误处理,避免前端误以为确认成功。
func (s *PreviewConfirmService) markNoopApplied(ctx context.Context, req activeapply.ApplyActiveScheduleRequest) (*activeapply.ConfirmResult, error) {
result := activeapply.ApplyActiveScheduleResult{
ApplyID: req.ApplyID,
ApplyStatus: activeapply.ApplyStatusApplied,
AppliedChanges: []activeapply.ApplyChange{},
SkippedChanges: req.SkippedChanges,
RequestHash: req.RequestHash,
NormalizedChangeHash: req.NormalizedChangesHash,
}
if err := s.markApplied(ctx, req, result); err != nil {
return nil, err
}
return &activeapply.ConfirmResult{
PreviewID: req.PreviewID,
ApplyID: req.ApplyID,
ApplyStatus: activeapply.ApplyStatusApplied,
CandidateID: req.CandidateID,
RequestHash: req.RequestHash,
RequestBodyHash: req.RequestBodyHash,
ApplyRequest: &req,
ApplyResult: &result,
SkippedChanges: req.SkippedChanges,
}, nil
}
func (s *PreviewConfirmService) markApplied(ctx context.Context, req activeapply.ApplyActiveScheduleRequest, result activeapply.ApplyActiveScheduleResult) error {
now := s.now()
appliedChangesJSON := mustJSON(result.AppliedChanges)
appliedEventIDsJSON := mustJSON(result.AppliedEventIDs)
return s.activeDAO.UpdatePreviewFields(ctx, req.PreviewID, map[string]any{
"status": model.ActiveSchedulePreviewStatusApplied,
"apply_id": req.ApplyID,
"apply_status": model.ActiveScheduleApplyStatusApplied,
"apply_candidate_id": req.CandidateID,
"apply_idempotency_key": req.IdempotencyKey,
"apply_request_hash": req.RequestHash,
"applied_changes_json": &appliedChangesJSON,
"applied_event_ids_json": &appliedEventIDsJSON,
"apply_error": nil,
"applied_at": &now,
})
}
func (s *PreviewConfirmService) markApplyFailed(ctx context.Context, previewID string, applyID string, err error) error {
if previewID == "" {
return nil
}
message := ""
if err != nil {
message = err.Error()
}
status := model.ActiveScheduleApplyStatusFailed
if applyErr, ok := activeapply.AsApplyError(err); ok {
switch applyErr.Code {
case activeapply.ErrorCodeExpired:
status = model.ActiveScheduleApplyStatusExpired
case activeapply.ErrorCodeDBError:
status = model.ActiveScheduleApplyStatusFailed
default:
status = model.ActiveScheduleApplyStatusRejected
}
}
updates := map[string]any{
"apply_status": status,
"apply_error": &message,
}
if applyID != "" {
updates["apply_id"] = applyID
}
return s.activeDAO.UpdatePreviewFields(ctx, previewID, updates)
}
// classifyAdapterApplyError 把正式写库 adapter 的错误转换为 confirm 层统一错误码。
//
// 职责边界:
// 1. 只处理 applyadapter 已声明的业务错误码,保持 API 层只理解 active_scheduler/apply 包;
// 2. 未知错误统一归为 db_error避免把真实系统故障错误映射为用户可修正的 4xx
// 3. 原始错误作为 cause 保留,日志和 apply_error 仍能追到 adapter 返回的完整信息。
func classifyAdapterApplyError(err error) error {
if err == nil {
return nil
}
var adapterErr *applyadapter.ApplyError
if !errors.As(err, &adapterErr) {
return activeapply.NewApplyError(activeapply.ErrorCodeDBError, "主动调度正式写库失败", err)
}
switch adapterErr.Code {
case applyadapter.ErrorCodeInvalidRequest:
return activeapply.NewApplyError(activeapply.ErrorCodeInvalidRequest, adapterErr.Message, err)
case applyadapter.ErrorCodeUnsupportedChangeType:
return activeapply.NewApplyError(activeapply.ErrorCodeUnsupportedChangeType, adapterErr.Message, err)
case applyadapter.ErrorCodeTargetNotFound:
return activeapply.NewApplyError(activeapply.ErrorCodeTargetNotFound, adapterErr.Message, err)
case applyadapter.ErrorCodeTargetCompleted:
return activeapply.NewApplyError(activeapply.ErrorCodeTargetCompleted, adapterErr.Message, err)
case applyadapter.ErrorCodeTargetAlreadyScheduled:
return activeapply.NewApplyError(activeapply.ErrorCodeTargetAlreadySchedule, adapterErr.Message, err)
case applyadapter.ErrorCodeSlotConflict:
return activeapply.NewApplyError(activeapply.ErrorCodeSlotConflict, adapterErr.Message, err)
case applyadapter.ErrorCodeInvalidEditedChanges:
return activeapply.NewApplyError(activeapply.ErrorCodeInvalidEditedChanges, adapterErr.Message, err)
default:
return activeapply.NewApplyError(activeapply.ErrorCodeDBError, adapterErr.Message, err)
}
}
func (s *PreviewConfirmService) now() time.Time {
if s == nil || s.clock == nil {
return time.Now()
}
return s.clock()
}
func toAdapterRequest(req activeapply.ApplyActiveScheduleRequest) applyadapter.ApplyActiveScheduleRequest {
changes := make([]applyadapter.ApplyChange, 0, len(req.Changes))
for _, change := range req.Changes {
changes = append(changes, toAdapterChange(change))
}
return applyadapter.ApplyActiveScheduleRequest{
PreviewID: req.PreviewID,
ApplyID: req.ApplyID,
UserID: req.UserID,
CandidateID: req.CandidateID,
Changes: changes,
RequestedAt: req.RequestedAt,
TraceID: req.TraceID,
}
}
func toAdapterChange(change activeapply.ApplyChange) applyadapter.ApplyChange {
return applyadapter.ApplyChange{
ChangeID: change.ChangeID,
ChangeType: string(change.Type),
TargetType: change.TargetType,
TargetID: change.TargetID,
ToSlot: toAdapterSlotSpan(change),
DurationSections: change.DurationSections,
Metadata: cloneStringMap(change.Metadata),
}
}
func toAdapterSlotSpan(change activeapply.ApplyChange) *applyadapter.SlotSpan {
if len(change.Slots) == 0 {
return nil
}
start := change.Slots[0]
end := change.Slots[len(change.Slots)-1]
return &applyadapter.SlotSpan{
Start: applyadapter.Slot{Week: start.Week, DayOfWeek: start.DayOfWeek, Section: start.Section},
End: applyadapter.Slot{Week: end.Week, DayOfWeek: end.DayOfWeek, Section: end.Section},
DurationSections: len(change.Slots),
}
}
func alreadyAppliedResult(preview model.ActiveSchedulePreview) *activeapply.ConfirmResult {
appliedEventIDs := []int{}
if preview.AppliedEventIDsJSON != nil && *preview.AppliedEventIDsJSON != "" {
_ = json.Unmarshal([]byte(*preview.AppliedEventIDsJSON), &appliedEventIDs)
}
return &activeapply.ConfirmResult{
PreviewID: preview.ID,
ApplyID: stringValue(preview.ApplyID),
ApplyStatus: activeapply.ApplyStatusApplied,
CandidateID: preview.ApplyCandidateID,
RequestHash: preview.ApplyRequestHash,
ApplyResult: &activeapply.ApplyActiveScheduleResult{
ApplyID: stringValue(preview.ApplyID),
ApplyStatus: activeapply.ApplyStatusApplied,
AppliedEventIDs: appliedEventIDs,
RequestHash: preview.ApplyRequestHash,
},
}
}
func mustJSON(value any) string {
raw, err := json.Marshal(value)
if err != nil {
return "null"
}
return string(raw)
}
func stringValue(value *string) string {
if value == nil {
return ""
}
return *value
}
func cloneStringMap(input map[string]string) map[string]string {
if len(input) == 0 {
return nil
}
output := make(map[string]string, len(input))
for key, value := range input {
output[key] = value
}
return output
}

View File

@@ -0,0 +1,302 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
"github.com/LoveLosita/smartflow/backend/model"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection"
"github.com/google/uuid"
"gorm.io/gorm"
)
var (
activeScheduleConversationNamespace = uuid.NewSHA1(uuid.NameSpaceURL, []byte("smartflow:active_schedule:conversation"))
activeScheduleSessionNamespace = uuid.NewSHA1(uuid.NameSpaceURL, []byte("smartflow:active_schedule:session"))
)
// WithActiveScheduleSessionBridge 注入主动调度 session 预创建所需的 DAO。
//
// 职责边界:
// 1. 只把 trigger -> notification 前的会话桥接能力接入 workflow
// 2. 不改变 dry-run / preview / notification 的主状态机;
// 3. 为空时保留旧能力,便于局部测试与迁移期回退。
func WithActiveScheduleSessionBridge(agentDAO *dao.AgentDAO, sessionDAO *dao.ActiveScheduleSessionDAO) TriggerWorkflowOption {
return func(s *TriggerWorkflowService) {
if s == nil {
return
}
s.agentDAO = agentDAO
s.sessionDAO = sessionDAO
}
}
// bootstrapActiveScheduleConversationInTx 负责在 notification 发出前预建会话与首屏内容。
//
// 步骤化说明:
// 1. 先生成确定性的 conversation/session ID保证 trigger 重试时不会拆成多条会话;
// 2. 再在同一事务里创建或复用 agent_chats 和 active_schedule_sessions
// 3. 如果是首次落库,则顺手补一条 assistant_text必要时再补一张主动调度卡片
// 4. 任一步失败都直接返回 error让上层事务整体回滚避免“通知已发但会话底稿没落”。
func (s *TriggerWorkflowService) bootstrapActiveScheduleConversationInTx(
ctx context.Context,
tx *gorm.DB,
triggerRow model.ActiveScheduleTrigger,
previewDetail activepreview.ActiveSchedulePreviewDetail,
selectionResult selection.Result,
now time.Time,
) error {
if s == nil {
return errors.New("主动调度会话桥未初始化")
}
if s.agentDAO == nil || s.sessionDAO == nil {
return nil
}
if tx == nil {
return errors.New("gorm tx 不能为空")
}
if triggerRow.ID == "" {
return errors.New("trigger_id 不能为空")
}
conversationID := buildActiveScheduleConversationID(triggerRow.ID)
sessionID := buildActiveScheduleSessionID(triggerRow.ID)
txAgentDAO := s.agentDAO.WithTx(tx)
txSessionDAO := s.sessionDAO.WithTx(tx)
if err := ensureAgentConversationExists(ctx, txAgentDAO, triggerRow.UserID, conversationID); err != nil {
return err
}
baseSeq, err := txAgentDAO.GetConversationTimelineMaxSeq(ctx, triggerRow.UserID, conversationID)
if err != nil {
return err
}
// 1. 只有首次创建会话时才写首屏消息,避免同一 trigger 的重试把时间线重复刷一遍。
// 2. 若 timeline 已存在,说明这段主动调度会话已经被成功预热过,直接复用现成内容即可。
if baseSeq == 0 {
assistantText := resolveInitialActiveScheduleAssistantText(selectionResult, previewDetail)
if assistantText != "" {
if err := txAgentDAO.SaveChatHistoryInTx(ctx, triggerRow.UserID, conversationID, "assistant", assistantText, "", 0, 0, ""); err != nil {
return err
}
if err := saveActiveScheduleTimelineEvent(ctx, txAgentDAO, triggerRow.UserID, conversationID, baseSeq+1, model.AgentTimelineKindAssistantText, "assistant", assistantText, nil); err != nil {
return err
}
baseSeq++
}
if shouldSeedActiveSchedulePreviewCard(selectionResult) {
cardPayload, err := buildActiveScheduleBusinessCardPayload(previewDetail)
if err != nil {
return err
}
if err := saveActiveScheduleTimelineEvent(ctx, txAgentDAO, triggerRow.UserID, conversationID, baseSeq+1, model.AgentTimelineKindBusinessCard, "assistant", assistantText, cardPayload); err != nil {
return err
}
}
}
sessionSnapshot := &model.ActiveScheduleSessionSnapshot{
SessionID: sessionID,
UserID: triggerRow.UserID,
ConversationID: conversationID,
TriggerID: triggerRow.ID,
CurrentPreviewID: strings.TrimSpace(previewDetail.PreviewID),
Status: resolveInitialActiveScheduleSessionStatus(selectionResult),
State: buildInitialActiveScheduleSessionState(selectionResult, previewDetail),
CreatedAt: now,
UpdatedAt: now,
}
return txSessionDAO.UpsertActiveScheduleSession(ctx, sessionSnapshot)
}
func buildActiveScheduleConversationID(triggerID string) string {
normalized := strings.TrimSpace(triggerID)
if normalized == "" {
return uuid.NewString()
}
return uuid.NewSHA1(activeScheduleConversationNamespace, []byte(normalized)).String()
}
func buildActiveScheduleSessionID(triggerID string) string {
normalized := strings.TrimSpace(triggerID)
if normalized == "" {
return uuid.NewString()
}
return uuid.NewSHA1(activeScheduleSessionNamespace, []byte(normalized)).String()
}
func ensureAgentConversationExists(ctx context.Context, agentDAO *dao.AgentDAO, userID int, conversationID string) error {
if agentDAO == nil {
return errors.New("agent dao 不能为空")
}
if userID <= 0 {
return fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return errors.New("conversation_id 不能为空")
}
exists, err := agentDAO.IfChatExists(ctx, userID, normalizedConversationID)
if err != nil {
return err
}
if exists {
return nil
}
_, err = agentDAO.CreateNewChat(userID, normalizedConversationID)
return err
}
func resolveInitialActiveScheduleAssistantText(selectionResult selection.Result, previewDetail activepreview.ActiveSchedulePreviewDetail) string {
switch selectionResult.Action {
case selection.ActionAskUser:
return firstNonEmptyString(
selectionResult.AskUserQuestion,
selectionResult.ExplanationText,
previewDetail.Explanation,
previewDetail.Notification,
"请先补充主动调度需要的关键信息。",
)
default:
return firstNonEmptyString(
selectionResult.ExplanationText,
selectionResult.NotificationSummary,
previewDetail.Notification,
previewDetail.Explanation,
"主动调度建议已更新。",
)
}
}
func shouldSeedActiveSchedulePreviewCard(selectionResult selection.Result) bool {
return selectionResult.Action == selection.ActionSelectCandidate
}
func resolveInitialActiveScheduleSessionStatus(selectionResult selection.Result) string {
switch selectionResult.Action {
case selection.ActionAskUser:
return model.ActiveScheduleSessionStatusWaitingUserReply
case selection.ActionSelectCandidate:
return model.ActiveScheduleSessionStatusReadyPreview
default:
return model.ActiveScheduleSessionStatusIgnored
}
}
func buildInitialActiveScheduleSessionState(
selectionResult selection.Result,
previewDetail activepreview.ActiveSchedulePreviewDetail,
) model.ActiveScheduleSessionState {
state := model.ActiveScheduleSessionState{
LastCandidateID: strings.TrimSpace(selectionResult.SelectedCandidateID),
MissingInfo: cloneStringSlice(previewDetail.ContextSummary.MissingInfo),
}
if !previewDetail.ExpiresAt.IsZero() {
expiresAt := previewDetail.ExpiresAt
state.ExpiresAt = &expiresAt
}
switch selectionResult.Action {
case selection.ActionAskUser:
state.PendingQuestion = firstNonEmptyString(
selectionResult.AskUserQuestion,
selectionResult.ExplanationText,
)
case selection.ActionSelectCandidate:
state.PendingQuestion = ""
state.MissingInfo = nil
state.FailedReason = ""
default:
state.PendingQuestion = ""
state.MissingInfo = nil
state.ExpiresAt = nil
}
return state
}
func buildActiveScheduleBusinessCardPayload(detail activepreview.ActiveSchedulePreviewDetail) (map[string]any, error) {
raw, err := json.Marshal(map[string]any{
"business_card": map[string]any{
"card_type": "active_schedule_preview",
"title": "SmartFlow 日程调整建议",
"summary": firstNonEmptyString(detail.Notification, detail.Explanation, detail.SelectedCandidate.Summary),
"data": detail,
},
})
if err != nil {
return nil, err
}
var payload map[string]any
if err := json.Unmarshal(raw, &payload); err != nil {
return nil, err
}
return payload, nil
}
func saveActiveScheduleTimelineEvent(
ctx context.Context,
agentDAO *dao.AgentDAO,
userID int,
conversationID string,
seq int64,
kind string,
role string,
content string,
payload map[string]any,
) error {
if agentDAO == nil {
return errors.New("agent dao 不能为空")
}
normalizedConversationID := strings.TrimSpace(conversationID)
if userID <= 0 || normalizedConversationID == "" {
return errors.New("时间线事件主键不合法")
}
payloadJSON := ""
if len(payload) > 0 {
raw, err := json.Marshal(payload)
if err != nil {
return err
}
payloadJSON = string(raw)
}
_, _, err := agentDAO.SaveConversationTimelineEvent(ctx, model.ChatTimelinePersistPayload{
UserID: userID,
ConversationID: normalizedConversationID,
Seq: seq,
Kind: kind,
Role: role,
Content: content,
PayloadJSON: payloadJSON,
TokensConsumed: 0,
})
return err
}
func firstNonEmptyString(values ...string) string {
for _, value := range values {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
return ""
}
func cloneStringSlice(values []string) []string {
if len(values) == 0 {
return nil
}
cloned := make([]string, len(values))
copy(cloned, values)
return cloned
}

View File

@@ -0,0 +1,270 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
"github.com/google/uuid"
"gorm.io/gorm"
)
const triggerDedupeWindow = 30 * time.Minute
// TriggerRequest 是正式主动调度触发入口的请求 DTO。
//
// 职责边界:
// 1. 负责承载 API trigger、worker due job、用户反馈归一后的触发事实
// 2. 不承载 dry-run 结果、preview 快照或 notification provider 参数;
// 3. Payload 只保存触发来源补充信息,不能塞任意业务写库参数。
type TriggerRequest struct {
UserID int
TriggerType trigger.TriggerType
Source trigger.Source
TargetType trigger.TargetType
TargetID int
FeedbackID string
IdempotencyKey string
DedupeKey string
MockNow *time.Time
IsMockTime bool
RequestedAt time.Time
Payload json.RawMessage
JobID *string
TraceID string
}
// TriggerResponse 是正式触发写入后的结果。
type TriggerResponse struct {
TriggerID string `json:"trigger_id"`
Status string `json:"status"`
PreviewID *string `json:"preview_id,omitempty"`
DedupeHit bool `json:"dedupe_hit"`
TraceID string `json:"trace_id,omitempty"`
}
// TriggerService 负责写入正式 trigger 并发布 active_schedule.triggered 事件。
//
// 职责边界:
// 1. 只负责触发信号持久化、去重和事件发布;
// 2. 不执行 dry-run、不写 preview、不发飞书
// 3. outbox 未启用时返回明确错误,避免调用方误以为正式链路已启动。
type TriggerService struct {
activeDAO *dao.ActiveScheduleDAO
publisher outboxinfra.EventPublisher
clock func() time.Time
}
func NewTriggerService(activeDAO *dao.ActiveScheduleDAO, publisher outboxinfra.EventPublisher) (*TriggerService, error) {
if activeDAO == nil {
return nil, errors.New("active schedule dao 不能为空")
}
return &TriggerService{
activeDAO: activeDAO,
publisher: publisher,
clock: time.Now,
}, nil
}
func (s *TriggerService) SetClock(clock func() time.Time) {
if s != nil && clock != nil {
s.clock = clock
}
}
// CreateAndPublish 创建正式 trigger 并发布 outbox 事件。
//
// 步骤化说明:
// 1. 先按主动调度 trigger DTO 做入口校验,确保 mock_now 不会从 worker 入口混入;
// 2. 再用 idempotency_key / dedupe_key 查询已有 trigger命中则直接返回旧状态
// 3. 新 trigger 先落库,再发布 outbox发布失败会把 trigger 标记 failed便于排障
// 4. 返回 nil error 只表示事件已入 outbox不表示 worker 已经生成 preview。
func (s *TriggerService) CreateAndPublish(ctx context.Context, req TriggerRequest) (*TriggerResponse, error) {
if s == nil || s.activeDAO == nil {
return nil, errors.New("trigger service 未初始化")
}
if s.publisher == nil {
return nil, errors.New("outbox event bus 未启用,无法执行正式主动调度 trigger")
}
now := s.now()
if req.RequestedAt.IsZero() {
req.RequestedAt = now
}
if req.IsMockTime && req.MockNow == nil {
return nil, errors.New("is_mock_time=true 时 mock_now 不能为空")
}
trig := trigger.ActiveScheduleTrigger{
UserID: req.UserID,
TriggerType: req.TriggerType,
Source: req.Source,
TargetType: req.TargetType,
TargetID: req.TargetID,
FeedbackID: req.FeedbackID,
IdempotencyKey: req.IdempotencyKey,
MockNow: req.MockNow,
IsMockTime: req.IsMockTime,
RequestedAt: req.RequestedAt,
TraceID: firstNonEmpty(req.TraceID, fmt.Sprintf("trace_active_trigger_%d", now.UnixNano())),
}
if err := trig.Validate(); err != nil {
return nil, err
}
if trig.Source == trigger.SourceAPIDryRun {
return nil, errors.New("api_dry_run 不允许创建正式 trigger")
}
dedupeKey := strings.TrimSpace(req.DedupeKey)
if dedupeKey == "" {
dedupeKey = BuildTriggerDedupeKey(req.UserID, req.TriggerType, req.TargetType, req.TargetID, req.FeedbackID, req.IdempotencyKey, trig.EffectiveNow(req.RequestedAt))
}
if existing, ok, err := s.findExistingTrigger(ctx, req.UserID, string(req.TriggerType), req.IdempotencyKey, dedupeKey); err != nil {
return nil, err
} else if ok {
return triggerResponseFromModel(existing, true), nil
}
payloadJSON := string(req.Payload)
if strings.TrimSpace(payloadJSON) == "" {
payloadJSON = "{}"
}
triggerID := "ast_" + uuid.NewString()
row := &model.ActiveScheduleTrigger{
ID: triggerID,
UserID: req.UserID,
TriggerType: string(req.TriggerType),
Source: string(req.Source),
TargetType: string(req.TargetType),
TargetID: req.TargetID,
FeedbackID: strings.TrimSpace(req.FeedbackID),
JobID: req.JobID,
IdempotencyKey: strings.TrimSpace(req.IdempotencyKey),
DedupeKey: dedupeKey,
Status: model.ActiveScheduleTriggerStatusPending,
MockNow: req.MockNow,
IsMockTime: req.IsMockTime,
RequestedAt: req.RequestedAt,
PayloadJSON: &payloadJSON,
TraceID: trig.TraceID,
}
if err := s.activeDAO.CreateTrigger(ctx, row); err != nil {
return nil, err
}
eventPayload := sharedevents.ActiveScheduleTriggeredPayload{
TriggerID: row.ID,
UserID: row.UserID,
TriggerType: row.TriggerType,
Source: row.Source,
TargetType: row.TargetType,
TargetID: row.TargetID,
FeedbackID: row.FeedbackID,
IdempotencyKey: row.IdempotencyKey,
DedupeKey: row.DedupeKey,
MockNow: row.MockNow,
IsMockTime: row.IsMockTime,
RequestedAt: row.RequestedAt,
Payload: json.RawMessage(payloadJSON),
TraceID: row.TraceID,
}
if err := eventPayload.Validate(); err != nil {
_ = s.markTriggerFailed(ctx, row.ID, "payload_invalid", err)
return nil, err
}
if err := s.publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: sharedevents.ActiveScheduleTriggeredEventType,
EventVersion: sharedevents.ActiveScheduleTriggeredEventVersion,
MessageKey: eventPayload.MessageKey(),
AggregateID: eventPayload.AggregateID(),
Payload: eventPayload,
}); err != nil {
_ = s.markTriggerFailed(ctx, row.ID, "outbox_publish_failed", err)
return nil, err
}
return triggerResponseFromModel(row, false), nil
}
func (s *TriggerService) findExistingTrigger(ctx context.Context, userID int, triggerType string, idempotencyKey string, dedupeKey string) (*model.ActiveScheduleTrigger, bool, error) {
if strings.TrimSpace(idempotencyKey) != "" {
existing, err := s.activeDAO.FindTriggerByIdempotencyKey(ctx, userID, triggerType, idempotencyKey)
if err == nil {
return existing, true, nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, false, err
}
}
statuses := []string{
model.ActiveScheduleTriggerStatusPending,
model.ActiveScheduleTriggerStatusProcessing,
model.ActiveScheduleTriggerStatusPreviewGenerated,
}
existing, err := s.activeDAO.FindTriggerByDedupeKey(ctx, dedupeKey, statuses)
if err == nil {
return existing, true, nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, false, err
}
return nil, false, nil
}
func (s *TriggerService) markTriggerFailed(ctx context.Context, triggerID string, code string, err error) error {
message := ""
if err != nil {
message = err.Error()
}
now := s.now()
return s.activeDAO.UpdateTriggerFields(ctx, triggerID, map[string]any{
"status": model.ActiveScheduleTriggerStatusFailed,
"last_error_code": code,
"last_error": &message,
"completed_at": &now,
})
}
func (s *TriggerService) now() time.Time {
if s == nil || s.clock == nil {
return time.Now()
}
return s.clock()
}
// BuildTriggerDedupeKey 生成正式触发去重键。
//
// 说明:
// 1. important_urgent_task 按 30 分钟窗口聚合,避免同一任务反复生成预览;
// 2. unfinished_feedback 优先使用 feedback_id/idempotency_key不做固定时间窗强去重
// 3. 参数非法时仍返回可读字符串,调用方会在 trigger.Validate 阶段拒绝非法输入。
func BuildTriggerDedupeKey(userID int, triggerType trigger.TriggerType, targetType trigger.TargetType, targetID int, feedbackID string, idempotencyKey string, at time.Time) string {
if triggerType == trigger.TriggerTypeUnfinishedFeedback {
return fmt.Sprintf("%d:%s:%s", userID, triggerType, firstNonEmpty(feedbackID, idempotencyKey, fmt.Sprintf("%s:%d", targetType, targetID)))
}
if at.IsZero() {
at = time.Now()
}
windowStart := at.Truncate(triggerDedupeWindow)
return fmt.Sprintf("%d:%s:%s:%d:%s", userID, triggerType, targetType, targetID, windowStart.Format(time.RFC3339))
}
func triggerResponseFromModel(row *model.ActiveScheduleTrigger, dedupeHit bool) *TriggerResponse {
if row == nil {
return &TriggerResponse{DedupeHit: dedupeHit}
}
return &TriggerResponse{
TriggerID: row.ID,
Status: row.Status,
PreviewID: row.PreviewID,
DedupeHit: dedupeHit,
TraceID: row.TraceID,
}
}

View File

@@ -0,0 +1,219 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
)
// EnqueueActiveScheduleTriggeredInTx 在事务内写入 active_schedule.triggered outbox 消息。
//
// 职责边界:
// 1. 只负责把已经校验好的事件契约写入 outbox
// 2. 不负责创建 trigger 记录trigger 真值应由调用方先落库;
// 3. 失败时返回 error让上层决定是否整体回滚与重试。
func EnqueueActiveScheduleTriggeredInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
maxRetry int,
payload sharedevents.ActiveScheduleTriggeredPayload,
) error {
return enqueueContractEventInTx(
ctx,
outboxRepo,
maxRetry,
sharedevents.ActiveScheduleTriggeredEventType,
sharedevents.ActiveScheduleTriggeredEventVersion,
payload.MessageKey(),
payload.AggregateID(),
payload.AggregateID(),
payload,
payload.Validate,
)
}
// EnqueueNotificationFeishuRequestedInTx 在事务内写入 notification.feishu.requested outbox 消息。
//
// 职责边界:
// 1. 只做事件契约序列化和 outbox 入队;
// 2. 不负责 notification_records 幂等与 provider 调用;
// 3. 失败时直接返回,让 trigger -> preview -> notification 保持同事务回滚。
func EnqueueNotificationFeishuRequestedInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
maxRetry int,
payload sharedevents.FeishuNotificationRequestedPayload,
) error {
if err := ensureNotificationFeishuOutboxRoute(); err != nil {
return err
}
return enqueueContractEventInTx(
ctx,
outboxRepo,
maxRetry,
sharedevents.NotificationFeishuRequestedEventType,
sharedevents.NotificationFeishuRequestedEventVersion,
payload.MessageKey(),
payload.AggregateID(),
payload.AggregateID(),
payload,
payload.Validate,
)
}
// BuildTriggeredPayloadFromModel 把持久化 trigger 还原成事件载荷。
//
// 职责边界:
// 1. 只做 model -> contract DTO 映射;
// 2. 不校验 trigger 是否应该被处理,业务真值判断由 scanner / worker 完成;
// 3. 若 payload_json 不是合法 JSON返回 error让调用方回滚本次触发。
// ensureNotificationFeishuOutboxRoute 确保 publisher 侧能把飞书通知事件写入 notification outbox。
//
// 职责边界:
// 1. 这里只登记 event_type -> notification 服务归属,不注册 handler也不启动单体旧消费者
// 2. RegisterEventService 本身幂等,重复调用用于覆盖 API/worker 不同启动路径;
// 3. 若路由登记失败,直接返回给事务调用方,让 trigger 与 notification 入队一起回滚。
func ensureNotificationFeishuOutboxRoute() error {
return outboxinfra.RegisterEventService(sharedevents.NotificationFeishuRequestedEventType, outboxinfra.ServiceNotification)
}
func BuildTriggeredPayloadFromModel(row model.ActiveScheduleTrigger) (sharedevents.ActiveScheduleTriggeredPayload, error) {
var rawPayload json.RawMessage
if row.PayloadJSON != nil && strings.TrimSpace(*row.PayloadJSON) != "" {
rawPayload = json.RawMessage(strings.TrimSpace(*row.PayloadJSON))
if !json.Valid(rawPayload) {
return sharedevents.ActiveScheduleTriggeredPayload{}, errors.New("trigger payload_json 不是合法 JSON")
}
}
payload := sharedevents.ActiveScheduleTriggeredPayload{
TriggerID: row.ID,
UserID: row.UserID,
TriggerType: row.TriggerType,
Source: row.Source,
TargetType: row.TargetType,
TargetID: row.TargetID,
FeedbackID: row.FeedbackID,
IdempotencyKey: row.IdempotencyKey,
DedupeKey: row.DedupeKey,
MockNow: row.MockNow,
IsMockTime: row.IsMockTime,
RequestedAt: row.RequestedAt,
Payload: rawPayload,
TraceID: row.TraceID,
}
if err := payload.Validate(); err != nil {
return sharedevents.ActiveScheduleTriggeredPayload{}, err
}
return payload, nil
}
// BuildFeishuRequestedPayload 生成通知事件载荷。
//
// 职责边界:
// 1. 只做 trigger/preview 快照到通知契约的拼装;
// 2. 不判断是否真的要发通知,上层应先根据 decision.ShouldNotify 决定是否调用;
// 3. fallback 文案只做兜底,不替代后续 notification handler 的 provider 级策略。
func BuildFeishuRequestedPayload(
triggerRow model.ActiveScheduleTrigger,
previewID string,
notificationSummary string,
requestedAt time.Time,
) sharedevents.FeishuNotificationRequestedPayload {
summary := strings.TrimSpace(notificationSummary)
targetURL := fmt.Sprintf("/assistant/%s", buildActiveScheduleConversationID(triggerRow.ID))
return sharedevents.FeishuNotificationRequestedPayload{
UserID: triggerRow.UserID,
TriggerID: triggerRow.ID,
PreviewID: strings.TrimSpace(previewID),
TriggerType: triggerRow.TriggerType,
TargetType: triggerRow.TargetType,
TargetID: triggerRow.TargetID,
DedupeKey: BuildNotificationDedupeKey(triggerRow.UserID, triggerRow.TriggerType, triggerRow.RequestedAt),
TargetURL: targetURL,
SummaryText: summary,
FallbackText: buildNotificationFallbackText(summary, targetURL),
TraceID: triggerRow.TraceID,
RequestedAt: requestedAt,
}
}
// BuildNotificationDedupeKey 生成通知 30 分钟窗口去重键。
//
// 说明:
// 1. 第一版按 user_id + trigger_type + time_window 聚合;
// 2. 当 requested_at 缺失时回退到当前时间,避免空值直接写出脏 dedupe_key
// 3. 不拼 preview_id保证同一窗口内多次重试只会落到同一组通知记录。
func BuildNotificationDedupeKey(userID int, triggerType string, requestedAt time.Time) string {
if requestedAt.IsZero() {
requestedAt = time.Now()
}
return sharedevents.BuildFeishuNotificationDedupeKey(userID, triggerType, requestedAt, sharedevents.DefaultFeishuNotificationDedupeWindow)
}
func enqueueContractEventInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
maxRetry int,
eventType string,
eventVersion string,
messageKey string,
aggregateID string,
eventID string,
payload any,
validate func() error,
) error {
if outboxRepo == nil {
return errors.New("outbox repository 不能为空")
}
if validate == nil {
return errors.New("事件校验函数不能为空")
}
if err := validate(); err != nil {
return err
}
payloadJSON, err := json.Marshal(payload)
if err != nil {
return err
}
if maxRetry <= 0 {
maxRetry = 20
}
wrapped := outboxinfra.OutboxEventPayload{
EventID: strings.TrimSpace(eventID),
EventType: eventType,
EventVersion: strings.TrimSpace(eventVersion),
AggregateID: strings.TrimSpace(aggregateID),
Payload: payloadJSON,
}
// 1. 这里只负责把已经校验过的事件契约写入 outbox具体 service/table/topic 由仓库按 eventType 解析。
// 2. 这样 active scheduler 侧不再显式依赖 topic后续切服务级路由时只需要维护事件归属表。
_, err = outboxRepo.CreateMessage(ctx, eventType, strings.TrimSpace(messageKey), wrapped, maxRetry)
return err
}
func buildNotificationFallbackText(summary string, targetURL string) string {
link := strings.TrimSpace(targetURL)
if summary == "" {
return "你有一条新的日程调整建议,请查看:" + link
}
return summary + ",请查看:" + link
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return strings.TrimSpace(value)
}
}
return ""
}

View File

@@ -0,0 +1,375 @@
package service
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
const (
triggerErrorCodePayloadMismatch = "payload_mismatch"
triggerErrorCodeWorkerFailed = "worker_failed"
)
// TriggerWorkflowService 负责第四阶段的 trigger -> dry-run -> preview -> notification 编排。
//
// 职责边界:
// 1. 只推进主动调度 trigger 的后台状态机,不负责启动 outbox worker
// 2. dry-run 与选择器都复用 active_scheduler 独立模块,不再往 newAgent 里塞主动调度逻辑;
// 3. notification 只发布 requested 事件,不直接接真实飞书 provider。
type TriggerWorkflowService struct {
activeDAO *dao.ActiveScheduleDAO
graphRunner *activegraph.Runner
outbox *outboxinfra.Repository
kafkaCfg kafkabus.Config
agentDAO *dao.AgentDAO
sessionDAO *dao.ActiveScheduleSessionDAO
clock func() time.Time
}
func NewTriggerWorkflowService(
activeDAO *dao.ActiveScheduleDAO,
graphRunner *activegraph.Runner,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
) (*TriggerWorkflowService, error) {
return NewTriggerWorkflowServiceWithOptions(activeDAO, graphRunner, outboxRepo, kafkaCfg)
}
// NewTriggerWorkflowServiceWithOptions 创建主动调度 trigger 编排服务,并允许注入迁移期可选能力。
func NewTriggerWorkflowServiceWithOptions(
activeDAO *dao.ActiveScheduleDAO,
graphRunner *activegraph.Runner,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
opts ...TriggerWorkflowOption,
) (*TriggerWorkflowService, error) {
if activeDAO == nil {
return nil, errors.New("active schedule dao 不能为空")
}
if graphRunner == nil {
return nil, errors.New("active scheduler graph runner 不能为空")
}
if outboxRepo == nil {
return nil, errors.New("outbox repository 不能为空")
}
svc := &TriggerWorkflowService{
activeDAO: activeDAO,
graphRunner: graphRunner,
outbox: outboxRepo,
kafkaCfg: kafkaCfg,
clock: time.Now,
}
for _, opt := range opts {
if opt != nil {
opt(svc)
}
}
return svc, nil
}
func (s *TriggerWorkflowService) SetClock(clock func() time.Time) {
if s != nil && clock != nil {
s.clock = clock
}
}
// TriggerWorkflowOption 是 trigger 编排服务的可选注入项。
type TriggerWorkflowOption func(*TriggerWorkflowService)
// ProcessTriggeredInTx 在 outbox 消费事务内推进 trigger 主链路。
//
// 步骤化说明:
// 1. 先锁 trigger 行,确保同一 trigger 在并发 worker 下只能由一个事务推进;
// 2. 再把状态切到 processing避免排障时看不出消息已经被消费
// 3. 复用 active scheduler graph 跑 dry-run + 受限选择;若发现已有 preview则直接复用避免重复写库
// 4. preview 成功后回写 trigger 状态,并在同一事务里补发 notification.requested outbox
// 5. 任一步失败都返回 error由外层 handler 负责记录 failed 状态并触发 outbox retry。
func (s *TriggerWorkflowService) ProcessTriggeredInTx(
ctx context.Context,
tx *gorm.DB,
payload sharedevents.ActiveScheduleTriggeredPayload,
) error {
if s == nil || s.activeDAO == nil || s.graphRunner == nil || s.outbox == nil {
return errors.New("trigger workflow service 未初始化")
}
if tx == nil {
return errors.New("gorm tx 不能为空")
}
if err := payload.Validate(); err != nil {
return err
}
now := s.now()
triggerRow, err := s.lockTrigger(ctx, tx, payload.TriggerID)
if err != nil {
return err
}
txDAO := s.activeDAO.WithTx(tx)
if completed, err := s.tryFinishByTerminalStatus(ctx, txDAO, *triggerRow); err != nil || completed {
return err
}
if handled, err := s.tryRejectMismatchedPayload(ctx, txDAO, *triggerRow, payload, now); err != nil || handled {
return err
}
if err := txDAO.UpdateTriggerFields(ctx, triggerRow.ID, map[string]any{
"status": model.ActiveScheduleTriggerStatusProcessing,
"processed_at": &now,
"last_error_code": nil,
"last_error": nil,
}); err != nil {
return err
}
existingPreview, err := txDAO.GetPreviewByTriggerID(ctx, triggerRow.ID)
switch {
case err == nil:
return s.finishWithExistingPreview(ctx, txDAO, *triggerRow, *existingPreview, now)
case errors.Is(err, gorm.ErrRecordNotFound):
// 继续创建新 preview。
default:
return err
}
domainTrigger := buildDomainTriggerFromModel(*triggerRow, payload)
graphResult, err := s.graphRunner.Run(ctx, domainTrigger)
if err != nil {
return err
}
if graphResult == nil || graphResult.DryRunData == nil {
return errors.New("active scheduler graph 返回空结果")
}
dryRunData := graphResult.DryRunData
if len(dryRunData.Candidates) == 0 {
return s.markClosedWithoutPreview(ctx, txDAO, triggerRow.ID, now)
}
if !dryRunData.Observation.Decision.ShouldNotify && !dryRunData.Observation.Decision.ShouldWritePreview {
return s.markClosedWithoutPreview(ctx, txDAO, triggerRow.ID, now)
}
previewService, err := activepreview.NewService(txDAO)
if err != nil {
return err
}
previewResp, err := previewService.CreatePreview(ctx, activepreview.CreatePreviewRequest{
ActiveContext: dryRunData.Context,
Observation: dryRunData.Observation,
Candidates: dryRunData.Candidates,
TriggerID: triggerRow.ID,
GeneratedAt: now,
SelectedCandidateID: graphResult.SelectionResult.SelectedCandidateID,
ExplanationText: graphResult.SelectionResult.ExplanationText,
NotificationSummary: graphResult.SelectionResult.NotificationSummary,
FallbackUsed: graphResult.SelectionResult.FallbackUsed,
})
if err != nil {
return err
}
previewID := previewResp.Detail.PreviewID
if err = txDAO.UpdateTriggerFields(ctx, triggerRow.ID, map[string]any{
"status": model.ActiveScheduleTriggerStatusPreviewGenerated,
"preview_id": &previewID,
"completed_at": &now,
"last_error_code": nil,
"last_error": nil,
}); err != nil {
return err
}
if !dryRunData.Observation.Decision.ShouldNotify {
return nil
}
// 1. 离线通知发出前,先把用户点击后要进入的助手会话和主动调度 session 预热好。
// 2. 这一步和 preview / notification outbox 在同一事务内提交,避免出现“飞书已送达但会话空白”的断裂状态。
if err := s.bootstrapActiveScheduleConversationInTx(ctx, tx, *triggerRow, previewResp.Detail, graphResult.SelectionResult, now); err != nil {
return err
}
notificationPayload := BuildFeishuRequestedPayload(
*triggerRow,
previewID,
previewResp.Detail.Notification,
now,
)
return EnqueueNotificationFeishuRequestedInTx(ctx, s.outbox.WithTx(tx), s.kafkaCfg.MaxRetry, notificationPayload)
}
// MarkTriggerFailedBestEffort 在事务外补记 trigger failed 状态,供 outbox retry 前排障。
//
// 职责边界:
// 1. 只做 best-effort 状态回写,不能影响外层对原始错误的返回;
// 2. 不负责错误分类,当前统一记为 worker_failed
// 3. 失败时静默返回,让真正的重试仍由 outbox 状态机负责。
func (s *TriggerWorkflowService) MarkTriggerFailedBestEffort(ctx context.Context, triggerID string, err error) {
if s == nil || s.activeDAO == nil || strings.TrimSpace(triggerID) == "" {
return
}
message := ""
if err != nil {
message = err.Error()
}
_ = s.activeDAO.UpdateTriggerFields(ctx, triggerID, map[string]any{
"status": model.ActiveScheduleTriggerStatusFailed,
"last_error_code": triggerErrorCodeWorkerFailed,
"last_error": &message,
})
}
func (s *TriggerWorkflowService) lockTrigger(ctx context.Context, tx *gorm.DB, triggerID string) (*model.ActiveScheduleTrigger, error) {
var row model.ActiveScheduleTrigger
err := tx.WithContext(ctx).
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ?", triggerID).
First(&row).Error
if err != nil {
return nil, err
}
return &row, nil
}
func (s *TriggerWorkflowService) tryFinishByTerminalStatus(
ctx context.Context,
txDAO *dao.ActiveScheduleDAO,
row model.ActiveScheduleTrigger,
) (bool, error) {
switch row.Status {
case model.ActiveScheduleTriggerStatusPreviewGenerated,
model.ActiveScheduleTriggerStatusClosed,
model.ActiveScheduleTriggerStatusSkipped,
model.ActiveScheduleTriggerStatusRejected:
return true, nil
case model.ActiveScheduleTriggerStatusPending,
model.ActiveScheduleTriggerStatusProcessing,
model.ActiveScheduleTriggerStatusFailed:
return false, nil
default:
// 1. 遇到未知状态时,不直接报错中断,而是继续按 processing 流程推进。
// 2. 这样可以兼容迁移期历史脏数据,避免单条异常阻塞整批消费。
// 3. 真实状态最终会被下面的 UpdateTriggerFields 覆盖为 processing。
return false, nil
}
}
func (s *TriggerWorkflowService) tryRejectMismatchedPayload(
ctx context.Context,
txDAO *dao.ActiveScheduleDAO,
row model.ActiveScheduleTrigger,
payload sharedevents.ActiveScheduleTriggeredPayload,
now time.Time,
) (bool, error) {
mismatchReason := buildPayloadMismatchReason(row, payload)
if mismatchReason == "" {
return false, nil
}
if err := txDAO.UpdateTriggerFields(ctx, row.ID, map[string]any{
"status": model.ActiveScheduleTriggerStatusRejected,
"last_error_code": triggerErrorCodePayloadMismatch,
"last_error": &mismatchReason,
"completed_at": &now,
}); err != nil {
return false, err
}
return true, nil
}
func (s *TriggerWorkflowService) finishWithExistingPreview(
ctx context.Context,
txDAO *dao.ActiveScheduleDAO,
triggerRow model.ActiveScheduleTrigger,
previewRow model.ActiveSchedulePreview,
now time.Time,
) error {
previewID := previewRow.ID
return txDAO.UpdateTriggerFields(ctx, triggerRow.ID, map[string]any{
"status": model.ActiveScheduleTriggerStatusPreviewGenerated,
"preview_id": &previewID,
"completed_at": &now,
"last_error_code": nil,
"last_error": nil,
})
}
func (s *TriggerWorkflowService) markClosedWithoutPreview(
ctx context.Context,
txDAO *dao.ActiveScheduleDAO,
triggerID string,
now time.Time,
) error {
return txDAO.UpdateTriggerFields(ctx, triggerID, map[string]any{
"status": model.ActiveScheduleTriggerStatusClosed,
"completed_at": &now,
"last_error_code": nil,
"last_error": nil,
})
}
func (s *TriggerWorkflowService) now() time.Time {
if s == nil || s.clock == nil {
return time.Now()
}
return s.clock()
}
func buildDomainTriggerFromModel(
row model.ActiveScheduleTrigger,
payload sharedevents.ActiveScheduleTriggeredPayload,
) trigger.ActiveScheduleTrigger {
mockNow := row.MockNow
if mockNow == nil && payload.MockNow != nil {
mockNow = payload.MockNow
}
traceID := strings.TrimSpace(row.TraceID)
if traceID == "" {
traceID = strings.TrimSpace(payload.TraceID)
}
if traceID == "" {
traceID = "trace_active_trigger_" + uuid.NewString()
}
return trigger.ActiveScheduleTrigger{
TriggerID: row.ID,
UserID: row.UserID,
TriggerType: trigger.TriggerType(row.TriggerType),
Source: trigger.Source(row.Source),
TargetType: trigger.TargetType(row.TargetType),
TargetID: row.TargetID,
FeedbackID: row.FeedbackID,
IdempotencyKey: row.IdempotencyKey,
MockNow: mockNow,
IsMockTime: row.IsMockTime || payload.IsMockTime,
RequestedAt: row.RequestedAt,
TraceID: traceID,
}
}
func buildPayloadMismatchReason(row model.ActiveScheduleTrigger, payload sharedevents.ActiveScheduleTriggeredPayload) string {
switch {
case row.UserID != payload.UserID:
return fmt.Sprintf("trigger 事件 user_id 不一致: row=%d payload=%d", row.UserID, payload.UserID)
case row.TriggerType != payload.TriggerType:
return fmt.Sprintf("trigger 事件 trigger_type 不一致: row=%s payload=%s", row.TriggerType, payload.TriggerType)
case row.TargetType != payload.TargetType:
return fmt.Sprintf("trigger 事件 target_type 不一致: row=%s payload=%s", row.TargetType, payload.TargetType)
case row.TargetID != payload.TargetID:
return fmt.Sprintf("trigger 事件 target_id 不一致: row=%d payload=%d", row.TargetID, payload.TargetID)
default:
return ""
}
}