From 0f749e9f5a35cfd995e2c87241396615be48c4c2 Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Sun, 19 Apr 2026 19:03:41 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.9.32.dev.260419=20=E5=90=8E?= =?UTF-8?q?=E7=AB=AF=EF=BC=9A=201.=20=E4=BC=9A=E8=AF=9D=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=88=87=E6=8D=A2=E4=B8=BA=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=BA=BF=E8=AF=BB=E5=8F=96=EF=BC=8C=E5=B9=B6?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=20extra.resume=20=E6=81=A2=E5=A4=8D=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=20=20=20-=20api/agent.go=EF=BC=9A=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20resume->confirm=5Faction=20=E6=98=A0=E5=B0=84=EF=BC=88approv?= =?UTF-8?q?e/reject/cancel=EF=BC=89=EF=BC=8C=E6=81=A2=E5=A4=8D=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E7=BC=BA=20conversation=5Fid=20=E6=97=B6=E6=8B=A6?= =?UTF-8?q?=E6=88=AA=EF=BC=9BGetConversationHistory=20=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=20GetConversationTimeline=20=20=20-=20routers/routers.go?= =?UTF-8?q?=EF=BC=9A=E8=B7=AF=E7=94=B1=E4=BB=8E=20GET=20/conversation-hist?= =?UTF-8?q?ory=20=E5=88=87=E6=8D=A2=E4=B8=BA=20GET=20/conversation-timelin?= =?UTF-8?q?e=20=20=20-=20model/agent.go=EF=BC=9A=E5=88=A0=E9=99=A4=20GetCo?= =?UTF-8?q?nversationHistoryItem=20=E6=97=A7=20DTO=202.=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E4=BC=9A=E8=AF=9D=E6=97=B6=E9=97=B4=E7=BA=BF=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96=E9=93=BE=E8=B7=AF=EF=BC=88MySQL=20+=20Redis?= =?UTF-8?q?=EF=BC=89=20=20=20-=20=E6=96=B0=E5=A2=9E=20model/agent=5Ftimeli?= =?UTF-8?q?ne.go=EF=BC=9A=E5=AE=9A=E4=B9=89=20timeline=20kind=E3=80=81Agen?= =?UTF-8?q?tTimelineEvent=E3=80=81=E6=8C=81=E4=B9=85=E5=8C=96/=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E7=BB=93=E6=9E=84=20=20=20-=20=E6=96=B0=E5=A2=9E=20da?= =?UTF-8?q?o/agent=5Ftimeline.go=EF=BC=9A=E5=86=99=E5=85=A5=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E3=80=81=E6=8C=89=20seq=20=E6=9F=A5=E8=AF=A2=E3=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=20max=20seq=20=20=20-=20inits/mysql.go?= =?UTF-8?q?=EF=BC=9AAutoMigrate=20=E5=A2=9E=E5=8A=A0=20AgentTimelineEvent?= =?UTF-8?q?=20=20=20-=20dao/cache.go=EF=BC=9A=E6=96=B0=E5=A2=9E=20timeline?= =?UTF-8?q?=20list/seq=20key=EF=BC=8C=E6=94=AF=E6=8C=81=20incr/set=20seq?= =?UTF-8?q?=E3=80=81append/list=E3=80=81=E5=85=A8=E9=87=8F=E5=9B=9E?= =?UTF-8?q?=E5=A1=AB=E4=B8=8E=E5=88=A0=E9=99=A4=20=20=20-=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20service/agentsvc/agent=5Ftimeline.go=EF=BC=9A?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=BA=BF=E8=AF=BB=E5=86=99=E7=BC=96=E6=8E=92?= =?UTF-8?q?=EF=BC=88Redis=20=E4=BC=98=E5=85=88=E3=80=81DB=20=E5=9B=9E?= =?UTF-8?q?=E6=BA=90=E3=80=81seq=20=E5=88=86=E9=85=8D=E4=B8=8E=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E9=87=8D=E8=AF=95=E3=80=81extra=20=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E6=98=A0=E5=B0=84=EF=BC=89=203.=20=E8=81=8A=E5=A4=A9=E4=B8=BB?= =?UTF-8?q?=E9=93=BE=E8=B7=AF=E6=94=B9=E4=B8=BA=E5=86=99=E5=85=A5=20timeli?= =?UTF-8?q?ne=EF=BC=8C=E6=97=A7=20history=20=E6=9C=8D=E5=8A=A1=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF=20=20=20-=20service/agentsvc/agent.go=EF=BC=9A?= =?UTF-8?q?=E6=99=AE=E9=80=9A=E8=81=8A=E5=A4=A9=E7=94=A8=E6=88=B7/?= =?UTF-8?q?=E5=8A=A9=E6=89=8B=E6=B6=88=E6=81=AF=E6=94=B9=E4=B8=BA=20append?= =?UTF-8?q?ConversationTimelineEvent=20=20=20-=20service/agentsvc/agent=5F?= =?UTF-8?q?newagent.go=EF=BC=9A=E9=80=8F=E4=BC=A0=20resume=5Finteraction?= =?UTF-8?q?=5Fid=EF=BC=9B=E6=B3=A8=E5=85=A5=20emitter=20extra=20hook=20?= =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E5=8D=A1=E7=89=87=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=EF=BC=9B=E6=AD=A3=E6=96=87=E5=86=99=E5=85=A5=20timeline=20=20?= =?UTF-8?q?=20-=20=E5=88=A0=E9=99=A4=20service/agentsvc/agent=5Fhistory.go?= =?UTF-8?q?=EF=BC=9A=E4=B8=8B=E7=BA=BF=20conversation-history=20=E6=97=A7?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=BC=96=E6=8E=92=204.=20newAgent=20?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E4=B8=8E=E7=A1=AE=E8=AE=A4=E9=98=B2=E4=B8=B2?= =?UTF-8?q?=E5=8D=95=E5=A2=9E=E5=BC=BA=20=20=20-=20newAgent/model/graph=5F?= =?UTF-8?q?run=5Fstate.go=EF=BC=9AAgentGraphRequest=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20ResumeInteractionID=20=20=20-=20newAgent/node/agent=5Fnodes.?= =?UTF-8?q?go=EF=BC=9A=E9=80=8F=E4=BC=A0=20ResumeInteractionID=20=20=20-?= =?UTF-8?q?=20newAgent/node/chat.go=EF=BC=9A=E5=A2=9E=E5=8A=A0=20stale=5Fr?= =?UTF-8?q?esume=20=E6=A0=A1=E9=AA=8C=EF=BC=9Baccept/reject=20=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=20approve/cancel=EF=BC=9B=E9=9D=9E=E6=B3=95=E5=8A=A8?= =?UTF-8?q?=E4=BD=9C=E8=BF=94=E5=9B=9E=20invalid=5Fconfirm=5Faction=20=20?= =?UTF-8?q?=20-=20newAgent/stream/emitter.go=EF=BC=9A=E6=96=B0=E5=A2=9E=20?= =?UTF-8?q?extraEventHook=20/=20SetExtraEventHook=EF=BC=8C=E5=9C=A8=20extr?= =?UTF-8?q?a-only=20=E4=B8=8E=20confirm=20=E4=BA=8B=E4=BB=B6=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=205.=20=E6=97=A5=E7=A8=8B=E6=9A=82=E5=AD=98=E5=90=8E?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=88=B7=E6=96=B0=E9=A2=84=E8=A7=88=E7=BC=93?= =?UTF-8?q?=E5=AD=98=EF=BC=8C=E9=81=BF=E5=85=8D=E8=AF=BB=E5=88=B0=E6=8B=96?= =?UTF-8?q?=E6=8B=BD=E5=89=8D=E6=97=A7=E6=95=B0=E6=8D=AE=20=20=20-=20servi?= =?UTF-8?q?ce/agentsvc/agent=5Fschedule=5Fstate.go=EF=BC=9ASave=20?= =?UTF-8?q?=E5=90=8E=E9=87=8D=E5=BB=BA=E5=B9=B6=E8=A6=86=E7=9B=96=20previe?= =?UTF-8?q?w=20=E7=BC=93=E5=AD=98=EF=BC=8C=E4=BF=9D=E7=95=99=20trace/candi?= =?UTF-8?q?date=20=E7=AD=89=E5=AD=97=E6=AE=B5=206.=20=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=A4=B1=E6=95=88=E7=AD=96=E7=95=A5=E8=B0=83=E6=95=B4=E5=88=B0?= =?UTF-8?q?=20timeline=20=E5=8F=A3=E5=BE=84=20=20=20-=20middleware/cache?= =?UTF-8?q?=5Fdeleter.go=EF=BC=9A=E7=A7=BB=E9=99=A4=20conversation-history?= =?UTF-8?q?=20=E5=A4=B1=E6=95=88=E9=80=BB=E8=BE=91=EF=BC=9BChatHistory/Age?= =?UTF-8?q?ntChat/AgentTimelineEvent=20=E5=8A=A0=E5=85=A5=E5=BF=BD?= =?UTF-8?q?=E7=95=A5=E9=9B=86=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 前端: 7. 新增时间线接口与类型定义 - frontend/src/api/schedule_agent.ts:新增 TimelineEvent/TimelineToolPayload/TimelineConfirmPayload 与 getConversationTimeline 8. AssistantPanel 全面对接 timeline 重建消息与卡片 - frontend/src/components/dashboard/AssistantPanel.vue:移除旧 history merge/normalize,新增 rebuildStateFromTimeline;支持 execution mode(always_execute);支持 resume-only 发送;修复 confirm 弹层手动关闭后重复弹出;会话标题显示放宽;流式中隐藏 action bar 9. 精排弹窗健壮性与交互动效优化 - frontend/src/components/assistant/ScheduleFineTuneModal.vue:previewData 支持 nullable,新增 visible 控制与 watch 初始化,补齐空值保护并调整弹窗动画 仓库: 10. 新增前端时间线接入说明文档 - docs/frontend/newagent_timeline_对接说明.md:接口、kind、payload、刷新重建与迁移建议 --- backend/api/agent.go | 64 ++- backend/dao/agent_timeline.go | 86 ++++ backend/dao/cache.go | 151 +++++-- backend/inits/mysql.go | 1 + backend/middleware/cache_deleter.go | 24 +- backend/model/agent.go | 9 - backend/model/agent_timeline.go | 63 +++ backend/newAgent/model/graph_run_state.go | 5 +- backend/newAgent/node/agent_nodes.go | 1 + backend/newAgent/node/chat.go | 44 +- backend/newAgent/stream/emitter.go | 22 + backend/routers/routers.go | 2 +- backend/service/agentsvc/agent.go | 46 ++- backend/service/agentsvc/agent_history.go | 257 ------------ backend/service/agentsvc/agent_newagent.go | 51 ++- .../service/agentsvc/agent_schedule_state.go | 106 ++++- backend/service/agentsvc/agent_timeline.go | 388 ++++++++++++++++++ docs/frontend/newagent_timeline_对接说明.md | 137 +++++++ frontend/src/api/schedule_agent.ts | 45 ++ .../assistant/ScheduleFineTuneModal.vue | 58 ++- .../components/dashboard/AssistantPanel.vue | 327 +++++++++------ frontend/src/views/DashboardView.vue | 218 +++++++++- 22 files changed, 1565 insertions(+), 540 deletions(-) create mode 100644 backend/dao/agent_timeline.go create mode 100644 backend/model/agent_timeline.go delete mode 100644 backend/service/agentsvc/agent_history.go create mode 100644 backend/service/agentsvc/agent_timeline.go create mode 100644 docs/frontend/newagent_timeline_对接说明.md diff --git a/backend/api/agent.go b/backend/api/agent.go index b01263b..081b3c2 100644 --- a/backend/api/agent.go +++ b/backend/api/agent.go @@ -34,6 +34,23 @@ func writeSSEData(w io.Writer, payload string) error { return err } +// mapResumeConfirmAction 把 extra.resume.action 映射为现有 confirm_action 口径。 +// +// 映射规则: +// 1. approve -> accept(确认执行); +// 2. reject/cancel -> reject(拒绝执行); +// 3. 兜底走 reject,避免脏值误触发执行。 +func mapResumeConfirmAction(action model.AgentResumeAction) string { + switch action { + case model.AgentResumeActionApprove: + return "accept" + case model.AgentResumeActionReject, model.AgentResumeActionCancel: + return "reject" + default: + return "reject" + } +} + func (api *AgentHandler) ChatAgent(c *gin.Context) { // 1) 设置 SSE 响应头 c.Writer.Header().Set("Content-Type", "text/event-stream") @@ -49,10 +66,34 @@ func (api *AgentHandler) ChatAgent(c *gin.Context) { return } + // 2.1 兼容新恢复协议:把 extra.resume 统一映射到现有内部字段。 + // 1. 前端新协议只传 resume,不再直接传 confirm_action; + // 2. 后端这里做一次入口归一,保证下游状态机继续按既有字段消费; + // 3. 解析失败直接返回 400,避免把非法恢复请求当普通消息继续执行。 + resumeReq, resumeErr := req.ResumeRequest() + if resumeErr != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + if resumeReq != nil { + if req.Extra == nil { + req.Extra = make(map[string]any) + } + req.Extra["resume_interaction_id"] = resumeReq.InteractionID + if resumeReq.IsConfirmResume() { + req.Extra["confirm_action"] = mapResumeConfirmAction(resumeReq.Action) + } + } + // 3) 规范化会话 ID conversationID := strings.TrimSpace(req.ConversationID) if conversationID == "" { - // confirm_action 需要关联已存在的会话状态,缺少 conversation_id 直接报错。 + // 恢复类请求必须关联既有会话状态,缺少 conversation_id 直接报错。 + if resumeReq != nil { + c.JSON(http.StatusBadRequest, respond.MissingConversationID) + return + } + // 兼容旧协议:confirm_action 也必须绑定已有会话。 if _, ok := req.Extra["confirm_action"]; ok { c.JSON(http.StatusBadRequest, respond.MissingConversationID) return @@ -209,29 +250,25 @@ func (api *AgentHandler) GetConversationList(c *gin.Context) { c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } -// GetConversationHistory 返回指定会话的聊天历史记录。 +// GetConversationTimeline 返回指定会话的统一时间线(正文+卡片)。 // -// 设计说明: -// 1) 该接口只读历史,不负责改写 Redis/DB 中的会话状态; -// 2) 读取顺序复用现有服务层能力:先校验归属,再查 Redis,未命中再回源 DB; -// 3) 会话不存在时统一返回 400,避免前端把无效会话误判成系统故障。 -func (api *AgentHandler) GetConversationHistory(c *gin.Context) { - // 1. 参数校验:conversation_id 必填。 +// 说明: +// 1. 该接口是新前端刷新重建的单一来源; +// 2. 返回结果已按 seq 升序,前端按数组顺序渲染即可; +// 3. 会话不存在时统一返回 400,避免误判成系统异常。 +func (api *AgentHandler) GetConversationTimeline(c *gin.Context) { conversationID := strings.TrimSpace(c.Query("conversation_id")) if conversationID == "" { c.JSON(http.StatusBadRequest, respond.MissingParam) return } - // 2. 从鉴权上下文取当前用户 ID,确保查询范围只落在“本人会话”内。 userID := c.GetInt("user_id") - // 3. 设置短超时,避免缓存抖动或慢查询长期占用连接。 ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - // 4. 调 service 查询聊天历史。 - history, err := api.svc.GetConversationHistory(ctx, userID, conversationID) + timeline, err := api.svc.GetConversationTimeline(ctx, userID, conversationID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { c.JSON(http.StatusBadRequest, respond.WrongParamType) @@ -241,8 +278,7 @@ func (api *AgentHandler) GetConversationHistory(c *gin.Context) { return } - // 5. 返回统一响应结构。 - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, history)) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, timeline)) } // GetSchedulePlanPreview 返回“指定会话”的排程结构化预览。 diff --git a/backend/dao/agent_timeline.go b/backend/dao/agent_timeline.go new file mode 100644 index 0000000..53918ea --- /dev/null +++ b/backend/dao/agent_timeline.go @@ -0,0 +1,86 @@ +package dao + +import ( + "context" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/model" +) + +// SaveConversationTimelineEvent 持久化单条会话时间线事件到 MySQL。 +// +// 职责边界: +// 1. 只做单条写入,不负责 seq 分配; +// 2. 只保证字段标准化(去空格、空值置 nil),不做业务语义修正; +// 3. 返回 error 让上层决定是否中断当前链路。 +func (a *AgentDAO) SaveConversationTimelineEvent(ctx context.Context, payload model.ChatTimelinePersistPayload) (int64, *time.Time, error) { + normalizedChatID := strings.TrimSpace(payload.ConversationID) + normalizedKind := strings.TrimSpace(payload.Kind) + normalizedRole := strings.TrimSpace(payload.Role) + normalizedContent := strings.TrimSpace(payload.Content) + normalizedPayloadJSON := strings.TrimSpace(payload.PayloadJSON) + + var rolePtr *string + if normalizedRole != "" { + rolePtr = &normalizedRole + } + var contentPtr *string + if normalizedContent != "" { + contentPtr = &normalizedContent + } + var payloadPtr *string + if normalizedPayloadJSON != "" { + payloadPtr = &normalizedPayloadJSON + } + + event := model.AgentTimelineEvent{ + UserID: payload.UserID, + ChatID: normalizedChatID, + Seq: payload.Seq, + Kind: normalizedKind, + Role: rolePtr, + Content: contentPtr, + Payload: payloadPtr, + TokensConsumed: payload.TokensConsumed, + } + if err := a.db.WithContext(ctx).Create(&event).Error; err != nil { + return 0, nil, err + } + return event.ID, event.CreatedAt, nil +} + +// ListConversationTimelineEvents 查询会话时间线,按 seq 正序返回。 +func (a *AgentDAO) ListConversationTimelineEvents(ctx context.Context, userID int, chatID string) ([]model.AgentTimelineEvent, error) { + normalizedChatID := strings.TrimSpace(chatID) + var events []model.AgentTimelineEvent + err := a.db.WithContext(ctx). + Where("user_id = ? AND chat_id = ?", userID, normalizedChatID). + Order("seq ASC"). + Order("id ASC"). + Find(&events).Error + if err != nil { + return nil, err + } + return events, nil +} + +// GetConversationTimelineMaxSeq 返回会话时间线当前最大 seq。 +// +// 说明: +// 1. 该方法主要用于 Redis 顺序号不可用时的 DB 兜底; +// 2. 无记录时返回 0,不视为错误; +// 3. 上层需要自行 +1 后再写入新事件。 +func (a *AgentDAO) GetConversationTimelineMaxSeq(ctx context.Context, userID int, chatID string) (int64, error) { + normalizedChatID := strings.TrimSpace(chatID) + var maxSeq int64 + err := a.db.WithContext(ctx). + Model(&model.AgentTimelineEvent{}). + Where("user_id = ? AND chat_id = ?", userID, normalizedChatID). + Select("COALESCE(MAX(seq), 0)"). + Scan(&maxSeq).Error + if err != nil { + return 0, err + } + return maxSeq, nil +} diff --git a/backend/dao/cache.go b/backend/dao/cache.go index 162fc12..1047566 100644 --- a/backend/dao/cache.go +++ b/backend/dao/cache.go @@ -37,8 +37,12 @@ func (d *CacheDAO) schedulePreviewKey(userID int, conversationID string) string return fmt.Sprintf("smartflow:schedule_preview:u:%d:c:%s", userID, conversationID) } -func (d *CacheDAO) conversationHistoryKey(userID int, conversationID string) string { - return fmt.Sprintf("smartflow:conversation_history:u:%d:c:%s", userID, conversationID) +func (d *CacheDAO) conversationTimelineKey(userID int, conversationID string) string { + return fmt.Sprintf("smartflow:conversation_timeline:u:%d:c:%s", userID, conversationID) +} + +func (d *CacheDAO) conversationTimelineSeqKey(userID int, conversationID string) string { + return fmt.Sprintf("smartflow:conversation_timeline_seq:u:%d:c:%s", userID, conversationID) } // SetBlacklist 把 Token 写入黑名单。 @@ -450,13 +454,59 @@ func (d *CacheDAO) DeleteSchedulePlanPreviewFromCache(ctx context.Context, userI return d.client.Del(ctx, d.schedulePreviewKey(userID, normalizedConversationID)).Err() } -// SetConversationHistoryToCache 写入“会话历史视图”缓存。 +// IncrConversationTimelineSeq 原子递增并返回会话时间线 seq。 // -// 职责边界: -// 1. 负责按 user_id + conversation_id 写入前端历史查询所需的稳定 DTO; -// 2. 只负责缓存当前可展示历史,不负责上下文窗口缓存; -// 3. 不负责 DB 回源,也不负责重试分组补算。 -func (d *CacheDAO) SetConversationHistoryToCache(ctx context.Context, userID int, conversationID string, items []model.GetConversationHistoryItem) error { +// 说明: +// 1. seq 只在同一 user_id + conversation_id 维度内递增; +// 2. 使用 Redis INCR 保证并发下不会拿到重复顺序号; +// 3. 该 key 也会设置 TTL,避免长尾会话长期占用缓存。 +func (d *CacheDAO) IncrConversationTimelineSeq(ctx context.Context, userID int, conversationID string) (int64, error) { + if d == nil || d.client == nil { + return 0, errors.New("cache dao is not initialized") + } + if userID <= 0 { + return 0, fmt.Errorf("invalid user_id: %d", userID) + } + normalizedConversationID := strings.TrimSpace(conversationID) + if normalizedConversationID == "" { + return 0, errors.New("conversation_id is empty") + } + + key := d.conversationTimelineSeqKey(userID, normalizedConversationID) + pipe := d.client.Pipeline() + incrCmd := pipe.Incr(ctx, key) + pipe.Expire(ctx, key, 24*time.Hour) + if _, err := pipe.Exec(ctx); err != nil { + return 0, err + } + return incrCmd.Val(), nil +} + +// SetConversationTimelineSeq 强制设置会话时间线当前 seq(DB 回填 Redis 兜底场景)。 +func (d *CacheDAO) SetConversationTimelineSeq(ctx context.Context, userID int, conversationID string, seq int64) error { + if d == nil || d.client == nil { + return errors.New("cache dao is not initialized") + } + if userID <= 0 { + return fmt.Errorf("invalid user_id: %d", userID) + } + normalizedConversationID := strings.TrimSpace(conversationID) + if normalizedConversationID == "" { + return errors.New("conversation_id is empty") + } + if seq < 0 { + seq = 0 + } + return d.client.Set(ctx, d.conversationTimelineSeqKey(userID, normalizedConversationID), seq, 24*time.Hour).Err() +} + +// AppendConversationTimelineEventToCache 追加单条时间线缓存事件。 +func (d *CacheDAO) AppendConversationTimelineEventToCache( + ctx context.Context, + userID int, + conversationID string, + item model.GetConversationTimelineItem, +) error { if d == nil || d.client == nil { return errors.New("cache dao is not initialized") } @@ -468,20 +518,53 @@ func (d *CacheDAO) SetConversationHistoryToCache(ctx context.Context, userID int return errors.New("conversation_id is empty") } - data, err := json.Marshal(items) + data, err := json.Marshal(item) if err != nil { - return fmt.Errorf("marshal conversation history failed: %w", err) + return fmt.Errorf("marshal conversation timeline item failed: %w", err) } - return d.client.Set(ctx, d.conversationHistoryKey(userID, normalizedConversationID), data, 1*time.Hour).Err() + + key := d.conversationTimelineKey(userID, normalizedConversationID) + pipe := d.client.Pipeline() + pipe.RPush(ctx, key, data) + pipe.Expire(ctx, key, 24*time.Hour) + _, err = pipe.Exec(ctx) + return err } -// GetConversationHistoryFromCache 读取“会话历史视图”缓存。 -// -// 输入输出语义: -// 1. 命中时返回历史 DTO 切片与 nil error; -// 2. 未命中时返回 (nil, nil); -// 3. Redis 异常或反序列化失败时返回 error。 -func (d *CacheDAO) GetConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) ([]model.GetConversationHistoryItem, error) { +// SetConversationTimelineToCache 全量回填时间线缓存。 +func (d *CacheDAO) SetConversationTimelineToCache(ctx context.Context, userID int, conversationID string, items []model.GetConversationTimelineItem) error { + if d == nil || d.client == nil { + return errors.New("cache dao is not initialized") + } + if userID <= 0 { + return fmt.Errorf("invalid user_id: %d", userID) + } + normalizedConversationID := strings.TrimSpace(conversationID) + if normalizedConversationID == "" { + return errors.New("conversation_id is empty") + } + + key := d.conversationTimelineKey(userID, normalizedConversationID) + pipe := d.client.Pipeline() + pipe.Del(ctx, key) + if len(items) > 0 { + values := make([]interface{}, 0, len(items)) + for _, item := range items { + data, err := json.Marshal(item) + if err != nil { + return fmt.Errorf("marshal conversation timeline item failed: %w", err) + } + values = append(values, data) + } + pipe.RPush(ctx, key, values...) + } + pipe.Expire(ctx, key, 24*time.Hour) + _, err := pipe.Exec(ctx) + return err +} + +// GetConversationTimelineFromCache 读取时间线缓存(按 seq 正序)。 +func (d *CacheDAO) GetConversationTimelineFromCache(ctx context.Context, userID int, conversationID string) ([]model.GetConversationTimelineItem, error) { if d == nil || d.client == nil { return nil, errors.New("cache dao is not initialized") } @@ -493,28 +576,30 @@ func (d *CacheDAO) GetConversationHistoryFromCache(ctx context.Context, userID i return nil, errors.New("conversation_id is empty") } - raw, err := d.client.Get(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Result() + rawItems, err := d.client.LRange(ctx, d.conversationTimelineKey(userID, normalizedConversationID), 0, -1).Result() if err == redis.Nil { return nil, nil } if err != nil { return nil, err } + if len(rawItems) == 0 { + return nil, nil + } - var items []model.GetConversationHistoryItem - if err = json.Unmarshal([]byte(raw), &items); err != nil { - return nil, fmt.Errorf("unmarshal conversation history failed: %w", err) + items := make([]model.GetConversationTimelineItem, 0, len(rawItems)) + for _, raw := range rawItems { + var item model.GetConversationTimelineItem + if err := json.Unmarshal([]byte(raw), &item); err != nil { + return nil, fmt.Errorf("unmarshal conversation timeline item failed: %w", err) + } + items = append(items, item) } return items, nil } -// DeleteConversationHistoryFromCache 删除“会话历史视图”缓存。 -// -// 说明: -// 1. 删除操作是幂等的,key 不存在也视为成功; -// 2. 该方法用于 chat_histories 写入/补种 retry 分组后触发失效; -// 3. 这里只处理前端历史视图缓存,不影响 Agent 上下文热缓存。 -func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userID int, conversationID string) error { +// DeleteConversationTimelineFromCache 删除时间线缓存和 seq 缓存。 +func (d *CacheDAO) DeleteConversationTimelineFromCache(ctx context.Context, userID int, conversationID string) error { if d == nil || d.client == nil { return errors.New("cache dao is not initialized") } @@ -525,7 +610,11 @@ func (d *CacheDAO) DeleteConversationHistoryFromCache(ctx context.Context, userI if normalizedConversationID == "" { return errors.New("conversation_id is empty") } - return d.client.Del(ctx, d.conversationHistoryKey(userID, normalizedConversationID)).Err() + return d.client.Del( + ctx, + d.conversationTimelineKey(userID, normalizedConversationID), + d.conversationTimelineSeqKey(userID, normalizedConversationID), + ).Err() } // agentStateKey 返回 agent 运行态快照的 Redis key。 @@ -615,7 +704,7 @@ const ( // memoryPrefetchKey 生成用户+会话维度的记忆预取缓存 key。 // -// 1. 格式:smartflow:memory_prefetch:u:{userID}:c:{chatID},与 conversationHistoryKey / schedulePreviewKey 命名风格一致; +// 1. 格式:smartflow:memory_prefetch:u:{userID}:c:{chatID},与 conversationTimelineKey / schedulePreviewKey 命名风格一致; // 2. chatID 为空时 key 为 smartflow:memory_prefetch:u:5:c:,仍然合法且唯一,不会与其他会话 key 冲突; // 3. 加 chatID 隔离后,不同会话各自维护独立的预取缓存,避免会话间记忆上下文互相覆盖。 func (d *CacheDAO) memoryPrefetchKey(userID int, chatID string) string { diff --git a/backend/inits/mysql.go b/backend/inits/mysql.go index e2f02b0..576ad93 100644 --- a/backend/inits/mysql.go +++ b/backend/inits/mysql.go @@ -15,6 +15,7 @@ func autoMigrateModels(db *gorm.DB) error { &model.User{}, &model.AgentChat{}, &model.ChatHistory{}, + &model.AgentTimelineEvent{}, &model.Task{}, &model.TaskClass{}, &model.TaskClassItem{}, diff --git a/backend/middleware/cache_deleter.go b/backend/middleware/cache_deleter.go index 02e8692..4a81a8d 100644 --- a/backend/middleware/cache_deleter.go +++ b/backend/middleware/cache_deleter.go @@ -74,10 +74,6 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}) { p.invalidTaskCache(m.UserID) case model.AgentScheduleState: p.invalidSchedulePlanPreviewCache(m.UserID, m.ConversationID) - case model.ChatHistory: - p.invalidConversationHistoryCache(m.UserID, m.ChatID) - case model.AgentChat: - p.invalidConversationHistoryCache(m.UserID, m.ChatID) case model.MemoryItem: // 1. 管理面删除/修改/恢复/新增记忆时,自动失效该用户所有会话的预取缓存; // 2. repo 方法通过 Model(&model.MemoryItem{UserID: userID}) 携带 userID, @@ -86,6 +82,9 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}) { p.invalidMemoryPrefetchCache(m.UserID) case model.AgentOutboxMessage, model.User, + model.ChatHistory, + model.AgentChat, + model.AgentTimelineEvent, model.AgentStateSnapshotRecord, model.MemoryJob, model.MemoryAuditLog, @@ -152,23 +151,6 @@ func (p *GormCachePlugin) invalidSchedulePlanPreviewCache(userID int, conversati }() } -func (p *GormCachePlugin) invalidConversationHistoryCache(userID int, conversationID string) { - normalizedConversationID := strings.TrimSpace(conversationID) - if userID == 0 || normalizedConversationID == "" { - return - } - - go func() { - // 1. 聊天历史写入或重试补种后,删除历史视图缓存,保证下次列表/详情能拿到最新版本。 - // 2. 这里只清“前台历史视图缓存”,不碰 LLM 上下文热缓存,避免影响首 token 体验。 - if err := p.cacheDAO.DeleteConversationHistoryFromCache(context.Background(), userID, normalizedConversationID); err != nil { - log.Printf("[GORM-Cache] Failed to invalidate conversation history cache for user %d conversation %s: %v", userID, normalizedConversationID, err) - return - } - log.Printf("[GORM-Cache] Invalidated conversation history cache for user %d conversation %s", userID, normalizedConversationID) - }() -} - // invalidMemoryPrefetchCache 失效指定用户所有会话的记忆预取缓存。 // // 步骤化说明: diff --git a/backend/model/agent.go b/backend/model/agent.go index 411b96b..75d6e91 100644 --- a/backend/model/agent.go +++ b/backend/model/agent.go @@ -220,15 +220,6 @@ type GetConversationListResponse struct { HasMore bool `json:"has_more"` } -type GetConversationHistoryItem struct { - ID int `json:"id,omitempty"` - Role string `json:"role"` - Content string `json:"content"` - CreatedAt *time.Time `json:"created_at,omitempty"` - ReasoningContent string `json:"reasoning_content,omitempty"` - ReasoningDurationSeconds int `json:"reasoning_duration_seconds,omitempty"` -} - type SchedulePlanPreviewCache struct { UserID int `json:"user_id"` ConversationID string `json:"conversation_id"` diff --git a/backend/model/agent_timeline.go b/backend/model/agent_timeline.go new file mode 100644 index 0000000..3f097ea --- /dev/null +++ b/backend/model/agent_timeline.go @@ -0,0 +1,63 @@ +package model + +import "time" + +// AgentTimelineKind 定义会话时间线事件类型。 +// +// 说明: +// 1. 这些类型面向前端渲染,要求语义稳定,不随节点内部实现细节频繁变化; +// 2. 文本消息和卡片事件共用一条时间线,前端只按 seq 顺序渲染; +// 3. token 统计仍以 chat_histories / agent_chats 为准,时间线只做展示顺序与结构承载。 +const ( + AgentTimelineKindUserText = "user_text" + AgentTimelineKindAssistantText = "assistant_text" + AgentTimelineKindToolCall = "tool_call" + AgentTimelineKindToolResult = "tool_result" + AgentTimelineKindConfirmRequest = "confirm_request" + AgentTimelineKindScheduleCompleted = "schedule_completed" +) + +// AgentTimelineEvent 表示会话里“可展示事件”的统一持久化记录。 +// +// 职责边界: +// 1. 只承载“顺序 + 展示信息”,不替代 chat_histories 的消息账本职责; +// 2. seq 是同一会话内的单调递增顺序号,用于刷新后重建展示顺序; +// 3. payload 只保存前端渲染需要的结构化信息,不存整个运行时快照。 +type AgentTimelineEvent struct { + ID int64 `gorm:"column:id;primaryKey;autoIncrement"` + UserID int `gorm:"column:user_id;not null;uniqueIndex:uk_timeline_user_chat_seq,priority:1;index:idx_timeline_user_chat_created,priority:1;comment:所属用户ID"` + ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_timeline_user_chat_seq,priority:2;index:idx_timeline_user_chat_created,priority:2;comment:会话UUID"` + Seq int64 `gorm:"column:seq;not null;uniqueIndex:uk_timeline_user_chat_seq,priority:3;comment:会话内顺序号"` + Kind string `gorm:"column:kind;type:varchar(64);not null;comment:事件类型"` + Role *string `gorm:"column:role;type:varchar(32);comment:消息角色"` + Content *string `gorm:"column:content;type:text;comment:正文内容"` + Payload *string `gorm:"column:payload;type:json;comment:结构化负载"` + TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:该事件关联token,默认0"` + CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime;index:idx_timeline_user_chat_created,priority:3"` +} + +func (AgentTimelineEvent) TableName() string { return "agent_timeline_events" } + +// ChatTimelinePersistPayload 定义时间线单条事件落库输入。 +type ChatTimelinePersistPayload struct { + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id"` + Seq int64 `json:"seq"` + Kind string `json:"kind"` + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + PayloadJSON string `json:"payload_json,omitempty"` + TokensConsumed int `json:"tokens_consumed"` +} + +// GetConversationTimelineItem 定义前端读取时间线接口的单条返回项。 +type GetConversationTimelineItem struct { + ID int64 `json:"id,omitempty"` + Seq int64 `json:"seq"` + Kind string `json:"kind"` + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + Payload map[string]any `json:"payload,omitempty"` + TokensConsumed int `json:"tokens_consumed,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` +} diff --git a/backend/newAgent/model/graph_run_state.go b/backend/newAgent/model/graph_run_state.go index cd46b72..b0dc374 100644 --- a/backend/newAgent/model/graph_run_state.go +++ b/backend/newAgent/model/graph_run_state.go @@ -21,7 +21,9 @@ import ( type AgentGraphRequest struct { UserInput string ConfirmAction string // "accept" / "reject" / "",仅 confirm 恢复场景由前端传入 - AlwaysExecute bool // true 时写工具跳过确认闸门直接执行,适合前端已展示预览、用户无需逐步确认的场景 + // ResumeInteractionID 用于校验“本次恢复请求”是否命中了当前 pending 交互,避免旧卡片误恢复。 + ResumeInteractionID string + AlwaysExecute bool // true 时写工具跳过确认闸门直接执行,适合前端已展示预览、用户无需逐步确认的场景 } // Normalize 统一清洗请求级输入中的字符串字段。 @@ -31,6 +33,7 @@ func (r *AgentGraphRequest) Normalize() { } r.UserInput = strings.TrimSpace(r.UserInput) r.ConfirmAction = strings.TrimSpace(r.ConfirmAction) + r.ResumeInteractionID = strings.TrimSpace(r.ResumeInteractionID) } // RoughBuildPlacement 是粗排算法返回的单条放置结果。 diff --git a/backend/newAgent/node/agent_nodes.go b/backend/newAgent/node/agent_nodes.go index 5ff38db..919f0f8 100644 --- a/backend/newAgent/node/agent_nodes.go +++ b/backend/newAgent/node/agent_nodes.go @@ -39,6 +39,7 @@ func (n *AgentNodes) Chat(ctx context.Context, st *newagentmodel.AgentGraphState ConversationContext: st.EnsureConversationContext(), UserInput: st.Request.UserInput, ConfirmAction: st.Request.ConfirmAction, + ResumeInteractionID: st.Request.ResumeInteractionID, Client: st.Deps.ResolveChatClient(), ChunkEmitter: st.EnsureChunkEmitter(), CompactionStore: st.Deps.CompactionStore, diff --git a/backend/newAgent/node/chat.go b/backend/newAgent/node/chat.go index 7ab2444..9cdc373 100644 --- a/backend/newAgent/node/chat.go +++ b/backend/newAgent/node/chat.go @@ -49,6 +49,7 @@ type ChatNodeInput struct { ConversationContext *newagentmodel.ConversationContext UserInput string ConfirmAction string + ResumeInteractionID string Client *infrallm.Client ChunkEmitter *newagentstream.ChunkEmitter CompactionStore newagentmodel.CompactionStore // 上下文压缩持久化 @@ -679,6 +680,14 @@ func handleChatResume( pending := runtimeState.PendingInteraction flowState := runtimeState.EnsureCommonState() + if isMismatchedResumeInteraction(input.ResumeInteractionID, pending) { + _ = emitter.EmitStatus( + chatStatusBlockID, chatStageName, + "stale_resume", "当前确认已过期,请刷新后重试。", false, + ) + return nil + } + // 用户输入在 service 层进入 graph 前已经统一追加到 ConversationContext。 // 这里不再二次写入,避免 pending 恢复路径把同一轮 user message 追加两次。 @@ -715,10 +724,18 @@ func handleConfirmResume( pending *newagentmodel.PendingInteraction, emitter *newagentstream.ChunkEmitter, ) error { + if isMismatchedResumeInteraction(input.ResumeInteractionID, pending) { + _ = emitter.EmitStatus( + chatStatusBlockID, chatStageName, + "stale_resume", "当前确认已过期,请刷新后重试。", false, + ) + return nil + } + action := strings.ToLower(strings.TrimSpace(input.ConfirmAction)) switch action { - case "accept": + case "accept", "approve": // 恢复前保存待执行工具,Execute 节点需要它。 pendingTool := pending.PendingTool runtimeState.ResumeFromPending() @@ -733,7 +750,7 @@ func handleConfirmResume( "confirmed", "已确认,开始执行。", false, ) - case "reject": + case "reject", "cancel": runtimeState.ResumeFromPending() if pending.PendingTool != nil { // 工具确认被拒 → 回到 executing 换策略。 @@ -748,17 +765,26 @@ func handleConfirmResume( ) default: - // 无合法 confirm action → 保守:等同于 reject。 - runtimeState.ResumeFromPending() - if pending.PendingTool != nil { - flowState.Phase = newagentmodel.PhaseExecuting - } else { - flowState.RejectPlan() - } + _ = emitter.EmitStatus( + chatStatusBlockID, chatStageName, + "invalid_confirm_action", "未识别确认动作,请重试。", false, + ) } return nil } +func isMismatchedResumeInteraction(resumeInteractionID string, pending *newagentmodel.PendingInteraction) bool { + if pending == nil { + return false + } + resumeID := strings.TrimSpace(resumeInteractionID) + pendingID := strings.TrimSpace(pending.InteractionID) + if resumeID == "" || pendingID == "" { + return false + } + return resumeID != pendingID +} + // prepareChatNodeInput 校验并准备聊天节点的运行态依赖。 func prepareChatNodeInput(input ChatNodeInput) ( *newagentmodel.AgentRuntimeState, diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index 3edbf98..d5d0956 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -62,6 +62,11 @@ type ChunkEmitter struct { RequestID string ModelName string Created int64 + // extraEventHook 用于把关键结构化事件同步给上层做持久化。 + // 1. hook 失败不能影响 SSE 主链路; + // 2. hook 只接收 extra 结构,避免 emitter 反向依赖业务层; + // 3. 不注入时保持空实现,兼容旧调用路径。 + extraEventHook func(extra *OpenAIChunkExtra) } // NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。 @@ -109,6 +114,14 @@ func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created i } } +// SetExtraEventHook 设置结构化事件回调。 +func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) { + if e == nil { + return + } + e.extraEventHook = hook +} + // EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。 func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error { if e == nil || e.emit == nil { @@ -233,6 +246,7 @@ func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error { if e == nil || e.emit == nil { return nil } + e.emitExtraEventHook(extra) payload, err := ToOpenAIStreamWithExtra( nil, e.RequestID, @@ -250,6 +264,13 @@ func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error { return e.emit(payload) } +func (e *ChunkEmitter) emitExtraEventHook(extra *OpenAIChunkExtra) { + if e == nil || e.extraEventHook == nil || extra == nil { + return + } + e.extraEventHook(extra) +} + // EmitConfirmRequest 输出一次待确认事件。 // // 当前展示策略: @@ -263,6 +284,7 @@ func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, i text := buildConfirmAssistantText(title, summary) extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary) + e.emitExtraEventHook(extra) return e.emitPseudoText( ctx, text, diff --git a/backend/routers/routers.go b/backend/routers/routers.go index 464bf6d..55d5e19 100644 --- a/backend/routers/routers.go +++ b/backend/routers/routers.go @@ -93,7 +93,7 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d agentGroup.POST("/chat", middleware.TokenQuotaGuard(cache, userRepo), handlers.AgentHandler.ChatAgent) agentGroup.GET("/conversation-meta", handlers.AgentHandler.GetConversationMeta) agentGroup.GET("/conversation-list", handlers.AgentHandler.GetConversationList) - agentGroup.GET("/conversation-history", handlers.AgentHandler.GetConversationHistory) + agentGroup.GET("/conversation-timeline", handlers.AgentHandler.GetConversationTimeline) agentGroup.GET("/schedule-preview", handlers.AgentHandler.GetSchedulePlanPreview) agentGroup.GET("/context-stats", handlers.AgentHandler.GetContextStats) agentGroup.POST("/schedule-state", handlers.AgentHandler.SaveScheduleState) diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index a96267b..2368c4b 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -386,18 +386,19 @@ func (s *AgentService) runNormalChatFlow( pushErrNonBlocking(errChan, err) return } - s.appendConversationHistoryCacheOptimistically( - context.Background(), + if _, timelineErr := s.appendConversationTimelineEvent( + ctx, userID, chatID, - buildOptimisticConversationHistoryItem( - "user", - userMessage, - "", - 0, - requestStart, - ), - ) + model.AgentTimelineKindUserText, + "user", + userMessage, + nil, + 0, + ); timelineErr != nil { + pushErrNonBlocking(errChan, timelineErr) + return + } } // 普通聊天链路也需要把助手回复写入 Redis, @@ -425,18 +426,25 @@ func (s *AgentService) runNormalChatFlow( }); saveErr != nil { pushErrNonBlocking(errChan, saveErr) } else { - s.appendConversationHistoryCacheOptimistically( + assistantTimelinePayload := map[string]any{} + if strings.TrimSpace(assistantReasoning) != "" { + assistantTimelinePayload["reasoning_content"] = strings.TrimSpace(assistantReasoning) + } + if reasoningDurationSeconds > 0 { + assistantTimelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds + } + if _, timelineErr := s.appendConversationTimelineEvent( context.Background(), userID, chatID, - buildOptimisticConversationHistoryItem( - "assistant", - fullText, - assistantReasoning, - reasoningDurationSeconds, - time.Now(), - ), - ) + model.AgentTimelineKindAssistantText, + "assistant", + fullText, + assistantTimelinePayload, + requestTotalTokens, + ); timelineErr != nil { + pushErrNonBlocking(errChan, timelineErr) + } } // 9. 在主回复完成后异步尝试生成会话标题(仅首次、仅标题为空时生效)。 diff --git a/backend/service/agentsvc/agent_history.go b/backend/service/agentsvc/agent_history.go deleted file mode 100644 index 446c675..0000000 --- a/backend/service/agentsvc/agent_history.go +++ /dev/null @@ -1,257 +0,0 @@ -package agentsvc - -import ( - "context" - "fmt" - "log" - "sort" - "strings" - "time" - - "github.com/LoveLosita/smartflow/backend/model" - "github.com/LoveLosita/smartflow/backend/pkg" - "github.com/LoveLosita/smartflow/backend/respond" - "gorm.io/gorm" -) - -// GetConversationHistory 返回指定会话的聊天历史。 -// -// 职责边界: -// 1. 负责会话 ID 归一化、会话归属校验,以及“先 Redis、后 DB”的读取编排; -// 2. 负责把缓存消息 / DB 记录统一转换为 API 响应 DTO; -// 3. 不负责补写会话标题,也不负责修改聊天主链路的缓存写入策略。 -func (s *AgentService) GetConversationHistory(ctx context.Context, userID int, chatID string) ([]model.GetConversationHistoryItem, error) { - normalizedChatID := strings.TrimSpace(chatID) - if normalizedChatID == "" { - return nil, respond.MissingParam - } - - // 1. 先做归属校验: - // 1.1 Redis 历史缓存只按 chat_id 分桶,不能单靠缓存判断用户归属; - // 1.2 因此先查会话是否属于当前用户,避免命中别人会话缓存时产生越权读取; - // 1.3 若会话不存在,统一返回 gorm.ErrRecordNotFound,交由 API 层映射为参数错误。 - exists, err := s.repo.IfChatExists(ctx, userID, normalizedChatID) - if err != nil { - return nil, err - } - if !exists { - return nil, gorm.ErrRecordNotFound - } - - // 2. 优先读取“会话历史视图缓存”: - // 2.1 这层缓存专门服务 conversation-history,字段口径与前端展示一致; - // 2.2 与 Agent 上下文热缓存解耦,避免为了历史多版本而拖慢首 token; - // 2.3 若命中则直接返回,miss 再回源 DB。 - if s.cacheDAO != nil { - items, cacheErr := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID) - if cacheErr != nil { - log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, cacheErr) - } else if conversationHistoryCacheCanServe(items) { - return items, nil - } - } - - // 3. Redis miss 时回源 DB: - // 3.1 复用现有 GetUserChatHistories 读取最近 N 条历史,保证“重试版本、落库主键、创建时间”口径稳定; - // 3.2 再把 DB 结果转换成接口 DTO,作为历史视图缓存回填; - // 3.3 失败时直接上抛,由 API 层统一处理。 - histories, err := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID) - if err != nil { - return nil, err - } - - items := buildConversationHistoryItemsFromDB(histories) - - if s.cacheDAO != nil { - if setErr := s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, items); setErr != nil { - log.Printf("回填会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, setErr) - } - } - - return items, nil -} - -// appendConversationHistoryCacheOptimistically 把“刚生成但尚未完成 DB 持久化确认”的消息追加到历史视图缓存。 -// -// 职责边界: -// 1. 只服务前端会话历史展示,不参与 Agent 上下文热缓存; -// 2. 优先复用现有历史视图缓存,miss 时再用 DB 历史做一次启动兜底; -// 3. 不保证最终权威性,最终仍以 DB 落库成功后的缓存失效与回源结果为准。 -func (s *AgentService) appendConversationHistoryCacheOptimistically( - ctx context.Context, - userID int, - chatID string, - newItems ...model.GetConversationHistoryItem, -) { - if s == nil || s.cacheDAO == nil { - return - } - normalizedChatID := strings.TrimSpace(chatID) - if userID <= 0 || normalizedChatID == "" || len(newItems) == 0 { - return - } - if ctx == nil { - ctx = context.Background() - } - - // 1. 优先取历史视图缓存,避免每轮乐观追加都回源 DB。 - items, err := s.cacheDAO.GetConversationHistoryFromCache(ctx, userID, normalizedChatID) - if err != nil { - log.Printf("读取会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err) - return - } - - // 2. 缓存 miss 时,用当前 DB 已有历史做一次基线兜底。 - // 2.1 这样即便本轮是“缓存刚被 retry 补种操作删掉”,也不会只留下最新两条消息; - // 2.2 失败策略:DB 兜底失败只记日志并跳过,不阻塞主回复流程。 - if items == nil { - histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), normalizedChatID) - if hisErr != nil { - log.Printf("乐观追加历史缓存时回源 DB 失败 chat_id=%s: %v", normalizedChatID, hisErr) - return - } - items = buildConversationHistoryItemsFromDB(histories) - } - - merged := append([]model.GetConversationHistoryItem(nil), items...) - for _, item := range newItems { - merged = appendConversationHistoryItemIfMissing(merged, item) - } - sortConversationHistoryItems(merged) - - if err = s.cacheDAO.SetConversationHistoryToCache(ctx, userID, normalizedChatID, merged); err != nil { - log.Printf("乐观追加会话历史视图缓存失败 chat_id=%s: %v", normalizedChatID, err) - } -} - -// buildConversationHistoryItemsFromDB 把数据库聊天记录转换为接口响应。 -// -// 职责边界: -// 1. 只透传 DB 已有字段,不尝试补算 reasoning_content; -// 2. message_content / role 为空时兜底为空串与 system,避免空指针影响接口; -// 3. 保持 DAO 返回的时间正序,前端可直接渲染。 -func buildConversationHistoryItemsFromDB(histories []model.ChatHistory) []model.GetConversationHistoryItem { - items := make([]model.GetConversationHistoryItem, 0, len(histories)) - for _, history := range histories { - content := "" - if history.MessageContent != nil { - content = strings.TrimSpace(*history.MessageContent) - } - - role := "system" - if history.Role != nil { - role = normalizeConversationHistoryRole(*history.Role) - } - - items = append(items, model.GetConversationHistoryItem{ - ID: history.ID, - Role: role, - Content: content, - CreatedAt: history.CreatedAt, - ReasoningContent: strings.TrimSpace(derefConversationHistoryText(history.ReasoningContent)), - ReasoningDurationSeconds: history.ReasoningDurationSeconds, - }) - } - return items -} - -func derefConversationHistoryText(text *string) string { - if text == nil { - return "" - } - return *text -} - -func normalizeConversationHistoryRole(role string) string { - switch strings.ToLower(strings.TrimSpace(role)) { - case "user": - return "user" - case "assistant": - return "assistant" - default: - return "system" - } -} - -func conversationHistoryCacheCanServe(items []model.GetConversationHistoryItem) bool { - // 1. 历史接口一旦被前端用于“重试/编辑”等二次动作,消息 id 就必须稳定可追溯。 - // 2. 乐观缓存里的新消息在 DB 落库前没有自增主键,若直接返回,会让前端拿到占位 id。 - // 3. 因此只有“缓存里的每条消息都带稳定 DB id”时,才允许直接命中缓存;否则强制回源 DB。 - for _, item := range items { - if item.ID <= 0 { - return false - } - } - return items != nil -} - -func buildOptimisticConversationHistoryItem( - role string, - content string, - reasoningContent string, - reasoningDurationSeconds int, - createdAt time.Time, -) model.GetConversationHistoryItem { - item := model.GetConversationHistoryItem{ - Role: normalizeConversationHistoryRole(role), - Content: strings.TrimSpace(content), - ReasoningContent: strings.TrimSpace(reasoningContent), - ReasoningDurationSeconds: reasoningDurationSeconds, - } - if !createdAt.IsZero() { - t := createdAt - item.CreatedAt = &t - } - return item -} - -func appendConversationHistoryItemIfMissing( - items []model.GetConversationHistoryItem, - item model.GetConversationHistoryItem, -) []model.GetConversationHistoryItem { - targetKey := conversationHistoryItemSignature(item) - for _, existed := range items { - if conversationHistoryItemSignature(existed) == targetKey { - return items - } - } - return append(items, item) -} - -func conversationHistoryItemSignature(item model.GetConversationHistoryItem) string { - if item.ID > 0 { - return fmt.Sprintf("id:%d", item.ID) - } - - createdAt := "" - if item.CreatedAt != nil { - createdAt = item.CreatedAt.UTC().Format(time.RFC3339Nano) - } - - return fmt.Sprintf( - "%s|%s|%s|%d|%s", - strings.TrimSpace(item.Role), - strings.TrimSpace(item.Content), - strings.TrimSpace(item.ReasoningContent), - item.ReasoningDurationSeconds, - createdAt, - ) -} - -func sortConversationHistoryItems(items []model.GetConversationHistoryItem) { - sort.SliceStable(items, func(i, j int) bool { - left := conversationHistoryTimestamp(items[i]) - right := conversationHistoryTimestamp(items[j]) - if left.Equal(right) { - return conversationHistoryItemSignature(items[i]) < conversationHistoryItemSignature(items[j]) - } - return left.Before(right) - }) -} - -func conversationHistoryTimestamp(item model.GetConversationHistoryItem) time.Time { - if item.CreatedAt == nil { - return time.Time{} - } - return *item.CreatedAt -} diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go index 0f58216..541f3d5 100644 --- a/backend/service/agentsvc/agent_newagent.go +++ b/backend/service/agentsvc/agent_newagent.go @@ -159,14 +159,19 @@ func (s *AgentService) runNewAgentGraph( } // 6. 构造 AgentGraphRequest。 - var confirmAction string + var ( + confirmAction string + resumeInteractionID string + ) if len(extra) > 0 { confirmAction = readAgentExtraString(extra, "confirm_action") + resumeInteractionID = readAgentExtraString(extra, "resume_interaction_id") } graphRequest := newagentmodel.AgentGraphRequest{ - UserInput: userMessage, - ConfirmAction: confirmAction, - AlwaysExecute: readAgentExtraBool(extra, "always_execute"), + UserInput: userMessage, + ConfirmAction: confirmAction, + ResumeInteractionID: resumeInteractionID, + AlwaysExecute: readAgentExtraBool(extra, "always_execute"), } graphRequest.Normalize() @@ -181,6 +186,10 @@ func (s *AgentService) runNewAgentGraph( // 8. 适配 SSE emitter。 sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan) chunkEmitter := newagentstream.NewChunkEmitter(sseEmitter, traceID, resolvedModelName, requestStart.Unix()) + // 关键卡片事件走统一时间线持久化,保证刷新后可重建。 + chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) { + s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra) + }) // 9. 构造 AgentGraphDeps(由 cmd/start.go 注入的依赖)。 deps := newagentmodel.AgentGraphDeps{ @@ -466,19 +475,33 @@ func (s *AgentService) persistNewAgentConversationMessage( return err } - now := time.Now() - s.appendConversationHistoryCacheOptimistically( + // 统一写入会话时间线,保证正文与卡片可按单一 seq 顺序重建。 + timelineKind := model.AgentTimelineKindAssistantText + switch role { + case "user": + timelineKind = model.AgentTimelineKindUserText + case "assistant": + timelineKind = model.AgentTimelineKindAssistantText + } + timelinePayload := map[string]any{} + if persistPayload.ReasoningContent != "" { + timelinePayload["reasoning_content"] = persistPayload.ReasoningContent + } + if reasoningDurationSeconds > 0 { + timelinePayload["reasoning_duration_seconds"] = reasoningDurationSeconds + } + if _, err := s.appendConversationTimelineEvent( ctx, userID, chatID, - buildOptimisticConversationHistoryItem( - role, - content, - persistPayload.ReasoningContent, - reasoningDurationSeconds, - now, - ), - ) + timelineKind, + role, + content, + timelinePayload, + tokensConsumed, + ); err != nil { + return err + } return nil } diff --git a/backend/service/agentsvc/agent_schedule_state.go b/backend/service/agentsvc/agent_schedule_state.go index 2bf6e91..2b00dfb 100644 --- a/backend/service/agentsvc/agent_schedule_state.go +++ b/backend/service/agentsvc/agent_schedule_state.go @@ -5,27 +5,27 @@ import ( "errors" "fmt" "log" + "strings" "github.com/LoveLosita/smartflow/backend/model" newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv" + newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" "github.com/LoveLosita/smartflow/backend/respond" ) -// SaveScheduleState 前端暂存日程调整到 Redis 快照。 +// SaveScheduleState 处理前端拖拽后的“暂存排程状态”请求。 // // 职责边界: -// 1. 只负责更新 Redis 中的 ScheduleState 中 source=task_item 的任务; -// 2. 接受绝对时间格式(与 apply-batch 统一),由 conv 层转换为内部相对坐标; -// 3. source=event 的课程保持快照原值不变; -// 4. 不负责写 MySQL、不负责刷新预览缓存; -// 5. 不负责触发 graph 执行(由 confirm_action=accept 驱动)。 +// 1. 负责把前端绝对坐标写回当前会话的 ScheduleState 快照; +// 2. 负责刷新 Redis 预览缓存,保证后续预览读取与最新拖拽一致; +// 3. 不负责写 MySQL 正式课表,也不负责触发新一轮 graph 执行。 func (s *AgentService) SaveScheduleState( ctx context.Context, userID int, conversationID string, items []model.SaveScheduleStatePlacedItem, ) error { - // 1. 加载快照。 + // 1. 加载会话快照;没有快照说明当前会话不在可微调窗口内。 if s.agentStateStore == nil { return errors.New("agent state store 未初始化") } @@ -33,12 +33,11 @@ func (s *AgentService) SaveScheduleState( if err != nil { return fmt.Errorf("加载快照失败: %w", err) } - if !ok || snapshot == nil || snapshot.ScheduleState == nil { return respond.ScheduleStateSnapshotNotFound } - // 2. 校验归属。 + // 2. 做会话归属校验,防止跨用户写入别人的会话快照。 if snapshot.RuntimeState != nil { cs := snapshot.RuntimeState.EnsureCommonState() if cs.UserID != 0 && cs.UserID != userID { @@ -46,17 +45,98 @@ func (s *AgentService) SaveScheduleState( } } - // 3. 调用 conv 层将绝对时间放置项应用到 ScheduleState。 + // 3. 将前端绝对坐标应用到内存态 ScheduleState。 + // 3.1 这里只修改 source=task_item 任务; + // 3.2 source=event 课程位保持不变; + // 3.3 坐标非法时由 ApplyPlacedItems 返回明确错误。 if err := newagentconv.ApplyPlacedItems(snapshot.ScheduleState, items); err != nil { return err } - // 4. 写回 Redis。 + // 4. 先写回运行态快照,确保“拖拽后的状态”成为后续读链路真值。 if err := s.agentStateStore.Save(ctx, conversationID, snapshot); err != nil { return fmt.Errorf("保存快照失败: %w", err) } - log.Printf("[INFO] schedule state saved chat=%s user=%d item_count=%d", - conversationID, userID, len(items)) + // 5. 再刷新预览缓存,避免 GetSchedulePlanPreview 读到拖拽前旧缓存。 + if err := s.refreshSchedulePreviewAfterStateSave(ctx, userID, conversationID, snapshot); err != nil { + return err + } + + log.Printf("[INFO] schedule state saved chat=%s user=%d item_count=%d", conversationID, userID, len(items)) + return nil +} + +// refreshSchedulePreviewAfterStateSave 按“最新快照”重建并覆盖 Redis 预览缓存。 +// +// 职责边界: +// 1. 只处理 Redis 预览缓存,不负责 MySQL 快照; +// 2. 以最新 ScheduleState 为准,修复“预览读到旧拖拽结果”的回滚问题; +// 3. 尽量保留旧预览中的 trace_id/candidate_plans,避免前端字段突变。 +func (s *AgentService) refreshSchedulePreviewAfterStateSave( + ctx context.Context, + userID int, + conversationID string, + snapshot *newagentmodel.AgentStateSnapshot, +) error { + // 1. 依赖不完整时直接跳过,避免写入不完整缓存。 + if s == nil || s.cacheDAO == nil || snapshot == nil || snapshot.ScheduleState == nil { + return nil + } + normalizedConversationID := strings.TrimSpace(conversationID) + if normalizedConversationID == "" { + return nil + } + + // 2. 从运行态提取 task_class_ids,保证预览过滤口径与会话一致。 + taskClassIDs := make([]int, 0) + if snapshot.RuntimeState != nil { + flowState := snapshot.RuntimeState.EnsureCommonState() + taskClassIDs = append(taskClassIDs, flowState.TaskClassIDs...) + } + + // 3. 基于最新 ScheduleState 生成预览主干(hybrid_entries 为最新真值)。 + preview := newagentconv.ScheduleStateToPreview( + snapshot.ScheduleState, + userID, + normalizedConversationID, + taskClassIDs, + "", + ) + if preview == nil { + return nil + } + + // 4. 合并旧预览里需要保留的字段,避免前端依赖字段突然丢失。 + existingPreview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedConversationID) + if err != nil { + return fmt.Errorf("读取排程预览缓存失败: %w", err) + } + if existingPreview != nil { + preview.TraceID = strings.TrimSpace(existingPreview.TraceID) + if len(existingPreview.CandidatePlans) > 0 { + preview.CandidatePlans = cloneWeekSchedules(existingPreview.CandidatePlans) + } + if len(existingPreview.AllocatedItems) > 0 { + preview.AllocatedItems = cloneTaskClassItems(existingPreview.AllocatedItems) + } + if len(preview.TaskClassIDs) == 0 && len(existingPreview.TaskClassIDs) > 0 { + preview.TaskClassIDs = append([]int(nil), existingPreview.TaskClassIDs...) + } + } + if preview.CandidatePlans == nil { + preview.CandidatePlans = make([]model.UserWeekSchedule, 0) + } + if preview.HybridEntries == nil { + preview.HybridEntries = make([]model.HybridScheduleEntry, 0) + } + if preview.TaskClassIDs == nil { + preview.TaskClassIDs = make([]int, 0) + } + + // 5. 回写 Redis 预览缓存;失败则返回错误,让前端可感知并重试。 + if err := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedConversationID, preview); err != nil { + return fmt.Errorf("刷新排程预览缓存失败: %w", err) + } return nil } diff --git a/backend/service/agentsvc/agent_timeline.go b/backend/service/agentsvc/agent_timeline.go new file mode 100644 index 0000000..7175501 --- /dev/null +++ b/backend/service/agentsvc/agent_timeline.go @@ -0,0 +1,388 @@ +package agentsvc + +import ( + "context" + "encoding/json" + "errors" + "log" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/model" + newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + "gorm.io/gorm" +) + +// GetConversationTimeline 返回指定会话的统一时间线(正文+卡片)列表。 +// +// 职责边界: +// 1. 只读,不修改会话状态; +// 2. 顺序以 seq 为准,保证刷新后可稳定重建; +// 3. 优先读 Redis 时间线缓存,未命中再回源 MySQL。 +func (s *AgentService) GetConversationTimeline(ctx context.Context, userID int, chatID string) ([]model.GetConversationTimelineItem, error) { + normalizedChatID := normalizeConversationID(chatID) + if userID <= 0 || strings.TrimSpace(normalizedChatID) == "" { + return nil, gorm.ErrRecordNotFound + } + + exists, err := s.repo.IfChatExists(ctx, userID, normalizedChatID) + if err != nil { + return nil, err + } + if !exists { + return nil, gorm.ErrRecordNotFound + } + + if s.cacheDAO != nil { + cacheItems, cacheErr := s.cacheDAO.GetConversationTimelineFromCache(ctx, userID, normalizedChatID) + if cacheErr == nil && cacheItems != nil { + return normalizeConversationTimelineItems(cacheItems), nil + } + if cacheErr != nil { + log.Printf("读取会话时间线缓存失败 user=%d chat=%s err=%v", userID, normalizedChatID, cacheErr) + } + } + + events, err := s.repo.ListConversationTimelineEvents(ctx, userID, normalizedChatID) + if err != nil { + return nil, err + } + items := buildConversationTimelineItemsFromDB(events) + + if s.cacheDAO != nil { + if err := s.cacheDAO.SetConversationTimelineToCache(ctx, userID, normalizedChatID, items); err != nil { + log.Printf("回填会话时间线缓存失败 user=%d chat=%s err=%v", userID, normalizedChatID, err) + } + if len(items) > 0 { + if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, items[len(items)-1].Seq); err != nil { + log.Printf("回填会话时间线 seq 失败 user=%d chat=%s err=%v", userID, normalizedChatID, err) + } + } + } + + return normalizeConversationTimelineItems(items), nil +} + +// appendConversationTimelineEvent 统一追加单条时间线事件到 Redis + MySQL。 +// +// 步骤化说明: +// 1. 先从 Redis INCR 分配 seq,若 Redis 异常则回退 DB MAX(seq)+1; +// 2. 再写 MySQL,保证刷新时至少有权威持久化; +// 3. 最后追加 Redis 时间线列表,失败只记日志,不影响主链路返回; +// 4. 返回分配到的 seq,便于后续扩展在 SSE meta 回传顺序号。 +func (s *AgentService) appendConversationTimelineEvent( + ctx context.Context, + userID int, + chatID string, + kind string, + role string, + content string, + payload map[string]any, + tokensConsumed int, +) (int64, error) { + if s == nil || s.repo == nil { + return 0, errors.New("agent service is not initialized") + } + if ctx == nil { + ctx = context.Background() + } + + normalizedChatID := strings.TrimSpace(chatID) + normalizedRole := strings.TrimSpace(role) + normalizedKind := canonicalizeTimelineKind(kind, normalizedRole) + normalizedContent := strings.TrimSpace(content) + if userID <= 0 || normalizedChatID == "" || normalizedKind == "" { + return 0, errors.New("invalid timeline event identity") + } + + seq, err := s.nextConversationTimelineSeq(ctx, userID, normalizedChatID) + if err != nil { + return 0, err + } + + payloadJSON := marshalTimelinePayloadJSON(payload) + persistPayload := model.ChatTimelinePersistPayload{ + UserID: userID, + ConversationID: normalizedChatID, + Seq: seq, + Kind: normalizedKind, + Role: normalizedRole, + Content: normalizedContent, + PayloadJSON: payloadJSON, + TokensConsumed: tokensConsumed, + } + eventID, eventCreatedAt, err := s.repo.SaveConversationTimelineEvent(ctx, persistPayload) + if err != nil { + // 1. 并发极端场景下(例如 Redis seq 分配失败后 DB 兜底)可能产生重复 seq; + // 2. 这里做一次“读取最新 MAX(seq)+1”的重试,避免主链路直接失败; + // 3. 重试仍失败则返回错误,让调用方感知真实落库失败。 + if !isTimelineSeqConflictError(err) { + return 0, err + } + maxSeq, seqErr := s.repo.GetConversationTimelineMaxSeq(ctx, userID, normalizedChatID) + if seqErr != nil { + return 0, err + } + persistPayload.Seq = maxSeq + 1 + var retryErr error + eventID, eventCreatedAt, retryErr = s.repo.SaveConversationTimelineEvent(ctx, persistPayload) + if retryErr != nil { + return 0, retryErr + } + seq = persistPayload.Seq + if s.cacheDAO != nil { + if setErr := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, normalizedChatID, seq); setErr != nil { + log.Printf("时间线 seq 冲突重试后回写 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, normalizedChatID, seq, setErr) + } + } + } + + if s.cacheDAO != nil { + now := time.Now() + item := model.GetConversationTimelineItem{ + ID: eventID, + Seq: seq, + Kind: normalizedKind, + Role: normalizedRole, + Content: normalizedContent, + Payload: cloneTimelinePayload(payload), + TokensConsumed: tokensConsumed, + } + if eventCreatedAt != nil { + item.CreatedAt = eventCreatedAt + } else { + item.CreatedAt = &now + } + if err := s.cacheDAO.AppendConversationTimelineEventToCache(ctx, userID, normalizedChatID, item); err != nil { + log.Printf("追加会话时间线缓存失败 user=%d chat=%s seq=%d kind=%s err=%v", userID, normalizedChatID, seq, normalizedKind, err) + } + } + return seq, nil +} + +func isTimelineSeqConflictError(err error) bool { + if err == nil { + return false + } + text := strings.ToLower(err.Error()) + return strings.Contains(text, "duplicate") && strings.Contains(text, "uk_timeline_user_chat_seq") +} + +// persistNewAgentTimelineExtraEvent 把 SSE extra 卡片事件写入时间线。 +// +// 说明: +// 1. 只持久化真正需要刷新后重建的卡片事件; +// 2. status/reasoning/finish 等临时过程信号不落时间线; +// 3. 失败只记日志,不中断当前 SSE 输出。 +func (s *AgentService) persistNewAgentTimelineExtraEvent(ctx context.Context, userID int, chatID string, extra *newagentstream.OpenAIChunkExtra) { + kind, ok := mapTimelineKindFromStreamExtra(extra) + if !ok { + return + } + if ctx == nil { + ctx = context.Background() + } + + if _, err := s.appendConversationTimelineEvent( + ctx, + userID, + chatID, + kind, + "", + "", + buildTimelinePayloadFromStreamExtra(extra), + 0, + ); err != nil { + log.Printf("写入 newAgent 卡片时间线失败 user=%d chat=%s kind=%s err=%v", userID, chatID, kind, err) + } +} + +func (s *AgentService) nextConversationTimelineSeq(ctx context.Context, userID int, chatID string) (int64, error) { + if s.cacheDAO != nil { + seq, err := s.cacheDAO.IncrConversationTimelineSeq(ctx, userID, chatID) + if err == nil { + return seq, nil + } + log.Printf("会话时间线 seq Redis 分配失败,回退 DB user=%d chat=%s err=%v", userID, chatID, err) + } + + maxSeq, err := s.repo.GetConversationTimelineMaxSeq(ctx, userID, chatID) + if err != nil { + return 0, err + } + seq := maxSeq + 1 + if s.cacheDAO != nil { + if err := s.cacheDAO.SetConversationTimelineSeq(ctx, userID, chatID, seq); err != nil { + log.Printf("会话时间线 seq 回填 Redis 失败 user=%d chat=%s seq=%d err=%v", userID, chatID, seq, err) + } + } + return seq, nil +} + +func buildConversationTimelineItemsFromDB(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem { + if len(events) == 0 { + return make([]model.GetConversationTimelineItem, 0) + } + items := make([]model.GetConversationTimelineItem, 0, len(events)) + for _, event := range events { + item := model.GetConversationTimelineItem{ + ID: event.ID, + Seq: event.Seq, + Kind: strings.TrimSpace(event.Kind), + TokensConsumed: event.TokensConsumed, + CreatedAt: event.CreatedAt, + } + if event.Role != nil { + item.Role = strings.TrimSpace(*event.Role) + } + if event.Content != nil { + item.Content = strings.TrimSpace(*event.Content) + } + if event.Payload != nil { + var payload map[string]any + if err := json.Unmarshal([]byte(strings.TrimSpace(*event.Payload)), &payload); err == nil && len(payload) > 0 { + item.Payload = payload + } + } + items = append(items, item) + } + return normalizeConversationTimelineItems(items) +} + +// normalizeConversationTimelineItems 统一收敛 timeline 的 kind/role 口径,避免前端切分失效。 +func normalizeConversationTimelineItems(items []model.GetConversationTimelineItem) []model.GetConversationTimelineItem { + if len(items) == 0 { + return make([]model.GetConversationTimelineItem, 0) + } + normalized := make([]model.GetConversationTimelineItem, 0, len(items)) + for _, item := range items { + role := strings.ToLower(strings.TrimSpace(item.Role)) + kind := canonicalizeTimelineKind(item.Kind, role) + + // kind 缺失时尝试从 role 反推文本类型,保障“用户分段锚点”可用。 + if kind == "" { + switch role { + case "user": + kind = model.AgentTimelineKindUserText + case "assistant": + kind = model.AgentTimelineKindAssistantText + } + } + // role 缺失时按文本类型补齐,减少前端额外兼容判断。 + if role == "" { + switch kind { + case model.AgentTimelineKindUserText: + role = "user" + case model.AgentTimelineKindAssistantText: + role = "assistant" + } + } + + item.Kind = kind + item.Role = role + normalized = append(normalized, item) + } + return normalized +} + +// canonicalizeTimelineKind 统一 kind 别名,收敛到文档定义值。 +func canonicalizeTimelineKind(kind string, role string) string { + normalizedKind := strings.ToLower(strings.TrimSpace(kind)) + normalizedRole := strings.ToLower(strings.TrimSpace(role)) + switch normalizedKind { + case model.AgentTimelineKindUserText, + model.AgentTimelineKindAssistantText, + model.AgentTimelineKindToolCall, + model.AgentTimelineKindToolResult, + model.AgentTimelineKindConfirmRequest, + model.AgentTimelineKindScheduleCompleted: + return normalizedKind + case "text", "message", "query": + if normalizedRole == "user" { + return model.AgentTimelineKindUserText + } + if normalizedRole == "assistant" { + return model.AgentTimelineKindAssistantText + } + return normalizedKind + default: + return normalizedKind + } +} + +func marshalTimelinePayloadJSON(payload map[string]any) string { + if len(payload) == 0 { + return "" + } + data, err := json.Marshal(payload) + if err != nil { + return "" + } + return string(data) +} + +func cloneTimelinePayload(payload map[string]any) map[string]any { + if len(payload) == 0 { + return nil + } + cloned := make(map[string]any, len(payload)) + for key, value := range payload { + cloned[key] = value + } + return cloned +} + +func mapTimelineKindFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) (string, bool) { + if extra == nil { + return "", false + } + switch extra.Kind { + case newagentstream.StreamExtraKindToolCall: + return model.AgentTimelineKindToolCall, true + case newagentstream.StreamExtraKindToolResult: + return model.AgentTimelineKindToolResult, true + case newagentstream.StreamExtraKindConfirm: + return model.AgentTimelineKindConfirmRequest, true + case newagentstream.StreamExtraKindScheduleCompleted: + return model.AgentTimelineKindScheduleCompleted, true + default: + return "", false + } +} + +func buildTimelinePayloadFromStreamExtra(extra *newagentstream.OpenAIChunkExtra) map[string]any { + if extra == nil { + return nil + } + payload := map[string]any{ + "stage": strings.TrimSpace(extra.Stage), + "block_id": strings.TrimSpace(extra.BlockID), + "display_mode": string(extra.DisplayMode), + } + if extra.Tool != nil { + payload["tool"] = map[string]any{ + "name": strings.TrimSpace(extra.Tool.Name), + "status": strings.TrimSpace(extra.Tool.Status), + "summary": strings.TrimSpace(extra.Tool.Summary), + "arguments_preview": strings.TrimSpace(extra.Tool.ArgumentsPreview), + } + } + if extra.Confirm != nil { + payload["confirm"] = map[string]any{ + "interaction_id": strings.TrimSpace(extra.Confirm.InteractionID), + "title": strings.TrimSpace(extra.Confirm.Title), + "summary": strings.TrimSpace(extra.Confirm.Summary), + } + } + if extra.Interrupt != nil { + payload["interrupt"] = map[string]any{ + "interaction_id": strings.TrimSpace(extra.Interrupt.InteractionID), + "type": strings.TrimSpace(extra.Interrupt.Type), + "summary": strings.TrimSpace(extra.Interrupt.Summary), + } + } + if len(extra.Meta) > 0 { + payload["meta"] = cloneTimelinePayload(extra.Meta) + } + return payload +} diff --git a/docs/frontend/newagent_timeline_对接说明.md b/docs/frontend/newagent_timeline_对接说明.md new file mode 100644 index 0000000..93d0c05 --- /dev/null +++ b/docs/frontend/newagent_timeline_对接说明.md @@ -0,0 +1,137 @@ +# NewAgent 时间线对接说明(前端) + +## 1. 变更目标 + +后端已将 **正文消息** 与 **卡片事件(工具调用/确认/排程完成)** 统一落到会话时间线,刷新页面后可完整恢复,不再依赖“页面不刷新且持续订阅 SSE”才能看到卡片。 + +本次是开发环境直切,旧接口已退役: + +- 退役:`GET /api/v1/agent/conversation-history` +- 新增:`GET /api/v1/agent/conversation-timeline` + +## 2. 新接口 + +### 2.1 请求 + +`GET /api/v1/agent/conversation-timeline?conversation_id={conversation_id}` + +鉴权与其他 agent 接口一致:JWT。 + +### 2.2 响应结构 + +`data` 是按顺序返回的数组,每项结构如下: + +```json +{ + "id": 123, + "seq": 1, + "kind": "assistant_text", + "role": "assistant", + "content": "你好,我来帮你处理。", + "payload": { + "reasoning_content": "..." + }, + "tokens_consumed": 0, + "created_at": "2026-04-19T12:00:00+08:00" +} +``` + +字段说明: + +- `seq`:同一会话内严格递增顺序号,前端渲染顺序以它为准。 +- `kind`:事件类型(见下文映射表)。 +- `role`/`content`:正文消息使用。 +- `payload`:卡片渲染所需结构化信息。 + +## 3. kind 映射(前端渲染) + +- `user_text`:用户正文气泡。 +- `assistant_text`:助手正文气泡。 +- `tool_call`:工具开始卡片。 +- `tool_result`:工具结果卡片。 +- `confirm_request`:确认卡片。 +- `schedule_completed`:排程完成卡片(展示完成态,详情仍走原有排程查询接口)。 + +### 3.1 卡片 payload 结构 + +`tool_call` / `tool_result`: + +```json +{ + "stage": "execute", + "block_id": "execute.status", + "display_mode": "card", + "tool": { + "name": "move", + "status": "start|done|blocked|failed", + "summary": "xxx", + "arguments_preview": "xxx" + } +} +``` + +`confirm_request`: + +```json +{ + "stage": "confirm", + "block_id": "confirm.status", + "display_mode": "card", + "confirm": { + "interaction_id": "xxx", + "title": "操作确认", + "summary": "xxx" + } +} +``` + +`schedule_completed`: + +```json +{ + "stage": "deliver", + "block_id": "deliver.status", + "display_mode": "card" +} +``` + +## 4. 前端建议改造 + +### 4.1 会话初始化/刷新 + +进入会话页或刷新后,调用一次: + +- `GET /api/v1/agent/conversation-timeline` + +然后: + +1. 直接按返回数组顺序渲染; +2. 不需要再拼接旧 `conversation-history`; +3. 旧接口调用逻辑可直接删除。 + +### 4.2 进行中会话(SSE) + +进行中仍可继续消费 SSE(实时体验不变)。 + +建议策略: + +1. 首屏先用 `conversation-timeline` 重建历史; +2. 新一轮聊天过程中继续把 SSE 增量渲染到当前 UI; +3. 页面刷新时再次拉 `conversation-timeline`,即可恢复完整状态。 + +## 5. 顺序保证 + +后端在写入时间线时为每个事件分配 `seq`,并将正文与卡片写入同一链路: + +- 用户正文(`user_text`) +- 助手正文(`assistant_text`) +- 工具开始/结果(`tool_call`/`tool_result`) +- 确认卡片(`confirm_request`) +- 排程完成卡片(`schedule_completed`) + +因此前端只要遵循 `seq` 顺序渲染,就不会出现“正文和卡片乱序”问题。 + +## 6. Token 说明 + +时间线中的 `tokens_consumed` 仅作为展示冗余字段,真实 token 账本统计仍以后端原有口径(`chat_histories` / `agent_chats`)为准。 + diff --git a/frontend/src/api/schedule_agent.ts b/frontend/src/api/schedule_agent.ts index d407fa4..2ea90d2 100644 --- a/frontend/src/api/schedule_agent.ts +++ b/frontend/src/api/schedule_agent.ts @@ -3,6 +3,37 @@ import type { ApiResponse } from '@/types/api' import type { PlacedItem, SchedulePreviewData } from '@/types/dashboard' import { extractErrorMessage } from '@/utils/http' +export interface TimelineToolPayload { + name: string + status: 'start' | 'done' | 'blocked' | 'failed' + summary: string + arguments_preview?: string +} + +export interface TimelineConfirmPayload { + interaction_id: string + title: string + summary: string +} + +export interface TimelineEvent { + id: number + seq: number + kind: 'user_text' | 'assistant_text' | 'tool_call' | 'tool_result' | 'confirm_request' | 'schedule_completed' + role?: 'user' | 'assistant' + content?: string + payload?: { + reasoning_content?: string + stage?: string + block_id?: string + display_mode?: 'card' + tool?: TimelineToolPayload + confirm?: TimelineConfirmPayload + } + tokens_consumed?: number + created_at: string +} + /** * 获取排程预览数据 */ @@ -17,6 +48,20 @@ export async function getSchedulePreview(conversationId: string): Promise { + try { + const response = await http.get>('/agent/conversation-timeline', { + params: { conversation_id: conversationId }, + }) + return response.data.data + } catch (error) { + throw new Error(extractErrorMessage(error, '获取会话时间线失败')) + } +} + /** * 暂存排程状态到 Redis */ diff --git a/frontend/src/components/assistant/ScheduleFineTuneModal.vue b/frontend/src/components/assistant/ScheduleFineTuneModal.vue index 44dd9ed..64a0834 100644 --- a/frontend/src/components/assistant/ScheduleFineTuneModal.vue +++ b/frontend/src/components/assistant/ScheduleFineTuneModal.vue @@ -1,11 +1,12 @@