Version: 0.9.64.dev.260503

后端:
1. 服务级 outbox 基础设施全量落地——新增 service route / service catalog / route registry,重构 outbox engine、repository、event bus 和 model,按 `event_type -> service -> table/topic/group` 统一写入与投递,保留 `agent` 兼容壳但不再依赖共享 outbox
2. Kafka 投递、消费与启动装配同步切换——更新 kafka config、consumer、envelope,接入服务级 topic 与 consumer group,并同步调整 mysql 初始化、start/main/router 装配,保证各服务 relay / consumer 独立装配
3. 业务事件处理器按服务归属重接新 bus——`active-scheduler` 触发链路,以及 `agent` / `memory` / `notification` / `task` 相关 outbox handler 统一切到新路由注册与服务目录,避免新流量回流共享表
4. 同步更新《微服务四步迁移与第二阶段并行开发计划》,把阶段 1 改成当前基线并补齐结构图、阶段快照、风险回退和多代理执行口径
This commit is contained in:
Losita
2026-05-03 20:29:00 +08:00
parent 166fb1b507
commit a6c1e5d077
28 changed files with 1631 additions and 340 deletions

View File

@@ -8,7 +8,6 @@ import (
"strings"
"time"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
@@ -25,13 +24,13 @@ const requestedNotificationDedupeWindow = 30 * time.Minute
func EnqueueActiveScheduleTriggeredInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
maxRetry int,
payload sharedevents.ActiveScheduleTriggeredPayload,
) error {
return enqueueContractEventInTx(
ctx,
outboxRepo,
kafkaCfg,
maxRetry,
sharedevents.ActiveScheduleTriggeredEventType,
sharedevents.ActiveScheduleTriggeredEventVersion,
payload.MessageKey(),
@@ -51,13 +50,13 @@ func EnqueueActiveScheduleTriggeredInTx(
func EnqueueNotificationFeishuRequestedInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
maxRetry int,
payload sharedevents.FeishuNotificationRequestedPayload,
) error {
return enqueueContractEventInTx(
ctx,
outboxRepo,
kafkaCfg,
maxRetry,
sharedevents.NotificationFeishuRequestedEventType,
sharedevents.NotificationFeishuRequestedEventVersion,
payload.MessageKey(),
@@ -156,7 +155,7 @@ func BuildNotificationDedupeKey(userID int, triggerType string, requestedAt time
func enqueueContractEventInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
maxRetry int,
eventType string,
eventVersion string,
messageKey string,
@@ -179,8 +178,10 @@ func enqueueContractEventInTx(
if err != nil {
return err
}
if maxRetry <= 0 {
maxRetry = 20
}
cfg := normalizeKafkaConfig(kafkaCfg)
wrapped := outboxinfra.OutboxEventPayload{
EventID: strings.TrimSpace(eventID),
EventType: eventType,
@@ -188,20 +189,12 @@ func enqueueContractEventInTx(
AggregateID: strings.TrimSpace(aggregateID),
Payload: payloadJSON,
}
_, err = outboxRepo.CreateMessage(ctx, eventType, cfg.Topic, strings.TrimSpace(messageKey), wrapped, cfg.MaxRetry)
// 1. 这里只负责把已经校验过的事件契约写入 outbox具体 service/table/topic 由仓库按 eventType 解析。
// 2. 这样 active scheduler 侧不再显式依赖 topic后续切服务级路由时只需要维护事件归属表。
_, err = outboxRepo.CreateMessage(ctx, eventType, strings.TrimSpace(messageKey), wrapped, maxRetry)
return err
}
func normalizeKafkaConfig(cfg kafkabus.Config) kafkabus.Config {
if strings.TrimSpace(cfg.Topic) == "" {
cfg.Topic = kafkabus.DefaultTopic
}
if cfg.MaxRetry <= 0 {
cfg.MaxRetry = 20
}
return cfg
}
func buildNotificationFallbackText(summary string, targetURL string) string {
link := strings.TrimSpace(targetURL)
if summary == "" {

View File

@@ -209,7 +209,7 @@ func (s *TriggerWorkflowService) ProcessTriggeredInTx(
previewResp.Detail.Notification,
now,
)
return EnqueueNotificationFeishuRequestedInTx(ctx, s.outbox.WithTx(tx), s.kafkaCfg, notificationPayload)
return EnqueueNotificationFeishuRequestedInTx(ctx, s.outbox.WithTx(tx), s.kafkaCfg.MaxRetry, notificationPayload)
}
// MarkTriggerFailedBestEffort 在事务外补记 trigger failed 状态,供 outbox retry 前排障。

View File

@@ -64,7 +64,7 @@ type appRuntime struct {
agentCache *dao.AgentCache
manager *dao.RepoManager
outboxRepo *outboxinfra.Repository
eventBus *outboxinfra.EventBus
eventBus eventsvc.OutboxBus
memoryModule *memory.Module
activeJobScanner *activejob.Scanner
activeTriggerWorkflow *activesvc.TriggerWorkflowService
@@ -88,46 +88,39 @@ func loadConfig() error {
return nil
}
// Start 保留历史入口,默认仍按 all 模式启动
//
// 职责边界:
// 1. 兼容 backend/main.go 以及旧部署命令;
// 2. 不新增业务语义,只委托给 StartAll
// 3. 后续若部署全面切到独立 api/worker可逐步废弃该兼容入口。
// Start 保留历史兼容入口,当前默认等价于 StartAll
// 1. 兼容 backend/main.go 和旧部署命令。
// 2. 不新增业务语义,只转发给 StartAll。
// 3. 后续若全面切到独立 api/worker 启动,本入口只保留过渡兼容。
func Start() {
StartAll()
}
// StartAll 启动迁移期兼容模式HTTP API 后台 worker 在同一进程内运行
// StartAll 启动当前仓库的完整运行态HTTP API + 后台 worker。
// 这仍然是迁移期的兼容装配,不是终态的“一个服务一个 main.go”模型。
func StartAll() {
ctx := context.Background()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
runtime := mustBuildRuntime(ctx)
defer runtime.close()
runtime.startWorkers(ctx)
runtime.startHTTP()
runtime.startHTTP(ctx)
}
// StartAPI 只启动 Gin API 其同步 service/dao 依赖,不启动后台 worker。
//
// 说明:
// 1. 该模式仍是“带 service/dao 的 API 单体”,不是最终 API Gateway
// 2. API 可以继续写入 outbox但不负责消费 outbox也不启动 memory worker
// 3. worker 停止时API 仍可提供同步接口,只是异步能力会延迟处理。
// StartAPI 只启动 Gin API 其同步依赖,不启动后台 worker。
// 这仍是迁移期的单体 API 模式,不是终态的独立网关。
func StartAPI() {
ctx := context.Background()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
runtime := mustBuildRuntime(ctx)
defer runtime.close()
runtime.startHTTP()
runtime.startHTTP(ctx)
}
// StartWorker 只启动后台异步能力,不注册 Gin 路由。
//
// 运行内容:
// 1. outbox relay扫描 pending 消息并投递 Kafka
// 2. Kafka consumer消费事件并分发到业务 handler
// 3. memory worker处理 memory_jobs 后台任务。
// 当前包含 outbox relay / Kafka consumer / memory worker / 主动调度扫描 / 通知重试。
func StartWorker() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
@@ -305,7 +298,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) {
}
handlers := buildAPIHandlers(userService, taskSv, taskClassService, courseService, scheduleService, agentService, memoryModule, activeScheduleDryRun, activeSchedulePreviewConfirm, activeScheduleTrigger, notificationChannelService)
return &appRuntime{
runtime := &appRuntime{
db: db,
redisClient: rdb,
cacheRepo: cacheRepo,
@@ -321,7 +314,13 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) {
notificationService: notificationService,
limiter: limiter,
handlers: handlers,
}, nil
}
if runtime.eventBus != nil {
if err := runtime.registerEventHandlers(); err != nil {
return nil, err
}
}
return runtime, nil
}
func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) {
@@ -346,16 +345,24 @@ func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) {
return ragRuntime, nil
}
func buildEventBus(outboxRepo *outboxinfra.Repository) (*outboxinfra.EventBus, error) {
// outbox 通用事件总线接线
// 1. API 模式只使用 Publish 写入 outbox不启动后台循环
// 2. worker/all 模式再显式注册 handler 并启动后台循环
func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, error) {
// outbox 多 service 门面装配
// 1. 按 service 维度创建独立 enginetopic / group 由 service 名称推导
// 2. 对外仍然只暴露一个 Publish / Start / Close 门面
// 3. kafka.enabled=false 时返回 nil业务按既有降级策略执行。
kafkaCfg := kafkabus.LoadConfig()
eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg)
if err != nil {
return nil, fmt.Errorf("failed to initialize outbox event bus: %w", err)
serviceBuses := make(map[string]eventsvc.OutboxBus, len(eventsvc.OutboxServiceNames()))
for _, serviceName := range eventsvc.OutboxServiceNames() {
bus, err := eventsvc.NewServiceOutboxBus(outboxRepo, kafkaCfg, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to initialize outbox event bus for service %s: %w", serviceName, err)
}
if bus != nil {
serviceBuses[serviceName] = bus
}
}
eventBus := eventsvc.NewRoutedOutboxBus(serviceBuses)
if eventBus == nil {
log.Println("Outbox event bus is disabled")
}
@@ -855,9 +862,6 @@ func (r *appRuntime) startWorkers(ctx context.Context) {
}
if r.eventBus != nil {
if err := r.registerEventHandlers(); err != nil {
log.Fatalf("Failed to register outbox event handlers: %v", err)
}
r.eventBus.Start(ctx)
log.Println("Outbox event bus started")
} else {
@@ -878,24 +882,25 @@ func (r *appRuntime) startWorkers(ctx context.Context) {
}
func (r *appRuntime) registerEventHandlers() error {
// 调用目的:worker/all 启动时复用同一套核心事件注册顺序,避免未来新增入口后复制多份 handler 接线
if err := eventsvc.RegisterCoreOutboxHandlers(r.eventBus, r.outboxRepo, r.manager, r.agentRepo, r.cacheRepo, r.memoryModule); err != nil {
// 调用目的:在运行时启动前一次性完成“事件类型 -> 服务归属 -> handler”的显式接线避免 API 模式发布事件时拿不到路由表
if err := eventsvc.RegisterAllOutboxHandlers(
r.eventBus,
r.outboxRepo,
r.manager,
r.agentRepo,
r.cacheRepo,
r.memoryModule,
r.activeTriggerWorkflow,
r.notificationService,
); err != nil {
return err
}
if err := eventsvc.RegisterActiveScheduleTriggeredHandler(r.eventBus, r.outboxRepo, r.activeTriggerWorkflow); err != nil {
return fmt.Errorf("注册主动调度触发 handler 失败: %w", err)
}
// 调用目的:飞书通知事件消费与 notification retry loop 复用同一个服务实例,
// 保证后续接入真实 provider 时只需要替换启动期注入配置。
if err := eventsvc.RegisterFeishuNotificationHandler(r.eventBus, r.outboxRepo, r.notificationService); err != nil {
return fmt.Errorf("注册飞书通知 handler 失败: %w", err)
}
return nil
}
func (r *appRuntime) startHTTP() {
func (r *appRuntime) startHTTP(ctx context.Context) {
router := routers.RegisterRouters(r.handlers, r.cacheRepo, r.userRepo, r.limiter)
routers.StartEngine(router)
routers.StartEngine(ctx, router)
}
func (r *appRuntime) close() {

View File

@@ -19,6 +19,8 @@ type Config struct {
Brokers []string
Topic string
GroupID string
// ServiceName 表示当前进程所属的 outbox 服务;为空时保持单体全量模式。
ServiceName string
// RetryScanInterval/RetryBatchSize/MaxRetry 作用于 outbox 扫描与失败重试。
RetryScanInterval time.Duration
RetryBatchSize int
@@ -40,10 +42,14 @@ func LoadConfig() Config {
Brokers: brokers,
Topic: strings.TrimSpace(viper.GetString("kafka.topic")),
GroupID: strings.TrimSpace(viper.GetString("kafka.groupID")),
ServiceName: strings.TrimSpace(viper.GetString("outbox.serviceName")),
RetryScanInterval: viper.GetDuration("kafka.retryScanInterval"),
RetryBatchSize: viper.GetInt("kafka.retryBatchSize"),
MaxRetry: viper.GetInt("kafka.maxRetry"),
}
if cfg.ServiceName == "" {
cfg.ServiceName = strings.TrimSpace(viper.GetString("kafka.serviceName"))
}
if cfg.Topic == "" {
cfg.Topic = DefaultTopic
}

View File

@@ -3,6 +3,7 @@ package kafka
import (
"context"
"errors"
"strings"
segmentkafka "github.com/segmentio/kafka-go"
)
@@ -13,7 +14,13 @@ type Consumer struct {
func NewConsumer(cfg Config) (*Consumer, error) {
if len(cfg.Brokers) == 0 {
return nil, errors.New("kafka brokers 未配置")
return nil, errors.New("kafka brokers not configured")
}
if strings.TrimSpace(cfg.Topic) == "" {
return nil, errors.New("kafka topic not configured")
}
if strings.TrimSpace(cfg.GroupID) == "" {
return nil, errors.New("kafka groupID not configured")
}
reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{
Brokers: cfg.Brokers,
@@ -30,14 +37,14 @@ func NewConsumer(cfg Config) (*Consumer, error) {
// Dequeue 从 Kafka 拉取一条消息(不自动提交 offset
func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) {
if c == nil || c.reader == nil {
return segmentkafka.Message{}, errors.New("kafka consumer 未初始化")
return segmentkafka.Message{}, errors.New("kafka consumer not initialized")
}
return c.reader.FetchMessage(ctx)
}
func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error {
if c == nil || c.reader == nil {
return errors.New("kafka consumer 未初始化")
return errors.New("kafka consumer not initialized")
}
return c.reader.CommitMessages(ctx, msg)
}

View File

@@ -18,6 +18,8 @@ type Envelope struct {
EventType string `json:"event_type"`
// EventVersion 是事件版本号(默认 v1
EventVersion string `json:"event_version,omitempty"`
// ServiceName 是事件归属服务;空值通常表示旧兼容消息或全量模式。
ServiceName string `json:"service_name,omitempty"`
// AggregateID 是聚合主键(例如 conversation_id用于追踪同一业务对象事件流。
AggregateID string `json:"aggregate_id,omitempty"`

View File

@@ -40,19 +40,19 @@ type PublishRequest struct {
Payload any
}
// Engine 是 Outbox + Kafka 通用异步引擎。
// Engine 是单个服务的 Outbox + Kafka 异步引擎。
//
// 职责边界:
// 1. 负责 outbox 扫描、kafka 投递、kafka 消费、状态机推进;
// 1. 负责一个服务目录下的 outbox 扫描、Kafka 投递、Kafka 消费、状态机推进;
// 2. 负责 event_type -> handler 路由;
// 3. 不负责任何业务语义(业务由 handler 承担)
// 3. 不负责任何跨服务路由决策,跨服务分发由 EventBus 门面完成
type Engine struct {
repo *Repository
producer *kafkabus.Producer
consumer *kafkabus.Consumer
brokers []string
topic string
route ServiceRoute
maxRetry int
scanEvery time.Duration
scanBatch int
@@ -61,11 +61,12 @@ type Engine struct {
handlers map[string]MessageHandler
}
// NewEngine 创建异步引擎。
// NewEngine 创建单服务异步引擎。
//
// 规则:
// 1. kafka.enabled=false 时返回 nil调用方可降级同步
// 2. producer/consumer 任一步失败都会回收已创建资源。
// 2. serviceName 非空时优先使用服务级默认目录topic/group/table 不再沿用共享终态;
// 3. producer/consumer 任一步失败都会回收已创建资源。
func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
if !cfg.Enabled {
return nil, nil
@@ -74,6 +75,11 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
return nil, errors.New("outbox repository is nil")
}
route := resolveEngineRoute(repo, cfg)
cfg.Topic = route.Topic
cfg.GroupID = route.GroupID
serviceRepo := repo.WithRoute(route)
producer, err := kafkabus.NewProducer(cfg)
if err != nil {
return nil, err
@@ -85,11 +91,11 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
}
return &Engine{
repo: repo,
repo: serviceRepo,
producer: producer,
consumer: consumer,
brokers: cfg.Brokers,
topic: cfg.Topic,
route: route,
maxRetry: cfg.MaxRetry,
scanEvery: cfg.RetryScanInterval,
scanBatch: cfg.RetryBatchSize,
@@ -118,7 +124,7 @@ func (e *Engine) RegisterEventHandler(eventType string, handler MessageHandler)
e.handlersMu.Lock()
defer e.handlersMu.Unlock()
if _, exists := e.handlers[eventType]; exists {
log.Printf("outbox handler 覆盖注册: event_type=%s", eventType)
log.Printf("outbox handler 覆盖注册: service=%s event_type=%s", e.route.ServiceName, eventType)
}
e.handlers[eventType] = handler
return nil
@@ -137,11 +143,20 @@ func (e *Engine) Start(ctx context.Context) {
return
}
log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch)
if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil {
log.Printf(
"outbox engine starting: service=%s table=%s topic=%s group=%s brokers=%v retry_scan=%s batch=%d",
e.route.ServiceName,
e.route.TableName,
e.route.Topic,
e.route.GroupID,
e.brokers,
e.scanEvery,
e.scanBatch,
)
if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.route.Topic, 30*time.Second); err != nil {
log.Printf("Kafka topic not ready before consume loop start: %v", err)
} else {
log.Printf("Kafka topic is ready: %s", e.topic)
log.Printf("Kafka topic is ready: %s", e.route.Topic)
}
e.StartDispatch(ctx)
@@ -149,11 +164,6 @@ func (e *Engine) Start(ctx context.Context) {
}
// StartDispatch 单独启动 outbox -> Kafka 的投递循环。
//
// 职责边界:
// 1. 只负责启动 dispatch 后台 goroutine不负责启动 Kafka 消费;
// 2. 不重复执行 Start 中的 topic readiness 等待,避免改变原 Start(ctx) 的启动语义;
// 3. ctx 取消后由内部循环自行退出,调用方无需额外停止 goroutine。
func (e *Engine) StartDispatch(ctx context.Context) {
if e == nil {
return
@@ -162,11 +172,6 @@ func (e *Engine) StartDispatch(ctx context.Context) {
}
// StartConsume 单独启动 Kafka -> handler 的消费循环。
//
// 职责边界:
// 1. 只负责启动 consume 后台 goroutine不负责扫描或投递 outbox
// 2. 不注册业务 handlerhandler 仍由 RegisterEventHandler 显式注入;
// 3. ctx 取消或 consumer 返回 context.Canceled 时,内部循环按既有逻辑退出。
func (e *Engine) StartConsume(ctx context.Context) {
if e == nil {
return
@@ -202,7 +207,7 @@ func (e *Engine) Enqueue(ctx context.Context, eventType, messageKey string, payl
// 步骤:
// 1. 标准化 event_type/version/key
// 2. payload 序列化;
// 3. 写入 outbox(仅本地写库,不做 kafka 网络 IO
// 3. 写入当前服务的 outbox 表,不再由调用方手传 topic
func (e *Engine) Publish(ctx context.Context, req PublishRequest) error {
if e == nil {
return errors.New("outbox engine is nil")
@@ -227,7 +232,7 @@ func (e *Engine) Publish(ctx context.Context, req PublishRequest) error {
return err
}
_, err = e.repo.CreateMessage(ctx, eventType, e.topic, messageKey, OutboxEventPayload{
_, err = e.repo.CreateMessage(ctx, eventType, messageKey, OutboxEventPayload{
EventID: strings.TrimSpace(req.EventID),
EventType: eventType,
EventVersion: eventVersion,
@@ -246,13 +251,13 @@ func (e *Engine) startDispatchLoop(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch)
pendingMessages, err := e.repo.ListDueMessages(ctx, e.route.ServiceName, e.scanBatch)
if err != nil {
log.Printf("扫描 outbox 失败: %v", err)
continue
}
if len(pendingMessages) > 0 {
log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages))
log.Printf("outbox due messages=%d, service=%s start dispatch", len(pendingMessages), e.route.ServiceName)
}
for _, msg := range pendingMessages {
@@ -287,18 +292,23 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
if eventPayload.EventID == "" {
eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10)
}
serviceName := strings.TrimSpace(outboxMsg.ServiceName)
if serviceName == "" {
serviceName = e.route.ServiceName
}
envelope := kafkabus.Envelope{
OutboxID: outboxMsg.ID,
EventID: eventPayload.EventID,
EventType: eventPayload.EventType,
EventVersion: eventPayload.EventVersion,
ServiceName: serviceName,
AggregateID: eventPayload.AggregateID,
Payload: eventPayload.PayloadJSON,
}
raw, err := json.Marshal(envelope)
if err != nil {
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 装失败: "+err.Error())
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 装失败: "+err.Error())
if markErr != nil {
log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
}
@@ -329,7 +339,7 @@ func (e *Engine) startConsumeLoop(ctx context.Context) {
if errors.Is(err, context.Canceled) {
return
}
log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err)
log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.route.Topic, err)
time.Sleep(300 * time.Millisecond)
continue
}
@@ -344,11 +354,11 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
var envelope kafkabus.Envelope
if err := json.Unmarshal(msg.Value, &envelope); err != nil {
_ = e.consumer.Commit(ctx, msg)
return fmt.Errorf("解析 Kafka 装失败: %w", err)
return fmt.Errorf("解析 Kafka 装失败: %w", err)
}
if envelope.OutboxID <= 0 {
_ = e.consumer.Commit(ctx, msg)
return errors.New("Kafka 装缺少 outbox_id")
return errors.New("Kafka 装缺少 outbox_id")
}
eventType := strings.TrimSpace(envelope.EventType)
@@ -360,9 +370,36 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
return nil
}
runtimeServiceName := strings.TrimSpace(e.route.ServiceName)
if runtimeServiceName != "" {
messageServiceName := strings.TrimSpace(envelope.ServiceName)
if messageServiceName == "" {
if resolvedServiceName, ok := ResolveEventService(eventType); ok {
messageServiceName = resolvedServiceName
}
}
if messageServiceName == "" || messageServiceName != runtimeServiceName {
log.Printf(
"跳过非本服务事件: runtime_service=%s message_service=%s event_type=%s outbox_id=%d",
runtimeServiceName,
messageServiceName,
eventType,
envelope.OutboxID,
)
if err := e.consumer.Commit(ctx, msg); err != nil {
return err
}
return nil
}
}
handler, ok := e.getHandler(eventType)
if !ok {
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType)
if runtimeServiceName == "" {
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType)
} else {
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "本服务未注册 handler: "+eventType)
}
if err := e.consumer.Commit(ctx, msg); err != nil {
return err
}
@@ -381,3 +418,51 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
return e.consumer.Commit(ctx, msg)
}
func resolveEngineRoute(repo *Repository, cfg kafkabus.Config) ServiceRoute {
route := ServiceRoute{
ServiceName: strings.TrimSpace(cfg.ServiceName),
Topic: strings.TrimSpace(cfg.Topic),
GroupID: strings.TrimSpace(cfg.GroupID),
}
if repo != nil {
repoRoute := normalizeServiceRoute(repo.route)
if route.ServiceName == "" {
route.ServiceName = repoRoute.ServiceName
}
if route.TableName == "" {
route.TableName = repoRoute.TableName
}
if route.Topic == "" {
route.Topic = repoRoute.Topic
}
if route.GroupID == "" {
route.GroupID = repoRoute.GroupID
}
}
if route.ServiceName != "" {
defaultRoute := DefaultServiceRoute(route.ServiceName)
if route.TableName == "" {
route.TableName = defaultRoute.TableName
}
if route.Topic == "" {
route.Topic = defaultRoute.Topic
}
if route.GroupID == "" {
route.GroupID = defaultRoute.GroupID
}
return normalizeServiceRoute(route)
}
if route.TableName == "" {
route.TableName = DefaultServiceRoute(ServiceNameAgent).TableName
}
if route.Topic == "" {
route.Topic = kafkabus.DefaultTopic
}
if route.GroupID == "" {
route.GroupID = kafkabus.DefaultGroup
}
return normalizeServiceRoute(route)
}

View File

@@ -3,111 +3,175 @@ package outbox
import (
"context"
"errors"
"fmt"
"strings"
"sync"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
)
// EventPublisher 是通用事件发布能力接口。
//
// 职责边界:
// 1. 只暴露“发布事件”这一件事,隐藏底层 outbox/kafka 实现细节;
// 2. 业务层只依赖该接口,避免直接耦合具体引擎结构体;
// 3. 该接口不承诺“立即消费成功”,只承诺“事件已入队或返回错误”。
type EventPublisher interface {
Publish(ctx context.Context, req PublishRequest) error
}
// EventBus 是 outbox 异步总线的门面对象
// EventBus 是 outbox 多服务引擎的门面。
//
// 设计目的
// 1. 对外提供“发布 + 注册处理器 + 启停”三类最小能力;
// 2. 对内复用 Engine不重复实现状态机和调度逻辑
// 3. 为后续引入更多事件类型提供统一扩展点
// 职责边界
// 1. 对外只暴露“发布、注册 handler、启动、关闭”四类能力;
// 2. 内部按事件归属把调用路由到对应 service engine
// 3. 不再把共享 topic 当主路径,服务级路由始终优先
type EventBus struct {
engine *Engine
repo *Repository
cfg kafkabus.Config
mu sync.RWMutex
engines map[string]*Engine
}
// NewEventBus 创建通用事件总线
// NewEventBus 创建多服务事件门面
//
// 说明:
// 1. kafka.enabled=false 时返回 nil调用方可直接降级为同步模式
// 2. 该方法只创建基础设施对象,不自动注册任何业务事件处理器
// 3. 业务事件处理器注册应由上层在启动阶段显式完成,避免隐式副作用
// 1. kafka.enabled=false 时返回 nil调用方可直接降级
// 2. 实际 service engine 在需要时按服务目录懒加载
// 3. 懒加载不会改变既有事件契约,只是把物理资源拆到各自服务
func NewEventBus(repo *Repository, cfg kafkabus.Config) (*EventBus, error) {
engine, err := NewEngine(repo, cfg)
if !cfg.Enabled {
return nil, nil
}
if repo == nil {
return nil, errors.New("outbox repository is nil")
}
return &EventBus{
repo: repo,
cfg: cfg,
engines: make(map[string]*Engine),
}, nil
}
// RegisterEventHandler 注册事件处理器。
func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error {
if b == nil {
return errors.New("event bus is not initialized")
}
route, err := b.routeForEvent(eventType)
if err != nil {
return err
}
engine, err := b.ensureEngine(route)
if err != nil {
return err
}
return engine.RegisterEventHandler(eventType, handler)
}
// Publish 把事件路由到对应服务的 outbox 表与 Kafka 资源。
func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error {
if b == nil {
return errors.New("event bus is not initialized")
}
route, err := b.routeForEvent(req.EventType)
if err != nil {
return err
}
engine, err := b.ensureEngine(route)
if err != nil {
return err
}
return engine.Publish(ctx, req)
}
// Start 启动所有已创建的 service engine。
func (b *EventBus) Start(ctx context.Context) {
if b == nil {
return
}
for _, engine := range b.snapshotEngines() {
go engine.Start(ctx)
}
}
// StartDispatch 只启动所有已创建 engine 的 dispatch 循环。
func (b *EventBus) StartDispatch(ctx context.Context) {
if b == nil {
return
}
for _, engine := range b.snapshotEngines() {
go engine.StartDispatch(ctx)
}
}
// StartConsume 只启动所有已创建 engine 的消费循环。
func (b *EventBus) StartConsume(ctx context.Context) {
if b == nil {
return
}
for _, engine := range b.snapshotEngines() {
go engine.StartConsume(ctx)
}
}
// Close 关闭所有 service engine 的 Kafka 资源。
func (b *EventBus) Close() {
if b == nil {
return
}
for _, engine := range b.snapshotEngines() {
engine.Close()
}
}
func (b *EventBus) routeForEvent(eventType string) (ServiceRoute, error) {
route, ok := ResolveEventRoute(eventType)
if !ok {
return ServiceRoute{}, fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType))
}
return route, nil
}
func (b *EventBus) ensureEngine(route ServiceRoute) (*Engine, error) {
serviceName := route.ServiceName
if serviceName == "" {
return nil, errors.New("serviceName is empty")
}
b.mu.RLock()
if engine, ok := b.engines[serviceName]; ok {
b.mu.RUnlock()
return engine, nil
}
b.mu.RUnlock()
b.mu.Lock()
defer b.mu.Unlock()
if engine, ok := b.engines[serviceName]; ok {
return engine, nil
}
cfg := b.cfg
cfg.ServiceName = serviceName
cfg.Topic = route.Topic
cfg.GroupID = route.GroupID
engine, err := NewEngine(b.repo, cfg)
if err != nil {
return nil, err
}
if engine == nil {
return nil, nil
}
return &EventBus{engine: engine}, nil
b.engines[serviceName] = engine
return engine, nil
}
// RegisterEventHandler 注册事件处理器。
//
// 失败语义:
// 1. bus 未初始化时直接返回错误;
// 2. event_type 为空或 handler 为空时返回错误;
// 3. 重复注册时采用“后者覆盖前者”并打日志(由 Engine 负责)。
func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error {
if b == nil || b.engine == nil {
return errors.New("event bus is not initialized")
func (b *EventBus) snapshotEngines() []*Engine {
b.mu.RLock()
defer b.mu.RUnlock()
engines := make([]*Engine, 0, len(b.engines))
for _, engine := range b.engines {
engines = append(engines, engine)
}
return b.engine.RegisterEventHandler(eventType, handler)
}
// Publish 发布事件到 outbox 队列。
//
// 关键语义:
// 1. 返回 nil 仅表示“已写入 outbox 成功”;
// 2. 真正 Kafka 投递与业务消费由后台异步循环完成;
// 3. 若返回 error表示本次入队失败调用方应按业务策略决定是否重试/降级。
func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error {
if b == nil || b.engine == nil {
return errors.New("event bus is not initialized")
}
return b.engine.Publish(ctx, req)
}
// Start 启动事件总线后台循环dispatch + consume
func (b *EventBus) Start(ctx context.Context) {
if b == nil || b.engine == nil {
return
}
b.engine.Start(ctx)
}
// StartDispatch 单独启动事件总线的 outbox 投递循环。
//
// 职责边界:
// 1. 只暴露 relay/dispatch 运行职责,便于独立进程只负责投递;
// 2. 不启动消费循环,避免与独立 consumer 进程争抢职责;
// 3. 不改变 Start(ctx) 的既有组合启动行为。
func (b *EventBus) StartDispatch(ctx context.Context) {
if b == nil || b.engine == nil {
return
}
b.engine.StartDispatch(ctx)
}
// StartConsume 单独启动事件总线的 Kafka 消费循环。
//
// 职责边界:
// 1. 只暴露 consumer 运行职责,便于独立进程只负责消费;
// 2. 不扫描 outbox、不投递 Kafka状态推进仍复用 Engine 既有逻辑;
// 3. handler 注册仍由调用方在启动前显式完成。
func (b *EventBus) StartConsume(ctx context.Context) {
if b == nil || b.engine == nil {
return
}
b.engine.StartConsume(ctx)
}
// Close 关闭事件总线资源producer/consumer
func (b *EventBus) Close() {
if b == nil || b.engine == nil {
return
}
b.engine.Close()
return engines
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/model"
@@ -15,10 +17,11 @@ import (
//
// 职责边界:
// 1. 只负责 outbox 状态流转与通用事务编排;
// 2. 不负责任何业务语义(例如聊天/任务/标题等具体落库)
// 3. 消费成功时通过回调把业务动作注入同一事务,保证原子一致
// 2. 不负责聊天任务、通知等具体业务语义
// 3. 同一仓储实例只面向一个服务级 outbox 目录,避免把共享表当成终态
type Repository struct {
db *gorm.DB
db *gorm.DB
route ServiceRoute
}
func NewRepository(db *gorm.DB) *Repository {
@@ -27,20 +30,51 @@ func NewRepository(db *gorm.DB) *Repository {
// WithTx 用外部事务句柄构造同事务仓储实例。
func (d *Repository) WithTx(tx *gorm.DB) *Repository {
return &Repository{db: tx}
if d == nil {
return &Repository{db: tx}
}
return &Repository{db: tx, route: d.route}
}
// CreateMessage 把事件写入 outbox入队
// WithRoute 用指定服务目录构造服务级仓储
//
// 步骤
// 1. 序列化 payload
// 2. 初始化 pending 状态
// 3. 写入 outbox 并返回 outbox_id
func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messageKey string, payload any, maxRetry int) (int64, error) {
// 职责边界
// 1. 只切换 outbox 物理目录,不改变事务句柄
// 2. 适合多个 service engine 共享同一 DB 连接
// 3. 保留 route 的 table/topic/group避免回落到共享 topic
func (d *Repository) WithRoute(route ServiceRoute) *Repository {
route = normalizeServiceRoute(route)
if d == nil {
return &Repository{route: route}
}
return &Repository{db: d.db, route: route}
}
// CreateMessage 把事件写入 outbox。
//
// 职责边界:
// 1. 只接受 eventType、messageKey、payload 和 maxRetry不再允许业务侧显式传 topic
// 2. table/topic/group 统一由 eventType -> service -> route 解析,确保服务级路由是唯一入口;
// 3. eventType 未注册时直接返回 error避免消息静默落到默认表或默认 topic。
func (d *Repository) CreateMessage(ctx context.Context, eventType string, messageKey string, payload any, maxRetry int) (int64, error) {
if d == nil || d.db == nil {
return 0, errors.New("outbox repository is nil")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return 0, errors.New("eventType is empty")
}
messageKey = strings.TrimSpace(messageKey)
if maxRetry <= 0 {
maxRetry = 20
}
route, err := d.resolvePublishRoute(eventType)
if err != nil {
return 0, err
}
raw, err := json.Marshal(payload)
if err != nil {
return 0, err
@@ -49,7 +83,8 @@ func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messag
now := time.Now()
msg := model.AgentOutboxMessage{
EventType: eventType,
Topic: topic,
ServiceName: route.ServiceName,
Topic: route.Topic,
MessageKey: messageKey,
Payload: string(raw),
Status: model.OutboxStatusPending,
@@ -58,39 +93,48 @@ func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messag
NextRetryAt: &now,
}
if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil {
if err = d.db.WithContext(ctx).Table(route.TableName).Create(&msg).Error; err != nil {
return 0, err
}
return msg.ID, nil
}
// GetByID 从当前仓储绑定的 outbox 表读取指定消息。
func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) {
var msg model.AgentOutboxMessage
if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil {
if err := d.scopedDB(ctx).Where("id = ?", id).First(&msg).Error; err != nil {
return nil, err
}
return &msg, nil
}
// ListDueMessages 拉取到期可投递消息。
func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) {
//
// 说明:
// 1. serviceName 为空时保持当前仓储目录内的扫描语义;
// 2. serviceName 非空时只扫描对应服务的消息;
// 3. 这样既能支持单服务 relay也能支持后续多服务 relay。
func (d *Repository) ListDueMessages(ctx context.Context, serviceName string, limit int) ([]model.AgentOutboxMessage, error) {
if limit <= 0 {
limit = 100
}
now := time.Now()
var messages []model.AgentOutboxMessage
err := d.db.WithContext(ctx).
query := d.scopedDB(ctx).
Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now).
Order("next_retry_at ASC, id ASC").
Limit(limit).
Find(&messages).Error
if err != nil {
Limit(limit)
serviceName = strings.TrimSpace(serviceName)
if serviceName != "" {
query = query.Where("service_name = ?", serviceName)
}
if err := query.Find(&messages).Error; err != nil {
return nil, err
}
return messages, nil
}
// MarkPublished 标记为已投递 Kafka。
// MarkPublished 标记消息已成功投递 Kafka。
func (d *Repository) MarkPublished(ctx context.Context, id int64) error {
now := time.Now()
updates := map[string]interface{}{
@@ -99,14 +143,14 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error {
"last_error": nil,
"next_retry_at": nil,
}
result := d.db.WithContext(ctx).
result := d.scopedDB(ctx).
Model(&model.AgentOutboxMessage{}).
Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead).
Updates(updates)
return result.Error
}
// MarkDead 标记为死信。
// MarkDead 把消息标记为死信。
func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) error {
now := time.Now()
lastErr := truncateError(reason)
@@ -116,21 +160,20 @@ func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) erro
"next_retry_at": nil,
"updated_at": now,
}
return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
return d.scopedDB(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
}
// MarkFailedForRetry 记录一次可重试失败并推进重试窗口。
//
// 步骤:
// 1. 行级锁读取当前状态;
// 2. 最终态幂等短路;
// 3. retry_count+1
// 4. 计算 next_retry_at 或 dead
// 5. 写回状态快照。
// 1. 行级锁读取当前消息状态;
// 2. 已进入 consumed/dead 时幂等短路;
// 3. retry_count+1,并根据最大次数决定继续 pending 还是转 dead
// 4. 写回 last_error 和 next_retry_at,交给下一轮扫描继续投递。
func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason string) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var msg model.AgentOutboxMessage
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error
err := tx.Table(d.tableName()).Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error
if err != nil {
return err
}
@@ -159,7 +202,7 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st
"next_retry_at": nextRetryAt,
"updated_at": now,
}
return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
return tx.Table(d.tableName()).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
})
}
@@ -167,13 +210,13 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st
//
// 步骤:
// 1. 事务内锁定 outbox 记录;
// 2. consumed/dead 幂等返回;
// 3. 执行业务回调 fn(tx)
// 2. consumed/dead 状态幂等返回;
// 3. 执行业务回调 fn(tx),让业务落库和 outbox 状态共用同一事务
// 4. 业务成功后统一标记 consumed。
func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64, fn func(tx *gorm.DB) error) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var outboxMsg model.AgentOutboxMessage
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error
err := tx.Table(d.tableName()).Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
@@ -198,10 +241,46 @@ func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64,
"next_retry_at": nil,
"updated_at": now,
}
return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error
return tx.Table(d.tableName()).Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error
})
}
func (d *Repository) scopedDB(ctx context.Context) *gorm.DB {
return d.db.WithContext(ctx).Table(d.tableName())
}
func (d *Repository) tableName() string {
if d == nil {
return DefaultServiceRoute(ServiceNameAgent).TableName
}
route := normalizeServiceRoute(d.route)
if route.TableName != "" {
return route.TableName
}
return DefaultServiceRoute(ServiceNameAgent).TableName
}
func (d *Repository) resolvePublishRoute(eventType string) (ServiceRoute, error) {
if d == nil {
return ServiceRoute{}, errors.New("outbox repository is nil")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return ServiceRoute{}, errors.New("eventType is empty")
}
route, ok := ResolveEventRoute(eventType)
if !ok {
return ServiceRoute{}, fmt.Errorf("outbox route not registered: eventType=%s", eventType)
}
if d.route.ServiceName != "" && route.ServiceName != d.route.ServiceName {
return ServiceRoute{}, fmt.Errorf("eventType %s belongs to service %s, current repo service %s", eventType, route.ServiceName, d.route.ServiceName)
}
return normalizeServiceRoute(route), nil
}
func calcRetryBackoff(retryCount int) time.Duration {
if retryCount <= 0 {
return time.Second

View File

@@ -0,0 +1,136 @@
package outbox
import (
"errors"
"fmt"
"strings"
"sync"
)
var outboxRouteRegistry = struct {
sync.RWMutex
eventToService map[string]string
serviceRoutes map[string]ServiceRoute
}{
eventToService: make(map[string]string),
serviceRoutes: make(map[string]ServiceRoute),
}
// RegisterServiceRoute 注册或覆盖某个服务的物理 outbox 路由。
//
// 职责边界:
// 1. 只登记“服务 -> table/topic/group”目录不登记事件归属
// 2. 同服务重复注册时以后者覆盖前者,方便显式配置覆盖默认目录;
// 3. 空服务名直接报错,避免把共享 topic 误当成新终态。
func RegisterServiceRoute(route ServiceRoute) error {
route = normalizeServiceRoute(route)
if route.ServiceName == "" {
return errors.New("serviceName is empty")
}
outboxRouteRegistry.Lock()
defer outboxRouteRegistry.Unlock()
outboxRouteRegistry.serviceRoutes[route.ServiceName] = route
return nil
}
// RegisterEventService 记录“事件类型 -> 服务归属”的全局路由。
//
// 职责边界:
// 1. 只登记跨进程都要识别的事件归属,不承载 handler 逻辑;
// 2. 同一 event_type 只能归属一个服务,重复登记同值视为幂等;
// 3. 若该服务还没有显式路由,则先写入默认服务目录,保证后续能查到 table/topic/group。
func RegisterEventService(eventType, serviceName string) error {
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return errors.New("eventType is empty")
}
serviceName = normalizeServiceName(serviceName)
if serviceName == "" {
return errors.New("serviceName is empty")
}
outboxRouteRegistry.Lock()
defer outboxRouteRegistry.Unlock()
if existing, ok := outboxRouteRegistry.eventToService[eventType]; ok {
if existing != serviceName {
return fmt.Errorf("eventType %s already registered to service %s", eventType, existing)
}
return nil
}
outboxRouteRegistry.eventToService[eventType] = serviceName
return nil
}
// ResolveEventService 查询某个事件类型的归属服务。
//
// 返回值说明:
// 1. serviceName 为登记结果;
// 2. ok=false 表示当前路由表里还没有这个事件类型的归属信息。
func ResolveEventService(eventType string) (serviceName string, ok bool) {
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return "", false
}
outboxRouteRegistry.RLock()
defer outboxRouteRegistry.RUnlock()
serviceName, ok = outboxRouteRegistry.eventToService[eventType]
return serviceName, ok
}
// ResolveServiceRoute 查询某个服务的物理 outbox 配置。
//
// 返回值说明:
// 1. route 始终返回一个可执行的目录结果,未显式注册时回退默认目录;
// 2. ok=true 表示命中显式注册目录ok=false 表示走默认目录;
// 3. 这样既能支持显式配置覆盖,也能让基础设施在启动初期就有稳定默认值。
func ResolveServiceRoute(serviceName string) (route ServiceRoute, ok bool) {
serviceName = normalizeServiceName(serviceName)
if serviceName == "" {
return DefaultServiceRoute(""), false
}
outboxRouteRegistry.RLock()
route, ok = outboxRouteRegistry.serviceRoutes[serviceName]
outboxRouteRegistry.RUnlock()
if ok {
return normalizeServiceRoute(route), true
}
if route, ok = configuredServiceRoute(serviceName); ok {
return route, true
}
return DefaultServiceRoute(serviceName), false
}
// ResolveEventRoute 先按事件查服务,再按服务查物理目录。
//
// 返回值说明:
// 1. route 包含事件所在服务的 table/topic/group
// 2. ok=true 只表示“事件 -> 服务归属”已登记;
// 3. 服务目录若未显式注册,会自动回退到默认目录。
func ResolveEventRoute(eventType string) (route ServiceRoute, ok bool) {
serviceName, ok := ResolveEventService(eventType)
if !ok {
return ServiceRoute{}, false
}
route, _ = ResolveServiceRoute(serviceName)
return route, true
}
func configuredServiceRoute(serviceName string) (ServiceRoute, bool) {
cfg, ok := ResolveServiceConfig(serviceName)
if !ok {
return ServiceRoute{}, false
}
return normalizeServiceRoute(ServiceRoute{
ServiceName: cfg.Name,
TableName: cfg.TableName,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
}), true
}

View File

@@ -0,0 +1,168 @@
package outbox
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/spf13/viper"
)
const (
ServiceAgent = "agent"
ServiceTask = "task"
ServiceMemory = "memory"
ServiceActiveScheduler = "active-scheduler"
ServiceNotification = "notification"
)
// ServiceConfig 描述一个服务级 outbox 的固定归属。
//
// 职责边界:
// 1. 只描述“事件属于哪个服务、写哪张表、发哪个 topic、用哪个 group”。
// 2. 不承载具体业务 handler也不承载 Kafka 消息体格式。
// 3. 服务级写入、扫描和消费都应从这里读取同一份映射,避免配置漂移。
type ServiceConfig struct {
Name string
Topic string
GroupID string
TableName string
}
var serviceCatalogCache = struct {
sync.RWMutex
loaded bool
entries map[string]ServiceConfig
}{
entries: make(map[string]ServiceConfig),
}
// LoadServiceConfigs 读取服务级 outbox 目录。
//
// 说明:
// 1. 先给出默认终态映射,再允许通过配置中心覆盖 topic/groupID/table
// 2. 该目录只负责服务级 outbox 基础设施,不混入业务逻辑;
// 3. 若某个服务配置缺失,直接使用默认值,避免启动期因为非关键配置崩掉。
func LoadServiceConfigs() map[string]ServiceConfig {
serviceCatalogCache.Lock()
defer serviceCatalogCache.Unlock()
if serviceCatalogCache.loaded {
return cloneServiceConfigs(serviceCatalogCache.entries)
}
entries := map[string]ServiceConfig{
ServiceAgent: {
Name: ServiceAgent,
Topic: "smartflow.agent.outbox",
GroupID: "smartflow-agent-outbox-consumer",
TableName: "agent_outbox_messages",
},
ServiceTask: {
Name: ServiceTask,
Topic: "smartflow.task.outbox",
GroupID: "smartflow-task-outbox-consumer",
TableName: "task_outbox_messages",
},
ServiceMemory: {
Name: ServiceMemory,
Topic: "smartflow.memory.outbox",
GroupID: "smartflow-memory-outbox-consumer",
TableName: "memory_outbox_messages",
},
ServiceActiveScheduler: {
Name: ServiceActiveScheduler,
Topic: "smartflow.active-scheduler.outbox",
GroupID: "smartflow-active-scheduler-outbox-consumer",
TableName: "active_scheduler_outbox_messages",
},
ServiceNotification: {
Name: ServiceNotification,
Topic: "smartflow.notification.outbox",
GroupID: "smartflow-notification-outbox-consumer",
TableName: "notification_outbox_messages",
},
}
for name, entry := range entries {
entries[name] = overrideServiceConfig(entry)
}
serviceCatalogCache.entries = entries
serviceCatalogCache.loaded = true
return cloneServiceConfigs(entries)
}
// ResolveServiceConfig 查询某个服务的 outbox 目录。
func ResolveServiceConfig(serviceName string) (ServiceConfig, bool) {
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return ServiceConfig{}, false
}
entries := LoadServiceConfigs()
cfg, ok := entries[serviceName]
return cfg, ok
}
// ResolveEventServiceConfig 先解析事件归属服务,再返回该服务的 outbox 目录。
func ResolveEventServiceConfig(eventType string) (ServiceConfig, bool) {
serviceName, ok := ResolveEventService(eventType)
if !ok {
return ServiceConfig{}, false
}
return ResolveServiceConfig(serviceName)
}
// ServiceTables 返回当前目录中的所有 outbox 表名。
func ServiceTables() []string {
entries := LoadServiceConfigs()
tables := make([]string, 0, len(entries))
for _, entry := range entries {
tables = append(tables, entry.TableName)
}
sort.Strings(tables)
return tables
}
// ServiceNames 返回当前目录中的所有服务名。
func ServiceNames() []string {
entries := LoadServiceConfigs()
names := make([]string, 0, len(entries))
for name := range entries {
names = append(names, name)
}
sort.Strings(names)
return names
}
func overrideServiceConfig(entry ServiceConfig) ServiceConfig {
upperName := strings.TrimSpace(entry.Name)
if upperName == "" {
return entry
}
topicKey := fmt.Sprintf("outbox.services.%s.topic", upperName)
groupKey := fmt.Sprintf("outbox.services.%s.groupID", upperName)
tableKey := fmt.Sprintf("outbox.services.%s.table", upperName)
if topic := strings.TrimSpace(viper.GetString(topicKey)); topic != "" {
entry.Topic = topic
}
if groupID := strings.TrimSpace(viper.GetString(groupKey)); groupID != "" {
entry.GroupID = groupID
}
if tableName := strings.TrimSpace(viper.GetString(tableKey)); tableName != "" {
entry.TableName = tableName
}
return entry
}
func cloneServiceConfigs(entries map[string]ServiceConfig) map[string]ServiceConfig {
cloned := make(map[string]ServiceConfig, len(entries))
for name, entry := range entries {
cloned[name] = entry
}
return cloned
}

View File

@@ -0,0 +1,145 @@
package outbox
import (
"strings"
)
const (
ServiceNameAgent = "agent"
ServiceNameTask = "task"
ServiceNameMemory = "memory"
ServiceNameActiveScheduler = "active-scheduler"
ServiceNameNotification = "notification"
)
// ServiceRoute 描述一个 outbox 服务的终态路由信息。
//
// 职责边界:
// 1. 只承载服务级 outbox 的 table/topic/group 目录信息;
// 2. 不承载 handler、事务或 Kafka 连接对象;
// 3. 允许上层按事件类型先查服务,再由服务查到自己的物理资源。
type ServiceRoute struct {
ServiceName string
TableName string
Topic string
GroupID string
}
var builtinServiceRoutes = map[string]ServiceRoute{
ServiceNameAgent: {
ServiceName: ServiceNameAgent,
TableName: "agent_outbox_messages",
Topic: "smartflow.agent.outbox",
GroupID: "smartflow-agent-outbox-consumer",
},
ServiceNameTask: {
ServiceName: ServiceNameTask,
TableName: "task_outbox_messages",
Topic: "smartflow.task.outbox",
GroupID: "smartflow-task-outbox-consumer",
},
ServiceNameMemory: {
ServiceName: ServiceNameMemory,
TableName: "memory_outbox_messages",
Topic: "smartflow.memory.outbox",
GroupID: "smartflow-memory-outbox-consumer",
},
ServiceNameActiveScheduler: {
ServiceName: ServiceNameActiveScheduler,
TableName: "active_scheduler_outbox_messages",
Topic: "smartflow.active-scheduler.outbox",
GroupID: "smartflow-active-scheduler-outbox-consumer",
},
ServiceNameNotification: {
ServiceName: ServiceNameNotification,
TableName: "notification_outbox_messages",
Topic: "smartflow.notification.outbox",
GroupID: "smartflow-notification-outbox-consumer",
},
}
// DefaultServiceRoutes 返回当前已知服务的默认路由清单。
//
// 说明:
// 1. 这里是“目录初始值”,用于自动建表和首次注册时兜底;
// 2. 运行时若显式注册了服务路由,会以显式注册结果为准;
// 3. 返回值是拷贝,调用方可安全遍历,不会污染全局目录。
func DefaultServiceRoutes() []ServiceRoute {
return []ServiceRoute{
builtinServiceRoutes[ServiceNameAgent],
builtinServiceRoutes[ServiceNameTask],
builtinServiceRoutes[ServiceNameMemory],
builtinServiceRoutes[ServiceNameActiveScheduler],
builtinServiceRoutes[ServiceNameNotification],
}
}
// DefaultServiceRoute 根据服务名生成终态路由。
//
// 规则:
// 1. 已知服务直接返回约定映射;
// 2. 未知服务按命名约定生成 table/topic/group避免继续落回共享 topic
// 3. 空服务名回退到 agent 兼容路径,保住历史单体模式。
func DefaultServiceRoute(serviceName string) ServiceRoute {
serviceName = normalizeServiceName(serviceName)
if serviceName == "" {
serviceName = ServiceNameAgent
}
if route, ok := builtinServiceRoutes[serviceName]; ok {
return route
}
tablePrefix := strings.NewReplacer("-", "_").Replace(serviceName)
if tablePrefix == "" {
tablePrefix = ServiceNameAgent
}
return ServiceRoute{
ServiceName: serviceName,
TableName: tablePrefix + "_outbox_messages",
Topic: "smartflow." + serviceName + ".outbox",
GroupID: "smartflow-" + serviceName + "-outbox-consumer",
}
}
func normalizeServiceName(serviceName string) string {
return strings.TrimSpace(serviceName)
}
// normalizeServiceRoute 把空字段补成可执行的默认值。
//
// 说明:
// 1. 只做字符串裁剪和缺省补齐,不做注册副作用;
// 2. 服务名为空时只保留历史兼容路径,不强行把它当成新服务;
// 3. 这一步是 route 目录的最后一道兜底,避免上层拿到半成品路由。
func normalizeServiceRoute(route ServiceRoute) ServiceRoute {
route.ServiceName = normalizeServiceName(route.ServiceName)
route.TableName = strings.TrimSpace(route.TableName)
route.Topic = strings.TrimSpace(route.Topic)
route.GroupID = strings.TrimSpace(route.GroupID)
if route.ServiceName == "" {
if route.TableName == "" {
route.TableName = builtinServiceRoutes[ServiceNameAgent].TableName
}
if route.Topic == "" {
route.Topic = builtinServiceRoutes[ServiceNameAgent].Topic
}
if route.GroupID == "" {
route.GroupID = builtinServiceRoutes[ServiceNameAgent].GroupID
}
return route
}
defaultRoute := DefaultServiceRoute(route.ServiceName)
if route.TableName == "" {
route.TableName = defaultRoute.TableName
}
if route.Topic == "" {
route.Topic = defaultRoute.Topic
}
if route.GroupID == "" {
route.GroupID = defaultRoute.GroupID
}
return route
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"log"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/spf13/viper"
"gorm.io/driver/mysql"
@@ -26,7 +27,6 @@ func autoMigrateModels(db *gorm.DB) error {
&model.ActiveSchedulePreview{},
&model.NotificationRecord{},
&model.UserNotificationChannel{},
&model.AgentOutboxMessage{},
&model.AgentScheduleState{},
&model.ActiveScheduleSession{},
&model.AgentStateSnapshotRecord{},
@@ -41,12 +41,36 @@ func autoMigrateModels(db *gorm.DB) error {
return fmt.Errorf("auto migrate failed for %T: %w", m, err)
}
}
if err := autoMigrateOutboxTables(db); err != nil {
return err
}
if err := backfillAutoMigrateData(db); err != nil {
return err
}
return nil
}
// autoMigrateOutboxTables 按服务目录一次性创建各服务的 outbox 物理表。
//
// 职责边界:
// 1. 只创建 outbox 目录,不改写业务表;
// 2. 每张表都复用同一套模型结构,保证字段和索引一致;
// 3. 这里显式列出服务目录,避免把共享单表误当成终态。
func autoMigrateOutboxTables(db *gorm.DB) error {
// 1. 这里必须按服务目录读取最终生效的 table 名,而不能只看默认内置映射。
// 2. 这样即使后续通过配置覆盖 outbox.services.*.table启动建表也会和运行时写入保持一致。
for _, serviceName := range outboxinfra.ServiceNames() {
cfg, ok := outboxinfra.ResolveServiceConfig(serviceName)
if !ok {
return fmt.Errorf("resolve outbox config failed for service %s", serviceName)
}
if err := db.Table(cfg.TableName).AutoMigrate(&model.AgentOutboxMessage{}); err != nil {
return fmt.Errorf("auto migrate outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err)
}
}
return nil
}
// backfillAutoMigrateData 补齐 AutoMigrate 无法表达的条件回填。
//
// 职责边界:

View File

@@ -4,6 +4,8 @@ import (
"github.com/LoveLosita/smartflow/backend/cmd"
)
// main 保留仓库根入口的兼容壳,阶段 0 期间仍转发到 cmd.Start()。
// 终态会逐步迁移为各服务各自的独立 main.go。
func main() {
cmd.Start()
}

View File

@@ -22,10 +22,11 @@ const (
type AgentOutboxMessage struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
EventType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:事件类型"`
Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"`
MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"`
Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"`
EventType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:事件类型"`
ServiceName string `gorm:"column:service_name;type:varchar(64);not null;default:'';index:idx_outbox_service_name,priority:1;comment:所属服务"`
Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"`
MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"`
Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"`
Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"`
RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"`
@@ -40,5 +41,8 @@ type AgentOutboxMessage struct {
}
func (AgentOutboxMessage) TableName() string {
// 1. 这里保留历史兼容默认表名,避免非 outbox 基础设施调用直接失效。
// 2. 服务级多表路由由 backend/infra/outbox 显式通过 db.Table(...) 控制。
// 3. 这样既能兼容旧代码,也不会把共享单表当成终态。
return "agent_outbox_messages"
}

View File

@@ -1,9 +1,13 @@
// Package routers 路由配置
// 定义所有HTTP路由和路由组
// 定义所有 HTTP 路由和路由组
package routers
import (
"context"
"errors"
"log"
"net/http"
"time"
"github.com/LoveLosita/smartflow/backend/api"
"github.com/LoveLosita/smartflow/backend/dao"
@@ -13,23 +17,45 @@ import (
"github.com/spf13/viper"
)
// StartEngine 注册路由
func StartEngine(r *gin.Engine) {
// 从配置中获取端口
// StartEngine 启动 HTTP 服务,并在上下文取消时尽量优雅退出。
func StartEngine(ctx context.Context, r *gin.Engine) {
// 1. 先解析端口,保持和历史行为一致。
// 2. 再用 http.Server 托管 gin engine方便在取消信号到来时执行 Shutdown。
port := viper.GetString("server.port")
if port == "" {
port = "8080" // 默认端口
port = "8080"
}
// 启动服务器
log.Printf("Server starting on port %s...", port)
if err := r.Run(":" + port); err != nil {
log.Fatalf("Failed to start server: %v", err)
srv := &http.Server{
Addr: ":" + port,
Handler: r,
}
errCh := make(chan error, 1)
go func() {
log.Printf("Server starting on port %s...", port)
errCh <- srv.ListenAndServe()
}()
select {
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("Failed to shutdown server gracefully: %v", err)
}
if err := <-errCh; err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("Failed to start server: %v", err)
}
case err := <-errCh:
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("Failed to start server: %v", err)
}
}
}
func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *dao.UserDAO, limiter *pkg.RateLimiter) *gin.Engine {
// 初始化Gin引擎
// 初始化 Gin 引擎
r := gin.Default()
// 在这里注册所有的路由和路由组
apiGroup := r.Group("/api/v1")
@@ -130,7 +156,7 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d
notificationGroup.POST("/channels/feishu/test", handlers.Notification.TestFeishuWebhook)
}
}
// 初始化Gin引擎
// 初始化 Gin 引擎
log.Println("Routes setup completed")
return r
}

View File

@@ -32,7 +32,7 @@ type ActiveScheduleTriggeredProcessor interface {
// 3. 若事务返回 error则 best-effort 回写 trigger failed并把错误交给 outbox 做 retry
// 4. 这里不直接 import active_scheduler 的具体实现,避免 service/events 和业务编排层互相反向耦合。
func RegisterActiveScheduleTriggeredHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
processor ActiveScheduleTriggeredProcessor,
) error {
@@ -45,24 +45,28 @@ func RegisterActiveScheduleTriggeredHandler(
if processor == nil {
return errors.New("active schedule triggered processor is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, sharedevents.ActiveScheduleTriggeredEventType)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
if !isAllowedTriggeredEventVersion(envelope.EventVersion) {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("active_schedule.triggered 版本不受支持: %s", envelope.EventVersion))
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("active_schedule.triggered 版本不受支持: %s", envelope.EventVersion))
return nil
}
var payload sharedevents.ActiveScheduleTriggeredPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 active_schedule.triggered 载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 active_schedule.triggered 载荷失败: "+unmarshalErr.Error())
return nil
}
if validateErr := payload.Validate(); validateErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "active_schedule.triggered 载荷非法: "+validateErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "active_schedule.triggered 载荷非法: "+validateErr.Error())
return nil
}
err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return processor.ProcessTriggeredInTx(ctx, tx, payload)
})
if err != nil {

View File

@@ -35,7 +35,7 @@ type AgentStateSnapshotPayload struct {
// 2. 使用 upsert 语义,同一 conversation_id 只保留最新快照;
// 3. 通过 outbox 通用消费事务保证"业务写入 + consumed 推进"原子一致。
func RegisterAgentStateSnapshotHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
@@ -48,15 +48,19 @@ func RegisterAgentStateSnapshotHandler(
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentStateSnapshotPersist)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload AgentStateSnapshotPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析快照载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析快照载荷失败: "+unmarshalErr.Error())
return nil
}
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
record := model.AgentStateSnapshotRecord{
ConversationID: payload.ConversationID,
UserID: payload.UserID,

View File

@@ -25,7 +25,7 @@ const EventTypeAgentTimelinePersistRequested = "agent.timeline.persist.requested
// 3. 通过 outbox 通用消费事务,把“时间线写库 + consumed 推进”放进同一事务;
// 4. 若遇到 seq 唯一键冲突,会先判定是否属于重放幂等,再决定是否补新 seq 并回填 Redis。
func RegisterAgentTimelinePersistHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
agentRepo *dao.AgentDAO,
cacheDAO *dao.CacheDAO,
@@ -40,12 +40,16 @@ func RegisterAgentTimelinePersistHandler(
if agentRepo == nil {
return errors.New("agent repo is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentTimelinePersistRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.ChatTimelinePersistPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
// 1. payload 无法反序列化属于不可恢复错误,直接标 dead避免无意义重试。
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error())
return nil
}
@@ -53,7 +57,7 @@ func RegisterAgentTimelinePersistHandler(
if !payload.HasValidIdentity() {
// 2. 这里只校验“能否唯一定位一条 timeline 记录”的最小字段集合。
// 3. content / payload_json 是否为空由事件类型自行决定,不在这里一刀切限制。
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法")
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法")
return nil
}
@@ -61,7 +65,7 @@ func RegisterAgentTimelinePersistHandler(
finalSeq := payload.Seq
// 4. 统一走 outbox 消费事务入口,保证“业务写入成功 -> consumed”原子一致。
err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
finalPayload, repaired, persistErr := persistConversationTimelineEventInTx(ctx, tx, agentRepo.WithTx(tx), payload)
if persistErr != nil {
return persistErr

View File

@@ -30,7 +30,7 @@ const (
// 3. 通过 outbox 通用事务入口把"业务写入 + consumed 推进"合并为一个事务;
// 4. 当前版本仅注册新路由键chat.history.persist.requested不再注册旧兼容键。
func RegisterChatHistoryPersistHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
@@ -44,6 +44,10 @@ func RegisterChatHistoryPersistHandler(
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatHistoryPersistRequested)
if err != nil {
return err
}
// 2. 定义统一处理器:
// 2.1 解析 payload
@@ -53,12 +57,12 @@ func RegisterChatHistoryPersistHandler(
var payload model.ChatHistoryPersistPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
// 2.1 payload 非法属于不可恢复错误,直接标 dead避免无意义重试。
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error())
return nil
}
// 2.2 使用 outbox 通用消费事务,保证"业务写入 + consumed 状态推进"原子一致。
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
// 2.2.1 基于同一个 tx 构造 RepoManager复用你现有跨包事务模型。
txM := repoManager.WithTx(tx)
// 2.2.2 在同事务内写入聊天历史与会话计数。

View File

@@ -30,7 +30,7 @@ const (
// 2. 通过 outbox 统一消费事务入口,保证“业务成功 + consumed 推进”原子一致;
// 3. 非法载荷直接标记 dead避免无意义重试。
func RegisterChatTokenUsageAdjustHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
@@ -43,20 +43,24 @@ func RegisterChatTokenUsageAdjustHandler(
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatTokenUsageAdjustRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.ChatTokenUsageAdjustPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析会话 token 调整载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析会话 token 调整载荷失败: "+unmarshalErr.Error())
return nil
}
if payload.UserID <= 0 || payload.TokensDelta <= 0 || payload.ConversationID == "" {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "会话 token 调整载荷无效: user_id/conversation_id/tokens_delta 非法")
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "会话 token 调整载荷无效: user_id/conversation_id/tokens_delta 非法")
return nil
}
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
txM := repoManager.WithTx(tx)
return txM.Agent.AdjustTokenUsageInTx(ctx, payload.UserID, payload.ConversationID, payload.TokensDelta)
})

View File

@@ -2,11 +2,12 @@ package events
import (
"errors"
"fmt"
"github.com/LoveLosita/smartflow/backend/dao"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/memory"
"github.com/LoveLosita/smartflow/backend/notification"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
)
// RegisterCoreOutboxHandlers 注册核心业务 outbox handler。
@@ -15,9 +16,9 @@ import (
// 1. 只负责聚合注册当前核心业务 handler便于 start / worker/all 等启动入口复用同一套接线顺序。
// 2. 不负责创建 eventBus/outboxRepo/DAO/memoryModule也不负责启动或关闭事件总线。
// 3. 不改变单个 Register* 函数的职责;具体 payload 解析、幂等消费和业务落库仍由各自 handler 负责。
// 4. 入口先完整校验依赖,避免注册到一半才发现依赖缺失,导致事件总线处于半注册状态
// 4. 这里以显式 route table 的方式列出“事件类型 -> 服务归属 -> handler”避免后续新增事件时只改启动入口不改接线表
func RegisterCoreOutboxHandlers(
eventBus *outboxinfra.EventBus,
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
@@ -28,28 +29,39 @@ func RegisterCoreOutboxHandlers(
return err
}
// 1. 按照现有 start.go 的接线顺序注册,保证迁移到 worker/all 后消费行为不发生隐式变化。
// 2. 每一步只包一层业务语义错误,便于启动日志直接定位是哪类 handler 注册失败。
if err := RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager); err != nil {
return fmt.Errorf("注册聊天历史持久化 handler 失败: %w", err)
}
if err := RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager); err != nil {
return fmt.Errorf("注册任务紧急度平移 handler 失败: %w", err)
}
if err := RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager); err != nil {
return fmt.Errorf("注册会话 token 调整 handler 失败: %w", err)
}
if err := RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager); err != nil {
return fmt.Errorf("注册 agent 状态快照 handler 失败: %w", err)
}
if err := RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo); err != nil {
return fmt.Errorf("注册 agent 时间线持久化 handler 失败: %w", err)
}
if err := RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule); err != nil {
return fmt.Errorf("注册记忆抽取 handler 失败: %w", err)
return registerOutboxHandlerRoutes(coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule))
}
// RegisterAllOutboxHandlers 注册当前阶段所有 outbox handler。
//
// 职责边界:
// 1. 只负责把 core / active_scheduler / notification 三类路由一次性接线;
// 2. 不负责创建依赖,也不负责启动事件总线;
// 3. 供当前启动流程在“总线启动前”统一完成显式路由注册。
func RegisterAllOutboxHandlers(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
notificationService *notification.NotificationService,
) error {
if err := validateAllOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow, notificationService); err != nil {
return err
}
return nil
return registerOutboxHandlerRoutes(allOutboxHandlerRoutes(
eventBus,
outboxRepo,
repoManager,
agentRepo,
cacheRepo,
memoryModule,
activeTriggerWorkflow,
notificationService,
))
}
// validateCoreOutboxHandlerDeps 校验核心 outbox handler 聚合注册所需依赖。
@@ -58,7 +70,7 @@ func RegisterCoreOutboxHandlers(
// 1. 只做 nil 校验不做数据库、Redis、Kafka 连通性探测,避免注册函数承担启动健康检查职责。
// 2. 返回 error 表示依赖缺失;返回 nil 表示可以安全进入逐项注册流程。
func validateCoreOutboxHandlerDeps(
eventBus *outboxinfra.EventBus,
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
@@ -85,3 +97,112 @@ func validateCoreOutboxHandlerDeps(
}
return nil
}
// validateAllOutboxHandlerDeps 在核心依赖基础上,额外校验 active_scheduler 和 notification 相关依赖。
func validateAllOutboxHandlerDeps(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
notificationService *notification.NotificationService,
) error {
if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule); err != nil {
return err
}
if activeTriggerWorkflow == nil {
return errors.New("active schedule triggered processor is nil")
}
if notificationService == nil {
return errors.New("notification service is nil")
}
return nil
}
// coreOutboxHandlerRoutes 只描述 core 阶段的 outbox 路由。
func coreOutboxHandlerRoutes(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
) []outboxHandlerRoute {
return []outboxHandlerRoute{
{
EventType: EventTypeChatHistoryPersistRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager)
},
},
{
EventType: EventTypeTaskUrgencyPromoteRequested,
Service: outboxHandlerServiceTask,
Register: func() error {
return RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager)
},
},
{
EventType: EventTypeChatTokenUsageAdjustRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager)
},
},
{
EventType: EventTypeAgentStateSnapshotPersist,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager)
},
},
{
EventType: EventTypeAgentTimelinePersistRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo)
},
},
{
EventType: EventTypeMemoryExtractRequested,
Service: outboxHandlerServiceMemory,
Register: func() error {
return RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule)
},
},
}
}
// allOutboxHandlerRoutes 把当前阶段所有 outbox 路由一次性展开,供启动入口统一接线。
func allOutboxHandlerRoutes(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
notificationService *notification.NotificationService,
) []outboxHandlerRoute {
routes := coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule)
routes = append(routes,
outboxHandlerRoute{
EventType: sharedevents.ActiveScheduleTriggeredEventType,
Service: outboxHandlerServiceActiveScheduler,
Register: func() error {
return RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, activeTriggerWorkflow)
},
},
outboxHandlerRoute{
EventType: sharedevents.NotificationFeishuRequestedEventType,
Service: outboxHandlerServiceNotification,
Register: func() error {
return RegisterFeishuNotificationHandler(eventBus, outboxRepo, notificationService)
},
},
)
return routes
}

View File

@@ -33,7 +33,7 @@ const (
// 2. 不在消费回调里执行 LLM 重计算;
// 3. 通过 memory.Module.WithTx(tx) 复用同一套接入门面,保证事务边界仍由 outbox 掌控。
func RegisterMemoryExtractRequestedHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
memoryModule *memory.Module,
) error {
@@ -46,20 +46,24 @@ func RegisterMemoryExtractRequestedHandler(
if memoryModule == nil {
return errors.New("memory module is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeMemoryExtractRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.MemoryExtractRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析记忆抽取载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析记忆抽取载荷失败: "+unmarshalErr.Error())
return nil
}
if validateErr := validateMemoryExtractPayload(payload); validateErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "记忆抽取载荷非法: "+validateErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "记忆抽取载荷非法: "+validateErr.Error())
return nil
}
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
jobPayload := memorymodel.ExtractJobPayload{
UserID: payload.UserID,
ConversationID: strings.TrimSpace(payload.ConversationID),
@@ -87,7 +91,7 @@ func RegisterMemoryExtractRequestedHandler(
func EnqueueMemoryExtractRequestedInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
kafkaCfg kafkabus.Config,
maxRetry int,
chatPayload model.ChatHistoryPersistPayload,
) error {
if !isMemoryWriteEnabled() {
@@ -107,6 +111,10 @@ func EnqueueMemoryExtractRequestedInTx(
return err
}
if maxRetry <= 0 {
maxRetry = 20
}
outboxPayload := outboxinfra.OutboxEventPayload{
EventType: EventTypeMemoryExtractRequested,
EventVersion: outboxinfra.DefaultEventVersion,
@@ -114,13 +122,14 @@ func EnqueueMemoryExtractRequestedInTx(
Payload: payloadJSON,
}
// 1. 这里只传 eventType 与消息键服务归属、outbox 表和 Kafka topic 统一交给仓库路由层解析。
// 2. 这样聊天持久化链路不会继续感知 memory 服务的物理 topic避免拆服务时出现双写口径。
_, err = outboxRepo.CreateMessage(
ctx,
EventTypeMemoryExtractRequested,
kafkaCfg.Topic,
strings.TrimSpace(chatPayload.ConversationID),
outboxPayload,
kafkaCfg.MaxRetry,
maxRetry,
)
return err
}

View File

@@ -20,7 +20,7 @@ import (
// 2. 不承担 notification_records 状态机细节,状态流转全部下沉到 notification 模块;
// 3. 不在 handler 内部创建 provider/service避免事件消费与 retry loop 使用两套不同配置。
func RegisterFeishuNotificationHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
svc *notification.NotificationService,
) error {
@@ -33,23 +33,27 @@ func RegisterFeishuNotificationHandler(
if svc == nil {
return errors.New("notification service is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, sharedevents.NotificationFeishuRequestedEventType)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
// 1. 先校验 event_version避免未来协议破坏性升级后旧 handler 误吃新消息。
// 2. 当前阶段只接受 v1版本不匹配属于不可恢复协议错误直接标记 dead。
eventVersion := strings.TrimSpace(envelope.EventVersion)
if eventVersion != "" && eventVersion != sharedevents.NotificationFeishuRequestedEventVersion {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested event_version 不匹配: "+eventVersion)
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested event_version 不匹配: "+eventVersion)
return nil
}
var payload sharedevents.FeishuNotificationRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 notification.feishu.requested 载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 notification.feishu.requested 载荷失败: "+unmarshalErr.Error())
return nil
}
if validateErr := payload.Validate(); validateErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested 载荷非法: "+validateErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "notification.feishu.requested 载荷非法: "+validateErr.Error())
return nil
}
@@ -58,7 +62,7 @@ func RegisterFeishuNotificationHandler(
return handleErr
}
if consumeErr := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, nil); consumeErr != nil {
if consumeErr := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, nil); consumeErr != nil {
return consumeErr
}

View File

@@ -0,0 +1,217 @@
package events
import (
"context"
"errors"
"fmt"
"sort"
"strings"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
)
// OutboxBus 是启动侧和业务侧共享的 outbox 门面。
//
// 职责边界:
// 1. 对上只暴露 Publish / RegisterEventHandler / Start / Close 这四个能力;
// 2. 对内可以按 service 维度路由到多个底层 engine
// 3. 不承载任何业务处理逻辑只做事件归属、topic/group 和 engine 选择。
type OutboxBus interface {
outboxinfra.EventPublisher
RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error
Start(ctx context.Context)
Close()
}
type routedOutboxBus struct {
buses map[string]OutboxBus
}
// NewRoutedOutboxBus 把多个 service 级 outbox bus 组装成一个门面。
//
// 1. 这里不创建底层 engine只接收上层已经建好的 service bus
// 2. 事件发布和 handler 注册都按事件归属路由到对应 service
// 3. 任一 service bus 缺失时直接报错,避免静默回落到共享 topic。
func NewRoutedOutboxBus(buses map[string]OutboxBus) OutboxBus {
normalized := make(map[string]OutboxBus, len(buses))
for serviceName, bus := range buses {
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" || bus == nil {
continue
}
normalized[serviceName] = bus
}
if len(normalized) == 0 {
return nil
}
return &routedOutboxBus{buses: normalized}
}
// NewServiceOutboxBus 基于 service 级 topic / group 创建底层 outbox engine。
//
// 1. topic / group 由 service 名称推导,不再要求调用方显式传入共享 topic
// 2. kafka 未启用时返回 nil调用侧可以继续走同步降级路径
// 3. 这里不注册 handler注册仍由启动侧统一完成。
func NewServiceOutboxBus(repo *outboxinfra.Repository, baseCfg kafkabus.Config, serviceName string) (OutboxBus, error) {
if repo == nil {
return nil, errors.New("outbox repository is nil")
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return nil, errors.New("serviceName is empty")
}
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
cfg := baseCfg
cfg.Topic = strings.TrimSpace(route.Topic)
cfg.GroupID = strings.TrimSpace(route.GroupID)
cfg.ServiceName = strings.TrimSpace(route.ServiceName)
if cfg.ServiceName == "" {
cfg.ServiceName = serviceName
}
bus, err := outboxinfra.NewEventBus(repo.WithRoute(route), cfg)
if err != nil {
return nil, err
}
if bus == nil {
return nil, nil
}
return bus, nil
}
func (b *routedOutboxBus) Publish(ctx context.Context, req outboxinfra.PublishRequest) error {
serviceBus, err := b.resolveBusByEventType(req.EventType)
if err != nil {
return err
}
return serviceBus.Publish(ctx, req)
}
func (b *routedOutboxBus) RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error {
if handler == nil {
return errors.New("handler is nil")
}
serviceBus, err := b.resolveBusByEventType(eventType)
if err != nil {
return err
}
return serviceBus.RegisterEventHandler(eventType, handler)
}
func (b *routedOutboxBus) Start(ctx context.Context) {
if b == nil {
return
}
for _, serviceName := range orderedOutboxServiceNames(b.buses) {
b.buses[serviceName].Start(ctx)
}
}
func (b *routedOutboxBus) Close() {
if b == nil {
return
}
for _, serviceName := range orderedOutboxServiceNames(b.buses) {
b.buses[serviceName].Close()
}
}
func (b *routedOutboxBus) resolveBusByEventType(eventType string) (OutboxBus, error) {
if b == nil {
return nil, errors.New("outbox bus is not initialized")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return nil, errors.New("eventType is empty")
}
serviceName, ok := outboxinfra.ResolveEventService(eventType)
if !ok {
return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType)
}
serviceBus, ok := b.buses[strings.TrimSpace(serviceName)]
if !ok || serviceBus == nil {
return nil, fmt.Errorf("service outbox bus is missing: service=%s eventType=%s", serviceName, eventType)
}
return serviceBus, nil
}
func orderedOutboxServiceNames(buses map[string]OutboxBus) []string {
ordered := make([]string, 0, len(buses))
seen := make(map[string]struct{}, len(buses))
for _, serviceName := range OutboxServiceNames() {
if _, ok := buses[serviceName]; ok {
ordered = append(ordered, serviceName)
seen[serviceName] = struct{}{}
}
}
extras := make([]string, 0)
for serviceName := range buses {
if _, ok := seen[serviceName]; ok {
continue
}
extras = append(extras, serviceName)
}
sort.Strings(extras)
return append(ordered, extras...)
}
// OutboxServiceNames 返回当前阶段启用的 service 级 outbox 名称。
func OutboxServiceNames() []string {
return []string{
string(outboxHandlerServiceAgent),
string(outboxHandlerServiceTask),
string(outboxHandlerServiceMemory),
string(outboxHandlerServiceActiveScheduler),
string(outboxHandlerServiceNotification),
}
}
// ResolveOutboxTopicForService 把 service 名称映射成独立 Kafka topic。
//
// 1. 这里保留现在的命名风格smartflow.<service>.outbox
// 2. 空 service 只作为兜底,不作为主路径;
// 3. 调用侧不再传共享 topic避免入口继续依赖旧结构。
func ResolveOutboxTopicForService(serviceName string) string {
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
if topic := strings.TrimSpace(route.Topic); topic != "" {
return topic
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return kafkabus.DefaultTopic
}
return "smartflow." + serviceName + ".outbox"
}
// ResolveOutboxGroupForService 把 service 名称映射成独立 Kafka group。
func ResolveOutboxGroupForService(serviceName string) string {
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
if groupID := strings.TrimSpace(route.GroupID); groupID != "" {
return groupID
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return kafkabus.DefaultGroup
}
return "smartflow-" + serviceName + "-outbox-consumer"
}
// ResolveOutboxTopicForEvent 根据事件归属 service 计算 topic。
func ResolveOutboxTopicForEvent(eventType string) (string, error) {
route, ok := outboxinfra.ResolveEventRoute(eventType)
if !ok {
return "", fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType))
}
if topic := strings.TrimSpace(route.Topic); topic != "" {
return topic, nil
}
return ResolveOutboxTopicForService(route.ServiceName), nil
}

View File

@@ -0,0 +1,74 @@
package events
import (
"fmt"
"strings"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
)
// outboxHandlerService 表示 outbox 路由归属的业务服务。
//
// 这里只记录服务归属,不承载具体实现包名,方便在启动日志和路由表里直接阅读。
type outboxHandlerService string
const (
outboxHandlerServiceAgent outboxHandlerService = "agent"
outboxHandlerServiceTask outboxHandlerService = "task"
outboxHandlerServiceMemory outboxHandlerService = "memory"
outboxHandlerServiceActiveScheduler outboxHandlerService = "active-scheduler"
outboxHandlerServiceNotification outboxHandlerService = "notification"
)
// outboxHandlerRoute 显式描述“事件类型 -> 服务归属 -> handler 注册动作”。
//
// 1. EventType 负责唯一定位 outbox 路由键;
// 2. Service 负责标明该路由归属的业务服务;
// 3. Register 只负责把对应 handler 挂到总线,不承载业务逻辑。
type outboxHandlerRoute struct {
EventType string
Service outboxHandlerService
Register func() error
}
// registerOutboxHandlerRoutes 逐条注册路由表里的 handler。
//
// 1. 先把事件类型和服务归属写进路由表,避免启动入口散落多处 if err != nil
// 2. 再统一执行注册动作,保证失败时能直接定位到具体 event_type 和 service
// 3. 若某条路由缺少注册函数,直接返回 error避免静默漏注册。
func registerOutboxHandlerRoutes(routes []outboxHandlerRoute) error {
for _, route := range routes {
if route.Register == nil {
return fmt.Errorf("outbox handler route 缺少注册函数: event_type=%s service=%s", route.EventType, route.Service)
}
if err := outboxinfra.RegisterEventService(route.EventType, string(route.Service)); err != nil {
return fmt.Errorf("登记 outbox 事件归属失败event_type=%s, service=%s: %w", route.EventType, route.Service, err)
}
if err := route.Register(); err != nil {
return fmt.Errorf("注册 outbox handler 失败event_type=%s, service=%s: %w", route.EventType, route.Service, err)
}
}
return nil
}
// scopedOutboxRepoForEvent 负责把通用 outbox 仓库收敛到某个事件所属的服务表。//
// 职责边界:
// 1. 只做事件->服务->表的路由,不碰业务写入语义;
// 2. 返回的仓库只适合当前事件的 MarkDead / ConsumeAndMarkConsumed / MarkFailedForRetry
// 3. 路由缺失时直接返回错误,避免默默写回默认表。
func scopedOutboxRepoForEvent(outboxRepo *outboxinfra.Repository, eventType string) (*outboxinfra.Repository, error) {
if outboxRepo == nil {
return nil, fmt.Errorf("outbox repository is nil")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return nil, fmt.Errorf("eventType is empty")
}
route, ok := outboxinfra.ResolveEventRoute(eventType)
if !ok {
return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType)
}
return outboxRepo.WithRoute(route), nil
}

View File

@@ -31,7 +31,7 @@ const (
// 2. 只处理 `task.urgency.promote.requested` 事件,不处理其他业务事件;
// 3. 通过 `ConsumeAndMarkConsumed` 把“业务更新 + outbox consumed 推进”放进同一事务。
func RegisterTaskUrgencyPromoteHandler(
bus *outboxinfra.EventBus,
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
@@ -45,25 +45,29 @@ func RegisterTaskUrgencyPromoteHandler(
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeTaskUrgencyPromoteRequested)
if err != nil {
return err
}
// 2. 定义统一处理函数。
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
// 2.1 先解析 payload解析失败属于不可恢复错误直接标记 dead。
var payload model.TaskUrgencyPromoteRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error())
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error())
return nil
}
// 2.2 做轻量参数净化,避免脏数据进入 DAO。
payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
if payload.UserID <= 0 || len(payload.TaskIDs) == 0 {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法")
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法")
return nil
}
// 2.3 统一走 outbox 消费事务入口,保证“业务成功 -> consumed”原子一致。
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
// 2.3.1 基于同一 tx 构造 RepoManager复用现有跨 DAO 事务模式。
txM := repoManager.WithTx(tx)
// 2.3.2 以消费时刻为准做条件更新,确保“到线”判定与真实落库时刻一致。