diff --git a/README.md b/README.md index 7aec86d..a58a4a9 100644 --- a/README.md +++ b/README.md @@ -274,7 +274,7 @@ CREATE TABLE `users` | **持久层数据库** | **MySQL 8.0** | 存储用户、任务、课表及日程运行图(Schedules)的核心数据。 | | **ORM 框架** | **GORM** | 用于简化 Go 与数据库的交互,利用事务处理 `Apply` 接口的原子性操作。 | | **高性能缓存** | **Redis** | 缓存用户的周日程视图(避免频繁扫表)、存储 Token 临时限额、实现分布式锁防止重复排程。 | -| **消息队列** | **Kafka** | **异步解耦**:当用户点击“应用”时,通过 Kafka 异步触发 AI 消耗统计及任务状态同步。 | +| **消息队列** | **Outbox + Kafka** | **可靠异步解耦**:请求主链路先写 Outbox,后台再投递 Kafka 并消费落库,既降低首字延迟又避免消息瞬时丢失。 | | **AI 编排框架** | **Eino** | 作为 AI Agent 的大脑,根据排程策略(Steady/Rapid)计算任务与水课的嵌入逻辑。 | | **身份认证** | **JWT** | 实现无状态登录,将 `user_id` 封装在 Token 中,确保数据的用户隔离。 | | **配置管理** | **Viper** | 管理数据库、Redis、Kafka 的连接参数,支持多环境(开发/生产)切换。 | diff --git a/backend/cmd/start.go b/backend/cmd/start.go index b6d32b6..45463b3 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -14,6 +14,7 @@ import ( "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/routers" "github.com/LoveLosita/smartflow/backend/service" + eventsvc "github.com/LoveLosita/smartflow/backend/service/events" "github.com/spf13/viper" ) @@ -61,21 +62,28 @@ func Start() { agentRepo := dao.NewAgentDAO(db) outboxRepo := outboxinfra.NewRepository(db) - // outbox 异步链路接线: - // - 读取 Kafka 配置 - // - 创建基础设施级 outbox 异步引擎 - // - 引擎内部负责 dispatch/consume 两个后台循环 + // outbox 通用事件总线接线(第二阶段): + // 1. 读取 Kafka 配置; + // 2. 创建 infra 级 EventBus; + // 3. 显式注册“聊天持久化”事件处理器; + // 4. 启动总线后台 dispatch/consume 循环。 kafkaCfg := kafkabus.LoadConfig() - asyncPipeline, err := outboxinfra.NewChatHistoryAsync(outboxRepo, kafkaCfg) + eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkaCfg) if err != nil { - log.Fatalf("Failed to initialize Kafka async pipeline: %v", err) + log.Fatalf("Failed to initialize outbox event bus: %v", err) } - if asyncPipeline != nil { - asyncPipeline.Start(context.Background()) - defer asyncPipeline.Close() - log.Println("Kafka async pipeline started") + if eventBus != nil { + // 3. 在启动前完成“业务事件处理器”注册。 + // 3.1 这里显式调用 service/events,保证 infra 层不承载业务语义。 + // 3.2 若注册失败直接中止启动,避免“消息已入队但无人消费”的隐性故障。 + if err = eventsvc.RegisterChatHistoryPersistHandler(eventBus, outboxRepo, manager); err != nil { + log.Fatalf("Failed to register chat history event handler: %v", err) + } + eventBus.Start(context.Background()) + defer eventBus.Close() + log.Println("Outbox event bus started") } else { - log.Println("Kafka async pipeline is disabled") + log.Println("Outbox event bus is disabled") } // Service 层初始化。 @@ -84,7 +92,7 @@ func Start() { courseService := service.NewCourseService(courseRepo, scheduleRepo) taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager) scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo) - agentService := service.NewAgentService(aiHub, agentRepo, taskRepo, agentCacheRepo, asyncPipeline) + agentService := service.NewAgentService(aiHub, agentRepo, taskRepo, agentCacheRepo, eventBus) // API 层初始化。 userApi := api.NewUserHandler(userService) diff --git a/backend/dao/agent.go b/backend/dao/agent.go index b572e0b..bdd7e1d 100644 --- a/backend/dao/agent.go +++ b/backend/dao/agent.go @@ -19,41 +19,68 @@ func NewAgentDAO(db *gorm.DB) *AgentDAO { return &AgentDAO{db: db} } -func (a *AgentDAO) SaveChatHistory(ctx context.Context, userID int, conversationID string, role, message string) error { - // 1. 同步落库路径也要保证“消息写入”和“会话计数更新”原子一致。 - // 因此这里使用事务,避免出现“有消息但 message_count 没加”或反过来的不一致状态。 - return a.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - // 1.1 先写 chat_histories。 - userChat := model.ChatHistory{ - UserID: userID, - MessageContent: &message, - Role: &role, - ChatID: conversationID, - } - if err := tx.Create(&userChat).Error; err != nil { - return err - } +func (r *AgentDAO) WithTx(tx *gorm.DB) *AgentDAO { + return &AgentDAO{db: tx} +} - // 1.2 再原子更新 agent_chats 的统计字段: - // - message_count: +1 - // - last_message_at: 当前时间 - // 这样 message_count 语义就稳定等于“已成功落库的消息条数”。 - now := time.Now() - updates := map[string]interface{}{ - "message_count": gorm.Expr("message_count + ?", 1), - "last_message_at": &now, - } - result := tx.Model(&model.AgentChat{}). - Where("user_id = ? AND chat_id = ?", userID, conversationID). - Updates(updates) - if result.Error != nil { - return result.Error - } - if result.RowsAffected == 0 { - // 会话不存在视为数据不一致,回滚事务,防止产生“孤儿历史记录”。 - return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", userID, conversationID) - } - return nil +// saveChatHistoryCore 是“聊天消息落库 + 会话统计更新”的核心实现。 +// +// 职责边界: +// 1. 只执行当前 DAO 句柄上的数据库写入动作; +// 2. 不主动开启事务(事务由调用方决定); +// 3. 保证 chat_histories 与 agent_chats.message_count 的一致性口径。 +// +// 失败处理: +// 1. 任一步骤失败都返回 error; +// 2. 若调用方处于事务中,返回 error 会触发事务回滚。 +func (a *AgentDAO) saveChatHistoryCore(ctx context.Context, userID int, conversationID string, role, message string) error { + // 1. 先写 chat_histories 原始消息。 + userChat := model.ChatHistory{ + UserID: userID, + MessageContent: &message, + Role: &role, + ChatID: conversationID, + } + if err := a.db.WithContext(ctx).Create(&userChat).Error; err != nil { + return err + } + + // 2. 再更新会话统计(message_count +1, last_message_at=now)。 + now := time.Now() + updates := map[string]interface{}{ + "message_count": gorm.Expr("message_count + ?", 1), + "last_message_at": &now, + } + result := a.db.WithContext(ctx).Model(&model.AgentChat{}). + Where("user_id = ? AND chat_id = ?", userID, conversationID). + Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + // 会话不存在时直接失败,避免出现“孤儿历史消息”。 + return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", userID, conversationID) + } + return nil +} + +// SaveChatHistoryInTx 在调用方“已开启事务”的场景下写入聊天历史。 +// +// 设计目的: +// 1. 给服务层组合多个 DAO 操作时复用,避免嵌套事务; +// 2. 让 outbox 消费处理器可以和业务写入共享同一个 tx。 +func (a *AgentDAO) SaveChatHistoryInTx(ctx context.Context, userID int, conversationID string, role, message string) error { + return a.saveChatHistoryCore(ctx, userID, conversationID, role, message) +} + +// SaveChatHistory 在同步直写路径下写入聊天历史。 +// +// 说明: +// 1. 该方法会自行开启事务; +// 2. 内部复用 saveChatHistoryCore,确保和 SaveChatHistoryInTx 的业务口径完全一致。 +func (a *AgentDAO) SaveChatHistory(ctx context.Context, userID int, conversationID string, role, message string) error { + return a.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + return a.WithTx(tx).saveChatHistoryCore(ctx, userID, conversationID, role, message) }) } @@ -80,7 +107,7 @@ func (a *AgentDAO) GetUserChatHistories(ctx context.Context, userID, limit int, if err != nil { return nil, err } - // 保留“最近 N 条”的前提下,反转为时间正序,便于模型消费 + // 保留“最近 N 条”后,反转成时间正序,方便模型消费。 for i, j := 0, len(histories)-1; i < j; i, j = i+1, j-1 { histories[i], histories[j] = histories[j], histories[i] } @@ -92,17 +119,14 @@ func (a *AgentDAO) IfChatExists(ctx context.Context, userID int, chatID string) err := a.db.WithContext(ctx).Where("user_id = ? AND chat_id = ?", userID, chatID).First(&chat).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - return false, nil // 没有找到记录,表示会话不存在 + return false, nil } return false, err } return true, nil } -// GetConversationMeta 查询单个会话的元信息。 -// 用途: -// 1) 给前端提供“当前会话标题/消息数/最近消息时间”等展示字段; -// 2) 与流式聊天接口解耦,避免在 SSE 头部里塞动态标题。 +// GetConversationMeta 查询单个会话元信息。 func (a *AgentDAO) GetConversationMeta(ctx context.Context, userID int, chatID string) (*model.AgentChat, error) { var chat model.AgentChat err := a.db.WithContext(ctx). @@ -116,10 +140,6 @@ func (a *AgentDAO) GetConversationMeta(ctx context.Context, userID int, chatID s } // GetConversationTitle 读取当前会话标题。 -// 返回值说明: -// 1) title:标题内容(若为空表示尚未生成); -// 2) exists:会话是否存在; -// 3) err:数据库错误。 func (a *AgentDAO) GetConversationTitle(ctx context.Context, userID int, chatID string) (title string, exists bool, err error) { var chat model.AgentChat queryErr := a.db.WithContext(ctx). @@ -138,10 +158,7 @@ func (a *AgentDAO) GetConversationTitle(ctx context.Context, userID int, chatID return strings.TrimSpace(*chat.Title), true, nil } -// UpdateConversationTitleIfEmpty 仅在标题为空时写入会话标题。 -// 设计目的: -// 1) 避免每轮对话都覆盖已有标题; -// 2) 并发下保持幂等:多个 goroutine 同时尝试写标题,最终只会成功一次。 +// UpdateConversationTitleIfEmpty 仅在标题为空时更新会话标题。 func (a *AgentDAO) UpdateConversationTitleIfEmpty(ctx context.Context, userID int, chatID, title string) error { normalized := strings.TrimSpace(title) if normalized == "" { diff --git a/backend/dao/base.go b/backend/dao/base.go index ca6f046..1020bdf 100644 --- a/backend/dao/base.go +++ b/backend/dao/base.go @@ -6,7 +6,7 @@ import ( "gorm.io/gorm" ) -// RepoManager 囊括了所有的 Repo +// RepoManager 聚合所有 DAO,供服务层做跨仓储事务编排。 type RepoManager struct { db *gorm.DB Schedule *ScheduleDAO @@ -14,6 +14,7 @@ type RepoManager struct { Course *CourseDAO TaskClass *TaskClassDAO User *UserDAO + Agent *AgentDAO } func NewManager(db *gorm.DB) *RepoManager { @@ -24,21 +25,37 @@ func NewManager(db *gorm.DB) *RepoManager { Course: NewCourseDAO(db), TaskClass: NewTaskClassDAO(db), User: NewUserDAO(db), + Agent: NewAgentDAO(db), } } -// Transaction 核心函数:开启一个带事务的“新管理器” +// WithTx 基于外部事务句柄构造“同事务 RepoManager”。 +// +// 职责边界: +// 1. 只做 DAO 依赖重绑定,不开启/提交/回滚事务; +// 2. 让服务层在一个 tx 内调用多个 DAO 方法; +// 3. 适用于 outbox 消费处理器这类“基础设施事务 + 业务事务合并”的场景。 +func (m *RepoManager) WithTx(tx *gorm.DB) *RepoManager { + return &RepoManager{ + db: tx, + Schedule: m.Schedule.WithTx(tx), + Task: m.Task.WithTx(tx), + TaskClass: m.TaskClass.WithTx(tx), + Course: m.Course.WithTx(tx), + User: m.User.WithTx(tx), + Agent: m.Agent.WithTx(tx), + } +} + +// Transaction 开启事务并把“同事务 RepoManager”传给回调。 +// +// 使用约束: +// 1. 回调里应只使用 txM 下挂 DAO,避免混入事务外句柄; +// 2. 回调返回 error 会触发整体回滚; +// 3. 回调返回 nil 表示提交事务。 func (m *RepoManager) Transaction(ctx context.Context, fn func(txM *RepoManager) error) error { return m.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - // 💡 关键:创建一个新的 RepoManager,里面的 Repo 全部注入这个 tx 句柄 - txM := &RepoManager{ - db: tx, - Schedule: m.Schedule.WithTx(tx), - Task: m.Task.WithTx(tx), - TaskClass: m.TaskClass.WithTx(tx), - Course: m.Course.WithTx(tx), - User: m.User.WithTx(tx), - } + txM := m.WithTx(tx) return fn(txM) }) } diff --git a/backend/infra/kafka/envelope.go b/backend/infra/kafka/envelope.go index dbb6fff..2b0148b 100644 --- a/backend/infra/kafka/envelope.go +++ b/backend/infra/kafka/envelope.go @@ -2,13 +2,25 @@ package kafka import "encoding/json" -// Envelope 是 outbox 投递到 Kafka 的统一包裹结构。 -// 设计目的: -// 1) 消费端先拿到 outbox_id,可直接回写状态; -// 2) biz_type 做分发,支持后续扩展更多异步业务; -// 3) payload 保持原始 JSON,按业务类型再反序列化。 +// Envelope 是 outbox 投递到 Kafka 的统一协议包。 +// +// 协议边界: +// 1. 这是总线协议,不包含具体业务字段; +// 2. 路由只依赖 event_type,不再保留 biz_type 兼容字段; +// 3. payload 为原始业务 JSON,由业务 handler 决定如何反序列化。 type Envelope struct { - OutboxID int64 `json:"outbox_id"` - BizType string `json:"biz_type"` - Payload json.RawMessage `json:"payload"` + // OutboxID 是 outbox 状态机主键,用于消费者回写 consumed/retry/dead。 + OutboxID int64 `json:"outbox_id"` + // EventID 是事件唯一标识(当前默认回退为 outbox_id 字符串)。 + EventID string `json:"event_id,omitempty"` + + // EventType 是唯一路由键(例如 chat.history.persist.requested)。 + EventType string `json:"event_type"` + // EventVersion 是事件版本号(默认 v1)。 + EventVersion string `json:"event_version,omitempty"` + // AggregateID 是聚合主键(例如 conversation_id),用于追踪同一业务对象事件流。 + AggregateID string `json:"aggregate_id,omitempty"` + + // Payload 是业务载荷 JSON。 + Payload json.RawMessage `json:"payload"` } diff --git a/backend/infra/outbox/chat_history_async.go b/backend/infra/outbox/chat_history_async.go deleted file mode 100644 index d7d77b3..0000000 --- a/backend/infra/outbox/chat_history_async.go +++ /dev/null @@ -1,91 +0,0 @@ -package outbox - -import ( - "context" - "encoding/json" - "errors" - - kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" - "github.com/LoveLosita/smartflow/backend/model" -) - -// ChatHistoryAsync 是“聊天记录异步持久化”的业务适配器。 -// -// 设计目的: -// 1) 让业务层只调用 EnqueueChatHistoryPersist,而不感知扫描/投递/消费细节; -// 2) 保持现有 Agent 代码调用习惯,降低改造面; -// 3) 把具体的 outbox+kafka 主流程彻底收敛到 infra。 -type ChatHistoryAsync struct { - engine *Engine -} - -// NewChatHistoryAsync 创建聊天记录异步适配器并注册处理器。 -// -// 处理器职责: -// 1) 从 envelope payload 解析聊天载荷; -// 2) 调用仓储“落库并标记 consumed”; -// 3) 解析失败时标记 dead(不可恢复错误),避免无意义重试。 -func NewChatHistoryAsync(repo *Repository, cfg kafkabus.Config) (*ChatHistoryAsync, error) { - // 1. 先创建通用引擎,内部会按 cfg.Enabled 决定是否启用。 - engine, err := NewEngine(repo, cfg) - if err != nil { - return nil, err - } - if engine == nil { - // 2. 异步开关关闭:返回 nil 交给上层走同步降级路径。 - return nil, nil - } - - // 3. 注册“聊天记录持久化”业务处理器。 - // 该处理器只做三件事: - // 3.1 解析 payload; - // 3.2 调仓储落库并推进 consumed; - // 3.3 遇到不可恢复错误时标记 dead。 - if err = engine.RegisterHandler(model.OutboxBizTypeChatHistoryPersist, func(ctx context.Context, envelope kafkabus.Envelope) error { - var payload model.ChatHistoryPersistPayload - if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { - _ = repo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error()) - // 返回 nil:该错误已被标记为 dead,不需要再走重试。 - return nil - } - // 返回 error:由 engine 统一标记为 retry 并提交 offset。 - return repo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload) - }); err != nil { - // 4. 注册失败时回收已创建的引擎资源,防止泄漏。 - engine.Close() - return nil, err - } - - // 5. 返回业务适配器,对业务层暴露“更语义化”的调用入口。 - return &ChatHistoryAsync{engine: engine}, nil -} - -// Start 启动异步引擎(扫描 + 消费)。 -func (a *ChatHistoryAsync) Start(ctx context.Context) { - // 允许在未初始化(例如异步关闭)时被安全调用。 - if a == nil || a.engine == nil { - return - } - a.engine.Start(ctx) -} - -// Close 关闭异步引擎资源。 -func (a *ChatHistoryAsync) Close() { - // 允许在未初始化(例如异步关闭)时被安全调用。 - if a == nil || a.engine == nil { - return - } - a.engine.Close() -} - -// EnqueueChatHistoryPersist 将聊天记录持久化请求写入 outbox。 -// 该方法是业务层唯一需要调用的入口。 -func (a *ChatHistoryAsync) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { - // 1. 若引擎未初始化,说明启动配置有问题或异步功能未启用。 - // 这里显式返回错误,交由业务层按需降级/告警。 - if a == nil || a.engine == nil { - return errors.New("chat history async is not initialized") - } - // 2. 以 conversation_id 作为 messageKey,尽量让同会话消息落在稳定分区。 - return a.engine.Enqueue(ctx, model.OutboxBizTypeChatHistoryPersist, payload.ConversationID, payload) -} diff --git a/backend/infra/outbox/engine.go b/backend/infra/outbox/engine.go index 1c0a8a9..a053b5d 100644 --- a/backend/infra/outbox/engine.go +++ b/backend/infra/outbox/engine.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "log" + "strconv" + "strings" "sync" "time" @@ -15,24 +17,35 @@ import ( "gorm.io/gorm" ) -// MessageHandler 是 outbox 消费分发处理器。 +// MessageHandler 是事件消费处理器。 // -// 设计约束: -// 1) 入参 envelope 已经完成最外层解析(含 outbox_id、biz_type、payload); -// 2) 若返回 nil,表示业务处理成功,框架将继续提交 Kafka offset; -// 3) 若返回 error,框架会按“可重试错误”处理:回写 outbox 失败状态并进入重试窗口。 +// 语义约束: +// 1. 入参 envelope 已完成最外层解析; +// 2. 返回 nil 表示处理成功,框架提交 offset; +// 3. 返回 error 表示可重试失败,框架回写 retry 后提交 offset。 type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error -// Engine 是 Outbox + Kafka 的通用异步引擎。 +// PublishRequest 是通用事件发布入参。 +// +// 设计目标: +// 1. 业务只描述“要发什么事件”,不关心 outbox/kafka 细节; +// 2. 统一收敛事件元数据(event_type/version/aggregate_id); +// 3. payload 支持任意 DTO,由 infra 统一 JSON 序列化。 +type PublishRequest struct { + EventType string + EventVersion string + MessageKey string + AggregateID string + EventID string + Payload any +} + +// Engine 是 Outbox + Kafka 通用异步引擎。 // // 职责边界: -// 1) 负责 outbox 扫描、Kafka 投递、Kafka 消费与统一状态机流转; -// 2) 负责 biz_type 到处理器的分发; -// 3) 不关心具体业务含义(例如“聊天记录落库”),业务语义由 handler 提供。 -// -// 状态流转口径: -// pending -> published -> consumed(成功); -// pending/published --失败--> pending(带 next_retry_at) 或 dead(达到最大重试)。 +// 1. 负责 outbox 扫描、kafka 投递、kafka 消费、状态机推进; +// 2. 负责 event_type -> handler 路由; +// 3. 不负责任何业务语义(业务由 handler 承担)。 type Engine struct { repo *Repository producer *kafkabus.Producer @@ -48,23 +61,19 @@ type Engine struct { handlers map[string]MessageHandler } -// NewEngine 创建 outbox 异步引擎。 +// NewEngine 创建异步引擎。 // -// 说明: -// 1) cfg.Enabled=false 时返回 nil,调用方可按“异步关闭”处理; -// 2) producer/consumer 初始化失败时会确保资源回收,避免半初始化泄漏。 +// 规则: +// 1. kafka.enabled=false 时返回 nil,调用方可降级同步; +// 2. producer/consumer 任一步失败都会回收已创建资源。 func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { - // 1. 配置关闭时直接返回 nil,让上层可以“无侵入降级为同步模式”。 if !cfg.Enabled { return nil, nil } - // 2. 仓储缺失属于启动期配置错误,直接返回。 if repo == nil { return nil, errors.New("outbox repository is nil") } - // 3. 先初始化 producer,再初始化 consumer。 - // 如果第二步失败,要主动回收第一步资源,避免泄漏。 producer, err := kafkabus.NewProducer(cfg) if err != nil { return nil, err @@ -75,7 +84,6 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { return nil, err } - // 4. 汇总配置,构造引擎实例。 return &Engine{ repo: repo, producer: producer, @@ -89,78 +97,62 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { }, nil } -// RegisterHandler 注册某个 biz_type 的消费处理器。 -// -// 设计要求: -// 1) biz_type 必须唯一,重复注册会覆盖旧值(并打印提示日志); -// 2) handler 不能为空; -// 3) 建议在 Start 前完成注册,减少运行时热更新复杂度。 -func (e *Engine) RegisterHandler(bizType string, handler MessageHandler) error { - // 1. 参数校验:防止业务侧在启动链路上把 nil 引擎继续往下用。 +// RegisterHandler 是历史别名(等价 RegisterEventHandler)。 +func (e *Engine) RegisterHandler(eventType string, handler MessageHandler) error { + return e.RegisterEventHandler(eventType, handler) +} + +// RegisterEventHandler 注册事件处理器。 +func (e *Engine) RegisterEventHandler(eventType string, handler MessageHandler) error { if e == nil { return errors.New("outbox engine is nil") } - // 2. biz_type 为空会导致无法分发,提前拦截。 - if bizType == "" { - return errors.New("bizType is empty") + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return errors.New("eventType is empty") } - // 3. handler 为空会在消费时 panic,必须提前拒绝。 if handler == nil { return errors.New("handler is nil") } - // 4. 加写锁更新 handler 映射,保证并发注册时 map 安全。 e.handlersMu.Lock() defer e.handlersMu.Unlock() - if _, exists := e.handlers[bizType]; exists { - log.Printf("outbox handler 覆盖注册: biz_type=%s", bizType) + if _, exists := e.handlers[eventType]; exists { + log.Printf("outbox handler 覆盖注册: event_type=%s", eventType) } - e.handlers[bizType] = handler + e.handlers[eventType] = handler return nil } -func (e *Engine) getHandler(bizType string) (MessageHandler, bool) { - // 读锁足够满足并发读取需求,避免无谓阻塞。 +func (e *Engine) getHandler(eventType string) (MessageHandler, bool) { e.handlersMu.RLock() defer e.handlersMu.RUnlock() - h, ok := e.handlers[bizType] + h, ok := e.handlers[eventType] return h, ok } -// Start 启动 outbox 异步引擎。 -// -// 会启动两个后台循环: -// 1) dispatch loop:扫描 due outbox 并投递到 Kafka; -// 2) consume loop:消费 Kafka 并按 biz_type 分发处理。 +// Start 启动 dispatch + consume 两个后台循环。 func (e *Engine) Start(ctx context.Context) { if e == nil { return } - // 1. 启动日志:把关键运行参数打出来,便于排查“为什么没消费/没扫描”。 log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch) - - // 2. 启动前探活 topic 是否可用。 - // 注意:即使探活失败也不会阻断引擎启动,后续循环会继续重试。 if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil { log.Printf("Kafka topic not ready before consume loop start: %v", err) } else { log.Printf("Kafka topic is ready: %s", e.topic) } - // 3. 并行启动两条核心循环: - // - dispatch loop:负责 outbox -> Kafka; - // - consume loop:负责 Kafka -> handler -> outbox 状态推进。 go e.startDispatchLoop(ctx) go e.startConsumeLoop(ctx) } -// Close 关闭 producer/consumer 资源。 +// Close 关闭 kafka 资源。 func (e *Engine) Close() { if e == nil { return } - // 逐个关闭并记录错误,避免某个 close 失败导致后续资源无法回收。 if err := e.producer.Close(); err != nil { log.Printf("关闭 Kafka producer 失败: %v", err) } @@ -169,35 +161,65 @@ func (e *Engine) Close() { } } -// Enqueue 把业务消息写入 outbox(请求路径调用)。 +// Enqueue 是历史别名(等价 Publish)。 +func (e *Engine) Enqueue(ctx context.Context, eventType, messageKey string, payload any) error { + return e.Publish(ctx, PublishRequest{ + EventType: eventType, + MessageKey: messageKey, + AggregateID: messageKey, + Payload: payload, + }) +} + +// Publish 发布事件到 outbox。 // -// 注意: -// 1) 该方法不做 Kafka 网络写入,只有数据库写入; -// 2) messageKey 建议使用业务幂等键(如 conversation_id)以提升分区稳定性; -// 3) payload 需要可 JSON 序列化。 -func (e *Engine) Enqueue(ctx context.Context, bizType, messageKey string, payload any) error { +// 步骤: +// 1. 标准化 event_type/version/key; +// 2. payload 序列化; +// 3. 写入 outbox(仅本地写库,不做 kafka 网络 IO)。 +func (e *Engine) Publish(ctx context.Context, req PublishRequest) error { if e == nil { return errors.New("outbox engine is nil") } - // 这里故意只写数据库,不做 Kafka 网络 IO, - // 目的是把请求耗时稳定在“单次写库”的可控范围。 - _, err := e.repo.CreateMessage(ctx, bizType, e.topic, messageKey, payload, e.maxRetry) + + eventType := strings.TrimSpace(req.EventType) + if eventType == "" { + return errors.New("eventType is empty") + } + eventVersion := strings.TrimSpace(req.EventVersion) + if eventVersion == "" { + eventVersion = DefaultEventVersion + } + messageKey := strings.TrimSpace(req.MessageKey) + aggregateID := strings.TrimSpace(req.AggregateID) + if aggregateID == "" { + aggregateID = messageKey + } + + payloadJSON, err := json.Marshal(req.Payload) + if err != nil { + return err + } + + _, err = e.repo.CreateMessage(ctx, eventType, e.topic, messageKey, OutboxEventPayload{ + EventID: strings.TrimSpace(req.EventID), + EventType: eventType, + EventVersion: eventVersion, + AggregateID: aggregateID, + Payload: payloadJSON, + }, e.maxRetry) return err } func (e *Engine) startDispatchLoop(ctx context.Context) { - // 1. 定时扫描 due outbox 记录。 - // 扫描间隔由 scanEvery 控制,避免每次请求都主动触发投递造成抖动。 ticker := time.NewTicker(e.scanEvery) defer ticker.Stop() for { select { case <-ctx.Done(): - // 2. 收到退出信号后优雅停止循环。 return case <-ticker.C: - // 3. 拉取当前窗口内可投递消息。 pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch) if err != nil { log.Printf("扫描 outbox 失败: %v", err) @@ -207,7 +229,6 @@ func (e *Engine) startDispatchLoop(ctx context.Context) { log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages)) } - // 4. 逐条投递,单条失败不影响同批后续消息。 for _, msg := range pendingMessages { if err = e.dispatchOne(ctx, msg.ID); err != nil { log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) @@ -218,29 +239,39 @@ func (e *Engine) startDispatchLoop(ctx context.Context) { } func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { - // 1. 投递前重新按 ID 读取最新状态,避免用到过期快照。 outboxMsg, err := e.repo.GetByID(ctx, outboxID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - // 1.1 记录已不存在(可能被清理),按幂等成功处理。 return nil } return err } - // 1.2 最终态直接跳过,避免重复投递。 if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { return nil } - // 2. 组装 Kafka 包装体,统一带上 outbox_id 供消费端做状态回写。 + eventPayload, payloadErr := parseOutboxEventPayload(outboxMsg.Payload) + if payloadErr != nil { + markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析 outbox 事件包失败: "+payloadErr.Error()) + if markErr != nil { + log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) + } + return payloadErr + } + if eventPayload.EventID == "" { + eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10) + } + envelope := kafkabus.Envelope{ - OutboxID: outboxMsg.ID, - BizType: outboxMsg.BizType, - Payload: json.RawMessage(outboxMsg.Payload), + OutboxID: outboxMsg.ID, + EventID: eventPayload.EventID, + EventType: eventPayload.EventType, + EventVersion: eventPayload.EventVersion, + AggregateID: eventPayload.AggregateID, + Payload: eventPayload.PayloadJSON, } raw, err := json.Marshal(envelope) if err != nil { - // 2.1 包装层序列化失败通常不可恢复,直接标 dead。 markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) if markErr != nil { log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) @@ -248,8 +279,6 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { return err } - // 3. 先投 Kafka,再把 outbox 状态推进到 published。 - // 任一步骤失败都回写 retry,让扫描器后续重试。 if err = e.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil { _ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error()) return err @@ -262,29 +291,23 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { } func (e *Engine) startConsumeLoop(ctx context.Context) { - // 消费循环采用“拉取 -> 处理 -> 提交 offset”的标准模型。 for { select { case <-ctx.Done(): - // 1. 收到退出信号后终止循环。 return default: } - // 2. 拉取下一条 Kafka 消息。 msg, err := e.consumer.Dequeue(ctx) if err != nil { if errors.Is(err, context.Canceled) { - // 2.1 context 主动取消时,不记错误日志,直接退出。 return } - // 2.2 临时错误短暂退避后继续,避免空转刷日志。 log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err) time.Sleep(300 * time.Millisecond) continue } - // 3. 单条消息处理失败仅记录日志,不阻断消费循环。 if err = e.handleMessage(ctx, msg); err != nil { log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err) } @@ -292,33 +315,35 @@ func (e *Engine) startConsumeLoop(ctx context.Context) { } func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) error { - // 1. 先解析最外层 envelope,拿到 outbox_id + biz_type + payload。 var envelope kafkabus.Envelope if err := json.Unmarshal(msg.Value, &envelope); err != nil { - // 1.1 包装层损坏时无法恢复,直接提交 offset 防止无限重放。 _ = e.consumer.Commit(ctx, msg) return fmt.Errorf("解析 Kafka 包装失败: %w", err) } if envelope.OutboxID <= 0 { - // 1.2 缺少 outbox_id 无法回写状态,同样提交 offset 跳过。 _ = e.consumer.Commit(ctx, msg) return errors.New("Kafka 包装缺少 outbox_id") } - // 2. 根据 biz_type 查找业务处理器。 - handler, ok := e.getHandler(envelope.BizType) - if !ok { - // 2.1 未注册处理器是配置错误,标记 dead 并提交 offset,避免重复消费。 - _ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) + eventType := strings.TrimSpace(envelope.EventType) + if eventType == "" { + _ = e.repo.MarkDead(ctx, envelope.OutboxID, "消息缺少事件类型") + if err := e.consumer.Commit(ctx, msg); err != nil { + return err + } + return nil + } + + handler, ok := e.getHandler(eventType) + if !ok { + _ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知事件类型: "+eventType) if err := e.consumer.Commit(ctx, msg); err != nil { return err } return nil } - // 3. 调用业务处理器。 if err := handler(ctx, envelope); err != nil { - // 统一按“可重试错误”处理,回写 retry 状态后提交 offset,避免同一条消息在 Kafka 侧死循环。 if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil { return markErr } @@ -328,6 +353,5 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er return err } - // 4. 业务处理成功后提交 offset。 return e.consumer.Commit(ctx, msg) } diff --git a/backend/infra/outbox/event_bus.go b/backend/infra/outbox/event_bus.go new file mode 100644 index 0000000..dd53529 --- /dev/null +++ b/backend/infra/outbox/event_bus.go @@ -0,0 +1,87 @@ +package outbox + +import ( + "context" + "errors" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" +) + +// EventPublisher 是通用事件发布能力接口。 +// +// 职责边界: +// 1. 只暴露“发布事件”这一件事,隐藏底层 outbox/kafka 实现细节; +// 2. 业务层只依赖该接口,避免直接耦合具体引擎结构体; +// 3. 该接口不承诺“立即消费成功”,只承诺“事件已入队或返回错误”。 +type EventPublisher interface { + Publish(ctx context.Context, req PublishRequest) error +} + +// EventBus 是 outbox 异步总线的门面对象。 +// +// 设计目的: +// 1. 对外提供“发布 + 注册处理器 + 启停”三类最小能力; +// 2. 对内复用 Engine,不重复实现状态机和调度逻辑; +// 3. 为后续引入更多事件类型提供统一扩展点。 +type EventBus struct { + engine *Engine +} + +// NewEventBus 创建通用事件总线。 +// +// 说明: +// 1. 当 kafka.enabled=false 时返回 nil,调用方可直接降级为同步模式; +// 2. 该方法只创建基础设施对象,不自动注册任何业务事件处理器; +// 3. 业务事件处理器注册应由上层在启动阶段显式完成,避免隐式副作用。 +func NewEventBus(repo *Repository, cfg kafkabus.Config) (*EventBus, error) { + engine, err := NewEngine(repo, cfg) + if err != nil { + return nil, err + } + if engine == nil { + return nil, nil + } + return &EventBus{engine: engine}, nil +} + +// RegisterEventHandler 注册事件处理器。 +// +// 失败语义: +// 1. bus 未初始化时直接返回错误; +// 2. event_type 为空或 handler 为空时返回错误; +// 3. 重复注册时采用“后者覆盖前者”并打日志(由 Engine 负责)。 +func (b *EventBus) RegisterEventHandler(eventType string, handler MessageHandler) error { + if b == nil || b.engine == nil { + return errors.New("event bus is not initialized") + } + return b.engine.RegisterEventHandler(eventType, handler) +} + +// Publish 发布事件到 outbox 队列。 +// +// 关键语义: +// 1. 返回 nil 仅表示“已写入 outbox 成功”; +// 2. 真正 Kafka 投递与业务消费由后台异步循环完成; +// 3. 若返回 error,表示本次入队失败,调用方应按业务策略决定是否重试/降级。 +func (b *EventBus) Publish(ctx context.Context, req PublishRequest) error { + if b == nil || b.engine == nil { + return errors.New("event bus is not initialized") + } + return b.engine.Publish(ctx, req) +} + +// Start 启动事件总线后台循环(dispatch + consume)。 +func (b *EventBus) Start(ctx context.Context) { + if b == nil || b.engine == nil { + return + } + b.engine.Start(ctx) +} + +// Close 关闭事件总线资源(producer/consumer)。 +func (b *EventBus) Close() { + if b == nil || b.engine == nil { + return + } + b.engine.Close() +} diff --git a/backend/infra/outbox/event_contract.go b/backend/infra/outbox/event_contract.go new file mode 100644 index 0000000..c7022a9 --- /dev/null +++ b/backend/infra/outbox/event_contract.go @@ -0,0 +1,64 @@ +package outbox + +import ( + "encoding/json" + "errors" + "strings" +) + +const ( + // DefaultEventVersion 是通用事件协议默认版本。 + DefaultEventVersion = "v1" +) + +// OutboxEventPayload 是 outbox.payload 的统一事件外壳。 +type OutboxEventPayload struct { + EventID string `json:"event_id,omitempty"` + EventType string `json:"event_type"` + EventVersion string `json:"event_version,omitempty"` + AggregateID string `json:"aggregate_id,omitempty"` + Payload json.RawMessage `json:"payload"` +} + +// ParsedOutboxEventPayload 是 dispatch 阶段使用的标准化结构。 +type ParsedOutboxEventPayload struct { + EventID string + EventType string + EventVersion string + AggregateID string + PayloadJSON json.RawMessage +} + +// parseOutboxEventPayload 解析 outbox.payload。 +// +// 当前策略(极致清理版): +// 1. 只接受“统一事件外壳”格式; +// 2. 不再支持旧格式纯业务 JSON 回退; +// 3. event_type 缺失时直接报错,交由上层标 dead。 +func parseOutboxEventPayload(rawPayload string) (*ParsedOutboxEventPayload, error) { + var wrapped OutboxEventPayload + if err := json.Unmarshal([]byte(rawPayload), &wrapped); err != nil { + return nil, err + } + + eventType := strings.TrimSpace(wrapped.EventType) + if eventType == "" { + return nil, errors.New("event type is empty") + } + if len(wrapped.Payload) == 0 { + return nil, errors.New("payload is empty") + } + + eventVersion := strings.TrimSpace(wrapped.EventVersion) + if eventVersion == "" { + eventVersion = DefaultEventVersion + } + + return &ParsedOutboxEventPayload{ + EventID: strings.TrimSpace(wrapped.EventID), + EventType: eventType, + EventVersion: eventVersion, + AggregateID: strings.TrimSpace(wrapped.AggregateID), + PayloadJSON: wrapped.Payload, + }, nil +} diff --git a/backend/infra/outbox/repository.go b/backend/infra/outbox/repository.go index 10bf319..0ae922b 100644 --- a/backend/infra/outbox/repository.go +++ b/backend/infra/outbox/repository.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "time" "github.com/LoveLosita/smartflow/backend/model" @@ -12,47 +11,44 @@ import ( "gorm.io/gorm/clause" ) +// Repository 是 outbox 状态机仓储。 +// +// 职责边界: +// 1. 只负责 outbox 状态流转与通用事务编排; +// 2. 不负责任何业务语义(例如聊天/任务/标题等具体落库); +// 3. 消费成功时通过回调把业务动作注入同一事务,保证原子一致。 type Repository struct { db *gorm.DB } -// NewRepository 构造 outbox 仓储。 -// 该仓储只关心“数据库状态机”,不关心 Kafka 投递/消费。 func NewRepository(db *gorm.DB) *Repository { return &Repository{db: db} } -// CreateMessage 是通用 outbox 入队入口。 +// WithTx 用外部事务句柄构造同事务仓储实例。 +func (d *Repository) WithTx(tx *gorm.DB) *Repository { + return &Repository{db: tx} +} + +// CreateMessage 把事件写入 outbox(入队)。 // -// 设计说明: -// 1) 该方法只做“把消息安全写入本地 outbox 表”,不做任何 Kafka 网络调用; -// 2) next_retry_at 初始化为当前时间,表示“可立即被扫描器捞取”; -// 3) biz_type 由业务方传入,用于消费侧分发到不同处理器; -// 4) payload 会被序列化为 JSON 字符串存入 payload 字段,后续再按 biz_type 反序列化。 -// -// 这也是 Outbox 模式的核心:请求路径只承担本地写库成本,把外部系统不确定性(Kafka 延迟/抖动) -// 转移给后台异步循环处理。 -func (d *Repository) CreateMessage(ctx context.Context, bizType, topic, messageKey string, payload any, maxRetry int) (int64, error) { - // 1. 防御式兜底:若调用方未传 maxRetry,则统一使用默认值 20。 - // 这样可以避免某些链路遗漏配置导致消息无限重试或零重试。 +// 步骤: +// 1. 序列化 payload; +// 2. 初始化 pending 状态; +// 3. 写入 outbox 并返回 outbox_id。 +func (d *Repository) CreateMessage(ctx context.Context, eventType, topic, messageKey string, payload any, maxRetry int) (int64, error) { if maxRetry <= 0 { maxRetry = 20 } - // 2. 先把业务载荷序列化成 JSON 字符串。 - // 序列化失败属于“请求入队前失败”,此时不应创建 outbox 记录,直接返回错误即可。 raw, err := json.Marshal(payload) if err != nil { return 0, err } - // 3. 组装 outbox 初始记录: - // - status=pending:表示待投递; - // - retry_count=0:尚未重试; - // - next_retry_at=now:扫描器可立即捞取并尝试首次投递。 now := time.Now() msg := model.AgentOutboxMessage{ - BizType: bizType, + EventType: eventType, Topic: topic, MessageKey: messageKey, Payload: string(raw), @@ -62,21 +58,12 @@ func (d *Repository) CreateMessage(ctx context.Context, bizType, topic, messageK NextRetryAt: &now, } - // 4. 落库成功后返回 outbox 主键,供上层日志/追踪链路使用。 if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil { return 0, err } return msg.ID, nil } -// CreateChatHistoryMessage 是聊天记录持久化的兼容入口。 -// 说明:为了避免现有业务调用一次性改太多,先保留该方法作为 CreateMessage 的薄封装。 -func (d *Repository) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { - return d.CreateMessage(ctx, model.OutboxBizTypeChatHistoryPersist, topic, messageKey, payload, maxRetry) -} - -// GetByID 按主键读取 outbox 记录。 -// 该方法通常用于 dispatch 前“再读一次最新状态”,避免使用过期快照。 func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) { var msg model.AgentOutboxMessage if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil { @@ -85,13 +72,8 @@ func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxM return &msg, nil } -// ListDueMessages 拉取“到期可投递”的 pending 消息。 -// 条件说明: -// 1) status = pending:只处理待投递状态; -// 2) next_retry_at <= now:到达可重试/可首次投递时间; -// 3) 按 next_retry_at + id 升序:保证老消息优先,降低饥饿概率。 +// ListDueMessages 拉取到期可投递消息。 func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { - // 1. 限流兜底,避免误传 0 导致一次拉取过多消息。 if limit <= 0 { limit = 100 } @@ -108,10 +90,8 @@ func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.Ag return messages, nil } -// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 +// MarkPublished 标记为已投递 Kafka。 func (d *Repository) MarkPublished(ctx context.Context, id int64) error { - // 1. published 代表“已成功写入 Kafka”。 - // 2. 清理 last_error/next_retry_at,表示当前无需重试。 now := time.Now() updates := map[string]interface{}{ "status": model.OutboxStatusPublished, @@ -119,7 +99,6 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error { "last_error": nil, "next_retry_at": nil, } - // 3. 额外加状态保护,避免并发下把 consumed/dead 错误覆盖回 published。 result := d.db.WithContext(ctx). Model(&model.AgentOutboxMessage{}). Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead). @@ -127,10 +106,8 @@ func (d *Repository) MarkPublished(ctx context.Context, id int64) error { return result.Error } -// MarkDead 把消息标记为死信(最终失败,不再重试)。 -// 常见场景:载荷不可反序列化、biz_type 未注册等“不可恢复错误”。 +// MarkDead 标记为死信。 func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) error { - // 1. 错误文本统一裁剪,避免超长错误撑爆字段或日志。 now := time.Now() lastErr := truncateError(reason) updates := map[string]interface{}{ @@ -142,39 +119,38 @@ func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) erro return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error } -// MarkFailedForRetry 把一次失败写回 outbox 状态机,并计算下一次重试窗口。 -// 该方法必须在事务内完成“读当前状态 + 写新状态”,保证并发时计数和状态一致。 +// MarkFailedForRetry 记录一次可重试失败并推进重试窗口。 +// +// 步骤: +// 1. 行级锁读取当前状态; +// 2. 最终态幂等短路; +// 3. retry_count+1; +// 4. 计算 next_retry_at 或 dead; +// 5. 写回状态快照。 func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - // 1. 行级锁读取,避免多个 goroutine 同时更新同一条消息导致 retry_count 乱序。 var msg model.AgentOutboxMessage err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error if err != nil { return err } - // 2. 若已是最终态(consumed/dead),直接幂等返回。 - // 这样即使出现重复调用,也不会把最终态改坏。 if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { return nil } - // 3. 递增重试计数并判断是否达到最大重试次数。 nextRetryCount := msg.RetryCount + 1 now := time.Now() status := model.OutboxStatusPending var nextRetryAt *time.Time if nextRetryCount >= msg.MaxRetry { - // 3.1 达到上限:转 dead,停止后续扫描重试。 status = model.OutboxStatusDead nextRetryAt = nil } else { - // 3.2 未到上限:按指数退避计算下一次可重试时间。 t := now.Add(calcRetryBackoff(nextRetryCount)) nextRetryAt = &t } - // 4. 写回失败原因与状态快照,便于排查问题。 lastErr := truncateError(reason) updates := map[string]interface{}{ "status": status, @@ -187,66 +163,34 @@ func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason st }) } -// PersistChatHistoryAndMarkConsumed 负责“消费成功后落业务库 + 标记 outbox consumed”。 -// 之所以必须放在同一个事务里,是为了保证“业务落库”和“状态推进”原子一致: -// - 若业务写入失败,不应把 outbox 标记为 consumed; -// - 若标记 consumed 失败,也应回滚业务写入,避免出现不可追踪的不一致。 -func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { +// ConsumeAndMarkConsumed 是通用“消费成功事务入口”。 +// +// 步骤: +// 1. 事务内锁定 outbox 记录; +// 2. 已 consumed/dead 时幂等返回; +// 3. 执行业务回调 fn(tx); +// 4. 业务成功后统一标记 consumed。 +func (d *Repository) ConsumeAndMarkConsumed(ctx context.Context, outboxID int64, fn func(tx *gorm.DB) error) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - // 1. 先锁定 outbox 记录,确保同一条消息不会被并发消费者重复推进状态。 var outboxMsg model.AgentOutboxMessage err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - // 1.1 幂等兜底:记录不存在时视为“无事可做”。 return nil } return err } - // 1.2 若已 consumed/dead,说明已被处理过或已终止,直接幂等返回。 - if outboxMsg.Status == model.OutboxStatusConsumed { - return nil - } - if outboxMsg.Status == model.OutboxStatusDead { + if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { return nil } - // 2. 写入聊天历史业务表(chat_histories)。 - // 这里不包含 token 统计等扩展字段,只负责核心消息落库。 - chatMsg := payload.Message - chatRole := payload.Role - history := model.ChatHistory{ - UserID: payload.UserID, - ChatID: payload.ConversationID, - MessageContent: &chatMsg, - Role: &chatRole, - } - if err = tx.Create(&history).Error; err != nil { - return err + if fn != nil { + if err = fn(tx); err != nil { + return err + } } - // 3. 同一事务内原子更新会话统计信息: - // - message_count + 1 - // - last_message_at = now - // 这样可以保证 message_count 与 chat_histories 的真实落库条数一致。 now := time.Now() - chatUpdates := map[string]interface{}{ - "message_count": gorm.Expr("message_count + ?", 1), - "last_message_at": &now, - } - chatResult := tx.Model(&model.AgentChat{}). - Where("user_id = ? AND chat_id = ?", payload.UserID, payload.ConversationID). - Updates(chatUpdates) - if chatResult.Error != nil { - return chatResult.Error - } - if chatResult.RowsAffected == 0 { - // 会话不存在时回滚,让 outbox 继续重试/告警,而不是吞掉不一致。 - return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", payload.UserID, payload.ConversationID) - } - - // 4. 业务写入成功后,把 outbox 推进到 consumed 最终态。 - // 并清理错误与重试字段,表示该消息生命周期结束。 updates := map[string]interface{}{ "status": model.OutboxStatusConsumed, "consumed_at": &now, @@ -258,8 +202,6 @@ func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outb }) } -// calcRetryBackoff 计算指数退避时间。 -// 规则:1s, 2s, 4s, 8s, 16s, 32s(最多封顶到第 6 档)。 func calcRetryBackoff(retryCount int) time.Duration { if retryCount <= 0 { return time.Second @@ -270,7 +212,6 @@ func calcRetryBackoff(retryCount int) time.Duration { return time.Second * time.Duration(1<<(retryCount-1)) } -// truncateError 限制错误文本最大长度,防止写库失败或日志污染。 func truncateError(reason string) string { if len(reason) <= 2000 { return reason diff --git a/backend/model/agent.go b/backend/model/agent.go index 9787332..234c139 100644 --- a/backend/model/agent.go +++ b/backend/model/agent.go @@ -9,6 +9,19 @@ type UserSendMessageRequest struct { Thinking bool `json:"thinking,omitempty"` } +// ChatHistoryPersistPayload 是“聊天消息持久化请求”业务 DTO。 +// +// 职责边界: +// 1. 只描述聊天业务需要落库的核心字段; +// 2. 可被同步直写路径与异步事件路径复用; +// 3. 不包含 outbox/kafka 协议字段(这些字段由 infra 层统一封装)。 +type ChatHistoryPersistPayload struct { + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id"` + Role string `json:"role"` + Message string `json:"message"` +} + // GetConversationMetaResponse 是会话元信息查询接口的返回结构。 // 说明: // 1) title 可能为空字符串(表示标题尚未生成); diff --git a/backend/model/outbox.go b/backend/model/outbox.go index c473dce..477311d 100644 --- a/backend/model/outbox.go +++ b/backend/model/outbox.go @@ -3,40 +3,36 @@ package model import "time" const ( - // OutboxStatusPending 表示消息已落 outbox,等待投递或等待下次重试窗口到达。 + // OutboxStatusPending 表示消息已写入 outbox,等待投递或重试窗口到达。 OutboxStatusPending = "pending" - // OutboxStatusPublished 表示消息已成功写入 Kafka,但尚未完成业务消费。 + // OutboxStatusPublished 表示消息已成功写入 Kafka,但业务消费尚未完成。 OutboxStatusPublished = "published" - // OutboxStatusConsumed 表示消息对应的业务逻辑已成功执行(本项目中即聊天记录已落库)。 + // OutboxStatusConsumed 表示消息对应业务处理已成功完成(最终态)。 OutboxStatusConsumed = "consumed" - // OutboxStatusDead 表示达到最大重试次数或出现不可恢复错误,进入死信终态。 + // OutboxStatusDead 表示达到重试上限或出现不可恢复错误(最终态)。 OutboxStatusDead = "dead" - - // OutboxBizTypeChatHistoryPersist 当前唯一业务类型:聊天记录异步持久化。 - OutboxBizTypeChatHistoryPersist = "chat_history_persist" ) -// AgentOutboxMessage 是 outbox 模式的核心表结构: -// 1. 先写本地数据库(保证事务内可见); -// 2. 再由后台扫描并投递 Kafka; -// 3. 由消费者完成最终业务落库并回写状态。 +// AgentOutboxMessage 是 outbox 状态机表模型。 +// +// 关键说明: +// 1. EventType 映射到数据库 `biz_type` 列(为兼容历史表结构,不改 DDL); +// 2. Payload 保存统一事件外壳 JSON; +// 3. Status/RetryCount/NextRetryAt 组成重试状态机。 type AgentOutboxMessage struct { ID int64 `gorm:"column:id;primaryKey;autoIncrement"` - // BizType 决定消费者侧如何解释 Payload。 - BizType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:业务类型"` - // Topic/MessageKey 用于 Kafka 路由与分区稳定性。 + + EventType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:事件类型"` Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` - // Payload 存储业务 JSON,消费时再反序列化为具体 payload 结构。 - Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` - // Status + NextRetryAt + RetryCount 共同描述“是否可被调度重试”。 + Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` + Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"` RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"` MaxRetry int `gorm:"column:max_retry;not null;default:20;comment:最大重试次数"` NextRetryAt *time.Time `gorm:"column:next_retry_at;index:idx_outbox_status_next,priority:2;comment:下次重试时间"` - // LastError 记录最近一次失败原因,便于排障和可观测。 - LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"` - // PublishedAt/ConsumedAt 便于统计“投递延迟”和“消费完成耗时”。 + LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"` + PublishedAt *time.Time `gorm:"column:published_at;comment:投递到 Kafka 时间"` ConsumedAt *time.Time `gorm:"column:consumed_at;comment:消费完成时间"` CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"` @@ -46,12 +42,3 @@ type AgentOutboxMessage struct { func (AgentOutboxMessage) TableName() string { return "agent_outbox_messages" } - -// ChatHistoryPersistPayload 是“聊天记录持久化”消息的业务载荷。 -// 注意:该载荷既会被写入 outbox,也会被封装到 Kafka Envelope 中传输。 -type ChatHistoryPersistPayload struct { - UserID int `json:"user_id"` - ConversationID string `json:"conversation_id"` - Role string `json:"role"` - Message string `json:"message"` -} diff --git a/backend/service/agent_bridge.go b/backend/service/agent_bridge.go index e0240f9..bdbff6a 100644 --- a/backend/service/agent_bridge.go +++ b/backend/service/agent_bridge.go @@ -17,6 +17,6 @@ type AgentService = agentsvc.AgentService // 说明: // 1) 外部调用签名保持不变; // 2) 真实构造逻辑已下沉到 service/agentsvc 包。 -func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, asyncPipeline *outboxinfra.ChatHistoryAsync) *AgentService { - return agentsvc.NewAgentService(aiHub, repo, taskRepo, agentRedis, asyncPipeline) +func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService { + return agentsvc.NewAgentService(aiHub, repo, taskRepo, agentRedis, eventPublisher) } diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 881a858..aea792e 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -13,29 +13,30 @@ import ( "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/pkg" + eventsvc "github.com/LoveLosita/smartflow/backend/service/events" "github.com/cloudwego/eino-ext/components/model/ark" "github.com/cloudwego/eino/schema" "github.com/google/uuid" ) type AgentService struct { - AIHub *inits.AIHub - repo *dao.AgentDAO - taskRepo *dao.TaskDAO - agentCache *dao.AgentCache - asyncPipeline *outboxinfra.ChatHistoryAsync + AIHub *inits.AIHub + repo *dao.AgentDAO + taskRepo *dao.TaskDAO + agentCache *dao.AgentCache + eventPublisher outboxinfra.EventPublisher } // NewAgentService 构造 AgentService。 // 这里通过依赖注入把“模型、仓储、缓存、异步持久化通道”统一交给服务层管理, // 便于后续在单测中替换实现,或在启动流程中按环境切换配置。 -func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, asyncPipeline *outboxinfra.ChatHistoryAsync) *AgentService { +func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, eventPublisher outboxinfra.EventPublisher) *AgentService { return &AgentService{ - AIHub: aiHub, - repo: repo, - taskRepo: taskRepo, - agentCache: agentRedis, - asyncPipeline: asyncPipeline, + AIHub: aiHub, + repo: repo, + taskRepo: taskRepo, + agentCache: agentRedis, + eventPublisher: eventPublisher, } } @@ -63,17 +64,28 @@ func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, strin return s.AIHub.Worker, "worker" } -// saveChatHistoryReliable 统一封装“聊天记录持久化入口”: -// 1) 开启异步链路时,走 outbox + Kafka; -// 2) 未开启时,直接同步写库。 -func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { - // 1. 未注入异步通道时(例如本地极简环境),直接同步写 DB。 +// PersistChatHistory 是 Agent 聊天链路唯一的“消息持久化入口”。 +// +// 职责边界: +// 1. 负责根据当前部署模式选择“异步 outbox”或“同步直写 DB”; +// 2. 负责把统一 DTO(ChatHistoryPersistPayload)交给下游基础设施; +// 3. 不负责 Redis 上下文写入(Redis 由调用方在链路中先行处理); +// 4. 不负责消费完成回调(异步模式下由 outbox 消费者负责最终落库)。 +func (s *AgentService) PersistChatHistory(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + // 1. 未注入事件发布器时(例如本地极简环境),直接同步写 DB。 // 这样可以保证功能不依赖 Kafka 也能跑通。 - if s.asyncPipeline == nil { + if s.eventPublisher == nil { return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message) } - // 2. 已启用异步通道时,只入 outbox,不在请求路径阻塞 Kafka。 - return s.asyncPipeline.EnqueueChatHistoryPersist(ctx, payload) + // 2. 已启用异步总线时,只发布“持久化请求事件”,不在请求路径阻塞 Kafka。 + // 2.1 发布成功仅代表“事件安全入队”,实际落库由消费者异步完成。 + return eventsvc.PublishChatHistoryPersistRequested(ctx, s.eventPublisher, payload) +} + +// saveChatHistoryReliable 是历史兼容别名。 +// 迁移策略:先保留旧方法名,避免同轮改动跨文件过大;后续可统一替换为 PersistChatHistory。 +func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + return s.PersistChatHistory(ctx, payload) } // pushErrNonBlocking 向错误通道“尽力投递”错误。 @@ -167,7 +179,7 @@ func (s *AgentService) runNormalChatFlow( log.Printf("写入用户消息到 Redis 失败: %v", err) } - if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ + if err = s.PersistChatHistory(ctx, model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, Role: "user", @@ -186,7 +198,7 @@ func (s *AgentService) runNormalChatFlow( log.Printf("写入助手消息到 Redis 失败: %v", err) } - if saveErr := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{ + if saveErr := s.PersistChatHistory(context.Background(), model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, Role: "assistant", diff --git a/backend/service/agentsvc/agent_quick_note.go b/backend/service/agentsvc/agent_quick_note.go index 2d257c7..516d7b6 100644 --- a/backend/service/agentsvc/agent_quick_note.go +++ b/backend/service/agentsvc/agent_quick_note.go @@ -351,7 +351,7 @@ func (s *AgentService) persistChatAfterReply( } // 2. 再把用户消息写入可靠持久化通道(outbox 或同步 DB)。 - if err := s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ + if err := s.PersistChatHistory(ctx, model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, Role: "user", @@ -367,7 +367,7 @@ func (s *AgentService) persistChatAfterReply( } // 4. 助手消息持久化失败不阻断主流程,通过 errChan 异步上报。 - if err := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{ + if err := s.PersistChatHistory(context.Background(), model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, Role: "assistant", diff --git a/backend/service/events/chat_history_persist.go b/backend/service/events/chat_history_persist.go new file mode 100644 index 0000000..60a0ac2 --- /dev/null +++ b/backend/service/events/chat_history_persist.go @@ -0,0 +1,104 @@ +package events + +import ( + "context" + "encoding/json" + "errors" + + "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" +) + +const ( + // EventTypeChatHistoryPersistRequested 是“聊天消息持久化请求”的业务事件类型。 + // + // 命名策略: + // 1. 只描述业务语义,不包含 outbox/kafka 等实现词; + // 2. 作为新路由键长期保留,后续协议变化优先走 event_version; + // 3. 旧路由键仅作兼容,不再作为新发布默认值。 + EventTypeChatHistoryPersistRequested = "chat.history.persist.requested" +) + +// RegisterChatHistoryPersistHandler 注册“聊天消息持久化”消费者处理器。 +// +// 职责边界: +// 1. 只负责聊天事件,不处理其他业务事件; +// 2. 只负责注册,不负责总线启停; +// 3. 通过 outbox 通用事务入口把“业务写入 + consumed 推进”合并为一个事务; +// 4. 当前版本仅注册新路由键(chat.history.persist.requested),不再注册旧兼容键。 +func RegisterChatHistoryPersistHandler( + bus *outboxinfra.EventBus, + outboxRepo *outboxinfra.Repository, + repoManager *dao.RepoManager, +) error { + // 1. 依赖校验:任何一个关键依赖为空都无法安全处理消息。 + if bus == nil { + return errors.New("event bus is nil") + } + if outboxRepo == nil { + return errors.New("outbox repository is nil") + } + if repoManager == nil { + return errors.New("repo manager is nil") + } + + // 2. 定义统一处理器: + // 2.1 解析 payload; + // 2.2 调用 outbox 通用消费事务; + // 2.3 在事务回调中复用 RepoManager.WithTx 执行业务 DAO 写入。 + handler := func(ctx context.Context, envelope kafkabus.Envelope) error { + var payload model.ChatHistoryPersistPayload + if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { + // 2.1 payload 非法属于不可恢复错误,直接标 dead,避免无意义重试。 + _ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error()) + return nil + } + + // 2.2 使用 outbox 通用消费事务,保证“业务写入 + consumed 状态推进”原子一致。 + return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + // 2.2.1 基于同一个 tx 构造 RepoManager,复用你现有跨包事务模型。 + txM := repoManager.WithTx(tx) + // 2.2.2 在同事务内写入聊天历史与会话计数。 + return txM.Agent.SaveChatHistoryInTx( + ctx, + payload.UserID, + payload.ConversationID, + payload.Role, + payload.Message, + ) + }) + } + + // 3. 注册新路由键(主路由)。 + if err := bus.RegisterEventHandler(EventTypeChatHistoryPersistRequested, handler); err != nil { + return err + } + + return nil +} + +// PublishChatHistoryPersistRequested 发布“聊天消息持久化请求”事件。 +// +// 设计目的: +// 1. 让业务层只传 DTO,不重复拼事件元数据; +// 2. 统一消息键策略(conversation_id 作为 MessageKey/AggregateID); +// 3. 发布失败时显式返回 error,由调用方决定是否降级到同步写库。 +func PublishChatHistoryPersistRequested( + ctx context.Context, + publisher outboxinfra.EventPublisher, + payload model.ChatHistoryPersistPayload, +) error { + if publisher == nil { + return errors.New("event publisher is nil") + } + return publisher.Publish(ctx, outboxinfra.PublishRequest{ + EventType: EventTypeChatHistoryPersistRequested, + EventVersion: outboxinfra.DefaultEventVersion, + MessageKey: payload.ConversationID, + AggregateID: payload.ConversationID, + Payload: payload, + }) +} diff --git a/docs/功能决策记录/Outbox_Kafka_异步持久化决策记录.md b/docs/功能决策记录/Outbox_Kafka_异步持久化决策记录.md new file mode 100644 index 0000000..fbbb112 --- /dev/null +++ b/docs/功能决策记录/Outbox_Kafka_异步持久化决策记录.md @@ -0,0 +1,165 @@ +# 功能决策记录(FDR):Outbox + Kafka 异步持久化链路 + +## 1. 基本信息 +- 记录编号:FDR-2026-03-OUTBOX-KAFKA +- 功能名称:聊天消息异步可靠持久化(Outbox + Kafka) +- 记录日期:2026-03-16 +- 决策状态:方案A保留(历史阶段),方案B生效(当前阶段) +- 负责人:项目协作实现(你 + Codex) +- 关联模块: + - `backend/service/agentsvc` + - `backend/service/events` + - `backend/infra/outbox` + - `backend/infra/kafka` + - `backend/dao/agent.go` + - `backend/cmd/start.go` + +## 2. 背景与问题 +- 业务背景:`/agent/chat` 是流式接口,用户对首字延迟敏感;但聊天记录又必须可靠落库,不能因为 Kafka 瞬时抖动而丢失。 +- 决策前的核心矛盾: + 1. 直接同步写 Kafka,会把 broker 网络抖动算进请求耗时。 + 2. 只发 Kafka 不落本地,会出现“发送失败即丢消息”的可靠性缺口。 + 3. 业务代码直接依赖 Kafka 细节,复用性和可维护性差。 + +## 3. 决策目标 +1. 请求主链路只承担“本地可控成本”,不阻塞外部消息系统。 +2. 持久化链路具备明确状态机(`pending/published/consumed/dead`)与可重试能力。 +3. 业务层调用尽量简化为“发布业务事件”,不耦合 outbox/kafka 细节。 +4. 为后续新增事件类型(不仅聊天)保留扩展空间。 + +## 4. 方案对比 + +### 4.1 方案A(历史方案,保留记录) +Outbox 入库 + 扫描投 Kafka + 消费落库,且包含“历史兼容逻辑”。 + +- 历史实现特征: + 1. 业务入口存在聊天语义适配层(如早期 `chat_history_async` 风格的适配)。 + 2. 消费侧路由曾兼容旧字段与旧格式(包括 `biz_type` 兼容路径)。 + 3. 代码可运行,但“基础设施层”仍残留业务痕迹。 +- 优点: + 1. 可靠性目标已达成,消息不会因 Kafka 瞬时故障直接丢失。 + 2. 首字延迟不再直接绑定 Kafka 可用性。 +- 缺点: + 1. 兼容分支较多,可读性与长期维护成本偏高。 + 2. 不利于沉淀成“通用事件总线”,扩展新业务事件时心智负担大。 + +### 4.2 方案B(当前生效方案) +把 Outbox + Kafka 沉淀为通用事件总线,仅保留 `event_type` 统一协议,不再保留旧格式回退。 + +- 当前实现特征: + 1. 业务层通过 `EventPublisher.Publish(...)` 发布事件,不直接操作 Kafka。 + 2. outbox payload 统一为事件外壳:`event_id/event_type/event_version/aggregate_id/payload`。 + 3. 消费侧仅按 `event_type` 路由 handler,不再做旧协议回退。 + 4. 业务 handler 下沉到 `backend/service/events/*`,infra 只负责总线能力与状态机推进。 +- 优点: + 1. 解耦更彻底:基础设施和业务语义边界清晰。 + 2. 新增业务事件只需“定义事件 + 注册 handler”,复用链路成本低。 + 3. 可观测性更统一,排障路径更短。 +- 代价: + 1. 对历史旧格式消息不再兼容,切换前需确认无旧积压。 + 2. 需要团队统一遵守 `event_type` 协议规范。 + +## 5. 最终决策 +- 采用结果:方案B(当前生效)。 +- 保留方案A文档:用于复盘演进路径与面试叙述(从可用到可扩展的重构过程)。 +- 关键判断依据: + 1. 方案B在不牺牲可靠性的前提下,显著提升架构清晰度。 + 2. 方案B更符合“通用总线”目标,便于后续挂载标题生成、token统计、更多 agent 事件。 + +## 6. 当前实现细节(方案B) + +### 6.1 分层与职责 +1. 通用事件总线门面:`backend/infra/outbox/event_bus.go` +2. Outbox 核心引擎(扫描/投递/消费/路由):`backend/infra/outbox/engine.go` +3. Outbox 状态仓储(状态流转 + 通用消费事务):`backend/infra/outbox/repository.go` +4. 统一事件协议解析:`backend/infra/outbox/event_contract.go` +5. Kafka 协议包装:`backend/infra/kafka/envelope.go` +6. 业务事件注册与发布(聊天持久化):`backend/service/events/chat_history_persist.go` +7. 启动接线:`backend/cmd/start.go` + +### 6.2 现行主链路(event_type-only) +```mermaid +flowchart TD + A["AgentService.saveChatHistoryReliable"] --> B{"异步总线是否启用"} + B -- 否 --> C["同步写库 dao.SaveChatHistory"] + B -- 是 --> D["PublishChatHistoryPersistRequested"] + D --> E["EventBus.Publish -> outbox.CreateMessage(status=pending)"] + E --> F["dispatch loop 扫描 due outbox"] + F --> G["producer.Enqueue -> Kafka"] + G --> H["outbox.MarkPublished"] + H --> I["consume loop 拉取 Kafka 消息"] + I --> J["按 event_type 分发 handler"] + J --> K["ConsumeAndMarkConsumed(事务)"] + K --> L["dao.SaveChatHistoryInTx 写 chat_histories"] + K --> M["更新 agent_chats.message_count/last_message_at"] + K --> N["outbox.status=consumed"] +``` + +### 6.3 状态机口径 +- `pending`:已入 outbox,待投递。 +- `published`:已投递 Kafka,待消费处理。 +- `consumed`:业务处理成功并完成状态推进(最终态)。 +- `dead`:达到重试上限或不可恢复错误(最终态)。 + +状态流转: +- 正常:`pending -> published -> consumed` +- 可重试失败:`pending/published -> pending(next_retry_at++)` +- 不可恢复/超限:`pending/published -> dead` + +### 6.4 当前关键策略 +1. 请求链路只写 outbox,不在主链路做 Kafka 网络 IO。 +2. 消费侧统一通过 `ConsumeAndMarkConsumed` 做“业务写入 + consumed 推进”同事务。 +3. `event_type` 缺失、payload 非法、未知事件类型等,按不可恢复错误处理并标 `dead`。 +4. 重试采用指数退避,上限由配置控制(默认 `max_retry=20`)。 + +## 7. 与方案A差异清单(本次更新重点) +1. 删除聊天专用 outbox 适配层文件,改为通用事件发布/注册方式。 +2. 路由统一收敛为 `event_type`,不再依赖旧 `biz_type` 兼容分发语义。 +3. payload 解析仅接受统一事件外壳,不再保留旧业务 JSON 回退逻辑。 +4. 业务消费处理器迁移到 `backend/service/events`,infra 不再承载聊天业务语义。 + +## 8. 影响范围 +1. 代码层: + - `backend/infra/outbox/*` + - `backend/infra/kafka/*` + - `backend/service/events/*` + - `backend/service/agentsvc/agent.go` + - `backend/cmd/start.go` +2. 数据层: + - 仍使用 `agent_outbox_messages` 作为状态机表(表结构不变)。 + - 逻辑字段改为 `EventType`(映射数据库列 `biz_type`,仅为兼容历史表结构命名)。 +3. 接口层: + - `/api/v1/agent/chat` 对外协议不变。 + +## 9. 风险与应对 +### 风险1:切换时存在历史旧格式积压消息 +- 现象:旧格式消息可能被判定为不可解析并进入 `dead`。 +- 应对: + 1. 切换前确认 outbox 积压清零。 + 2. 若必须保留旧消息,先做一次人工迁移/回放再切流。 + +### 风险2:事件类型规范失控(命名冲突/语义漂移) +- 应对: + 1. 统一事件命名规范:`domain.resource.action`。 + 2. 通过 `event_version` 控制未来演进,避免“同名不同义”。 + +### 风险3:消费者停摆导致积压 +- 应对: + 1. 启动日志打印核心配置并可观测状态。 + 2. 监控 `pending/published/dead` 数量与 `next_retry_at` 老化。 + +## 10. 验证与回滚 +### 10.1 验证项 +1. 端到端:确认 outbox 记录从 `pending -> published -> consumed`。 +2. 异常注入:Kafka 暂停后消息保留在 outbox 并按退避重试。 +3. 恢复验证:Kafka 恢复后积压可继续消费并落库。 +4. 一致性:`chat_histories` 与 `agent_chats.message_count` 语义一致。 + +### 10.2 回滚策略 +1. 配置 `kafka.enabled=false`,服务自动降级到同步直写路径。 +2. outbox 表数据保留,可在恢复后继续处理或人工回放。 + +## 11. 后续计划 +1. 在总线层补结构化指标(积压量、重试分布、死信速率)。 +2. 增加 dead-letter 管理工具(筛选、重放、归档)。 +3. 持续扩展事件类型,把标题生成、统计类异步任务挂到同一总线。