Files
smartmate/backend/service/events/core_outbox_handlers.go
Losita b08ee17893 Version: 0.9.66.dev.260504
后端:
1. 阶段 2 user/auth 服务边界落地,新增 `cmd/userauth` go-zero zrpc 服务、`services/userauth` 核心实现、gateway user API/zrpc client 与 shared contracts/ports,迁移注册、登录、刷新 token、登出、JWT、黑名单和 token 额度治理
2. gateway 与启动装配切流,`cmd/all` 只保留边缘路由、鉴权和轻量组合,通过 userauth zrpc 访问核心用户能力;拆分 MySQL/Redis 初始化与 AutoMigrate 边界,`userauth` 自迁 `users` 和 token 记账幂等表,`all` 不再迁用户表
3. 清退 Gin 单体旧 user/auth DAO、model、service、router、middleware 和 JWT handler,并同步调整 agent/schedule/cache/outbox 相关调用依赖
4. 补齐 refresh token 防并发重放、MySQL 幂等 token 记账、额度 `>=` 拦截和 RPC 错误映射,避免重复记账与内部错误透出

文档:
1. 新增《学习计划论坛与Token商店PRD》
2026-05-04 15:20:47 +08:00

215 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package events
import (
"errors"
"github.com/LoveLosita/smartflow/backend/dao"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/memory"
"github.com/LoveLosita/smartflow/backend/notification"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
"github.com/LoveLosita/smartflow/backend/shared/ports"
)
// RegisterCoreOutboxHandlers wires up the outbox handlers of the core business
// domain in a single call.
//
// Responsibilities and limits:
//  1. It only aggregates registration of the current core handlers, so the
//     different startup entry points (start / worker / all) can share one
//     wiring order.
//  2. It does not construct eventBus, outboxRepo, the DAOs or memoryModule, and
//     it neither starts nor stops the event bus.
//  3. It does not alter what the individual Register* functions do; per the
//     original design notes, payload parsing, idempotent consumption and
//     persistence remain the job of each handler.
//  4. Routes are listed as an explicit "event type -> owning service -> handler"
//     table, so a new event cannot be added by touching only a startup entry
//     point while forgetting the wiring table.
//
// The nil validation runs first; its error is returned as-is and
// registerOutboxHandlerRoutes is not called in that case.
//
// NOTE(review): adjuster is forwarded without a nil check — confirm whether a
// nil adjuster is a supported configuration.
func RegisterCoreOutboxHandlers(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
	adjuster ports.TokenUsageAdjuster,
) error {
	// Reject missing dependencies before building the route table.
	depErr := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule)
	if depErr != nil {
		return depErr
	}

	coreRoutes := coreOutboxHandlerRoutes(
		eventBus,
		outboxRepo,
		repoManager,
		agentRepo,
		cacheRepo,
		memoryModule,
		adjuster,
	)
	return registerOutboxHandlerRoutes(coreRoutes)
}
// RegisterAllOutboxHandlers wires up every outbox handler known at the current
// stage.
//
// Responsibilities and limits:
//  1. It only connects the core, active_scheduler and notification routes in
//     one pass.
//  2. It does not create any dependency and does not start the event bus.
//  3. It is meant to be called by the startup flow so that explicit route
//     registration is completed before the bus is started.
//
// The nil validation runs first; its error is returned as-is and
// registerOutboxHandlerRoutes is not called in that case.
//
// NOTE(review): adjuster is forwarded without a nil check — confirm whether a
// nil adjuster is a supported configuration.
func RegisterAllOutboxHandlers(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
	activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
	notificationService *notification.NotificationService,
	adjuster ports.TokenUsageAdjuster,
) error {
	// Reject missing dependencies before building the route table.
	depErr := validateAllOutboxHandlerDeps(
		eventBus,
		outboxRepo,
		repoManager,
		agentRepo,
		cacheRepo,
		memoryModule,
		activeTriggerWorkflow,
		notificationService,
	)
	if depErr != nil {
		return depErr
	}

	everyRoute := allOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow, notificationService, adjuster)
	return registerOutboxHandlerRoutes(everyRoute)
}
// validateCoreOutboxHandlerDeps checks that every dependency needed by the
// aggregated core outbox handler registration is present.
//
// Responsibilities and limits:
//  1. It performs nil checks only; it does not probe database, Redis or Kafka
//     connectivity, so registration does not take on startup health checking.
//  2. A non-nil error names the first missing dependency (checked in parameter
//     order); nil means all six arguments compared non-nil.
func validateCoreOutboxHandlerDeps(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
) error {
	// Cases are evaluated top to bottom, so the first missing dependency wins.
	switch {
	case eventBus == nil:
		return errors.New("event bus is nil")
	case outboxRepo == nil:
		return errors.New("outbox repository is nil")
	case repoManager == nil:
		return errors.New("repo manager is nil")
	case agentRepo == nil:
		return errors.New("agent repo is nil")
	case cacheRepo == nil:
		return errors.New("cache repo is nil")
	case memoryModule == nil:
		return errors.New("memory module is nil")
	}
	return nil
}
// validateAllOutboxHandlerDeps runs the core dependency validation and then
// additionally requires the active_scheduler and notification dependencies.
//
// The core check runs first and its error is returned unchanged. Otherwise the
// first of activeTriggerWorkflow / notificationService that is nil is reported.
// Nil is returned when every check passes.
func validateAllOutboxHandlerDeps(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
	activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
	notificationService *notification.NotificationService,
) error {
	coreErr := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule)
	if coreErr != nil {
		return coreErr
	}

	switch {
	case activeTriggerWorkflow == nil:
		return errors.New("active schedule triggered processor is nil")
	case notificationService == nil:
		return errors.New("notification service is nil")
	}
	return nil
}
// coreOutboxHandlerRoutes returns the route table for the core stage only.
//
// Each entry pairs an event type with its owning service and a closure that
// performs the matching Register* call using the dependencies passed in here.
// Building the table does not invoke any of the closures.
func coreOutboxHandlerRoutes(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
	adjuster ports.TokenUsageAdjuster,
) []outboxHandlerRoute {
	// Registration closures, declared up front so the table below stays compact.
	chatHistoryPersist := func() error {
		return RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager, adjuster)
	}
	taskUrgencyPromote := func() error {
		return RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager)
	}
	chatTokenUsageAdjust := func() error {
		return RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager, adjuster)
	}
	agentStateSnapshot := func() error {
		return RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager)
	}
	agentTimelinePersist := func() error {
		return RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo)
	}
	memoryExtract := func() error {
		return RegisterMemoryExtractRequestedHandler(eventBus, outboxRepo, memoryModule)
	}

	// The order of the entries is the order in which the table is handed on.
	return []outboxHandlerRoute{
		{EventType: EventTypeChatHistoryPersistRequested, Service: outboxHandlerServiceAgent, Register: chatHistoryPersist},
		{EventType: EventTypeTaskUrgencyPromoteRequested, Service: outboxHandlerServiceTask, Register: taskUrgencyPromote},
		{EventType: EventTypeChatTokenUsageAdjustRequested, Service: outboxHandlerServiceAgent, Register: chatTokenUsageAdjust},
		{EventType: EventTypeAgentStateSnapshotPersist, Service: outboxHandlerServiceAgent, Register: agentStateSnapshot},
		{EventType: EventTypeAgentTimelinePersistRequested, Service: outboxHandlerServiceAgent, Register: agentTimelinePersist},
		{EventType: EventTypeMemoryExtractRequested, Service: outboxHandlerServiceMemory, Register: memoryExtract},
	}
}
// allOutboxHandlerRoutes expands every outbox route of the current stage into
// one table for the startup entry point to wire in a single pass.
//
// The result is the core route table followed by the active-schedule-triggered
// route and then the Feishu notification route.
func allOutboxHandlerRoutes(
	eventBus OutboxBus,
	outboxRepo *outboxinfra.Repository,
	repoManager *dao.RepoManager,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	memoryModule *memory.Module,
	activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
	notificationService *notification.NotificationService,
	adjuster ports.TokenUsageAdjuster,
) []outboxHandlerRoute {
	coreRoutes := coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, adjuster)

	activeScheduleRoute := outboxHandlerRoute{
		EventType: sharedevents.ActiveScheduleTriggeredEventType,
		Service:   outboxHandlerServiceActiveScheduler,
		Register: func() error {
			return RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, activeTriggerWorkflow)
		},
	}
	feishuNotificationRoute := outboxHandlerRoute{
		EventType: sharedevents.NotificationFeishuRequestedEventType,
		Service:   outboxHandlerServiceNotification,
		Register: func() error {
			return RegisterFeishuNotificationHandler(eventBus, outboxRepo, notificationService)
		},
	}

	// Non-core routes go after the core ones, in this fixed order.
	return append(coreRoutes, activeScheduleRoute, feishuNotificationRoute)
}