fix: validate JSON imports and add warnings

Add more robust validation for JSON import payloads so that hash-like hexadecimal tokens are no longer misidentified as content, entities, or relations.
Introduce ImportPayloadValidationError, is_probable_hash_token, normalize_entity_import_item, and normalize_relation_import_item in import_payloads, and tighten paragraph validation (raise on, or skip, empty or hash-like content).
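As a rough illustration, is_probable_hash_token could be built around a heuristic like the sketch below; the regex, length bounds, and helper name are assumptions for illustration, not the actual implementation in import_payloads:

import re

# Bare hex digests (MD5/SHA-1/SHA-256 and similar) are 16-128 hex characters and nothing else.
_HEX_DIGEST = re.compile(r"[0-9a-fA-F]{16,128}")

def probable_hash_token(value: str) -> bool:
    """Return True when the whole string looks like a bare hexadecimal digest."""
    token = (value or "").strip()
    return bool(token) and _HEX_DIGEST.fullmatch(token) is not None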
Update web_import_manager to collect and append warnings per file (warning_count, warnings), skip writing hash-like entries, cap the number of retained warnings, and pass through the build warnings emitted by _build_json_units.
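A minimal sketch of the per-file warning bookkeeping described above; the cap constant and helper name are illustrative assumptions rather than the real web_import_manager code:

MAX_KEPT_WARNINGS = 50  # assumed cap; the actual limit is defined in web_import_manager

def merge_file_warnings(build_warnings, skip_messages):
    """Combine _build_json_units warnings with skip messages and cap what is kept on the task."""
    all_warnings = [str(w) for w in list(build_warnings) + list(skip_messages)]
    return {
        "warning_count": len(all_warnings),            # total observed, before truncation
        "warnings": all_warnings[:MAX_KEPT_WARNINGS],  # only the first N are retained
    }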
Update the process_knowledge script to reuse the same validation logic, add hash guards for entities and relations (skipping the affected items), and log a summary of what was skipped.
Update IMPORT_GUIDE.md to document the JSON import constraints and the task warning fields. Also add fallback logic to query_memory: when person-filtered retrieval returns no results, automatically degrade to keyword retrieval and attach fallback metadata to the tool result.
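The query_memory fallback can be pictured roughly as below; the injected search callables and the metadata keys are illustrative assumptions, not the actual retrieval API:

from typing import Awaitable, Callable, Optional

async def query_with_fallback(
    query: str,
    person: Optional[str],
    person_search: Callable[[str, str], Awaitable[list]],
    keyword_search: Callable[[str], Awaitable[list]],
) -> tuple:
    """Try person-filtered retrieval first; degrade to keyword retrieval when it returns nothing."""
    results = await person_search(query, person) if person else []
    meta = {"fallback": False}
    if not results:
        results = await keyword_search(query)
        # Record the degradation so the tool result can surface it to the caller.
        meta = {"fallback": True, "fallback_reason": "person_filter_empty"}
    return results, meta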
Author: DawnARC
Date: 2026-04-06 20:48:47 +08:00
parent 526fc9b763
commit 0cfaa15988
5 changed files with 555 additions and 157 deletions


@@ -10,21 +10,22 @@
5. 更新 manifest
"""
import sys
import os
import json
import asyncio
import time
import random
import hashlib
import tomlkit
import argparse
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
from pathlib import Path
from typing import Any, Dict, List, Optional
from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import argparse
import asyncio
import hashlib
import json
import os
import random
import sys
import time
import tomlkit
console = Console()
@@ -89,8 +90,14 @@ try:
resolve_stored_knowledge_type = storage_module.resolve_stored_knowledge_type
select_import_strategy = storage_module.select_import_strategy
from A_memorix.core.utils.import_payloads import (
ImportPayloadValidationError,
is_probable_hash_token,
normalize_entity_import_item,
normalize_paragraph_import_item,
normalize_relation_import_item,
)
from A_memorix.core.utils.time_parser import normalize_time_meta
from A_memorix.core.utils.import_payloads import normalize_paragraph_import_item
from A_memorix.core.strategies.base import BaseStrategy, ProcessedChunk, KnowledgeType as StratKnowledgeType
from A_memorix.core.strategies.narrative import NarrativeStrategy
from A_memorix.core.strategies.factual import FactualStrategy
@@ -603,11 +610,18 @@ Chat paragraph:
# Re-use existing methods
async def _add_entity_with_vector(self, name: str, source_paragraph: Optional[str] = None) -> str:
# Same as before
hash_value = self.metadata_store.add_entity(name, source_paragraph=source_paragraph)
self.graph_store.add_nodes([name])
# 最后一道守卫:防止旁路把 hash 写入实体名
entity_name = str(name or "").strip()
if not entity_name:
return ""
if is_probable_hash_token(entity_name):
logger.warning(f"脚本导入跳过疑似哈希实体: {entity_name[:32]}")
return ""
hash_value = self.metadata_store.add_entity(entity_name, source_paragraph=source_paragraph)
self.graph_store.add_nodes([entity_name])
try:
emb = await self.embedding_manager.encode(name)
emb = await self.embedding_manager.encode(entity_name)
try:
self.vector_store.add(emb.reshape(1, -1), [hash_value])
except ValueError: pass
@@ -632,13 +646,35 @@ Chat paragraph:
async def _import_to_db(self, data: Dict, progress_callback=None):
# Same logic, but ensure robust
warning_count = 0
def append_warning(message: str) -> None:
nonlocal warning_count
warning_count += 1
logger.warning(message)
with self.graph_store.batch_update():
for item in data.get("paragraphs", []):
paragraph = normalize_paragraph_import_item(
item,
default_source="script",
)
for paragraph_index, item in enumerate(data.get("paragraphs", [])):
try:
paragraph = normalize_paragraph_import_item(
item,
default_source="script",
)
except ImportPayloadValidationError as exc:
append_warning(
f"脚本导入跳过段落[{paragraph_index}]{exc} (code={exc.code})"
)
if progress_callback:
progress_callback(1)
continue
content = paragraph["content"]
if is_probable_hash_token(content):
append_warning(f"脚本导入跳过段落[{paragraph_index}]:段落内容疑似哈希值")
if progress_callback:
progress_callback(1)
continue
source = paragraph["source"]
k_type_val = paragraph["knowledge_type"]
@@ -658,44 +694,109 @@ Chat paragraph:
para_entities = paragraph["entities"]
for entity in para_entities:
if entity:
await self._add_entity_with_vector(entity, source_paragraph=h_val)
name = normalize_entity_import_item(entity)
if not name:
append_warning(f"脚本导入跳过段落[{paragraph_index}]中的实体:无效名称或疑似哈希值")
continue
await self._add_entity_with_vector(name, source_paragraph=h_val)
para_relations = paragraph["relations"]
for rel in para_relations:
s, p, o = rel.get("subject"), rel.get("predicate"), rel.get("object")
if s and p and o:
await self._add_entity_with_vector(s, source_paragraph=h_val)
await self._add_entity_with_vector(o, source_paragraph=h_val)
confidence = float(rel.get("confidence", 1.0) or 1.0)
rel_meta = rel.get("metadata", {})
write_vector = self._should_write_relation_vectors()
if self.relation_write_service is not None:
await self.relation_write_service.upsert_relation_with_vector(
subject=s,
predicate=p,
obj=o,
confidence=confidence,
source_paragraph=h_val,
metadata=rel_meta if isinstance(rel_meta, dict) else {},
write_vector=write_vector,
)
else:
rel_hash = self.metadata_store.add_relation(
s,
p,
o,
confidence=confidence,
source_paragraph=h_val,
metadata=rel_meta if isinstance(rel_meta, dict) else {},
)
self.graph_store.add_edges([(s, o)], relation_hashes=[rel_hash])
try:
self.metadata_store.set_relation_vector_state(rel_hash, "none")
except Exception:
pass
if progress_callback: progress_callback(1)
normalized_relation = normalize_relation_import_item(rel)
if normalized_relation is None:
append_warning(f"脚本导入跳过段落[{paragraph_index}]中的关系:字段无效或疑似哈希值")
continue
s = normalized_relation["subject"]
p = normalized_relation["predicate"]
o = normalized_relation["object"]
await self._add_entity_with_vector(s, source_paragraph=h_val)
await self._add_entity_with_vector(o, source_paragraph=h_val)
confidence = float(rel.get("confidence", 1.0) or 1.0) if isinstance(rel, dict) else 1.0
rel_meta = rel.get("metadata", {}) if isinstance(rel, dict) else {}
write_vector = self._should_write_relation_vectors()
if self.relation_write_service is not None:
await self.relation_write_service.upsert_relation_with_vector(
subject=s,
predicate=p,
obj=o,
confidence=confidence,
source_paragraph=h_val,
metadata=rel_meta if isinstance(rel_meta, dict) else {},
write_vector=write_vector,
)
else:
rel_hash = self.metadata_store.add_relation(
s,
p,
o,
confidence=confidence,
source_paragraph=h_val,
metadata=rel_meta if isinstance(rel_meta, dict) else {},
)
self.graph_store.add_edges([(s, o)], relation_hashes=[rel_hash])
try:
self.metadata_store.set_relation_vector_state(rel_hash, "none")
except Exception:
pass
if progress_callback:
progress_callback(1)
for entity_index, raw_entity in enumerate(data.get("entities", []) or []):
entity_name = normalize_entity_import_item(raw_entity)
if not entity_name:
append_warning(f"脚本导入跳过顶层实体[{entity_index}]:无效名称或疑似哈希值")
continue
await self._add_entity_with_vector(entity_name)
for relation_index, raw_relation in enumerate(data.get("relations", []) or []):
relation = normalize_relation_import_item(raw_relation)
if relation is None:
append_warning(f"脚本导入跳过顶层关系[{relation_index}]:字段无效或疑似哈希值")
continue
subject = relation["subject"]
predicate = relation["predicate"]
obj = relation["object"]
await self._add_entity_with_vector(subject)
await self._add_entity_with_vector(obj)
confidence = (
float(raw_relation.get("confidence", 1.0) or 1.0)
if isinstance(raw_relation, dict)
else 1.0
)
rel_meta = raw_relation.get("metadata", {}) if isinstance(raw_relation, dict) else {}
write_vector = self._should_write_relation_vectors()
if self.relation_write_service is not None:
await self.relation_write_service.upsert_relation_with_vector(
subject=subject,
predicate=predicate,
obj=obj,
confidence=confidence,
source_paragraph="",
metadata=rel_meta if isinstance(rel_meta, dict) else {},
write_vector=write_vector,
)
else:
rel_hash = self.metadata_store.add_relation(
subject,
predicate,
obj,
confidence=confidence,
source_paragraph="",
metadata=rel_meta if isinstance(rel_meta, dict) else {},
)
self.graph_store.add_edges([(subject, obj)], relation_hashes=[rel_hash])
try:
self.metadata_store.set_relation_vector_state(rel_hash, "none")
except Exception:
pass
if warning_count > 0:
logger.warning(f"脚本导入完成,跳过异常项 {warning_count}")
async def close(self):
if self.metadata_store: self.metadata_store.close()