后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
298 lines
9.8 KiB
Go
298 lines
9.8 KiB
Go
package sv
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"log"
|
||
|
||
memorymodule "github.com/LoveLosita/smartflow/backend/services/memory"
|
||
memorymodel "github.com/LoveLosita/smartflow/backend/services/memory/model"
|
||
eventsvc "github.com/LoveLosita/smartflow/backend/services/runtime/eventsvc"
|
||
coremodel "github.com/LoveLosita/smartflow/backend/services/runtime/model"
|
||
memorycontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/memory"
|
||
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
|
||
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
|
||
)
|
||
|
||
// Service 是 memory 独立进程的服务门面。
|
||
//
|
||
// 职责边界:
|
||
// 1. 负责持有现有 memory.Module,复用 repo / service / worker / orchestrator 核心逻辑;
|
||
// 2. 负责把 memory.extract.requested 注册到 memory 服务自己的 outbox consumer;
|
||
// 3. 负责承接 CP2 后 gateway memory 管理流量,但不负责 HTTP 参数绑定、鉴权或幂等。
|
||
type Service struct {
|
||
module *memorymodule.Module
|
||
eventBus *outboxinfra.EventBus
|
||
}
|
||
|
||
// Options 描述 memory 服务启动所需依赖。
|
||
type Options struct {
|
||
Module *memorymodule.Module
|
||
OutboxRepo *outboxinfra.Repository
|
||
KafkaConfig kafkabus.Config
|
||
}
|
||
|
||
// NewService 组装 memory 独立服务。
|
||
//
|
||
// 步骤化说明:
|
||
// 1. 先校验 Module,保证 memory repo / worker / orchestrator 已由启动层完成装配;
|
||
// 2. 再登记 memory.extract.requested -> memory 的服务归属,避免 outbox 路由回落到 agent;
|
||
// 3. 最后在 Kafka 开启时创建 memory 服务自己的 EventBus 并注册消费 handler。
|
||
func NewService(opts Options) (*Service, error) {
|
||
if opts.Module == nil {
|
||
return nil, errors.New("memory module dependency not initialized")
|
||
}
|
||
|
||
if err := outboxinfra.RegisterEventService(eventsvc.EventTypeMemoryExtractRequested, outboxinfra.ServiceMemory); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var eventBus *outboxinfra.EventBus
|
||
if opts.OutboxRepo != nil {
|
||
bus, err := outboxinfra.NewEventBus(opts.OutboxRepo, opts.KafkaConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
eventBus = bus
|
||
if eventBus != nil {
|
||
if err := eventsvc.RegisterMemoryExtractRequestedHandler(eventBus, opts.OutboxRepo, opts.Module); err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
}
|
||
|
||
return &Service{
|
||
module: opts.Module,
|
||
eventBus: eventBus,
|
||
}, nil
|
||
}
|
||
|
||
// Ping 用于 zrpc 启动期健康检查。
|
||
//
|
||
// 返回语义:
|
||
// 1. nil 表示 memory Module 已完成装配;
|
||
// 2. error 表示服务依赖缺失,调用方应认为 memory 服务不可用。
|
||
func (s *Service) Ping(context.Context) error {
|
||
if s == nil || s.module == nil {
|
||
return errors.New("memory service dependency not initialized")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Retrieve 读取 agent 主链路后续可注入 prompt 的候选记忆。
|
||
//
|
||
// 职责边界:
|
||
// 1. 只把跨进程契约转成既有 memory.Module 的读取请求,避免重写召回、门控和降级逻辑;
|
||
// 2. 不负责 prompt 拼装、Redis 预取缓存和主链路失败降级,这些仍留在 agent 服务侧;
|
||
// 3. 返回字段保持与 ItemView 一致,保证 CP3 只改变进程边界,不改变注入内容语义。
|
||
func (s *Service) Retrieve(ctx context.Context, req memorycontracts.RetrieveRequest) ([]memorycontracts.ItemDTO, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
items, err := s.module.Retrieve(ctx, memorymodel.RetrieveRequest{
|
||
Query: req.Query,
|
||
UserID: req.UserID,
|
||
ConversationID: req.ConversationID,
|
||
AssistantID: req.AssistantID,
|
||
RunID: req.RunID,
|
||
MemoryTypes: append([]string(nil), req.MemoryTypes...),
|
||
Limit: req.Limit,
|
||
Now: req.Now,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return toItemDTOs(items), nil
|
||
}
|
||
|
||
// ListItems 查询当前用户的记忆管理列表。
|
||
//
|
||
// 职责边界:
|
||
// 1. 只把跨进程契约转成现有 memory.Module 请求,复用旧管理逻辑;
|
||
// 2. 不在服务门面重做 limit/status/type 等业务规则,避免 CP2 改坏既有语义;
|
||
// 3. 返回稳定 ItemView,保持 gateway 切流前后的 JSON 字段一致。
|
||
func (s *Service) ListItems(ctx context.Context, req memorycontracts.ListItemsRequest) ([]memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
items, err := s.module.ListItems(ctx, memorymodel.ListItemsRequest{
|
||
UserID: req.UserID,
|
||
ConversationID: req.ConversationID,
|
||
Statuses: append([]string(nil), req.Statuses...),
|
||
MemoryTypes: append([]string(nil), req.MemoryTypes...),
|
||
Limit: req.Limit,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return toItemViews(items), nil
|
||
}
|
||
|
||
// GetItem 返回当前用户自己的单条记忆详情。
|
||
func (s *Service) GetItem(ctx context.Context, req memorycontracts.GetItemRequest) (*memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
item, err := s.module.GetItem(ctx, coremodel.MemoryGetItemRequest{
|
||
UserID: req.UserID,
|
||
MemoryID: req.MemoryID,
|
||
})
|
||
return toItemViewPtr(item), err
|
||
}
|
||
|
||
// CreateItem 手动新增一条用户记忆,并沿用既有审计与向量同步逻辑。
|
||
func (s *Service) CreateItem(ctx context.Context, req memorycontracts.CreateItemRequest) (*memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
item, err := s.module.CreateItem(ctx, coremodel.MemoryCreateItemRequest{
|
||
UserID: req.UserID,
|
||
ConversationID: req.ConversationID,
|
||
AssistantID: req.AssistantID,
|
||
RunID: req.RunID,
|
||
MemoryType: req.MemoryType,
|
||
Title: req.Title,
|
||
Content: req.Content,
|
||
Confidence: req.Confidence,
|
||
Importance: req.Importance,
|
||
SensitivityLevel: req.SensitivityLevel,
|
||
IsExplicit: req.IsExplicit,
|
||
TTLAt: req.TTLAt,
|
||
Reason: req.Reason,
|
||
OperatorType: req.OperatorType,
|
||
})
|
||
return toItemViewPtr(item), err
|
||
}
|
||
|
||
// UpdateItem 手动修改一条用户记忆,并沿用既有审计与向量同步逻辑。
|
||
func (s *Service) UpdateItem(ctx context.Context, req memorycontracts.UpdateItemRequest) (*memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
item, err := s.module.UpdateItem(ctx, coremodel.MemoryUpdateItemRequest{
|
||
UserID: req.UserID,
|
||
MemoryID: req.MemoryID,
|
||
MemoryType: req.MemoryType,
|
||
Title: req.Title,
|
||
Content: req.Content,
|
||
Confidence: req.Confidence,
|
||
Importance: req.Importance,
|
||
SensitivityLevel: req.SensitivityLevel,
|
||
IsExplicit: req.IsExplicit,
|
||
TTLAt: req.TTLAt,
|
||
ClearTTL: req.ClearTTL,
|
||
Reason: req.Reason,
|
||
OperatorType: req.OperatorType,
|
||
})
|
||
return toItemViewPtr(item), err
|
||
}
|
||
|
||
// DeleteItem 软删除一条记忆,返回删除后的条目视图。
|
||
func (s *Service) DeleteItem(ctx context.Context, req memorycontracts.DeleteItemRequest) (*memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
item, err := s.module.DeleteItem(ctx, coremodel.MemoryDeleteItemRequest{
|
||
UserID: req.UserID,
|
||
MemoryID: req.MemoryID,
|
||
Reason: req.Reason,
|
||
OperatorType: req.OperatorType,
|
||
})
|
||
return toItemViewPtr(item), err
|
||
}
|
||
|
||
// RestoreItem 恢复一条 deleted/archived 记忆,返回恢复后的条目视图。
|
||
func (s *Service) RestoreItem(ctx context.Context, req memorycontracts.RestoreItemRequest) (*memorycontracts.ItemView, error) {
|
||
if err := s.ensureModule(); err != nil {
|
||
return nil, err
|
||
}
|
||
item, err := s.module.RestoreItem(ctx, coremodel.MemoryRestoreItemRequest{
|
||
UserID: req.UserID,
|
||
MemoryID: req.MemoryID,
|
||
Reason: req.Reason,
|
||
OperatorType: req.OperatorType,
|
||
})
|
||
return toItemViewPtr(item), err
|
||
}
|
||
|
||
// StartWorkers 启动 memory 服务拥有的后台生命周期。
|
||
//
|
||
// 步骤化说明:
|
||
// 1. 先启动 memory outbox relay / consumer,让 memory.extract.requested 可以被转成 memory_jobs;
|
||
// 2. 再启动 memory worker 轮询 memory_jobs,执行抽取、审计与向量同步;
|
||
// 3. Kafka 关闭时 eventBus 为空,只启动本地 worker,保留无 Kafka 环境下的降级能力。
|
||
func (s *Service) StartWorkers(ctx context.Context) {
|
||
if s == nil {
|
||
return
|
||
}
|
||
if s.eventBus != nil {
|
||
s.eventBus.Start(ctx)
|
||
log.Println("Memory outbox consumer started")
|
||
} else {
|
||
log.Println("Memory outbox consumer is disabled")
|
||
}
|
||
if s.module != nil {
|
||
s.module.StartWorker(ctx)
|
||
}
|
||
}
|
||
|
||
// Close 关闭 memory 服务持有的外部资源。
|
||
func (s *Service) Close() {
|
||
if s == nil || s.eventBus == nil {
|
||
return
|
||
}
|
||
s.eventBus.Close()
|
||
}
|
||
|
||
func (s *Service) ensureModule() error {
|
||
if s == nil || s.module == nil {
|
||
return errors.New("memory service dependency not initialized")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func toItemViews(items []memorymodel.ItemDTO) []memorycontracts.ItemView {
|
||
if len(items) == 0 {
|
||
return nil
|
||
}
|
||
result := make([]memorycontracts.ItemView, 0, len(items))
|
||
for _, item := range items {
|
||
result = append(result, toItemView(item))
|
||
}
|
||
return result
|
||
}
|
||
|
||
func toItemDTOs(items []memorymodel.ItemDTO) []memorycontracts.ItemDTO {
|
||
return toItemViews(items)
|
||
}
|
||
|
||
func toItemViewPtr(item *memorymodel.ItemDTO) *memorycontracts.ItemView {
|
||
if item == nil {
|
||
return nil
|
||
}
|
||
view := toItemView(*item)
|
||
return &view
|
||
}
|
||
|
||
func toItemView(item memorymodel.ItemDTO) memorycontracts.ItemView {
|
||
return memorycontracts.ItemView{
|
||
ID: item.ID,
|
||
UserID: item.UserID,
|
||
ConversationID: item.ConversationID,
|
||
AssistantID: item.AssistantID,
|
||
RunID: item.RunID,
|
||
MemoryType: item.MemoryType,
|
||
Title: item.Title,
|
||
Content: item.Content,
|
||
ContentHash: item.ContentHash,
|
||
Confidence: item.Confidence,
|
||
Importance: item.Importance,
|
||
SensitivityLevel: item.SensitivityLevel,
|
||
IsExplicit: item.IsExplicit,
|
||
Status: item.Status,
|
||
TTLAt: item.TTLAt,
|
||
CreatedAt: item.CreatedAt,
|
||
UpdatedAt: item.UpdatedAt,
|
||
}
|
||
}
|