Version: 0.6.3.dev.260316

 feat(task): 新增四象限任务懒触发自动平移链路(读时派生 + Outbox 异步收敛)

- 🧩 为 `Task` 模型新增 `urgency_threshold_at` 字段,并补充复合索引 `user_id,is_completed,urgency_threshold_at,priority` 及相关事件 payload
- ♻️ 重构 `TaskService.GetUserTasks`:调整为“缓存/DB 读取原始任务 -> 读时派生优先级(`2 -> 1`、`4 -> 3`)-> 通过 `SETNX` 去重后发布平移事件”的处理链路
- 🚚 新增任务平移事件链路:
  - `service/events/task_urgency_promote.go`
  - 事件类型:`task.urgency.promote.requested`
  - 支持 `Publish` + `RegisterHandler` + `ConsumeAndMarkConsumed` 的事务化消费流程
- 🛡️ 为 `TaskDAO` 新增幂等批量更新能力 `PromoteTaskUrgencyByIDs`,采用条件更新策略,仅对“达到阈值且未完成”的任务生效
- 🔌 更新启动接线逻辑:注册任务平移 handler,并将 `eventBus` 注入 `NewTaskService`
- 🧹 修复并升级任务缓存层,统一为 `[]model.Task` 原始模型缓存;同时清理误导性注释,并补充详细中文步骤化注释
- 🔗 打通 `QuickNote` 链路中的 `urgency_threshold_at` 透传与校验,覆盖 `state` / `tool` / `nodes` / `prompt` / `agent_quick_note` 全链路
- 💾 写库时补充落库 `task.UrgencyThresholdAt`
- 📝 新增功能决策记录

之前画的饼正在一块块填上~这一块饼填上之后,第一批开发的后端部分基本已经搞定了。后面的功能全都是天马行空的拓展功能。
This commit is contained in:
Losita
2026-03-16 20:33:33 +08:00
parent daeff0afab
commit 84371e2ff8
12 changed files with 792 additions and 115 deletions

View File

@@ -124,11 +124,12 @@ func (s *AgentService) tryHandleQuickNoteWithGraph(
CreateTask: func(ctx context.Context, req quicknote.QuickNoteCreateTaskRequest) (*quicknote.QuickNoteCreateTaskResult, error) {
// 3.2.1 把 quick note 的工具入参映射成项目 Task 模型。
taskModel := &model.Task{
UserID: req.UserID,
Title: req.Title,
Priority: req.PriorityGroup,
IsCompleted: false,
DeadlineAt: req.DeadlineAt,
UserID: req.UserID,
Title: req.Title,
Priority: req.PriorityGroup,
IsCompleted: false,
DeadlineAt: req.DeadlineAt,
UrgencyThresholdAt: req.UrgencyThresholdAt,
}
// 3.2.2 调用 DAO 写库。
@@ -139,10 +140,11 @@ func (s *AgentService) tryHandleQuickNoteWithGraph(
// 3.2.3 把写库结果回填给 graph 状态,用于后续回复拼装。
return &quicknote.QuickNoteCreateTaskResult{
TaskID: created.ID,
Title: created.Title,
PriorityGroup: created.Priority,
DeadlineAt: created.DeadlineAt,
TaskID: created.ID,
Title: created.Title,
PriorityGroup: created.Priority,
DeadlineAt: created.DeadlineAt,
UrgencyThresholdAt: created.UrgencyThresholdAt,
}, nil
},
},

View File

@@ -0,0 +1,136 @@
package events
import (
"context"
"encoding/json"
"errors"
"log"
"strconv"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
"gorm.io/gorm"
)
const (
// EventTypeTaskUrgencyPromoteRequested 是“任务紧急性平移请求”事件类型。
//
// 命名约束:
// 1. 只表达业务语义,不泄露 Kafka/outbox 技术细节;
// 2. 作为稳定路由键长期保留,后续协议演进优先走 event_version。
EventTypeTaskUrgencyPromoteRequested = "task.urgency.promote.requested"
)
// RegisterTaskUrgencyPromoteHandler 注册“任务紧急性平移”消费者处理器。
//
// 职责边界:
// 1. 只负责注册 handler不负责启动/关闭事件总线;
// 2. 只处理 `task.urgency.promote.requested` 事件,不处理其他业务事件;
// 3. 通过 `ConsumeAndMarkConsumed` 把“业务更新 + outbox consumed 推进”放进同一事务。
func RegisterTaskUrgencyPromoteHandler(
bus *outboxinfra.EventBus,
outboxRepo *outboxinfra.Repository,
repoManager *dao.RepoManager,
) error {
// 1. 依赖校验:缺少任意关键依赖都不能安全消费消息。
if bus == nil {
return errors.New("event bus is nil")
}
if outboxRepo == nil {
return errors.New("outbox repository is nil")
}
if repoManager == nil {
return errors.New("repo manager is nil")
}
// 2. 定义统一处理函数。
handler := func(ctx context.Context, envelope kafkabus.Envelope) error {
// 2.1 先解析 payload解析失败属于不可恢复错误直接标记 dead。
var payload model.TaskUrgencyPromoteRequestedPayload
if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error())
return nil
}
// 2.2 做轻量参数净化,避免脏数据进入 DAO。
payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
if payload.UserID <= 0 || len(payload.TaskIDs) == 0 {
_ = outboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法")
return nil
}
// 2.3 统一走 outbox 消费事务入口,保证“业务成功 -> consumed”原子一致。
return outboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error {
// 2.3.1 基于同一 tx 构造 RepoManager复用现有跨 DAO 事务模式。
txM := repoManager.WithTx(tx)
// 2.3.2 以消费时刻为准做条件更新,确保“到线”判定与真实落库时刻一致。
updated, err := txM.Task.PromoteTaskUrgencyByIDs(ctx, payload.UserID, payload.TaskIDs, time.Now())
if err != nil {
return err
}
log.Printf("任务紧急性平移消费完成: user_id=%d task_count=%d affected=%d outbox_id=%d", payload.UserID, len(payload.TaskIDs), updated, envelope.OutboxID)
return nil
})
}
// 3. 注册事件处理器。
return bus.RegisterEventHandler(EventTypeTaskUrgencyPromoteRequested, handler)
}
// PublishTaskUrgencyPromoteRequested 发布“任务紧急性平移请求”事件。
//
// 职责边界:
// 1. 只负责把业务 DTO 发布到 outbox不负责等待消费结果
// 2. 若发布失败,返回 error 交给调用方决定是否降级或重试。
func PublishTaskUrgencyPromoteRequested(
ctx context.Context,
publisher outboxinfra.EventPublisher,
payload model.TaskUrgencyPromoteRequestedPayload,
) error {
if publisher == nil {
return errors.New("event publisher is nil")
}
if payload.UserID <= 0 {
return errors.New("invalid user_id")
}
payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs)
if len(payload.TaskIDs) == 0 {
return errors.New("task_ids is empty")
}
if payload.TriggeredAt.IsZero() {
payload.TriggeredAt = time.Now()
}
return publisher.Publish(ctx, outboxinfra.PublishRequest{
EventType: EventTypeTaskUrgencyPromoteRequested,
EventVersion: outboxinfra.DefaultEventVersion,
// 这里使用 user_id 作为消息键,确保同一用户相关平移事件尽量落到同一分区,降低乱序概率。
MessageKey: strconv.Itoa(payload.UserID),
AggregateID: strconv.Itoa(payload.UserID),
Payload: payload,
})
}
// sanitizePositiveUniqueIntIDs 过滤非正数并去重。
//
// 说明:
// 1. 该函数只做参数净化,不承载业务判定;
// 2. 不保证顺序稳定,对当前 SQL where in 语义无影响。
func sanitizePositiveUniqueIntIDs(ids []int) []int {
seen := make(map[int]struct{}, len(ids))
result := make([]int, 0, len(ids))
for _, id := range ids {
if id <= 0 {
continue
}
if _, exists := seen[id]; exists {
continue
}
seen[id] = struct{}{}
result = append(result, id)
}
return result
}

View File

@@ -3,69 +3,281 @@ package service
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/dao"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/respond"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
"github.com/go-redis/redis/v8"
)
const (
// taskUrgencyPromoteDedupeTTL 是“同一任务平移请求”的去重锁有效期。
//
// 设计考虑:
// 1. 太短会导致消费稍慢时被重复投递;
// 2. 太长会导致首次投递失败后恢复变慢;
// 3. 这里先取 120 秒作为折中值,后续可按线上观测再调优。
taskUrgencyPromoteDedupeTTL = 120 * time.Second
// taskUrgencyPromoteDedupeKeyFmt 是任务平移去重键模板。
taskUrgencyPromoteDedupeKeyFmt = "smartflow:task:promote:pending:%d:%d"
)
type TaskService struct {
// 伸出手:准备接住 DAO
dao *dao.TaskDAO
// dao 负责任务表读写。
dao *dao.TaskDAO
// cache 负责任务列表缓存与 Redis 去重锁能力。
cache *dao.CacheDAO
// eventPublisher 负责发布 outbox 事件(可能为空:例如未启用 Kafka/总线时)。
eventPublisher outboxinfra.EventPublisher
}
// NewTaskService 创建 TaskService 实例
func NewTaskService(dao *dao.TaskDAO, cache *dao.CacheDAO) *TaskService {
// NewTaskService 创建 TaskService 实例
//
// 职责边界:
// 1. 只做依赖注入,不做连接可用性探测;
// 2. 允许 eventPublisher 为空(用于本地降级场景)。
func NewTaskService(taskDAO *dao.TaskDAO, cacheDAO *dao.CacheDAO, eventPublisher outboxinfra.EventPublisher) *TaskService {
return &TaskService{
dao: dao,
cache: cache,
dao: taskDAO,
cache: cacheDAO,
eventPublisher: eventPublisher,
}
}
// AddTask 新增任务。
//
// 职责边界:
// 1. 负责参数转换、优先级合法性校验与写库;
// 2. 不负责“紧急性自动平移”逻辑(该逻辑发生在任务读取时的懒触发链路)。
func (ts *TaskService) AddTask(ctx context.Context, req *model.UserAddTaskRequest, userID int) (*model.UserAddTaskResponse, error) {
//1. 调用 conv 层进行转换
// 1. 把用户请求转换为内部模型,避免 API 层结构直接泄漏到 DAO。
taskModel := conv.UserAddTaskRequestToModel(req, userID)
//2.检查优先级是否合法
// 2. 优先级范围校验:当前任务体系只允许 1~4。
if taskModel.Priority < 1 || taskModel.Priority >= 5 {
return nil, respond.InvalidPriority
}
//3. 调用 courseDAO 层进行数据持久化
// 3. 写库。
createdTask, err := ts.dao.AddTask(taskModel)
if err != nil {
return nil, err
}
//4. 调用 conv 层进行响应转换
// 4. 返回对外响应 DTO。
response := conv.ModelToUserAddTaskResponse(createdTask)
return response, nil
}
// GetUserTasks 获取用户任务列表(含“读时紧急性派生”与“异步平移触发”)。
//
// 核心流程(步骤化):
// 1. 先读缓存,未命中再回源 DB并把“原始模型”回填缓存
// 2. 在内存里做“读时派生”:仅用于本次返回给前端,不直接改库;
// 3. 收集“已到紧急分界线且仍处于非紧急象限”的任务 ID
// 4. 通过 Redis SETNX 去重后,发布 outbox 事件异步落库;
// 5. 无论发布成功与否,都优先返回本次派生结果,保证用户读体验。
//
// 一致性策略:
// 1. 缓存里存的是原始任务,不是派生后的优先级;
// 2. 真实平移由异步消费者条件更新 DB
// 3. DB 更新后由 cache_deleter 自动删缓存,下一次读取自然拿到新状态。
func (ts *TaskService) GetUserTasks(ctx context.Context, userID int) ([]model.GetUserTaskResp, error) {
//1. 先尝试从缓存获取数据
cachedResp, err := ts.cache.GetUserTasksFromCache(ctx, userID)
if err == nil {
// 缓存命中,直接返回
return cachedResp, nil
}
// 如果是 redis.Nil 错误,说明缓存未命中,我们继续查库
if !errors.Is(err, redis.Nil) {
return nil, err // 其他错误,返回错误
}
//2. 调用 courseDAO 层获取数据
tasks, err := ts.dao.GetTasksByUserID(userID)
// 1. 读取原始任务模型缓存优先DB 兜底)。
rawTasks, err := ts.getRawUserTasks(ctx, userID)
if err != nil {
return nil, err
}
//3. 调用 conv 层进行响应转换
response := conv.ModelToGetUserTasksResp(tasks)
//4. 将结果存入缓存设置合理的过期时间24h
err = ts.cache.SetUserTasksToCache(ctx, userID, response)
if err != nil {
// 缓存写入失败,记录日志但不影响正常返回数据
log.Printf("Failed to cache user tasks for userID %d: %v", userID, err)
}
// 2. 读时派生:本次请求内把“已到线任务”映射到紧急象限,同时收集待异步落库任务 ID。
derivedTasks, duePromoteTaskIDs := deriveTaskUrgencyForRead(rawTasks, time.Now())
// 3. 非阻断触发异步平移事件:发布失败不影响本次查询返回。
ts.tryEnqueueTaskUrgencyPromote(ctx, userID, duePromoteTaskIDs)
// 4. 最后统一走 conv 转 DTO避免 API 层直接依赖内部模型。
response := conv.ModelToGetUserTasksResp(derivedTasks)
return response, nil
}
// getRawUserTasks 读取“原始任务模型”。
//
// 职责边界:
// 1. 负责缓存命中/回源 DB/回填缓存;
// 2. 不做优先级派生,不做异步事件投递;
// 3. 缓存写失败只记日志,不阻断主流程。
func (ts *TaskService) getRawUserTasks(ctx context.Context, userID int) ([]model.Task, error) {
// 1. 先查缓存:命中则直接返回。
cachedTasks, err := ts.cache.GetUserTasksFromCache(ctx, userID)
if err == nil {
return cachedTasks, nil
}
// 2. 非 redis.Nil 错误直接返回,避免掩盖真实故障。
if !errors.Is(err, redis.Nil) {
return nil, err
}
// 3. 缓存未命中回源 DB。
dbTasks, err := ts.dao.GetTasksByUserID(userID)
if err != nil {
return nil, err
}
// 4. 回填缓存(失败不阻断主链路)。
if setErr := ts.cache.SetUserTasksToCache(ctx, userID, dbTasks); setErr != nil {
log.Printf("写入用户任务缓存失败: user_id=%d err=%v", userID, setErr)
}
return dbTasks, nil
}
// deriveTaskUrgencyForRead 对任务做“读时紧急性派生”,并收集需要异步落库的任务 ID。
//
// 职责边界:
// 1. 只在内存里改本次返回值,不写 DB
// 2. 只做“到线且未完成任务”的优先级映射;
// 3. 不处理去重锁和事件发布。
//
// 返回语义:
// 1. 第一个返回值:可直接用于响应前端的派生任务切片;
// 2. 第二个返回值:需要发“异步平移事件”的任务 ID 列表(可能为空)。
func deriveTaskUrgencyForRead(tasks []model.Task, now time.Time) ([]model.Task, []int) {
// 1. 拷贝切片,避免修改调用方持有的原始数据。
derived := make([]model.Task, len(tasks))
copy(derived, tasks)
pendingPromoteTaskIDs := make([]int, 0, len(derived))
// 2. 逐条判断是否满足“自动平移”条件。
for idx := range derived {
current := &derived[idx]
// 2.1 已完成任务不参与平移。
if current.IsCompleted {
continue
}
// 2.2 没有分界线的任务不参与平移。
if current.UrgencyThresholdAt == nil {
continue
}
// 2.3 尚未到分界线,不平移。
if current.UrgencyThresholdAt.After(now) {
continue
}
// 2.4 到线后,仅把“不紧急象限”平移到对应“紧急象限”。
// 2.4.1 重要不紧急(2) -> 重要且紧急(1)
// 2.4.2 不简单不重要(4) -> 简单不重要(3)
switch current.Priority {
case 2:
current.Priority = 1
pendingPromoteTaskIDs = append(pendingPromoteTaskIDs, current.ID)
case 4:
current.Priority = 3
pendingPromoteTaskIDs = append(pendingPromoteTaskIDs, current.ID)
default:
// 2.4.3 其他优先级不处理(包含已经是 1/3 的情况)。
}
}
return derived, pendingPromoteTaskIDs
}
// tryEnqueueTaskUrgencyPromote 尝试发布“任务紧急性平移请求”事件。
//
// 职责边界:
// 1. 负责 Redis 去重锁 + outbox 发布;
// 2. 不负责真正落库(由消费者负责);
// 3. 发布失败时要释放本次抢到的去重锁,避免任务被长时间“误判已投递”。
func (ts *TaskService) tryEnqueueTaskUrgencyPromote(ctx context.Context, userID int, taskIDs []int) {
// 1. 基础兜底:无发布器或无候选任务时直接返回。
if ts.eventPublisher == nil || userID <= 0 || len(taskIDs) == 0 {
return
}
// 2. 先做任务 ID 清洗,避免无效 ID 参与去重与发布。
validTaskIDs := compactPositiveUniqueTaskIDs(taskIDs)
if len(validTaskIDs) == 0 {
return
}
// 3. 逐个抢 SETNX 去重锁:
// 3.1 抢到锁才允许进入本次发布;
// 3.2 抢不到说明已有请求在途,本次跳过即可;
// 3.3 抢锁失败只记录日志,不中断主流程。
lockedTaskIDs := make([]int, 0, len(validTaskIDs))
lockedKeys := make([]string, 0, len(validTaskIDs))
for _, taskID := range validTaskIDs {
lockKey := fmt.Sprintf(taskUrgencyPromoteDedupeKeyFmt, userID, taskID)
locked, lockErr := ts.cache.AcquireLock(ctx, lockKey, taskUrgencyPromoteDedupeTTL)
if lockErr != nil {
log.Printf("任务平移去重锁获取失败: user_id=%d task_id=%d err=%v", userID, taskID, lockErr)
continue
}
if !locked {
continue
}
lockedTaskIDs = append(lockedTaskIDs, taskID)
lockedKeys = append(lockedKeys, lockKey)
}
if len(lockedTaskIDs) == 0 {
return
}
// 4. 发布 outbox 事件:这里只保证“成功入 outbox 或返回错误”,不等待消费者执行完成。
publishErr := eventsvc.PublishTaskUrgencyPromoteRequested(ctx, ts.eventPublisher, model.TaskUrgencyPromoteRequestedPayload{
UserID: userID,
TaskIDs: lockedTaskIDs,
TriggeredAt: time.Now(),
})
if publishErr != nil {
// 4.1 失败回滚:释放本次抢到的去重锁,避免后续请求因误锁而无法再投递。
ts.releaseTaskPromoteLocks(lockedKeys)
log.Printf("任务平移事件发布失败: user_id=%d task_ids=%v err=%v", userID, lockedTaskIDs, publishErr)
return
}
log.Printf("任务平移事件已发布: user_id=%d task_ids=%v", userID, lockedTaskIDs)
}
// releaseTaskPromoteLocks 释放任务平移去重锁。
//
// 说明:
// 1. 仅用于“发布失败回滚”场景;
// 2. 使用 Background 避免请求上下文已取消时导致锁释放失败。
func (ts *TaskService) releaseTaskPromoteLocks(lockKeys []string) {
if len(lockKeys) == 0 {
return
}
releaseCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
for _, key := range lockKeys {
if err := ts.cache.ReleaseLock(releaseCtx, key); err != nil {
log.Printf("任务平移去重锁释放失败: key=%s err=%v", key, err)
}
}
}
// compactPositiveUniqueTaskIDs 对任务 ID 做“过滤非正数 + 去重”。
//
// 职责边界:
// 1. 只做参数清洗;
// 2. 不承载业务规则判断。
func compactPositiveUniqueTaskIDs(taskIDs []int) []int {
seen := make(map[int]struct{}, len(taskIDs))
result := make([]int, 0, len(taskIDs))
for _, taskID := range taskIDs {
if taskID <= 0 {
continue
}
if _, exists := seen[taskID]; exists {
continue
}
seen[taskID] = struct{}{}
result = append(result, taskID)
}
return result
}