Final Commit Before Rdev

This commit is contained in:
UnCLAS-Prommer
2026-03-11 00:14:18 +08:00
committed by SengokuCola
parent e1e296491c
commit 8b9cda4296
10 changed files with 662 additions and 1348 deletions

View File

@@ -1,3 +1,4 @@
# TODO: 完全删除此文件,将所有方法该合并的合并。
import time
import random
import re
@@ -19,7 +20,7 @@ install(extra_lines=3)
logger = get_logger("chat_message_builder")
def replace_user_references(
def replace_user_references( # TODO: 整合此函数
content: Optional[str],
platform: str,
name_resolver: Optional[Callable[[str, str], str]] = None,
@@ -262,102 +263,103 @@ def get_actions_by_timestamp_with_chat_inclusive(
return [action.model_dump() for action in actions]
def get_raw_msg_by_timestamp_random(
timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[DatabaseMessages]:
"""
先在范围时间戳内随机选择一条消息取得消息的chat_id然后根据chat_id获取该聊天在指定时间戳范围内的消息
"""
# 获取所有消息只取chat_id字段
all_msgs = get_raw_msg_by_timestamp(timestamp_start, timestamp_end)
if not all_msgs:
return []
# 随机选一条
msg = random.choice(all_msgs)
chat_id = msg.chat_id
timestamp_start = msg.time
# 用 chat_id 获取该聊天在指定时间戳范围内的消息
return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, "earliest")
# TODO: 整合为统一函数,由参数控制,仿照 build_readable_message。
# def get_raw_msg_by_timestamp_random(
# timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
# ) -> List[DatabaseMessages]:
# """
# 先在范围时间戳内随机选择一条消息取得消息的chat_id然后根据chat_id获取该聊天在指定时间戳范围内的消息
# """
# # 获取所有消息只取chat_id字段
# all_msgs = get_raw_msg_by_timestamp(timestamp_start, timestamp_end)
# if not all_msgs:
# return []
# # 随机选一条
# msg = random.choice(all_msgs)
# chat_id = msg.chat_id
# timestamp_start = msg.time
# # 用 chat_id 获取该聊天在指定时间戳范围内的消息
# return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, "earliest")
def get_raw_msg_by_timestamp_with_users(
timestamp_start: float, timestamp_end: float, person_ids: List[str], limit: int = 0, limit_mode: str = "latest"
) -> List[DatabaseMessages]:
"""获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
"""
filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}}
# 只有当 limit 为 0 时才应用外部 sort
sort_order = [("time", 1)] if limit == 0 else None
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
# def get_raw_msg_by_timestamp_with_users(
# timestamp_start: float, timestamp_end: float, person_ids: List[str], limit: int = 0, limit_mode: str = "latest"
# ) -> List[DatabaseMessages]:
# """获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
# limit: 限制返回的消息数量0为不限制
# limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。
# """
# filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}}
# # 只有当 limit 为 0 时才应用外部 sort
# sort_order = [("time", 1)] if limit == 0 else None
# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""
filter_query = {"time": {"$lt": timestamp}}
sort_order = [("time", 1)]
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
# def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[DatabaseMessages]:
# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
# limit: 限制返回的消息数量0为不限制
# """
# filter_query = {"time": {"$lt": timestamp}}
# sort_order = [("time", 1)]
# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
def get_raw_msg_before_timestamp_with_chat(
chat_id: str, timestamp: float, limit: int = 0, filter_intercept_message_level: Optional[int] = None
) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""
filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}}
sort_order = [("time", 1)]
return find_messages(
message_filter=filter_query,
sort=sort_order,
limit=limit,
filter_intercept_message_level=filter_intercept_message_level,
)
# def get_raw_msg_before_timestamp_with_chat(
# chat_id: str, timestamp: float, limit: int = 0, filter_intercept_message_level: Optional[int] = None
# ) -> List[DatabaseMessages]:
# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
# limit: 限制返回的消息数量0为不限制
# """
# filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}}
# sort_order = [("time", 1)]
# return find_messages(
# message_filter=filter_query,
# sort=sort_order,
# limit=limit,
# filter_intercept_message_level=filter_intercept_message_level,
# )
def get_raw_msg_before_timestamp_with_users(
timestamp: float, person_ids: List[str], limit: int = 0
) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""
filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}}
sort_order = [("time", 1)]
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
# def get_raw_msg_before_timestamp_with_users(
# timestamp: float, person_ids: List[str], limit: int = 0
# ) -> List[DatabaseMessages]:
# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
# limit: 限制返回的消息数量0为不限制
# """
# filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}}
# sort_order = [("time", 1)]
# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp_end: Optional[float] = None) -> int:
"""
检查特定聊天从 timestamp_start (不含) 到 timestamp_end (不含) 之间有多少新消息。
如果 timestamp_end 为 None则检查从 timestamp_start (不含) 到当前时间的消息。
"""
# 确定有效的结束时间戳
_timestamp_end = timestamp_end if timestamp_end is not None else time.time()
# def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp_end: Optional[float] = None) -> int:
# """
# 检查特定聊天从 timestamp_start (不含) 到 timestamp_end (不含) 之间有多少新消息。
# 如果 timestamp_end 为 None则检查从 timestamp_start (不含) 到当前时间的消息。
# """
# # 确定有效的结束时间戳
# _timestamp_end = timestamp_end if timestamp_end is not None else time.time()
# 确保 timestamp_start < _timestamp_end
if timestamp_start >= _timestamp_end:
# logger.warning(f"timestamp_start ({timestamp_start}) must be less than _timestamp_end ({_timestamp_end}). Returning 0.")
return 0 # 起始时间大于等于结束时间,没有新消息
# # 确保 timestamp_start < _timestamp_end
# if timestamp_start >= _timestamp_end:
# # logger.warning(f"timestamp_start ({timestamp_start}) must be less than _timestamp_end ({_timestamp_end}). Returning 0.")
# return 0 # 起始时间大于等于结束时间,没有新消息
filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}}
return count_messages(message_filter=filter_query)
# filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}}
# return count_messages(message_filter=filter_query)
def num_new_messages_since_with_users(
chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: List[str]
) -> int:
"""检查某些特定用户在特定聊天在指定时间戳之间有多少新消息"""
if not person_ids: # 保持空列表检查
return 0
filter_query = {
"chat_id": chat_id,
"time": {"$gt": timestamp_start, "$lt": timestamp_end},
"user_id": {"$in": person_ids},
}
return count_messages(message_filter=filter_query)
# def num_new_messages_since_with_users(
# chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: List[str]
# ) -> int:
# """检查某些特定用户在特定聊天在指定时间戳之间有多少新消息"""
# if not person_ids: # 保持空列表检查
# return 0
# filter_query = {
# "chat_id": chat_id,
# "time": {"$gt": timestamp_start, "$lt": timestamp_end},
# "user_id": {"$in": person_ids},
# }
# return count_messages(message_filter=filter_query)
def _build_readable_messages_internal(
@@ -563,40 +565,41 @@ def _build_readable_messages_internal(
)
def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str:
# sourcery skip: use-contextlib-suppress
"""
构建图片映射信息字符串,显示图片的具体描述内容
# 由MessageUtils._extract_pictures_from_message替代
# def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str:
# # sourcery skip: use-contextlib-suppress
# """
# 构建图片映射信息字符串,显示图片的具体描述内容
Args:
pic_id_mapping: 图片ID到显示名称的映射字典
# Args:
# pic_id_mapping: 图片ID到显示名称的映射字典
Returns:
格式化的映射信息字符串
"""
if not pic_id_mapping:
return ""
# Returns:
# 格式化的映射信息字符串
# """
# if not pic_id_mapping:
# return ""
mapping_lines = []
# mapping_lines = []
# 按图片编号排序
sorted_items = sorted(pic_id_mapping.items(), key=lambda x: int(x[1].replace("图片", "")))
# # 按图片编号排序
# sorted_items = sorted(pic_id_mapping.items(), key=lambda x: int(x[1].replace("图片", "")))
for pic_id, display_name in sorted_items:
# 从数据库中获取图片描述
description = "内容正在阅读,请稍等"
try:
with get_db_session() as session:
image = session.get(Images, int(pic_id)) if pic_id.isdigit() else None
if image and image.description:
description = image.description
except Exception:
# 如果查询失败,保持默认描述
pass
# for pic_id, display_name in sorted_items:
# # 从数据库中获取图片描述
# description = "内容正在阅读,请稍等"
# try:
# with get_db_session() as session:
# image = session.get(Images, int(pic_id)) if pic_id.isdigit() else None
# if image and image.description:
# description = image.description
# except Exception:
# # 如果查询失败,保持默认描述
# pass
mapping_lines.append(f"[{display_name}] 的内容:{description}")
# mapping_lines.append(f"[{display_name}] 的内容:{description}")
return "\n".join(mapping_lines)
# return "\n".join(mapping_lines)
def build_readable_actions(actions: List[DatabaseActionRecords], mode: str = "relative") -> str:
@@ -646,68 +649,69 @@ def build_readable_actions(actions: List[DatabaseActionRecords], mode: str = "re
return "\n".join(output_lines)
async def build_readable_messages_with_list(
messages: List[DatabaseMessages],
replace_bot_name: bool = True,
timestamp_mode: str = "relative",
truncate: bool = False,
pic_single: bool = False,
) -> Tuple[str, List[Tuple[float, str, str]]]:
"""
将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
允许通过参数控制格式化行为。
"""
formatted_string, details_list, pic_id_mapping, _ = _build_readable_messages_internal(
messages,
replace_bot_name,
timestamp_mode,
truncate,
pic_id_mapping=None,
pic_counter=1,
show_pic=True,
message_id_list=None,
pic_single=pic_single,
long_time_notice=False,
)
# 由MessageUtils里面的build_readable_message替代
# async def build_readable_messages_with_list(
# messages: List[DatabaseMessages],
# replace_bot_name: bool = True,
# timestamp_mode: str = "relative",
# truncate: bool = False,
# pic_single: bool = False,
# ) -> Tuple[str, List[Tuple[float, str, str]]]:
# """
# 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
# 允许通过参数控制格式化行为。
# """
# formatted_string, details_list, pic_id_mapping, _ = _build_readable_messages_internal(
# messages,
# replace_bot_name,
# timestamp_mode,
# truncate,
# pic_id_mapping=None,
# pic_counter=1,
# show_pic=True,
# message_id_list=None,
# pic_single=pic_single,
# long_time_notice=False,
# )
if not pic_single:
if pic_mapping_info := build_pic_mapping_info(pic_id_mapping):
formatted_string = f"{pic_mapping_info}\n\n{formatted_string}"
# if not pic_single:
# if pic_mapping_info := build_pic_mapping_info(pic_id_mapping):
# formatted_string = f"{pic_mapping_info}\n\n{formatted_string}"
return formatted_string, details_list
# return formatted_string, details_list
# 由MessageUtils里面的build_readable_message替代
# def build_readable_messages_with_id(
# messages: List[DatabaseMessages],
# replace_bot_name: bool = True,
# timestamp_mode: str = "relative",
# read_mark: float = 0.0,
# truncate: bool = False,
# show_actions: bool = False,
# show_pic: bool = True,
# remove_emoji_stickers: bool = False,
# pic_single: bool = False,
# ) -> Tuple[str, List[Tuple[str, DatabaseMessages]]]:
# """
# 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
# 允许通过参数控制格式化行为。
# """
# message_id_list = assign_message_ids(messages)
def build_readable_messages_with_id(
messages: List[DatabaseMessages],
replace_bot_name: bool = True,
timestamp_mode: str = "relative",
read_mark: float = 0.0,
truncate: bool = False,
show_actions: bool = False,
show_pic: bool = True,
remove_emoji_stickers: bool = False,
pic_single: bool = False,
) -> Tuple[str, List[Tuple[str, DatabaseMessages]]]:
"""
将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
允许通过参数控制格式化行为。
"""
message_id_list = assign_message_ids(messages)
# formatted_string = build_readable_messages(
# messages=messages,
# replace_bot_name=replace_bot_name,
# timestamp_mode=timestamp_mode,
# truncate=truncate,
# show_actions=show_actions,
# show_pic=show_pic,
# read_mark=read_mark,
# message_id_list=message_id_list,
# remove_emoji_stickers=remove_emoji_stickers,
# pic_single=pic_single,
# )
formatted_string = build_readable_messages(
messages=messages,
replace_bot_name=replace_bot_name,
timestamp_mode=timestamp_mode,
truncate=truncate,
show_actions=show_actions,
show_pic=show_pic,
read_mark=read_mark,
message_id_list=message_id_list,
remove_emoji_stickers=remove_emoji_stickers,
pic_single=pic_single,
)
return formatted_string, message_id_list
# return formatted_string, message_id_list
def build_readable_messages(
@@ -903,111 +907,112 @@ def build_readable_messages(
return "".join(result_parts)
async def build_anonymous_messages(messages: List[DatabaseMessages], show_ids: bool = False) -> str:
"""
构建匿名可读消息将不同人的名称转为唯一占位符A、B、C...bot自己用SELF。
处理 回复<aaa:bbb> 和 @<aaa:bbb> 字段将bbb映射为匿名占位符。
"""
if not messages:
logger.warning("没有消息,无法构建匿名消息")
return ""
# 由MessageUtils里面的build_readable_message替代
# async def build_anonymous_messages(messages: List[DatabaseMessages], show_ids: bool = False) -> str:
# """
# 构建匿名可读消息将不同人的名称转为唯一占位符A、B、C...bot自己用SELF。
# 处理 回复<aaa:bbb> 和 @<aaa:bbb> 字段将bbb映射为匿名占位符。
# """
# if not messages:
# logger.warning("没有消息,无法构建匿名消息")
# return ""
person_map = {}
current_char = ord("A")
output_lines = []
# person_map = {}
# current_char = ord("A")
# output_lines = []
# 图片ID映射字典
pic_id_mapping = {}
pic_counter = 1
# # 图片ID映射字典
# pic_id_mapping = {}
# pic_counter = 1
def process_pic_ids(content: str) -> str:
"""处理内容中的图片ID将其替换为[图片x]格式"""
nonlocal pic_counter
# def process_pic_ids(content: str) -> str:
# """处理内容中的图片ID将其替换为[图片x]格式"""
# nonlocal pic_counter
# 匹配 [picid:xxxxx] 格式
pic_pattern = r"\[picid:([^\]]+)\]"
# # 匹配 [picid:xxxxx] 格式
# pic_pattern = r"\[picid:([^\]]+)\]"
def replace_pic_id(match):
nonlocal pic_counter
pic_id = match.group(1)
# def replace_pic_id(match):
# nonlocal pic_counter
# pic_id = match.group(1)
if pic_id not in pic_id_mapping:
pic_id_mapping[pic_id] = f"图片{pic_counter}"
pic_counter += 1
# if pic_id not in pic_id_mapping:
# pic_id_mapping[pic_id] = f"图片{pic_counter}"
# pic_counter += 1
return f"[{pic_id_mapping[pic_id]}]"
# return f"[{pic_id_mapping[pic_id]}]"
return re.sub(pic_pattern, replace_pic_id, content)
# return re.sub(pic_pattern, replace_pic_id, content)
def get_anon_name(platform, user_id):
# print(f"get_anon_name: platform:{platform}, user_id:{user_id}")
# print(f"global_config.bot.qq_account:{global_config.bot.qq_account}")
# def get_anon_name(platform, user_id):
# # print(f"get_anon_name: platform:{platform}, user_id:{user_id}")
# # print(f"global_config.bot.qq_account:{global_config.bot.qq_account}")
if (platform == "qq" and user_id == global_config.bot.qq_account) or (
platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", "")
):
# print("SELF11111111111111")
return "SELF"
try:
person_id = get_person_id(platform, user_id)
except Exception as _e:
person_id = None
if not person_id:
return "?"
if person_id not in person_map:
nonlocal current_char
person_map[person_id] = chr(current_char)
current_char += 1
return person_map[person_id]
# if (platform == "qq" and user_id == global_config.bot.qq_account) or (
# platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", "")
# ):
# # print("SELF11111111111111")
# return "SELF"
# try:
# person_id = get_person_id(platform, user_id)
# except Exception as _e:
# person_id = None
# if not person_id:
# return "?"
# if person_id not in person_map:
# nonlocal current_char
# person_map[person_id] = chr(current_char)
# current_char += 1
# return person_map[person_id]
for i, msg in enumerate(messages):
try:
platform = msg.chat_info.platform
user_id = msg.user_info.user_id
content = msg.display_message or msg.processed_plain_text or ""
# for i, msg in enumerate(messages):
# try:
# platform = msg.chat_info.platform
# user_id = msg.user_info.user_id
# content = msg.display_message or msg.processed_plain_text or ""
# 处理图片ID
content = process_pic_ids(content)
# # 处理图片ID
# content = process_pic_ids(content)
anon_name = get_anon_name(platform, user_id)
# print(f"anon_name:{anon_name}")
# anon_name = get_anon_name(platform, user_id)
# # print(f"anon_name:{anon_name}")
# 使用独立函数处理用户引用格式,传入自定义的匿名名称解析器
def anon_name_resolver(platform: str, user_id: str) -> str:
try:
return get_anon_name(platform, user_id)
except Exception:
return "?"
# # 使用独立函数处理用户引用格式,传入自定义的匿名名称解析器
# def anon_name_resolver(platform: str, user_id: str) -> str:
# try:
# return get_anon_name(platform, user_id)
# except Exception:
# return "?"
content = replace_user_references(content, platform, anon_name_resolver, replace_bot_name=False)
# content = replace_user_references(content, platform, anon_name_resolver, replace_bot_name=False)
# 构建消息头如果启用show_ids则添加序号
if show_ids:
header = f"[{i + 1}] {anon_name}"
else:
header = f"{anon_name}"
# # 构建消息头如果启用show_ids则添加序号
# if show_ids:
# header = f"[{i + 1}] {anon_name}说 "
# else:
# header = f"{anon_name}说 "
output_lines.append(header)
stripped_line = content.strip()
if stripped_line:
if stripped_line.endswith(""):
stripped_line = stripped_line[:-1]
output_lines.append(f"{stripped_line}")
# print(f"output_lines:{output_lines}")
output_lines.append("\n")
except Exception:
continue
# output_lines.append(header)
# stripped_line = content.strip()
# if stripped_line:
# if stripped_line.endswith("。"):
# stripped_line = stripped_line[:-1]
# output_lines.append(f"{stripped_line}")
# # print(f"output_lines:{output_lines}")
# output_lines.append("\n")
# except Exception:
# continue
# 在最前面添加图片映射信息
final_output_lines = []
pic_mapping_info = build_pic_mapping_info(pic_id_mapping)
if pic_mapping_info:
final_output_lines.append(pic_mapping_info)
final_output_lines.append("\n\n")
# # 在最前面添加图片映射信息
# final_output_lines = []
# pic_mapping_info = build_pic_mapping_info(pic_id_mapping)
# if pic_mapping_info:
# final_output_lines.append(pic_mapping_info)
# final_output_lines.append("\n\n")
final_output_lines.extend(output_lines)
formatted_string = "".join(final_output_lines).strip()
return formatted_string
# final_output_lines.extend(output_lines)
# formatted_string = "".join(final_output_lines).strip()
# return formatted_string
async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]:

View File

@@ -523,7 +523,7 @@ def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese
def calculate_typing_time(
input_string: str,
thinking_start_time: float,
# thinking_start_time: float,
chinese_time: float = 0.3,
english_time: float = 0.15,
is_emoji: bool = False,
@@ -556,8 +556,8 @@ def calculate_typing_time(
if is_emoji:
total_time = 1
if time.time() - thinking_start_time > 10:
total_time = 1
# if time.time() - thinking_start_time > 10:
# total_time = 1
# print(f"thinking_start_time:{thinking_start_time}")
# print(f"nowtime:{time.time()}")