From f17b85c1bd8e715a6e55b67e6605b7b65a0e2f2f Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Wed, 11 Mar 2026 22:28:13 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4chat=5Fmessage=5Fbuilder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bw_learner/expression_learner.py | 9 +- src/chat/utils/chat_message_builder.py | 1083 ----------------- .../data_models/action_record_data_model.py | 65 + src/common/utils/system_utils.py | 8 + src/common/utils/utils_action.py | 32 + src/common/utils/utils_image.py | 2 +- src/common/utils/utils_message.py | 101 +- src/common/utils/utils_voice.py | 2 +- 8 files changed, 209 insertions(+), 1093 deletions(-) delete mode 100644 src/chat/utils/chat_message_builder.py create mode 100644 src/common/data_models/action_record_data_model.py create mode 100644 src/common/utils/system_utils.py create mode 100644 src/common/utils/utils_action.py diff --git a/src/bw_learner/expression_learner.py b/src/bw_learner/expression_learner.py index abe46cbd..37ac5a68 100644 --- a/src/bw_learner/expression_learner.py +++ b/src/bw_learner/expression_learner.py @@ -88,7 +88,14 @@ class ExpressionLearner: def _check_cached_jargons_in_messages(self, jargon_miner: Optional["JargonMiner"] = None): if not jargon_miner: return [] - # TODO: 完成检测逻辑 + # 获取缓存的所有jargon实例 + cached_jargons = jargon_miner.get_cached_jargons() + if not cached_jargons: + return [] + matched_entries: List[Tuple[str, str]] = [] + + for i, msg in enumerate(self._messages_cache): + if # ====== DB 操作相关 ====== async def _upsert_expression_to_db(self, situation: str, style: str): diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py deleted file mode 100644 index f4d92eeb..00000000 --- a/src/chat/utils/chat_message_builder.py +++ /dev/null @@ -1,1083 +0,0 @@ -# TODO: 完全删除此文件,将所有方法该合并的合并。 -import time -import random -import re -from datetime import datetime -from typing import List, Dict, Any, Tuple, Optional, Callable -from rich.traceback import install - -from sqlmodel import select, col -from src.config.config import global_config -from src.common.logger import get_logger -from src.common.message_repository import find_messages, count_messages -from src.common.data_models.database_data_model import DatabaseMessages, DatabaseActionRecords -from src.common.database.database import get_db_session -from src.common.database.database_model import ActionRecord, Images -from src.person_info.person_info import Person, get_person_id -from src.chat.utils.utils import translate_timestamp_to_human_readable, assign_message_ids, is_bot_self - -install(extra_lines=3) -logger = get_logger("chat_message_builder") - - -def replace_user_references( # TODO: 整合此函数 - content: Optional[str], - platform: str, - name_resolver: Optional[Callable[[str, str], str]] = None, - replace_bot_name: bool = True, -) -> str: - """ - 替换内容中的用户引用格式,包括回复和@格式 - - Args: - content: 要处理的内容字符串 - platform: 平台标识 - name_resolver: 名称解析函数,接收(platform, user_id)参数,返回用户名称 - 如果为None,则使用默认的person_info_manager - replace_bot_name: 是否将机器人的user_id替换为"机器人昵称(你)" - - Returns: - str: 处理后的内容字符串 - """ - if not content: - return "" - if name_resolver is None: - - def default_resolver(platform: str, user_id: str) -> str: - # 检查是否是机器人自己(支持多平台,包括 WebUI) - if replace_bot_name and is_bot_self(platform, user_id): - return f"{global_config.bot.nickname}(你)" - person = Person(platform=platform, user_id=user_id) - return person.person_name or user_id # type: ignore - - name_resolver = default_resolver - - # 处理回复格式 - reply_pattern = r"回复<([^:<>]+):([^:<>]+)>" - match = re.search(reply_pattern, content) - if match: - aaa = match[1] - bbb = match[2] - try: - # 检查是否是机器人自己(支持多平台,包括 WebUI) - if replace_bot_name and is_bot_self(platform, bbb): - reply_person_name = f"{global_config.bot.nickname}(你)" - else: - reply_person_name = name_resolver(platform, bbb) or aaa - content = re.sub(reply_pattern, f"回复 {reply_person_name}", content, count=1) - except Exception: - # 如果解析失败,使用原始昵称 - content = re.sub(reply_pattern, f"回复 {aaa}", content, count=1) - - # 处理@格式 - at_pattern = r"@<([^:<>]+):([^:<>]+)>" - at_matches = list(re.finditer(at_pattern, content)) - if at_matches: - new_content = "" - last_end = 0 - for m in at_matches: - new_content += content[last_end : m.start()] - aaa = m.group(1) - bbb = m.group(2) - try: - # 检查是否是机器人自己 - if replace_bot_name and bbb == global_config.bot.qq_account: - at_person_name = f"{global_config.bot.nickname}(你)" - else: - at_person_name = name_resolver(platform, bbb) or aaa - new_content += f"@{at_person_name}" - except Exception: - # 如果解析失败,使用原始昵称 - new_content += f"@{aaa}" - last_end = m.end() - new_content += content[last_end:] - content = new_content - - # Telegram 文本 @username 的显示映射交由适配器或平台层处理;此处不做硬编码替换 - - return content - - -def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"): - """ - 获取从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 - limit: 限制返回的消息数量,0为不限制 - limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 - """ - filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}} - # 只有当 limit 为 0 时才应用外部 sort - sort_order = [("time", 1)] if limit == 0 else None - return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) - - -def get_raw_msg_by_timestamp_with_chat( - chat_id: str, - timestamp_start: float, - timestamp_end: float, - limit: int = 0, - limit_mode: str = "latest", - filter_bot=False, - filter_command=False, - filter_intercept_message_level: Optional[int] = None, -) -> List[DatabaseMessages]: - """获取在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 - limit: 限制返回的消息数量,0为不限制 - limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 - """ - filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}} - # 只有当 limit 为 0 时才应用外部 sort - sort_order = [("time", 1)] if limit == 0 else None - # 直接将 limit_mode 传递给 find_messages - # print(f"get_raw_msg_by_timestamp_with_chat: {chat_id}, {timestamp_start}, {timestamp_end}, {limit}, {limit_mode}, {filter_bot}, {filter_command}") - return find_messages( - message_filter=filter_query, - sort=sort_order, - limit=limit, - limit_mode=limit_mode, - filter_bot=filter_bot, - filter_command=filter_command, - filter_intercept_message_level=filter_intercept_message_level, - ) - - -def get_raw_msg_by_timestamp_with_chat_inclusive( - chat_id: str, - timestamp_start: float, - timestamp_end: float, - limit: int = 0, - limit_mode: str = "latest", - filter_bot=False, - filter_command=False, - filter_intercept_message_level: Optional[int] = None, -) -> List[DatabaseMessages]: - """获取在特定聊天从指定时间戳到指定时间戳的消息(包含边界),按时间升序排序,返回消息列表 - limit: 限制返回的消息数量,0为不限制 - limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 - """ - filter_query = {"chat_id": chat_id, "time": {"$gte": timestamp_start, "$lte": timestamp_end}} - # 只有当 limit 为 0 时才应用外部 sort - sort_order = [("time", 1)] if limit == 0 else None - # 直接将 limit_mode 传递给 find_messages - return find_messages( - message_filter=filter_query, - sort=sort_order, - limit=limit, - limit_mode=limit_mode, - filter_bot=filter_bot, - filter_command=filter_command, - filter_intercept_message_level=filter_intercept_message_level, - ) - - -def get_raw_msg_by_timestamp_with_chat_users( - chat_id: str, - timestamp_start: float, - timestamp_end: float, - person_ids: List[str], - limit: int = 0, - limit_mode: str = "latest", -) -> List[DatabaseMessages]: - """获取某些特定用户在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 - limit: 限制返回的消息数量,0为不限制 - limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 - """ - filter_query = { - "chat_id": chat_id, - "time": {"$gt": timestamp_start, "$lt": timestamp_end}, - "user_id": {"$in": person_ids}, - } - # 只有当 limit 为 0 时才应用外部 sort - sort_order = [("time", 1)] if limit == 0 else None - return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) - - -def get_actions_by_timestamp_with_chat( - chat_id: str, - timestamp_start: float = 0, - timestamp_end: float = time.time(), - limit: int = 0, - limit_mode: str = "latest", -) -> List[DatabaseActionRecords]: - """获取在特定聊天从指定时间戳到指定时间戳的动作记录,按时间升序排序,返回动作记录列表""" - with get_db_session() as session: - statement = ( - select(ActionRecord) - .where((col(ActionRecord.session_id) == chat_id)) - .where(col(ActionRecord.timestamp) > datetime.fromtimestamp(timestamp_start)) - .where(col(ActionRecord.timestamp) < datetime.fromtimestamp(timestamp_end)) - ) - - if limit > 0: - if limit_mode == "latest": - statement = statement.order_by(col(ActionRecord.timestamp).desc()).limit(limit) - actions = list(session.exec(statement).all()) - actions = list(reversed(actions)) - else: - statement = statement.order_by(col(ActionRecord.timestamp)).limit(limit) - actions = list(session.exec(statement).all()) - else: - statement = statement.order_by(col(ActionRecord.timestamp)) - actions = session.exec(statement).all() - return [ - DatabaseActionRecords( - action_id=action.action_id, - time=action.timestamp.timestamp(), - action_name=action.action_name, - action_data=action.action_data or "{}", - action_done=True, - action_build_into_prompt=bool(action.action_display_prompt), - action_prompt_display=action.action_display_prompt or "", - chat_id=action.session_id, - chat_info_stream_id=action.session_id, - chat_info_platform=global_config.bot.platform, - action_reasoning=action.action_reasoning or "", - ) - for action in actions - ] - - -def get_actions_by_timestamp_with_chat_inclusive( - chat_id: str, timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest" -) -> List[Dict[str, Any]]: - """获取在特定聊天从指定时间戳到指定时间戳的动作记录(包含边界),按时间升序排序,返回动作记录列表""" - with get_db_session() as session: - statement = ( - select(ActionRecord) - .where((col(ActionRecord.session_id) == chat_id)) - .where(col(ActionRecord.timestamp) >= datetime.fromtimestamp(timestamp_start)) - .where(col(ActionRecord.timestamp) <= datetime.fromtimestamp(timestamp_end)) - ) - - if limit > 0: - if limit_mode == "latest": - statement = statement.order_by(col(ActionRecord.timestamp).desc()).limit(limit) - actions = list(session.exec(statement).all()) - actions = list(reversed(actions)) - else: - statement = statement.order_by(col(ActionRecord.timestamp)).limit(limit) - actions = list(session.exec(statement).all()) - else: - statement = statement.order_by(col(ActionRecord.timestamp)) - actions = session.exec(statement).all() - - return [action.model_dump() for action in actions] - - -# TODO: 整合为统一函数,由参数控制(仿照build_readable_message) -# def get_raw_msg_by_timestamp_random( -# timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest" -# ) -> List[DatabaseMessages]: -# """ -# 先在范围时间戳内随机选择一条消息,取得消息的chat_id,然后根据chat_id获取该聊天在指定时间戳范围内的消息 -# """ -# # 获取所有消息,只取chat_id字段 -# all_msgs = get_raw_msg_by_timestamp(timestamp_start, timestamp_end) -# if not all_msgs: -# return [] -# # 随机选一条 -# msg = random.choice(all_msgs) -# chat_id = msg.chat_id -# timestamp_start = msg.time -# # 用 chat_id 获取该聊天在指定时间戳范围内的消息 -# return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, "earliest") - - -# def get_raw_msg_by_timestamp_with_users( -# timestamp_start: float, timestamp_end: float, person_ids: List[str], limit: int = 0, limit_mode: str = "latest" -# ) -> List[DatabaseMessages]: -# """获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 -# limit: 限制返回的消息数量,0为不限制 -# limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 -# """ -# filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} -# # 只有当 limit 为 0 时才应用外部 sort -# sort_order = [("time", 1)] if limit == 0 else None -# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) - - -# def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[DatabaseMessages]: -# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 -# limit: 限制返回的消息数量,0为不限制 -# """ -# filter_query = {"time": {"$lt": timestamp}} -# sort_order = [("time", 1)] -# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit) - - -# def get_raw_msg_before_timestamp_with_chat( -# chat_id: str, timestamp: float, limit: int = 0, filter_intercept_message_level: Optional[int] = None -# ) -> List[DatabaseMessages]: -# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 -# limit: 限制返回的消息数量,0为不限制 -# """ -# filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}} -# sort_order = [("time", 1)] -# return find_messages( -# message_filter=filter_query, -# sort=sort_order, -# limit=limit, -# filter_intercept_message_level=filter_intercept_message_level, -# ) - - -# def get_raw_msg_before_timestamp_with_users( -# timestamp: float, person_ids: List[str], limit: int = 0 -# ) -> List[DatabaseMessages]: -# """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 -# limit: 限制返回的消息数量,0为不限制 -# """ -# filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}} -# sort_order = [("time", 1)] -# return find_messages(message_filter=filter_query, sort=sort_order, limit=limit) - - -# def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp_end: Optional[float] = None) -> int: -# """ -# 检查特定聊天从 timestamp_start (不含) 到 timestamp_end (不含) 之间有多少新消息。 -# 如果 timestamp_end 为 None,则检查从 timestamp_start (不含) 到当前时间的消息。 -# """ -# # 确定有效的结束时间戳 -# _timestamp_end = timestamp_end if timestamp_end is not None else time.time() - -# # 确保 timestamp_start < _timestamp_end -# if timestamp_start >= _timestamp_end: -# # logger.warning(f"timestamp_start ({timestamp_start}) must be less than _timestamp_end ({_timestamp_end}). Returning 0.") -# return 0 # 起始时间大于等于结束时间,没有新消息 - -# filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}} -# return count_messages(message_filter=filter_query) - - -# def num_new_messages_since_with_users( -# chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: List[str] -# ) -> int: -# """检查某些特定用户在特定聊天在指定时间戳之间有多少新消息""" -# if not person_ids: # 保持空列表检查 -# return 0 -# filter_query = { -# "chat_id": chat_id, -# "time": {"$gt": timestamp_start, "$lt": timestamp_end}, -# "user_id": {"$in": person_ids}, -# } -# return count_messages(message_filter=filter_query) - - -def _build_readable_messages_internal( - messages: List[DatabaseMessages], - replace_bot_name: bool = True, - timestamp_mode: str = "relative", - truncate: bool = False, - pic_id_mapping: Optional[Dict[str, str]] = None, - pic_counter: int = 1, - show_pic: bool = True, - message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None, - pic_single: bool = False, - long_time_notice: bool = False, -) -> Tuple[str, List[Tuple[float, str, str]], Dict[str, str], int]: - # sourcery skip: use-getitem-for-re-match-groups - """ - 内部辅助函数,构建可读消息字符串和原始消息详情列表。 - - Args: - messages: 消息字典列表。 - replace_bot_name: 是否将机器人的 user_id 替换为 "我"。 - merge_messages: 是否合并来自同一用户的连续消息。 - timestamp_mode: 时间戳的显示模式 ('relative', 'absolute', etc.)。传递给 translate_timestamp_to_human_readable。 - truncate: 是否根据消息的新旧程度截断过长的消息内容。 - pic_id_mapping: 图片ID映射字典,如果为None则创建新的 - pic_counter: 图片计数器起始值 - - Returns: - 包含格式化消息的字符串、原始消息详情列表、图片映射字典和更新后的计数器的元组。 - """ - if not messages: - return "", [], pic_id_mapping or {}, pic_counter - - detailed_messages_raw: List[Tuple[float, str, str, bool]] = [] - - # 使用传入的映射字典,如果没有则创建新的 - if pic_id_mapping is None: - pic_id_mapping = {} - current_pic_counter = pic_counter - pic_description_cache: Dict[str, str] = {} - - # 创建时间戳到消息ID的映射,用于在消息前添加[id]标识符 - timestamp_to_id_mapping: Dict[float, str] = {} - if message_id_list: - for msg_id, msg in message_id_list: - timestamp = msg.time - if timestamp is not None: - timestamp_to_id_mapping[timestamp] = msg_id - - def process_pic_ids(content: Optional[str]) -> str: - """处理内容中的图片ID,将其替换为[图片x]格式""" - if content is None: - logger.warning("Content is None when processing pic IDs.") - raise ValueError("Content is None") - - # 匹配 [picid:xxxxx] 格式 - pic_pattern = r"\[picid:([^\]]+)\]" - - def replace_pic_id(match: re.Match[str]) -> str: - nonlocal current_pic_counter - nonlocal pic_counter - pic_id = match.group(1) - if pic_single: - if pic_id not in pic_description_cache: - description = "内容正在阅读,请稍等" - try: - with get_db_session() as session: - image = session.get(Images, int(pic_id)) if pic_id.isdigit() else None - if image and image.description: - description = image.description - except Exception: - pass - pic_description_cache[pic_id] = description - return f"[图片:{pic_description_cache[pic_id]}]" - if pic_id not in pic_id_mapping: - pic_id_mapping[pic_id] = f"图片{current_pic_counter}" - current_pic_counter += 1 - - return f"[{pic_id_mapping[pic_id]}]" - - return re.sub(pic_pattern, replace_pic_id, content) - - # 1: 获取发送者信息并提取消息组件 - for message in messages: - user_info = message.user_info - platform = user_info.platform - user_id = user_info.user_id - user_nickname = user_info.user_nickname - user_cardname = user_info.user_cardname - - timestamp = message.time - content = message.display_message or message.processed_plain_text or "" - - # 处理图片ID - if show_pic: - content = process_pic_ids(content) - - # 检查必要信息是否存在 - if not all([platform, user_id, timestamp is not None]): - continue - - person = Person(platform=platform, user_id=user_id) - # 根据 replace_bot_name 参数决定是否替换机器人名称 - person_name = ( - person.person_name or f"{user_nickname}" or (f"昵称:{user_cardname}" if user_cardname else "某人") - ) - # 使用统一的 is_bot_self 函数判断是否是机器人自己(支持多平台,包括 WebUI) - if replace_bot_name and is_bot_self(platform, user_id): - person_name = f"{global_config.bot.nickname}(你)" - - # 使用独立函数处理用户引用格式 - if content := replace_user_references(content, platform, replace_bot_name=replace_bot_name): - if getattr(message, "is_command", False): - content = f"[is_command=True] {content}" - detailed_messages_raw.append((timestamp, person_name, content, False)) - - if not detailed_messages_raw: - return "", [], pic_id_mapping, current_pic_counter - - detailed_messages_raw.sort(key=lambda x: x[0]) # 按时间戳(第一个元素)升序排序,越早的消息排在前面 - detailed_message: List[Tuple[float, str, str, bool]] = [] - - # 2. 应用消息截断逻辑 - messages_count = len(detailed_messages_raw) - if truncate and messages_count > 0: - for i, (timestamp, name, content, is_action) in enumerate(detailed_messages_raw): - # 对于动作记录,不进行截断 - if is_action: - detailed_message.append((timestamp, name, content, is_action)) - continue - - percentile = i / messages_count # 计算消息在列表中的位置百分比 (0 <= percentile < 1) - original_len = len(content) - limit = -1 # 默认不截断 - - if percentile < 0.2: # 60% 之前的消息 (即最旧的 60%) - limit = 50 - replace_content = "......(记不清了)" - elif percentile < 0.5: # 60% 之前的消息 (即最旧的 60%) - limit = 100 - replace_content = "......(有点记不清了)" - elif percentile < 0.7: # 60% 到 80% 之前的消息 (即中间的 20%) - limit = 200 - replace_content = "......(内容太长了)" - elif percentile <= 1.0: # 80% 到 100% 之前的消息 (即较新的 20%) - limit = 400 - replace_content = "......(内容太长了)" - - truncated_content = content - if 0 < limit < original_len: - truncated_content = f"{content[:limit]}{replace_content}" # pyright: ignore[reportPossiblyUnboundVariable] - - detailed_message.append((timestamp, name, truncated_content, is_action)) - else: - # 如果不截断,直接使用原始列表 - detailed_message = detailed_messages_raw - - # 3: 格式化为字符串 - output_lines: List[str] = [] - - prev_timestamp: Optional[float] = None - for timestamp, name, content, _is_action in detailed_message: - # 检查是否需要插入长时间间隔提示 - if long_time_notice and prev_timestamp is not None: - time_diff = timestamp - prev_timestamp - time_diff_hours = time_diff / 3600 - - # 检查是否跨天 - prev_date = time.strftime("%Y-%m-%d", time.localtime(prev_timestamp)) - current_date = time.strftime("%Y-%m-%d", time.localtime(timestamp)) - is_cross_day = prev_date != current_date - - # 如果间隔大于8小时或跨天,插入提示 - if time_diff_hours > 8 or is_cross_day: - # 格式化日期为中文格式:xxxx年xx月xx日(去掉前导零) - current_time_struct = time.localtime(timestamp) - year = current_time_struct.tm_year - month = current_time_struct.tm_mon - day = current_time_struct.tm_mday - date_str = f"{year}年{month}月{day}日" - hours_str = f"{int(time_diff_hours)}h" - notice = f"以下聊天开始时间:{date_str}。距离上一条消息过去了{hours_str}\n" - output_lines.append(notice) - - readable_time = translate_timestamp_to_human_readable(timestamp, mode=timestamp_mode) - - # 查找消息id(如果有)并构建id_prefix - message_id = timestamp_to_id_mapping.get(timestamp, "") - id_prefix = f"[{message_id}]" if message_id else "" - - output_lines.append(f"{id_prefix}{readable_time}, {name}: {content}") - output_lines.append("\n") - prev_timestamp = timestamp - - formatted_string = "".join(output_lines).strip() - - # 返回格式化后的字符串、消息详情列表、图片映射字典和更新后的计数器 - return ( - formatted_string, - [(t, n, c) for t, n, c, is_action in detailed_message if not is_action], - pic_id_mapping, - current_pic_counter, - ) - - -# 由MessageUtils._extract_pictures_from_message替代 -# def build_pic_mapping_info(pic_id_mapping: Dict[str, str]) -> str: -# # sourcery skip: use-contextlib-suppress -# """ -# 构建图片映射信息字符串,显示图片的具体描述内容 - -# Args: -# pic_id_mapping: 图片ID到显示名称的映射字典 - -# Returns: -# 格式化的映射信息字符串 -# """ -# if not pic_id_mapping: -# return "" - -# mapping_lines = [] - -# # 按图片编号排序 -# sorted_items = sorted(pic_id_mapping.items(), key=lambda x: int(x[1].replace("图片", ""))) - -# for pic_id, display_name in sorted_items: -# # 从数据库中获取图片描述 -# description = "内容正在阅读,请稍等" -# try: -# with get_db_session() as session: -# image = session.get(Images, int(pic_id)) if pic_id.isdigit() else None -# if image and image.description: -# description = image.description -# except Exception: -# # 如果查询失败,保持默认描述 -# pass - -# mapping_lines.append(f"[{display_name}] 的内容:{description}") - -# return "\n".join(mapping_lines) - - -def build_readable_actions(actions: List[DatabaseActionRecords], mode: str = "relative") -> str: - """ - 将动作列表转换为可读的文本格式。 - 格式: 在()分钟前,你使用了(action_name),具体内容是:(action_prompt_display) - - Args: - actions: 动作记录字典列表。 - - Returns: - 格式化的动作字符串。 - """ - if not actions: - return "" - - output_lines = [] - current_time = time.time() - - for action in actions: - action_time = action.time or current_time - action_name = action.action_name or "未知动作" - # action_reason = action.get(action_data") - if action_name in ["no_reply", "no_reply"]: - continue - - action_prompt_display = action.action_prompt_display or "无具体内容" - - time_diff_seconds = current_time - action_time - if mode == "relative": - if time_diff_seconds < 60: - time_ago_str = f"在{int(time_diff_seconds)}秒前" - else: - time_diff_minutes = round(time_diff_seconds / 60) - time_ago_str = f"在{int(time_diff_minutes)}分钟前" - elif mode == "absolute": - # 转化为可读时间(仅保留时分秒,不包含日期) - action_time_struct = time.localtime(action_time) - time_str = time.strftime("%H:%M:%S", action_time_struct) - time_ago_str = f"在{time_str}" - else: - raise ValueError(f"Unsupported mode: {mode}") - - line = f"{time_ago_str},你使用了“{action_name}”,具体内容是:“{action_prompt_display}”" - output_lines.append(line) - - return "\n".join(output_lines) - - -# 由MessageUtils里面的build_readable_message替代 -# async def build_readable_messages_with_list( -# messages: List[DatabaseMessages], -# replace_bot_name: bool = True, -# timestamp_mode: str = "relative", -# truncate: bool = False, -# pic_single: bool = False, -# ) -> Tuple[str, List[Tuple[float, str, str]]]: -# """ -# 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。 -# 允许通过参数控制格式化行为。 -# """ -# formatted_string, details_list, pic_id_mapping, _ = _build_readable_messages_internal( -# messages, -# replace_bot_name, -# timestamp_mode, -# truncate, -# pic_id_mapping=None, -# pic_counter=1, -# show_pic=True, -# message_id_list=None, -# pic_single=pic_single, -# long_time_notice=False, -# ) - -# if not pic_single: -# if pic_mapping_info := build_pic_mapping_info(pic_id_mapping): -# formatted_string = f"{pic_mapping_info}\n\n{formatted_string}" - -# return formatted_string, details_list - -# 由MessageUtils里面的build_readable_message替代 -# def build_readable_messages_with_id( -# messages: List[DatabaseMessages], -# replace_bot_name: bool = True, -# timestamp_mode: str = "relative", -# read_mark: float = 0.0, -# truncate: bool = False, -# show_actions: bool = False, -# show_pic: bool = True, -# remove_emoji_stickers: bool = False, -# pic_single: bool = False, -# ) -> Tuple[str, List[Tuple[str, DatabaseMessages]]]: -# """ -# 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。 -# 允许通过参数控制格式化行为。 -# """ -# message_id_list = assign_message_ids(messages) - -# formatted_string = build_readable_messages( -# messages=messages, -# replace_bot_name=replace_bot_name, -# timestamp_mode=timestamp_mode, -# truncate=truncate, -# show_actions=show_actions, -# show_pic=show_pic, -# read_mark=read_mark, -# message_id_list=message_id_list, -# remove_emoji_stickers=remove_emoji_stickers, -# pic_single=pic_single, -# ) - -# return formatted_string, message_id_list - - -def build_readable_messages( - messages: List[DatabaseMessages], - replace_bot_name: bool = True, - timestamp_mode: str = "relative", - read_mark: float = 0.0, - truncate: bool = False, - show_actions: bool = False, - show_pic: bool = True, - message_id_list: Optional[List[Tuple[str, DatabaseMessages]]] = None, - remove_emoji_stickers: bool = False, - pic_single: bool = False, - long_time_notice: bool = False, -) -> str: # sourcery skip: extract-method - """ - 将消息列表转换为可读的文本格式。 - 如果提供了 read_mark,则在相应位置插入已读标记。 - 允许通过参数控制格式化行为。 - - Args: - messages: 消息列表 - replace_bot_name: 是否替换机器人名称为"你" - merge_messages: 是否合并连续消息 - timestamp_mode: 时间戳显示模式,"normal"或"normal_no_YMD"或"relative" - read_mark: 已读标记时间戳 - truncate: 是否截断长消息 - show_actions: 是否显示动作记录 - remove_emoji_stickers: 是否移除表情包并过滤空消息 - long_time_notice: 是否在消息间隔过长(>8小时)或跨天时插入时间提示 - """ - # WIP HERE and BELOW ---------------------------------------------- - # 创建messages的深拷贝,避免修改原始列表 - if not messages: - return "" - - # 如果启用移除表情包,先过滤消息 - if remove_emoji_stickers: - filtered_messages = [] - for msg in messages: - # 获取消息内容 - content = msg.processed_plain_text or "" - # 移除表情包 - emoji_pattern = r"\[表情包:[^\]]+\]" - content = re.sub(emoji_pattern, "", content) - - # 如果移除表情包后内容不为空,则保留消息 - if content.strip(): - filtered_messages.append(msg) - - messages = filtered_messages - - copy_messages: List[DatabaseMessages] = [] - for msg in messages: - if remove_emoji_stickers: - # 移除表情包 - msg.processed_plain_text = re.sub(r"\[表情包:[^\]]+\]", "", msg.processed_plain_text or "") - copy_messages.append(msg) - else: - copy_messages.append(msg) - - if show_actions and copy_messages: - # 获取所有消息的时间范围 - min_time = min(msg.time or 0 for msg in copy_messages) - max_time = max(msg.time or 0 for msg in copy_messages) - - # 从第一条消息中获取chat_id - chat_id = messages[0].chat_id if messages else None - - # 获取这个时间范围内的动作记录,并匹配chat_id - with get_db_session() as session: - actions_in_range = session.exec( - select(ActionRecord) - .where(col(ActionRecord.timestamp) >= datetime.fromtimestamp(min_time)) - .where(col(ActionRecord.timestamp) <= datetime.fromtimestamp(max_time)) - .where(col(ActionRecord.session_id) == chat_id) - .order_by(col(ActionRecord.timestamp)) - ).all() - - # 获取最新消息之后的第一个动作记录 - with get_db_session() as session: - action_after_latest = session.exec( - select(ActionRecord) - .where(col(ActionRecord.timestamp) > datetime.fromtimestamp(max_time)) - .where(col(ActionRecord.session_id) == chat_id) - .order_by(col(ActionRecord.timestamp)) - .limit(1) - ).all() - - # 合并两部分动作记录 - actions: List[ActionRecord] = list(actions_in_range) + list(action_after_latest) - - # 将动作记录转换为消息格式 - for action in actions: - # 只有当build_into_prompt为True时才添加动作记录 - action_display_prompt = action.action_display_prompt or "" - if action_display_prompt: - action_msg = DatabaseMessages( - message_id=f"action_{action.action_id}", - time=float(action.timestamp.timestamp()), - chat_id=chat_id or "", - processed_plain_text=action_display_prompt, - display_message=action_display_prompt, - user_platform=global_config.bot.platform, - user_id=str(global_config.bot.qq_account), - user_nickname=global_config.bot.nickname, - user_cardname="", - chat_info_platform=str(global_config.bot.platform), - chat_info_stream_id=chat_id or "", - ) - copy_messages.append(action_msg) - - # 重新按时间排序 - copy_messages.sort(key=lambda x: x.time or 0) - - if read_mark <= 0: - # 没有有效的 read_mark,直接格式化所有消息 - formatted_string, _, pic_id_mapping, _ = _build_readable_messages_internal( - copy_messages, - replace_bot_name, - timestamp_mode, - truncate, - show_pic=show_pic, - message_id_list=message_id_list, - pic_single=pic_single, - long_time_notice=long_time_notice, - ) - - if not pic_single: - pic_mapping_info = build_pic_mapping_info(pic_id_mapping) - if pic_mapping_info: - return f"{pic_mapping_info}\n\n{formatted_string}" - return formatted_string - else: - # 按 read_mark 分割消息 - messages_before_mark = [msg for msg in copy_messages if (msg.time or 0) <= read_mark] - messages_after_mark = [msg for msg in copy_messages if (msg.time or 0) > read_mark] - - # 共享的图片映射字典和计数器 - pic_id_mapping = {} - pic_counter = 1 - - # 分别格式化,但使用共享的图片映射 - formatted_before, _, pic_id_mapping, pic_counter = _build_readable_messages_internal( - messages_before_mark, - replace_bot_name, - timestamp_mode, - truncate, - pic_id_mapping, - pic_counter, - show_pic=show_pic, - message_id_list=message_id_list, - pic_single=pic_single, - long_time_notice=long_time_notice, - ) - formatted_after, _, pic_id_mapping, _ = _build_readable_messages_internal( - messages_after_mark, - replace_bot_name, - timestamp_mode, - False, - pic_id_mapping, - pic_counter, - show_pic=show_pic, - message_id_list=message_id_list, - pic_single=pic_single, - long_time_notice=long_time_notice, - ) - - read_mark_line = "\n--- 以上消息是你已经看过,请关注以下未读的新消息---\n" - - # 生成图片映射信息 - if not pic_single: - if pic_id_mapping: - pic_mapping_info = f"图片信息:\n{build_pic_mapping_info(pic_id_mapping)}\n聊天记录信息:\n" - else: - pic_mapping_info = "聊天记录信息:\n" - else: - pic_mapping_info = "" - - # 组合结果 - result_parts = [] - if pic_mapping_info: - result_parts.extend((pic_mapping_info, "\n")) - if formatted_before and formatted_after: - result_parts.extend([formatted_before, read_mark_line, formatted_after]) - elif formatted_before: - result_parts.extend([formatted_before, read_mark_line]) - elif formatted_after: - result_parts.extend([read_mark_line, formatted_after]) - else: - result_parts.append(read_mark_line.strip()) - - return "".join(result_parts) - - -# 由MessageUtils里面的build_readable_message替代 -# async def build_anonymous_messages(messages: List[DatabaseMessages], show_ids: bool = False) -> str: -# """ -# 构建匿名可读消息,将不同人的名称转为唯一占位符(A、B、C...),bot自己用SELF。 -# 处理 回复 和 @ 字段,将bbb映射为匿名占位符。 -# """ -# if not messages: -# logger.warning("没有消息,无法构建匿名消息") -# return "" - -# person_map = {} -# current_char = ord("A") -# output_lines = [] - -# # 图片ID映射字典 -# pic_id_mapping = {} -# pic_counter = 1 - -# def process_pic_ids(content: str) -> str: -# """处理内容中的图片ID,将其替换为[图片x]格式""" -# nonlocal pic_counter - -# # 匹配 [picid:xxxxx] 格式 -# pic_pattern = r"\[picid:([^\]]+)\]" - -# def replace_pic_id(match): -# nonlocal pic_counter -# pic_id = match.group(1) - -# if pic_id not in pic_id_mapping: -# pic_id_mapping[pic_id] = f"图片{pic_counter}" -# pic_counter += 1 - -# return f"[{pic_id_mapping[pic_id]}]" - -# return re.sub(pic_pattern, replace_pic_id, content) - -# def get_anon_name(platform, user_id): -# # print(f"get_anon_name: platform:{platform}, user_id:{user_id}") -# # print(f"global_config.bot.qq_account:{global_config.bot.qq_account}") - -# if (platform == "qq" and user_id == global_config.bot.qq_account) or ( -# platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", "") -# ): -# # print("SELF11111111111111") -# return "SELF" -# try: -# person_id = get_person_id(platform, user_id) -# except Exception as _e: -# person_id = None -# if not person_id: -# return "?" -# if person_id not in person_map: -# nonlocal current_char -# person_map[person_id] = chr(current_char) -# current_char += 1 -# return person_map[person_id] - -# for i, msg in enumerate(messages): -# try: -# platform = msg.chat_info.platform -# user_id = msg.user_info.user_id -# content = msg.display_message or msg.processed_plain_text or "" - -# # 处理图片ID -# content = process_pic_ids(content) - -# anon_name = get_anon_name(platform, user_id) -# # print(f"anon_name:{anon_name}") - -# # 使用独立函数处理用户引用格式,传入自定义的匿名名称解析器 -# def anon_name_resolver(platform: str, user_id: str) -> str: -# try: -# return get_anon_name(platform, user_id) -# except Exception: -# return "?" - -# content = replace_user_references(content, platform, anon_name_resolver, replace_bot_name=False) - -# # 构建消息头,如果启用show_ids则添加序号 -# if show_ids: -# header = f"[{i + 1}] {anon_name}说 " -# else: -# header = f"{anon_name}说 " - -# output_lines.append(header) -# stripped_line = content.strip() -# if stripped_line: -# if stripped_line.endswith("。"): -# stripped_line = stripped_line[:-1] -# output_lines.append(f"{stripped_line}") -# # print(f"output_lines:{output_lines}") -# output_lines.append("\n") -# except Exception: -# continue - -# # 在最前面添加图片映射信息 -# final_output_lines = [] -# pic_mapping_info = build_pic_mapping_info(pic_id_mapping) -# if pic_mapping_info: -# final_output_lines.append(pic_mapping_info) -# final_output_lines.append("\n\n") - -# final_output_lines.extend(output_lines) -# formatted_string = "".join(final_output_lines).strip() -# return formatted_string - - -async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]: - """ - 从消息列表中提取不重复的 person_id 列表 (忽略机器人自身)。 - - Args: - messages: 消息字典列表。 - - Returns: - 一个包含唯一 person_id 的列表。 - """ - person_ids_set = set() # 使用集合来自动去重 - - for msg in messages: - platform = msg.get("user_platform") or "" - user_id = msg.get("user_id") or "" - - # 检查必要信息是否存在 且 不是机器人自己 - if not all([platform, user_id]) or user_id == global_config.bot.qq_account: - continue - - if person_id := get_person_id(platform, user_id): - person_ids_set.add(person_id) - - return list(person_ids_set) # 将集合转换为列表返回 - - -async def build_bare_messages(messages: List[DatabaseMessages]) -> str: - """ - 构建简化版消息字符串,只包含processed_plain_text内容,不考虑用户名和时间戳 - - Args: - messages: 消息列表 - - Returns: - 只包含消息内容的字符串 - """ - if not messages: - return "" - - output_lines = [] - - for msg in messages: - # 获取纯文本内容 - content = msg.processed_plain_text or "" - - # 处理图片ID - pic_pattern = r"\[picid:[^\]]+\]" - - def replace_pic_id(match): - return "[图片]" - - content = re.sub(pic_pattern, replace_pic_id, content) - - # 处理用户引用格式,移除回复和@标记 - reply_pattern = r"回复<[^:<>]+:[^:<>]+>" - content = re.sub(reply_pattern, "回复[某人]", content) - - at_pattern = r"@<[^:<>]+:[^:<>]+>" - content = re.sub(at_pattern, "@[某人]", content) - - # 清理并添加到输出 - content = content.strip() - if content: - output_lines.append(content) - - return "\n".join(output_lines) diff --git a/src/common/data_models/action_record_data_model.py b/src/common/data_models/action_record_data_model.py new file mode 100644 index 00000000..be6529b3 --- /dev/null +++ b/src/common/data_models/action_record_data_model.py @@ -0,0 +1,65 @@ +from datetime import datetime +from typing import Optional, Dict + +import json + +from src.common.database.database_model import ActionRecord + +from . import BaseDatabaseDataModel + + +class MaiActionRecord(BaseDatabaseDataModel[ActionRecord]): + def __init__( + self, + action_id: str, + timestamp: datetime, + session_id: str, + action_name: str, + action_reasoning: Optional[str] = None, + action_data: Optional[Dict] = None, + action_builtin_prompt: Optional[str] = None, + action_display_prompt: Optional[str] = None, + ): + self.action_id = action_id + """动作ID""" + self.timestamp = timestamp + """时间戳""" + self.session_id = session_id + """会话ID""" + self.action_name = action_name + """动作名称""" + self.action_reasoning = action_reasoning + """动作推理过程""" + self.action_data = action_data or {} + """动作数据""" + self.action_builtin_prompt = action_builtin_prompt + """内置动作提示""" + self.action_display_prompt = action_display_prompt + """最终输入到 Prompt 的内容""" + + @classmethod + def from_db_instance(cls, db_record: ActionRecord): + """Create a data model object from a database record.""" + return cls( + action_id=db_record.action_id, + timestamp=db_record.timestamp, + session_id=db_record.session_id, + action_name=db_record.action_name, + action_reasoning=db_record.action_reasoning, + action_data=json.loads(db_record.action_data) if db_record.action_data else None, + action_builtin_prompt=db_record.action_builtin_prompt, + action_display_prompt=db_record.action_display_prompt, + ) + + def to_db_instance(self): + """Convert the data model object back to a database instance.""" + return ActionRecord( + action_id=self.action_id, + timestamp=self.timestamp, + session_id=self.session_id, + action_name=self.action_name, + action_reasoning=self.action_reasoning, + action_data=json.dumps(self.action_data) if self.action_data else None, + action_builtin_prompt=self.action_builtin_prompt, + action_display_prompt=self.action_display_prompt, + ) diff --git a/src/common/utils/system_utils.py b/src/common/utils/system_utils.py new file mode 100644 index 00000000..d956cb58 --- /dev/null +++ b/src/common/utils/system_utils.py @@ -0,0 +1,8 @@ +# TODO: 这个函数的实现非常临时,后续需要替换为更完善的实现,比如直接从配置文件中读取机器人自己的ID,或者通过API获取机器人自己的信息等 +def is_bot_self(user_id: str, platform: str) -> bool: + """ + 判断用户ID是否是机器人自己 + + 临时方法,后续会替换为更完善的实现 + """ + return user_id == "bot_self" and platform == "test_platform" diff --git a/src/common/utils/utils_action.py b/src/common/utils/utils_action.py new file mode 100644 index 00000000..c1fe7c28 --- /dev/null +++ b/src/common/utils/utils_action.py @@ -0,0 +1,32 @@ +from typing import TYPE_CHECKING, List + +from src.common.utils.math_utils import translate_timestamp_to_human_readable, TimestampMode + +if TYPE_CHECKING: + from src.common.data_models.action_record_data_model import MaiActionRecord + + +class ActionUtils: + @staticmethod + def build_readable_action_records(action_records: List["MaiActionRecord"], timestamp_mode: str | TimestampMode): + """ + 将动作列表转换为可读的文本格式。 + + 格式: `在`time`,你使用了`action_name`,具体内容是:`action_prompt_display` + + Args: + action_records: 动作记录字典列表。 + timestamp_mode: 时间戳模式。 + + Returns: + 格式化的动作字符串。 + """ + if not action_records: + return "" + + output_lines = [] + for record in action_records: + timestamp_str = translate_timestamp_to_human_readable(record.timestamp.timestamp(), mode=timestamp_mode) + line = f"在{timestamp_str},你使用了{record.action_name},具体内容是:{record.action_display_prompt}" + output_lines.append(line) + return "\n".join(output_lines) diff --git a/src/common/utils/utils_image.py b/src/common/utils/utils_image.py index 6132afd6..1257c518 100644 --- a/src/common/utils/utils_image.py +++ b/src/common/utils/utils_image.py @@ -8,7 +8,7 @@ import numpy as np from src.common.logger import get_logger -logger = get_logger("image") +logger = get_logger("image_utils") class ImageUtils: diff --git a/src/common/utils/utils_message.py b/src/common/utils/utils_message.py index e2ad4256..8c54e3ad 100644 --- a/src/common/utils/utils_message.py +++ b/src/common/utils/utils_message.py @@ -1,5 +1,6 @@ from maim_message import MessageBase, Seg from typing import List, Tuple, Optional, Dict, TYPE_CHECKING, Callable +from datetime import datetime import base64 import hashlib @@ -7,6 +8,8 @@ import msgpack import random import re +from sqlmodel import select, col + from src.common.data_models.message_component_data_model import ( MessageSequence, StandardMessageComponents, @@ -20,13 +23,17 @@ from src.common.data_models.message_component_data_model import ( UnknownUser, ForwardNodeComponent, ) +from src.common.logger import get_logger from src.config.config import global_config from .math_utils import number_to_short_id, TimestampMode, translate_timestamp_to_human_readable +from .system_utils import is_bot_self if TYPE_CHECKING: from src.chat.message_receive.message import SessionMessage +logger = get_logger("message_utils") + class MessageUtils: @staticmethod @@ -156,6 +163,7 @@ class MessageUtils: read_mark_time: Optional[float] = None, truncate_message: bool = False, truncate_func: Optional[Callable[[float], Tuple[Optional[int], str]]] = None, + show_actions: bool = False, ) -> Tuple[str, Dict[str, Tuple[str, str]], List[str]]: """ 将消息构建为LLM可读的文本格式 @@ -171,6 +179,7 @@ class MessageUtils: show_message_id_prefix (bool): 是否在每条消息前显示消息ID前缀 truncate_message (bool): 是否启用消息文本截断功能,截断过长的消息文本 truncate_func (Optional[Callable[[float], Tuple[Optional[int], str]]]) 截断函数,接受消息的百分位位置(0-1),返回一个元组(文本长度限制(可为None表不切割), 替换内容) + show_actions (bool): 是否显示Action组件内容 Returns: return (Tuple[str, Dict[str, Tuple[str, str]], List[str]]): 构建后的消息文本,映射表 {用户ID: (匿名ID, 原始名称)},消息编号列表 """ @@ -217,11 +226,30 @@ class MessageUtils: processed_plain_texts.extend(f"[表情{emoji_id}: {desc}]" for emoji_id, desc in emoji_map.values()) processed_plain_texts.extend(("", "聊天记录信息:")) + # 获取动作记录文本列表 + action_messages: List[Tuple[float, str]] = [] + if show_actions and messages: + min_time = msg_list[0].timestamp.timestamp() + max_time = msg_list[-1].timestamp.timestamp() + session_id = msg_list[0].session_id + action_messages = MessageUtils._generate_action_readable(min_time, max_time, session_id) + msg_count = len(msg_list) read_mark_added_flag: bool = False # 标记是否已经添加过已读标签,确保只添加一次 + action_idx: int = 0 # 动作记录的索引,用于双指针遍历 + for i, msg in enumerate(msg_list): await msg.process() plain_text: str = msg.processed_plain_text # type: ignore + msg_time = msg.timestamp.timestamp() + + # 使用双指针插入动作记录 + while action_idx < len(action_messages) and action_messages[action_idx][0] <= msg_time: + processed_plain_texts.append( + MessageUtils._build_action_str_single(action_messages[action_idx], timestamp_mode) + ) + action_idx += 1 + if truncate_message: # 消息截断逻辑 percentile = i / msg_count if not read_mark_time: # 没有已读标签 @@ -250,6 +278,13 @@ class MessageUtils: message_ids.append(message_id) processed_plain_texts.append("".join([header, plain_text])) + # 处理剩余的动作记录(时间在最后一条消息之后的动作) + while action_idx < len(action_messages): + processed_plain_texts.append( + MessageUtils._build_action_str_single(action_messages[action_idx], timestamp_mode) + ) + action_idx += 1 + return "\n".join(processed_plain_texts), user_id_mapping, message_ids @staticmethod @@ -531,12 +566,64 @@ class MessageUtils: ] return component + @staticmethod + def _generate_action_readable(min_time: float, max_time: float, session_id: str) -> List[Tuple[float, str]]: + """ + 获取消息时间范围内的动作记录,并构建动作文本列表 -# TODO: 这个函数的实现非常临时,后续需要替换为更完善的实现,比如直接从配置文件中读取机器人自己的ID,或者通过API获取机器人自己的信息等 -def is_bot_self(user_id: str, platform: str) -> bool: - """ - 判断用户ID是否是机器人自己 + Args: + messages: 消息列表,用于确定时间范围和session_id + timestamp_mode: 时间戳显示模式,默认为None表示不显示时间戳 - 临时方法,后续会替换为更完善的实现 - """ - return user_id == "bot_self" and platform == "test_platform" + Returns: + List[Tuple[float, str]]: 按时间排序的动作文本列表,每个元素为 (timestamp, action_text) + """ + from src.common.database.database import get_db_session + from src.common.database.database_model import ActionRecord + + # 获取这个时间范围内的动作记录,并匹配session_id + try: + with get_db_session() as session: + actions_in_range = session.exec( + select(ActionRecord) + .where(col(ActionRecord.timestamp) >= datetime.fromtimestamp(min_time)) + .where(col(ActionRecord.timestamp) <= datetime.fromtimestamp(max_time)) + .where(col(ActionRecord.session_id) == session_id) + .order_by(col(ActionRecord.timestamp)) + ).all() + + # 获取最新消息之后的第一个动作记录 + with get_db_session() as session: + action_after_latest = session.exec( + select(ActionRecord) + .where(col(ActionRecord.timestamp) > datetime.fromtimestamp(max_time)) + .where(col(ActionRecord.session_id) == session_id) + .order_by(col(ActionRecord.timestamp)) + .limit(1) + ).all() + except Exception as e: + logger.error(f"查询动作记录失败: {e}") + return [] + + # 合并两部分动作记录 + actions = list(actions_in_range) + list(action_after_latest) + + # 构建动作文本列表 + action_messages: List[Tuple[float, str]] = [] + for action in actions: + if action_display_prompt := action.action_display_prompt or "": + action_time = action.timestamp.timestamp() + action_messages.append((action_time, action_display_prompt)) + + return action_messages + + @staticmethod + def _build_action_str_single( + action_content: Tuple[float, str], timestamp_mode: Optional[str | TimestampMode] = None + ) -> str: + action_time, action_text = action_content + action_header = "你执行了: " + if timestamp_mode: + timestamp_str = translate_timestamp_to_human_readable(action_time, mode=timestamp_mode) + action_header = f"[{timestamp_str}] {action_header}" + return f"{action_header}{action_text}" diff --git a/src/common/utils/utils_voice.py b/src/common/utils/utils_voice.py index fafe19a0..651febf0 100644 --- a/src/common/utils/utils_voice.py +++ b/src/common/utils/utils_voice.py @@ -10,7 +10,7 @@ from src.llm_models.utils_model import LLMRequest install(extra_lines=3) -logger = get_logger("chat_voice") +logger = get_logger("voice_utils") # TODO: 在LLMRequest重构后修改这里 asr_model = LLMRequest(model_set=model_config.model_task_config.voice, request_type="audio")