Merge pull request #1386 from A-Dawn/feat-lpmm知识库加强

LPMM 知识库删除能力与自检脚本增强(附关键健壮性修复)
This commit is contained in:
UnCLAS-Prommer
2025-12-18 22:56:12 +08:00
committed by GitHub
16 changed files with 2505 additions and 55 deletions

View File

@@ -0,0 +1,386 @@
import argparse
import sys
from pathlib import Path
from typing import List, Tuple, Dict, Any
import json
import os
# 强制使用 utf-8避免控制台编码报错
try:
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8")
except Exception:
pass
# 确保能找到 src 包
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.common.logger import get_logger
from src.chat.knowledge.utils.hash import get_sha256
logger = get_logger("delete_lpmm_items")
def read_hashes(file_path: Path) -> List[str]:
    """Read one hash (or prefixed key) per line from a UTF-8 text file.

    Every line is stripped of surrounding whitespace; lines that are empty
    after stripping are dropped. Order and duplicates are preserved.
    """
    content = file_path.read_text(encoding="utf-8")
    stripped_lines = (raw.strip() for raw in content.splitlines())
    return [entry for entry in stripped_lines if entry]
def read_openie_hashes(file_path: Path) -> List[str]:
    """Collect the ``idx`` value of every document in an OpenIE JSON file.

    The file is expected to hold an object with a ``docs`` list. Anything
    other than a top-level object yields an empty result. Entries that are
    not dicts, or whose ``idx`` is not a non-blank string, are ignored.
    Returned values are whitespace-stripped and kept in file order.
    """
    payload: Dict[str, Any] = json.loads(file_path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        return []
    collected: List[str] = []
    for entry in payload.get("docs", []):
        if not isinstance(entry, dict):
            continue
        candidate = entry.get("idx")
        if isinstance(candidate, str):
            trimmed = candidate.strip()
            if trimmed:
                collected.append(trimmed)
    return collected
def normalize_paragraph_keys(raw_hashes: List[str]) -> Tuple[List[str], List[str]]:
    """Split inputs into parallel lists of full keys and bare hashes.

    Each input may or may not already carry the ``paragraph-`` prefix.
    The first returned list always has the prefix, the second never has
    the leading one; both keep the input order.
    """
    prefix = "paragraph-"
    full_keys: List[str] = []
    bare_hashes: List[str] = []
    for item in raw_hashes:
        # Drop a single leading prefix only; anything after it is kept as-is.
        bare = item[len(prefix):] if item.startswith(prefix) else item
        bare_hashes.append(bare)
        full_keys.append(prefix + bare)
    return full_keys, bare_hashes
def main():
    """Command-line entry point: delete paragraphs from the LPMM knowledge base.

    Paragraph targets are gathered from up to four sources (``--hash-file``,
    ``--openie-file``, ``--raw-file`` + ``--raw-index``, ``--search-text``).
    When an OpenIE file is given, its entities and triples are also collected
    and are deleted only if ``--delete-entities`` / ``--delete-relations`` are
    set. After a preview (and an optional ``--dry-run`` early return), a size
    guard and a confirmation step, the function deletes the embedding entries,
    calls ``KGManager.delete_paragraphs``, rebuilds the Faiss index, saves both
    stores and logs before/after counts.

    Invalid input, load failures and refused confirmations in non-interactive
    mode terminate the process through ``sys.exit(1)``.
    """
    parser = argparse.ArgumentParser(description="Delete paragraphs from LPMM knowledge base (vectors + graph).")
    parser.add_argument("--hash-file", help="文本文件路径,每行一个 paragraph 哈希或带前缀键")
    parser.add_argument("--openie-file", help="OpenIE 输出文件JSON将其 docs.idx 作为待删段落哈希")
    parser.add_argument("--raw-file", help="原始 txt 语料文件(按空行分段),可结合 --raw-index 使用")
    parser.add_argument(
        "--raw-index",
        help="在 --raw-file 中要删除的段落索引1 基,支持逗号分隔,例如 1,3",
    )
    parser.add_argument("--search-text", help="在当前段落库中按子串搜索匹配段落并交互选择删除")
    parser.add_argument(
        "--search-limit",
        type=int,
        default=10,
        help="--search-text 模式下最多展示的候选段落数量",
    )
    parser.add_argument("--delete-entities", action="store_true", help="同时删除 OpenIE 文件中的实体节点/嵌入")
    parser.add_argument("--delete-relations", action="store_true", help="同时删除 OpenIE 文件中的关系嵌入")
    parser.add_argument("--remove-orphan-entities", action="store_true", help="删除删除后孤立的实体节点")
    parser.add_argument("--dry-run", action="store_true", help="仅预览将删除的项,不实际修改")
    parser.add_argument("--yes", action="store_true", help="跳过交互确认,直接执行删除(谨慎使用)")
    parser.add_argument(
        "--max-delete-nodes",
        type=int,
        default=2000,
        help="单次最大允许删除的节点数量(段落+实体),超过则需要显式确认或调整该参数",
    )
    parser.add_argument(
        "--non-interactive",
        action="store_true",
        help=(
            "非交互模式:不再通过 input() 询问任何信息;"
            "在该模式下,如果需要交互(例如 --search-text 未指定具体条目、未提供 --yes"
            "会直接报错退出。"
        ),
    )
    args = parser.parse_args()

    # At least one source of paragraph targets is required.
    if not (args.hash_file or args.openie_file or args.raw_file or args.search_text):
        logger.error("必须指定 --hash-file / --openie-file / --raw-file / --search-text 之一")
        sys.exit(1)

    raw_hashes: List[str] = []
    raw_entities: List[str] = []
    raw_relations: List[str] = []

    # Source 1: plain text file, one hash or prefixed key per line.
    if args.hash_file:
        hash_file = Path(args.hash_file)
        if not hash_file.exists():
            logger.error(f"哈希文件不存在: {hash_file}")
            sys.exit(1)
        raw_hashes.extend(read_hashes(hash_file))

    # Source 2: OpenIE JSON batch (paragraph idx values plus entities/triples).
    if args.openie_file:
        openie_path = Path(args.openie_file)
        if not openie_path.exists():
            logger.error(f"OpenIE 文件不存在: {openie_path}")
            sys.exit(1)
        try:
            # Paragraph hashes are read inside the try as well, so a malformed
            # file is reported through the logger instead of a raw traceback.
            raw_hashes.extend(read_openie_hashes(openie_path))
            # Entities include both extracted_entities and the subject/object
            # of every triple (the original comment states this mirrors how
            # the KG is built).
            data = json.loads(openie_path.read_text(encoding="utf-8"))
            docs = data.get("docs", []) if isinstance(data, dict) else []
            for doc in docs:
                if not isinstance(doc, dict):
                    continue
                ents = doc.get("extracted_entities", [])
                if isinstance(ents, list):
                    raw_entities.extend([e for e in ents if isinstance(e, str)])
                triples = doc.get("extracted_triples", [])
                if isinstance(triples, list):
                    for t in triples:
                        if isinstance(t, list) and len(t) == 3:
                            subj, _, obj = t
                            if isinstance(subj, str):
                                raw_entities.append(subj)
                            if isinstance(obj, str):
                                raw_entities.append(obj)
                            # Relations are keyed by the string form of the triple tuple.
                            raw_relations.append(str(tuple(t)))
        except Exception as e:
            logger.error(f"读取 OpenIE 文件失败: {e}")
            sys.exit(1)

    # Source 3: raw txt corpus, paragraphs separated by blank lines, selected by 1-based index.
    if args.raw_file:
        raw_path = Path(args.raw_file)
        if not raw_path.exists():
            logger.error(f"原始语料文件不存在: {raw_path}")
            sys.exit(1)
        text = raw_path.read_text(encoding="utf-8")
        paragraphs: List[str] = []
        buf = []
        for line in text.splitlines():
            if line.strip() == "":
                if buf:
                    paragraphs.append("\n".join(buf).strip())
                    buf = []
            else:
                buf.append(line)
        if buf:
            paragraphs.append("\n".join(buf).strip())
        if not paragraphs:
            logger.error(f"原始语料文件 {raw_path} 中没有解析到任何段落")
            sys.exit(1)
        if not args.raw_index:
            logger.info(f"{raw_path} 共解析出 {len(paragraphs)} 个段落,请通过 --raw-index 指定要删除的段落,例如 --raw-index 1,3")
            sys.exit(1)
        # Parse the comma-separated, 1-based index list.
        try:
            idx_list = [int(x.strip()) for x in str(args.raw_index).split(",") if x.strip()]
        except ValueError:
            logger.error(f"--raw-index 解析失败: {args.raw_index}")
            sys.exit(1)
        for idx in idx_list:
            if idx < 1 or idx > len(paragraphs):
                logger.error(f"--raw-index 包含无效索引 {idx}(有效范围 1~{len(paragraphs)}")
                sys.exit(1)
        logger.info("根据原始语料选择段落:")
        for idx in idx_list:
            para = paragraphs[idx - 1]
            h = get_sha256(para)
            logger.info(f"- 第 {idx}hash={h},内容预览:{para[:80]}")
            raw_hashes.append(h)

    # Source 4: substring search over the stored paragraphs, followed by an interactive pick.
    if args.search_text:
        search_text = args.search_text.strip()
        if not search_text:
            logger.error("--search-text 不能为空")
            sys.exit(1)
        logger.info(f"正在根据关键字在现有段落库中搜索:{search_text!r}")
        em_search = EmbeddingManager()
        try:
            em_search.load_from_file()
        except Exception as e:
            logger.error(f"加载嵌入库失败,无法使用 --search-text 功能: {e}")
            sys.exit(1)
        candidates = []
        for key, item in em_search.paragraphs_embedding_store.store.items():
            if search_text in item.str:
                candidates.append((key, item.str))
                if len(candidates) >= args.search_limit:
                    break
        if not candidates:
            logger.info("未在现有段落库中找到包含该关键字的段落")
        else:
            logger.info("找到以下候选段落(输入序号选择要删除的条目,可用逗号分隔,多选):")
            for i, (key, text) in enumerate(candidates, start=1):
                logger.info(f"{i}. {key} | {text[:80]}")
            if args.non_interactive:
                logger.error(
                    "当前处于非交互模式,无法通过输入序号选择要删除的候选段落;"
                    "如需脚本化删除,请改用 --hash-file / --openie-file / --raw-file 等方式。"
                )
                sys.exit(1)
            choice = input("请输入要删除的序号列表(如 1,3或直接回车取消").strip()
            if choice:
                try:
                    idxs = [int(x.strip()) for x in choice.split(",") if x.strip()]
                except ValueError:
                    logger.error("输入的序号列表无法解析,已取消 --search-text 删除")
                else:
                    for i in idxs:
                        if 1 <= i <= len(candidates):
                            key, _ = candidates[i - 1]
                            # Store keys already carry the paragraph- prefix; keep only the hash part.
                            if key.startswith("paragraph-"):
                                raw_hashes.append(key.split("paragraph-", 1)[1])
                        else:
                            logger.warning(f"忽略无效序号: {i}")

    if not raw_hashes:
        logger.error("未读取到任何待删哈希,无操作")
        sys.exit(1)

    # De-duplicate AFTER normalisation so "abc" and "paragraph-abc" collapse to
    # one target. Keys and hashes map one-to-one, so de-duplicating each list
    # independently keeps them parallel. dict.fromkeys preserves first-seen order.
    keys, pg_hashes = normalize_paragraph_keys(raw_hashes)
    keys = list(dict.fromkeys(keys))
    pg_hashes = list(dict.fromkeys(pg_hashes))

    # Entities/relations repeat across documents and triples; de-duplicate so the
    # preview counts, the size guard and delete_items all see each key once.
    ent_hashes: List[str] = []
    rel_hashes: List[str] = []
    if args.delete_entities and raw_entities:
        ent_hashes = list(dict.fromkeys(get_sha256(e) for e in raw_entities))
    if args.delete_relations and raw_relations:
        rel_hashes = list(dict.fromkeys(get_sha256(r) for r in raw_relations))

    logger.info("=== 删除操作预备 ===")
    logger.info("请确保已备份 data/embedding 与 data/rag必要时可使用 --dry-run 预览")
    logger.info(f"待删除段落数量: {len(keys)}")
    logger.info(f"示例: {keys[:5]}")
    if ent_hashes:
        logger.info(f"待删除实体数量: {len(ent_hashes)}")
    if rel_hashes:
        logger.info(f"待删除关系数量: {len(rel_hashes)}")
    # ent_hashes is only populated when --delete-entities is set.
    total_nodes_to_delete = len(pg_hashes) + len(ent_hashes)
    logger.info(f"本次预计删除节点总数(段落+实体): {total_nodes_to_delete}")

    if args.dry_run:
        logger.info("dry-run 模式,未执行删除")
        return

    # Guard against oversized batches unless the caller passed --yes.
    if total_nodes_to_delete > args.max_delete_nodes and not args.yes:
        logger.error(
            f"本次预计删除节点 {total_nodes_to_delete} 个,超过阈值 {args.max_delete_nodes}"
            " 为避免误删,请降低批次规模或使用 --max-delete-nodes 调整阈值,并加上 --yes 明确确认。"
        )
        sys.exit(1)

    # Interactive confirmation (refused outright in non-interactive mode).
    if not args.yes:
        if args.non_interactive:
            logger.error(
                "当前处于非交互模式且未指定 --yes出于安全考虑删除操作已被拒绝。\n"
                "如确认需要在非交互模式下执行删除,请显式添加 --yes 参数。"
            )
            sys.exit(1)
        confirm = input("确认删除上述数据?输入大写 YES 以继续,其他任意键取消: ").strip()
        if confirm != "YES":
            logger.info("用户取消删除操作")
            return

    # Load the embedding stores and the knowledge graph.
    embed_manager = EmbeddingManager()
    kg_manager = KGManager()
    try:
        embed_manager.load_from_file()
        kg_manager.load_from_file()
    except Exception as e:
        logger.error(f"加载现有知识库失败: {e}")
        sys.exit(1)

    # Snapshot global counts so the final log can show deltas.
    before_para_vec = len(embed_manager.paragraphs_embedding_store.store)
    before_ent_vec = len(embed_manager.entities_embedding_store.store)
    before_rel_vec = len(embed_manager.relation_embedding_store.store)
    before_nodes = len(kg_manager.graph.get_node_list())
    before_edges = len(kg_manager.graph.get_edge_list())
    logger.info(
        f"删除前统计: 段落向量={before_para_vec}, 实体向量={before_ent_vec}, 关系向量={before_rel_vec}, "
        f"KG节点={before_nodes}, KG边={before_edges}"
    )

    # Delete embedding entries.
    deleted, skipped = embed_manager.paragraphs_embedding_store.delete_items(keys)
    # Keep the manager's cached paragraph-key set in sync with the store.
    embed_manager.stored_pg_hashes = set(embed_manager.paragraphs_embedding_store.store.keys())
    logger.info(f"段落向量删除完成,删除: {deleted}, 跳过: {skipped}")
    ent_deleted = ent_skipped = rel_deleted = rel_skipped = 0
    if ent_hashes:
        ent_keys = [f"entity-{h}" for h in ent_hashes]
        ent_deleted, ent_skipped = embed_manager.entities_embedding_store.delete_items(ent_keys)
        logger.info(f"实体向量删除完成,删除: {ent_deleted}, 跳过: {ent_skipped}")
    if rel_hashes:
        rel_keys = [f"relation-{h}" for h in rel_hashes]
        rel_deleted, rel_skipped = embed_manager.relation_embedding_store.delete_items(rel_keys)
        logger.info(f"关系向量删除完成,删除: {rel_deleted}, 跳过: {rel_skipped}")

    # Delete graph nodes/edges.
    kg_result = kg_manager.delete_paragraphs(
        pg_hashes,
        ent_hashes=ent_hashes if args.delete_entities else None,
        remove_orphan_entities=args.remove_orphan_entities,
    )
    logger.info(
        f"KG 删除完成,删除: {kg_result.get('deleted', 0)}, 跳过: {kg_result.get('skipped', 0)}, "
        f"孤立实体清理: {kg_result.get('orphan_removed', 0)}"
    )

    # Rebuild the index and persist everything.
    logger.info("重建 Faiss 索引并保存嵌入文件...")
    embed_manager.rebuild_faiss_index()
    embed_manager.save_to_file()
    logger.info("保存 KG 数据...")
    kg_manager.save_to_file()

    # Post-deletion statistics with signed deltas.
    after_para_vec = len(embed_manager.paragraphs_embedding_store.store)
    after_ent_vec = len(embed_manager.entities_embedding_store.store)
    after_rel_vec = len(embed_manager.relation_embedding_store.store)
    after_nodes = len(kg_manager.graph.get_node_list())
    after_edges = len(kg_manager.graph.get_edge_list())
    logger.info(
        "删除后统计: 段落向量=%d(%+d), 实体向量=%d(%+d), 关系向量=%d(%+d), KG节点=%d(%+d), KG边=%d(%+d)"
        % (
            after_para_vec,
            after_para_vec - before_para_vec,
            after_ent_vec,
            after_ent_vec - before_ent_vec,
            after_rel_vec,
            after_rel_vec - before_rel_vec,
            after_nodes,
            after_nodes - before_nodes,
            after_edges,
            after_edges - before_edges,
        )
    )
    logger.info("删除流程完成")
    print(
        "\n[NOTICE] 删除脚本执行完毕。如主程序(聊天 / WebUI已在运行"
        "请重启主程序,或在主程序内部调用一次 lpmm_start_up() 以应用最新 LPMM 知识库。"
    )
    print("[NOTICE] 如果不清楚 lpmm_start_up 是什么,直接重启主程序即可。")
if __name__ == "__main__":
main()

View File

@@ -4,10 +4,12 @@
# print("未找到quick_algo库无法使用quick_algo算法")
# print("请安装quick_algo库 - 在lib.quick_algo中执行命令python setup.py build_ext --inplace")
import argparse
import sys
import os
import asyncio
from time import sleep
from typing import Optional
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.embedding_store import EmbeddingManager
@@ -71,7 +73,12 @@ def hash_deduplicate(
return new_raw_paragraphs, new_triple_list_data
def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, kg_manager: KGManager) -> bool:
def handle_import_openie(
openie_data: OpenIE,
embed_manager: EmbeddingManager,
kg_manager: KGManager,
non_interactive: bool = False,
) -> bool:
# sourcery skip: extract-method
# 从OpenIE数据中提取段落原文与三元组列表
# 索引的段落原文
@@ -124,8 +131,13 @@ def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, k
logger.info("所有数据均完整,没有发现缺失字段。")
return False
# 新增:提示用户是否删除非法文段继续导入
# 将print移到所有logger.error之后确保不会被冲掉
# 在非交互模式下,不再询问用户,而是直接报错终止
logger.info(f"\n检测到非法文段,共{len(missing_idxs)}条。")
if non_interactive:
logger.error(
"检测到非法文段且当前处于非交互模式,无法询问是否删除非法文段,导入终止。"
)
sys.exit(1)
logger.info("\n是否删除所有非法文段后继续导入?(y/n): ", end="")
user_choice = input().strip().lower()
if user_choice != "y":
@@ -174,20 +186,25 @@ def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, k
return True
async def main_async(): # sourcery skip: dict-comprehension
async def main_async(non_interactive: bool = False) -> bool: # sourcery skip: dict-comprehension
# 新增确认提示
print("=== 重要操作确认 ===")
print("OpenIE导入时会大量发送请求可能会撞到请求速度上限请注意选用的模型")
print("同之前样例在本地模型下在70分钟内我们发送了约8万条请求在网络允许下速度会更快")
print("推荐使用硅基流动的Pro/BAAI/bge-m3")
print("每百万Token费用为0.7元")
print("知识导入时,会消耗大量系统资源,建议在较好配置电脑上运行")
print("同上样例导入时10700K几乎跑满14900HX占用80%峰值内存占用约3G")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
if non_interactive:
logger.warning(
"当前处于非交互模式,将跳过导入开销确认提示,直接开始执行 OpenIE 导入。"
)
else:
print("=== 重要操作确认 ===")
print("OpenIE导入时会大量发送请求可能会撞到请求速度上限请注意选用的模型")
print("同之前样例在本地模型下在70分钟内我们发送了约8万条请求在网络允许下速度会更快")
print("推荐使用硅基流动的Pro/BAAI/bge-m3")
print("每百万Token费用为0.7元")
print("知识导入时,会消耗大量系统资源,建议在较好配置电脑上运行")
print("同上样例导入时10700K几乎跑满14900HX占用80%峰值内存占用约3G")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
print("\n" + "=" * 40 + "\n")
ensure_openie_dir() # 确保OpenIE目录存在
logger.info("----开始导入openie数据----\n")
@@ -235,14 +252,27 @@ async def main_async(): # sourcery skip: dict-comprehension
except Exception as e:
logger.error(f"导入OpenIE数据文件时发生错误{e}")
return False
if handle_import_openie(openie_data, embed_manager, kg_manager) is False:
if handle_import_openie(openie_data, embed_manager, kg_manager, non_interactive=non_interactive) is False:
logger.error("处理OpenIE数据时发生错误")
return False
return None
return True
def main():
"""主函数 - 设置新的事件循环并运行异步主函数"""
def main(argv: Optional[list[str]] = None) -> None:
"""主函数 - 解析参数并运行异步主流程。"""
parser = argparse.ArgumentParser(
description=(
"OpenIE 导入脚本:读取 data/openie 中的 OpenIE JSON 批次,"
"将其导入到 LPMM 的向量库与知识图中。"
)
)
parser.add_argument(
"--non-interactive",
action="store_true",
help="非交互模式:跳过导入确认提示以及非法文段删除询问,遇到非法文段时直接报错退出。",
)
args = parser.parse_args(argv)
# 检查是否有现有的事件循环
try:
loop = asyncio.get_running_loop()
@@ -255,13 +285,22 @@ def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ok: bool = False
try:
# 在新的事件循环中运行异步主函数
loop.run_until_complete(main_async())
ok = loop.run_until_complete(main_async(non_interactive=args.non_interactive))
print(
"\n[NOTICE] OpenIE 导入脚本执行完毕。如主程序(聊天 / WebUI已在运行"
"请重启主程序,或在主程序内部调用一次 lpmm_start_up() 以应用最新 LPMM 知识库。"
)
print("[NOTICE] 如果不清楚 lpmm_start_up 是什么,直接重启主程序即可。")
finally:
# 确保事件循环被正确关闭
if not loop.is_closed():
loop.close()
if not ok:
# 统一错误码,方便在非交互场景下检测失败
sys.exit(1)
if __name__ == "__main__":

View File

@@ -1,3 +1,4 @@
import argparse
import json
import os
import signal
@@ -5,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Event
import sys
import datetime
from typing import Optional
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# 添加项目根目录到 sys.path
@@ -115,22 +117,34 @@ def signal_handler(_signum, _frame):
sys.exit(0)
def main(): # sourcery skip: comprehension-to-generator, extract-method
def _run(non_interactive: bool = False) -> None: # sourcery skip: comprehension-to-generator, extract-method
# 设置信号处理器
signal.signal(signal.SIGINT, signal_handler)
ensure_dirs() # 确保目录存在
# 新增用户确认提示
print("=== 重要操作确认,请认真阅读以下内容哦 ===")
print("实体提取操作将会花费较多api余额和时间建议在空闲时段执行。")
print("举例600万字全剧情提取选用deepseek v3 0324消耗约40元约3小时")
print("建议使用硅基流动的非Pro模型")
print("或者使用可以用赠金抵扣的Pro模型")
print("请确保账户余额充足,并且在执行前确认无误。")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
if non_interactive:
logger.warning(
"当前处于非交互模式,将跳过费用与时长确认提示,直接开始进行实体提取操作"
)
else:
print("=== 重要操作确认,请认真阅读以下内容哦 ===")
print("实体提取操作将会花费较多api余额和时间建议在空闲时段执行。")
print("举例600万字全剧情提取选用deepseek v3 0324消耗约40元约3小时。")
print("建议使用硅基流动的非Pro模型")
print("或者使用可以用赠金抵扣的Pro模型")
print("请确保账户余额充足,并且在执行前确认无误。")
confirm = input("确认继续执行?(y/n): ").strip().lower()
if confirm != "y":
logger.info("用户取消操作")
print("操作已取消")
sys.exit(1)
# 友好提示:说明“网络错误(可重试)”日志属于正常自动重试行为,避免用户误以为任务失败
print(
"\n提示:在提取过程中,如果看到模型出现“网络错误(可重试)”等日志,"
"表示系统正在自动重试请求,一般不会影响整体导入结果,请耐心等待即可。\n"
)
print("\n" + "=" * 40 + "\n")
ensure_dirs() # 确保目录存在
logger.info("--------进行信息提取--------\n")
@@ -215,5 +229,22 @@ def main(): # sourcery skip: comprehension-to-generator, extract-method
logger.info(f"提取失败的文段SHA256{failed_sha256}")
def main(argv: Optional[list[str]] = None) -> None:
    """Parse the command line and start the extraction run.

    ``argv`` is handed to ``ArgumentParser.parse_args``; ``None`` makes
    argparse fall back to ``sys.argv``. The only recognised option is
    ``--non-interactive``, which is forwarded to ``_run``.
    """
    cli = argparse.ArgumentParser(
        description=(
            "LPMM 信息提取脚本:从 data/lpmm_raw_data/*.txt 中读取原始段落,"
            "调用 LLM 提取实体和三元组,并生成 OpenIE JSON 批次文件。"
        )
    )
    cli.add_argument(
        "--non-interactive",
        action="store_true",
        help="非交互模式:跳过费用确认提示,直接开始执行;适用于 CI / 定时任务等场景。",
    )
    parsed = cli.parse_args(argv)
    _run(non_interactive=parsed.non_interactive)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,132 @@
import argparse
import json
import os
import sys
from pathlib import Path
from typing import List, Tuple
# 确保能导入 src.*
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.utils.hash import get_sha256
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.common.logger import get_logger
logger = get_logger("inspect_lpmm_batch")
def load_openie_hashes(path: Path) -> Tuple[List[str], List[str], List[str]]:
    """Extract paragraph, entity and relation hashes from an OpenIE JSON file.

    Paragraph hashes are the stripped ``idx`` strings of each document.
    Entity hashes are ``get_sha256`` of every string in ``extracted_entities``
    plus the subject and object of every 3-element triple (the original
    comment states this matches how the KG is built). Relation hashes are
    ``get_sha256`` of ``str(tuple(triple))``.

    A top-level value that is not an object, or a ``docs`` value that is not
    a list, yields three empty lists instead of raising.

    Returns:
        ``(paragraph_hashes, entity_hashes, relation_hashes)``, each
        de-duplicated with first-seen order preserved.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    docs = data.get("docs", []) if isinstance(data, dict) else []
    if not isinstance(docs, list):
        docs = []

    pg_hashes: List[str] = []
    ent_hashes: List[str] = []
    rel_hashes: List[str] = []
    for doc in docs:
        if not isinstance(doc, dict):
            continue
        idx = doc.get("idx")
        if isinstance(idx, str) and idx.strip():
            pg_hashes.append(idx.strip())
        ents = doc.get("extracted_entities", [])
        if isinstance(ents, list):
            ent_hashes.extend(get_sha256(e) for e in ents if isinstance(e, str))
        triples = doc.get("extracted_triples", [])
        if isinstance(triples, list):
            for t in triples:
                if isinstance(t, list) and len(t) == 3:
                    # Subject and object take part in the graph as entities.
                    subj, _, obj = t
                    if isinstance(subj, str):
                        ent_hashes.append(get_sha256(subj))
                    if isinstance(obj, str):
                        ent_hashes.append(get_sha256(obj))
                    rel_hashes.append(get_sha256(str(tuple(t))))

    # dict.fromkeys de-duplicates while keeping first-seen order.
    return (
        list(dict.fromkeys(pg_hashes)),
        list(dict.fromkeys(ent_hashes)),
        list(dict.fromkeys(rel_hashes)),
    )
def main() -> None:
    """Report how much of one OpenIE batch is still present in the stores.

    Parses ``--openie-file``, derives the paragraph/entity/relation keys of
    that batch, loads the current embedding stores and knowledge graph, and
    prints how many of those keys remain in each. Up to three paragraph and
    three entity nodes that are still in the graph are printed as samples.

    A missing or unreadable OpenIE file, or a failure to load the knowledge
    base, is logged and ends the process with ``sys.exit(1)``.
    """
    parser = argparse.ArgumentParser(
        description="检查指定 OpenIE 文件对应批次在当前向量库与 KG 中的存在情况(用于验证删除效果)。"
    )
    parser.add_argument("--openie-file", required=True, help="OpenIE 输出 JSON 文件路径")
    args = parser.parse_args()

    openie_path = Path(args.openie_file)
    if not openie_path.exists():
        logger.error(f"OpenIE 文件不存在: {openie_path}")
        sys.exit(1)

    # A malformed batch file is reported through the logger rather than a traceback.
    try:
        pg_hashes, ent_hashes, rel_hashes = load_openie_hashes(openie_path)
    except Exception as e:
        logger.error(f"读取 OpenIE 文件失败: {e}")
        sys.exit(1)
    logger.info(
        f"{openie_path.name} 解析到 段落 {len(pg_hashes)} 条,实体 {len(ent_hashes)} 个,关系 {len(rel_hashes)}"
    )

    # Load the current embedding stores and knowledge graph.
    em = EmbeddingManager()
    kg = KGManager()
    try:
        em.load_from_file()
        kg.load_from_file()
    except Exception as e:
        logger.error(f"加载当前知识库失败: {e}")
        sys.exit(1)
    graph_nodes = set(kg.graph.get_node_list())

    # Paragraphs: present in the vector store / in the graph.
    pg_keys = [f"paragraph-{h}" for h in pg_hashes]
    pg_in_vec = sum(1 for k in pg_keys if k in em.paragraphs_embedding_store.store)
    pg_in_kg = sum(1 for k in pg_keys if k in graph_nodes)

    # Entities: present in the vector store / in the graph.
    ent_keys = [f"entity-{h}" for h in ent_hashes]
    ent_in_vec = sum(1 for k in ent_keys if k in em.entities_embedding_store.store)
    ent_in_kg = sum(1 for k in ent_keys if k in graph_nodes)

    # Relations are only checked against the vector store.
    rel_keys = [f"relation-{h}" for h in rel_hashes]
    rel_in_vec = sum(1 for k in rel_keys if k in em.relation_embedding_store.store)

    print("==== 批次存在情况(删除前/后对比用) ====")
    print(f"段落: 总计 {len(pg_keys)}, 向量库剩余 {pg_in_vec}, KG 中剩余 {pg_in_kg}")
    print(f"实体: 总计 {len(ent_keys)}, 向量库剩余 {ent_in_vec}, KG 中剩余 {ent_in_kg}")
    print(f"关系: 总计 {len(rel_keys)}, 向量库剩余 {rel_in_vec}")

    # Print a few surviving nodes so their content can be eyeballed.
    for title, node_keys in (
        ("\n仍在 KG 中的段落节点示例:", pg_keys),
        ("\n仍在 KG 中的实体节点示例:", ent_keys),
    ):
        sample = [k for k in node_keys if k in graph_nodes][:3]
        if not sample:
            continue
        print(title)
        for k in sample:
            nd = kg.graph[k]
            # Fall back to the key when the node carries no "content" entry.
            content = nd["content"] if "content" in nd else k
            print(f"- {k}: {content[:80]}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,71 @@
import os
import sys
from typing import Set
# 保证可以导入 src.*
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.common.logger import get_logger
logger = get_logger("inspect_lpmm_global")
def main() -> None:
    """Print global statistics for the LPMM vector stores and knowledge graph.

    Loads the embedding stores and the graph from disk, then prints the
    number of paragraph/entity/relation vectors, the node and edge totals,
    the number of ``paragraph-`` and ``entity-`` nodes, the size of
    ``kg.ent_appear_cnt``, and a few sample nodes of each kind.

    A load failure is logged and ends the process with ``sys.exit(1)``.
    """
    em = EmbeddingManager()
    kg = KGManager()
    try:
        em.load_from_file()
        kg.load_from_file()
    except Exception as e:
        logger.error(f"加载当前知识库失败: {e}")
        sys.exit(1)

    # Vector store counts.
    para_cnt = len(em.paragraphs_embedding_store.store)
    ent_cnt_vec = len(em.entities_embedding_store.store)
    rel_cnt_vec = len(em.relation_embedding_store.store)

    # Graph counts; nodes are classified by their key prefix.
    nodes = kg.graph.get_node_list()
    edges = kg.graph.get_edge_list()
    para_nodes = [n for n in nodes if n.startswith("paragraph-")]
    ent_nodes = [n for n in nodes if n.startswith("entity-")]

    print("==== 向量库统计 ====")
    print(f"段落向量条数: {para_cnt}")
    print(f"实体向量条数: {ent_cnt_vec}")
    print(f"关系向量条数: {rel_cnt_vec}")
    print("\n==== KG 图统计 ====")
    print(f"节点总数: {len(nodes)}")
    print(f"边总数: {len(edges)}")
    print(f"段落节点数: {len(para_nodes)}")
    print(f"实体节点数: {len(ent_nodes)}")

    # Size of the entity appearance-count table kept by the KG manager.
    ent_cnt_meta = len(kg.ent_appear_cnt)
    print(f"\n实体计数表条目数: {ent_cnt_meta}")

    # Sample a few remaining paragraph and entity nodes.
    for title, node_ids in (
        ("\n==== 剩余段落示例(最多 3 条) ====", para_nodes[:3]),
        ("\n==== 剩余实体示例(最多 5 条) ====", ent_nodes[:5]),
    ):
        print(title)
        for nid in node_ids:
            nd = kg.graph[nid]
            # Fall back to the node id when the node carries no "content" entry.
            content = nd["content"] if "content" in nd else nid
            print(f"- {nid}: {content[:80]}")
if __name__ == "__main__":
main()

541
scripts/lpmm_manager.py Normal file
View File

@@ -0,0 +1,541 @@
import argparse
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, List
# 尽量统一控制台编码为 utf-8避免中文输出报错
try:
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8")
except Exception:
pass
# 确保能导入 src.* 以及同目录脚本
CURRENT_DIR = os.path.dirname(__file__)
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, ".."))
if PROJECT_ROOT not in sys.path:
sys.path.append(PROJECT_ROOT)
from src.common.logger import get_logger # type: ignore
from src.config.config import global_config, model_config # type: ignore
# 引入各功能脚本的入口函数
from import_openie import main as import_openie_main # type: ignore
from info_extraction import main as info_extraction_main # type: ignore
from delete_lpmm_items import main as delete_lpmm_items_main # type: ignore
from inspect_lpmm_batch import main as inspect_lpmm_batch_main # type: ignore
from inspect_lpmm_global import main as inspect_lpmm_global_main # type: ignore
from refresh_lpmm_knowledge import main as refresh_lpmm_knowledge_main # type: ignore
from test_lpmm_retrieval import main as test_lpmm_retrieval_main # type: ignore
from raw_data_preprocessor import load_raw_data # type: ignore
logger = get_logger("lpmm_manager")
# Maps each action name accepted by run_action() to the one-line description
# shown in the interactive menu. The insertion order here matches the order
# of the hard-coded key lists in print_menu() and interactive_loop().
ACTION_INFO = {
    "prepare_raw": "预处理 data/lpmm_raw_data/*.txt按空行切分为段落并做去重统计",
    "info_extract": "原始 txt -> OpenIE 信息抽取(调用 info_extraction.py",
    "import_openie": "导入 OpenIE 批次到向量库与知识图(调用 import_openie.py",
    "delete": "删除/回滚知识(调用 delete_lpmm_items.py",
    "batch_inspect": "检查指定 OpenIE 批次在当前库中的存在情况(调用 inspect_lpmm_batch.py",
    "global_inspect": "查看当前整库向量与 KG 状态(调用 inspect_lpmm_global.py",
    "refresh": "刷新 LPMM 磁盘数据到内存(调用 refresh_lpmm_knowledge.py",
    "test": "运行 LPMM 检索效果回归测试(调用 test_lpmm_retrieval.py",
    "embedding_helper": "嵌入模型迁移辅助:查看当前嵌入模型/维度并归档 embedding_model_test.json",
    "full_import": "一键执行:信息抽取 -> 导入 OpenIE -> 刷新",
}
def _with_overridden_argv(extra_args: List[str], target_main) -> None:
    """Call ``target_main()`` with ``sys.argv`` temporarily replaced.

    While the callee runs, ``sys.argv`` is the current program name followed
    by ``extra_args``; no placeholder script name is inserted, so argparse in
    the callee does not see a stray positional argument. The previous
    ``sys.argv`` is restored afterwards, even when the callee raises.
    """
    saved_argv = sys.argv[:]
    try:
        program_name = saved_argv[0]
        sys.argv = [program_name] + extra_args
        target_main()
    finally:
        sys.argv = saved_argv
def _check_before_info_extract(non_interactive: bool = False) -> bool:
    """Lightweight pre-check before running information extraction.

    Returns True when ``data/lpmm_raw_data`` under the project root holds at
    least one ``.txt`` file. Otherwise a warning is printed; in
    non-interactive mode an error is logged and False is returned, while in
    interactive mode the user is asked and True is returned only for ``y``.
    """
    raw_dir = Path(PROJECT_ROOT) / "data" / "lpmm_raw_data"
    if any(raw_dir.glob("*.txt")):
        return True

    print(
        f"[WARN] 未在 {raw_dir} 下找到任何 .txt 原始语料文件,"
        "info_extraction 可能立即退出或无数据可处理。"
    )
    if non_interactive:
        logger.error(
            "非交互模式下要求原始语料目录中已存在可用的 .txt 文件,请先准备好数据再重试。"
        )
        return False
    answer = input("仍然继续执行信息提取吗?(y/n): ")
    return answer.strip().lower() == "y"
def _check_before_import_openie(non_interactive: bool = False) -> bool:
    """Lightweight pre-check before importing OpenIE batches.

    Returns True when ``data/openie`` under the project root holds at least
    one ``.json`` file. Otherwise a warning is printed; in non-interactive
    mode an error is logged and False is returned, while in interactive mode
    the user is asked and True is returned only for ``y``.
    """
    openie_dir = Path(PROJECT_ROOT) / "data" / "openie"
    if any(openie_dir.glob("*.json")):
        return True

    print(
        f"[WARN] 未在 {openie_dir} 下找到任何 OpenIE JSON 文件,"
        "import_openie 可能会因为找不到批次而失败。"
    )
    if non_interactive:
        logger.error(
            "非交互模式下要求 data/openie 目录中已存在可用的 OpenIE JSON 文件,请先执行信息提取脚本。"
        )
        return False
    answer = input("仍然继续执行导入吗?(y/n): ")
    return answer.strip().lower() == "y"
def _warn_if_lpmm_disabled() -> None:
    """Print a warning when ``lpmm_knowledge.enable`` is falsy or missing.

    Any exception raised while reading the configuration is deliberately
    ignored so that a broken config never blocks the calling action.
    """
    try:
        enabled = getattr(global_config.lpmm_knowledge, "enable", False)
        if enabled:
            return
        print(
            "[WARN] 当前配置 lpmm_knowledge.enable = false"
            "刷新或检索测试可能无法在聊天侧真正启用 LPMM。"
        )
    except Exception:
        # Best-effort hint only: config problems must not abort the main flow.
        pass
def run_action(action: str, extra_args: Optional[List[str]] = None) -> None:
    """Dispatch ``action`` to the matching script entry point.

    Sub-arguments are not parsed here: ``extra_args`` is handed to the target
    script through a temporary ``sys.argv`` override, so each script keeps
    its own argument handling and prompts.

    ``KeyboardInterrupt`` is logged and swallowed. ``SystemExit`` raised by a
    sub-script propagates unchanged. Any other exception is logged and
    re-raised. An unknown action only logs an error.
    """
    logger.info("开始执行操作: %s", action)
    extra_args = extra_args or []

    # Actions that simply forward to a script's main().
    passthrough = {
        "delete": delete_lpmm_items_main,
        "batch_inspect": inspect_lpmm_batch_main,
        "global_inspect": inspect_lpmm_global_main,
    }
    # Actions that first warn when LPMM is disabled in the config.
    warn_then_run = {
        "refresh": refresh_lpmm_knowledge_main,
        "test": test_lpmm_retrieval_main,
    }

    try:
        if action in passthrough:
            _with_overridden_argv(extra_args, passthrough[action])
        elif action in warn_then_run:
            _warn_if_lpmm_disabled()
            _with_overridden_argv(extra_args, warn_then_run[action])
        elif action == "prepare_raw":
            logger.info("开始预处理原始语料 (data/lpmm_raw_data/*.txt)...")
            sha_list, raw_data = load_raw_data()
            print(
                f"\n[PREPARE_RAW] 完成原始语料预处理:共 {len(raw_data)} 条段落,"
                f"去重后哈希数 {len(sha_list)}"
            )
        elif action == "info_extract":
            if not _check_before_info_extract("--non-interactive" in extra_args):
                print("已根据用户选择,取消执行信息提取。")
                return
            _with_overridden_argv(extra_args, info_extraction_main)
        elif action == "import_openie":
            if not _check_before_import_openie("--non-interactive" in extra_args):
                print("已根据用户选择,取消执行导入。")
                return
            _with_overridden_argv(extra_args, import_openie_main)
        elif action == "embedding_helper":
            # Shows the current embedding model/dimension and archives the test file.
            _run_embedding_helper()
        elif action == "full_import":
            # One-shot pipeline: preprocess raw corpus -> extract -> import -> refresh.
            logger.info("开始 full_import预处理原始语料 -> 信息抽取 -> 导入 -> 刷新")
            sha_list, raw_data = load_raw_data()
            print(
                f"\n[FULL_IMPORT] 原始语料预处理完成:共 {len(raw_data)} 条段落,"
                f"去重后哈希数 {len(sha_list)}"
            )
            non_interactive = "--non-interactive" in extra_args
            if not _check_before_info_extract(non_interactive):
                print("已根据用户选择,取消 full_import信息提取阶段被取消")
                return
            # Same pass-through as the single-step action so flags such as
            # --non-interactive reach the extraction script.
            _with_overridden_argv(extra_args, info_extraction_main)
            if not _check_before_import_openie(non_interactive):
                print("已根据用户选择,取消 full_import导入阶段被取消")
                return
            _with_overridden_argv(extra_args, import_openie_main)
            _warn_if_lpmm_disabled()
            _with_overridden_argv(extra_args, refresh_lpmm_knowledge_main)
        else:
            logger.error("未知操作: %s", action)
    except KeyboardInterrupt:
        logger.info("用户中断当前操作Ctrl+C")
    except SystemExit:
        # Sub-scripts call sys.exit frequently; let it through untouched.
        raise
    except Exception as exc:  # pragma: no cover - defensive catch-all
        logger.error("执行操作 %s 时发生未捕获异常: %s", action, exc)
        raise
def print_menu() -> None:
    """Print the numbered action menu.

    Entries are taken from ``ACTION_INFO`` in its insertion order, so the
    menu cannot drift from the table of known actions. Option 0 exits.
    """
    print("\n===== LPMM 管理菜单 =====")
    for idx, (key, desc) in enumerate(ACTION_INFO.items(), start=1):
        print(f"{idx}. {key:14s} - {desc}")
    print("0. 退出")
    print("=========================")
def interactive_loop() -> None:
    """Run the interactive menu until the user chooses to quit.

    Each round prints the menu, reads a numeric choice, asks for
    confirmation, optionally collects arguments for the chosen script and
    then calls ``run_action``. The action order is derived from
    ``ACTION_INFO`` so it matches the table of known actions.

    A ``SystemExit`` raised by a sub-script is caught here so that the loop
    returns to the menu instead of terminating the whole manager.
    """
    key_order = list(ACTION_INFO)
    max_choice = len(key_order)
    # Actions whose common arguments can be collected through extra prompts.
    arg_builders = {
        "delete": _interactive_build_delete_args,
        "batch_inspect": _interactive_build_batch_inspect_args,
        "test": _interactive_build_test_args,
    }

    while True:
        print_menu()
        choice = input(f"请输入选项编号0-{max_choice}").strip()
        if choice in ("0", "q", "Q", "quit", "exit"):
            print("已退出 LPMM 管理器。")
            return
        try:
            idx = int(choice)
        except ValueError:
            print(f"输入无效,请输入 0-{max_choice} 之间的数字。")
            continue
        if not (1 <= idx <= max_choice):
            print("输入编号超出范围,请重新输入。")
            continue

        action = key_order[idx - 1]
        print(f"\n你选择了: {action} - {ACTION_INFO.get(action, '')}")
        confirm = input("确认执行该操作?(y/n): ").strip().lower()
        if confirm != "y":
            print("已取消当前操作。\n")
            continue

        # Help the user fill in the common arguments of the target script.
        builder = arg_builders.get(action)
        extra_args: List[str] = builder() if builder is not None else []

        try:
            run_action(action, extra_args=extra_args)
        except SystemExit as exc:
            # Sub-scripts use sys.exit for cancellations and errors; in the
            # interactive menu that should end the action, not the manager.
            print(f"[WARN] 子脚本已退出(退出码: {exc.code}),返回主菜单。")
        print("\n当前操作已结束,回到主菜单。\n")
def _interactive_choose_openie_file(prompt: str) -> Optional[str]:
    """Let the user pick an OpenIE JSON file from ``data/openie``.

    The ``*.json`` files in that directory are listed in sorted order. The
    user picks one by number, or enters 0 to type a path after ``prompt``.
    When the directory has no JSON files the path is asked for directly.

    Returns:
        The chosen path as a string, or None when a manually entered path
        is blank.
    """
    openie_dir = Path(PROJECT_ROOT) / "data" / "openie"
    candidates = sorted(openie_dir.glob("*.json"))

    def ask_manual_path() -> Optional[str]:
        # A blank answer is reported to the caller as None.
        return input(prompt).strip() or None

    if not candidates:
        print(f"[WARN] 在 {openie_dir} 下没有找到任何 OpenIE JSON 文件。")
        return ask_manual_path()

    print("\n可选的 OpenIE 批次文件:")
    for number, candidate in enumerate(candidates, start=1):
        print(f"{number}. {candidate.name}")
    print("0. 手动输入完整路径")

    while True:
        answer = input("请选择文件编号:").strip()
        if answer == "0":
            return ask_manual_path()
        try:
            number = int(answer)
        except ValueError:
            print("请输入合法的编号。")
            continue
        if number < 1 or number > len(candidates):
            print("编号超出范围,请重试。")
            continue
        return str(candidates[number - 1])
def _interactive_build_delete_args() -> List[str]:
    """Interactively build a command line for ``delete_lpmm_items``.

    The user first picks a deletion source (hash file, OpenIE batch, raw
    corpus file + indices, or keyword search). When no source is chosen, or
    the chosen source is left blank, an empty list is returned immediately
    rather than asking for flags that would produce an unusable command
    line. Otherwise the safety-related flags are asked for in turn.

    A ``--max-delete-nodes`` answer that is not an integer is ignored with a
    message, so it never reaches argparse (which would exit on it).

    Returns:
        The argument list to pass to the delete script; may be empty.
    """
    print(
        "\n[DELETE] 请选择删除方式:\n"
        "1. 按哈希文件删除 (--hash-file)\n"
        "2. 按 OpenIE 批次删除 (--openie-file)\n"
        "3. 按原始语料文件 + 段落索引删除 (--raw-file + --raw-index)\n"
        "4. 按关键字搜索现有段落 (--search-text)\n"
        "回车跳过,由子脚本自行交互。"
    )
    mode = input("输入选项编号1-4或回车跳过").strip()
    args: List[str] = []
    if mode == "1":
        path = input("请输入哈希文件路径(每行一个 hash").strip()
        if path:
            args += ["--hash-file", path]
    elif mode == "2":
        path = _interactive_choose_openie_file("请输入 OpenIE JSON 文件路径:")
        if path:
            args += ["--openie-file", path]
    elif mode == "3":
        raw_file = input("请输入原始语料 txt 文件路径:").strip()
        raw_index = input("请输入要删除的段落索引(如 1,3").strip()
        if raw_file and raw_index:
            args += ["--raw-file", raw_file, "--raw-index", raw_index]
    elif mode == "4":
        text = input("请输入用于搜索的关键字(出现在段落原文中):").strip()
        if text:
            args += ["--search-text", text]
    else:
        # Nothing selected: leave everything to the sub-script.
        return []

    if not args:
        # A mode was picked but its required input was left blank; flags
        # alone cannot form a valid delete command.
        print("[DELETE] 未提供有效的删除来源,已跳过参数构造。")
        return []

    # Explain the safety-related boolean options before asking about them.
    print(
        "\n[DELETE] 接下来是一些安全相关选项的说明:\n"
        "- 删除实体向量/节点:会一并清理与这些段落关联的实体节点及其向量;\n"
        "- 删除关系向量:在上面的基础上,额外清理关系向量(一般与删除实体一同使用);\n"
        "- 删除孤立实体节点:删除后若实体不再连接任何段落,将其从图中移除,避免残留孤点;\n"
        "- dry-run只预览将要删除的内容不真正修改任何数据\n"
        "- 跳过交互确认(--yes):直接执行删除操作,适合脚本化或已充分确认的场景;\n"
        "- 单次最大删除节点数上限:防止一次性删除规模过大,起到误操作保护作用;\n"
        "- 一般情况下建议同时删除实体向量/节点/关系向量/节点,以确保知识图谱的完整性。"
    )

    # Shortcut: apply the recommended clean-up of entities, relations and orphans.
    quick_all = input(
        "是否使用推荐策略:同时删除关联的实体向量/节点、关系向量,并清理孤立实体?(Y/n): "
    ).strip().lower()
    if quick_all in ("", "y", "yes"):
        args.extend(["--delete-entities", "--delete-relations", "--remove-orphan-entities"])
    else:
        # Ask item by item only when the shortcut was declined.
        if input("是否同时删除实体向量/节点?(y/N): ").strip().lower() == "y":
            args.append("--delete-entities")
        if input("是否同时删除关系向量?(y/N): ").strip().lower() == "y":
            args.append("--delete-relations")
        if input("是否删除孤立实体节点?(y/N): ").strip().lower() == "y":
            args.append("--remove-orphan-entities")

    if input("是否以 dry-run 预览而不真正删除?(y/N): ").strip().lower() == "y":
        args.append("--dry-run")
    else:
        if input("是否跳过交互确认直接删除?(默认否,请谨慎) (y/N): ").strip().lower() == "y":
            args.append("--yes")

    max_nodes = input("单次最大删除节点数上限(回车使用默认 2000").strip()
    if max_nodes:
        try:
            int(max_nodes)
        except ValueError:
            print("输入的上限不是有效整数,将使用默认值。")
        else:
            args += ["--max-delete-nodes", max_nodes]
    return args
def _interactive_build_batch_inspect_args() -> List[str]:
    """Build the ``--openie-file`` argument list for ``inspect_lpmm_batch``.

    The path is obtained through ``_interactive_choose_openie_file``.

    Returns:
        ``["--openie-file", <path>]`` when the chooser yields a non-empty
        path, otherwise an empty list (the prompt tells the user that the
        child script will then ask on its own).
    """
    chosen = _interactive_choose_openie_file(
        "请输入要检查的 OpenIE JSON 文件路径(回车跳过,由子脚本自行交互):"
    )
    return ["--openie-file", chosen] if chosen else []
def _interactive_build_test_args() -> List[str]:
    """Interactively build custom test-case arguments for ``test_lpmm_retrieval``.

    Asks for a question and, optionally, a comma-separated list of keywords
    that are expected to show up in the retrieval result.

    Returns:
        An empty list when no question is entered; otherwise
        ``["--query", <question>]`` followed by one
        ``"--expect-keyword", <keyword>`` pair per non-blank keyword.
    """
    print(
        "\n[TEST] 你可以:\n"
        "- 直接回车使用内置的默认测试用例;\n"
        "- 或者输入一条自定义问题,并指定期望命中的关键字。"
    )
    query = input("请输入自定义测试问题(回车则使用默认用例):").strip()
    if not query:
        return []
    expect = input("请输入期望命中的关键字(可选,多项用逗号分隔):").strip()
    args: List[str] = ["--query", query]
    # The prompt is Chinese, so users commonly type the full-width comma ",".
    # Normalise it to "," first; otherwise several keywords would be passed
    # on as one long keyword that can never be found in the result.
    for kw in expect.replace(",", ",").split(","):
        kw = kw.strip()
        if kw:  # also filters the single "" produced by an empty answer
            args.extend(["--expect-keyword", kw])
    return args
def _run_embedding_helper() -> None:
    """Guide the operator through an embedding-model switch and archive the test file.

    The helper prints the currently configured embedding model identifier and
    dimension, asks for the planned new dimension (only echoed back in the
    printed instructions), and requires an explicit ``y`` confirmation. It
    then renames the file at ``EMBEDDING_TEST_FILE`` to a sibling named
    ``embedding_model_test-<model>-<timestamp>[-<n>].json``. It neither edits
    configuration nor deletes knowledge-base data.
    """
    from src.chat.knowledge.embedding_store import EMBEDDING_TEST_FILE  # type: ignore

    # Step 1: read the embedding dimension and model identifier from the config.
    lpmm_section = getattr(global_config, "lpmm_knowledge", None)
    current_dim = getattr(lpmm_section, "embedding_dimension", None)
    embed_task = getattr(model_config.model_task_config, "embedding", None)
    model_ids: List[str] = []
    if embed_task is not None:
        model_ids = getattr(embed_task, "model_list", []) or []
    primary_model = "unknown" if not model_ids else model_ids[0]
    # Collapse every run of characters outside [0-9A-Za-z_.-] into "_" so the
    # model identifier can be embedded in a file name.
    safe_model_name = re.sub(r"[^0-9A-Za-z_.-]+", "_", primary_model) or "unknown"

    for banner_line in (
        "\n===== 嵌入模型迁移辅助 (embedding_helper) =====",
        f"- 当前嵌入模型标识model_task_config.embedding.model_list[0]: {primary_model}",
        f"- 当前配置中的嵌入维度 (lpmm_knowledge.embedding_dimension): {current_dim}",
        f"- 测试文件路径: {EMBEDDING_TEST_FILE}",
    ):
        print(banner_line)

    new_dim = input(
        "\n如果你计划更换嵌入模型,请在此输入“新的嵌入维度”(仅用于记录与提示,回车则跳过):"
    ).strip()
    # An empty answer is allowed; anything else must pass str.isdigit().
    dim_is_acceptable = (not new_dim) or new_dim.isdigit()
    if not dim_is_acceptable:
        print("输入的维度不是纯数字,已取消操作。")
        return

    print(
        "\n[重要提示]\n"
        "- 修改嵌入模型或维度会导致当前磁盘中的旧知识库data/embedding 下的向量)与新模型不兼容;\n"
        "- 这通常意味着你需要清空旧的向量/图数据,并重新执行 LPMM 导入流水线;\n"
        "- 请仅在你**确定要切换嵌入模型/维度**时再继续。\n"
    )
    answer = input("是否已充分评估风险,并准备切换嵌入模型/维度?(y/N): ").strip().lower()
    if answer != "y":
        print("已根据你的选择取消嵌入模型迁移辅助操作。")
        return

    # The entered dimension is only shown as an example in the instructions.
    example_hint = f"(例如 {new_dim}" if new_dim else ""
    print(
        "\n接下来请手动完成以下操作(脚本不会自动修改配置或删除知识库):\n"
        f"1. 在配置文件中,将 lpmm_knowledge.embedding_dimension 从 {current_dim} 修改为你计划使用的新维度"
        f"{example_hint}\n"
        "2. 根据需要,清空 data/embedding 与相关 KG 数据data/rag 等),然后重新执行导入流水线;\n"
        "3. 本脚本将帮助你归档当前的 embedding_model_test.json避免旧测试文件干扰新模型的校验。\n"
    )

    # Step 2: archive embedding_model_test.json by renaming it in place.
    test_path = Path(EMBEDDING_TEST_FILE)
    if not test_path.exists():
        print(f"\n[INFO] 未在 {test_path} 发现 embedding_model_test.json无需归档。")
        return

    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    archive_stem = f"embedding_model_test-{safe_model_name}-{stamp}"
    archive_path = test_path.with_name(f"{archive_stem}.json")
    # On a name clash, append -1, -2, ... rather than overwrite an older archive.
    attempt = 1
    while archive_path.exists():
        archive_path = test_path.with_name(f"{archive_stem}-{attempt}.json")
        attempt += 1

    try:
        test_path.rename(archive_path)
    except Exception as exc:  # pragma: no cover - defensive catch-all
        logger.error("归档 embedding_model_test.json 失败: %s", exc)
        print("[ERROR] 归档 embedding_model_test.json 失败,请检查文件权限与路径。错误详情已写入日志。")
        return

    print(
        f"\n[OK] 已将 {test_path.name} 重命名为 {archive_path.name}\n"
        f"- 归档位置: {archive_path}\n"
        "- 之后再次运行涉及嵌入模型的一致性校验时,将会基于当前配置与新模型生成新的测试文件。\n"
        "- 在完成配置修改与知识库重导入前,请不要手动再创建名为 embedding_model_test.json 的文件。"
    )
def parse_args(argv: Optional[list[str]] = None) -> tuple[argparse.Namespace, List[str]]:
    """Parse the manager's own options and collect arguments meant for a child script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        A ``(namespace, extra_args)`` pair. ``namespace`` carries
        ``interactive``, ``action`` and ``non_interactive``. ``extra_args``
        holds every token this parser did not recognise, without a leading
        ``--`` separator.
    """
    parser = argparse.ArgumentParser(
        description=(
            "LPMM 管理脚本:集中入口管理 LPMM 的导入 / 删除 / 自检 / 刷新 / 测试等功能。\n"
            "可以通过 --interactive 进入菜单模式,也可以使用 --action 直接执行单个操作。"
        )
    )
    parser.add_argument(
        "-i",
        "--interactive",
        action="store_true",
        help="进入交互式菜单模式(推荐给手动运维使用)",
    )
    parser.add_argument(
        "-a",
        "--action",
        choices=list(ACTION_INFO.keys()),
        help="直接执行指定操作(非交互模式)",
    )
    parser.add_argument(
        "--non-interactive",
        action="store_true",
        help=(
            "启用非交互模式lpmm_manager 自身不会再通过 input() 询问是否继续前置检查;"
            "并会将 --non-interactive 透传给子脚本,以避免子脚本中的交互式确认。"
        ),
    )
    # Child-script arguments may follow the manager's own, for example:
    #   python lpmm_manager.py -a delete -- --hash-file xxx --yes
    args, unknown = parser.parse_known_args(argv)
    # This parser has no positionals, so parse_known_args hands the "--"
    # separator back as the first unrecognised token. It separates manager
    # arguments from child arguments and is not itself a child argument, so it
    # is dropped here; a child argparse would otherwise treat everything after
    # it as positional.
    if unknown and unknown[0] == "--":
        unknown = unknown[1:]
    return args, unknown
def main(argv: Optional[list[str]] = None) -> None:
    """Entry point of the LPMM manager: open the menu or run a single action.

    Args:
        argv: Argument list forwarded to ``parse_args``; ``None`` means the
            process arguments.

    Exits with status 1 when ``--interactive`` and ``--non-interactive`` are
    combined.
    """
    args, extra_args = parse_args(argv)

    # The two modes are mutually exclusive.
    if args.interactive and args.non_interactive:
        logger.error("不能同时指定 --interactive 与 --non-interactive请二选一。")
        sys.exit(1)

    # No action given, or the menu was requested explicitly.
    wants_menu = args.interactive or not args.action
    if wants_menu:
        interactive_loop()
        return

    # Pass --non-interactive on so the child script can skip its own input() prompts.
    forwarded = ["--non-interactive", *extra_args] if args.non_interactive else extra_args
    run_action(args.action, extra_args=forwarded)
# Run the manager CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()

View File

@@ -1,9 +1,9 @@
import os
from pathlib import Path
import sys # 新增系统模块导入
from src.chat.knowledge.utils.hash import get_sha256
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.utils.hash import get_sha256
from src.common.logger import get_logger
logger = get_logger("lpmm")
@@ -59,10 +59,11 @@ def load_raw_data() -> tuple[list[str], list[str]]:
- raw_data: 原始数据列表
- sha256_list: 原始数据的SHA256集合
"""
raw_data = _process_multi_files()
raw_paragraphs = _process_multi_files()
sha256_list = []
sha256_set = set()
for item in raw_data:
raw_data: list[str] = []
for item in raw_paragraphs:
if not isinstance(item, str):
logger.warning(f"数据类型错误:{item}")
continue

View File

@@ -0,0 +1,66 @@
import os
import sys
# Best effort: switch stdout/stderr to UTF-8 on streams that offer
# reconfigure(); any failure is deliberately ignored.
try:
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass
# Make sure src.* can be imported: add the parent of this script's directory to sys.path.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import lpmm_start_up, get_qa_manager
logger = get_logger("refresh_lpmm_knowledge")
def main() -> None:
    """Reload the LPMM knowledge base in this process and report its size.

    Runs ``lpmm_start_up()``, fetches the QA manager, logs the number of
    paragraph / entity / relation vectors and KG nodes / edges, and prints
    follow-up hints. Logs an error and returns early when no QA manager is
    available after start-up.
    """
    logger.info("开始刷新 LPMM 知识库(重新加载向量库与 KG...")

    lpmm_enabled = global_config.lpmm_knowledge.enable
    if not lpmm_enabled:
        logger.warning(
            "当前配置中 lpmm_knowledge.enable = false本次仅刷新磁盘数据与内存结构"
            "但聊天侧如未启用 LPMM 仍不会在问答中使用知识库。"
        )

    # Standard start-up routine; per the original note it loads data/embedding and data/rag.
    lpmm_start_up()

    qa_manager = get_qa_manager()
    if qa_manager is None:
        logger.error("刷新后 qa_manager 仍为 None请检查是否已经成功导入过 LPMM 知识库。")
        return

    # Summarise the current knowledge-base size for a manual sanity check.
    stores = qa_manager.embed_manager
    kg = qa_manager.kg_manager
    sizes = (
        len(stores.paragraphs_embedding_store.store),
        len(stores.entities_embedding_store.store),
        len(stores.relation_embedding_store.store),
        len(kg.graph.get_node_list()),
        len(kg.graph.get_edge_list()),
    )

    logger.info("LPMM 知识库刷新完成,当前规模:")
    logger.info("段落向量=%d, 实体向量=%d, 关系向量=%d, KG节点=%d, KG边=%d", *sizes)

    for hint in (
        "\n[REFRESH] 刷新完成,请注意:",
        "- 本脚本是在独立进程内执行的,用于验证磁盘数据可以正常加载。",
        "- 若主程序已在运行且未在内部调用 lpmm_start_up() 重新初始化,仍需重启或新增管理入口来热刷新。",
        "- 如果不清楚 lpmm_start_up 是什么,只需要重启主程序即可。",
    ):
        print(hint)
# Run the refresh only when this file is executed as a script.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,122 @@
import argparse
import asyncio
import os
import sys
from typing import List, Dict, Any, Optional
# Force UTF-8 on stdout/stderr so console encoding errors do not interfere
# with Embedding loading. Best effort: failures are deliberately ignored.
try:
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass
# Make sure src.* can be imported: add the parent of this script's directory to sys.path.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import lpmm_start_up
from src.memory_system.retrieval_tools.query_lpmm_knowledge import query_lpmm_knowledge
logger = get_logger("test_lpmm_retrieval")
# Built-in retrieval test cases, used by run_tests() when no cases are passed.
# Each entry has:
#   "name"            - label printed for the case
#   "query"           - question sent to query_lpmm_knowledge
#   "expect_keywords" - keywords looked for in the returned text
# NOTE(review): the first case's name and expected keywords are about rolling
# back a batch of knowledge, but its query asks what LPMM is - confirm that
# this pairing is intended.
DEFAULT_TEST_CASES: List[Dict[str, Any]] = [
    {
        "name": "回滚一批知识",
        "query": "LPMM是什么?",
        "expect_keywords": ["哈希列表", "删除脚本", "OpenIE"],
    },
    {
        "name": "调整 LPMM 检索参数",
        "query": "不同用词习惯带来的检索偏差该如何解决",
        "expect_keywords": ["bot_config.toml", "lpmm_knowledge", "qa_paragraph_search_top_k"],
    },
]
async def run_tests(test_cases: Optional[List[Dict[str, Any]]] = None) -> None:
    """Run simple retrieval checks against the LPMM knowledge base.

    Args:
        test_cases: Cases to run, each a dict with ``name``, ``query`` and an
            optional ``expect_keywords`` list. ``None`` selects
            ``DEFAULT_TEST_CASES``.

    For every case the raw result is printed together with a status:
    ``ERROR`` (result text reports disabled / uninitialised / failed query),
    ``NO_HIT`` (result text reports nothing found), ``PASS`` (no keywords
    expected, or at least one expected keyword present), ``WARN`` (keywords
    expected but none present), or ``UNKNOWN`` (result is not a string).
    """
    if not global_config.lpmm_knowledge.enable:
        logger.warning("当前配置中 lpmm_knowledge.enable 为 False检索测试可能直接返回“未启用”。")

    logger.info("开始初始化 LPMM 知识库...")
    lpmm_start_up()
    logger.info("LPMM 知识库初始化完成,开始执行测试用例。")

    separator = "\n" + "=" * 60
    selected_cases = DEFAULT_TEST_CASES if test_cases is None else test_cases

    for case in selected_cases:
        name = case["name"]
        query = case["query"]
        expect_keywords: List[str] = case.get("expect_keywords", [])

        print(separator)
        print(f"[TEST] {name}")
        print(f"[Q] {query}")

        result = await query_lpmm_knowledge(query, limit=3)

        print("\n[RAW RESULT]")
        print(result)

        # Classify the result; the first matching rule wins.
        hit_keywords: List[str] = []
        if not isinstance(result, str):
            status = "UNKNOWN"
        elif any(marker in result for marker in ("未启用", "未初始化", "查询失败")):
            status = "ERROR"
        elif "未找到与" in result:
            status = "NO_HIT"
        elif not expect_keywords:
            status = "PASS"
        else:
            hit_keywords = [kw for kw in expect_keywords if kw in result]
            status = "PASS" if hit_keywords else "WARN"

        print("\n[CHECK]")
        print(f"Status: {status}")
        if expect_keywords:
            print(f"Expected keywords: {expect_keywords}")
            print(f"Hit keywords: {hit_keywords}")

    print(separator)
    print("LPMM 检索测试完成。请根据每条用例的 Status 和命中关键词判断检索效果是否符合预期。")
def main() -> None:
    """Command-line entry point of the LPMM retrieval test.

    With ``--query`` a single custom case (named ``custom``) is run, using any
    ``--expect-keyword`` values as its expected keywords. Without ``--query``
    the built-in default cases are run.
    """
    parser = argparse.ArgumentParser(
        description=(
            "测试 LPMM 知识库检索能力。\n"
            "如不提供参数,则执行内置的默认用例;\n"
            "也可以通过 --query 与 --expect-keyword 自定义一条测试用例。"
        )
    )
    parser.add_argument(
        "--query",
        help="自定义测试问题(单条)。提供该参数时,将仅运行这一条用例。",
    )
    parser.add_argument(
        "--expect-keyword",
        action="append",
        help="期望在检索结果中出现的关键字,可重复多次指定;仅在提供 --query 时生效。",
    )
    options = parser.parse_args()

    # None makes run_tests fall back to its built-in default cases.
    cases = None
    if options.query:
        cases = [
            {
                "name": "custom",
                "query": options.query,
                "expect_keywords": options.expect_keyword or [],
            }
        ]
    asyncio.run(run_tests(cases))
# Run the retrieval test CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()