diff --git a/backend/api/container.go b/backend/api/container.go index e834c78..db1b3cb 100644 --- a/backend/api/container.go +++ b/backend/api/container.go @@ -7,4 +7,5 @@ type ApiHandlers struct { TaskClassHandler *TaskClassHandler ScheduleHandler *ScheduleAPI AgentHandler *AgentHandler + MemoryHandler *MemoryHandler } diff --git a/backend/api/memory.go b/backend/api/memory.go new file mode 100644 index 0000000..36318b2 --- /dev/null +++ b/backend/api/memory.go @@ -0,0 +1,290 @@ +package api + +import ( + "context" + "errors" + "net/http" + "strconv" + "strings" + "time" + + memorypkg "github.com/LoveLosita/smartflow/backend/memory" + memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/respond" + "github.com/gin-gonic/gin" +) + +type MemoryHandler struct { + module *memorypkg.Module +} + +var errMemoryHandlerNotReady = errors.New("memory handler is not initialized") + +func NewMemoryHandler(module *memorypkg.Module) *MemoryHandler { + return &MemoryHandler{module: module} +} + +func (h *MemoryHandler) ListItems(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + limit, ok := parseOptionalInt(c.Query("limit")) + if !ok { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + statusesRaw := c.Query("statuses") + if strings.TrimSpace(statusesRaw) == "" { + statusesRaw = c.Query("status") + } + memoryTypesRaw := c.Query("memory_types") + if strings.TrimSpace(memoryTypesRaw) == "" { + memoryTypesRaw = c.Query("memory_type") + } + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + items, err := h.module.ListItems(ctx, memorymodel.ListItemsRequest{ + UserID: c.GetInt("user_id"), + ConversationID: strings.TrimSpace(c.Query("conversation_id")), + Statuses: splitCSV(statusesRaw), + MemoryTypes: splitCSV(memoryTypesRaw), + Limit: limit, + }) + if err != nil { + respond.DealWithError(c, err) + return + } + + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemViews(items))) +} + +func (h *MemoryHandler) GetItem(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + memoryID, ok := parseMemoryIDParam(c) + if !ok { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + item, err := h.module.GetItem(ctx, model.MemoryGetItemRequest{ + UserID: c.GetInt("user_id"), + MemoryID: memoryID, + }) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) +} + +func (h *MemoryHandler) CreateItem(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + var req model.MemoryCreateItemRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.UserID = c.GetInt("user_id") + req.OperatorType = "user" + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + item, err := h.module.CreateItem(ctx, req) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) +} + +func (h *MemoryHandler) UpdateItem(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + memoryID, ok := parseMemoryIDParam(c) + if !ok { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + + var req model.MemoryUpdateItemRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + req.UserID = c.GetInt("user_id") + req.MemoryID = memoryID + req.OperatorType = "user" + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + item, err := h.module.UpdateItem(ctx, req) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) +} + +func (h *MemoryHandler) DeleteItem(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + memoryID, ok := parseMemoryIDParam(c) + if !ok { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + + var body struct { + Reason string `json:"reason"` + } + _ = c.ShouldBindJSON(&body) + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + item, err := h.module.DeleteItem(ctx, model.MemoryDeleteItemRequest{ + UserID: c.GetInt("user_id"), + MemoryID: memoryID, + Reason: strings.TrimSpace(body.Reason), + OperatorType: "user", + }) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) +} + +func (h *MemoryHandler) RestoreItem(c *gin.Context) { + if h == nil || h.module == nil { + c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) + return + } + + memoryID, ok := parseMemoryIDParam(c) + if !ok { + c.JSON(http.StatusBadRequest, respond.WrongParamType) + return + } + + var body struct { + Reason string `json:"reason"` + } + _ = c.ShouldBindJSON(&body) + + ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) + defer cancel() + + item, err := h.module.RestoreItem(ctx, model.MemoryRestoreItemRequest{ + UserID: c.GetInt("user_id"), + MemoryID: memoryID, + Reason: strings.TrimSpace(body.Reason), + OperatorType: "user", + }) + if err != nil { + respond.DealWithError(c, err) + return + } + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) +} + +func parseMemoryIDParam(c *gin.Context) (int64, bool) { + raw := strings.TrimSpace(c.Param("id")) + if raw == "" { + return 0, false + } + value, err := strconv.ParseInt(raw, 10, 64) + if err != nil || value <= 0 { + return 0, false + } + return value, true +} + +func parseOptionalInt(raw string) (int, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, true + } + value, err := strconv.Atoi(raw) + if err != nil { + return 0, false + } + return value, true +} + +func splitCSV(raw string) []string { + raw = strings.TrimSpace(raw) + if raw == "" { + return nil + } + parts := strings.Split(raw, ",") + result := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + result = append(result, part) + } + return result +} + +func toMemoryItemViews(items []memorymodel.ItemDTO) []model.MemoryItemView { + if len(items) == 0 { + return nil + } + result := make([]model.MemoryItemView, 0, len(items)) + for _, item := range items { + result = append(result, toMemoryItemView(&item)) + } + return result +} + +func toMemoryItemView(item *memorymodel.ItemDTO) model.MemoryItemView { + if item == nil { + return model.MemoryItemView{} + } + return model.MemoryItemView{ + ID: item.ID, + UserID: item.UserID, + ConversationID: item.ConversationID, + AssistantID: item.AssistantID, + RunID: item.RunID, + MemoryType: item.MemoryType, + Title: item.Title, + Content: item.Content, + ContentHash: item.ContentHash, + Confidence: item.Confidence, + Importance: item.Importance, + SensitivityLevel: item.SensitivityLevel, + IsExplicit: item.IsExplicit, + Status: item.Status, + TTLAt: item.TTLAt, + CreatedAt: item.CreatedAt, + UpdatedAt: item.UpdatedAt, + } +} diff --git a/backend/cmd/start.go b/backend/cmd/start.go index f81cdf3..e9252ee 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -14,6 +14,7 @@ import ( ragconfig "github.com/LoveLosita/smartflow/backend/infra/rag/config" "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/memory" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" "github.com/LoveLosita/smartflow/backend/middleware" newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" @@ -78,7 +79,18 @@ func Start() { // 1. memory 模块对启动层只暴露一个门面。 // 2. 后续若接入统一 DI 容器,也优先注入这个门面,而不是继续暴露内部 repo/service。 memoryCfg := memory.LoadConfigFromViper() - memoryModule := memory.NewModule(db, infrallm.WrapArkClient(aiHub.Worker), ragRuntime, memoryCfg) + memoryObserver := memoryobserve.NewLoggerObserver(log.Default()) + memoryMetrics := memoryobserve.NewMetricsRegistry() + memoryModule := memory.NewModuleWithObserve( + db, + infrallm.WrapArkClient(aiHub.Worker), + ragRuntime, + memoryCfg, + memory.ObserveDeps{ + Observer: memoryObserver, + Metrics: memoryMetrics, + }, + ) // DAO 层初始化。 cacheRepo := dao.NewCacheDAO(rdb) @@ -180,6 +192,7 @@ func Start() { taskClassApi := api.NewTaskClassHandler(taskClassService) scheduleApi := api.NewScheduleAPI(scheduleService) agentApi := api.NewAgentHandler(agentService) + memoryApi := api.NewMemoryHandler(memoryModule) handlers := &api.ApiHandlers{ UserHandler: userApi, TaskHandler: taskApi, @@ -187,6 +200,7 @@ func Start() { CourseHandler: courseApi, ScheduleHandler: scheduleApi, AgentHandler: agentApi, + MemoryHandler: memoryApi, } r := routers.RegisterRouters(handlers, cacheRepo, userRepo, limiter) diff --git a/backend/memory/cleanup/dedup_policy.go b/backend/memory/cleanup/dedup_policy.go new file mode 100644 index 0000000..ec68e70 --- /dev/null +++ b/backend/memory/cleanup/dedup_policy.go @@ -0,0 +1,73 @@ +package cleanup + +import ( + "sort" + "time" + + "github.com/LoveLosita/smartflow/backend/model" +) + +const dedupRecentTieWindow = 24 * time.Hour + +// DedupDecision 描述单个重复组的治理结论。 +type DedupDecision struct { + Keep model.MemoryItem + Archive []model.MemoryItem +} + +// DecideDedupGroup 决定一组重复 active 记忆中“保留谁、归档谁”。 +// +// 步骤化说明: +// 1. 先按“最近更新时间”判断谁更值得保留,符合治理计划里的“优先保留最近更新”; +// 2. 若更新时间非常接近,再比较 confidence/importance,避免刚好相差几秒就误保留低质量版本; +// 3. 最后用主键逆序兜底,保证同组治理结果稳定可复现。 +func DecideDedupGroup(items []model.MemoryItem) DedupDecision { + if len(items) == 0 { + return DedupDecision{} + } + + ordered := make([]model.MemoryItem, len(items)) + copy(ordered, items) + sort.SliceStable(ordered, func(i, j int) bool { + return preferDedupKeep(ordered[i], ordered[j]) + }) + + return DedupDecision{ + Keep: ordered[0], + Archive: ordered[1:], + } +} + +func preferDedupKeep(left model.MemoryItem, right model.MemoryItem) bool { + leftTime := dedupBaseTime(left) + rightTime := dedupBaseTime(right) + + diff := leftTime.Sub(rightTime) + if diff < 0 { + diff = -diff + } + if diff > dedupRecentTieWindow { + return leftTime.After(rightTime) + } + + if left.Confidence != right.Confidence { + return left.Confidence > right.Confidence + } + if left.Importance != right.Importance { + return left.Importance > right.Importance + } + if !leftTime.Equal(rightTime) { + return leftTime.After(rightTime) + } + return left.ID > right.ID +} + +func dedupBaseTime(item model.MemoryItem) time.Time { + if item.UpdatedAt != nil { + return *item.UpdatedAt + } + if item.CreatedAt != nil { + return *item.CreatedAt + } + return time.Time{} +} diff --git a/backend/memory/cleanup/dedup_runner.go b/backend/memory/cleanup/dedup_runner.go new file mode 100644 index 0000000..3190d4f --- /dev/null +++ b/backend/memory/cleanup/dedup_runner.go @@ -0,0 +1,257 @@ +package cleanup + +import ( + "context" + "errors" + "strconv" + "strings" + "time" + + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" + memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" + memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" + memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" +) + +// DedupRunner 负责执行一次离线记忆去重治理。 +// +// 职责边界: +// 1. 只处理“active + content_hash 非空”的重复组; +// 2. 只负责 archive + audit + 向量删除桥接,不负责自动定时调度; +// 3. 支持 dry-run,便于上线初期先观察治理结果再正式落库。 +type DedupRunner struct { + db *gorm.DB + itemRepo *memoryrepo.ItemRepo + auditRepo *memoryrepo.AuditRepo + vectorSyncer *memoryvectorsync.Syncer + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder +} + +func NewDedupRunner( + db *gorm.DB, + itemRepo *memoryrepo.ItemRepo, + auditRepo *memoryrepo.AuditRepo, + vectorSyncer *memoryvectorsync.Syncer, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, +) *DedupRunner { + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } + return &DedupRunner{ + db: db, + itemRepo: itemRepo, + auditRepo: auditRepo, + vectorSyncer: vectorSyncer, + observer: observer, + metrics: metrics, + } +} + +// Run 执行一次离线去重治理。 +func (r *DedupRunner) Run(ctx context.Context, req model.MemoryDedupCleanupRequest) (model.MemoryDedupCleanupResult, error) { + result := model.MemoryDedupCleanupResult{ + DryRun: req.DryRun, + } + if r == nil || r.db == nil || r.itemRepo == nil || r.auditRepo == nil { + return result, errors.New("memory dedup runner is not initialized") + } + + items, err := r.itemRepo.ListActiveItemsForDedup(ctx, req.UserID, req.Limit) + if err != nil { + r.recordDedupObserve(ctx, req, result, false, err) + return result, err + } + + groups := groupDuplicateItems(items) + result.ScannedGroupCount = len(groups) + if len(groups) == 0 { + r.recordDedupObserve(ctx, req, result, true, nil) + return result, nil + } + + for _, group := range groups { + decision := DecideDedupGroup(group) + if decision.Keep.ID > 0 { + result.KeptCount++ + } + if len(decision.Archive) == 0 { + continue + } + + result.DedupedGroupCount++ + archiveIDs := collectDedupIDs(decision.Archive) + result.ArchivedCount += len(archiveIDs) + result.ArchivedIDs = append(result.ArchivedIDs, archiveIDs...) + if req.DryRun { + continue + } + + now := time.Now() + txErr := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + itemRepo := r.itemRepo.WithTx(tx) + auditRepo := r.auditRepo.WithTx(tx) + + if archiveErr := itemRepo.ArchiveByIDsAt(ctx, archiveIDs, now); archiveErr != nil { + return archiveErr + } + + for _, item := range decision.Archive { + after := item + after.Status = model.MemoryItemStatusArchived + after.UpdatedAt = &now + after.VectorStatus = "pending" + + audit := memoryutils.BuildItemAuditLog( + item.ID, + item.UserID, + memoryutils.AuditOperationArchive, + normalizeCleanupOperator(req.OperatorType), + normalizeCleanupReason(req.Reason), + &item, + &after, + ) + if createErr := auditRepo.Create(ctx, audit); createErr != nil { + return createErr + } + } + return nil + }) + if txErr != nil { + r.recordDedupObserve(ctx, req, result, false, txErr) + return result, txErr + } + + r.vectorSyncer.Delete(ctx, "", archiveIDs) + r.metrics.AddCounter(memoryobserve.MetricCleanupArchivedTotal, int64(len(archiveIDs)), map[string]string{ + "dry_run": "false", + }) + } + + r.recordDedupObserve(ctx, req, result, true, nil) + return result, nil +} + +func groupDuplicateItems(items []model.MemoryItem) [][]model.MemoryItem { + if len(items) == 0 { + return nil + } + + result := make([][]model.MemoryItem, 0) + currentGroup := make([]model.MemoryItem, 0, 2) + currentKey := "" + for _, item := range items { + key := dedupGroupKey(item) + if key == "" { + continue + } + if currentKey == "" || currentKey != key { + if len(currentGroup) > 1 { + copied := make([]model.MemoryItem, len(currentGroup)) + copy(copied, currentGroup) + result = append(result, copied) + } + currentKey = key + currentGroup = currentGroup[:0] + } + currentGroup = append(currentGroup, item) + } + if len(currentGroup) > 1 { + copied := make([]model.MemoryItem, len(currentGroup)) + copy(copied, currentGroup) + result = append(result, copied) + } + return result +} + +func dedupGroupKey(item model.MemoryItem) string { + contentHash := strings.TrimSpace(derefString(item.ContentHash)) + if item.UserID <= 0 || strings.TrimSpace(item.MemoryType) == "" || contentHash == "" { + return "" + } + return strings.Join([]string{ + strconv.Itoa(item.UserID), + item.MemoryType, + contentHash, + }, "::") +} + +func collectDedupIDs(items []model.MemoryItem) []int64 { + ids := make([]int64, 0, len(items)) + for _, item := range items { + if item.ID <= 0 { + continue + } + ids = append(ids, item.ID) + } + return ids +} + +func normalizeCleanupOperator(operatorType string) string { + operatorType = strings.TrimSpace(operatorType) + if operatorType == "" { + return "system" + } + return memoryutils.NormalizeOperatorType(operatorType) +} + +func normalizeCleanupReason(reason string) string { + reason = strings.TrimSpace(reason) + if reason == "" { + return "离线 dedup 治理归档重复记忆" + } + return reason +} + +func derefString(value *string) string { + if value == nil { + return "" + } + return strings.TrimSpace(*value) +} + +func (r *DedupRunner) recordDedupObserve( + ctx context.Context, + req model.MemoryDedupCleanupRequest, + result model.MemoryDedupCleanupResult, + success bool, + err error, +) { + if r == nil { + return + } + + status := "success" + level := memoryobserve.LevelInfo + if !success || err != nil { + status = "error" + level = memoryobserve.LevelWarn + } + + r.observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentCleanup, + Operation: memoryobserve.OperationDedup, + Fields: map[string]any{ + "user_id": req.UserID, + "limit": req.Limit, + "dry_run": req.DryRun, + "scanned_group_count": result.ScannedGroupCount, + "deduped_group_count": result.DedupedGroupCount, + "archived_count": result.ArchivedCount, + "success": success && err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + r.metrics.AddCounter(memoryobserve.MetricCleanupRunTotal, 1, map[string]string{ + "operation": "dedup", + "status": status, + }) +} diff --git a/backend/memory/model/item.go b/backend/memory/model/item.go index c7ecd6b..ab65cfe 100644 --- a/backend/memory/model/item.go +++ b/backend/memory/model/item.go @@ -67,10 +67,39 @@ type ListItemsRequest struct { Limit int } -// DeleteItemRequest 描述软删除一条记忆时所需的最小参数。 -type DeleteItemRequest struct { - UserID int - MemoryID int64 - Reason string - OperatorType string +// CreateItemFields 是 repo 层落库时真正需要的字段集合。 +type CreateItemFields struct { + UserID int + ConversationID string + AssistantID string + RunID string + MemoryType string + Title string + Content string + NormalizedContent string + ContentHash string + Confidence float64 + Importance float64 + SensitivityLevel int + IsExplicit bool + Status string + TTLAt *time.Time + VectorStatus string + SourceMessageID *int64 + SourceEventID *string + LastAccessAt *time.Time +} + +// UpdateItemFields 是“用户管理侧修改记忆”时 repo 层允许更新的字段集合。 +type UpdateItemFields struct { + MemoryType string + Title string + Content string + NormalizedContent string + ContentHash string + Confidence float64 + Importance float64 + SensitivityLevel int + IsExplicit bool + TTLAt *time.Time } diff --git a/backend/memory/module.go b/backend/memory/module.go index 9d0dbb2..4e50414 100644 --- a/backend/memory/module.go +++ b/backend/memory/module.go @@ -7,11 +7,15 @@ import ( infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" + memorycleanup "github.com/LoveLosita/smartflow/backend/memory/cleanup" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryorchestrator "github.com/LoveLosita/smartflow/backend/memory/orchestrator" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryservice "github.com/LoveLosita/smartflow/backend/memory/service" + memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" memoryworker "github.com/LoveLosita/smartflow/backend/memory/worker" + "github.com/LoveLosita/smartflow/backend/model" "gorm.io/gorm" ) @@ -26,6 +30,8 @@ type Module struct { cfg memorymodel.Config llmClient *infrallm.Client ragRuntime infrarag.Runtime + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder jobRepo *memoryrepo.JobRepo itemRepo *memoryrepo.ItemRepo @@ -35,9 +41,17 @@ type Module struct { enqueueService *memoryservice.EnqueueService readService *memoryservice.ReadService manageService *memoryservice.ManageService + vectorSyncer *memoryvectorsync.Syncer + dedupRunner *memorycleanup.DedupRunner runner *memoryworker.Runner } +// ObserveDeps 描述 memory 模块可选的观测依赖。 +type ObserveDeps struct { + Observer memoryobserve.Observer + Metrics memoryobserve.MetricsRecorder +} + // LoadConfigFromViper 复用 memory 子包里的配置加载逻辑,对外收口一个统一入口。 func LoadConfigFromViper() memorymodel.Config { return memoryservice.LoadConfigFromViper() @@ -51,7 +65,18 @@ func LoadConfigFromViper() memorymodel.Config { // 3. ragRuntime 允许为 nil,此时读取/向量同步自动回退旧逻辑; // 4. 若后续接入统一 DI 容器,也应优先注册这个 Module,而不是把内部 repo/service 继续向外泄漏。 func NewModule(db *gorm.DB, llmClient *infrallm.Client, ragRuntime infrarag.Runtime, cfg memorymodel.Config) *Module { - return wireModule(db, llmClient, ragRuntime, cfg) + return NewModuleWithObserve(db, llmClient, ragRuntime, cfg, ObserveDeps{}) +} + +// NewModuleWithObserve 创建带观测依赖的 memory 模块门面。 +func NewModuleWithObserve( + db *gorm.DB, + llmClient *infrallm.Client, + ragRuntime infrarag.Runtime, + cfg memorymodel.Config, + deps ObserveDeps, +) *Module { + return wireModule(db, llmClient, ragRuntime, cfg, deps) } // WithTx 返回绑定到指定事务连接的同构门面。 @@ -67,7 +92,10 @@ func (m *Module) WithTx(tx *gorm.DB) *Module { if tx == nil { return m } - return wireModule(tx, m.llmClient, m.ragRuntime, m.cfg) + return wireModule(tx, m.llmClient, m.ragRuntime, m.cfg, ObserveDeps{ + Observer: m.observer, + Metrics: m.metrics, + }) } // EnqueueExtract 把一次记忆抽取请求入队到 memory_jobs。 @@ -98,14 +126,46 @@ func (m *Module) ListItems(ctx context.Context, req memorymodel.ListItemsRequest return m.manageService.ListItems(ctx, req) } +// GetItem 返回当前用户自己的单条记忆详情。 +func (m *Module) GetItem(ctx context.Context, req model.MemoryGetItemRequest) (*memorymodel.ItemDTO, error) { + if m == nil || m.manageService == nil { + return nil, errors.New("memory module manage service is nil") + } + return m.manageService.GetItem(ctx, req) +} + +// CreateItem 手动新增一条用户记忆。 +func (m *Module) CreateItem(ctx context.Context, req model.MemoryCreateItemRequest) (*memorymodel.ItemDTO, error) { + if m == nil || m.manageService == nil { + return nil, errors.New("memory module manage service is nil") + } + return m.manageService.CreateItem(ctx, req) +} + +// UpdateItem 手动修改一条用户记忆。 +func (m *Module) UpdateItem(ctx context.Context, req model.MemoryUpdateItemRequest) (*memorymodel.ItemDTO, error) { + if m == nil || m.manageService == nil { + return nil, errors.New("memory module manage service is nil") + } + return m.manageService.UpdateItem(ctx, req) +} + // DeleteItem 软删除一条记忆,并补写审计日志。 -func (m *Module) DeleteItem(ctx context.Context, req memorymodel.DeleteItemRequest) (*memorymodel.ItemDTO, error) { +func (m *Module) DeleteItem(ctx context.Context, req model.MemoryDeleteItemRequest) (*memorymodel.ItemDTO, error) { if m == nil || m.manageService == nil { return nil, errors.New("memory module manage service is nil") } return m.manageService.DeleteItem(ctx, req) } +// RestoreItem 恢复一条 deleted/archived 记忆。 +func (m *Module) RestoreItem(ctx context.Context, req model.MemoryRestoreItemRequest) (*memorymodel.ItemDTO, error) { + if m == nil || m.manageService == nil { + return nil, errors.New("memory module manage service is nil") + } + return m.manageService.RestoreItem(ctx, req) +} + // GetUserSetting 读取用户当前生效的记忆开关。 func (m *Module) GetUserSetting(ctx context.Context, userID int) (memorymodel.UserSettingDTO, error) { if m == nil || m.manageService == nil { @@ -122,6 +182,30 @@ func (m *Module) UpsertUserSetting(ctx context.Context, req memorymodel.UpdateUs return m.manageService.UpsertUserSetting(ctx, req) } +// RunDedupCleanup 执行一次离线 dedup 治理。 +func (m *Module) RunDedupCleanup(ctx context.Context, req model.MemoryDedupCleanupRequest) (model.MemoryDedupCleanupResult, error) { + if m == nil || m.dedupRunner == nil { + return model.MemoryDedupCleanupResult{}, errors.New("memory module dedup runner is nil") + } + return m.dedupRunner.Run(ctx, req) +} + +// MemoryObserver 暴露 memory 模块当前使用的 observer,供注入桥接等外围能力复用。 +func (m *Module) MemoryObserver() memoryobserve.Observer { + if m == nil || m.observer == nil { + return memoryobserve.NewNopObserver() + } + return m.observer +} + +// MemoryMetrics 暴露 memory 模块当前使用的轻量计数器。 +func (m *Module) MemoryMetrics() memoryobserve.MetricsRecorder { + if m == nil || m.metrics == nil { + return memoryobserve.NewNopMetrics() + } + return m.metrics +} + // StartWorker 启动 memory 后台 worker。 // // 说明: @@ -142,15 +226,30 @@ func (m *Module) StartWorker(ctx context.Context) { log.Println("Memory worker started") } -func wireModule(db *gorm.DB, llmClient *infrallm.Client, ragRuntime infrarag.Runtime, cfg memorymodel.Config) *Module { +func wireModule( + db *gorm.DB, + llmClient *infrallm.Client, + ragRuntime infrarag.Runtime, + cfg memorymodel.Config, + deps ObserveDeps, +) *Module { jobRepo := memoryrepo.NewJobRepo(db) itemRepo := memoryrepo.NewItemRepo(db) auditRepo := memoryrepo.NewAuditRepo(db) settingsRepo := memoryrepo.NewSettingsRepo(db) + observer := deps.Observer + if observer == nil { + observer = memoryobserve.NewLoggerObserver(log.Default()) + } + metrics := deps.Metrics + if metrics == nil { + metrics = memoryobserve.NewMetricsRegistry() + } + vectorSyncer := memoryvectorsync.NewSyncer(ragRuntime, itemRepo, observer, metrics) enqueueService := memoryservice.NewEnqueueService(jobRepo) - readService := memoryservice.NewReadService(itemRepo, settingsRepo, ragRuntime, cfg) - manageService := memoryservice.NewManageService(db, itemRepo, auditRepo, settingsRepo) + readService := memoryservice.NewReadService(itemRepo, settingsRepo, ragRuntime, cfg, observer, metrics) + manageService := memoryservice.NewManageService(db, itemRepo, auditRepo, settingsRepo, vectorSyncer, observer, metrics) extractor := memoryorchestrator.NewLLMWriteOrchestrator(llmClient, cfg) // 决策编排器:仅在 DecisionEnabled 时才创建有效实例。 @@ -161,13 +260,16 @@ func wireModule(db *gorm.DB, llmClient *infrallm.Client, ragRuntime infrarag.Run decisionOrchestrator = memoryorchestrator.NewLLMDecisionOrchestrator(llmClient, cfg) } - runner := memoryworker.NewRunner(db, jobRepo, itemRepo, auditRepo, settingsRepo, extractor, ragRuntime, cfg, decisionOrchestrator) + runner := memoryworker.NewRunner(db, jobRepo, itemRepo, auditRepo, settingsRepo, extractor, ragRuntime, cfg, decisionOrchestrator, vectorSyncer, observer, metrics) + dedupRunner := memorycleanup.NewDedupRunner(db, itemRepo, auditRepo, vectorSyncer, observer, metrics) return &Module{ db: db, cfg: cfg, llmClient: llmClient, ragRuntime: ragRuntime, + observer: observer, + metrics: metrics, jobRepo: jobRepo, itemRepo: itemRepo, auditRepo: auditRepo, @@ -175,6 +277,8 @@ func wireModule(db *gorm.DB, llmClient *infrallm.Client, ragRuntime infrarag.Run enqueueService: enqueueService, readService: readService, manageService: manageService, + vectorSyncer: vectorSyncer, + dedupRunner: dedupRunner, runner: runner, } } diff --git a/backend/memory/observe/log_fields.go b/backend/memory/observe/log_fields.go new file mode 100644 index 0000000..5afe34a --- /dev/null +++ b/backend/memory/observe/log_fields.go @@ -0,0 +1,119 @@ +package observe + +import ( + "context" + "errors" + "strings" +) + +const ( + ComponentRead = "read" + ComponentWrite = "write" + ComponentInject = "inject" + ComponentManage = "manage" + ComponentCleanup = "cleanup" + + OperationRetrieve = "retrieve" + OperationDecision = "decision" + OperationInject = "inject" + OperationManage = "manage" + OperationDedup = "dedup" + + MetricJobTotal = "memory_job_total" + MetricJobRetryTotal = "memory_job_retry_total" + MetricDecisionTotal = "memory_decision_total" + MetricDecisionFallbackTotal = "memory_decision_fallback_total" + MetricRetrieveHitTotal = "memory_retrieve_hit_total" + MetricRetrieveDedupDropTotal = "memory_retrieve_dedup_drop_total" + MetricInjectItemTotal = "memory_inject_item_total" + MetricRAGFallbackTotal = "memory_rag_fallback_total" + MetricManageTotal = "memory_manage_total" + MetricCleanupRunTotal = "memory_cleanup_run_total" + MetricCleanupArchivedTotal = "memory_cleanup_archived_total" +) + +type fieldsContextKey struct{} + +// WithFields 把 memory 链路公共字段挂进上下文,供下游日志复用。 +// +// 职责边界: +// 1. 只负责字段透传与覆盖,不负责真正打印日志; +// 2. 只保留有意义的字段,避免结构化日志长期堆积空值; +// 3. 若上游已写入同名字段,则以后写值为准,方便链路逐层补齐上下文。 +func WithFields(ctx context.Context, fields map[string]any) context.Context { + if len(fields) == 0 { + return ctx + } + if ctx == nil { + ctx = context.Background() + } + + merged := FieldsFromContext(ctx) + for key, value := range fields { + key = strings.TrimSpace(key) + if key == "" || !shouldKeepField(value) { + continue + } + merged[key] = value + } + if len(merged) == 0 { + return ctx + } + return context.WithValue(ctx, fieldsContextKey{}, merged) +} + +// FieldsFromContext 读取当前上下文中已经累积的观测字段。 +func FieldsFromContext(ctx context.Context) map[string]any { + if ctx == nil { + return map[string]any{} + } + raw, ok := ctx.Value(fieldsContextKey{}).(map[string]any) + if !ok || len(raw) == 0 { + return map[string]any{} + } + + result := make(map[string]any, len(raw)) + for key, value := range raw { + result[key] = value + } + return result +} + +// MergeFields 合并多份结构化字段,后写同名字段覆盖先写字段。 +func MergeFields(parts ...map[string]any) map[string]any { + result := make(map[string]any) + for _, part := range parts { + for key, value := range part { + key = strings.TrimSpace(key) + if key == "" || !shouldKeepField(value) { + continue + } + result[key] = value + } + } + return result +} + +// ClassifyError 把常见错误压成稳定错误码,便于日志与指标统一聚合。 +func ClassifyError(err error) string { + switch { + case err == nil: + return "" + case errors.Is(err, context.DeadlineExceeded): + return "deadline_exceeded" + case errors.Is(err, context.Canceled): + return "canceled" + default: + return "memory_error" + } +} + +func shouldKeepField(value any) bool { + if value == nil { + return false + } + if text, ok := value.(string); ok { + return strings.TrimSpace(text) != "" + } + return true +} diff --git a/backend/memory/observe/metrics.go b/backend/memory/observe/metrics.go new file mode 100644 index 0000000..ae529a0 --- /dev/null +++ b/backend/memory/observe/metrics.go @@ -0,0 +1,158 @@ +package observe + +import ( + "sort" + "strings" + "sync" +) + +// CounterSnapshot 是轻量计数器的快照视图,供后续排障或接平台时读取。 +type CounterSnapshot struct { + Name string + Labels map[string]string + Value int64 +} + +// MetricsRecorder 描述 memory 模块对计数器的最小依赖。 +type MetricsRecorder interface { + AddCounter(name string, delta int64, labels map[string]string) + Snapshot() []CounterSnapshot +} + +// NewNopMetrics 返回空实现,保证无观测平台时仍可安全运行。 +func NewNopMetrics() MetricsRecorder { + return nopMetrics{} +} + +type nopMetrics struct{} + +func (nopMetrics) AddCounter(string, int64, map[string]string) {} + +func (nopMetrics) Snapshot() []CounterSnapshot { + return nil +} + +// MetricsRegistry 是 memory 模块当前阶段的轻量内存计数器实现。 +// +// 职责边界: +// 1. 只做线程安全计数,不负责导出协议; +// 2. 标签做低基数归一化,避免治理期临时字段把指标打爆; +// 3. 后续若项目统一接 Prometheus,可直接保留调用口径并替换实现。 +type MetricsRegistry struct { + mu sync.RWMutex + counters map[string]*counterRecord +} + +type counterRecord struct { + name string + labels map[string]string + value int64 +} + +func NewMetricsRegistry() *MetricsRegistry { + return &MetricsRegistry{ + counters: make(map[string]*counterRecord), + } +} + +// AddCounter 追加计数值;delta<=0 时直接忽略,避免脏数据污染快照。 +func (r *MetricsRegistry) AddCounter(name string, delta int64, labels map[string]string) { + if r == nil || delta <= 0 { + return + } + + name = strings.TrimSpace(name) + if name == "" { + return + } + normalizedLabels := normalizeLabels(labels) + key := buildCounterKey(name, normalizedLabels) + + r.mu.Lock() + defer r.mu.Unlock() + if existing, ok := r.counters[key]; ok { + existing.value += delta + return + } + r.counters[key] = &counterRecord{ + name: name, + labels: normalizedLabels, + value: delta, + } +} + +// Snapshot 返回当前全部计数器快照,便于后续排障或测试读取。 +func (r *MetricsRegistry) Snapshot() []CounterSnapshot { + if r == nil { + return nil + } + + r.mu.RLock() + defer r.mu.RUnlock() + if len(r.counters) == 0 { + return nil + } + + keys := make([]string, 0, len(r.counters)) + for key := range r.counters { + keys = append(keys, key) + } + sort.Strings(keys) + + result := make([]CounterSnapshot, 0, len(keys)) + for _, key := range keys { + record := r.counters[key] + labels := make(map[string]string, len(record.labels)) + for labelKey, labelValue := range record.labels { + labels[labelKey] = labelValue + } + result = append(result, CounterSnapshot{ + Name: record.name, + Labels: labels, + Value: record.value, + }) + } + return result +} + +func normalizeLabels(labels map[string]string) map[string]string { + if len(labels) == 0 { + return nil + } + + result := make(map[string]string, len(labels)) + for key, value := range labels { + key = strings.TrimSpace(key) + value = strings.TrimSpace(value) + if key == "" || value == "" { + continue + } + result[key] = value + } + if len(result) == 0 { + return nil + } + return result +} + +func buildCounterKey(name string, labels map[string]string) string { + if len(labels) == 0 { + return name + } + + keys := make([]string, 0, len(labels)) + for key := range labels { + keys = append(keys, key) + } + sort.Strings(keys) + + var sb strings.Builder + sb.WriteString(name) + for _, key := range keys { + sb.WriteString("|") + sb.WriteString(key) + sb.WriteString("=") + sb.WriteString(labels[key]) + } + return sb.String() +} diff --git a/backend/memory/observe/observer.go b/backend/memory/observe/observer.go new file mode 100644 index 0000000..74b51ea --- /dev/null +++ b/backend/memory/observe/observer.go @@ -0,0 +1,109 @@ +package observe + +import ( + "context" + "fmt" + "log" + "sort" + "strings" +) + +// Level 表示 memory 结构化观测事件等级。 +type Level string + +const ( + LevelInfo Level = "info" + LevelWarn Level = "warn" + LevelError Level = "error" +) + +// Event 描述一次 memory 模块内部结构化观测事件。 +// +// 职责边界: +// 1. 只承载稳定字段,不绑定具体日志平台; +// 2. 组件与操作名尽量保持低基数,避免后续指标聚合失控; +// 3. 字段内容应偏“排障与治理”,不承载大段原始文本。 +type Event struct { + Level Level + Component string + Operation string + Fields map[string]any +} + +// Observer 是 memory 模块的最小观测接口。 +type Observer interface { + Observe(ctx context.Context, event Event) +} + +// ObserverFunc 允许用函数快速适配 Observer。 +type ObserverFunc func(ctx context.Context, event Event) + +func (f ObserverFunc) Observe(ctx context.Context, event Event) { + if f == nil { + return + } + f(ctx, event) +} + +// NewNopObserver 返回空实现,保证观测能力不会反向阻塞主链路。 +func NewNopObserver() Observer { + return ObserverFunc(func(context.Context, Event) {}) +} + +// NewLoggerObserver 返回标准日志实现,当前阶段默认打到后端进程日志。 +func NewLoggerObserver(logger *log.Logger) Observer { + if logger == nil { + logger = log.Default() + } + return &loggerObserver{logger: logger} +} + +type loggerObserver struct { + logger *log.Logger +} + +func (o *loggerObserver) Observe(ctx context.Context, event Event) { + if o == nil || o.logger == nil { + return + } + + level := strings.TrimSpace(string(event.Level)) + if level == "" { + level = string(LevelInfo) + } + component := strings.TrimSpace(event.Component) + if component == "" { + component = "unknown" + } + operation := strings.TrimSpace(event.Operation) + if operation == "" { + operation = "unknown" + } + + fields := FieldsFromContext(ctx) + for key, value := range event.Fields { + key = strings.TrimSpace(key) + if key == "" || !shouldKeepField(value) { + continue + } + fields[key] = value + } + + parts := []string{ + "memory", + fmt.Sprintf("level=%s", level), + fmt.Sprintf("component=%s", component), + fmt.Sprintf("operation=%s", operation), + } + + keys := make([]string, 0, len(fields)) + for key := range fields { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + parts = append(parts, fmt.Sprintf("%s=%v", key, fields[key])) + } + + o.logger.Print(strings.Join(parts, " ")) +} diff --git a/backend/memory/repo/item_repo.go b/backend/memory/repo/item_repo.go index f5d9c4b..c74a1db 100644 --- a/backend/memory/repo/item_repo.go +++ b/backend/memory/repo/item_repo.go @@ -46,6 +46,54 @@ func (r *ItemRepo) UpsertItems(ctx context.Context, items []model.MemoryItem) er return nil } +// Create 写入单条记忆并返回带自增主键的结果。 +// +// 职责边界: +// 1. 只负责单条落库,不负责内容归一化与业务校验; +// 2. 默认把 vector_status 视为上游已决策好的桥接状态,不在这里擅自改写; +// 3. 返回值用于上游继续写 audit 或做向量同步。 +func (r *ItemRepo) Create(ctx context.Context, fields memorymodel.CreateItemFields) (*model.MemoryItem, error) { + if r == nil || r.db == nil { + return nil, errors.New("memory item repo is nil") + } + if fields.UserID <= 0 { + return nil, errors.New("memory item create user_id is invalid") + } + + item := model.MemoryItem{ + UserID: fields.UserID, + ConversationID: strPtrOrNil(fields.ConversationID), + AssistantID: strPtrOrNil(fields.AssistantID), + RunID: strPtrOrNil(fields.RunID), + MemoryType: fields.MemoryType, + Title: fields.Title, + Content: fields.Content, + NormalizedContent: strPtrOrNil(fields.NormalizedContent), + ContentHash: strPtrOrNil(fields.ContentHash), + Confidence: fields.Confidence, + Importance: fields.Importance, + SensitivityLevel: fields.SensitivityLevel, + SourceMessageID: fields.SourceMessageID, + SourceEventID: fields.SourceEventID, + IsExplicit: fields.IsExplicit, + Status: fields.Status, + TTLAt: fields.TTLAt, + LastAccessAt: fields.LastAccessAt, + VectorStatus: fields.VectorStatus, + } + if item.Status == "" { + item.Status = model.MemoryItemStatusActive + } + if strings.TrimSpace(item.VectorStatus) == "" { + item.VectorStatus = "pending" + } + + if err := r.db.WithContext(ctx).Create(&item).Error; err != nil { + return nil, err + } + return &item, nil +} + // FindByQuery 按统一过滤条件读取记忆条目。 // // 步骤化说明: @@ -324,6 +372,53 @@ func (r *ItemRepo) UpdateContentByID(ctx context.Context, memoryID int64, fields }).Error } +// UpdateManagedFieldsByID 更新“用户管理侧”允许修改的记忆字段。 +func (r *ItemRepo) UpdateManagedFieldsByID(ctx context.Context, userID int, memoryID int64, fields memorymodel.UpdateItemFields) error { + return r.UpdateManagedFieldsByIDAt(ctx, userID, memoryID, fields, time.Now()) +} + +// UpdateManagedFieldsByIDAt 更新“用户管理侧”允许修改的记忆字段,并允许显式指定更新时间。 +// +// 步骤化说明: +// 1. 这里只改内容侧和展示侧字段,不改 user_id/status 等归属语义; +// 2. memory_type/content 变化后,会把 vector_status 置为 pending,提示上游需要重新同步向量; +// 3. TTLAt 允许被设置为 nil,用于显式清空过期时间。 +func (r *ItemRepo) UpdateManagedFieldsByIDAt( + ctx context.Context, + userID int, + memoryID int64, + fields memorymodel.UpdateItemFields, + updatedAt time.Time, +) error { + if r == nil || r.db == nil { + return errors.New("memory item repo is nil") + } + if userID <= 0 || memoryID <= 0 { + return errors.New("memory item update params is invalid") + } + if updatedAt.IsZero() { + updatedAt = time.Now() + } + + return r.db.WithContext(ctx). + Model(&model.MemoryItem{}). + Where("id = ? AND user_id = ?", memoryID, userID). + Updates(map[string]any{ + "memory_type": fields.MemoryType, + "title": fields.Title, + "content": fields.Content, + "normalized_content": fields.NormalizedContent, + "content_hash": fields.ContentHash, + "confidence": fields.Confidence, + "importance": fields.Importance, + "sensitivity_level": fields.SensitivityLevel, + "is_explicit": fields.IsExplicit, + "ttl_at": fields.TTLAt, + "vector_status": "pending", + "updated_at": updatedAt, + }).Error +} + // SoftDeleteByID 软删除指定用户的某条记忆。 // // 说明: @@ -348,6 +443,95 @@ func (r *ItemRepo) SoftDeleteByID(ctx context.Context, userID int, memoryID int6 }).Error } +// RestoreByID 把 deleted/archived 记忆恢复为 active。 +func (r *ItemRepo) RestoreByID(ctx context.Context, userID int, memoryID int64) error { + return r.RestoreByIDAt(ctx, userID, memoryID, time.Now()) +} + +// RestoreByIDAt 把 deleted/archived 记忆恢复为 active,并显式刷新 vector_status。 +// +// 这样做的原因: +// 1. 恢复后的记忆需要重新参与语义召回,因此向量侧也要重新同步; +// 2. 这里统一把 vector_status 置为 pending,避免上游遗漏桥接状态更新; +// 3. 若目标记录本身已是 active,上游应先读快照决定是否真的调用恢复。 +func (r *ItemRepo) RestoreByIDAt(ctx context.Context, userID int, memoryID int64, updatedAt time.Time) error { + if r == nil || r.db == nil { + return errors.New("memory item repo is nil") + } + if userID <= 0 || memoryID <= 0 { + return errors.New("memory item restore params is invalid") + } + if updatedAt.IsZero() { + updatedAt = time.Now() + } + + return r.db.WithContext(ctx). + Model(&model.MemoryItem{}). + Where("id = ? AND user_id = ?", memoryID, userID). + Updates(map[string]any{ + "status": model.MemoryItemStatusActive, + "vector_status": "pending", + "updated_at": updatedAt, + }).Error +} + +// ArchiveByIDsAt 把一批重复记忆改为 archived,并等待上游删除向量副本。 +func (r *ItemRepo) ArchiveByIDsAt(ctx context.Context, ids []int64, updatedAt time.Time) error { + if r == nil || r.db == nil { + return errors.New("memory item repo is nil") + } + if len(ids) == 0 { + return nil + } + if updatedAt.IsZero() { + updatedAt = time.Now() + } + + return r.db.WithContext(ctx). + Model(&model.MemoryItem{}). + Where("id IN ?", ids). + Where("status = ?", model.MemoryItemStatusActive). + Updates(map[string]any{ + "status": model.MemoryItemStatusArchived, + "vector_status": "pending", + "updated_at": updatedAt, + }).Error +} + +// ListActiveItemsForDedup 读取“当前仍 active 且带 content_hash”的候选记忆,供离线 dedup 治理使用。 +// +// 步骤化说明: +// 1. 只扫描 status=active 且 hash 非空的记录,因为治理目标是“活跃重复项”; +// 2. 先按 user/type/hash 分组,再按更新时间、置信度、主键逆序排列,方便上游顺序分组; +// 3. Limit 仅用于保守控量,不保证整组完整,因此首次治理建议留空或给足够大值。 +func (r *ItemRepo) ListActiveItemsForDedup(ctx context.Context, userID int, limit int) ([]model.MemoryItem, error) { + if r == nil || r.db == nil { + return nil, errors.New("memory item repo is nil") + } + + db := r.db.WithContext(ctx). + Model(&model.MemoryItem{}). + Where("status = ?", model.MemoryItemStatusActive). + Where("content_hash IS NOT NULL AND content_hash <> ''") + if userID > 0 { + db = db.Where("user_id = ?", userID) + } + if limit > 0 { + db = db.Limit(limit) + } + + var items []model.MemoryItem + err := db. + Order("user_id ASC"). + Order("memory_type ASC"). + Order("content_hash ASC"). + Order("updated_at DESC"). + Order("confidence DESC"). + Order("id DESC"). + Find(&items).Error + return items, err +} + func applyScopedEquality(db *gorm.DB, column, value string, includeGlobal bool) *gorm.DB { value = strings.TrimSpace(value) if value == "" { diff --git a/backend/memory/service/manage_service.go b/backend/memory/service/manage_service.go index 32606c2..048b790 100644 --- a/backend/memory/service/manage_service.go +++ b/backend/memory/service/manage_service.go @@ -7,15 +7,20 @@ import ( "time" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" + memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/respond" "gorm.io/gorm" ) const ( - defaultManageListLimit = 20 - maxManageListLimit = 100 + defaultManageListLimit = 20 + maxManageListLimit = 100 + defaultManualConfidence = 0.95 + defaultManualImportance = 0.90 ) // ManageService 负责 memory 模块内部的管理面能力。 @@ -29,6 +34,9 @@ type ManageService struct { itemRepo *memoryrepo.ItemRepo auditRepo *memoryrepo.AuditRepo settingsRepo *memoryrepo.SettingsRepo + vectorSyncer *memoryvectorsync.Syncer + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder } func NewManageService( @@ -36,12 +44,24 @@ func NewManageService( itemRepo *memoryrepo.ItemRepo, auditRepo *memoryrepo.AuditRepo, settingsRepo *memoryrepo.SettingsRepo, + vectorSyncer *memoryvectorsync.Syncer, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, ) *ManageService { + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } return &ManageService{ db: db, itemRepo: itemRepo, auditRepo: auditRepo, settingsRepo: settingsRepo, + vectorSyncer: vectorSyncer, + observer: observer, + metrics: metrics, } } @@ -77,18 +97,152 @@ func (s *ManageService) ListItems(ctx context.Context, req memorymodel.ListItems return toItemDTOs(items), nil } +// GetItem 返回“当前用户自己的某条记忆”详情。 +func (s *ManageService) GetItem(ctx context.Context, req model.MemoryGetItemRequest) (*memorymodel.ItemDTO, error) { + if s == nil || s.itemRepo == nil { + return nil, errors.New("memory manage service is nil") + } + if req.UserID <= 0 { + return nil, respond.WrongUserID + } + if req.MemoryID <= 0 { + return nil, respond.WrongParamType + } + + item, err := s.itemRepo.GetByIDForUser(ctx, req.UserID, req.MemoryID) + if err != nil { + return nil, translateManageError(err) + } + dto := toItemDTO(*item) + return &dto, nil +} + +// CreateItem 手动新增一条用户记忆,并补审计与向量同步桥接。 +func (s *ManageService) CreateItem(ctx context.Context, req model.MemoryCreateItemRequest) (*memorymodel.ItemDTO, error) { + if s == nil || s.db == nil || s.itemRepo == nil || s.auditRepo == nil { + return nil, errors.New("memory manage service is not initialized") + } + if req.UserID <= 0 { + return nil, respond.WrongUserID + } + + fields, err := buildCreateItemFields(req) + if err != nil { + s.recordManageAction(ctx, "create", req.UserID, 0, fields.MemoryType, false, err) + return nil, err + } + + var createdItem model.MemoryItem + err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + itemRepo := s.itemRepo.WithTx(tx) + auditRepo := s.auditRepo.WithTx(tx) + + created, createErr := itemRepo.Create(ctx, fields) + if createErr != nil { + return createErr + } + createdItem = *created + + audit := memoryutils.BuildItemAuditLog( + createdItem.ID, + createdItem.UserID, + memoryutils.AuditOperationCreate, + memoryutils.NormalizeOperatorType(req.OperatorType), + normalizeManageReason(req.Reason, "用户手动新增记忆"), + nil, + &createdItem, + ) + return auditRepo.Create(ctx, audit) + }) + if err != nil { + err = translateManageError(err) + s.recordManageAction(ctx, "create", req.UserID, 0, fields.MemoryType, false, err) + return nil, err + } + + s.vectorSyncer.Upsert(ctx, "", []model.MemoryItem{createdItem}) + s.recordManageAction(ctx, "create", req.UserID, createdItem.ID, createdItem.MemoryType, true, nil) + dto := toItemDTO(createdItem) + return &dto, nil +} + +// UpdateItem 手动修改一条用户记忆,并补审计与向量重同步桥接。 +func (s *ManageService) UpdateItem(ctx context.Context, req model.MemoryUpdateItemRequest) (*memorymodel.ItemDTO, error) { + if s == nil || s.db == nil || s.itemRepo == nil || s.auditRepo == nil { + return nil, errors.New("memory manage service is not initialized") + } + if req.UserID <= 0 { + return nil, respond.WrongUserID + } + if req.MemoryID <= 0 { + return nil, respond.WrongParamType + } + + var updatedItem model.MemoryItem + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + itemRepo := s.itemRepo.WithTx(tx) + auditRepo := s.auditRepo.WithTx(tx) + + current, getErr := itemRepo.GetByIDForUser(ctx, req.UserID, req.MemoryID) + if getErr != nil { + return getErr + } + + fields, afterItem, buildErr := buildUpdateItemFields(req, *current) + if buildErr != nil { + return buildErr + } + now := time.Now() + afterItem.UpdatedAt = &now + afterItem.VectorStatus = "pending" + + if updateErr := itemRepo.UpdateManagedFieldsByIDAt(ctx, req.UserID, req.MemoryID, fields, now); updateErr != nil { + return updateErr + } + + audit := memoryutils.BuildItemAuditLog( + current.ID, + current.UserID, + memoryutils.AuditOperationUpdate, + memoryutils.NormalizeOperatorType(req.OperatorType), + normalizeManageReason(req.Reason, "用户手动修改记忆"), + current, + &afterItem, + ) + if auditErr := auditRepo.Create(ctx, audit); auditErr != nil { + return auditErr + } + + updatedItem = afterItem + return nil + }) + if err != nil { + err = translateManageError(err) + s.recordManageAction(ctx, "update", req.UserID, req.MemoryID, resolveUpdateMemoryType(req), false, err) + return nil, err + } + + s.vectorSyncer.Upsert(ctx, "", []model.MemoryItem{updatedItem}) + s.recordManageAction(ctx, "update", req.UserID, updatedItem.ID, updatedItem.MemoryType, true, nil) + dto := toItemDTO(updatedItem) + return &dto, nil +} + // DeleteItem 软删除一条记忆,并补写审计日志。 // // 步骤化说明: // 1. 先在事务里读取当前条目快照,确保审计前镜像和实际删除对象一致; // 2. 若该条目已是 deleted,则直接按幂等语义返回,避免重复写多条删除审计; // 3. 状态更新成功后再写 audit log,保证“有删除就有审计”,失败时整笔事务回滚。 -func (s *ManageService) DeleteItem(ctx context.Context, req memorymodel.DeleteItemRequest) (*memorymodel.ItemDTO, error) { +func (s *ManageService) DeleteItem(ctx context.Context, req model.MemoryDeleteItemRequest) (*memorymodel.ItemDTO, error) { if s == nil || s.db == nil || s.itemRepo == nil || s.auditRepo == nil { return nil, errors.New("memory manage service is not initialized") } - if req.UserID <= 0 || req.MemoryID <= 0 { - return nil, nil + if req.UserID <= 0 { + return nil, respond.WrongUserID + } + if req.MemoryID <= 0 { + return nil, respond.WrongParamType } now := time.Now() @@ -113,8 +267,9 @@ func (s *ManageService) DeleteItem(ctx context.Context, req memorymodel.DeleteIt after := before after.Status = model.MemoryItemStatusDeleted after.UpdatedAt = &now + after.VectorStatus = "pending" - if err = itemRepo.UpdateStatusByIDAt(ctx, req.UserID, req.MemoryID, model.MemoryItemStatusDeleted, now); err != nil { + if err = itemRepo.SoftDeleteByID(ctx, req.UserID, req.MemoryID); err != nil { return err } @@ -135,16 +290,87 @@ func (s *ManageService) DeleteItem(ctx context.Context, req memorymodel.DeleteIt return nil }) if err != nil { + err = translateManageError(err) + s.recordManageAction(ctx, "delete", req.UserID, req.MemoryID, "", false, err) return nil, err } if deletedItem.ID <= 0 { return nil, nil } + if deletedItem.Status == model.MemoryItemStatusDeleted { + s.vectorSyncer.Delete(ctx, "", []int64{deletedItem.ID}) + } + s.recordManageAction(ctx, "delete", req.UserID, deletedItem.ID, deletedItem.MemoryType, true, nil) result := toItemDTO(deletedItem) return &result, nil } +// RestoreItem 把 archived/deleted 记忆恢复为 active,并补审计与向量同步桥接。 +func (s *ManageService) RestoreItem(ctx context.Context, req model.MemoryRestoreItemRequest) (*memorymodel.ItemDTO, error) { + if s == nil || s.db == nil || s.itemRepo == nil || s.auditRepo == nil { + return nil, errors.New("memory manage service is not initialized") + } + if req.UserID <= 0 { + return nil, respond.WrongUserID + } + if req.MemoryID <= 0 { + return nil, respond.WrongParamType + } + + var restoredItem model.MemoryItem + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + itemRepo := s.itemRepo.WithTx(tx) + auditRepo := s.auditRepo.WithTx(tx) + + current, getErr := itemRepo.GetByIDForUser(ctx, req.UserID, req.MemoryID) + if getErr != nil { + return getErr + } + if current.Status == model.MemoryItemStatusActive { + restoredItem = *current + return nil + } + + now := time.Now() + before := *current + after := before + after.Status = model.MemoryItemStatusActive + after.UpdatedAt = &now + after.VectorStatus = "pending" + + if restoreErr := itemRepo.RestoreByIDAt(ctx, req.UserID, req.MemoryID, now); restoreErr != nil { + return restoreErr + } + + audit := memoryutils.BuildItemAuditLog( + before.ID, + before.UserID, + memoryutils.AuditOperationRestore, + memoryutils.NormalizeOperatorType(req.OperatorType), + normalizeManageReason(req.Reason, "用户恢复记忆"), + &before, + &after, + ) + if auditErr := auditRepo.Create(ctx, audit); auditErr != nil { + return auditErr + } + + restoredItem = after + return nil + }) + if err != nil { + err = translateManageError(err) + s.recordManageAction(ctx, "restore", req.UserID, req.MemoryID, "", false, err) + return nil, err + } + + s.vectorSyncer.Upsert(ctx, "", []model.MemoryItem{restoredItem}) + s.recordManageAction(ctx, "restore", req.UserID, restoredItem.ID, restoredItem.MemoryType, true, nil) + dto := toItemDTO(restoredItem) + return &dto, nil +} + // GetUserSetting 返回用户当前生效的记忆开关。 // // 返回语义: @@ -201,3 +427,233 @@ func normalizeDeleteReason(reason string) string { } return reason } + +func normalizeManageReason(reason string, fallback string) string { + reason = strings.TrimSpace(reason) + if reason == "" { + return fallback + } + return reason +} + +func translateManageError(err error) error { + switch { + case err == nil: + return nil + case errors.Is(err, gorm.ErrRecordNotFound): + return respond.MemoryItemNotFound + default: + return err + } +} + +func buildCreateItemFields(req model.MemoryCreateItemRequest) (memorymodel.CreateItemFields, error) { + memoryType, err := normalizeManagedMemoryType(req.MemoryType) + if err != nil { + return memorymodel.CreateItemFields{}, err + } + content, normalizedContent, err := normalizeManagedContent(req.Content) + if err != nil { + return memorymodel.CreateItemFields{}, err + } + + title := normalizeManagedTitle(req.Title, content) + return memorymodel.CreateItemFields{ + UserID: req.UserID, + ConversationID: strings.TrimSpace(req.ConversationID), + AssistantID: strings.TrimSpace(req.AssistantID), + RunID: strings.TrimSpace(req.RunID), + MemoryType: memoryType, + Title: title, + Content: content, + NormalizedContent: normalizedContent, + ContentHash: memoryutils.HashContent(memoryType, normalizedContent), + Confidence: normalizeManageScore(req.Confidence, defaultManualConfidence), + Importance: normalizeManageScore(req.Importance, defaultManualImportance), + SensitivityLevel: normalizeManageSensitivity(req.SensitivityLevel, 0), + IsExplicit: normalizeManageBool(req.IsExplicit, true), + Status: model.MemoryItemStatusActive, + TTLAt: req.TTLAt, + VectorStatus: "pending", + }, nil +} + +func buildUpdateItemFields( + req model.MemoryUpdateItemRequest, + current model.MemoryItem, +) (memorymodel.UpdateItemFields, model.MemoryItem, error) { + memoryType := current.MemoryType + if req.MemoryType != nil { + normalizedType, err := normalizeManagedMemoryType(*req.MemoryType) + if err != nil { + return memorymodel.UpdateItemFields{}, model.MemoryItem{}, err + } + memoryType = normalizedType + } + + content := current.Content + if req.Content != nil { + normalizedContentValue, _, err := normalizeManagedContent(*req.Content) + if err != nil { + return memorymodel.UpdateItemFields{}, model.MemoryItem{}, err + } + content = normalizedContentValue + } + normalizedContent := normalizeContentForHash(content) + if normalizedContent == "" { + return memorymodel.UpdateItemFields{}, model.MemoryItem{}, respond.MemoryInvalidContent + } + + title := current.Title + if req.Title != nil { + title = normalizeManagedTitle(*req.Title, content) + } + ttlAt := current.TTLAt + if req.ClearTTL { + ttlAt = nil + } else if req.TTLAt != nil { + ttlAt = req.TTLAt + } + + fields := memorymodel.UpdateItemFields{ + MemoryType: memoryType, + Title: title, + Content: content, + NormalizedContent: normalizedContent, + ContentHash: memoryutils.HashContent(memoryType, normalizedContent), + Confidence: normalizeManageScore(req.Confidence, current.Confidence), + Importance: normalizeManageScore(req.Importance, current.Importance), + SensitivityLevel: normalizeManageSensitivity(req.SensitivityLevel, current.SensitivityLevel), + IsExplicit: normalizeManageBool(req.IsExplicit, current.IsExplicit), + TTLAt: ttlAt, + } + + after := current + after.MemoryType = fields.MemoryType + after.Title = fields.Title + after.Content = fields.Content + after.NormalizedContent = strPtr(fields.NormalizedContent) + after.ContentHash = strPtr(fields.ContentHash) + after.Confidence = fields.Confidence + after.Importance = fields.Importance + after.SensitivityLevel = fields.SensitivityLevel + after.IsExplicit = fields.IsExplicit + after.TTLAt = fields.TTLAt + return fields, after, nil +} + +func normalizeManagedMemoryType(raw string) (string, error) { + normalized := memorymodel.NormalizeMemoryType(raw) + if normalized == "" { + return "", respond.MemoryInvalidType + } + return normalized, nil +} + +func normalizeManagedContent(raw string) (string, string, error) { + content := strings.TrimSpace(raw) + if content == "" { + return "", "", respond.MemoryInvalidContent + } + normalized := normalizeContentForHash(content) + if normalized == "" { + return "", "", respond.MemoryInvalidContent + } + return content, normalized, nil +} + +func normalizeManagedTitle(raw string, content string) string { + title := strings.TrimSpace(raw) + if title != "" { + return title + } + content = strings.TrimSpace(content) + if content == "" { + return "未命名记忆" + } + runes := []rune(content) + if len(runes) > 24 { + return string(runes[:24]) + } + return content +} + +func normalizeManageScore(value *float64, defaultValue float64) float64 { + if value == nil { + return clamp01(defaultValue) + } + return clamp01(*value) +} + +func normalizeManageSensitivity(value *int, defaultValue int) int { + if value == nil { + return defaultValue + } + if *value < 0 { + return defaultValue + } + return *value +} + +func normalizeManageBool(value *bool, defaultValue bool) bool { + if value == nil { + return defaultValue + } + return *value +} + +func resolveUpdateMemoryType(req model.MemoryUpdateItemRequest) string { + if req.MemoryType == nil { + return "" + } + return strings.TrimSpace(*req.MemoryType) +} + +func strPtr(value string) *string { + value = strings.TrimSpace(value) + if value == "" { + return nil + } + result := value + return &result +} + +func (s *ManageService) recordManageAction( + ctx context.Context, + operation string, + userID int, + memoryID int64, + memoryType string, + success bool, + err error, +) { + if s == nil { + return + } + + status := "success" + level := memoryobserve.LevelInfo + if !success || err != nil { + status = "error" + level = memoryobserve.LevelWarn + } + + s.metrics.AddCounter(memoryobserve.MetricManageTotal, 1, map[string]string{ + "operation": strings.TrimSpace(operation), + "status": status, + }) + s.observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentManage, + Operation: memoryobserve.OperationManage, + Fields: map[string]any{ + "user_id": userID, + "memory_id": memoryID, + "action": strings.TrimSpace(operation), + "memory_type": strings.TrimSpace(memoryType), + "success": success && err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) +} diff --git a/backend/memory/service/read_service.go b/backend/memory/service/read_service.go index cdce5ad..16d358f 100644 --- a/backend/memory/service/read_service.go +++ b/backend/memory/service/read_service.go @@ -10,6 +10,7 @@ import ( infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" "github.com/LoveLosita/smartflow/backend/model" @@ -31,6 +32,26 @@ type ReadService struct { settingsRepo *memoryrepo.SettingsRepo ragRuntime infrarag.Runtime cfg memorymodel.Config + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder +} + +type retrieveTelemetry struct { + ReadMode string + QueryLen int + LegacyHitCount int + PinnedHitCount int + SemanticHitCount int + DedupDropCount int + FinalCount int + Degraded bool + RAGFallbackUsed bool +} + +type semanticRetrieveTelemetry struct { + HitCount int + Degraded bool + RAGFallbackUsed bool } func NewReadService( @@ -38,12 +59,22 @@ func NewReadService( settingsRepo *memoryrepo.SettingsRepo, ragRuntime infrarag.Runtime, cfg memorymodel.Config, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, ) *ReadService { + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } return &ReadService{ itemRepo: itemRepo, settingsRepo: settingsRepo, ragRuntime: ragRuntime, cfg: cfg, + observer: observer, + metrics: metrics, } } @@ -60,9 +91,14 @@ func (s *ReadService) Retrieve(ctx context.Context, req memorymodel.RetrieveRequ if now.IsZero() { now = time.Now() } + telemetry := retrieveTelemetry{ + ReadMode: s.cfg.EffectiveReadMode(), + QueryLen: len(strings.TrimSpace(req.Query)), + } setting, err := s.settingsRepo.GetByUserID(ctx, req.UserID) if err != nil { + s.recordRetrieve(ctx, req, telemetry, err) return nil, err } effectiveSetting := memoryutils.EffectiveUserSetting(setting, req.UserID) @@ -72,16 +108,29 @@ func (s *ReadService) Retrieve(ctx context.Context, req memorymodel.RetrieveRequ limit := normalizeLimit(req.Limit, defaultRetrieveLimit, maxRetrieveLimit) if s.cfg.EffectiveReadMode() == memorymodel.MemoryReadModeHybrid { - return s.HybridRetrieve(ctx, req, effectiveSetting, limit, now) + items, hybridTelemetry, hybridErr := s.HybridRetrieve(ctx, req, effectiveSetting, limit, now) + hybridTelemetry.ReadMode = memorymodel.MemoryReadModeHybrid + hybridTelemetry.QueryLen = telemetry.QueryLen + s.recordRetrieve(ctx, req, hybridTelemetry, hybridErr) + return items, hybridErr } if s.cfg.RAGEnabled && s.ragRuntime != nil && strings.TrimSpace(req.Query) != "" { items, ragErr := s.retrieveByRAG(ctx, req, effectiveSetting, limit, now) if ragErr == nil && len(items) > 0 { + telemetry.SemanticHitCount = len(items) + telemetry.FinalCount = len(items) + s.recordRetrieve(ctx, req, telemetry, nil) return items, nil } + telemetry.Degraded = true + telemetry.RAGFallbackUsed = true } - return s.retrieveByLegacy(ctx, req, limit, now, effectiveSetting) + items, legacyErr := s.retrieveByLegacy(ctx, req, limit, now, effectiveSetting) + telemetry.LegacyHitCount = len(items) + telemetry.FinalCount = len(items) + s.recordRetrieve(ctx, req, telemetry, legacyErr) + return items, legacyErr } func (s *ReadService) retrieveByLegacy( @@ -180,6 +229,58 @@ func normalizeRetrieveMemoryTypes(raw []string) []string { } } +func (s *ReadService) recordRetrieve( + ctx context.Context, + req memorymodel.RetrieveRequest, + telemetry retrieveTelemetry, + err error, +) { + if s == nil { + return + } + + level := memoryobserve.LevelInfo + if err != nil { + level = memoryobserve.LevelWarn + } + s.observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentRead, + Operation: memoryobserve.OperationRetrieve, + Fields: map[string]any{ + "user_id": req.UserID, + "read_mode": telemetry.ReadMode, + "query_len": telemetry.QueryLen, + "legacy_hit_count": telemetry.LegacyHitCount, + "pinned_hit_count": telemetry.PinnedHitCount, + "semantic_hit_count": telemetry.SemanticHitCount, + "dedup_drop_count": telemetry.DedupDropCount, + "final_count": telemetry.FinalCount, + "degraded": telemetry.Degraded, + "rag_fallback_used": telemetry.RAGFallbackUsed, + "success": err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + + if telemetry.FinalCount > 0 { + s.metrics.AddCounter(memoryobserve.MetricRetrieveHitTotal, int64(telemetry.FinalCount), map[string]string{ + "read_mode": strings.TrimSpace(telemetry.ReadMode), + }) + } + if telemetry.DedupDropCount > 0 { + s.metrics.AddCounter(memoryobserve.MetricRetrieveDedupDropTotal, int64(telemetry.DedupDropCount), map[string]string{ + "read_mode": strings.TrimSpace(telemetry.ReadMode), + }) + } + if telemetry.RAGFallbackUsed { + s.metrics.AddCounter(memoryobserve.MetricRAGFallbackTotal, 1, map[string]string{ + "read_mode": strings.TrimSpace(telemetry.ReadMode), + }) + } +} + // scoreRetrievedItem 计算 legacy 读链路的确定性排序分数。 // // 说明: diff --git a/backend/memory/service/retrieve_merge.go b/backend/memory/service/retrieve_merge.go index 4cd53c5..6e7f550 100644 --- a/backend/memory/service/retrieve_merge.go +++ b/backend/memory/service/retrieve_merge.go @@ -23,41 +23,50 @@ func (s *ReadService) HybridRetrieve( effectiveSetting model.MemoryUserSetting, limit int, now time.Time, -) ([]memorymodel.ItemDTO, error) { +) ([]memorymodel.ItemDTO, retrieveTelemetry, error) { + telemetry := retrieveTelemetry{} if s == nil || s.itemRepo == nil { - return nil, nil + return nil, telemetry, nil } if !effectiveSetting.MemoryEnabled { - return nil, nil + return nil, telemetry, nil } pinnedItems, err := s.retrievePinnedCandidates(ctx, req, effectiveSetting, now) if err != nil { - return nil, err + return nil, telemetry, err } - semanticItems, err := s.retrieveSemanticCandidates(ctx, req, effectiveSetting, limit, now) + telemetry.PinnedHitCount = len(pinnedItems) + + semanticItems, semanticTelemetry, err := s.retrieveSemanticCandidates(ctx, req, effectiveSetting, limit, now) if err != nil { - return nil, err + return nil, telemetry, err } + telemetry.SemanticHitCount = len(semanticItems) + telemetry.Degraded = semanticTelemetry.Degraded + telemetry.RAGFallbackUsed = semanticTelemetry.RAGFallbackUsed merged := make([]memorymodel.ItemDTO, 0, len(pinnedItems)+len(semanticItems)) merged = append(merged, pinnedItems...) merged = append(merged, semanticItems...) if len(merged) == 0 { - return nil, nil + return nil, telemetry, nil } + beforeDedupCount := len(merged) merged = dedupByID(merged) merged = dedupByHash(merged) merged = dedupByText(merged) + telemetry.DedupDropCount = beforeDedupCount - len(merged) merged = RankItems(merged, now) merged = applyTypeBudget(merged, s.cfg) if len(merged) == 0 { - return nil, nil + return nil, telemetry, nil } + telemetry.FinalCount = len(merged) _ = s.itemRepo.TouchLastAccessAt(ctx, collectItemDTOIDs(merged), now) - return merged, nil + return merged, telemetry, nil } func (s *ReadService) retrievePinnedCandidates( @@ -81,20 +90,26 @@ func (s *ReadService) retrieveSemanticCandidates( effectiveSetting model.MemoryUserSetting, limit int, now time.Time, -) ([]memorymodel.ItemDTO, error) { +) ([]memorymodel.ItemDTO, semanticRetrieveTelemetry, error) { + telemetry := semanticRetrieveTelemetry{} queryText := strings.TrimSpace(req.Query) if queryText == "" { - return nil, nil + return nil, telemetry, nil } candidateLimit := hybridSemanticTopK(s.cfg, limit) if s.cfg.RAGEnabled && s.ragRuntime != nil { items, err := s.retrieveSemanticCandidatesByRAG(ctx, req, effectiveSetting, candidateLimit, now) if shouldReturnSemanticRAGResult(items, err) { - return items, nil + telemetry.HitCount = len(items) + return items, telemetry, nil } + telemetry.Degraded = true + telemetry.RAGFallbackUsed = true } - return s.retrieveSemanticCandidatesByMySQL(ctx, req, effectiveSetting, candidateLimit, now) + items, err := s.retrieveSemanticCandidatesByMySQL(ctx, req, effectiveSetting, candidateLimit, now) + telemetry.HitCount = len(items) + return items, telemetry, err } func (s *ReadService) retrieveSemanticCandidatesByRAG( diff --git a/backend/memory/utils/audit.go b/backend/memory/utils/audit.go index a711c05..2f1cdd1 100644 --- a/backend/memory/utils/audit.go +++ b/backend/memory/utils/audit.go @@ -12,8 +12,12 @@ const ( AuditOperationCreate = "create" // AuditOperationUpdate 表示决策层更新已有记忆的内容。 AuditOperationUpdate = "update" + // AuditOperationArchive 表示治理层把重复记忆归档。 + AuditOperationArchive = "archive" // AuditOperationDelete 表示对已有记忆做软删除。 AuditOperationDelete = "delete" + // AuditOperationRestore 表示把已删除/归档记忆恢复为 active。 + AuditOperationRestore = "restore" ) // BuildItemAuditLog 构造记忆变更审计日志。 diff --git a/backend/memory/vectorsync/syncer.go b/backend/memory/vectorsync/syncer.go new file mode 100644 index 0000000..29bce7d --- /dev/null +++ b/backend/memory/vectorsync/syncer.go @@ -0,0 +1,213 @@ +package vectorsync + +import ( + "context" + "fmt" + "log" + "strings" + + infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" + memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" + "github.com/LoveLosita/smartflow/backend/model" +) + +// Syncer 负责 memory_items 与向量库之间的最小桥接。 +// +// 职责边界: +// 1. 只负责“把已经落库的记忆同步到 RAG / 从 RAG 删除”; +// 2. 不负责决定哪些记忆该写、该删、该恢复,这些决策仍由上游 service/worker/cleanup 控制; +// 3. 同步失败时只回写 vector_status 并打观测,不反向回滚业务事务,避免把在线链路拖成强依赖。 +type Syncer struct { + ragRuntime infrarag.Runtime + itemRepo *memoryrepo.ItemRepo + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder + logger *log.Logger +} + +func NewSyncer( + ragRuntime infrarag.Runtime, + itemRepo *memoryrepo.ItemRepo, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, +) *Syncer { + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } + return &Syncer{ + ragRuntime: ragRuntime, + itemRepo: itemRepo, + observer: observer, + metrics: metrics, + logger: log.Default(), + } +} + +// Upsert 把新增/修改/恢复后的记忆同步到向量库。 +func (s *Syncer) Upsert(ctx context.Context, traceID string, items []model.MemoryItem) { + if s == nil || s.ragRuntime == nil || s.itemRepo == nil || len(items) == 0 { + return + } + + requestItems := make([]infrarag.MemoryIngestItem, 0, len(items)) + for _, item := range items { + requestItems = append(requestItems, infrarag.MemoryIngestItem{ + MemoryID: item.ID, + UserID: item.UserID, + ConversationID: strValue(item.ConversationID), + AssistantID: strValue(item.AssistantID), + RunID: strValue(item.RunID), + MemoryType: item.MemoryType, + Title: item.Title, + Content: item.Content, + Confidence: item.Confidence, + Importance: item.Importance, + SensitivityLevel: item.SensitivityLevel, + IsExplicit: item.IsExplicit, + Status: item.Status, + TTLAt: item.TTLAt, + CreatedAt: item.CreatedAt, + }) + } + + result, err := s.ragRuntime.IngestMemory(memoryobserve.WithFields(ctx, map[string]any{ + "trace_id": traceID, + }), infrarag.MemoryIngestRequest{ + TraceID: traceID, + Action: "add", + Items: requestItems, + }) + if err != nil { + s.observer.Observe(ctx, memoryobserve.Event{ + Level: memoryobserve.LevelWarn, + Component: memoryobserve.ComponentWrite, + Operation: "vector_upsert", + Fields: map[string]any{ + "trace_id": traceID, + "item_count": len(items), + "success": false, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + for _, item := range items { + _ = s.itemRepo.UpdateVectorStateByID(ctx, item.ID, "failed", nil) + } + return + } + + vectorIDMap := make(map[int64]string, len(result.DocumentIDs)) + for _, documentID := range result.DocumentIDs { + memoryID := parseMemoryID(documentID) + if memoryID <= 0 { + continue + } + vectorIDMap[memoryID] = documentID + } + + for _, item := range items { + vectorID := strPtrOrNil(vectorIDMap[item.ID]) + _ = s.itemRepo.UpdateVectorStateByID(ctx, item.ID, "synced", vectorID) + } + s.observer.Observe(ctx, memoryobserve.Event{ + Level: memoryobserve.LevelInfo, + Component: memoryobserve.ComponentWrite, + Operation: "vector_upsert", + Fields: map[string]any{ + "trace_id": traceID, + "item_count": len(items), + "document_count": len(result.DocumentIDs), + "success": true, + }, + }) +} + +// Delete 把一批记忆对应的向量从向量库中删除。 +func (s *Syncer) Delete(ctx context.Context, traceID string, memoryIDs []int64) { + if s == nil || len(memoryIDs) == 0 { + return + } + if s.ragRuntime == nil || s.itemRepo == nil { + return + } + + documentIDs := make([]string, 0, len(memoryIDs)) + for _, id := range memoryIDs { + documentIDs = append(documentIDs, fmt.Sprintf("memory:%d", id)) + } + + err := s.ragRuntime.DeleteMemory(memoryobserve.WithFields(ctx, map[string]any{ + "trace_id": traceID, + }), documentIDs) + if err != nil { + s.observer.Observe(ctx, memoryobserve.Event{ + Level: memoryobserve.LevelWarn, + Component: memoryobserve.ComponentWrite, + Operation: "vector_delete", + Fields: map[string]any{ + "trace_id": traceID, + "item_count": len(memoryIDs), + "success": false, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + for _, memoryID := range memoryIDs { + _ = s.itemRepo.UpdateVectorStateByID(ctx, memoryID, "failed", nil) + } + return + } + + for _, memoryID := range memoryIDs { + _ = s.itemRepo.UpdateVectorStateByID(ctx, memoryID, "deleted", nil) + } + s.observer.Observe(ctx, memoryobserve.Event{ + Level: memoryobserve.LevelInfo, + Component: memoryobserve.ComponentWrite, + Operation: "vector_delete", + Fields: map[string]any{ + "trace_id": traceID, + "item_count": len(memoryIDs), + "success": true, + }, + }) +} + +func parseMemoryID(documentID string) int64 { + documentID = strings.TrimSpace(documentID) + if !strings.HasPrefix(documentID, "memory:") { + return 0 + } + raw := strings.TrimPrefix(documentID, "memory:") + if strings.HasPrefix(raw, "uid:") { + return 0 + } + var value int64 + for _, ch := range raw { + if ch < '0' || ch > '9' { + return 0 + } + value = value*10 + int64(ch-'0') + } + return value +} + +func strPtrOrNil(v string) *string { + v = strings.TrimSpace(v) + if v == "" { + return nil + } + value := v + return &value +} + +func strValue(v *string) string { + if v == nil { + return "" + } + return strings.TrimSpace(*v) +} diff --git a/backend/memory/worker/decision_flow.go b/backend/memory/worker/decision_flow.go index c9e9b06..9be964a 100644 --- a/backend/memory/worker/decision_flow.go +++ b/backend/memory/worker/decision_flow.go @@ -33,6 +33,11 @@ type factDecisionResult struct { Outcomes []*ApplyActionOutcome } +type candidateRecallResult struct { + Items []memorymodel.CandidateSnapshot + FallbackMode string +} + // executeDecisionFlow 在 worker 内编排"召回→逐对比对→汇总→执行"全流程。 // // 职责边界: @@ -116,6 +121,7 @@ func (r *Runner) executeDecisionForFact( } } if len(existing) > 0 { + r.recordDecisionObservation(ctx, job, payload, fact, 0, memorymodel.DecisionActionNone, "hash_exact", true, nil) result.Outcomes = append(result.Outcomes, &ApplyActionOutcome{ Action: memorymodel.DecisionActionNone, NeedsSync: false, @@ -124,7 +130,8 @@ func (r *Runner) executeDecisionForFact( } // Step 2: Milvus 语义召回(含降级)。 - candidates := r.recallCandidates(ctx, payload, fact) + recallResult := r.recallCandidates(ctx, payload, fact) + candidates := recallResult.Items // 打印召回候选详情,便于排查向量召回和阈值过滤效果。 if r.logger != nil { @@ -151,9 +158,11 @@ func (r *Runner) executeDecisionForFact( // Step 5: 校验 + 执行。 actionOutcome, err := ApplyFinalDecision(ctx, itemRepo, auditRepo, *decision, fact, job, payload) if err != nil { + r.recordDecisionObservation(ctx, job, payload, fact, len(candidates), decision.Action, recallResult.FallbackMode, false, err) return nil, fmt.Errorf("执行决策动作失败: %w", err) } result.Outcomes = append(result.Outcomes, actionOutcome) + r.recordDecisionObservation(ctx, job, payload, fact, len(candidates), decision.Action, recallResult.FallbackMode, true, nil) // Step 6: conflict (DELETE) 后需要补一个 ADD 写入新 fact。 // 原因:旧记忆矛盾需删除,但新事实本身仍然有效,必须写入。 @@ -180,7 +189,7 @@ func (r *Runner) recallCandidates( ctx context.Context, payload memorymodel.ExtractJobPayload, fact memorymodel.NormalizedFact, -) []memorymodel.CandidateSnapshot { +) candidateRecallResult { // 1. 优先使用 Milvus 向量语义召回。 if r.ragRuntime != nil { retrieveResult, err := r.ragRuntime.RetrieveMemory(ctx, infrarag.MemoryRetrieveRequest{ @@ -194,7 +203,10 @@ func (r *Runner) recallCandidates( if err == nil && len(retrieveResult.Items) > 0 { candidates := r.buildCandidatesFromRAG(retrieveResult.Items) if len(candidates) > 0 { - return candidates + return candidateRecallResult{ + Items: candidates, + FallbackMode: "rag", + } } // RAG 返回了结果但 DocumentID 全部解析失败,降级到 MySQL。 if r.logger != nil { @@ -204,10 +216,17 @@ func (r *Runner) recallCandidates( if err != nil && r.logger != nil { r.logger.Printf("[WARN][去重] Milvus 语义召回失败,降级到 MySQL: user_id=%d memory_type=%s topk=%d err=%v", payload.UserID, fact.MemoryType, r.cfg.DecisionCandidateTopK, err) } + return candidateRecallResult{ + Items: r.recallCandidatesFromMySQL(ctx, payload, fact), + FallbackMode: "rag_to_mysql", + } } // 2. 降级:按 user_id + memory_type + status=active 查最近 N 条。 - return r.recallCandidatesFromMySQL(ctx, payload, fact) + return candidateRecallResult{ + Items: r.recallCandidatesFromMySQL(ctx, payload, fact), + FallbackMode: "mysql_only", + } } // buildCandidatesFromRAG 从 RAG 检索结果构建候选快照列表。 diff --git a/backend/memory/worker/runner.go b/backend/memory/worker/runner.go index 46b8a23..3f29257 100644 --- a/backend/memory/worker/runner.go +++ b/backend/memory/worker/runner.go @@ -6,15 +6,16 @@ import ( "errors" "fmt" "log" - "strconv" "strings" "time" infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryorchestrator "github.com/LoveLosita/smartflow/backend/memory/orchestrator" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" + memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" "github.com/LoveLosita/smartflow/backend/model" "gorm.io/gorm" ) @@ -42,6 +43,9 @@ type Runner struct { extractor Extractor ragRuntime infrarag.Runtime logger *log.Logger + vectorSyncer *memoryvectorsync.Syncer + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder // 决策层依赖。 // 说明: @@ -62,7 +66,16 @@ func NewRunner( ragRuntime infrarag.Runtime, cfg memorymodel.Config, decisionOrchestrator *memoryorchestrator.LLMDecisionOrchestrator, + vectorSyncer *memoryvectorsync.Syncer, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, ) *Runner { + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } return &Runner{ db: db, jobRepo: jobRepo, @@ -72,6 +85,9 @@ func NewRunner( extractor: extractor, ragRuntime: ragRuntime, logger: log.Default(), + vectorSyncer: vectorSyncer, + observer: observer, + metrics: metrics, cfg: cfg, decisionOrchestrator: decisionOrchestrator, } @@ -96,6 +112,11 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { if job == nil { return &RunOnceResult{Claimed: false}, nil } + if job.RetryCount > 0 { + r.metrics.AddCounter(memoryobserve.MetricJobRetryTotal, 1, map[string]string{ + "job_type": strings.TrimSpace(job.JobType), + }) + } result := &RunOnceResult{ Claimed: true, @@ -110,21 +131,25 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { failReason := fmt.Sprintf("解析任务载荷失败: %v", err) _ = r.jobRepo.MarkFailed(ctx, job.ID, failReason) result.Status = model.MemoryJobStatusFailed + r.recordJobOutcome(ctx, job, nil, result.Status, false, err) return result, nil } // 3. 先读取用户记忆设置。总开关关闭时,任务直接成功结束,不再继续抽取和落库。 setting, err := r.settingsRepo.GetByUserID(ctx, payload.UserID) if err != nil { + r.recordJobOutcome(ctx, job, &payload, model.MemoryJobStatusFailed, false, err) return nil, err } effectiveSetting := memoryutils.EffectiveUserSetting(setting, payload.UserID) if !effectiveSetting.MemoryEnabled { if err = r.jobRepo.MarkSuccess(ctx, job.ID); err != nil { + r.recordJobOutcome(ctx, job, &payload, model.MemoryJobStatusFailed, false, err) return nil, err } result.Status = model.MemoryJobStatusSuccess r.logger.Printf("memory worker skipped by user setting: job_id=%d user_id=%d", job.ID, payload.UserID) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } @@ -134,26 +159,31 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { failReason := fmt.Sprintf("抽取执行失败: %v", extractErr) _ = r.jobRepo.MarkFailed(ctx, job.ID, failReason) result.Status = model.MemoryJobStatusFailed + r.recordJobOutcome(ctx, job, &payload, result.Status, false, extractErr) return result, nil } facts = memoryutils.FilterFactsBySetting(facts, effectiveSetting) if len(facts) == 0 { if err = r.jobRepo.MarkSuccess(ctx, job.ID); err != nil { + r.recordJobOutcome(ctx, job, &payload, model.MemoryJobStatusFailed, false, err) return nil, err } result.Status = model.MemoryJobStatusSuccess r.logger.Printf("memory worker run once noop: job_id=%d", job.ID) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } items := buildMemoryItems(job, payload, facts) if len(items) == 0 { if err = r.jobRepo.MarkSuccess(ctx, job.ID); err != nil { + r.recordJobOutcome(ctx, job, &payload, model.MemoryJobStatusFailed, false, err) return nil, err } result.Status = model.MemoryJobStatusSuccess r.logger.Printf("memory worker run once empty-after-normalize: job_id=%d", job.ID) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } @@ -169,16 +199,19 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { failReason := fmt.Sprintf("决策降级后记忆落库失败: %v", err) _ = r.jobRepo.MarkFailed(ctx, job.ID, failReason) result.Status = model.MemoryJobStatusFailed + r.recordJobOutcome(ctx, job, &payload, result.Status, false, err) return result, nil } result.Status = model.MemoryJobStatusSuccess result.Facts = len(items) r.syncMemoryVectors(ctx, items) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } // FallbackMode=drop:丢弃本轮抽取结果,直接标记 job 成功。 _ = r.jobRepo.MarkSuccess(ctx, job.ID) result.Status = model.MemoryJobStatusSuccess + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } @@ -189,6 +222,7 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { r.syncVectorDeletes(ctx, outcome.VectorDeletes) r.logger.Printf("[去重] 决策流程完成: job_id=%d user_id=%d 新增=%d 更新=%d 删除=%d 跳过=%d", job.ID, payload.UserID, outcome.AddCount, outcome.UpdateCount, outcome.DeleteCount, outcome.NoneCount) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } @@ -197,6 +231,7 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { failReason := fmt.Sprintf("记忆落库失败: %v", err) _ = r.jobRepo.MarkFailed(ctx, job.ID, failReason) result.Status = model.MemoryJobStatusFailed + r.recordJobOutcome(ctx, job, &payload, result.Status, false, err) return result, nil } @@ -204,6 +239,7 @@ func (r *Runner) RunOnce(ctx context.Context) (*RunOnceResult, error) { result.Facts = len(items) r.syncMemoryVectors(ctx, items) r.logger.Printf("memory worker run once success: job_id=%d extracted_facts=%d", job.ID, len(items)) + r.recordJobOutcome(ctx, job, &payload, result.Status, true, nil) return result, nil } @@ -268,56 +304,10 @@ func buildMemoryItems(job *model.MemoryJob, payload memorymodel.ExtractJobPayloa } func (r *Runner) syncMemoryVectors(ctx context.Context, items []model.MemoryItem) { - if r == nil || r.ragRuntime == nil || r.itemRepo == nil || len(items) == 0 { + if r == nil || r.vectorSyncer == nil || len(items) == 0 { return } - - requestItems := make([]infrarag.MemoryIngestItem, 0, len(items)) - for _, item := range items { - requestItems = append(requestItems, infrarag.MemoryIngestItem{ - MemoryID: item.ID, - UserID: item.UserID, - ConversationID: strValue(item.ConversationID), - AssistantID: strValue(item.AssistantID), - RunID: strValue(item.RunID), - MemoryType: item.MemoryType, - Title: item.Title, - Content: item.Content, - Confidence: item.Confidence, - Importance: item.Importance, - SensitivityLevel: item.SensitivityLevel, - IsExplicit: item.IsExplicit, - Status: item.Status, - TTLAt: item.TTLAt, - CreatedAt: item.CreatedAt, - }) - } - - result, err := r.ragRuntime.IngestMemory(ctx, infrarag.MemoryIngestRequest{ - Action: "add", - Items: requestItems, - }) - if err != nil { - r.logger.Printf("[WARN][去重] 记忆向量同步失败: count=%d err=%v", len(items), err) - for _, item := range items { - _ = r.itemRepo.UpdateVectorStateByID(ctx, item.ID, "failed", nil) - } - return - } - - vectorIDMap := make(map[int64]string, len(result.DocumentIDs)) - for _, documentID := range result.DocumentIDs { - memoryID := parseMemoryID(documentID) - if memoryID <= 0 { - continue - } - vectorIDMap[memoryID] = documentID - } - - for _, item := range items { - vectorID := strPtrOrNil(vectorIDMap[item.ID]) - _ = r.itemRepo.UpdateVectorStateByID(ctx, item.ID, "synced", vectorID) - } + r.vectorSyncer.Upsert(ctx, "", items) } // syncVectorDeletes 处理决策层 DELETE 动作产出的向量清理需求。 @@ -327,33 +317,10 @@ func (r *Runner) syncMemoryVectors(ctx context.Context, items []model.MemoryItem // 2. 调 Runtime.DeleteMemory 真正从 Milvus 删除对应向量; // 3. 更新 MySQL vector_status 标记删除结果。 func (r *Runner) syncVectorDeletes(ctx context.Context, memoryIDs []int64) { - if r == nil || len(memoryIDs) == 0 { + if r == nil || r.vectorSyncer == nil || len(memoryIDs) == 0 { return } - - // 1. 构造 documentID 列表。 - documentIDs := make([]string, 0, len(memoryIDs)) - for _, id := range memoryIDs { - documentIDs = append(documentIDs, fmt.Sprintf("memory:%d", id)) - } - - // 2. 调 Runtime 删除向量。 - if r.ragRuntime != nil { - if err := r.ragRuntime.DeleteMemory(ctx, documentIDs); err != nil { - r.logger.Printf("[WARN][去重] Milvus 向量删除失败,标记为 pending 等待后续清理: count=%d ids=%v err=%v", len(memoryIDs), memoryIDs, err) - } else { - r.logger.Printf("[去重] Milvus 向量删除完成: count=%d ids=%v", len(memoryIDs), memoryIDs) - } - } - - // 3. 更新 MySQL vector_status。 - for _, memoryID := range memoryIDs { - if updateErr := r.itemRepo.UpdateVectorStateByID(ctx, memoryID, "deleted", nil); updateErr != nil { - if r.logger != nil { - r.logger.Printf("[WARN] 向量状态更新失败: memory_id=%d err=%v", memoryID, updateErr) - } - } - } + r.vectorSyncer.Delete(ctx, "", memoryIDs) } func resolveMemoryTTLAt(base time.Time, memoryType string) *time.Time { @@ -395,11 +362,106 @@ func int64PtrOrNil(v int64) *int64 { return &value } -func strValue(v *string) string { - if v == nil { - return "" +func (r *Runner) recordJobOutcome( + ctx context.Context, + job *model.MemoryJob, + payload *memorymodel.ExtractJobPayload, + status string, + success bool, + err error, +) { + if r == nil { + return } - return strings.TrimSpace(*v) + + level := memoryobserve.LevelInfo + if !success || err != nil { + level = memoryobserve.LevelWarn + } + fields := map[string]any{ + "job_id": jobIDValue(job), + "status": strings.TrimSpace(status), + "success": success && err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + } + if payload != nil { + fields["trace_id"] = strings.TrimSpace(payload.TraceID) + fields["user_id"] = payload.UserID + fields["conversation_id"] = strings.TrimSpace(payload.ConversationID) + } + + r.observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentWrite, + Operation: "job", + Fields: fields, + }) + r.metrics.AddCounter(memoryobserve.MetricJobTotal, 1, map[string]string{ + "status": strings.TrimSpace(status), + }) +} + +func (r *Runner) recordDecisionObservation( + ctx context.Context, + job *model.MemoryJob, + payload memorymodel.ExtractJobPayload, + fact memorymodel.NormalizedFact, + candidateCount int, + finalAction string, + fallbackMode string, + success bool, + err error, +) { + if r == nil { + return + } + + level := memoryobserve.LevelInfo + status := "success" + if !success || err != nil { + level = memoryobserve.LevelWarn + status = "error" + } + fallbackMode = strings.TrimSpace(fallbackMode) + if fallbackMode == "" { + fallbackMode = "none" + } + + r.observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentWrite, + Operation: memoryobserve.OperationDecision, + Fields: map[string]any{ + "trace_id": strings.TrimSpace(payload.TraceID), + "user_id": payload.UserID, + "conversation_id": strings.TrimSpace(payload.ConversationID), + "job_id": jobIDValue(job), + "fact_type": strings.TrimSpace(fact.MemoryType), + "candidate_count": candidateCount, + "final_action": strings.TrimSpace(finalAction), + "fallback_mode": fallbackMode, + "success": success && err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + r.metrics.AddCounter(memoryobserve.MetricDecisionTotal, 1, map[string]string{ + "action": strings.TrimSpace(finalAction), + "status": status, + }) + if fallbackMode != "none" && fallbackMode != "hash_exact" && fallbackMode != "rag" { + r.metrics.AddCounter(memoryobserve.MetricDecisionFallbackTotal, 1, map[string]string{ + "mode": fallbackMode, + }) + } +} + +func jobIDValue(job *model.MemoryJob) int64 { + if job == nil { + return 0 + } + return job.ID } func parseMemoryID(documentID string) int64 { @@ -411,9 +473,13 @@ func parseMemoryID(documentID string) int64 { if strings.HasPrefix(raw, "uid:") { return 0 } - memoryID, err := strconv.ParseInt(raw, 10, 64) - if err != nil { - return 0 + + var value int64 + for _, ch := range raw { + if ch < '0' || ch > '9' { + return 0 + } + value = value*10 + int64(ch-'0') } - return memoryID + return value } diff --git a/backend/model/memory_manage.go b/backend/model/memory_manage.go new file mode 100644 index 0000000..a128d6d --- /dev/null +++ b/backend/model/memory_manage.go @@ -0,0 +1,105 @@ +package model + +import "time" + +// MemoryGetItemRequest 描述“查看我的某条记忆”所需的最小参数。 +type MemoryGetItemRequest struct { + UserID int + MemoryID int64 +} + +// MemoryCreateItemRequest 描述“手动新增一条记忆”的输入。 +type MemoryCreateItemRequest struct { + UserID int `json:"-"` + ConversationID string `json:"conversation_id,omitempty"` + AssistantID string `json:"assistant_id,omitempty"` + RunID string `json:"run_id,omitempty"` + MemoryType string `json:"memory_type"` + Title string `json:"title"` + Content string `json:"content"` + Confidence *float64 `json:"confidence,omitempty"` + Importance *float64 `json:"importance,omitempty"` + SensitivityLevel *int `json:"sensitivity_level,omitempty"` + IsExplicit *bool `json:"is_explicit,omitempty"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"-"` +} + +// MemoryUpdateItemRequest 描述“手动修改一条记忆”的 Patch 输入。 +// +// 说明: +// 1. 使用指针区分“未传字段”和“显式传零值”; +// 2. ClearTTL 用于表达“显式清空 ttl_at”; +// 3. 当前仍只允许修改内容侧字段,不开放跨用户、跨归属字段改写。 +type MemoryUpdateItemRequest struct { + UserID int `json:"-"` + MemoryID int64 `json:"-"` + MemoryType *string `json:"memory_type,omitempty"` + Title *string `json:"title,omitempty"` + Content *string `json:"content,omitempty"` + Confidence *float64 `json:"confidence,omitempty"` + Importance *float64 `json:"importance,omitempty"` + SensitivityLevel *int `json:"sensitivity_level,omitempty"` + IsExplicit *bool `json:"is_explicit,omitempty"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + ClearTTL bool `json:"clear_ttl,omitempty"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"-"` +} + +// MemoryDeleteItemRequest 描述“删除我的一条记忆”的输入。 +type MemoryDeleteItemRequest struct { + UserID int + MemoryID int64 + Reason string + OperatorType string +} + +// MemoryRestoreItemRequest 描述“恢复我的一条记忆”的输入。 +type MemoryRestoreItemRequest struct { + UserID int + MemoryID int64 + Reason string + OperatorType string +} + +// MemoryDedupCleanupRequest 描述离线去重治理任务的执行参数。 +type MemoryDedupCleanupRequest struct { + UserID int + Limit int + DryRun bool + Reason string + OperatorType string +} + +// MemoryDedupCleanupResult 描述一次离线去重治理的汇总结果。 +type MemoryDedupCleanupResult struct { + ScannedGroupCount int `json:"scanned_group_count"` + DedupedGroupCount int `json:"deduped_group_count"` + KeptCount int `json:"kept_count"` + ArchivedCount int `json:"archived_count"` + ArchivedIDs []int64 `json:"archived_ids,omitempty"` + DryRun bool `json:"dry_run"` +} + +// MemoryItemView 是前端可见的记忆条目视图。 +type MemoryItemView struct { + ID int64 `json:"id"` + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id,omitempty"` + AssistantID string `json:"assistant_id,omitempty"` + RunID string `json:"run_id,omitempty"` + MemoryType string `json:"memory_type"` + Title string `json:"title"` + Content string `json:"content"` + ContentHash string `json:"content_hash,omitempty"` + Confidence float64 `json:"confidence"` + Importance float64 `json:"importance"` + SensitivityLevel int `json:"sensitivity_level"` + IsExplicit bool `json:"is_explicit"` + Status string `json:"status"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` +} diff --git a/backend/respond/respond.go b/backend/respond/respond.go index a17bb7b..df2f844 100644 --- a/backend/respond/respond.go +++ b/backend/respond/respond.go @@ -339,6 +339,21 @@ var ( //请求相关的响应 Info: "conversation_id is required when confirm_action is present", } + MemoryItemNotFound = Response{ //记忆条目不存在 + Status: "40055", + Info: "memory item not found", + } + + MemoryInvalidType = Response{ //记忆类型不合法 + Status: "40056", + Info: "invalid memory type", + } + + MemoryInvalidContent = Response{ //记忆内容为空或不合法 + Status: "40057", + Info: "invalid memory content", + } + RouteControlInternalError = Response{ //路由控制码内部错误 Status: "50001", Info: "route control failed", diff --git a/backend/routers/routers.go b/backend/routers/routers.go index 230fbb1..520bf3f 100644 --- a/backend/routers/routers.go +++ b/backend/routers/routers.go @@ -97,6 +97,16 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d agentGroup.GET("/schedule-preview", handlers.AgentHandler.GetSchedulePlanPreview) agentGroup.GET("/context-stats", handlers.AgentHandler.GetContextStats) } + memoryGroup := apiGroup.Group("/memory") + { + memoryGroup.Use(middleware.JWTTokenAuth(cache), middleware.RateLimitMiddleware(limiter, 20, 1)) + memoryGroup.GET("/items", handlers.MemoryHandler.ListItems) + memoryGroup.GET("/items/:id", handlers.MemoryHandler.GetItem) + memoryGroup.POST("/items", middleware.IdempotencyMiddleware(cache), handlers.MemoryHandler.CreateItem) + memoryGroup.PATCH("/items/:id", middleware.IdempotencyMiddleware(cache), handlers.MemoryHandler.UpdateItem) + memoryGroup.DELETE("/items/:id", middleware.IdempotencyMiddleware(cache), handlers.MemoryHandler.DeleteItem) + memoryGroup.POST("/items/:id/restore", middleware.IdempotencyMiddleware(cache), handlers.MemoryHandler.RestoreItem) + } } // 初始化Gin引擎 log.Println("Routes setup completed") diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 5331e7a..c777bc8 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -16,6 +16,7 @@ import ( outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" "github.com/LoveLosita/smartflow/backend/model" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" @@ -60,6 +61,8 @@ type AgentService struct { compactionStore newagentmodel.CompactionStore memoryReader MemoryReader memoryCfg memorymodel.Config + memoryObserver memoryobserve.Observer + memoryMetrics memoryobserve.MetricsRecorder } // NewAgentService 构造 AgentService。 diff --git a/backend/service/agentsvc/agent_memory.go b/backend/service/agentsvc/agent_memory.go index 2a87317..8179777 100644 --- a/backend/service/agentsvc/agent_memory.go +++ b/backend/service/agentsvc/agent_memory.go @@ -7,6 +7,7 @@ import ( "time" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" ) @@ -27,10 +28,21 @@ type MemoryReader interface { Retrieve(ctx context.Context, req memorymodel.RetrieveRequest) ([]memorymodel.ItemDTO, error) } +type memoryObserveProvider interface { + MemoryObserver() memoryobserve.Observer + MemoryMetrics() memoryobserve.MetricsRecorder +} + // SetMemoryReader 注入 newAgent 主链路读取记忆所需的薄接口与渲染配置。 func (s *AgentService) SetMemoryReader(reader MemoryReader, cfg memorymodel.Config) { s.memoryReader = reader s.memoryCfg = cfg + s.memoryObserver = memoryobserve.NewNopObserver() + s.memoryMetrics = memoryobserve.NewNopMetrics() + if provider, ok := reader.(memoryObserveProvider); ok { + s.memoryObserver = provider.MemoryObserver() + s.memoryMetrics = provider.MemoryMetrics() + } } // injectMemoryContext 在 graph 执行前,把本轮相关记忆写入 ConversationContext 的 pinned block。 @@ -64,6 +76,7 @@ func (s *AgentService) injectMemoryContext( }) if err != nil { conversationContext.RemovePinnedBlock(newAgentMemoryBlockKey) + s.recordMemoryInject(ctx, userID, 0, false, err) log.Printf("读取记忆上下文失败 user=%d chat=%s err=%v", userID, chatID, err) return } @@ -71,6 +84,7 @@ func (s *AgentService) injectMemoryContext( content := renderMemoryPinnedContentByMode(items, s.memoryCfg.EffectiveInjectRenderMode()) if content == "" { conversationContext.RemovePinnedBlock(newAgentMemoryBlockKey) + s.recordMemoryInject(ctx, userID, len(items), false, nil) return } @@ -79,6 +93,7 @@ func (s *AgentService) injectMemoryContext( Title: newAgentMemoryBlockTitle, Content: content, }) + s.recordMemoryInject(ctx, userID, len(items), true, nil) } // shouldInjectMemoryForInput 判断当前输入是否值得触发一次记忆召回。 @@ -100,3 +115,49 @@ func shouldInjectMemoryForInput(userMessage string) bool { return true } } + +func (s *AgentService) recordMemoryInject( + ctx context.Context, + userID int, + inputCount int, + success bool, + err error, +) { + if s == nil { + return + } + observer := s.memoryObserver + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + metrics := s.memoryMetrics + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } + + level := memoryobserve.LevelInfo + if err != nil { + level = memoryobserve.LevelWarn + } + observer.Observe(ctx, memoryobserve.Event{ + Level: level, + Component: memoryobserve.ComponentInject, + Operation: memoryobserve.OperationInject, + Fields: map[string]any{ + "user_id": userID, + "inject_mode": s.memoryCfg.EffectiveInjectRenderMode(), + "input_count": inputCount, + "rendered_count": inputCount, + "token_budget": 0, + "fallback": false, + "success": success && err == nil, + "error": err, + "error_code": memoryobserve.ClassifyError(err), + }, + }) + if inputCount > 0 { + metrics.AddCounter(memoryobserve.MetricInjectItemTotal, int64(inputCount), map[string]string{ + "inject_mode": s.memoryCfg.EffectiveInjectRenderMode(), + }) + } +}