From e1819c565378b3e54a148c2cdae985fa0d4152d0 Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Tue, 5 May 2026 13:52:49 +0800 Subject: [PATCH] Version: 0.9.74.dev.260505 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端: 1.阶段 6 memory 服务化 CP1-CP3 落地 - 新增 cmd/memory 独立进程入口,落地 services/memory dao/rpc/sv 与 memory zrpc pb - 将 memory.extract.requested outbox 消费与 memory worker 迁入 cmd/memory,单体 worker 不再消费 memory outbox - 新增 gateway/client/memory、shared/contracts/memory 和 shared/ports memory port - 将 /api/v1/memory/items* HTTP 管理面切到 memory zrpc,gateway 只保留鉴权、限流、幂等、参数绑定和响应透传 - 新增 memory Retrieve RPC,并将 agent 主链路 memory reader 切到 memory zrpc 读取 - 补充 agent memory RPC reader 适配器,保留注入侧 observer / metrics 观测能力 - 保留旧 backend/memory 核心实现作为迁移期复用与回退面,cmd/memory 内部继续复用既有 Module / ReadService 逻辑 - 补充 memory.rpc 示例配置,更新单体 outbox 发布边界与 memory handler 注释口径 --- backend/cmd/memory/main.go | 138 ++++++++ backend/cmd/start.go | 92 ++++-- backend/config.example.yaml | 5 + backend/gateway/api/memory.go | 93 ++---- backend/gateway/client/memory/client.go | 155 +++++++++ backend/gateway/client/memory/errors.go | 94 ++++++ backend/service/agentsvc/memory_rpc_reader.go | 121 +++++++ .../service/events/core_outbox_handlers.go | 26 +- .../events/memory_extract_requested.go | 10 + backend/services/memory/dao/connect.go | 89 ++++++ backend/services/memory/rpc/errors.go | 74 +++++ backend/services/memory/rpc/handler.go | 136 ++++++++ backend/services/memory/rpc/memory.proto | 27 ++ backend/services/memory/rpc/pb/memory.pb.go | 39 +++ .../services/memory/rpc/pb/memory_grpc.pb.go | 181 +++++++++++ backend/services/memory/rpc/server.go | 60 ++++ backend/services/memory/sv/service.go | 297 ++++++++++++++++++ backend/shared/contracts/memory/types.go | 128 ++++++++ backend/shared/ports/memory.go | 33 ++ 19 files changed, 1688 insertions(+), 110 deletions(-) create mode 100644 backend/cmd/memory/main.go create mode 100644 backend/gateway/client/memory/client.go create mode 100644 backend/gateway/client/memory/errors.go create mode 100644 backend/service/agentsvc/memory_rpc_reader.go create mode 100644 backend/services/memory/dao/connect.go create mode 100644 backend/services/memory/rpc/errors.go create mode 100644 backend/services/memory/rpc/handler.go create mode 100644 backend/services/memory/rpc/memory.proto create mode 100644 backend/services/memory/rpc/pb/memory.pb.go create mode 100644 backend/services/memory/rpc/pb/memory_grpc.pb.go create mode 100644 backend/services/memory/rpc/server.go create mode 100644 backend/services/memory/sv/service.go create mode 100644 backend/shared/contracts/memory/types.go create mode 100644 backend/shared/ports/memory.go diff --git a/backend/cmd/memory/main.go b/backend/cmd/memory/main.go new file mode 100644 index 0000000..bd96563 --- /dev/null +++ b/backend/cmd/memory/main.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/LoveLosita/smartflow/backend/bootstrap" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/inits" + memorymodule "github.com/LoveLosita/smartflow/backend/memory" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" + memorydao "github.com/LoveLosita/smartflow/backend/services/memory/dao" + memoryrpc "github.com/LoveLosita/smartflow/backend/services/memory/rpc" + memorysv "github.com/LoveLosita/smartflow/backend/services/memory/sv" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" + ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config" + "github.com/spf13/viper" +) + +func main() { + if err := bootstrap.LoadConfig(); err != nil { + log.Fatalf("failed to load config: %v", err) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + db, err := memorydao.OpenDBFromConfig() + if err != nil { + log.Fatalf("failed to connect memory database: %v", err) + } + + llmClient, err := buildMemoryLLMClient() + if err != nil { + log.Fatalf("failed to initialize memory LLM client: %v", err) + } + + ragRuntime, err := buildMemoryRAGRuntime(ctx) + if err != nil { + log.Fatalf("failed to initialize memory RAG runtime: %v", err) + } + + memoryCfg := memorymodule.LoadConfigFromViper() + memoryObserver := memoryobserve.NewLoggerObserver(log.Default()) + memoryMetrics := memoryobserve.NewMetricsRegistry() + module := memorymodule.NewModuleWithObserve( + db, + llmClient, + ragRuntime, + memoryCfg, + memorymodule.ObserveDeps{ + Observer: memoryObserver, + Metrics: memoryMetrics, + }, + ) + + outboxRepo := outboxinfra.NewRepository(db) + svc, err := memorysv.NewService(memorysv.Options{ + Module: module, + OutboxRepo: outboxRepo, + KafkaConfig: kafkabus.LoadConfig(), + }) + if err != nil { + log.Fatalf("failed to initialize memory service: %v", err) + } + defer svc.Close() + + svc.StartWorkers(ctx) + + server, listenOn, err := memoryrpc.NewServer(memoryrpc.ServerOptions{ + ListenOn: viper.GetString("memory.rpc.listenOn"), + Timeout: viper.GetDuration("memory.rpc.timeout"), + Service: svc, + }) + if err != nil { + log.Fatalf("failed to build memory zrpc server: %v", err) + } + defer server.Stop() + + go func() { + log.Printf("memory zrpc service starting on %s", listenOn) + server.Start() + }() + + <-ctx.Done() + log.Println("memory service stopping") +} + +// buildMemoryLLMClient 初始化 memory 抽取链路使用的模型客户端。 +// +// 说明: +// 1. CP1 先复用既有 llm-service canonical 入口,不在 memory 服务里重建模型调用封装; +// 2. 当前启动入口与 cmd/start.go / cmd/active-scheduler 都需要 Eino 初始化,后续若出现第三处重复装配,应抽公共 bootstrap; +// 3. 返回 ProClient 是因为现有 memory.Module 只需要 llmservice.Client,不需要完整 Service。 +func buildMemoryLLMClient() (*llmservice.Client, error) { + aiHub, err := inits.InitEino() + if err != nil { + return nil, err + } + llmService := llmservice.New(llmservice.Options{ + AIHub: aiHub, + APIKey: os.Getenv("ARK_API_KEY"), + BaseURL: viper.GetString("agent.baseURL"), + CourseVisionModel: viper.GetString("courseImport.visionModel"), + }) + return llmService.ProClient(), nil +} + +// buildMemoryRAGRuntime 初始化 memory 检索与向量同步使用的 RAG Runtime。 +// +// 暂不抽公共层原因: +// 1. 本轮只迁 memory 一个能力域,避免同时调整 cmd/start.go 的既有装配路径; +// 2. RAG 的 canonical 入口已在 services/rag 内,当前函数只做启动层配置读取与日志包装; +// 3. 等 agent 服务也迁出后,再统一评估 llm/rag 启动装配的公共 bootstrap。 +func buildMemoryRAGRuntime(ctx context.Context) (ragservice.Runtime, error) { + ragCfg := ragconfig.LoadFromViper() + if !ragCfg.Enabled { + log.Println("RAG service is disabled for memory") + return nil, nil + } + + ragLogger := log.Default() + ragService, err := ragservice.NewFromConfig(ctx, ragCfg, ragservice.FactoryDeps{ + Logger: ragLogger, + Observer: ragservice.NewLoggerObserver(ragLogger), + }) + if err != nil { + return nil, fmt.Errorf("build memory RAG service failed: %w", err) + } + log.Printf("Memory RAG runtime initialized: store=%s embed=%s reranker=%s", ragCfg.Store, ragCfg.EmbedProvider, ragCfg.RerankerProvider) + return ragService.Runtime(), nil +} diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 54b2bfd..1126a58 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -16,6 +16,7 @@ import ( "github.com/LoveLosita/smartflow/backend/gateway/api" gatewayactivescheduler "github.com/LoveLosita/smartflow/backend/gateway/client/activescheduler" gatewaycourse "github.com/LoveLosita/smartflow/backend/gateway/client/course" + gatewaymemory "github.com/LoveLosita/smartflow/backend/gateway/client/memory" gatewaynotification "github.com/LoveLosita/smartflow/backend/gateway/client/notification" gatewayschedule "github.com/LoveLosita/smartflow/backend/gateway/client/schedule" gatewaytask "github.com/LoveLosita/smartflow/backend/gateway/client/task" @@ -114,7 +115,7 @@ func StartAPI() { } // StartWorker 只启动后台异步能力,不注册 Gin 路由。 -// 当前包含单体残留域 outbox relay / Kafka consumer / memory worker;主动调度扫描已迁到 cmd/active-scheduler。 +// 当前只包含单体残留域 agent outbox relay / Kafka consumer;memory worker 已迁到 cmd/memory。 func StartWorker() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -202,10 +203,11 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { agentRepo := dao.NewAgentDAO(db) outboxRepo := outboxinfra.NewRepository(db) - eventBus, err := buildEventBus(outboxRepo) + eventBus, err := buildAgentEventBus(outboxRepo) if err != nil { return nil, err } + eventPublisher := buildCoreOutboxPublisher(outboxRepo) // Service 层初始化。 userAuthClient, err := gatewayuserauth.NewClient(gatewayuserauth.ClientConfig{ @@ -257,6 +259,14 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to initialize course zrpc client: %w", err) } + memoryClient, err := gatewaymemory.NewClient(gatewaymemory.ClientConfig{ + Endpoints: viper.GetStringSlice("memory.rpc.endpoints"), + Target: viper.GetString("memory.rpc.target"), + Timeout: viper.GetDuration("memory.rpc.timeout"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize memory zrpc client: %w", err) + } activeSchedulerClient, err := gatewayactivescheduler.NewClient(gatewayactivescheduler.ClientConfig{ Endpoints: viper.GetStringSlice("activeScheduler.rpc.endpoints"), Target: viper.GetString("activeScheduler.rpc.target"), @@ -280,7 +290,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { agentCacheRepo, manager.ActiveSchedule, manager.ActiveScheduleSession, - eventBus, + eventPublisher, scheduleService, taskSv, ) @@ -293,8 +303,10 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { taskRepo, taskClassRepo, scheduleRepo, - memoryModule, + memoryClient, memoryCfg, + memoryObserver, + memoryMetrics, ) // 1. task_pool facts 已统一走 task RPC,避免聊天 rerun 继续直连 tasks 表; @@ -333,7 +345,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { return nil, err } agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm, activeScheduleFeedbackLocator)) - handlers := buildAPIHandlers(taskClient, taskClassClient, courseClient, scheduleClient, agentService, memoryModule, activeSchedulerClient, notificationClient) + handlers := buildAPIHandlers(taskClient, taskClassClient, courseClient, scheduleClient, agentService, memoryClient, activeSchedulerClient, notificationClient) runtime := &appRuntime{ db: db, @@ -380,21 +392,19 @@ func buildRAGService(ctx context.Context) (*ragservice.Service, error) { return ragService, nil } -func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, error) { - // outbox 多 service 门面装配: - // 1. 按 service 维度创建独立 engine,topic / group 由 service 名称推导; - // 2. 对外仍然只暴露一个 Publish / Start / Close 门面; - // 3. kafka.enabled=false 时返回 nil,业务按既有降级策略执行。 +func buildAgentEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, error) { + // agent outbox 消费边界装配: + // 1. 单体残留在 CP1 后只消费 agent 自己的 outbox; + // 2. memory.extract.requested 仍可被发布到 memory_outbox_messages,但消费与 worker 已迁往 cmd/memory; + // 3. kafka.enabled=false 时返回 nil,业务按既有同步降级策略执行。 kafkaCfg := kafkabus.LoadConfig() - serviceBuses := make(map[string]eventsvc.OutboxBus, len(eventsvc.OutboxServiceNames())) - for _, serviceName := range eventsvc.OutboxServiceNames() { - bus, err := eventsvc.NewServiceOutboxBus(outboxRepo, kafkaCfg, serviceName) - if err != nil { - return nil, fmt.Errorf("failed to initialize outbox event bus for service %s: %w", serviceName, err) - } - if bus != nil { - serviceBuses[serviceName] = bus - } + bus, err := eventsvc.NewServiceOutboxBus(outboxRepo, kafkaCfg, outboxinfra.ServiceAgent) + if err != nil { + return nil, fmt.Errorf("failed to initialize outbox event bus for service %s: %w", outboxinfra.ServiceAgent, err) + } + serviceBuses := make(map[string]eventsvc.OutboxBus, 1) + if bus != nil { + serviceBuses[outboxinfra.ServiceAgent] = bus } eventBus := eventsvc.NewRoutedOutboxBus(serviceBuses) @@ -404,6 +414,23 @@ func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, erro return eventBus, nil } +// buildCoreOutboxPublisher 构造单体残留发布器。 +// +// 职责边界: +// 1. 只负责把 agent 主链路产生的跨服务事件写入对应服务 outbox 表; +// 2. 不创建 memory consumer / relay,memory 消费边界已迁往 cmd/memory; +// 3. kafka.enabled=false 时返回 nil,让聊天历史继续走同步 DB fallback。 +func buildCoreOutboxPublisher(outboxRepo *outboxinfra.Repository) outboxinfra.EventPublisher { + kafkaCfg := kafkabus.LoadConfig() + if !kafkaCfg.Enabled || outboxRepo == nil { + return nil + } + return &repositoryOutboxPublisher{ + repo: outboxRepo, + maxRetry: kafkaCfg.MaxRetry, + } +} + type repositoryOutboxPublisher struct { repo *outboxinfra.Repository maxRetry int @@ -429,12 +456,12 @@ func buildTaskOutboxPublisher(outboxRepo *outboxinfra.Repository) outboxinfra.Ev // Publish 以 publish-only 方式写入服务级 outbox。 // // 说明: -// 1. 这里不复用 outbox EventBus,是因为 EventBus 会创建并启动对应 service engine; -// 2. 单体残留只允许发布 task 事件,不允许启动 task consumer,否则会和 cmd/task 抢同一 consumer group; -// 3. payload 仍包装成统一 OutboxEventPayload,确保 cmd/task relay / consumer 能按标准协议解析。 +// 1. 这里不复用 outbox EventBus,是因为 EventBus 会创建并可能启动对应 service engine; +// 2. 单体残留在 task / memory 等迁移期只允许发布跨服务事件,不允许抢对应 consumer group; +// 3. payload 仍包装成统一 OutboxEventPayload,确保独立服务 relay / consumer 能按标准协议解析。 func (p *repositoryOutboxPublisher) Publish(ctx context.Context, req outboxinfra.PublishRequest) error { if p == nil || p.repo == nil { - return fmt.Errorf("task outbox publisher is not initialized") + return fmt.Errorf("outbox publisher is not initialized") } eventType := strings.TrimSpace(req.EventType) @@ -746,8 +773,10 @@ func configureAgentService( taskRepo *dao.TaskDAO, taskClassRepo *dao.TaskClassDAO, scheduleRepo *dao.ScheduleDAO, - memoryModule *memory.Module, + memoryReaderClient ports.MemoryReaderClient, memoryCfg memorymodel.Config, + memoryObserver memoryobserve.Observer, + memoryMetrics memoryobserve.MetricsRecorder, ) { if agentService == nil { return @@ -790,7 +819,11 @@ func configureAgentService( CreateTask: buildQuickTaskCreateFunc(taskRepo), QueryTasks: buildQuickTaskQueryFunc(agentService), }) - agentService.SetMemoryReader(memoryModule, memoryCfg) + // 1. agent 主链路读取记忆统一走 memory zrpc,避免 CP3 后继续直连本进程 memory.Module; + // 2. observer / metrics 继续复用启动期装配,保证注入侧观测在 RPC 切流后不丢; + // 3. 旧 memoryModule 仍保留在启动图中,作为迁移期依赖和后续回退面; + // 4. memory 服务暂不可用时,预取链路只记录警告并软降级,不阻断聊天主流程。 + agentService.SetMemoryReader(agentsvcsvc.NewMemoryRPCReader(memoryReaderClient, memoryObserver, memoryMetrics), memoryCfg) } func buildTaskClassUpsertFunc(taskClassRepo *dao.TaskClassDAO) func(userID int, input newagenttools.TaskClassUpsertInput) (newagenttools.TaskClassUpsertPersistResult, error) { @@ -926,7 +959,7 @@ func buildAPIHandlers( courseClient ports.CourseCommandClient, scheduleClient ports.ScheduleCommandClient, agentService *service.AgentService, - memoryModule *memory.Module, + memoryClient ports.MemoryCommandClient, activeSchedulerClient ports.ActiveSchedulerCommandClient, notificationClient ports.NotificationCommandClient, ) *api.ApiHandlers { @@ -936,7 +969,7 @@ func buildAPIHandlers( CourseHandler: api.NewCourseHandler(courseClient), ScheduleHandler: api.NewScheduleAPI(scheduleClient), AgentHandler: api.NewAgentHandler(agentService), - MemoryHandler: api.NewMemoryHandler(memoryModule), + MemoryHandler: api.NewMemoryHandler(memoryClient), ActiveSchedule: api.NewActiveScheduleAPI(activeSchedulerClient), Notification: api.NewNotificationAPI(notificationClient), } @@ -953,10 +986,7 @@ func (r *appRuntime) startWorkers(ctx context.Context) { } else { log.Println("Outbox event bus is disabled") } - - if r.memoryModule != nil { - r.memoryModule.StartWorker(ctx) - } + log.Println("Memory worker is managed by cmd/memory in phase 6 CP1") } func (r *appRuntime) registerEventHandlers() error { diff --git a/backend/config.example.yaml b/backend/config.example.yaml index 67c82bd..f5b83a1 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -156,6 +156,11 @@ rag: # 记忆模块配置。 memory: + rpc: + listenOn: "0.0.0.0:9088" + endpoints: + - "127.0.0.1:9088" + timeout: 6s enabled: true rag: enabled: true diff --git a/backend/gateway/api/memory.go b/backend/gateway/api/memory.go index 36318b2..31cb817 100644 --- a/backend/gateway/api/memory.go +++ b/backend/gateway/api/memory.go @@ -8,25 +8,30 @@ import ( "strings" "time" - memorypkg "github.com/LoveLosita/smartflow/backend/memory" - memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" - "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/respond" + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" + "github.com/LoveLosita/smartflow/backend/shared/ports" "github.com/gin-gonic/gin" ) type MemoryHandler struct { - module *memorypkg.Module + client ports.MemoryCommandClient } var errMemoryHandlerNotReady = errors.New("memory handler is not initialized") -func NewMemoryHandler(module *memorypkg.Module) *MemoryHandler { - return &MemoryHandler{module: module} +// NewMemoryHandler 创建 memory HTTP 门面。 +// +// 职责边界: +// 1. gateway 只负责鉴权后的参数绑定、超时和响应透传; +// 2. 记忆管理业务、审计、向量同步和状态校验都交给 memory zrpc 服务; +// 3. agent 的 memory reader 已在 CP3 切到 memory zrpc,HTTP 管理面这里只保留管理职责。 +func NewMemoryHandler(client ports.MemoryCommandClient) *MemoryHandler { + return &MemoryHandler{client: client} } func (h *MemoryHandler) ListItems(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } @@ -48,7 +53,7 @@ func (h *MemoryHandler) ListItems(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - items, err := h.module.ListItems(ctx, memorymodel.ListItemsRequest{ + resp, err := h.client.ListItems(ctx, memorycontracts.ListItemsRequest{ UserID: c.GetInt("user_id"), ConversationID: strings.TrimSpace(c.Query("conversation_id")), Statuses: splitCSV(statusesRaw), @@ -60,11 +65,11 @@ func (h *MemoryHandler) ListItems(c *gin.Context) { return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemViews(items))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (h *MemoryHandler) GetItem(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } @@ -78,7 +83,7 @@ func (h *MemoryHandler) GetItem(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - item, err := h.module.GetItem(ctx, model.MemoryGetItemRequest{ + resp, err := h.client.GetItem(ctx, memorycontracts.GetItemRequest{ UserID: c.GetInt("user_id"), MemoryID: memoryID, }) @@ -86,16 +91,16 @@ func (h *MemoryHandler) GetItem(c *gin.Context) { respond.DealWithError(c, err) return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (h *MemoryHandler) CreateItem(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } - var req model.MemoryCreateItemRequest + var req memorycontracts.CreateItemRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) return @@ -106,16 +111,16 @@ func (h *MemoryHandler) CreateItem(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - item, err := h.module.CreateItem(ctx, req) + resp, err := h.client.CreateItem(ctx, req) if err != nil { respond.DealWithError(c, err) return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (h *MemoryHandler) UpdateItem(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } @@ -126,7 +131,7 @@ func (h *MemoryHandler) UpdateItem(c *gin.Context) { return } - var req model.MemoryUpdateItemRequest + var req memorycontracts.UpdateItemRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) return @@ -138,16 +143,16 @@ func (h *MemoryHandler) UpdateItem(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - item, err := h.module.UpdateItem(ctx, req) + resp, err := h.client.UpdateItem(ctx, req) if err != nil { respond.DealWithError(c, err) return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (h *MemoryHandler) DeleteItem(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } @@ -166,7 +171,7 @@ func (h *MemoryHandler) DeleteItem(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - item, err := h.module.DeleteItem(ctx, model.MemoryDeleteItemRequest{ + resp, err := h.client.DeleteItem(ctx, memorycontracts.DeleteItemRequest{ UserID: c.GetInt("user_id"), MemoryID: memoryID, Reason: strings.TrimSpace(body.Reason), @@ -176,11 +181,11 @@ func (h *MemoryHandler) DeleteItem(c *gin.Context) { respond.DealWithError(c, err) return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (h *MemoryHandler) RestoreItem(c *gin.Context) { - if h == nil || h.module == nil { + if h == nil || h.client == nil { c.JSON(http.StatusInternalServerError, respond.InternalError(errMemoryHandlerNotReady)) return } @@ -199,7 +204,7 @@ func (h *MemoryHandler) RestoreItem(c *gin.Context) { ctx, cancel := context.WithTimeout(c.Request.Context(), 2*time.Second) defer cancel() - item, err := h.module.RestoreItem(ctx, model.MemoryRestoreItemRequest{ + resp, err := h.client.RestoreItem(ctx, memorycontracts.RestoreItemRequest{ UserID: c.GetInt("user_id"), MemoryID: memoryID, Reason: strings.TrimSpace(body.Reason), @@ -209,7 +214,7 @@ func (h *MemoryHandler) RestoreItem(c *gin.Context) { respond.DealWithError(c, err) return } - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, toMemoryItemView(item))) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func parseMemoryIDParam(c *gin.Context) (int64, bool) { @@ -252,39 +257,3 @@ func splitCSV(raw string) []string { } return result } - -func toMemoryItemViews(items []memorymodel.ItemDTO) []model.MemoryItemView { - if len(items) == 0 { - return nil - } - result := make([]model.MemoryItemView, 0, len(items)) - for _, item := range items { - result = append(result, toMemoryItemView(&item)) - } - return result -} - -func toMemoryItemView(item *memorymodel.ItemDTO) model.MemoryItemView { - if item == nil { - return model.MemoryItemView{} - } - return model.MemoryItemView{ - ID: item.ID, - UserID: item.UserID, - ConversationID: item.ConversationID, - AssistantID: item.AssistantID, - RunID: item.RunID, - MemoryType: item.MemoryType, - Title: item.Title, - Content: item.Content, - ContentHash: item.ContentHash, - Confidence: item.Confidence, - Importance: item.Importance, - SensitivityLevel: item.SensitivityLevel, - IsExplicit: item.IsExplicit, - Status: item.Status, - TTLAt: item.TTLAt, - CreatedAt: item.CreatedAt, - UpdatedAt: item.UpdatedAt, - } -} diff --git a/backend/gateway/client/memory/client.go b/backend/gateway/client/memory/client.go new file mode 100644 index 0000000..9b864d3 --- /dev/null +++ b/backend/gateway/client/memory/client.go @@ -0,0 +1,155 @@ +package memory + +import ( + "context" + "encoding/json" + "errors" + "strings" + "time" + + memorypb "github.com/LoveLosita/smartflow/backend/services/memory/rpc/pb" + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +const ( + defaultEndpoint = "127.0.0.1:9088" + defaultTimeout = 6 * time.Second +) + +type ClientConfig struct { + Endpoints []string + Target string + Timeout time.Duration +} + +// Client 是 gateway 访问 memory zrpc 的最小适配层。 +// +// 职责边界: +// 1. 只负责跨进程 gRPC 调用和 JSON 透传,不触碰 memory repo、worker 或 outbox; +// 2. HTTP 入参仍由 gateway/api 做基础绑定,业务校验交给 memory 服务; +// 3. 复杂响应不在 gateway 重建模型,避免 DTO 复制扩散。 +type Client struct { + rpc memorypb.MemoryClient +} + +func NewClient(cfg ClientConfig) (*Client, error) { + timeout := cfg.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + endpoints := normalizeEndpoints(cfg.Endpoints) + target := strings.TrimSpace(cfg.Target) + if len(endpoints) == 0 && target == "" { + endpoints = []string{defaultEndpoint} + } + + zclient, err := zrpc.NewClient(zrpc.RpcClientConf{ + Endpoints: endpoints, + Target: target, + NonBlock: true, + Timeout: int64(timeout / time.Millisecond), + }) + if err != nil { + return nil, err + } + // 1. 这里不在构造期 Ping memory 服务,避免 cmd/memory 短暂不可用时拖垮整个 gateway/worker 启动。 + // 2. 真正的可用性检查延迟到各个 RPC 调用,由 `/api/v1/memory/*` 自己返回局部错误。 + client := &Client{rpc: memorypb.NewMemoryClient(zclient.Conn())} + return client, nil +} + +// Retrieve 调用 memory 服务完成 agent 记忆读取。 +// +// 职责边界: +// 1. 只负责跨进程 JSON 编解码和 gRPC 错误还原; +// 2. 不在 gateway 侧重做召回、过滤或 prompt 渲染; +// 3. 返回 ItemDTO 给 agent 适配器继续转换为内部模型。 +func (c *Client) Retrieve(ctx context.Context, req memorycontracts.RetrieveRequest) ([]memorycontracts.ItemDTO, error) { + resp, err := c.callJSON(ctx, c.rpc.Retrieve, req) + raw, err := jsonFromResponse(resp, err) + if err != nil { + return nil, err + } + if len(raw) == 0 || string(raw) == "null" { + return nil, nil + } + var items []memorycontracts.ItemDTO + if err := json.Unmarshal(raw, &items); err != nil { + return nil, err + } + return items, nil +} + +func (c *Client) ListItems(ctx context.Context, req memorycontracts.ListItemsRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.ListItems, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) GetItem(ctx context.Context, req memorycontracts.GetItemRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.GetItem, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) CreateItem(ctx context.Context, req memorycontracts.CreateItemRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.CreateItem, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) UpdateItem(ctx context.Context, req memorycontracts.UpdateItemRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.UpdateItem, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) DeleteItem(ctx context.Context, req memorycontracts.DeleteItemRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.DeleteItem, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) RestoreItem(ctx context.Context, req memorycontracts.RestoreItemRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.RestoreItem, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) ensureReady() error { + if c == nil || c.rpc == nil { + return errors.New("memory zrpc client is not initialized") + } + return nil +} + +func (c *Client) callJSON(ctx context.Context, fn func(context.Context, *memorypb.JSONRequest, ...grpc.CallOption) (*memorypb.JSONResponse, error), payload any) (*memorypb.JSONResponse, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + raw, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return fn(ctx, &memorypb.JSONRequest{PayloadJson: raw}) +} + +func jsonFromResponse(resp *memorypb.JSONResponse, rpcErr error) (json.RawMessage, error) { + if rpcErr != nil { + return nil, responseFromRPCError(rpcErr) + } + if resp == nil { + return nil, errors.New("memory zrpc service returned empty JSON response") + } + if len(resp.DataJson) == 0 { + return json.RawMessage("null"), nil + } + return json.RawMessage(resp.DataJson), nil +} + +func normalizeEndpoints(values []string) []string { + endpoints := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + endpoints = append(endpoints, trimmed) + } + } + return endpoints +} diff --git a/backend/gateway/client/memory/errors.go b/backend/gateway/client/memory/errors.go new file mode 100644 index 0000000..5642253 --- /dev/null +++ b/backend/gateway/client/memory/errors.go @@ -0,0 +1,94 @@ +package memory + +import ( + "errors" + "fmt" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// responseFromRPCError 负责把 memory 的 gRPC 错误反解回项目内错误。 +// +// 职责边界: +// 1. 只在 gateway 边缘层使用; +// 2. 业务错误尽量恢复成 respond.Response,方便 API 层复用 DealWithError; +// 3. 服务不可用或未知内部错误包装成普通 error,避免误报成用户可修正的参数问题。 +func responseFromRPCError(err error) error { + if err == nil { + return nil + } + + st, ok := status.FromError(err) + if !ok { + return wrapRPCError(err) + } + if resp, ok := responseFromStatus(st); ok { + return resp + } + + switch st.Code() { + case codes.Internal, codes.Unknown, codes.Unavailable, codes.DeadlineExceeded, codes.DataLoss, codes.Unimplemented: + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "memory zrpc service internal error" + } + return wrapRPCError(errors.New(msg)) + } + + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "memory zrpc service rejected request" + } + return respond.Response{Status: grpcCodeToRespondStatus(st.Code()), Info: msg} +} + +func responseFromStatus(st *status.Status) (respond.Response, bool) { + if st == nil { + return respond.Response{}, false + } + for _, detail := range st.Details() { + info, ok := detail.(*errdetails.ErrorInfo) + if !ok { + continue + } + statusValue := strings.TrimSpace(info.Reason) + if statusValue == "" { + statusValue = grpcCodeToRespondStatus(st.Code()) + } + message := strings.TrimSpace(st.Message()) + if message == "" && info.Metadata != nil { + message = strings.TrimSpace(info.Metadata["info"]) + } + if message == "" { + message = statusValue + } + return respond.Response{Status: statusValue, Info: message}, true + } + return respond.Response{}, false +} + +func grpcCodeToRespondStatus(code codes.Code) string { + switch code { + case codes.Unauthenticated: + return respond.ErrUnauthorized.Status + case codes.InvalidArgument: + return respond.MissingParam.Status + case codes.NotFound: + return respond.MemoryItemNotFound.Status + case codes.Internal, codes.Unknown, codes.DataLoss: + return "500" + default: + return "400" + } +} + +func wrapRPCError(err error) error { + if err == nil { + return nil + } + return fmt.Errorf("调用 memory zrpc 服务失败: %w", err) +} diff --git a/backend/service/agentsvc/memory_rpc_reader.go b/backend/service/agentsvc/memory_rpc_reader.go new file mode 100644 index 0000000..00f7da3 --- /dev/null +++ b/backend/service/agentsvc/memory_rpc_reader.go @@ -0,0 +1,121 @@ +package agentsvc + +import ( + "context" + "errors" + + memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" +) + +// MemoryRPCReaderClient 描述 agent 读取 memory zrpc 所需的最小能力。 +// +// 职责边界: +// 1. 只读取候选记忆,不暴露管理写接口; +// 2. 不要求调用方知道 gateway/client/memory 的具体实现; +// 3. 错误原样返回给预取链路,由 agent 侧负责软降级和观测记录。 +type MemoryRPCReaderClient interface { + Retrieve(ctx context.Context, req memorycontracts.RetrieveRequest) ([]memorycontracts.ItemDTO, error) +} + +type memoryRPCReader struct { + client MemoryRPCReaderClient + observer memoryobserve.Observer + metrics memoryobserve.MetricsRecorder +} + +// NewMemoryRPCReader 创建跨进程 memory reader 适配器。 +// +// 职责边界: +// 1. 只把 agent 内部的 memorymodel.RetrieveRequest 转成共享契约; +// 2. 不持有 memory.Module,避免 CP3 后 agent 主链路继续直连本进程记忆服务; +// 3. observer / metrics 只用于 agent 注入观测,不参与 retrieve 业务调用; +// 4. client 为空时返回 nil,让 SetMemoryReader 保持既有“无 reader 则不注入”的降级语义。 +func NewMemoryRPCReader( + client MemoryRPCReaderClient, + observer memoryobserve.Observer, + metrics memoryobserve.MetricsRecorder, +) MemoryReader { + if client == nil { + return nil + } + if observer == nil { + observer = memoryobserve.NewNopObserver() + } + if metrics == nil { + metrics = memoryobserve.NewNopMetrics() + } + return &memoryRPCReader{ + client: client, + observer: observer, + metrics: metrics, + } +} + +// Retrieve 通过 memory zrpc 读取候选记忆并转换回 agent 内部 DTO。 +func (r *memoryRPCReader) Retrieve(ctx context.Context, req memorymodel.RetrieveRequest) ([]memorymodel.ItemDTO, error) { + if r == nil || r.client == nil { + return nil, errors.New("memory rpc reader client is nil") + } + items, err := r.client.Retrieve(ctx, memorycontracts.RetrieveRequest{ + Query: req.Query, + UserID: req.UserID, + ConversationID: req.ConversationID, + AssistantID: req.AssistantID, + RunID: req.RunID, + MemoryTypes: append([]string(nil), req.MemoryTypes...), + Limit: req.Limit, + Now: req.Now, + }) + if err != nil { + return nil, err + } + return toMemoryModelItems(items), nil +} + +// MemoryObserver 暴露 agent 注入链路使用的 observer,保持 CP3 切流前后的注入观测连续。 +func (r *memoryRPCReader) MemoryObserver() memoryobserve.Observer { + if r == nil || r.observer == nil { + return memoryobserve.NewNopObserver() + } + return r.observer +} + +// MemoryMetrics 暴露 agent 注入链路使用的 metrics,避免 RPC reader 切流后指标静默丢失。 +func (r *memoryRPCReader) MemoryMetrics() memoryobserve.MetricsRecorder { + if r == nil || r.metrics == nil { + return memoryobserve.NewNopMetrics() + } + return r.metrics +} + +// toMemoryModelItems 只做跨层 DTO 字段搬运,不改变排序、过滤和记忆内容。 +func toMemoryModelItems(items []memorycontracts.ItemDTO) []memorymodel.ItemDTO { + if len(items) == 0 { + return nil + } + result := make([]memorymodel.ItemDTO, 0, len(items)) + for _, item := range items { + result = append(result, memorymodel.ItemDTO{ + ID: item.ID, + UserID: item.UserID, + ConversationID: item.ConversationID, + AssistantID: item.AssistantID, + RunID: item.RunID, + MemoryType: item.MemoryType, + Title: item.Title, + Content: item.Content, + ContentHash: item.ContentHash, + Confidence: item.Confidence, + Importance: item.Importance, + SensitivityLevel: item.SensitivityLevel, + IsExplicit: item.IsExplicit, + Status: item.Status, + TTLAt: item.TTLAt, + CreatedAt: item.CreatedAt, + UpdatedAt: item.UpdatedAt, + }) + } + return result +} diff --git a/backend/service/events/core_outbox_handlers.go b/backend/service/events/core_outbox_handlers.go index c9cb725..479af4b 100644 --- a/backend/service/events/core_outbox_handlers.go +++ b/backend/service/events/core_outbox_handlers.go @@ -10,13 +10,13 @@ import ( "github.com/LoveLosita/smartflow/backend/shared/ports" ) -// RegisterCoreOutboxHandlers 注册核心业务 outbox handler。 +// RegisterCoreOutboxHandlers 注册单体残留内仍由 agent 边界消费的 outbox handler。 // // 职责边界: -// 1. 只负责聚合注册当前核心业务 handler,便于 start / worker/all 等启动入口复用同一套接线顺序。 -// 2. 不负责创建 eventBus/outboxRepo/DAO/memoryModule,也不负责启动或关闭事件总线。 +// 1. 只负责聚合注册当前单体残留内仍归 agent 进程消费的 handler; +// 2. 不负责创建 eventBus/outboxRepo/DAO,也不负责启动或关闭事件总线。 // 3. 不改变单个 Register* 函数的职责;具体 payload 解析、幂等消费和业务落库仍由各自 handler 负责。 -// 4. 这里以显式 route table 的方式列出“事件类型 -> 服务归属 -> handler”,避免后续新增事件时只改启动入口不改接线表。 +// 4. memory.extract.requested 已在阶段 6 CP1 迁往 cmd/memory,这里只登记其路由,不再注册消费 handler。 func RegisterCoreOutboxHandlers( eventBus OutboxBus, outboxRepo *outboxinfra.Repository, @@ -26,7 +26,10 @@ func RegisterCoreOutboxHandlers( memoryModule *memory.Module, adjuster ports.TokenUsageAdjuster, ) error { - if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule); err != nil { + if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo); err != nil { + return err + } + if err := RegisterMemoryExtractRoute(); err != nil { return err } @@ -77,7 +80,6 @@ func validateCoreOutboxHandlerDeps( repoManager *dao.RepoManager, agentRepo *dao.AgentDAO, cacheRepo *dao.CacheDAO, - memoryModule *memory.Module, ) error { if eventBus == nil { return errors.New("event bus is nil") @@ -94,9 +96,6 @@ func validateCoreOutboxHandlerDeps( if cacheRepo == nil { return errors.New("cache repo is nil") } - if memoryModule == nil { - return errors.New("memory module is nil") - } return nil } @@ -110,7 +109,7 @@ func validateAllOutboxHandlerDeps( memoryModule *memory.Module, activeTriggerWorkflow ActiveScheduleTriggeredProcessor, ) error { - if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule); err != nil { + if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo); err != nil { return err } if activeTriggerWorkflow == nil { @@ -158,13 +157,6 @@ func coreOutboxHandlerRoutes( return RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo) }, }, - { - EventType: EventTypeMemoryExtractRequested, - Service: outboxHandlerServiceMemory, - Register: func() error { - return RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule) - }, - }, } } diff --git a/backend/service/events/memory_extract_requested.go b/backend/service/events/memory_extract_requested.go index 3cb13c5..85febe1 100644 --- a/backend/service/events/memory_extract_requested.go +++ b/backend/service/events/memory_extract_requested.go @@ -26,6 +26,16 @@ const ( maxMemorySourceTextLength = 1500 ) +// RegisterMemoryExtractRoute 只登记 memory.extract.requested 的服务归属。 +// +// 职责边界: +// 1. 只保证发布侧能把事件写入 memory_outbox_messages; +// 2. 不注册消费 handler,消费边界在阶段 6 CP1 起归 cmd/memory; +// 3. 重复调用按 outbox 路由注册的幂等语义处理。 +func RegisterMemoryExtractRoute() error { + return outboxinfra.RegisterEventService(EventTypeMemoryExtractRequested, outboxinfra.ServiceMemory) +} + // RegisterMemoryExtractRequestedHandler 注册“记忆抽取请求”消费者。 // // 职责边界: diff --git a/backend/services/memory/dao/connect.go b/backend/services/memory/dao/connect.go new file mode 100644 index 0000000..4199d4a --- /dev/null +++ b/backend/services/memory/dao/connect.go @@ -0,0 +1,89 @@ +package dao + +import ( + "context" + "fmt" + + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + coremodel "github.com/LoveLosita/smartflow/backend/model" + "github.com/go-redis/redis/v8" + "github.com/spf13/viper" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +// OpenDBFromConfig 创建 memory 服务自己的数据库句柄。 +// +// 职责边界: +// 1. 只迁移 memory_items / memory_jobs / memory_audit_logs / memory_user_settings 以及 memory 服务自己的 outbox 表; +// 2. 不迁移 agent、task、schedule、active-scheduler、notification 等跨域表,避免独立进程越权管理别的领域; +// 3. 返回的 *gorm.DB 供 memory 服务内部 repo、worker 和 outbox consumer 复用。 +func OpenDBFromConfig() (*gorm.DB, error) { + host := viper.GetString("database.host") + port := viper.GetString("database.port") + user := viper.GetString("database.user") + password := viper.GetString("database.password") + dbname := viper.GetString("database.dbname") + + dsn := fmt.Sprintf( + "%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + user, password, host, port, dbname, + ) + + // 1. 先按统一配置建立 MySQL 连接;若连接失败,独立 memory 进程直接 fail fast。 + db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, err + } + + // 2. 只迁移 memory 自有表,明确与 agent/task/schedule 等跨域模型隔离。 + if err = db.AutoMigrate( + &coremodel.MemoryItem{}, + &coremodel.MemoryJob{}, + &coremodel.MemoryAuditLog{}, + &coremodel.MemoryUserSetting{}, + ); err != nil { + return nil, fmt.Errorf("auto migrate memory tables failed: %w", err) + } + + // 3. 再迁移 memory 服务自己的 outbox 物理表,让独立服务可以单独发布与消费 memory 事件。 + if err = autoMigrateMemoryOutboxTable(db); err != nil { + return nil, err + } + return db, nil +} + +// OpenRedisFromConfig 创建 memory 服务自己的 Redis 句柄。 +// +// 职责边界: +// 1. 只负责初始化 memory 独立进程所需的 Redis client; +// 2. 不创建、不预热、不清理任何 memory 业务 key; +// 3. Ping 失败直接返回 error,让入口在缓存、锁或幂等依赖异常时尽早暴露问题。 +func OpenRedisFromConfig() (*redis.Client, error) { + client := redis.NewClient(&redis.Options{ + Addr: viper.GetString("redis.host") + ":" + viper.GetString("redis.port"), + Password: viper.GetString("redis.password"), + DB: 0, + }) + if _, err := client.Ping(context.Background()).Result(); err != nil { + return nil, err + } + return client, nil +} + +// autoMigrateMemoryOutboxTable 只迁移 memory 服务自己的 outbox 物理表。 +// +// 职责边界: +// 1. 只负责 service catalog 中 memory 对应的 outbox 表,不硬编码别的服务表名; +// 2. 共享 AgentOutboxMessage 结构作为表结构模板,但物理表仍归 memory 服务所有; +// 3. 若后续 outbox 表名调整,只改 service catalog,不在这里散落配置。 +func autoMigrateMemoryOutboxTable(db *gorm.DB) error { + cfg, ok := outboxinfra.ResolveServiceConfig(outboxinfra.ServiceMemory) + if !ok { + return fmt.Errorf("resolve memory outbox config failed") + } + if err := db.Table(cfg.TableName).AutoMigrate(&coremodel.AgentOutboxMessage{}); err != nil { + return fmt.Errorf("auto migrate memory outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + } + return nil +} diff --git a/backend/services/memory/rpc/errors.go b/backend/services/memory/rpc/errors.go new file mode 100644 index 0000000..c9df506 --- /dev/null +++ b/backend/services/memory/rpc/errors.go @@ -0,0 +1,74 @@ +package rpc + +import ( + "errors" + "log" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + errMemoryServiceNotReady = errors.New("memory service dependency not initialized") +) + +const memoryErrorDomain = "smartflow.memory" + +// grpcErrorFromServiceError 负责把 memory 内部错误转换为 gRPC status。 +// +// 职责边界: +// 1. respond.Response 保留项目内部 status/info,供 gateway 反解; +// 2. 未分类错误只暴露通用内部错误,详细信息留在服务日志; +// 3. 不在 RPC 层重判业务规则,业务语义仍由 memory.Module 决定。 +func grpcErrorFromServiceError(err error) error { + if err == nil { + return nil + } + var resp respond.Response + if errors.As(err, &resp) { + return grpcErrorFromResponse(resp) + } + log.Printf("memory rpc internal error: %v", err) + return status.Error(codes.Internal, "memory service internal error") +} + +func grpcErrorFromResponse(resp respond.Response) error { + code := grpcCodeFromRespondStatus(resp.Status) + message := strings.TrimSpace(resp.Info) + if message == "" { + message = strings.TrimSpace(resp.Status) + } + st := status.New(code, message) + detail := &errdetails.ErrorInfo{ + Domain: memoryErrorDomain, + Reason: resp.Status, + Metadata: map[string]string{ + "info": resp.Info, + }, + } + withDetails, err := st.WithDetails(detail) + if err != nil { + return st.Err() + } + return withDetails.Err() +} + +func grpcCodeFromRespondStatus(statusValue string) codes.Code { + switch strings.TrimSpace(statusValue) { + case respond.MissingToken.Status, respond.InvalidToken.Status, respond.InvalidClaims.Status, + respond.ErrUnauthorized.Status, respond.WrongTokenType.Status, respond.UserLoggedOut.Status: + return codes.Unauthenticated + case respond.MemoryItemNotFound.Status: + return codes.NotFound + case respond.MissingParam.Status, respond.WrongParamType.Status, respond.ParamTooLong.Status, + respond.WrongUserID.Status, respond.MemoryInvalidType.Status, respond.MemoryInvalidContent.Status: + return codes.InvalidArgument + } + if strings.HasPrefix(strings.TrimSpace(statusValue), "5") { + return codes.Internal + } + return codes.InvalidArgument +} diff --git a/backend/services/memory/rpc/handler.go b/backend/services/memory/rpc/handler.go new file mode 100644 index 0000000..e3a5f66 --- /dev/null +++ b/backend/services/memory/rpc/handler.go @@ -0,0 +1,136 @@ +package rpc + +import ( + "context" + "encoding/json" + + "github.com/LoveLosita/smartflow/backend/respond" + "github.com/LoveLosita/smartflow/backend/services/memory/rpc/pb" + memorysv "github.com/LoveLosita/smartflow/backend/services/memory/sv" + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" +) + +type Handler struct { + pb.UnimplementedMemoryServer + svc *memorysv.Service +} + +func NewHandler(svc *memorysv.Service) *Handler { + return &Handler{svc: svc} +} + +// Ping 供调用方在启动期确认 memory zrpc 已可用。 +func (h *Handler) Ping(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + if err := h.svc.Ping(ctx); err != nil { + return nil, grpcErrorFromServiceError(err) + } + return &pb.StatusResponse{}, nil +} + +func (h *Handler) Retrieve(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.RetrieveRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.Retrieve(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) ListItems(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.ListItemsRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.ListItems(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) GetItem(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.GetItemRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.GetItem(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) CreateItem(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.CreateItemRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.CreateItem(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) UpdateItem(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.UpdateItemRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.UpdateItem(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) DeleteItem(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.DeleteItemRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.DeleteItem(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) RestoreItem(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq memorycontracts.RestoreItemRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.RestoreItem(ctx, contractReq) + return jsonResponse(data, err) +} + +func (h *Handler) ensureReady(req any) error { + if h == nil || h.svc == nil { + return grpcErrorFromServiceError(errMemoryServiceNotReady) + } + if req == nil { + return grpcErrorFromServiceError(respond.MissingParam) + } + return nil +} + +func jsonResponse(value any, err error) (*pb.JSONResponse, error) { + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + raw, err := json.Marshal(value) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return &pb.JSONResponse{DataJson: raw}, nil +} diff --git a/backend/services/memory/rpc/memory.proto b/backend/services/memory/rpc/memory.proto new file mode 100644 index 0000000..a393906 --- /dev/null +++ b/backend/services/memory/rpc/memory.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package smartflow.memory; + +option go_package = "github.com/LoveLosita/smartflow/backend/services/memory/rpc/pb"; + +service Memory { + rpc Ping(StatusResponse) returns (StatusResponse); + rpc Retrieve(JSONRequest) returns (JSONResponse); + rpc ListItems(JSONRequest) returns (JSONResponse); + rpc GetItem(JSONRequest) returns (JSONResponse); + rpc CreateItem(JSONRequest) returns (JSONResponse); + rpc UpdateItem(JSONRequest) returns (JSONResponse); + rpc DeleteItem(JSONRequest) returns (JSONResponse); + rpc RestoreItem(JSONRequest) returns (JSONResponse); +} + +message JSONRequest { + bytes payload_json = 1; +} + +message JSONResponse { + bytes data_json = 1; +} + +message StatusResponse { +} diff --git a/backend/services/memory/rpc/pb/memory.pb.go b/backend/services/memory/rpc/pb/memory.pb.go new file mode 100644 index 0000000..b6fe02e --- /dev/null +++ b/backend/services/memory/rpc/pb/memory.pb.go @@ -0,0 +1,39 @@ +package pb + +import proto "github.com/golang/protobuf/proto" + +var _ = proto.Marshal + +const _ = proto.ProtoPackageIsVersion3 + +type JSONRequest struct { + PayloadJson []byte `protobuf:"bytes,1,opt,name=payload_json,json=payloadJson,proto3" json:"payload_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *JSONRequest) Reset() { *m = JSONRequest{} } +func (m *JSONRequest) String() string { return proto.CompactTextString(m) } +func (*JSONRequest) ProtoMessage() {} + +type JSONResponse struct { + DataJson []byte `protobuf:"bytes,1,opt,name=data_json,json=dataJson,proto3" json:"data_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *JSONResponse) Reset() { *m = JSONResponse{} } +func (m *JSONResponse) String() string { return proto.CompactTextString(m) } +func (*JSONResponse) ProtoMessage() {} + +type StatusResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} diff --git a/backend/services/memory/rpc/pb/memory_grpc.pb.go b/backend/services/memory/rpc/pb/memory_grpc.pb.go new file mode 100644 index 0000000..40d2db7 --- /dev/null +++ b/backend/services/memory/rpc/pb/memory_grpc.pb.go @@ -0,0 +1,181 @@ +package pb + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +const ( + Memory_Ping_FullMethodName = "/smartflow.memory.Memory/Ping" + Memory_Retrieve_FullMethodName = "/smartflow.memory.Memory/Retrieve" + Memory_ListItems_FullMethodName = "/smartflow.memory.Memory/ListItems" + Memory_GetItem_FullMethodName = "/smartflow.memory.Memory/GetItem" + Memory_CreateItem_FullMethodName = "/smartflow.memory.Memory/CreateItem" + Memory_UpdateItem_FullMethodName = "/smartflow.memory.Memory/UpdateItem" + Memory_DeleteItem_FullMethodName = "/smartflow.memory.Memory/DeleteItem" + Memory_RestoreItem_FullMethodName = "/smartflow.memory.Memory/RestoreItem" +) + +type MemoryClient interface { + Ping(ctx context.Context, in *StatusResponse, opts ...grpc.CallOption) (*StatusResponse, error) + Retrieve(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + ListItems(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + GetItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + CreateItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + UpdateItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + DeleteItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + RestoreItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) +} + +type memoryClient struct { + cc grpc.ClientConnInterface +} + +func NewMemoryClient(cc grpc.ClientConnInterface) MemoryClient { + return &memoryClient{cc} +} + +func (c *memoryClient) Ping(ctx context.Context, in *StatusResponse, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := c.cc.Invoke(ctx, Memory_Ping_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) Retrieve(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_Retrieve_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) ListItems(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_ListItems_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) GetItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_GetItem_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) CreateItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_CreateItem_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) UpdateItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_UpdateItem_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) DeleteItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_DeleteItem_FullMethodName, in, out, opts...) + return out, err +} + +func (c *memoryClient) RestoreItem(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Memory_RestoreItem_FullMethodName, in, out, opts...) + return out, err +} + +type MemoryServer interface { + Ping(context.Context, *StatusResponse) (*StatusResponse, error) + Retrieve(context.Context, *JSONRequest) (*JSONResponse, error) + ListItems(context.Context, *JSONRequest) (*JSONResponse, error) + GetItem(context.Context, *JSONRequest) (*JSONResponse, error) + CreateItem(context.Context, *JSONRequest) (*JSONResponse, error) + UpdateItem(context.Context, *JSONRequest) (*JSONResponse, error) + DeleteItem(context.Context, *JSONRequest) (*JSONResponse, error) + RestoreItem(context.Context, *JSONRequest) (*JSONResponse, error) +} + +type UnimplementedMemoryServer struct{} + +func (UnimplementedMemoryServer) Ping(context.Context, *StatusResponse) (*StatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedMemoryServer) Retrieve(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Retrieve not implemented") +} +func (UnimplementedMemoryServer) ListItems(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ListItems not implemented") +} +func (UnimplementedMemoryServer) GetItem(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetItem not implemented") +} +func (UnimplementedMemoryServer) CreateItem(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateItem not implemented") +} +func (UnimplementedMemoryServer) UpdateItem(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateItem not implemented") +} +func (UnimplementedMemoryServer) DeleteItem(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method DeleteItem not implemented") +} +func (UnimplementedMemoryServer) RestoreItem(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RestoreItem not implemented") +} + +func RegisterMemoryServer(s grpc.ServiceRegistrar, srv MemoryServer) { + s.RegisterService(&Memory_ServiceDesc, srv) +} + +func _Memory_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusResponse) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MemoryServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Memory_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MemoryServer).Ping(ctx, req.(*StatusResponse)) + } + return interceptor(ctx, in, info, handler) +} + +func _Memory_JSON_Handler(fullMethod string, invoke func(MemoryServer, context.Context, *JSONRequest) (*JSONResponse, error)) grpc.MethodHandler { + return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JSONRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return invoke(srv.(MemoryServer), ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: fullMethod} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return invoke(srv.(MemoryServer), ctx, req.(*JSONRequest)) + } + return interceptor(ctx, in, info, handler) + } +} + +var Memory_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "smartflow.memory.Memory", + HandlerType: (*MemoryServer)(nil), + Methods: []grpc.MethodDesc{ + {MethodName: "Ping", Handler: _Memory_Ping_Handler}, + {MethodName: "Retrieve", Handler: _Memory_JSON_Handler(Memory_Retrieve_FullMethodName, MemoryServer.Retrieve)}, + {MethodName: "ListItems", Handler: _Memory_JSON_Handler(Memory_ListItems_FullMethodName, MemoryServer.ListItems)}, + {MethodName: "GetItem", Handler: _Memory_JSON_Handler(Memory_GetItem_FullMethodName, MemoryServer.GetItem)}, + {MethodName: "CreateItem", Handler: _Memory_JSON_Handler(Memory_CreateItem_FullMethodName, MemoryServer.CreateItem)}, + {MethodName: "UpdateItem", Handler: _Memory_JSON_Handler(Memory_UpdateItem_FullMethodName, MemoryServer.UpdateItem)}, + {MethodName: "DeleteItem", Handler: _Memory_JSON_Handler(Memory_DeleteItem_FullMethodName, MemoryServer.DeleteItem)}, + {MethodName: "RestoreItem", Handler: _Memory_JSON_Handler(Memory_RestoreItem_FullMethodName, MemoryServer.RestoreItem)}, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "services/memory/rpc/memory.proto", +} diff --git a/backend/services/memory/rpc/server.go b/backend/services/memory/rpc/server.go new file mode 100644 index 0000000..1a35fb8 --- /dev/null +++ b/backend/services/memory/rpc/server.go @@ -0,0 +1,60 @@ +package rpc + +import ( + "errors" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/services/memory/rpc/pb" + memorysv "github.com/LoveLosita/smartflow/backend/services/memory/sv" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +const ( + defaultListenOn = "0.0.0.0:9088" + defaultTimeout = 6 * time.Second +) + +type ServerOptions struct { + ListenOn string + Timeout time.Duration + Service *memorysv.Service +} + +// NewServer 创建 memory zrpc 服务端。 +// +// 职责边界: +// 1. 只负责 zrpc server 配置与 gRPC handler 注册; +// 2. 不创建数据库、LLM、RAG、outbox 或 worker,它们由 cmd/memory 管理; +// 3. 返回 listenOn 供进程入口打印启动日志。 +func NewServer(opts ServerOptions) (*zrpc.RpcServer, string, error) { + if opts.Service == nil { + return nil, "", errors.New("memory service dependency not initialized") + } + + listenOn := strings.TrimSpace(opts.ListenOn) + if listenOn == "" { + listenOn = defaultListenOn + } + timeout := opts.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + + server, err := zrpc.NewServer(zrpc.RpcServerConf{ + ServiceConf: service.ServiceConf{ + Name: "memory.rpc", + Mode: service.DevMode, + }, + ListenOn: listenOn, + Timeout: int64(timeout / time.Millisecond), + }, func(grpcServer *grpc.Server) { + pb.RegisterMemoryServer(grpcServer, NewHandler(opts.Service)) + }) + if err != nil { + return nil, "", err + } + return server, listenOn, nil +} diff --git a/backend/services/memory/sv/service.go b/backend/services/memory/sv/service.go new file mode 100644 index 0000000..640a37e --- /dev/null +++ b/backend/services/memory/sv/service.go @@ -0,0 +1,297 @@ +package sv + +import ( + "context" + "errors" + "log" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + memorymodule "github.com/LoveLosita/smartflow/backend/memory" + memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + coremodel "github.com/LoveLosita/smartflow/backend/model" + eventsvc "github.com/LoveLosita/smartflow/backend/service/events" + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" +) + +// Service 是 memory 独立进程的服务门面。 +// +// 职责边界: +// 1. 负责持有现有 memory.Module,复用 repo / service / worker / orchestrator 核心逻辑; +// 2. 负责把 memory.extract.requested 注册到 memory 服务自己的 outbox consumer; +// 3. 负责承接 CP2 后 gateway memory 管理流量,但不负责 HTTP 参数绑定、鉴权或幂等。 +type Service struct { + module *memorymodule.Module + eventBus *outboxinfra.EventBus +} + +// Options 描述 memory 服务启动所需依赖。 +type Options struct { + Module *memorymodule.Module + OutboxRepo *outboxinfra.Repository + KafkaConfig kafkabus.Config +} + +// NewService 组装 memory 独立服务。 +// +// 步骤化说明: +// 1. 先校验 Module,保证 memory repo / worker / orchestrator 已由启动层完成装配; +// 2. 再登记 memory.extract.requested -> memory 的服务归属,避免 outbox 路由回落到 agent; +// 3. 最后在 Kafka 开启时创建 memory 服务自己的 EventBus 并注册消费 handler。 +func NewService(opts Options) (*Service, error) { + if opts.Module == nil { + return nil, errors.New("memory module dependency not initialized") + } + + if err := outboxinfra.RegisterEventService(eventsvc.EventTypeMemoryExtractRequested, outboxinfra.ServiceMemory); err != nil { + return nil, err + } + + var eventBus *outboxinfra.EventBus + if opts.OutboxRepo != nil { + bus, err := outboxinfra.NewEventBus(opts.OutboxRepo, opts.KafkaConfig) + if err != nil { + return nil, err + } + eventBus = bus + if eventBus != nil { + if err := eventsvc.RegisterMemoryExtractRequestedHandler(eventBus, opts.OutboxRepo, opts.Module); err != nil { + return nil, err + } + } + } + + return &Service{ + module: opts.Module, + eventBus: eventBus, + }, nil +} + +// Ping 用于 zrpc 启动期健康检查。 +// +// 返回语义: +// 1. nil 表示 memory Module 已完成装配; +// 2. error 表示服务依赖缺失,调用方应认为 memory 服务不可用。 +func (s *Service) Ping(context.Context) error { + if s == nil || s.module == nil { + return errors.New("memory service dependency not initialized") + } + return nil +} + +// Retrieve 读取 agent 主链路后续可注入 prompt 的候选记忆。 +// +// 职责边界: +// 1. 只把跨进程契约转成既有 memory.Module 的读取请求,避免重写召回、门控和降级逻辑; +// 2. 不负责 prompt 拼装、Redis 预取缓存和主链路失败降级,这些仍留在 agent 服务侧; +// 3. 返回字段保持与 ItemView 一致,保证 CP3 只改变进程边界,不改变注入内容语义。 +func (s *Service) Retrieve(ctx context.Context, req memorycontracts.RetrieveRequest) ([]memorycontracts.ItemDTO, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + items, err := s.module.Retrieve(ctx, memorymodel.RetrieveRequest{ + Query: req.Query, + UserID: req.UserID, + ConversationID: req.ConversationID, + AssistantID: req.AssistantID, + RunID: req.RunID, + MemoryTypes: append([]string(nil), req.MemoryTypes...), + Limit: req.Limit, + Now: req.Now, + }) + if err != nil { + return nil, err + } + return toItemDTOs(items), nil +} + +// ListItems 查询当前用户的记忆管理列表。 +// +// 职责边界: +// 1. 只把跨进程契约转成现有 memory.Module 请求,复用旧管理逻辑; +// 2. 不在服务门面重做 limit/status/type 等业务规则,避免 CP2 改坏既有语义; +// 3. 返回稳定 ItemView,保持 gateway 切流前后的 JSON 字段一致。 +func (s *Service) ListItems(ctx context.Context, req memorycontracts.ListItemsRequest) ([]memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + items, err := s.module.ListItems(ctx, memorymodel.ListItemsRequest{ + UserID: req.UserID, + ConversationID: req.ConversationID, + Statuses: append([]string(nil), req.Statuses...), + MemoryTypes: append([]string(nil), req.MemoryTypes...), + Limit: req.Limit, + }) + if err != nil { + return nil, err + } + return toItemViews(items), nil +} + +// GetItem 返回当前用户自己的单条记忆详情。 +func (s *Service) GetItem(ctx context.Context, req memorycontracts.GetItemRequest) (*memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + item, err := s.module.GetItem(ctx, coremodel.MemoryGetItemRequest{ + UserID: req.UserID, + MemoryID: req.MemoryID, + }) + return toItemViewPtr(item), err +} + +// CreateItem 手动新增一条用户记忆,并沿用既有审计与向量同步逻辑。 +func (s *Service) CreateItem(ctx context.Context, req memorycontracts.CreateItemRequest) (*memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + item, err := s.module.CreateItem(ctx, coremodel.MemoryCreateItemRequest{ + UserID: req.UserID, + ConversationID: req.ConversationID, + AssistantID: req.AssistantID, + RunID: req.RunID, + MemoryType: req.MemoryType, + Title: req.Title, + Content: req.Content, + Confidence: req.Confidence, + Importance: req.Importance, + SensitivityLevel: req.SensitivityLevel, + IsExplicit: req.IsExplicit, + TTLAt: req.TTLAt, + Reason: req.Reason, + OperatorType: req.OperatorType, + }) + return toItemViewPtr(item), err +} + +// UpdateItem 手动修改一条用户记忆,并沿用既有审计与向量同步逻辑。 +func (s *Service) UpdateItem(ctx context.Context, req memorycontracts.UpdateItemRequest) (*memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + item, err := s.module.UpdateItem(ctx, coremodel.MemoryUpdateItemRequest{ + UserID: req.UserID, + MemoryID: req.MemoryID, + MemoryType: req.MemoryType, + Title: req.Title, + Content: req.Content, + Confidence: req.Confidence, + Importance: req.Importance, + SensitivityLevel: req.SensitivityLevel, + IsExplicit: req.IsExplicit, + TTLAt: req.TTLAt, + ClearTTL: req.ClearTTL, + Reason: req.Reason, + OperatorType: req.OperatorType, + }) + return toItemViewPtr(item), err +} + +// DeleteItem 软删除一条记忆,返回删除后的条目视图。 +func (s *Service) DeleteItem(ctx context.Context, req memorycontracts.DeleteItemRequest) (*memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + item, err := s.module.DeleteItem(ctx, coremodel.MemoryDeleteItemRequest{ + UserID: req.UserID, + MemoryID: req.MemoryID, + Reason: req.Reason, + OperatorType: req.OperatorType, + }) + return toItemViewPtr(item), err +} + +// RestoreItem 恢复一条 deleted/archived 记忆,返回恢复后的条目视图。 +func (s *Service) RestoreItem(ctx context.Context, req memorycontracts.RestoreItemRequest) (*memorycontracts.ItemView, error) { + if err := s.ensureModule(); err != nil { + return nil, err + } + item, err := s.module.RestoreItem(ctx, coremodel.MemoryRestoreItemRequest{ + UserID: req.UserID, + MemoryID: req.MemoryID, + Reason: req.Reason, + OperatorType: req.OperatorType, + }) + return toItemViewPtr(item), err +} + +// StartWorkers 启动 memory 服务拥有的后台生命周期。 +// +// 步骤化说明: +// 1. 先启动 memory outbox relay / consumer,让 memory.extract.requested 可以被转成 memory_jobs; +// 2. 再启动 memory worker 轮询 memory_jobs,执行抽取、审计与向量同步; +// 3. Kafka 关闭时 eventBus 为空,只启动本地 worker,保留无 Kafka 环境下的降级能力。 +func (s *Service) StartWorkers(ctx context.Context) { + if s == nil { + return + } + if s.eventBus != nil { + s.eventBus.Start(ctx) + log.Println("Memory outbox consumer started") + } else { + log.Println("Memory outbox consumer is disabled") + } + if s.module != nil { + s.module.StartWorker(ctx) + } +} + +// Close 关闭 memory 服务持有的外部资源。 +func (s *Service) Close() { + if s == nil || s.eventBus == nil { + return + } + s.eventBus.Close() +} + +func (s *Service) ensureModule() error { + if s == nil || s.module == nil { + return errors.New("memory service dependency not initialized") + } + return nil +} + +func toItemViews(items []memorymodel.ItemDTO) []memorycontracts.ItemView { + if len(items) == 0 { + return nil + } + result := make([]memorycontracts.ItemView, 0, len(items)) + for _, item := range items { + result = append(result, toItemView(item)) + } + return result +} + +func toItemDTOs(items []memorymodel.ItemDTO) []memorycontracts.ItemDTO { + return toItemViews(items) +} + +func toItemViewPtr(item *memorymodel.ItemDTO) *memorycontracts.ItemView { + if item == nil { + return nil + } + view := toItemView(*item) + return &view +} + +func toItemView(item memorymodel.ItemDTO) memorycontracts.ItemView { + return memorycontracts.ItemView{ + ID: item.ID, + UserID: item.UserID, + ConversationID: item.ConversationID, + AssistantID: item.AssistantID, + RunID: item.RunID, + MemoryType: item.MemoryType, + Title: item.Title, + Content: item.Content, + ContentHash: item.ContentHash, + Confidence: item.Confidence, + Importance: item.Importance, + SensitivityLevel: item.SensitivityLevel, + IsExplicit: item.IsExplicit, + Status: item.Status, + TTLAt: item.TTLAt, + CreatedAt: item.CreatedAt, + UpdatedAt: item.UpdatedAt, + } +} diff --git a/backend/shared/contracts/memory/types.go b/backend/shared/contracts/memory/types.go new file mode 100644 index 0000000..3780d67 --- /dev/null +++ b/backend/shared/contracts/memory/types.go @@ -0,0 +1,128 @@ +package memory + +import "time" + +// ListItemsRequest 是 gateway 查询记忆管理列表时传给 memory 服务的契约。 +// +// 职责边界: +// 1. UserID 由 gateway 鉴权后补齐,不信任前端传入; +// 2. ConversationID / Statuses / MemoryTypes / Limit 只表达查询条件,不承载过滤策略; +// 3. 具体默认状态、最大 limit 和越权判断仍由 memory 服务内部处理。 +type ListItemsRequest struct { + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id,omitempty"` + Statuses []string `json:"statuses,omitempty"` + MemoryTypes []string `json:"memory_types,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// RetrieveRequest 描述 agent 主链路注入记忆前的跨进程读取请求。 +// +// 职责边界: +// 1. 只表达“按当前用户输入召回候选记忆”所需的最小参数; +// 2. 不承载 prompt 渲染、缓存预取、降级策略,这些仍由 agent 服务负责; +// 3. Now 允许调用方传入统一时间基准,空值时由 memory 服务复用既有默认逻辑。 +type RetrieveRequest struct { + Query string `json:"query,omitempty"` + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id,omitempty"` + AssistantID string `json:"assistant_id,omitempty"` + RunID string `json:"run_id,omitempty"` + MemoryTypes []string `json:"memory_types,omitempty"` + Limit int `json:"limit,omitempty"` + Now time.Time `json:"now,omitempty"` +} + +// GetItemRequest 描述查看当前用户某条记忆的跨进程请求。 +type GetItemRequest struct { + UserID int `json:"user_id"` + MemoryID int64 `json:"memory_id"` +} + +// CreateItemRequest 描述手动新增记忆的跨进程请求。 +// +// 职责边界: +// 1. UserID / OperatorType 由 gateway 填充,前端只提交业务字段; +// 2. Confidence / Importance 等指针字段保留“未传”和“显式零值”的区别; +// 3. 业务校验、审计、向量同步仍归 memory 服务内部负责。 +type CreateItemRequest struct { + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id,omitempty"` + AssistantID string `json:"assistant_id,omitempty"` + RunID string `json:"run_id,omitempty"` + MemoryType string `json:"memory_type"` + Title string `json:"title"` + Content string `json:"content"` + Confidence *float64 `json:"confidence,omitempty"` + Importance *float64 `json:"importance,omitempty"` + SensitivityLevel *int `json:"sensitivity_level,omitempty"` + IsExplicit *bool `json:"is_explicit,omitempty"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"operator_type,omitempty"` +} + +// UpdateItemRequest 描述手动修改记忆的跨进程请求。 +type UpdateItemRequest struct { + UserID int `json:"user_id"` + MemoryID int64 `json:"memory_id"` + MemoryType *string `json:"memory_type,omitempty"` + Title *string `json:"title,omitempty"` + Content *string `json:"content,omitempty"` + Confidence *float64 `json:"confidence,omitempty"` + Importance *float64 `json:"importance,omitempty"` + SensitivityLevel *int `json:"sensitivity_level,omitempty"` + IsExplicit *bool `json:"is_explicit,omitempty"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + ClearTTL bool `json:"clear_ttl,omitempty"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"operator_type,omitempty"` +} + +// DeleteItemRequest 描述软删除记忆的跨进程请求。 +type DeleteItemRequest struct { + UserID int `json:"user_id"` + MemoryID int64 `json:"memory_id"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"operator_type,omitempty"` +} + +// RestoreItemRequest 描述恢复 deleted/archived 记忆的跨进程请求。 +type RestoreItemRequest struct { + UserID int `json:"user_id"` + MemoryID int64 `json:"memory_id"` + Reason string `json:"reason,omitempty"` + OperatorType string `json:"operator_type,omitempty"` +} + +// ItemView 是 memory 管理接口对 gateway 返回的稳定 JSON 视图。 +// +// 职责边界: +// 1. 只保存前端可见字段,不暴露 GORM 字段或内部向量同步状态; +// 2. JSON 字段名保持原 `/api/v1/memory/items` 语义,避免 CP2 切流影响前端; +// 3. 时间字段继续使用 time.Time 指针,由标准 JSON 编码输出 RFC3339。 +type ItemView struct { + ID int64 `json:"id"` + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id,omitempty"` + AssistantID string `json:"assistant_id,omitempty"` + RunID string `json:"run_id,omitempty"` + MemoryType string `json:"memory_type"` + Title string `json:"title"` + Content string `json:"content"` + ContentHash string `json:"content_hash,omitempty"` + Confidence float64 `json:"confidence"` + Importance float64 `json:"importance"` + SensitivityLevel int `json:"sensitivity_level"` + IsExplicit bool `json:"is_explicit"` + Status string `json:"status"` + TTLAt *time.Time `json:"ttl_at,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` +} + +// ItemDTO 是 agent 读取链路使用的记忆传输视图。 +// +// 迁移期 retrieve 与管理接口共享同一组可传输字段,避免在 contract 层维护两份 +// 形状完全一致的结构;后续若 agent 读取需要隐藏或新增字段,再单独拆出独立 DTO。 +type ItemDTO = ItemView diff --git a/backend/shared/ports/memory.go b/backend/shared/ports/memory.go new file mode 100644 index 0000000..334ae5d --- /dev/null +++ b/backend/shared/ports/memory.go @@ -0,0 +1,33 @@ +package ports + +import ( + "context" + "encoding/json" + + memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory" +) + +// MemoryCommandClient 是 gateway 调用 memory 管理服务的最小能力集合。 +// +// 职责边界: +// 1. 只覆盖当前 `/api/v1/memory/items` HTTP 门面需要的管理能力; +// 2. 不暴露 memory repo、worker、orchestrator、向量同步或 outbox consumer; +// 3. 复杂响应以 JSON 透传,避免 gateway 复制 memory 内部 DTO。 +type MemoryCommandClient interface { + ListItems(ctx context.Context, req memorycontracts.ListItemsRequest) (json.RawMessage, error) + GetItem(ctx context.Context, req memorycontracts.GetItemRequest) (json.RawMessage, error) + CreateItem(ctx context.Context, req memorycontracts.CreateItemRequest) (json.RawMessage, error) + UpdateItem(ctx context.Context, req memorycontracts.UpdateItemRequest) (json.RawMessage, error) + DeleteItem(ctx context.Context, req memorycontracts.DeleteItemRequest) (json.RawMessage, error) + RestoreItem(ctx context.Context, req memorycontracts.RestoreItemRequest) (json.RawMessage, error) +} + +// MemoryReaderClient 是 agent 主链路读取 memory zrpc 的最小端口。 +// +// 职责边界: +// 1. 只覆盖 prompt 注入前的候选记忆召回; +// 2. 不暴露管理写接口,避免 agent 侧误拿管理能力做读取以外的事; +// 3. 调用失败由 agent 预取链路软降级,不在端口层吞错。 +type MemoryReaderClient interface { + Retrieve(ctx context.Context, req memorycontracts.RetrieveRequest) ([]memorycontracts.ItemDTO, error) +}