Merge pull request #1583 from A-Dawn/r-dev

fix: validate JSON imports and add warnings
Dawn ARC
2026-04-06 20:52:54 +08:00
committed by GitHub
5 changed files with 555 additions and 157 deletions

View File

@@ -163,6 +163,34 @@ python src/A_memorix/scripts/audit_vector_consistency.py --json
}
```
### 3.3 JSON import field constraints (`input_mode="json"`)

For `create_paste/create_upload/create_raw_scan` with `input_mode="json"`, the imported content must be semantic text; hash-like values are not accepted as paragraph bodies or entity names (see the example payload below).

- Paragraphs `paragraphs[*]`:
  - Each item may be a string (treated as `content`) or an object (which must contain `content`).
  - If `content` is empty, or is a suspected hash (a pure hex string of length 32/40/64), the item is skipped and recorded as a warning.
- Entities `entities[*]`:
  - Each item may be a string, or an object (only `name/label/entity` is extracted as the entity name).
  - Entities whose name cannot be extracted, is empty, or looks like a hash are skipped.
- Relations `relations[*]`:
  - Only objects are accepted, and they must contain `subject/predicate/object`.
  - If any of these fields is empty or looks like a hash, the relation is skipped.

Notes:
- A "skip" does not fail the task; the remaining valid items are still processed.
- Only future imports are blocked; legacy data already in the store is not cleaned up automatically.
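For illustration, here is a made-up payload (not taken from the repository) that exercises these rules: the first paragraph, both entities, and the first relation are imported, while the hash-like paragraph and the relation with an empty `predicate` are skipped and recorded as warnings.

```json
{
  "paragraphs": [
    "Alice joined the robotics club in 2024.",
    { "content": "d41d8cd98f00b204e9800998ecf8427e" }
  ],
  "entities": ["Alice", { "name": "robotics club" }],
  "relations": [
    { "subject": "Alice", "predicate": "joined", "object": "robotics club" },
    { "subject": "Alice", "predicate": "", "object": "robotics club" }
  ]
}
```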
### 3.4 Task warning fields

In the `list/get/chunks` responses of `memory_import_admin`, each `task.files[*]` entry provides:

- `warning_count`: cumulative number of warnings for the file
- `warnings`: warning details (only the most recent entries are kept)

These two fields distinguish "imported successfully with skipped items" from "import failed"; do not treat a warning as an error (a sketch of such an entry follows).
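As a rough sketch (field values are illustrative and the surrounding fields depend on the task record; the warning strings follow the wording emitted by the import code), a `task.files[*]` entry with skipped items might look like:

```json
{
  "file_id": "f-001",
  "name": "memory_export.json",
  "warning_count": 2,
  "warnings": [
    "跳过段落[1]:段落 content 疑似哈希值,已跳过 (code=paragraph_content_hash_like)",
    "跳过关系[0]:无效三元组或疑似哈希值 (Alice |  | robotics club)"
  ]
}
```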
## 4. Direct-write tools (non-task mode)

If you do not need task orchestration, you can also call the tools directly:

View File

@@ -3,10 +3,78 @@
from __future__ import annotations

from typing import Any, Dict, List, Optional

import re

from ..storage import KnowledgeType, resolve_stored_knowledge_type
from .time_parser import normalize_time_meta

_HASH_TOKEN_PATTERN = re.compile(r"^[0-9a-fA-F]+$")
_ENTITY_NAME_KEYS = ("name", "label", "entity")


class ImportPayloadValidationError(ValueError):
    """导入负载校验异常(可用于上层按项跳过并记录告警)。"""

    def __init__(self, message: str, *, code: str, field: str = "", value: str = "") -> None:
        super().__init__(message)
        self.code = code
        self.field = field
        self.value = value


def is_probable_hash_token(value: Any) -> bool:
    """判断文本是否疑似哈希值(hex 串,长度为 32/40/64)。"""
    text = str(value or "").strip()
    if len(text) not in {32, 40, 64}:
        return False
    return bool(_HASH_TOKEN_PATTERN.fullmatch(text))


def normalize_entity_import_item(item: Any) -> Optional[str]:
    """标准化实体导入项。

    支持:
    - 字符串实体名
    - 对象实体(提取 name/label/entity 字段)
    """
    if isinstance(item, str):
        name = item.strip()
    elif isinstance(item, dict):
        name = ""
        for key in _ENTITY_NAME_KEYS:
            candidate = str(item.get(key, "") or "").strip()
            if candidate:
                name = candidate
                break
    else:
        name = ""
    if not name or is_probable_hash_token(name):
        return None
    return name


def normalize_relation_import_item(item: Any) -> Optional[Dict[str, str]]:
    """标准化关系导入项。"""
    if not isinstance(item, dict):
        return None
    subject = str(item.get("subject", "") or "").strip()
    predicate = str(item.get("predicate", "") or "").strip()
    obj = str(item.get("object", "") or "").strip()
    if not (subject and predicate and obj):
        return None
    if any(is_probable_hash_token(token) for token in (subject, predicate, obj)):
        return None
    return {
        "subject": subject,
        "predicate": predicate,
        "object": obj,
    }


def _normalize_entities(raw_entities: Any) -> List[str]:
    if not isinstance(raw_entities, list):
@@ -14,7 +82,7 @@ def _normalize_entities(raw_entities: Any) -> List[str]:
    out: List[str] = []
    seen = set()
    for item in raw_entities:
        name = normalize_entity_import_item(item)
        if not name:
            continue
        key = name.lower()
@@ -30,20 +98,10 @@ def _normalize_relations(raw_relations: Any) -> List[Dict[str, str]]:
        return []
    out: List[Dict[str, str]] = []
    for item in raw_relations:
        relation = normalize_relation_import_item(item)
        if relation is None:
            continue
        out.append(relation)
    return out
@@ -55,7 +113,20 @@ def normalize_paragraph_import_item(
"""Normalize one paragraph import item from text/json payloads.""" """Normalize one paragraph import item from text/json payloads."""
if isinstance(item, str): if isinstance(item, str):
content = str(item) content = str(item or "")
if not content.strip():
raise ImportPayloadValidationError(
"段落 content 不能为空",
code="paragraph_content_empty",
field="content",
)
if is_probable_hash_token(content):
raise ImportPayloadValidationError(
"段落 content 疑似哈希值,已跳过",
code="paragraph_content_hash_like",
field="content",
value=content,
)
knowledge_type = resolve_stored_knowledge_type(None, content=content) knowledge_type = resolve_stored_knowledge_type(None, content=content)
return { return {
"content": content, "content": content,
@@ -67,11 +138,26 @@ def normalize_paragraph_import_item(
        }
    if not isinstance(item, dict) or "content" not in item:
        raise ImportPayloadValidationError(
            "段落项必须为字符串或包含 content 的对象",
            code="paragraph_item_invalid",
            field="content",
        )
    content = str(item.get("content", "") or "")
    if not content.strip():
        raise ImportPayloadValidationError(
            "段落 content 不能为空",
            code="paragraph_content_empty",
            field="content",
        )
    if is_probable_hash_token(content):
        raise ImportPayloadValidationError(
            "段落 content 疑似哈希值,已跳过",
            code="paragraph_content_hash_like",
            field="content",
            value=content,
        )
    raw_time_meta = {
        "event_time": item.get("event_time"),

View File

@@ -33,7 +33,13 @@ from ..storage import (
    MetadataStore,
)
from ..storage.type_detection import looks_like_quote_text
from ..utils.import_payloads import (
    ImportPayloadValidationError,
    is_probable_hash_token,
    normalize_entity_import_item,
    normalize_paragraph_import_item,
    normalize_relation_import_item,
)
from ..utils.runtime_self_check import ensure_runtime_self_check
from ..utils.time_parser import normalize_time_meta
from ..storage.knowledge_types import ImportStrategy
@@ -77,6 +83,8 @@ CHUNK_STATUS = {
"cancelled", "cancelled",
} }
FILE_WARNING_KEEP_LIMIT = 50
def _now() -> float: def _now() -> float:
return time.time() return time.time()
@@ -219,6 +227,8 @@ class ImportFileRecord:
    content_hash: str = ""
    retry_chunk_indexes: List[int] = field(default_factory=list)
    retry_mode: str = ""
    warning_count: int = 0
    warnings: List[str] = field(default_factory=list)

    def to_dict(self, include_chunks: bool = False) -> Dict[str, Any]:
        payload = {
@@ -241,6 +251,8 @@ class ImportFileRecord:
"content_hash": self.content_hash or "", "content_hash": self.content_hash or "",
"retry_chunk_indexes": list(self.retry_chunk_indexes or []), "retry_chunk_indexes": list(self.retry_chunk_indexes or []),
"retry_mode": self.retry_mode or "", "retry_mode": self.retry_mode or "",
"warning_count": int(self.warning_count),
"warnings": list(self.warnings),
} }
if include_chunks: if include_chunks:
payload["chunks"] = [chunk.to_dict() for chunk in self.chunks] payload["chunks"] = [chunk.to_dict() for chunk in self.chunks]
@@ -1386,6 +1398,7 @@ class ImportTaskManager:
"offset": start, "offset": start,
"limit": size, "limit": size,
"total": len(file_obj.chunks), "total": len(file_obj.chunks),
"file": file_obj.to_dict(include_chunks=False),
"items": [x.to_dict() for x in items], "items": [x.to_dict() for x in items],
} }
@@ -2775,7 +2788,9 @@ class ImportTaskManager:
        if task:
            task.schema_detected = schema
            task.updated_at = _now()
        units, build_warnings = self._build_json_units(data, file_record.file_id, file_record.name, schema)
        if build_warnings:
            await self._append_file_warnings(task_id, file_record.file_id, build_warnings)
        await self._register_json_units(task_id, file_record.file_id, units)
        await self._set_file_state(task_id, file_record.file_id, "extracting", "extracting")
@@ -2829,8 +2844,15 @@ class ImportTaskManager:
return "web_json" return "web_json"
raise RuntimeError("不支持的 JSON 格式:需要 paragraphs 或 docs") raise RuntimeError("不支持的 JSON 格式:需要 paragraphs 或 docs")
    def _build_json_units(
        self,
        data: Any,
        file_id: str,
        filename: str,
        schema: str,
    ) -> Tuple[List[Dict[str, Any]], List[str]]:
        units: List[Dict[str, Any]] = []
        warnings: List[str] = []
        paragraphs: List[Any] = []
        entities: List[Any] = []
        relations: List[Any] = []
@@ -2867,11 +2889,17 @@ class ImportTaskManager:
                }
                paragraphs.append(para_item)

        for paragraph_index, p in enumerate(paragraphs):
            try:
                paragraph = normalize_paragraph_import_item(
                    p,
                    default_source=f"web_import:{filename}",
                )
            except ImportPayloadValidationError as exc:
                warnings.append(
                    f"跳过段落[{paragraph_index}]:{exc} (code={exc.code})"
                )
                continue
            units.append(
                {
                    "chunk_id": f"{file_id}_json_{len(units)}",
@@ -2887,38 +2915,51 @@ class ImportTaskManager:
                }
            )

        for entity_index, e in enumerate(entities):
            name = normalize_entity_import_item(e)
            if not name:
                raw = str(e or "").strip()
                warnings.append(
                    f"跳过实体[{entity_index}]:无效名称或疑似哈希值 ({raw[:80]})"
                )
                continue
            units.append(
                {
                    "chunk_id": f"{file_id}_json_{len(units)}",
                    "kind": "entity",
                    "name": name,
                    "chunk_type": "entity",
                    "preview": name[:120],
                }
            )

        for relation_index, r in enumerate(relations):
            relation = normalize_relation_import_item(r)
            if relation is None:
                if isinstance(r, dict):
                    raw = (
                        f"{str(r.get('subject', '')).strip()} | "
                        f"{str(r.get('predicate', '')).strip()} | "
                        f"{str(r.get('object', '')).strip()}"
                    )
                else:
                    raw = str(r or "").strip()
                warnings.append(
                    f"跳过关系[{relation_index}]:无效三元组或疑似哈希值 ({raw[:120]})"
                )
                continue
            units.append(
                {
                    "chunk_id": f"{file_id}_json_{len(units)}",
                    "kind": "relation",
                    "subject": relation["subject"],
                    "predicate": relation["predicate"],
                    "object": relation["object"],
                    "chunk_type": "relation",
                    "preview": f"{relation['subject']} {relation['predicate']} {relation['object']}"[:120],
                }
            )
        return units, warnings

    async def _register_json_units(self, task_id: str, file_id: str, units: List[Dict[str, Any]]) -> None:
        records = [
@@ -2964,48 +3005,91 @@ class ImportTaskManager:
            await self._set_chunk_state(task_id, file_record.file_id, chunk_id, "writing", "writing", 0.7)
            try:
                chunk_warnings: List[str] = []
                skip_write = False
                async with self._storage_lock:
                    kind = unit["kind"]
                    if kind == "paragraph":
                        content = str(unit.get("content", ""))
                        if not content.strip():
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:段落内容为空")
                            skip_write = True
                        elif is_probable_hash_token(content):
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:段落内容疑似哈希值")
                            skip_write = True
                        if skip_write:
                            pass
                        k_type = resolve_stored_knowledge_type(
                            unit.get("knowledge_type"),
                            content=content,
                        ).value
                        source = str(unit.get("source") or f"web_import:{file_record.name}")
                        if not skip_write:
                            para_hash = self.plugin.metadata_store.add_paragraph(
                                content=content,
                                source=source,
                                knowledge_type=k_type,
                                time_meta=unit.get("time_meta"),
                            )
                            vector_result = await self._write_paragraph_vector_or_enqueue(
                                paragraph_hash=para_hash,
                                content=content,
                                context="web_import_json",
                            )
                            if str(vector_result.get("warning", "") or "").strip():
                                logger.warning(
                                    f"web_import json paragraph 向量写入降级: hash={para_hash[:8]} detail={vector_result.get('detail')}"
                                )
                            for name in unit.get("entities", []) or []:
                                n = str(name or "").strip()
                                if not n:
                                    continue
                                if is_probable_hash_token(n):
                                    chunk_warnings.append(
                                        f"跳过分块[{chunk_id}]中的实体:疑似哈希值 ({n[:32]})"
                                    )
                                    continue
                                await self._add_entity_with_vector(n, source_paragraph=para_hash)
                            for rel in unit.get("relations", []) or []:
                                if not isinstance(rel, dict):
                                    continue
                                s = str(rel.get("subject", "")).strip()
                                p = str(rel.get("predicate", "")).strip()
                                o = str(rel.get("object", "")).strip()
                                if not (s and p and o):
                                    continue
                                if any(is_probable_hash_token(token) for token in (s, p, o)):
                                    chunk_warnings.append(
                                        f"跳过分块[{chunk_id}]中的关系:疑似哈希值 ({s[:24]}|{p[:24]}|{o[:24]})"
                                    )
                                    continue
                                await self._add_relation(s, p, o, source_paragraph=para_hash)
                    elif kind == "entity":
                        entity_name = str(unit.get("name", "")).strip()
                        if not entity_name:
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:实体名为空")
                            skip_write = True
                        elif is_probable_hash_token(entity_name):
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:实体名疑似哈希值")
                            skip_write = True
                        if not skip_write:
                            await self._add_entity_with_vector(entity_name)
                    elif kind == "relation":
                        subject = str(unit.get("subject", "")).strip()
                        predicate = str(unit.get("predicate", "")).strip()
                        obj = str(unit.get("object", "")).strip()
                        if not (subject and predicate and obj):
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:关系字段不完整")
                            skip_write = True
                        elif any(is_probable_hash_token(token) for token in (subject, predicate, obj)):
                            chunk_warnings.append(f"跳过分块[{chunk_id}]:关系字段疑似哈希值")
                            skip_write = True
                        if not skip_write:
                            await self._add_relation(subject, predicate, obj)
                    else:
                        raise RuntimeError(f"未知 JSON 导入单元类型: {kind}")
                if chunk_warnings:
                    await self._append_file_warnings(task_id, file_record.file_id, chunk_warnings)
                await self._set_chunk_completed(task_id, file_record.file_id, chunk_id)
            except Exception as e:
                await self._set_chunk_failed(task_id, file_record.file_id, chunk_id, f"写入失败: {e}")
@@ -3040,7 +3124,10 @@ class ImportTaskManager:
        *,
        time_meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        content = str(processed.chunk.text or "")
        if is_probable_hash_token(content):
            logger.warning("跳过疑似哈希段落写入: source=%s preview=%s", self._source_label(file_record), content[:32])
            return
        para_hash = self.plugin.metadata_store.add_paragraph(
            content=content,
            source=self._source_label(file_record),
@@ -3081,7 +3168,7 @@ class ImportTaskManager:
for k in ("entities", "events", "verbatim_entities"): for k in ("entities", "events", "verbatim_entities"):
for e in data.get(k, []): for e in data.get(k, []):
name = str(e or "").strip() name = str(e or "").strip()
if name: if name and not is_probable_hash_token(name):
entities.append(name) entities.append(name)
uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values()) uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values())
@@ -3092,23 +3179,44 @@ class ImportTaskManager:
            await self._add_relation(s, p, o, source_paragraph=para_hash)

    async def _add_entity_with_vector(self, name: str, source_paragraph: str = "") -> str:
        name_token = str(name or "").strip()
        if not name_token:
            return ""
        if is_probable_hash_token(name_token):
            logger.warning(f"跳过疑似哈希实体写入: entity={name_token[:32]}")
            return ""
        hash_value = self.plugin.metadata_store.add_entity(name=name_token, source_paragraph=source_paragraph)
        self.plugin.graph_store.add_nodes([name_token])
        if hash_value not in self.plugin.vector_store:
            try:
                if self._is_embedding_degraded():
                    raise RuntimeError("embedding_degraded")
                emb = await self.plugin.embedding_manager.encode(name_token)
                self.plugin.vector_store.add(emb.reshape(1, -1), [hash_value])
            except Exception as exc:
                if not self._allow_metadata_only_write():
                    raise
                logger.warning(f"实体向量写入降级,保留 metadata/graph: entity={name_token} error={exc}")
        return hash_value

    async def _add_relation(self, subject: str, predicate: str, obj: str, source_paragraph: str = "") -> str:
        subject_token = str(subject or "").strip()
        predicate_token = str(predicate or "").strip()
        object_token = str(obj or "").strip()
        if not (subject_token and predicate_token and object_token):
            return ""
        if any(is_probable_hash_token(token) for token in (subject_token, predicate_token, object_token)):
            logger.warning(
                "跳过疑似哈希关系写入: %s | %s | %s",
                subject_token[:24],
                predicate_token[:24],
                object_token[:24],
            )
            return ""
        await self._add_entity_with_vector(subject_token, source_paragraph=source_paragraph)
        await self._add_entity_with_vector(object_token, source_paragraph=source_paragraph)
        rv_cfg = self.plugin.get_config("retrieval.relation_vectorization", {}) or {}
        if not isinstance(rv_cfg, dict):
            rv_cfg = {}
@@ -3117,9 +3225,9 @@ class ImportTaskManager:
        relation_service = getattr(self.plugin, "relation_write_service", None)
        if relation_service is not None:
            result = await relation_service.upsert_relation_with_vector(
                subject=subject_token,
                predicate=predicate_token,
                obj=object_token,
                confidence=1.0,
                source_paragraph=source_paragraph,
                write_vector=write_vector,
@@ -3127,13 +3235,13 @@ class ImportTaskManager:
            return result.hash_value
        rel_hash = self.plugin.metadata_store.add_relation(
            subject=subject_token,
            predicate=predicate_token,
            obj=object_token,
            source_paragraph=source_paragraph,
            confidence=1.0,
        )
        self.plugin.graph_store.add_edges([(subject_token, object_token)], relation_hashes=[rel_hash])
        try:
            self.plugin.metadata_store.set_relation_vector_state(rel_hash, "none")
        except Exception:
@@ -3376,6 +3484,28 @@ JSON schema:
            task.status = "running"
            task.current_step = "running"

    async def _append_file_warning(self, task_id: str, file_id: str, warning: str) -> None:
        warning_text = str(warning or "").strip()
        if not warning_text:
            return
        async with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return
            file_record = self._find_file(task, file_id)
            if not file_record:
                return
            file_record.warning_count += 1
            file_record.warnings.append(warning_text)
            if len(file_record.warnings) > FILE_WARNING_KEEP_LIMIT:
                file_record.warnings = file_record.warnings[-FILE_WARNING_KEEP_LIMIT:]
            file_record.updated_at = _now()
            task.updated_at = _now()

    async def _append_file_warnings(self, task_id: str, file_id: str, warnings: List[str]) -> None:
        for warning in warnings:
            await self._append_file_warning(task_id, file_id, warning)

    async def _set_file_failed(self, task_id: str, file_id: str, error: str) -> None:
        async with self._lock:
            task = self._tasks.get(task_id)

View File

@@ -10,21 +10,22 @@
5. 更新 manifest
"""

from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

import argparse
import asyncio
import hashlib
import json
import os
import random
import sys
import time
import tomlkit

console = Console()
@@ -89,8 +90,14 @@ try:
    resolve_stored_knowledge_type = storage_module.resolve_stored_knowledge_type
    select_import_strategy = storage_module.select_import_strategy

    from A_memorix.core.utils.import_payloads import (
        ImportPayloadValidationError,
        is_probable_hash_token,
        normalize_entity_import_item,
        normalize_paragraph_import_item,
        normalize_relation_import_item,
    )
    from A_memorix.core.utils.time_parser import normalize_time_meta
    from A_memorix.core.strategies.base import BaseStrategy, ProcessedChunk, KnowledgeType as StratKnowledgeType
    from A_memorix.core.strategies.narrative import NarrativeStrategy
    from A_memorix.core.strategies.factual import FactualStrategy
@@ -603,11 +610,18 @@ Chat paragraph:
    # Re-use existing methods
    async def _add_entity_with_vector(self, name: str, source_paragraph: Optional[str] = None) -> str:
        # 最后一道守卫:防止旁路把 hash 写入实体名
        entity_name = str(name or "").strip()
        if not entity_name:
            return ""
        if is_probable_hash_token(entity_name):
            logger.warning(f"脚本导入跳过疑似哈希实体: {entity_name[:32]}")
            return ""
        hash_value = self.metadata_store.add_entity(entity_name, source_paragraph=source_paragraph)
        self.graph_store.add_nodes([entity_name])
        try:
            emb = await self.embedding_manager.encode(entity_name)
            try:
                self.vector_store.add(emb.reshape(1, -1), [hash_value])
            except ValueError: pass
@@ -632,13 +646,35 @@ Chat paragraph:
    async def _import_to_db(self, data: Dict, progress_callback=None):
        # Same logic, but ensure robust
        warning_count = 0

        def append_warning(message: str) -> None:
            nonlocal warning_count
            warning_count += 1
            logger.warning(message)

        with self.graph_store.batch_update():
            for paragraph_index, item in enumerate(data.get("paragraphs", [])):
                try:
                    paragraph = normalize_paragraph_import_item(
                        item,
                        default_source="script",
                    )
                except ImportPayloadValidationError as exc:
                    append_warning(
                        f"脚本导入跳过段落[{paragraph_index}]:{exc} (code={exc.code})"
                    )
                    if progress_callback:
                        progress_callback(1)
                    continue

                content = paragraph["content"]
                if is_probable_hash_token(content):
                    append_warning(f"脚本导入跳过段落[{paragraph_index}]:段落内容疑似哈希值")
                    if progress_callback:
                        progress_callback(1)
                    continue
                source = paragraph["source"]
                k_type_val = paragraph["knowledge_type"]
@@ -658,44 +694,109 @@ Chat paragraph:
                para_entities = paragraph["entities"]
                for entity in para_entities:
                    name = normalize_entity_import_item(entity)
                    if not name:
                        append_warning(f"脚本导入跳过段落[{paragraph_index}]中的实体:无效名称或疑似哈希值")
                        continue
                    await self._add_entity_with_vector(name, source_paragraph=h_val)

                para_relations = paragraph["relations"]
                for rel in para_relations:
                    normalized_relation = normalize_relation_import_item(rel)
                    if normalized_relation is None:
                        append_warning(f"脚本导入跳过段落[{paragraph_index}]中的关系:字段无效或疑似哈希值")
                        continue

                    s = normalized_relation["subject"]
                    p = normalized_relation["predicate"]
                    o = normalized_relation["object"]
                    await self._add_entity_with_vector(s, source_paragraph=h_val)
                    await self._add_entity_with_vector(o, source_paragraph=h_val)

                    confidence = float(rel.get("confidence", 1.0) or 1.0) if isinstance(rel, dict) else 1.0
                    rel_meta = rel.get("metadata", {}) if isinstance(rel, dict) else {}
                    write_vector = self._should_write_relation_vectors()
                    if self.relation_write_service is not None:
                        await self.relation_write_service.upsert_relation_with_vector(
                            subject=s,
                            predicate=p,
                            obj=o,
                            confidence=confidence,
                            source_paragraph=h_val,
                            metadata=rel_meta if isinstance(rel_meta, dict) else {},
                            write_vector=write_vector,
                        )
                    else:
                        rel_hash = self.metadata_store.add_relation(
                            s,
                            p,
                            o,
                            confidence=confidence,
                            source_paragraph=h_val,
                            metadata=rel_meta if isinstance(rel_meta, dict) else {},
                        )
                        self.graph_store.add_edges([(s, o)], relation_hashes=[rel_hash])
                        try:
                            self.metadata_store.set_relation_vector_state(rel_hash, "none")
                        except Exception:
                            pass

                if progress_callback:
                    progress_callback(1)

            for entity_index, raw_entity in enumerate(data.get("entities", []) or []):
                entity_name = normalize_entity_import_item(raw_entity)
                if not entity_name:
                    append_warning(f"脚本导入跳过顶层实体[{entity_index}]:无效名称或疑似哈希值")
                    continue
                await self._add_entity_with_vector(entity_name)

            for relation_index, raw_relation in enumerate(data.get("relations", []) or []):
                relation = normalize_relation_import_item(raw_relation)
                if relation is None:
                    append_warning(f"脚本导入跳过顶层关系[{relation_index}]:字段无效或疑似哈希值")
                    continue
                subject = relation["subject"]
                predicate = relation["predicate"]
                obj = relation["object"]
                await self._add_entity_with_vector(subject)
                await self._add_entity_with_vector(obj)
                confidence = (
                    float(raw_relation.get("confidence", 1.0) or 1.0)
                    if isinstance(raw_relation, dict)
                    else 1.0
                )
                rel_meta = raw_relation.get("metadata", {}) if isinstance(raw_relation, dict) else {}
                write_vector = self._should_write_relation_vectors()
                if self.relation_write_service is not None:
                    await self.relation_write_service.upsert_relation_with_vector(
                        subject=subject,
                        predicate=predicate,
                        obj=obj,
                        confidence=confidence,
                        source_paragraph="",
                        metadata=rel_meta if isinstance(rel_meta, dict) else {},
                        write_vector=write_vector,
                    )
                else:
                    rel_hash = self.metadata_store.add_relation(
                        subject,
                        predicate,
                        obj,
                        confidence=confidence,
                        source_paragraph="",
                        metadata=rel_meta if isinstance(rel_meta, dict) else {},
                    )
                    self.graph_store.add_edges([(subject, obj)], relation_hashes=[rel_hash])
                    try:
                        self.metadata_store.set_relation_vector_state(rel_hash, "none")
                    except Exception:
                        pass

        if warning_count > 0:
            logger.warning(f"脚本导入完成,跳过异常项 {warning_count} 条")

    async def close(self):
        if self.metadata_store: self.metadata_store.close()

View File

@@ -131,12 +131,10 @@ def _build_success_content(result: MemorySearchResult, *, limit: int) -> str:
    snippet = result.to_text(limit=max(1, int(limit)))
    if result.hits:
        if snippet:
            return snippet
        if summary:
            return summary
        return "已找到匹配的长期记忆。"

    if result.filtered:
@@ -189,6 +187,11 @@ async def handle_tool(
        group_id=group_id,
    )
    respect_filter = bool(invocation.arguments.get("respect_filter", True))
    fallback_applied = False
    fallback_reason = ""
    fallback_query = ""
    effective_mode = mode
    primary_hit_count = 0

    logger.info(
        f"{runtime.log_prefix} 触发长期记忆检索工具: "
@@ -213,12 +216,53 @@ async def handle_tool(
            invocation.tool_name,
            f"长期记忆检索失败:{exc}",
        )

    primary_hit_count = len(result.hits)

    # 方案2:人物过滤未命中时降级到关键词检索,避免直接“空结果”。
    if (
        result.success
        and person_id
        and not result.filtered
        and not result.hits
        and clean_query
    ):
        fallback_applied = True
        fallback_reason = "person_filter_miss"
        fallback_query = clean_query
        effective_mode = "search"
        logger.info(
            f"{runtime.log_prefix} 人物过滤未命中,降级为关键词检索: "
            f"query={fallback_query!r} original_mode={mode} person_id={person_id!r}"
        )
        try:
            fallback_result = await memory_service.search(
                fallback_query,
                limit=limit,
                mode="search",
                chat_id=session_id,
                person_id="",
                time_start=None,
                time_end=None,
                respect_filter=respect_filter,
                user_id=user_id,
                group_id=group_id,
            )
            if fallback_result.success:
                result = fallback_result
            else:
                logger.warning(
                    f"{runtime.log_prefix} 关键词降级检索失败,回退原结果: "
                    f"error={fallback_result.error}"
                )
        except Exception as exc:
            logger.warning(f"{runtime.log_prefix} 关键词降级检索异常,回退原结果: {exc}")

    structured_content: Dict[str, Any] = result.to_dict()
    structured_content.update(
        {
            "query": clean_query,
            "mode": mode,
            "effective_mode": effective_mode,
            "limit": limit,
            "chat_id": session_id,
            "person_name": person_name,
@@ -228,6 +272,10 @@ async def handle_tool(
"respect_filter": respect_filter, "respect_filter": respect_filter,
"user_id": user_id, "user_id": user_id,
"group_id": group_id, "group_id": group_id,
"fallback_applied": fallback_applied,
"fallback_reason": fallback_reason,
"fallback_query": fallback_query,
"primary_hit_count": primary_hit_count,
} }
) )
@@ -240,6 +288,11 @@ async def handle_tool(
    )

    content = _build_success_content(result, limit=limit)
    if fallback_applied:
        content = (
            "提示:人物定向检索未命中,已自动降级为关键词检索。\n"
            f"{content}"
        )
    if clean_query:
        display_prompt = f"你查询了长期记忆:{clean_query}"
    else: