From 0e14cb5de9b7b30ec2326e6198a6787af1dcb256 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Mon, 30 Mar 2026 01:04:27 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=A2=9E=E5=8A=A0=E8=AE=A4?= =?UTF-8?q?=E8=AF=86=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/know_u/knowledge.py | 129 +++++++++++++++++++++++++-- src/know_u/knowledge_store.py | 94 ++++++++++++++++++-- src/maisaka/builtin_tools.py | 21 +++++ src/maisaka/reasoning_engine.py | 152 +++++++++++++++++++++++++++++++- 4 files changed, 382 insertions(+), 14 deletions(-) diff --git a/src/know_u/knowledge.py b/src/know_u/knowledge.py index 7fbc0948..1cafc270 100644 --- a/src/know_u/knowledge.py +++ b/src/know_u/knowledge.py @@ -11,11 +11,11 @@ from src.chat.message_receive.message import SessionMessage from src.chat.utils.utils import is_bot_self from src.common.data_models.llm_service_data_models import LLMGenerationOptions from src.common.logger import get_logger -from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage, ToolResultMessage -from src.services.llm_service import LLMServiceClient - from src.know_u.knowledge_store import KNOWLEDGE_CATEGORIES, get_knowledge_store +from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage, ToolResultMessage from src.maisaka.message_adapter import parse_speaker_content +from src.person_info.person_info import Person +from src.services.llm_service import LLMServiceClient logger = get_logger("maisaka_knowledge") @@ -130,13 +130,19 @@ class KnowledgeLearner: if not category_id or not content: continue + metadata = { + "session_id": self._session_id, + "source": "maisaka_learning", + } + for field_name in ("platform", "user_id", "user_nickname", "person_name"): + field_value = str(item.get(field_name, "")).strip() + if field_value: + metadata[field_name] = field_value + if self._store.add_knowledge( category_id=category_id, content=content, - metadata={ - "session_id": self._session_id, - "source": "maisaka_learning", - }, + metadata=metadata, ): added_count += 1 @@ -186,10 +192,45 @@ class KnowledgeLearner: continue speaker = speaker_name or fallback_speaker - lines.append(f"{speaker}: {visible_text}") + user_metadata = self._extract_message_user_metadata(message) + metadata_parts = [ + f"platform={user_metadata['platform'] or 'unknown'}", + f"user_id={user_metadata['user_id'] or 'unknown'}", + f"user_nickname={user_metadata['user_nickname'] or speaker}", + f"person_name={user_metadata['person_name'] or ''}", + ] + lines.append( + f"[用户信息] {'; '.join(metadata_parts)}\n" + f"[发言] {speaker}: {visible_text}" + ) return "\n".join(lines) + @staticmethod + def _extract_message_user_metadata(message: SessionMessage) -> Dict[str, str]: + """提取消息对应的用户元信息。""" + source_message = message.original_message if isinstance(message, SessionBackedMessage) else message + platform = str(getattr(source_message, "platform", "") or "").strip() + user_info = getattr(getattr(source_message, "message_info", None), "user_info", None) + user_id = str(getattr(user_info, "user_id", "") or "").strip() + user_nickname = str(getattr(user_info, "user_nickname", "") or "").strip() + + person_name = "" + if platform and user_id: + try: + person = Person(platform=platform, user_id=user_id) + if person.is_known and person.person_name: + person_name = str(person.person_name).strip() + except Exception: + person_name = "" + + return { + "platform": platform, + "user_id": user_id, + "user_nickname": user_nickname, + "person_name": person_name, + } + def _build_learning_prompt(self, chat_excerpt: str) -> str: """构建知识提取提示词。""" categories_text = "\n".join( @@ -250,3 +291,75 @@ class KnowledgeLearner: ) return normalized_items + + def _build_learning_prompt(self, chat_excerpt: str) -> str: + """构建知识提取提示词。""" + categories_text = "\n".join( + f"{category_id}. {category_name}" for category_id, category_name in KNOWLEDGE_CATEGORIES.items() + ) + return ( + "你是一个用户画像知识提取器,需要从聊天记录里提取稳定、可复用的用户事实。\n" + "聊天记录每条发言前都带有用户元信息,你必须明确判断这些特征属于哪个用户。\n" + "只提取用户明确表达或高置信度可归纳的信息,不要猜测,不要提取一次性情绪,不要重复表达。\n" + "如果没有可提取内容,返回空数组[]。\n" + "输出必须是 JSON 数组,每项格式为 " + '{"category_id":"分类编号","content":"简洁中文陈述","platform":"平台","user_id":"用户ID","user_nickname":"用户昵称","person_name":"人物名或空字符串"}。\n' + "其中 platform 和 user_id 必填;user_nickname 尽量填写;person_name 仅在用户信息中明确给出时填写,否则填空字符串。\n" + "同一条知识只能归属到一个用户,不要混合不同人的信息。\n" + "分类如下:\n" + f"{categories_text}\n\n" + "聊天记录:\n" + f"{chat_excerpt}" + ) + + def _parse_learning_result(self, result: str) -> List[Dict[str, str]]: + """解析模型返回的知识条目。""" + normalized = result.strip() + if not normalized: + return [] + + if "```" in normalized: + normalized = normalized.replace("```json", "").replace("```JSON", "").replace("```", "").strip() + + try: + parsed = json.loads(normalized) + except json.JSONDecodeError: + logger.warning("Knowledge learning result is not valid JSON") + return [] + + if not isinstance(parsed, list): + return [] + + normalized_items: List[Dict[str, str]] = [] + seen_pairs: set[tuple[str, str, str, str]] = set() + for item in parsed: + if not isinstance(item, dict): + continue + + category_id = str(item.get("category_id", "")).strip() + content = " ".join(str(item.get("content", "")).strip().split()) + platform = str(item.get("platform", "")).strip() + user_id = str(item.get("user_id", "")).strip() + user_nickname = str(item.get("user_nickname", "")).strip() + person_name = str(item.get("person_name", "")).strip() + if category_id not in KNOWLEDGE_CATEGORIES: + continue + if not content or not platform or not user_id: + continue + + pair = (category_id, content, platform, user_id) + if pair in seen_pairs: + continue + seen_pairs.add(pair) + normalized_items.append( + { + "category_id": category_id, + "content": content, + "platform": platform, + "user_id": user_id, + "user_nickname": user_nickname, + "person_name": person_name, + } + ) + + return normalized_items diff --git a/src/know_u/knowledge_store.py b/src/know_u/knowledge_store.py index c84b9c3d..4ca56814 100644 --- a/src/know_u/knowledge_store.py +++ b/src/know_u/knowledge_store.py @@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional import json -from sqlmodel import select +from sqlmodel import col, select from src.common.database.database import DATABASE_URL, get_db_session from src.common.database.database_model import MaiKnowledge @@ -164,15 +164,25 @@ class KnowledgeStore: if not normalized_content: return False + user_platform = str((metadata or {}).get("platform", "")).strip() + user_id = str((metadata or {}).get("user_id", "")).strip() with get_db_session(auto_commit=False) as session: - existing_record = session.exec( + existing_records = session.exec( select(MaiKnowledge).where( MaiKnowledge.category_id == category_id, MaiKnowledge.normalized_content == normalized_content, ) - ).first() - if existing_record is not None: - return False + ).all() + for existing_record in existing_records: + existing_metadata = self._deserialize_metadata(existing_record.metadata_json) + existing_platform = str(existing_metadata.get("platform", "")).strip() + existing_user_id = str(existing_metadata.get("user_id", "")).strip() + if user_platform and user_id: + if existing_platform == user_platform and existing_user_id == user_id: + return False + continue + if not existing_platform and not existing_user_id: + return False session.add( MaiKnowledge( @@ -187,6 +197,80 @@ class KnowledgeStore: session.commit() return True + def search_knowledge( + self, + keyword: str, + limit: int = 10, + ) -> List[Dict[str, Any]]: + """按关键词搜索知识内容。""" + normalized_keyword = self._normalize_content(keyword) + if not normalized_keyword: + return [] + + limit_value = max(1, int(limit)) + with get_db_session() as session: + records = session.exec( + select(MaiKnowledge) + .where( + col(MaiKnowledge.content).contains(normalized_keyword) + | col(MaiKnowledge.normalized_content).contains(normalized_keyword) + ) + .order_by(MaiKnowledge.created_at.desc(), MaiKnowledge.id.desc()) + .limit(limit_value) + ).all() + + results: List[Dict[str, Any]] = [] + for record in records: + item = self._build_item_dict(record) + item["category_id"] = record.category_id + item["category_name"] = self.get_category_name(record.category_id) + results.append(item) + return results + + def get_knowledge_by_user( + self, + *, + platform: str = "", + user_id: str = "", + user_nickname: str = "", + person_name: str = "", + limit: int = 10, + ) -> List[Dict[str, Any]]: + """按用户元信息筛选知识条目。""" + platform = str(platform).strip() + user_id = str(user_id).strip() + user_nickname = str(user_nickname).strip() + person_name = str(person_name).strip() + if not any((platform, user_id, user_nickname, person_name)): + return [] + + limit_value = max(1, int(limit)) + with get_db_session() as session: + records = session.exec( + select(MaiKnowledge).order_by(MaiKnowledge.created_at.desc(), MaiKnowledge.id.desc()) + ).all() + + results: List[Dict[str, Any]] = [] + for record in records: + metadata = self._deserialize_metadata(record.metadata_json) + if user_id and str(metadata.get("user_id", "")).strip() != user_id: + continue + if platform and str(metadata.get("platform", "")).strip() != platform: + continue + if user_nickname and str(metadata.get("user_nickname", "")).strip() != user_nickname: + continue + if person_name and str(metadata.get("person_name", "")).strip() != person_name: + continue + + item = self._build_item_dict(record) + item["category_id"] = record.category_id + item["category_name"] = self.get_category_name(record.category_id) + results.append(item) + if len(results) >= limit_value: + break + + return results + def get_category_knowledge(self, category_id: str) -> List[Dict[str, Any]]: """获取某个分类下的所有知识。""" if category_id not in KNOWLEDGE_CATEGORIES: diff --git a/src/maisaka/builtin_tools.py b/src/maisaka/builtin_tools.py index f7b82c9f..c301187a 100644 --- a/src/maisaka/builtin_tools.py +++ b/src/maisaka/builtin_tools.py @@ -70,6 +70,27 @@ def create_builtin_tools() -> List[ToolOption]: ) tools.append(query_jargon_builder.build()) + query_person_info_builder = ToolOptionBuilder() + query_person_info_builder.set_name("query_person_info") + query_person_info_builder.set_description( + "Query profile and memory information about a specific person by person name, nickname, or user ID." + ) + query_person_info_builder.add_param( + name="person_name", + param_type=ToolParamType.STRING, + description="The person's name, nickname, or user ID to search for.", + required=True, + enum_values=None, + ) + query_person_info_builder.add_param( + name="limit", + param_type=ToolParamType.INTEGER, + description="Maximum number of matched person records to return. Defaults to 3.", + required=False, + enum_values=None, + ) + tools.append(query_person_info_builder.build()) + no_reply_builder = ToolOptionBuilder() no_reply_builder.set_name("no_reply") no_reply_builder.set_description("Do not emit a visible reply this round and continue thinking.") diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py index e5cd6dae..0f5de40b 100644 --- a/src/maisaka/reasoning_engine.py +++ b/src/maisaka/reasoning_engine.py @@ -1,7 +1,7 @@ """Maisaka 推理引擎。""" from datetime import datetime -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Optional import asyncio import difflib @@ -9,14 +9,18 @@ import json import time import traceback +from sqlmodel import col, select from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.message import SessionMessage from src.chat.replyer.replyer_manager import replyer_manager from src.chat.utils.utils import process_llm_response from src.common.data_models.message_component_data_model import MessageSequence, TextComponent +from src.common.database.database import get_db_session +from src.common.database.database_model import PersonInfo from src.common.logger import get_logger from src.config.config import global_config +from src.know_u.knowledge_store import get_knowledge_store from src.learners.jargon_explainer import search_jargon from src.llm_models.exceptions import ReqAbortException from src.llm_models.payload_content.tool_option import ToolCall @@ -384,6 +388,10 @@ class MaisakaReasoningEngine: await self._handle_query_jargon(tool_call) continue + if tool_call.func_name == "query_person_info": + await self._handle_query_person_info(tool_call) + continue + if tool_call.func_name == "wait": seconds = (tool_call.args or {}).get("seconds", 30) try: @@ -478,6 +486,148 @@ class MaisakaReasoningEngine: ) ) + async def _handle_query_person_info(self, tool_call: ToolCall) -> None: + """查询指定人物的档案和相关知识。""" + tool_args = tool_call.args or {} + raw_person_name = tool_args.get("person_name") + raw_limit = tool_args.get("limit", 3) + + if not isinstance(raw_person_name, str): + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "query_person_info requires a person_name string.") + ) + return + + person_name = raw_person_name.strip() + if not person_name: + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "query_person_info requires a non-empty person_name.") + ) + return + + try: + limit = max(1, min(int(raw_limit), 10)) + except (TypeError, ValueError): + limit = 3 + + logger.info( + f"{self._runtime.log_prefix} query_person_info triggered: " + f"person_name={person_name!r} limit={limit}" + ) + + persons = self._query_person_records(person_name, limit) + result = { + "query": person_name, + "persons": persons, + "related_knowledge": self._query_related_knowledge(person_name, persons, limit), + } + + logger.info( + f"{self._runtime.log_prefix} query_person_info finished: " + f"persons={len(result['persons'])} related_knowledge={len(result['related_knowledge'])}" + ) + self._runtime._chat_history.append( + self._build_tool_message( + tool_call, + json.dumps(result, ensure_ascii=False), + ) + ) + + def _query_person_records(self, person_name: str, limit: int) -> list[dict[str, Any]]: + """按名称、昵称或用户 ID 查询人物档案。""" + with get_db_session() as session: + records = session.exec( + select(PersonInfo) + .where( + col(PersonInfo.person_name).contains(person_name) + | col(PersonInfo.user_nickname).contains(person_name) + | col(PersonInfo.user_id).contains(person_name) + ) + .order_by(col(PersonInfo.last_known_time).desc(), col(PersonInfo.id).desc()) + .limit(limit) + ).all() + + persons: list[dict[str, Any]] = [] + for record in records: + memory_points: list[str] = [] + if record.memory_points: + try: + parsed_points = json.loads(record.memory_points) + if isinstance(parsed_points, list): + memory_points = [str(point).strip() for point in parsed_points if str(point).strip()] + except (json.JSONDecodeError, TypeError, ValueError): + memory_points = [] + + persons.append( + { + "person_id": record.person_id, + "person_name": record.person_name or "", + "user_nickname": record.user_nickname, + "user_id": record.user_id, + "platform": record.platform, + "name_reason": record.name_reason or "", + "is_known": record.is_known, + "know_counts": record.know_counts, + "memory_points": memory_points[:20], + "last_known_time": ( + record.last_known_time.isoformat() if record.last_known_time is not None else None + ), + } + ) + + return persons + + def _query_related_knowledge( + self, + person_name: str, + persons: list[dict[str, Any]], + limit: int, + ) -> list[dict[str, Any]]: + """从 Maisaka knowledge 中补充检索与该人物相关的条目。""" + store = get_knowledge_store() + knowledge_items: list[dict[str, Any]] = [] + seen_ids: set[str] = set() + + for person in persons: + matched_items = store.get_knowledge_by_user( + platform=str(person.get("platform", "")).strip(), + user_id=str(person.get("user_id", "")).strip(), + user_nickname=str(person.get("user_nickname", "")).strip(), + person_name=str(person.get("person_name", "")).strip(), + limit=max(limit, 5), + ) + for item in matched_items: + item_id = str(item.get("id", "")).strip() + if item_id and item_id in seen_ids: + continue + if item_id: + seen_ids.add(item_id) + knowledge_items.append(item) + + if not knowledge_items: + fallback_items = store.search_knowledge(person_name, limit=max(limit, 5)) + for item in fallback_items: + item_id = str(item.get("id", "")).strip() + if item_id and item_id in seen_ids: + continue + if item_id: + seen_ids.add(item_id) + knowledge_items.append(item) + + results: list[dict[str, Any]] = [] + for item in knowledge_items: + results.append( + { + "id": str(item.get("id", "")).strip(), + "category_id": str(item.get("category_id", "")).strip(), + "category_name": str(item.get("category_name", "")).strip(), + "content": str(item.get("content", "")).strip(), + "metadata": item.get("metadata", {}), + "created_at": item.get("created_at"), + } + ) + return results + async def _handle_reply( self, tool_call: ToolCall,