feat:新增记忆唤醒流程

This commit is contained in:
SengokuCola
2025-05-12 12:50:08 +08:00
parent d7c5f7031c
commit 319352294b
34 changed files with 183 additions and 460 deletions

View File

@@ -0,0 +1,102 @@
# 工具系统使用指南
## 概述
`tool_can_use` 是一个插件式工具系统,允许轻松扩展和注册新工具。每个工具作为独立的文件存在于该目录下,系统会自动发现和注册这些工具。
## 工具结构
每个工具应该继承 `BaseTool` 基类并实现必要的属性和方法:
```python
from src.tools.tool_can_use.base_tool import BaseTool, register_tool
class MyNewTool(BaseTool):
# 工具名称,必须唯一
name = "my_new_tool"
# 工具描述告诉LLM这个工具的用途
description = "这是一个新工具,用于..."
# 工具参数定义遵循JSONSchema格式
parameters = {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "参数1的描述"
},
"param2": {
"type": "integer",
"description": "参数2的描述"
}
},
"required": ["param1"] # 必需的参数列表
}
async def execute(self, function_args, message_txt=""):
"""执行工具逻辑
Args:
function_args: 工具调用参数
message_txt: 原始消息文本
Returns:
dict: 包含执行结果的字典必须包含name和content字段
"""
# 实现工具逻辑
result = f"工具执行结果: {function_args.get('param1')}"
return {
"name": self.name,
"content": result
}
# 注册工具
register_tool(MyNewTool)
```
## 自动注册机制
工具系统通过以下步骤自动注册工具:
1.`__init__.py`中,`discover_tools()`函数会自动遍历当前目录中的所有Python文件
2. 对于每个文件,系统会寻找继承自`BaseTool`的类
3. 这些类会被自动注册到工具注册表中
只要确保在每个工具文件的末尾调用`register_tool(YourToolClass)`,工具就会被自动注册。
## 添加新工具步骤
1.`tool_can_use`目录下创建新的Python文件`my_new_tool.py`
2. 导入`BaseTool``register_tool`
3. 创建继承自`BaseTool`的工具类
4. 实现必要的属性(`name`, `description`, `parameters`
5. 实现`execute`方法
6. 使用`register_tool`注册工具
## 与ToolUser整合
`ToolUser`类已经更新为使用这个新的工具系统,它会:
1. 自动获取所有已注册工具的定义
2. 基于工具名称找到对应的工具实例
3. 调用工具的`execute`方法
## 使用示例
```python
from src.tools.tool_use import ToolUser
# 创建工具用户
tool_user = ToolUser()
# 使用工具
result = await tool_user.use_tool(message_txt="查询关于Python的知识", sender_name="用户", chat_stream=chat_stream)
# 处理结果
if result["used_tools"]:
print("工具使用结果:", result["collected_info"])
else:
print("未使用工具")
```

View File

@@ -0,0 +1,20 @@
from src.tools.tool_can_use.base_tool import (
BaseTool,
register_tool,
discover_tools,
get_all_tool_definitions,
get_tool_instance,
TOOL_REGISTRY,
)
__all__ = [
"BaseTool",
"register_tool",
"discover_tools",
"get_all_tool_definitions",
"get_tool_instance",
"TOOL_REGISTRY",
]
# 自动发现并注册工具
discover_tools()

View File

@@ -0,0 +1,115 @@
from typing import List, Any, Optional, Type
import inspect
import importlib
import pkgutil
import os
from src.common.logger_manager import get_logger
from rich.traceback import install
install(extra_lines=3)
logger = get_logger("base_tool")
# 工具注册表
TOOL_REGISTRY = {}
class BaseTool:
"""所有工具的基类"""
# 工具名称,子类必须重写
name = None
# 工具描述,子类必须重写
description = None
# 工具参数定义,子类必须重写
parameters = None
@classmethod
def get_tool_definition(cls) -> dict[str, Any]:
"""获取工具定义用于LLM工具调用
Returns:
dict: 工具定义字典
"""
if not cls.name or not cls.description or not cls.parameters:
raise NotImplementedError(f"工具类 {cls.__name__} 必须定义 name, description 和 parameters 属性")
return {
"type": "function",
"function": {"name": cls.name, "description": cls.description, "parameters": cls.parameters},
}
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行工具函数
Args:
function_args: 工具调用参数
Returns:
dict: 工具执行结果
"""
raise NotImplementedError("子类必须实现execute方法")
def register_tool(tool_class: Type[BaseTool]):
"""注册工具到全局注册表
Args:
tool_class: 工具类
"""
if not issubclass(tool_class, BaseTool):
raise TypeError(f"{tool_class.__name__} 不是 BaseTool 的子类")
tool_name = tool_class.name
if not tool_name:
raise ValueError(f"工具类 {tool_class.__name__} 没有定义 name 属性")
TOOL_REGISTRY[tool_name] = tool_class
logger.info(f"已注册: {tool_name}")
def discover_tools():
"""自动发现并注册tool_can_use目录下的所有工具"""
# 获取当前目录路径
current_dir = os.path.dirname(os.path.abspath(__file__))
package_name = os.path.basename(current_dir)
# 遍历包中的所有模块
for _, module_name, _ in pkgutil.iter_modules([current_dir]):
# 跳过当前模块和__pycache__
if module_name == "base_tool" or module_name.startswith("__"):
continue
# 导入模块
module = importlib.import_module(f"src.tools.{package_name}.{module_name}")
# 查找模块中的工具类
for _, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
register_tool(obj)
logger.info(f"工具发现完成,共注册 {len(TOOL_REGISTRY)} 个工具")
def get_all_tool_definitions() -> List[dict[str, Any]]:
"""获取所有已注册工具的定义
Returns:
List[dict]: 工具定义列表
"""
return [tool_class().get_tool_definition() for tool_class in TOOL_REGISTRY.values()]
def get_tool_instance(tool_name: str) -> Optional[BaseTool]:
"""获取指定名称的工具实例
Args:
tool_name: 工具名称
Returns:
Optional[BaseTool]: 工具实例如果找不到则返回None
"""
tool_class = TOOL_REGISTRY.get(tool_name)
if not tool_class:
return None
return tool_class()

View File

@@ -0,0 +1,49 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.common.logger import get_module_logger
from typing import Any
logger = get_module_logger("compare_numbers_tool")
class CompareNumbersTool(BaseTool):
"""比较两个数大小的工具"""
name = "compare_numbers"
description = "使用工具 比较两个数的大小,返回较大的数"
parameters = {
"type": "object",
"properties": {
"num1": {"type": "number", "description": "第一个数字"},
"num2": {"type": "number", "description": "第二个数字"},
},
"required": ["num1", "num2"],
}
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行比较两个数的大小
Args:
function_args: 工具参数
Returns:
dict: 工具执行结果
"""
try:
num1 = function_args.get("num1")
num2 = function_args.get("num2")
if num1 > num2:
result = f"{num1} 大于 {num2}"
elif num1 < num2:
result = f"{num1} 小于 {num2}"
else:
result = f"{num1} 等于 {num2}"
return {"type": "comparison_result", "id": f"{num1}_vs_{num2}", "content": result}
except Exception as e:
logger.error(f"比较数字失败: {str(e)}")
return {"type": "info", "id": f"{num1}_vs_{num2}", "content": f"比较数字失败,炸了: {str(e)}"}
# 注册工具
# register_tool(CompareNumbersTool)

View File

@@ -0,0 +1,135 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.plugins.chat.utils import get_embedding
from src.common.database import db
from src.common.logger_manager import get_logger
from typing import Any, Union
logger = get_logger("get_knowledge_tool")
class SearchKnowledgeTool(BaseTool):
"""从知识库中搜索相关信息的工具"""
name = "search_knowledge"
description = "使用工具从知识库中搜索相关信息"
parameters = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索查询关键词"},
"threshold": {"type": "number", "description": "相似度阈值0.0到1.0之间"},
},
"required": ["query"],
}
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行知识库搜索
Args:
function_args: 工具参数
Returns:
dict: 工具执行结果
"""
try:
query = function_args.get("query")
threshold = function_args.get("threshold", 0.4)
# 调用知识库搜索
embedding = await get_embedding(query, request_type="info_retrieval")
if embedding:
knowledge_info = self.get_info_from_db(embedding, limit=3, threshold=threshold)
if knowledge_info:
content = f"你知道这些知识: {knowledge_info}"
else:
content = f"你不太了解有关{query}的知识"
return {"type": "knowledge", "id": query, "content": content}
return {"type": "info", "id": query, "content": f"无法获取关于'{query}'的嵌入向量,你知识库炸了"}
except Exception as e:
logger.error(f"知识库搜索工具执行失败: {str(e)}")
return {"type": "info", "id": query, "content": f"知识库搜索失败,炸了: {str(e)}"}
@staticmethod
def get_info_from_db(
query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
) -> Union[str, list]:
"""从数据库中获取相关信息
Args:
query_embedding: 查询的嵌入向量
limit: 最大返回结果数
threshold: 相似度阈值
return_raw: 是否返回原始结果
Returns:
Union[str, list]: 格式化的信息字符串或原始结果列表
"""
if not query_embedding:
return "" if not return_raw else []
# 使用余弦相似度计算
pipeline = [
{
"$addFields": {
"dotProduct": {
"$reduce": {
"input": {"$range": [0, {"$size": "$embedding"}]},
"initialValue": 0,
"in": {
"$add": [
"$$value",
{
"$multiply": [
{"$arrayElemAt": ["$embedding", "$$this"]},
{"$arrayElemAt": [query_embedding, "$$this"]},
]
},
]
},
}
},
"magnitude1": {
"$sqrt": {
"$reduce": {
"input": "$embedding",
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
}
}
},
"magnitude2": {
"$sqrt": {
"$reduce": {
"input": query_embedding,
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
}
}
},
}
},
{"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}},
{
"$match": {
"similarity": {"$gte": threshold} # 只保留相似度大于等于阈值的结果
}
},
{"$sort": {"similarity": -1}},
{"$limit": limit},
{"$project": {"content": 1, "similarity": 1}},
]
results = list(db.knowledges.aggregate(pipeline))
logger.debug(f"知识库查询结果数量: {len(results)}")
if not results:
return "" if not return_raw else []
if return_raw:
return results
else:
# 返回所有找到的内容,用换行分隔
return "\n".join(str(result["content"]) for result in results)
# 注册工具
# register_tool(SearchKnowledgeTool)

View File

@@ -0,0 +1,39 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.common.logger_manager import get_logger
from typing import Dict, Any
from datetime import datetime
import time
logger = get_logger("get_time_date")
class GetCurrentDateTimeTool(BaseTool):
"""获取当前时间、日期、年份和星期的工具"""
name = "get_current_date_time"
description = "当有人询问或者涉及到具体时间或者日期的时候,必须使用这个工具"
parameters = {
"type": "object",
"properties": {},
"required": [],
}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""执行获取当前时间、日期、年份和星期
Args:
function_args: 工具参数(此工具不使用)
Returns:
Dict: 工具执行结果
"""
current_time = datetime.now().strftime("%H:%M:%S")
current_date = datetime.now().strftime("%Y-%m-%d")
current_year = datetime.now().strftime("%Y")
current_weekday = datetime.now().strftime("%A")
return {
"type": "time_info",
"id": f"time_info_{time.time()}",
"content": f"当前时间: {current_time}, 日期: {current_date}, 年份: {current_year}, 星期: {current_weekday}",
}

View File

@@ -0,0 +1,162 @@
from src.tools.tool_can_use.base_tool import BaseTool
from src.plugins.chat.utils import get_embedding
# from src.common.database import db
from src.common.logger_manager import get_logger
from typing import Dict, Any
from src.plugins.knowledge.knowledge_lib import qa_manager
logger = get_logger("lpmm_get_knowledge_tool")
class SearchKnowledgeFromLPMMTool(BaseTool):
"""从LPMM知识库中搜索相关信息的工具"""
name = "lpmm_search_knowledge"
description = "从知识库中搜索相关信息,如果你需要知识,就使用这个工具"
parameters = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索查询关键词"},
"threshold": {"type": "number", "description": "相似度阈值0.0到1.0之间"},
},
"required": ["query"],
}
async def execute(self, function_args: Dict[str, Any]) -> Dict[str, Any]:
"""执行知识库搜索
Args:
function_args: 工具参数
Returns:
Dict: 工具执行结果
"""
try:
query = function_args.get("query")
# threshold = function_args.get("threshold", 0.4)
# 调用知识库搜索
embedding = await get_embedding(query, request_type="info_retrieval")
if embedding:
knowledge_info = qa_manager.get_knowledge(query)
logger.debug(f"知识库查询结果: {knowledge_info}")
if knowledge_info:
content = f"你知道这些知识: {knowledge_info}"
else:
content = f"你不太了解有关{query}的知识"
return {"type": "lpmm_knowledge", "id": query, "content": content}
# 如果获取嵌入失败
return {"type": "info", "id": query, "content": f"无法获取关于'{query}'的嵌入向量你lpmm知识库炸了"}
except Exception as e:
logger.error(f"知识库搜索工具执行失败: {str(e)}")
# 在其他异常情况下,确保 id 仍然是 query (如果它被定义了)
query_id = query if "query" in locals() else "unknown_query"
return {"type": "info", "id": query_id, "content": f"lpmm知识库搜索失败炸了: {str(e)}"}
# def get_info_from_db(
# self, query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
# ) -> Union[str, list]:
# """从数据库中获取相关信息
# Args:
# query_embedding: 查询的嵌入向量
# limit: 最大返回结果数
# threshold: 相似度阈值
# return_raw: 是否返回原始结果
# Returns:
# Union[str, list]: 格式化的信息字符串或原始结果列表
# """
# if not query_embedding:
# return "" if not return_raw else []
# # 使用余弦相似度计算
# pipeline = [
# {
# "$addFields": {
# "dotProduct": {
# "$reduce": {
# "input": {"$range": [0, {"$size": "$embedding"}]},
# "initialValue": 0,
# "in": {
# "$add": [
# "$$value",
# {
# "$multiply": [
# {"$arrayElemAt": ["$embedding", "$$this"]},
# {"$arrayElemAt": [query_embedding, "$$this"]},
# ]
# },
# ]
# },
# }
# },
# "magnitude1": {
# "$sqrt": {
# "$reduce": {
# "input": "$embedding",
# "initialValue": 0,
# "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
# }
# }
# },
# "magnitude2": {
# "$sqrt": {
# "$reduce": {
# "input": query_embedding,
# "initialValue": 0,
# "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]},
# }
# }
# },
# }
# },
# {"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}},
# {
# "$match": {
# "similarity": {"$gte": threshold} # 只保留相似度大于等于阈值的结果
# }
# },
# {"$sort": {"similarity": -1}},
# {"$limit": limit},
# {"$project": {"content": 1, "similarity": 1}},
# ]
# results = list(db.knowledges.aggregate(pipeline))
# logger.debug(f"知识库查询结果数量: {len(results)}")
# if not results:
# return "" if not return_raw else []
# if return_raw:
# return results
# else:
# # 返回所有找到的内容,用换行分隔
# return "\n".join(str(result["content"]) for result in results)
def _format_results(self, results: list) -> str:
"""格式化结果"""
if not results:
return "未找到相关知识。"
formatted_string = "我找到了一些相关知识:\n"
for i, result in enumerate(results):
# chunk_id = result.get("chunk_id")
text = result.get("text", "")
source = result.get("source", "未知来源")
source_type = result.get("source_type", "未知类型")
similarity = result.get("similarity", 0.0)
formatted_string += (
f"{i + 1}. (相似度: {similarity:.2f}) 类型: {source_type}, 来源: {source} \n内容片段: {text}\n\n"
)
# 暂时去掉chunk_id
# formatted_string += f"{i + 1}. (相似度: {similarity:.2f}) 类型: {source_type}, 来源: {source}, Chunk ID: {chunk_id} \n内容片段: {text}\n\n"
return formatted_string
# 注册工具
# register_tool(SearchKnowledgeTool)

View File

@@ -0,0 +1,107 @@
from src.tools.tool_can_use.base_tool import BaseTool, register_tool
from src.plugins.person_info.person_info import person_info_manager
from src.common.logger_manager import get_logger
import time
logger = get_logger("rename_person_tool")
class RenamePersonTool(BaseTool):
name = "rename_person"
description = (
"这个工具可以改变用户的昵称。你可以选择改变对他人的称呼。你想给人改名,叫别人别的称呼,需要调用这个工具。"
)
parameters = {
"type": "object",
"properties": {
"person_name": {"type": "string", "description": "需要重新取名的用户的当前昵称"},
"message_content": {
"type": "string",
"description": "当前的聊天内容或特定要求,用于提供取名建议的上下文,尽可能详细。",
},
},
"required": ["person_name"],
}
async def execute(self, function_args: dict, message_txt=""):
"""
执行取名工具逻辑
Args:
function_args (dict): 包含 'person_name' 和可选 'message_content' 的字典
message_txt (str): 原始消息文本 (这里未使用,因为 message_content 更明确)
Returns:
dict: 包含执行结果的字典
"""
person_name_to_find = function_args.get("person_name")
request_context = function_args.get("message_content", "") # 如果没有提供,则为空字符串
if not person_name_to_find:
return {"name": self.name, "content": "错误:必须提供需要重命名的用户昵称 (person_name)。"}
try:
# 1. 根据昵称查找用户信息
logger.debug(f"尝试根据昵称 '{person_name_to_find}' 查找用户...")
person_info = await person_info_manager.get_person_info_by_name(person_name_to_find)
if not person_info:
logger.info(f"未找到昵称为 '{person_name_to_find}' 的用户。")
return {
"name": self.name,
"content": f"找不到昵称为 '{person_name_to_find}' 的用户。请确保输入的是我之前为该用户取的昵称。",
}
person_id = person_info.get("person_id")
user_nickname = person_info.get("nickname") # 这是用户原始昵称
user_cardname = person_info.get("user_cardname")
user_avatar = person_info.get("user_avatar")
if not person_id:
logger.error(f"找到了用户 '{person_name_to_find}' 但无法获取 person_id")
return {"name": self.name, "content": f"找到了用户 '{person_name_to_find}' 但获取内部ID时出错。"}
# 2. 调用 qv_person_name 进行取名
logger.debug(
f"为用户 {person_id} (原昵称: {person_name_to_find}) 调用 qv_person_name请求上下文: '{request_context}'"
)
result = await person_info_manager.qv_person_name(
person_id=person_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_avatar=user_avatar,
request=request_context,
)
# 3. 处理结果
if result and result.get("nickname"):
new_name = result["nickname"]
# reason = result.get("reason", "未提供理由")
logger.info(f"成功为用户 {person_id} 取了新昵称: {new_name}")
content = f"已成功将用户 {person_name_to_find} 的备注名更新为 {new_name}"
logger.info(content)
return {"type": "info", "id": f"rename_success_{time.time()}", "content": content}
else:
logger.warning(f"为用户 {person_id} 调用 qv_person_name 后未能成功获取新昵称。")
# 尝试从内存中获取可能已经更新的名字
current_name = await person_info_manager.get_value(person_id, "person_name")
if current_name and current_name != person_name_to_find:
return {
"name": self.name,
"content": f"尝试取新昵称时遇到一点小问题,但我已经将 '{person_name_to_find}' 的昵称更新为 '{current_name}' 了。",
}
else:
return {
"name": self.name,
"content": f"尝试为 '{person_name_to_find}' 取新昵称时遇到了问题,未能成功生成。可能需要稍后再试。",
}
except Exception as e:
error_msg = f"重命名失败: {str(e)}"
logger.error(error_msg, exc_info=True)
return {"type": "info_error", "id": f"rename_error_{time.time()}", "content": error_msg}
# 注册工具
register_tool(RenamePersonTool)