Files
smartmate/backend/cmd/start.go
Losita 7d21b6516f Version: 0.9.56.dev.260429
后端:
1. 启动层完成第一轮运行边界拆分,新增 `all / api / worker` 三种进程模式:`all` 保持原单体行为,`api` 只启动 Gin 与同步业务依赖,`worker` 只启动 outbox、Kafka consumer 与 memory worker。
2. 启动装配从单个入口拆成 runtime 依赖图,配置、DB、Redis、RAG、memory、DAO、Service、Handler、newAgent 依赖统一集中构造,再按进程角色选择启动 HTTP 或后台循环。
3. outbox 事件总线补齐 dispatch / consume 分离启动能力,支持后续 relay 与 consumer 独立进程化,同时保留原组合启动语义。
4. 核心 outbox handler 注册收口为公共接线入口,统一校验依赖并复用注册顺序,避免 api / worker / all 多入口复制事件注册逻辑。

迁移说明:
5. 本轮只迁运行边界,不拆业务服务边界;旧单体入口仍保留并默认走 `all` 兼容模式,当前切流点是 API 不再消费异步事件,worker 承担后台消费与 memory 任务。
6. 补充微服务四步迁移与第二阶段并行开发计划,明确先拆 API/Worker,再接主动调度与飞书通知,后续再拆 notification、active-scheduler、schedule/task。
2026-04-29 17:44:42 +08:00

538 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package cmd
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/LoveLosita/smartflow/backend/api"
"github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
infrarag "github.com/LoveLosita/smartflow/backend/infra/rag"
ragconfig "github.com/LoveLosita/smartflow/backend/infra/rag/config"
"github.com/LoveLosita/smartflow/backend/inits"
"github.com/LoveLosita/smartflow/backend/memory"
memorymodel "github.com/LoveLosita/smartflow/backend/memory/model"
memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe"
"github.com/LoveLosita/smartflow/backend/middleware"
"github.com/LoveLosita/smartflow/backend/model"
newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools"
"github.com/LoveLosita/smartflow/backend/newAgent/tools/web"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/LoveLosita/smartflow/backend/routers"
"github.com/LoveLosita/smartflow/backend/service"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
"github.com/go-redis/redis/v8"
"github.com/spf13/viper"
"gorm.io/gorm"
)
// appRuntime carries the dependency graph needed for one process startup.
//
// Responsibility boundaries:
// 1. Only stores the infrastructure, repositories, services and HTTP handlers
//    that were assembled during startup;
// 2. Carries no business logic — business stays in the service / newAgent /
//    memory domain modules;
// 3. Does not decide the process role: api / worker / all is chosen by
//    StartAPI, StartWorker and StartAll, which pick which lifecycles to start.
type appRuntime struct {
	db           *gorm.DB
	redisClient  *redis.Client
	cacheRepo    *dao.CacheDAO
	userRepo     *dao.UserDAO
	agentRepo    *dao.AgentDAO
	agentCache   *dao.AgentCache
	manager      *dao.RepoManager
	outboxRepo   *outboxinfra.Repository
	eventBus     *outboxinfra.EventBus
	memoryModule *memory.Module
	limiter      *pkg.RateLimiter
	handlers     *api.ApiHandlers
}
// loadConfig reads the application configuration (config.yaml) via viper.
//
// Search order: the current working directory first, then the "backend"
// subdirectory — this keeps `go run` from the backend directory working
// while also supporting `go run ./backend/cmd/api` from the repo root.
func loadConfig() error {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	for _, dir := range []string{".", "backend"} {
		viper.AddConfigPath(dir)
	}
	if err := viper.ReadInConfig(); err != nil {
		return fmt.Errorf("failed to read config file: %w", err)
	}
	log.Println("Config loaded successfully")
	return nil
}
// Start is the legacy entry point; it still boots in "all" mode by default.
//
// Responsibility boundaries:
// 1. Keeps compatibility with backend/main.go and old deployment commands;
// 2. Adds no business semantics of its own — it only delegates to StartAll;
// 3. Can be retired gradually once deployments fully switch to the
//    independent api/worker processes.
func Start() {
	StartAll()
}
// StartAll boots the migration-period compatibility mode: the HTTP API and
// the background workers run inside the same process.
func StartAll() {
	bootCtx := context.Background()
	rt := mustBuildRuntime(bootCtx)
	defer rt.close()

	rt.startWorkers(bootCtx)
	rt.startHTTP()
}
// StartAPI starts only the Gin API and its synchronous service/dao
// dependencies; no background workers are launched.
//
// Notes:
// 1. This mode is still an "API monolith with service/dao", not the final
//    API gateway;
// 2. The API may keep writing to the outbox, but it neither consumes the
//    outbox nor starts the memory worker;
// 3. When the worker process is down, the API still serves synchronous
//    requests — asynchronous work is simply processed later.
func StartAPI() {
	bootCtx := context.Background()
	rt := mustBuildRuntime(bootCtx)
	defer rt.close()

	rt.startHTTP()
}
// StartWorker starts only the asynchronous background capabilities and
// registers no Gin routes.
//
// What runs:
// 1. outbox relay: scans pending messages and delivers them to Kafka;
// 2. Kafka consumer: consumes events and dispatches them to business handlers;
// 3. memory worker: processes memory_jobs background tasks.
//
// The process blocks until SIGINT/SIGTERM cancels the context.
func StartWorker() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	rt := mustBuildRuntime(ctx)
	defer rt.close()

	rt.startWorkers(ctx)
	log.Println("Worker process started")
	<-ctx.Done()
	log.Println("Worker process stopping")
}
// mustBuildRuntime assembles the application runtime and aborts the whole
// process (log.Fatalf) when assembly fails.
func mustBuildRuntime(ctx context.Context) *appRuntime {
	rt, err := buildRuntime(ctx)
	if err != nil {
		log.Fatalf("Failed to initialize application runtime: %v", err)
	}
	return rt
}
// buildRuntime assembles the application dependency graph without starting
// HTTP or any background loop.
//
// Steps:
// 1. Initialize infrastructure first: config, database, Redis, models, RAG,
//    memory;
// 2. Then construct the DAO / Service / newAgent dependencies;
// 3. Finally build the HTTP handlers, so api/all modes can start them on
//    demand;
// 4. worker mode reuses the full dependency graph for now, to avoid splitting
//    the assembly logic into two variants in the same migration round.
func buildRuntime(ctx context.Context) (*appRuntime, error) {
	if err := loadConfig(); err != nil {
		return nil, err
	}
	db, err := inits.ConnectDB()
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}
	rdb := inits.InitRedis()
	limiter := pkg.NewRateLimiter(rdb)
	aiHub, err := inits.InitEino()
	if err != nil {
		return nil, fmt.Errorf("failed to initialize Eino: %w", err)
	}
	ragRuntime, err := buildRAGRuntime(ctx)
	if err != nil {
		return nil, err
	}
	memoryCfg := memory.LoadConfigFromViper()
	memoryObserver := memoryobserve.NewLoggerObserver(log.Default())
	memoryMetrics := memoryobserve.NewMetricsRegistry()
	memoryModule := memory.NewModuleWithObserve(
		db,
		infrallm.WrapArkClient(aiHub.Pro),
		ragRuntime,
		memoryCfg,
		memory.ObserveDeps{
			Observer: memoryObserver,
			Metrics:  memoryMetrics,
		},
	)
	// DAO layer initialization.
	cacheRepo := dao.NewCacheDAO(rdb)
	agentCacheRepo := dao.NewAgentCache(rdb)
	// Cache plugin registration is best-effort; the error is deliberately ignored.
	_ = db.Use(middleware.NewGormCachePlugin(cacheRepo))
	userRepo := dao.NewUserDAO(db)
	taskRepo := dao.NewTaskDAO(db)
	courseRepo := dao.NewCourseDAO(db)
	taskClassRepo := dao.NewTaskClassDAO(db)
	scheduleRepo := dao.NewScheduleDAO(db)
	manager := dao.NewManager(db)
	agentRepo := dao.NewAgentDAO(db)
	outboxRepo := outboxinfra.NewRepository(db)
	eventBus, err := buildEventBus(outboxRepo)
	if err != nil {
		return nil, err
	}
	// Service layer initialization.
	userService := service.NewUserService(userRepo, cacheRepo)
	taskSv := service.NewTaskService(taskRepo, cacheRepo, eventBus)
	courseService := buildCourseService(courseRepo, scheduleRepo)
	taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager)
	scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo)
	agentService := service.NewAgentServiceWithSchedule(aiHub, agentRepo, taskRepo, cacheRepo, agentCacheRepo, eventBus, scheduleService, taskSv)
	// Wire the optional newAgent dependencies onto the agent service.
	configureAgentService(
		agentService,
		ragRuntime,
		agentRepo,
		cacheRepo,
		taskRepo,
		taskClassRepo,
		scheduleRepo,
		memoryModule,
		memoryCfg,
	)
	handlers := buildAPIHandlers(userService, taskSv, taskClassService, courseService, scheduleService, agentService, memoryModule)
	return &appRuntime{
		db:           db,
		redisClient:  rdb,
		cacheRepo:    cacheRepo,
		userRepo:     userRepo,
		agentRepo:    agentRepo,
		agentCache:   agentCacheRepo,
		manager:      manager,
		outboxRepo:   outboxRepo,
		eventBus:     eventBus,
		memoryModule: memoryModule,
		limiter:      limiter,
		handlers:     handlers,
	}, nil
}
// buildRAGRuntime constructs the RAG runtime from viper configuration.
// When RAG is disabled it returns (nil, nil) so callers can treat the
// runtime as optional.
func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) {
	cfg := ragconfig.LoadFromViper()
	if !cfg.Enabled {
		log.Println("RAG runtime is disabled")
		return nil, nil
	}
	// 1. The project has no global observability platform yet, so a
	//    lightweight logger-backed Observer is injected here;
	// 2. RAG internally depends only on the Observer interface — if the whole
	//    project later unifies logging/metrics, only this wiring changes;
	// 3. This keeps RAG from growing its own disconnected logging stack.
	logger := log.Default()
	deps := infrarag.FactoryDeps{
		Logger:   logger,
		Observer: infrarag.NewLoggerObserver(logger),
	}
	rt, err := infrarag.NewRuntimeFromConfig(ctx, cfg, deps)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize RAG runtime: %w", err)
	}
	log.Printf("RAG runtime initialized: store=%s embed=%s reranker=%s", cfg.Store, cfg.EmbedProvider, cfg.RerankerProvider)
	return rt, nil
}
// buildEventBus wires the generic outbox event bus.
//
// 1. API mode only uses Publish to write into the outbox and never starts
//    the background loops;
// 2. worker/all modes explicitly register handlers and start the loops;
// 3. with kafka.enabled=false this returns nil and business code follows
//    its existing degradation strategy.
func buildEventBus(outboxRepo *outboxinfra.Repository) (*outboxinfra.EventBus, error) {
	bus, err := outboxinfra.NewEventBus(outboxRepo, kafkabus.LoadConfig())
	if err != nil {
		return nil, fmt.Errorf("failed to initialize outbox event bus: %w", err)
	}
	if bus == nil {
		log.Println("Outbox event bus is disabled")
	}
	return bus, nil
}
// buildCourseService constructs the course service together with the vision
// LLM client used for course-image import.
func buildCourseService(courseRepo *dao.CourseDAO, scheduleRepo *dao.ScheduleDAO) *service.CourseService {
	visionModel := viper.GetString("courseImport.visionModel")
	visionClient := infrallm.NewArkResponsesClient(
		os.Getenv("ARK_API_KEY"),
		viper.GetString("agent.baseURL"),
		visionModel,
	)
	parseCfg := service.NewCourseImageParseConfig(
		viper.GetInt64("courseImport.maxImageBytes"),
		viper.GetInt("courseImport.maxTokens"),
	)
	return service.NewCourseService(
		courseRepo,
		scheduleRepo,
		visionClient,
		parseCfg,
		visionModel,
	)
}
// configureAgentService wires the optional newAgent dependencies (state
// store, tool registry, schedule provider, compaction store, quick-task
// callbacks and memory reader) into the agent service.
// It is a no-op when agentService is nil.
func configureAgentService(
	agentService *service.AgentService,
	ragRuntime infrarag.Runtime,
	agentRepo *dao.AgentDAO,
	cacheRepo *dao.CacheDAO,
	taskRepo *dao.TaskDAO,
	taskClassRepo *dao.TaskClassDAO,
	scheduleRepo *dao.ScheduleDAO,
	memoryModule *memory.Module,
	memoryCfg memorymodel.Config,
) {
	if agentService == nil {
		return
	}
	// newAgent dependency wiring.
	agentService.SetAgentStateStore(dao.NewAgentStateStoreAdapter(cacheRepo))

	// Select the web-search provider from config, degrading to mock when the
	// provider is unknown or its API key is missing.
	providerName := viper.GetString("websearch.provider")
	var searchProvider web.SearchProvider
	switch providerName {
	case "bocha":
		if apiKey := viper.GetString("websearch.apiKey"); apiKey != "" {
			searchProvider = web.NewBochaProvider(apiKey, "")
			log.Println("WebSearch provider: bocha")
		} else {
			log.Println("WebSearch: 博查 API Key 为空,降级为 mock")
			searchProvider = &web.MockProvider{}
		}
	case "mock", "":
		searchProvider = &web.MockProvider{}
		log.Println("WebSearch provider: mock模拟模式")
	default:
		// Unrecognized provider type: warn and fall back to mock.
		log.Printf("WebSearch provider %q 未识别,降级为 mock", providerName)
		searchProvider = &web.MockProvider{}
	}

	registryDeps := newagenttools.DefaultRegistryDeps{
		RAGRuntime:        ragRuntime,
		WebSearchProvider: searchProvider,
		TaskClassWriteDeps: newagenttools.TaskClassWriteDeps{
			UpsertTaskClass: buildTaskClassUpsertFunc(taskClassRepo),
		},
	}
	agentService.SetToolRegistry(newagenttools.NewDefaultRegistryWithDeps(registryDeps))
	agentService.SetScheduleProvider(newagentconv.NewScheduleProvider(scheduleRepo, taskClassRepo))
	agentService.SetCompactionStore(agentRepo)
	agentService.SetQuickTaskDeps(newagentmodel.QuickTaskDeps{
		CreateTask: buildQuickTaskCreateFunc(taskRepo),
		QueryTasks: buildQuickTaskQueryFunc(agentService),
	})
	agentService.SetMemoryReader(memoryModule, memoryCfg)
}
// buildTaskClassUpsertFunc returns the persistence callback used by the
// newAgent task-class write tool. The callback upserts the task-class body
// and its items inside one TaskClassDAO transaction and reports the stable
// task-class ID plus whether the record was newly created (input.ID == 0).
func buildTaskClassUpsertFunc(taskClassRepo *dao.TaskClassDAO) func(userID int, input newagenttools.TaskClassUpsertInput) (newagenttools.TaskClassUpsertPersistResult, error) {
	return func(userID int, input newagenttools.TaskClassUpsertInput) (newagenttools.TaskClassUpsertPersistResult, error) {
		req := input.Request
		taskClassID := 0
		created := input.ID == 0
		err := taskClassRepo.Transaction(func(txDAO *dao.TaskClassDAO) error {
			// 1. Build the task-class body first, keeping the same field
			//    semantics as the existing AddOrUpdateTaskClass path.
			taskClass := &model.TaskClass{
				ID:                 input.ID,
				Name:               &req.Name,
				Mode:               &req.Mode,
				SubjectType:        stringPtrOrNil(req.SubjectType),
				DifficultyLevel:    stringPtrOrNil(req.DifficultyLevel),
				CognitiveIntensity: stringPtrOrNil(req.CognitiveIntensity),
				TotalSlots:         &req.Config.TotalSlots,
				Strategy:           &req.Config.Strategy,
				ExcludedSlots:      req.Config.ExcludedSlots,
				ExcludedDaysOfWeek: req.Config.ExcludedDaysOfWeek,
			}
			taskClass.AllowFillerCourse = &req.Config.AllowFillerCourse
			// 2. Automatic mode writes the date range; manual mode may leave
			//    the dates empty.
			if req.StartDate != "" {
				startDate, parseErr := time.ParseInLocation("2006-01-02", req.StartDate, time.Local)
				if parseErr != nil {
					return parseErr
				}
				taskClass.StartDate = &startDate
			}
			if req.EndDate != "" {
				endDate, parseErr := time.ParseInLocation("2006-01-02", req.EndDate, time.Local)
				if parseErr != nil {
					return parseErr
				}
				taskClass.EndDate = &endDate
			}
			// 3. Upsert the body first to obtain a stable task_class_id, which
			//    the items below use as their category_id.
			updatedID, upsertErr := txDAO.AddOrUpdateTaskClass(userID, taskClass)
			if upsertErr != nil {
				return upsertErr
			}
			taskClassID = updatedID
			// 4. Build the task items and batch-upsert them.
			items := make([]model.TaskClassItem, 0, len(req.Items))
			for _, itemReq := range req.Items {
				categoryID := taskClassID
				order := itemReq.Order
				content := itemReq.Content
				status := model.TaskItemStatusUnscheduled
				items = append(items, model.TaskClassItem{
					ID:           itemReq.ID,
					CategoryID:   &categoryID,
					Order:        &order,
					Content:      &content,
					EmbeddedTime: itemReq.EmbeddedTime,
					Status:       &status,
				})
			}
			return txDAO.AddOrUpdateTaskClassItems(userID, items)
		})
		if err != nil {
			return newagenttools.TaskClassUpsertPersistResult{}, err
		}
		return newagenttools.TaskClassUpsertPersistResult{
			TaskClassID: taskClassID,
			Created:     created,
		}, nil
	}
}
// buildQuickTaskCreateFunc adapts TaskDAO.AddTask to the newAgent quick-task
// creation callback: it persists a minimal task record and returns the new
// task's ID.
func buildQuickTaskCreateFunc(taskRepo *dao.TaskDAO) func(userID int, title string, priorityGroup int, deadlineAt *time.Time, urgencyThresholdAt *time.Time) (int, error) {
	return func(userID int, title string, priorityGroup int, deadlineAt *time.Time, urgencyThresholdAt *time.Time) (int, error) {
		task := &model.Task{
			UserID:             userID,
			Title:              title,
			Priority:           priorityGroup,
			IsCompleted:        false,
			DeadlineAt:         deadlineAt,
			UrgencyThresholdAt: urgencyThresholdAt,
		}
		saved, err := taskRepo.AddTask(task)
		if err != nil {
			return 0, err
		}
		return saved.ID, nil
	}
}
// buildQuickTaskQueryFunc adapts AgentService.QueryTasksForTool to the
// newAgent quick-task query callback, flattening each record into a
// TaskQueryResult with the deadline rendered in local time
// ("2006-01-02 15:04", empty string when no deadline is set).
func buildQuickTaskQueryFunc(agentService *service.AgentService) func(ctx context.Context, userID int, params newagentmodel.TaskQueryParams) ([]newagentmodel.TaskQueryResult, error) {
	return func(ctx context.Context, userID int, params newagentmodel.TaskQueryParams) ([]newagentmodel.TaskQueryResult, error) {
		records, err := agentService.QueryTasksForTool(ctx, newagentmodel.TaskQueryRequest{
			UserID:           userID,
			Quadrant:         params.Quadrant,
			SortBy:           params.SortBy,
			Order:            params.Order,
			Limit:            params.Limit,
			IncludeCompleted: params.IncludeCompleted,
			Keyword:          params.Keyword,
			DeadlineBefore:   params.DeadlineBefore,
			DeadlineAfter:    params.DeadlineAfter,
		})
		if err != nil {
			return nil, err
		}
		out := make([]newagentmodel.TaskQueryResult, 0, len(records))
		for i := range records {
			rec := &records[i]
			var deadline string
			if rec.DeadlineAt != nil {
				deadline = rec.DeadlineAt.In(time.Local).Format("2006-01-02 15:04")
			}
			out = append(out, newagentmodel.TaskQueryResult{
				ID:            rec.ID,
				Title:         rec.Title,
				PriorityGroup: rec.PriorityGroup,
				IsCompleted:   rec.IsCompleted,
				DeadlineAt:    deadline,
			})
		}
		return out, nil
	}
}
// buildAPIHandlers wraps each service in its matching API handler and
// returns the full handler set consumed by the router layer.
func buildAPIHandlers(
	userService *service.UserService,
	taskService *service.TaskService,
	taskClassService *service.TaskClassService,
	courseService *service.CourseService,
	scheduleService *service.ScheduleService,
	agentService *service.AgentService,
	memoryModule *memory.Module,
) *api.ApiHandlers {
	handlers := &api.ApiHandlers{}
	handlers.UserHandler = api.NewUserHandler(userService)
	handlers.TaskHandler = api.NewTaskHandler(taskService)
	handlers.TaskClassHandler = api.NewTaskClassHandler(taskClassService)
	handlers.CourseHandler = api.NewCourseHandler(courseService)
	handlers.ScheduleHandler = api.NewScheduleAPI(scheduleService)
	handlers.AgentHandler = api.NewAgentHandler(agentService)
	handlers.MemoryHandler = api.NewMemoryHandler(memoryModule)
	return handlers
}
// startWorkers launches the asynchronous lifecycles: the outbox event bus
// (after registering the core handlers) when it is enabled, and the memory
// background worker when the memory module is present. Nil-receiver safe.
// Registration failure aborts the process via log.Fatalf.
func (r *appRuntime) startWorkers(ctx context.Context) {
	if r == nil {
		return
	}
	switch {
	case r.eventBus == nil:
		log.Println("Outbox event bus is disabled")
	default:
		if err := r.registerEventHandlers(); err != nil {
			log.Fatalf("Failed to register outbox event handlers: %v", err)
		}
		r.eventBus.Start(ctx)
		log.Println("Outbox event bus started")
	}
	if r.memoryModule != nil {
		r.memoryModule.StartWorker(ctx)
	}
}
// registerEventHandlers registers the core outbox event handlers on the bus.
// Purpose: worker/all startup reuses one shared registration order, so future
// entry points never copy the handler wiring.
func (r *appRuntime) registerEventHandlers() error {
	return eventsvc.RegisterCoreOutboxHandlers(r.eventBus, r.outboxRepo, r.manager, r.agentRepo, r.cacheRepo, r.memoryModule)
}
// startHTTP registers the Gin routers and starts serving HTTP traffic.
func (r *appRuntime) startHTTP() {
	engine := routers.RegisterRouters(r.handlers, r.cacheRepo, r.userRepo, r.limiter)
	routers.StartEngine(engine)
}
// close releases the resources owned by the runtime; currently only the
// event bus is shut down. Nil-receiver safe.
// NOTE(review): db and redisClient are not closed here — presumably their
// lifetime is process-long; confirm before adding shutdown logic for them.
func (r *appRuntime) close() {
	if r == nil || r.eventBus == nil {
		return
	}
	r.eventBus.Close()
}
// stringPtrOrNil trims surrounding whitespace from value and returns a
// pointer to the trimmed string, or nil when nothing remains.
func stringPtrOrNil(value string) *string {
	if s := strings.TrimSpace(value); s != "" {
		return &s
	}
	return nil
}