diff --git a/src/chat/brain_chat/brain_chat.py b/src/chat/brain_chat/brain_chat.py index 35c1e513..2b4863ac 100644 --- a/src/chat/brain_chat/brain_chat.py +++ b/src/chat/brain_chat/brain_chat.py @@ -1,23 +1,26 @@ import asyncio +import random import time import traceback -import random -from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from rich.traceback import install from src.config.config import global_config from src.common.logger import get_logger +from src.common.utils.utils_config import ExpressionConfigUtils +from src.bw_learner.expression_learner import ExpressionLearner +from src.bw_learner.jargon_miner import JargonMiner +from src.chat.message_receive.chat_manager import BotChatSession +from src.chat.message_receive.chat_manager import chat_manager as _chat_manager +from src.chat.message_receive.message import SessionMessage from src.common.data_models.info_data_model import ActionPlannerInfo from src.common.data_models.message_component_data_model import MessageSequence, TextComponent -from src.chat.message_receive.chat_manager import chat_manager as _chat_manager from src.chat.utils.prompt_builder import global_prompt_manager from src.chat.utils.timer_calculator import Timer from src.chat.brain_chat.brain_planner import BrainPlanner from src.chat.planner_actions.action_modifier import ActionModifier from src.chat.planner_actions.action_manager import ActionManager -from src.chat.heart_flow.hfc_utils_old import CycleDetail -from src.bw_learner.expression_learner_old import expression_learner_manager -from src.bw_learner.message_recorder_old import extract_and_distribute_messages +from src.chat.heart_flow.heartFC_utils import CycleDetail from src.person_info.person_info import Person from src.core.types import ActionInfo, EventType from src.core.event_bus import event_bus @@ -31,7 +34,7 @@ from src.services import ( from src.services.message_service import build_readable_messages_with_id, get_messages_before_time_in_chat if TYPE_CHECKING: - from src.common.data_models.database_data_model import DatabaseMessages + from src.chat.message_receive.message import SessionMessage ERROR_LOOP_INFO = { @@ -75,12 +78,20 @@ class BrainChatting: """ # 基础属性 self.stream_id: str = session_id # 聊天流ID - self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore + self.chat_stream: BotChatSession = _chat_manager.get_session_by_session_id(self.stream_id) # type: ignore[assignment] if not self.chat_stream: raise ValueError(f"无法找到聊天流: {self.stream_id}") self.log_prefix = f"[{_chat_manager.get_session_name(self.stream_id) or self.stream_id}]" - self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) + expr_use, jargon_learn, expr_learn = ExpressionConfigUtils.get_expression_config_for_chat(self.stream_id) + self._enable_expression_use = expr_use + self._enable_expression_learning = expr_learn + self._enable_jargon_learning = jargon_learn + self._expression_learner = ExpressionLearner(self.stream_id) + self._jargon_miner = JargonMiner(self.stream_id, session_name=self.log_prefix.strip("[]")) + self._min_messages_for_extraction = 30 + self._min_extraction_interval = 60 + self._last_extraction_time = 0.0 self.action_manager = ActionManager() self.action_planner = BrainPlanner(chat_id=self.stream_id, action_manager=self.action_manager) @@ -163,6 +174,25 @@ class BrainChatting: + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) + async def _trigger_expression_learning(self, messages: List[SessionMessage]) -> None: + if not messages: + return + + self._expression_learner.add_messages(messages) + if time.time() - self._last_extraction_time < self._min_extraction_interval: + return + if self._expression_learner.get_cache_size() < self._min_messages_for_extraction: + return + if not self._enable_expression_learning: + return + + self._last_extraction_time = time.time() + try: + jargon_miner = self._jargon_miner if self._enable_jargon_learning else None + await self._expression_learner.learn(jargon_miner) + except Exception as exc: + logger.error(f"{self.log_prefix} 表达学习失败: {exc}", exc_info=True) + async def _loopbody(self): # sourcery skip: hoist-if-from-if # 获取最新消息(用于上下文,但不影响是否调用 observe) recent_messages_list = message_api.get_messages_by_time_in_chat( @@ -197,8 +227,8 @@ class BrainChatting: async def _send_and_store_reply( self, - response_set: "ReplySetModel", - action_message: "DatabaseMessages", + response_set: MessageSequence, + action_message: SessionMessage, cycle_timers: Dict[str, float], thinking_id, actions, @@ -212,11 +242,11 @@ class BrainChatting: ) # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.chat_info.platform + platform = action_message.platform if platform is None: platform = getattr(self.chat_stream, "platform", "unknown") - person = Person(platform=platform, user_id=action_message.user_info.user_id) + person = Person(platform=platform, user_id=action_message.message_info.user_info.user_id) person_name = person.person_name action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" @@ -245,16 +275,15 @@ class BrainChatting: async def _observe( self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, + recent_messages_list: Optional[List[SessionMessage]] = None, ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if if recent_messages_list is None: recent_messages_list = [] _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - # 通过 MessageRecorder 统一提取消息并分发给 expression_learner 和 jargon_miner - # 在 replyer 执行时触发,统一管理时间窗口,避免重复获取消息 - asyncio.create_task(extract_and_distribute_messages(self.stream_id)) + if recent_messages_list: + asyncio.create_task(self._trigger_expression_learning(recent_messages_list)) cycle_timers, thinking_id = self.start_cycle() logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") @@ -294,7 +323,7 @@ class BrainChatting: prompt_key="brain_planner", ) _event_msg = build_event_message( - EventType.ON_PLAN, llm_prompt=prompt_info[0], stream_id=self.chat_stream.stream_id + EventType.ON_PLAN, llm_prompt=prompt_info[0], stream_id=self.chat_stream.session_id ) continue_flag, modified_message = await event_bus.emit(EventType.ON_PLAN, _event_msg) if not continue_flag: @@ -454,7 +483,7 @@ class BrainChatting: action_data: dict, cycle_timers: Dict[str, float], thinking_id: str, - action_message: Optional["DatabaseMessages"] = None, + action_message: Optional[SessionMessage] = None, ) -> tuple[bool, str, str]: """ 处理规划动作,使用动作工厂创建相应的动作处理器 @@ -508,11 +537,11 @@ class BrainChatting: async def _send_response( self, reply_set: MessageSequence, - message_data: "DatabaseMessages", + message_data: SessionMessage, selected_expressions: Optional[List[int]] = None, ) -> str: new_message_count = message_api.count_new_messages( - chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() + chat_id=self.chat_stream.session_id, start_time=self.last_read_time, end_time=time.time() ) need_reply = new_message_count >= random.randint(2, 4) @@ -529,7 +558,7 @@ class BrainChatting: if not first_replied: await send_api.text_to_stream( text=data, - stream_id=self.chat_stream.stream_id, + stream_id=self.chat_stream.session_id, reply_message=message_data, set_reply=need_reply, typing=False, @@ -539,7 +568,7 @@ class BrainChatting: else: await send_api.text_to_stream( text=data, - stream_id=self.chat_stream.stream_id, + stream_id=self.chat_stream.session_id, reply_message=message_data, set_reply=False, typing=True, @@ -585,9 +614,8 @@ class BrainChatting: cleaned_uw: List[str] = [] for item in uw: if isinstance(item, str): - s = item.strip() - if s: - cleaned_uw.append(s) + if stripped_item := item.strip(): + cleaned_uw.append(stripped_item) if cleaned_uw: unknown_words = cleaned_uw diff --git a/src/chat/heart_flow/heartFC_chat_old.py b/src/chat/heart_flow/heartFC_chat_old.py deleted file mode 100644 index 8901e8b5..00000000 --- a/src/chat/heart_flow/heartFC_chat_old.py +++ /dev/null @@ -1,798 +0,0 @@ -import asyncio -import time -import traceback -import random -from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING -from rich.traceback import install - -from src.config.config import global_config -from src.common.logger import get_logger -from src.common.data_models.info_data_model import ActionPlannerInfo -from src.common.data_models.message_data_model import ReplyContentType -from src.chat.message_receive.chat_manager import chat_manager, BotChatSession -from src.chat.utils.prompt_builder import global_prompt_manager -from src.chat.utils.timer_calculator import Timer -from src.chat.planner_actions.planner import ActionPlanner -from src.chat.planner_actions.action_modifier import ActionModifier -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.heart_flow.hfc_utils_old import CycleDetail -from src.bw_learner.expression_learner_old import expression_learner_manager -from src.chat.heart_flow.frequency_control import frequency_control_manager -from src.bw_learner.message_recorder_old import extract_and_distribute_messages -from src.person_info.person_info import Person -from src.plugin_system.base.component_types import EventType, ActionInfo -from src.plugin_system.core import events_manager -from src.plugin_system.apis import generator_api, send_api, message_api, database_api -from src.chat.utils.chat_message_builder import ( - build_readable_messages_with_id, - get_raw_msg_before_timestamp_with_chat, -) -from src.chat.utils.utils import record_replyer_action_temp -from src.memory_system.chat_history_summarizer import ChatHistorySummarizer - -if TYPE_CHECKING: - from src.common.data_models.database_data_model import DatabaseMessages - from src.common.data_models.message_data_model import ReplySetModel - - -ERROR_LOOP_INFO = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - "reasoning": "循环处理失败", - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, -} - - -install(extra_lines=3) - -# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 - -logger = get_logger("hfc") # Logger Name Changed - - -class HeartFChatting: - """ - 管理一个连续的Focus Chat循环 - 用于在特定聊天流中生成回复。 - 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。 - """ - - def __init__(self, session_id: str): - """ - HeartFChatting 初始化函数 - - 参数: - session_id: 聊天会话唯一标识符(如session_id) - on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 - performance_version: 性能记录版本号,用于区分不同启动版本 - """ - # 基础属性 - self.session_id: str = session_id # 聊天会话ID - session = chat_manager.get_session_by_session_id(session_id) - if not session: - raise ValueError(f"未找到 session_id={session_id} 的聊天会话") - self.chat_session: BotChatSession = session - self.log_prefix = f"[{chat_manager.get_session_name(self.session_id) or self.session_id}]" - - self.expression_learner = expression_learner_manager.get_expression_learner(self.session_id) - - self.action_manager = ActionManager() - self.action_planner = ActionPlanner(chat_id=self.session_id, action_manager=self.action_manager) - self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.session_id) - - # 循环控制内部状态 - self.running: bool = False - self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - - # 添加循环信息管理相关的属性 - self.history_loop: List[CycleDetail] = [] - self._cycle_counter = 0 - self._current_cycle_detail: CycleDetail = None # type: ignore - - self.last_read_time = time.time() - 2 - - self.is_mute = False - - self.last_active_time = time.time() # 记录上一次非noreply时间 - - self.question_probability_multiplier = 1 - self.questioned = False - - # 跟踪连续 no_reply 次数,用于动态调整阈值 - self.consecutive_no_reply_count = 0 - - # 聊天内容概括器 - self.chat_history_summarizer = ChatHistorySummarizer(session_id=self.session_id) - - async def start(self): - """检查是否需要启动主循环,如果未激活则启动。""" - - # 如果循环已经激活,直接返回 - if self.running: - logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") - return - - try: - # 标记为活动状态,防止重复启动 - self.running = True - - self._loop_task = asyncio.create_task(self._main_chat_loop()) - self._loop_task.add_done_callback(self._handle_loop_completion) - - # 启动聊天内容概括器的后台定期检查循环 - await self.chat_history_summarizer.start() - - logger.info(f"{self.log_prefix} HeartFChatting 启动完成") - - except Exception as e: - # 启动失败时重置状态 - self.running = False - self._loop_task = None - logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}") - raise - - def _handle_loop_completion(self, task: asyncio.Task): - """当 _hfc_loop 任务完成时执行的回调。""" - try: - if exception := task.exception(): - logger.error(f"{self.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}") - logger.error(traceback.format_exc()) # Log full traceback for exceptions - else: - logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)") - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} HeartFChatting: 结束了聊天") - - def start_cycle(self) -> Tuple[Dict[str, float], str]: - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(cycle_id=self._cycle_counter) - self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" - cycle_timers = {} - return cycle_timers, self._current_cycle_detail.thinking_id - - def end_cycle(self, loop_info, cycle_timers): - self._current_cycle_detail.set_loop_info(loop_info) - self.history_loop.append(self._current_cycle_detail) - self._current_cycle_detail.timers = cycle_timers - self._current_cycle_detail.end_time = time.time() - - def print_cycle_info(self, cycle_timers): - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - if elapsed < 0.1: - # 不显示小于0.1秒的计时器 - continue - formatted_time = f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒;" # type: ignore - + (f"详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) - - async def _loopbody(self): - recent_messages_list = message_api.get_messages_by_time_in_chat( - chat_id=self.session_id, - start_time=self.last_read_time, - end_time=time.time(), - limit=20, - limit_mode="latest", - filter_mai=True, - filter_command=False, - filter_intercept_message_level=0, - ) - - # 根据连续 no_reply 次数动态调整阈值 - # 3次 no_reply 时,阈值调高到 1.5(50%概率为1,50%概率为2) - # 5次 no_reply 时,提高到 2(大于等于两条消息的阈值) - if self.consecutive_no_reply_count >= 5: - threshold = 2 - elif self.consecutive_no_reply_count >= 3: - # 1.5 的含义:50%概率为1,50%概率为2 - threshold = 2 if random.random() < 0.5 else 1 - else: - threshold = 1 - - if len(recent_messages_list) >= threshold: - # for message in recent_messages_list: - # print(message.processed_plain_text) - - self.last_read_time = time.time() - - # !此处使at或者提及必定回复 - mentioned_message = None - for message in recent_messages_list: - if (message.is_mentioned or message.is_at) and global_config.chat.mentioned_bot_reply: - mentioned_message = message - - # logger.info(f"{self.log_prefix} 当前talk_value: {TempMethods.get_talk_value(self.stream_id)}") - - # *控制频率用 - if mentioned_message: - await self._observe(recent_messages_list=recent_messages_list, force_reply_message=mentioned_message) - elif ( - random.random() - < TempMethodsHFC.get_talk_value(self.session_id) - * frequency_control_manager.get_or_create_frequency_control(self.session_id).get_talk_frequency_adjust() - ): - await self._observe(recent_messages_list=recent_messages_list) - else: - # 没有提到,继续保持沉默,等待5秒防止频繁触发 - await asyncio.sleep(10) - return True - else: - await asyncio.sleep(0.2) - return True - return True - - async def _send_and_store_reply( - self, - response_set: "ReplySetModel", - action_message: "DatabaseMessages", - cycle_timers: Dict[str, float], - thinking_id, - actions, - selected_expressions: Optional[List[int]] = None, - quote_message: Optional[bool] = None, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - with Timer("回复发送", cycle_timers): - reply_text = await self._send_response( - reply_set=response_set, - message_data=action_message, - selected_expressions=selected_expressions, - quote_message=quote_message, - ) - - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.chat_info.platform - if platform is None: - platform = getattr(self.chat_stream, "platform", "unknown") - - person = Person(platform=platform, user_id=action_message.user_info.user_id) - person_name = person.person_name - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def _observe( - self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, - force_reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if - if recent_messages_list is None: - recent_messages_list = [] - _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - - start_time = time.time() - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - # 通过 MessageRecorder 统一提取消息并分发给 expression_learner 和 jargon_miner - # 在 replyer 执行时触发,统一管理时间窗口,避免重复获取消息 - asyncio.create_task(extract_and_distribute_messages(self.session_id)) - - # 添加curious检测任务 - 检测聊天记录中的矛盾、冲突或需要提问的内容 - # asyncio.create_task(check_and_make_question(self.stream_id)) - # 添加聊天内容概括任务 - 累积、打包和压缩聊天记录 - # 注意:后台循环已在start()中启动,这里作为额外触发点,在有思考时立即处理 - # asyncio.create_task(self.chat_history_summarizer.process()) - - cycle_timers, thinking_id = self.start_cycle() - logger.info( - f"{self.log_prefix} 开始第{self._cycle_counter}次思考(频率: {TempMethodsHFC.get_talk_value(self.session_id)})" - ) - - # 第一步:动作检查 - available_actions: Dict[str, ActionInfo] = {} - try: - await self.action_modifier.modify_actions() - available_actions = self.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.session_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - filter_intercept_message_level=1, - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - force_reply_message=force_reply_message, - ) - - logger.info( - f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}" - ) - - # 3. 并行执行所有动作 - action_tasks = [ - asyncio.create_task( - self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) - ) - for action in action_to_use_info - ] - - # 并行执行所有任务 - results = await asyncio.gather(*action_tasks, return_exceptions=True) - - # 处理执行结果 - reply_loop_info = None - reply_text_from_reply = "" - action_success = False - action_reply_text = "" - - excute_result_str = "" - for result in results: - excute_result_str += f"{result['action_type']} 执行结果:{result['result']}\n" - - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 动作执行异常: {result}") - continue - - if result["action_type"] != "reply": - action_success = result["success"] - action_reply_text = result["result"] - elif result["action_type"] == "reply": - if result["success"]: - reply_loop_info = result["loop_info"] - reply_text_from_reply = result["result"] - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - self.action_planner.add_plan_excute_log(result=excute_result_str) - - # 构建最终的循环信息 - if reply_loop_info: - # 如果有回复信息,使用回复的loop_info作为基础 - loop_info = reply_loop_info - # 更新动作执行信息 - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "taken_time": time.time(), - } - ) - _reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - loop_info = { - "loop_plan_info": { - "action_result": action_to_use_info, - }, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "taken_time": time.time(), - }, - } - _reply_text = action_reply_text - - self.end_cycle(loop_info, cycle_timers) - self.print_cycle_info(cycle_timers) - - end_time = time.time() - if end_time - start_time < global_config.chat.planner_smooth: - wait_time = global_config.chat.planner_smooth - (end_time - start_time) - await asyncio.sleep(wait_time) - else: - await asyncio.sleep(0.1) - return True - - # async def _main_chat_loop(self): - # """主循环,持续进行计划并可能回复消息,直到被外部取消。""" - # try: - # while self.running: - # # 主循环 - # success = await self._loopbody() - # await asyncio.sleep(0.1) - # if not success: - # break - # except asyncio.CancelledError: - # # 设置了关闭标志位后被取消是正常流程 - # logger.info(f"{self.log_prefix} 麦麦已关闭聊天") - # except Exception: - # logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") - # print(traceback.format_exc()) - # await asyncio.sleep(3) - # self._loop_task = asyncio.create_task(self._main_chat_loop()) - # logger.error(f"{self.log_prefix} 结束了当前聊天循环") - - async def _handle_action( - self, - action: str, - action_reasoning: str, - action_data: dict, - cycle_timers: Dict[str, float], - thinking_id: str, - action_message: Optional["DatabaseMessages"] = None, - ) -> tuple[bool, str, str]: - """ - 处理规划动作,使用动作工厂创建相应的动作处理器 - - 参数: - action: 动作类型 - action_reasoning: 决策理由 - action_data: 动作数据,包含不同动作需要的参数 - cycle_timers: 计时器字典 - thinking_id: 思考ID - action_message: 消息数据 - 返回: - tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) - """ - try: - # 使用工厂创建动作处理器实例 - try: - action_handler = self.action_manager.create_action( - action_name=action, - action_data=action_data, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.chat_stream, - log_prefix=self.log_prefix, - action_reasoning=action_reasoning, - action_message=action_message, - ) - except Exception as e: - logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") - traceback.print_exc() - return False, "" - - # 处理动作并获取结果(固定记录一次动作信息) - result = await action_handler.execute() - success, action_text = result - - return success, action_text - - except Exception as e: - logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") - traceback.print_exc() - return False, "" - - async def _send_response( - self, - reply_set: "ReplySetModel", - message_data: "DatabaseMessages", - selected_expressions: Optional[List[int]] = None, - quote_message: Optional[bool] = None, - ) -> str: - # 根据 llm_quote 配置决定是否使用 quote_message 参数 - if global_config.chat.llm_quote: - # 如果配置为 true,使用 llm_quote 参数决定是否引用回复 - if quote_message is None: - logger.warning(f"{self.log_prefix} quote_message 参数为空,不引用") - need_reply = False - else: - need_reply = quote_message - if need_reply: - logger.info(f"{self.log_prefix} LLM 决定使用引用回复") - else: - # 如果配置为 false,使用原来的模式 - new_message_count = message_api.count_new_messages( - chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() - ) - need_reply = new_message_count >= random.randint(2, 3) or time.time() - self.last_read_time > 90 - if need_reply: - logger.info( - f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复,或者上次回复时间超过90秒" - ) - - reply_text = "" - first_replied = False - for reply_content in reply_set.reply_data: - if reply_content.content_type != ReplyContentType.TEXT: - continue - data: str = reply_content.content # type: ignore - if not first_replied: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=need_reply, - typing=False, - selected_expressions=selected_expressions, - ) - first_replied = True - else: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=False, - typing=True, - selected_expressions=selected_expressions, - ) - reply_text += data - - return reply_text - - async def _execute_action( - self, - action_planner_info: ActionPlannerInfo, - chosen_action_plan_infos: List[ActionPlannerInfo], - thinking_id: str, - available_actions: Dict[str, ActionInfo], - cycle_timers: Dict[str, float], - ): - """执行单个动作的通用函数""" - try: - with Timer(f"动作{action_planner_info.action_type}", cycle_timers): - # 直接当场执行no_reply逻辑 - if action_planner_info.action_type == "no_reply": - # 直接处理no_reply逻辑,不再通过动作系统 - reason = action_planner_info.reasoning or "选择不回复" - # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - - # 增加连续 no_reply 计数 - self.consecutive_no_reply_count += 1 - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={}, - action_name="no_reply", - action_reasoning=reason, - ) - - return {"action_type": "no_reply", "success": True, "result": "选择不回复", "command": ""} - - elif action_planner_info.action_type == "reply": - # 直接当场执行reply逻辑 - self.questioned = False - # 刷新主动发言状态 - # 重置连续 no_reply 计数 - self.consecutive_no_reply_count = 0 - - reason = action_planner_info.reasoning or "" - # 根据 think_mode 配置决定 think_level 的值 - think_mode = global_config.chat.think_mode - if think_mode == "default": - think_level = 0 - elif think_mode == "deep": - think_level = 1 - elif think_mode == "dynamic": - # dynamic 模式:从 planner 返回的 action_data 中获取 - think_level = action_planner_info.action_data.get("think_level", 1) - else: - # 默认使用 default 模式 - think_level = 0 - # 使用 action_reasoning(planner 的整体思考理由)作为 reply_reason - planner_reasoning = action_planner_info.action_reasoning or reason - - record_replyer_action_temp( - chat_id=self.session_id, - reason=reason, - think_level=think_level, - ) - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={}, - action_name="reply", - action_reasoning=reason, - ) - - # 从 Planner 的 action_data 中提取未知词语列表(仅在 reply 时使用) - unknown_words = None - quote_message = None - if isinstance(action_planner_info.action_data, dict): - uw = action_planner_info.action_data.get("unknown_words") - if isinstance(uw, list): - cleaned_uw: List[str] = [] - for item in uw: - if isinstance(item, str): - s = item.strip() - if s: - cleaned_uw.append(s) - if cleaned_uw: - unknown_words = cleaned_uw - - # 从 Planner 的 action_data 中提取 quote_message 参数 - qm = action_planner_info.action_data.get("quote") - if qm is not None: - # 支持多种格式:true/false, "true"/"false", 1/0 - if isinstance(qm, bool): - quote_message = qm - elif isinstance(qm, str): - quote_message = qm.lower() in ("true", "1", "yes") - elif isinstance(qm, (int, float)): - quote_message = bool(qm) - - logger.info(f"{self.log_prefix} {qm}引用回复设置: {quote_message}") - - success, llm_response = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_message=action_planner_info.action_message, - available_actions=available_actions, - chosen_actions=chosen_action_plan_infos, - reply_reason=planner_reasoning, - unknown_words=unknown_words, - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, - reply_time_point=action_planner_info.action_data.get("loop_start_time", time.time()), - think_level=think_level, - ) - - if not success or not llm_response or not llm_response.reply_set: - if action_planner_info.action_message: - logger.info(f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败") - else: - logger.info("回复生成失败") - return {"action_type": "reply", "success": False, "result": "回复生成失败", "loop_info": None} - - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=action_planner_info.action_message, # type: ignore - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=chosen_action_plan_infos, - selected_expressions=selected_expressions, - quote_message=quote_message, - ) - self.last_active_time = time.time() - return { - "action_type": "reply", - "success": True, - "result": f"你使用reply动作,对' {action_planner_info.action_message.processed_plain_text} '这句话进行了回复,回复内容为: '{reply_text}'", - "loop_info": loop_info, - } - - else: - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, result = await self._handle_action( - action=action_planner_info.action_type, - action_reasoning=action_planner_info.action_reasoning or "", - action_data=action_planner_info.action_data or {}, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - action_message=action_planner_info.action_message, - ) - - self.last_active_time = time.time() - return { - "action_type": action_planner_info.action_type, - "success": success, - "result": result, - } - - except Exception as e: - logger.error(f"{self.log_prefix} 执行动作时出错: {e}") - logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") - return { - "action_type": action_planner_info.action_type, - "success": False, - "result": "", - "loop_info": None, - "error": str(e), - } - - -class TempMethodsHFC: - @staticmethod - def get_talk_value(chat_id: Optional[str]) -> float: - result = global_config.chat.talk_value or 0.0000001 - if not global_config.chat.enable_talk_value_rules or not global_config.chat.talk_value_rules: - return result - import time - - local_time = time.localtime() - now_min = local_time.tm_hour * 60 + local_time.tm_min - # 先处理特定规则 - if chat_id: - for rule in global_config.chat.talk_value_rules: - if not rule.platform and not rule.item_id: - continue # 一起留空表示全局,跳过 - is_group = rule.rule_type == "group" - from src.chat.message_receive.chat_stream import get_chat_manager - - stream_id = get_chat_manager().get_stream_id(rule.platform, str(rule.item_id), is_group) - if stream_id != chat_id: - continue - parsed_range = TempMethodsHFC._parse_range(rule.time) - if not parsed_range: - continue - start_min, end_min = parsed_range - in_range: bool = False - if start_min <= end_min: - in_range = start_min <= now_min <= end_min - else: - in_range = now_min >= start_min or now_min <= end_min - if in_range: - return rule.value or 0.0 - # 再处理全局规则 - for rule in global_config.chat.talk_value_rules: - if rule.platform or rule.item_id: - continue # 有指定表示特定,跳过 - parsed_range = TempMethodsHFC._parse_range(rule.time) - if not parsed_range: - continue - start_min, end_min = parsed_range - in_range: bool = False - if start_min <= end_min: - in_range = start_min <= now_min <= end_min - else: - in_range = now_min >= start_min or now_min <= end_min - if in_range: - return rule.value or 0.0000001 - return result - - @staticmethod - def _parse_range(range_str: str) -> Optional[tuple[int, int]]: - """解析 "HH:MM-HH:MM" 到 (start_min, end_min)。""" - try: - start_str, end_str = [s.strip() for s in range_str.split("-")] - sh, sm = [int(x) for x in start_str.split(":")] - eh, em = [int(x) for x in end_str.split(":")] - return sh * 60 + sm, eh * 60 + em - except Exception: - return None diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index a6ffec2b..167cdcab 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -1,8 +1,8 @@ from typing import Dict, Optional, Tuple from src.chat.message_receive.chat_manager import BotChatSession +from src.chat.message_receive.message import SessionMessage from src.common.logger import get_logger -from src.common.data_models.database_data_model import DatabaseMessages from src.core.component_registry import component_registry, ActionExecutor from src.core.types import ActionInfo @@ -52,7 +52,7 @@ class ActionManager: chat_stream: BotChatSession, log_prefix: str, shutting_down: bool = False, - action_message: Optional[DatabaseMessages] = None, + action_message: Optional[SessionMessage] = None, ) -> Optional[ActionHandle]: """ 创建动作执行句柄 diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index b6b68fa3..a2ba4b9d 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -24,13 +24,13 @@ from src.services.message_service import ( from src.chat.utils.utils import get_chat_type_and_target_info, is_bot_self from src.chat.planner_actions.action_manager import ActionManager from src.chat.message_receive.chat_manager import chat_manager as _chat_manager +from src.chat.message_receive.message import SessionMessage from src.core.types import ActionActivationType, ActionInfo, ComponentType from src.core.component_registry import component_registry from src.person_info.person_info import Person if TYPE_CHECKING: from src.common.data_models.info_data_model import TargetPersonInfo - from src.common.data_models.database_data_model import DatabaseMessages logger = get_logger("planner") @@ -56,8 +56,8 @@ class ActionPlanner: self.unknown_words_cache_limit = 10 def find_message_by_id( - self, message_id: str, message_id_list: List[Tuple[str, "DatabaseMessages"]] - ) -> Optional["DatabaseMessages"]: + self, message_id: str, message_id_list: List[Tuple[str, "SessionMessage"]] + ) -> Optional["SessionMessage"]: # sourcery skip: use-next """ 根据message_id从message_id_list中查找对应的原始消息 @@ -75,7 +75,7 @@ class ActionPlanner: return None def _replace_message_ids_with_text( - self, text: Optional[str], message_id_list: List[Tuple[str, "DatabaseMessages"]] + self, text: Optional[str], message_id_list: List[Tuple[str, "SessionMessage"]] ) -> Optional[str]: """将文本中的 m+数字 消息ID替换为原消息内容,并添加双引号""" if not text: @@ -122,13 +122,7 @@ class ActionPlanner: msg_text = re.sub(pic_pattern, replace_pic_id, msg_text) # 替换用户引用格式:回复 和 @ - platform = ( - getattr(message, "user_info", None) - and message.user_info.platform - or getattr(message, "chat_info", None) - and message.chat_info.platform - or "qq" - ) + platform = message.platform or "qq" msg_text = replace_user_references(msg_text, platform, replace_bot_name=True) # 替换单独的 <用户名:用户ID> 格式(replace_user_references 已处理回复<和@<格式) @@ -160,7 +154,7 @@ class ActionPlanner: def _parse_single_action( self, action_json: dict, - message_id_list: List[Tuple[str, "DatabaseMessages"]], + message_id_list: List[Tuple[str, "SessionMessage"]], current_available_actions: List[Tuple[str, ActionInfo]], extracted_reasoning: str = "", ) -> List[ActionPlannerInfo]: @@ -245,10 +239,10 @@ class ActionPlanner: return action_planner_infos - def _is_message_from_self(self, message: "DatabaseMessages") -> bool: + def _is_message_from_self(self, message: "SessionMessage") -> bool: """判断消息是否由机器人自身发送(支持多平台,包括 WebUI)""" try: - return is_bot_self(message.user_info.platform or "", str(message.user_info.user_id)) + return is_bot_self(message.platform or "", str(message.message_info.user_info.user_id)) except AttributeError: logger.warning(f"{self.log_prefix}检测消息发送者失败,缺少必要字段") return False @@ -380,7 +374,7 @@ class ActionPlanner: self, available_actions: Dict[str, ActionInfo], loop_start_time: float = 0.0, - force_reply_message: Optional["DatabaseMessages"] = None, + force_reply_message: Optional["SessionMessage"] = None, ) -> List[ActionPlannerInfo]: # sourcery skip: use-named-expression """ @@ -395,7 +389,7 @@ class ActionPlanner: limit=int(global_config.chat.max_context_size * 0.6), filter_intercept_message_level=1, ) - message_id_list: list[Tuple[str, "DatabaseMessages"]] = [] + message_id_list: list[Tuple[str, "SessionMessage"]] = [] chat_content_block, message_id_list = build_readable_messages_with_id( messages=message_list_before_now, timestamp_mode="normal_no_YMD", @@ -552,10 +546,10 @@ class ActionPlanner: is_group_chat: bool, chat_target_info: Optional["TargetPersonInfo"], current_available_actions: Dict[str, ActionInfo], - message_id_list: List[Tuple[str, "DatabaseMessages"]], + message_id_list: List[Tuple[str, "SessionMessage"]], chat_content_block: str = "", interest: str = "", - ) -> tuple[str, List[Tuple[str, "DatabaseMessages"]]]: + ) -> tuple[str, List[Tuple[str, "SessionMessage"]]]: """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: actions_before_now_block = self.get_plan_log_str() @@ -710,7 +704,7 @@ class ActionPlanner: async def _execute_main_planner( self, prompt: str, - message_id_list: List[Tuple[str, "DatabaseMessages"]], + message_id_list: List[Tuple[str, "SessionMessage"]], filtered_actions: Dict[str, ActionInfo], available_actions: Dict[str, ActionInfo], loop_start_time: float,