Version: 0.5.0.dev.260310

refactor: ♻️ 调整 Outbox 写入时序并移除 Kafka 首包同步投递逻辑

* 将 `outbox` 表写入逻辑后置到 LLM 请求之后,减少主链路阻塞
* 删除 Codex 生成的 Kafka 首包同步投递抽象逻辑,简化消息发送流程
* 优化 SSE 首字到达时间,整体降低约 1s 延迟
* 当前在请求 LLM 之前的流程全部为 Redis 操作,显著降低 IO 开销

docs: 📊 保留 SSE 链路性能打点逻辑

* 保留原有 SSE 全链路打点计时代码,便于后续性能排查与分析
* 当前默认注释,如需使用可手动启用进行性能调试
This commit is contained in:
Losita
2026-03-10 23:10:09 +08:00
parent 959049db42
commit 912a6d8cfe
6 changed files with 138 additions and 123 deletions

View File

@@ -13,7 +13,7 @@ import (
arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model"
)
// StreamResponse OpenAI/DeepSeek 兼容的流式 chunk 结构。
// StreamResponse OpenAI/DeepSeek 兼容的流式 chunk 结构。
type StreamResponse struct {
ID string `json:"id"`
Object string `json:"object"`
@@ -88,8 +88,24 @@ func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, e
return string(jsonBytes), nil
}
func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) {
// 1) 组装提示消息
// StreamChat 负责模型流式输出,并在关键节点打点:
// 1) 流连接建立llm.Stream 返回)
// 2) 首包到达(首字延迟)
// 3) 流式输出结束
func StreamChat(
ctx context.Context,
llm *ark.ChatModel,
modelName string,
userInput string,
ifThinking bool,
chatHistory []*schema.Message,
outChan chan<- string,
traceID string,
chatID string,
requestStart time.Time,
) (string, error) {
/*callStart := time.Now()*/
messages := make([]*schema.Message, 0)
messages = append(messages, schema.SystemMessage(SystemPrompt))
if len(chatHistory) > 0 {
@@ -97,13 +113,14 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI
}
messages = append(messages, schema.UserMessage(userInput))
// 2) 发起流式请求
var thinking *ark.Thinking
if ifThinking {
thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled}
} else {
thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled}
}
/*connectStart := time.Now()*/
reader, err := llm.Stream(ctx, messages, ark.WithThinking(thinking))
if err != nil {
return "", err
@@ -116,8 +133,18 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI
requestID := "chatcmpl-" + uuid.NewString()
created := time.Now().Unix()
firstChunk := true
chunkCount := 0
/*streamRecvStart := time.Now()
log.Printf("打点|流连接建立|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d|history_len=%d",
traceID,
chatID,
requestID,
time.Since(connectStart).Milliseconds(),
time.Since(requestStart).Milliseconds(),
len(chatHistory),
)*/
// 3) 持续转发 chunk
var fullText strings.Builder
for {
chunk, err := reader.Recv()
@@ -136,11 +163,20 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI
}
if payload != "" {
outChan <- payload
firstChunk = false
chunkCount++
/*if firstChunk {
log.Printf("打点|首包到达|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID,
chatID,
requestID,
time.Since(streamRecvStart).Milliseconds(),
time.Since(requestStart).Milliseconds(),
)
firstChunk = false
}*/
}
}
// 4) 发送结束 chunk 和 [DONE]
finishChunk, err := ToOpenAIFinishStream(requestID, modelName, created)
if err != nil {
return "", err
@@ -148,5 +184,15 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI
outChan <- finishChunk
outChan <- "[DONE]"
/*log.Printf("打点|流式输出结束|trace_id=%s|chat_id=%s|request_id=%s|chunks=%d|reply_chars=%d|本步耗时_ms=%d|请求累计_ms=%d",
traceID,
chatID,
requestID,
chunkCount,
len(fullText.String()),
time.Since(callStart).Milliseconds(),
time.Since(requestStart).Milliseconds(),
)*/
return fullText.String(), nil
}

View File

@@ -11,11 +11,6 @@ import (
"gorm.io/gorm/clause"
)
// OutboxDAO 封装 outbox 表读写逻辑。
// outbox 状态机约定:
// pending -> published -> consumed成功终态
// pending/published -> pending失败重试
// pending/published -> dead不可恢复或达到最大重试
type OutboxDAO struct {
db *gorm.DB
}
@@ -24,11 +19,6 @@ func NewOutboxDAO(db *gorm.DB) *OutboxDAO {
return &OutboxDAO{db: db}
}
// CreateChatHistoryMessage 创建“聊天记录持久化”的 outbox 消息。
// 关键点:
// 1) 初始状态为 pending
// 2) NextRetryAt=now允许被“首次同步投递”或“扫描器”立即处理
// 3) payload 以 JSON 形式落表,保证消费端可重放。
func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) {
if maxRetry <= 0 {
maxRetry = 20
@@ -62,8 +52,6 @@ func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMe
return &msg, nil
}
// ListDueMessages 查询“到期可重试”的 pending 消息。
// 查询条件status=pending 且 next_retry_at<=当前时间。
func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) {
if limit <= 0 {
limit = 100
@@ -81,10 +69,7 @@ func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.Age
return messages, nil
}
// MarkPublished 将消息标记为“已写入 Kafka”
// 注意:
// 1) 仅在非终态(非 consumed/dead下更新避免覆盖最终状态
// 2) 清理 next_retry_at避免已投递消息继续被扫描器重复拉取。
// MarkPublished 仅在消息未进入最终态时更新为 published避免覆盖 consumed/dead
func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error {
now := time.Now()
updates := map[string]interface{}{
@@ -100,7 +85,6 @@ func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error {
return result.Error
}
// MarkDead 将消息置为死信终态。
func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error {
now := time.Now()
lastErr := truncateError(reason)
@@ -113,11 +97,6 @@ func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error
return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
}
// MarkFailedForRetry 在失败时推进重试状态。
// 关键点:
// 1) 事务 + FOR UPDATE 防并发覆盖(尤其是 dispatch/consume 并发场景);
// 2) retry_count 自增;
// 3) 达到 max_retry 后转 dead否则按指数退避设置 next_retry_at。
func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var msg model.AgentOutboxMessage
@@ -125,7 +104,6 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str
if err != nil {
return err
}
// 终态直接跳过,保持幂等。
if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead {
return nil
}
@@ -153,8 +131,6 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str
})
}
// PersistChatHistoryAndMarkConsumed 执行“消费业务”并回写 consumed。
// 这里把“写 chat_histories”与“更新 outbox 状态”放进同一事务,保证原子性。
func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var outboxMsg model.AgentOutboxMessage
@@ -165,7 +141,6 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo
}
return err
}
// 幂等保护:重复消费不重复落库。
if outboxMsg.Status == model.OutboxStatusConsumed {
return nil
}
@@ -197,7 +172,6 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo
})
}
// calcRetryBackoff 指数退避(上限 2^5=32 秒)。
func calcRetryBackoff(retryCount int) time.Duration {
if retryCount <= 0 {
return time.Second

View File

@@ -7,8 +7,6 @@ import (
segmentkafka "github.com/segmentio/kafka-go"
)
// Consumer 是 Kafka 读取端封装。
// 采用“手动提交 offset”确保业务落库与 offset 提交的顺序可控。
type Consumer struct {
reader *segmentkafka.Reader
}
@@ -18,12 +16,11 @@ func NewConsumer(cfg Config) (*Consumer, error) {
return nil, errors.New("kafka brokers 未配置")
}
reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 1,
MaxBytes: 10e6,
// 关闭自动提交,业务处理成功后显式 Commit。
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 1,
MaxBytes: 10e6,
CommitInterval: 0,
StartOffset: segmentkafka.FirstOffset,
})
@@ -38,7 +35,6 @@ func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) {
return c.reader.FetchMessage(ctx)
}
// Commit 显式提交 offset。
func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error {
if c == nil || c.reader == nil {
return errors.New("kafka consumer 未初始化")

View File

@@ -7,8 +7,6 @@ import (
segmentkafka "github.com/segmentio/kafka-go"
)
// Producer 是 Kafka 写入端封装。
// 这里保持同步写Async=false方便把写入结果直接反馈给 outbox 状态机。
type Producer struct {
writer *segmentkafka.Writer
}
@@ -18,18 +16,15 @@ func NewProducer(cfg Config) (*Producer, error) {
return nil, errors.New("kafka brokers 未配置")
}
writer := &segmentkafka.Writer{
Addr: segmentkafka.TCP(cfg.Brokers...),
// Hash 分区器保证相同 key 落同一分区,利于同会话消息顺序。
Addr: segmentkafka.TCP(cfg.Brokers...),
Balancer: &segmentkafka.Hash{},
RequiredAcks: segmentkafka.RequireOne,
// 关闭异步,确保写失败时可立即触发 outbox 重试逻辑。
Async: false,
Async: false,
}
return &Producer{writer: writer}, nil
}
// Enqueue 将消息写入 Kafka。
// 成功仅代表“已被 Kafka 接收”,不代表业务已完成(业务完成由 consumer + 落库决定)。
func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error {
if p == nil || p.writer == nil {
return errors.New("kafka producer 未初始化")

View File

@@ -4,6 +4,7 @@ import (
"context"
"log"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/agent"
"github.com/LoveLosita/smartflow/backend/conv"
@@ -48,9 +49,9 @@ func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, strin
return s.AIHub.Worker, "worker"
}
// saveChatHistoryReliable 聊天记录持久化的统一入口:
// 1) 启用 outbox + Kafka 时,走异步可靠链路
// 2) 未启用时,退化为同步写数据库。
// saveChatHistoryReliable 统一封装“聊天记录持久化入口
// 1) 开启异步链路时,走 outbox + Kafka
// 2) 未开启时,直接同步写库。
func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error {
if s.asyncPipeline == nil {
return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message)
@@ -62,20 +63,24 @@ func pushErrNonBlocking(errChan chan error, err error) {
select {
case errChan <- err:
default:
log.Printf("error channel is full, drop error: %v", err)
log.Printf("错误通道已满,丢弃错误: %v", err)
}
}
func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) {
// 1) 准备输出通道。
requestStart := time.Now()
traceID := uuid.NewString()
outChan := make(chan string, 5)
errChan := make(chan error, 1)
// 2) 规范会话 ID选择模型。
// 1) 规范会话 ID选择模型。
chatID = normalizeConversationID(chatID)
selectedModel, resolvedModelName := s.pickChatModel(modelName)
/*log.Printf("打点|请求开始|trace_id=%s|chat_id=%s|user_id=%d|model=%s|请求累计_ms=%d",
traceID, chatID, userID, resolvedModelName, time.Since(requestStart).Milliseconds())*/
// 3) 确保会话存在:先查缓存,再回源数据库,必要时创建新会话
// 2) 确保会话存在(优先缓存,必要时回源 DB 并创建)
result, err := s.agentCache.GetConversationStatus(ctx, chatID)
if err != nil {
errChan <- err
@@ -100,11 +105,11 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
}
}
if err = s.agentCache.SetConversationStatus(ctx, chatID); err != nil {
log.Printf("failed to set conversation status cache for %s: %v", chatID, err)
log.Printf("设置会话状态缓存失败 chat=%s: %v", chatID, err)
}
}
// 4) 组装历史上下文:先读缓存,缓存未命中再读数据库
// 3) 拉取并裁剪历史上下文
chatHistory, err := s.agentCache.GetHistory(ctx, chatID)
if err != nil {
errChan <- err
@@ -126,26 +131,23 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
chatHistory = conv.ToEinoMessages(histories)
}
// 5) 基于 token 预算裁剪历史,避免请求超长。
historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, agent.SystemPrompt, userMessage)
trimmedHistory, totalHistoryTokens, keptHistoryTokens, droppedCount := pkg.TrimHistoryByTokenBudget(chatHistory, historyBudget)
chatHistory = trimmedHistory
// 6) 根据裁剪结果调整 Redis 会话窗口,控制缓存体积。
targetWindow := pkg.CalcSessionWindowSize(len(chatHistory))
if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil {
log.Printf("failed to set history window for %s: %v", chatID, err)
log.Printf("设置历史窗口失败 chat=%s: %v", chatID, err)
}
if err = s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil {
log.Printf("failed to enforce history window for %s: %v", chatID, err)
log.Printf("执行历史窗口裁剪失败 chat=%s: %v", chatID, err)
}
if droppedCount > 0 {
log.Printf("agent history trimmed: chat=%s total_tokens=%d kept_tokens=%d dropped=%d budget=%d target_window=%d",
log.Printf("历史裁剪: chat=%s total_tokens=%d kept_tokens=%d dropped=%d budget=%d target_window=%d",
chatID, totalHistoryTokens, keptHistoryTokens, droppedCount, historyBudget, targetWindow)
}
// 缓存未命中时,把“裁剪后的历史”回填 Redis。
if cacheMiss {
if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil {
errChan <- err
@@ -155,36 +157,60 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
}
}
// 7) 先同步写 Redis再把数据库持久化交给 outbox 可靠链路
if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil {
log.Printf("failed to push user message into redis history: %v", err)
}
if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{
UserID: userID,
ConversationID: chatID,
Role: "user",
Message: userMessage,
}); err != nil {
errChan <- err
close(outChan)
close(errChan)
return outChan, errChan
}
// 单请求主链路打点:开流前准备完成
/*log.Printf("打点|开流前准备完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d|history_len=%d|cache_miss=%t",
traceID,
chatID,
time.Since(requestStart).Milliseconds(),
time.Since(requestStart).Milliseconds(),
len(chatHistory),
cacheMiss,
)*/
// 8) 启动流式对话
// 4) 启动流式输出,回答完成后执行后置持久化
go func() {
defer close(outChan)
fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan)
/*streamStart := time.Now()*/
fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, traceID, chatID, requestStart)
if streamErr != nil {
pushErrNonBlocking(errChan, streamErr)
return
}
/*log.Printf("打点|流式输出完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d|reply_chars=%d",
traceID, chatID, time.Since(streamStart).Milliseconds(), time.Since(requestStart).Milliseconds(), len(fullText))
// 9) 助手回答完成后,重复同样流程:先写 Redis再异步持久化。
if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil {
log.Printf("failed to push assistant message into redis history: %v", cacheErr)
postPersistStart := time.Now()
stepStart := time.Now()*/
if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil {
log.Printf("写入用户消息到 Redis 失败: %v", err)
}
/*log.Printf("打点|后置持久化_用户_写Redis|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds())
stepStart = time.Now()*/
if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{
UserID: userID,
ConversationID: chatID,
Role: "user",
Message: userMessage,
}); err != nil {
errChan <- err
close(outChan)
close(errChan)
}
/*log.Printf("打点|后置持久化_用户_写持久化请求|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds())
stepStart = time.Now()
if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil {
log.Printf("写入助手消息到 Redis 失败: %v", cacheErr)
}
log.Printf("打点|后置持久化_助手_写Redis|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds())
stepStart = time.Now()*/
if saveErr := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{
UserID: userID,
ConversationID: chatID,
@@ -193,6 +219,11 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
}); saveErr != nil {
pushErrNonBlocking(errChan, saveErr)
}
/*log.Printf("打点|后置持久化_助手_写持久化请求|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds())
log.Printf("打点|后置持久化完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
traceID, chatID, time.Since(postPersistStart).Milliseconds(), time.Since(requestStart).Milliseconds())*/
}()
return outChan, errChan

View File

@@ -15,11 +15,7 @@ import (
"gorm.io/gorm"
)
// AgentAsyncPipeline 负责 outbox 的“异步可靠链路”:
// 1) 业务侧写 outboxpending
// 2) dispatch loop 扫描并投递 Kafkapublished
// 3) consume loop 消费并落库consumed
// 4) 任一步失败按重试策略回到 pending 或 dead。
// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。
type AgentAsyncPipeline struct {
outboxRepo *dao.OutboxDAO
producer *kafkabus.Producer
@@ -56,9 +52,6 @@ func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*Age
}, nil
}
// Start 启动两个后台协程:
// - startDispatchLoop扫描 pending 并投递 Kafka
// - startConsumeLoop消费 Kafka 并执行业务落库
func (p *AgentAsyncPipeline) Start(ctx context.Context) {
if p == nil {
return
@@ -66,8 +59,6 @@ func (p *AgentAsyncPipeline) Start(ctx context.Context) {
log.Printf("Kafka async pipeline starting: topic=%s brokers=%v retry_scan=%s batch=%d", p.topic, p.brokers, p.scanEvery, p.scanBatch)
if err := kafkabus.WaitTopicReady(ctx, p.brokers, p.topic, 30*time.Second); err != nil {
// 首次部署常见情况broker 已起来但 topic/partition 尚未可用。
// 这里明确打印,避免“消息堆积但控制台无提示”的观感。
log.Printf("Kafka topic not ready before consume loop start: %v", err)
} else {
log.Printf("Kafka topic is ready: %s", p.topic)
@@ -89,25 +80,22 @@ func (p *AgentAsyncPipeline) Close() {
}
}
// EnqueueChatHistoryPersist 是业务侧入口:
// 1) 先写 outbox
// 2) 立刻尝试“首发投递”一次(非阻塞主流程);
// 3) 失败后由扫描器按 next_retry_at 继续重试。
// EnqueueChatHistoryPersist 仅把消息写入 outbox。
//
// 关键设计:
// 1) 不再在请求路径里做“首次同步投递 Kafka”
// 2) 投递统一由 startDispatchLoop 异步扫描执行;
// 3) CreateChatHistoryMessage 会设置 next_retry_at=now扫描器下一轮即可捞取。
//
// 这样可以把请求链路成本收敛到“写 outbox”避免 Kafka 写入延迟污染首字和主链路时延。
func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error {
if p == nil {
return errors.New("Kafka 异步链路未初始化")
}
outboxID, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry)
if err != nil {
return err
}
if err = p.dispatchOne(context.Background(), outboxID); err != nil {
log.Printf("outbox 消息 %d 首次投递失败,等待扫描重试: %v", outboxID, err)
}
return nil
_, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry)
return err
}
// startDispatchLoop 定时扫描 pending 且到期的消息,逐条尝试投递。
func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) {
ticker := time.NewTicker(p.scanEvery)
defer ticker.Stop()
@@ -134,11 +122,6 @@ func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) {
}
}
// dispatchOne 执行单条 outbox 投递:
// 1) 读取 outbox 行;
// 2) 组装 Envelope
// 3) 写 Kafka
// 4) 回写 published。
func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error {
outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID)
if err != nil {
@@ -147,7 +130,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er
}
return err
}
// 终态不再重复投递。
if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead {
return nil
}
@@ -159,7 +141,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er
}
raw, err := json.Marshal(envelope)
if err != nil {
// 序列化都失败通常是坏数据,直接死信。
markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error())
if markErr != nil {
log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
@@ -178,7 +159,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er
return nil
}
// startConsumeLoop 持续从 Kafka 拉取消息并处理。
func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) {
for {
select {
@@ -202,11 +182,9 @@ func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) {
}
}
// handleMessage 先解析 Envelope再按 biz_type 分发到具体处理器。
func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error {
var envelope kafkabus.Envelope
if err := json.Unmarshal(msg.Value, &envelope); err != nil {
// 包装体坏数据,提交 offset 跳过,避免阻塞分区。
_ = p.consumer.Commit(ctx, msg)
return fmt.Errorf("解析 Kafka 包装失败: %w", err)
}
@@ -219,7 +197,6 @@ func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka
case model.OutboxBizTypeChatHistoryPersist:
return p.consumeChatHistory(ctx, msg, envelope)
default:
// 未知业务类型直接死信并提交,避免反复重试无意义数据。
_ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType)
if err := p.consumer.Commit(ctx, msg); err != nil {
return err
@@ -228,10 +205,6 @@ func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka
}
}
// consumeChatHistory 执行“聊天记录持久化”消费逻辑。
// 提交策略说明:
// 1) 成功落库后提交;
// 2) 失败时先回写 outbox便于重试/排障),再提交,避免分区阻塞。
func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error {
var payload model.ChatHistoryPersistPayload
if err := json.Unmarshal(envelope.Payload, &payload); err != nil {