diff --git a/backend/cmd/active-scheduler/main.go b/backend/cmd/active-scheduler/main.go index 907b531..962579c 100644 --- a/backend/cmd/active-scheduler/main.go +++ b/backend/cmd/active-scheduler/main.go @@ -46,6 +46,11 @@ func main() { JobScanEvery: viper.GetDuration("activeScheduler.jobScanEvery"), JobScanLimit: viper.GetInt("activeScheduler.jobScanLimit"), KafkaConfig: kafkabus.LoadConfig(), + TaskRPC: activeadapters.TaskRPCConfig{ + Endpoints: viper.GetStringSlice("task.rpc.endpoints"), + Target: viper.GetString("task.rpc.target"), + Timeout: viper.GetDuration("task.rpc.timeout"), + }, ScheduleRPC: activeadapters.ScheduleRPCConfig{ Endpoints: viper.GetStringSlice("schedule.rpc.endpoints"), Target: viper.GetString("schedule.rpc.target"), diff --git a/backend/cmd/start.go b/backend/cmd/start.go index d9ae3e4..a22b197 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -17,6 +17,7 @@ import ( gatewayactivescheduler "github.com/LoveLosita/smartflow/backend/gateway/client/activescheduler" gatewaynotification "github.com/LoveLosita/smartflow/backend/gateway/client/notification" gatewayschedule "github.com/LoveLosita/smartflow/backend/gateway/client/schedule" + gatewaytask "github.com/LoveLosita/smartflow/backend/gateway/client/task" gatewayuserauth "github.com/LoveLosita/smartflow/backend/gateway/client/userauth" gatewayrouter "github.com/LoveLosita/smartflow/backend/gateway/router" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" @@ -230,6 +231,14 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to initialize schedule zrpc client: %w", err) } + taskClient, err := gatewaytask.NewClient(gatewaytask.ClientConfig{ + Endpoints: viper.GetStringSlice("task.rpc.endpoints"), + Target: viper.GetString("task.rpc.target"), + Timeout: viper.GetDuration("task.rpc.timeout"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize task zrpc client: %w", err) + } activeSchedulerClient, err := gatewayactivescheduler.NewClient(gatewayactivescheduler.ClientConfig{ Endpoints: viper.GetStringSlice("activeScheduler.rpc.endpoints"), Target: viper.GetString("activeScheduler.rpc.target"), @@ -238,7 +247,11 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to initialize active-scheduler zrpc client: %w", err) } - taskSv := service.NewTaskService(taskRepo, cacheRepo, eventBus) + if err := eventsvc.RegisterTaskUrgencyPromoteRoute(); err != nil { + return nil, fmt.Errorf("failed to register task outbox route: %w", err) + } + taskOutboxPublisher := buildTaskOutboxPublisher(outboxRepo) + taskSv := service.NewTaskService(taskRepo, cacheRepo, taskOutboxPublisher) taskSv.SetActiveScheduleDAO(manager.ActiveSchedule) courseService := buildCourseService(llmService, courseRepo, scheduleRepo) taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager) @@ -268,9 +281,16 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { memoryCfg, ) - // 1. 迁移期 task_pool 事实仍由单体 task 表读取,下一轮切 task 服务后替换为 task RPC; + // 1. task_pool facts 已统一走 task RPC,避免聊天 rerun 继续直连 tasks 表; // 2. schedule facts / feedback / apply 已统一走 schedule RPC,避免聊天 rerun 继续直连 schedule 表。 - activeTaskReader := activeadapters.NewGormReaders(db) + activeTaskAdapter, err := activeadapters.NewTaskRPCAdapter(activeadapters.TaskRPCConfig{ + Endpoints: viper.GetStringSlice("task.rpc.endpoints"), + Target: viper.GetString("task.rpc.target"), + Timeout: viper.GetDuration("task.rpc.timeout"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize task rpc adapter for active-scheduler rerun: %w", err) + } activeScheduleAdapter, err := activeadapters.NewScheduleRPCAdapter(activeadapters.ScheduleRPCConfig{ Endpoints: viper.GetStringSlice("schedule.rpc.endpoints"), Target: viper.GetString("schedule.rpc.target"), @@ -279,7 +299,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { if err != nil { return nil, fmt.Errorf("failed to initialize schedule rpc adapter for active-scheduler rerun: %w", err) } - activeScheduleDryRun, err := activesvc.NewDryRunService(activeadapters.ReadersWithScheduleRPC(activeTaskReader, activeScheduleAdapter)) + activeScheduleDryRun, err := activesvc.NewDryRunService(activeadapters.ReadersWithScheduleRPC(activeTaskAdapter, activeScheduleAdapter)) if err != nil { return nil, err } @@ -297,7 +317,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { return nil, err } agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm, activeScheduleFeedbackLocator)) - handlers := buildAPIHandlers(taskSv, taskClassService, courseService, scheduleClient, agentService, memoryModule, activeSchedulerClient, notificationClient) + handlers := buildAPIHandlers(taskClient, taskClassService, courseService, scheduleClient, agentService, memoryModule, activeSchedulerClient, notificationClient) runtime := &appRuntime{ db: db, @@ -368,6 +388,68 @@ func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, erro return eventBus, nil } +type repositoryOutboxPublisher struct { + repo *outboxinfra.Repository + maxRetry int +} + +// buildTaskOutboxPublisher 构造单体残留 task 查询链路的发布器。 +// +// 职责边界: +// 1. 只负责把 Agent 残留 TaskService 产生的 task 事件写入 task_outbox_messages; +// 2. 不创建 task consumer / relay,消费边界仍归 cmd/task; +// 3. kafka.enabled=false 时返回 nil,保持本地降级语义与旧 eventBus 一致。 +func buildTaskOutboxPublisher(outboxRepo *outboxinfra.Repository) outboxinfra.EventPublisher { + kafkaCfg := kafkabus.LoadConfig() + if !kafkaCfg.Enabled || outboxRepo == nil { + return nil + } + return &repositoryOutboxPublisher{ + repo: outboxRepo, + maxRetry: kafkaCfg.MaxRetry, + } +} + +// Publish 以 publish-only 方式写入服务级 outbox。 +// +// 说明: +// 1. 这里不复用 outbox EventBus,是因为 EventBus 会创建并启动对应 service engine; +// 2. 单体残留只允许发布 task 事件,不允许启动 task consumer,否则会和 cmd/task 抢同一 consumer group; +// 3. payload 仍包装成统一 OutboxEventPayload,确保 cmd/task relay / consumer 能按标准协议解析。 +func (p *repositoryOutboxPublisher) Publish(ctx context.Context, req outboxinfra.PublishRequest) error { + if p == nil || p.repo == nil { + return fmt.Errorf("task outbox publisher is not initialized") + } + + eventType := strings.TrimSpace(req.EventType) + if eventType == "" { + return fmt.Errorf("eventType is empty") + } + eventVersion := strings.TrimSpace(req.EventVersion) + if eventVersion == "" { + eventVersion = outboxinfra.DefaultEventVersion + } + messageKey := strings.TrimSpace(req.MessageKey) + aggregateID := strings.TrimSpace(req.AggregateID) + if aggregateID == "" { + aggregateID = messageKey + } + + payloadJSON, err := json.Marshal(req.Payload) + if err != nil { + return err + } + + _, err = p.repo.CreateMessage(ctx, eventType, messageKey, outboxinfra.OutboxEventPayload{ + EventID: strings.TrimSpace(req.EventID), + EventType: eventType, + EventVersion: eventVersion, + AggregateID: aggregateID, + Payload: payloadJSON, + }, p.maxRetry) + return err +} + func buildCourseService(llmService *llmservice.Service, courseRepo *dao.CourseDAO, scheduleRepo *dao.ScheduleDAO) *service.CourseService { courseImageResponsesClient := llmService.CourseImageResponsesClient() return service.NewCourseService( @@ -823,7 +905,7 @@ func buildQuickTaskQueryFunc(agentService *service.AgentService) func(ctx contex } func buildAPIHandlers( - taskService *service.TaskService, + taskClient ports.TaskCommandClient, taskClassService *service.TaskClassService, courseService *service.CourseService, scheduleClient ports.ScheduleCommandClient, @@ -833,7 +915,7 @@ func buildAPIHandlers( notificationClient ports.NotificationCommandClient, ) *api.ApiHandlers { return &api.ApiHandlers{ - TaskHandler: api.NewTaskHandler(taskService), + TaskHandler: api.NewTaskHandler(taskClient), TaskClassHandler: api.NewTaskClassHandler(taskClassService), CourseHandler: api.NewCourseHandler(courseService), ScheduleHandler: api.NewScheduleAPI(scheduleClient), diff --git a/backend/cmd/task/main.go b/backend/cmd/task/main.go new file mode 100644 index 0000000..471dd33 --- /dev/null +++ b/backend/cmd/task/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "github.com/LoveLosita/smartflow/backend/bootstrap" + rootdao "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + rootmiddleware "github.com/LoveLosita/smartflow/backend/middleware" + taskdao "github.com/LoveLosita/smartflow/backend/services/task/dao" + taskrpc "github.com/LoveLosita/smartflow/backend/services/task/rpc" + tasksv "github.com/LoveLosita/smartflow/backend/services/task/sv" + "github.com/spf13/viper" +) + +func main() { + if err := bootstrap.LoadConfig(); err != nil { + log.Fatalf("failed to load config: %v", err) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + db, err := taskdao.OpenDBFromConfig() + if err != nil { + log.Fatalf("failed to connect task database: %v", err) + } + redisClient, err := taskdao.OpenRedisFromConfig() + if err != nil { + log.Fatalf("failed to connect task redis: %v", err) + } + defer redisClient.Close() + + cacheRepo := rootdao.NewCacheDAO(redisClient) + if err := db.Use(rootmiddleware.NewGormCachePlugin(cacheRepo)); err != nil { + log.Fatalf("failed to initialize task cache deleter: %v", err) + } + + taskRepo := taskdao.NewTaskDAO(db) + outboxRepo := outboxinfra.NewRepository(db) + eventBus, err := outboxinfra.NewEventBus(outboxRepo, kafkabus.LoadConfig()) + if err != nil { + log.Fatalf("failed to initialize task outbox bus: %v", err) + } + + svc := tasksv.NewTaskService(taskRepo, cacheRepo, eventBus) + // 迁移期 task 服务仍 best-effort 维护 active-scheduler due job,后续改成 RPC/事件后再移除该跨域 DAO。 + svc.SetActiveScheduleDAO(rootdao.NewActiveScheduleDAO(db)) + + if eventBus != nil { + if err := tasksv.RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, taskRepo); err != nil { + log.Fatalf("failed to register task outbox handler: %v", err) + } + eventBus.Start(ctx) + defer eventBus.Close() + log.Println("Task outbox consumer started") + } else { + log.Println("Task outbox consumer is disabled") + } + + server, listenOn, err := taskrpc.NewServer(taskrpc.ServerOptions{ + ListenOn: viper.GetString("task.rpc.listenOn"), + Timeout: viper.GetDuration("task.rpc.timeout"), + Service: svc, + }) + if err != nil { + log.Fatalf("failed to build task zrpc server: %v", err) + } + defer server.Stop() + + go func() { + log.Printf("task zrpc service starting on %s", listenOn) + server.Start() + }() + + <-ctx.Done() + log.Println("task service stopping") +} diff --git a/backend/config.example.yaml b/backend/config.example.yaml index 94921af..b8eb63d 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -67,6 +67,14 @@ schedule: - "127.0.0.1:9084" timeout: 6s +# 任务服务配置。 +task: + rpc: + listenOn: "0.0.0.0:9085" + endpoints: + - "127.0.0.1:9085" + timeout: 6s + # 主动调度服务配置。 activeScheduler: rpc: diff --git a/backend/gateway/api/task.go b/backend/gateway/api/task.go index 1d90c8d..46ba20b 100644 --- a/backend/gateway/api/task.go +++ b/backend/gateway/api/task.go @@ -2,64 +2,54 @@ package api import ( "context" - "fmt" "net/http" "time" - "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/respond" - "github.com/LoveLosita/smartflow/backend/service" + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" + "github.com/LoveLosita/smartflow/backend/shared/ports" "github.com/gin-gonic/gin" ) +const taskRequestTimeout = 6 * time.Second + type TaskHandler struct { - // 伸出手:准备接住 Service - svc *service.TaskService + client ports.TaskCommandClient } -// NewTaskHandler 创建 TaskHandler 实例 -func NewTaskHandler(svc *service.TaskService) *TaskHandler { - return &TaskHandler{ - svc: svc, - } +// NewTaskHandler 创建 task HTTP 门面。 +func NewTaskHandler(client ports.TaskCommandClient) *TaskHandler { + return &TaskHandler{client: client} } func (th *TaskHandler) AddTask(c *gin.Context) { - //1. 绑定请求参数 - var req model.UserAddTaskRequest + var req taskcontracts.AddTaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } - // 用户ID从上下文中获取 - userID := c.GetInt("user_id") - //2. 调用 Service 层处理业务逻辑 - // 创建一个带 1 秒超时的上下文 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) - defer cancel() // 记得释放资源 - resp, err := th.svc.AddTask(ctx, &req, userID) + req.UserID = c.GetInt("user_id") + + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) + defer cancel() + resp, err := th.client.AddTask(ctx, req) if err != nil { respond.DealWithError(c, err) return } - //3. 返回响应 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } func (th *TaskHandler) GetUserTasks(c *gin.Context) { - // 用户ID从上下文中获取 userID := c.GetInt("user_id") - //2. 调用 Service 层处理业务逻辑 - // 创建一个带 1 秒超时的上下文 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) - defer cancel() // 记得释放资源 - resp, err := th.svc.GetUserTasks(ctx, userID) + + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) + defer cancel() + resp, err := th.client.GetUserTasks(ctx, userID) if err != nil { respond.DealWithError(c, err) return } - //3. 返回响应 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } @@ -67,163 +57,98 @@ func (th *TaskHandler) GetUserTasks(c *gin.Context) { // // 职责边界: // 1. 负责解析 ids 与读取鉴权上下文中的 user_id; -// 2. 负责调用 Service 复用任务缓存读取链路; +// 2. 负责调用 task 服务,不直接读取任务缓存或数据库; // 3. 不修改任务、不触发幂等中间件、不反写 NewAgent timeline 历史 payload。 func (th *TaskHandler) BatchTaskStatus(c *gin.Context) { - // 1. 绑定请求参数。ids 允许为空切片,表示前端当前没有需要 hydration 的任务卡片。 - var req model.BatchTaskStatusRequest + var req taskcontracts.BatchTaskStatusRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } + req.UserID = c.GetInt("user_id") - // 2. 从鉴权上下文读取 user_id,Service 会继续用该 user_id 限定任务集合。 - userID := c.GetInt("user_id") - - // 3. 设置短超时:该接口只读缓存/任务列表,避免异常情况下长时间占用连接。 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) defer cancel() - - // 4. 调用 Service 做 ID 归一化与当前状态查询。 - resp, err := th.svc.BatchTaskStatus(ctx, &req, userID) + resp, err := th.client.BatchTaskStatus(ctx, req) if err != nil { respond.DealWithError(c, err) return } - - // 5. 返回统一响应结构,items 为空时仍按 success 返回,便于前端无分支处理。 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } // CompleteTask 标记任务为已完成。 -// -// 职责边界: -// 1. 负责解析请求与读取 user_id; -// 2. 负责调用 Service 执行业务; -// 3. 不负责幂等校验(幂等由路由中间件处理)。 func (th *TaskHandler) CompleteTask(c *gin.Context) { - // 1. 绑定请求参数。 - var req model.UserCompleteTaskRequest + var req taskcontracts.CompleteTaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } + req.UserID = c.GetInt("user_id") - // 2. 从鉴权上下文获取 user_id,保证只能操作自己的任务。 - userID := c.GetInt("user_id") - - // 3. 设置短超时,避免该写接口长期占用连接。 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) defer cancel() - - // 4. 调用 Service 执行"标记完成"逻辑。 - resp, err := th.svc.CompleteTask(ctx, &req, userID) + resp, err := th.client.CompleteTask(ctx, req) if err != nil { respond.DealWithError(c, err) return } - - // 5. 返回统一响应结构。 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } -// UndoCompleteTask 取消任务"已完成"勾选。 -// -// 职责边界: -// 1. 负责解析请求与读取 user_id; -// 2. 负责调用 Service 执行业务恢复; -// 3. 不负责"任务是否已完成"的业务判断(由 Service/DAO 负责)。 +// UndoCompleteTask 取消任务“已完成”勾选。 func (th *TaskHandler) UndoCompleteTask(c *gin.Context) { - // 1. 绑定请求参数。 - var req model.UserUndoCompleteTaskRequest + var req taskcontracts.UndoCompleteTaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } + req.UserID = c.GetInt("user_id") - // 2. 从鉴权上下文读取 user_id,保证只操作当前用户任务。 - userID := c.GetInt("user_id") - - // 3. 设置短超时,避免该写接口占用连接过久。 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) defer cancel() - - // 4. 调用 Service 执行"取消已完成勾选"逻辑。 - resp, err := th.svc.UndoCompleteTask(ctx, &req, userID) + resp, err := th.client.UndoCompleteTask(ctx, req) if err != nil { respond.DealWithError(c, err) return } - - // 5. 返回统一响应结构。 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } // UpdateTask 更新任务属性(部分更新)。 -// -// 职责边界: -// 1. 负责解析请求与读取 user_id; -// 2. 负责调用 Service 执行业务; -// 3. 不负责幂等校验(幂等由路由中间件处理)。 func (th *TaskHandler) UpdateTask(c *gin.Context) { - // 1. 绑定请求参数。 - var req model.UserUpdateTaskRequest + var req taskcontracts.UpdateTaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } + req.UserID = c.GetInt("user_id") - // 2. 从鉴权上下文读取 user_id,保证只操作当前用户任务。 - userID := c.GetInt("user_id") - - // 3. 设置短超时,避免该写接口占用连接过久。 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) defer cancel() - - // 4. 调用 Service 执行更新逻辑。 - resp, err := th.svc.UpdateTask(ctx, &req, userID) + resp, err := th.client.UpdateTask(ctx, req) if err != nil { respond.DealWithError(c, err) return } - - // 5. 返回统一响应结构。 c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } // DeleteTask 永久删除指定任务。 -// -// 职责边界: -// 1. 负责解析请求与读取 user_id; -// 2. 负责调用 Service 执行删除; -// 3. 不负责幂等校验(幂等由路由中间件处理)。 func (th *TaskHandler) DeleteTask(c *gin.Context) { - // 1. 绑定请求参数。 - var req model.UserCompleteTaskRequest + var req taskcontracts.DeleteTaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) - fmt.Println(err) return } + req.UserID = c.GetInt("user_id") - // 2. 从鉴权上下文读取 user_id,保证只操作当前用户任务。 - userID := c.GetInt("user_id") - - // 3. 设置短超时,避免该写接口占用连接过久。 - ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + ctx, cancel := context.WithTimeout(c.Request.Context(), taskRequestTimeout) defer cancel() - - // 4. 调用 Service 执行删除逻辑。 - taskID, err := th.svc.DeleteTask(ctx, &req, userID) + resp, err := th.client.DeleteTask(ctx, req) if err != nil { respond.DealWithError(c, err) return } - - // 5. 返回统一响应结构。 - c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, gin.H{"task_id": taskID})) + c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, resp)) } diff --git a/backend/gateway/client/task/client.go b/backend/gateway/client/task/client.go new file mode 100644 index 0000000..ffafd97 --- /dev/null +++ b/backend/gateway/client/task/client.go @@ -0,0 +1,149 @@ +package task + +import ( + "context" + "encoding/json" + "errors" + "strings" + "time" + + taskpb "github.com/LoveLosita/smartflow/backend/services/task/rpc/pb" + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +const ( + defaultEndpoint = "127.0.0.1:9085" + defaultTimeout = 6 * time.Second +) + +type ClientConfig struct { + Endpoints []string + Target string + Timeout time.Duration +} + +// Client 是 gateway 侧 task zrpc 的最小适配层。 +// +// 职责边界: +// 1. 只负责跨进程 gRPC 调用和 JSON 透传,不碰 DAO、outbox 或 active-scheduler job; +// 2. HTTP 入参仍由 gateway/api 做基础绑定,业务校验交给 task 服务; +// 3. 复杂响应不在 gateway 重建模型,避免 DTO 复制扩散。 +type Client struct { + rpc taskpb.TaskClient +} + +func NewClient(cfg ClientConfig) (*Client, error) { + timeout := cfg.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + endpoints := normalizeEndpoints(cfg.Endpoints) + target := strings.TrimSpace(cfg.Target) + if len(endpoints) == 0 && target == "" { + endpoints = []string{defaultEndpoint} + } + + zclient, err := zrpc.NewClient(zrpc.RpcClientConf{ + Endpoints: endpoints, + Target: target, + NonBlock: true, + Timeout: int64(timeout / time.Millisecond), + }) + if err != nil { + return nil, err + } + client := &Client{rpc: taskpb.NewTaskClient(zclient.Conn())} + if err := client.ping(timeout); err != nil { + return nil, err + } + return client, nil +} + +func (c *Client) AddTask(ctx context.Context, req taskcontracts.AddTaskRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.AddTask, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) GetUserTasks(ctx context.Context, userID int) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.GetUserTasks, taskcontracts.UserRequest{UserID: userID}) + return jsonFromResponse(resp, err) +} + +func (c *Client) BatchTaskStatus(ctx context.Context, req taskcontracts.BatchTaskStatusRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.BatchTaskStatus, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) CompleteTask(ctx context.Context, req taskcontracts.CompleteTaskRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.CompleteTask, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) UndoCompleteTask(ctx context.Context, req taskcontracts.UndoCompleteTaskRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.UndoCompleteTask, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) UpdateTask(ctx context.Context, req taskcontracts.UpdateTaskRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.UpdateTask, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) DeleteTask(ctx context.Context, req taskcontracts.DeleteTaskRequest) (json.RawMessage, error) { + resp, err := c.callJSON(ctx, c.rpc.DeleteTask, req) + return jsonFromResponse(resp, err) +} + +func (c *Client) ensureReady() error { + if c == nil || c.rpc == nil { + return errors.New("task zrpc client is not initialized") + } + return nil +} + +func (c *Client) ping(timeout time.Duration) error { + if err := c.ensureReady(); err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + _, err := c.rpc.Ping(ctx, &taskpb.StatusResponse{}) + return responseFromRPCError(err) +} + +func (c *Client) callJSON(ctx context.Context, fn func(context.Context, *taskpb.JSONRequest, ...grpc.CallOption) (*taskpb.JSONResponse, error), payload any) (*taskpb.JSONResponse, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + raw, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return fn(ctx, &taskpb.JSONRequest{PayloadJson: raw}) +} + +func jsonFromResponse(resp *taskpb.JSONResponse, rpcErr error) (json.RawMessage, error) { + if rpcErr != nil { + return nil, responseFromRPCError(rpcErr) + } + if resp == nil { + return nil, errors.New("task zrpc service returned empty JSON response") + } + if len(resp.DataJson) == 0 { + return json.RawMessage("null"), nil + } + return json.RawMessage(resp.DataJson), nil +} + +func normalizeEndpoints(values []string) []string { + endpoints := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + endpoints = append(endpoints, trimmed) + } + } + return endpoints +} diff --git a/backend/gateway/client/task/errors.go b/backend/gateway/client/task/errors.go new file mode 100644 index 0000000..fb88363 --- /dev/null +++ b/backend/gateway/client/task/errors.go @@ -0,0 +1,94 @@ +package task + +import ( + "errors" + "fmt" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// responseFromRPCError 负责把 task 的 gRPC 错误反解回项目内错误。 +// +// 职责边界: +// 1. 只在 gateway 边缘层使用; +// 2. 业务错误尽量恢复成 respond.Response,方便 API 层继续复用 DealWithError; +// 3. 服务不可用或未知内部错误包装成普通 error,避免误报成用户可修正的参数问题。 +func responseFromRPCError(err error) error { + if err == nil { + return nil + } + + st, ok := status.FromError(err) + if !ok { + return wrapRPCError(err) + } + if resp, ok := responseFromStatus(st); ok { + return resp + } + + switch st.Code() { + case codes.Internal, codes.Unknown, codes.Unavailable, codes.DeadlineExceeded, codes.DataLoss, codes.Unimplemented: + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "task zrpc service internal error" + } + return wrapRPCError(errors.New(msg)) + } + + msg := strings.TrimSpace(st.Message()) + if msg == "" { + msg = "task zrpc service rejected request" + } + return respond.Response{Status: grpcCodeToRespondStatus(st.Code()), Info: msg} +} + +func responseFromStatus(st *status.Status) (respond.Response, bool) { + if st == nil { + return respond.Response{}, false + } + for _, detail := range st.Details() { + info, ok := detail.(*errdetails.ErrorInfo) + if !ok { + continue + } + statusValue := strings.TrimSpace(info.Reason) + if statusValue == "" { + statusValue = grpcCodeToRespondStatus(st.Code()) + } + message := strings.TrimSpace(st.Message()) + if message == "" && info.Metadata != nil { + message = strings.TrimSpace(info.Metadata["info"]) + } + if message == "" { + message = statusValue + } + return respond.Response{Status: statusValue, Info: message}, true + } + return respond.Response{}, false +} + +func grpcCodeToRespondStatus(code codes.Code) string { + switch code { + case codes.Unauthenticated: + return respond.ErrUnauthorized.Status + case codes.InvalidArgument: + return respond.MissingParam.Status + case codes.NotFound: + return respond.UserTasksEmpty.Status + case codes.Internal, codes.Unknown, codes.DataLoss: + return "500" + default: + return "400" + } +} + +func wrapRPCError(err error) error { + if err == nil { + return nil + } + return fmt.Errorf("调用 task zrpc 服务失败: %w", err) +} diff --git a/backend/service/events/core_outbox_handlers.go b/backend/service/events/core_outbox_handlers.go index fb4f7c7..c9cb725 100644 --- a/backend/service/events/core_outbox_handlers.go +++ b/backend/service/events/core_outbox_handlers.go @@ -137,13 +137,6 @@ func coreOutboxHandlerRoutes( return RegisterChatHistoryPersistHandler(eventBus, outboxRepo, repoManager, adjuster) }, }, - { - EventType: EventTypeTaskUrgencyPromoteRequested, - Service: outboxHandlerServiceTask, - Register: func() error { - return RegisterTaskUrgencyPromoteHandler(eventBus, outboxRepo, repoManager) - }, - }, { EventType: EventTypeChatTokenUsageAdjustRequested, Service: outboxHandlerServiceAgent, diff --git a/backend/service/events/outbox_bus.go b/backend/service/events/outbox_bus.go index df9a342..ca7cc87 100644 --- a/backend/service/events/outbox_bus.go +++ b/backend/service/events/outbox_bus.go @@ -167,7 +167,6 @@ func orderedOutboxServiceNames(buses map[string]OutboxBus) []string { func OutboxServiceNames() []string { return []string{ string(outboxHandlerServiceAgent), - string(outboxHandlerServiceTask), string(outboxHandlerServiceMemory), } } diff --git a/backend/service/events/task_urgency_promote.go b/backend/service/events/task_urgency_promote.go index f4c5c36..6b2bf17 100644 --- a/backend/service/events/task_urgency_promote.go +++ b/backend/service/events/task_urgency_promote.go @@ -24,6 +24,16 @@ const ( EventTypeTaskUrgencyPromoteRequested = "task.urgency.promote.requested" ) +// RegisterTaskUrgencyPromoteRoute 只登记 task 事件归属,不注册消费 handler。 +// +// 职责边界: +// 1. 供单体残留路径在迁移期继续把 task 事件写入 task_outbox_messages; +// 2. 不创建 consumer,也不启动 handler,真正消费已迁到 cmd/task; +// 3. 重复登记同一归属是幂等操作。 +func RegisterTaskUrgencyPromoteRoute() error { + return outboxinfra.RegisterEventService(EventTypeTaskUrgencyPromoteRequested, string(outboxHandlerServiceTask)) +} + // RegisterTaskUrgencyPromoteHandler 注册“任务紧急性平移”消费者处理器。 // // 职责边界: diff --git a/backend/services/active_scheduler/core/adapters/task_rpc.go b/backend/services/active_scheduler/core/adapters/task_rpc.go new file mode 100644 index 0000000..a9d10d5 --- /dev/null +++ b/backend/services/active_scheduler/core/adapters/task_rpc.go @@ -0,0 +1,152 @@ +package adapters + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + activeports "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/ports" + taskpb "github.com/LoveLosita/smartflow/backend/services/task/rpc/pb" + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + defaultTaskRPCEndpoint = "127.0.0.1:9085" + defaultTaskRPCTimeout = 6 * time.Second +) + +type TaskRPCConfig struct { + Endpoints []string + Target string + Timeout time.Duration +} + +// TaskRPCAdapter 是 active-scheduler 访问 task 服务的 RPC 适配器。 +// +// 职责边界: +// 1. 只读取 task_pool 事实并转换为 active-scheduler 内部 DTO; +// 2. 不写 tasks 表、不维护 task outbox,也不处理 due job 状态; +// 3. 让 active-scheduler dry-run / due scanner 不再直接访问 tasks 表。 +type TaskRPCAdapter struct { + rpc taskpb.TaskClient +} + +func NewTaskRPCAdapter(cfg TaskRPCConfig) (*TaskRPCAdapter, error) { + timeout := cfg.Timeout + if timeout <= 0 { + timeout = defaultTaskRPCTimeout + } + endpoints := normalizeTaskRPCEndpoints(cfg.Endpoints) + target := strings.TrimSpace(cfg.Target) + if len(endpoints) == 0 && target == "" { + endpoints = []string{defaultTaskRPCEndpoint} + } + zclient, err := zrpc.NewClient(zrpc.RpcClientConf{ + Endpoints: endpoints, + Target: target, + NonBlock: true, + Timeout: int64(timeout / time.Millisecond), + }) + if err != nil { + return nil, err + } + adapter := &TaskRPCAdapter{rpc: taskpb.NewTaskClient(zclient.Conn())} + if err := adapter.ping(timeout); err != nil { + return nil, err + } + return adapter, nil +} + +func (a *TaskRPCAdapter) GetTaskForActiveSchedule(ctx context.Context, req activeports.TaskRequest) (activeports.TaskFact, bool, error) { + if err := a.ensureReady(); err != nil { + return activeports.TaskFact{}, false, err + } + payload, err := json.Marshal(taskcontracts.TaskFactRequest{ + UserID: req.UserID, + TaskID: req.TaskID, + Now: req.Now, + }) + if err != nil { + return activeports.TaskFact{}, false, err + } + resp, err := a.rpc.GetTaskForActiveSchedule(ctx, &taskpb.JSONRequest{PayloadJson: payload}) + if err != nil { + return activeports.TaskFact{}, false, taskRPCError(err) + } + var contractResp taskcontracts.TaskFactResponse + if err := json.Unmarshal(taskJSONBytes(resp), &contractResp); err != nil { + return activeports.TaskFact{}, false, err + } + return taskFactToActive(contractResp.Task), contractResp.Found, nil +} + +func (a *TaskRPCAdapter) ensureReady() error { + if a == nil || a.rpc == nil { + return errors.New("task rpc adapter 未初始化") + } + return nil +} + +func (a *TaskRPCAdapter) ping(timeout time.Duration) error { + if err := a.ensureReady(); err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + _, err := a.rpc.Ping(ctx, &taskpb.StatusResponse{}) + return taskRPCError(err) +} + +func taskRPCError(err error) error { + if err == nil { + return nil + } + st, ok := status.FromError(err) + if !ok { + return err + } + if st.Code() == codes.NotFound { + return nil + } + if st.Code() == codes.Internal || st.Code() == codes.Unavailable || st.Code() == codes.DeadlineExceeded { + return fmt.Errorf("调用 task zrpc 服务失败: %w", err) + } + return err +} + +func taskFactToActive(task taskcontracts.TaskFact) activeports.TaskFact { + return activeports.TaskFact{ + ID: task.ID, + UserID: task.UserID, + Title: task.Title, + Priority: task.Priority, + IsCompleted: task.IsCompleted, + DeadlineAt: task.DeadlineAt, + UrgencyThresholdAt: task.UrgencyThresholdAt, + EstimatedSections: task.EstimatedSections, + } +} + +func taskJSONBytes(resp *taskpb.JSONResponse) []byte { + if resp == nil || len(resp.DataJson) == 0 { + return []byte("null") + } + return resp.DataJson +} + +func normalizeTaskRPCEndpoints(values []string) []string { + endpoints := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + endpoints = append(endpoints, trimmed) + } + } + return endpoints +} diff --git a/backend/services/active_scheduler/dao/connect.go b/backend/services/active_scheduler/dao/connect.go index dbc8bfb..074baa0 100644 --- a/backend/services/active_scheduler/dao/connect.go +++ b/backend/services/active_scheduler/dao/connect.go @@ -75,7 +75,7 @@ type runtimeDependencyTable struct { // // 职责边界: // 1. 只检查表是否存在,不 AutoMigrate、不补列、不修改任何跨域表; -// 2. 把 active-scheduler 运行时仍然需要的 task / agent / notification outbox 边界显式化; +// 2. 把 active-scheduler 运行时仍然需要的 agent / notification outbox 边界显式化; // 3. 若部署顺序、库权限或表结构归属不满足,启动阶段直接 fail fast,避免第一次 trigger 才反复重试。 func ensureRuntimeDependencyTables(db *gorm.DB) error { if db == nil { @@ -110,7 +110,7 @@ func ensureTableExists(db *gorm.DB, table runtimeDependencyTable) error { // 说明: // 1. active-scheduler 自有表在 OpenDBFromConfig 内迁移,这里只放跨域依赖; // 2. notification outbox 表名来自 service catalog,避免和 outbox 多表路由配置漂移; -// 3. schedule 读写已切到 schedule RPC;后续切到 task/agent/notification RPC 或 read model 后,应继续移除对应表依赖。 +// 3. schedule 与 task 事实读取已切到 RPC;后续切到 agent/notification RPC 或 read model 后,应继续移除对应表依赖。 func activeSchedulerRuntimeDependencyTables() []runtimeDependencyTable { notificationOutboxTable := "notification_outbox_messages" if cfg, ok := outboxinfra.ResolveServiceConfig(outboxinfra.ServiceNotification); ok && cfg.TableName != "" { @@ -118,7 +118,6 @@ func activeSchedulerRuntimeDependencyTables() []runtimeDependencyTable { } return []runtimeDependencyTable{ - {Name: "tasks", Reason: "迁移期 dry-run / due job scanner 仍读取 task_pool 事实,下一轮切 task RPC 后移除"}, {Name: "agent_chats", Reason: "trigger 生成 preview 后预建主动调度会话"}, {Name: "chat_histories", Reason: "trigger 生成 preview 后写入会话首屏消息"}, {Name: "agent_timeline_events", Reason: "trigger 生成 preview 后写入主动调度时间线卡片"}, diff --git a/backend/services/active_scheduler/sv/service.go b/backend/services/active_scheduler/sv/service.go index 1856e60..f065383 100644 --- a/backend/services/active_scheduler/sv/service.go +++ b/backend/services/active_scheduler/sv/service.go @@ -39,6 +39,7 @@ type Options struct { JobScanEvery time.Duration JobScanLimit int KafkaConfig kafkabus.Config + TaskRPC activeadapters.TaskRPCConfig ScheduleRPC activeadapters.ScheduleRPCConfig } @@ -69,12 +70,15 @@ func New(db *gorm.DB, llmService *llmservice.Service, opts Options) (*Service, e } activeDAO := rootdao.NewActiveScheduleDAO(db) - activeReaders := activeadapters.NewGormReaders(db) + taskRPCAdapter, err := activeadapters.NewTaskRPCAdapter(opts.TaskRPC) + if err != nil { + return nil, fmt.Errorf("initialize task rpc adapter failed: %w", err) + } scheduleRPCAdapter, err := activeadapters.NewScheduleRPCAdapter(opts.ScheduleRPC) if err != nil { return nil, fmt.Errorf("initialize schedule rpc adapter failed: %w", err) } - readers := activeadapters.ReadersWithScheduleRPC(activeReaders, scheduleRPCAdapter) + readers := activeadapters.ReadersWithScheduleRPC(taskRPCAdapter, scheduleRPCAdapter) dryRun, err := activesvc.NewDryRunService(readers) if err != nil { return nil, err diff --git a/backend/services/task/dao/connect.go b/backend/services/task/dao/connect.go new file mode 100644 index 0000000..f5010a1 --- /dev/null +++ b/backend/services/task/dao/connect.go @@ -0,0 +1,92 @@ +package dao + +import ( + "context" + "fmt" + + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/model" + "github.com/go-redis/redis/v8" + "github.com/spf13/viper" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +// OpenDBFromConfig 创建 task 服务自己的数据库句柄。 +// +// 职责边界: +// 1. 只迁移 tasks 表和 task 服务自己的 outbox 表; +// 2. 不迁移 active-scheduler、schedule、course 或 task-class 表; +// 3. 迁移期仍检查 active_schedule_jobs 是否存在,因为 task 写入后还会 best-effort 同步 due job。 +func OpenDBFromConfig() (*gorm.DB, error) { + host := viper.GetString("database.host") + port := viper.GetString("database.port") + user := viper.GetString("database.user") + password := viper.GetString("database.password") + dbname := viper.GetString("database.dbname") + + dsn := fmt.Sprintf( + "%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + user, password, host, port, dbname, + ) + + db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, err + } + if err = db.AutoMigrate(&model.Task{}); err != nil { + return nil, fmt.Errorf("auto migrate task tables failed: %w", err) + } + if err = autoMigrateTaskOutboxTable(db); err != nil { + return nil, err + } + if err = ensureRuntimeDependencyTables(db); err != nil { + return nil, err + } + return db, nil +} + +// OpenRedisFromConfig 创建 task 服务自己的 Redis 句柄。 +// +// 职责边界: +// 1. 只负责初始化 task 缓存和紧急性平移去重锁所需 Redis client; +// 2. 不清理任何业务 key; +// 3. Ping 失败直接返回错误,避免缓存链路静默降级。 +func OpenRedisFromConfig() (*redis.Client, error) { + client := redis.NewClient(&redis.Options{ + Addr: viper.GetString("redis.host") + ":" + viper.GetString("redis.port"), + Password: viper.GetString("redis.password"), + DB: 0, + }) + if _, err := client.Ping(context.Background()).Result(); err != nil { + return nil, err + } + return client, nil +} + +// autoMigrateTaskOutboxTable 只迁移 task 服务自己的 outbox 物理表。 +func autoMigrateTaskOutboxTable(db *gorm.DB) error { + cfg, ok := outboxinfra.ResolveServiceConfig(outboxinfra.ServiceTask) + if !ok { + return fmt.Errorf("resolve task outbox config failed") + } + if err := db.Table(cfg.TableName).AutoMigrate(&model.AgentOutboxMessage{}); err != nil { + return fmt.Errorf("auto migrate task outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + } + return nil +} + +// ensureRuntimeDependencyTables 显式检查 task 迁移期仍写入的跨域表。 +// +// 说明: +// 1. active_schedule_jobs 属于 active-scheduler,自有迁移仍由 active-scheduler 管理; +// 2. 本轮为保持任务写入后 due job 同步语义,task 服务只检查存在性; +// 3. 下一轮把 due job 同步改为 active-scheduler RPC 或事件后,应从这里移除。 +func ensureRuntimeDependencyTables(db *gorm.DB) error { + for _, table := range []string{"active_schedule_jobs"} { + if !db.Migrator().HasTable(table) { + return fmt.Errorf("task runtime dependency table missing: %s", table) + } + } + return nil +} diff --git a/backend/services/task/dao/task.go b/backend/services/task/dao/task.go new file mode 100644 index 0000000..e89d9b7 --- /dev/null +++ b/backend/services/task/dao/task.go @@ -0,0 +1,360 @@ +package dao + +import ( + "context" + "errors" + "time" + + "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/respond" + "gorm.io/gorm" +) + +type TaskDAO struct { + // 这是一个口袋,用来装数据库连接实例 + db *gorm.DB +} + +// NewTaskDAO 创建TaskDAO实例 +// NewTaskDAO 接收一个 *gorm.DB,并把它塞进结构体的口袋里 +func NewTaskDAO(db *gorm.DB) *TaskDAO { + return &TaskDAO{ + db: db, + } +} + +func (r *TaskDAO) WithTx(tx *gorm.DB) *TaskDAO { + return &TaskDAO{db: tx} +} + +// AddTask 为指定用户添加任务 +func (dao *TaskDAO) AddTask(req *model.Task) (*model.Task, error) { + if err := dao.db.Create(req).Error; err != nil { + return nil, err + } + return req, nil +} + +func (dao *TaskDAO) GetTasksByUserID(userID int) ([]model.Task, error) { + var tasks []model.Task + if err := dao.db.Where("user_id = ?", userID).Find(&tasks).Error; err != nil { + return nil, err + } + if len(tasks) == 0 { // 如果没有任务,返回自定义错误 + return nil, respond.UserTasksEmpty + } + return tasks, nil +} + +// GetTaskByUserAndID 读取当前用户拥有的单个任务快照。 +// +// 职责边界: +// 1. 只按 user_id + task_id 做所有权限定查询; +// 2. 不做主动调度事实转换,也不处理 found=false 语义; +// 3. gorm.ErrRecordNotFound 由调用方按业务场景映射。 +func (dao *TaskDAO) GetTaskByUserAndID(ctx context.Context, userID int, taskID int) (*model.Task, error) { + if userID <= 0 || taskID <= 0 { + return nil, gorm.ErrRecordNotFound + } + var task model.Task + if err := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&task).Error; err != nil { + return nil, err + } + return &task, nil +} + +// CompleteTaskByID 将指定任务标记为"已完成"。 +// +// 职责边界: +// 1. 只负责"当前用户 + 指定 task_id"的完成状态更新; +// 2. 不负责幂等中间件(由路由层统一挂载); +// 3. 不负责业务层响应包装(由 Service 层处理)。 +// +// 返回语义: +// 1. 第一个返回值 *model.Task:返回更新后的任务快照(至少含 ID/UserID/IsCompleted); +// 2. 第二个返回值 bool: +// 2.1 true:任务原本就已完成,本次属于幂等命中; +// 2.2 false:本次从未完成成功更新为已完成; +// 3. error: +// 3.1 gorm.ErrRecordNotFound:任务不存在或不属于当前用户; +// 3.2 其他 error:数据库异常。 +func (dao *TaskDAO) CompleteTaskByID(ctx context.Context, userID int, taskID int) (*model.Task, bool, error) { + // 1. 基础兜底:非法参数直接返回"记录不存在"语义,避免下游误写。 + if userID <= 0 || taskID <= 0 { + return nil, false, gorm.ErrRecordNotFound + } + + // 2. 先查询目标任务,明确区分"已完成"与"不存在"。 + var target model.Task + findErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&target).Error + if findErr != nil { + return nil, false, findErr + } + + // 3. 若任务已完成,直接按幂等成功返回,不再写库。 + if target.IsCompleted { + return &target, true, nil + } + + // 4. 若任务未完成,执行状态更新。 + // + // 4.1 使用 Model(&model.Task{UserID:userID}) 的目的: + // 让 cache_deleter 在 GORM Update 回调里拿到 user_id,从而正确删除任务缓存。 + // 4.2 更新条件继续限定 user_id + id,避免误更新其他用户数据。 + updateResult := dao.db.WithContext(ctx). + Model(&model.Task{UserID: userID}). + Where("id = ? AND user_id = ?", taskID, userID). + Update("is_completed", true) + if updateResult.Error != nil { + return nil, false, updateResult.Error + } + + // 5. 极端并发兜底: + // 5.1 若 RowsAffected=0,可能是并发请求已先一步更新; + // 5.2 此时二次读取任务状态,若已完成则按幂等成功返回,否则视为不存在/异常。 + if updateResult.RowsAffected == 0 { + var check model.Task + checkErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&check).Error + if checkErr != nil { + return nil, false, checkErr + } + if check.IsCompleted { + return &check, true, nil + } + return nil, false, errors.New("任务状态更新失败") + } + + // 6. 返回更新后的快照给 Service 层组装响应。 + target.IsCompleted = true + return &target, false, nil +} + +// UndoCompleteTaskByID 将指定任务从"已完成"恢复为"未完成"。 +// +// 职责边界: +// 1. 只负责当前用户(user_id)下指定 task_id 的状态恢复; +// 2. 若任务本就未完成,按业务要求返回明确错误,不做幂等成功; +// 3. 不负责响应文案拼装(由 Service 层处理)。 +// +// 返回语义: +// 1. *model.Task:恢复后的任务快照; +// 2. error: +// 2.1 gorm.ErrRecordNotFound:任务不存在或不属于当前用户; +// 2.2 respond.TaskNotCompleted:任务当前不是"已完成"状态,不能执行取消勾选; +// 2.3 其他 error:数据库异常。 +func (dao *TaskDAO) UndoCompleteTaskByID(ctx context.Context, userID int, taskID int) (*model.Task, error) { + // 1. 参数兜底:非法 user/task 参数统一按"记录不存在"处理,避免误写。 + if userID <= 0 || taskID <= 0 { + return nil, gorm.ErrRecordNotFound + } + + // 2. 先读取目标任务,明确区分"不存在"和"状态不允许恢复"。 + var target model.Task + findErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&target).Error + if findErr != nil { + return nil, findErr + } + + // 3. 严格业务约束:若任务当前未完成,直接返回业务错误。 + // 3.1 这是本接口和"标记完成"接口的关键差异:这里不做幂等成功。 + if !target.IsCompleted { + return nil, respond.TaskNotCompleted + } + + // 4. 执行状态恢复(is_completed=true -> false)。 + // + // 4.1 使用 Model(&model.Task{UserID:userID}) 的目的是让 cache_deleter 拿到 user_id, + // 从而在回调中正确删除该用户任务缓存。 + updateResult := dao.db.WithContext(ctx). + Model(&model.Task{UserID: userID}). + Where("id = ? AND user_id = ?", taskID, userID). + Update("is_completed", false) + if updateResult.Error != nil { + return nil, updateResult.Error + } + + // 5. 并发兜底: + // 5.1 若 RowsAffected=0,说明可能被并发请求先一步恢复; + // 5.2 重新读取当前状态,若已是未完成则按业务规则返回"任务未完成"错误。 + if updateResult.RowsAffected == 0 { + var check model.Task + checkErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&check).Error + if checkErr != nil { + return nil, checkErr + } + if !check.IsCompleted { + return nil, respond.TaskNotCompleted + } + return nil, errors.New("取消任务完成状态失败") + } + + // 6. 回填恢复后状态并返回。 + target.IsCompleted = false + return &target, nil +} + +// PromoteTaskUrgencyByIDs 批量执行"任务紧急性平移"。 +// +// 职责边界: +// 1. 只负责把满足条件的任务从"不紧急象限"平移到"紧急象限": +// 1.1 priority=2 -> 1(重要不紧急 -> 重要且紧急); +// 1.2 priority=4 -> 3(不简单不重要 -> 简单不重要); +// 2. 只更新本次指定 user_id + task_ids 范围内的数据; +// 3. 不负责事件发布、重试去重和缓存策略(由 Service/Outbox 负责)。 +// +// 幂等与一致性说明: +// 1. SQL 条件会限制 `is_completed=0`、`urgency_threshold_at<=now`、`priority IN (2,4)`; +// 2. 同一批任务重复调用时,已经平移过的记录不会再次更新(幂等); +// 3. 使用 `Model(&model.Task{UserID:userID})` 是为了让 GORM 回调拿到 user_id,从而触发 cache_deleter 删除任务缓存。 +func (dao *TaskDAO) PromoteTaskUrgencyByIDs(ctx context.Context, userID int, taskIDs []int, now time.Time) (int64, error) { + // 1. 基础兜底:非法 user 或空任务列表直接无操作返回。 + if userID <= 0 || len(taskIDs) == 0 { + return 0, nil + } + + // 2. 去重并过滤非正数 ID,避免无效 where in 条件放大 SQL 噪音。 + validTaskIDs := compactPositiveIntIDs(taskIDs) + if len(validTaskIDs) == 0 { + return 0, nil + } + + // 3. 条件更新:只更新"已到紧急分界线且仍处于非紧急象限"的任务。 + result := dao.db.WithContext(ctx). + Model(&model.Task{UserID: userID}). + Where("user_id = ?", userID). + Where("id IN ?", validTaskIDs). + Where("is_completed = ?", false). + Where("urgency_threshold_at IS NOT NULL AND urgency_threshold_at <= ?", now). + Where("priority IN ?", []int{2, 4}). + Update("priority", gorm.Expr("CASE WHEN priority = 2 THEN 1 WHEN priority = 4 THEN 3 ELSE priority END")) + + if result.Error != nil { + return 0, result.Error + } + return result.RowsAffected, nil +} + +// UpdateTaskByID 按 task_id + user_id 更新指定字段。 +// +// 职责边界: +// 1. 只负责按 updates map 执行 SET 子句更新; +// 2. 不负责业务规则(如优先级范围校验),由 Service 层处理; +// 3. 使用 Model(&model.Task{UserID: userID}) 让 cache_deleter 回调拿到 user_id。 +// +// 返回语义: +// 1. *model.Task:更新后的完整任务快照; +// 2. error: +// 2.1 gorm.ErrRecordNotFound:任务不存在或不属于当前用户; +// 2.2 其他 error:数据库异常。 +func (dao *TaskDAO) UpdateTaskByID(ctx context.Context, userID int, taskID int, updates map[string]interface{}) (*model.Task, error) { + // 1. 参数兜底:非法参数直接返回"记录不存在"语义。 + if userID <= 0 || taskID <= 0 { + return nil, gorm.ErrRecordNotFound + } + + // 2. 先查询目标任务,确认存在且归属当前用户。 + var target model.Task + findErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&target).Error + if findErr != nil { + return nil, findErr + } + + // 3. 执行部分字段更新。 + // 3.1 使用 Model(&model.Task{UserID: userID}) 触发 cache_deleter。 + // 3.2 限定 id + user_id 条件,避免误更新。 + updateResult := dao.db.WithContext(ctx). + Model(&model.Task{UserID: userID}). + Where("id = ? AND user_id = ?", taskID, userID). + Updates(updates) + if updateResult.Error != nil { + return nil, updateResult.Error + } + + // 4. 更新后重新读取,保证返回完整且一致的快照。 + var updated model.Task + if err := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&updated).Error; err != nil { + return nil, err + } + + return &updated, nil +} + +// DeleteTaskByID 永久删除指定任务(硬删除)。 +// +// 职责边界: +// 1. 只负责删除 user_id + task_id 对应的记录; +// 2. 使用 Model(&model.Task{UserID: userID}) 触发 cache_deleter 删除用户任务缓存; +// 3. 不负责级联清理日程(tasks 与 schedule_events 无直接外键关联)。 +// +// 返回语义: +// 1. *model.Task:被删除的任务快照(用于响应前端); +// 2. error: +// 2.1 gorm.ErrRecordNotFound:任务不存在或不属于当前用户; +// 2.2 其他 error:数据库异常。 +func (dao *TaskDAO) DeleteTaskByID(ctx context.Context, userID int, taskID int) (*model.Task, error) { + // 1. 参数兜底。 + if userID <= 0 || taskID <= 0 { + return nil, gorm.ErrRecordNotFound + } + + // 2. 先查询目标任务,确认存在且归属当前用户,同时获取快照用于响应。 + var target model.Task + findErr := dao.db.WithContext(ctx). + Where("id = ? AND user_id = ?", taskID, userID). + First(&target).Error + if findErr != nil { + return nil, findErr + } + + // 3. 执行硬删除。 + // 3.1 使用 Model(&model.Task{UserID: userID}) 触发 cache_deleter。 + deleteResult := dao.db.WithContext(ctx). + Model(&model.Task{UserID: userID}). + Where("id = ? AND user_id = ?", taskID, userID). + Delete(&model.Task{}) + if deleteResult.Error != nil { + return nil, deleteResult.Error + } + + // 4. 并发兜底:RowsAffected=0 说明被并发请求先一步删除。 + if deleteResult.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return &target, nil +} + +// compactPositiveIntIDs 对 int 切片做"去重 + 过滤非正数"。 +// +// 说明: +// 1. 该函数是 DAO 内部参数清洗工具,不参与任何业务判定; +// 2. 返回结果不保证稳定顺序,对当前 SQL where in 场景无影响。 +func compactPositiveIntIDs(ids []int) []int { + seen := make(map[int]struct{}, len(ids)) + result := make([]int, 0, len(ids)) + for _, id := range ids { + if id <= 0 { + continue + } + if _, exists := seen[id]; exists { + continue + } + seen[id] = struct{}{} + result = append(result, id) + } + return result +} diff --git a/backend/services/task/rpc/errors.go b/backend/services/task/rpc/errors.go new file mode 100644 index 0000000..8e2007d --- /dev/null +++ b/backend/services/task/rpc/errors.go @@ -0,0 +1,70 @@ +package rpc + +import ( + "errors" + "log" + "strings" + + "github.com/LoveLosita/smartflow/backend/respond" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const taskErrorDomain = "smartflow.task" + +// grpcErrorFromServiceError 负责把 task 内部错误转换为 gRPC status。 +// +// 职责边界: +// 1. respond.Response 保留项目内部 status/info,供 gateway 反解; +// 2. 未分类错误只暴露通用内部错误,详细信息留在服务日志; +// 3. 不在 RPC 层重判业务规则,业务语义仍由 sv/dao 决定。 +func grpcErrorFromServiceError(err error) error { + if err == nil { + return nil + } + var resp respond.Response + if errors.As(err, &resp) { + return grpcErrorFromResponse(resp) + } + log.Printf("task rpc internal error: %v", err) + return status.Error(codes.Internal, "task service internal error") +} + +func grpcErrorFromResponse(resp respond.Response) error { + code := grpcCodeFromRespondStatus(resp.Status) + message := strings.TrimSpace(resp.Info) + if message == "" { + message = strings.TrimSpace(resp.Status) + } + st := status.New(code, message) + detail := &errdetails.ErrorInfo{ + Domain: taskErrorDomain, + Reason: resp.Status, + Metadata: map[string]string{ + "info": resp.Info, + }, + } + withDetails, err := st.WithDetails(detail) + if err != nil { + return st.Err() + } + return withDetails.Err() +} + +func grpcCodeFromRespondStatus(statusValue string) codes.Code { + switch strings.TrimSpace(statusValue) { + case respond.MissingToken.Status, respond.InvalidToken.Status, respond.InvalidClaims.Status, + respond.ErrUnauthorized.Status, respond.WrongTokenType.Status, respond.UserLoggedOut.Status: + return codes.Unauthenticated + case respond.MissingParam.Status, respond.WrongParamType.Status, respond.ParamTooLong.Status, + respond.WrongTaskID.Status, respond.WrongUserID.Status, respond.InvalidPriority.Status: + return codes.InvalidArgument + case respond.UserTasksEmpty.Status: + return codes.NotFound + } + if strings.HasPrefix(strings.TrimSpace(statusValue), "5") { + return codes.Internal + } + return codes.InvalidArgument +} diff --git a/backend/services/task/rpc/handler.go b/backend/services/task/rpc/handler.go new file mode 100644 index 0000000..fae3fa7 --- /dev/null +++ b/backend/services/task/rpc/handler.go @@ -0,0 +1,158 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + + "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/respond" + "github.com/LoveLosita/smartflow/backend/services/task/rpc/pb" + tasksv "github.com/LoveLosita/smartflow/backend/services/task/sv" + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" +) + +type Handler struct { + pb.UnimplementedTaskServer + svc *tasksv.TaskService +} + +func NewHandler(svc *tasksv.TaskService) *Handler { + return &Handler{svc: svc} +} + +// Ping 供调用方在启动期确认 task zrpc 已可用。 +func (h *Handler) Ping(ctx context.Context, req *pb.StatusResponse) (*pb.StatusResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + return &pb.StatusResponse{}, nil +} + +func (h *Handler) AddTask(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.AddTaskRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.AddTask(ctx, &model.UserAddTaskRequest{ + Title: contractReq.Title, + PriorityGroup: contractReq.PriorityGroup, + EstimatedSections: contractReq.EstimatedSections, + DeadlineAt: contractReq.DeadlineAt, + }, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) GetUserTasks(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.UserRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.GetUserTasks(ctx, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) BatchTaskStatus(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.BatchTaskStatusRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.BatchTaskStatus(ctx, &model.BatchTaskStatusRequest{IDs: contractReq.IDs}, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) CompleteTask(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.CompleteTaskRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.CompleteTask(ctx, &model.UserCompleteTaskRequest{TaskID: contractReq.TaskID}, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) UndoCompleteTask(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.UndoCompleteTaskRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.UndoCompleteTask(ctx, &model.UserUndoCompleteTaskRequest{TaskID: contractReq.TaskID}, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) UpdateTask(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.UpdateTaskRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + data, err := h.svc.UpdateTask(ctx, &model.UserUpdateTaskRequest{ + TaskID: contractReq.TaskID, + Title: contractReq.Title, + PriorityGroup: contractReq.PriorityGroup, + DeadlineAt: contractReq.DeadlineAt, + UrgencyThresholdAt: contractReq.UrgencyThresholdAt, + }, contractReq.UserID) + return jsonResponse(data, err) +} + +func (h *Handler) DeleteTask(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.DeleteTaskRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + taskID, err := h.svc.DeleteTask(ctx, &model.UserCompleteTaskRequest{TaskID: contractReq.TaskID}, contractReq.UserID) + return jsonResponse(map[string]int{"task_id": taskID}, err) +} + +func (h *Handler) GetTaskForActiveSchedule(ctx context.Context, req *pb.JSONRequest) (*pb.JSONResponse, error) { + if err := h.ensureReady(req); err != nil { + return nil, err + } + var contractReq taskcontracts.TaskFactRequest + if err := json.Unmarshal(req.PayloadJson, &contractReq); err != nil { + return nil, grpcErrorFromServiceError(respond.WrongParamType) + } + task, found, err := h.svc.GetTaskForActiveSchedule(ctx, contractReq) + return jsonResponse(taskcontracts.TaskFactResponse{Task: task, Found: found}, err) +} + +func (h *Handler) ensureReady(req any) error { + if h == nil || h.svc == nil { + return grpcErrorFromServiceError(errors.New("task service dependency not initialized")) + } + if req == nil { + return grpcErrorFromServiceError(respond.MissingParam) + } + return nil +} + +func jsonResponse(value any, err error) (*pb.JSONResponse, error) { + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + raw, err := json.Marshal(value) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return &pb.JSONResponse{DataJson: raw}, nil +} diff --git a/backend/services/task/rpc/pb/task.pb.go b/backend/services/task/rpc/pb/task.pb.go new file mode 100644 index 0000000..b6fe02e --- /dev/null +++ b/backend/services/task/rpc/pb/task.pb.go @@ -0,0 +1,39 @@ +package pb + +import proto "github.com/golang/protobuf/proto" + +var _ = proto.Marshal + +const _ = proto.ProtoPackageIsVersion3 + +type JSONRequest struct { + PayloadJson []byte `protobuf:"bytes,1,opt,name=payload_json,json=payloadJson,proto3" json:"payload_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *JSONRequest) Reset() { *m = JSONRequest{} } +func (m *JSONRequest) String() string { return proto.CompactTextString(m) } +func (*JSONRequest) ProtoMessage() {} + +type JSONResponse struct { + DataJson []byte `protobuf:"bytes,1,opt,name=data_json,json=dataJson,proto3" json:"data_json,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *JSONResponse) Reset() { *m = JSONResponse{} } +func (m *JSONResponse) String() string { return proto.CompactTextString(m) } +func (*JSONResponse) ProtoMessage() {} + +type StatusResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} diff --git a/backend/services/task/rpc/pb/task_grpc.pb.go b/backend/services/task/rpc/pb/task_grpc.pb.go new file mode 100644 index 0000000..fb57d6e --- /dev/null +++ b/backend/services/task/rpc/pb/task_grpc.pb.go @@ -0,0 +1,191 @@ +package pb + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +const ( + Task_Ping_FullMethodName = "/smartflow.task.Task/Ping" + Task_AddTask_FullMethodName = "/smartflow.task.Task/AddTask" + Task_GetUserTasks_FullMethodName = "/smartflow.task.Task/GetUserTasks" + Task_BatchTaskStatus_FullMethodName = "/smartflow.task.Task/BatchTaskStatus" + Task_CompleteTask_FullMethodName = "/smartflow.task.Task/CompleteTask" + Task_UndoCompleteTask_FullMethodName = "/smartflow.task.Task/UndoCompleteTask" + Task_UpdateTask_FullMethodName = "/smartflow.task.Task/UpdateTask" + Task_DeleteTask_FullMethodName = "/smartflow.task.Task/DeleteTask" + Task_GetTaskForActiveSchedule_FullMethodName = "/smartflow.task.Task/GetTaskForActiveSchedule" +) + +type TaskClient interface { + Ping(ctx context.Context, in *StatusResponse, opts ...grpc.CallOption) (*StatusResponse, error) + AddTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + GetUserTasks(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + BatchTaskStatus(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + CompleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + UndoCompleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + UpdateTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + DeleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) + GetTaskForActiveSchedule(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) +} + +type taskClient struct { + cc grpc.ClientConnInterface +} + +func NewTaskClient(cc grpc.ClientConnInterface) TaskClient { + return &taskClient{cc} +} + +func (c *taskClient) Ping(ctx context.Context, in *StatusResponse, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := c.cc.Invoke(ctx, Task_Ping_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) AddTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_AddTask_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) GetUserTasks(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_GetUserTasks_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) BatchTaskStatus(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_BatchTaskStatus_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) CompleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_CompleteTask_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) UndoCompleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_UndoCompleteTask_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) UpdateTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_UpdateTask_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) DeleteTask(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_DeleteTask_FullMethodName, in, out, opts...) + return out, err +} + +func (c *taskClient) GetTaskForActiveSchedule(ctx context.Context, in *JSONRequest, opts ...grpc.CallOption) (*JSONResponse, error) { + out := new(JSONResponse) + err := c.cc.Invoke(ctx, Task_GetTaskForActiveSchedule_FullMethodName, in, out, opts...) + return out, err +} + +type TaskServer interface { + Ping(context.Context, *StatusResponse) (*StatusResponse, error) + AddTask(context.Context, *JSONRequest) (*JSONResponse, error) + GetUserTasks(context.Context, *JSONRequest) (*JSONResponse, error) + BatchTaskStatus(context.Context, *JSONRequest) (*JSONResponse, error) + CompleteTask(context.Context, *JSONRequest) (*JSONResponse, error) + UndoCompleteTask(context.Context, *JSONRequest) (*JSONResponse, error) + UpdateTask(context.Context, *JSONRequest) (*JSONResponse, error) + DeleteTask(context.Context, *JSONRequest) (*JSONResponse, error) + GetTaskForActiveSchedule(context.Context, *JSONRequest) (*JSONResponse, error) +} + +type UnimplementedTaskServer struct{} + +func (UnimplementedTaskServer) Ping(context.Context, *StatusResponse) (*StatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedTaskServer) AddTask(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method AddTask not implemented") +} +func (UnimplementedTaskServer) GetUserTasks(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetUserTasks not implemented") +} +func (UnimplementedTaskServer) BatchTaskStatus(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method BatchTaskStatus not implemented") +} +func (UnimplementedTaskServer) CompleteTask(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CompleteTask not implemented") +} +func (UnimplementedTaskServer) UndoCompleteTask(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UndoCompleteTask not implemented") +} +func (UnimplementedTaskServer) UpdateTask(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateTask not implemented") +} +func (UnimplementedTaskServer) DeleteTask(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method DeleteTask not implemented") +} +func (UnimplementedTaskServer) GetTaskForActiveSchedule(context.Context, *JSONRequest) (*JSONResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetTaskForActiveSchedule not implemented") +} + +func RegisterTaskServer(s grpc.ServiceRegistrar, srv TaskServer) { + s.RegisterService(&Task_ServiceDesc, srv) +} + +func _Task_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusResponse) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TaskServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: Task_Ping_FullMethodName} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TaskServer).Ping(ctx, req.(*StatusResponse)) + } + return interceptor(ctx, in, info, handler) +} + +func _Task_JSON_Handler(fullMethod string, invoke func(TaskServer, context.Context, *JSONRequest) (*JSONResponse, error)) grpc.MethodHandler { + return func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JSONRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return invoke(srv.(TaskServer), ctx, in) + } + info := &grpc.UnaryServerInfo{Server: srv, FullMethod: fullMethod} + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return invoke(srv.(TaskServer), ctx, req.(*JSONRequest)) + } + return interceptor(ctx, in, info, handler) + } +} + +var Task_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "smartflow.task.Task", + HandlerType: (*TaskServer)(nil), + Methods: []grpc.MethodDesc{ + {MethodName: "Ping", Handler: _Task_Ping_Handler}, + {MethodName: "AddTask", Handler: _Task_JSON_Handler(Task_AddTask_FullMethodName, TaskServer.AddTask)}, + {MethodName: "GetUserTasks", Handler: _Task_JSON_Handler(Task_GetUserTasks_FullMethodName, TaskServer.GetUserTasks)}, + {MethodName: "BatchTaskStatus", Handler: _Task_JSON_Handler(Task_BatchTaskStatus_FullMethodName, TaskServer.BatchTaskStatus)}, + {MethodName: "CompleteTask", Handler: _Task_JSON_Handler(Task_CompleteTask_FullMethodName, TaskServer.CompleteTask)}, + {MethodName: "UndoCompleteTask", Handler: _Task_JSON_Handler(Task_UndoCompleteTask_FullMethodName, TaskServer.UndoCompleteTask)}, + {MethodName: "UpdateTask", Handler: _Task_JSON_Handler(Task_UpdateTask_FullMethodName, TaskServer.UpdateTask)}, + {MethodName: "DeleteTask", Handler: _Task_JSON_Handler(Task_DeleteTask_FullMethodName, TaskServer.DeleteTask)}, + {MethodName: "GetTaskForActiveSchedule", Handler: _Task_JSON_Handler(Task_GetTaskForActiveSchedule_FullMethodName, TaskServer.GetTaskForActiveSchedule)}, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "task.proto", +} diff --git a/backend/services/task/rpc/server.go b/backend/services/task/rpc/server.go new file mode 100644 index 0000000..c255e92 --- /dev/null +++ b/backend/services/task/rpc/server.go @@ -0,0 +1,60 @@ +package rpc + +import ( + "errors" + "strings" + "time" + + "github.com/LoveLosita/smartflow/backend/services/task/rpc/pb" + tasksv "github.com/LoveLosita/smartflow/backend/services/task/sv" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/zrpc" + "google.golang.org/grpc" +) + +const ( + defaultListenOn = "0.0.0.0:9085" + defaultTimeout = 6 * time.Second +) + +type ServerOptions struct { + ListenOn string + Timeout time.Duration + Service *tasksv.TaskService +} + +// NewServer 创建 task zrpc 服务端。 +// +// 职责边界: +// 1. 只负责 zrpc server 配置与 gRPC handler 注册; +// 2. 不创建数据库、Redis 或业务服务,它们由 cmd/task 管理; +// 3. 返回 listenOn 供进程入口打印启动日志。 +func NewServer(opts ServerOptions) (*zrpc.RpcServer, string, error) { + if opts.Service == nil { + return nil, "", errors.New("task service dependency not initialized") + } + + listenOn := strings.TrimSpace(opts.ListenOn) + if listenOn == "" { + listenOn = defaultListenOn + } + timeout := opts.Timeout + if timeout <= 0 { + timeout = defaultTimeout + } + + server, err := zrpc.NewServer(zrpc.RpcServerConf{ + ServiceConf: service.ServiceConf{ + Name: "task.rpc", + Mode: service.DevMode, + }, + ListenOn: listenOn, + Timeout: int64(timeout / time.Millisecond), + }, func(grpcServer *grpc.Server) { + pb.RegisterTaskServer(grpcServer, NewHandler(opts.Service)) + }) + if err != nil { + return nil, "", err + } + return server, listenOn, nil +} diff --git a/backend/services/task/rpc/task.proto b/backend/services/task/rpc/task.proto new file mode 100644 index 0000000..573f569 --- /dev/null +++ b/backend/services/task/rpc/task.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package smartflow.task; + +option go_package = "github.com/LoveLosita/smartflow/backend/services/task/rpc/pb"; + +service Task { + rpc Ping(StatusResponse) returns (StatusResponse); + rpc AddTask(JSONRequest) returns (JSONResponse); + rpc GetUserTasks(JSONRequest) returns (JSONResponse); + rpc BatchTaskStatus(JSONRequest) returns (JSONResponse); + rpc CompleteTask(JSONRequest) returns (JSONResponse); + rpc UndoCompleteTask(JSONRequest) returns (JSONResponse); + rpc UpdateTask(JSONRequest) returns (JSONResponse); + rpc DeleteTask(JSONRequest) returns (JSONResponse); + rpc GetTaskForActiveSchedule(JSONRequest) returns (JSONResponse); +} + +message JSONRequest { + bytes payload_json = 1; +} + +message JSONResponse { + bytes data_json = 1; +} + +message StatusResponse { +} diff --git a/backend/services/task/sv/outbox.go b/backend/services/task/sv/outbox.go new file mode 100644 index 0000000..1563268 --- /dev/null +++ b/backend/services/task/sv/outbox.go @@ -0,0 +1,128 @@ +package sv + +import ( + "context" + "encoding/json" + "errors" + "log" + "strconv" + "time" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/model" + taskdao "github.com/LoveLosita/smartflow/backend/services/task/dao" + "gorm.io/gorm" +) + +const ( + // EventTypeTaskUrgencyPromoteRequested 是“任务紧急性平移请求”事件类型。 + EventTypeTaskUrgencyPromoteRequested = "task.urgency.promote.requested" +) + +// OutboxBus 是 task 服务注册消费 handler 需要的最小总线接口。 +type OutboxBus interface { + RegisterEventHandler(eventType string, handler outboxinfra.MessageHandler) error +} + +// RegisterTaskUrgencyPromoteRoute 只登记 task 事件归属,不注册消费 handler。 +// +// 职责边界: +// 1. 供迁移期其它进程发布 task 事件时解析到 task_outbox_messages; +// 2. 不创建 Kafka consumer,也不启动 task handler; +// 3. 真正消费仍由 cmd/task 调用 RegisterTaskUrgencyPromoteHandler 承担。 +func RegisterTaskUrgencyPromoteRoute() error { + return outboxinfra.RegisterEventService(EventTypeTaskUrgencyPromoteRequested, outboxinfra.ServiceTask) +} + +// RegisterTaskUrgencyPromoteHandler 注册 task 服务自己的“紧急性平移”消费者。 +// +// 职责边界: +// 1. 只处理 task.urgency.promote.requested,不处理 agent/memory 等其它事件; +// 2. 业务更新和 outbox consumed 推进放在同一事务内; +// 3. handler 不创建 DAO 或 event bus,避免消费链路隐藏启动依赖。 +func RegisterTaskUrgencyPromoteHandler(bus OutboxBus, outboxRepo *outboxinfra.Repository, taskDAO *taskdao.TaskDAO) error { + if bus == nil { + return errors.New("event bus is nil") + } + if outboxRepo == nil { + return errors.New("outbox repository is nil") + } + if taskDAO == nil { + return errors.New("task dao is nil") + } + if err := RegisterTaskUrgencyPromoteRoute(); err != nil { + return err + } + route, ok := outboxinfra.ResolveEventRoute(EventTypeTaskUrgencyPromoteRequested) + if !ok { + return errors.New("task.urgency.promote.requested route is missing") + } + eventOutboxRepo := outboxRepo.WithRoute(route) + + handler := func(ctx context.Context, envelope kafkabus.Envelope) error { + var payload model.TaskUrgencyPromoteRequestedPayload + if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析任务紧急性平移载荷失败: "+unmarshalErr.Error()) + return nil + } + + payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs) + if payload.UserID <= 0 || len(payload.TaskIDs) == 0 { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "任务紧急性平移载荷无效: user_id 或 task_ids 非法") + return nil + } + + return eventOutboxRepo.ConsumeAndMarkConsumed(ctx, envelope.OutboxID, func(tx *gorm.DB) error { + updated, err := taskDAO.WithTx(tx).PromoteTaskUrgencyByIDs(ctx, payload.UserID, payload.TaskIDs, time.Now()) + if err != nil { + return err + } + log.Printf("任务紧急性平移消费完成: user_id=%d task_count=%d affected=%d outbox_id=%d", payload.UserID, len(payload.TaskIDs), updated, envelope.OutboxID) + return nil + }) + } + + return bus.RegisterEventHandler(EventTypeTaskUrgencyPromoteRequested, handler) +} + +// PublishTaskUrgencyPromoteRequested 发布“任务紧急性平移请求”事件。 +func PublishTaskUrgencyPromoteRequested(ctx context.Context, publisher outboxinfra.EventPublisher, payload model.TaskUrgencyPromoteRequestedPayload) error { + if publisher == nil { + return errors.New("event publisher is nil") + } + if payload.UserID <= 0 { + return errors.New("invalid user_id") + } + payload.TaskIDs = sanitizePositiveUniqueIntIDs(payload.TaskIDs) + if len(payload.TaskIDs) == 0 { + return errors.New("task_ids is empty") + } + if payload.TriggeredAt.IsZero() { + payload.TriggeredAt = time.Now() + } + + return publisher.Publish(ctx, outboxinfra.PublishRequest{ + EventType: EventTypeTaskUrgencyPromoteRequested, + EventVersion: outboxinfra.DefaultEventVersion, + MessageKey: strconv.Itoa(payload.UserID), + AggregateID: strconv.Itoa(payload.UserID), + Payload: payload, + }) +} + +func sanitizePositiveUniqueIntIDs(ids []int) []int { + seen := make(map[int]struct{}, len(ids)) + result := make([]int, 0, len(ids)) + for _, id := range ids { + if id <= 0 { + continue + } + if _, exists := seen[id]; exists { + continue + } + seen[id] = struct{}{} + result = append(result, id) + } + return result +} diff --git a/backend/services/task/sv/service.go b/backend/services/task/sv/service.go new file mode 100644 index 0000000..0a1701d --- /dev/null +++ b/backend/services/task/sv/service.go @@ -0,0 +1,575 @@ +package sv + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/LoveLosita/smartflow/backend/conv" + rootdao "github.com/LoveLosita/smartflow/backend/dao" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/model" + "github.com/LoveLosita/smartflow/backend/respond" + taskdao "github.com/LoveLosita/smartflow/backend/services/task/dao" + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" + "github.com/go-redis/redis/v8" + "gorm.io/gorm" +) + +const ( + // taskBatchStatusMaxIDs 限制批量状态查询的单次任务 ID 数量,避免大请求放大缓存/内存扫描成本。 + taskBatchStatusMaxIDs = 100 + // taskUrgencyPromoteDedupeTTL 是"同一任务平移请求"的去重锁有效期。 + // + // 设计考虑: + // 1. 太短会导致消费稍慢时被重复投递; + // 2. 太长会导致首次投递失败后恢复变慢; + // 3. 这里先取 120 秒作为折中值,后续可按线上观测再调优。 + taskUrgencyPromoteDedupeTTL = 120 * time.Second + // taskUrgencyPromoteDedupeKeyFmt 是任务平移去重键模板。 + taskUrgencyPromoteDedupeKeyFmt = "smartflow:task:promote:pending:%d:%d" +) + +type TaskService struct { + // dao 负责任务表读写。 + dao *taskdao.TaskDAO + // cache 负责任务列表缓存与 Redis 去重锁能力。 + cache *rootdao.CacheDAO + // eventPublisher 负责发布 outbox 事件(可能为空:例如未启用 Kafka/总线时)。 + eventPublisher outboxinfra.EventPublisher + // activeScheduleDAO 负责维护主动调度 due job;为空时保持旧任务链路兼容。 + activeScheduleDAO *rootdao.ActiveScheduleDAO +} + +// NewTaskService 创建 TaskService 实例。 +// +// 职责边界: +// 1. 只做依赖注入,不做连接可用性探测; +// 2. 允许 eventPublisher 为空(用于本地降级场景)。 +func NewTaskService(taskDAO *taskdao.TaskDAO, cacheDAO *rootdao.CacheDAO, eventPublisher outboxinfra.EventPublisher) *TaskService { + return &TaskService{ + dao: taskDAO, + cache: cacheDAO, + eventPublisher: eventPublisher, + } +} + +// SetActiveScheduleDAO 注入主动调度自有表仓储。 +// +// 职责边界: +// 1. 只负责迁移期依赖接线,避免扩大 TaskService 构造函数调用面; +// 2. 不改变任务主流程语义,未注入时主动调度 job 同步自动降级为 no-op。 +func (ts *TaskService) SetActiveScheduleDAO(activeScheduleDAO *rootdao.ActiveScheduleDAO) { + if ts != nil { + ts.activeScheduleDAO = activeScheduleDAO + } +} + +// AddTask 新增任务。 +// +// 职责边界: +// 1. 负责参数转换、优先级合法性校验与写库; +// 2. 不负责"紧急性自动平移"逻辑(该逻辑发生在任务读取时的懒触发链路)。 +func (ts *TaskService) AddTask(ctx context.Context, req *model.UserAddTaskRequest, userID int) (*model.UserAddTaskResponse, error) { + // 1. 把用户请求转换为内部模型,避免 API 层结构直接泄漏到 DAO。 + taskModel := conv.UserAddTaskRequestToModel(req, userID) + // 2. 优先级范围校验:当前任务体系只允许 1~4。 + if taskModel.Priority < 1 || taskModel.Priority >= 5 { + return nil, respond.InvalidPriority + } + // 3. 写库。 + createdTask, err := ts.dao.AddTask(taskModel) + if err != nil { + return nil, err + } + ts.syncActiveScheduleJobBestEffort(ctx, createdTask) + // 4. 返回对外响应 DTO。 + response := conv.ModelToUserAddTaskResponse(createdTask) + return response, nil +} + +// GetTaskForActiveSchedule 读取 active-scheduler 所需的 task_pool 最小事实。 +// +// 职责边界: +// 1. 只把 task 服务拥有的任务快照转换为跨进程契约; +// 2. 不读取 schedule,也不生成主动调度候选; +// 3. found=false 表示目标不存在或当前用户无权访问,由 active-scheduler 观察链路降级处理。 +func (ts *TaskService) GetTaskForActiveSchedule(ctx context.Context, req taskcontracts.TaskFactRequest) (taskcontracts.TaskFact, bool, error) { + if ts == nil || ts.dao == nil { + return taskcontracts.TaskFact{}, false, errors.New("task service 未初始化") + } + task, err := ts.dao.GetTaskByUserAndID(ctx, req.UserID, req.TaskID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return taskcontracts.TaskFact{}, false, nil + } + return taskcontracts.TaskFact{}, false, err + } + + estimatedSections := task.EstimatedSections + if estimatedSections <= 0 { + estimatedSections = 1 + } + if estimatedSections > 4 { + estimatedSections = 4 + } + return taskcontracts.TaskFact{ + ID: task.ID, + UserID: task.UserID, + Title: task.Title, + Priority: task.Priority, + IsCompleted: task.IsCompleted, + DeadlineAt: task.DeadlineAt, + UrgencyThresholdAt: task.UrgencyThresholdAt, + EstimatedSections: estimatedSections, + }, true, nil +} + +// CompleteTask 将用户指定任务标记为"已完成"。 +// +// 职责边界: +// 1. 负责入参校验与业务错误映射; +// 2. 负责调用 DAO 执行状态更新; +// 3. 不负责幂等键校验(幂等由中间件处理); +// 4. 不负责缓存删除细节(缓存删除由 GORM cache_deleter 回调触发)。 +func (ts *TaskService) CompleteTask(ctx context.Context, req *model.UserCompleteTaskRequest, userID int) (*model.UserCompleteTaskResponse, error) { + // 1. 参数兜底:请求体为空、非法 user 或非法 task_id 直接返回业务错误。 + if req == nil || userID <= 0 || req.TaskID <= 0 { + return nil, respond.WrongTaskID + } + + // 2. 调用 DAO 执行"查询 + 必要时更新"。 + updatedTask, alreadyCompleted, err := ts.dao.CompleteTaskByID(ctx, userID, req.TaskID) + if err != nil { + // 2.1 任务不存在或不属于当前用户时,统一映射为 WrongTaskID。 + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, respond.WrongTaskID + } + // 2.2 其余数据库异常向上透传,交由统一错误处理器返回 500。 + return nil, err + } + if updatedTask == nil { + // 3. 极端防御:DAO 不应返回 nil,若发生则视为内部异常。 + return nil, errors.New("complete task succeeded but task is nil") + } + + // 4. 构造响应: + // 4.1 already_completed=true 表示本次命中幂等,不影响最终成功状态; + // 4.2 is_completed 始终为 true,便于前端直接刷新状态。 + resp := &model.UserCompleteTaskResponse{ + TaskID: updatedTask.ID, + IsCompleted: true, + AlreadyCompleted: alreadyCompleted, + Status: "completed", + } + ts.cancelActiveScheduleJobBestEffort(ctx, updatedTask.UserID, updatedTask.ID, "task_completed") + return resp, nil +} + +// UndoCompleteTask 取消用户任务的"已完成勾选"。 +// +// 职责边界: +// 1. 负责入参校验与业务错误映射; +// 2. 负责调用 DAO 执行状态恢复; +// 3. 不负责幂等缓存(本接口按需求要求:任务未完成时必须报错); +// 4. 不负责缓存删除细节(由 GORM cache_deleter 回调自动处理)。 +func (ts *TaskService) UndoCompleteTask(ctx context.Context, req *model.UserUndoCompleteTaskRequest, userID int) (*model.UserUndoCompleteTaskResponse, error) { + // 1. 参数兜底:请求体为空、非法 user 或非法 task_id 直接返回业务错误。 + if req == nil || userID <= 0 || req.TaskID <= 0 { + return nil, respond.WrongTaskID + } + + // 2. 调用 DAO 执行"恢复未完成"逻辑。 + updatedTask, err := ts.dao.UndoCompleteTaskByID(ctx, userID, req.TaskID) + if err != nil { + // 2.1 任务不存在或不属于当前用户,统一映射为 WrongTaskID。 + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, respond.WrongTaskID + } + // 2.2 任务本来就未完成:按需求返回明确业务错误。 + if errors.Is(err, respond.TaskNotCompleted) { + return nil, respond.TaskNotCompleted + } + // 2.3 其余数据库异常继续向上透传。 + return nil, err + } + if updatedTask == nil { + // 3. 极端防御:DAO 成功但返回 nil,视为内部异常。 + return nil, errors.New("undo complete task succeeded but task is nil") + } + + // 4. 组装响应:恢复成功后 is_completed 恒为 false。 + resp := &model.UserUndoCompleteTaskResponse{ + TaskID: updatedTask.ID, + IsCompleted: false, + Status: "uncompleted", + } + return resp, nil +} + +// GetUserTasks 获取用户任务列表(含"读时紧急性派生"与"异步平移触发")。 +// +// 核心流程(步骤化): +// 1. 先读缓存,未命中再回源 DB,并把"原始模型"回填缓存; +// 2. 在内存里做"读时派生":仅用于本次返回给前端,不直接改库; +// 3. 收集"已到紧急分界线且仍处于非紧急象限"的任务 ID; +// 4. 通过 Redis SETNX 去重后,发布 outbox 事件异步落库; +// 5. 无论发布成功与否,都优先返回本次派生结果,保证用户读体验。 +// +// 一致性策略: +// 1. 缓存里存的是原始任务,不是派生后的优先级; +// 2. 真实平移由异步消费者条件更新 DB; +// 3. DB 更新后由 cache_deleter 自动删缓存,下一次读取自然拿到新状态。 +func (ts *TaskService) GetUserTasks(ctx context.Context, userID int) ([]model.GetUserTaskResp, error) { + derivedTasks, err := ts.GetTasksWithUrgencyPromotion(ctx, userID) + if err != nil { + return nil, err + } + return conv.ModelToGetUserTasksResp(derivedTasks), nil +} + +// BatchTaskStatus 批量查询当前登录用户任务的完成状态。 +// +// 职责边界: +// 1. 负责请求 ID 的过滤、去重和数量限制; +// 2. 只返回当前用户有权访问且仍存在的任务,避免泄露其他用户任务状态; +// 3. 复用 getRawUserTasks 的 Redis 任务列表缓存链路,不新增绕过缓存的 DAO 查询; +// 4. 该接口只读,不触发 GORM cache_deleter,也不反向修改 NewAgent timeline 历史快照。 +func (ts *TaskService) BatchTaskStatus(ctx context.Context, req *model.BatchTaskStatusRequest, userID int) (*model.BatchTaskStatusResponse, error) { + resp := &model.BatchTaskStatusResponse{ + Items: []model.BatchTaskStatusItem{}, + } + if userID <= 0 { + return nil, respond.WrongUserID + } + if req == nil { + return resp, nil + } + + // 1. 先把前端传入的历史卡片 task id 做归一化。 + // 1.1 非法 ID 直接过滤,避免无意义匹配; + // 1.2 保留首次出现顺序,方便前端按请求顺序回填; + // 1.3 超过上限时截断,避免单次 hydration 请求放大服务端成本。 + validIDs := compactPositiveUniqueTaskIDsWithLimit(req.IDs, taskBatchStatusMaxIDs) + if len(validIDs) == 0 { + return resp, nil + } + + // 2. 复用原始任务读取链路。 + // 2.1 命中 Redis 时直接读取 smartflow:tasks:{userID}; + // 2.2 未命中时由 getRawUserTasks 回源 DB 并回填缓存; + // 2.3 用户没有任何任务时映射为空 items,符合 hydration 的“无匹配不报错”语义。 + tasks, err := ts.getRawUserTasks(ctx, userID) + if err != nil { + if errors.Is(err, respond.UserTasksEmpty) { + return resp, nil + } + return nil, err + } + + // 3. 在当前用户任务集合内做内存匹配。 + // 3.1 不命中的 ID 可能是已删除、属于其他用户、或历史快照里的旧任务,统一静默过滤; + // 3.2 返回字段只包含当前模型可用的完成状态,避免伪造不存在的 updated_at。 + taskByID := make(map[int]model.Task, len(tasks)) + for _, task := range tasks { + taskByID[task.ID] = task + } + for _, id := range validIDs { + task, exists := taskByID[id] + if !exists { + continue + } + resp.Items = append(resp.Items, model.BatchTaskStatusItem{ + ID: task.ID, + IsCompleted: task.IsCompleted, + }) + } + return resp, nil +} + +// GetTasksWithUrgencyPromotion 读取用户任务并应用读时紧急性提升 + 异步落库触发。 +// +// 统一入口,供前端查询(GetUserTasks)和 LLM 工具查询(QueryTasksForTool)复用。 +// 调用方不应假设 DB 已更新——持久化是异步的。 +func (ts *TaskService) GetTasksWithUrgencyPromotion(ctx context.Context, userID int) ([]model.Task, error) { + rawTasks, err := ts.getRawUserTasks(ctx, userID) + if err != nil { + return nil, err + } + derivedTasks, duePromoteTaskIDs := deriveTaskUrgencyForRead(rawTasks, time.Now()) + ts.tryEnqueueTaskUrgencyPromote(ctx, userID, duePromoteTaskIDs) + return derivedTasks, nil +} + +// getRawUserTasks 读取"原始任务模型"。 +// +// 职责边界: +// 1. 负责缓存命中/回源 DB/回填缓存; +// 2. 不做优先级派生,不做异步事件投递; +// 3. 缓存写失败只记日志,不阻断主流程。 +func (ts *TaskService) getRawUserTasks(ctx context.Context, userID int) ([]model.Task, error) { + // 1. 先查缓存:命中则直接返回。 + cachedTasks, err := ts.cache.GetUserTasksFromCache(ctx, userID) + if err == nil { + return cachedTasks, nil + } + + // 2. 非 redis.Nil 错误直接返回,避免掩盖真实故障。 + if !errors.Is(err, redis.Nil) { + return nil, err + } + + // 3. 缓存未命中回源 DB。 + dbTasks, err := ts.dao.GetTasksByUserID(userID) + if err != nil { + return nil, err + } + + // 4. 回填缓存(失败不阻断主链路)。 + if setErr := ts.cache.SetUserTasksToCache(ctx, userID, dbTasks); setErr != nil { + log.Printf("写入用户任务缓存失败: user_id=%d err=%v", userID, setErr) + } + return dbTasks, nil +} + +// deriveTaskUrgencyForRead 对任务做"读时紧急性派生",并收集需要异步落库的任务 ID。 +// +// 职责边界: +// 1. 只在内存里改本次返回值,不写 DB; +// 2. 只做"到线且未完成任务"的优先级映射; +// 3. 不处理去重锁和事件发布。 +// +// 返回语义: +// 1. 第一个返回值:可直接用于响应前端的派生任务切片; +// 2. 第二个返回值:需要发"异步平移事件"的任务 ID 列表(可能为空)。 +func deriveTaskUrgencyForRead(tasks []model.Task, now time.Time) ([]model.Task, []int) { + // 1. 拷贝切片,避免修改调用方持有的原始数据。 + derived := make([]model.Task, len(tasks)) + copy(derived, tasks) + + pendingPromoteTaskIDs := make([]int, 0, len(derived)) + + // 2. 逐条判断是否满足"自动平移"条件。 + for idx := range derived { + current := &derived[idx] + + // 2.1 已完成任务不参与平移。 + if current.IsCompleted { + continue + } + // 2.2 没有分界线的任务不参与平移。 + if current.UrgencyThresholdAt == nil { + continue + } + // 2.3 尚未到分界线,不平移。 + if current.UrgencyThresholdAt.After(now) { + continue + } + + // 2.4 到线后,仅把"不紧急象限"平移到对应"紧急象限"。 + // 2.4.1 重要不紧急(2) -> 重要且紧急(1) + // 2.4.2 不简单不重要(4) -> 简单不重要(3) + switch current.Priority { + case 2: + current.Priority = 1 + pendingPromoteTaskIDs = append(pendingPromoteTaskIDs, current.ID) + case 4: + current.Priority = 3 + pendingPromoteTaskIDs = append(pendingPromoteTaskIDs, current.ID) + default: + // 2.4.3 其他优先级不处理(包含已经是 1/3 的情况)。 + } + } + return derived, pendingPromoteTaskIDs +} + +// tryEnqueueTaskUrgencyPromote 尝试发布"任务紧急性平移请求"事件。 +// +// 职责边界: +// 1. 负责 Redis 去重锁 + outbox 发布; +// 2. 不负责真正落库(由消费者负责); +// 3. 发布失败时要释放本次抢到的去重锁,避免任务被长时间"误判已投递"。 +func (ts *TaskService) tryEnqueueTaskUrgencyPromote(ctx context.Context, userID int, taskIDs []int) { + // 1. 基础兜底:无发布器或无候选任务时直接返回。 + if ts.eventPublisher == nil || userID <= 0 || len(taskIDs) == 0 { + return + } + + // 2. 先做任务 ID 清洗,避免无效 ID 参与去重与发布。 + validTaskIDs := compactPositiveUniqueTaskIDs(taskIDs) + if len(validTaskIDs) == 0 { + return + } + + // 3. 逐个抢 SETNX 去重锁: + // 3.1 抢到锁才允许进入本次发布; + // 3.2 抢不到说明已有请求在途,本次跳过即可; + // 3.3 抢锁失败只记录日志,不中断主流程。 + lockedTaskIDs := make([]int, 0, len(validTaskIDs)) + lockedKeys := make([]string, 0, len(validTaskIDs)) + for _, taskID := range validTaskIDs { + lockKey := fmt.Sprintf(taskUrgencyPromoteDedupeKeyFmt, userID, taskID) + locked, lockErr := ts.cache.AcquireLock(ctx, lockKey, taskUrgencyPromoteDedupeTTL) + if lockErr != nil { + log.Printf("任务平移去重锁获取失败: user_id=%d task_id=%d err=%v", userID, taskID, lockErr) + continue + } + if !locked { + continue + } + lockedTaskIDs = append(lockedTaskIDs, taskID) + lockedKeys = append(lockedKeys, lockKey) + } + if len(lockedTaskIDs) == 0 { + return + } + + // 4. 发布 outbox 事件:这里只保证"成功入 outbox 或返回错误",不等待消费者执行完成。 + publishErr := PublishTaskUrgencyPromoteRequested(ctx, ts.eventPublisher, model.TaskUrgencyPromoteRequestedPayload{ + UserID: userID, + TaskIDs: lockedTaskIDs, + TriggeredAt: time.Now(), + }) + if publishErr != nil { + // 4.1 失败回滚:释放本次抢到的去重锁,避免后续请求因误锁而无法再投递。 + ts.releaseTaskPromoteLocks(lockedKeys) + log.Printf("任务平移事件发布失败: user_id=%d task_ids=%v err=%v", userID, lockedTaskIDs, publishErr) + return + } + + log.Printf("任务平移事件已发布: user_id=%d task_ids=%v", userID, lockedTaskIDs) +} + +// releaseTaskPromoteLocks 释放任务平移去重锁。 +// +// 说明: +// 1. 仅用于"发布失败回滚"场景; +// 2. 使用 Background 避免请求上下文已取消时导致锁释放失败。 +func (ts *TaskService) releaseTaskPromoteLocks(lockKeys []string) { + if len(lockKeys) == 0 { + return + } + releaseCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + for _, key := range lockKeys { + if err := ts.cache.ReleaseLock(releaseCtx, key); err != nil { + log.Printf("任务平移去重锁释放失败: key=%s err=%v", key, err) + } + } +} + +// compactPositiveUniqueTaskIDs 对任务 ID 做"过滤非正数 + 去重"。 +// +// 职责边界: +// 1. 只做参数清洗; +// 2. 不承载业务规则判断。 +func compactPositiveUniqueTaskIDs(taskIDs []int) []int { + return compactPositiveUniqueTaskIDsWithLimit(taskIDs, 0) +} + +// compactPositiveUniqueTaskIDsWithLimit 对任务 ID 做"过滤非正数 + 去重 + 可选限量"。 +// +// 职责边界: +// 1. 只做纯参数归一化,不查询任务、不判断权限; +// 2. limit <= 0 表示不限制数量,供既有调用保持原行为; +// 3. 达到 limit 后立即停止扫描,避免超长请求继续消耗 CPU。 +func compactPositiveUniqueTaskIDsWithLimit(taskIDs []int, limit int) []int { + seen := make(map[int]struct{}, len(taskIDs)) + result := make([]int, 0, len(taskIDs)) + for _, taskID := range taskIDs { + if taskID <= 0 { + continue + } + if _, exists := seen[taskID]; exists { + continue + } + seen[taskID] = struct{}{} + result = append(result, taskID) + if limit > 0 && len(result) >= limit { + break + } + } + return result +} + +// UpdateTask 更新用户指定任务的属性(部分更新)。 +// +// 职责边界: +// 1. 负责参数校验:task_id 合法性、priority_group 范围; +// 2. 负责将请求 DTO 转换为 DAO 层的 updates map; +// 3. 空请求体(无字段需要更新)返回明确业务错误; +// 4. 不负责缓存删除(由 GORM cache_deleter 回调自动处理)。 +func (ts *TaskService) UpdateTask(ctx context.Context, req *model.UserUpdateTaskRequest, userID int) (model.GetUserTaskResp, error) { + // 1. 参数兜底。 + if req == nil || userID <= 0 || req.TaskID <= 0 { + return model.GetUserTaskResp{}, respond.WrongTaskID + } + + // 2. 构造 updates map:只有非 nil 的字段才写入。 + updates := make(map[string]interface{}) + if req.Title != nil { + updates["title"] = *req.Title + } + if req.PriorityGroup != nil { + // 2.1 优先级范围校验:当前任务体系只允许 1~4。 + if *req.PriorityGroup < 1 || *req.PriorityGroup > 4 { + return model.GetUserTaskResp{}, respond.InvalidPriority + } + // 2.2 JSON 字段名是 priority_group,数据库列名是 priority。 + updates["priority"] = *req.PriorityGroup + } + if req.DeadlineAt != nil { + updates["deadline_at"] = *req.DeadlineAt + } + if req.UrgencyThresholdAt != nil { + updates["urgency_threshold_at"] = *req.UrgencyThresholdAt + } + + // 3. 空更新检测:至少需要一个可更新字段。 + if len(updates) == 0 { + return model.GetUserTaskResp{}, respond.TaskUpdateNoFields + } + + // 4. 调用 DAO 执行更新。 + updatedTask, err := ts.dao.UpdateTaskByID(ctx, userID, req.TaskID, updates) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return model.GetUserTaskResp{}, respond.WrongTaskID + } + return model.GetUserTaskResp{}, err + } + ts.syncActiveScheduleJobBestEffort(ctx, updatedTask) + + // 5. 转换为响应 DTO。 + return conv.ModelToGetUserTaskResp(updatedTask), nil +} + +// DeleteTask 永久删除用户指定任务。 +// +// 职责边界: +// 1. 负责入参校验与业务错误映射; +// 2. 负责调用 DAO 执行硬删除; +// 3. 任务不存在时返回幂等信息码(TaskAlreadyDeleted); +// 4. 不负责缓存删除(由 GORM cache_deleter 回调自动处理)。 +func (ts *TaskService) DeleteTask(ctx context.Context, req *model.UserCompleteTaskRequest, userID int) (int, error) { + // 1. 参数兜底。 + if req == nil || userID <= 0 || req.TaskID <= 0 { + return 0, respond.WrongTaskID + } + + // 2. 调用 DAO 执行删除。 + deletedTask, err := ts.dao.DeleteTaskByID(ctx, userID, req.TaskID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 2.1 任务不存在或不属于当前用户:按幂等语义返回信息码。 + return 0, respond.TaskAlreadyDeleted + } + return 0, err + } + ts.cancelActiveScheduleJobBestEffort(ctx, deletedTask.UserID, deletedTask.ID, "task_deleted") + + return deletedTask.ID, nil +} diff --git a/backend/services/task/sv/task_active_schedule.go b/backend/services/task/sv/task_active_schedule.go new file mode 100644 index 0000000..199588f --- /dev/null +++ b/backend/services/task/sv/task_active_schedule.go @@ -0,0 +1,91 @@ +package sv + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" +) + +// syncActiveScheduleJobBestEffort 在任务变更后同步主动调度 due job。 +// +// 职责边界: +// 1. 只维护 important_urgent_task 的 job,不直接触发主动调度主链路; +// 2. 任务未完成且存在 urgency_threshold_at 时 upsert pending job; +// 3. 任务已完成或阈值为空时取消当前 pending job; +// 4. 当前任务接口尚未整体事务化,job 同步失败只记日志,避免任务主写入出现“已落库但接口失败”的更差体验。 +func (ts *TaskService) syncActiveScheduleJobBestEffort(ctx context.Context, task *model.Task) { + if ts == nil || ts.activeScheduleDAO == nil || task == nil { + return + } + if task.IsCompleted || task.UrgencyThresholdAt == nil { + ts.cancelActiveScheduleJobBestEffort(ctx, task.UserID, task.ID, "task_not_schedulable") + return + } + + job := &model.ActiveScheduleJob{ + ID: activeScheduleJobID(task.UserID, task.ID), + UserID: task.UserID, + TaskID: task.ID, + TriggerType: model.ActiveScheduleTriggerTypeImportantUrgentTask, + Status: model.ActiveScheduleJobStatusPending, + TriggerAt: *task.UrgencyThresholdAt, + DedupeKey: activeScheduleTriggerDedupeKey(task.UserID, task.ID, *task.UrgencyThresholdAt), + TraceID: activeScheduleTraceID(task.UserID, task.ID), + } + if err := ts.activeScheduleDAO.CreateOrUpdateJob(ctx, job); err != nil { + log.Printf("主动调度 job upsert 失败: user_id=%d task_id=%d err=%v", task.UserID, task.ID, err) + } +} + +// cancelActiveScheduleJobBestEffort 取消任务当前待触发 job。 +// +// 职责边界: +// 1. 只取消 pending job,历史 triggered/skipped/failed 记录保留审计; +// 2. 找不到 pending job 属于正常幂等场景; +// 3. reason 只进入 last_error_code,方便后续排障知道取消来源。 +func (ts *TaskService) cancelActiveScheduleJobBestEffort(ctx context.Context, userID int, taskID int, reason string) { + if ts == nil || ts.activeScheduleDAO == nil || userID <= 0 || taskID <= 0 { + return + } + job, err := ts.activeScheduleDAO.FindPendingJobByTask(ctx, userID, taskID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return + } + log.Printf("主动调度 pending job 查询失败: user_id=%d task_id=%d err=%v", userID, taskID, err) + return + } + now := time.Now() + updates := map[string]any{ + "status": model.ActiveScheduleJobStatusCanceled, + "last_error_code": reason, + "last_scanned_at": &now, + } + if err = ts.activeScheduleDAO.UpdateJobFields(ctx, job.ID, updates); err != nil { + log.Printf("主动调度 pending job 取消失败: user_id=%d task_id=%d job_id=%s err=%v", userID, taskID, job.ID, err) + } +} + +func activeScheduleJobID(userID int, taskID int) string { + return fmt.Sprintf("asj_task_%d_%d", userID, taskID) +} + +func activeScheduleTraceID(userID int, taskID int) string { + return fmt.Sprintf("trace_active_task_%d_%d", userID, taskID) +} + +func activeScheduleTriggerDedupeKey(userID int, taskID int, triggerAt time.Time) string { + windowStart := triggerAt.Truncate(30 * time.Minute) + return fmt.Sprintf("%d:%s:%s:%d:%s", + userID, + model.ActiveScheduleTriggerTypeImportantUrgentTask, + model.ActiveScheduleTargetTypeTaskPool, + taskID, + windowStart.Format(time.RFC3339), + ) +} diff --git a/backend/shared/contracts/task/types.go b/backend/shared/contracts/task/types.go new file mode 100644 index 0000000..5876599 --- /dev/null +++ b/backend/shared/contracts/task/types.go @@ -0,0 +1,73 @@ +package task + +import "time" + +// AddTaskRequest 是 task 服务新增任务的跨进程契约。 +// +// 职责边界: +// 1. 只承载 gateway 鉴权后补齐的 user_id 和前端任务字段; +// 2. 不承载 HTTP token、幂等键或缓存语义; +// 3. 业务校验仍由 task 服务内部完成。 +type AddTaskRequest struct { + UserID int `json:"user_id"` + Title string `json:"title"` + PriorityGroup int `json:"priority_group"` + EstimatedSections int `json:"estimated_sections"` + DeadlineAt *time.Time `json:"deadline_at"` +} + +type UserRequest struct { + UserID int `json:"user_id"` +} + +type CompleteTaskRequest struct { + UserID int `json:"user_id"` + TaskID int `json:"task_id"` +} + +type UndoCompleteTaskRequest struct { + UserID int `json:"user_id"` + TaskID int `json:"task_id"` +} + +type DeleteTaskRequest struct { + UserID int `json:"user_id"` + TaskID int `json:"task_id"` +} + +type UpdateTaskRequest struct { + UserID int `json:"user_id"` + TaskID int `json:"task_id"` + Title *string `json:"title"` + PriorityGroup *int `json:"priority_group"` + DeadlineAt *time.Time `json:"deadline_at"` + UrgencyThresholdAt *time.Time `json:"urgency_threshold_at"` +} + +type BatchTaskStatusRequest struct { + UserID int `json:"user_id"` + IDs []int `json:"ids"` +} + +type TaskFactRequest struct { + UserID int `json:"user_id"` + TaskID int `json:"task_id"` + Now time.Time `json:"now"` +} + +// TaskFact 是 active-scheduler 读取 task_pool 事实时需要的最小快照。 +type TaskFact struct { + ID int `json:"id"` + UserID int `json:"user_id"` + Title string `json:"title"` + Priority int `json:"priority"` + IsCompleted bool `json:"is_completed"` + DeadlineAt *time.Time `json:"deadline_at,omitempty"` + UrgencyThresholdAt *time.Time `json:"urgency_threshold_at,omitempty"` + EstimatedSections int `json:"estimated_sections"` +} + +type TaskFactResponse struct { + Task TaskFact `json:"task"` + Found bool `json:"found"` +} diff --git a/backend/shared/ports/task.go b/backend/shared/ports/task.go new file mode 100644 index 0000000..4ad93e8 --- /dev/null +++ b/backend/shared/ports/task.go @@ -0,0 +1,24 @@ +package ports + +import ( + "context" + "encoding/json" + + taskcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/task" +) + +// TaskCommandClient 是 gateway 调用 task 服务的最小能力集合。 +// +// 职责边界: +// 1. 只覆盖当前 `/api/v1/task/*` HTTP 门面需要的能力; +// 2. 不暴露 task DAO、outbox 状态机或 active-scheduler due job 同步细节; +// 3. 复杂响应先以 JSON 透传,避免 gateway 复制 task 内部 DTO。 +type TaskCommandClient interface { + AddTask(ctx context.Context, req taskcontracts.AddTaskRequest) (json.RawMessage, error) + GetUserTasks(ctx context.Context, userID int) (json.RawMessage, error) + BatchTaskStatus(ctx context.Context, req taskcontracts.BatchTaskStatusRequest) (json.RawMessage, error) + CompleteTask(ctx context.Context, req taskcontracts.CompleteTaskRequest) (json.RawMessage, error) + UndoCompleteTask(ctx context.Context, req taskcontracts.UndoCompleteTaskRequest) (json.RawMessage, error) + UpdateTask(ctx context.Context, req taskcontracts.UpdateTaskRequest) (json.RawMessage, error) + DeleteTask(ctx context.Context, req taskcontracts.DeleteTaskRequest) (json.RawMessage, error) +} diff --git a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md index 2553c47..5aaef24 100644 --- a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md +++ b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md @@ -430,14 +430,18 @@ flowchart LR ### 4.9 阶段 5:再拆 schedule / task / course / task-class -当前进展(2026-05-04 首刀): +当前进展(2026-05-04): -1. `schedule` 已开始服务化:新增 `cmd/schedule`、`services/schedule/{dao,rpc,sv,core}`、`gateway/client/schedule`、`shared/contracts/schedule` 和 `shared/ports` schedule port。 +1. 首刀 `schedule` 已完成服务化:新增 `cmd/schedule`、`services/schedule/{dao,rpc,sv,core}`、`gateway/client/schedule`、`shared/contracts/schedule` 和 `shared/ports` schedule port。 2. gateway 的 `/api/v1/schedule/*` HTTP 门面已切到 schedule zrpc client;gateway 不再通过 `backend/service.ScheduleService` 直接承载 schedule HTTP 入口业务。 -3. active-scheduler 的 schedule facts / feedback / confirm apply 已改为调用 schedule RPC adapter;`cmd/active-scheduler` 启动依赖检查已移除 `schedule_events`、`schedules`、`task_classes`、`task_items`,迁移期仍直接读取 `tasks`。 -4. gateway schedule client 和 active-scheduler schedule RPC adapter 已接入 `Ping` 启动期健康检查;单体聊天主动调度 rerun 的 schedule facts / feedback / apply 也已切到 schedule RPC,task facts 暂时仍走 Gorm。 -5. 旧实现仍保留:`backend/service/schedule.go`、`backend/dao/schedule.go`、active-scheduler 旧 Gorm apply adapter 暂时保留,用于 agent 迁移期、单体残留路径和回退。 -6. 当前切流点:HTTP schedule 流量进入 `cmd/schedule`;active-scheduler 正式写日程进入 schedule 服务;course / task-class / agent 内部仍存在直接 DAO 调用,后续按域继续切。 +3. active-scheduler 的 schedule facts / feedback / confirm apply 已改为调用 schedule RPC adapter;`cmd/active-scheduler` 启动依赖检查已移除 `schedule_events`、`schedules`、`task_classes`、`task_items`。 +4. 第二刀 `task` 已开始服务化:新增 `cmd/task`、`services/task/{dao,rpc,sv}`、`gateway/client/task`、`shared/contracts/task` 和 `shared/ports` task port。 +5. gateway 的 `/api/v1/task/*` HTTP 门面已切到 task zrpc client;gateway 只负责鉴权、参数绑定、短超时和响应透传,不再直接调用 `backend/service.TaskService`。 +6. active-scheduler 的 task facts / due job scanner 已切到 task RPC adapter;`cmd/active-scheduler` 启动依赖检查已移除 `tasks`,进一步缩小 active-scheduler 对跨域主库表的直接依赖。 +7. `task.urgency.promote.requested` 的 handler、relay、retry loop 已迁入 `cmd/task`;单体 outbox worker 只保留 agent / memory consumer,Agent 残留查询链路只允许 publish-only 写入 `task_outbox_messages`,避免单体和 task 独立服务抢同一 task consumer group。 +8. 旧实现仍保留:`backend/service/schedule.go`、`backend/dao/schedule.go`、`backend/service/task.go`、`backend/dao/task.go`、active-scheduler 旧 Gorm apply adapter 暂时保留,用于 agent 迁移期、单体残留路径和回退。 +9. 当前切流点:HTTP schedule 流量进入 `cmd/schedule`;HTTP task 流量进入 `cmd/task`;active-scheduler 读取 task/schedule facts 与正式写日程均走 RPC;course / task-class / agent 内部仍存在直接 DAO 调用,后续按域继续切。 +10. 当前残留跨域 DB 依赖:task 服务迁移期仍 best-effort 写 `active_schedule_jobs`;active-scheduler 仍直接写 agent 会话 / timeline 和 notification outbox 相关表;agent 本地 task 查询链路仍保留旧 `TaskService` 作为迁移期适配。 目标: @@ -448,7 +452,7 @@ flowchart LR 这一步要做的事: -1. `schedule` 先独立,再看 `task`、`course`、`task-class`。 +1. `schedule`、`task` 已先后独立;下一轮优先评估 `task-class`,再看 `course`。 2. 每个领域只维护自己的写模型。 3. 通过事件或明确 RPC 契约通信。 4. 继续保持并行迁移,旧实现和新实现可以短期并存。 @@ -456,8 +460,8 @@ flowchart LR 建议提交点: 1. schedule 切流完成后 commit。 -2. course / task-class 切流完成后 commit。 -3. task 切流完成后 commit。 +2. task 切流完成后 commit。 +3. course / task-class 切流完成后 commit。 建议测试: @@ -1080,7 +1084,17 @@ graph TD 3. `backend/gateway/api` 是 HTTP 门面统一目录,`backend/gateway/client/activescheduler` 是 gateway 侧 zrpc client。 4. `backend/shared/contracts/activescheduler` 和 `backend/shared/ports` 只承载跨层契约和端口接口,不承载服务私有业务实现。 5. `cmd/all` 不再启动 active-scheduler workflow / scanner / handler;完整本地 smoke 需要同时启动 `cmd/all`、`cmd/userauth`、`cmd/notification` 和 `cmd/active-scheduler`。 -6. 迁移期仍共享主库访问 task、schedule、agent 会话和 notification outbox 相关表;active-scheduler 启动时做依赖表检查,后续由 schedule/task/agent 拆分继续缩小这条共享边界。 +6. 阶段 4 收口时仍共享主库访问 task、schedule、agent 会话和 notification outbox 相关表;阶段 5 已先通过 schedule / task RPC 继续缩小这条共享边界。 + +阶段 5 当前基线: + +1. `backend/cmd/schedule/main.go` 是 schedule 独立进程入口,`backend/cmd/task/main.go` 是 task 独立进程入口,二者各自初始化 DB / Redis / zrpc server 和所需服务内资源。 +2. `backend/services/schedule` 拥有正式日程领域核心,`backend/services/task` 拥有任务池读写、完成/撤销、紧急性平移和 task outbox handler。 +3. `backend/gateway/api` 继续作为 HTTP 门面统一目录,`backend/gateway/client/schedule` 与 `backend/gateway/client/task` 作为 gateway 侧 zrpc client。 +4. `backend/shared/contracts/schedule`、`backend/shared/contracts/task` 和 `backend/shared/ports` 只承载跨进程契约与端口接口,不放 DAO、model 或业务状态机。 +5. active-scheduler 的 schedule facts / feedback / confirm apply 已走 schedule RPC,task facts / due job scanner 已走 task RPC;启动依赖检查不再要求 `schedule_events`、`schedules`、`task_classes`、`task_items` 或 `tasks`。 +6. `task.urgency.promote.requested` 的消费边界已迁入 `cmd/task`;单体 outbox worker 不再启动 task service bus,只保留 Agent 残留路径的 publish-only 写入能力,避免迁移期重复 relay / consume。 +7. 本阶段残留:task 服务仍 best-effort 写 `active_schedule_jobs`;agent 本地 task 查询、quick task 创建、course / task-class 仍存在直接 DAO 调用;active-scheduler 旧 Gorm apply adapter 保留为迁移期残留,不作为新流量主路径。 ---