Files
smartmate/backend/services/active_scheduler/sv/service.go
Losita 29b8cf0ada Version: 0.9.70.dev.260504
后端:
1. 阶段 5 schedule 首刀服务化落地,新增 `cmd/schedule`、`services/schedule/{dao,rpc,sv,core}`、`gateway/client/schedule`、`shared/contracts/schedule` 和 schedule port
2. gateway `/api/v1/schedule/*` 切到 schedule zrpc client,HTTP 门面只保留鉴权、参数绑定、超时和轻量转发
3. active-scheduler 的 schedule facts、feedback 和 confirm apply 改为调用 schedule RPC adapter,减少对 `schedule_events`、`schedules`、`task_classes`、`task_items` 的跨域 DB 依赖
4. 单体聊天主动调度 rerun 的 schedule 读写链路切到 schedule RPC,迁移期仅保留 task facts 直读 Gorm
5. 为 schedule zrpc 补充 `Ping` 启动健康检查,并在 gateway client 与 active-scheduler adapter 初始化时校验服务可用
6. `cmd/schedule` 独立初始化 DB / Redis,只 AutoMigrate schedule 自有表,并显式检查迁移期 task / task-class 依赖表
7. 更新 active-scheduler 依赖表检查和 preview confirm apply 抽象,保留旧 Gorm 实现作为迁移期回退路径
8. 补充 `schedule.rpc` 示例配置和 schedule HTTP RPC 超时配置

文档:
1. 更新微服务迁移计划,将阶段 5 schedule 首刀进展、当前切流点、旧实现保留范围和 active-scheduler DB 依赖收缩情况写入基线
2026-05-04 22:33:38 +08:00

353 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sv
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
rootdao "github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
activeadapters "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/adapters"
activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply"
activeapplyadapter "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
activejob "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/job"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection"
activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
"gorm.io/gorm"
)
const defaultJobScanLimit = 50
// Options 描述 active-scheduler 独立服务的启动参数。
//
// 职责边界:
// 1. 只承载服务内部 worker 节奏和 outbox 配置;
// 2. 不承载数据库连接、模型配置或 HTTP/gateway 配置;
// 3. 零值使用安全默认值,便于本地 smoke 先跑通。
type Options struct {
JobScanEvery time.Duration
JobScanLimit int
KafkaConfig kafkabus.Config
ScheduleRPC activeadapters.ScheduleRPCConfig
}
// Service 是 active-scheduler 独立进程内的服务门面。
//
// 职责边界:
// 1. 对 RPC 层暴露 dry-run / trigger / preview / confirm
// 2. 对 cmd 层暴露 outbox consumer 和 due job scanner 生命周期;
// 3. 内部复用 services/active_scheduler/core 下的领域核心,避免服务入口和算法实现散落在旧根目录。
type Service struct {
dryRun *activesvc.DryRunService
trigger *activesvc.TriggerService
previewConfirm *activesvc.PreviewConfirmService
eventBus *outboxinfra.EventBus
jobScanner *activejob.Scanner
}
// New 构造 active-scheduler 服务运行态。
//
// 步骤化说明:
// 1. 先组装 active-scheduler 自有 DAO、只读 readers、dry-run 和 preview/confirm
// 2. 再按 active-scheduler 服务归属注册 outbox 路由与 active_schedule.triggered handler
// 3. 最后创建 due job scanner让 worker 能从 active_schedule_jobs 产生正式 trigger
// 4. Kafka 关闭时保留 dry-run / preview / confirm同步 trigger 会返回明确错误。
func New(db *gorm.DB, llmService *llmservice.Service, opts Options) (*Service, error) {
if db == nil {
return nil, errors.New("active-scheduler database 未初始化")
}
activeDAO := rootdao.NewActiveScheduleDAO(db)
activeReaders := activeadapters.NewGormReaders(db)
scheduleRPCAdapter, err := activeadapters.NewScheduleRPCAdapter(opts.ScheduleRPC)
if err != nil {
return nil, fmt.Errorf("initialize schedule rpc adapter failed: %w", err)
}
readers := activeadapters.ReadersWithScheduleRPC(activeReaders, scheduleRPCAdapter)
dryRun, err := activesvc.NewDryRunService(readers)
if err != nil {
return nil, err
}
previewConfirm, err := buildPreviewConfirmService(activeDAO, dryRun, scheduleRPCAdapter)
if err != nil {
return nil, err
}
outboxRepo := outboxinfra.NewRepository(db)
eventBus, err := buildActiveSchedulerEventBus(outboxRepo, opts.KafkaConfig)
if err != nil {
return nil, err
}
triggerService, err := activesvc.NewTriggerService(activeDAO, eventBus)
if err != nil {
return nil, err
}
var jobScanner *activejob.Scanner
if eventBus != nil {
graphRunner, err := buildGraphRunner(dryRun, llmService)
if err != nil {
return nil, err
}
workflow, err := activesvc.NewTriggerWorkflowServiceWithOptions(
activeDAO,
graphRunner,
outboxRepo,
opts.KafkaConfig,
activesvc.WithActiveScheduleSessionBridge(rootdao.NewAgentDAO(db), rootdao.NewActiveScheduleSessionDAO(db)),
)
if err != nil {
return nil, err
}
if err := registerActiveSchedulerOutboxHandler(eventBus, outboxRepo, workflow); err != nil {
return nil, err
}
jobScanner, err = activejob.NewScanner(activeDAO, readers, triggerService, activejob.ScannerOptions{
ScanEvery: opts.JobScanEvery,
Limit: normalizeJobScanLimit(opts.JobScanLimit),
})
if err != nil {
return nil, err
}
}
return &Service{
dryRun: dryRun,
trigger: triggerService,
previewConfirm: previewConfirm,
eventBus: eventBus,
jobScanner: jobScanner,
}, nil
}
// StartWorkers 启动 active-scheduler 自己的 outbox relay/consumer 和 due job scanner。
func (s *Service) StartWorkers(ctx context.Context) {
if s == nil {
return
}
if s.eventBus != nil {
s.eventBus.Start(ctx)
}
if s.jobScanner != nil {
s.jobScanner.Start(ctx)
}
}
// Close 关闭 active-scheduler 持有的 Kafka 资源。
func (s *Service) Close() {
if s != nil && s.eventBus != nil {
s.eventBus.Close()
}
}
// DryRun 同步执行主动调度诊断,并以 JSON 形式返回现有响应结构。
func (s *Service) DryRun(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) {
if s == nil || s.dryRun == nil {
return nil, errors.New("active-scheduler dry-run service 未初始化")
}
trig := buildDryRunTrigger(req, time.Now())
result, err := s.dryRun.DryRun(ctx, trig)
if err != nil {
return nil, err
}
return marshalResponseJSON(result)
}
// Trigger 创建正式 trigger 并发布 active_schedule.triggered。
func (s *Service) Trigger(ctx context.Context, req contracts.ActiveScheduleRequest) (*contracts.TriggerResponse, error) {
if s == nil || s.trigger == nil {
return nil, errors.New("active-scheduler trigger service 未初始化")
}
now := time.Now()
resp, err := s.trigger.CreateAndPublish(ctx, activesvc.TriggerRequest{
UserID: req.UserID,
TriggerType: trigger.TriggerType(req.TriggerType),
Source: trigger.SourceAPITrigger,
TargetType: trigger.TargetType(req.TargetType),
TargetID: req.TargetID,
FeedbackID: req.FeedbackID,
IdempotencyKey: req.IdempotencyKey,
MockNow: req.MockNow,
IsMockTime: req.MockNow != nil,
RequestedAt: now,
Payload: normalizePayload(req.Payload),
TraceID: fmt.Sprintf("trace_api_trigger_%d_%d", req.UserID, now.UnixNano()),
})
if err != nil {
return nil, err
}
return &contracts.TriggerResponse{
TriggerID: resp.TriggerID,
Status: resp.Status,
PreviewID: resp.PreviewID,
DedupeHit: resp.DedupeHit,
TraceID: resp.TraceID,
}, nil
}
// CreatePreview 同步 dry-run 后把 top1 候选固化为待确认预览。
func (s *Service) CreatePreview(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) {
if s == nil || s.dryRun == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler preview service 未初始化")
}
now := time.Now()
trig := buildDryRunTrigger(req, now)
trig.TriggerID = fmt.Sprintf("ast_api_%d_%d", req.UserID, now.UnixNano())
trig.TraceID = fmt.Sprintf("trace_api_preview_%d_%d", req.UserID, now.UnixNano())
dryRunResult, err := s.dryRun.DryRun(ctx, trig)
if err != nil {
return nil, err
}
previewResp, err := s.previewConfirm.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{
ActiveContext: dryRunResult.Context,
Observation: dryRunResult.Observation,
Candidates: dryRunResult.Candidates,
TriggerID: trig.TriggerID,
GeneratedAt: now,
})
if err != nil {
return nil, err
}
return marshalResponseJSON(previewResp.Detail)
}
// GetPreview 查询主动调度预览详情。
func (s *Service) GetPreview(ctx context.Context, req contracts.GetPreviewRequest) (json.RawMessage, error) {
if s == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler preview service 未初始化")
}
detail, err := s.previewConfirm.GetPreview(ctx, req.UserID, req.PreviewID)
if err != nil {
return nil, err
}
return marshalResponseJSON(detail)
}
// ConfirmPreview 同步确认并正式应用主动调度预览。
func (s *Service) ConfirmPreview(ctx context.Context, req contracts.ConfirmPreviewRequest) (json.RawMessage, error) {
if s == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler confirm service 未初始化")
}
editedChanges, err := decodeEditedChanges(req.EditedChanges)
if err != nil {
return nil, activeapply.NewApplyError(activeapply.ErrorCodeInvalidEditedChanges, "edited_changes 不是合法的变更数组", err)
}
requestedAt := req.RequestedAt
if requestedAt.IsZero() {
requestedAt = time.Now()
}
result, err := s.previewConfirm.ConfirmPreview(ctx, activeapply.ConfirmRequest{
PreviewID: req.PreviewID,
UserID: req.UserID,
CandidateID: req.CandidateID,
Action: activeapply.ConfirmAction(req.Action),
EditedChanges: editedChanges,
IdempotencyKey: req.IdempotencyKey,
RequestedAt: requestedAt,
TraceID: req.TraceID,
})
if err != nil {
return nil, err
}
return marshalResponseJSON(result)
}
func buildPreviewConfirmService(activeDAO *rootdao.ActiveScheduleDAO, dryRun *activesvc.DryRunService, scheduleApplyAdapter interface {
ApplyActiveScheduleChanges(context.Context, activeapplyadapter.ApplyActiveScheduleRequest) (activeapplyadapter.ApplyActiveScheduleResult, error)
}) (*activesvc.PreviewConfirmService, error) {
previewService, err := activepreview.NewService(activeDAO)
if err != nil {
return nil, err
}
return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, scheduleApplyAdapter)
}
func buildGraphRunner(dryRun *activesvc.DryRunService, llmService *llmservice.Service) (*activegraph.Runner, error) {
var llmClient *llmservice.Client
if llmService != nil {
llmClient = llmService.ProClient()
}
return activegraph.NewRunner(dryRun.AsGraphDryRunFunc(), activesel.NewService(llmClient))
}
func buildActiveSchedulerEventBus(outboxRepo *outboxinfra.Repository, kafkaCfg kafkabus.Config) (*outboxinfra.EventBus, error) {
if outboxRepo == nil {
return nil, errors.New("active-scheduler outbox repository 未初始化")
}
if err := outboxinfra.RegisterEventService(sharedevents.ActiveScheduleTriggeredEventType, outboxinfra.ServiceActiveScheduler); err != nil {
return nil, err
}
eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg)
if err != nil {
return nil, err
}
return eventBus, nil
}
func registerActiveSchedulerOutboxHandler(eventBus *outboxinfra.EventBus, outboxRepo *outboxinfra.Repository, workflow eventsvc.ActiveScheduleTriggeredProcessor) error {
if eventBus == nil {
return nil
}
return eventsvc.RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, workflow)
}
func buildDryRunTrigger(req contracts.ActiveScheduleRequest, now time.Time) trigger.ActiveScheduleTrigger {
return trigger.ActiveScheduleTrigger{
UserID: req.UserID,
TriggerType: trigger.TriggerType(req.TriggerType),
Source: trigger.SourceAPIDryRun,
TargetType: trigger.TargetType(req.TargetType),
TargetID: req.TargetID,
FeedbackID: req.FeedbackID,
IdempotencyKey: req.IdempotencyKey,
MockNow: req.MockNow,
IsMockTime: req.MockNow != nil,
RequestedAt: now,
}
}
func normalizePayload(raw json.RawMessage) json.RawMessage {
if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" {
return json.RawMessage("{}")
}
return raw
}
func decodeEditedChanges(raw json.RawMessage) ([]activeapply.ApplyChange, error) {
if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" {
return nil, nil
}
var changes []activeapply.ApplyChange
if err := json.Unmarshal(raw, &changes); err != nil {
return nil, err
}
return changes, nil
}
func marshalResponseJSON(value any) (json.RawMessage, error) {
raw, err := json.Marshal(value)
if err != nil {
return nil, err
}
return json.RawMessage(raw), nil
}
func normalizeJobScanLimit(limit int) int {
if limit <= 0 {
return defaultJobScanLimit
}
return limit
}