LPMM knowledge base: deletion capability and self-check scripts (with key robustness fixes)

Adds safe, controlled deletion to LPMM:
- KGManager.delete_paragraphs deletes graph nodes and their associated edges by paragraph/entity hash, optionally cleans up orphaned entities, and rebuilds metadata from the graph
- Unified deletion script scripts/delete_lpmm_items.py supports deletion by batch (OpenIE file), hash file, raw text paragraph, or keyword search, with built-in dry-run and a maximum-node-count guard

New self-check and regression scripts:
- scripts/inspect_lpmm_batch.py / scripts/inspect_lpmm_global.py for batch-level and global state checks
- scripts/test_lpmm_retrieval.py initializes LPMM in one step and tests retrieval quality with fixed questions

Robustness and performance guards:
- KGManager.kg_search now falls back safely when ent_appear_cnt lacks an entity, avoiding a KeyError during entity weight computation
- Added a synonym-entity count limit and PPR node/relation thresholds, automatically falling back to pure vector retrieval when necessary

Documentation:
- docs-src/lpmm_user_guide.md: import / delete / self-check script guide aimed at first-time users
- docs-src/lpmm_parameters_guide.md: key [lpmm_knowledge] parameters explained, with basic tuning advice
陈曦
2025-11-27 13:20:12 +08:00
parent fa4555197d
commit 1383caf249
9 changed files with 1376 additions and 5 deletions
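
For orientation, the end-to-end deletion flow the new script wraps looks roughly like this (a minimal sketch assembled from the code below; the paragraph hash is a hypothetical placeholder):

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))  # as in the scripts below

from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager

pg_hashes = ["<paragraph sha256>"]  # hypothetical hashes to remove

em, kg = EmbeddingManager(), KGManager()
em.load_from_file()
kg.load_from_file()

# Vector side: paragraph embeddings are keyed "paragraph-<hash>"
em.paragraphs_embedding_store.delete_items([f"paragraph-{h}" for h in pg_hashes])
em.stored_pg_hashes = set(em.paragraphs_embedding_store.store.keys())

# Graph side: drop paragraph nodes plus their edges; optionally clean orphans
kg.delete_paragraphs(pg_hashes, remove_orphan_entities=True)

# Persist: rebuild the Faiss index before saving
em.rebuild_faiss_index()
em.save_to_file()
kg.save_to_file()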

scripts/delete_lpmm_items.py

@@ -0,0 +1,360 @@
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple

# Force UTF-8 to avoid console encoding errors
try:
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

# Make sure the src package can be found
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.chat.knowledge.utils.hash import get_sha256
from src.common.logger import get_logger

logger = get_logger("delete_lpmm_items")

def read_hashes(file_path: Path) -> List[str]:
    """Read a hash list, skipping empty lines."""
    hashes: List[str] = []
    for line in file_path.read_text(encoding="utf-8").splitlines():
        val = line.strip()
        if not val:
            continue
        hashes.append(val)
    return hashes


def read_openie_hashes(file_path: Path) -> List[str]:
    """Extract each doc's idx from an OpenIE JSON file as the paragraph hash."""
    data: Dict[str, Any] = json.loads(file_path.read_text(encoding="utf-8"))
    docs = data.get("docs", []) if isinstance(data, dict) else []
    hashes: List[str] = []
    for doc in docs:
        idx = doc.get("idx") if isinstance(doc, dict) else None
        if isinstance(idx, str) and idx.strip():
            hashes.append(idx.strip())
    return hashes


def normalize_paragraph_keys(raw_hashes: List[str]) -> Tuple[List[str], List[str]]:
    """Normalize input into two lists: full store keys and bare hashes."""
    keys: List[str] = []
    hashes: List[str] = []
    for h in raw_hashes:
        if h.startswith("paragraph-"):
            keys.append(h)
            hashes.append(h.replace("paragraph-", "", 1))
        else:
            keys.append(f"paragraph-{h}")
            hashes.append(h)
    return keys, hashes
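
# Illustrative example: inputs with or without the "paragraph-" prefix
# normalize to the same (store key, bare hash) pair:
#   normalize_paragraph_keys(["abc", "paragraph-def"])
#   -> (["paragraph-abc", "paragraph-def"], ["abc", "def"])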

def main():
    parser = argparse.ArgumentParser(description="Delete paragraphs from the LPMM knowledge base (vectors + graph).")
    parser.add_argument("--hash-file", help="Path to a text file with one paragraph hash (or prefixed key) per line")
    parser.add_argument("--openie-file", help="OpenIE output file (JSON); its docs.idx values are taken as paragraph hashes to delete")
    parser.add_argument("--raw-file", help="Raw txt corpus file (split into paragraphs by blank lines); combine with --raw-index")
    parser.add_argument(
        "--raw-index",
        help="Paragraph indices to delete within --raw-file (1-based, comma separated, e.g. 1,3)",
    )
    parser.add_argument("--search-text", help="Search the current paragraph store by substring and interactively pick paragraphs to delete")
    parser.add_argument(
        "--search-limit",
        type=int,
        default=10,
        help="Maximum number of candidate paragraphs shown in --search-text mode",
    )
    parser.add_argument("--delete-entities", action="store_true", help="Also delete the entity nodes/embeddings from the OpenIE file")
    parser.add_argument("--delete-relations", action="store_true", help="Also delete the relation embeddings from the OpenIE file")
    parser.add_argument("--remove-orphan-entities", action="store_true", help="Remove entity nodes left orphaned by the deletion")
    parser.add_argument("--dry-run", action="store_true", help="Preview what would be deleted without modifying anything")
    parser.add_argument("--yes", action="store_true", help="Skip the interactive confirmation and delete immediately (use with care)")
    parser.add_argument(
        "--max-delete-nodes",
        type=int,
        default=2000,
        help="Maximum number of nodes (paragraphs + entities) allowed per run; exceeding it requires explicit confirmation or a higher limit",
    )
    args = parser.parse_args()

    # At least one source is required
    if not (args.hash_file or args.openie_file or args.raw_file or args.search_text):
        logger.error("One of --hash-file / --openie-file / --raw-file / --search-text is required")
        sys.exit(1)
    raw_hashes: List[str] = []
    raw_entities: List[str] = []
    raw_relations: List[str] = []

    if args.hash_file:
        hash_file = Path(args.hash_file)
        if not hash_file.exists():
            logger.error(f"Hash file does not exist: {hash_file}")
            sys.exit(1)
        raw_hashes.extend(read_hashes(hash_file))

    if args.openie_file:
        openie_path = Path(args.openie_file)
        if not openie_path.exists():
            logger.error(f"OpenIE file does not exist: {openie_path}")
            sys.exit(1)
        # Paragraphs
        raw_hashes.extend(read_openie_hashes(openie_path))
        # Entities/relations (entities include both extracted_entities and the
        # triple subjects/objects, to match how the KG is built)
        try:
            data = json.loads(openie_path.read_text(encoding="utf-8"))
            docs = data.get("docs", []) if isinstance(data, dict) else []
            for doc in docs:
                if not isinstance(doc, dict):
                    continue
                ents = doc.get("extracted_entities", [])
                if isinstance(ents, list):
                    raw_entities.extend([e for e in ents if isinstance(e, str)])
                triples = doc.get("extracted_triples", [])
                if isinstance(triples, list):
                    for t in triples:
                        if isinstance(t, list) and len(t) == 3:
                            subj, _, obj = t
                            if isinstance(subj, str):
                                raw_entities.append(subj)
                            if isinstance(obj, str):
                                raw_entities.append(obj)
                            raw_relations.append(str(tuple(t)))
        except Exception as e:
            logger.error(f"Failed to read the OpenIE file: {e}")
            sys.exit(1)
    # Pick paragraphs to delete from a raw txt corpus by index
    if args.raw_file:
        raw_path = Path(args.raw_file)
        if not raw_path.exists():
            logger.error(f"Raw corpus file does not exist: {raw_path}")
            sys.exit(1)
        text = raw_path.read_text(encoding="utf-8")
        paragraphs: List[str] = []
        buf = []
        for line in text.splitlines():
            if line.strip() == "":
                if buf:
                    paragraphs.append("\n".join(buf).strip())
                    buf = []
            else:
                buf.append(line)
        if buf:
            paragraphs.append("\n".join(buf).strip())
        if not paragraphs:
            logger.error(f"No paragraphs could be parsed from raw corpus file {raw_path}")
            sys.exit(1)
        if not args.raw_index:
            logger.info(f"{raw_path} parsed into {len(paragraphs)} paragraphs; specify the ones to delete with --raw-index, e.g. --raw-index 1,3")
            sys.exit(1)
        # Parse the index list (1-based)
        try:
            idx_list = [int(x.strip()) for x in str(args.raw_index).split(",") if x.strip()]
        except ValueError:
            logger.error(f"Failed to parse --raw-index: {args.raw_index}")
            sys.exit(1)
        for idx in idx_list:
            if idx < 1 or idx > len(paragraphs):
                logger.error(f"--raw-index contains an invalid index {idx} (valid range 1~{len(paragraphs)})")
                sys.exit(1)
        logger.info("Paragraphs selected from the raw corpus:")
        for idx in idx_list:
            para = paragraphs[idx - 1]
            h = get_sha256(para)
            logger.info(f"- #{idx}, hash={h}, preview: {para[:80]}")
            raw_hashes.append(h)
    # Search the existing store by substring and interactively select candidates
    if args.search_text:
        search_text = args.search_text.strip()
        if not search_text:
            logger.error("--search-text must not be empty")
            sys.exit(1)
        logger.info(f"Searching the existing paragraph store for keyword: {search_text!r}")
        em_search = EmbeddingManager()
        try:
            em_search.load_from_file()
        except Exception as e:
            logger.error(f"Failed to load the embedding store; --search-text is unavailable: {e}")
            sys.exit(1)
        candidates = []
        for key, item in em_search.paragraphs_embedding_store.store.items():
            if search_text in item.str:
                candidates.append((key, item.str))
                if len(candidates) >= args.search_limit:
                    break
        if not candidates:
            logger.info("No paragraphs containing that keyword were found in the store")
        else:
            logger.info("Candidate paragraphs (enter the numbers of the entries to delete, comma separated for multiple):")
            for i, (key, preview) in enumerate(candidates, start=1):
                logger.info(f"{i}. {key} | {preview[:80]}")
            choice = input("Numbers to delete (e.g. 1,3), or press Enter to cancel: ").strip()
            if choice:
                try:
                    idxs = [int(x.strip()) for x in choice.split(",") if x.strip()]
                except ValueError:
                    logger.error("Could not parse the number list; --search-text deletion cancelled")
                else:
                    for i in idxs:
                        if 1 <= i <= len(candidates):
                            key, _ = candidates[i - 1]
                            # key is already a full paragraph-xxx store key
                            if key.startswith("paragraph-"):
                                raw_hashes.append(key.split("paragraph-", 1)[1])
                        else:
                            logger.warning(f"Ignoring invalid number: {i}")

    # Deduplicate while preserving order
    seen = set()
    raw_hashes = [h for h in raw_hashes if not (h in seen or seen.add(h))]
    if not raw_hashes:
        logger.error("No hashes to delete were collected; nothing to do")
        sys.exit(1)

    keys, pg_hashes = normalize_paragraph_keys(raw_hashes)
    ent_hashes: List[str] = []
    rel_hashes: List[str] = []
    if args.delete_entities and raw_entities:
        ent_hashes = [get_sha256(e) for e in raw_entities]
    if args.delete_relations and raw_relations:
        rel_hashes = [get_sha256(r) for r in raw_relations]

    logger.info("=== Deletion preflight ===")
    logger.info("Make sure data/embedding and data/rag are backed up; use --dry-run to preview if needed")
    logger.info(f"Paragraphs to delete: {len(keys)}")
    logger.info(f"Examples: {keys[:5]}")
    if ent_hashes:
        logger.info(f"Entities to delete: {len(ent_hashes)}")
    if rel_hashes:
        logger.info(f"Relations to delete: {len(rel_hashes)}")
    total_nodes_to_delete = len(pg_hashes) + (len(ent_hashes) if args.delete_entities else 0)
    logger.info(f"Total nodes to delete this run (paragraphs + entities): {total_nodes_to_delete}")

    if args.dry_run:
        logger.info("dry-run mode; nothing was deleted")
        return

    # Guard against oversized batch deletions
    if total_nodes_to_delete > args.max_delete_nodes and not args.yes:
        logger.error(
            f"This run would delete {total_nodes_to_delete} nodes, exceeding the threshold of {args.max_delete_nodes}."
            " To avoid accidental mass deletion, reduce the batch size or raise --max-delete-nodes, and confirm explicitly with --yes."
        )
        sys.exit(1)

    # Interactive confirmation
    if not args.yes:
        confirm = input("Delete the data listed above? Type YES (uppercase) to continue, anything else to cancel: ").strip()
        if confirm != "YES":
            logger.info("Deletion cancelled by user")
            return
    # Load embeddings and graph
    embed_manager = EmbeddingManager()
    kg_manager = KGManager()
    try:
        embed_manager.load_from_file()
        kg_manager.load_from_file()
    except Exception as e:
        logger.error(f"Failed to load the existing knowledge base: {e}")
        sys.exit(1)

    # Record global stats before deletion, for comparison
    before_para_vec = len(embed_manager.paragraphs_embedding_store.store)
    before_ent_vec = len(embed_manager.entities_embedding_store.store)
    before_rel_vec = len(embed_manager.relation_embedding_store.store)
    before_nodes = len(kg_manager.graph.get_node_list())
    before_edges = len(kg_manager.graph.get_edge_list())
    logger.info(
        f"Before deletion: paragraph vectors={before_para_vec}, entity vectors={before_ent_vec}, relation vectors={before_rel_vec}, "
        f"KG nodes={before_nodes}, KG edges={before_edges}"
    )

    # Delete vectors
    deleted, skipped = embed_manager.paragraphs_embedding_store.delete_items(keys)
    embed_manager.stored_pg_hashes = set(embed_manager.paragraphs_embedding_store.store.keys())
    logger.info(f"Paragraph vectors done; deleted: {deleted}, skipped: {skipped}")

    ent_deleted = ent_skipped = rel_deleted = rel_skipped = 0
    if ent_hashes:
        ent_keys = [f"entity-{h}" for h in ent_hashes]
        ent_deleted, ent_skipped = embed_manager.entities_embedding_store.delete_items(ent_keys)
        logger.info(f"Entity vectors done; deleted: {ent_deleted}, skipped: {ent_skipped}")
    if rel_hashes:
        rel_keys = [f"relation-{h}" for h in rel_hashes]
        rel_deleted, rel_skipped = embed_manager.relation_embedding_store.delete_items(rel_keys)
        logger.info(f"Relation vectors done; deleted: {rel_deleted}, skipped: {rel_skipped}")

    # Delete graph nodes/edges
    kg_result = kg_manager.delete_paragraphs(
        pg_hashes,
        ent_hashes=ent_hashes if args.delete_entities else None,
        remove_orphan_entities=args.remove_orphan_entities,
    )
    logger.info(
        f"KG done; deleted: {kg_result.get('deleted', 0)}, skipped: {kg_result.get('skipped', 0)}, "
        f"orphan entities removed: {kg_result.get('orphan_removed', 0)}"
    )

    # Rebuild the index and save
    logger.info("Rebuilding the Faiss index and saving embedding files...")
    embed_manager.rebuild_faiss_index()
    embed_manager.save_to_file()
    logger.info("Saving KG data...")
    kg_manager.save_to_file()

    # Stats after deletion
    after_para_vec = len(embed_manager.paragraphs_embedding_store.store)
    after_ent_vec = len(embed_manager.entities_embedding_store.store)
    after_rel_vec = len(embed_manager.relation_embedding_store.store)
    after_nodes = len(kg_manager.graph.get_node_list())
    after_edges = len(kg_manager.graph.get_edge_list())
    logger.info(
        "After deletion: paragraph vectors=%d(%+d), entity vectors=%d(%+d), relation vectors=%d(%+d), KG nodes=%d(%+d), KG edges=%d(%+d)"
        % (
            after_para_vec,
            after_para_vec - before_para_vec,
            after_ent_vec,
            after_ent_vec - before_ent_vec,
            after_rel_vec,
            after_rel_vec - before_rel_vec,
            after_nodes,
            after_nodes - before_nodes,
            after_edges,
            after_edges - before_edges,
        )
    )
    logger.info("Deletion finished")


if __name__ == "__main__":
    main()
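
To drive --hash-file, each line must be the SHA-256 of the exact paragraph text that was imported; a small helper sketch (the paragraph text and output path are hypothetical):

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))  # as in the script above

from pathlib import Path
from src.chat.knowledge.utils.hash import get_sha256

paragraph = "the exact paragraph text that was imported"  # hypothetical
Path("to_delete.txt").write_text(get_sha256(paragraph) + "\n", encoding="utf-8")
# then preview with:
#   python scripts/delete_lpmm_items.py --hash-file to_delete.txt --dry-run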


@@ -131,6 +131,13 @@ def main():  # sourcery skip: comprehension-to-generator, extract-method
        logger.info("Cancelled by user")
        print("Operation cancelled")
        sys.exit(1)

    # Friendly note: "网络错误(可重试)" ("network error, retryable") log lines
    # during extraction are normal automatic retries, not a failed import
    print(
        "\nNote: if the model logs lines like “网络错误(可重试)” during extraction, "
        "the system is retrying the request automatically; this normally does not "
        "affect the overall import. Please be patient.\n"
    )

    print("\n" + "=" * 40 + "\n")
    ensure_dirs()  # make sure the required directories exist
    logger.info("--------Running information extraction--------\n")

scripts/inspect_lpmm_batch.py

@@ -0,0 +1,132 @@
import argparse
import json
import os
import sys
from pathlib import Path
from typing import List, Tuple

# Make sure src.* is importable
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.chat.knowledge.utils.hash import get_sha256
from src.common.logger import get_logger

logger = get_logger("inspect_lpmm_batch")

def load_openie_hashes(path: Path) -> Tuple[List[str], List[str], List[str]]:
    """Extract paragraph / entity / relation hashes from an OpenIE JSON file.

    Note: entities include both the extracted_entities items and the triple
    subjects/objects, to stay consistent with how the KG is built.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    pg_hashes: List[str] = []
    ent_hashes: List[str] = []
    rel_hashes: List[str] = []
    for doc in data.get("docs", []):
        if not isinstance(doc, dict):
            continue
        idx = doc.get("idx")
        if isinstance(idx, str) and idx.strip():
            pg_hashes.append(idx.strip())
        ents = doc.get("extracted_entities", [])
        if isinstance(ents, list):
            for e in ents:
                if isinstance(e, str):
                    ent_hashes.append(get_sha256(e))
        triples = doc.get("extracted_triples", [])
        if isinstance(triples, list):
            for t in triples:
                if isinstance(t, list) and len(t) == 3:
                    # Subjects/objects participate in the graph as entities
                    subj, _, obj = t
                    if isinstance(subj, str):
                        ent_hashes.append(get_sha256(subj))
                    if isinstance(obj, str):
                        ent_hashes.append(get_sha256(obj))
                    rel_hashes.append(get_sha256(str(tuple(t))))

    # Deduplicate while preserving order
    def unique(seq: List[str]) -> List[str]:
        seen = set()
        return [x for x in seq if not (x in seen or seen.add(x))]

    return unique(pg_hashes), unique(ent_hashes), unique(rel_hashes)
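
# Illustrative input/output (hypothetical values): given one OpenIE doc
#   {"idx": "<paragraph sha256>",
#    "extracted_entities": ["MaiBot"],
#    "extracted_triples": [["MaiBot", "使用", "LPMM"]]}
# the function returns:
#   pg_hashes  = ["<paragraph sha256>"]                   # idx is used verbatim
#   ent_hashes = sha256 of "MaiBot" and "LPMM"            # entities + subj/obj
#   rel_hashes = [sha256(str(("MaiBot", "使用", "LPMM")))]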

def main() -> None:
    parser = argparse.ArgumentParser(
        description="Check whether the batch in a given OpenIE file still exists in the current vector stores and KG (to verify deletions)."
    )
    parser.add_argument("--openie-file", required=True, help="Path to an OpenIE output JSON file")
    args = parser.parse_args()

    openie_path = Path(args.openie_file)
    if not openie_path.exists():
        logger.error(f"OpenIE file does not exist: {openie_path}")
        sys.exit(1)

    pg_hashes, ent_hashes, rel_hashes = load_openie_hashes(openie_path)
    logger.info(
        f"{openie_path.name} parsed: {len(pg_hashes)} paragraphs, {len(ent_hashes)} entities, {len(rel_hashes)} relations"
    )

    # Load the current embeddings and KG
    em = EmbeddingManager()
    kg = KGManager()
    try:
        em.load_from_file()
        kg.load_from_file()
    except Exception as e:
        logger.error(f"Failed to load the current knowledge base: {e}")
        sys.exit(1)

    graph_nodes = set(kg.graph.get_node_list())

    # Paragraphs
    pg_keys = [f"paragraph-{h}" for h in pg_hashes]
    pg_in_vec = sum(1 for k in pg_keys if k in em.paragraphs_embedding_store.store)
    pg_in_kg = sum(1 for k in pg_keys if k in graph_nodes)
    # Entities
    ent_keys = [f"entity-{h}" for h in ent_hashes]
    ent_in_vec = sum(1 for k in ent_keys if k in em.entities_embedding_store.store)
    ent_in_kg = sum(1 for k in ent_keys if k in graph_nodes)
    # Relations (vector store only)
    rel_keys = [f"relation-{h}" for h in rel_hashes]
    rel_in_vec = sum(1 for k in rel_keys if k in em.relation_embedding_store.store)

    print("==== Batch presence (for before/after deletion comparison) ====")
    print(f"Paragraphs: total {len(pg_keys)}, remaining in vector store {pg_in_vec}, remaining in KG {pg_in_kg}")
    print(f"Entities: total {len(ent_keys)}, remaining in vector store {ent_in_vec}, remaining in KG {ent_in_kg}")
    print(f"Relations: total {len(rel_keys)}, remaining in vector store {rel_in_vec}")

    # Print a few surviving samples so the content can be sanity-checked
    sample_pg = [k for k in pg_keys if k in graph_nodes][:3]
    if sample_pg:
        print("\nSample paragraph nodes still in the KG:")
        for k in sample_pg:
            nd = kg.graph[k]
            content = nd["content"] if "content" in nd else k
            print(f"- {k}: {content[:80]}")
    sample_ent = [k for k in ent_keys if k in graph_nodes][:3]
    if sample_ent:
        print("\nSample entity nodes still in the KG:")
        for k in sample_ent:
            nd = kg.graph[k]
            content = nd["content"] if "content" in nd else k
            print(f"- {k}: {content[:80]}")


if __name__ == "__main__":
    main()

scripts/inspect_lpmm_global.py

@@ -0,0 +1,71 @@
import os
import sys

# Make sure src.* is importable
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from src.chat.knowledge.embedding_store import EmbeddingManager
from src.chat.knowledge.kg_manager import KGManager
from src.common.logger import get_logger

logger = get_logger("inspect_lpmm_global")


def main() -> None:
    """Inspect the vector stores and KG across the whole base (all batches), to see how a deletion affected the remaining data."""
    em = EmbeddingManager()
    kg = KGManager()
    try:
        em.load_from_file()
        kg.load_from_file()
    except Exception as e:
        logger.error(f"Failed to load the current knowledge base: {e}")
        sys.exit(1)

    # Vector store stats
    para_cnt = len(em.paragraphs_embedding_store.store)
    ent_cnt_vec = len(em.entities_embedding_store.store)
    rel_cnt_vec = len(em.relation_embedding_store.store)

    # KG stats
    nodes = kg.graph.get_node_list()
    edges = kg.graph.get_edge_list()
    para_nodes = [n for n in nodes if n.startswith("paragraph-")]
    ent_nodes = [n for n in nodes if n.startswith("entity-")]

    print("==== Vector store stats ====")
    print(f"Paragraph vectors: {para_cnt}")
    print(f"Entity vectors: {ent_cnt_vec}")
    print(f"Relation vectors: {rel_cnt_vec}")
    print("\n==== KG stats ====")
    print(f"Total nodes: {len(nodes)}")
    print(f"Total edges: {len(edges)}")
    print(f"Paragraph nodes: {len(para_nodes)}")
    print(f"Entity nodes: {len(ent_nodes)}")

    # ent_appear_cnt state
    ent_cnt_meta = len(kg.ent_appear_cnt)
    print(f"\nEntity appearance-count entries: {ent_cnt_meta}")

    # Sample some remaining paragraph/entity contents
    print("\n==== Sample remaining paragraphs (up to 3) ====")
    for nid in para_nodes[:3]:
        nd = kg.graph[nid]
        content = nd["content"] if "content" in nd else nid
        print(f"- {nid}: {content[:80]}")
    print("\n==== Sample remaining entities (up to 5) ====")
    for nid in ent_nodes[:5]:
        nd = kg.graph[nid]
        content = nd["content"] if "content" in nd else nid
        print(f"- {nid}: {content[:80]}")


if __name__ == "__main__":
    main()


@@ -1,9 +1,9 @@
import os
from pathlib import Path
import sys  # added: sys module import
from src.chat.knowledge.utils.hash import get_sha256
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.chat.knowledge.utils.hash import get_sha256
from src.common.logger import get_logger

logger = get_logger("lpmm")
@@ -59,10 +59,11 @@ def load_raw_data() -> tuple[list[str], list[str]]:
    - raw_data: the list of raw paragraphs
    - sha256_list: SHA-256 hashes of the raw data
    """
    raw_data = _process_multi_files()
    raw_paragraphs = _process_multi_files()
    sha256_list = []
    sha256_set = set()
    for item in raw_data:
    raw_data: list[str] = []
    for item in raw_paragraphs:
        if not isinstance(item, str):
            logger.warning(f"Invalid data type: {item}")
            continue
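
Condensed, the fix separates the input list from the filtered accumulator so they no longer share the name raw_data (a sketch; the elided loop body is assumed to append valid items and their hashes, deduplicating via sha256_set):

raw_paragraphs = _process_multi_files()  # source list, read-only from here on
raw_data: list[str] = []                 # filtered output, now a distinct name
sha256_list: list[str] = []
sha256_set: set[str] = set()
for item in raw_paragraphs:
    if not isinstance(item, str):
        continue  # skip malformed entries
    h = get_sha256(item)
    if h in sha256_set:
        continue  # assumed: repeated paragraphs are skipped
    sha256_set.add(h)
    raw_data.append(item)
    sha256_list.append(h)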

scripts/test_lpmm_retrieval.py

@@ -0,0 +1,93 @@
import asyncio
import os
import sys
from typing import Any, Dict, List

# Force UTF-8 so console encoding errors do not break embedding loading
try:
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")
except Exception:
    pass

# Make sure src.* is importable
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import lpmm_start_up
from src.memory_system.retrieval_tools.query_lpmm_knowledge import query_lpmm_knowledge

logger = get_logger("test_lpmm_retrieval")

# Queries and expected keywords stay in Chinese: they must match the (Chinese)
# content actually stored in the knowledge base.
TEST_CASES: List[Dict[str, Any]] = [
    {
        "name": "roll back a batch of knowledge",
        "query": "LPMM是什么?",
        "expect_keywords": ["哈希列表", "删除脚本", "OpenIE"],
    },
    {
        "name": "tune LPMM retrieval parameters",
        "query": "不同用词习惯带来的检索偏差该如何解决",
        "expect_keywords": ["bot_config.toml", "lpmm_knowledge", "qa_paragraph_search_top_k"],
    },
]
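
# New cases can be appended in the same shape (hypothetical example);
# expect_keywords should be substrings of paragraphs actually stored in the KB:
#   TEST_CASES.append({
#       "name": "delete a batch of imported knowledge",
#       "query": "怎么删除一批已导入的知识?",
#       "expect_keywords": ["delete_lpmm_items"],
#   })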

async def run_tests() -> None:
    """A simple smoke test of LPMM knowledge retrieval."""
    if not global_config.lpmm_knowledge.enable:
        logger.warning("lpmm_knowledge.enable is False in the current config; retrieval tests will likely just return 'not enabled'.")
    logger.info("Initializing the LPMM knowledge base...")
    lpmm_start_up()
    logger.info("LPMM knowledge base initialized; running test cases.")

    for case in TEST_CASES:
        name = case["name"]
        query = case["query"]
        expect_keywords: List[str] = case.get("expect_keywords", [])
        print("\n" + "=" * 60)
        print(f"[TEST] {name}")
        print(f"[Q] {query}")
        result = await query_lpmm_knowledge(query, limit=3)
        print("\n[RAW RESULT]")
        print(result)

        status = "UNKNOWN"
        hit_keywords: List[str] = []
        if isinstance(result, str):
            # These markers match the Chinese status strings returned by
            # query_lpmm_knowledge ("not enabled" / "not initialized" /
            # "query failed" / "nothing found for ...")
            if "未启用" in result or "未初始化" in result or "查询失败" in result:
                status = "ERROR"
            elif "未找到与" in result:
                status = "NO_HIT"
            else:
                if expect_keywords:
                    hit_keywords = [kw for kw in expect_keywords if kw in result]
                    status = "PASS" if hit_keywords else "WARN"
                else:
                    status = "PASS"

        print("\n[CHECK]")
        print(f"Status: {status}")
        if expect_keywords:
            print(f"Expected keywords: {expect_keywords}")
            print(f"Hit keywords: {hit_keywords}")

    print("\n" + "=" * 60)
    print("LPMM retrieval tests finished. Judge retrieval quality from each case's Status and hit keywords.")


def main() -> None:
    asyncio.run(run_tests())


if __name__ == "__main__":
    main()