diff --git a/src/A_memorix/IMPORT_GUIDE.md b/src/A_memorix/IMPORT_GUIDE.md
index 9d248178..6fce822f 100644
--- a/src/A_memorix/IMPORT_GUIDE.md
+++ b/src/A_memorix/IMPORT_GUIDE.md
@@ -163,6 +163,34 @@ python src/A_memorix/scripts/audit_vector_consistency.py --json
 }
 ```
 
+### 3.3 JSON import field constraints (`input_mode="json"`)
+
+Under `input_mode="json"`, `create_paste/create_upload/create_raw_scan` require the imported content to be semantic text; hash-shaped values are not accepted as body text or entity names.
+
+- Paragraphs `paragraphs[*]`
+  - Each item may be a string (treated as `content`) or an object (which must contain `content`).
+  - If `content` is empty, or is a probable hash (an all-hex string of length 32/40/64), the item is skipped and recorded as a warning.
+- Entities `entities[*]`
+  - Each item may be a string, or an object (only `name/label/entity` is extracted as the entity name).
+  - Entities whose name cannot be extracted, is empty, or looks like a hash are skipped.
+- Relations `relations[*]`
+  - Only objects are accepted, and they must contain `subject/predicate/object`.
+  - The relation is skipped if any of the three fields is empty or looks like a hash.
+
+Notes (see the example payload below):
+
+- A skip does not fail the task; the remaining valid items are still processed.
+- Only future imports are blocked; old data already in the store is not cleaned up automatically.
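+
+A minimal payload that passes these checks might look as follows. This is illustrative only: `event_time` is the only extra paragraph field this patch shows being read, and no other fields are implied.
+
+```json
+{
+  "paragraphs": [
+    "Alice moved to Berlin in 2021.",
+    {"content": "Bob works at Acme.", "event_time": "2021-06-01"}
+  ],
+  "entities": ["Alice", {"name": "Bob"}],
+  "relations": [
+    {"subject": "Bob", "predicate": "works_at", "object": "Acme"}
+  ]
+}
+```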
+
+### 3.4 Task warning fields
+
+In the `list/get/chunks` responses of `memory_import_admin`, `task.files[*]` provides:
+
+- `warning_count`: cumulative number of warnings for the file
+- `warnings`: warning details (only the most recent entries are kept)
+
+These two fields distinguish "imported successfully, with skipped items" from "import failed"; do not treat a warning as an error.
+
 ## 4. Direct-write tools (non-task mode)
 
 If you do not need task orchestration, you can also call these directly:
diff --git a/src/A_memorix/core/utils/import_payloads.py b/src/A_memorix/core/utils/import_payloads.py
index 6986a4c1..9e6aafe3 100644
--- a/src/A_memorix/core/utils/import_payloads.py
+++ b/src/A_memorix/core/utils/import_payloads.py
@@ -3,10 +3,78 @@ from __future__ import annotations
 
 from typing import Any, Dict, List, Optional
+import re
 
 from ..storage import KnowledgeType, resolve_stored_knowledge_type
 from .time_parser import normalize_time_meta
 
+_HASH_TOKEN_PATTERN = re.compile(r"^[0-9a-fA-F]+$")
+_ENTITY_NAME_KEYS = ("name", "label", "entity")
+
+
+class ImportPayloadValidationError(ValueError):
+    """Import payload validation error (lets callers skip the item and record a warning)."""
+
+    def __init__(self, message: str, *, code: str, field: str = "", value: str = "") -> None:
+        super().__init__(message)
+        self.code = code
+        self.field = field
+        self.value = value
+
+
+def is_probable_hash_token(value: Any) -> bool:
+    """Return True if the text looks like a hash (an all-hex string of length 32/40/64)."""
+
+    text = str(value or "").strip()
+    if len(text) not in {32, 40, 64}:
+        return False
+    return bool(_HASH_TOKEN_PATTERN.fullmatch(text))
+
+
+def normalize_entity_import_item(item: Any) -> Optional[str]:
+    """Normalize one entity import item.
+
+    Supported forms:
+    - a string entity name
+    - an object entity (the name is extracted from name/label/entity)
+    """
+
+    if isinstance(item, str):
+        name = item.strip()
+    elif isinstance(item, dict):
+        name = ""
+        for key in _ENTITY_NAME_KEYS:
+            candidate = str(item.get(key, "") or "").strip()
+            if candidate:
+                name = candidate
+                break
+    else:
+        name = ""
+
+    if not name or is_probable_hash_token(name):
+        return None
+    return name
+
+
+def normalize_relation_import_item(item: Any) -> Optional[Dict[str, str]]:
+    """Normalize one relation import item."""
+
+    if not isinstance(item, dict):
+        return None
+
+    subject = str(item.get("subject", "") or "").strip()
+    predicate = str(item.get("predicate", "") or "").strip()
+    obj = str(item.get("object", "") or "").strip()
+    if not (subject and predicate and obj):
+        return None
+    if any(is_probable_hash_token(token) for token in (subject, predicate, obj)):
+        return None
+    return {
+        "subject": subject,
+        "predicate": predicate,
+        "object": obj,
+    }
+
 
 def _normalize_entities(raw_entities: Any) -> List[str]:
     if not isinstance(raw_entities, list):
@@ -14,7 +82,7 @@
     out: List[str] = []
     seen = set()
     for item in raw_entities:
-        name = str(item or "").strip()
+        name = normalize_entity_import_item(item)
         if not name:
             continue
         key = name.lower()
@@ -30,20 +98,10 @@ def _normalize_relations(raw_relations: Any) -> List[Dict[str, str]]:
         return []
     out: List[Dict[str, str]] = []
     for item in raw_relations:
-        if not isinstance(item, dict):
+        relation = normalize_relation_import_item(item)
+        if relation is None:
             continue
-        subject = str(item.get("subject", "")).strip()
-        predicate = str(item.get("predicate", "")).strip()
-        obj = str(item.get("object", "")).strip()
-        if not (subject and predicate and obj):
-            continue
-        out.append(
-            {
-                "subject": subject,
-                "predicate": predicate,
-                "object": obj,
-            }
-        )
+        out.append(relation)
     return out
 
@@ -55,7 +113,20 @@ def normalize_paragraph_import_item(
     """Normalize one paragraph import item from text/json payloads."""
 
     if isinstance(item, str):
-        content = str(item)
+        content = str(item or "")
+        if not content.strip():
+            raise ImportPayloadValidationError(
+                "paragraph content must not be empty",
+                code="paragraph_content_empty",
+                field="content",
+            )
+        if is_probable_hash_token(content):
+            raise ImportPayloadValidationError(
+                "paragraph content looks like a hash; skipped",
+                code="paragraph_content_hash_like",
+                field="content",
+                value=content,
+            )
         knowledge_type = resolve_stored_knowledge_type(None, content=content)
         return {
             "content": content,
@@ -67,11 +138,26 @@
         }
 
     if not isinstance(item, dict) or "content" not in item:
-        raise ValueError("a paragraph item must be a string or an object containing content")
+        raise ImportPayloadValidationError(
+            "a paragraph item must be a string or an object containing content",
+            code="paragraph_item_invalid",
+            field="content",
+        )
     content = str(item.get("content", "") or "")
     if not content.strip():
-        raise ValueError("paragraph content must not be empty")
+        raise ImportPayloadValidationError(
+            "paragraph content must not be empty",
+            code="paragraph_content_empty",
+            field="content",
+        )
+    if is_probable_hash_token(content):
+        raise ImportPayloadValidationError(
+            "paragraph content looks like a hash; skipped",
+            code="paragraph_content_hash_like",
+            field="content",
+            value=content,
+        )
 
     raw_time_meta = {
         "event_time": item.get("event_time"),
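
A quick, non-normative sketch of how the new helpers behave (assuming the module path shown in this diff):

```python
from A_memorix.core.utils.import_payloads import (
    is_probable_hash_token,
    normalize_entity_import_item,
    normalize_relation_import_item,
)

# All-hex strings of hash-typical lengths (32/40/64) are treated as probable hashes.
assert is_probable_hash_token("a" * 40)
assert not is_probable_hash_token("Alice")

# Entity items: strings and name/label/entity objects normalize to a clean name.
assert normalize_entity_import_item("  Alice ") == "Alice"
assert normalize_entity_import_item({"label": "Bob"}) == "Bob"
assert normalize_entity_import_item({"name": "b" * 64}) is None  # hash-like -> skipped

# Relation items must be complete triples of non-hash strings.
assert normalize_relation_import_item(
    {"subject": "Bob", "predicate": "works_at", "object": "Acme"}
) == {"subject": "Bob", "predicate": "works_at", "object": "Acme"}
assert normalize_relation_import_item("Bob works_at Acme") is None
```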
"total": len(file_obj.chunks), + "file": file_obj.to_dict(include_chunks=False), "items": [x.to_dict() for x in items], } @@ -2775,7 +2788,9 @@ class ImportTaskManager: if task: task.schema_detected = schema task.updated_at = _now() - units = self._build_json_units(data, file_record.file_id, file_record.name, schema) + units, build_warnings = self._build_json_units(data, file_record.file_id, file_record.name, schema) + if build_warnings: + await self._append_file_warnings(task_id, file_record.file_id, build_warnings) await self._register_json_units(task_id, file_record.file_id, units) await self._set_file_state(task_id, file_record.file_id, "extracting", "extracting") @@ -2829,8 +2844,15 @@ class ImportTaskManager: return "web_json" raise RuntimeError("不支持的 JSON 格式:需要 paragraphs 或 docs") - def _build_json_units(self, data: Any, file_id: str, filename: str, schema: str) -> List[Dict[str, Any]]: + def _build_json_units( + self, + data: Any, + file_id: str, + filename: str, + schema: str, + ) -> Tuple[List[Dict[str, Any]], List[str]]: units: List[Dict[str, Any]] = [] + warnings: List[str] = [] paragraphs: List[Any] = [] entities: List[Any] = [] relations: List[Any] = [] @@ -2867,11 +2889,17 @@ class ImportTaskManager: } paragraphs.append(para_item) - for p in paragraphs: - paragraph = normalize_paragraph_import_item( - p, - default_source=f"web_import:{filename}", - ) + for paragraph_index, p in enumerate(paragraphs): + try: + paragraph = normalize_paragraph_import_item( + p, + default_source=f"web_import:{filename}", + ) + except ImportPayloadValidationError as exc: + warnings.append( + f"跳过段落[{paragraph_index}]:{exc} (code={exc.code})" + ) + continue units.append( { "chunk_id": f"{file_id}_json_{len(units)}", @@ -2887,38 +2915,51 @@ class ImportTaskManager: } ) - for e in entities: - name = str(e or "").strip() - if name: - units.append( - { - "chunk_id": f"{file_id}_json_{len(units)}", - "kind": "entity", - "name": name, - "chunk_type": "entity", - "preview": name[:120], - } + for entity_index, e in enumerate(entities): + name = normalize_entity_import_item(e) + if not name: + raw = str(e or "").strip() + warnings.append( + f"跳过实体[{entity_index}]:无效名称或疑似哈希值 ({raw[:80]})" ) - - for r in relations: - if not isinstance(r, dict): continue - s = str(r.get("subject", "")).strip() - p = str(r.get("predicate", "")).strip() - o = str(r.get("object", "")).strip() - if s and p and o: - units.append( - { - "chunk_id": f"{file_id}_json_{len(units)}", - "kind": "relation", - "subject": s, - "predicate": p, - "object": o, - "chunk_type": "relation", - "preview": f"{s} {p} {o}"[:120], - } + units.append( + { + "chunk_id": f"{file_id}_json_{len(units)}", + "kind": "entity", + "name": name, + "chunk_type": "entity", + "preview": name[:120], + } + ) + + for relation_index, r in enumerate(relations): + relation = normalize_relation_import_item(r) + if relation is None: + if isinstance(r, dict): + raw = ( + f"{str(r.get('subject', '')).strip()} | " + f"{str(r.get('predicate', '')).strip()} | " + f"{str(r.get('object', '')).strip()}" + ) + else: + raw = str(r or "").strip() + warnings.append( + f"跳过关系[{relation_index}]:无效三元组或疑似哈希值 ({raw[:120]})" ) - return units + continue + units.append( + { + "chunk_id": f"{file_id}_json_{len(units)}", + "kind": "relation", + "subject": relation["subject"], + "predicate": relation["predicate"], + "object": relation["object"], + "chunk_type": "relation", + "preview": f"{relation['subject']} {relation['predicate']} {relation['object']}"[:120], + } + ) + return units, warnings 
 
     async def _register_json_units(self, task_id: str, file_id: str, units: List[Dict[str, Any]]) -> None:
         records = [
@@ -2964,48 +3005,91 @@
         await self._set_chunk_state(task_id, file_record.file_id, chunk_id, "writing", "writing", 0.7)
 
         try:
+            chunk_warnings: List[str] = []
+            skip_write = False
             async with self._storage_lock:
                 kind = unit["kind"]
                 if kind == "paragraph":
                     content = str(unit.get("content", ""))
+                    if not content.strip():
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: paragraph content is empty")
+                        skip_write = True
+                    elif is_probable_hash_token(content):
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: paragraph content looks like a hash")
+                        skip_write = True
                     k_type = resolve_stored_knowledge_type(
                         unit.get("knowledge_type"),
                         content=content,
                     ).value
                     source = str(unit.get("source") or f"web_import:{file_record.name}")
-                    para_hash = self.plugin.metadata_store.add_paragraph(
-                        content=content,
-                        source=source,
-                        knowledge_type=k_type,
-                        time_meta=unit.get("time_meta"),
-                    )
-                    vector_result = await self._write_paragraph_vector_or_enqueue(
-                        paragraph_hash=para_hash,
-                        content=content,
-                        context="web_import_json",
-                    )
-                    if str(vector_result.get("warning", "") or "").strip():
-                        logger.warning(
-                            f"web_import json paragraph vector write degraded: hash={para_hash[:8]} detail={vector_result.get('detail')}"
+                    if not skip_write:
+                        para_hash = self.plugin.metadata_store.add_paragraph(
+                            content=content,
+                            source=source,
+                            knowledge_type=k_type,
+                            time_meta=unit.get("time_meta"),
                         )
-                    for name in unit.get("entities", []) or []:
-                        n = str(name or "").strip()
-                        if n:
+                        vector_result = await self._write_paragraph_vector_or_enqueue(
+                            paragraph_hash=para_hash,
+                            content=content,
+                            context="web_import_json",
+                        )
+                        if str(vector_result.get("warning", "") or "").strip():
+                            logger.warning(
+                                f"web_import json paragraph vector write degraded: hash={para_hash[:8]} detail={vector_result.get('detail')}"
+                            )
+                        for name in unit.get("entities", []) or []:
+                            n = str(name or "").strip()
+                            if not n:
+                                continue
+                            if is_probable_hash_token(n):
+                                chunk_warnings.append(
+                                    f"Skipped an entity in chunk[{chunk_id}]: probable hash ({n[:32]})"
+                                )
+                                continue
                             await self._add_entity_with_vector(n, source_paragraph=para_hash)
-                    for rel in unit.get("relations", []) or []:
-                        if not isinstance(rel, dict):
-                            continue
-                        s = str(rel.get("subject", "")).strip()
-                        p = str(rel.get("predicate", "")).strip()
-                        o = str(rel.get("object", "")).strip()
-                        if s and p and o:
+                        for rel in unit.get("relations", []) or []:
+                            if not isinstance(rel, dict):
+                                continue
+                            s = str(rel.get("subject", "")).strip()
+                            p = str(rel.get("predicate", "")).strip()
+                            o = str(rel.get("object", "")).strip()
+                            if not (s and p and o):
+                                continue
+                            if any(is_probable_hash_token(token) for token in (s, p, o)):
+                                chunk_warnings.append(
+                                    f"Skipped a relation in chunk[{chunk_id}]: probable hash ({s[:24]}|{p[:24]}|{o[:24]})"
+                                )
+                                continue
                             await self._add_relation(s, p, o, source_paragraph=para_hash)
                 elif kind == "entity":
-                    await self._add_entity_with_vector(unit["name"])
+                    entity_name = str(unit.get("name", "")).strip()
+                    if not entity_name:
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: entity name is empty")
+                        skip_write = True
+                    elif is_probable_hash_token(entity_name):
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: entity name looks like a hash")
+                        skip_write = True
+                    if not skip_write:
+                        await self._add_entity_with_vector(entity_name)
                 elif kind == "relation":
-                    await self._add_relation(unit["subject"], unit["predicate"], unit["object"])
+                    subject = str(unit.get("subject", "")).strip()
+                    predicate = str(unit.get("predicate", "")).strip()
+                    obj = str(unit.get("object", "")).strip()
+                    if not (subject and predicate and obj):
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: incomplete relation fields")
+                        skip_write = True
+                    elif any(is_probable_hash_token(token) for token in (subject, predicate, obj)):
+                        chunk_warnings.append(f"Skipped chunk[{chunk_id}]: relation fields look like a hash")
+                        skip_write = True
+                    if not skip_write:
+                        await self._add_relation(subject, predicate, obj)
                 else:
                     raise RuntimeError(f"Unknown JSON import unit kind: {kind}")
+            if chunk_warnings:
+                await self._append_file_warnings(task_id, file_record.file_id, chunk_warnings)
             await self._set_chunk_completed(task_id, file_record.file_id, chunk_id)
         except Exception as e:
             await self._set_chunk_failed(task_id, file_record.file_id, chunk_id, f"write failed: {e}")
@@ -3040,7 +3124,10 @@
         *,
         time_meta: Optional[Dict[str, Any]] = None,
     ) -> None:
-        content = processed.chunk.text
+        content = str(processed.chunk.text or "")
+        if is_probable_hash_token(content):
+            logger.warning("Skipping probable-hash paragraph write: source=%s preview=%s", self._source_label(file_record), content[:32])
+            return
         para_hash = self.plugin.metadata_store.add_paragraph(
             content=content,
             source=self._source_label(file_record),
@@ -3081,7 +3168,7 @@
         for k in ("entities", "events", "verbatim_entities"):
             for e in data.get(k, []):
                 name = str(e or "").strip()
-                if name:
+                if name and not is_probable_hash_token(name):
                     entities.append(name)
 
         uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values())
@@ -3092,23 +3179,44 @@
             await self._add_relation(s, p, o, source_paragraph=para_hash)
 
     async def _add_entity_with_vector(self, name: str, source_paragraph: str = "") -> str:
-        hash_value = self.plugin.metadata_store.add_entity(name=name, source_paragraph=source_paragraph)
-        self.plugin.graph_store.add_nodes([name])
+        name_token = str(name or "").strip()
+        if not name_token:
+            return ""
+        if is_probable_hash_token(name_token):
+            logger.warning(f"Skipping probable-hash entity write: entity={name_token[:32]}")
+            return ""
+
+        hash_value = self.plugin.metadata_store.add_entity(name=name_token, source_paragraph=source_paragraph)
+        self.plugin.graph_store.add_nodes([name_token])
         if hash_value not in self.plugin.vector_store:
             try:
                 if self._is_embedding_degraded():
                     raise RuntimeError("embedding_degraded")
-                emb = await self.plugin.embedding_manager.encode(name)
+                emb = await self.plugin.embedding_manager.encode(name_token)
                 self.plugin.vector_store.add(emb.reshape(1, -1), [hash_value])
             except Exception as exc:
                 if not self._allow_metadata_only_write():
                     raise
-                logger.warning(f"Entity vector write degraded; keeping metadata/graph: entity={name} error={exc}")
+                logger.warning(f"Entity vector write degraded; keeping metadata/graph: entity={name_token} error={exc}")
         return hash_value
 
     async def _add_relation(self, subject: str, predicate: str, obj: str, source_paragraph: str = "") -> str:
-        await self._add_entity_with_vector(subject, source_paragraph=source_paragraph)
-        await self._add_entity_with_vector(obj, source_paragraph=source_paragraph)
+        subject_token = str(subject or "").strip()
+        predicate_token = str(predicate or "").strip()
+        object_token = str(obj or "").strip()
+        if not (subject_token and predicate_token and object_token):
+            return ""
+        if any(is_probable_hash_token(token) for token in (subject_token, predicate_token, object_token)):
+            logger.warning(
+                "Skipping probable-hash relation write: %s | %s | %s",
+                subject_token[:24],
+                predicate_token[:24],
+                object_token[:24],
+            )
+            return ""
+
+        await self._add_entity_with_vector(subject_token, source_paragraph=source_paragraph)
+        await self._add_entity_with_vector(object_token, source_paragraph=source_paragraph)
 
         rv_cfg = self.plugin.get_config("retrieval.relation_vectorization", {}) or {}
         if not isinstance(rv_cfg, dict):
             rv_cfg = {}
@@ -3117,9 +3225,9 @@
         relation_service = getattr(self.plugin, "relation_write_service", None)
         if relation_service is not None:
             result = await relation_service.upsert_relation_with_vector(
-                subject=subject,
-                predicate=predicate,
-                obj=obj,
+                subject=subject_token,
+                predicate=predicate_token,
+                obj=object_token,
                 confidence=1.0,
                 source_paragraph=source_paragraph,
                 write_vector=write_vector,
@@ -3127,13 +3235,13 @@
             )
             return result.hash_value
 
         rel_hash = self.plugin.metadata_store.add_relation(
-            subject=subject,
-            predicate=predicate,
-            obj=obj,
+            subject=subject_token,
+            predicate=predicate_token,
+            obj=object_token,
             source_paragraph=source_paragraph,
             confidence=1.0,
         )
-        self.plugin.graph_store.add_edges([(subject, obj)], relation_hashes=[rel_hash])
+        self.plugin.graph_store.add_edges([(subject_token, object_token)], relation_hashes=[rel_hash])
         try:
             self.plugin.metadata_store.set_relation_vector_state(rel_hash, "none")
         except Exception:
             pass
@@ -3376,6 +3484,28 @@ JSON schema:
             task.status = "running"
             task.current_step = "running"
 
+    async def _append_file_warning(self, task_id: str, file_id: str, warning: str) -> None:
+        warning_text = str(warning or "").strip()
+        if not warning_text:
+            return
+        async with self._lock:
+            task = self._tasks.get(task_id)
+            if not task:
+                return
+            file_record = self._find_file(task, file_id)
+            if not file_record:
+                return
+            file_record.warning_count += 1
+            file_record.warnings.append(warning_text)
+            if len(file_record.warnings) > FILE_WARNING_KEEP_LIMIT:
+                file_record.warnings = file_record.warnings[-FILE_WARNING_KEEP_LIMIT:]
+            file_record.updated_at = _now()
+            task.updated_at = _now()
+
+    async def _append_file_warnings(self, task_id: str, file_id: str, warnings: List[str]) -> None:
+        for warning in warnings:
+            await self._append_file_warning(task_id, file_id, warning)
+
     async def _set_file_failed(self, task_id: str, file_id: str, error: str) -> None:
         async with self._lock:
             task = self._tasks.get(task_id)
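
A consumer-side sketch of how the new warning fields are meant to be read; `summarize_file_warnings` is a hypothetical helper, not part of this patch:

```python
def summarize_file_warnings(file_payload: dict) -> str:
    """Render warning info from a task.files[*] payload (fields as added in this diff)."""
    count = int(file_payload.get("warning_count", 0))
    kept = file_payload.get("warnings") or []
    if count == 0:
        return "no warnings"
    lines = [f"{count} warning(s); showing the last {len(kept)}"]
    dropped = count - len(kept)
    if dropped > 0:
        # warnings is capped at FILE_WARNING_KEEP_LIMIT (50), so older entries are gone
        lines.append(f"({dropped} older warning(s) truncated)")
    lines.extend(f"- {w}" for w in kept)
    return "\n".join(lines)
```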
diff --git a/src/A_memorix/scripts/process_knowledge.py b/src/A_memorix/scripts/process_knowledge.py
index 2f5a6d5f..1dfec182 100644
--- a/src/A_memorix/scripts/process_knowledge.py
+++ b/src/A_memorix/scripts/process_knowledge.py
@@ -10,21 +10,22 @@
 5. Update the manifest
 """
 
-import sys
-import os
-import json
-import asyncio
-import time
-import random
-import hashlib
-import tomlkit
-import argparse
-from pathlib import Path
 from datetime import datetime
-from typing import List, Dict, Any, Optional
-from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
 from rich.console import Console
+from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
 from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
+import argparse
+import asyncio
+import hashlib
+import json
+import os
+import random
+import sys
+import time
+import tomlkit
 
 console = Console()
 
@@ -89,8 +90,14 @@ try:
     resolve_stored_knowledge_type = storage_module.resolve_stored_knowledge_type
     select_import_strategy = storage_module.select_import_strategy
 
+    from A_memorix.core.utils.import_payloads import (
+        ImportPayloadValidationError,
+        is_probable_hash_token,
+        normalize_entity_import_item,
+        normalize_paragraph_import_item,
+        normalize_relation_import_item,
+    )
     from A_memorix.core.utils.time_parser import normalize_time_meta
-    from A_memorix.core.utils.import_payloads import normalize_paragraph_import_item
     from A_memorix.core.strategies.base import BaseStrategy, ProcessedChunk, KnowledgeType as StratKnowledgeType
     from A_memorix.core.strategies.narrative import NarrativeStrategy
     from A_memorix.core.strategies.factual import FactualStrategy
@@ -603,11 +610,18 @@ Chat paragraph:
     # Re-use existing methods
     async def _add_entity_with_vector(self, name: str, source_paragraph: Optional[str] = None) -> str:
-        # Same as before
-        hash_value = self.metadata_store.add_entity(name, source_paragraph=source_paragraph)
-        self.graph_store.add_nodes([name])
+        # Last line of defense: keep bypass paths from writing a hash as an entity name
+        entity_name = str(name or "").strip()
+        if not entity_name:
+            return ""
+        if is_probable_hash_token(entity_name):
+            logger.warning(f"Script import skipped a probable-hash entity: {entity_name[:32]}")
+            return ""
+
+        hash_value = self.metadata_store.add_entity(entity_name, source_paragraph=source_paragraph)
+        self.graph_store.add_nodes([entity_name])
         try:
-            emb = await self.embedding_manager.encode(name)
+            emb = await self.embedding_manager.encode(entity_name)
             try:
                 self.vector_store.add(emb.reshape(1, -1), [hash_value])
             except ValueError: pass
@@ -632,13 +646,35 @@ Chat paragraph:
     async def _import_to_db(self, data: Dict, progress_callback=None):
         # Same logic, but more robust
+        warning_count = 0
+
+        def append_warning(message: str) -> None:
+            nonlocal warning_count
+            warning_count += 1
+            logger.warning(message)
+
         with self.graph_store.batch_update():
-            for item in data.get("paragraphs", []):
-                paragraph = normalize_paragraph_import_item(
-                    item,
-                    default_source="script",
-                )
+            for paragraph_index, item in enumerate(data.get("paragraphs", [])):
+                try:
+                    paragraph = normalize_paragraph_import_item(
+                        item,
+                        default_source="script",
+                    )
+                except ImportPayloadValidationError as exc:
+                    append_warning(
+                        f"Script import skipped paragraph[{paragraph_index}]: {exc} (code={exc.code})"
+                    )
+                    if progress_callback:
+                        progress_callback(1)
+                    continue
+                content = paragraph["content"]
+                if is_probable_hash_token(content):
+                    append_warning(f"Script import skipped paragraph[{paragraph_index}]: content looks like a hash")
+                    if progress_callback:
+                        progress_callback(1)
+                    continue
+
                 source = paragraph["source"]
                 k_type_val = paragraph["knowledge_type"]
@@ -658,44 +694,109 @@ Chat paragraph:
                 para_entities = paragraph["entities"]
                 for entity in para_entities:
-                    if entity:
-                        await self._add_entity_with_vector(entity, source_paragraph=h_val)
-
+                    name = normalize_entity_import_item(entity)
+                    if not name:
+                        append_warning(f"Script import skipped an entity in paragraph[{paragraph_index}]: invalid name or probable hash")
+                        continue
+                    await self._add_entity_with_vector(name, source_paragraph=h_val)
+
                 para_relations = paragraph["relations"]
                 for rel in para_relations:
-                    s, p, o = rel.get("subject"), rel.get("predicate"), rel.get("object")
-                    if s and p and o:
-                        await self._add_entity_with_vector(s, source_paragraph=h_val)
-                        await self._add_entity_with_vector(o, source_paragraph=h_val)
-                        confidence = float(rel.get("confidence", 1.0) or 1.0)
-                        rel_meta = rel.get("metadata", {})
-                        write_vector = self._should_write_relation_vectors()
-                        if self.relation_write_service is not None:
-                            await self.relation_write_service.upsert_relation_with_vector(
-                                subject=s,
-                                predicate=p,
-                                obj=o,
-                                confidence=confidence,
-                                source_paragraph=h_val,
-                                metadata=rel_meta if isinstance(rel_meta, dict) else {},
-                                write_vector=write_vector,
-                            )
-                        else:
-                            rel_hash = self.metadata_store.add_relation(
-                                s,
-                                p,
-                                o,
-                                confidence=confidence,
-                                source_paragraph=h_val,
-                                metadata=rel_meta if isinstance(rel_meta, dict) else {},
-                            )
-                            self.graph_store.add_edges([(s, o)], relation_hashes=[rel_hash])
-                            try:
-                                self.metadata_store.set_relation_vector_state(rel_hash, "none")
-                            except Exception:
-                                pass
-
-            if progress_callback: progress_callback(1)
+                    normalized_relation = normalize_relation_import_item(rel)
+                    if normalized_relation is None:
+                        append_warning(f"Script import skipped a relation in paragraph[{paragraph_index}]: invalid fields or probable hash")
+                        continue
+
+                    s = normalized_relation["subject"]
+                    p = normalized_relation["predicate"]
+                    o = normalized_relation["object"]
+                    await self._add_entity_with_vector(s, source_paragraph=h_val)
+                    await self._add_entity_with_vector(o, source_paragraph=h_val)
+
+                    confidence = float(rel.get("confidence", 1.0) or 1.0) if isinstance(rel, dict) else 1.0
+                    rel_meta = rel.get("metadata", {}) if isinstance(rel, dict) else {}
+                    write_vector = self._should_write_relation_vectors()
+                    if self.relation_write_service is not None:
+                        await self.relation_write_service.upsert_relation_with_vector(
+                            subject=s,
+                            predicate=p,
+                            obj=o,
+                            confidence=confidence,
+                            source_paragraph=h_val,
+                            metadata=rel_meta if isinstance(rel_meta, dict) else {},
+                            write_vector=write_vector,
+                        )
+                    else:
+                        rel_hash = self.metadata_store.add_relation(
+                            s,
+                            p,
+                            o,
+                            confidence=confidence,
+                            source_paragraph=h_val,
+                            metadata=rel_meta if isinstance(rel_meta, dict) else {},
+                        )
+                        self.graph_store.add_edges([(s, o)], relation_hashes=[rel_hash])
+                        try:
+                            self.metadata_store.set_relation_vector_state(rel_hash, "none")
+                        except Exception:
+                            pass
+
+                if progress_callback:
+                    progress_callback(1)
+
+            for entity_index, raw_entity in enumerate(data.get("entities", []) or []):
+                entity_name = normalize_entity_import_item(raw_entity)
+                if not entity_name:
+                    append_warning(f"Script import skipped top-level entity[{entity_index}]: invalid name or probable hash")
+                    continue
+                await self._add_entity_with_vector(entity_name)
+
+            for relation_index, raw_relation in enumerate(data.get("relations", []) or []):
+                relation = normalize_relation_import_item(raw_relation)
+                if relation is None:
+                    append_warning(f"Script import skipped top-level relation[{relation_index}]: invalid fields or probable hash")
+                    continue
+
+                subject = relation["subject"]
+                predicate = relation["predicate"]
+                obj = relation["object"]
+                await self._add_entity_with_vector(subject)
+                await self._add_entity_with_vector(obj)
+
+                confidence = (
+                    float(raw_relation.get("confidence", 1.0) or 1.0)
+                    if isinstance(raw_relation, dict)
+                    else 1.0
+                )
+                rel_meta = raw_relation.get("metadata", {}) if isinstance(raw_relation, dict) else {}
+                write_vector = self._should_write_relation_vectors()
+                if self.relation_write_service is not None:
+                    await self.relation_write_service.upsert_relation_with_vector(
+                        subject=subject,
+                        predicate=predicate,
+                        obj=obj,
+                        confidence=confidence,
+                        source_paragraph="",
+                        metadata=rel_meta if isinstance(rel_meta, dict) else {},
+                        write_vector=write_vector,
+                    )
+                else:
+                    rel_hash = self.metadata_store.add_relation(
+                        subject,
+                        predicate,
+                        obj,
+                        confidence=confidence,
+                        source_paragraph="",
+                        metadata=rel_meta if isinstance(rel_meta, dict) else {},
+                    )
+                    self.graph_store.add_edges([(subject, obj)], relation_hashes=[rel_hash])
+                    try:
+                        self.metadata_store.set_relation_vector_state(rel_hash, "none")
+                    except Exception:
+                        pass
+
+        if warning_count > 0:
+            logger.warning(f"Script import finished; skipped {warning_count} invalid item(s)")
 
     async def close(self):
         if self.metadata_store: self.metadata_store.close()
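
For reference, a sketch of the `data` shape the importer's `_import_to_db` now accepts. The field names are taken from the code above, but this is not an authoritative schema:

```python
# Illustrative input (shape inferred from the diff; values are made up).
data = {
    "paragraphs": [
        {
            "content": "Ada Lovelace described the first published algorithm.",
            "entities": ["Ada Lovelace"],
            "relations": [
                {
                    "subject": "Ada Lovelace",
                    "predicate": "described",
                    "object": "the first published algorithm",
                    "confidence": 0.9,
                    "metadata": {"extractor": "manual"},
                }
            ],
        }
    ],
    # Top-level items now go through the same validation as paragraph-level ones:
    "entities": ["Charles Babbage"],
    "relations": [
        {"subject": "Charles Babbage", "predicate": "designed", "object": "Analytical Engine"}
    ],
}
```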
"提示:人物定向检索未命中,已自动降级为关键词检索。\n" + f"{content}" + ) if clean_query: display_prompt = f"你查询了长期记忆:{clean_query}" else: