remove:无用代码

This commit is contained in:
SengokuCola
2026-04-23 16:02:32 +08:00
parent 35ff91d134
commit 03750cdb6b

View File

@@ -1,15 +1,10 @@
from datetime import datetime
from typing import Dict, Optional, Union
from typing import Optional, Union
import asyncio
import hashlib
import json
import math
import random
import time
from json_repair import repair_json
from sqlmodel import col, select
from src.chat.message_receive.chat_manager import chat_manager as _chat_manager
@@ -19,15 +14,10 @@ from src.common.database.database_model import PersonInfo
from src.common.logger import get_logger
from src.config.config import global_config
from src.services.memory_service import memory_service
from src.services.llm_service import LLMServiceClient
logger = get_logger("person_info")
relation_selection_model = LLMServiceClient(
task_name="utils", request_type="relation_selection"
)
def _to_group_cardname_records(group_cardname_json: Optional[str]) -> list[dict[str, str]]:
"""将数据库中的群名片 JSON 转换为 `Person` 内部使用的结构。
@@ -133,50 +123,6 @@ def is_person_known(
return False
def get_category_from_memory(memory_point: str) -> Optional[str]:
"""从记忆点中获取分类"""
# 按照最左边的:符号进行分割,返回分割后的第一个部分作为分类
if not isinstance(memory_point, str):
return None
parts = memory_point.split(":", 1)
return parts[0].strip() if len(parts) > 1 else None
def get_weight_from_memory(memory_point: str) -> float:
"""从记忆点中获取权重"""
# 按照最右边的:符号进行分割,返回分割后的最后一个部分作为权重
if not isinstance(memory_point, str):
return -math.inf
parts = memory_point.rsplit(":", 1)
if len(parts) <= 1:
return -math.inf
try:
return float(parts[-1].strip())
except Exception:
return -math.inf
def get_memory_content_from_memory(memory_point: str) -> str:
"""从记忆点中获取记忆内容"""
# 按:进行分割,去掉第一段和最后一段,返回中间部分作为记忆内容
if not isinstance(memory_point, str):
return ""
parts = memory_point.split(":")
return ":".join(parts[1:-1]).strip() if len(parts) > 2 else ""
def extract_categories_from_response(response: str) -> list[str]:
"""从response中提取所有<>包裹的内容"""
if not isinstance(response, str):
return []
import re
pattern = r"<([^<>]+)>"
matches = re.findall(pattern, response)
return matches
def calculate_string_similarity(s1: str, s2: str) -> float:
"""
计算两个字符串的相似度
@@ -430,31 +376,6 @@ class Person:
return deleted_count
def get_all_category(self):
category_list = []
for memory in self.memory_points:
if memory is None:
continue
category = get_category_from_memory(memory)
if category and category not in category_list:
category_list.append(category)
return category_list
def get_memory_list_by_category(self, category: str):
memory_list = []
for memory in self.memory_points:
if memory is None:
continue
if get_category_from_memory(memory) == category:
memory_list.append(memory)
return memory_list
def get_random_memory_by_category(self, category: str, num: int = 1):
memory_list = self.get_memory_list_by_category(category)
if len(memory_list) < num:
return memory_list
return random.sample(memory_list, num)
def add_group_nick_name(self, group_id: str, group_nick_name: str):
"""
添加或更新群昵称
@@ -584,251 +505,6 @@ class Person:
except Exception as e:
logger.error(f"同步用户 {self.person_id} 信息到数据库时出错: {e}")
async def build_relationship(self, chat_content: str = "", info_type=""):
if not self.is_known:
return ""
# 构建points文本
nickname_str = ""
if self.person_name != self.nickname:
nickname_str = f"(ta在{self.platform}上的昵称是{self.nickname})"
relation_info = ""
points_text = ""
category_list = self.get_all_category()
if chat_content:
prompt = f"""当前聊天内容:
{chat_content}
分类列表:
{category_list}
**要求**:请你根据当前聊天内容,从以下分类中选择一个与聊天内容相关的分类,并用<>包裹输出,不要输出其他内容,不要输出引号或[],严格用<>包裹:
例如:
<分类1><分类2><分类3>......
如果没有相关的分类,请输出<none>"""
generation_result = await relation_selection_model.generate_response(prompt)
response = generation_result.response
# print(prompt)
# print(response)
category_list = extract_categories_from_response(response)
if "none" not in category_list:
for category in category_list:
random_memory = self.get_random_memory_by_category(category, 2)
if random_memory:
random_memory_str = "\n".join(
[get_memory_content_from_memory(memory) for memory in random_memory]
)
points_text = f"有关 {category} 的内容:{random_memory_str}"
break
elif info_type:
prompt = f"""你需要获取用户{self.person_name}的 **{info_type}** 信息。
现有信息类别列表:
{category_list}
**要求**:请你根据**{info_type}**,从以下分类中选择一个与**{info_type}**相关的分类,并用<>包裹输出,不要输出其他内容,不要输出引号或[],严格用<>包裹:
例如:
<分类1><分类2><分类3>......
如果没有相关的分类,请输出<none>"""
generation_result = await relation_selection_model.generate_response(prompt)
response = generation_result.response
# print(prompt)
# print(response)
category_list = extract_categories_from_response(response)
if "none" not in category_list:
for category in category_list:
random_memory = self.get_random_memory_by_category(category, 3)
if random_memory:
random_memory_str = "\n".join(
[get_memory_content_from_memory(memory) for memory in random_memory]
)
points_text = f"有关 {category} 的内容:{random_memory_str}"
break
else:
for category in category_list:
random_memory = self.get_random_memory_by_category(category, 1)[0]
if random_memory:
points_text = f"有关 {category} 的内容:{get_memory_content_from_memory(random_memory)}"
break
points_info = ""
if points_text:
points_info = f"你还记得有关{self.person_name}的内容:{points_text}"
if not (nickname_str or points_info):
return ""
relation_info = f"{self.person_name}:{nickname_str}{points_info}"
return relation_info
class PersonInfoManager:
def __init__(self):
self.person_name_list = {}
self.qv_name_llm = LLMServiceClient(
task_name="utils", request_type="relation.qv_name"
)
try:
with get_db_session() as _:
pass
except Exception as e:
logger.error(f"数据库连接或 PersonInfo 表创建失败: {e}")
# 初始化时读取所有person_name
try:
with get_db_session() as session:
statement = select(PersonInfo.person_id, PersonInfo.person_name).where(
col(PersonInfo.person_name).is_not(None)
)
for person_id, person_name in session.exec(statement).all():
if person_name:
self.person_name_list[person_id] = person_name
logger.debug(f"已加载 {len(self.person_name_list)} 个用户名称")
except Exception as e:
logger.error(f"加载 person_name_list 失败: {e}")
@staticmethod
def _extract_json_from_text(text: str) -> Dict[str, str]:
"""从文本中提取JSON数据的高容错方法"""
try:
fixed_json = repair_json(text)
if isinstance(fixed_json, str):
parsed_json = json.loads(fixed_json)
else:
parsed_json = fixed_json
if isinstance(parsed_json, list) and parsed_json:
parsed_json = parsed_json[0]
if isinstance(parsed_json, dict):
return parsed_json
except Exception as e:
logger.warning(f"JSON提取失败: {e}")
logger.warning(f"无法从文本中提取有效的JSON字典: {text}")
logger.info(f"文本: {text}")
return {"nickname": "", "reason": ""}
async def _generate_unique_person_name(self, base_name: str) -> str:
"""生成唯一的 person_name如果存在重复则添加数字后缀"""
# 处理空昵称的情况
if not base_name or base_name.isspace():
base_name = "空格"
# 检查基础名称是否已存在
if base_name not in self.person_name_list.values():
return base_name
# 如果存在,添加数字后缀
counter = 1
while True:
new_name = f"{base_name}[{counter}]"
if new_name not in self.person_name_list.values():
return new_name
counter += 1
async def qv_person_name(
self, person_id: str, user_nickname: str, user_cardname: str, user_avatar: str, request: str = ""
):
"""给某个用户取名"""
if not person_id:
logger.debug("取名失败person_id不能为空")
return None
person = Person(person_id=person_id)
old_name = person.person_name
old_reason = person.name_reason
max_retries = 8
current_try = 0
existing_names_str = ""
current_name_set = set(self.person_name_list.values())
while current_try < max_retries:
bot_name = global_config.bot.nickname
qv_name_prompt = f"你是{bot_name}一个伪装成人类的AI你不能让别人发现这一点"
qv_name_prompt += f"现在你想给一个用户取一个昵称,用户的昵称是{user_nickname}"
qv_name_prompt += f"用户的群昵称名是{user_cardname}"
if user_avatar:
qv_name_prompt += f"用户的头像是{user_avatar}"
if old_name:
qv_name_prompt += f"你之前叫他{old_name},是因为{old_reason}"
qv_name_prompt += f"\n其他取名的要求是:{request},不要太浮夸,简短,"
qv_name_prompt += "\n请根据以上用户信息,想想你叫他什么比较好,不要太浮夸,请最好使用用户的昵称或群昵称原文,可以稍作修改,优先使用原文。优先使用用户的昵称或者群昵称原文。"
if existing_names_str:
qv_name_prompt += f"\n请注意,以下名称已被你尝试过或已知存在,请避免:{existing_names_str}\n"
if len(current_name_set) < 50 and current_name_set:
qv_name_prompt += f"已知的其他昵称有: {', '.join(list(current_name_set)[:10])}等。\n"
qv_name_prompt += "请用json给出你的想法并给出理由示例如下"
qv_name_prompt += """{
"nickname": "昵称",
"reason": "理由"
}"""
generation_result = await self.qv_name_llm.generate_response(qv_name_prompt)
response = generation_result.response
# logger.info(f"取名提示词:{qv_name_prompt}\n取名回复{response}")
result = self._extract_json_from_text(response)
if not result or not result.get("nickname"):
logger.error("生成的昵称为空或结果格式不正确,重试中...")
current_try += 1
continue
generated_nickname = result["nickname"]
is_duplicate = False
if generated_nickname in current_name_set:
is_duplicate = True
logger.info(f"尝试给用户{user_nickname} {person_id} 取名,但是 {generated_nickname} 已存在,重试中...")
else:
def _db_check_name_exists_sync(name_to_check):
with get_db_session() as session:
statement = select(PersonInfo.person_id).where(col(PersonInfo.person_name) == name_to_check)
return session.exec(statement).first() is not None
if await asyncio.to_thread(_db_check_name_exists_sync, generated_nickname):
is_duplicate = True
current_name_set.add(generated_nickname)
if not is_duplicate:
person.person_name = generated_nickname
person.name_reason = result.get("reason", "未提供理由")
person.sync_to_database()
logger.info(
f"成功给用户{user_nickname} {person_id} 取名 {generated_nickname},理由:{result.get('reason', '未提供理由')}"
)
self.person_name_list[person_id] = generated_nickname
return result
else:
if existing_names_str:
existing_names_str += ""
existing_names_str += generated_nickname
logger.debug(f"生成的昵称 {generated_nickname} 已存在,重试中...")
current_try += 1
# 如果多次尝试后仍未成功,使用唯一的 user_nickname 作为默认值
unique_nickname = await self._generate_unique_person_name(user_nickname)
logger.warning(f"{max_retries}次尝试后未能生成唯一昵称,使用默认昵称 {unique_nickname}")
person.person_name = unique_nickname
person.name_reason = "使用用户原始昵称作为默认值"
person.sync_to_database()
self.person_name_list[person_id] = unique_nickname
return {"nickname": unique_nickname, "reason": "使用用户原始昵称作为默认值"}
person_info_manager = PersonInfoManager()
async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None:
"""将人物事实写入长期记忆系统。