diff --git a/changelogs/changelog.md b/changelogs/changelog.md index a35f2ef6..7e413b76 100644 --- a/changelogs/changelog.md +++ b/changelogs/changelog.md @@ -2,20 +2,31 @@ ## [0.11.0] - 2025-9-22 ### 🌟 主要功能更改 -- 重构记忆系统,新的记忆系统更可靠,记忆能力更强大 -- 麦麦好奇功能,麦麦会自主提出问题 -- 添加deepthink插件(默认关闭),让麦麦可以深度思考一些问题 +- 重构记忆系统,新的记忆系统更可靠,双通道查询,可以查询文本记忆和过去聊天记录 +- 主动发言功能,麦麦会自主提出问题(可精细调控频率) +- 支持多重人格设定,可以随机切换成不同状态 +- 新增表达方式学习新模式,更少的占用 - 添加表情包管理插件 +- 现可更好的支持多平台 +- 添加deepthink插件(默认关闭),让麦麦可以深度思考一些问题 +- 现已内置BetterFrequency插件 + ### 细节功能更改 - 修复配置文件转义问题 - 情绪系统现在可以由配置文件控制开关 - 修复平行动作控制失效的问题 - 添加planner防抖,防止短时间快速消耗token +- 优化planner历史状态记录 - 修复吞字问题 +- 修复意外换行问题 +- 移除VLM的token限制 +- 为tool工具添加chat_id字段 - 更新依赖表 - 修复负载均衡 -- 优化了对gemini和不同模型的支持 +- 现统计模型名而不是模型标识符 +- 修改默认推荐模型为ds v3.2 +- 优化了对gemini和不同模型的支持,优化了对gemini搜索的支持 ## [0.10.3] - 2025-9-22 ### 🌟 主要功能更改 diff --git a/plugins/ChatFrequency/frequency_adjust_action.py b/plugins/ChatFrequency/frequency_adjust_action.py deleted file mode 100644 index c953b027..00000000 --- a/plugins/ChatFrequency/frequency_adjust_action.py +++ /dev/null @@ -1,121 +0,0 @@ -from typing import Tuple - -# 导入新插件系统 -from src.plugin_system import BaseAction, ActionActivationType - -# 导入依赖的系统组件 -from src.common.logger import get_logger - -# 导入API模块 -from src.plugin_system.apis import frequency_api, send_api, config_api, generator_api - -logger = get_logger("frequency_adjust") - - -class FrequencyAdjustAction(BaseAction): - """频率调节动作 - 调整聊天发言频率""" - - activation_type = ActionActivationType.LLM_JUDGE - parallel_action = False - - # 动作基本信息 - action_name = "frequency_adjust" - - action_description = "调整当前聊天的发言频率" - - # 动作参数定义 - action_parameters = { - "direction": "调整方向:'increase'(增加)或'decrease'(降低)", - } - - # 动作使用场景 - bot_name = config_api.get_global_config("bot.nickname") - - - action_require = [ - f"当用户提到 {bot_name} 太安静或太活跃时使用", - f"有人提到 {bot_name} 的发言太多或太少", - f"需要根据聊天氛围调整 {bot_name} 的活跃度", - ] - - # 关联类型 - associated_types = ["text"] - - async def execute(self) -> Tuple[bool, str]: - """执行频率调节动作""" - try: - # 1. 获取动作参数 - direction = self.action_data.get("direction") - # multiply = 1.2 - # multiply = self.action_data.get("multiply") - - if not direction: - error_msg = "缺少必要的参数:direction或multiply" - logger.error(f"{self.log_prefix} {error_msg}") - return False, error_msg - - # 2. 获取当前频率值 - current_frequency = frequency_api.get_current_talk_value(self.chat_id) - - # 3. 计算新的频率值(使用比率而不是绝对值) - # calculated_frequency = current_frequency * multiply - if direction == "increase": - calculated_frequency = current_frequency * 1.2 - if calculated_frequency > 1.0: - new_frequency = 1.0 - action_desc = f"增加到最大值" - # 记录超出限制的action - logger.warning(f"{self.log_prefix} 尝试调整频率超出最大值: current={current_frequency:.2f}, calculated={calculated_frequency:.2f}") - await self.store_action_info( - action_build_into_prompt=True, - action_prompt_display=f"你尝试调整发言频率到{calculated_frequency:.2f},但最大值只能为1.0,已设置为最大值", - action_done=True, - ) - return True, f"调整发言频率超出限制: {current_frequency:.2f} → {new_frequency:.2f}" - else: - new_frequency = calculated_frequency - action_desc = f"增加" - elif direction == "decrease": - calculated_frequency = current_frequency * 0.8 - new_frequency = max(0.0, calculated_frequency) - action_desc = f"降低" - else: - error_msg = f"无效的调整方向: {direction}" - logger.error(f"{self.log_prefix} {error_msg}") - return False, error_msg - - # 4. 设置新的频率值 - frequency_api.set_talk_frequency_adjust(self.chat_id, new_frequency) - - # 5. 发送反馈消息 - feedback_msg = f"已{action_desc}发言频率:{current_frequency:.2f} → {new_frequency:.2f}" - result_status, data = await generator_api.rewrite_reply( - chat_stream=self.chat_stream, - reply_data={ - "raw_reply": feedback_msg, - "reason": "表达自己已经调整了发言频率,不一定要说具体数值,可以有趣一些", - }, - ) - - - if result_status: - for reply_seg in data.reply_set.reply_data: - send_data = reply_seg.content - await self.send_text(send_data) - logger.info(f"{self.log_prefix} {send_data}") - - # 6. 存储动作信息(仅在未超出限制时) - if calculated_frequency <= 1.0: - await self.store_action_info( - action_build_into_prompt=True, - action_prompt_display=f"你{action_desc}了发言频率,从{current_frequency:.2f}调整到{new_frequency:.2f}", - action_done=True, - ) - - return True, f"成功调整发言频率: {current_frequency:.2f} → {new_frequency:.2f}" - - except Exception as e: - error_msg = f"频率调节失败: {str(e)}" - logger.error(f"{self.log_prefix} {error_msg}", exc_info=True) - await self.send_text("频率调节失败") - return False, error_msg \ No newline at end of file diff --git a/plugins/ChatFrequency/plugin.py b/plugins/ChatFrequency/plugin.py index 6d78e6e1..93cb522b 100644 --- a/plugins/ChatFrequency/plugin.py +++ b/plugins/ChatFrequency/plugin.py @@ -1,4 +1,4 @@ -from typing import List, Tuple, Type, Any, Optional +from typing import List, Tuple, Type, Optional from src.plugin_system import ( BasePlugin, register_plugin, @@ -7,8 +7,6 @@ from src.plugin_system import ( ConfigField ) from src.plugin_system.apis import send_api, frequency_api -from .frequency_adjust_action import FrequencyAdjustAction - class SetTalkFrequencyCommand(BaseCommand): """设置当前聊天的talk_frequency值""" @@ -136,10 +134,6 @@ class BetterFrequencyPlugin(BasePlugin): "max_adjust_value": ConfigField(type=float, default=1.0, description="最大调整值"), "min_adjust_value": ConfigField(type=float, default=0.0, description="最小调整值"), }, - # "features": { - # "enable_frequency_adjust_action": ConfigField(type=bool, default=False, description="是否启用频率调节动作(FrequencyAdjustAction)"), - # "enable_commands": ConfigField(type=bool, default=True, description="是否启用命令功能(/chat命令)"), - # } } def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: @@ -152,8 +146,5 @@ class BetterFrequencyPlugin(BasePlugin): (ShowFrequencyCommand.get_command_info(), ShowFrequencyCommand), ]) - # 根据配置决定是否注册频率调节动作组件 - # if self.config.get("features", {}).get("enable_frequency_adjust_action", True): - # components.append((FrequencyAdjustAction.get_action_info(), FrequencyAdjustAction)) return components diff --git a/src/chat/brain_chat/brain_chat.py b/src/chat/brain_chat/brain_chat.py index 89771628..db037970 100644 --- a/src/chat/brain_chat/brain_chat.py +++ b/src/chat/brain_chat/brain_chat.py @@ -1,574 +1,574 @@ -import asyncio -import time -import traceback -import random -from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING -from rich.traceback import install - -from src.config.config import global_config -from src.common.logger import get_logger -from src.common.data_models.info_data_model import ActionPlannerInfo -from src.common.data_models.message_data_model import ReplyContentType -from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.utils.prompt_builder import global_prompt_manager -from src.chat.utils.timer_calculator import Timer -from src.chat.brain_chat.brain_planner import BrainPlanner -from src.chat.planner_actions.action_modifier import ActionModifier -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.heart_flow.hfc_utils import CycleDetail -from src.express.expression_learner import expression_learner_manager -from src.person_info.person_info import Person -from src.plugin_system.base.component_types import EventType, ActionInfo -from src.plugin_system.core import events_manager -from src.plugin_system.apis import generator_api, send_api, message_api, database_api -from src.memory_system.Memory_chest import global_memory_chest -from src.chat.utils.chat_message_builder import ( - build_readable_messages_with_id, - get_raw_msg_before_timestamp_with_chat, -) - -if TYPE_CHECKING: - from src.common.data_models.database_data_model import DatabaseMessages - from src.common.data_models.message_data_model import ReplySetModel - - -ERROR_LOOP_INFO = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - "reasoning": "循环处理失败", - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, -} - - -install(extra_lines=3) - -# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 - -logger = get_logger("bc") # Logger Name Changed - - -class BrainChatting: - """ - 管理一个连续的私聊Brain Chat循环 - 用于在特定聊天流中生成回复。 - """ - - def __init__(self, chat_id: str): - """ - BrainChatting 初始化函数 - - 参数: - chat_id: 聊天流唯一标识符(如stream_id) - on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 - performance_version: 性能记录版本号,用于区分不同启动版本 - """ - # 基础属性 - self.stream_id: str = chat_id # 聊天流ID - self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore - if not self.chat_stream: - raise ValueError(f"无法找到聊天流: {self.stream_id}") - self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" - - self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) - - self.action_manager = ActionManager() - self.action_planner = BrainPlanner(chat_id=self.stream_id, action_manager=self.action_manager) - self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) - - # 循环控制内部状态 - self.running: bool = False - self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - - # 添加循环信息管理相关的属性 - self.history_loop: List[CycleDetail] = [] - self._cycle_counter = 0 - self._current_cycle_detail: CycleDetail = None # type: ignore - - self.last_read_time = time.time() - 2 - - self.more_plan = False - - async def start(self): - """检查是否需要启动主循环,如果未激活则启动。""" - - # 如果循环已经激活,直接返回 - if self.running: - logger.debug(f"{self.log_prefix} BrainChatting 已激活,无需重复启动") - return - - try: - # 标记为活动状态,防止重复启动 - self.running = True - - self._loop_task = asyncio.create_task(self._main_chat_loop()) - self._loop_task.add_done_callback(self._handle_loop_completion) - logger.info(f"{self.log_prefix} BrainChatting 启动完成") - - except Exception as e: - # 启动失败时重置状态 - self.running = False - self._loop_task = None - logger.error(f"{self.log_prefix} BrainChatting 启动失败: {e}") - raise - - def _handle_loop_completion(self, task: asyncio.Task): - """当 _hfc_loop 任务完成时执行的回调。""" - try: - if exception := task.exception(): - logger.error(f"{self.log_prefix} BrainChatting: 脱离了聊天(异常): {exception}") - logger.error(traceback.format_exc()) # Log full traceback for exceptions - else: - logger.info(f"{self.log_prefix} BrainChatting: 脱离了聊天 (外部停止)") - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} BrainChatting: 结束了聊天") - - def start_cycle(self) -> Tuple[Dict[str, float], str]: - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(self._cycle_counter) - self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" - cycle_timers = {} - return cycle_timers, self._current_cycle_detail.thinking_id - - def end_cycle(self, loop_info, cycle_timers): - self._current_cycle_detail.set_loop_info(loop_info) - self.history_loop.append(self._current_cycle_detail) - self._current_cycle_detail.timers = cycle_timers - self._current_cycle_detail.end_time = time.time() - - def print_cycle_info(self, cycle_timers): - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) - - async def _loopbody(self): # sourcery skip: hoist-if-from-if - recent_messages_list = message_api.get_messages_by_time_in_chat( - chat_id=self.stream_id, - start_time=self.last_read_time, - end_time=time.time(), - limit=20, - limit_mode="latest", - filter_mai=True, - filter_command=True, - ) - - if len(recent_messages_list) >= 1: - self.last_read_time = time.time() - await self._observe(recent_messages_list=recent_messages_list) - - else: - # Normal模式:消息数量不足,等待 - await asyncio.sleep(0.2) - return True - return True - - async def _send_and_store_reply( - self, - response_set: "ReplySetModel", - action_message: "DatabaseMessages", - cycle_timers: Dict[str, float], - thinking_id, - actions, - selected_expressions: Optional[List[int]] = None, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - with Timer("回复发送", cycle_timers): - reply_text = await self._send_response( - reply_set=response_set, - message_data=action_message, - selected_expressions=selected_expressions, - ) - - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.chat_info.platform - if platform is None: - platform = getattr(self.chat_stream, "platform", "unknown") - - person = Person(platform=platform, user_id=action_message.user_info.user_id) - person_name = person.person_name - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def _observe( - self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, - ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if - if recent_messages_list is None: - recent_messages_list = [] - _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - asyncio.create_task(self.expression_learner.trigger_learning_for_chat()) - asyncio.create_task(global_memory_chest.build_running_content(chat_id=self.stream_id)) - - cycle_timers, thinking_id = self.start_cycle() - logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") - - # 第一步:动作检查 - available_actions: Dict[str, ActionInfo] = {} - try: - await self.action_modifier.modify_actions() - available_actions = self.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - interest=global_config.personality.interest, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - ) - - # 3. 并行执行所有动作 - action_tasks = [ - asyncio.create_task( - self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) - ) - for action in action_to_use_info - ] - - # 并行执行所有任务 - results = await asyncio.gather(*action_tasks, return_exceptions=True) - - # 处理执行结果 - reply_loop_info = None - reply_text_from_reply = "" - action_success = False - action_reply_text = "" - - for result in results: - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 动作执行异常: {result}") - continue - - if result["action_type"] != "reply": - action_success = result["success"] - action_reply_text = result["reply_text"] - elif result["action_type"] == "reply": - if result["success"]: - reply_loop_info = result["loop_info"] - reply_text_from_reply = result["reply_text"] - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - # 构建最终的循环信息 - if reply_loop_info: - # 如果有回复信息,使用回复的loop_info作为基础 - loop_info = reply_loop_info - # 更新动作执行信息 - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "taken_time": time.time(), - } - ) - _reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - loop_info = { - "loop_plan_info": { - "action_result": action_to_use_info, - }, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "taken_time": time.time(), - }, - } - _reply_text = action_reply_text - - self.end_cycle(loop_info, cycle_timers) - self.print_cycle_info(cycle_timers) - - return True - - async def _main_chat_loop(self): - """主循环,持续进行计划并可能回复消息,直到被外部取消。""" - try: - while self.running: - # 主循环 - success = await self._loopbody() - await asyncio.sleep(0.1) - if not success: - break - except asyncio.CancelledError: - # 设置了关闭标志位后被取消是正常流程 - logger.info(f"{self.log_prefix} 麦麦已关闭聊天") - except Exception: - logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") - print(traceback.format_exc()) - await asyncio.sleep(3) - self._loop_task = asyncio.create_task(self._main_chat_loop()) - logger.error(f"{self.log_prefix} 结束了当前聊天循环") - - async def _handle_action( - self, - action: str, - reasoning: str, - action_data: dict, - cycle_timers: Dict[str, float], - thinking_id: str, - action_message: Optional["DatabaseMessages"] = None, - ) -> tuple[bool, str, str]: - """ - 处理规划动作,使用动作工厂创建相应的动作处理器 - - 参数: - action: 动作类型 - reasoning: 决策理由 - action_data: 动作数据,包含不同动作需要的参数 - cycle_timers: 计时器字典 - thinking_id: 思考ID - - 返回: - tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) - """ - try: - # 使用工厂创建动作处理器实例 - try: - action_handler = self.action_manager.create_action( - action_name=action, - action_data=action_data, - action_reasoning=reasoning, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.chat_stream, - log_prefix=self.log_prefix, - action_message=action_message, - ) - except Exception as e: - logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") - traceback.print_exc() - return False, "", "" - - if not action_handler: - logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") - return False, "", "" - - # 处理动作并获取结果(固定记录一次动作信息) - # BaseAction 定义了异步方法 execute() 作为统一执行入口 - # 这里调用 execute() 以兼容所有 Action 实现 - result = await action_handler.execute() - success, action_text = result - command = "" - - return success, action_text, command - - except Exception as e: - logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") - traceback.print_exc() - return False, "", "" - - async def _send_response( - self, - reply_set: "ReplySetModel", - message_data: "DatabaseMessages", - selected_expressions: Optional[List[int]] = None, - ) -> str: - new_message_count = message_api.count_new_messages( - chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() - ) - - need_reply = new_message_count >= random.randint(2, 4) - - if need_reply: - logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") - - reply_text = "" - first_replied = False - for reply_content in reply_set.reply_data: - if reply_content.content_type != ReplyContentType.TEXT: - continue - data: str = reply_content.content # type: ignore - if not first_replied: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=need_reply, - typing=False, - selected_expressions=selected_expressions, - ) - first_replied = True - else: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=False, - typing=True, - selected_expressions=selected_expressions, - ) - reply_text += data - - return reply_text - - async def _execute_action( - self, - action_planner_info: ActionPlannerInfo, - chosen_action_plan_infos: List[ActionPlannerInfo], - thinking_id: str, - available_actions: Dict[str, ActionInfo], - cycle_timers: Dict[str, float], - ): - """执行单个动作的通用函数""" - try: - with Timer(f"动作{action_planner_info.action_type}", cycle_timers): - if action_planner_info.action_type == "no_reply": - # 直接处理no_reply逻辑,不再通过动作系统 - reason = action_planner_info.reasoning or "选择不回复" - # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - - # 存储no_reply信息到数据库 - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={"reason": reason}, - action_name="no_reply", - ) - return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} - - elif action_planner_info.action_type == "reply": - try: - success, llm_response = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_message=action_planner_info.action_message, - available_actions=available_actions, - chosen_actions=chosen_action_plan_infos, - reply_reason=action_planner_info.reasoning or "", - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, - ) - - if not success or not llm_response or not llm_response.reply_set: - if action_planner_info.action_message: - logger.info( - f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败" - ) - else: - logger.info("回复生成失败") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - - except asyncio.CancelledError: - logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=action_planner_info.action_message, # type: ignore - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=chosen_action_plan_infos, - selected_expressions=selected_expressions, - ) - return { - "action_type": "reply", - "success": True, - "reply_text": reply_text, - "loop_info": loop_info, - } - - # 其他动作 - else: - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_planner_info.action_type, - action_planner_info.reasoning or "", - action_planner_info.action_data or {}, - cycle_timers, - thinking_id, - action_planner_info.action_message, - ) - return { - "action_type": action_planner_info.action_type, - "success": success, - "reply_text": reply_text, - "command": command, - } - - except Exception as e: - logger.error(f"{self.log_prefix} 执行动作时出错: {e}") - logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") - return { - "action_type": action_planner_info.action_type, - "success": False, - "reply_text": "", - "loop_info": None, - "error": str(e), - } +import asyncio +import time +import traceback +import random +from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING +from rich.traceback import install + +from src.config.config import global_config +from src.common.logger import get_logger +from src.common.data_models.info_data_model import ActionPlannerInfo +from src.common.data_models.message_data_model import ReplyContentType +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager +from src.chat.utils.prompt_builder import global_prompt_manager +from src.chat.utils.timer_calculator import Timer +from src.chat.brain_chat.brain_planner import BrainPlanner +from src.chat.planner_actions.action_modifier import ActionModifier +from src.chat.planner_actions.action_manager import ActionManager +from src.chat.heart_flow.hfc_utils import CycleDetail +from src.express.expression_learner import expression_learner_manager +from src.person_info.person_info import Person +from src.plugin_system.base.component_types import EventType, ActionInfo +from src.plugin_system.core import events_manager +from src.plugin_system.apis import generator_api, send_api, message_api, database_api +from src.memory_system.Memory_chest import global_memory_chest +from src.chat.utils.chat_message_builder import ( + build_readable_messages_with_id, + get_raw_msg_before_timestamp_with_chat, +) + +if TYPE_CHECKING: + from src.common.data_models.database_data_model import DatabaseMessages + from src.common.data_models.message_data_model import ReplySetModel + + +ERROR_LOOP_INFO = { + "loop_plan_info": { + "action_result": { + "action_type": "error", + "action_data": {}, + "reasoning": "循环处理失败", + }, + }, + "loop_action_info": { + "action_taken": False, + "reply_text": "", + "command": "", + "taken_time": time.time(), + }, +} + + +install(extra_lines=3) + +# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 + +logger = get_logger("bc") # Logger Name Changed + + +class BrainChatting: + """ + 管理一个连续的私聊Brain Chat循环 + 用于在特定聊天流中生成回复。 + """ + + def __init__(self, chat_id: str): + """ + BrainChatting 初始化函数 + + 参数: + chat_id: 聊天流唯一标识符(如stream_id) + on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 + performance_version: 性能记录版本号,用于区分不同启动版本 + """ + # 基础属性 + self.stream_id: str = chat_id # 聊天流ID + self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore + if not self.chat_stream: + raise ValueError(f"无法找到聊天流: {self.stream_id}") + self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" + + self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) + + self.action_manager = ActionManager() + self.action_planner = BrainPlanner(chat_id=self.stream_id, action_manager=self.action_manager) + self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) + + # 循环控制内部状态 + self.running: bool = False + self._loop_task: Optional[asyncio.Task] = None # 主循环任务 + + # 添加循环信息管理相关的属性 + self.history_loop: List[CycleDetail] = [] + self._cycle_counter = 0 + self._current_cycle_detail: CycleDetail = None # type: ignore + + self.last_read_time = time.time() - 2 + + self.more_plan = False + + async def start(self): + """检查是否需要启动主循环,如果未激活则启动。""" + + # 如果循环已经激活,直接返回 + if self.running: + logger.debug(f"{self.log_prefix} BrainChatting 已激活,无需重复启动") + return + + try: + # 标记为活动状态,防止重复启动 + self.running = True + + self._loop_task = asyncio.create_task(self._main_chat_loop()) + self._loop_task.add_done_callback(self._handle_loop_completion) + logger.info(f"{self.log_prefix} BrainChatting 启动完成") + + except Exception as e: + # 启动失败时重置状态 + self.running = False + self._loop_task = None + logger.error(f"{self.log_prefix} BrainChatting 启动失败: {e}") + raise + + def _handle_loop_completion(self, task: asyncio.Task): + """当 _hfc_loop 任务完成时执行的回调。""" + try: + if exception := task.exception(): + logger.error(f"{self.log_prefix} BrainChatting: 脱离了聊天(异常): {exception}") + logger.error(traceback.format_exc()) # Log full traceback for exceptions + else: + logger.info(f"{self.log_prefix} BrainChatting: 脱离了聊天 (外部停止)") + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} BrainChatting: 结束了聊天") + + def start_cycle(self) -> Tuple[Dict[str, float], str]: + self._cycle_counter += 1 + self._current_cycle_detail = CycleDetail(self._cycle_counter) + self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" + cycle_timers = {} + return cycle_timers, self._current_cycle_detail.thinking_id + + def end_cycle(self, loop_info, cycle_timers): + self._current_cycle_detail.set_loop_info(loop_info) + self.history_loop.append(self._current_cycle_detail) + self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.end_time = time.time() + + def print_cycle_info(self, cycle_timers): + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + ) + + async def _loopbody(self): # sourcery skip: hoist-if-from-if + recent_messages_list = message_api.get_messages_by_time_in_chat( + chat_id=self.stream_id, + start_time=self.last_read_time, + end_time=time.time(), + limit=20, + limit_mode="latest", + filter_mai=True, + filter_command=True, + ) + + if len(recent_messages_list) >= 1: + self.last_read_time = time.time() + await self._observe(recent_messages_list=recent_messages_list) + + else: + # Normal模式:消息数量不足,等待 + await asyncio.sleep(0.2) + return True + return True + + async def _send_and_store_reply( + self, + response_set: "ReplySetModel", + action_message: "DatabaseMessages", + cycle_timers: Dict[str, float], + thinking_id, + actions, + selected_expressions: Optional[List[int]] = None, + ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: + with Timer("回复发送", cycle_timers): + reply_text = await self._send_response( + reply_set=response_set, + message_data=action_message, + selected_expressions=selected_expressions, + ) + + # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 + platform = action_message.chat_info.platform + if platform is None: + platform = getattr(self.chat_stream, "platform", "unknown") + + person = Person(platform=platform, user_id=action_message.user_info.user_id) + person_name = person.person_name + action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" + + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=action_prompt_display, + action_done=True, + thinking_id=thinking_id, + action_data={"reply_text": reply_text}, + action_name="reply", + ) + + # 构建循环信息 + loop_info: Dict[str, Any] = { + "loop_plan_info": { + "action_result": actions, + }, + "loop_action_info": { + "action_taken": True, + "reply_text": reply_text, + "command": "", + "taken_time": time.time(), + }, + } + + return loop_info, reply_text, cycle_timers + + async def _observe( + self, # interest_value: float = 0.0, + recent_messages_list: Optional[List["DatabaseMessages"]] = None, + ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if + if recent_messages_list is None: + recent_messages_list = [] + _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError + + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + asyncio.create_task(self.expression_learner.trigger_learning_for_chat()) + asyncio.create_task(global_memory_chest.build_running_content(chat_id=self.stream_id)) + + cycle_timers, thinking_id = self.start_cycle() + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") + + # 第一步:动作检查 + available_actions: Dict[str, ActionInfo] = {} + try: + await self.action_modifier.modify_actions() + available_actions = self.action_manager.get_using_actions() + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") + + # 执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + with Timer("规划器", cycle_timers): + action_to_use_info = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, + ) + + # 3. 并行执行所有动作 + action_tasks = [ + asyncio.create_task( + self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) + ) + for action in action_to_use_info + ] + + # 并行执行所有任务 + results = await asyncio.gather(*action_tasks, return_exceptions=True) + + # 处理执行结果 + reply_loop_info = None + reply_text_from_reply = "" + action_success = False + action_reply_text = "" + + for result in results: + if isinstance(result, BaseException): + logger.error(f"{self.log_prefix} 动作执行异常: {result}") + continue + + if result["action_type"] != "reply": + action_success = result["success"] + action_reply_text = result["reply_text"] + elif result["action_type"] == "reply": + if result["success"]: + reply_loop_info = result["loop_info"] + reply_text_from_reply = result["reply_text"] + else: + logger.warning(f"{self.log_prefix} 回复动作执行失败") + + # 构建最终的循环信息 + if reply_loop_info: + # 如果有回复信息,使用回复的loop_info作为基础 + loop_info = reply_loop_info + # 更新动作执行信息 + loop_info["loop_action_info"].update( + { + "action_taken": action_success, + "taken_time": time.time(), + } + ) + _reply_text = reply_text_from_reply + else: + # 没有回复信息,构建纯动作的loop_info + loop_info = { + "loop_plan_info": { + "action_result": action_to_use_info, + }, + "loop_action_info": { + "action_taken": action_success, + "reply_text": action_reply_text, + "taken_time": time.time(), + }, + } + _reply_text = action_reply_text + + self.end_cycle(loop_info, cycle_timers) + self.print_cycle_info(cycle_timers) + + return True + + async def _main_chat_loop(self): + """主循环,持续进行计划并可能回复消息,直到被外部取消。""" + try: + while self.running: + # 主循环 + success = await self._loopbody() + await asyncio.sleep(0.1) + if not success: + break + except asyncio.CancelledError: + # 设置了关闭标志位后被取消是正常流程 + logger.info(f"{self.log_prefix} 麦麦已关闭聊天") + except Exception: + logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") + print(traceback.format_exc()) + await asyncio.sleep(3) + self._loop_task = asyncio.create_task(self._main_chat_loop()) + logger.error(f"{self.log_prefix} 结束了当前聊天循环") + + async def _handle_action( + self, + action: str, + reasoning: str, + action_data: dict, + cycle_timers: Dict[str, float], + thinking_id: str, + action_message: Optional["DatabaseMessages"] = None, + ) -> tuple[bool, str, str]: + """ + 处理规划动作,使用动作工厂创建相应的动作处理器 + + 参数: + action: 动作类型 + reasoning: 决策理由 + action_data: 动作数据,包含不同动作需要的参数 + cycle_timers: 计时器字典 + thinking_id: 思考ID + + 返回: + tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) + """ + try: + # 使用工厂创建动作处理器实例 + try: + action_handler = self.action_manager.create_action( + action_name=action, + action_data=action_data, + action_reasoning=reasoning, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + chat_stream=self.chat_stream, + log_prefix=self.log_prefix, + action_message=action_message, + ) + except Exception as e: + logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") + traceback.print_exc() + return False, "", "" + + if not action_handler: + logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") + return False, "", "" + + # 处理动作并获取结果(固定记录一次动作信息) + # BaseAction 定义了异步方法 execute() 作为统一执行入口 + # 这里调用 execute() 以兼容所有 Action 实现 + result = await action_handler.execute() + success, action_text = result + command = "" + + return success, action_text, command + + except Exception as e: + logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") + traceback.print_exc() + return False, "", "" + + async def _send_response( + self, + reply_set: "ReplySetModel", + message_data: "DatabaseMessages", + selected_expressions: Optional[List[int]] = None, + ) -> str: + new_message_count = message_api.count_new_messages( + chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() + ) + + need_reply = new_message_count >= random.randint(2, 4) + + if need_reply: + logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") + + reply_text = "" + first_replied = False + for reply_content in reply_set.reply_data: + if reply_content.content_type != ReplyContentType.TEXT: + continue + data: str = reply_content.content # type: ignore + if not first_replied: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=need_reply, + typing=False, + selected_expressions=selected_expressions, + ) + first_replied = True + else: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=False, + typing=True, + selected_expressions=selected_expressions, + ) + reply_text += data + + return reply_text + + async def _execute_action( + self, + action_planner_info: ActionPlannerInfo, + chosen_action_plan_infos: List[ActionPlannerInfo], + thinking_id: str, + available_actions: Dict[str, ActionInfo], + cycle_timers: Dict[str, float], + ): + """执行单个动作的通用函数""" + try: + with Timer(f"动作{action_planner_info.action_type}", cycle_timers): + if action_planner_info.action_type == "no_reply": + # 直接处理no_reply逻辑,不再通过动作系统 + reason = action_planner_info.reasoning or "选择不回复" + # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") + + # 存储no_reply信息到数据库 + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={"reason": reason}, + action_name="no_reply", + ) + return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "reply": + try: + success, llm_response = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_message=action_planner_info.action_message, + available_actions=available_actions, + chosen_actions=chosen_action_plan_infos, + reply_reason=action_planner_info.reasoning or "", + enable_tool=global_config.tool.enable_tool, + request_type="replyer", + from_plugin=False, + ) + + if not success or not llm_response or not llm_response.reply_set: + if action_planner_info.action_message: + logger.info( + f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败" + ) + else: + logger.info("回复生成失败") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + + except asyncio.CancelledError: + logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + response_set = llm_response.reply_set + selected_expressions = llm_response.selected_expressions + loop_info, reply_text, _ = await self._send_and_store_reply( + response_set=response_set, + action_message=action_planner_info.action_message, # type: ignore + cycle_timers=cycle_timers, + thinking_id=thinking_id, + actions=chosen_action_plan_infos, + selected_expressions=selected_expressions, + ) + return { + "action_type": "reply", + "success": True, + "reply_text": reply_text, + "loop_info": loop_info, + } + + # 其他动作 + else: + # 执行普通动作 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_planner_info.action_type, + action_planner_info.reasoning or "", + action_planner_info.action_data or {}, + cycle_timers, + thinking_id, + action_planner_info.action_message, + ) + return { + "action_type": action_planner_info.action_type, + "success": success, + "reply_text": reply_text, + "command": command, + } + + except Exception as e: + logger.error(f"{self.log_prefix} 执行动作时出错: {e}") + logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") + return { + "action_type": action_planner_info.action_type, + "success": False, + "reply_text": "", + "loop_info": None, + "error": str(e), + } diff --git a/src/chat/frequency_control/frequency_control.py b/src/chat/frequency_control/frequency_control.py index dfa5a616..905e0da5 100644 --- a/src/chat/frequency_control/frequency_control.py +++ b/src/chat/frequency_control/frequency_control.py @@ -98,8 +98,8 @@ class FrequencyControl: prompt, ) - logger.info(f"频率调整 prompt: {prompt}") - logger.info(f"频率调整 response: {response}") + # logger.info(f"频率调整 prompt: {prompt}") + # logger.info(f"频率调整 response: {response}") if global_config.debug.show_prompt: logger.info(f"频率调整 prompt: {prompt}") diff --git a/src/chat/replyer/group_generator.py b/src/chat/replyer/group_generator.py index 2b43ddb0..b8c68954 100644 --- a/src/chat/replyer/group_generator.py +++ b/src/chat/replyer/group_generator.py @@ -133,7 +133,7 @@ class DefaultReplyer: try: content, reasoning_content, model_name, tool_call = await self.llm_generate_content(prompt) - logger.debug(f"replyer生成内容: {content}") + # logger.debug(f"replyer生成内容: {content}") logger.info(f"replyer生成内容: {content}") logger.info(f"replyer生成推理: {reasoning_content}") @@ -998,7 +998,7 @@ class DefaultReplyer: async def llm_generate_content(self, prompt: str): with Timer("LLM生成", {}): # 内部计时器,可选保留 # 直接使用已初始化的模型实例 - logger.info(f"\n{prompt}\n") + # logger.info(f"\n{prompt}\n") if global_config.debug.show_prompt: logger.info(f"\n{prompt}\n") diff --git a/src/config/config.py b/src/config/config.py index e49d427d..cf53228c 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -55,7 +55,7 @@ TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "template") # 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 # 对该字段的更新,请严格参照语义化版本规范:https://semver.org/lang/zh-CN/ -MMC_VERSION = "0.11.0-snapshot.3" +MMC_VERSION = "0.11.0" def get_key_comment(toml_table, key): diff --git a/template/model_config_template.toml b/template/model_config_template.toml index 830b27ca..565377e4 100644 --- a/template/model_config_template.toml +++ b/template/model_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "1.7.6" +version = "1.7.7" # 配置文件版本号迭代规则同bot_config.toml @@ -121,7 +121,7 @@ max_tokens = 800 [model_task_config.replyer] # 首要回复模型,还用于表达器和表达方式学习 model_list = ["siliconflow-deepseek-v3.2-think","siliconflow-deepseek-r1","siliconflow-deepseek-v3.2"] temperature = 0.3 # 模型温度,新V3建议0.1-0.3 -max_tokens = 800 +max_tokens = 2048 [model_task_config.planner] #决策:负责决定麦麦该什么时候回复的模型 model_list = ["siliconflow-deepseek-v3.2"]