diff --git a/README.md b/README.md index e663f70..3ab77cf 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ +# 许可证 + +本项目采用 **GNU Affero General Public License v3.0 (AGPL-3.0)** 进行许可。 + +详见根目录 [`LICENSE`](./LICENSE) 文件。 + # 时伴 SmartMate > 越用越懂你的成长型 AI 排程伙伴 · 面向大学生的陪伴式日程管理平台 diff --git a/backend/api/agent.go b/backend/api/agent.go index fd108bb..b6c47e0 100644 --- a/backend/api/agent.go +++ b/backend/api/agent.go @@ -285,6 +285,12 @@ func (api *AgentHandler) GetContextStats(c *gin.Context) { } // 直接透传 JSON 字符串,避免二次序列化。 + // 当会话尚未产生 compaction 统计时,LoadContextTokenStats 返回空字符串, + // 此时 json.RawMessage("") 在 MarshalJSON 时会报 "unexpected end of JSON input", + // 所以空值时需要替换为 "null",保证序列化安全。 + if strings.TrimSpace(statsJSON) == "" { + statsJSON = "null" + } var raw json.RawMessage = json.RawMessage(statsJSON) c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, raw)) } diff --git a/backend/config.example.yaml b/backend/config.example.yaml index 270c999..d72acd1 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -170,8 +170,8 @@ memory: claimBatch: 1 decision: # 决策层总开关。 - # 开启后,写入链路会从“直接新增”升级成“召回旧记忆 -> 比对 -> 决策动作”。 - enabled: false + # 开启后,写入链路会从”直接新增”升级成”召回旧记忆 -> 比对 -> 决策动作”。 + enabled: true # 决策层语义候选数上限。 candidateTopK: 5 # 决策层语义候选最低相似度阈值。 @@ -186,6 +186,8 @@ memory: # decision:启用决策式写入 # 注意:只有 decision.enabled=true 时,这个值才真正生效。 mode: legacy + # 写入最低置信度阈值,抽取结果 confidence 低于此值直接丢弃。 + minConfidence: 0.5 # 联网搜索能力配置。 websearch: diff --git a/backend/dao/cache.go b/backend/dao/cache.go index ff8c3f9..162fc12 100644 --- a/backend/dao/cache.go +++ b/backend/dao/cache.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "errors" + "fmt" + memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" "strings" "time" @@ -604,3 +606,107 @@ func (d *CacheDAO) DeleteAgentState(ctx context.Context, conversationID string) } return d.client.Del(ctx, d.agentStateKey(normalizedID)).Err() } + +// --- 记忆预取缓存 --- + +const ( + memoryPrefetchTTL = 30 * time.Minute +) + +// memoryPrefetchKey 生成用户+会话维度的记忆预取缓存 key。 +// +// 1. 格式:smartflow:memory_prefetch:u:{userID}:c:{chatID},与 conversationHistoryKey / schedulePreviewKey 命名风格一致; +// 2. chatID 为空时 key 为 smartflow:memory_prefetch:u:5:c:,仍然合法且唯一,不会与其他会话 key 冲突; +// 3. 加 chatID 隔离后,不同会话各自维护独立的预取缓存,避免会话间记忆上下文互相覆盖。 +func (d *CacheDAO) memoryPrefetchKey(userID int, chatID string) string { + return fmt.Sprintf("smartflow:memory_prefetch:u:%d:c:%s", userID, chatID) +} + +// GetMemoryPrefetchCache 读取用户记忆预取缓存。 +// +// 输入输出语义: +// 1. 命中时返回 ItemDTO 切片与 nil error; +// 2. 未命中时返回 nil, nil; +// 3. Redis 异常或反序列化失败时返回 error。 +func (d *CacheDAO) GetMemoryPrefetchCache(ctx context.Context, userID int, chatID string) ([]memorymodel.ItemDTO, error) { + if d == nil || d.client == nil { + return nil, errors.New("cache dao is not initialized") + } + if userID <= 0 { + return nil, nil + } + + key := d.memoryPrefetchKey(userID, chatID) + raw, err := d.client.Get(ctx, key).Result() + if errors.Is(err, redis.Nil) { + return nil, nil + } + if err != nil { + return nil, err + } + + var items []memorymodel.ItemDTO + if err = json.Unmarshal([]byte(raw), &items); err != nil { + return nil, fmt.Errorf("unmarshal memory prefetch cache failed: %w", err) + } + return items, nil +} + +// SetMemoryPrefetchCache 写入用户记忆预取缓存。 +// +// 职责边界: +// 1. 负责将检索后的记忆 DTO 写入 Redis,供下一轮 Chat 节点即时消费; +// 2. TTL 30 分钟,靠自然过期淘汰,不需要显式 Invalidate; +// 3. items 为空或 nil 时直接返回,避免写入无效数据。 +func (d *CacheDAO) SetMemoryPrefetchCache(ctx context.Context, userID int, chatID string, items []memorymodel.ItemDTO) error { + if d == nil || d.client == nil { + return errors.New("cache dao is not initialized") + } + if userID <= 0 || len(items) == 0 { + return nil + } + + data, err := json.Marshal(items) + if err != nil { + return fmt.Errorf("marshal memory prefetch cache failed: %w", err) + } + key := d.memoryPrefetchKey(userID, chatID) + return d.client.Set(ctx, key, data, memoryPrefetchTTL).Err() +} + +// DeleteMemoryPrefetchCacheByUser 删除指定用户所有会话的记忆预取缓存。 +// +// 步骤化说明: +// 1. 用 SCAN 遍历 smartflow:memory_prefetch:u:{userID}:c:* 匹配的所有 key; +// 2. 用 UNLINK 异步删除,避免阻塞 Redis 主线程; +// 3. 复用 DeleteUserRecentCompletedSchedulesFromCache 的 SCAN+UNLINK 模式; +// 4. 该方法被 GORM cache deleter 和空检索清理两条链路共同调用,保证缓存一致性。 +func (d *CacheDAO) DeleteMemoryPrefetchCacheByUser(ctx context.Context, userID int) error { + if d == nil || d.client == nil { + return errors.New("cache dao is not initialized") + } + if userID <= 0 { + return nil + } + + pattern := fmt.Sprintf("smartflow:memory_prefetch:u:%d:c:*", userID) + var cursor uint64 + for { + keys, next, err := d.client.Scan(ctx, cursor, pattern, 500).Result() + if err != nil { + return err + } + if len(keys) > 0 { + // 1. UNLINK 是 DEL 的异步版本,不会阻塞 Redis 主线程; + // 2. 即使 key 不存在也不会报错,幂等安全。 + if err := d.client.Unlink(ctx, keys...).Err(); err != nil { + return err + } + } + cursor = next + if cursor == 0 { + break + } + } + return nil +} diff --git a/backend/infra/rag/corpus/memory_corpus.go b/backend/infra/rag/corpus/memory_corpus.go index af461f8..fe93eca 100644 --- a/backend/infra/rag/corpus/memory_corpus.go +++ b/backend/infra/rag/corpus/memory_corpus.go @@ -116,6 +116,7 @@ func (c *MemoryCorpus) BuildRetrieveFilter(_ context.Context, req any) (map[stri } filter := map[string]any{ "user_id": input.UserID, + "status": "active", } if v := strings.TrimSpace(input.ConversationID); v != "" { filter["conversation_id"] = v diff --git a/backend/memory/model/config.go b/backend/memory/model/config.go index b11d530..bd2b729 100644 --- a/backend/memory/model/config.go +++ b/backend/memory/model/config.go @@ -65,6 +65,13 @@ type Config struct { DecisionCandidateMinScore float64 // Milvus 语义召回最低相似度 DecisionFallbackMode string // "legacy_add"(退回旧路径直接新增)/ "drop"(丢弃) WriteMode string // "legacy"(旧路径)/ "decision"(决策流程),仅 DecisionEnabled=true 时生效 + + // 写入置信度阈值。 + // 说明: + // 1. 抽取结果 confidence 低于此值直接丢弃,不做入库; + // 2. 默认 0.5,与"守门员"prompt 的 confidence>=0.5 输出规则配合; + // 3. fallback 路径 confidence 设为 0.45,低于默认阈值,LLM 不可用时不写入。 + WriteMinConfidence float64 } // NormalizeReadMode 统一读取模式字符串。 diff --git a/backend/memory/orchestrator/llm_write_orchestrator.go b/backend/memory/orchestrator/llm_write_orchestrator.go index d9a8889..b3c16ab 100644 --- a/backend/memory/orchestrator/llm_write_orchestrator.go +++ b/backend/memory/orchestrator/llm_write_orchestrator.go @@ -92,7 +92,8 @@ func (o *LLMWriteOrchestrator) ExtractFacts(ctx context.Context, payload memorym } type memoryExtractResponse struct { - Facts []memoryExtractFact `json:"facts"` + MessageIntent string `json:"message_intent"` + Facts []memoryExtractFact `json:"facts"` } type memoryExtractFact struct { @@ -123,33 +124,43 @@ func buildMemoryExtractSystemPrompt(override string) string { return override } - return strings.TrimSpace(`你是一个“记忆抽取器”。 -你的任务是从单条用户消息中抽取值得长期记住的事实、偏好、约束、待办线索。 + return strings.TrimSpace(`你是一个”记忆守门员”。 +你的任务是判断用户消息是否包含值得长期记住的信息,如有则提取。 请只输出 JSON 对象,不要输出解释、不要输出 markdown。 输出格式: { - "facts": [ + “message_intent”: “chitchat|task_request|knowledge_qa|preference|personal_fact|standing_instruction”, + “facts”: [ { - "memory_type": "preference|constraint|fact|todo_hint", - "title": "短标题", - "content": "完整事实内容", - "confidence": 0.0, - "importance": 0.0, - "sensitivity_level": 0, - "is_explicit": false + “memory_type”: “preference|constraint|fact|todo_hint”, + “title”: “短标题”, + “content”: “完整事实内容”, + “confidence”: 0.0, + “importance”: 0.0, + “sensitivity_level”: 0, + “is_explicit”: false } ] } +意图分类规则: +- chitchat:闲聊、寒暄、情绪表达(”你好””谢谢””我今天好累””嗯嗯”) +- task_request:一次性任务请求(”帮我查天气””定个闹钟””帮我写个邮件”) +- knowledge_qa:知识问答、信息查询(”什么是量子力学””北京明天多少度”) +- preference:用户偏好、习惯、口味(”我喜欢吃辣””别用简称””我习惯用微信”) +- personal_fact:个人事实(”我有两个孩子””我在上海工作””我老婆对花生过敏”) +- standing_instruction:持久指令(”以后都用英文回复我””记住我的生日是3月5号”) + 规则: -1. 最多输出 5 条事实。 -2. 只保留稳定、未来可能复用的信息,闲聊、寒暄、一次性噪声不要记。 -3. 用户明确说“记住”或“以后提醒我”时,is_explicit 设为 true。 -4. confidence 表示这条事实是否真的值得记,取 0 到 1。 -5. importance 表示对后续提醒/陪伴的价值,取 0 到 1。 +1. 先判断 message_intent。chitchat / task_request / knowledge_qa 三类,facts 输出空数组。 +2. 只有 preference / personal_fact / standing_instruction 才提取 facts,最多 3 条。 +3. 一条消息可能同时包含任务和偏好(如”帮我查天气,记住我喜欢晴天”),此时 intent 取偏好类型,facts 只保留偏好部分。 +4. confidence 表示这条事实是否真的值得长期记,取 0 到 1。低于 0.5 的不要输出。 +5. importance 表示对后续陪伴的价值,取 0 到 1。 6. sensitivity_level 取 0 到 2,数字越大越敏感。 -7. 不确定就少记,不要编造。`) +7. 用户明确说”记住”或”以后提醒我”时,is_explicit 设为 true。 +8. 宁可漏记也不要滥记。大多数消息不应该产生任何 facts。`) } func buildMemoryExtractUserPrompt(payload memorymodel.ExtractJobPayload) string { @@ -167,15 +178,27 @@ func buildMemoryExtractUserPrompt(payload memorymodel.ExtractJobPayload) string raw, err := json.MarshalIndent(request, "", " ") if err != nil { - return fmt.Sprintf("请从这条消息中抽取可长期记住的信息:%s", payload.SourceText) + return fmt.Sprintf("请分析这条用户消息,判断是否需要写入长期记忆:%s", payload.SourceText) } - return fmt.Sprintf("请从下面这条用户消息中抽取可长期记住的信息,最多 %d 条。\n输入:\n%s", - defaultMemoryExtractMaxFacts, string(raw)) + return fmt.Sprintf("请分析下面这条用户消息,判断 message_intent,如包含值得长期记住的信息则提取 facts。\n输入:\n%s", + string(raw)) } func convertExtractResponse(resp *memoryExtractResponse) []memorymodel.FactCandidate { - if resp == nil || len(resp.Facts) == 0 { + if resp == nil { + return nil + } + + // 意图过滤:跳过不需要记忆的消息类型。 + // 兼容自定义 prompt(不返回 message_intent 时跳过此检查,保持向后兼容)。 + if intent := strings.TrimSpace(resp.MessageIntent); intent != "" { + if isSkipIntent(intent) { + return nil + } + } + + if len(resp.Facts) == 0 { return nil } @@ -225,7 +248,7 @@ func fallbackNormalizedFacts(payload memorymodel.ExtractJobPayload) []memorymode MemoryType: memorymodel.MemoryTypeFact, Title: buildFallbackTitle(sourceText), Content: sourceText, - Confidence: 0.55, + Confidence: 0.45, Importance: defaultImportanceByType(memorymodel.MemoryTypeFact), SensitivityLevel: 0, IsExplicit: false, @@ -287,6 +310,17 @@ func defaultImportanceByType(memoryType string) float64 { } } +// isSkipIntent 判断意图是否属于"不需要记忆"的类别。 +// chitchat / task_request / knowledge_qa 三类直接跳过,不产出任何候选事实。 +func isSkipIntent(intent string) bool { + switch strings.ToLower(strings.TrimSpace(intent)) { + case "chitchat", "task_request", "knowledge_qa": + return true + default: + return false + } +} + func truncateForLog(raw *infrallm.TextResult) string { if raw == nil { return "" diff --git a/backend/memory/repo/item_repo.go b/backend/memory/repo/item_repo.go index c74a1db..d44fb41 100644 --- a/backend/memory/repo/item_repo.go +++ b/backend/memory/repo/item_repo.go @@ -258,7 +258,7 @@ func (r *ItemRepo) UpdateStatusByIDAt( } return r.db.WithContext(ctx). - Model(&model.MemoryItem{}). + Model(&model.MemoryItem{UserID: userID}). Where("id = ? AND user_id = ?", memoryID, userID). Updates(map[string]any{ "status": status, @@ -401,7 +401,7 @@ func (r *ItemRepo) UpdateManagedFieldsByIDAt( } return r.db.WithContext(ctx). - Model(&model.MemoryItem{}). + Model(&model.MemoryItem{UserID: userID}). Where("id = ? AND user_id = ?", memoryID, userID). Updates(map[string]any{ "memory_type": fields.MemoryType, @@ -434,7 +434,7 @@ func (r *ItemRepo) SoftDeleteByID(ctx context.Context, userID int, memoryID int6 } return r.db.WithContext(ctx). - Model(&model.MemoryItem{}). + Model(&model.MemoryItem{UserID: userID}). Where("id = ? AND user_id = ?", memoryID, userID). Updates(map[string]any{ "status": model.MemoryItemStatusDeleted, @@ -466,7 +466,7 @@ func (r *ItemRepo) RestoreByIDAt(ctx context.Context, userID int, memoryID int64 } return r.db.WithContext(ctx). - Model(&model.MemoryItem{}). + Model(&model.MemoryItem{UserID: userID}). Where("id = ? AND user_id = ?", memoryID, userID). Updates(map[string]any{ "status": model.MemoryItemStatusActive, diff --git a/backend/memory/service/config_loader.go b/backend/memory/service/config_loader.go index bd695f4..b87e6c8 100644 --- a/backend/memory/service/config_loader.go +++ b/backend/memory/service/config_loader.go @@ -39,6 +39,7 @@ func LoadConfigFromViper() memorymodel.Config { DecisionCandidateMinScore: viper.GetFloat64("memory.decision.candidateMinScore"), DecisionFallbackMode: viper.GetString("memory.decision.fallbackMode"), WriteMode: viper.GetString("memory.write.mode"), + WriteMinConfidence: viper.GetFloat64("memory.write.minConfidence"), } if cfg.Threshold <= 0 { @@ -83,6 +84,9 @@ func LoadConfigFromViper() memorymodel.Config { if cfg.WriteMode == "" { cfg.WriteMode = "legacy" } + if cfg.WriteMinConfidence <= 0 { + cfg.WriteMinConfidence = 0.5 + } return cfg } diff --git a/backend/memory/service/retrieve_merge.go b/backend/memory/service/retrieve_merge.go index 6e7f550..c7c50e9 100644 --- a/backend/memory/service/retrieve_merge.go +++ b/backend/memory/service/retrieve_merge.go @@ -10,12 +10,12 @@ import ( "github.com/LoveLosita/smartflow/backend/model" ) -// HybridRetrieve 统一承接读取侧混合召回链路。 +// HybridRetrieve 统一承接读取侧 RAG-first 召回链路。 // // 步骤化说明: -// 1. 结构化路由先取 constraint / 高置信 preference,给模型一份稳定“硬约束底座”; -// 2. 再补语义候选,优先走 RAG;RAG 报错或 0 命中时都回退 MySQL,保证链路韧性; -// 3. 两路结果统一做三级去重、排序与类型预算裁剪,只对最终真正注入的条目刷新 last_access_at; +// 1. 优先走 RAG 语义搜索,按 query 相关性召回候选记忆; +// 2. RAG 报错或 0 命中时回退 MySQL,保证链路韧性; +// 3. 召回结果做三级去重、排序与类型预算裁剪(总量不超过调用方 limit); // 4. 旧 legacy 链路完全保留,方便通过配置快速回滚。 func (s *ReadService) HybridRetrieve( ctx context.Context, @@ -32,41 +32,33 @@ func (s *ReadService) HybridRetrieve( return nil, telemetry, nil } - pinnedItems, err := s.retrievePinnedCandidates(ctx, req, effectiveSetting, now) + // RAG-first:只走语义召回,不再全量拉 MySQL pinned。 + items, semanticTelemetry, err := s.retrieveSemanticCandidates(ctx, req, effectiveSetting, limit, now) if err != nil { return nil, telemetry, err } - telemetry.PinnedHitCount = len(pinnedItems) - - semanticItems, semanticTelemetry, err := s.retrieveSemanticCandidates(ctx, req, effectiveSetting, limit, now) - if err != nil { - return nil, telemetry, err - } - telemetry.SemanticHitCount = len(semanticItems) + telemetry.SemanticHitCount = semanticTelemetry.HitCount telemetry.Degraded = semanticTelemetry.Degraded telemetry.RAGFallbackUsed = semanticTelemetry.RAGFallbackUsed - merged := make([]memorymodel.ItemDTO, 0, len(pinnedItems)+len(semanticItems)) - merged = append(merged, pinnedItems...) - merged = append(merged, semanticItems...) - if len(merged) == 0 { + if len(items) == 0 { return nil, telemetry, nil } - beforeDedupCount := len(merged) - merged = dedupByID(merged) - merged = dedupByHash(merged) - merged = dedupByText(merged) - telemetry.DedupDropCount = beforeDedupCount - len(merged) - merged = RankItems(merged, now) - merged = applyTypeBudget(merged, s.cfg) - if len(merged) == 0 { + beforeDedupCount := len(items) + items = dedupByID(items) + items = dedupByHash(items) + items = dedupByText(items) + telemetry.DedupDropCount = beforeDedupCount - len(items) + items = RankItems(items, now) + items = applyTypeBudget(items, s.cfg, limit) + if len(items) == 0 { return nil, telemetry, nil } - telemetry.FinalCount = len(merged) + telemetry.FinalCount = len(items) - _ = s.itemRepo.TouchLastAccessAt(ctx, collectItemDTOIDs(merged), now) - return merged, telemetry, nil + _ = s.itemRepo.TouchLastAccessAt(ctx, collectItemDTOIDs(items), now) + return items, telemetry, nil } func (s *ReadService) retrievePinnedCandidates( @@ -155,7 +147,7 @@ func (s *ReadService) retrieveSemanticCandidatesByMySQL( req, now, []string{model.MemoryItemStatusActive}, - normalizeLimit(candidateLimit*3, candidateLimit*3, maxRetrieveLimit*3), + normalizeLimit(candidateLimit, candidateLimit, maxRetrieveLimit), ) items, err := s.itemRepo.FindByQuery(ctx, query) @@ -255,17 +247,22 @@ func preferCurrentItem(previous memorymodel.ItemDTO, current memorymodel.ItemDTO return true } -// applyTypeBudget 在排序结果上应用四类记忆预算。 +// applyTypeBudget 在排序结果上应用四类记忆预算,并以 callerLimit 作为总量硬上限。 // // 说明: // 1. 每种类型先保底自己的预算上限,避免 fact 抢掉 constraint 的位置; // 2. 裁剪时保持当前排序顺序,不在这里重新打分; -// 3. 最终总量由四类预算之和共同决定,默认 18 条。 -func applyTypeBudget(items []memorymodel.ItemDTO, cfg memorymodel.Config) []memorymodel.ItemDTO { +// 3. 最终总量不超过 min(callerLimit, cfg.TotalReadBudget())。 +func applyTypeBudget(items []memorymodel.ItemDTO, cfg memorymodel.Config, callerLimit int) []memorymodel.ItemDTO { if len(items) == 0 { return nil } + hardCap := cfg.TotalReadBudget() + if callerLimit > 0 && callerLimit < hardCap { + hardCap = callerLimit + } + budgetByType := map[string]int{ memorymodel.MemoryTypeConstraint: cfg.EffectiveReadConstraintLimit(), memorymodel.MemoryTypePreference: cfg.EffectiveReadPreferenceLimit(), @@ -273,9 +270,9 @@ func applyTypeBudget(items []memorymodel.ItemDTO, cfg memorymodel.Config) []memo memorymodel.MemoryTypeTodoHint: cfg.EffectiveReadTodoHintLimit(), } usedByType := make(map[string]int, len(budgetByType)) - result := make([]memorymodel.ItemDTO, 0, minInt(len(items), cfg.TotalReadBudget())) + result := make([]memorymodel.ItemDTO, 0, minInt(len(items), hardCap)) for _, item := range items { - if len(result) >= cfg.TotalReadBudget() { + if len(result) >= hardCap { break } @@ -289,11 +286,10 @@ func applyTypeBudget(items []memorymodel.ItemDTO, cfg memorymodel.Config) []memo return result } +// hybridSemanticTopK 计算语义召回的候选集大小。 +// 使用 callerLimit 的 2 倍作为 TopK,保证去重后仍有足够结果填充预算。 func hybridSemanticTopK(cfg memorymodel.Config, limit int) int { - if cfg.TotalReadBudget() > limit { - return cfg.TotalReadBudget() - } - return limit + return limit * 2 } func resolveBudgetMemoryType(memoryType string) string { diff --git a/backend/memory/utils/settings.go b/backend/memory/utils/settings.go index c6d4461..357cdd1 100644 --- a/backend/memory/utils/settings.go +++ b/backend/memory/utils/settings.go @@ -60,3 +60,22 @@ func FilterItemsBySetting(items []model.MemoryItem, setting model.MemoryUserSett } return result } + +// FilterFactsByConfidence 按置信度阈值过滤候选事实。 +// +// 说明: +// 1. minConfidence <= 0 时不做过滤,保持向后兼容; +// 2. 过滤在 FilterFactsBySetting 之后执行,是写入链路的第二道程序化门槛; +// 3. 阈值由 memory.write.minConfidence 配置控制,默认 0.5。 +func FilterFactsByConfidence(facts []memorymodel.NormalizedFact, minConfidence float64) []memorymodel.NormalizedFact { + if minConfidence <= 0 || len(facts) == 0 { + return facts + } + result := make([]memorymodel.NormalizedFact, 0, len(facts)) + for _, fact := range facts { + if fact.Confidence >= minConfidence { + result = append(result, fact) + } + } + return result +} diff --git a/backend/memory/worker/runner.go b/backend/memory/worker/runner.go index 3f29257..743c8ec 100644 --- a/backend/memory/worker/runner.go +++ b/backend/memory/worker/runner.go @@ -163,6 +163,7 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { return result, nil } facts = memoryutils.FilterFactsBySetting(facts, effectiveSetting) + facts = memoryutils.FilterFactsByConfidence(facts, r.cfg.WriteMinConfidence) if len(facts) == 0 { if err = r.jobRepo.MarkSuccess(ctx, job.ID); err != nil { diff --git a/backend/middleware/cache_deleter.go b/backend/middleware/cache_deleter.go index 22cc44e..02e8692 100644 --- a/backend/middleware/cache_deleter.go +++ b/backend/middleware/cache_deleter.go @@ -78,11 +78,16 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}) { p.invalidConversationHistoryCache(m.UserID, m.ChatID) case model.AgentChat: p.invalidConversationHistoryCache(m.UserID, m.ChatID) + case model.MemoryItem: + // 1. 管理面删除/修改/恢复/新增记忆时,自动失效该用户所有会话的预取缓存; + // 2. repo 方法通过 Model(&model.MemoryItem{UserID: userID}) 携带 userID, + // 此处从模型实例中提取 UserID 进行精准失效; + // 3. 若 UserID 为 0(无 userID 参数的 repo 方法),invalidMemoryPrefetchCache 内部守卫会直接跳过。 + p.invalidMemoryPrefetchCache(m.UserID) case model.AgentOutboxMessage, model.User, model.AgentStateSnapshotRecord, model.MemoryJob, - model.MemoryItem, model.MemoryAuditLog, model.MemoryUserSetting: // 这些模型当前没有前台缓存读取链路依赖,故意静默忽略。 @@ -163,3 +168,23 @@ func (p *GormCachePlugin) invalidConversationHistoryCache(userID int, conversati log.Printf("[GORM-Cache] Invalidated conversation history cache for user %d conversation %s", userID, normalizedConversationID) }() } + +// invalidMemoryPrefetchCache 失效指定用户所有会话的记忆预取缓存。 +// +// 步骤化说明: +// 1. 先守卫 userID==0,无 userID 的 repo 方法(如 UpdateContentByID)触发 callback 时直接跳过; +// 2. 异步调用 DeleteMemoryPrefetchCacheByUser,按模式 smartflow:memory_prefetch:u:{userID}:c:* 批量删除; +// 3. 失败只记日志,不阻塞主事务,30 分钟 TTL 自然过期兜底。 +func (p *GormCachePlugin) invalidMemoryPrefetchCache(userID int) { + if userID == 0 { + return + } + + go func() { + if err := p.cacheDAO.DeleteMemoryPrefetchCacheByUser(context.Background(), userID); err != nil { + log.Printf("[GORM-Cache] Failed to invalidate memory prefetch cache for user %d: %v", userID, err) + return + } + log.Printf("[GORM-Cache] Invalidated memory prefetch cache for user %d", userID) + }() +} diff --git a/backend/newAgent/model/graph_run_state.go b/backend/newAgent/model/graph_run_state.go index a7d3106..1628070 100644 --- a/backend/newAgent/model/graph_run_state.go +++ b/backend/newAgent/model/graph_run_state.go @@ -3,6 +3,7 @@ package model import ( "context" "strings" + "time" infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" @@ -13,7 +14,7 @@ import ( // AgentGraphRequest 描述一次 agent graph 运行的请求级输入。 // // 职责边界: -// 1. 这里只放“当前这次请求”天然携带的轻量数据,例如用户本轮输入; +// 1. 这里只放"当前这次请求"天然携带的轻量数据,例如用户本轮输入; // 2. 不负责承载可持久化流程状态,流程状态仍归 AgentRuntimeState; // 3. 不负责承载 LLM / emitter / store 等依赖,这些统一放进 AgentGraphDeps。 type AgentGraphRequest struct { @@ -54,7 +55,7 @@ type WriteSchedulePreviewFunc func(ctx context.Context, state *schedule.Schedule // AgentGraphDeps 描述 graph/node 层运行时真正依赖的可插拔能力。 // // 设计目的: -// 1. 让 graph 不再只拿到”裸状态”,而是能拿到上下文、模型和输出能力; +// 1. 让 graph 不再只拿到"裸状态",而是能拿到上下文、模型和输出能力; // 2. Chat/Plan/Execute/Deliver 允许分别挂不同 client,但也允许先复用同一个 client; // 3. ChunkEmitter 统一承接阶段提示、正文、工具事件、确认请求等 SSE 输出。 type AgentGraphDeps struct { @@ -70,13 +71,29 @@ type AgentGraphDeps struct { CompactionStore CompactionStore // 按 DAO 注入,用于 Execute 上下文压缩持久化 RoughBuildFunc RoughBuildFunc // 按 Service 注入,粗排算法入口 WriteSchedulePreview WriteSchedulePreviewFunc // 按 Service 注入,排程预览写入入口 + + // 记忆预取管线:由 service 层启动的后台检索 goroutine 写入。 + // channel 携带已渲染的文本内容(非原始 ItemDTO),节点直接写入 pinned block。 + MemoryFuture chan string // buffered(1),携带 renderMemoryPinnedContentByMode 的输出 + MemoryConsumed bool // 保证 channel 只读一次,后续 Execute ReAct 循环跳过等待 } +// --- 记忆 pinned block 常量(供 agentsvc 和 node 层共享) --- + +const ( + // MemoryContextBlockKey 记忆上下文在 ConversationContext PinnedBlock 中的唯一 key。 + MemoryContextBlockKey = "memory_context" + // MemoryContextBlockTitle 记忆上下文 pinned block 的标题,用于 prompt 渲染。 + MemoryContextBlockTitle = "相关记忆" + // MemoryFreshTimeout 是 Execute/Plan 节点等待后台记忆检索完成的最大时长。 + MemoryFreshTimeout = 500 * time.Millisecond +) + // EnsureChunkEmitter 保证 graph 运行时始终有一个可用的 chunk 发射器。 // // 步骤说明: // 1. 依赖为空时回退到 Noop emitter,避免骨架期因为没接前端而到处判空; -// 2. 这里只兜底“能安全调用”,不负责填充真实 request_id / model_name; +// 2. 这里只兜底"能安全调用",不负责填充真实 request_id / model_name; // 3. 后续 service 层一旦接上真实 emitter,会自然覆盖这里的空实现。 func (d *AgentGraphDeps) EnsureChunkEmitter() *newagentstream.ChunkEmitter { if d == nil { @@ -250,7 +267,7 @@ func (s *AgentGraphState) EnsureScheduleState(ctx context.Context) (*schedule.Sc if s.OriginalScheduleState == nil { // 1. 兼容老快照:历史 Redis 快照里可能还没带 original_state。 // 2. 当前阶段虽然已经不落库,但后续若重新接回 diff 链,仍需要稳定的原始快照。 - // 3. 因此这里在“已恢复出 ScheduleState、但缺 original”时补一份克隆兜底。 + // 3. 因此这里在"已恢复出 ScheduleState、但缺 original"时补一份克隆兜底。 s.OriginalScheduleState = s.ScheduleState.Clone() } schedule.FilterScheduleStateForTaskClassScope(s.ScheduleState, flowState.TaskClassIDs) @@ -266,7 +283,7 @@ func (s *AgentGraphState) EnsureScheduleState(ctx context.Context) (*schedule.Sc err error ) // 1. 若 provider 支持按 task_class_ids 精确加载,则优先走 scoped 入口。 - // 2. 这样可以让 DayMapping 与粗排算法使用同一批任务类窗口,避免“全量任务类脏日期污染本轮窗口”。 + // 2. 这样可以让 DayMapping 与粗排算法使用同一批任务类窗口,避免"全量任务类脏日期污染本轮窗口"。 // 3. 若当前实现尚未支持 scoped 加载,则回退到旧入口,并继续复用后面的 scope 裁剪。 if scopedProvider, ok := s.Deps.ScheduleProvider.(ScopedScheduleStateProvider); ok && len(flowState.TaskClassIDs) > 0 { state, err = scopedProvider.LoadScheduleStateForTaskClasses(ctx, userID, flowState.TaskClassIDs) diff --git a/backend/newAgent/node/agent_nodes.go b/backend/newAgent/node/agent_nodes.go index 9e46431..42b42e7 100644 --- a/backend/newAgent/node/agent_nodes.go +++ b/backend/newAgent/node/agent_nodes.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "log" + "strings" + "time" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" "github.com/LoveLosita/smartflow/backend/newAgent/tools/schedule" @@ -105,6 +107,9 @@ func (n *AgentNodes) Plan(ctx context.Context, st *newagentmodel.AgentGraphState return nil, errors.New("plan node: state is nil") } + // 等待后台记忆检索完成,注入最新记忆后再启动 Plan。 + ensureFreshMemory(st) + if err := RunPlanNode( ctx, PlanNodeInput{ @@ -206,6 +211,9 @@ func (n *AgentNodes) Execute(ctx context.Context, st *newagentmodel.AgentGraphSt st.EnsureConversationContext().SetToolSchemas(toolSchemas) } + // 等待后台记忆检索完成,注入最新记忆后再启动 Execute。 + ensureFreshMemory(st) + if err := RunExecuteNode( ctx, ExecuteNodeInput{ @@ -292,6 +300,34 @@ func (n *AgentNodes) Deliver(ctx context.Context, st *newagentmodel.AgentGraphSt return st, nil } +// --- 记忆预取消费辅助 --- + +// ensureFreshMemory 等待后台记忆检索完成,将最新结果注入 ConversationContext。 +// +// 设计说明: +// 1. 只在首次调用时等待 channel(最多 500ms),后续调用直接跳过; +// 2. 覆盖 ConversationContext 中已有的缓存记忆(UpsertPinnedBlock 按 key 覆盖); +// 3. timeout 后保留缓存记忆不替换,保证 Execute ReAct 循环不会因超时丢失记忆。 +func ensureFreshMemory(st *newagentmodel.AgentGraphState) { + if st == nil || st.Deps.MemoryConsumed || st.Deps.MemoryFuture == nil { + return + } + st.Deps.MemoryConsumed = true // 标记已消费,后续调用直接跳过 + + select { + case content := <-st.Deps.MemoryFuture: + if strings.TrimSpace(content) != "" { + st.EnsureConversationContext().UpsertPinnedBlock(newagentmodel.ContextBlock{ + Key: newagentmodel.MemoryContextBlockKey, + Title: newagentmodel.MemoryContextBlockTitle, + Content: content, + }) + } + case <-time.After(newagentmodel.MemoryFreshTimeout): + // timeout:保留 ConversationContext 中已有的缓存记忆,不做额外操作 + } +} + // --- 持久化辅助 --- // saveAgentState 在节点执行成功后,将当前运行态快照保存到 Redis。 diff --git a/backend/service/agentsvc/agent_memory.go b/backend/service/agentsvc/agent_memory.go index 8179777..f34b6dc 100644 --- a/backend/service/agentsvc/agent_memory.go +++ b/backend/service/agentsvc/agent_memory.go @@ -12,16 +12,14 @@ import ( ) const ( - newAgentMemoryBlockKey = "memory_context" newAgentMemoryRetrieveLimit = 5 - newAgentMemoryBlockTitle = "相关记忆" newAgentMemoryIntroLine = "以下是与当前对话相关的用户记忆,仅在自然且确实有帮助时参考,不要生硬复述。" ) // MemoryReader 描述 newAgent 主链路读取记忆所需的最小能力。 // // 职责边界: -// 1. 只负责“按当前输入取回候选记忆”; +// 1. 只负责"按当前输入取回候选记忆"; // 2. 不负责 prompt 拼装,也不要求调用方感知 memory 模块内部 repo/service 结构; // 3. 返回值直接复用 memory DTO,避免 service 层再维护一套重复结构。 type MemoryReader interface { @@ -47,27 +45,68 @@ func (s *AgentService) SetMemoryReader(reader MemoryReader, cfg memorymodel.Conf // injectMemoryContext 在 graph 执行前,把本轮相关记忆写入 ConversationContext 的 pinned block。 // -// 步骤说明: -// 1. 先做前置门控:没有 reader、没有有效用户、或输入属于“确认/应答型短句”时,直接清掉旧 block,避免快照残留污染本轮 prompt。 -// 2. 再调用 memory 检索:查询失败只记日志,不中断主链路,保证 newAgent 的可用性优先。 -// 3. 检索成功后把结果渲染成稳定的中文文本,并用固定 key 覆盖写入,确保每轮都能刷新而不是越积越多。 +// 改造后采用"预取管线"模式: +// 1. 先读 Redis 预取缓存(上一轮写入),命中则立即注入到 ConversationContext; +// 2. 再启动后台 goroutine 做完整记忆检索,渲染后发到 channel + 写 Redis; +// 3. Chat 节点直接用缓存记忆启动(首字节零延迟),Execute/Plan 通过 channel 消费最新结果。 func (s *AgentService) injectMemoryContext( ctx context.Context, conversationContext *newagentmodel.ConversationContext, userID int, chatID string, userMessage string, -) { +) chan string { + memoryFuture := make(chan string, 1) + if conversationContext == nil { - return + return memoryFuture } - if s.memoryReader == nil || userID <= 0 || !shouldInjectMemoryForInput(userMessage) { - conversationContext.RemovePinnedBlock(newAgentMemoryBlockKey) - return + // 1. 门控检查:无 reader 或无效用户时清掉旧 block 并返回空 channel。 + if s.memoryReader == nil || userID <= 0 { + conversationContext.RemovePinnedBlock(newagentmodel.MemoryContextBlockKey) + return memoryFuture } - items, err := s.memoryReader.Retrieve(ctx, memorymodel.RetrieveRequest{ + // 2. 读 Redis 预取缓存(<5ms),命中则注入。 + cachedItems, _ := s.cacheDAO.GetMemoryPrefetchCache(ctx, userID, chatID) + if len(cachedItems) > 0 { + content := renderMemoryPinnedContentByMode(cachedItems, s.memoryCfg.EffectiveInjectRenderMode()) + if content != "" { + conversationContext.UpsertPinnedBlock(newagentmodel.ContextBlock{ + Key: newagentmodel.MemoryContextBlockKey, + Title: newagentmodel.MemoryContextBlockTitle, + Content: content, + }) + s.recordMemoryInject(ctx, userID, len(cachedItems), true, nil, "prefetch_cache") + log.Printf("[INFO] memory prefetch: 从 Redis 缓存注入记忆 user=%d count=%d", userID, len(cachedItems)) + } + } + + // 3. 短应答不启动后台检索,节省资源。 + if !shouldInjectMemoryForInput(userMessage) { + log.Printf("[INFO] memory prefetch: 短应答跳过检索 user=%d msg=%q", userID, userMessage) + return memoryFuture + } + + // 4. 启动后台 goroutine:完整检索 → 渲染 → 发 channel + 写 Redis。 + log.Printf("[INFO] memory prefetch: 启动后台检索 goroutine user=%d chat=%s", userID, chatID) + go s.prefetchMemoryForNextTurn(userID, chatID, userMessage, memoryFuture) + + return memoryFuture +} + +// prefetchMemoryForNextTurn 后台执行完整记忆检索,将结果渲染后发送到 channel 并写入 Redis。 +// +// 职责边界: +// 1. 检索结果渲染为文本后发送到 memoryFuture channel(供 Execute/Plan 节点消费); +// 2. 原始 ItemDTO 写入 Redis 预取缓存(供下一轮 Chat 节点消费); +// 3. 检索失败只记日志,不阻断主链路。 +func (s *AgentService) prefetchMemoryForNextTurn(userID int, chatID, userMessage string, memoryFuture chan string) { + bgCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + items, err := s.memoryReader.Retrieve(bgCtx, memorymodel.RetrieveRequest{ Query: strings.TrimSpace(userMessage), UserID: userID, ConversationID: strings.TrimSpace(chatID), @@ -75,32 +114,40 @@ func (s *AgentService) injectMemoryContext( Now: time.Now(), }) if err != nil { - conversationContext.RemovePinnedBlock(newAgentMemoryBlockKey) - s.recordMemoryInject(ctx, userID, 0, false, err) - log.Printf("读取记忆上下文失败 user=%d chat=%s err=%v", userID, chatID, err) + log.Printf("[WARN] 记忆预取失败 user=%d chat=%s: %v", userID, chatID, err) + s.recordMemoryInject(bgCtx, userID, 0, false, err, "prefetch_retrieve") return } + log.Printf("[INFO] memory prefetch: 后台检索完成 user=%d count=%d", userID, len(items)) + + if len(items) == 0 { + // 1. 检索为空说明该用户当前没有可用记忆,旧缓存已过期; + // 2. 主动清除该用户所有会话的预取缓存,避免过期记忆在下一轮继续注入; + // 3. 清除失败只记日志,不阻断主链路,缓存自然过期也可兜底。 + if cacheErr := s.cacheDAO.DeleteMemoryPrefetchCacheByUser(context.Background(), userID); cacheErr != nil { + log.Printf("[WARN] memory prefetch cache clear failed (empty result) user=%d: %v", userID, cacheErr) + } + return + } + + // 渲染并发送到 channel(供 Execute/Plan 节点消费)。 content := renderMemoryPinnedContentByMode(items, s.memoryCfg.EffectiveInjectRenderMode()) - if content == "" { - conversationContext.RemovePinnedBlock(newAgentMemoryBlockKey) - s.recordMemoryInject(ctx, userID, len(items), false, nil) - return + if content != "" { + memoryFuture <- content } - conversationContext.UpsertPinnedBlock(newagentmodel.ContextBlock{ - Key: newAgentMemoryBlockKey, - Title: newAgentMemoryBlockTitle, - Content: content, - }) - s.recordMemoryInject(ctx, userID, len(items), true, nil) + // 同时写入 Redis 供下一轮 Chat 使用。 + if cacheErr := s.cacheDAO.SetMemoryPrefetchCache(context.Background(), userID, chatID, items); cacheErr != nil { + log.Printf("[WARN] 记忆预取缓存写入失败 user=%d: %v", userID, cacheErr) + } } // shouldInjectMemoryForInput 判断当前输入是否值得触发一次记忆召回。 // // 步骤说明: // 1. 空输入直接跳过; -// 2. 对“好/确认/ok”这类弱语义应答做显式拦截,避免 legacy fallback 在无查询价值时注入一批高分但不相关的旧记忆; +// 2. 对"好/确认/ok"这类弱语义应答做显式拦截,避免 legacy fallback 在无查询价值时注入一批高分但不相关的旧记忆; // 3. 其余输入一律放行,优先保证 MVP 可用。 func shouldInjectMemoryForInput(userMessage string) bool { trimmed := strings.TrimSpace(userMessage) @@ -122,6 +169,7 @@ func (s *AgentService) recordMemoryInject( inputCount int, success bool, err error, + source string, ) { if s == nil { return @@ -153,11 +201,13 @@ func (s *AgentService) recordMemoryInject( "success": success && err == nil, "error": err, "error_code": memoryobserve.ClassifyError(err), + "source": source, }, }) if inputCount > 0 { metrics.AddCounter(memoryobserve.MetricInjectItemTotal, int64(inputCount), map[string]string{ "inject_mode": s.memoryCfg.EffectiveInjectRenderMode(), + "source": source, }) } } diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go index 1838fcd..159bb36 100644 --- a/backend/service/agentsvc/agent_newagent.go +++ b/backend/service/agentsvc/agent_newagent.go @@ -108,10 +108,11 @@ func (s *AgentService) runNewAgentGraph( } else { conversationContext = s.loadConversationContext(requestCtx, chatID, userMessage) } - // 5.1. 在 graph 执行前统一补充与当前输入相关的记忆上下文。 - // 5.1.1 这里采用 pinned block 注入,这样 chat / plan / execute / deliver 各阶段都能自动复用。 - // 5.1.2 检索失败只降级为“本轮不注入记忆”,不阻断主链路。 - s.injectMemoryContext(requestCtx, conversationContext, userID, chatID, userMessage) + // 5.1. 在 graph 执行前统一补充与当前输入相关的记忆上下文(预取管线模式)。 + // 5.1.1 先读 Redis 预取缓存注入到 ConversationContext,再启动后台 goroutine 做完整检索; + // 5.1.2 返回的 channel 传入 Deps,供 Execute/Plan 节点在启动前消费最新记忆; + // 5.1.3 检索失败只降级为”本轮不注入记忆”,不阻断主链路。 + memoryFuture := s.injectMemoryContext(requestCtx, conversationContext, userID, chatID, userMessage) // 5.5 将前端传入的 thinkingMode 写入 CommonState,供 ChatNode 及下游节点读取。 cs := runtimeState.EnsureCommonState() @@ -171,6 +172,7 @@ func (s *AgentService) runNewAgentGraph( CompactionStore: s.compactionStore, RoughBuildFunc: s.makeRoughBuildFunc(), WriteSchedulePreview: s.makeWriteSchedulePreviewFunc(), + MemoryFuture: memoryFuture, } // 10. 构造 AgentGraphRunInput 并运行 graph。