Files
Losita 3b6fca44a6 Version: 0.9.77.dev.260505
后端:
1.阶段 6 CP4/CP5 目录收口与共享边界纯化
- 将 backend 根目录收口为 services、client、gateway、cmd、shared 五个一级目录
- 收拢 bootstrap、inits、infra/kafka、infra/outbox、conv、respond、pkg、middleware,移除根目录旧实现与空目录
- 将 utils 下沉到 services/userauth/internal/auth,将 logic 下沉到 services/schedule/core/planning
- 将迁移期 runtime 桥接实现统一收拢到 services/runtime/{conv,dao,eventsvc,model},删除 shared/legacy 与未再被 import 的旧 service 实现
- 将 gateway/shared/respond 收口为 HTTP/Gin 错误写回适配,shared/respond 仅保留共享错误语义与状态映射
- 将 HTTP IdempotencyMiddleware 与 RateLimitMiddleware 收口到 gateway/middleware
- 将 GormCachePlugin 下沉到 shared/infra/gormcache,将共享 RateLimiter 下沉到 shared/infra/ratelimit,将 agent token budget 下沉到 services/agent/shared
- 删除 InitEino 兼容壳,收缩 cmd/internal/coreinit 仅保留旧组合壳残留域初始化语义
- 更新微服务迁移计划与桌面 checklist,补齐 CP4/CP5 当前切流点、目录终态与验证结果
- 完成 go test ./...、git diff --check 与最终真实 smoke;health、register/login、task/create+get、schedule/today、task-class/list、memory/items、agent chat/meta/timeline/context-stats 全部 200,SSE 合并结果为 CP5_OK 且 [DONE] 只有 1 个
2026-05-05 23:25:07 +08:00

497 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package applyadapter
import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"github.com/LoveLosita/smartflow/backend/services/runtime/conv"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// GormApplyAdapter 负责把主动调度确认后的变更写入正式 schedule 表。
//
// 职责边界:
// 1. 只写 schedule_events / schedules并在事务内完成目标重校验与冲突重校验
// 2. 不回写 active_schedule_previews不发布 outbox不调用 API/service/task
// 3. 不创建 task_item也不更新 task / task_items 状态task_pool 是否已安排由 schedule_events 反查判断。
//
// 迁移说明:
// 1. 本文件从 active-scheduler applyadapter 原样搬入 schedule 服务,先让正式日程写所有权回到 schedule
// 2. active-scheduler 下的旧 adapter 本轮暂留作回退与历史编译兼容,切流稳定后再删除旧实现;
// 3. 暂不抽 shared 公共层,因为该逻辑属于 schedule 写模型状态机,放入 shared 会污染跨进程契约层。
type GormApplyAdapter struct {
db *gorm.DB
}
func NewGormApplyAdapter(db *gorm.DB) *GormApplyAdapter {
return &GormApplyAdapter{db: db}
}
// ApplyActiveScheduleChanges 在单个数据库事务内应用主动调度变更。
//
// 事务语义:
// 1. 先规范化所有 change 的节次,并检查本次请求内部是否自相冲突;
// 2. 事务内锁定目标事实并重查 schedules 占用,任何冲突都直接返回 slot_conflict
// 3. 所有 event 和 schedules 都成功插入后才提交;任一错误都会回滚,避免半写。
//
// 输入输出:
// 1. req.UserID / req.PreviewID / req.Changes 必须有效;
// 2. 返回的 AppliedEventIDs 是新建 schedule_events.id
// 3. error 若为 *ApplyError上游可按 Code 分类处理。
func (a *GormApplyAdapter) ApplyActiveScheduleChanges(ctx context.Context, req ApplyActiveScheduleRequest) (ApplyActiveScheduleResult, error) {
if a == nil || a.db == nil {
return ApplyActiveScheduleResult{}, newApplyError(ErrorCodeInvalidRequest, "主动调度 apply adapter 未初始化", nil)
}
normalized, err := normalizeRequest(req)
if err != nil {
return ApplyActiveScheduleResult{}, err
}
result := ApplyActiveScheduleResult{ApplyID: req.ApplyID}
err = a.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
appliedEventIDs := make([]int, 0, len(normalized))
appliedScheduleIDs := make([]int, 0)
for _, change := range normalized {
var eventIDs []int
var scheduleIDs []int
var applyErr error
switch {
case isAddTaskPoolChange(change):
eventIDs, scheduleIDs, applyErr = a.applyTaskPoolChange(ctx, tx, req, change)
case isCreateMakeupChange(change):
eventIDs, scheduleIDs, applyErr = a.applyMakeupChange(ctx, tx, req, change)
default:
applyErr = newApplyError(ErrorCodeUnsupportedChangeType, fmt.Sprintf("不支持的主动调度变更类型:%s", change.ChangeType), nil)
}
if applyErr != nil {
return applyErr
}
appliedEventIDs = append(appliedEventIDs, eventIDs...)
appliedScheduleIDs = append(appliedScheduleIDs, scheduleIDs...)
}
result.AppliedEventIDs = appliedEventIDs
result.AppliedScheduleIDs = appliedScheduleIDs
return nil
})
if err != nil {
return ApplyActiveScheduleResult{}, classifyDBError(err)
}
return result, nil
}
func (a *GormApplyAdapter) applyTaskPoolChange(ctx context.Context, tx *gorm.DB, req ApplyActiveScheduleRequest, change normalizedChange) ([]int, []int, error) {
targetID := change.TargetID
if change.TargetType != "" && change.TargetType != TargetTypeTaskPool {
return nil, nil, newApplyError(ErrorCodeInvalidEditedChanges, "add_task_pool_to_schedule 只能写入 task_pool 目标", nil)
}
// 调用目的:锁住同一个 task_pool 任务,串行化“是否已经进入日程”的判断,避免并发确认写出重复任务块。
task, err := lockTaskPool(ctx, tx, req.UserID, targetID)
if err != nil {
return nil, nil, err
}
if task.IsCompleted {
return nil, nil, newApplyError(ErrorCodeTargetCompleted, "task_pool 任务已完成,不能再加入日程", nil)
}
if err := ensureTaskPoolNotScheduled(ctx, tx, req.UserID, task.ID); err != nil {
return nil, nil, err
}
if err := ensureSlotsFree(ctx, tx, req.UserID, change); err != nil {
return nil, nil, err
}
eventName := strings.TrimSpace(task.Title)
if eventName == "" {
eventName = fmt.Sprintf("任务 %d", task.ID)
}
relID := task.ID
return insertTaskEventWithSchedules(ctx, tx, req, change, eventPayload{
Name: eventName,
TaskSourceType: TaskSourceTypeTaskPool,
RelID: relID,
Sections: change.Sections,
})
}
func (a *GormApplyAdapter) applyMakeupChange(ctx context.Context, tx *gorm.DB, req ApplyActiveScheduleRequest, change normalizedChange) ([]int, []int, error) {
target, err := resolveMakeupTarget(ctx, tx, req.UserID, change)
if err != nil {
return nil, nil, err
}
if err := ensureSlotsFree(ctx, tx, req.UserID, change); err != nil {
return nil, nil, err
}
return insertTaskEventWithSchedules(ctx, tx, req, change, eventPayload{
Name: target.Name,
TaskSourceType: target.TaskSourceType,
RelID: target.RelID,
MakeupForEventID: &target.MakeupForEventID,
Sections: change.Sections,
})
}
type normalizedChange struct {
ApplyChange
Week int
DayOfWeek int
Sections []int
}
func normalizeRequest(req ApplyActiveScheduleRequest) ([]normalizedChange, error) {
if req.UserID <= 0 {
return nil, newApplyError(ErrorCodeInvalidRequest, "user_id 不能为空", nil)
}
if strings.TrimSpace(req.PreviewID) == "" {
return nil, newApplyError(ErrorCodeInvalidRequest, "preview_id 不能为空", nil)
}
if len(req.Changes) == 0 {
return nil, newApplyError(ErrorCodeInvalidRequest, "changes 不能为空", nil)
}
seenSlots := make(map[string]struct{})
normalized := make([]normalizedChange, 0, len(req.Changes))
for _, change := range req.Changes {
sections, err := normalizeSections(change)
if err != nil {
return nil, err
}
for _, section := range sections {
key := fmt.Sprintf("%d:%d:%d", change.ToSlot.Start.Week, change.ToSlot.Start.DayOfWeek, section)
if _, exists := seenSlots[key]; exists {
return nil, newApplyError(ErrorCodeSlotConflict, "本次确认请求内部存在重复节次", nil)
}
seenSlots[key] = struct{}{}
}
normalized = append(normalized, normalizedChange{
ApplyChange: change,
Week: change.ToSlot.Start.Week,
DayOfWeek: change.ToSlot.Start.DayOfWeek,
Sections: sections,
})
}
return normalized, nil
}
func normalizeSections(change ApplyChange) ([]int, error) {
if change.TargetID <= 0 {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "变更目标 ID 不能为空", nil)
}
if change.ToSlot == nil {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "变更缺少目标节次", nil)
}
start := change.ToSlot.Start
end := change.ToSlot.End
if start.Week <= 0 || start.DayOfWeek < 1 || start.DayOfWeek > 7 || start.Section < 1 || start.Section > 12 {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "目标起始节次不合法", nil)
}
duration := change.DurationSections
if duration <= 0 {
duration = change.ToSlot.DurationSections
}
if end.Section <= 0 && duration > 0 {
end = Slot{Week: start.Week, DayOfWeek: start.DayOfWeek, Section: start.Section + duration - 1}
}
if end.Week <= 0 && end.DayOfWeek <= 0 && end.Section <= 0 {
end = start
}
if end.Week != start.Week || end.DayOfWeek != start.DayOfWeek || end.Section < start.Section {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "目标节次必须是同一天内的连续区间", nil)
}
if end.Section > 12 {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "目标结束节次不合法", nil)
}
actualDuration := end.Section - start.Section + 1
if duration > 0 && duration != actualDuration {
return nil, newApplyError(ErrorCodeInvalidEditedChanges, "duration_sections 与目标节次跨度不一致", nil)
}
sections := make([]int, 0, actualDuration)
for section := start.Section; section <= end.Section; section++ {
sections = append(sections, section)
}
return sections, nil
}
func isAddTaskPoolChange(change normalizedChange) bool {
if change.ChangeType == ChangeTypeAddTaskPoolToSchedule {
return true
}
return change.ChangeType == changeTypeAdd && change.TargetType == TargetTypeTaskPool
}
func isCreateMakeupChange(change normalizedChange) bool {
return change.ChangeType == ChangeTypeCreateMakeup
}
func lockTaskPool(ctx context.Context, tx *gorm.DB, userID, taskID int) (model.Task, error) {
var task model.Task
err := tx.WithContext(ctx).
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ? AND user_id = ?", taskID, userID).
First(&task).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return model.Task{}, newApplyError(ErrorCodeTargetNotFound, "task_pool 任务不存在或不属于当前用户", nil)
}
return model.Task{}, newApplyError(ErrorCodeDBError, "读取 task_pool 任务失败", err)
}
return task, nil
}
func ensureTaskPoolNotScheduled(ctx context.Context, tx *gorm.DB, userID, taskID int) error {
var count int64
err := tx.WithContext(ctx).
Model(&model.ScheduleEvent{}).
Where("user_id = ? AND type = ? AND task_source_type = ? AND rel_id = ?", userID, scheduleEventTypeTask, TaskSourceTypeTaskPool, taskID).
Count(&count).Error
if err != nil {
return newApplyError(ErrorCodeDBError, "检查 task_pool 是否已进入日程失败", err)
}
if count > 0 {
return newApplyError(ErrorCodeTargetAlreadyScheduled, "task_pool 任务已进入日程", nil)
}
return nil
}
func ensureSlotsFree(ctx context.Context, tx *gorm.DB, userID int, change normalizedChange) error {
sections := change.Sections
if len(sections) == 0 {
return newApplyError(ErrorCodeInvalidEditedChanges, "目标节次不能为空", nil)
}
sort.Ints(sections)
startSection := sections[0]
endSection := sections[len(sections)-1]
// 1. 在事务内对目标节次加行锁,命中任何已有 schedules 都视为冲突。
// 2. 若并发事务在检查后抢先插入同一唯一键,后续 Create 会被唯一索引兜底拦截并整体回滚。
// 3. MVP 不处理课程嵌入,任何已有课程、固定日程或任务都不可覆盖。
var occupied []model.Schedule
err := tx.WithContext(ctx).
Model(&model.Schedule{}).
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("user_id = ? AND week = ? AND day_of_week = ? AND section IN ?", userID, change.Week, change.DayOfWeek, sections).
Find(&occupied).Error
if err != nil {
return newApplyError(ErrorCodeDBError, "检查目标节次冲突失败", err)
}
if len(occupied) > 0 {
return newApplyError(ErrorCodeSlotConflict, fmt.Sprintf("第 %d-%d 节已被占用", startSection, endSection), nil)
}
return nil
}
type eventPayload struct {
Name string
TaskSourceType string
RelID int
MakeupForEventID *int
Sections []int
}
func insertTaskEventWithSchedules(ctx context.Context, tx *gorm.DB, req ApplyActiveScheduleRequest, change normalizedChange, payload eventPayload) ([]int, []int, error) {
sections := append([]int(nil), payload.Sections...)
sort.Ints(sections)
start := sections[0]
end := sections[len(sections)-1]
startTime, endTime, err := conv.RelativeTimeToRealTime(change.Week, change.DayOfWeek, start, end)
if err != nil {
return nil, nil, newApplyError(ErrorCodeInvalidEditedChanges, "目标节次无法转换为绝对时间", err)
}
previewID := strings.TrimSpace(req.PreviewID)
event := model.ScheduleEvent{
UserID: req.UserID,
Name: payload.Name,
Type: scheduleEventTypeTask,
TaskSourceType: payload.TaskSourceType,
RelID: &payload.RelID,
MakeupForEventID: payload.MakeupForEventID,
ActivePreviewID: &previewID,
CanBeEmbedded: false,
StartTime: startTime,
EndTime: endTime,
}
if err := tx.WithContext(ctx).Create(&event).Error; err != nil {
return nil, nil, newApplyError(ErrorCodeDBError, "写入 schedule_events 失败", err)
}
schedules := make([]model.Schedule, 0, len(sections))
for _, section := range sections {
schedules = append(schedules, model.Schedule{
EventID: event.ID,
UserID: req.UserID,
Week: change.Week,
DayOfWeek: change.DayOfWeek,
Section: section,
Status: scheduleStatusNormal,
})
}
if err := tx.WithContext(ctx).Create(&schedules).Error; err != nil {
return nil, nil, newApplyError(ErrorCodeDBError, "写入 schedules 失败", err)
}
scheduleIDs := make([]int, 0, len(schedules))
for _, schedule := range schedules {
scheduleIDs = append(scheduleIDs, schedule.ID)
}
return []int{event.ID}, scheduleIDs, nil
}
type makeupTarget struct {
Name string
TaskSourceType string
RelID int
MakeupForEventID int
}
func resolveMakeupTarget(ctx context.Context, tx *gorm.DB, userID int, change normalizedChange) (makeupTarget, error) {
makeupForEventID := parsePositiveInt(change.Metadata["makeup_for_event_id"])
if change.TargetType == "" || change.TargetType == TargetTypeScheduleEvent {
if change.TargetID > 0 {
makeupForEventID = change.TargetID
}
return resolveMakeupFromEvent(ctx, tx, userID, makeupForEventID)
}
if makeupForEventID <= 0 {
return makeupTarget{}, newApplyError(ErrorCodeInvalidEditedChanges, "create_makeup 必须提供 makeup_for_event_id", nil)
}
if _, err := lockScheduleEvent(ctx, tx, userID, makeupForEventID); err != nil {
return makeupTarget{}, err
}
switch change.TargetType {
case TargetTypeTaskPool:
task, err := lockTaskPool(ctx, tx, userID, change.TargetID)
if err != nil {
return makeupTarget{}, err
}
if task.IsCompleted {
return makeupTarget{}, newApplyError(ErrorCodeTargetCompleted, "补做目标 task_pool 已完成", nil)
}
return makeupTarget{
Name: nonEmpty(task.Title, fmt.Sprintf("任务 %d", task.ID)),
TaskSourceType: TaskSourceTypeTaskPool,
RelID: task.ID,
MakeupForEventID: makeupForEventID,
}, nil
case TargetTypeTaskItem:
item, err := lockTaskItemForUser(ctx, tx, userID, change.TargetID)
if err != nil {
return makeupTarget{}, err
}
return makeupTarget{
Name: nonEmpty(stringPtrValue(item.Content), fmt.Sprintf("任务块 %d", item.ID)),
TaskSourceType: TaskSourceTypeTaskItem,
RelID: item.ID,
MakeupForEventID: makeupForEventID,
}, nil
default:
return makeupTarget{}, newApplyError(ErrorCodeInvalidEditedChanges, "create_makeup 目标类型不合法", nil)
}
}
func resolveMakeupFromEvent(ctx context.Context, tx *gorm.DB, userID, eventID int) (makeupTarget, error) {
event, err := lockScheduleEvent(ctx, tx, userID, eventID)
if err != nil {
return makeupTarget{}, err
}
if event.Type != scheduleEventTypeTask || event.RelID == nil || *event.RelID <= 0 {
return makeupTarget{}, newApplyError(ErrorCodeInvalidEditedChanges, "补做来源必须是已排任务日程", nil)
}
sourceType := event.TaskSourceType
if sourceType == "" {
sourceType = TaskSourceTypeTaskItem
}
if sourceType != TaskSourceTypeTaskItem && sourceType != TaskSourceTypeTaskPool {
return makeupTarget{}, newApplyError(ErrorCodeInvalidEditedChanges, "补做来源任务类型不合法", nil)
}
return makeupTarget{
Name: nonEmpty(event.Name, fmt.Sprintf("补做任务 %d", event.ID)),
TaskSourceType: sourceType,
RelID: *event.RelID,
MakeupForEventID: event.ID,
}, nil
}
func lockScheduleEvent(ctx context.Context, tx *gorm.DB, userID, eventID int) (model.ScheduleEvent, error) {
if eventID <= 0 {
return model.ScheduleEvent{}, newApplyError(ErrorCodeInvalidEditedChanges, "makeup_for_event_id 不能为空", nil)
}
var event model.ScheduleEvent
err := tx.WithContext(ctx).
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ? AND user_id = ?", eventID, userID).
First(&event).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return model.ScheduleEvent{}, newApplyError(ErrorCodeTargetNotFound, "补做来源日程不存在或不属于当前用户", nil)
}
return model.ScheduleEvent{}, newApplyError(ErrorCodeDBError, "读取补做来源日程失败", err)
}
return event, nil
}
func lockTaskItemForUser(ctx context.Context, tx *gorm.DB, userID, taskItemID int) (model.TaskClassItem, error) {
var item model.TaskClassItem
err := tx.WithContext(ctx).
Table("task_items").
Select("task_items.*").
Joins("JOIN task_classes ON task_classes.id = task_items.category_id").
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("task_items.id = ? AND task_classes.user_id = ?", taskItemID, userID).
First(&item).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return model.TaskClassItem{}, newApplyError(ErrorCodeTargetNotFound, "task_item 不存在或不属于当前用户", nil)
}
return model.TaskClassItem{}, newApplyError(ErrorCodeDBError, "读取 task_item 失败", err)
}
return item, nil
}
func parsePositiveInt(value string) int {
parsed, err := strconv.Atoi(strings.TrimSpace(value))
if err != nil || parsed <= 0 {
return 0
}
return parsed
}
func nonEmpty(value, fallback string) string {
if strings.TrimSpace(value) == "" {
return fallback
}
return strings.TrimSpace(value)
}
func stringPtrValue(value *string) string {
if value == nil {
return ""
}
return *value
}
func classifyDBError(err error) error {
if err == nil {
return nil
}
var applyErr *ApplyError
if errors.As(err, &applyErr) {
return applyErr
}
message := strings.ToLower(err.Error())
if strings.Contains(message, "duplicate entry") ||
strings.Contains(message, "unique constraint") ||
strings.Contains(message, "unique violation") ||
strings.Contains(message, "idx_user_slot_atomic") {
return newApplyError(ErrorCodeSlotConflict, "目标节次已被其他日程占用", err)
}
return newApplyError(ErrorCodeDBError, "主动调度正式写库失败", err)
}