feat;优化了记忆检索的速度和token消耗(将question提出交给planenr)

This commit is contained in:
SengokuCola
2025-12-24 18:43:32 +08:00
parent 490589b0ad
commit 0852af49f9
15 changed files with 448 additions and 152 deletions

View File

@@ -2,7 +2,7 @@ import time
import json
import asyncio
import re
from typing import List, Dict, Any, Optional, Tuple, Set
from typing import List, Dict, Any, Optional, Tuple
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
@@ -11,7 +11,8 @@ from src.common.database.database_model import ThinkingBack
from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools
from src.memory_system.memory_utils import parse_questions_json
from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message
from src.bw_learner.jargon_explainer import match_jargon_from_text, retrieve_concepts_with_jargon
from src.chat.message_receive.chat_stream import get_chat_manager
from src.bw_learner.jargon_explainer import retrieve_concepts_with_jargon
logger = get_logger("memory_retrieval")
@@ -100,6 +101,7 @@ def init_memory_retrieval_prompt():
**工具说明:**
- 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件。可以使用聊天记录查询工具查询过往事件
- 如果涉及人物,可以使用人物信息查询工具查询人物信息
- 如果遇到不熟悉的词语、缩写、黑话或网络用语可以使用query_words工具查询其含义
- 如果没有可靠信息且查询时间充足或者不确定查询类别也可以使用lpmm知识库查询作为辅助信息
**思考**
@@ -202,7 +204,6 @@ async def _react_agent_solve_question(
max_iterations: int = 5,
timeout: float = 30.0,
initial_info: str = "",
initial_jargon_concepts: Optional[List[str]] = None,
) -> Tuple[bool, str, List[Dict[str, Any]], bool]:
"""使用ReAct架构的Agent来解决问题
@@ -211,28 +212,29 @@ async def _react_agent_solve_question(
chat_id: 聊天ID
max_iterations: 最大迭代次数
timeout: 超时时间(秒)
initial_info: 初始信息(如概念检索结果)将作为collected_info的初始值
initial_jargon_concepts: 预先已解析过的黑话列表,避免重复解释
initial_info: 初始信息将作为collected_info的初始值
Returns:
Tuple[bool, str, List[Dict[str, Any]], bool]: (是否找到答案, 答案内容, 思考步骤列表, 是否超时)
"""
start_time = time.time()
collected_info = initial_info if initial_info else ""
enable_jargon_detection = global_config.memory.enable_jargon_detection
seen_jargon_concepts: Set[str] = set()
if enable_jargon_detection and initial_jargon_concepts:
for concept in initial_jargon_concepts:
concept = (concept or "").strip()
if concept:
seen_jargon_concepts.add(concept)
# 构造日志前缀:[聊天流名称],用于在日志中标识聊天流
try:
chat_name = get_chat_manager().get_stream_name(chat_id) or chat_id
except Exception:
chat_name = chat_id
react_log_prefix = f"[{chat_name}] "
thinking_steps = []
is_timeout = False
conversation_messages: List[Message] = []
first_head_prompt: Optional[str] = None # 保存第一次使用的head_prompt用于日志显示
last_tool_name: Optional[str] = None # 记录最后一次使用的工具名称
# 正常迭代max_iterations 次(最终评估单独处理,不算在迭代中)
for iteration in range(max_iterations):
# 使用 while 循环,支持额外迭代
iteration = 0
max_iterations_with_extra = max_iterations
while iteration < max_iterations_with_extra:
# 检查超时
if time.time() - start_time > timeout:
logger.warning(f"ReAct Agent超时已迭代{iteration}")
@@ -475,7 +477,7 @@ async def _react_agent_solve_question(
step["observations"] = ["检测到finish_search文本格式调用找到答案"]
thinking_steps.append(step)
logger.info(
f"ReAct Agent {iteration + 1} 次迭代 通过finish_search文本格式找到关于问题{question}的答案: {parsed_answer}"
f"{react_log_prefix}{iteration + 1} 次迭代 通过finish_search文本格式找到关于问题{question}的答案: {parsed_answer}"
)
_log_conversation_messages(
@@ -488,7 +490,7 @@ async def _react_agent_solve_question(
else:
# found_answer为True但没有提供answer视为错误继续迭代
logger.warning(
f"ReAct Agent {iteration + 1} 次迭代 finish_search文本格式found_answer为True但未提供answer"
f"{react_log_prefix}{iteration + 1} 次迭代 finish_search文本格式found_answer为True但未提供answer"
)
else:
# 未找到答案,直接退出查询
@@ -497,7 +499,9 @@ async def _react_agent_solve_question(
)
step["observations"] = ["检测到finish_search文本格式调用未找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search文本格式判断未找到答案")
logger.info(
f"{react_log_prefix}{iteration + 1} 次迭代 通过finish_search文本格式判断未找到答案"
)
_log_conversation_messages(
conversation_messages,
@@ -509,10 +513,12 @@ async def _react_agent_solve_question(
# 如果没有检测到finish_search格式记录思考过程继续下一轮迭代
step["observations"] = [f"思考完成,但未调用工具。响应: {response}"]
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response}")
logger.info(
f"{react_log_prefix}{iteration + 1} 次迭代 思考完成但未调用工具: {response}"
)
collected_info += f"思考: {response}"
else:
logger.warning(f"ReAct Agent {iteration + 1} 次迭代 无工具调用且无响应")
logger.warning(f"{react_log_prefix}{iteration + 1} 次迭代 无工具调用且无响应")
step["observations"] = ["无响应且无工具调用"]
thinking_steps.append(step)
continue
@@ -541,7 +547,7 @@ async def _react_agent_solve_question(
step["observations"] = ["检测到finish_search工具调用找到答案"]
thinking_steps.append(step)
logger.info(
f"ReAct Agent {iteration + 1} 次迭代 通过finish_search工具找到关于问题{question}的答案: {finish_search_answer}"
f"{react_log_prefix}{iteration + 1} 次迭代 通过finish_search工具找到关于问题{question}的答案: {finish_search_answer}"
)
_log_conversation_messages(
@@ -554,14 +560,16 @@ async def _react_agent_solve_question(
else:
# found_answer为True但没有提供answer视为错误
logger.warning(
f"ReAct Agent {iteration + 1} 次迭代 finish_search工具found_answer为True但未提供answer"
f"{react_log_prefix}{iteration + 1} 次迭代 finish_search工具found_answer为True但未提供answer"
)
else:
# 未找到答案,直接退出查询
step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": False}})
step["observations"] = ["检测到finish_search工具调用未找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search工具判断未找到答案")
logger.info(
f"{react_log_prefix}{iteration + 1} 次迭代 通过finish_search工具判断未找到答案"
)
_log_conversation_messages(
conversation_messages,
@@ -578,13 +586,16 @@ async def _react_agent_solve_question(
tool_args = tool_call.args or {}
logger.debug(
f"ReAct Agent {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})"
f"{react_log_prefix}{iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})"
)
# 跳过finish_search工具调用已经在上面处理过了
if tool_name == "finish_search":
continue
# 记录最后一次使用的工具名称(用于判断是否需要额外迭代)
last_tool_name = tool_name
# 普通工具调用
tool = tool_registry.get_tool(tool_name)
if tool:
@@ -604,14 +615,18 @@ async def _react_agent_solve_question(
return f"查询{tool_name_str}({param_str})的结果:{observation}"
except Exception as e:
error_msg = f"工具执行失败: {str(e)}"
logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 工具 {tool_name_str} {error_msg}")
logger.error(
f"{react_log_prefix}{iter_num + 1} 次迭代 工具 {tool_name_str} {error_msg}"
)
return f"查询{tool_name_str}失败: {error_msg}"
tool_tasks.append(execute_single_tool(tool, tool_params, tool_name, iteration))
step["actions"].append({"action_type": tool_name, "action_params": tool_args})
else:
error_msg = f"未知的工具类型: {tool_name}"
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i + 1}/{len(tool_calls)} {error_msg}")
logger.warning(
f"{react_log_prefix}{iteration + 1} 次迭代 工具 {i + 1}/{len(tool_calls)} {error_msg}"
)
tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{tool_name}失败: {error_msg}")))
# 并行执行所有工具
@@ -622,31 +637,16 @@ async def _react_agent_solve_question(
for i, (tool_call_item, observation) in enumerate(zip(tool_calls, observations, strict=False)):
if isinstance(observation, Exception):
observation = f"工具执行异常: {str(observation)}"
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i + 1} 执行异常: {observation}")
logger.error(
f"{react_log_prefix}{iteration + 1} 次迭代 工具 {i + 1} 执行异常: {observation}"
)
observation_text = observation if isinstance(observation, str) else str(observation)
stripped_observation = observation_text.strip()
step["observations"].append(observation_text)
collected_info += f"\n{observation_text}\n"
if stripped_observation:
# 检查工具输出中是否有新的jargon如果有则追加到工具结果中
if enable_jargon_detection:
jargon_concepts = match_jargon_from_text(stripped_observation, chat_id)
if jargon_concepts:
new_concepts = []
for concept in jargon_concepts:
normalized_concept = concept.strip()
if normalized_concept and normalized_concept not in seen_jargon_concepts:
new_concepts.append(normalized_concept)
seen_jargon_concepts.add(normalized_concept)
if new_concepts:
jargon_info = await retrieve_concepts_with_jargon(new_concepts, chat_id)
if jargon_info:
# 将jargon查询结果追加到工具结果中
observation_text += f"\n\n{jargon_info}"
collected_info += f"\n{jargon_info}\n"
logger.info(f"工具输出触发黑话解析: {new_concepts}")
# 不再自动检测工具输出中的jargon改为通过 query_words 工具主动查询
tool_builder = MessageBuilder()
tool_builder.set_role(RoleType.Tool)
tool_builder.add_text_content(observation_text)
@@ -655,15 +655,24 @@ async def _react_agent_solve_question(
thinking_steps.append(step)
# 检查是否需要额外迭代:如果最后一次使用的工具是 search_chat_history 且达到最大迭代次数,额外增加一回合
if iteration + 1 >= max_iterations and last_tool_name == "search_chat_history" and not is_timeout:
max_iterations_with_extra = max_iterations + 1
logger.info(
f"{react_log_prefix}达到最大迭代次数(已迭代{iteration + 1}次),最后一次使用工具为 search_chat_history额外增加一回合尝试"
)
iteration += 1
# 正常迭代结束后,如果达到最大迭代次数或超时,执行最终评估
# 最终评估单独处理,不算在迭代中
should_do_final_evaluation = False
if is_timeout:
should_do_final_evaluation = True
logger.warning(f"ReAct Agent超时,已迭代{iteration + 1}次,进入最终评估")
elif iteration + 1 >= max_iterations:
logger.warning(f"{react_log_prefix}超时,已迭代{iteration}次,进入最终评估")
elif iteration >= max_iterations:
should_do_final_evaluation = True
logger.info(f"ReAct Agent达到最大迭代次数(已迭代{iteration + 1}次),进入最终评估")
logger.info(f"{react_log_prefix}达到最大迭代次数(已迭代{iteration}次),进入最终评估")
if should_do_final_evaluation:
# 获取必要变量用于最终评估
@@ -766,8 +775,8 @@ async def _react_agent_solve_question(
return False, "最终评估阶段LLM调用失败", thinking_steps, is_timeout
if global_config.debug.show_memory_prompt:
logger.info(f"ReAct Agent 最终评估Prompt: {evaluation_prompt}")
logger.info(f"ReAct Agent 最终评估响应: {eval_response}")
logger.info(f"{react_log_prefix}最终评估Prompt: {evaluation_prompt}")
logger.info(f"{react_log_prefix}最终评估响应: {eval_response}")
# 从最终评估响应中提取found_answer或not_enough_info
found_answer_content = None
@@ -998,7 +1007,6 @@ async def _process_single_question(
chat_id: str,
context: str,
initial_info: str = "",
initial_jargon_concepts: Optional[List[str]] = None,
max_iterations: Optional[int] = None,
) -> Optional[str]:
"""处理单个问题的查询
@@ -1007,8 +1015,8 @@ async def _process_single_question(
question: 要查询的问题
chat_id: 聊天ID
context: 上下文信息
initial_info: 初始信息(如概念检索结果)将传递给ReAct Agent
initial_jargon_concepts: 已经处理过的黑话概念列表用于ReAct阶段的去重
initial_info: 初始信息将传递给ReAct Agent
max_iterations: 最大迭代次数
Returns:
Optional[str]: 如果找到答案返回格式化的结果字符串否则返回None
@@ -1022,8 +1030,6 @@ async def _process_single_question(
# 直接使用ReAct Agent查询不再从thinking_back获取缓存
# logger.info(f"使用ReAct Agent查询问题: {question[:50]}...")
jargon_concepts_for_agent = initial_jargon_concepts if global_config.memory.enable_jargon_detection else None
# 如果未指定max_iterations使用配置的默认值
if max_iterations is None:
max_iterations = global_config.memory.max_agent_iterations
@@ -1034,7 +1040,6 @@ async def _process_single_question(
max_iterations=max_iterations,
timeout=global_config.memory.agent_timeout_seconds,
initial_info=question_initial_info,
initial_jargon_concepts=jargon_concepts_for_agent,
)
# 存储查询历史到数据库(超时时不存储)
@@ -1062,6 +1067,8 @@ async def build_memory_retrieval_prompt(
target: str,
chat_stream,
think_level: int = 1,
unknown_words: Optional[List[str]] = None,
question: Optional[str] = None,
) -> str:
"""构建记忆检索提示
使用两段式查询第一步生成问题第二步使用ReAct Agent查询答案
@@ -1071,14 +1078,33 @@ async def build_memory_retrieval_prompt(
sender: 发送者名称
target: 目标消息内容
chat_stream: 聊天流对象
tool_executor: 工具执行器(保留参数以兼容接口)
think_level: 思考深度等级
unknown_words: Planner 提供的未知词语列表,优先使用此列表而不是从聊天记录匹配
question: Planner 提供的问题,当 planner_question 配置开启时,直接使用此问题进行检索
Returns:
str: 记忆检索结果字符串
"""
start_time = time.time()
logger.info(f"检测是否需要回忆,元消息:{message[:30]}...,消息长度: {len(message)}")
# 构造日志前缀:[聊天流名称],用于在日志中标识聊天流(优先群名称/用户昵称)
try:
group_info = chat_stream.group_info
user_info = chat_stream.user_info
# 群聊优先使用群名称
if group_info is not None and getattr(group_info, "group_name", None):
stream_name = group_info.group_name.strip() or str(group_info.group_id)
# 私聊使用用户昵称
elif user_info is not None and getattr(user_info, "user_nickname", None):
stream_name = user_info.user_nickname.strip() or str(user_info.user_id)
# 兜底使用 stream_id
else:
stream_name = chat_stream.stream_id
except Exception:
stream_name = chat_stream.stream_id
log_prefix = f"[{stream_name}] " if stream_name else ""
logger.info(f"{log_prefix}检测是否需要回忆,元消息:{message[:30]}...,消息长度: {len(message)}")
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
bot_name = global_config.bot.nickname
@@ -1089,63 +1115,78 @@ async def build_memory_retrieval_prompt(
if not recent_query_history:
recent_query_history = "最近没有查询记录。"
# 第一步:生成问题
question_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_question_prompt",
bot_name=bot_name,
time_now=time_now,
chat_history=message,
recent_query_history=recent_query_history,
sender=sender,
target_message=target,
)
success, response, reasoning_content, model_name = await llm_api.generate_with_model(
question_prompt,
model_config=model_config.model_task_config.tool_use,
request_type="memory.question",
)
if global_config.debug.show_memory_prompt:
logger.info(f"记忆检索问题生成提示词: {question_prompt}")
# logger.info(f"记忆检索问题生成响应: {response}")
if not success:
logger.error(f"LLM生成问题失败: {response}")
return ""
# 解析概念列表和问题列表
_, questions = parse_questions_json(response)
if questions:
logger.info(f"解析到 {len(questions)} 个问题: {questions}")
enable_jargon_detection = global_config.memory.enable_jargon_detection
concepts: List[str] = []
if enable_jargon_detection:
# 使用匹配逻辑自动识别聊天中的黑话概念
concepts = match_jargon_from_text(message, chat_id)
if concepts:
logger.info(f"黑话匹配命中 {len(concepts)} 个概念: {concepts}")
# 第一步:生成问题或使用 Planner 提供的问题
questions = []
# 如果 planner_question 配置开启,只使用 Planner 提供的问题,不使用旧模式
if global_config.memory.planner_question:
if question and isinstance(question, str) and question.strip():
# 清理和验证 question
cleaned_question = question.strip()
questions = [cleaned_question]
logger.info(f"{log_prefix}使用 Planner 提供的 question: {cleaned_question}")
else:
logger.debug("黑话匹配未命中任何概念")
# planner_question 开启但没有提供 question跳过记忆检索
logger.debug(f"{log_prefix}planner_question 已开启但未提供 question跳过记忆检索")
end_time = time.time()
logger.info(f"{log_prefix}无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}")
return ""
else:
logger.debug("已禁用记忆检索中的黑话识别")
# planner_question 关闭使用旧模式LLM 生成问题
question_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_question_prompt",
bot_name=bot_name,
time_now=time_now,
chat_history=message,
recent_query_history=recent_query_history,
sender=sender,
target_message=target,
)
# 对匹配到的概念进行jargon检索作为初始信息
success, response, reasoning_content, model_name = await llm_api.generate_with_model(
question_prompt,
model_config=model_config.model_task_config.tool_use,
request_type="memory.question",
)
if global_config.debug.show_memory_prompt:
logger.info(f"{log_prefix}记忆检索问题生成提示词: {question_prompt}")
# logger.info(f"记忆检索问题生成响应: {response}")
if not success:
logger.error(f"{log_prefix}LLM生成问题失败: {response}")
return ""
# 解析概念列表和问题列表
_, questions = parse_questions_json(response)
if questions:
logger.info(f"{log_prefix}解析到 {len(questions)} 个问题: {questions}")
# 初始阶段:使用 Planner 提供的 unknown_words 进行检索(如果提供)
initial_info = ""
if enable_jargon_detection and concepts:
concept_info = await retrieve_concepts_with_jargon(concepts, chat_id)
if concept_info:
initial_info += concept_info
logger.debug(f"概念检索完成,结果: {concept_info}")
else:
logger.debug("概念检索未找到任何结果")
if unknown_words and len(unknown_words) > 0:
# 清理和去重 unknown_words
cleaned_concepts = []
for word in unknown_words:
if isinstance(word, str):
cleaned = word.strip()
if cleaned:
cleaned_concepts.append(cleaned)
if cleaned_concepts:
# 对匹配到的概念进行jargon检索作为初始信息
concept_info = await retrieve_concepts_with_jargon(cleaned_concepts, chat_id)
if concept_info:
initial_info += concept_info
logger.info(
f"{log_prefix}使用 Planner 提供的 unknown_words{len(cleaned_concepts)} 个概念,检索结果: {concept_info[:100]}..."
)
else:
logger.debug(f"{log_prefix}unknown_words 检索未找到任何结果")
if not questions:
logger.debug("模型认为不需要检索记忆或解析失败,不返回任何查询结果")
logger.debug(f"{log_prefix}模型认为不需要检索记忆或解析失败,不返回任何查询结果")
end_time = time.time()
logger.info(f"无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}")
logger.info(f"{log_prefix}无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}")
return ""
# 第二步:并行处理所有问题(使用配置的最大迭代次数和超时时间)
@@ -1157,17 +1198,16 @@ async def build_memory_retrieval_prompt(
max_iterations = base_max_iterations
timeout_seconds = global_config.memory.agent_timeout_seconds
logger.debug(
f"问题数量: {len(questions)}think_level={think_level},设置最大迭代次数: {max_iterations}(基础值: {base_max_iterations}),超时时间: {timeout_seconds}"
f"{log_prefix}问题数量: {len(questions)}think_level={think_level},设置最大迭代次数: {max_iterations}(基础值: {base_max_iterations}),超时时间: {timeout_seconds}"
)
# 并行处理所有问题,将概念检索结果作为初始信息传递
# 并行处理所有问题
question_tasks = [
_process_single_question(
question=question,
chat_id=chat_id,
context=message,
initial_info=initial_info,
initial_jargon_concepts=concepts if enable_jargon_detection else None,
max_iterations=max_iterations,
)
for question in questions
@@ -1180,7 +1220,7 @@ async def build_memory_retrieval_prompt(
question_results: List[str] = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"处理问题 '{questions[i]}' 时发生异常: {result}")
logger.error(f"{log_prefix}处理问题 '{questions[i]}' 时发生异常: {result}")
elif result is not None:
question_results.append(result)
@@ -1216,14 +1256,14 @@ async def build_memory_retrieval_prompt(
current_count = len(question_results)
cached_count = len(all_results) - current_count
logger.info(
f"记忆检索成功,耗时: {(end_time - start_time):.3f}秒,"
f"{log_prefix}记忆检索成功,耗时: {(end_time - start_time):.3f}秒,"
f"当前查询 {current_count} 条记忆,缓存 {cached_count} 条记忆,共 {len(all_results)} 条记忆"
)
return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n"
else:
logger.debug("所有问题均未找到答案,且无缓存答案")
logger.debug(f"{log_prefix}所有问题均未找到答案,且无缓存答案")
return ""
except Exception as e:
logger.error(f"记忆检索时发生异常: {str(e)}")
logger.error(f"{log_prefix}记忆检索时发生异常: {str(e)}")
return ""