feat: 接入论坛奖励 outbox 链路

This commit is contained in:
Losita
2026-05-05 10:44:33 +08:00
parent 4fc6c0cac3
commit c42f0c5b8c
31 changed files with 1381 additions and 101 deletions

View File

@@ -25,6 +25,8 @@ import (
// 3. 返回 error 表示可重试失败,框架回写 retry 后提交 offset。
type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error
// minPublishedRescueAfter is the lower bound of the published-rescue window:
// normalizePublishedRescueAfter never returns a duration shorter than this.
const minPublishedRescueAfter = 10 * time.Second
// PublishRequest 是通用事件发布入参。
//
// 设计目标:
@@ -56,6 +58,10 @@ type Engine struct {
maxRetry int
scanEvery time.Duration
scanBatch int
// publishedRescueAfter 是 published 消息本地兜底消费窗口,避免 Kafka 已投递但 consumer 长时间未完成时永久卡住。
publishedRescueAfter time.Duration
// publishedRescueEnabled 控制是否启用本地兜底消费;默认只给幂等账务类服务打开。
publishedRescueEnabled bool
handlersMu sync.RWMutex
handlers map[string]MessageHandler
@@ -91,15 +97,17 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
}
return &Engine{
repo: serviceRepo,
producer: producer,
consumer: consumer,
brokers: cfg.Brokers,
route: route,
maxRetry: cfg.MaxRetry,
scanEvery: cfg.RetryScanInterval,
scanBatch: cfg.RetryBatchSize,
handlers: make(map[string]MessageHandler),
repo: serviceRepo,
producer: producer,
consumer: consumer,
brokers: cfg.Brokers,
route: route,
maxRetry: cfg.MaxRetry,
scanEvery: cfg.RetryScanInterval,
scanBatch: cfg.RetryBatchSize,
publishedRescueAfter: normalizePublishedRescueAfter(cfg.RetryScanInterval),
publishedRescueEnabled: route.ServiceName == ServiceNameTokenStore,
handlers: make(map[string]MessageHandler),
}, nil
}
@@ -265,6 +273,9 @@ func (e *Engine) startDispatchLoop(ctx context.Context) {
log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err)
}
}
if err = e.rescueStalePublishedMessages(ctx); err != nil {
log.Printf("兜底消费已投递 outbox 消息失败(service=%s): %v", e.route.ServiceName, err)
}
}
}
}
@@ -281,7 +292,7 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
return nil
}
eventPayload, payloadErr := parseOutboxEventPayload(outboxMsg.Payload)
envelope, payloadErr := e.envelopeFromOutboxMessage(outboxMsg)
if payloadErr != nil {
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析 outbox 事件包失败: "+payloadErr.Error())
if markErr != nil {
@@ -289,23 +300,6 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
}
return payloadErr
}
if eventPayload.EventID == "" {
eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10)
}
serviceName := strings.TrimSpace(outboxMsg.ServiceName)
if serviceName == "" {
serviceName = e.route.ServiceName
}
envelope := kafkabus.Envelope{
OutboxID: outboxMsg.ID,
EventID: eventPayload.EventID,
EventType: eventPayload.EventType,
EventVersion: eventPayload.EventVersion,
ServiceName: serviceName,
AggregateID: eventPayload.AggregateID,
Payload: eventPayload.PayloadJSON,
}
raw, err := json.Marshal(envelope)
if err != nil {
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 封装失败: "+err.Error())
@@ -326,6 +320,93 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
return nil
}
// rescueStalePublishedMessages locally consumes messages that were published
// but have stayed unconsumed for longer than e.publishedRescueAfter.
//
// Responsibilities:
//  1. Only stale published messages of the current service are listed (the
//     query is scoped by e.route.ServiceName); other services are not scanned.
//  2. Nothing is re-sent to Kafka. Each message goes through
//     consumePublishedOne, which routes it to the locally registered handler.
//     NOTE(review): this relies on handlers being idempotent — confirm for
//     every service that enables the rescue.
//  3. A failure on a single message is only logged so that one bad message
//     cannot block the rest of the batch.
//  4. The loop stops as soon as ctx is done and returns ctx.Err(), instead of
//     issuing one doomed repository call (and one log line) per remaining
//     message during shutdown.
//
// It is a no-op when the engine is nil or the rescue is disabled. A non-nil
// error means the listing query failed or ctx was cancelled mid-batch.
func (e *Engine) rescueStalePublishedMessages(ctx context.Context) error {
	if e == nil || !e.publishedRescueEnabled {
		return nil
	}

	// Messages older than this cutoff are considered stale.
	cutoff := time.Now().Add(-e.publishedRescueAfter)
	messages, err := e.repo.ListStalePublishedMessages(ctx, e.route.ServiceName, cutoff, e.scanBatch)
	if err != nil {
		return err
	}
	if len(messages) > 0 {
		log.Printf("outbox stale published messages=%d, service=%s start local consume", len(messages), e.route.ServiceName)
	}

	for _, msg := range messages {
		// Respect cancellation between messages so shutdown is not delayed
		// by the rest of the batch.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		if err := e.consumePublishedOne(ctx, msg.ID); err != nil {
			log.Printf("兜底消费 outbox 消息失败(id=%d, service=%s): %v", msg.ID, e.route.ServiceName, err)
		}
	}
	return nil
}
// consumePublishedOne performs the local fallback consumption of a single
// outbox message that was published but never finished.
//
// Responsibilities:
//  1. The message is processed only while its stored status is still
//     published, so a final state written by the regular consumer is never
//     overwritten. A missing record is treated as nothing to do.
//  2. If the stored payload cannot be turned into an envelope, the message is
//     marked dead and the parse error is returned. Handler failures are
//     surfaced by handleEnvelope, which is called with retryOnFailure=false.
//  3. No Kafka offset is committed because nothing was read from Kafka here.
func (e *Engine) consumePublishedOne(ctx context.Context, outboxID int64) error {
	record, err := e.repo.GetByID(ctx, outboxID)
	switch {
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil
	case err != nil:
		return err
	case record.Status != model.OutboxStatusPublished:
		return nil
	}

	envelope, parseErr := e.envelopeFromOutboxMessage(record)
	if parseErr == nil {
		return e.handleEnvelope(ctx, envelope, false)
	}

	// Unparseable payloads can never succeed; park them as dead letters.
	reason := "解析已投递 outbox 事件包失败: " + parseErr.Error()
	if markErr := e.repo.MarkDead(ctx, record.ID, reason); markErr != nil {
		log.Printf("标记 outbox 死信失败(id=%d): %v", record.ID, markErr)
	}
	return parseErr
}
// envelopeFromOutboxMessage rebuilds the unified event envelope from an
// outbox table record.
//
// Responsibilities:
//  1. It only parses the outer payload wrapper and fills in missing defaults.
//  2. It does not judge whether the business event is valid; that check is
//     left to the handler.
//  3. When the payload carries no event id, the outbox id (in decimal) is
//     used instead so that older messages remain consumable.
//
// A nil record or a payload that fails to parse yields a zero envelope and a
// non-nil error.
func (e *Engine) envelopeFromOutboxMessage(outboxMsg *model.AgentOutboxMessage) (kafkabus.Envelope, error) {
	var none kafkabus.Envelope
	if outboxMsg == nil {
		return none, errors.New("outbox message is nil")
	}

	parsed, err := parseOutboxEventPayload(outboxMsg.Payload)
	if err != nil {
		return none, err
	}

	eventID := parsed.EventID
	if eventID == "" {
		eventID = strconv.FormatInt(outboxMsg.ID, 10)
	}

	// Fall back to the engine's own route when the record has no service name.
	owner := strings.TrimSpace(outboxMsg.ServiceName)
	if owner == "" {
		owner = e.route.ServiceName
	}

	envelope := kafkabus.Envelope{
		OutboxID:     outboxMsg.ID,
		EventID:      eventID,
		EventType:    parsed.EventType,
		EventVersion: parsed.EventVersion,
		ServiceName:  owner,
		AggregateID:  parsed.AggregateID,
		Payload:      parsed.PayloadJSON,
	}
	return envelope, nil
}
func (e *Engine) startConsumeLoop(ctx context.Context) {
for {
select {
@@ -361,12 +442,33 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
return errors.New("Kafka 封装缺少 outbox_id")
}
if err := e.handleEnvelope(ctx, envelope, true); err != nil {
if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil {
return commitErr
}
return err
}
return e.consumer.Commit(ctx, msg)
}
// handleEnvelope 执行统一事件信封的本地 handler 路由和状态推进。
//
// 职责边界:
// 1. 负责事件类型、服务归属和 handler 存在性校验;
// 2. handler 成功后由业务 handler 自己标记 consumed；
// 3. retryOnFailure=true 时才把失败消息退回 pending避免本地兜底把已投递消息重复投到 Kafka。
func (e *Engine) handleEnvelope(ctx context.Context, envelope kafkabus.Envelope, retryOnFailure bool) error {
status, err := e.currentMessageStatus(ctx, envelope.OutboxID)
if err != nil {
return err
}
if status != model.OutboxStatusPublished {
return nil
}
eventType := strings.TrimSpace(envelope.EventType)
if eventType == "" {
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "消息缺少事件类型")
if err := e.consumer.Commit(ctx, msg); err != nil {
return err
}
return nil
}
@@ -386,9 +488,6 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
eventType,
envelope.OutboxID,
)
if err := e.consumer.Commit(ctx, msg); err != nil {
return err
}
return nil
}
}
@@ -400,23 +499,50 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
} else {
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "本服务未注册 handler: "+eventType)
}
if err := e.consumer.Commit(ctx, msg); err != nil {
return err
}
return nil
}
if err := handler(ctx, envelope); err != nil {
if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil {
return markErr
}
if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil {
return commitErr
if retryOnFailure {
if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil {
return markErr
}
}
return err
}
return e.consumer.Commit(ctx, msg)
return nil
}
// currentMessageStatus reads the stored status of an outbox message; it is
// used as the first gate against duplicate Kafka deliveries.
//
// Responsibilities:
//  1. It only reports the current status (whitespace-trimmed) and never
//     advances the state machine.
//  2. A record that no longer exists is reported as consumed, i.e. a final
//     state, so stale Kafka messages do not make the consume loop fail.
//  3. handleEnvelope runs the handler only when this returns published; any
//     other status is skipped.
//
// Any repository error other than "record not found" is returned unchanged
// with an empty status.
func (e *Engine) currentMessageStatus(ctx context.Context, outboxID int64) (string, error) {
	record, err := e.repo.GetByID(ctx, outboxID)
	switch {
	case err == nil:
		return strings.TrimSpace(record.Status), nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return model.OutboxStatusConsumed, nil
	default:
		return "", err
	}
}
// normalizePublishedRescueAfter derives the published-rescue window from the
// scan interval: three scan intervals, but never less than
// minPublishedRescueAfter.
//
// Responsibilities:
//  1. It only enforces the minimum window, so a freshly published message is
//     not immediately consumed a second time locally.
//  2. It reads nothing but its argument, keeping the engine's construction
//     inputs in one place.
//  3. A smaller result recovers stuck messages sooner; a larger one lowers
//     the chance of duplicate consumption.
func normalizePublishedRescueAfter(scanEvery time.Duration) time.Duration {
	if window := 3 * scanEvery; window >= minPublishedRescueAfter {
		return window
	}
	return minPublishedRescueAfter
}
func resolveEngineRoute(repo *Repository, cfg kafkabus.Config) ServiceRoute {