From 9902ca3563133d434f27eaa77e864c5c06b5b332 Mon Sep 17 00:00:00 2001
From: Losita <2810873701@qq.com>
Date: Sun, 3 May 2026 23:21:03 +0800
Subject: [PATCH] Version: 0.9.65.dev.260503
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Backend:
1. Phase 1.5/1.6 close-out of llm-service / rag-service: unify the model egress and the retrieval-infrastructure entry point, and retire the legacy backend/infra/llm and backend/infra/rag implementations;
2. Update the affected call chains and the microservice migration plan document accordingly
---
 .../feedbacklocate/service.go | 16 +--
 backend/active_scheduler/selection/service.go | 16 +--
 backend/cmd/start.go | 50 ++++----
 backend/memory/module.go | 18 +--
 .../orchestrator/llm_decision_orchestrator.go | 18 +--
 .../orchestrator/llm_write_orchestrator.go | 14 +--
 backend/memory/service/read_scope.go | 6 +-
 backend/memory/service/read_service.go | 8 +-
 backend/memory/vectorsync/syncer.go | 12 +-
 backend/memory/worker/decision_flow.go | 6 +-
 backend/memory/worker/runner.go | 6 +-
 backend/newAgent/model/graph_run_state.go | 18 +--
 backend/newAgent/node/chat.go | 32 ++---
 backend/newAgent/node/deliver.go | 8 +-
 .../newAgent/node/execute/action_router.go | 6 +-
 backend/newAgent/node/execute/run.go | 4 +-
 backend/newAgent/node/plan.go | 14 +--
 backend/newAgent/node/quick_task.go | 8 +-
 backend/newAgent/node/unified_compact.go | 4 +-
 backend/newAgent/prompt/compact_msg1.go | 6 +-
 backend/newAgent/prompt/compact_msg2.go | 6 +-
 backend/newAgent/router/decision_parser.go | 4 +-
 backend/newAgent/shared/node_thinking.go | 8 +-
 .../newAgent/shared/node_unified_compact.go | 4 +-
 backend/newAgent/stream/emitter.go | 6 +-
 backend/newAgent/tools/registry.go | 4 +-
 backend/service/agent_bridge.go | 10 +-
 backend/service/agentsvc/agent.go | 25 ++--
 backend/service/agentsvc/agent_meta.go | 34 +++---
 backend/service/agentsvc/agent_newagent.go | 23 ++--
 .../service/agentsvc/agent_stream_fallback.go | 28 +++--
 backend/service/agentsvc/reasoning_summary.go | 12 +-
 backend/service/course.go | 6 +-
 backend/service/course_parse_ark.go | 20 ++--
 backend/{infra => services}/llm/ark.go | 18 +--
 .../{infra => services}/llm/ark_adapter.go | 26 ++--
 .../llm/ark_responses_client.go | 21 ++--
 backend/{infra => services}/llm/client.go | 79 +++----------
 backend/{infra => services}/llm/json.go | 20 +---
 backend/services/llm/service.go | 109 +++++++++++++++++
 .../rag/service.go => services/rag/api.go} | 2 +-
 .../rag/chunk/text_chunker.go | 2 +-
 .../{infra => services}/rag/config/config.go | 0
 .../{infra => services}/rag/core/errors.go | 0
 .../rag/core/interfaces.go | 0
 .../{infra => services}/rag/core/observer.go | 4 +-
 .../{infra => services}/rag/core/pipeline.go | 0
 backend/{infra => services}/rag/core/types.go | 0
 .../{infra => services}/rag/corpus/common.go | 0
 .../rag/corpus/memory_corpus.go | 2 +-
 .../rag/corpus/web_corpus.go | 2 +-
 .../rag/embed/eino_embedder.go | 0
 .../rag/embed/mock_embedder.go | 0
 backend/{infra => services}/rag/factory.go | 12 +-
 backend/{infra => services}/rag/observe.go | 2 +-
 backend/{infra => services}/rag/rag.go | 10 +-
 .../rag/rerank/eino_reranker.go | 2 +-
 .../rag/rerank/noop_reranker.go | 2 +-
 .../rag/retrieve/vector_retriever.go | 2 +-
 backend/{infra => services}/rag/runtime.go | 8 +-
 backend/services/rag/service.go | 111 ++++++++++++++++++
 .../rag/store/inmemory_store.go | 2 +-
 .../rag/store/milvus_store.go | 2 +-
 .../rag/store/vector_store.go | 2 +-
 .../微服务四步迁移与第二阶段并行开发计划.md | 26 +++-
 65 files changed, 550 insertions(+), 376 deletions(-)
 rename backend/{infra => services}/llm/ark.go (73%)
 rename backend/{infra => services}/llm/ark_adapter.go (68%)
 rename backend/{infra => 
services}/llm/ark_responses_client.go (93%) rename backend/{infra => services}/llm/client.go (56%) rename backend/{infra => services}/llm/json.go (67%) create mode 100644 backend/services/llm/service.go rename backend/{infra/rag/service.go => services/rag/api.go} (97%) rename backend/{infra => services}/rag/chunk/text_chunker.go (96%) rename backend/{infra => services}/rag/config/config.go (100%) rename backend/{infra => services}/rag/core/errors.go (100%) rename backend/{infra => services}/rag/core/interfaces.go (100%) rename backend/{infra => services}/rag/core/observer.go (97%) rename backend/{infra => services}/rag/core/pipeline.go (100%) rename backend/{infra => services}/rag/core/types.go (100%) rename backend/{infra => services}/rag/corpus/common.go (100%) rename backend/{infra => services}/rag/corpus/memory_corpus.go (98%) rename backend/{infra => services}/rag/corpus/web_corpus.go (98%) rename backend/{infra => services}/rag/embed/eino_embedder.go (100%) rename backend/{infra => services}/rag/embed/mock_embedder.go (100%) rename backend/{infra => services}/rag/factory.go (91%) rename backend/{infra => services}/rag/observe.go (92%) rename backend/{infra => services}/rag/rag.go (60%) rename backend/{infra => services}/rag/rerank/eino_reranker.go (86%) rename backend/{infra => services}/rag/rerank/noop_reranker.go (91%) rename backend/{infra => services}/rag/retrieve/vector_retriever.go (97%) rename backend/{infra => services}/rag/runtime.go (97%) create mode 100644 backend/services/rag/service.go rename backend/{infra => services}/rag/store/inmemory_store.go (98%) rename backend/{infra => services}/rag/store/milvus_store.go (99%) rename backend/{infra => services}/rag/store/vector_store.go (75%) diff --git a/backend/active_scheduler/feedbacklocate/service.go b/backend/active_scheduler/feedbacklocate/service.go index 6fa5897..4e07fcf 100644 --- a/backend/active_scheduler/feedbacklocate/service.go +++ b/backend/active_scheduler/feedbacklocate/service.go @@ -11,7 +11,7 @@ import ( "github.com/LoveLosita/smartflow/backend/active_scheduler/ports" "github.com/LoveLosita/smartflow/backend/active_scheduler/trigger" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const locateMaxTokens = 800 @@ -24,7 +24,7 @@ const locateMaxTokens = 800 // 3. 不创建新工具系统,也不直接产出 preview。 type Service struct { reader ports.ScheduleReader - client *infrallm.Client + client *llmservice.Client clock func() time.Time logger *log.Logger } @@ -34,7 +34,7 @@ type Service struct { // 说明: // 1. reader / client 允许为空,方便在模型不可用或读模型暂时不可用时直接回退 ask_user。 // 2. 
真正的定位能力只在 Resolve 内部按需启用。 -func NewService(reader ports.ScheduleReader, client *infrallm.Client) *Service { +func NewService(reader ports.ScheduleReader, client *llmservice.Client) *Service { return &Service{ reader: reader, client: client, @@ -101,15 +101,15 @@ func (s *Service) Resolve(ctx context.Context, req Request) (Result, error) { return s.buildAskUserResult(req, "定位 prompt 构造失败"), nil } - messages := infrallm.BuildSystemUserMessages(strings.TrimSpace(locateSystemPrompt), nil, userPrompt) - resp, rawResult, err := infrallm.GenerateJSON[llmResponse]( + messages := llmservice.BuildSystemUserMessages(strings.TrimSpace(locateSystemPrompt), nil, userPrompt) + resp, rawResult, err := llmservice.GenerateJSON[llmResponse]( ctx, s.client, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.1, MaxTokens: locateMaxTokens, - Thinking: infrallm.ThinkingModeDisabled, + Thinking: llmservice.ThinkingModeDisabled, Metadata: map[string]any{ "stage": "active_scheduler_feedback_locate", "candidate_count": len(candidates), @@ -340,7 +340,7 @@ func cloneAndTrimStrings(values []string) []string { return result } -func truncateRaw(raw *infrallm.TextResult) string { +func truncateRaw(raw *llmservice.TextResult) string { if raw == nil { return "" } diff --git a/backend/active_scheduler/selection/service.go b/backend/active_scheduler/selection/service.go index 4f9bf11..9f52d52 100644 --- a/backend/active_scheduler/selection/service.go +++ b/backend/active_scheduler/selection/service.go @@ -10,7 +10,7 @@ import ( "time" "github.com/LoveLosita/smartflow/backend/active_scheduler/candidate" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const selectionMaxTokens = 1200 @@ -22,7 +22,7 @@ const selectionMaxTokens = 1200 // 2. LLM 失败、输出非法或选择不存在候选时,回退到后端 fallback candidate; // 3. 不写 preview、不发通知、不修改正式日程。 type Service struct { - client *infrallm.Client + client *llmservice.Client clock func() time.Time logger *log.Logger } @@ -33,7 +33,7 @@ type Service struct { // 1. client 允许为空;为空时选择器只走确定性 fallback,便于本地测试和降级; // 2. 真正的模型接入在 cmd/start.go 中完成:aiHub.Pro -> llm.Client -> selection.Service; // 3. 
选择器本身不持有模型配置,只表达本业务域的 prompt 和结果校验。 -func NewService(client *infrallm.Client) *Service { +func NewService(client *llmservice.Client) *Service { return &Service{ client: client, clock: time.Now, @@ -70,19 +70,19 @@ func (s *Service) Select(ctx context.Context, req SelectRequest) (Result, error) return buildFallbackResult(req, "选择器 prompt 构造失败: "+err.Error()), nil } - messages := infrallm.BuildSystemUserMessages( + messages := llmservice.BuildSystemUserMessages( strings.TrimSpace(selectionSystemPrompt), nil, userPrompt, ) - resp, rawResult, err := infrallm.GenerateJSON[llmSelectionResponse]( + resp, rawResult, err := llmservice.GenerateJSON[llmSelectionResponse]( ctx, s.client, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.1, MaxTokens: selectionMaxTokens, - Thinking: infrallm.ThinkingModeDisabled, + Thinking: llmservice.ThinkingModeDisabled, Metadata: map[string]any{ "stage": "active_scheduler_select", "candidate_count": len(req.Candidates), @@ -275,7 +275,7 @@ func firstNonEmpty(values ...string) string { return "" } -func truncateRaw(raw *infrallm.TextResult) string { +func truncateRaw(raw *llmservice.TextResult) string { if raw == nil { return "" } diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 5fca1ab..b8399fb 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -23,10 +23,7 @@ import ( "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/dao" kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" - ragconfig "github.com/LoveLosita/smartflow/backend/infra/rag/config" "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/memory" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" @@ -44,6 +41,9 @@ import ( "github.com/LoveLosita/smartflow/backend/service" agentsvcsvc "github.com/LoveLosita/smartflow/backend/service/agentsvc" eventsvc "github.com/LoveLosita/smartflow/backend/service/events" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" + ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config" "github.com/go-redis/redis/v8" "github.com/spf13/viper" "gorm.io/gorm" @@ -167,17 +167,25 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { return nil, fmt.Errorf("failed to initialize Eino: %w", err) } - ragRuntime, err := buildRAGRuntime(ctx) + llmService := llmservice.New(llmservice.Options{ + AIHub: aiHub, + APIKey: os.Getenv("ARK_API_KEY"), + BaseURL: viper.GetString("agent.baseURL"), + CourseVisionModel: viper.GetString("courseImport.visionModel"), + }) + + ragService, err := buildRAGService(ctx) if err != nil { return nil, err } + ragRuntime := ragService.Runtime() memoryCfg := memory.LoadConfigFromViper() memoryObserver := memoryobserve.NewLoggerObserver(log.Default()) memoryMetrics := memoryobserve.NewMetricsRegistry() memoryModule := memory.NewModuleWithObserve( db, - infrallm.WrapArkClient(aiHub.Pro), + llmService.ProClient(), ragRuntime, memoryCfg, memory.ObserveDeps{ @@ -208,11 +216,11 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { userService := service.NewUserService(userRepo, cacheRepo) taskSv := service.NewTaskService(taskRepo, cacheRepo, eventBus) taskSv.SetActiveScheduleDAO(manager.ActiveSchedule) - 
courseService := buildCourseService(courseRepo, scheduleRepo) + courseService := buildCourseService(llmService, courseRepo, scheduleRepo) taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager) scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo) agentService := service.NewAgentServiceWithSchedule( - aiHub, + llmService, agentRepo, taskRepo, cacheRepo, @@ -251,7 +259,7 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { } // 1. 主动调度选择器单独复用 Pro 模型,LLM 失败时由 selection 层显式回退到确定性候选; // 2. dry-run 与 selection 通过 graph runner 串起来,避免 trigger_pipeline 再拼第二套候选逻辑。 - activeScheduleLLMClient := infrallm.WrapArkClient(aiHub.Pro) + activeScheduleLLMClient := llmService.ProClient() activeScheduleSelector := activesel.NewService(activeScheduleLLMClient) activeScheduleFeedbackLocator := activefeedbacklocate.NewService(activeReaders, activeScheduleLLMClient) activeScheduleGraphRunner, err := activegraph.NewRunner(activeScheduleDryRun.AsGraphDryRunFunc(), activeScheduleSelector) @@ -323,26 +331,26 @@ func buildRuntime(ctx context.Context) (*appRuntime, error) { return runtime, nil } -func buildRAGRuntime(ctx context.Context) (infrarag.Runtime, error) { +func buildRAGService(ctx context.Context) (*ragservice.Service, error) { ragCfg := ragconfig.LoadFromViper() if !ragCfg.Enabled { - log.Println("RAG runtime is disabled") - return nil, nil + log.Println("RAG service is disabled") + return ragservice.New(ragservice.Options{}), nil } // 1. 当前项目尚未完成全局观测平台建设,这里先注入一层轻量 Observer; // 2. RAG 内部只依赖 Observer 接口,后续若全项目统一日志/指标系统,只需替换这里; // 3. 这样可以避免 RAG 单独自建一套割裂的日志基础设施。 ragLogger := log.Default() - ragRuntime, err := infrarag.NewRuntimeFromConfig(ctx, ragCfg, infrarag.FactoryDeps{ + ragService, err := ragservice.NewFromConfig(ctx, ragCfg, ragservice.FactoryDeps{ Logger: ragLogger, - Observer: infrarag.NewLoggerObserver(ragLogger), + Observer: ragservice.NewLoggerObserver(ragLogger), }) if err != nil { - return nil, fmt.Errorf("failed to initialize RAG runtime: %w", err) + return nil, fmt.Errorf("failed to initialize RAG service: %w", err) } - log.Printf("RAG runtime initialized: store=%s embed=%s reranker=%s", ragCfg.Store, ragCfg.EmbedProvider, ragCfg.RerankerProvider) - return ragRuntime, nil + log.Printf("RAG service initialized: store=%s embed=%s reranker=%s", ragCfg.Store, ragCfg.EmbedProvider, ragCfg.RerankerProvider) + return ragService, nil } func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, error) { @@ -369,12 +377,8 @@ func buildEventBus(outboxRepo *outboxinfra.Repository) (eventsvc.OutboxBus, erro return eventBus, nil } -func buildCourseService(courseRepo *dao.CourseDAO, scheduleRepo *dao.ScheduleDAO) *service.CourseService { - courseImageResponsesClient := infrallm.NewArkResponsesClient( - os.Getenv("ARK_API_KEY"), - viper.GetString("agent.baseURL"), - viper.GetString("courseImport.visionModel"), - ) +func buildCourseService(llmService *llmservice.Service, courseRepo *dao.CourseDAO, scheduleRepo *dao.ScheduleDAO) *service.CourseService { + courseImageResponsesClient := llmService.CourseImageResponsesClient() return service.NewCourseService( courseRepo, scheduleRepo, @@ -650,7 +654,7 @@ func containsString(values []string, target string) bool { func configureAgentService( agentService *service.AgentService, - ragRuntime infrarag.Runtime, + ragRuntime ragservice.Runtime, agentRepo *dao.AgentDAO, cacheRepo *dao.CacheDAO, taskRepo *dao.TaskDAO, diff --git a/backend/memory/module.go 
b/backend/memory/module.go index 4e50414..a1deba0 100644 --- a/backend/memory/module.go +++ b/backend/memory/module.go @@ -5,8 +5,6 @@ import ( "errors" "log" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorycleanup "github.com/LoveLosita/smartflow/backend/memory/cleanup" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" @@ -16,6 +14,8 @@ import ( memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" memoryworker "github.com/LoveLosita/smartflow/backend/memory/worker" "github.com/LoveLosita/smartflow/backend/model" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" "gorm.io/gorm" ) @@ -28,8 +28,8 @@ import ( type Module struct { db *gorm.DB cfg memorymodel.Config - llmClient *infrallm.Client - ragRuntime infrarag.Runtime + llmClient *llmservice.Client + ragRuntime ragservice.Runtime observer memoryobserve.Observer metrics memoryobserve.MetricsRecorder @@ -64,15 +64,15 @@ func LoadConfigFromViper() memorymodel.Config { // 2. llmClient 允许为 nil,此时写入链路会自动回退到本地 fallback 抽取; // 3. ragRuntime 允许为 nil,此时读取/向量同步自动回退旧逻辑; // 4. 若后续接入统一 DI 容器,也应优先注册这个 Module,而不是把内部 repo/service 继续向外泄漏。 -func NewModule(db *gorm.DB, llmClient *infrallm.Client, ragRuntime infrarag.Runtime, cfg memorymodel.Config) *Module { +func NewModule(db *gorm.DB, llmClient *llmservice.Client, ragRuntime ragservice.Runtime, cfg memorymodel.Config) *Module { return NewModuleWithObserve(db, llmClient, ragRuntime, cfg, ObserveDeps{}) } // NewModuleWithObserve 创建带观测依赖的 memory 模块门面。 func NewModuleWithObserve( db *gorm.DB, - llmClient *infrallm.Client, - ragRuntime infrarag.Runtime, + llmClient *llmservice.Client, + ragRuntime ragservice.Runtime, cfg memorymodel.Config, deps ObserveDeps, ) *Module { @@ -228,8 +228,8 @@ func (m *Module) StartWorker(ctx context.Context) { func wireModule( db *gorm.DB, - llmClient *infrallm.Client, - ragRuntime infrarag.Runtime, + llmClient *llmservice.Client, + ragRuntime ragservice.Runtime, cfg memorymodel.Config, deps ObserveDeps, ) *Module { diff --git a/backend/memory/orchestrator/llm_decision_orchestrator.go b/backend/memory/orchestrator/llm_decision_orchestrator.go index 7f45da7..6769d5e 100644 --- a/backend/memory/orchestrator/llm_decision_orchestrator.go +++ b/backend/memory/orchestrator/llm_decision_orchestrator.go @@ -6,8 +6,8 @@ import ( "log" "strings" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const defaultDecisionCompareMaxTokens = 600 @@ -19,13 +19,13 @@ const defaultDecisionCompareMaxTokens = 600 // 2. LLM 只输出 relation(关系类型),不输出 action,不输出 target ID; // 3. 
LLM 调用失败时返回 error,由上层决定是否视为 unrelated。 type LLMDecisionOrchestrator struct { - client *infrallm.Client + client *llmservice.Client cfg memorymodel.Config logger *log.Logger } // NewLLMDecisionOrchestrator 构造决策比对编排器。 -func NewLLMDecisionOrchestrator(client *infrallm.Client, cfg memorymodel.Config) *LLMDecisionOrchestrator { +func NewLLMDecisionOrchestrator(client *llmservice.Client, cfg memorymodel.Config) *LLMDecisionOrchestrator { return &LLMDecisionOrchestrator{ client: client, cfg: cfg, @@ -52,14 +52,14 @@ func (o *LLMDecisionOrchestrator) Compare( systemPrompt := buildDecisionCompareSystemPrompt() userPrompt := buildDecisionCompareUserPrompt(fact, candidate) - messages := infrallm.BuildSystemUserMessages(systemPrompt, nil, userPrompt) + messages := llmservice.BuildSystemUserMessages(systemPrompt, nil, userPrompt) // 2. 调用 LLM 做结构化输出,温度用低值保证判断稳定。 - resp, _, err := infrallm.GenerateJSON[decisionCompareResponse]( + resp, _, err := llmservice.GenerateJSON[decisionCompareResponse]( ctx, o.client, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.1, MaxTokens: defaultDecisionCompareMaxTokens, Thinking: resolveMemoryThinkingMode(o.cfg.LLMThinking), @@ -127,9 +127,9 @@ func buildDecisionCompareUserPrompt(fact memorymodel.NormalizedFact, candidate m } // resolveMemoryThinkingMode 根据配置布尔值返回对应的 ThinkingMode。 -func resolveMemoryThinkingMode(enabled bool) infrallm.ThinkingMode { +func resolveMemoryThinkingMode(enabled bool) llmservice.ThinkingMode { if enabled { - return infrallm.ThinkingModeEnabled + return llmservice.ThinkingModeEnabled } - return infrallm.ThinkingModeDisabled + return llmservice.ThinkingModeDisabled } diff --git a/backend/memory/orchestrator/llm_write_orchestrator.go b/backend/memory/orchestrator/llm_write_orchestrator.go index ca15fa1..c5121a6 100644 --- a/backend/memory/orchestrator/llm_write_orchestrator.go +++ b/backend/memory/orchestrator/llm_write_orchestrator.go @@ -7,9 +7,9 @@ import ( "log" "strings" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const ( @@ -24,13 +24,13 @@ const ( // 2. 不负责落库,不负责任务状态机推进; // 3. 
当 LLM 不可用或输出异常时,回退到保守的本地抽取,保证链路不完全断。 type LLMWriteOrchestrator struct { - client *infrallm.Client + client *llmservice.Client cfg memorymodel.Config logger *log.Logger } // NewLLMWriteOrchestrator 构造 LLM 版记忆写入编排器。 -func NewLLMWriteOrchestrator(client *infrallm.Client, cfg memorymodel.Config) *LLMWriteOrchestrator { +func NewLLMWriteOrchestrator(client *llmservice.Client, cfg memorymodel.Config) *LLMWriteOrchestrator { return &LLMWriteOrchestrator{ client: client, cfg: cfg, @@ -54,17 +54,17 @@ func (o *LLMWriteOrchestrator) ExtractFacts(ctx context.Context, payload memorym return fallbackNormalizedFacts(payload), nil } - messages := infrallm.BuildSystemUserMessages( + messages := llmservice.BuildSystemUserMessages( buildMemoryExtractSystemPrompt(o.cfg.ExtractPrompt), nil, buildMemoryExtractUserPrompt(payload), ) - resp, rawResult, err := infrallm.GenerateJSON[memoryExtractResponse]( + resp, rawResult, err := llmservice.GenerateJSON[memoryExtractResponse]( ctx, o.client, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: clampTemperature(o.cfg.LLMTemperature), MaxTokens: defaultMemoryExtractMaxTokens, Thinking: resolveMemoryThinkingMode(o.cfg.LLMThinking), @@ -319,7 +319,7 @@ func isSkipIntent(intent string) bool { } } -func truncateForLog(raw *infrallm.TextResult) string { +func truncateForLog(raw *llmservice.TextResult) string { if raw == nil { return "" } diff --git a/backend/memory/service/read_scope.go b/backend/memory/service/read_scope.go index ed8c376..a4c03a1 100644 --- a/backend/memory/service/read_scope.go +++ b/backend/memory/service/read_scope.go @@ -3,8 +3,8 @@ package service import ( "time" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" ) // buildReadScopedItemQuery 构造读侧统一使用的 MySQL 查询条件。 @@ -53,8 +53,8 @@ func buildReadScopedRAGRequest( req memorymodel.RetrieveRequest, topK int, threshold float64, -) infrarag.MemoryRetrieveRequest { - return infrarag.MemoryRetrieveRequest{ +) ragservice.MemoryRetrieveRequest { + return ragservice.MemoryRetrieveRequest{ Query: req.Query, TopK: topK, Threshold: threshold, diff --git a/backend/memory/service/read_service.go b/backend/memory/service/read_service.go index 4302057..158c84e 100644 --- a/backend/memory/service/read_service.go +++ b/backend/memory/service/read_service.go @@ -8,12 +8,12 @@ import ( "strings" "time" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" "github.com/LoveLosita/smartflow/backend/model" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" ) const ( @@ -30,7 +30,7 @@ const ( type ReadService struct { itemRepo *memoryrepo.ItemRepo settingsRepo *memoryrepo.SettingsRepo - ragRuntime infrarag.Runtime + ragRuntime ragservice.Runtime cfg memorymodel.Config observer memoryobserve.Observer metrics memoryobserve.MetricsRecorder @@ -57,7 +57,7 @@ type semanticRetrieveTelemetry struct { func NewReadService( itemRepo *memoryrepo.ItemRepo, settingsRepo *memoryrepo.SettingsRepo, - ragRuntime infrarag.Runtime, + ragRuntime ragservice.Runtime, cfg memorymodel.Config, observer memoryobserve.Observer, metrics memoryobserve.MetricsRecorder, @@ -347,7 +347,7 @@ 
func collectMemoryIDs(items []model.MemoryItem) []int64 { return ids } -func buildMemoryDTOFromRetrieveHit(hit infrarag.RetrieveHit) (memorymodel.ItemDTO, int64) { +func buildMemoryDTOFromRetrieveHit(hit ragservice.RetrieveHit) (memorymodel.ItemDTO, int64) { memoryID := parseMemoryIDFromDocumentID(hit.DocumentID) metadata := hit.Metadata content := strings.TrimSpace(hit.Text) diff --git a/backend/memory/vectorsync/syncer.go b/backend/memory/vectorsync/syncer.go index 29bce7d..91b56ed 100644 --- a/backend/memory/vectorsync/syncer.go +++ b/backend/memory/vectorsync/syncer.go @@ -6,10 +6,10 @@ import ( "log" "strings" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" "github.com/LoveLosita/smartflow/backend/model" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" ) // Syncer 负责 memory_items 与向量库之间的最小桥接。 @@ -19,7 +19,7 @@ import ( // 2. 不负责决定哪些记忆该写、该删、该恢复,这些决策仍由上游 service/worker/cleanup 控制; // 3. 同步失败时只回写 vector_status 并打观测,不反向回滚业务事务,避免把在线链路拖成强依赖。 type Syncer struct { - ragRuntime infrarag.Runtime + ragRuntime ragservice.Runtime itemRepo *memoryrepo.ItemRepo observer memoryobserve.Observer metrics memoryobserve.MetricsRecorder @@ -27,7 +27,7 @@ type Syncer struct { } func NewSyncer( - ragRuntime infrarag.Runtime, + ragRuntime ragservice.Runtime, itemRepo *memoryrepo.ItemRepo, observer memoryobserve.Observer, metrics memoryobserve.MetricsRecorder, @@ -53,9 +53,9 @@ func (s *Syncer) Upsert(ctx context.Context, traceID string, items []model.Memor return } - requestItems := make([]infrarag.MemoryIngestItem, 0, len(items)) + requestItems := make([]ragservice.MemoryIngestItem, 0, len(items)) for _, item := range items { - requestItems = append(requestItems, infrarag.MemoryIngestItem{ + requestItems = append(requestItems, ragservice.MemoryIngestItem{ MemoryID: item.ID, UserID: item.UserID, ConversationID: strValue(item.ConversationID), @@ -76,7 +76,7 @@ func (s *Syncer) Upsert(ctx context.Context, traceID string, items []model.Memor result, err := s.ragRuntime.IngestMemory(memoryobserve.WithFields(ctx, map[string]any{ "trace_id": traceID, - }), infrarag.MemoryIngestRequest{ + }), ragservice.MemoryIngestRequest{ TraceID: traceID, Action: "add", Items: requestItems, diff --git a/backend/memory/worker/decision_flow.go b/backend/memory/worker/decision_flow.go index 9be964a..e2d080d 100644 --- a/backend/memory/worker/decision_flow.go +++ b/backend/memory/worker/decision_flow.go @@ -4,11 +4,11 @@ import ( "context" "fmt" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryrepo "github.com/LoveLosita/smartflow/backend/memory/repo" memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" "github.com/LoveLosita/smartflow/backend/model" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" "gorm.io/gorm" ) @@ -192,7 +192,7 @@ func (r *Runner) recallCandidates( ) candidateRecallResult { // 1. 优先使用 Milvus 向量语义召回。 if r.ragRuntime != nil { - retrieveResult, err := r.ragRuntime.RetrieveMemory(ctx, infrarag.MemoryRetrieveRequest{ + retrieveResult, err := r.ragRuntime.RetrieveMemory(ctx, ragservice.MemoryRetrieveRequest{ Query: fact.Content, TopK: r.cfg.DecisionCandidateTopK, Threshold: r.cfg.DecisionCandidateMinScore, @@ -235,7 +235,7 @@ func (r *Runner) recallCandidates( // 1. 从 DocumentID(格式 memory:{id})解析出 mysql_id; // 2. 
从 metadata 提取 title 和 memory_type; // 3. 跳过无法解析 DocumentID 的结果。 -func (r *Runner) buildCandidatesFromRAG(hits []infrarag.RetrieveHit) []memorymodel.CandidateSnapshot { +func (r *Runner) buildCandidatesFromRAG(hits []ragservice.RetrieveHit) []memorymodel.CandidateSnapshot { candidates := make([]memorymodel.CandidateSnapshot, 0, len(hits)) for _, hit := range hits { memoryID := parseMemoryID(hit.DocumentID) diff --git a/backend/memory/worker/runner.go b/backend/memory/worker/runner.go index 650ea0a..bd49bb2 100644 --- a/backend/memory/worker/runner.go +++ b/backend/memory/worker/runner.go @@ -9,7 +9,6 @@ import ( "strings" "time" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" memoryorchestrator "github.com/LoveLosita/smartflow/backend/memory/orchestrator" @@ -17,6 +16,7 @@ import ( memoryutils "github.com/LoveLosita/smartflow/backend/memory/utils" memoryvectorsync "github.com/LoveLosita/smartflow/backend/memory/vectorsync" "github.com/LoveLosita/smartflow/backend/model" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" "gorm.io/gorm" ) @@ -41,7 +41,7 @@ type Runner struct { auditRepo *memoryrepo.AuditRepo settingsRepo *memoryrepo.SettingsRepo extractor Extractor - ragRuntime infrarag.Runtime + ragRuntime ragservice.Runtime logger *log.Logger vectorSyncer *memoryvectorsync.Syncer observer memoryobserve.Observer @@ -63,7 +63,7 @@ func NewRunner( auditRepo *memoryrepo.AuditRepo, settingsRepo *memoryrepo.SettingsRepo, extractor Extractor, - ragRuntime infrarag.Runtime, + ragRuntime ragservice.Runtime, cfg memorymodel.Config, decisionOrchestrator *memoryorchestrator.LLMDecisionOrchestrator, vectorSyncer *memoryvectorsync.Syncer, diff --git a/backend/newAgent/model/graph_run_state.go b/backend/newAgent/model/graph_run_state.go index 7193fae..52d0056 100644 --- a/backend/newAgent/model/graph_run_state.go +++ b/backend/newAgent/model/graph_run_state.go @@ -5,10 +5,10 @@ import ( "strings" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" schedule "github.com/LoveLosita/smartflow/backend/newAgent/tools/schedule" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -71,10 +71,10 @@ type PersistVisibleMessageFunc func(ctx context.Context, state *CommonState, msg // 2. Chat/Plan/Execute/Deliver 允许分别挂不同 client,但也允许先复用同一个 client; // 3. ChunkEmitter 统一承接阶段提示、正文、工具事件、确认请求等 SSE 输出。 type AgentGraphDeps struct { - ChatClient *infrallm.Client - PlanClient *infrallm.Client - ExecuteClient *infrallm.Client - DeliverClient *infrallm.Client + ChatClient *llmservice.Client + PlanClient *llmservice.Client + ExecuteClient *llmservice.Client + DeliverClient *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter StateStore AgentStateStore ToolRegistry *newagenttools.ToolRegistry @@ -141,7 +141,7 @@ func (d *AgentGraphDeps) EnsureChunkEmitter() *newagentstream.ChunkEmitter { } // ResolveChatClient 返回 chat 阶段可用的模型客户端。 -func (d *AgentGraphDeps) ResolveChatClient() *infrallm.Client { +func (d *AgentGraphDeps) ResolveChatClient() *llmservice.Client { if d == nil { return nil } @@ -154,7 +154,7 @@ func (d *AgentGraphDeps) ResolveChatClient() *infrallm.Client { // 1. 优先使用显式注入的 PlanClient; // 2. 若未单独注入,则回退到 ChatClient; // 3. 
这样在骨架期可先用一套 client 跑通,再按需拆分 strategist / worker。 -func (d *AgentGraphDeps) ResolvePlanClient() *infrallm.Client { +func (d *AgentGraphDeps) ResolvePlanClient() *llmservice.Client { if d == nil { return nil } @@ -165,7 +165,7 @@ func (d *AgentGraphDeps) ResolvePlanClient() *infrallm.Client { } // ResolveExecuteClient 返回 execute 阶段可用的模型客户端。 -func (d *AgentGraphDeps) ResolveExecuteClient() *infrallm.Client { +func (d *AgentGraphDeps) ResolveExecuteClient() *llmservice.Client { if d == nil { return nil } @@ -179,7 +179,7 @@ func (d *AgentGraphDeps) ResolveExecuteClient() *infrallm.Client { } // ResolveDeliverClient 返回 deliver 阶段可用的模型客户端。 -func (d *AgentGraphDeps) ResolveDeliverClient() *infrallm.Client { +func (d *AgentGraphDeps) ResolveDeliverClient() *llmservice.Client { if d == nil { return nil } diff --git a/backend/newAgent/node/chat.go b/backend/newAgent/node/chat.go index 24876bc..aa3d63b 100644 --- a/backend/newAgent/node/chat.go +++ b/backend/newAgent/node/chat.go @@ -11,11 +11,11 @@ import ( "github.com/cloudwego/eino/schema" "github.com/google/uuid" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const ( @@ -50,7 +50,7 @@ type ChatNodeInput struct { UserInput string ConfirmAction string ResumeInteractionID string - Client *infrallm.Client + Client *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter CompactionStore newagentmodel.CompactionStore // 上下文压缩持久化 PersistVisibleMessage newagentmodel.PersistVisibleMessageFunc @@ -107,9 +107,9 @@ func RunChatNode(ctx context.Context, input ChatNodeInput) error { }) logNodeLLMContext(chatStageName, "routing", flowState, messages) - reader, err := input.Client.Stream(ctx, messages, infrallm.GenerateOptions{ + reader, err := input.Client.Stream(ctx, messages, llmservice.GenerateOptions{ Temperature: 0.7, - Thinking: infrallm.ThinkingModeDisabled, + Thinking: llmservice.ThinkingModeDisabled, Metadata: map[string]any{ "stage": chatStageName, "phase": "routing", @@ -172,7 +172,7 @@ func isExecuteLoopClosedMarker(msg *schema.Message) bool { // 3. 控制码解析超时或流异常结束 → fallback 到 plan。 func streamAndDispatch( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, parser *newagentrouter.StreamRouteParser, input ChatNodeInput, emitter *newagentstream.ChunkEmitter, @@ -292,7 +292,7 @@ func resolveEffectiveThinking(mode string, route newagentmodel.ChatRoute, decisi // 2. 
thinking=true:关闭路由流,发起第二次 thinking 流式调用。 func handleDirectReplyStream( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, input ChatNodeInput, emitter *newagentstream.ChunkEmitter, conversationContext *newagentmodel.ConversationContext, @@ -309,7 +309,7 @@ func handleDirectReplyStream( // handleThinkingReplyStream 处理需要思考的回复:关闭路由流 → 第二次 thinking 流式调用。 func handleThinkingReplyStream( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, input ChatNodeInput, emitter *newagentstream.ChunkEmitter, conversationContext *newagentmodel.ConversationContext, @@ -327,10 +327,10 @@ func handleThinkingReplyStream( StatusBlockID: chatStatusBlockID, }) logNodeLLMContext(chatStageName, "direct_reply_thinking", flowState, deepMessages) - deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{ + deepReader, err := input.Client.Stream(ctx, deepMessages, llmservice.GenerateOptions{ Temperature: 0.5, MaxTokens: 2000, - Thinking: infrallm.ThinkingModeEnabled, + Thinking: llmservice.ThinkingModeEnabled, Metadata: map[string]any{ "stage": chatStageName, "phase": "direct_reply_thinking", @@ -363,7 +363,7 @@ func handleThinkingReplyStream( // handleDirectReplyContinueStream 处理无思考的闲聊:同一流续传。 func handleDirectReplyContinueStream( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, input ChatNodeInput, emitter *newagentstream.ChunkEmitter, conversationContext *newagentmodel.ConversationContext, @@ -419,7 +419,7 @@ func handleDirectReplyContinueStream( // 2. 推送轻量状态通知; // 3. 设置流程状态,进入 Execute 或 RoughBuild。 func handleRouteExecuteStream( - reader infrallm.StreamReader, + reader llmservice.StreamReader, emitter *newagentstream.ChunkEmitter, flowState *newagentmodel.CommonState, decision *newagentmodel.ChatRoutingDecision, @@ -674,7 +674,7 @@ func isExplicitNoRefineAfterRoughBuildRequest(userInput string) bool { // 4. 完整回复写入 history。 func handleDeepAnswerStream( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, input ChatNodeInput, emitter *newagentstream.ChunkEmitter, conversationContext *newagentmodel.ConversationContext, @@ -685,9 +685,9 @@ func handleDeepAnswerStream( _ = reader.Close() // 2. 
第二次流式调用。 - thinkingOpt := infrallm.ThinkingModeDisabled + thinkingOpt := llmservice.ThinkingModeDisabled if effectiveThinking { - thinkingOpt = infrallm.ThinkingModeEnabled + thinkingOpt = llmservice.ThinkingModeEnabled } deepMessages := newagentprompt.BuildDeepAnswerMessages(flowState, conversationContext, input.UserInput) deepMessages = compactUnifiedMessagesIfNeeded(ctx, deepMessages, UnifiedCompactInput{ @@ -699,7 +699,7 @@ func handleDeepAnswerStream( StatusBlockID: chatStatusBlockID, }) logNodeLLMContext(chatStageName, "deep_answer", flowState, deepMessages) - deepReader, err := input.Client.Stream(ctx, deepMessages, infrallm.GenerateOptions{ + deepReader, err := input.Client.Stream(ctx, deepMessages, llmservice.GenerateOptions{ Temperature: 0.5, MaxTokens: 2000, Thinking: thinkingOpt, @@ -741,7 +741,7 @@ func handleDeepAnswerStream( // handleRoutePlanStream 处理规划路由:推送状态确认 → 设 PhasePlanning。 func handleRoutePlanStream( - reader infrallm.StreamReader, + reader llmservice.StreamReader, emitter *newagentstream.ChunkEmitter, flowState *newagentmodel.CommonState, effectiveThinking bool, diff --git a/backend/newAgent/node/deliver.go b/backend/newAgent/node/deliver.go index 6e0da3a..3082c1a 100644 --- a/backend/newAgent/node/deliver.go +++ b/backend/newAgent/node/deliver.go @@ -9,10 +9,10 @@ import ( "github.com/cloudwego/eino/schema" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const ( @@ -31,7 +31,7 @@ const ( type DeliverNodeInput struct { RuntimeState *newagentmodel.AgentRuntimeState ConversationContext *newagentmodel.ConversationContext - Client *infrallm.Client + Client *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter ThinkingEnabled bool // 是否开启 thinking,由 config.yaml 的 agent.thinking.deliver 注入 CompactionStore newagentmodel.CompactionStore // 上下文压缩持久化 @@ -128,7 +128,7 @@ func RunDeliverNode(ctx context.Context, input DeliverNodeInput) error { // - streamed:true 表示文本已通过 EmitStreamAssistantText 真流式推送到前端,调用方无需再伪流式。 func generateDeliverSummary( ctx context.Context, - client *infrallm.Client, + client *llmservice.Client, flowState *newagentmodel.CommonState, conversationContext *newagentmodel.ConversationContext, thinkingEnabled bool, @@ -162,7 +162,7 @@ func generateDeliverSummary( reader, err := client.Stream( ctx, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.5, MaxTokens: 800, Thinking: resolveThinkingMode(thinkingEnabled), diff --git a/backend/newAgent/node/execute/action_router.go b/backend/newAgent/node/execute/action_router.go index d628a58..399a358 100644 --- a/backend/newAgent/node/execute/action_router.go +++ b/backend/newAgent/node/execute/action_router.go @@ -8,11 +8,11 @@ import ( "log" "strings" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" "github.com/google/uuid" ) @@ -38,7 +38,7 @@ func collectExecuteDecisionFromLLM( 
reader, err := input.Client.Stream( ctx, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 1.0, MaxTokens: 131072, Thinking: newagentshared.ResolveThinkingMode(input.ThinkingEnabled), @@ -123,7 +123,7 @@ func collectExecuteDecisionFromLLM( return nil, nil } - decision, parseErr := infrallm.ParseJSONObject[newagentmodel.ExecuteDecision](result.DecisionJSON) + decision, parseErr := llmservice.ParseJSONObject[newagentmodel.ExecuteDecision](result.DecisionJSON) if parseErr != nil { log.Printf( "[DEBUG] execute LLM JSON 解析失败 chat=%s round=%d json=%s raw=%s", diff --git a/backend/newAgent/node/execute/run.go b/backend/newAgent/node/execute/run.go index ca91961..0b74e2b 100644 --- a/backend/newAgent/node/execute/run.go +++ b/backend/newAgent/node/execute/run.go @@ -5,12 +5,12 @@ import ( "fmt" newagentshared "github.com/LoveLosita/smartflow/backend/newAgent/shared" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" "github.com/LoveLosita/smartflow/backend/newAgent/tools/schedule" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const ( @@ -29,7 +29,7 @@ type ExecuteNodeInput struct { RuntimeState *newagentmodel.AgentRuntimeState ConversationContext *newagentmodel.ConversationContext UserInput string - Client *infrallm.Client + Client *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter ResumeNode string ToolRegistry *newagenttools.ToolRegistry diff --git a/backend/newAgent/node/plan.go b/backend/newAgent/node/plan.go index aaf8e8b..6d8b2dc 100644 --- a/backend/newAgent/node/plan.go +++ b/backend/newAgent/node/plan.go @@ -10,11 +10,11 @@ import ( "github.com/google/uuid" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -34,7 +34,7 @@ type PlanNodeInput struct { RuntimeState *newagentmodel.AgentRuntimeState ConversationContext *newagentmodel.ConversationContext UserInput string - Client *infrallm.Client + Client *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter ResumeNode string AlwaysExecute bool // true 时计划生成后自动确认,不进入 confirm 节点 @@ -87,7 +87,7 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { reader, err := input.Client.Stream( ctx, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.2, // 显式设置上限,避免依赖框架默认值(默认 4096)导致长决策被截断。 // 注意:当前模型接口 max_tokens 上限为 131072,超过会 400。 @@ -149,7 +149,7 @@ func RunPlanNode(ctx context.Context, input PlanNodeInput) error { return fmt.Errorf("规划解析失败,原始输出=%s", result.RawBuffer) } - decision, parseErr := infrallm.ParseJSONObject[newagentmodel.PlanDecision](result.DecisionJSON) + decision, parseErr := llmservice.ParseJSONObject[newagentmodel.PlanDecision](result.DecisionJSON) if parseErr != nil { return fmt.Errorf("规划决策 JSON 解析失败: %w (raw=%s)", parseErr, result.RawBuffer) } @@ -390,9 +390,9 @@ func buildPinnedPlanText(steps []newagentmodel.PlanStep) 
string { // resolveThinkingMode 根据配置布尔值返回对应的 ThinkingMode。 // 供 plan / execute / deliver 节点统一使用。 -func resolveThinkingMode(enabled bool) infrallm.ThinkingMode { +func resolveThinkingMode(enabled bool) llmservice.ThinkingMode { if enabled { - return infrallm.ThinkingModeEnabled + return llmservice.ThinkingModeEnabled } - return infrallm.ThinkingModeDisabled + return llmservice.ThinkingModeDisabled } diff --git a/backend/newAgent/node/quick_task.go b/backend/newAgent/node/quick_task.go index 82e0d38..5a4d405 100644 --- a/backend/newAgent/node/quick_task.go +++ b/backend/newAgent/node/quick_task.go @@ -8,13 +8,13 @@ import ( "strings" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" taskmodel "github.com/LoveLosita/smartflow/backend/model" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentrouter "github.com/LoveLosita/smartflow/backend/newAgent/router" newagentshared "github.com/LoveLosita/smartflow/backend/newAgent/shared" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -30,7 +30,7 @@ type QuickTaskNodeInput struct { RuntimeState *newagentmodel.AgentRuntimeState ConversationContext *newagentmodel.ConversationContext UserInput string - Client *infrallm.Client + Client *llmservice.Client ChunkEmitter *newagentstream.ChunkEmitter QuickTaskDeps newagentmodel.QuickTaskDeps PersistVisibleMessage newagentmodel.PersistVisibleMessageFunc @@ -77,7 +77,7 @@ func RunQuickTaskNode(ctx context.Context, input QuickTaskNodeInput) error { messages := newagentprompt.BuildQuickTaskMessagesSimple(input.UserInput) // 2. 真流式调用 LLM。 - reader, err := input.Client.Stream(ctx, messages, infrallm.GenerateOptions{ + reader, err := input.Client.Stream(ctx, messages, llmservice.GenerateOptions{ Temperature: 0.3, MaxTokens: 512, }) @@ -130,7 +130,7 @@ func RunQuickTaskNode(ctx context.Context, input QuickTaskNodeInput) error { // 解析 JSON。 log.Printf("[DEBUG] quick_task: LLM 原始决策 JSON chat=%s json=%s", flowState.ConversationID, result.DecisionJSON) var parseErr error - decision, parseErr = infrallm.ParseJSONObject[quickTaskDecision](result.DecisionJSON) + decision, parseErr = llmservice.ParseJSONObject[quickTaskDecision](result.DecisionJSON) if parseErr != nil { log.Printf("[DEBUG] quick_task: JSON 解析失败 chat=%s json=%s", flowState.ConversationID, result.DecisionJSON) if result.RawBuffer != "" { diff --git a/backend/newAgent/node/unified_compact.go b/backend/newAgent/node/unified_compact.go index bce1eb6..91cad2a 100644 --- a/backend/newAgent/node/unified_compact.go +++ b/backend/newAgent/node/unified_compact.go @@ -6,11 +6,11 @@ import ( "fmt" "log" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" "github.com/LoveLosita/smartflow/backend/pkg" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -22,7 +22,7 @@ import ( // 3. 
StageName 和 StatusBlockID 用于区分日志来源和 SSE 状态推送。 type UnifiedCompactInput struct { // Client 用于调用 LLM 压缩 msg1/msg2。 - Client *infrallm.Client + Client *llmservice.Client // CompactionStore 用于持久化压缩摘要和 token 统计,为 nil 时跳过持久化。 CompactionStore newagentmodel.CompactionStore // FlowState 提供 userID / chatID / roundUsed 等定位信息。 diff --git a/backend/newAgent/prompt/compact_msg1.go b/backend/newAgent/prompt/compact_msg1.go index 2fd2c61..6e8aed5 100644 --- a/backend/newAgent/prompt/compact_msg1.go +++ b/backend/newAgent/prompt/compact_msg1.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -24,7 +24,7 @@ const compactMsg1SystemPrompt = `你是一个对话压缩助手。你的任务 // existingSummary 不为空时表示已有旧摘要,需要合并压缩。 func CompactMsg1( ctx context.Context, - client *infrallm.Client, + client *llmservice.Client, historyText string, existingSummary string, ) (string, error) { @@ -49,7 +49,7 @@ func CompactMsg1( schema.UserMessage(userContent), } - result, err := client.GenerateText(ctx, messages, infrallm.GenerateOptions{ + result, err := client.GenerateText(ctx, messages, llmservice.GenerateOptions{ MaxTokens: 4000, }) if err != nil { diff --git a/backend/newAgent/prompt/compact_msg2.go b/backend/newAgent/prompt/compact_msg2.go index b7ebb43..95605c5 100644 --- a/backend/newAgent/prompt/compact_msg2.go +++ b/backend/newAgent/prompt/compact_msg2.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -23,7 +23,7 @@ const compactMsg2SystemPrompt = `你是一个执行记录压缩助手。你的 // recentText 是保留的近期记录原文,不参与压缩。 func CompactMsg2( ctx context.Context, - client *infrallm.Client, + client *llmservice.Client, earlyLoopText string, ) (string, error) { userContent := fmt.Sprintf(`早期的 ReAct 执行记录: @@ -36,7 +36,7 @@ func CompactMsg2( schema.UserMessage(userContent), } - result, err := client.GenerateText(ctx, messages, infrallm.GenerateOptions{ + result, err := client.GenerateText(ctx, messages, llmservice.GenerateOptions{ MaxTokens: 4000, }) if err != nil { diff --git a/backend/newAgent/router/decision_parser.go b/backend/newAgent/router/decision_parser.go index d990552..dd46671 100644 --- a/backend/newAgent/router/decision_parser.go +++ b/backend/newAgent/router/decision_parser.go @@ -26,7 +26,7 @@ var ( // StreamDecisionResult 描述解析器的最终输出状态。 type StreamDecisionResult struct { // DecisionJSON 是标签内提取的完整 JSON 字符串。 - // 调用方应使用 infrallm.ParseJSONObject[T] 将其解析为具体决策类型。 + // 调用方应使用 llmservice.ParseJSONObject[T] 将其解析为具体决策类型。 DecisionJSON string // BeforeText 是 标签之前的自然语言前言。 @@ -179,7 +179,7 @@ func (p *StreamDecisionParser) Result() *StreamDecisionResult { } // extractJSONFromTag 从标签内文本中提取第一个完整 JSON 对象。 -// 复用括号计数逻辑,与 infrallm.ExtractJSONObject 一致。 +// 复用括号计数逻辑,与 llmservice.ExtractJSONObject 一致。 func extractJSONFromTag(text string) string { clean := strings.TrimSpace(text) if clean == "" { diff --git a/backend/newAgent/shared/node_thinking.go b/backend/newAgent/shared/node_thinking.go index ad14b7c..9a2da15 100644 --- a/backend/newAgent/shared/node_thinking.go +++ b/backend/newAgent/shared/node_thinking.go @@ -1,10 +1,10 @@ package newagentshared -import infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" +import llmservice "github.com/LoveLosita/smartflow/backend/services/llm" -func ResolveThinkingMode(enabled bool) infrallm.ThinkingMode { 
+func ResolveThinkingMode(enabled bool) llmservice.ThinkingMode { if enabled { - return infrallm.ThinkingModeEnabled + return llmservice.ThinkingModeEnabled } - return infrallm.ThinkingModeDisabled + return llmservice.ThinkingModeDisabled } diff --git a/backend/newAgent/shared/node_unified_compact.go b/backend/newAgent/shared/node_unified_compact.go index 7d1c41f..4dab8c6 100644 --- a/backend/newAgent/shared/node_unified_compact.go +++ b/backend/newAgent/shared/node_unified_compact.go @@ -6,11 +6,11 @@ import ( "fmt" "log" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" "github.com/LoveLosita/smartflow/backend/pkg" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" ) @@ -22,7 +22,7 @@ import ( // 3. StageName 和 StatusBlockID 用于区分日志来源与 SSE 状态推送目标。 type UnifiedCompactInput struct { // Client 用于调用 LLM 压缩 msg1/msg2。 - Client *infrallm.Client + Client *llmservice.Client // CompactionStore 用于持久化压缩摘要和 token 统计,为 nil 时跳过持久化。 CompactionStore newagentmodel.CompactionStore // FlowState 提供 userID / conversationID / roundUsed 等定位信息。 diff --git a/backend/newAgent/stream/emitter.go b/backend/newAgent/stream/emitter.go index b0fb041..6717557 100644 --- a/backend/newAgent/stream/emitter.go +++ b/backend/newAgent/stream/emitter.go @@ -8,7 +8,7 @@ import ( "sync" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) // PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。 @@ -540,7 +540,7 @@ func (e *ChunkEmitter) EmitDone() error { // 3. 不负责打开/关闭 StreamReader,调用方负责生命周期管理。 func (e *ChunkEmitter) EmitStreamAssistantText( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, blockID, stage string, ) (string, error) { if e == nil || reader == nil { @@ -598,7 +598,7 @@ func (e *ChunkEmitter) EmitStreamAssistantText( // 用于只需展示思考过程而无需展示正文的场景。 func (e *ChunkEmitter) EmitStreamReasoningText( ctx context.Context, - reader infrallm.StreamReader, + reader llmservice.StreamReader, blockID, stage string, ) (string, error) { if e == nil || reader == nil { diff --git a/backend/newAgent/tools/registry.go b/backend/newAgent/tools/registry.go index 5c55e57..d91dfcd 100644 --- a/backend/newAgent/tools/registry.go +++ b/backend/newAgent/tools/registry.go @@ -5,9 +5,9 @@ import ( "sort" "strings" - infrarag "github.com/LoveLosita/smartflow/backend/infra/rag" "github.com/LoveLosita/smartflow/backend/newAgent/tools/schedule" "github.com/LoveLosita/smartflow/backend/newAgent/tools/web" + ragservice "github.com/LoveLosita/smartflow/backend/services/rag" ) // ToolHandler 约定所有工具的统一执行签名。 @@ -32,7 +32,7 @@ type ToolSchemaEntry struct { // 2. 某些依赖即便暂未使用也允许保留,避免业务层重新到处 new; // 3. 
具体依赖缺失时由对应工具自行返回结构化失败结果。 type DefaultRegistryDeps struct { - RAGRuntime infrarag.Runtime + RAGRuntime ragservice.Runtime // WebSearchProvider 为 nil 时,web_search / web_fetch 仍会注册, // 但 handler 会返回“暂未启用”的只读 observation,不阻断主流程。 diff --git a/backend/service/agent_bridge.go b/backend/service/agent_bridge.go index 1f7b118..a436ee8 100644 --- a/backend/service/agent_bridge.go +++ b/backend/service/agent_bridge.go @@ -3,8 +3,8 @@ package service import ( "github.com/LoveLosita/smartflow/backend/dao" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" - "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/service/agentsvc" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) // AgentService 是 service 层对 agentsvc.AgentService 的兼容别名。 @@ -20,7 +20,7 @@ type AgentService = agentsvc.AgentService // 2) 主动调度 session DAO 也在这里显式透传,避免聊天入口再去回查全局单例; // 3) 真实构造逻辑已下沉到 service/agentsvc 包。 func NewAgentService( - aiHub *inits.AIHub, + llmService *llmservice.Service, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, @@ -29,7 +29,7 @@ func NewAgentService( activeSessionDAO *dao.ActiveScheduleSessionDAO, eventPublisher outboxinfra.EventPublisher, ) *AgentService { - return agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) + return agentsvc.NewAgentService(llmService, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) } // NewAgentServiceWithSchedule 在基础 AgentService 上注入排程依赖。 @@ -39,7 +39,7 @@ func NewAgentService( // 2) 排程依赖为可选:未注入时排程路由自动回退到普通聊天; // 3) 主动调度 session DAO 仍沿用统一构造注入,避免排程分支自己拼装仓储。 func NewAgentServiceWithSchedule( - aiHub *inits.AIHub, + llmService *llmservice.Service, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, @@ -50,7 +50,7 @@ func NewAgentServiceWithSchedule( scheduleSvc *ScheduleService, taskSvc *TaskService, ) *AgentService { - svc := agentsvc.NewAgentService(aiHub, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) + svc := agentsvc.NewAgentService(llmService, repo, taskRepo, cacheDAO, agentRedis, activeScheduleDAO, activeSessionDAO, eventPublisher) // 注入排程依赖:将 service 层方法包装为函数闭包,避免循环依赖。 if scheduleSvc != nil { diff --git a/backend/service/agentsvc/agent.go b/backend/service/agentsvc/agent.go index 9db635e..f50c2d4 100644 --- a/backend/service/agentsvc/agent.go +++ b/backend/service/agentsvc/agent.go @@ -3,6 +3,7 @@ package agentsvc import ( "context" "encoding/json" + "errors" "log" "strconv" "strings" @@ -11,7 +12,6 @@ import ( "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" - "github.com/LoveLosita/smartflow/backend/inits" memorymodel "github.com/LoveLosita/smartflow/backend/memory/model" memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe" "github.com/LoveLosita/smartflow/backend/model" @@ -20,13 +20,13 @@ import ( newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools" "github.com/LoveLosita/smartflow/backend/pkg" eventsvc "github.com/LoveLosita/smartflow/backend/service/events" - "github.com/cloudwego/eino-ext/components/model/ark" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" "github.com/google/uuid" ) type AgentService struct { - AIHub *inits.AIHub + llmService *llmservice.Service repo *dao.AgentDAO taskRepo *dao.TaskDAO cacheDAO 
*dao.CacheDAO @@ -75,7 +75,7 @@ type AgentService struct { // 这里通过依赖注入把“模型、仓储、缓存、异步持久化通道”统一交给服务层管理, // 便于后续在单测中替换实现,或在启动流程中按环境切换配置。 func NewAgentService( - aiHub *inits.AIHub, + llmService *llmservice.Service, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, cacheDAO *dao.CacheDAO, @@ -90,7 +90,7 @@ func NewAgentService( ensureTokenMeterCallbackRegistered() return &AgentService{ - AIHub: aiHub, + llmService: llmService, repo: repo, taskRepo: taskRepo, cacheDAO: cacheDAO, @@ -123,8 +123,11 @@ func thinkingModeToBool(mode string) bool { // 当前约定: // - 旧链路已全面切到 newAgent graph,这里仅作为 runNormalChatFlow 回退时的模型选择入口; // - 统一返回 Pro 模型,旧 strategist 参数不再生效。 -func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, string) { - return s.AIHub.Pro, "pro" +func (s *AgentService) pickChatModel(requestModel string) (*llmservice.Client, string) { + if s == nil || s.llmService == nil { + return nil, "pro" + } + return s.llmService.ProClient(), "pro" } // PersistChatHistory 是 Agent 聊天链路唯一的“消息持久化入口”。 @@ -304,7 +307,7 @@ func pushErrNonBlocking(errChan chan error, err error) { // 2) 开启随口记进度推送后,最终判定“非随口记”时回落到普通聊天。 func (s *AgentService) runNormalChatFlow( ctx context.Context, - selectedModel *ark.ChatModel, + selectedModel *llmservice.Client, resolvedModelName string, userMessage string, userPersisted bool, @@ -365,6 +368,12 @@ func (s *AgentService) runNormalChatFlow( } } + // 6.0. 没有可用模型时,直接中止普通聊天,避免写入半截用户消息后没有后续回复。 + if selectedModel == nil { + pushErrNonBlocking(errChan, errors.New("llm client is not ready")) + return + } + // 6. 执行真正的流式聊天。 // fullText 用于后续写 Redis/持久化,outChan 用于把流片段实时推给前端。 fullText, _, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt, userID, chatID) diff --git a/backend/service/agentsvc/agent_meta.go b/backend/service/agentsvc/agent_meta.go index c3ea28f..4122308 100644 --- a/backend/service/agentsvc/agent_meta.go +++ b/backend/service/agentsvc/agent_meta.go @@ -11,10 +11,8 @@ import ( "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/respond" eventsvc "github.com/LoveLosita/smartflow/backend/service/events" - "github.com/cloudwego/eino-ext/components/model/ark" - einoModel "github.com/cloudwego/eino/components/model" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" - arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" ) const ( @@ -253,11 +251,11 @@ func (s *AgentService) generateConversationTitle(ctx context.Context, history [] } // 2. 
标题生成属于结构化短输出,关闭 thinking 并限制 tokens,降低延迟与发散。 - resp, err := modelInst.Generate(ctx, messages, - ark.WithThinking(&arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled}), - einoModel.WithTemperature(0.2), - einoModel.WithMaxTokens(40), - ) + resp, err := modelInst.GenerateText(ctx, messages, llmservice.GenerateOptions{ + Temperature: 0.2, + MaxTokens: 40, + Thinking: llmservice.ThinkingModeDisabled, + }) if err != nil { return "", 0, err } @@ -267,26 +265,26 @@ func (s *AgentService) generateConversationTitle(ctx context.Context, history [] // 2.1 标题链路的 token 从模型响应 usage 中提取;缺失则按 0 处理,不影响主流程。 titleTokens := 0 - if resp.ResponseMeta != nil && resp.ResponseMeta.Usage != nil { + if resp.Usage != nil { titleTokens = normalizeUsageTotal( - resp.ResponseMeta.Usage.TotalTokens, - resp.ResponseMeta.Usage.PromptTokens, - resp.ResponseMeta.Usage.CompletionTokens, + resp.Usage.TotalTokens, + resp.Usage.PromptTokens, + resp.Usage.CompletionTokens, ) } - return normalizeConversationTitle(resp.Content), titleTokens, nil + return normalizeConversationTitle(resp.Text), titleTokens, nil } // pickTitleModel 选择用于标题生成的模型。 // 优先 Lite(成本低、速度快);Lite 不可用时回退 Pro。 -func (s *AgentService) pickTitleModel() *ark.ChatModel { - if s.AIHub == nil { +func (s *AgentService) pickTitleModel() *llmservice.Client { + if s == nil || s.llmService == nil { return nil } - if s.AIHub.Lite != nil { - return s.AIHub.Lite + if client := s.llmService.LiteClient(); client != nil { + return client } - return s.AIHub.Pro + return s.llmService.ProClient() } // buildConversationTitleUserPrompt 把消息历史拼成可读文本供模型总结。 diff --git a/backend/service/agentsvc/agent_newagent.go b/backend/service/agentsvc/agent_newagent.go index 0d56ab4..1a3bdb3 100644 --- a/backend/service/agentsvc/agent_newagent.go +++ b/backend/service/agentsvc/agent_newagent.go @@ -8,7 +8,6 @@ import ( "strings" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv" newagentgraph "github.com/LoveLosita/smartflow/backend/newAgent/graph" newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model" @@ -57,6 +56,11 @@ func (s *AgentService) runNewAgentGraph( errChan chan error, ) { requestCtx, _ := withRequestTokenMeter(ctx) + if s == nil || s.llmService == nil { + // 0. newAgent 主链强依赖 llm-service;装配漏传时直接返回错误,避免 nil receiver panic。 + pushErrNonBlocking(errChan, errors.New("agent llm service is not initialized")) + return + } // 1. 规范会话 ID 和模型选择。 chatID = normalizeConversationID(chatID) @@ -184,14 +188,15 @@ func (s *AgentService) runNewAgentGraph( } graphRequest.Normalize() - // 8. 适配 LLM clients(从 AIHub 的 ark.ChatModel 转换为 newAgent LLM Client)。 + // 8. 适配 LLM clients(统一从 llm-service 取出 newAgent 图所需模型,不再直接碰 AIHub)。 // 8.1 Chat/Deliver 使用 Pro 模型:路由分流、闲聊、交付总结属于标准复杂度。 // 8.2 Plan/Execute 使用 Max 模型:规划和 ReAct 循环需要深度推理能力。 - chatClient := infrallm.WrapArkClient(s.AIHub.Pro) - planClient := infrallm.WrapArkClient(s.AIHub.Max) - executeClient := infrallm.WrapArkClient(s.AIHub.Max) - deliverClient := infrallm.WrapArkClient(s.AIHub.Pro) - summaryClient := infrallm.WrapArkClient(s.AIHub.Lite) + llmClients := s.llmService.NewAgentModelClients() + chatClient := llmClients.Chat + planClient := llmClients.Plan + executeClient := llmClients.Execute + deliverClient := llmClients.Deliver + summaryClient := llmClients.Summary // 9. 
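上面这个 hunk 体现了迁移后的典型调用形状:provider 专属选项(ark.WithThinking / einoModel.WithTemperature)收敛进 `llmservice.GenerateOptions`,usage 统一从 `TextResult` 上读。下面是一个最小示意,`generateTitle` 与其中的 prompt 文本均为假设,参数取值与 generateConversationTitle 保持一致。

```go
package main

import (
	"context"
	"fmt"

	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
	"github.com/cloudwego/eino/schema"
)

func generateTitle(ctx context.Context, client *llmservice.Client) (string, int, error) {
	messages := []*schema.Message{
		schema.SystemMessage("你是标题助手,用不超过 12 个字概括对话。"), // 示意 prompt,非项目原文
		schema.UserMessage("用户: 帮我把下周的课表导入日程……"),
	}

	// 结构化短输出:关闭 thinking、低温、限制 tokens。
	resp, err := client.GenerateText(ctx, messages, llmservice.GenerateOptions{
		Temperature: 0.2,
		MaxTokens:   40,
		Thinking:    llmservice.ThinkingModeDisabled,
	})
	if err != nil {
		return "", 0, err
	}

	tokens := 0
	if resp.Usage != nil {
		tokens = resp.Usage.TotalTokens
	}
	return resp.Text, tokens, nil
}

func main() {
	// 真实 client 由 llm-service 注入;nil client 会在入口被拦截成可读错误。
	_, _, err := generateTitle(context.Background(), nil)
	fmt.Println(err) // llm client is not ready
}
```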
适配 SSE emitter。 sseEmitter := newagentstream.NewSSEPayloadEmitter(outChan) @@ -244,8 +249,8 @@ func (s *AgentService) runNewAgentGraph( log.Printf("[ERROR] newAgent graph 执行失败 trace=%s chat=%s: %v", traceID, chatID, graphErr) pushErrNonBlocking(errChan, fmt.Errorf("graph 执行失败: %w", graphErr)) - // Graph 出错时回退普通聊天,保证可用性。回退使用 Pro 模型。 - s.runNormalChatFlow(requestCtx, s.AIHub.Pro, resolvedModelName, userMessage, true, "", nil, thinkingModeToBool(thinkingMode), userID, chatID, traceID, requestStart, outChan, errChan) + // Graph 出错时回退普通聊天,保证可用性。回退使用 llm-service 的 Pro 模型。 + s.runNormalChatFlow(requestCtx, chatClient, resolvedModelName, userMessage, true, "", nil, thinkingModeToBool(thinkingMode), userID, chatID, traceID, requestStart, outChan, errChan) return } diff --git a/backend/service/agentsvc/agent_stream_fallback.go b/backend/service/agentsvc/agent_stream_fallback.go index 38c5ef6..43be6f6 100644 --- a/backend/service/agentsvc/agent_stream_fallback.go +++ b/backend/service/agentsvc/agent_stream_fallback.go @@ -6,20 +6,18 @@ import ( "strings" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" - "github.com/cloudwego/eino-ext/components/model/ark" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" "github.com/cloudwego/eino/schema" "github.com/google/uuid" - arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" ) // streamChatFallback 是 graph 执行失败时的降级流式聊天。 // 内联了旧 agentchat.StreamChat 的核心逻辑,不再依赖 agent/ 包。 func (s *AgentService) streamChatFallback( ctx context.Context, - llm *ark.ChatModel, + llm *llmservice.Client, modelName string, userInput string, ifThinking bool, @@ -36,13 +34,6 @@ func (s *AgentService) streamChatFallback( } messages = append(messages, schema.UserMessage(userInput)) - var thinking *ark.Thinking - if ifThinking { - thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled} - } else { - thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled} - } - if strings.TrimSpace(modelName) == "" { modelName = "smartflow-worker" } @@ -50,7 +41,11 @@ func (s *AgentService) streamChatFallback( created := time.Now().Unix() firstChunk := true chunkEmitter := newagentstream.NewChunkEmitter(newagentstream.NewSSEPayloadEmitter(outChan), requestID, modelName, created) - chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(infrallm.WrapArkClient(s.AIHub.Lite))) + reasoningSummaryClient := s.llmService.LiteClient() + if reasoningSummaryClient == nil { + reasoningSummaryClient = s.llmService.ProClient() + } + chunkEmitter.SetReasoningSummaryFunc(s.makeReasoningSummaryFunc(reasoningSummaryClient)) chunkEmitter.SetExtraEventHook(func(extra *newagentstream.OpenAIChunkExtra) { s.persistNewAgentTimelineExtraEvent(context.Background(), userID, chatID, extra) }) @@ -75,7 +70,14 @@ func (s *AgentService) streamChatFallback( } var reasoningEndAt *time.Time - reader, err := llm.Stream(ctx, messages, ark.WithThinking(thinking)) + thinkingMode := llmservice.ThinkingModeDisabled + if ifThinking { + thinkingMode = llmservice.ThinkingModeEnabled + } + + reader, err := llm.Stream(ctx, messages, llmservice.GenerateOptions{ + Thinking: thinkingMode, + }) if err != nil { return "", "", 0, nil, err } diff --git a/backend/service/agentsvc/reasoning_summary.go b/backend/service/agentsvc/reasoning_summary.go index b2b1588..d75a7d3 100644 --- 
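streamChatFallback 现在把 ifThinking 翻译成 ThinkingMode、再通过统一 `Client.Stream` 打开流。读流到 EOF 的套路可以抽成下面的最小示意;`drainStream` 为示意函数,非仓库内真实实现,生命周期约定与 emitter 相同:谁打开 reader,谁负责 Close。

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"

	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
	"github.com/cloudwego/eino/schema"
)

// drainStream 把统一 StreamReader 读到 EOF,返回累计全文。
func drainStream(ctx context.Context, llm *llmservice.Client, userInput string, ifThinking bool) (string, error) {
	thinking := llmservice.ThinkingModeDisabled
	if ifThinking {
		thinking = llmservice.ThinkingModeEnabled
	}

	reader, err := llm.Stream(ctx, []*schema.Message{schema.UserMessage(userInput)},
		llmservice.GenerateOptions{Thinking: thinking})
	if err != nil {
		return "", err
	}
	defer reader.Close()

	var full strings.Builder
	for {
		msg, err := reader.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return full.String(), err
		}
		full.WriteString(msg.Content)
	}
	return full.String(), nil
}

func main() {
	// nil client 会在 Stream 入口被拦截,返回可读错误而不是 panic。
	_, err := drainStream(context.Background(), nil, "你好", false)
	fmt.Println(err) // llm stream client is not ready
}
```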
a/backend/service/agentsvc/reasoning_summary.go +++ b/backend/service/agentsvc/reasoning_summary.go @@ -6,9 +6,9 @@ import ( "log" "strings" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt" newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) const reasoningSummaryMaxTokens = 700 @@ -24,7 +24,7 @@ type reasoningSummaryLLMResponse struct { // 1. service 层负责选择模型与 prompt,stream 层只负责调度和闸门; // 2. 这里不持久化摘要,持久化统一走 ChunkEmitter 的 extra hook; // 3. 摘要失败时返回 error,由 ReasoningDigestor 吞掉并等待下一次水位线/Flush 兜底。 -func (s *AgentService) makeReasoningSummaryFunc(client *infrallm.Client) newagentstream.ReasoningSummaryFunc { +func (s *AgentService) makeReasoningSummaryFunc(client *llmservice.Client) newagentstream.ReasoningSummaryFunc { if client == nil { return nil } @@ -47,14 +47,14 @@ func (s *AgentService) makeReasoningSummaryFunc(client *infrallm.Client) newagen DurationSeconds: input.DurationSeconds, }) - resp, rawResult, err := infrallm.GenerateJSON[reasoningSummaryLLMResponse]( + resp, rawResult, err := llmservice.GenerateJSON[reasoningSummaryLLMResponse]( ctx, client, messages, - infrallm.GenerateOptions{ + llmservice.GenerateOptions{ Temperature: 0.1, MaxTokens: reasoningSummaryMaxTokens, - Thinking: infrallm.ThinkingModeDisabled, + Thinking: llmservice.ThinkingModeDisabled, Metadata: map[string]any{ "stage": "reasoning_summary", "candidate_seq": input.CandidateSeq, @@ -99,7 +99,7 @@ func limitReasoningDetailSummary(text string, maxRunes int) string { return string(runes[:maxRunes]) } -func truncateReasoningSummaryRaw(raw *infrallm.TextResult) string { +func truncateReasoningSummaryRaw(raw *llmservice.TextResult) string { if raw == nil { return "" } diff --git a/backend/service/course.go b/backend/service/course.go index d7cd299..9dd87d6 100644 --- a/backend/service/course.go +++ b/backend/service/course.go @@ -6,16 +6,16 @@ import ( "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/respond" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) type CourseService struct { // 伸出手:准备接住 DAO courseDAO *dao.CourseDAO scheduleDAO *dao.ScheduleDAO - courseImageResponsesClient *infrallm.ArkResponsesClient + courseImageResponsesClient *llmservice.ArkResponsesClient courseImageConfig CourseImageParseConfig courseImageModel string } @@ -24,7 +24,7 @@ type CourseService struct { func NewCourseService( courseDAO *dao.CourseDAO, scheduleDAO *dao.ScheduleDAO, - courseImageResponsesClient *infrallm.ArkResponsesClient, + courseImageResponsesClient *llmservice.ArkResponsesClient, courseImageConfig CourseImageParseConfig, courseImageModel string, ) *CourseService { diff --git a/backend/service/course_parse_ark.go b/backend/service/course_parse_ark.go index 5fc338f..45ea9c7 100644 --- a/backend/service/course_parse_ark.go +++ b/backend/service/course_parse_ark.go @@ -8,16 +8,20 @@ import ( "strings" "time" - infrallm "github.com/LoveLosita/smartflow/backend/infra/llm" "github.com/LoveLosita/smartflow/backend/model" + llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) // ParseCourseTableImage 使用 Ark SDK Responses 解析课程表图片。 func (ss *CourseService) ParseCourseTableImage(ctx context.Context, req model.CourseImageParseRequest) 
(*model.CourseImageParseResponse, error) { if ss == nil || ss.courseImageResponsesClient == nil { + modelName := "" + if ss != nil { + modelName = ss.courseImageModel + } log.Printf( "[COURSE_PARSE][SERVICE] parser unavailable model_name=%q filename=%q mime=%q bytes=%d", - ss.courseImageModel, + modelName, req.Filename, req.MIMEType, len(req.ImageBytes), @@ -57,7 +61,7 @@ func (ss *CourseService) ParseCourseTableImage(ctx context.Context, req model.Co base64Chars, promptChars, base64Chars+promptChars+len(strings.TrimSpace(courseImageParseSystemPrompt)), - infrallm.ThinkingModeDisabled, + llmservice.ThinkingModeDisabled, courseImageParseTemperature, ss.courseImageConfig.MaxTokens, "json_object", @@ -66,10 +70,10 @@ func (ss *CourseService) ParseCourseTableImage(ctx context.Context, req model.Co // 1. 课程表图片识别输出体量大,显式透传 max_output_tokens,避免被默认值截断。 // 2. text_format 固定为 json_object,降低输出混入解释文本导致解析失败的概率。 // 3. thinking 显式关闭,优先保证课程导入链路稳定性。 - draft, rawResult, err := infrallm.GenerateArkResponsesJSON[model.CourseImageParseResponse](ctx, ss.courseImageResponsesClient, messages, infrallm.ArkResponsesOptions{ + draft, rawResult, err := llmservice.GenerateArkResponsesJSON[model.CourseImageParseResponse](ctx, ss.courseImageResponsesClient, messages, llmservice.ArkResponsesOptions{ Temperature: courseImageParseTemperature, MaxOutputTokens: ss.courseImageConfig.MaxTokens, - Thinking: infrallm.ThinkingModeDisabled, + Thinking: llmservice.ThinkingModeDisabled, TextFormat: "json_object", }) if err != nil { @@ -188,12 +192,12 @@ func (ss *CourseService) ParseCourseTableImage(ctx context.Context, req model.Co return normalizedDraft, nil } -func buildCourseImageParseResponsesMessages(req *model.CourseImageParseRequest) ([]infrallm.ArkResponsesMessage, int, int) { +func buildCourseImageParseResponsesMessages(req *model.CourseImageParseRequest) ([]llmservice.ArkResponsesMessage, int, int) { userPrompt := fmt.Sprintf(courseImageParseUserPromptTemplate, req.Filename, req.MIMEType) base64Data := base64.StdEncoding.EncodeToString(req.ImageBytes) imageDataURL := fmt.Sprintf("data:%s;base64,%s", req.MIMEType, base64Data) - messages := []infrallm.ArkResponsesMessage{ + messages := []llmservice.ArkResponsesMessage{ { Role: "system", Text: strings.TrimSpace(courseImageParseSystemPrompt), @@ -208,7 +212,7 @@ func buildCourseImageParseResponsesMessages(req *model.CourseImageParseRequest) return messages, len(base64Data), len(strings.TrimSpace(userPrompt)) } -func isCourseImageOutputTruncated(rawResult *infrallm.ArkResponsesResult) bool { +func isCourseImageOutputTruncated(rawResult *llmservice.ArkResponsesResult) bool { if rawResult == nil { return false } diff --git a/backend/infra/llm/ark.go b/backend/services/llm/ark.go similarity index 73% rename from backend/infra/llm/ark.go rename to backend/services/llm/ark.go index 6ee6c14..a2e7bbb 100644 --- a/backend/infra/llm/ark.go +++ b/backend/services/llm/ark.go @@ -1,7 +1,3 @@ -// 过渡期统一 Ark 调用封装。 -// -// 这里保留 CallArkText / CallArkJSON,方便暂时还直接持有 *ark.ChatModel 的调用点 -// 逐步迁移到统一 Client。后续 memory 也可以直接复用这套中立层。 package llm import ( @@ -15,12 +11,7 @@ import ( arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" ) -// ArkCallOptions 是基于 ark.ChatModel 的通用调用选项。 -// -// 设计目的: -// 1. 先把 Ark 调用样板抽成公共层; -// 2. 再由 WrapArkClient 提供统一 Client; -// 3. 
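ParseCourseTableImage 的关键调用形状可以抽成下面的最小示意:先构造多模态 `ArkResponsesMessage`,再用泛型 `GenerateArkResponsesJSON` 直接拿到业务结构体。`parseImage`、`courseDraft` 及其字段、prompt 文本、8000 的输出预算均为示意假设;真实链路解析到 model.CourseImageParseResponse。

```go
package main

import (
	"context"
	"fmt"

	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
)

// courseDraft 是示意用的目标结构。
type courseDraft struct {
	Courses []struct {
		Name    string `json:"name"`
		Weekday int    `json:"weekday"`
	} `json:"courses"`
}

func parseImage(ctx context.Context, client *llmservice.ArkResponsesClient, imageDataURL string) (*courseDraft, error) {
	messages := []llmservice.ArkResponsesMessage{
		{Role: "system", Text: "只输出 JSON,不要解释文字。"}, // 示意 prompt
		{Role: "user", Text: "解析这张课程表图片", ImageDataURL: imageDataURL},
	}

	// 与课程导入链路相同的取舍:关闭 thinking、固定 json_object、显式放大输出预算。
	draft, _, err := llmservice.GenerateArkResponsesJSON[courseDraft](ctx, client, messages, llmservice.ArkResponsesOptions{
		Temperature:     0.1,
		MaxOutputTokens: 8000,
		Thinking:        llmservice.ThinkingModeDisabled,
		TextFormat:      "json_object",
	})
	if err != nil {
		return nil, err
	}
	return draft, nil
}

func main() {
	// client 为 nil 时在入口被拦截:"ark responses client is not ready"。
	_, err := parseImage(context.Background(), nil, "data:image/png;base64,...")
	fmt.Println(err)
}
```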
让上层尽量只关注业务 prompt 和结构化结果。 +// ArkCallOptions 是直接调用 ark.ChatModel 时使用的通用入参。 type ArkCallOptions struct { Temperature float64 MaxTokens int @@ -28,12 +19,6 @@ type ArkCallOptions struct { } // CallArkText 调用 ark 模型并返回纯文本。 -// -// 职责边界: -// 1. 负责拼 system + user 两段消息; -// 2. 负责统一配置 thinking / temperature / maxTokens; -// 3. 负责拦截空响应; -// 4. 不负责 JSON 解析,不负责业务字段校验。 func CallArkText(ctx context.Context, chatModel *ark.ChatModel, systemPrompt, userPrompt string, options ArkCallOptions) (string, error) { if chatModel == nil { return "", errors.New("ark model is nil") @@ -76,6 +61,7 @@ func buildArkOptions(options ArkCallOptions) []einoModel.Option { if options.Thinking == ThinkingModeEnabled { thinkingType = arkModel.ThinkingTypeEnabled } + opts := []einoModel.Option{ ark.WithThinking(&arkModel.Thinking{Type: thinkingType}), einoModel.WithTemperature(float32(options.Temperature)), diff --git a/backend/infra/llm/ark_adapter.go b/backend/services/llm/ark_adapter.go similarity index 68% rename from backend/infra/llm/ark_adapter.go rename to backend/services/llm/ark_adapter.go index 3234501..9d2d28b 100644 --- a/backend/infra/llm/ark_adapter.go +++ b/backend/services/llm/ark_adapter.go @@ -12,17 +12,14 @@ import ( ) // WrapArkClient 将 ark.ChatModel 适配为统一 Client。 -// -// 职责边界: -// 1. generateText:调用 ark.ChatModel.Generate(非流式),供 GenerateJSON 使用; -// 2. streamText:调用 ark.ChatModel.Stream(流式),供需要流式输出的场景使用; -// 3. 两者共用同一套 options 转换。 +// 1. generateText 走 Generate,供 GenerateJSON/GenerateText 使用。 +// 2. streamText 走 Stream,供需要流式输出的场景使用。 +// 3. 两条路径共用同一套参数转换逻辑。 func WrapArkClient(arkChatModel *ark.ChatModel) *Client { if arkChatModel == nil { return nil } - // 非流式文本生成,供 GenerateJSON / GenerateText 调用路径使用。 generateFunc := func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) { arkOpts := buildArkStreamOptions(options) msg, err := arkChatModel.Generate(ctx, messages, arkOpts...) @@ -47,7 +44,6 @@ func WrapArkClient(arkChatModel *ark.ChatModel) *Client { }, nil } - // 流式文本生成。 streamFunc := func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) { arkOpts := buildArkStreamOptions(options) reader, err := arkChatModel.Stream(ctx, messages, arkOpts...) 
@@ -60,11 +56,10 @@ func WrapArkClient(arkChatModel *ark.ChatModel) *Client { return NewClient(generateFunc, streamFunc) } -// buildArkStreamOptions 将统一 GenerateOptions 转换为 ark 的流式调用选项。 +// buildArkStreamOptions 将统一的 GenerateOptions 转换为 ark 的流式调用参数。 func buildArkStreamOptions(options GenerateOptions) []einoModel.Option { thinkingEnabled := options.Thinking == ThinkingModeEnabled - // Thinking thinkingType := arkModel.ThinkingTypeDisabled if thinkingEnabled { thinkingType = arkModel.ThinkingTypeEnabled @@ -73,16 +68,12 @@ func buildArkStreamOptions(options GenerateOptions) []einoModel.Option { ark.WithThinking(&arkModel.Thinking{Type: thinkingType}), } - // Temperature:thinking 模型强制要求 temperature=1,否则 API 静默忽略 thinking。 if thinkingEnabled { opts = append(opts, einoModel.WithTemperature(1.0)) } else if options.Temperature > 0 { opts = append(opts, einoModel.WithTemperature(float32(options.Temperature))) } - // MaxTokens:thinking 模式下 thinking token 占用 max_tokens 预算, - // 调用方设定的值仅代表"期望输出长度",实际预算需留出思考空间。 - // 最低保障 16000,避免思考链被截断导致输出为空或非 JSON。 maxTokens := options.MaxTokens if thinkingEnabled { const minThinkingBudget = 16000 @@ -97,14 +88,12 @@ func buildArkStreamOptions(options GenerateOptions) []einoModel.Option { return opts } -// arkStreamReaderAdapter 适配 ark.ChatModel.Stream 返回的 reader。 -// ark.Stream 返回 schema.StreamReader[*schema.Message],其 Close() 方法无返回值 -// 而我们的 StreamReader 接口要求 Close() error +// arkStreamReaderAdapter 把 ark 的流式 reader 转成统一的 StreamReader 接口。 type arkStreamReaderAdapter struct { reader *schema.StreamReader[*schema.Message] } -// Recv 转发到 ark reader 的 Recv 方法。 +// Recv 转发到底层 reader。 func (r *arkStreamReaderAdapter) Recv() (*schema.Message, error) { if r == nil || r.reader == nil { return nil, io.EOF @@ -112,8 +101,7 @@ func (r *arkStreamReaderAdapter) Recv() (*schema.Message, error) { return r.reader.Recv() } -// Close 转发到 ark reader 的 Close 方法。 -// ark 的 Close() 无返回值,我们适配为返回 nil +// Close 适配 ark reader 的 Close 行为。 func (r *arkStreamReaderAdapter) Close() error { if r == nil || r.reader == nil { return nil diff --git a/backend/infra/llm/ark_responses_client.go b/backend/services/llm/ark_responses_client.go similarity index 93% rename from backend/infra/llm/ark_responses_client.go rename to backend/services/llm/ark_responses_client.go index a82aed6..7f7eede 100644 --- a/backend/infra/llm/ark_responses_client.go +++ b/backend/services/llm/ark_responses_client.go @@ -11,11 +11,6 @@ import ( ) // ArkResponsesMessage 描述一次 Responses 输入消息。 -// -// 职责边界: -// 1. 负责表达角色与多模态内容(文本/图片); -// 2. 不负责业务 prompt 生成; -// 3. 不负责输出 JSON 的字段校验。 type ArkResponsesMessage struct { Role string Text string @@ -23,7 +18,7 @@ type ArkResponsesMessage struct { ImageDetail string } -// ArkResponsesOptions 描述 Responses 生成选项。 +// ArkResponsesOptions 描述 Responses 调用参数。 type ArkResponsesOptions struct { Model string Temperature float64 @@ -32,14 +27,14 @@ type ArkResponsesOptions struct { TextFormat string } -// ArkResponsesUsage 统一透传 token 使用量。 +// ArkResponsesUsage 统一转写 token usage。 type ArkResponsesUsage struct { InputTokens int64 OutputTokens int64 TotalTokens int64 } -// ArkResponsesResult 是 Ark Responses 的统一输出结构。 +// ArkResponsesResult 是 Responses 调用的统一输出结构。 type ArkResponsesResult struct { Text string Status string @@ -56,11 +51,9 @@ type ArkResponsesClient struct { } // NewArkResponsesClient 创建 Ark SDK Responses 客户端。 -// -// 说明: -// 1. model 为空时返回 nil,表示当前能力未启用; -// 2. baseURL 为空时使用 SDK 默认地址; -// 3. 仅负责客户端创建,不做连通性探测。 +// 1. model 为空时直接返回 nil,表示这条能力没有启用。 +// 2. baseURL 为空时使用 SDK 默认地址。 +// 3. 
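上面的 hunk 删掉了注释但保留了行为:thinking 开启时 temperature 强制钉在 1.0(被删注释指出否则 API 会静默忽略 thinking),且 max_tokens 兜底抬到 16000,避免思考链被截断。下面按其注释语义复刻一个独立示意;`effectiveParams` 为示意函数,兜底分支按被删注释推断,真实实现以 backend/services/llm/ark_adapter.go 为准。

```go
package main

import "fmt"

// effectiveParams 按 buildArkStreamOptions 的规则推导最终下发给 ark 的参数。
func effectiveParams(thinkingEnabled bool, temperature float64, maxTokens int) (float64, int) {
	const minThinkingBudget = 16000

	if thinkingEnabled {
		// thinking 模型要求 temperature=1,否则 thinking 配置会被 API 静默忽略。
		temperature = 1.0
		// thinking token 占用 max_tokens 预算,调用方设定值只代表期望输出长度,
		// 这里兜底留足思考空间,避免输出为空或非 JSON。
		if maxTokens < minThinkingBudget {
			maxTokens = minThinkingBudget
		}
	}
	return temperature, maxTokens
}

func main() {
	t, m := effectiveParams(true, 0.2, 700)
	fmt.Println(t, m) // 1 16000:调用方的 0.2/700 被 thinking 规则覆盖

	t, m = effectiveParams(false, 0.2, 700)
	fmt.Println(t, m) // 0.2 700:非 thinking 模式原样透传
}
```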
这里只负责本地构造,不做连通性探测。 func NewArkResponsesClient(apiKey string, baseURL string, model string) *ArkResponsesClient { model = strings.TrimSpace(model) if model == "" { @@ -104,7 +97,7 @@ func (c *ArkResponsesClient) GenerateText(ctx context.Context, messages []ArkRes return result, nil } -// GenerateArkResponsesJSON 先调用 Responses,再解析为 JSON 结构体。 +// GenerateArkResponsesJSON 先调用 Responses,再解析成 JSON 结构体。 func GenerateArkResponsesJSON[T any](ctx context.Context, client *ArkResponsesClient, messages []ArkResponsesMessage, options ArkResponsesOptions) (*T, *ArkResponsesResult, error) { if client == nil { return nil, nil, errors.New("ark responses client is not ready") diff --git a/backend/infra/llm/client.go b/backend/services/llm/client.go similarity index 56% rename from backend/infra/llm/client.go rename to backend/services/llm/client.go index 4536971..2abedac 100644 --- a/backend/infra/llm/client.go +++ b/backend/services/llm/client.go @@ -9,12 +9,7 @@ import ( "github.com/cloudwego/eino/schema" ) -// ThinkingMode 描述本次模型调用对 thinking 的期望。 -// -// 职责边界: -// 1. 这里只表达“调用方希望怎样配置推理模式”; -// 2. 不直接绑定某个具体模型厂商的参数枚举; -// 3. 真正如何把它翻译成 ark / OpenAI / 其他 provider 的 option,由后续适配层负责。 +// ThinkingMode 描述这次模型调用对 thinking 的期望。 type ThinkingMode string const ( @@ -23,12 +18,7 @@ const ( ThinkingModeDisabled ThinkingMode = "disabled" ) -// GenerateOptions 是统一模型调用选项。 -// -// 设计目的: -// 1. 先把“每个 skill / worker 都会反复传的参数”收敛成一份结构; -// 2. 让上层以后只表达“我要什么”,不再自己重复组织 option; -// 3. 暂时不追求覆盖所有 provider 参数,先把最常用的几个公共位抽出来。 +// GenerateOptions 统一收敛文本调用时最常见的公共参数。 type GenerateOptions struct { Temperature float64 MaxTokens int @@ -36,40 +26,32 @@ type GenerateOptions struct { Metadata map[string]any } -// TextResult 是统一文本生成结果。 -// -// 职责边界: -// 1. Text 保存模型最终返回的纯文本; -// 2. Usage 保存本次调用的 token 使用量,供后续统一统计; -// 3. 不负责 JSON 解析,不负责业务字段映射。 +// TextResult 保存一次文本生成的最终结果和 usage。 +// 1. Text 存放模型返回的纯文本。 +// 2. Usage 方便上层做统一统计。 +// 3. 这里不负责 JSON 解析,也不负责业务字段映射。 type TextResult struct { - Text string - Usage *schema.TokenUsage - // FinishReason 透传 provider 的停止原因,便于上层判断是否因 length 等原因被截断。 + Text string + Usage *schema.TokenUsage FinishReason string } -// StreamReader 抽象了“可逐块 Recv 的流式返回器”。 -// -// 之所以不直接依赖某个具体 SDK 的 reader 类型,是因为现在还处在骨架收敛阶段, -// 后续接 ark、OpenAI 兼容层还是别的 provider,都可以往这个最小接口上适配。 +// StreamReader 抽象可以逐块读取消息的流式返回器。 type StreamReader interface { Recv() (*schema.Message, error) Close() error } -// TextGenerateFunc 是文本生成的统一适配函数签名。 +// TextGenerateFunc 定义统一文本生成函数签名。 type TextGenerateFunc func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) -// StreamGenerateFunc 是流式生成的统一适配函数签名。 +// StreamGenerateFunc 定义统一流式生成函数签名。 type StreamGenerateFunc func(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) // Client 是统一模型客户端门面。 -// -// 职责边界: -// 1. 负责把调用方的“模型调用意图”收敛到统一入口; -// 2. 负责统一参数校验、空响应防御、GenerateJSON 复用; -// 3. 不负责写 prompt,不负责业务 fallback,也不直接持有具体厂商 SDK 细节。 +// 1. 只做最小输入校验和空响应防御。 +// 2. 不负责 prompt 拼装,也不负责业务 fallback。 +// 3. 具体 provider 的细节由上层适配器收敛进来。 type Client struct { generateText TextGenerateFunc streamText StreamGenerateFunc @@ -84,11 +66,6 @@ func NewClient(generateText TextGenerateFunc, streamText StreamGenerateFunc) *Cl } // GenerateText 执行一次统一文本生成。 -// -// 职责边界: -// 1. 负责做最小必要的入参校验; -// 2. 负责统一拦截“模型空响应”这类公共问题; -// 3. 
不负责业务 prompt 拼接,也不负责把文本再映射成业务结构。 func (c *Client) GenerateText(ctx context.Context, messages []*schema.Message, options GenerateOptions) (*TextResult, error) { if c == nil || c.generateText == nil { return nil, errors.New("llm client is not ready") @@ -111,11 +88,6 @@ func (c *Client) GenerateText(ctx context.Context, messages []*schema.Message, o } // GenerateJSON 先走统一文本生成,再走统一 JSON 解析。 -// -// 设计说明: -// 1. 把“Generate -> 提取 JSON -> 反序列化”这段公共链路收敛起来; -// 2. 上层只关心业务结构,不需要重复实现解析样板; -// 3. 返回 parsed + rawResult,方便打点与回退时保留原文。 func GenerateJSON[T any](ctx context.Context, client *Client, messages []*schema.Message, options GenerateOptions) (*T, *TextResult, error) { result, err := client.GenerateText(ctx, messages, options) if err != nil { @@ -130,11 +102,6 @@ func GenerateJSON[T any](ctx context.Context, client *Client, messages []*schema } // Stream 打开统一流式调用入口。 -// -// 职责边界: -// 1. 只负责把“流式生成能力”暴露给上层; -// 2. 不负责 chunk 到 OpenAI 协议的转换,那部分应放在 stream/; -// 3. 不负责累计全文,也不负责 token 统计落库。 func (c *Client) Stream(ctx context.Context, messages []*schema.Message, options GenerateOptions) (StreamReader, error) { if c == nil || c.streamText == nil { return nil, errors.New("llm stream client is not ready") @@ -145,12 +112,7 @@ func (c *Client) Stream(ctx context.Context, messages []*schema.Message, options return c.streamText(ctx, messages, options) } -// BuildSystemUserMessages 构造最常见的“system + history + user”消息列表。 -// -// 设计说明: -// 1. 先把最稳定的消息编排方式沉淀下来,减少各业务域样板代码; -// 2. 只做消息切片装配,不做 prompt 生成; -// 3. 供 agent / memory 等多个能力域复用。 +// BuildSystemUserMessages 构造最常见的 system + history + user 消息列表。 func BuildSystemUserMessages(systemPrompt string, history []*schema.Message, userPrompt string) []*schema.Message { messages := make([]*schema.Message, 0, len(history)+2) if strings.TrimSpace(systemPrompt) != "" { @@ -165,7 +127,7 @@ func BuildSystemUserMessages(systemPrompt string, history []*schema.Message, use return messages } -// CloneUsage 深拷贝 token usage,避免后续多处累加时共享同一指针。 +// CloneUsage 深拷贝 token usage,避免后续累加时共享同一个指针。 func CloneUsage(usage *schema.TokenUsage) *schema.TokenUsage { if usage == nil { return nil @@ -174,12 +136,7 @@ func CloneUsage(usage *schema.TokenUsage) *schema.TokenUsage { return &copied } -// MergeUsage 合并两段 usage。 -// -// 合并策略: -// 1. 对“同一次调用不同流分片”的场景,取更大值作为最终值; -// 2. 对“多次独立调用累计”的场景,应由上层显式做加法,而不是用这个函数; -// 3. 该函数只适用于“同一次调用的分块 usage 收敛”。 +// MergeUsage 合并两段 usage,取各字段更大的值作为累计结果。 func MergeUsage(base *schema.TokenUsage, incoming *schema.TokenUsage) *schema.TokenUsage { if incoming == nil { return CloneUsage(base) @@ -207,7 +164,7 @@ func MergeUsage(base *schema.TokenUsage, incoming *schema.TokenUsage) *schema.To return &merged } -// FormatEmptyResponseError 统一生成“模型返回空结果”的错误文案。 +// FormatEmptyResponseError 统一模型空结果的错误文案。 func FormatEmptyResponseError(scene string) error { scene = strings.TrimSpace(scene) if scene == "" { diff --git a/backend/infra/llm/json.go b/backend/services/llm/json.go similarity index 67% rename from backend/infra/llm/json.go rename to backend/services/llm/json.go index 2d029ed..c402f6c 100644 --- a/backend/infra/llm/json.go +++ b/backend/services/llm/json.go @@ -7,12 +7,10 @@ import ( "strings" ) -// ParseJSONObject 解析模型返回中的 JSON 对象。 -// -// 职责边界: -// 1. 负责处理“模型输出前后夹杂解释文字 / markdown 代码块”的常见情况; -// 2. 负责提取最外层 JSON object 并反序列化为目标结构; -// 3. 不负责业务字段合法性校验,应由上层调用方自行校验。 +// ParseJSONObject 解析模型返回内容中的 JSON 对象。 +// 1. 先剥离常见的 markdown 代码块包装。 +// 2. 再从混合文本里提取最外层 JSON 对象。 +// 3. 
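client.go 把"Generate → 提取 JSON → 反序列化"收敛成 `GenerateJSON`,配合 `BuildSystemUserMessages` 之后,一个典型调用点大致如下。`summarize`、`summaryOut` 及 prompt 均为示意假设;reasoning_summary.go 中对应的真实结构是 reasoningSummaryLLMResponse。

```go
package main

import (
	"context"
	"fmt"

	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
)

// summaryOut 是示意用的业务结构。
type summaryOut struct {
	Summary string `json:"summary"`
}

func summarize(ctx context.Context, client *llmservice.Client, text string) (*summaryOut, error) {
	// system + history + user 的标准编排,history 为空时直接传 nil。
	messages := llmservice.BuildSystemUserMessages(
		"把输入压缩成一句话摘要,只输出 JSON。", // 示意 prompt
		nil,
		text,
	)

	parsed, raw, err := llmservice.GenerateJSON[summaryOut](ctx, client, messages, llmservice.GenerateOptions{
		Temperature: 0.1,
		MaxTokens:   700,
		Thinking:    llmservice.ThinkingModeDisabled,
	})
	if err != nil {
		// raw 携带原文与 FinishReason,便于打点与判断截断;失败时按需记录。
		_ = raw
		return nil, err
	}
	return parsed, nil
}

func main() {
	_, err := summarize(context.Background(), nil, "……")
	fmt.Println(err) // llm client is not ready
}
```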
这里只负责结构解析,不负责字段合法性校验。 func ParseJSONObject[T any](raw string) (*T, error) { clean := strings.TrimSpace(raw) if clean == "" { @@ -31,12 +29,7 @@ func ParseJSONObject[T any](raw string) (*T, error) { return &out, nil } -// ExtractJSONObject 从混合文本里提取第一个完整 JSON 对象。 -// -// 设计说明: -// 1. LLM 很容易输出“这里是结果:{...}”这种半结构化文本; -// 2. 这里用括号计数而不是正则,避免嵌套对象一多就误截断; -// 3. 目前只提取 object,不提取 array,因为当前契约基本都是对象。 +// ExtractJSONObject 从混合文本中提取第一个完整的 JSON 对象。 func ExtractJSONObject(text string) string { clean := trimMarkdownCodeFence(strings.TrimSpace(text)) if clean == "" { @@ -94,9 +87,6 @@ func trimMarkdownCodeFence(text string) string { return trimmed } - // 1. 去掉首行 ```json / ```; - // 2. 若末行是 ```,一并去掉; - // 3. 中间正文保持原样,避免破坏 JSON 的换行结构。 body := lines[1:] if len(body) > 0 && strings.TrimSpace(body[len(body)-1]) == "```" { body = body[:len(body)-1] diff --git a/backend/services/llm/service.go b/backend/services/llm/service.go new file mode 100644 index 0000000..8936c06 --- /dev/null +++ b/backend/services/llm/service.go @@ -0,0 +1,109 @@ +package llm + +import ( + "strings" + + "github.com/LoveLosita/smartflow/backend/inits" +) + +// Service 只负责统一暴露已经构造好的模型客户端,不负责 prompt 和业务编排。 +type Service struct { + liteClient *Client + proClient *Client + maxClient *Client + courseImageResponsesClient *ArkResponsesClient +} + +// Options 描述 llm-service 初始化时需要接管的启动期依赖。 +// 1. AIHub 仍然是当前进程内 Ark ChatModel 的来源,但服务层只保存统一 Client。 +// 2. CourseImageResponsesClient 允许外部预先注入,便于测试或特殊启动路径复用。 +// 3. 某个字段为空时不报错,直接保留 nil,交给上层继续走兼容降级。 +type Options struct { + AIHub *inits.AIHub + APIKey string + BaseURL string + CourseVisionModel string + CourseImageResponsesClient *ArkResponsesClient +} + +// AgentModelClients 一次性暴露 newAgent 图常用的模型分配结果。 +type AgentModelClients struct { + Chat *Client + Plan *Client + Execute *Client + Deliver *Client + Summary *Client +} + +// New 构造 llm-service。 +// 1. 不返回 error,是为了让上层继续按 nil 客户端做逐步降级。 +// 2. 只要 AIHub 已初始化,就把其中的 ChatModel 收敛成统一 Client。 +// 3. 
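json.go 这条解析链路(剥 markdown 围栏 → 括号计数提取 object → 反序列化)的行为可以用下面的最小示意验证,三类输入对应注释里点名的常见"脏输出";`reply` 为示意结构,行为按其注释描述推断。

```go
package main

import (
	"fmt"

	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
)

type reply struct {
	OK bool `json:"ok"`
}

func main() {
	// 模型常见的三类"脏输出",ParseJSONObject 都应兜住:
	inputs := []string{
		`{"ok": true}`,                     // 干净 JSON
		"```json\n{\"ok\": true}\n```",     // markdown 代码块包装
		`这里是结果:{"ok": true},请查收。`, // 前后夹杂解释文字
	}
	for _, raw := range inputs {
		out, err := llmservice.ParseJSONObject[reply](raw)
		fmt.Println(out, err)
	}
}
```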
课程图片解析客户端在这里统一构建,避免业务层直接依赖 Responses SDK。 +func New(opts Options) *Service { + svc := &Service{} + + if opts.AIHub != nil { + svc.liteClient = WrapArkClient(opts.AIHub.Lite) + svc.proClient = WrapArkClient(opts.AIHub.Pro) + svc.maxClient = WrapArkClient(opts.AIHub.Max) + } + + if opts.CourseImageResponsesClient != nil { + svc.courseImageResponsesClient = opts.CourseImageResponsesClient + } else { + apiKey := strings.TrimSpace(opts.APIKey) + baseURL := strings.TrimSpace(opts.BaseURL) + model := strings.TrimSpace(opts.CourseVisionModel) + if apiKey != "" && model != "" { + svc.courseImageResponsesClient = NewArkResponsesClient(apiKey, baseURL, model) + } + } + + return svc +} + +// LiteClient 返回低成本短输出模型客户端。 +func (s *Service) LiteClient() *Client { + if s == nil { + return nil + } + return s.liteClient +} + +// ProClient 返回默认复杂对话模型客户端。 +func (s *Service) ProClient() *Client { + if s == nil { + return nil + } + return s.proClient +} + +// MaxClient 返回深度推理模型客户端。 +func (s *Service) MaxClient() *Client { + if s == nil { + return nil + } + return s.maxClient +} + +// CourseImageResponsesClient 返回课程图片解析所用的 Responses 客户端。 +func (s *Service) CourseImageResponsesClient() *ArkResponsesClient { + if s == nil { + return nil + } + return s.courseImageResponsesClient +} + +// NewAgentModelClients 一次性返回 newAgent 图里常用的模型分配。 +func (s *Service) NewAgentModelClients() AgentModelClients { + if s == nil { + return AgentModelClients{} + } + return AgentModelClients{ + Chat: s.ProClient(), + Plan: s.MaxClient(), + Execute: s.MaxClient(), + Deliver: s.ProClient(), + Summary: s.LiteClient(), + } +} diff --git a/backend/infra/rag/service.go b/backend/services/rag/api.go similarity index 97% rename from backend/infra/rag/service.go rename to backend/services/rag/api.go index f8bce78..1cdfca9 100644 --- a/backend/infra/rag/service.go +++ b/backend/services/rag/api.go @@ -5,7 +5,7 @@ import ( "time" ) -// Runtime 是 RAG Infra 对业务侧暴露的唯一稳定方法面。 +// Runtime 是 RAG service 对业务侧暴露的唯一稳定方法面。 // // 职责边界: // 1. 
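service.go 的装配意图可以用下面的启动侧示意串起来:AIHub 仍是进程内模型来源,服务层只暴露统一 Client,字段为空不报错、由上层按 nil 逐步降级。start.go 的真实装配细节不在本节选内,这里仅按 Options 的定义做假设性示意。

```go
package main

import (
	"fmt"

	"github.com/LoveLosita/smartflow/backend/inits"
	llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
)

func main() {
	var hub *inits.AIHub // 真实进程里由启动流程初始化;nil 时下面全部走降级

	svc := llmservice.New(llmservice.Options{
		AIHub:             hub,
		APIKey:            "", // 为空则课程图片解析客户端保持 nil
		BaseURL:           "",
		CourseVisionModel: "",
	})

	// newAgent 图一次性取走全部模型分配:Chat/Deliver=Pro,Plan/Execute=Max,Summary=Lite。
	clients := svc.NewAgentModelClients()
	fmt.Println(clients.Chat == nil, clients.Plan == nil) // hub 为 nil 时均为 nil

	// 标题等短输出链路:Lite 优先,缺失回退 Pro(与 pickTitleModel 一致)。
	title := svc.LiteClient()
	if title == nil {
		title = svc.ProClient()
	}
	_ = title
}
```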
负责承接 memory/web 两类语料的统一入库与检索入口; diff --git a/backend/infra/rag/chunk/text_chunker.go b/backend/services/rag/chunk/text_chunker.go similarity index 96% rename from backend/infra/rag/chunk/text_chunker.go rename to backend/services/rag/chunk/text_chunker.go index 39ed133..fb61c5d 100644 --- a/backend/infra/rag/chunk/text_chunker.go +++ b/backend/services/rag/chunk/text_chunker.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // TextChunker 是默认文本切块器。 diff --git a/backend/infra/rag/config/config.go b/backend/services/rag/config/config.go similarity index 100% rename from backend/infra/rag/config/config.go rename to backend/services/rag/config/config.go diff --git a/backend/infra/rag/core/errors.go b/backend/services/rag/core/errors.go similarity index 100% rename from backend/infra/rag/core/errors.go rename to backend/services/rag/core/errors.go diff --git a/backend/infra/rag/core/interfaces.go b/backend/services/rag/core/interfaces.go similarity index 100% rename from backend/infra/rag/core/interfaces.go rename to backend/services/rag/core/interfaces.go diff --git a/backend/infra/rag/core/observer.go b/backend/services/rag/core/observer.go similarity index 97% rename from backend/infra/rag/core/observer.go rename to backend/services/rag/core/observer.go index d96d4fd..c00c31e 100644 --- a/backend/infra/rag/core/observer.go +++ b/backend/services/rag/core/observer.go @@ -21,7 +21,7 @@ const ( // ObserveEvent 描述一次统一观测事件。 // // 职责边界: -// 1. 只承载 RAG Infra 的结构化运行信息; +// 1. 只承载 RAG service 的结构化运行信息; // 2. 不绑定具体日志系统、指标系统或 tracing 实现; // 3. 字段内容应尽量稳定,便于后续统一接入全局观测平台。 type ObserveEvent struct { @@ -31,7 +31,7 @@ type ObserveEvent struct { Fields map[string]any } -// Observer 是 RAG Infra 的最小观测接口。 +// Observer 是 RAG service 的最小观测接口。 // // 职责边界: // 1. 
负责消费结构化事件; diff --git a/backend/infra/rag/core/pipeline.go b/backend/services/rag/core/pipeline.go similarity index 100% rename from backend/infra/rag/core/pipeline.go rename to backend/services/rag/core/pipeline.go diff --git a/backend/infra/rag/core/types.go b/backend/services/rag/core/types.go similarity index 100% rename from backend/infra/rag/core/types.go rename to backend/services/rag/core/types.go diff --git a/backend/infra/rag/corpus/common.go b/backend/services/rag/corpus/common.go similarity index 100% rename from backend/infra/rag/corpus/common.go rename to backend/services/rag/corpus/common.go diff --git a/backend/infra/rag/corpus/memory_corpus.go b/backend/services/rag/corpus/memory_corpus.go similarity index 98% rename from backend/infra/rag/corpus/memory_corpus.go rename to backend/services/rag/corpus/memory_corpus.go index af461f8..62d9991 100644 --- a/backend/infra/rag/corpus/memory_corpus.go +++ b/backend/services/rag/corpus/memory_corpus.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) const memoryCorpusName = "memory" diff --git a/backend/infra/rag/corpus/web_corpus.go b/backend/services/rag/corpus/web_corpus.go similarity index 98% rename from backend/infra/rag/corpus/web_corpus.go rename to backend/services/rag/corpus/web_corpus.go index a3f4443..fd90724 100644 --- a/backend/infra/rag/corpus/web_corpus.go +++ b/backend/services/rag/corpus/web_corpus.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) const webCorpusName = "web" diff --git a/backend/infra/rag/embed/eino_embedder.go b/backend/services/rag/embed/eino_embedder.go similarity index 100% rename from backend/infra/rag/embed/eino_embedder.go rename to backend/services/rag/embed/eino_embedder.go diff --git a/backend/infra/rag/embed/mock_embedder.go b/backend/services/rag/embed/mock_embedder.go similarity index 100% rename from backend/infra/rag/embed/mock_embedder.go rename to backend/services/rag/embed/mock_embedder.go diff --git a/backend/infra/rag/factory.go b/backend/services/rag/factory.go similarity index 91% rename from backend/infra/rag/factory.go rename to backend/services/rag/factory.go index c1e8eb0..227c711 100644 --- a/backend/infra/rag/factory.go +++ b/backend/services/rag/factory.go @@ -7,12 +7,12 @@ import ( "os" "strings" - ragchunk "github.com/LoveLosita/smartflow/backend/infra/rag/chunk" - ragconfig "github.com/LoveLosita/smartflow/backend/infra/rag/config" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" - ragembed "github.com/LoveLosita/smartflow/backend/infra/rag/embed" - ragrerank "github.com/LoveLosita/smartflow/backend/infra/rag/rerank" - ragstore "github.com/LoveLosita/smartflow/backend/infra/rag/store" + ragchunk "github.com/LoveLosita/smartflow/backend/services/rag/chunk" + ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config" + "github.com/LoveLosita/smartflow/backend/services/rag/core" + ragembed "github.com/LoveLosita/smartflow/backend/services/rag/embed" + ragrerank "github.com/LoveLosita/smartflow/backend/services/rag/rerank" + ragstore "github.com/LoveLosita/smartflow/backend/services/rag/store" ) // FactoryDeps 描述 Runtime 工厂所需的可选依赖。 diff --git a/backend/infra/rag/observe.go b/backend/services/rag/observe.go similarity index 92% rename from backend/infra/rag/observe.go rename to backend/services/rag/observe.go index 
e0d0ddb..97f7ed2 100644 --- a/backend/infra/rag/observe.go +++ b/backend/services/rag/observe.go @@ -3,7 +3,7 @@ package rag import ( "log" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // ObserveLevel 对外暴露统一观测等级别名,避免启动层直接依赖 core 细节。 diff --git a/backend/infra/rag/rag.go b/backend/services/rag/rag.go similarity index 60% rename from backend/infra/rag/rag.go rename to backend/services/rag/rag.go index c1c3283..25db141 100644 --- a/backend/infra/rag/rag.go +++ b/backend/services/rag/rag.go @@ -1,11 +1,11 @@ package rag import ( - "github.com/LoveLosita/smartflow/backend/infra/rag/chunk" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" - "github.com/LoveLosita/smartflow/backend/infra/rag/embed" - "github.com/LoveLosita/smartflow/backend/infra/rag/rerank" - "github.com/LoveLosita/smartflow/backend/infra/rag/store" + "github.com/LoveLosita/smartflow/backend/services/rag/chunk" + "github.com/LoveLosita/smartflow/backend/services/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/embed" + "github.com/LoveLosita/smartflow/backend/services/rag/rerank" + "github.com/LoveLosita/smartflow/backend/services/rag/store" ) // NewDefaultPipeline 构造默认可运行的 RAG Pipeline。 diff --git a/backend/infra/rag/rerank/eino_reranker.go b/backend/services/rag/rerank/eino_reranker.go similarity index 86% rename from backend/infra/rag/rerank/eino_reranker.go rename to backend/services/rag/rerank/eino_reranker.go index 931a246..6926eb8 100644 --- a/backend/infra/rag/rerank/eino_reranker.go +++ b/backend/services/rag/rerank/eino_reranker.go @@ -4,7 +4,7 @@ import ( "context" "errors" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // EinoReranker 是 Eino 重排器占位实现。 diff --git a/backend/infra/rag/rerank/noop_reranker.go b/backend/services/rag/rerank/noop_reranker.go similarity index 91% rename from backend/infra/rag/rerank/noop_reranker.go rename to backend/services/rag/rerank/noop_reranker.go index 2706873..22485d6 100644 --- a/backend/infra/rag/rerank/noop_reranker.go +++ b/backend/services/rag/rerank/noop_reranker.go @@ -4,7 +4,7 @@ import ( "context" "sort" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // NoopReranker 是默认重排器(仅按原 score 排序)。 diff --git a/backend/infra/rag/retrieve/vector_retriever.go b/backend/services/rag/retrieve/vector_retriever.go similarity index 97% rename from backend/infra/rag/retrieve/vector_retriever.go rename to backend/services/rag/retrieve/vector_retriever.go index 0d00950..7ff31e5 100644 --- a/backend/infra/rag/retrieve/vector_retriever.go +++ b/backend/services/rag/retrieve/vector_retriever.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // VectorRetriever 是通用检索器(embed + vector search)。 diff --git a/backend/infra/rag/runtime.go b/backend/services/rag/runtime.go similarity index 97% rename from backend/infra/rag/runtime.go rename to backend/services/rag/runtime.go index 41eaaad..7c77a21 100644 --- a/backend/infra/rag/runtime.go +++ b/backend/services/rag/runtime.go @@ -7,9 +7,9 @@ import ( "strings" "time" - ragconfig "github.com/LoveLosita/smartflow/backend/infra/rag/config" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" - "github.com/LoveLosita/smartflow/backend/infra/rag/corpus" + ragconfig 
"github.com/LoveLosita/smartflow/backend/services/rag/config" + "github.com/LoveLosita/smartflow/backend/services/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/corpus" ) type runtime struct { @@ -343,7 +343,7 @@ func (r *runtime) recoverPublicPanic( return } - // 1. runtime 是 RAG Infra 对业务侧暴露的最终方法面,任何下层 panic 都不应再穿透到业务协程。 + // 1. runtime 是 RAG service 对业务侧暴露的最终方法面,任何下层 panic 都不应再穿透到业务协程。 // 2. 这里统一把 panic 转成 error,并补一条结构化观测,方便继续排查是哪一层依赖失控。 // 3. 保留 stack 是为了在“进程不崩”的前提下仍能定位根因,避免只剩一句 recovered 无法复盘。 panicErr := fmt.Errorf("rag runtime panic recovered: corpus=%s operation=%s panic=%v", corpusName, operation, recovered) diff --git a/backend/services/rag/service.go b/backend/services/rag/service.go new file mode 100644 index 0000000..fc38f79 --- /dev/null +++ b/backend/services/rag/service.go @@ -0,0 +1,111 @@ +package rag + +import ( + "context" + + ragconfig "github.com/LoveLosita/smartflow/backend/services/rag/config" +) + +// Options 描述 rag-service 需要持有的底层运行时。 +type Options struct { + Runtime Runtime +} + +// Service 是 rag-service 对外暴露的统一入口。 +// +// 职责边界: +// 1. 负责持有运行时,并把 memory / web 两条能力线统一收口到服务层。 +// 2. 负责在服务入口内完成基于配置的运行时装配。 +// 3. 不直接承载 chunk / embed / store 的实现细节,这些细节下沉到服务树内部子包。 +type Service struct { + runtime Runtime +} + +// New 使用调用方传入的运行时构造服务。 +func New(opts Options) *Service { + return &Service{runtime: opts.Runtime} +} + +// NewFromConfig 基于服务树内的配置与工厂能力构造自给自足的 RAG 服务。 +func NewFromConfig(ctx context.Context, cfg ragconfig.Config, deps FactoryDeps) (*Service, error) { + if !cfg.Enabled { + return New(Options{}), nil + } + runtime, err := NewRuntimeFromConfig(ctx, cfg, deps) + if err != nil { + return nil, err + } + return NewWithRuntime(runtime), nil +} + +// Runtime 返回当前服务持有的运行时。 +func (s *Service) Runtime() Runtime { + if s == nil { + return nil + } + return s.runtime +} + +// IngestMemory 写入记忆语料。 +func (s *Service) IngestMemory(ctx context.Context, req MemoryIngestRequest) (*IngestResult, error) { + if s == nil || s.runtime == nil { + return nil, nil + } + return s.runtime.IngestMemory(ctx, req) +} + +// RetrieveMemory 检索记忆语料。 +func (s *Service) RetrieveMemory(ctx context.Context, req MemoryRetrieveRequest) (*RetrieveResult, error) { + if s == nil || s.runtime == nil { + return nil, nil + } + return s.runtime.RetrieveMemory(ctx, req) +} + +// DeleteMemory 删除指定记忆文档。 +func (s *Service) DeleteMemory(ctx context.Context, documentIDs []string) error { + if s == nil || s.runtime == nil { + return nil + } + if ctx == nil { + ctx = context.Background() + } + return s.runtime.DeleteMemory(ctx, documentIDs) +} + +// IngestWeb 写入网页语料。 +func (s *Service) IngestWeb(ctx context.Context, req WebIngestRequest) (*IngestResult, error) { + if s == nil || s.runtime == nil { + return nil, nil + } + return s.runtime.IngestWeb(ctx, req) +} + +// RetrieveWeb 检索网页语料。 +func (s *Service) RetrieveWeb(ctx context.Context, req WebRetrieveRequest) (*RetrieveResult, error) { + if s == nil || s.runtime == nil { + return nil, nil + } + return s.runtime.RetrieveWeb(ctx, req) +} + +// EnsureRuntime 返回一个可继续向下传递的运行时引用。 +func (s *Service) EnsureRuntime() Runtime { + if s == nil { + return nil + } + return s.runtime +} + +// SetRuntime 允许在装配阶段延迟注入运行时。 +func (s *Service) SetRuntime(runtime Runtime) { + if s == nil { + return + } + s.runtime = runtime +} + +// NewWithRuntime 用显式运行时构造服务。 +func NewWithRuntime(runtime Runtime) *Service { + return New(Options{Runtime: runtime}) +} diff --git a/backend/infra/rag/store/inmemory_store.go b/backend/services/rag/store/inmemory_store.go similarity index 98% 
rename from backend/infra/rag/store/inmemory_store.go rename to backend/services/rag/store/inmemory_store.go index f15ba28..19e4df9 100644 --- a/backend/infra/rag/store/inmemory_store.go +++ b/backend/services/rag/store/inmemory_store.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // InMemoryVectorStore 是本地开发用向量存储实现。 diff --git a/backend/infra/rag/store/milvus_store.go b/backend/services/rag/store/milvus_store.go similarity index 99% rename from backend/infra/rag/store/milvus_store.go rename to backend/services/rag/store/milvus_store.go index 2e8db58..39a92a6 100644 --- a/backend/infra/rag/store/milvus_store.go +++ b/backend/services/rag/store/milvus_store.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/LoveLosita/smartflow/backend/infra/rag/core" + "github.com/LoveLosita/smartflow/backend/services/rag/core" ) // MilvusConfig 描述 Milvus REST 存储配置。 diff --git a/backend/infra/rag/store/vector_store.go b/backend/services/rag/store/vector_store.go similarity index 75% rename from backend/infra/rag/store/vector_store.go rename to backend/services/rag/store/vector_store.go index 3535fdb..5549461 100644 --- a/backend/infra/rag/store/vector_store.go +++ b/backend/services/rag/store/vector_store.go @@ -1,6 +1,6 @@ package store -import "github.com/LoveLosita/smartflow/backend/infra/rag/core" +import "github.com/LoveLosita/smartflow/backend/services/rag/core" // EnsureCompile 用于静态校验实现是否满足接口。 func EnsureCompile() { diff --git a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md index 97e4bed..62f6ac1 100644 --- a/docs/backend/微服务四步迁移与第二阶段并行开发计划.md +++ b/docs/backend/微服务四步迁移与第二阶段并行开发计划.md @@ -36,6 +36,8 @@ 4. 消费侧已经按服务 consumer group 隔离,不再用一个 worker 吃全部事件。 5. 
当前仍是单体进程内多 worker 装配;worker 后续会跟随对应服务一起迁出,不在阶段 1 直接拆进程。 +阶段 1.5 / 1.6 也已经先落地完毕:`backend/services/llm` 和 `backend/services/rag` 已经成为当前 canonical 入口,`backend/infra/llm` 和 `backend/infra/rag` 的 `.go` 旧实现已删除,仅保留迁移说明文档。当前仍然是单体进程内多 worker 装配,llm / rag 先完成服务化收口,还没有进入 gozero 进程拆分。 + 所以后续路线不是再补一次 outbox 基建,而是在这个阶段 1 基线上,按服务边界逐个把 gozero 服务、DAO / model / worker 和启动入口迁出去。 --- @@ -78,6 +80,8 @@ gozero 服务负责领域能力: > 说明:`agent` 和 `memory` 都可以单独成服务,不应再被写成“公共能力”;其中 `agent` 更像对外对话编排服务,`memory` 更像其支撑服务/worker 服务。 > > 说明:`llm-service` 先抽成全仓统一模型出口,`rag-service` 再抽成检索基础设施服务;`rag-service` 只能依赖 `llm-service`,不反向依赖具体业务服务。 +> +> 当前状态:`llm-service` / `rag-service` 这两个边界已经先做成 `backend/services/*` 的服务内模块,调用仍由 `backend/cmd/start.go` 在同一进程内装配,不是 gozero 独立进程。 ### 3.3 事件层 @@ -118,8 +122,8 @@ gozero 服务负责领域能力: | --- | --- | --- | --- | | 0 | 语义冻结和基线确认(已完成) | 阶段 0 已作为历史基线保存;后续只在契约变化时回看 | `go test ./...`,`api / worker / all` 启动 smoke | | 1 | Outbox v2 基建(已完成,当前基线) | 当前已具备阶段 1 保存点:服务级 outbox 表、topic、group 和多 worker 装配已打通 | 已完成健康检查、服务级 outbox 写入/投递/消费 smoke、Kafka group lag 核对 | -| 1.5 | 先抽 llm-service | 统一模型调用、provider 路由、流式输出和审计后 commit | course / active-scheduler / memory 模型调用 smoke | -| 1.6 | 再抽 rag-service | 向量化、召回、重排、检索能力跑通后 commit | memory retrieve / rerank smoke | +| 1.5 | 先抽 llm-service(已完成) | 已完成,`backend/services/llm` 作为当前 canonical 入口 | `go test ./...` + course / active-scheduler / memory 模型调用 smoke | +| 1.6 | 再抽 rag-service(已完成) | 已完成,`backend/services/rag` 作为当前 canonical 入口 | `go test ./...` + memory retrieve / rerank smoke | | 2 | 先拆 user/auth | user 路由、JWT 签发和 token 额度治理独立后 commit | 注册/登录/刷新/登出 smoke + token quota 回归 | | 3 | 再拆 notification | notification 服务能独立消费和重试后 commit | notification E2E smoke + worker-only smoke | | 4 | 再拆 active-scheduler | 预览生成和确认链路通过 gozero 服务跑通后 commit | dry-run / preview / confirm smoke | @@ -229,7 +233,7 @@ flowchart LR --- -### 4.4 阶段 1.5:先抽 llm-service +### 4.4 阶段 1.5:先抽 llm-service(已完成) 目标: @@ -237,6 +241,12 @@ flowchart LR 2. 让 `course`、`active-scheduler`、`memory`、`agent` 对模型调用的依赖先收口到统一服务。 3. 先把模型 provider 路由、流式输出、限流、审计这些共性收束起来,避免每个服务各写一份。 +当前状态: + +1. 代码已经落到 `backend/services/llm`。 +2. `backend/infra/llm` 的 `.go` 旧实现已删除,仅保留迁移说明。 +3. 仍由 `backend/cmd/start.go` 在同一进程内装配,尚未引入 gozero 独立服务进程。 + 这一步要做的事: 1. 把当前分散在业务服务里的模型调用入口改成统一调用 `llm-service`。 @@ -260,7 +270,7 @@ flowchart LR --- -### 4.5 阶段 1.6:再抽 rag-service +### 4.5 阶段 1.6:再抽 rag-service(已完成) 目标: @@ -268,6 +278,12 @@ flowchart LR 2. 让向量化、召回、重排、向量库读写先进入独立服务。 3. 明确 `rag-service` 只能依赖 `llm-service` 做 embedding / rerank,不反向依赖业务服务。 +当前状态: + +1. 代码已经落到 `backend/services/rag`。 +2. `backend/infra/rag` 的 `.go` 旧实现已删除,仅保留迁移说明。 +3. 仍由 `backend/cmd/start.go` 在同一进程内装配,尚未引入 gozero 独立服务进程。 + 这一步要做的事: 1. 把当前分散在 `memory`、`agent` 里的检索逻辑改成统一调用 `rag-service`。 @@ -474,6 +490,8 @@ flowchart LR 当前建议按这个顺序推进: +注:阶段 1.5 / 1.6 已完成,当前实际推进可从阶段 2 开始。 + 1. 以阶段 1 的服务级 outbox 为当前基线,不再回头做共享 outbox 方案。 2. 先切 llm-service,把统一模型出口从各业务服务里抽出去。 3. 再切 rag-service,把检索基础设施从 memory / agent 里抽出去。