From 9acf03d24bbdcac2907be0264bdcef593fd706c9 Mon Sep 17 00:00:00 2001
From: DawnARC
Date: Wed, 6 May 2026 22:41:35 +0800
Subject: [PATCH 1/2] fix(A_memorix): standardize imported data and fix
 progress display
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add robust normalization and validation for import/LLM output, fix the
progress calculation logic, and improve the UI summary display.
---
 .../__tests__/knowledge-base.test.tsx         |  47 ++++++
 .../knowledge-base/tabs/ImportTab.tsx         |  33 ++++-
 .../test_metadata_store_sources.py            |  21 +++
 .../test_summary_importer_model_config.py     |  26 +++-
 .../test_web_import_manager_payloads.py       | 117 +++++++++++++++
 pytests/webui/test_memory_routes.py           | 139 ++++++++++++++++++
 src/A_memorix/core/storage/metadata_store.py  |   1 +
 src/A_memorix/core/utils/summary_importer.py  |  75 ++++++++--
 .../core/utils/web_import_manager.py          | 104 ++++++++-----
 9 files changed, 514 insertions(+), 49 deletions(-)
 create mode 100644 pytests/A_memorix_test/test_metadata_store_sources.py
 create mode 100644 pytests/A_memorix_test/test_web_import_manager_payloads.py

diff --git a/dashboard/src/routes/resource/__tests__/knowledge-base.test.tsx b/dashboard/src/routes/resource/__tests__/knowledge-base.test.tsx
index a26fc395..1276f4ed 100644
--- a/dashboard/src/routes/resource/__tests__/knowledge-base.test.tsx
+++ b/dashboard/src/routes/resource/__tests__/knowledge-base.test.tsx
@@ -154,6 +154,38 @@ function mockImportDetail(taskId: string): memoryApi.MemoryImportTaskPayload {
   }
 }
 
+function mockImportCompletedWithErrorsDetail(taskId: string): memoryApi.MemoryImportTaskPayload {
+  return {
+    ...mockImportDetail(taskId),
+    status: 'completed_with_errors',
+    current_step: 'completed_with_errors',
+    total_chunks: 12,
+    done_chunks: 9,
+    failed_chunks: 3,
+    cancelled_chunks: 0,
+    progress: 75,
+    files: [
+      {
+        file_id: 'file-error',
+        name: 'error.txt',
+        source_kind: 'paste',
+        input_mode: 'text',
+        status: 'failed',
+        current_step: 'failed',
+        detected_strategy_type: 'auto',
+        total_chunks: 12,
+        done_chunks: 9,
+        failed_chunks: 3,
+        cancelled_chunks: 0,
+        progress: 75,
+        error: 'mock error',
+        created_at: 1_710_000_000,
+        updated_at: 1_710_000_100,
+      },
+    ],
+  }
+}
+
 describe('KnowledgeBasePage import workflow', () => {
   beforeEach(() => {
     navigateMock.mockReset()
@@ -606,6 +638,21 @@ describe('KnowledgeBasePage import workflow', () => {
     )
   }, 20_000)
 
+  it('shows import failures separately from successful chunks', async () => {
+    vi.mocked(memoryApi.getMemoryImportTask).mockResolvedValue({
+      success: true,
+      task: mockImportCompletedWithErrorsDetail('import-run-1'),
+    })
+    const user = userEvent.setup()
+    render()
+
+    await screen.findByText('长期记忆控制台', undefined, { timeout: 10_000 })
+    await user.click(screen.getByRole('tab', { name: '导入' }))
+
+    expect((await screen.findAllByText('完成(有错误)')).length).toBeGreaterThan(0)
+    expect(await screen.findByText('成功 9 / 12 分块 · 失败 3')).toBeInTheDocument()
+  }, 20_000)
+
   it('supports cancel and retry actions for selected task', async () => {
     const user = userEvent.setup()
     render()
diff --git a/dashboard/src/routes/resource/knowledge-base/tabs/ImportTab.tsx b/dashboard/src/routes/resource/knowledge-base/tabs/ImportTab.tsx
index 80d1674e..bf4c95c1 100644
--- a/dashboard/src/routes/resource/knowledge-base/tabs/ImportTab.tsx
+++ b/dashboard/src/routes/resource/knowledge-base/tabs/ImportTab.tsx
@@ -39,6 +39,21 @@ import {
   normalizeProgress,
 } from '../utils'
 
+function formatChunkSummary(done: 
unknown, total: unknown, failed: unknown, cancelled: unknown = 0): string { + const doneCount = Number(done ?? 0) + const totalCount = Number(total ?? 0) + const failedCount = Number(failed ?? 0) + const cancelledCount = Number(cancelled ?? 0) + const parts = [`成功 ${doneCount} / ${totalCount} 分块`] + if (failedCount > 0) { + parts.push(`失败 ${failedCount}`) + } + if (cancelledCount > 0) { + parts.push(`取消 ${cancelledCount}`) + } + return parts.join(' · ') +} + export interface ImportTabProps { importCreateMode: MemoryImportTaskKind setImportCreateMode: Dispatch> @@ -1073,12 +1088,19 @@ export function ImportTab(props: ImportTabProps) { ? 'success' : String(selectedImportTaskResolved.status ?? '') === 'failed' ? 'destructive' - : String(selectedImportTaskResolved.status ?? '') === 'cancelled' + : String(selectedImportTaskResolved.status ?? '') === 'completed_with_errors' + ? 'warning' + : String(selectedImportTaskResolved.status ?? '') === 'cancelled' ? 'muted' : 'default' } busy={RUNNING_IMPORT_STATUS.has(String(selectedImportTaskResolved.status ?? ''))} - detail={`已完成 ${Number(selectedImportTaskResolved.done_chunks ?? 0)} / ${Number(selectedImportTaskResolved.total_chunks ?? 0)} 分块`} + detail={formatChunkSummary( + selectedImportTaskResolved.done_chunks, + selectedImportTaskResolved.total_chunks, + selectedImportTaskResolved.failed_chunks, + selectedImportTaskResolved.cancelled_chunks, + )} /> @@ -1160,7 +1182,12 @@ export function ImportTab(props: ImportTabProps) {
- {formatProgressPercent(file.progress)} · {Number(file.done_chunks ?? 0)} / {Number(file.total_chunks ?? 0)} + {formatProgressPercent(file.progress)} · {formatChunkSummary( + file.done_chunks, + file.total_chunks, + file.failed_chunks, + file.cancelled_chunks, + )}
{file.error ? (
{file.error}
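
Note: a minimal standalone sketch of the summary strings produced by the
formatChunkSummary helper added above (a behavior-equivalent TypeScript
copy for illustration only, not part of the patch; the first expected
output matches the assertion in knowledge-base.test.tsx):

    function formatChunkSummary(done: unknown, total: unknown, failed: unknown, cancelled: unknown = 0): string {
      // Coerce loosely-typed payload fields to numbers, as the patch does.
      const parts = [`成功 ${Number(done ?? 0)} / ${Number(total ?? 0)} 分块`]
      if (Number(failed ?? 0) > 0) parts.push(`失败 ${Number(failed ?? 0)}`)
      if (Number(cancelled ?? 0) > 0) parts.push(`取消 ${Number(cancelled ?? 0)}`)
      return parts.join(' · ')
    }

    formatChunkSummary(9, 12, 3)    // => '成功 9 / 12 分块 · 失败 3'
    formatChunkSummary(12, 12, 0)   // => '成功 12 / 12 分块'
    formatChunkSummary(1, 3, 0, 1)  // => '成功 1 / 3 分块 · 取消 1'
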
diff --git a/pytests/A_memorix_test/test_metadata_store_sources.py b/pytests/A_memorix_test/test_metadata_store_sources.py new file mode 100644 index 00000000..dcedbd3d --- /dev/null +++ b/pytests/A_memorix_test/test_metadata_store_sources.py @@ -0,0 +1,21 @@ +from pathlib import Path + +from src.A_memorix.core.storage.metadata_store import MetadataStore + + +def test_get_all_sources_ignores_soft_deleted_paragraphs(tmp_path: Path) -> None: + store = MetadataStore(data_dir=tmp_path) + store.connect() + try: + live_hash = store.add_paragraph("Alice 喜欢地图", source="live-source") + deleted_hash = store.add_paragraph("Bob 喜欢咖啡", source="deleted-source") + + assert live_hash + store.mark_as_deleted([deleted_hash], "paragraph") + + sources = store.get_all_sources() + finally: + store.close() + + assert [item["source"] for item in sources] == ["live-source"] + assert sources[0]["count"] == 1 diff --git a/pytests/A_memorix_test/test_summary_importer_model_config.py b/pytests/A_memorix_test/test_summary_importer_model_config.py index 845deb08..e972361e 100644 --- a/pytests/A_memorix_test/test_summary_importer_model_config.py +++ b/pytests/A_memorix_test/test_summary_importer_model_config.py @@ -1,6 +1,11 @@ import pytest -from src.A_memorix.core.utils.summary_importer import SummaryImporter +from src.A_memorix.core.utils.summary_importer import ( + SummaryImporter, + _message_timestamp, + _normalize_entity_items, + _normalize_relation_items, +) from src.config.model_configs import TaskConfig from src.services import llm_service as llm_api @@ -46,3 +51,22 @@ def test_resolve_summary_model_config_rejects_legacy_string_selector(monkeypatch with pytest.raises(ValueError, match="List\\[str\\]"): importer._resolve_summary_model_config() + + +def test_summary_importer_normalizes_llm_entities_and_relations(): + assert _normalize_entity_items(["Alice", {"name": "地图"}, ["bad"], "Alice"]) == ["Alice", "地图"] + assert _normalize_entity_items("Alice") == [] + assert _normalize_relation_items( + [ + {"subject": "Alice", "predicate": "持有", "object": "地图"}, + {"subject": "Alice", "predicate": "", "object": "地图"}, + ["bad"], + ] + ) == [{"subject": "Alice", "predicate": "持有", "object": "地图"}] + + +def test_summary_importer_message_timestamp_accepts_time_fallback(): + class Message: + time = 123.5 + + assert _message_timestamp(Message()) == 123.5 diff --git a/pytests/A_memorix_test/test_web_import_manager_payloads.py b/pytests/A_memorix_test/test_web_import_manager_payloads.py new file mode 100644 index 00000000..8fdca65d --- /dev/null +++ b/pytests/A_memorix_test/test_web_import_manager_payloads.py @@ -0,0 +1,117 @@ +from types import SimpleNamespace + +import numpy as np +import pytest + +from src.A_memorix.core.strategies.base import ChunkContext, KnowledgeType, ProcessedChunk, SourceInfo +from src.A_memorix.core.utils.web_import_manager import ImportTaskManager + + +class _DummyMetadataStore: + def __init__(self) -> None: + self.paragraphs: list[dict[str, object]] = [] + self.entities: list[str] = [] + self.relations: list[tuple[str, str, str]] = [] + + def add_paragraph(self, **kwargs): + self.paragraphs.append(dict(kwargs)) + return f"paragraph-{len(self.paragraphs)}" + + def add_entity(self, *, name: str, source_paragraph: str = "") -> str: + del source_paragraph + self.entities.append(name) + return f"entity-{name}" + + def add_relation(self, *, subject: str, predicate: str, obj: str, **kwargs) -> str: + del kwargs + self.relations.append((subject, predicate, obj)) + return f"relation-{len(self.relations)}" + + 
def set_relation_vector_state(self, rel_hash: str, state: str) -> None: + del rel_hash, state + + +class _DummyGraphStore: + def __init__(self) -> None: + self.nodes: list[list[str]] = [] + self.edges: list[list[tuple[str, str]]] = [] + + def add_nodes(self, nodes): + self.nodes.append(list(nodes)) + + def add_edges(self, edges, relation_hashes=None): + del relation_hashes + self.edges.append(list(edges)) + + +class _DummyVectorStore: + def __contains__(self, item: str) -> bool: + del item + return False + + def add(self, vectors, ids): + del vectors, ids + + +class _DummyEmbeddingManager: + async def encode(self, text: str) -> np.ndarray: + del text + return np.ones(4, dtype=np.float32) + + +def _build_manager() -> tuple[ImportTaskManager, _DummyMetadataStore]: + metadata_store = _DummyMetadataStore() + plugin = SimpleNamespace( + metadata_store=metadata_store, + graph_store=_DummyGraphStore(), + vector_store=_DummyVectorStore(), + embedding_manager=_DummyEmbeddingManager(), + relation_write_service=None, + get_config=lambda key, default=None: default, + _is_embedding_degraded=lambda: False, + _allow_metadata_only_write=lambda: True, + write_paragraph_vector_or_enqueue=None, + ) + manager = ImportTaskManager(plugin) + return manager, metadata_store + + +def _build_chunk(data) -> ProcessedChunk: + return ProcessedChunk( + type=KnowledgeType.FACTUAL, + source=SourceInfo(file="demo.txt", offset_start=0, offset_end=4), + chunk=ChunkContext(chunk_id="chunk-1", index=0, text="Alice 持有地图"), + data=data, + ) + + +@pytest.mark.asyncio +async def test_persist_processed_chunk_rejects_non_object_before_paragraph_write() -> None: + manager, metadata_store = _build_manager() + file_record = SimpleNamespace(source_path="", source_kind="paste", name="demo.txt") + + with pytest.raises(ValueError, match="分块抽取结果 必须返回 JSON 对象"): + await manager._persist_processed_chunk(file_record, _build_chunk(["bad"])) + + assert metadata_store.paragraphs == [] + + +@pytest.mark.asyncio +async def test_persist_processed_chunk_skips_invalid_nested_items() -> None: + manager, metadata_store = _build_manager() + file_record = SimpleNamespace(source_path="", source_kind="paste", name="demo.txt") + + await manager._persist_processed_chunk( + file_record, + _build_chunk( + { + "triples": [{"subject": "Alice", "predicate": "持有", "object": "地图"}, ["bad"]], + "relations": [{"subject": "Alice", "predicate": "", "object": "地图"}], + "entities": ["Alice", {"name": "地图"}, ["bad"]], + } + ), + ) + + assert len(metadata_store.paragraphs) == 1 + assert set(metadata_store.entities) >= {"Alice", "地图"} + assert metadata_store.relations == [("Alice", "持有", "地图")] diff --git a/pytests/webui/test_memory_routes.py b/pytests/webui/test_memory_routes.py index d2894786..8681c561 100644 --- a/pytests/webui/test_memory_routes.py +++ b/pytests/webui/test_memory_routes.py @@ -241,6 +241,145 @@ def test_webui_memory_profile_query_prefers_explicit_person_id(client: TestClien assert response.json()["person_id"] == "explicit-person-id" +def test_webui_memory_profile_list_enriches_person_name(client: TestClient, monkeypatch): + async def fake_profile_admin(*, action: str, **kwargs): + assert action == "list" + assert kwargs["limit"] == 7 + return { + "success": True, + "items": [ + {"person_id": "person-1", "profile_text": "profile-1"}, + {"person_id": "person-2", "profile_text": "profile-2"}, + ], + } + + monkeypatch.setattr(memory_router_module.memory_service, "profile_admin", fake_profile_admin) + monkeypatch.setattr( + memory_router_module, + 
"_get_person_name_for_person_id", + lambda person_id: {"person-1": "Alice"}.get(person_id, ""), + ) + + response = client.get("/api/webui/memory/profiles", params={"limit": 7}) + + assert response.status_code == 200 + assert response.json()["items"][0]["person_name"] == "Alice" + assert response.json()["items"][1]["person_name"] == "" + + +def test_webui_memory_profile_search_resolves_platform_user_id(client: TestClient, monkeypatch): + def fake_resolve_person_id_for_memory(**kwargs): + assert kwargs == {"platform": "qq", "user_id": "12345", "strict_known": False} + return "resolved-person-id" + + async def fake_profile_list(limit: int): + assert limit == 200 + return { + "success": True, + "items": [ + {"person_id": "resolved-person-id", "person_name": "Alice", "profile_text": "喜欢咖啡"}, + {"person_id": "other-person-id", "person_name": "Bob", "profile_text": "喜欢茶"}, + ], + } + + monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory) + monkeypatch.setattr(memory_router_module, "_profile_list", fake_profile_list) + + response = client.get( + "/api/webui/memory/profiles/search", + params={"platform": "qq", "user_id": "12345", "limit": 50}, + ) + + assert response.status_code == 200 + assert response.json()["items"] == [ + {"person_id": "resolved-person-id", "person_name": "Alice", "profile_text": "喜欢咖啡"} + ] + + +def test_webui_memory_profile_search_filters_keyword(client: TestClient, monkeypatch): + async def fake_profile_list(limit: int): + assert limit == 200 + return { + "success": True, + "items": [ + {"person_id": "person-1", "person_name": "Alice", "profile_text": "喜欢咖啡"}, + {"person_id": "person-2", "person_name": "Bob", "profile_text": "喜欢茶"}, + ], + } + + monkeypatch.setattr(memory_router_module, "_profile_list", fake_profile_list) + + response = client.get("/api/webui/memory/profiles/search", params={"person_keyword": "咖啡", "limit": 50}) + + assert response.status_code == 200 + assert response.json()["items"] == [ + {"person_id": "person-1", "person_name": "Alice", "profile_text": "喜欢咖啡"} + ] + + +def test_webui_memory_episode_list_resolves_platform_user_id(client: TestClient, monkeypatch): + def fake_resolve_person_id_for_memory(**kwargs): + assert kwargs == {"platform": "qq", "user_id": "12345", "strict_known": False} + return "resolved-person-id" + + async def fake_episode_admin(*, action: str, **kwargs): + assert action == "list" + assert kwargs == { + "query": "咖啡", + "limit": 9, + "source": "chat_summary:demo", + "person_id": "resolved-person-id", + "time_start": 100.0, + "time_end": 200.0, + } + return { + "success": True, + "items": [{"episode_id": "ep-1", "person_id": "resolved-person-id", "summary": "喝咖啡"}], + "count": 1, + } + + monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory) + monkeypatch.setattr(memory_router_module.memory_service, "episode_admin", fake_episode_admin) + monkeypatch.setattr(memory_router_module, "_get_person_name_for_person_id", lambda person_id: "测试人物") + + response = client.get( + "/api/webui/memory/episodes", + params={ + "query": "咖啡", + "limit": 9, + "source": "chat_summary:demo", + "platform": "qq", + "user_id": "12345", + "time_start": 100, + "time_end": 200, + }, + ) + + assert response.status_code == 200 + assert response.json()["items"][0]["person_name"] == "测试人物" + + +def test_webui_memory_episode_list_prefers_explicit_person_id(client: TestClient, monkeypatch): + def fake_resolve_person_id_for_memory(**kwargs): + raise 
AssertionError(f"不应解析平台账号: {kwargs}") + + async def fake_episode_admin(*, action: str, **kwargs): + assert action == "list" + assert kwargs["person_id"] == "explicit-person-id" + return {"success": True, "items": []} + + monkeypatch.setattr(memory_router_module, "resolve_person_id_for_memory", fake_resolve_person_id_for_memory) + monkeypatch.setattr(memory_router_module.memory_service, "episode_admin", fake_episode_admin) + + response = client.get( + "/api/webui/memory/episodes", + params={"person_id": "explicit-person-id", "platform": "qq", "user_id": "12345"}, + ) + + assert response.status_code == 200 + assert response.json()["items"] == [] + + def test_compat_aggregate_route(client: TestClient, monkeypatch): async def fake_search(query: str, **kwargs): assert kwargs["mode"] == "aggregate" diff --git a/src/A_memorix/core/storage/metadata_store.py b/src/A_memorix/core/storage/metadata_store.py index 2667e683..1ff30d46 100644 --- a/src/A_memorix/core/storage/metadata_store.py +++ b/src/A_memorix/core/storage/metadata_store.py @@ -2627,6 +2627,7 @@ class MetadataStore: SELECT source, COUNT(*) as count, MAX(created_at) as last_updated FROM paragraphs WHERE source IS NOT NULL AND source != '' + AND (is_deleted IS NULL OR is_deleted = 0) GROUP BY source ORDER BY last_updated DESC """) diff --git a/src/A_memorix/core/utils/summary_importer.py b/src/A_memorix/core/utils/summary_importer.py index 0728e4dc..d2c18ed5 100644 --- a/src/A_memorix/core/utils/summary_importer.py +++ b/src/A_memorix/core/utils/summary_importer.py @@ -56,6 +56,63 @@ SUMMARY_PROMPT_TEMPLATE = """ 注意:总结应具有叙事性,能够作为长程记忆的一部分。直接使用实体的实际名称,不要使用 e1/e2 等代号。 """ + +def _normalize_entity_items(raw_entities: Any) -> List[str]: + if not isinstance(raw_entities, list): + return [] + entities: List[str] = [] + seen = set() + for item in raw_entities: + if isinstance(item, str): + name = item.strip() + elif isinstance(item, dict): + name = str(item.get("name") or item.get("label") or item.get("entity") or "").strip() + else: + name = "" + if not name: + continue + key = name.lower() + if key in seen: + continue + seen.add(key) + entities.append(name) + return entities + + +def _normalize_relation_items(raw_relations: Any) -> List[Dict[str, str]]: + if not isinstance(raw_relations, list): + return [] + relations: List[Dict[str, str]] = [] + for item in raw_relations: + if not isinstance(item, dict): + continue + subject = str(item.get("subject", "") or "").strip() + predicate = str(item.get("predicate", "") or "").strip() + obj = str(item.get("object", "") or "").strip() + if not (subject and predicate and obj): + continue + relations.append({"subject": subject, "predicate": predicate, "object": obj}) + return relations + + +def _message_timestamp(message: Any) -> Optional[float]: + for attr_name in ("timestamp", "time"): + value = getattr(message, attr_name, None) + if value is None: + continue + timestamp_func = getattr(value, "timestamp", None) + if callable(timestamp_func): + try: + return float(timestamp_func()) + except Exception: + continue + try: + return float(value) + except Exception: + continue + return None + + class SummaryImporter: """总结并导入知识的工具类""" @@ -312,14 +369,12 @@ class SummaryImporter: if not data or "summary" not in data: return False, "解析 LLM 响应失败或总结为空" - summary_text = data["summary"] - entities = data.get("entities", []) - relations = data.get("relations", []) - msg_times = [ - float(getattr(getattr(msg, "timestamp", None), "timestamp", lambda: 0.0)()) - for msg in messages - if getattr(msg, "time", None) is not 
None - ] + summary_text = str(data["summary"] or "").strip() + if not summary_text: + return False, "解析 LLM 响应失败或总结为空" + entities = _normalize_entity_items(data.get("entities")) + relations = _normalize_relation_items(data.get("relations")) + msg_times = [timestamp for msg in messages if (timestamp := _message_timestamp(msg)) is not None] time_meta = {} if msg_times: time_meta = { @@ -455,8 +510,8 @@ class SummaryImporter: if not isinstance(rv_cfg, dict): rv_cfg = {} write_vector = bool(rv_cfg.get("enabled", False)) and bool(rv_cfg.get("write_on_import", True)) - for rel in relations: - s, p, o = rel.get("subject"), rel.get("predicate"), rel.get("object") + for rel in _normalize_relation_items(relations): + s, p, o = rel["subject"], rel["predicate"], rel["object"] if all([s, p, o]): if self.relation_write_service is not None: await self.relation_write_service.upsert_relation_with_vector( diff --git a/src/A_memorix/core/utils/web_import_manager.py b/src/A_memorix/core/utils/web_import_manager.py index a4db5c54..2b41f9f2 100644 --- a/src/A_memorix/core/utils/web_import_manager.py +++ b/src/A_memorix/core/utils/web_import_manager.py @@ -6,6 +6,11 @@ Web Import Task Manager from __future__ import annotations +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple import asyncio import hashlib import json @@ -15,24 +20,24 @@ import sys import time import traceback import uuid -from collections import deque -from dataclasses import dataclass, field -from datetime import datetime -from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple from src.common.logger import get_logger from src.services import llm_service as llm_api from ...paths import default_data_dir, repo_root, resolve_repo_path, scripts_root from ..storage import ( + KnowledgeType, + MetadataStore, parse_import_strategy, resolve_stored_knowledge_type, select_import_strategy, - KnowledgeType, - MetadataStore, ) +from ..storage.knowledge_types import ImportStrategy from ..storage.type_detection import looks_like_quote_text +from ..strategies.base import KnowledgeType as StrategyKnowledgeType, ProcessedChunk +from ..strategies.factual import FactualStrategy +from ..strategies.narrative import NarrativeStrategy +from ..strategies.quote import QuoteStrategy from ..utils.import_payloads import ( ImportPayloadValidationError, is_probable_hash_token, @@ -42,11 +47,6 @@ from ..utils.import_payloads import ( ) from ..utils.runtime_self_check import ensure_runtime_self_check from ..utils.time_parser import normalize_time_meta -from ..storage.knowledge_types import ImportStrategy -from ..strategies.base import ProcessedChunk, KnowledgeType as StrategyKnowledgeType -from ..strategies.narrative import NarrativeStrategy -from ..strategies.factual import FactualStrategy -from ..strategies.quote import QuoteStrategy logger = get_logger("A_Memorix.WebImportManager") @@ -141,6 +141,44 @@ def _coerce_list(value: Any) -> List[str]: return out +def _coerce_import_data_dict(value: Any, *, context: str) -> Dict[str, Any]: + """确保 LLM 抽取结果是对象,避免写入阶段出现部分提交。""" + + if value is None: + return {} + if isinstance(value, dict): + return value + raise ValueError(f"{context} 必须返回 JSON 对象,当前类型: {type(value).__name__}") + + +def _normalize_import_relation_list(value: Any) -> List[Dict[str, str]]: + if not isinstance(value, list): + return [] + relations: List[Dict[str, str]] = [] + for item in value: + 
relation = normalize_relation_import_item(item) + if relation is not None: + relations.append(relation) + return relations + + +def _normalize_import_entity_list(value: Any) -> List[str]: + if not isinstance(value, list): + return [] + entities: List[str] = [] + seen = set() + for item in value: + name = normalize_entity_import_item(item) + if not name: + continue + key = name.lower() + if key in seen: + continue + seen.add(key) + entities.append(name) + return entities + + def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]: if value is None: return None @@ -3128,6 +3166,7 @@ class ImportTaskManager: if is_probable_hash_token(content): logger.warning("跳过疑似哈希段落写入: source=%s preview=%s", self._source_label(file_record), content[:32]) return + data = _coerce_import_data_dict(processed.data, context="分块抽取结果") para_hash = self.plugin.metadata_store.add_paragraph( content=content, source=self._source_label(file_record), @@ -3145,31 +3184,25 @@ class ImportTaskManager: f"web_import text paragraph 向量写入降级: hash={para_hash[:8]} detail={vector_result.get('detail')}" ) - data = processed.data or {} entities: List[str] = [] relations: List[Tuple[str, str, str]] = [] - for triple in data.get("triples", []): - s = str(triple.get("subject", "")).strip() - p = str(triple.get("predicate", "")).strip() - o = str(triple.get("object", "")).strip() - if s and p and o: - relations.append((s, p, o)) - entities.extend([s, o]) + for triple in _normalize_import_relation_list(data.get("triples")): + s = triple["subject"] + p = triple["predicate"] + o = triple["object"] + relations.append((s, p, o)) + entities.extend([s, o]) - for rel in data.get("relations", []): - s = str(rel.get("subject", "")).strip() - p = str(rel.get("predicate", "")).strip() - o = str(rel.get("object", "")).strip() - if s and p and o: - relations.append((s, p, o)) - entities.extend([s, o]) + for rel in _normalize_import_relation_list(data.get("relations")): + s = rel["subject"] + p = rel["predicate"] + o = rel["object"] + relations.append((s, p, o)) + entities.extend([s, o]) for k in ("entities", "events", "verbatim_entities"): - for e in data.get(k, []): - name = str(e or "").strip() - if name and not is_probable_hash_token(name): - entities.append(name) + entities.extend(_normalize_import_entity_list(data.get(k))) uniq_entities = list({x.strip().lower(): x.strip() for x in entities if str(x).strip()}.values()) for name in uniq_entities: @@ -3297,12 +3330,12 @@ class ImportTaskManager: txt = txt[4:].strip() try: - return json.loads(txt) + return _coerce_import_data_dict(json.loads(txt), context="LLM 抽取结果") except Exception: s = txt.find("{") e = txt.rfind("}") if s >= 0 and e > s: - return json.loads(txt[s : e + 1]) + return _coerce_import_data_dict(json.loads(txt[s : e + 1]), context="LLM 抽取结果") raise except Exception as err: last_error = err @@ -3373,6 +3406,7 @@ JSON schema: logger.warning(f"chat_log 时间语义抽取失败: {e}") return None + result = _coerce_import_data_dict(result, context="chat_log 时间抽取结果") raw_time_meta = { "event_time": result.get("event_time"), "event_time_start": result.get("event_time_start"), @@ -3632,7 +3666,7 @@ JSON schema: c.progress = 1.0 c.updated_at = _now() f.failed_chunks += 1 - f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) + f.progress = self._compute_ratio(f.done_chunks, f.total_chunks) if not f.error: f.error = str(error) f.updated_at = _now() @@ -3698,7 +3732,7 @@ JSON schema: task.done_chunks = done task.failed_chunks = failed 
task.cancelled_chunks = cancelled
-        task.progress = self._compute_ratio(done + failed + cancelled, total)
+        task.progress = self._compute_ratio(done, total)
         task.updated_at = _now()
 
     async def _should_cleanup_task_temp(self, task_id: str) -> bool:

From d01c7276a8a45f4e71807b2a182c2937b2f7b75c Mon Sep 17 00:00:00 2001
From: DawnARC
Date: Wed, 6 May 2026 23:05:44 +0800
Subject: [PATCH 2/2] fix: fix incorrect progress display for failed chunks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../test_web_import_manager_payloads.py       | 67 ++++++++++++++++++-
 .../core/utils/web_import_manager.py          | 25 ++++---
 uv.lock                                       |  8 +--
 3 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/pytests/A_memorix_test/test_web_import_manager_payloads.py b/pytests/A_memorix_test/test_web_import_manager_payloads.py
index 8fdca65d..f2d78df3 100644
--- a/pytests/A_memorix_test/test_web_import_manager_payloads.py
+++ b/pytests/A_memorix_test/test_web_import_manager_payloads.py
@@ -4,7 +4,12 @@ import numpy as np
 import pytest
 
 from src.A_memorix.core.strategies.base import ChunkContext, KnowledgeType, ProcessedChunk, SourceInfo
-from src.A_memorix.core.utils.web_import_manager import ImportTaskManager
+from src.A_memorix.core.utils.web_import_manager import (
+    ImportChunkRecord,
+    ImportFileRecord,
+    ImportTaskManager,
+    ImportTaskRecord,
+)
 
 
 class _DummyMetadataStore:
@@ -76,6 +81,21 @@ def _build_manager() -> tuple[ImportTaskManager, _DummyMetadataStore]:
     return manager, metadata_store
 
 
+def _build_progress_task(task_id: str, total_chunks: int = 2) -> ImportTaskRecord:
+    file_record = ImportFileRecord(
+        file_id="file-1",
+        name="demo.txt",
+        source_kind="paste",
+        input_mode="text",
+        total_chunks=total_chunks,
+        chunks=[
+            ImportChunkRecord(chunk_id=f"chunk-{index}", index=index, chunk_type="text")
+            for index in range(total_chunks)
+        ],
+    )
+    return ImportTaskRecord(task_id=task_id, source="paste", params={}, files=[file_record])
+
+
 def _build_chunk(data) -> ProcessedChunk:
     return ProcessedChunk(
         type=KnowledgeType.FACTUAL,
@@ -96,6 +116,51 @@ async def test_persist_processed_chunk_rejects_non_object_before_paragraph_write
     assert metadata_store.paragraphs == []
 
 
+@pytest.mark.asyncio
+async def test_chunk_terminal_progress_uses_successful_chunks_only() -> None:
+    manager, _ = _build_manager()
+
+    task = _build_progress_task("task-fail-then-complete")
+    manager._tasks[task.task_id] = task
+
+    await manager._set_chunk_failed(task.task_id, "file-1", "chunk-0", "boom")
+    await manager._set_chunk_completed(task.task_id, "file-1", "chunk-1")
+
+    file_record = task.files[0]
+    assert file_record.done_chunks == 1
+    assert file_record.failed_chunks == 1
+    assert file_record.progress == pytest.approx(0.5)
+    assert task.progress == pytest.approx(0.5)
+
+    reverse_task = _build_progress_task("task-complete-then-fail")
+    manager._tasks[reverse_task.task_id] = reverse_task
+
+    await manager._set_chunk_completed(reverse_task.task_id, "file-1", "chunk-0")
+    await manager._set_chunk_failed(reverse_task.task_id, "file-1", "chunk-1", "boom")
+
+    reverse_file = reverse_task.files[0]
+    assert reverse_file.done_chunks == 1
+    assert reverse_file.failed_chunks == 1
+    assert reverse_file.progress == pytest.approx(0.5)
+    assert reverse_task.progress == pytest.approx(0.5)
+
+
+@pytest.mark.asyncio
+async def test_cancelled_chunks_do_not_increase_file_progress() -> None:
+    manager, _ = _build_manager()
+    task = 
_build_progress_task("task-cancelled-progress", total_chunks=3) + manager._tasks[task.task_id] = task + + await manager._set_chunk_completed(task.task_id, "file-1", "chunk-0") + await manager._set_chunk_cancelled(task.task_id, "file-1", "chunk-1", "任务已取消") + + file_record = task.files[0] + assert file_record.done_chunks == 1 + assert file_record.cancelled_chunks == 1 + assert file_record.progress == pytest.approx(1 / 3) + assert task.progress == pytest.approx(1 / 3) + + @pytest.mark.asyncio async def test_persist_processed_chunk_skips_invalid_nested_items() -> None: manager, metadata_store = _build_manager() diff --git a/src/A_memorix/core/utils/web_import_manager.py b/src/A_memorix/core/utils/web_import_manager.py index 2b41f9f2..037ddc6a 100644 --- a/src/A_memorix/core/utils/web_import_manager.py +++ b/src/A_memorix/core/utils/web_import_manager.py @@ -2040,7 +2040,7 @@ class ImportTaskManager: if total <= 0: total = max(1, scanned) - progress = max(0.0, min(1.0, float(scanned) / float(total))) if total > 0 else 0.0 + chunk_progress = max(0.0, min(1.0, float(scanned) / float(total))) if total > 0 else 0.0 preview = f"scanned={scanned}/{total}, migrated={migrated}, bad={bad}, last_id={last_id}" async with self._lock: @@ -2055,14 +2055,14 @@ class ImportTaskManager: if c.status not in {"completed", "failed", "cancelled"}: c.status = "writing" c.step = "migrating" - c.progress = progress + c.progress = chunk_progress c.content_preview = preview c.updated_at = _now() f.total_chunks = total f.done_chunks = done f.failed_chunks = bad f.cancelled_chunks = 0 - f.progress = progress + self._recompute_file_progress(f) if f.status not in {"failed", "cancelled"}: f.status = "writing" f.current_step = "migrating" @@ -2209,7 +2209,7 @@ class ImportTaskManager: f.done_chunks = max(0, min(f.done_chunks, f.total_chunks)) f.failed_chunks = max(0, min(f.failed_chunks, f.total_chunks)) f.cancelled_chunks = 0 - f.progress = 1.0 + self._recompute_file_progress(f) f.status = "completed" f.current_step = "completed" if bad_rows > 0 and not f.error: @@ -3578,9 +3578,7 @@ JSON schema: additional_cancelled += 1 if additional_cancelled > 0: f.cancelled_chunks += additional_cancelled - f.progress = self._compute_ratio( - f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks - ) + self._recompute_file_progress(f) f.updated_at = _now() task.updated_at = _now() self._recompute_task_progress(task) @@ -3638,7 +3636,7 @@ JSON schema: c.progress = 1.0 c.updated_at = _now() f.done_chunks += 1 - f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) + self._recompute_file_progress(f) f.updated_at = _now() self._recompute_task_progress(task) @@ -3666,7 +3664,7 @@ JSON schema: c.progress = 1.0 c.updated_at = _now() f.failed_chunks += 1 - f.progress = self._compute_ratio(f.done_chunks, f.total_chunks) + self._recompute_file_progress(f) if not f.error: f.error = str(error) f.updated_at = _now() @@ -3690,7 +3688,7 @@ JSON schema: c.progress = 1.0 c.updated_at = _now() f.cancelled_chunks += 1 - f.progress = self._compute_ratio(f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks) + self._recompute_file_progress(f) f.updated_at = _now() self._recompute_task_progress(task) @@ -3718,6 +3716,9 @@ JSON schema: return 1.0 return max(0.0, min(1.0, float(done) / float(total))) + def _recompute_file_progress(self, file_record: ImportFileRecord) -> None: + file_record.progress = self._compute_ratio(file_record.done_chunks, file_record.total_chunks) + def 
_recompute_task_progress(self, task: ImportTaskRecord) -> None: total = 0 done = 0 @@ -3765,9 +3766,7 @@ JSON schema: additional_cancelled += 1 if additional_cancelled > 0: f.cancelled_chunks += additional_cancelled - f.progress = self._compute_ratio( - f.done_chunks + f.failed_chunks + f.cancelled_chunks, f.total_chunks - ) + self._recompute_file_progress(f) f.updated_at = _now() task.status = "cancelled" task.current_step = "cancelled" diff --git a/uv.lock b/uv.lock index dba2d9cf..4853ca9a 100644 --- a/uv.lock +++ b/uv.lock @@ -1511,7 +1511,7 @@ requires-dist = [ { name = "httpx", extras = ["socks"] }, { name = "jieba", specifier = ">=0.42.1" }, { name = "json-repair", specifier = ">=0.47.6" }, - { name = "maibot-dashboard", specifier = ">=1.0.5" }, + { name = "maibot-dashboard", specifier = ">=1.0.6" }, { name = "maibot-plugin-sdk", specifier = ">=2.4.0" }, { name = "maim-message", specifier = ">=0.6.2" }, { name = "matplotlib", specifier = ">=3.10.5" }, @@ -1549,11 +1549,11 @@ dev = [ [[package]] name = "maibot-dashboard" -version = "1.0.5" +version = "1.0.6" source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } -sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/b8/a7/eb1032664ea98b58a861412aca19b31066dc3368f1264a2a53970bd9385c/maibot_dashboard-1.0.5.tar.gz", hash = "sha256:3480723e42120defbaf8ebb952c45bc3e0cd9274a04c5acda0331e55e15ebdc1", size = 2477306, upload-time = "2026-05-05T10:40:34.327Z" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/73/76/a2a47f902f20bbaa699584d7fa9676f591503e0d6954de65aa0a90c07000/maibot_dashboard-1.0.6.tar.gz", hash = "sha256:f383d3505a102554a51bf49d1fc56a8ba8c5db60a3c41b7eab4513a6fd0a1f88", size = 2485522, upload-time = "2026-05-06T10:44:36.42Z" } wheels = [ - { url = "https://pypi.tuna.tsinghua.edu.cn/packages/3a/a0/ad4f7c1d381875ca8d1aeedf5ff6e94692f64cf558479f9e845e47bca830/maibot_dashboard-1.0.5-py3-none-any.whl", hash = "sha256:67bfbb82a1ddd666d20cc958864db38df2e5493f77df0cb049ae83987b1dd65d", size = 2542631, upload-time = "2026-05-05T10:40:32.5Z" }, + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/ae/14/a62631e60c9606a793d6740ef61fc0b8868cf8a79c9f192667026874799b/maibot_dashboard-1.0.6-py3-none-any.whl", hash = "sha256:36299d7039fbb98fd8aa1fb31d2bbc040d1018d9d87ebcf09194e4efb0cf9af7", size = 2552642, upload-time = "2026-05-06T10:44:34.216Z" }, ] [[package]]
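
Note: taken together, the two patches change progress reporting to count
only successful chunks, so failed and cancelled chunks no longer inflate
the ratio. A minimal sketch of the semantics (illustrative TypeScript;
the actual logic is _compute_ratio / _recompute_file_progress in
web_import_manager.py, whose total <= 0 guard is only partially visible
in the hunks above):

    function computeRatio(done: number, total: number): number {
      if (total <= 0) return 1.0 // assumed guard, mirroring the patch's early return
      return Math.max(0, Math.min(1, done / total))
    }

    // Old behavior: done + failed + cancelled over total, so a run with
    // 9 successes and 3 failures out of 12 chunks reported 100%.
    computeRatio(9 + 3, 12) // => 1.0
    // New behavior: only done over total, so the same run reports 75%,
    // matching the '成功 9 / 12 分块 · 失败 3' summary in the UI.
    computeRatio(9, 12)     // => 0.75
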