Merge branch 'r-dev' of https://github.com/Mai-with-u/MaiBot into r-dev

This commit is contained in:
SengokuCola
2026-04-03 22:42:29 +08:00
173 changed files with 64683 additions and 1066 deletions

View File

@@ -0,0 +1,133 @@
from typing import Any, Dict, List, Tuple
from src.chat.message_receive.chat_manager import chat_manager as _chat_manager
from src.common.logger import get_logger
from src.person_info.person_info import resolve_person_id_for_memory
from src.services.memory_service import memory_service
logger = get_logger("knowledge_fetcher")
class KnowledgeFetcher:
"""知识调取器"""
def __init__(self, private_name: str, stream_id: str):
self.private_name = private_name
self.stream_id = stream_id
def _resolve_private_memory_context(self) -> Dict[str, str]:
session = _chat_manager.get_session_by_session_id(self.stream_id)
if session is None:
return {"chat_id": self.stream_id}
group_id = str(getattr(session, "group_id", "") or "").strip()
user_id = str(getattr(session, "user_id", "") or "").strip()
platform = str(getattr(session, "platform", "") or "").strip()
person_id = ""
if not group_id:
try:
person_id = resolve_person_id_for_memory(
person_name=self.private_name,
platform=platform,
user_id=user_id,
)
except Exception as exc:
logger.debug(f"[私聊][{self.private_name}]解析人物ID失败: {exc}")
return {
"chat_id": self.stream_id,
"person_id": person_id,
"user_id": user_id,
"group_id": group_id,
}
async def _memory_get_knowledge(self, query: str) -> str:
"""获取相关知识
Args:
query: 查询内容
Returns:
str: 构造好的,带相关度的知识
"""
logger.debug(f"[私聊][{self.private_name}]正在从长期记忆中获取知识")
try:
context = self._resolve_private_memory_context()
search_kwargs = {
"limit": 5,
"mode": "search",
"chat_id": context.get("chat_id", ""),
"person_id": context.get("person_id", ""),
"user_id": context.get("user_id", ""),
"group_id": context.get("group_id", ""),
"respect_filter": True,
}
result = await memory_service.search(query, **search_kwargs)
if not result.success:
logger.warning(
f"[私聊][{self.private_name}]长期记忆查询失败: {result.error or '未知错误'}"
)
return f"长期记忆检索失败:{result.error or '未知错误'}"
if not result.filtered and not result.hits and search_kwargs["person_id"]:
fallback_kwargs = dict(search_kwargs)
fallback_kwargs["person_id"] = ""
logger.debug(f"[私聊][{self.private_name}]人物过滤未命中,退回仅按会话检索长期记忆")
result = await memory_service.search(query, **fallback_kwargs)
if not result.success:
logger.warning(
f"[私聊][{self.private_name}]长期记忆回退查询失败: {result.error or '未知错误'}"
)
return f"长期记忆检索失败:{result.error or '未知错误'}"
knowledge_info = result.to_text(limit=5)
if result.filtered:
logger.debug(f"[私聊][{self.private_name}]长期记忆查询被聊天过滤策略跳过")
else:
logger.debug(f"[私聊][{self.private_name}]长期记忆查询结果: {knowledge_info[:150]}")
return knowledge_info or "未找到匹配的知识"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]长期记忆搜索工具执行失败: {str(e)}")
return "未找到匹配的知识"
async def fetch(self, query: str, chat_history: List[Dict[str, Any]]) -> Tuple[str, str]:
"""获取相关知识
Args:
query: 查询内容
chat_history: 聊天历史 (PFC dict format)
Returns:
Tuple[str, str]: (获取的知识, 知识来源)
"""
_ = chat_history
# NOTE: Hippocampus memory system was redesigned in v0.12.2
# The old get_memory_from_text API no longer exists
# For now, we'll skip the memory retrieval part and only use LPMM knowledge
# TODO: Integrate with new memory system if needed
knowledge_text = ""
sources_text = "无记忆匹配" # 默认值
# # 从记忆中获取相关知识 (DISABLED - old Hippocampus API)
# related_memory = await HippocampusManager.get_instance().get_memory_from_text(
# text=f"{query}\n{chat_history_text}",
# max_memory_num=3,
# max_memory_length=2,
# max_depth=3,
# fast_retrieval=False,
# )
# if related_memory:
# sources = []
# for memory in related_memory:
# knowledge_text += memory[1] + "\n"
# sources.append(f"记忆片段{memory[0]}")
# knowledge_text = knowledge_text.strip()
# sources_text = "".join(sources)
knowledge_text += "\n现在有以下**知识**可供参考:\n "
knowledge_text += await self._memory_get_knowledge(query)
knowledge_text += "\n请记住这些**知识**,并根据**知识**回答问题。\n"
return knowledge_text or "未找到相关知识", sources_text or "无记忆匹配"

View File

@@ -1,90 +0,0 @@
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.qa_manager import QAManager
from src.chat.knowledge.kg_manager import KGManager
from src.chat.knowledge.global_logger import logger
from src.config.config import global_config
import os
INVALID_ENTITY = [
"",
"",
"",
"",
"",
"我们",
"你们",
"他们",
"她们",
"它们",
]
RAG_GRAPH_NAMESPACE = "rag-graph"
RAG_ENT_CNT_NAMESPACE = "rag-ent-cnt"
RAG_PG_HASH_NAMESPACE = "rag-pg-hash"
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
DATA_PATH = os.path.join(ROOT_PATH, "data")
qa_manager = None
inspire_manager = None
def get_qa_manager():
return qa_manager
def lpmm_start_up(): # sourcery skip: extract-duplicate-method
# 检查LPMM知识库是否启用
if global_config.lpmm_knowledge.enable:
logger.info("正在初始化Mai-LPMM")
logger.info("创建LLM客户端")
# 初始化Embedding库
embed_manager = EmbeddingManager(
max_workers=global_config.lpmm_knowledge.max_embedding_workers,
chunk_size=global_config.lpmm_knowledge.embedding_chunk_size,
)
logger.info("正在从文件加载Embedding库")
try:
embed_manager.load_from_file()
except Exception as e:
logger.warning(f"此消息不会影响正常使用从文件加载Embedding库时{e}")
# logger.warning("如果你是第一次导入知识,或者还未导入知识,请忽略此错误")
logger.info("Embedding库加载完成")
# 初始化KG
kg_manager = KGManager()
logger.info("正在从文件加载KG")
try:
kg_manager.load_from_file()
except Exception as e:
logger.warning(f"此消息不会影响正常使用从文件加载KG时{e}")
# logger.warning("如果你是第一次导入知识,或者还未导入知识,请忽略此错误")
logger.info("KG加载完成")
logger.info(f"KG节点数量{len(kg_manager.graph.get_node_list())}")
logger.info(f"KG边数量{len(kg_manager.graph.get_edge_list())}")
# 数据比对Embedding库与KG的段落hash集合
for pg_hash in kg_manager.stored_paragraph_hashes:
# 使用与EmbeddingStore中一致的命名空间格式
key = f"paragraph-{pg_hash}"
if key not in embed_manager.stored_pg_hashes:
logger.warning(f"KG中存在Embedding库中不存在的段落{key}")
global qa_manager
# 问答系统(用于知识库)
qa_manager = QAManager(
embed_manager,
kg_manager,
)
# # 记忆激活(用于记忆库)
# global inspire_manager
# inspire_manager = MemoryActiveManager(
# embed_manager,
# llm_client_list[global_config["embedding"]["provider"]],
# )
else:
logger.info("LPMM知识库已禁用跳过初始化")
# 创建空的占位符对象,避免导入错误

View File

@@ -1,381 +0,0 @@
import asyncio
import os
from functools import partial
from typing import List, Callable, Any
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.chat.knowledge.qa_manager import QAManager
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import get_qa_manager, lpmm_start_up
logger = get_logger("LPMM-Plugin-API")
class LPMMOperations:
"""
LPMM 内部操作接口。
封装了 LPMM 的核心操作,供插件系统 API 或其他内部组件调用。
"""
def __init__(self):
self._initialized = False
async def _run_cancellable_executor(self, func: Callable, *args, **kwargs) -> Any:
"""
在线程池中执行可取消的同步操作。
当任务被取消时(如 Ctrl+C会立即响应并抛出 CancelledError。
注意:线程池中的操作可能仍在运行,但协程会立即返回,不会阻塞主进程。
Args:
func: 要执行的同步函数
*args: 函数的位置参数
**kwargs: 函数的关键字参数
Returns:
函数的返回值
Raises:
asyncio.CancelledError: 当任务被取消时
"""
loop = asyncio.get_event_loop()
# 在线程池中执行,当协程被取消时会立即响应
# 虽然线程池中的操作可能仍在运行,但协程不会阻塞
return await loop.run_in_executor(None, func, *args, **kwargs)
async def _get_managers(self) -> tuple[EmbeddingManager, KGManager, QAManager]:
"""获取并确保 LPMM 管理器已初始化"""
qa_mgr = get_qa_manager()
if qa_mgr is None:
# 如果全局没初始化,尝试初始化
if not global_config.lpmm_knowledge.enable:
logger.warning("LPMM 知识库在全局配置中未启用,操作可能受限。")
lpmm_start_up()
qa_mgr = get_qa_manager()
if qa_mgr is None:
raise RuntimeError("无法获取 LPMM QAManager请检查 LPMM 是否已正确安装和配置。")
return qa_mgr.embed_manager, qa_mgr.kg_manager, qa_mgr
async def add_content(self, text: str, auto_split: bool = True) -> dict:
"""
向知识库添加新内容。
Args:
text: 原始文本。
auto_split: 是否自动按双换行符分割段落。
- True: 自动分割(默认),支持多段文本(用双换行分隔)
- False: 不分割,将整个文本作为完整一段处理
Returns:
dict: {"status": "success/error", "count": 导入段落数, "message": "描述"}
"""
try:
embed_mgr, kg_mgr, _ = await self._get_managers()
# 1. 分段处理
if auto_split:
# 自动按双换行符分割
paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
else:
# 不分割,作为完整一段
text_stripped = text.strip()
if not text_stripped:
return {"status": "error", "message": "文本内容为空"}
paragraphs = [text_stripped]
if not paragraphs:
return {"status": "error", "message": "文本内容为空"}
# 2. 实体与三元组抽取 (内部调用大模型)
from src.chat.knowledge.ie_process import IEProcess
from src.services.llm_service import LLMServiceClient
llm_ner = LLMServiceClient(
task_name="lpmm_entity_extract", request_type="lpmm.entity_extract"
)
llm_rdf = LLMServiceClient(
task_name="lpmm_rdf_build", request_type="lpmm.rdf_build"
)
ie_process = IEProcess(llm_ner, llm_rdf)
logger.info(f"[Plugin API] 正在对 {len(paragraphs)} 段文本执行信息抽取...")
extracted_docs = await ie_process.process_paragraphs(paragraphs)
# 3. 构造并导入数据
# 这里我们手动实现导入逻辑,不依赖外部脚本
# a. 准备段落
raw_paragraphs = {doc["idx"]: doc["passage"] for doc in extracted_docs}
# b. 准备三元组
triple_list_data = {doc["idx"]: doc["extracted_triples"] for doc in extracted_docs}
# 向量化并入库
# 注意:此处模仿 import_openie.py 的核心逻辑
# 1. 先进行去重检查,只处理新段落
# store_new_data_set 期望的格式raw_paragraphs 的键是段落hash不带前缀值是段落文本
new_raw_paragraphs = {}
new_triple_list_data = {}
for pg_hash, passage in raw_paragraphs.items():
key = f"paragraph-{pg_hash}"
if key not in embed_mgr.stored_pg_hashes:
new_raw_paragraphs[pg_hash] = passage
new_triple_list_data[pg_hash] = triple_list_data[pg_hash]
if not new_raw_paragraphs:
return {"status": "success", "count": 0, "message": "内容已存在,无需重复导入"}
# 2. 使用 EmbeddingManager 的标准方法存储段落、实体和关系的嵌入
# store_new_data_set 会自动处理嵌入生成和存储
# 将同步阻塞操作放到线程池中执行,避免阻塞事件循环
await self._run_cancellable_executor(embed_mgr.store_new_data_set, new_raw_paragraphs, new_triple_list_data)
# 3. 构建知识图谱只需要三元组数据和embedding_manager
await self._run_cancellable_executor(kg_mgr.build_kg, new_triple_list_data, embed_mgr)
# 4. 持久化
await self._run_cancellable_executor(embed_mgr.rebuild_faiss_index)
await self._run_cancellable_executor(embed_mgr.save_to_file)
await self._run_cancellable_executor(kg_mgr.save_to_file)
return {
"status": "success",
"count": len(new_raw_paragraphs),
"message": f"成功导入 {len(new_raw_paragraphs)} 条知识",
}
except asyncio.CancelledError:
logger.warning("[Plugin API] 导入操作被用户中断")
return {"status": "cancelled", "message": "导入操作已被用户中断"}
except Exception as e:
logger.error(f"[Plugin API] 导入知识失败: {e}", exc_info=True)
return {"status": "error", "message": str(e)}
async def search(self, query: str, top_k: int = 3) -> List[str]:
"""
检索知识库。
Args:
query: 查询问题。
top_k: 返回最相关的条目数。
Returns:
List[str]: 相关文段列表。
"""
try:
_, _, qa_mgr = await self._get_managers()
# 直接调用 QAManager 的检索接口
knowledge = qa_mgr.get_knowledge(query, top_k=top_k)
# 返回通常是拼接好的字符串,这里我们可以尝试按其内部规则切分回列表,或者直接返回
return [knowledge] if knowledge else []
except Exception as e:
logger.error(f"[Plugin API] 检索知识失败: {e}")
return []
async def delete(self, keyword: str, exact_match: bool = False) -> dict:
"""
根据关键词或完整文段删除知识库内容。
Args:
keyword: 匹配关键词或完整文段。
exact_match: 是否使用完整文段匹配True=完全匹配False=关键词模糊匹配)。
Returns:
dict: {"status": "success/info", "deleted_count": 删除条数, "message": "描述"}
"""
try:
embed_mgr, kg_mgr, _ = await self._get_managers()
# 1. 查找匹配的段落
to_delete_keys = []
to_delete_hashes = []
for key, item in embed_mgr.paragraphs_embedding_store.store.items():
if exact_match:
# 完整文段匹配
if item.str.strip() == keyword.strip():
to_delete_keys.append(key)
to_delete_hashes.append(key.replace("paragraph-", "", 1))
else:
# 关键词模糊匹配
if keyword in item.str:
to_delete_keys.append(key)
to_delete_hashes.append(key.replace("paragraph-", "", 1))
if not to_delete_keys:
match_type = "完整文段" if exact_match else "关键词"
return {"status": "info", "deleted_count": 0, "message": f"未找到匹配的内容({match_type}匹配)"}
# 2. 执行删除
# 将同步阻塞操作放到线程池中执行,避免阻塞事件循环
# a. 从向量库删除
deleted_count, _ = await self._run_cancellable_executor(
embed_mgr.paragraphs_embedding_store.delete_items, to_delete_keys
)
embed_mgr.stored_pg_hashes = set(embed_mgr.paragraphs_embedding_store.store.keys())
# b. 从知识图谱删除
# 注意:必须使用关键字参数,避免 True 被误当作 ent_hashes 参数
# 使用 partial 来传递关键字参数,因为 run_in_executor 不支持 **kwargs
delete_func = partial(
kg_mgr.delete_paragraphs, to_delete_hashes, ent_hashes=None, remove_orphan_entities=True
)
await self._run_cancellable_executor(delete_func)
# 3. 持久化
await self._run_cancellable_executor(embed_mgr.rebuild_faiss_index)
await self._run_cancellable_executor(embed_mgr.save_to_file)
await self._run_cancellable_executor(kg_mgr.save_to_file)
match_type = "完整文段" if exact_match else "关键词"
return {
"status": "success",
"deleted_count": deleted_count,
"message": f"已成功删除 {deleted_count} 条相关知识({match_type}匹配)",
}
except asyncio.CancelledError:
logger.warning("[Plugin API] 删除操作被用户中断")
return {"status": "cancelled", "message": "删除操作已被用户中断"}
except Exception as e:
logger.error(f"[Plugin API] 删除知识失败: {e}", exc_info=True)
return {"status": "error", "message": str(e)}
async def clear_all(self) -> dict:
"""
清空整个LPMM知识库删除所有段落、实体、关系和知识图谱数据
Returns:
dict: {"status": "success/error", "message": "描述", "stats": {...}}
"""
try:
embed_mgr, kg_mgr, _ = await self._get_managers()
# 记录清空前的统计信息
before_stats = {
"paragraphs": len(embed_mgr.paragraphs_embedding_store.store),
"entities": len(embed_mgr.entities_embedding_store.store),
"relations": len(embed_mgr.relation_embedding_store.store),
"kg_nodes": len(kg_mgr.graph.get_node_list()),
"kg_edges": len(kg_mgr.graph.get_edge_list()),
}
# 将同步阻塞操作放到线程池中执行,避免阻塞事件循环
# 1. 清空所有向量库
# 获取所有keys
para_keys = list(embed_mgr.paragraphs_embedding_store.store.keys())
ent_keys = list(embed_mgr.entities_embedding_store.store.keys())
rel_keys = list(embed_mgr.relation_embedding_store.store.keys())
# 删除所有段落向量
para_deleted, _ = await self._run_cancellable_executor(
embed_mgr.paragraphs_embedding_store.delete_items, para_keys
)
embed_mgr.stored_pg_hashes.clear()
# 删除所有实体向量
if ent_keys:
ent_deleted, _ = await self._run_cancellable_executor(
embed_mgr.entities_embedding_store.delete_items, ent_keys
)
else:
ent_deleted = 0
# 删除所有关系向量
if rel_keys:
rel_deleted, _ = await self._run_cancellable_executor(
embed_mgr.relation_embedding_store.delete_items, rel_keys
)
else:
rel_deleted = 0
# 2. 清空所有 embedding store 的索引和映射
# 确保 faiss_index 和 idx2hash 也被重置,并删除旧的索引文件
def _clear_embedding_indices():
# 清空段落索引
embed_mgr.paragraphs_embedding_store.faiss_index = None
embed_mgr.paragraphs_embedding_store.idx2hash = None
embed_mgr.paragraphs_embedding_store.dirty = False
# 删除旧的索引文件
if os.path.exists(embed_mgr.paragraphs_embedding_store.index_file_path):
os.remove(embed_mgr.paragraphs_embedding_store.index_file_path)
if os.path.exists(embed_mgr.paragraphs_embedding_store.idx2hash_file_path):
os.remove(embed_mgr.paragraphs_embedding_store.idx2hash_file_path)
# 清空实体索引
embed_mgr.entities_embedding_store.faiss_index = None
embed_mgr.entities_embedding_store.idx2hash = None
embed_mgr.entities_embedding_store.dirty = False
# 删除旧的索引文件
if os.path.exists(embed_mgr.entities_embedding_store.index_file_path):
os.remove(embed_mgr.entities_embedding_store.index_file_path)
if os.path.exists(embed_mgr.entities_embedding_store.idx2hash_file_path):
os.remove(embed_mgr.entities_embedding_store.idx2hash_file_path)
# 清空关系索引
embed_mgr.relation_embedding_store.faiss_index = None
embed_mgr.relation_embedding_store.idx2hash = None
embed_mgr.relation_embedding_store.dirty = False
# 删除旧的索引文件
if os.path.exists(embed_mgr.relation_embedding_store.index_file_path):
os.remove(embed_mgr.relation_embedding_store.index_file_path)
if os.path.exists(embed_mgr.relation_embedding_store.idx2hash_file_path):
os.remove(embed_mgr.relation_embedding_store.idx2hash_file_path)
await self._run_cancellable_executor(_clear_embedding_indices)
# 3. 清空知识图谱
# 获取所有段落hash
all_pg_hashes = list(kg_mgr.stored_paragraph_hashes)
if all_pg_hashes:
# 删除所有段落节点(这会自动清理相关的边和孤立实体)
# 注意:必须使用关键字参数,避免 True 被误当作 ent_hashes 参数
# 使用 partial 来传递关键字参数,因为 run_in_executor 不支持 **kwargs
delete_func = partial(
kg_mgr.delete_paragraphs, all_pg_hashes, ent_hashes=None, remove_orphan_entities=True
)
await self._run_cancellable_executor(delete_func)
# 完全清空KG创建新的空图无论是否有段落hash都要执行
from quick_algo import di_graph
kg_mgr.graph = di_graph.DiGraph()
kg_mgr.stored_paragraph_hashes.clear()
kg_mgr.ent_appear_cnt.clear()
# 4. 保存所有数据此时所有store都是空的索引也是None
# 注意即使store为空save_to_file也会保存空的DataFrame这是正确的
await self._run_cancellable_executor(embed_mgr.save_to_file)
await self._run_cancellable_executor(kg_mgr.save_to_file)
after_stats = {
"paragraphs": len(embed_mgr.paragraphs_embedding_store.store),
"entities": len(embed_mgr.entities_embedding_store.store),
"relations": len(embed_mgr.relation_embedding_store.store),
"kg_nodes": len(kg_mgr.graph.get_node_list()),
"kg_edges": len(kg_mgr.graph.get_edge_list()),
}
return {
"status": "success",
"message": f"已成功清空LPMM知识库删除 {para_deleted} 个段落、{ent_deleted} 个实体、{rel_deleted} 个关系)",
"stats": {
"before": before_stats,
"after": after_stats,
},
}
except asyncio.CancelledError:
logger.warning("[Plugin API] 清空操作被用户中断")
return {"status": "cancelled", "message": "清空操作已被用户中断"}
except Exception as e:
logger.error(f"[Plugin API] 清空知识库失败: {e}", exc_info=True)
return {"status": "error", "message": str(e)}
# 内部使用的单例
lpmm_ops = LPMMOperations()

View File

@@ -613,6 +613,13 @@ class ChatBot:
scope=scope,
) # 确保会话存在
try:
from src.services.memory_flow_service import memory_automation_service
await memory_automation_service.on_incoming_message(message)
except Exception as exc:
logger.warning(f"[{session_id}] 长期记忆自动摘要注册失败: {exc}")
# message.update_chat_stream(chat)
# 命令处理 - 使用新插件系统检查并处理命令。

View File

@@ -347,6 +347,13 @@ class UniversalMessageSender:
with get_db_session() as db_session:
db_session.add(message.to_db_instance())
try:
from src.services.memory_flow_service import memory_automation_service
await memory_automation_service.on_message_sent(message)
except Exception as exc:
logger.warning(f"[{chat_id}] 长期记忆人物事实写回注册失败: {exc}")
return sent_msg
except Exception as e: