remove:移除传统记忆模式,默认开启lpmm模式

This commit is contained in:
SengokuCola
2026-03-11 18:20:34 +08:00
parent 788de70450
commit 045bd5e183
7 changed files with 5 additions and 897 deletions

View File

@@ -1,19 +0,0 @@
你的名字是{bot_name}。现在是{time_now}。
你正在参与聊天,你需要搜集信息来帮助你进行回复。
当前聊天记录:
{chat_history}
已收集的信息:
{collected_info}
**工具说明:**
- 如果涉及过往事件,或者查询某个过去可能提到过的概念,或者某段时间发生的事件。可以使用聊天记录查询工具查询过往事件
- 如果涉及人物,可以使用人物信息查询工具查询人物信息
- 如果遇到不熟悉的词语、缩写、黑话或网络用语可以使用query_words工具查询其含义
- 如果没有可靠信息且查询时间充足或者不确定查询类别也可以使用lpmm知识库查询作为辅助信息
**思考**
- 你可以对查询思路给出简短的思考:思考要简短,直接切入要点
- 先思考当前信息是否足够回答问题
- 如果信息不足则需要使用tool查询信息你必须给出使用什么工具进行查询
- 当你决定结束查询时必须调用return_information工具返回总结信息并结束查询

View File

@@ -1164,15 +1164,6 @@ class ExperimentalConfig(ConfigBase):
)
"""_wrap_为指定聊天添加额外的prompt配置列表"""
lpmm_memory: bool = Field(
default=False,
json_schema_extra={
"x-widget": "switch",
"x-icon": "database",
},
)
"""是否将聊天历史总结导入到LPMM知识库。开启后chat_history_summarizer总结出的历史记录会同时导入到知识库"""
class MaimMessageConfig(ConfigBase):
"""maim_message配置类"""

View File

@@ -942,8 +942,8 @@ class ChatHistorySummarizer:
else:
logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败")
# 如果配置开启,同时导入到LPMM知识库
if global_config.lpmm_knowledge.enable and global_config.experimental.lpmm_memory:
# 同时导入到LPMM知识库
if global_config.lpmm_knowledge.enable:
await self._import_to_lpmm_knowledge(
theme=theme,
summary=summary,

View File

@@ -237,13 +237,8 @@ async def _react_agent_solve_question(
if first_head_prompt is None:
# 第一次构建使用初始的collected_info即initial_info
initial_collected_info = initial_info or ""
# 根据配置选择使用哪个 prompt
prompt_name = (
"memory_retrieval_react_prompt_head_lpmm"
if global_config.experimental.lpmm_memory
else "memory_retrieval_react_prompt_head"
)
first_head_prompt_template = prompt_manager.get_prompt(prompt_name)
# 使用 LPMM 知识库检索 prompt
first_head_prompt_template = prompt_manager.get_prompt("memory_retrieval_react_prompt_head_lpmm")
first_head_prompt_template.add_context("bot_name", bot_name)
first_head_prompt_template.add_context("time_now", time_now)
first_head_prompt_template.add_context("chat_history", chat_history)

View File

@@ -11,9 +11,7 @@ from .tool_registry import (
)
# 导入所有工具的注册函数
from .query_chat_history import register_tool as register_query_chat_history
from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge
from .query_person_info import register_tool as register_query_person_info
from .query_words import register_tool as register_query_words
from .return_information import register_tool as register_return_information
from src.config.config import global_config
@@ -21,13 +19,10 @@ from src.config.config import global_config
def init_all_tools():
"""初始化并注册所有记忆检索工具"""
# 如果开启了lpmm_memory则不注册query_chat_history工具
if not global_config.experimental.lpmm_memory:
register_query_chat_history()
register_query_person_info()
register_query_words()
register_return_information()
# LPMM知识库检索工具
if global_config.lpmm_knowledge.lpmm_mode == "agent":
register_lpmm_knowledge()

View File

@@ -1,528 +0,0 @@
"""
根据关键词或参与人在chat_history中查询记忆 - 工具实现
从ChatHistory表的聊天记录概述库中查询
"""
import json
from typing import Optional, Set
from datetime import datetime
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
from src.chat.utils.utils import parse_keywords_string
from src.config.config import global_config
from .tool_registry import register_memory_retrieval_tool
logger = get_logger("memory_retrieval_tools")
def _parse_blacklist_to_chat_ids(blacklist: list[str]) -> Set[str]:
"""将黑名单配置platform:id:type格式转换为chat_id集合
Args:
blacklist: 黑名单配置列表,格式为 ["platform:id:type", ...]
Returns:
Set[str]: chat_id集合
"""
chat_ids = set()
if not blacklist:
return chat_ids
try:
from src.common.utils.utils_session import SessionUtils
for blacklist_item in blacklist:
if not isinstance(blacklist_item, str):
continue
try:
parts = blacklist_item.split(":")
if len(parts) != 3:
logger.warning(f"黑名单配置格式错误,应为 platform:id:type实际: {blacklist_item}")
continue
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
# 判断是否为群聊
is_group = stream_type == "group"
# 转换为chat_id
if is_group:
chat_id = SessionUtils.calculate_session_id(platform, group_id=str(id_str))
else:
chat_id = SessionUtils.calculate_session_id(platform, user_id=str(id_str))
if chat_id:
chat_ids.add(chat_id)
else:
logger.warning(f"无法将黑名单配置转换为chat_id: {blacklist_item}")
except Exception as e:
logger.warning(f"解析黑名单配置失败: {blacklist_item}, 错误: {e}")
except Exception as e:
logger.error(f"初始化黑名单chat_id集合失败: {e}")
return chat_ids
def _is_chat_id_in_blacklist(chat_id: str) -> bool:
"""检查chat_id是否在全局记忆黑名单中
Args:
chat_id: 要检查的chat_id
Returns:
bool: 如果chat_id在黑名单中返回True否则返回False
"""
blacklist = getattr(global_config.memory, "global_memory_blacklist", [])
if not blacklist:
return False
blacklist_chat_ids = _parse_blacklist_to_chat_ids(blacklist)
return chat_id in blacklist_chat_ids
async def search_chat_history(
chat_id: str,
keyword: Optional[str] = None,
participant: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
) -> str:
"""根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywords
Args:
chat_id: 聊天ID
keyword: 关键词(可选,支持多个关键词,可用空格、逗号等分隔。匹配规则:如果关键词数量<=2必须全部匹配如果关键词数量>2允许n-1个关键词匹配
participant: 参与人昵称(可选)
start_time: 开始时间(可选,格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'。如果只提供start_time查询该时间点之后的记录
end_time: 结束时间(可选,格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'。如果只提供end_time查询该时间点之前的记录。如果同时提供start_time和end_time查询该时间段内的记录
Returns:
str: 查询结果包含记忆id、theme和keywords
"""
try:
# 检查参数
if not keyword and not participant and not start_time and not end_time:
return "未指定查询参数需要提供keyword、participant、start_time或end_time之一"
# 解析时间参数
start_timestamp = None
end_timestamp = None
if start_time:
try:
from src.memory_system.memory_utils import parse_datetime_to_timestamp
start_timestamp = parse_datetime_to_timestamp(start_time)
except ValueError as e:
return f"开始时间格式错误: {str(e)},支持格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'"
if end_time:
try:
from src.memory_system.memory_utils import parse_datetime_to_timestamp
end_timestamp = parse_datetime_to_timestamp(end_time)
except ValueError as e:
return f"结束时间格式错误: {str(e)},支持格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'"
# 验证时间范围
if start_timestamp and end_timestamp and start_timestamp > end_timestamp:
return "开始时间不能晚于结束时间"
# 构建查询条件
# 检查当前chat_id是否在黑名单中
is_current_chat_in_blacklist = _is_chat_id_in_blacklist(chat_id)
# 根据配置决定是否限制在当前 chat_id 内查询
# 如果当前chat_id在黑名单中强制使用本地查询
use_global_search = global_config.memory.global_memory and not is_current_chat_in_blacklist
if use_global_search:
# 全局查询所有聊天记录,但排除黑名单中的聊天流
blacklist_chat_ids = _parse_blacklist_to_chat_ids(global_config.memory.global_memory_blacklist)
if blacklist_chat_ids:
# 排除黑名单中的chat_id
query = ChatHistory.select().where(~(ChatHistory.chat_id.in_(blacklist_chat_ids)))
logger.debug(
f"search_chat_history 启用全局查询模式(排除黑名单 {len(blacklist_chat_ids)} 个聊天流keyword={keyword}, participant={participant}"
)
else:
# 没有黑名单,查询所有
query = ChatHistory.select()
logger.debug(
f"search_chat_history 启用全局查询模式,忽略 chat_id 过滤keyword={keyword}, participant={participant}"
)
else:
# 仅在当前聊天流内查询
if is_current_chat_in_blacklist:
logger.debug(
f"search_chat_history 当前聊天流在黑名单中强制使用本地查询chat_id={chat_id}, keyword={keyword}, participant={participant}"
)
query = ChatHistory.select().where(ChatHistory.chat_id == chat_id)
# 添加时间过滤条件
if start_timestamp is not None and end_timestamp is not None:
# 查询指定时间段内的记录(记录的时间范围与查询时间段有交集)
# 记录的开始时间在查询时间段内,或记录的结束时间在查询时间段内,或记录完全包含查询时间段
query = query.where(
(
(ChatHistory.start_time >= start_timestamp) & (ChatHistory.start_time <= end_timestamp)
) # 记录开始时间在查询时间段内
| (
(ChatHistory.end_time >= start_timestamp) & (ChatHistory.end_time <= end_timestamp)
) # 记录结束时间在查询时间段内
| (
(ChatHistory.start_time <= start_timestamp) & (ChatHistory.end_time >= end_timestamp)
) # 记录完全包含查询时间段
)
logger.debug(
f"search_chat_history 添加时间范围过滤: {start_timestamp} - {end_timestamp}, keyword={keyword}, participant={participant}"
)
elif start_timestamp is not None:
# 只提供开始时间,查询该时间点之后的记录(记录的开始时间或结束时间在该时间点之后)
query = query.where(ChatHistory.end_time >= start_timestamp)
logger.debug(
f"search_chat_history 添加开始时间过滤: >= {start_timestamp}, keyword={keyword}, participant={participant}"
)
elif end_timestamp is not None:
# 只提供结束时间,查询该时间点之前的记录(记录的开始时间或结束时间在该时间点之前)
query = query.where(ChatHistory.start_time <= end_timestamp)
logger.debug(
f"search_chat_history 添加结束时间过滤: <= {end_timestamp}, keyword={keyword}, participant={participant}"
)
# 执行查询
records = list(query.order_by(ChatHistory.start_time.desc()).limit(50))
filtered_records = []
for record in records:
participant_matched = True # 如果没有participant条件默认为True
keyword_matched = True # 如果没有keyword条件默认为True
# 检查参与人匹配
if participant:
participant_matched = False
participants_list = []
if record.participants:
try:
participants_data = (
json.loads(record.participants)
if isinstance(record.participants, str)
else record.participants
)
if isinstance(participants_data, list):
participants_list = [str(p).lower() for p in participants_data]
except (json.JSONDecodeError, TypeError, ValueError):
pass
participant_lower = participant.lower().strip()
if participant_lower and any(participant_lower in p for p in participants_list):
participant_matched = True
# 检查关键词匹配
if keyword:
keyword_matched = False
# 解析多个关键词(支持空格、逗号等分隔符)
keywords_list = parse_keywords_string(keyword) or (
[keyword.strip()] if keyword.strip() else []
)
# 转换为小写以便匹配
keywords_lower = [kw.lower() for kw in keywords_list if kw.strip()]
if keywords_lower:
# 在theme、keywords、summary、original_text中搜索
theme = (record.theme or "").lower()
summary = (record.summary or "").lower()
original_text = (record.original_text or "").lower()
# 解析record中的keywords JSON
record_keywords_list = []
if record.keywords:
try:
keywords_data = (
json.loads(record.keywords) if isinstance(record.keywords, str) else record.keywords
)
if isinstance(keywords_data, list):
record_keywords_list = [str(k).lower() for k in keywords_data]
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 有容错的全匹配:如果关键词数量>2允许n-1个关键词匹配否则必须全部匹配
matched_count = 0
for kw in keywords_lower:
kw_matched = (
kw in theme
or kw in summary
or kw in original_text
or any(kw in k for k in record_keywords_list)
)
if kw_matched:
matched_count += 1
# 计算需要匹配的关键词数量
total_keywords = len(keywords_lower)
if total_keywords > 2:
# 关键词数量>2允许n-1个关键词匹配
required_matches = total_keywords - 1
else:
# 关键词数量<=2必须全部匹配
required_matches = total_keywords
keyword_matched = matched_count >= required_matches
# 两者都匹配如果同时有participant和keyword需要两者都匹配如果只有一个条件只需要该条件匹配
matched = participant_matched and keyword_matched
if matched:
filtered_records.append(record)
if not filtered_records:
# 构建查询条件描述
conditions = []
if keyword:
keywords_str = "".join(parse_keywords_string(keyword))
conditions.append(f"关键词'{keywords_str}'")
if participant:
conditions.append(f"参与人'{participant}'")
if start_timestamp or end_timestamp:
time_desc = ""
if start_timestamp and end_timestamp:
start_str = datetime.fromtimestamp(start_timestamp).strftime("%Y-%m-%d %H:%M:%S")
end_str = datetime.fromtimestamp(end_timestamp).strftime("%Y-%m-%d %H:%M:%S")
time_desc = f"时间范围'{start_str}''{end_str}'"
elif start_timestamp:
start_str = datetime.fromtimestamp(start_timestamp).strftime("%Y-%m-%d %H:%M:%S")
time_desc = f"时间>='{start_str}'"
elif end_timestamp:
end_str = datetime.fromtimestamp(end_timestamp).strftime("%Y-%m-%d %H:%M:%S")
time_desc = f"时间<='{end_str}'"
if time_desc:
conditions.append(time_desc)
if conditions:
conditions_str = "".join(conditions)
return f"未找到满足条件({conditions_str})的聊天记录"
else:
return "未找到相关聊天记录"
# 如果匹配结果超过20条不返回具体记录只返回提示和所有相关关键词
if len(filtered_records) > 15:
# 统计所有记录上的关键词并去重
all_keywords_set = set()
for record in filtered_records:
if record.keywords:
try:
keywords_data = (
json.loads(record.keywords) if isinstance(record.keywords, str) else record.keywords
)
if isinstance(keywords_data, list):
for k in keywords_data:
k_str = str(k).strip()
if k_str:
all_keywords_set.add(k_str)
except (json.JSONDecodeError, TypeError, ValueError):
continue
# xxx 使用用户原始查询词,优先 keyword其次 participant最后退化成“当前条件”
search_label = keyword or participant or "当前条件"
if all_keywords_set:
keywords_str = "".join(sorted(all_keywords_set))
return (
f"包含“{search_label}”的结果过多,请尝试更多关键词精确查找\n\n"
f'有关"{search_label}"的关键词:\n'
f"{keywords_str}"
)
else:
return (
f'包含“{search_label}”的结果过多,请尝试更多关键词精确查找\n\n有关"{search_label}"的关键词信息为空'
)
# 构建结果文本返回id、theme和keywords最多20条
results = []
for record in filtered_records[:20]:
result_parts = []
# 添加记忆ID
result_parts.append(f"记忆ID{record.id}")
# 添加主题
if record.theme:
result_parts.append(f"主题:{record.theme}")
else:
result_parts.append("主题:(无)")
# 添加关键词
if record.keywords:
try:
keywords_data = json.loads(record.keywords) if isinstance(record.keywords, str) else record.keywords
if isinstance(keywords_data, list) and keywords_data:
keywords_str = "".join([str(k) for k in keywords_data])
result_parts.append(f"关键词:{keywords_str}")
else:
result_parts.append("关键词:(无)")
except (json.JSONDecodeError, TypeError, ValueError):
result_parts.append("关键词:(无)")
else:
result_parts.append("关键词:(无)")
results.append("\n".join(result_parts))
if not results:
return "未找到相关聊天记录"
response_text = "\n\n---\n\n".join(results)
return response_text
except Exception as e:
logger.error(f"查询聊天历史概述失败: {e}")
return f"查询失败: {str(e)}"
async def get_chat_history_detail(chat_id: str, memory_ids: str) -> str:
"""根据记忆ID展示某条或某几条记忆的具体内容
Args:
chat_id: 聊天ID
memory_ids: 记忆ID可以是单个ID"123"或多个ID用逗号分隔"1,2,3"
Returns:
str: 记忆的详细内容
"""
try:
# 解析memory_ids
id_list = []
# 尝试解析为逗号分隔的ID列表
try:
id_list = [int(id_str.strip()) for id_str in memory_ids.split(",") if id_str.strip()]
except ValueError:
return f"无效的记忆ID格式: {memory_ids}请使用数字ID多个ID用逗号分隔'123''123,456'"
if not id_list:
return "未提供有效的记忆ID"
# 查询记录
query = ChatHistory.select().where((ChatHistory.chat_id == chat_id) & (ChatHistory.id.in_(id_list)))
records = list(query.order_by(ChatHistory.start_time.desc()))
if not records:
return f"未找到ID为{id_list}的记忆记录可能ID不存在或不属于当前聊天"
# 对即将返回的记录增加使用计数
for record in records:
try:
ChatHistory.update(count=ChatHistory.count + 1).where(ChatHistory.id == record.id).execute()
record.count = (record.count or 0) + 1
except Exception as update_error:
logger.error(f"更新聊天记录概述计数失败: {update_error}")
# 构建详细结果
results = []
for record in records:
result_parts = []
# 添加记忆ID
result_parts.append(f"记忆ID{record.id}")
# 添加主题
if record.theme:
result_parts.append(f"主题:{record.theme}")
# 添加时间范围
start_str = datetime.fromtimestamp(record.start_time).strftime("%Y-%m-%d %H:%M:%S")
end_str = datetime.fromtimestamp(record.end_time).strftime("%Y-%m-%d %H:%M:%S")
result_parts.append(f"时间:{start_str} - {end_str}")
# 添加参与人
if record.participants:
try:
participants_data = (
json.loads(record.participants) if isinstance(record.participants, str) else record.participants
)
if isinstance(participants_data, list) and participants_data:
participants_str = "".join([str(p) for p in participants_data])
result_parts.append(f"参与人:{participants_str}")
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 添加关键词
if record.keywords:
try:
keywords_data = json.loads(record.keywords) if isinstance(record.keywords, str) else record.keywords
if isinstance(keywords_data, list) and keywords_data:
keywords_str = "".join([str(k) for k in keywords_data])
result_parts.append(f"关键词:{keywords_str}")
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 添加概括
if record.summary:
result_parts.append(f"概括:{record.summary}")
results.append("\n".join(result_parts))
if not results:
return "未找到相关记忆记录"
response_text = "\n\n" + "=" * 50 + "\n\n".join(results)
return response_text
except Exception as e:
logger.error(f"获取记忆详情失败: {e}")
return f"查询失败: {str(e)}"
def register_tool():
"""注册工具"""
# 注册工具1搜索记忆
register_memory_retrieval_tool(
name="search_chat_history",
description="根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywords。用于快速搜索和定位相关记忆。匹配规则如果关键词数量<=2必须全部匹配如果关键词数量>2允许n-1个关键词匹配容错匹配。支持按时间点或时间段进行查询。",
parameters=[
{
"name": "keyword",
"type": "string",
"description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘''麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索。匹配规则:如果关键词数量<=2必须全部匹配如果关键词数量>2允许n-1个关键词匹配",
"required": False,
},
{
"name": "participant",
"type": "string",
"description": "参与人昵称(可选),用于查询包含该参与人的记忆",
"required": False,
},
{
"name": "start_time",
"type": "string",
"description": "开始时间(可选),格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'。如果只提供start_time查询该时间点之后的记录。如果同时提供start_time和end_time查询该时间段内的记录",
"required": False,
},
{
"name": "end_time",
"type": "string",
"description": "结束时间(可选),格式如:'2025-01-01''2025-01-01 12:00:00''2025/01/01'。如果只提供end_time查询该时间点之前的记录。如果同时提供start_time和end_time查询该时间段内的记录",
"required": False,
},
],
execute_func=search_chat_history,
)
# 注册工具2获取记忆详情
register_memory_retrieval_tool(
name="get_chat_history_detail",
description="根据记忆ID展示某条或某几条记忆的具体内容。包括主题、时间、参与人、关键词、概括和关键信息点等详细信息。需要先使用search_chat_history工具获取记忆ID。",
parameters=[
{
"name": "memory_ids",
"type": "string",
"description": "记忆ID可以是单个ID'123'或多个ID用逗号分隔'123,456,789'",
"required": True,
},
],
execute_func=get_chat_history_detail,
)

View File

@@ -1,326 +0,0 @@
"""
根据person_name查询用户信息 - 工具实现
支持模糊查询,可以查询某个用户的所有信息
"""
import json
from datetime import datetime
from src.common.logger import get_logger
from src.common.database.database_model import PersonInfo
from .tool_registry import register_memory_retrieval_tool
logger = get_logger("memory_retrieval_tools")
def _calculate_similarity(query: str, target: str) -> float:
"""计算查询词在目标字符串中的相似度比例
Args:
query: 查询词
target: 目标字符串
Returns:
float: 相似度比例0.0-1.0),查询词长度 / 目标字符串长度
"""
if not query or not target:
return 0.0
query_len = len(query)
target_len = len(target)
if target_len == 0:
return 0.0
return query_len / target_len
def _format_group_nick_names(group_nick_name_field) -> str:
"""格式化群昵称信息
Args:
group_nick_name_field: 群昵称字段可能是字符串JSON或None
Returns:
str: 格式化后的群昵称信息字符串
"""
if not group_nick_name_field:
return ""
try:
# 解析JSON格式的群昵称列表
group_nick_names_data = (
json.loads(group_nick_name_field) if isinstance(group_nick_name_field, str) else group_nick_name_field
)
if not isinstance(group_nick_names_data, list) or not group_nick_names_data:
return ""
# 格式化群昵称列表
group_nick_list = []
for item in group_nick_names_data:
if isinstance(item, dict):
group_id = item.get("group_id", "未知群号")
group_nick_name = item.get("group_nick_name", "未知群昵称")
group_nick_list.append(f" - 群号 {group_id}{group_nick_name}")
elif isinstance(item, str):
# 兼容旧格式(如果存在)
group_nick_list.append(f" - {item}")
if group_nick_list:
return "群昵称:\n" + "\n".join(group_nick_list)
return ""
except (json.JSONDecodeError, TypeError, ValueError) as e:
logger.warning(f"解析群昵称信息失败: {e}")
# 如果解析失败,尝试显示原始内容(截断)
if isinstance(group_nick_name_field, str):
preview = group_nick_name_field[:200]
if len(group_nick_name_field) > 200:
preview += "..."
return f"群昵称(原始数据):{preview}"
return ""
async def query_person_info(person_name: str) -> str:
"""根据person_name查询用户信息使用模糊查询
Args:
person_name: 用户名称person_name字段
Returns:
str: 查询结果,包含用户的所有信息
"""
try:
person_name = str(person_name).strip()
if not person_name:
return "用户名称为空"
# 构建查询条件(使用模糊查询)
query = PersonInfo.select().where(PersonInfo.person_name.contains(person_name))
# 执行查询
records = list(query.limit(20)) # 最多返回20条记录
if not records:
return f"未找到模糊匹配'{person_name}'的用户信息"
# 根据相似度过滤结果查询词在目标字符串中至少占50%
SIMILARITY_THRESHOLD = 0.5
filtered_records = []
for record in records:
if not record.person_name:
continue
# 精确匹配总是保留相似度100%
if record.person_name.strip() == person_name:
filtered_records.append(record)
else:
# 模糊匹配需要检查相似度
similarity = _calculate_similarity(person_name, record.person_name.strip())
if similarity >= SIMILARITY_THRESHOLD:
filtered_records.append(record)
if not filtered_records:
return f"未找到相似度≥50%的匹配'{person_name}'的用户信息"
# 区分精确匹配和模糊匹配的结果
exact_matches = []
fuzzy_matches = []
for record in filtered_records:
# 检查是否是精确匹配
if record.person_name and record.person_name.strip() == person_name:
exact_matches.append(record)
else:
fuzzy_matches.append(record)
# 构建结果文本
results = []
# 先处理精确匹配的结果
for record in exact_matches:
result_parts = []
result_parts.append("【精确匹配】") # 标注为精确匹配
# 基本信息
if record.person_name:
result_parts.append(f"用户名称:{record.person_name}")
if record.nickname:
result_parts.append(f"昵称:{record.nickname}")
if record.person_id:
result_parts.append(f"用户ID{record.person_id}")
if record.platform:
result_parts.append(f"平台:{record.platform}")
if record.user_id:
result_parts.append(f"平台用户ID{record.user_id}")
# 群昵称信息
group_nick_name_str = _format_group_nick_names(getattr(record, "group_nick_name", None))
if group_nick_name_str:
result_parts.append(group_nick_name_str)
# 名称设定原因
if record.name_reason:
result_parts.append(f"名称设定原因:{record.name_reason}")
# 认识状态
result_parts.append(f"是否已认识:{'' if record.is_known else ''}")
# 时间信息
if record.know_since:
know_since_str = datetime.fromtimestamp(record.know_since).strftime("%Y-%m-%d %H:%M:%S")
result_parts.append(f"首次认识时间:{know_since_str}")
if record.last_know:
last_know_str = datetime.fromtimestamp(record.last_know).strftime("%Y-%m-%d %H:%M:%S")
result_parts.append(f"最后认识时间:{last_know_str}")
if record.know_times:
result_parts.append(f"认识次数:{int(record.know_times)}")
# 记忆点memory_points
if record.memory_points:
try:
memory_points_data = (
json.loads(record.memory_points)
if isinstance(record.memory_points, str)
else record.memory_points
)
if isinstance(memory_points_data, list) and memory_points_data:
# 解析记忆点格式category:content:weight
memory_list = []
for memory_point in memory_points_data:
if memory_point and isinstance(memory_point, str):
parts = memory_point.split(":", 2)
if len(parts) >= 3:
category = parts[0].strip()
content = parts[1].strip()
weight = parts[2].strip()
memory_list.append(f" - [{category}] {content} (权重: {weight})")
else:
memory_list.append(f" - {memory_point}")
if memory_list:
result_parts.append("记忆点:\n" + "\n".join(memory_list))
except (json.JSONDecodeError, TypeError, ValueError) as e:
logger.warning(f"解析用户 {record.person_id} 的memory_points失败: {e}")
# 如果解析失败,直接显示原始内容(截断)
memory_preview = str(record.memory_points)[:200]
if len(str(record.memory_points)) > 200:
memory_preview += "..."
result_parts.append(f"记忆点(原始数据):{memory_preview}")
results.append("\n".join(result_parts))
# 再处理模糊匹配的结果
for record in fuzzy_matches:
result_parts = []
result_parts.append("【模糊匹配】") # 标注为模糊匹配
# 基本信息
if record.person_name:
result_parts.append(f"用户名称:{record.person_name}")
if record.nickname:
result_parts.append(f"昵称:{record.nickname}")
if record.person_id:
result_parts.append(f"用户ID{record.person_id}")
if record.platform:
result_parts.append(f"平台:{record.platform}")
if record.user_id:
result_parts.append(f"平台用户ID{record.user_id}")
# 群昵称信息
group_nick_name_str = _format_group_nick_names(getattr(record, "group_nick_name", None))
if group_nick_name_str:
result_parts.append(group_nick_name_str)
# 名称设定原因
if record.name_reason:
result_parts.append(f"名称设定原因:{record.name_reason}")
# 认识状态
result_parts.append(f"是否已认识:{'' if record.is_known else ''}")
# 时间信息
if record.know_since:
know_since_str = datetime.fromtimestamp(record.know_since).strftime("%Y-%m-%d %H:%M:%S")
result_parts.append(f"首次认识时间:{know_since_str}")
if record.last_know:
last_know_str = datetime.fromtimestamp(record.last_know).strftime("%Y-%m-%d %H:%M:%S")
result_parts.append(f"最后认识时间:{last_know_str}")
if record.know_times:
result_parts.append(f"认识次数:{int(record.know_times)}")
# 记忆点memory_points
if record.memory_points:
try:
memory_points_data = (
json.loads(record.memory_points)
if isinstance(record.memory_points, str)
else record.memory_points
)
if isinstance(memory_points_data, list) and memory_points_data:
# 解析记忆点格式category:content:weight
memory_list = []
for memory_point in memory_points_data:
if memory_point and isinstance(memory_point, str):
parts = memory_point.split(":", 2)
if len(parts) >= 3:
category = parts[0].strip()
content = parts[1].strip()
weight = parts[2].strip()
memory_list.append(f" - [{category}] {content} (权重: {weight})")
else:
memory_list.append(f" - {memory_point}")
if memory_list:
result_parts.append("记忆点:\n" + "\n".join(memory_list))
except (json.JSONDecodeError, TypeError, ValueError) as e:
logger.warning(f"解析用户 {record.person_id} 的memory_points失败: {e}")
# 如果解析失败,直接显示原始内容(截断)
memory_preview = str(record.memory_points)[:200]
if len(str(record.memory_points)) > 200:
memory_preview += "..."
result_parts.append(f"记忆点(原始数据):{memory_preview}")
results.append("\n".join(result_parts))
# 组合所有结果
if not results:
return f"未找到匹配'{person_name}'的用户信息"
response_text = "\n\n---\n\n".join(results)
# 添加统计信息
total_count = len(filtered_records)
exact_count = len(exact_matches)
fuzzy_count = len(fuzzy_matches)
# 显示精确匹配和模糊匹配的统计
if exact_count > 0 or fuzzy_count > 0:
stats_parts = []
if exact_count > 0:
stats_parts.append(f"精确匹配:{exact_count}")
if fuzzy_count > 0:
stats_parts.append(f"模糊匹配:{fuzzy_count}")
stats_text = "".join(stats_parts)
response_text = f"找到 {total_count} 条匹配的用户信息({stats_text}\n\n{response_text}"
elif total_count > 1:
response_text = f"找到 {total_count} 条匹配的用户信息:\n\n{response_text}"
else:
response_text = f"找到用户信息:\n\n{response_text}"
# 如果结果数量达到限制,添加提示
if total_count >= 20:
response_text += "\n\n(已显示前20条结果可能还有更多匹配记录)"
return response_text
except Exception as e:
logger.error(f"查询用户信息失败: {e}")
return f"查询失败: {str(e)}"
def register_tool():
"""注册工具"""
register_memory_retrieval_tool(
name="query_person_info",
description="根据查询某个用户的所有信息。名称、昵称、平台、用户ID、qq号、群昵称等",
parameters=[
{"name": "person_name", "type": "string", "description": "用户名称,用于查询用户信息", "required": True}
],
execute_func=query_person_info,
)