diff --git a/backend/active_scheduler/service/trigger_outbox.go b/backend/active_scheduler/service/trigger_outbox.go index 0620835..388b181 100644 --- a/backend/active_scheduler/service/trigger_outbox.go +++ b/backend/active_scheduler/service/trigger_outbox.go @@ -8,7 +8,6 @@ import ( "strings" "time" - kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/model" sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" @@ -25,13 +24,13 @@ const requestedNotificationDedupeWindow = 30 * time.Minute func EnqueueActiveScheduleTriggeredInTx( ctx context.Context, outboxRepo *outboxinfra.Repository, - kafkaCfg kafkabus.Config, + maxRetry int, payload sharedevents.ActiveScheduleTriggeredPayload, ) error { return enqueueContractEventInTx( ctx, outboxRepo, - kafkaCfg, + maxRetry, sharedevents.ActiveScheduleTriggeredEventType, sharedevents.ActiveScheduleTriggeredEventVersion, payload.MessageKey(), @@ -51,13 +50,13 @@ func EnqueueActiveScheduleTriggeredInTx( func EnqueueNotificationFeishuRequestedInTx( ctx context.Context, outboxRepo *outboxinfra.Repository, - kafkaCfg kafkabus.Config, + maxRetry int, payload sharedevents.FeishuNotificationRequestedPayload, ) error { return enqueueContractEventInTx( ctx, outboxRepo, - kafkaCfg, + maxRetry, sharedevents.NotificationFeishuRequestedEventType, sharedevents.NotificationFeishuRequestedEventVersion, payload.MessageKey(), @@ -156,7 +155,7 @@ func BuildNotificationDedupeKey(userID int, triggerType string, requestedAt time func enqueueContractEventInTx( ctx context.Context, outboxRepo *outboxinfra.Repository, - kafkaCfg kafkabus.Config, + maxRetry int, eventType string, eventVersion string, messageKey string, @@ -179,8 +178,10 @@ func enqueueContractEventInTx( if err != nil { return err } + if maxRetry <= 0 { + maxRetry = 20 + } - cfg := normalizeKafkaConfig(kafkaCfg) wrapped := outboxinfra.OutboxEventPayload{ EventID: strings.TrimSpace(eventID), EventType: eventType, @@ -188,20 +189,12 @@ func enqueueContractEventInTx( AggregateID: strings.TrimSpace(aggregateID), Payload: payloadJSON, } - _, err = outboxRepo.CreateMessage(ctx, eventType, cfg.Topic, strings.TrimSpace(messageKey), wrapped, cfg.MaxRetry) + // 1. 这里只负责把已经校验过的事件契约写入 outbox;具体 service/table/topic 由仓库按 eventType 解析。 + // 2. 这样 active scheduler 侧不再显式依赖 topic,后续切服务级路由时只需要维护事件归属表。 + _, err = outboxRepo.CreateMessage(ctx, eventType, strings.TrimSpace(messageKey), wrapped, maxRetry) return err } -func normalizeKafkaConfig(cfg kafkabus.Config) kafkabus.Config { - if strings.TrimSpace(cfg.Topic) == "" { - cfg.Topic = kafkabus.DefaultTopic - } - if cfg.MaxRetry <= 0 { - cfg.MaxRetry = 20 - } - return cfg -} - func buildNotificationFallbackText(summary string, targetURL string) string { link := strings.TrimSpace(targetURL) if summary == "" { diff --git a/backend/active_scheduler/service/trigger_pipeline.go b/backend/active_scheduler/service/trigger_pipeline.go index 3175211..3686a87 100644 --- a/backend/active_scheduler/service/trigger_pipeline.go +++ b/backend/active_scheduler/service/trigger_pipeline.go @@ -209,7 +209,7 @@ func (s *TriggerWorkflowService) ProcessTriggeredInTx( previewResp.Detail.Notification, now, ) - return EnqueueNotificationFeishuRequestedInTx(ctx, s.outbox.WithTx(tx), s.kafkaCfg, notificationPayload) + return EnqueueNotificationFeishuRequestedInTx(ctx, s.outbox.WithTx(tx), s.kafkaCfg.MaxRetry, notificationPayload) } // MarkTriggerFailedBestEffort 在事务外补记 trigger failed 状态,供 outbox retry 前排障。 diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 9996988..5fca1ab 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -64,7 +64,7 @@ type appRuntime struct { agentCache *dao.AgentCache manager *dao.RepoManager outboxRepo *outboxinfra.Repository - eventBus *outboxinfra.EventBus + eventBus eventsvc.OutboxBus memoryModule *memory.Module activeJobScanner *activejob.Scanner activeTriggerWorkflow *activesvc.TriggerWorkflowService @@ -88,46 +88,39 @@ func loadConfig() error { return nil } -// Start 保留历史入口,默认仍按 all 模式启动。 -// -// 职责边界: -// 1. 兼容 backend/main.go 以及旧部署命令; -// 2. 不新增业务语义,只委托给 StartAll; -// 3. 后续若部署全面切到独立 api/worker,可逐步废弃该兼容入口。 +// Start 保留历史兼容入口,当前默认等价于 StartAll。 +// 1. 兼容 backend/main.go 和旧部署命令。 +// 2. 不新增业务语义,只转发给 StartAll。 +// 3. 后续若全面切到独立 api/worker 启动,本入口只保留过渡兼容。 func Start() { StartAll() } -// StartAll 启动迁移期兼容模式:HTTP API 与后台 worker 在同一进程内运行。 +// StartAll 启动当前仓库的完整运行态:HTTP API + 后台 worker。 +// 这仍然是迁移期的兼容装配,不是终态的“一个服务一个 main.go”模型。 func StartAll() { - ctx := context.Background() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() runtime := mustBuildRuntime(ctx) defer runtime.close() runtime.startWorkers(ctx) - runtime.startHTTP() + runtime.startHTTP(ctx) } -// StartAPI 只启动 Gin API 及其同步 service/dao 依赖,不启动后台 worker。 -// -// 说明: -// 1. 该模式仍是“带 service/dao 的 API 单体”,不是最终 API Gateway; -// 2. API 可以继续写入 outbox,但不负责消费 outbox,也不启动 memory worker; -// 3. worker 停止时,API 仍可提供同步接口,只是异步能力会延迟处理。 +// StartAPI 只启动 Gin API 和其同步依赖,不启动后台 worker。 +// 这仍是迁移期的单体 API 模式,不是终态的独立网关。 func StartAPI() { - ctx := context.Background() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() runtime := mustBuildRuntime(ctx) defer runtime.close() - runtime.startHTTP() + runtime.startHTTP(ctx) } // StartWorker 只启动后台异步能力,不注册 Gin 路由。 -// -// 运行内容: -// 1. outbox relay:扫描 pending 消息并投递 Kafka; -// 2. Kafka consumer:消费事件并分发到业务 handler; -// 3. memory worker:处理 memory_jobs 后台任务。 +// 当前包含 outbox relay / Kafka consumer / memory worker / 主动调度扫描 / 通知重试。 func StartWorker() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -305,7 +298,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { } handlers := buildAPIHandlers(userService, taskSv, taskClassService, courseService, scheduleService, agentService, memoryModule, activeScheduleDryRun, activeSchedulePreviewConfirm, activeScheduleTrigger, notificationChannelService) - return &appRuntime{ + runtime := &appRuntime{ db: db, redisClient: rdb, cacheRepo: cacheRepo, @@ -321,7 +314,13 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { notificationService: notificationService, limiter: limiter, handlers: handlers, - }, nil + } + if runtime.eventBus != nil { + if err := runtime.registerEventHandlers(); err != nil { + return nil, err + } + } + return runtime, nil } func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) { @@ -346,16 +345,24 @@ func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) { return ragRuntime, nil } -func buildEventBus(outboxRepo *outboxinfra.Repository) (*outboxinfra.EventBus, error) { - // outbox 通用事件总线接线: - // 1. API 模式只使用 Publish 写入 outbox,不启动后台循环; - // 2. worker/all 模式再显式注册 handler 并启动后台循环; +func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, error) { + // outbox 多 service 门面装配: + // 1. 按 service 维度创建独立 engine,topic / group 由 service 名称推导; + // 2. 对外仍然只暴露一个 Publish / Start / Close 门面; // 3. kafka.enabled=false 时返回 nil,业务按既有降级策略执行。 kafkaCfg := kafkabus.LoadConfig() - eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg) - if err != nil { - return nil, fmt.Errorf("failed to initialize outbox event bus: %w", err) + serviceBuses := make(map[string]eventsvc.OutboxBus, len(eventsvc.OutboxServiceNames())) + for _, serviceName := range eventsvc.OutboxServiceNames() { + bus, err := eventsvc.NewServiceOutboxBus(outboxRepo, kafkaCfg, serviceName) + if err != nil { + return nil, fmt.Errorf("failed to initialize outbox event bus for service %s: %w", serviceName, err) + } + if bus != nil { + serviceBuses[serviceName] = bus + } } + + eventBus := eventsvc.NewRoutedOutboxBus(serviceBuses) if eventBus == nil { log.Println("Outbox event bus is disabled") } @@ -855,9 +862,6 @@ func (r *appRuntime) startWorkers(ctx context.Context) { } if r.eventBus != nil { - if err := r.registerEventHandlers(); err != nil { - log.Fatalf("Failed to register outbox event handlers: %v", err) - } r.eventBus.Start(ctx) log.Println("Outbox event bus started") } else { @@ -878,24 +882,25 @@ func (r *appRuntime) startWorkers(ctx context.Context) { } func (r *appRuntime) registerEventHandlers() error { - // 调用目的:worker/all 启动时复用同一套核心事件注册顺序,避免未来新增入口后复制多份 handler 接线。 - if err := eventsvc.RegisterCoreOutboxHandlers(r.eventBus, r.outboxRepo, r.manager, r.agentRepo, r.cacheRepo, r.memoryModule); err != nil { + // 调用目的:在运行时启动前一次性完成“事件类型 -> 服务归属 -> handler”的显式接线,避免 API 模式发布事件时拿不到路由表。 + if err := eventsvc.RegisterAllOutboxHandlers( + r.eventBus, + r.outboxRepo, + r.manager, + r.agentRepo, + r.cacheRepo, + r.memoryModule, + r.activeTriggerWorkflow, + r.notificationService, + ); err != nil { return err } - if err := eventsvc.RegisterActiveScheduleTriggeredHandler(r.eventBus, r.outboxRepo, r.activeTriggerWorkflow); err != nil { - return fmt.Errorf("注册主动调度触发 handler 失败: %w", err) - } - // 调用目的:飞书通知事件消费与 notification retry loop 复用同一个服务实例, - // 保证后续接入真实 provider 时只需要替换启动期注入配置。 - if err := eventsvc.RegisterFeishuNotificationHandler(r.eventBus, r.outboxRepo, r.notificationService); err != nil { - return fmt.Errorf("注册飞书通知 handler 失败: %w", err) - } return nil } -func (r *appRuntime) startHTTP() { +func (r *appRuntime) startHTTP(ctx context.Context) { router := routers.RegisterRouters(r.handlers, r.cacheRepo, r.userRepo, r.limiter) - routers.StartEngine(router) + routers.StartEngine(ctx, router) } func (r *appRuntime) close() { diff --git a/backend/infra/kafka/config.go b/backend/infra/kafka/config.go index c62a6b3..cddf524 100644 --- a/backend/infra/kafka/config.go +++ b/backend/infra/kafka/config.go @@ -19,6 +19,8 @@ type Config struct { Brokers []string Topic string GroupID string + // ServiceName 表示当前进程所属的 outbox 服务;为空时保持单体全量模式。 + ServiceName string // RetryScanInterval/RetryBatchSize/MaxRetry 作用于 outbox 扫描与失败重试。 RetryScanInterval time.Duration RetryBatchSize int @@ -40,10 +42,14 @@ func LoadConfig() Config { Brokers: brokers, Topic: strings.TrimSpace(viper.GetString("kafka.topic")), GroupID: strings.TrimSpace(viper.GetString("kafka.groupID")), + ServiceName: strings.TrimSpace(viper.GetString("outbox.serviceName")), RetryScanInterval: viper.GetDuration("kafka.retryScanInterval"), RetryBatchSize: viper.GetInt("kafka.retryBatchSize"), MaxRetry: viper.GetInt("kafka.maxRetry"), } + if cfg.ServiceName == "" { + cfg.ServiceName = strings.TrimSpace(viper.GetString("kafka.serviceName")) + } if cfg.Topic == "" { cfg.Topic = DefaultTopic } diff --git a/backend/infra/kafka/consumer.go b/backend/infra/kafka/consumer.go index 4124e1c..1ddd851 100644 --- a/backend/infra/kafka/consumer.go +++ b/backend/infra/kafka/consumer.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "strings" segmentkafka "github.com/segmentio/kafka-go" ) @@ -13,7 +14,13 @@ type Consumer struct { func NewConsumer(cfg Config) (*Consumer, error) { if len(cfg.Brokers) == 0 { - return nil, errors.New("kafka brokers 未配置") + return nil, errors.New("kafka brokers not configured") + } + if strings.TrimSpace(cfg.Topic) == "" { + return nil, errors.New("kafka topic not configured") + } + if strings.TrimSpace(cfg.GroupID) == "" { + return nil, errors.New("kafka groupID not configured") } reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{ Brokers: cfg.Brokers, @@ -30,14 +37,14 @@ func NewConsumer(cfg Config) (*Consumer, error) { // Dequeue 从 Kafka 拉取一条消息(不自动提交 offset)。 func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) { if c == nil || c.reader == nil { - return segmentkafka.Message{}, errors.New("kafka consumer 未初始化") + return segmentkafka.Message{}, errors.New("kafka consumer not initialized") } return c.reader.FetchMessage(ctx) } func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error { if c == nil || c.reader == nil { - return errors.New("kafka consumer 未初始化") + return errors.New("kafka consumer not initialized") } return c.reader.CommitMessages(ctx, msg) } diff --git a/backend/infra/kafka/envelope.go b/backend/infra/kafka/envelope.go index 2b0148b..a00c47a 100644 --- a/backend/infra/kafka/envelope.go +++ b/backend/infra/kafka/envelope.go @@ -18,6 +18,8 @@ type Envelope struct { EventType string `json:"event_type"` // EventVersion 是事件版本号(默认 v1)。 EventVersion string `json:"event_version,omitempty"` + // ServiceName 是事件归属服务;空值通常表示旧兼容消息或全量模式。 + ServiceName string `json:"service_name,omitempty"` // AggregateID 是聚合主键(例如 conversation_id),用于追踪同一业务对象事件流。 AggregateID string `json:"aggregate_id,omitempty"` diff --git a/backend/infra/outbox/engine.go b/backend/infra/outbox/engine.go index 3099086..cf13c74 100644 --- a/backend/infra/outbox/engine.go +++ b/backend/infra/outbox/engine.go @@ -40,19 +40,19 @@ type PublishRequest struct { Payload any } -// Engine 是 Outbox + Kafka 通用异步引擎。 +// Engine 是单个服务的 Outbox + Kafka 异步引擎。 // // 职责边界: -// 1. 负责 outbox 扫描、kafka 投递、kafka 消费、状态机推进; +// 1. 负责一个服务目录下的 outbox 扫描、Kafka 投递、Kafka 消费、状态机推进; // 2. 负责 event_type -> handler 路由; -// 3. 不负责任何业务语义(业务由 handler 承担)。 +// 3. 不负责任何跨服务路由决策,跨服务分发由 EventBus 门面完成。 type Engine struct { repo *Repository producer *kafkabus.Producer consumer *kafkabus.Consumer brokers []string - topic string + route ServiceRoute maxRetry int scanEvery time.Duration scanBatch int @@ -61,11 +61,12 @@ type Engine struct { handlers map[string]MessageHandler } -// NewEngine 创建异步引擎。 +// NewEngine 创建单服务异步引擎。 // // 规则: // 1. kafka.enabled=false 时返回 nil,调用方可降级同步; -// 2. producer/consumer 任一步失败都会回收已创建资源。 +// 2. serviceName 非空时优先使用服务级默认目录,topic/group/table 不再沿用共享终态; +// 3. producer/consumer 任一步失败都会回收已创建资源。 func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { if !cfg.Enabled { return nil, nil @@ -74,6 +75,11 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { return nil, errors.New("outbox repository is nil") } + route := resolveEngineRoute(repo, cfg) + cfg.Topic = route.Topic + cfg.GroupID = route.GroupID + + serviceRepo := repo.WithRoute(route) producer, err := kafkabus.NewProducer(cfg) if err != nil { return nil, err @@ -85,11 +91,11 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { } return &Engine{ - repo: repo, + repo: serviceRepo, producer: producer, consumer: consumer, brokers: cfg.Brokers, - topic: cfg.Topic, + route: route, maxRetry: cfg.MaxRetry, scanEvery: cfg.RetryScanInterval, scanBatch: cfg.RetryBatchSize, @@ -118,7 +124,7 @@ func (e *Engine) RegisterEventHandler(eventType string, handler MessageHandler) e.handlersMu.Lock() defer e.handlersMu.Unlock() if _, exists := e.handlers[eventType]; exists { - log.Printf("outbox handler 覆盖注册: event_type=%s", eventType) + log.Printf("outbox handler 覆盖注册: service=%s event_type=%s", e.route.ServiceName, eventType) } e.handlers[eventType] = handler return nil @@ -137,11 +143,20 @@ func (e *Engine) Start(ctx context.Context) { return } - log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch) - if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil { + log.Printf( + "outbox engine starting: service=%s table=%s topic=%s group=%s brokers=%v retry_scan=%s batch=%d", + e.route.ServiceName, + e.route.TableName, + e.route.Topic, + e.route.GroupID, + e.brokers, + e.scanEvery, + e.scanBatch, + ) + if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.route.Topic, 30*time.Second); err != nil { log.Printf("Kafka topic not ready before consume loop start: %v", err) } else { - log.Printf("Kafka topic is ready: %s", e.topic) + log.Printf("Kafka topic is ready: %s", e.route.Topic) } e.StartDispatch(ctx) @@ -149,11 +164,6 @@ func (e *Engine) Start(ctx context.Context) { } // StartDispatch 单独启动 outbox -> Kafka 的投递循环。 -// -// 职责边界: -// 1. 只负责启动 dispatch 后台 goroutine,不负责启动 Kafka 消费; -// 2. 不重复执行 Start 中的 topic readiness 等待,避免改变原 Start(ctx) 的启动语义; -// 3. ctx 取消后由内部循环自行退出,调用方无需额外停止 goroutine。 func (e *Engine) StartDispatch(ctx context.Context) { if e == nil { return @@ -162,11 +172,6 @@ func (e *Engine) StartDispatch(ctx context.Context) { } // StartConsume 单独启动 Kafka -> handler 的消费循环。 -// -// 职责边界: -// 1. 只负责启动 consume 后台 goroutine,不负责扫描或投递 outbox; -// 2. 不注册业务 handler,handler 仍由 RegisterEventHandler 显式注入; -// 3. ctx 取消或 consumer 返回 context.Canceled 时,内部循环按既有逻辑退出。 func (e *Engine) StartConsume(ctx context.Context) { if e == nil { return @@ -202,7 +207,7 @@ func (e *Engine) Enqueue(ctx context.Context, eventType, messageKey string, payl // 步骤: // 1. 标准化 event_type/version/key; // 2. payload 序列化; -// 3. 写入 outbox(仅本地写库,不做 kafka 网络 IO)。 +// 3. 写入当前服务的 outbox 表,不再由调用方手传 topic。 func (e *Engine) Publish(ctx context.Context, req PublishRequest) error { if e == nil { return errors.New("outbox engine is nil") @@ -227,7 +232,7 @@ func (e *Engine) Publish(ctx context.Context, req PublishRequest) error { return err } - _, err = e.repo.CreateMessage(ctx, eventType, e.topic, messageKey, OutboxEventPayload{ + _, err = e.repo.CreateMessage(ctx, eventType, messageKey, OutboxEventPayload{ EventID: strings.TrimSpace(req.EventID), EventType: eventType, EventVersion: eventVersion, @@ -246,13 +251,13 @@ func (e *Engine) startDispatchLoop(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch) + pendingMessages, err := e.repo.ListDueMessages(ctx, e.route.ServiceName, e.scanBatch) if err != nil { log.Printf("扫描 outbox 失败: %v", err) continue } if len(pendingMessages) > 0 { - log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages)) + log.Printf("outbox due messages=%d, service=%s start dispatch", len(pendingMessages), e.route.ServiceName) } for _, msg := range pendingMessages { @@ -287,18 +292,23 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { if eventPayload.EventID == "" { eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10) } + serviceName := strings.TrimSpace(outboxMsg.ServiceName) + if serviceName == "" { + serviceName = e.route.ServiceName + } envelope := kafkabus.Envelope{ OutboxID: outboxMsg.ID, EventID: eventPayload.EventID, EventType: eventPayload.EventType, EventVersion: eventPayload.EventVersion, + ServiceName: serviceName, AggregateID: eventPayload.AggregateID, Payload: eventPayload.PayloadJSON, } raw, err := json.Marshal(envelope) if err != nil { - markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) + markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 封装失败: "+err.Error()) if markErr != nil { log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) } @@ -329,7 +339,7 @@ func (e *Engine) startConsumeLoop(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err) + log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.route.Topic, err) time.Sleep(300 * time.Millisecond) continue } @@ -344,11 +354,11 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er var envelope kafkabus.Envelope if err := json.Unmarshal(msg.Value, &envelope); err != nil { _ = e.consumer.Commit(ctx, msg) - return fmt.Errorf("解析 Kafka 包装失败: %w", err) + return fmt.Errorf("解析 Kafka 封装失败: %w", err) } if envelope.OutboxID <= 0 { _ = e.consumer.Commit(ctx, msg) - return errors.New("Kafka 包装缺少 outbox_id") + return errors.New("Kafka 封装缺少 outbox_id") } eventType := strings.TrimSpace(envelope.EventType) @@ -360,9 +370,36 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er return nil } + runtimeServiceName := strings.TrimSpace(e.route.ServiceName) + if runtimeServiceName != "" { + messageServiceName := strings.TrimSpace(envelope.ServiceName) + if messageServiceName == "" { + if resolvedServiceName, ok := ResolveEventService(eventType); ok { + messageServiceName = resolvedServiceName + } + } + if messageServiceName == "" || messageServiceName != runtimeServiceName { + log.Printf( + "跳过非本服务事件: runtime_service=%s message_service=%s event_type=%s outbox_id=%d", + runtimeServiceName, + messageServiceName, + eventType, + envelope.OutboxID, + ) + if err := e.consumer.Commit(ctx, msg); err != nil { + return err + } + return nil + } + } + handler, ok := e.getHandler(eventType) if !ok { - _ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType) + if runtimeServiceName == "" { + _ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType) + } else { + _ = e.repo.MarkDead(ctx, envelope.OutboxID, "本服务未注册 handler: "+eventType) + } if err := e.consumer.Commit(ctx, msg); err != nil { return err } @@ -381,3 +418,51 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er return e.consumer.Commit(ctx, msg) } + +func resolveEngineRoute(repo *Repository, cfg kafkabus.Config) ServiceRoute { + route := ServiceRoute{ + ServiceName: strings.TrimSpace(cfg.ServiceName), + Topic: strings.TrimSpace(cfg.Topic), + GroupID: strings.TrimSpace(cfg.GroupID), + } + if repo != nil { + repoRoute := normalizeServiceRoute(repo.route) + if route.ServiceName == "" { + route.ServiceName = repoRoute.ServiceName + } + if route.TableName == "" { + route.TableName = repoRoute.TableName + } + if route.Topic == "" { + route.Topic = repoRoute.Topic + } + if route.GroupID == "" { + route.GroupID = repoRoute.GroupID + } + } + + if route.ServiceName != "" { + defaultRoute := DefaultServiceRoute(route.ServiceName) + if route.TableName == "" { + route.TableName = defaultRoute.TableName + } + if route.Topic == "" { + route.Topic = defaultRoute.Topic + } + if route.GroupID == "" { + route.GroupID = defaultRoute.GroupID + } + return normalizeServiceRoute(route) + } + + if route.TableName == "" { + route.TableName = DefaultServiceRoute(ServiceNameAgent).TableName + } + if route.Topic == "" { + route.Topic = kafkabus.DefaultTopic + } + if route.GroupID == "" { + route.GroupID = kafkabus.DefaultGroup + } + return normalizeServiceRoute(route) +} diff --git a/backend/infra/outbox/event_bus.go b/backend/infra/outbox/event_bus.go index 423ff4d..5ea724a 100644 --- a/backend/infra/outbox/event_bus.go +++ b/backend/infra/outbox/event_bus.go @@ -3,111 +3,175 @@ package outbox import ( "context" "errors" + "fmt" + "strings" + "sync" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" ) // EventPublisher 是通用事件发布能力接口。 -// -// 职责边界: -// 1. 只暴露“发布事件”这一件事,隐藏底层 outbox/kafka 实现细节; -// 2. 业务层只依赖该接口,避免直接耦合具体引擎结构体; -// 3. 该接口不承诺“立即消费成功”,只承诺“事件已入队或返回错误”。 type EventPublisher interface { Publish(ctx context.Context, req PublishRequest) error } -// EventBus 是 outbox 异步总线的门面对象。 +// EventBus 是 outbox 多服务引擎的门面。 // -// 设计目的: -// 1. 对外提供“发布 + 注册处理器 + 启停”三类最小能力; -// 2. 对内复用 Engine,不重复实现状态机和调度逻辑; -// 3. 为后续引入更多事件类型提供统一扩展点。 +// 职责边界: +// 1. 对外只暴露“发布、注册 handler、启动、关闭”四类能力; +// 2. 内部按事件归属把调用路由到对应 service engine; +// 3. 不再把共享 topic 当主路径,服务级路由始终优先。 type EventBus struct { - engine *Engine + repo *Repository + cfg kafkabus.Config + + mu sync.RWMutex + engines map[string]*Engine } -// NewEventBus 创建通用事件总线。 +// NewEventBus 创建多服务事件门面。 // // 说明: -// 1. 当 kafka.enabled=false 时返回 nil,调用方可直接降级为同步模式; -// 2. 该方法只创建基础设施对象,不自动注册任何业务事件处理器; -// 3. 业务事件处理器注册应由上层在启动阶段显式完成,避免隐式副作用。 +// 1. kafka.enabled=false 时返回 nil,调用方可直接降级; +// 2. 实际 service engine 在需要时按服务目录懒加载; +// 3. 懒加载不会改变既有事件契约,只是把物理资源拆到各自服务。 func NewEventBus(repo *Repository, cfg kafkabus.Config) (*EventBus, error) { - engine, err := NewEngine(repo, cfg) + if !cfg.Enabled { + return nil, nil + } + if repo == nil { + return nil, errors.New("outbox repository is nil") + } + return &EventBus{ + repo: repo, + cfg: cfg, + engines: make(map[string]*Engine), + }, nil +} + +// RegisterEventHandler 注册事件处理器。 +func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error { + if b == nil { + return errors.New("event bus is not initialized") + } + + route, err := b.routeForEvent(eventType) + if err != nil { + return err + } + engine, err := b.ensureEngine(route) + if err != nil { + return err + } + return engine.RegisterEventHandler(eventType, handler) +} + +// Publish 把事件路由到对应服务的 outbox 表与 Kafka 资源。 +func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error { + if b == nil { + return errors.New("event bus is not initialized") + } + + route, err := b.routeForEvent(req.EventType) + if err != nil { + return err + } + engine, err := b.ensureEngine(route) + if err != nil { + return err + } + return engine.Publish(ctx, req) +} + +// Start 启动所有已创建的 service engine。 +func (b *EventBus) Start(ctx context.Context) { + if b == nil { + return + } + for _, engine := range b.snapshotEngines() { + go engine.Start(ctx) + } +} + +// StartDispatch 只启动所有已创建 engine 的 dispatch 循环。 +func (b *EventBus) StartDispatch(ctx context.Context) { + if b == nil { + return + } + for _, engine := range b.snapshotEngines() { + go engine.StartDispatch(ctx) + } +} + +// StartConsume 只启动所有已创建 engine 的消费循环。 +func (b *EventBus) StartConsume(ctx context.Context) { + if b == nil { + return + } + for _, engine := range b.snapshotEngines() { + go engine.StartConsume(ctx) + } +} + +// Close 关闭所有 service engine 的 Kafka 资源。 +func (b *EventBus) Close() { + if b == nil { + return + } + for _, engine := range b.snapshotEngines() { + engine.Close() + } +} + +func (b *EventBus) routeForEvent(eventType string) (ServiceRoute, error) { + route, ok := ResolveEventRoute(eventType) + if !ok { + return ServiceRoute{}, fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType)) + } + return route, nil +} + +func (b *EventBus) ensureEngine(route ServiceRoute) (*Engine, error) { + serviceName := route.ServiceName + if serviceName == "" { + return nil, errors.New("serviceName is empty") + } + + b.mu.RLock() + if engine, ok := b.engines[serviceName]; ok { + b.mu.RUnlock() + return engine, nil + } + b.mu.RUnlock() + + b.mu.Lock() + defer b.mu.Unlock() + if engine, ok := b.engines[serviceName]; ok { + return engine, nil + } + + cfg := b.cfg + cfg.ServiceName = serviceName + cfg.Topic = route.Topic + cfg.GroupID = route.GroupID + + engine, err := NewEngine(b.repo, cfg) if err != nil { return nil, err } if engine == nil { return nil, nil } - return &EventBus{engine: engine}, nil + b.engines[serviceName] = engine + return engine, nil } -// RegisterEventHandler 注册事件处理器。 -// -// 失败语义: -// 1. bus 未初始化时直接返回错误; -// 2. event_type 为空或 handler 为空时返回错误; -// 3. 重复注册时采用“后者覆盖前者”并打日志(由 Engine 负责)。 -func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error { - if b == nil || b.engine == nil { - return errors.New("event bus is not initialized") +func (b *EventBus) snapshotEngines() []*Engine { + b.mu.RLock() + defer b.mu.RUnlock() + engines := make([]*Engine, 0, len(b.engines)) + for _, engine := range b.engines { + engines = append(engines, engine) } - return b.engine.RegisterEventHandler(eventType, handler) -} - -// Publish 发布事件到 outbox 队列。 -// -// 关键语义: -// 1. 返回 nil 仅表示“已写入 outbox 成功”; -// 2. 真正 Kafka 投递与业务消费由后台异步循环完成; -// 3. 若返回 error,表示本次入队失败,调用方应按业务策略决定是否重试/降级。 -func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error { - if b == nil || b.engine == nil { - return errors.New("event bus is not initialized") - } - return b.engine.Publish(ctx, req) -} - -// Start 启动事件总线后台循环(dispatch + consume)。 -func (b *EventBus) Start(ctx context.Context) { - if b == nil || b.engine == nil { - return - } - b.engine.Start(ctx) -} - -// StartDispatch 单独启动事件总线的 outbox 投递循环。 -// -// 职责边界: -// 1. 只暴露 relay/dispatch 运行职责,便于独立进程只负责投递; -// 2. 不启动消费循环,避免与独立 consumer 进程争抢职责; -// 3. 不改变 Start(ctx) 的既有组合启动行为。 -func (b *EventBus) StartDispatch(ctx context.Context) { - if b == nil || b.engine == nil { - return - } - b.engine.StartDispatch(ctx) -} - -// StartConsume 单独启动事件总线的 Kafka 消费循环。 -// -// 职责边界: -// 1. 只暴露 consumer 运行职责,便于独立进程只负责消费; -// 2. 不扫描 outbox、不投递 Kafka,状态推进仍复用 Engine 既有逻辑; -// 3. handler 注册仍由调用方在启动前显式完成。 -func (b *EventBus) StartConsume(ctx context.Context) { - if b == nil || b.engine == nil { - return - } - b.engine.StartConsume(ctx) -} - -// Close 关闭事件总线资源(producer/consumer)。 -func (b *EventBus) Close() { - if b == nil || b.engine == nil { - return - } - b.engine.Close() + return engines } diff --git a/backend/infra/outbox/repository.go b/backend/infra/outbox/repository.go index 0ae922b..e9bb641 100644 --- a/backend/infra/outbox/repository.go +++ b/backend/infra/outbox/repository.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "errors" + "fmt" + "strings" "time" "github.com/LoveLosita/smartflow/backend/model" @@ -15,10 +17,11 @@ import ( // // 职责边界: // 1. 只负责 outbox 状态流转与通用事务编排; -// 2. 不负责任何业务语义(例如聊天/任务/标题等具体落库); -// 3. 消费成功时通过回调把业务动作注入同一事务,保证原子一致。 +// 2. 不负责聊天、任务、通知等具体业务语义; +// 3. 同一仓储实例只面向一个服务级 outbox 目录,避免把共享表当成终态。 type Repository struct { - db *gorm.DB + db *gorm.DB + route ServiceRoute } func NewRepository(db *gorm.DB) *Repository { @@ -27,20 +30,51 @@ func NewRepository(db *gorm.DB) *Repository { // WithTx 用外部事务句柄构造同事务仓储实例。 func (d *Repository) WithTx(tx *gorm.DB) *Repository { - return &Repository{db: tx} + if d == nil { + return &Repository{db: tx} + } + return &Repository{db: tx, route: d.route} } -// CreateMessage 把事件写入 outbox(入队)。 +// WithRoute 用指定服务目录构造服务级仓储。 // -// 步骤: -// 1. 序列化 payload; -// 2. 初始化 pending 状态; -// 3. 写入 outbox 并返回 outbox_id。 -func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messageKey string, payload any, maxRetry int) (int64, error) { +// 职责边界: +// 1. 只切换 outbox 物理目录,不改变事务句柄; +// 2. 适合多个 service engine 共享同一 DB 连接; +// 3. 保留 route 的 table/topic/group,避免回落到共享 topic。 +func (d *Repository) WithRoute(route ServiceRoute) *Repository { + route = normalizeServiceRoute(route) + if d == nil { + return &Repository{route: route} + } + return &Repository{db: d.db, route: route} +} + +// CreateMessage 把事件写入 outbox。 +// +// 职责边界: +// 1. 只接受 eventType、messageKey、payload 和 maxRetry,不再允许业务侧显式传 topic; +// 2. table/topic/group 统一由 eventType -> service -> route 解析,确保服务级路由是唯一入口; +// 3. eventType 未注册时直接返回 error,避免消息静默落到默认表或默认 topic。 +func (d *Repository) CreateMessage(ctx context.Context, eventType string, messageKey string, payload any, maxRetry int) (int64, error) { + if d == nil || d.db == nil { + return 0, errors.New("outbox repository is nil") + } + + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return 0, errors.New("eventType is empty") + } + messageKey = strings.TrimSpace(messageKey) if maxRetry <= 0 { maxRetry = 20 } + route, err := d.resolvePublishRoute(eventType) + if err != nil { + return 0, err + } + raw, err := json.Marshal(payload) if err != nil { return 0, err @@ -49,7 +83,8 @@ func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messag now := time.Now() msg := model.AgentOutboxMessage{ EventType: eventType, - Topic: topic, + ServiceName: route.ServiceName, + Topic: route.Topic, MessageKey: messageKey, Payload: string(raw), Status: model.OutboxStatusPending, @@ -58,39 +93,48 @@ func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messag NextRetryAt: &now, } - if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil { + if err = d.db.WithContext(ctx).Table(route.TableName).Create(&msg).Error; err != nil { return 0, err } return msg.ID, nil } +// GetByID 从当前仓储绑定的 outbox 表读取指定消息。 func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) { var msg model.AgentOutboxMessage - if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil { + if err := d.scopedDB(ctx).Where("id = ?", id).First(&msg).Error; err != nil { return nil, err } return &msg, nil } // ListDueMessages 拉取到期可投递消息。 -func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { +// +// 说明: +// 1. serviceName 为空时保持当前仓储目录内的扫描语义; +// 2. serviceName 非空时只扫描对应服务的消息; +// 3. 这样既能支持单服务 relay,也能支持后续多服务 relay。 +func (d *Repository) ListDueMessages(ctx context.Context, serviceName string, limit int) ([]model.AgentOutboxMessage, error) { if limit <= 0 { limit = 100 } now := time.Now() var messages []model.AgentOutboxMessage - err := d.db.WithContext(ctx). + query := d.scopedDB(ctx). Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now). Order("next_retry_at ASC, id ASC"). - Limit(limit). - Find(&messages).Error - if err != nil { + Limit(limit) + serviceName = strings.TrimSpace(serviceName) + if serviceName != "" { + query = query.Where("service_name = ?", serviceName) + } + if err := query.Find(&messages).Error; err != nil { return nil, err } return messages, nil } -// MarkPublished 标记为已投递 Kafka。 +// MarkPublished 标记消息已成功投递到 Kafka。 func (d *Repository) MarkPublished(ctx context.Context, id int64) error { now := time.Now() updates := map[string]interface{}{ @@ -99,14 +143,14 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error { "last_error": nil, "next_retry_at": nil, } - result := d.db.WithContext(ctx). + result := d.scopedDB(ctx). Model(&model.AgentOutboxMessage{}). Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead). Updates(updates) return result.Error } -// MarkDead 标记为死信。 +// MarkDead 把消息标记为死信。 func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) error { now := time.Now() lastErr := truncateError(reason) @@ -116,21 +160,20 @@ func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) erro "next_retry_at": nil, "updated_at": now, } - return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error + return d.scopedDB(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error } // MarkFailedForRetry 记录一次可重试失败并推进重试窗口。 // // 步骤: -// 1. 行级锁读取当前状态; -// 2. 最终态幂等短路; -// 3. retry_count+1; -// 4. 计算 next_retry_at 或 dead; -// 5. 写回状态快照。 +// 1. 行级锁读取当前消息状态; +// 2. 已进入 consumed/dead 时幂等短路; +// 3. retry_count+1,并根据最大次数决定继续 pending 还是转 dead; +// 4. 写回 last_error 和 next_retry_at,交给下一轮扫描继续投递。 func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var msg model.AgentOutboxMessage - err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error + err := tx.Table(d.tableName()).Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error if err != nil { return err } @@ -159,7 +202,7 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st "next_retry_at": nextRetryAt, "updated_at": now, } - return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error + return tx.Table(d.tableName()).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error }) } @@ -167,13 +210,13 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st // // 步骤: // 1. 事务内锁定 outbox 记录; -// 2. 已 consumed/dead 时幂等返回; -// 3. 执行业务回调 fn(tx); +// 2. consumed/dead 状态幂等返回; +// 3. 执行业务回调 fn(tx),让业务落库和 outbox 状态共用同一事务; // 4. 业务成功后统一标记 consumed。 func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64, fn func(tx *gorm.DB) error) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var outboxMsg model.AgentOutboxMessage - err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error + err := tx.Table(d.tableName()).Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil @@ -198,10 +241,46 @@ func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64, "next_retry_at": nil, "updated_at": now, } - return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error + return tx.Table(d.tableName()).Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error }) } +func (d *Repository) scopedDB(ctx context.Context) *gorm.DB { + return d.db.WithContext(ctx).Table(d.tableName()) +} + +func (d *Repository) tableName() string { + if d == nil { + return DefaultServiceRoute(ServiceNameAgent).TableName + } + + route := normalizeServiceRoute(d.route) + if route.TableName != "" { + return route.TableName + } + return DefaultServiceRoute(ServiceNameAgent).TableName +} + +func (d *Repository) resolvePublishRoute(eventType string) (ServiceRoute, error) { + if d == nil { + return ServiceRoute{}, errors.New("outbox repository is nil") + } + + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return ServiceRoute{}, errors.New("eventType is empty") + } + + route, ok := ResolveEventRoute(eventType) + if !ok { + return ServiceRoute{}, fmt.Errorf("outbox route not registered: eventType=%s", eventType) + } + if d.route.ServiceName != "" && route.ServiceName != d.route.ServiceName { + return ServiceRoute{}, fmt.Errorf("eventType %s belongs to service %s, current repo service %s", eventType, route.ServiceName, d.route.ServiceName) + } + return normalizeServiceRoute(route), nil +} + func calcRetryBackoff(retryCount int) time.Duration { if retryCount <= 0 { return time.Second diff --git a/backend/infra/outbox/route_registry.go b/backend/infra/outbox/route_registry.go new file mode 100644 index 0000000..1fedf59 --- /dev/null +++ b/backend/infra/outbox/route_registry.go @@ -0,0 +1,136 @@ +package outbox + +import ( + "errors" + "fmt" + "strings" + "sync" +) + +var outboxRouteRegistry = struct { + sync.RWMutex + eventToService map[string]string + serviceRoutes map[string]ServiceRoute +}{ + eventToService: make(map[string]string), + serviceRoutes: make(map[string]ServiceRoute), +} + +// RegisterServiceRoute 注册或覆盖某个服务的物理 outbox 路由。 +// +// 职责边界: +// 1. 只登记“服务 -> table/topic/group”目录,不登记事件归属; +// 2. 同服务重复注册时以后者覆盖前者,方便显式配置覆盖默认目录; +// 3. 空服务名直接报错,避免把共享 topic 误当成新终态。 +func RegisterServiceRoute(route ServiceRoute) error { + route = normalizeServiceRoute(route) + if route.ServiceName == "" { + return errors.New("serviceName is empty") + } + + outboxRouteRegistry.Lock() + defer outboxRouteRegistry.Unlock() + + outboxRouteRegistry.serviceRoutes[route.ServiceName] = route + return nil +} + +// RegisterEventService 记录“事件类型 -> 服务归属”的全局路由。 +// +// 职责边界: +// 1. 只登记跨进程都要识别的事件归属,不承载 handler 逻辑; +// 2. 同一 event_type 只能归属一个服务,重复登记同值视为幂等; +// 3. 若该服务还没有显式路由,则先写入默认服务目录,保证后续能查到 table/topic/group。 +func RegisterEventService(eventType, serviceName string) error { + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return errors.New("eventType is empty") + } + serviceName = normalizeServiceName(serviceName) + if serviceName == "" { + return errors.New("serviceName is empty") + } + + outboxRouteRegistry.Lock() + defer outboxRouteRegistry.Unlock() + + if existing, ok := outboxRouteRegistry.eventToService[eventType]; ok { + if existing != serviceName { + return fmt.Errorf("eventType %s already registered to service %s", eventType, existing) + } + return nil + } + + outboxRouteRegistry.eventToService[eventType] = serviceName + return nil +} + +// ResolveEventService 查询某个事件类型的归属服务。 +// +// 返回值说明: +// 1. serviceName 为登记结果; +// 2. ok=false 表示当前路由表里还没有这个事件类型的归属信息。 +func ResolveEventService(eventType string) (serviceName string, ok bool) { + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return "", false + } + + outboxRouteRegistry.RLock() + defer outboxRouteRegistry.RUnlock() + + serviceName, ok = outboxRouteRegistry.eventToService[eventType] + return serviceName, ok +} + +// ResolveServiceRoute 查询某个服务的物理 outbox 配置。 +// +// 返回值说明: +// 1. route 始终返回一个可执行的目录结果,未显式注册时回退默认目录; +// 2. ok=true 表示命中显式注册目录,ok=false 表示走默认目录; +// 3. 这样既能支持显式配置覆盖,也能让基础设施在启动初期就有稳定默认值。 +func ResolveServiceRoute(serviceName string) (route ServiceRoute, ok bool) { + serviceName = normalizeServiceName(serviceName) + if serviceName == "" { + return DefaultServiceRoute(""), false + } + + outboxRouteRegistry.RLock() + route, ok = outboxRouteRegistry.serviceRoutes[serviceName] + outboxRouteRegistry.RUnlock() + if ok { + return normalizeServiceRoute(route), true + } + if route, ok = configuredServiceRoute(serviceName); ok { + return route, true + } + return DefaultServiceRoute(serviceName), false +} + +// ResolveEventRoute 先按事件查服务,再按服务查物理目录。 +// +// 返回值说明: +// 1. route 包含事件所在服务的 table/topic/group; +// 2. ok=true 只表示“事件 -> 服务归属”已登记; +// 3. 服务目录若未显式注册,会自动回退到默认目录。 +func ResolveEventRoute(eventType string) (route ServiceRoute, ok bool) { + serviceName, ok := ResolveEventService(eventType) + if !ok { + return ServiceRoute{}, false + } + route, _ = ResolveServiceRoute(serviceName) + return route, true +} + +func configuredServiceRoute(serviceName string) (ServiceRoute, bool) { + cfg, ok := ResolveServiceConfig(serviceName) + if !ok { + return ServiceRoute{}, false + } + return normalizeServiceRoute(ServiceRoute{ + ServiceName: cfg.Name, + TableName: cfg.TableName, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + }), true +} diff --git a/backend/infra/outbox/service_catalog.go b/backend/infra/outbox/service_catalog.go new file mode 100644 index 0000000..9fc487a --- /dev/null +++ b/backend/infra/outbox/service_catalog.go @@ -0,0 +1,168 @@ +package outbox + +import ( + "fmt" + "sort" + "strings" + "sync" + + "github.com/spf13/viper" +) + +const ( + ServiceAgent = "agent" + ServiceTask = "task" + ServiceMemory = "memory" + ServiceActiveScheduler = "active-scheduler" + ServiceNotification = "notification" +) + +// ServiceConfig 描述一个服务级 outbox 的固定归属。 +// +// 职责边界: +// 1. 只描述“事件属于哪个服务、写哪张表、发哪个 topic、用哪个 group”。 +// 2. 不承载具体业务 handler,也不承载 Kafka 消息体格式。 +// 3. 服务级写入、扫描和消费都应从这里读取同一份映射,避免配置漂移。 +type ServiceConfig struct { + Name string + Topic string + GroupID string + TableName string +} + +var serviceCatalogCache = struct { + sync.RWMutex + loaded bool + entries map[string]ServiceConfig +}{ + entries: make(map[string]ServiceConfig), +} + +// LoadServiceConfigs 读取服务级 outbox 目录。 +// +// 说明: +// 1. 先给出默认终态映射,再允许通过配置中心覆盖 topic/groupID/table; +// 2. 该目录只负责服务级 outbox 基础设施,不混入业务逻辑; +// 3. 若某个服务配置缺失,直接使用默认值,避免启动期因为非关键配置崩掉。 +func LoadServiceConfigs() map[string]ServiceConfig { + serviceCatalogCache.Lock() + defer serviceCatalogCache.Unlock() + + if serviceCatalogCache.loaded { + return cloneServiceConfigs(serviceCatalogCache.entries) + } + + entries := map[string]ServiceConfig{ + ServiceAgent: { + Name: ServiceAgent, + Topic: "smartflow.agent.outbox", + GroupID: "smartflow-agent-outbox-consumer", + TableName: "agent_outbox_messages", + }, + ServiceTask: { + Name: ServiceTask, + Topic: "smartflow.task.outbox", + GroupID: "smartflow-task-outbox-consumer", + TableName: "task_outbox_messages", + }, + ServiceMemory: { + Name: ServiceMemory, + Topic: "smartflow.memory.outbox", + GroupID: "smartflow-memory-outbox-consumer", + TableName: "memory_outbox_messages", + }, + ServiceActiveScheduler: { + Name: ServiceActiveScheduler, + Topic: "smartflow.active-scheduler.outbox", + GroupID: "smartflow-active-scheduler-outbox-consumer", + TableName: "active_scheduler_outbox_messages", + }, + ServiceNotification: { + Name: ServiceNotification, + Topic: "smartflow.notification.outbox", + GroupID: "smartflow-notification-outbox-consumer", + TableName: "notification_outbox_messages", + }, + } + + for name, entry := range entries { + entries[name] = overrideServiceConfig(entry) + } + + serviceCatalogCache.entries = entries + serviceCatalogCache.loaded = true + return cloneServiceConfigs(entries) +} + +// ResolveServiceConfig 查询某个服务的 outbox 目录。 +func ResolveServiceConfig(serviceName string) (ServiceConfig, bool) { + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + return ServiceConfig{}, false + } + + entries := LoadServiceConfigs() + cfg, ok := entries[serviceName] + return cfg, ok +} + +// ResolveEventServiceConfig 先解析事件归属服务,再返回该服务的 outbox 目录。 +func ResolveEventServiceConfig(eventType string) (ServiceConfig, bool) { + serviceName, ok := ResolveEventService(eventType) + if !ok { + return ServiceConfig{}, false + } + return ResolveServiceConfig(serviceName) +} + +// ServiceTables 返回当前目录中的所有 outbox 表名。 +func ServiceTables() []string { + entries := LoadServiceConfigs() + tables := make([]string, 0, len(entries)) + for _, entry := range entries { + tables = append(tables, entry.TableName) + } + sort.Strings(tables) + return tables +} + +// ServiceNames 返回当前目录中的所有服务名。 +func ServiceNames() []string { + entries := LoadServiceConfigs() + names := make([]string, 0, len(entries)) + for name := range entries { + names = append(names, name) + } + sort.Strings(names) + return names +} + +func overrideServiceConfig(entry ServiceConfig) ServiceConfig { + upperName := strings.TrimSpace(entry.Name) + if upperName == "" { + return entry + } + + topicKey := fmt.Sprintf("outbox.services.%s.topic", upperName) + groupKey := fmt.Sprintf("outbox.services.%s.groupID", upperName) + tableKey := fmt.Sprintf("outbox.services.%s.table", upperName) + + if topic := strings.TrimSpace(viper.GetString(topicKey)); topic != "" { + entry.Topic = topic + } + if groupID := strings.TrimSpace(viper.GetString(groupKey)); groupID != "" { + entry.GroupID = groupID + } + if tableName := strings.TrimSpace(viper.GetString(tableKey)); tableName != "" { + entry.TableName = tableName + } + return entry +} + +func cloneServiceConfigs(entries map[string]ServiceConfig) map[string]ServiceConfig { + cloned := make(map[string]ServiceConfig, len(entries)) + for name, entry := range entries { + cloned[name] = entry + } + return cloned +} diff --git a/backend/infra/outbox/service_route.go b/backend/infra/outbox/service_route.go new file mode 100644 index 0000000..88e7ef7 --- /dev/null +++ b/backend/infra/outbox/service_route.go @@ -0,0 +1,145 @@ +package outbox + +import ( + "strings" +) + +const ( + ServiceNameAgent = "agent" + ServiceNameTask = "task" + ServiceNameMemory = "memory" + ServiceNameActiveScheduler = "active-scheduler" + ServiceNameNotification = "notification" +) + +// ServiceRoute 描述一个 outbox 服务的终态路由信息。 +// +// 职责边界: +// 1. 只承载服务级 outbox 的 table/topic/group 目录信息; +// 2. 不承载 handler、事务或 Kafka 连接对象; +// 3. 允许上层按事件类型先查服务,再由服务查到自己的物理资源。 +type ServiceRoute struct { + ServiceName string + TableName string + Topic string + GroupID string +} + +var builtinServiceRoutes = map[string]ServiceRoute{ + ServiceNameAgent: { + ServiceName: ServiceNameAgent, + TableName: "agent_outbox_messages", + Topic: "smartflow.agent.outbox", + GroupID: "smartflow-agent-outbox-consumer", + }, + ServiceNameTask: { + ServiceName: ServiceNameTask, + TableName: "task_outbox_messages", + Topic: "smartflow.task.outbox", + GroupID: "smartflow-task-outbox-consumer", + }, + ServiceNameMemory: { + ServiceName: ServiceNameMemory, + TableName: "memory_outbox_messages", + Topic: "smartflow.memory.outbox", + GroupID: "smartflow-memory-outbox-consumer", + }, + ServiceNameActiveScheduler: { + ServiceName: ServiceNameActiveScheduler, + TableName: "active_scheduler_outbox_messages", + Topic: "smartflow.active-scheduler.outbox", + GroupID: "smartflow-active-scheduler-outbox-consumer", + }, + ServiceNameNotification: { + ServiceName: ServiceNameNotification, + TableName: "notification_outbox_messages", + Topic: "smartflow.notification.outbox", + GroupID: "smartflow-notification-outbox-consumer", + }, +} + +// DefaultServiceRoutes 返回当前已知服务的默认路由清单。 +// +// 说明: +// 1. 这里是“目录初始值”,用于自动建表和首次注册时兜底; +// 2. 运行时若显式注册了服务路由,会以显式注册结果为准; +// 3. 返回值是拷贝,调用方可安全遍历,不会污染全局目录。 +func DefaultServiceRoutes() []ServiceRoute { + return []ServiceRoute{ + builtinServiceRoutes[ServiceNameAgent], + builtinServiceRoutes[ServiceNameTask], + builtinServiceRoutes[ServiceNameMemory], + builtinServiceRoutes[ServiceNameActiveScheduler], + builtinServiceRoutes[ServiceNameNotification], + } +} + +// DefaultServiceRoute 根据服务名生成终态路由。 +// +// 规则: +// 1. 已知服务直接返回约定映射; +// 2. 未知服务按命名约定生成 table/topic/group,避免继续落回共享 topic; +// 3. 空服务名回退到 agent 兼容路径,保住历史单体模式。 +func DefaultServiceRoute(serviceName string) ServiceRoute { + serviceName = normalizeServiceName(serviceName) + if serviceName == "" { + serviceName = ServiceNameAgent + } + if route, ok := builtinServiceRoutes[serviceName]; ok { + return route + } + + tablePrefix := strings.NewReplacer("-", "_").Replace(serviceName) + if tablePrefix == "" { + tablePrefix = ServiceNameAgent + } + + return ServiceRoute{ + ServiceName: serviceName, + TableName: tablePrefix + "_outbox_messages", + Topic: "smartflow." + serviceName + ".outbox", + GroupID: "smartflow-" + serviceName + "-outbox-consumer", + } +} + +func normalizeServiceName(serviceName string) string { + return strings.TrimSpace(serviceName) +} + +// normalizeServiceRoute 把空字段补成可执行的默认值。 +// +// 说明: +// 1. 只做字符串裁剪和缺省补齐,不做注册副作用; +// 2. 服务名为空时只保留历史兼容路径,不强行把它当成新服务; +// 3. 这一步是 route 目录的最后一道兜底,避免上层拿到半成品路由。 +func normalizeServiceRoute(route ServiceRoute) ServiceRoute { + route.ServiceName = normalizeServiceName(route.ServiceName) + route.TableName = strings.TrimSpace(route.TableName) + route.Topic = strings.TrimSpace(route.Topic) + route.GroupID = strings.TrimSpace(route.GroupID) + + if route.ServiceName == "" { + if route.TableName == "" { + route.TableName = builtinServiceRoutes[ServiceNameAgent].TableName + } + if route.Topic == "" { + route.Topic = builtinServiceRoutes[ServiceNameAgent].Topic + } + if route.GroupID == "" { + route.GroupID = builtinServiceRoutes[ServiceNameAgent].GroupID + } + return route + } + + defaultRoute := DefaultServiceRoute(route.ServiceName) + if route.TableName == "" { + route.TableName = defaultRoute.TableName + } + if route.Topic == "" { + route.Topic = defaultRoute.Topic + } + if route.GroupID == "" { + route.GroupID = defaultRoute.GroupID + } + return route +} diff --git a/backend/inits/mysql.go b/backend/inits/mysql.go index 6b13a5f..fa2cb1d 100644 --- a/backend/inits/mysql.go +++ b/backend/inits/mysql.go @@ -4,6 +4,7 @@ import ( "fmt" "log" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/model" "github.com/spf13/viper" "gorm.io/driver/mysql" @@ -26,7 +27,6 @@ func autoMigrateModels(db *gorm.DB) error { &model.ActiveSchedulePreview{}, &model.NotificationRecord{}, &model.UserNotificationChannel{}, - &model.AgentOutboxMessage{}, &model.AgentScheduleState{}, &model.ActiveScheduleSession{}, &model.AgentStateSnapshotRecord{}, @@ -41,12 +41,36 @@ func autoMigrateModels(db *gorm.DB) error { return fmt.Errorf("auto migrate failed for %T: %w", m, err) } } + if err := autoMigrateOutboxTables(db); err != nil { + return err + } if err := backfillAutoMigrateData(db); err != nil { return err } return nil } +// autoMigrateOutboxTables 按服务目录一次性创建各服务的 outbox 物理表。 +// +// 职责边界: +// 1. 只创建 outbox 目录,不改写业务表; +// 2. 每张表都复用同一套模型结构,保证字段和索引一致; +// 3. 这里显式列出服务目录,避免把共享单表误当成终态。 +func autoMigrateOutboxTables(db *gorm.DB) error { + // 1. 这里必须按服务目录读取最终生效的 table 名,而不能只看默认内置映射。 + // 2. 这样即使后续通过配置覆盖 outbox.services.*.table,启动建表也会和运行时写入保持一致。 + for _, serviceName := range outboxinfra.ServiceNames() { + cfg, ok := outboxinfra.ResolveServiceConfig(serviceName) + if !ok { + return fmt.Errorf("resolve outbox config failed for service %s", serviceName) + } + if err := db.Table(cfg.TableName).AutoMigrate(&model.AgentOutboxMessage{}); err != nil { + return fmt.Errorf("auto migrate outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + } + } + return nil +} + // backfillAutoMigrateData 补齐 AutoMigrate 无法表达的条件回填。 // // 职责边界: diff --git a/backend/main.go b/backend/main.go index 3634462..bdf7979 100644 --- a/backend/main.go +++ b/backend/main.go @@ -4,6 +4,8 @@ import ( "github.com/LoveLosita/smartflow/backend/cmd" ) +// main 保留仓库根入口的兼容壳,阶段 0 期间仍转发到 cmd.Start()。 +// 终态会逐步迁移为各服务各自的独立 main.go。 func main() { cmd.Start() } diff --git a/backend/model/outbox.go b/backend/model/outbox.go index 477311d..ddb10a1 100644 --- a/backend/model/outbox.go +++ b/backend/model/outbox.go @@ -22,10 +22,11 @@ const ( type AgentOutboxMessage struct { ID int64 `gorm:"column:id;primaryKey;autoIncrement"` - EventType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:事件类型"` - Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` - MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` - Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` + EventType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:事件类型"` + ServiceName string `gorm:"column:service_name;type:varchar(64);not null;default:'';index:idx_outbox_service_name,priority:1;comment:所属服务"` + Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` + MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` + Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"` RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"` @@ -40,5 +41,8 @@ type AgentOutboxMessage struct { } func (AgentOutboxMessage) TableName() string { + // 1. 这里保留历史兼容默认表名,避免非 outbox 基础设施调用直接失效。 + // 2. 服务级多表路由由 backend/infra/outbox 显式通过 db.Table(...) 控制。 + // 3. 这样既能兼容旧代码,也不会把共享单表当成终态。 return "agent_outbox_messages" } diff --git a/backend/routers/routers.go b/backend/routers/routers.go index 81e5b70..43b71db 100644 --- a/backend/routers/routers.go +++ b/backend/routers/routers.go @@ -1,9 +1,13 @@ // Package routers 路由配置 -// 定义所有HTTP路由和路由组 +// 定义所有 HTTP 路由和路由组 package routers import ( + "context" + "errors" "log" + "net/http" + "time" "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/dao" @@ -13,23 +17,45 @@ import ( "github.com/spf13/viper" ) -// StartEngine 注册路由 -func StartEngine(r *gin.Engine) { - // 从配置中获取端口 +// StartEngine 启动 HTTP 服务,并在上下文取消时尽量优雅退出。 +func StartEngine(ctx context.Context, r *gin.Engine) { + // 1. 先解析端口,保持和历史行为一致。 + // 2. 再用 http.Server 托管 gin engine,方便在取消信号到来时执行 Shutdown。 port := viper.GetString("server.port") if port == "" { - port = "8080" // 默认端口 + port = "8080" } - // 启动服务器 - log.Printf("Server starting on port %s...", port) - if err := r.Run(":" + port); err != nil { - log.Fatalf("Failed to start server: %v", err) + srv := &http.Server{ + Addr: ":" + port, + Handler: r, + } + + errCh := make(chan error, 1) + go func() { + log.Printf("Server starting on port %s...", port) + errCh <- srv.ListenAndServe() + }() + + select { + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) { + log.Printf("Failed to shutdown server gracefully: %v", err) + } + if err := <-errCh; err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("Failed to start server: %v", err) + } + case err := <-errCh: + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("Failed to start server: %v", err) + } } } func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *dao.UserDAO, limiter *pkg.RateLimiter) *gin.Engine { - // 初始化Gin引擎 + // 初始化 Gin 引擎 r := gin.Default() // 在这里注册所有的路由和路由组 apiGroup := r.Group("/api/v1") @@ -130,7 +156,7 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d notificationGroup.POST("/channels/feishu/test", handlers.Notification.TestFeishuWebhook) } } - // 初始化Gin引擎 + // 初始化 Gin 引擎 log.Println("Routes setup completed") return r } diff --git a/backend/service/events/active_schedule_triggered.go b/backend/service/events/active_schedule_triggered.go index defea80..a469a52 100644 --- a/backend/service/events/active_schedule_triggered.go +++ b/backend/service/events/active_schedule_triggered.go @@ -32,7 +32,7 @@ type ActiveScheduleTriggeredProcessor interface { // 3. 若事务返回 error,则 best-effort 回写 trigger failed,并把错误交给 outbox 做 retry; // 4. 这里不直接 import active_scheduler 的具体实现,避免 service/events 和业务编排层互相反向耦合。 func RegisterActiveScheduleTriggeredHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, processor ActiveScheduleTriggeredProcessor, ) error { @@ -45,24 +45,28 @@ func RegisterActiveScheduleTriggeredHandler( if processor == nil { return errors.New("active schedule triggered processor is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, sharedevents.ActiveScheduleTriggeredEventType) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { if !isAllowedTriggeredEventVersion(envelope.EventVersion) { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("active_schedule.triggered 版本不受支持: %s", envelope.EventVersion)) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("active_schedule.triggered 版本不受支持: %s", envelope.EventVersion)) return nil } var payload sharedevents.ActiveScheduleTriggeredPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 active_schedule.triggered 载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 active_schedule.triggered 载荷失败: "+unmarshalErr.Error()) return nil } if validateErr := payload.Validate(); validateErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "active_schedule.triggered 载荷非法: "+validateErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "active_schedule.triggered 载荷非法: "+validateErr.Error()) return nil } - err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { return processor.ProcessTriggeredInTx(ctx, tx, payload) }) if err != nil { diff --git a/backend/service/events/agent_state_persist.go b/backend/service/events/agent_state_persist.go index 9d397a1..9265f41 100644 --- a/backend/service/events/agent_state_persist.go +++ b/backend/service/events/agent_state_persist.go @@ -35,7 +35,7 @@ type AgentStateSnapshotPayload struct { // 2. 使用 upsert 语义,同一 conversation_id 只保留最新快照; // 3. 通过 outbox 通用消费事务保证"业务写入 + consumed 推进"原子一致。 func RegisterAgentStateSnapshotHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, ) error { @@ -48,15 +48,19 @@ func RegisterAgentStateSnapshotHandler( if repoManager == nil { return errors.New("repo manager is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentStateSnapshotPersist) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { var payload AgentStateSnapshotPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析快照载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析快照载荷失败: "+unmarshalErr.Error()) return nil } - return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { record := model.AgentStateSnapshotRecord{ ConversationID: payload.ConversationID, UserID: payload.UserID, diff --git a/backend/service/events/agent_timeline_persist.go b/backend/service/events/agent_timeline_persist.go index c20bb4c..ee72410 100644 --- a/backend/service/events/agent_timeline_persist.go +++ b/backend/service/events/agent_timeline_persist.go @@ -25,7 +25,7 @@ const EventTypeAgentTimelinePersistRequested = "agent.timeline.persist.requested // 3. 通过 outbox 通用消费事务,把“时间线写库 + consumed 推进”放进同一事务; // 4. 若遇到 seq 唯一键冲突,会先判定是否属于重放幂等,再决定是否补新 seq 并回填 Redis。 func RegisterAgentTimelinePersistHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, agentRepo *dao.AgentDAO, cacheDAO *dao.CacheDAO, @@ -40,12 +40,16 @@ func RegisterAgentTimelinePersistHandler( if agentRepo == nil { return errors.New("agent repo is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentTimelinePersistRequested) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { var payload model.ChatTimelinePersistPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { // 1. payload 无法反序列化属于不可恢复错误,直接标 dead,避免无意义重试。 - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error()) return nil } @@ -53,7 +57,7 @@ func RegisterAgentTimelinePersistHandler( if !payload.HasValidIdentity() { // 2. 这里只校验“能否唯一定位一条 timeline 记录”的最小字段集合。 // 3. content / payload_json 是否为空由事件类型自行决定,不在这里一刀切限制。 - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法") + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法") return nil } @@ -61,7 +65,7 @@ func RegisterAgentTimelinePersistHandler( finalSeq := payload.Seq // 4. 统一走 outbox 消费事务入口,保证“业务写入成功 -> consumed”原子一致。 - err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { finalPayload, repaired, persistErr := persistConversationTimelineEventInTx(ctx, tx, agentRepo.WithTx(tx), payload) if persistErr != nil { return persistErr diff --git a/backend/service/events/chat_history_persist.go b/backend/service/events/chat_history_persist.go index 5f2ab5c..5d0300f 100644 --- a/backend/service/events/chat_history_persist.go +++ b/backend/service/events/chat_history_persist.go @@ -30,7 +30,7 @@ const ( // 3. 通过 outbox 通用事务入口把"业务写入 + consumed 推进"合并为一个事务; // 4. 当前版本仅注册新路由键(chat.history.persist.requested),不再注册旧兼容键。 func RegisterChatHistoryPersistHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, ) error { @@ -44,6 +44,10 @@ func RegisterChatHistoryPersistHandler( if repoManager == nil { return errors.New("repo manager is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatHistoryPersistRequested) + if err != nil { + return err + } // 2. 定义统一处理器: // 2.1 解析 payload; @@ -53,12 +57,12 @@ func RegisterChatHistoryPersistHandler( var payload model.ChatHistoryPersistPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { // 2.1 payload 非法属于不可恢复错误,直接标 dead,避免无意义重试。 - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error()) return nil } // 2.2 使用 outbox 通用消费事务,保证"业务写入 + consumed 状态推进"原子一致。 - return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { // 2.2.1 基于同一个 tx 构造 RepoManager,复用你现有跨包事务模型。 txM := repoManager.WithTx(tx) // 2.2.2 在同事务内写入聊天历史与会话计数。 diff --git a/backend/service/events/chat_token_usage_adjust.go b/backend/service/events/chat_token_usage_adjust.go index 673ec17..3e1fb0c 100644 --- a/backend/service/events/chat_token_usage_adjust.go +++ b/backend/service/events/chat_token_usage_adjust.go @@ -30,7 +30,7 @@ const ( // 2. 通过 outbox 统一消费事务入口,保证“业务成功 + consumed 推进”原子一致; // 3. 非法载荷直接标记 dead,避免无意义重试。 func RegisterChatTokenUsageAdjustHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, ) error { @@ -43,20 +43,24 @@ func RegisterChatTokenUsageAdjustHandler( if repoManager == nil { return errors.New("repo manager is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatTokenUsageAdjustRequested) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { var payload model.ChatTokenUsageAdjustPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析会话 token 调整载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析会话 token 调整载荷失败: "+unmarshalErr.Error()) return nil } if payload.UserID <= 0 || payload.TokensDelta <= 0 || payload.ConversationID == "" { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "会话 token 调整载荷无效: user_id/conversation_id/tokens_delta 非法") + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "会话 token 调整载荷无效: user_id/conversation_id/tokens_delta 非法") return nil } - return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { txM := repoManager.WithTx(tx) return txM.Agent.AdjustTokenUsageInTx(ctx, payload.UserID, payload.ConversationID, payload.TokensDelta) }) diff --git a/backend/service/events/core_outbox_handlers.go b/backend/service/events/core_outbox_handlers.go index 179b325..bd89922 100644 --- a/backend/service/events/core_outbox_handlers.go +++ b/backend/service/events/core_outbox_handlers.go @@ -2,11 +2,12 @@ package events import ( "errors" - "fmt" "github.com/LoveLosita/smartflow/backend/dao" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/memory" + "github.com/LoveLosita/smartflow/backend/notification" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" ) // RegisterCoreOutboxHandlers 注册核心业务 outbox handler。 @@ -15,9 +16,9 @@ import ( // 1. 只负责聚合注册当前核心业务 handler,便于 start / worker/all 等启动入口复用同一套接线顺序。 // 2. 不负责创建 eventBus/outboxRepo/DAO/memoryModule,也不负责启动或关闭事件总线。 // 3. 不改变单个 Register* 函数的职责;具体 payload 解析、幂等消费和业务落库仍由各自 handler 负责。 -// 4. 入口先完整校验依赖,避免注册到一半才发现依赖缺失,导致事件总线处于半注册状态。 +// 4. 这里以显式 route table 的方式列出“事件类型 -> 服务归属 -> handler”,避免后续新增事件时只改启动入口不改接线表。 func RegisterCoreOutboxHandlers( - eventBus *outboxinfra.EventBus, + eventBus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, agentRepo *dao.AgentDAO, @@ -28,28 +29,39 @@ func RegisterCoreOutboxHandlers( return err } - // 1. 按照现有 start.go 的接线顺序注册,保证迁移到 worker/all 后消费行为不发生隐式变化。 - // 2. 每一步只包一层业务语义错误,便于启动日志直接定位是哪类 handler 注册失败。 - if err := RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager); err != nil { - return fmt.Errorf("注册聊天历史持久化 handler 失败: %w", err) - } - if err := RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager); err != nil { - return fmt.Errorf("注册任务紧急度平移 handler 失败: %w", err) - } - if err := RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager); err != nil { - return fmt.Errorf("注册会话 token 调整 handler 失败: %w", err) - } - if err := RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager); err != nil { - return fmt.Errorf("注册 agent 状态快照 handler 失败: %w", err) - } - if err := RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo); err != nil { - return fmt.Errorf("注册 agent 时间线持久化 handler 失败: %w", err) - } - if err := RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule); err != nil { - return fmt.Errorf("注册记忆抽取 handler 失败: %w", err) + return registerOutboxHandlerRoutes(coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule)) +} + +// RegisterAllOutboxHandlers 注册当前阶段所有 outbox handler。 +// +// 职责边界: +// 1. 只负责把 core / active_scheduler / notification 三类路由一次性接线; +// 2. 不负责创建依赖,也不负责启动事件总线; +// 3. 供当前启动流程在“总线启动前”统一完成显式路由注册。 +func RegisterAllOutboxHandlers( + eventBus OutboxBus, + outboxRepo *outboxinfra.Repository, + repoManager *dao.RepoManager, + agentRepo *dao.AgentDAO, + cacheRepo *dao.CacheDAO, + memoryModule *memory.Module, + activeTriggerWorkflow ActiveScheduleTriggeredProcessor, + notificationService *notification.NotificationService, +) error { + if err := validateAllOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow, notificationService); err != nil { + return err } - return nil + return registerOutboxHandlerRoutes(allOutboxHandlerRoutes( + eventBus, + outboxRepo, + repoManager, + agentRepo, + cacheRepo, + memoryModule, + activeTriggerWorkflow, + notificationService, + )) } // validateCoreOutboxHandlerDeps 校验核心 outbox handler 聚合注册所需依赖。 @@ -58,7 +70,7 @@ func RegisterCoreOutboxHandlers( // 1. 只做 nil 校验,不做数据库、Redis、Kafka 连通性探测,避免注册函数承担启动健康检查职责。 // 2. 返回 error 表示依赖缺失;返回 nil 表示可以安全进入逐项注册流程。 func validateCoreOutboxHandlerDeps( - eventBus *outboxinfra.EventBus, + eventBus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, agentRepo *dao.AgentDAO, @@ -85,3 +97,112 @@ func validateCoreOutboxHandlerDeps( } return nil } + +// validateAllOutboxHandlerDeps 在核心依赖基础上,额外校验 active_scheduler 和 notification 相关依赖。 +func validateAllOutboxHandlerDeps( + eventBus OutboxBus, + outboxRepo *outboxinfra.Repository, + repoManager *dao.RepoManager, + agentRepo *dao.AgentDAO, + cacheRepo *dao.CacheDAO, + memoryModule *memory.Module, + activeTriggerWorkflow ActiveScheduleTriggeredProcessor, + notificationService *notification.NotificationService, +) error { + if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule); err != nil { + return err + } + if activeTriggerWorkflow == nil { + return errors.New("active schedule triggered processor is nil") + } + if notificationService == nil { + return errors.New("notification service is nil") + } + return nil +} + +// coreOutboxHandlerRoutes 只描述 core 阶段的 outbox 路由。 +func coreOutboxHandlerRoutes( + eventBus OutboxBus, + outboxRepo *outboxinfra.Repository, + repoManager *dao.RepoManager, + agentRepo *dao.AgentDAO, + cacheRepo *dao.CacheDAO, + memoryModule *memory.Module, +) []outboxHandlerRoute { + return []outboxHandlerRoute{ + { + EventType: EventTypeChatHistoryPersistRequested, + Service: outboxHandlerServiceAgent, + Register: func() error { + return RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager) + }, + }, + { + EventType: EventTypeTaskUrgencyPromoteRequested, + Service: outboxHandlerServiceTask, + Register: func() error { + return RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager) + }, + }, + { + EventType: EventTypeChatTokenUsageAdjustRequested, + Service: outboxHandlerServiceAgent, + Register: func() error { + return RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager) + }, + }, + { + EventType: EventTypeAgentStateSnapshotPersist, + Service: outboxHandlerServiceAgent, + Register: func() error { + return RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager) + }, + }, + { + EventType: EventTypeAgentTimelinePersistRequested, + Service: outboxHandlerServiceAgent, + Register: func() error { + return RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo) + }, + }, + { + EventType: EventTypeMemoryExtractRequested, + Service: outboxHandlerServiceMemory, + Register: func() error { + return RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule) + }, + }, + } +} + +// allOutboxHandlerRoutes 把当前阶段所有 outbox 路由一次性展开,供启动入口统一接线。 +func allOutboxHandlerRoutes( + eventBus OutboxBus, + outboxRepo *outboxinfra.Repository, + repoManager *dao.RepoManager, + agentRepo *dao.AgentDAO, + cacheRepo *dao.CacheDAO, + memoryModule *memory.Module, + activeTriggerWorkflow ActiveScheduleTriggeredProcessor, + notificationService *notification.NotificationService, +) []outboxHandlerRoute { + routes := coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule) + routes = append(routes, + outboxHandlerRoute{ + EventType: sharedevents.ActiveScheduleTriggeredEventType, + Service: outboxHandlerServiceActiveScheduler, + Register: func() error { + return RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, activeTriggerWorkflow) + }, + }, + outboxHandlerRoute{ + EventType: sharedevents.NotificationFeishuRequestedEventType, + Service: outboxHandlerServiceNotification, + Register: func() error { + return RegisterFeishuNotificationHandler(eventBus, outboxRepo, notificationService) + }, + }, + ) + return routes +} diff --git a/backend/service/events/memory_extract_requested.go b/backend/service/events/memory_extract_requested.go index 7568117..3cb13c5 100644 --- a/backend/service/events/memory_extract_requested.go +++ b/backend/service/events/memory_extract_requested.go @@ -33,7 +33,7 @@ const ( // 2. 不在消费回调里执行 LLM 重计算; // 3. 通过 memory.Module.WithTx(tx) 复用同一套接入门面,保证事务边界仍由 outbox 掌控。 func RegisterMemoryExtractRequestedHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, memoryModule *memory.Module, ) error { @@ -46,20 +46,24 @@ func RegisterMemoryExtractRequestedHandler( if memoryModule == nil { return errors.New("memory module is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeMemoryExtractRequested) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { var payload model.MemoryExtractRequestedPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析记忆抽取载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析记忆抽取载荷失败: "+unmarshalErr.Error()) return nil } if validateErr := validateMemoryExtractPayload(payload); validateErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "记忆抽取载荷非法: "+validateErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "记忆抽取载荷非法: "+validateErr.Error()) return nil } - return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { jobPayload := memorymodel.ExtractJobPayload{ UserID: payload.UserID, ConversationID: strings.TrimSpace(payload.ConversationID), @@ -87,7 +91,7 @@ func RegisterMemoryExtractRequestedHandler( func EnqueueMemoryExtractRequestedInTx( ctx context.Context, outboxRepo *outboxinfra.Repository, - kafkaCfg kafkabus.Config, + maxRetry int, chatPayload model.ChatHistoryPersistPayload, ) error { if !isMemoryWriteEnabled() { @@ -107,6 +111,10 @@ func EnqueueMemoryExtractRequestedInTx( return err } + if maxRetry <= 0 { + maxRetry = 20 + } + outboxPayload := outboxinfra.OutboxEventPayload{ EventType: EventTypeMemoryExtractRequested, EventVersion: outboxinfra.DefaultEventVersion, @@ -114,13 +122,14 @@ func EnqueueMemoryExtractRequestedInTx( Payload: payloadJSON, } + // 1. 这里只传 eventType 与消息键,服务归属、outbox 表和 Kafka topic 统一交给仓库路由层解析。 + // 2. 这样聊天持久化链路不会继续感知 memory 服务的物理 topic,避免拆服务时出现双写口径。 _, err = outboxRepo.CreateMessage( ctx, EventTypeMemoryExtractRequested, - kafkaCfg.Topic, strings.TrimSpace(chatPayload.ConversationID), outboxPayload, - kafkaCfg.MaxRetry, + maxRetry, ) return err } diff --git a/backend/service/events/notification_feishu.go b/backend/service/events/notification_feishu.go index 9e65504..41ea53c 100644 --- a/backend/service/events/notification_feishu.go +++ b/backend/service/events/notification_feishu.go @@ -20,7 +20,7 @@ import ( // 2. 不承担 notification_records 状态机细节,状态流转全部下沉到 notification 模块; // 3. 不在 handler 内部创建 provider/service,避免事件消费与 retry loop 使用两套不同配置。 func RegisterFeishuNotificationHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, svc *notification.NotificationService, ) error { @@ -33,23 +33,27 @@ func RegisterFeishuNotificationHandler( if svc == nil { return errors.New("notification service is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, sharedevents.NotificationFeishuRequestedEventType) + if err != nil { + return err + } handler := func(ctx context.Context, envelope kafkabus.Envelope) error { // 1. 先校验 event_version,避免未来协议破坏性升级后旧 handler 误吃新消息。 // 2. 当前阶段只接受 v1;版本不匹配属于不可恢复协议错误,直接标记 dead。 eventVersion := strings.TrimSpace(envelope.EventVersion) if eventVersion != "" && eventVersion != sharedevents.NotificationFeishuRequestedEventVersion { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested event_version 不匹配: "+eventVersion) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested event_version 不匹配: "+eventVersion) return nil } var payload sharedevents.FeishuNotificationRequestedPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 notification.feishu.requested 载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 notification.feishu.requested 载荷失败: "+unmarshalErr.Error()) return nil } if validateErr := payload.Validate(); validateErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested 载荷非法: "+validateErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested 载荷非法: "+validateErr.Error()) return nil } @@ -58,7 +62,7 @@ func RegisterFeishuNotificationHandler( return handleErr } - if consumeErr := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, nil); consumeErr != nil { + if consumeErr := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, nil); consumeErr != nil { return consumeErr } diff --git a/backend/service/events/outbox_bus.go b/backend/service/events/outbox_bus.go new file mode 100644 index 0000000..2de4cfe --- /dev/null +++ b/backend/service/events/outbox_bus.go @@ -0,0 +1,217 @@ +package events + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" +) + +// OutboxBus 是启动侧和业务侧共享的 outbox 门面。 +// +// 职责边界: +// 1. 对上只暴露 Publish / RegisterEventHandler / Start / Close 这四个能力; +// 2. 对内可以按 service 维度路由到多个底层 engine; +// 3. 不承载任何业务处理逻辑,只做事件归属、topic/group 和 engine 选择。 +type OutboxBus interface { + outboxinfra.EventPublisher + RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error + Start(ctx context.Context) + Close() +} + +type routedOutboxBus struct { + buses map[string]OutboxBus +} + +// NewRoutedOutboxBus 把多个 service 级 outbox bus 组装成一个门面。 +// +// 1. 这里不创建底层 engine,只接收上层已经建好的 service bus; +// 2. 事件发布和 handler 注册都按事件归属路由到对应 service; +// 3. 任一 service bus 缺失时直接报错,避免静默回落到共享 topic。 +func NewRoutedOutboxBus(buses map[string]OutboxBus) OutboxBus { + normalized := make(map[string]OutboxBus, len(buses)) + for serviceName, bus := range buses { + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" || bus == nil { + continue + } + normalized[serviceName] = bus + } + if len(normalized) == 0 { + return nil + } + return &routedOutboxBus{buses: normalized} +} + +// NewServiceOutboxBus 基于 service 级 topic / group 创建底层 outbox engine。 +// +// 1. topic / group 由 service 名称推导,不再要求调用方显式传入共享 topic; +// 2. kafka 未启用时返回 nil,调用侧可以继续走同步降级路径; +// 3. 这里不注册 handler,注册仍由启动侧统一完成。 +func NewServiceOutboxBus(repo *outboxinfra.Repository, baseCfg kafkabus.Config, serviceName string) (OutboxBus, error) { + if repo == nil { + return nil, errors.New("outbox repository is nil") + } + + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + return nil, errors.New("serviceName is empty") + } + + route, _ := outboxinfra.ResolveServiceRoute(serviceName) + cfg := baseCfg + cfg.Topic = strings.TrimSpace(route.Topic) + cfg.GroupID = strings.TrimSpace(route.GroupID) + cfg.ServiceName = strings.TrimSpace(route.ServiceName) + if cfg.ServiceName == "" { + cfg.ServiceName = serviceName + } + + bus, err := outboxinfra.NewEventBus(repo.WithRoute(route), cfg) + if err != nil { + return nil, err + } + if bus == nil { + return nil, nil + } + return bus, nil +} + +func (b *routedOutboxBus) Publish(ctx context.Context, req outboxinfra.PublishRequest) error { + serviceBus, err := b.resolveBusByEventType(req.EventType) + if err != nil { + return err + } + return serviceBus.Publish(ctx, req) +} + +func (b *routedOutboxBus) RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error { + if handler == nil { + return errors.New("handler is nil") + } + serviceBus, err := b.resolveBusByEventType(eventType) + if err != nil { + return err + } + return serviceBus.RegisterEventHandler(eventType, handler) +} + +func (b *routedOutboxBus) Start(ctx context.Context) { + if b == nil { + return + } + for _, serviceName := range orderedOutboxServiceNames(b.buses) { + b.buses[serviceName].Start(ctx) + } +} + +func (b *routedOutboxBus) Close() { + if b == nil { + return + } + for _, serviceName := range orderedOutboxServiceNames(b.buses) { + b.buses[serviceName].Close() + } +} + +func (b *routedOutboxBus) resolveBusByEventType(eventType string) (OutboxBus, error) { + if b == nil { + return nil, errors.New("outbox bus is not initialized") + } + + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return nil, errors.New("eventType is empty") + } + + serviceName, ok := outboxinfra.ResolveEventService(eventType) + if !ok { + return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType) + } + + serviceBus, ok := b.buses[strings.TrimSpace(serviceName)] + if !ok || serviceBus == nil { + return nil, fmt.Errorf("service outbox bus is missing: service=%s eventType=%s", serviceName, eventType) + } + return serviceBus, nil +} + +func orderedOutboxServiceNames(buses map[string]OutboxBus) []string { + ordered := make([]string, 0, len(buses)) + seen := make(map[string]struct{}, len(buses)) + + for _, serviceName := range OutboxServiceNames() { + if _, ok := buses[serviceName]; ok { + ordered = append(ordered, serviceName) + seen[serviceName] = struct{}{} + } + } + + extras := make([]string, 0) + for serviceName := range buses { + if _, ok := seen[serviceName]; ok { + continue + } + extras = append(extras, serviceName) + } + sort.Strings(extras) + return append(ordered, extras...) +} + +// OutboxServiceNames 返回当前阶段启用的 service 级 outbox 名称。 +func OutboxServiceNames() []string { + return []string{ + string(outboxHandlerServiceAgent), + string(outboxHandlerServiceTask), + string(outboxHandlerServiceMemory), + string(outboxHandlerServiceActiveScheduler), + string(outboxHandlerServiceNotification), + } +} + +// ResolveOutboxTopicForService 把 service 名称映射成独立 Kafka topic。 +// +// 1. 这里保留现在的命名风格:smartflow..outbox; +// 2. 空 service 只作为兜底,不作为主路径; +// 3. 调用侧不再传共享 topic,避免入口继续依赖旧结构。 +func ResolveOutboxTopicForService(serviceName string) string { + route, _ := outboxinfra.ResolveServiceRoute(serviceName) + if topic := strings.TrimSpace(route.Topic); topic != "" { + return topic + } + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + return kafkabus.DefaultTopic + } + return "smartflow." + serviceName + ".outbox" +} + +// ResolveOutboxGroupForService 把 service 名称映射成独立 Kafka group。 +func ResolveOutboxGroupForService(serviceName string) string { + route, _ := outboxinfra.ResolveServiceRoute(serviceName) + if groupID := strings.TrimSpace(route.GroupID); groupID != "" { + return groupID + } + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + return kafkabus.DefaultGroup + } + return "smartflow-" + serviceName + "-outbox-consumer" +} + +// ResolveOutboxTopicForEvent 根据事件归属 service 计算 topic。 +func ResolveOutboxTopicForEvent(eventType string) (string, error) { + route, ok := outboxinfra.ResolveEventRoute(eventType) + if !ok { + return "", fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType)) + } + if topic := strings.TrimSpace(route.Topic); topic != "" { + return topic, nil + } + return ResolveOutboxTopicForService(route.ServiceName), nil +} diff --git a/backend/service/events/outbox_handler_routes.go b/backend/service/events/outbox_handler_routes.go new file mode 100644 index 0000000..1748ca6 --- /dev/null +++ b/backend/service/events/outbox_handler_routes.go @@ -0,0 +1,74 @@ +package events + +import ( + "fmt" + "strings" + + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" +) + +// outboxHandlerService 表示 outbox 路由归属的业务服务。 +// +// 这里只记录服务归属,不承载具体实现包名,方便在启动日志和路由表里直接阅读。 +type outboxHandlerService string + +const ( + outboxHandlerServiceAgent outboxHandlerService = "agent" + outboxHandlerServiceTask outboxHandlerService = "task" + outboxHandlerServiceMemory outboxHandlerService = "memory" + outboxHandlerServiceActiveScheduler outboxHandlerService = "active-scheduler" + outboxHandlerServiceNotification outboxHandlerService = "notification" +) + +// outboxHandlerRoute 显式描述“事件类型 -> 服务归属 -> handler 注册动作”。 +// +// 1. EventType 负责唯一定位 outbox 路由键; +// 2. Service 负责标明该路由归属的业务服务; +// 3. Register 只负责把对应 handler 挂到总线,不承载业务逻辑。 +type outboxHandlerRoute struct { + EventType string + Service outboxHandlerService + Register func() error +} + +// registerOutboxHandlerRoutes 逐条注册路由表里的 handler。 +// +// 1. 先把事件类型和服务归属写进路由表,避免启动入口散落多处 if err != nil; +// 2. 再统一执行注册动作,保证失败时能直接定位到具体 event_type 和 service; +// 3. 若某条路由缺少注册函数,直接返回 error,避免静默漏注册。 +func registerOutboxHandlerRoutes(routes []outboxHandlerRoute) error { + for _, route := range routes { + if route.Register == nil { + return fmt.Errorf("outbox handler route 缺少注册函数: event_type=%s service=%s", route.EventType, route.Service) + } + if err := outboxinfra.RegisterEventService(route.EventType, string(route.Service)); err != nil { + return fmt.Errorf("登记 outbox 事件归属失败(event_type=%s, service=%s): %w", route.EventType, route.Service, err) + } + if err := route.Register(); err != nil { + return fmt.Errorf("注册 outbox handler 失败(event_type=%s, service=%s): %w", route.EventType, route.Service, err) + } + } + return nil +} + +// scopedOutboxRepoForEvent 负责把通用 outbox 仓库收敛到某个事件所属的服务表。// +// 职责边界: +// 1. 只做事件->服务->表的路由,不碰业务写入语义; +// 2. 返回的仓库只适合当前事件的 MarkDead / ConsumeAndMarkConsumed / MarkFailedForRetry; +// 3. 路由缺失时直接返回错误,避免默默写回默认表。 +func scopedOutboxRepoForEvent(outboxRepo *outboxinfra.Repository, eventType string) (*outboxinfra.Repository, error) { + if outboxRepo == nil { + return nil, fmt.Errorf("outbox repository is nil") + } + + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return nil, fmt.Errorf("eventType is empty") + } + + route, ok := outboxinfra.ResolveEventRoute(eventType) + if !ok { + return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType) + } + return outboxRepo.WithRoute(route), nil +} diff --git a/backend/service/events/task_urgency_promote.go b/backend/service/events/task_urgency_promote.go index a4a8ca0..f4c5c36 100644 --- a/backend/service/events/task_urgency_promote.go +++ b/backend/service/events/task_urgency_promote.go @@ -31,7 +31,7 @@ const ( // 2. 只处理 `task.urgency.promote.requested` 事件,不处理其他业务事件; // 3. 通过 `ConsumeAndMarkConsumed` 把“业务更新 + outbox consumed 推进”放进同一事务。 func RegisterTaskUrgencyPromoteHandler( - bus *outboxinfra.EventBus, + bus OutboxBus, outboxRepo *outboxinfra.Repository, repoManager *dao.RepoManager, ) error { @@ -45,25 +45,29 @@ func RegisterTaskUrgencyPromoteHandler( if repoManager == nil { return errors.New("repo manager is nil") } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeTaskUrgencyPromoteRequested) + if err != nil { + return err + } // 2. 定义统一处理函数。 handler := func(ctx context.Context, envelope kafkabus.Envelope) error { // 2.1 先解析 payload;解析失败属于不可恢复错误,直接标记 dead。 var payload model.TaskUrgencyPromoteRequestedPayload if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error()) + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error()) return nil } // 2.2 做轻量参数净化,避免脏数据进入 DAO。 payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs) if payload.UserID <= 0 || len(payload.TaskIDs) == 0 { - _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法") + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法") return nil } // 2.3 统一走 outbox 消费事务入口,保证“业务成功 -> consumed”原子一致。 - return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { // 2.3.1 基于同一 tx 构造 RepoManager,复用现有跨 DAO 事务模式。 txM := repoManager.WithTx(tx) // 2.3.2 以消费时刻为准做条件更新,确保“到线”判定与真实落库时刻一致。 diff --git a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md index 16050c9..97e4bed 100644 --- a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md +++ b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md @@ -2,7 +2,12 @@ ## 1. 文档定位 -这份文档是当前后端迁移的主总纲,目标是把现状从“单体 Gin + 统一 outbox”平滑演进到“Gin Gateway + gozero 服务群 + 服务级 outbox + Kafka 共享运输层”。 +这份文档是当前后端迁移的主总纲,目标是把阶段 0 的“单体 Gin + 统一 outbox”基线,继续平滑演进到“Gin Gateway + gozero 服务群 + 服务级 outbox + Kafka 共享运输层”。 + +当前进度口径: + +1. 阶段 0 已完成:运行入口、事件契约、路由清单和 outbox 语义已经冻结。 +2. 阶段 1 已完成:当前基线已经切成服务级 outbox 表、服务级 Kafka topic、服务级 consumer group;仍在单体进程内装配多个服务级 worker,后续拆微服务时再物理迁出。 本计划遵守两个硬原则: @@ -23,13 +28,15 @@ 3. `notification_records` 已经有独立状态机、幂等、重试能力。 4. 事件契约已经拆成 `active_schedule.triggered`、`notification.feishu.requested`、`schedule.apply.*` 等独立 payload。 -但当前 outbox 仍然偏单体: +阶段 1 之后,当前 outbox 已经不是共享单表 / 单 topic / 单 group 形态: -1. relay 还带着“全局扫一遍”的思维。 -2. consumer 还默认“一个 worker 吃全部事件”。 -3. 未知事件的处理策略还不适合长期多服务共用。 +1. `agent`、`task`、`memory`、`active-scheduler`、`notification` 已经有各自的 outbox 表。 +2. 每个服务都有自己的 relay worker,把本服务 outbox 表投递到本服务 Kafka topic。 +3. Kafka 仍是共享运输层,但 topic 已按服务切开,不再把新流量写进共享 topic。 +4. 消费侧已经按服务 consumer group 隔离,不再用一个 worker 吃全部事件。 +5. 当前仍是单体进程内多 worker 装配;worker 后续会跟随对应服务一起迁出,不在阶段 1 直接拆进程。 -所以现在最合适的路线不是直接把所有服务拆完,而是先把 outbox 升级成服务级基础设施,再按服务边界逐个切出去。 +所以后续路线不是再补一次 outbox 基建,而是在这个阶段 1 基线上,按服务边界逐个把 gozero 服务、DAO / model / worker 和启动入口迁出去。 --- @@ -76,9 +83,21 @@ gozero 服务负责领域能力: 1. 每个服务拥有自己的 outbox。 2. 每个服务有自己的 relay worker。 -3. Kafka 作为共享运输层。 +3. Kafka 作为共享运输层,但业务 topic 不共享。 4. 同一服务的多个实例进入同一个 consumer group,做横向负载均衡。 -5. topic 优先按服务/事件域划分,过渡期如果共享 topic,必须有显式 `event_type -> service` 路由。 +5. topic 按服务/事件域划分,消费侧按服务 consumer group 隔离。 +6. 新增事件必须登记 `event_type -> service` 路由,再由服务 catalog 映射到对应 outbox 表、topic 和 consumer group。 +7. 未知事件不能靠“写进共享表再由全局 worker 兜底”处理;必须在路由注册、死信或显式失败之间选一种清晰策略。 + +当前阶段 1 基线: + +| 服务 | outbox 表 | Kafka topic | consumer group | +| --- | --- | --- | --- | +| `agent` | `agent_outbox_messages` | `smartflow.agent.outbox` | `smartflow-agent-outbox-consumer` | +| `task` | `task_outbox_messages` | `smartflow.task.outbox` | `smartflow-task-outbox-consumer` | +| `memory` | `memory_outbox_messages` | `smartflow.memory.outbox` | `smartflow-memory-outbox-consumer` | +| `active-scheduler` | `active_scheduler_outbox_messages` | `smartflow.active-scheduler.outbox` | `smartflow-active-scheduler-outbox-consumer` | +| `notification` | `notification_outbox_messages` | `smartflow.notification.outbox` | `smartflow-notification-outbox-consumer` | ### 3.4 共享层边界 @@ -97,8 +116,8 @@ gozero 服务负责领域能力: | 阶段 | 目标 | 建议 commit 点 | 建议测试 | | --- | --- | --- | --- | -| 0 | 语义冻结和基线确认 | 当前运行边界、事件契约和路由清单固定后,先做一个基线 commit | `go test ./...`,`api / worker / all` 启动 smoke | -| 1 | Outbox v2 基建 | 服务级 outbox 路由和 relay 逻辑打通后 commit 一次 | 全量单测 + outbox 发布/消费 smoke | +| 0 | 语义冻结和基线确认(已完成) | 阶段 0 已作为历史基线保存;后续只在契约变化时回看 | `go test ./...`,`api / worker / all` 启动 smoke | +| 1 | Outbox v2 基建(已完成,当前基线) | 当前已具备阶段 1 保存点:服务级 outbox 表、topic、group 和多 worker 装配已打通 | 已完成健康检查、服务级 outbox 写入/投递/消费 smoke、Kafka group lag 核对 | | 1.5 | 先抽 llm-service | 统一模型调用、provider 路由、流式输出和审计后 commit | course / active-scheduler / memory 模型调用 smoke | | 1.6 | 再抽 rag-service | 向量化、召回、重排、检索能力跑通后 commit | memory retrieve / rerank smoke | | 2 | 先拆 user/auth | user 路由、JWT 签发和 token 额度治理独立后 commit | 注册/登录/刷新/登出 smoke + token quota 回归 | @@ -113,7 +132,9 @@ gozero 服务负责领域能力: --- -### 4.2 阶段 0:语义冻结和基线确认 +### 4.2 阶段 0:语义冻结和基线确认(已完成) + +本节保留为历史回顾,用来说明阶段 0 冻结了什么,不是当前待办。 目标: @@ -142,32 +163,69 @@ gozero 服务负责领域能力: --- -### 4.3 阶段 1:Outbox v2 基建 +### 4.3 阶段 1:Outbox v2 基建(已完成) -目标: +当前结论: -1. 把 outbox 从“单体内部事件泵”升级成“服务级事件总线能力”。 -2. 让 outbox 先具备服务归属,再谈服务拆分。 -3. 为后面的 gozero 服务切分打地基。 +1. outbox 已经从“单体内部事件泵”升级成“服务级事件总线能力”。 +2. 当前基线已经具备 `event_type -> service -> outbox 表 / topic / group` 的服务归属链路。 +3. 服务还没有物理拆成多个 gozero 进程;目前是在单体内按服务装配多个 relay worker 和 consumer worker。 +4. 后续拆微服务时,worker 会随对应服务迁出,不再重新设计 outbox 边界。 -这一步要做的事: +已完成的事: -1. 引入 outbox 归属概念,服务和 outbox 一一对应。 -2. relay worker 变成服务自己的后台进程。 -3. consumer 按服务订阅路由,不再一个 worker 什么都吃。 -4. 过渡期允许旧实现与新实现并行,但切流点必须清晰。 -5. 如果短期共享 topic,必须有显式事件路由,不能靠“未知事件直接 dead”来硬顶。 +1. 引入 outbox 服务归属概念,`agent`、`task`、`memory`、`active-scheduler`、`notification` 分别对应自己的 outbox 表。 +2. relay worker 已按服务拆开,每个 relay 只扫描本服务 outbox 表并投递到本服务 topic。 +3. consumer 已按服务 group 隔离,每个服务只处理归属到自己的事件。 +4. topic 已经直接切成服务级 topic,不再把“共享 topic”作为当前终态或过渡依赖。 +5. 事件发布入口已经通过路由和 catalog 决定服务归属,避免新事件默认回流到共享 outbox。 -建议提交点: +当前 outbox 总线结构: -1. outbox 归属和路由抽象完成后,先保存一个 commit。 -2. 第一条服务级 relay 跑通后,再保存一个 commit。 +```mermaid +flowchart LR + Biz["业务发布事件"] --> Router["event_type -> service 路由"] + Router --> Catalog["service -> table/topic/group"] -建议测试: + Catalog --> AgentTable["agent_outbox_messages"] + Catalog --> TaskTable["task_outbox_messages"] + Catalog --> MemoryTable["memory_outbox_messages"] + Catalog --> ActiveTable["active_scheduler_outbox_messages"] + Catalog --> NotifyTable["notification_outbox_messages"] -1. `go test ./...` -2. outbox 发布 / 投递 / 消费 smoke。 -3. 未知事件不会误伤其他服务的路由验证。 + AgentTable --> AgentRelay["agent relay"] --> AgentTopic["smartflow.agent.outbox"] --> AgentGroup["smartflow-agent-outbox-consumer"] + TaskTable --> TaskRelay["task relay"] --> TaskTopic["smartflow.task.outbox"] --> TaskGroup["smartflow-task-outbox-consumer"] + MemoryTable --> MemoryRelay["memory relay"] --> MemoryTopic["smartflow.memory.outbox"] --> MemoryGroup["smartflow-memory-outbox-consumer"] + ActiveTable --> ActiveRelay["active-scheduler relay"] --> ActiveTopic["smartflow.active-scheduler.outbox"] --> ActiveGroup["smartflow-active-scheduler-outbox-consumer"] + NotifyTable --> NotifyRelay["notification relay"] --> NotifyTopic["smartflow.notification.outbox"] --> NotifyGroup["smartflow-notification-outbox-consumer"] +``` + +当前切流点: + +1. 新事件先按 `event_type` 查服务路由,再按服务 catalog 写入对应 outbox 表。 +2. relay 只处理本服务表里的记录,不再全局扫描一个共享表。 +3. Kafka topic 已经按服务切开,consumer group 也按服务切开。 +4. `agent_outbox_messages` 仍沿用历史默认表名作为 `agent` 服务表;表内旧数据是共享 outbox 时代的历史存量,不代表新流量仍回流 `agent`。 + +仍保留的旧实现: + +1. `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all` 仍是当前启动壳,尚未拆成终态的多 gozero 进程。 +2. `backend/model/outbox.go` 仍保留兼容模型;实际写入表由 outbox repository 根据服务路由选择。 +3. 既有事件契约和 handler 继续保留,后续按服务迁移时再逐步移动到对应服务目录。 + +已完成验证: + +1. 健康检查返回 200。 +2. MySQL 当前只有服务级 outbox 表,没有继续依赖共享 `outbox_messages` 表。 +3. Kafka 已存在 5 个服务级 topic 和 5 个服务级 consumer group。 +4. 最新 smoke 中,`task.urgency.promote.requested` 写入 `task_outbox_messages`,topic 为 `smartflow.task.outbox`,状态流转到 `consumed`,`task` group lag 为 0。 +5. 对应任务优先级从 `2` 更新到 `1`,证明发布、投递、消费和业务处理链路已经闭环。 + +下一步: + +1. 不再重复做 Outbox v2 基建。 +2. 后续从阶段 1.5 / 1.6 开始,按 `llm-service`、`rag-service`、`user/auth` 等服务边界推进物理拆分。 +3. 迁移任何新服务时,必须复用当前 outbox 路由、服务 catalog、relay 和 consumer group 隔离规则。 --- @@ -414,22 +472,21 @@ gozero 服务负责领域能力: ## 5. 推荐执行顺序 -近期建议按这个顺序推进: +当前建议按这个顺序推进: -1. 先冻结现有语义和契约。 -2. 再做 Outbox v2。 -3. 先切 llm-service,把统一模型出口从各业务服务里抽出去。 -4. 再切 rag-service,把检索基础设施从 memory / agent 里抽出去。 -5. 先切 user/auth,把登录态和额度门禁从 gateway 拿出去。 -6. 再切 notification。 -7. 再切 active-scheduler。 -8. 然后切 schedule / task / course / task-class。 -9. 再切 agent / memory,把聊天编排和记忆链路独立出去。 -10. 最后把 Gin 收口成纯 Gateway。 +1. 以阶段 1 的服务级 outbox 为当前基线,不再回头做共享 outbox 方案。 +2. 先切 llm-service,把统一模型出口从各业务服务里抽出去。 +3. 再切 rag-service,把检索基础设施从 memory / agent 里抽出去。 +4. 先切 user/auth,把登录态和额度门禁从 gateway 拿出去。 +5. 再切 notification。 +6. 再切 active-scheduler。 +7. 然后切 schedule / task / course / task-class。 +8. 再切 agent / memory,把聊天编排和记忆链路独立出去。 +9. 最后把 Gin 收口成纯 Gateway。 一句话总结: -> 先把 outbox 从单体内部兜底机制变成服务级基础设施;再把 llm-service 抽成全仓统一模型出口,把 rag-service 抽成统一检索基础设施;然后把 user/auth 从 gateway 里抽出去,清掉用户表直连和额度门禁耦合;接着把 notification 切成第一条事件驱动服务线;然后让 active-scheduler、schedule、task、course、task-class 按稳定边界逐步独立;再把 agent / memory 独立出来,完成聊天编排和记忆链路的服务化;最后把 Gin 收口成真正的 Gateway。 +> outbox 的服务级基础设施已经完成;接下来先把 llm-service 抽成全仓统一模型出口,把 rag-service 抽成统一检索基础设施;然后把 user/auth 从 gateway 里抽出去,清掉用户表直连和额度门禁耦合;接着把 notification 切成第一条事件驱动服务线;然后让 active-scheduler、schedule、task、course、task-class 按稳定边界逐步独立;再把 agent / memory 独立出来,完成聊天编排和记忆链路的服务化;最后把 Gin 收口成真正的 Gateway。 --- @@ -650,7 +707,7 @@ SmartFlow-Agent/ 3. `agent` 不是公共能力,它应当单独成服务;`memory` 也是独立支撑服务,不应长期挂在 gateway 里。 4. `notification` 和 `active-scheduler` 都应该回到更像 seckill 的服务内单体结构,避免成为“半个框架”。 5. `llm-service` 是全仓统一模型出口;`rag-service` 是统一检索基础设施;`rag-service` 依赖 `llm-service`,不反向依赖业务服务。 -6. outbox 先升级成服务级基础设施,再按域边界逐个切出去。 +6. outbox 已经升级成服务级基础设施,后续直接在当前服务级表 / topic / group 基线上按域边界逐个切出去。 7. 后续任何服务目录调整,都要先对照下面的“典型用例”;如果这次改动说不清它属于哪个用例,就先不要动结构,只补文件或补注释。 ### 6.6 `notification` / `active-scheduler` 的服务内结构 @@ -757,6 +814,8 @@ graph TD 2. `agent` 仍然是编排层,实际会通过契约调用 `user/auth`、`course`、`task-class`、`notification`、`active-scheduler`、`schedule`、`task` 和 `memory`。 3. Gateway 不直接碰 `llm-service` / `rag-service`,只把请求转给对应业务服务。 4. 图里的 outbox 是“每个服务自己的 outbox 表 + 专属 relay worker”的抽象,不代表所有服务共用一张表。 +5. 当前阶段 1 已完成 `agent`、`task`、`memory`、`active-scheduler`、`notification` 的服务级 outbox 表、topic 和 consumer group;尚未物理拆出的服务后续沿用同一模式补齐。 +6. Kafka 是共享运输层,不是共享业务 topic;新流量不应再默认进入单一共享 topic。 ### 6.9 切对话交接卡 @@ -764,11 +823,13 @@ graph TD 1. 先读顺序:`3.2` 服务层、`4.1` 阶段总览、`4.3` Outbox v2、`4.4` llm-service、`4.5` rag-service、`4.6` user/auth、`4.7` notification、`4.8` active-scheduler、`4.10` agent / memory、`6.5` 切对话锚点、`6.7` 典型用例、`6.8` 最终关系图、`6.10` 启动方式、`6.11` 测试自动化、`6.12` 多代理执行闭环。 2. 已冻结的终态是 `Gin Gateway + gozero 服务群 + 服务级 outbox + Kafka 共享运输层`。 -3. 已冻结的基础设施服务是 `llm-service` 和 `rag-service`,其中 `rag-service` 只依赖 `llm-service`。 -4. 已冻结的业务服务优先级是 `user/auth -> notification -> active-scheduler -> schedule/task/course/task-class -> agent/memory`,其中 `notification` 和 `active-scheduler` 要回到更像 seckill 的服务内单体壳。 -5. `shared` 只保留跨进程契约和少量跨服务底座,不承载业务逻辑、DAO、模型或状态机。 -6. 如果后续要改目录,必须先回答“这个文件属于哪一个典型用例”,回答不清楚就先别动结构。 -7. 当前文档已经可以作为切对话基线;后续代理默认按本文件推进,只在出现新的契约风险、边界变化或业务语义变化时再重新讨论架构。 +3. 阶段 1 已完成,当前 outbox 基线是服务级表、服务级 topic、服务级 consumer group;worker 仍在单体内装配,后续随对应服务迁出。 +4. 已冻结的基础设施服务是 `llm-service` 和 `rag-service`,其中 `rag-service` 只依赖 `llm-service`。 +5. 下一轮默认从阶段 1.5 / 1.6 继续,先抽 `llm-service` 和 `rag-service`;业务服务优先级仍是 `user/auth -> notification -> active-scheduler -> schedule/task/course/task-class -> agent/memory`。 +6. `notification` 和 `active-scheduler` 后续要回到更像 seckill 的服务内单体壳。 +7. `shared` 只保留跨进程契约和少量跨服务底座,不承载业务逻辑、DAO、模型或状态机。 +8. 如果后续要改目录,必须先回答“这个文件属于哪一个典型用例”,回答不清楚就先别动结构。 +9. 当前文档已经可以作为切对话基线;后续代理默认按本文件推进。现阶段的迁移基线入口是 `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all`,它们只是当前仓库的启动壳,不是终态。终态仍然是“一个服务一个独立 `main.go`”,只在出现新的契约风险、边界变化或业务语义变化时再重新讨论架构。 ### 6.10 启动方式与进程模型 @@ -884,6 +945,27 @@ graph TD --- +### 6.13 阶段 0 历史基线与阶段 1 当前基线快照 + +阶段 0 历史基线: + +1. 阶段 0 时,仓库入口是 `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all`;`backend/main.go` 只保留兼容壳,不作为终态入口。 +2. 阶段 0 时,Gin 路由集中在 `/api/v1/...` 下,主要覆盖 `user`、`task`、`course`、`task-class`、`schedule`、`agent`、`memory`、`active-schedule`、`notification`。 +3. 阶段 0 时,outbox 还是共享单表 + 单 topic / 单 group 形态;阶段 0 只冻结 envelope、版本号、handler 路由和消费边界,不扩 topic。 +4. 阶段 0 时,`notification`、`active-scheduler`、`memory`、`agent` 已有单体内服务化雏形;`user/auth` 仍停留在用户域服务层能力,尚未独立成终态服务。 +5. 阶段 0 只做语义冻结和基线确认,不切服务,不搬 DAO / model,不重排目录,只把运行入口、事件契约、路由清单和 outbox 语义定住,作为 Outbox v2 的历史基线。 + +阶段 1 当前基线: + +1. 当前运行入口仍是 `backend/cmd/api`、`backend/cmd/worker`、`backend/cmd/all`,但 worker 已按服务装配多条 outbox 链路。 +2. 当前 outbox 已切成服务级表:`agent_outbox_messages`、`task_outbox_messages`、`memory_outbox_messages`、`active_scheduler_outbox_messages`、`notification_outbox_messages`。 +3. 当前 Kafka 已切成服务级 topic:`smartflow.agent.outbox`、`smartflow.task.outbox`、`smartflow.memory.outbox`、`smartflow.active-scheduler.outbox`、`smartflow.notification.outbox`。 +4. 当前消费侧已切成服务级 consumer group:`smartflow-agent-outbox-consumer`、`smartflow-task-outbox-consumer`、`smartflow-memory-outbox-consumer`、`smartflow-active-scheduler-outbox-consumer`、`smartflow-notification-outbox-consumer`。 +5. 当前仍是单体内多 worker 装配,不代表服务已经物理拆出;后续拆服务时,要把对应 outbox 表、relay、topic、consumer group 和 handler 一起迁到服务进程里。 +6. `agent_outbox_messages` 沿用历史默认表名作为 `agent` 服务 outbox 表,历史存量不代表新事件仍使用共享 outbox。 + +--- + ## 7. 风险与回退 ### 风险 1:消息路由不清 @@ -896,9 +978,9 @@ graph TD 回退: -1. 先把 topic 再切细。 -2. 先用服务级 consumer group 隔离。 -3. 先保留 `all` 模式兜底。 +1. 不回退到共享 topic;先冻结新增事件,补齐 `event_type -> service` 路由和 service catalog 配置。 +2. 如果某个服务误消费,先暂停对应服务 worker 或 consumer group,不影响其他服务 group。 +3. 保留 `all` 模式作为本地运行兜底,但 `all` 内部仍按服务级 worker 装配。 ### 风险 2:relay 并行过早 @@ -926,3 +1008,17 @@ graph TD 1. 保留当前单体实现。 2. 只做接口和契约前置拆分。 3. 不提前拆 schedule/task。 + +### 风险 4:历史 outbox 存量被误判 + +表现: + +1. `agent_outbox_messages` 里存在共享 outbox 时代的旧数据,看起来像所有事件仍写回 `agent`。 +2. 后续代理误以为阶段 1 没有完成,又重复设计共享表迁移方案。 +3. 清理历史数据时误删仍有业务价值的审计记录。 + +处理: + +1. 以新事件写入表、topic 和 consumer group 为准判断当前链路,不用旧存量判断新路由。 +2. 历史数据清理或归档单独立项,不和服务拆分混在同一轮做。 +3. 如需清理本地测试存量,必须遵守 `6.11` 的数据库操作边界,破坏性操作单独确认。