# NOTE: Recovered from a whitespace-collapsed `git format-patch` e-mail
# (commit d03dc3601eec, UnCLAS-Prommer, 2026-03-04). Mail headers, diffstat and
# hunk markers are reduced to the banner comments below; indentation was
# reconstructed from syntax.
#
# ---- pytests/utils_test/message_utils_test.py (new file) ----
"""Tests for ``MessageUtils.build_readable_message``.

The modules under test are loaded straight from their source files after the
heavier project dependencies (logger, database, emoji/image managers, voice
and person utilities, config) have been replaced by the stubs defined here.
"""
import sys
from dataclasses import dataclass, field
import pytest
import importlib
import importlib.util
from types import ModuleType
from pathlib import Path
from datetime import datetime
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from src.common.data_models.message_component_data_model import MessageSequence, ForwardComponent
    from src.chat.message_receive.message import (
        SessionMessage,
        TextComponent,
        ImageComponent,
        EmojiComponent,
        VoiceComponent,
        AtComponent,
        ReplyComponent,
        ForwardNodeComponent,
    )

# Repository root: this file lives in <root>/pytests/utils_test/.
_REPO_ROOT = Path(__file__).parent.parent.parent

# Names copied from the loaded ``message`` module into this module's globals.
_MESSAGE_EXPORTS = (
    "SessionMessage",
    "TextComponent",
    "ImageComponent",
    "EmojiComponent",
    "VoiceComponent",
    "AtComponent",
    "ReplyComponent",
    "ForwardNodeComponent",
)


class DummyLogger:
    """Logger stand-in that prints each record and keeps it in ``logging_record``."""

    def __init__(self) -> None:
        self.logging_record = []

    def _record(self, level: str, msg) -> None:
        line = f"{level}: {msg}"
        print(line)
        self.logging_record.append(line)

    def debug(self, msg):
        self._record("DEBUG", msg)

    def info(self, msg):
        self._record("INFO", msg)

    def warning(self, msg):
        self._record("WARNING", msg)

    def error(self, msg):
        self._record("ERROR", msg)

    def critical(self, msg):
        self._record("CRITICAL", msg)


def get_logger(name):
    """Return a fresh ``DummyLogger``; ``name`` is ignored."""
    return DummyLogger()


class DummyDBSession:
    """Database session stub: a context manager whose queries return nothing."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def exec(self, statement):
        return self

    def first(self):
        return None

    def commit(self):
        pass

    def all(self):
        return []


def get_db_session():
    return DummyDBSession()


def get_manual_db_session():
    return DummyDBSession()


class DummySelect:
    """Chainable stand-in for a ``select(...)`` statement."""

    def __init__(self, model):
        self.model = model

    def filter_by(self, **kwargs):
        return self

    def where(self, condition):
        return self

    def limit(self, n):
        return self


def select(model):
    return DummySelect(model)


async def dummy_get_voice_text(binary_data):
    return None  # adjust to return simulated transcription text if needed


class DummyPersonUtils:
    @staticmethod
    def get_person_info_by_user_id_and_platform(user_id, platform):
        return None  # adjust to return simulated user info if needed


class DummyConfig:
    class MessageReceiveConfig:
        ban_words = set()
        ban_msgs_regex = set()

    message_receive = MessageReceiveConfig()


@dataclass
class UserInfo:
    user_id: str
    user_nickname: str
    user_cardname: Optional[str] = None


@dataclass
class GroupInfo:
    group_id: str
    group_name: str


@dataclass
class MessageInfo:
    user_info: UserInfo
    group_info: Optional[GroupInfo] = None
    additional_config: dict = field(default_factory=dict)


def setup_mocks(monkeypatch):
    """Install stub modules in ``sys.modules`` for the project dependencies."""

    def _stub_module(name: str) -> ModuleType:
        module = ModuleType(name)
        monkeypatch.setitem(sys.modules, name, module)
        return module

    logger_mod = _stub_module("src.common.logger")
    logger_mod.get_logger = get_logger

    db_mod = _stub_module("src.common.database.database")
    db_mod.get_db_session = get_db_session
    db_mod.get_manual_db_session = get_manual_db_session

    db_model_mod = _stub_module("src.common.database.database_model")
    db_model_mod.Messages = None  # extend with more attributes if a test needs them

    emoji_manager_mod = _stub_module("src.chat.emoji_system.emoji_manager")
    emoji_manager_mod.emoji_manager = None

    image_manager_mod = _stub_module("src.chat.image_system.image_manager")
    image_manager_mod.image_manager = None

    voice_utils_mod = _stub_module("src.common.utils.utils_voice")
    voice_utils_mod.get_voice_text = dummy_get_voice_text

    person_utils_mod = _stub_module("src.common.utils.utils_person")
    person_utils_mod.PersonUtils = DummyPersonUtils

    config_mod = _stub_module("src.config.config")
    config_mod.global_config = DummyConfig()


def load_message_via_file(monkeypatch):
    """Load ``src/chat/message_receive/message.py`` and export its classes here.

    The component classes are copied into this module's globals so the tests
    can refer to ``SessionMessage``, ``TextComponent``, ... by bare name.
    """
    setup_mocks(monkeypatch)
    file_path = _REPO_ROOT / "src" / "chat" / "message_receive" / "message.py"
    spec = importlib.util.spec_from_file_location("message", file_path)
    message_module = importlib.util.module_from_spec(spec)
    # Register under the spec name before executing, as the importlib recipe
    # does; code that looks up sys.modules[cls.__module__] otherwise finds
    # nothing. NOTE(review): confirm no other top-level "message" module exists.
    monkeypatch.setitem(sys.modules, spec.name, message_module)
    monkeypatch.setitem(sys.modules, "message_module", message_module)
    spec.loader.exec_module(message_module)
    message_module.select = select

    data_model_mod = sys.modules["src.common.data_models.message_component_data_model"]
    exported = {name: getattr(message_module, name) for name in _MESSAGE_EXPORTS}
    exported["MessageSequence"] = data_model_mod.MessageSequence
    exported["ForwardComponent"] = data_model_mod.ForwardComponent
    globals().update(exported)
    return message_module


def dummy_number_to_short_id(original_id: int, salt: str, length: int = 6) -> str:
    return "X" * length  # fixed string of the requested length, so assertions are stable


def dummy_is_bot_self(user_id: str) -> bool:
    return user_id == "bot_self"


def load_utils_via_file(monkeypatch):
    """Load ``src/common/utils/utils_message.py`` with ``math_utils`` stubbed."""
    setup_mocks(monkeypatch)

    # Stub math_utils so ``from .math_utils import number_to_short_id`` resolves.
    math_utils_mod = ModuleType("src.common.utils.math_utils")
    math_utils_mod.number_to_short_id = dummy_number_to_short_id
    monkeypatch.setitem(sys.modules, "src.common.utils.math_utils", math_utils_mod)

    # The parent packages must exist in sys.modules for the relative import.
    for pkg_name in ["src", "src.common", "src.common.utils"]:
        if pkg_name not in sys.modules:
            pkg_mod = ModuleType(pkg_name)
            pkg_mod.__path__ = []
            monkeypatch.setitem(sys.modules, pkg_name, pkg_mod)

    file_path = _REPO_ROOT / "src" / "common" / "utils" / "utils_message.py"
    spec = importlib.util.spec_from_file_location("src.common.utils.utils_message", file_path)
    utils_module = importlib.util.module_from_spec(spec)
    utils_module.__package__ = "src.common.utils"  # makes the relative import work
    monkeypatch.setitem(sys.modules, "src.common.utils.utils_message", utils_module)
    monkeypatch.setitem(sys.modules, "message_utils_module", utils_module)
    spec.loader.exec_module(utils_module)
    utils_module.is_bot_self = dummy_is_bot_self
    return utils_module


def _load_message_utils(monkeypatch):
    """Load both modules under test and return the ``MessageUtils`` class."""
    load_message_via_file(monkeypatch)
    return load_utils_via_file(monkeypatch).MessageUtils


def _make_message(message_id, session_id, user_id, nickname, components):
    """Build a ``SessionMessage`` on platform ``"test"`` with the given sender and components."""
    msg = SessionMessage(message_id, datetime.now())
    msg.platform = "test"
    msg.session_id = session_id
    msg.message_info = MessageInfo(user_info=UserInfo(user_id=user_id, user_nickname=nickname))
    msg.raw_message = MessageSequence(components)
    return msg


@pytest.mark.asyncio
async def test_message_utils(monkeypatch):
    """Smoke test: both modules load and ``MessageUtils`` exposes the builder."""
    MessageUtils = _load_message_utils(monkeypatch)
    assert callable(MessageUtils.build_readable_message)


@pytest.mark.asyncio
async def test_build_readable_message_basic(monkeypatch):
    """Basic case: a single message with line numbers shown."""
    MessageUtils = _load_message_utils(monkeypatch)

    msg = _make_message("m1", "s_test", "u1", "Alice", [TextComponent("Hello world")])
    text, mapping = await MessageUtils.build_readable_message([msg], anonymize=False, show_lineno=True)
    assert "[1] Alice说:Hello world" in text
    assert mapping == {}


@pytest.mark.asyncio
async def test_build_readable_message_anonymize(monkeypatch):
    """Anonymization: check both the returned text and the mapping."""
    MessageUtils = _load_message_utils(monkeypatch)

    msg = _make_message("m2", "s_test", "u42", "Bob", [TextComponent("Secret text")])
    text, mapping = await MessageUtils.build_readable_message([msg], anonymize=True, show_lineno=False)
    # The text shows the anonymous ID; the original nickname survives only in the mapping.
    assert "XXXXXX说:" in text
    assert "u42" in mapping
    assert mapping["u42"][0] == "XXXXXX"
    assert mapping["u42"][1] == "Bob"


@pytest.mark.asyncio
async def test_build_readable_message_replace_bot(monkeypatch):
    """Bot-name replacement: sender ``bot_self`` is shown as ``target_bot_name``."""
    MessageUtils = _load_message_utils(monkeypatch)

    msg = _make_message("m3", "s_test", "bot_self", "SomeBot", [TextComponent("ping")])
    text, mapping = await MessageUtils.build_readable_message([msg], replace_bot_name=True, target_bot_name="MAIBot")
    assert "MAIBot说:ping" in text


@pytest.mark.asyncio
async def test_build_readable_message_image_extraction(monkeypatch):
    """Picture extraction: with ``extract_pictures=True`` the text carries a picture placeholder."""
    MessageUtils = _load_message_utils(monkeypatch)

    img = ImageComponent(binary_hash="h", binary_data=b"\x01\x02", content="Img")
    msg = _make_message("mi1", "s_img", "ui_img", "ImgUser", [img])
    text, mapping = await MessageUtils.build_readable_message([msg], extract_pictures=True)
    assert "图片1" in text
    # Neither anonymization nor bot-name replacement is on, so nothing is mapped.
    assert mapping == {}


@pytest.mark.asyncio
async def test_build_readable_message_anonymize_and_replace_bot_name_and_lineno(monkeypatch):
    """Combined case: anonymization, bot-name replacement and line numbers together."""
    MessageUtils = _load_message_utils(monkeypatch)

    msg1 = _make_message("m4", "s_comb", "u_comb", "Charlie", [TextComponent("Hi")])
    msg2 = _make_message("m5", "s_comb", "bot_self", "SomeBot", [TextComponent("Hello")])
    text, mapping = await MessageUtils.build_readable_message(
        [msg1, msg2],
        anonymize=True,
        replace_bot_name=True,
        target_bot_name="MAIBot",
        show_lineno=True,
    )
    assert "[1] XXXXXX说:Hi" in text
    assert "[2] MAIBot说:Hello" in text
    assert "u_comb" in mapping
    assert mapping["u_comb"][0] == "XXXXXX"


@pytest.mark.asyncio
async def test_build_readable_message_with_at(monkeypatch):
    """Message with an @ component: the mentioned user is anonymized as well."""
    MessageUtils = _load_message_utils(monkeypatch)

    # Build a message whose only component is an @ (at) mention.
    at_comp = AtComponent(target_user_id="u_at", target_user_nickname="AtUser")
    msg = _make_message("m_at", "s_at", "u_main", "MainUser", [at_comp])
    text, mapping = await MessageUtils.build_readable_message(
        [msg], anonymize=True, replace_bot_name=True, target_bot_name="MAIBot"
    )
    assert "XXXXXX说:" in text  # the sender is anonymized
    assert "XXXXXX说:@XXXXXX" in text  # the mentioned user is anonymized


# ---- src/common/data_models/message_component_data_model.py (modified) ----
# Added just above ``class BaseMessageComponentModel``:
class UnknownUser(str): ...
# NOTE: Recovered from the tail of a whitespace-collapsed `git format-patch`
# e-mail. Hunk markers are reduced to the banner comments below; indentation
# was reconstructed from syntax.
#
# ---- src/common/data_models/message_component_data_model.py (modified) ----
# ForwardComponent.__init__ now annotates the sender nickname as
# ``str | UnknownUser`` (both the ``user_nickname`` parameter and the
# ``self.user_nickname`` attribute). Nothing else changes in the visible hunk.
#
# ---- src/common/utils/math_utils.py (new file) ----
import hashlib
from typing import Dict, List, Optional, Tuple

# 36 symbols: digits followed by upper-case letters (Base36, not Base62).
_SHORT_ID_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


def number_to_short_id(original_id: int, salt: str, length: int = 6) -> str:
    """Convert a number into a short, non-reversible Base36 identifier.

    The ID is derived from SHA-256 over ``f"{original_id}{salt}"``. For
    ``length`` up to 32 the result is identical to encoding the first
    ``max(4, length)`` digest bytes; longer IDs draw extra bytes from chained
    digests instead of being left-padded with ``"0"``.

    :param original_id: the number to encode
    :param salt: string mixed into the hash so IDs differ between salts
    :param length: number of characters to produce (4-8 recommended)
    :return: a string of exactly ``length`` characters from ``0-9A-Z``
    :raises ValueError: if ``length`` is smaller than 1
    """
    if length < 1:
        raise ValueError("length must be a positive integer")

    data = f"{original_id}{salt}".encode("utf-8")
    digest = hashlib.sha256(data).digest()

    # One byte per output character (at least 4) is more than the ~5.2 bits
    # a Base36 character needs.
    num_bytes_needed = max(4, length)
    stream = digest
    while len(stream) < num_bytes_needed:
        # A single digest is only 32 bytes; chain further digests for long IDs.
        digest = hashlib.sha256(digest + data).digest()
        stream += digest
    remaining = int.from_bytes(stream[:num_bytes_needed], byteorder="big")

    base = len(_SHORT_ID_ALPHABET)
    symbols: List[str] = []
    for _ in range(length):
        remaining, digit = divmod(remaining, base)
        symbols.append(_SHORT_ID_ALPHABET[digit])
    # Digits were produced least-significant first.
    return "".join(reversed(symbols))


# ---- src/common/utils/utils_message.py (modified) ----
# Import changes carried by the patch (comments only: this listing is not the
# whole module):
#   from typing import List, Tuple, Optional, Dict, TYPE_CHECKING
#   from src.common.data_models.message_component_data_model import UnknownUser, ForwardNodeComponent
#   from .math_utils import number_to_short_id
#   ``import random`` is no longer needed; the salt now comes from ``secrets``.


class MessageUtils:
    # Only the members added by the patch are listed; the existing members of
    # the class are outside this view.

    @staticmethod
    async def build_readable_message(
        messages: List["SessionMessage"],
        anonymize: bool = False,
        show_lineno: bool = False,
        extract_pictures: bool = False,
        replace_bot_name: bool = False,
        target_bot_name: Optional[str] = None,
    ) -> Tuple[str, Dict[str, Tuple[str, str]]]:
        """
        Render messages as plain text for an LLM.

        Args:
            messages (List[SessionMessage]): messages to render, in order
            anonymize (bool): replace user names with per-call anonymous IDs
            show_lineno (bool): prefix every message with ``[n]``
            extract_pictures (bool): replace image/emoji content with numbered
                placeholders and list their descriptions before the messages
            replace_bot_name (bool): show the bot's own name as ``target_bot_name``
            target_bot_name (Optional[str]): required when ``replace_bot_name`` is True
        Returns:
            Tuple[str, Dict[str, Tuple[str, str]]]: the rendered text and a
            mapping ``user_id -> (anonymous ID, original name)``; the mapping
            is empty unless ``anonymize`` is True
        Raises:
            ValueError: ``replace_bot_name`` is True but ``target_bot_name`` is empty
        """
        import secrets

        if replace_bot_name and not target_bot_name:
            raise ValueError("当replace_bot_name为True时,必须指定target_bot_name参数")

        msg_list: List["SessionMessage"] = messages
        user_id_mapping: Dict[str, Tuple[str, str]] = {}  # user_id -> (anonymous ID, original name)
        copied = False  # True once msg_list holds deep copies, so later steps skip re-copying

        if anonymize or replace_bot_name:
            # Fresh salt per call so anonymous IDs cannot be predicted or
            # correlated across calls. The mapping dict is shared with the
            # helpers, which fill it in place.
            salt_str = secrets.token_hex(8)
            msg_list = [
                MessageUtils._process_usr_info(
                    msg,
                    user_id_mapping,
                    salt_str,
                    anonymize,
                    replace_bot_name,
                    target_bot_name,
                )
                for msg in messages
            ]
            copied = True

        processed_plain_texts: List[str] = []
        if extract_pictures:
            img_map: Dict[str, Tuple[int, str]] = {}  # binary_hash -> (picture number, description)
            emoji_map: Dict[str, Tuple[int, str]] = {}  # binary_hash -> (emoji number, description)
            msg_list = [
                MessageUtils._extract_pictures_from_message(msg, img_map, emoji_map, copied) for msg in msg_list
            ]
            processed_plain_texts.extend(f"[图片{img_id}: {desc}]" for img_id, desc in img_map.values())
            processed_plain_texts.append("")  # blank line between pictures and emojis
            processed_plain_texts.extend(f"[表情{emoji_id}: {desc}]" for emoji_id, desc in emoji_map.values())
            processed_plain_texts.append("")  # blank line between emojis and the messages

        for lineno, msg in enumerate(msg_list, start=1):
            await msg.process()
            # processed_plain_text may be unset; render an empty body instead
            # of failing in join().
            plain_text: str = msg.processed_plain_text or ""
            usr_info = msg.message_info.user_info
            usr_name = usr_info.user_cardname or usr_info.user_nickname or "未知用户"
            header = f"[{lineno}] {usr_name}说:" if show_lineno else f"{usr_name}说:"
            processed_plain_texts.append(header + plain_text)

        return "\n".join(processed_plain_texts), user_id_mapping

    @staticmethod
    def _resolve_display_name(
        user_id: str,
        original_name: str,
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ) -> Optional[str]:
        """Return the name to display for ``user_id``, or None to leave names untouched.

        With ``anonymize`` the user is registered in ``anonymize_mapping`` on
        first sight (numbered by insertion order) and the anonymous ID is
        returned. Bot-name replacement is applied afterwards and wins.
        """
        display_name: Optional[str] = None
        if anonymize:
            if user_id not in anonymize_mapping:
                num = len(anonymize_mapping) + 1
                anonymize_mapping[user_id] = (number_to_short_id(num, salt, length=6), original_name)
            display_name = anonymize_mapping[user_id][0]
        if replace_bot_name and target_bot_name and is_bot_self(user_id):
            display_name = target_bot_name
        return display_name

    @staticmethod
    def _process_usr_info(
        message: "SessionMessage",
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ) -> "SessionMessage":
        """Return a deep copy of ``message`` with sender and component names rewritten."""
        new_message = message.deepcopy()
        new_message.raw_message.components = [
            MessageUtils._process_msg_component(
                component,
                anonymize_mapping,
                salt,
                anonymize,
                replace_bot_name,
                target_bot_name,
            )
            for component in new_message.raw_message.components
        ]
        sender = message.message_info.user_info
        display_name = MessageUtils._resolve_display_name(
            sender.user_id,
            sender.user_cardname or sender.user_nickname or sender.user_id,
            anonymize_mapping,
            salt,
            anonymize,
            replace_bot_name,
            target_bot_name,
        )
        if display_name is not None:
            new_message.message_info.user_info.user_nickname = display_name
            new_message.message_info.user_info.user_cardname = display_name
        return new_message

    @staticmethod
    def _process_msg_component(
        component: "StandardMessageComponents",
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ) -> "StandardMessageComponents":
        """Rewrite user names in an @, reply or forward component; return others unchanged."""
        if isinstance(component, AtComponent):
            handler = MessageUtils.__handle_at_component
        elif isinstance(component, ReplyComponent):
            handler = MessageUtils.__handle_reply_component
        elif isinstance(component, ForwardNodeComponent):
            handler = MessageUtils.__handle_forward_node_component
        else:
            return component
        return handler(
            component,
            anonymize_mapping,
            salt,
            anonymize,
            replace_bot_name,
            target_bot_name,
        )

    @staticmethod
    def __handle_at_component(
        component: "AtComponent",
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ):
        """Rewrite the mentioned user's nickname and cardname in place."""
        user_id = component.target_user_id  # the original author notes user_id is always present here
        display_name = MessageUtils._resolve_display_name(
            user_id,
            component.target_user_cardname or component.target_user_nickname or user_id,
            anonymize_mapping,
            salt,
            anonymize,
            replace_bot_name,
            target_bot_name,
        )
        if display_name is not None:
            component.target_user_nickname = display_name
            component.target_user_cardname = display_name
        return component

    @staticmethod
    def __handle_forward_node_component(
        component: "ForwardNodeComponent",
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ):
        """Rewrite sender names of every forwarded message, recursing into its content."""
        for comp in component.forward_components:
            user_id = comp.user_id
            # NOTE(review): the data model declares UnknownUser for
            # user_nickname, not user_id; confirm which field should be tested.
            if not user_id or isinstance(user_id, UnknownUser):
                comp.user_id = "unknown_user"
                comp.user_cardname = "未知用户"
                comp.user_nickname = "未知用户"
            else:
                display_name = MessageUtils._resolve_display_name(
                    user_id,
                    comp.user_cardname or comp.user_nickname or user_id,
                    anonymize_mapping,
                    salt,
                    anonymize,
                    replace_bot_name,
                    target_bot_name,
                )
                if display_name is not None:
                    comp.user_nickname = display_name
                    comp.user_cardname = display_name
            # Always recurse, even for unknown senders, so names nested in the
            # forwarded content are rewritten too.
            comp.content = [
                MessageUtils._process_msg_component(
                    c,
                    anonymize_mapping,
                    salt,
                    anonymize,
                    replace_bot_name,
                    target_bot_name,
                )
                for c in comp.content
            ]
        return component

    @staticmethod
    def __handle_reply_component(
        component: "ReplyComponent",
        anonymize_mapping: Dict[str, Tuple[str, str]],
        salt: str,
        anonymize: bool,
        replace_bot_name: bool,
        target_bot_name: Optional[str] = None,
    ):
        """Rewrite the replied-to sender's names in place."""
        user_id = component.target_message_sender_id
        if not user_id:
            # No sender ID on the quoted message: show a fixed placeholder.
            component.target_message_sender_nickname = "未知用户"
            component.target_message_sender_cardname = "未知用户"
            return component
        display_name = MessageUtils._resolve_display_name(
            user_id,
            component.target_message_sender_cardname or component.target_message_sender_nickname or user_id,
            anonymize_mapping,
            salt,
            anonymize,
            replace_bot_name,
            target_bot_name,
        )
        if display_name is not None:
            component.target_message_sender_nickname = display_name
            component.target_message_sender_cardname = display_name
        return component

    @staticmethod
    def _extract_pictures_from_message(
        message: "SessionMessage",
        img_map: Dict[str, Tuple[int, str]],
        emoji_map: Dict[str, Tuple[int, str]],
        copied: bool,
    ):
        """Replace picture/emoji content in ``message`` with numbered placeholders.

        ``img_map`` and ``emoji_map`` are filled in place. The message is
        deep-copied first unless ``copied`` says the caller already holds a copy.
        """
        if not copied:
            message = message.deepcopy()  # never modify the caller's message
        message.raw_message.components = [
            MessageUtils._extract_pictures_from_component(component, img_map, emoji_map)
            for component in message.raw_message.components
        ]
        return message

    @staticmethod
    def _extract_pictures_from_component(
        component: "StandardMessageComponents",
        img_map: Dict[str, Tuple[int, str]],
        emoji_map: Dict[str, Tuple[int, str]],
    ) -> "StandardMessageComponents":
        """Number an image/emoji component by its ``binary_hash`` and swap in a placeholder.

        A hash seen for the first time gets the next number and its original
        content is stored as the description; a repeated hash reuses its number.
        Forward nodes are processed recursively.
        """
        if isinstance(component, ImageComponent):
            if component.binary_hash not in img_map:
                img_map[component.binary_hash] = (len(img_map) + 1, component.content)
            img_id, _ = img_map[component.binary_hash]
            component.content = f"图片{img_id}"
        elif isinstance(component, EmojiComponent):
            if component.binary_hash not in emoji_map:
                emoji_map[component.binary_hash] = (len(emoji_map) + 1, component.content)
            emoji_id, _ = emoji_map[component.binary_hash]
            component.content = f"表情{emoji_id}"
        elif isinstance(component, ForwardNodeComponent):
            for comp in component.forward_components:
                comp.content = [
                    MessageUtils._extract_pictures_from_component(c, img_map, emoji_map) for c in comp.content
                ]
        return component


# TODO: temporary implementation. Replace it with a proper one, e.g. read the
# bot's own ID from the configuration or query it through an API.
def is_bot_self(user_id: str) -> bool:
    """
    Tell whether ``user_id`` is the bot itself.

    Temporary: only the literal ID ``"bot_self"`` is recognized.
    """
    return user_id == "bot_self"