fix(A_memorix):标准化导入数据并修复进度显示
为导入/LLM输出增加健壮的标准化与验证机制,修复进度计算逻辑,并改进UI摘要展示
This commit is contained in:
@@ -2627,6 +2627,7 @@ class MetadataStore:
|
||||
SELECT source, COUNT(*) as count, MAX(created_at) as last_updated
|
||||
FROM paragraphs
|
||||
WHERE source IS NOT NULL AND source != ''
|
||||
AND (is_deleted IS NULL OR is_deleted = 0)
|
||||
GROUP BY source
|
||||
ORDER BY last_updated DESC
|
||||
""")
|
||||
|
||||
@@ -56,6 +56,63 @@ SUMMARY_PROMPT_TEMPLATE = """
|
||||
注意:总结应具有叙事性,能够作为长程记忆的一部分。直接使用实体的实际名称,不要使用 e1/e2 等代号。
|
||||
"""
|
||||
|
||||
|
||||
def _normalize_entity_items(raw_entities: Any) -> List[str]:
|
||||
if not isinstance(raw_entities, list):
|
||||
return []
|
||||
entities: List[str] = []
|
||||
seen = set()
|
||||
for item in raw_entities:
|
||||
if isinstance(item, str):
|
||||
name = item.strip()
|
||||
elif isinstance(item, dict):
|
||||
name = str(item.get("name") or item.get("label") or item.get("entity") or "").strip()
|
||||
else:
|
||||
name = ""
|
||||
if not name:
|
||||
continue
|
||||
key = name.lower()
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
entities.append(name)
|
||||
return entities
|
||||
|
||||
|
||||
def _normalize_relation_items(raw_relations: Any) -> List[Dict[str, str]]:
|
||||
if not isinstance(raw_relations, list):
|
||||
return []
|
||||
relations: List[Dict[str, str]] = []
|
||||
for item in raw_relations:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
subject = str(item.get("subject", "") or "").strip()
|
||||
predicate = str(item.get("predicate", "") or "").strip()
|
||||
obj = str(item.get("object", "") or "").strip()
|
||||
if not (subject and predicate and obj):
|
||||
continue
|
||||
relations.append({"subject": subject, "predicate": predicate, "object": obj})
|
||||
return relations
|
||||
|
||||
|
||||
def _message_timestamp(message: Any) -> Optional[float]:
|
||||
for attr_name in ("timestamp", "time"):
|
||||
value = getattr(message, attr_name, None)
|
||||
if value is None:
|
||||
continue
|
||||
timestamp_func = getattr(value, "timestamp", None)
|
||||
if callable(timestamp_func):
|
||||
try:
|
||||
return float(timestamp_func())
|
||||
except Exception:
|
||||
continue
|
||||
try:
|
||||
return float(value)
|
||||
except Exception:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
class SummaryImporter:
|
||||
"""总结并导入知识的工具类"""
|
||||
|
||||
@@ -312,14 +369,12 @@ class SummaryImporter:
|
||||
if not data or "summary" not in data:
|
||||
return False, "解析 LLM 响应失败或总结为空"
|
||||
|
||||
summary_text = data["summary"]
|
||||
entities = data.get("entities", [])
|
||||
relations = data.get("relations", [])
|
||||
msg_times = [
|
||||
float(getattr(getattr(msg, "timestamp", None), "timestamp", lambda: 0.0)())
|
||||
for msg in messages
|
||||
if getattr(msg, "time", None) is not None
|
||||
]
|
||||
summary_text = str(data["summary"] or "").strip()
|
||||
if not summary_text:
|
||||
return False, "解析 LLM 响应失败或总结为空"
|
||||
entities = _normalize_entity_items(data.get("entities"))
|
||||
relations = _normalize_relation_items(data.get("relations"))
|
||||
msg_times = [timestamp for msg in messages if (timestamp := _message_timestamp(msg)) is not None]
|
||||
time_meta = {}
|
||||
if msg_times:
|
||||
time_meta = {
|
||||
@@ -455,8 +510,8 @@ class SummaryImporter:
|
||||
if not isinstance(rv_cfg, dict):
|
||||
rv_cfg = {}
|
||||
write_vector = bool(rv_cfg.get("enabled", False)) and bool(rv_cfg.get("write_on_import", True))
|
||||
for rel in relations:
|
||||
s, p, o = rel.get("subject"), rel.get("predicate"), rel.get("object")
|
||||
for rel in _normalize_relation_items(relations):
|
||||
s, p, o = rel["subject"], rel["predicate"], rel["object"]
|
||||
if all([s, p, o]):
|
||||
if self.relation_write_service is not None:
|
||||
await self.relation_write_service.upsert_relation_with_vector(
|
||||
|
||||
@@ -6,6 +6,11 @@ Web Import Task Manager
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
@@ -15,24 +20,24 @@ import sys
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from src.common.logger import get_logger
|
||||
from src.services import llm_service as llm_api
|
||||
|
||||
from ...paths import default_data_dir, repo_root, resolve_repo_path, scripts_root
|
||||
from ..storage import (
|
||||
KnowledgeType,
|
||||
MetadataStore,
|
||||
parse_import_strategy,
|
||||
resolve_stored_knowledge_type,
|
||||
select_import_strategy,
|
||||
KnowledgeType,
|
||||
MetadataStore,
|
||||
)
|
||||
from ..storage.knowledge_types import ImportStrategy
|
||||
from ..storage.type_detection import looks_like_quote_text
|
||||
from ..strategies.base import KnowledgeType as StrategyKnowledgeType, ProcessedChunk
|
||||
from ..strategies.factual import FactualStrategy
|
||||
from ..strategies.narrative import NarrativeStrategy
|
||||
from ..strategies.quote import QuoteStrategy
|
||||
from ..utils.import_payloads import (
|
||||
ImportPayloadValidationError,
|
||||
is_probable_hash_token,
|
||||
@@ -42,11 +47,6 @@ from ..utils.import_payloads import (
|
||||
)
|
||||
from ..utils.runtime_self_check import ensure_runtime_self_check
|
||||
from ..utils.time_parser import normalize_time_meta
|
||||
from ..storage.knowledge_types import ImportStrategy
|
||||
from ..strategies.base import ProcessedChunk, KnowledgeType as StrategyKnowledgeType
|
||||
from ..strategies.narrative import NarrativeStrategy
|
||||
from ..strategies.factual import FactualStrategy
|
||||
from ..strategies.quote import QuoteStrategy
|
||||
|
||||
logger = get_logger("A_Memorix.WebImportManager")
|
||||
|
||||
@@ -141,6 +141,44 @@ def _coerce_list(value: Any) -> List[str]:
|
||||
return out
|
||||
|
||||
|
||||
def _coerce_import_data_dict(value: Any, *, context: str) -> Dict[str, Any]:
|
||||
"""确保 LLM 抽取结果是对象,避免写入阶段出现部分提交。"""
|
||||
|
||||
if value is None:
|
||||
return {}
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
raise ValueError(f"{context} 必须返回 JSON 对象,当前类型: {type(value).__name__}")
|
||||
|
||||
|
||||
def _normalize_import_relation_list(value: Any) -> List[Dict[str, str]]:
    """Run every candidate through the shared relation validator.

    Non-list input yields [].  Items rejected by
    ``normalize_relation_import_item`` (which returns None) are dropped.
    """
    if not isinstance(value, list):
        return []
    candidates = (normalize_relation_import_item(item) for item in value)
    return [relation for relation in candidates if relation is not None]
|
||||
|
||||
|
||||
def _normalize_import_entity_list(value: Any) -> List[str]:
    """Normalize entity candidates via the shared validator, de-duplicated.

    Each item is cleaned by ``normalize_entity_import_item``; empty results
    are dropped.  De-duplication is case-insensitive and keeps the first
    occurrence.  Non-list input yields [].
    """
    if not isinstance(value, list):
        return []
    result: List[str] = []
    seen_lower = set()
    for candidate in value:
        cleaned = normalize_entity_import_item(candidate)
        if not cleaned:
            continue
        dedupe_key = cleaned.lower()
        if dedupe_key in seen_lower:
            continue
        seen_lower.add(dedupe_key)
        result.append(cleaned)
    return result
|
||||
|
||||
|
||||
def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]:
|
||||
if value is None:
|
||||
return None
|
||||
@@ -3128,6 +3166,7 @@ class ImportTaskManager:
|
||||
if is_probable_hash_token(content):
|
||||
logger.warning("跳过疑似哈希段落写入: source=%s preview=%s", self._source_label(file_record), content[:32])
|
||||
return
|
||||
data = _coerce_import_data_dict(processed.data, context="分块抽取结果")
|
||||
para_hash = self.plugin.metadata_store.add_paragraph(
|
||||
content=content,
|
||||
source=self._source_label(file_record),
|
||||
@@ -3145,31 +3184,25 @@ class ImportTaskManager:
|
||||
f"web_import text paragraph 向量写入降级: hash={para_hash[:8]} detail={vector_result.get('detail')}"
|
||||
)
|
||||
|
||||
data = processed.data or {}
|
||||
entities: List[str] = []
|
||||
relations: List[Tuple[str, str, str]] = []
|
||||
|
||||
for triple in data.get("triples", []):
|
||||
s = str(triple.get("subject", "")).strip()
|
||||
p = str(triple.get("predicate", "")).strip()
|
||||
o = str(triple.get("object", "")).strip()
|
||||
if s and p and o:
|
||||
relations.append((s, p, o))
|
||||
entities.extend([s, o])
|
||||
for triple in _normalize_import_relation_list(data.get("triples")):
|
||||
s = triple["subject"]
|
||||
p = triple["predicate"]
|
||||
o = triple["object"]
|
||||
relations.append((s, p, o))
|
||||
entities.extend([s, o])
|
||||
|
||||
for rel in data.get("relations", []):
|
||||
s = str(rel.get("subject", "")).strip()
|
||||
p = str(rel.get("predicate", "")).strip()
|
||||
o = str(rel.get("object", "")).strip()
|
||||
if s and p and o:
|
||||
relations.append((s, p, o))
|
||||
entities.extend([s, o])
|
||||
for rel in _normalize_import_relation_list(data.get("relations")):
|
||||
s = rel["subject"]
|
||||
p = rel["predicate"]
|
||||
o = rel["object"]
|
||||
relations.append((s, p, o))
|
||||
entities.extend([s, o])
|
||||
|
||||
for k in ("entities", "events", "verbatim_entities"):
|
||||
for e in data.get(k, []):
|
||||
name = str(e or "").strip()
|
||||
if name and not is_probable_hash_token(name):
|
||||
entities.append(name)
|
||||
entities.extend(_normalize_import_entity_list(data.get(k)))
|
||||
|
||||
uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values())
|
||||
for name in uniq_entities:
|
||||
@@ -3297,12 +3330,12 @@ class ImportTaskManager:
|
||||
txt = txt[4:].strip()
|
||||
|
||||
try:
|
||||
return json.loads(txt)
|
||||
return _coerce_import_data_dict(json.loads(txt), context="LLM 抽取结果")
|
||||
except Exception:
|
||||
s = txt.find("{")
|
||||
e = txt.rfind("}")
|
||||
if s >= 0 and e > s:
|
||||
return json.loads(txt[s : e + 1])
|
||||
return _coerce_import_data_dict(json.loads(txt[s : e + 1]), context="LLM 抽取结果")
|
||||
raise
|
||||
except Exception as err:
|
||||
last_error = err
|
||||
@@ -3373,6 +3406,7 @@ JSON schema:
|
||||
logger.warning(f"chat_log 时间语义抽取失败: {e}")
|
||||
return None
|
||||
|
||||
result = _coerce_import_data_dict(result, context="chat_log 时间抽取结果")
|
||||
raw_time_meta = {
|
||||
"event_time": result.get("event_time"),
|
||||
"event_time_start": result.get("event_time_start"),
|
||||
@@ -3632,7 +3666,7 @@ JSON schema:
|
||||
c.progress = 1.0
|
||||
c.updated_at = _now()
|
||||
f.failed_chunks += 1
|
||||
f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks)
|
||||
f.progress = self._compute_ratio(f.done_chunks, f.total_chunks)
|
||||
if not f.error:
|
||||
f.error = str(error)
|
||||
f.updated_at = _now()
|
||||
@@ -3698,7 +3732,7 @@ JSON schema:
|
||||
task.done_chunks = done
|
||||
task.failed_chunks = failed
|
||||
task.cancelled_chunks = cancelled
|
||||
task.progress = self._compute_ratio(done + failed + cancelled, total)
|
||||
task.progress = self._compute_ratio(done, total)
|
||||
task.updated_at = _now()
|
||||
|
||||
async def _should_cleanup_task_temp(self, task_id: str) -> bool:
|
||||
|
||||
Reference in New Issue
Block a user