From 3b5baf901a080a3f0670430bcefaf22447cf82e1 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Tue, 7 Apr 2026 15:15:37 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E6=AE=8B=E7=95=99=E7=9A=84Kn?= =?UTF-8?q?owU=E7=B3=BB=E7=BB=9F=EF=BC=8C=E4=BF=AE=E5=A4=8Dgemini=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E7=9A=84=E6=80=9D=E8=80=83=E7=AD=BE=E5=90=8D=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - pytests/test_gemini_thought_signatures.py | 72 ++ .../data_models/llm_service_data_models.py | 1 + src/common/database/database_model.py | 16 - .../database/migrations/frozen_v2_schema.py | 16 - src/config/config.py | 2 +- src/config/official_configs.py | 9 - src/core/types.py | 3 + src/know_u/__init__.py | 3 - src/know_u/knowledge.py | 363 ------ src/know_u/knowledge_store.py | 370 ------ src/llm_models/payload_content/tool_option.py | 1 + src/maisaka/builtin_tool/query_person_info.py | 54 - src/maisaka/chat_loop_service.py | 36 - src/maisaka/context_messages.py | 1 - src/maisaka/runtime.py | 46 +- src/memory_system/chat_history_summarizer.py | 1123 ----------------- src/plugin_runtime/hook_payloads.py | 3 + src/services/llm_service.py | 2 + 19 files changed, 88 insertions(+), 2034 deletions(-) create mode 100644 pytests/test_gemini_thought_signatures.py delete mode 100644 src/know_u/__init__.py delete mode 100644 src/know_u/knowledge.py delete mode 100644 src/know_u/knowledge_store.py delete mode 100644 src/memory_system/chat_history_summarizer.py diff --git a/.gitignore b/.gitignore index ac2a837a..c5a687ca 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,6 @@ data/ !pytests/A_memorix_test/data/real_dialogues/private_alice_weekend.json pytests/A_memorix_test/data/benchmarks/results/ data1/ -mai_knowledge/knowledge.json mongodb/ NapCat.Framework.Windows.Once/ NapCat.Framework.Windows.OneKey/ diff --git a/pytests/test_gemini_thought_signatures.py b/pytests/test_gemini_thought_signatures.py new file mode 100644 index 00000000..ace63d6f --- /dev/null +++ b/pytests/test_gemini_thought_signatures.py @@ -0,0 +1,72 @@ +import base64 +import sys +from types import ModuleType, SimpleNamespace + + +config_module = ModuleType("src.config.config") + + +class _ConfigManagerStub: + def get_model_config(self) -> SimpleNamespace: + return SimpleNamespace(api_providers=[]) + + def register_reload_callback(self, _: object) -> None: + return None + + +config_module.config_manager = _ConfigManagerStub() +sys.modules.setdefault("src.config.config", config_module) + +from src.llm_models.model_client import gemini_client +from src.llm_models.payload_content.message import MessageBuilder, RoleType +from src.llm_models.payload_content.tool_option import ToolCall + + +def _encode_signature(value: bytes) -> str: + return base64.b64encode(value).decode("ascii") + + +def test_convert_messages_preserves_gemini_function_call_signature_and_tool_result_id() -> None: + thought_signature = b"gemini-signature" + tool_call = ToolCall( + call_id="call-1", + func_name="reply", + args={"msg_id": "42"}, + extra_content={"google": {"thought_signature": _encode_signature(thought_signature)}}, + ) + assistant_message = MessageBuilder().set_role(RoleType.Assistant).set_tool_calls([tool_call]).build() + tool_message = ( + MessageBuilder() + .set_role(RoleType.Tool) + .set_tool_call_id("call-1") + .set_tool_name("reply") + .add_text_content('{"ok": true}') + .build() + ) + + contents, _ = gemini_client._convert_messages([assistant_message, tool_message]) + + assistant_part = contents[0].parts[0] + assert assistant_part.function_call is not None + assert assistant_part.function_call.id == "call-1" + assert assistant_part.function_call.name == "reply" + assert assistant_part.thought_signature == thought_signature + + tool_part = contents[1].parts[0] + assert tool_part.function_response is not None + assert tool_part.function_response.id == "call-1" + assert tool_part.function_response.name == "reply" + assert tool_part.function_response.response == {"ok": True} + + +def test_convert_messages_injects_dummy_signature_for_first_historical_tool_call() -> None: + tool_calls = [ + ToolCall(call_id="call-1", func_name="reply", args={"msg_id": "1"}), + ToolCall(call_id="call-2", func_name="reply", args={"msg_id": "2"}), + ] + assistant_message = MessageBuilder().set_role(RoleType.Assistant).set_tool_calls(tool_calls).build() + + contents, _ = gemini_client._convert_messages([assistant_message]) + + assert contents[0].parts[0].thought_signature == gemini_client.GEMINI_FALLBACK_THOUGHT_SIGNATURE + assert contents[0].parts[1].thought_signature is None diff --git a/src/common/data_models/llm_service_data_models.py b/src/common/data_models/llm_service_data_models.py index cacd3e10..415707b0 100644 --- a/src/common/data_models/llm_service_data_models.py +++ b/src/common/data_models/llm_service_data_models.py @@ -135,6 +135,7 @@ class LLMServiceResult(BaseDataModel): "name": tool_call.func_name, "arguments": tool_call.args or {}, }, + **({"extra_content": tool_call.extra_content} if tool_call.extra_content else {}), } for tool_call in self.completion.tool_calls ] diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index 4164429f..2a35ab0c 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -221,22 +221,6 @@ class Jargon(SQLModel, table=True): inference_with_content_only: Optional[str] = Field( default=None, sa_column=Column(Text, nullable=True) ) # 只基于词条的推断结果,JSON格式 - - -class MaiKnowledge(SQLModel, table=True): - """存储 Maisaka 的用户画像知识。""" - - __tablename__ = "mai_knowledge" # type: ignore - - id: Optional[int] = Field(default=None, primary_key=True) - knowledge_id: str = Field(index=True, max_length=255) - category_id: str = Field(index=True, max_length=32) - content: str - normalized_content: str = Field(index=True) - metadata_json: Optional[str] = Field(default=None, nullable=True) - created_at: datetime = Field(default_factory=datetime.now, sa_column=Column(DateTime, index=True)) - - class ChatHistory(SQLModel, table=True): """存储聊天历史记录的模型""" diff --git a/src/common/database/migrations/frozen_v2_schema.py b/src/common/database/migrations/frozen_v2_schema.py index f427ace5..926d69dc 100644 --- a/src/common/database/migrations/frozen_v2_schema.py +++ b/src/common/database/migrations/frozen_v2_schema.py @@ -138,18 +138,6 @@ _V2_TABLE_STATEMENTS = ( ) """, """ - CREATE TABLE IF NOT EXISTS mai_knowledge ( - id INTEGER NOT NULL, - knowledge_id VARCHAR(255) NOT NULL, - category_id VARCHAR(32) NOT NULL, - content VARCHAR NOT NULL, - normalized_content VARCHAR NOT NULL, - metadata_json VARCHAR, - created_at DATETIME, - PRIMARY KEY (id) - ) - """, - """ CREATE TABLE IF NOT EXISTS mai_messages ( id INTEGER NOT NULL, message_id VARCHAR(255) NOT NULL, @@ -260,10 +248,6 @@ _V2_INDEX_STATEMENTS = ( "CREATE INDEX IF NOT EXISTS ix_llm_usage_model_assign_name ON llm_usage (model_assign_name)", "CREATE INDEX IF NOT EXISTS ix_llm_usage_model_name ON llm_usage (model_name)", "CREATE INDEX IF NOT EXISTS ix_llm_usage_timestamp ON llm_usage (timestamp)", - "CREATE INDEX IF NOT EXISTS ix_mai_knowledge_category_id ON mai_knowledge (category_id)", - "CREATE INDEX IF NOT EXISTS ix_mai_knowledge_created_at ON mai_knowledge (created_at)", - "CREATE INDEX IF NOT EXISTS ix_mai_knowledge_knowledge_id ON mai_knowledge (knowledge_id)", - "CREATE INDEX IF NOT EXISTS ix_mai_knowledge_normalized_content ON mai_knowledge (normalized_content)", "CREATE INDEX IF NOT EXISTS ix_mai_messages_group_id ON mai_messages (group_id)", "CREATE INDEX IF NOT EXISTS ix_mai_messages_message_id ON mai_messages (message_id)", "CREATE INDEX IF NOT EXISTS ix_mai_messages_platform ON mai_messages (platform)", diff --git a/src/config/config.py b/src/config/config.py index e4412f2d..ac36a045 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -56,7 +56,7 @@ CONFIG_DIR: Path = PROJECT_ROOT / "config" BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute() MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute() MMC_VERSION: str = "1.0.0" -CONFIG_VERSION: str = "8.4.1" +CONFIG_VERSION: str = "8.5.0" MODEL_CONFIG_VERSION: str = "1.13.1" logger = get_logger("config") diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 6a6bc627..7f3cd08e 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -1456,15 +1456,6 @@ class MaiSakaConfig(ConfigBase): __ui_label__ = "MaiSaka" __ui_icon__ = "message-circle" - - enable_knowledge_module: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "book", - }, - ) - """启用知识库模块""" cli_user_name: str = Field( default="用户", json_schema_extra={ diff --git a/src/core/types.py b/src/core/types.py index aff857a3..63cc70b2 100644 --- a/src/core/types.py +++ b/src/core/types.py @@ -315,6 +315,9 @@ class MaiMessages: call_id=str(tool_call.get("call_id", "")), func_name=str(tool_call.get("func_name", "")), args=tool_call.get("args"), + extra_content=tool_call.get("extra_content") + if isinstance(tool_call.get("extra_content"), dict) + else None, ) ) return deserialized_tool_calls diff --git a/src/know_u/__init__.py b/src/know_u/__init__.py deleted file mode 100644 index 9945120b..00000000 --- a/src/know_u/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Knowledge utilities package for Maisaka. -""" diff --git a/src/know_u/knowledge.py b/src/know_u/knowledge.py deleted file mode 100644 index fb8df1a3..00000000 --- a/src/know_u/knowledge.py +++ /dev/null @@ -1,363 +0,0 @@ -""" -Maisaka knowledge retrieval and learning helpers. -""" - -from typing import Any, Dict, List - -import asyncio -import json - -from src.chat.message_receive.message import SessionMessage -from src.chat.utils.utils import is_bot_self -from src.common.data_models.llm_service_data_models import LLMGenerationOptions -from src.common.logger import get_logger -from src.know_u.knowledge_store import KNOWLEDGE_CATEGORIES, get_knowledge_store -from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage, ToolResultMessage -from src.maisaka.message_adapter import parse_speaker_content -from src.person_info.person_info import Person -from src.services.llm_service import LLMServiceClient - -logger = get_logger("maisaka_knowledge") - -NO_RESULT_KEYWORDS = [ - "无", - "没有", - "不适用", - "无需", - "无相关", -] - - -def extract_category_ids_from_result(result: str) -> List[str]: - """Extract valid category ids from an LLM result string.""" - if not result: - return [] - - normalized = result.strip() - if not normalized: - return [] - - lowered = normalized.lower() - if any(keyword in lowered for keyword in ["none", "no relevant", "no_need", "no need"]): - return [] - if any(keyword in normalized for keyword in NO_RESULT_KEYWORDS): - return [] - - category_ids: List[str] = [] - for part in normalized.replace(",", " ").replace(",", " ").replace("\n", " ").split(): - candidate = part.strip() - if candidate in KNOWLEDGE_CATEGORIES and candidate not in category_ids: - category_ids.append(candidate) - - return category_ids - - -async def retrieve_relevant_knowledge( - knowledge_analyzer: Any, - chat_history: List[LLMContextMessage], -) -> str: - """Retrieve formatted knowledge snippets relevant to the current chat history.""" - store = get_knowledge_store() - categories_summary = store.get_categories_summary() - - try: - category_ids = await knowledge_analyzer.analyze_knowledge_need(chat_history, categories_summary) - if not category_ids: - return "" - return store.get_formatted_knowledge(category_ids) - except Exception: - logger.exception("检索相关知识失败") - return "" - - -class KnowledgeLearner: - """ - 从最近对话中提取用户画像类知识并写入知识库。 - """ - - def __init__(self, session_id: str) -> None: - self._session_id = session_id - self._store = get_knowledge_store() - self._llm = LLMServiceClient(task_name="utils", request_type="maisaka.knowledge.learn") - self._learning_lock = asyncio.Lock() - self._last_processed_index = 0 - self.min_messages_for_extraction = 10 - - def get_pending_count(self, message_cache: List[SessionMessage]) -> int: - """??????????????""" - return max(0, len(message_cache) - self._last_processed_index) - - async def learn(self, message_cache: List[SessionMessage]) -> int: - """?????????????????????""" - pending_messages = message_cache[self._last_processed_index :] - if not pending_messages: - return 0 - if len(pending_messages) < self.min_messages_for_extraction: - return 0 - - async with self._learning_lock: - chat_excerpt = self._build_chat_excerpt(pending_messages) - if not chat_excerpt: - return 0 - - prompt = self._build_learning_prompt(chat_excerpt) - try: - result = await self._llm.generate_response( - prompt=prompt, - options=LLMGenerationOptions( - temperature=0.1, - max_tokens=512, - ), - ) - except Exception: - logger.exception("??????????") - return 0 - - knowledge_items = self._parse_learning_result(result.response or "") - if not knowledge_items: - logger.debug("?????????????????") - self._last_processed_index = len(message_cache) - return 0 - - added_count = 0 - for item in knowledge_items: - category_id = str(item.get("category_id", "")).strip() - content = str(item.get("content", "")).strip() - if not category_id or not content: - continue - - metadata = { - "session_id": self._session_id, - "source": "maisaka_learning", - } - for field_name in ("platform", "user_id", "user_nickname", "person_name"): - field_value = str(item.get(field_name, "")).strip() - if field_value: - metadata[field_name] = field_value - - if self._store.add_knowledge( - category_id=category_id, - content=content, - metadata=metadata, - ): - added_count += 1 - - self._last_processed_index = len(message_cache) - - if added_count > 0: - logger.info( - f"Maisaka ???????: ????={self._session_id} ????={added_count}" - ) - else: - logger.debug( - f"Maisaka ???????????????: ????={self._session_id}" - ) - - return added_count - - def _build_chat_excerpt(self, messages: List[SessionMessage]) -> str: - """ - 构建适合画像提取的对话片段,只保留用户可见文本。 - """ - lines: List[str] = [] - for message in messages[-30:]: - if isinstance(message, (AssistantMessage, ToolResultMessage)): - continue - if isinstance(message, SessionBackedMessage): - if message.original_message and is_bot_self( - message.original_message.platform, - message.original_message.message_info.user_info.user_id, - ): - continue - raw_text = message.processed_plain_text.strip() - fallback_speaker = ( - message.original_message.message_info.user_info.user_nickname - if message.original_message is not None - else "用户" - ) - else: - if is_bot_self(message.platform, message.message_info.user_info.user_id): - continue - raw_text = message.processed_plain_text.strip() - fallback_speaker = message.message_info.user_info.user_nickname or "用户" - - if not raw_text: - continue - - speaker_name, body = parse_speaker_content(raw_text) - visible_text = (body or raw_text).strip() - if not visible_text: - continue - - speaker = speaker_name or fallback_speaker - user_metadata = self._extract_message_user_metadata(message) - metadata_parts = [ - f"platform={user_metadata['platform'] or 'unknown'}", - f"user_id={user_metadata['user_id'] or 'unknown'}", - f"user_nickname={user_metadata['user_nickname'] or speaker}", - f"person_name={user_metadata['person_name'] or ''}", - ] - lines.append( - f"[用户信息] {'; '.join(metadata_parts)}\n" - f"[发言] {speaker}: {visible_text}" - ) - - return "\n".join(lines) - - @staticmethod - def _extract_message_user_metadata(message: SessionMessage) -> Dict[str, str]: - """提取消息对应的用户元信息。""" - source_message = message.original_message if isinstance(message, SessionBackedMessage) else message - platform = str(getattr(source_message, "platform", "") or "").strip() - user_info = getattr(getattr(source_message, "message_info", None), "user_info", None) - user_id = str(getattr(user_info, "user_id", "") or "").strip() - user_nickname = str(getattr(user_info, "user_nickname", "") or "").strip() - - person_name = "" - if platform and user_id: - try: - person = Person(platform=platform, user_id=user_id) - if person.is_known and person.person_name: - person_name = str(person.person_name).strip() - except Exception: - person_name = "" - - return { - "platform": platform, - "user_id": user_id, - "user_nickname": user_nickname, - "person_name": person_name, - } - - def _build_learning_prompt(self, chat_excerpt: str) -> str: - """构建知识提取提示词。""" - categories_text = "\n".join( - f"{category_id}. {category_name}" for category_id, category_name in KNOWLEDGE_CATEGORIES.items() - ) - return ( - "你是一个用户画像知识提取器,需要从聊天记录里提取稳定、可复用的用户事实。\n" - "只提取用户明确表达或高置信度可归纳的信息,不要猜测,不要提取一次性情绪,不要重复表述。\n" - "如果没有可提取内容,返回空数组 []。\n" - "输出必须是 JSON 数组,每项格式为 " - '{"category_id":"分类编号","content":"简洁中文陈述"}。\n' - "分类如下:\n" - f"{categories_text}\n\n" - "聊天记录:\n" - f"{chat_excerpt}" - ) - - def _parse_learning_result(self, result: str) -> List[Dict[str, str]]: - """解析模型返回的知识条目。""" - normalized = result.strip() - if not normalized: - return [] - - if "```" in normalized: - normalized = normalized.replace("```json", "").replace("```JSON", "").replace("```", "").strip() - - try: - parsed = json.loads(normalized) - except json.JSONDecodeError: - logger.warning("知识学习结果不是有效的 JSON") - return [] - - if not isinstance(parsed, list): - return [] - - normalized_items: List[Dict[str, str]] = [] - seen_pairs: set[tuple[str, str]] = set() - for item in parsed: - if not isinstance(item, dict): - continue - - category_id = str(item.get("category_id", "")).strip() - content = " ".join(str(item.get("content", "")).strip().split()) - if category_id not in KNOWLEDGE_CATEGORIES: - continue - if not content: - continue - - pair = (category_id, content) - if pair in seen_pairs: - continue - seen_pairs.add(pair) - normalized_items.append( - { - "category_id": category_id, - "content": content, - } - ) - - return normalized_items - - def _build_learning_prompt(self, chat_excerpt: str) -> str: - """构建知识提取提示词。""" - categories_text = "\n".join( - f"{category_id}. {category_name}" for category_id, category_name in KNOWLEDGE_CATEGORIES.items() - ) - return ( - "你是一个用户画像知识提取器,需要从聊天记录里提取稳定、可复用的用户事实。\n" - "聊天记录每条发言前都带有用户元信息,你必须明确判断这些特征属于哪个用户。\n" - "只提取用户明确表达或高置信度可归纳的信息,不要猜测,不要提取一次性情绪,不要重复表达。\n" - "如果没有可提取内容,返回空数组[]。\n" - "输出必须是 JSON 数组,每项格式为 " - '{"category_id":"分类编号","content":"简洁中文陈述","platform":"平台","user_id":"用户ID","user_nickname":"用户昵称","person_name":"人物名或空字符串"}。\n' - "其中 platform 和 user_id 必填;user_nickname 尽量填写;person_name 仅在用户信息中明确给出时填写,否则填空字符串。\n" - "同一条知识只能归属到一个用户,不要混合不同人的信息。\n" - "分类如下:\n" - f"{categories_text}\n\n" - "聊天记录:\n" - f"{chat_excerpt}" - ) - - def _parse_learning_result(self, result: str) -> List[Dict[str, str]]: - """解析模型返回的知识条目。""" - normalized = result.strip() - if not normalized: - return [] - - if "```" in normalized: - normalized = normalized.replace("```json", "").replace("```JSON", "").replace("```", "").strip() - - try: - parsed = json.loads(normalized) - except json.JSONDecodeError: - logger.warning("知识学习结果不是有效的 JSON") - return [] - - if not isinstance(parsed, list): - return [] - - normalized_items: List[Dict[str, str]] = [] - seen_pairs: set[tuple[str, str, str, str]] = set() - for item in parsed: - if not isinstance(item, dict): - continue - - category_id = str(item.get("category_id", "")).strip() - content = " ".join(str(item.get("content", "")).strip().split()) - platform = str(item.get("platform", "")).strip() - user_id = str(item.get("user_id", "")).strip() - user_nickname = str(item.get("user_nickname", "")).strip() - person_name = str(item.get("person_name", "")).strip() - if category_id not in KNOWLEDGE_CATEGORIES: - continue - if not content or not platform or not user_id: - continue - - pair = (category_id, content, platform, user_id) - if pair in seen_pairs: - continue - seen_pairs.add(pair) - normalized_items.append( - { - "category_id": category_id, - "content": content, - "platform": platform, - "user_id": user_id, - "user_nickname": user_nickname, - "person_name": person_name, - } - ) - - return normalized_items diff --git a/src/know_u/knowledge_store.py b/src/know_u/knowledge_store.py deleted file mode 100644 index 4ca56814..00000000 --- a/src/know_u/knowledge_store.py +++ /dev/null @@ -1,370 +0,0 @@ -""" -MaiSaka knowledge store. -""" - -from datetime import datetime -from pathlib import Path -from typing import Any, Dict, List, Optional - -import json - -from sqlmodel import col, select - -from src.common.database.database import DATABASE_URL, get_db_session -from src.common.database.database_model import MaiKnowledge - -PROJECT_ROOT = Path(__file__).resolve().parents[2] -KNOWLEDGE_DATA_DIR = PROJECT_ROOT / "mai_knowledge" -KNOWLEDGE_FILE = KNOWLEDGE_DATA_DIR / "knowledge.json" - - -KNOWLEDGE_CATEGORIES = { - "1": "性别", - "2": "性格", - "3": "饮食口味", - "4": "交友偏好", - "5": "情绪/理性倾向", - "6": "兴趣爱好", - "7": "职业/专业", - "8": "生活习惯", - "9": "价值观", - "10": "沟通风格", - "11": "学习方式", - "12": "压力应对方式", -} - - -class KnowledgeStore: - """存储 Maisaka 的用户画像知识。""" - - def __init__(self) -> None: - """初始化知识存储,并在需要时迁移旧版 JSON 数据。""" - self._ensure_legacy_data_dir() - self._migrate_legacy_file_if_needed() - - def _ensure_legacy_data_dir(self) -> None: - """确保旧版知识目录存在,便于兼容历史数据。""" - KNOWLEDGE_DATA_DIR.mkdir(parents=True, exist_ok=True) - - @staticmethod - def _normalize_content(content: str) -> str: - """标准化知识内容,便于去重。""" - return " ".join(str(content).strip().split()) - - @staticmethod - def _serialize_metadata(metadata: Optional[Dict[str, Any]]) -> Optional[str]: - """将元数据序列化为 JSON 文本。""" - if not metadata: - return None - return json.dumps(metadata, ensure_ascii=False, sort_keys=True) - - @staticmethod - def _deserialize_metadata(raw_text: Optional[str]) -> Dict[str, Any]: - """将 JSON 文本反序列化为元数据字典。""" - if not raw_text: - return {} - try: - parsed = json.loads(raw_text) - except json.JSONDecodeError: - return {} - return parsed if isinstance(parsed, dict) else {} - - @staticmethod - def _parse_created_at(raw_value: Any) -> datetime: - """解析旧版数据中的创建时间。""" - if isinstance(raw_value, datetime): - return raw_value - if isinstance(raw_value, str): - raw_text = raw_value.strip() - if raw_text: - try: - return datetime.fromisoformat(raw_text) - except ValueError: - pass - return datetime.now() - - @classmethod - def _build_item_dict(cls, record: MaiKnowledge) -> Dict[str, Any]: - """将数据库记录转换为兼容旧接口的字典。""" - return { - "id": record.knowledge_id, - "content": record.content, - "metadata": cls._deserialize_metadata(record.metadata_json), - "created_at": record.created_at.isoformat(), - } - - def _load_legacy_knowledge_file(self) -> Dict[str, List[Dict[str, Any]]]: - """读取旧版 JSON 知识文件。""" - if not KNOWLEDGE_FILE.exists(): - return {} - - try: - with open(KNOWLEDGE_FILE, "r", encoding="utf-8") as file: - loaded = json.load(file) - except Exception: - return {} - - if not isinstance(loaded, dict): - return {} - - normalized_knowledge: Dict[str, List[Dict[str, Any]]] = {} - for category_id in KNOWLEDGE_CATEGORIES: - category_items = loaded.get(category_id, []) - if isinstance(category_items, list): - normalized_knowledge[category_id] = [ - item for item in category_items if isinstance(item, dict) - ] - return normalized_knowledge - - def _migrate_legacy_file_if_needed(self) -> None: - """在数据库为空时,将旧版 JSON 中的知识导入数据库。""" - legacy_knowledge = self._load_legacy_knowledge_file() - if not legacy_knowledge: - return - - with get_db_session(auto_commit=False) as session: - existing_record = session.exec(select(MaiKnowledge.id).limit(1)).first() - if existing_record is not None: - return - - for category_id, items in legacy_knowledge.items(): - if category_id not in KNOWLEDGE_CATEGORIES: - continue - - for item in items: - content = self._normalize_content(str(item.get("content", ""))) - if not content: - continue - - metadata = item.get("metadata") - session.add( - MaiKnowledge( - knowledge_id=str(item.get("id") or f"know_{category_id}_{datetime.now().timestamp()}"), - category_id=category_id, - content=content, - normalized_content=content, - metadata_json=self._serialize_metadata(metadata if isinstance(metadata, dict) else None), - created_at=self._parse_created_at(item.get("created_at")), - ) - ) - - session.commit() - - def add_knowledge( - self, - category_id: str, - content: str, - metadata: Optional[Dict[str, Any]] = None, - ) -> bool: - """添加一条知识信息。""" - if category_id not in KNOWLEDGE_CATEGORIES: - return False - - normalized_content = self._normalize_content(content) - if not normalized_content: - return False - - user_platform = str((metadata or {}).get("platform", "")).strip() - user_id = str((metadata or {}).get("user_id", "")).strip() - with get_db_session(auto_commit=False) as session: - existing_records = session.exec( - select(MaiKnowledge).where( - MaiKnowledge.category_id == category_id, - MaiKnowledge.normalized_content == normalized_content, - ) - ).all() - for existing_record in existing_records: - existing_metadata = self._deserialize_metadata(existing_record.metadata_json) - existing_platform = str(existing_metadata.get("platform", "")).strip() - existing_user_id = str(existing_metadata.get("user_id", "")).strip() - if user_platform and user_id: - if existing_platform == user_platform and existing_user_id == user_id: - return False - continue - if not existing_platform and not existing_user_id: - return False - - session.add( - MaiKnowledge( - knowledge_id=f"know_{category_id}_{datetime.now().timestamp()}", - category_id=category_id, - content=normalized_content, - normalized_content=normalized_content, - metadata_json=self._serialize_metadata(metadata), - created_at=datetime.now(), - ) - ) - session.commit() - return True - - def search_knowledge( - self, - keyword: str, - limit: int = 10, - ) -> List[Dict[str, Any]]: - """按关键词搜索知识内容。""" - normalized_keyword = self._normalize_content(keyword) - if not normalized_keyword: - return [] - - limit_value = max(1, int(limit)) - with get_db_session() as session: - records = session.exec( - select(MaiKnowledge) - .where( - col(MaiKnowledge.content).contains(normalized_keyword) - | col(MaiKnowledge.normalized_content).contains(normalized_keyword) - ) - .order_by(MaiKnowledge.created_at.desc(), MaiKnowledge.id.desc()) - .limit(limit_value) - ).all() - - results: List[Dict[str, Any]] = [] - for record in records: - item = self._build_item_dict(record) - item["category_id"] = record.category_id - item["category_name"] = self.get_category_name(record.category_id) - results.append(item) - return results - - def get_knowledge_by_user( - self, - *, - platform: str = "", - user_id: str = "", - user_nickname: str = "", - person_name: str = "", - limit: int = 10, - ) -> List[Dict[str, Any]]: - """按用户元信息筛选知识条目。""" - platform = str(platform).strip() - user_id = str(user_id).strip() - user_nickname = str(user_nickname).strip() - person_name = str(person_name).strip() - if not any((platform, user_id, user_nickname, person_name)): - return [] - - limit_value = max(1, int(limit)) - with get_db_session() as session: - records = session.exec( - select(MaiKnowledge).order_by(MaiKnowledge.created_at.desc(), MaiKnowledge.id.desc()) - ).all() - - results: List[Dict[str, Any]] = [] - for record in records: - metadata = self._deserialize_metadata(record.metadata_json) - if user_id and str(metadata.get("user_id", "")).strip() != user_id: - continue - if platform and str(metadata.get("platform", "")).strip() != platform: - continue - if user_nickname and str(metadata.get("user_nickname", "")).strip() != user_nickname: - continue - if person_name and str(metadata.get("person_name", "")).strip() != person_name: - continue - - item = self._build_item_dict(record) - item["category_id"] = record.category_id - item["category_name"] = self.get_category_name(record.category_id) - results.append(item) - if len(results) >= limit_value: - break - - return results - - def get_category_knowledge(self, category_id: str) -> List[Dict[str, Any]]: - """获取某个分类下的所有知识。""" - if category_id not in KNOWLEDGE_CATEGORIES: - return [] - - with get_db_session() as session: - records = session.exec( - select(MaiKnowledge) - .where(MaiKnowledge.category_id == category_id) - .order_by(MaiKnowledge.created_at.asc(), MaiKnowledge.id.asc()) - ).all() - return [self._build_item_dict(record) for record in records] - - def get_all_knowledge(self) -> Dict[str, List[Dict[str, Any]]]: - """获取全部知识。""" - all_knowledge: Dict[str, List[Dict[str, Any]]] = { - category_id: [] for category_id in KNOWLEDGE_CATEGORIES - } - with get_db_session() as session: - records = session.exec( - select(MaiKnowledge).order_by( - MaiKnowledge.category_id.asc(), - MaiKnowledge.created_at.asc(), - MaiKnowledge.id.asc(), - ) - ).all() - - for record in records: - all_knowledge.setdefault(record.category_id, []).append(self._build_item_dict(record)) - return all_knowledge - - def get_category_name(self, category_id: str) -> str: - """获取分类名称。""" - return KNOWLEDGE_CATEGORIES.get(category_id, "未知分类") - - def get_categories_summary(self) -> str: - """获取分类摘要,供模型判断是否需要检索。""" - counts: Dict[str, int] = {category_id: 0 for category_id in KNOWLEDGE_CATEGORIES} - with get_db_session() as session: - records = session.exec(select(MaiKnowledge.category_id)).all() - - for category_id in records: - if category_id in counts: - counts[category_id] += 1 - - lines: List[str] = [] - for category_id, category_name in KNOWLEDGE_CATEGORIES.items(): - count = counts.get(category_id, 0) - count_text = f"{count}条" if count > 0 else "无数据" - lines.append(f"{category_id}. {category_name} ({count_text})") - return "\n".join(lines) - - def get_formatted_knowledge(self, category_ids: List[str], limit_per_category: int = 5) -> str: - """获取指定分类的格式化知识内容。""" - parts: List[str] = [] - for category_id in category_ids: - items = self.get_category_knowledge(category_id) - if not items: - continue - - category_name = self.get_category_name(category_id) - parts.append(f"【{category_name}】") - - recent_items = items[-limit_per_category:] - for item in recent_items: - content = str(item.get("content", "")).strip() - if content: - parts.append(f"- {content}") - - return "\n".join(parts) - - def get_stats(self) -> Dict[str, Any]: - """获取知识数据统计。""" - with get_db_session() as session: - total_items = len(session.exec(select(MaiKnowledge.id)).all()) - - return { - "total_categories": len(KNOWLEDGE_CATEGORIES), - "total_items": total_items, - "data_file": DATABASE_URL, - "data_exists": True, - "data_size_kb": 0, - "legacy_data_file": str(KNOWLEDGE_FILE), - "legacy_data_exists": KNOWLEDGE_FILE.exists(), - "storage_type": "database", - } - - -_knowledge_store_instance: Optional[KnowledgeStore] = None - - -def get_knowledge_store() -> KnowledgeStore: - """获取知识存储单例。""" - global _knowledge_store_instance - if _knowledge_store_instance is None: - _knowledge_store_instance = KnowledgeStore() - return _knowledge_store_instance diff --git a/src/llm_models/payload_content/tool_option.py b/src/llm_models/payload_content/tool_option.py index 27345170..ebb2a22f 100644 --- a/src/llm_models/payload_content/tool_option.py +++ b/src/llm_models/payload_content/tool_option.py @@ -524,6 +524,7 @@ class ToolCall: call_id: str func_name: str args: Dict[str, Any] | None = None + extra_content: Dict[str, Any] | None = None def __post_init__(self) -> None: """执行工具调用的基础校验。 diff --git a/src/maisaka/builtin_tool/query_person_info.py b/src/maisaka/builtin_tool/query_person_info.py index b902df8d..2c8340d4 100644 --- a/src/maisaka/builtin_tool/query_person_info.py +++ b/src/maisaka/builtin_tool/query_person_info.py @@ -9,7 +9,6 @@ from sqlmodel import col, select from src.common.database.database import get_db_session from src.common.database.database_model import PersonInfo from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec -from src.know_u.knowledge_store import get_knowledge_store from .context import BuiltinToolRuntimeContext @@ -79,7 +78,6 @@ async def handle_tool( result: Dict[str, Any] = { "query": person_name, "persons": persons, - "related_knowledge": _query_related_knowledge(person_name, persons, limit), } return tool_ctx.build_success_result( invocation.tool_name, @@ -129,55 +127,3 @@ def _query_person_records(person_name: str, limit: int) -> List[Dict[str, Any]]: ) return persons - - -def _query_related_knowledge( - person_name: str, - persons: List[Dict[str, Any]], - limit: int, -) -> List[Dict[str, Any]]: - """从 Maisaka knowledge 中补充检索与该人物相关的条目。""" - - store = get_knowledge_store() - knowledge_items: List[Dict[str, Any]] = [] - seen_ids: set[str] = set() - - for person in persons: - matched_items = store.get_knowledge_by_user( - platform=str(person.get("platform", "")).strip(), - user_id=str(person.get("user_id", "")).strip(), - user_nickname=str(person.get("user_nickname", "")).strip(), - person_name=str(person.get("person_name", "")).strip(), - limit=max(limit, 5), - ) - for item in matched_items: - item_id = str(item.get("id", "")).strip() - if item_id and item_id in seen_ids: - continue - if item_id: - seen_ids.add(item_id) - knowledge_items.append(item) - - if not knowledge_items: - fallback_items = store.search_knowledge(person_name, limit=max(limit, 5)) - for item in fallback_items: - item_id = str(item.get("id", "")).strip() - if item_id and item_id in seen_ids: - continue - if item_id: - seen_ids.add(item_id) - knowledge_items.append(item) - - results: List[Dict[str, Any]] = [] - for item in knowledge_items: - results.append( - { - "id": str(item.get("id", "")).strip(), - "category_id": str(item.get("category_id", "")).strip(), - "category_name": str(item.get("category_name", "")).strip(), - "content": str(item.get("content", "")).strip(), - "metadata": item.get("metadata", {}), - "created_at": item.get("created_at"), - } - ) - return results diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py index 62908e84..5cbee7b2 100644 --- a/src/maisaka/chat_loop_service.py +++ b/src/maisaka/chat_loop_service.py @@ -18,7 +18,6 @@ from src.common.prompt_i18n import load_prompt from src.common.utils.utils_session import SessionUtils from src.config.config import global_config from src.core.tooling import ToolRegistry, ToolSpec -from src.know_u.knowledge import extract_category_ids_from_result from src.llm_models.model_client.base_client import BaseClient from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType from src.llm_models.payload_content.resp_format import RespFormat, RespFormatType @@ -665,41 +664,6 @@ class MaisakaChatLoopService: ) return filtered_tool_specs - async def analyze_knowledge_need( - self, - chat_history: List[LLMContextMessage], - categories_summary: str, - ) -> List[str]: - """分析当前对话是否需要检索知识库分类。""" - visible_history: List[str] = [] - for message in chat_history[-8:]: - if not message.processed_plain_text: - continue - visible_history.append(f"{message.role}: {message.processed_plain_text}") - - if not visible_history or not categories_summary.strip(): - return [] - - prompt = ( - "你需要判断当前对话是否需要查询知识库。\n" - "请只返回最相关的分类编号,多个编号用空格分隔;如果完全不需要,返回 none。\n\n" - f"【可用分类】\n{categories_summary}\n\n" - f"【最近对话】\n{chr(10).join(visible_history)}" - ) - - try: - generation_result = await self._llm_chat.generate_response( - prompt=prompt, - options=LLMGenerationOptions( - temperature=0.1, - max_tokens=64, - ), - ) - except Exception: - return [] - - return extract_category_ids_from_result(generation_result.response or "") - async def chat_loop_step( self, chat_history: List[LLMContextMessage], diff --git a/src/maisaka/context_messages.py b/src/maisaka/context_messages.py index 18111cfa..8d85d237 100644 --- a/src/maisaka/context_messages.py +++ b/src/maisaka/context_messages.py @@ -263,7 +263,6 @@ class ReferenceMessageType(str, Enum): CUSTOM = "custom" JARGON = "jargon" - KNOWLEDGE = "knowledge" MEMORY = "memory" TOOL_HINT = "tool_hint" diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index 1c2935d2..b17783a1 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -19,7 +19,6 @@ from src.common.logger import get_logger from src.common.utils.utils_config import ExpressionConfigUtils from src.config.config import global_config from src.core.tooling import ToolRegistry -from src.know_u.knowledge import KnowledgeLearner from src.learners.expression_learner import ExpressionLearner from src.learners.jargon_miner import JargonMiner from src.llm_models.payload_content.resp_format import RespFormat @@ -102,10 +101,8 @@ class MaisakaHeartFlowChatting: self._enable_jargon_learning = jargon_learn self._min_extraction_interval = 30 self._last_expression_extraction_time = 0.0 - self._last_knowledge_extraction_time = 0.0 self._expression_learner = ExpressionLearner(session_id) self._jargon_miner = JargonMiner(session_id, session_name=session_name) - self._knowledge_learner = KnowledgeLearner(session_id) self._reasoning_engine = MaisakaReasoningEngine(self) self._tool_registry = ToolRegistry() @@ -449,16 +446,11 @@ class MaisakaHeartFlowChatting: self._wait_timeout_task = None async def _trigger_batch_learning(self, messages: list[SessionMessage]) -> None: - """按同一批消息触发表达方式、黑话和 knowledge 学习。""" - expression_result, knowledge_result = await asyncio.gather( - self._trigger_expression_learning(messages), - self._trigger_knowledge_learning(messages), - return_exceptions=True, - ) - if isinstance(expression_result, Exception): - logger.error(f"{self.log_prefix} 表达学习任务异常退出: {expression_result}") - if isinstance(knowledge_result, Exception): - logger.error(f"{self.log_prefix} 知识学习任务异常退出: {knowledge_result}") + """按同一批消息触发表达方式和黑话学习。""" + try: + await self._trigger_expression_learning(messages) + except Exception as exc: + logger.error(f"{self.log_prefix} 表达学习任务异常退出: {exc}") def _should_trigger_learning( self, @@ -523,34 +515,6 @@ class MaisakaHeartFlowChatting: except Exception: logger.exception(f"{self.log_prefix} ??????") - async def _trigger_knowledge_learning(self, messages: list[SessionMessage]) -> None: - """?????????????????""" - pending_count = self._knowledge_learner.get_pending_count(self.message_cache) - if not self._should_trigger_learning( - enabled=global_config.maisaka.enable_knowledge_module, - feature_name="知识学习", - last_extraction_time=self._last_knowledge_extraction_time, - pending_count=pending_count, - min_messages_for_extraction=self._knowledge_learner.min_messages_for_extraction, - ): - return - - self._last_knowledge_extraction_time = time.time() - logger.info( - f"{self.log_prefix} ??????: " - f"??????={len(messages)} ??????={pending_count} " - f"?????={len(self.message_cache)}" - ) - - try: - added_count = await self._knowledge_learner.learn(self.message_cache) - if added_count > 0: - logger.info(f"{self.log_prefix} ???????: ?????={added_count}") - else: - logger.debug(f"{self.log_prefix} ???????????????") - except Exception: - logger.exception(f"{self.log_prefix} ??????") - async def _init_mcp(self) -> None: """初始化 MCP 工具并注册到统一工具层。""" self._mcp_host_bridge = MCPHostLLMBridge( diff --git a/src/memory_system/chat_history_summarizer.py b/src/memory_system/chat_history_summarizer.py deleted file mode 100644 index 94f4390f..00000000 --- a/src/memory_system/chat_history_summarizer.py +++ /dev/null @@ -1,1123 +0,0 @@ -""" -聊天内容概括器 -用于累积、打包和压缩聊天记录 -""" - -import asyncio -import json -import time -import re -import difflib -import datetime -from pathlib import Path -from typing import Any, Dict, List, Optional, Set -from dataclasses import dataclass, field -from json_repair import repair_json - -from src.chat.message_receive.message import SessionMessage -from src.common.logger import get_logger -from src.config.config import global_config -from src.common.data_models.llm_service_data_models import LLMGenerationOptions -from src.services.llm_service import LLMServiceClient -from src.services import message_service as message_api -from src.chat.utils.utils import is_bot_self -from src.person_info.person_info import Person -from src.chat.message_receive.chat_manager import chat_manager as _chat_manager -from src.prompt.prompt_manager import prompt_manager - -logger = get_logger("chat_history_summarizer") - -HIPPO_CACHE_DIR = Path(__file__).resolve().parents[2] / "data" / "hippo_memorizer" - - -@dataclass -class MessageBatch: - """消息批次(用于触发话题检查的原始消息累积)""" - - messages: List[SessionMessage] - start_time: float - end_time: float - - -@dataclass -class TopicCacheItem: - """ - 话题缓存项 - - Attributes: - topic: 话题标题(一句话描述时间、人物、事件和主题) - messages: 与该话题相关的消息字符串列表(已经通过 build 函数转成可读文本) - participants: 涉及到的发言人昵称集合 - no_update_checks: 连续多少次“检查”没有新增内容 - """ - - topic: str - messages: List[str] = field(default_factory=list) - participants: Set[str] = field(default_factory=set) - no_update_checks: int = 0 - - -class ChatHistorySummarizer: - """聊天内容概括器""" - - def __init__(self, session_id: str, check_interval: int = 60): - """ - 初始化聊天内容概括器 - - Args: - session_id: 会话ID - check_interval: 定期检查间隔(秒),默认60秒 - """ - self.session_id = session_id - self._chat_display_name = self._get_chat_display_name() - self.log_prefix = f"[{self._chat_display_name}]" - - # 记录时间点,用于计算新消息 - self.last_check_time = time.time() - - # 记录上一次话题检查的时间,用于判断是否需要触发检查 - self.last_topic_check_time = time.time() - - # 当前累积的消息批次 - self.current_batch: Optional[MessageBatch] = None - - # 话题缓存:topic_str -> TopicCacheItem - # 在内存中维护,并通过本地文件实时持久化 - self.topic_cache: Dict[str, TopicCacheItem] = {} - self._safe_chat_id = self._sanitize_chat_id(self.session_id) - self._topic_cache_file = HIPPO_CACHE_DIR / f"{self._safe_chat_id}.json" - # 注意:批次加载需要异步查询消息,所以在 start() 中调用 - - # LLM请求器,用于压缩聊天内容 - self.summarizer_llm = LLMServiceClient( - task_name="utils", request_type="chat_history_summarizer" - ) - - # 后台循环相关 - self.check_interval = check_interval # 检查间隔(秒) - self._periodic_task: Optional[asyncio.Task] = None - self._running = False - - def _get_chat_display_name(self) -> str: - """获取聊天显示名称""" - try: - chat_name = _chat_manager.get_session_name(self.session_id) - if chat_name: - return chat_name - # 如果获取失败,使用简化的chat_id显示 - if len(self.session_id) > 20: - return f"{self.session_id[:8]}..." - return self.session_id - except Exception: - # 如果获取失败,使用简化的chat_id显示 - if len(self.session_id) > 20: - return f"{self.session_id[:8]}..." - return self.session_id - - def _sanitize_chat_id(self, chat_id: str) -> str: - """用于生成可作为文件名的 chat_id""" - return re.sub(r"[^a-zA-Z0-9_.-]", "_", chat_id) - - def _load_topic_cache_from_disk(self): - """在启动时加载本地话题缓存(同步部分),支持重启后继续""" - try: - if not self._topic_cache_file.exists(): - return - - with self._topic_cache_file.open("r", encoding="utf-8") as f: - data = json.load(f) - - self.last_topic_check_time = data.get("last_topic_check_time", self.last_topic_check_time) - topics_data = data.get("topics", {}) - loaded_count = 0 - for topic, payload in topics_data.items(): - self.topic_cache[topic] = TopicCacheItem( - topic=topic, - messages=payload.get("messages", []), - participants=set(payload.get("participants", [])), - no_update_checks=payload.get("no_update_checks", 0), - ) - loaded_count += 1 - - if loaded_count: - logger.info(f"{self.log_prefix} 已加载 {loaded_count} 个话题缓存,继续追踪") - except Exception as e: - logger.error(f"{self.log_prefix} 加载话题缓存失败: {e}") - - async def _load_batch_from_disk(self): - """在启动时加载聊天批次,支持重启后继续""" - try: - if not self._topic_cache_file.exists(): - return - - with self._topic_cache_file.open("r", encoding="utf-8") as f: - data = json.load(f) - - batch_data = data.get("current_batch") - if not batch_data: - return - - start_time = batch_data.get("start_time") - end_time = batch_data.get("end_time") - if not start_time or not end_time: - return - - # 根据时间范围重新查询消息 - messages = message_api.get_messages_by_time_in_chat( - chat_id=self.session_id, - start_time=start_time, - end_time=end_time, - limit=0, - limit_mode="latest", - filter_mai=False, - filter_command=False, - ) - - if messages: - self.current_batch = MessageBatch( - messages=messages, - start_time=start_time, - end_time=end_time, - ) - logger.info(f"{self.log_prefix} 已恢复聊天批次,包含 {len(messages)} 条消息") - except Exception as e: - logger.error(f"{self.log_prefix} 加载聊天批次失败: {e}") - - def _persist_topic_cache(self): - """实时持久化话题缓存和聊天批次,避免重启后丢失""" - try: - # 如果既没有话题缓存也没有批次,删除缓存文件 - if not self.topic_cache and not self.current_batch: - if self._topic_cache_file.exists(): - self._topic_cache_file.unlink() - return - - HIPPO_CACHE_DIR.mkdir(parents=True, exist_ok=True) - data = { - "chat_id": self.session_id, - "last_topic_check_time": self.last_topic_check_time, - "topics": { - topic: { - "messages": item.messages, - "participants": list(item.participants), - "no_update_checks": item.no_update_checks, - } - for topic, item in self.topic_cache.items() - }, - } - - # 保存当前批次的时间范围(如果有) - if self.current_batch: - data["current_batch"] = { - "start_time": self.current_batch.start_time, - "end_time": self.current_batch.end_time, - } - - with self._topic_cache_file.open("w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - except Exception as e: - logger.error(f"{self.log_prefix} 持久化话题缓存失败: {e}") - - async def process(self, current_time: Optional[float] = None): - """ - 处理聊天内容概括 - - Args: - current_time: 当前时间戳,如果为None则使用time.time() - """ - if current_time is None: - current_time = time.time() - - try: - # 获取从上次检查时间到当前时间的新消息 - new_messages = message_api.get_messages_by_time_in_chat( - chat_id=self.session_id, - start_time=self.last_check_time, - end_time=current_time, - limit=0, - limit_mode="latest", - filter_mai=False, # 不过滤bot消息,因为需要检查bot是否发言 - filter_command=False, - ) - - if not new_messages: - # 没有新消息,检查是否需要进行“话题检查” - if self.current_batch and self.current_batch.messages: - await self._check_and_run_topic_check(current_time) - self.last_check_time = current_time - return - - logger.debug( - f"{self.log_prefix} 开始处理聊天概括,时间窗口: {self.last_check_time:.2f} -> {current_time:.2f}" - ) - - # 有新消息,更新最后检查时间 - self.last_check_time = current_time - - # 如果有当前批次,添加新消息 - if self.current_batch: - before_count = len(self.current_batch.messages) - self.current_batch.messages.extend(new_messages) - self.current_batch.end_time = current_time - logger.info( - f"{self.log_prefix} 更新聊天检查批次: {before_count} -> {len(self.current_batch.messages)} 条消息" - ) - # 更新批次后持久化 - self._persist_topic_cache() - else: - # 创建新批次 - self.current_batch = MessageBatch( - messages=new_messages, - start_time=new_messages[0].timestamp.timestamp() if new_messages else current_time, - end_time=current_time, - ) - logger.debug(f"{self.log_prefix} 新建聊天检查批次: {len(new_messages)} 条消息") - # 创建批次后持久化 - self._persist_topic_cache() - - # 检查是否需要触发“话题检查” - await self._check_and_run_topic_check(current_time) - - except Exception as e: - logger.error(f"{self.log_prefix} 处理聊天内容概括时出错: {e}") - import traceback - - traceback.print_exc() - - async def _check_and_run_topic_check(self, current_time: float): - """ - 检查是否需要进行一次“话题检查” - - 触发条件: - - 当前批次消息数 >= 100,或者 - - 距离上一次检查的时间 > 3600 秒(1小时) - """ - if not self.current_batch or not self.current_batch.messages: - return - - messages = self.current_batch.messages - message_count = len(messages) - time_since_last_check = current_time - self.last_topic_check_time - - # 格式化时间差显示 - if time_since_last_check < 60: - time_str = f"{time_since_last_check:.1f}秒" - elif time_since_last_check < 3600: - time_str = f"{time_since_last_check / 60:.1f}分钟" - else: - time_str = f"{time_since_last_check / 3600:.1f}小时" - - logger.debug(f"{self.log_prefix} 批次状态检查 | 消息数: {message_count} | 距上次检查: {time_str}") - - # 检查"话题检查"触发条件 - should_check = False - - # 从配置中获取阈值 - message_threshold = global_config.memory.chat_history_topic_check_message_threshold - time_threshold_hours = global_config.memory.chat_history_topic_check_time_hours - min_messages = global_config.memory.chat_history_topic_check_min_messages - time_threshold_seconds = time_threshold_hours * 3600 - - # 条件1: 消息数量达到阈值,触发一次检查 - if message_count >= message_threshold: - should_check = True - logger.info( - f"{self.log_prefix} 触发检查条件: 消息数量达到 {message_count} 条(阈值: {message_threshold}条)" - ) - - # 条件2: 距离上一次检查超过时间阈值且消息数量达到最小阈值,触发一次检查 - elif time_since_last_check > time_threshold_seconds and message_count >= min_messages: - should_check = True - logger.info( - f"{self.log_prefix} 触发检查条件: 距上次检查 {time_str}(阈值: {time_threshold_hours}小时)且消息数量达到 {message_count} 条(阈值: {min_messages}条)" - ) - - if should_check: - await self._run_topic_check_and_update_cache(messages) - # 本批次已经被处理为话题信息,可以清空 - self.current_batch = None - # 更新上一次检查时间,并持久化 - self.last_topic_check_time = current_time - self._persist_topic_cache() - - async def _run_topic_check_and_update_cache(self, messages: List[SessionMessage]): - """ - 执行一次“话题检查”: - 1. 首先确认这段消息里是否有 Bot 发言,没有则直接丢弃本次批次; - 2. 将消息编号并转成字符串,构造 LLM Prompt; - 3. 把历史话题标题列表放入 Prompt,要求 LLM: - - 识别当前聊天中的话题(1 个或多个); - - 为每个话题选出相关消息编号; - - 若话题属于历史话题,则沿用原话题标题; - 4. LLM 返回 JSON:多个 {topic, message_indices}; - 5. 更新本地话题缓存,并根据规则触发“话题打包存储”。 - """ - if not messages: - return - - start_time = messages[0].timestamp.timestamp() - end_time = messages[-1].timestamp.timestamp() - - logger.info( - f"{self.log_prefix} 开始话题检查 | 消息数: {len(messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}" - ) - - # 1. 检查当前批次内是否有 bot 发言(只检查当前批次,不往前推) - # 原因:我们要记录的是 bot 参与过的对话片段,如果当前批次内 bot 没有发言, - # 说明 bot 没有参与这段对话,不应该记录 - has_bot_message = any( - is_bot_self(msg.platform, msg.message_info.user_info.user_id) for msg in messages - ) - - if not has_bot_message: - logger.info( - f"{self.log_prefix} 当前批次内无 Bot 发言,丢弃本次检查 | 时间范围: {start_time:.2f} - {end_time:.2f}" - ) - return - - # 2. 构造编号后的消息字符串和参与者信息 - numbered_lines, index_to_msg_str, index_to_msg_text, index_to_participants = ( - self._build_numbered_messages_for_llm(messages) - ) - - # 3. 调用 LLM 识别话题,并得到 topic -> indices(失败时最多重试 3 次) - existing_topics = list(self.topic_cache.keys()) - max_retries = 3 - attempt = 0 - success = False - topic_to_indices: Dict[str, List[int]] = {} - - while attempt < max_retries: - attempt += 1 - success, topic_to_indices = await self._analyze_topics_with_llm( - numbered_lines=numbered_lines, - existing_topics=existing_topics, - ) - - if success and topic_to_indices: - if attempt > 1: - logger.info( - f"{self.log_prefix} 话题识别在第 {attempt} 次重试后成功 | 话题数: {len(topic_to_indices)}" - ) - break - - logger.warning( - f"{self.log_prefix} 话题识别失败或无有效话题,第 {attempt} 次尝试失败" - + ("" if attempt >= max_retries else ",准备重试") - ) - - if not success or not topic_to_indices: - logger.error(f"{self.log_prefix} 话题识别连续 {max_retries} 次失败或始终无有效话题,本次检查放弃") - # 即使识别失败,也认为是一次"检查",但不更新 no_update_checks(保持原状) - return - - # 3.5. 检查新话题是否与历史话题相似(相似度>=90%则使用历史标题) - topic_mapping = self._build_topic_mapping(topic_to_indices, similarity_threshold=0.9) - - # 应用话题映射:将相似的新话题标题替换为历史话题标题 - if topic_mapping: - new_topic_to_indices: Dict[str, List[int]] = {} - for new_topic, indices in topic_to_indices.items(): - # 如果这个新话题需要映射到历史话题 - if new_topic in topic_mapping: - historical_topic = topic_mapping[new_topic] - # 如果历史话题已经存在,合并消息索引 - if historical_topic in new_topic_to_indices: - # 合并索引并去重 - combined_indices = list(set(new_topic_to_indices[historical_topic] + indices)) - new_topic_to_indices[historical_topic] = combined_indices - else: - new_topic_to_indices[historical_topic] = indices - else: - # 不需要映射,保持原样 - new_topic_to_indices[new_topic] = indices - topic_to_indices = new_topic_to_indices - - # 4. 统计哪些话题在本次检查中有新增内容 - updated_topics: Set[str] = set() - - for topic, indices in topic_to_indices.items(): - if not indices: - continue - - item = self.topic_cache.get(topic) - if not item: - # 新话题 - item = TopicCacheItem(topic=topic) - self.topic_cache[topic] = item - - # 收集属于该话题的消息文本(不带编号) - topic_msg_texts: List[str] = [] - new_participants: Set[str] = set() - for idx in indices: - msg_text = index_to_msg_text.get(idx) - if not msg_text: - continue - topic_msg_texts.append(msg_text) - new_participants.update(index_to_participants.get(idx, set())) - - if not topic_msg_texts: - continue - - # 将本次检查中属于该话题的所有消息合并为一个字符串(不带编号) - merged_text = "\n".join(topic_msg_texts) - item.messages.append(merged_text) - item.participants.update(new_participants) - # 本次检查中该话题有更新,重置计数 - item.no_update_checks = 0 - updated_topics.add(topic) - - # 5. 对于本次没有更新的历史话题,no_update_checks + 1 - for topic, item in list(self.topic_cache.items()): - if topic not in updated_topics: - item.no_update_checks += 1 - - # 6. 检查是否有话题需要打包存储 - # 从配置中获取阈值 - no_update_checks_threshold = global_config.memory.chat_history_finalize_no_update_checks - message_count_threshold = global_config.memory.chat_history_finalize_message_count - - topics_to_finalize: List[str] = [] - for topic, item in self.topic_cache.items(): - if item.no_update_checks >= no_update_checks_threshold: - logger.info( - f"{self.log_prefix} 话题[{topic}] 连续 {no_update_checks_threshold} 次检查无新增内容,触发打包存储" - ) - topics_to_finalize.append(topic) - continue - if len(item.messages) > message_count_threshold: - logger.info(f"{self.log_prefix} 话题[{topic}] 消息条数超过 {message_count_threshold},触发打包存储") - topics_to_finalize.append(topic) - - for topic in topics_to_finalize: - item = self.topic_cache.get(topic) - if not item: - continue - try: - await self._finalize_and_store_topic( - topic=topic, - item=item, - # 这里的时间范围尽量覆盖最近一次检查的区间 - start_time=start_time, - end_time=end_time, - ) - finally: - # 无论成功与否,都从缓存中删除,避免重复 - self.topic_cache.pop(topic, None) - - def _find_most_similar_topic( - self, new_topic: str, existing_topics: List[str], similarity_threshold: float = 0.9 - ) -> Optional[tuple[str, float]]: - """ - 查找与给定新话题最相似的历史话题 - - Args: - new_topic: 新话题标题 - existing_topics: 历史话题标题列表 - similarity_threshold: 相似度阈值,默认0.9(90%) - - Returns: - Optional[tuple[str, float]]: 如果找到相似度>=阈值的历史话题,返回(历史话题标题, 相似度), - 否则返回None - """ - if not existing_topics: - return None - - best_match = None - best_similarity = 0.0 - - for existing_topic in existing_topics: - similarity = difflib.SequenceMatcher(None, new_topic, existing_topic).ratio() - if similarity > best_similarity: - best_similarity = similarity - best_match = existing_topic - - # 如果相似度达到阈值,返回匹配结果 - if best_match and best_similarity >= similarity_threshold: - return (best_match, best_similarity) - - return None - - def _build_topic_mapping( - self, topic_to_indices: Dict[str, List[int]], similarity_threshold: float = 0.9 - ) -> Dict[str, str]: - """ - 构建新话题到历史话题的映射(如果相似度>=阈值) - - Args: - topic_to_indices: 新话题到消息索引的映射 - similarity_threshold: 相似度阈值,默认0.9(90%) - - Returns: - Dict[str, str]: 新话题 -> 历史话题的映射字典 - """ - existing_topics_list = list(self.topic_cache.keys()) - topic_mapping: Dict[str, str] = {} - - for new_topic in topic_to_indices.keys(): - # 如果新话题已经在历史话题中,不需要检查 - if new_topic in existing_topics_list: - continue - - # 查找最相似的历史话题 - result = self._find_most_similar_topic(new_topic, existing_topics_list, similarity_threshold) - if result: - historical_topic, similarity = result - topic_mapping[new_topic] = historical_topic - logger.info( - f"{self.log_prefix} 话题相似度检查: '{new_topic}' 与历史话题 '{historical_topic}' 相似度 {similarity:.2%},使用历史标题" - ) - - return topic_mapping - - def _build_numbered_messages_for_llm( - self, messages: List[SessionMessage] - ) -> tuple[List[str], Dict[int, str], Dict[int, str], Dict[int, Set[str]]]: - """ - 将消息转为带编号的字符串,供 LLM 选择使用。 - - 返回: - numbered_lines: ["1. xxx", "2. yyy", ...] # 带编号,用于 LLM 选择 - index_to_msg_str: idx -> "idx. xxx" # 带编号,用于 LLM 选择 - index_to_msg_text: idx -> "xxx" # 不带编号,用于最终存储 - index_to_participants: idx -> {nickname1, nickname2, ...} - """ - numbered_lines: List[str] = [] - index_to_msg_str: Dict[int, str] = {} - index_to_msg_text: Dict[int, str] = {} # 不带编号的消息文本 - index_to_participants: Dict[int, Set[str]] = {} - - for idx, msg in enumerate(messages, start=1): - # 使用 build_readable_messages 生成可读文本 - try: - text = message_api.build_readable_messages( - messages=[msg], - replace_bot_name=True, - timestamp_mode="normal_no_YMD", - read_mark=0.0, - truncate=False, - show_actions=False, - ).strip() - except Exception: - # 回退到简单文本 - text = getattr(msg, "processed_plain_text", "") or "" - - # 获取发言人昵称 - participants: Set[str] = set() - try: - platform = msg.platform - user_id = msg.message_info.user_info.user_id - if platform and user_id: - person = Person(platform=platform, user_id=user_id) - if person.person_name: - participants.add(person.person_name) - except Exception: - pass - - # 带编号的字符串(用于 LLM 选择) - line = f"{idx}. {text}" - numbered_lines.append(line) - index_to_msg_str[idx] = line - # 不带编号的文本(用于最终存储) - index_to_msg_text[idx] = text - index_to_participants[idx] = participants - - return numbered_lines, index_to_msg_str, index_to_msg_text, index_to_participants - - async def _analyze_topics_with_llm( - self, - numbered_lines: List[str], - existing_topics: List[str], - ) -> tuple[bool, Dict[str, List[int]]]: - """ - 使用 LLM 识别本次检查中的话题,并为每个话题选择相关消息编号。 - - 要求: - - 话题用一句话清晰描述正在发生的事件,包括时间、人物、主要事件和主题; - - 可以有 1 个或多个话题; - - 若某个话题与历史话题列表中的某个话题是同一件事,请直接使用历史话题的字符串; - - 输出 JSON,格式: - [ - { - "topic": "话题标题字符串", - "message_indices": [1, 2, 5] - }, - ... - ] - """ - if not numbered_lines: - return False, {} - - history_topics_block = "\n".join(f"- {t}" for t in existing_topics) if existing_topics else "(当前无历史话题)" - messages_block = "\n".join(numbered_lines) - - prompt_template = prompt_manager.get_prompt("hippo_topic_analysis") - prompt_template.add_context("history_topics_block", history_topics_block) - prompt_template.add_context("messages_block", messages_block) - prompt = await prompt_manager.render_prompt(prompt_template) - - try: - generation_result = await self.summarizer_llm.generate_response( - prompt=prompt, - options=LLMGenerationOptions(temperature=0.3), - ) - response = generation_result.response - - logger.info(f"{self.log_prefix} 话题识别LLM Prompt: {prompt}") - logger.info(f"{self.log_prefix} 话题识别LLM Response: {response}") - - # 尝试从响应中提取JSON代码块 - json_str = None - json_pattern = r"```json\s*(.*?)\s*```" - matches = re.findall(json_pattern, response, re.DOTALL) - - if matches: - # 找到JSON代码块,使用第一个匹配 - json_str = matches[0].strip() - else: - # 如果没有找到代码块,尝试查找JSON数组的开始和结束位置 - # 查找第一个 [ 和最后一个 ] - start_idx = response.find("[") - end_idx = response.rfind("]") - if start_idx != -1 and end_idx != -1 and end_idx > start_idx: - json_str = response[start_idx : end_idx + 1].strip() - else: - # 如果还是找不到,尝试直接使用整个响应(移除可能的markdown标记) - json_str = response.strip() - json_str = re.sub(r"^```json\s*", "", json_str, flags=re.MULTILINE) - json_str = re.sub(r"^```\s*", "", json_str, flags=re.MULTILINE) - json_str = json_str.strip() - - # 使用json_repair修复可能的JSON错误 - if json_str: - try: - repaired_json = repair_json(json_str) - result = json.loads(repaired_json) if isinstance(repaired_json, str) else repaired_json - except Exception as repair_error: - # 如果repair失败,尝试直接解析 - logger.warning(f"{self.log_prefix} JSON修复失败,尝试直接解析: {repair_error}") - result = json.loads(json_str) - else: - raise ValueError("无法从响应中提取JSON内容") - - if not isinstance(result, list): - logger.error(f"{self.log_prefix} 话题识别返回的 JSON 不是列表: {result}") - return False, {} - - topic_to_indices: Dict[str, List[int]] = {} - for item in result: - if not isinstance(item, dict): - continue - topic = item.get("topic") - indices = item.get("message_indices") or item.get("messages") or [] - if not topic or not isinstance(topic, str): - continue - if isinstance(indices, list): - valid_indices: List[int] = [] - for v in indices: - try: - iv = int(v) - if iv > 0: - valid_indices.append(iv) - except (TypeError, ValueError): - continue - if valid_indices: - topic_to_indices[topic] = valid_indices - - return True, topic_to_indices - - except Exception as e: - logger.error(f"{self.log_prefix} 话题识别 LLM 调用或解析失败: {e}") - logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}") - return False, {} - - async def _finalize_and_store_topic( - self, - topic: str, - item: TopicCacheItem, - start_time: float, - end_time: float, - ): - """ - 对某个话题进行最终打包存储: - 1. 将 messages(list[str]) 拼接为 original_text; - 2. 使用 LLM 对 original_text 进行总结,得到 summary 和 keywords,theme 直接使用话题字符串; - 3. 写入数据库 ChatHistory; - 4. 完成后,调用方会从缓存中删除该话题。 - """ - if not item.messages: - logger.info(f"{self.log_prefix} 话题[{topic}] 无消息内容,跳过打包") - return - - original_text = "\n".join(item.messages) - - logger.info( - f"{self.log_prefix} 开始将聊天记录构建成记忆:[{topic}] | 消息数: {len(item.messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}" - ) - - # 使用 LLM 进行总结(基于话题名),带重试机制 - max_retries = 3 - attempt = 0 - success = False - keywords = [] - summary = "" - - while attempt < max_retries: - attempt += 1 - success, keywords, summary = await self._compress_with_llm(original_text, topic) - - if success and keywords and summary: - # 成功获取到有效的 keywords 和 summary - if attempt > 1: - logger.info(f"{self.log_prefix} 话题[{topic}] LLM 概括在第 {attempt} 次重试后成功") - break - - if attempt < max_retries: - logger.warning(f"{self.log_prefix} 话题[{topic}] LLM 概括失败(第 {attempt} 次尝试),准备重试") - else: - logger.error(f"{self.log_prefix} 话题[{topic}] LLM 概括连续 {max_retries} 次失败,放弃存储") - - if not success or not keywords or not summary: - logger.warning(f"{self.log_prefix} 话题[{topic}] LLM 概括失败,不写入数据库") - return - - participants = list(item.participants) - - await self._store_to_database( - start_time=start_time, - end_time=end_time, - original_text=original_text, - participants=participants, - theme=topic, # 主题直接使用话题名 - keywords=keywords, - summary=summary, - ) - - logger.info( - f"{self.log_prefix} 话题[{topic}] 成功打包并存储 | 消息数: {len(item.messages)} | 参与者数: {len(participants)}" - ) - - async def _compress_with_llm(self, original_text: str, topic: str) -> tuple[bool, List[str], str]: - """ - 使用LLM压缩聊天内容(用于单个话题的最终总结) - - Args: - original_text: 聊天记录原文 - topic: 话题名称 - - Returns: - tuple[bool, List[str], str]: (是否成功, 关键词列表, 概括) - """ - prompt_template = prompt_manager.get_prompt("hippo_topic_summary") - prompt_template.add_context("topic", topic) - prompt_template.add_context("original_text", original_text) - prompt = await prompt_manager.render_prompt(prompt_template) - - try: - generation_result = await self.summarizer_llm.generate_response(prompt=prompt) - response = generation_result.response - - # 解析JSON响应 - json_str = response.strip() - json_str = re.sub(r"^```json\s*", "", json_str, flags=re.MULTILINE) - json_str = re.sub(r"^```\s*", "", json_str, flags=re.MULTILINE) - json_str = json_str.strip() - - # 查找JSON对象的开始与结束 - start_idx = json_str.find("{") - if start_idx == -1: - raise ValueError("未找到JSON对象开始标记") - - end_idx = json_str.rfind("}") - if end_idx == -1 or end_idx <= start_idx: - logger.warning(f"{self.log_prefix} JSON缺少结束标记,尝试自动修复") - extracted_json = json_str[start_idx:] - else: - extracted_json = json_str[start_idx : end_idx + 1] - - def _parse_with_quote_fix(payload: str) -> Dict[str, Any]: - fixed_chars: List[str] = [] - in_string = False - escape_next = False - i = 0 - while i < len(payload): - char = payload[i] - if escape_next: - fixed_chars.append(char) - escape_next = False - elif char == "\\": - fixed_chars.append(char) - escape_next = True - elif char == '"' and not escape_next: - fixed_chars.append(char) - in_string = not in_string - elif in_string and char in {"“", "”"}: - # 在字符串值内部,将中文引号替换为转义的英文引号 - fixed_chars.append('\\"') - else: - fixed_chars.append(char) - i += 1 - - repaired = "".join(fixed_chars) - return json.loads(repaired) - - try: - result = json.loads(extracted_json) - except json.JSONDecodeError: - try: - repaired_json = repair_json(extracted_json) - if isinstance(repaired_json, str): - result = json.loads(repaired_json) - else: - result = repaired_json - except Exception as repair_error: - logger.warning(f"{self.log_prefix} repair_json 失败,使用引号修复: {repair_error}") - result = _parse_with_quote_fix(extracted_json) - - keywords = result.get("keywords", []) - summary = result.get("summary", "") - - # 检查必需字段是否为空 - if not keywords or not summary: - logger.warning(f"{self.log_prefix} LLM返回的JSON中缺少必需字段,原文\n{response}") - # 返回失败,和模型出错一样,让上层进行重试 - return False, [], "" - - # 确保keywords是列表 - if isinstance(keywords, str): - keywords = [keywords] - - return True, keywords, summary - - except Exception as e: - logger.error(f"{self.log_prefix} LLM压缩聊天内容时出错: {e}") - logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}") - # 返回失败标志和默认值 - return False, [], "压缩失败,无法生成概括" - - async def _store_to_database( - self, - start_time: float, - end_time: float, - original_text: str, - participants: List[str], - theme: str, - keywords: List[str], - summary: str, - ): - """存储到数据库""" - try: - from src.common.database.database_model import ChatHistory - from src.services import database_service as database_api - - # 准备数据 - data = { - "session_id": self.session_id, - "start_timestamp": datetime.fromtimestamp(start_time), - "end_timestamp": datetime.fromtimestamp(end_time), - "original_messages": original_text, - "participants": json.dumps(participants, ensure_ascii=False), - "theme": theme, - "keywords": json.dumps(keywords, ensure_ascii=False), - "summary": summary, - "query_count": 0, - "query_forget_count": 0, - } - - saved_record = await database_api.db_save( - ChatHistory, - data=data, - ) - - if saved_record: - logger.debug(f"{self.log_prefix} 成功存储聊天历史记录到数据库") - else: - logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败") - - if saved_record and saved_record.get("id") is not None: - await self._import_to_long_term_memory( - record_id=int(saved_record["id"]), - theme=theme, - summary=summary, - participants=participants, - start_time=start_time, - end_time=end_time, - original_text=original_text, - ) - - except Exception as e: - logger.error(f"{self.log_prefix} 存储到数据库时出错: {e}") - import traceback - - traceback.print_exc() - raise - - async def _import_to_long_term_memory( - self, - record_id: int, - theme: str, - summary: str, - participants: List[str], - start_time: float, - end_time: float, - original_text: str, - ): - """ - 将聊天历史总结导入到统一长期记忆 - - Args: - record_id: chat_history 主键 - theme: 话题主题 - summary: 概括内容 - participants: 参与者列表 - start_time: 开始时间 - end_time: 结束时间 - original_text: 原始文本(可能很长,需要截断) - """ - try: - from src.services.memory_service import memory_service - session = _chat_manager.get_session_by_session_id(self.session_id) - session_user_id = str(getattr(session, "user_id", "") or "").strip() if session else "" - session_group_id = str(getattr(session, "group_id", "") or "").strip() if session else "" - - content_parts = [] - if theme: - content_parts.append(f"主题:{theme}") - if summary: - content_parts.append(f"概括:{summary}") - if participants: - participants_text = "、".join(participants) - content_parts.append(f"参与者:{participants_text}") - content_to_import = "\n".join(content_parts) - - if not content_to_import.strip(): - logger.warning(f"{self.log_prefix} 聊天历史总结内容为空,改用插件侧 generate_from_chat 兜底") - await self._fallback_import_to_long_term_memory( - record_id=record_id, - theme=theme, - participants=participants, - start_time=start_time, - end_time=end_time, - original_text=original_text, - ) - return - - result = await memory_service.ingest_summary( - external_id=f"chat_history:{record_id}", - chat_id=self.session_id, - text=content_to_import, - participants=participants, - time_start=start_time, - time_end=end_time, - tags=[theme] if theme else [], - metadata={"theme": theme, "original_text_length": len(original_text or "")}, - respect_filter=True, - user_id=session_user_id, - group_id=session_group_id, - ) - if result.success: - if result.detail == "chat_filtered": - logger.debug(f"{self.log_prefix} 聊天历史总结被聊天过滤策略跳过 | 话题: {theme}") - else: - logger.info(f"{self.log_prefix} 成功将聊天历史总结导入到长期记忆 | 话题: {theme}") - else: - logger.warning(f"{self.log_prefix} 将聊天历史总结导入到长期记忆失败,尝试插件侧兜底 | 话题: {theme} | 错误: {result.detail}") - await self._fallback_import_to_long_term_memory( - record_id=record_id, - theme=theme, - participants=participants, - start_time=start_time, - end_time=end_time, - original_text=original_text, - ) - - except Exception as e: - logger.error(f"{self.log_prefix} 导入聊天历史总结到长期记忆时出错: {e}", exc_info=True) - - async def _fallback_import_to_long_term_memory( - self, - *, - record_id: int, - theme: str, - participants: List[str], - start_time: float, - end_time: float, - original_text: str, - ) -> None: - try: - from src.services.memory_service import memory_service - session = _chat_manager.get_session_by_session_id(self.session_id) - session_user_id = str(getattr(session, "user_id", "") or "").strip() if session else "" - session_group_id = str(getattr(session, "group_id", "") or "").strip() if session else "" - - result = await memory_service.ingest_summary( - external_id=f"chat_history:{record_id}", - chat_id=self.session_id, - text="", - participants=participants, - time_start=start_time, - time_end=end_time, - tags=[theme] if theme else [], - metadata={ - "theme": theme, - "original_text_length": len(original_text or ""), - "generate_from_chat": True, - "context_length": global_config.memory.chat_history_topic_check_message_threshold, - }, - respect_filter=True, - user_id=session_user_id, - group_id=session_group_id, - ) - if result.success: - if result.detail == "chat_filtered": - logger.debug(f"{self.log_prefix} 插件侧 generate_from_chat 兜底被聊天过滤策略跳过 | 话题: {theme}") - else: - logger.info(f"{self.log_prefix} 插件侧 generate_from_chat 兜底导入成功 | 话题: {theme}") - else: - logger.warning(f"{self.log_prefix} 插件侧 generate_from_chat 兜底导入失败 | 话题: {theme} | 错误: {result.detail}") - except Exception as exc: - logger.error(f"{self.log_prefix} 插件侧兜底导入长期记忆失败: {exc}", exc_info=True) - - async def start(self): - """启动后台定期检查循环""" - if self._running: - logger.warning(f"{self.log_prefix} 后台循环已在运行,无需重复启动") - return - - # 加载聊天批次(如果有) - await self._load_batch_from_disk() - - self._running = True - self._periodic_task = asyncio.create_task(self._periodic_check_loop()) - logger.info(f"{self.log_prefix} 已启动后台定期检查循环 | 检查间隔: {self.check_interval}秒") - - async def stop(self): - """停止后台定期检查循环""" - self._running = False - if self._periodic_task: - self._periodic_task.cancel() - try: - await self._periodic_task - except asyncio.CancelledError: - pass - self._periodic_task = None - logger.info(f"{self.log_prefix} 已停止后台定期检查循环") - - async def _periodic_check_loop(self): - """后台定期检查循环""" - try: - while self._running: - # 执行一次检查 - await self.process() - - # 等待指定间隔后再次检查 - await asyncio.sleep(self.check_interval) - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 后台检查循环被取消") - raise - except Exception as e: - logger.error(f"{self.log_prefix} 后台检查循环出错: {e}") - import traceback - - traceback.print_exc() - self._running = False diff --git a/src/plugin_runtime/hook_payloads.py b/src/plugin_runtime/hook_payloads.py index 9d3fbc69..6e9cc9e7 100644 --- a/src/plugin_runtime/hook_payloads.py +++ b/src/plugin_runtime/hook_payloads.py @@ -62,6 +62,7 @@ def serialize_tool_calls(tool_calls: Sequence[ToolCall] | None) -> List[Dict[str "name": tool_call.func_name, "arguments": dict(tool_call.args or {}), }, + **({"extra_content": tool_call.extra_content} if tool_call.extra_content else {}), } for tool_call in tool_calls ] @@ -102,11 +103,13 @@ def deserialize_tool_calls(raw_tool_calls: Any) -> List[ToolCall]: if not isinstance(call_id, str) or not isinstance(function_name, str): raise ValueError("Hook 返回的工具调用缺少 `id` 或函数名称") + extra_content = raw_tool_call.get("extra_content") normalized_tool_calls.append( ToolCall( call_id=call_id, func_name=function_name, args=function_arguments if isinstance(function_arguments, dict) else {}, + extra_content=extra_content if isinstance(extra_content, dict) else None, ) ) return normalized_tool_calls diff --git a/src/services/llm_service.py b/src/services/llm_service.py index 4b9972f3..9b69898a 100644 --- a/src/services/llm_service.py +++ b/src/services/llm_service.py @@ -384,11 +384,13 @@ def _build_tool_calls(raw_tool_calls: Any) -> List[ToolCall] | None: if not isinstance(call_id, str) or not isinstance(func_name, str): raise ValueError("工具调用缺少 `id` 或函数名称") + extra_content = raw_tool_call.get("extra_content") tool_calls.append( ToolCall( call_id=call_id, func_name=func_name, args=_normalize_tool_arguments(arguments), + extra_content=extra_content if isinstance(extra_content, dict) else None, ) )