Version: 0.6.1.dev.260316
♻️ refactor(outbox): 抽离通用事件总线,并完成 event_type-only 收口 - ✨ 新增 `infra` 层通用 `EventBus` / `EventContract`,统一事件发布与消费协议 - 🔄 将聊天持久化链路调整为通过 `service/events` 注册 handler 并发布事件,进一步解耦业务逻辑与异步处理流程 - 🧹 移除 `chat_history_async` 旧适配实现,以及基于 `biz_type` 的兼容分发逻辑 - 📝 更新 Outbox 异步持久化决策记录,明确保留方案 A,并正式启用方案 B - 📚 同步更新 README 中关于 Outbox + Kafka 可靠异步链路的说明 - 🚚 当前 `outbox + kafka` 已与项目业务链路完全解耦,沉淀为通用、可靠性更强的消息队列能力;后续将参考消息队列的典型使用方式,逐步扩展到更多业务场景 - ✨ 补充跨不同分类事务管理器中的 `agent dao` 注册与接入支持
This commit is contained in:
@@ -2,13 +2,25 @@ package kafka
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
// Envelope 是 outbox 投递到 Kafka 的统一包裹结构。
|
||||
// 设计目的:
|
||||
// 1) 消费端先拿到 outbox_id,可直接回写状态;
|
||||
// 2) biz_type 做分发,支持后续扩展更多异步业务;
|
||||
// 3) payload 保持原始 JSON,按业务类型再反序列化。
|
||||
// Envelope 是 outbox 投递到 Kafka 的统一协议包。
|
||||
//
|
||||
// 协议边界:
|
||||
// 1. 这是总线协议,不包含具体业务字段;
|
||||
// 2. 路由只依赖 event_type,不再保留 biz_type 兼容字段;
|
||||
// 3. payload 为原始业务 JSON,由业务 handler 决定如何反序列化。
|
||||
type Envelope struct {
|
||||
OutboxID int64 `json:"outbox_id"`
|
||||
BizType string `json:"biz_type"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
// OutboxID 是 outbox 状态机主键,用于消费者回写 consumed/retry/dead。
|
||||
OutboxID int64 `json:"outbox_id"`
|
||||
// EventID 是事件唯一标识(当前默认回退为 outbox_id 字符串)。
|
||||
EventID string `json:"event_id,omitempty"`
|
||||
|
||||
// EventType 是唯一路由键(例如 chat.history.persist.requested)。
|
||||
EventType string `json:"event_type"`
|
||||
// EventVersion 是事件版本号(默认 v1)。
|
||||
EventVersion string `json:"event_version,omitempty"`
|
||||
// AggregateID 是聚合主键(例如 conversation_id),用于追踪同一业务对象事件流。
|
||||
AggregateID string `json:"aggregate_id,omitempty"`
|
||||
|
||||
// Payload 是业务载荷 JSON。
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
|
||||
"github.com/LoveLosita/smartflow/backend/model"
|
||||
)
|
||||
|
||||
// ChatHistoryAsync 是“聊天记录异步持久化”的业务适配器。
|
||||
//
|
||||
// 设计目的:
|
||||
// 1) 让业务层只调用 EnqueueChatHistoryPersist,而不感知扫描/投递/消费细节;
|
||||
// 2) 保持现有 Agent 代码调用习惯,降低改造面;
|
||||
// 3) 把具体的 outbox+kafka 主流程彻底收敛到 infra。
|
||||
type ChatHistoryAsync struct {
|
||||
engine *Engine
|
||||
}
|
||||
|
||||
// NewChatHistoryAsync 创建聊天记录异步适配器并注册处理器。
|
||||
//
|
||||
// 处理器职责:
|
||||
// 1) 从 envelope payload 解析聊天载荷;
|
||||
// 2) 调用仓储“落库并标记 consumed”;
|
||||
// 3) 解析失败时标记 dead(不可恢复错误),避免无意义重试。
|
||||
func NewChatHistoryAsync(repo *Repository, cfg kafkabus.Config) (*ChatHistoryAsync, error) {
|
||||
// 1. 先创建通用引擎,内部会按 cfg.Enabled 决定是否启用。
|
||||
engine, err := NewEngine(repo, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if engine == nil {
|
||||
// 2. 异步开关关闭:返回 nil 交给上层走同步降级路径。
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// 3. 注册“聊天记录持久化”业务处理器。
|
||||
// 该处理器只做三件事:
|
||||
// 3.1 解析 payload;
|
||||
// 3.2 调仓储落库并推进 consumed;
|
||||
// 3.3 遇到不可恢复错误时标记 dead。
|
||||
if err = engine.RegisterHandler(model.OutboxBizTypeChatHistoryPersist, func(ctx context.Context, envelope kafkabus.Envelope) error {
|
||||
var payload model.ChatHistoryPersistPayload
|
||||
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
|
||||
_ = repo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error())
|
||||
// 返回 nil:该错误已被标记为 dead,不需要再走重试。
|
||||
return nil
|
||||
}
|
||||
// 返回 error:由 engine 统一标记为 retry 并提交 offset。
|
||||
return repo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload)
|
||||
}); err != nil {
|
||||
// 4. 注册失败时回收已创建的引擎资源,防止泄漏。
|
||||
engine.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 5. 返回业务适配器,对业务层暴露“更语义化”的调用入口。
|
||||
return &ChatHistoryAsync{engine: engine}, nil
|
||||
}
|
||||
|
||||
// Start 启动异步引擎(扫描 + 消费)。
|
||||
func (a *ChatHistoryAsync) Start(ctx context.Context) {
|
||||
// 允许在未初始化(例如异步关闭)时被安全调用。
|
||||
if a == nil || a.engine == nil {
|
||||
return
|
||||
}
|
||||
a.engine.Start(ctx)
|
||||
}
|
||||
|
||||
// Close 关闭异步引擎资源。
|
||||
func (a *ChatHistoryAsync) Close() {
|
||||
// 允许在未初始化(例如异步关闭)时被安全调用。
|
||||
if a == nil || a.engine == nil {
|
||||
return
|
||||
}
|
||||
a.engine.Close()
|
||||
}
|
||||
|
||||
// EnqueueChatHistoryPersist 将聊天记录持久化请求写入 outbox。
|
||||
// 该方法是业务层唯一需要调用的入口。
|
||||
func (a *ChatHistoryAsync) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error {
|
||||
// 1. 若引擎未初始化,说明启动配置有问题或异步功能未启用。
|
||||
// 这里显式返回错误,交由业务层按需降级/告警。
|
||||
if a == nil || a.engine == nil {
|
||||
return errors.New("chat history async is not initialized")
|
||||
}
|
||||
// 2. 以 conversation_id 作为 messageKey,尽量让同会话消息落在稳定分区。
|
||||
return a.engine.Enqueue(ctx, model.OutboxBizTypeChatHistoryPersist, payload.ConversationID, payload)
|
||||
}
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -15,24 +17,35 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// MessageHandler 是 outbox 消费分发处理器。
|
||||
// MessageHandler 是事件消费处理器。
|
||||
//
|
||||
// 设计约束:
|
||||
// 1) 入参 envelope 已经完成最外层解析(含 outbox_id、biz_type、payload);
|
||||
// 2) 若返回 nil,表示业务处理成功,框架将继续提交 Kafka offset;
|
||||
// 3) 若返回 error,框架会按“可重试错误”处理:回写 outbox 失败状态并进入重试窗口。
|
||||
// 语义约束:
|
||||
// 1. 入参 envelope 已完成最外层解析;
|
||||
// 2. 返回 nil 表示处理成功,框架提交 offset;
|
||||
// 3. 返回 error 表示可重试失败,框架回写 retry 后提交 offset。
|
||||
type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error
|
||||
|
||||
// Engine 是 Outbox + Kafka 的通用异步引擎。
|
||||
// PublishRequest 是通用事件发布入参。
|
||||
//
|
||||
// 设计目标:
|
||||
// 1. 业务只描述“要发什么事件”,不关心 outbox/kafka 细节;
|
||||
// 2. 统一收敛事件元数据(event_type/version/aggregate_id);
|
||||
// 3. payload 支持任意 DTO,由 infra 统一 JSON 序列化。
|
||||
type PublishRequest struct {
|
||||
EventType string
|
||||
EventVersion string
|
||||
MessageKey string
|
||||
AggregateID string
|
||||
EventID string
|
||||
Payload any
|
||||
}
|
||||
|
||||
// Engine 是 Outbox + Kafka 通用异步引擎。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1) 负责 outbox 扫描、Kafka 投递、Kafka 消费与统一状态机流转;
|
||||
// 2) 负责 biz_type 到处理器的分发;
|
||||
// 3) 不关心具体业务含义(例如“聊天记录落库”),业务语义由 handler 提供。
|
||||
//
|
||||
// 状态流转口径:
|
||||
// pending -> published -> consumed(成功);
|
||||
// pending/published --失败--> pending(带 next_retry_at) 或 dead(达到最大重试)。
|
||||
// 1. 负责 outbox 扫描、kafka 投递、kafka 消费、状态机推进;
|
||||
// 2. 负责 event_type -> handler 路由;
|
||||
// 3. 不负责任何业务语义(业务由 handler 承担)。
|
||||
type Engine struct {
|
||||
repo *Repository
|
||||
producer *kafkabus.Producer
|
||||
@@ -48,23 +61,19 @@ type Engine struct {
|
||||
handlers map[string]MessageHandler
|
||||
}
|
||||
|
||||
// NewEngine 创建 outbox 异步引擎。
|
||||
// NewEngine 创建异步引擎。
|
||||
//
|
||||
// 说明:
|
||||
// 1) cfg.Enabled=false 时返回 nil,调用方可按“异步关闭”处理;
|
||||
// 2) producer/consumer 初始化失败时会确保资源回收,避免半初始化泄漏。
|
||||
// 规则:
|
||||
// 1. kafka.enabled=false 时返回 nil,调用方可降级同步;
|
||||
// 2. producer/consumer 任一步失败都会回收已创建资源。
|
||||
func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
|
||||
// 1. 配置关闭时直接返回 nil,让上层可以“无侵入降级为同步模式”。
|
||||
if !cfg.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
// 2. 仓储缺失属于启动期配置错误,直接返回。
|
||||
if repo == nil {
|
||||
return nil, errors.New("outbox repository is nil")
|
||||
}
|
||||
|
||||
// 3. 先初始化 producer,再初始化 consumer。
|
||||
// 如果第二步失败,要主动回收第一步资源,避免泄漏。
|
||||
producer, err := kafkabus.NewProducer(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -75,7 +84,6 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 4. 汇总配置,构造引擎实例。
|
||||
return &Engine{
|
||||
repo: repo,
|
||||
producer: producer,
|
||||
@@ -89,78 +97,62 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RegisterHandler 注册某个 biz_type 的消费处理器。
|
||||
//
|
||||
// 设计要求:
|
||||
// 1) biz_type 必须唯一,重复注册会覆盖旧值(并打印提示日志);
|
||||
// 2) handler 不能为空;
|
||||
// 3) 建议在 Start 前完成注册,减少运行时热更新复杂度。
|
||||
func (e *Engine) RegisterHandler(bizType string, handler MessageHandler) error {
|
||||
// 1. 参数校验:防止业务侧在启动链路上把 nil 引擎继续往下用。
|
||||
// RegisterHandler 是历史别名(等价 RegisterEventHandler)。
|
||||
func (e *Engine) RegisterHandler(eventType string, handler MessageHandler) error {
|
||||
return e.RegisterEventHandler(eventType, handler)
|
||||
}
|
||||
|
||||
// RegisterEventHandler 注册事件处理器。
|
||||
func (e *Engine) RegisterEventHandler(eventType string, handler MessageHandler) error {
|
||||
if e == nil {
|
||||
return errors.New("outbox engine is nil")
|
||||
}
|
||||
// 2. biz_type 为空会导致无法分发,提前拦截。
|
||||
if bizType == "" {
|
||||
return errors.New("bizType is empty")
|
||||
eventType = strings.TrimSpace(eventType)
|
||||
if eventType == "" {
|
||||
return errors.New("eventType is empty")
|
||||
}
|
||||
// 3. handler 为空会在消费时 panic,必须提前拒绝。
|
||||
if handler == nil {
|
||||
return errors.New("handler is nil")
|
||||
}
|
||||
|
||||
// 4. 加写锁更新 handler 映射,保证并发注册时 map 安全。
|
||||
e.handlersMu.Lock()
|
||||
defer e.handlersMu.Unlock()
|
||||
if _, exists := e.handlers[bizType]; exists {
|
||||
log.Printf("outbox handler 覆盖注册: biz_type=%s", bizType)
|
||||
if _, exists := e.handlers[eventType]; exists {
|
||||
log.Printf("outbox handler 覆盖注册: event_type=%s", eventType)
|
||||
}
|
||||
e.handlers[bizType] = handler
|
||||
e.handlers[eventType] = handler
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) getHandler(bizType string) (MessageHandler, bool) {
|
||||
// 读锁足够满足并发读取需求,避免无谓阻塞。
|
||||
func (e *Engine) getHandler(eventType string) (MessageHandler, bool) {
|
||||
e.handlersMu.RLock()
|
||||
defer e.handlersMu.RUnlock()
|
||||
h, ok := e.handlers[bizType]
|
||||
h, ok := e.handlers[eventType]
|
||||
return h, ok
|
||||
}
|
||||
|
||||
// Start 启动 outbox 异步引擎。
|
||||
//
|
||||
// 会启动两个后台循环:
|
||||
// 1) dispatch loop:扫描 due outbox 并投递到 Kafka;
|
||||
// 2) consume loop:消费 Kafka 并按 biz_type 分发处理。
|
||||
// Start 启动 dispatch + consume 两个后台循环。
|
||||
func (e *Engine) Start(ctx context.Context) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 1. 启动日志:把关键运行参数打出来,便于排查“为什么没消费/没扫描”。
|
||||
log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch)
|
||||
|
||||
// 2. 启动前探活 topic 是否可用。
|
||||
// 注意:即使探活失败也不会阻断引擎启动,后续循环会继续重试。
|
||||
if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil {
|
||||
log.Printf("Kafka topic not ready before consume loop start: %v", err)
|
||||
} else {
|
||||
log.Printf("Kafka topic is ready: %s", e.topic)
|
||||
}
|
||||
|
||||
// 3. 并行启动两条核心循环:
|
||||
// - dispatch loop:负责 outbox -> Kafka;
|
||||
// - consume loop:负责 Kafka -> handler -> outbox 状态推进。
|
||||
go e.startDispatchLoop(ctx)
|
||||
go e.startConsumeLoop(ctx)
|
||||
}
|
||||
|
||||
// Close 关闭 producer/consumer 资源。
|
||||
// Close 关闭 kafka 资源。
|
||||
func (e *Engine) Close() {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
// 逐个关闭并记录错误,避免某个 close 失败导致后续资源无法回收。
|
||||
if err := e.producer.Close(); err != nil {
|
||||
log.Printf("关闭 Kafka producer 失败: %v", err)
|
||||
}
|
||||
@@ -169,35 +161,65 @@ func (e *Engine) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue 把业务消息写入 outbox(请求路径调用)。
|
||||
// Enqueue 是历史别名(等价 Publish)。
|
||||
func (e *Engine) Enqueue(ctx context.Context, eventType, messageKey string, payload any) error {
|
||||
return e.Publish(ctx, PublishRequest{
|
||||
EventType: eventType,
|
||||
MessageKey: messageKey,
|
||||
AggregateID: messageKey,
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
// Publish 发布事件到 outbox。
|
||||
//
|
||||
// 注意:
|
||||
// 1) 该方法不做 Kafka 网络写入,只有数据库写入;
|
||||
// 2) messageKey 建议使用业务幂等键(如 conversation_id)以提升分区稳定性;
|
||||
// 3) payload 需要可 JSON 序列化。
|
||||
func (e *Engine) Enqueue(ctx context.Context, bizType, messageKey string, payload any) error {
|
||||
// 步骤:
|
||||
// 1. 标准化 event_type/version/key;
|
||||
// 2. payload 序列化;
|
||||
// 3. 写入 outbox(仅本地写库,不做 kafka 网络 IO)。
|
||||
func (e *Engine) Publish(ctx context.Context, req PublishRequest) error {
|
||||
if e == nil {
|
||||
return errors.New("outbox engine is nil")
|
||||
}
|
||||
// 这里故意只写数据库,不做 Kafka 网络 IO,
|
||||
// 目的是把请求耗时稳定在“单次写库”的可控范围。
|
||||
_, err := e.repo.CreateMessage(ctx, bizType, e.topic, messageKey, payload, e.maxRetry)
|
||||
|
||||
eventType := strings.TrimSpace(req.EventType)
|
||||
if eventType == "" {
|
||||
return errors.New("eventType is empty")
|
||||
}
|
||||
eventVersion := strings.TrimSpace(req.EventVersion)
|
||||
if eventVersion == "" {
|
||||
eventVersion = DefaultEventVersion
|
||||
}
|
||||
messageKey := strings.TrimSpace(req.MessageKey)
|
||||
aggregateID := strings.TrimSpace(req.AggregateID)
|
||||
if aggregateID == "" {
|
||||
aggregateID = messageKey
|
||||
}
|
||||
|
||||
payloadJSON, err := json.Marshal(req.Payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = e.repo.CreateMessage(ctx, eventType, e.topic, messageKey, OutboxEventPayload{
|
||||
EventID: strings.TrimSpace(req.EventID),
|
||||
EventType: eventType,
|
||||
EventVersion: eventVersion,
|
||||
AggregateID: aggregateID,
|
||||
Payload: payloadJSON,
|
||||
}, e.maxRetry)
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *Engine) startDispatchLoop(ctx context.Context) {
|
||||
// 1. 定时扫描 due outbox 记录。
|
||||
// 扫描间隔由 scanEvery 控制,避免每次请求都主动触发投递造成抖动。
|
||||
ticker := time.NewTicker(e.scanEvery)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// 2. 收到退出信号后优雅停止循环。
|
||||
return
|
||||
case <-ticker.C:
|
||||
// 3. 拉取当前窗口内可投递消息。
|
||||
pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch)
|
||||
if err != nil {
|
||||
log.Printf("扫描 outbox 失败: %v", err)
|
||||
@@ -207,7 +229,6 @@ func (e *Engine) startDispatchLoop(ctx context.Context) {
|
||||
log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages))
|
||||
}
|
||||
|
||||
// 4. 逐条投递,单条失败不影响同批后续消息。
|
||||
for _, msg := range pendingMessages {
|
||||
if err = e.dispatchOne(ctx, msg.ID); err != nil {
|
||||
log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err)
|
||||
@@ -218,29 +239,39 @@ func (e *Engine) startDispatchLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
|
||||
// 1. 投递前重新按 ID 读取最新状态,避免用到过期快照。
|
||||
outboxMsg, err := e.repo.GetByID(ctx, outboxID)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// 1.1 记录已不存在(可能被清理),按幂等成功处理。
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
// 1.2 最终态直接跳过,避免重复投递。
|
||||
if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 组装 Kafka 包装体,统一带上 outbox_id 供消费端做状态回写。
|
||||
eventPayload, payloadErr := parseOutboxEventPayload(outboxMsg.Payload)
|
||||
if payloadErr != nil {
|
||||
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析 outbox 事件包失败: "+payloadErr.Error())
|
||||
if markErr != nil {
|
||||
log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
|
||||
}
|
||||
return payloadErr
|
||||
}
|
||||
if eventPayload.EventID == "" {
|
||||
eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10)
|
||||
}
|
||||
|
||||
envelope := kafkabus.Envelope{
|
||||
OutboxID: outboxMsg.ID,
|
||||
BizType: outboxMsg.BizType,
|
||||
Payload: json.RawMessage(outboxMsg.Payload),
|
||||
OutboxID: outboxMsg.ID,
|
||||
EventID: eventPayload.EventID,
|
||||
EventType: eventPayload.EventType,
|
||||
EventVersion: eventPayload.EventVersion,
|
||||
AggregateID: eventPayload.AggregateID,
|
||||
Payload: eventPayload.PayloadJSON,
|
||||
}
|
||||
raw, err := json.Marshal(envelope)
|
||||
if err != nil {
|
||||
// 2.1 包装层序列化失败通常不可恢复,直接标 dead。
|
||||
markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error())
|
||||
if markErr != nil {
|
||||
log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
|
||||
@@ -248,8 +279,6 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. 先投 Kafka,再把 outbox 状态推进到 published。
|
||||
// 任一步骤失败都回写 retry,让扫描器后续重试。
|
||||
if err = e.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil {
|
||||
_ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error())
|
||||
return err
|
||||
@@ -262,29 +291,23 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
|
||||
}
|
||||
|
||||
func (e *Engine) startConsumeLoop(ctx context.Context) {
|
||||
// 消费循环采用“拉取 -> 处理 -> 提交 offset”的标准模型。
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// 1. 收到退出信号后终止循环。
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// 2. 拉取下一条 Kafka 消息。
|
||||
msg, err := e.consumer.Dequeue(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// 2.1 context 主动取消时,不记错误日志,直接退出。
|
||||
return
|
||||
}
|
||||
// 2.2 临时错误短暂退避后继续,避免空转刷日志。
|
||||
log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err)
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
// 3. 单条消息处理失败仅记录日志,不阻断消费循环。
|
||||
if err = e.handleMessage(ctx, msg); err != nil {
|
||||
log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err)
|
||||
}
|
||||
@@ -292,33 +315,35 @@ func (e *Engine) startConsumeLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) error {
|
||||
// 1. 先解析最外层 envelope,拿到 outbox_id + biz_type + payload。
|
||||
var envelope kafkabus.Envelope
|
||||
if err := json.Unmarshal(msg.Value, &envelope); err != nil {
|
||||
// 1.1 包装层损坏时无法恢复,直接提交 offset 防止无限重放。
|
||||
_ = e.consumer.Commit(ctx, msg)
|
||||
return fmt.Errorf("解析 Kafka 包装失败: %w", err)
|
||||
}
|
||||
if envelope.OutboxID <= 0 {
|
||||
// 1.2 缺少 outbox_id 无法回写状态,同样提交 offset 跳过。
|
||||
_ = e.consumer.Commit(ctx, msg)
|
||||
return errors.New("Kafka 包装缺少 outbox_id")
|
||||
}
|
||||
|
||||
// 2. 根据 biz_type 查找业务处理器。
|
||||
handler, ok := e.getHandler(envelope.BizType)
|
||||
if !ok {
|
||||
// 2.1 未注册处理器是配置错误,标记 dead 并提交 offset,避免重复消费。
|
||||
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType)
|
||||
eventType := strings.TrimSpace(envelope.EventType)
|
||||
if eventType == "" {
|
||||
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "消息缺少事件类型")
|
||||
if err := e.consumer.Commit(ctx, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
handler, ok := e.getHandler(eventType)
|
||||
if !ok {
|
||||
_ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType)
|
||||
if err := e.consumer.Commit(ctx, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 3. 调用业务处理器。
|
||||
if err := handler(ctx, envelope); err != nil {
|
||||
// 统一按“可重试错误”处理,回写 retry 状态后提交 offset,避免同一条消息在 Kafka 侧死循环。
|
||||
if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil {
|
||||
return markErr
|
||||
}
|
||||
@@ -328,6 +353,5 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er
|
||||
return err
|
||||
}
|
||||
|
||||
// 4. 业务处理成功后提交 offset。
|
||||
return e.consumer.Commit(ctx, msg)
|
||||
}
|
||||
|
||||
87
backend/infra/outbox/event_bus.go
Normal file
87
backend/infra/outbox/event_bus.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
|
||||
)
|
||||
|
||||
// EventPublisher 是通用事件发布能力接口。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 只暴露“发布事件”这一件事,隐藏底层 outbox/kafka 实现细节;
|
||||
// 2. 业务层只依赖该接口,避免直接耦合具体引擎结构体;
|
||||
// 3. 该接口不承诺“立即消费成功”,只承诺“事件已入队或返回错误”。
|
||||
type EventPublisher interface {
|
||||
Publish(ctx context.Context, req PublishRequest) error
|
||||
}
|
||||
|
||||
// EventBus 是 outbox 异步总线的门面对象。
|
||||
//
|
||||
// 设计目的:
|
||||
// 1. 对外提供“发布 + 注册处理器 + 启停”三类最小能力;
|
||||
// 2. 对内复用 Engine,不重复实现状态机和调度逻辑;
|
||||
// 3. 为后续引入更多事件类型提供统一扩展点。
|
||||
type EventBus struct {
|
||||
engine *Engine
|
||||
}
|
||||
|
||||
// NewEventBus 创建通用事件总线。
|
||||
//
|
||||
// 说明:
|
||||
// 1. 当 kafka.enabled=false 时返回 nil,调用方可直接降级为同步模式;
|
||||
// 2. 该方法只创建基础设施对象,不自动注册任何业务事件处理器;
|
||||
// 3. 业务事件处理器注册应由上层在启动阶段显式完成,避免隐式副作用。
|
||||
func NewEventBus(repo *Repository, cfg kafkabus.Config) (*EventBus, error) {
|
||||
engine, err := NewEngine(repo, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if engine == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return &EventBus{engine: engine}, nil
|
||||
}
|
||||
|
||||
// RegisterEventHandler 注册事件处理器。
|
||||
//
|
||||
// 失败语义:
|
||||
// 1. bus 未初始化时直接返回错误;
|
||||
// 2. event_type 为空或 handler 为空时返回错误;
|
||||
// 3. 重复注册时采用“后者覆盖前者”并打日志(由 Engine 负责)。
|
||||
func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error {
|
||||
if b == nil || b.engine == nil {
|
||||
return errors.New("event bus is not initialized")
|
||||
}
|
||||
return b.engine.RegisterEventHandler(eventType, handler)
|
||||
}
|
||||
|
||||
// Publish 发布事件到 outbox 队列。
|
||||
//
|
||||
// 关键语义:
|
||||
// 1. 返回 nil 仅表示“已写入 outbox 成功”;
|
||||
// 2. 真正 Kafka 投递与业务消费由后台异步循环完成;
|
||||
// 3. 若返回 error,表示本次入队失败,调用方应按业务策略决定是否重试/降级。
|
||||
func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error {
|
||||
if b == nil || b.engine == nil {
|
||||
return errors.New("event bus is not initialized")
|
||||
}
|
||||
return b.engine.Publish(ctx, req)
|
||||
}
|
||||
|
||||
// Start 启动事件总线后台循环(dispatch + consume)。
|
||||
func (b *EventBus) Start(ctx context.Context) {
|
||||
if b == nil || b.engine == nil {
|
||||
return
|
||||
}
|
||||
b.engine.Start(ctx)
|
||||
}
|
||||
|
||||
// Close 关闭事件总线资源(producer/consumer)。
|
||||
func (b *EventBus) Close() {
|
||||
if b == nil || b.engine == nil {
|
||||
return
|
||||
}
|
||||
b.engine.Close()
|
||||
}
|
||||
64
backend/infra/outbox/event_contract.go
Normal file
64
backend/infra/outbox/event_contract.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultEventVersion 是通用事件协议默认版本。
|
||||
DefaultEventVersion = "v1"
|
||||
)
|
||||
|
||||
// OutboxEventPayload 是 outbox.payload 的统一事件外壳。
|
||||
type OutboxEventPayload struct {
|
||||
EventID string `json:"event_id,omitempty"`
|
||||
EventType string `json:"event_type"`
|
||||
EventVersion string `json:"event_version,omitempty"`
|
||||
AggregateID string `json:"aggregate_id,omitempty"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
|
||||
// ParsedOutboxEventPayload 是 dispatch 阶段使用的标准化结构。
|
||||
type ParsedOutboxEventPayload struct {
|
||||
EventID string
|
||||
EventType string
|
||||
EventVersion string
|
||||
AggregateID string
|
||||
PayloadJSON json.RawMessage
|
||||
}
|
||||
|
||||
// parseOutboxEventPayload 解析 outbox.payload。
|
||||
//
|
||||
// 当前策略(极致清理版):
|
||||
// 1. 只接受“统一事件外壳”格式;
|
||||
// 2. 不再支持旧格式纯业务 JSON 回退;
|
||||
// 3. event_type 缺失时直接报错,交由上层标 dead。
|
||||
func parseOutboxEventPayload(rawPayload string) (*ParsedOutboxEventPayload, error) {
|
||||
var wrapped OutboxEventPayload
|
||||
if err := json.Unmarshal([]byte(rawPayload), &wrapped); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eventType := strings.TrimSpace(wrapped.EventType)
|
||||
if eventType == "" {
|
||||
return nil, errors.New("event type is empty")
|
||||
}
|
||||
if len(wrapped.Payload) == 0 {
|
||||
return nil, errors.New("payload is empty")
|
||||
}
|
||||
|
||||
eventVersion := strings.TrimSpace(wrapped.EventVersion)
|
||||
if eventVersion == "" {
|
||||
eventVersion = DefaultEventVersion
|
||||
}
|
||||
|
||||
return &ParsedOutboxEventPayload{
|
||||
EventID: strings.TrimSpace(wrapped.EventID),
|
||||
EventType: eventType,
|
||||
EventVersion: eventVersion,
|
||||
AggregateID: strings.TrimSpace(wrapped.AggregateID),
|
||||
PayloadJSON: wrapped.Payload,
|
||||
}, nil
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/LoveLosita/smartflow/backend/model"
|
||||
@@ -12,47 +11,44 @@ import (
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
// Repository 是 outbox 状态机仓储。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 只负责 outbox 状态流转与通用事务编排;
|
||||
// 2. 不负责任何业务语义(例如聊天/任务/标题等具体落库);
|
||||
// 3. 消费成功时通过回调把业务动作注入同一事务,保证原子一致。
|
||||
type Repository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// NewRepository 构造 outbox 仓储。
|
||||
// 该仓储只关心“数据库状态机”,不关心 Kafka 投递/消费。
|
||||
func NewRepository(db *gorm.DB) *Repository {
|
||||
return &Repository{db: db}
|
||||
}
|
||||
|
||||
// CreateMessage 是通用 outbox 入队入口。
|
||||
// WithTx 用外部事务句柄构造同事务仓储实例。
|
||||
func (d *Repository) WithTx(tx *gorm.DB) *Repository {
|
||||
return &Repository{db: tx}
|
||||
}
|
||||
|
||||
// CreateMessage 把事件写入 outbox(入队)。
|
||||
//
|
||||
// 设计说明:
|
||||
// 1) 该方法只做“把消息安全写入本地 outbox 表”,不做任何 Kafka 网络调用;
|
||||
// 2) next_retry_at 初始化为当前时间,表示“可立即被扫描器捞取”;
|
||||
// 3) biz_type 由业务方传入,用于消费侧分发到不同处理器;
|
||||
// 4) payload 会被序列化为 JSON 字符串存入 payload 字段,后续再按 biz_type 反序列化。
|
||||
//
|
||||
// 这也是 Outbox 模式的核心:请求路径只承担本地写库成本,把外部系统不确定性(Kafka 延迟/抖动)
|
||||
// 转移给后台异步循环处理。
|
||||
func (d *Repository) CreateMessage(ctx context.Context, bizType, topic, messageKey string, payload any, maxRetry int) (int64, error) {
|
||||
// 1. 防御式兜底:若调用方未传 maxRetry,则统一使用默认值 20。
|
||||
// 这样可以避免某些链路遗漏配置导致消息无限重试或零重试。
|
||||
// 步骤:
|
||||
// 1. 序列化 payload;
|
||||
// 2. 初始化 pending 状态;
|
||||
// 3. 写入 outbox 并返回 outbox_id。
|
||||
func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messageKey string, payload any, maxRetry int) (int64, error) {
|
||||
if maxRetry <= 0 {
|
||||
maxRetry = 20
|
||||
}
|
||||
|
||||
// 2. 先把业务载荷序列化成 JSON 字符串。
|
||||
// 序列化失败属于“请求入队前失败”,此时不应创建 outbox 记录,直接返回错误即可。
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// 3. 组装 outbox 初始记录:
|
||||
// - status=pending:表示待投递;
|
||||
// - retry_count=0:尚未重试;
|
||||
// - next_retry_at=now:扫描器可立即捞取并尝试首次投递。
|
||||
now := time.Now()
|
||||
msg := model.AgentOutboxMessage{
|
||||
BizType: bizType,
|
||||
EventType: eventType,
|
||||
Topic: topic,
|
||||
MessageKey: messageKey,
|
||||
Payload: string(raw),
|
||||
@@ -62,21 +58,12 @@ func (d *Repository) CreateMessage(ctx context.Context, bizType, topic, messageK
|
||||
NextRetryAt: &now,
|
||||
}
|
||||
|
||||
// 4. 落库成功后返回 outbox 主键,供上层日志/追踪链路使用。
|
||||
if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return msg.ID, nil
|
||||
}
|
||||
|
||||
// CreateChatHistoryMessage 是聊天记录持久化的兼容入口。
|
||||
// 说明:为了避免现有业务调用一次性改太多,先保留该方法作为 CreateMessage 的薄封装。
|
||||
func (d *Repository) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) {
|
||||
return d.CreateMessage(ctx, model.OutboxBizTypeChatHistoryPersist, topic, messageKey, payload, maxRetry)
|
||||
}
|
||||
|
||||
// GetByID 按主键读取 outbox 记录。
|
||||
// 该方法通常用于 dispatch 前“再读一次最新状态”,避免使用过期快照。
|
||||
func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) {
|
||||
var msg model.AgentOutboxMessage
|
||||
if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil {
|
||||
@@ -85,13 +72,8 @@ func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxM
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// ListDueMessages 拉取“到期可投递”的 pending 消息。
|
||||
// 条件说明:
|
||||
// 1) status = pending:只处理待投递状态;
|
||||
// 2) next_retry_at <= now:到达可重试/可首次投递时间;
|
||||
// 3) 按 next_retry_at + id 升序:保证老消息优先,降低饥饿概率。
|
||||
// ListDueMessages 拉取到期可投递消息。
|
||||
func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) {
|
||||
// 1. 限流兜底,避免误传 0 导致一次拉取过多消息。
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
@@ -108,10 +90,8 @@ func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.Ag
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。
|
||||
// MarkPublished 标记为已投递 Kafka。
|
||||
func (d *Repository) MarkPublished(ctx context.Context, id int64) error {
|
||||
// 1. published 代表“已成功写入 Kafka”。
|
||||
// 2. 清理 last_error/next_retry_at,表示当前无需重试。
|
||||
now := time.Now()
|
||||
updates := map[string]interface{}{
|
||||
"status": model.OutboxStatusPublished,
|
||||
@@ -119,7 +99,6 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error {
|
||||
"last_error": nil,
|
||||
"next_retry_at": nil,
|
||||
}
|
||||
// 3. 额外加状态保护,避免并发下把 consumed/dead 错误覆盖回 published。
|
||||
result := d.db.WithContext(ctx).
|
||||
Model(&model.AgentOutboxMessage{}).
|
||||
Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead).
|
||||
@@ -127,10 +106,8 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
// MarkDead 把消息标记为死信(最终失败,不再重试)。
|
||||
// 常见场景:载荷不可反序列化、biz_type 未注册等“不可恢复错误”。
|
||||
// MarkDead 标记为死信。
|
||||
func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) error {
|
||||
// 1. 错误文本统一裁剪,避免超长错误撑爆字段或日志。
|
||||
now := time.Now()
|
||||
lastErr := truncateError(reason)
|
||||
updates := map[string]interface{}{
|
||||
@@ -142,39 +119,38 @@ func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) erro
|
||||
return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
|
||||
}
|
||||
|
||||
// MarkFailedForRetry 把一次失败写回 outbox 状态机,并计算下一次重试窗口。
|
||||
// 该方法必须在事务内完成“读当前状态 + 写新状态”,保证并发时计数和状态一致。
|
||||
// MarkFailedForRetry 记录一次可重试失败并推进重试窗口。
|
||||
//
|
||||
// 步骤:
|
||||
// 1. 行级锁读取当前状态;
|
||||
// 2. 最终态幂等短路;
|
||||
// 3. retry_count+1;
|
||||
// 4. 计算 next_retry_at 或 dead;
|
||||
// 5. 写回状态快照。
|
||||
func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason string) error {
|
||||
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
// 1. 行级锁读取,避免多个 goroutine 同时更新同一条消息导致 retry_count 乱序。
|
||||
var msg model.AgentOutboxMessage
|
||||
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. 若已是最终态(consumed/dead),直接幂等返回。
|
||||
// 这样即使出现重复调用,也不会把最终态改坏。
|
||||
if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 3. 递增重试计数并判断是否达到最大重试次数。
|
||||
nextRetryCount := msg.RetryCount + 1
|
||||
now := time.Now()
|
||||
status := model.OutboxStatusPending
|
||||
var nextRetryAt *time.Time
|
||||
if nextRetryCount >= msg.MaxRetry {
|
||||
// 3.1 达到上限:转 dead,停止后续扫描重试。
|
||||
status = model.OutboxStatusDead
|
||||
nextRetryAt = nil
|
||||
} else {
|
||||
// 3.2 未到上限:按指数退避计算下一次可重试时间。
|
||||
t := now.Add(calcRetryBackoff(nextRetryCount))
|
||||
nextRetryAt = &t
|
||||
}
|
||||
|
||||
// 4. 写回失败原因与状态快照,便于排查问题。
|
||||
lastErr := truncateError(reason)
|
||||
updates := map[string]interface{}{
|
||||
"status": status,
|
||||
@@ -187,66 +163,34 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st
|
||||
})
|
||||
}
|
||||
|
||||
// PersistChatHistoryAndMarkConsumed 负责“消费成功后落业务库 + 标记 outbox consumed”。
|
||||
// 之所以必须放在同一个事务里,是为了保证“业务落库”和“状态推进”原子一致:
|
||||
// - 若业务写入失败,不应把 outbox 标记为 consumed;
|
||||
// - 若标记 consumed 失败,也应回滚业务写入,避免出现不可追踪的不一致。
|
||||
func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error {
|
||||
// ConsumeAndMarkConsumed 是通用“消费成功事务入口”。
|
||||
//
|
||||
// 步骤:
|
||||
// 1. 事务内锁定 outbox 记录;
|
||||
// 2. 已 consumed/dead 时幂等返回;
|
||||
// 3. 执行业务回调 fn(tx);
|
||||
// 4. 业务成功后统一标记 consumed。
|
||||
func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64, fn func(tx *gorm.DB) error) error {
|
||||
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
// 1. 先锁定 outbox 记录,确保同一条消息不会被并发消费者重复推进状态。
|
||||
var outboxMsg model.AgentOutboxMessage
|
||||
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// 1.1 幂等兜底:记录不存在时视为“无事可做”。
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
// 1.2 若已 consumed/dead,说明已被处理过或已终止,直接幂等返回。
|
||||
if outboxMsg.Status == model.OutboxStatusConsumed {
|
||||
return nil
|
||||
}
|
||||
if outboxMsg.Status == model.OutboxStatusDead {
|
||||
if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 写入聊天历史业务表(chat_histories)。
|
||||
// 这里不包含 token 统计等扩展字段,只负责核心消息落库。
|
||||
chatMsg := payload.Message
|
||||
chatRole := payload.Role
|
||||
history := model.ChatHistory{
|
||||
UserID: payload.UserID,
|
||||
ChatID: payload.ConversationID,
|
||||
MessageContent: &chatMsg,
|
||||
Role: &chatRole,
|
||||
}
|
||||
if err = tx.Create(&history).Error; err != nil {
|
||||
return err
|
||||
if fn != nil {
|
||||
if err = fn(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 同一事务内原子更新会话统计信息:
|
||||
// - message_count + 1
|
||||
// - last_message_at = now
|
||||
// 这样可以保证 message_count 与 chat_histories 的真实落库条数一致。
|
||||
now := time.Now()
|
||||
chatUpdates := map[string]interface{}{
|
||||
"message_count": gorm.Expr("message_count + ?", 1),
|
||||
"last_message_at": &now,
|
||||
}
|
||||
chatResult := tx.Model(&model.AgentChat{}).
|
||||
Where("user_id = ? AND chat_id = ?", payload.UserID, payload.ConversationID).
|
||||
Updates(chatUpdates)
|
||||
if chatResult.Error != nil {
|
||||
return chatResult.Error
|
||||
}
|
||||
if chatResult.RowsAffected == 0 {
|
||||
// 会话不存在时回滚,让 outbox 继续重试/告警,而不是吞掉不一致。
|
||||
return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", payload.UserID, payload.ConversationID)
|
||||
}
|
||||
|
||||
// 4. 业务写入成功后,把 outbox 推进到 consumed 最终态。
|
||||
// 并清理错误与重试字段,表示该消息生命周期结束。
|
||||
updates := map[string]interface{}{
|
||||
"status": model.OutboxStatusConsumed,
|
||||
"consumed_at": &now,
|
||||
@@ -258,8 +202,6 @@ func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outb
|
||||
})
|
||||
}
|
||||
|
||||
// calcRetryBackoff 计算指数退避时间。
|
||||
// 规则:1s, 2s, 4s, 8s, 16s, 32s(最多封顶到第 6 档)。
|
||||
func calcRetryBackoff(retryCount int) time.Duration {
|
||||
if retryCount <= 0 {
|
||||
return time.Second
|
||||
@@ -270,7 +212,6 @@ func calcRetryBackoff(retryCount int) time.Duration {
|
||||
return time.Second * time.Duration(1<<(retryCount-1))
|
||||
}
|
||||
|
||||
// truncateError 限制错误文本最大长度,防止写库失败或日志污染。
|
||||
func truncateError(reason string) string {
|
||||
if len(reason) <= 2000 {
|
||||
return reason
|
||||
|
||||
Reference in New Issue
Block a user