Version: 0.9.35.dev.260422
后端: 1. 任务查询统一紧急性提升链路——LLM 工具与前端共享读时派生 + outbox 异步落库 - service/task.go:GetUserTasks 中读时提升逻辑抽取为独立方法 GetTasksWithUrgencyPromotion,返回 []model.Task 供两路复用 - service/agentsvc/agent.go:新增 GetTasksWithUrgencyPromotionFunc 函数注入字段 - service/agentsvc/agent_task_query.go:QueryTasksForTool 优先走统一提升链路,未注入时回退旧 taskRepo 直接读取 - service/agent_bridge.go:NewAgentServiceWithSchedule 接收 TaskService 并注入提升函数 - cmd/start.go:启动接线传入 taskSv 2. 移除未使用依赖 - go.mod:删除 github.com/bytedance/mockey
This commit is contained in:
@@ -151,7 +151,7 @@ func Start() {
|
||||
courseService := service.NewCourseService(courseRepo, scheduleRepo)
|
||||
taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager)
|
||||
scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo)
|
||||
agentService := service.NewAgentServiceWithSchedule(aiHub, agentRepo, taskRepo, cacheRepo, agentCacheRepo, eventBus, scheduleService)
|
||||
agentService := service.NewAgentServiceWithSchedule(aiHub, agentRepo, taskRepo, cacheRepo, agentCacheRepo, eventBus, scheduleService, taskSv)
|
||||
|
||||
// newAgent 依赖接线。
|
||||
agentService.SetAgentStateStore(dao.NewAgentStateStoreAdapter(cacheRepo))
|
||||
|
||||
@@ -23,7 +23,6 @@ require (
|
||||
github.com/bahlo/generic-list-go v0.2.0 // indirect
|
||||
github.com/buger/jsonparser v1.1.1 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/mockey v1.3.0 // indirect
|
||||
github.com/bytedance/sonic v1.15.0 // indirect
|
||||
github.com/bytedance/sonic/loader v0.5.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
||||
@@ -36,6 +36,7 @@ func NewAgentServiceWithSchedule(
|
||||
agentRedis *dao.AgentCache,
|
||||
eventPublisher outboxinfra.EventPublisher,
|
||||
scheduleSvc *ScheduleService,
|
||||
taskSvc *TaskService,
|
||||
) *AgentService {
|
||||
svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, eventPublisher)
|
||||
|
||||
@@ -46,5 +47,10 @@ func NewAgentServiceWithSchedule(
|
||||
svc.ResolvePlanningWindowFunc = scheduleSvc.ResolvePlanningWindowByTaskClasses
|
||||
}
|
||||
|
||||
// 注入任务紧急性提升依赖:复用 TaskService 的统一提升 + outbox 投递链路。
|
||||
if taskSvc != nil {
|
||||
svc.GetTasksWithUrgencyPromotionFunc = taskSvc.GetTasksWithUrgencyPromotion
|
||||
}
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
@@ -50,6 +50,12 @@ type AgentService struct {
|
||||
// 2. 该函数只做”窗口解析”,不负责粗排与混排计算。
|
||||
ResolvePlanningWindowFunc func(ctx context.Context, userID int, taskClassIDs []int) (startWeek, startDay, endWeek, endDay int, err error)
|
||||
|
||||
// ── 任务紧急性提升依赖(函数注入,避免 service 包循环依赖)──
|
||||
|
||||
// GetTasksWithUrgencyPromotionFunc 读取用户任务并应用读时紧急性提升 + 异步落库触发。
|
||||
// 未注入时,QueryTasksForTool 回退到旧逻辑(纯内存提升,不持久化)。
|
||||
GetTasksWithUrgencyPromotionFunc func(ctx context.Context, userID int) ([]model.Task, error)
|
||||
|
||||
// ── newAgent 依赖(由 cmd/start.go 通过 Set* 方法注入)──
|
||||
toolRegistry *newagenttools.ToolRegistry
|
||||
scheduleProvider newagentmodel.ScheduleStateProvider
|
||||
|
||||
@@ -13,31 +13,47 @@ import (
|
||||
)
|
||||
|
||||
func (s *AgentService) QueryTasksForTool(ctx context.Context, req newagentmodel.TaskQueryRequest) ([]newagentmodel.TaskQueryTaskRecord, error) {
|
||||
_ = ctx
|
||||
if req.UserID <= 0 {
|
||||
return nil, errors.New("invalid user_id in task query")
|
||||
}
|
||||
if s.taskRepo == nil {
|
||||
return nil, errors.New("task repository is nil")
|
||||
}
|
||||
|
||||
tasks, err := s.taskRepo.GetTasksByUserID(req.UserID)
|
||||
if err != nil {
|
||||
if errors.Is(err, respond.UserTasksEmpty) {
|
||||
return make([]newagentmodel.TaskQueryTaskRecord, 0), nil
|
||||
var tasks []model.Task
|
||||
var err error
|
||||
|
||||
// 优先使用统一提升链路(含缓存读取 + 读时派生 + outbox 异步落库)。
|
||||
if s.GetTasksWithUrgencyPromotionFunc != nil {
|
||||
tasks, err = s.GetTasksWithUrgencyPromotionFunc(ctx, req.UserID)
|
||||
if err != nil {
|
||||
if errors.Is(err, respond.UserTasksEmpty) {
|
||||
return make([]newagentmodel.TaskQueryTaskRecord, 0), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// 回退:未注入时走旧的 taskRepo 直接读取(无缓存、无持久化)。
|
||||
if s.taskRepo == nil {
|
||||
return nil, errors.New("task repository is nil")
|
||||
}
|
||||
tasks, err = s.taskRepo.GetTasksByUserID(req.UserID)
|
||||
if err != nil {
|
||||
if errors.Is(err, respond.UserTasksEmpty) {
|
||||
return make([]newagentmodel.TaskQueryTaskRecord, 0), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
now := time.Now()
|
||||
for i := range tasks {
|
||||
applyReadTimeUrgencyPromotion(&tasks[i], now)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
// 过滤、排序、截断。
|
||||
filtered := make([]model.Task, 0, len(tasks))
|
||||
for _, originalTask := range tasks {
|
||||
currentTask := originalTask
|
||||
applyReadTimeUrgencyPromotion(¤tTask, now)
|
||||
if !taskMatchesQueryFilter(currentTask, req) {
|
||||
for _, task := range tasks {
|
||||
if !taskMatchesQueryFilter(task, req) {
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, currentTask)
|
||||
filtered = append(filtered, task)
|
||||
}
|
||||
|
||||
sortTasksForQuery(filtered, req)
|
||||
|
||||
@@ -168,21 +168,25 @@ func (ts *TaskService) UndoCompleteTask(ctx context.Context, req *model.UserUndo
|
||||
// 2. 真实平移由异步消费者条件更新 DB;
|
||||
// 3. DB 更新后由 cache_deleter 自动删缓存,下一次读取自然拿到新状态。
|
||||
func (ts *TaskService) GetUserTasks(ctx context.Context, userID int) ([]model.GetUserTaskResp, error) {
|
||||
// 1. 读取原始任务模型(缓存优先,DB 兜底)。
|
||||
derivedTasks, err := ts.GetTasksWithUrgencyPromotion(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conv.ModelToGetUserTasksResp(derivedTasks), nil
|
||||
}
|
||||
|
||||
// GetTasksWithUrgencyPromotion 读取用户任务并应用读时紧急性提升 + 异步落库触发。
|
||||
//
|
||||
// 统一入口,供前端查询(GetUserTasks)和 LLM 工具查询(QueryTasksForTool)复用。
|
||||
// 调用方不应假设 DB 已更新——持久化是异步的。
|
||||
func (ts *TaskService) GetTasksWithUrgencyPromotion(ctx context.Context, userID int) ([]model.Task, error) {
|
||||
rawTasks, err := ts.getRawUserTasks(ctx, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 2. 读时派生:本次请求内把“已到线任务”映射到紧急象限,同时收集待异步落库任务 ID。
|
||||
derivedTasks, duePromoteTaskIDs := deriveTaskUrgencyForRead(rawTasks, time.Now())
|
||||
|
||||
// 3. 非阻断触发异步平移事件:发布失败不影响本次查询返回。
|
||||
ts.tryEnqueueTaskUrgencyPromote(ctx, userID, duePromoteTaskIDs)
|
||||
|
||||
// 4. 最后统一走 conv 转 DTO,避免 API 层直接依赖内部模型。
|
||||
response := conv.ModelToGetUserTasksResp(derivedTasks)
|
||||
return response, nil
|
||||
return derivedTasks, nil
|
||||
}
|
||||
|
||||
// getRawUserTasks 读取“原始任务模型”。
|
||||
|
||||
Reference in New Issue
Block a user