Version: 0.9.77.dev.260505

后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
This commit is contained in:
Losita
2026-05-05 23:25:07 +08:00
parent 2a96f4c6f9
commit 3b6fca44a6
226 changed files with 731 additions and 3497 deletions

View File

@@ -0,0 +1,85 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"gorm.io/gorm"
)
// ActiveScheduleTriggeredProcessor 描述 active_schedule.triggered worker 真正执行业务所需的最小能力。
//
// 职责边界:
// 1. ProcessTriggeredInTx 负责事务内的 trigger -> preview -> notification 编排;
// 2. MarkTriggerFailedBestEffort 负责事务外的失败回写,避免 outbox retry 前完全没有业务态可查;
// 3. 接口本身不限定具体实现,便于迁移期由 active_scheduler 模块独立演进。
type ActiveScheduleTriggeredProcessor interface {
ProcessTriggeredInTx(ctx context.Context, tx *gorm.DB, payload sharedevents.ActiveScheduleTriggeredPayload) error
MarkTriggerFailedBestEffort(ctx context.Context, triggerID string, err error)
}
// RegisterActiveScheduleTriggeredHandler 注册 active_schedule.triggered outbox handler。
//
// 步骤化说明:
// 1. 先做 envelope -> contract DTO 解析与版本校验,明显坏消息直接标记 dead
// 2. 再通过 ConsumeAndMarkConsumed 把“业务落库 + consumed 推进”收敛在同一事务里;
// 3. 若事务返回 error则 best-effort 回写 trigger failed并把错误交给 outbox 做 retry
// 4. 这里不直接 import active_scheduler 的具体实现,避免 service/events 和业务编排层互相反向耦合。
func RegisterActiveScheduleTriggeredHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
processor ActiveScheduleTriggeredProcessor,
) error {
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if processor == nil {
return errors.New("active schedule triggered processor is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, sharedevents.ActiveScheduleTriggeredEventType)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
if !isAllowedTriggeredEventVersion(envelope.EventVersion) {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("active_schedule.triggered 版本不受支持: %s", envelope.EventVersion))
return nil
}
var payload sharedevents.ActiveScheduleTriggeredPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析 active_schedule.triggered 载荷失败: "+unmarshalErr.Error())
return nil
}
if validateErr := payload.Validate(); validateErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "active_schedule.triggered 载荷非法: "+validateErr.Error())
return nil
}
err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
return processor.ProcessTriggeredInTx(ctx, tx, payload)
})
if err != nil {
processor.MarkTriggerFailedBestEffort(ctx, payload.TriggerID, err)
return err
}
return nil
}
return bus.RegisterEventHandler(sharedevents.ActiveScheduleTriggeredEventType, handler)
}
func isAllowedTriggeredEventVersion(version string) bool {
version = strings.TrimSpace(version)
return version == "" || version == sharedevents.ActiveScheduleTriggeredEventVersion
}

View File

@@ -0,0 +1,130 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"log"
agentmodel "github.com/LoveLosita/smartflow/backend/services/agent/model"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
const (
// EventTypeAgentStateSnapshotPersist 是"agent 状态快照持久化"的业务事件类型。
EventTypeAgentStateSnapshotPersist = "agent.state.snapshot.persist"
)
// AgentStateSnapshotPayload 是 outbox 事件的业务载荷。
type AgentStateSnapshotPayload struct {
ConversationID string `json:"conversation_id"`
UserID int `json:"user_id"`
Phase string `json:"phase"`
SnapshotJSON string `json:"snapshot_json"`
}
// RegisterAgentStateSnapshotHandler 注册"agent 状态快照持久化"消费者处理器。
//
// 职责边界:
// 1. 只负责快照写入 agent_state_snapshot_records 表;
// 2. 使用 upsert 语义,同一 conversation_id 只保留最新快照;
// 3. 通过 outbox 通用消费事务保证"业务写入 + consumed 推进"原子一致。
func RegisterAgentStateSnapshotHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentStateSnapshotPersist)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload AgentStateSnapshotPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析快照载荷失败: "+unmarshalErr.Error())
return nil
}
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
record := model.AgentStateSnapshotRecord{
ConversationID: payload.ConversationID,
UserID: payload.UserID,
Phase: payload.Phase,
SnapshotJSON: payload.SnapshotJSON,
}
return tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "conversation_id"}},
DoUpdates: clause.AssignmentColumns([]string{"user_id", "phase", "snapshot_json", "updated_at"}),
}).Create(&record).Error
})
}
return bus.RegisterEventHandler(EventTypeAgentStateSnapshotPersist, handler)
}
// PublishAgentStateSnapshot 发布"agent 状态快照持久化"事件到 outbox。
//
// 设计说明:
// 1. 将快照 JSON 序列化后通过 outbox 异步写入 MySQL
// 2. publisher 为 nil 时静默降级Kafka 未启用场景);
// 3. 发布失败只记日志,不中断主流程。
func PublishAgentStateSnapshot(
ctx context.Context,
publisher outboxinfra.EventPublisher,
snapshot *agentmodel.AgentStateSnapshot,
conversationID string,
userID int,
) {
if publisher == nil {
return
}
if snapshot == nil {
return
}
snapshotJSON, err := json.Marshal(snapshot)
if err != nil {
log.Printf("[WARN] 序列化 agent 状态快照失败 chat=%s: %v", conversationID, err)
return
}
phase := ""
if snapshot.RuntimeState != nil {
cs := snapshot.RuntimeState.EnsureCommonState()
if cs != nil {
phase = string(cs.Phase)
}
}
payload := AgentStateSnapshotPayload{
ConversationID: conversationID,
UserID: userID,
Phase: phase,
SnapshotJSON: string(snapshotJSON),
}
if err := publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeAgentStateSnapshotPersist,
EventVersion: outboxinfra.DefaultEventVersion,
MessageKey: conversationID,
AggregateID: conversationID,
Payload: payload,
}); err != nil {
log.Printf("[WARN] 发布 agent 状态快照事件失败 chat=%s: %v", conversationID, err)
}
}

View File

@@ -0,0 +1,330 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"gorm.io/gorm"
)
const EventTypeAgentTimelinePersistRequested = "agent.timeline.persist.requested"
// RegisterAgentTimelinePersistHandler 注册“会话时间线持久化”消费者处理器。
//
// 职责边界:
// 1. 只负责 timeline 事件,不处理 chat_history 等其他业务消息;
// 2. 只负责注册 handler不负责总线启停
// 3. 通过 outbox 通用消费事务,把“时间线写库 + consumed 推进”放进同一事务;
// 4. 若遇到 seq 唯一键冲突,会先判定是否属于重放幂等,再决定是否补新 seq 并回填 Redis。
func RegisterAgentTimelinePersistHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
agentRepo *dao.AgentDAO,
cacheDAO *dao.CacheDAO,
) error {
// 1. 依赖校验:缺少任一关键依赖都无法安全消费消息。
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if agentRepo == nil {
return errors.New("agent repo is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeAgentTimelinePersistRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.ChatTimelinePersistPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
// 1. payload 无法反序列化属于不可恢复错误,直接标 dead避免无意义重试。
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析时间线持久化载荷失败: "+unmarshalErr.Error())
return nil
}
payload = payload.Normalize()
if !payload.HasValidIdentity() {
// 2. 这里只校验“能否唯一定位一条 timeline 记录”的最小字段集合。
// 3. content / payload_json 是否为空由事件类型自行决定,不在这里一刀切限制。
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "时间线持久化载荷非法: user_id/conversation_id/seq/kind 非法")
return nil
}
refreshCache := false
finalSeq := payload.Seq
// 4. 统一走 outbox 消费事务入口,保证“业务写入成功 -> consumed”原子一致。
err := eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
finalPayload, repaired, persistErr := persistConversationTimelineEventInTx(ctx, tx, agentRepo.WithTx(tx), payload)
if persistErr != nil {
return persistErr
}
refreshCache = repaired
finalSeq = finalPayload.Seq
return nil
})
if err != nil {
return err
}
// 5. 只有发生“seq 冲突且补了新 seq”时才需要重建 Redis timeline。
// 5.1 原因:主链路已经先写过 Redis常规成功无需重复回写。
// 5.2 若发生补 seq不重建会留下旧 seq 的缓存残影,刷新后顺序会错乱。
// 5.3 缓存重建失败只记日志,不能反向把已 consumed 的 outbox 回滚。
if refreshCache {
if refreshErr := rebuildConversationTimelineCache(ctx, agentRepo, cacheDAO, payload.UserID, payload.ConversationID, finalSeq); refreshErr != nil {
log.Printf("重建时间线缓存失败 user=%d chat=%s seq=%d err=%v", payload.UserID, payload.ConversationID, finalSeq, refreshErr)
}
}
return nil
}
return bus.RegisterEventHandler(EventTypeAgentTimelinePersistRequested, handler)
}
// PublishAgentTimelinePersistRequested 发布“会话时间线持久化请求”事件。
//
// 设计目的:
// 1. 让业务层只传 DTO不重复拼事件元数据
// 2. 统一以 conversation_id 作为 MessageKey / AggregateID尽量降低同会话乱序概率
// 3. 发布失败显式返回 error由调用方决定是否中断主链路。
func PublishAgentTimelinePersistRequested(
ctx context.Context,
publisher outboxinfra.EventPublisher,
payload model.ChatTimelinePersistPayload,
) error {
if publisher == nil {
return errors.New("event publisher is nil")
}
payload = payload.Normalize()
if !payload.HasValidIdentity() {
return errors.New("invalid timeline persist payload")
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeAgentTimelinePersistRequested,
EventVersion: outboxinfra.DefaultEventVersion,
MessageKey: payload.ConversationID,
AggregateID: payload.ConversationID,
Payload: payload,
})
}
// persistConversationTimelineEventInTx 负责在单个事务里完成 timeline 事件写库。
//
// 步骤化说明:
// 1. 先按 payload 原始 seq 尝试写入;
// 2. 若命中 seq 唯一键冲突,先查询同 seq 记录,判断是否属于“重放同一事件”;
// 3. 若不是重放,而是 Redis seq 漂移导致的新旧事件撞 seq则用 max(seq)+1 重新分配;
// 4. 最多修复 3 次,避免异常数据把消费者拖进无限循环。
func persistConversationTimelineEventInTx(
ctx context.Context,
tx *gorm.DB,
agentRepo *dao.AgentDAO,
payload model.ChatTimelinePersistPayload,
) (model.ChatTimelinePersistPayload, bool, error) {
if tx == nil {
return payload, false, errors.New("transaction is nil")
}
if agentRepo == nil {
return payload, false, errors.New("agent repo is nil")
}
working := payload.Normalize()
repaired := false
for attempt := 0; attempt < 3; attempt++ {
if _, _, err := agentRepo.SaveConversationTimelineEvent(ctx, working); err == nil {
return working, repaired, nil
} else if !model.IsTimelineSeqConflictError(err) {
return working, repaired, err
}
// 1. 先判断是否属于“同一条事件被重复消费”。
// 2. 若库里已有记录且字段完全一致,说明前一次其实已经成功落库,本次可视为幂等成功。
// 3. 若字段不一致,再进入“补新 seq”分支避免把真正的新事件吞掉。
existing, findErr := findConversationTimelineEventBySeq(ctx, tx, working.UserID, working.ConversationID, working.Seq)
if findErr == nil && working.MatchesStoredEvent(existing) {
return working, repaired, nil
}
if findErr != nil && !errors.Is(findErr, gorm.ErrRecordNotFound) {
return working, repaired, findErr
}
maxSeq, maxErr := loadConversationTimelineMaxSeq(ctx, tx, working.UserID, working.ConversationID)
if maxErr != nil {
return working, repaired, maxErr
}
working.Seq = maxSeq + 1
repaired = true
}
return working, repaired, fmt.Errorf("timeline seq repair exceeded limit user=%d chat=%s", working.UserID, working.ConversationID)
}
func findConversationTimelineEventBySeq(
ctx context.Context,
tx *gorm.DB,
userID int,
conversationID string,
seq int64,
) (model.AgentTimelineEvent, error) {
var event model.AgentTimelineEvent
err := tx.WithContext(ctx).
Where("user_id = ? AND chat_id = ? AND seq = ?", userID, strings.TrimSpace(conversationID), seq).
Take(&event).Error
return event, err
}
func loadConversationTimelineMaxSeq(
ctx context.Context,
tx *gorm.DB,
userID int,
conversationID string,
) (int64, error) {
var maxSeq int64
err := tx.WithContext(ctx).
Model(&model.AgentTimelineEvent{}).
Where("user_id = ? AND chat_id = ?", userID, strings.TrimSpace(conversationID)).
Select("COALESCE(MAX(seq), 0)").
Scan(&maxSeq).Error
if err != nil {
return 0, err
}
return maxSeq, nil
}
// rebuildConversationTimelineCache 在“补新 seq”后重建 Redis timeline 缓存。
//
// 说明:
// 1. 这里只在缓存存在时执行;未接 Redis 的环境直接跳过即可;
// 2. 需要整表重建而不是只 append 一条,因为旧缓存里已经存在错误 seq 的事件;
// 3. 这里不抽到 agent/sv 复用,是因为 events 不能反向依赖 service否则会形成循环依赖。
func rebuildConversationTimelineCache(
ctx context.Context,
agentRepo *dao.AgentDAO,
cacheDAO *dao.CacheDAO,
userID int,
conversationID string,
finalSeq int64,
) error {
if cacheDAO == nil || agentRepo == nil {
return nil
}
events, err := agentRepo.ListConversationTimelineEvents(ctx, userID, conversationID)
if err != nil {
return err
}
items := buildConversationTimelineCacheItems(events)
if err = cacheDAO.SetConversationTimelineToCache(ctx, userID, conversationID, items); err != nil {
return err
}
if len(items) > 0 {
finalSeq = items[len(items)-1].Seq
}
return cacheDAO.SetConversationTimelineSeq(ctx, userID, conversationID, finalSeq)
}
func buildConversationTimelineCacheItems(events []model.AgentTimelineEvent) []model.GetConversationTimelineItem {
if len(events) == 0 {
return make([]model.GetConversationTimelineItem, 0)
}
items := make([]model.GetConversationTimelineItem, 0, len(events))
for _, event := range events {
item := model.GetConversationTimelineItem{
ID: event.ID,
Seq: event.Seq,
Kind: strings.TrimSpace(event.Kind),
TokensConsumed: event.TokensConsumed,
CreatedAt: event.CreatedAt,
}
if event.Role != nil {
item.Role = strings.TrimSpace(*event.Role)
}
if event.Content != nil {
item.Content = strings.TrimSpace(*event.Content)
}
if event.Payload != nil {
var payload map[string]any
if err := json.Unmarshal([]byte(strings.TrimSpace(*event.Payload)), &payload); err == nil && len(payload) > 0 {
item.Payload = payload
}
}
items = append(items, item)
}
return normalizeConversationTimelineCacheItems(items)
}
func normalizeConversationTimelineCacheItems(items []model.GetConversationTimelineItem) []model.GetConversationTimelineItem {
if len(items) == 0 {
return make([]model.GetConversationTimelineItem, 0)
}
normalized := make([]model.GetConversationTimelineItem, 0, len(items))
for _, item := range items {
role := strings.ToLower(strings.TrimSpace(item.Role))
kind := canonicalizeConversationTimelineKind(item.Kind, role)
if kind == "" {
switch role {
case "user":
kind = model.AgentTimelineKindUserText
case "assistant":
kind = model.AgentTimelineKindAssistantText
}
}
if role == "" {
switch kind {
case model.AgentTimelineKindUserText:
role = "user"
case model.AgentTimelineKindAssistantText:
role = "assistant"
}
}
item.Kind = kind
item.Role = role
normalized = append(normalized, item)
}
return normalized
}
func canonicalizeConversationTimelineKind(kind string, role string) string {
normalizedKind := strings.ToLower(strings.TrimSpace(kind))
normalizedRole := strings.ToLower(strings.TrimSpace(role))
switch normalizedKind {
case model.AgentTimelineKindUserText,
model.AgentTimelineKindAssistantText,
model.AgentTimelineKindToolCall,
model.AgentTimelineKindToolResult,
model.AgentTimelineKindConfirmRequest,
model.AgentTimelineKindBusinessCard,
model.AgentTimelineKindScheduleCompleted,
model.AgentTimelineKindThinkingSummary:
return normalizedKind
case "text", "message", "query":
if normalizedRole == "user" {
return model.AgentTimelineKindUserText
}
if normalizedRole == "assistant" {
return model.AgentTimelineKindAssistantText
}
}
return normalizedKind
}

View File

@@ -0,0 +1,115 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/userauth"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"github.com/LoveLosita/smartflow/backend/shared/ports"
"gorm.io/gorm"
)
const (
// EventTypeChatHistoryPersistRequested 是聊天消息持久化请求的业务事件类型。
EventTypeChatHistoryPersistRequested = "chat.history.persist.requested"
)
// RegisterChatHistoryPersistHandler 注册“聊天消息持久化”消费者。
// 职责边界:
// 1. 只处理聊天历史事件,不处理其它业务事件;
// 2. 只负责注册,不负责总线启动;
// 3. 先写本地 chat 相关表,再调用 userauth 调整 token 额度;
// 4. 当前版本仅注册新路由键,不再注册旧兼容键。
func RegisterChatHistoryPersistHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
adjuster ports.TokenUsageAdjuster,
) error {
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatHistoryPersistRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.ChatHistoryPersistPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error())
return nil
}
eventID := strings.TrimSpace(envelope.EventID)
if eventID == "" {
eventID = strconv.FormatInt(envelope.OutboxID, 10)
}
if err := eventOutboxRepo.ConsumeInTx(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
txM := repoManager.WithTx(tx)
return txM.Agent.SaveChatHistoryInTx(
ctx,
payload.UserID,
payload.ConversationID,
payload.Role,
payload.Message,
payload.ReasoningContent,
payload.ReasoningDurationSeconds,
payload.TokensConsumed,
eventID,
)
}); err != nil {
return err
}
if payload.TokensConsumed > 0 {
if adjuster == nil {
return errors.New("userauth token adjuster is nil")
}
if _, err := adjuster.AdjustTokenUsage(ctx, contracts.AdjustTokenUsageRequest{
EventID: eventID,
UserID: payload.UserID,
TokenDelta: payload.TokensConsumed,
}); err != nil {
return err
}
}
return eventOutboxRepo.MarkConsumed(ctx, envelope.OutboxID)
}
return bus.RegisterEventHandler(EventTypeChatHistoryPersistRequested, handler)
}
// PublishChatHistoryPersistRequested 发布“聊天消息持久化请求”事件。
func PublishChatHistoryPersistRequested(
ctx context.Context,
publisher outboxinfra.EventPublisher,
payload model.ChatHistoryPersistPayload,
) error {
if publisher == nil {
return errors.New("event publisher is nil")
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeChatHistoryPersistRequested,
EventVersion: outboxinfra.DefaultEventVersion,
MessageKey: payload.ConversationID,
AggregateID: payload.ConversationID,
Payload: payload,
})
}

View File

@@ -0,0 +1,126 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
contracts "github.com/LoveLosita/smartflow/backend/shared/contracts/userauth"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"github.com/LoveLosita/smartflow/backend/shared/ports"
"gorm.io/gorm"
)
const (
// EventTypeChatTokenUsageAdjustRequested 是“会话 token 额度调整”事件类型。
// 命名约束:
// 1. 只表达业务语义,不泄露 outbox/kafka 实现细节;
// 2. 作为稳定路由键长期保留,后续演进优先通过 event_version。
EventTypeChatTokenUsageAdjustRequested = "chat.token.usage.adjust.requested"
)
// RegisterChatTokenUsageAdjustHandler 注册“会话 token 额度调整”消费者。
// 职责边界:
// 1. 只处理 token 调整事件,不处理聊天正文落库;
// 2. 先写本地账本,再调用 userauth 侧做额度同步;
// 3. 非法载荷直接标记 dead避免无意义重试。
func RegisterChatTokenUsageAdjustHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
adjuster ports.TokenUsageAdjuster,
) error {
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeChatTokenUsageAdjustRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.ChatTokenUsageAdjustPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析会话 token 调整载荷失败: "+unmarshalErr.Error())
return nil
}
if payload.UserID <= 0 || payload.TokensDelta <= 0 || payload.ConversationID == "" {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "会话 token 调整载荷无效: user_id/conversation_id/tokens_delta 非法")
return nil
}
eventID := strings.TrimSpace(envelope.EventID)
if eventID == "" {
eventID = strconv.FormatInt(envelope.OutboxID, 10)
}
if err := eventOutboxRepo.ConsumeInTx(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
txM := repoManager.WithTx(tx)
return txM.Agent.AdjustTokenUsageInTx(ctx, payload.UserID, payload.ConversationID, payload.TokensDelta, eventID)
}); err != nil {
return err
}
if adjuster == nil {
return errors.New("userauth token adjuster is nil")
}
if _, err := adjuster.AdjustTokenUsage(ctx, contracts.AdjustTokenUsageRequest{
EventID: eventID,
UserID: payload.UserID,
TokenDelta: payload.TokensDelta,
}); err != nil {
return err
}
return eventOutboxRepo.MarkConsumed(ctx, envelope.OutboxID)
}
return bus.RegisterEventHandler(EventTypeChatTokenUsageAdjustRequested, handler)
}
// PublishChatTokenUsageAdjustRequested 发布“会话 token 额度调整”事件。
// 1. 这里只保证 outbox 写入成功,不等待消费结果;
// 2. 业务层只关心 DTO不关心 outbox/Kafka 细节。
func PublishChatTokenUsageAdjustRequested(
ctx context.Context,
publisher outboxinfra.EventPublisher,
payload model.ChatTokenUsageAdjustPayload,
) error {
if publisher == nil {
return errors.New("event publisher is nil")
}
if payload.UserID <= 0 {
return errors.New("invalid user_id")
}
if payload.TokensDelta <= 0 {
return errors.New("invalid tokens_delta")
}
if payload.ConversationID == "" {
return errors.New("invalid conversation_id")
}
if payload.TriggeredAt.IsZero() {
payload.TriggeredAt = time.Now()
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeChatTokenUsageAdjustRequested,
EventVersion: outboxinfra.DefaultEventVersion,
MessageKey: payload.ConversationID,
AggregateID: strconv.Itoa(payload.UserID) + ":" + payload.ConversationID,
Payload: payload,
})
}

View File

@@ -0,0 +1,185 @@
package eventsvc
import (
"errors"
"github.com/LoveLosita/smartflow/backend/services/memory"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
sharedevents "github.com/LoveLosita/smartflow/backend/shared/events"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"github.com/LoveLosita/smartflow/backend/shared/ports"
)
// RegisterCoreOutboxHandlers 注册单体残留内仍由 agent 边界消费的 outbox handler。
//
// 职责边界:
// 1. 只负责聚合注册当前单体残留内仍归 agent 进程消费的 handler
// 2. 不负责创建 eventBus/outboxRepo/DAO也不负责启动或关闭事件总线。
// 3. 不改变单个 Register* 函数的职责;具体 payload 解析、幂等消费和业务落库仍由各自 handler 负责。
// 4. memory.extract.requested 已在阶段 6 CP1 迁往 cmd/memory这里只登记其路由不再注册消费 handler。
func RegisterCoreOutboxHandlers(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
adjuster ports.TokenUsageAdjuster,
) error {
if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo); err != nil {
return err
}
if err := RegisterMemoryExtractRoute(); err != nil {
return err
}
return registerOutboxHandlerRoutes(coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, adjuster))
}
// RegisterAllOutboxHandlers 注册当前阶段所有 outbox handler。
//
// 职责边界:
// 1. 只负责把当前单体残留域的 core / active_scheduler 路由一次性接线;
// 2. 不负责创建依赖,也不负责启动事件总线;
// 3. notification 已独立到 cmd/notification自有 outbox consumer 不再由单体注册;
// 4. 供当前启动流程在“总线启动前”统一完成显式路由注册。
func RegisterAllOutboxHandlers(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
adjuster ports.TokenUsageAdjuster,
) error {
if err := validateAllOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow); err != nil {
return err
}
return registerOutboxHandlerRoutes(allOutboxHandlerRoutes(
eventBus,
outboxRepo,
repoManager,
agentRepo,
cacheRepo,
memoryModule,
activeTriggerWorkflow,
adjuster,
))
}
// validateCoreOutboxHandlerDeps 校验核心 outbox handler 聚合注册所需依赖。
//
// 职责边界:
// 1. 只做 nil 校验不做数据库、Redis、Kafka 连通性探测,避免注册函数承担启动健康检查职责。
// 2. 返回 error 表示依赖缺失;返回 nil 表示可以安全进入逐项注册流程。
func validateCoreOutboxHandlerDeps(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
) error {
if eventBus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
if agentRepo == nil {
return errors.New("agent repo is nil")
}
if cacheRepo == nil {
return errors.New("cache repo is nil")
}
return nil
}
// validateAllOutboxHandlerDeps 在核心依赖基础上,额外校验 active_scheduler 相关依赖。
func validateAllOutboxHandlerDeps(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
) error {
if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo); err != nil {
return err
}
if activeTriggerWorkflow == nil {
return errors.New("active schedule triggered processor is nil")
}
return nil
}
// coreOutboxHandlerRoutes 只描述 core 阶段的 outbox 路由。
func coreOutboxHandlerRoutes(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
adjuster ports.TokenUsageAdjuster,
) []outboxHandlerRoute {
return []outboxHandlerRoute{
{
EventType: EventTypeChatHistoryPersistRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager, adjuster)
},
},
{
EventType: EventTypeChatTokenUsageAdjustRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterChatTokenUsageAdjustHandler(eventBus, outboxRepo, repoManager, adjuster)
},
},
{
EventType: EventTypeAgentStateSnapshotPersist,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterAgentStateSnapshotHandler(eventBus, outboxRepo, repoManager)
},
},
{
EventType: EventTypeAgentTimelinePersistRequested,
Service: outboxHandlerServiceAgent,
Register: func() error {
return RegisterAgentTimelinePersistHandler(eventBus, outboxRepo, agentRepo, cacheRepo)
},
},
}
}
// allOutboxHandlerRoutes 把当前阶段所有 outbox 路由一次性展开,供启动入口统一接线。
func allOutboxHandlerRoutes(
eventBus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
memoryModule *memory.Module,
activeTriggerWorkflow ActiveScheduleTriggeredProcessor,
adjuster ports.TokenUsageAdjuster,
) []outboxHandlerRoute {
routes := coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, adjuster)
routes = append(routes,
outboxHandlerRoute{
EventType: sharedevents.ActiveScheduleTriggeredEventType,
Service: outboxHandlerServiceActiveScheduler,
Register: func() error {
return RegisterActiveScheduleTriggeredHandler(eventBus, outboxRepo, activeTriggerWorkflow)
},
},
)
return routes
}

View File

@@ -0,0 +1,262 @@
package eventsvc
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/LoveLosita/smartflow/backend/services/memory"
memorymodel "github.com/LoveLosita/smartflow/backend/services/memory/model"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"github.com/spf13/viper"
"gorm.io/gorm"
)
const (
// EventTypeMemoryExtractRequested 是“记忆抽取请求”事件类型。
EventTypeMemoryExtractRequested = "memory.extract.requested"
maxMemorySourceTextLength = 1500
)
// RegisterMemoryExtractRoute 只登记 memory.extract.requested 的服务归属。
//
// 职责边界:
// 1. 只保证发布侧能把事件写入 memory_outbox_messages
// 2. 不注册消费 handler消费边界在阶段 6 CP1 起归 cmd/memory
// 3. 重复调用按 outbox 路由注册的幂等语义处理。
func RegisterMemoryExtractRoute() error {
return outboxinfra.RegisterEventService(EventTypeMemoryExtractRequested, outboxinfra.ServiceMemory)
}
// RegisterMemoryExtractRequestedHandler 注册“记忆抽取请求”消费者。
//
// 职责边界:
// 1. 只负责把事件转为 memory_jobs 任务;
// 2. 不在消费回调里执行 LLM 重计算;
// 3. 通过 memory.Module.WithTx(tx) 复用同一套接入门面,保证事务边界仍由 outbox 掌控。
func RegisterMemoryExtractRequestedHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
memoryModule *memory.Module,
) error {
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if memoryModule == nil {
return errors.New("memory module is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeMemoryExtractRequested)
if err != nil {
return err
}
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
var payload model.MemoryExtractRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析记忆抽取载荷失败: "+unmarshalErr.Error())
return nil
}
if validateErr := validateMemoryExtractPayload(payload); validateErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "记忆抽取载荷非法: "+validateErr.Error())
return nil
}
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
jobPayload := memorymodel.ExtractJobPayload{
UserID: payload.UserID,
ConversationID: strings.TrimSpace(payload.ConversationID),
AssistantID: strings.TrimSpace(payload.AssistantID),
RunID: strings.TrimSpace(payload.RunID),
SourceMessageID: payload.SourceMessageID,
SourceRole: strings.TrimSpace(payload.SourceRole),
SourceText: strings.TrimSpace(payload.SourceText),
OccurredAt: payload.OccurredAt,
TraceID: strings.TrimSpace(payload.TraceID),
IdempotencyKey: strings.TrimSpace(payload.IdempotencyKey),
}
return memoryModule.WithTx(tx).EnqueueExtract(ctx, jobPayload, envelope.EventID)
})
}
return bus.RegisterEventHandler(EventTypeMemoryExtractRequested, handler)
}
// EnqueueMemoryExtractRequestedInTx 在事务内写入 memory.extract.requested outbox 消息。
//
// 设计目的:
// 1. 让“聊天消息已落库”和“记忆抽取事件已入队”同事务提交;
// 2. 任意一步失败都整体回滚,避免出现链路断点。
func EnqueueMemoryExtractRequestedInTx(
ctx context.Context,
outboxRepo *outboxinfra.Repository,
maxRetry int,
chatPayload model.ChatHistoryPersistPayload,
) error {
if !isMemoryWriteEnabled() {
return nil
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
memoryPayload, shouldEnqueue := buildMemoryExtractPayloadFromChat(chatPayload)
if !shouldEnqueue {
return nil
}
payloadJSON, err := json.Marshal(memoryPayload)
if err != nil {
return err
}
if maxRetry <= 0 {
maxRetry = 20
}
outboxPayload := outboxinfra.OutboxEventPayload{
EventType: EventTypeMemoryExtractRequested,
EventVersion: outboxinfra.DefaultEventVersion,
AggregateID: strings.TrimSpace(chatPayload.ConversationID),
Payload: payloadJSON,
}
// 1. 这里只传 eventType 与消息键服务归属、outbox 表和 Kafka topic 统一交给仓库路由层解析。
// 2. 这样聊天持久化链路不会继续感知 memory 服务的物理 topic避免拆服务时出现双写口径。
_, err = outboxRepo.CreateMessage(
ctx,
EventTypeMemoryExtractRequested,
strings.TrimSpace(chatPayload.ConversationID),
outboxPayload,
maxRetry,
)
return err
}
// PublishMemoryExtractFromGraph 在 graph 完成后直接发布记忆抽取事件。
//
// 设计目的:
// 1. 绕过 chat-persist 链路,由 agent service 在 graph 完成后按需调用;
// 2. 内部完成 source text 截断、幂等 key 生成、memory 开关检查;
// 3. 发布失败只记日志,不阻断主链路。
func PublishMemoryExtractFromGraph(
ctx context.Context,
publisher outboxinfra.EventPublisher,
userID int,
conversationID string,
sourceText string,
) error {
if !isMemoryWriteEnabled() {
return nil
}
if publisher == nil {
return errors.New("event publisher is nil")
}
sourceText = strings.TrimSpace(sourceText)
if sourceText == "" || userID <= 0 || strings.TrimSpace(conversationID) == "" {
return nil
}
truncated := truncateByRune(sourceText, maxMemorySourceTextLength)
now := time.Now()
payload := model.MemoryExtractRequestedPayload{
UserID: userID,
ConversationID: strings.TrimSpace(conversationID),
SourceRole: "user",
SourceText: truncated,
OccurredAt: now,
IdempotencyKey: buildMemoryExtractIdempotencyKey(userID, conversationID, truncated),
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeMemoryExtractRequested,
EventVersion: outboxinfra.DefaultEventVersion,
MessageKey: payload.ConversationID,
AggregateID: payload.ConversationID,
Payload: payload,
})
}
func buildMemoryExtractPayloadFromChat(chatPayload model.ChatHistoryPersistPayload) (model.MemoryExtractRequestedPayload, bool) {
role := strings.ToLower(strings.TrimSpace(chatPayload.Role))
if role != "user" {
return model.MemoryExtractRequestedPayload{}, false
}
sourceText := strings.TrimSpace(chatPayload.Message)
if sourceText == "" {
return model.MemoryExtractRequestedPayload{}, false
}
truncatedSourceText := truncateByRune(sourceText, maxMemorySourceTextLength)
now := time.Now()
return model.MemoryExtractRequestedPayload{
UserID: chatPayload.UserID,
ConversationID: strings.TrimSpace(chatPayload.ConversationID),
// Day1 先保留 assistant_id/run_id 空值,后续从主链路上下文补齐。
AssistantID: "",
RunID: "",
SourceMessageID: 0,
SourceRole: role,
SourceText: truncatedSourceText,
OccurredAt: now,
TraceID: "",
IdempotencyKey: buildMemoryExtractIdempotencyKey(chatPayload.UserID, chatPayload.ConversationID, truncatedSourceText),
}, true
}
func validateMemoryExtractPayload(payload model.MemoryExtractRequestedPayload) error {
if payload.UserID <= 0 {
return errors.New("user_id is invalid")
}
if strings.TrimSpace(payload.ConversationID) == "" {
return errors.New("conversation_id is empty")
}
if strings.TrimSpace(payload.SourceRole) == "" {
return errors.New("source_role is empty")
}
if strings.TrimSpace(payload.SourceText) == "" {
return errors.New("source_text is empty")
}
if strings.TrimSpace(payload.IdempotencyKey) == "" {
return errors.New("idempotency_key is empty")
}
return nil
}
func buildMemoryExtractIdempotencyKey(userID int, conversationID, sourceText string) string {
raw := fmt.Sprintf("%d|%s|%s", userID, strings.TrimSpace(conversationID), strings.TrimSpace(sourceText))
sum := sha256.Sum256([]byte(raw))
return "memory_extract_" + strconv.Itoa(userID) + "_" + hex.EncodeToString(sum[:8])
}
func truncateByRune(raw string, max int) string {
if max <= 0 {
return ""
}
runes := []rune(raw)
if len(runes) <= max {
return raw
}
return string(runes[:max])
}
func isMemoryWriteEnabled() bool {
if !viper.IsSet("memory.enabled") {
return true
}
return viper.GetBool("memory.enabled")
}

View File

@@ -0,0 +1,214 @@
package eventsvc
import (
"context"
"errors"
"fmt"
"sort"
"strings"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
)
// OutboxBus 是启动侧和业务侧共享的 outbox 门面。
//
// 职责边界:
// 1. 对上只暴露 Publish / RegisterEventHandler / Start / Close 这四个能力;
// 2. 对内可以按 service 维度路由到多个底层 engine
// 3. 不承载任何业务处理逻辑只做事件归属、topic/group 和 engine 选择。
type OutboxBus interface {
outboxinfra.EventPublisher
RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error
Start(ctx context.Context)
Close()
}
type routedOutboxBus struct {
buses map[string]OutboxBus
}
// NewRoutedOutboxBus 把多个 service 级 outbox bus 组装成一个门面。
//
// 1. 这里不创建底层 engine只接收上层已经建好的 service bus
// 2. 事件发布和 handler 注册都按事件归属路由到对应 service
// 3. 任一 service bus 缺失时直接报错,避免静默回落到共享 topic。
func NewRoutedOutboxBus(buses map[string]OutboxBus) OutboxBus {
normalized := make(map[string]OutboxBus, len(buses))
for serviceName, bus := range buses {
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" || bus == nil {
continue
}
normalized[serviceName] = bus
}
if len(normalized) == 0 {
return nil
}
return &routedOutboxBus{buses: normalized}
}
// NewServiceOutboxBus 基于 service 级 topic / group 创建底层 outbox engine。
//
// 1. topic / group 由 service 名称推导,不再要求调用方显式传入共享 topic
// 2. kafka 未启用时返回 nil调用侧可以继续走同步降级路径
// 3. 这里不注册 handler注册仍由启动侧统一完成。
func NewServiceOutboxBus(repo *outboxinfra.Repository, baseCfg kafkabus.Config, serviceName string) (OutboxBus, error) {
if repo == nil {
return nil, errors.New("outbox repository is nil")
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return nil, errors.New("serviceName is empty")
}
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
cfg := baseCfg
cfg.Topic = strings.TrimSpace(route.Topic)
cfg.GroupID = strings.TrimSpace(route.GroupID)
cfg.ServiceName = strings.TrimSpace(route.ServiceName)
if cfg.ServiceName == "" {
cfg.ServiceName = serviceName
}
bus, err := outboxinfra.NewEventBus(repo.WithRoute(route), cfg)
if err != nil {
return nil, err
}
if bus == nil {
return nil, nil
}
return bus, nil
}
func (b *routedOutboxBus) Publish(ctx context.Context, req outboxinfra.PublishRequest) error {
serviceBus, err := b.resolveBusByEventType(req.EventType)
if err != nil {
return err
}
return serviceBus.Publish(ctx, req)
}
func (b *routedOutboxBus) RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error {
if handler == nil {
return errors.New("handler is nil")
}
serviceBus, err := b.resolveBusByEventType(eventType)
if err != nil {
return err
}
return serviceBus.RegisterEventHandler(eventType, handler)
}
func (b *routedOutboxBus) Start(ctx context.Context) {
if b == nil {
return
}
for _, serviceName := range orderedOutboxServiceNames(b.buses) {
b.buses[serviceName].Start(ctx)
}
}
func (b *routedOutboxBus) Close() {
if b == nil {
return
}
for _, serviceName := range orderedOutboxServiceNames(b.buses) {
b.buses[serviceName].Close()
}
}
func (b *routedOutboxBus) resolveBusByEventType(eventType string) (OutboxBus, error) {
if b == nil {
return nil, errors.New("outbox bus is not initialized")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return nil, errors.New("eventType is empty")
}
serviceName, ok := outboxinfra.ResolveEventService(eventType)
if !ok {
return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType)
}
serviceBus, ok := b.buses[strings.TrimSpace(serviceName)]
if !ok || serviceBus == nil {
return nil, fmt.Errorf("service outbox bus is missing: service=%s eventType=%s", serviceName, eventType)
}
return serviceBus, nil
}
func orderedOutboxServiceNames(buses map[string]OutboxBus) []string {
ordered := make([]string, 0, len(buses))
seen := make(map[string]struct{}, len(buses))
for _, serviceName := range OutboxServiceNames() {
if _, ok := buses[serviceName]; ok {
ordered = append(ordered, serviceName)
seen[serviceName] = struct{}{}
}
}
extras := make([]string, 0)
for serviceName := range buses {
if _, ok := seen[serviceName]; ok {
continue
}
extras = append(extras, serviceName)
}
sort.Strings(extras)
return append(ordered, extras...)
}
// OutboxServiceNames 返回当前阶段启用的 service 级 outbox 名称。
func OutboxServiceNames() []string {
return []string{
string(outboxHandlerServiceAgent),
string(outboxHandlerServiceMemory),
}
}
// ResolveOutboxTopicForService 把 service 名称映射成独立 Kafka topic。
//
// 1. 这里保留现在的命名风格smartflow.<service>.outbox
// 2. 空 service 只作为兜底,不作为主路径;
// 3. 调用侧不再传共享 topic避免入口继续依赖旧结构。
func ResolveOutboxTopicForService(serviceName string) string {
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
if topic := strings.TrimSpace(route.Topic); topic != "" {
return topic
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return kafkabus.DefaultTopic
}
return "smartflow." + serviceName + ".outbox"
}
// ResolveOutboxGroupForService 把 service 名称映射成独立 Kafka group。
func ResolveOutboxGroupForService(serviceName string) string {
route, _ := outboxinfra.ResolveServiceRoute(serviceName)
if groupID := strings.TrimSpace(route.GroupID); groupID != "" {
return groupID
}
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return kafkabus.DefaultGroup
}
return "smartflow-" + serviceName + "-outbox-consumer"
}
// ResolveOutboxTopicForEvent 根据事件归属 service 计算 topic。
func ResolveOutboxTopicForEvent(eventType string) (string, error) {
route, ok := outboxinfra.ResolveEventRoute(eventType)
if !ok {
return "", fmt.Errorf("outbox route not registered: eventType=%s", strings.TrimSpace(eventType))
}
if topic := strings.TrimSpace(route.Topic); topic != "" {
return topic, nil
}
return ResolveOutboxTopicForService(route.ServiceName), nil
}

View File

@@ -0,0 +1,74 @@
package eventsvc
import (
"fmt"
"strings"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
)
// outboxHandlerService 表示 outbox 路由归属的业务服务。
//
// 这里只记录服务归属,不承载具体实现包名,方便在启动日志和路由表里直接阅读。
type outboxHandlerService string
const (
outboxHandlerServiceAgent outboxHandlerService = "agent"
outboxHandlerServiceTask outboxHandlerService = "task"
outboxHandlerServiceMemory outboxHandlerService = "memory"
outboxHandlerServiceActiveScheduler outboxHandlerService = "active-scheduler"
outboxHandlerServiceNotification outboxHandlerService = "notification"
)
// outboxHandlerRoute 显式描述“事件类型 -> 服务归属 -> handler 注册动作”。
//
// 1. EventType 负责唯一定位 outbox 路由键;
// 2. Service 负责标明该路由归属的业务服务;
// 3. Register 只负责把对应 handler 挂到总线,不承载业务逻辑。
type outboxHandlerRoute struct {
EventType string
Service outboxHandlerService
Register func() error
}
// registerOutboxHandlerRoutes 逐条注册路由表里的 handler。
//
// 1. 先把事件类型和服务归属写进路由表,避免启动入口散落多处 if err != nil
// 2. 再统一执行注册动作,保证失败时能直接定位到具体 event_type 和 service
// 3. 若某条路由缺少注册函数,直接返回 error避免静默漏注册。
func registerOutboxHandlerRoutes(routes []outboxHandlerRoute) error {
for _, route := range routes {
if route.Register == nil {
return fmt.Errorf("outbox handler route 缺少注册函数: event_type=%s service=%s", route.EventType, route.Service)
}
if err := outboxinfra.RegisterEventService(route.EventType, string(route.Service)); err != nil {
return fmt.Errorf("登记 outbox 事件归属失败event_type=%s, service=%s: %w", route.EventType, route.Service, err)
}
if err := route.Register(); err != nil {
return fmt.Errorf("注册 outbox handler 失败event_type=%s, service=%s: %w", route.EventType, route.Service, err)
}
}
return nil
}
// scopedOutboxRepoForEvent 负责把通用 outbox 仓库收敛到某个事件所属的服务表。//
// 职责边界:
// 1. 只做事件->服务->表的路由,不碰业务写入语义;
// 2. 返回的仓库只适合当前事件的 MarkDead / ConsumeAndMarkConsumed / MarkFailedForRetry
// 3. 路由缺失时直接返回错误,避免默默写回默认表。
func scopedOutboxRepoForEvent(outboxRepo *outboxinfra.Repository, eventType string) (*outboxinfra.Repository, error) {
if outboxRepo == nil {
return nil, fmt.Errorf("outbox repository is nil")
}
eventType = strings.TrimSpace(eventType)
if eventType == "" {
return nil, fmt.Errorf("eventType is empty")
}
route, ok := outboxinfra.ResolveEventRoute(eventType)
if !ok {
return nil, fmt.Errorf("outbox route not registered: eventType=%s", eventType)
}
return outboxRepo.WithRoute(route), nil
}

View File

@@ -0,0 +1,150 @@
package eventsvc
import (
"context"
"encoding/json"
"errors"
"log"
"strconv"
"time"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"gorm.io/gorm"
)
const (
// EventTypeTaskUrgencyPromoteRequested 是“任务紧急性平移请求”事件类型。
//
// 命名约束:
// 1. 只表达业务语义,不泄露 Kafka/outbox 技术细节;
// 2. 作为稳定路由键长期保留,后续协议演进优先走 event_version。
EventTypeTaskUrgencyPromoteRequested = "task.urgency.promote.requested"
)
// RegisterTaskUrgencyPromoteRoute 只登记 task 事件归属,不注册消费 handler。
//
// 职责边界:
// 1. 供单体残留路径在迁移期继续把 task 事件写入 task_outbox_messages
// 2. 不创建 consumer也不启动 handler真正消费已迁到 cmd/task
// 3. 重复登记同一归属是幂等操作。
func RegisterTaskUrgencyPromoteRoute() error {
return outboxinfra.RegisterEventService(EventTypeTaskUrgencyPromoteRequested, string(outboxHandlerServiceTask))
}
// RegisterTaskUrgencyPromoteHandler 注册“任务紧急性平移”消费者处理器。
//
// 职责边界:
// 1. 只负责注册 handler不负责启动/关闭事件总线;
// 2. 只处理 `task.urgency.promote.requested` 事件,不处理其他业务事件;
// 3. 通过 `ConsumeAndMarkConsumed` 把“业务更新 + outbox consumed 推进”放进同一事务。
func RegisterTaskUrgencyPromoteHandler(
bus OutboxBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
// 1. 依赖校验:缺少任意关键依赖都不能安全消费消息。
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, EventTypeTaskUrgencyPromoteRequested)
if err != nil {
return err
}
// 2. 定义统一处理函数。
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
// 2.1 先解析 payload解析失败属于不可恢复错误直接标记 dead。
var payload model.TaskUrgencyPromoteRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error())
return nil
}
// 2.2 做轻量参数净化,避免脏数据进入 DAO。
payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
if payload.UserID <= 0 || len(payload.TaskIDs) == 0 {
_ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法")
return nil
}
// 2.3 统一走 outbox 消费事务入口,保证“业务成功 -> consumed”原子一致。
return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
// 2.3.1 基于同一 tx 构造 RepoManager复用现有跨 DAO 事务模式。
txM := repoManager.WithTx(tx)
// 2.3.2 以消费时刻为准做条件更新,确保“到线”判定与真实落库时刻一致。
updated, err := txM.Task.PromoteTaskUrgencyByIDs(ctx, payload.UserID, payload.TaskIDs, time.Now())
if err != nil {
return err
}
log.Printf("任务紧急性平移消费完成: user_id=%d task_count=%d affected=%d outbox_id=%d", payload.UserID, len(payload.TaskIDs), updated, envelope.OutboxID)
return nil
})
}
// 3. 注册事件处理器。
return bus.RegisterEventHandler(EventTypeTaskUrgencyPromoteRequested, handler)
}
// PublishTaskUrgencyPromoteRequested 发布“任务紧急性平移请求”事件。
//
// 职责边界:
// 1. 只负责把业务 DTO 发布到 outbox不负责等待消费结果
// 2. 若发布失败,返回 error 交给调用方决定是否降级或重试。
func PublishTaskUrgencyPromoteRequested(
ctx context.Context,
publisher outboxinfra.EventPublisher,
payload model.TaskUrgencyPromoteRequestedPayload,
) error {
if publisher == nil {
return errors.New("event publisher is nil")
}
if payload.UserID <= 0 {
return errors.New("invalid user_id")
}
payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
if len(payload.TaskIDs) == 0 {
return errors.New("task_ids is empty")
}
if payload.TriggeredAt.IsZero() {
payload.TriggeredAt = time.Now()
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeTaskUrgencyPromoteRequested,
EventVersion: outboxinfra.DefaultEventVersion,
// 这里使用 user_id 作为消息键,确保同一用户相关平移事件尽量落到同一分区,降低乱序概率。
MessageKey: strconv.Itoa(payload.UserID),
AggregateID: strconv.Itoa(payload.UserID),
Payload: payload,
})
}
// sanitizePositiveUniqueIntIDs 过滤非正数并去重。
//
// 说明:
// 1. 该函数只做参数净化,不承载业务判定;
// 2. 不保证顺序稳定,对当前 SQL where in 语义无影响。
func sanitizePositiveUniqueIntIDs(ids []int) []int {
seen := make(map[int]struct{}, len(ids))
result := make([]int, 0, len(ids))
for _, id := range ids {
if id <= 0 {
continue
}
if _, exists := seen[id]; exists {
continue
}
seen[id] = struct{}{}
result = append(result, id)
}
return result
}