Version: 0.9.20.dev.260415

后端:
1. 修复 query_available_slots section_from/section_to 错误覆盖 duration 并使用精确匹配而非范围包含
- 更新backend/newAgent/tools/schedule/read_filter_tools.go:移除 span = exactTo - exactFrom + 1 对 duration 的覆盖;matchSectionRange
  从精确匹配改为范围包含语义(slotStart < exactFrom || slotEnd > exactTo)
2. Execute 上下文窗口从硬编码裁剪改造为 80k token 动态预算 + LLM滚动压缩
- 基础设施层:AgentChat 新增 compaction 三个持久化字段,dao 新增 CRUD,Redis 新增缓存;pkg 新增 ExecuteTokenBudget常量、ExecuteTokenBreakdown 结构体、CheckExecuteTokenBudget 预算检查函数
- prompt 层:新建 compact_msg1.go / compact_msg2.go 分别实现msg1(历史对话)和 msg2(ReAct Loop)的 LLM 压缩;execute_context.go 移除 msg1 的 1400 字符/30 轮/120 字符三重裁剪和 msg2 的 8 条窗口限制,改为全量加载
- node 层:新建 execute_compact.go(compactExecuteMessagesIfNeeded:预算检查 → msg1 优先压缩 → msg2 兜底 → SSE 通知 → token 分布持久化);execute.go ReAct 循环插入 compact 调用 - 服务/API 层:AgentGraphDeps / AgentService 新增 CompactionStore 注入链路;新增 GET /api/v1/agent/context-stats 查询接口
- 启动层:cmd/start.go 注入 agentRepo 为 CompactionStore
3. 新增 Execute Context Compaction 决策报告
- 新建docs/功能决策记录/Execute_Context_Compaction_决策记录.md

前端:无 仓库:无
This commit is contained in:
LoveLosita
2026-04-15 22:01:37 +08:00
parent e77d42fce5
commit 8bde981592
23 changed files with 1921 additions and 8532 deletions

View File

@@ -264,3 +264,27 @@ func (api *AgentHandler) GetSchedulePlanPreview(c *gin.Context) {
}
c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, preview))
}
// GetContextStats 获取指定会话的上下文窗口 token 分布统计。
func (api *AgentHandler) GetContextStats(c *gin.Context) {
conversationID := strings.TrimSpace(c.Query("conversation_id"))
if conversationID == "" {
c.JSON(http.StatusBadRequest, respond.MissingParam)
return
}
userID := c.GetInt("user_id")
ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second)
defer cancel()
statsJSON, err := api.svc.GetContextStats(ctx, userID, conversationID)
if err != nil {
respond.DealWithError(c, err)
return
}
// 直接透传 JSON 字符串,避免二次序列化。
var raw json.RawMessage = json.RawMessage(statsJSON)
c.JSON(http.StatusOK, respond.RespWithData(respond.Ok, raw))
}

View File

@@ -170,6 +170,7 @@ func Start() {
}))
agentService.SetScheduleProvider(newagentconv.NewScheduleProvider(scheduleRepo, taskClassRepo))
agentService.SetSchedulePersistor(newagentconv.NewSchedulePersistorAdapter(manager))
agentService.SetCompactionStore(agentRepo)
agentService.SetMemoryReader(memoryModule)
// API 层初始化。

View File

@@ -305,3 +305,39 @@ func extractMessageHistoryID(msg *schema.Message) int {
return 0
}
}
// ---- Compaction 缓存 ----
func (m *AgentCache) compactionKey(chatID string) string {
return fmt.Sprintf("smartflow:compaction:%s", chatID)
}
// SaveCompactionCache 将压缩摘要缓存到 Redis。
func (m *AgentCache) SaveCompactionCache(ctx context.Context, chatID string, summary string, watermark int) error {
key := m.compactionKey(chatID)
data, _ := json.Marshal(map[string]any{
"summary": summary,
"watermark": watermark,
})
return m.client.Set(ctx, key, data, m.expiration).Err()
}
// LoadCompactionCache 从 Redis 读取压缩摘要缓存。
func (m *AgentCache) LoadCompactionCache(ctx context.Context, chatID string) (summary string, watermark int, ok bool, err error) {
key := m.compactionKey(chatID)
val, err := m.client.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return "", 0, false, nil
}
return "", 0, false, err
}
var data struct {
Summary string `json:"summary"`
Watermark int `json:"watermark"`
}
if jsonErr := json.Unmarshal([]byte(val), &data); jsonErr != nil {
return "", 0, false, nil
}
return data.Summary, data.Watermark, true, nil
}

View File

@@ -401,3 +401,57 @@ func (a *AgentDAO) GetConversationList(ctx context.Context, userID, page, pageSi
}
return chats, total, nil
}
// ---- Compaction 相关 ----
// SaveCompaction 保存压缩摘要和水位线。
func (a *AgentDAO) SaveCompaction(ctx context.Context, userID int, chatID string, summary string, watermark int) error {
return a.db.WithContext(ctx).
Model(&model.AgentChat{}).
Where("user_id = ? AND chat_id = ?", userID, chatID).
Updates(map[string]any{
"compaction_summary": summary,
"compaction_watermark": watermark,
}).Error
}
// LoadCompaction 读取压缩摘要和水位线。
func (a *AgentDAO) LoadCompaction(ctx context.Context, userID int, chatID string) (summary string, watermark int, err error) {
var chat model.AgentChat
err = a.db.WithContext(ctx).
Select("compaction_summary", "compaction_watermark").
Where("user_id = ? AND chat_id = ?", userID, chatID).
First(&chat).Error
if err != nil {
return "", 0, err
}
if chat.CompactionSummary != nil {
summary = *chat.CompactionSummary
}
watermark = chat.CompactionWatermark
return
}
// SaveContextTokenStats 保存上下文窗口 token 分布统计。
func (a *AgentDAO) SaveContextTokenStats(ctx context.Context, userID int, chatID string, statsJSON string) error {
return a.db.WithContext(ctx).
Model(&model.AgentChat{}).
Where("user_id = ? AND chat_id = ?", userID, chatID).
Update("context_token_stats", statsJSON).Error
}
// LoadContextTokenStats 读取上下文窗口 token 分布统计。
func (a *AgentDAO) LoadContextTokenStats(ctx context.Context, userID int, chatID string) (string, error) {
var chat model.AgentChat
err := a.db.WithContext(ctx).
Select("context_token_stats").
Where("user_id = ? AND chat_id = ?", userID, chatID).
First(&chat).Error
if err != nil {
return "", err
}
if chat.ContextTokenStats != nil {
return *chat.ContextTokenStats, nil
}
return "", nil
}

View File

@@ -270,19 +270,22 @@ type SSEMessageData struct {
}
type AgentChat struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement;comment:自增ID"`
ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_chat_id;comment:会话UUID"`
UserID int `gorm:"column:user_id;not null;index:idx_user_last,priority:1;index:idx_user_status,priority:1;comment:所属用户ID"`
Title *string `gorm:"column:title;type:varchar(255);comment:会话标题"`
SystemPrompt *string `gorm:"column:system_prompt;type:text;comment:系统提示词"`
Model *string `gorm:"column:model;type:varchar(100);comment:模型标识"`
MessageCount int `gorm:"column:message_count;not null;default:0;comment:消息总数"`
TokensTotal int `gorm:"column:tokens_total;not null;default:0;comment:累计Token"`
LastMessageAt *time.Time `gorm:"column:last_message_at;comment:最后消息时间"`
Status string `gorm:"column:status;type:varchar(32);not null;default:active;index:idx_user_status,priority:2;comment:会话状态"`
CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt *time.Time `gorm:"column:updated_at;autoUpdateTime"`
DeletedAt *time.Time `gorm:"column:deleted_at;comment:软删除时间"`
ID int64 `gorm:"column:id;primaryKey;autoIncrement;comment:自增ID"`
ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_chat_id;comment:会话UUID"`
UserID int `gorm:"column:user_id;not null;index:idx_user_last,priority:1;index:idx_user_status,priority:1;comment:所属用户ID"`
Title *string `gorm:"column:title;type:varchar(255);comment:会话标题"`
SystemPrompt *string `gorm:"column:system_prompt;type:text;comment:系统提示词"`
Model *string `gorm:"column:model;type:varchar(100);comment:模型标识"`
MessageCount int `gorm:"column:message_count;not null;default:0;comment:消息总数"`
TokensTotal int `gorm:"column:tokens_total;not null;default:0;comment:累计Token"`
LastMessageAt *time.Time `gorm:"column:last_message_at;comment:最后消息时间"`
Status string `gorm:"column:status;type:varchar(32);not null;default:active;index:idx_user_status,priority:2;comment:会话状态"`
CompactionSummary *string `gorm:"column:compaction_summary;type:text;comment:历史上下文压缩摘要"`
CompactionWatermark int `gorm:"column:compaction_watermark;not null;default:0;comment:压缩水位线最后被压缩的消息ID"`
ContextTokenStats *string `gorm:"column:context_token_stats;type:json;comment:上下文窗口实时token分布"`
CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt *time.Time `gorm:"column:updated_at;autoUpdateTime"`
DeletedAt *time.Time `gorm:"column:deleted_at;comment:软删除时间"`
}
func (AgentChat) TableName() string { return "agent_chats" }

View File

@@ -0,0 +1,639 @@
# Memory 向 Mem0 靠拢三步冲刺计划newAgent
## 1. 一句话结论
当前 `memory` 已经具备了“可异步写入、可基础抽取、可基础检索、可注入 newAgent”的骨架但距离真正有 Mem0 味道的记忆系统,还差三块核心能力:
1. 写入侧没有“先召回旧记忆,再做 `ADD/UPDATE/DELETE/NONE` 决策”的治理层。
2. 读侧没有把“硬约束优先、语义召回补充、结果去重、注入预算”做成稳定链路。
3. 系统层没有形成“可灰度、可解释、可清理、可回滚”的治理闭环。
因此建议按三步走推进,并严格遵守一个原则:
1. 每一轮只处理一个能力域。
2. 第一步只动写入决策层。
3. 第二步只动读链路与注入质量。
4. 第三步只动治理、清理、指标与切流收口。
---
## 2. 本文档给谁看
本文档面向三类读者:
1. 需要继续实现 `memory/newAgent` 的 agent。
2. 需要拆任务、排优先级的人。
3. 需要快速判断“本轮该改什么、不该改什么”的维护者。
本文档不是背景介绍文档,而是“可直接拿去拆工单和接力开发”的冲刺说明。
---
## 3. 当前现状与目标差距
### 3.1 当前已完成的部分
当前已经有的能力:
1. 聊天消息可通过 `outbox -> memory.extract.requested -> memory_jobs -> worker` 进入异步记忆链路。
2. Worker 可调用 LLM 做事实抽取,并通过 `NormalizeFacts` 做批内标准化和批内去重。
3. `memory_items / memory_jobs / memory_audit_logs / memory_user_settings` 四张核心表已经建立并接线。
4. `ReadService` 已可做基础查询与轻量排序。
5. `newAgent` 已通过 `injectMemoryContext` 把记忆写入 pinned block。
6. 用户设置、删除、审计已经具备基础治理能力。
### 3.2 当前离 Mem0 还差什么
最关键的差距如下:
| 能力 | 当前状态 | 与 Mem0 的差距 |
| --- | --- | --- |
| 异步入队 | 已完成 | 基本到位 |
| 抽取候选事实 | 已完成 | 缺少更强的抽取后治理 |
| 批内去重 | 已完成 | 仅限单批,不处理历史记忆 |
| 历史去重 | 未完成 | 需要按旧记忆召回后做决策 |
| `ADD/UPDATE/DELETE/NONE` 决策 | 未完成 | 这是最关键差距 |
| 语义召回 | 部分完成 | 接口有了,质量与稳定性未形成闭环 |
| 读侧去重 | 未完成 | 现在更多是展示层弱去重 |
| Prompt 注入 | 基础版已接 | 还没有类型分层与预算控制 |
| 管理治理 | 部分完成 | 还缺更新、恢复、历史清理、指标闭环 |
| 灰度/回滚 | 较弱 | 需要细粒度 feature flag 与分阶段切流 |
### 3.3 本次冲刺的目标定义
本轮不是要把项目做成完整 Mem0也不是做图记忆或多 Provider 平台而是要做到一个“Mem0-lite 可自信上线”的状态。满足以下条件,就可以认为基本靠近目标:
1. 相同或同义记忆不会无脑越写越多。
2. 用户纠正一条旧记忆时,系统更倾向于更新旧值,而不是新增一条冲突值。
3. 读侧能优先拿到“硬约束 + 偏好 + 当前话题相关事实”,而不是仅按最近更新时间胡乱注入。
4. Prompt 注入是稳定、可控、可解释的,而不是纯拼接。
5. 出问题时可以快速关掉某一层能力,而不是整条 memory 链路一起陪葬。
---
## 4. 设计原则与边界
### 4.1 每轮只处理一个能力域
为避免回归问题无法定位,本计划明确规定:
1. 第一步只处理“写入决策层”。
2. 第二步只处理“读取与注入层”。
3. 第三步只处理“治理、清理与切流层”。
禁止在同一轮里同时大改:
1. `memory` 写入逻辑。
2. `newAgent` 图节点结构。
3. WebSearch / 其他 RAG 语料。
4. 多个 prompt 体系。
### 4.2 保留旧实现,走并行迁移
整个冲刺必须遵守并行迁移策略:
1. 旧的“抽取后直接 `Create`”路径先保留。
2. 新的“决策后 ApplyAction”路径并行落地。
3. 用 feature flag 灰度切流。
4. 验证通过后,再决定是否删除旧路径。
### 4.3 不新增“memory 工具化”这条支线
本轮不建议把 `memory` 改成一个显式工具让 `newAgent` 主动调用,原因如下:
1. 当前 `pinned block` 已经接入主链路,切点稳定。
2. 本轮目标是让记忆“更准”,不是让图结构更复杂。
3. 若同时引入工具化调用,会把“写入决策层”和“图编排层”耦到一起。
因此本轮默认继续沿用:
1. `backend/memory/service/read_service.go`
2. `backend/service/agentsvc/agent_memory.go`
3. `pinned block` 注入
---
## 5. 三步走总览
| 步骤 | 只处理的能力域 | 核心目标 | 旧实现是否保留 |
| --- | --- | --- | --- |
| 第一步 | 写入决策层 | 把“抽取即新增”升级为“召回旧记忆 + 决策动作” | 保留 |
| 第二步 | 读链路与注入层 | 把“查到就拼”升级为“硬约束优先 + 语义补充 + 注入预算” | 保留 |
| 第三步 | 治理与切流层 | 把“能跑”升级为“可灰度、可观测、可清理、可回滚” | 收口 |
---
## 6. 第一步:先把写入侧做成 Mem0-lite
### 6.1 这一步解决什么问题
当前写入链路本质上还是:
`抽取 -> 标准化 -> 直接写 memory_items`
这会带来三个直接问题:
1. 历史同义记忆不会合并。
2. 用户纠正旧记忆时,系统更可能新增一条相反记忆。
3. `content_hash` 现在更多只是存了个字段,没有真正承担“历史治理”的职责。
第一步的目标是把写入链路升级为:
`抽取 -> 召回旧记忆候选 -> 临时 ID 映射 -> LLM 决策 -> ApplyAction`
### 6.2 本轮要落的能力
第一步必须落地以下能力:
1. 为每条新候选 fact 召回有限个旧记忆候选。
2. 用临时整数 ID 或候选序号喂给决策模型,避免模型直接编造真实 `memory_id`
3. 让模型只输出结构化 JSON 决策:`ADD/UPDATE/DELETE/NONE`
4. 后端严格校验决策合法性,再执行数据库动作。
5. `UPDATE/DELETE` 也必须补齐审计日志,而不是只有 `create/delete`
### 6.3 推荐的文件落点
建议新增文件:
1. `backend/memory/model/decision.go`
- 定义决策 DTO、候选旧记忆 DTO、ApplyAction DTO。
2. `backend/memory/orchestrator/llm_decision_orchestrator.go`
- 负责“给定新 fact + 旧候选 -> 输出结构化动作决策”。
3. `backend/memory/utils/decision_id_map.go`
- 负责“真实 memory_id <-> 临时决策 ID”的映射。
4. `backend/memory/utils/decision_validate.go`
- 负责校验动作是否合法、目标 ID 是否存在、动作字段是否完整。
5. `backend/memory/worker/decision_flow.go`
- 负责 worker 内的“候选召回 -> 决策 -> 动作执行编排”。
6. `backend/memory/worker/apply_actions.go`
- 负责把 `ADD/UPDATE/DELETE/NONE` 落为数据库动作与审计。
建议修改文件:
1. `backend/memory/model/config.go`
2. `backend/memory/service/config_loader.go`
3. `backend/memory/repo/item_repo.go`
4. `backend/memory/worker/runner.go`
5. `backend/memory/utils/audit.go`
### 6.4 推荐新增配置
建议新增配置项,全部走 `memory` 命名空间:
1. `memory.decision.enabled`
- 是否启用决策层。
2. `memory.decision.candidateTopK`
- 每个新 fact 召回多少个旧记忆候选。
3. `memory.decision.fallbackMode`
- 建议支持 `legacy_add` / `drop` 两种模式。
4. `memory.write.mode`
- 建议支持 `legacy` / `decision` 两种模式。
建议默认值:
1. `memory.decision.enabled=false`
2. `memory.write.mode=legacy`
3. `memory.decision.candidateTopK=5`
4. `memory.decision.fallbackMode=legacy_add`
### 6.5 `ItemRepo` 需要补的能力
当前 `ItemRepo` 只有“查、建、删状态、刷访问时间、刷向量状态”,还不够支撑决策动作。第一步至少要补以下能力:
1. `FindDecisionCandidates(...)`
-`user_id + assistant_id + conversation_id + run_id + memory_type` 查候选。
- 当 RAG 可用时,可优先用向量召回补候选。
2. `UpdateContentByID(...)`
- 用于 `UPDATE`
- 至少要更新:`title/content/normalized_content/content_hash/confidence/importance/sensitivity_level/is_explicit/updated_at`
3. `SoftDeleteByID(...)`
- 用于决策型 `DELETE`
4. `FindActiveByHash(...)`
- 给兜底幂等或低成本重复检测预留接口。
注意:
1. 不要把这些逻辑继续堆进 `UpsertItems`
2. `UpsertItems` 可以暂时保留给 legacy 路径使用。
3. 新路径应尽量使用显式动作函数,而不是一个“万能 Upsert”。
### 6.6 Worker 内推荐的执行顺序
对每个 job建议执行以下顺序
1. 先抽取新事实。
2. 对抽取结果做 `NormalizeFacts`
3. 按用户设置过滤。
4.`memory.decision.enabled=false`,直接走旧路径并返回。
5. 对每条新 fact 召回旧候选:
- 先查强约束域内候选。
-`memory.rag.enabled=true`,再用 RAG 补充语义候选。
6. 对候选做临时 ID 映射。
7.`LLMDecisionOrchestrator` 输出动作。
8. 后端校验动作合法性。
9. 执行动作:
- `ADD`:创建 item + `create` audit
- `UPDATE`:更新旧 item + `update` audit
- `DELETE`:软删除旧 item + `delete` audit
- `NONE`:只记日志,不动表
10. 根据动作决定是否做向量同步:
- `ADD`:新增向量
- `UPDATE`:重写向量
- `DELETE`:删向量或打 pending 删除标记
### 6.7 决策 Prompt 的建议约束
决策 prompt 需要非常收敛,建议只允许模型做一件事:
1. 给定一条新 fact。
2. 给定少量旧候选。
3.`ADD/UPDATE/DELETE/NONE` 中选一个动作。
不建议第一版就让模型:
1. 一次同时处理多条新 fact 与多条旧事实的复杂批量决策。
2. 自己生成复杂的替代文案策略。
3. 自己修改 scope 或元数据。
推荐第一版输出结构大致为:
```json
{
"decisions": [
{
"candidate_index": 0,
"action": "UPDATE",
"target_temp_id": 2,
"title": "更新后的标题",
"content": "更新后的内容",
"reason": "新事实是在纠正旧事实"
}
]
}
```
### 6.8 这一步的验收标准
满足以下条件,可认为第一步完成:
1. 重复表达同一偏好,不会连续生成多条 `active` 记忆。
2. 用户显式纠正旧偏好时,会更倾向触发 `UPDATE`,而不是再新增一条冲突记忆。
3. `memory_audit_logs` 能明确区分 `create/update/delete`
4. 决策层失败时,不会阻断原有 legacy 链路。
5. 关闭 `memory.decision.enabled` 后,系统行为可完全回到当前实现。
### 6.9 这一步的回滚点
第一步必须保留明确回滚点:
1. 关闭 `memory.decision.enabled`
2. `memory.write.mode` 切回 `legacy`
回滚后仍然使用:
1. `LLMWriteOrchestrator.ExtractFacts`
2. `NormalizeFacts`
3. `buildMemoryItems`
4. `ItemRepo.UpsertItems`
### 6.10 这一步明确不做什么
第一步不要顺手做以下事情:
1. 不重构 `newAgent` 图节点。
2. 不引入 memory 工具调用。
3. 不做图记忆。
4. 不做用户侧“编辑记忆内容”的管理 API。
5. 不同时改 WebSearch 的 RAG 链路。
---
## 7. 第二步:把读取与注入做成真正可用的记忆链路
### 7.1 这一步解决什么问题
写入侧即使更聪明,如果读出来的还是“按分数凑五条,再平铺给 prompt”整体体验依然不会像 Mem0。
第二步要解决的问题是:
1. 硬约束和偏好不能被普通事实挤掉。
2. 历史重复项不能继续在读侧污染 TopK。
3. 注入给模型的文本需要可控,而不是简单平铺。
4. RAG 可用时要真正成为加分项,不可用时要稳定降级。
### 7.2 本轮要落的能力
第二步必须落地以下能力:
1. 读侧合并“结构化强约束召回”和“语义候选召回”。
2. 读侧在服务层做真正的去重,而不是只在渲染字符串时弱去重。
3. 注入文本按类型分组,而不是所有内容同一层级平铺。
4. 给每一类记忆设置注入预算,避免事实类把 prompt 撑爆。
### 7.3 推荐的文件落点
建议优先修改文件:
1. `backend/memory/service/read_service.go`
2. `backend/memory/repo/item_repo.go`
3. `backend/service/agentsvc/agent_memory.go`
如需补辅助文件,建议新增:
1. `backend/memory/service/retrieve_merge.go`
- 负责多路召回的结果合并、去重、预算裁剪。
2. `backend/memory/service/retrieve_rank.go`
- 负责重排与门控。
3. `backend/service/agentsvc/agent_memory_render.go`
- 负责把 memory DTO 渲染成稳定的注入 block。
说明:
1. 当前 `agent_memory.go` 已经不算小。
2. 第二步不要继续往单文件里堆“召回策略 + 去重 + 渲染模板”。
3. 这一轮拆开渲染层是合理的职责拆分,不属于跨能力域大重构。
### 7.4 读取侧推荐的新流程
建议读侧升级为以下顺序:
1. 先从 MySQL 拉“必守约束”:
- `constraint`
- 高置信度 `preference`
2. 再按当前 query 做相关召回:
-`memory.rag.enabled=true`,优先走 RAG
- 否则走 legacy DB 排序
3. 合并两路结果。
4. 先按 `memory_id` 去重。
5. 再按 `content_hash` 去重。
6. 最后才按渲染文本兜底去重。
7. 对结果做类型预算:
- `constraint`:优先保留
- `preference`:次优先
- `todo_hint`:控制数量
- `fact`:最容易膨胀,要严格限额
### 7.5 注入层推荐的渲染方式
当前渲染方式更像“扁平清单”。第二步建议升级成“分段注入”,例如:
1. 必守约束
2. 用户偏好
3. 当前话题相关事实
4. 近期线索
推荐生成类似文本:
```text
以下是与当前对话相关的用户记忆,仅在确实有帮助时参考,不要机械复述。
【必守约束】
- 用户点外卖不要香菜。
【用户偏好】
- 用户偏爱黑咖啡。
【当前话题相关事实】
- 用户最近在准备周四的程序设计作业。
```
这样做的好处:
1. 模型更容易区分“必须遵守”和“仅可参考”。
2. 日后更容易按类型做 budget。
3. 若发生错误注入,也更容易解释是哪一层出错。
### 7.6 第二步建议新增配置
建议新增:
1. `memory.read.mode`
- 建议支持 `legacy` / `hybrid`
2. `memory.read.factLimit`
3. `memory.read.preferenceLimit`
4. `memory.read.constraintLimit`
5. `memory.inject.renderMode`
- 建议支持 `flat` / `typed_v2`
建议默认值:
1. `memory.read.mode=legacy`
2. `memory.inject.renderMode=flat`
灰度时再逐步切到:
1. `memory.read.mode=hybrid`
2. `memory.inject.renderMode=typed_v2`
### 7.7 这一步的验收标准
满足以下条件,可认为第二步完成:
1. 同一条重复记忆即使数据库里有多条,最终注入给 prompt 也只保留一条。
2. `constraint` 类记忆不会轻易被 `fact` 类挤出注入集合。
3. RAG 异常时,系统仍能稳定退回 legacy 读取逻辑。
4. 注入文本结构清晰,且总长度稳定,不会一轮长一轮短。
5. newAgent 的 `pinned block` 内容更可读、更可解释。
### 7.8 这一步的回滚点
第二步必须支持快速回滚:
1. `memory.read.mode=legacy`
2. `memory.inject.renderMode=flat`
3. `memory.rag.enabled=false`
回滚后保留:
1. 旧的 `ReadService.retrieveByLegacy`
2. 当前 `agent_memory.go` 扁平渲染逻辑
### 7.9 这一步明确不做什么
第二步不要顺手做以下事情:
1. 不把 memory 改造成工具调用。
2. 不改 `newAgent` 的图路由结构。
3. 不把 WebSearch 一起并进统一召回。
4. 不在这一轮清理历史重复脏数据。
---
## 8. 第三步:做治理、清理、指标与切流收口
### 8.1 这一步解决什么问题
前两步做完后,系统可能“效果已经不错”,但仍缺三个上线必须项:
1. 出问题时怎么知道错在哪一层。
2. 历史已经写进去的重复脏数据怎么治理。
3. 什么时候能把 legacy 路径关掉。
第三步就是收口这一层。
### 8.2 本轮要落的能力
第三步建议至少做以下能力:
1. 为写入决策、读取召回、注入渲染补齐结构化日志和指标。
2. 增加历史重复清理能力。
3. 补齐 `update/restore` 等审计语义。
4. 明确 feature flag 切流策略与回滚手册。
5. 更新文档,避免后续维护者只看到旧 README。
### 8.3 推荐的文件落点
建议修改文件:
1. `backend/memory/utils/audit.go`
2. `backend/memory/service/manage_service.go`
3. `backend/memory/repo/item_repo.go`
4. `backend/memory/README.md`
5. `backend/memory/记忆模块实施计划.md`
建议新增文件:
1. `backend/memory/cleanup/dedup_runner.go`
- 用于历史重复治理。
2. `backend/memory/cleanup/dedup_policy.go`
- 负责定义“保留哪条、归档哪条”。
3. `backend/memory/observe/log_fields.go`
- 统一日志字段,避免不同文件各写各的。
### 8.4 历史数据清理建议
建议不要直接写危险 SQL 一把梭清表,而是通过可审计的治理流程清理历史脏数据:
1.`user_id + memory_type + content_hash + status=active` 扫描重复组。
2. 为每组挑一个保留主记录:
- 优先保留最近更新
- 或优先保留置信度更高
3. 其余重复项改为 `archived``deleted`
4. 对每次治理动作写审计日志。
建议第一版优先做“离线治理工具”或“手动触发 job”不要直接绑到主 worker 周期任务里。
### 8.5 建议补的指标
第三步建议至少打这些指标:
1. `memory_job_success_rate`
2. `memory_job_retry_rate`
3. `memory_decision_distribution`
4. `memory_decision_fallback_rate`
5. `memory_retrieve_hit_count`
6. `memory_retrieve_dedup_drop_count`
7. `memory_inject_item_count`
8. `memory_rag_fallback_rate`
9. `memory_wrong_mention_rate`
10. `memory_user_correction_rate`
其中前八项可以本轮先落,后两项可通过后续用户纠正入口接入。
### 8.6 建议的切流顺序
第三步不要“一刀切”。建议按以下顺序灰度:
1. 阶段 A决策层 shadow 模式
- 真正写库仍走 legacy
- 新决策层只做日志,不生效
2. 阶段 B决策层仅对显式记忆生效
3. 阶段 C决策层对全部写入生效
4. 阶段 D读侧切到 hybrid
5. 阶段 E注入切到 typed_v2
6. 阶段 F历史清理跑完再考虑关闭 legacy 默认路径
### 8.7 这一步的验收标准
满足以下条件,可认为第三步完成:
1. 能从日志看清某条记忆为何被 `ADD/UPDATE/DELETE/NONE`
2. 能从指标看清读侧命中、去重、降级、回滚情况。
3. 能对历史重复数据做可审计清理。
4. 出现异常时可在分钟级通过开关退回 legacy。
5. 文档与代码现状一致,不再依赖口头传递。
### 8.8 这一步的回滚点
第三步的回滚不应影响前两步代码保留,只需回切开关:
1. 决策层回到 `legacy`
2. 读侧回到 `legacy`
3. 注入渲染回到 `flat`
4. 停掉清理任务
### 8.9 这一步明确不做什么
第三步仍然不建议同时做以下事情:
1. 不做图记忆。
2. 不做多 Provider 工厂化。
3. 不拆独立 memory 服务。
4. 不把 WebSearch 与 Memory 强行合并到同一轮上线。
---
## 9. 推荐的三轮交付顺序
如果资源有限,建议严格按下面顺序推进:
1. 先做第一步。
- 原因:写侧如果还是“抽取即新增”,读侧再怎么优化也会越来越脏。
2. 再做第二步。
- 原因:写侧稳定后,读侧才能真正体现效果。
3. 最后做第三步。
- 原因:治理、指标、清理要建立在前两步能力已经基本成形的前提下。
一句话总结:
1. 先让系统“会整理记忆”。
2. 再让系统“会正确读记忆”。
3. 最后让系统“可稳定上线和维护”。
---
## 10. 建议的任务拆分方式
如果后续要多人并行,建议按职责边界拆,而不是按文件随意拆:
### 10.1 第一步可拆为两块
1. 决策模型与编排
- `decision.go`
- `llm_decision_orchestrator.go`
- `decision_validate.go`
2. Repo 与动作执行
- `item_repo.go`
- `apply_actions.go`
- `audit.go`
### 10.2 第二步可拆为两块
1. 读侧召回与合并
- `read_service.go`
- `retrieve_merge.go`
- `retrieve_rank.go`
2. newAgent 注入渲染
- `agent_memory.go`
- `agent_memory_render.go`
### 10.3 第三步可拆为两块
1. 治理与清理
- `dedup_runner.go`
- `manage_service.go`
2. 观测与文档
- 指标日志
- README / 计划文档更新
---
## 11. 如果只看一个结论,请看这里
要让当前 memory 真正靠近 Mem0不是再加一张表也不是再加一个 prompt而是要完成以下收敛
1. 写入侧从“抽到就加”升级为“先回看旧记忆,再决定加改删不做”。
2. 读侧从“查到就拼”升级为“硬约束优先、语义补充、结果去重、预算注入”。
3. 系统侧从“能跑”升级为“有灰度、有指标、有清理、有回滚”。
只要三步按这个顺序推进,最终得到的就不是一个“会不断积灰的记忆表”,而是一套真正能为 `newAgent` 服务的记忆系统。

View File

@@ -1,93 +0,0 @@
# Execute 阶段历史消息注入改造 Handoff
## 背景
execute 阶段给 LLM 的 4 条消息msg0-msg3全部是人工构造的摘要原始对话历史从未被直接注入。
导致断连恢复后 "继续" 成为孤立的 currentGoalLLM 无法从上下文推断原始意图。
## 现状
### 消息结构 (`execute_context.go:50-67`)
```
msg0 (System): 规则 + 工具简表
msg1 (Assistant): 历史摘要 — 用 pickExecuteUserInputs 提取 firstUser/lastUser 拼成一行
msg2 (Assistant): 当轮 ReAct Loop 窗口 — thought/tool_call/observation 详细记录
msg3 (System): 执行状态 + 锚点 — 又用 firstUser/lastUser 拼 "当前用户诉求"/"首轮目标来源"
```
### History 存储方式 (`execute.go`)
每一轮 ReAct loop 往 ConversationContext.History 写 3 条消息:
1. `assistant` + speak自然语言如 "我先查看当前安排。"
2. `assistant` + ToolCalls工具调用 JSONcontent 为空)
3. `tool` + observation工具返回可能很长
### 问题函数
- `pickExecuteUserInputs()` (execute_context.go:772) — 从全量 history 取第一条和最后一条 user message
- `extractExecuteGoalAnchors()` (execute_context.go:751) — 用上面的结果作为 initial/current goal
- `buildExecuteMessage1V3()` (execute_context.go:269) — 把 goal 拼成摘要行
- `buildExecuteMessage3()` (execute_context.go:351) — 把 goal 拼成执行锚点
## 改造方案
### 核心思路
msg1 从"人工提炼的摘要"变成"真实对话流"。只注入 **user + assistant speak**,不含 tool_call / observation这些已由 msg2 承载)。
### 改造后的消息结构
```
msg0 (System): 规则 + 工具简表(不变)
msg1 (Assistant): 真实对话历史user + assistant speak 交替)
msg2 (Assistant): 当轮 ReAct Loop 窗口(不变)
msg3 (System): 执行状态(删掉 goal 锚点,因为 msg1 已包含完整意图链)
```
### msg1 改造示例
现在:
```
历史上下文(仅供参考):
- 用户目标:帮我排一下这些任务类;最近补充:继续
- 历史归档 ReAct 摘要:已折叠 15 条旧记录...
```
改造后:
```
对话历史:
user: "帮我把周末的课整到工作日"
assistant: "好的,我来查看当前安排。"
assistant: "已找到6个周末任务开始逐个移动。"
assistant: "第一个任务已移动完成。"
user: "继续"
assistant: "继续处理剩余任务。"
历史归档 ReAct 摘要:已折叠 15 条旧记录...
```
### msg3 改造
删掉以下由 firstUser/lastUser 驱动的锚点:
- `当前用户诉求xxx`
- `首轮目标来源xxx`
保留其他锚点轮次、模式、plan 步骤、任务类等)。
### 上下文开销
speak 都是短句1-2 句60 轮 execute 约 60 条 speak远小于 msg2 里的工具结果 JSON。
可考虑加条数上限(如最近 30 条 user+assistant超出部分走归档摘要。
## 涉及文件
| 文件 | 改动 |
|---|---|
| `prompt/execute_context.go` | 重写 `buildExecuteMessage1V3`,新增从 history 提取 user+speak 的函数;删 `pickExecuteUserInputs` / `extractExecuteGoalAnchors`;简化 `buildExecuteMessage3` |
| `prompt/execute.go` | 无改动(系统规则不含 list_tasks 相关内容,已清理) |
| `node/execute.go` | 无改动history 写入逻辑已经存了 speak无需修改 |
## 已完成的前置工作
- `list_tasks` 工具已删除registry / prompt / context 全部清理干净,编译通过)

File diff suppressed because it is too large Load Diff

View File

@@ -67,6 +67,7 @@ type AgentGraphDeps struct {
ToolRegistry *newagenttools.ToolRegistry
ScheduleProvider ScheduleStateProvider // 按 DAO 注入Execute 节点按需加载 ScheduleState
SchedulePersistor SchedulePersistor // 按 DAO 注入,用于写工具执行后持久化变更
CompactionStore CompactionStore // 按 DAO 注入,用于 Execute 上下文压缩持久化
RoughBuildFunc RoughBuildFunc // 按 Service 注入,粗排算法入口
WriteSchedulePreview WriteSchedulePreviewFunc // 按 Service 注入,排程预览写入入口
}

View File

@@ -79,3 +79,11 @@ type ScopedScheduleStateProvider interface {
type SchedulePersistor interface {
PersistScheduleChanges(ctx context.Context, original, modified *schedule.ScheduleState, userID int) error
}
// CompactionStore 定义上下文压缩的持久化接口。
// 由 Service 层实现(组合 DAO + Redis Cache注入到 ExecuteNodeInput。
type CompactionStore interface {
LoadCompaction(ctx context.Context, userID int, chatID string) (summary string, watermark int, err error)
SaveCompaction(ctx context.Context, userID int, chatID string, summary string, watermark int) error
SaveContextTokenStats(ctx context.Context, userID int, chatID string, statsJSON string) error
}

View File

@@ -218,6 +218,7 @@ func (n *AgentNodes) Execute(ctx context.Context, st *newagentmodel.AgentGraphSt
ToolRegistry: st.Deps.ToolRegistry,
ScheduleState: scheduleState,
SchedulePersistor: st.Deps.SchedulePersistor,
CompactionStore: st.Deps.CompactionStore,
WriteSchedulePreview: st.Deps.WriteSchedulePreview,
OriginalScheduleState: st.OriginalScheduleState,
AlwaysExecute: st.Request.AlwaysExecute,

View File

@@ -55,6 +55,7 @@ type ExecuteNodeInput struct {
ToolRegistry *newagenttools.ToolRegistry
ScheduleState *schedule.ScheduleState
SchedulePersistor newagentmodel.SchedulePersistor
CompactionStore newagentmodel.CompactionStore
WriteSchedulePreview newagentmodel.WriteSchedulePreviewFunc
OriginalScheduleState *schedule.ScheduleState
AlwaysExecute bool // true 时写工具跳过确认闸门直接执行
@@ -180,6 +181,12 @@ func RunExecuteNode(ctx context.Context, input ExecuteNodeInput) error {
// 5. 构造本轮执行输入,请求 LLM 输出 ExecuteDecision。
messages := newagentprompt.BuildExecuteMessages(flowState, conversationContext)
// 5.1 Token 预算检查 & 上下文压缩。
messages = compactExecuteMessagesIfNeeded(
ctx, messages, input, flowState, emitter,
)
log.Printf(
"[DEBUG] execute LLM context begin chat=%s round=%d message_count=%d\n%s\n[DEBUG] execute LLM context end chat=%s round=%d",
flowState.ConversationID,

View File

@@ -0,0 +1,197 @@
package newagentnode
import (
"context"
"encoding/json"
"fmt"
"log"
newagentmodel "github.com/LoveLosita/smartflow/backend/newAgent/model"
newagentprompt "github.com/LoveLosita/smartflow/backend/newAgent/prompt"
newagentstream "github.com/LoveLosita/smartflow/backend/newAgent/stream"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/cloudwego/eino/schema"
)
// compactExecuteMessagesIfNeeded 检查 Execute prompt 的 token 预算,
// 超限时对 msg1历史对话和 msg2ReAct Loop执行 LLM 压缩。
//
// 消息布局约定(由 BuildExecuteMessages 返回):
//
// [0] system — msg0: 系统规则
// [1] assistant — msg1: 历史对话上下文
// [2] assistant — msg2: 当轮 ReAct Loop 记录
// [3] system — msg3: 当前状态 + 用户提示
func compactExecuteMessagesIfNeeded(
ctx context.Context,
messages []*schema.Message,
input ExecuteNodeInput,
flowState *newagentmodel.CommonState,
emitter *newagentstream.ChunkEmitter,
) []*schema.Message {
if len(messages) != 4 {
return messages
}
// 提取四条消息的文本内容
msg0 := messages[0].Content
msg1 := messages[1].Content
msg2 := messages[2].Content
msg3 := messages[3].Content
// Token 预算检查
breakdown, overBudget, needCompactMsg1, needCompactMsg2 := pkg.CheckExecuteTokenBudget(msg0, msg1, msg2, msg3)
log.Printf(
"[COMPACT] token budget check: total=%d budget=%d over=%v compactMsg1=%v compactMsg2=%v (msg0=%d msg1=%d msg2=%d msg3=%d)",
breakdown.Total, breakdown.Budget, overBudget, needCompactMsg1, needCompactMsg2,
breakdown.Msg0, breakdown.Msg1, breakdown.Msg2, breakdown.Msg3,
)
if !overBudget {
// 未超限,记录 token 分布后直接返回
saveTokenStats(ctx, input, flowState, breakdown)
return messages
}
// ---- msg1 压缩 ----
if needCompactMsg1 {
msg1 = compactMsg1IfNeeded(ctx, input, flowState, emitter, msg1)
messages[1].Content = msg1
// 压缩 msg1 后重算预算
breakdown = pkg.EstimateExecuteMessagesTokens(msg0, msg1, msg2, msg3)
}
// ---- msg2 压缩 ----
if needCompactMsg2 || breakdown.Total > pkg.ExecuteTokenBudget {
msg2 = compactMsg2IfNeeded(ctx, input, flowState, emitter, msg2)
messages[2].Content = msg2
breakdown = pkg.EstimateExecuteMessagesTokens(msg0, msg1, msg2, msg3)
}
// 记录最终 token 分布
saveTokenStats(ctx, input, flowState, breakdown)
log.Printf(
"[COMPACT] after compaction: total=%d budget=%d (msg0=%d msg1=%d msg2=%d msg3=%d)",
breakdown.Total, breakdown.Budget,
breakdown.Msg0, breakdown.Msg1, breakdown.Msg2, breakdown.Msg3,
)
return messages
}
// compactMsg1IfNeeded 对 msg1历史对话执行 LLM 压缩。
func compactMsg1IfNeeded(
ctx context.Context,
input ExecuteNodeInput,
flowState *newagentmodel.CommonState,
emitter *newagentstream.ChunkEmitter,
msg1 string,
) string {
compactionStore := input.CompactionStore
if compactionStore == nil {
log.Printf("[COMPACT] CompactionStore is nil, skip msg1 compaction")
return msg1
}
// 加载已有压缩摘要
existingSummary, _, err := compactionStore.LoadCompaction(ctx, flowState.UserID, flowState.ConversationID)
if err != nil {
log.Printf("[COMPACT] load existing compaction failed: %v, proceed without cache", err)
}
// SSE: 压缩开始
tokenBefore := pkg.EstimateTextTokens(msg1)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg1", "context_compact_start",
fmt.Sprintf("正在压缩对话历史(%d tokens...", tokenBefore),
false,
)
// 调用 LLM 压缩
newSummary, err := newagentprompt.CompactMsg1(ctx, input.Client, msg1, existingSummary)
if err != nil {
log.Printf("[COMPACT] compact msg1 failed: %v", err)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg1", "context_compact_done",
"对话历史压缩失败,使用原始文本",
false,
)
return msg1
}
// SSE: 压缩完成
tokenAfter := pkg.EstimateTextTokens(newSummary)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg1", "context_compact_done",
fmt.Sprintf("对话历史已压缩:%d → %d tokens", tokenBefore, tokenAfter),
false,
)
// 持久化压缩结果
if err := compactionStore.SaveCompaction(ctx, flowState.UserID, flowState.ConversationID, newSummary, flowState.RoundUsed); err != nil {
log.Printf("[COMPACT] save compaction failed: %v", err)
}
return newSummary
}
// compactMsg2IfNeeded 对 msg2ReAct Loop 记录)执行 LLM 压缩。
func compactMsg2IfNeeded(
ctx context.Context,
input ExecuteNodeInput,
flowState *newagentmodel.CommonState,
emitter *newagentstream.ChunkEmitter,
msg2 string,
) string {
// SSE: 压缩开始
tokenBefore := pkg.EstimateTextTokens(msg2)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg2", "context_compact_start",
fmt.Sprintf("正在压缩执行记录(%d tokens...", tokenBefore),
false,
)
// 调用 LLM 压缩
compressed, err := newagentprompt.CompactMsg2(ctx, input.Client, msg2)
if err != nil {
log.Printf("[COMPACT] compact msg2 failed: %v", err)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg2", "context_compact_done",
"执行记录压缩失败,使用原始文本",
false,
)
return msg2
}
// SSE: 压缩完成
tokenAfter := pkg.EstimateTextTokens(compressed)
_ = emitter.EmitStatus(
executeStatusBlockID, "compact_msg2", "context_compact_done",
fmt.Sprintf("执行记录已压缩:%d → %d tokens", tokenBefore, tokenAfter),
false,
)
return compressed
}
// saveTokenStats 持久化当前 token 分布到 DB。
func saveTokenStats(
ctx context.Context,
input ExecuteNodeInput,
flowState *newagentmodel.CommonState,
breakdown pkg.ExecuteTokenBreakdown,
) {
compactionStore := input.CompactionStore
if compactionStore == nil {
return
}
statsJSON, err := json.Marshal(breakdown)
if err != nil {
log.Printf("[COMPACT] marshal token stats failed: %v", err)
return
}
if err := compactionStore.SaveContextTokenStats(ctx, flowState.UserID, flowState.ConversationID, string(statsJSON)); err != nil {
log.Printf("[COMPACT] save token stats failed: %v", err)
}
}

View File

@@ -0,0 +1,62 @@
package newagentprompt
import (
"context"
"fmt"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
"github.com/cloudwego/eino/schema"
)
const compactMsg1SystemPrompt = `你是一个对话压缩助手。你的任务是将以下多轮对话历史压缩为一段简洁的结构化摘要。
要求:
1. 保留用户的核心诉求和意图(原文关键词不要丢失)
2. 保留所有已确认的约束条件和规则
3. 保留关键操作决策和结果(比如排程相关的调整结果)
4. 保留用户偏好的重要信息
5. 去除冗余和重复信息
6. 按要点列出,每条一行
直接输出压缩后的摘要,不要输出其他内容。`
// CompactMsg1 将 msg1历史对话压缩为摘要。
// existingSummary 不为空时表示已有旧摘要,需要合并压缩。
func CompactMsg1(
ctx context.Context,
client *infrallm.Client,
historyText string,
existingSummary string,
) (string, error) {
var userContent string
if existingSummary != "" {
userContent = fmt.Sprintf(`已有压缩摘要:
%s
新增的对话记录:
%s
请将以上两部分合并为一份更紧凑的摘要。`, existingSummary, historyText)
} else {
userContent = fmt.Sprintf(`对话历史:
%s
请压缩以上对话历史。`, historyText)
}
messages := []*schema.Message{
schema.SystemMessage(compactMsg1SystemPrompt),
schema.UserMessage(userContent),
}
result, err := client.GenerateText(ctx, messages, infrallm.GenerateOptions{
MaxTokens: 4000,
})
if err != nil {
return "", fmt.Errorf("compact msg1 LLM call failed: %w", err)
}
if result == nil || result.Text == "" {
return "", fmt.Errorf("compact msg1 LLM returned empty result")
}
return result.Text, nil
}

View File

@@ -0,0 +1,49 @@
package newagentprompt
import (
"context"
"fmt"
infrallm "github.com/LoveLosita/smartflow/backend/infra/llm"
"github.com/cloudwego/eino/schema"
)
const compactMsg2SystemPrompt = `你是一个执行记录压缩助手。你的任务是将以下 ReAct 执行循环记录压缩为简洁摘要。
要求:
1. 保留每个工具调用的关键返回值尤其是包含排程数据的JSON
2. 保留执行路径(哪些操作成功了,哪些失败了)
3. 保留当前执行进度(正在做什么,下一步要做什么)
4. 去除重复的工具调用结果
5. 按时间顺序组织,每条一行
直接输出压缩后的摘要,不要输出其他内容。`
// CompactMsg2 将 msg2ReAct Loop 记录)的早期部分压缩为摘要。
// recentText 是保留的近期记录原文,不参与压缩。
func CompactMsg2(
ctx context.Context,
client *infrallm.Client,
earlyLoopText string,
) (string, error) {
userContent := fmt.Sprintf(`早期的 ReAct 执行记录:
%s
请压缩以上执行记录,保留关键信息。`, earlyLoopText)
messages := []*schema.Message{
schema.SystemMessage(compactMsg2SystemPrompt),
schema.UserMessage(userContent),
}
result, err := client.GenerateText(ctx, messages, infrallm.GenerateOptions{
MaxTokens: 4000,
})
if err != nil {
return "", fmt.Errorf("compact msg2 LLM call failed: %w", err)
}
if result == nil || result.Text == "" {
return "", fmt.Errorf("compact msg2 LLM returned empty result")
}
return result.Text, nil
}

View File

@@ -17,13 +17,15 @@ const (
executeHistoryKindLoopClosed = "execute_loop_closed"
executeHistoryKindStepAdvanced = "execute_step_advanced"
// executeLoopWindowLimit 控制当轮 ReAct Loop 窗口最多保留多少条记录。
// 采用固定窗口能避免上下文无上限增长,且可保持“最近行为”可追踪。
// executeLoopWindowLimit 控制当轮 ReAct Loop 窗口最多保留多少条记录。
executeLoopWindowLimit = 8
// executeTrimmedObservationText 是重复工具压缩后的 observation 占位文案。
// 当同工具在窗口内出现多次时,只保留最新一条真实结果,其余旧结果统一替换为该文案。
executeTrimmedObservationText = "当前工具调用结果已经被使用过,当前无需使用,为节省上下文空间,已折叠"
// executeConversationTurnLimit 控制 msg1 注入的最大对话轮数user + assistant speak
// 超出时保留最近的条目,早期部分由 ReAct 摘要兜底。
executeConversationTurnLimit = 30
)
type executeToolSchemaDoc struct {
@@ -44,9 +46,9 @@ const executeMessage1MaxRunes = 1400
//
// 消息结构(固定):
// 1. message[0] 固定 prompt规则 + 微调硬引导 + 输出约束 + 工具简表)
// 2. message[1] 历史上下文(聊天摘要 + 早期 ReAct 摘要)
// 2. message[1] 历史上下文(真实对话流 + 早期 ReAct 摘要)
// 3. message[2] 当轮 ReAct Loop 窗口thought/reason + tool_call + observation 绑定展示)
// 4. message[3] 当前执行状态(含初始目标、结束判断原则、非目标
// 4. message[3] 当前执行状态(轮次、模式、plan 步骤、任务类等
func buildExecuteStageMessages(
stageSystemPrompt string,
state *newagentmodel.CommonState,
@@ -80,134 +82,7 @@ func buildExecuteMessage0(stageSystemPrompt string, ctx *newagentmodel.Conversat
return base + "\n\n" + toolCatalog
}
// buildExecuteMessage1 生成历史上下文短摘要
func buildExecuteMessage1(ctx *newagentmodel.ConversationContext) string {
lines := []string{"历史上下文(仅供参考):"}
if ctx == nil {
lines = append(lines,
"- 用户目标:暂无可用历史输入。",
"- 阶段锚点:按当前工具事实推进执行。",
"- 早期 ReAct 摘要:暂无。",
)
return strings.Join(lines, "\n")
}
history := ctx.HistorySnapshot()
firstUser, lastUser := pickExecuteUserInputs(history)
switch {
case firstUser == "":
lines = append(lines, "- 用户目标:暂无可用历史输入。")
case lastUser != "" && lastUser != firstUser:
lines = append(lines, "- 用户目标:"+firstUser+";最近补充:"+lastUser)
default:
lines = append(lines, "- 用户目标:"+firstUser)
}
if hasExecuteRoughBuildDone(ctx) {
lines = append(lines, "- 阶段锚点:粗排已完成,本轮仅做微调,不重新 place。")
} else {
lines = append(lines, "- 阶段锚点:按当前工具事实推进,不做无依据操作。")
}
allLoops := collectExecuteLoopRecords(history)
lines = append(lines, "- 早期 ReAct 摘要:"+buildEarlyExecuteReactSummary(allLoops, executeLoopWindowLimit))
return strings.Join(lines, "\n")
}
// buildExecuteMessage2 生成当轮 ReAct Loop 窗口。
//
// 规则:
// 1. 每条记录都展示 thought/reason + tool_call + observation
// 2. 对窗口内重复工具应用压缩:同工具只保留最新一条真实 observation
// 3. 被压缩的旧 observation 统一替换为占位文案,避免语义断裂。
func buildExecuteMessage2(ctx *newagentmodel.ConversationContext) string {
lines := []string{"当轮 ReAct Loop 记录(窗口):"}
if ctx == nil {
lines = append(lines, "- 暂无可用 ReAct 记录。")
return strings.Join(lines, "\n")
}
allLoops := collectExecuteLoopRecords(ctx.HistorySnapshot())
if len(allLoops) == 0 {
lines = append(lines, "- 暂无可用 ReAct 记录。")
return strings.Join(lines, "\n")
}
windowLoops := tailExecuteLoops(allLoops, executeLoopWindowLimit)
windowLoops = compressExecuteLoopObservationsByTool(windowLoops)
for i, loop := range windowLoops {
lines = append(lines, fmt.Sprintf("%d) thought/reason%s", i+1, loop.Thought))
lines = append(lines, fmt.Sprintf(" tool_call%s", renderExecuteToolCallText(loop.ToolName, loop.ToolArgs)))
lines = append(lines, fmt.Sprintf(" observation%s", loop.Observation))
}
return strings.Join(lines, "\n")
}
// buildExecuteMessage3 生成当前执行状态与执行锚点。
// buildExecuteMessage1V2 生成历史摘要:
// 1. 已收口的 loop 归档到 msg1
// 2. 当前活跃 loop 只保留“早期摘要”;
// 3. 最终对 msg1 做统一长度裁剪,控制 token 开销。
func buildExecuteMessage1V2(ctx *newagentmodel.ConversationContext) string {
lines := []string{"历史上下文(仅供参考):"}
if ctx == nil {
lines = append(lines,
"- 用户目标:暂无可用历史输入。",
"- 阶段锚点:按当前工具事实推进执行。",
"- 历史归档 ReAct 摘要:暂无。",
"- 当前循环早期摘要:暂无。",
)
return trimExecuteMessage1ByBudget(strings.Join(lines, "\n"))
}
history := ctx.HistorySnapshot()
firstUser, lastUser := pickExecuteUserInputs(history)
switch {
case firstUser == "":
lines = append(lines, "- 用户目标:暂无可用历史输入。")
case lastUser != "" && lastUser != firstUser:
lines = append(lines, "- 用户目标:"+firstUser+";最近补充:"+lastUser)
default:
lines = append(lines, "- 用户目标:"+firstUser)
}
if hasExecuteRoughBuildDone(ctx) {
lines = append(lines, "- 阶段锚点:粗排已完成,本轮仅做微调,不重新 place。")
} else {
lines = append(lines, "- 阶段锚点:按当前工具事实推进,不做无依据操作。")
}
archivedLoops, activeLoops := splitExecuteLoopRecordsByBoundary(history)
lines = append(lines, "- 历史归档 ReAct 摘要:"+buildEarlyExecuteReactSummary(archivedLoops, 0))
lines = append(lines, "- 当前循环早期摘要:"+buildEarlyExecuteReactSummary(activeLoops, executeLoopWindowLimit))
return trimExecuteMessage1ByBudget(strings.Join(lines, "\n"))
}
// buildExecuteMessage2V2 仅展示“当前活跃 loop”的窗口记录。
func buildExecuteMessage2V2(ctx *newagentmodel.ConversationContext) string {
lines := []string{"当轮 ReAct Loop 记录(窗口):"}
if ctx == nil {
lines = append(lines, "- 暂无可用 ReAct 记录。")
return strings.Join(lines, "\n")
}
_, activeLoops := splitExecuteLoopRecordsByBoundary(ctx.HistorySnapshot())
if len(activeLoops) == 0 {
lines = append(lines, "- 暂无可用 ReAct 记录。")
return strings.Join(lines, "\n")
}
windowLoops := tailExecuteLoops(activeLoops, executeLoopWindowLimit)
windowLoops = compressExecuteLoopObservationsByTool(windowLoops)
for i, loop := range windowLoops {
lines = append(lines, fmt.Sprintf("%d) thought/reason%s", i+1, loop.Thought))
lines = append(lines, fmt.Sprintf(" tool_call%s", renderExecuteToolCallText(loop.ToolName, loop.ToolArgs)))
lines = append(lines, fmt.Sprintf(" observation%s", loop.Observation))
}
return strings.Join(lines, "\n")
}
// splitExecuteLoopRecordsByBoundary 按“已收口标记”拆分归档/活跃 ReAct 记录。
// splitExecuteLoopRecordsByBoundary 按已收口标记拆分归档/活跃 ReAct 记录
//
// 规则:
// 1. 标记之前的记录归档到 msg1
@@ -265,29 +140,38 @@ func trimExecuteMessage1ByBudget(content string) string {
return string(runes[:executeMessage1MaxRunes-3]) + "..."
}
// buildExecuteMessage1V3 负责把上一轮 loop 归档并入 msg1并统一做长度裁剪。
// buildExecuteMessage1V3 负责把真实对话流 + 上一轮 loop 归档并入 msg1并统一做长度裁剪。
//
// 改造说明:
// 1. msg1 从人工提炼的摘要变为真实对话流,只注入 user + assistant speak
// 2. tool_call / observation 不在 msg1 中重复(已由 msg2 承载);
// 3. 超出 executeConversationTurnLimit 的早期对话不注入,由 ReAct 摘要兜底。
func buildExecuteMessage1V3(ctx *newagentmodel.ConversationContext) string {
lines := []string{"历史上下文(仅供参考)"}
lines := []string{"历史上下文:"}
if ctx == nil {
lines = append(lines,
"- 用户目标:暂无可用历史输入。",
"- 对话历史:暂无。",
"- 阶段锚点:按当前工具事实推进执行。",
"- 历史归档 ReAct 摘要:暂无。",
"- 历史归档 ReAct 窗口:暂无。",
"- 当前循环早期摘要:暂无。",
)
return trimExecuteMessage1ByBudget(strings.Join(lines, "\n"))
return strings.Join(lines, "\n")
}
history := ctx.HistorySnapshot()
firstUser, lastUser := pickExecuteUserInputs(history)
switch {
case firstUser == "":
lines = append(lines, "- 用户目标:暂无可用历史输入。")
case lastUser != "" && lastUser != firstUser:
lines = append(lines, "- 用户目标:"+firstUser+";最近补充:"+lastUser)
default:
lines = append(lines, "- 用户目标:"+firstUser)
// 注入真实对话流user + assistant speak全量放入不再限制轮数和单条长度。
turns := collectExecuteConversationTurns(history)
if len(turns) == 0 {
lines = append(lines, "- 对话历史:暂无。")
} else {
turnLines := make([]string, 0, len(turns)+1)
turnLines = append(turnLines, "对话历史:")
for _, turn := range turns {
turnLines = append(turnLines, turn.Role+": \""+turn.Content+"\"")
}
lines = append(lines, strings.Join(turnLines, "\n"))
}
if hasExecuteRoughBuildDone(ctx) {
@@ -296,20 +180,18 @@ func buildExecuteMessage1V3(ctx *newagentmodel.ConversationContext) string {
lines = append(lines, "- 阶段锚点:按当前工具事实推进,不做无依据操作。")
}
// 1. 通过收口标记拆分“归档 loop / 当前活跃 loop”。
// 2. 归档 loop 的窗口条目直接并入 msg1满足“上一轮 msg2 挪入 msg1”。
// 3. 当前活跃 loop 在 msg1 只保留早期摘要,详细窗口交给 msg2。
archivedLoops, activeLoops := splitExecuteLoopRecordsByBoundary(history)
lines = append(lines, "- 历史归档 ReAct 摘要:"+buildEarlyExecuteReactSummary(archivedLoops, executeLoopWindowLimit))
lines = append(lines, renderArchivedExecuteLoopWindowForMessage1V3(archivedLoops))
lines = append(lines, "- 当前循环早期摘要:"+buildEarlyExecuteReactSummary(activeLoops, executeLoopWindowLimit))
return trimExecuteMessage1ByBudget(strings.Join(lines, "\n"))
return strings.Join(lines, "\n")
}
// buildExecuteMessage2V3 承载当前活跃 loop”的窗口
// 若是新一轮刚开始(活跃 loop 为空),明确返回已清空状态。
// buildExecuteMessage2V3 承载当前活跃 loop 的全部记录
// 若是新一轮刚开始(活跃 loop 为空),明确返回已清空状态。
// 不再限制窗口大小token 预算由 execute 层统一管理。
func buildExecuteMessage2V3(ctx *newagentmodel.ConversationContext) string {
lines := []string{"当轮 ReAct Loop 记录(窗口)"}
lines := []string{"当轮 ReAct Loop 记录:"}
if ctx == nil {
lines = append(lines, "- 暂无可用 ReAct 记录。")
return strings.Join(lines, "\n")
@@ -321,9 +203,8 @@ func buildExecuteMessage2V3(ctx *newagentmodel.ConversationContext) string {
return strings.Join(lines, "\n")
}
windowLoops := tailExecuteLoops(activeLoops, executeLoopWindowLimit)
windowLoops = compressExecuteLoopObservationsByTool(windowLoops)
for i, loop := range windowLoops {
// 全量放入,不再限制窗口大小
for i, loop := range activeLoops {
lines = append(lines, fmt.Sprintf("%d) thought/reason%s", i+1, loop.Thought))
lines = append(lines, fmt.Sprintf(" tool_call%s", renderExecuteToolCallText(loop.ToolName, loop.ToolArgs)))
lines = append(lines, fmt.Sprintf(" observation%s", loop.Observation))
@@ -367,18 +248,8 @@ func buildExecuteMessage3(state *newagentmodel.CommonState, ctx *newagentmodel.C
"- 当前模式:"+modeText,
)
initialGoal, currentGoal := extractExecuteGoalAnchors(ctx)
if currentGoal == "" {
currentGoal = "暂无可用目标描述,请按当前上下文稳步推进。"
}
lines = append(lines, "执行锚点:")
lines = append(lines, "- 当前用户诉求:"+currentGoal)
if initialGoal != "" && initialGoal != currentGoal {
lines = append(lines, "- 首轮目标来源:"+initialGoal)
}
// 1. 有 plan 时,把当前步骤与完成判定强制写入 msg3。
// 2. 该锚点用于约束模型只推进当前步骤,避免退化成泛化 ReAct。
// 2. 该锚点用于约束模型只推进当前步骤,避免退化成泛化 ReAct。
// 3. 当前步骤不可读时给出兜底指引,避免引用旧步骤。
if state != nil && state.HasPlan() {
current, total := state.PlanProgress()
@@ -411,7 +282,7 @@ func buildExecuteMessage3(state *newagentmodel.CommonState, ctx *newagentmodel.C
if hasExecuteRoughBuildDone(ctx) {
lines = append(lines, "- 阶段约束:粗排已完成,本轮只微调 suggestedexisting 仅作已安排事实参考,不作为可移动目标。")
}
lines = append(lines, "- 参数纪律:工具参数必须严格使用 schema 字段;若返回参数非法,需先改参再继续。")
lines = append(lines, "- 参数纪律:工具参数必须严格使用 schema 字段;若返回'参数非法',需先改参再继续。")
if state != nil {
if state.AllowReorder {
lines = append(lines, "- 顺序策略:用户已明确允许打乱顺序,可在必要时使用 min_context_switch。")
@@ -465,11 +336,7 @@ func renderExecuteToolCatalogCompact(ctx *newagentmodel.ConversationContext) str
return strings.Join(lines, "\n")
}
// renderExecuteToolReturnHint 返回工具的返回类型 + 最小示例
//
// 说明:
// 1. 所有工具当前都返回 string但部分是“JSON 字符串”,这里补齐内容形态示例减少模型盲猜;
// 2. 示例只保留最小片段,避免工具说明过长挤占上下文窗口。
// renderExecuteToolReturnHint 返回工具的返回类型 + 最小示例。
func renderExecuteToolReturnHint(toolName string) (returnType string, sample string) {
returnType = "string自然语言文本"
switch strings.ToLower(strings.TrimSpace(toolName)) {
@@ -669,11 +536,6 @@ func tailExecuteLoops(records []executeLoopRecord, limit int) []executeLoopRecor
}
// compressExecuteLoopObservationsByTool 对窗口内重复工具做 observation 压缩。
//
// 规则:
// 1. 以“工具名”作为压缩键;
// 2. 同工具仅保留最新一条 observation 原文;
// 3. 旧记录保持 thought/tool_call不丢记录仅替换 observation。
func compressExecuteLoopObservationsByTool(records []executeLoopRecord) []executeLoopRecord {
if len(records) == 0 {
return records
@@ -748,15 +610,6 @@ func buildEarlyExecuteReactSummary(records []executeLoopRecord, windowLimit int)
return fmt.Sprintf("已折叠 %d 条旧记录,涉及:%s。", len(early), strings.Join(parts, "、"))
}
func extractExecuteGoalAnchors(ctx *newagentmodel.ConversationContext) (initial string, current string) {
if ctx == nil {
return "", ""
}
history := ctx.HistorySnapshot()
firstUser, lastUser := pickExecuteUserInputs(history)
return firstUser, lastUser
}
func hasExecuteRoughBuildDone(ctx *newagentmodel.ConversationContext) bool {
if ctx == nil {
return false
@@ -769,25 +622,47 @@ func hasExecuteRoughBuildDone(ctx *newagentmodel.ConversationContext) bool {
return false
}
func pickExecuteUserInputs(history []*schema.Message) (first string, last string) {
realUsers := make([]string, 0, 2)
// conversationTurn 表示对话历史中的一轮交互user 或 assistant speak
type conversationTurn struct {
Role string
Content string
}
// collectExecuteConversationTurns 从历史消息中提取 user + assistant speak 对话流。
//
// 提取规则:
// 1. 只保留 user 消息(排除 correction prompt和 assistant speak 消息(非空 Content 且无 ToolCalls
// 2. 全量保留不再限制轮数和单条长度token 预算由 execute 层统一管理);
// 3. 返回的条目按原始时间顺序排列。
func collectExecuteConversationTurns(history []*schema.Message) []conversationTurn {
if len(history) == 0 {
return nil
}
turns := make([]conversationTurn, 0, len(history))
for _, msg := range history {
if msg == nil || msg.Role != schema.User {
if msg == nil {
continue
}
if isExecuteCorrectionPrompt(msg) {
continue
}
text := compactExecuteText(msg.Content, 120)
text := strings.TrimSpace(msg.Content)
if text == "" {
continue
}
realUsers = append(realUsers, text)
switch msg.Role {
case schema.User:
if isExecuteCorrectionPrompt(msg) {
continue
}
turns = append(turns, conversationTurn{Role: "user", Content: text})
case schema.Assistant:
if len(msg.ToolCalls) > 0 {
continue
}
turns = append(turns, conversationTurn{Role: "assistant", Content: text})
}
}
if len(realUsers) == 0 {
return "", ""
}
return realUsers[0], realUsers[len(realUsers)-1]
return turns
}
func isExecuteCorrectionPrompt(msg *schema.Message) bool {

View File

@@ -487,7 +487,8 @@ func parseQueryAvailableOptions(state *ScheduleState, args map[string]any) (quer
if exactFrom < 1 || exactTo > 12 || exactFrom > exactTo {
return queryAvailableOptions{}, fmt.Errorf("精确节次区间非法:%d-%d", exactFrom, exactTo)
}
span = exactTo - exactFrom + 1
// 不再用 section_from/section_to 覆盖 spanduration
// 两者独立span 控制每段长度section_from/section_to 控制搜索范围。
}
options := queryAvailableOptions{
@@ -663,7 +664,8 @@ func matchSectionRange(
exactTo *int,
) bool {
if exactFrom != nil && exactTo != nil {
if slotStart != *exactFrom || slotEnd != *exactTo {
// 范围包含语义slot 必须完全落在 [section_from, section_to] 区间内
if slotStart < *exactFrom || slotEnd > *exactTo {
return false
}
}

View File

@@ -21,6 +21,12 @@ const (
SessionWindowMin = 32
SessionWindowMax = 4096
SessionWindowBuffer = 2
// ---- Execute Context Compaction 预算 ----
// Execute 阶段 prompt 总 token 上限
ExecuteTokenBudget = 80000
// msg0 + msg3 固定开销 + 安全余量
ExecuteReserveTokens = 8000
)
// MaxContextTokensByModel 返回指定模型的最大上下文 token。
@@ -144,3 +150,42 @@ func CalcSessionWindowSize(trimmedHistoryLen int) int {
func isCJK(r rune) bool {
return unicode.Is(unicode.Han, r) || unicode.Is(unicode.Hiragana, r) || unicode.Is(unicode.Katakana, r) || unicode.Is(unicode.Hangul, r)
}
// ExecuteTokenBreakdown 是 Execute 阶段四条消息的 token 分布。
type ExecuteTokenBreakdown struct {
Msg0 int `json:"msg0"`
Msg1 int `json:"msg1"`
Msg2 int `json:"msg2"`
Msg3 int `json:"msg3"`
Total int `json:"total"`
Budget int `json:"budget"`
}
// EstimateExecuteMessagesTokens 估算 Execute 四条消息的 token 分布。
func EstimateExecuteMessagesTokens(msg0, msg1, msg2, msg3 string) ExecuteTokenBreakdown {
b := ExecuteTokenBreakdown{
Msg0: EstimateTextTokens(msg0),
Msg1: EstimateTextTokens(msg1),
Msg2: EstimateTextTokens(msg2),
Msg3: EstimateTextTokens(msg3),
Budget: ExecuteTokenBudget,
}
b.Total = b.Msg0 + b.Msg1 + b.Msg2 + b.Msg3
return b
}
// CheckExecuteTokenBudget 检查是否超出 token 预算。
// 返回 breakdown、是否超限、是否需要压缩 msg1、是否需要压缩 msg2。
func CheckExecuteTokenBudget(msg0, msg1, msg2, msg3 string) (breakdown ExecuteTokenBreakdown, overBudget bool, needCompactMsg1 bool, needCompactMsg2 bool) {
breakdown = EstimateExecuteMessagesTokens(msg0, msg1, msg2, msg3)
overBudget = breakdown.Total > ExecuteTokenBudget
if !overBudget {
return
}
// msg1 超过可用预算的一半时需要压缩
available := ExecuteTokenBudget - ExecuteReserveTokens
needCompactMsg1 = breakdown.Msg1 > available/2
// 压缩 msg1 后仍超限,则压缩 msg2
needCompactMsg2 = (breakdown.Total - breakdown.Msg1 + available/4) > ExecuteTokenBudget
return
}

View File

@@ -95,6 +95,7 @@ func RegisterRouters(handlers *api.ApiHandlers, cache *dao.CacheDAO, userRepo *d
agentGroup.GET("/conversation-list", handlers.AgentHandler.GetConversationList)
agentGroup.GET("/conversation-history", handlers.AgentHandler.GetConversationHistory)
agentGroup.GET("/schedule-preview", handlers.AgentHandler.GetSchedulePlanPreview)
agentGroup.GET("/context-stats", handlers.AgentHandler.GetContextStats)
}
}
// 初始化Gin引擎

View File

@@ -56,6 +56,7 @@ type AgentService struct {
scheduleProvider newagentmodel.ScheduleStateProvider
schedulePersistor newagentmodel.SchedulePersistor
agentStateStore newagentmodel.AgentStateStore
compactionStore newagentmodel.CompactionStore
memoryReader MemoryReader
}

View File

@@ -349,3 +349,8 @@ func trimRunes(text string, limit int) string {
runes := []rune(text)
return string(runes[:limit])
}
// GetContextStats 获取指定会话的上下文窗口 token 分布统计。
func (s *AgentService) GetContextStats(ctx context.Context, userID int, chatID string) (string, error) {
return s.repo.LoadContextTokenStats(ctx, userID, chatID)
}

View File

@@ -168,6 +168,7 @@ func (s *AgentService) runNewAgentGraph(
ToolRegistry: s.toolRegistry,
ScheduleProvider: s.scheduleProvider,
SchedulePersistor: s.schedulePersistor,
CompactionStore: s.compactionStore,
RoughBuildFunc: s.makeRoughBuildFunc(),
WriteSchedulePreview: s.makeWriteSchedulePreviewFunc(),
}
@@ -645,3 +646,8 @@ func (s *AgentService) SetSchedulePersistor(persistor newagentmodel.SchedulePers
func (s *AgentService) SetAgentStateStore(store newagentmodel.AgentStateStore) {
s.agentStateStore = store
}
// compactionStore 由 cmd/start.go 注入
func (s *AgentService) SetCompactionStore(store newagentmodel.CompactionStore) {
s.compactionStore = store
}