Files
smartmate/backend/services/active_scheduler/sv/service.go
Losita 6843c7efac Version: 0.9.71.dev.260504
后端:
1.阶段 5 task 服务边界落地
- 新增 cmd/task 与 services/task/{dao,rpc,sv},承载 task zrpc、tasks 表迁移和 task outbox 消费边界
- 新增 gateway/client/task、shared/contracts/task 和 task port,gateway /api/v1/task/* 切到 task zrpc client
- 将 task.urgency.promote.requested handler / relay / retry loop 迁入 cmd/task,单体 worker 不再消费 task outbox
- 保留单体 Agent 残留 task 查询的 publish-only 写入能力,避免迁移期 task 事件丢失
- active-scheduler task facts / due job scanner 切到 task RPC,并移除启动期 tasks 表依赖检查
- 更新阶段 5 文档,记录 task 切流点、旧实现保留、跨域 DB 依赖缩减和下一轮建议
- 补充 task rpc 示例配置
2026-05-05 00:00:09 +08:00

357 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sv
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
rootdao "github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
activeadapters "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/adapters"
activeapply "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/apply"
activeapplyadapter "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
activejob "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/job"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection"
activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service"
"github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/activescheduler"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
"gorm.io/gorm"
)
const defaultJobScanLimit = 50
// Options 描述 active-scheduler 独立服务的启动参数。
//
// 职责边界:
// 1. 只承载服务内部 worker 节奏和 outbox 配置;
// 2. 不承载数据库连接、模型配置或 HTTP/gateway 配置;
// 3. 零值使用安全默认值,便于本地 smoke 先跑通。
type Options struct {
JobScanEvery time.Duration
JobScanLimit int
KafkaConfig kafkabus.Config
TaskRPC activeadapters.TaskRPCConfig
ScheduleRPC activeadapters.ScheduleRPCConfig
}
// Service 是 active-scheduler 独立进程内的服务门面。
//
// 职责边界:
// 1. 对 RPC 层暴露 dry-run / trigger / preview / confirm
// 2. 对 cmd 层暴露 outbox consumer 和 due job scanner 生命周期;
// 3. 内部复用 services/active_scheduler/core 下的领域核心,避免服务入口和算法实现散落在旧根目录。
type Service struct {
dryRun *activesvc.DryRunService
trigger *activesvc.TriggerService
previewConfirm *activesvc.PreviewConfirmService
eventBus *outboxinfra.EventBus
jobScanner *activejob.Scanner
}
// New 构造 active-scheduler 服务运行态。
//
// 步骤化说明:
// 1. 先组装 active-scheduler 自有 DAO、只读 readers、dry-run 和 preview/confirm
// 2. 再按 active-scheduler 服务归属注册 outbox 路由与 active_schedule.triggered handler
// 3. 最后创建 due job scanner让 worker 能从 active_schedule_jobs 产生正式 trigger
// 4. Kafka 关闭时保留 dry-run / preview / confirm同步 trigger 会返回明确错误。
func New(db *gorm.DB, llmService *llmservice.Service, opts Options) (*Service, error) {
if db == nil {
return nil, errors.New("active-scheduler database 未初始化")
}
activeDAO := rootdao.NewActiveScheduleDAO(db)
taskRPCAdapter, err := activeadapters.NewTaskRPCAdapter(opts.TaskRPC)
if err != nil {
return nil, fmt.Errorf("initialize task rpc adapter failed: %w", err)
}
scheduleRPCAdapter, err := activeadapters.NewScheduleRPCAdapter(opts.ScheduleRPC)
if err != nil {
return nil, fmt.Errorf("initialize schedule rpc adapter failed: %w", err)
}
readers := activeadapters.ReadersWithScheduleRPC(taskRPCAdapter, scheduleRPCAdapter)
dryRun, err := activesvc.NewDryRunService(readers)
if err != nil {
return nil, err
}
previewConfirm, err := buildPreviewConfirmService(activeDAO, dryRun, scheduleRPCAdapter)
if err != nil {
return nil, err
}
outboxRepo := outboxinfra.NewRepository(db)
eventBus, err := buildActiveSchedulerEventBus(outboxRepo, opts.KafkaConfig)
if err != nil {
return nil, err
}
triggerService, err := activesvc.NewTriggerService(activeDAO, eventBus)
if err != nil {
return nil, err
}
var jobScanner *activejob.Scanner
if eventBus != nil {
graphRunner, err := buildGraphRunner(dryRun, llmService)
if err != nil {
return nil, err
}
workflow, err := activesvc.NewTriggerWorkflowServiceWithOptions(
activeDAO,
graphRunner,
outboxRepo,
opts.KafkaConfig,
activesvc.WithActiveScheduleSessionBridge(rootdao.NewAgentDAO(db), rootdao.NewActiveScheduleSessionDAO(db)),
)
if err != nil {
return nil, err
}
if err := registerActiveSchedulerOutboxHandler(eventBus, outboxRepo, workflow); err != nil {
return nil, err
}
jobScanner, err = activejob.NewScanner(activeDAO, readers, triggerService, activejob.ScannerOptions{
ScanEvery: opts.JobScanEvery,
Limit: normalizeJobScanLimit(opts.JobScanLimit),
})
if err != nil {
return nil, err
}
}
return &Service{
dryRun: dryRun,
trigger: triggerService,
previewConfirm: previewConfirm,
eventBus: eventBus,
jobScanner: jobScanner,
}, nil
}
// StartWorkers 启动 active-scheduler 自己的 outbox relay/consumer 和 due job scanner。
func (s *Service) StartWorkers(ctx context.Context) {
if s == nil {
return
}
if s.eventBus != nil {
s.eventBus.Start(ctx)
}
if s.jobScanner != nil {
s.jobScanner.Start(ctx)
}
}
// Close 关闭 active-scheduler 持有的 Kafka 资源。
func (s *Service) Close() {
if s != nil && s.eventBus != nil {
s.eventBus.Close()
}
}
// DryRun 同步执行主动调度诊断,并以 JSON 形式返回现有响应结构。
func (s *Service) DryRun(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) {
if s == nil || s.dryRun == nil {
return nil, errors.New("active-scheduler dry-run service 未初始化")
}
trig := buildDryRunTrigger(req, time.Now())
result, err := s.dryRun.DryRun(ctx, trig)
if err != nil {
return nil, err
}
return marshalResponseJSON(result)
}
// Trigger 创建正式 trigger 并发布 active_schedule.triggered。
func (s *Service) Trigger(ctx context.Context, req contracts.ActiveScheduleRequest) (*contracts.TriggerResponse, error) {
if s == nil || s.trigger == nil {
return nil, errors.New("active-scheduler trigger service 未初始化")
}
now := time.Now()
resp, err := s.trigger.CreateAndPublish(ctx, activesvc.TriggerRequest{
UserID: req.UserID,
TriggerType: trigger.TriggerType(req.TriggerType),
Source: trigger.SourceAPITrigger,
TargetType: trigger.TargetType(req.TargetType),
TargetID: req.TargetID,
FeedbackID: req.FeedbackID,
IdempotencyKey: req.IdempotencyKey,
MockNow: req.MockNow,
IsMockTime: req.MockNow != nil,
RequestedAt: now,
Payload: normalizePayload(req.Payload),
TraceID: fmt.Sprintf("trace_api_trigger_%d_%d", req.UserID, now.UnixNano()),
})
if err != nil {
return nil, err
}
return &contracts.TriggerResponse{
TriggerID: resp.TriggerID,
Status: resp.Status,
PreviewID: resp.PreviewID,
DedupeHit: resp.DedupeHit,
TraceID: resp.TraceID,
}, nil
}
// CreatePreview 同步 dry-run 后把 top1 候选固化为待确认预览。
func (s *Service) CreatePreview(ctx context.Context, req contracts.ActiveScheduleRequest) (json.RawMessage, error) {
if s == nil || s.dryRun == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler preview service 未初始化")
}
now := time.Now()
trig := buildDryRunTrigger(req, now)
trig.TriggerID = fmt.Sprintf("ast_api_%d_%d", req.UserID, now.UnixNano())
trig.TraceID = fmt.Sprintf("trace_api_preview_%d_%d", req.UserID, now.UnixNano())
dryRunResult, err := s.dryRun.DryRun(ctx, trig)
if err != nil {
return nil, err
}
previewResp, err := s.previewConfirm.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{
ActiveContext: dryRunResult.Context,
Observation: dryRunResult.Observation,
Candidates: dryRunResult.Candidates,
TriggerID: trig.TriggerID,
GeneratedAt: now,
})
if err != nil {
return nil, err
}
return marshalResponseJSON(previewResp.Detail)
}
// GetPreview 查询主动调度预览详情。
func (s *Service) GetPreview(ctx context.Context, req contracts.GetPreviewRequest) (json.RawMessage, error) {
if s == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler preview service 未初始化")
}
detail, err := s.previewConfirm.GetPreview(ctx, req.UserID, req.PreviewID)
if err != nil {
return nil, err
}
return marshalResponseJSON(detail)
}
// ConfirmPreview 同步确认并正式应用主动调度预览。
func (s *Service) ConfirmPreview(ctx context.Context, req contracts.ConfirmPreviewRequest) (json.RawMessage, error) {
if s == nil || s.previewConfirm == nil {
return nil, errors.New("active-scheduler confirm service 未初始化")
}
editedChanges, err := decodeEditedChanges(req.EditedChanges)
if err != nil {
return nil, activeapply.NewApplyError(activeapply.ErrorCodeInvalidEditedChanges, "edited_changes 不是合法的变更数组", err)
}
requestedAt := req.RequestedAt
if requestedAt.IsZero() {
requestedAt = time.Now()
}
result, err := s.previewConfirm.ConfirmPreview(ctx, activeapply.ConfirmRequest{
PreviewID: req.PreviewID,
UserID: req.UserID,
CandidateID: req.CandidateID,
Action: activeapply.ConfirmAction(req.Action),
EditedChanges: editedChanges,
IdempotencyKey: req.IdempotencyKey,
RequestedAt: requestedAt,
TraceID: req.TraceID,
})
if err != nil {
return nil, err
}
return marshalResponseJSON(result)
}
func buildPreviewConfirmService(activeDAO *rootdao.ActiveScheduleDAO, dryRun *activesvc.DryRunService, scheduleApplyAdapter interface {
ApplyActiveScheduleChanges(context.Context, activeapplyadapter.ApplyActiveScheduleRequest) (activeapplyadapter.ApplyActiveScheduleResult, error)
}) (*activesvc.PreviewConfirmService, error) {
previewService, err := activepreview.NewService(activeDAO)
if err != nil {
return nil, err
}
return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, scheduleApplyAdapter)
}
func buildGraphRunner(dryRun *activesvc.DryRunService, llmService *llmservice.Service) (*activegraph.Runner, error) {
var llmClient *llmservice.Client
if llmService != nil {
llmClient = llmService.ProClient()
}
return activegraph.NewRunner(dryRun.AsGraphDryRunFunc(), activesel.NewService(llmClient))
}
func buildActiveSchedulerEventBus(outboxRepo *outboxinfra.Repository, kafkaCfg kafkabus.Config) (*outboxinfra.EventBus, error) {
if outboxRepo == nil {
return nil, errors.New("active-scheduler outbox repository 未初始化")
}
if err := outboxinfra.RegisterEventService(sharedevents.ActiveScheduleTriggeredEventType, outboxinfra.ServiceActiveScheduler); err != nil {
return nil, err
}
eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg)
if err != nil {
return nil, err
}
return eventBus, nil
}
func registerActiveSchedulerOutboxHandler(eventBus *outboxinfra.EventBus, outboxRepo *outboxinfra.Repository, workflow eventsvc.ActiveScheduleTriggeredProcessor) error {
if eventBus == nil {
return nil
}
return eventsvc.RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, workflow)
}
func buildDryRunTrigger(req contracts.ActiveScheduleRequest, now time.Time) trigger.ActiveScheduleTrigger {
return trigger.ActiveScheduleTrigger{
UserID: req.UserID,
TriggerType: trigger.TriggerType(req.TriggerType),
Source: trigger.SourceAPIDryRun,
TargetType: trigger.TargetType(req.TargetType),
TargetID: req.TargetID,
FeedbackID: req.FeedbackID,
IdempotencyKey: req.IdempotencyKey,
MockNow: req.MockNow,
IsMockTime: req.MockNow != nil,
RequestedAt: now,
}
}
func normalizePayload(raw json.RawMessage) json.RawMessage {
if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" {
return json.RawMessage("{}")
}
return raw
}
func decodeEditedChanges(raw json.RawMessage) ([]activeapply.ApplyChange, error) {
if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" || strings.TrimSpace(string(raw)) == "null" {
return nil, nil
}
var changes []activeapply.ApplyChange
if err := json.Unmarshal(raw, &changes); err != nil {
return nil, err
}
return changes, nil
}
func marshalResponseJSON(value any) (json.RawMessage, error) {
raw, err := json.Marshal(value)
if err != nil {
return nil, err
}
return json.RawMessage(raw), nil
}
func normalizeJobScanLimit(limit int) int {
if limit <= 0 {
return defaultJobScanLimit
}
return limit
}