diff --git a/plugins/hello_world_plugin/plugin.py b/plugins/hello_world_plugin/plugin.py index 7284d9f5..ff59dbe7 100644 --- a/plugins/hello_world_plugin/plugin.py +++ b/plugins/hello_world_plugin/plugin.py @@ -1,3 +1,4 @@ +import random from typing import List, Tuple, Type, Any from src.plugin_system import ( BasePlugin, @@ -12,7 +13,9 @@ from src.plugin_system import ( EventType, MaiMessages, ToolParamType, + ReplyContentType, ) +from src.config.config import global_config class CompareNumbersTool(BaseTool): @@ -144,6 +147,44 @@ class PrintMessage(BaseEventHandler): return True, True, "消息已打印", None, None +class ForwardMessages(BaseEventHandler): + """ + 把接收到的消息转发到指定聊天ID + + 此组件是HYBRID消息和FORWARD消息的使用示例。 + 每收到10条消息,就会以1%的概率使用HYBRID消息转发,否则使用FORWARD消息转发。 + """ + + event_type = EventType.ON_MESSAGE + handler_name = "forward_messages_handler" + handler_description = "把接收到的消息转发到指定聊天ID" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counter = 0 # 用于计数转发的消息数量 + self.messages: List[str] = [] + + async def execute(self, message: MaiMessages | None) -> Tuple[bool, bool, None, None, None]: + if self.get_config("print_message.enabled", False): + return True, True, None, None, None + if not message: + return True, True, None, None, None + stream_id = message.stream_id or "" + + if message.plain_text: + self.messages.append(message.plain_text) + self.counter += 1 + if self.counter % 10 == 0: + if random.random() < 0.01: + success = await self.send_hybrid(stream_id, [(ReplyContentType.TEXT, msg) for msg in self.messages]) + else: + success = await self.send_forward(stream_id, [(str(global_config.bot.qq_account), str(global_config.bot.nickname), [(ReplyContentType.TEXT, msg)]) for msg in self.messages]) + if not success: + raise ValueError("转发消息失败") + self.messages = [] + return True, True, None, None, None + + # ===== 插件注册 ===== @@ -185,6 +226,7 @@ class HelloWorldPlugin(BasePlugin): (ByeAction.get_action_info(), ByeAction), # 添加告别Action (TimeCommand.get_command_info(), TimeCommand), (PrintMessage.get_handler_info(), PrintMessage), + (ForwardMessages.get_handler_info(), ForwardMessages), ] diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index 884500b8..54057c8a 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -492,7 +492,7 @@ class HeartFChatting: return False, "", "" # 处理动作并获取结果 - result = await action_handler.handle_action() + result = await action_handler.execute() success, action_text = result command = "" diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 6fb74cd0..5f479615 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -180,6 +180,23 @@ class ChatBot: return + async def echo_message_process(self, raw_data: Dict[str, Any]) -> None: + """ + 用于专门处理回送消息ID的函数 + """ + message_data: Dict[str, Any] = raw_data.get("content", {}) + if not message_data: + return + message_type = message_data.get("type") + if message_type != "echo": + return + mmc_message_id = message_data.get("echo") + actual_message_id = message_data.get("actual_id") + if MessageStorage.update_message(mmc_message_id, actual_message_id): + logger.debug(f"更新消息ID成功: {mmc_message_id} -> {actual_message_id}") + else: + logger.warning(f"更新消息ID失败: {mmc_message_id} -> {actual_message_id}") + async def message_process(self, message_data: Dict[str, Any]) -> None: """处理转化后的统一格式消息 这个函数本质是预处理一些数据,根据配置信息和消息内容,预处理消息,并分发到合适的消息处理器中 @@ -215,18 +232,8 @@ class ChatBot: # print(message_data) # logger.debug(str(message_data)) message = MessageRecv(message_data) - - if await self.handle_notice_message(message): - # return - pass - group_info = message.message_info.group_info user_info = message.message_info.user_info - if message.message_info.additional_config: - sent_message = message.message_info.additional_config.get("echo", False) - if sent_message: # 处理上报的自身消息,更新message_id,需要ada支持上报事件 - await MessageStorage.update_message(message) - return continue_flag, modified_message = await events_manager.handle_mai_events( EventType.ON_MESSAGE_PRE_PROCESS, message @@ -236,6 +243,10 @@ class ChatBot: if modified_message and modified_message._modify_flags.modify_message_segments: message.message_segment = Seg(type="seglist", data=modified_message.message_segments) + if await self.handle_notice_message(message): + # return + pass + get_chat_manager().register_message(message) chat = await get_chat_manager().get_or_create_stream( diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 37c9d188..a60bd24c 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -143,31 +143,26 @@ class MessageStorage: # 如果需要其他存储相关的函数,可以在这里添加 @staticmethod - async def update_message( - message: MessageRecv, - ) -> None: # 用于实时更新数据库的自身发送消息ID,目前能处理text,reply,image和emoji - """更新最新一条匹配消息的message_id""" + def update_message(mmc_message_id: str | None, qq_message_id: str | None) -> bool: + """实时更新数据库的自身发送消息ID""" try: - if message.message_segment.type == "notify": - mmc_message_id = message.message_segment.data.get("echo") # type: ignore - qq_message_id = message.message_segment.data.get("actual_id") # type: ignore - else: - logger.info(f"更新消息ID错误,seg类型为{message.message_segment.type}") - return if not qq_message_id: logger.info("消息不存在message_id,无法更新") - return + return False if matched_message := ( Messages.select().where((Messages.message_id == mmc_message_id)).order_by(Messages.time.desc()).first() ): # 更新找到的消息记录 Messages.update(message_id=qq_message_id).where(Messages.id == matched_message.id).execute() # type: ignore logger.debug(f"更新消息ID成功: {matched_message.message_id} -> {qq_message_id}") + return True else: logger.debug("未找到匹配的消息") + return False except Exception as e: logger.error(f"更新消息ID失败: {e}") + return False @staticmethod def replace_image_descriptions(text: str) -> str: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 36b3df7d..5263517c 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -25,6 +25,7 @@ from src.chat.utils.chat_message_builder import ( replace_user_references, ) from src.chat.express.expression_selector import expression_selector + # from src.chat.memory_system.memory_activator import MemoryActivator from src.mood.mood_manager import mood_manager from src.person_info.person_info import Person, is_person_known @@ -213,7 +214,7 @@ class DefaultReplyer: traceback.print_exc() return False, llm_response - async def build_relation_info(self, chat_content: str, sender: str, person_list: List[Person] = None): + async def build_relation_info(self, chat_content: str, sender: str, person_list: List[Person]): if not global_config.relationship.enable_relationship: return "" @@ -234,7 +235,7 @@ class DefaultReplyer: for person in person_list: person_relation = await person.build_relationship() others_relation += person_relation - + return f"{sender_relation}\n{others_relation}" async def build_expression_habits(self, chat_history: str, target: str) -> Tuple[str, List[int]]: @@ -665,12 +666,19 @@ class DefaultReplyer: timestamp=time.time(), limit=int(global_config.chat.max_context_size * 0.33), ) - - person_list_short:List[Person] = [] + + person_list_short: List[Person] = [] for msg in message_list_before_short: - if global_config.bot.qq_account == msg.user_info.user_id and global_config.bot.platform == msg.user_info.platform: + if ( + global_config.bot.qq_account == msg.user_info.user_id + and global_config.bot.platform == msg.user_info.platform + ): continue - if reply_message and reply_message.user_info.user_id == msg.user_info.user_id and reply_message.user_info.platform == msg.user_info.platform: + if ( + reply_message + and reply_message.user_info.user_id == msg.user_info.user_id + and reply_message.user_info.platform == msg.user_info.platform + ): continue person = Person(platform=msg.user_info.platform, user_id=msg.user_info.user_id) if person.is_known: @@ -692,7 +700,9 @@ class DefaultReplyer: self._time_and_run_task( self.build_expression_habits(chat_talking_prompt_short, target), "expression_habits" ), - self._time_and_run_task(self.build_relation_info(chat_talking_prompt_short,sender, person_list_short), "relation_info"), + self._time_and_run_task( + self.build_relation_info(chat_talking_prompt_short, sender, person_list_short), "relation_info" + ), # self._time_and_run_task(self.build_memory_block(message_list_before_short, target), "memory_block"), self._time_and_run_task( self.build_tool_info(chat_talking_prompt_short, sender, target, enable_tool=enable_tool), "tool_info" @@ -846,7 +856,7 @@ class DefaultReplyer: # 并行执行2个构建任务 (expression_habits_block, _), relation_info, personality_prompt = await asyncio.gather( self.build_expression_habits(chat_talking_prompt_half, target), - self.build_relation_info(chat_talking_prompt_half, sender), + self.build_relation_info(chat_talking_prompt_half, sender, []), self.build_personality_prompt(), ) @@ -950,7 +960,7 @@ class DefaultReplyer: with Timer("LLM生成", {}): # 内部计时器,可选保留 # 直接使用已初始化的模型实例 logger.info(f"\n{prompt}\n") - + if global_config.debug.show_prompt: logger.info(f"\n{prompt}\n") else: diff --git a/src/main.py b/src/main.py index f007788f..e4935559 100644 --- a/src/main.py +++ b/src/main.py @@ -32,7 +32,6 @@ logger = get_logger("main") class MainSystem: def __init__(self): - # 使用消息API替代直接的FastAPI实例 self.app: MessageServer = get_global_api() self.server: Server = get_global_server() @@ -105,6 +104,7 @@ class MainSystem: # 将bot.py中的chat_bot.message_process消息处理函数注册到api.py的消息处理基类中 self.app.register_message_handler(chat_bot.message_process) + self.app.register_custom_message_handler("message_id_echo", chat_bot.echo_message_process) await check_and_run_migrations() diff --git a/src/plugin_system/__init__.py b/src/plugin_system/__init__.py index e20fc0af..18c04df7 100644 --- a/src/plugin_system/__init__.py +++ b/src/plugin_system/__init__.py @@ -27,6 +27,9 @@ from .base import ( ToolParamType, CustomEventHandlerResult, ReplyContentType, + ReplyContent, + ForwardNode, + ReplySetModel, ) # 导入工具模块 @@ -101,8 +104,11 @@ __all__ = [ "EventHandlerInfo", "EventType", "ToolParamType", - "ReplyContentType", # 消息 + "ReplyContentType", + "ReplyContent", + "ForwardNode", + "ReplySetModel", "MaiMessages", "CustomEventHandlerResult", # 装饰器 diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 5fd53de5..f57dd7fd 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -292,8 +292,6 @@ async def command_to_stream( stream_id: str, storage_message: bool = True, display_message: str = "", - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, ) -> bool: """向指定流发送命令 @@ -301,6 +299,7 @@ async def command_to_stream( command: 命令 stream_id: 聊天流ID storage_message: 是否存储消息到数据库 + display_message: 显示消息 Returns: bool: 是否发送成功 @@ -311,8 +310,6 @@ async def command_to_stream( display_message=display_message, typing=False, storage_message=storage_message, - set_reply=set_reply, - reply_message=reply_message, ) @@ -363,7 +360,18 @@ async def custom_reply_set_to_stream( storage_message: bool = True, show_log: bool = True, ) -> bool: - """向指定流发送混合型消息集""" + """ + 向指定流发送混合型消息集 + + Args: + reply_set: ReplySetModel 对象,包含多个 ReplyContent + stream_id: 聊天流ID + display_message: 显示消息 + typing: 是否显示正在输入 + reply_to: 回复消息,格式为"发送者:消息内容" + storage_message: 是否存储消息到数据库 + show_log: 是否显示日志 + """ flag: bool = True for reply_content in reply_set.reply_data: status: bool = False @@ -428,7 +436,7 @@ def _parse_content_to_seg(reply_content: "ReplyContent") -> Tuple[Seg, bool]: elif content_type == ReplyContentType.FORWARD: forward_message_list_data: List["ForwardNode"] = reply_content.content # type: ignore assert isinstance(forward_message_list_data, list), "转发类型内容必须是列表" - forward_message_list: List[MessageBase] = [] + forward_message_list: List[Dict] = [] for forward_node in forward_message_list_data: message_segment = Seg(type="id", data=forward_node.content) # type: ignore user_info: Optional[UserInfo] = None @@ -442,7 +450,7 @@ def _parse_content_to_seg(reply_content: "ReplyContent") -> Tuple[Seg, bool]: single_node_content.append(sub_seg) message_segment = Seg(type="seglist", data=single_node_content) forward_message_list.append( - MessageBase(message_segment=message_segment, message_info=BaseMessageInfo(user_info=user_info)) + MessageBase(message_segment=message_segment, message_info=BaseMessageInfo(user_info=user_info)).to_dict() ) return Seg(type="forward", data=forward_message_list), False # type: ignore else: diff --git a/src/plugin_system/base/__init__.py b/src/plugin_system/base/__init__.py index 378acc72..a8c320bf 100644 --- a/src/plugin_system/base/__init__.py +++ b/src/plugin_system/base/__init__.py @@ -25,6 +25,9 @@ from .component_types import ( ToolParamType, CustomEventHandlerResult, ReplyContentType, + ReplyContent, + ForwardNode, + ReplySetModel, ) from .config_types import ConfigField @@ -50,4 +53,7 @@ __all__ = [ "ToolParamType", "CustomEventHandlerResult", "ReplyContentType", + "ReplyContent", + "ForwardNode", + "ReplySetModel", ] diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index 3f06f929..5b25511f 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -2,9 +2,10 @@ import time import asyncio from abc import ABC, abstractmethod -from typing import Tuple, Optional, TYPE_CHECKING, Dict +from typing import Tuple, Optional, TYPE_CHECKING, Dict, List from src.common.logger import get_logger +from src.common.data_models.message_data_model import ReplyContentType, ReplyContent, ReplySetModel, ForwardNode from src.chat.message_receive.chat_stream import ChatStream from src.plugin_system.base.component_types import ActionActivationType, ActionInfo, ComponentType from src.plugin_system.apis import send_api, database_api, message_api @@ -156,6 +157,290 @@ class BaseAction(ABC): f"{self.log_prefix} 聊天信息: 类型={'群聊' if self.is_group else '私聊'}, 平台={self.platform}, 目标={self.target_id}" ) + @abstractmethod + async def execute(self) -> Tuple[bool, str]: + """执行Action的抽象方法,子类必须实现 + + Returns: + Tuple[bool, str]: (是否执行成功, 回复文本) + """ + pass + + async def send_text( + self, + content: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + typing: bool = False, + storage_message: bool = True, + ) -> bool: + """发送文本消息 + + Args: + content: 文本内容 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + typing: 是否计算输入时间 + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + return await send_api.text_to_stream( + text=content, + stream_id=self.chat_id, + set_reply=set_reply, + reply_message=reply_message, + typing=typing, + storage_message=storage_message, + ) + + async def send_emoji( + self, + emoji_base64: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送表情包 + + Args: + emoji_base64: 表情包的base64编码 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + return await send_api.emoji_to_stream( + emoji_base64, + self.chat_id, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_image( + self, + image_base64: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送图片 + + Args: + image_base64: 图片的base64编码 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + return await send_api.image_to_stream( + image_base64, + self.chat_id, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_command( + self, + command_name: str, + args: Optional[dict] = None, + display_message: str = "", + storage_message: bool = True, + ) -> bool: + """发送命令消息 + + Args: + command_name: 命令名称 + args: 命令参数 + display_message: 显示消息 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + # 构造命令数据 + command_data = {"name": command_name, "args": args or {}} + + return await send_api.command_to_stream( + command=command_data, + stream_id=self.chat_id, + storage_message=storage_message, + display_message=display_message, + ) + + async def send_custom( + self, + message_type: str, + content: str | Dict, + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送自定义类型消息 + + Args: + message_type: 消息类型,如"video"、"file"、"audio"等 + content: 消息内容 + typing: 是否显示正在输入 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(set_reply 为 True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + return await send_api.custom_to_stream( + message_type=message_type, + content=content, + stream_id=self.chat_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_hybrid( + self, + message_tuple_list: List[Tuple[ReplyContentType | str, str]], + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """ + 发送混合类型消息 + + Args: + message_tuple_list: 包含消息类型和内容的元组列表,格式为 [(内容类型, 内容), ...] + typing: 是否计算打字时间 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + reply_set = ReplySetModel() + reply_set.add_hybrid_content_by_raw(message_tuple_list) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=self.chat_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_forward( + self, + messages_list: List[Tuple[str, str, List[Tuple[ReplyContentType | str, str]]] | str], + storage_message: bool = True, + ) -> bool: + """转发消息 + + Args: + messages_list: 包含消息信息的列表,当传入自行生成的数据时,元素格式为 (sender_id, nickname, 消息体);当传入消息ID时,元素格式为 "message_id" + 其中消息体的格式为 [(内容类型, 内容), ...] + 任意长度的消息都需要使用列表的形式传入 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not self.chat_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + reply_set = ReplySetModel() + forward_message_nodes: List[ForwardNode] = [] + for message in messages_list: + if isinstance(message, str): + forward_message_node = ForwardNode.construct_as_id_reference(message) + elif isinstance(message, Tuple) and len(message) == 3: + sender_id, nickname, content_list = message + single_node_content_list: List[ReplyContent] = [] + for node_content_type, node_content in content_list: + reply_node_content = ReplyContent(content_type=node_content_type, content=node_content) + single_node_content_list.append(reply_node_content) + forward_message_node = ForwardNode.construct_as_created_node( + user_id=sender_id, user_nickname=nickname, content=single_node_content_list + ) + else: + logger.warning(f"{self.log_prefix} 转发消息时遇到无效的消息格式: {message}") + continue + forward_message_nodes.append(forward_message_node) + reply_set.add_forward_content(forward_message_nodes) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=self.chat_id, + storage_message=storage_message, + ) + + async def send_voice(self, audio_base64: str) -> bool: + """ + 发送语音消息 + Args: + audio_base64: 语音的base64编码 + Returns: + bool: 是否发送成功 + """ + if not audio_base64: + logger.error(f"{self.log_prefix} 缺少音频内容") + return False + reply_set = ReplySetModel() + reply_set.add_voice_content(audio_base64) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=self.chat_id, + storage_message=False, + ) + + async def store_action_info( + self, + action_build_into_prompt: bool = False, + action_prompt_display: str = "", + action_done: bool = True, + ) -> None: + """存储动作信息到数据库 + + Args: + action_build_into_prompt: 是否构建到提示中 + action_prompt_display: 显示的action提示信息 + action_done: action是否完成 + """ + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=action_build_into_prompt, + action_prompt_display=action_prompt_display, + action_done=action_done, + thinking_id=self.thinking_id, + action_data=self.action_data, + action_name=self.action_name, + ) + async def wait_for_new_message(self, timeout: int = 1200) -> Tuple[bool, str]: """等待新消息或超时 @@ -216,177 +501,6 @@ class BaseAction(ABC): logger.error(f"{self.log_prefix} 等待新消息时发生错误: {e}") return False, f"等待新消息失败: {str(e)}" - async def send_text( - self, - content: str, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - typing: bool = False, - ) -> bool: - """发送文本消息 - - Args: - content: 文本内容 - reply_to: 回复消息,格式为"发送者:消息内容" - - Returns: - bool: 是否发送成功 - """ - if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False - - return await send_api.text_to_stream( - text=content, - stream_id=self.chat_id, - set_reply=set_reply, - reply_message=reply_message, - typing=typing, - ) - - async def send_emoji( - self, emoji_base64: str, set_reply: bool = False, reply_message: Optional["DatabaseMessages"] = None - ) -> bool: - """发送表情包 - - Args: - emoji_base64: 表情包的base64编码 - - Returns: - bool: 是否发送成功 - """ - if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False - - return await send_api.emoji_to_stream( - emoji_base64, self.chat_id, set_reply=set_reply, reply_message=reply_message - ) - - async def send_image( - self, image_base64: str, set_reply: bool = False, reply_message: Optional["DatabaseMessages"] = None - ) -> bool: - """发送图片 - - Args: - image_base64: 图片的base64编码 - - Returns: - bool: 是否发送成功 - """ - if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False - - return await send_api.image_to_stream( - image_base64, self.chat_id, set_reply=set_reply, reply_message=reply_message - ) - - async def send_custom( - self, - message_type: str, - content: str | Dict, - typing: bool = False, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: - """发送自定义类型消息 - - Args: - message_type: 消息类型,如"video"、"file"、"audio"等 - content: 消息内容 - typing: 是否显示正在输入 - reply_to: 回复消息,格式为"发送者:消息内容" - - Returns: - bool: 是否发送成功 - """ - if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False - - return await send_api.custom_to_stream( - message_type=message_type, - content=content, - stream_id=self.chat_id, - typing=typing, - set_reply=set_reply, - reply_message=reply_message, - ) - - async def store_action_info( - self, - action_build_into_prompt: bool = False, - action_prompt_display: str = "", - action_done: bool = True, - ) -> None: - """存储动作信息到数据库 - - Args: - action_build_into_prompt: 是否构建到提示中 - action_prompt_display: 显示的action提示信息 - action_done: action是否完成 - """ - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=action_build_into_prompt, - action_prompt_display=action_prompt_display, - action_done=action_done, - thinking_id=self.thinking_id, - action_data=self.action_data, - action_name=self.action_name, - ) - - async def send_command( - self, - command_name: str, - args: Optional[dict] = None, - display_message: str = "", - storage_message: bool = True, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: - """发送命令消息 - - 使用stream API发送命令 - - Args: - command_name: 命令名称 - args: 命令参数 - display_message: 显示消息 - storage_message: 是否存储消息到数据库 - - Returns: - bool: 是否发送成功 - """ - try: - if not self.chat_id: - logger.error(f"{self.log_prefix} 缺少聊天ID") - return False - - # 构造命令数据 - command_data = {"name": command_name, "args": args or {}} - - success = await send_api.command_to_stream( - command=command_data, - stream_id=self.chat_id, - storage_message=storage_message, - display_message=display_message, - set_reply=set_reply, - reply_message=reply_message, - ) - - if success: - logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") - else: - logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") - - return success - - except Exception as e: - logger.error(f"{self.log_prefix} 发送命令时出错: {e}") - return False - @classmethod def get_action_info(cls) -> "ActionInfo": """从类属性生成ActionInfo @@ -428,26 +542,6 @@ class BaseAction(ABC): associated_types=getattr(cls, "associated_types", []).copy(), ) - @abstractmethod - async def execute(self) -> Tuple[bool, str]: - """执行Action的抽象方法,子类必须实现 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复文本) - """ - pass - - async def handle_action(self) -> Tuple[bool, str]: - """兼容旧系统的handle_action接口,委托给execute方法 - - 为了保持向后兼容性,旧系统的代码可能会调用handle_action方法。 - 此方法将调用委托给新的execute方法。 - - Returns: - Tuple[bool, str]: (是否执行成功, 回复文本) - """ - return await self.execute() - def get_config(self, key: str, default=None): """获取插件配置值,使用嵌套键访问 diff --git a/src/plugin_system/base/base_command.py b/src/plugin_system/base/base_command.py index 1b4e2486..252b3c0b 100644 --- a/src/plugin_system/base/base_command.py +++ b/src/plugin_system/base/base_command.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod -from typing import Dict, Tuple, Optional, TYPE_CHECKING +from typing import Dict, Tuple, Optional, TYPE_CHECKING, List from src.common.logger import get_logger +from src.common.data_models.message_data_model import ReplyContentType, ReplyContent, ReplySetModel, ForwardNode from src.plugin_system.base.component_types import CommandInfo, ComponentType from src.chat.message_receive.message import MessageRecv from src.plugin_system.apis import send_api @@ -98,7 +99,9 @@ class BaseCommand(ABC): Args: content: 回复内容 - reply_to: 回复消息,格式为"发送者:消息内容" + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 @@ -117,113 +120,6 @@ class BaseCommand(ABC): storage_message=storage_message, ) - async def send_type( - self, - message_type: str, - content: str | Dict, - display_message: str = "", - typing: bool = False, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: - """发送指定类型的回复消息到当前聊天环境 - - Args: - message_type: 消息类型,如"text"、"image"、"emoji"等 - content: 消息内容 - display_message: 显示消息(可选) - typing: 是否显示正在输入 - reply_to: 回复消息,格式为"发送者:消息内容" - - Returns: - bool: 是否发送成功 - """ - # 获取聊天流信息 - chat_stream = self.message.chat_stream - if not chat_stream or not hasattr(chat_stream, "stream_id"): - logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") - return False - - return await send_api.custom_to_stream( - message_type=message_type, - content=content, - stream_id=chat_stream.stream_id, - display_message=display_message, - typing=typing, - set_reply=set_reply, - reply_message=reply_message, - ) - - async def send_command( - self, - command_name: str, - args: Optional[dict] = None, - display_message: str = "", - storage_message: bool = True, - set_reply: bool = False, - reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: - """发送命令消息 - - Args: - command_name: 命令名称 - args: 命令参数 - display_message: 显示消息 - storage_message: 是否存储消息到数据库 - - Returns: - bool: 是否发送成功 - """ - try: - # 获取聊天流信息 - chat_stream = self.message.chat_stream - if not chat_stream or not hasattr(chat_stream, "stream_id"): - logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") - return False - - # 构造命令数据 - command_data = {"name": command_name, "args": args or {}} - - success = await send_api.command_to_stream( - command=command_data, - stream_id=chat_stream.stream_id, - storage_message=storage_message, - display_message=display_message, - set_reply=set_reply, - reply_message=reply_message, - ) - - if success: - logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") - else: - logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") - - return success - - except Exception as e: - logger.error(f"{self.log_prefix} 发送命令时出错: {e}") - return False - - async def send_emoji( - self, emoji_base64: str, set_reply: bool = False, reply_message: Optional["DatabaseMessages"] = None - ) -> bool: - """发送表情包 - - Args: - emoji_base64: 表情包的base64编码 - - Returns: - bool: 是否发送成功 - """ - chat_stream = self.message.chat_stream - if not chat_stream or not hasattr(chat_stream, "stream_id"): - logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") - return False - - return await send_api.emoji_to_stream( - emoji_base64, chat_stream.stream_id, set_reply=set_reply, reply_message=reply_message - ) - async def send_image( self, image_base64: str, @@ -252,6 +148,221 @@ class BaseCommand(ABC): storage_message=storage_message, ) + async def send_emoji( + self, + emoji_base64: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送表情包 + + Args: + emoji_base64: 表情包的base64编码 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + return await send_api.emoji_to_stream( + emoji_base64, chat_stream.stream_id, set_reply=set_reply, reply_message=reply_message + ) + + async def send_command( + self, + command_name: str, + args: Optional[dict] = None, + display_message: str = "", + storage_message: bool = True, + ) -> bool: + """发送命令消息 + + Args: + command_name: 命令名称 + args: 命令参数 + display_message: 显示消息 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + try: + # 获取聊天流信息 + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + # 构造命令数据 + command_data = {"name": command_name, "args": args or {}} + + success = await send_api.command_to_stream( + command=command_data, + stream_id=chat_stream.stream_id, + storage_message=storage_message, + display_message=display_message, + ) + + if success: + logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") + else: + logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + + return success + + except Exception as e: + logger.error(f"{self.log_prefix} 发送命令时出错: {e}") + return False + + async def send_voice(self, voice_base64: str) -> bool: + """ + 发送语音消息 + Args: + voice_base64: 语音的base64编码 + Returns: + bool: 是否发送成功 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + return await send_api.custom_to_stream( + message_type="voice", + content=voice_base64, + stream_id=chat_stream.stream_id, + typing=False, + set_reply=False, + reply_message=None, + storage_message=False, + ) + + async def send_hybrid( + self, + message_tuple_list: List[Tuple[ReplyContentType | str, str]], + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """ + 发送混合类型消息 + + Args: + message_tuple_list: 包含消息类型和内容的元组列表,格式为 [(内容类型, 内容), ...] + typing: 是否显示正在输入 + set_reply: 是否计算打字时间 + reply_message: 回复的消息对象 + storage_message: 是否存储消息到数据库 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + reply_set = ReplySetModel() + reply_set.add_hybrid_content_by_raw(message_tuple_list) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=chat_stream.stream_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_forward( + self, + messages_list: List[Tuple[str, str, List[Tuple[ReplyContentType | str, str]]] | str], + storage_message: bool = True, + ) -> bool: + """转发消息 + + Args: + messages_list: 包含消息信息的列表,当传入自行生成的数据时,元素格式为 (sender_id, nickname, 消息体);当传入消息ID时,元素格式为 "message_id" + 其中消息体的格式为 [(内容类型, 内容), ...] + 任意长度的消息都需要使用列表的形式传入 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + reply_set = ReplySetModel() + forward_message_nodes: List[ForwardNode] = [] + for message in messages_list: + if isinstance(message, str): + forward_message_node = ForwardNode.construct_as_id_reference(message) + elif isinstance(message, Tuple) and len(message) == 3: + sender_id, nickname, content_list = message + single_node_content_list: List[ReplyContent] = [] + for node_content_type, node_content in content_list: + reply_node_content = ReplyContent(content_type=node_content_type, content=node_content) + single_node_content_list.append(reply_node_content) + forward_message_node = ForwardNode.construct_as_created_node( + user_id=sender_id, user_nickname=nickname, content=single_node_content_list + ) + else: + logger.warning(f"{self.log_prefix} 转发消息时遇到无效的消息格式: {message}") + continue + forward_message_nodes.append(forward_message_node) + reply_set.add_forward_content(forward_message_nodes) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=chat_stream.stream_id, + storage_message=storage_message, + ) + + async def send_custom( + self, + message_type: str, + content: str | Dict, + display_message: str = "", + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送指定类型的回复消息到当前聊天环境 + + Args: + message_type: 消息类型,如"text"、"image"、"emoji"、"voice"等 + content: 消息内容 + display_message: 显示消息(可选) + typing: 是否显示正在输入 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(set_reply 为 True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + # 获取聊天流信息 + chat_stream = self.message.chat_stream + if not chat_stream or not hasattr(chat_stream, "stream_id"): + logger.error(f"{self.log_prefix} 缺少聊天流或stream_id") + return False + + return await send_api.custom_to_stream( + message_type=message_type, + content=content, + stream_id=chat_stream.stream_id, + display_message=display_message, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + @classmethod def get_command_info(cls) -> "CommandInfo": """从类属性生成CommandInfo diff --git a/src/plugin_system/base/base_events_handler.py b/src/plugin_system/base/base_events_handler.py index d248ef0a..bb8391ca 100644 --- a/src/plugin_system/base/base_events_handler.py +++ b/src/plugin_system/base/base_events_handler.py @@ -1,11 +1,16 @@ from abc import ABC, abstractmethod -from typing import Tuple, Optional, Dict, List +from typing import Tuple, Optional, Dict, List, TYPE_CHECKING from src.common.logger import get_logger +from src.common.data_models.message_data_model import ReplyContentType, ReplySetModel, ReplyContent, ForwardNode +from src.plugin_system.apis import send_api from .component_types import MaiMessages, EventType, EventHandlerInfo, ComponentType, CustomEventHandlerResult logger = get_logger("base_event_handler") +if TYPE_CHECKING: + from src.common.data_models.database_data_model import DatabaseMessages + class BaseEventHandler(ABC): """事件处理器基类 @@ -103,3 +108,273 @@ class BaseEventHandler(ABC): return default return current + + async def send_text( + self, + stream_id: str, + text: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + typing: bool = False, + storage_message: bool = True, + ) -> bool: + """发送文本消息 + + Args: + stream_id: 聊天ID + text: 文本内容 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + typing: 是否计算输入时间 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + return await send_api.text_to_stream( + text=text, + stream_id=stream_id, + set_reply=set_reply, + reply_message=reply_message, + typing=typing, + storage_message=storage_message, + ) + + async def send_emoji( + self, + stream_id: str, + emoji_base64: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送表情消息 + + Args: + emoji_base64: 表情的Base64编码 + stream_id: 聊天ID + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + return await send_api.emoji_to_stream( + emoji_base64=emoji_base64, + stream_id=stream_id, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_image( + self, + stream_id: str, + image_base64: str, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送图片消息 + + Args: + image_base64: 图片的Base64编码 + stream_id: 聊天ID + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + return await send_api.image_to_stream( + image_base64=image_base64, + stream_id=stream_id, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_voice( + self, + stream_id: str, + audio_base64: str, + ) -> bool: + """发送语音消息 + Args: + stream_id: 聊天ID + audio_base64: 语音的Base64编码 + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + reply_set = ReplySetModel() + reply_set.add_voice_content(audio_base64) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=stream_id, + storage_message=False, + ) + + async def send_command( + self, + stream_id: str, + command_name: str, + command_args: Optional[dict] = None, + display_message: str = "", + storage_message: bool = True, + ) -> bool: + """发送命令消息 + + Args: + stream_id: 流ID + command_name: 命令名称 + command_args: 命令参数字典 + display_message: 显示消息 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + + # 构造命令数据 + command_data = {"name": command_name, "args": command_args or {}} + + return await send_api.command_to_stream( + command=command_data, + stream_id=stream_id, + storage_message=storage_message, + display_message=display_message, + ) + + async def send_custom( + self, + stream_id: str, + message_type: str, + content: str | Dict, + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """发送自定义消息 + + Args: + stream_id: 聊天ID + message_type: 消息类型 + content: 消息内容,可以是字符串或字典 + typing: 是否显示正在输入状态 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象(当set_reply为True时必填) + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + return await send_api.custom_to_stream( + message_type=message_type, + content=content, + stream_id=stream_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_hybrid( + self, + stream_id: str, + message_tuple_list: List[Tuple[ReplyContentType | str, str]], + typing: bool = False, + set_reply: bool = False, + reply_message: Optional["DatabaseMessages"] = None, + storage_message: bool = True, + ) -> bool: + """ + 发送混合类型消息 + + Args: + stream_id: 流ID + message_tuple_list: 包含消息类型和内容的元组列表,格式为 [(内容类型, 内容), ...] + typing: 是否计算打字时间 + set_reply: 是否作为回复发送 + reply_message: 回复的消息对象 + storage_message: 是否存储消息到数据库 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + reply_set = ReplySetModel() + reply_set.add_hybrid_content_by_raw(message_tuple_list) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=stream_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + ) + + async def send_forward( + self, + stream_id: str, + messages_list: List[Tuple[str, str, List[Tuple[ReplyContentType | str, str]]] | str], + storage_message: bool = True, + ) -> bool: + """转发消息 + + Args: + stream_id: 聊天ID + messages_list: 包含消息信息的列表,当传入自行生成的数据时,元素格式为 (sender_id, nickname, 消息体);当传入消息ID时,元素格式为 "message_id" + 其中消息体的格式为 [(内容类型, 内容), ...] + 任意长度的消息都需要使用列表的形式传入 + storage_message: 是否存储消息到数据库 + + Returns: + bool: 是否发送成功 + """ + if not stream_id: + logger.error(f"{self.log_prefix} 缺少聊天ID") + return False + reply_set = ReplySetModel() + forward_message_nodes: List[ForwardNode] = [] + for message in messages_list: + if isinstance(message, str): + forward_message_node = ForwardNode.construct_as_id_reference(message) + elif isinstance(message, Tuple) and len(message) == 3: + sender_id, nickname, content_list = message + single_node_content_list: List[ReplyContent] = [] + for node_content_type, node_content in content_list: + reply_node_content = ReplyContent(content_type=node_content_type, content=node_content) + single_node_content_list.append(reply_node_content) + forward_message_node = ForwardNode.construct_as_created_node( + user_id=sender_id, user_nickname=nickname, content=single_node_content_list + ) + else: + logger.warning(f"{self.log_prefix} 转发消息时遇到无效的消息格式: {message}") + continue + forward_message_nodes.append(forward_message_node) + reply_set.add_forward_content(forward_message_nodes) + return await send_api.custom_reply_set_to_stream( + reply_set=reply_set, + stream_id=stream_id, + storage_message=storage_message, + ) diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 86c4f1a8..6c073ae9 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -8,6 +8,9 @@ from maim_message import Seg from src.llm_models.payload_content.tool_option import ToolParamType as ToolParamType from src.llm_models.payload_content.tool_option import ToolCall as ToolCall from src.common.data_models.message_data_model import ReplyContentType as ReplyContentType +from src.common.data_models.message_data_model import ReplyContent as ReplyContent +from src.common.data_models.message_data_model import ForwardNode as ForwardNode +from src.common.data_models.message_data_model import ReplySetModel as ReplySetModel # 组件类型枚举