From c6e9959474a7c26cf8f4ac634e42628c7ed22572 Mon Sep 17 00:00:00 2001 From: DawnARC Date: Thu, 7 May 2026 14:41:57 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E4=BA=BA=E7=89=A9?= =?UTF-8?q?=E7=94=BB=E5=83=8F=E6=B7=B7=E5=85=A5=E8=81=8A=E5=A4=A9=E6=91=98?= =?UTF-8?q?=E8=A6=81=E4=B8=8E=E6=9C=BA=E5=99=A8=E4=BA=BA=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E4=BA=8B=E5=AE=9E=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...test_chat_summary_writeback_integration.py | 29 +---- .../test_memory_flow_service.py | 85 +++++++++---- .../test_person_profile_service.py | 115 +++++++++++++++++ .../core/utils/person_profile_service.py | 70 +++++++++- src/A_memorix/core/utils/summary_importer.py | 2 + src/person_info/person_info.py | 13 +- src/services/memory_flow_service.py | 120 ++++++++++++++++-- 7 files changed, 369 insertions(+), 65 deletions(-) create mode 100644 pytests/A_memorix_test/test_person_profile_service.py diff --git a/pytests/A_memorix_test/test_chat_summary_writeback_integration.py b/pytests/A_memorix_test/test_chat_summary_writeback_integration.py index 24c7ff4f..7618bca7 100644 --- a/pytests/A_memorix_test/test_chat_summary_writeback_integration.py +++ b/pytests/A_memorix_test/test_chat_summary_writeback_integration.py @@ -337,30 +337,11 @@ async def test_text_to_stream_triggers_real_chat_summary_writeback( else None ), ) - monkeypatch.setattr( - memory_flow_service_module.global_config.memory, - "chat_summary_writeback_enabled", - True, - raising=False, - ) - monkeypatch.setattr( - memory_flow_service_module.global_config.memory, - "chat_summary_writeback_message_threshold", - 2, - raising=False, - ) - monkeypatch.setattr( - memory_flow_service_module.global_config.memory, - "chat_summary_writeback_context_length", - 10, - raising=False, - ) - monkeypatch.setattr( - memory_flow_service_module.global_config.memory, - "person_fact_writeback_enabled", - False, - raising=False, - ) + integration_config = memory_flow_service_module.global_config.a_memorix.integration + monkeypatch.setattr(integration_config, "chat_summary_writeback_enabled", True, raising=False) + monkeypatch.setattr(integration_config, "chat_summary_writeback_message_threshold", 2, raising=False) + monkeypatch.setattr(integration_config, "chat_summary_writeback_context_length", 10, raising=False) + monkeypatch.setattr(integration_config, "person_fact_writeback_enabled", False, raising=False) await kernel.initialize() diff --git a/pytests/A_memorix_test/test_memory_flow_service.py b/pytests/A_memorix_test/test_memory_flow_service.py index 4699d0e5..98c9639d 100644 --- a/pytests/A_memorix_test/test_memory_flow_service.py +++ b/pytests/A_memorix_test/test_memory_flow_service.py @@ -5,6 +5,14 @@ import pytest from src.services import memory_flow_service as memory_flow_module +def _fake_global_config(**integration_values): + return SimpleNamespace( + a_memorix=SimpleNamespace( + integration=SimpleNamespace(**integration_values), + ) + ) + + def test_person_fact_parse_fact_list_deduplicates_and_filters_short_items(): raw = '["他喜欢猫", "他喜欢猫", "好", "", "他会弹吉他"]' @@ -38,6 +46,43 @@ def test_person_fact_resolve_target_person_for_private_chat(monkeypatch): assert person.person_id == "qq:123" +@pytest.mark.asyncio +async def test_person_fact_writeback_skips_bot_only_fact_without_user_evidence(monkeypatch): + stored_facts: list[tuple[str, str, str]] = [] + + class FakePerson: + person_id = "person-1" + person_name = "测试用户" + nickname = "测试用户" + is_known = True + + service = memory_flow_module.PersonFactWritebackService.__new__(memory_flow_module.PersonFactWritebackService) + service._resolve_target_person = lambda message: FakePerson() + + async def fake_extract_facts(person, reply_text, user_evidence_text): + del person, reply_text, user_evidence_text + return ["测试用户喜欢辣椒"] + + async def fake_store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str, **kwargs): + del kwargs + stored_facts.append((person_name, memory_content, chat_id)) + + service._extract_facts = fake_extract_facts + monkeypatch.setattr(memory_flow_module, "store_person_memory_from_answer", fake_store_person_memory_from_answer) + monkeypatch.setattr(memory_flow_module, "find_messages", lambda **kwargs: []) + + message = SimpleNamespace( + processed_plain_text="我记得你喜欢辣椒。", + session_id="session-1", + reply_to="", + session=SimpleNamespace(platform="qq", user_id="bot-1", group_id=""), + ) + + await service._handle_message(message) + + assert stored_facts == [] + + @pytest.mark.asyncio async def test_chat_summary_writeback_service_triggers_when_threshold_reached(monkeypatch): events: list[tuple[str, object]] = [] @@ -45,12 +90,10 @@ async def test_chat_summary_writeback_service_triggers_when_threshold_reached(mo monkeypatch.setattr( memory_flow_module, "global_config", - SimpleNamespace( - memory=SimpleNamespace( - chat_summary_writeback_enabled=True, - chat_summary_writeback_message_threshold=3, - chat_summary_writeback_context_length=7, - ) + _fake_global_config( + chat_summary_writeback_enabled=True, + chat_summary_writeback_message_threshold=3, + chat_summary_writeback_context_length=7, ), ) monkeypatch.setattr(memory_flow_module, "count_messages", lambda **kwargs: 5) @@ -94,12 +137,10 @@ async def test_chat_summary_writeback_service_skips_when_threshold_not_reached(m monkeypatch.setattr( memory_flow_module, "global_config", - SimpleNamespace( - memory=SimpleNamespace( - chat_summary_writeback_enabled=True, - chat_summary_writeback_message_threshold=6, - chat_summary_writeback_context_length=9, - ) + _fake_global_config( + chat_summary_writeback_enabled=True, + chat_summary_writeback_message_threshold=6, + chat_summary_writeback_context_length=9, ), ) monkeypatch.setattr(memory_flow_module, "count_messages", lambda **kwargs: 5) @@ -135,12 +176,10 @@ async def test_chat_summary_writeback_service_restores_previous_trigger_count(mo monkeypatch.setattr( memory_flow_module, "global_config", - SimpleNamespace( - memory=SimpleNamespace( - chat_summary_writeback_enabled=True, - chat_summary_writeback_message_threshold=3, - chat_summary_writeback_context_length=7, - ) + _fake_global_config( + chat_summary_writeback_enabled=True, + chat_summary_writeback_message_threshold=3, + chat_summary_writeback_context_length=7, ), ) monkeypatch.setattr(memory_flow_module, "count_messages", lambda **kwargs: 8) @@ -178,12 +217,10 @@ async def test_chat_summary_writeback_service_falls_back_to_current_count_for_le monkeypatch.setattr( memory_flow_module, "global_config", - SimpleNamespace( - memory=SimpleNamespace( - chat_summary_writeback_enabled=True, - chat_summary_writeback_message_threshold=3, - chat_summary_writeback_context_length=7, - ) + _fake_global_config( + chat_summary_writeback_enabled=True, + chat_summary_writeback_message_threshold=3, + chat_summary_writeback_context_length=7, ), ) monkeypatch.setattr(memory_flow_module, "count_messages", lambda **kwargs: 5) diff --git a/pytests/A_memorix_test/test_person_profile_service.py b/pytests/A_memorix_test/test_person_profile_service.py new file mode 100644 index 00000000..b75beb16 --- /dev/null +++ b/pytests/A_memorix_test/test_person_profile_service.py @@ -0,0 +1,115 @@ +from types import SimpleNamespace + +import pytest + +from src.A_memorix.core.utils.person_profile_service import PersonProfileService + + +class FakeMetadataStore: + def __init__(self) -> None: + self.snapshots: list[dict] = [] + + @staticmethod + def get_latest_person_profile_snapshot(person_id: str): + del person_id + return None + + @staticmethod + def get_relations(**kwargs): + del kwargs + return [] + + @staticmethod + def get_paragraphs_by_source(source: str): + if source == "person_fact:person-1": + return [ + { + "hash": "person-fact-1", + "content": "测试用户喜欢猫。", + "source": source, + "metadata": {"source_type": "person_fact"}, + "created_at": 2.0, + "updated_at": 2.0, + } + ] + return [] + + @staticmethod + def get_paragraph(hash_value: str): + if hash_value == "chat-summary-1": + return { + "hash": hash_value, + "content": "机器人建议测试用户以后叫星灯。", + "source": "chat_summary:session-1", + "metadata": {"source_type": "chat_summary"}, + "word_count": 1, + } + if hash_value == "person-fact-1": + return { + "hash": hash_value, + "content": "测试用户喜欢猫。", + "source": "person_fact:person-1", + "metadata": {"source_type": "person_fact"}, + "word_count": 1, + } + return None + + @staticmethod + def get_paragraph_stale_relation_marks_batch(paragraph_hashes): + del paragraph_hashes + return {} + + @staticmethod + def get_relation_status_batch(relation_hashes): + del relation_hashes + return {} + + @staticmethod + def get_person_profile_override(person_id: str): + del person_id + return None + + def upsert_person_profile_snapshot(self, **kwargs): + self.snapshots.append(kwargs) + return { + "person_id": kwargs["person_id"], + "profile_text": kwargs["profile_text"], + "aliases": kwargs["aliases"], + "relation_edges": kwargs["relation_edges"], + "vector_evidence": kwargs["vector_evidence"], + "evidence_ids": kwargs["evidence_ids"], + "updated_at": 1.0, + "expires_at": kwargs["expires_at"], + "source_note": kwargs["source_note"], + } + + +class FakeRetriever: + async def retrieve(self, query: str, top_k: int): + del query, top_k + return [ + SimpleNamespace( + hash_value="chat-summary-1", + result_type="paragraph", + score=0.95, + content="机器人建议测试用户以后叫星灯。", + metadata={"source_type": "chat_summary"}, + ) + ] + + +@pytest.mark.asyncio +async def test_person_profile_keeps_chat_summary_as_recent_interaction_not_stable_profile(): + metadata_store = FakeMetadataStore() + service = PersonProfileService(metadata_store=metadata_store, retriever=FakeRetriever()) + service.get_person_aliases = lambda person_id: (["测试用户"], "测试用户", []) + + payload = await service.query_person_profile(person_id="person-1", top_k=6, force_refresh=True) + + assert payload["success"] is True + profile_text = payload["profile_text"] + stable_section = profile_text.split("近期相关互动:", 1)[0] + assert "测试用户喜欢猫" in stable_section + assert "星灯" not in stable_section + assert "近期相关互动:" in profile_text + assert "星灯" in profile_text diff --git a/src/A_memorix/core/utils/person_profile_service.py b/src/A_memorix/core/utils/person_profile_service.py index 6215778b..14f3a943 100644 --- a/src/A_memorix/core/utils/person_profile_service.py +++ b/src/A_memorix/core/utils/person_profile_service.py @@ -340,11 +340,51 @@ class PersonProfileService: "type": "paragraph", "score": 1.1, "content": content[:220], - "metadata": {}, + "source": str(row.get("source", "") or source), + "metadata": dict(row.get("metadata", {}) or {}), } ) return self._filter_stale_paragraph_evidence(evidence) + @staticmethod + def _source_type_from_source(source: str) -> str: + token = str(source or "").strip() + if token.startswith("chat_summary:"): + return "chat_summary" + if token.startswith("person_fact:"): + return "person_fact" + return "" + + def _enrich_paragraph_evidence_metadata( + self, + paragraph_hash: str, + metadata: Dict[str, Any], + ) -> Tuple[Dict[str, Any], str]: + merged = dict(metadata or {}) + source = str(merged.get("source", "") or "").strip() + try: + paragraph = self.metadata_store.get_paragraph(paragraph_hash) + except Exception: + paragraph = None + if isinstance(paragraph, dict): + paragraph_metadata = paragraph.get("metadata", {}) or {} + if isinstance(paragraph_metadata, dict): + merged = {**paragraph_metadata, **merged} + source = source or str(paragraph.get("source", "") or "").strip() + source_type = str(merged.get("source_type", "") or "").strip() or self._source_type_from_source(source) + if source_type: + merged["source_type"] = source_type + if source: + merged["source"] = source + return merged, source + + @staticmethod + def _is_chat_summary_evidence(item: Dict[str, Any]) -> bool: + metadata = item.get("metadata", {}) if isinstance(item.get("metadata"), dict) else {} + source_type = str(metadata.get("source_type", "") or "").strip() + source = str(item.get("source", "") or metadata.get("source", "") or "").strip() + return source_type == "chat_summary" or source.startswith("chat_summary:") + def _filter_stale_paragraph_evidence( self, evidence: List[Dict[str, Any]], @@ -417,7 +457,8 @@ class PersonProfileService: "type": "paragraph", "score": 0.0, "content": str(para.get("content", ""))[:180], - "metadata": {}, + "source": str(para.get("source", "") or ""), + "metadata": dict(para.get("metadata", {}) or {}), } ) return self._filter_stale_paragraph_evidence(fallback[:top_k]) @@ -443,13 +484,18 @@ class PersonProfileService: if not h or h in seen_hash: continue seen_hash.add(h) + metadata, source = self._enrich_paragraph_evidence_metadata( + h, + dict(getattr(item, "metadata", {}) or {}), + ) evidence.append( { "hash": h, "type": str(getattr(item, "result_type", "")), "score": float(getattr(item, "score", 0.0) or 0.0), "content": str(getattr(item, "content", "") or "")[:220], - "metadata": dict(getattr(item, "metadata", {}) or {}), + "source": source, + "metadata": metadata, } ) evidence.sort(key=lambda x: x.get("score", 0.0), reverse=True) @@ -475,7 +521,7 @@ class PersonProfileService: lines.append(f"记忆特征: {'; '.join(memory_traits[:6])}") if relation_edges: - lines.append("关系证据:") + lines.append("稳定关系证据:") for rel in relation_edges[:6]: s = rel.get("subject", "") p = rel.get("predicate", "") @@ -483,9 +529,19 @@ class PersonProfileService: conf = float(rel.get("confidence", 0.0)) lines.append(f"- {s} {p} {o} (conf={conf:.2f})") - if vector_evidence: - lines.append("向量证据摘要:") - for item in vector_evidence[:4]: + stable_evidence = [item for item in vector_evidence if not self._is_chat_summary_evidence(item)] + recent_interactions = [item for item in vector_evidence if self._is_chat_summary_evidence(item)] + + if stable_evidence: + lines.append("稳定人物事实:") + for item in stable_evidence[:4]: + content = str(item.get("content", "")).strip() + if content: + lines.append(f"- {content}") + + if recent_interactions: + lines.append("近期相关互动:") + for item in recent_interactions[:2]: content = str(item.get("content", "")).strip() if content: lines.append(f"- {content}") diff --git a/src/A_memorix/core/utils/summary_importer.py b/src/A_memorix/core/utils/summary_importer.py index d2c18ed5..1c30b8df 100644 --- a/src/A_memorix/core/utils/summary_importer.py +++ b/src/A_memorix/core/utils/summary_importer.py @@ -43,6 +43,7 @@ SUMMARY_PROMPT_TEMPLATE = """ 请完成以下任务: 1. **生成总结**:以第三人称或机器人的视角,简洁明了地总结这段对话的主要内容、发生的事件或讨论的主题。 2. **提取实体与关系**:识别并提取对话中提到的重要实体以及它们之间的关系。 +3. **区分事实来源**:用户自己明确表达的稳定人物事实可以记录;机器人发言只能作为上下文,不能单独作为用户画像事实来源。 请严格以 JSON 格式输出,格式如下: {{ @@ -54,6 +55,7 @@ SUMMARY_PROMPT_TEMPLATE = """ }} 注意:总结应具有叙事性,能够作为长程记忆的一部分。直接使用实体的实际名称,不要使用 e1/e2 等代号。 +不要把机器人提出的建议、猜测、玩笑、承诺或复述,写成用户的稳定偏好、身份或长期事实。 """ diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index 90ac16af..467967e8 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional, Union +from typing import List, Optional, Union import hashlib import json @@ -506,7 +506,14 @@ class Person: logger.error(f"同步用户 {self.person_id} 信息到数据库时出错: {e}") -async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None: +async def store_person_memory_from_answer( + person_name: str, + memory_content: str, + chat_id: str, + *, + evidence_source: str = "user_supported", + evidence_message_ids: Optional[List[str]] = None, +) -> None: """将人物事实写入长期记忆系统。 Args: @@ -569,6 +576,8 @@ async def store_person_memory_from_answer(person_name: str, memory_content: str, "person_id": person_id, "person_name": participant_name, "writeback_source": "memory_flow_service", + "evidence_source": str(evidence_source or "user_supported"), + "evidence_message_ids": evidence_message_ids or [], }, respect_filter=True, user_id=session_user_id, diff --git a/src/services/memory_flow_service.py b/src/services/memory_flow_service.py index 5ef7feb2..8d42a8d9 100644 --- a/src/services/memory_flow_service.py +++ b/src/services/memory_flow_service.py @@ -84,7 +84,12 @@ class PersonFactWritebackService: if target_person is None or not target_person.is_known: return - facts = await self._extract_facts(target_person, reply_text) + user_evidence_messages = self._collect_user_evidence_messages(message, target_person) + if not user_evidence_messages: + return + user_evidence_text = self._format_user_evidence(user_evidence_messages) + + facts = await self._extract_facts(target_person, reply_text, user_evidence_text) if not facts: return @@ -104,8 +109,19 @@ class PersonFactWritebackService: if not person_name: return + evidence_message_ids = [ + str(getattr(item, "message_id", "") or "").strip() + for item in user_evidence_messages + if str(getattr(item, "message_id", "") or "").strip() + ] for fact in facts: - await store_person_memory_from_answer(person_name, fact, session_id) + await store_person_memory_from_answer( + person_name, + fact, + session_id, + evidence_source="user_supported", + evidence_message_ids=evidence_message_ids, + ) def _resolve_target_person(self, message: Any) -> Optional[Person]: session = getattr(message, "session", None) @@ -140,22 +156,110 @@ class PersonFactWritebackService: person = Person(person_id=person_id) return person if person.is_known else None - async def _extract_facts(self, person: Person, reply_text: str) -> List[str]: + def _collect_user_evidence_messages(self, message: Any, person: Person) -> List[Any]: + session = getattr(message, "session", None) + session_id = str( + getattr(message, "session_id", "") + or getattr(session, "session_id", "") + or "" + ).strip() + if not session_id: + return [] + + evidence: List[Any] = [] + seen_ids = set() + + reply_to = str(getattr(message, "reply_to", "") or "").strip() + if reply_to: + try: + replies = find_messages(message_id=reply_to, limit=1) + except Exception as exc: + logger.debug("查询人物事实 reply_to 证据失败: %s", exc) + replies = [] + evidence.extend(self._filter_target_user_messages(replies, person, seen_ids)) + + if evidence: + return evidence[:3] + + timestamp = self._extract_message_timestamp(message) + try: + candidates = find_messages( + session_id=session_id, + before_time=timestamp, + limit=6, + limit_mode="latest", + filter_bot=True, + ) + except Exception as exc: + logger.debug("查询人物事实近期用户证据失败: %s", exc) + return [] + return self._filter_target_user_messages(candidates, person, seen_ids)[:3] + + @staticmethod + def _extract_message_timestamp(message: Any) -> float | None: + raw_timestamp = getattr(message, "timestamp", None) + if hasattr(raw_timestamp, "timestamp") and callable(raw_timestamp.timestamp): + try: + return float(raw_timestamp.timestamp()) + except Exception: + return None + if isinstance(raw_timestamp, (int, float)): + return float(raw_timestamp) + return None + + @staticmethod + def _filter_target_user_messages(messages: List[Any], person: Person, seen_ids: set) -> List[Any]: + filtered: List[Any] = [] + target_person_id = str(getattr(person, "person_id", "") or "").strip() + for item in messages: + platform = str(getattr(item, "platform", "") or "").strip() + user_info = getattr(getattr(item, "message_info", None), "user_info", None) + user_id = str(getattr(user_info, "user_id", "") or getattr(item, "user_id", "") or "").strip() + if not platform or not user_id or is_bot_self(platform, user_id): + continue + if target_person_id and get_person_id(platform, user_id) != target_person_id: + continue + text = str(getattr(item, "processed_plain_text", "") or "").strip() + if not text: + continue + message_id = str(getattr(item, "message_id", "") or "").strip() + dedup_key = message_id or f"{platform}:{user_id}:{text}" + if dedup_key in seen_ids: + continue + seen_ids.add(dedup_key) + filtered.append(item) + return filtered + + @staticmethod + def _format_user_evidence(messages: List[Any]) -> str: + lines: List[str] = [] + for item in messages[:3]: + text = str(getattr(item, "processed_plain_text", "") or "").strip() + if text: + lines.append(f"- {text}") + return "\n".join(lines) + + async def _extract_facts(self, person: Person, reply_text: str, user_evidence_text: str) -> List[str]: person_name = str(getattr(person, "person_name", "") or getattr(person, "nickname", "") or person.person_id) - prompt = f"""你要从一条机器人刚刚发送的回复中,提取“关于{person_name}的稳定事实”。 + prompt = f"""你要从用户原始发言中提取“关于{person_name}的稳定事实”。 目标人物:{person_name} +用户原始发言证据: +{user_evidence_text} + 机器人回复: {reply_text} 请只提取满足以下条件的事实: -1. 明确是关于目标人物本人的信息。 -2. 具有相对稳定性,可以作为长期记忆保存。 -3. 用简洁中文陈述句表达。 -4. 如果回复是在直接对目标人物说话,出现“你/你的/你自己”时,默认都指目标人物,请先改写成关于目标人物的第三人称事实再输出。 +1. 必须能被“用户原始发言证据”直接支持,不能只来自机器人回复。 +2. 明确是关于目标人物本人的信息。 +3. 具有相对稳定性,可以作为长期记忆保存。 +4. 用简洁中文陈述句表达。 +5. 如果用户原始发言中出现“我/我的/自己”,默认指目标人物,请先改写成关于目标人物的第三人称事实再输出。 不要提取: - 机器人的情绪、计划、临时动作、客套话 +- 仅由机器人提出的建议、猜测、玩笑、回忆或承诺 - 只适用于当前时刻的短期安排 - 不确定、猜测、反问 - 与目标人物无关的信息 From 340d5c1d6c19b8114acfeba568071f7c17ced3de Mon Sep 17 00:00:00 2001 From: DawnARC Date: Fri, 8 May 2026 14:47:47 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix=EF=BC=9A=E4=BC=98=E5=8C=96=E4=BA=BA?= =?UTF-8?q?=E7=89=A9=E7=94=BB=E5=83=8F=E6=9D=A5=E6=BA=90=E9=99=90=E5=88=B6?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E6=94=B9=E8=BF=9B=E6=A8=A1=E5=9E=8B=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 对关系/向量段落证据进行过滤,强制每条记录显式绑定至给定的 person_id,从而避免因别名导致的跨人员误召回。 同时新增辅助工具,用于安全处理元数据和列表,并提供证据绑定检查; 调整证据收集顺序,在去重之前先验证绑定关系。从元数据存储返回段落时,统一采用游标配合 _row_to_dict 方法。 全面改版摘要模型解析逻辑:允许接收字符串或列表形式的选择器,将选定的模型映射到任务级配置(返回 task_name 与 TaskConfig),从 config_manager 读取当前模型字典,并增加详细的日志记录与回退处理 --- src/A_memorix/core/storage/metadata_store.py | 4 +- .../core/utils/person_profile_service.py | 115 +++++++++++++++--- src/A_memorix/core/utils/summary_importer.py | 88 +++++++++----- 3 files changed, 161 insertions(+), 46 deletions(-) diff --git a/src/A_memorix/core/storage/metadata_store.py b/src/A_memorix/core/storage/metadata_store.py index 1ff30d46..0a29138d 100644 --- a/src/A_memorix/core/storage/metadata_store.py +++ b/src/A_memorix/core/storage/metadata_store.py @@ -2612,7 +2612,9 @@ class MetadataStore: Returns: 段落列表 """ - return self.query("SELECT * FROM paragraphs WHERE source = ?", (source,)) + cursor = self._conn.cursor() + cursor.execute("SELECT * FROM paragraphs WHERE source = ?", (source,)) + return [self._row_to_dict(row, "paragraph") for row in cursor.fetchall()] def get_all_sources(self) -> List[Dict[str, Any]]: """ diff --git a/src/A_memorix/core/utils/person_profile_service.py b/src/A_memorix/core/utils/person_profile_service.py index 14f3a943..081eaa66 100644 --- a/src/A_memorix/core/utils/person_profile_service.py +++ b/src/A_memorix/core/utils/person_profile_service.py @@ -283,7 +283,13 @@ class PersonProfileService: logger.warning(f"解析人物别名失败: person_id={person_id}, err={e}") return aliases, primary_name, memory_traits - def _collect_relation_evidence(self, aliases: List[str], limit: int = 30) -> List[Dict[str, Any]]: + def _collect_relation_evidence( + self, + aliases: List[str], + limit: int = 30, + *, + person_id: str = "", + ) -> List[Dict[str, Any]]: relation_by_hash: Dict[str, Dict[str, Any]] = {} for alias in aliases: for rel in self.metadata_store.get_relations(subject=alias, include_inactive=False): @@ -296,6 +302,12 @@ class PersonProfileService: relation_by_hash[h] = rel relations = list(relation_by_hash.values()) + if person_id: + relations = [ + rel + for rel in relations + if self._is_relation_bound_to_person(rel, person_id=person_id) + ] relations.sort(key=lambda item: float(item.get("confidence", 0.0)), reverse=True) relations = relations[: max(1, int(limit))] @@ -312,6 +324,38 @@ class PersonProfileService: ) return edges + def _is_relation_bound_to_person( + self, + relation: Dict[str, Any], + *, + person_id: str, + ) -> bool: + pid = str(person_id or "").strip() + if not pid: + return False + + metadata = self._metadata_dict(relation.get("metadata")) + if str(metadata.get("person_id", "") or "").strip() == pid: + return True + if pid in self._list_tokens(metadata.get("person_ids")): + return True + + source_paragraph = str(relation.get("source_paragraph", "") or "").strip() + if source_paragraph: + try: + paragraph = self.metadata_store.get_paragraph(source_paragraph) + except Exception: + paragraph = None + if isinstance(paragraph, dict): + payload = { + "hash": source_paragraph, + "source": str(paragraph.get("source", "") or ""), + "metadata": self._metadata_dict(paragraph.get("metadata")), + } + return self._is_evidence_bound_to_person(payload, person_id=pid) + + return False + def _collect_person_fact_evidence(self, person_id: str, limit: int = 4) -> List[Dict[str, Any]]: token = str(person_id or "").strip() if not token: @@ -346,6 +390,42 @@ class PersonProfileService: ) return self._filter_stale_paragraph_evidence(evidence) + @staticmethod + def _metadata_dict(value: Any) -> Dict[str, Any]: + return dict(value) if isinstance(value, dict) else {} + + @staticmethod + def _list_tokens(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, (list, tuple, set)): + return [str(item or "").strip() for item in value if str(item or "").strip()] + token = str(value or "").strip() + return [token] if token else [] + + def _is_evidence_bound_to_person( + self, + item: Dict[str, Any], + *, + person_id: str, + ) -> bool: + """画像证据必须显式绑定到 person_id,避免别名全局召回串人。""" + pid = str(person_id or "").strip() + if not pid: + return False + + metadata = self._metadata_dict(item.get("metadata")) + source = str(item.get("source", "") or metadata.get("source", "") or "").strip() + if source == f"person_fact:{pid}": + return True + + if str(metadata.get("person_id", "") or "").strip() == pid: + return True + if pid in self._list_tokens(metadata.get("person_ids")): + return True + + return False + @staticmethod def _source_type_from_source(source: str) -> str: token = str(source or "").strip() @@ -360,7 +440,7 @@ class PersonProfileService: paragraph_hash: str, metadata: Dict[str, Any], ) -> Tuple[Dict[str, Any], str]: - merged = dict(metadata or {}) + merged = self._metadata_dict(metadata) source = str(merged.get("source", "") or "").strip() try: paragraph = self.metadata_store.get_paragraph(paragraph_hash) @@ -458,9 +538,11 @@ class PersonProfileService: "score": 0.0, "content": str(para.get("content", ""))[:180], "source": str(para.get("source", "") or ""), - "metadata": dict(para.get("metadata", {}) or {}), + "metadata": self._metadata_dict(para.get("metadata")), } ) + if not self._is_evidence_bound_to_person(fallback[-1], person_id=person_id): + fallback.pop() return self._filter_stale_paragraph_evidence(fallback[:top_k]) per_alias_top_k = max(2, int(top_k / max(1, len(alias_queries)))) @@ -483,21 +565,22 @@ class PersonProfileService: h = str(getattr(item, "hash_value", "") or "") if not h or h in seen_hash: continue - seen_hash.add(h) metadata, source = self._enrich_paragraph_evidence_metadata( h, - dict(getattr(item, "metadata", {}) or {}), - ) - evidence.append( - { - "hash": h, - "type": str(getattr(item, "result_type", "")), - "score": float(getattr(item, "score", 0.0) or 0.0), - "content": str(getattr(item, "content", "") or "")[:220], - "source": source, - "metadata": metadata, - } + self._metadata_dict(getattr(item, "metadata", {})), ) + payload = { + "hash": h, + "type": str(getattr(item, "result_type", "")), + "score": float(getattr(item, "score", 0.0) or 0.0), + "content": str(getattr(item, "content", "") or "")[:220], + "source": source, + "metadata": metadata, + } + if not self._is_evidence_bound_to_person(payload, person_id=person_id): + continue + seen_hash.add(h) + evidence.append(payload) evidence.sort(key=lambda x: x.get("score", 0.0), reverse=True) return self._filter_stale_paragraph_evidence(evidence[:top_k]) @@ -640,7 +723,7 @@ class PersonProfileService: if not aliases and person_keyword: aliases = [person_keyword.strip()] primary_name = person_keyword.strip() - relation_edges = self._collect_relation_evidence(aliases, limit=max(10, top_k * 2)) + relation_edges = self._collect_relation_evidence(aliases, limit=max(10, top_k * 2), person_id=pid) vector_evidence = await self._collect_vector_evidence(aliases, top_k=max(4, top_k), person_id=pid) evidence_ids = [ diff --git a/src/A_memorix/core/utils/summary_importer.py b/src/A_memorix/core/utils/summary_importer.py index 1c30b8df..0098aa76 100644 --- a/src/A_memorix/core/utils/summary_importer.py +++ b/src/A_memorix/core/utils/summary_importer.py @@ -16,7 +16,7 @@ import traceback from src.common.logger import get_logger from src.services import llm_service as llm_api from src.services import message_service as message_api -from src.config.config import global_config, model_config as host_model_config +from src.config.config import config_manager, global_config from src.config.model_configs import TaskConfig from ..storage import ( @@ -150,14 +150,20 @@ class SummaryImporter: return True def _normalize_summary_model_selectors(self, raw_value: Any) -> List[str]: - """标准化 summarization.model_name 配置(vNext 仅接受字符串数组)。""" + """标准化 summarization.model_name 配置。""" if raw_value is None: return ["auto"] if isinstance(raw_value, list): selectors = [str(x).strip() for x in raw_value if str(x).strip()] return selectors or ["auto"] + if isinstance(raw_value, str): + selector = raw_value.strip() + if selector: + logger.warning("summarization.model_name 建议使用 List[str],当前字符串配置已兼容处理。") + return [selector] + return ["auto"] raise ValueError( - "summarization.model_name 在 vNext 必须为 List[str]。" + "summarization.model_name 必须为 List[str] 或 str。" " 请执行 scripts/release_vnext_migrate.py migrate。" ) @@ -182,9 +188,17 @@ class SummaryImporter: return None, None - def _resolve_summary_model_config(self) -> Optional[TaskConfig]: + @staticmethod + def _current_model_dict() -> Dict[str, Any]: + try: + return getattr(config_manager.get_model_config(), "models_dict", {}) or {} + except Exception as exc: + logger.warning(f"读取当前模型字典失败: {exc}") + return {} + + def _resolve_summary_model_config(self) -> Optional[Tuple[str, TaskConfig]]: """ - 解析 summarization.model_name 为 TaskConfig。 + 解析 summarization.model_name 为 (task_name, TaskConfig)。 支持: - "auto" - "replyer"(任务名) @@ -201,14 +215,16 @@ class SummaryImporter: selectors = self._normalize_summary_model_selectors(raw_cfg) default_task_name, default_task_cfg = self._pick_default_summary_task(available_tasks) - selected_models: List[str] = [] base_cfg: Optional[TaskConfig] = None - model_dict = getattr(host_model_config, "models_dict", {}) + base_task_name: Optional[str] = None + model_dict = self._current_model_dict() - def _append_models(models: List[str]): - for model_name in models: - if model_name and model_name not in selected_models: - selected_models.append(model_name) + def _find_task_for_model(model_name: str) -> Tuple[Optional[str], Optional[TaskConfig]]: + for task_name, task_cfg in available_tasks.items(): + task_models = [str(item).strip() for item in (getattr(task_cfg, "model_list", []) or []) if str(item).strip()] + if model_name in task_models: + return task_name, task_cfg + return None, None for raw_selector in selectors: selector = raw_selector.strip() @@ -217,9 +233,9 @@ class SummaryImporter: if selector.lower() == "auto": if default_task_cfg: - _append_models(default_task_cfg.model_list) if base_cfg is None: base_cfg = default_task_cfg + base_task_name = default_task_name continue if ":" in selector: @@ -233,47 +249,60 @@ class SummaryImporter: if base_cfg is None: base_cfg = task_cfg + base_task_name = task_name if not model_name or model_name.lower() == "auto": - _append_models(task_cfg.model_list) continue - if model_name in model_dict or model_name in task_cfg.model_list: - _append_models([model_name]) + if model_name in task_cfg.model_list: + logger.info( + f"总结模型选择器 '{selector}' 已定位到任务 '{task_name}';" + "当前 LLM 服务按任务候选列表执行,不单独覆盖具体模型。" + ) else: - logger.warning(f"总结模型选择器 '{selector}' 的模型 '{model_name}' 不存在,已跳过") + logger.warning(f"总结模型选择器 '{selector}' 的模型 '{model_name}' 不在任务 '{task_name}' 中,已跳过") continue task_cfg = available_tasks.get(selector) if task_cfg: - _append_models(task_cfg.model_list) if base_cfg is None: base_cfg = task_cfg + base_task_name = selector continue if selector in model_dict: - _append_models([selector]) + task_name, task_cfg = _find_task_for_model(selector) + if task_name and task_cfg: + if base_cfg is None: + base_cfg = task_cfg + base_task_name = task_name + logger.info( + f"总结模型选择器 '{selector}' 已映射到任务 '{task_name}';" + "当前 LLM 服务按任务候选列表执行,不单独覆盖具体模型。" + ) + continue + logger.warning(f"总结模型选择器 '{selector}' 未归属于任何任务,已跳过") continue logger.warning(f"总结模型选择器 '{selector}' 无法识别,已跳过") - if not selected_models: + if base_cfg is None or not base_task_name: if default_task_cfg: - _append_models(default_task_cfg.model_list) if base_cfg is None: base_cfg = default_task_cfg + base_task_name = default_task_name else: - first_cfg = next(iter(available_tasks.values())) - _append_models(first_cfg.model_list) + base_task_name, first_cfg = next(iter(available_tasks.items())) if base_cfg is None: base_cfg = first_cfg - if not selected_models: + if base_cfg is None or not base_task_name: return None - template_cfg = base_cfg or default_task_cfg or next(iter(available_tasks.values())) - return TaskConfig( - model_list=selected_models, + template_cfg = base_cfg + task_name_to_use = base_task_name + return task_name_to_use, TaskConfig( + model_list=list(template_cfg.model_list), max_tokens=template_cfg.max_tokens, temperature=template_cfg.temperature, slow_threshold=template_cfg.slow_threshold, @@ -343,12 +372,13 @@ class SummaryImporter: chat_history=chat_history_text ) - model_config_to_use = self._resolve_summary_model_config() - if model_config_to_use is None: + resolved_model = self._resolve_summary_model_config() + if resolved_model is None: return False, "未找到可用的总结模型配置" - task_name_to_use = llm_api.resolve_task_name_from_model_config(model_config_to_use) + task_name_to_use, model_config_to_use = resolved_model logger.info(f"正在为流 {stream_id} 执行总结,消息条数: {len(messages)}") + logger.info(f"总结模型任务: {task_name_to_use}") logger.info(f"总结模型候选列表: {model_config_to_use.model_list}") result = await llm_api.generate(