Version: 0.9.53.dev.260429
后端: 1. 流式思考链路从 raw reasoning_content 切到 `thinking_summary` 摘要协议,补齐摘要 prompt、digestor 与 Lite 压缩链路,plan / execute / fallback 统一改为“只出摘要、不透原始推理”,正文开始后自动关停摘要流。 2. thinking_summary 打通 timeline / SSE / outbox 持久化闭环,只落 detail_summary 与必要 metadata,并补强 seq 自检、冲突幂等识别与补 seq 回填,提升重放恢复稳定性。 3. 会话历史口径继续收紧,assistant 正文与时间线不再回写 raw reasoning_content,仅保留正文与思考耗时,避免刷新恢复时再次暴露内部推理文本。 前端: 4. 助手页开始接入 thinking_summary 实时流与历史恢复,补齐短摘要状态、长摘要折叠区、正文开流后自动收口,并增加调试入口用于协议联调与验收。 5. 当前前端助手页仍是残次过渡态,本版先以 thinking_summary 协议接通和基础渲染为主,样式、交互与细节体验暂未收平,下一版集中修复。 仓库: 6. 补充 thinking_summary 对接说明,明确 SSE 协议、timeline 恢复口径与 short/detail summary 的使用边界。
This commit is contained in:
@@ -353,12 +353,11 @@ func (s *AgentService) runNormalChatFlow(
|
||||
|
||||
// 6. 执行真正的流式聊天。
|
||||
// fullText 用于后续写 Redis/持久化,outChan 用于把流片段实时推给前端。
|
||||
fullText, reasoningText, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt)
|
||||
fullText, _, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt, userID, chatID)
|
||||
if streamErr != nil {
|
||||
pushErrNonBlocking(errChan, streamErr)
|
||||
return
|
||||
}
|
||||
assistantReasoning := mergeAgentReasoningText(assistantReasoningPrefix, reasoningText)
|
||||
|
||||
// 6.1 流式 usage 并入请求级 token 统计器:
|
||||
// 6.1.1 route/quicknote/taskquery 等 Generate 调用由 callback 自动累加;
|
||||
@@ -413,7 +412,7 @@ func (s *AgentService) runNormalChatFlow(
|
||||
// 8. 后置持久化(助手消息):
|
||||
// 8.1 先写 Redis,保证下一轮上下文可见;
|
||||
// 8.2 再异步可靠落库,失败通过 errChan 回传给上层。
|
||||
assistantMsg := &schema.Message{Role: schema.Assistant, Content: fullText, ReasoningContent: assistantReasoning}
|
||||
assistantMsg := &schema.Message{Role: schema.Assistant, Content: fullText}
|
||||
if reasoningDurationSeconds > 0 {
|
||||
assistantMsg.Extra = map[string]any{"reasoning_duration_seconds": reasoningDurationSeconds}
|
||||
}
|
||||
@@ -426,7 +425,7 @@ func (s *AgentService) runNormalChatFlow(
|
||||
ConversationID: chatID,
|
||||
Role: "assistant",
|
||||
Message: fullText,
|
||||
ReasoningContent: assistantReasoning,
|
||||
ReasoningContent: "",
|
||||
ReasoningDurationSeconds: reasoningDurationSeconds,
|
||||
// 口径B:助手消息记录“本轮请求总 token”。
|
||||
TokensConsumed: requestTotalTokens,
|
||||
@@ -434,9 +433,6 @@ func (s *AgentService) runNormalChatFlow(
|
||||
pushErrNonBlocking(errChan, saveErr)
|
||||
} else {
|
||||
assistantTimelinePayload := map[string]any{}
|
||||
if strings.TrimSpace(assistantReasoning) != "" {
|
||||
assistantTimelinePayload["reasoning_content"] = strings.TrimSpace(assistantReasoning)
|
||||
}
|
||||
if reasoningDurationSeconds > 0 {
|
||||
assistantTimelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds
|
||||
}
|
||||
|
||||
@@ -182,10 +182,12 @@ func (s *AgentService) runNewAgentGraph(
|
||||
planClient := infrallm.WrapArkClient(s.AIHub.Max)
|
||||
executeClient := infrallm.WrapArkClient(s.AIHub.Max)
|
||||
deliverClient := infrallm.WrapArkClient(s.AIHub.Pro)
|
||||
summaryClient := infrallm.WrapArkClient(s.AIHub.Lite)
|
||||
|
||||
// 8. 适配 SSE emitter。
|
||||
sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan)
|
||||
chunkEmitter := newagentstream.NewChunkEmitter(sseEmitter, traceID, resolvedModelName, requestStart.Unix())
|
||||
chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(summaryClient))
|
||||
// 关键卡片事件走统一时间线持久化,保证刷新后可重建。
|
||||
chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) {
|
||||
s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra)
|
||||
@@ -449,9 +451,11 @@ func (s *AgentService) persistNewAgentConversationMessage(
|
||||
}
|
||||
|
||||
persistMsg := &schema.Message{
|
||||
Role: msg.Role,
|
||||
Content: content,
|
||||
ReasoningContent: strings.TrimSpace(msg.ReasoningContent),
|
||||
Role: msg.Role,
|
||||
Content: content,
|
||||
// 可见消息持久化只保存正文;模型 raw reasoning 改由 thinking_summary 生成用户可见摘要,
|
||||
// 避免历史接口或时间线刷新时重新暴露内部思考文本。
|
||||
ReasoningContent: "",
|
||||
}
|
||||
if len(msg.Extra) > 0 {
|
||||
persistMsg.Extra = make(map[string]any, len(msg.Extra))
|
||||
@@ -498,9 +502,6 @@ func (s *AgentService) persistNewAgentConversationMessage(
|
||||
timelineKind = model.AgentTimelineKindAssistantText
|
||||
}
|
||||
timelinePayload := map[string]any{}
|
||||
if persistPayload.ReasoningContent != "" {
|
||||
timelinePayload["reasoning_content"] = persistPayload.ReasoningContent
|
||||
}
|
||||
if reasoningDurationSeconds > 0 {
|
||||
timelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
|
||||
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
|
||||
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
|
||||
"github.com/cloudwego/eino-ext/components/model/ark"
|
||||
@@ -25,6 +26,8 @@ func (s *AgentService) streamChatFallback(
|
||||
chatHistory []*schema.Message,
|
||||
outChan chan<- string,
|
||||
reasoningStartAt *time.Time,
|
||||
userID int,
|
||||
chatID string,
|
||||
) (string, string, int, *schema.TokenUsage, error) {
|
||||
messages := make([]*schema.Message, 0, len(chatHistory)+2)
|
||||
messages = append(messages, schema.SystemMessage(newagentprompt.SystemPrompt))
|
||||
@@ -46,6 +49,24 @@ func (s *AgentService) streamChatFallback(
|
||||
requestID := "chatcmpl-" + uuid.NewString()
|
||||
created := time.Now().Unix()
|
||||
firstChunk := true
|
||||
chunkEmitter := newagentstream.NewChunkEmitter(newagentstream.NewSSEPayloadEmitter(outChan), requestID, modelName, created)
|
||||
chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(infrallm.WrapArkClient(s.AIHub.Lite)))
|
||||
chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) {
|
||||
s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra)
|
||||
})
|
||||
reasoningDigestor, digestorErr := chunkEmitter.NewReasoningDigestor(ctx, "fallback.speak", "fallback")
|
||||
if digestorErr != nil {
|
||||
return "", "", 0, nil, digestorErr
|
||||
}
|
||||
digestorClosed := false
|
||||
closeDigestor := func() {
|
||||
if reasoningDigestor == nil || digestorClosed {
|
||||
return
|
||||
}
|
||||
digestorClosed = true
|
||||
_ = reasoningDigestor.Close(ctx)
|
||||
}
|
||||
defer closeDigestor()
|
||||
|
||||
var localReasoningStartAt *time.Time
|
||||
if reasoningStartAt != nil && !reasoningStartAt.IsZero() {
|
||||
@@ -61,7 +82,6 @@ func (s *AgentService) streamChatFallback(
|
||||
defer reader.Close()
|
||||
|
||||
var fullText strings.Builder
|
||||
var reasoningText strings.Builder
|
||||
var tokenUsage *schema.TokenUsage
|
||||
for {
|
||||
chunk, recvErr := reader.Recv()
|
||||
@@ -85,26 +105,31 @@ func (s *AgentService) streamChatFallback(
|
||||
now := time.Now()
|
||||
reasoningEndAt = &now
|
||||
}
|
||||
fullText.WriteString(chunk.Content)
|
||||
reasoningText.WriteString(chunk.ReasoningContent)
|
||||
}
|
||||
|
||||
payload, payloadErr := newagentstream.ToOpenAIStream(chunk, requestID, modelName, created, firstChunk)
|
||||
if payloadErr != nil {
|
||||
return "", "", 0, nil, payloadErr
|
||||
}
|
||||
if payload != "" {
|
||||
outChan <- payload
|
||||
firstChunk = false
|
||||
// 1. fallback 链路同样不能透传 raw reasoning_content;
|
||||
// 2. 只把 reasoning 喂给摘要器,正文出现时立即关门丢弃后续摘要。
|
||||
if strings.TrimSpace(chunk.ReasoningContent) != "" && reasoningDigestor != nil {
|
||||
reasoningDigestor.Append(chunk.ReasoningContent)
|
||||
}
|
||||
if chunk.Content != "" {
|
||||
if reasoningDigestor != nil {
|
||||
reasoningDigestor.MarkContentStarted()
|
||||
}
|
||||
if emitErr := chunkEmitter.EmitAssistantText("fallback.speak", "fallback", chunk.Content, firstChunk); emitErr != nil {
|
||||
return "", "", 0, nil, emitErr
|
||||
}
|
||||
fullText.WriteString(chunk.Content)
|
||||
firstChunk = false
|
||||
}
|
||||
}
|
||||
}
|
||||
closeDigestor()
|
||||
|
||||
finishChunk, finishErr := newagentstream.ToOpenAIFinishStream(requestID, modelName, created)
|
||||
if finishErr != nil {
|
||||
if finishErr := chunkEmitter.EmitFinish("fallback.speak", "fallback"); finishErr != nil {
|
||||
return "", "", 0, nil, finishErr
|
||||
}
|
||||
outChan <- finishChunk
|
||||
outChan <- "[DONE]"
|
||||
if doneErr := chunkEmitter.EmitDone(); doneErr != nil {
|
||||
return "", "", 0, nil, doneErr
|
||||
}
|
||||
|
||||
reasoningDurationSeconds := 0
|
||||
if localReasoningStartAt != nil {
|
||||
@@ -117,5 +142,5 @@ func (s *AgentService) streamChatFallback(
|
||||
}
|
||||
}
|
||||
|
||||
return fullText.String(), reasoningText.String(), reasoningDurationSeconds, tokenUsage, nil
|
||||
return fullText.String(), "", reasoningDurationSeconds, tokenUsage, nil
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/LoveLosita/smartflow/backend/model"
|
||||
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
|
||||
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@@ -63,13 +64,13 @@ func (s *AgentService) GetConversationTimeline(ctx context.Context, userID int,
|
||||
return normalizeConversationTimelineItems(items), nil
|
||||
}
|
||||
|
||||
// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + MySQL。
|
||||
// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + outbox。
|
||||
//
|
||||
// 步骤化说明:
|
||||
// 1. 先从 Redis INCR 分配 seq,若 Redis 异常则回退 DB MAX(seq)+1;
|
||||
// 2. 再写 MySQL,保证刷新时至少有权威持久化;
|
||||
// 3. 最后追加 Redis 时间线列表,失败只记日志,不影响主链路返回;
|
||||
// 4. 返回分配到的 seq,便于后续扩展在 SSE meta 回传顺序号。
|
||||
// 1. 先分配同会话内单调递增的 seq,优先走 Redis,Redis 不可用时回退 DB;
|
||||
// 2. 再把事件同步追加到 Redis timeline cache,保证刷新前的用户体验连续;
|
||||
// 3. 最后发布 outbox 事件异步落 MySQL,与 chat history 的可靠落库方式对齐;
|
||||
// 4. 未注入 eventPublisher 时走同步 MySQL fallback,方便本地极简环境启动。
|
||||
func (s *AgentService) appendConversationTimelineEvent(
|
||||
ctx context.Context,
|
||||
userID int,
|
||||
@@ -95,86 +96,260 @@ func (s *AgentService) appendConversationTimelineEvent(
|
||||
return 0, errors.New("invalid timeline event identity")
|
||||
}
|
||||
|
||||
normalizedContent, normalizedPayload, shouldPersist := normalizeConversationTimelinePersistMaterial(normalizedKind, normalizedContent, payload)
|
||||
if !shouldPersist {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
seq, err := s.nextConversationTimelineSeq(ctx, userID, normalizedChatID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
payloadJSON := marshalTimelinePayloadJSON(payload)
|
||||
persistPayload := model.ChatTimelinePersistPayload{
|
||||
persistPayload := (model.ChatTimelinePersistPayload{
|
||||
UserID: userID,
|
||||
ConversationID: normalizedChatID,
|
||||
Seq: seq,
|
||||
Kind: normalizedKind,
|
||||
Role: normalizedRole,
|
||||
Content: normalizedContent,
|
||||
PayloadJSON: payloadJSON,
|
||||
PayloadJSON: marshalTimelinePayloadJSON(normalizedPayload),
|
||||
TokensConsumed: tokensConsumed,
|
||||
}).Normalize()
|
||||
if s.eventPublisher != nil {
|
||||
now := time.Now()
|
||||
|
||||
// 1. 先写 Redis timeline cache,让刷新前的本地态和下一轮上下文都能立即看到这条事件。
|
||||
// 2. 再发布 outbox 事件,与 chat history 保持相同的“入队成功即返回”语义。
|
||||
// 3. 若 outbox 发布失败,这里返回 error 交给上层处理,不在本方法里偷偷回退成同步写库。
|
||||
s.appendConversationTimelineCacheNonBlocking(
|
||||
ctx,
|
||||
userID,
|
||||
normalizedChatID,
|
||||
buildConversationTimelineCacheItem(0, seq, normalizedKind, normalizedRole, normalizedContent, normalizedPayload, tokensConsumed, &now),
|
||||
)
|
||||
if err := eventsvc.PublishAgentTimelinePersistRequested(ctx, s.eventPublisher, persistPayload); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return seq, nil
|
||||
}
|
||||
return s.appendConversationTimelineEventSync(ctx, userID, normalizedChatID, persistPayload, normalizedPayload)
|
||||
}
|
||||
|
||||
// appendConversationTimelineEventSync 在未启用 outbox 时同步写 MySQL。
|
||||
//
|
||||
// 步骤化说明:
|
||||
// 1. 本方法只作为 eventPublisher 为空时的降级路径,保证本地环境不依赖总线;
|
||||
// 2. 若 seq 唯一键冲突,读取 DB 最大 seq 后补一个新序号,语义与 outbox 消费者保持一致;
|
||||
// 3. MySQL 写入成功后再追加 Redis cache,让缓存拿到数据库生成的 id/created_at。
|
||||
func (s *AgentService) appendConversationTimelineEventSync(
|
||||
ctx context.Context,
|
||||
userID int,
|
||||
chatID string,
|
||||
persistPayload model.ChatTimelinePersistPayload,
|
||||
payload map[string]any,
|
||||
) (int64, error) {
|
||||
eventID, eventCreatedAt, err := s.repo.SaveConversationTimelineEvent(ctx, persistPayload)
|
||||
if err != nil {
|
||||
// 1. 并发极端场景下(例如 Redis seq 分配失败后 DB 兜底)可能产生重复 seq;
|
||||
// 2. 这里做一次“读取最新 MAX(seq)+1”的重试,避免主链路直接失败;
|
||||
// 3. 重试仍失败则返回错误,让调用方感知真实落库失败。
|
||||
if !isTimelineSeqConflictError(err) {
|
||||
// 1. 这里的冲突通常来自 Redis seq key 过期或落后于 DB。
|
||||
// 2. 由于当前是同步写库链路,可以直接读取 DB 当前最大 seq 并补一个新序号。
|
||||
// 3. 若重试后仍失败,则把数据库错误原样抛给上层,避免悄悄吞掉真实问题。
|
||||
if !model.IsTimelineSeqConflictError(err) {
|
||||
return 0, err
|
||||
}
|
||||
maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID)
|
||||
maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID)
|
||||
if seqErr != nil {
|
||||
return 0, err
|
||||
return 0, seqErr
|
||||
}
|
||||
persistPayload.Seq = maxSeq + 1
|
||||
var retryErr error
|
||||
eventID, eventCreatedAt, retryErr = s.repo.SaveConversationTimelineEvent(ctx, persistPayload)
|
||||
if retryErr != nil {
|
||||
return 0, retryErr
|
||||
eventID, eventCreatedAt, err = s.repo.SaveConversationTimelineEvent(ctx, persistPayload)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
seq = persistPayload.Seq
|
||||
if s.cacheDAO != nil {
|
||||
if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, seq); setErr != nil {
|
||||
log.Printf("时间线 seq 冲突重试后回写 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, seq, setErr)
|
||||
if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, persistPayload.Seq); setErr != nil {
|
||||
log.Printf("回填时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, persistPayload.Seq, setErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.appendConversationTimelineCacheNonBlocking(
|
||||
ctx,
|
||||
userID,
|
||||
chatID,
|
||||
buildConversationTimelineCacheItem(
|
||||
eventID,
|
||||
persistPayload.Seq,
|
||||
persistPayload.Kind,
|
||||
persistPayload.Role,
|
||||
persistPayload.Content,
|
||||
payload,
|
||||
persistPayload.TokensConsumed,
|
||||
eventCreatedAt,
|
||||
),
|
||||
)
|
||||
return persistPayload.Seq, nil
|
||||
}
|
||||
|
||||
// appendConversationTimelineCacheNonBlocking 尽力把单条 timeline 事件追加到 Redis。
|
||||
//
|
||||
// 步骤化说明:
|
||||
// 1. 缓存失败不能反向影响主链路,因为 MySQL/outbox 才是最终可靠写入;
|
||||
// 2. 这里统一记录错误日志,方便排查 Redis 不可用或 payload 序列化问题;
|
||||
// 3. item 由调用方提前标准化,本方法不再二次裁剪业务字段。
|
||||
func (s *AgentService) appendConversationTimelineCacheNonBlocking(
|
||||
ctx context.Context,
|
||||
userID int,
|
||||
chatID string,
|
||||
item model.GetConversationTimelineItem,
|
||||
) {
|
||||
if s.cacheDAO == nil {
|
||||
return
|
||||
}
|
||||
if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, chatID, item); err != nil {
|
||||
log.Printf("追加时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, chatID, item.Seq, item.Kind, err)
|
||||
}
|
||||
}
|
||||
|
||||
// nextConversationTimelineSeq 负责分配一条新的 timeline seq。
|
||||
//
|
||||
// 步骤化说明:
|
||||
// 1. 优先走 Redis INCR,避免所有事件都串行依赖 MySQL;
|
||||
// 2. 再用 DB MAX(seq) 做一次自检,尽量把“Redis key 过期/落后”在写入前提前修正;
|
||||
// 3. 若 Redis 不可用,则直接回退到 DB MAX(seq)+1,并把结果尽力回填回 Redis。
|
||||
func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) {
|
||||
if s == nil || s.repo == nil {
|
||||
return 0, errors.New("agent service is not initialized")
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
normalizedChatID := strings.TrimSpace(chatID)
|
||||
if userID <= 0 || normalizedChatID == "" {
|
||||
return 0, errors.New("invalid timeline seq identity")
|
||||
}
|
||||
|
||||
if s.cacheDAO == nil {
|
||||
return s.nextConversationTimelineSeqFromDB(ctx, userID, normalizedChatID)
|
||||
}
|
||||
|
||||
candidateSeq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, normalizedChatID)
|
||||
if err != nil {
|
||||
log.Printf("分配时间线 seq 时 Redis INCR 失败,回退 DB user=%d chat=%s err=%v", userID, normalizedChatID, err)
|
||||
return s.nextConversationTimelineSeqFromDB(ctx, userID, normalizedChatID)
|
||||
}
|
||||
|
||||
// 1. Redis key 缺失时,INCR 常会从 1 重新开始,容易和已有 DB 记录撞 seq。
|
||||
// 2. 这里额外对照一次 DB 最大 seq,把明显落后的顺序号提前修正,降低 outbox 消费时的补 seq 概率。
|
||||
// 3. 该自检不会看到“尚未消费到 MySQL 的新 outbox 事件”,因此真正的极端并发兜底仍由消费者承担。
|
||||
maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if candidateSeq > maxSeq {
|
||||
return candidateSeq, nil
|
||||
}
|
||||
|
||||
repairedSeq := maxSeq + 1
|
||||
if err = s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, repairedSeq); err != nil {
|
||||
log.Printf("修正时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, repairedSeq, err)
|
||||
}
|
||||
return repairedSeq, nil
|
||||
}
|
||||
|
||||
func (s *AgentService) nextConversationTimelineSeqFromDB(ctx context.Context, userID int, chatID string) (int64, error) {
|
||||
maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
nextSeq := maxSeq + 1
|
||||
if s.cacheDAO != nil {
|
||||
now := time.Now()
|
||||
item := model.GetConversationTimelineItem{
|
||||
ID: eventID,
|
||||
Seq: seq,
|
||||
Kind: normalizedKind,
|
||||
Role: normalizedRole,
|
||||
Content: normalizedContent,
|
||||
Payload: cloneTimelinePayload(payload),
|
||||
TokensConsumed: tokensConsumed,
|
||||
}
|
||||
if eventCreatedAt != nil {
|
||||
item.CreatedAt = eventCreatedAt
|
||||
} else {
|
||||
item.CreatedAt = &now
|
||||
}
|
||||
if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, normalizedChatID, item); err != nil {
|
||||
log.Printf("追加会话时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, normalizedChatID, seq, normalizedKind, err)
|
||||
if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, nextSeq); setErr != nil {
|
||||
log.Printf("回填时间线 seq 到 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, nextSeq, setErr)
|
||||
}
|
||||
}
|
||||
return seq, nil
|
||||
return nextSeq, nil
|
||||
}
|
||||
|
||||
func isTimelineSeqConflictError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
// normalizeConversationTimelinePersistMaterial 负责把 timeline 原始输入收敛成“可缓存 + 可持久化”的口径。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 对普通事件只做浅拷贝,避免调用方后续继续改 map 影响已入队 payload;
|
||||
// 2. 对 thinking_summary 只保留 detail_summary 与必要 metadata,明确剔除 short_summary;
|
||||
// 3. 若 thinking_summary 最终没有 detail_summary,则返回 shouldPersist=false,仅保留实时 SSE 展示,不进入 timeline。
|
||||
func normalizeConversationTimelinePersistMaterial(kind string, content string, payload map[string]any) (string, map[string]any, bool) {
|
||||
normalizedKind := strings.ToLower(strings.TrimSpace(kind))
|
||||
normalizedContent := strings.TrimSpace(content)
|
||||
if normalizedKind != model.AgentTimelineKindThinkingSummary {
|
||||
return normalizedContent, cloneTimelinePayload(payload), true
|
||||
}
|
||||
text := strings.ToLower(err.Error())
|
||||
return strings.Contains(text, "duplicate") && strings.Contains(text, "uk_timeline_user_chat_seq")
|
||||
return sanitizeThinkingSummaryPersistMaterial(normalizedContent, payload)
|
||||
}
|
||||
|
||||
// persistNewAgentTimelineExtraEvent 把 SSE extra 卡片事件写入时间线。
|
||||
func sanitizeThinkingSummaryPersistMaterial(content string, payload map[string]any) (string, map[string]any, bool) {
|
||||
detailSummary := readTimelinePayloadString(payload, "detail_summary")
|
||||
if detailSummary == "" {
|
||||
detailSummary = strings.TrimSpace(content)
|
||||
}
|
||||
if detailSummary == "" {
|
||||
return "", nil, false
|
||||
}
|
||||
|
||||
sanitized := make(map[string]any)
|
||||
copyTrimmedTimelinePayloadField(payload, sanitized, "stage")
|
||||
copyTrimmedTimelinePayloadField(payload, sanitized, "block_id")
|
||||
copyTrimmedTimelinePayloadField(payload, sanitized, "display_mode")
|
||||
copyTimelinePayloadFieldIfPresent(payload, sanitized, "summary_seq")
|
||||
copyTimelinePayloadFieldIfPresent(payload, sanitized, "final")
|
||||
copyTimelinePayloadFieldIfPresent(payload, sanitized, "duration_seconds")
|
||||
sanitized["detail_summary"] = detailSummary
|
||||
|
||||
return detailSummary, sanitized, true
|
||||
}
|
||||
|
||||
func copyTrimmedTimelinePayloadField(src map[string]any, dst map[string]any, key string) {
|
||||
if len(src) == 0 || dst == nil {
|
||||
return
|
||||
}
|
||||
value, ok := src[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
text, ok := value.(string)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
trimmed := strings.TrimSpace(text)
|
||||
if trimmed == "" {
|
||||
return
|
||||
}
|
||||
dst[key] = trimmed
|
||||
}
|
||||
|
||||
func copyTimelinePayloadFieldIfPresent(src map[string]any, dst map[string]any, key string) {
|
||||
if len(src) == 0 || dst == nil {
|
||||
return
|
||||
}
|
||||
value, ok := src[key]
|
||||
if !ok || value == nil {
|
||||
return
|
||||
}
|
||||
dst[key] = value
|
||||
}
|
||||
|
||||
// persistNewAgentTimelineExtraEvent 把 SSE extra 里的结构化事件写入时间线。
|
||||
//
|
||||
// 说明:
|
||||
// 1. 只持久化真正需要刷新后重建的卡片事件;
|
||||
// 2. status/reasoning/finish 等临时过程信号不落时间线;
|
||||
// 3. 失败只记日志,不中断当前 SSE 输出。
|
||||
func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, userID int, chatID string, extra *newagentstream.OpenAIChunkExtra) {
|
||||
// 1. 只持久化刷新后仍需重建的业务事件;
|
||||
// 2. short_summary 这类临时展示信息会在 appendConversationTimelineEvent 内被过滤掉;
|
||||
// 3. 失败只记日志,不反向打断当前 SSE 输出。
|
||||
func (s *AgentService) persistNewAgentTimelineExtraEvent(
|
||||
ctx context.Context,
|
||||
userID int,
|
||||
chatID string,
|
||||
extra *newagentstream.OpenAIChunkExtra,
|
||||
) {
|
||||
kind, ok := mapTimelineKindFromStreamExtra(extra)
|
||||
if !ok {
|
||||
return
|
||||
@@ -193,30 +368,33 @@ func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, us
|
||||
buildTimelinePayloadFromStreamExtra(extra),
|
||||
0,
|
||||
); err != nil {
|
||||
log.Printf("写入 newAgent 卡片时间线失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err)
|
||||
log.Printf("写入 newAgent 时间线事件失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) {
|
||||
if s.cacheDAO != nil {
|
||||
seq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, chatID)
|
||||
if err == nil {
|
||||
return seq, nil
|
||||
}
|
||||
log.Printf("会话时间线 seq Redis 分配失败,回退 DB user=%d chat=%s err=%v", userID, chatID, err)
|
||||
func buildConversationTimelineCacheItem(
|
||||
eventID int64,
|
||||
seq int64,
|
||||
kind string,
|
||||
role string,
|
||||
content string,
|
||||
payload map[string]any,
|
||||
tokensConsumed int,
|
||||
createdAt *time.Time,
|
||||
) model.GetConversationTimelineItem {
|
||||
item := model.GetConversationTimelineItem{
|
||||
ID: eventID,
|
||||
Seq: seq,
|
||||
Kind: kind,
|
||||
Role: role,
|
||||
Content: content,
|
||||
Payload: cloneTimelinePayload(payload),
|
||||
TokensConsumed: tokensConsumed,
|
||||
}
|
||||
|
||||
maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
if createdAt != nil {
|
||||
item.CreatedAt = createdAt
|
||||
}
|
||||
seq := maxSeq + 1
|
||||
if s.cacheDAO != nil {
|
||||
if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, seq); err != nil {
|
||||
log.Printf("会话时间线 seq 回填 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, seq, err)
|
||||
}
|
||||
}
|
||||
return seq, nil
|
||||
return item
|
||||
}
|
||||
|
||||
func buildConversationTimelineItemsFromDB(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem {
|
||||
@@ -296,7 +474,8 @@ func canonicalizeTimelineKind(kind string, role string) string {
|
||||
model.AgentTimelineKindToolResult,
|
||||
model.AgentTimelineKindConfirmRequest,
|
||||
model.AgentTimelineKindBusinessCard,
|
||||
model.AgentTimelineKindScheduleCompleted:
|
||||
model.AgentTimelineKindScheduleCompleted,
|
||||
model.AgentTimelineKindThinkingSummary:
|
||||
return normalizedKind
|
||||
case "text", "message", "query":
|
||||
if normalizedRole == "user" {
|
||||
@@ -337,6 +516,9 @@ func mapTimelineKindFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) (str
|
||||
if extra == nil {
|
||||
return "", false
|
||||
}
|
||||
if isThinkingSummaryStreamExtra(extra) {
|
||||
return model.AgentTimelineKindThinkingSummary, true
|
||||
}
|
||||
switch extra.Kind {
|
||||
case newagentstream.StreamExtraKindToolCall:
|
||||
return model.AgentTimelineKindToolCall, true
|
||||
@@ -357,6 +539,9 @@ func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra)
|
||||
if extra == nil {
|
||||
return nil
|
||||
}
|
||||
if isThinkingSummaryStreamExtra(extra) {
|
||||
return buildThinkingSummaryTimelinePayload(extra)
|
||||
}
|
||||
payload := map[string]any{
|
||||
"stage": strings.TrimSpace(extra.Stage),
|
||||
"block_id": strings.TrimSpace(extra.BlockID),
|
||||
@@ -400,6 +585,67 @@ func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra)
|
||||
return payload
|
||||
}
|
||||
|
||||
func isThinkingSummaryStreamExtra(extra *newagentstream.OpenAIChunkExtra) bool {
|
||||
if extra == nil {
|
||||
return false
|
||||
}
|
||||
return strings.EqualFold(strings.TrimSpace(string(extra.Kind)), model.AgentTimelineKindThinkingSummary)
|
||||
}
|
||||
|
||||
func buildThinkingSummaryTimelinePayload(extra *newagentstream.OpenAIChunkExtra) map[string]any {
|
||||
payload := map[string]any{
|
||||
"stage": strings.TrimSpace(extra.Stage),
|
||||
"block_id": strings.TrimSpace(extra.BlockID),
|
||||
"display_mode": string(extra.DisplayMode),
|
||||
}
|
||||
|
||||
if extra.ThinkingSummary != nil {
|
||||
summary := extra.ThinkingSummary
|
||||
payload["summary_seq"] = summary.SummarySeq
|
||||
payload["final"] = summary.Final
|
||||
payload["duration_seconds"] = summary.DurationSeconds
|
||||
if detailSummary := strings.TrimSpace(summary.DetailSummary); detailSummary != "" {
|
||||
payload["detail_summary"] = detailSummary
|
||||
}
|
||||
return payload
|
||||
}
|
||||
|
||||
if detailSummary := readTimelineExtraMetaString(extra.Meta, "detail_summary"); detailSummary != "" {
|
||||
payload["detail_summary"] = detailSummary
|
||||
}
|
||||
return payload
|
||||
}
|
||||
|
||||
func readTimelineExtraMetaString(meta map[string]any, key string) string {
|
||||
if len(meta) == 0 {
|
||||
return ""
|
||||
}
|
||||
raw, ok := meta[key]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
text, ok := raw.(string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(text)
|
||||
}
|
||||
|
||||
func readTimelinePayloadString(payload map[string]any, key string) string {
|
||||
if len(payload) == 0 {
|
||||
return ""
|
||||
}
|
||||
raw, ok := payload[key]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
text, ok := raw.(string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(text)
|
||||
}
|
||||
|
||||
func cloneStreamBusinessCard(card *newagentstream.StreamBusinessCardExtra) map[string]any {
|
||||
if card == nil {
|
||||
return nil
|
||||
|
||||
112
backend/service/agentsvc/reasoning_summary.go
Normal file
112
backend/service/agentsvc/reasoning_summary.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package agentsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
|
||||
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
|
||||
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
|
||||
)
|
||||
|
||||
const reasoningSummaryMaxTokens = 700
|
||||
|
||||
type reasoningSummaryLLMResponse struct {
|
||||
ShortSummary string `json:"short_summary"`
|
||||
DetailSummary string `json:"detail_summary"`
|
||||
}
|
||||
|
||||
// makeReasoningSummaryFunc 把便宜模型封装成 stream 层可注入的摘要函数。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. service 层负责选择模型与 prompt,stream 层只负责调度和闸门;
|
||||
// 2. 这里不持久化摘要,持久化统一走 ChunkEmitter 的 extra hook;
|
||||
// 3. 摘要失败时返回 error,由 ReasoningDigestor 吞掉并等待下一次水位线/Flush 兜底。
|
||||
func (s *AgentService) makeReasoningSummaryFunc(client *infrallm.Client) newagentstream.ReasoningSummaryFunc {
|
||||
if client == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return func(ctx context.Context, input newagentstream.ReasoningSummaryInput) (newagentstream.StreamThinkingSummaryExtra, error) {
|
||||
previousSummary := ""
|
||||
if input.PreviousSummary != nil {
|
||||
previousSummary = input.PreviousSummary.DetailSummary
|
||||
if strings.TrimSpace(previousSummary) == "" {
|
||||
previousSummary = input.PreviousSummary.ShortSummary
|
||||
}
|
||||
}
|
||||
|
||||
messages := newagentprompt.BuildReasoningSummaryMessages(newagentprompt.ReasoningSummaryPromptInput{
|
||||
FullReasoning: input.FullReasoning,
|
||||
DeltaReasoning: input.DeltaReasoning,
|
||||
PreviousSummary: previousSummary,
|
||||
CandidateSeq: input.CandidateSeq,
|
||||
Final: input.Final,
|
||||
DurationSeconds: input.DurationSeconds,
|
||||
})
|
||||
|
||||
resp, rawResult, err := infrallm.GenerateJSON[reasoningSummaryLLMResponse](
|
||||
ctx,
|
||||
client,
|
||||
messages,
|
||||
infrallm.GenerateOptions{
|
||||
Temperature: 0.1,
|
||||
MaxTokens: reasoningSummaryMaxTokens,
|
||||
Thinking: infrallm.ThinkingModeDisabled,
|
||||
Metadata: map[string]any{
|
||||
"stage": "reasoning_summary",
|
||||
"candidate_seq": input.CandidateSeq,
|
||||
"final": input.Final,
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("[WARN] reasoning 摘要模型调用失败 seq=%d final=%v err=%v raw=%s",
|
||||
input.CandidateSeq,
|
||||
input.Final,
|
||||
err,
|
||||
truncateReasoningSummaryRaw(rawResult),
|
||||
)
|
||||
return newagentstream.StreamThinkingSummaryExtra{}, err
|
||||
}
|
||||
|
||||
summary := newagentstream.StreamThinkingSummaryExtra{
|
||||
ShortSummary: strings.TrimSpace(resp.ShortSummary),
|
||||
DetailSummary: limitReasoningDetailSummary(
|
||||
resp.DetailSummary,
|
||||
newagentprompt.ReasoningSummaryDetailRuneLimit(input.FullReasoning, input.DeltaReasoning),
|
||||
),
|
||||
}
|
||||
if summary.ShortSummary == "" && summary.DetailSummary == "" {
|
||||
return newagentstream.StreamThinkingSummaryExtra{}, errors.New("reasoning 摘要模型返回空摘要")
|
||||
}
|
||||
return summary, nil
|
||||
}
|
||||
}
|
||||
|
||||
func limitReasoningDetailSummary(text string, maxRunes int) string {
|
||||
text = strings.TrimSpace(text)
|
||||
if text == "" || maxRunes <= 0 {
|
||||
return text
|
||||
}
|
||||
|
||||
runes := []rune(text)
|
||||
if len(runes) <= maxRunes {
|
||||
return text
|
||||
}
|
||||
return string(runes[:maxRunes])
|
||||
}
|
||||
|
||||
func truncateReasoningSummaryRaw(raw *infrallm.TextResult) string {
|
||||
if raw == nil {
|
||||
return ""
|
||||
}
|
||||
text := strings.TrimSpace(raw.Text)
|
||||
runes := []rune(text)
|
||||
if len(runes) <= 200 {
|
||||
return text
|
||||
}
|
||||
return string(runes[:200]) + "..."
|
||||
}
|
||||
326
backend/service/events/agent_timeline_persist.go
Normal file
326
backend/service/events/agent_timeline_persist.go
Normal file
@@ -0,0 +1,326 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/LoveLosita/smartflow/backend/dao"
|
||||
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
|
||||
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
|
||||
"github.com/LoveLosita/smartflow/backend/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// EventTypeAgentTimelinePersistRequested is the outbox event type for
// "conversation timeline persistence requested" messages; it is the key
// under which the consumer handler is registered and events are published.
const EventTypeAgentTimelinePersistRequested = "agent.timeline.persist.requested"
|
||||
|
||||
// RegisterAgentTimelinePersistHandler registers the consumer handler for
// "conversation timeline persistence" events.
//
// Responsibility boundaries:
//  1. Handles timeline events only; other business messages such as
//     chat_history are out of scope.
//  2. Only registers the handler; starting and stopping the bus is not its job.
//  3. Uses the generic outbox consume transaction so "timeline DB write +
//     consumed advance" happen atomically in one transaction.
//  4. On a seq unique-key conflict it first checks whether this is an
//     idempotent replay before allocating a new seq and backfilling Redis.
func RegisterAgentTimelinePersistHandler(
	bus *outboxinfra.EventBus,
	outboxRepo *outboxinfra.Repository,
	agentRepo *dao.AgentDAO,
	cacheDAO *dao.CacheDAO,
) error {
	// 1. Dependency checks: missing any critical dependency makes message
	//    consumption unsafe. cacheDAO is deliberately allowed to be nil; the
	//    cache rebuild helper treats nil as "no Redis wired, skip".
	if bus == nil {
		return errors.New("event bus is nil")
	}
	if outboxRepo == nil {
		return errors.New("outbox repository is nil")
	}
	if agentRepo == nil {
		return errors.New("agent repo is nil")
	}

	handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
		var payload model.ChatTimelinePersistPayload
		if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
			// 1. An undecodable payload is unrecoverable: mark dead immediately
			//    instead of retrying pointlessly. MarkDead error is ignored on
			//    purpose — there is nothing more we can do for this message.
			_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error())
			return nil
		}

		payload = payload.Normalize()
		if !payload.HasValidIdentity() {
			// 2. Only the minimal field set needed to uniquely locate one
			//    timeline record is validated here.
			// 3. Whether content / payload_json may be empty is decided per
			//    event kind, not enforced globally here.
			_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法")
			return nil
		}

		refreshCache := false
		finalSeq := payload.Seq

		// 4. Go through the shared outbox consume entry point so that
		//    "business write succeeds -> consumed" stays atomically consistent.
		err := outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
			finalPayload, repaired, persistErr := persistConversationTimelineEventInTx(ctx, tx, agentRepo.WithTx(tx), payload)
			if persistErr != nil {
				return persistErr
			}
			refreshCache = repaired
			finalSeq = finalPayload.Seq
			return nil
		})
		if err != nil {
			return err
		}

		// 5. The Redis timeline is rebuilt only when a seq conflict forced a
		//    new seq to be assigned.
		// 5.1 Rationale: the main flow already wrote Redis first, so a normal
		//     success needs no re-write.
		// 5.2 If a seq was repaired, skipping the rebuild would leave a stale
		//     cache entry under the old seq and break ordering after refresh.
		// 5.3 A failed cache rebuild is only logged — the outbox record is
		//     already consumed and must not be rolled back because of it.
		if refreshCache {
			if refreshErr := rebuildConversationTimelineCache(ctx, agentRepo, cacheDAO, payload.UserID, payload.ConversationID, finalSeq); refreshErr != nil {
				log.Printf("重建时间线缓存失败 user=%d chat=%s seq=%d err=%v", payload.UserID, payload.ConversationID, finalSeq, refreshErr)
			}
		}
		return nil
	}

	return bus.RegisterEventHandler(EventTypeAgentTimelinePersistRequested, handler)
}
|
||||
|
||||
// PublishAgentTimelinePersistRequested 发布“会话时间线持久化请求”事件。
|
||||
//
|
||||
// 设计目的:
|
||||
// 1. 让业务层只传 DTO,不重复拼事件元数据;
|
||||
// 2. 统一以 conversation_id 作为 MessageKey / AggregateID,尽量降低同会话乱序概率;
|
||||
// 3. 发布失败显式返回 error,由调用方决定是否中断主链路。
|
||||
func PublishAgentTimelinePersistRequested(
|
||||
ctx context.Context,
|
||||
publisher outboxinfra.EventPublisher,
|
||||
payload model.ChatTimelinePersistPayload,
|
||||
) error {
|
||||
if publisher == nil {
|
||||
return errors.New("event publisher is nil")
|
||||
}
|
||||
|
||||
payload = payload.Normalize()
|
||||
if !payload.HasValidIdentity() {
|
||||
return errors.New("invalid timeline persist payload")
|
||||
}
|
||||
|
||||
return publisher.Publish(ctx, outboxinfra.PublishRequest{
|
||||
EventType: EventTypeAgentTimelinePersistRequested,
|
||||
EventVersion: outboxinfra.DefaultEventVersion,
|
||||
MessageKey: payload.ConversationID,
|
||||
AggregateID: payload.ConversationID,
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
// persistConversationTimelineEventInTx writes one timeline event to the DB
// inside a single transaction, repairing seq collisions when needed.
//
// Step-by-step:
//  1. First try to insert using the payload's original seq.
//  2. On a seq unique-key conflict, look up the stored record with the same
//     seq and decide whether this is "the same event replayed".
//  3. If it is not a replay but a collision caused by Redis seq drift, assign
//     max(seq)+1 and retry.
//  4. At most 3 repair attempts, so pathological data cannot trap the
//     consumer in an endless loop.
//
// Returns the (possibly seq-repaired) payload, whether a repair happened, and
// any persistence error.
func persistConversationTimelineEventInTx(
	ctx context.Context,
	tx *gorm.DB,
	agentRepo *dao.AgentDAO,
	payload model.ChatTimelinePersistPayload,
) (model.ChatTimelinePersistPayload, bool, error) {
	if tx == nil {
		return payload, false, errors.New("transaction is nil")
	}
	if agentRepo == nil {
		return payload, false, errors.New("agent repo is nil")
	}

	working := payload.Normalize()
	repaired := false

	for attempt := 0; attempt < 3; attempt++ {
		if _, _, err := agentRepo.SaveConversationTimelineEvent(ctx, working); err == nil {
			return working, repaired, nil
		} else if !model.IsTimelineSeqConflictError(err) {
			// Non-conflict errors are not repairable here; bubble them up.
			return working, repaired, err
		}

		// 1. First determine whether this is "the same event consumed twice".
		// 2. If the stored row matches field-for-field, the previous attempt
		//    actually succeeded, so this run counts as idempotent success.
		// 3. If the fields differ, fall through to the "assign new seq"
		//    branch so a genuinely new event is not swallowed.
		existing, findErr := findConversationTimelineEventBySeq(ctx, tx, working.UserID, working.ConversationID, working.Seq)
		if findErr == nil && working.MatchesStoredEvent(existing) {
			return working, repaired, nil
		}
		if findErr != nil && !errors.Is(findErr, gorm.ErrRecordNotFound) {
			// ErrRecordNotFound is expected when the conflicting row vanished
			// (e.g. concurrent cleanup); any other lookup error is fatal.
			return working, repaired, findErr
		}

		maxSeq, maxErr := loadConversationTimelineMaxSeq(ctx, tx, working.UserID, working.ConversationID)
		if maxErr != nil {
			return working, repaired, maxErr
		}
		working.Seq = maxSeq + 1
		repaired = true
	}

	return working, repaired, fmt.Errorf("timeline seq repair exceeded limit user=%d chat=%s", working.UserID, working.ConversationID)
}
|
||||
|
||||
func findConversationTimelineEventBySeq(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
userID int,
|
||||
conversationID string,
|
||||
seq int64,
|
||||
) (model.AgentTimelineEvent, error) {
|
||||
var event model.AgentTimelineEvent
|
||||
err := tx.WithContext(ctx).
|
||||
Where("user_id = ? AND chat_id = ? AND seq = ?", userID, strings.TrimSpace(conversationID), seq).
|
||||
Take(&event).Error
|
||||
return event, err
|
||||
}
|
||||
|
||||
func loadConversationTimelineMaxSeq(
|
||||
ctx context.Context,
|
||||
tx *gorm.DB,
|
||||
userID int,
|
||||
conversationID string,
|
||||
) (int64, error) {
|
||||
var maxSeq int64
|
||||
err := tx.WithContext(ctx).
|
||||
Model(&model.AgentTimelineEvent{}).
|
||||
Where("user_id = ? AND chat_id = ?", userID, strings.TrimSpace(conversationID)).
|
||||
Select("COALESCE(MAX(seq), 0)").
|
||||
Scan(&maxSeq).Error
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return maxSeq, nil
|
||||
}
|
||||
|
||||
// rebuildConversationTimelineCache 在“补新 seq”后重建 Redis timeline 缓存。
|
||||
//
|
||||
// 说明:
|
||||
// 1. 这里只在缓存存在时执行;未接 Redis 的环境直接跳过即可;
|
||||
// 2. 需要整表重建而不是只 append 一条,因为旧缓存里已经存在错误 seq 的事件;
|
||||
// 3. 这里不抽到 agentsvc 复用,是因为 events 不能反向依赖 service,否则会形成循环依赖。
|
||||
func rebuildConversationTimelineCache(
|
||||
ctx context.Context,
|
||||
agentRepo *dao.AgentDAO,
|
||||
cacheDAO *dao.CacheDAO,
|
||||
userID int,
|
||||
conversationID string,
|
||||
finalSeq int64,
|
||||
) error {
|
||||
if cacheDAO == nil || agentRepo == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
events, err := agentRepo.ListConversationTimelineEvents(ctx, userID, conversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
items := buildConversationTimelineCacheItems(events)
|
||||
if err = cacheDAO.SetConversationTimelineToCache(ctx, userID, conversationID, items); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(items) > 0 {
|
||||
finalSeq = items[len(items)-1].Seq
|
||||
}
|
||||
return cacheDAO.SetConversationTimelineSeq(ctx, userID, conversationID, finalSeq)
|
||||
}
|
||||
|
||||
func buildConversationTimelineCacheItems(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem {
|
||||
if len(events) == 0 {
|
||||
return make([]model.GetConversationTimelineItem, 0)
|
||||
}
|
||||
|
||||
items := make([]model.GetConversationTimelineItem, 0, len(events))
|
||||
for _, event := range events {
|
||||
item := model.GetConversationTimelineItem{
|
||||
ID: event.ID,
|
||||
Seq: event.Seq,
|
||||
Kind: strings.TrimSpace(event.Kind),
|
||||
TokensConsumed: event.TokensConsumed,
|
||||
CreatedAt: event.CreatedAt,
|
||||
}
|
||||
if event.Role != nil {
|
||||
item.Role = strings.TrimSpace(*event.Role)
|
||||
}
|
||||
if event.Content != nil {
|
||||
item.Content = strings.TrimSpace(*event.Content)
|
||||
}
|
||||
if event.Payload != nil {
|
||||
var payload map[string]any
|
||||
if err := json.Unmarshal([]byte(strings.TrimSpace(*event.Payload)), &payload); err == nil && len(payload) > 0 {
|
||||
item.Payload = payload
|
||||
}
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return normalizeConversationTimelineCacheItems(items)
|
||||
}
|
||||
|
||||
func normalizeConversationTimelineCacheItems(items []model.GetConversationTimelineItem) []model.GetConversationTimelineItem {
|
||||
if len(items) == 0 {
|
||||
return make([]model.GetConversationTimelineItem, 0)
|
||||
}
|
||||
|
||||
normalized := make([]model.GetConversationTimelineItem, 0, len(items))
|
||||
for _, item := range items {
|
||||
role := strings.ToLower(strings.TrimSpace(item.Role))
|
||||
kind := canonicalizeConversationTimelineKind(item.Kind, role)
|
||||
|
||||
if kind == "" {
|
||||
switch role {
|
||||
case "user":
|
||||
kind = model.AgentTimelineKindUserText
|
||||
case "assistant":
|
||||
kind = model.AgentTimelineKindAssistantText
|
||||
}
|
||||
}
|
||||
if role == "" {
|
||||
switch kind {
|
||||
case model.AgentTimelineKindUserText:
|
||||
role = "user"
|
||||
case model.AgentTimelineKindAssistantText:
|
||||
role = "assistant"
|
||||
}
|
||||
}
|
||||
|
||||
item.Kind = kind
|
||||
item.Role = role
|
||||
normalized = append(normalized, item)
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
func canonicalizeConversationTimelineKind(kind string, role string) string {
|
||||
normalizedKind := strings.ToLower(strings.TrimSpace(kind))
|
||||
normalizedRole := strings.ToLower(strings.TrimSpace(role))
|
||||
|
||||
switch normalizedKind {
|
||||
case model.AgentTimelineKindUserText,
|
||||
model.AgentTimelineKindAssistantText,
|
||||
model.AgentTimelineKindToolCall,
|
||||
model.AgentTimelineKindToolResult,
|
||||
model.AgentTimelineKindConfirmRequest,
|
||||
model.AgentTimelineKindBusinessCard,
|
||||
model.AgentTimelineKindScheduleCompleted,
|
||||
model.AgentTimelineKindThinkingSummary:
|
||||
return normalizedKind
|
||||
case "text", "message", "query":
|
||||
if normalizedRole == "user" {
|
||||
return model.AgentTimelineKindUserText
|
||||
}
|
||||
if normalizedRole == "assistant" {
|
||||
return model.AgentTimelineKindAssistantText
|
||||
}
|
||||
}
|
||||
return normalizedKind
|
||||
}
|
||||
Reference in New Issue
Block a user