Files
smartmate/backend/cmd/agent/active_schedule_rerun.go
Losita 3b6fca44a6 Version: 0.9.77.dev.260505
后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
2026-05-05 23:25:07 +08:00

270 lines
10 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
activeapplyadapter "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter"
activefeedbacklocate "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/feedbacklocate"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection"
activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service"
activeTrigger "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
agentstream "github.com/LoveLosita/smartflow/backend/services/agent/stream"
agentsv "github.com/LoveLosita/smartflow/backend/services/agent/sv"
rootdao "github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
)
func buildActiveSchedulePreviewConfirmService(activeDAO *rootdao.ActiveScheduleDAO, dryRun *activesvc.DryRunService, scheduleApplyAdapter interface {
ApplyActiveScheduleChanges(context.Context, activeapplyadapter.ApplyActiveScheduleRequest) (activeapplyadapter.ApplyActiveScheduleResult, error)
}) (*activesvc.PreviewConfirmService, error) {
previewService, err := activepreview.NewService(activeDAO)
if err != nil {
return nil, err
}
return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, scheduleApplyAdapter)
}
// buildActiveScheduleSessionRerunFunc 把主动调度定位器 / graph / preview 能力装成聊天入口可调用的 rerun 闭包。
//
// 说明:
// 1. 这里只做最小接线:复用现有定位器 -> trigger -> graph -> preview 组件,不把 worker/notification 再搬一遍;
// 2. 成功时返回 session 状态、assistant 文本和业务卡片数据;
// 3. 失败时直接把 error 交回聊天入口,由上层统一写失败日志和 SSE 错误。
func buildActiveScheduleSessionRerunFunc(
activeDAO *rootdao.ActiveScheduleDAO,
graphRunner *activegraph.Runner,
previewConfirm *activesvc.PreviewConfirmService,
feedbackLocator *activefeedbacklocate.Service,
) agentsv.ActiveScheduleSessionRerunFunc {
return func(
ctx context.Context,
session *model.ActiveScheduleSessionSnapshot,
userMessage string,
traceID string,
requestStart time.Time,
) (*agentsv.ActiveScheduleSessionRerunResult, error) {
if activeDAO == nil || graphRunner == nil || previewConfirm == nil {
return nil, fmt.Errorf("主动调度 rerun 依赖未初始化")
}
if session == nil {
return nil, fmt.Errorf("主动调度 session 不能为空")
}
triggerRow, err := activeDAO.GetTriggerByID(ctx, session.TriggerID)
if err != nil {
return nil, err
}
resolvedTargetType := activeTrigger.TargetType(triggerRow.TargetType)
resolvedTargetID := triggerRow.TargetID
needsFeedbackLocate := activeTrigger.TriggerType(triggerRow.TriggerType) == activeTrigger.TriggerTypeUnfinishedFeedback &&
(resolvedTargetID <= 0 || containsString(session.State.MissingInfo, "feedback_target"))
if needsFeedbackLocate {
if feedbackLocator == nil {
question := firstNonEmptyString(
activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo),
session.State.PendingQuestion,
)
nextState := session.State
nextState.PendingQuestion = question
nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target")
nextState.LastCandidateID = ""
nextState.LastNotificationID = ""
nextState.FailedReason = ""
nextState.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: nextState,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
}
locateResult, locateErr := feedbackLocator.Resolve(ctx, activefeedbacklocate.Request{
UserID: triggerRow.UserID,
UserMessage: userMessage,
PendingQuestion: session.State.PendingQuestion,
MissingInfo: cloneStringSlice(session.State.MissingInfo),
})
if locateErr != nil {
return nil, locateErr
}
if locateResult.ShouldAskUser() {
question := firstNonEmptyString(
locateResult.AskUserQuestion,
activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo),
session.State.PendingQuestion,
)
nextState := session.State
nextState.PendingQuestion = question
nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target")
nextState.LastCandidateID = ""
nextState.LastNotificationID = ""
nextState.FailedReason = ""
nextState.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: nextState,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
}
resolvedTargetType = activeTrigger.TargetType(locateResult.TargetType)
resolvedTargetID = locateResult.TargetID
}
domainTrigger := activeTrigger.ActiveScheduleTrigger{
TriggerID: triggerRow.ID,
UserID: triggerRow.UserID,
TriggerType: activeTrigger.TriggerType(triggerRow.TriggerType),
Source: activeTrigger.SourceUserFeedback,
TargetType: resolvedTargetType,
TargetID: resolvedTargetID,
FeedbackID: triggerRow.FeedbackID,
IdempotencyKey: triggerRow.IdempotencyKey,
MockNow: nil,
IsMockTime: false,
RequestedAt: requestStart,
TraceID: traceID,
}
if err := domainTrigger.Validate(); err != nil {
return nil, err
}
graphResult, err := graphRunner.Run(ctx, domainTrigger)
if err != nil {
return nil, err
}
if graphResult == nil || graphResult.DryRunData == nil || graphResult.DryRunData.Context == nil {
return nil, fmt.Errorf("主动调度 graph 返回空结果")
}
selectionResult := graphResult.SelectionResult
state := session.State
state.LastCandidateID = strings.TrimSpace(selectionResult.SelectedCandidateID)
state.LastNotificationID = ""
state.FailedReason = ""
state.MissingInfo = cloneStringSlice(graphResult.DryRunData.Context.DerivedFacts.MissingInfo)
switch selectionResult.Action {
case activesel.ActionSelectCandidate:
if !graphResult.DryRunData.Observation.Decision.ShouldWritePreview {
return nil, fmt.Errorf("主动调度 graph 选择了候选,但未产出可写 preview")
}
previewResp, err := previewConfirm.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{
ActiveContext: graphResult.DryRunData.Context,
Observation: graphResult.DryRunData.Observation,
Candidates: graphResult.DryRunData.Candidates,
TriggerID: triggerRow.ID,
GeneratedAt: requestStart,
SelectedCandidateID: selectionResult.SelectedCandidateID,
ExplanationText: selectionResult.ExplanationText,
NotificationSummary: selectionResult.NotificationSummary,
FallbackUsed: selectionResult.FallbackUsed,
})
if err != nil {
return nil, err
}
state.PendingQuestion = ""
state.MissingInfo = nil
state.FailedReason = ""
expiresAt := previewResp.Detail.ExpiresAt
state.ExpiresAt = &expiresAt
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: firstNonEmptyString(selectionResult.ExplanationText, selectionResult.NotificationSummary, previewResp.Detail.Explanation, previewResp.Detail.Notification, "主动调度建议已更新。"),
BusinessCard: &agentstream.StreamBusinessCardExtra{
CardType: "active_schedule_preview",
Title: "SmartFlow 日程调整建议",
Summary: firstNonEmptyString(selectionResult.NotificationSummary, previewResp.Detail.Notification, previewResp.Detail.Explanation),
Data: previewDetailToMap(previewResp.Detail),
},
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusReadyPreview,
PreviewID: previewResp.Detail.PreviewID,
}, nil
case activesel.ActionAskUser:
question := firstNonEmptyString(selectionResult.AskUserQuestion, selectionResult.ExplanationText, "请继续补充主动调度需要的信息。")
state.PendingQuestion = question
state.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
default:
assistantText := firstNonEmptyString(selectionResult.ExplanationText, selectionResult.NotificationSummary, "当前主动调度暂时没有需要继续处理的内容。")
state.PendingQuestion = ""
state.MissingInfo = nil
state.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: assistantText,
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusIgnored,
}, nil
}
}
}
func previewDetailToMap(detail activepreview.ActiveSchedulePreviewDetail) map[string]any {
raw, err := json.Marshal(detail)
if err != nil {
return map[string]any{}
}
var output map[string]any
if err := json.Unmarshal(raw, &output); err != nil {
return map[string]any{}
}
return output
}
func firstNonEmptyString(values ...string) string {
for _, value := range values {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
return ""
}
func cloneStringSlice(values []string) []string {
if len(values) == 0 {
return nil
}
copied := make([]string, len(values))
copy(copied, values)
return copied
}
func appendMissingString(values []string, next string) []string {
trimmed := strings.TrimSpace(next)
if trimmed == "" {
return cloneStringSlice(values)
}
for _, value := range values {
if strings.TrimSpace(value) == trimmed {
return cloneStringSlice(values)
}
}
result := cloneStringSlice(values)
return append(result, trimmed)
}
func containsString(values []string, target string) bool {
trimmed := strings.TrimSpace(target)
if trimmed == "" {
return false
}
for _, value := range values {
if strings.TrimSpace(value) == trimmed {
return true
}
}
return false
}