Files
Losita 3b6fca44a6 Version: 0.9.77.dev.260505
后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
2026-05-05 23:25:07 +08:00

215 lines
6.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eventsvc
import (
"context"
"errors"
"fmt"
"sort"
"strings"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
)
// OutboxBus is the outbox facade shared by the startup side and the business side.
//
// Responsibility boundaries:
//  1. Towards callers it exposes exactly four capabilities:
//     Publish / RegisterEventHandler / Start / Close.
//  2. Internally an implementation may route to several underlying engines,
//     one per service.
//  3. It carries no business logic; it only decides event ownership,
//     topic/group and which engine to use.
type OutboxBus interface {
	// EventPublisher contributes the publishing side of the facade.
	outboxinfra.EventPublisher

	// RegisterEventHandler associates handler with eventType and reports
	// an error when the registration is rejected.
	RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error

	// Start starts the bus using ctx.
	Start(ctx context.Context)

	// Close shuts the bus down.
	Close()
}
// routedOutboxBus is an OutboxBus that delegates every call to one of several
// per-service buses.
type routedOutboxBus struct {
	// buses holds the per-service buses, keyed by service name.
	buses map[string]OutboxBus
}
// NewRoutedOutboxBus assembles several service-level outbox buses into a
// single facade.
//
//  1. No underlying engine is created here; the function only accepts
//     service buses that the caller has already built.
//  2. Event publishing and handler registration are routed to the owning
//     service by event type.
//  3. A service bus that is missing at routing time produces an error instead
//     of silently falling back to a shared topic.
//
// Service names are trimmed of surrounding whitespace. Entries whose trimmed
// name is empty or whose bus is nil are dropped. When no usable entry remains
// the function returns nil.
func NewRoutedOutboxBus(buses map[string]OutboxBus) OutboxBus {
	usable := make(map[string]OutboxBus, len(buses))
	for rawName, serviceBus := range buses {
		key := strings.TrimSpace(rawName)
		if key != "" && serviceBus != nil {
			usable[key] = serviceBus
		}
	}

	if len(usable) > 0 {
		return &routedOutboxBus{buses: usable}
	}
	// Return an untyped nil so that a caller's `bus == nil` check holds.
	return nil
}
// NewServiceOutboxBus creates the underlying outbox engine for one service,
// using that service's own topic / group.
//
//  1. Topic and group come from the route resolved for the service name;
//     callers no longer pass a shared topic explicitly.
//  2. A nil bus with a nil error is returned when NewEventBus yields nil. The
//     original author documents this as the "Kafka disabled" case, in which
//     the caller may continue on a synchronous fallback path.
//  3. No handler is registered here; registration stays with the startup side.
//
// It returns an error when repo is nil, when serviceName is blank after
// trimming, or when NewEventBus fails.
func NewServiceOutboxBus(repo *outboxinfra.Repository, baseCfg kafkabus.Config, serviceName string) (OutboxBus, error) {
	if repo == nil {
		return nil, errors.New("outbox repository is nil")
	}
	name := strings.TrimSpace(serviceName)
	if name == "" {
		return nil, errors.New("serviceName is empty")
	}

	// NOTE(review): the second result of ResolveServiceRoute is discarded, so
	// whatever route value comes back is used as-is — confirm this is intended
	// for service names that have no registered route.
	route, _ := outboxinfra.ResolveServiceRoute(name)

	// Start from the caller's base config and override the routing fields.
	cfg := baseCfg
	cfg.Topic = strings.TrimSpace(route.Topic)
	cfg.GroupID = strings.TrimSpace(route.GroupID)
	if resolved := strings.TrimSpace(route.ServiceName); resolved != "" {
		cfg.ServiceName = resolved
	} else {
		cfg.ServiceName = name
	}

	engine, err := outboxinfra.NewEventBus(repo.WithRoute(route), cfg)
	switch {
	case err != nil:
		return nil, err
	case engine == nil:
		// Hand back an untyped nil rather than wrapping a nil engine in the
		// interface, so the caller's `bus == nil` check keeps working.
		return nil, nil
	}
	return engine, nil
}
// Publish forwards req to the bus of the service that owns req.EventType.
// It returns the routing error when no bus can be resolved; otherwise it
// returns whatever the selected bus's Publish returns.
func (b *routedOutboxBus) Publish(ctx context.Context, req outboxinfra.PublishRequest) error {
	target, resolveErr := b.resolveBusByEventType(req.EventType)
	if resolveErr != nil {
		return resolveErr
	}
	return target.Publish(ctx, req)
}
// RegisterEventHandler registers handler for eventType on the bus of the
// service that owns that event type.
//
// A nil handler is rejected before any routing takes place. A routing failure
// is returned unchanged; otherwise the result of the selected bus's
// RegisterEventHandler is returned.
func (b *routedOutboxBus) RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error {
	if handler == nil {
		return errors.New("handler is nil")
	}

	target, resolveErr := b.resolveBusByEventType(eventType)
	if resolveErr != nil {
		return resolveErr
	}
	return target.RegisterEventHandler(eventType, handler)
}
// Start calls Start on every registered service bus, in the order produced by
// orderedOutboxServiceNames. It is a no-op on a nil receiver.
func (b *routedOutboxBus) Start(ctx context.Context) {
	if b == nil {
		return
	}
	names := orderedOutboxServiceNames(b.buses)
	for i := range names {
		b.buses[names[i]].Start(ctx)
	}
}
// Close calls Close on every registered service bus, in the order produced by
// orderedOutboxServiceNames (the same order Start uses, not the reverse).
// It is a no-op on a nil receiver.
func (b *routedOutboxBus) Close() {
	if b == nil {
		return
	}
	names := orderedOutboxServiceNames(b.buses)
	for i := range names {
		b.buses[names[i]].Close()
	}
}
// resolveBusByEventType picks the service bus responsible for eventType.
//
// It fails when the receiver is nil, when eventType is blank after trimming,
// when no owning service is registered for the event type, or when the owning
// service has no (non-nil) bus in this facade.
func (b *routedOutboxBus) resolveBusByEventType(eventType string) (OutboxBus, error) {
	if b == nil {
		return nil, errors.New("outbox bus is not initialized")
	}

	trimmedType := strings.TrimSpace(eventType)
	if trimmedType == "" {
		return nil, errors.New("eventType is empty")
	}

	owner, registered := outboxinfra.ResolveEventService(trimmedType)
	if !registered {
		return nil, fmt.Errorf("outbox route not registered: eventType=%s", trimmedType)
	}

	// A missing key yields the nil zero value, so one nil check covers both
	// "not present" and "present but nil".
	if target := b.buses[strings.TrimSpace(owner)]; target != nil {
		return target, nil
	}
	return nil, fmt.Errorf("service outbox bus is missing: service=%s eventType=%s", owner, trimmedType)
}
// orderedOutboxServiceNames returns the keys of buses in a deterministic order:
// first the names listed by OutboxServiceNames (in that order, restricted to
// those present in buses), then all remaining keys sorted lexicographically.
func orderedOutboxServiceNames(buses map[string]OutboxBus) []string {
	result := make([]string, 0, len(buses))
	placed := make(map[string]struct{}, len(buses))

	// Well-known services first, keeping their declared order.
	for _, name := range OutboxServiceNames() {
		if _, present := buses[name]; !present {
			continue
		}
		result = append(result, name)
		placed[name] = struct{}{}
	}

	// Everything else follows, sorted because map iteration order is random.
	var rest []string
	for name := range buses {
		if _, done := placed[name]; !done {
			rest = append(rest, name)
		}
	}
	sort.Strings(rest)

	return append(result, rest...)
}
// OutboxServiceNames returns the names of the service-level outboxes enabled
// in the current phase: the agent service followed by the memory service.
// A fresh slice is returned on every call.
func OutboxServiceNames() []string {
	names := make([]string, 0, 2)
	names = append(names,
		string(outboxHandlerServiceAgent),
		string(outboxHandlerServiceMemory),
	)
	return names
}
// ResolveOutboxTopicForService maps a service name to its dedicated Kafka topic.
//
//  1. A non-blank topic on the resolved service route wins.
//  2. Otherwise the existing naming scheme smartflow.<service>.outbox is
//     applied to the trimmed service name.
//  3. A blank service name yields kafkabus.DefaultTopic; this is a last-resort
//     fallback, not the main path.
//
// Callers no longer pass a shared topic, so entry points stop depending on the
// old structure.
func ResolveOutboxTopicForService(serviceName string) string {
	route, _ := outboxinfra.ResolveServiceRoute(serviceName)
	topic := strings.TrimSpace(route.Topic)
	if topic != "" {
		return topic
	}

	switch name := strings.TrimSpace(serviceName); name {
	case "":
		return kafkabus.DefaultTopic
	default:
		return "smartflow." + name + ".outbox"
	}
}
// ResolveOutboxGroupForService maps a service name to its dedicated Kafka
// consumer group.
//
// A non-blank GroupID on the resolved service route wins. Otherwise the group
// is smartflow-<service>-outbox-consumer built from the trimmed service name,
// or kafkabus.DefaultGroup when that name is blank.
func ResolveOutboxGroupForService(serviceName string) string {
	route, _ := outboxinfra.ResolveServiceRoute(serviceName)
	group := strings.TrimSpace(route.GroupID)
	if group != "" {
		return group
	}

	name := strings.TrimSpace(serviceName)
	if name != "" {
		return "smartflow-" + name + "-outbox-consumer"
	}
	return kafkabus.DefaultGroup
}
// ResolveOutboxTopicForEvent computes the Kafka topic for an event type from
// the service that owns it.
//
// It returns an error when no route is registered for eventType. When the
// route carries a non-blank topic that topic is returned; otherwise the topic
// is derived from the route's service name via ResolveOutboxTopicForService.
//
// NOTE(review): eventType is handed to ResolveEventRoute untrimmed, while the
// error message trims it — presumably the resolver trims internally; confirm.
func ResolveOutboxTopicForEvent(eventType string) (string, error) {
	route, found := outboxinfra.ResolveEventRoute(eventType)
	if !found {
		return "", fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType))
	}

	topic := strings.TrimSpace(route.Topic)
	if topic == "" {
		topic = ResolveOutboxTopicForService(route.ServiceName)
	}
	return topic, nil
}