feat: add memory tests, retrieval tool, and services

Add full long-term memory support with tests: introduce the Chinese memory-retrieval prompts, the query_long_term_memory retrieval tool, the memory service and memory flow service, and the WebUI memory routes. Add a large test suite (unit tests plus benchmark/live tests) covering chat-history summarization, the knowledge fetcher, episode generation, the write-back mechanism, and person-profile retrieval.

Update multiple modules to integrate memory retrieval (the knowledge fetcher, chat summarizer, memory_retrieval, person_info, config/legacy migration, and the WebUI routes) and remove the legacy lpmm knowledge module. Together these changes wire up the memory runtime, provide a mock embedding adapter for the benchmarks, and add the import and episode-processing flows required by the new tests and tools.
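
For reference, a minimal sketch of calling the new retrieval tool, mirroring the keyword arguments exercised in the tests below; the chat id and query text here are illustrative only, and the authoritative signature is the one defined in query_long_term_memory itself:

import asyncio

from src.memory_system.retrieval_tools.query_long_term_memory import query_long_term_memory


async def demo() -> None:
    # mode is one of "search", "time", "episode" or "aggregate" (see the registered
    # tool parameters in the retrieval-tool tests); the call returns a formatted text block.
    # "stream-1" and the query string are illustrative values, not from this commit.
    text = await query_long_term_memory(
        query="最近的旅行计划",
        mode="time",
        chat_id="stream-1",
        limit=5,
        time_expression="最近7天",
    )
    print(text)


asyncio.run(demo())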
DawnARC
2026-03-18 21:35:17 +08:00
parent 999e7246e2
commit bd84e500e1
39 changed files with 5849 additions and 764 deletions

View File

@@ -0,0 +1,148 @@
from types import SimpleNamespace

import pytest

from src.memory_system import chat_history_summarizer as summarizer_module


def _build_summarizer() -> summarizer_module.ChatHistorySummarizer:
    summarizer = summarizer_module.ChatHistorySummarizer.__new__(summarizer_module.ChatHistorySummarizer)
    summarizer.session_id = "session-1"
    summarizer.log_prefix = "[session-1]"
    return summarizer


@pytest.mark.asyncio
async def test_import_to_long_term_memory_uses_summary_payload(monkeypatch):
    calls = []
    summarizer = _build_summarizer()

    async def fake_ingest_summary(**kwargs):
        calls.append(kwargs)
        return SimpleNamespace(success=True, detail="", stored_ids=["p1"])

    monkeypatch.setattr(
        summarizer_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(user_id="user-1", group_id="")),
    )
    monkeypatch.setattr(summarizer_module, "global_config", SimpleNamespace(memory=SimpleNamespace(chat_history_topic_check_message_threshold=8)))
    monkeypatch.setattr("src.services.memory_service.memory_service.ingest_summary", fake_ingest_summary)

    await summarizer._import_to_long_term_memory(
        record_id=1,
        theme="旅行计划",
        summary="我们讨论了春游安排",
        participants=["Alice", "Bob"],
        start_time=1.0,
        end_time=2.0,
        original_text="long text",
    )

    assert len(calls) == 1
    payload = calls[0]
    assert payload["external_id"] == "chat_history:1"
    assert payload["chat_id"] == "session-1"
    assert payload["participants"] == ["Alice", "Bob"]
    assert payload["respect_filter"] is True
    assert payload["user_id"] == "user-1"
    assert payload["group_id"] == ""
    assert "主题:旅行计划" in payload["text"]
    assert "概括:我们讨论了春游安排" in payload["text"]


@pytest.mark.asyncio
async def test_import_to_long_term_memory_falls_back_when_content_empty(monkeypatch):
    summarizer = _build_summarizer()
    fallback_calls = []

    async def fake_fallback(**kwargs):
        fallback_calls.append(kwargs)

    summarizer._fallback_import_to_long_term_memory = fake_fallback
    monkeypatch.setattr(
        summarizer_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(user_id="user-1", group_id="")),
    )

    await summarizer._import_to_long_term_memory(
        record_id=2,
        theme="",
        summary="",
        participants=[],
        start_time=10.0,
        end_time=20.0,
        original_text="raw chat",
    )

    assert len(fallback_calls) == 1
    assert fallback_calls[0]["record_id"] == 2
    assert fallback_calls[0]["original_text"] == "raw chat"


@pytest.mark.asyncio
async def test_import_to_long_term_memory_falls_back_when_ingest_fails(monkeypatch):
    summarizer = _build_summarizer()
    fallback_calls = []

    async def fake_ingest_summary(**kwargs):
        return SimpleNamespace(success=False, detail="boom", stored_ids=[])

    async def fake_fallback(**kwargs):
        fallback_calls.append(kwargs)

    summarizer._fallback_import_to_long_term_memory = fake_fallback
    monkeypatch.setattr(
        summarizer_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(user_id="user-1", group_id="group-1")),
    )
    monkeypatch.setattr("src.services.memory_service.memory_service.ingest_summary", fake_ingest_summary)

    await summarizer._import_to_long_term_memory(
        record_id=3,
        theme="电影",
        summary="聊了电影推荐",
        participants=["Alice"],
        start_time=3.0,
        end_time=4.0,
        original_text="raw",
    )

    assert len(fallback_calls) == 1
    assert fallback_calls[0]["theme"] == "电影"


@pytest.mark.asyncio
async def test_fallback_import_to_long_term_memory_sets_generate_from_chat(monkeypatch):
    calls = []
    summarizer = _build_summarizer()

    async def fake_ingest_summary(**kwargs):
        calls.append(kwargs)
        return SimpleNamespace(success=True, detail="chat_filtered", stored_ids=[])

    monkeypatch.setattr(
        summarizer_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(user_id="user-2", group_id="group-2")),
    )
    monkeypatch.setattr(summarizer_module, "global_config", SimpleNamespace(memory=SimpleNamespace(chat_history_topic_check_message_threshold=12)))
    monkeypatch.setattr("src.services.memory_service.memory_service.ingest_summary", fake_ingest_summary)

    await summarizer._fallback_import_to_long_term_memory(
        record_id=4,
        theme="工作",
        participants=["Alice"],
        start_time=5.0,
        end_time=6.0,
        original_text="a" * 128,
    )

    assert len(calls) == 1
    metadata = calls[0]["metadata"]
    assert metadata["generate_from_chat"] is True
    assert metadata["context_length"] == 12
    assert calls[0]["respect_filter"] is True

View File

@@ -0,0 +1,127 @@
from types import SimpleNamespace

import pytest

from src.chat.brain_chat.PFC import pfc_KnowledgeFetcher as knowledge_module
from src.services.memory_service import MemoryHit, MemorySearchResult


def test_knowledge_fetcher_resolves_private_memory_context(monkeypatch):
    monkeypatch.setattr(knowledge_module, "LLMRequest", lambda *args, **kwargs: object())
    monkeypatch.setattr(
        knowledge_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(platform="qq", user_id="42", group_id="")),
    )
    monkeypatch.setattr(
        knowledge_module,
        "resolve_person_id_for_memory",
        lambda *, person_name, platform, user_id: f"{person_name}:{platform}:{user_id}",
    )

    fetcher = knowledge_module.KnowledgeFetcher(private_name="Alice", stream_id="stream-1")

    assert fetcher._resolve_private_memory_context() == {
        "chat_id": "stream-1",
        "person_id": "Alice:qq:42",
        "user_id": "42",
        "group_id": "",
    }


@pytest.mark.asyncio
async def test_knowledge_fetcher_memory_get_knowledge_uses_memory_service(monkeypatch):
    monkeypatch.setattr(knowledge_module, "LLMRequest", lambda *args, **kwargs: object())
    monkeypatch.setattr(
        knowledge_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(platform="qq", user_id="42", group_id="")),
    )
    monkeypatch.setattr(
        knowledge_module,
        "resolve_person_id_for_memory",
        lambda *, person_name, platform, user_id: f"{person_name}:{platform}:{user_id}",
    )
    calls = []

    async def fake_search(query: str, **kwargs):
        calls.append((query, kwargs))
        return MemorySearchResult(summary="", hits=[MemoryHit(content="她喜欢猫", source="person_fact:qq:42")], filtered=False)

    monkeypatch.setattr(knowledge_module.memory_service, "search", fake_search)

    fetcher = knowledge_module.KnowledgeFetcher(private_name="Alice", stream_id="stream-1")
    result = await fetcher._memory_get_knowledge("她喜欢什么")

    assert "1. 她喜欢猫" in result
    assert calls == [
        (
            "她喜欢什么",
            {
                "limit": 5,
                "mode": "search",
                "chat_id": "stream-1",
                "person_id": "Alice:qq:42",
                "user_id": "42",
                "group_id": "",
                "respect_filter": True,
            },
        )
    ]


@pytest.mark.asyncio
async def test_knowledge_fetcher_falls_back_to_chat_scope_when_person_scope_misses(monkeypatch):
    monkeypatch.setattr(knowledge_module, "LLMRequest", lambda *args, **kwargs: object())
    monkeypatch.setattr(
        knowledge_module,
        "_chat_manager",
        SimpleNamespace(get_session_by_session_id=lambda session_id: SimpleNamespace(platform="qq", user_id="42", group_id="")),
    )
    monkeypatch.setattr(
        knowledge_module,
        "resolve_person_id_for_memory",
        lambda *, person_name, platform, user_id: "person-1",
    )
    calls = []

    async def fake_search(query: str, **kwargs):
        calls.append((query, kwargs))
        if kwargs.get("person_id"):
            return MemorySearchResult(summary="", hits=[], filtered=False)
        return MemorySearchResult(summary="", hits=[MemoryHit(content="她计划去杭州音乐节", source="chat_summary:stream-1")], filtered=False)

    monkeypatch.setattr(knowledge_module.memory_service, "search", fake_search)

    fetcher = knowledge_module.KnowledgeFetcher(private_name="Alice", stream_id="stream-1")
    result = await fetcher._memory_get_knowledge("Alice 最近在忙什么")

    assert "杭州音乐节" in result
    assert calls == [
        (
            "Alice 最近在忙什么",
            {
                "limit": 5,
                "mode": "search",
                "chat_id": "stream-1",
                "person_id": "person-1",
                "user_id": "42",
                "group_id": "",
                "respect_filter": True,
            },
        ),
        (
            "Alice 最近在忙什么",
            {
                "limit": 5,
                "mode": "search",
                "chat_id": "stream-1",
                "person_id": "",
                "user_id": "42",
                "group_id": "",
                "respect_filter": True,
            },
        ),
    ]

View File

@@ -0,0 +1,35 @@
from src.config.legacy_migration import try_migrate_legacy_bot_config_dict


def test_legacy_learning_list_with_numeric_fourth_column_is_migrated():
    payload = {
        "expression": {
            "learning_list": [
                ["qq:123456:group", "enable", "disable", "0.5"],
                ["", "disable", "enable", "0.1"],
            ]
        }
    }

    result = try_migrate_legacy_bot_config_dict(payload)

    assert result.migrated is True
    assert "expression.learning_list" in result.reason
    assert result.data["expression"]["learning_list"] == [
        {
            "platform": "qq",
            "item_id": "123456",
            "rule_type": "group",
            "use_expression": True,
            "enable_learning": False,
            "enable_jargon_learning": False,
        },
        {
            "platform": "",
            "item_id": "",
            "rule_type": "group",
            "use_expression": False,
            "enable_learning": True,
            "enable_jargon_learning": False,
        },
    ]

View File

@@ -0,0 +1,691 @@
from __future__ import annotations
import asyncio
import inspect
import json
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List
import numpy as np
import pytest
import pytest_asyncio
from A_memorix.core.runtime import sdk_memory_kernel as kernel_module
from A_memorix.core.runtime.sdk_memory_kernel import KernelSearchRequest, SDKMemoryKernel
from src.chat.brain_chat.PFC import pfc_KnowledgeFetcher as knowledge_module
from src.memory_system import chat_history_summarizer as summarizer_module
from src.memory_system.retrieval_tools.query_long_term_memory import query_long_term_memory
from src.person_info import person_info as person_info_module
from src.services import memory_service as memory_service_module
from src.services.memory_service import MemorySearchResult, memory_service
DATA_FILE = Path(__file__).parent / "data" / "benchmarks" / "long_novel_memory_benchmark.json"
REPORT_FILE = Path(__file__).parent / "data" / "benchmarks" / "results" / "long_novel_memory_benchmark_report.json"
def _load_benchmark_fixture() -> Dict[str, Any]:
return json.loads(DATA_FILE.read_text(encoding="utf-8"))
class _FakeEmbeddingAdapter:
def __init__(self, dimension: int = 32) -> None:
self.dimension = dimension
async def _detect_dimension(self) -> int:
return self.dimension
async def encode(self, texts, dimensions=None):
dim = int(dimensions or self.dimension)
if isinstance(texts, str):
sequence = [texts]
single = True
else:
sequence = list(texts)
single = False
rows = []
for text in sequence:
vec = np.zeros(dim, dtype=np.float32)
for ch in str(text or ""):
code = ord(ch)
vec[code % dim] += 1.0
vec[(code * 7) % dim] += 0.5
if not vec.any():
vec[0] = 1.0
norm = np.linalg.norm(vec)
if norm > 0:
vec = vec / norm
rows.append(vec)
payload = np.vstack(rows)
return payload[0] if single else payload
class _KnownPerson:
def __init__(self, person_id: str, registry: Dict[str, str], reverse_registry: Dict[str, str]) -> None:
self.person_id = person_id
self.is_known = person_id in reverse_registry
self.person_name = reverse_registry.get(person_id, "")
self._registry = registry
class _KernelBackedRuntimeManager:
is_running = True
def __init__(self, kernel: SDKMemoryKernel) -> None:
self.kernel = kernel
async def invoke_plugin(
self,
*,
method: str,
plugin_id: str,
component_name: str,
args: Dict[str, Any] | None,
timeout_ms: int,
):
del method, plugin_id, timeout_ms
payload = args or {}
if component_name == "search_memory":
return await self.kernel.search_memory(
KernelSearchRequest(
query=str(payload.get("query", "") or ""),
limit=int(payload.get("limit", 5) or 5),
mode=str(payload.get("mode", "hybrid") or "hybrid"),
chat_id=str(payload.get("chat_id", "") or ""),
person_id=str(payload.get("person_id", "") or ""),
time_start=payload.get("time_start"),
time_end=payload.get("time_end"),
respect_filter=bool(payload.get("respect_filter", True)),
user_id=str(payload.get("user_id", "") or ""),
group_id=str(payload.get("group_id", "") or ""),
)
)
handler = getattr(self.kernel, component_name)
result = handler(**payload)
return await result if inspect.isawaitable(result) else result
async def _wait_for_import_task(task_id: str, *, max_rounds: int = 200, sleep_seconds: float = 0.05) -> Dict[str, Any]:
for _ in range(max_rounds):
detail = await memory_service.import_admin(action="get", task_id=task_id, include_chunks=True)
task = detail.get("task") or {}
status = str(task.get("status", "") or "")
if status in {"completed", "completed_with_errors", "failed", "cancelled"}:
return detail
await asyncio.sleep(max(0.01, float(sleep_seconds)))
raise AssertionError(f"导入任务在等待窗口内未结束: {task_id}")
def _join_hit_content(search_result: MemorySearchResult) -> str:
return "\n".join(hit.content for hit in search_result.hits)
def _keyword_hits(text: str, keywords: List[str]) -> int:
haystack = str(text or "")
return sum(1 for keyword in keywords if keyword in haystack)
def _keyword_recall(text: str, keywords: List[str]) -> float:
if not keywords:
return 1.0
return _keyword_hits(text, keywords) / float(len(keywords))
def _hit_blob(hit) -> str:
meta = hit.metadata if isinstance(hit.metadata, dict) else {}
return "\n".join(
[
str(hit.content or ""),
str(hit.title or ""),
str(hit.source or ""),
json.dumps(meta, ensure_ascii=False),
]
)
def _first_relevant_rank(search_result: MemorySearchResult, keywords: List[str], minimum_keyword_hits: int) -> int:
for index, hit in enumerate(search_result.hits[:5], start=1):
if _keyword_hits(_hit_blob(hit), keywords) >= max(1, int(minimum_keyword_hits or len(keywords))):
return index
return 0
def _episode_blob_from_items(items: List[Dict[str, Any]]) -> str:
return "\n".join(
(
f"{item.get('title', '')}\n"
f"{item.get('summary', '')}\n"
f"{json.dumps(item.get('keywords', []), ensure_ascii=False)}\n"
f"{json.dumps(item.get('participants', []), ensure_ascii=False)}"
)
for item in items
)
def _episode_blob_from_hits(search_result: MemorySearchResult) -> str:
chunks = []
for hit in search_result.hits:
meta = hit.metadata if isinstance(hit.metadata, dict) else {}
chunks.append(
"\n".join(
[
str(hit.title or ""),
str(hit.content or ""),
json.dumps(meta.get("keywords", []) or [], ensure_ascii=False),
json.dumps(meta.get("participants", []) or [], ensure_ascii=False),
]
)
)
return "\n".join(chunks)
async def _evaluate_episode_generation(*, session_id: str, episode_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
episode_source = f"chat_summary:{session_id}"
payload = await memory_service.episode_admin(
action="query",
source=episode_source,
limit=20,
)
items = payload.get("items") or []
blob = _episode_blob_from_items(items)
reports: List[Dict[str, Any]] = []
success_rate = 0.0
keyword_recall = 0.0
for case in episode_cases:
recall = _keyword_recall(blob, case["expected_keywords"])
success = bool(items) and recall >= float(case.get("minimum_keyword_recall", 1.0))
success_rate += 1.0 if success else 0.0
keyword_recall += recall
reports.append(
{
"query": case["query"],
"success": success,
"keyword_recall": recall,
"episode_count": len(items),
"top_episode": items[0] if items else None,
}
)
total = max(1, len(episode_cases))
return {
"success_rate": round(success_rate / total, 4),
"keyword_recall": round(keyword_recall / total, 4),
"episode_count": len(items),
"reports": reports,
}
async def _evaluate_episode_admin_query(*, session_id: str, episode_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
reports: List[Dict[str, Any]] = []
success_rate = 0.0
keyword_recall = 0.0
episode_source = f"chat_summary:{session_id}"
for case in episode_cases:
payload = await memory_service.episode_admin(
action="query",
source=episode_source,
query=case["query"],
limit=5,
)
items = payload.get("items") or []
blob = "\n".join(
f"{item.get('title', '')}\n{item.get('summary', '')}\n{json.dumps(item.get('keywords', []), ensure_ascii=False)}"
for item in items
)
recall = _keyword_recall(blob, case["expected_keywords"])
success = bool(items) and recall >= float(case.get("minimum_keyword_recall", 1.0))
success_rate += 1.0 if success else 0.0
keyword_recall += recall
reports.append(
{
"query": case["query"],
"success": success,
"keyword_recall": recall,
"episode_count": len(items),
"top_episode": items[0] if items else None,
}
)
total = max(1, len(episode_cases))
return {
"success_rate": round(success_rate / total, 4),
"keyword_recall": round(keyword_recall / total, 4),
"reports": reports,
}
async def _evaluate_episode_search_mode(*, session_id: str, episode_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
reports: List[Dict[str, Any]] = []
success_rate = 0.0
keyword_recall = 0.0
for case in episode_cases:
result = await memory_service.search(
case["query"],
mode="episode",
chat_id=session_id,
respect_filter=False,
limit=5,
)
blob = _episode_blob_from_hits(result)
recall = _keyword_recall(blob, case["expected_keywords"])
success = bool(result.hits) and recall >= float(case.get("minimum_keyword_recall", 1.0))
success_rate += 1.0 if success else 0.0
keyword_recall += recall
reports.append(
{
"query": case["query"],
"success": success,
"keyword_recall": recall,
"episode_count": len(result.hits),
"top_episode": result.hits[0].to_dict() if result.hits else None,
}
)
total = max(1, len(episode_cases))
return {
"success_rate": round(success_rate / total, 4),
"keyword_recall": round(keyword_recall / total, 4),
"reports": reports,
}
async def _evaluate_tool_modes(*, session_id: str, dataset: Dict[str, Any]) -> Dict[str, Any]:
search_case = dataset["search_cases"][0]
episode_case = dataset["episode_cases"][0]
aggregate_case = dataset["knowledge_fetcher_cases"][0]
tool_cases = [
{
"name": "search",
"kwargs": {
"query": "蓝漆铁盒 北塔木梯",
"mode": "search",
"chat_id": session_id,
"limit": 5,
},
"expected_keywords": ["蓝漆铁盒", "北塔木梯", "海潮图"],
"minimum_keyword_recall": 0.67,
},
{
"name": "time",
"kwargs": {
"query": "蓝漆铁盒 北塔",
"mode": "time",
"chat_id": session_id,
"limit": 5,
"time_expression": "最近7天",
},
"expected_keywords": ["蓝漆铁盒", "北塔木梯"],
"minimum_keyword_recall": 0.67,
},
{
"name": "episode",
"kwargs": {
"query": episode_case["query"],
"mode": "episode",
"chat_id": session_id,
"limit": 5,
},
"expected_keywords": episode_case["expected_keywords"],
"minimum_keyword_recall": 0.67,
},
{
"name": "aggregate",
"kwargs": {
"query": aggregate_case["query"],
"mode": "aggregate",
"chat_id": session_id,
"limit": 5,
},
"expected_keywords": aggregate_case["expected_keywords"],
"minimum_keyword_recall": 0.67,
},
]
reports: List[Dict[str, Any]] = []
success_rate = 0.0
keyword_recall = 0.0
for case in tool_cases:
text = await query_long_term_memory(**case["kwargs"])
recall = _keyword_recall(text, case["expected_keywords"])
success = (
"失败" not in text
and "无法解析" not in text
and "未找到" not in text
and recall >= float(case["minimum_keyword_recall"])
)
success_rate += 1.0 if success else 0.0
keyword_recall += recall
reports.append(
{
"name": case["name"],
"success": success,
"keyword_recall": recall,
"preview": text[:320],
}
)
total = max(1, len(tool_cases))
return {
"success_rate": round(success_rate / total, 4),
"keyword_recall": round(keyword_recall / total, 4),
"reports": reports,
}
@pytest_asyncio.fixture
async def benchmark_env(monkeypatch, tmp_path):
dataset = _load_benchmark_fixture()
session_cfg = dataset["session"]
session = SimpleNamespace(
session_id=session_cfg["session_id"],
platform=session_cfg["platform"],
user_id=session_cfg["user_id"],
group_id=session_cfg["group_id"],
)
fake_chat_manager = SimpleNamespace(
get_session_by_session_id=lambda session_id: session if session_id == session.session_id else None,
get_session_name=lambda session_id: session_cfg["display_name"] if session_id == session.session_id else session_id,
)
registry = {item["person_name"]: item["person_id"] for item in dataset["person_writebacks"]}
reverse_registry = {value: key for key, value in registry.items()}
monkeypatch.setattr(kernel_module, "create_embedding_api_adapter", lambda **kwargs: _FakeEmbeddingAdapter())
async def fake_self_check(**kwargs):
return {"ok": True, "message": "ok", "encoded_dimension": 32}
monkeypatch.setattr(kernel_module, "run_embedding_runtime_self_check", fake_self_check)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", None)
monkeypatch.setattr(summarizer_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(knowledge_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "get_person_id_by_person_name", lambda person_name: registry.get(str(person_name or "").strip(), ""))
monkeypatch.setattr(
person_info_module,
"Person",
lambda person_id: _KnownPerson(person_id=str(person_id or ""), registry=registry, reverse_registry=reverse_registry),
)
data_dir = (tmp_path / "a_memorix_benchmark_data").resolve()
kernel = SDKMemoryKernel(
plugin_root=tmp_path / "plugin_root",
config={
"storage": {"data_dir": str(data_dir)},
"advanced": {"enable_auto_save": False},
"memory": {"base_decay_interval_hours": 24},
"person_profile": {"refresh_interval_minutes": 5},
},
)
manager = _KernelBackedRuntimeManager(kernel)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", lambda: manager)
await kernel.initialize()
try:
yield {
"dataset": dataset,
"kernel": kernel,
"session": session,
"person_registry": registry,
}
finally:
await kernel.shutdown()
@pytest.mark.asyncio
async def test_long_novel_memory_benchmark(benchmark_env):
dataset = benchmark_env["dataset"]
session_id = benchmark_env["session"].session_id
created = await memory_service.import_admin(
action="create_paste",
name="long_novel_memory_benchmark.json",
input_mode="json",
llm_enabled=False,
content=json.dumps(dataset["import_payload"], ensure_ascii=False),
)
assert created["success"] is True
import_detail = await _wait_for_import_task(created["task"]["task_id"])
assert import_detail["task"]["status"] == "completed"
for record in dataset["chat_history_records"]:
summarizer = summarizer_module.ChatHistorySummarizer(session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
for payload in dataset["person_writebacks"]:
await person_info_module.store_person_memory_from_answer(
payload["person_name"],
payload["memory_content"],
session_id,
)
await memory_service.episode_admin(action="process_pending", limit=100, max_retry=2)
search_case_reports: List[Dict[str, Any]] = []
search_accuracy_at_1 = 0.0
search_recall_at_5 = 0.0
search_precision_at_5 = 0.0
search_mrr = 0.0
search_keyword_recall = 0.0
for case in dataset["search_cases"]:
result = await memory_service.search(case["query"], mode="search", respect_filter=False, limit=5)
joined = _join_hit_content(result)
rank = _first_relevant_rank(result, case["expected_keywords"], case.get("minimum_keyword_hits", len(case["expected_keywords"])))
relevant_hits = sum(
1
for hit in result.hits[:5]
if _keyword_hits(_hit_blob(hit), case["expected_keywords"]) >= max(1, int(case.get("minimum_keyword_hits", len(case["expected_keywords"]))))
)
keyword_recall = _keyword_recall(joined, case["expected_keywords"])
search_accuracy_at_1 += 1.0 if rank == 1 else 0.0
search_recall_at_5 += 1.0 if rank > 0 else 0.0
search_precision_at_5 += relevant_hits / float(max(1, min(5, len(result.hits))))
search_mrr += 1.0 / float(rank) if rank > 0 else 0.0
search_keyword_recall += keyword_recall
search_case_reports.append(
{
"query": case["query"],
"rank_of_first_relevant": rank,
"relevant_hits_top5": relevant_hits,
"keyword_recall_top5": keyword_recall,
"top_hit": result.hits[0].to_dict() if result.hits else None,
}
)
search_total = max(1, len(dataset["search_cases"]))
writeback_reports: List[Dict[str, Any]] = []
writeback_success_rate = 0.0
writeback_keyword_recall = 0.0
for payload in dataset["person_writebacks"]:
query = " ".join(payload["expected_keywords"])
result = await memory_service.search(
query,
mode="search",
chat_id=session_id,
person_id=payload["person_id"],
respect_filter=False,
limit=5,
)
joined = _join_hit_content(result)
recall = _keyword_recall(joined, payload["expected_keywords"])
success = bool(result.hits) and recall >= 0.67
writeback_success_rate += 1.0 if success else 0.0
writeback_keyword_recall += recall
writeback_reports.append(
{
"person_id": payload["person_id"],
"success": success,
"keyword_recall": recall,
"hit_count": len(result.hits),
}
)
writeback_total = max(1, len(dataset["person_writebacks"]))
knowledge_reports: List[Dict[str, Any]] = []
knowledge_success_rate = 0.0
knowledge_keyword_recall = 0.0
fetcher = knowledge_module.KnowledgeFetcher(
private_name=dataset["session"]["display_name"],
stream_id=session_id,
)
for case in dataset["knowledge_fetcher_cases"]:
knowledge_text, _ = await fetcher.fetch(case["query"], [])
recall = _keyword_recall(knowledge_text, case["expected_keywords"])
success = recall >= float(case.get("minimum_keyword_recall", 1.0))
knowledge_success_rate += 1.0 if success else 0.0
knowledge_keyword_recall += recall
knowledge_reports.append(
{
"query": case["query"],
"success": success,
"keyword_recall": recall,
"preview": knowledge_text[:300],
}
)
knowledge_total = max(1, len(dataset["knowledge_fetcher_cases"]))
profile_reports: List[Dict[str, Any]] = []
profile_success_rate = 0.0
profile_keyword_recall = 0.0
profile_evidence_rate = 0.0
for case in dataset["profile_cases"]:
profile = await memory_service.get_person_profile(case["person_id"], chat_id=session_id)
recall = _keyword_recall(profile.summary, case["expected_keywords"])
has_evidence = bool(profile.evidence)
success = recall >= float(case.get("minimum_keyword_recall", 1.0)) and has_evidence
profile_success_rate += 1.0 if success else 0.0
profile_keyword_recall += recall
profile_evidence_rate += 1.0 if has_evidence else 0.0
profile_reports.append(
{
"person_id": case["person_id"],
"success": success,
"keyword_recall": recall,
"evidence_count": len(profile.evidence),
"summary_preview": profile.summary[:240],
}
)
profile_total = max(1, len(dataset["profile_cases"]))
episode_generation_auto = await _evaluate_episode_generation(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_admin_query_auto = await _evaluate_episode_admin_query(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_search_mode_auto = await _evaluate_episode_search_mode(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_rebuild = await memory_service.episode_admin(
action="rebuild",
source=f"chat_summary:{session_id}",
)
episode_generation_after_rebuild = await _evaluate_episode_generation(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_admin_query_after_rebuild = await _evaluate_episode_admin_query(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_search_mode_after_rebuild = await _evaluate_episode_search_mode(session_id=session_id, episode_cases=dataset["episode_cases"])
tool_modes = await _evaluate_tool_modes(session_id=session_id, dataset=dataset)
report = {
"dataset": dataset["meta"],
"import": {
"task_id": created["task"]["task_id"],
"status": import_detail["task"]["status"],
"paragraph_count": len(dataset["import_payload"]["paragraphs"]),
},
"metrics": {
"search": {
"accuracy_at_1": round(search_accuracy_at_1 / search_total, 4),
"recall_at_5": round(search_recall_at_5 / search_total, 4),
"precision_at_5": round(search_precision_at_5 / search_total, 4),
"mrr": round(search_mrr / search_total, 4),
"keyword_recall_at_5": round(search_keyword_recall / search_total, 4),
},
"writeback": {
"success_rate": round(writeback_success_rate / writeback_total, 4),
"keyword_recall": round(writeback_keyword_recall / writeback_total, 4),
},
"knowledge_fetcher": {
"success_rate": round(knowledge_success_rate / knowledge_total, 4),
"keyword_recall": round(knowledge_keyword_recall / knowledge_total, 4),
},
"profile": {
"success_rate": round(profile_success_rate / profile_total, 4),
"keyword_recall": round(profile_keyword_recall / profile_total, 4),
"evidence_rate": round(profile_evidence_rate / profile_total, 4),
},
"tool_modes": {
"success_rate": tool_modes["success_rate"],
"keyword_recall": tool_modes["keyword_recall"],
},
"episode_generation_auto": {
"success_rate": episode_generation_auto["success_rate"],
"keyword_recall": episode_generation_auto["keyword_recall"],
"episode_count": episode_generation_auto["episode_count"],
},
"episode_generation_after_rebuild": {
"success_rate": episode_generation_after_rebuild["success_rate"],
"keyword_recall": episode_generation_after_rebuild["keyword_recall"],
"episode_count": episode_generation_after_rebuild["episode_count"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
"episode_admin_query_auto": {
"success_rate": episode_admin_query_auto["success_rate"],
"keyword_recall": episode_admin_query_auto["keyword_recall"],
},
"episode_admin_query_after_rebuild": {
"success_rate": episode_admin_query_after_rebuild["success_rate"],
"keyword_recall": episode_admin_query_after_rebuild["keyword_recall"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
"episode_search_mode_auto": {
"success_rate": episode_search_mode_auto["success_rate"],
"keyword_recall": episode_search_mode_auto["keyword_recall"],
},
"episode_search_mode_after_rebuild": {
"success_rate": episode_search_mode_after_rebuild["success_rate"],
"keyword_recall": episode_search_mode_after_rebuild["keyword_recall"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
},
"cases": {
"search": search_case_reports,
"writeback": writeback_reports,
"knowledge_fetcher": knowledge_reports,
"profile": profile_reports,
"tool_modes": tool_modes["reports"],
"episode_generation_auto": episode_generation_auto["reports"],
"episode_generation_after_rebuild": episode_generation_after_rebuild["reports"],
"episode_admin_query_auto": episode_admin_query_auto["reports"],
"episode_admin_query_after_rebuild": episode_admin_query_after_rebuild["reports"],
"episode_search_mode_auto": episode_search_mode_auto["reports"],
"episode_search_mode_after_rebuild": episode_search_mode_after_rebuild["reports"],
},
}
REPORT_FILE.parent.mkdir(parents=True, exist_ok=True)
REPORT_FILE.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(report["metrics"], ensure_ascii=False, indent=2))
assert report["import"]["status"] == "completed"
assert report["metrics"]["search"]["accuracy_at_1"] >= 0.35
assert report["metrics"]["search"]["recall_at_5"] >= 0.6
assert report["metrics"]["search"]["keyword_recall_at_5"] >= 0.8
assert report["metrics"]["writeback"]["success_rate"] >= 0.66
assert report["metrics"]["knowledge_fetcher"]["success_rate"] >= 0.66
assert report["metrics"]["knowledge_fetcher"]["keyword_recall"] >= 0.75
assert report["metrics"]["profile"]["success_rate"] >= 0.66
assert report["metrics"]["profile"]["evidence_rate"] >= 1.0
assert report["metrics"]["tool_modes"]["success_rate"] >= 0.75
assert report["metrics"]["episode_generation_after_rebuild"]["rebuild_success"] is True
assert report["metrics"]["episode_generation_after_rebuild"]["episode_count"] >= report["metrics"]["episode_generation_auto"]["episode_count"]

View File

@@ -0,0 +1,343 @@
from __future__ import annotations
import json
import os
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List
import pytest
import pytest_asyncio
from A_memorix.core.runtime.sdk_memory_kernel import SDKMemoryKernel
from pytests.test_long_novel_memory_benchmark import (
_evaluate_episode_admin_query,
_evaluate_episode_generation,
_evaluate_episode_search_mode,
_evaluate_tool_modes,
_KernelBackedRuntimeManager,
_KnownPerson,
_first_relevant_rank,
_hit_blob,
_join_hit_content,
_keyword_hits,
_keyword_recall,
_load_benchmark_fixture,
_wait_for_import_task,
)
from src.chat.brain_chat.PFC import pfc_KnowledgeFetcher as knowledge_module
from src.memory_system import chat_history_summarizer as summarizer_module
from src.person_info import person_info as person_info_module
from src.services import memory_service as memory_service_module
from src.services.memory_service import memory_service
pytestmark = pytest.mark.skipif(
os.getenv("MAIBOT_RUN_LIVE_MEMORY_TESTS") != "1",
reason="需要显式开启真实 external embedding benchmark",
)
REPORT_FILE = Path(__file__).parent / "data" / "benchmarks" / "results" / "long_novel_memory_benchmark_live_report.json"
@pytest_asyncio.fixture
async def benchmark_live_env(monkeypatch, tmp_path):
dataset = _load_benchmark_fixture()
session_cfg = dataset["session"]
session = SimpleNamespace(
session_id=session_cfg["session_id"],
platform=session_cfg["platform"],
user_id=session_cfg["user_id"],
group_id=session_cfg["group_id"],
)
fake_chat_manager = SimpleNamespace(
get_session_by_session_id=lambda session_id: session if session_id == session.session_id else None,
get_session_name=lambda session_id: session_cfg["display_name"] if session_id == session.session_id else session_id,
)
registry = {item["person_name"]: item["person_id"] for item in dataset["person_writebacks"]}
reverse_registry = {value: key for key, value in registry.items()}
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", None)
monkeypatch.setattr(summarizer_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(knowledge_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "get_person_id_by_person_name", lambda person_name: registry.get(str(person_name or "").strip(), ""))
monkeypatch.setattr(
person_info_module,
"Person",
lambda person_id: _KnownPerson(person_id=str(person_id or ""), registry=registry, reverse_registry=reverse_registry),
)
data_dir = (tmp_path / "a_memorix_live_benchmark_data").resolve()
kernel = SDKMemoryKernel(
plugin_root=tmp_path / "plugin_root",
config={
"storage": {"data_dir": str(data_dir)},
"advanced": {"enable_auto_save": False},
"memory": {"base_decay_interval_hours": 24},
"person_profile": {"refresh_interval_minutes": 5},
},
)
manager = _KernelBackedRuntimeManager(kernel)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", lambda: manager)
await kernel.initialize()
try:
yield {
"dataset": dataset,
"kernel": kernel,
"session": session,
}
finally:
await kernel.shutdown()
@pytest.mark.asyncio
async def test_long_novel_memory_benchmark_live(benchmark_live_env):
dataset = benchmark_live_env["dataset"]
session_id = benchmark_live_env["session"].session_id
self_check = await memory_service.runtime_admin(action="refresh_self_check")
assert self_check["success"] is True
assert self_check["report"]["ok"] is True
created = await memory_service.import_admin(
action="create_paste",
name="long_novel_memory_benchmark.live.json",
input_mode="json",
llm_enabled=False,
content=json.dumps(dataset["import_payload"], ensure_ascii=False),
)
assert created["success"] is True
import_detail = await _wait_for_import_task(
created["task"]["task_id"],
max_rounds=2400,
sleep_seconds=0.25,
)
assert import_detail["task"]["status"] == "completed"
for record in dataset["chat_history_records"]:
summarizer = summarizer_module.ChatHistorySummarizer(session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
for payload in dataset["person_writebacks"]:
await person_info_module.store_person_memory_from_answer(
payload["person_name"],
payload["memory_content"],
session_id,
)
await memory_service.episode_admin(action="process_pending", limit=100, max_retry=2)
search_case_reports: List[Dict[str, Any]] = []
search_accuracy_at_1 = 0.0
search_recall_at_5 = 0.0
search_precision_at_5 = 0.0
search_mrr = 0.0
search_keyword_recall = 0.0
for case in dataset["search_cases"]:
result = await memory_service.search(case["query"], mode="search", respect_filter=False, limit=5)
joined = _join_hit_content(result)
rank = _first_relevant_rank(result, case["expected_keywords"], case.get("minimum_keyword_hits", len(case["expected_keywords"])))
relevant_hits = sum(
1
for hit in result.hits[:5]
if _keyword_hits(_hit_blob(hit), case["expected_keywords"]) >= max(1, int(case.get("minimum_keyword_hits", len(case["expected_keywords"]))))
)
keyword_recall = _keyword_recall(joined, case["expected_keywords"])
search_accuracy_at_1 += 1.0 if rank == 1 else 0.0
search_recall_at_5 += 1.0 if rank > 0 else 0.0
search_precision_at_5 += relevant_hits / float(max(1, min(5, len(result.hits))))
search_mrr += 1.0 / float(rank) if rank > 0 else 0.0
search_keyword_recall += keyword_recall
search_case_reports.append(
{
"query": case["query"],
"rank_of_first_relevant": rank,
"relevant_hits_top5": relevant_hits,
"keyword_recall_top5": keyword_recall,
"top_hit": result.hits[0].to_dict() if result.hits else None,
}
)
search_total = max(1, len(dataset["search_cases"]))
writeback_reports: List[Dict[str, Any]] = []
writeback_success_rate = 0.0
writeback_keyword_recall = 0.0
for payload in dataset["person_writebacks"]:
query = " ".join(payload["expected_keywords"])
result = await memory_service.search(
query,
mode="search",
chat_id=session_id,
person_id=payload["person_id"],
respect_filter=False,
limit=5,
)
joined = _join_hit_content(result)
recall = _keyword_recall(joined, payload["expected_keywords"])
success = bool(result.hits) and recall >= 0.67
writeback_success_rate += 1.0 if success else 0.0
writeback_keyword_recall += recall
writeback_reports.append(
{
"person_id": payload["person_id"],
"success": success,
"keyword_recall": recall,
"hit_count": len(result.hits),
}
)
writeback_total = max(1, len(dataset["person_writebacks"]))
knowledge_reports: List[Dict[str, Any]] = []
knowledge_success_rate = 0.0
knowledge_keyword_recall = 0.0
fetcher = knowledge_module.KnowledgeFetcher(
private_name=dataset["session"]["display_name"],
stream_id=session_id,
)
for case in dataset["knowledge_fetcher_cases"]:
knowledge_text, _ = await fetcher.fetch(case["query"], [])
recall = _keyword_recall(knowledge_text, case["expected_keywords"])
success = recall >= float(case.get("minimum_keyword_recall", 1.0))
knowledge_success_rate += 1.0 if success else 0.0
knowledge_keyword_recall += recall
knowledge_reports.append(
{
"query": case["query"],
"success": success,
"keyword_recall": recall,
"preview": knowledge_text[:300],
}
)
knowledge_total = max(1, len(dataset["knowledge_fetcher_cases"]))
profile_reports: List[Dict[str, Any]] = []
profile_success_rate = 0.0
profile_keyword_recall = 0.0
profile_evidence_rate = 0.0
for case in dataset["profile_cases"]:
profile = await memory_service.get_person_profile(case["person_id"], chat_id=session_id)
recall = _keyword_recall(profile.summary, case["expected_keywords"])
has_evidence = bool(profile.evidence)
success = recall >= float(case.get("minimum_keyword_recall", 1.0)) and has_evidence
profile_success_rate += 1.0 if success else 0.0
profile_keyword_recall += recall
profile_evidence_rate += 1.0 if has_evidence else 0.0
profile_reports.append(
{
"person_id": case["person_id"],
"success": success,
"keyword_recall": recall,
"evidence_count": len(profile.evidence),
"summary_preview": profile.summary[:240],
}
)
profile_total = max(1, len(dataset["profile_cases"]))
episode_generation_auto = await _evaluate_episode_generation(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_admin_query_auto = await _evaluate_episode_admin_query(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_search_mode_auto = await _evaluate_episode_search_mode(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_rebuild = await memory_service.episode_admin(
action="rebuild",
source=f"chat_summary:{session_id}",
)
episode_generation_after_rebuild = await _evaluate_episode_generation(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_admin_query_after_rebuild = await _evaluate_episode_admin_query(session_id=session_id, episode_cases=dataset["episode_cases"])
episode_search_mode_after_rebuild = await _evaluate_episode_search_mode(session_id=session_id, episode_cases=dataset["episode_cases"])
tool_modes = await _evaluate_tool_modes(session_id=session_id, dataset=dataset)
report = {
"dataset": dataset["meta"],
"runtime_self_check": self_check["report"],
"import": {
"task_id": created["task"]["task_id"],
"status": import_detail["task"]["status"],
"paragraph_count": len(dataset["import_payload"]["paragraphs"]),
},
"metrics": {
"search": {
"accuracy_at_1": round(search_accuracy_at_1 / search_total, 4),
"recall_at_5": round(search_recall_at_5 / search_total, 4),
"precision_at_5": round(search_precision_at_5 / search_total, 4),
"mrr": round(search_mrr / search_total, 4),
"keyword_recall_at_5": round(search_keyword_recall / search_total, 4),
},
"writeback": {
"success_rate": round(writeback_success_rate / writeback_total, 4),
"keyword_recall": round(writeback_keyword_recall / writeback_total, 4),
},
"knowledge_fetcher": {
"success_rate": round(knowledge_success_rate / knowledge_total, 4),
"keyword_recall": round(knowledge_keyword_recall / knowledge_total, 4),
},
"profile": {
"success_rate": round(profile_success_rate / profile_total, 4),
"keyword_recall": round(profile_keyword_recall / profile_total, 4),
"evidence_rate": round(profile_evidence_rate / profile_total, 4),
},
"tool_modes": {
"success_rate": tool_modes["success_rate"],
"keyword_recall": tool_modes["keyword_recall"],
},
"episode_generation_auto": {
"success_rate": episode_generation_auto["success_rate"],
"keyword_recall": episode_generation_auto["keyword_recall"],
"episode_count": episode_generation_auto["episode_count"],
},
"episode_generation_after_rebuild": {
"success_rate": episode_generation_after_rebuild["success_rate"],
"keyword_recall": episode_generation_after_rebuild["keyword_recall"],
"episode_count": episode_generation_after_rebuild["episode_count"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
"episode_admin_query_auto": {
"success_rate": episode_admin_query_auto["success_rate"],
"keyword_recall": episode_admin_query_auto["keyword_recall"],
},
"episode_admin_query_after_rebuild": {
"success_rate": episode_admin_query_after_rebuild["success_rate"],
"keyword_recall": episode_admin_query_after_rebuild["keyword_recall"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
"episode_search_mode_auto": {
"success_rate": episode_search_mode_auto["success_rate"],
"keyword_recall": episode_search_mode_auto["keyword_recall"],
},
"episode_search_mode_after_rebuild": {
"success_rate": episode_search_mode_after_rebuild["success_rate"],
"keyword_recall": episode_search_mode_after_rebuild["keyword_recall"],
"rebuild_success": bool(episode_rebuild.get("success", False)),
},
},
"cases": {
"search": search_case_reports,
"writeback": writeback_reports,
"knowledge_fetcher": knowledge_reports,
"profile": profile_reports,
"tool_modes": tool_modes["reports"],
"episode_generation_auto": episode_generation_auto["reports"],
"episode_generation_after_rebuild": episode_generation_after_rebuild["reports"],
"episode_admin_query_auto": episode_admin_query_auto["reports"],
"episode_admin_query_after_rebuild": episode_admin_query_after_rebuild["reports"],
"episode_search_mode_auto": episode_search_mode_auto["reports"],
"episode_search_mode_after_rebuild": episode_search_mode_after_rebuild["reports"],
},
}
REPORT_FILE.parent.mkdir(parents=True, exist_ok=True)
REPORT_FILE.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(report["metrics"], ensure_ascii=False, indent=2))
assert report["import"]["status"] == "completed"
assert report["runtime_self_check"]["ok"] is True

View File

@@ -0,0 +1,138 @@
from types import SimpleNamespace

import pytest

from src.services import memory_flow_service as memory_flow_module


@pytest.mark.asyncio
async def test_long_term_memory_session_manager_reuses_single_summarizer(monkeypatch):
    starts: list[str] = []
    summarizers: list[object] = []

    class FakeSummarizer:
        def __init__(self, session_id: str):
            self.session_id = session_id
            summarizers.append(self)

        async def start(self):
            starts.append(self.session_id)

        async def stop(self):
            starts.append(f"stop:{self.session_id}")

    monkeypatch.setattr(
        memory_flow_module,
        "global_config",
        SimpleNamespace(memory=SimpleNamespace(long_term_auto_summary_enabled=True)),
    )
    monkeypatch.setattr(memory_flow_module, "ChatHistorySummarizer", FakeSummarizer)

    manager = memory_flow_module.LongTermMemorySessionManager()
    message = SimpleNamespace(session_id="session-1")

    await manager.on_message(message)
    await manager.on_message(message)

    assert len(summarizers) == 1
    assert starts == ["session-1"]


@pytest.mark.asyncio
async def test_long_term_memory_session_manager_shutdown_stops_all(monkeypatch):
    stopped: list[str] = []

    class FakeSummarizer:
        def __init__(self, session_id: str):
            self.session_id = session_id

        async def start(self):
            return None

        async def stop(self):
            stopped.append(self.session_id)

    monkeypatch.setattr(
        memory_flow_module,
        "global_config",
        SimpleNamespace(memory=SimpleNamespace(long_term_auto_summary_enabled=True)),
    )
    monkeypatch.setattr(memory_flow_module, "ChatHistorySummarizer", FakeSummarizer)

    manager = memory_flow_module.LongTermMemorySessionManager()
    await manager.on_message(SimpleNamespace(session_id="session-a"))
    await manager.on_message(SimpleNamespace(session_id="session-b"))
    await manager.shutdown()

    assert stopped == ["session-a", "session-b"]


def test_person_fact_parse_fact_list_deduplicates_and_filters_short_items():
    raw = '["他喜欢猫", "他喜欢猫", "", "", "他会弹吉他"]'
    result = memory_flow_module.PersonFactWritebackService._parse_fact_list(raw)
    assert result == ["他喜欢猫", "他会弹吉他"]


def test_person_fact_looks_ephemeral_detects_short_chitchat():
    assert memory_flow_module.PersonFactWritebackService._looks_ephemeral("哈哈")
    assert memory_flow_module.PersonFactWritebackService._looks_ephemeral("好的?")
    assert not memory_flow_module.PersonFactWritebackService._looks_ephemeral("她最近在学法语和钢琴")


def test_person_fact_resolve_target_person_for_private_chat(monkeypatch):
    class FakePerson:
        def __init__(self, person_id: str):
            self.person_id = person_id
            self.is_known = True

    service = memory_flow_module.PersonFactWritebackService.__new__(memory_flow_module.PersonFactWritebackService)
    monkeypatch.setattr(memory_flow_module, "is_bot_self", lambda platform, user_id: False)
    monkeypatch.setattr(memory_flow_module, "get_person_id", lambda platform, user_id: f"{platform}:{user_id}")
    monkeypatch.setattr(memory_flow_module, "Person", FakePerson)

    message = SimpleNamespace(session=SimpleNamespace(platform="qq", user_id="123", group_id=""))
    person = service._resolve_target_person(message)

    assert person is not None
    assert person.person_id == "qq:123"


@pytest.mark.asyncio
async def test_memory_automation_service_auto_starts_and_delegates(monkeypatch):
    events: list[tuple[str, str]] = []

    class FakeSessionManager:
        async def on_message(self, message):
            events.append(("incoming", message.session_id))

        async def shutdown(self):
            events.append(("shutdown", "session"))

    class FakeFactWriteback:
        async def start(self):
            events.append(("start", "fact"))

        async def enqueue(self, message):
            events.append(("sent", message.session_id))

        async def shutdown(self):
            events.append(("shutdown", "fact"))

    service = memory_flow_module.MemoryAutomationService()
    service.session_manager = FakeSessionManager()
    service.fact_writeback = FakeFactWriteback()

    await service.on_incoming_message(SimpleNamespace(session_id="session-1"))
    await service.on_message_sent(SimpleNamespace(session_id="session-1"))
    await service.shutdown()

    assert events == [
        ("start", "fact"),
        ("incoming", "session-1"),
        ("sent", "session-1"),
        ("shutdown", "session"),
        ("shutdown", "fact"),
    ]

View File

@@ -0,0 +1,281 @@
import pytest

from src.services.memory_service import MemorySearchResult, MemoryService


def test_coerce_write_result_treats_skipped_payload_as_success():
    result = MemoryService._coerce_write_result({"skipped_ids": ["p1"], "detail": "chat_filtered"})
    assert result.success is True
    assert result.stored_ids == []
    assert result.skipped_ids == ["p1"]
    assert result.detail == "chat_filtered"


@pytest.mark.asyncio
async def test_graph_admin_invokes_plugin(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args, kwargs))
        return {"success": True, "nodes": [], "edges": []}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.graph_admin(action="get_graph", limit=12)

    assert result["success"] is True
    assert calls == [("memory_graph_admin", {"action": "get_graph", "limit": 12}, {"timeout_ms": 30000})]


@pytest.mark.asyncio
async def test_get_recycle_bin_uses_maintain_memory_tool(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args))
        return {"success": True, "items": [{"hash": "abc"}], "count": 1}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.get_recycle_bin(limit=5)

    assert result == {"success": True, "items": [{"hash": "abc"}], "count": 1}
    assert calls == [("maintain_memory", {"action": "recycle_bin", "limit": 5})]


@pytest.mark.asyncio
async def test_search_respects_filter_by_default(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args))
        return {"summary": "ok", "hits": [], "filtered": True}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.search(
        "mai",
        chat_id="stream-1",
        person_id="person-1",
        user_id="user-1",
        group_id="",
    )

    assert isinstance(result, MemorySearchResult)
    assert result.filtered is True
    assert calls == [
        (
            "search_memory",
            {
                "query": "mai",
                "limit": 5,
                "mode": "hybrid",
                "chat_id": "stream-1",
                "person_id": "person-1",
                "time_start": None,
                "time_end": None,
                "respect_filter": True,
                "user_id": "user-1",
                "group_id": "",
            },
        )
    ]


@pytest.mark.asyncio
async def test_ingest_summary_can_bypass_filter(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args))
        return {"success": True, "stored_ids": ["p1"], "detail": ""}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.ingest_summary(
        external_id="chat_history:1",
        chat_id="stream-1",
        text="summary",
        respect_filter=False,
        user_id="user-1",
    )

    assert result.success is True
    assert calls == [
        (
            "ingest_summary",
            {
                "external_id": "chat_history:1",
                "chat_id": "stream-1",
                "text": "summary",
                "participants": [],
                "time_start": None,
                "time_end": None,
                "tags": [],
                "metadata": {},
                "respect_filter": False,
                "user_id": "user-1",
                "group_id": "",
            },
        )
    ]


@pytest.mark.asyncio
async def test_v5_admin_invokes_plugin(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args, kwargs))
        return {"success": True, "count": 1}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.v5_admin(action="status", target="mai", limit=5)

    assert result["success"] is True
    assert calls == [("memory_v5_admin", {"action": "status", "target": "mai", "limit": 5}, {"timeout_ms": 30000})]


@pytest.mark.asyncio
async def test_delete_admin_uses_long_timeout(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args, kwargs))
        return {"success": True, "operation_id": "del-1"}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.delete_admin(action="execute", mode="relation", selector={"query": "mai"})

    assert result["success"] is True
    assert calls == [
        (
            "memory_delete_admin",
            {"action": "execute", "mode": "relation", "selector": {"query": "mai"}},
            {"timeout_ms": 120000},
        )
    ]


@pytest.mark.asyncio
async def test_search_returns_empty_when_query_and_time_missing_async():
    service = MemoryService()

    result = await service.search("", time_start=None, time_end=None)

    assert isinstance(result, MemorySearchResult)
    assert result.summary == ""
    assert result.hits == []
    assert result.filtered is False


@pytest.mark.asyncio
async def test_search_accepts_string_time_bounds(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args))
        return {"summary": "ok", "hits": [], "filtered": False}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.search(
        "广播站",
        mode="time",
        time_start="2026/03/18",
        time_end="2026/03/18 09:30",
    )

    assert isinstance(result, MemorySearchResult)
    assert calls == [
        (
            "search_memory",
            {
                "query": "广播站",
                "limit": 5,
                "mode": "time",
                "chat_id": "",
                "person_id": "",
                "time_start": "2026/03/18",
                "time_end": "2026/03/18 09:30",
                "respect_filter": True,
                "user_id": "",
                "group_id": "",
            },
        )
    ]


def test_coerce_search_result_preserves_aggregate_source_branches():
    result = MemoryService._coerce_search_result(
        {
            "hits": [
                {
                    "content": "广播站值夜班",
                    "type": "paragraph",
                    "metadata": {"event_time_start": 1.0},
                    "source_branches": ["search", "time"],
                    "rank": 1,
                }
            ]
        }
    )

    assert result.hits[0].metadata["source_branches"] == ["search", "time"]
    assert result.hits[0].metadata["rank"] == 1


@pytest.mark.asyncio
async def test_import_admin_uses_long_timeout(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args, kwargs))
        return {"success": True, "task_id": "import-1"}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.import_admin(action="create_lpmm_openie", alias="lpmm")

    assert result["success"] is True
    assert calls == [
        (
            "memory_import_admin",
            {"action": "create_lpmm_openie", "alias": "lpmm"},
            {"timeout_ms": 120000},
        )
    ]


@pytest.mark.asyncio
async def test_tuning_admin_uses_long_timeout(monkeypatch):
    service = MemoryService()
    calls = []

    async def fake_invoke(component_name, args=None, **kwargs):
        calls.append((component_name, args, kwargs))
        return {"success": True, "task_id": "tuning-1"}

    monkeypatch.setattr(service, "_invoke", fake_invoke)

    result = await service.tuning_admin(action="create_task", payload={"query": "mai"})

    assert result["success"] is True
    assert calls == [
        (
            "memory_tuning_admin",
            {"action": "create_task", "payload": {"query": "mai"}},
            {"timeout_ms": 120000},
        )
    ]

View File

@@ -0,0 +1,81 @@
from types import SimpleNamespace

import pytest

from src.person_info import person_info as person_info_module


@pytest.mark.asyncio
async def test_store_person_memory_from_answer_writes_person_fact(monkeypatch):
    calls = []

    class FakePerson:
        def __init__(self, person_id: str):
            self.person_id = person_id
            self.person_name = "Alice"
            self.is_known = True

    async def fake_ingest_text(**kwargs):
        calls.append(kwargs)
        return SimpleNamespace(success=True, detail="", stored_ids=["p1"])

    session = SimpleNamespace(platform="qq", user_id="10001", group_id="", session_id="session-1")
    monkeypatch.setattr(person_info_module, "_chat_manager", SimpleNamespace(get_session_by_session_id=lambda chat_id: session))
    monkeypatch.setattr(person_info_module, "get_person_id_by_person_name", lambda person_name: "person-1")
    monkeypatch.setattr(person_info_module, "Person", FakePerson)
    monkeypatch.setattr(person_info_module.memory_service, "ingest_text", fake_ingest_text)

    await person_info_module.store_person_memory_from_answer("Alice", "她喜欢猫和爵士乐", "session-1")

    assert len(calls) == 1
    payload = calls[0]
    assert payload["external_id"].startswith("person_fact:person-1:")
    assert payload["source_type"] == "person_fact"
    assert payload["chat_id"] == "session-1"
    assert payload["person_ids"] == ["person-1"]
    assert payload["participants"] == ["Alice"]
    assert payload["respect_filter"] is True
    assert payload["user_id"] == "10001"
    assert payload["group_id"] == ""
    assert payload["metadata"]["person_id"] == "person-1"


@pytest.mark.asyncio
async def test_store_person_memory_from_answer_skips_unknown_person(monkeypatch):
    calls = []

    class FakePerson:
        def __init__(self, person_id: str):
            self.person_id = person_id
            self.person_name = "Unknown"
            self.is_known = False

    async def fake_ingest_text(**kwargs):
        calls.append(kwargs)
        return SimpleNamespace(success=True, detail="", stored_ids=["p1"])

    session = SimpleNamespace(platform="qq", user_id="10001", group_id="", session_id="session-1")
    monkeypatch.setattr(person_info_module, "_chat_manager", SimpleNamespace(get_session_by_session_id=lambda chat_id: session))
    monkeypatch.setattr(person_info_module, "get_person_id_by_person_name", lambda person_name: "person-1")
    monkeypatch.setattr(person_info_module, "Person", FakePerson)
    monkeypatch.setattr(person_info_module.memory_service, "ingest_text", fake_ingest_text)

    await person_info_module.store_person_memory_from_answer("Alice", "她喜欢猫和爵士乐", "session-1")

    assert calls == []


@pytest.mark.asyncio
async def test_store_person_memory_from_answer_skips_empty_content(monkeypatch):
    calls = []

    async def fake_ingest_text(**kwargs):
        calls.append(kwargs)
        return SimpleNamespace(success=True, detail="", stored_ids=["p1"])

    monkeypatch.setattr(person_info_module.memory_service, "ingest_text", fake_ingest_text)

    await person_info_module.store_person_memory_from_answer("Alice", " ", "session-1")

    assert calls == []

View File

@@ -0,0 +1,184 @@
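"""Unit tests for the query_long_term_memory retrieval tool.

Covers natural-language time-expression parsing, tool registration, and how the
search/time/episode/aggregate modes map onto memory_service.search calls.
"""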
from __future__ import annotations
from datetime import datetime
from types import SimpleNamespace
import pytest
from src.memory_system.retrieval_tools import query_long_term_memory as tool_module
from src.memory_system.retrieval_tools import init_all_tools
from src.memory_system.retrieval_tools.query_long_term_memory import (
_resolve_time_expression,
query_long_term_memory,
register_tool,
)
from src.memory_system.retrieval_tools.tool_registry import get_tool_registry
from src.services.memory_service import MemoryHit, MemorySearchResult
def test_resolve_time_expression_supports_relative_and_absolute_inputs():
now = datetime(2026, 3, 18, 15, 30)
start_ts, end_ts, start_text, end_text = _resolve_time_expression("今天", now=now)
assert datetime.fromtimestamp(start_ts) == datetime(2026, 3, 18, 0, 0)
assert datetime.fromtimestamp(end_ts) == datetime(2026, 3, 18, 23, 59)
assert start_text == "2026/03/18 00:00"
assert end_text == "2026/03/18 23:59"
start_ts, end_ts, start_text, end_text = _resolve_time_expression("最近7天", now=now)
assert datetime.fromtimestamp(start_ts) == datetime(2026, 3, 12, 0, 0)
assert datetime.fromtimestamp(end_ts) == datetime(2026, 3, 18, 23, 59)
assert start_text == "2026/03/12 00:00"
assert end_text == "2026/03/18 23:59"
start_ts, end_ts, start_text, end_text = _resolve_time_expression("2026/03/18", now=now)
assert datetime.fromtimestamp(start_ts) == datetime(2026, 3, 18, 0, 0)
assert datetime.fromtimestamp(end_ts) == datetime(2026, 3, 18, 23, 59)
assert start_text == "2026/03/18 00:00"
assert end_text == "2026/03/18 23:59"
start_ts, end_ts, start_text, end_text = _resolve_time_expression("2026/03/18 09:30", now=now)
assert datetime.fromtimestamp(start_ts) == datetime(2026, 3, 18, 9, 30)
assert datetime.fromtimestamp(end_ts) == datetime(2026, 3, 18, 9, 30)
assert start_text == "2026/03/18 09:30"
assert end_text == "2026/03/18 09:30"
def test_register_tool_exposes_mode_and_time_expression():
register_tool()
tool = get_tool_registry().get_tool("search_long_term_memory")
assert tool is not None
params = {item["name"]: item for item in tool.parameters}
assert "mode" in params
assert params["mode"]["enum"] == ["search", "time", "episode", "aggregate"]
assert "time_expression" in params
assert params["query"]["required"] is False
def test_init_all_tools_registers_long_term_memory_tool():
init_all_tools()
tool = get_tool_registry().get_tool("search_long_term_memory")
assert tool is not None
@pytest.mark.asyncio
async def test_query_long_term_memory_search_mode_maps_to_hybrid(monkeypatch):
captured = {}
async def fake_search(query, **kwargs):
captured["query"] = query
captured["kwargs"] = kwargs
return MemorySearchResult(
hits=[MemoryHit(content="Alice 喜欢猫", score=0.9, hit_type="paragraph")],
)
monkeypatch.setattr(tool_module, "memory_service", SimpleNamespace(search=fake_search))
text = await query_long_term_memory("Alice 喜欢什么", chat_id="stream-1", person_id="person-1")
assert "Alice 喜欢猫" in text
assert captured == {
"query": "Alice 喜欢什么",
"kwargs": {
"limit": 5,
"mode": "hybrid",
"chat_id": "stream-1",
"person_id": "person-1",
"time_start": None,
"time_end": None,
},
}
@pytest.mark.asyncio
async def test_query_long_term_memory_time_mode_parses_expression(monkeypatch):
captured = {}
async def fake_search(query, **kwargs):
captured["query"] = query
captured["kwargs"] = kwargs
return MemorySearchResult(
hits=[
MemoryHit(
content="昨天晚上广播站停播了十分钟。",
score=0.8,
hit_type="paragraph",
metadata={"event_time_start": 1773797400.0},
)
]
)
monkeypatch.setattr(tool_module, "memory_service", SimpleNamespace(search=fake_search))
monkeypatch.setattr(
tool_module,
"_resolve_time_expression",
lambda expression, now=None: (1773795600.0, 1773881940.0, "2026/03/17 00:00", "2026/03/17 23:59"),
)
text = await query_long_term_memory(
query="广播站",
mode="time",
time_expression="昨天",
chat_id="stream-1",
)
assert "指定时间范围" in text
assert "广播站停播" in text
assert captured == {
"query": "广播站",
"kwargs": {
"limit": 5,
"mode": "time",
"chat_id": "stream-1",
"person_id": "",
"time_start": 1773795600.0,
"time_end": 1773881940.0,
},
}
@pytest.mark.asyncio
async def test_query_long_term_memory_episode_and_aggregate_format_output(monkeypatch):
responses = {
"episode": MemorySearchResult(
hits=[
MemoryHit(
content="苏弦在灯塔拆开了那封冬信。",
title="冬信重见天日",
hit_type="episode",
metadata={"participants": ["苏弦"], "keywords": ["冬信", "灯塔"]},
)
]
),
"aggregate": MemorySearchResult(
hits=[
MemoryHit(
content="唐未在广播站值夜班时带着黑狗墨点。",
hit_type="paragraph",
metadata={"source_branches": ["search", "time"]},
)
]
),
}
async def fake_search(query, **kwargs):
return responses[kwargs["mode"]]
monkeypatch.setattr(tool_module, "memory_service", SimpleNamespace(search=fake_search))
episode_text = await query_long_term_memory("那封冬信后来怎么样了", mode="episode")
aggregate_text = await query_long_term_memory("唐未最近有什么线索", mode="aggregate")
assert "事件《冬信重见天日》" in episode_text
assert "参与者:苏弦" in episode_text
assert "[search,time][paragraph]" in aggregate_text
@pytest.mark.asyncio
async def test_query_long_term_memory_invalid_time_expression_returns_retryable_message():
text = await query_long_term_memory(query="广播站", mode="time", time_expression="明年春分后第三周")
assert "无法解析" in text
assert "最近7天" in text

View File

@@ -0,0 +1,335 @@
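"""End-to-end tests that drive the memory flows against an in-process SDKMemoryKernel.

The embedding adapter and runtime self-check are replaced with deterministic fakes,
so import, chat-summary write-back, person-fact storage, knowledge fetching and
episode generation can be exercised without external model services.
"""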
from __future__ import annotations
import asyncio
import inspect
import json
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict
import numpy as np
import pytest
import pytest_asyncio
from A_memorix.core.runtime import sdk_memory_kernel as kernel_module
from A_memorix.core.runtime.sdk_memory_kernel import KernelSearchRequest, SDKMemoryKernel
from src.chat.brain_chat.PFC import pfc_KnowledgeFetcher as knowledge_module
from src.memory_system import chat_history_summarizer as summarizer_module
from src.person_info import person_info as person_info_module
from src.services import memory_service as memory_service_module
from src.services.memory_service import memory_service
DATA_FILE = Path(__file__).parent / "data" / "real_dialogues" / "private_alice_weekend.json"
def _load_dialogue_fixture() -> Dict[str, Any]:
return json.loads(DATA_FILE.read_text(encoding="utf-8"))
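# Deterministic embedding stand-in: encodes each text as a normalized
# character-frequency vector so vector search works without a real embedding model.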
class _FakeEmbeddingAdapter:
def __init__(self, dimension: int = 16) -> None:
self.dimension = dimension
async def _detect_dimension(self) -> int:
return self.dimension
async def encode(self, texts, dimensions=None):
dim = int(dimensions or self.dimension)
if isinstance(texts, str):
sequence = [texts]
single = True
else:
sequence = list(texts)
single = False
rows = []
for text in sequence:
vec = np.zeros(dim, dtype=np.float32)
for ch in str(text or ""):
vec[ord(ch) % dim] += 1.0
if not vec.any():
vec[0] = 1.0
norm = np.linalg.norm(vec)
if norm > 0:
vec = vec / norm
rows.append(vec)
payload = np.vstack(rows)
return payload[0] if single else payload
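# Plugin-runtime stub: routes memory_service invocations straight to the
# in-process kernel instead of an external plugin runtime.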
class _KernelBackedRuntimeManager:
is_running = True
def __init__(self, kernel: SDKMemoryKernel) -> None:
self.kernel = kernel
async def invoke_plugin(
self,
*,
method: str,
plugin_id: str,
component_name: str,
args: Dict[str, Any] | None,
timeout_ms: int,
):
del method, plugin_id, timeout_ms
payload = args or {}
if component_name == "search_memory":
return await self.kernel.search_memory(
KernelSearchRequest(
query=str(payload.get("query", "") or ""),
limit=int(payload.get("limit", 5) or 5),
mode=str(payload.get("mode", "hybrid") or "hybrid"),
chat_id=str(payload.get("chat_id", "") or ""),
person_id=str(payload.get("person_id", "") or ""),
time_start=payload.get("time_start"),
time_end=payload.get("time_end"),
respect_filter=bool(payload.get("respect_filter", True)),
user_id=str(payload.get("user_id", "") or ""),
group_id=str(payload.get("group_id", "") or ""),
)
)
handler = getattr(self.kernel, component_name)
result = handler(**payload)
return await result if inspect.isawaitable(result) else result
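# Poll import_admin until the import task reaches a terminal status or the retry
# budget is exhausted.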
async def _wait_for_import_task(task_id: str, *, max_rounds: int = 100) -> Dict[str, Any]:
for _ in range(max_rounds):
detail = await memory_service.import_admin(action="get", task_id=task_id, include_chunks=True)
task = detail.get("task") or {}
status = str(task.get("status", "") or "")
if status in {"completed", "completed_with_errors", "failed", "cancelled"}:
return detail
await asyncio.sleep(0.05)
raise AssertionError(f"导入任务在等待窗口内未结束: {task_id}")
def _join_hit_content(search_result) -> str:
return "\n".join(hit.content for hit in search_result.hits)
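# Fixture: boots an isolated kernel under tmp_path, swaps in the fake embedding
# adapter and self-check, patches the chat manager modules to a fake session, and
# points memory_service at the kernel-backed runtime stub.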
@pytest_asyncio.fixture
async def real_dialogue_env(monkeypatch, tmp_path):
scenario = _load_dialogue_fixture()
session_cfg = scenario["session"]
session = SimpleNamespace(
session_id=session_cfg["session_id"],
platform=session_cfg["platform"],
user_id=session_cfg["user_id"],
group_id=session_cfg["group_id"],
)
fake_chat_manager = SimpleNamespace(
get_session_by_session_id=lambda session_id: session if session_id == session.session_id else None,
get_session_name=lambda session_id: session_cfg["display_name"] if session_id == session.session_id else session_id,
)
monkeypatch.setattr(kernel_module, "create_embedding_api_adapter", lambda **kwargs: _FakeEmbeddingAdapter())
async def fake_self_check(**kwargs):
return {"ok": True, "message": "ok"}
monkeypatch.setattr(kernel_module, "run_embedding_runtime_self_check", fake_self_check)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", None)
monkeypatch.setattr(summarizer_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(knowledge_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "_chat_manager", fake_chat_manager)
data_dir = (tmp_path / "a_memorix_data").resolve()
kernel = SDKMemoryKernel(
plugin_root=tmp_path / "plugin_root",
config={
"storage": {"data_dir": str(data_dir)},
"advanced": {"enable_auto_save": False},
"memory": {"base_decay_interval_hours": 24},
"person_profile": {"refresh_interval_minutes": 5},
},
)
manager = _KernelBackedRuntimeManager(kernel)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", lambda: manager)
await kernel.initialize()
try:
yield {
"scenario": scenario,
"kernel": kernel,
"session": session,
}
finally:
await kernel.shutdown()
@pytest.mark.asyncio
async def test_real_dialogue_import_flow_makes_fixture_searchable(real_dialogue_env):
scenario = real_dialogue_env["scenario"]
created = await memory_service.import_admin(
action="create_paste",
name="private_alice.json",
input_mode="json",
llm_enabled=False,
content=json.dumps(scenario["import_payload"], ensure_ascii=False),
)
assert created["success"] is True
detail = await _wait_for_import_task(created["task"]["task_id"])
assert detail["task"]["status"] == "completed"
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
respect_filter=False,
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_real_dialogue_summarizer_flow_persists_summary_to_long_term_memory(real_dialogue_env):
scenario = real_dialogue_env["scenario"]
record = scenario["chat_history_record"]
summarizer = summarizer_module.ChatHistorySummarizer(real_dialogue_env["session"].session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
chat_id=real_dialogue_env["session"].session_id,
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_real_dialogue_person_fact_writeback_is_searchable(real_dialogue_env, monkeypatch):
scenario = real_dialogue_env["scenario"]
class _KnownPerson:
def __init__(self, person_id: str) -> None:
self.person_id = person_id
self.is_known = True
self.person_name = scenario["person"]["person_name"]
monkeypatch.setattr(
person_info_module,
"get_person_id_by_person_name",
lambda person_name: scenario["person"]["person_id"],
)
monkeypatch.setattr(person_info_module, "Person", _KnownPerson)
await person_info_module.store_person_memory_from_answer(
scenario["person"]["person_name"],
scenario["person_fact"]["memory_content"],
real_dialogue_env["session"].session_id,
)
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
chat_id=real_dialogue_env["session"].session_id,
person_id=scenario["person"]["person_id"],
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_real_dialogue_private_knowledge_fetcher_reads_long_term_memory(real_dialogue_env):
scenario = real_dialogue_env["scenario"]
await memory_service.ingest_text(
external_id="fixture:knowledge_fetcher",
source_type="dialogue_note",
text=scenario["person_fact"]["memory_content"],
chat_id=real_dialogue_env["session"].session_id,
person_ids=[scenario["person"]["person_id"]],
participants=[scenario["person"]["person_name"]],
respect_filter=False,
)
fetcher = knowledge_module.KnowledgeFetcher(
private_name=scenario["session"]["display_name"],
stream_id=real_dialogue_env["session"].session_id,
)
knowledge_text, _ = await fetcher.fetch(scenario["search_queries"]["knowledge_fetcher"], [])
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in knowledge_text
@pytest.mark.asyncio
async def test_real_dialogue_person_profile_contains_stable_traits(real_dialogue_env, monkeypatch):
scenario = real_dialogue_env["scenario"]
class _KnownPerson:
def __init__(self, person_id: str) -> None:
self.person_id = person_id
self.is_known = True
self.person_name = scenario["person"]["person_name"]
monkeypatch.setattr(
person_info_module,
"get_person_id_by_person_name",
lambda person_name: scenario["person"]["person_id"],
)
monkeypatch.setattr(person_info_module, "Person", _KnownPerson)
await person_info_module.store_person_memory_from_answer(
scenario["person"]["person_name"],
scenario["person_fact"]["memory_content"],
real_dialogue_env["session"].session_id,
)
profile = await memory_service.get_person_profile(
scenario["person"]["person_id"],
chat_id=real_dialogue_env["session"].session_id,
)
assert profile.evidence
assert any(keyword in profile.summary for keyword in scenario["expectations"]["profile_keywords"])
@pytest.mark.asyncio
async def test_real_dialogue_summary_flow_generates_queryable_episode(real_dialogue_env):
scenario = real_dialogue_env["scenario"]
record = scenario["chat_history_record"]
summarizer = summarizer_module.ChatHistorySummarizer(real_dialogue_env["session"].session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
episodes = await memory_service.episode_admin(
action="query",
source=scenario["expectations"]["episode_source"],
limit=5,
)
assert episodes["success"] is True
assert int(episodes["count"]) >= 1

View File

@@ -0,0 +1,312 @@
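"""Live integration tests for the long-term memory flows.

These tests rely on the real embedding runtime and are skipped unless
MAIBOT_RUN_LIVE_MEMORY_TESTS=1 is set in the environment.
"""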
from __future__ import annotations
import asyncio
import inspect
import json
import os
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict
import pytest
import pytest_asyncio
from A_memorix.core.runtime.sdk_memory_kernel import KernelSearchRequest, SDKMemoryKernel
from src.chat.brain_chat.PFC import pfc_KnowledgeFetcher as knowledge_module
from src.memory_system import chat_history_summarizer as summarizer_module
from src.person_info import person_info as person_info_module
from src.services import memory_service as memory_service_module
from src.services.memory_service import memory_service
pytestmark = pytest.mark.skipif(
os.getenv("MAIBOT_RUN_LIVE_MEMORY_TESTS") != "1",
reason="需要显式开启真实 embedding / self-check 集成测试",
)
DATA_FILE = Path(__file__).parent / "data" / "real_dialogues" / "private_alice_weekend.json"
def _load_dialogue_fixture() -> Dict[str, Any]:
return json.loads(DATA_FILE.read_text(encoding="utf-8"))
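# Plugin-runtime stub: forwards memory_service invocations directly to the
# in-process kernel.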
class _KernelBackedRuntimeManager:
is_running = True
def __init__(self, kernel: SDKMemoryKernel) -> None:
self.kernel = kernel
async def invoke_plugin(
self,
*,
method: str,
plugin_id: str,
component_name: str,
args: Dict[str, Any] | None,
timeout_ms: int,
):
del method, plugin_id, timeout_ms
payload = args or {}
if component_name == "search_memory":
return await self.kernel.search_memory(
KernelSearchRequest(
query=str(payload.get("query", "") or ""),
limit=int(payload.get("limit", 5) or 5),
mode=str(payload.get("mode", "hybrid") or "hybrid"),
chat_id=str(payload.get("chat_id", "") or ""),
person_id=str(payload.get("person_id", "") or ""),
time_start=payload.get("time_start"),
time_end=payload.get("time_end"),
respect_filter=bool(payload.get("respect_filter", True)),
user_id=str(payload.get("user_id", "") or ""),
group_id=str(payload.get("group_id", "") or ""),
)
)
handler = getattr(self.kernel, component_name)
result = handler(**payload)
return await result if inspect.isawaitable(result) else result
async def _wait_for_import_task(task_id: str, *, timeout_seconds: float = 60.0) -> Dict[str, Any]:
deadline = asyncio.get_running_loop().time() + max(1.0, float(timeout_seconds))
while asyncio.get_running_loop().time() < deadline:
detail = await memory_service.import_admin(action="get", task_id=task_id, include_chunks=True)
task = detail.get("task") or {}
status = str(task.get("status", "") or "")
if status in {"completed", "completed_with_errors", "failed", "cancelled"}:
return detail
await asyncio.sleep(0.2)
raise AssertionError(f"导入任务在等待窗口内未结束: {task_id}")
def _join_hit_content(search_result) -> str:
return "\n".join(hit.content for hit in search_result.hits)
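# Fixture: boots an isolated kernel under tmp_path without replacing the embedding
# adapter, so the kernel uses its configured embedding backend; the chat manager
# modules are patched to a fake single-user session.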
@pytest_asyncio.fixture
async def live_dialogue_env(monkeypatch, tmp_path):
scenario = _load_dialogue_fixture()
session_cfg = scenario["session"]
session = SimpleNamespace(
session_id=session_cfg["session_id"],
platform=session_cfg["platform"],
user_id=session_cfg["user_id"],
group_id=session_cfg["group_id"],
)
fake_chat_manager = SimpleNamespace(
get_session_by_session_id=lambda session_id: session if session_id == session.session_id else None,
get_session_name=lambda session_id: session_cfg["display_name"] if session_id == session.session_id else session_id,
)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", None)
monkeypatch.setattr(summarizer_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(knowledge_module, "_chat_manager", fake_chat_manager)
monkeypatch.setattr(person_info_module, "_chat_manager", fake_chat_manager)
data_dir = (tmp_path / "a_memorix_data").resolve()
kernel = SDKMemoryKernel(
plugin_root=tmp_path / "plugin_root",
config={
"storage": {"data_dir": str(data_dir)},
"advanced": {"enable_auto_save": False},
"memory": {"base_decay_interval_hours": 24},
"person_profile": {"refresh_interval_minutes": 5},
},
)
manager = _KernelBackedRuntimeManager(kernel)
monkeypatch.setattr(memory_service_module, "get_plugin_runtime_manager", lambda: manager)
await kernel.initialize()
try:
yield {
"scenario": scenario,
"kernel": kernel,
"session": session,
}
finally:
await kernel.shutdown()
@pytest.mark.asyncio
async def test_live_runtime_self_check_passes(live_dialogue_env):
report = await memory_service.runtime_admin(action="refresh_self_check")
assert report["success"] is True
assert report["report"]["ok"] is True
assert report["report"]["encoded_dimension"] > 0
@pytest.mark.asyncio
async def test_live_import_flow_makes_fixture_searchable(live_dialogue_env):
scenario = live_dialogue_env["scenario"]
created = await memory_service.import_admin(
action="create_paste",
name="private_alice.json",
input_mode="json",
llm_enabled=False,
content=json.dumps(scenario["import_payload"], ensure_ascii=False),
)
assert created["success"] is True
detail = await _wait_for_import_task(created["task"]["task_id"])
assert detail["task"]["status"] == "completed"
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
respect_filter=False,
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_live_summarizer_flow_persists_summary_to_long_term_memory(live_dialogue_env):
scenario = live_dialogue_env["scenario"]
record = scenario["chat_history_record"]
summarizer = summarizer_module.ChatHistorySummarizer(live_dialogue_env["session"].session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
chat_id=live_dialogue_env["session"].session_id,
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_live_person_fact_writeback_is_searchable(live_dialogue_env, monkeypatch):
scenario = live_dialogue_env["scenario"]
class _KnownPerson:
def __init__(self, person_id: str) -> None:
self.person_id = person_id
self.is_known = True
self.person_name = scenario["person"]["person_name"]
monkeypatch.setattr(
person_info_module,
"get_person_id_by_person_name",
lambda person_name: scenario["person"]["person_id"],
)
monkeypatch.setattr(person_info_module, "Person", _KnownPerson)
await person_info_module.store_person_memory_from_answer(
scenario["person"]["person_name"],
scenario["person_fact"]["memory_content"],
live_dialogue_env["session"].session_id,
)
search = await memory_service.search(
scenario["search_queries"]["direct"],
mode="search",
chat_id=live_dialogue_env["session"].session_id,
person_id=scenario["person"]["person_id"],
)
assert search.hits
joined = _join_hit_content(search)
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in joined
@pytest.mark.asyncio
async def test_live_private_knowledge_fetcher_reads_long_term_memory(live_dialogue_env):
scenario = live_dialogue_env["scenario"]
await memory_service.ingest_text(
external_id="fixture:knowledge_fetcher",
source_type="dialogue_note",
text=scenario["person_fact"]["memory_content"],
chat_id=live_dialogue_env["session"].session_id,
person_ids=[scenario["person"]["person_id"]],
participants=[scenario["person"]["person_name"]],
respect_filter=False,
)
fetcher = knowledge_module.KnowledgeFetcher(
private_name=scenario["session"]["display_name"],
stream_id=live_dialogue_env["session"].session_id,
)
knowledge_text, _ = await fetcher.fetch(scenario["search_queries"]["knowledge_fetcher"], [])
for keyword in scenario["expectations"]["search_keywords"]:
assert keyword in knowledge_text
@pytest.mark.asyncio
async def test_live_person_profile_contains_stable_traits(live_dialogue_env, monkeypatch):
scenario = live_dialogue_env["scenario"]
class _KnownPerson:
def __init__(self, person_id: str) -> None:
self.person_id = person_id
self.is_known = True
self.person_name = scenario["person"]["person_name"]
monkeypatch.setattr(
person_info_module,
"get_person_id_by_person_name",
lambda person_name: scenario["person"]["person_id"],
)
monkeypatch.setattr(person_info_module, "Person", _KnownPerson)
await person_info_module.store_person_memory_from_answer(
scenario["person"]["person_name"],
scenario["person_fact"]["memory_content"],
live_dialogue_env["session"].session_id,
)
profile = await memory_service.get_person_profile(
scenario["person"]["person_id"],
chat_id=live_dialogue_env["session"].session_id,
)
assert profile.evidence
assert any(keyword in profile.summary for keyword in scenario["expectations"]["profile_keywords"])
@pytest.mark.asyncio
async def test_live_summary_flow_generates_queryable_episode(live_dialogue_env):
scenario = live_dialogue_env["scenario"]
record = scenario["chat_history_record"]
summarizer = summarizer_module.ChatHistorySummarizer(live_dialogue_env["session"].session_id)
await summarizer._import_to_long_term_memory(
record_id=record["record_id"],
theme=record["theme"],
summary=record["summary"],
participants=record["participants"],
start_time=record["start_time"],
end_time=record["end_time"],
original_text=record["original_text"],
)
episodes = await memory_service.episode_admin(
action="query",
source=scenario["expectations"]["episode_source"],
limit=5,
)
assert episodes["success"] is True
assert int(episodes["count"]) >= 1

View File

@@ -0,0 +1,279 @@
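"""Route-level tests for the WebUI memory router.

Each test bypasses authentication, stubs the relevant memory_service admin method,
and checks the HTTP contract of the /api/webui/memory and compat endpoints.
"""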
from fastapi import FastAPI
from fastapi.testclient import TestClient
import pytest
from src.services.memory_service import MemorySearchResult
from src.webui.dependencies import require_auth
from src.webui.routers import memory as memory_router_module
from src.webui.routers.memory import compat_router, router
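# Build a FastAPI test app with authentication overridden and both memory routers mounted.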
@pytest.fixture
def client() -> TestClient:
app = FastAPI()
app.dependency_overrides[require_auth] = lambda: "ok"
app.include_router(router)
app.include_router(compat_router)
return TestClient(app)
def test_webui_memory_graph_route(client: TestClient, monkeypatch):
async def fake_graph_admin(*, action: str, **kwargs):
assert action == "get_graph"
return {"success": True, "nodes": [], "edges": [], "total_nodes": 0, "limit": kwargs.get("limit")}
monkeypatch.setattr(memory_router_module.memory_service, "graph_admin", fake_graph_admin)
response = client.get("/api/webui/memory/graph", params={"limit": 77})
assert response.status_code == 200
assert response.json()["success"] is True
assert response.json()["limit"] == 77
def test_compat_aggregate_route(client: TestClient, monkeypatch):
async def fake_search(query: str, **kwargs):
assert kwargs["mode"] == "aggregate"
assert kwargs["respect_filter"] is False
return MemorySearchResult(summary=f"summary:{query}", hits=[])
monkeypatch.setattr(memory_router_module.memory_service, "search", fake_search)
response = client.get("/api/query/aggregate", params={"query": "mai"})
assert response.status_code == 200
assert response.json() == {"success": True, "summary": "summary:mai", "hits": [], "filtered": False}
def test_auto_save_routes(client: TestClient, monkeypatch):
async def fake_runtime_admin(*, action: str, **kwargs):
if action == "get_config":
return {"success": True, "auto_save": True}
if action == "set_auto_save":
return {"success": True, "auto_save": kwargs["enabled"]}
raise AssertionError(action)
monkeypatch.setattr(memory_router_module.memory_service, "runtime_admin", fake_runtime_admin)
get_response = client.get("/api/config/auto_save")
post_response = client.post("/api/config/auto_save", json={"enabled": False})
assert get_response.status_code == 200
assert get_response.json() == {"success": True, "auto_save": True}
assert post_response.status_code == 200
assert post_response.json() == {"success": True, "auto_save": False}
def test_recycle_bin_route(client: TestClient, monkeypatch):
async def fake_get_recycle_bin(*, limit: int):
return {"success": True, "items": [{"hash": "deadbeef"}], "count": 1, "limit": limit}
monkeypatch.setattr(memory_router_module.memory_service, "get_recycle_bin", fake_get_recycle_bin)
response = client.get("/api/memory/recycle_bin", params={"limit": 10})
assert response.status_code == 200
assert response.json()["success"] is True
assert response.json()["count"] == 1
assert response.json()["limit"] == 10
def test_import_guide_route(client: TestClient, monkeypatch):
async def fake_import_admin(*, action: str, **kwargs):
assert kwargs == {}
if action == "get_guide":
return {"success": True}
if action == "get_settings":
return {"success": True, "settings": {"path_aliases": {"raw": "/tmp/raw"}}}
raise AssertionError(action)
monkeypatch.setattr(memory_router_module.memory_service, "import_admin", fake_import_admin)
response = client.get("/api/webui/memory/import/guide")
assert response.status_code == 200
assert response.json()["success"] is True
assert response.json()["source"] == "local"
assert "长期记忆导入说明" in response.json()["content"]
def test_import_upload_route(client: TestClient, monkeypatch, tmp_path):
monkeypatch.setattr(memory_router_module, "STAGING_ROOT", tmp_path)
async def fake_import_admin(*, action: str, **kwargs):
assert action == "create_upload"
staged_files = kwargs["staged_files"]
assert len(staged_files) == 1
assert staged_files[0]["filename"] == "demo.txt"
assert memory_router_module.Path(staged_files[0]["staged_path"]).exists()
return {"success": True, "task_id": "task-1"}
monkeypatch.setattr(memory_router_module.memory_service, "import_admin", fake_import_admin)
response = client.post(
"/api/import/upload",
data={"payload_json": "{\"source\": \"upload\"}"},
files=[("files", ("demo.txt", b"hello world", "text/plain"))],
)
assert response.status_code == 200
assert response.json() == {"success": True, "task_id": "task-1"}
assert list(tmp_path.iterdir()) == []
def test_v5_status_route(client: TestClient, monkeypatch):
async def fake_v5_admin(*, action: str, **kwargs):
assert action == "status"
assert kwargs["target"] == "mai"
return {"success": True, "active_count": 1, "inactive_count": 2, "deleted_count": 3}
monkeypatch.setattr(memory_router_module.memory_service, "v5_admin", fake_v5_admin)
response = client.get("/api/webui/memory/v5/status", params={"target": "mai"})
assert response.status_code == 200
assert response.json()["success"] is True
assert response.json()["deleted_count"] == 3
def test_delete_preview_route(client: TestClient, monkeypatch):
async def fake_delete_admin(*, action: str, **kwargs):
assert action == "preview"
assert kwargs["mode"] == "paragraph"
assert kwargs["selector"] == {"query": "demo"}
return {"success": True, "counts": {"paragraphs": 1}, "dry_run": True}
monkeypatch.setattr(memory_router_module.memory_service, "delete_admin", fake_delete_admin)
response = client.post(
"/api/webui/memory/delete/preview",
json={"mode": "paragraph", "selector": {"query": "demo"}},
)
assert response.status_code == 200
assert response.json() == {"success": True, "counts": {"paragraphs": 1}, "dry_run": True}
def test_episode_process_pending_route(client: TestClient, monkeypatch):
async def fake_episode_admin(*, action: str, **kwargs):
assert action == "process_pending"
assert kwargs == {"limit": 7, "max_retry": 4}
return {"success": True, "processed": 3}
monkeypatch.setattr(memory_router_module.memory_service, "episode_admin", fake_episode_admin)
response = client.post("/api/webui/memory/episodes/process-pending", json={"limit": 7, "max_retry": 4})
assert response.status_code == 200
assert response.json() == {"success": True, "processed": 3}
def test_import_list_route_includes_settings(client: TestClient, monkeypatch):
calls = []
async def fake_import_admin(*, action: str, **kwargs):
calls.append((action, kwargs))
if action == "list":
return {"success": True, "items": [{"task_id": "task-1"}]}
if action == "get_settings":
return {"success": True, "settings": {"path_aliases": {"lpmm": "/tmp/lpmm"}}}
raise AssertionError(action)
monkeypatch.setattr(memory_router_module.memory_service, "import_admin", fake_import_admin)
response = client.get("/api/webui/memory/import/tasks", params={"limit": 9})
assert response.status_code == 200
assert response.json()["items"] == [{"task_id": "task-1"}]
assert response.json()["settings"] == {"path_aliases": {"lpmm": "/tmp/lpmm"}}
assert calls == [("list", {"limit": 9}), ("get_settings", {})]
def test_tuning_profile_route_backfills_settings(client: TestClient, monkeypatch):
calls = []
async def fake_tuning_admin(*, action: str, **kwargs):
calls.append((action, kwargs))
if action == "get_profile":
return {"success": True, "profile": {"retrieval": {"top_k": 8}}}
if action == "get_settings":
return {"success": True, "settings": {"profiles": ["default"]}}
raise AssertionError(action)
monkeypatch.setattr(memory_router_module.memory_service, "tuning_admin", fake_tuning_admin)
response = client.get("/api/webui/memory/retrieval_tuning/profile")
assert response.status_code == 200
assert response.json()["profile"] == {"retrieval": {"top_k": 8}}
assert response.json()["settings"] == {"profiles": ["default"]}
assert calls == [("get_profile", {}), ("get_settings", {})]
def test_tuning_report_route_flattens_report_payload(client: TestClient, monkeypatch):
async def fake_tuning_admin(*, action: str, **kwargs):
assert action == "get_report"
assert kwargs == {"task_id": "task-1", "format": "json"}
return {
"success": True,
"report": {"format": "json", "content": "{\"ok\": true}", "path": "/tmp/report.json"},
}
monkeypatch.setattr(memory_router_module.memory_service, "tuning_admin", fake_tuning_admin)
response = client.get("/api/webui/memory/retrieval_tuning/tasks/task-1/report", params={"format": "json"})
assert response.status_code == 200
assert response.json() == {
"success": True,
"format": "json",
"content": "{\"ok\": true}",
"path": "/tmp/report.json",
"error": "",
}
def test_delete_execute_route(client: TestClient, monkeypatch):
async def fake_delete_admin(*, action: str, **kwargs):
assert action == "execute"
assert kwargs["mode"] == "source"
assert kwargs["selector"] == {"source": "chat_summary:stream-1"}
assert kwargs["reason"] == "cleanup"
assert kwargs["requested_by"] == "tester"
return {"success": True, "operation_id": "del-1"}
monkeypatch.setattr(memory_router_module.memory_service, "delete_admin", fake_delete_admin)
response = client.post(
"/api/webui/memory/delete/execute",
json={
"mode": "source",
"selector": {"source": "chat_summary:stream-1"},
"reason": "cleanup",
"requested_by": "tester",
},
)
assert response.status_code == 200
assert response.json() == {"success": True, "operation_id": "del-1"}
def test_delete_operation_routes(client: TestClient, monkeypatch):
async def fake_delete_admin(*, action: str, **kwargs):
if action == "list_operations":
assert kwargs == {"limit": 5, "mode": "paragraph"}
return {"success": True, "items": [{"operation_id": "del-1"}], "count": 1}
if action == "get_operation":
assert kwargs == {"operation_id": "del-1"}
return {"success": True, "operation": {"operation_id": "del-1", "mode": "paragraph"}}
raise AssertionError(action)
monkeypatch.setattr(memory_router_module.memory_service, "delete_admin", fake_delete_admin)
list_response = client.get("/api/webui/memory/delete/operations", params={"limit": 5, "mode": "paragraph"})
get_response = client.get("/api/webui/memory/delete/operations/del-1")
assert list_response.status_code == 200
assert list_response.json()["count"] == 1
assert get_response.status_code == 200
assert get_response.json()["operation"]["operation_id"] == "del-1"