This commit is contained in:
SengokuCola
2026-05-07 00:05:39 +08:00
9 changed files with 590 additions and 61 deletions

View File

@@ -154,6 +154,38 @@ function mockImportDetail(taskId: string): memoryApi.MemoryImportTaskPayload {
} }
} }
function mockImportCompletedWithErrorsDetail(taskId: string): memoryApi.MemoryImportTaskPayload {
  // Partial-failure fixture: 9 of 12 chunks succeed, 3 fail, none cancelled.
  const failedFile = {
    file_id: 'file-error',
    name: 'error.txt',
    source_kind: 'paste',
    input_mode: 'text',
    status: 'failed',
    current_step: 'failed',
    detected_strategy_type: 'auto',
    total_chunks: 12,
    done_chunks: 9,
    failed_chunks: 3,
    cancelled_chunks: 0,
    progress: 75,
    error: 'mock error',
    created_at: 1_710_000_000,
    updated_at: 1_710_000_100,
  }
  const base = mockImportDetail(taskId)
  return {
    ...base,
    status: 'completed_with_errors',
    current_step: 'completed_with_errors',
    total_chunks: 12,
    done_chunks: 9,
    failed_chunks: 3,
    cancelled_chunks: 0,
    progress: 75,
    files: [failedFile],
  }
}
describe('KnowledgeBasePage import workflow', () => { describe('KnowledgeBasePage import workflow', () => {
beforeEach(() => { beforeEach(() => {
navigateMock.mockReset() navigateMock.mockReset()
@@ -606,6 +638,21 @@ describe('KnowledgeBasePage import workflow', () => {
) )
}, 20_000) }, 20_000)
// Regression test: a task that finishes with partial chunk failures must be
// rendered as "completed_with_errors" with an explicit failed-chunk summary,
// not as a plain success.
it('shows import failures separately from successful chunks', async () => {
  // Detail endpoint returns the partial-failure fixture for the selected run.
  vi.mocked(memoryApi.getMemoryImportTask).mockResolvedValue({
    success: true,
    task: mockImportCompletedWithErrorsDetail('import-run-1'),
  })
  const user = userEvent.setup()
  render(<KnowledgeBasePage />)
  // Wait for the console heading before interacting with the tab bar.
  await screen.findByText('长期记忆控制台', undefined, { timeout: 10_000 })
  await user.click(screen.getByRole('tab', { name: '导入' }))
  // Both the status badge and the chunk summary must surface the 3 failures.
  expect((await screen.findAllByText('完成(有错误)')).length).toBeGreaterThan(0)
  expect(await screen.findByText('成功 9 / 12 分块 · 失败 3')).toBeInTheDocument()
}, 20_000)
it('supports cancel and retry actions for selected task', async () => { it('supports cancel and retry actions for selected task', async () => {
const user = userEvent.setup() const user = userEvent.setup()
render(<KnowledgeBasePage />) render(<KnowledgeBasePage />)

View File

@@ -39,6 +39,21 @@ import {
normalizeProgress, normalizeProgress,
} from '../utils' } from '../utils'
/**
 * Build a human-readable chunk summary, e.g. "成功 9 / 12 分块 · 失败 3 · 取消 2".
 * Failed/cancelled parts are appended only when non-zero.
 *
 * Inputs arrive as `unknown` because task payload fields are loosely typed;
 * any value that does not coerce to a finite number (undefined, null, or a
 * non-numeric string) counts as 0 so the UI never renders "NaN".
 */
function formatChunkSummary(done: unknown, total: unknown, failed: unknown, cancelled: unknown = 0): string {
  const toCount = (value: unknown): number => {
    const num = Number(value ?? 0)
    return Number.isFinite(num) ? num : 0
  }
  const doneCount = toCount(done)
  const totalCount = toCount(total)
  const failedCount = toCount(failed)
  const cancelledCount = toCount(cancelled)
  const parts = [`成功 ${doneCount} / ${totalCount} 分块`]
  if (failedCount > 0) {
    parts.push(`失败 ${failedCount}`)
  }
  if (cancelledCount > 0) {
    parts.push(`取消 ${cancelledCount}`)
  }
  return parts.join(' · ')
}
export interface ImportTabProps { export interface ImportTabProps {
importCreateMode: MemoryImportTaskKind importCreateMode: MemoryImportTaskKind
setImportCreateMode: Dispatch<SetStateAction<MemoryImportTaskKind>> setImportCreateMode: Dispatch<SetStateAction<MemoryImportTaskKind>>
@@ -1073,12 +1088,19 @@ export function ImportTab(props: ImportTabProps) {
? 'success' ? 'success'
: String(selectedImportTaskResolved.status ?? '') === 'failed' : String(selectedImportTaskResolved.status ?? '') === 'failed'
? 'destructive' ? 'destructive'
: String(selectedImportTaskResolved.status ?? '') === 'cancelled' : String(selectedImportTaskResolved.status ?? '') === 'completed_with_errors'
? 'warning'
: String(selectedImportTaskResolved.status ?? '') === 'cancelled'
? 'muted' ? 'muted'
: 'default' : 'default'
} }
busy={RUNNING_IMPORT_STATUS.has(String(selectedImportTaskResolved.status ?? ''))} busy={RUNNING_IMPORT_STATUS.has(String(selectedImportTaskResolved.status ?? ''))}
detail={`已完成 ${Number(selectedImportTaskResolved.done_chunks ?? 0)} / ${Number(selectedImportTaskResolved.total_chunks ?? 0)} 分块`} detail={formatChunkSummary(
selectedImportTaskResolved.done_chunks,
selectedImportTaskResolved.total_chunks,
selectedImportTaskResolved.failed_chunks,
selectedImportTaskResolved.cancelled_chunks,
)}
/> />
</TableCell> </TableCell>
</TableRow> </TableRow>
@@ -1160,7 +1182,12 @@ export function ImportTab(props: ImportTabProps) {
</div> </div>
<Progress value={normalizeProgress(file.progress)} className="mt-2 h-1.5" /> <Progress value={normalizeProgress(file.progress)} className="mt-2 h-1.5" />
<div className="mt-2 text-xs text-muted-foreground"> <div className="mt-2 text-xs text-muted-foreground">
{formatProgressPercent(file.progress)} · {Number(file.done_chunks ?? 0)} / {Number(file.total_chunks ?? 0)} {formatProgressPercent(file.progress)} · {formatChunkSummary(
file.done_chunks,
file.total_chunks,
file.failed_chunks,
file.cancelled_chunks,
)}
</div> </div>
{file.error ? ( {file.error ? (
<div className="mt-2 truncate text-xs text-destructive">{file.error}</div> <div className="mt-2 truncate text-xs text-destructive">{file.error}</div>

View File

@@ -0,0 +1,21 @@
from pathlib import Path
from src.A_memorix.core.storage.metadata_store import MetadataStore
def test_get_all_sources_ignores_soft_deleted_paragraphs(tmp_path: Path) -> None:
    """Soft-deleted paragraphs must be excluded from the source aggregation."""
    store = MetadataStore(data_dir=tmp_path)
    store.connect()
    try:
        kept_hash = store.add_paragraph("Alice 喜欢地图", source="live-source")
        removed_hash = store.add_paragraph("Bob 喜欢咖啡", source="deleted-source")
        assert kept_hash
        store.mark_as_deleted([removed_hash], "paragraph")
        sources = store.get_all_sources()
    finally:
        store.close()
    assert [entry["source"] for entry in sources] == ["live-source"]
    assert sources[0]["count"] == 1

View File

@@ -1,6 +1,11 @@
import pytest import pytest
from src.A_memorix.core.utils.summary_importer import SummaryImporter from src.A_memorix.core.utils.summary_importer import (
SummaryImporter,
_message_timestamp,
_normalize_entity_items,
_normalize_relation_items,
)
from src.config.model_configs import TaskConfig from src.config.model_configs import TaskConfig
from src.services import llm_service as llm_api from src.services import llm_service as llm_api
@@ -46,3 +51,22 @@ def test_resolve_summary_model_config_rejects_legacy_string_selector(monkeypatch
with pytest.raises(ValueError, match="List\\[str\\]"): with pytest.raises(ValueError, match="List\\[str\\]"):
importer._resolve_summary_model_config() importer._resolve_summary_model_config()
def test_summary_importer_normalizes_llm_entities_and_relations():
    """Normalizers drop malformed items, blank fields and duplicate names."""
    raw_entities = ["Alice", {"name": "地图"}, ["bad"], "Alice"]
    assert _normalize_entity_items(raw_entities) == ["Alice", "地图"]
    # Non-list payloads are rejected wholesale.
    assert _normalize_entity_items("Alice") == []
    raw_relations = [
        {"subject": "Alice", "predicate": "持有", "object": "地图"},
        {"subject": "Alice", "predicate": "", "object": "地图"},
        ["bad"],
    ]
    assert _normalize_relation_items(raw_relations) == [
        {"subject": "Alice", "predicate": "持有", "object": "地图"}
    ]
def test_summary_importer_message_timestamp_accepts_time_fallback():
    """A plain numeric `time` attribute is used when `timestamp` is absent."""
    message = type("Message", (), {"time": 123.5})()
    assert _message_timestamp(message) == 123.5

View File

@@ -0,0 +1,182 @@
from types import SimpleNamespace
import numpy as np
import pytest
from src.A_memorix.core.strategies.base import ChunkContext, KnowledgeType, ProcessedChunk, SourceInfo
from src.A_memorix.core.utils.web_import_manager import (
ImportChunkRecord,
ImportFileRecord,
ImportTaskManager,
ImportTaskRecord,
)
class _DummyMetadataStore:
def __init__(self) -> None:
self.paragraphs: list[dict[str, object]] = []
self.entities: list[str] = []
self.relations: list[tuple[str, str, str]] = []
def add_paragraph(self, **kwargs):
self.paragraphs.append(dict(kwargs))
return f"paragraph-{len(self.paragraphs)}"
def add_entity(self, *, name: str, source_paragraph: str = "") -> str:
del source_paragraph
self.entities.append(name)
return f"entity-{name}"
def add_relation(self, *, subject: str, predicate: str, obj: str, **kwargs) -> str:
del kwargs
self.relations.append((subject, predicate, obj))
return f"relation-{len(self.relations)}"
def set_relation_vector_state(self, rel_hash: str, state: str) -> None:
del rel_hash, state
class _DummyGraphStore:
def __init__(self) -> None:
self.nodes: list[list[str]] = []
self.edges: list[list[tuple[str, str]]] = []
def add_nodes(self, nodes):
self.nodes.append(list(nodes))
def add_edges(self, edges, relation_hashes=None):
del relation_hashes
self.edges.append(list(edges))
class _DummyVectorStore:
def __contains__(self, item: str) -> bool:
del item
return False
def add(self, vectors, ids):
del vectors, ids
class _DummyEmbeddingManager:
async def encode(self, text: str) -> np.ndarray:
del text
return np.ones(4, dtype=np.float32)
def _build_manager() -> tuple[ImportTaskManager, _DummyMetadataStore]:
    """Assemble an ImportTaskManager wired to in-memory stub stores."""
    metadata_store = _DummyMetadataStore()
    fake_plugin = SimpleNamespace(
        metadata_store=metadata_store,
        graph_store=_DummyGraphStore(),
        vector_store=_DummyVectorStore(),
        embedding_manager=_DummyEmbeddingManager(),
        relation_write_service=None,
        # Config lookups fall through to the caller-provided default.
        get_config=lambda key, default=None: default,
        _is_embedding_degraded=lambda: False,
        _allow_metadata_only_write=lambda: True,
        write_paragraph_vector_or_enqueue=None,
    )
    return ImportTaskManager(fake_plugin), metadata_store
def _build_progress_task(task_id: str, total_chunks: int = 2) -> ImportTaskRecord:
    """Create a paste task with one file holding `total_chunks` pending chunks."""
    chunks = [
        ImportChunkRecord(chunk_id=f"chunk-{index}", index=index, chunk_type="text")
        for index in range(total_chunks)
    ]
    file_record = ImportFileRecord(
        file_id="file-1",
        name="demo.txt",
        source_kind="paste",
        input_mode="text",
        total_chunks=total_chunks,
        chunks=chunks,
    )
    return ImportTaskRecord(task_id=task_id, source="paste", params={}, files=[file_record])
def _build_chunk(data) -> ProcessedChunk:
    """Wrap raw extraction `data` in a minimal factual ProcessedChunk."""
    source = SourceInfo(file="demo.txt", offset_start=0, offset_end=4)
    context = ChunkContext(chunk_id="chunk-1", index=0, text="Alice 持有地图")
    return ProcessedChunk(type=KnowledgeType.FACTUAL, source=source, chunk=context, data=data)
@pytest.mark.asyncio
async def test_persist_processed_chunk_rejects_non_object_before_paragraph_write() -> None:
    """Non-dict extraction payloads must fail before anything is persisted."""
    manager, metadata_store = _build_manager()
    file_record = SimpleNamespace(source_path="", source_kind="paste", name="demo.txt")
    bad_chunk = _build_chunk(["bad"])
    with pytest.raises(ValueError, match="分块抽取结果 必须返回 JSON 对象"):
        await manager._persist_processed_chunk(file_record, bad_chunk)
    # Validation happens first, so the paragraph table stays untouched.
    assert metadata_store.paragraphs == []
@pytest.mark.asyncio
async def test_chunk_terminal_progress_uses_successful_chunks_only() -> None:
    # Progress must count only successfully completed chunks (done / total),
    # regardless of whether the failure lands before or after the success.
    manager, _ = _build_manager()
    task = _build_progress_task("task-fail-then-complete")
    manager._tasks[task.task_id] = task
    await manager._set_chunk_failed(task.task_id, "file-1", "chunk-0", "boom")
    await manager._set_chunk_completed(task.task_id, "file-1", "chunk-1")
    file_record = task.files[0]
    assert file_record.done_chunks == 1
    assert file_record.failed_chunks == 1
    # 1 success out of 2 chunks -> 0.5; the failed chunk adds nothing.
    assert file_record.progress == pytest.approx(0.5)
    assert task.progress == pytest.approx(0.5)
    # Mirror scenario: success first, then failure — same final progress.
    reverse_task = _build_progress_task("task-complete-then-fail")
    manager._tasks[reverse_task.task_id] = reverse_task
    await manager._set_chunk_completed(reverse_task.task_id, "file-1", "chunk-0")
    await manager._set_chunk_failed(reverse_task.task_id, "file-1", "chunk-1", "boom")
    reverse_file = reverse_task.files[0]
    assert reverse_file.done_chunks == 1
    assert reverse_file.failed_chunks == 1
    assert reverse_file.progress == pytest.approx(0.5)
    assert reverse_task.progress == pytest.approx(0.5)
@pytest.mark.asyncio
async def test_cancelled_chunks_do_not_increase_file_progress() -> None:
    # Cancelled chunks are counted separately and excluded from progress.
    manager, _ = _build_manager()
    task = _build_progress_task("task-cancelled-progress", total_chunks=3)
    manager._tasks[task.task_id] = task
    await manager._set_chunk_completed(task.task_id, "file-1", "chunk-0")
    await manager._set_chunk_cancelled(task.task_id, "file-1", "chunk-1", "任务已取消")
    file_record = task.files[0]
    assert file_record.done_chunks == 1
    assert file_record.cancelled_chunks == 1
    # Only the 1 completed chunk of 3 counts toward progress.
    assert file_record.progress == pytest.approx(1 / 3)
    assert task.progress == pytest.approx(1 / 3)
@pytest.mark.asyncio
async def test_persist_processed_chunk_skips_invalid_nested_items() -> None:
    # Malformed nested triples/relations/entities are dropped item-by-item
    # while the valid ones are still persisted.
    manager, metadata_store = _build_manager()
    file_record = SimpleNamespace(source_path="", source_kind="paste", name="demo.txt")
    await manager._persist_processed_chunk(
        file_record,
        _build_chunk(
            {
                "triples": [{"subject": "Alice", "predicate": "持有", "object": "地图"}, ["bad"]],
                "relations": [{"subject": "Alice", "predicate": "", "object": "地图"}],
                "entities": ["Alice", {"name": "地图"}, ["bad"]],
            }
        ),
    )
    assert len(metadata_store.paragraphs) == 1
    assert set(metadata_store.entities) >= {"Alice", "地图"}
    # The empty-predicate relation and the list-shaped items are rejected.
    assert metadata_store.relations == [("Alice", "持有", "地图")]

View File

@@ -241,6 +241,145 @@ def test_webui_memory_profile_query_prefers_explicit_person_id(client: TestClien
assert response.json()["person_id"] == "explicit-person-id" assert response.json()["person_id"] == "explicit-person-id"
def test_webui_memory_profile_list_enriches_person_name(client: TestClient, monkeypatch):
    # The profile list endpoint must attach a resolved person_name to each
    # item, defaulting to "" when the person id is unknown.
    async def fake_profile_admin(*, action: str, **kwargs):
        assert action == "list"
        # The caller's limit query parameter is forwarded unchanged.
        assert kwargs["limit"] == 7
        return {
            "success": True,
            "items": [
                {"person_id": "person-1", "profile_text": "profile-1"},
                {"person_id": "person-2", "profile_text": "profile-2"},
            ],
        }

    monkeypatch.setattr(memory_router_module.memory_service, "profile_admin", fake_profile_admin)
    # Only person-1 has a known display name.
    monkeypatch.setattr(
        memory_router_module,
        "_get_person_name_for_person_id",
        lambda person_id: {"person-1": "Alice"}.get(person_id, ""),
    )
    response = client.get("/api/webui/memory/profiles", params={"limit": 7})
    assert response.status_code == 200
    assert response.json()["items"][0]["person_name"] == "Alice"
    assert response.json()["items"][1]["person_name"] == ""
def test_webui_memory_profile_search_resolves_platform_user_id(client: TestClient, monkeypatch):
    # platform+user_id must be resolved to a person_id, and the results
    # filtered down to that resolved person only.
    def fake_resolve_person_id_for_memory(**kwargs):
        assert kwargs == {"platform": "qq", "user_id": "12345", "strict_known": False}
        return "resolved-person-id"

    async def fake_profile_list(limit: int):
        # The route requests up to 200 profiles regardless of the caller's
        # smaller limit (50 below), then filters the page itself.
        assert limit == 200
        return {
            "success": True,
            "items": [
                {"person_id": "resolved-person-id", "person_name": "Alice", "profile_text": "喜欢咖啡"},
                {"person_id": "other-person-id", "person_name": "Bob", "profile_text": "喜欢茶"},
            ],
        }

    monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory)
    monkeypatch.setattr(memory_router_module, "_profile_list", fake_profile_list)
    response = client.get(
        "/api/webui/memory/profiles/search",
        params={"platform": "qq", "user_id": "12345", "limit": 50},
    )
    assert response.status_code == 200
    # Only the resolved person's profile survives the filter.
    assert response.json()["items"] == [
        {"person_id": "resolved-person-id", "person_name": "Alice", "profile_text": "喜欢咖啡"}
    ]
def test_webui_memory_profile_search_filters_keyword(client: TestClient, monkeypatch):
    """Keyword search keeps only profiles whose text matches the keyword."""

    async def fake_profile_list(limit: int):
        assert limit == 200
        items = [
            {"person_id": "person-1", "person_name": "Alice", "profile_text": "喜欢咖啡"},
            {"person_id": "person-2", "person_name": "Bob", "profile_text": "喜欢茶"},
        ]
        return {"success": True, "items": items}

    monkeypatch.setattr(memory_router_module, "_profile_list", fake_profile_list)
    query = {"person_keyword": "咖啡", "limit": 50}
    response = client.get("/api/webui/memory/profiles/search", params=query)
    assert response.status_code == 200
    expected = [{"person_id": "person-1", "person_name": "Alice", "profile_text": "喜欢咖啡"}]
    assert response.json()["items"] == expected
def test_webui_memory_episode_list_resolves_platform_user_id(client: TestClient, monkeypatch):
    # The episode list route resolves platform/user_id to a person_id, forwards
    # every filter to episode_admin, and enriches items with person_name.
    def fake_resolve_person_id_for_memory(**kwargs):
        assert kwargs == {"platform": "qq", "user_id": "12345", "strict_known": False}
        return "resolved-person-id"

    async def fake_episode_admin(*, action: str, **kwargs):
        assert action == "list"
        # All query parameters must arrive verbatim; times coerced to floats.
        assert kwargs == {
            "query": "咖啡",
            "limit": 9,
            "source": "chat_summary:demo",
            "person_id": "resolved-person-id",
            "time_start": 100.0,
            "time_end": 200.0,
        }
        return {
            "success": True,
            "items": [{"episode_id": "ep-1", "person_id": "resolved-person-id", "summary": "喝咖啡"}],
            "count": 1,
        }

    monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory)
    monkeypatch.setattr(memory_router_module.memory_service, "episode_admin", fake_episode_admin)
    monkeypatch.setattr(memory_router_module, "_get_person_name_for_person_id", lambda person_id: "测试人物")
    response = client.get(
        "/api/webui/memory/episodes",
        params={
            "query": "咖啡",
            "limit": 9,
            "source": "chat_summary:demo",
            "platform": "qq",
            "user_id": "12345",
            "time_start": 100,
            "time_end": 200,
        },
    )
    assert response.status_code == 200
    assert response.json()["items"][0]["person_name"] == "测试人物"
def test_webui_memory_episode_list_prefers_explicit_person_id(client: TestClient, monkeypatch):
    """An explicit person_id wins: platform/user_id must never be resolved."""

    def fake_resolve_person_id_for_memory(**kwargs):
        raise AssertionError(f"不应解析平台账号: {kwargs}")

    async def fake_episode_admin(*, action: str, **kwargs):
        assert action == "list"
        assert kwargs["person_id"] == "explicit-person-id"
        return {"success": True, "items": []}

    monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory)
    monkeypatch.setattr(memory_router_module.memory_service, "episode_admin", fake_episode_admin)
    query = {"person_id": "explicit-person-id", "platform": "qq", "user_id": "12345"}
    response = client.get("/api/webui/memory/episodes", params=query)
    assert response.status_code == 200
    assert response.json()["items"] == []
def test_compat_aggregate_route(client: TestClient, monkeypatch): def test_compat_aggregate_route(client: TestClient, monkeypatch):
async def fake_search(query: str, **kwargs): async def fake_search(query: str, **kwargs):
assert kwargs["mode"] == "aggregate" assert kwargs["mode"] == "aggregate"

View File

@@ -2623,6 +2623,7 @@ class MetadataStore:
SELECT source, COUNT(*) as count, MAX(created_at) as last_updated SELECT source, COUNT(*) as count, MAX(created_at) as last_updated
FROM paragraphs FROM paragraphs
WHERE source IS NOT NULL AND source != '' WHERE source IS NOT NULL AND source != ''
AND (is_deleted IS NULL OR is_deleted = 0)
GROUP BY source GROUP BY source
ORDER BY last_updated DESC ORDER BY last_updated DESC
""") """)

View File

@@ -56,6 +56,63 @@ SUMMARY_PROMPT_TEMPLATE = """
注意:总结应具有叙事性,能够作为长程记忆的一部分。直接使用实体的实际名称,不要使用 e1/e2 等代号。 注意:总结应具有叙事性,能够作为长程记忆的一部分。直接使用实体的实际名称,不要使用 e1/e2 等代号。
""" """
def _normalize_entity_items(raw_entities: Any) -> List[str]:
if not isinstance(raw_entities, list):
return []
entities: List[str] = []
seen = set()
for item in raw_entities:
if isinstance(item, str):
name = item.strip()
elif isinstance(item, dict):
name = str(item.get("name") or item.get("label") or item.get("entity") or "").strip()
else:
name = ""
if not name:
continue
key = name.lower()
if key in seen:
continue
seen.add(key)
entities.append(name)
return entities
def _normalize_relation_items(raw_relations: Any) -> List[Dict[str, str]]:
if not isinstance(raw_relations, list):
return []
relations: List[Dict[str, str]] = []
for item in raw_relations:
if not isinstance(item, dict):
continue
subject = str(item.get("subject", "") or "").strip()
predicate = str(item.get("predicate", "") or "").strip()
obj = str(item.get("object", "") or "").strip()
if not (subject and predicate and obj):
continue
relations.append({"subject": subject, "predicate": predicate, "object": obj})
return relations
def _message_timestamp(message: Any) -> Optional[float]:
for attr_name in ("timestamp", "time"):
value = getattr(message, attr_name, None)
if value is None:
continue
timestamp_func = getattr(value, "timestamp", None)
if callable(timestamp_func):
try:
return float(timestamp_func())
except Exception:
continue
try:
return float(value)
except Exception:
continue
return None
class SummaryImporter: class SummaryImporter:
"""总结并导入知识的工具类""" """总结并导入知识的工具类"""
@@ -312,14 +369,12 @@ class SummaryImporter:
if not data or "summary" not in data: if not data or "summary" not in data:
return False, "解析 LLM 响应失败或总结为空" return False, "解析 LLM 响应失败或总结为空"
summary_text = data["summary"] summary_text = str(data["summary"] or "").strip()
entities = data.get("entities", []) if not summary_text:
relations = data.get("relations", []) return False, "解析 LLM 响应失败或总结为空"
msg_times = [ entities = _normalize_entity_items(data.get("entities"))
float(getattr(getattr(msg, "timestamp", None), "timestamp", lambda: 0.0)()) relations = _normalize_relation_items(data.get("relations"))
for msg in messages msg_times = [timestamp for msg in messages if (timestamp := _message_timestamp(msg)) is not None]
if getattr(msg, "time", None) is not None
]
time_meta = {} time_meta = {}
if msg_times: if msg_times:
time_meta = { time_meta = {
@@ -455,8 +510,8 @@ class SummaryImporter:
if not isinstance(rv_cfg, dict): if not isinstance(rv_cfg, dict):
rv_cfg = {} rv_cfg = {}
write_vector = bool(rv_cfg.get("enabled", False)) and bool(rv_cfg.get("write_on_import", True)) write_vector = bool(rv_cfg.get("enabled", False)) and bool(rv_cfg.get("write_on_import", True))
for rel in relations: for rel in _normalize_relation_items(relations):
s, p, o = rel.get("subject"), rel.get("predicate"), rel.get("object") s, p, o = rel["subject"], rel["predicate"], rel["object"]
if all([s, p, o]): if all([s, p, o]):
if self.relation_write_service is not None: if self.relation_write_service is not None:
await self.relation_write_service.upsert_relation_with_vector( await self.relation_write_service.upsert_relation_with_vector(

View File

@@ -6,6 +6,11 @@ Web Import Task Manager
from __future__ import annotations from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import asyncio import asyncio
import hashlib import hashlib
import json import json
@@ -15,24 +20,24 @@ import sys
import time import time
import traceback import traceback
import uuid import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from src.common.logger import get_logger from src.common.logger import get_logger
from src.services import llm_service as llm_api from src.services import llm_service as llm_api
from ...paths import default_data_dir, repo_root, resolve_repo_path, scripts_root from ...paths import default_data_dir, repo_root, resolve_repo_path, scripts_root
from ..storage import ( from ..storage import (
KnowledgeType,
MetadataStore,
parse_import_strategy, parse_import_strategy,
resolve_stored_knowledge_type, resolve_stored_knowledge_type,
select_import_strategy, select_import_strategy,
KnowledgeType,
MetadataStore,
) )
from ..storage.knowledge_types import ImportStrategy
from ..storage.type_detection import looks_like_quote_text from ..storage.type_detection import looks_like_quote_text
from ..strategies.base import KnowledgeType as StrategyKnowledgeType, ProcessedChunk
from ..strategies.factual import FactualStrategy
from ..strategies.narrative import NarrativeStrategy
from ..strategies.quote import QuoteStrategy
from ..utils.import_payloads import ( from ..utils.import_payloads import (
ImportPayloadValidationError, ImportPayloadValidationError,
is_probable_hash_token, is_probable_hash_token,
@@ -42,11 +47,6 @@ from ..utils.import_payloads import (
) )
from ..utils.runtime_self_check import ensure_runtime_self_check from ..utils.runtime_self_check import ensure_runtime_self_check
from ..utils.time_parser import normalize_time_meta from ..utils.time_parser import normalize_time_meta
from ..storage.knowledge_types import ImportStrategy
from ..strategies.base import ProcessedChunk, KnowledgeType as StrategyKnowledgeType
from ..strategies.narrative import NarrativeStrategy
from ..strategies.factual import FactualStrategy
from ..strategies.quote import QuoteStrategy
logger = get_logger("A_Memorix.WebImportManager") logger = get_logger("A_Memorix.WebImportManager")
@@ -141,6 +141,44 @@ def _coerce_list(value: Any) -> List[str]:
return out return out
def _coerce_import_data_dict(value: Any, *, context: str) -> Dict[str, Any]:
"""确保 LLM 抽取结果是对象,避免写入阶段出现部分提交。"""
if value is None:
return {}
if isinstance(value, dict):
return value
raise ValueError(f"{context} 必须返回 JSON 对象,当前类型: {type(value).__name__}")
def _normalize_import_relation_list(value: Any) -> List[Dict[str, str]]:
    """Normalize a raw relation list, silently dropping invalid entries."""
    if not isinstance(value, list):
        return []
    candidates = (normalize_relation_import_item(item) for item in value)
    return [relation for relation in candidates if relation is not None]
def _normalize_import_entity_list(value: Any) -> List[str]:
    """Normalize raw entity names, deduplicating case-insensitively in order."""
    if not isinstance(value, list):
        return []
    seen_keys: set = set()
    entities: List[str] = []
    for item in value:
        name = normalize_entity_import_item(item)
        if not name:
            continue
        dedupe_key = name.lower()
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        entities.append(name)
    return entities
def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]: def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]:
if value is None: if value is None:
return None return None
@@ -2002,7 +2040,7 @@ class ImportTaskManager:
if total <= 0: if total <= 0:
total = max(1, scanned) total = max(1, scanned)
progress = max(0.0, min(1.0, float(scanned) / float(total))) if total > 0 else 0.0 chunk_progress = max(0.0, min(1.0, float(scanned) / float(total))) if total > 0 else 0.0
preview = f"scanned={scanned}/{total}, migrated={migrated}, bad={bad}, last_id={last_id}" preview = f"scanned={scanned}/{total}, migrated={migrated}, bad={bad}, last_id={last_id}"
async with self._lock: async with self._lock:
@@ -2017,14 +2055,14 @@ class ImportTaskManager:
if c.status not in {"completed", "failed", "cancelled"}: if c.status not in {"completed", "failed", "cancelled"}:
c.status = "writing" c.status = "writing"
c.step = "migrating" c.step = "migrating"
c.progress = progress c.progress = chunk_progress
c.content_preview = preview c.content_preview = preview
c.updated_at = _now() c.updated_at = _now()
f.total_chunks = total f.total_chunks = total
f.done_chunks = done f.done_chunks = done
f.failed_chunks = bad f.failed_chunks = bad
f.cancelled_chunks = 0 f.cancelled_chunks = 0
f.progress = progress self._recompute_file_progress(f)
if f.status not in {"failed", "cancelled"}: if f.status not in {"failed", "cancelled"}:
f.status = "writing" f.status = "writing"
f.current_step = "migrating" f.current_step = "migrating"
@@ -2171,7 +2209,7 @@ class ImportTaskManager:
f.done_chunks = max(0, min(f.done_chunks, f.total_chunks)) f.done_chunks = max(0, min(f.done_chunks, f.total_chunks))
f.failed_chunks = max(0, min(f.failed_chunks, f.total_chunks)) f.failed_chunks = max(0, min(f.failed_chunks, f.total_chunks))
f.cancelled_chunks = 0 f.cancelled_chunks = 0
f.progress = 1.0 self._recompute_file_progress(f)
f.status = "completed" f.status = "completed"
f.current_step = "completed" f.current_step = "completed"
if bad_rows > 0 and not f.error: if bad_rows > 0 and not f.error:
@@ -3128,6 +3166,7 @@ class ImportTaskManager:
if is_probable_hash_token(content): if is_probable_hash_token(content):
logger.warning(f"跳过疑似哈希段落写入: source={self._source_label(file_record)} preview={content[:32]}") logger.warning(f"跳过疑似哈希段落写入: source={self._source_label(file_record)} preview={content[:32]}")
return return
data = _coerce_import_data_dict(processed.data, context="分块抽取结果")
para_hash = self.plugin.metadata_store.add_paragraph( para_hash = self.plugin.metadata_store.add_paragraph(
content=content, content=content,
source=self._source_label(file_record), source=self._source_label(file_record),
@@ -3145,31 +3184,25 @@ class ImportTaskManager:
f"web_import text paragraph 向量写入降级: hash={para_hash[:8]} detail={vector_result.get('detail')}" f"web_import text paragraph 向量写入降级: hash={para_hash[:8]} detail={vector_result.get('detail')}"
) )
data = processed.data or {}
entities: List[str] = [] entities: List[str] = []
relations: List[Tuple[str, str, str]] = [] relations: List[Tuple[str, str, str]] = []
for triple in data.get("triples", []): for triple in _normalize_import_relation_list(data.get("triples")):
s = str(triple.get("subject", "")).strip() s = triple["subject"]
p = str(triple.get("predicate", "")).strip() p = triple["predicate"]
o = str(triple.get("object", "")).strip() o = triple["object"]
if s and p and o: relations.append((s, p, o))
relations.append((s, p, o)) entities.extend([s, o])
entities.extend([s, o])
for rel in data.get("relations", []): for rel in _normalize_import_relation_list(data.get("relations")):
s = str(rel.get("subject", "")).strip() s = rel["subject"]
p = str(rel.get("predicate", "")).strip() p = rel["predicate"]
o = str(rel.get("object", "")).strip() o = rel["object"]
if s and p and o: relations.append((s, p, o))
relations.append((s, p, o)) entities.extend([s, o])
entities.extend([s, o])
for k in ("entities", "events", "verbatim_entities"): for k in ("entities", "events", "verbatim_entities"):
for e in data.get(k, []): entities.extend(_normalize_import_entity_list(data.get(k)))
name = str(e or "").strip()
if name and not is_probable_hash_token(name):
entities.append(name)
uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values()) uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values())
for name in uniq_entities: for name in uniq_entities:
@@ -3294,12 +3327,12 @@ class ImportTaskManager:
txt = txt[4:].strip() txt = txt[4:].strip()
try: try:
return json.loads(txt) return _coerce_import_data_dict(json.loads(txt), context="LLM 抽取结果")
except Exception: except Exception:
s = txt.find("{") s = txt.find("{")
e = txt.rfind("}") e = txt.rfind("}")
if s >= 0 and e > s: if s >= 0 and e > s:
return json.loads(txt[s : e + 1]) return _coerce_import_data_dict(json.loads(txt[s : e + 1]), context="LLM 抽取结果")
raise raise
except Exception as err: except Exception as err:
last_error = err last_error = err
@@ -3370,6 +3403,7 @@ JSON schema:
logger.warning(f"chat_log 时间语义抽取失败: {e}") logger.warning(f"chat_log 时间语义抽取失败: {e}")
return None return None
result = _coerce_import_data_dict(result, context="chat_log 时间抽取结果")
raw_time_meta = { raw_time_meta = {
"event_time": result.get("event_time"), "event_time": result.get("event_time"),
"event_time_start": result.get("event_time_start"), "event_time_start": result.get("event_time_start"),
@@ -3541,9 +3575,7 @@ JSON schema:
additional_cancelled += 1 additional_cancelled += 1
if additional_cancelled > 0: if additional_cancelled > 0:
f.cancelled_chunks += additional_cancelled f.cancelled_chunks += additional_cancelled
f.progress = self._compute_ratio( self._recompute_file_progress(f)
f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks
)
f.updated_at = _now() f.updated_at = _now()
task.updated_at = _now() task.updated_at = _now()
self._recompute_task_progress(task) self._recompute_task_progress(task)
@@ -3601,7 +3633,7 @@ JSON schema:
c.progress = 1.0 c.progress = 1.0
c.updated_at = _now() c.updated_at = _now()
f.done_chunks += 1 f.done_chunks += 1
f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) self._recompute_file_progress(f)
f.updated_at = _now() f.updated_at = _now()
self._recompute_task_progress(task) self._recompute_task_progress(task)
@@ -3629,7 +3661,7 @@ JSON schema:
c.progress = 1.0 c.progress = 1.0
c.updated_at = _now() c.updated_at = _now()
f.failed_chunks += 1 f.failed_chunks += 1
f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) self._recompute_file_progress(f)
if not f.error: if not f.error:
f.error = str(error) f.error = str(error)
f.updated_at = _now() f.updated_at = _now()
@@ -3653,7 +3685,7 @@ JSON schema:
c.progress = 1.0 c.progress = 1.0
c.updated_at = _now() c.updated_at = _now()
f.cancelled_chunks += 1 f.cancelled_chunks += 1
f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) self._recompute_file_progress(f)
f.updated_at = _now() f.updated_at = _now()
self._recompute_task_progress(task) self._recompute_task_progress(task)
@@ -3681,6 +3713,9 @@ JSON schema:
return 1.0 return 1.0
return max(0.0, min(1.0, float(done) / float(total))) return max(0.0, min(1.0, float(done) / float(total)))
    def _recompute_file_progress(self, file_record: ImportFileRecord) -> None:
        # File progress counts only successfully completed chunks; failed and
        # cancelled chunks are tracked in their own counters and deliberately
        # do not advance progress (same done/total ratio as task-level
        # progress in _recompute_task_progress).
        file_record.progress = self._compute_ratio(file_record.done_chunks, file_record.total_chunks)
def _recompute_task_progress(self, task: ImportTaskRecord) -> None: def _recompute_task_progress(self, task: ImportTaskRecord) -> None:
total = 0 total = 0
done = 0 done = 0
@@ -3695,7 +3730,7 @@ JSON schema:
task.done_chunks = done task.done_chunks = done
task.failed_chunks = failed task.failed_chunks = failed
task.cancelled_chunks = cancelled task.cancelled_chunks = cancelled
task.progress = self._compute_ratio(done + failed + cancelled, total) task.progress = self._compute_ratio(done, total)
task.updated_at = _now() task.updated_at = _now()
async def _should_cleanup_task_temp(self, task_id: str) -> bool: async def _should_cleanup_task_temp(self, task_id: str) -> bool:
@@ -3728,9 +3763,7 @@ JSON schema:
additional_cancelled += 1 additional_cancelled += 1
if additional_cancelled > 0: if additional_cancelled > 0:
f.cancelled_chunks += additional_cancelled f.cancelled_chunks += additional_cancelled
f.progress = self._compute_ratio( self._recompute_file_progress(f)
f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks
)
f.updated_at = _now() f.updated_at = _now()
task.status = "cancelled" task.status = "cancelled"
task.current_step = "cancelled" task.current_step = "cancelled"