消息构建器:将消息转换为可读消息;其对应的测试文件

This commit is contained in:
UnCLAS-Prommer
2026-03-04 21:01:51 +08:00
parent c16ced765e
commit d03dc3601e
4 changed files with 714 additions and 3 deletions

View File

@@ -0,0 +1,367 @@
import sys
from dataclasses import dataclass, field
import pytest
import importlib
import importlib.util
from types import ModuleType
from pathlib import Path
from datetime import datetime
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.common.data_models.message_component_data_model import MessageSequence, ForwardComponent
from src.chat.message_receive.message import (
SessionMessage,
TextComponent,
ImageComponent,
EmojiComponent,
VoiceComponent,
AtComponent,
ReplyComponent,
ForwardNodeComponent,
)
class DummyLogger:
def __init__(self) -> None:
self.logging_record = []
def debug(self, msg):
print(f"DEBUG: {msg}")
self.logging_record.append(f"DEBUG: {msg}")
def info(self, msg):
print(f"INFO: {msg}")
self.logging_record.append(f"INFO: {msg}")
def warning(self, msg):
print(f"WARNING: {msg}")
self.logging_record.append(f"WARNING: {msg}")
def error(self, msg):
print(f"ERROR: {msg}")
self.logging_record.append(f"ERROR: {msg}")
def critical(self, msg):
print(f"CRITICAL: {msg}")
self.logging_record.append(f"CRITICAL: {msg}")
def get_logger(name):
return DummyLogger()
class DummyDBSession:
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def exec(self, statement):
return self
def first(self):
return None
def commit(self):
pass
def all(self):
return []
def get_db_session():
return DummyDBSession()
def get_manual_db_session():
return DummyDBSession()
class DummySelect:
def __init__(self, model):
self.model = model
def filter_by(self, **kwargs):
return self
def where(self, condition):
return self
def limit(self, n):
return self
def select(model):
return DummySelect(model)
async def dummy_get_voice_text(binary_data):
return None # 可以根据需要返回模拟的文本结果
class DummyPersonUtils:
@staticmethod
def get_person_info_by_user_id_and_platform(user_id, platform):
return None # 可以根据需要返回模拟的用户信息
class DummyConfig:
class MessageReceiveConfig:
ban_words = set()
ban_msgs_regex = set()
message_receive = MessageReceiveConfig()
@dataclass
class UserInfo:
user_id: str
user_nickname: str
user_cardname: Optional[str] = None
@dataclass
class GroupInfo:
group_id: str
group_name: str
@dataclass
class MessageInfo:
user_info: UserInfo
group_info: Optional[GroupInfo] = None
additional_config: dict = field(default_factory=dict)
def setup_mocks(monkeypatch):
def _stub_module(name: str) -> ModuleType:
module = ModuleType(name)
monkeypatch.setitem(sys.modules, name, module)
return module
# src.common.logger
logger_mod = _stub_module("src.common.logger")
# Mock the logger
logger_mod.get_logger = get_logger
db_mod = _stub_module("src.common.database.database")
db_mod.get_db_session = get_db_session
db_mod.get_manual_db_session = get_manual_db_session
db_model_mod = _stub_module("src.common.database.database_model")
db_model_mod.Messages = None # 可以根据需要添加更多的属性或方法
emoji_manager_mod = _stub_module("src.chat.emoji_system.emoji_manager")
emoji_manager_mod.emoji_manager = None # 可以根据需要添加更多的属性或方法
image_manager_mod = _stub_module("src.chat.image_system.image_manager")
image_manager_mod.image_manager = None # 可以根据需要添加更多的属性或方法
voice_utils_mod = _stub_module("src.common.utils.utils_voice")
voice_utils_mod.get_voice_text = dummy_get_voice_text
person_utils_mod = _stub_module("src.common.utils.utils_person")
person_utils_mod.PersonUtils = DummyPersonUtils
config_mod = _stub_module("src.config.config")
config_mod.global_config = DummyConfig()
def load_message_via_file(monkeypatch):
setup_mocks(monkeypatch)
file_path = Path(__file__).parent.parent.parent / "src" / "chat" / "message_receive" / "message.py"
spec = importlib.util.spec_from_file_location("message", file_path)
message_module = importlib.util.module_from_spec(spec)
monkeypatch.setitem(sys.modules, "message_module", message_module)
spec.loader.exec_module(message_module)
message_module.select = select
SessionMessageClass = message_module.SessionMessage
TextComponentClass = message_module.TextComponent
ImageComponentClass = message_module.ImageComponent
EmojiComponentClass = message_module.EmojiComponent
VoiceComponentClass = message_module.VoiceComponent
AtComponentClass = message_module.AtComponent
ReplyComponentClass = message_module.ReplyComponent
ForwardNodeComponentClass = message_module.ForwardNodeComponent
MessageSequenceClass = sys.modules["src.common.data_models.message_component_data_model"].MessageSequence
ForwardComponentClass = sys.modules["src.common.data_models.message_component_data_model"].ForwardComponent
globals()["SessionMessage"] = SessionMessageClass
globals()["TextComponent"] = TextComponentClass
globals()["ImageComponent"] = ImageComponentClass
globals()["EmojiComponent"] = EmojiComponentClass
globals()["VoiceComponent"] = VoiceComponentClass
globals()["AtComponent"] = AtComponentClass
globals()["ReplyComponent"] = ReplyComponentClass
globals()["ForwardNodeComponent"] = ForwardNodeComponentClass
globals()["MessageSequence"] = MessageSequenceClass
globals()["ForwardComponent"] = ForwardComponentClass
return message_module
def dummy_number_to_short_id(original_id: int, salt: str, length: int = 6) -> str:
return "X" * length # 返回固定的字符串长度由参数决定模拟生成短ID的行为
def dummy_is_bot_self(user_id: str) -> bool:
return user_id == "bot_self"
def load_utils_via_file(monkeypatch):
setup_mocks(monkeypatch)
# Mock math_utils 模块,供 from .math_utils import number_to_short_id 使用
math_utils_mod = ModuleType("src.common.utils.math_utils")
math_utils_mod.number_to_short_id = dummy_number_to_short_id
monkeypatch.setitem(sys.modules, "src.common.utils.math_utils", math_utils_mod)
# 确保包层级模块存在于 sys.modules 中,使相对导入能正确解析
for pkg_name in ["src", "src.common", "src.common.utils"]:
if pkg_name not in sys.modules:
pkg_mod = ModuleType(pkg_name)
pkg_mod.__path__ = []
monkeypatch.setitem(sys.modules, pkg_name, pkg_mod)
file_path = Path(__file__).parent.parent.parent / "src" / "common" / "utils" / "utils_message.py"
spec = importlib.util.spec_from_file_location("src.common.utils.utils_message", file_path)
utils_module = importlib.util.module_from_spec(spec)
utils_module.__package__ = "src.common.utils" # 设置包,使相对导入生效
monkeypatch.setitem(sys.modules, "src.common.utils.utils_message", utils_module)
monkeypatch.setitem(sys.modules, "message_utils_module", utils_module)
spec.loader.exec_module(utils_module)
utils_module.is_bot_self = dummy_is_bot_self
return utils_module
@pytest.mark.asyncio
async def test_message_utils(monkeypatch):
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
@pytest.mark.asyncio
async def test_build_readable_message_basic(monkeypatch):
"""基础用例:单条消息,显示行号"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
msg = SessionMessage("m1", datetime.now())
msg.platform = "test"
msg.session_id = "s_test"
user_info = UserInfo(user_id="u1", user_nickname="Alice")
msg.message_info = MessageInfo(user_info=user_info)
msg.raw_message = MessageSequence([TextComponent("Hello world")])
text, mapping = await MessageUtils.build_readable_message([msg], anonymize=False, show_lineno=True)
assert "[1] Alice说Hello world" in text
assert mapping == {}
@pytest.mark.asyncio
async def test_build_readable_message_anonymize(monkeypatch):
"""匿名化用例:验证 mapping 和返回文本"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
msg = SessionMessage("m2", datetime.now())
msg.platform = "test"
msg.session_id = "s_test"
user_info = UserInfo(user_id="u42", user_nickname="Bob")
msg.message_info = MessageInfo(user_info=user_info)
msg.raw_message = MessageSequence([TextComponent("Secret text")])
text, mapping = await MessageUtils.build_readable_message([msg], anonymize=True, show_lineno=False)
# 根据实现original_name 为 user_nickname因此文本中应包含原始名称
assert "XXXXXX说" in text
assert "u42" in mapping
assert mapping["u42"][0] == "XXXXXX"
assert mapping["u42"][1] == "Bob"
@pytest.mark.asyncio
async def test_build_readable_message_replace_bot(monkeypatch):
"""替换机器人名用例:当 user_id 为 bot_self 时应被替换为 target_bot_name"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
msg = SessionMessage("m3", datetime.now())
msg.platform = "test"
msg.session_id = "s_test"
user_info = UserInfo(user_id="bot_self", user_nickname="SomeBot")
msg.message_info = MessageInfo(user_info=user_info)
msg.raw_message = MessageSequence([TextComponent("ping")])
text, mapping = await MessageUtils.build_readable_message([msg], replace_bot_name=True, target_bot_name="MAIBot")
assert "MAIBot说ping" in text
@pytest.mark.asyncio
async def test_build_readable_message_image_extraction(monkeypatch):
"""图片提取:验证 extract_pictures 为 True 时,文本中包含图片占位及 img_map 内容被返回"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
# 构建包含图片组件的消息
img = ImageComponent(binary_hash="h", binary_data=b"\x01\x02", content="Img")
msg = SessionMessage("mi1", datetime.now())
msg.platform = "test"
msg.session_id = "s_img"
msg.raw_message = MessageSequence([img])
msg.message_info = MessageInfo(UserInfo(user_id="ui_img", user_nickname="ImgUser"))
text, mapping = await MessageUtils.build_readable_message([msg], extract_pictures=True)
# 应包含图片描述占位
assert "图片1" in text
# mapping 不为空(匿名化未开启则为空)
assert isinstance(mapping, dict)
@pytest.mark.asyncio
async def test_build_readable_message_anonymize_and_replace_bot_name_and_lineno(monkeypatch):
"""组合用例:多个消息同时包含匿名化、机器人名称替换"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
# 构建多个消息
msg1 = SessionMessage("m4", datetime.now())
msg1.platform = "test"
msg1.session_id = "s_comb"
msg2 = SessionMessage("m5", datetime.now())
msg2.platform = "test"
msg2.session_id = "s_comb"
msg1.message_info = MessageInfo(UserInfo(user_id="u_comb", user_nickname="Charlie"))
msg2.message_info = MessageInfo(UserInfo(user_id="bot_self", user_nickname="SomeBot"))
msg1.raw_message = MessageSequence([TextComponent("Hi")])
msg2.raw_message = MessageSequence([TextComponent("Hello")])
text, mapping = await MessageUtils.build_readable_message(
[msg1, msg2],
anonymize=True,
replace_bot_name=True,
target_bot_name="MAIBot",
show_lineno=True,
)
# 验证文本内容
assert "[1] XXXXXX说Hi" in text
assert "[2] MAIBot说Hello" in text
# 验证 mapping 内容
assert "u_comb" in mapping
assert mapping["u_comb"][0] == "XXXXXX"
@pytest.mark.asyncio
async def test_build_readable_message_with_at(monkeypatch):
"""包含@组件的消息:验证@组件中的用户信息也被匿名化和替换"""
load_message_via_file(monkeypatch)
utils_module = load_utils_via_file(monkeypatch)
MessageUtils = utils_module.MessageUtils
# 构建包含回复组件的消息
at_comp = AtComponent(target_user_id="u_at", target_user_nickname="AtUser")
msg = SessionMessage("m_at", datetime.now())
msg.platform = "test"
msg.session_id = "s_at"
msg.raw_message = MessageSequence([at_comp])
msg.message_info = MessageInfo(UserInfo(user_id="u_main", user_nickname="MainUser"))
text, mapping = await MessageUtils.build_readable_message([msg], anonymize=True, replace_bot_name=True, target_bot_name="MAIBot")
# 验证主消息和@组件中的用户信息都被处理
assert "XXXXXX说" in text # 主消息用户被匿名化
assert "XXXXXX说@XXXXXX" in text # @组件用户被匿名化

View File

@@ -13,6 +13,7 @@ from src.common.logger import get_logger
logger = get_logger("base_message_component_model")
class UnknownUser(str): ...
class BaseMessageComponentModel(ABC):
@property
@@ -255,13 +256,13 @@ class ForwardComponent(BaseMessageComponentModel):
def __init__(
self,
user_nickname: str,
user_nickname: str | UnknownUser,
message_id: str,
content: List[StandardMessageComponents],
user_id: Optional[str] = None,
user_cardname: Optional[str] = None,
):
self.user_nickname: str = user_nickname
self.user_nickname: str | UnknownUser = user_nickname
"""转发节点的发送者昵称"""
self.message_id: str = message_id
"""转发节点的消息ID"""

View File

@@ -0,0 +1,34 @@
import hashlib
def number_to_short_id(original_id: int, salt: str, length: int = 6) -> str:
"""
将数字编号转换为短ID不可逆
:param original_id: 原始数字
:param length: 想要生成的短ID长度 (建议 4-8)
:return: 短ID字符串
"""
# 1. 加盐,避免简单的哈希冲突和猜测
data = f"{original_id}{salt}".encode("utf-8")
# 2. 计算 SHA-256 哈希
hash_digest = hashlib.sha256(data).digest()
# 3. 取前几个字节转换为整数
# 为了达到需要的长度,我们可能需要取更多的字节
num_bytes_needed = max(4, length) # 保证足够的熵
hash_int = int.from_bytes(hash_digest[:num_bytes_needed], byteorder="big")
# 4. 使用 Base62 字符集编码
characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
base = len(characters)
short_id = ""
temp_num = hash_int
# 生成指定长度的ID
for _ in range(length):
short_id = characters[temp_num % base] + short_id
temp_num //= base
return short_id

View File

@@ -1,9 +1,10 @@
from maim_message import MessageBase, Seg
from typing import List, Tuple, Optional, TYPE_CHECKING
from typing import List, Tuple, Optional, Dict, TYPE_CHECKING
import base64
import hashlib
import msgpack
import random
import re
from src.common.data_models.message_component_data_model import (
@@ -16,9 +17,13 @@ from src.common.data_models.message_component_data_model import (
AtComponent,
ReplyComponent,
DictComponent,
UnknownUser,
ForwardNodeComponent,
)
from src.config.config import global_config
from .math_utils import number_to_short_id
if TYPE_CHECKING:
from src.chat.message_receive.message import SessionMessage
@@ -136,3 +141,307 @@ class MessageUtils:
with get_db_session() as session:
db_message = message.to_db_instance()
session.add(db_message)
@staticmethod
async def build_readable_message(
messages: List["SessionMessage"],
anonymize: bool = False,
show_lineno: bool = False,
extract_pictures: bool = False,
replace_bot_name: bool = False,
target_bot_name: Optional[str] = None,
) -> Tuple[str, Dict[str, Tuple[str, str]]]:
"""
将消息构建为LLM可读的文本格式
Args:
messages (List[SessionMessage]): 消息列表
anonymize (bool): 是否匿名化用户信息
show_lineno (bool): 是否在每条消息前显示行号
extract_pictures (bool): 是否提取图片信息并在文本中显示占位符
replace_bot_name (bool): 是否将消息中的机器人名称替换为统一的占位符
target_bot_name (Optional[str]): 如果replace_bot_name为True指定要替换的机器人名称
Returns:
return (Tuple[str, Dict[str, Tuple[str, str]]]): 构建后的消息文本以及映射表匿名ID, 原始名称)
"""
msg_list: List["SessionMessage"] = messages
user_id_mapping: Dict[str, Tuple[str, str]] = {} # user_id -> (匿名ID, 原始名称)
copied: bool = False # 标记是否已经复制过消息列表,避免不必要的复制开销
img_map: Optional[Dict[str, Tuple[int, str]]] = None
emoji_map: Optional[Dict[str, Tuple[int, str]]] = None
if replace_bot_name and not target_bot_name:
raise ValueError("当replace_bot_name为True时必须指定target_bot_name参数")
if anonymize or replace_bot_name:
user_id_mapping = {} # 利用弱引用直接传入并得到修改结果
anonymous_messages: List["SessionMessage"] = []
salt_str = str(random.randint(100000, 999999)) # 每次调用生成一个随机盐确保匿名ID不可预测
anonymous_messages.extend(
MessageUtils._process_usr_info(
msg,
user_id_mapping,
salt_str,
anonymize,
replace_bot_name,
target_bot_name,
)
for msg in messages
)
msg_list = anonymous_messages
copied = True
processed_plain_texts: List[str] = []
if extract_pictures:
img_map = {} # binary_hash -> (图片ID, 描述信息)
emoji_map = {} # binary_hash -> (表情ID, 描述信息)
msg_list = [
MessageUtils._extract_pictures_from_message(msg, img_map, emoji_map, copied) for msg in msg_list
]
processed_plain_texts.extend(f"[图片{img_id}: {desc}]" for img_id, desc in img_map.values())
processed_plain_texts.append("") # 图片和表情之间添加一个换行,避免连在一起
processed_plain_texts.extend(f"[表情{emoji_id}: {desc}]" for emoji_id, desc in emoji_map.values())
processed_plain_texts.append("") # 表情和消息文本之间添加两个换行,避免连在一起
lineno_counter = 1
for msg in msg_list:
await msg.process()
plain_text: str = msg.processed_plain_text # type: ignore
usr_info = msg.message_info.user_info
usr_name = usr_info.user_cardname or usr_info.user_nickname or "未知用户"
header = f"[{lineno_counter}] {usr_name}说:" if show_lineno else f"{usr_name}说:"
lineno_counter += 1
processed_plain_texts.append("".join([header, plain_text]))
return "\n".join(processed_plain_texts), user_id_mapping
@staticmethod
def _process_usr_info(
message: "SessionMessage",
anonymize_mapping: Dict[str, Tuple[str, str]],
salt: str,
anonymize: bool,
replace_bot_name: bool,
target_bot_name: Optional[str] = None,
):
"""处理消息中的用户信息,进行匿名化显示"""
new_message = message.deepcopy()
new_component_list = [
MessageUtils._process_msg_component(
component,
anonymize_mapping,
salt,
anonymize,
replace_bot_name,
target_bot_name,
)
for component in new_message.raw_message.components
]
new_message.raw_message.components = new_component_list
msg_usr_info = message.message_info.user_info
if anonymize:
if msg_usr_info.user_id not in anonymize_mapping:
num = len(anonymize_mapping) + 1
anonymous_id = number_to_short_id(num, salt, length=6)
original_name = msg_usr_info.user_cardname or msg_usr_info.user_nickname or msg_usr_info.user_id
anonymize_mapping[msg_usr_info.user_id] = (anonymous_id, original_name)
anonymous_name = anonymize_mapping[msg_usr_info.user_id][0]
new_message.message_info.user_info.user_nickname = anonymous_name
new_message.message_info.user_info.user_cardname = anonymous_name
if replace_bot_name and target_bot_name and is_bot_self(msg_usr_info.user_id):
new_message.message_info.user_info.user_nickname = target_bot_name
new_message.message_info.user_info.user_cardname = target_bot_name
return new_message
@staticmethod
def _process_msg_component(
component: StandardMessageComponents,
anonymize_mapping: Dict[str, Tuple[str, str]],
salt: str,
anonymize: bool,
replace_bot_name: bool,
target_bot_name: Optional[str] = None,
) -> StandardMessageComponents:
"""将消息组件中的用户信息匿名化"""
if isinstance(component, AtComponent):
return MessageUtils.__handle_at_component(
component,
anonymize_mapping,
salt,
anonymize,
replace_bot_name,
target_bot_name,
)
elif isinstance(component, ReplyComponent):
return MessageUtils.__handle_reply_component(
component,
anonymize_mapping,
salt,
anonymize,
replace_bot_name,
target_bot_name,
)
elif isinstance(component, ForwardNodeComponent):
return MessageUtils.__handle_forward_node_component(
component,
anonymize_mapping,
salt,
anonymize,
replace_bot_name,
target_bot_name,
)
return component
@staticmethod
def __handle_at_component(
component: AtComponent,
anonymize_mapping: Dict[str, Tuple[str, str]],
salt: str,
anonymize: bool,
replace_bot_name: bool,
target_bot_name: Optional[str] = None,
):
user_id = component.target_user_id # user_id一定存在
if anonymize:
if user_id not in anonymize_mapping:
# 新人物? 编号 + 1生成一个新的匿名ID
num = len(anonymize_mapping) + 1
anonymous_id = number_to_short_id(num, salt, length=6)
original_name = component.target_user_cardname or component.target_user_nickname or user_id
anonymize_mapping[user_id] = (anonymous_id, original_name)
# 替换昵称和备注为匿名ID
anonymous_name = anonymize_mapping[user_id][0]
component.target_user_nickname = anonymous_name
component.target_user_cardname = anonymous_name
if replace_bot_name and target_bot_name and is_bot_self(user_id):
component.target_user_nickname = target_bot_name
component.target_user_cardname = target_bot_name
return component
@staticmethod
def __handle_forward_node_component(
component: ForwardNodeComponent,
anonymize_mapping: Dict[str, Tuple[str, str]],
salt: str,
anonymize: bool,
replace_bot_name: bool,
target_bot_name: Optional[str] = None,
):
for comp in component.forward_components:
user_id = comp.user_id
if not user_id: # 如果转发节点的用户ID不存在直接设置为未知用户
comp.user_id = "unknown_user"
comp.user_cardname = "未知用户"
comp.user_nickname = "未知用户"
continue
if isinstance(user_id, UnknownUser): # 如果用户ID是UnknownUser类型直接设置为未知用户
comp.user_id = "unknown_user"
comp.user_cardname = "未知用户"
comp.user_nickname = "未知用户"
continue
if anonymize:
if user_id not in anonymize_mapping:
num = len(anonymize_mapping) + 1
anonymous_id = number_to_short_id(num, salt, length=6)
original_name = comp.user_cardname or comp.user_nickname or user_id
anonymize_mapping[user_id] = (anonymous_id, original_name)
anonymous_name = anonymize_mapping[user_id][0]
comp.user_nickname = anonymous_name
comp.user_cardname = anonymous_name
if replace_bot_name and target_bot_name and is_bot_self(user_id):
comp.user_nickname = target_bot_name
comp.user_cardname = target_bot_name
comp.content = [ # 递归处理转发消息中的组件
MessageUtils._process_msg_component(
c,
anonymize_mapping,
salt,
anonymize,
replace_bot_name,
target_bot_name,
)
for c in comp.content
]
return component
@staticmethod
def __handle_reply_component(
component: ReplyComponent,
anonymize_mapping: Dict[str, Tuple[str, str]],
salt: str,
anonymize: bool,
replace_bot_name: bool,
target_bot_name: Optional[str] = None,
):
if user_id := component.target_message_sender_id:
if anonymize:
if user_id not in anonymize_mapping:
num = len(anonymize_mapping) + 1
anonymous_id = number_to_short_id(num, salt, length=6)
original_name = (
component.target_message_sender_cardname or component.target_message_sender_nickname or user_id
)
anonymize_mapping[user_id] = (anonymous_id, original_name)
anonymous_name = anonymize_mapping[user_id][0]
component.target_message_sender_nickname = anonymous_name
component.target_message_sender_cardname = anonymous_name
if replace_bot_name and target_bot_name and is_bot_self(user_id):
component.target_message_sender_nickname = target_bot_name
component.target_message_sender_cardname = target_bot_name
else:
component.target_message_sender_nickname = "未知用户" # 如果没有Reply消息的发送者ID直接设置为未知用户
component.target_message_sender_cardname = "未知用户"
return component
@staticmethod
def _extract_pictures_from_message(
message: "SessionMessage",
img_map: Dict[str, Tuple[int, str]],
emoji_map: Dict[str, Tuple[int, str]],
copied: bool,
):
"""从消息中提取图片组件,返回列表包含(图片ID, 描述信息)"""
if not copied:
message = message.deepcopy() # 避免修改原消息
new_component_list: List[StandardMessageComponents] = []
new_component_list.extend(
MessageUtils._extract_pictures_from_component(component, img_map, emoji_map)
for component in message.raw_message.components
)
message.raw_message.components = new_component_list
return message
@staticmethod
def _extract_pictures_from_component(
component: StandardMessageComponents,
img_map: Dict[str, Tuple[int, str]],
emoji_map: Dict[str, Tuple[int, str]],
) -> StandardMessageComponents:
"""从消息组件中提取图片信息"""
if isinstance(component, ImageComponent):
if component.binary_hash in img_map:
img_id, _ = img_map[component.binary_hash]
else:
img_id = len(img_map) + 1
img_map[component.binary_hash] = (img_id, component.content)
component.content = f"图片{img_id}"
elif isinstance(component, EmojiComponent):
if component.binary_hash in emoji_map:
emoji_id, _ = emoji_map[component.binary_hash]
else:
emoji_id = len(emoji_map) + 1
emoji_map[component.binary_hash] = (emoji_id, component.content)
component.content = f"表情{emoji_id}"
elif isinstance(component, ForwardNodeComponent):
for comp in component.forward_components:
comp.content = [
MessageUtils._extract_pictures_from_component(c, img_map, emoji_map) for c in comp.content
]
return component
# TODO: 这个函数的实现非常临时后续需要替换为更完善的实现比如直接从配置文件中读取机器人自己的ID或者通过API获取机器人自己的信息等
def is_bot_self(user_id: str) -> bool:
"""
判断用户ID是否是机器人自己
临时方法,后续会替换为更完善的实现
"""
return user_id == "bot_self"