import sys from dataclasses import dataclass, field import pytest import importlib import importlib.util from types import ModuleType from pathlib import Path from datetime import datetime from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from src.common.data_models.message_component_data_model import MessageSequence from src.chat.message_receive.message import ( SessionMessage, TextComponent, ImageComponent, AtComponent, ) class DummyLogger: def __init__(self) -> None: self.logging_record = [] def debug(self, msg): print(f"DEBUG: {msg}") self.logging_record.append(f"DEBUG: {msg}") def info(self, msg): print(f"INFO: {msg}") self.logging_record.append(f"INFO: {msg}") def warning(self, msg): print(f"WARNING: {msg}") self.logging_record.append(f"WARNING: {msg}") def error(self, msg): print(f"ERROR: {msg}") self.logging_record.append(f"ERROR: {msg}") def critical(self, msg): print(f"CRITICAL: {msg}") self.logging_record.append(f"CRITICAL: {msg}") def get_logger(name): return DummyLogger() class DummyDBSession: def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): pass def exec(self, statement): return self def first(self): return None def commit(self): pass def all(self): return [] def get_db_session(): return DummyDBSession() def get_manual_db_session(): return DummyDBSession() class DummySelect: def __init__(self, model): self.model = model def filter_by(self, **kwargs): return self def where(self, condition): return self def limit(self, n): return self def select(model): return DummySelect(model) async def dummy_get_voice_text(binary_data): return None # 可以根据需要返回模拟的文本结果 class DummyPersonUtils: @staticmethod def get_person_info_by_user_id_and_platform(user_id, platform): return None # 可以根据需要返回模拟的用户信息 class DummyConfig: class MessageReceiveConfig: ban_words = set() ban_msgs_regex = set() message_receive = MessageReceiveConfig() @dataclass class UserInfo: user_id: str user_nickname: str user_cardname: Optional[str] = None @dataclass class GroupInfo: group_id: str group_name: str @dataclass class MessageInfo: user_info: UserInfo group_info: Optional[GroupInfo] = None additional_config: dict = field(default_factory=dict) def setup_mocks(monkeypatch): def _stub_module(name: str) -> ModuleType: module = ModuleType(name) monkeypatch.setitem(sys.modules, name, module) return module # src.common.logger logger_mod = _stub_module("src.common.logger") # Mock the logger logger_mod.get_logger = get_logger db_mod = _stub_module("src.common.database.database") db_mod.get_db_session = get_db_session db_mod.get_manual_db_session = get_manual_db_session db_model_mod = _stub_module("src.common.database.database_model") db_model_mod.Messages = None # 可以根据需要添加更多的属性或方法 emoji_manager_mod = _stub_module("src.chat.emoji_system.emoji_manager") emoji_manager_mod.emoji_manager = None # 可以根据需要添加更多的属性或方法 image_manager_mod = _stub_module("src.chat.image_system.image_manager") image_manager_mod.image_manager = None # 可以根据需要添加更多的属性或方法 voice_utils_mod = _stub_module("src.common.utils.utils_voice") voice_utils_mod.get_voice_text = dummy_get_voice_text person_utils_mod = _stub_module("src.common.utils.utils_person") person_utils_mod.PersonUtils = DummyPersonUtils config_mod = _stub_module("src.config.config") config_mod.global_config = DummyConfig() def load_message_via_file(monkeypatch): setup_mocks(monkeypatch) file_path = Path(__file__).parent.parent.parent / "src" / "chat" / "message_receive" / "message.py" spec = importlib.util.spec_from_file_location("message", file_path) message_module = importlib.util.module_from_spec(spec) monkeypatch.setitem(sys.modules, "message_module", message_module) spec.loader.exec_module(message_module) message_module.select = select SessionMessageClass = message_module.SessionMessage TextComponentClass = message_module.TextComponent ImageComponentClass = message_module.ImageComponent EmojiComponentClass = message_module.EmojiComponent VoiceComponentClass = message_module.VoiceComponent AtComponentClass = message_module.AtComponent ReplyComponentClass = message_module.ReplyComponent ForwardNodeComponentClass = message_module.ForwardNodeComponent MessageSequenceClass = sys.modules["src.common.data_models.message_component_data_model"].MessageSequence ForwardComponentClass = sys.modules["src.common.data_models.message_component_data_model"].ForwardComponent globals()["SessionMessage"] = SessionMessageClass globals()["TextComponent"] = TextComponentClass globals()["ImageComponent"] = ImageComponentClass globals()["EmojiComponent"] = EmojiComponentClass globals()["VoiceComponent"] = VoiceComponentClass globals()["AtComponent"] = AtComponentClass globals()["ReplyComponent"] = ReplyComponentClass globals()["ForwardNodeComponent"] = ForwardNodeComponentClass globals()["MessageSequence"] = MessageSequenceClass globals()["ForwardComponent"] = ForwardComponentClass return message_module def dummy_number_to_short_id(original_id: int, salt: str, length: int = 6) -> str: return "X" * length # 返回固定的字符串,长度由参数决定,模拟生成短ID的行为 def dummy_is_bot_self(platform, user_id: str) -> bool: return user_id == "bot_self" def load_utils_via_file(monkeypatch): setup_mocks(monkeypatch) # Mock math_utils 模块,供 from .math_utils import number_to_short_id 使用 math_utils_mod = ModuleType("src.common.utils.math_utils") math_utils_mod.number_to_short_id = dummy_number_to_short_id math_utils_mod.TimestampMode = type( "TimestampMode", (), {"NORMAL": "%Y-%m-%d %H:%M:%S", "NORMAL_NO_YMD": "%H:%M:%S", "RELATIVE": "relative"} ) math_utils_mod.translate_timestamp_to_human_readable = lambda timestamp, mode: ( "2024-01-01 12:00:00" ) # 返回固定的时间字符串 monkeypatch.setitem(sys.modules, "src.common.utils.math_utils", math_utils_mod) # 确保包层级模块存在于 sys.modules 中,使相对导入能正确解析 for pkg_name in ["src", "src.common", "src.common.utils"]: if pkg_name not in sys.modules: pkg_mod = ModuleType(pkg_name) pkg_mod.__path__ = [] monkeypatch.setitem(sys.modules, pkg_name, pkg_mod) file_path = Path(__file__).parent.parent.parent / "src" / "common" / "utils" / "utils_message.py" spec = importlib.util.spec_from_file_location("src.common.utils.utils_message", file_path) utils_module = importlib.util.module_from_spec(spec) utils_module.__package__ = "src.common.utils" # 设置包,使相对导入生效 monkeypatch.setitem(sys.modules, "src.common.utils.utils_message", utils_module) monkeypatch.setitem(sys.modules, "message_utils_module", utils_module) spec.loader.exec_module(utils_module) utils_module.is_bot_self = dummy_is_bot_self return utils_module @pytest.mark.asyncio async def test_message_utils(monkeypatch): load_message_via_file(monkeypatch) load_utils_via_file(monkeypatch) @pytest.mark.asyncio async def test_build_readable_message_basic(monkeypatch): """基础用例:单条消息,显示行号""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils msg = SessionMessage("m1", datetime.now(), platform="test") msg.platform = "test" msg.session_id = "s_test" user_info = UserInfo(user_id="u1", user_nickname="Alice") msg.message_info = MessageInfo(user_info=user_info) msg.raw_message = MessageSequence([TextComponent("Hello world")]) text, mapping, _ = await MessageUtils.build_readable_message([msg], anonymize=False, show_lineno=True) assert "[1] Alice说:Hello world" in text assert mapping == {} @pytest.mark.asyncio async def test_build_readable_message_anonymize(monkeypatch): """匿名化用例:验证 mapping 和返回文本""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils msg = SessionMessage("m2", datetime.now(), platform="test") msg.session_id = "s_test" user_info = UserInfo(user_id="u42", user_nickname="Bob") msg.message_info = MessageInfo(user_info=user_info) msg.raw_message = MessageSequence([TextComponent("Secret text")]) text, mapping, _ = await MessageUtils.build_readable_message([msg], anonymize=True, show_lineno=False) # 根据实现,original_name 为 user_nickname,因此文本中应包含原始名称 assert "XXXXXX说:" in text assert "u42" in mapping assert mapping["u42"][0] == "XXXXXX" assert mapping["u42"][1] == "Bob" @pytest.mark.asyncio async def test_build_readable_message_replace_bot(monkeypatch): """替换机器人名用例:当 user_id 为 bot_self 时应被替换为 target_bot_name""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils msg = SessionMessage("m3", datetime.now(), platform="test") msg.session_id = "s_test" user_info = UserInfo(user_id="bot_self", user_nickname="SomeBot") msg.message_info = MessageInfo(user_info=user_info) msg.raw_message = MessageSequence([TextComponent("ping")]) text, mapping, _ = await MessageUtils.build_readable_message([msg], replace_bot_name=True, target_bot_name="MAIBot") assert "MAIBot说:ping" in text @pytest.mark.asyncio async def test_build_readable_message_image_extraction(monkeypatch): """图片提取:验证 extract_pictures 为 True 时,文本中包含图片占位及 img_map 内容被返回""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils # 构建包含图片组件的消息 img = ImageComponent(binary_hash="h", binary_data=b"\x01\x02", content="Img") msg = SessionMessage("mi1", datetime.now(), platform="test") msg.session_id = "s_img" msg.raw_message = MessageSequence([img]) msg.message_info = MessageInfo(UserInfo(user_id="ui_img", user_nickname="ImgUser")) text, mapping, _ = await MessageUtils.build_readable_message([msg], extract_pictures=True) # 应包含图片描述占位 assert "图片1" in text # mapping 不为空(匿名化未开启则为空) assert isinstance(mapping, dict) @pytest.mark.asyncio async def test_build_readable_message_anonymize_and_replace_bot_name_and_lineno(monkeypatch): """组合用例:多个消息同时包含匿名化、机器人名称替换""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils # 构建多个消息 msg1 = SessionMessage("m4", datetime.now(), platform="test") msg1.session_id = "s_comb" msg2 = SessionMessage("m5", datetime.now(), platform="test") msg2.session_id = "s_comb" msg1.message_info = MessageInfo(UserInfo(user_id="u_comb", user_nickname="Charlie")) msg2.message_info = MessageInfo(UserInfo(user_id="bot_self", user_nickname="SomeBot")) msg1.raw_message = MessageSequence([TextComponent("Hi")]) msg2.raw_message = MessageSequence([TextComponent("Hello")]) text, mapping, _ = await MessageUtils.build_readable_message( [msg1, msg2], anonymize=True, replace_bot_name=True, target_bot_name="MAIBot", show_lineno=True, ) # 验证文本内容 assert "[1] XXXXXX说:Hi" in text assert "[2] MAIBot说:Hello" in text # 验证 mapping 内容 assert "u_comb" in mapping assert mapping["u_comb"][0] == "XXXXXX" @pytest.mark.asyncio async def test_build_readable_message_with_at(monkeypatch): """包含@组件的消息:验证@组件中的用户信息也被匿名化和替换""" load_message_via_file(monkeypatch) utils_module = load_utils_via_file(monkeypatch) MessageUtils = utils_module.MessageUtils # 构建包含回复组件的消息 at_comp = AtComponent(target_user_id="u_at", target_user_nickname="AtUser") msg = SessionMessage("m_at", datetime.now(), platform="test") msg.session_id = "s_at" msg.raw_message = MessageSequence([at_comp]) msg.message_info = MessageInfo(UserInfo(user_id="u_main", user_nickname="MainUser")) text, mapping, _ = await MessageUtils.build_readable_message( [msg], anonymize=True, replace_bot_name=True, target_bot_name="MAIBot" ) # 验证主消息和@组件中的用户信息都被处理 assert "XXXXXX说:" in text # 主消息用户被匿名化 assert "XXXXXX说:@XXXXXX" in text # @组件用户被匿名化