Files
Losita 3b6fca44a6 Version: 0.9.77.dev.260505
后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
2026-05-05 23:25:07 +08:00

129 lines
4.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sv
import (
"context"
"encoding/json"
"errors"
"log"
"strconv"
"time"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
taskdao "github.com/LoveLosita/smartflow/backend/services/task/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
"gorm.io/gorm"
)
// EventTypeTaskUrgencyPromoteRequested is the event type name of the
// "task urgency promote requested" event.
const EventTypeTaskUrgencyPromoteRequested = "task.urgency.promote.requested"
// OutboxBus is the minimal bus interface the task service needs in order to
// register its consuming handler.
type OutboxBus interface {
// RegisterEventHandler binds handler to eventType and reports an error if
// the binding is rejected.
RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error
}
// RegisterTaskUrgencyPromoteRoute records which service owns the task urgency
// promote event; it does not register a consuming handler.
//
// Responsibilities:
//  1. Lets other processes that publish task events during the migration
//     period resolve the event to task_outbox_messages.
//  2. Creates no Kafka consumer and starts no task handler.
//  3. Actual consumption remains the job of cmd/task, which calls
//     RegisterTaskUrgencyPromoteHandler.
//
// The result of outboxinfra.RegisterEventService is returned unchanged.
func RegisterTaskUrgencyPromoteRoute() error {
	err := outboxinfra.RegisterEventService(
		EventTypeTaskUrgencyPromoteRequested,
		outboxinfra.ServiceTask,
	)
	return err
}
// RegisterTaskUrgencyPromoteHandler registers the task service's own consumer
// for the "task urgency promote" event.
//
// Responsibilities:
//  1. Handles only task.urgency.promote.requested; events belonging to
//     agent/memory or any other domain are not handled here.
//  2. The business update and the outbox "consumed" transition both run
//     through ConsumeAndMarkConsumed, using the transaction it passes to the
//     callback.
//  3. The handler never constructs a DAO or event bus itself, so the consume
//     path has no hidden start-up dependencies.
//
// An error is returned when bus, outboxRepo or taskDAO is nil, when the event
// route cannot be registered or resolved, or when the bus rejects the handler.
func RegisterTaskUrgencyPromoteHandler(bus OutboxBus, outboxRepo *outboxinfra.Repository, taskDAO *taskdao.TaskDAO) error {
	if bus == nil {
		return errors.New("event bus is nil")
	}
	if outboxRepo == nil {
		return errors.New("outbox repository is nil")
	}
	if taskDAO == nil {
		return errors.New("task dao is nil")
	}
	if err := RegisterTaskUrgencyPromoteRoute(); err != nil {
		return err
	}
	route, ok := outboxinfra.ResolveEventRoute(EventTypeTaskUrgencyPromoteRequested)
	if !ok {
		return errors.New("task.urgency.promote.requested route is missing")
	}
	eventOutboxRepo := outboxRepo.WithRoute(route)

	// markDead parks a message that can never be processed. Marking stays
	// best-effort (the handler still returns nil afterwards), but a failure
	// is logged rather than discarded so a dropped message leaves a trace.
	markDead := func(ctx context.Context, envelope kafkabus.Envelope, reason string) {
		if err := eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, reason); err != nil {
			log.Printf("任务紧急性平移消息标记 dead 失败: outbox_id=%d reason=%s err=%v", envelope.OutboxID, reason, err)
		}
	}

	handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
		var payload model.TaskUrgencyPromoteRequestedPayload
		if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
			// An undecodable payload will not become decodable on retry.
			// NOTE(review): returning nil presumably stops redelivery — confirm against the bus.
			markDead(ctx, envelope, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error())
			return nil
		}
		payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
		if payload.UserID <= 0 || len(payload.TaskIDs) == 0 {
			// Semantically invalid payloads are parked the same way.
			markDead(ctx, envelope, "任务紧急性平移载荷无效: user_id 或 task_ids 非法")
			return nil
		}
		return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
			updated, err := taskDAO.WithTx(tx).PromoteTaskUrgencyByIDs(ctx, payload.UserID, payload.TaskIDs, time.Now())
			if err != nil {
				return err
			}
			log.Printf("任务紧急性平移消费完成: user_id=%d task_count=%d affected=%d outbox_id=%d", payload.UserID, len(payload.TaskIDs), updated, envelope.OutboxID)
			return nil
		})
	}
	return bus.RegisterEventHandler(EventTypeTaskUrgencyPromoteRequested, handler)
}
// PublishTaskUrgencyPromoteRequested publishes a "task urgency promote
// requested" event through publisher.
//
// Validation runs in this order: a nil publisher, a non-positive UserID, and
// a TaskIDs list that is empty after removing non-positive and duplicate IDs
// each produce an error. A zero TriggeredAt is filled with the current time.
// The user ID, as a decimal string, is used as both MessageKey and AggregateID.
func PublishTaskUrgencyPromoteRequested(ctx context.Context, publisher outboxinfra.EventPublisher, payload model.TaskUrgencyPromoteRequestedPayload) error {
	if publisher == nil {
		return errors.New("event publisher is nil")
	}
	if payload.UserID <= 0 {
		return errors.New("invalid user_id")
	}

	cleanedIDs := sanitizePositiveUniqueIntIDs(payload.TaskIDs)
	if len(cleanedIDs) == 0 {
		return errors.New("task_ids is empty")
	}
	payload.TaskIDs = cleanedIDs

	if payload.TriggeredAt.IsZero() {
		payload.TriggeredAt = time.Now()
	}

	userKey := strconv.Itoa(payload.UserID)
	request := outboxinfra.PublishRequest{
		EventType:    EventTypeTaskUrgencyPromoteRequested,
		EventVersion: outboxinfra.DefaultEventVersion,
		MessageKey:   userKey,
		AggregateID:  userKey,
		Payload:      payload,
	}
	return publisher.Publish(ctx, request)
}
// sanitizePositiveUniqueIntIDs returns the positive IDs from ids with
// duplicates removed, keeping the order of first occurrence. The result is
// always a non-nil slice, even when ids is nil or nothing survives filtering.
func sanitizePositiveUniqueIntIDs(ids []int) []int {
	kept := make([]int, 0, len(ids))
	visited := make(map[int]bool, len(ids))
	for i := 0; i < len(ids); i++ {
		candidate := ids[i]
		// Skip non-positive values and anything already emitted.
		if candidate <= 0 || visited[candidate] {
			continue
		}
		visited[candidate] = true
		kept = append(kept, candidate)
	}
	return kept
}