Version: 0.9.32.dev.260419

后端:
1. 会话历史接口切换为统一时间线读取,并兼容 extra.resume 恢复协议
  - api/agent.go:新增 resume->confirm_action 映射(approve/reject/cancel),恢复请求缺 conversation_id 时拦截;GetConversationHistory 改为 GetConversationTimeline
  - routers/routers.go:路由从 GET /conversation-history 切换为 GET /conversation-timeline
  - model/agent.go:删除 GetConversationHistoryItem 旧 DTO
2. 新增会话时间线持久化链路(MySQL + Redis)
  - 新增 model/agent_timeline.go:定义 timeline kind、AgentTimelineEvent、持久化/返回结构
  - 新增 dao/agent_timeline.go:写入事件、按 seq 查询、查询 max seq
  - inits/mysql.go:AutoMigrate 增加 AgentTimelineEvent
  - dao/cache.go:新增 timeline list/seq key,支持 incr/set seq、append/list、全量回填与删除
  - 新增 service/agentsvc/agent_timeline.go:时间线读写编排(Redis 优先、DB 回源、seq 分配与冲突重试、extra 事件映射)
3. 聊天主链路改为写入 timeline,旧 history 服务下线
  - service/agentsvc/agent.go:普通聊天用户/助手消息改为 appendConversationTimelineEvent
  - service/agentsvc/agent_newagent.go:透传 resume_interaction_id;注入 emitter extra hook 持久化卡片事件;正文写入 timeline
  - 删除 service/agentsvc/agent_history.go:下线 conversation-history 旧缓存编排
4. newAgent 恢复与确认防串单增强
  - newAgent/model/graph_run_state.go:AgentGraphRequest 新增 ResumeInteractionID
  - newAgent/node/agent_nodes.go:透传 ResumeInteractionID
  - newAgent/node/chat.go:增加 stale_resume 校验;accept/reject 兼容 approve/cancel;非法动作返回 invalid_confirm_action
  - newAgent/stream/emitter.go:新增 extraEventHook / SetExtraEventHook,在 extra-only 与 confirm 事件触发
5. 日程暂存后同步刷新预览缓存,避免读到拖拽前旧数据
  - service/agentsvc/agent_schedule_state.go:Save 后重建并覆盖 preview 缓存,保留 trace/candidate 等字段
6. 缓存失效策略调整到 timeline 口径
  - middleware/cache_deleter.go:移除 conversation-history 失效逻辑;ChatHistory/AgentChat/AgentTimelineEvent 加入忽略集合

前端:
7. 新增时间线接口与类型定义
  - frontend/src/api/schedule_agent.ts:新增 TimelineEvent/TimelineToolPayload/TimelineConfirmPayload 与 getConversationTimeline
8. AssistantPanel 全面对接 timeline 重建消息与卡片
  - frontend/src/components/dashboard/AssistantPanel.vue:移除旧 history merge/normalize,新增 rebuildStateFromTimeline;支持 execution mode(always_execute);支持 resume-only 发送;修复 confirm 弹层手动关闭后重复弹出;会话标题显示放宽;流式中隐藏 action bar
9. 精排弹窗健壮性与交互动效优化
  - frontend/src/components/assistant/ScheduleFineTuneModal.vue:previewData 支持 nullable,新增 visible 控制与 watch 初始化,补齐空值保护并调整弹窗动画

仓库:
10. 新增前端时间线接入说明文档
  - docs/frontend/newagent_timeline_对接说明.md:接口、kind、payload、刷新重建与迁移建议
This commit is contained in:
Losita
2026-04-19 19:03:41 +08:00
parent 668af5f6c0
commit 0f749e9f5a
22 changed files with 1565 additions and 540 deletions

View File

@@ -34,6 +34,23 @@ func writeSSEData(w io.Writer, payload string) error {
return err
}
// mapResumeConfirmAction 把 extra.resume.action 映射为现有 confirm_action 口径。
//
// 映射规则:
// 1. approve -> accept确认执行
// 2. reject/cancel -> reject拒绝执行
// 3. 兜底走 reject避免脏值误触发执行。
func mapResumeConfirmAction(action model.AgentResumeAction) string {
switch action {
case model.AgentResumeActionApprove:
return "accept"
case model.AgentResumeActionReject, model.AgentResumeActionCancel:
return "reject"
default:
return "reject"
}
}
func (api *AgentHandler) ChatAgent(c *gin.Context) {
// 1) 设置 SSE 响应头
c.Writer.Header().Set("Content-Type", "text/event-stream")
@@ -49,10 +66,34 @@ func (api *AgentHandler) ChatAgent(c *gin.Context) {
return
}
// 2.1 兼容新恢复协议:把 extra.resume 统一映射到现有内部字段。
// 1. 前端新协议只传 resume不再直接传 confirm_action
// 2. 后端这里做一次入口归一,保证下游状态机继续按既有字段消费;
// 3. 解析失败直接返回 400避免把非法恢复请求当普通消息继续执行。
resumeReq, resumeErr := req.ResumeRequest()
if resumeErr != nil {
c.JSON(http.StatusBadRequest, respond.WrongParamType)
return
}
if resumeReq != nil {
if req.Extra == nil {
req.Extra = make(map[string]any)
}
req.Extra["resume_interaction_id"] = resumeReq.InteractionID
if resumeReq.IsConfirmResume() {
req.Extra["confirm_action"] = mapResumeConfirmAction(resumeReq.Action)
}
}
// 3) 规范化会话 ID
conversationID := strings.TrimSpace(req.ConversationID)
if conversationID == "" {
// confirm_action 需要关联已存在的会话状态,缺少 conversation_id 直接报错。
// 恢复类请求必须关联既有会话状态,缺少 conversation_id 直接报错。
if resumeReq != nil {
c.JSON(http.StatusBadRequest, respond.MissingConversationID)
return
}
// 兼容旧协议confirm_action 也必须绑定已有会话。
if _, ok := req.Extra["confirm_action"]; ok {
c.JSON(http.StatusBadRequest, respond.MissingConversationID)
return
@@ -209,29 +250,25 @@ func (api *AgentHandler) GetConversationList(c *gin.Context) {
c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp))
}
// GetConversationHistory 返回指定会话的聊天历史记录
// GetConversationTimeline 返回指定会话的统一时间线(正文+卡片)
//
// 设计说明:
// 1) 该接口只读历史,不负责改写 Redis/DB 中的会话状态
// 2) 读取顺序复用现有服务层能力:先校验归属,再查 Redis未命中再回源 DB
// 3) 会话不存在时统一返回 400避免前端把无效会话误判成系统故障
func (api *AgentHandler) GetConversationHistory(c *gin.Context) {
// 1. 参数校验conversation_id 必填。
// 说明:
// 1. 该接口是新前端刷新重建的单一来源
// 2. 返回结果已按 seq 升序,前端按数组顺序渲染即可
// 3. 会话不存在时统一返回 400避免误判成系统异常
func (api *AgentHandler) GetConversationTimeline(c *gin.Context) {
conversationID := strings.TrimSpace(c.Query("conversation_id"))
if conversationID == "" {
c.JSON(http.StatusBadRequest, respond.MissingParam)
return
}
// 2. 从鉴权上下文取当前用户 ID确保查询范围只落在“本人会话”内。
userID := c.GetInt("user_id")
// 3. 设置短超时,避免缓存抖动或慢查询长期占用连接。
ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second)
defer cancel()
// 4. 调 service 查询聊天历史。
history, err := api.svc.GetConversationHistory(ctx, userID, conversationID)
timeline, err := api.svc.GetConversationTimeline(ctx, userID, conversationID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusBadRequest, respond.WrongParamType)
@@ -241,8 +278,7 @@ func (api *AgentHandler) GetConversationHistory(c *gin.Context) {
return
}
// 5. 返回统一响应结构。
c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, history))
c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, timeline))
}
// GetSchedulePlanPreview 返回“指定会话”的排程结构化预览。

View File

@@ -0,0 +1,86 @@
package dao
import (
"context"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/model"
)
// SaveConversationTimelineEvent 持久化单条会话时间线事件到 MySQL。
//
// 职责边界:
// 1. 只做单条写入,不负责 seq 分配;
// 2. 只保证字段标准化(去空格、空值置 nil不做业务语义修正
// 3. 返回 error 让上层决定是否中断当前链路。
func (a *AgentDAO) SaveConversationTimelineEvent(ctx context.Context, payload model.ChatTimelinePersistPayload) (int64, *time.Time, error) {
normalizedChatID := strings.TrimSpace(payload.ConversationID)
normalizedKind := strings.TrimSpace(payload.Kind)
normalizedRole := strings.TrimSpace(payload.Role)
normalizedContent := strings.TrimSpace(payload.Content)
normalizedPayloadJSON := strings.TrimSpace(payload.PayloadJSON)
var rolePtr *string
if normalizedRole != "" {
rolePtr = &normalizedRole
}
var contentPtr *string
if normalizedContent != "" {
contentPtr = &normalizedContent
}
var payloadPtr *string
if normalizedPayloadJSON != "" {
payloadPtr = &normalizedPayloadJSON
}
event := model.AgentTimelineEvent{
UserID: payload.UserID,
ChatID: normalizedChatID,
Seq: payload.Seq,
Kind: normalizedKind,
Role: rolePtr,
Content: contentPtr,
Payload: payloadPtr,
TokensConsumed: payload.TokensConsumed,
}
if err := a.db.WithContext(ctx).Create(&event).Error; err != nil {
return 0, nil, err
}
return event.ID, event.CreatedAt, nil
}
// ListConversationTimelineEvents 查询会话时间线,按 seq 正序返回。
func (a *AgentDAO) ListConversationTimelineEvents(ctx context.Context, userID int, chatID string) ([]model.AgentTimelineEvent, error) {
normalizedChatID := strings.TrimSpace(chatID)
var events []model.AgentTimelineEvent
err := a.db.WithContext(ctx).
Where("user_id = ? AND chat_id = ?", userID, normalizedChatID).
Order("seq ASC").
Order("id ASC").
Find(&events).Error
if err != nil {
return nil, err
}
return events, nil
}
// GetConversationTimelineMaxSeq 返回会话时间线当前最大 seq。
//
// 说明:
// 1. 该方法主要用于 Redis 顺序号不可用时的 DB 兜底;
// 2. 无记录时返回 0不视为错误
// 3. 上层需要自行 +1 后再写入新事件。
func (a *AgentDAO) GetConversationTimelineMaxSeq(ctx context.Context, userID int, chatID string) (int64, error) {
normalizedChatID := strings.TrimSpace(chatID)
var maxSeq int64
err := a.db.WithContext(ctx).
Model(&model.AgentTimelineEvent{}).
Where("user_id = ? AND chat_id = ?", userID, normalizedChatID).
Select("COALESCE(MAX(seq), 0)").
Scan(&maxSeq).Error
if err != nil {
return 0, err
}
return maxSeq, nil
}

View File

@@ -37,8 +37,12 @@ func (d *CacheDAO) schedulePreviewKey(userID int, conversationID string) string
return fmt.Sprintf("smartflow:schedule_preview:u:%d:c:%s", userID, conversationID)
}
func (d *CacheDAO) conversationHistoryKey(userID int, conversationID string) string {
return fmt.Sprintf("smartflow:conversation_history:u:%d:c:%s", userID, conversationID)
func (d *CacheDAO) conversationTimelineKey(userID int, conversationID string) string {
return fmt.Sprintf("smartflow:conversation_timeline:u:%d:c:%s", userID, conversationID)
}
func (d *CacheDAO) conversationTimelineSeqKey(userID int, conversationID string) string {
return fmt.Sprintf("smartflow:conversation_timeline_seq:u:%d:c:%s", userID, conversationID)
}
// SetBlacklist 把 Token 写入黑名单。
@@ -450,13 +454,59 @@ func (d *CacheDAO) DeleteSchedulePlanPreviewFromCache(ctx context.Context, userI
return d.client.Del(ctx, d.schedulePreviewKey(userID, normalizedConversationID)).Err()
}
// SetConversationHistoryToCache 写入“会话历史视图”缓存
// IncrConversationTimelineSeq 原子递增并返回会话时间线 seq
//
// 职责边界
// 1. 负责按 user_id + conversation_id 写入前端历史查询所需的稳定 DTO
// 2. 只负责缓存当前可展示历史,不负责上下文窗口缓存
// 3. 不负责 DB 回源,也不负责重试分组补算
func (d *CacheDAO) SetConversationHistoryToCache(ctx context.Context, userID int, conversationID string, items []model.GetConversationHistoryItem) error {
// 说明
// 1. seq 只在同一 user_id + conversation_id 维度内递增
// 2. 使用 Redis INCR 保证并发下不会拿到重复顺序号
// 3. 该 key 也会设置 TTL避免长尾会话长期占用缓存
func (d *CacheDAO) IncrConversationTimelineSeq(ctx context.Context, userID int, conversationID string) (int64, error) {
if d == nil || d.client == nil {
return 0, errors.New("cache dao is not initialized")
}
if userID <= 0 {
return 0, fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return 0, errors.New("conversation_id is empty")
}
key := d.conversationTimelineSeqKey(userID, normalizedConversationID)
pipe := d.client.Pipeline()
incrCmd := pipe.Incr(ctx, key)
pipe.Expire(ctx, key, 24*time.Hour)
if _, err := pipe.Exec(ctx); err != nil {
return 0, err
}
return incrCmd.Val(), nil
}
// SetConversationTimelineSeq 强制设置会话时间线当前 seqDB 回填 Redis 兜底场景)。
func (d *CacheDAO) SetConversationTimelineSeq(ctx context.Context, userID int, conversationID string, seq int64) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
if userID <= 0 {
return fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return errors.New("conversation_id is empty")
}
if seq < 0 {
seq = 0
}
return d.client.Set(ctx, d.conversationTimelineSeqKey(userID, normalizedConversationID), seq, 24*time.Hour).Err()
}
// AppendConversationTimelineEventToCache 追加单条时间线缓存事件。
func (d *CacheDAO) AppendConversationTimelineEventToCache(
ctx context.Context,
userID int,
conversationID string,
item model.GetConversationTimelineItem,
) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
@@ -468,20 +518,53 @@ func (d *CacheDAO) SetConversationHistoryToCache(ctx context.Context, userID int
return errors.New("conversation_id is empty")
}
data, err := json.Marshal(items)
data, err := json.Marshal(item)
if err != nil {
return fmt.Errorf("marshal conversation history failed: %w", err)
return fmt.Errorf("marshal conversation timeline item failed: %w", err)
}
return d.client.Set(ctx, d.conversationHistoryKey(userID, normalizedConversationID), data, 1*time.Hour).Err()
key := d.conversationTimelineKey(userID, normalizedConversationID)
pipe := d.client.Pipeline()
pipe.RPush(ctx, key, data)
pipe.Expire(ctx, key, 24*time.Hour)
_, err = pipe.Exec(ctx)
return err
}
// GetConversationHistoryFromCache 读取“会话历史视图”缓存。
//
// 输入输出语义:
// 1. 命中时返回历史 DTO 切片与 nil error
// 2. 未命中时返回 (nil, nil)
// 3. Redis 异常或反序列化失败时返回 error。
func (d *CacheDAO) GetConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) ([]model.GetConversationHistoryItem, error) {
// SetConversationTimelineToCache 全量回填时间线缓存。
func (d *CacheDAO) SetConversationTimelineToCache(ctx context.Context, userID int, conversationID string, items []model.GetConversationTimelineItem) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
if userID <= 0 {
return fmt.Errorf("invalid user_id: %d", userID)
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return errors.New("conversation_id is empty")
}
key := d.conversationTimelineKey(userID, normalizedConversationID)
pipe := d.client.Pipeline()
pipe.Del(ctx, key)
if len(items) > 0 {
values := make([]interface{}, 0, len(items))
for _, item := range items {
data, err := json.Marshal(item)
if err != nil {
return fmt.Errorf("marshal conversation timeline item failed: %w", err)
}
values = append(values, data)
}
pipe.RPush(ctx, key, values...)
}
pipe.Expire(ctx, key, 24*time.Hour)
_, err := pipe.Exec(ctx)
return err
}
// GetConversationTimelineFromCache 读取时间线缓存(按 seq 正序)。
func (d *CacheDAO) GetConversationTimelineFromCache(ctx context.Context, userID int, conversationID string) ([]model.GetConversationTimelineItem, error) {
if d == nil || d.client == nil {
return nil, errors.New("cache dao is not initialized")
}
@@ -493,28 +576,30 @@ func (d *CacheDAO) GetConversationHistoryFromCache(ctx context.Context, userID i
return nil, errors.New("conversation_id is empty")
}
raw, err := d.client.Get(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Result()
rawItems, err := d.client.LRange(ctx, d.conversationTimelineKey(userID, normalizedConversationID), 0, -1).Result()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}
if len(rawItems) == 0 {
return nil, nil
}
var items []model.GetConversationHistoryItem
if err = json.Unmarshal([]byte(raw), &items); err != nil {
return nil, fmt.Errorf("unmarshal conversation history failed: %w", err)
items := make([]model.GetConversationTimelineItem, 0, len(rawItems))
for _, raw := range rawItems {
var item model.GetConversationTimelineItem
if err := json.Unmarshal([]byte(raw), &item); err != nil {
return nil, fmt.Errorf("unmarshal conversation timeline item failed: %w", err)
}
items = append(items, item)
}
return items, nil
}
// DeleteConversationHistoryFromCache 删除“会话历史视图”缓存。
//
// 说明:
// 1. 删除操作是幂等的key 不存在也视为成功;
// 2. 该方法用于 chat_histories 写入/补种 retry 分组后触发失效;
// 3. 这里只处理前端历史视图缓存,不影响 Agent 上下文热缓存。
func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) error {
// DeleteConversationTimelineFromCache 删除时间线缓存和 seq 缓存。
func (d *CacheDAO) DeleteConversationTimelineFromCache(ctx context.Context, userID int, conversationID string) error {
if d == nil || d.client == nil {
return errors.New("cache dao is not initialized")
}
@@ -525,7 +610,11 @@ func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userI
if normalizedConversationID == "" {
return errors.New("conversation_id is empty")
}
return d.client.Del(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Err()
return d.client.Del(
ctx,
d.conversationTimelineKey(userID, normalizedConversationID),
d.conversationTimelineSeqKey(userID, normalizedConversationID),
).Err()
}
// agentStateKey 返回 agent 运行态快照的 Redis key。
@@ -615,7 +704,7 @@ const (
// memoryPrefetchKey 生成用户+会话维度的记忆预取缓存 key。
//
// 1. 格式smartflow:memory_prefetch:u:{userID}:c:{chatID},与 conversationHistoryKey / schedulePreviewKey 命名风格一致;
// 1. 格式smartflow:memory_prefetch:u:{userID}:c:{chatID},与 conversationTimelineKey / schedulePreviewKey 命名风格一致;
// 2. chatID 为空时 key 为 smartflow:memory_prefetch:u:5:c:,仍然合法且唯一,不会与其他会话 key 冲突;
// 3. 加 chatID 隔离后,不同会话各自维护独立的预取缓存,避免会话间记忆上下文互相覆盖。
func (d *CacheDAO) memoryPrefetchKey(userID int, chatID string) string {

View File

@@ -15,6 +15,7 @@ func autoMigrateModels(db *gorm.DB) error {
&model.User{},
&model.AgentChat{},
&model.ChatHistory{},
&model.AgentTimelineEvent{},
&model.Task{},
&model.TaskClass{},
&model.TaskClassItem{},

View File

@@ -74,10 +74,6 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}) {
p.invalidTaskCache(m.UserID)
case model.AgentScheduleState:
p.invalidSchedulePlanPreviewCache(m.UserID, m.ConversationID)
case model.ChatHistory:
p.invalidConversationHistoryCache(m.UserID, m.ChatID)
case model.AgentChat:
p.invalidConversationHistoryCache(m.UserID, m.ChatID)
case model.MemoryItem:
// 1. 管理面删除/修改/恢复/新增记忆时,自动失效该用户所有会话的预取缓存;
// 2. repo 方法通过 Model(&model.MemoryItem{UserID: userID}) 携带 userID
@@ -86,6 +82,9 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}) {
p.invalidMemoryPrefetchCache(m.UserID)
case model.AgentOutboxMessage,
model.User,
model.ChatHistory,
model.AgentChat,
model.AgentTimelineEvent,
model.AgentStateSnapshotRecord,
model.MemoryJob,
model.MemoryAuditLog,
@@ -152,23 +151,6 @@ func (p *GormCachePlugin) invalidSchedulePlanPreviewCache(userID int, conversati
}()
}
func (p *GormCachePlugin) invalidConversationHistoryCache(userID int, conversationID string) {
normalizedConversationID := strings.TrimSpace(conversationID)
if userID == 0 || normalizedConversationID == "" {
return
}
go func() {
// 1. 聊天历史写入或重试补种后,删除历史视图缓存,保证下次列表/详情能拿到最新版本。
// 2. 这里只清“前台历史视图缓存”,不碰 LLM 上下文热缓存,避免影响首 token 体验。
if err := p.cacheDAO.DeleteConversationHistoryFromCache(context.Background(), userID, normalizedConversationID); err != nil {
log.Printf("[GORM-Cache] Failed to invalidate conversation history cache for user %d conversation %s: %v", userID, normalizedConversationID, err)
return
}
log.Printf("[GORM-Cache] Invalidated conversation history cache for user %d conversation %s", userID, normalizedConversationID)
}()
}
// invalidMemoryPrefetchCache 失效指定用户所有会话的记忆预取缓存。
//
// 步骤化说明:

View File

@@ -220,15 +220,6 @@ type GetConversationListResponse struct {
HasMore bool `json:"has_more"`
}
type GetConversationHistoryItem struct {
ID int `json:"id,omitempty"`
Role string `json:"role"`
Content string `json:"content"`
CreatedAt *time.Time `json:"created_at,omitempty"`
ReasoningContent string `json:"reasoning_content,omitempty"`
ReasoningDurationSeconds int `json:"reasoning_duration_seconds,omitempty"`
}
type SchedulePlanPreviewCache struct {
UserID int `json:"user_id"`
ConversationID string `json:"conversation_id"`

View File

@@ -0,0 +1,63 @@
package model
import "time"
// AgentTimelineKind 定义会话时间线事件类型。
//
// 说明:
// 1. 这些类型面向前端渲染,要求语义稳定,不随节点内部实现细节频繁变化;
// 2. 文本消息和卡片事件共用一条时间线,前端只按 seq 顺序渲染;
// 3. token 统计仍以 chat_histories / agent_chats 为准,时间线只做展示顺序与结构承载。
const (
AgentTimelineKindUserText = "user_text"
AgentTimelineKindAssistantText = "assistant_text"
AgentTimelineKindToolCall = "tool_call"
AgentTimelineKindToolResult = "tool_result"
AgentTimelineKindConfirmRequest = "confirm_request"
AgentTimelineKindScheduleCompleted = "schedule_completed"
)
// AgentTimelineEvent 表示会话里“可展示事件”的统一持久化记录。
//
// 职责边界:
// 1. 只承载“顺序 + 展示信息”,不替代 chat_histories 的消息账本职责;
// 2. seq 是同一会话内的单调递增顺序号,用于刷新后重建展示顺序;
// 3. payload 只保存前端渲染需要的结构化信息,不存整个运行时快照。
type AgentTimelineEvent struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
UserID int `gorm:"column:user_id;not null;uniqueIndex:uk_timeline_user_chat_seq,priority:1;index:idx_timeline_user_chat_created,priority:1;comment:所属用户ID"`
ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_timeline_user_chat_seq,priority:2;index:idx_timeline_user_chat_created,priority:2;comment:会话UUID"`
Seq int64 `gorm:"column:seq;not null;uniqueIndex:uk_timeline_user_chat_seq,priority:3;comment:会话内顺序号"`
Kind string `gorm:"column:kind;type:varchar(64);not null;comment:事件类型"`
Role *string `gorm:"column:role;type:varchar(32);comment:消息角色"`
Content *string `gorm:"column:content;type:text;comment:正文内容"`
Payload *string `gorm:"column:payload;type:json;comment:结构化负载"`
TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:该事件关联token默认0"`
CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime;index:idx_timeline_user_chat_created,priority:3"`
}
func (AgentTimelineEvent) TableName() string { return "agent_timeline_events" }
// ChatTimelinePersistPayload 定义时间线单条事件落库输入。
type ChatTimelinePersistPayload struct {
UserID int `json:"user_id"`
ConversationID string `json:"conversation_id"`
Seq int64 `json:"seq"`
Kind string `json:"kind"`
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
PayloadJSON string `json:"payload_json,omitempty"`
TokensConsumed int `json:"tokens_consumed"`
}
// GetConversationTimelineItem 定义前端读取时间线接口的单条返回项。
type GetConversationTimelineItem struct {
ID int64 `json:"id,omitempty"`
Seq int64 `json:"seq"`
Kind string `json:"kind"`
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
TokensConsumed int `json:"tokens_consumed,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
}

View File

@@ -21,7 +21,9 @@ import (
type AgentGraphRequest struct {
UserInput string
ConfirmAction string // "accept" / "reject" / "",仅 confirm 恢复场景由前端传入
AlwaysExecute bool // true 时写工具跳过确认闸门直接执行,适合前端已展示预览、用户无需逐步确认的场景
// ResumeInteractionID 用于校验“本次恢复请求”是否命中了当前 pending 交互,避免旧卡片误恢复。
ResumeInteractionID string
AlwaysExecute bool // true 时写工具跳过确认闸门直接执行,适合前端已展示预览、用户无需逐步确认的场景
}
// Normalize 统一清洗请求级输入中的字符串字段。
@@ -31,6 +33,7 @@ func (r *AgentGraphRequest) Normalize() {
}
r.UserInput = strings.TrimSpace(r.UserInput)
r.ConfirmAction = strings.TrimSpace(r.ConfirmAction)
r.ResumeInteractionID = strings.TrimSpace(r.ResumeInteractionID)
}
// RoughBuildPlacement 是粗排算法返回的单条放置结果。

View File

@@ -39,6 +39,7 @@ func (n *AgentNodes) Chat(ctx context.Context, st *newagentmodel.AgentGraphState
ConversationContext: st.EnsureConversationContext(),
UserInput: st.Request.UserInput,
ConfirmAction: st.Request.ConfirmAction,
ResumeInteractionID: st.Request.ResumeInteractionID,
Client: st.Deps.ResolveChatClient(),
ChunkEmitter: st.EnsureChunkEmitter(),
CompactionStore: st.Deps.CompactionStore,

View File

@@ -49,6 +49,7 @@ type ChatNodeInput struct {
ConversationContext *newagentmodel.ConversationContext
UserInput string
ConfirmAction string
ResumeInteractionID string
Client *infrallm.Client
ChunkEmitter *newagentstream.ChunkEmitter
CompactionStore newagentmodel.CompactionStore // 上下文压缩持久化
@@ -679,6 +680,14 @@ func handleChatResume(
pending := runtimeState.PendingInteraction
flowState := runtimeState.EnsureCommonState()
if isMismatchedResumeInteraction(input.ResumeInteractionID, pending) {
_ = emitter.EmitStatus(
chatStatusBlockID, chatStageName,
"stale_resume", "当前确认已过期,请刷新后重试。", false,
)
return nil
}
// 用户输入在 service 层进入 graph 前已经统一追加到 ConversationContext。
// 这里不再二次写入,避免 pending 恢复路径把同一轮 user message 追加两次。
@@ -715,10 +724,18 @@ func handleConfirmResume(
pending *newagentmodel.PendingInteraction,
emitter *newagentstream.ChunkEmitter,
) error {
if isMismatchedResumeInteraction(input.ResumeInteractionID, pending) {
_ = emitter.EmitStatus(
chatStatusBlockID, chatStageName,
"stale_resume", "当前确认已过期,请刷新后重试。", false,
)
return nil
}
action := strings.ToLower(strings.TrimSpace(input.ConfirmAction))
switch action {
case "accept":
case "accept", "approve":
// 恢复前保存待执行工具Execute 节点需要它。
pendingTool := pending.PendingTool
runtimeState.ResumeFromPending()
@@ -733,7 +750,7 @@ func handleConfirmResume(
"confirmed", "已确认,开始执行。", false,
)
case "reject":
case "reject", "cancel":
runtimeState.ResumeFromPending()
if pending.PendingTool != nil {
// 工具确认被拒 → 回到 executing 换策略。
@@ -748,17 +765,26 @@ func handleConfirmResume(
)
default:
// 无合法 confirm action → 保守:等同于 reject。
runtimeState.ResumeFromPending()
if pending.PendingTool != nil {
flowState.Phase = newagentmodel.PhaseExecuting
} else {
flowState.RejectPlan()
}
_ = emitter.EmitStatus(
chatStatusBlockID, chatStageName,
"invalid_confirm_action", "未识别确认动作,请重试。", false,
)
}
return nil
}
func isMismatchedResumeInteraction(resumeInteractionID string, pending *newagentmodel.PendingInteraction) bool {
if pending == nil {
return false
}
resumeID := strings.TrimSpace(resumeInteractionID)
pendingID := strings.TrimSpace(pending.InteractionID)
if resumeID == "" || pendingID == "" {
return false
}
return resumeID != pendingID
}
// prepareChatNodeInput 校验并准备聊天节点的运行态依赖。
func prepareChatNodeInput(input ChatNodeInput) (
*newagentmodel.AgentRuntimeState,

View File

@@ -62,6 +62,11 @@ type ChunkEmitter struct {
RequestID string
ModelName string
Created int64
// extraEventHook 用于把关键结构化事件同步给上层做持久化。
// 1. hook 失败不能影响 SSE 主链路;
// 2. hook 只接收 extra 结构,避免 emitter 反向依赖业务层;
// 3. 不注入时保持空实现,兼容旧调用路径。
extraEventHook func(extra *OpenAIChunkExtra)
}
// NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。
@@ -109,6 +114,14 @@ func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created i
}
}
// SetExtraEventHook 设置结构化事件回调。
func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) {
if e == nil {
return
}
e.extraEventHook = hook
}
// EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。
func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error {
if e == nil || e.emit == nil {
@@ -233,6 +246,7 @@ func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error {
if e == nil || e.emit == nil {
return nil
}
e.emitExtraEventHook(extra)
payload, err := ToOpenAIStreamWithExtra(
nil,
e.RequestID,
@@ -250,6 +264,13 @@ func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error {
return e.emit(payload)
}
func (e *ChunkEmitter) emitExtraEventHook(extra *OpenAIChunkExtra) {
if e == nil || e.extraEventHook == nil || extra == nil {
return
}
e.extraEventHook(extra)
}
// EmitConfirmRequest 输出一次待确认事件。
//
// 当前展示策略:
@@ -263,6 +284,7 @@ func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, i
text := buildConfirmAssistantText(title, summary)
extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary)
e.emitExtraEventHook(extra)
return e.emitPseudoText(
ctx,
text,

View File

@@ -93,7 +93,7 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d
agentGroup.POST("/chat", middleware.TokenQuotaGuard(cache, userRepo), handlers.AgentHandler.ChatAgent)
agentGroup.GET("/conversation-meta", handlers.AgentHandler.GetConversationMeta)
agentGroup.GET("/conversation-list", handlers.AgentHandler.GetConversationList)
agentGroup.GET("/conversation-history", handlers.AgentHandler.GetConversationHistory)
agentGroup.GET("/conversation-timeline", handlers.AgentHandler.GetConversationTimeline)
agentGroup.GET("/schedule-preview", handlers.AgentHandler.GetSchedulePlanPreview)
agentGroup.GET("/context-stats", handlers.AgentHandler.GetContextStats)
agentGroup.POST("/schedule-state", handlers.AgentHandler.SaveScheduleState)

View File

@@ -386,18 +386,19 @@ func (s *AgentService) runNormalChatFlow(
pushErrNonBlocking(errChan, err)
return
}
s.appendConversationHistoryCacheOptimistically(
context.Background(),
if _, timelineErr := s.appendConversationTimelineEvent(
ctx,
userID,
chatID,
buildOptimisticConversationHistoryItem(
"user",
userMessage,
"",
0,
requestStart,
),
)
model.AgentTimelineKindUserText,
"user",
userMessage,
nil,
0,
); timelineErr != nil {
pushErrNonBlocking(errChan, timelineErr)
return
}
}
// 普通聊天链路也需要把助手回复写入 Redis
@@ -425,18 +426,25 @@ func (s *AgentService) runNormalChatFlow(
}); saveErr != nil {
pushErrNonBlocking(errChan, saveErr)
} else {
s.appendConversationHistoryCacheOptimistically(
assistantTimelinePayload := map[string]any{}
if strings.TrimSpace(assistantReasoning) != "" {
assistantTimelinePayload["reasoning_content"] = strings.TrimSpace(assistantReasoning)
}
if reasoningDurationSeconds > 0 {
assistantTimelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds
}
if _, timelineErr := s.appendConversationTimelineEvent(
context.Background(),
userID,
chatID,
buildOptimisticConversationHistoryItem(
"assistant",
fullText,
assistantReasoning,
reasoningDurationSeconds,
time.Now(),
),
)
model.AgentTimelineKindAssistantText,
"assistant",
fullText,
assistantTimelinePayload,
requestTotalTokens,
); timelineErr != nil {
pushErrNonBlocking(errChan, timelineErr)
}
}
// 9. 在主回复完成后异步尝试生成会话标题(仅首次、仅标题为空时生效)。

View File

@@ -1,257 +0,0 @@
package agentsvc
import (
"context"
"fmt"
"log"
"sort"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/LoveLosita/smartflow/backend/respond"
"gorm.io/gorm"
)
// GetConversationHistory 返回指定会话的聊天历史。
//
// 职责边界:
// 1. 负责会话 ID 归一化、会话归属校验,以及“先 Redis、后 DB”的读取编排
// 2. 负责把缓存消息 / DB 记录统一转换为 API 响应 DTO
// 3. 不负责补写会话标题,也不负责修改聊天主链路的缓存写入策略。
func (s *AgentService) GetConversationHistory(ctx context.Context, userID int, chatID string) ([]model.GetConversationHistoryItem, error) {
normalizedChatID := strings.TrimSpace(chatID)
if normalizedChatID == "" {
return nil, respond.MissingParam
}
// 1. 先做归属校验:
// 1.1 Redis 历史缓存只按 chat_id 分桶,不能单靠缓存判断用户归属;
// 1.2 因此先查会话是否属于当前用户,避免命中别人会话缓存时产生越权读取;
// 1.3 若会话不存在,统一返回 gorm.ErrRecordNotFound交由 API 层映射为参数错误。
exists, err := s.repo.IfChatExists(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
if !exists {
return nil, gorm.ErrRecordNotFound
}
// 2. 优先读取“会话历史视图缓存”:
// 2.1 这层缓存专门服务 conversation-history字段口径与前端展示一致
// 2.2 与 Agent 上下文热缓存解耦,避免为了历史多版本而拖慢首 token
// 2.3 若命中则直接返回miss 再回源 DB。
if s.cacheDAO != nil {
items, cacheErr := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID)
if cacheErr != nil {
log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, cacheErr)
} else if conversationHistoryCacheCanServe(items) {
return items, nil
}
}
// 3. Redis miss 时回源 DB
// 3.1 复用现有 GetUserChatHistories 读取最近 N 条历史,保证“重试版本、落库主键、创建时间”口径稳定;
// 3.2 再把 DB 结果转换成接口 DTO作为历史视图缓存回填
// 3.3 失败时直接上抛,由 API 层统一处理。
histories, err := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID)
if err != nil {
return nil, err
}
items := buildConversationHistoryItemsFromDB(histories)
if s.cacheDAO != nil {
if setErr := s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, items); setErr != nil {
log.Printf("回填会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, setErr)
}
}
return items, nil
}
// appendConversationHistoryCacheOptimistically 把“刚生成但尚未完成 DB 持久化确认”的消息追加到历史视图缓存。
//
// 职责边界:
// 1. 只服务前端会话历史展示,不参与 Agent 上下文热缓存;
// 2. 优先复用现有历史视图缓存miss 时再用 DB 历史做一次启动兜底;
// 3. 不保证最终权威性,最终仍以 DB 落库成功后的缓存失效与回源结果为准。
func (s *AgentService) appendConversationHistoryCacheOptimistically(
ctx context.Context,
userID int,
chatID string,
newItems ...model.GetConversationHistoryItem,
) {
if s == nil || s.cacheDAO == nil {
return
}
normalizedChatID := strings.TrimSpace(chatID)
if userID <= 0 || normalizedChatID == "" || len(newItems) == 0 {
return
}
if ctx == nil {
ctx = context.Background()
}
// 1. 优先取历史视图缓存,避免每轮乐观追加都回源 DB。
items, err := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err)
return
}
// 2. 缓存 miss 时,用当前 DB 已有历史做一次基线兜底。
// 2.1 这样即便本轮是“缓存刚被 retry 补种操作删掉”,也不会只留下最新两条消息;
// 2.2 失败策略DB 兜底失败只记日志并跳过,不阻塞主回复流程。
if items == nil {
histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID)
if hisErr != nil {
log.Printf("乐观追加历史缓存时回源 DB 失败 chat_id=%s: %v", normalizedChatID, hisErr)
return
}
items = buildConversationHistoryItemsFromDB(histories)
}
merged := append([]model.GetConversationHistoryItem(nil), items...)
for _, item := range newItems {
merged = appendConversationHistoryItemIfMissing(merged, item)
}
sortConversationHistoryItems(merged)
if err = s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, merged); err != nil {
log.Printf("乐观追加会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err)
}
}
// buildConversationHistoryItemsFromDB 把数据库聊天记录转换为接口响应。
//
// 职责边界:
// 1. 只透传 DB 已有字段,不尝试补算 reasoning_content
// 2. message_content / role 为空时兜底为空串与 system避免空指针影响接口
// 3. 保持 DAO 返回的时间正序,前端可直接渲染。
func buildConversationHistoryItemsFromDB(histories []model.ChatHistory) []model.GetConversationHistoryItem {
items := make([]model.GetConversationHistoryItem, 0, len(histories))
for _, history := range histories {
content := ""
if history.MessageContent != nil {
content = strings.TrimSpace(*history.MessageContent)
}
role := "system"
if history.Role != nil {
role = normalizeConversationHistoryRole(*history.Role)
}
items = append(items, model.GetConversationHistoryItem{
ID: history.ID,
Role: role,
Content: content,
CreatedAt: history.CreatedAt,
ReasoningContent: strings.TrimSpace(derefConversationHistoryText(history.ReasoningContent)),
ReasoningDurationSeconds: history.ReasoningDurationSeconds,
})
}
return items
}
func derefConversationHistoryText(text *string) string {
if text == nil {
return ""
}
return *text
}
func normalizeConversationHistoryRole(role string) string {
switch strings.ToLower(strings.TrimSpace(role)) {
case "user":
return "user"
case "assistant":
return "assistant"
default:
return "system"
}
}
func conversationHistoryCacheCanServe(items []model.GetConversationHistoryItem) bool {
// 1. 历史接口一旦被前端用于“重试/编辑”等二次动作,消息 id 就必须稳定可追溯。
// 2. 乐观缓存里的新消息在 DB 落库前没有自增主键,若直接返回,会让前端拿到占位 id。
// 3. 因此只有“缓存里的每条消息都带稳定 DB id”时才允许直接命中缓存否则强制回源 DB。
for _, item := range items {
if item.ID <= 0 {
return false
}
}
return items != nil
}
func buildOptimisticConversationHistoryItem(
role string,
content string,
reasoningContent string,
reasoningDurationSeconds int,
createdAt time.Time,
) model.GetConversationHistoryItem {
item := model.GetConversationHistoryItem{
Role: normalizeConversationHistoryRole(role),
Content: strings.TrimSpace(content),
ReasoningContent: strings.TrimSpace(reasoningContent),
ReasoningDurationSeconds: reasoningDurationSeconds,
}
if !createdAt.IsZero() {
t := createdAt
item.CreatedAt = &t
}
return item
}
func appendConversationHistoryItemIfMissing(
items []model.GetConversationHistoryItem,
item model.GetConversationHistoryItem,
) []model.GetConversationHistoryItem {
targetKey := conversationHistoryItemSignature(item)
for _, existed := range items {
if conversationHistoryItemSignature(existed) == targetKey {
return items
}
}
return append(items, item)
}
func conversationHistoryItemSignature(item model.GetConversationHistoryItem) string {
if item.ID > 0 {
return fmt.Sprintf("id:%d", item.ID)
}
createdAt := ""
if item.CreatedAt != nil {
createdAt = item.CreatedAt.UTC().Format(time.RFC3339Nano)
}
return fmt.Sprintf(
"%s|%s|%s|%d|%s",
strings.TrimSpace(item.Role),
strings.TrimSpace(item.Content),
strings.TrimSpace(item.ReasoningContent),
item.ReasoningDurationSeconds,
createdAt,
)
}
func sortConversationHistoryItems(items []model.GetConversationHistoryItem) {
sort.SliceStable(items, func(i, j int) bool {
left := conversationHistoryTimestamp(items[i])
right := conversationHistoryTimestamp(items[j])
if left.Equal(right) {
return conversationHistoryItemSignature(items[i]) < conversationHistoryItemSignature(items[j])
}
return left.Before(right)
})
}
func conversationHistoryTimestamp(item model.GetConversationHistoryItem) time.Time {
if item.CreatedAt == nil {
return time.Time{}
}
return *item.CreatedAt
}

View File

@@ -159,14 +159,19 @@ func (s *AgentService) runNewAgentGraph(
}
// 6. 构造 AgentGraphRequest。
var confirmAction string
var (
confirmAction string
resumeInteractionID string
)
if len(extra) > 0 {
confirmAction = readAgentExtraString(extra, "confirm_action")
resumeInteractionID = readAgentExtraString(extra, "resume_interaction_id")
}
graphRequest := newagentmodel.AgentGraphRequest{
UserInput: userMessage,
ConfirmAction: confirmAction,
AlwaysExecute: readAgentExtraBool(extra, "always_execute"),
UserInput: userMessage,
ConfirmAction: confirmAction,
ResumeInteractionID: resumeInteractionID,
AlwaysExecute: readAgentExtraBool(extra, "always_execute"),
}
graphRequest.Normalize()
@@ -181,6 +186,10 @@ func (s *AgentService) runNewAgentGraph(
// 8. 适配 SSE emitter。
sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan)
chunkEmitter := newagentstream.NewChunkEmitter(sseEmitter, traceID, resolvedModelName, requestStart.Unix())
// 关键卡片事件走统一时间线持久化,保证刷新后可重建。
chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) {
s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra)
})
// 9. 构造 AgentGraphDeps由 cmd/start.go 注入的依赖)。
deps := newagentmodel.AgentGraphDeps{
@@ -466,19 +475,33 @@ func (s *AgentService) persistNewAgentConversationMessage(
return err
}
now := time.Now()
s.appendConversationHistoryCacheOptimistically(
// 统一写入会话时间线,保证正文与卡片可按单一 seq 顺序重建。
timelineKind := model.AgentTimelineKindAssistantText
switch role {
case "user":
timelineKind = model.AgentTimelineKindUserText
case "assistant":
timelineKind = model.AgentTimelineKindAssistantText
}
timelinePayload := map[string]any{}
if persistPayload.ReasoningContent != "" {
timelinePayload["reasoning_content"] = persistPayload.ReasoningContent
}
if reasoningDurationSeconds > 0 {
timelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds
}
if _, err := s.appendConversationTimelineEvent(
ctx,
userID,
chatID,
buildOptimisticConversationHistoryItem(
role,
content,
persistPayload.ReasoningContent,
reasoningDurationSeconds,
now,
),
)
timelineKind,
role,
content,
timelinePayload,
tokensConsumed,
); err != nil {
return err
}
return nil
}

View File

@@ -5,27 +5,27 @@ import (
"errors"
"fmt"
"log"
"strings"
"github.com/LoveLosita/smartflow/backend/model"
newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/LoveLosita/smartflow/backend/respond"
)
// SaveScheduleState 前端暂存日程调整到 Redis 快照
// SaveScheduleState 处理前端拖拽后的“暂存排程状态”请求
//
// 职责边界:
// 1. 负责更新 Redis 中的 ScheduleState 中 source=task_item 的任务
// 2. 接受绝对时间格式(与 apply-batch 统一),由 conv 层转换为内部相对坐标
// 3. source=event 的课程保持快照原值不变;
// 4. 不负责写 MySQL、不负责刷新预览缓存
// 5. 不负责触发 graph 执行(由 confirm_action=accept 驱动)。
// 1. 负责把前端绝对坐标写回当前会话的 ScheduleState 快照
// 2. 负责刷新 Redis 预览缓存,保证后续预览读取与最新拖拽一致
// 3. 不负责写 MySQL 正式课表,也不负责触发新一轮 graph 执行。
func (s *AgentService) SaveScheduleState(
ctx context.Context,
userID int,
conversationID string,
items []model.SaveScheduleStatePlacedItem,
) error {
// 1. 加载快照
// 1. 加载会话快照;没有快照说明当前会话不在可微调窗口内
if s.agentStateStore == nil {
return errors.New("agent state store 未初始化")
}
@@ -33,12 +33,11 @@ func (s *AgentService) SaveScheduleState(
if err != nil {
return fmt.Errorf("加载快照失败: %w", err)
}
if !ok || snapshot == nil || snapshot.ScheduleState == nil {
return respond.ScheduleStateSnapshotNotFound
}
// 2. 校验归属
// 2. 做会话归属校验,防止跨用户写入别人的会话快照
if snapshot.RuntimeState != nil {
cs := snapshot.RuntimeState.EnsureCommonState()
if cs.UserID != 0 && cs.UserID != userID {
@@ -46,17 +45,98 @@ func (s *AgentService) SaveScheduleState(
}
}
// 3. 调用 conv 层将绝对时间放置项应用到 ScheduleState。
// 3. 将前端绝对坐标应用到内存态 ScheduleState。
// 3.1 这里只修改 source=task_item 任务;
// 3.2 source=event 课程位保持不变;
// 3.3 坐标非法时由 ApplyPlacedItems 返回明确错误。
if err := newagentconv.ApplyPlacedItems(snapshot.ScheduleState, items); err != nil {
return err
}
// 4. 写回 Redis
// 4. 写回运行态快照,确保“拖拽后的状态”成为后续读链路真值
if err := s.agentStateStore.Save(ctx, conversationID, snapshot); err != nil {
return fmt.Errorf("保存快照失败: %w", err)
}
log.Printf("[INFO] schedule state saved chat=%s user=%d item_count=%d",
conversationID, userID, len(items))
// 5. 再刷新预览缓存,避免 GetSchedulePlanPreview 读到拖拽前旧缓存。
if err := s.refreshSchedulePreviewAfterStateSave(ctx, userID, conversationID, snapshot); err != nil {
return err
}
log.Printf("[INFO] schedule state saved chat=%s user=%d item_count=%d", conversationID, userID, len(items))
return nil
}
// refreshSchedulePreviewAfterStateSave 按“最新快照”重建并覆盖 Redis 预览缓存。
//
// 职责边界:
// 1. 只处理 Redis 预览缓存,不负责 MySQL 快照;
// 2. 以最新 ScheduleState 为准,修复“预览读到旧拖拽结果”的回滚问题;
// 3. 尽量保留旧预览中的 trace_id/candidate_plans避免前端字段突变。
func (s *AgentService) refreshSchedulePreviewAfterStateSave(
ctx context.Context,
userID int,
conversationID string,
snapshot *newagentmodel.AgentStateSnapshot,
) error {
// 1. 依赖不完整时直接跳过,避免写入不完整缓存。
if s == nil || s.cacheDAO == nil || snapshot == nil || snapshot.ScheduleState == nil {
return nil
}
normalizedConversationID := strings.TrimSpace(conversationID)
if normalizedConversationID == "" {
return nil
}
// 2. 从运行态提取 task_class_ids保证预览过滤口径与会话一致。
taskClassIDs := make([]int, 0)
if snapshot.RuntimeState != nil {
flowState := snapshot.RuntimeState.EnsureCommonState()
taskClassIDs = append(taskClassIDs, flowState.TaskClassIDs...)
}
// 3. 基于最新 ScheduleState 生成预览主干hybrid_entries 为最新真值)。
preview := newagentconv.ScheduleStateToPreview(
snapshot.ScheduleState,
userID,
normalizedConversationID,
taskClassIDs,
"",
)
if preview == nil {
return nil
}
// 4. 合并旧预览里需要保留的字段,避免前端依赖字段突然丢失。
existingPreview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedConversationID)
if err != nil {
return fmt.Errorf("读取排程预览缓存失败: %w", err)
}
if existingPreview != nil {
preview.TraceID = strings.TrimSpace(existingPreview.TraceID)
if len(existingPreview.CandidatePlans) > 0 {
preview.CandidatePlans = cloneWeekSchedules(existingPreview.CandidatePlans)
}
if len(existingPreview.AllocatedItems) > 0 {
preview.AllocatedItems = cloneTaskClassItems(existingPreview.AllocatedItems)
}
if len(preview.TaskClassIDs) == 0 && len(existingPreview.TaskClassIDs) > 0 {
preview.TaskClassIDs = append([]int(nil), existingPreview.TaskClassIDs...)
}
}
if preview.CandidatePlans == nil {
preview.CandidatePlans = make([]model.UserWeekSchedule, 0)
}
if preview.HybridEntries == nil {
preview.HybridEntries = make([]model.HybridScheduleEntry, 0)
}
if preview.TaskClassIDs == nil {
preview.TaskClassIDs = make([]int, 0)
}
// 5. 回写 Redis 预览缓存;失败则返回错误,让前端可感知并重试。
if err := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedConversationID, preview); err != nil {
return fmt.Errorf("刷新排程预览缓存失败: %w", err)
}
return nil
}

View File

@@ -0,0 +1,388 @@
package agentsvc
import (
"context"
"encoding/json"
"errors"
"log"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/model"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
"gorm.io/gorm"
)
// GetConversationTimeline 返回指定会话的统一时间线(正文+卡片)列表。
//
// 职责边界:
// 1. 只读,不修改会话状态;
// 2. 顺序以 seq 为准,保证刷新后可稳定重建;
// 3. 优先读 Redis 时间线缓存,未命中再回源 MySQL。
func (s *AgentService) GetConversationTimeline(ctx context.Context, userID int, chatID string) ([]model.GetConversationTimelineItem, error) {
normalizedChatID := normalizeConversationID(chatID)
if userID <= 0 || strings.TrimSpace(normalizedChatID) == "" {
return nil, gorm.ErrRecordNotFound
}
exists, err := s.repo.IfChatExists(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
if !exists {
return nil, gorm.ErrRecordNotFound
}
if s.cacheDAO != nil {
cacheItems, cacheErr := s.cacheDAO.GetConversationTimelineFromCache(ctx, userID, normalizedChatID)
if cacheErr == nil && cacheItems != nil {
return normalizeConversationTimelineItems(cacheItems), nil
}
if cacheErr != nil {
log.Printf("读取会话时间线缓存失败 user=%d chat=%s err=%v", userID, normalizedChatID, cacheErr)
}
}
events, err := s.repo.ListConversationTimelineEvents(ctx, userID, normalizedChatID)
if err != nil {
return nil, err
}
items := buildConversationTimelineItemsFromDB(events)
if s.cacheDAO != nil {
if err := s.cacheDAO.SetConversationTimelineToCache(ctx, userID, normalizedChatID, items); err != nil {
log.Printf("回填会话时间线缓存失败 user=%d chat=%s err=%v", userID, normalizedChatID, err)
}
if len(items) > 0 {
if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, items[len(items)-1].Seq); err != nil {
log.Printf("回填会话时间线 seq 失败 user=%d chat=%s err=%v", userID, normalizedChatID, err)
}
}
}
return normalizeConversationTimelineItems(items), nil
}
// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + MySQL。
//
// 步骤化说明:
// 1. 先从 Redis INCR 分配 seq若 Redis 异常则回退 DB MAX(seq)+1
// 2. 再写 MySQL保证刷新时至少有权威持久化
// 3. 最后追加 Redis 时间线列表,失败只记日志,不影响主链路返回;
// 4. 返回分配到的 seq便于后续扩展在 SSE meta 回传顺序号。
func (s *AgentService) appendConversationTimelineEvent(
ctx context.Context,
userID int,
chatID string,
kind string,
role string,
content string,
payload map[string]any,
tokensConsumed int,
) (int64, error) {
if s == nil || s.repo == nil {
return 0, errors.New("agent service is not initialized")
}
if ctx == nil {
ctx = context.Background()
}
normalizedChatID := strings.TrimSpace(chatID)
normalizedRole := strings.TrimSpace(role)
normalizedKind := canonicalizeTimelineKind(kind, normalizedRole)
normalizedContent := strings.TrimSpace(content)
if userID <= 0 || normalizedChatID == "" || normalizedKind == "" {
return 0, errors.New("invalid timeline event identity")
}
seq, err := s.nextConversationTimelineSeq(ctx, userID, normalizedChatID)
if err != nil {
return 0, err
}
payloadJSON := marshalTimelinePayloadJSON(payload)
persistPayload := model.ChatTimelinePersistPayload{
UserID: userID,
ConversationID: normalizedChatID,
Seq: seq,
Kind: normalizedKind,
Role: normalizedRole,
Content: normalizedContent,
PayloadJSON: payloadJSON,
TokensConsumed: tokensConsumed,
}
eventID, eventCreatedAt, err := s.repo.SaveConversationTimelineEvent(ctx, persistPayload)
if err != nil {
// 1. 并发极端场景下(例如 Redis seq 分配失败后 DB 兜底)可能产生重复 seq
// 2. 这里做一次“读取最新 MAX(seq)+1”的重试避免主链路直接失败
// 3. 重试仍失败则返回错误,让调用方感知真实落库失败。
if !isTimelineSeqConflictError(err) {
return 0, err
}
maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID)
if seqErr != nil {
return 0, err
}
persistPayload.Seq = maxSeq + 1
var retryErr error
eventID, eventCreatedAt, retryErr = s.repo.SaveConversationTimelineEvent(ctx, persistPayload)
if retryErr != nil {
return 0, retryErr
}
seq = persistPayload.Seq
if s.cacheDAO != nil {
if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, seq); setErr != nil {
log.Printf("时间线 seq 冲突重试后回写 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, seq, setErr)
}
}
}
if s.cacheDAO != nil {
now := time.Now()
item := model.GetConversationTimelineItem{
ID: eventID,
Seq: seq,
Kind: normalizedKind,
Role: normalizedRole,
Content: normalizedContent,
Payload: cloneTimelinePayload(payload),
TokensConsumed: tokensConsumed,
}
if eventCreatedAt != nil {
item.CreatedAt = eventCreatedAt
} else {
item.CreatedAt = &now
}
if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, normalizedChatID, item); err != nil {
log.Printf("追加会话时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, normalizedChatID, seq, normalizedKind, err)
}
}
return seq, nil
}
func isTimelineSeqConflictError(err error) bool {
if err == nil {
return false
}
text := strings.ToLower(err.Error())
return strings.Contains(text, "duplicate") && strings.Contains(text, "uk_timeline_user_chat_seq")
}
// persistNewAgentTimelineExtraEvent 把 SSE extra 卡片事件写入时间线。
//
// 说明:
// 1. 只持久化真正需要刷新后重建的卡片事件;
// 2. status/reasoning/finish 等临时过程信号不落时间线;
// 3. 失败只记日志,不中断当前 SSE 输出。
func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, userID int, chatID string, extra *newagentstream.OpenAIChunkExtra) {
kind, ok := mapTimelineKindFromStreamExtra(extra)
if !ok {
return
}
if ctx == nil {
ctx = context.Background()
}
if _, err := s.appendConversationTimelineEvent(
ctx,
userID,
chatID,
kind,
"",
"",
buildTimelinePayloadFromStreamExtra(extra),
0,
); err != nil {
log.Printf("写入 newAgent 卡片时间线失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err)
}
}
func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) {
if s.cacheDAO != nil {
seq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, chatID)
if err == nil {
return seq, nil
}
log.Printf("会话时间线 seq Redis 分配失败,回退 DB user=%d chat=%s err=%v", userID, chatID, err)
}
maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID)
if err != nil {
return 0, err
}
seq := maxSeq + 1
if s.cacheDAO != nil {
if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, seq); err != nil {
log.Printf("会话时间线 seq 回填 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, seq, err)
}
}
return seq, nil
}
func buildConversationTimelineItemsFromDB(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem {
if len(events) == 0 {
return make([]model.GetConversationTimelineItem, 0)
}
items := make([]model.GetConversationTimelineItem, 0, len(events))
for _, event := range events {
item := model.GetConversationTimelineItem{
ID: event.ID,
Seq: event.Seq,
Kind: strings.TrimSpace(event.Kind),
TokensConsumed: event.TokensConsumed,
CreatedAt: event.CreatedAt,
}
if event.Role != nil {
item.Role = strings.TrimSpace(*event.Role)
}
if event.Content != nil {
item.Content = strings.TrimSpace(*event.Content)
}
if event.Payload != nil {
var payload map[string]any
if err := json.Unmarshal([]byte(strings.TrimSpace(*event.Payload)), &payload); err == nil && len(payload) > 0 {
item.Payload = payload
}
}
items = append(items, item)
}
return normalizeConversationTimelineItems(items)
}
// normalizeConversationTimelineItems 统一收敛 timeline 的 kind/role 口径,避免前端切分失效。
func normalizeConversationTimelineItems(items []model.GetConversationTimelineItem) []model.GetConversationTimelineItem {
if len(items) == 0 {
return make([]model.GetConversationTimelineItem, 0)
}
normalized := make([]model.GetConversationTimelineItem, 0, len(items))
for _, item := range items {
role := strings.ToLower(strings.TrimSpace(item.Role))
kind := canonicalizeTimelineKind(item.Kind, role)
// kind 缺失时尝试从 role 反推文本类型,保障“用户分段锚点”可用。
if kind == "" {
switch role {
case "user":
kind = model.AgentTimelineKindUserText
case "assistant":
kind = model.AgentTimelineKindAssistantText
}
}
// role 缺失时按文本类型补齐,减少前端额外兼容判断。
if role == "" {
switch kind {
case model.AgentTimelineKindUserText:
role = "user"
case model.AgentTimelineKindAssistantText:
role = "assistant"
}
}
item.Kind = kind
item.Role = role
normalized = append(normalized, item)
}
return normalized
}
// canonicalizeTimelineKind 统一 kind 别名,收敛到文档定义值。
func canonicalizeTimelineKind(kind string, role string) string {
normalizedKind := strings.ToLower(strings.TrimSpace(kind))
normalizedRole := strings.ToLower(strings.TrimSpace(role))
switch normalizedKind {
case model.AgentTimelineKindUserText,
model.AgentTimelineKindAssistantText,
model.AgentTimelineKindToolCall,
model.AgentTimelineKindToolResult,
model.AgentTimelineKindConfirmRequest,
model.AgentTimelineKindScheduleCompleted:
return normalizedKind
case "text", "message", "query":
if normalizedRole == "user" {
return model.AgentTimelineKindUserText
}
if normalizedRole == "assistant" {
return model.AgentTimelineKindAssistantText
}
return normalizedKind
default:
return normalizedKind
}
}
func marshalTimelinePayloadJSON(payload map[string]any) string {
if len(payload) == 0 {
return ""
}
data, err := json.Marshal(payload)
if err != nil {
return ""
}
return string(data)
}
func cloneTimelinePayload(payload map[string]any) map[string]any {
if len(payload) == 0 {
return nil
}
cloned := make(map[string]any, len(payload))
for key, value := range payload {
cloned[key] = value
}
return cloned
}
func mapTimelineKindFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) (string, bool) {
if extra == nil {
return "", false
}
switch extra.Kind {
case newagentstream.StreamExtraKindToolCall:
return model.AgentTimelineKindToolCall, true
case newagentstream.StreamExtraKindToolResult:
return model.AgentTimelineKindToolResult, true
case newagentstream.StreamExtraKindConfirm:
return model.AgentTimelineKindConfirmRequest, true
case newagentstream.StreamExtraKindScheduleCompleted:
return model.AgentTimelineKindScheduleCompleted, true
default:
return "", false
}
}
func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) map[string]any {
if extra == nil {
return nil
}
payload := map[string]any{
"stage": strings.TrimSpace(extra.Stage),
"block_id": strings.TrimSpace(extra.BlockID),
"display_mode": string(extra.DisplayMode),
}
if extra.Tool != nil {
payload["tool"] = map[string]any{
"name": strings.TrimSpace(extra.Tool.Name),
"status": strings.TrimSpace(extra.Tool.Status),
"summary": strings.TrimSpace(extra.Tool.Summary),
"arguments_preview": strings.TrimSpace(extra.Tool.ArgumentsPreview),
}
}
if extra.Confirm != nil {
payload["confirm"] = map[string]any{
"interaction_id": strings.TrimSpace(extra.Confirm.InteractionID),
"title": strings.TrimSpace(extra.Confirm.Title),
"summary": strings.TrimSpace(extra.Confirm.Summary),
}
}
if extra.Interrupt != nil {
payload["interrupt"] = map[string]any{
"interaction_id": strings.TrimSpace(extra.Interrupt.InteractionID),
"type": strings.TrimSpace(extra.Interrupt.Type),
"summary": strings.TrimSpace(extra.Interrupt.Summary),
}
}
if len(extra.Meta) > 0 {
payload["meta"] = cloneTimelinePayload(extra.Meta)
}
return payload
}