diff --git a/AGENTS.md b/AGENTS.md index 2516b78..d052ecf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -4,3 +4,33 @@ 1. 默认语言规则:所有注释、接口文案、说明、评审反馈均使用中文。 2. 请勤加注释,尤其是复杂逻辑部分,确保代码易于理解和维护。 + +## 注释规范(强制) + +1. 默认使用中文注释,禁止英文注释(专业术语除外)。 +2. 复杂逻辑必须写“步骤化注释”,用 `1. / 2. / 2.1` 这种编号,说明: + - 这一步要做什么 + - 为什么要这样做 + - 失败时怎么处理 + - 兜底/回退策略是什么 +3. 函数注释至少说明“职责边界”: + - 这个函数负责什么 + - 不负责什么 + - 输入输出语义(尤其是 bool、error、状态字段) +4. 涉及分支、重试、事务、幂等、并发、状态机的代码,必须写清楚判断依据与流转条件。 +5. 跨文件调用前必须写“调用目的注释”,让读者不跳转文件也能理解当前代码意图。 +6. 注释禁止空话(如“设置变量”“调用方法”);必须写业务意图与约束。 +7. 改动代码时,如修改了复杂逻辑,必须同步更新注释;注释过期视为不合格提交。 +8. 不要求每行都注释;简单直白代码可省略,重点保证关键路径可读性。 + +## 注释风格示例 + +推荐: +- `// 1. 先查缓存,命中则避免回源 DB,降低接口延迟。` +- `// 2. 缓存未命中再查库;若查库失败直接返回,避免写入不完整状态。` +- `// 3. 写库成功后再更新缓存,保证“先真后快”,避免脏缓存。` + +不推荐: +- `// 查询缓存` +- `// 调用 DAO` +- `// 返回结果` diff --git a/backend/agent/quicknote/graph.go b/backend/agent/quicknote/graph.go index 2186c86..0d4aed1 100644 --- a/backend/agent/quicknote/graph.go +++ b/backend/agent/quicknote/graph.go @@ -37,6 +37,7 @@ type QuickNoteGraphRunInput struct { // RunQuickNoteGraph 执行“随口记”图编排。 // 该文件只负责“连线与分支”,节点内部逻辑全部下沉到 nodes.go。 func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*QuickNoteState, error) { + // 1. 启动前硬校验:模型、状态、依赖缺一不可。 if input.Model == nil { return nil, errors.New("quick note graph: model is nil") } @@ -47,6 +48,7 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic return nil, err } + // 2. 统一封装阶段推送函数,避免各节点反复判空。 emitStage := func(stage, detail string) { if input.EmitStage != nil { input.EmitStage(stage, detail) @@ -54,13 +56,17 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic } // 统一初始化“当前时间基准”,避免同一请求内相对时间口径漂移。 + // 2.1 若上游未设置 RequestNow,这里补齐。 if input.State.RequestNow.IsZero() { input.State.RequestNow = quickNoteNowToMinute() } + // 2.2 若上游未设置文本基准,这里按统一格式补齐。 if strings.TrimSpace(input.State.RequestNowText) == "" { input.State.RequestNowText = formatQuickNoteTimeToMinute(input.State.RequestNow) } + // 3. 构建工具包并取出写库工具。 + // 这样 graph 运行时只关心“调用工具”,不关心工具如何注册。 toolBundle, err := BuildQuickNoteToolBundle(ctx, input.Deps) if err != nil { return nil, err @@ -69,10 +75,14 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic if err != nil { return nil, err } + + // 4. runner 负责把依赖收口,graph 只保留连线定义。 runner := newQuickNoteRunner(input, createTaskTool, emitStage) + // 5. 创建状态图容器:输入/输出类型都为 *QuickNoteState。 graph := compose.NewGraph[*QuickNoteState, *QuickNoteState]() + // 6. 注册节点(意图 -> 优先级 -> 持久化 -> 退出)。 if err = graph.AddLambdaNode(quickNoteGraphNodeIntent, compose.InvokableLambda(runner.intentNode)); err != nil { return nil, err } @@ -90,11 +100,13 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic } // 连线:START -> intent + // 7. 所有请求统一先过 intent 节点,确保意图和时间校验在前。 if err = graph.AddEdge(compose.START, quickNoteGraphNodeIntent); err != nil { return nil, err } // 分支:intent 后决定去 priority 还是 exit。 + // 8. 非随口记或时间非法时直接 exit,避免进入后续写库路径。 if err = graph.AddBranch(quickNoteGraphNodeIntent, compose.NewGraphBranch( runner.nextAfterIntent, map[string]bool{ @@ -106,16 +118,19 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic } // exit 直接结束。 + // 9. exit 是显式终点前节点,方便后续插入“统一收尾逻辑”。 if err = graph.AddEdge(quickNoteGraphNodeExit, compose.END); err != nil { return nil, err } // priority -> persist。 + // 10. 通过优先级节点后,进入持久化节点。 if err = graph.AddEdge(quickNoteGraphNodeRank, quickNoteGraphNodePersist); err != nil { return nil, err } // persist 后决定“重试 persist”还是结束。 + // 11. 重试策略由状态字段驱动,不在 graph 层写重试计数逻辑。 if err = graph.AddBranch(quickNoteGraphNodePersist, compose.NewGraphBranch( runner.nextAfterPersist, map[string]bool{ @@ -126,11 +141,14 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic return nil, err } + // 12. 运行步数上限:至少 12 步,并根据 MaxToolRetry 预留重试步数。 + // 防止异常分支导致无限循环。 maxSteps := input.State.MaxToolRetry + 10 if maxSteps < 12 { maxSteps = 12 } + // 13. 编译图得到可执行实例。 runnable, err := graph.Compile(ctx, compose.WithGraphName("QuickNoteGraph"), compose.WithMaxRunSteps(maxSteps), @@ -140,5 +158,6 @@ func RunQuickNoteGraph(ctx context.Context, input QuickNoteGraphRunInput) (*Quic return nil, err } + // 14. 执行图并返回最终状态。 return runnable.Invoke(ctx, input.State) } diff --git a/backend/agent/quicknote/nodes.go b/backend/agent/quicknote/nodes.go index d119898..4b52819 100644 --- a/backend/agent/quicknote/nodes.go +++ b/backend/agent/quicknote/nodes.go @@ -42,21 +42,26 @@ type quickNotePlanModelOutput struct { // 1) trustRoute 命中时,直接走单请求聚合规划,跳过二次意图识别; // 2) 无论是否走快路径,最终都要走本地时间硬校验,防止脏时间落库。 func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input QuickNoteGraphRunInput, emitStage func(stage, detail string)) (*QuickNoteState, error) { + // 0. 基础防御:state 为空直接返回错误,避免后续节点空指针。 if st == nil { return nil, errors.New("quick note graph: nil state in intent node") } + // 1. 如果上游路由已高置信命中 quick_note,则走“单请求聚合快路径”。 if input.SkipIntentVerification { emitStage("quick_note.intent.analyzing", "已由上游路由判定为任务请求,跳过二次意图判断。") st.IsQuickNoteIntent = true st.IntentJudgeReason = "上游路由已命中 quick_note,跳过二次意图判定" st.PlannedBySingleCall = true + // 1.1 一次调用里尽量拿齐 title/deadline/priority/banter,减少串行模型开销。 emitStage("quick_note.plan.generating", "正在一次性生成时间归一化、优先级与回复润色。") plan, planErr := planQuickNoteInSingleCall(ctx, input.Model, st.RequestNowText, st.RequestNow, st.UserInput) if planErr != nil { + // 1.2 聚合规划失败不终止链路,改为后续本地兜底。 st.IntentJudgeReason += ";聚合规划失败,回退本地兜底" } else { + // 1.3 仅在字段有效时回填,避免无效值污染状态。 if strings.TrimSpace(plan.Title) != "" { st.ExtractedTitle = strings.TrimSpace(plan.Title) } @@ -71,10 +76,12 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick st.ExtractedBanter = strings.TrimSpace(plan.Banter) } + // 1.4 如果模型没给标题,基于原句做本地标题提取兜底。 if strings.TrimSpace(st.ExtractedTitle) == "" { st.ExtractedTitle = deriveQuickNoteTitleFromInput(st.UserInput) } + // 1.5 无论是否聚合成功,都要进行本地时间硬校验,防止脏时间写库。 emitStage("quick_note.deadline.validating", "正在校验并归一化任务时间。") userDeadline, userHasTimeHint, userDeadlineErr := parseOptionalDeadlineFromUserInput(st.UserInput, st.RequestNow) if userHasTimeHint && userDeadlineErr != nil { @@ -84,12 +91,14 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick return st, nil } if userDeadline != nil { + // 用户原句能解析出时间时,以原句解析结果为准(更贴近真实输入)。 st.ExtractedDeadline = userDeadline st.ExtractedDeadlineText = strings.TrimSpace(st.UserInput) } return st, nil } + // 2. 常规路径:先让模型做意图识别 + 初步抽取。 emitStage("quick_note.intent.analyzing", "正在分析用户输入是否属于任务安排请求。") prompt := fmt.Sprintf(`当前时间(北京时间,精确到分钟):%s 用户输入:%s @@ -108,6 +117,7 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick st.UserInput, ) + // 2.1 模型调用失败时,保守回退普通聊天,避免误写任务。 raw, callErr := callModelForJSON(ctx, input.Model, QuickNoteIntentPrompt, prompt) if callErr != nil { st.IsQuickNoteIntent = false @@ -115,6 +125,7 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick return st, nil } + // 2.2 解析失败同样回退普通聊天,保证稳定性优先。 parsed, parseErr := parseJSONPayload[quickNoteIntentModelOutput](raw) if parseErr != nil { st.IsQuickNoteIntent = false @@ -125,9 +136,11 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick st.IsQuickNoteIntent = parsed.IsQuickNote st.IntentJudgeReason = strings.TrimSpace(parsed.Reason) if !st.IsQuickNoteIntent { + // 非随口记:后续通过分支直接退出 graph。 return st, nil } + // 2.3 处理标题字段:为空时回退到用户原句。 title := strings.TrimSpace(parsed.Title) if title == "" { title = strings.TrimSpace(st.UserInput) @@ -137,6 +150,7 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick emitStage("quick_note.deadline.validating", "正在校验并归一化任务时间。") // Step A:优先尝试解析模型抽取出来的 deadline。 + // 这样可利用模型“结构化理解”能力先拿一次候选时间。 st.ExtractedDeadlineText = strings.TrimSpace(parsed.DeadlineAt) if st.ExtractedDeadlineText != "" { if deadline, deadlineErr := parseOptionalDeadlineWithNow(st.ExtractedDeadlineText, st.RequestNow); deadlineErr == nil { @@ -145,6 +159,7 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick } // Step B:基于用户原句执行“本地时间解析 + 合法性校验”。 + // 本地校验是最终硬门槛,确保“用户给错时间不会被静默写成 NULL”。 userDeadline, userHasTimeHint, userDeadlineErr := parseOptionalDeadlineFromUserInput(st.UserInput, st.RequestNow) if userHasTimeHint && userDeadlineErr != nil { st.DeadlineValidationError = userDeadlineErr.Error() @@ -154,6 +169,7 @@ func runQuickNoteIntentNode(ctx context.Context, st *QuickNoteState, input Quick } if st.ExtractedDeadline == nil && userDeadline != nil { + // 当模型未提取出时间,但原句能解析时,补写时间结果。 st.ExtractedDeadline = userDeadline if st.ExtractedDeadlineText == "" { st.ExtractedDeadlineText = strings.TrimSpace(st.UserInput) @@ -171,10 +187,12 @@ func runQuickNotePriorityNode(ctx context.Context, st *QuickNoteState, input Qui if st == nil { return nil, errors.New("quick note graph: nil state in priority node") } + // 1. 非随口记或时间校验失败时,不做优先级评估。 if !st.IsQuickNoteIntent || strings.TrimSpace(st.DeadlineValidationError) != "" { return st, nil } + // 2. 已有合法优先级则直接复用,避免重复调用模型。 if IsValidTaskPriority(st.ExtractedPriority) { if strings.TrimSpace(st.ExtractedPriorityReason) == "" { st.ExtractedPriorityReason = "复用聚合规划优先级" @@ -182,6 +200,7 @@ func runQuickNotePriorityNode(ctx context.Context, st *QuickNoteState, input Qui emitStage("quick_note.priority.evaluating", "已复用聚合规划结果中的优先级。") return st, nil } + // 3. 快路径下若缺失优先级,直接本地兜底,追求低延迟。 if input.SkipIntentVerification || st.PlannedBySingleCall { st.ExtractedPriority = fallbackPriority(st) st.ExtractedPriorityReason = "聚合规划未给出合法优先级,使用本地兜底" @@ -189,6 +208,7 @@ func runQuickNotePriorityNode(ctx context.Context, st *QuickNoteState, input Qui return st, nil } + // 4. 常规路径才调用独立优先级模型。 emitStage("quick_note.priority.evaluating", "正在评估任务优先级。") deadlineText := "无" if st.ExtractedDeadline != nil { @@ -218,6 +238,7 @@ func runQuickNotePriorityNode(ctx context.Context, st *QuickNoteState, input Qui deadlineText, ) + // 4.1 调用失败:使用本地兜底,不中断主链路。 raw, callErr := callModelForJSON(ctx, input.Model, QuickNotePriorityPrompt, prompt) if callErr != nil { st.ExtractedPriority = fallbackPriority(st) @@ -225,6 +246,7 @@ func runQuickNotePriorityNode(ctx context.Context, st *QuickNoteState, input Qui return st, nil } + // 4.2 解析失败或非法值:同样兜底。 parsed, parseErr := parseJSONPayload[quickNotePriorityModelOutput](raw) if parseErr != nil || !IsValidTaskPriority(parsed.PriorityGroup) { st.ExtractedPriority = fallbackPriority(st) @@ -244,10 +266,12 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr if st == nil { return nil, errors.New("quick note graph: nil state in persist node") } + // 1. 非随口记或时间非法时不允许落库。 if !st.IsQuickNoteIntent || strings.TrimSpace(st.DeadlineValidationError) != "" { return st, nil } + // 2. 准备工具入参:优先使用已评估优先级,缺失则兜底。 emitStage("quick_note.persisting", "正在写入任务数据。") priority := st.ExtractedPriority if !IsValidTaskPriority(priority) { @@ -260,6 +284,7 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr deadlineText = st.ExtractedDeadline.In(quickNoteLocation()).Format(time.RFC3339) } + // 3. 工具参数序列化失败视作一次失败尝试,交由重试分支处理。 toolInput := QuickNoteCreateTaskToolInput{ Title: st.ExtractedTitle, PriorityGroup: priority, @@ -275,6 +300,7 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr return st, nil } + // 4. 调用写库工具。 rawOutput, invokeErr := createTaskTool.InvokableRun(ctx, string(rawInput)) if invokeErr != nil { st.RecordToolError(invokeErr.Error()) @@ -285,6 +311,7 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr return st, nil } + // 5. 工具返回解析失败同样按“可重试错误”处理。 toolOutput, parseErr := parseJSONPayload[QuickNoteCreateTaskToolOutput](rawOutput) if parseErr != nil { st.RecordToolError("解析工具返回失败: " + parseErr.Error()) @@ -305,6 +332,7 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr return st, nil } + // 6. 写库成功后回填状态,并准备最终回复内容。 st.RecordToolSuccess(toolOutput.TaskID) if strings.TrimSpace(toolOutput.Title) != "" { st.ExtractedTitle = strings.TrimSpace(toolOutput.Title) @@ -324,6 +352,9 @@ func runQuickNotePersistNodeInternal(ctx context.Context, st *QuickNoteState, cr // selectQuickNoteNextAfterIntent 根据意图与时间校验结果决定 intent 后分支。 func selectQuickNoteNextAfterIntent(st *QuickNoteState) string { + // 1) 非随口记 -> exit; + // 2) 时间校验失败 -> exit; + // 3) 其余 -> priority 节点。 if st == nil || !st.IsQuickNoteIntent { return quickNoteGraphNodeExit } @@ -335,6 +366,11 @@ func selectQuickNoteNextAfterIntent(st *QuickNoteState) string { // selectQuickNoteNextAfterPersist 根据持久化状态决定 persist 后分支。 func selectQuickNoteNextAfterPersist(st *QuickNoteState) string { + // 分支规则: + // 1) state=nil:防御式结束; + // 2) 已持久化:结束; + // 3) 可重试:回到 persist 重试; + // 4) 不可重试:写失败文案并结束。 if st == nil { return compose.END } @@ -351,12 +387,14 @@ func selectQuickNoteNextAfterPersist(st *QuickNoteState) string { } func getInvokableToolByName(bundle *QuickNoteToolBundle, name string) (tool.InvokableTool, error) { + // 1. 校验工具包有效性。 if bundle == nil { return nil, errors.New("tool bundle is nil") } if len(bundle.Tools) == 0 || len(bundle.ToolInfos) == 0 { return nil, errors.New("tool bundle is empty") } + // 2. 通过 ToolInfo 名称定位并拿到同索引的 Tool 实例。 for idx, info := range bundle.ToolInfos { if info == nil || info.Name != name { continue @@ -371,14 +409,17 @@ func getInvokableToolByName(bundle *QuickNoteToolBundle, name string) (tool.Invo } func callModelForJSON(ctx context.Context, chatModel *ark.ChatModel, systemPrompt, userPrompt string) (string, error) { + // 默认 JSON 输出场景 token 足够小,使用 256 作为保守上限。 return callModelForJSONWithMaxTokens(ctx, chatModel, systemPrompt, userPrompt, 256) } func callModelForJSONWithMaxTokens(ctx context.Context, chatModel *ark.ChatModel, systemPrompt, userPrompt string, maxTokens int) (string, error) { + // 1. 构造 system + user 两段消息。 messages := []*schema.Message{ schema.SystemMessage(systemPrompt), schema.UserMessage(userPrompt), } + // 2. 统一关闭 thinking,降低额外延迟,并用温度 0 提升结构化稳定性。 opts := []einoModel.Option{ ark.WithThinking(&arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled}), einoModel.WithTemperature(0), @@ -387,6 +428,7 @@ func callModelForJSONWithMaxTokens(ctx context.Context, chatModel *ark.ChatModel opts = append(opts, einoModel.WithMaxTokens(maxTokens)) } + // 3. 调模型并对空响应做防御校验。 resp, err := chatModel.Generate(ctx, messages, opts...) if err != nil { return "", err @@ -418,6 +460,7 @@ func planQuickNoteInSingleCall( now time.Time, userInput string, ) (*quickNotePlannedResult, error) { + // 1. 构造聚合 prompt:一次返回所有结构化字段,减少多次 LLM 往返。 prompt := fmt.Sprintf(`当前时间(北京时间,精确到分钟):%s 用户输入:%s @@ -438,10 +481,12 @@ func planQuickNoteInSingleCall( strings.TrimSpace(userInput), ) + // 2. 控制 maxTokens,避免模型冗长输出导致延迟上升。 raw, err := callModelForJSONWithMaxTokens(ctx, chatModel, QuickNotePlanPrompt, prompt, 220) if err != nil { return nil, err } + // 3. 解析模型输出 JSON。 parsed, parseErr := parseJSONPayload[quickNotePlanModelOutput](raw) if parseErr != nil { return nil, parseErr @@ -455,12 +500,14 @@ func planQuickNoteInSingleCall( Banter: strings.TrimSpace(parsed.Banter), } + // 4. banter 只保留首行,防止模型输出多行破坏最终回复风格。 if result.Banter != "" { if idx := strings.Index(result.Banter, "\n"); idx >= 0 { result.Banter = strings.TrimSpace(result.Banter[:idx]) } } + // 5. 对 deadline 做本地二次校验,确保可落库。 if result.DeadlineText != "" { if deadline, deadlineErr := parseOptionalDeadlineWithNow(result.DeadlineText, now); deadlineErr == nil { result.Deadline = deadline @@ -470,11 +517,13 @@ func planQuickNoteInSingleCall( } func parseJSONPayload[T any](raw string) (*T, error) { + // 1. 空字符串直接失败。 clean := strings.TrimSpace(raw) if clean == "" { return nil, errors.New("empty response") } + // 2. 兼容 ```json ... ``` 包裹输出。 if strings.HasPrefix(clean, "```") { clean = strings.TrimPrefix(clean, "```json") clean = strings.TrimPrefix(clean, "```") @@ -482,11 +531,13 @@ func parseJSONPayload[T any](raw string) (*T, error) { clean = strings.TrimSpace(clean) } + // 3. 先尝试整体反序列化(最快路径)。 var out T if err := json.Unmarshal([]byte(clean), &out); err == nil { return &out, nil } + // 4. 若模型附带额外文本,则提取最外层 JSON 对象再解析。 obj := extractJSONObject(clean) if obj == "" { return nil, fmt.Errorf("no json object found in: %s", clean) @@ -498,6 +549,8 @@ func parseJSONPayload[T any](raw string) (*T, error) { } func extractJSONObject(text string) string { + // 简化提取策略:取首个“{”到最后“}”的片段。 + // 对当前 prompt 场景足够稳定,且实现成本低。 start := strings.Index(text, "{") end := strings.LastIndex(text, "}") if start == -1 || end == -1 || end <= start { @@ -507,6 +560,10 @@ func extractJSONObject(text string) string { } func fallbackPriority(st *QuickNoteState) int { + // 兜底规则: + // 1) 有截止时间且 <=48h:重要且紧急; + // 2) 有截止时间但较远:重要不紧急; + // 3) 无截止时间:简单不重要。 if st == nil { return QuickNotePrioritySimpleNotImportant } @@ -521,11 +578,13 @@ func fallbackPriority(st *QuickNoteState) int { // deriveQuickNoteTitleFromInput 在“跳过二次意图判定”场景下,从用户原句提取任务标题。 func deriveQuickNoteTitleFromInput(userInput string) string { + // 1. 先清理空白。 text := strings.TrimSpace(userInput) if text == "" { return "这条任务" } + // 2. 去掉常见指令前缀,保留核心任务语义。 prefixes := []string{ "请帮我", "麻烦帮我", "麻烦你", "帮我", "提醒我", "请提醒我", "记一下", "记个", "帮我记一下", } @@ -536,6 +595,7 @@ func deriveQuickNoteTitleFromInput(userInput string) string { } } + // 3. 截断“记得/到时候”等尾部提醒语,避免标题过长。 suffixSeparators := []string{ ",记得", ",记得", ",到时候", ",到时候", " 到时候", ",别忘了", ",别忘了", "。记得", } @@ -546,6 +606,7 @@ func deriveQuickNoteTitleFromInput(userInput string) string { } } + // 4. 收尾清理标点;若清理后为空则回退原句。 text = strings.Trim(text, ",,。.!!?;; ") if text == "" { return strings.TrimSpace(userInput) diff --git a/backend/agent/quicknote/prompt.go b/backend/agent/quicknote/prompt.go index db60918..a9a56f9 100644 --- a/backend/agent/quicknote/prompt.go +++ b/backend/agent/quicknote/prompt.go @@ -5,6 +5,10 @@ const ( // - 仅负责判断用户输入应走 quick_note 还是 chat; // - 不直接回答用户问题; // - 必须输出可机读控制码,便于后端无歧义解析。 + // 额外说明: + // 1) 这里要求固定 XML 结构,是为了让后端做严格字符串/标签解析,而不是模糊关键词匹配; + // 2) 增加 reason 标签,主要用于日志排障(看模型为何判到 quick_note/chat); + // 3) 明确“禁止输出其他内容”,是为了减少模型附加寒暄导致解析失败。 QuickNoteRouteControlPrompt = `你是 SmartFlow 的请求分流控制器。 你的唯一任务是给后端返回可机读控制码,不要做用户可见回复,不要解释。 @@ -22,6 +26,10 @@ const ( // QuickNotePlanPrompt 用于“单请求聚合规划”: // - 在一次调用内完成标题抽取、时间归一化、优先级评估、跟进句生成; // - 主要用于路由已明确命中 quick_note 的场景,以降低串行 LLM 调用次数。 + // 额外说明: + // 1) 强制 JSON 输出,减少后端解析分支复杂度; + // 2) deadline_at 统一分钟级,方便直接映射到数据库时间字段; + // 3) banter 与事实分离,避免润色文案污染结构化字段。 QuickNotePlanPrompt = `你是 SmartFlow 的任务聚合规划器。 你将基于用户输入,一次性输出任务规划结果,供后端直接写库。 @@ -42,6 +50,7 @@ const ( // 1) 只做识别与抽取,不允许模型宣称“已写库”; // 2) 遇到相对时间必须先换算成绝对时间,减少后续工具层歧义; // 3) 若无时间信息必须返回空字符串,避免幻觉时间污染数据库。 + // 4) 把“当前时间”明确注入 prompt,保证相对时间换算有统一基准。 QuickNoteIntentPrompt = `你是 SmartFlow 的“随口记分诊器”。 请判断用户输入是否表达了“帮我记一个任务/日程”的需求。 - 若是,请提取任务标题与时间线索。 @@ -51,6 +60,7 @@ const ( // QuickNotePriorityPrompt 用于第二阶段:将任务归类到四象限优先级。 // 输出会直接映射到 tasks.priority(1~4),因此要求结果必须可解释。 + // 这里强调“理由必须可解释”,是为了后续日志复盘时能看懂模型为何这么判。 QuickNotePriorityPrompt = `你是 SmartFlow 的任务优先级评估器。 根据任务内容、时间约束和执行成本,输出优先级 priority_group: 1=重要且紧急,2=重要不紧急,3=简单不重要,4=不简单不重要。 diff --git a/backend/agent/quicknote/runner.go b/backend/agent/quicknote/runner.go index 3a85141..ae542d2 100644 --- a/backend/agent/quicknote/runner.go +++ b/backend/agent/quicknote/runner.go @@ -17,6 +17,8 @@ type quickNoteRunner struct { emitStage func(stage, detail string) } +// newQuickNoteRunner 构造请求级 runner。 +// 说明:runner 生命周期仅限一次 graph invoke,不做跨请求复用。 func newQuickNoteRunner(input QuickNoteGraphRunInput, createTaskTool tool.InvokableTool, emitStage func(stage, detail string)) *quickNoteRunner { return &quickNoteRunner{ input: input, @@ -26,28 +28,34 @@ func newQuickNoteRunner(input QuickNoteGraphRunInput, createTaskTool tool.Invoka } func (r *quickNoteRunner) intentNode(ctx context.Context, st *QuickNoteState) (*QuickNoteState, error) { + // 方法引用适配层:把 runner 内部依赖透传到纯函数节点实现。 return runQuickNoteIntentNode(ctx, st, r.input, r.emitStage) } func (r *quickNoteRunner) priorityNode(ctx context.Context, st *QuickNoteState) (*QuickNoteState, error) { + // 方法引用适配层:让 graph.go 保持“只连线,不写业务细节”。 return runQuickNotePriorityNode(ctx, st, r.input, r.emitStage) } func (r *quickNoteRunner) persistNode(ctx context.Context, st *QuickNoteState) (*QuickNoteState, error) { + // 这里注入 createTaskTool,是为了让 persist 节点不直接依赖外部容器对象。 return runQuickNotePersistNodeInternal(ctx, st, r.createTaskTool, r.input, r.emitStage) } func (r *quickNoteRunner) nextAfterIntent(ctx context.Context, st *QuickNoteState) (string, error) { + // 当前分支决策是纯状态函数,不依赖 context,保留参数仅为适配 GraphBranch 签名。 _ = ctx return selectQuickNoteNextAfterIntent(st), nil } func (r *quickNoteRunner) nextAfterPersist(ctx context.Context, st *QuickNoteState) (string, error) { + // 当前分支决策是纯状态函数,不依赖 context,保留参数仅为适配 GraphBranch 签名。 _ = ctx return selectQuickNoteNextAfterPersist(st), nil } func (r *quickNoteRunner) exitNode(ctx context.Context, st *QuickNoteState) (*QuickNoteState, error) { + // exit 节点不做任何业务逻辑,仅把当前状态原样透传到 END。 _ = ctx return st, nil } diff --git a/backend/agent/quicknote/state.go b/backend/agent/quicknote/state.go index b7e5e3b..0ad5074 100644 --- a/backend/agent/quicknote/state.go +++ b/backend/agent/quicknote/state.go @@ -104,6 +104,7 @@ type QuickNoteState struct { // NewQuickNoteState 创建随口记状态对象并初始化默认重试次数。 func NewQuickNoteState(traceID string, userID int, conversationID, userInput string) *QuickNoteState { + // 1. 在“进入链路”这一刻固化时间基准,后续所有相对时间都以它为准。 requestNow := quickNoteNowToMinute() return &QuickNoteState{ TraceID: traceID, @@ -118,27 +119,36 @@ func NewQuickNoteState(traceID string, userID int, conversationID, userInput str // CanRetryTool 判断当前是否还能继续重试工具调用。 func (s *QuickNoteState) CanRetryTool() bool { + // 规则:已尝试次数 < 最大重试次数 才允许继续。 + // 这里不做 <=,是为了让“第 MaxToolRetry 次失败后”及时停机并给用户明确反馈。 return s.ToolAttemptCount < s.MaxToolRetry } // RecordToolError 记录一次工具调用失败。 func (s *QuickNoteState) RecordToolError(errMsg string) { + // 1. 每失败一次都要累加计数,供分支节点判断是否继续重试。 s.ToolAttemptCount++ + // 2. 保留最后一次错误,便于日志与排障定位“最终失败原因”。 s.LastToolError = errMsg } // RecordToolSuccess 记录一次工具调用成功。 func (s *QuickNoteState) RecordToolSuccess(taskID int) { + // 1. 成功同样计入尝试次数,便于还原完整调用轨迹。 s.ToolAttemptCount++ + // 2. 回填 task_id 和成功标志,供后续节点拼接成功回复。 s.PersistedTaskID = taskID s.Persisted = true + // 3. 成功后清空错误,避免后续误读历史失败信息。 s.LastToolError = "" } // quickNoteLocation 返回随口记链路使用的业务时区。 func quickNoteLocation() *time.Location { + // 1. 优先加载业务固定时区,保证“明天/今晚”等语义与用户预期一致。 loc, err := time.LoadLocation(quickNoteTimezoneName) if err != nil { + // 2. 极端情况下回退到系统本地时区,避免因时区加载失败导致链路整体不可用。 return time.Local } return loc @@ -146,10 +156,12 @@ func quickNoteLocation() *time.Location { // quickNoteNowToMinute 返回当前时间并截断到分钟级。 func quickNoteNowToMinute() time.Time { + // 统一截断到分钟,避免秒级抖动导致“同一次请求前后解析口径不一致”。 return time.Now().In(quickNoteLocation()).Truncate(time.Minute) } // formatQuickNoteTimeToMinute 将时间格式化为分钟级字符串。 func formatQuickNoteTimeToMinute(t time.Time) string { + // 输出前统一转换到业务时区,避免日志和 prompt 出现跨时区混淆。 return t.In(quickNoteLocation()).Format(QuickNoteDatetimeMinuteLayout) } diff --git a/backend/agent/quicknote/tool.go b/backend/agent/quicknote/tool.go index 049beb1..a4372d8 100644 --- a/backend/agent/quicknote/tool.go +++ b/backend/agent/quicknote/tool.go @@ -69,9 +69,11 @@ type QuickNoteToolDeps struct { } func (d QuickNoteToolDeps) validate() error { + // 1. ResolveUserID 为空会导致工具无法绑定当前用户,必须提前失败。 if d.ResolveUserID == nil { return errors.New("quick note tool deps: ResolveUserID is nil") } + // 2. CreateTask 为空说明没有真实写库实现,工具无法完成核心职责。 if d.CreateTask == nil { return errors.New("quick note tool deps: CreateTask is nil") } @@ -128,18 +130,23 @@ type QuickNoteCreateTaskToolOutput struct { // BuildQuickNoteToolBundle 构建“AI随口记”工具包。 // 这是 agent 目录给上层编排层(chain/graph/react)提供的统一入口。 func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*QuickNoteToolBundle, error) { + // 1. 启动期做依赖校验,尽早暴露 wiring 问题,避免运行时才 panic。 if err := deps.validate(); err != nil { return nil, err } + // 2. 通过 InferTool 把 Go 函数声明成“模型可调用工具”。 + // 该闭包函数是工具的真实执行体,后续所有参数校验都在这里兜底。 createTaskTool, err := toolutils.InferTool( ToolNameQuickNoteCreateTask, ToolDescQuickNoteCreateTask, func(ctx context.Context, input *QuickNoteCreateTaskToolInput) (*QuickNoteCreateTaskToolOutput, error) { + // 2.1 防御式检查:工具调用参数不能为 nil。 if input == nil { return nil, errors.New("工具参数不能为空") } + // 2.2 标题与优先级是写库硬条件,必须先校验。 title := strings.TrimSpace(input.Title) if title == "" { return nil, errors.New("title 不能为空") @@ -156,6 +163,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui return nil, err } + // 2.3 user_id 一律来自鉴权上下文,不信任模型侧入参,防止越权写别人的任务。 userID, err := deps.ResolveUserID(ctx) if err != nil { return nil, fmt.Errorf("解析用户身份失败: %w", err) @@ -164,6 +172,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui return nil, fmt.Errorf("非法 user_id=%d", userID) } + // 2.4 走业务层写库。 result, err := deps.CreateTask(ctx, QuickNoteCreateTaskRequest{ UserID: userID, Title: title, @@ -177,6 +186,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui return nil, errors.New("写入任务后返回结果异常") } + // 2.5 结果归一化:优先使用业务层返回值,其次回退到入参,保证输出稳定可读。 finalTitle := title if strings.TrimSpace(result.Title) != "" { finalTitle = strings.TrimSpace(result.Title) @@ -187,6 +197,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui finalPriority = result.PriorityGroup } + // 2.6 截止时间输出统一为 RFC3339,便于跨系统传输与调试。 deadlineStr := "" if result.DeadlineAt != nil { deadlineStr = result.DeadlineAt.In(quickNoteLocation()).Format(time.RFC3339) @@ -194,6 +205,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui deadlineStr = deadline.In(quickNoteLocation()).Format(time.RFC3339) } + // 2.7 组装给模型的结构化结果,包含可直接面向用户的 message 草稿。 return &QuickNoteCreateTaskToolOutput{ TaskID: result.TaskID, Title: finalTitle, @@ -208,6 +220,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui return nil, fmt.Errorf("构建随口记工具失败: %w", err) } + // 3. Tools 给执行节点使用,ToolInfos 给模型注册 schema 使用,二者都要返回。 tools := []tool.BaseTool{createTaskTool} infos, err := collectToolInfos(ctx, tools) if err != nil { @@ -221,6 +234,7 @@ func BuildQuickNoteToolBundle(ctx context.Context, deps QuickNoteToolDeps) (*Qui } func collectToolInfos(ctx context.Context, tools []tool.BaseTool) ([]*schema.ToolInfo, error) { + // 按工具列表顺序提取 ToolInfo,确保“tools[idx] <-> infos[idx]”一一对应。 infos := make([]*schema.ToolInfo, 0, len(tools)) for _, t := range tools { info, err := t.Info(ctx) @@ -235,16 +249,20 @@ func collectToolInfos(ctx context.Context, tools []tool.BaseTool) ([]*schema.Too // parseOptionalDeadline 解析工具输入中的可选截止时间。 // 该入口用于“工具参数强校验”:只要调用方给了非空 deadline_at,就必须能被解析。 func parseOptionalDeadline(raw string) (*time.Time, error) { + // 1. 先做标点与空白归一化,避免中文输入噪声影响解析。 value := normalizeDeadlineInput(raw) if value == "" { + // 2. 空字符串合法,表示任务无截止时间。 return nil, nil } + // 3. 统一按“严格模式”解析:给了时间就必须成功解析。 deadline, hasHint, err := parseOptionalDeadlineFromText(value, quickNoteNowToMinute()) if err != nil { return nil, err } if deadline == nil { + // 4. 区分“无时间线索”和“有线索但不支持”,返回更准确错误信息。 if !hasHint { return nil, fmt.Errorf("deadline_at 格式不支持: %s", value) } @@ -256,6 +274,7 @@ func parseOptionalDeadline(raw string) (*time.Time, error) { // parseOptionalDeadlineWithNow 在给定时间基准下解析 deadline。 // 该函数保持“严格模式”:非空字符串无法解析时会直接返回 error。 func parseOptionalDeadlineWithNow(raw string, now time.Time) (*time.Time, error) { + // 场景:模型已给出 deadline_at,需要基于同一 requestNow 再次硬校验。 value := normalizeDeadlineInput(raw) if value == "" { return nil, nil @@ -277,6 +296,7 @@ func parseOptionalDeadlineWithNow(raw string, now time.Time) (*time.Time, error) // - hasHint=false 且 err=nil:文本里没有明显时间线索,应视为“用户没给时间”; // - hasHint=true 且 err!=nil:用户给了时间但格式非法,应提示用户修正,不应落库。 func parseOptionalDeadlineFromUserInput(raw string, now time.Time) (*time.Time, bool, error) { + // 场景:解析用户原始句子时,允许“没给时间”,但不允许“给了错误时间却静默通过”。 value := normalizeDeadlineInput(raw) if value == "" { return nil, false, nil @@ -285,8 +305,10 @@ func parseOptionalDeadlineFromUserInput(raw string, now time.Time) (*time.Time, deadline, hasHint, err := parseOptionalDeadlineFromText(value, now) if err != nil { if hasHint { + // 有时间线索 + 解析失败:上层应明确提示用户改时间格式。 return nil, true, err } + // 无明显时间线索:按“未提供时间”处理。 return nil, false, nil } if deadline == nil { @@ -308,14 +330,17 @@ func parseOptionalDeadlineFromText(value string, now time.Time) (*time.Time, boo return nil, false, nil } + // 1. 统一时区与时间基准,保证相对时间可重复计算。 loc := quickNoteLocation() now = now.In(loc) hasHint := hasDeadlineHint(value) + // 2. 先尝试绝对时间(优先级更高,歧义更小)。 if abs, ok := tryParseAbsoluteDeadline(value, loc); ok { return abs, true, nil } + // 3. 再尝试相对时间(明天/下周一/今晚)。 if rel, recognized, err := tryParseRelativeDeadline(value, now, loc); recognized { if err != nil { return nil, true, err @@ -323,6 +348,7 @@ func parseOptionalDeadlineFromText(value string, now time.Time) (*time.Time, boo return rel, true, nil } + // 4. 到这里仍失败时,根据 hasHint 决定返回“软失败”还是“硬失败”。 if hasHint { return nil, true, fmt.Errorf("deadline_at 格式不支持: %s", value) } @@ -331,10 +357,12 @@ func parseOptionalDeadlineFromText(value string, now time.Time) (*time.Time, boo // normalizeDeadlineInput 把中文标点和空白先归一化,降低格式解析的噪声。 func normalizeDeadlineInput(raw string) string { + // 先 trim,避免纯空格输入影响后续逻辑。 trimmed := strings.TrimSpace(raw) if trimmed == "" { return "" } + // 将中文标点统一成英文形态,降低正则和 layout 解析复杂度。 replacer := strings.NewReplacer( ":", ":", ",", ",", @@ -349,6 +377,7 @@ func normalizeDeadlineInput(raw string) string { // 1) 用户根本没给时间(允许 deadline 为空); // 2) 用户给了时间但写错(必须提示修正,不能静默写 NULL)。 func hasDeadlineHint(value string) bool { + // 1. 先用结构化正则快速判断(时间格式、日期格式、周几格式)。 if quickNoteClockHMRegex.MatchString(value) || quickNoteClockCNRegex.MatchString(value) || quickNoteYMDRegex.MatchString(value) || @@ -357,6 +386,7 @@ func hasDeadlineHint(value string) bool { quickNoteWeekdayRegex.MatchString(value) { return true } + // 2. 再用词元判断“明天/今晚”等语义线索。 for _, token := range quickNoteRelativeTokens { if strings.Contains(value, token) { return true @@ -368,6 +398,7 @@ func hasDeadlineHint(value string) bool { // tryParseAbsoluteDeadline 尝试按绝对时间格式解析。 // 若只提供日期(无时分),默认归一到当天 23:59,表示“当日截止”。 func tryParseAbsoluteDeadline(value string, loc *time.Location) (*time.Time, bool) { + // 逐个 layout 尝试,命中即返回。 for _, layout := range quickNoteDeadlineLayouts { var ( t time.Time @@ -385,9 +416,11 @@ func tryParseAbsoluteDeadline(value string, loc *time.Location) (*time.Time, boo continue } + // Date-only 输入(例如 2026-03-20)默认补到 23:59。 if _, dateOnly := quickNoteDateOnlyLayouts[layout]; dateOnly { t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 0, 0, loc) } else { + // 非 date-only 则统一清零秒级,保持分钟粒度一致。 t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, loc) } return &t, true @@ -400,11 +433,13 @@ func tryParseAbsoluteDeadline(value string, loc *time.Location) (*time.Time, boo // - 明天交报告(默认 23:59) // - 下周一上午9点开会(解析为下周一 09:00) func tryParseRelativeDeadline(value string, now time.Time, loc *time.Location) (*time.Time, bool, error) { + // 1. 先确定“哪一天”。 baseDate, recognized := inferBaseDate(value, now, loc) if !recognized { return nil, false, nil } + // 2. 再解析“几点几分”,若缺失则按语义默认时刻兜底。 hour, minute, hasExplicitClock, err := extractClock(value) if err != nil { return nil, true, err @@ -424,6 +459,7 @@ func tryParseRelativeDeadline(value string, now time.Time, loc *time.Location) ( // 3) 周几表达(本周/下周); // 4) 明天/后天/今晚等相对词。 func inferBaseDate(value string, now time.Time, loc *time.Location) (time.Time, bool) { + // 1) yyyy年MM月dd日 if matched := quickNoteYMDRegex.FindStringSubmatch(value); len(matched) == 4 { year, _ := strconv.Atoi(matched[1]) month, _ := strconv.Atoi(matched[2]) @@ -433,6 +469,7 @@ func inferBaseDate(value string, now time.Time, loc *time.Location) (time.Time, } } + // 2) MM月dd日(自动推断年份:若今年已过则滚到明年) if matched := quickNoteMDRegex.FindStringSubmatch(value); len(matched) == 3 { month, _ := strconv.Atoi(matched[1]) day, _ := strconv.Atoi(matched[2]) @@ -451,6 +488,7 @@ func inferBaseDate(value string, now time.Time, loc *time.Location) (time.Time, return candidate, true } + // 3) 本周/下周 + 周几 if matched := quickNoteWeekdayRegex.FindStringSubmatch(value); len(matched) == 3 { prefix := matched[1] target, ok := toWeekday(matched[2]) @@ -459,6 +497,7 @@ func inferBaseDate(value string, now time.Time, loc *time.Location) (time.Time, } } + // 4) 今天/明天/后天/大后天/昨天等相对词 today := startOfDay(now) switch { case strings.Contains(value, "大后天"): @@ -481,10 +520,12 @@ func inferBaseDate(value string, now time.Time, loc *time.Location) (time.Time, // - 24h 表达:18:30 // - 中文表达:3点、3点半、3点20分 func extractClock(value string) (int, int, bool, error) { + // hour/minute 最终会用于 time.Date,需要先做范围约束。 hour := 0 minute := 0 hasClock := false + // 1) 24 小时制:18:30 if matched := quickNoteClockHMRegex.FindStringSubmatch(value); len(matched) == 3 { h, errH := strconv.Atoi(matched[1]) m, errM := strconv.Atoi(matched[2]) @@ -495,6 +536,7 @@ func extractClock(value string) (int, int, bool, error) { minute = m hasClock = true } else if matched := quickNoteClockCNRegex.FindStringSubmatch(value); len(matched) >= 2 { + // 2) 中文时刻:3点 / 3点半 / 3点20分 h, errH := strconv.Atoi(matched[1]) if errH != nil { return 0, 0, true, fmt.Errorf("deadline_at 时间解析失败: %s", value) @@ -516,9 +558,11 @@ func extractClock(value string) (int, int, bool, error) { } if !hasClock { + // 没有显式时刻并不是错误,交给默认时刻策略处理。 return 0, 0, false, nil } + // 3) 根据“下午/晚上/中午/凌晨”等语义修正 12/24 小时制。 if isPMHint(value) && hour < 12 { hour += 12 } @@ -537,6 +581,7 @@ func extractClock(value string) (int, int, bool, error) { // defaultClockByHint 当文本只给了“日期/相对日”但没给具体时刻时,按语义兜底。 func defaultClockByHint(value string) (int, int) { + // 没有明确时刻时按中文语义设置一个“可解释的默认值”。 switch { case strings.Contains(value, "凌晨"): return 1, 0 @@ -555,19 +600,23 @@ func defaultClockByHint(value string) (int, int) { } func isPMHint(value string) bool { + // 下午/晚上/傍晚通常应映射到 12:00 之后。 return strings.Contains(value, "下午") || strings.Contains(value, "晚上") || strings.Contains(value, "今晚") || strings.Contains(value, "傍晚") } func isNoonHint(value string) bool { + // “中午 1 点”这类表达通常是 13:00 而非 01:00。 return strings.Contains(value, "中午") } func startOfDay(t time.Time) time.Time { + // 保留原时区,只把时分秒归零。 loc := t.Location() return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc) } func isValidDate(year, month, day int) bool { + // 先做快速范围筛,再用 time.Date 回填校验闰月闰年和越界日期。 if month < 1 || month > 12 || day < 1 || day > 31 { return false } @@ -576,6 +625,7 @@ func isValidDate(year, month, day int) bool { } func toWeekday(chinese string) (time.Weekday, bool) { + // 把中文周几映射到 Go 的 Weekday 枚举。 switch chinese { case "一": return time.Monday, true @@ -598,12 +648,14 @@ func toWeekday(chinese string) (time.Weekday, bool) { // resolveWeekdayDate 根据“本周/下周 + 周几”换算目标日期。 func resolveWeekdayDate(now time.Time, prefix string, target time.Weekday) time.Time { + // 1. 先定位本周周一。 today := startOfDay(now) weekdayOffset := (int(today.Weekday()) + 6) % 7 weekStart := today.AddDate(0, 0, -weekdayOffset) targetOffset := (int(target) + 6) % 7 candidateThisWeek := weekStart.AddDate(0, 0, targetOffset) + // 2. 再根据“本周/下周/无前缀”选择最终日期。 switch { case strings.HasPrefix(prefix, "下"): return candidateThisWeek.AddDate(0, 0, 7) diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 7ef211c..b6d32b6 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -7,8 +7,9 @@ import ( "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" - kafkabus "github.com/LoveLosita/smartflow/backend/kafka" "github.com/LoveLosita/smartflow/backend/middleware" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/routers" @@ -58,14 +59,14 @@ func Start() { scheduleRepo := dao.NewScheduleDAO(db) manager := dao.NewManager(db) agentRepo := dao.NewAgentDAO(db) - outboxRepo := dao.NewOutboxDAO(db) + outboxRepo := outboxinfra.NewRepository(db) // outbox 异步链路接线: // - 读取 Kafka 配置 - // - 初始化 producer/consumer - // - 启动 dispatch/consume 两个后台循环 + // - 创建基础设施级 outbox 异步引擎 + // - 引擎内部负责 dispatch/consume 两个后台循环 kafkaCfg := kafkabus.LoadConfig() - asyncPipeline, err := service.NewAgentAsyncPipeline(outboxRepo, kafkaCfg) + asyncPipeline, err := outboxinfra.NewChatHistoryAsync(outboxRepo, kafkaCfg) if err != nil { log.Fatalf("Failed to initialize Kafka async pipeline: %v", err) } diff --git a/backend/dao/outbox.go b/backend/dao/outbox.go deleted file mode 100644 index 433c781..0000000 --- a/backend/dao/outbox.go +++ /dev/null @@ -1,190 +0,0 @@ -package dao - -import ( - "context" - "encoding/json" - "errors" - "time" - - "github.com/LoveLosita/smartflow/backend/model" - "gorm.io/gorm" - "gorm.io/gorm/clause" -) - -type OutboxDAO struct { - db *gorm.DB -} - -func NewOutboxDAO(db *gorm.DB) *OutboxDAO { - return &OutboxDAO{db: db} -} - -func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { - if maxRetry <= 0 { - maxRetry = 20 - } - raw, err := json.Marshal(payload) - if err != nil { - return 0, err - } - now := time.Now() - msg := model.AgentOutboxMessage{ - BizType: model.OutboxBizTypeChatHistoryPersist, - Topic: topic, - MessageKey: messageKey, - Payload: string(raw), - Status: model.OutboxStatusPending, - RetryCount: 0, - MaxRetry: maxRetry, - NextRetryAt: &now, - } - if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil { - return 0, err - } - return msg.ID, nil -} - -func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) { - var msg model.AgentOutboxMessage - if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil { - return nil, err - } - return &msg, nil -} - -func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { - if limit <= 0 { - limit = 100 - } - now := time.Now() - var messages []model.AgentOutboxMessage - err := d.db.WithContext(ctx). - Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now). - Order("next_retry_at ASC, id ASC"). - Limit(limit). - Find(&messages).Error - if err != nil { - return nil, err - } - return messages, nil -} - -// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 -func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { - now := time.Now() - updates := map[string]interface{}{ - "status": model.OutboxStatusPublished, - "published_at": &now, - "last_error": nil, - "next_retry_at": nil, - } - result := d.db.WithContext(ctx). - Model(&model.AgentOutboxMessage{}). - Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead). - Updates(updates) - return result.Error -} - -func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error { - now := time.Now() - lastErr := truncateError(reason) - updates := map[string]interface{}{ - "status": model.OutboxStatusDead, - "last_error": &lastErr, - "next_retry_at": nil, - "updated_at": now, - } - return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error -} - -func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { - return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - var msg model.AgentOutboxMessage - err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error - if err != nil { - return err - } - if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { - return nil - } - - nextRetryCount := msg.RetryCount + 1 - now := time.Now() - status := model.OutboxStatusPending - var nextRetryAt *time.Time - if nextRetryCount >= msg.MaxRetry { - status = model.OutboxStatusDead - nextRetryAt = nil - } else { - t := now.Add(calcRetryBackoff(nextRetryCount)) - nextRetryAt = &t - } - lastErr := truncateError(reason) - updates := map[string]interface{}{ - "status": status, - "retry_count": nextRetryCount, - "last_error": &lastErr, - "next_retry_at": nextRetryAt, - "updated_at": now, - } - return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error - }) -} - -func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { - return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - var outboxMsg model.AgentOutboxMessage - err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil - } - return err - } - if outboxMsg.Status == model.OutboxStatusConsumed { - return nil - } - if outboxMsg.Status == model.OutboxStatusDead { - return nil - } - - chatMsg := payload.Message - chatRole := payload.Role - history := model.ChatHistory{ - UserID: payload.UserID, - ChatID: payload.ConversationID, - MessageContent: &chatMsg, - Role: &chatRole, - } - if err = tx.Create(&history).Error; err != nil { - return err - } - - now := time.Now() - updates := map[string]interface{}{ - "status": model.OutboxStatusConsumed, - "consumed_at": &now, - "last_error": nil, - "next_retry_at": nil, - "updated_at": now, - } - return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error - }) -} - -func calcRetryBackoff(retryCount int) time.Duration { - if retryCount <= 0 { - return time.Second - } - if retryCount > 6 { - retryCount = 6 - } - return time.Second * time.Duration(1<<(retryCount-1)) -} - -func truncateError(reason string) string { - if len(reason) <= 2000 { - return reason - } - return reason[:2000] -} diff --git a/backend/kafka/admin.go b/backend/infra/kafka/admin.go similarity index 100% rename from backend/kafka/admin.go rename to backend/infra/kafka/admin.go diff --git a/backend/kafka/config.go b/backend/infra/kafka/config.go similarity index 100% rename from backend/kafka/config.go rename to backend/infra/kafka/config.go diff --git a/backend/kafka/consumer.go b/backend/infra/kafka/consumer.go similarity index 100% rename from backend/kafka/consumer.go rename to backend/infra/kafka/consumer.go diff --git a/backend/kafka/envelope.go b/backend/infra/kafka/envelope.go similarity index 100% rename from backend/kafka/envelope.go rename to backend/infra/kafka/envelope.go diff --git a/backend/kafka/producer.go b/backend/infra/kafka/producer.go similarity index 100% rename from backend/kafka/producer.go rename to backend/infra/kafka/producer.go diff --git a/backend/infra/outbox/chat_history_async.go b/backend/infra/outbox/chat_history_async.go new file mode 100644 index 0000000..d7d77b3 --- /dev/null +++ b/backend/infra/outbox/chat_history_async.go @@ -0,0 +1,91 @@ +package outbox + +import ( + "context" + "encoding/json" + "errors" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + "github.com/LoveLosita/smartflow/backend/model" +) + +// ChatHistoryAsync 是“聊天记录异步持久化”的业务适配器。 +// +// 设计目的: +// 1) 让业务层只调用 EnqueueChatHistoryPersist,而不感知扫描/投递/消费细节; +// 2) 保持现有 Agent 代码调用习惯,降低改造面; +// 3) 把具体的 outbox+kafka 主流程彻底收敛到 infra。 +type ChatHistoryAsync struct { + engine *Engine +} + +// NewChatHistoryAsync 创建聊天记录异步适配器并注册处理器。 +// +// 处理器职责: +// 1) 从 envelope payload 解析聊天载荷; +// 2) 调用仓储“落库并标记 consumed”; +// 3) 解析失败时标记 dead(不可恢复错误),避免无意义重试。 +func NewChatHistoryAsync(repo *Repository, cfg kafkabus.Config) (*ChatHistoryAsync, error) { + // 1. 先创建通用引擎,内部会按 cfg.Enabled 决定是否启用。 + engine, err := NewEngine(repo, cfg) + if err != nil { + return nil, err + } + if engine == nil { + // 2. 异步开关关闭:返回 nil 交给上层走同步降级路径。 + return nil, nil + } + + // 3. 注册“聊天记录持久化”业务处理器。 + // 该处理器只做三件事: + // 3.1 解析 payload; + // 3.2 调仓储落库并推进 consumed; + // 3.3 遇到不可恢复错误时标记 dead。 + if err = engine.RegisterHandler(model.OutboxBizTypeChatHistoryPersist, func(ctx context.Context, envelope kafkabus.Envelope) error { + var payload model.ChatHistoryPersistPayload + if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { + _ = repo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+unmarshalErr.Error()) + // 返回 nil:该错误已被标记为 dead,不需要再走重试。 + return nil + } + // 返回 error:由 engine 统一标记为 retry 并提交 offset。 + return repo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload) + }); err != nil { + // 4. 注册失败时回收已创建的引擎资源,防止泄漏。 + engine.Close() + return nil, err + } + + // 5. 返回业务适配器,对业务层暴露“更语义化”的调用入口。 + return &ChatHistoryAsync{engine: engine}, nil +} + +// Start 启动异步引擎(扫描 + 消费)。 +func (a *ChatHistoryAsync) Start(ctx context.Context) { + // 允许在未初始化(例如异步关闭)时被安全调用。 + if a == nil || a.engine == nil { + return + } + a.engine.Start(ctx) +} + +// Close 关闭异步引擎资源。 +func (a *ChatHistoryAsync) Close() { + // 允许在未初始化(例如异步关闭)时被安全调用。 + if a == nil || a.engine == nil { + return + } + a.engine.Close() +} + +// EnqueueChatHistoryPersist 将聊天记录持久化请求写入 outbox。 +// 该方法是业务层唯一需要调用的入口。 +func (a *ChatHistoryAsync) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + // 1. 若引擎未初始化,说明启动配置有问题或异步功能未启用。 + // 这里显式返回错误,交由业务层按需降级/告警。 + if a == nil || a.engine == nil { + return errors.New("chat history async is not initialized") + } + // 2. 以 conversation_id 作为 messageKey,尽量让同会话消息落在稳定分区。 + return a.engine.Enqueue(ctx, model.OutboxBizTypeChatHistoryPersist, payload.ConversationID, payload) +} diff --git a/backend/infra/outbox/engine.go b/backend/infra/outbox/engine.go new file mode 100644 index 0000000..1c0a8a9 --- /dev/null +++ b/backend/infra/outbox/engine.go @@ -0,0 +1,333 @@ +package outbox + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "sync" + "time" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + "github.com/LoveLosita/smartflow/backend/model" + segmentkafka "github.com/segmentio/kafka-go" + "gorm.io/gorm" +) + +// MessageHandler 是 outbox 消费分发处理器。 +// +// 设计约束: +// 1) 入参 envelope 已经完成最外层解析(含 outbox_id、biz_type、payload); +// 2) 若返回 nil,表示业务处理成功,框架将继续提交 Kafka offset; +// 3) 若返回 error,框架会按“可重试错误”处理:回写 outbox 失败状态并进入重试窗口。 +type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error + +// Engine 是 Outbox + Kafka 的通用异步引擎。 +// +// 职责边界: +// 1) 负责 outbox 扫描、Kafka 投递、Kafka 消费与统一状态机流转; +// 2) 负责 biz_type 到处理器的分发; +// 3) 不关心具体业务含义(例如“聊天记录落库”),业务语义由 handler 提供。 +// +// 状态流转口径: +// pending -> published -> consumed(成功); +// pending/published --失败--> pending(带 next_retry_at) 或 dead(达到最大重试)。 +type Engine struct { + repo *Repository + producer *kafkabus.Producer + consumer *kafkabus.Consumer + + brokers []string + topic string + maxRetry int + scanEvery time.Duration + scanBatch int + + handlersMu sync.RWMutex + handlers map[string]MessageHandler +} + +// NewEngine 创建 outbox 异步引擎。 +// +// 说明: +// 1) cfg.Enabled=false 时返回 nil,调用方可按“异步关闭”处理; +// 2) producer/consumer 初始化失败时会确保资源回收,避免半初始化泄漏。 +func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { + // 1. 配置关闭时直接返回 nil,让上层可以“无侵入降级为同步模式”。 + if !cfg.Enabled { + return nil, nil + } + // 2. 仓储缺失属于启动期配置错误,直接返回。 + if repo == nil { + return nil, errors.New("outbox repository is nil") + } + + // 3. 先初始化 producer,再初始化 consumer。 + // 如果第二步失败,要主动回收第一步资源,避免泄漏。 + producer, err := kafkabus.NewProducer(cfg) + if err != nil { + return nil, err + } + consumer, err := kafkabus.NewConsumer(cfg) + if err != nil { + _ = producer.Close() + return nil, err + } + + // 4. 汇总配置,构造引擎实例。 + return &Engine{ + repo: repo, + producer: producer, + consumer: consumer, + brokers: cfg.Brokers, + topic: cfg.Topic, + maxRetry: cfg.MaxRetry, + scanEvery: cfg.RetryScanInterval, + scanBatch: cfg.RetryBatchSize, + handlers: make(map[string]MessageHandler), + }, nil +} + +// RegisterHandler 注册某个 biz_type 的消费处理器。 +// +// 设计要求: +// 1) biz_type 必须唯一,重复注册会覆盖旧值(并打印提示日志); +// 2) handler 不能为空; +// 3) 建议在 Start 前完成注册,减少运行时热更新复杂度。 +func (e *Engine) RegisterHandler(bizType string, handler MessageHandler) error { + // 1. 参数校验:防止业务侧在启动链路上把 nil 引擎继续往下用。 + if e == nil { + return errors.New("outbox engine is nil") + } + // 2. biz_type 为空会导致无法分发,提前拦截。 + if bizType == "" { + return errors.New("bizType is empty") + } + // 3. handler 为空会在消费时 panic,必须提前拒绝。 + if handler == nil { + return errors.New("handler is nil") + } + + // 4. 加写锁更新 handler 映射,保证并发注册时 map 安全。 + e.handlersMu.Lock() + defer e.handlersMu.Unlock() + if _, exists := e.handlers[bizType]; exists { + log.Printf("outbox handler 覆盖注册: biz_type=%s", bizType) + } + e.handlers[bizType] = handler + return nil +} + +func (e *Engine) getHandler(bizType string) (MessageHandler, bool) { + // 读锁足够满足并发读取需求,避免无谓阻塞。 + e.handlersMu.RLock() + defer e.handlersMu.RUnlock() + h, ok := e.handlers[bizType] + return h, ok +} + +// Start 启动 outbox 异步引擎。 +// +// 会启动两个后台循环: +// 1) dispatch loop:扫描 due outbox 并投递到 Kafka; +// 2) consume loop:消费 Kafka 并按 biz_type 分发处理。 +func (e *Engine) Start(ctx context.Context) { + if e == nil { + return + } + + // 1. 启动日志:把关键运行参数打出来,便于排查“为什么没消费/没扫描”。 + log.Printf("outbox engine starting: topic=%s brokers=%v retry_scan=%s batch=%d", e.topic, e.brokers, e.scanEvery, e.scanBatch) + + // 2. 启动前探活 topic 是否可用。 + // 注意:即使探活失败也不会阻断引擎启动,后续循环会继续重试。 + if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.topic, 30*time.Second); err != nil { + log.Printf("Kafka topic not ready before consume loop start: %v", err) + } else { + log.Printf("Kafka topic is ready: %s", e.topic) + } + + // 3. 并行启动两条核心循环: + // - dispatch loop:负责 outbox -> Kafka; + // - consume loop:负责 Kafka -> handler -> outbox 状态推进。 + go e.startDispatchLoop(ctx) + go e.startConsumeLoop(ctx) +} + +// Close 关闭 producer/consumer 资源。 +func (e *Engine) Close() { + if e == nil { + return + } + // 逐个关闭并记录错误,避免某个 close 失败导致后续资源无法回收。 + if err := e.producer.Close(); err != nil { + log.Printf("关闭 Kafka producer 失败: %v", err) + } + if err := e.consumer.Close(); err != nil { + log.Printf("关闭 Kafka consumer 失败: %v", err) + } +} + +// Enqueue 把业务消息写入 outbox(请求路径调用)。 +// +// 注意: +// 1) 该方法不做 Kafka 网络写入,只有数据库写入; +// 2) messageKey 建议使用业务幂等键(如 conversation_id)以提升分区稳定性; +// 3) payload 需要可 JSON 序列化。 +func (e *Engine) Enqueue(ctx context.Context, bizType, messageKey string, payload any) error { + if e == nil { + return errors.New("outbox engine is nil") + } + // 这里故意只写数据库,不做 Kafka 网络 IO, + // 目的是把请求耗时稳定在“单次写库”的可控范围。 + _, err := e.repo.CreateMessage(ctx, bizType, e.topic, messageKey, payload, e.maxRetry) + return err +} + +func (e *Engine) startDispatchLoop(ctx context.Context) { + // 1. 定时扫描 due outbox 记录。 + // 扫描间隔由 scanEvery 控制,避免每次请求都主动触发投递造成抖动。 + ticker := time.NewTicker(e.scanEvery) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // 2. 收到退出信号后优雅停止循环。 + return + case <-ticker.C: + // 3. 拉取当前窗口内可投递消息。 + pendingMessages, err := e.repo.ListDueMessages(ctx, e.scanBatch) + if err != nil { + log.Printf("扫描 outbox 失败: %v", err) + continue + } + if len(pendingMessages) > 0 { + log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages)) + } + + // 4. 逐条投递,单条失败不影响同批后续消息。 + for _, msg := range pendingMessages { + if err = e.dispatchOne(ctx, msg.ID); err != nil { + log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) + } + } + } + } +} + +func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { + // 1. 投递前重新按 ID 读取最新状态,避免用到过期快照。 + outboxMsg, err := e.repo.GetByID(ctx, outboxID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 1.1 记录已不存在(可能被清理),按幂等成功处理。 + return nil + } + return err + } + // 1.2 最终态直接跳过,避免重复投递。 + if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { + return nil + } + + // 2. 组装 Kafka 包装体,统一带上 outbox_id 供消费端做状态回写。 + envelope := kafkabus.Envelope{ + OutboxID: outboxMsg.ID, + BizType: outboxMsg.BizType, + Payload: json.RawMessage(outboxMsg.Payload), + } + raw, err := json.Marshal(envelope) + if err != nil { + // 2.1 包装层序列化失败通常不可恢复,直接标 dead。 + markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) + if markErr != nil { + log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) + } + return err + } + + // 3. 先投 Kafka,再把 outbox 状态推进到 published。 + // 任一步骤失败都回写 retry,让扫描器后续重试。 + if err = e.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil { + _ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error()) + return err + } + if err = e.repo.MarkPublished(ctx, outboxMsg.ID); err != nil { + _ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "更新已投递状态失败: "+err.Error()) + return err + } + return nil +} + +func (e *Engine) startConsumeLoop(ctx context.Context) { + // 消费循环采用“拉取 -> 处理 -> 提交 offset”的标准模型。 + for { + select { + case <-ctx.Done(): + // 1. 收到退出信号后终止循环。 + return + default: + } + + // 2. 拉取下一条 Kafka 消息。 + msg, err := e.consumer.Dequeue(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + // 2.1 context 主动取消时,不记错误日志,直接退出。 + return + } + // 2.2 临时错误短暂退避后继续,避免空转刷日志。 + log.Printf("Kafka 消费拉取失败(topic=%s): %v", e.topic, err) + time.Sleep(300 * time.Millisecond) + continue + } + + // 3. 单条消息处理失败仅记录日志,不阻断消费循环。 + if err = e.handleMessage(ctx, msg); err != nil { + log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err) + } + } +} + +func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) error { + // 1. 先解析最外层 envelope,拿到 outbox_id + biz_type + payload。 + var envelope kafkabus.Envelope + if err := json.Unmarshal(msg.Value, &envelope); err != nil { + // 1.1 包装层损坏时无法恢复,直接提交 offset 防止无限重放。 + _ = e.consumer.Commit(ctx, msg) + return fmt.Errorf("解析 Kafka 包装失败: %w", err) + } + if envelope.OutboxID <= 0 { + // 1.2 缺少 outbox_id 无法回写状态,同样提交 offset 跳过。 + _ = e.consumer.Commit(ctx, msg) + return errors.New("Kafka 包装缺少 outbox_id") + } + + // 2. 根据 biz_type 查找业务处理器。 + handler, ok := e.getHandler(envelope.BizType) + if !ok { + // 2.1 未注册处理器是配置错误,标记 dead 并提交 offset,避免重复消费。 + _ = e.repo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) + if err := e.consumer.Commit(ctx, msg); err != nil { + return err + } + return nil + } + + // 3. 调用业务处理器。 + if err := handler(ctx, envelope); err != nil { + // 统一按“可重试错误”处理,回写 retry 状态后提交 offset,避免同一条消息在 Kafka 侧死循环。 + if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil { + return markErr + } + if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil { + return commitErr + } + return err + } + + // 4. 业务处理成功后提交 offset。 + return e.consumer.Commit(ctx, msg) +} diff --git a/backend/infra/outbox/repository.go b/backend/infra/outbox/repository.go new file mode 100644 index 0000000..0a5849b --- /dev/null +++ b/backend/infra/outbox/repository.go @@ -0,0 +1,259 @@ +package outbox + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type Repository struct { + db *gorm.DB +} + +// NewRepository 构造 outbox 仓储。 +// 该仓储只关心“数据库状态机”,不关心 Kafka 投递/消费。 +func NewRepository(db *gorm.DB) *Repository { + return &Repository{db: db} +} + +// CreateMessage 是通用 outbox 入队入口。 +// +// 设计说明: +// 1) 该方法只做“把消息安全写入本地 outbox 表”,不做任何 Kafka 网络调用; +// 2) next_retry_at 初始化为当前时间,表示“可立即被扫描器捞取”; +// 3) biz_type 由业务方传入,用于消费侧分发到不同处理器; +// 4) payload 会被序列化为 JSON 字符串存入 payload 字段,后续再按 biz_type 反序列化。 +// +// 这也是 Outbox 模式的核心:请求路径只承担本地写库成本,把外部系统不确定性(Kafka 延迟/抖动) +// 转移给后台异步循环处理。 +func (d *Repository) CreateMessage(ctx context.Context, bizType, topic, messageKey string, payload any, maxRetry int) (int64, error) { + // 1. 防御式兜底:若调用方未传 maxRetry,则统一使用默认值 20。 + // 这样可以避免某些链路遗漏配置导致消息无限重试或零重试。 + if maxRetry <= 0 { + maxRetry = 20 + } + + // 2. 先把业务载荷序列化成 JSON 字符串。 + // 序列化失败属于“请求入队前失败”,此时不应创建 outbox 记录,直接返回错误即可。 + raw, err := json.Marshal(payload) + if err != nil { + return 0, err + } + + // 3. 组装 outbox 初始记录: + // - status=pending:表示待投递; + // - retry_count=0:尚未重试; + // - next_retry_at=now:扫描器可立即捞取并尝试首次投递。 + now := time.Now() + msg := model.AgentOutboxMessage{ + BizType: bizType, + Topic: topic, + MessageKey: messageKey, + Payload: string(raw), + Status: model.OutboxStatusPending, + RetryCount: 0, + MaxRetry: maxRetry, + NextRetryAt: &now, + } + + // 4. 落库成功后返回 outbox 主键,供上层日志/追踪链路使用。 + if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil { + return 0, err + } + return msg.ID, nil +} + +// CreateChatHistoryMessage 是聊天记录持久化的兼容入口。 +// 说明:为了避免现有业务调用一次性改太多,先保留该方法作为 CreateMessage 的薄封装。 +func (d *Repository) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { + return d.CreateMessage(ctx, model.OutboxBizTypeChatHistoryPersist, topic, messageKey, payload, maxRetry) +} + +// GetByID 按主键读取 outbox 记录。 +// 该方法通常用于 dispatch 前“再读一次最新状态”,避免使用过期快照。 +func (d *Repository) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) { + var msg model.AgentOutboxMessage + if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil { + return nil, err + } + return &msg, nil +} + +// ListDueMessages 拉取“到期可投递”的 pending 消息。 +// 条件说明: +// 1) status = pending:只处理待投递状态; +// 2) next_retry_at <= now:到达可重试/可首次投递时间; +// 3) 按 next_retry_at + id 升序:保证老消息优先,降低饥饿概率。 +func (d *Repository) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { + // 1. 限流兜底,避免误传 0 导致一次拉取过多消息。 + if limit <= 0 { + limit = 100 + } + now := time.Now() + var messages []model.AgentOutboxMessage + err := d.db.WithContext(ctx). + Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now). + Order("next_retry_at ASC, id ASC"). + Limit(limit). + Find(&messages).Error + if err != nil { + return nil, err + } + return messages, nil +} + +// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 +func (d *Repository) MarkPublished(ctx context.Context, id int64) error { + // 1. published 代表“已成功写入 Kafka”。 + // 2. 清理 last_error/next_retry_at,表示当前无需重试。 + now := time.Now() + updates := map[string]interface{}{ + "status": model.OutboxStatusPublished, + "published_at": &now, + "last_error": nil, + "next_retry_at": nil, + } + // 3. 额外加状态保护,避免并发下把 consumed/dead 错误覆盖回 published。 + result := d.db.WithContext(ctx). + Model(&model.AgentOutboxMessage{}). + Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead). + Updates(updates) + return result.Error +} + +// MarkDead 把消息标记为死信(最终失败,不再重试)。 +// 常见场景:载荷不可反序列化、biz_type 未注册等“不可恢复错误”。 +func (d *Repository) MarkDead(ctx context.Context, id int64, reason string) error { + // 1. 错误文本统一裁剪,避免超长错误撑爆字段或日志。 + now := time.Now() + lastErr := truncateError(reason) + updates := map[string]interface{}{ + "status": model.OutboxStatusDead, + "last_error": &lastErr, + "next_retry_at": nil, + "updated_at": now, + } + return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error +} + +// MarkFailedForRetry 把一次失败写回 outbox 状态机,并计算下一次重试窗口。 +// 该方法必须在事务内完成“读当前状态 + 写新状态”,保证并发时计数和状态一致。 +func (d *Repository) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { + return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + // 1. 行级锁读取,避免多个 goroutine 同时更新同一条消息导致 retry_count 乱序。 + var msg model.AgentOutboxMessage + err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error + if err != nil { + return err + } + + // 2. 若已是最终态(consumed/dead),直接幂等返回。 + // 这样即使出现重复调用,也不会把最终态改坏。 + if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { + return nil + } + + // 3. 递增重试计数并判断是否达到最大重试次数。 + nextRetryCount := msg.RetryCount + 1 + now := time.Now() + status := model.OutboxStatusPending + var nextRetryAt *time.Time + if nextRetryCount >= msg.MaxRetry { + // 3.1 达到上限:转 dead,停止后续扫描重试。 + status = model.OutboxStatusDead + nextRetryAt = nil + } else { + // 3.2 未到上限:按指数退避计算下一次可重试时间。 + t := now.Add(calcRetryBackoff(nextRetryCount)) + nextRetryAt = &t + } + + // 4. 写回失败原因与状态快照,便于排查问题。 + lastErr := truncateError(reason) + updates := map[string]interface{}{ + "status": status, + "retry_count": nextRetryCount, + "last_error": &lastErr, + "next_retry_at": nextRetryAt, + "updated_at": now, + } + return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error + }) +} + +// PersistChatHistoryAndMarkConsumed 负责“消费成功后落业务库 + 标记 outbox consumed”。 +// 之所以必须放在同一个事务里,是为了保证“业务落库”和“状态推进”原子一致: +// - 若业务写入失败,不应把 outbox 标记为 consumed; +// - 若标记 consumed 失败,也应回滚业务写入,避免出现不可追踪的不一致。 +func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { + return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + // 1. 先锁定 outbox 记录,确保同一条消息不会被并发消费者重复推进状态。 + var outboxMsg model.AgentOutboxMessage + err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 1.1 幂等兜底:记录不存在时视为“无事可做”。 + return nil + } + return err + } + // 1.2 若已 consumed/dead,说明已被处理过或已终止,直接幂等返回。 + if outboxMsg.Status == model.OutboxStatusConsumed { + return nil + } + if outboxMsg.Status == model.OutboxStatusDead { + return nil + } + + // 2. 写入聊天历史业务表(chat_histories)。 + // 这里不包含 token 统计等扩展字段,只负责核心消息落库。 + chatMsg := payload.Message + chatRole := payload.Role + history := model.ChatHistory{ + UserID: payload.UserID, + ChatID: payload.ConversationID, + MessageContent: &chatMsg, + Role: &chatRole, + } + if err = tx.Create(&history).Error; err != nil { + return err + } + + // 3. 业务写入成功后,把 outbox 推进到 consumed 最终态。 + // 并清理错误与重试字段,表示该消息生命周期结束。 + now := time.Now() + updates := map[string]interface{}{ + "status": model.OutboxStatusConsumed, + "consumed_at": &now, + "last_error": nil, + "next_retry_at": nil, + "updated_at": now, + } + return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error + }) +} + +// calcRetryBackoff 计算指数退避时间。 +// 规则:1s, 2s, 4s, 8s, 16s, 32s(最多封顶到第 6 档)。 +func calcRetryBackoff(retryCount int) time.Duration { + if retryCount <= 0 { + return time.Second + } + if retryCount > 6 { + retryCount = 6 + } + return time.Second * time.Duration(1<<(retryCount-1)) +} + +// truncateError 限制错误文本最大长度,防止写库失败或日志污染。 +func truncateError(reason string) string { + if len(reason) <= 2000 { + return reason + } + return reason[:2000] +} diff --git a/backend/service/agent_async_pipeline.go b/backend/service/agent_async_pipeline.go deleted file mode 100644 index 6e2b687..0000000 --- a/backend/service/agent_async_pipeline.go +++ /dev/null @@ -1,229 +0,0 @@ -package service - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "log" - "time" - - "github.com/LoveLosita/smartflow/backend/dao" - kafkabus "github.com/LoveLosita/smartflow/backend/kafka" - "github.com/LoveLosita/smartflow/backend/model" - segmentkafka "github.com/segmentio/kafka-go" - "gorm.io/gorm" -) - -// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。 -type AgentAsyncPipeline struct { - outboxRepo *dao.OutboxDAO - producer *kafkabus.Producer - consumer *kafkabus.Consumer - brokers []string - topic string - maxRetry int - scanEvery time.Duration - scanBatch int -} - -func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*AgentAsyncPipeline, error) { - if !cfg.Enabled { - return nil, nil - } - producer, err := kafkabus.NewProducer(cfg) - if err != nil { - return nil, err - } - consumer, err := kafkabus.NewConsumer(cfg) - if err != nil { - _ = producer.Close() - return nil, err - } - return &AgentAsyncPipeline{ - outboxRepo: outboxRepo, - producer: producer, - consumer: consumer, - brokers: cfg.Brokers, - topic: cfg.Topic, - maxRetry: cfg.MaxRetry, - scanEvery: cfg.RetryScanInterval, - scanBatch: cfg.RetryBatchSize, - }, nil -} - -func (p *AgentAsyncPipeline) Start(ctx context.Context) { - if p == nil { - return - } - - log.Printf("Kafka async pipeline starting: topic=%s brokers=%v retry_scan=%s batch=%d", p.topic, p.brokers, p.scanEvery, p.scanBatch) - if err := kafkabus.WaitTopicReady(ctx, p.brokers, p.topic, 30*time.Second); err != nil { - log.Printf("Kafka topic not ready before consume loop start: %v", err) - } else { - log.Printf("Kafka topic is ready: %s", p.topic) - } - - go p.startDispatchLoop(ctx) - go p.startConsumeLoop(ctx) -} - -func (p *AgentAsyncPipeline) Close() { - if p == nil { - return - } - if err := p.producer.Close(); err != nil { - log.Printf("关闭 Kafka producer 失败: %v", err) - } - if err := p.consumer.Close(); err != nil { - log.Printf("关闭 Kafka consumer 失败: %v", err) - } -} - -// EnqueueChatHistoryPersist 仅把消息写入 outbox。 -// -// 关键设计: -// 1) 不再在请求路径里做“首次同步投递 Kafka”; -// 2) 投递统一由 startDispatchLoop 异步扫描执行; -// 3) CreateChatHistoryMessage 会设置 next_retry_at=now,扫描器下一轮即可捞取。 -// -// 这样可以把请求链路成本收敛到“写 outbox”,避免 Kafka 写入延迟污染首字和主链路时延。 -func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { - if p == nil { - return errors.New("Kafka 异步链路未初始化") - } - _, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry) - return err -} - -func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { - ticker := time.NewTicker(p.scanEvery) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - pendingMessages, err := p.outboxRepo.ListDueMessages(ctx, p.scanBatch) - if err != nil { - log.Printf("扫描 outbox 失败: %v", err) - continue - } - if len(pendingMessages) > 0 { - log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages)) - } - for _, msg := range pendingMessages { - if err = p.dispatchOne(ctx, msg.ID); err != nil { - log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) - } - } - } - } -} - -func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error { - outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil - } - return err - } - if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { - return nil - } - - envelope := kafkabus.Envelope{ - OutboxID: outboxMsg.ID, - BizType: outboxMsg.BizType, - Payload: json.RawMessage(outboxMsg.Payload), - } - raw, err := json.Marshal(envelope) - if err != nil { - markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) - if markErr != nil { - log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) - } - return err - } - - if err = p.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil { - _ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error()) - return err - } - if err = p.outboxRepo.MarkPublished(ctx, outboxMsg.ID); err != nil { - _ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "更新已投递状态失败: "+err.Error()) - return err - } - return nil -} - -func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - } - - msg, err := p.consumer.Dequeue(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - log.Printf("Kafka 消费拉取失败(topic=%s): %v", p.topic, err) - time.Sleep(300 * time.Millisecond) - continue - } - if err = p.handleMessage(ctx, msg); err != nil { - log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err) - } - } -} - -func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error { - var envelope kafkabus.Envelope - if err := json.Unmarshal(msg.Value, &envelope); err != nil { - _ = p.consumer.Commit(ctx, msg) - return fmt.Errorf("解析 Kafka 包装失败: %w", err) - } - if envelope.OutboxID <= 0 { - _ = p.consumer.Commit(ctx, msg) - return errors.New("Kafka 包装缺少 outbox_id") - } - - switch envelope.BizType { - case model.OutboxBizTypeChatHistoryPersist: - return p.consumeChatHistory(ctx, msg, envelope) - default: - _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) - if err := p.consumer.Commit(ctx, msg); err != nil { - return err - } - return nil - } -} - -func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error { - var payload model.ChatHistoryPersistPayload - if err := json.Unmarshal(envelope.Payload, &payload); err != nil { - _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+err.Error()) - if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil { - return commitErr - } - return nil - } - - if err := p.outboxRepo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload); err != nil { - if markErr := p.outboxRepo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费并落库失败: "+err.Error()); markErr != nil { - return markErr - } - if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil { - return commitErr - } - return err - } - - return p.consumer.Commit(ctx, msg) -} diff --git a/backend/service/agent_bridge.go b/backend/service/agent_bridge.go new file mode 100644 index 0000000..e0240f9 --- /dev/null +++ b/backend/service/agent_bridge.go @@ -0,0 +1,22 @@ +package service + +import ( + "github.com/LoveLosita/smartflow/backend/dao" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + "github.com/LoveLosita/smartflow/backend/inits" + "github.com/LoveLosita/smartflow/backend/service/agentsvc" +) + +// AgentService 是 service 层对 agentsvc.AgentService 的兼容别名。 +// 迁移目的: +// 1) 把 Agent 业务实现收拢到 service/agentsvc,提升目录整洁度; +// 2) 不破坏既有调用方(api/cmd 仍然可以引用 service.AgentService)。 +type AgentService = agentsvc.AgentService + +// NewAgentService 是迁移期兼容构造函数。 +// 说明: +// 1) 外部调用签名保持不变; +// 2) 真实构造逻辑已下沉到 service/agentsvc 包。 +func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, asyncPipeline *outboxinfra.ChatHistoryAsync) *AgentService { + return agentsvc.NewAgentService(aiHub, repo, taskRepo, agentRedis, asyncPipeline) +} diff --git a/backend/service/agent.go b/backend/service/agentsvc/agent.go similarity index 70% rename from backend/service/agent.go rename to backend/service/agentsvc/agent.go index 4b8afef..8db077d 100644 --- a/backend/service/agent.go +++ b/backend/service/agentsvc/agent.go @@ -1,4 +1,4 @@ -package service +package agentsvc import ( "context" @@ -9,6 +9,7 @@ import ( "github.com/LoveLosita/smartflow/backend/agent/chat" "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/inits" "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/pkg" @@ -22,10 +23,13 @@ type AgentService struct { repo *dao.AgentDAO taskRepo *dao.TaskDAO agentCache *dao.AgentCache - asyncPipeline *AgentAsyncPipeline + asyncPipeline *outboxinfra.ChatHistoryAsync } -func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, asyncPipeline *AgentAsyncPipeline) *AgentService { +// NewAgentService 构造 AgentService。 +// 这里通过依赖注入把“模型、仓储、缓存、异步持久化通道”统一交给服务层管理, +// 便于后续在单测中替换实现,或在启动流程中按环境切换配置。 +func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskDAO, agentRedis *dao.AgentCache, asyncPipeline *outboxinfra.ChatHistoryAsync) *AgentService { return &AgentService{ AIHub: aiHub, repo: repo, @@ -35,6 +39,10 @@ func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, taskRepo *dao.TaskD } } +// normalizeConversationID 规范会话 ID。 +// 规则: +// 1) 去除首尾空白; +// 2) 若为空则生成 UUID,保证后续缓存/数据库操作始终有合法 chat_id。 func normalizeConversationID(chatID string) string { trimmed := strings.TrimSpace(chatID) if trimmed == "" { @@ -43,6 +51,10 @@ func normalizeConversationID(chatID string) string { return trimmed } +// pickChatModel 根据请求选择模型。 +// 当前约定: +// - strategist:策略模型; +// - 其余值默认 worker(包含空字符串场景)。 func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, string) { modelName := strings.TrimSpace(requestModel) if strings.EqualFold(modelName, "strategist") { @@ -55,12 +67,19 @@ func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, strin // 1) 开启异步链路时,走 outbox + Kafka; // 2) 未开启时,直接同步写库。 func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + // 1. 未注入异步通道时(例如本地极简环境),直接同步写 DB。 + // 这样可以保证功能不依赖 Kafka 也能跑通。 if s.asyncPipeline == nil { return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message) } + // 2. 已启用异步通道时,只入 outbox,不在请求路径阻塞 Kafka。 return s.asyncPipeline.EnqueueChatHistoryPersist(ctx, payload) } +// pushErrNonBlocking 向错误通道“尽力投递”错误。 +// 目的: +// 1) 避免 goroutine 在 errChan 满时被阻塞导致泄漏; +// 2) 保证主业务协程不因“错误上报拥塞”卡死。 func pushErrNonBlocking(errChan chan error, err error) { select { case errChan <- err: @@ -86,6 +105,7 @@ func (s *AgentService) runNormalChatFlow( outChan chan<- string, errChan chan error, ) { + // 1. 先尝试从 Redis 读历史,命中可直接进入模型推理,减少 DB 压力。 chatHistory, err := s.agentCache.GetHistory(ctx, chatID) if err != nil { pushErrNonBlocking(errChan, err) @@ -94,6 +114,7 @@ func (s *AgentService) runNormalChatFlow( cacheMiss := false if chatHistory == nil { + // 2. 缓存未命中时回源 DB,并转换为 Eino message 格式。 cacheMiss = true histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel(resolvedModelName), chatID) if hisErr != nil { @@ -103,10 +124,13 @@ func (s *AgentService) runNormalChatFlow( chatHistory = conv.ToEinoMessages(histories) } + // 3. 计算本次请求可用的历史 token 预算,并执行历史裁剪。 + // 这样可以在上下文增长时稳定控制模型窗口,避免超长上下文引发报错或高延迟。 historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, chat.SystemPrompt, userMessage) trimmedHistory, totalHistoryTokens, keptHistoryTokens, droppedCount := pkg.TrimHistoryByTokenBudget(chatHistory, historyBudget) chatHistory = trimmedHistory + // 4. 根据裁剪后历史长度更新 Redis 会话窗口配置,并主动执行窗口收敛。 targetWindow := pkg.CalcSessionWindowSize(len(chatHistory)) if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil { log.Printf("设置历史窗口失败 chat=%s: %v", chatID, err) @@ -121,18 +145,24 @@ func (s *AgentService) runNormalChatFlow( } if cacheMiss { + // 5. 回源后把历史回填到 Redis,减少下一次请求的冷启动成本。 if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { pushErrNonBlocking(errChan, err) return } } + // 6. 执行真正的流式聊天。 + // fullText 用于后续写 Redis/持久化,outChan 用于把流片段实时推给前端。 fullText, streamErr := chat.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan, traceID, chatID, requestStart) if streamErr != nil { pushErrNonBlocking(errChan, streamErr) return } + // 7. 后置持久化(用户消息): + // 7.1 先写 Redis,保证“最新会话上下文”可立即用于下一轮推理; + // 7.2 再走可靠持久化入口(outbox 或同步 DB)。 if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { log.Printf("写入用户消息到 Redis 失败: %v", err) } @@ -149,6 +179,9 @@ func (s *AgentService) runNormalChatFlow( // 普通聊天链路也需要把助手回复写入 Redis, // 否则会出现“数据库有助手消息,但 Redis 最新会话只有用户消息”的口径不一致。 + // 8. 后置持久化(助手消息): + // 8.1 先写 Redis,保证下一轮上下文可见; + // 8.2 再异步可靠落库,失败通过 errChan 回传给上层。 if err = s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); err != nil { log.Printf("写入助手消息到 Redis 失败: %v", err) } @@ -167,6 +200,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin requestStart := time.Now() traceID := uuid.NewString() + // 1. 每个请求都返回两个通道: + // - outChan:推送流式输出片段; + // - errChan:推送异步阶段错误(非阻塞上报)。 outChan := make(chan string, 8) errChan := make(chan error, 1) @@ -175,6 +211,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin selectedModel, resolvedModelName := s.pickChatModel(modelName) // 2) 确保会话存在(优先缓存,必要时回源 DB 并创建)。 + // 2.1 先查 Redis 会话标记,命中则可跳过 DB 存在性校验。 result, err := s.agentCache.GetConversationStatus(ctx, chatID) if err != nil { errChan <- err @@ -183,6 +220,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } if !result { + // 2.2 缓存未命中时回源 DB:确认会话是否存在。 innerResult, ifErr := s.repo.IfChatExists(ctx, userID, chatID) if ifErr != nil { errChan <- ifErr @@ -191,6 +229,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } if !innerResult { + // 2.3 DB 里也不存在则创建新会话。 if _, err = s.repo.CreateNewChat(userID, chatID); err != nil { errChan <- err close(outChan) @@ -198,6 +237,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } } + // 2.4 补写 Redis 会话标记,优化下次访问。 if err = s.agentCache.SetConversationStatus(ctx, chatID); err != nil { log.Printf("设置会话状态缓存失败 chat=%s: %v", chatID, err) } @@ -210,15 +250,19 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin go func() { defer close(outChan) + // 3.1 先走轻量路由,判断是否进入“随口记”图。 routing := s.decideQuickNoteRouting(ctx, selectedModel, userMessage) if !routing.EnterQuickNote { + // 3.2 非随口记:直接走普通聊天主链路。 s.runNormalChatFlow(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan) return } + // 3.3 随口记:先发阶段状态,减少用户等待时的“无反馈感”。 progress := newQuickNoteProgressEmitter(outChan, resolvedModelName, true) progress.Emit("request.accepted", routing.Detail) + // 3.4 执行随口记 graph。 quickHandled, quickState, quickErr := s.tryHandleQuickNoteWithGraph( ctx, selectedModel, @@ -230,10 +274,12 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin progress.Emit, ) if quickErr != nil { + // graph 出错不直接中断用户请求,而是回退普通聊天,保证可用性优先。 log.Printf("随口记 graph 执行失败,回退普通聊天 trace_id=%s chat_id=%s err=%v", traceID, chatID, quickErr) } if quickHandled { + // 3.5 随口记处理成功:组织最终回复并按 OpenAI 兼容格式输出。 progress.Emit("quick_note.reply.polishing", "正在结合你的话题润色回复。") quickReply := buildQuickNoteFinalReply(ctx, selectedModel, userMessage, quickState) if emitErr := emitSingleAssistantCompletion(outChan, resolvedModelName, quickReply); emitErr != nil { @@ -241,10 +287,12 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return } + // 3.6 对随口记回复执行统一后置持久化(Redis + outbox/DB)。 s.persistChatAfterReply(ctx, userID, chatID, userMessage, quickReply, errChan) return } + // 3.7 路由误判或 graph 判定非随口记时,回落普通聊天,保证“能聊”。 progress.Emit("quick_note.fallback", "当前输入不是随口记请求,切换到普通对话。") s.runNormalChatFlow(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, userID, chatID, traceID, requestStart, outChan, errChan) }() diff --git a/backend/service/agent_quick_note.go b/backend/service/agentsvc/agent_quick_note.go similarity index 75% rename from backend/service/agent_quick_note.go rename to backend/service/agentsvc/agent_quick_note.go index fdc52ed..2d257c7 100644 --- a/backend/service/agent_quick_note.go +++ b/backend/service/agentsvc/agent_quick_note.go @@ -1,4 +1,4 @@ -package service +package agentsvc import ( "context" @@ -35,11 +35,15 @@ type quickNoteProgressEmitter struct { enablePush bool } +// newQuickNoteProgressEmitter 构造“阶段进度推送器”。 +// 该推送器只负责发 reasoning 块,不负责正文回复。 func newQuickNoteProgressEmitter(outChan chan<- string, modelName string, enable bool) *quickNoteProgressEmitter { + // 1. 模型名兜底,避免出现空 model 字段导致客户端兼容性问题。 resolvedModel := strings.TrimSpace(modelName) if resolvedModel == "" { resolvedModel = "worker" } + // 2. 每次请求生成独立 request_id,方便前端或日志侧关联本次流式输出。 return &quickNoteProgressEmitter{ outChan: outChan, modelName: resolvedModel, @@ -54,9 +58,11 @@ func newQuickNoteProgressEmitter(outChan chan<- string, modelName string, enable // 1) 这里不输出 role,避免和后续正文 role 块冲突; // 2) 即使发送失败,也只记录日志,不影响主流程继续执行。 func (e *quickNoteProgressEmitter) Emit(stage, detail string) { + // 1. 推送器不可用(nil/禁用/无通道)时直接返回,避免 panic。 if e == nil || !e.enablePush || e.outChan == nil { return } + // 2. 统一清理空白,避免日志和输出里出现异常空字符串。 stage = strings.TrimSpace(stage) detail = strings.TrimSpace(detail) if stage == "" && detail == "" { @@ -68,8 +74,10 @@ func (e *quickNoteProgressEmitter) Emit(stage, detail string) { reasoning += "\n" + detail } + // 3. 复用 OpenAI 兼容封装:把阶段文本伪装成 reasoning_content。 chunk, err := chat.ToOpenAIStream(&schema.Message{ReasoningContent: reasoning}, e.requestID, e.modelName, e.created, false) if err != nil { + // 3.1 阶段推送失败不应影响主链路,只打日志即可。 log.Printf("输出随口记阶段状态失败 stage=%s err=%v", stage, err) return } @@ -93,19 +101,28 @@ func (s *AgentService) tryHandleQuickNoteWithGraph( trustRoute bool, emitStage func(stage, detail string), ) (handled bool, state *quicknote.QuickNoteState, err error) { + // 1. 依赖预检:taskRepo 或模型未注入时,不做随口记处理,交给上层回落聊天。 if s.taskRepo == nil || selectedModel == nil { return false, nil, nil } + // 2. 初始化随口记状态对象(贯穿 graph 全流程的共享上下文)。 state = quicknote.NewQuickNoteState(traceID, userID, chatID, userMessage) + + // 3. 执行 quick note graph。 + // 本次依赖注入了两个“工具能力”: + // 3.1 ResolveUserID:从当前请求上下文确定 user_id; + // 3.2 CreateTask:真正执行任务写库。 finalState, runErr := quicknote.RunQuickNoteGraph(ctx, quicknote.QuickNoteGraphRunInput{ Model: selectedModel, State: state, Deps: quicknote.QuickNoteToolDeps{ ResolveUserID: func(ctx context.Context) (int, error) { + // 当前链路 userID 已由上层鉴权拿到,这里直接复用。 return userID, nil }, CreateTask: func(ctx context.Context, req quicknote.QuickNoteCreateTaskRequest) (*quicknote.QuickNoteCreateTaskResult, error) { + // 3.2.1 把 quick note 的工具入参映射成项目 Task 模型。 taskModel := &model.Task{ UserID: req.UserID, Title: req.Title, @@ -113,10 +130,14 @@ func (s *AgentService) tryHandleQuickNoteWithGraph( IsCompleted: false, DeadlineAt: req.DeadlineAt, } + + // 3.2.2 调用 DAO 写库。 created, createErr := s.taskRepo.AddTask(taskModel) if createErr != nil { return nil, createErr } + + // 3.2.3 把写库结果回填给 graph 状态,用于后续回复拼装。 return &quicknote.QuickNoteCreateTaskResult{ TaskID: created.ID, Title: created.Title, @@ -129,11 +150,15 @@ func (s *AgentService) tryHandleQuickNoteWithGraph( EmitStage: emitStage, }) if runErr != nil { + // 4. graph 执行失败由上层统一决定是否回退普通聊天。 return false, nil, runErr } + + // 5. graph 正常结束但判定“非随口记”时,明确返回 handled=false。 if finalState == nil || !finalState.IsQuickNoteIntent { return false, nil, nil } + // 6. 走到这里表示随口记链路已完成(含写库成功或业务失败反馈文案)。 return true, finalState, nil } @@ -142,12 +167,14 @@ func (s *AgentService) tryHandleQuickNoteWithGraph( // 1) 保持现有 OpenAI 兼容格式不变; // 2) 正文只发一次,不做伪分段。 func emitSingleAssistantCompletion(outChan chan<- string, modelName, reply string) error { + // 1. 模型名兜底,保持 OpenAI 兼容响应字段完整。 if strings.TrimSpace(modelName) == "" { modelName = "worker" } requestID := "chatcmpl-" + uuid.NewString() created := time.Now().Unix() + // 2. 正文 chunk(完整一次性输出,不做人为拆片)。 chunk, err := chat.ToOpenAIStream(&schema.Message{Role: schema.Assistant, Content: reply}, requestID, modelName, created, true) if err != nil { return err @@ -156,6 +183,7 @@ func emitSingleAssistantCompletion(outChan chan<- string, modelName, reply strin outChan <- chunk } + // 3. 按 OpenAI 风格补 finish chunk + [DONE],确保客户端可正确收尾。 finishChunk, err := chat.ToOpenAIFinishStream(requestID, modelName, created) if err != nil { return err @@ -171,12 +199,14 @@ func emitSingleAssistantCompletion(outChan chan<- string, modelName, reply strin // 2) 轻松跟进句交给 AI 生成,贴合用户话题; // 3) AI 生成失败时自动降级为固定友好文案,保证稳定可用。 func buildQuickNoteFinalReply(ctx context.Context, selectedModel *ark.ChatModel, userMessage string, state *quicknote.QuickNoteState) string { + // 1. 极端兜底:状态为空时给出稳定失败文案,避免返回空字符串。 if state == nil { return "我这次没成功记上,别急,再发我一次我马上补上。" } // 仅当“确实拿到了有效 task_id”时才走成功文案,避免出现“回复成功但库里没数据”的错觉。 if state.Persisted && state.PersistedTaskID > 0 { + // 2. 组装“事实段”:标题 + 优先级 + 截止时间。 title := strings.TrimSpace(state.ExtractedTitle) if title == "" { title = "这条任务" @@ -193,13 +223,17 @@ func buildQuickNoteFinalReply(ctx context.Context, selectedModel *ark.ChatModel, } factLine := fmt.Sprintf("好,给你安排上了:%s(%s%s)。", title, priorityText, deadlineText) + + // 2.1 如果 graph 单次请求已生成 banter,直接使用,避免重复调用模型。 if strings.TrimSpace(state.ExtractedBanter) != "" { return factLine + " " + strings.TrimSpace(state.ExtractedBanter) } + // 2.2 聚合调用模式下,通常已在主流程完成风格化,给稳定文案即可。 if state.PlannedBySingleCall { return factLine + " 已帮你稳稳记下,放心推进。" } + // 2.3 兜底生成轻松跟进句;失败则降级固定文案,确保体验连续。 banter, err := generateQuickNoteBanter(ctx, selectedModel, userMessage, title, priorityText, deadlineText) if err != nil { return factLine + " 这下可以先安心推进,不用等 ddl 来敲门了。" @@ -210,13 +244,16 @@ func buildQuickNoteFinalReply(ctx context.Context, selectedModel *ark.ChatModel, return factLine + " " + banter } + // 3. 若时间校验失败,优先返回“可执行的修正引导”。 if strings.TrimSpace(state.DeadlineValidationError) != "" { return "我识别到你给了时间,但格式不够明确,暂时不敢乱记。你可以改成比如:2026-03-20 18:30、明天下午3点、下周一上午9点,我立刻帮你安排。" } + // 4. 若 graph 已给出助手回复(例如非意图/业务失败原因),优先透传。 if strings.TrimSpace(state.AssistantReply) != "" { return strings.TrimSpace(state.AssistantReply) } + // 5. 最终兜底文案。 return "这次没成功写入任务,我没跑路,再给我一次我就把它稳稳记上。" } @@ -233,10 +270,12 @@ func generateQuickNoteBanter( priorityText string, deadlineText string, ) (string, error) { + // 1. 模型防御校验。 if selectedModel == nil { return "", fmt.Errorf("model is nil") } + // 2. 把事实信息显式塞入 prompt,约束模型只能“润色语气”。 prompt := fmt.Sprintf(`用户原话:%s 已确认事实: - 任务标题:%s @@ -250,11 +289,15 @@ func generateQuickNoteBanter( strings.TrimSpace(deadlineText), ) + // 3. 构造消息: + // - system:定义输出边界(一句话、不改事实); + // - user:提供本次上下文素材。 messages := []*schema.Message{ schema.SystemMessage(quicknote.QuickNoteReplyBanterPrompt), schema.UserMessage(prompt), } + // 4. 调用模型生成 banter,并显式关闭 thinking,减少额外延迟。 resp, err := selectedModel.Generate(ctx, messages, ark.WithThinking(&arkModel.Thinking{Type: arkModel.ThinkingTypeDisabled}), einoModel.WithTemperature(0.7), @@ -267,6 +310,10 @@ func generateQuickNoteBanter( return "", fmt.Errorf("empty response") } + // 5. 输出清洗: + // 5.1 去首尾空白与引号; + // 5.2 若模型多行输出,只取第一行; + // 5.3 最终为空则视为失败,让上层走降级文案。 text := strings.TrimSpace(resp.Content) text = strings.Trim(text, "\"'“”‘’") if text == "" { @@ -281,6 +328,8 @@ func generateQuickNoteBanter( // decideQuickNoteRouting 决定当前输入是否进入“随口记 graph”。 // 该函数只是服务层薄封装,具体控制码解析逻辑已下沉到 agent/route 包。 func (s *AgentService) decideQuickNoteRouting(ctx context.Context, selectedModel *ark.ChatModel, userMessage string) quickNoteRoutingDecision { + // 这里保留方法是为了让 AgentService 对外语义完整, + // 同时避免上层调用方直接依赖 route 包,降低耦合。 _ = s return route.DecideQuickNoteRouting(ctx, selectedModel, userMessage) } @@ -296,10 +345,12 @@ func (s *AgentService) persistChatAfterReply( assistantReply string, errChan chan error, ) { + // 1. 先把用户消息写入 Redis,保证会话上下文“马上可见”。 if err := s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { log.Printf("写入用户消息到 Redis 失败: %v", err) } + // 2. 再把用户消息写入可靠持久化通道(outbox 或同步 DB)。 if err := s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, @@ -310,10 +361,12 @@ func (s *AgentService) persistChatAfterReply( return } + // 3. 助手消息同样遵循“Redis 先行 + 可靠持久化补齐”策略。 if err := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: assistantReply}); err != nil { log.Printf("写入助手消息到 Redis 失败: %v", err) } + // 4. 助手消息持久化失败不阻断主流程,通过 errChan 异步上报。 if err := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{ UserID: userID, ConversationID: chatID, diff --git a/backend/service/agent_quick_note_route_test.go b/backend/service/agentsvc/agent_quick_note_route_test.go similarity index 99% rename from backend/service/agent_quick_note_route_test.go rename to backend/service/agentsvc/agent_quick_note_route_test.go index 6f9ee0e..9691f81 100644 --- a/backend/service/agent_quick_note_route_test.go +++ b/backend/service/agentsvc/agent_quick_note_route_test.go @@ -1,4 +1,4 @@ -package service +package agentsvc import ( "strings"