diff --git a/backend/agent/graph.go b/backend/agent/graph.go index 62ec996..245ebd9 100644 --- a/backend/agent/graph.go +++ b/backend/agent/graph.go @@ -13,7 +13,7 @@ import ( arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" ) -// StreamResponse 为 OpenAI/DeepSeek 兼容的流式 chunk 结构。 +// StreamResponse 是 OpenAI/DeepSeek 兼容的流式 chunk 结构。 type StreamResponse struct { ID string `json:"id"` Object string `json:"object"` @@ -88,8 +88,24 @@ func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, e return string(jsonBytes), nil } -func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) { - // 1) 组装提示消息 +// StreamChat 负责模型流式输出,并在关键节点打点: +// 1) 流连接建立(llm.Stream 返回) +// 2) 首包到达(首字延迟) +// 3) 流式输出结束 +func StreamChat( + ctx context.Context, + llm *ark.ChatModel, + modelName string, + userInput string, + ifThinking bool, + chatHistory []*schema.Message, + outChan chan<- string, + traceID string, + chatID string, + requestStart time.Time, +) (string, error) { + /*callStart := time.Now()*/ + messages := make([]*schema.Message, 0) messages = append(messages, schema.SystemMessage(SystemPrompt)) if len(chatHistory) > 0 { @@ -97,13 +113,14 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI } messages = append(messages, schema.UserMessage(userInput)) - // 2) 发起流式请求 var thinking *ark.Thinking if ifThinking { thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled} } else { thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled} } + + /*connectStart := time.Now()*/ reader, err := llm.Stream(ctx, messages, ark.WithThinking(thinking)) if err != nil { return "", err @@ -116,8 +133,18 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI requestID := "chatcmpl-" + uuid.NewString() created := time.Now().Unix() firstChunk := true + chunkCount := 0 + /*streamRecvStart := time.Now() + + log.Printf("打点|流连接建立|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d|history_len=%d", + traceID, + chatID, + requestID, + time.Since(connectStart).Milliseconds(), + time.Since(requestStart).Milliseconds(), + len(chatHistory), + )*/ - // 3) 持续转发 chunk var fullText strings.Builder for { chunk, err := reader.Recv() @@ -136,11 +163,20 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI } if payload != "" { outChan <- payload - firstChunk = false + chunkCount++ + /*if firstChunk { + log.Printf("打点|首包到达|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, + chatID, + requestID, + time.Since(streamRecvStart).Milliseconds(), + time.Since(requestStart).Milliseconds(), + ) + firstChunk = false + }*/ } } - // 4) 发送结束 chunk 和 [DONE] finishChunk, err := ToOpenAIFinishStream(requestID, modelName, created) if err != nil { return "", err @@ -148,5 +184,15 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userI outChan <- finishChunk outChan <- "[DONE]" + /*log.Printf("打点|流式输出结束|trace_id=%s|chat_id=%s|request_id=%s|chunks=%d|reply_chars=%d|本步耗时_ms=%d|请求累计_ms=%d", + traceID, + chatID, + requestID, + chunkCount, + len(fullText.String()), + time.Since(callStart).Milliseconds(), + time.Since(requestStart).Milliseconds(), + )*/ + return fullText.String(), nil } diff --git a/backend/dao/outbox.go b/backend/dao/outbox.go index 7553192..433c781 100644 --- a/backend/dao/outbox.go +++ b/backend/dao/outbox.go @@ -11,11 +11,6 @@ import ( "gorm.io/gorm/clause" ) -// OutboxDAO 封装 outbox 表读写逻辑。 -// outbox 状态机约定: -// pending -> published -> consumed(成功终态) -// pending/published -> pending(失败重试) -// pending/published -> dead(不可恢复或达到最大重试) type OutboxDAO struct { db *gorm.DB } @@ -24,11 +19,6 @@ func NewOutboxDAO(db *gorm.DB) *OutboxDAO { return &OutboxDAO{db: db} } -// CreateChatHistoryMessage 创建“聊天记录持久化”的 outbox 消息。 -// 关键点: -// 1) 初始状态为 pending; -// 2) NextRetryAt=now,允许被“首次同步投递”或“扫描器”立即处理; -// 3) payload 以 JSON 形式落表,保证消费端可重放。 func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { if maxRetry <= 0 { maxRetry = 20 @@ -62,8 +52,6 @@ func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMe return &msg, nil } -// ListDueMessages 查询“到期可重试”的 pending 消息。 -// 查询条件:status=pending 且 next_retry_at<=当前时间。 func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { if limit <= 0 { limit = 100 @@ -81,10 +69,7 @@ func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.Age return messages, nil } -// MarkPublished 将消息标记为“已写入 Kafka”。 -// 注意: -// 1) 仅在非终态(非 consumed/dead)下更新,避免覆盖最终状态; -// 2) 清理 next_retry_at,避免已投递消息继续被扫描器重复拉取。 +// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { now := time.Now() updates := map[string]interface{}{ @@ -100,7 +85,6 @@ func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { return result.Error } -// MarkDead 将消息置为死信终态。 func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error { now := time.Now() lastErr := truncateError(reason) @@ -113,11 +97,6 @@ func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error } -// MarkFailedForRetry 在失败时推进重试状态。 -// 关键点: -// 1) 事务 + FOR UPDATE 防并发覆盖(尤其是 dispatch/consume 并发场景); -// 2) retry_count 自增; -// 3) 达到 max_retry 后转 dead,否则按指数退避设置 next_retry_at。 func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var msg model.AgentOutboxMessage @@ -125,7 +104,6 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str if err != nil { return err } - // 终态直接跳过,保持幂等。 if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { return nil } @@ -153,8 +131,6 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str }) } -// PersistChatHistoryAndMarkConsumed 执行“消费业务”并回写 consumed。 -// 这里把“写 chat_histories”与“更新 outbox 状态”放进同一事务,保证原子性。 func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var outboxMsg model.AgentOutboxMessage @@ -165,7 +141,6 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo } return err } - // 幂等保护:重复消费不重复落库。 if outboxMsg.Status == model.OutboxStatusConsumed { return nil } @@ -197,7 +172,6 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo }) } -// calcRetryBackoff 指数退避(上限 2^5=32 秒)。 func calcRetryBackoff(retryCount int) time.Duration { if retryCount <= 0 { return time.Second diff --git a/backend/kafka/consumer.go b/backend/kafka/consumer.go index ff77f11..4124e1c 100644 --- a/backend/kafka/consumer.go +++ b/backend/kafka/consumer.go @@ -7,8 +7,6 @@ import ( segmentkafka "github.com/segmentio/kafka-go" ) -// Consumer 是 Kafka 读取端封装。 -// 采用“手动提交 offset”,确保业务落库与 offset 提交的顺序可控。 type Consumer struct { reader *segmentkafka.Reader } @@ -18,12 +16,11 @@ func NewConsumer(cfg Config) (*Consumer, error) { return nil, errors.New("kafka brokers 未配置") } reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{ - Brokers: cfg.Brokers, - Topic: cfg.Topic, - GroupID: cfg.GroupID, - MinBytes: 1, - MaxBytes: 10e6, - // 关闭自动提交,业务处理成功后显式 Commit。 + Brokers: cfg.Brokers, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + MinBytes: 1, + MaxBytes: 10e6, CommitInterval: 0, StartOffset: segmentkafka.FirstOffset, }) @@ -38,7 +35,6 @@ func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) { return c.reader.FetchMessage(ctx) } -// Commit 显式提交 offset。 func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error { if c == nil || c.reader == nil { return errors.New("kafka consumer 未初始化") diff --git a/backend/kafka/producer.go b/backend/kafka/producer.go index 05034e1..c587dc8 100644 --- a/backend/kafka/producer.go +++ b/backend/kafka/producer.go @@ -7,8 +7,6 @@ import ( segmentkafka "github.com/segmentio/kafka-go" ) -// Producer 是 Kafka 写入端封装。 -// 这里保持同步写(Async=false),方便把写入结果直接反馈给 outbox 状态机。 type Producer struct { writer *segmentkafka.Writer } @@ -18,18 +16,15 @@ func NewProducer(cfg Config) (*Producer, error) { return nil, errors.New("kafka brokers 未配置") } writer := &segmentkafka.Writer{ - Addr: segmentkafka.TCP(cfg.Brokers...), - // Hash 分区器保证相同 key 落同一分区,利于同会话消息顺序。 + Addr: segmentkafka.TCP(cfg.Brokers...), Balancer: &segmentkafka.Hash{}, RequiredAcks: segmentkafka.RequireOne, - // 关闭异步,确保写失败时可立即触发 outbox 重试逻辑。 - Async: false, + Async: false, } return &Producer{writer: writer}, nil } // Enqueue 将消息写入 Kafka。 -// 成功仅代表“已被 Kafka 接收”,不代表业务已完成(业务完成由 consumer + 落库决定)。 func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error { if p == nil || p.writer == nil { return errors.New("kafka producer 未初始化") diff --git a/backend/service/agent.go b/backend/service/agent.go index 9982a1f..3f0892e 100644 --- a/backend/service/agent.go +++ b/backend/service/agent.go @@ -4,6 +4,7 @@ import ( "context" "log" "strings" + "time" "github.com/LoveLosita/smartflow/backend/agent" "github.com/LoveLosita/smartflow/backend/conv" @@ -48,9 +49,9 @@ func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, strin return s.AIHub.Worker, "worker" } -// saveChatHistoryReliable 是聊天记录持久化的统一入口: -// 1) 启用 outbox + Kafka 时,走异步可靠链路; -// 2) 未启用时,退化为同步写数据库。 +// saveChatHistoryReliable 统一封装“聊天记录持久化入口”: +// 1) 开启异步链路时,走 outbox + Kafka; +// 2) 未开启时,直接同步写库。 func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { if s.asyncPipeline == nil { return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message) @@ -62,20 +63,24 @@ func pushErrNonBlocking(errChan chan error, err error) { select { case errChan <- err: default: - log.Printf("error channel is full, drop error: %v", err) + log.Printf("错误通道已满,丢弃错误: %v", err) } } func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) { - // 1) 准备输出通道。 + requestStart := time.Now() + traceID := uuid.NewString() + outChan := make(chan string, 5) errChan := make(chan error, 1) - // 2) 规范会话 ID 并选择模型。 + // 1) 规范会话 ID,选择模型。 chatID = normalizeConversationID(chatID) selectedModel, resolvedModelName := s.pickChatModel(modelName) + /*log.Printf("打点|请求开始|trace_id=%s|chat_id=%s|user_id=%d|model=%s|请求累计_ms=%d", + traceID, chatID, userID, resolvedModelName, time.Since(requestStart).Milliseconds())*/ - // 3) 确保会话存在:先查缓存,再回源数据库,必要时创建新会话。 + // 2) 确保会话存在(优先缓存,必要时回源 DB 并创建)。 result, err := s.agentCache.GetConversationStatus(ctx, chatID) if err != nil { errChan <- err @@ -100,11 +105,11 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin } } if err = s.agentCache.SetConversationStatus(ctx, chatID); err != nil { - log.Printf("failed to set conversation status cache for %s: %v", chatID, err) + log.Printf("设置会话状态缓存失败 chat=%s: %v", chatID, err) } } - // 4) 组装历史上下文:先读缓存,缓存未命中再读数据库。 + // 3) 拉取并裁剪历史上下文。 chatHistory, err := s.agentCache.GetHistory(ctx, chatID) if err != nil { errChan <- err @@ -126,26 +131,23 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin chatHistory = conv.ToEinoMessages(histories) } - // 5) 基于 token 预算裁剪历史,避免请求超长。 historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, agent.SystemPrompt, userMessage) trimmedHistory, totalHistoryTokens, keptHistoryTokens, droppedCount := pkg.TrimHistoryByTokenBudget(chatHistory, historyBudget) chatHistory = trimmedHistory - // 6) 根据裁剪结果调整 Redis 会话窗口,控制缓存体积。 targetWindow := pkg.CalcSessionWindowSize(len(chatHistory)) if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil { - log.Printf("failed to set history window for %s: %v", chatID, err) + log.Printf("设置历史窗口失败 chat=%s: %v", chatID, err) } if err = s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil { - log.Printf("failed to enforce history window for %s: %v", chatID, err) + log.Printf("执行历史窗口裁剪失败 chat=%s: %v", chatID, err) } if droppedCount > 0 { - log.Printf("agent history trimmed: chat=%s total_tokens=%d kept_tokens=%d dropped=%d budget=%d target_window=%d", + log.Printf("历史裁剪: chat=%s total_tokens=%d kept_tokens=%d dropped=%d budget=%d target_window=%d", chatID, totalHistoryTokens, keptHistoryTokens, droppedCount, historyBudget, targetWindow) } - // 缓存未命中时,把“裁剪后的历史”回填 Redis。 if cacheMiss { if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { errChan <- err @@ -155,36 +157,60 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin } } - // 7) 先同步写 Redis,再把数据库持久化交给 outbox 可靠链路。 - if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { - log.Printf("failed to push user message into redis history: %v", err) - } - if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ - UserID: userID, - ConversationID: chatID, - Role: "user", - Message: userMessage, - }); err != nil { - errChan <- err - close(outChan) - close(errChan) - return outChan, errChan - } + // 单请求主链路打点:开流前准备完成。 + /*log.Printf("打点|开流前准备完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d|history_len=%d|cache_miss=%t", + traceID, + chatID, + time.Since(requestStart).Milliseconds(), + time.Since(requestStart).Milliseconds(), + len(chatHistory), + cacheMiss, + )*/ - // 8) 启动流式对话。 + // 4) 启动流式输出,回答完成后执行后置持久化。 go func() { defer close(outChan) - fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan) + /*streamStart := time.Now()*/ + fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, traceID, chatID, requestStart) if streamErr != nil { pushErrNonBlocking(errChan, streamErr) return } + /*log.Printf("打点|流式输出完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d|reply_chars=%d", + traceID, chatID, time.Since(streamStart).Milliseconds(), time.Since(requestStart).Milliseconds(), len(fullText)) - // 9) 助手回答完成后,重复同样流程:先写 Redis,再异步持久化。 - if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil { - log.Printf("failed to push assistant message into redis history: %v", cacheErr) + postPersistStart := time.Now() + + stepStart := time.Now()*/ + if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { + log.Printf("写入用户消息到 Redis 失败: %v", err) } + /*log.Printf("打点|后置持久化_用户_写Redis|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds()) + + stepStart = time.Now()*/ + if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ + UserID: userID, + ConversationID: chatID, + Role: "user", + Message: userMessage, + }); err != nil { + errChan <- err + close(outChan) + close(errChan) + } + /*log.Printf("打点|后置持久化_用户_写持久化请求|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds()) + + stepStart = time.Now() + if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil { + log.Printf("写入助手消息到 Redis 失败: %v", cacheErr) + } + log.Printf("打点|后置持久化_助手_写Redis|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds()) + + stepStart = time.Now()*/ if saveErr := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, @@ -193,6 +219,11 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin }); saveErr != nil { pushErrNonBlocking(errChan, saveErr) } + /*log.Printf("打点|后置持久化_助手_写持久化请求|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, chatID, time.Since(stepStart).Milliseconds(), time.Since(requestStart).Milliseconds()) + + log.Printf("打点|后置持久化完成|trace_id=%s|chat_id=%s|本步耗时_ms=%d|请求累计_ms=%d", + traceID, chatID, time.Since(postPersistStart).Milliseconds(), time.Since(requestStart).Milliseconds())*/ }() return outChan, errChan diff --git a/backend/service/agent_async_pipeline.go b/backend/service/agent_async_pipeline.go index f1d8f93..6e2b687 100644 --- a/backend/service/agent_async_pipeline.go +++ b/backend/service/agent_async_pipeline.go @@ -15,11 +15,7 @@ import ( "gorm.io/gorm" ) -// AgentAsyncPipeline 负责 outbox 的“异步可靠链路”: -// 1) 业务侧写 outbox(pending); -// 2) dispatch loop 扫描并投递 Kafka(published); -// 3) consume loop 消费并落库(consumed); -// 4) 任一步失败按重试策略回到 pending 或 dead。 +// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。 type AgentAsyncPipeline struct { outboxRepo *dao.OutboxDAO producer *kafkabus.Producer @@ -56,9 +52,6 @@ func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*Age }, nil } -// Start 启动两个后台协程: -// - startDispatchLoop:扫描 pending 并投递 Kafka -// - startConsumeLoop:消费 Kafka 并执行业务落库 func (p *AgentAsyncPipeline) Start(ctx context.Context) { if p == nil { return @@ -66,8 +59,6 @@ func (p *AgentAsyncPipeline) Start(ctx context.Context) { log.Printf("Kafka async pipeline starting: topic=%s brokers=%v retry_scan=%s batch=%d", p.topic, p.brokers, p.scanEvery, p.scanBatch) if err := kafkabus.WaitTopicReady(ctx, p.brokers, p.topic, 30*time.Second); err != nil { - // 首次部署常见情况:broker 已起来但 topic/partition 尚未可用。 - // 这里明确打印,避免“消息堆积但控制台无提示”的观感。 log.Printf("Kafka topic not ready before consume loop start: %v", err) } else { log.Printf("Kafka topic is ready: %s", p.topic) @@ -89,25 +80,22 @@ func (p *AgentAsyncPipeline) Close() { } } -// EnqueueChatHistoryPersist 是业务侧入口: -// 1) 先写 outbox; -// 2) 立刻尝试“首发投递”一次(非阻塞主流程); -// 3) 失败后由扫描器按 next_retry_at 继续重试。 +// EnqueueChatHistoryPersist 仅把消息写入 outbox。 +// +// 关键设计: +// 1) 不再在请求路径里做“首次同步投递 Kafka”; +// 2) 投递统一由 startDispatchLoop 异步扫描执行; +// 3) CreateChatHistoryMessage 会设置 next_retry_at=now,扫描器下一轮即可捞取。 +// +// 这样可以把请求链路成本收敛到“写 outbox”,避免 Kafka 写入延迟污染首字和主链路时延。 func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { if p == nil { return errors.New("Kafka 异步链路未初始化") } - outboxID, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry) - if err != nil { - return err - } - if err = p.dispatchOne(context.Background(), outboxID); err != nil { - log.Printf("outbox 消息 %d 首次投递失败,等待扫描重试: %v", outboxID, err) - } - return nil + _, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry) + return err } -// startDispatchLoop 定时扫描 pending 且到期的消息,逐条尝试投递。 func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { ticker := time.NewTicker(p.scanEvery) defer ticker.Stop() @@ -134,11 +122,6 @@ func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { } } -// dispatchOne 执行单条 outbox 投递: -// 1) 读取 outbox 行; -// 2) 组装 Envelope; -// 3) 写 Kafka; -// 4) 回写 published。 func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error { outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID) if err != nil { @@ -147,7 +130,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er } return err } - // 终态不再重复投递。 if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { return nil } @@ -159,7 +141,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er } raw, err := json.Marshal(envelope) if err != nil { - // 序列化都失败通常是坏数据,直接死信。 markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) if markErr != nil { log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) @@ -178,7 +159,6 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er return nil } -// startConsumeLoop 持续从 Kafka 拉取消息并处理。 func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { for { select { @@ -202,11 +182,9 @@ func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { } } -// handleMessage 先解析 Envelope,再按 biz_type 分发到具体处理器。 func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error { var envelope kafkabus.Envelope if err := json.Unmarshal(msg.Value, &envelope); err != nil { - // 包装体坏数据,提交 offset 跳过,避免阻塞分区。 _ = p.consumer.Commit(ctx, msg) return fmt.Errorf("解析 Kafka 包装失败: %w", err) } @@ -219,7 +197,6 @@ func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka case model.OutboxBizTypeChatHistoryPersist: return p.consumeChatHistory(ctx, msg, envelope) default: - // 未知业务类型直接死信并提交,避免反复重试无意义数据。 _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) if err := p.consumer.Commit(ctx, msg); err != nil { return err @@ -228,10 +205,6 @@ func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka } } -// consumeChatHistory 执行“聊天记录持久化”消费逻辑。 -// 提交策略说明: -// 1) 成功落库后提交; -// 2) 失败时先回写 outbox(便于重试/排障),再提交,避免分区阻塞。 func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error { var payload model.ChatHistoryPersistPayload if err := json.Unmarshal(envelope.Payload, &payload); err != nil {