Version: 0.8.2.dev.260327

后端:
1.修复了消息重试链路的相关问题
2.新增redis乐观写消息机制,即使前端在重试完消息后立刻刷新,也能在redis里面读到数据
前端:
1.修了一些bug
This commit is contained in:
LoveLosita
2026-03-27 20:39:05 +08:00
parent ddb0d9cc17
commit 5fc9548420
13 changed files with 1011 additions and 219 deletions

View File

@@ -223,7 +223,10 @@ func (a *AgentDAO) EnsureRetryGroupSeed(ctx context.Context, userID int, chatID,
}
return a.db.WithContext(ctx).
Model(&model.ChatHistory{}).
Model(&model.ChatHistory{
UserID: userID,
ChatID: chatID,
}).
Where("user_id = ? AND chat_id = ? AND id IN ?", userID, chatID, ids).
Where("(retry_group_id IS NULL OR retry_group_id = '')").
Updates(map[string]any{

View File

@@ -35,44 +35,48 @@ func (d *CacheDAO) schedulePreviewKey(userID int, conversationID string) string
return fmt.Sprintf("smartflow:schedule_preview:u:%d:c:%s", userID, conversationID)
}
// SetBlacklist 鎶?Token 鎵旇繘榛戝悕鍗?
func (d *CacheDAO) conversationHistoryKey(userID int, conversationID string) string {
return fmt.Sprintf("smartflow:conversation_history:u:%d:c:%s", userID, conversationID)
}
// SetBlacklist 把 Token 写入黑名单。
func (d *CacheDAO) SetBlacklist(jti string, expiration time.Duration) error {
return d.client.Set(context.Background(), "blacklist:"+jti, "1", expiration).Err()
}
// IsBlacklisted 妫€鏌?Token 鏄惁鍦ㄩ粦鍚嶅崟涓?
// IsBlacklisted 检查 Token 是否在黑名单中。
func (d *CacheDAO) IsBlacklisted(jti string) (bool, error) {
result, err := d.client.Get(context.Background(), "blacklist:"+jti).Result()
if errors.Is(err, redis.Nil) {
return false, nil // 涓嶅湪榛戝悕鍗?
return false, nil // 不在黑名单中
} else if err != nil {
return false, err // 鍏朵粬閿欒
return false, err // 其他错误
}
return result == "1", nil // 鍦ㄩ粦鍚嶅崟
return result == "1", nil // 在黑名单中
}
func (d *CacheDAO) AddTaskClassList(ctx context.Context, userID int, list *model.UserGetTaskClassesResponse) error {
// 1. 瀹氫箟 Key锛屼娇鐢?userID 闅旂涓嶅悓鐢ㄦ埛鐨勬暟鎹?
// 1. 定义 Key,使用 userID 隔离不同用户的数据。
key := fmt.Sprintf("smartflow:task_classes:%d", userID)
// 2. 搴忓垪鍖栵細灏嗙粨鏋勪綋杞负 []byte
// 2. 序列化:将结构体转为 []byte
data, err := json.Marshal(list)
if err != nil {
return err
}
// 3. 瀛樺偍锛氳缃?30 鍒嗛挓杩囨湡锛堟牴鎹笟鍔$伒娲昏皟鏁达級
// 3. 存储:设置 30 分钟过期,可按业务需要调整。
return d.client.Set(ctx, key, data, 30*time.Minute).Err()
}
func (d *CacheDAO) GetTaskClassList(ctx context.Context, userID int) (*model.UserGetTaskClassesResponse, error) {
key := fmt.Sprintf("smartflow:task_classes:%d", userID)
var resp model.UserGetTaskClassesResponse
// 1. 浠?Redis 鑾峰彇瀛楃涓?
// 1. Redis 获取字符串。
val, err := d.client.Get(ctx, key).Result()
if err != nil {
// 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
// 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑。
return &resp, err
}
// 2. 鍙嶅簭鍒楀寲锛氬皢 JSON 杩樺師鍥炵粨鏋勪綋
// 2. 反序列化:将 JSON 还原回结构体。
err = json.Unmarshal([]byte(val), &resp)
return &resp, err
}
@@ -85,9 +89,9 @@ func (d *CacheDAO) DeleteTaskClassList(ctx context.Context, userID int) error {
func (d *CacheDAO) GetRecord(ctx context.Context, key string) (string, error) {
val, err := d.client.Get(ctx, key).Result()
if errors.Is(err, redis.Nil) {
return "", nil // 姝e父娌″懡涓殑鎯呭喌
return "", nil // 正常未命中
}
return val, err // 鐪熸鐨?Redis 鎶ラ敊
return val, err // 真正的 Redis 错误
}
func (d *CacheDAO) SaveRecord(ctx context.Context, key string, val string, ttl time.Duration) error {
@@ -118,7 +122,7 @@ func (d *CacheDAO) GetUserTasksFromCache(ctx context.Context, userID int) ([]mod
var tasks []model.Task
val, err := d.client.Get(ctx, key).Result()
if err != nil {
return nil, err // 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
return nil, err // 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑
}
err = json.Unmarshal([]byte(val), &tasks)
return tasks, err
@@ -154,7 +158,7 @@ func (d *CacheDAO) GetUserTodayScheduleFromCache(ctx context.Context, userID int
var schedules []model.UserTodaySchedule
val, err := d.client.Get(ctx, key).Result()
if err != nil {
return nil, err // 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
return nil, err // 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑
}
err = json.Unmarshal([]byte(val), &schedules)
return schedules, err
@@ -166,7 +170,7 @@ func (d *CacheDAO) SetUserTodayScheduleToCache(ctx context.Context, userID int,
if err != nil {
return err
}
// 璁剧疆杩囨湡鏃堕棿涓哄綋澶╁墿浣欑殑鏃堕棿锛岀‘淇濇瘡澶╂洿鏂颁竴娆$紦瀛?
// 设置过期时间为“当天剩余时间”,保证每天自然刷新一次缓存。
return d.client.Set(ctx, key, data, time.Until(time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()+1, 0, 0, 0, 0, time.Now().Location()))).Err()
}
@@ -180,7 +184,7 @@ func (d *CacheDAO) GetUserWeeklyScheduleFromCache(ctx context.Context, userID in
var schedules model.UserWeekSchedule
val, err := d.client.Get(ctx, key).Result()
if err != nil {
return nil, err // 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
return nil, err // 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑
}
err = json.Unmarshal([]byte(val), &schedules)
return &schedules, err
@@ -192,7 +196,7 @@ func (d *CacheDAO) SetUserWeeklyScheduleToCache(ctx context.Context, userID int,
if err != nil {
return err
}
// 璁剧疆杩囨湡鏃堕棿涓轰竴澶?
// 设置过期时间为一天。
return d.client.Set(ctx, key, data, 24*time.Hour).Err()
}
@@ -206,7 +210,7 @@ func (d *CacheDAO) GetUserRecentCompletedSchedulesFromCache(ctx context.Context,
var resp model.UserRecentCompletedScheduleResponse
val, err := d.client.Get(ctx, key).Result()
if err != nil {
return &resp, err // 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
return &resp, err // 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑
}
err = json.Unmarshal([]byte(val), &resp)
return &resp, err
@@ -218,7 +222,7 @@ func (d *CacheDAO) SetUserRecentCompletedSchedulesToCache(ctx context.Context, u
if err != nil {
return err
}
// 璁剧疆杩囨湡鏃堕棿涓?0鍒嗛挓
// 设置过期时间为 30 分钟。
return d.client.Set(ctx, key, data, 30*time.Minute).Err()
}
@@ -232,7 +236,7 @@ func (d *CacheDAO) DeleteUserRecentCompletedSchedulesFromCache(ctx context.Conte
return err
}
if len(keys) > 0 {
// 鐢?UNLINK\(\) 寮傛鍒犻櫎锛岄檷浣庨樆濉為闄╋紱濡傞渶寮轰竴鑷村垹闄ゅ彲鏀圭敤 Del\(\)
// 使用 UNLINK() 异步删除,降低阻塞风险;若需要强一致删除可改用 Del()。
if err := d.client.Unlink(ctx, keys...).Err(); err != nil {
return err
}
@@ -250,10 +254,10 @@ func (d *CacheDAO) GetUserOngoingScheduleFromCache(ctx context.Context, userID i
var schedule model.OngoingSchedule
val, err := d.client.Get(ctx, key).Result()
if err != nil {
return &schedule, err // 娉ㄦ剰锛氬鏋滄槸 redis.Nil锛屼氦缁?Service 灞傚鐞嗘煡搴撻€昏緫
return &schedule, err // 注意:若是 redis.Nil,则交给 Service 层处理回源查询逻辑
}
if val == "null" {
return nil, nil // 涔嬪墠缂撳瓨杩囨病鏈夋鍦ㄨ繘琛岀殑鏃ョ▼锛岀洿鎺ヨ繑鍥?nil
return nil, nil // 之前缓存过“当前没有正在进行的日程”,这里直接返回 nil
}
err = json.Unmarshal([]byte(val), &schedule)
return &schedule, err
@@ -261,7 +265,7 @@ func (d *CacheDAO) GetUserOngoingScheduleFromCache(ctx context.Context, userID i
func (d *CacheDAO) SetUserOngoingScheduleToCache(ctx context.Context, userID int, schedule *model.OngoingSchedule) error {
if schedule == nil {
// 濡傛灉娌℃湁姝e湪杩涜鐨勬棩绋嬶紝璁剧疆绌哄€煎苟鐭殏杩囨湡锛岄伩鍏嶉绻佹煡搴?
// 如果当前没有正在进行的日程,则缓存空值并短暂过期,避免频繁回源查询。
key := fmt.Sprintf("smartflow:ongoing_schedule:%d", userID)
return d.client.Set(ctx, key, "null", 5*time.Minute).Err()
}
@@ -270,7 +274,7 @@ func (d *CacheDAO) SetUserOngoingScheduleToCache(ctx context.Context, userID int
if err != nil {
return err
}
// 璁剧疆杩囨湡鏃堕棿涓哄埌 endTime 鐨勫墿浣欐椂闂达紙鑻ュ凡杩囨湡鍒欎笉鍐欏叆缂撳瓨锛?
// 设置过期时间为距离 endTime 的剩余时长;若已过期,则不再写入缓存。
ttl := time.Until(schedule.EndTime)
if ttl <= 0 {
return nil
@@ -443,3 +447,81 @@ func (d *CacheDAO) DeleteSchedulePlanPreviewFromCache(ctx context.Context, userI
}
return d.client.Del(ctx, d.schedulePreviewKey(userID, normalizedConversationID)).Err()
}
// SetConversationHistoryToCache 写入“会话历史视图”缓存。
//
// 职责边界:
// 1. 负责按 user_id + conversation_id 写入前端历史查询所需的稳定 DTO
// 2. 只负责缓存当前可展示历史,不负责上下文窗口缓存;
// 3. 不负责 DB 回源,也不负责重试分组补算。
func (d *CacheDAO) SetConversationHistoryToCache(ctx context.Context, userID int, conversationID string, items []model.GetConversationHistoryItem) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
if userID <= 0 {
return fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return errors.New("conversation_id is empty")
}
data, err := json.Marshal(items)
if err != nil {
return fmt.Errorf("marshal conversation history failed: %w", err)
}
return d.client.Set(ctx, d.conversationHistoryKey(userID, normalizedConversationID), data, 1*time.Hour).Err()
}
// GetConversationHistoryFromCache 读取“会话历史视图”缓存。
//
// 输入输出语义:
// 1. 命中时返回历史 DTO 切片与 nil error
// 2. 未命中时返回 (nil, nil)
// 3. Redis 异常或反序列化失败时返回 error。
func (d *CacheDAO) GetConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) ([]model.GetConversationHistoryItem, error) {
if d == nil || d.client == nil {
return nil, errors.New("cache dao is not initialized")
}
if userID <= 0 {
return nil, fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return nil, errors.New("conversation_id is empty")
}
raw, err := d.client.Get(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Result()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}
var items []model.GetConversationHistoryItem
if err = json.Unmarshal([]byte(raw), &items); err != nil {
return nil, fmt.Errorf("unmarshal conversation history failed: %w", err)
}
return items, nil
}
// DeleteConversationHistoryFromCache 删除“会话历史视图”缓存。
//
// 说明:
// 1. 删除操作是幂等的key 不存在也视为成功;
// 2. 该方法用于 chat_histories 写入/补种 retry 分组后触发失效;
// 3. 这里只处理前端历史视图缓存,不影响 Agent 上下文热缓存。
func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
if userID <= 0 {
return fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return errors.New("conversation_id is empty")
}
return d.client.Del(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Err()
}

View File

@@ -65,7 +65,11 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}, db *gorm.DB)
p.invalidTaskCache(m.UserID)
case model.AgentScheduleState:
p.invalidSchedulePlanPreviewCache(m.UserID, m.ConversationID)
case model.AgentOutboxMessage, model.ChatHistory, model.AgentChat, model.User:
case model.ChatHistory:
p.invalidConversationHistoryCache(m.UserID, m.ChatID)
case model.AgentChat:
p.invalidConversationHistoryCache(m.UserID, m.ChatID)
case model.AgentOutboxMessage, model.User:
// 这些模型目前没有定义缓存逻辑,先不处理
default:
// 只有真正没定义的模型才会到这里
@@ -124,3 +128,20 @@ func (p *GormCachePlugin) invalidSchedulePlanPreviewCache(userID int, conversati
log.Printf("[GORM-Cache] Invalidated schedule preview cache for user %d conversation %s", userID, normalizedConversationID)
}()
}
func (p *GormCachePlugin) invalidConversationHistoryCache(userID int, conversationID string) {
normalizedConversationID := strings.TrimSpace(conversationID)
if userID == 0 || normalizedConversationID == "" {
return
}
go func() {
// 1. 这里的调用目的:当聊天历史写入或重试补种更新后,删除“前端历史视图缓存”。
// 2. 这样下次访问 conversation-history 时会回源 DB并把最新 retry 版本完整回填缓存。
// 3. 注意:这里只删历史视图缓存,不删 Agent 上下文热缓存,避免影响聊天首 token。
if err := p.cacheDAO.DeleteConversationHistoryFromCache(context.Background(), userID, normalizedConversationID); err != nil {
log.Printf("[GORM-Cache] Failed to invalidate conversation history cache for user %d conversation %s: %v", userID, normalizedConversationID, err)
return
}
log.Printf("[GORM-Cache] Invalidated conversation history cache for user %d conversation %s", userID, normalizedConversationID)
}()
}

View File

@@ -435,6 +435,19 @@ func (s *AgentService) runNormalChatFlow(
pushErrNonBlocking(errChan, err)
return
}
s.appendConversationHistoryCacheOptimistically(
context.Background(),
userID,
chatID,
buildOptimisticConversationHistoryItem(
"user",
userMessage,
"",
0,
retryMeta,
requestStart,
),
)
// 普通聊天链路也需要把助手回复写入 Redis
// 否则会出现“数据库有助手消息,但 Redis 最新会话只有用户消息”的口径不一致。
@@ -472,6 +485,20 @@ func (s *AgentService) runNormalChatFlow(
TokensConsumed: requestTotalTokens,
}); saveErr != nil {
pushErrNonBlocking(errChan, saveErr)
} else {
s.appendConversationHistoryCacheOptimistically(
context.Background(),
userID,
chatID,
buildOptimisticConversationHistoryItem(
"assistant",
fullText,
assistantReasoning,
reasoningDurationSeconds,
retryMeta,
time.Now(),
),
)
}
// 9. 在主回复完成后异步尝试生成会话标题(仅首次、仅标题为空时生效)。

View File

@@ -2,14 +2,15 @@ package agentsvc
import (
"context"
"fmt"
"log"
"sort"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/LoveLosita/smartflow/backend/respond"
"github.com/cloudwego/eino/schema"
"gorm.io/gorm"
)
@@ -37,59 +38,91 @@ func (s *AgentService) GetConversationHistory(ctx context.Context, userID int, c
return nil, gorm.ErrRecordNotFound
}
// 2. 优先读 Redis
// 2.1 命中时直接返回,复用当前聊天主链路维护的最近消息窗口
// 2.2 失败策略:缓存读取异常只记日志并继续回源 DB避免缓存抖动导致接口不可用
// 2.3 注意:缓存消息不包含稳定的 DB 主键与创建时间,因此这些字段允许为空
if s.agentCache != nil {
history, cacheErr := s.agentCache.GetHistory(ctx, normalizedChatID)
// 2. 优先读取“会话历史视图缓存”
// 2.1 这层缓存专门服务 conversation-history字段口径与前端展示一致
// 2.2 与 Agent 上下文热缓存解耦,避免为了历史多版本而拖慢首 token
// 2.3 若命中则直接返回miss 再回源 DB
if s.cacheDAO != nil {
items, cacheErr := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID)
if cacheErr != nil {
log.Printf("读取会话历史缓存失败 chat_id=%s: %v", normalizedChatID, cacheErr)
} else if history != nil && !cacheConversationHistoryHasRetryMetadata(history) {
return buildConversationHistoryItemsFromCache(history), nil
log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, cacheErr)
} else if items != nil {
return items, nil
}
}
// 3. Redis 未命中时回源 DB
// 3.1 复用现有 GetUserChatHistories 读取最近 N 条历史,保证查询链路和主聊天链路口径一致
// 3.2 失败时直接上抛,由 API 层统一处理
// 3.3 成功后若缓存可用,则顺手回填 Redis降低后续冷启动成本
// 3. Redis miss 时回源 DB
// 3.1 复用现有 GetUserChatHistories 读取最近 N 条历史,保证“重试版本、落库主键、创建时间”口径稳定
// 3.2 再把 DB 结果转换成接口 DTO作为历史视图缓存回填
// 3.3 失败时直接上抛,由 API 层统一处理
histories, err := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID)
if err != nil {
return nil, err
}
if s.agentCache != nil {
if setErr := s.agentCache.BackfillHistory(ctx, normalizedChatID, conv.ToEinoMessages(histories)); setErr != nil {
log.Printf("回填会话历史缓存失败 chat_id=%s: %v", normalizedChatID, setErr)
items := buildConversationHistoryItemsFromDB(histories)
if s.cacheDAO != nil {
if setErr := s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, items); setErr != nil {
log.Printf("回填会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, setErr)
}
}
return buildConversationHistoryItemsFromDB(histories), nil
return items, nil
}
// buildConversationHistoryItemsFromCache 把 Redis 中的 Eino 消息转换为接口响应
// appendConversationHistoryCacheOptimistically 把“刚生成但尚未完成 DB 持久化确认”的消息追加到历史视图缓存
//
// 职责边界:
// 1. 只做字段映射,不做权限校验或排序调整
// 2. 不补 created_at/id因为当前缓存模型不承载这两个字段
// 3. role 统一输出为 user / assistant / system避免前端再感知 schema.RoleType
func buildConversationHistoryItemsFromCache(messages []*schema.Message) []model.GetConversationHistoryItem {
items := make([]model.GetConversationHistoryItem, 0, len(messages))
for _, msg := range messages {
if msg == nil {
continue
}
items = append(items, model.GetConversationHistoryItem{
Role: normalizeConversationHistoryRole(string(msg.Role)),
Content: strings.TrimSpace(msg.Content),
ReasoningContent: strings.TrimSpace(msg.ReasoningContent),
ReasoningDurationSeconds: extractConversationReasoningDurationSeconds(msg),
RetryGroupID: extractConversationRetryGroupID(msg),
RetryIndex: extractConversationRetryIndex(msg),
})
// 1. 只服务前端会话历史展示,不参与 Agent 上下文热缓存
// 2. 优先复用现有历史视图缓存miss 时再用 DB 历史做一次启动兜底
// 3. 不保证最终权威性,最终仍以 DB 落库成功后的缓存失效与回源结果为准
func (s *AgentService) appendConversationHistoryCacheOptimistically(
ctx context.Context,
userID int,
chatID string,
newItems ...model.GetConversationHistoryItem,
) {
if s == nil || s.cacheDAO == nil {
return
}
normalizedChatID := strings.TrimSpace(chatID)
if userID <= 0 || normalizedChatID == "" || len(newItems) == 0 {
return
}
if ctx == nil {
ctx = context.Background()
}
// 1. 优先取历史视图缓存,避免每轮乐观追加都回源 DB。
items, err := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err)
return
}
// 2. 缓存 miss 时,用当前 DB 已有历史做一次基线兜底。
// 2.1 这样即便本轮是“缓存刚被 retry 补种操作删掉”,也不会只留下最新两条消息;
// 2.2 失败策略DB 兜底失败只记日志并跳过,不阻塞主回复流程。
if items == nil {
histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID)
if hisErr != nil {
log.Printf("乐观追加历史缓存时回源 DB 失败 chat_id=%s: %v", normalizedChatID, hisErr)
return
}
items = buildConversationHistoryItemsFromDB(histories)
}
merged := append([]model.GetConversationHistoryItem(nil), items...)
for _, item := range newItems {
merged = appendConversationHistoryItemIfMissing(merged, item)
}
sortConversationHistoryItems(merged)
merged = attachConversationRetryTotals(merged)
if err = s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, merged); err != nil {
log.Printf("乐观追加会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err)
}
return attachConversationRetryTotals(items)
}
// buildConversationHistoryItemsFromDB 把数据库聊天记录转换为接口响应。
@@ -132,84 +165,6 @@ func derefConversationHistoryText(text *string) string {
return *text
}
func extractConversationReasoningDurationSeconds(msg *schema.Message) int {
if msg == nil || msg.Extra == nil {
return 0
}
raw, ok := msg.Extra["reasoning_duration_seconds"]
if !ok {
return 0
}
switch v := raw.(type) {
case int:
return v
case int32:
return int(v)
case int64:
return int(v)
case float64:
return int(v)
default:
return 0
}
}
func extractConversationRetryGroupID(msg *schema.Message) *string {
if msg == nil || msg.Extra == nil {
return nil
}
raw, ok := msg.Extra["retry_group_id"]
if !ok {
return nil
}
text, ok := raw.(string)
if !ok {
return nil
}
text = strings.TrimSpace(text)
if text == "" {
return nil
}
return &text
}
func extractConversationRetryIndex(msg *schema.Message) *int {
if msg == nil || msg.Extra == nil {
return nil
}
raw, ok := msg.Extra["retry_index"]
if !ok {
return nil
}
switch v := raw.(type) {
case int:
if v <= 0 {
return nil
}
return &v
case int32:
value := int(v)
if value <= 0 {
return nil
}
return &value
case int64:
value := int(v)
if value <= 0 {
return nil
}
return &value
case float64:
value := int(v)
if value <= 0 {
return nil
}
return &value
default:
return nil
}
}
func attachConversationRetryTotals(items []model.GetConversationHistoryItem) []model.GetConversationHistoryItem {
if len(items) == 0 {
return items
@@ -273,11 +228,89 @@ func normalizeConversationHistoryRole(role string) string {
}
}
func cacheConversationHistoryHasRetryMetadata(messages []*schema.Message) bool {
for _, msg := range messages {
if extractConversationRetryGroupID(msg) != nil {
return true
func buildOptimisticConversationHistoryItem(
role string,
content string,
reasoningContent string,
reasoningDurationSeconds int,
retryMeta *chatRetryMeta,
createdAt time.Time,
) model.GetConversationHistoryItem {
item := model.GetConversationHistoryItem{
Role: normalizeConversationHistoryRole(role),
Content: strings.TrimSpace(content),
ReasoningContent: strings.TrimSpace(reasoningContent),
ReasoningDurationSeconds: reasoningDurationSeconds,
}
if !createdAt.IsZero() {
t := createdAt
item.CreatedAt = &t
}
if retryMeta != nil {
item.RetryGroupID = retryMeta.GroupIDPtr()
item.RetryIndex = retryMeta.IndexPtr()
item.RetryTotal = retryMeta.IndexPtr()
}
return item
}
func appendConversationHistoryItemIfMissing(
items []model.GetConversationHistoryItem,
item model.GetConversationHistoryItem,
) []model.GetConversationHistoryItem {
targetKey := conversationHistoryItemSignature(item)
for _, existed := range items {
if conversationHistoryItemSignature(existed) == targetKey {
return items
}
}
return false
return append(items, item)
}
func conversationHistoryItemSignature(item model.GetConversationHistoryItem) string {
if item.ID > 0 {
return fmt.Sprintf("id:%d", item.ID)
}
groupID := ""
if item.RetryGroupID != nil {
groupID = strings.TrimSpace(*item.RetryGroupID)
}
retryIndex := 0
if item.RetryIndex != nil {
retryIndex = *item.RetryIndex
}
createdAt := ""
if item.CreatedAt != nil {
createdAt = item.CreatedAt.UTC().Format(time.RFC3339Nano)
}
return fmt.Sprintf(
"%s|%s|%s|%s|%d|%d|%s",
strings.TrimSpace(item.Role),
strings.TrimSpace(item.Content),
strings.TrimSpace(item.ReasoningContent),
groupID,
retryIndex,
item.ReasoningDurationSeconds,
createdAt,
)
}
func sortConversationHistoryItems(items []model.GetConversationHistoryItem) {
sort.SliceStable(items, func(i, j int) bool {
left := conversationHistoryTimestamp(items[i])
right := conversationHistoryTimestamp(items[j])
if left.Equal(right) {
return conversationHistoryItemSignature(items[i]) < conversationHistoryItemSignature(items[j])
}
return left.Before(right)
})
}
func conversationHistoryTimestamp(item model.GetConversationHistoryItem) time.Time {
if item.CreatedAt == nil {
return time.Time{}
}
return *item.CreatedAt
}

View File

@@ -345,6 +345,20 @@ func (s *AgentService) persistChatAfterReply(
pushErrNonBlocking(errChan, err)
return
}
userCreatedAt := time.Now()
s.appendConversationHistoryCacheOptimistically(
context.Background(),
userID,
chatID,
buildOptimisticConversationHistoryItem(
"user",
userMessage,
"",
0,
retryMeta,
userCreatedAt,
),
)
// 3. 助手消息同样遵循“Redis 先行 + 可靠持久化补齐”策略。
assistantMsg := &schema.Message{Role: schema.Assistant, Content: assistantReply, ReasoningContent: assistantReasoning}
@@ -378,5 +392,19 @@ func (s *AgentService) persistChatAfterReply(
TokensConsumed: assistantTokens,
}); err != nil {
pushErrNonBlocking(errChan, err)
return
}
s.appendConversationHistoryCacheOptimistically(
context.Background(),
userID,
chatID,
buildOptimisticConversationHistoryItem(
"assistant",
assistantReply,
assistantReasoning,
assistantReasoningDurationSeconds,
retryMeta,
userCreatedAt.Add(time.Millisecond),
),
)
}