合并到远程

This commit is contained in:
UnCLAS-Prommer
2026-02-13 13:41:58 +08:00
parent 60f76e4d4e
commit b9f3c17e14
15 changed files with 420 additions and 450 deletions

View File

@@ -1,54 +1,6 @@
import copy
from typing import Any
class BaseDataModel:
def deepcopy(self):
return copy.deepcopy(self)
def transform_class_to_dict(obj: Any) -> Any:
# sourcery skip: assign-if-exp, reintroduce-else
"""
将对象或容器中的 BaseDataModel 子类(类对象)或 BaseDataModel 实例
递归转换为普通 dict不修改原对象。
- 对于类对象isinstance(value, type) 且 issubclass(..., BaseDataModel)
读取类的 __dict__ 中非 dunder 项并递归转换。
- 对于实例isinstance(value, BaseDataModel)),读取 vars(instance) 并递归转换。
"""
def _transform(value: Any) -> Any:
# 值是类对象且为 BaseDataModel 的子类
if isinstance(value, type) and issubclass(value, BaseDataModel):
return {k: _transform(v) for k, v in value.__dict__.items() if not k.startswith("__") and not callable(v)}
# 值是 BaseDataModel 的实例
if isinstance(value, BaseDataModel):
return {k: _transform(v) for k, v in vars(value).items()}
# 常见容器类型,递归处理
if isinstance(value, dict):
return {k: _transform(v) for k, v in value.items()}
if isinstance(value, list):
return [_transform(v) for v in value]
if isinstance(value, tuple):
return tuple(_transform(v) for v in value)
if isinstance(value, set):
return {_transform(v) for v in value}
# 基本类型,直接返回
return value
result = _transform(obj)
def flatten(target_dict: dict):
flat_dict = {}
for k, v in target_dict.items():
if isinstance(v, dict):
# 递归扁平化子字典
sub_flat = flatten(v)
flat_dict.update(sub_flat)
else:
flat_dict[k] = v
return flat_dict
return flatten(result) if isinstance(result, dict) else result

View File

@@ -0,0 +1,226 @@
from abc import ABC, abstractmethod
from copy import deepcopy
from maim_message import Seg, UserInfo, MessageBase, BaseMessageInfo
from typing import Optional, List, Union, Dict, Any
import asyncio
import hashlib
import base64
from src.common.logger import get_logger
logger = get_logger("base_message_component_model")
class BaseMessageComponentModel(ABC):
@abstractmethod
async def to_seg(self) -> Seg:
"""将消息组件转换为 maim_message.Seg 对象"""
raise NotImplementedError
def clone(self):
return deepcopy(self)
class ByteComponent:
def __init__(self, *, binary_hash: str, content: Optional[str] = None, binary_data: Optional[bytes] = None) -> None:
self.content: str = content if content is not None else ""
"""处理后的内容"""
self.binary_data: bytes = binary_data if binary_data is not None else b""
"""原始二进制数据"""
self.binary_hash: str = hashlib.sha256(self.binary_data).hexdigest() if self.binary_data else binary_hash
"""二进制数据的 SHA256 哈希值,用于唯一标识该二进制数据"""
class TextComponent(BaseMessageComponentModel):
def __init__(self, text: str):
self.text = text
assert isinstance(text, str), "TextComponent 的 text 必须是字符串类型"
async def to_seg(self) -> Seg:
return Seg(type="text", data=self.text)
class ImageComponent(BaseMessageComponentModel, ByteComponent):
async def load_image_binary(self):
if not self.binary_data:
...
async def to_seg(self) -> Seg:
if not self.binary_data:
await self.load_image_binary()
return Seg(type="image", data=base64.b64encode(self.binary_data).decode())
class EmojiComponent(BaseMessageComponentModel, ByteComponent):
async def load_emoji_binary(self) -> None:
"""
加载表情的二进制数据,如果 binary_data 为空,则通过 emoji_hash 从表情管理器加载
Raises:
ValueError: 如果 binary_data 为空且缺少 emoji_hash
ValueError: 如果无法通过 emoji_hash 加载表情二进制数据
"""
if not self.binary_data:
from src.chat.emoji_system.emoji_manager import emoji_manager
if not (
emoji := emoji_manager.get_emoji_by_hash(self.binary_hash)
or emoji_manager.get_emoji_by_hash_from_db(self.binary_hash)
):
raise ValueError(f"无法通过 emoji_hash 加载表情二进制数据: {self.binary_hash}")
try:
self.binary_data = await asyncio.to_thread(emoji.full_path.read_bytes)
except Exception as e:
raise ValueError(f"通过 emoji_hash 加载表情二进制数据时发生错误: {e}") from e
async def to_seg(self) -> Seg:
if not self.binary_data:
await self.load_emoji_binary()
return Seg(type="emoji", data=base64.b64encode(self.binary_data).decode())
class VoiceComponent(BaseMessageComponentModel, ByteComponent):
async def load_voice_binary(self) -> None:
if not self.binary_data:
from src.common.utils.utils_file import FileUtils
try:
file_path = FileUtils.get_file_path_by_hash(self.binary_hash)
self.binary_data = await asyncio.to_thread(file_path.read_bytes)
except Exception as e:
raise ValueError(f"通过 voice_hash 加载语音二进制数据时发生错误: {e}") from e
async def to_seg(self) -> Seg:
if not self.binary_data:
await self.load_voice_binary()
return Seg(type="voice", data=base64.b64encode(self.binary_data).decode())
class ForwardNodeComponent(BaseMessageComponentModel):
def __init__(self, forward_components: List["ForwardComponent"]):
self.forward_components = forward_components
assert isinstance(forward_components, list), "ForwardNodeComponent 的 forward_components 必须是列表类型"
assert all(isinstance(comp, ForwardComponent) for comp in forward_components), (
"ForwardNodeComponent 的 forward_components 列表中必须全部是 ForwardComponent 类型"
)
assert forward_components, "ForwardNodeComponent 的 forward_components 不能为空列表"
async def to_seg(self) -> "Seg":
resp: List[Dict[str, Any]] = []
for comp in self.forward_components:
data = await comp.to_seg()
sender_info = UserInfo(None, comp.user_id, comp.user_nickname, comp.user_cardname)
base_message_info = BaseMessageInfo(user_info=sender_info)
base_message = MessageBase(base_message_info, data)
resp.append(base_message.to_dict())
return Seg(type="forward", data=resp) # type: ignore
class DictComponent:
def __init__(self, data: Dict[str, Any]):
self.data = data
assert isinstance(data, dict), "DictComponent 的 data 必须是字典类型"
StandardMessageComponents = Union[
TextComponent,
ImageComponent,
EmojiComponent,
VoiceComponent,
ForwardNodeComponent,
DictComponent,
]
class ForwardComponent(BaseMessageComponentModel):
def __init__(
self,
user_nickname: str,
content: List[StandardMessageComponents],
user_id: Optional[str] = None,
user_cardname: Optional[str] = None,
):
self.user_nickname: str = user_nickname
self.content: List[StandardMessageComponents] = content
self.user_id: Optional[str] = user_id
self.user_cardname: Optional[str] = user_cardname
assert self.content, "ForwardComponent 的 content 不能为空"
async def to_seg(self) -> "Seg":
return Seg(
type="seglist", data=[await comp.to_seg() for comp in self.content if not isinstance(comp, DictComponent)]
)
class MessageSequence:
def __init__(self, components: List[StandardMessageComponents]):
self.components: List[StandardMessageComponents] = components
def to_dict(self) -> List[Dict[str, Any]]:
return [self._item_2_dict(comp) for comp in self.components]
def _item_2_dict(self, item: StandardMessageComponents) -> Dict[str, Any]:
if isinstance(item, TextComponent):
return {"type": "text", "data": item.text}
elif isinstance(item, ImageComponent):
if not item.content:
raise RuntimeError("ImageComponent content 未初始化")
return {"type": "image", "data": item.content, "hash": item.binary_hash}
elif isinstance(item, EmojiComponent):
if not item.content:
raise RuntimeError("EmojiComponent content 未初始化")
return {"type": "emoji", "data": item.content, "hash": item.binary_hash}
elif isinstance(item, VoiceComponent):
if not item.content:
raise RuntimeError("VoiceComponent content 未初始化")
return {"type": "voice", "data": item.content, "hash": item.binary_hash}
elif isinstance(item, ForwardNodeComponent):
return {
"type": "forward",
"data": [
{
"user_id": comp.user_id,
"user_nickname": comp.user_nickname,
"user_cardname": comp.user_cardname,
"content": [self._item_2_dict(c) for c in comp.content],
}
for comp in item.forward_components
],
}
else:
logger.warning(f"Unofficial component type: {type(item)}, defaulting to DictComponent")
return {"type": "dict", "data": item.data}
@classmethod
def from_dict(cls, data: List[Dict[str, Any]]) -> "MessageSequence":
components: List[StandardMessageComponents] = []
components.extend(cls._dict_2_item(item) for item in data)
return cls(components=components)
@classmethod
def _dict_2_item(cls, item: Dict[str, Any]) -> StandardMessageComponents:
item_type = item.get("type")
if item_type == "text":
return TextComponent(text=item["data"])
elif item_type == "image":
return ImageComponent(binary_hash=item["hash"], content=item["data"])
elif item_type == "emoji":
return EmojiComponent(binary_hash=item["hash"], content=item["data"])
elif item_type == "voice":
return VoiceComponent(binary_hash=item["hash"], content=item["data"])
elif item_type == "forward":
forward_components = []
for fc in item["data"]:
content = [cls._dict_2_item(c) for c in fc["content"]]
forward_component = ForwardComponent(
user_nickname=fc["user_nickname"],
user_id=fc.get("user_id"),
user_cardname=fc.get("user_cardname"),
content=content,
)
forward_components.append(forward_component)
return ForwardNodeComponent(forward_components=forward_components)
else:
logger.warning(f"Unofficial component type in dict: {item_type}, defaulting to DictComponent")
return DictComponent(data=item.get("data") or {})

View File

@@ -1,210 +0,0 @@
from typing import Optional, TYPE_CHECKING, List, Tuple, Union, Dict, Any
from dataclasses import dataclass, field
from enum import Enum
from . import BaseDataModel
if TYPE_CHECKING:
from .database_data_model import DatabaseMessages
@dataclass
class MessageAndActionModel(BaseDataModel):
chat_id: str = field(default_factory=str)
time: float = field(default_factory=float)
user_id: str = field(default_factory=str)
user_platform: str = field(default_factory=str)
user_nickname: str = field(default_factory=str)
user_cardname: Optional[str] = None
processed_plain_text: Optional[str] = None
display_message: Optional[str] = None
chat_info_platform: str = field(default_factory=str)
is_action_record: bool = field(default=False)
action_name: Optional[str] = None
is_command: bool = field(default=False)
intercept_message_level: int = field(default=0)
@classmethod
def from_DatabaseMessages(cls, message: "DatabaseMessages"):
return cls(
chat_id=message.chat_id,
time=message.time,
user_id=message.user_info.user_id,
user_platform=message.user_info.platform,
user_nickname=message.user_info.user_nickname,
user_cardname=message.user_info.user_cardname,
processed_plain_text=message.processed_plain_text,
display_message=message.display_message,
chat_info_platform=message.chat_info.platform,
is_command=message.is_command,
intercept_message_level=getattr(message, "intercept_message_level", 0),
)
class ReplyContentType(Enum):
TEXT = "text"
IMAGE = "image"
EMOJI = "emoji"
COMMAND = "command"
VOICE = "voice"
FORWARD = "forward"
HYBRID = "hybrid" # 混合类型,包含多种内容
def __repr__(self) -> str:
return self.value
@dataclass
class ForwardNode(BaseDataModel):
user_id: Optional[str] = None
user_nickname: Optional[str] = None
content: Union[List["ReplyContent"], str] = field(default_factory=list)
@classmethod
def construct_as_id_reference(cls, message_id: str) -> "ForwardNode":
return cls(user_id="", user_nickname="", content=message_id)
@classmethod
def construct_as_created_node(
cls, user_id: str, user_nickname: str, content: List["ReplyContent"]
) -> "ForwardNode":
return cls(user_id=user_id, user_nickname=user_nickname, content=content)
@dataclass
class ReplyContent(BaseDataModel):
content_type: ReplyContentType | str
content: Union[str, Dict, List[ForwardNode], List["ReplyContent"]] # 支持嵌套的 ReplyContent
@classmethod
def construct_as_text(cls, text: str):
return cls(content_type=ReplyContentType.TEXT, content=text)
@classmethod
def construct_as_image(cls, image_base64: str):
return cls(content_type=ReplyContentType.IMAGE, content=image_base64)
@classmethod
def construct_as_voice(cls, voice_base64: str):
return cls(content_type=ReplyContentType.VOICE, content=voice_base64)
@classmethod
def construct_as_emoji(cls, emoji_str: str):
return cls(content_type=ReplyContentType.EMOJI, content=emoji_str)
@classmethod
def construct_as_command(cls, command_arg: Dict):
return cls(content_type=ReplyContentType.COMMAND, content=command_arg)
@classmethod
def construct_as_hybrid(cls, hybrid_content: List[Tuple[ReplyContentType | str, str]]):
hybrid_content_list: List[ReplyContent] = []
for content_type, content in hybrid_content:
assert content_type not in [
ReplyContentType.HYBRID,
ReplyContentType.FORWARD,
ReplyContentType.VOICE,
ReplyContentType.COMMAND,
], "混合内容的每个项不能是混合、转发、语音或命令类型"
assert isinstance(content, str), "混合内容的每个项必须是字符串"
hybrid_content_list.append(ReplyContent(content_type=content_type, content=content))
return cls(content_type=ReplyContentType.HYBRID, content=hybrid_content_list)
@classmethod
def construct_as_forward(cls, forward_nodes: List[ForwardNode]):
return cls(content_type=ReplyContentType.FORWARD, content=forward_nodes)
def __post_init__(self):
if isinstance(self.content_type, ReplyContentType):
if self.content_type not in [ReplyContentType.HYBRID, ReplyContentType.FORWARD] and isinstance(
self.content, List
):
raise ValueError(
f"非混合类型/转发类型的内容不能是列表content_type: {self.content_type}, content: {self.content}"
)
elif self.content_type in [ReplyContentType.HYBRID, ReplyContentType.FORWARD]:
if not isinstance(self.content, List):
raise ValueError(
f"混合类型/转发类型的内容必须是列表content_type: {self.content_type}, content: {self.content}"
)
@dataclass
class ReplySetModel(BaseDataModel):
"""
回复集数据模型,用于多种回复类型的返回
"""
reply_data: List[ReplyContent] = field(default_factory=list)
def __len__(self):
return len(self.reply_data)
def add_text_content(self, text: str):
"""
添加文本内容
Args:
text: 文本内容
"""
self.reply_data.append(ReplyContent(content_type=ReplyContentType.TEXT, content=text))
def add_image_content(self, image_base64: str):
"""
添加图片内容base64编码的图片数据
Args:
image_base64: base64编码的图片数据
"""
self.reply_data.append(ReplyContent(content_type=ReplyContentType.IMAGE, content=image_base64))
def add_voice_content(self, voice_base64: str):
"""
添加语音内容base64编码的音频数据
Args:
voice_base64: base64编码的音频数据
"""
self.reply_data.append(ReplyContent(content_type=ReplyContentType.VOICE, content=voice_base64))
def add_hybrid_content_by_raw(self, hybrid_content: List[Tuple[ReplyContentType | str, str]]):
"""
添加混合型内容可以包含text, image, emoji的任意组合
Args:
hybrid_content: 元组 (类型, 消息内容) 构成的列表,如[(ReplyContentType.TEXT, "Hello"), (ReplyContentType.IMAGE, "<base64")]
"""
hybrid_content_list: List[ReplyContent] = []
for content_type, content in hybrid_content:
assert content_type not in [
ReplyContentType.HYBRID,
ReplyContentType.FORWARD,
ReplyContentType.VOICE,
ReplyContentType.COMMAND,
], "混合内容的每个项不能是混合、转发、语音或命令类型"
assert isinstance(content, str), "混合内容的每个项必须是字符串"
hybrid_content_list.append(ReplyContent(content_type=content_type, content=content))
self.reply_data.append(ReplyContent(content_type=ReplyContentType.HYBRID, content=hybrid_content_list))
def add_hybrid_content(self, hybrid_content: List[ReplyContent]):
"""
添加混合型内容,使用已经构造好的 ReplyContent 列表
Args:
hybrid_content: ReplyContent 构成的列表,如[ReplyContent(ReplyContentType.TEXT, "Hello"), ReplyContent(ReplyContentType.IMAGE, "<base64")]
"""
for content in hybrid_content:
assert content.content_type not in [
ReplyContentType.HYBRID,
ReplyContentType.FORWARD,
ReplyContentType.VOICE,
ReplyContentType.COMMAND,
], "混合内容的每个项不能是混合、转发、语音或命令类型"
assert isinstance(content.content, str), "混合内容的每个项必须是字符串"
self.reply_data.append(ReplyContent(content_type=ReplyContentType.HYBRID, content=hybrid_content))
def add_custom_content(self, content_type: str, content: Any):
"""
添加自定义类型的内容"""
self.reply_data.append(ReplyContent(content_type=content_type, content=content))
def add_forward_content(self, forward_content: List[ForwardNode]):
"""添加转发内容可以是字符串或ReplyContent嵌套的转发内容需要自己构造放入"""
self.reply_data.append(ReplyContent(content_type=ReplyContentType.FORWARD, content=forward_content))

View File

@@ -1,36 +0,0 @@
# 对于`message_data_model.py`中`class ReplyContent`的规划解读
分类讨论如下:
- `ReplyContent.TEXT`: 单独的文本,`_level = 0``content``str`类型。
- `ReplyContent.IMAGE`: 单独的图片,`_level = 0``content``str`类型图片base64
- `ReplyContent.EMOJI`: 单独的表情包,`_level = 0``content``str`类型图片base64
- `ReplyContent.VOICE`: 单独的语音,`_level = 0``content``str`类型语音base64
- `ReplyContent.HYBRID`: 混合内容,`_level = 0`
- 其应该是一个列表,列表内应该只接受`str`类型的内容(图片和文本混合体)
- `ReplyContent.FORWARD`: 转发消息,`_level = n`
- 其应该是一个列表,列表接受`str`类型(图片/文本),`ReplyContent`类型(嵌套转发,嵌套有最高层数限制)
- `ReplyContent.COMMAND`: 指令消息,`_level = 0`
- 其应该是一个列表,列表内应该只接受`Dict`类型的内容
未来规划:
- `ReplyContent.AT` 单独的艾特,`_level = 0``content``str`类型用户ID
内容构造方式:
- 对于`TEXT`, `IMAGE`, `EMOJI`, `VOICE`,直接传入对应类型的内容,且`content`应该为`str`
- 对于`COMMAND`,传入一个字典,字典内的内容类型应符合上述规定。
- 对于`HYBRID`, `FORWARD`,传入一个列表,列表内的内容类型应符合上述规定。
因此,我们的类型注解应该是:
```python
from typing import Union, List, Dict
ReplyContentType = Union[
str, # TEXT, IMAGE, EMOJI, VOICE
List[Union[str, 'ReplyContent']], # HYBRID, FORWARD
Dict # COMMAND
]
```
现在`_level`被移除了,在解析的时候显式地检查内容的类型和结构即可。
`send_api`的custom_reply_set_to_stream仅在特定的类型下提供reply)message

View File

@@ -1,57 +0,0 @@
# 有关转发消息和其他消息的构建类型说明
```mermaid
graph LR;
direction TB;
A[ReplySet] --- B[ReplyContent];
A --- C["ReplyContent"];
A --- K["ReplyContent"];
A --- L["ReplyContent"];
A --- N["ReplyContent"];
A --- D[...];
B --- E["Text (in str)"];
B --- F["Image (in base64)"];
C --- G["Voice (in base64)"];
B --- I["Emoji (in base64)"];
subgraph "可行内容(以下的任意组合)";
subgraph "转发消息(Forward)"
M["List[ForwardNode]"]
end
subgraph "混合消息(Hybrid)"
J["List[ReplyContent] (要求只能包含普通消息)"]
end
subgraph "命令消息(Command)"
H["Command (in Dict)"]
end
subgraph "语音消息"
G
end
subgraph "普通消息"
E
F
I
end
end
N --- H
K --- J
L --- M
subgraph ForwardNodes
O["ForwardNode"]
P["ForwardNode"]
Q["ForwardNode"]
end
M --- O
M --- P
M --- Q
subgraph "内容 (message_id引用法)"
P --- U["content: str, 引用已有消息的有效ID"];
end
subgraph "内容 (生成法)"
O --- R["user_id: str"];
O --- S["user_nickname: str"];
O --- T["content: List[ReplyContent], 为这个转发节点的消息内容"];
end
```
另外,自定义消息类型我们在这里不做讨论。
以上列出了所有可能的ReplySet构建方式下面我们来解释一下各个类型的含义。