diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index 0838156a..90ac16af 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -1,15 +1,10 @@ from datetime import datetime -from typing import Dict, Optional, Union +from typing import Optional, Union -import asyncio import hashlib import json -import math -import random import time -from json_repair import repair_json - from sqlmodel import col, select from src.chat.message_receive.chat_manager import chat_manager as _chat_manager @@ -19,15 +14,10 @@ from src.common.database.database_model import PersonInfo from src.common.logger import get_logger from src.config.config import global_config from src.services.memory_service import memory_service -from src.services.llm_service import LLMServiceClient logger = get_logger("person_info") -relation_selection_model = LLMServiceClient( - task_name="utils", request_type="relation_selection" -) - def _to_group_cardname_records(group_cardname_json: Optional[str]) -> list[dict[str, str]]: """将数据库中的群名片 JSON 转换为 `Person` 内部使用的结构。 @@ -133,50 +123,6 @@ def is_person_known( return False -def get_category_from_memory(memory_point: str) -> Optional[str]: - """从记忆点中获取分类""" - # 按照最左边的:符号进行分割,返回分割后的第一个部分作为分类 - if not isinstance(memory_point, str): - return None - parts = memory_point.split(":", 1) - return parts[0].strip() if len(parts) > 1 else None - - -def get_weight_from_memory(memory_point: str) -> float: - """从记忆点中获取权重""" - # 按照最右边的:符号进行分割,返回分割后的最后一个部分作为权重 - if not isinstance(memory_point, str): - return -math.inf - parts = memory_point.rsplit(":", 1) - if len(parts) <= 1: - return -math.inf - try: - return float(parts[-1].strip()) - except Exception: - return -math.inf - - -def get_memory_content_from_memory(memory_point: str) -> str: - """从记忆点中获取记忆内容""" - # 按:进行分割,去掉第一段和最后一段,返回中间部分作为记忆内容 - if not isinstance(memory_point, str): - return "" - parts = memory_point.split(":") - return ":".join(parts[1:-1]).strip() if len(parts) > 2 else "" - - -def extract_categories_from_response(response: str) -> list[str]: - """从response中提取所有<>包裹的内容""" - if not isinstance(response, str): - return [] - - import re - - pattern = r"<([^<>]+)>" - matches = re.findall(pattern, response) - return matches - - def calculate_string_similarity(s1: str, s2: str) -> float: """ 计算两个字符串的相似度 @@ -430,31 +376,6 @@ class Person: return deleted_count - def get_all_category(self): - category_list = [] - for memory in self.memory_points: - if memory is None: - continue - category = get_category_from_memory(memory) - if category and category not in category_list: - category_list.append(category) - return category_list - - def get_memory_list_by_category(self, category: str): - memory_list = [] - for memory in self.memory_points: - if memory is None: - continue - if get_category_from_memory(memory) == category: - memory_list.append(memory) - return memory_list - - def get_random_memory_by_category(self, category: str, num: int = 1): - memory_list = self.get_memory_list_by_category(category) - if len(memory_list) < num: - return memory_list - return random.sample(memory_list, num) - def add_group_nick_name(self, group_id: str, group_nick_name: str): """ 添加或更新群昵称 @@ -584,251 +505,6 @@ class Person: except Exception as e: logger.error(f"同步用户 {self.person_id} 信息到数据库时出错: {e}") - async def build_relationship(self, chat_content: str = "", info_type=""): - if not self.is_known: - return "" - # 构建points文本 - - nickname_str = "" - if self.person_name != self.nickname: - nickname_str = f"(ta在{self.platform}上的昵称是{self.nickname})" - - relation_info = "" - - points_text = "" - category_list = self.get_all_category() - - if chat_content: - prompt = f"""当前聊天内容: -{chat_content} - -分类列表: -{category_list} -**要求**:请你根据当前聊天内容,从以下分类中选择一个与聊天内容相关的分类,并用<>包裹输出,不要输出其他内容,不要输出引号或[],严格用<>包裹: -例如: -<分类1><分类2><分类3>...... -如果没有相关的分类,请输出""" - - generation_result = await relation_selection_model.generate_response(prompt) - response = generation_result.response - # print(prompt) - # print(response) - category_list = extract_categories_from_response(response) - if "none" not in category_list: - for category in category_list: - random_memory = self.get_random_memory_by_category(category, 2) - if random_memory: - random_memory_str = "\n".join( - [get_memory_content_from_memory(memory) for memory in random_memory] - ) - points_text = f"有关 {category} 的内容:{random_memory_str}" - break - elif info_type: - prompt = f"""你需要获取用户{self.person_name}的 **{info_type}** 信息。 - -现有信息类别列表: -{category_list} -**要求**:请你根据**{info_type}**,从以下分类中选择一个与**{info_type}**相关的分类,并用<>包裹输出,不要输出其他内容,不要输出引号或[],严格用<>包裹: -例如: -<分类1><分类2><分类3>...... -如果没有相关的分类,请输出""" - generation_result = await relation_selection_model.generate_response(prompt) - response = generation_result.response - # print(prompt) - # print(response) - category_list = extract_categories_from_response(response) - if "none" not in category_list: - for category in category_list: - random_memory = self.get_random_memory_by_category(category, 3) - if random_memory: - random_memory_str = "\n".join( - [get_memory_content_from_memory(memory) for memory in random_memory] - ) - points_text = f"有关 {category} 的内容:{random_memory_str}" - break - else: - for category in category_list: - random_memory = self.get_random_memory_by_category(category, 1)[0] - if random_memory: - points_text = f"有关 {category} 的内容:{get_memory_content_from_memory(random_memory)}" - break - - points_info = "" - if points_text: - points_info = f"你还记得有关{self.person_name}的内容:{points_text}" - - if not (nickname_str or points_info): - return "" - relation_info = f"{self.person_name}:{nickname_str}{points_info}" - - return relation_info - - -class PersonInfoManager: - def __init__(self): - self.person_name_list = {} - self.qv_name_llm = LLMServiceClient( - task_name="utils", request_type="relation.qv_name" - ) - try: - with get_db_session() as _: - pass - except Exception as e: - logger.error(f"数据库连接或 PersonInfo 表创建失败: {e}") - - # 初始化时读取所有person_name - try: - with get_db_session() as session: - statement = select(PersonInfo.person_id, PersonInfo.person_name).where( - col(PersonInfo.person_name).is_not(None) - ) - for person_id, person_name in session.exec(statement).all(): - if person_name: - self.person_name_list[person_id] = person_name - logger.debug(f"已加载 {len(self.person_name_list)} 个用户名称") - except Exception as e: - logger.error(f"加载 person_name_list 失败: {e}") - - @staticmethod - def _extract_json_from_text(text: str) -> Dict[str, str]: - """从文本中提取JSON数据的高容错方法""" - try: - fixed_json = repair_json(text) - if isinstance(fixed_json, str): - parsed_json = json.loads(fixed_json) - else: - parsed_json = fixed_json - - if isinstance(parsed_json, list) and parsed_json: - parsed_json = parsed_json[0] - - if isinstance(parsed_json, dict): - return parsed_json - - except Exception as e: - logger.warning(f"JSON提取失败: {e}") - - logger.warning(f"无法从文本中提取有效的JSON字典: {text}") - logger.info(f"文本: {text}") - return {"nickname": "", "reason": ""} - - async def _generate_unique_person_name(self, base_name: str) -> str: - """生成唯一的 person_name,如果存在重复则添加数字后缀""" - # 处理空昵称的情况 - if not base_name or base_name.isspace(): - base_name = "空格" - - # 检查基础名称是否已存在 - if base_name not in self.person_name_list.values(): - return base_name - - # 如果存在,添加数字后缀 - counter = 1 - while True: - new_name = f"{base_name}[{counter}]" - if new_name not in self.person_name_list.values(): - return new_name - counter += 1 - - async def qv_person_name( - self, person_id: str, user_nickname: str, user_cardname: str, user_avatar: str, request: str = "" - ): - """给某个用户取名""" - if not person_id: - logger.debug("取名失败:person_id不能为空") - return None - - person = Person(person_id=person_id) - old_name = person.person_name - old_reason = person.name_reason - - max_retries = 8 - current_try = 0 - existing_names_str = "" - current_name_set = set(self.person_name_list.values()) - - while current_try < max_retries: - bot_name = global_config.bot.nickname - - qv_name_prompt = f"你是{bot_name},一个伪装成人类的AI,你不能让别人发现这一点," - qv_name_prompt += f"现在你想给一个用户取一个昵称,用户的昵称是{user_nickname}," - qv_name_prompt += f"用户的群昵称名是{user_cardname}," - if user_avatar: - qv_name_prompt += f"用户的头像是{user_avatar}," - if old_name: - qv_name_prompt += f"你之前叫他{old_name},是因为{old_reason}," - - qv_name_prompt += f"\n其他取名的要求是:{request},不要太浮夸,简短," - qv_name_prompt += "\n请根据以上用户信息,想想你叫他什么比较好,不要太浮夸,请最好使用用户的昵称或群昵称原文,可以稍作修改,优先使用原文。优先使用用户的昵称或者群昵称原文。" - - if existing_names_str: - qv_name_prompt += f"\n请注意,以下名称已被你尝试过或已知存在,请避免:{existing_names_str}。\n" - - if len(current_name_set) < 50 and current_name_set: - qv_name_prompt += f"已知的其他昵称有: {', '.join(list(current_name_set)[:10])}等。\n" - - qv_name_prompt += "请用json给出你的想法,并给出理由,示例如下:" - qv_name_prompt += """{ - "nickname": "昵称", - "reason": "理由" - }""" - generation_result = await self.qv_name_llm.generate_response(qv_name_prompt) - response = generation_result.response - # logger.info(f"取名提示词:{qv_name_prompt}\n取名回复:{response}") - result = self._extract_json_from_text(response) - - if not result or not result.get("nickname"): - logger.error("生成的昵称为空或结果格式不正确,重试中...") - current_try += 1 - continue - - generated_nickname = result["nickname"] - - is_duplicate = False - if generated_nickname in current_name_set: - is_duplicate = True - logger.info(f"尝试给用户{user_nickname} {person_id} 取名,但是 {generated_nickname} 已存在,重试中...") - else: - - def _db_check_name_exists_sync(name_to_check): - with get_db_session() as session: - statement = select(PersonInfo.person_id).where(col(PersonInfo.person_name) == name_to_check) - return session.exec(statement).first() is not None - - if await asyncio.to_thread(_db_check_name_exists_sync, generated_nickname): - is_duplicate = True - current_name_set.add(generated_nickname) - - if not is_duplicate: - person.person_name = generated_nickname - person.name_reason = result.get("reason", "未提供理由") - person.sync_to_database() - - logger.info( - f"成功给用户{user_nickname} {person_id} 取名 {generated_nickname},理由:{result.get('reason', '未提供理由')}" - ) - - self.person_name_list[person_id] = generated_nickname - return result - else: - if existing_names_str: - existing_names_str += "、" - existing_names_str += generated_nickname - logger.debug(f"生成的昵称 {generated_nickname} 已存在,重试中...") - current_try += 1 - - # 如果多次尝试后仍未成功,使用唯一的 user_nickname 作为默认值 - unique_nickname = await self._generate_unique_person_name(user_nickname) - logger.warning(f"在{max_retries}次尝试后未能生成唯一昵称,使用默认昵称 {unique_nickname}") - person.person_name = unique_nickname - person.name_reason = "使用用户原始昵称作为默认值" - person.sync_to_database() - self.person_name_list[person_id] = unique_nickname - return {"nickname": unique_nickname, "reason": "使用用户原始昵称作为默认值"} - - -person_info_manager = PersonInfoManager() - async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None: """将人物事实写入长期记忆系统。