Version: 0.9.78.dev.260506
This commit is contained in:
29
backend/shared/infra/outbox/migration.go
Normal file
29
backend/shared/infra/outbox/migration.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
runtimemodel "github.com/LoveLosita/smartflow/backend/services/runtime/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// AutoMigrateServiceTable 按服务目录迁移单个服务拥有的 outbox 表。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 只负责创建或补齐服务级 outbox 物理表,不迁移任何业务表;
|
||||
// 2. table 名统一从 service catalog 解析,避免独立服务和 core 进程各写一份默认值;
|
||||
// 3. 失败时返回带 service/table 的错误,方便启动期直接定位配置漂移。
|
||||
func AutoMigrateServiceTable(db *gorm.DB, serviceName string) error {
|
||||
if db == nil {
|
||||
return fmt.Errorf("auto migrate outbox table failed for %s: db is nil", serviceName)
|
||||
}
|
||||
|
||||
cfg, ok := ResolveServiceConfig(serviceName)
|
||||
if !ok {
|
||||
return fmt.Errorf("resolve outbox config failed for service %s", serviceName)
|
||||
}
|
||||
if err := db.Table(cfg.TableName).AutoMigrate(&runtimemodel.AgentOutboxMessage{}); err != nil {
|
||||
return fmt.Errorf("auto migrate outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
103
backend/shared/infra/outbox/repository_publisher.go
Normal file
103
backend/shared/infra/outbox/repository_publisher.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package outbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// RepositoryPublisher 只负责把事件写入服务级 outbox 表。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 负责复用 Repository 的 eventType -> service -> table 路由能力写入 outbox;
|
||||
// 2. 不启动 Kafka relay / consumer,也不注册任何 handler;
|
||||
// 3. 适合独立 RPC 服务进程只发布事件、统一由 worker 进程消费的迁移期场景。
|
||||
type RepositoryPublisher struct {
|
||||
repo *Repository
|
||||
maxRetry int
|
||||
}
|
||||
|
||||
// NewRepositoryPublisher 基于 outbox 仓储创建轻量发布器。
|
||||
func NewRepositoryPublisher(repo *Repository, maxRetry int) *RepositoryPublisher {
|
||||
return &RepositoryPublisher{
|
||||
repo: repo,
|
||||
maxRetry: maxRetry,
|
||||
}
|
||||
}
|
||||
|
||||
// Publish 写入统一事件外壳,保持与 Engine.Publish 相同的 outbox payload 格式。
|
||||
//
|
||||
// 步骤说明:
|
||||
// 1. 先校验事件类型和业务 payload,明显坏入参直接返回错误,避免写入不可消费消息;
|
||||
// 2. 再把业务 payload 序列化成 RawMessage,并包进统一事件外壳,保证 worker 解析口径一致;
|
||||
// 3. 最后交给 Repository 按事件路由落表;路由缺失时返回错误,由业务侧决定是否降级。
|
||||
func (p *RepositoryPublisher) Publish(ctx context.Context, req PublishRequest) error {
|
||||
if p == nil || p.repo == nil {
|
||||
return errors.New("outbox repository publisher is nil")
|
||||
}
|
||||
|
||||
eventType := strings.TrimSpace(req.EventType)
|
||||
if eventType == "" {
|
||||
return errors.New("eventType is empty")
|
||||
}
|
||||
if req.Payload == nil {
|
||||
return errors.New("payload is nil")
|
||||
}
|
||||
|
||||
payloadJSON, err := json.Marshal(req.Payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eventVersion := strings.TrimSpace(req.EventVersion)
|
||||
if eventVersion == "" {
|
||||
eventVersion = DefaultEventVersion
|
||||
}
|
||||
|
||||
eventID := strings.TrimSpace(req.EventID)
|
||||
messageKey := strings.TrimSpace(req.MessageKey)
|
||||
if messageKey == "" {
|
||||
messageKey = eventID
|
||||
}
|
||||
if messageKey == "" {
|
||||
messageKey = eventType
|
||||
}
|
||||
|
||||
aggregateID := strings.TrimSpace(req.AggregateID)
|
||||
if aggregateID == "" {
|
||||
aggregateID = messageKey
|
||||
}
|
||||
|
||||
_, err = p.repo.CreateMessage(ctx, eventType, messageKey, OutboxEventPayload{
|
||||
EventID: eventID,
|
||||
EventType: eventType,
|
||||
EventVersion: eventVersion,
|
||||
AggregateID: aggregateID,
|
||||
Payload: payloadJSON,
|
||||
}, p.maxRetry)
|
||||
return err
|
||||
}
|
||||
|
||||
// PublishWithTx 使用外部事务写入 outbox 消息。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 只把底层 Repository 切到调用方传入的事务句柄,事件外壳和路由逻辑仍复用 Publish;
|
||||
// 2. 不提交或回滚事务,事务生命周期由业务用例控制;
|
||||
// 3. 适合“业务表更新 + outbox 入队”必须原子提交的场景。
|
||||
func (p *RepositoryPublisher) PublishWithTx(ctx context.Context, tx *gorm.DB, req PublishRequest) error {
|
||||
if p == nil || p.repo == nil {
|
||||
return errors.New("outbox repository publisher 未初始化")
|
||||
}
|
||||
if tx == nil {
|
||||
return errors.New("gorm 事务句柄为空")
|
||||
}
|
||||
|
||||
txPublisher := &RepositoryPublisher{
|
||||
repo: p.repo.WithTx(tx),
|
||||
maxRetry: p.maxRetry,
|
||||
}
|
||||
return txPublisher.Publish(ctx, req)
|
||||
}
|
||||
@@ -15,6 +15,8 @@ const (
|
||||
ServiceMemory = "memory"
|
||||
ServiceActiveScheduler = "active-scheduler"
|
||||
ServiceNotification = "notification"
|
||||
ServiceTaskClassForum = "taskclass-forum"
|
||||
ServiceTokenStore = "token-store"
|
||||
)
|
||||
|
||||
// ServiceConfig 描述一个服务级 outbox 的固定归属。
|
||||
@@ -83,6 +85,18 @@ func LoadServiceConfigs() map[string]ServiceConfig {
|
||||
GroupID: "smartflow-notification-outbox-consumer",
|
||||
TableName: "notification_outbox_messages",
|
||||
},
|
||||
ServiceTaskClassForum: {
|
||||
Name: ServiceTaskClassForum,
|
||||
Topic: "smartflow.taskclass-forum.outbox",
|
||||
GroupID: "smartflow-taskclass-forum-outbox-consumer",
|
||||
TableName: "taskclass_forum_outbox_messages",
|
||||
},
|
||||
ServiceTokenStore: {
|
||||
Name: ServiceTokenStore,
|
||||
Topic: "smartflow.token-store.outbox",
|
||||
GroupID: "smartflow-token-store-outbox-consumer",
|
||||
TableName: "token_store_outbox_messages",
|
||||
},
|
||||
}
|
||||
|
||||
for name, entry := range entries {
|
||||
|
||||
@@ -10,6 +10,8 @@ const (
|
||||
ServiceNameMemory = "memory"
|
||||
ServiceNameActiveScheduler = "active-scheduler"
|
||||
ServiceNameNotification = "notification"
|
||||
ServiceNameTaskClassForum = "taskclass-forum"
|
||||
ServiceNameTokenStore = "token-store"
|
||||
)
|
||||
|
||||
// ServiceRoute 描述一个 outbox 服务的终态路由信息。
|
||||
@@ -56,6 +58,18 @@ var builtinServiceRoutes = map[string]ServiceRoute{
|
||||
Topic: "smartflow.notification.outbox",
|
||||
GroupID: "smartflow-notification-outbox-consumer",
|
||||
},
|
||||
ServiceNameTaskClassForum: {
|
||||
ServiceName: ServiceNameTaskClassForum,
|
||||
TableName: "taskclass_forum_outbox_messages",
|
||||
Topic: "smartflow.taskclass-forum.outbox",
|
||||
GroupID: "smartflow-taskclass-forum-outbox-consumer",
|
||||
},
|
||||
ServiceNameTokenStore: {
|
||||
ServiceName: ServiceNameTokenStore,
|
||||
TableName: "token_store_outbox_messages",
|
||||
Topic: "smartflow.token-store.outbox",
|
||||
GroupID: "smartflow-token-store-outbox-consumer",
|
||||
},
|
||||
}
|
||||
|
||||
// DefaultServiceRoutes 返回当前已知服务的默认路由清单。
|
||||
@@ -71,6 +85,8 @@ func DefaultServiceRoutes() []ServiceRoute {
|
||||
builtinServiceRoutes[ServiceNameMemory],
|
||||
builtinServiceRoutes[ServiceNameActiveScheduler],
|
||||
builtinServiceRoutes[ServiceNameNotification],
|
||||
builtinServiceRoutes[ServiceNameTaskClassForum],
|
||||
builtinServiceRoutes[ServiceNameTokenStore],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user