Version: 0.9.34.dev.260421

后端:
1. 旧 Agent 管线(agent/)全面下线,共享逻辑迁移至 newAgent/
- 删除 backend/agent/ 整个目录(44 个 Go 文件),5 条旧专用流程已由 newAgent 统一 graph 取代
- 共享逻辑迁入 newAgent/:clone(shared/clone.go)、时间解析(shared/deadline.go)、优先级常量(shared/task_priority.go)、TaskQuery 类型(model/taskquery_types.go)、SystemPrompt(prompt/system.go)、Usage 合并(stream/usage.go)
2. service 层清除 agent/ 全部依赖
- 删除 4 个旧流程入口文件(agent_route / agent_quick_note / agent_schedule_plan / agent_schedule_refine)
- agent_task_query.go 删除 runTaskQueryFlow,参数类型切到 newagentmodel
- agent.go / agent_newagent.go / agent_schedule_preview.go / agent_schedule_state.go / cmd/start.go / quicknote.go:agent* 引用全部替换为 newagent*
3. 流式降级回退路径内联到 service 层(agent_stream_fallback.go),消除最后一条 agent/chat 依赖

前端:
1. ScheduleFineTuneModal 幂等键追加 classId 后缀,修复多任务类并行保存 key 重复
This commit is contained in:
LoveLosita
2026-04-21 20:10:16 +08:00
parent b309a32a98
commit 73ab0f43aa
60 changed files with 560 additions and 15261 deletions

View File

@@ -8,7 +8,6 @@ import (
"strings"
"time"
agentchat "github.com/LoveLosita/smartflow/backend/agent/chat"
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/dao"
outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox"
@@ -17,6 +16,7 @@ import (
memoryobserve "github.com/LoveLosita/smartflow/backend/memory/observe"
"github.com/LoveLosita/smartflow/backend/model"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagenttools "github.com/LoveLosita/smartflow/backend/newAgent/tools"
"github.com/LoveLosita/smartflow/backend/pkg"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
@@ -318,7 +318,7 @@ func (s *AgentService) runNormalChatFlow(
// 3. 计算本次请求可用的历史 token 预算,并执行历史裁剪。
// 这样可以在上下文增长时稳定控制模型窗口,避免超长上下文引发报错或高延迟。
historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, agentchat.SystemPrompt, userMessage)
historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, newagentprompt.SystemPrompt, userMessage)
trimmedHistory, totalHistoryTokens, keptHistoryTokens, droppedCount := pkg.TrimHistoryByTokenBudget(chatHistory, historyBudget)
chatHistory = trimmedHistory
@@ -346,7 +346,7 @@ func (s *AgentService) runNormalChatFlow(
// 6. 执行真正的流式聊天。
// fullText 用于后续写 Redis/持久化outChan 用于把流片段实时推给前端。
fullText, reasoningText, reasoningDurationSeconds, streamUsage, streamErr := agentchat.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, traceID, chatID, requestStart, assistantReasoningStartedAt)
fullText, reasoningText, reasoningDurationSeconds, streamUsage, streamErr := s.streamChatFallback(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, assistantReasoningStartedAt)
if streamErr != nil {
pushErrNonBlocking(errChan, streamErr)
return

View File

@@ -18,9 +18,9 @@ import (
"github.com/cloudwego/eino/schema"
"github.com/spf13/viper"
agentchat "github.com/LoveLosita/smartflow/backend/agent/chat"
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/LoveLosita/smartflow/backend/respond"
eventsvc "github.com/LoveLosita/smartflow/backend/service/events"
@@ -393,7 +393,7 @@ func (s *AgentService) loadConversationContext(ctx context.Context, chatID, user
}
// 构造 ConversationContext。
conversationContext := newagentmodel.NewConversationContext(agentchat.SystemPrompt)
conversationContext := newagentmodel.NewConversationContext(newagentprompt.SystemPrompt)
if history != nil {
conversationContext.ReplaceHistory(history)
}

View File

@@ -1,303 +0,0 @@
package agentsvc
import (
"context"
"fmt"
"log"
"strings"
"time"
agentgraph "github.com/LoveLosita/smartflow/backend/agent/graph"
agentllm "github.com/LoveLosita/smartflow/backend/agent/llm"
agentmodel "github.com/LoveLosita/smartflow/backend/agent/model"
agentnode "github.com/LoveLosita/smartflow/backend/agent/node"
agentrouter "github.com/LoveLosita/smartflow/backend/agent/router"
agentstream "github.com/LoveLosita/smartflow/backend/agent/stream"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/google/uuid"
)
// quickNoteRoutingDecision 只是路由层结果的本地别名。
// 保留这个别名是为了尽量少改调用侧agent.go 中的字段访问保持不变)。
type quickNoteRoutingDecision = agentrouter.RoutingDecision
// quickNoteProgressEmitter 负责把“链路阶段状态”伪装成 OpenAI 兼容的 reasoning_content chunk。
// 设计目标:
// 1) 不改现有 OpenAI 兼容协议外壳;
// 2) 让 Apifox 在等待期间也能看到“思考块”,避免用户空等;
// 3) 该 emitter 只负责状态,不负责最终正文回复和 [DONE] 结束块。
type quickNoteProgressEmitter struct {
outChan chan<- string
modelName string
requestID string
created int64
enablePush bool
reasoning strings.Builder
startedAt *time.Time
}
// newQuickNoteProgressEmitter 构造“阶段进度推送器”。
// 该推送器只负责发 reasoning 块,不负责正文回复。
func newQuickNoteProgressEmitter(outChan chan<- string, modelName string, enable bool) *quickNoteProgressEmitter {
// 1. 模型名兜底,避免出现空 model 字段导致客户端兼容性问题。
resolvedModel := strings.TrimSpace(modelName)
if resolvedModel == "" {
resolvedModel = "worker"
}
// 2. 每次请求生成独立 request_id方便前端或日志侧关联本次流式输出。
return &quickNoteProgressEmitter{
outChan: outChan,
modelName: resolvedModel,
requestID: "chatcmpl-" + uuid.NewString(),
created: time.Now().Unix(),
enablePush: enable,
}
}
// Emit 按“阶段 + 说明”输出 reasoning_content。
// 注意:
// 1) 这里不输出 role避免和后续正文 role 块冲突;
// 2) 即使发送失败,也只记录日志,不影响主流程继续执行。
func (e *quickNoteProgressEmitter) Emit(stage, detail string) {
// 1. 推送器不可用nil/禁用/无通道)时直接返回,避免 panic。
if e == nil || !e.enablePush || e.outChan == nil {
return
}
// 2. 统一清理空白,避免日志和输出里出现异常空字符串。
stage = strings.TrimSpace(stage)
detail = strings.TrimSpace(detail)
if stage == "" && detail == "" {
return
}
if e.startedAt == nil {
now := time.Now()
e.startedAt = &now
}
if e.reasoning.Len() > 0 {
e.reasoning.WriteString("\n\n")
}
if stage != "" {
e.reasoning.WriteString("阶段:")
e.reasoning.WriteString(stage)
}
if detail != "" {
if stage != "" {
e.reasoning.WriteString("\n")
}
e.reasoning.WriteString(detail)
}
// 3. 调用目的:阶段提示统一走 Agent/stream 的 reasoning chunk 包装,
// 避免 service 层继续自己拼 OpenAI 兼容 JSON。
err := agentstream.EmitStageAsReasoning(func(payload string) error {
e.outChan <- payload
return nil
}, e.requestID, e.modelName, e.created, stage, detail, false)
if err != nil {
// 3.1 阶段推送失败不应影响主链路,只打日志即可。
log.Printf("输出随口记阶段状态失败 stage=%s err=%v", stage, err)
return
}
}
func (e *quickNoteProgressEmitter) HistoryText() string {
if e == nil {
return ""
}
return strings.TrimSpace(e.reasoning.String())
}
func (e *quickNoteProgressEmitter) StartedAt() *time.Time {
if e == nil || e.startedAt == nil {
return nil
}
startCopy := *e.startedAt
return &startCopy
}
func (e *quickNoteProgressEmitter) DurationSeconds(end time.Time) int {
if e == nil || e.startedAt == nil {
return 0
}
if !end.After(*e.startedAt) {
return 0
}
return int(end.Sub(*e.startedAt) / time.Second)
}
// tryHandleQuickNoteWithGraph 尝试用“随口记 graph”处理本次用户输入。
// 返回值语义:
// 1) handled=true本次请求已在随口记链路处理完成成功/失败都会返回文案);
// 2) handled=false不是随口记意图调用方应回落普通聊天链路
// 3) state用于拼接最终“一次性正文回复”。
func (s *AgentService) tryHandleQuickNoteWithGraph(
ctx context.Context,
selectedModel *ark.ChatModel,
userMessage string,
userID int,
chatID string,
traceID string,
trustRoute bool,
emitStage func(stage, detail string),
) (handled bool, state *agentmodel.QuickNoteState, err error) {
// 1. 依赖预检taskRepo 或模型未注入时,不做随口记处理,交给上层回落聊天。
if s.taskRepo == nil || selectedModel == nil {
return false, nil, nil
}
// 2. 初始化随口记状态对象(贯穿 graph 全流程的共享上下文)。
state = agentmodel.NewQuickNoteState(traceID, userID, chatID, userMessage)
// 3. 执行 quick note graph。
// 本次依赖注入了两个“工具能力”:
// 3.1 ResolveUserID从当前请求上下文确定 user_id
// 3.2 CreateTask真正执行任务写库。
finalState, runErr := agentgraph.RunQuickNoteGraph(ctx, agentnode.QuickNoteGraphRunInput{
Model: selectedModel,
State: state,
Deps: agentnode.QuickNoteToolDeps{
ResolveUserID: func(ctx context.Context) (int, error) {
// 当前链路 userID 已由上层鉴权拿到,这里直接复用。
return userID, nil
},
CreateTask: func(ctx context.Context, req agentnode.QuickNoteCreateTaskRequest) (*agentnode.QuickNoteCreateTaskResult, error) {
// 3.2.1 把 quick note 的工具入参映射成项目 Task 模型。
taskModel := &model.Task{
UserID: req.UserID,
Title: req.Title,
Priority: req.PriorityGroup,
IsCompleted: false,
DeadlineAt: req.DeadlineAt,
UrgencyThresholdAt: req.UrgencyThresholdAt,
}
// 3.2.2 调用 DAO 写库。
created, createErr := s.taskRepo.AddTask(taskModel)
if createErr != nil {
return nil, createErr
}
// 3.2.3 把写库结果回填给 graph 状态,用于后续回复拼装。
return &agentnode.QuickNoteCreateTaskResult{
TaskID: created.ID,
Title: created.Title,
PriorityGroup: created.Priority,
DeadlineAt: created.DeadlineAt,
UrgencyThresholdAt: created.UrgencyThresholdAt,
}, nil
},
},
SkipIntentVerification: trustRoute,
EmitStage: emitStage,
})
if runErr != nil {
// 4. graph 执行失败由上层统一决定是否回退普通聊天。
return false, nil, runErr
}
// 5. graph 正常结束但判定“非随口记”时,明确返回 handled=false。
if finalState == nil || !finalState.IsQuickNoteIntent {
return false, nil, nil
}
// 6. 走到这里表示随口记链路已完成(含写库成功或业务失败反馈文案)。
return true, finalState, nil
}
// emitSingleAssistantCompletion 将单条完整回复包装成 OpenAI 兼容 chunk 流并写入 outChan。
// 说明:
// 1) 保持现有 OpenAI 兼容格式不变;
// 2) 正文只发一次,不做伪分段。
func emitSingleAssistantCompletion(outChan chan<- string, modelName, reply string) error {
// 1. 模型名兜底,保持 OpenAI 兼容响应字段完整。
if strings.TrimSpace(modelName) == "" {
modelName = "worker"
}
requestID := "chatcmpl-" + uuid.NewString()
created := time.Now().Unix()
emit := func(payload string) error {
outChan <- payload
return nil
}
if err := agentstream.EmitAssistantReply(emit, requestID, modelName, created, reply, true); err != nil {
return err
}
if err := agentstream.EmitFinish(emit, requestID, modelName, created); err != nil {
return err
}
return agentstream.EmitDone(emit)
}
// buildQuickNoteFinalReply 生成最终的一次性正文回复。
// 组合策略:
// 1) 任务事实(标题/优先级/截止时间)由后端拼接,确保准确;
// 2) 轻松跟进句交给 AI 生成,贴合用户话题;
// 3) AI 生成失败时自动降级为固定友好文案,保证稳定可用。
func buildQuickNoteFinalReply(ctx context.Context, selectedModel *ark.ChatModel, userMessage string, state *agentmodel.QuickNoteState) string {
// 1. 极端兜底:状态为空时给出稳定失败文案,避免返回空字符串。
if state == nil {
return "我这次没成功记上,别急,再发我一次我马上补上。"
}
// 仅当“确实拿到了有效 task_id”时才走成功文案避免出现“回复成功但库里没数据”的错觉。
if state.Persisted && state.PersistedTaskID > 0 {
// 2. 组装“事实段”:标题 + 优先级 + 截止时间。
title := strings.TrimSpace(state.ExtractedTitle)
if title == "" {
title = "这条任务"
}
priorityText := "已安排优先级"
if agentmodel.IsValidTaskPriority(state.ExtractedPriority) {
priorityText = fmt.Sprintf("优先级:%s", agentmodel.PriorityLabelCN(state.ExtractedPriority))
}
deadlineText := ""
if state.ExtractedDeadline != nil {
deadlineText = fmt.Sprintf(";截止时间 %s", state.ExtractedDeadline.In(time.Local).Format("2006-01-02 15:04"))
}
factLine := fmt.Sprintf("好,给你安排上了:%s%s%s。", title, priorityText, deadlineText)
// 2.1 如果 graph 单次请求已生成 banter直接使用避免重复调用模型。
if strings.TrimSpace(state.ExtractedBanter) != "" {
return factLine + " " + strings.TrimSpace(state.ExtractedBanter)
}
// 2.2 聚合调用模式下,通常已在主流程完成风格化,给稳定文案即可。
if state.PlannedBySingleCall {
return factLine + " 已帮你稳稳记下,放心推进。"
}
// 2.3 兜底生成轻松跟进句;失败则降级固定文案,确保体验连续。
banter, err := agentllm.GenerateQuickNoteBanter(ctx, selectedModel, userMessage, title, priorityText, deadlineText)
if err != nil {
return factLine + " 这下可以先安心推进,不用等 ddl 来敲门了。"
}
if strings.TrimSpace(banter) == "" {
return factLine + " 这下可以先安心推进,不用等 ddl 来敲门了。"
}
return factLine + " " + banter
}
// 3. 若时间校验失败,优先返回“可执行的修正引导”。
if strings.TrimSpace(state.DeadlineValidationError) != "" {
return "我识别到你给了时间但格式不够明确暂时不敢乱记。你可以改成比如2026-03-20 18:30、明天下午3点、下周一上午9点我立刻帮你安排。"
}
// 4. 若 graph 已给出助手回复(例如非意图/业务失败原因),优先透传。
if strings.TrimSpace(state.AssistantReply) != "" {
return strings.TrimSpace(state.AssistantReply)
}
// 5. 最终兜底文案。
return "这次没成功写入任务,我没跑路,再给我一次我就把它稳稳记上。"
}
// decideQuickNoteRouting 决定当前输入是否进入“随口记 graph”。
// 该函数只是服务层薄封装,具体控制码解析逻辑已下沉到 Agent/router 包。
func (s *AgentService) decideQuickNoteRouting(ctx context.Context, selectedModel *ark.ChatModel, userMessage string) quickNoteRoutingDecision {
// 这里保留方法是为了让 AgentService 对外语义完整,
// 同时避免上层调用方直接依赖 Agent/router降低耦合。
_ = s
return agentrouter.DecideQuickNoteRouting(ctx, selectedModel, userMessage)
}

View File

@@ -1,27 +0,0 @@
package agentsvc
import (
"context"
agentrouter "github.com/LoveLosita/smartflow/backend/agent/router"
"github.com/cloudwego/eino-ext/components/model/ark"
)
// actionRoutingDecision 是 route 层分流结果在 agentsvc 的本地别名。
//
// 设计目的:
// 1. 让 AgentService 对 route 包保持“最小接触面”;
// 2. 后续若 route 包返回结构调整,只需改这个桥接文件。
type actionRoutingDecision = agentrouter.RoutingDecision
// decideActionRouting 决定当前请求走向哪条业务链路。
//
// 职责边界:
// 1. 只负责调用 route 包拿分流结论;
// 2. 不负责执行任何业务节点;
// 3. route 层失败会通过 RoutingDecision.RouteFailed 向上层显式暴露。
func (s *AgentService) decideActionRouting(ctx context.Context, selectedModel *ark.ChatModel, userMessage string) actionRoutingDecision {
// 这里保留方法封装,是为了避免上层直接依赖 route 包,降低耦合。
_ = s
return agentrouter.DecideActionRouting(ctx, selectedModel, userMessage)
}

View File

@@ -1,162 +0,0 @@
package agentsvc
import (
"context"
"errors"
"log"
"strings"
agentgraph "github.com/LoveLosita/smartflow/backend/agent/graph"
agentmodel "github.com/LoveLosita/smartflow/backend/agent/model"
agentnode "github.com/LoveLosita/smartflow/backend/agent/node"
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/schema"
"github.com/spf13/viper"
)
// runSchedulePlanFlow 执行“智能排程”分支。
//
// 职责边界:
// 1. 负责把本次请求接入 scheduleplan graph并注入运行依赖。
// 2. 负责读取对话历史(优先 Redis未命中再回源 DB用于连续对话微调。
// 3. 负责把排程预览快照写入 Redis供查询接口拉取 JSON
// 4. 负责返回给上层“可直接发给用户的最终文本回复”。
// 5. 不负责聊天持久化(由 AgentChat 主链路统一处理)。
func (s *AgentService) runSchedulePlanFlow(
ctx context.Context,
selectedModel *ark.ChatModel,
userMessage string,
userID int,
chatID string,
traceID string,
extra map[string]any,
emitStage func(stage, detail string),
outChan chan<- string,
modelName string,
) (string, error) {
// 1. 依赖预检:缺硬依赖时直接失败,避免进入 graph 后才出现空指针或半途失败。
// 1.1 SmartPlanningMultiRaw / HybridScheduleWithPlanMulti / ResolvePlanningWindow 任一缺失都无法继续。
// 1.2 selectedModel 为空时无法执行 LLM 节点,直接返回错误由上层处理。
if s.SmartPlanningMultiRawFunc == nil || s.HybridScheduleWithPlanMultiFunc == nil || s.ResolvePlanningWindowFunc == nil {
return "", errors.New("schedule plan service dependencies are not ready")
}
if selectedModel == nil {
return "", errors.New("schedule plan model is nil")
}
// 2. 连续对话微调前置处理:先尝试读取“上一版预览快照”,再清理旧 key。
// 2.1 先读后删的原因:
// 2.1.1 若先删再读,会丢失“连续微调起点”;
// 2.1.2 先读可让本轮在内存中复用上轮 HybridEntries。
// 2.2 清理旧 key 仍然保留,避免前端在本轮进行中误读到旧结果。
var previousPreview *model.SchedulePlanPreviewCache
if s.cacheDAO != nil {
preview, getErr := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, chatID)
if getErr != nil {
log.Printf("读取上一版排程预览失败 chat_id=%s: %v", chatID, getErr)
} else {
previousPreview = preview
}
if delErr := s.cacheDAO.DeleteSchedulePlanPreviewFromCache(ctx, userID, chatID); delErr != nil {
log.Printf("清理旧排程预览失败 chat_id=%s: %v", chatID, delErr)
}
}
// 2.3 Redis miss 时回落 MySQL 快照:
// 2.3.1 目的:即使 Redis TTL 过期,也能延续同会话微调语境;
// 2.3.2 回填:命中 DB 后尝试回填 Redis提高后续读取命中率
// 2.3.3 失败策略DB 读取异常只打日志,链路继续按“无历史快照”执行。
if previousPreview == nil && s.repo != nil {
snapshot, snapshotErr := s.repo.GetScheduleStateSnapshot(ctx, userID, chatID)
if snapshotErr != nil {
log.Printf("从 MySQL 读取排程快照失败 chat_id=%s: %v", chatID, snapshotErr)
} else if snapshot != nil {
previousPreview = snapshotToSchedulePlanPreviewCache(snapshot)
if s.cacheDAO != nil && previousPreview != nil {
if setErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, chatID, previousPreview); setErr != nil {
log.Printf("回填排程预览缓存失败 chat_id=%s: %v", chatID, setErr)
}
}
}
}
// 3. 读取对话历史:先快后稳。
// 3.1 先查 Redis命中则避免回源 DB降低请求时延。
// 3.2 Redis 异常仅记录日志,不中断主流程(回源 DB 兜底)。
var chatHistory []*schema.Message
if s.agentCache != nil {
history, err := s.agentCache.GetHistory(ctx, chatID)
if err != nil {
log.Printf("获取排程对话历史失败 chat_id=%s: %v", chatID, err)
} else if history != nil {
chatHistory = history
}
}
// 3.3 Redis 未命中时回源 DB保证链路在缓存波动时仍可用。
// 3.4 DB 回源失败同样只记日志并继续,让 graph 按“无历史”降级运行。
if chatHistory == nil && s.repo != nil {
histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel("worker"), chatID)
if hisErr != nil {
log.Printf("回源 DB 获取排程对话历史失败 chat_id=%s: %v", chatID, hisErr)
} else {
chatHistory = conv.ToEinoMessages(histories)
}
}
// 4. 执行 graph 主流程。
// 4.1 这里只负责参数拼装与调用,不在 service 层重复实现 graph 节点逻辑。
// 4.2 并发度/预算从配置注入,避免把调优参数写死在代码中。
state := agentmodel.NewSchedulePlanState(traceID, userID, chatID)
// 4.3 连续对话微调注入:
// 4.3.1 若命中上轮预览,则把任务类/混合条目/分配结果注入 state
// 4.3.2 这样 rough_build 可按需复用旧底板,避免每轮都重新粗排。
if previousPreview != nil {
state.HasPreviousPreview = true
state.PreviousTaskClassIDs = append([]int(nil), previousPreview.TaskClassIDs...)
state.PreviousHybridEntries = cloneHybridEntries(previousPreview.HybridEntries)
state.PreviousAllocatedItems = cloneTaskClassItems(previousPreview.AllocatedItems)
state.PreviousCandidatePlans = cloneWeekSchedules(previousPreview.CandidatePlans)
}
finalState, runErr := agentgraph.RunSchedulePlanGraph(ctx, agentnode.SchedulePlanGraphRunInput{
Model: selectedModel,
State: state,
Deps: agentnode.SchedulePlanToolDeps{
SmartPlanningMultiRaw: s.SmartPlanningMultiRawFunc,
HybridScheduleWithPlanMulti: s.HybridScheduleWithPlanMultiFunc,
ResolvePlanningWindow: s.ResolvePlanningWindowFunc,
},
UserMessage: userMessage,
Extra: extra,
ChatHistory: chatHistory,
EmitStage: emitStage,
OutChan: outChan,
ModelName: modelName,
DailyRefineConcurrency: viper.GetInt("agent.dailyRefineConcurrency"),
WeeklyAdjustBudget: viper.GetInt("agent.weeklyAdjustBudget"),
})
if runErr != nil {
// 4.3 graph 失败直接上抛,由上层决定回落或报错。
return "", runErr
}
// 5. 组装最终回复文本。
// 5.1 明确移除“把排程结果序列化成 JSON 文本直接回传”的抽象,
// 避免在 SSE 聊天链路里吐出原始 JSON影响前端展示与用户体验。
// 5.2 当 finalState 为空或 summary 为空时,返回统一兜底文案,保证接口有稳定输出。
if finalState == nil {
return "排程流程异常,请稍后重试。", nil
}
reply := strings.TrimSpace(finalState.FinalSummary)
if reply == "" {
reply = "排程流程已完成,但未生成结果摘要。"
}
// 6. 旁路写入排程预览缓存(结构化 JSON给查询接口拉取。
// 6.1 失败只记日志,不影响本次对话回复;
// 6.2 成功后前端可通过 conversation_id 获取 candidate_plans。
s.saveSchedulePlanPreview(ctx, userID, chatID, finalState)
return reply, nil
}

View File

@@ -7,63 +7,11 @@ import (
"strings"
"time"
agentmodel "github.com/LoveLosita/smartflow/backend/agent/model"
agentshared "github.com/LoveLosita/smartflow/backend/agent/shared"
"github.com/LoveLosita/smartflow/backend/model"
newagentshared "github.com/LoveLosita/smartflow/backend/newAgent/shared"
"github.com/LoveLosita/smartflow/backend/respond"
)
// saveSchedulePlanPreview 负责把排程结果同步写入“查询预览”所需的缓存与快照。
//
// 职责边界:
// 1. 负责把 graph 最终状态映射为统一预览 DTO并先写 Redis、再写 MySQL 快照。
// 2. 负责执行“失败不阻断主回复”的旁路持久化策略,避免影响聊天主链路。
// 3. 不负责 SSE 输出,不负责聊天消息落库,也不负责 refine 状态到 plan 状态的转换。
func (s *AgentService) saveSchedulePlanPreview(ctx context.Context, userID int, chatID string, finalState *agentmodel.SchedulePlanState) {
// 1. 先做最小前置校验,避免把空状态或空会话写成脏快照。
if s == nil || finalState == nil {
return
}
normalizedChatID := strings.TrimSpace(chatID)
if normalizedChatID == "" {
return
}
// 2. 组装统一预览缓存结构。
// 2.1 summary 为空时使用统一兜底文案,保证查询接口始终有稳定输出。
// 2.2 所有切片字段都做深拷贝,避免缓存与 graph state 共享底层数组。
preview := &model.SchedulePlanPreviewCache{
UserID: userID,
ConversationID: normalizedChatID,
TraceID: strings.TrimSpace(finalState.TraceID),
Summary: schedulePlanSummaryOrFallback(strings.TrimSpace(finalState.FinalSummary)),
CandidatePlans: cloneWeekSchedules(finalState.CandidatePlans),
TaskClassIDs: append([]int(nil), finalState.TaskClassIDs...),
HybridEntries: cloneHybridEntries(finalState.HybridEntries),
AllocatedItems: cloneTaskClassItems(finalState.AllocatedItems),
GeneratedAt: time.Now(),
}
// 3. 先写 Redis 预览,保证前端查询链路优先命中低时延缓存。
// 3.1 Redis 写失败只记日志,不中断主流程;
// 3.2 真正兜底由后续 MySQL 快照承担。
if s.cacheDAO != nil {
if err := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedChatID, preview); err != nil {
log.Printf("写入排程预览缓存失败 chat_id=%s: %v", normalizedChatID, err)
}
}
// 4. 再写 MySQL 快照,保证缓存失效后仍能恢复预览与连续微调上下文。
// 4.1 这里继续采用“同步写快照”的策略,因为下一轮 refine 依赖强一致读取;
// 4.2 写库失败同样只记日志,避免让用户侧回复因为旁路持久化失败而中断。
if s.repo != nil {
snapshot := buildSchedulePlanSnapshotFromState(userID, normalizedChatID, finalState)
if err := s.repo.UpsertScheduleStateSnapshot(ctx, snapshot); err != nil {
log.Printf("写入排程状态快照失败 chat_id=%s: %v", normalizedChatID, err)
}
}
}
// GetSchedulePlanPreview 按 conversation_id 读取结构化排程预览。
//
// 职责边界:
@@ -81,8 +29,6 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
}
// 2. 优先查 Redis。
// 2.1 命中后立即校验 user_id避免把别人的会话预览泄露给当前用户
// 2.2 缓存异常直接上抛,由接口层统一处理错误响应。
if s.cacheDAO != nil {
preview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedChatID)
if err != nil {
@@ -92,7 +38,7 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
if preview.UserID > 0 && preview.UserID != userID {
return nil, respond.SchedulePlanPreviewNotFound
}
plans := cloneWeekSchedules(preview.CandidatePlans)
plans := newagentshared.CloneWeekSchedules(preview.CandidatePlans)
if plans == nil {
plans = make([]model.UserWeekSchedule, 0)
}
@@ -101,7 +47,7 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
TraceID: strings.TrimSpace(preview.TraceID),
Summary: strings.TrimSpace(preview.Summary),
CandidatePlans: plans,
HybridEntries: cloneHybridEntries(preview.HybridEntries),
HybridEntries: newagentshared.CloneHybridEntries(preview.HybridEntries),
TaskClassIDs: preview.TaskClassIDs,
GeneratedAt: preview.GeneratedAt,
}, nil
@@ -109,8 +55,6 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
}
// 3. Redis 未命中时回源 MySQL。
// 3.1 命中快照后顺手回填 Redis提高后续命中率
// 3.2 DB 未命中才真正返回 not found避免缓存过期造成假阴性。
if s.repo != nil {
snapshot, err := s.repo.GetScheduleStateSnapshot(ctx, userID, normalizedChatID)
if err != nil {
@@ -131,49 +75,6 @@ func (s *AgentService) GetSchedulePlanPreview(ctx context.Context, userID int, c
return nil, respond.SchedulePlanPreviewNotFound
}
// cloneWeekSchedules 负责深拷贝周视图排程,避免缓存与运行态共享底层切片。
func cloneWeekSchedules(src []model.UserWeekSchedule) []model.UserWeekSchedule {
return agentshared.CloneWeekSchedules(src)
}
// cloneHybridEntries 负责深拷贝混合排程条目,避免跨请求污染。
func cloneHybridEntries(src []model.HybridScheduleEntry) []model.HybridScheduleEntry {
return agentshared.CloneHybridEntries(src)
}
// cloneTaskClassItems 负责深拷贝任务项切片,包含内部指针字段的安全复制。
func cloneTaskClassItems(src []model.TaskClassItem) []model.TaskClassItem {
return agentshared.CloneTaskClassItems(src)
}
// buildSchedulePlanSnapshotFromState 把 graph 最终状态映射成可持久化的快照 DTO。
//
// 职责边界:
// 1. 负责字段归一化、深拷贝和 state_version 补齐。
// 2. 不负责数据库写入,也不负责生成业务摘要文案。
func buildSchedulePlanSnapshotFromState(userID int, conversationID string, st *agentmodel.SchedulePlanState) *model.SchedulePlanStateSnapshot {
if st == nil {
return nil
}
return &model.SchedulePlanStateSnapshot{
UserID: userID,
ConversationID: conversationID,
StateVersion: model.SchedulePlanStateVersionV1,
TaskClassIDs: append([]int(nil), st.TaskClassIDs...),
Constraints: append([]string(nil), st.Constraints...),
HybridEntries: cloneHybridEntries(st.HybridEntries),
AllocatedItems: cloneTaskClassItems(st.AllocatedItems),
CandidatePlans: cloneWeekSchedules(st.CandidatePlans),
UserIntent: strings.TrimSpace(st.UserIntent),
Strategy: strings.TrimSpace(st.Strategy),
AdjustmentScope: strings.TrimSpace(st.AdjustmentScope),
RestartRequested: st.RestartRequested,
FinalSummary: strings.TrimSpace(st.FinalSummary),
Completed: st.Completed,
TraceID: strings.TrimSpace(st.TraceID),
}
}
// snapshotToSchedulePlanPreviewCache 把 MySQL 快照映射成 Redis 预览缓存结构。
func snapshotToSchedulePlanPreviewCache(snapshot *model.SchedulePlanStateSnapshot) *model.SchedulePlanPreviewCache {
if snapshot == nil {
@@ -188,10 +89,10 @@ func snapshotToSchedulePlanPreviewCache(snapshot *model.SchedulePlanStateSnapsho
ConversationID: snapshot.ConversationID,
TraceID: strings.TrimSpace(snapshot.TraceID),
Summary: schedulePlanSummaryOrFallback(strings.TrimSpace(snapshot.FinalSummary)),
CandidatePlans: cloneWeekSchedules(snapshot.CandidatePlans),
CandidatePlans: newagentshared.CloneWeekSchedules(snapshot.CandidatePlans),
TaskClassIDs: append([]int(nil), snapshot.TaskClassIDs...),
HybridEntries: cloneHybridEntries(snapshot.HybridEntries),
AllocatedItems: cloneTaskClassItems(snapshot.AllocatedItems),
HybridEntries: newagentshared.CloneHybridEntries(snapshot.HybridEntries),
AllocatedItems: newagentshared.CloneTaskClassItems(snapshot.AllocatedItems),
GeneratedAt: generatedAt,
}
}
@@ -201,7 +102,7 @@ func snapshotToSchedulePlanPreviewResponse(snapshot *model.SchedulePlanStateSnap
if snapshot == nil {
return nil
}
plans := cloneWeekSchedules(snapshot.CandidatePlans)
plans := newagentshared.CloneWeekSchedules(snapshot.CandidatePlans)
if plans == nil {
plans = make([]model.UserWeekSchedule, 0)
}
@@ -214,7 +115,7 @@ func snapshotToSchedulePlanPreviewResponse(snapshot *model.SchedulePlanStateSnap
TraceID: strings.TrimSpace(snapshot.TraceID),
Summary: schedulePlanSummaryOrFallback(strings.TrimSpace(snapshot.FinalSummary)),
CandidatePlans: plans,
HybridEntries: cloneHybridEntries(snapshot.HybridEntries),
HybridEntries: newagentshared.CloneHybridEntries(snapshot.HybridEntries),
TaskClassIDs: snapshot.TaskClassIDs,
GeneratedAt: generatedAt,
}

View File

@@ -1,175 +0,0 @@
package agentsvc
import (
"context"
"errors"
"log"
"strings"
agentgraph "github.com/LoveLosita/smartflow/backend/agent/graph"
agentmodel "github.com/LoveLosita/smartflow/backend/agent/model"
agentnode "github.com/LoveLosita/smartflow/backend/agent/node"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/respond"
"github.com/cloudwego/eino-ext/components/model/ark"
)
// runScheduleRefineFlow 鎵ц鈥滆繛缁璇濆井璋冩帓绋嬧€濆垎鏀€?
//
// 鑱岃矗杈圭晫锛?
// 1. 璐熻矗璇诲彇鈥滀笂涓€鐗堟帓绋嬮瑙堝揩鐓р€濓紙浼樺厛 Redis锛岀己澶卞啀鍥炴簮 MySQL锛夛紱
// 2. 璐熻矗璋冪敤鐙珛 schedulerefine 鍥鹃摼璺畬鎴愭湰杞井璋冿紱
// 3. 璐熻矗鎶婂井璋冪粨鏋滃洖鍐欓瑙堢紦瀛樹笌鐘舵€佸揩鐓э紝渚涘悗缁户缁井璋冿紱
// 4. 涓嶈礋璐h亰澶╂秷鎭寔涔呭寲锛堟秷鎭寔涔呭寲鐢?AgentChat 涓婚摼璺粺涓€澶勭悊锛夈€?
func (s *AgentService) runScheduleRefineFlow(
ctx context.Context,
selectedModel *ark.ChatModel,
userMessage string,
userID int,
chatID string,
traceID string,
emitStage func(stage, detail string),
outChan chan<- string,
modelName string,
) (string, error) {
_ = outChan
_ = modelName
// 1. 渚濊禆棰勬锛氭ā鍨嬩负绌烘椂鏃犳硶鎵ц浠讳綍鑺傜偣锛岀洿鎺ュけ璐ラ伩鍏嶇┖鎸囬拡銆?
if selectedModel == nil {
return "", errors.New("schedule refine model is nil")
}
emitStage("schedule_refine.context.loading", "正在加载上一版排程上下文。")
// 2. 鍏堟煡 Redis 棰勮蹇収锛屼繚璇佺儹璺緞浣庡欢杩熴€?
// 2.1 濡傛灉 Redis 鏈懡涓紝鍐嶅洖婧?MySQL 蹇収鍏滃簳锛?
// 2.2 濡傛灉涓よ€呴兘娌℃湁锛岃鏄庡綋鍓嶄細璇濇病鏈夊彲寰皟鍩虹锛岀洿鎺ヨ繑鍥炰笟鍔¢敊璇€?
preview := s.loadSchedulePreviewContext(ctx, userID, chatID)
if preview == nil {
return "", respond.SchedulePlanPreviewNotFound
}
// 3. 鍒濆鍖栧井璋冪姸鎬佸苟杩愯鐙珛鍥俱€?
state := agentnode.NewScheduleRefineState(traceID, userID, chatID, userMessage, preview)
finalState, runErr := agentgraph.RunScheduleRefineGraph(ctx, agentnode.ScheduleRefineGraphRunInput{
Model: selectedModel,
State: state,
EmitStage: emitStage,
})
if runErr != nil {
return "", runErr
}
if finalState == nil {
return "", errors.New("schedule refine graph returned nil state")
}
// 4. 璋冪敤鐩殑锛?
// 4.1 saveSchedulePlanPreview 鐩墠鏄€滈瑙堢紦瀛?+ MySQL 蹇収鈥濈殑缁熶竴鍐欏叆鍙o紱
// 4.2 杩欓噷鎶?refine state 鏄犲皠涓?scheduleplan state锛屽鐢ㄥ凡鏈夎惤鐩橀摼璺紱
// 4.3 浣嗚嫢鏄€滅嫭绔嬪鍚堝垎鏀凡鍑虹珯銆佺粓瀹′粛澶辫触鈥濓紝鍒欎笉瑕嗙洊涓婁竴鐗堥瑙堬紝閬垮厤澶栭儴璇互涓烘柊鏂规宸查獙璇侀€氳繃銆?
if shouldPersistScheduleRefinePreview(finalState) {
s.saveSchedulePlanPreview(ctx, userID, chatID, convertRefineStateToPlanState(finalState))
} else {
emitStage("schedule_refine.preview.skipped", "复合分支终审未通过,本轮结果不覆盖上一版预览。")
}
reply := strings.TrimSpace(finalState.FinalSummary)
if reply == "" {
reply = "微调已完成,但本轮未生成总结文案。"
}
return reply, nil
}
// loadSchedulePreviewContext 璇诲彇鈥滃彲鐢ㄤ簬杩炵画寰皟鈥濈殑鎺掔▼涓婁笅鏂囧揩鐓с€?
//
// 姝ラ鍖栬鏄庯細
// 1. 鍏堟煡 Redis锛氬懡涓垯鐩存帴杩斿洖锛屾椂寤舵渶灏忥紱
// 2. Redis miss 鍐嶆煡 MySQL锛氫繚璇佺紦瀛樿繃鏈熷悗浠嶅彲缁х画寰皟锛?
// 3. 鑻?MySQL 鍛戒腑涓?Redis 鍙敤锛岄『渚垮洖濉?Redis锛屾彁鍗囧悗缁懡涓巼锛?
// 4. 浠讳竴姝ュけ璐ヤ粎鎵撴棩蹇楋紝涓?panic锛岀敱涓婂眰鏍规嵁杩斿洖 nil 鍋氱粺涓€澶勭悊銆?
func (s *AgentService) loadSchedulePreviewContext(ctx context.Context, userID int, chatID string) *model.SchedulePlanPreviewCache {
normalizedChatID := strings.TrimSpace(chatID)
if normalizedChatID == "" || userID <= 0 {
return nil
}
if s.cacheDAO != nil {
preview, err := s.cacheDAO.GetSchedulePlanPreviewFromCache(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("璇诲彇鎺掔▼棰勮缂撳瓨澶辫触 chat_id=%s: %v", normalizedChatID, err)
} else if preview != nil {
return preview
}
}
if s.repo == nil {
return nil
}
snapshot, err := s.repo.GetScheduleStateSnapshot(ctx, userID, normalizedChatID)
if err != nil {
log.Printf("璇诲彇鎺掔▼鐘舵€佸揩鐓уけ璐?chat_id=%s: %v", normalizedChatID, err)
return nil
}
if snapshot == nil {
return nil
}
preview := snapshotToSchedulePlanPreviewCache(snapshot)
if preview != nil && s.cacheDAO != nil {
if setErr := s.cacheDAO.SetSchedulePlanPreviewToCache(ctx, userID, normalizedChatID, preview); setErr != nil {
log.Printf("鍥炲~鎺掔▼棰勮缂撳瓨澶辫触 chat_id=%s: %v", normalizedChatID, setErr)
}
}
return preview
}
// convertRefineStateToPlanState 鎶?schedulerefine 鐘舵€佹槧灏勪负 scheduleplan 鐘舵€併€?
//
// 璁捐鎰忓浘锛?
// 1. 澶嶇敤鐜版湁 saveSchedulePlanPreview 鍐欏叆閾捐矾锛屽噺灏戦噸澶嶈惤鐩樹唬鐮侊紱
// 2. 浠呮槧灏勨€滈瑙堟寔涔呭寲蹇呴』瀛楁鈥濓紝閬垮厤鎶?refine 杩愯鏈熶复鏃跺瓧娈靛甫鍏ュ瓨鍌ㄥ眰锛?
// 3. 鍚庣画濡傝鎵╁睍 refine 涓撳睘蹇収瀛楁锛屽彲鍦ㄨ鏄犲皠澶勯泦涓紨杩涖€?
func convertRefineStateToPlanState(st *agentnode.ScheduleRefineState) *agentmodel.SchedulePlanState {
if st == nil {
return nil
}
adjustmentScope := "medium"
if st.Contract.Strategy == "keep" {
adjustmentScope = "small"
}
return &agentmodel.SchedulePlanState{
TraceID: strings.TrimSpace(st.TraceID),
UserID: st.UserID,
ConversationID: strings.TrimSpace(st.ConversationID),
UserIntent: strings.TrimSpace(st.UserIntent),
Constraints: append([]string(nil), st.Constraints...),
TaskClassIDs: append([]int(nil), st.TaskClassIDs...),
Strategy: "steady",
AdjustmentScope: adjustmentScope,
IsAdjustment: true,
HybridEntries: append([]model.HybridScheduleEntry(nil), st.HybridEntries...),
AllocatedItems: cloneTaskClassItems(st.AllocatedItems),
CandidatePlans: cloneWeekSchedules(st.CandidatePlans),
FinalSummary: strings.TrimSpace(st.FinalSummary),
Completed: st.Completed,
}
}
// shouldPersistScheduleRefinePreview 鍒ゆ柇鈥滄湰杞井璋冪粨鏋滄槸鍚﹀簲瑕嗙洊涓婁竴鐗堥瑙堚€濄€?
//
// 鑱岃矗杈圭晫锛?
// 1. 榛樿娌跨敤鍘熸湁 refine 鎸佷箙鍖栫瓥鐣ワ紝淇濊瘉鏅€?ReAct 寰皟閾捐矾涓嶅彈褰卞搷锛?
// 2. 浠呭綋鈥滅嫭绔嬪鍚堝垎鏀凡鐩存帴鍑虹珯锛屼絾缁堝鏈€氳繃鈥濇椂锛屾嫆缁濊鐩栦笂涓€鐗堥瑙堬紱
// 3. 杩欐牱鍙互閬垮厤澶栧眰鎶婃湭缁忛獙璇佺殑澶嶅悎缁撴灉褰撴垚鏂扮殑鍩虹嚎缁х画婊氬姩寰皟銆?
func shouldPersistScheduleRefinePreview(st *agentnode.ScheduleRefineState) bool {
if st == nil {
return false
}
if st.CompositeRouteSucceeded && !agentnode.FinalHardCheckPassed(st) {
return false
}
return true
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/LoveLosita/smartflow/backend/model"
newagentconv "github.com/LoveLosita/smartflow/backend/newAgent/conv"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentshared "github.com/LoveLosita/smartflow/backend/newAgent/shared"
"github.com/LoveLosita/smartflow/backend/respond"
)
@@ -115,10 +116,10 @@ func (s *AgentService) refreshSchedulePreviewAfterStateSave(
if existingPreview != nil {
preview.TraceID = strings.TrimSpace(existingPreview.TraceID)
if len(existingPreview.CandidatePlans) > 0 {
preview.CandidatePlans = cloneWeekSchedules(existingPreview.CandidatePlans)
preview.CandidatePlans = newagentshared.CloneWeekSchedules(existingPreview.CandidatePlans)
}
if len(existingPreview.AllocatedItems) > 0 {
preview.AllocatedItems = cloneTaskClassItems(existingPreview.AllocatedItems)
preview.AllocatedItems = newagentshared.CloneTaskClassItems(existingPreview.AllocatedItems)
}
if len(preview.TaskClassIDs) == 0 && len(existingPreview.TaskClassIDs) > 0 {
preview.TaskClassIDs = append([]int(nil), existingPreview.TaskClassIDs...)

View File

@@ -0,0 +1,121 @@
package agentsvc
import (
"context"
"io"
"strings"
"time"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model"
)
// streamChatFallback 是 graph 执行失败时的降级流式聊天。
// 内联了旧 agentchat.StreamChat 的核心逻辑,不再依赖 agent/ 包。
func (s *AgentService) streamChatFallback(
ctx context.Context,
llm *ark.ChatModel,
modelName string,
userInput string,
ifThinking bool,
chatHistory []*schema.Message,
outChan chan<- string,
reasoningStartAt *time.Time,
) (string, string, int, *schema.TokenUsage, error) {
messages := make([]*schema.Message, 0, len(chatHistory)+2)
messages = append(messages, schema.SystemMessage(newagentprompt.SystemPrompt))
if len(chatHistory) > 0 {
messages = append(messages, chatHistory...)
}
messages = append(messages, schema.UserMessage(userInput))
var thinking *ark.Thinking
if ifThinking {
thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled}
} else {
thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled}
}
if strings.TrimSpace(modelName) == "" {
modelName = "smartflow-worker"
}
requestID := "chatcmpl-" + uuid.NewString()
created := time.Now().Unix()
firstChunk := true
var localReasoningStartAt *time.Time
if reasoningStartAt != nil && !reasoningStartAt.IsZero() {
startCopy := reasoningStartAt.In(time.Local)
localReasoningStartAt = &startCopy
}
var reasoningEndAt *time.Time
reader, err := llm.Stream(ctx, messages, ark.WithThinking(thinking))
if err != nil {
return "", "", 0, nil, err
}
defer reader.Close()
var fullText strings.Builder
var reasoningText strings.Builder
var tokenUsage *schema.TokenUsage
for {
chunk, recvErr := reader.Recv()
if recvErr == io.EOF {
break
}
if recvErr != nil {
return "", "", 0, nil, recvErr
}
if chunk != nil && chunk.ResponseMeta != nil && chunk.ResponseMeta.Usage != nil {
tokenUsage = newagentstream.MergeUsage(tokenUsage, chunk.ResponseMeta.Usage)
}
if chunk != nil {
if strings.TrimSpace(chunk.ReasoningContent) != "" && localReasoningStartAt == nil {
now := time.Now()
localReasoningStartAt = &now
}
if strings.TrimSpace(chunk.Content) != "" && localReasoningStartAt != nil && reasoningEndAt == nil {
now := time.Now()
reasoningEndAt = &now
}
fullText.WriteString(chunk.Content)
reasoningText.WriteString(chunk.ReasoningContent)
}
payload, payloadErr := newagentstream.ToOpenAIStream(chunk, requestID, modelName, created, firstChunk)
if payloadErr != nil {
return "", "", 0, nil, payloadErr
}
if payload != "" {
outChan <- payload
firstChunk = false
}
}
finishChunk, finishErr := newagentstream.ToOpenAIFinishStream(requestID, modelName, created)
if finishErr != nil {
return "", "", 0, nil, finishErr
}
outChan <- finishChunk
outChan <- "[DONE]"
reasoningDurationSeconds := 0
if localReasoningStartAt != nil {
if reasoningEndAt == nil {
now := time.Now()
reasoningEndAt = &now
}
if reasoningEndAt.After(*localReasoningStartAt) {
reasoningDurationSeconds = int(reasoningEndAt.Sub(*localReasoningStartAt) / time.Second)
}
}
return fullText.String(), reasoningText.String(), reasoningDurationSeconds, tokenUsage, nil
}

View File

@@ -7,44 +7,12 @@ import (
"strings"
"time"
agentgraph "github.com/LoveLosita/smartflow/backend/agent/graph"
agentmodel "github.com/LoveLosita/smartflow/backend/agent/model"
agentnode "github.com/LoveLosita/smartflow/backend/agent/node"
"github.com/LoveLosita/smartflow/backend/model"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
"github.com/LoveLosita/smartflow/backend/respond"
"github.com/cloudwego/eino-ext/components/model/ark"
)
func (s *AgentService) runTaskQueryFlow(
ctx context.Context,
selectedModel *ark.ChatModel,
userMessage string,
userID int,
emitStage func(stage, detail string),
) (string, error) {
if s == nil || s.taskRepo == nil {
return "", errors.New("task query service dependency is not ready")
}
if selectedModel == nil {
return "", errors.New("task query model is nil")
}
requestNow := time.Now().In(time.Local).Format("2006-01-02 15:04")
state := agentmodel.NewTaskQueryState(strings.TrimSpace(userMessage), requestNow, agentmodel.DefaultTaskQueryReflectRetry)
return agentgraph.RunTaskQueryGraph(ctx, agentnode.TaskQueryGraphRunInput{
Model: selectedModel,
State: state,
EmitStage: emitStage,
Deps: agentnode.TaskQueryToolDeps{
QueryTasks: func(ctx context.Context, req agentnode.TaskQueryRequest) ([]agentnode.TaskQueryTaskRecord, error) {
req.UserID = userID
return s.QueryTasksForTool(ctx, req)
},
},
})
}
func (s *AgentService) QueryTasksForTool(ctx context.Context, req agentnode.TaskQueryRequest) ([]agentnode.TaskQueryTaskRecord, error) {
func (s *AgentService) QueryTasksForTool(ctx context.Context, req newagentmodel.TaskQueryRequest) ([]newagentmodel.TaskQueryTaskRecord, error) {
_ = ctx
if req.UserID <= 0 {
return nil, errors.New("invalid user_id in task query")
@@ -56,7 +24,7 @@ func (s *AgentService) QueryTasksForTool(ctx context.Context, req agentnode.Task
tasks, err := s.taskRepo.GetTasksByUserID(req.UserID)
if err != nil {
if errors.Is(err, respond.UserTasksEmpty) {
return make([]agentnode.TaskQueryTaskRecord, 0), nil
return make([]newagentmodel.TaskQueryTaskRecord, 0), nil
}
return nil, err
}
@@ -77,9 +45,9 @@ func (s *AgentService) QueryTasksForTool(ctx context.Context, req agentnode.Task
filtered = filtered[:req.Limit]
}
records := make([]agentnode.TaskQueryTaskRecord, 0, len(filtered))
records := make([]newagentmodel.TaskQueryTaskRecord, 0, len(filtered))
for _, task := range filtered {
records = append(records, agentnode.TaskQueryTaskRecord{
records = append(records, newagentmodel.TaskQueryTaskRecord{
ID: task.ID,
Title: task.Title,
PriorityGroup: task.Priority,
@@ -106,7 +74,7 @@ func applyReadTimeUrgencyPromotion(task *model.Task, now time.Time) {
}
}
func taskMatchesQueryFilter(task model.Task, req agentnode.TaskQueryRequest) bool {
func taskMatchesQueryFilter(task model.Task, req newagentmodel.TaskQueryRequest) bool {
if !req.IncludeCompleted && task.IsCompleted {
return false
}
@@ -130,7 +98,7 @@ func taskMatchesQueryFilter(task model.Task, req agentnode.TaskQueryRequest) boo
return true
}
func sortTasksForQuery(tasks []model.Task, req agentnode.TaskQueryRequest) {
func sortTasksForQuery(tasks []model.Task, req newagentmodel.TaskQueryRequest) {
if len(tasks) <= 1 {
return
}