diff --git a/backend/api/active_schedule.go b/backend/api/active_schedule.go deleted file mode 100644 index 47f7573..0000000 --- a/backend/api/active_schedule.go +++ /dev/null @@ -1,305 +0,0 @@ -package api - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "time" - - activeapply "github.com/LoveLosita/smartflow/backend/active_scheduler/apply" - activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" - activesvc "github.com/LoveLosita/smartflow/backend/active_scheduler/service" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" - "github.com/LoveLosita/smartflow/backend/respond" - "github.com/gin-gonic/gin" - "gorm.io/gorm" -) - -// ActiveScheduleAPI 承载主动调度开发期和验收期 API。 -// -// 职责边界: -// 1. 只负责鉴权用户、绑定请求和调用主动调度 service; -// 2. 不直接读取 DAO、不生成候选、不写 preview; -// 3. 阶段 1-2 只开放 dry-run,正式 trigger/preview/confirm 后续阶段再接入。 -type ActiveScheduleAPI struct { - dryRunService *activesvc.DryRunService - previewConfirmService *activesvc.PreviewConfirmService - triggerService *activesvc.TriggerService -} - -func NewActiveScheduleAPI(dryRunService *activesvc.DryRunService, previewConfirmService *activesvc.PreviewConfirmService, triggerService *activesvc.TriggerService) *ActiveScheduleAPI { - return &ActiveScheduleAPI{ - dryRunService: dryRunService, - previewConfirmService: previewConfirmService, - triggerService: triggerService, - } -} - -type ActiveScheduleDryRunRequest struct { - TriggerType string `json:"trigger_type" binding:"required"` - TargetType string `json:"target_type" binding:"required"` - TargetID int `json:"target_id"` - FeedbackID string `json:"feedback_id"` - IdempotencyKey string `json:"idempotency_key"` - MockNow *time.Time `json:"mock_now"` - Payload any `json:"payload"` -} - -// DryRun 同步执行主动调度诊断,不写 preview、不发通知、不修改正式日程。 -func (api *ActiveScheduleAPI) DryRun(c *gin.Context) { - if api == nil || api.dryRunService == nil { - c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 dry-run service 未初始化"))) - return - } - - var req ActiveScheduleDryRunRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, respond.WrongParamType) - return - } - - userID := c.GetInt("user_id") - now := time.Now() - isMockTime := req.MockNow != nil - trig := trigger.ActiveScheduleTrigger{ - UserID: userID, - TriggerType: trigger.TriggerType(req.TriggerType), - Source: trigger.SourceAPIDryRun, - TargetType: trigger.TargetType(req.TargetType), - TargetID: req.TargetID, - FeedbackID: req.FeedbackID, - IdempotencyKey: req.IdempotencyKey, - MockNow: req.MockNow, - IsMockTime: isMockTime, - RequestedAt: now, - } - - ctx, cancel := context.WithTimeout(c.Request.Context(), 3*time.Second) - defer cancel() - result, err := api.dryRunService.DryRun(ctx, trig) - if err != nil { - respond.DealWithError(c, err) - return - } - - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) -} - -// Trigger 写入正式主动调度 trigger 并发布 active_schedule.triggered。 -func (api *ActiveScheduleAPI) Trigger(c *gin.Context) { - if api == nil || api.triggerService == nil { - c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 trigger service 未初始化"))) - return - } - - var req ActiveScheduleDryRunRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, respond.WrongParamType) - return - } - rawPayload, err := json.Marshal(req.Payload) - if err != nil { - c.JSON(http.StatusBadRequest, respond.WrongParamType) - return - } - if string(rawPayload) == "null" { - rawPayload = []byte("{}") - } - - now := time.Now() - ctx, cancel := context.WithTimeout(c.Request.Context(), 3*time.Second) - defer cancel() - result, err := api.triggerService.CreateAndPublish(ctx, activesvc.TriggerRequest{ - UserID: c.GetInt("user_id"), - TriggerType: trigger.TriggerType(req.TriggerType), - Source: trigger.SourceAPITrigger, - TargetType: trigger.TargetType(req.TargetType), - TargetID: req.TargetID, - FeedbackID: req.FeedbackID, - IdempotencyKey: req.IdempotencyKey, - MockNow: req.MockNow, - IsMockTime: req.MockNow != nil, - RequestedAt: now, - Payload: rawPayload, - TraceID: fmt.Sprintf("trace_api_trigger_%d_%d", c.GetInt("user_id"), now.UnixNano()), - }) - if err != nil { - respond.DealWithError(c, err) - return - } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) -} - -// CreatePreview 先同步 dry-run,再把 top1 候选固化为待确认预览。 -func (api *ActiveScheduleAPI) CreatePreview(c *gin.Context) { - if api == nil || api.dryRunService == nil || api.previewConfirmService == nil { - c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 preview service 未初始化"))) - return - } - - var req ActiveScheduleDryRunRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, respond.WrongParamType) - return - } - - userID := c.GetInt("user_id") - now := time.Now() - trig := trigger.ActiveScheduleTrigger{ - TriggerID: fmt.Sprintf("ast_api_%d_%d", userID, now.UnixNano()), - UserID: userID, - TriggerType: trigger.TriggerType(req.TriggerType), - Source: trigger.SourceAPIDryRun, - TargetType: trigger.TargetType(req.TargetType), - TargetID: req.TargetID, - FeedbackID: req.FeedbackID, - IdempotencyKey: req.IdempotencyKey, - MockNow: req.MockNow, - IsMockTime: req.MockNow != nil, - RequestedAt: now, - TraceID: fmt.Sprintf("trace_api_preview_%d_%d", userID, now.UnixNano()), - } - - ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second) - defer cancel() - dryRunResult, err := api.dryRunService.DryRun(ctx, trig) - if err != nil { - respond.DealWithError(c, err) - return - } - previewResp, err := api.previewConfirmService.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{ - ActiveContext: dryRunResult.Context, - Observation: dryRunResult.Observation, - Candidates: dryRunResult.Candidates, - TriggerID: trig.TriggerID, - GeneratedAt: now, - }) - if err != nil { - respond.DealWithError(c, err) - return - } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, previewResp.Detail)) -} - -// GetPreview 查询主动调度预览详情。 -func (api *ActiveScheduleAPI) GetPreview(c *gin.Context) { - if api == nil || api.previewConfirmService == nil { - c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 preview service 未初始化"))) - return - } - ctx, cancel := context.WithTimeout(c.Request.Context(), 3*time.Second) - defer cancel() - detail, err := api.previewConfirmService.GetPreview(ctx, c.GetInt("user_id"), c.Param("preview_id")) - if err != nil { - respond.DealWithError(c, err) - return - } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, detail)) -} - -// ConfirmPreview 同步确认并正式应用主动调度预览。 -func (api *ActiveScheduleAPI) ConfirmPreview(c *gin.Context) { - if api == nil || api.previewConfirmService == nil { - c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 confirm service 未初始化"))) - return - } - var req activeapply.ConfirmRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, respond.WrongParamType) - return - } - req.PreviewID = c.Param("preview_id") - req.UserID = c.GetInt("user_id") - if req.RequestedAt.IsZero() { - req.RequestedAt = time.Now() - } - ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second) - defer cancel() - result, err := api.previewConfirmService.ConfirmPreview(ctx, req) - if err != nil { - writeActiveScheduleConfirmError(c, err) - return - } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) -} - -// writeActiveScheduleConfirmError 将 confirm/apply 的可预期业务拒绝映射为 4xx。 -// -// 职责边界: -// 1. 只处理主动调度 confirm/apply 链路已经分类的 ApplyError; -// 2. 不吞掉数据库、超时、panic recover 等系统错误,未知错误继续交给通用 respond 走 500; -// 3. 响应体保留 error_code / error_message,便于前端按过期、冲突、越权等场景给出明确交互。 -func writeActiveScheduleConfirmError(c *gin.Context, err error) { - if applyErr, ok := activeapply.AsApplyError(err); ok { - status := activeScheduleApplyHTTPStatus(applyErr.Code) - message := applyErr.Message - if message == "" { - message = applyErr.Error() - } - applyStatus := activeapply.ApplyStatusRejected - if applyErr.Code == activeapply.ErrorCodeExpired { - applyStatus = activeapply.ApplyStatusExpired - } - if applyErr.Code == activeapply.ErrorCodeDBError { - applyStatus = activeapply.ApplyStatusFailed - } - c.JSON(status, respond.RespWithData(respond.Response{ - Status: fmt.Sprintf("%d", status), - Info: message, - }, activeapply.ConfirmResult{ - ApplyStatus: applyStatus, - ErrorCode: applyErr.Code, - ErrorMessage: message, - })) - return - } - if errors.Is(err, gorm.ErrRecordNotFound) { - c.JSON(http.StatusNotFound, respond.RespWithData(respond.Response{ - Status: fmt.Sprintf("%d", http.StatusNotFound), - Info: "预览不存在或已被删除", - }, activeapply.ConfirmResult{ - ApplyStatus: activeapply.ApplyStatusRejected, - ErrorCode: activeapply.ErrorCodeTargetNotFound, - ErrorMessage: "预览不存在或已被删除", - })) - return - } - respond.DealWithError(c, err) -} - -// activeScheduleApplyHTTPStatus 只负责错误码到 HTTP 语义的稳定映射。 -// -// 说明: -// 1. 请求体/编辑范围问题返回 400; -// 2. 越权返回 403,目标缺失返回 404; -// 3. 过期、幂等冲突、节次冲突、目标状态变化统一返回 409,提示前端刷新预览或重新生成。 -func activeScheduleApplyHTTPStatus(code activeapply.ErrorCode) int { - switch code { - case activeapply.ErrorCodeInvalidRequest, - activeapply.ErrorCodeInvalidEditedChanges, - activeapply.ErrorCodeUnsupportedChangeType: - return http.StatusBadRequest - case activeapply.ErrorCodeForbidden: - return http.StatusForbidden - case activeapply.ErrorCodeTargetNotFound: - return http.StatusNotFound - case activeapply.ErrorCodeExpired, - activeapply.ErrorCodeIdempotencyConflict, - activeapply.ErrorCodeBaseVersionChanged, - activeapply.ErrorCodeTargetCompleted, - activeapply.ErrorCodeTargetAlreadySchedule, - activeapply.ErrorCodeSlotConflict, - activeapply.ErrorCodeAlreadyApplied: - return http.StatusConflict - default: - return http.StatusInternalServerError - } -} - -type nilServiceError string - -func (e nilServiceError) Error() string { - return string(e) -} diff --git a/backend/cmd/active-scheduler/main.go b/backend/cmd/active-scheduler/main.go new file mode 100644 index 0000000..4311e09 --- /dev/null +++ b/backend/cmd/active-scheduler/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "github.com/LoveLosita/smartflow/backend/bootstrap" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + "github.com/LoveLosita/smartflow/backend/inits" + activeschedulerdao "github.com/LoveLosita/smartflow/backend/services/active_scheduler/dao" + activeschedulerrpc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/rpc" + activeschedulersv "github.com/LoveLosita/smartflow/backend/services/active_scheduler/sv" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" + "github.com/spf13/viper" +) + +func main() { + if err := bootstrap.LoadConfig(); err != nil { + log.Fatalf("failed to load config: %v", err) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + db, err := activeschedulerdao.OpenDBFromConfig() + if err != nil { + log.Fatalf("failed to connect active-scheduler database: %v", err) + } + + aiHub, err := inits.InitEino() + if err != nil { + log.Fatalf("failed to initialize active-scheduler Eino runtime: %v", err) + } + llmService := llmservice.New(llmservice.Options{ + AIHub: aiHub, + APIKey: os.Getenv("ARK_API_KEY"), + BaseURL: viper.GetString("agent.baseURL"), + CourseVisionModel: viper.GetString("courseImport.visionModel"), + }) + + svc, err := activeschedulersv.New(db, llmService, activeschedulersv.Options{ + JobScanEvery: viper.GetDuration("activeScheduler.jobScanEvery"), + JobScanLimit: viper.GetInt("activeScheduler.jobScanLimit"), + KafkaConfig: kafkabus.LoadConfig(), + }) + if err != nil { + log.Fatalf("failed to initialize active-scheduler service: %v", err) + } + defer svc.Close() + svc.StartWorkers(ctx) + log.Println("Active-scheduler outbox consumer and due job scanner started") + + server, listenOn, err := activeschedulerrpc.NewServer(activeschedulerrpc.ServerOptions{ + ListenOn: viper.GetString("activeScheduler.rpc.listenOn"), + Timeout: viper.GetDuration("activeScheduler.rpc.timeout"), + Service: svc, + }) + if err != nil { + log.Fatalf("failed to build active-scheduler zrpc server: %v", err) + } + defer server.Stop() + + go func() { + log.Printf("active-scheduler zrpc service starting on %s", listenOn) + server.Start() + }() + + <-ctx.Done() + log.Println("active-scheduler service stopping") +} diff --git a/backend/cmd/start.go b/backend/cmd/start.go index cf29084..b9af716 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -11,21 +11,13 @@ import ( "syscall" "time" - activeadapters "github.com/LoveLosita/smartflow/backend/active_scheduler/adapters" - "github.com/LoveLosita/smartflow/backend/active_scheduler/applyadapter" - activefeedbacklocate "github.com/LoveLosita/smartflow/backend/active_scheduler/feedbacklocate" - activegraph "github.com/LoveLosita/smartflow/backend/active_scheduler/graph" - activejob "github.com/LoveLosita/smartflow/backend/active_scheduler/job" - activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" - activesel "github.com/LoveLosita/smartflow/backend/active_scheduler/selection" - activesvc "github.com/LoveLosita/smartflow/backend/active_scheduler/service" - activeTrigger "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" - "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/bootstrap" "github.com/LoveLosita/smartflow/backend/dao" - gatewaynotification "github.com/LoveLosita/smartflow/backend/gateway/notification" + "github.com/LoveLosita/smartflow/backend/gateway/api" + gatewayactivescheduler "github.com/LoveLosita/smartflow/backend/gateway/client/activescheduler" + gatewaynotification "github.com/LoveLosita/smartflow/backend/gateway/client/notification" + gatewayuserauth "github.com/LoveLosita/smartflow/backend/gateway/client/userauth" gatewayrouter "github.com/LoveLosita/smartflow/backend/gateway/router" - gatewayuserauth "github.com/LoveLosita/smartflow/backend/gateway/userauth" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" @@ -43,6 +35,14 @@ import ( "github.com/LoveLosita/smartflow/backend/service" agentsvcsvc "github.com/LoveLosita/smartflow/backend/service/agentsvc" eventsvc "github.com/LoveLosita/smartflow/backend/service/events" + activeadapters "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/adapters" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter" + activefeedbacklocate "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/feedbacklocate" + activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph" + activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview" + activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection" + activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service" + activeTrigger "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ragservice "github.com/LoveLosita/smartflow/backend/services/rag" ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config" @@ -59,20 +59,18 @@ import ( // 2. 不承载业务逻辑,业务仍然由 service / newAgent / memory 等领域模块负责; // 3. 不决定进程角色,api / worker / all 由 StartAPI、StartWorker、StartAll 选择启动哪些生命周期。 type appRuntime struct { - db *gorm.DB - redisClient *redis.Client - cacheRepo *dao.CacheDAO - agentRepo *dao.AgentDAO - agentCache *dao.AgentCache - manager *dao.RepoManager - outboxRepo *outboxinfra.Repository - eventBus eventsvc.OutboxBus - memoryModule *memory.Module - activeJobScanner *activejob.Scanner - activeTriggerWorkflow *activesvc.TriggerWorkflowService - limiter *pkg.RateLimiter - handlers *api.ApiHandlers - userAuthClient *gatewayuserauth.Client + db *gorm.DB + redisClient *redis.Client + cacheRepo *dao.CacheDAO + agentRepo *dao.AgentDAO + agentCache *dao.AgentCache + manager *dao.RepoManager + outboxRepo *outboxinfra.Repository + eventBus eventsvc.OutboxBus + memoryModule *memory.Module + limiter *pkg.RateLimiter + handlers *api.ApiHandlers + userAuthClient *gatewayuserauth.Client } // loadConfig 锻炼? @@ -112,7 +110,7 @@ func StartAPI() { } // StartWorker 只启动后台异步能力,不注册 Gin 路由。 -// 当前包含 outbox relay / Kafka consumer / memory worker / 主动调度扫描。 +// 当前包含单体残留域 outbox relay / Kafka consumer / memory worker;主动调度扫描已迁到 cmd/active-scheduler。 func StartWorker() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -223,6 +221,14 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to initialize notification zrpc client: %w", err) } + activeSchedulerClient, err := gatewayactivescheduler.NewClient(gatewayactivescheduler.ClientConfig{ + Endpoints: viper.GetStringSlice("activeScheduler.rpc.endpoints"), + Target: viper.GetString("activeScheduler.rpc.target"), + Timeout: viper.GetDuration("activeScheduler.rpc.timeout"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize active-scheduler zrpc client: %w", err) + } taskSv := service.NewTaskService(taskRepo, cacheRepo, eventBus) taskSv.SetActiveScheduleDAO(manager.ActiveSchedule) courseService := buildCourseService(llmService, courseRepo, scheduleRepo) @@ -258,10 +264,6 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, err } - activeScheduleTrigger, err := activesvc.NewTriggerService(manager.ActiveSchedule, eventBus) - if err != nil { - return nil, err - } activeSchedulePreviewConfirm, err := buildActiveSchedulePreviewConfirmService(db, manager.ActiveSchedule, activeScheduleDryRun) if err != nil { return nil, err @@ -276,45 +278,22 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { return nil, err } agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm, activeScheduleFeedbackLocator)) - var activeTriggerWorkflow *activesvc.TriggerWorkflowService - var activeJobScanner *activejob.Scanner - if eventBus != nil { - activeTriggerWorkflow, err = activesvc.NewTriggerWorkflowServiceWithOptions( - manager.ActiveSchedule, - activeScheduleGraphRunner, - outboxRepo, - kafkabus.LoadConfig(), - activesvc.WithActiveScheduleSessionBridge(manager.Agent, manager.ActiveScheduleSession), - ) - if err != nil { - return nil, err - } - activeJobScanner, err = activejob.NewScanner(manager.ActiveSchedule, activeadapters.ReadersFromGorm(activeReaders), activeScheduleTrigger, activejob.ScannerOptions{ - ScanEvery: viper.GetDuration("activeScheduler.jobScanEvery"), - Limit: viper.GetInt("activeScheduler.jobScanLimit"), - }) - if err != nil { - return nil, err - } - } - handlers := buildAPIHandlers(taskSv, taskClassService, courseService, scheduleService, agentService, memoryModule, activeScheduleDryRun, activeSchedulePreviewConfirm, activeScheduleTrigger, notificationClient) + handlers := buildAPIHandlers(taskSv, taskClassService, courseService, scheduleService, agentService, memoryModule, activeSchedulerClient, notificationClient) runtime := &appRuntime{ db: db, redisClient: rdb, cacheRepo: cacheRepo, - agentRepo: agentRepo, - agentCache: agentCacheRepo, - manager: manager, - outboxRepo: outboxRepo, - eventBus: eventBus, - memoryModule: memoryModule, - activeJobScanner: activeJobScanner, - activeTriggerWorkflow: activeTriggerWorkflow, - limiter: limiter, - handlers: handlers, - userAuthClient: userAuthClient, + agentRepo: agentRepo, + agentCache: agentCacheRepo, + manager: manager, + outboxRepo: outboxRepo, + eventBus: eventBus, + memoryModule: memoryModule, + limiter: limiter, + handlers: handlers, + userAuthClient: userAuthClient, } if runtime.eventBus != nil { if err := runtime.registerEventHandlers(); err != nil { @@ -834,9 +813,7 @@ func buildAPIHandlers( scheduleService *service.ScheduleService, agentService *service.AgentService, memoryModule *memory.Module, - activeScheduleDryRun *activesvc.DryRunService, - activeSchedulePreviewConfirm *activesvc.PreviewConfirmService, - activeScheduleTrigger *activesvc.TriggerService, + activeSchedulerClient ports.ActiveSchedulerCommandClient, notificationClient ports.NotificationCommandClient, ) *api.ApiHandlers { return &api.ApiHandlers{ @@ -846,7 +823,7 @@ func buildAPIHandlers( ScheduleHandler: api.NewScheduleAPI(scheduleService), AgentHandler: api.NewAgentHandler(agentService), MemoryHandler: api.NewMemoryHandler(memoryModule), - ActiveSchedule: api.NewActiveScheduleAPI(activeScheduleDryRun, activeSchedulePreviewConfirm, activeScheduleTrigger), + ActiveSchedule: api.NewActiveScheduleAPI(activeSchedulerClient), Notification: api.NewNotificationAPI(notificationClient), } } @@ -866,22 +843,17 @@ func (r *appRuntime) startWorkers(ctx context.Context) { if r.memoryModule != nil { r.memoryModule.StartWorker(ctx) } - if r.activeJobScanner != nil { - r.activeJobScanner.Start(ctx) - log.Println("Active schedule due job scanner started") - } } func (r *appRuntime) registerEventHandlers() error { - // 调用目的:在运行时启动前一次性完成“事件类型 -> 服务归属 -> handler”的显式接线,避免 API 模式发布事件时拿不到路由表。 - if err := eventsvc.RegisterAllOutboxHandlers( + // 调用目的:只注册仍留在单体残留域内的 outbox handler;active-scheduler / notification 已由各自独立进程管理消费边界。 + if err := eventsvc.RegisterCoreOutboxHandlers( r.eventBus, r.outboxRepo, r.manager, r.agentRepo, r.cacheRepo, r.memoryModule, - r.activeTriggerWorkflow, r.userAuthClient, ); err != nil { return err diff --git a/backend/config.example.yaml b/backend/config.example.yaml index cff1436..97e1499 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -59,6 +59,16 @@ notification: retryScanEvery: 1m retryBatchSize: 50 +# 主动调度服务配置。 +activeScheduler: + rpc: + listenOn: "0.0.0.0:9083" + endpoints: + - "127.0.0.1:9083" + timeout: 8s + jobScanEvery: 1m + jobScanLimit: 50 + # 时间与学期边界配置。 time: zone: "Asia/Shanghai" diff --git a/backend/gateway/api/active_schedule.go b/backend/gateway/api/active_schedule.go new file mode 100644 index 0000000..f56b923 --- /dev/null +++ b/backend/gateway/api/active_schedule.go @@ -0,0 +1,212 @@ +package api + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/LoveLosita/smartflow/backend/respond" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" + "github.com/LoveLosita/smartflow/backend/shared/ports" + "github.com/gin-gonic/gin" +) + +const activeScheduleAPITimeout = 8 * time.Second + +// ActiveScheduleAPI 承载主动调度开发期和验收期 API。 +// +// 职责边界: +// 1. 只负责鉴权用户、绑定请求和调用 active-scheduler zrpc client; +// 2. 不直接读取 DAO、不生成候选、不写 preview; +// 3. 复杂响应由 active-scheduler 服务返回 JSON,gateway 只做边缘透传。 +type ActiveScheduleAPI struct { + client ports.ActiveSchedulerCommandClient +} + +func NewActiveScheduleAPI(client ports.ActiveSchedulerCommandClient) *ActiveScheduleAPI { + return &ActiveScheduleAPI{client: client} +} + +// DryRun 同步执行主动调度诊断,不写 preview、不发通知、不修改正式日程。 +func (api *ActiveScheduleAPI) DryRun(c *gin.Context) { + if api == nil || api.client == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 zrpc client 未初始化"))) + return + } + + var req contracts.ActiveScheduleRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.UserID = c.GetInt("user_id") + + ctx, cancel := context.WithTimeout(c.Request.Context(), activeScheduleAPITimeout) + defer cancel() + result, err := api.client.DryRun(ctx, req) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) +} + +// Trigger 写入正式主动调度 trigger 并发布 active_schedule.triggered。 +func (api *ActiveScheduleAPI) Trigger(c *gin.Context) { + if api == nil || api.client == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 zrpc client 未初始化"))) + return + } + + var req contracts.ActiveScheduleRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.UserID = c.GetInt("user_id") + + ctx, cancel := context.WithTimeout(c.Request.Context(), activeScheduleAPITimeout) + defer cancel() + result, err := api.client.Trigger(ctx, req) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) +} + +// CreatePreview 先同步 dry-run,再把 top1 候选固化为待确认预览。 +func (api *ActiveScheduleAPI) CreatePreview(c *gin.Context) { + if api == nil || api.client == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 zrpc client 未初始化"))) + return + } + + var req contracts.ActiveScheduleRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.UserID = c.GetInt("user_id") + + ctx, cancel := context.WithTimeout(c.Request.Context(), activeScheduleAPITimeout) + defer cancel() + result, err := api.client.CreatePreview(ctx, req) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) +} + +// GetPreview 查询主动调度预览详情。 +func (api *ActiveScheduleAPI) GetPreview(c *gin.Context) { + if api == nil || api.client == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 zrpc client 未初始化"))) + return + } + ctx, cancel := context.WithTimeout(c.Request.Context(), activeScheduleAPITimeout) + defer cancel() + detail, err := api.client.GetPreview(ctx, contracts.GetPreviewRequest{ + UserID: c.GetInt("user_id"), + PreviewID: c.Param("preview_id"), + }) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, detail)) +} + +// ConfirmPreview 同步确认并正式应用主动调度预览。 +func (api *ActiveScheduleAPI) ConfirmPreview(c *gin.Context) { + if api == nil || api.client == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(nilServiceError("主动调度 zrpc client 未初始化"))) + return + } + var req contracts.ConfirmPreviewRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.PreviewID = c.Param("preview_id") + req.UserID = c.GetInt("user_id") + if req.RequestedAt.IsZero() { + req.RequestedAt = time.Now() + } + + ctx, cancel := context.WithTimeout(c.Request.Context(), activeScheduleAPITimeout) + defer cancel() + result, err := api.client.ConfirmPreview(ctx, req) + if err != nil { + writeActiveScheduleConfirmError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, result)) +} + +// writeActiveScheduleConfirmError 将 confirm/apply 的可预期业务拒绝映射为 4xx。 +// +// 职责边界: +// 1. 只处理 active-scheduler zrpc client 已恢复的 ApplyError; +// 2. 不吞掉数据库、超时、panic recover 等系统错误,未知错误继续交给通用 respond 走 500; +// 3. 响应体保留 error_code / error_message,便于前端按过期、冲突、越权等场景给出明确交互。 +func writeActiveScheduleConfirmError(c *gin.Context, err error) { + var applyErr *contracts.ApplyError + if errors.As(err, &applyErr) { + status := activeScheduleApplyHTTPStatus(applyErr.Code) + message := applyErr.Message + if message == "" { + message = applyErr.Error() + } + applyStatus := contracts.ApplyStatusRejected + if applyErr.Code == contracts.ApplyErrorCodeExpired { + applyStatus = contracts.ApplyStatusExpired + } + if applyErr.Code == contracts.ApplyErrorCodeDBError { + applyStatus = contracts.ApplyStatusFailed + } + c.JSON(status, respond.RespWithData(respond.Response{ + Status: fmt.Sprintf("%d", status), + Info: message, + }, contracts.ConfirmErrorResult{ + ApplyStatus: applyStatus, + ErrorCode: applyErr.Code, + ErrorMessage: message, + })) + return + } + respond.DealWithError(c, err) +} + +// activeScheduleApplyHTTPStatus 只负责错误码到 HTTP 语义的稳定映射。 +func activeScheduleApplyHTTPStatus(code contracts.ApplyErrorCode) int { + switch code { + case contracts.ApplyErrorCodeInvalidRequest, + contracts.ApplyErrorCodeInvalidEditedChanges, + contracts.ApplyErrorCodeUnsupportedChangeType: + return http.StatusBadRequest + case contracts.ApplyErrorCodeForbidden: + return http.StatusForbidden + case contracts.ApplyErrorCodeTargetNotFound: + return http.StatusNotFound + case contracts.ApplyErrorCodeExpired, + contracts.ApplyErrorCodeIdempotencyConflict, + contracts.ApplyErrorCodeBaseVersionChanged, + contracts.ApplyErrorCodeTargetCompleted, + contracts.ApplyErrorCodeTargetAlreadySchedule, + contracts.ApplyErrorCodeSlotConflict, + contracts.ApplyErrorCodeAlreadyApplied: + return http.StatusConflict + default: + return http.StatusInternalServerError + } +} + +type nilServiceError string + +func (e nilServiceError) Error() string { + return string(e) +} diff --git a/backend/api/agent.go b/backend/gateway/api/agent.go similarity index 100% rename from backend/api/agent.go rename to backend/gateway/api/agent.go diff --git a/backend/api/container.go b/backend/gateway/api/container.go similarity index 100% rename from backend/api/container.go rename to backend/gateway/api/container.go diff --git a/backend/api/course.go b/backend/gateway/api/course.go similarity index 100% rename from backend/api/course.go rename to backend/gateway/api/course.go diff --git a/backend/api/memory.go b/backend/gateway/api/memory.go similarity index 100% rename from backend/api/memory.go rename to backend/gateway/api/memory.go diff --git a/backend/api/notification.go b/backend/gateway/api/notification.go similarity index 100% rename from backend/api/notification.go rename to backend/gateway/api/notification.go diff --git a/backend/api/schedule.go b/backend/gateway/api/schedule.go similarity index 100% rename from backend/api/schedule.go rename to backend/gateway/api/schedule.go diff --git a/backend/api/task-class.go b/backend/gateway/api/task-class.go similarity index 100% rename from backend/api/task-class.go rename to backend/gateway/api/task-class.go diff --git a/backend/api/task.go b/backend/gateway/api/task.go similarity index 100% rename from backend/api/task.go rename to backend/gateway/api/task.go diff --git a/backend/gateway/userapi/handler.go b/backend/gateway/api/userauth/handler.go similarity index 99% rename from backend/gateway/userapi/handler.go rename to backend/gateway/api/userauth/handler.go index 790278d..d662b7a 100644 --- a/backend/gateway/userapi/handler.go +++ b/backend/gateway/api/userauth/handler.go @@ -1,4 +1,4 @@ -package userapi +package userauthapi import ( "context" diff --git a/backend/gateway/userapi/routes.go b/backend/gateway/api/userauth/routes.go similarity index 98% rename from backend/gateway/userapi/routes.go rename to backend/gateway/api/userauth/routes.go index 3411394..4fda1e1 100644 --- a/backend/gateway/userapi/routes.go +++ b/backend/gateway/api/userauth/routes.go @@ -1,4 +1,4 @@ -package userapi +package userauthapi import ( gatewaymiddleware "github.com/LoveLosita/smartflow/backend/gateway/middleware" diff --git a/backend/gateway/client/activescheduler/client.go b/backend/gateway/client/activescheduler/client.go new file mode 100644 index 0000000..7f11206 --- /dev/null +++ b/backend/gateway/client/activescheduler/client.go @@ -0,0 +1,195 @@ +package activescheduler + +import ( + "context" + "encoding/json" + "errors" + "strings" + "time" + + activepb "github.com/LoveLosita/smartflow/backend/services/active_scheduler/rpc/pb" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" + "github.com/zeromicro/go-zero/zrpc" +) + +const ( + defaultEndpoint = "127.0.0.1:9083" + defaultTimeout = 8 * time.Second +) + +type ClientConfig struct { + Endpoints []string + Target string + Timeout time.Duration +} + +// Client 是 gateway 侧 active-scheduler zrpc 的最小适配层。 +// +// 职责边界: +// 1. 只负责跨进程 gRPC 调用和响应 JSON 透传,不碰 DAO、graph、outbox 或 job scanner; +// 2. confirm/apply 业务拒绝从 gRPC status 反解成共享 ApplyError,便于 API 层维持既有响应形状; +// 3. 复杂响应不在 gateway 重新建模,避免主动调度 DTO 复制扩散。 +type Client struct { + rpc activepb.ActiveSchedulerClient +} + +func NewClient(cfg ClientConfig) (*Client, error) { + timeout := cfg.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + endpoints := normalizeEndpoints(cfg.Endpoints) + target := strings.TrimSpace(cfg.Target) + if len(endpoints) == 0 && target == "" { + endpoints = []string{defaultEndpoint} + } + + zclient, err := zrpc.NewClient(zrpc.RpcClientConf{ + Endpoints: endpoints, + Target: target, + NonBlock: true, + Timeout: int64(timeout / time.Millisecond), + }) + if err != nil { + return nil, err + } + return &Client{rpc: activepb.NewActiveSchedulerClient(zclient.Conn())}, nil +} + +func (c *Client) DryRun(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.DryRun(ctx, requestToPB(req)) + if err != nil { + return nil, responseFromRPCError(err) + } + return jsonFromResponse(resp) +} + +func (c *Client) Trigger(ctx context.Context, req contracts.ActiveScheduleRequest) (*contracts.TriggerResponse, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.Trigger(ctx, requestToPB(req)) + if err != nil { + return nil, responseFromRPCError(err) + } + return triggerFromPB(resp), nil +} + +func (c *Client) CreatePreview(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.CreatePreview(ctx, requestToPB(req)) + if err != nil { + return nil, responseFromRPCError(err) + } + return jsonFromResponse(resp) +} + +func (c *Client) GetPreview(ctx context.Context, req contracts.GetPreviewRequest) (json.RawMessage, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.GetPreview(ctx, &activepb.GetPreviewRequest{ + UserId: int64(req.UserID), + PreviewId: req.PreviewID, + }) + if err != nil { + return nil, responseFromRPCError(err) + } + return jsonFromResponse(resp) +} + +func (c *Client) ConfirmPreview(ctx context.Context, req contracts.ConfirmPreviewRequest) (json.RawMessage, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.ConfirmPreview(ctx, confirmToPB(req)) + if err != nil { + return nil, responseFromRPCError(err) + } + return jsonFromResponse(resp) +} + +func (c *Client) ensureReady() error { + if c == nil || c.rpc == nil { + return errors.New("active-scheduler zrpc client is not initialized") + } + return nil +} + +func requestToPB(req contracts.ActiveScheduleRequest) *activepb.ActiveScheduleRequest { + mockNowUnixNano := int64(0) + if req.MockNow != nil && !req.MockNow.IsZero() { + mockNowUnixNano = req.MockNow.UnixNano() + } + return &activepb.ActiveScheduleRequest{ + UserId: int64(req.UserID), + TriggerType: req.TriggerType, + TargetType: req.TargetType, + TargetId: int64(req.TargetID), + FeedbackId: req.FeedbackID, + IdempotencyKey: req.IdempotencyKey, + MockNowUnixNano: mockNowUnixNano, + PayloadJson: []byte(req.Payload), + } +} + +func confirmToPB(req contracts.ConfirmPreviewRequest) *activepb.ConfirmPreviewRequest { + requestedAtUnixNano := int64(0) + if !req.RequestedAt.IsZero() { + requestedAtUnixNano = req.RequestedAt.UnixNano() + } + return &activepb.ConfirmPreviewRequest{ + UserId: int64(req.UserID), + PreviewId: req.PreviewID, + CandidateId: req.CandidateID, + Action: req.Action, + EditedChangesJson: []byte(req.EditedChanges), + IdempotencyKey: req.IdempotencyKey, + RequestedAtUnixNano: requestedAtUnixNano, + TraceId: req.TraceID, + } +} + +func triggerFromPB(resp *activepb.TriggerResponse) *contracts.TriggerResponse { + if resp == nil { + return &contracts.TriggerResponse{} + } + var previewID *string + if resp.HasPreviewId { + value := resp.PreviewId + previewID = &value + } + return &contracts.TriggerResponse{ + TriggerID: resp.TriggerId, + Status: resp.Status, + PreviewID: previewID, + DedupeHit: resp.DedupeHit, + TraceID: resp.TraceId, + } +} + +func jsonFromResponse(resp *activepb.JSONResponse) (json.RawMessage, error) { + if resp == nil { + return nil, errors.New("active-scheduler zrpc service returned empty JSON response") + } + if len(resp.DataJson) == 0 { + return json.RawMessage("null"), nil + } + return json.RawMessage(resp.DataJson), nil +} + +func normalizeEndpoints(values []string) []string { + endpoints := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + endpoints = append(endpoints, trimmed) + } + } + return endpoints +} diff --git a/backend/gateway/client/activescheduler/errors.go b/backend/gateway/client/activescheduler/errors.go new file mode 100644 index 0000000..f0aeb71 --- /dev/null +++ b/backend/gateway/client/activescheduler/errors.go @@ -0,0 +1,116 @@ +package activescheduler + +import ( + "errors" + "fmt" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const activeSchedulerApplyErrorDomain = "smartflow.active_scheduler.apply" + +// responseFromRPCError 负责把 active-scheduler 的 gRPC 错误反解回项目内错误。 +// +// 职责边界: +// 1. confirm/apply 业务错误恢复为 shared/contracts/activescheduler.ApplyError; +// 2. 普通业务错误恢复为 respond.Response,供 API 层复用 DealWithError; +// 3. 服务不可用或未知内部错误包装成普通 error,避免误报成用户可修正的参数问题。 +func responseFromRPCError(err error) error { + if err == nil { + return nil + } + + st, ok := status.FromError(err) + if !ok { + return wrapRPCError(err) + } + if applyErr, ok := applyErrorFromStatus(st); ok { + return applyErr + } + if resp, ok := responseFromStatus(st); ok { + return resp + } + + switch st.Code() { + case codes.Internal, codes.Unknown, codes.Unavailable, codes.DeadlineExceeded, codes.DataLoss, codes.Unimplemented: + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "active-scheduler zrpc service internal error" + } + return wrapRPCError(errors.New(msg)) + } + + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "active-scheduler zrpc service rejected request" + } + return respond.Response{Status: grpcCodeToRespondStatus(st.Code()), Info: msg} +} + +func applyErrorFromStatus(st *status.Status) (*contracts.ApplyError, bool) { + for _, detail := range st.Details() { + info, ok := detail.(*errdetails.ErrorInfo) + if !ok || info.Domain != activeSchedulerApplyErrorDomain { + continue + } + message := strings.TrimSpace(st.Message()) + if message == "" && info.Metadata != nil { + message = strings.TrimSpace(info.Metadata["info"]) + } + return &contracts.ApplyError{ + Code: contracts.ApplyErrorCode(strings.TrimSpace(info.Reason)), + Message: message, + }, true + } + return nil, false +} + +func responseFromStatus(st *status.Status) (respond.Response, bool) { + if st == nil { + return respond.Response{}, false + } + for _, detail := range st.Details() { + info, ok := detail.(*errdetails.ErrorInfo) + if !ok { + continue + } + statusValue := strings.TrimSpace(info.Reason) + if statusValue == "" { + statusValue = grpcCodeToRespondStatus(st.Code()) + } + message := strings.TrimSpace(st.Message()) + if message == "" && info.Metadata != nil { + message = strings.TrimSpace(info.Metadata["info"]) + } + if message == "" { + message = statusValue + } + return respond.Response{Status: statusValue, Info: message}, true + } + return respond.Response{}, false +} + +func grpcCodeToRespondStatus(code codes.Code) string { + switch code { + case codes.Unauthenticated: + return respond.ErrUnauthorized.Status + case codes.InvalidArgument: + return respond.MissingParam.Status + case codes.Internal, codes.Unknown, codes.DataLoss: + return "500" + default: + return "400" + } +} + +func wrapRPCError(err error) error { + if err == nil { + return nil + } + return fmt.Errorf("调用 active-scheduler zrpc 服务失败: %w", err) +} diff --git a/backend/gateway/notification/client.go b/backend/gateway/client/notification/client.go similarity index 100% rename from backend/gateway/notification/client.go rename to backend/gateway/client/notification/client.go diff --git a/backend/gateway/notification/errors.go b/backend/gateway/client/notification/errors.go similarity index 100% rename from backend/gateway/notification/errors.go rename to backend/gateway/client/notification/errors.go diff --git a/backend/gateway/userauth/client.go b/backend/gateway/client/userauth/client.go similarity index 100% rename from backend/gateway/userauth/client.go rename to backend/gateway/client/userauth/client.go diff --git a/backend/gateway/userauth/errors.go b/backend/gateway/client/userauth/errors.go similarity index 100% rename from backend/gateway/userauth/errors.go rename to backend/gateway/client/userauth/errors.go diff --git a/backend/gateway/router/router.go b/backend/gateway/router/router.go index 0185881..270cb7d 100644 --- a/backend/gateway/router/router.go +++ b/backend/gateway/router/router.go @@ -7,10 +7,10 @@ import ( "net/http" "time" - "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/dao" + "github.com/LoveLosita/smartflow/backend/gateway/api" + userauthapi "github.com/LoveLosita/smartflow/backend/gateway/api/userauth" gatewaymiddleware "github.com/LoveLosita/smartflow/backend/gateway/middleware" - "github.com/LoveLosita/smartflow/backend/gateway/userapi" rootmiddleware "github.com/LoveLosita/smartflow/backend/middleware" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/shared/ports" @@ -66,7 +66,7 @@ func RegisterRouters(handlers *api.ApiHandlers, authClient ports.UserAuthClient, }) }) - userapi.RegisterRoutes(apiGroup, userapi.NewUserHandler(authClient), authClient, limiter) + userauthapi.RegisterRoutes(apiGroup, userauthapi.NewUserHandler(authClient), authClient, limiter) taskGroup := apiGroup.Group("/task") { diff --git a/backend/service/events/outbox_bus.go b/backend/service/events/outbox_bus.go index be62250..df9a342 100644 --- a/backend/service/events/outbox_bus.go +++ b/backend/service/events/outbox_bus.go @@ -169,7 +169,6 @@ func OutboxServiceNames() []string { string(outboxHandlerServiceAgent), string(outboxHandlerServiceTask), string(outboxHandlerServiceMemory), - string(outboxHandlerServiceActiveScheduler), } } diff --git a/backend/active_scheduler/adapters/gorm_readers.go b/backend/services/active_scheduler/core/adapters/gorm_readers.go similarity index 98% rename from backend/active_scheduler/adapters/gorm_readers.go rename to backend/services/active_scheduler/core/adapters/gorm_readers.go index bcce50b..bec1f74 100644 --- a/backend/active_scheduler/adapters/gorm_readers.go +++ b/backend/services/active_scheduler/core/adapters/gorm_readers.go @@ -6,10 +6,10 @@ import ( "fmt" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" "gorm.io/gorm" ) diff --git a/backend/active_scheduler/apply/convert.go b/backend/services/active_scheduler/core/apply/convert.go similarity index 100% rename from backend/active_scheduler/apply/convert.go rename to backend/services/active_scheduler/core/apply/convert.go diff --git a/backend/active_scheduler/apply/convert_helpers.go b/backend/services/active_scheduler/core/apply/convert_helpers.go similarity index 100% rename from backend/active_scheduler/apply/convert_helpers.go rename to backend/services/active_scheduler/core/apply/convert_helpers.go diff --git a/backend/active_scheduler/apply/hash.go b/backend/services/active_scheduler/core/apply/hash.go similarity index 100% rename from backend/active_scheduler/apply/hash.go rename to backend/services/active_scheduler/core/apply/hash.go diff --git a/backend/active_scheduler/apply/types.go b/backend/services/active_scheduler/core/apply/types.go similarity index 100% rename from backend/active_scheduler/apply/types.go rename to backend/services/active_scheduler/core/apply/types.go diff --git a/backend/active_scheduler/apply/validate.go b/backend/services/active_scheduler/core/apply/validate.go similarity index 100% rename from backend/active_scheduler/apply/validate.go rename to backend/services/active_scheduler/core/apply/validate.go diff --git a/backend/active_scheduler/applyadapter/adapter.go b/backend/services/active_scheduler/core/applyadapter/adapter.go similarity index 100% rename from backend/active_scheduler/applyadapter/adapter.go rename to backend/services/active_scheduler/core/applyadapter/adapter.go diff --git a/backend/active_scheduler/applyadapter/types.go b/backend/services/active_scheduler/core/applyadapter/types.go similarity index 100% rename from backend/active_scheduler/applyadapter/types.go rename to backend/services/active_scheduler/core/applyadapter/types.go diff --git a/backend/active_scheduler/candidate/candidate.go b/backend/services/active_scheduler/core/candidate/candidate.go similarity index 96% rename from backend/active_scheduler/candidate/candidate.go rename to backend/services/active_scheduler/core/candidate/candidate.go index 75c2d8c..57f0fa5 100644 --- a/backend/active_scheduler/candidate/candidate.go +++ b/backend/services/active_scheduler/core/candidate/candidate.go @@ -4,10 +4,10 @@ import ( "fmt" "sort" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) type Type string diff --git a/backend/active_scheduler/context/builder.go b/backend/services/active_scheduler/core/context/builder.go similarity index 97% rename from backend/active_scheduler/context/builder.go rename to backend/services/active_scheduler/core/context/builder.go index 566538c..64e1f8e 100644 --- a/backend/active_scheduler/context/builder.go +++ b/backend/services/active_scheduler/core/context/builder.go @@ -5,8 +5,8 @@ import ( "errors" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) // Builder 负责把统一 trigger 转成主动调度只读事实快照。 diff --git a/backend/active_scheduler/context/context.go b/backend/services/active_scheduler/core/context/context.go similarity index 92% rename from backend/active_scheduler/context/context.go rename to backend/services/active_scheduler/core/context/context.go index 4c68de9..71d170b 100644 --- a/backend/active_scheduler/context/context.go +++ b/backend/services/active_scheduler/core/context/context.go @@ -3,8 +3,8 @@ package schedulercontext import ( "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) const ( diff --git a/backend/active_scheduler/feedbacklocate/dto.go b/backend/services/active_scheduler/core/feedbacklocate/dto.go similarity index 100% rename from backend/active_scheduler/feedbacklocate/dto.go rename to backend/services/active_scheduler/core/feedbacklocate/dto.go diff --git a/backend/active_scheduler/feedbacklocate/prompt.go b/backend/services/active_scheduler/core/feedbacklocate/prompt.go similarity index 100% rename from backend/active_scheduler/feedbacklocate/prompt.go rename to backend/services/active_scheduler/core/feedbacklocate/prompt.go diff --git a/backend/active_scheduler/feedbacklocate/service.go b/backend/services/active_scheduler/core/feedbacklocate/service.go similarity index 98% rename from backend/active_scheduler/feedbacklocate/service.go rename to backend/services/active_scheduler/core/feedbacklocate/service.go index 4e07fcf..6ee382c 100644 --- a/backend/active_scheduler/feedbacklocate/service.go +++ b/backend/services/active_scheduler/core/feedbacklocate/service.go @@ -9,8 +9,8 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) diff --git a/backend/active_scheduler/graph/runner.go b/backend/services/active_scheduler/core/graph/runner.go similarity index 93% rename from backend/active_scheduler/graph/runner.go rename to backend/services/active_scheduler/core/graph/runner.go index e29c2a2..bf58edf 100644 --- a/backend/active_scheduler/graph/runner.go +++ b/backend/services/active_scheduler/core/graph/runner.go @@ -5,11 +5,11 @@ import ( "errors" "fmt" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" - "github.com/LoveLosita/smartflow/backend/active_scheduler/selection" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" "github.com/cloudwego/eino/compose" ) diff --git a/backend/active_scheduler/job/scanner.go b/backend/services/active_scheduler/core/job/scanner.go similarity index 96% rename from backend/active_scheduler/job/scanner.go rename to backend/services/active_scheduler/core/job/scanner.go index c4d0c93..cca66e0 100644 --- a/backend/active_scheduler/job/scanner.go +++ b/backend/services/active_scheduler/core/job/scanner.go @@ -8,11 +8,11 @@ import ( "log" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - activesvc "github.com/LoveLosita/smartflow/backend/active_scheduler/service" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) const ( diff --git a/backend/active_scheduler/observe/observe.go b/backend/services/active_scheduler/core/observe/observe.go similarity index 98% rename from backend/active_scheduler/observe/observe.go rename to backend/services/active_scheduler/core/observe/observe.go index cb2f8ec..2ed111d 100644 --- a/backend/active_scheduler/observe/observe.go +++ b/backend/services/active_scheduler/core/observe/observe.go @@ -3,8 +3,8 @@ package observe import ( "time" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) type DecisionAction string diff --git a/backend/active_scheduler/ports/facts.go b/backend/services/active_scheduler/core/ports/facts.go similarity index 100% rename from backend/active_scheduler/ports/facts.go rename to backend/services/active_scheduler/core/ports/facts.go diff --git a/backend/active_scheduler/preview/converter.go b/backend/services/active_scheduler/core/preview/converter.go similarity index 97% rename from backend/active_scheduler/preview/converter.go rename to backend/services/active_scheduler/core/preview/converter.go index c64c01f..5020488 100644 --- a/backend/active_scheduler/preview/converter.go +++ b/backend/services/active_scheduler/core/preview/converter.go @@ -9,11 +9,11 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" ) func candidateDTO(item candidate.Candidate) CandidateDTO { diff --git a/backend/active_scheduler/preview/dto.go b/backend/services/active_scheduler/core/preview/dto.go similarity index 96% rename from backend/active_scheduler/preview/dto.go rename to backend/services/active_scheduler/core/preview/dto.go index 4d5d5e7..3c8de51 100644 --- a/backend/active_scheduler/preview/dto.go +++ b/backend/services/active_scheduler/core/preview/dto.go @@ -4,10 +4,10 @@ import ( "encoding/json" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" ) // CreatePreviewRequest 是把 dry-run 结果固化成主动调度预览的请求 DTO。 diff --git a/backend/active_scheduler/preview/service.go b/backend/services/active_scheduler/core/preview/service.go similarity index 97% rename from backend/active_scheduler/preview/service.go rename to backend/services/active_scheduler/core/preview/service.go index 454de2b..7910f1a 100644 --- a/backend/active_scheduler/preview/service.go +++ b/backend/services/active_scheduler/core/preview/service.go @@ -7,10 +7,10 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" "github.com/google/uuid" "gorm.io/gorm" ) diff --git a/backend/active_scheduler/selection/dto.go b/backend/services/active_scheduler/core/selection/dto.go similarity index 89% rename from backend/active_scheduler/selection/dto.go rename to backend/services/active_scheduler/core/selection/dto.go index cd3e9d4..59667a9 100644 --- a/backend/active_scheduler/selection/dto.go +++ b/backend/services/active_scheduler/core/selection/dto.go @@ -1,9 +1,9 @@ package selection import ( - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" ) const ( diff --git a/backend/active_scheduler/selection/prompt.go b/backend/services/active_scheduler/core/selection/prompt.go similarity index 98% rename from backend/active_scheduler/selection/prompt.go rename to backend/services/active_scheduler/core/selection/prompt.go index a71e354..99b9b21 100644 --- a/backend/active_scheduler/selection/prompt.go +++ b/backend/services/active_scheduler/core/selection/prompt.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" ) const selectionSystemPrompt = ` diff --git a/backend/active_scheduler/selection/service.go b/backend/services/active_scheduler/core/selection/service.go similarity index 99% rename from backend/active_scheduler/selection/service.go rename to backend/services/active_scheduler/core/selection/service.go index 9f52d52..ac51588 100644 --- a/backend/active_scheduler/selection/service.go +++ b/backend/services/active_scheduler/core/selection/service.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) diff --git a/backend/active_scheduler/service/dry_run.go b/backend/services/active_scheduler/core/service/dry_run.go similarity index 86% rename from backend/active_scheduler/service/dry_run.go rename to backend/services/active_scheduler/core/service/dry_run.go index cc02e11..bcfad6f 100644 --- a/backend/active_scheduler/service/dry_run.go +++ b/backend/services/active_scheduler/core/service/dry_run.go @@ -5,11 +5,11 @@ import ( "errors" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - schedulercontext "github.com/LoveLosita/smartflow/backend/active_scheduler/context" - "github.com/LoveLosita/smartflow/backend/active_scheduler/observe" - "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/candidate" + schedulercontext "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/context" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/observe" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) // DryRunResult 是 API dry-run / worker 测试入口可直接消费的同步结果。 diff --git a/backend/active_scheduler/service/dry_run_graph.go b/backend/services/active_scheduler/core/service/dry_run_graph.go similarity index 82% rename from backend/active_scheduler/service/dry_run_graph.go rename to backend/services/active_scheduler/core/service/dry_run_graph.go index bc5f566..f748bb7 100644 --- a/backend/active_scheduler/service/dry_run_graph.go +++ b/backend/services/active_scheduler/core/service/dry_run_graph.go @@ -3,8 +3,8 @@ package service import ( "context" - activegraph "github.com/LoveLosita/smartflow/backend/active_scheduler/graph" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" + activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" ) // AsGraphDryRunFunc 把现有 dry-run service 适配成 graph runner 可用的入口。 diff --git a/backend/active_scheduler/service/preview_confirm.go b/backend/services/active_scheduler/core/service/preview_confirm.go similarity index 98% rename from backend/active_scheduler/service/preview_confirm.go rename to backend/services/active_scheduler/core/service/preview_confirm.go index c0a636b..b95ef46 100644 --- a/backend/active_scheduler/service/preview_confirm.go +++ b/backend/services/active_scheduler/core/service/preview_confirm.go @@ -6,11 +6,11 @@ import ( "errors" "time" - activeapply "github.com/LoveLosita/smartflow/backend/active_scheduler/apply" - "github.com/LoveLosita/smartflow/backend/active_scheduler/applyadapter" - activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/model" + activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter" + activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview" "gorm.io/gorm" ) diff --git a/backend/active_scheduler/service/session_bridge.go b/backend/services/active_scheduler/core/service/session_bridge.go similarity index 98% rename from backend/active_scheduler/service/session_bridge.go rename to backend/services/active_scheduler/core/service/session_bridge.go index cd098bd..efbe77b 100644 --- a/backend/active_scheduler/service/session_bridge.go +++ b/backend/services/active_scheduler/core/service/session_bridge.go @@ -8,10 +8,10 @@ import ( "strings" "time" - activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" - "github.com/LoveLosita/smartflow/backend/active_scheduler/selection" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/model" + activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection" "github.com/google/uuid" "gorm.io/gorm" ) diff --git a/backend/active_scheduler/service/trigger.go b/backend/services/active_scheduler/core/service/trigger.go similarity index 99% rename from backend/active_scheduler/service/trigger.go rename to backend/services/active_scheduler/core/service/trigger.go index 42f0c74..8f9c45a 100644 --- a/backend/active_scheduler/service/trigger.go +++ b/backend/services/active_scheduler/core/service/trigger.go @@ -8,10 +8,10 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" "github.com/LoveLosita/smartflow/backend/dao" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" "github.com/google/uuid" "gorm.io/gorm" diff --git a/backend/active_scheduler/service/trigger_outbox.go b/backend/services/active_scheduler/core/service/trigger_outbox.go similarity index 100% rename from backend/active_scheduler/service/trigger_outbox.go rename to backend/services/active_scheduler/core/service/trigger_outbox.go diff --git a/backend/active_scheduler/service/trigger_pipeline.go b/backend/services/active_scheduler/core/service/trigger_pipeline.go similarity index 97% rename from backend/active_scheduler/service/trigger_pipeline.go rename to backend/services/active_scheduler/core/service/trigger_pipeline.go index 3686a87..60694e0 100644 --- a/backend/active_scheduler/service/trigger_pipeline.go +++ b/backend/services/active_scheduler/core/service/trigger_pipeline.go @@ -7,13 +7,13 @@ import ( "strings" "time" - activegraph "github.com/LoveLosita/smartflow/backend/active_scheduler/graph" - activepreview "github.com/LoveLosita/smartflow/backend/active_scheduler/preview" - "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" "github.com/LoveLosita/smartflow/backend/dao" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/model" + activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph" + activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" "github.com/google/uuid" "gorm.io/gorm" diff --git a/backend/active_scheduler/trigger/types.go b/backend/services/active_scheduler/core/trigger/types.go similarity index 100% rename from backend/active_scheduler/trigger/types.go rename to backend/services/active_scheduler/core/trigger/types.go diff --git a/backend/services/active_scheduler/dao/connect.go b/backend/services/active_scheduler/dao/connect.go new file mode 100644 index 0000000..a560836 --- /dev/null +++ b/backend/services/active_scheduler/dao/connect.go @@ -0,0 +1,131 @@ +package dao + +import ( + "fmt" + + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + coremodel "github.com/LoveLosita/smartflow/backend/model" + "github.com/spf13/viper" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +// OpenDBFromConfig 创建 active-scheduler 服务自己的数据库句柄。 +// +// 职责边界: +// 1. 只迁移 active-scheduler 拥有的 trigger / preview / job / session 表和本服务 outbox 表; +// 2. 不迁移 task、schedule、agent、notification 或 user/auth 表,避免独立进程越权管理其它服务模型; +// 3. 返回的 *gorm.DB 供服务内主链路、due job scanner 和 outbox consumer 复用。 +func OpenDBFromConfig() (*gorm.DB, error) { + host := viper.GetString("database.host") + port := viper.GetString("database.port") + user := viper.GetString("database.user") + password := viper.GetString("database.password") + dbname := viper.GetString("database.dbname") + + dsn := fmt.Sprintf( + "%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + user, password, host, port, dbname, + ) + + db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, err + } + if err = db.AutoMigrate( + &coremodel.ActiveScheduleJob{}, + &coremodel.ActiveScheduleTrigger{}, + &coremodel.ActiveSchedulePreview{}, + &coremodel.ActiveScheduleSession{}, + ); err != nil { + return nil, fmt.Errorf("auto migrate active-scheduler tables failed: %w", err) + } + if err = autoMigrateActiveSchedulerOutboxTable(db); err != nil { + return nil, err + } + if err = ensureRuntimeDependencyTables(db); err != nil { + return nil, err + } + return db, nil +} + +// autoMigrateActiveSchedulerOutboxTable 只迁移 active-scheduler 服务自己的 outbox 物理表。 +// +// 职责边界: +// 1. 只负责 active-scheduler.outbox 对应表,不碰其它服务 outbox; +// 2. 让独立 active-scheduler 服务可以单独发布 trigger 并消费 active_schedule.triggered; +// 3. 若后续调整 outbox 表名,只改 service catalog,不在这里硬编码。 +func autoMigrateActiveSchedulerOutboxTable(db *gorm.DB) error { + cfg, ok := outboxinfra.ResolveServiceConfig(outboxinfra.ServiceActiveScheduler) + if !ok { + return fmt.Errorf("resolve active-scheduler outbox config failed") + } + if err := db.Table(cfg.TableName).AutoMigrate(&coremodel.AgentOutboxMessage{}); err != nil { + return fmt.Errorf("auto migrate active-scheduler outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + } + return nil +} + +type runtimeDependencyTable struct { + Name string + Reason string +} + +// ensureRuntimeDependencyTables 在服务启动期校验迁移期共享主库依赖。 +// +// 职责边界: +// 1. 只检查表是否存在,不 AutoMigrate、不补列、不修改任何跨域表; +// 2. 把 active-scheduler 运行时仍然需要的 task / schedule / agent / notification outbox 边界显式化; +// 3. 若部署顺序、库权限或表结构归属不满足,启动阶段直接 fail fast,避免第一次 trigger 才反复重试。 +func ensureRuntimeDependencyTables(db *gorm.DB) error { + if db == nil { + return fmt.Errorf("active-scheduler runtime dependency check failed: db is nil") + } + for _, table := range activeSchedulerRuntimeDependencyTables() { + if err := ensureTableExists(db, table); err != nil { + return err + } + } + return nil +} + +// ensureTableExists 只做存在性探测。 +// +// 职责边界: +// 1. 不负责判断字段是否兼容,字段级契约由拥有该表的服务迁移脚本保证; +// 2. 不负责自动修复缺失表,避免 active-scheduler 越权创建其它服务的数据模型; +// 3. 返回错误会阻止服务启动,让部署问题尽早显现。 +func ensureTableExists(db *gorm.DB, table runtimeDependencyTable) error { + if table.Name == "" { + return fmt.Errorf("active-scheduler runtime dependency table name is empty: %s", table.Reason) + } + if db.Migrator().HasTable(table.Name) { + return nil + } + return fmt.Errorf("active-scheduler runtime dependency table missing: %s (%s)", table.Name, table.Reason) +} + +// activeSchedulerRuntimeDependencyTables 列出迁移期运行仍需共享主库访问的外部表。 +// +// 说明: +// 1. active-scheduler 自有表在 OpenDBFromConfig 内迁移,这里只放跨域依赖; +// 2. notification outbox 表名来自 service catalog,避免和 outbox 多表路由配置漂移; +// 3. 后续切到 task/schedule/agent/notification RPC 或 read model 后,应从这里移除对应表依赖。 +func activeSchedulerRuntimeDependencyTables() []runtimeDependencyTable { + notificationOutboxTable := "notification_outbox_messages" + if cfg, ok := outboxinfra.ResolveServiceConfig(outboxinfra.ServiceNotification); ok && cfg.TableName != "" { + notificationOutboxTable = cfg.TableName + } + + return []runtimeDependencyTable{ + {Name: "tasks", Reason: "dry-run 读取 task_pool 事实,confirm 时锁定 task_pool 目标"}, + {Name: "schedule_events", Reason: "dry-run 读取日程事实,confirm 时写入正式日程事件"}, + {Name: "schedules", Reason: "dry-run 读取节次占用,confirm 时写入正式节次"}, + {Name: "task_classes", Reason: "confirm create_makeup 时校验 task_item 归属"}, + {Name: "task_items", Reason: "confirm create_makeup 时锁定 task_item 目标"}, + {Name: "agent_chats", Reason: "trigger 生成 preview 后预建主动调度会话"}, + {Name: "chat_histories", Reason: "trigger 生成 preview 后写入会话首屏消息"}, + {Name: "agent_timeline_events", Reason: "trigger 生成 preview 后写入主动调度时间线卡片"}, + {Name: notificationOutboxTable, Reason: "ShouldNotify=true 时投递 notification.feishu.requested 事件"}, + } +} diff --git a/backend/services/active_scheduler/rpc/active_scheduler.proto b/backend/services/active_scheduler/rpc/active_scheduler.proto new file mode 100644 index 0000000..dc36c8b --- /dev/null +++ b/backend/services/active_scheduler/rpc/active_scheduler.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +package smartflow.active_scheduler; + +option go_package = "github.com/LoveLosita/smartflow/backend/services/active_scheduler/rpc/pb"; + +service ActiveScheduler { + rpc DryRun(ActiveScheduleRequest) returns (JSONResponse); + rpc Trigger(ActiveScheduleRequest) returns (TriggerResponse); + rpc CreatePreview(ActiveScheduleRequest) returns (JSONResponse); + rpc GetPreview(GetPreviewRequest) returns (JSONResponse); + rpc ConfirmPreview(ConfirmPreviewRequest) returns (JSONResponse); +} + +message ActiveScheduleRequest { + int64 user_id = 1; + string trigger_type = 2; + string target_type = 3; + int64 target_id = 4; + string feedback_id = 5; + string idempotency_key = 6; + int64 mock_now_unix_nano = 7; + bytes payload_json = 8; +} + +message GetPreviewRequest { + int64 user_id = 1; + string preview_id = 2; +} + +message ConfirmPreviewRequest { + int64 user_id = 1; + string preview_id = 2; + string candidate_id = 3; + string action = 4; + bytes edited_changes_json = 5; + string idempotency_key = 6; + int64 requested_at_unix_nano = 7; + string trace_id = 8; +} + +message JSONResponse { + bytes data_json = 1; +} + +message TriggerResponse { + string trigger_id = 1; + string status = 2; + string preview_id = 3; + bool has_preview_id = 4; + bool dedupe_hit = 5; + string trace_id = 6; +} diff --git a/backend/services/active_scheduler/rpc/errors.go b/backend/services/active_scheduler/rpc/errors.go new file mode 100644 index 0000000..420371b --- /dev/null +++ b/backend/services/active_scheduler/rpc/errors.go @@ -0,0 +1,122 @@ +package rpc + +import ( + "errors" + "log" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + activeSchedulerErrorDomain = "smartflow.active_scheduler" + activeSchedulerApplyErrorDomain = "smartflow.active_scheduler.apply" +) + +// grpcErrorFromServiceError 负责把 active-scheduler 内部错误收口成 gRPC status。 +// +// 职责边界: +// 1. apply 业务错误保留 error_code,供 gateway 恢复 confirm/apply 的 HTTP 语义; +// 2. respond.Response 继续按项目内业务码传输; +// 3. 未分类错误只暴露通用内部错误,详细信息留在服务日志。 +func grpcErrorFromServiceError(err error) error { + if err == nil { + return nil + } + if applyErr, ok := activeapply.AsApplyError(err); ok { + return grpcErrorFromApplyError(applyErr) + } + + var resp respond.Response + if errors.As(err, &resp) { + return grpcErrorFromResponse(resp) + } + + log.Printf("active-scheduler rpc internal error: %v", err) + return status.Error(codes.Internal, "active-scheduler service internal error") +} + +func grpcErrorFromApplyError(applyErr *activeapply.ApplyError) error { + if applyErr == nil { + return status.Error(codes.Internal, "active-scheduler apply error") + } + message := strings.TrimSpace(applyErr.Message) + if message == "" { + message = string(applyErr.Code) + } + st := status.New(grpcCodeFromApplyErrorCode(applyErr.Code), message) + detail := &errdetails.ErrorInfo{ + Domain: activeSchedulerApplyErrorDomain, + Reason: string(applyErr.Code), + Metadata: map[string]string{ + "info": message, + }, + } + withDetails, err := st.WithDetails(detail) + if err != nil { + return st.Err() + } + return withDetails.Err() +} + +func grpcErrorFromResponse(resp respond.Response) error { + code := grpcCodeFromRespondStatus(resp.Status) + message := strings.TrimSpace(resp.Info) + if message == "" { + message = strings.TrimSpace(resp.Status) + } + st := status.New(code, message) + detail := &errdetails.ErrorInfo{ + Domain: activeSchedulerErrorDomain, + Reason: resp.Status, + Metadata: map[string]string{ + "info": resp.Info, + }, + } + withDetails, err := st.WithDetails(detail) + if err != nil { + return st.Err() + } + return withDetails.Err() +} + +func grpcCodeFromApplyErrorCode(code activeapply.ErrorCode) codes.Code { + switch contracts.ApplyErrorCode(code) { + case contracts.ApplyErrorCodeForbidden: + return codes.PermissionDenied + case contracts.ApplyErrorCodeTargetNotFound: + return codes.NotFound + case contracts.ApplyErrorCodeDBError: + return codes.Internal + case contracts.ApplyErrorCodeExpired, + contracts.ApplyErrorCodeIdempotencyConflict, + contracts.ApplyErrorCodeBaseVersionChanged, + contracts.ApplyErrorCodeTargetCompleted, + contracts.ApplyErrorCodeTargetAlreadySchedule, + contracts.ApplyErrorCodeSlotConflict, + contracts.ApplyErrorCodeAlreadyApplied: + return codes.FailedPrecondition + default: + return codes.InvalidArgument + } +} + +func grpcCodeFromRespondStatus(statusValue string) codes.Code { + switch strings.TrimSpace(statusValue) { + case respond.MissingToken.Status, respond.InvalidToken.Status, respond.InvalidClaims.Status, + respond.ErrUnauthorized.Status, respond.WrongTokenType.Status, respond.UserLoggedOut.Status: + return codes.Unauthenticated + case respond.MissingParam.Status, respond.WrongParamType.Status, respond.ParamTooLong.Status: + return codes.InvalidArgument + } + + if strings.HasPrefix(strings.TrimSpace(statusValue), "5") { + return codes.Internal + } + return codes.InvalidArgument +} diff --git a/backend/services/active_scheduler/rpc/handler.go b/backend/services/active_scheduler/rpc/handler.go new file mode 100644 index 0000000..51a1fdc --- /dev/null +++ b/backend/services/active_scheduler/rpc/handler.go @@ -0,0 +1,155 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/LoveLosita/smartflow/backend/respond" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/rpc/pb" + activeschedulersv "github.com/LoveLosita/smartflow/backend/services/active_scheduler/sv" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" +) + +type Handler struct { + pb.UnimplementedActiveSchedulerServer + svc *activeschedulersv.Service +} + +func NewHandler(svc *activeschedulersv.Service) *Handler { + return &Handler{svc: svc} +} + +// DryRun 负责把 gRPC 请求转换为主动调度 dry-run 服务调用。 +func (h *Handler) DryRun(ctx context.Context, req *pb.ActiveScheduleRequest) (*pb.JSONResponse, error) { + if h == nil || h.svc == nil { + return nil, grpcErrorFromServiceError(errors.New("active-scheduler service dependency not initialized")) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + data, err := h.svc.DryRun(ctx, activeScheduleRequestFromPB(req)) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return jsonResponse(data), nil +} + +func (h *Handler) Trigger(ctx context.Context, req *pb.ActiveScheduleRequest) (*pb.TriggerResponse, error) { + if h == nil || h.svc == nil { + return nil, grpcErrorFromServiceError(errors.New("active-scheduler service dependency not initialized")) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + resp, err := h.svc.Trigger(ctx, activeScheduleRequestFromPB(req)) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return triggerResponseToPB(resp), nil +} + +func (h *Handler) CreatePreview(ctx context.Context, req *pb.ActiveScheduleRequest) (*pb.JSONResponse, error) { + if h == nil || h.svc == nil { + return nil, grpcErrorFromServiceError(errors.New("active-scheduler service dependency not initialized")) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + data, err := h.svc.CreatePreview(ctx, activeScheduleRequestFromPB(req)) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return jsonResponse(data), nil +} + +func (h *Handler) GetPreview(ctx context.Context, req *pb.GetPreviewRequest) (*pb.JSONResponse, error) { + if h == nil || h.svc == nil { + return nil, grpcErrorFromServiceError(errors.New("active-scheduler service dependency not initialized")) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + data, err := h.svc.GetPreview(ctx, contracts.GetPreviewRequest{ + UserID: int(req.UserId), + PreviewID: req.PreviewId, + }) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return jsonResponse(data), nil +} + +func (h *Handler) ConfirmPreview(ctx context.Context, req *pb.ConfirmPreviewRequest) (*pb.JSONResponse, error) { + if h == nil || h.svc == nil { + return nil, grpcErrorFromServiceError(errors.New("active-scheduler service dependency not initialized")) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + data, err := h.svc.ConfirmPreview(ctx, confirmRequestFromPB(req)) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return jsonResponse(data), nil +} + +func activeScheduleRequestFromPB(req *pb.ActiveScheduleRequest) contracts.ActiveScheduleRequest { + var mockNow *time.Time + if req.MockNowUnixNano > 0 { + value := time.Unix(0, req.MockNowUnixNano) + mockNow = &value + } + return contracts.ActiveScheduleRequest{ + UserID: int(req.UserId), + TriggerType: req.TriggerType, + TargetType: req.TargetType, + TargetID: int(req.TargetId), + FeedbackID: req.FeedbackId, + IdempotencyKey: req.IdempotencyKey, + MockNow: mockNow, + Payload: json.RawMessage(req.PayloadJson), + } +} + +func confirmRequestFromPB(req *pb.ConfirmPreviewRequest) contracts.ConfirmPreviewRequest { + requestedAt := time.Time{} + if req.RequestedAtUnixNano > 0 { + requestedAt = time.Unix(0, req.RequestedAtUnixNano) + } + return contracts.ConfirmPreviewRequest{ + UserID: int(req.UserId), + PreviewID: req.PreviewId, + CandidateID: req.CandidateId, + Action: req.Action, + EditedChanges: json.RawMessage(req.EditedChangesJson), + IdempotencyKey: req.IdempotencyKey, + RequestedAt: requestedAt, + TraceID: req.TraceId, + } +} + +func triggerResponseToPB(resp *contracts.TriggerResponse) *pb.TriggerResponse { + if resp == nil { + return &pb.TriggerResponse{} + } + previewID := "" + hasPreviewID := false + if resp.PreviewID != nil { + previewID = *resp.PreviewID + hasPreviewID = previewID != "" + } + return &pb.TriggerResponse{ + TriggerId: resp.TriggerID, + Status: resp.Status, + PreviewId: previewID, + HasPreviewId: hasPreviewID, + DedupeHit: resp.DedupeHit, + TraceId: resp.TraceID, + } +} + +func jsonResponse(data json.RawMessage) *pb.JSONResponse { + return &pb.JSONResponse{DataJson: []byte(data)} +} diff --git a/backend/services/active_scheduler/rpc/pb/active_scheduler.pb.go b/backend/services/active_scheduler/rpc/pb/active_scheduler.pb.go new file mode 100644 index 0000000..e8d84bd --- /dev/null +++ b/backend/services/active_scheduler/rpc/pb/active_scheduler.pb.go @@ -0,0 +1,82 @@ +package pb + +import proto "github.com/golang/protobuf/proto" + +var _ = proto.Marshal + +const _ = proto.ProtoPackageIsVersion3 + +type ActiveScheduleRequest struct { + UserId int64 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + TriggerType string `protobuf:"bytes,2,opt,name=trigger_type,json=triggerType,proto3" json:"trigger_type,omitempty"` + TargetType string `protobuf:"bytes,3,opt,name=target_type,json=targetType,proto3" json:"target_type,omitempty"` + TargetId int64 `protobuf:"varint,4,opt,name=target_id,json=targetId,proto3" json:"target_id,omitempty"` + FeedbackId string `protobuf:"bytes,5,opt,name=feedback_id,json=feedbackId,proto3" json:"feedback_id,omitempty"` + IdempotencyKey string `protobuf:"bytes,6,opt,name=idempotency_key,json=idempotencyKey,proto3" json:"idempotency_key,omitempty"` + MockNowUnixNano int64 `protobuf:"varint,7,opt,name=mock_now_unix_nano,json=mockNowUnixNano,proto3" json:"mock_now_unix_nano,omitempty"` + PayloadJson []byte `protobuf:"bytes,8,opt,name=payload_json,json=payloadJson,proto3" json:"payload_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ActiveScheduleRequest) Reset() { *m = ActiveScheduleRequest{} } +func (m *ActiveScheduleRequest) String() string { return proto.CompactTextString(m) } +func (*ActiveScheduleRequest) ProtoMessage() {} + +type GetPreviewRequest struct { + UserId int64 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + PreviewId string `protobuf:"bytes,2,opt,name=preview_id,json=previewId,proto3" json:"preview_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetPreviewRequest) Reset() { *m = GetPreviewRequest{} } +func (m *GetPreviewRequest) String() string { return proto.CompactTextString(m) } +func (*GetPreviewRequest) ProtoMessage() {} + +type ConfirmPreviewRequest struct { + UserId int64 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + PreviewId string `protobuf:"bytes,2,opt,name=preview_id,json=previewId,proto3" json:"preview_id,omitempty"` + CandidateId string `protobuf:"bytes,3,opt,name=candidate_id,json=candidateId,proto3" json:"candidate_id,omitempty"` + Action string `protobuf:"bytes,4,opt,name=action,proto3" json:"action,omitempty"` + EditedChangesJson []byte `protobuf:"bytes,5,opt,name=edited_changes_json,json=editedChangesJson,proto3" json:"edited_changes_json,omitempty"` + IdempotencyKey string `protobuf:"bytes,6,opt,name=idempotency_key,json=idempotencyKey,proto3" json:"idempotency_key,omitempty"` + RequestedAtUnixNano int64 `protobuf:"varint,7,opt,name=requested_at_unix_nano,json=requestedAtUnixNano,proto3" json:"requested_at_unix_nano,omitempty"` + TraceId string `protobuf:"bytes,8,opt,name=trace_id,json=traceId,proto3" json:"trace_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ConfirmPreviewRequest) Reset() { *m = ConfirmPreviewRequest{} } +func (m *ConfirmPreviewRequest) String() string { return proto.CompactTextString(m) } +func (*ConfirmPreviewRequest) ProtoMessage() {} + +type JSONResponse struct { + DataJson []byte `protobuf:"bytes,1,opt,name=data_json,json=dataJson,proto3" json:"data_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *JSONResponse) Reset() { *m = JSONResponse{} } +func (m *JSONResponse) String() string { return proto.CompactTextString(m) } +func (*JSONResponse) ProtoMessage() {} + +type TriggerResponse struct { + TriggerId string `protobuf:"bytes,1,opt,name=trigger_id,json=triggerId,proto3" json:"trigger_id,omitempty"` + Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + PreviewId string `protobuf:"bytes,3,opt,name=preview_id,json=previewId,proto3" json:"preview_id,omitempty"` + HasPreviewId bool `protobuf:"varint,4,opt,name=has_preview_id,json=hasPreviewId,proto3" json:"has_preview_id,omitempty"` + DedupeHit bool `protobuf:"varint,5,opt,name=dedupe_hit,json=dedupeHit,proto3" json:"dedupe_hit,omitempty"` + TraceId string `protobuf:"bytes,6,opt,name=trace_id,json=traceId,proto3" json:"trace_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TriggerResponse) Reset() { *m = TriggerResponse{} } +func (m *TriggerResponse) String() string { return proto.CompactTextString(m) } +func (*TriggerResponse) ProtoMessage() {} diff --git a/backend/services/active_scheduler/rpc/pb/active_scheduler_grpc.pb.go b/backend/services/active_scheduler/rpc/pb/active_scheduler_grpc.pb.go new file mode 100644 index 0000000..9a88cb6 --- /dev/null +++ b/backend/services/active_scheduler/rpc/pb/active_scheduler_grpc.pb.go @@ -0,0 +1,201 @@ +package pb + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +const ( + ActiveScheduler_DryRun_FullMethodName = "/smartflow.active_scheduler.ActiveScheduler/DryRun" + ActiveScheduler_Trigger_FullMethodName = "/smartflow.active_scheduler.ActiveScheduler/Trigger" + ActiveScheduler_CreatePreview_FullMethodName = "/smartflow.active_scheduler.ActiveScheduler/CreatePreview" + ActiveScheduler_GetPreview_FullMethodName = "/smartflow.active_scheduler.ActiveScheduler/GetPreview" + ActiveScheduler_ConfirmPreview_FullMethodName = "/smartflow.active_scheduler.ActiveScheduler/ConfirmPreview" +) + +type ActiveSchedulerClient interface { + DryRun(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*JSONResponse, error) + Trigger(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*TriggerResponse, error) + CreatePreview(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*JSONResponse, error) + GetPreview(ctx context.Context, in *GetPreviewRequest, opts ...grpc.CallOption) (*JSONResponse, error) + ConfirmPreview(ctx context.Context, in *ConfirmPreviewRequest, opts ...grpc.CallOption) (*JSONResponse, error) +} + +type activeSchedulerClient struct { + cc grpc.ClientConnInterface +} + +func NewActiveSchedulerClient(cc grpc.ClientConnInterface) ActiveSchedulerClient { + return &activeSchedulerClient{cc} +} + +func (c *activeSchedulerClient) DryRun(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, ActiveScheduler_DryRun_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *activeSchedulerClient) Trigger(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*TriggerResponse, error) { + out := new(TriggerResponse) + err := c.cc.Invoke(ctx, ActiveScheduler_Trigger_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *activeSchedulerClient) CreatePreview(ctx context.Context, in *ActiveScheduleRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, ActiveScheduler_CreatePreview_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *activeSchedulerClient) GetPreview(ctx context.Context, in *GetPreviewRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, ActiveScheduler_GetPreview_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *activeSchedulerClient) ConfirmPreview(ctx context.Context, in *ConfirmPreviewRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, ActiveScheduler_ConfirmPreview_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +type ActiveSchedulerServer interface { + DryRun(context.Context, *ActiveScheduleRequest) (*JSONResponse, error) + Trigger(context.Context, *ActiveScheduleRequest) (*TriggerResponse, error) + CreatePreview(context.Context, *ActiveScheduleRequest) (*JSONResponse, error) + GetPreview(context.Context, *GetPreviewRequest) (*JSONResponse, error) + ConfirmPreview(context.Context, *ConfirmPreviewRequest) (*JSONResponse, error) +} + +type UnimplementedActiveSchedulerServer struct{} + +func (UnimplementedActiveSchedulerServer) DryRun(context.Context, *ActiveScheduleRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method DryRun not implemented") +} + +func (UnimplementedActiveSchedulerServer) Trigger(context.Context, *ActiveScheduleRequest) (*TriggerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Trigger not implemented") +} + +func (UnimplementedActiveSchedulerServer) CreatePreview(context.Context, *ActiveScheduleRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreatePreview not implemented") +} + +func (UnimplementedActiveSchedulerServer) GetPreview(context.Context, *GetPreviewRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetPreview not implemented") +} + +func (UnimplementedActiveSchedulerServer) ConfirmPreview(context.Context, *ConfirmPreviewRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ConfirmPreview not implemented") +} + +func RegisterActiveSchedulerServer(s grpc.ServiceRegistrar, srv ActiveSchedulerServer) { + s.RegisterService(&ActiveScheduler_ServiceDesc, srv) +} + +func _ActiveScheduler_DryRun_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ActiveScheduleRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActiveSchedulerServer).DryRun(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: ActiveScheduler_DryRun_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActiveSchedulerServer).DryRun(ctx, req.(*ActiveScheduleRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ActiveScheduler_Trigger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ActiveScheduleRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActiveSchedulerServer).Trigger(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: ActiveScheduler_Trigger_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActiveSchedulerServer).Trigger(ctx, req.(*ActiveScheduleRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ActiveScheduler_CreatePreview_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ActiveScheduleRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActiveSchedulerServer).CreatePreview(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: ActiveScheduler_CreatePreview_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActiveSchedulerServer).CreatePreview(ctx, req.(*ActiveScheduleRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ActiveScheduler_GetPreview_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetPreviewRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActiveSchedulerServer).GetPreview(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: ActiveScheduler_GetPreview_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActiveSchedulerServer).GetPreview(ctx, req.(*GetPreviewRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ActiveScheduler_ConfirmPreview_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConfirmPreviewRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActiveSchedulerServer).ConfirmPreview(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: ActiveScheduler_ConfirmPreview_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActiveSchedulerServer).ConfirmPreview(ctx, req.(*ConfirmPreviewRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var ActiveScheduler_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "smartflow.active_scheduler.ActiveScheduler", + HandlerType: (*ActiveSchedulerServer)(nil), + Methods: []grpc.MethodDesc{ + {MethodName: "DryRun", Handler: _ActiveScheduler_DryRun_Handler}, + {MethodName: "Trigger", Handler: _ActiveScheduler_Trigger_Handler}, + {MethodName: "CreatePreview", Handler: _ActiveScheduler_CreatePreview_Handler}, + {MethodName: "GetPreview", Handler: _ActiveScheduler_GetPreview_Handler}, + {MethodName: "ConfirmPreview", Handler: _ActiveScheduler_ConfirmPreview_Handler}, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "services/active_scheduler/rpc/active_scheduler.proto", +} diff --git a/backend/services/active_scheduler/rpc/server.go b/backend/services/active_scheduler/rpc/server.go new file mode 100644 index 0000000..f58eb25 --- /dev/null +++ b/backend/services/active_scheduler/rpc/server.go @@ -0,0 +1,60 @@ +package rpc + +import ( + "errors" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/rpc/pb" + activeschedulersv "github.com/LoveLosita/smartflow/backend/services/active_scheduler/sv" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +const ( + defaultListenOn = "0.0.0.0:9083" + defaultTimeout = 8 * time.Second +) + +type ServerOptions struct { + ListenOn string + Timeout time.Duration + Service *activeschedulersv.Service +} + +// NewServer 创建 active-scheduler zrpc 服务端。 +// +// 职责边界: +// 1. 只负责 zrpc server 配置与 gRPC handler 注册; +// 2. 不创建数据库、LLM、outbox 或 worker,它们由 cmd/active-scheduler 管理; +// 3. 返回 listenOn 供进程入口打印启动日志。 +func NewServer(opts ServerOptions) (*zrpc.RpcServer, string, error) { + if opts.Service == nil { + return nil, "", errors.New("active-scheduler service dependency not initialized") + } + + listenOn := strings.TrimSpace(opts.ListenOn) + if listenOn == "" { + listenOn = defaultListenOn + } + timeout := opts.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + + server, err := zrpc.NewServer(zrpc.RpcServerConf{ + ServiceConf: service.ServiceConf{ + Name: "active-scheduler.rpc", + Mode: service.DevMode, + }, + ListenOn: listenOn, + Timeout: int64(timeout / time.Millisecond), + }, func(grpcServer *grpc.Server) { + pb.RegisterActiveSchedulerServer(grpcServer, NewHandler(opts.Service)) + }) + if err != nil { + return nil, "", err + } + return server, listenOn, nil +} diff --git a/backend/services/active_scheduler/sv/service.go b/backend/services/active_scheduler/sv/service.go new file mode 100644 index 0000000..764dbee --- /dev/null +++ b/backend/services/active_scheduler/sv/service.go @@ -0,0 +1,345 @@ +package sv + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + rootdao "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + eventsvc "github.com/LoveLosita/smartflow/backend/service/events" + activeadapters "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/adapters" + activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter" + activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph" + activejob "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/job" + activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview" + activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection" + activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service" + "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" + "gorm.io/gorm" +) + +const defaultJobScanLimit = 50 + +// Options 描述 active-scheduler 独立服务的启动参数。 +// +// 职责边界: +// 1. 只承载服务内部 worker 节奏和 outbox 配置; +// 2. 不承载数据库连接、模型配置或 HTTP/gateway 配置; +// 3. 零值使用安全默认值,便于本地 smoke 先跑通。 +type Options struct { + JobScanEvery time.Duration + JobScanLimit int + KafkaConfig kafkabus.Config +} + +// Service 是 active-scheduler 独立进程内的服务门面。 +// +// 职责边界: +// 1. 对 RPC 层暴露 dry-run / trigger / preview / confirm; +// 2. 对 cmd 层暴露 outbox consumer 和 due job scanner 生命周期; +// 3. 内部复用 services/active_scheduler/core 下的领域核心,避免服务入口和算法实现散落在旧根目录。 +type Service struct { + dryRun *activesvc.DryRunService + trigger *activesvc.TriggerService + previewConfirm *activesvc.PreviewConfirmService + eventBus *outboxinfra.EventBus + jobScanner *activejob.Scanner +} + +// New 构造 active-scheduler 服务运行态。 +// +// 步骤化说明: +// 1. 先组装 active-scheduler 自有 DAO、只读 readers、dry-run 和 preview/confirm; +// 2. 再按 active-scheduler 服务归属注册 outbox 路由与 active_schedule.triggered handler; +// 3. 最后创建 due job scanner,让 worker 能从 active_schedule_jobs 产生正式 trigger; +// 4. Kafka 关闭时保留 dry-run / preview / confirm,同步 trigger 会返回明确错误。 +func New(db *gorm.DB, llmService *llmservice.Service, opts Options) (*Service, error) { + if db == nil { + return nil, errors.New("active-scheduler database 未初始化") + } + + activeDAO := rootdao.NewActiveScheduleDAO(db) + activeReaders := activeadapters.NewGormReaders(db) + readers := activeadapters.ReadersFromGorm(activeReaders) + dryRun, err := activesvc.NewDryRunService(readers) + if err != nil { + return nil, err + } + previewConfirm, err := buildPreviewConfirmService(db, activeDAO, dryRun) + if err != nil { + return nil, err + } + + outboxRepo := outboxinfra.NewRepository(db) + eventBus, err := buildActiveSchedulerEventBus(outboxRepo, opts.KafkaConfig) + if err != nil { + return nil, err + } + triggerService, err := activesvc.NewTriggerService(activeDAO, eventBus) + if err != nil { + return nil, err + } + + var jobScanner *activejob.Scanner + if eventBus != nil { + graphRunner, err := buildGraphRunner(dryRun, llmService) + if err != nil { + return nil, err + } + workflow, err := activesvc.NewTriggerWorkflowServiceWithOptions( + activeDAO, + graphRunner, + outboxRepo, + opts.KafkaConfig, + activesvc.WithActiveScheduleSessionBridge(rootdao.NewAgentDAO(db), rootdao.NewActiveScheduleSessionDAO(db)), + ) + if err != nil { + return nil, err + } + if err := registerActiveSchedulerOutboxHandler(eventBus, outboxRepo, workflow); err != nil { + return nil, err + } + jobScanner, err = activejob.NewScanner(activeDAO, readers, triggerService, activejob.ScannerOptions{ + ScanEvery: opts.JobScanEvery, + Limit: normalizeJobScanLimit(opts.JobScanLimit), + }) + if err != nil { + return nil, err + } + } + + return &Service{ + dryRun: dryRun, + trigger: triggerService, + previewConfirm: previewConfirm, + eventBus: eventBus, + jobScanner: jobScanner, + }, nil +} + +// StartWorkers 启动 active-scheduler 自己的 outbox relay/consumer 和 due job scanner。 +func (s *Service) StartWorkers(ctx context.Context) { + if s == nil { + return + } + if s.eventBus != nil { + s.eventBus.Start(ctx) + } + if s.jobScanner != nil { + s.jobScanner.Start(ctx) + } +} + +// Close 关闭 active-scheduler 持有的 Kafka 资源。 +func (s *Service) Close() { + if s != nil && s.eventBus != nil { + s.eventBus.Close() + } +} + +// DryRun 同步执行主动调度诊断,并以 JSON 形式返回现有响应结构。 +func (s *Service) DryRun(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) { + if s == nil || s.dryRun == nil { + return nil, errors.New("active-scheduler dry-run service 未初始化") + } + trig := buildDryRunTrigger(req, time.Now()) + result, err := s.dryRun.DryRun(ctx, trig) + if err != nil { + return nil, err + } + return marshalResponseJSON(result) +} + +// Trigger 创建正式 trigger 并发布 active_schedule.triggered。 +func (s *Service) Trigger(ctx context.Context, req contracts.ActiveScheduleRequest) (*contracts.TriggerResponse, error) { + if s == nil || s.trigger == nil { + return nil, errors.New("active-scheduler trigger service 未初始化") + } + now := time.Now() + resp, err := s.trigger.CreateAndPublish(ctx, activesvc.TriggerRequest{ + UserID: req.UserID, + TriggerType: trigger.TriggerType(req.TriggerType), + Source: trigger.SourceAPITrigger, + TargetType: trigger.TargetType(req.TargetType), + TargetID: req.TargetID, + FeedbackID: req.FeedbackID, + IdempotencyKey: req.IdempotencyKey, + MockNow: req.MockNow, + IsMockTime: req.MockNow != nil, + RequestedAt: now, + Payload: normalizePayload(req.Payload), + TraceID: fmt.Sprintf("trace_api_trigger_%d_%d", req.UserID, now.UnixNano()), + }) + if err != nil { + return nil, err + } + return &contracts.TriggerResponse{ + TriggerID: resp.TriggerID, + Status: resp.Status, + PreviewID: resp.PreviewID, + DedupeHit: resp.DedupeHit, + TraceID: resp.TraceID, + }, nil +} + +// CreatePreview 同步 dry-run 后把 top1 候选固化为待确认预览。 +func (s *Service) CreatePreview(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) { + if s == nil || s.dryRun == nil || s.previewConfirm == nil { + return nil, errors.New("active-scheduler preview service 未初始化") + } + now := time.Now() + trig := buildDryRunTrigger(req, now) + trig.TriggerID = fmt.Sprintf("ast_api_%d_%d", req.UserID, now.UnixNano()) + trig.TraceID = fmt.Sprintf("trace_api_preview_%d_%d", req.UserID, now.UnixNano()) + + dryRunResult, err := s.dryRun.DryRun(ctx, trig) + if err != nil { + return nil, err + } + previewResp, err := s.previewConfirm.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{ + ActiveContext: dryRunResult.Context, + Observation: dryRunResult.Observation, + Candidates: dryRunResult.Candidates, + TriggerID: trig.TriggerID, + GeneratedAt: now, + }) + if err != nil { + return nil, err + } + return marshalResponseJSON(previewResp.Detail) +} + +// GetPreview 查询主动调度预览详情。 +func (s *Service) GetPreview(ctx context.Context, req contracts.GetPreviewRequest) (json.RawMessage, error) { + if s == nil || s.previewConfirm == nil { + return nil, errors.New("active-scheduler preview service 未初始化") + } + detail, err := s.previewConfirm.GetPreview(ctx, req.UserID, req.PreviewID) + if err != nil { + return nil, err + } + return marshalResponseJSON(detail) +} + +// ConfirmPreview 同步确认并正式应用主动调度预览。 +func (s *Service) ConfirmPreview(ctx context.Context, req contracts.ConfirmPreviewRequest) (json.RawMessage, error) { + if s == nil || s.previewConfirm == nil { + return nil, errors.New("active-scheduler confirm service 未初始化") + } + editedChanges, err := decodeEditedChanges(req.EditedChanges) + if err != nil { + return nil, activeapply.NewApplyError(activeapply.ErrorCodeInvalidEditedChanges, "edited_changes 不是合法的变更数组", err) + } + requestedAt := req.RequestedAt + if requestedAt.IsZero() { + requestedAt = time.Now() + } + result, err := s.previewConfirm.ConfirmPreview(ctx, activeapply.ConfirmRequest{ + PreviewID: req.PreviewID, + UserID: req.UserID, + CandidateID: req.CandidateID, + Action: activeapply.ConfirmAction(req.Action), + EditedChanges: editedChanges, + IdempotencyKey: req.IdempotencyKey, + RequestedAt: requestedAt, + TraceID: req.TraceID, + }) + if err != nil { + return nil, err + } + return marshalResponseJSON(result) +} + +func buildPreviewConfirmService(db *gorm.DB, activeDAO *rootdao.ActiveScheduleDAO, dryRun *activesvc.DryRunService) (*activesvc.PreviewConfirmService, error) { + previewService, err := activepreview.NewService(activeDAO) + if err != nil { + return nil, err + } + return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, applyadapter.NewGormApplyAdapter(db)) +} + +func buildGraphRunner(dryRun *activesvc.DryRunService, llmService *llmservice.Service) (*activegraph.Runner, error) { + var llmClient *llmservice.Client + if llmService != nil { + llmClient = llmService.ProClient() + } + return activegraph.NewRunner(dryRun.AsGraphDryRunFunc(), activesel.NewService(llmClient)) +} + +func buildActiveSchedulerEventBus(outboxRepo *outboxinfra.Repository, kafkaCfg kafkabus.Config) (*outboxinfra.EventBus, error) { + if outboxRepo == nil { + return nil, errors.New("active-scheduler outbox repository 未初始化") + } + if err := outboxinfra.RegisterEventService(sharedevents.ActiveScheduleTriggeredEventType, outboxinfra.ServiceActiveScheduler); err != nil { + return nil, err + } + eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg) + if err != nil { + return nil, err + } + return eventBus, nil +} + +func registerActiveSchedulerOutboxHandler(eventBus *outboxinfra.EventBus, outboxRepo *outboxinfra.Repository, workflow eventsvc.ActiveScheduleTriggeredProcessor) error { + if eventBus == nil { + return nil + } + return eventsvc.RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, workflow) +} + +func buildDryRunTrigger(req contracts.ActiveScheduleRequest, now time.Time) trigger.ActiveScheduleTrigger { + return trigger.ActiveScheduleTrigger{ + UserID: req.UserID, + TriggerType: trigger.TriggerType(req.TriggerType), + Source: trigger.SourceAPIDryRun, + TargetType: trigger.TargetType(req.TargetType), + TargetID: req.TargetID, + FeedbackID: req.FeedbackID, + IdempotencyKey: req.IdempotencyKey, + MockNow: req.MockNow, + IsMockTime: req.MockNow != nil, + RequestedAt: now, + } +} + +func normalizePayload(raw json.RawMessage) json.RawMessage { + if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { + return json.RawMessage("{}") + } + return raw +} + +func decodeEditedChanges(raw json.RawMessage) ([]activeapply.ApplyChange, error) { + if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" { + return nil, nil + } + var changes []activeapply.ApplyChange + if err := json.Unmarshal(raw, &changes); err != nil { + return nil, err + } + return changes, nil +} + +func marshalResponseJSON(value any) (json.RawMessage, error) { + raw, err := json.Marshal(value) + if err != nil { + return nil, err + } + return json.RawMessage(raw), nil +} + +func normalizeJobScanLimit(limit int) int { + if limit <= 0 { + return defaultJobScanLimit + } + return limit +} diff --git a/backend/shared/contracts/activescheduler/types.go b/backend/shared/contracts/activescheduler/types.go new file mode 100644 index 0000000..cc1676c --- /dev/null +++ b/backend/shared/contracts/activescheduler/types.go @@ -0,0 +1,118 @@ +package activescheduler + +import ( + "encoding/json" + "fmt" + "strings" + "time" +) + +// ActiveScheduleRequest 是 gateway 调用 active-scheduler 的通用触发请求。 +// +// 职责边界: +// 1. 只承载 dry-run / trigger / preview 三个入口共享的触发事实; +// 2. user_id 由 gateway 从 JWT 上下文补齐,不信任前端传入; +// 3. payload 保留原始 JSON,由服务侧按 trigger_type 再解释,避免 gateway 承担领域解析。 +type ActiveScheduleRequest struct { + UserID int `json:"user_id,omitempty"` + TriggerType string `json:"trigger_type" binding:"required"` + TargetType string `json:"target_type" binding:"required"` + TargetID int `json:"target_id"` + FeedbackID string `json:"feedback_id,omitempty"` + IdempotencyKey string `json:"idempotency_key,omitempty"` + MockNow *time.Time `json:"mock_now,omitempty"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +// TriggerResponse 是正式触发写入后的跨进程响应。 +type TriggerResponse struct { + TriggerID string `json:"trigger_id"` + Status string `json:"status"` + PreviewID *string `json:"preview_id,omitempty"` + DedupeHit bool `json:"dedupe_hit"` + TraceID string `json:"trace_id,omitempty"` +} + +// GetPreviewRequest 是查询主动调度预览详情的跨进程请求。 +type GetPreviewRequest struct { + UserID int `json:"user_id,omitempty"` + PreviewID string `json:"preview_id"` +} + +// ConfirmPreviewRequest 是确认主动调度预览的跨进程请求。 +// +// 职责边界: +// 1. edited_changes 保留原始 JSON,由 active-scheduler 服务侧反序列化为 apply.Change; +// 2. gateway 只补 user_id / preview_id,不理解正式写库命令; +// 3. action / idempotency_key 继续保持现有前端请求语义。 +type ConfirmPreviewRequest struct { + PreviewID string `json:"preview_id,omitempty"` + UserID int `json:"user_id,omitempty"` + CandidateID string `json:"candidate_id"` + Action string `json:"action"` + EditedChanges json.RawMessage `json:"edited_changes,omitempty"` + IdempotencyKey string `json:"idempotency_key"` + RequestedAt time.Time `json:"requested_at,omitempty"` + TraceID string `json:"trace_id,omitempty"` +} + +type ApplyErrorCode string + +const ( + ApplyErrorCodeExpired ApplyErrorCode = "expired" + ApplyErrorCodeIdempotencyConflict ApplyErrorCode = "idempotency_conflict" + ApplyErrorCodeBaseVersionChanged ApplyErrorCode = "base_version_changed" + ApplyErrorCodeTargetNotFound ApplyErrorCode = "target_not_found" + ApplyErrorCodeTargetCompleted ApplyErrorCode = "target_completed" + ApplyErrorCodeTargetAlreadySchedule ApplyErrorCode = "target_already_scheduled" + ApplyErrorCodeSlotConflict ApplyErrorCode = "slot_conflict" + ApplyErrorCodeInvalidEditedChanges ApplyErrorCode = "invalid_edited_changes" + ApplyErrorCodeUnsupportedChangeType ApplyErrorCode = "unsupported_change_type" + ApplyErrorCodeDBError ApplyErrorCode = "db_error" + ApplyErrorCodeInvalidRequest ApplyErrorCode = "invalid_request" + ApplyErrorCodeForbidden ApplyErrorCode = "forbidden" + ApplyErrorCodeAlreadyApplied ApplyErrorCode = "already_applied" +) + +const ( + ApplyStatusRejected = "rejected" + ApplyStatusExpired = "expired" + ApplyStatusFailed = "failed" +) + +// ApplyError 是 active-scheduler confirm/apply 业务拒绝的跨进程错误。 +// +// 职责边界: +// 1. 只承载可映射到 HTTP 4xx/5xx 的错误码与展示文案; +// 2. 不暴露服务内部 applyadapter、DAO 或事务错误类型; +// 3. cause 仅用于 gateway 日志串联,响应体只使用 code/message。 +type ApplyError struct { + Code ApplyErrorCode + Message string + Cause error +} + +func (e *ApplyError) Error() string { + if e == nil { + return "" + } + message := strings.TrimSpace(e.Message) + if message == "" { + return string(e.Code) + } + return fmt.Sprintf("%s: %s", e.Code, message) +} + +func (e *ApplyError) Unwrap() error { + if e == nil { + return nil + } + return e.Cause +} + +// ConfirmErrorResult 是 confirm/apply 失败时仍返回给前端的稳定数据形状。 +type ConfirmErrorResult struct { + ApplyStatus string `json:"apply_status"` + ErrorCode ApplyErrorCode `json:"error_code,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} diff --git a/backend/shared/ports/active_scheduler.go b/backend/shared/ports/active_scheduler.go new file mode 100644 index 0000000..612240d --- /dev/null +++ b/backend/shared/ports/active_scheduler.go @@ -0,0 +1,22 @@ +package ports + +import ( + "context" + "encoding/json" + + contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler" +) + +// ActiveSchedulerCommandClient 是 gateway 调用 active-scheduler 服务的最小能力集合。 +// +// 职责边界: +// 1. 只覆盖当前 HTTP 入口需要的 dry-run / trigger / preview / confirm; +// 2. 不暴露 active-scheduler 的 DAO、graph、selection、job scanner 或 outbox consumer; +// 3. 复杂响应先以原始 JSON 透传,避免 gateway 重建一套主动调度 DTO。 +type ActiveSchedulerCommandClient interface { + DryRun(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) + Trigger(ctx context.Context, req contracts.ActiveScheduleRequest) (*contracts.TriggerResponse, error) + CreatePreview(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) + GetPreview(ctx context.Context, req contracts.GetPreviewRequest) (json.RawMessage, error) + ConfirmPreview(ctx context.Context, req contracts.ConfirmPreviewRequest) (json.RawMessage, error) +} diff --git a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md index 74d72f4..9f8395a 100644 --- a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md +++ b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md @@ -11,6 +11,7 @@ 3. 阶段 1.5 / 1.6 已完成:`backend/services/llm` 和 `backend/services/rag` 已经是当前 canonical 入口,`backend/infra/llm` 和 `backend/infra/rag` 的 `.go` 旧实现已删除。 4. 阶段 2 已完成:`user/auth` 已经从 Gin 单体抽成 `cmd/userauth` + `services/userauth` 的 go-zero zrpc 服务边界,gateway 只保留 user HTTP 入口、鉴权、额度门禁和轻量转发。 5. 阶段 3 `notification` 服务化已完成实现、code review 修复和真实 smoke;不要再把 outbox、llm-service、rag-service 或 user/auth 当成未完成待办。 +6. 阶段 4 `active-scheduler` 服务化已完成首轮收口:独立进程、zrpc、服务级 outbox consumer / relay / retry loop、gateway 门面和代码归属已经切到新边界。 本计划遵守两个硬原则: @@ -68,11 +69,12 @@ Gin Gateway 只做边缘层职责: 4. 直接维护服务内部重试与投递状态。 5. 直接维护用户黑名单、JWT 签发、token 额度账本这类 user/auth 内部状态。 -当前阶段 2 切流点: +当前 gateway 切流点: -1. `/api/v1/user/*` 由 `backend/gateway/userapi` 承载 HTTP 入口,核心能力通过 `backend/gateway/userauth` 调 `cmd/userauth` zrpc。 +1. `/api/v1/user/*` 由 `backend/gateway/api/userauth` 承载 HTTP 入口,核心能力通过 `backend/gateway/client/userauth` 调 `cmd/userauth` zrpc。 2. `gateway/middleware` 的 JWT 鉴权和 token quota guard 只调 `userauth`,不直接读写 `users`、Redis 黑名单或额度缓存。 -3. zrpc client 放在 gateway/service 调用侧目录,不放进 `cmd`。`cmd` 只负责进程入口和装配,不承载跨服务 client 语义。 +3. `notification`、`active-scheduler` 等跨服务 zrpc client 统一放在 `backend/gateway/client/`,HTTP 门面统一放在 `backend/gateway/api`。 +4. zrpc client 不放进 `cmd`。`cmd` 只负责进程入口和装配,不承载跨服务 client 语义。 ### 3.2 服务层 @@ -96,9 +98,11 @@ gozero 服务负责领域能力: > > 当前状态:`llm-service` / `rag-service` 这两个边界已经先做成 `backend/services/*` 的服务内模块,调用仍由 `backend/cmd/start.go` 在同一进程内装配,不是 gozero 独立进程。 > -> 当前状态:`user/auth` 已经完成 go-zero zrpc 独立进程拆分,是阶段 2 样板。服务端在 `backend/services/userauth`,进程入口在 `backend/cmd/userauth`,gateway client 在 `backend/gateway/userauth`。 +> 当前状态:`user/auth` 已经完成 go-zero zrpc 独立进程拆分,是阶段 2 样板。服务端在 `backend/services/userauth`,进程入口在 `backend/cmd/userauth`,gateway HTTP 门面在 `backend/gateway/api/userauth`,gateway client 在 `backend/gateway/client/userauth`。 > -> 当前状态:`notification` 已经完成阶段 3 拆分。服务端在 `backend/services/notification`,进程入口在 `backend/cmd/notification`,gateway client 在 `backend/gateway/notification`,服务级 outbox consumer 和 retry loop 已随服务入口迁出。 +> 当前状态:`notification` 已经完成阶段 3 拆分。服务端在 `backend/services/notification`,进程入口在 `backend/cmd/notification`,gateway client 在 `backend/gateway/client/notification`,服务级 outbox consumer 和 retry loop 已随服务入口迁出。 +> +> 当前状态:`active-scheduler` 已经完成阶段 4 首轮收口。服务端在 `backend/services/active_scheduler`,进程入口在 `backend/cmd/active-scheduler`,gateway HTTP 门面在 `backend/gateway/api`,gateway client 在 `backend/gateway/client/activescheduler`。 ### 3.3 事件层 @@ -145,7 +149,7 @@ gozero 服务负责领域能力: | 1.6 | 再抽 rag-service(已完成) | 已完成,`backend/services/rag` 作为当前 canonical 入口 | `go test ./...` + memory retrieve / rerank smoke | | 2 | 先拆 user/auth(已完成) | 已完成,阶段 2 样板 commit 点:userauth zrpc、gateway userapi、JWT/黑名单/额度治理、启动与迁移边界已收口 | 已完成注册/登录/刷新/并发 refresh/登出/鉴权/token quota smoke | | 3 | 再拆 notification(已完成) | 已完成,`cmd/notification` + `services/notification` zrpc / outbox consumer / retry loop 已收口,旧单体实现已删除;是否 commit 等用户明确要求 | 已完成 notification E2E smoke + worker-only smoke | -| 4 | 再拆 active-scheduler | 预览生成和确认链路通过 gozero 服务跑通后 commit | dry-run / preview / confirm smoke | +| 4 | 再拆 active-scheduler(已完成首轮收口) | 已完成,`cmd/active-scheduler` + `services/active_scheduler` zrpc / outbox consumer / retry loop / gateway 门面已收口;是否 commit 等用户明确要求 | 已完成 dry-run / trigger / preview ready smoke;confirm 仍建议单独做写入型 smoke | | 5 | 再拆 schedule / task / course / task-class | 每个领域完成一次切流就 commit 一次 | schedule/task/course/task-class 回归 + 全链路 smoke | | 6 | 再拆 agent / memory | agent 编排服务、memory 支撑服务和后台 worker 独立后 commit | agent chat / SSE / memory extract / memory retrieve smoke | | 7 | Gin Gateway 收口 | 网关不再直接碰核心业务表后 commit | Gateway 路由、鉴权、组合逻辑 smoke | @@ -311,8 +315,8 @@ flowchart LR 1. 新增 `backend/cmd/userauth/main.go` 作为 userauth 独立进程入口。 2. 新增 `backend/services/userauth/**`,承载注册、登录、刷新 token、登出、JWT 签发/校验、黑名单、token 额度治理和 token 记账幂等。 -3. 新增 `backend/gateway/userapi/**`,承载 `/api/v1/user/register`、`/api/v1/user/login`、`/api/v1/user/refresh-token`、`/api/v1/user/logout` 的 HTTP handler。 -4. 新增 `backend/gateway/userauth/**`,承载 gateway 侧 zrpc client 和 gRPC 错误反解。 +3. 新增并归档到 `backend/gateway/api/userauth/**`,承载 `/api/v1/user/register`、`/api/v1/user/login`、`/api/v1/user/refresh-token`、`/api/v1/user/logout` 的 HTTP handler。 +4. 新增并归档到 `backend/gateway/client/userauth/**`,承载 gateway 侧 zrpc client 和 gRPC 错误反解。 5. 新增 `backend/gateway/middleware/**`,把 JWT 鉴权和 token quota guard 改成调用 userauth,不再直接碰 users 表或 Redis 黑名单细节。 6. 新增 `backend/shared/contracts/userauth` 与 `backend/shared/ports`,只放跨层契约和端口接口。 7. 拆分 MySQL / Redis 初始化和 AutoMigrate 边界:`cmd/all` 走 `ConnectCoreDB` / `InitCoreRedis`,只迁单体残留域;`cmd/userauth` 自己迁 `users` 和 `user_token_usage_adjustments`。 @@ -379,7 +383,7 @@ flowchart LR 本轮收口状态(2026-05-04): 1. `cmd/notification` 已承载 notification zrpc 启动、DB 迁移、服务级 outbox consumer 和重试扫描。 -2. `backend/services/notification` 已收进 DAO、model、sv、rpc、飞书 provider 和 outbox handler;gateway 通过 `backend/gateway/notification` zrpc client 调用。 +2. `backend/services/notification` 已收进 DAO、model、sv、rpc、飞书 provider 和 outbox handler;gateway 通过 `backend/gateway/client/notification` zrpc client 调用。 3. 主动调度侧只写入 `notification.feishu.requested`,publisher 侧只注册事件归属到 `notification`,不再启动单体 notification consumer。 4. 旧 `backend/notification`、旧 DAO/model 和旧 `service/events/notification_feishu.go` 已删除;review 发现的 sending 租约恢复和 RPC timeout 边界已修复。 5. 真实 smoke 已通过:`notification_outbox_messages.id=3` 已从 `pending` 推进到 `consumed`,`smartflow.notification.outbox` 已出现 `outbox_id=3`,对应 `notification_records` 生成并按未启用通道进入 `skipped`。 @@ -413,6 +417,15 @@ flowchart LR 3. confirm / apply smoke。 4. `mock_now` 和幂等回归测试。 +本轮收口状态(2026-05-04): + +1. `cmd/active-scheduler` 已承载 active-scheduler zrpc 启动、DB 迁移、服务级 outbox consumer、relay、retry loop 和 due job scanner。 +2. `backend/services/active_scheduler` 已收进 DAO、sv、rpc 和主动调度核心逻辑;复杂领域流程统一下沉到 `backend/services/active_scheduler/core`,旧 `backend/active_scheduler` 活跃实现已移除。 +3. gateway HTTP 门面已统一到 `backend/gateway/api`,active-scheduler、notification、userauth 的 zrpc client 已统一到 `backend/gateway/client/*`。 +4. 单体 `cmd/start.go` 不再启动 active-scheduler workflow / scanner / handler;gateway 的 `/api/v1/active-schedule/*` 只做鉴权、参数绑定、超时和 zrpc 转发。 +5. 迁移期仍共享主库读取 / 写入 task、schedule、agent 会话与 notification outbox 相关表;active-scheduler 启动时会显式检查这些运行时依赖表,后续阶段 5/6 再逐步切成 RPC 或 read model。 +6. 已完成真实 smoke:`trigger -> active_scheduler_outbox_messages -> consume -> preview ready` 闭环通过,当前 trigger 没有出现单体误消费导致的 dead handler。 + --- ### 4.9 阶段 5:再拆 schedule / task / course / task-class @@ -509,15 +522,14 @@ flowchart LR 当前建议按这个顺序推进: -注:阶段 1.5 / 1.6 / 2 / 3 已完成;`notification` 已完成实现、code review 修复和真实 smoke,不再作为下一轮待办。 +注:阶段 1.5 / 1.6 / 2 / 3 / 4 已完成首轮收口;`notification` 和 `active-scheduler` 都不再作为“未拆服务”待办。 1. 以阶段 1 的服务级 outbox 为当前基线,不再回头做共享 outbox 方案。 2. 保持 `backend/services/llm` 和 `backend/services/rag` 为 canonical 入口,不再把它们写成待办。 3. 保持 `backend/services/userauth` + `cmd/userauth` 为阶段 2 样板,不再回头恢复 Gin 单体 user/auth。 -4. 下一步进入阶段 4,优先切 `active-scheduler`。 -5. 然后切 schedule / task / course / task-class。 -6. 再切 agent / memory,把聊天编排和记忆链路独立出去。 -7. 最后把 Gin 收口成纯 Gateway。 +4. 下一步进入阶段 5,优先切 schedule / task / course / task-class,逐步替换 active-scheduler 当前的跨域 DB 依赖。 +5. 再切 agent / memory,把聊天编排、主动调度会话复跑和记忆链路独立出去。 +6. 最后把 Gin 收口成纯 Gateway。 一句话总结: @@ -702,11 +714,11 @@ SmartFlow-Agent/ > 当前目录到目标目录的映射: > > 1. `backend/services/userauth/*` 已经是阶段 2 终态样板;旧 `backend/api/user.go`、`backend/service/user.go`、`backend/dao/user.go`、`backend/model/user.go`、`backend/model/auth.go`、`backend/auth/jwt_handler.go`、`backend/middleware/token_handler.go`、`backend/middleware/token_quota_guard.go`、`backend/routers/routers.go` 不再作为活跃实现。 -> 2. `backend/gateway/userapi/*` 是 user HTTP 入口,`backend/gateway/userauth/*` 是 userauth zrpc client,二者都属于 gateway 边缘层。 +> 2. `backend/gateway/api/userauth/*` 是 user HTTP 入口,`backend/gateway/client/userauth/*` 是 userauth zrpc client,二者都属于 gateway 边缘层。 > 3. `backend/service/*.go` 这批现有业务逻辑,后面要分别迁到各自服务根目录下的 `sv/`。 > 4. `backend/service/agentsvc/*` 和 `backend/newAgent/*`,后面要收束到 `backend/services/agent/sv/` + `internal/{prompt,graph,stream,tool,session,router}`。 -> 5. `backend/services/notification/*` 已经是阶段 3 终态样板;`backend/cmd/notification` 是独立进程入口,`backend/gateway/notification` 是 gateway 侧 zrpc client,`backend/shared/contracts/notification` 只放跨层契约;旧 `backend/notification/*`、旧 DAO/model 和旧 `service/events/notification_feishu.go` 不再作为活跃实现。 -> 6. `backend/active_scheduler/*`,后面要收束到 `backend/services/active-scheduler/`,其中 `graph/selection/feedbacklocate/apply/job` 归入 `internal/`。 +> 5. `backend/services/notification/*` 已经是阶段 3 终态样板;`backend/cmd/notification` 是独立进程入口,`backend/gateway/client/notification` 是 gateway 侧 zrpc client,`backend/shared/contracts/notification` 只放跨层契约;旧 `backend/notification/*`、旧 DAO/model 和旧 `service/events/notification_feishu.go` 不再作为活跃实现。 +> 6. `backend/services/active_scheduler/*` 已经是阶段 4 当前样板;`backend/cmd/active-scheduler` 是独立进程入口,`backend/gateway/client/activescheduler` 是 gateway 侧 zrpc client,`backend/services/active_scheduler/core` 承载迁移期领域核心;旧 `backend/active_scheduler/*` 不再作为活跃实现。 > 7. `backend/memory/*`,后面要收束到 `backend/services/memory/`;当前 `memory/service/*` 只是迁移过渡态,终态还是按 `sv/` 或 `internal/` 拆开。 > > 说明 4:`shared` 先保留 `events` 和少量跨服务底座型 `infra`。以后如果真的出现跨服务 DTO / 枚举 / 常量,再新增 `contracts` 一类目录,但不要把 `dao`、`model`、`sv`、`handler` 这类服务私有层塞进去。 @@ -751,7 +763,7 @@ SmartFlow-Agent/ ### 6.6 `notification` / `active-scheduler` 的服务内结构 1. `notification` 建议直接收束成更标准的服务内单体壳:外层统一成 `dao/`、`model/`、`sv/`、`handler.go`、`start.go`,当前的 `runner.go`、`provider.go`、`dedupe.go`、`channel_service.go` 这类细节先保留在服务内部,但后面要逐步并入 `sv/` 或 `internal/notification/`,不要长期挂成一串平级文件。 -2. `active-scheduler` 建议收束成同类服务壳,外层只保留 `dao/`、`model/`、`sv/`、`handler.go`、`start.go`,把 `graph`、`selection`、`feedbacklocate`、`apply`、`job` 这些复杂流程统一下沉到 `internal/`。 +2. `active-scheduler` 已先收束成 `cmd/active-scheduler` + `services/active_scheduler/{dao,rpc,sv,core}`;`core` 是迁移期领域核心承载处,后续若不再被单体 agent 会话复跑路径引用,可继续改成 `internal/{graph,selection,preview,feedbacklocate,apply,job,trigger}`。 3. 这样做的目标,是让后续每个服务的阅读方式都更接近你熟悉的 seckill 风格,而不是把一个服务拆成十几个平级目录。 ### 6.7 每个服务的典型用例 @@ -760,11 +772,11 @@ SmartFlow-Agent/ | 服务 | 典型用例 | 结构收束建议 | 不允许的改法 | | --- | --- | --- | --- | -| `user/auth` | 注册、登录、刷新、登出、JWT 签发、黑名单、token 额度门禁、token 记账幂等 | 已完成:`cmd/userauth` + `services/userauth/{sv,dao,model,internal/auth,rpc}`;gateway 侧是 `gateway/userapi` + `gateway/userauth` | 不要恢复旧 Gin user/auth 实现;不要让 gateway 直连 users 表、Redis 黑名单或额度缓存;不要把 zrpc client 放进 `cmd` | +| `user/auth` | 注册、登录、刷新、登出、JWT 签发、黑名单、token 额度门禁、token 记账幂等 | 已完成:`cmd/userauth` + `services/userauth/{sv,dao,model,internal/auth,rpc}`;gateway 侧是 `gateway/api/userauth` + `gateway/client/userauth` | 不要恢复旧 Gin user/auth 实现;不要让 gateway 直连 users 表、Redis 黑名单或额度缓存;不要把 zrpc client 放进 `cmd` | | `course` | 课程导入、图片解析、课表校验、课程落表,图片解析走 `llm-service` | `handler.go` / `sv/` / `dao/` / `model/` / `internal/{parse,import,conflict,adapter}/` | 不要把课程解析代码写成网关临时脚本 | | `task-class` | 任务类创建/更新、items 批量 upsert、嵌入时间同步 | `handler.go` / `sv/` / `dao/` / `model/` / `internal/{convert,batch,item}/` | 不要把批处理拼装沉到 handler 里,也不要让 agent 直接改库 | | `notification` | 消费 `notification.feishu.requested`、写通知记录、幂等、重试、provider 投递 | `start.go` / `handler.go` / `sv/` / `dao/` / `model/` / `internal/{provider,runner,dedupe,channel,retry}/` | 不要把通知投递逻辑散回 worker 或 gateway | -| `active-scheduler` | `trigger -> dry-run -> preview -> confirm`、建议生成、反馈定位;候选选择走 `llm-service` | `start.go` / `handler.go` / `sv/` / `dao/` / `model/` / `internal/{graph,selection,preview,feedbacklocate,apply,job,trigger}/` | 不要把 graph/selection 继续长成对外平级框架 | +| `active-scheduler` | `trigger -> dry-run -> preview -> confirm`、建议生成、反馈定位;候选选择走 `llm-service` | 已完成首轮收口:`cmd/active-scheduler` + `services/active_scheduler/{dao,rpc,sv,core}`;下一步把跨域 DB 访问替成服务契约后,可把 `core` 进一步改为 `internal/{graph,selection,preview,feedbacklocate,apply,job,trigger}` | 不要把 graph/selection 散回 gateway 或旧根目录 | | `schedule` | 正式日程所有权、查询、删除、应用命令 | `start.go` / `handler.go` / `sv/` / `dao/` / `model/` / `internal/{command,event,conflict}/` | 不要让 gateway 直接写 schedule 表 | | `task` | 任务新增、完成/撤销、任务池查询、紧急性平移 | `start.go` / `handler.go` / `sv/` / `dao/` / `model/` / `internal/{policy,event,urgency}/` | 不要把 task 的状态机塞进 agent 或 schedule | | `llm-service` | 统一模型调用、provider 路由、流式输出、重试、限流、审计 | `start.go` / `handler.go` / `sv/` / `model/` / `internal/{provider,router,stream,quota,audit}/` | 不要把业务 prompt、状态机、工具编排塞进模型出口 | @@ -866,10 +878,10 @@ graph TD 3. 阶段 1 已完成,当前 outbox 基线是服务级表、服务级 topic、服务级 consumer group;worker 仍在单体内装配,后续随对应服务迁出。 4. 阶段 2 已完成,`user/auth` 已经是样板服务,不要再把它当成下一轮待办。 5. 阶段 3 `notification` 已完成实现、code review 修复和真实 smoke;`llm-service`、`rag-service` 也已完成,不要重新当成待办。 -6. 下一轮默认从阶段 4 `active-scheduler` 开始;它后续要回到更像 seckill 的服务内单体壳。 +6. 阶段 4 `active-scheduler` 已完成首轮收口;后续不要再把它当成“未拆服务”,除非是在补契约测试或继续替换跨域 DB 访问。 7. `shared` 只保留跨进程契约和少量跨服务底座,不承载业务逻辑、DAO、模型或状态机。 8. 如果后续要改目录,必须先回答“这个文件属于哪一个典型用例”,回答不清楚就先别动结构。 -9. 当前文档已经可以作为切对话基线;后续代理默认按本文件推进。现阶段的迁移基线入口是 `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all`,它们只是当前仓库的启动壳,不是终态。`backend/cmd/userauth` 是阶段 2 的独立服务入口,`backend/cmd/notification` 是阶段 3 的独立服务入口。终态仍然是“一个服务一个独立 `main.go`”,只在出现新的契约风险、边界变化或业务语义变化时再重新讨论架构。 +9. 当前文档已经可以作为切对话基线;后续代理默认按本文件推进。现阶段的迁移基线入口是 `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all`,它们只是当前仓库的启动壳,不是终态。`backend/cmd/userauth` 是阶段 2 的独立服务入口,`backend/cmd/notification` 是阶段 3 的独立服务入口,`backend/cmd/active-scheduler` 是阶段 4 的独立服务入口。终态仍然是“一个服务一个独立 `main.go`”,只在出现新的契约风险、边界变化或业务语义变化时再重新讨论架构。 ### 6.10 启动方式与进程模型 @@ -992,7 +1004,7 @@ graph TD 这段用于避免后续代理重复踩阶段 2 已经纠偏过的问题。 -1. 阶段 3 `notification` 已完成;后续起步默认是阶段 4 `active-scheduler`,不是 outbox、llm-service、rag-service、user/auth 或 notification。 +1. 阶段 3 `notification` 和阶段 4 `active-scheduler` 已完成首轮收口;后续起步默认是阶段 5 `schedule / task / course / task-class`,不是 outbox、llm-service、rag-service、user/auth、notification 或 active-scheduler。 2. 主代理负责 leader:先读必要文档和代码,拆任务,关键阻塞任务自己做;子代理只能承担并行、明确、非阻塞的侧翼任务。 3. 如果确实有会影响切分方向的不确定点,先总结成拍板点问用户;文档已经写清楚的内容不要重复问。 4. 查库一律用 `docker exec`。MySQL / Redis 都按这个规则走;不直接用本机客户端绕过容器。 @@ -1036,7 +1048,7 @@ graph TD 1. `backend/cmd/userauth/main.go` 是 userauth 独立进程入口。 2. `backend/services/userauth` 拥有 user/auth 核心业务、DAO、模型、JWT、黑名单、额度治理、zrpc server 和 token 记账幂等表。 -3. `backend/gateway/userapi` 是 HTTP user 入口,`backend/gateway/userauth` 是 zrpc client,`backend/gateway/middleware` 只调 userauth 做鉴权和额度门禁。 +3. `backend/gateway/api/userauth` 是 HTTP user 入口,`backend/gateway/client/userauth` 是 zrpc client,`backend/gateway/middleware` 只调 userauth 做鉴权和额度门禁。 4. `backend/shared/contracts/userauth` 和 `backend/shared/ports` 只承载跨层契约,不承载服务私有业务实现。 5. `cmd/all` 不再迁 `users`,`cmd/userauth` 自己迁 `users` 和 `user_token_usage_adjustments`。 6. 完整本地 smoke 需要同时启动 `cmd/all` 和 `cmd/userauth`。 @@ -1045,13 +1057,22 @@ graph TD 1. `backend/cmd/notification/main.go` 是 notification 独立进程入口,负责 DB 迁移、zrpc server、notification outbox consumer 和 retry loop 的统一生命周期。 2. `backend/services/notification` 拥有 notification 核心业务、DAO、模型、飞书 provider、幂等、投递记录状态机、重试扫描和 outbox handler。 -3. `backend/gateway/notification` 是 gateway 侧 zrpc client;gateway 只保留 notification HTTP 入口、鉴权和轻量组合逻辑,不再直连 notification DAO/service。 +3. `backend/gateway/client/notification` 是 gateway 侧 zrpc client;gateway 只保留 notification HTTP 入口、鉴权和轻量组合逻辑,不再直连 notification DAO/service。 4. `backend/shared/contracts/notification` 和 `backend/shared/ports` 只承载跨层契约和端口接口,不承载服务私有业务实现。 5. notification 内部是 `userauth` 同款最小手搓 zrpc 框架,不使用 goctl 自动脚手架;`rpc` 只保留 `NewServer` 供 `cmd/notification` 管理 signal、outbox consumer、retry loop 和 server 生命周期。 6. 旧 `backend/notification/*`、旧 `backend/dao/notification_channel.go`、旧 `backend/model/notification_channel.go` 和旧 `backend/service/events/notification_feishu.go` 已删除;若 `backend/notification` 目录壳仍存在,它不参与编译,也不作为活跃实现。 7. notification outbox consumer 已迁入独立服务边界并处理 `notification.feishu.requested`,覆盖 payload/version 校验、dead/retry/consumed 状态推进和毒消息回退。 8. 已完成真实 smoke:`notification_outbox_messages` 可从 `pending` 推进到 `consumed`,Kafka `smartflow.notification.outbox` 可看到对应 outbox 消息,`notification_records` 可生成幂等记录并按通道状态进入预期状态。 +阶段 4 当前基线: + +1. `backend/cmd/active-scheduler/main.go` 是 active-scheduler 独立进程入口,负责 DB 迁移、zrpc server、active-scheduler outbox consumer、relay、retry loop 和 due job scanner 的统一生命周期。 +2. `backend/services/active_scheduler` 拥有 active-scheduler DAO、rpc、sv 和领域核心;核心流程当前在 `backend/services/active_scheduler/core`,旧 `backend/active_scheduler` 不再作为活跃实现存在。 +3. `backend/gateway/api` 是 HTTP 门面统一目录,`backend/gateway/client/activescheduler` 是 gateway 侧 zrpc client。 +4. `backend/shared/contracts/activescheduler` 和 `backend/shared/ports` 只承载跨层契约和端口接口,不承载服务私有业务实现。 +5. `cmd/all` 不再启动 active-scheduler workflow / scanner / handler;完整本地 smoke 需要同时启动 `cmd/all`、`cmd/userauth`、`cmd/notification` 和 `cmd/active-scheduler`。 +6. 迁移期仍共享主库访问 task、schedule、agent 会话和 notification outbox 相关表;active-scheduler 启动时做依赖表检查,后续由 schedule/task/agent 拆分继续缩小这条共享边界。 + --- ## 7. 风险与回退 @@ -1121,6 +1142,6 @@ graph TD 处理: -1. 以当前编译入口和路由装配为准:`gateway/userapi` + `gateway/userauth` + `services/userauth` 是阶段 2 当前样板。 +1. 以当前编译入口和路由装配为准:`gateway/api/userauth` + `gateway/client/userauth` + `services/userauth` 是阶段 2 当前样板。 2. 任何 user/auth 新能力先放进 `services/userauth`,gateway 只做 HTTP 适配和 respond 响应。 3. 如果 user/auth 调用失败,先查 `cmd/userauth` 是否启动、zrpc endpoint 是否正确、服务内 MySQL/Redis 是否可连,不要把逻辑搬回 gateway。