diff --git a/.gitignore b/.gitignore index 6d8b249f..c5a687ca 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,10 @@ data/ +!pytests/A_memorix_test/data/ +!pytests/A_memorix_test/data/benchmarks/ +!pytests/A_memorix_test/data/benchmarks/long_novel_memory_benchmark.json +!pytests/A_memorix_test/data/real_dialogues/ +!pytests/A_memorix_test/data/real_dialogues/private_alice_weekend.json +pytests/A_memorix_test/data/benchmarks/results/ data1/ mongodb/ NapCat.Framework.Windows.Once/ diff --git a/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py b/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py index 3136f8be..fe875540 100644 --- a/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py +++ b/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py @@ -3,10 +3,6 @@ from typing import Any, Dict, List, Tuple from src.chat.message_receive.chat_manager import chat_manager as _chat_manager from src.common.logger import get_logger -# NOTE: HippocampusManager doesn't exist in v0.12.2 - memory system was redesigned -# from src.plugins.memory_system.Hippocampus import HippocampusManager -from src.config.config import model_config -from src.llm_models.utils_model import LLMRequest from src.person_info.person_info import resolve_person_id_for_memory from src.services.memory_service import memory_service @@ -17,7 +13,6 @@ class KnowledgeFetcher: """知识调取器""" def __init__(self, private_name: str, stream_id: str): - self.llm = LLMRequest(model_set=model_config.model_task_config.utils) self.private_name = private_name self.stream_id = stream_id diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 33a66ffc..8bc06ed0 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -325,6 +325,13 @@ class ChatBot: scope=scope, ) # 确保会话存在 + try: + from src.services.memory_flow_service import memory_automation_service + + await memory_automation_service.on_incoming_message(message) + except Exception as exc: + logger.warning(f"[{session_id}] 长期记忆自动摘要注册失败: {exc}") + # message.update_chat_stream(chat) # 命令处理 - 使用新插件系统检查并处理命令。 diff --git a/src/common/message_repository.py b/src/common/message_repository.py index 94d7bfea..98799738 100644 --- a/src/common/message_repository.py +++ b/src/common/message_repository.py @@ -189,39 +189,37 @@ def find_messages( conditions.append(Messages.is_command == False) # noqa: E712 statement = select(Messages).where(*conditions) - if limit > 0: - if limit_mode == "earliest": - statement = statement.order_by(col(Messages.timestamp)).limit(limit) - with get_db_session() as session: + with get_db_session(auto_commit=False) as session: + if limit > 0: + if limit_mode == "earliest": + statement = statement.order_by(col(Messages.timestamp)).limit(limit) results = list(session.exec(statement).all()) + else: + statement = statement.order_by(col(Messages.timestamp).desc()).limit(limit) + results = list(session.exec(statement).all()) + results = list(reversed(results)) else: - statement = statement.order_by(col(Messages.timestamp).desc()).limit(limit) - with get_db_session() as session: - results = list(session.exec(statement).all()) - results = list(reversed(results)) - else: - if sort: - order_terms: list[Any] = [] - for field_name, direction in sort: - sort_field = _resolve_field(field_name) - if sort_field is None: - logger.warning(f"排序字段 '{field_name}' 在 Messages 模型中未找到。将跳过此排序条件。") - continue - order_terms.append(sort_field.asc() if direction == 1 else sort_field.desc()) - if order_terms: - statement = statement.order_by(*order_terms) - with get_db_session() as session: + if sort: + order_terms: list[Any] = [] + for field_name, direction in sort: + sort_field = _resolve_field(field_name) + if sort_field is None: + logger.warning(f"排序字段 '{field_name}' 在 Messages 模型中未找到。将跳过此排序条件。") + continue + order_terms.append(sort_field.asc() if direction == 1 else sort_field.desc()) + if order_terms: + statement = statement.order_by(*order_terms) results = list(session.exec(statement).all()) - if filter_intercept_message_level is not None: - filtered_results = [] - for msg in results: - config = _parse_additional_config(msg) - if config.get("intercept_message_level", 0) <= filter_intercept_message_level: - filtered_results.append(msg) - results = filtered_results + if filter_intercept_message_level is not None: + filtered_results = [] + for msg in results: + config = _parse_additional_config(msg) + if config.get("intercept_message_level", 0) <= filter_intercept_message_level: + filtered_results.append(msg) + results = filtered_results - return [_message_to_instance(msg) for msg in results] + return [_message_to_instance(msg) for msg in results] except Exception as e: log_message = ( "使用 SQLModel 查找消息失败 " diff --git a/src/learners/jargon_explainer_old.py b/src/learners/jargon_explainer_old.py index 876b4539..330da8cb 100644 --- a/src/learners/jargon_explainer_old.py +++ b/src/learners/jargon_explainer_old.py @@ -8,7 +8,7 @@ from src.common.data_models.llm_service_data_models import LLMGenerationOptions from src.services.llm_service import LLMServiceClient from src.config.config import global_config from src.prompt.prompt_manager import prompt_manager -from src.learners.jargon_miner_old import search_jargon +from src.learners.jargon_explainer import search_jargon from src.learners.learner_utils_old import ( is_bot_message, contains_bot_self_name, diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index c603f4b7..0838156a 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -18,6 +18,7 @@ from src.common.database.database import get_db_session from src.common.database.database_model import PersonInfo from src.common.logger import get_logger from src.config.config import global_config +from src.services.memory_service import memory_service from src.services.llm_service import LLMServiceClient @@ -66,15 +67,45 @@ def get_person_id(platform: str, user_id: Union[int, str]) -> str: def get_person_id_by_person_name(person_name: str) -> str: """根据用户名获取用户ID""" try: - with get_db_session() as session: - statement = select(PersonInfo).where(col(PersonInfo.person_name) == person_name).limit(1) - record = session.exec(statement).first() - return record.person_id if record else "" + with get_db_session(auto_commit=False) as session: + statement = select(PersonInfo.person_id).where(col(PersonInfo.person_name) == person_name).limit(1) + person_id = session.exec(statement).first() + return str(person_id) if person_id else "" except Exception as e: logger.error(f"根据用户名 {person_name} 获取用户ID时出错: {e}") return "" +def resolve_person_id_for_memory( + *, + person_name: str = "", + platform: str = "", + user_id: Union[int, str, None] = None, + strict_known: bool = False, +) -> str: + """解析长期记忆检索/写入使用的人物 ID。 + + 解析顺序: + 1. 优先按 `person_name` 映射数据库中的 `person_id` + 2. 回退到 `platform + user_id` 生成稳定 `person_id` + 3. 若 `strict_known=True`,则要求该 `person_id` 已被认识 + """ + clean_name = str(person_name or "").strip() + if clean_name: + if by_name := get_person_id_by_person_name(clean_name): + return by_name + + clean_platform = str(platform or "").strip() + clean_user_id = str(user_id or "").strip() + if clean_platform and clean_user_id: + candidate = get_person_id(clean_platform, clean_user_id) + if strict_known and not is_person_known(person_id=candidate): + return "" + return candidate + + return "" + + def is_person_known( person_id: Optional[str] = None, user_id: Optional[str] = None, @@ -800,75 +831,83 @@ person_info_manager = PersonInfoManager() async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None: - """将人物信息存入person_info的memory_points + """将人物事实写入长期记忆系统。 Args: person_name: 人物名称 memory_content: 记忆内容 chat_id: 聊天ID """ + clean_content = str(memory_content or "").strip() + if not clean_content: + logger.debug("人物事实写回跳过:memory_content 为空") + return + + clean_chat_id = str(chat_id or "").strip() + if not clean_chat_id: + logger.warning("人物事实写回失败:chat_id 为空") + return + + clean_person_name = str(person_name or "").strip() try: # 从 chat_id 获取 session - session = _chat_manager.get_session_by_session_id(chat_id) + session = _chat_manager.get_session_by_session_id(clean_chat_id) if not session: - logger.warning(f"无法获取session for chat_id: {chat_id}") + logger.warning(f"无法获取session for chat_id: {clean_chat_id}") return - platform = session.platform - - # 尝试从person_name查找person_id - # 首先尝试通过person_name查找 - person_id = get_person_id_by_person_name(person_name) + session_platform = str(getattr(session, "platform", "") or "").strip() + session_user_id = str(getattr(session, "user_id", "") or "").strip() + session_group_id = str(getattr(session, "group_id", "") or "").strip() + person_id = resolve_person_id_for_memory( + person_name=clean_person_name, + platform=session_platform, + user_id=session_user_id, + ) if not person_id: - # 如果通过person_name找不到,尝试从 session 获取 user_id - if platform and session.user_id: - user_id = session.user_id - person_id = get_person_id(platform, user_id) - else: - logger.warning(f"无法确定person_id for person_name: {person_name}, chat_id: {chat_id}") - return - - # 创建或获取Person对象 - person = Person(person_id=person_id) - - if not person.is_known: - logger.warning(f"用户 {person_name} (person_id: {person_id}) 尚未认识,无法存储记忆") + logger.warning(f"无法确定person_id for person_name: {clean_person_name}, chat_id: {clean_chat_id}") return - # 确定记忆分类(可以根据memory_content判断,这里使用通用分类) - category = "其他" # 默认分类,可以根据需要调整 + person = Person(person_id=person_id) + if not person.is_known: + logger.warning(f"用户 {clean_person_name or person_id} (person_id: {person_id}) 尚未认识,跳过写回") + return - # 记忆点格式:category:content:weight - weight = "1.0" # 默认权重 - memory_point = f"{category}:{memory_content}:{weight}" + participant_name = str(getattr(person, "person_name", "") or getattr(person, "nickname", "") or "").strip() + if not participant_name: + participant_name = clean_person_name or person_id - # 添加到memory_points - if not person.memory_points: - person.memory_points = [] + payload_fingerprint = hashlib.md5(f"{person_id}|{clean_chat_id}|{clean_content}".encode()).hexdigest() + external_id = f"person_fact:{person_id}:{payload_fingerprint}" - # 检查是否已存在相似的记忆点(避免重复) - is_duplicate = False - for existing_point in person.memory_points: - if existing_point and isinstance(existing_point, str): - parts = existing_point.split(":", 2) - if len(parts) >= 2: - existing_content = parts[1].strip() - # 简单相似度检查(如果内容相同或非常相似,则跳过) - if ( - existing_content == memory_content - or memory_content in existing_content - or existing_content in memory_content - ): - is_duplicate = True - break + result = await memory_service.ingest_text( + external_id=external_id, + source_type="person_fact", + text=clean_content, + chat_id=clean_chat_id, + person_ids=[person_id], + participants=[participant_name], + tags=["person_fact"], + metadata={ + "person_id": person_id, + "person_name": participant_name, + "writeback_source": "memory_flow_service", + }, + respect_filter=True, + user_id=session_user_id, + group_id=session_group_id, + ) - if not is_duplicate: - person.memory_points.append(memory_point) - person.sync_to_database() - logger.info(f"成功添加记忆点到 {person_name} (person_id: {person_id}): {memory_point}") + if getattr(result, "success", False): + logger.info( + f"成功写回人物事实到长期记忆: person={participant_name} person_id={person_id} chat_id={clean_chat_id}" + ) else: - logger.debug(f"记忆点已存在,跳过: {memory_point}") + logger.warning( + f"人物事实写回长期记忆失败: person={participant_name} person_id={person_id} " + f"chat_id={clean_chat_id} detail={getattr(result, 'detail', '')}" + ) except Exception as e: logger.error(f"存储人物记忆失败: {e}") diff --git a/src/plugin_runtime/capabilities/data.py b/src/plugin_runtime/capabilities/data.py index 32843d09..1acd33d3 100644 --- a/src/plugin_runtime/capabilities/data.py +++ b/src/plugin_runtime/capabilities/data.py @@ -671,10 +671,30 @@ class RuntimeDataCapabilityMixin: except (TypeError, ValueError): limit_value = 5 + mode = str(args.get("mode", "search") or "search").strip() or "search" + chat_id = str(args.get("chat_id", "") or "").strip() + person_id = str(args.get("person_id", "") or "").strip() + user_id = str(args.get("user_id", "") or "").strip() + group_id = str(args.get("group_id", "") or "").strip() + respect_filter = bool(args.get("respect_filter", True)) + time_start = args.get("time_start") + time_end = args.get("time_end") + try: from src.services.memory_service import memory_service - result = await memory_service.search(query, limit=limit_value) + result = await memory_service.search( + query, + limit=limit_value, + mode=mode, + chat_id=chat_id, + person_id=person_id, + time_start=time_start, + time_end=time_end, + respect_filter=respect_filter, + user_id=user_id, + group_id=group_id, + ) if not result.success: return {"success": False, "error": result.error or "长期记忆检索失败"} knowledge_info = result.to_text(limit=limit_value) diff --git a/src/services/memory_flow_service.py b/src/services/memory_flow_service.py index 96062eb6..c95bcc69 100644 --- a/src/services/memory_flow_service.py +++ b/src/services/memory_flow_service.py @@ -178,6 +178,7 @@ class PersonFactWritebackService: 1. 明确是关于目标人物本人的信息。 2. 具有相对稳定性,可以作为长期记忆保存。 3. 用简洁中文陈述句表达。 +4. 如果回复是在直接对目标人物说话,出现“你/你的/你自己”时,默认都指目标人物,请先改写成关于目标人物的第三人称事实再输出。 不要提取: - 机器人的情绪、计划、临时动作、客套话 diff --git a/src/services/send_service.py b/src/services/send_service.py index d7f17563..5b0d60f8 100644 --- a/src/services/send_service.py +++ b/src/services/send_service.py @@ -434,6 +434,21 @@ def _store_sent_message(message: SessionMessage) -> None: MessageUtils.store_message_to_db(message) +async def _notify_memory_automation_on_message_sent(message: SessionMessage) -> None: + """在发送成功后通知长期记忆自动化服务。 + + Args: + message: 已成功发送的内部消息对象。 + """ + try: + from src.services.memory_flow_service import memory_automation_service + + await memory_automation_service.on_message_sent(message) + except Exception as exc: + session_id = message.session_id or "unknown-session" + logger.warning(f"[{session_id}] 长期记忆人物事实写回注册失败: {exc}") + + def _log_platform_io_failures(delivery_batch: DeliveryBatch) -> None: """输出 Platform IO 批量发送失败详情。 @@ -503,6 +518,7 @@ async def _send_via_platform_io( if delivery_batch.has_success: if storage_message: _store_sent_message(message) + await _notify_memory_automation_on_message_sent(message) if show_log: successful_driver_ids = [ receipt.driver_id or "unknown"