Files
smartmate/docs/功能决策记录/Outbox_Kafka_异步持久化决策记录.md
Losita 626fc700d2 Version: 0.6.1.dev.260316
♻️ refactor(outbox): 抽离通用事件总线,并完成 event_type-only 收口

-  新增 `infra` 层通用 `EventBus` / `EventContract`,统一事件发布与消费协议
- 🔄 将聊天持久化链路调整为通过 `service/events` 注册 handler 并发布事件,进一步解耦业务逻辑与异步处理流程
- 🧹 移除 `chat_history_async` 旧适配实现,以及基于 `biz_type` 的兼容分发逻辑
- 📝 更新 Outbox 异步持久化决策记录,明确保留方案 A,并正式启用方案 B
- 📚 同步更新 README 中关于 Outbox + Kafka 可靠异步链路的说明
- 🚚 当前 `outbox + kafka` 已与项目业务链路完全解耦,沉淀为通用、可靠性更强的消息队列能力;后续将参考消息队列的典型使用方式,逐步扩展到更多业务场景
-  补充跨不同分类事务管理器中的 `agent dao` 注册与接入支持
2026-03-16 13:00:26 +08:00

7.8 KiB
Raw Permalink Blame History

功能决策记录FDROutbox + Kafka 异步持久化链路

1. 基本信息

  • 记录编号FDR-2026-03-OUTBOX-KAFKA
  • 功能名称聊天消息异步可靠持久化Outbox + Kafka
  • 记录日期2026-03-16
  • 决策状态方案A保留历史阶段方案B生效当前阶段
  • 负责人:项目协作实现(你 + Codex
  • 关联模块:
    • backend/service/agentsvc
    • backend/service/events
    • backend/infra/outbox
    • backend/infra/kafka
    • backend/dao/agent.go
    • backend/cmd/start.go

2. 背景与问题

  • 业务背景:/agent/chat 是流式接口,用户对首字延迟敏感;但聊天记录又必须可靠落库,不能因为 Kafka 瞬时抖动而丢失。
  • 决策前的核心矛盾:
    1. 直接同步写 Kafka会把 broker 网络抖动算进请求耗时。
    2. 只发 Kafka 不落本地,会出现“发送失败即丢消息”的可靠性缺口。
    3. 业务代码直接依赖 Kafka 细节,复用性和可维护性差。

3. 决策目标

  1. 请求主链路只承担“本地可控成本”,不阻塞外部消息系统。
  2. 持久化链路具备明确状态机(pending/published/consumed/dead)与可重试能力。
  3. 业务层调用尽量简化为“发布业务事件”,不耦合 outbox/kafka 细节。
  4. 为后续新增事件类型(不仅聊天)保留扩展空间。

4. 方案对比

4.1 方案A历史方案保留记录

Outbox 入库 + 扫描投 Kafka + 消费落库,且包含“历史兼容逻辑”。

  • 历史实现特征:
    1. 业务入口存在聊天语义适配层(如早期 chat_history_async 风格的适配)。
    2. 消费侧路由曾兼容旧字段与旧格式(包括 biz_type 兼容路径)。
    3. 代码可运行,但“基础设施层”仍残留业务痕迹。
  • 优点:
    1. 可靠性目标已达成,消息不会因 Kafka 瞬时故障直接丢失。
    2. 首字延迟不再直接绑定 Kafka 可用性。
  • 缺点:
    1. 兼容分支较多,可读性与长期维护成本偏高。
    2. 不利于沉淀成“通用事件总线”,扩展新业务事件时心智负担大。

4.2 方案B当前生效方案

把 Outbox + Kafka 沉淀为通用事件总线,仅保留 event_type 统一协议,不再保留旧格式回退。

  • 当前实现特征:
    1. 业务层通过 EventPublisher.Publish(...) 发布事件,不直接操作 Kafka。
    2. outbox payload 统一为事件外壳:event_id/event_type/event_version/aggregate_id/payload
    3. 消费侧仅按 event_type 路由 handler不再做旧协议回退。
    4. 业务 handler 下沉到 backend/service/events/*infra 只负责总线能力与状态机推进。
  • 优点:
    1. 解耦更彻底:基础设施和业务语义边界清晰。
    2. 新增业务事件只需“定义事件 + 注册 handler”复用链路成本低。
    3. 可观测性更统一,排障路径更短。
  • 代价:
    1. 对历史旧格式消息不再兼容,切换前需确认无旧积压。
    2. 需要团队统一遵守 event_type 协议规范。

5. 最终决策

  • 采用结果方案B当前生效
  • 保留方案A文档用于复盘演进路径与面试叙述从可用到可扩展的重构过程
  • 关键判断依据:
    1. 方案B在不牺牲可靠性的前提下显著提升架构清晰度。
    2. 方案B更符合“通用总线”目标便于后续挂载标题生成、token统计、更多 agent 事件。

6. 当前实现细节方案B

6.1 分层与职责

  1. 通用事件总线门面:backend/infra/outbox/event_bus.go
  2. Outbox 核心引擎(扫描/投递/消费/路由):backend/infra/outbox/engine.go
  3. Outbox 状态仓储(状态流转 + 通用消费事务):backend/infra/outbox/repository.go
  4. 统一事件协议解析:backend/infra/outbox/event_contract.go
  5. Kafka 协议包装:backend/infra/kafka/envelope.go
  6. 业务事件注册与发布(聊天持久化):backend/service/events/chat_history_persist.go
  7. 启动接线:backend/cmd/start.go

6.2 现行主链路event_type-only

flowchart TD
    A["AgentService.saveChatHistoryReliable"] --> B{"异步总线是否启用"}
    B -- 否 --> C["同步写库 dao.SaveChatHistory"]
    B -- 是 --> D["PublishChatHistoryPersistRequested"]
    D --> E["EventBus.Publish -> outbox.CreateMessage(status=pending)"]
    E --> F["dispatch loop 扫描 due outbox"]
    F --> G["producer.Enqueue -> Kafka"]
    G --> H["outbox.MarkPublished"]
    H --> I["consume loop 拉取 Kafka 消息"]
    I --> J["按 event_type 分发 handler"]
    J --> K["ConsumeAndMarkConsumed(事务)"]
    K --> L["dao.SaveChatHistoryInTx 写 chat_histories"]
    K --> M["更新 agent_chats.message_count/last_message_at"]
    K --> N["outbox.status=consumed"]

6.3 状态机口径

  • pending:已入 outbox待投递。
  • published:已投递 Kafka待消费处理。
  • consumed:业务处理成功并完成状态推进(最终态)。
  • dead:达到重试上限或不可恢复错误(最终态)。

状态流转:

  • 正常:pending -> published -> consumed
  • 可重试失败:pending/published -> pending(next_retry_at++)
  • 不可恢复/超限:pending/published -> dead

6.4 当前关键策略

  1. 请求链路只写 outbox不在主链路做 Kafka 网络 IO。
  2. 消费侧统一通过 ConsumeAndMarkConsumed 做“业务写入 + consumed 推进”同事务。
  3. event_type 缺失、payload 非法、未知事件类型等,按不可恢复错误处理并标 dead
  4. 重试采用指数退避,上限由配置控制(默认 max_retry=20)。

7. 与方案A差异清单本次更新重点

  1. 删除聊天专用 outbox 适配层文件,改为通用事件发布/注册方式。
  2. 路由统一收敛为 event_type,不再依赖旧 biz_type 兼容分发语义。
  3. payload 解析仅接受统一事件外壳,不再保留旧业务 JSON 回退逻辑。
  4. 业务消费处理器迁移到 backend/service/eventsinfra 不再承载聊天业务语义。

8. 影响范围

  1. 代码层:
    • backend/infra/outbox/*
    • backend/infra/kafka/*
    • backend/service/events/*
    • backend/service/agentsvc/agent.go
    • backend/cmd/start.go
  2. 数据层:
    • 仍使用 agent_outbox_messages 作为状态机表(表结构不变)。
    • 逻辑字段改为 EventType(映射数据库列 biz_type,仅为兼容历史表结构命名)。
  3. 接口层:
    • /api/v1/agent/chat 对外协议不变。

9. 风险与应对

风险1切换时存在历史旧格式积压消息

  • 现象:旧格式消息可能被判定为不可解析并进入 dead
  • 应对:
    1. 切换前确认 outbox 积压清零。
    2. 若必须保留旧消息,先做一次人工迁移/回放再切流。

风险2事件类型规范失控命名冲突/语义漂移)

  • 应对:
    1. 统一事件命名规范:domain.resource.action
    2. 通过 event_version 控制未来演进,避免“同名不同义”。

风险3消费者停摆导致积压

  • 应对:
    1. 启动日志打印核心配置并可观测状态。
    2. 监控 pending/published/dead 数量与 next_retry_at 老化。

10. 验证与回滚

10.1 验证项

  1. 端到端:确认 outbox 记录从 pending -> published -> consumed
  2. 异常注入Kafka 暂停后消息保留在 outbox 并按退避重试。
  3. 恢复验证Kafka 恢复后积压可继续消费并落库。
  4. 一致性:chat_historiesagent_chats.message_count 语义一致。

10.2 回滚策略

  1. 配置 kafka.enabled=false,服务自动降级到同步直写路径。
  2. outbox 表数据保留,可在恢复后继续处理或人工回放。

11. 后续计划

  1. 在总线层补结构化指标(积压量、重试分布、死信速率)。
  2. 增加 dead-letter 管理工具(筛选、重放、归档)。
  3. 持续扩展事件类型,把标题生成、统计类异步任务挂到同一总线。