smartmate/backend/cmd/start.go
Losita 61db646805 Version: 0.9.80.dev.260506
Backend:
1. LLM standalone service and a unified billing egress landed: added `cmd/llm`, `client/llm`, and `services/llm/rpc`; filled in BillingContext, CreditBalanceGuard, price-rule parsing, stream usage aggregation, and `credit.charge.requested` outbox publishing; active-scheduler / agent / course / memory / gateway fallback now all go through the llm zrpc instead of each initializing models locally (a sketch of the charge-event publish follows this list).
2. TokenStore consolidated into the authoritative Credit ledger: added credit account / ledger / product / order / price-rule / reward-rule capabilities plus a Redis snapshot cache; extended the tokenstore rpc/client to cover balance snapshots, the consumption dashboard, products, orders, ledger entries, price rules, and reward rules; and wired in consumption of LLM charge events to settle Credit deductions.
3. Legacy billing path retired and the gateway cut over: `/token-store` semantics moved wholesale to `/credit-store`; agent chat dropped the old TokenQuotaGuard; userauth's CheckTokenQuota / AdjustTokenUsage are now deprecated; persisting chat history no longer updates the old token-quota ledger synchronously; and course image-parse requests now carry user_id into the new billing scope.
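A minimal sketch of the `credit.charge.requested` publish from item 1, written against the publish-only `outboxinfra.PublishRequest` contract defined in start.go below; the payload struct and its field names are illustrative assumptions, not the real schema:

    // creditChargePayload is hypothetical; only the PublishRequest shape is from the repo.
    type creditChargePayload struct {
        UserID       string `json:"user_id"`
        Model        string `json:"model"`
        InputTokens  int64  `json:"input_tokens"`
        OutputTokens int64  `json:"output_tokens"`
    }

    func publishChargeRequested(ctx context.Context, pub outboxinfra.EventPublisher, p creditChargePayload) error {
        if pub == nil {
            return nil // kafka.enabled=false: callers keep their synchronous fallback.
        }
        return pub.Publish(ctx, outboxinfra.PublishRequest{
            EventType:   "credit.charge.requested",
            MessageKey:  p.UserID,
            AggregateID: p.UserID,
            Payload:     p, // marshaled to JSON inside Publish
        })
    }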

Frontend:
4. The plan plaza switched from mock data to real APIs: added the forum api/types; the home page now supports the real list, tags, search with debouncing, likes, plan import, and plan publishing; the detail page gained post details, the comment tree, replies, and comment deletion; and the "at least one tag" constraint is now enforced on both frontend and backend with a default-tag fallback.
5. The store page moved to the Credit system with a reworked layout: the header now shows the balance plus a Credit/Token consumption dashboard with 24h/7d/30d/all period switching; the plan section shows both the original and the current price; the history section now lists the current user's Credit ledger with a "view more" option; visuals and interactions were tightened to match.

Repository:
6. Config and local startup now cover llm / outbox orchestration: `config.example.yaml` adds the llm rpc and unified outbox service configuration; `dev-common.ps1` adds llm to the multi-service dependency set and auto-creates the Kafka topics; `docker-compose.yml` initializes the full set of outbox topics for agent/task/memory/active-scheduler/notification/taskclass-forum/llm/token-store (a client-side view of the new llm keys follows this list).
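How the new `llm.rpc.*` keys are consumed, mirrored from the gateway fallback wiring in start.go below; only the viper keys actually read there are assumed to exist in `config.example.yaml`:

    llmService, err := llmclient.NewService(llmclient.ServiceConfig{
        ClientConfig: llmclient.ClientConfig{
            Endpoints: viper.GetStringSlice("llm.rpc.endpoints"),
            Target:    viper.GetString("llm.rpc.target"),
            Timeout:   viper.GetDuration("llm.rpc.timeout"),
        },
        CourseVisionModel: viper.GetString("courseImport.visionModel"),
    })
    if err != nil {
        log.Fatalf("failed to initialize llm zrpc client: %v", err)
    }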
2026-05-06 20:16:53 +08:00

package cmd
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
activeschedulerclient "github.com/LoveLosita/smartflow/backend/client/activescheduler"
agentclient "github.com/LoveLosita/smartflow/backend/client/agent"
courseclient "github.com/LoveLosita/smartflow/backend/client/course"
llmclient "github.com/LoveLosita/smartflow/backend/client/llm"
memoryclient "github.com/LoveLosita/smartflow/backend/client/memory"
notificationclient "github.com/LoveLosita/smartflow/backend/client/notification"
scheduleclient "github.com/LoveLosita/smartflow/backend/client/schedule"
taskclient "github.com/LoveLosita/smartflow/backend/client/task"
taskclassclient "github.com/LoveLosita/smartflow/backend/client/taskclass"
taskclassforumclient "github.com/LoveLosita/smartflow/backend/client/taskclassforum"
tokenstoreclient "github.com/LoveLosita/smartflow/backend/client/tokenstore"
userauthclient "github.com/LoveLosita/smartflow/backend/client/userauth"
coreinit "github.com/LoveLosita/smartflow/backend/cmd/internal/coreinit"
"github.com/LoveLosita/smartflow/backend/gateway/api"
gatewayrouter "github.com/LoveLosita/smartflow/backend/gateway/router"
activeadapters "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/adapters"
activeapplyadapter "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/applyadapter"
activefeedbacklocate "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/feedbacklocate"
activegraph "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/graph"
activepreview "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/preview"
activesel "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/selection"
activesvc "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/service"
activeTrigger "github.com/LoveLosita/smartflow/backend/services/active_scheduler/core/trigger"
agentstream "github.com/LoveLosita/smartflow/backend/services/agent/stream"
agentsv "github.com/LoveLosita/smartflow/backend/services/agent/sv"
agenttools "github.com/LoveLosita/smartflow/backend/services/agent/tools"
"github.com/LoveLosita/smartflow/backend/services/agent/tools/web"
coursedao "github.com/LoveLosita/smartflow/backend/services/course/dao"
coursesv "github.com/LoveLosita/smartflow/backend/services/course/sv"
llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
"github.com/LoveLosita/smartflow/backend/services/memory"
memorymodel "github.com/LoveLosita/smartflow/backend/services/memory/model"
memoryobserve "github.com/LoveLosita/smartflow/backend/services/memory/observe"
ragservice "github.com/LoveLosita/smartflow/backend/services/rag"
ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config"
"github.com/LoveLosita/smartflow/backend/services/runtime/dao"
eventsvc "github.com/LoveLosita/smartflow/backend/services/runtime/eventsvc"
"github.com/LoveLosita/smartflow/backend/services/runtime/model"
scheduledao "github.com/LoveLosita/smartflow/backend/services/schedule/dao"
schedulesv "github.com/LoveLosita/smartflow/backend/services/schedule/sv"
taskdao "github.com/LoveLosita/smartflow/backend/services/task/dao"
tasksv "github.com/LoveLosita/smartflow/backend/services/task/sv"
"github.com/LoveLosita/smartflow/backend/shared/infra/bootstrap"
gormcache "github.com/LoveLosita/smartflow/backend/shared/infra/gormcache"
kafkabus "github.com/LoveLosita/smartflow/backend/shared/infra/kafka"
outboxinfra "github.com/LoveLosita/smartflow/backend/shared/infra/outbox"
ratelimit "github.com/LoveLosita/smartflow/backend/shared/infra/ratelimit"
"github.com/LoveLosita/smartflow/backend/shared/ports"
"github.com/go-redis/redis/v8"
"github.com/spf13/viper"
"gorm.io/gorm"
)
const (
gatewayAgentRPCChatEnabledKey = "agent.rpc.chat.enabled"
gatewayAgentRPCAPIEnabledKey = "agent.rpc.api.enabled"
)
// appRuntime carries the dependency graph needed for one process startup.
//
// Responsibility boundaries:
// 1. Only stores the infrastructure, repositories, services, and HTTP handlers assembled during startup;
// 2. Carries no business logic; that still belongs to the service / agent / memory domain modules;
// 3. Does not decide the process role: StartAPI / StartWorker choose which lifecycles to start for api / worker; StartAll remains only as a compatibility alias.
type appRuntime struct {
db *gorm.DB
redisClient *redis.Client
cacheRepo *dao.CacheDAO
agentRepo *dao.AgentDAO
agentCache *dao.AgentCache
manager *dao.RepoManager
outboxRepo *outboxinfra.Repository
limiter *ratelimit.RateLimiter
handlers *api.ApiHandlers
userAuthClient *userauthclient.Client
forumClient *taskclassforumclient.Client
tokenClient *tokenstoreclient.Client
}
// loadConfig delegates to the shared bootstrap config loader.
func loadConfig() error {
return bootstrap.LoadConfig()
}
// Start is the legacy compatibility entry point; it is currently equivalent to StartAPI.
// 1. Keeps backend/main.go and old deployment commands working.
// 2. Adds no business semantics of its own; it only forwards to StartAPI.
// 3. Once startup fully moves to the split api/worker entries, this remains only as a transitional shim.
func Start() {
StartAPI()
}
// StartAll is a compatibility alias kept for legacy entry points and old commands; its semantics are identical to StartAPI.
// 1. cmd/all has been removed and is no longer the standard local startup entry for the backend.
// 2. The function is kept for now so the repo-root compatibility entry and old scripts do not break immediately.
// 3. Once the repo-root entry is consolidated as well, this alias can simply be deleted.
func StartAll() {
StartAPI()
}
// StartAPI starts only the Gin API and its synchronous dependencies, without background workers.
// This is still the migration-period monolithic API mode, not the final standalone gateway.
func StartAPI() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
runtime := mustBuildRuntime(ctx)
defer runtime.close()
runtime.startHTTP(ctx)
}
// StartWorker keeps the legacy worker entry point, but after phase 6 it no longer owns the agent / memory consumption boundaries.
// Current semantics:
// 1. The agent outbox relay / consumer has moved to cmd/agent;
// 2. The memory worker has moved to cmd/memory;
// 3. This entry only keeps old startup commands working and can be deleted once the gateway consolidation lands.
func StartWorker() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
runtime := mustBuildRuntime(ctx)
defer runtime.close()
runtime.startWorkers(ctx)
log.Println("Worker process started")
<-ctx.Done()
log.Println("Worker process stopping")
}
func mustBuildRuntime(ctx context.Context) *appRuntime {
runtime, err := buildRuntime(ctx)
if err != nil {
log.Fatalf("Failed to initialize application runtime: %v", err)
}
return runtime
}
// buildRuntime assembles the application dependency graph without starting HTTP or any background loops.
//
// Steps:
// 1. Initialize the config, database, Redis, and other infrastructure the gateway requires;
// 2. Construct the per-service zrpc clients, and let the feature switches decide whether to assemble the local agent fallback;
// 3. Build the HTTP handlers last, so the api/all modes can start them on demand;
// 4. Worker mode still reuses the gateway dependency graph for now, but no longer starts the agent / memory workers.
func buildRuntime(ctx context.Context) (*appRuntime, error) {
if err := loadConfig(); err != nil {
return nil, err
}
db, err := coreinit.ConnectCoreDB()
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
rdb, err := coreinit.InitCoreRedis()
if err != nil {
return nil, fmt.Errorf("failed to connect to redis: %w", err)
}
limiter := ratelimit.NewRateLimiter(rdb)
// DAO layer initialization.
cacheRepo := dao.NewCacheDAO(rdb)
_ = db.Use(gormcache.NewGormCachePlugin(cacheRepo))
// Service layer initialization.
userAuthClient, err := userauthclient.NewClient(userauthclient.ClientConfig{
Endpoints: viper.GetStringSlice("userauth.rpc.endpoints"),
Target: viper.GetString("userauth.rpc.target"),
Timeout: viper.GetDuration("userauth.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize userauth zrpc client: %w", err)
}
notificationClient, err := notificationclient.NewClient(notificationclient.ClientConfig{
Endpoints: viper.GetStringSlice("notification.rpc.endpoints"),
Target: viper.GetString("notification.rpc.target"),
Timeout: viper.GetDuration("notification.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize notification zrpc client: %w", err)
}
forumClient, err := taskclassforumclient.NewClient(taskclassforumclient.ClientConfig{
Endpoints: viper.GetStringSlice("taskclassforum.rpc.endpoints"),
Target: viper.GetString("taskclassforum.rpc.target"),
Timeout: viper.GetDuration("taskclassforum.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize taskclassforum zrpc client: %w", err)
}
tokenClient, err := tokenstoreclient.NewClient(tokenstoreclient.ClientConfig{
Endpoints: viper.GetStringSlice("tokenstore.rpc.endpoints"),
Target: viper.GetString("tokenstore.rpc.target"),
Timeout: viper.GetDuration("tokenstore.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize tokenstore zrpc client: %w", err)
}
scheduleClient, err := scheduleclient.NewClient(scheduleclient.ClientConfig{
Endpoints: viper.GetStringSlice("schedule.rpc.endpoints"),
Target: viper.GetString("schedule.rpc.target"),
Timeout: viper.GetDuration("schedule.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize schedule zrpc client: %w", err)
}
taskClient, err := taskclient.NewClient(taskclient.ClientConfig{
Endpoints: viper.GetStringSlice("task.rpc.endpoints"),
Target: viper.GetString("task.rpc.target"),
Timeout: viper.GetDuration("task.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize task zrpc client: %w", err)
}
taskClassClient, err := taskclassclient.NewClient(taskclassclient.ClientConfig{
Endpoints: viper.GetStringSlice("taskClass.rpc.endpoints"),
Target: viper.GetString("taskClass.rpc.target"),
Timeout: viper.GetDuration("taskClass.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize task-class zrpc client: %w", err)
}
courseClient, err := courseclient.NewClient(courseclient.ClientConfig{
Endpoints: viper.GetStringSlice("course.rpc.endpoints"),
Target: viper.GetString("course.rpc.target"),
Timeout: viper.GetDuration("course.rpc.timeout"),
MaxImageBytes: viper.GetInt64("courseImport.maxImageBytes"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize course zrpc client: %w", err)
}
memoryClient, err := memoryclient.NewClient(memoryclient.ClientConfig{
Endpoints: viper.GetStringSlice("memory.rpc.endpoints"),
Target: viper.GetString("memory.rpc.target"),
Timeout: viper.GetDuration("memory.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize memory zrpc client: %w", err)
}
agentRPCClient, err := agentclient.NewClient(agentclient.ClientConfig{
Endpoints: viper.GetStringSlice("agent.rpc.endpoints"),
Target: viper.GetString("agent.rpc.target"),
Timeout: viper.GetDuration("agent.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize agent zrpc client: %w", err)
}
activeSchedulerClient, err := activeschedulerclient.NewClient(activeschedulerclient.ClientConfig{
Endpoints: viper.GetStringSlice("activeScheduler.rpc.endpoints"),
Target: viper.GetString("activeScheduler.rpc.target"),
Timeout: viper.GetDuration("activeScheduler.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize active-scheduler zrpc client: %w", err)
}
var agentRepo *dao.AgentDAO
var agentCacheRepo *dao.AgentCache
var manager *dao.RepoManager
var outboxRepo *outboxinfra.Repository
var agentService *agentsv.AgentService
if shouldBuildGatewayAgentFallback() {
log.Println("Gateway agent RPC fallback is enabled; building local AgentService compatibility path")
llmService, err := llmclient.NewService(llmclient.ServiceConfig{
ClientConfig: llmclient.ClientConfig{
Endpoints: viper.GetStringSlice("llm.rpc.endpoints"),
Target: viper.GetString("llm.rpc.target"),
Timeout: viper.GetDuration("llm.rpc.timeout"),
},
CourseVisionModel: viper.GetString("courseImport.visionModel"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize llm zrpc client: %w", err)
}
ragService, err := buildRAGService(ctx)
if err != nil {
return nil, err
}
ragRuntime := ragService.Runtime()
memoryCfg := memory.LoadConfigFromViper()
memoryObserver := memoryobserve.NewLoggerObserver(log.Default())
memoryMetrics := memoryobserve.NewMetricsRegistry()
agentCacheRepo = dao.NewAgentCache(rdb)
taskRepo := dao.NewTaskDAO(db)
taskServiceRepo := taskdao.NewTaskDAO(db)
taskClassRepo := dao.NewTaskClassDAO(db)
scheduleServiceRepo := scheduledao.NewScheduleDAO(db)
manager = dao.NewManager(db)
agentRepo = dao.NewAgentDAO(db)
outboxRepo = outboxinfra.NewRepository(db)
// 1. The fallback exists only as a migration-period escape hatch for when the RPC switches are off; it no longer starts the agent outbox event bus.
// 2. Events produced by the fallback are still written to the per-service outbox tables; the standalone cmd/agent / cmd/task processes handle relay / consume.
eventPublisher := buildCoreOutboxPublisher(outboxRepo)
if err := eventsvc.RegisterTaskUrgencyPromoteRoute(); err != nil {
return nil, fmt.Errorf("failed to register task outbox route: %w", err)
}
taskOutboxPublisher := buildTaskOutboxPublisher(outboxRepo)
taskSv := tasksv.NewTaskService(taskServiceRepo, cacheRepo, taskOutboxPublisher)
taskSv.SetActiveScheduleDAO(manager.ActiveSchedule)
scheduleService := schedulesv.NewScheduleService(scheduleServiceRepo, taskClassRepo, manager, cacheRepo)
agentService = agentsv.NewAgentService(
llmService,
agentRepo,
taskRepo,
cacheRepo,
agentCacheRepo,
manager.ActiveSchedule,
manager.ActiveScheduleSession,
eventPublisher,
)
// 1. The startup assembly layer still injects the legacy service's scheduling capabilities, so agent/sv does not import the legacy service back and create an import cycle.
// 2. Once schedule/task fully move to RPC, these two injection points can keep shrinking.
agentService.SmartPlanningMultiRawFunc = scheduleService.SmartPlanningMultiRaw
agentService.HybridScheduleWithPlanMultiFunc = scheduleService.HybridScheduleWithPlanMulti
agentService.ResolvePlanningWindowFunc = scheduleService.ResolvePlanningWindowByTaskClasses
agentService.GetTasksWithUrgencyPromotionFunc = taskSv.GetTasksWithUrgencyPromotion
configureAgentService(
agentService,
ragRuntime,
agentRepo,
cacheRepo,
taskClient,
taskClassClient,
scheduleClient,
memoryClient,
memoryCfg,
memoryObserver,
memoryMetrics,
)
// 1. task_pool facts now go through task RPC uniformly, so chat rerun stops reading the tasks table directly;
// 2. schedule facts / feedback / apply now go through schedule RPC uniformly, so chat rerun stops reading the schedule tables directly.
activeTaskAdapter, err := activeadapters.NewTaskRPCAdapter(activeadapters.TaskRPCConfig{
Endpoints: viper.GetStringSlice("task.rpc.endpoints"),
Target: viper.GetString("task.rpc.target"),
Timeout: viper.GetDuration("task.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize task rpc adapter for active-scheduler rerun: %w", err)
}
activeScheduleAdapter, err := activeadapters.NewScheduleRPCAdapter(activeadapters.ScheduleRPCConfig{
Endpoints: viper.GetStringSlice("schedule.rpc.endpoints"),
Target: viper.GetString("schedule.rpc.target"),
Timeout: viper.GetDuration("schedule.rpc.timeout"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize schedule rpc adapter for active-scheduler rerun: %w", err)
}
activeScheduleDryRun, err := activesvc.NewDryRunService(activeadapters.ReadersWithScheduleRPC(activeTaskAdapter, activeScheduleAdapter))
if err != nil {
return nil, err
}
activeSchedulePreviewConfirm, err := buildActiveSchedulePreviewConfirmService(manager.ActiveSchedule, activeScheduleDryRun, activeScheduleAdapter)
if err != nil {
return nil, err
}
// 1. The active-scheduler selector reuses the Pro model on its own; if the LLM fails, the selection layer falls back explicitly to deterministic candidates;
// 2. dry-run and selection are chained through the graph runner, so trigger_pipeline does not grow a second candidate pipeline.
activeScheduleLLMClient := llmService.ProClient()
activeScheduleSelector := activesel.NewService(activeScheduleLLMClient)
activeScheduleFeedbackLocator := activefeedbacklocate.NewService(activeScheduleAdapter, activeScheduleLLMClient)
activeScheduleGraphRunner, err := activegraph.NewRunner(activeScheduleDryRun.AsGraphDryRunFunc(), activeScheduleSelector)
if err != nil {
return nil, err
}
agentService.SetActiveScheduleSessionRerunFunc(buildActiveScheduleSessionRerunFunc(manager.ActiveSchedule, activeScheduleGraphRunner, activeSchedulePreviewConfirm, activeScheduleFeedbackLocator))
} else {
log.Println("Gateway agent local fallback is disabled; /agent HTTP routes use cmd/agent zrpc")
}
handlers := buildAPIHandlers(taskClient, taskClassClient, courseClient, scheduleClient, agentService, agentRPCClient, memoryClient, activeSchedulerClient, notificationClient)
runtime := &appRuntime{
db: db,
redisClient: rdb,
cacheRepo: cacheRepo,
agentRepo: agentRepo,
agentCache: agentCacheRepo,
manager: manager,
outboxRepo: outboxRepo,
limiter: limiter,
handlers: handlers,
userAuthClient: userAuthClient,
forumClient: forumClient,
tokenClient: tokenClient,
}
return runtime, nil
}
// shouldBuildGatewayAgentFallback decides whether the gateway must keep the local AgentService fallback surface.
//
// Responsibility boundaries:
// 1. Reads startup-time config only; no dynamic switching at runtime;
// 2. If either the chat or the non-chat RPC switch is off, conservatively assemble the fallback so old environments can still start;
// 3. When both switches are on, skip the local agent orchestration dependencies and keep the gateway as a pure HTTP/SSE facade.
func shouldBuildGatewayAgentFallback() bool {
return !viper.GetBool(gatewayAgentRPCChatEnabledKey) || !viper.GetBool(gatewayAgentRPCAPIEnabledKey)
}
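// A config fragment driving the two switches above (assumed layout; only the
// key paths agent.rpc.chat.enabled / agent.rpc.api.enabled are known from the
// constants at the top of this file):
//
//	agent:
//	  rpc:
//	    chat:
//	      enabled: true
//	    api:
//	      enabled: true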
func buildRAGService(ctx context.Context) (*ragservice.Service, error) {
ragCfg := ragconfig.LoadFromViper()
if !ragCfg.Enabled {
log.Println("RAG service is disabled")
return ragservice.New(ragservice.Options{}), nil
}
// 1. The project has no global observability platform yet, so a lightweight Observer layer is injected here;
// 2. RAG internally depends only on the Observer interface; if the project later unifies logging/metrics, only this spot needs replacing;
// 3. This keeps RAG from building its own siloed logging infrastructure.
ragLogger := log.Default()
ragService, err := ragservice.NewFromConfig(ctx, ragCfg, ragservice.FactoryDeps{
Logger: ragLogger,
Observer: ragservice.NewLoggerObserver(ragLogger),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize RAG service: %w", err)
}
log.Printf("RAG service initialized: store=%s embed=%s reranker=%s", ragCfg.Store, ragCfg.EmbedProvider, ragCfg.RerankerProvider)
return ragService, nil
}
// buildCoreOutboxPublisher constructs the monolith-residue publisher.
//
// Responsibility boundaries:
// 1. Only writes cross-service events produced by the agent main path into the corresponding service outbox tables;
// 2. Creates no memory consumer / relay; the memory consumption boundary has moved to cmd/memory;
// 3. Returns nil when kafka.enabled=false, so chat history keeps its synchronous DB fallback.
func buildCoreOutboxPublisher(outboxRepo *outboxinfra.Repository) outboxinfra.EventPublisher {
kafkaCfg := kafkabus.LoadConfig()
if !kafkaCfg.Enabled || outboxRepo == nil {
return nil
}
return &repositoryOutboxPublisher{
repo: outboxRepo,
maxRetry: kafkaCfg.MaxRetry,
}
}
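// A sketch of the Kafka switch the two publisher builders read via
// kafkabus.LoadConfig(); the key layout is an assumption, only the
// kafka.enabled=false semantics and the MaxRetry field appear above:
//
//	kafka:
//	  enabled: true
//	  maxRetry: 3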
type repositoryOutboxPublisher struct {
repo *outboxinfra.Repository
maxRetry int
}
// buildTaskOutboxPublisher constructs the publisher for the monolith-residue task query path.
//
// Responsibility boundaries:
// 1. Only writes task events produced by the residual Agent TaskService into task_outbox_messages;
// 2. Creates no task consumer / relay; the consumption boundary still belongs to cmd/task;
// 3. Returns nil when kafka.enabled=false, keeping the local degraded semantics consistent with the old eventBus.
func buildTaskOutboxPublisher(outboxRepo *outboxinfra.Repository) outboxinfra.EventPublisher {
kafkaCfg := kafkabus.LoadConfig()
if !kafkaCfg.Enabled || outboxRepo == nil {
return nil
}
return &repositoryOutboxPublisher{
repo: outboxRepo,
maxRetry: kafkaCfg.MaxRetry,
}
}
// Publish writes to the service-level outbox in publish-only mode.
//
// Notes:
// 1. The outbox EventBus is not reused here because the EventBus would create, and possibly start, the corresponding service engine;
// 2. During the task / memory migration the monolith residue may only publish cross-service events; it must not grab the corresponding consumer groups;
// 3. The payload is still wrapped in the unified OutboxEventPayload, so the standalone relay / consumer can parse it per the standard protocol.
func (p *repositoryOutboxPublisher) Publish(ctx context.Context, req outboxinfra.PublishRequest) error {
if p == nil || p.repo == nil {
return fmt.Errorf("outbox publisher is not initialized")
}
eventType := strings.TrimSpace(req.EventType)
if eventType == "" {
return fmt.Errorf("eventType is empty")
}
eventVersion := strings.TrimSpace(req.EventVersion)
if eventVersion == "" {
eventVersion = outboxinfra.DefaultEventVersion
}
messageKey := strings.TrimSpace(req.MessageKey)
aggregateID := strings.TrimSpace(req.AggregateID)
if aggregateID == "" {
aggregateID = messageKey
}
payloadJSON, err := json.Marshal(req.Payload)
if err != nil {
return err
}
_, err = p.repo.CreateMessage(ctx, eventType, messageKey, outboxinfra.OutboxEventPayload{
EventID: strings.TrimSpace(req.EventID),
EventType: eventType,
EventVersion: eventVersion,
AggregateID: aggregateID,
Payload: payloadJSON,
}, p.maxRetry)
return err
}
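// Defaulting behavior of Publish, shown on a minimal request (the event type
// comes from the commit description; the message key is hypothetical):
//
//	req := outboxinfra.PublishRequest{
//	    EventType:  "credit.charge.requested",
//	    MessageKey: "user-42",
//	    // AggregateID omitted: Publish falls back to MessageKey ("user-42").
//	    // EventVersion omitted: Publish fills in outboxinfra.DefaultEventVersion.
//	}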
func buildCourseService(llmService *llmservice.Service, courseRepo *coursedao.CourseDAO, scheduleRepo *dao.ScheduleDAO) *coursesv.CourseService {
courseImageResponsesClient := llmService.CourseImageResponsesClient()
return coursesv.NewCourseService(
courseRepo,
scheduleRepo,
courseImageResponsesClient,
coursesv.NewCourseImageParseConfig(
viper.GetInt64("courseImport.maxImageBytes"),
viper.GetInt("courseImport.maxTokens"),
),
viper.GetString("courseImport.visionModel"),
)
}
func buildActiveSchedulePreviewConfirmService(activeDAO *dao.ActiveScheduleDAO, dryRun *activesvc.DryRunService, scheduleApplyAdapter interface {
ApplyActiveScheduleChanges(context.Context, activeapplyadapter.ApplyActiveScheduleRequest) (activeapplyadapter.ApplyActiveScheduleResult, error)
}) (*activesvc.PreviewConfirmService, error) {
previewService, err := activepreview.NewService(activeDAO)
if err != nil {
return nil, err
}
return activesvc.NewPreviewConfirmService(dryRun, previewService, activeDAO, scheduleApplyAdapter)
}
// buildActiveScheduleSessionRerunFunc packages the active-scheduler locator / graph / preview capabilities into a rerun closure callable from the chat entry.
//
// Notes:
// 1. This is minimal wiring only: it reuses the existing locator -> trigger -> graph -> preview components and does not duplicate the worker/notification path;
// 2. On success it returns the session status, the assistant text, and the business-card data;
// 3. On failure it hands the error straight back to the chat entry, which writes the failure log and the SSE error uniformly.
func buildActiveScheduleSessionRerunFunc(
activeDAO *dao.ActiveScheduleDAO,
graphRunner *activegraph.Runner,
previewConfirm *activesvc.PreviewConfirmService,
feedbackLocator *activefeedbacklocate.Service,
) agentsv.ActiveScheduleSessionRerunFunc {
return func(
ctx context.Context,
session *model.ActiveScheduleSessionSnapshot,
userMessage string,
traceID string,
requestStart time.Time,
) (*agentsv.ActiveScheduleSessionRerunResult, error) {
if activeDAO == nil || graphRunner == nil || previewConfirm == nil {
return nil, fmt.Errorf("主动调度 rerun 依赖未初始化")
}
if session == nil {
return nil, fmt.Errorf("主动调度 session 不能为空")
}
triggerRow, err := activeDAO.GetTriggerByID(ctx, session.TriggerID)
if err != nil {
return nil, err
}
resolvedTargetType := activeTrigger.TargetType(triggerRow.TargetType)
resolvedTargetID := triggerRow.TargetID
needsFeedbackLocate := activeTrigger.TriggerType(triggerRow.TriggerType) == activeTrigger.TriggerTypeUnfinishedFeedback &&
(resolvedTargetID <= 0 || containsString(session.State.MissingInfo, "feedback_target"))
// 1. When the target is missing, unfinished_feedback first goes through the locator, turning the user's supplementary info into a verifiable schedule_event.
// 2. If locating fails, go straight to ask_user; never hard-guess target_id and never keep running the graph.
// 3. On success, only this run's domainTrigger target_type / target_id are changed; no official schedule is written.
if needsFeedbackLocate {
if feedbackLocator == nil {
question := firstNonEmptyString(
activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo),
session.State.PendingQuestion,
)
nextState := session.State
nextState.PendingQuestion = question
nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target")
nextState.LastCandidateID = ""
nextState.LastNotificationID = ""
nextState.FailedReason = ""
nextState.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: nextState,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
}
locateResult, locateErr := feedbackLocator.Resolve(ctx, activefeedbacklocate.Request{
UserID: triggerRow.UserID,
UserMessage: userMessage,
PendingQuestion: session.State.PendingQuestion,
MissingInfo: cloneStringSlice(session.State.MissingInfo),
})
if locateErr != nil {
return nil, locateErr
}
if locateResult.ShouldAskUser() {
question := firstNonEmptyString(
locateResult.AskUserQuestion,
activefeedbacklocate.BuildAskUserQuestion(session.State.MissingInfo),
session.State.PendingQuestion,
)
nextState := session.State
nextState.PendingQuestion = question
nextState.MissingInfo = appendMissingString(nextState.MissingInfo, "feedback_target")
nextState.LastCandidateID = ""
nextState.LastNotificationID = ""
nextState.FailedReason = ""
nextState.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: nextState,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
}
resolvedTargetType = activeTrigger.TargetType(locateResult.TargetType)
resolvedTargetID = locateResult.TargetID
}
// 1. Construct the domainTrigger only after locating is done, so an empty unfinished_feedback target_id does not falsely trip validation.
// 2. This still reuses the existing graph -> preview chain; no new scheduling engine is written.
domainTrigger := activeTrigger.ActiveScheduleTrigger{
TriggerID: triggerRow.ID,
UserID: triggerRow.UserID,
TriggerType: activeTrigger.TriggerType(triggerRow.TriggerType),
Source: activeTrigger.SourceUserFeedback,
TargetType: resolvedTargetType,
TargetID: resolvedTargetID,
FeedbackID: triggerRow.FeedbackID,
IdempotencyKey: triggerRow.IdempotencyKey,
MockNow: nil,
IsMockTime: false,
RequestedAt: requestStart,
TraceID: traceID,
}
if err := domainTrigger.Validate(); err != nil {
return nil, err
}
graphResult, err := graphRunner.Run(ctx, domainTrigger)
if err != nil {
return nil, err
}
if graphResult == nil || graphResult.DryRunData == nil || graphResult.DryRunData.Context == nil {
return nil, fmt.Errorf("主动调度 graph 返回空结果")
}
selectionResult := graphResult.SelectionResult
state := session.State
state.LastCandidateID = strings.TrimSpace(selectionResult.SelectedCandidateID)
state.LastNotificationID = ""
state.FailedReason = ""
state.MissingInfo = cloneStringSlice(graphResult.DryRunData.Context.DerivedFacts.MissingInfo)
switch selectionResult.Action {
case activesel.ActionSelectCandidate:
if !graphResult.DryRunData.Observation.Decision.ShouldWritePreview {
return nil, fmt.Errorf("主动调度 graph 选择了候选,但未产出可写 preview")
}
previewResp, err := previewConfirm.CreatePreviewFromDryRun(ctx, activepreview.CreatePreviewRequest{
ActiveContext: graphResult.DryRunData.Context,
Observation: graphResult.DryRunData.Observation,
Candidates: graphResult.DryRunData.Candidates,
TriggerID: triggerRow.ID,
GeneratedAt: requestStart,
SelectedCandidateID: selectionResult.SelectedCandidateID,
ExplanationText: selectionResult.ExplanationText,
NotificationSummary: selectionResult.NotificationSummary,
FallbackUsed: selectionResult.FallbackUsed,
})
if err != nil {
return nil, err
}
state.PendingQuestion = ""
state.MissingInfo = nil
state.FailedReason = ""
expiresAt := previewResp.Detail.ExpiresAt
state.ExpiresAt = &expiresAt
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: firstNonEmptyString(selectionResult.ExplanationText, selectionResult.NotificationSummary, previewResp.Detail.Explanation, previewResp.Detail.Notification, "主动调度建议已更新。"),
BusinessCard: &agentstream.StreamBusinessCardExtra{
CardType: "active_schedule_preview",
Title: "SmartFlow 日程调整建议",
Summary: firstNonEmptyString(selectionResult.NotificationSummary, previewResp.Detail.Notification, previewResp.Detail.Explanation),
Data: previewDetailToMap(previewResp.Detail),
},
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusReadyPreview,
PreviewID: previewResp.Detail.PreviewID,
}, nil
case activesel.ActionAskUser:
question := firstNonEmptyString(selectionResult.AskUserQuestion, selectionResult.ExplanationText, "请继续补充主动调度需要的信息。")
state.PendingQuestion = question
state.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: question,
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusWaitingUserReply,
}, nil
default:
assistantText := firstNonEmptyString(selectionResult.ExplanationText, selectionResult.NotificationSummary, "当前主动调度暂时没有需要继续处理的内容。")
state.PendingQuestion = ""
state.MissingInfo = nil
state.ExpiresAt = nil
return &agentsv.ActiveScheduleSessionRerunResult{
AssistantText: assistantText,
SessionState: state,
SessionStatus: model.ActiveScheduleSessionStatusIgnored,
}, nil
}
}
}
// previewDetailToMap converts the active_schedule preview detail into a generic map so the timeline business_card can reuse it directly.
func previewDetailToMap(detail activepreview.ActiveSchedulePreviewDetail) map[string]any {
raw, err := json.Marshal(detail)
if err != nil {
return map[string]any{}
}
var output map[string]any
if err := json.Unmarshal(raw, &output); err != nil {
return map[string]any{}
}
return output
}
// firstNonEmptyString picks the first displayable entry from a set of candidate strings.
func firstNonEmptyString(values ...string) string {
for _, value := range values {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
return ""
}
// cloneStringSlice copies a string slice so later mutations cannot leak through a shared backing array.
func cloneStringSlice(values []string) []string {
if len(values) == 0 {
return nil
}
copied := make([]string, len(values))
copy(copied, values)
return copied
}
// appendMissingString puts a missing-field name back into the state slice, so the ask_user branch does not wipe the original missing entries.
func appendMissingString(values []string, next string) []string {
trimmed := strings.TrimSpace(next)
if trimmed == "" {
return cloneStringSlice(values)
}
for _, value := range values {
if strings.TrimSpace(value) == trimmed {
return cloneStringSlice(values)
}
}
result := cloneStringSlice(values)
return append(result, trimmed)
}
// containsString reports whether missing_info has already marked a given missing entry.
func containsString(values []string, target string) bool {
trimmed := strings.TrimSpace(target)
if trimmed == "" {
return false
}
for _, value := range values {
if strings.TrimSpace(value) == trimmed {
return true
}
}
return false
}
func configureAgentService(
agentService *agentsv.AgentService,
ragRuntime ragservice.Runtime,
agentRepo *dao.AgentDAO,
cacheRepo *dao.CacheDAO,
taskClient agentsv.TaskRPCClient,
taskClassClient agentsv.TaskClassAgentRPCClient,
scheduleClient agentsv.ScheduleAgentRPCClient,
memoryReaderClient ports.MemoryReaderClient,
memoryCfg memorymodel.Config,
memoryObserver memoryobserve.Observer,
memoryMetrics memoryobserve.MetricsRecorder,
) {
if agentService == nil {
return
}
// Agent dependency wiring.
agentService.SetAgentStateStore(dao.NewAgentStateStoreAdapter(cacheRepo))
var webSearchProvider web.SearchProvider
webProvider := viper.GetString("websearch.provider")
switch webProvider {
case "bocha":
bochaKey := viper.GetString("websearch.apiKey")
if bochaKey == "" {
log.Println("WebSearch: 博查 API Key 为空,降级为 mock")
webSearchProvider = &web.MockProvider{}
} else {
webSearchProvider = web.NewBochaProvider(bochaKey, "")
log.Println("WebSearch provider: bocha")
}
case "mock", "":
webSearchProvider = &web.MockProvider{}
log.Println("WebSearch provider: mock模拟模式")
default:
// Unrecognized provider types degrade to mock with a warning.
log.Printf("WebSearch provider %q is unrecognized; falling back to mock", webProvider)
webSearchProvider = &web.MockProvider{}
}
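// The websearch config the switch above reads (assumed layout; only the
// websearch.provider / websearch.apiKey keys appear in the viper reads):
//
//	websearch:
//	  provider: bocha        # "mock" or empty selects the mock provider
//	  apiKey: "<bocha-key>"  # required for the bocha provider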
agentService.SetToolRegistry(agenttools.NewDefaultRegistryWithDeps(agenttools.DefaultRegistryDeps{
RAGRuntime: ragRuntime,
WebSearchProvider: webSearchProvider,
TaskClassWriteDeps: agenttools.TaskClassWriteDeps{
UpsertTaskClass: agentsv.NewTaskClassRPCUpsertFunc(taskClassClient),
},
}))
agentService.SetScheduleProvider(agentsv.NewScheduleRPCProvider(scheduleClient, taskClassClient))
agentService.SetCompactionStore(agentRepo)
// 1. quick task create / query goes uniformly through task zrpc, so the agent tool chain stops hitting the tasks table directly;
// 2. task-class upsert and the schedule provider were already switched to task-class/schedule zrpc in CP5;
// 3. if the task service is unavailable, the quick_task node returns a lightweight failure message without affecting the other agent branches.
agentService.SetQuickTaskDeps(agentsv.NewTaskRPCQuickTaskDeps(taskClient))
// 1. The agent main path reads memory uniformly through memory zrpc, so after CP3 it stops hitting the in-process memory.Module directly;
// 2. observer / metrics keep reusing the startup-time assembly, so injection-side observability survives the RPC cutover;
// 3. the gateway no longer assembles memory.Module; the memory worker / admin capabilities belong to cmd/memory;
// 4. if the memory service is temporarily unavailable, the prefetch path only logs a warning and degrades softly, without blocking the chat main flow.
agentService.SetMemoryReader(agentsv.NewMemoryRPCReader(memoryReaderClient, memoryObserver, memoryMetrics), memoryCfg)
}
func buildAPIHandlers(
taskClient ports.TaskCommandClient,
taskClassClient ports.TaskClassCommandClient,
courseClient ports.CourseCommandClient,
scheduleClient ports.ScheduleCommandClient,
agentService *agentsv.AgentService,
agentRPCClient *agentclient.Client,
memoryClient ports.MemoryCommandClient,
activeSchedulerClient ports.ActiveSchedulerCommandClient,
notificationClient ports.NotificationCommandClient,
) *api.ApiHandlers {
return &api.ApiHandlers{
TaskHandler: api.NewTaskHandler(taskClient),
TaskClassHandler: api.NewTaskClassHandler(taskClassClient),
CourseHandler: api.NewCourseHandler(courseClient),
ScheduleHandler: api.NewScheduleAPI(scheduleClient),
AgentHandler: api.NewAgentHandlerWithRPC(agentService, agentRPCClient),
MemoryHandler: api.NewMemoryHandler(memoryClient),
ActiveSchedule: api.NewActiveScheduleAPI(activeSchedulerClient),
Notification: api.NewNotificationAPI(notificationClient),
}
}
func (r *appRuntime) startWorkers(ctx context.Context) {
if r == nil {
return
}
log.Println("Gateway outbox worker is disabled; agent relay/consumer is managed by cmd/agent")
log.Println("Memory worker is managed by cmd/memory in phase 6")
}
func (r *appRuntime) startHTTP(ctx context.Context) {
router := gatewayrouter.RegisterRouters(r.handlers, r.userAuthClient, r.forumClient, r.tokenClient, r.cacheRepo, r.limiter)
gatewayrouter.StartEngine(ctx, router)
}
func (r *appRuntime) close() {
if r == nil {
return
}
}