from typing import Union, List, Optional, Deque, Dict from nonebot.adapters.onebot.v11 import Bot, MessageSegment import asyncio import random from .message import Message, Message_Thinking, MessageSet from .cq_code import CQCode from collections import deque import time from .storage import MessageStorage # 添加这行导入 class SendTemp: """单个群组的临时消息队列管理器""" def __init__(self, group_id: int, max_size: int = 100): self.group_id = group_id self.max_size = max_size self.messages: Deque[Union[Message, Message_Thinking]] = deque(maxlen=max_size) self.last_send_time = 0 def add(self, message: Message) -> None: """按时间顺序添加消息到队列""" if not self.messages: self.messages.append(message) return # 按时间顺序插入 if message.time >= self.messages[-1].time: self.messages.append(message) return # 使用二分查找找到合适的插入位置 messages_list = list(self.messages) left, right = 0, len(messages_list) while left < right: mid = (left + right) // 2 if messages_list[mid].time < message.time: left = mid + 1 else: right = mid # 重建消息队列,保持时间顺序 new_messages = deque(maxlen=self.max_size) new_messages.extend(messages_list[:left]) new_messages.append(message) new_messages.extend(messages_list[left:]) self.messages = new_messages def get_earliest_message(self) -> Optional[Message]: """获取时间最早的消息""" message = self.messages.popleft() if self.messages else None # 如果是思考中的消息且思考时间不够,重新加入队列 # if (isinstance(message, Message_Thinking) and # time.time() - message.start_time < 2): # 最少思考2秒 # self.messages.appendleft(message) # return None return message def clear(self) -> None: """清空队列""" self.messages.clear() def get_all(self, group_id: Optional[int] = None) -> List[Union[Message, Message_Thinking]]: """获取所有待发送的消息""" if group_id is None: return list(self.messages) return [msg for msg in self.messages if msg.group_id == group_id] def peek_next(self) -> Optional[Union[Message, Message_Thinking]]: """查看下一条要发送的消息(不移除)""" return self.messages[0] if self.messages else None def has_messages(self) -> bool: """检查是否有待发送的消息""" return bool(self.messages) def count(self, group_id: Optional[int] = None) -> int: """获取待发送消息数量""" if group_id is None: return len(self.messages) return len([msg for msg in self.messages if msg.group_id == group_id]) def get_last_send_time(self) -> float: """获取最后一次发送时间""" return self.last_send_time def update_send_time(self): """更新最后发送时间""" self.last_send_time = time.time() class SendTempContainer: """管理所有群组的消息缓存容器""" def __init__(self): self.temp_queues: Dict[int, SendTemp] = {} def get_queue(self, group_id: int) -> SendTemp: """获取或创建群组的消息队列""" if group_id not in self.temp_queues: self.temp_queues[group_id] = SendTemp(group_id) return self.temp_queues[group_id] def add_message(self, message: Message) -> None: """添加消息到对应群组的队列""" queue = self.get_queue(message.group_id) queue.add(message) def get_group_messages(self, group_id: int) -> List[Union[Message, Message_Thinking]]: """获取指定群组的所有待发送消息""" queue = self.get_queue(group_id) return queue.get_all() def has_messages(self, group_id: int) -> bool: """检查指定群组是否有待发送消息""" queue = self.get_queue(group_id) return queue.has_messages() def get_all_groups(self) -> List[int]: """获取所有有待发送消息的群组ID""" return list(self.temp_queues.keys()) def update_thinking_message(self, message_obj: Union[Message, MessageSet]) -> bool: """更新思考中的消息 Args: message_obj: 要更新的消息对象,可以是单条消息或消息组 Returns: bool: 更新是否成功 """ queue = self.get_queue(message_obj.group_id) # 使用列表解析找到匹配的消息索引 matching_indices = [ i for i, msg in enumerate(queue.messages) if msg.message_id == message_obj.message_id ] if not matching_indices: return False index = matching_indices[0] # 获取第一个匹配的索引 # 将消息转换为列表以便修改 messages = list(queue.messages) # 根据消息类型处理 if isinstance(message_obj, MessageSet): messages.pop(index) # 在原位置插入新消息组 for i, single_message in enumerate(message_obj.messages): messages.insert(index + i, single_message) # print(f"\033[1;34m[调试]\033[0m 添加消息组中的第{i+1}条消息: {single_message}") else: # 直接替换原消息 messages[index] = message_obj # print(f"\033[1;34m[调试]\033[0m 已更新消息: {message_obj}") # 重建队列 queue.messages.clear() for msg in messages: queue.messages.append(msg) return True class MessageSendControl: """消息发送控制器""" def __init__(self): self.typing_speed = (0.1, 0.3) # 每个字符的打字时间范围(秒) self.message_interval = (0.5, 1) # 多条消息间的间隔时间范围(秒) self.max_retry = 3 # 最大重试次数 self.send_temp_container = SendTempContainer() self._running = True self._paused = False self._current_bot = None self.storage = MessageStorage() # 添加存储实例 def set_bot(self, bot: Bot): """设置当前bot实例""" self._current_bot = bot async def start_processor(self, bot: Bot): """启动消息处理器""" self._current_bot = bot while self._running: await asyncio.sleep(1.5) # 处理所有群组的消息队列 for group_id in self.send_temp_container.get_all_groups(): queue = self.send_temp_container.get_queue(group_id) if queue.has_messages(): message = queue.peek_next() # print(f"\033[1;34m[调试]\033[0m 查看最早的消息: {message}") if message: if isinstance(message, Message_Thinking): # 如果是思考中的消息,检查是否需要继续等待 # message.update_thinking_time() thinking_time = time.time() - message.time if thinking_time < 60: # 最少思考2秒 if int(thinking_time) % 10 == 0: print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{thinking_time:.1f}秒") continue else: print(f"\033[1;34m[调试]\033[0m 思考消息超时,移除") queue.get_earliest_message() # 移除超时的思考消息 elif isinstance(message, Message): message = queue.get_earliest_message() if message and message.processed_plain_text: print(f"- 群组: {group_id} - 内容: {message.processed_plain_text}") cost_time = round(time.time(), 2) - message.time # print(f"\033[1;34m[调试]\033[0m 消息发送111111时间: {cost_time}秒") if cost_time > 40: message.processed_plain_text = CQCode.create_reply_cq(message.message_based_id) + message.processed_plain_text await self._current_bot.send_group_msg( group_id=group_id, message=str(message.processed_plain_text), auto_escape=False ) current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time)) print(f"\033[1;32m群 {group_id} 消息, 用户 麦麦, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}") await self.storage.store_message(message, None) queue.update_send_time() if queue.has_messages(): await asyncio.sleep( random.uniform( self.message_interval[0], self.message_interval[1] ) ) async def process_group_queue(self, bot: Bot, group_id: int) -> None: """处理指定群组的消息队列""" queue = self.send_temp_container.get_queue(group_id) while queue.has_messages(): message = queue.get_earliest_message() if message and message.processed_plain_text: await self.send_message( bot=bot, group_id=group_id, content=message.processed_plain_text ) queue.update_send_time() if queue.has_messages(): await asyncio.sleep( random.uniform(self.message_interval[0], self.message_interval[1]) ) async def process_all_queues(self, bot: Bot) -> None: """处理所有群组的消息队列""" if not self._running or self._paused: return for group_id in self.send_temp_container.get_all_groups(): await self.process_group_queue(bot, group_id) async def send_temp_message(self, bot: Bot, group_id: int, message: Union[Message, Message_Thinking], with_emoji: bool = False, emoji_path: Optional[str] = None) -> bool: """ 发送单个临时消息 Args: bot: Bot实例 group_id: 群组ID message: Message对象 with_emoji: 是否带表情 emoji_path: 表情图片路径 Returns: bool: 发送是否成功 """ try: if with_emoji and emoji_path: return await self.send_with_emoji( bot=bot, group_id=group_id, text_content=message.processed_plain_text, emoji_path=emoji_path ) else: return await self.send_message( bot=bot, group_id=group_id, content=message.processed_plain_text ) except Exception as e: print(f"\033[1;31m[错误]\033[0m 发送临时消息失败: {str(e)}") return False def set_typing_speed(self, min_speed: float, max_speed: float): """设置打字速度范围""" self.typing_speed = (min_speed, max_speed) def set_message_interval(self, min_interval: float, max_interval: float): """设置消息间隔范围""" self.message_interval = (min_interval, max_interval) def pause(self): """暂停消息处理""" self._paused = True def resume(self): """恢复消息处理""" self._paused = False def stop(self): """停止消息处理""" self._running = False # 创建全局实例 message_sender = MessageSendControl()