后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
249 lines
8.3 KiB
Go
249 lines
8.3 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"strings"
|
||
|
||
memoryrepo "github.com/LoveLosita/smartflow/backend/services/memory/internal/repo"
|
||
memoryutils "github.com/LoveLosita/smartflow/backend/services/memory/internal/utils"
|
||
memorymodel "github.com/LoveLosita/smartflow/backend/services/memory/model"
|
||
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
|
||
)
|
||
|
||
// ApplyActionOutcome 是单个决策动作的执行结果。
|
||
//
|
||
// 说明:
|
||
// 1. Action 记录本次执行的动作类型(ADD/UPDATE/DELETE/NONE);
|
||
// 2. OldItem 仅在 UPDATE/DELETE 时有值,用于审计 before 快照;
|
||
// 3. NewItem 仅在 ADD/UPDATE 时有值,用于审计 after 快照和向量同步;
|
||
// 4. NeedsSync 标记是否需要触发向量同步(ADD 和 UPDATE 需要)。
|
||
type ApplyActionOutcome struct {
|
||
Action string
|
||
MemoryID int64
|
||
OldItem *model.MemoryItem // UPDATE/DELETE 时的 before 快照
|
||
NewItem *model.MemoryItem // ADD/UPDATE 时的 after 快照
|
||
NeedsSync bool // 是否需要向量同步
|
||
}
|
||
|
||
// ApplyFinalDecision 把汇总后的最终决策落为数据库动作。
|
||
//
|
||
// 职责边界:
|
||
// 1. 在调用方事务内执行,不做独立事务管理;
|
||
// 2. 负责写 memory_items + memory_audit_logs,不负责 job 状态推进;
|
||
// 3. 所有动作的审计日志都由这里统一产出。
|
||
//
|
||
// 参数说明:
|
||
// - itemRepo/auditRepo 必须是事务绑定的实例(WithTx 后的);
|
||
// - fact 是当前正在处理的标准化事实;
|
||
// - job/payload 提供写入所需的上下文(user_id、conversation_id 等)。
|
||
func ApplyFinalDecision(
|
||
ctx context.Context,
|
||
itemRepo *memoryrepo.ItemRepo,
|
||
auditRepo *memoryrepo.AuditRepo,
|
||
decision memorymodel.FinalDecision,
|
||
fact memorymodel.NormalizedFact,
|
||
job *model.MemoryJob,
|
||
payload memorymodel.ExtractJobPayload,
|
||
) (*ApplyActionOutcome, error) {
|
||
switch decision.Action {
|
||
case memorymodel.DecisionActionAdd:
|
||
return applyAdd(ctx, itemRepo, auditRepo, fact, job, payload, decision.Reason)
|
||
case memorymodel.DecisionActionUpdate:
|
||
return applyUpdate(ctx, itemRepo, auditRepo, decision, fact, job, payload)
|
||
case memorymodel.DecisionActionDelete:
|
||
return applyDelete(ctx, itemRepo, auditRepo, decision, payload.UserID)
|
||
case memorymodel.DecisionActionNone:
|
||
return &ApplyActionOutcome{
|
||
Action: memorymodel.DecisionActionNone,
|
||
NeedsSync: false,
|
||
}, nil
|
||
default:
|
||
return nil, fmt.Errorf("未知的决策动作: %s", decision.Action)
|
||
}
|
||
}
|
||
|
||
// applyAdd 执行新增动作:构建 MemoryItem → 写库 → 写审计。
|
||
func applyAdd(
|
||
ctx context.Context,
|
||
itemRepo *memoryrepo.ItemRepo,
|
||
auditRepo *memoryrepo.AuditRepo,
|
||
fact memorymodel.NormalizedFact,
|
||
job *model.MemoryJob,
|
||
payload memorymodel.ExtractJobPayload,
|
||
reason string,
|
||
) (*ApplyActionOutcome, error) {
|
||
// 1. 复用 runner.go 的 buildMemoryItems 构建单条 MemoryItem。
|
||
items := buildMemoryItems(job, payload, []memorymodel.NormalizedFact{fact})
|
||
if len(items) == 0 {
|
||
return nil, fmt.Errorf("构建记忆条目失败: memory_type=%s", fact.MemoryType)
|
||
}
|
||
|
||
// 2. 写库,GORM Create 会自动填充 items[0].ID。
|
||
if err := itemRepo.UpsertItems(ctx, items); err != nil {
|
||
return nil, fmt.Errorf("新增记忆写入失败: %w", err)
|
||
}
|
||
// 注意:必须在 UpsertItems 之后取 items[0],因为 GORM Create 回填 ID 到 items[i],
|
||
// 之前用 item := items[0] 在 UpsertItems 之前拷贝,导致副本 ID 永远为 0。
|
||
item := items[0]
|
||
|
||
// 3. 写审计日志(create 动作只有 after 快照)。
|
||
audit := memoryutils.BuildItemAuditLog(
|
||
item.ID,
|
||
item.UserID,
|
||
memoryutils.AuditOperationCreate,
|
||
"system",
|
||
formatAuditReason("决策层新增", reason),
|
||
nil,
|
||
&item,
|
||
)
|
||
if err := auditRepo.Create(ctx, audit); err != nil {
|
||
return nil, fmt.Errorf("新增审计写入失败: %w", err)
|
||
}
|
||
|
||
return &ApplyActionOutcome{
|
||
Action: memorymodel.DecisionActionAdd,
|
||
MemoryID: item.ID,
|
||
NewItem: &item,
|
||
NeedsSync: true,
|
||
}, nil
|
||
}
|
||
|
||
// applyUpdate 执行更新动作:查 before → 更新字段 → 写审计(before+after)。
|
||
func applyUpdate(
|
||
ctx context.Context,
|
||
itemRepo *memoryrepo.ItemRepo,
|
||
auditRepo *memoryrepo.AuditRepo,
|
||
decision memorymodel.FinalDecision,
|
||
fact memorymodel.NormalizedFact,
|
||
job *model.MemoryJob,
|
||
payload memorymodel.ExtractJobPayload,
|
||
) (*ApplyActionOutcome, error) {
|
||
// 1. 查 before 快照,同时确认旧记忆存在且属于该用户。
|
||
oldItem, err := itemRepo.GetByIDForUser(ctx, payload.UserID, decision.TargetID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询旧记忆失败(id=%d): %w", decision.TargetID, err)
|
||
}
|
||
|
||
// 2. 重新计算 NormalizedContent 和 ContentHash,保证和 NormalizeFacts 的逻辑一致。
|
||
// 原因:LLM 输出的 merged content 需要重新走归一化链,避免大小写/空格差异导致后续 Hash 去重失效。
|
||
updatedContent := strings.TrimSpace(decision.Content)
|
||
if updatedContent == "" {
|
||
updatedContent = fact.Content
|
||
}
|
||
normalizedContent := strings.ToLower(updatedContent)
|
||
// 复用 utils.HashContent 的 sha256(memoryType + "::" + normalizedContent) 算法。
|
||
contentHash := memoryutils.HashContent(fact.MemoryType, normalizedContent)
|
||
|
||
title := strings.TrimSpace(decision.Title)
|
||
if title == "" {
|
||
title = oldItem.Title
|
||
}
|
||
|
||
// 3. 执行内容更新。
|
||
fields := memorymodel.UpdateContentFields{
|
||
Title: title,
|
||
Content: updatedContent,
|
||
NormalizedContent: normalizedContent,
|
||
ContentHash: contentHash,
|
||
Confidence: fact.Confidence,
|
||
Importance: fact.Importance,
|
||
}
|
||
if err := itemRepo.UpdateContentByID(ctx, decision.TargetID, fields); err != nil {
|
||
return nil, fmt.Errorf("更新记忆内容失败(id=%d): %w", decision.TargetID, err)
|
||
}
|
||
|
||
// 4. 构造 after 快照用于审计。
|
||
afterItem := *oldItem
|
||
afterItem.Title = title
|
||
afterItem.Content = updatedContent
|
||
if afterItem.NormalizedContent != nil {
|
||
afterItem.NormalizedContent = &normalizedContent
|
||
} else {
|
||
afterItem.NormalizedContent = strPtrFromValue(normalizedContent)
|
||
}
|
||
if afterItem.ContentHash != nil {
|
||
afterItem.ContentHash = &contentHash
|
||
} else {
|
||
afterItem.ContentHash = strPtrFromValue(contentHash)
|
||
}
|
||
afterItem.Confidence = fact.Confidence
|
||
afterItem.Importance = fact.Importance
|
||
|
||
// 5. 写审计日志(update 动作同时有 before 和 after 快照)。
|
||
audit := memoryutils.BuildItemAuditLog(
|
||
oldItem.ID,
|
||
oldItem.UserID,
|
||
memoryutils.AuditOperationUpdate,
|
||
"system",
|
||
formatAuditReason("决策层更新", decision.Reason),
|
||
oldItem,
|
||
&afterItem,
|
||
)
|
||
if err := auditRepo.Create(ctx, audit); err != nil {
|
||
return nil, fmt.Errorf("更新审计写入失败: %w", err)
|
||
}
|
||
|
||
// 6. 向量状态重置为 pending,触发向量重同步。
|
||
// 原因:内容变了,旧向量已过期,需要重新 embed。
|
||
_ = itemRepo.UpdateVectorStateByID(ctx, oldItem.ID, "pending", nil)
|
||
|
||
return &ApplyActionOutcome{
|
||
Action: memorymodel.DecisionActionUpdate,
|
||
MemoryID: oldItem.ID,
|
||
OldItem: oldItem,
|
||
NewItem: &afterItem,
|
||
NeedsSync: true,
|
||
}, nil
|
||
}
|
||
|
||
// applyDelete 执行软删除动作:查 before → 软删 → 写审计(before only)。
|
||
func applyDelete(
|
||
ctx context.Context,
|
||
itemRepo *memoryrepo.ItemRepo,
|
||
auditRepo *memoryrepo.AuditRepo,
|
||
decision memorymodel.FinalDecision,
|
||
userID int,
|
||
) (*ApplyActionOutcome, error) {
|
||
// 1. 查 before 快照。
|
||
oldItem, err := itemRepo.GetByIDForUser(ctx, userID, decision.TargetID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询旧记忆失败(id=%d): %w", decision.TargetID, err)
|
||
}
|
||
|
||
// 2. 执行软删除。
|
||
if err := itemRepo.SoftDeleteByID(ctx, userID, decision.TargetID); err != nil {
|
||
return nil, fmt.Errorf("软删除记忆失败(id=%d): %w", decision.TargetID, err)
|
||
}
|
||
|
||
// 3. 写审计日志(delete 动作只有 before 快照)。
|
||
audit := memoryutils.BuildItemAuditLog(
|
||
oldItem.ID,
|
||
oldItem.UserID,
|
||
memoryutils.AuditOperationDelete,
|
||
"system",
|
||
formatAuditReason("决策层删除", decision.Reason),
|
||
oldItem,
|
||
nil,
|
||
)
|
||
if err := auditRepo.Create(ctx, audit); err != nil {
|
||
return nil, fmt.Errorf("删除审计写入失败: %w", err)
|
||
}
|
||
|
||
return &ApplyActionOutcome{
|
||
Action: memorymodel.DecisionActionDelete,
|
||
MemoryID: oldItem.ID,
|
||
OldItem: oldItem,
|
||
NeedsSync: false,
|
||
}, nil
|
||
}
|
||
|
||
// formatAuditReason 统一审计日志的 reason 格式。
|
||
func formatAuditReason(prefix, detail string) string {
|
||
detail = strings.TrimSpace(detail)
|
||
if detail == "" {
|
||
return prefix
|
||
}
|
||
return prefix + ": " + detail
|
||
}
|