后端: 1. 启动层完成第一轮运行边界拆分,新增 `all / api / worker` 三种进程模式:`all` 保持原单体行为,`api` 只启动 Gin 与同步业务依赖,`worker` 只启动 outbox、Kafka consumer 与 memory worker。 2. 启动装配从单个入口拆成 runtime 依赖图,配置、DB、Redis、RAG、memory、DAO、Service、Handler、newAgent 依赖统一集中构造,再按进程角色选择启动 HTTP 或后台循环。 3. outbox 事件总线补齐 dispatch / consume 分离启动能力,支持后续 relay 与 consumer 独立进程化,同时保留原组合启动语义。 4. 核心 outbox handler 注册收口为公共接线入口,统一校验依赖并复用注册顺序,避免 api / worker / all 多入口复制事件注册逻辑。 迁移说明: 5. 本轮只迁运行边界,不拆业务服务边界;旧单体入口仍保留并默认走 `all` 兼容模式,当前切流点是 API 不再消费异步事件,worker 承担后台消费与 memory 任务。 6. 补充微服务四步迁移与第二阶段并行开发计划,明确先拆 API/Worker,再接主动调度与飞书通知,后续再拆 notification、active-scheduler、schedule/task。
384 lines
10 KiB
Go
384 lines
10 KiB
Go
package outbox
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"log"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
|
||
"github.com/LoveLosita/smartflow/backend/model"
|
||
segmentkafka "github.com/segmentio/kafka-go"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// MessageHandler is the event-consumption handler.
//
// Contract:
//  1. The envelope argument has already had its outermost layer parsed;
//  2. Returning nil means success, and the framework commits the offset;
//  3. Returning an error means a retryable failure: the framework records
//     the retry on the outbox row and then commits the offset.
type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error
|
||
|
||
// PublishRequest is the generic input for publishing an event.
//
// Design goals:
//  1. Business code only describes "which event to publish" and does not
//     care about outbox/kafka details;
//  2. Event metadata (event_type/version/aggregate_id) is normalized in
//     one place;
//  3. Payload accepts any DTO; infra serializes it to JSON uniformly.
type PublishRequest struct {
	EventType    string // required; whitespace-trimmed by Publish
	EventVersion string // optional; Publish falls back to DefaultEventVersion
	MessageKey   string // Kafka message key
	AggregateID  string // optional; Publish falls back to MessageKey
	EventID      string // optional; dispatch falls back to the outbox row ID
	Payload      any    // arbitrary DTO, JSON-serialized by Publish
}
|
||
|
||
// Engine is the generic Outbox + Kafka asynchronous engine.
//
// Responsibility boundary:
//  1. Owns outbox scanning, kafka delivery, kafka consumption and the
//     outbox status-machine transitions;
//  2. Owns event_type -> handler routing;
//  3. Carries no business semantics (those live in the handlers).
type Engine struct {
	repo     *Repository
	producer *kafkabus.Producer
	consumer *kafkabus.Consumer

	brokers   []string
	topic     string
	maxRetry  int
	scanEvery time.Duration // dispatch-loop scan interval
	scanBatch int           // max due outbox rows fetched per scan

	handlersMu sync.RWMutex              // guards handlers
	handlers   map[string]MessageHandler // event_type -> handler
}
|
||
|
||
// NewEngine 创建异步引擎。
|
||
//
|
||
// 规则:
|
||
// 1. kafka.enabled=false 时返回 nil,调用方可降级同步;
|
||
// 2. producer/consumer 任一步失败都会回收已创建资源。
|
||
func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
|
||
if !cfg.Enabled {
|
||
return nil, nil
|
||
}
|
||
if repo == nil {
|
||
return nil, errors.New("outbox repository is nil")
|
||
}
|
||
|
||
producer, err := kafkabus.NewProducer(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
consumer, err := kafkabus.NewConsumer(cfg)
|
||
if err != nil {
|
||
_ = producer.Close()
|
||
return nil, err
|
||
}
|
||
|
||
return &Engine{
|
||
repo: repo,
|
||
producer: producer,
|
||
consumer: consumer,
|
||
brokers: cfg.Brokers,
|
||
topic: cfg.Topic,
|
||
maxRetry: cfg.MaxRetry,
|
||
scanEvery: cfg.RetryScanInterval,
|
||
scanBatch: cfg.RetryBatchSize,
|
||
handlers: make(map[string]MessageHandler),
|
||
}, nil
|
||
}
|
||
|
||
// RegisterHandler is a historical alias for RegisterEventHandler.
//
// Deprecated: use RegisterEventHandler instead.
func (e *Engine) RegisterHandler(eventType string, handler MessageHandler) error {
	return e.RegisterEventHandler(eventType, handler)
}
|
||
|
||
// RegisterEventHandler 注册事件处理器。
|
||
func (e *Engine) RegisterEventHandler(eventType string, handler MessageHandler) error {
|
||
if e == nil {
|
||
return errors.New("outbox engine is nil")
|
||
}
|
||
eventType = strings.TrimSpace(eventType)
|
||
if eventType == "" {
|
||
return errors.New("eventType is empty")
|
||
}
|
||
if handler == nil {
|
||
return errors.New("handler is nil")
|
||
}
|
||
|
||
e.handlersMu.Lock()
|
||
defer e.handlersMu.Unlock()
|
||
if _, exists := e.handlers[eventType]; exists {
|
||
log.Printf("outbox handler 覆盖注册: event_type=%s", eventType)
|
||
}
|
||
e.handlers[eventType] = handler
|
||
return nil
|
||
}
|
||
|
||
func (e *Engine) getHandler(eventType string) (MessageHandler, bool) {
|
||
e.handlersMu.RLock()
|
||
defer e.handlersMu.RUnlock()
|
||
h, ok := e.handlers[eventType]
|
||
return h, ok
|
||
}
|
||
|
||
// Start 启动 dispatch + consume 两个后台循环。
|
||
func (e *Engine) Start(ctx context.Context) {
|
||
if e == nil {
|
||
return
|
||
}
|
||
|
||
log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch)
|
||
if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil {
|
||
log.Printf("Kafka topic not ready before consume loop start: %v", err)
|
||
} else {
|
||
log.Printf("Kafka topic is ready: %s", e.topic)
|
||
}
|
||
|
||
e.StartDispatch(ctx)
|
||
e.StartConsume(ctx)
|
||
}
|
||
|
||
// StartDispatch starts only the outbox -> Kafka delivery loop.
//
// Responsibility boundary:
//  1. Only spawns the dispatch background goroutine; does not start Kafka
//     consumption;
//  2. Does not repeat Start's topic-readiness wait, so the original
//     Start(ctx) startup semantics are unchanged;
//  3. After ctx is cancelled the inner loop exits on its own; callers do
//     not need to stop the goroutine explicitly.
func (e *Engine) StartDispatch(ctx context.Context) {
	if e == nil {
		return
	}
	go e.startDispatchLoop(ctx)
}
|
||
|
||
// StartConsume starts only the Kafka -> handler consumption loop.
//
// Responsibility boundary:
//  1. Only spawns the consume background goroutine; does not scan or
//     deliver outbox rows;
//  2. Registers no business handlers; handlers are still injected
//     explicitly via RegisterEventHandler;
//  3. When ctx is cancelled, or the consumer returns context.Canceled,
//     the inner loop exits per its existing logic.
func (e *Engine) StartConsume(ctx context.Context) {
	if e == nil {
		return
	}
	go e.startConsumeLoop(ctx)
}
|
||
|
||
// Close releases the kafka resources (producer and consumer).
// Safe to call on a nil receiver (the kafka-disabled case where NewEngine
// returned nil).
func (e *Engine) Close() {
	if e == nil {
		return
	}
	if err := e.producer.Close(); err != nil {
		log.Printf("关闭 Kafka producer 失败: %v", err)
	}
	if err := e.consumer.Close(); err != nil {
		log.Printf("关闭 Kafka consumer 失败: %v", err)
	}
}
|
||
|
||
// Enqueue is a historical alias for Publish. The messageKey doubles as
// the aggregate ID.
//
// Deprecated: use Publish instead.
func (e *Engine) Enqueue(ctx context.Context, eventType, messageKey string, payload any) error {
	return e.Publish(ctx, PublishRequest{
		EventType:   eventType,
		MessageKey:  messageKey,
		AggregateID: messageKey,
		Payload:     payload,
	})
}
|
||
|
||
// Publish 发布事件到 outbox。
|
||
//
|
||
// 步骤:
|
||
// 1. 标准化 event_type/version/key;
|
||
// 2. payload 序列化;
|
||
// 3. 写入 outbox(仅本地写库,不做 kafka 网络 IO)。
|
||
func (e *Engine) Publish(ctx context.Context, req PublishRequest) error {
|
||
if e == nil {
|
||
return errors.New("outbox engine is nil")
|
||
}
|
||
|
||
eventType := strings.TrimSpace(req.EventType)
|
||
if eventType == "" {
|
||
return errors.New("eventType is empty")
|
||
}
|
||
eventVersion := strings.TrimSpace(req.EventVersion)
|
||
if eventVersion == "" {
|
||
eventVersion = DefaultEventVersion
|
||
}
|
||
messageKey := strings.TrimSpace(req.MessageKey)
|
||
aggregateID := strings.TrimSpace(req.AggregateID)
|
||
if aggregateID == "" {
|
||
aggregateID = messageKey
|
||
}
|
||
|
||
payloadJSON, err := json.Marshal(req.Payload)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err = e.repo.CreateMessage(ctx, eventType, e.topic, messageKey, OutboxEventPayload{
|
||
EventID: strings.TrimSpace(req.EventID),
|
||
EventType: eventType,
|
||
EventVersion: eventVersion,
|
||
AggregateID: aggregateID,
|
||
Payload: payloadJSON,
|
||
}, e.maxRetry)
|
||
return err
|
||
}
|
||
|
||
func (e *Engine) startDispatchLoop(ctx context.Context) {
|
||
ticker := time.NewTicker(e.scanEvery)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch)
|
||
if err != nil {
|
||
log.Printf("扫描 outbox 失败: %v", err)
|
||
continue
|
||
}
|
||
if len(pendingMessages) > 0 {
|
||
log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages))
|
||
}
|
||
|
||
for _, msg := range pendingMessages {
|
||
if err = e.dispatchOne(ctx, msg.ID); err != nil {
|
||
log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// dispatchOne loads one outbox row and delivers it to Kafka.
//
// State handling:
//   - row missing, already consumed, or dead -> no-op success;
//   - unparsable event payload or unmarshalable envelope -> MarkDead
//     (poison row, no point retrying);
//   - Kafka enqueue failure or MarkPublished failure -> MarkFailedForRetry
//     so the scan loop picks the row up again later.
func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
	outboxMsg, err := e.repo.GetByID(ctx, outboxID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Row vanished between scan and dispatch; nothing to do.
			return nil
		}
		return err
	}
	if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead {
		// Terminal states: never redeliver.
		return nil
	}

	eventPayload, payloadErr := parseOutboxEventPayload(outboxMsg.Payload)
	if payloadErr != nil {
		markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析 outbox 事件包失败: "+payloadErr.Error())
		if markErr != nil {
			log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
		}
		return payloadErr
	}
	if eventPayload.EventID == "" {
		// Fall back to the outbox row ID as a stable event ID.
		eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10)
	}

	envelope := kafkabus.Envelope{
		OutboxID:     outboxMsg.ID,
		EventID:      eventPayload.EventID,
		EventType:    eventPayload.EventType,
		EventVersion: eventPayload.EventVersion,
		AggregateID:  eventPayload.AggregateID,
		Payload:      eventPayload.PayloadJSON,
	}
	raw, err := json.Marshal(envelope)
	if err != nil {
		markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error())
		if markErr != nil {
			log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
		}
		return err
	}

	if err = e.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil {
		// Retry bookkeeping is best-effort; the delivery error is primary.
		_ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error())
		return err
	}
	if err = e.repo.MarkPublished(ctx, outboxMsg.ID); err != nil {
		// Message reached Kafka but status update failed: schedule a retry
		// (may cause a duplicate delivery; consumers must tolerate that —
		// NOTE(review): confirm handlers are idempotent).
		_ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "更新已投递状态失败: "+err.Error())
		return err
	}
	return nil
}
|
||
|
||
func (e *Engine) startConsumeLoop(ctx context.Context) {
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
msg, err := e.consumer.Dequeue(ctx)
|
||
if err != nil {
|
||
if errors.Is(err, context.Canceled) {
|
||
return
|
||
}
|
||
log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err)
|
||
time.Sleep(300 * time.Millisecond)
|
||
continue
|
||
}
|
||
|
||
if err = e.handleMessage(ctx, msg); err != nil {
|
||
log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// handleMessage processes one consumed Kafka message.
//
// Offset-commit policy (matching the MessageHandler contract):
//   - unparsable envelope / missing outbox_id: commit and surface the
//     error (poison message; retrying cannot help);
//   - missing or unknown event type: mark the outbox row dead, commit,
//     and report success;
//   - handler failure: record the retry on the outbox row, commit the
//     offset, then return the handler's error (redelivery is driven by
//     the outbox dispatch loop, not by Kafka re-consumption);
//   - handler success: just commit.
func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) error {
	var envelope kafkabus.Envelope
	if err := json.Unmarshal(msg.Value, &envelope); err != nil {
		// Commit so the partition is not blocked by an unparsable message.
		_ = e.consumer.Commit(ctx, msg)
		return fmt.Errorf("解析 Kafka 包装失败: %w", err)
	}
	if envelope.OutboxID <= 0 {
		_ = e.consumer.Commit(ctx, msg)
		return errors.New("Kafka 包装缺少 outbox_id")
	}

	eventType := strings.TrimSpace(envelope.EventType)
	if eventType == "" {
		_ = e.repo.MarkDead(ctx, envelope.OutboxID, "消息缺少事件类型")
		if err := e.consumer.Commit(ctx, msg); err != nil {
			return err
		}
		return nil
	}

	handler, ok := e.getHandler(eventType)
	if !ok {
		_ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType)
		if err := e.consumer.Commit(ctx, msg); err != nil {
			return err
		}
		return nil
	}

	if err := handler(ctx, envelope); err != nil {
		if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil {
			// Retry could not be recorded: do NOT commit, so Kafka
			// redelivers and the failure is not lost.
			return markErr
		}
		if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil {
			return commitErr
		}
		return err
	}

	return e.consumer.Commit(ctx, msg)
}
|