Version: 0.9.21.dev.260416

后端:
1. Memory 写入链路新增"召回→比对→汇总"去重决策层
- 新增决策流程:Runner 根据decision.enabled 配置走决策路径(语义召回候选 → Hash 精确命中 → LLM 逐对比对 → 汇总决策 → 执行 ADD/UPDATE/DELETE/NONE),默认关闭,旧路径完全保留
- 新增 LLMDecisionOrchestrator:单对关系判断编排器,输出 duplicate/update/conflict/unrelated 四种关系
- 新增 decision_flow / apply_actions:决策流程主循环与动作落地(新增、更新内容、软删除、跳过)
- 新增 aggregate_decision / decision_validate:汇总规则(按优先级判定动作)与 LLM 输出校验
- 新增 decision model:CandidateSnapshot / ComparisonResult / FinalDecision 等决策层核心类型
- ItemRepo 新增 FindActiveByHash / UpdateContentByID / SoftDeleteByID 三个决策层专用方法
- RAG Runtime / Pipeline / Service 新增 DeleteMemory 向量删除能力,MilvusStore 补充 duplicate collection 错误识别
- Runner 新增 syncVectorDeletes 处理决策层 DELETE 动作的向量清理
- config 新增 decision(enabled/candidateTopK/candidateMinScore/fallbackMode)和 write.mode 配置项,config_loader 增加默认值兜底
- 删除 HANDOFF-RAG复用后续实施计划.md 和旧 log.txt,新增 Log.txt 记录决策流程调试日志
- normalize_facts 导出 HashContent 供决策层复用,audit 新增 update 操作常量

前端:无 仓库:无
This commit is contained in:
Losita
2026-04-16 12:11:58 +08:00
parent 8bde981592
commit 634a9fb926
21 changed files with 1271 additions and 1032 deletions

View File

@@ -0,0 +1,248 @@
package worker
import (
"context"
"fmt"
"strings"
memorymodel "github.com/LoveLosita/smartflow/backend/memory/model"
memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo"
memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils"
"github.com/LoveLosita/smartflow/backend/model"
)
// ApplyActionOutcome 是单个决策动作的执行结果。
//
// 说明:
// 1. Action 记录本次执行的动作类型ADD/UPDATE/DELETE/NONE
// 2. OldItem 仅在 UPDATE/DELETE 时有值,用于审计 before 快照;
// 3. NewItem 仅在 ADD/UPDATE 时有值,用于审计 after 快照和向量同步;
// 4. NeedsSync 标记是否需要触发向量同步ADD 和 UPDATE 需要)。
type ApplyActionOutcome struct {
Action string
MemoryID int64
OldItem *model.MemoryItem // UPDATE/DELETE 时的 before 快照
NewItem *model.MemoryItem // ADD/UPDATE 时的 after 快照
NeedsSync bool // 是否需要向量同步
}
// ApplyFinalDecision 把汇总后的最终决策落为数据库动作。
//
// 职责边界:
// 1. 在调用方事务内执行,不做独立事务管理;
// 2. 负责写 memory_items + memory_audit_logs不负责 job 状态推进;
// 3. 所有动作的审计日志都由这里统一产出。
//
// 参数说明:
// - itemRepo/auditRepo 必须是事务绑定的实例WithTx 后的);
// - fact 是当前正在处理的标准化事实;
// - job/payload 提供写入所需的上下文user_id、conversation_id 等)。
func ApplyFinalDecision(
ctx context.Context,
itemRepo *memoryrepo.ItemRepo,
auditRepo *memoryrepo.AuditRepo,
decision memorymodel.FinalDecision,
fact memorymodel.NormalizedFact,
job *model.MemoryJob,
payload memorymodel.ExtractJobPayload,
) (*ApplyActionOutcome, error) {
switch decision.Action {
case memorymodel.DecisionActionAdd:
return applyAdd(ctx, itemRepo, auditRepo, fact, job, payload, decision.Reason)
case memorymodel.DecisionActionUpdate:
return applyUpdate(ctx, itemRepo, auditRepo, decision, fact, job, payload)
case memorymodel.DecisionActionDelete:
return applyDelete(ctx, itemRepo, auditRepo, decision, payload.UserID)
case memorymodel.DecisionActionNone:
return &ApplyActionOutcome{
Action: memorymodel.DecisionActionNone,
NeedsSync: false,
}, nil
default:
return nil, fmt.Errorf("未知的决策动作: %s", decision.Action)
}
}
// applyAdd 执行新增动作:构建 MemoryItem → 写库 → 写审计。
func applyAdd(
ctx context.Context,
itemRepo *memoryrepo.ItemRepo,
auditRepo *memoryrepo.AuditRepo,
fact memorymodel.NormalizedFact,
job *model.MemoryJob,
payload memorymodel.ExtractJobPayload,
reason string,
) (*ApplyActionOutcome, error) {
// 1. 复用 runner.go 的 buildMemoryItems 构建单条 MemoryItem。
items := buildMemoryItems(job, payload, []memorymodel.NormalizedFact{fact})
if len(items) == 0 {
return nil, fmt.Errorf("构建记忆条目失败: memory_type=%s", fact.MemoryType)
}
// 2. 写库GORM Create 会自动填充 items[0].ID。
if err := itemRepo.UpsertItems(ctx, items); err != nil {
return nil, fmt.Errorf("新增记忆写入失败: %w", err)
}
// 注意:必须在 UpsertItems 之后取 items[0],因为 GORM Create 回填 ID 到 items[i]
// 之前用 item := items[0] 在 UpsertItems 之前拷贝,导致副本 ID 永远为 0。
item := items[0]
// 3. 写审计日志create 动作只有 after 快照)。
audit := memoryutils.BuildItemAuditLog(
item.ID,
item.UserID,
memoryutils.AuditOperationCreate,
"system",
formatAuditReason("决策层新增", reason),
nil,
&item,
)
if err := auditRepo.Create(ctx, audit); err != nil {
return nil, fmt.Errorf("新增审计写入失败: %w", err)
}
return &ApplyActionOutcome{
Action: memorymodel.DecisionActionAdd,
MemoryID: item.ID,
NewItem: &item,
NeedsSync: true,
}, nil
}
// applyUpdate 执行更新动作:查 before → 更新字段 → 写审计before+after
func applyUpdate(
ctx context.Context,
itemRepo *memoryrepo.ItemRepo,
auditRepo *memoryrepo.AuditRepo,
decision memorymodel.FinalDecision,
fact memorymodel.NormalizedFact,
job *model.MemoryJob,
payload memorymodel.ExtractJobPayload,
) (*ApplyActionOutcome, error) {
// 1. 查 before 快照,同时确认旧记忆存在且属于该用户。
oldItem, err := itemRepo.GetByIDForUser(ctx, payload.UserID, decision.TargetID)
if err != nil {
return nil, fmt.Errorf("查询旧记忆失败(id=%d): %w", decision.TargetID, err)
}
// 2. 重新计算 NormalizedContent 和 ContentHash保证和 NormalizeFacts 的逻辑一致。
// 原因LLM 输出的 merged content 需要重新走归一化链,避免大小写/空格差异导致后续 Hash 去重失效。
updatedContent := strings.TrimSpace(decision.Content)
if updatedContent == "" {
updatedContent = fact.Content
}
normalizedContent := strings.ToLower(updatedContent)
// 复用 utils.HashContent 的 sha256(memoryType + "::" + normalizedContent) 算法。
contentHash := memoryutils.HashContent(fact.MemoryType, normalizedContent)
title := strings.TrimSpace(decision.Title)
if title == "" {
title = oldItem.Title
}
// 3. 执行内容更新。
fields := memorymodel.UpdateContentFields{
Title: title,
Content: updatedContent,
NormalizedContent: normalizedContent,
ContentHash: contentHash,
Confidence: fact.Confidence,
Importance: fact.Importance,
}
if err := itemRepo.UpdateContentByID(ctx, decision.TargetID, fields); err != nil {
return nil, fmt.Errorf("更新记忆内容失败(id=%d): %w", decision.TargetID, err)
}
// 4. 构造 after 快照用于审计。
afterItem := *oldItem
afterItem.Title = title
afterItem.Content = updatedContent
if afterItem.NormalizedContent != nil {
afterItem.NormalizedContent = &normalizedContent
} else {
afterItem.NormalizedContent = strPtrFromValue(normalizedContent)
}
if afterItem.ContentHash != nil {
afterItem.ContentHash = &contentHash
} else {
afterItem.ContentHash = strPtrFromValue(contentHash)
}
afterItem.Confidence = fact.Confidence
afterItem.Importance = fact.Importance
// 5. 写审计日志update 动作同时有 before 和 after 快照)。
audit := memoryutils.BuildItemAuditLog(
oldItem.ID,
oldItem.UserID,
memoryutils.AuditOperationUpdate,
"system",
formatAuditReason("决策层更新", decision.Reason),
oldItem,
&afterItem,
)
if err := auditRepo.Create(ctx, audit); err != nil {
return nil, fmt.Errorf("更新审计写入失败: %w", err)
}
// 6. 向量状态重置为 pending触发向量重同步。
// 原因:内容变了,旧向量已过期,需要重新 embed。
_ = itemRepo.UpdateVectorStateByID(ctx, oldItem.ID, "pending", nil)
return &ApplyActionOutcome{
Action: memorymodel.DecisionActionUpdate,
MemoryID: oldItem.ID,
OldItem: oldItem,
NewItem: &afterItem,
NeedsSync: true,
}, nil
}
// applyDelete 执行软删除动作:查 before → 软删 → 写审计before only
func applyDelete(
ctx context.Context,
itemRepo *memoryrepo.ItemRepo,
auditRepo *memoryrepo.AuditRepo,
decision memorymodel.FinalDecision,
userID int,
) (*ApplyActionOutcome, error) {
// 1. 查 before 快照。
oldItem, err := itemRepo.GetByIDForUser(ctx, userID, decision.TargetID)
if err != nil {
return nil, fmt.Errorf("查询旧记忆失败(id=%d): %w", decision.TargetID, err)
}
// 2. 执行软删除。
if err := itemRepo.SoftDeleteByID(ctx, userID, decision.TargetID); err != nil {
return nil, fmt.Errorf("软删除记忆失败(id=%d): %w", decision.TargetID, err)
}
// 3. 写审计日志delete 动作只有 before 快照)。
audit := memoryutils.BuildItemAuditLog(
oldItem.ID,
oldItem.UserID,
memoryutils.AuditOperationDelete,
"system",
formatAuditReason("决策层删除", decision.Reason),
oldItem,
nil,
)
if err := auditRepo.Create(ctx, audit); err != nil {
return nil, fmt.Errorf("删除审计写入失败: %w", err)
}
return &ApplyActionOutcome{
Action: memorymodel.DecisionActionDelete,
MemoryID: oldItem.ID,
OldItem: oldItem,
NeedsSync: false,
}, nil
}
// formatAuditReason 统一审计日志的 reason 格式。
func formatAuditReason(prefix, detail string) string {
detail = strings.TrimSpace(detail)
if detail == "" {
return prefix
}
return prefix + ": " + detail
}

View File

@@ -0,0 +1,367 @@
package worker
import (
"context"
"fmt"
infrarag "github.com/LoveLosita/smartflow/backend/infra/rag"
memorymodel "github.com/LoveLosita/smartflow/backend/memory/model"
memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo"
memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils"
"github.com/LoveLosita/smartflow/backend/model"
"gorm.io/gorm"
)
// DecisionFlowOutcome 是一轮决策流程的汇总结果。
//
// 说明:
// 1. AddCount/UpdateCount/DeleteCount/NoneCount 分别统计四种动作的执行次数;
// 2. ItemsToSync 收集所有需要向量同步的 itemADD 和 UPDATE 产出的);
// 3. VectorDeletes 收集所有需要从向量库删除的 memory_idDELETE 动作产出的)。
type DecisionFlowOutcome struct {
AddCount int
UpdateCount int
DeleteCount int
NoneCount int
ItemsToSync []model.MemoryItem // 需要向量同步的新增/更新 item
VectorDeletes []int64 // 需要从向量库删除的 memory_id 列表
}
// factDecisionResult 是单条 fact 的决策执行结果,支持一对多动作。
// 原因conflict 场景下会产生 DELETE + ADD 两个动作,需要打包返回。
type factDecisionResult struct {
Outcomes []*ApplyActionOutcome
}
// executeDecisionFlow 在 worker 内编排"召回→逐对比对→汇总→执行"全流程。
//
// 职责边界:
// 1. 对每条 fact 独立执行完整决策流程fact 之间互不影响;
// 2. 所有数据库写操作在同一个事务内完成,保证原子性;
// 3. 向量同步在事务外异步执行,不影响事务提交。
//
// 降级策略:
// 1. Milvus 不可用时,回退到 MySQL 按类型查最近 N 条活跃记忆;
// 2. 单条 LLM 比对失败不影响其他候选,视为 unrelated
// 3. 整体流程报错时,由上层根据 FallbackMode 决定是否退回旧路径。
func (r *Runner) executeDecisionFlow(
ctx context.Context,
job *model.MemoryJob,
payload memorymodel.ExtractJobPayload,
facts []memorymodel.NormalizedFact,
) (*DecisionFlowOutcome, error) {
outcome := &DecisionFlowOutcome{
ItemsToSync: make([]model.MemoryItem, 0, len(facts)),
VectorDeletes: make([]int64, 0),
}
// 1. 所有数据库写操作在同一个事务内完成。
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
itemRepo := r.itemRepo.WithTx(tx)
auditRepo := r.auditRepo.WithTx(tx)
jobRepo := r.jobRepo.WithTx(tx)
for _, fact := range facts {
// 2. 对每条 fact 执行完整决策流程。
result, err := r.executeDecisionForFact(ctx, itemRepo, auditRepo, fact, job, payload)
if err != nil {
// 单条 fact 决策失败不影响其他 fact记录日志后继续。
if r.logger != nil {
r.logger.Printf("[WARN][去重] 单条 fact 决策失败,跳过继续: job_id=%d user_id=%d memory_type=%s hash=%s err=%v", job.ID, payload.UserID, fact.MemoryType, fact.ContentHash, err)
}
continue
}
// 3. 汇总结果到全局 outcome。
for _, actionOutcome := range result.Outcomes {
r.collectActionOutcome(outcome, actionOutcome)
}
}
// 4. 事务内最后确认 job 成功。
return jobRepo.MarkSuccess(ctx, job.ID)
})
if err != nil {
return nil, err
}
return outcome, nil
}
// executeDecisionForFact 对单条 fact 执行完整决策流程。
//
// 步骤:
// 1. Hash 精确命中检查 — 已有完全相同内容则直接跳过;
// 2. Milvus 语义召回 — 从旧记忆中筛出 TopK 候选(含降级);
// 3. 逐对 LLM 比对 — 每次拿一条新 fact 和一条旧候选比对;
// 4. 确定性汇总 — 根据 LLM 比对结果确定 ADD/UPDATE/DELETE/NONE
// 5. 校验 + 执行 — 落为数据库动作 + 审计日志。
func (r *Runner) executeDecisionForFact(
ctx context.Context,
itemRepo *memoryrepo.ItemRepo,
auditRepo *memoryrepo.AuditRepo,
fact memorymodel.NormalizedFact,
job *model.MemoryJob,
payload memorymodel.ExtractJobPayload,
) (*factDecisionResult, error) {
result := &factDecisionResult{}
// Step 1: Hash 精确命中检查。
// 原因:如果已有完全相同内容的记忆,直接跳过,无需调 LLM。
existing, err := itemRepo.FindActiveByHash(ctx, payload.UserID, fact.ContentHash)
if err != nil {
if r.logger != nil {
r.logger.Printf("[WARN][去重] Hash 精确匹配查询失败: user_id=%d memory_type=%s hash=%s err=%v", payload.UserID, fact.MemoryType, fact.ContentHash, err)
}
}
if len(existing) > 0 {
result.Outcomes = append(result.Outcomes, &ApplyActionOutcome{
Action: memorymodel.DecisionActionNone,
NeedsSync: false,
})
return result, nil
}
// Step 2: Milvus 语义召回(含降级)。
candidates := r.recallCandidates(ctx, payload, fact)
// 打印召回候选详情,便于排查向量召回和阈值过滤效果。
if r.logger != nil {
r.logger.Printf("[DEBUG][去重] 语义召回候选: job_id=%d user_id=%d memory_type=%s candidate_count=%d",
job.ID, payload.UserID, fact.MemoryType, len(candidates))
for _, c := range candidates {
r.logger.Printf("[DEBUG][去重] 候选详情: memory_id=%d score=%.4f content=\"%s\"",
c.MemoryID, c.Score, truncateRunes(c.Content, 50))
}
}
// Step 3: 逐对 LLM 比对。
comparisons := r.compareWithCandidates(ctx, fact, candidates)
// Step 4: 确定性汇总。
decision := memoryutils.AggregateComparisons(fact, comparisons, candidates)
// 打印汇总决策结果,便于排查去重终态。
if r.logger != nil {
r.logger.Printf("[DEBUG][去重] 汇总决策: job_id=%d action=%s target_id=%d reason=\"%s\"",
job.ID, decision.Action, decision.TargetID, decision.Reason)
}
// Step 5: 校验 + 执行。
actionOutcome, err := ApplyFinalDecision(ctx, itemRepo, auditRepo, *decision, fact, job, payload)
if err != nil {
return nil, fmt.Errorf("执行决策动作失败: %w", err)
}
result.Outcomes = append(result.Outcomes, actionOutcome)
// Step 6: conflict (DELETE) 后需要补一个 ADD 写入新 fact。
// 原因:旧记忆矛盾需删除,但新事实本身仍然有效,必须写入。
if decision.Action == memorymodel.DecisionActionDelete {
addDecision := memorymodel.FinalDecision{
Action: memorymodel.DecisionActionAdd,
Reason: "冲突旧记忆已删除,写入新事实",
}
addOutcome, addErr := ApplyFinalDecision(ctx, itemRepo, auditRepo, addDecision, fact, job, payload)
if addErr != nil {
if r.logger != nil {
r.logger.Printf("[WARN] 冲突后补增失败: memory_type=%s err=%v", fact.MemoryType, addErr)
}
} else if addOutcome != nil {
result.Outcomes = append(result.Outcomes, addOutcome)
}
}
return result, nil
}
// recallCandidates 从旧记忆中召回候选,先尝试 Milvus降级时用 MySQL。
func (r *Runner) recallCandidates(
ctx context.Context,
payload memorymodel.ExtractJobPayload,
fact memorymodel.NormalizedFact,
) []memorymodel.CandidateSnapshot {
// 1. 优先使用 Milvus 向量语义召回。
if r.ragRuntime != nil {
retrieveResult, err := r.ragRuntime.RetrieveMemory(ctx, infrarag.MemoryRetrieveRequest{
Query: fact.Content,
TopK: r.cfg.DecisionCandidateTopK,
Threshold: r.cfg.DecisionCandidateMinScore,
UserID: payload.UserID,
MemoryTypes: []string{fact.MemoryType},
Action: "search",
})
if err == nil && len(retrieveResult.Items) > 0 {
candidates := r.buildCandidatesFromRAG(retrieveResult.Items)
if len(candidates) > 0 {
return candidates
}
// RAG 返回了结果但 DocumentID 全部解析失败,降级到 MySQL。
if r.logger != nil {
r.logger.Printf("[WARN][去重] Milvus 返回 %d 条结果但 DocumentID 全部解析失败,降级到 MySQL: user_id=%d memory_type=%s", len(retrieveResult.Items), payload.UserID, fact.MemoryType)
}
}
if err != nil && r.logger != nil {
r.logger.Printf("[WARN][去重] Milvus 语义召回失败,降级到 MySQL: user_id=%d memory_type=%s topk=%d err=%v", payload.UserID, fact.MemoryType, r.cfg.DecisionCandidateTopK, err)
}
}
// 2. 降级:按 user_id + memory_type + status=active 查最近 N 条。
return r.recallCandidatesFromMySQL(ctx, payload, fact)
}
// buildCandidatesFromRAG 从 RAG 检索结果构建候选快照列表。
//
// 步骤:
// 1. 从 DocumentID格式 memory:{id})解析出 mysql_id
// 2. 从 metadata 提取 title 和 memory_type
// 3. 跳过无法解析 DocumentID 的结果。
func (r *Runner) buildCandidatesFromRAG(hits []infrarag.RetrieveHit) []memorymodel.CandidateSnapshot {
candidates := make([]memorymodel.CandidateSnapshot, 0, len(hits))
for _, hit := range hits {
memoryID := parseMemoryID(hit.DocumentID)
if memoryID <= 0 {
if r.logger != nil {
r.logger.Printf("[WARN][去重] DocumentID 解析失败,跳过候选: document_id=%q", hit.DocumentID)
}
continue
}
candidates = append(candidates, memorymodel.CandidateSnapshot{
MemoryID: memoryID,
Title: asStringFromMap(hit.Metadata, "title"),
Content: hit.Text,
MemoryType: asStringFromMap(hit.Metadata, "memory_type"),
Score: hit.Score,
})
}
return candidates
}
// recallCandidatesFromMySQL 从 MySQL 查最近 N 条活跃记忆作为候选。
// 这是 Milvus 不可用时的降级方案。
func (r *Runner) recallCandidatesFromMySQL(
ctx context.Context,
payload memorymodel.ExtractJobPayload,
fact memorymodel.NormalizedFact,
) []memorymodel.CandidateSnapshot {
items, err := r.itemRepo.FindByQuery(ctx, memorymodel.ItemQuery{
UserID: payload.UserID,
MemoryTypes: []string{fact.MemoryType},
Statuses: []string{model.MemoryItemStatusActive},
Limit: r.cfg.DecisionCandidateTopK,
})
if err != nil {
if r.logger != nil {
r.logger.Printf("[WARN] MySQL 降级召回失败: err=%v", err)
}
return nil
}
candidates := make([]memorymodel.CandidateSnapshot, 0, len(items))
for _, item := range items {
candidates = append(candidates, memorymodel.CandidateSnapshot{
MemoryID: item.ID,
Title: item.Title,
Content: item.Content,
MemoryType: item.MemoryType,
Score: 0, // MySQL 降级无向量分数
})
}
return candidates
}
// compareWithCandidates 对每个候选逐一调 LLM 做关系判断。
//
// 说明:
// 1. LLM 调用失败时视为 unrelated不影响其他候选的比对
// 2. 对比对结果做校验,不合法的也视为 unrelated
// 3. 无候选或决策编排器为空时返回空切片,上层直接走 ADD 路径。
func (r *Runner) compareWithCandidates(
ctx context.Context,
fact memorymodel.NormalizedFact,
candidates []memorymodel.CandidateSnapshot,
) []memorymodel.ComparisonResult {
if r.decisionOrchestrator == nil || len(candidates) == 0 {
return nil
}
comparisons := make([]memorymodel.ComparisonResult, 0, len(candidates))
for _, candidate := range candidates {
compResult, err := r.decisionOrchestrator.Compare(ctx, fact, candidate)
if err != nil {
// LLM 调用失败 → 视为 unrelated不影响其他候选。
if r.logger != nil {
r.logger.Printf("[WARN][去重] LLM 逐对比较调用失败,视为 unrelated: candidate_id=%d memory_type=%s err=%v", candidate.MemoryID, fact.MemoryType, err)
}
continue
}
// 校验 LLM 输出合法性,不合法也跳过。
if validateErr := memoryutils.ValidateComparisonResult(compResult); validateErr != nil {
if r.logger != nil {
r.logger.Printf("[WARN][去重] LLM 比对结果校验不通过,视为 unrelated: candidate_id=%d memory_type=%s relation=%s err=%v", candidate.MemoryID, fact.MemoryType, compResult.Relation, validateErr)
}
continue
}
comparisons = append(comparisons, *compResult)
// 打印 LLM 比对结果,便于排查误判。
if r.logger != nil {
r.logger.Printf("[DEBUG][去重] LLM 比对结果: candidate_id=%d score=%.4f relation=%s reason=\"%s\" candidate_content=\"%s\"",
candidate.MemoryID, candidate.Score, compResult.Relation, compResult.Reason, truncateRunes(candidate.Content, 50))
}
}
return comparisons
}
// collectActionOutcome 汇总单个动作结果到全局 outcome。
func (r *Runner) collectActionOutcome(outcome *DecisionFlowOutcome, actionOutcome *ApplyActionOutcome) {
if actionOutcome == nil {
return
}
switch actionOutcome.Action {
case memorymodel.DecisionActionAdd:
outcome.AddCount++
if actionOutcome.NeedsSync && actionOutcome.NewItem != nil {
outcome.ItemsToSync = append(outcome.ItemsToSync, *actionOutcome.NewItem)
}
case memorymodel.DecisionActionUpdate:
outcome.UpdateCount++
if actionOutcome.NeedsSync && actionOutcome.NewItem != nil {
outcome.ItemsToSync = append(outcome.ItemsToSync, *actionOutcome.NewItem)
}
case memorymodel.DecisionActionDelete:
outcome.DeleteCount++
outcome.VectorDeletes = append(outcome.VectorDeletes, actionOutcome.MemoryID)
case memorymodel.DecisionActionNone:
outcome.NoneCount++
}
}
// asStringFromMap 从 metadata map 中安全提取字符串值。
func asStringFromMap(m map[string]any, key string) string {
if m == nil {
return ""
}
v, ok := m[key]
if !ok || v == nil {
return ""
}
return fmt.Sprintf("%v", v)
}
// truncateRunes 截取字符串前 n 个 rune超出则追加 "..."。
// 用途:日志内容预览,避免超长内容撑爆单行日志。
func truncateRunes(s string, n int) string {
runes := []rune(s)
if len(runes) <= n {
return s
}
if n <= 0 {
return ""
}
return string(runes[:n]) + "..."
}

View File

@@ -12,6 +12,7 @@ import (
infrarag "github.com/LoveLosita/smartflow/backend/infra/rag"
memorymodel "github.com/LoveLosita/smartflow/backend/memory/model"
memoryorchestrator "github.com/LoveLosita/smartflow/backend/memory/orchestrator"
memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo"
memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils"
"github.com/LoveLosita/smartflow/backend/model"
@@ -41,6 +42,13 @@ type Runner struct {
extractor Extractor
ragRuntime infrarag.Runtime
logger *log.Logger
// 决策层依赖。
// 说明:
// 1. cfg 提供决策层配置是否启用、TopK、MinScore、FallbackMode
// 2. decisionOrchestrator 在决策启用时负责 LLM 逐对比较,为 nil 时走旧路径。
cfg memorymodel.Config
decisionOrchestrator *memoryorchestrator.LLMDecisionOrchestrator
}
// NewRunner 构造记忆 worker 执行器。
@@ -52,16 +60,20 @@ func NewRunner(
settingsRepo *memoryrepo.SettingsRepo,
extractor Extractor,
ragRuntime infrarag.Runtime,
cfg memorymodel.Config,
decisionOrchestrator *memoryorchestrator.LLMDecisionOrchestrator,
) *Runner {
return &Runner{
db: db,
jobRepo: jobRepo,
itemRepo: itemRepo,
auditRepo: auditRepo,
settingsRepo: settingsRepo,
extractor: extractor,
ragRuntime: ragRuntime,
logger: log.Default(),
db: db,
jobRepo: jobRepo,
itemRepo: itemRepo,
auditRepo: auditRepo,
settingsRepo: settingsRepo,
extractor: extractor,
ragRuntime: ragRuntime,
logger: log.Default(),
cfg: cfg,
decisionOrchestrator: decisionOrchestrator,
}
}
@@ -145,7 +157,42 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) {
return result, nil
}
// 5. 先在事务里写入记忆条目和审计日志,再统一确认 job 成功
// 5. 根据配置选择写入路径:决策层 or 旧路径
if r.cfg.DecisionEnabled && r.decisionOrchestrator != nil {
// 5a. 决策路径:召回→比对→汇总→执行。
outcome, decisionErr := r.executeDecisionFlow(ctx, job, payload, facts)
if decisionErr != nil {
// 决策流程整体失败,根据 FallbackMode 决定是否退回旧路径。
r.logger.Printf("[WARN][去重] 决策流程整体失败: job_id=%d user_id=%d facts_count=%d fallback=%s err=%v", job.ID, payload.UserID, len(facts), r.cfg.DecisionFallbackMode, decisionErr)
if r.cfg.DecisionFallbackMode == "legacy_add" {
if err = r.persistMemoryWrite(ctx, job.ID, items); err != nil {
failReason := fmt.Sprintf("决策降级后记忆落库失败: %v", err)
_ = r.jobRepo.MarkFailed(ctx, job.ID, failReason)
result.Status = model.MemoryJobStatusFailed
return result, nil
}
result.Status = model.MemoryJobStatusSuccess
result.Facts = len(items)
r.syncMemoryVectors(ctx, items)
return result, nil
}
// FallbackMode=drop丢弃本轮抽取结果直接标记 job 成功。
_ = r.jobRepo.MarkSuccess(ctx, job.ID)
result.Status = model.MemoryJobStatusSuccess
return result, nil
}
// 5b. 决策成功:同步向量(新增/更新)和删除过期向量。
result.Status = model.MemoryJobStatusSuccess
result.Facts = outcome.AddCount + outcome.UpdateCount + outcome.DeleteCount
r.syncMemoryVectors(ctx, outcome.ItemsToSync)
r.syncVectorDeletes(ctx, outcome.VectorDeletes)
r.logger.Printf("[去重] 决策流程完成: job_id=%d user_id=%d 新增=%d 更新=%d 删除=%d 跳过=%d",
job.ID, payload.UserID, outcome.AddCount, outcome.UpdateCount, outcome.DeleteCount, outcome.NoneCount)
return result, nil
}
// 5c. 旧路径:和现在完全一样 — 先在事务里写入记忆条目和审计日志,再统一确认 job 成功。
if err = r.persistMemoryWrite(ctx, job.ID, items); err != nil {
failReason := fmt.Sprintf("记忆落库失败: %v", err)
_ = r.jobRepo.MarkFailed(ctx, job.ID, failReason)
@@ -251,7 +298,7 @@ func (r *Runner) syncMemoryVectors(ctx context.Context, items []model.MemoryItem
Items: requestItems,
})
if err != nil {
r.logger.Printf("memory vector sync failed: err=%v", err)
r.logger.Printf("[WARN][去重] 记忆向量同步失败: count=%d err=%v", len(items), err)
for _, item := range items {
_ = r.itemRepo.UpdateVectorStateByID(ctx, item.ID, "failed", nil)
}
@@ -273,6 +320,42 @@ func (r *Runner) syncMemoryVectors(ctx context.Context, items []model.MemoryItem
}
}
// syncVectorDeletes 处理决策层 DELETE 动作产出的向量清理需求。
//
// 步骤:
// 1. 将 memoryID 转为 Milvus documentID"memory:{id}" 格式);
// 2. 调 Runtime.DeleteMemory 真正从 Milvus 删除对应向量;
// 3. 更新 MySQL vector_status 标记删除结果。
func (r *Runner) syncVectorDeletes(ctx context.Context, memoryIDs []int64) {
if r == nil || len(memoryIDs) == 0 {
return
}
// 1. 构造 documentID 列表。
documentIDs := make([]string, 0, len(memoryIDs))
for _, id := range memoryIDs {
documentIDs = append(documentIDs, fmt.Sprintf("memory:%d", id))
}
// 2. 调 Runtime 删除向量。
if r.ragRuntime != nil {
if err := r.ragRuntime.DeleteMemory(ctx, documentIDs); err != nil {
r.logger.Printf("[WARN][去重] Milvus 向量删除失败,标记为 pending 等待后续清理: count=%d ids=%v err=%v", len(memoryIDs), memoryIDs, err)
} else {
r.logger.Printf("[去重] Milvus 向量删除完成: count=%d ids=%v", len(memoryIDs), memoryIDs)
}
}
// 3. 更新 MySQL vector_status。
for _, memoryID := range memoryIDs {
if updateErr := r.itemRepo.UpdateVectorStateByID(ctx, memoryID, "deleted", nil); updateErr != nil {
if r.logger != nil {
r.logger.Printf("[WARN] 向量状态更新失败: memory_id=%d err=%v", memoryID, updateErr)
}
}
}
}
func resolveMemoryTTLAt(base time.Time, memoryType string) *time.Time {
switch memoryType {
case memorymodel.MemoryTypeTodoHint: