From a39ccedb9a1ac3332f56801dcd7c06a77540b9d1 Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sat, 28 Feb 2026 21:14:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BA=94=E8=A6=81=E6=B1=82=E6=8F=90=E4=BA=A4?= =?UTF-8?q?=E4=B8=8A=E6=9C=AA=E5=AE=8C=E6=88=90=E7=9A=84HFC,=20expression?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bw_learner/expression_reflect_tracker.py | 60 ++ src/bw_learner/expression_reflector.py | 316 +++---- src/chat/heart_flow/heartFC_chat.py | 892 ++++--------------- src/chat/heart_flow/hfc_utils.py | 15 +- src/chat/message_receive/__init__.py | 9 +- src/chat/message_receive/bot.py | 211 ++--- 6 files changed, 423 insertions(+), 1080 deletions(-) create mode 100644 src/bw_learner/expression_reflect_tracker.py diff --git a/src/bw_learner/expression_reflect_tracker.py b/src/bw_learner/expression_reflect_tracker.py new file mode 100644 index 00000000..1ab56d3f --- /dev/null +++ b/src/bw_learner/expression_reflect_tracker.py @@ -0,0 +1,60 @@ +from typing import TYPE_CHECKING, Optional + +import time + +from src.common.logger import get_logger +from src.common.database.database import get_db_session +from src.llm_models.utils_model import LLMRequest +from src.config.config import model_config + +if TYPE_CHECKING: + from src.common.data_models.expression_data_model import MaiExpression + +# TODO: 这个LLMRequest实例被更优雅的方式替换掉 +judge_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="reflect.tracker") + +logger = get_logger("reflect_tracker") + +class ReflectTracker: + def __init__(self, session_id: str): + self.session_id = session_id + self.last_check_msg_count = 0 + self.max_msg_count = 30 + self.max_duration = 15 * 60 # 15 分钟 + self.expression: Optional["MaiExpression"] = None # 当前正在追踪的表达,由外部设置 + + # 运行状态 + self.tracking = False + self.tracking_start_time: float = 0.0 + + def register_expression_and_track(self, expression: "MaiExpression"): + """注册需要追踪的表达""" + if self.tracking: + raise RuntimeError("ReflectTracker is already tracking an expression.") + self.expression = expression + self.tracking = True + self.tracking_start_time = time.time() + + def _reset_tracker(self): + """重置追踪状态""" + self.expression = None + self.tracking = False + self.last_check_msg_count = 0 + + async def trigger_tracker(self) -> bool: + """ + 触发追踪检查 + + Returns: + return (bool): 如果返回True,表示追踪完成,Tracker运行结束(运行状态置为`False`);如果返回False,表示继续追踪 + """ + # 对于没有正在追踪的表达,直接返回False + if not self.tracking: + return False + + # 检查是否超时(无论是消息数量还是时间) + if time.time() - self.tracking_start_time > self.max_duration: + self._reset_tracker() + return True + + # TODO: 完成追踪检查逻辑 \ No newline at end of file diff --git a/src/bw_learner/expression_reflector.py b/src/bw_learner/expression_reflector.py index d1902f55..9c45f22c 100644 --- a/src/bw_learner/expression_reflector.py +++ b/src/bw_learner/expression_reflector.py @@ -1,22 +1,35 @@ +from rich.traceback import install +from sqlmodel import select +from typing import TYPE_CHECKING + import random import time -from typing import Optional, Dict from src.common.logger import get_logger -from src.common.database.database_model import Expression from src.config.config import global_config -from src.chat.message_receive.chat_stream import get_chat_manager -from src.plugin_system.apis import send_api +from src.common.database.database_model import Expression +from src.common.database.database import get_db_session +from src.common.data_models.expression_data_model import MaiExpression +from src.common.utils.utils_session import SessionUtils +from .expression_reflect_tracker import ReflectTracker + +if TYPE_CHECKING: + from src.config.official_configs import TargetItem logger = get_logger("expression_reflector") +install(extra_lines=3) + +LOG_PREFIX = "[Expression Reflector]" + class ExpressionReflector: - """表达反思器,管理单个聊天流的表达反思提问""" + """表达反思器,管理单个聊天流的表达反思提问,使用每个session_id独立的实例""" - def __init__(self, chat_id: str): - self.chat_id = chat_id - self.last_ask_time: float = 0.0 + def __init__(self, session_id: str): + self.session_id = session_id + self.last_ask_time: float = time.time() + self.reflect_tracker: ReflectTracker = ReflectTracker(session_id) async def check_and_ask(self) -> bool: """ @@ -25,226 +38,99 @@ class ExpressionReflector: Returns: bool: 是否执行了提问 """ - try: - logger.debug(f"[Expression Reflection] 开始检查是否需要提问 (stream_id: {self.chat_id})") - - if not global_config.expression.expression_manual_reflect: - logger.debug("[Expression Reflection] 表达反思功能未启用,跳过") - return False - - operator_config = global_config.expression.manual_reflect_operator_id - if not operator_config: - logger.debug("[Expression Reflection] Operator ID 未配置,跳过") - return False - - # 检查是否在允许列表中 - allow_reflect = global_config.expression.allow_reflect - if allow_reflect: - # 将 allow_reflect 中的 platform:id:type 格式转换为 chat_id 列表 - allow_reflect_chat_ids = [] - for stream_config in allow_reflect: - parsed_chat_id = global_config.expression._parse_stream_config_to_chat_id(stream_config) - if parsed_chat_id: - allow_reflect_chat_ids.append(parsed_chat_id) - else: - logger.warning(f"[Expression Reflection] 无法解析 allow_reflect 配置项: {stream_config}") - - if self.chat_id not in allow_reflect_chat_ids: - logger.info(f"[Expression Reflection] 当前聊天流 {self.chat_id} 不在允许列表中,跳过") - return False - - # 检查上一次提问时间 - current_time = time.time() - time_since_last_ask = current_time - self.last_ask_time - - # 5-10分钟间隔,随机选择 - min_interval = 10 * 60 # 5分钟 - max_interval = 15 * 60 # 10分钟 - interval = random.uniform(min_interval, max_interval) - - logger.info( - f"[Expression Reflection] 上次提问时间: {self.last_ask_time:.2f}, 当前时间: {current_time:.2f}, 已过时间: {time_since_last_ask:.2f}秒 ({time_since_last_ask / 60:.2f}分钟), 需要间隔: {interval:.2f}秒 ({interval / 60:.2f}分钟)" - ) - - if time_since_last_ask < interval: - remaining_time = interval - time_since_last_ask - logger.info( - f"[Expression Reflection] 距离上次提问时间不足,还需等待 {remaining_time:.2f}秒 ({remaining_time / 60:.2f}分钟),跳过" - ) - return False - - # 检查是否已经有针对该 Operator 的 Tracker 在运行 - logger.info(f"[Expression Reflection] 检查 Operator {operator_config} 是否已有活跃的 Tracker") - if await _check_tracker_exists(operator_config): - logger.info(f"[Expression Reflection] Operator {operator_config} 已有活跃的 Tracker,跳过本次提问") - return False - - # 获取未检查的表达 - try: - logger.info("[Expression Reflection] 查询未检查且未拒绝的表达") - expressions = Expression.select().where((~Expression.checked) & (~Expression.rejected)).limit(50) - - expr_list = list(expressions) - logger.info(f"[Expression Reflection] 找到 {len(expr_list)} 个候选表达") - - if not expr_list: - logger.info("[Expression Reflection] 没有可用的表达,跳过") - return False - - target_expr: Expression = random.choice(expr_list) - logger.info( - f"[Expression Reflection] 随机选择了表达 ID: {target_expr.id}, Situation: {target_expr.situation}, Style: {target_expr.style}" - ) - - # 生成询问文本 - ask_text = _generate_ask_text(target_expr) - if not ask_text: - logger.warning("[Expression Reflection] 生成询问文本失败,跳过") - return False - - logger.info(f"[Expression Reflection] 准备向 Operator {operator_config} 发送提问") - # 发送给 Operator - await _send_to_operator(operator_config, ask_text, target_expr) - - # 更新上一次提问时间 - self.last_ask_time = current_time - logger.info(f"[Expression Reflection] 提问成功,已更新上次提问时间为 {current_time:.2f}") - - return True - - except Exception as e: - logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}") - import traceback - - logger.error(traceback.format_exc()) - return False - except Exception as e: - logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}") - import traceback - - logger.error(traceback.format_exc()) + if not await self.check_need_ask(): return False - -class ExpressionReflectorManager: - """表达反思管理器,管理多个聊天流的表达反思实例""" - - def __init__(self): - self.reflectors: Dict[str, ExpressionReflector] = {} - - def get_or_create_reflector(self, chat_id: str) -> ExpressionReflector: - """获取或创建指定聊天流的表达反思实例""" - if chat_id not in self.reflectors: - self.reflectors[chat_id] = ExpressionReflector(chat_id) - return self.reflectors[chat_id] - - -# 创建全局实例 -expression_reflector_manager = ExpressionReflectorManager() - - -async def _check_tracker_exists(operator_config: str) -> bool: - """检查指定 Operator 是否已有活跃的 Tracker""" - from src.bw_learner.reflect_tracker import reflect_tracker_manager - - chat_manager = get_chat_manager() - chat_stream = None - - # 尝试解析配置字符串 "platform:id:type" - parts = operator_config.split(":") - if len(parts) == 3: - platform = parts[0] - id_str = parts[1] - stream_type = parts[2] - - user_info = None - group_info = None - - from maim_message import UserInfo, GroupInfo - - if stream_type == "group": - group_info = GroupInfo(group_id=id_str, platform=platform) - user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) - elif stream_type == "private": - user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator") - else: + operator_config = global_config.expression.manual_reflect_operator_id + if not operator_config: + logger.debug(f"{LOG_PREFIX} Operator ID 未配置,跳过") return False - if user_info: - try: - chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info) - except Exception as e: - logger.error(f"Failed to get or create chat stream for checking tracker: {e}") - return False - else: - chat_stream = chat_manager.get_stream(operator_config) - - if not chat_stream: + if await self.ask_reflection(operator_config): + self.last_ask_time = time.time() + return True return False - return reflect_tracker_manager.get_tracker(chat_stream.stream_id) is not None + async def check_need_ask(self) -> bool: + """ + 检查是否需要提问表达反思 + Returns: + bool: 是否执行了提问 + """ + if not global_config.expression.expression_manual_reflect: + logger.debug(f"{LOG_PREFIX} 表达反思功能未启用,跳过") + return False + logger.debug(f"{LOG_PREFIX} 开始检查是否需要提问 (session_id: {self.session_id})") + operator_config = global_config.expression.manual_reflect_operator_id + if not operator_config: + logger.debug(f"{LOG_PREFIX} Operator ID 未配置,跳过") + return False + + if allow_reflect_list := global_config.expression.allow_reflect: + # 转换配置项为session_id列表 + allow_reflect_session_ids = [ + self._parse_config_item_2_session_id(stream_config) for stream_config in allow_reflect_list + ] + if self.session_id not in allow_reflect_session_ids: + logger.info(f"{LOG_PREFIX} 当前聊天流 {self.session_id} 不在允许列表中,跳过") + return False + + # 检查上一次提问时间 + current_time = time.time() + time_since_last_ask = current_time - self.last_ask_time + + # 随机选择10-15分钟间隔 + ask_interval = random.uniform(10 * 60, 15 * 60) + if time_since_last_ask < ask_interval: + logger.info( + f"{LOG_PREFIX} 距离上次提问时间 {time_since_last_ask:.2f} 秒,未达到随机间隔 {ask_interval:.2f} 秒,跳过" + ) + return False + + if self.reflect_tracker.tracking: + logger.info(f"{LOG_PREFIX} Operator {operator_config} 已有活跃的 Tracker,跳过本次提问") + return False + return True + + async def ask_reflection(self, operator_config: "TargetItem") -> bool: + """执行提问表达反思的操作""" + # 选取未检查过的表达 + logger.info(f"{LOG_PREFIX} 查询未检查且未拒绝的表达") + try: + with get_db_session() as session: + statement = select(Expression).filter_by(checked=False, rejected=False).limit(50) + results = session.exec(statement).all() + if not results: + logger.info(f"{LOG_PREFIX} 未找到未检查且未拒绝的表达") + return False + logger.info(f"{LOG_PREFIX} 找到 {len(results)} 个未检查且未拒绝的表达") + + except Exception as selected_expression: + logger.error(f"{LOG_PREFIX} 查询表达时发生错误: {selected_expression}") + return False + + # 随机选取一个表达进行提问 + selected_expression = MaiExpression.from_db_instance(random.choice(results)) + item_id = selected_expression.item_id + situation = selected_expression.situation + style = selected_expression.style + logger.info(f"{LOG_PREFIX} 随机选择了表达 ID: {item_id}, Situation: {situation}, Style: {style}") -def _generate_ask_text(expr: Expression) -> Optional[str]: - try: ask_text = ( f"我正在学习新的表达方式,请帮我看看这个是否合适?\n\n" f"**学习到的表达信息**\n" - f"- 情景 (Situation): {expr.situation}\n" - f"- 风格 (Style): {expr.style}\n" + f"- 情景 (Situation): {situation}\n" + f"- 风格 (Style): {style}\n" ) - return ask_text - except Exception as e: - logger.error(f"Failed to generate ask text: {e}") - return None + # TODO: 在发送相关API重构完成后完成发送给operator的逻辑 -async def _send_to_operator(operator_config: str, text: str, expr: Expression): - chat_manager = get_chat_manager() - chat_stream = None + self.reflect_tracker.register_expression_and_track(selected_expression) + return True - # 尝试解析配置字符串 "platform:id:type" - parts = operator_config.split(":") - if len(parts) == 3: - platform = parts[0] - id_str = parts[1] - stream_type = parts[2] - - user_info = None - group_info = None - - from maim_message import UserInfo, GroupInfo - - if stream_type == "group": - group_info = GroupInfo(group_id=id_str, platform=platform) - user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) - elif stream_type == "private": - user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator") + def _parse_config_item_2_session_id(self, config_item: "TargetItem") -> str: + if config_item.rule_type == "group": + return SessionUtils.calculate_session_id(config_item.platform, group_id=str(config_item.item_id)) else: - logger.warning(f"Unknown stream type in operator config: {stream_type}") - return - - if user_info: - try: - chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info) - except Exception as e: - logger.error(f"Failed to get or create chat stream for operator {operator_config}: {e}") - return - else: - chat_stream = chat_manager.get_stream(operator_config) - - if not chat_stream: - logger.warning(f"Could not find or create chat stream for operator: {operator_config}") - return - - stream_id = chat_stream.stream_id - - # 注册 Tracker - from src.bw_learner.reflect_tracker import ReflectTracker, reflect_tracker_manager - - tracker = ReflectTracker(chat_stream=chat_stream, expression=expr, created_time=time.time()) - reflect_tracker_manager.add_tracker(stream_id, tracker) - - # 发送消息 - await send_api.text_to_stream(text=text, stream_id=stream_id, typing=True) - logger.info(f"Sent expression reflect query to operator {operator_config} for expr {expr.id}") + return SessionUtils.calculate_session_id(config_item.platform, user_id=str(config_item.item_id)) diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index 2c2d790e..ace6b8b3 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -1,148 +1,169 @@ +from rich.traceback import install +from typing import Optional, List, TYPE_CHECKING + import asyncio import time import traceback import random -from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING -from rich.traceback import install -from src.config.config import global_config from src.common.logger import get_logger -from src.common.data_models.info_data_model import ActionPlannerInfo -from src.common.data_models.message_data_model import ReplyContentType -from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.utils.prompt_builder import global_prompt_manager -from src.chat.utils.timer_calculator import Timer -from src.chat.planner_actions.planner import ActionPlanner -from src.chat.planner_actions.action_modifier import ActionModifier -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.heart_flow.hfc_utils import CycleDetail -from src.bw_learner.expression_learner import expression_learner_manager -from src.chat.heart_flow.frequency_control import frequency_control_manager -from src.bw_learner.reflect_tracker import reflect_tracker_manager -from src.bw_learner.expression_reflector import expression_reflector_manager -from src.bw_learner.message_recorder import extract_and_distribute_messages -from src.person_info.person_info import Person -from src.plugin_system.base.component_types import EventType, ActionInfo -from src.plugin_system.core import events_manager -from src.plugin_system.apis import generator_api, send_api, message_api, database_api -from src.chat.utils.chat_message_builder import ( - build_readable_messages_with_id, - get_raw_msg_before_timestamp_with_chat, -) -from src.chat.utils.utils import record_replyer_action_temp -from src.memory_system.chat_history_summarizer import ChatHistorySummarizer +from src.common.utils.utils_session import SessionUtils +from src.config.config import global_config +from src.chat.message_receive.chat_manager import chat_manager if TYPE_CHECKING: - from src.common.data_models.database_data_model import DatabaseMessages - from src.common.data_models.message_data_model import ReplySetModel + from src.chat.message_receive.message import SessionMessage +install(extra_lines=5) -ERROR_LOOP_INFO = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - "reasoning": "循环处理失败", - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, -} - - -install(extra_lines=3) - -# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 - -logger = get_logger("hfc") # Logger Name Changed +logger = get_logger("heartFC_chat") class HeartFChatting: """ - 管理一个连续的Focus Chat循环 - 用于在特定聊天流中生成回复。 - 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。 + 管理一个连续的Focus Chat聊天会话 + 用于在特定的聊天会话里面生成回复 """ - def __init__(self, chat_id: str): + def __init__(self, session_id: str): """ - HeartFChatting 初始化函数 + 初始化 HeartFChatting 实例 - 参数: - chat_id: 聊天流唯一标识符(如stream_id) - on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 - performance_version: 性能记录版本号,用于区分不同启动版本 + Args: + session_id: 聊天会话ID """ # 基础属性 - self.stream_id: str = chat_id # 聊天流ID - self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore - if not self.chat_stream: - raise ValueError(f"无法找到聊天流: {self.stream_id}") - self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" + self.session_id = session_id + session_name = chat_manager.get_session_name(session_id) or session_id + self.log_prefix = f"[{session_name}]" - self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) + # 系统运行状态 + self._running: bool = False + self._loop_task: Optional[asyncio.Task] = None + self._cycle_counter: int = 0 + self._hfc_lock: asyncio.Lock = asyncio.Lock() # 用于保护 _hfc_func 的并发访问 + # 聊天频率相关 + self._consecutive_no_reply_count = 0 # 跟踪连续 no_reply 次数,用于动态调整阈值 + self._talk_frequency_adjust: float = 1.0 # 发言频率修正值,默认为1.0,可以根据需要调整 - self.action_manager = ActionManager() - self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager) - self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) + # HFC内消息缓存 + self.message_cache: List[SessionMessage] = [] - # 循环控制内部状态 - self.running: bool = False - self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - - # 添加循环信息管理相关的属性 - self.history_loop: List[CycleDetail] = [] - self._cycle_counter = 0 - self._current_cycle_detail: CycleDetail = None # type: ignore - - self.last_read_time = time.time() - 2 - - self.is_mute = False - - self.last_active_time = time.time() # 记录上一次非noreply时间 - - self.question_probability_multiplier = 1 - self.questioned = False - - # 跟踪连续 no_reply 次数,用于动态调整阈值 - self.consecutive_no_reply_count = 0 - - # 聊天内容概括器 - self.chat_history_summarizer = ChatHistorySummarizer(chat_id=self.stream_id) + # Asyncio Event 用于控制循环的开始和结束 + self._cycle_event = asyncio.Event() async def start(self): - """检查是否需要启动主循环,如果未激活则启动。""" - - # 如果循环已经激活,直接返回 - if self.running: - logger.debug(f"{self.log_prefix} HeartFChatting 已激活,无需重复启动") + """启动 HeartFChatting 的主循环""" + # 先检查是否已经启动运行 + if self._running: + logger.debug(f"{self.log_prefix} 已经在运行中,无需重复启动") return try: - # 标记为活动状态,防止重复启动 - self.running = True + self._running = True + self._cycle_event.clear() # 确保事件初始状态为未设置 - self._loop_task = asyncio.create_task(self._main_chat_loop()) + self._loop_task = asyncio.create_task(self.main_loop()) self._loop_task.add_done_callback(self._handle_loop_completion) - # 启动聊天内容概括器的后台定期检查循环 - await self.chat_history_summarizer.start() - logger.info(f"{self.log_prefix} HeartFChatting 启动完成") - except Exception as e: - # 启动失败时重置状态 - self.running = False - self._loop_task = None - logger.error(f"{self.log_prefix} HeartFChatting 启动失败: {e}") + logger.error(f"{self.log_prefix} 启动 HeartFChatting 失败: {e}", exc_info=True) + self._running = False # 确保状态正确 + self._cycle_event.set() # 确保事件被设置,避免死锁 + self._loop_task = None # 确保任务引用被清理 raise + async def stop(self): + """停止 HeartFChatting 的主循环""" + if not self._running: + logger.debug(f"{self.log_prefix} HeartFChatting 已经停止,无需重复停止") + return + + self._running = False + self._cycle_event.set() # 触发事件,通知循环结束 + + if self._loop_task: + self._loop_task.cancel() # 取消主循环任务 + try: + await self._loop_task # 等待任务完成 + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} HeartFChatting 主循环已成功取消") + except Exception as e: + logger.error(f"{self.log_prefix} 停止 HeartFChatting 时发生错误: {e}", exc_info=True) + finally: + self._loop_task = None # 确保任务引用被清理 + + logger.info(f"{self.log_prefix} HeartFChatting 已停止") + + async def adjust_talk_frequency(self, new_value: float): + """调整发言频率的调整值 + + Args: + new_value: 新的修正值,必须为非负数。值越大,修正发言频率越高;值越小,修正发言频率越低。 + """ + self._talk_frequency_adjust = max(0.0, new_value) + + async def register_message(self, message: "SessionMessage"): + """注册一条消息到 HeartFChatting 的缓存中,并检测其是否产生提及,决定是否唤醒聊天 + + Args: + message: 待注册的消息对象 + """ + self.message_cache.append(message) + # 先检查at必回复 + if global_config.chat.inevitable_at_reply and message.is_at: + async with self._hfc_lock: # 确保与主循环逻辑的互斥访问 + await self._judge_and_response(message) + return # 直接返回,避免同一条消息被主循环再次处理 + # 再检查提及必回复 + if global_config.chat.mentioned_bot_reply and message.is_mentioned: + # 直接获取锁,确保一定一定触发回复逻辑,不受当前是否正在执行主循环的影响 + async with self._hfc_lock: # 确保与主循环逻辑的互斥访问 + await self._judge_and_response(message) + return + + async def main_loop(self): + try: + while self._running and not self._cycle_event.is_set(): + if not self._hfc_lock.locked(): + async with self._hfc_lock: # 确保主循环逻辑的互斥访问 + await self._hfc_func() + await asyncio.sleep(5) + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} HeartFChatting: 主循环被取消,正在关闭") + except Exception as e: + logger.error(f"{self.log_prefix} 麦麦聊天意外错误: {e},将于3s后尝试重新启动") + await self.stop() # 确保状态正确 + await asyncio.sleep(3) + await self.start() # 尝试重新启动 + + async def _hfc_func(self, mentioned_message: Optional["SessionMessage"] = None): + """心流聊天的主循环逻辑""" + if self._consecutive_no_reply_count >= 5: + threshold = 2 + elif self._consecutive_no_reply_count >= 3: + threshold = 2 if random.random() < 0.5 else 1 + else: + threshold = 1 + + if len(self.message_cache) < threshold: + await asyncio.sleep(0.2) + return True + + talk_value_threshold = random.random() * self._get_talk_value(self.session_id) * self._talk_frequency_adjust + if mentioned_message and global_config.chat.mentioned_bot_reply: + await self._judge_and_response(mentioned_message) + elif random.random() < talk_value_threshold: + await self._judge_and_response() + return True + + async def _judge_and_response(self, mentioned_message: Optional["SessionMessage"] = None): + """判定和生成回复""" + # TODO: 在expression和reflector重构完成后完成这里的逻辑 + def _handle_loop_completion(self, task: asyncio.Task): - """当 _hfc_loop 任务完成时执行的回调。""" + """当 _hfc_func 任务完成时执行的回调。""" try: if exception := task.exception(): logger.error(f"{self.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}") @@ -152,657 +173,54 @@ class HeartFChatting: except asyncio.CancelledError: logger.info(f"{self.log_prefix} HeartFChatting: 结束了聊天") - def start_cycle(self) -> Tuple[Dict[str, float], str]: - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(self._cycle_counter) - self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" - cycle_timers = {} - return cycle_timers, self._current_cycle_detail.thinking_id - - def end_cycle(self, loop_info, cycle_timers): - self._current_cycle_detail.set_loop_info(loop_info) - self.history_loop.append(self._current_cycle_detail) - self._current_cycle_detail.timers = cycle_timers - self._current_cycle_detail.end_time = time.time() - - def print_cycle_info(self, cycle_timers): - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - if elapsed < 0.1: - # 不显示小于0.1秒的计时器 - continue - formatted_time = f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒;" # type: ignore - + (f"详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) - - async def _loopbody(self): - recent_messages_list = message_api.get_messages_by_time_in_chat( - chat_id=self.stream_id, - start_time=self.last_read_time, - end_time=time.time(), - limit=20, - limit_mode="latest", - filter_mai=True, - filter_command=False, - filter_intercept_message_level=0, - ) - - # 根据连续 no_reply 次数动态调整阈值 - # 3次 no_reply 时,阈值调高到 1.5(50%概率为1,50%概率为2) - # 5次 no_reply 时,提高到 2(大于等于两条消息的阈值) - if self.consecutive_no_reply_count >= 5: - threshold = 2 - elif self.consecutive_no_reply_count >= 3: - # 1.5 的含义:50%概率为1,50%概率为2 - threshold = 2 if random.random() < 0.5 else 1 - else: - threshold = 1 - - if len(recent_messages_list) >= threshold: - # for message in recent_messages_list: - # print(message.processed_plain_text) - - self.last_read_time = time.time() - - # !此处使at或者提及必定回复 - mentioned_message = None - for message in recent_messages_list: - if (message.is_mentioned or message.is_at) and global_config.chat.mentioned_bot_reply: - mentioned_message = message - - # logger.info(f"{self.log_prefix} 当前talk_value: {TempMethods.get_talk_value(self.stream_id)}") - - # *控制频率用 - if mentioned_message: - await self._observe(recent_messages_list=recent_messages_list, force_reply_message=mentioned_message) - elif ( - random.random() - < TempMethodsHFC.get_talk_value(self.stream_id) - * frequency_control_manager.get_or_create_frequency_control(self.stream_id).get_talk_frequency_adjust() - ): - await self._observe(recent_messages_list=recent_messages_list) - else: - # 没有提到,继续保持沉默,等待5秒防止频繁触发 - await asyncio.sleep(10) - return True - else: - await asyncio.sleep(0.2) - return True - return True - - async def _send_and_store_reply( - self, - response_set: "ReplySetModel", - action_message: "DatabaseMessages", - cycle_timers: Dict[str, float], - thinking_id, - actions, - selected_expressions: Optional[List[int]] = None, - quote_message: Optional[bool] = None, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - with Timer("回复发送", cycle_timers): - reply_text = await self._send_response( - reply_set=response_set, - message_data=action_message, - selected_expressions=selected_expressions, - quote_message=quote_message, - ) - - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.chat_info.platform - if platform is None: - platform = getattr(self.chat_stream, "platform", "unknown") - - person = Person(platform=platform, user_id=action_message.user_info.user_id) - person_name = person.person_name - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def _observe( - self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, - force_reply_message: Optional["DatabaseMessages"] = None, - ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if - if recent_messages_list is None: - recent_messages_list = [] - _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - - # ------------------------------------------------------------------------- - # ReflectTracker Check - # 在每次回复前检查一次上下文,看是否有反思问题得到了解答 - # ------------------------------------------------------------------------- - - reflector = expression_reflector_manager.get_or_create_reflector(self.stream_id) - await reflector.check_and_ask() - tracker = reflect_tracker_manager.get_tracker(self.stream_id) - if tracker: - resolved = await tracker.trigger_tracker() - if resolved: - reflect_tracker_manager.remove_tracker(self.stream_id) - logger.info(f"{self.log_prefix} ReflectTracker resolved and removed.") - - start_time = time.time() - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - # 通过 MessageRecorder 统一提取消息并分发给 expression_learner 和 jargon_miner - # 在 replyer 执行时触发,统一管理时间窗口,避免重复获取消息 - asyncio.create_task(extract_and_distribute_messages(self.stream_id)) - - # 添加curious检测任务 - 检测聊天记录中的矛盾、冲突或需要提问的内容 - # asyncio.create_task(check_and_make_question(self.stream_id)) - # 添加聊天内容概括任务 - 累积、打包和压缩聊天记录 - # 注意:后台循环已在start()中启动,这里作为额外触发点,在有思考时立即处理 - # asyncio.create_task(self.chat_history_summarizer.process()) - - cycle_timers, thinking_id = self.start_cycle() - logger.info( - f"{self.log_prefix} 开始第{self._cycle_counter}次思考(频率: {TempMethodsHFC.get_talk_value(self.stream_id)})" - ) - - # 第一步:动作检查 - available_actions: Dict[str, ActionInfo] = {} - try: - await self.action_modifier.modify_actions() - available_actions = self.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - filter_intercept_message_level=1, - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - force_reply_message=force_reply_message, - ) - - logger.info( - f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}" - ) - - # 3. 并行执行所有动作 - action_tasks = [ - asyncio.create_task( - self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) - ) - for action in action_to_use_info - ] - - # 并行执行所有任务 - results = await asyncio.gather(*action_tasks, return_exceptions=True) - - # 处理执行结果 - reply_loop_info = None - reply_text_from_reply = "" - action_success = False - action_reply_text = "" - - excute_result_str = "" - for result in results: - excute_result_str += f"{result['action_type']} 执行结果:{result['result']}\n" - - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 动作执行异常: {result}") - continue - - if result["action_type"] != "reply": - action_success = result["success"] - action_reply_text = result["result"] - elif result["action_type"] == "reply": - if result["success"]: - reply_loop_info = result["loop_info"] - reply_text_from_reply = result["result"] - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - self.action_planner.add_plan_excute_log(result=excute_result_str) - - # 构建最终的循环信息 - if reply_loop_info: - # 如果有回复信息,使用回复的loop_info作为基础 - loop_info = reply_loop_info - # 更新动作执行信息 - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "taken_time": time.time(), - } - ) - _reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - loop_info = { - "loop_plan_info": { - "action_result": action_to_use_info, - }, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "taken_time": time.time(), - }, - } - _reply_text = action_reply_text - - self.end_cycle(loop_info, cycle_timers) - self.print_cycle_info(cycle_timers) - - end_time = time.time() - if end_time - start_time < global_config.chat.planner_smooth: - wait_time = global_config.chat.planner_smooth - (end_time - start_time) - await asyncio.sleep(wait_time) - else: - await asyncio.sleep(0.1) - return True - - async def _main_chat_loop(self): - """主循环,持续进行计划并可能回复消息,直到被外部取消。""" - try: - while self.running: - # 主循环 - success = await self._loopbody() - await asyncio.sleep(0.1) - if not success: - break - except asyncio.CancelledError: - # 设置了关闭标志位后被取消是正常流程 - logger.info(f"{self.log_prefix} 麦麦已关闭聊天") - except Exception: - logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") - print(traceback.format_exc()) - await asyncio.sleep(3) - self._loop_task = asyncio.create_task(self._main_chat_loop()) - logger.error(f"{self.log_prefix} 结束了当前聊天循环") - - async def _handle_action( - self, - action: str, - action_reasoning: str, - action_data: dict, - cycle_timers: Dict[str, float], - thinking_id: str, - action_message: Optional["DatabaseMessages"] = None, - ) -> tuple[bool, str, str]: - """ - 处理规划动作,使用动作工厂创建相应的动作处理器 - - 参数: - action: 动作类型 - action_reasoning: 决策理由 - action_data: 动作数据,包含不同动作需要的参数 - cycle_timers: 计时器字典 - thinking_id: 思考ID - action_message: 消息数据 - 返回: - tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) - """ - try: - # 使用工厂创建动作处理器实例 - try: - action_handler = self.action_manager.create_action( - action_name=action, - action_data=action_data, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.chat_stream, - log_prefix=self.log_prefix, - action_reasoning=action_reasoning, - action_message=action_message, - ) - except Exception as e: - logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") - traceback.print_exc() - return False, "" - - # 处理动作并获取结果(固定记录一次动作信息) - result = await action_handler.execute() - success, action_text = result - - return success, action_text - - except Exception as e: - logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") - traceback.print_exc() - return False, "" - - async def _send_response( - self, - reply_set: "ReplySetModel", - message_data: "DatabaseMessages", - selected_expressions: Optional[List[int]] = None, - quote_message: Optional[bool] = None, - ) -> str: - # 根据 llm_quote 配置决定是否使用 quote_message 参数 - if global_config.chat.llm_quote: - # 如果配置为 true,使用 llm_quote 参数决定是否引用回复 - if quote_message is None: - logger.warning(f"{self.log_prefix} quote_message 参数为空,不引用") - need_reply = False - else: - need_reply = quote_message - if need_reply: - logger.info(f"{self.log_prefix} LLM 决定使用引用回复") - else: - # 如果配置为 false,使用原来的模式 - new_message_count = message_api.count_new_messages( - chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() - ) - need_reply = new_message_count >= random.randint(2, 3) or time.time() - self.last_read_time > 90 - if need_reply: - logger.info( - f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复,或者上次回复时间超过90秒" - ) - - reply_text = "" - first_replied = False - for reply_content in reply_set.reply_data: - if reply_content.content_type != ReplyContentType.TEXT: - continue - data: str = reply_content.content # type: ignore - if not first_replied: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=need_reply, - typing=False, - selected_expressions=selected_expressions, - ) - first_replied = True - else: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=False, - typing=True, - selected_expressions=selected_expressions, - ) - reply_text += data - - return reply_text - - async def _execute_action( - self, - action_planner_info: ActionPlannerInfo, - chosen_action_plan_infos: List[ActionPlannerInfo], - thinking_id: str, - available_actions: Dict[str, ActionInfo], - cycle_timers: Dict[str, float], - ): - """执行单个动作的通用函数""" - try: - with Timer(f"动作{action_planner_info.action_type}", cycle_timers): - # 直接当场执行no_reply逻辑 - if action_planner_info.action_type == "no_reply": - # 直接处理no_reply逻辑,不再通过动作系统 - reason = action_planner_info.reasoning or "选择不回复" - # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - - # 增加连续 no_reply 计数 - self.consecutive_no_reply_count += 1 - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={}, - action_name="no_reply", - action_reasoning=reason, - ) - - return {"action_type": "no_reply", "success": True, "result": "选择不回复", "command": ""} - - elif action_planner_info.action_type == "reply": - # 直接当场执行reply逻辑 - self.questioned = False - # 刷新主动发言状态 - # 重置连续 no_reply 计数 - self.consecutive_no_reply_count = 0 - - reason = action_planner_info.reasoning or "" - # 根据 think_mode 配置决定 think_level 的值 - think_mode = global_config.chat.think_mode - if think_mode == "default": - think_level = 0 - elif think_mode == "deep": - think_level = 1 - elif think_mode == "dynamic": - # dynamic 模式:从 planner 返回的 action_data 中获取 - think_level = action_planner_info.action_data.get("think_level", 1) - else: - # 默认使用 default 模式 - think_level = 0 - # 使用 action_reasoning(planner 的整体思考理由)作为 reply_reason - planner_reasoning = action_planner_info.action_reasoning or reason - - record_replyer_action_temp( - chat_id=self.stream_id, - reason=reason, - think_level=think_level, - ) - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={}, - action_name="reply", - action_reasoning=reason, - ) - - # 从 Planner 的 action_data 中提取未知词语列表(仅在 reply 时使用) - unknown_words = None - quote_message = None - if isinstance(action_planner_info.action_data, dict): - uw = action_planner_info.action_data.get("unknown_words") - if isinstance(uw, list): - cleaned_uw: List[str] = [] - for item in uw: - if isinstance(item, str): - s = item.strip() - if s: - cleaned_uw.append(s) - if cleaned_uw: - unknown_words = cleaned_uw - - # 从 Planner 的 action_data 中提取 quote_message 参数 - qm = action_planner_info.action_data.get("quote") - if qm is not None: - # 支持多种格式:true/false, "true"/"false", 1/0 - if isinstance(qm, bool): - quote_message = qm - elif isinstance(qm, str): - quote_message = qm.lower() in ("true", "1", "yes") - elif isinstance(qm, (int, float)): - quote_message = bool(qm) - - logger.info(f"{self.log_prefix} {qm}引用回复设置: {quote_message}") - - success, llm_response = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_message=action_planner_info.action_message, - available_actions=available_actions, - chosen_actions=chosen_action_plan_infos, - reply_reason=planner_reasoning, - unknown_words=unknown_words, - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, - reply_time_point=action_planner_info.action_data.get("loop_start_time", time.time()), - think_level=think_level, - ) - - if not success or not llm_response or not llm_response.reply_set: - if action_planner_info.action_message: - logger.info(f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败") - else: - logger.info("回复生成失败") - return {"action_type": "reply", "success": False, "result": "回复生成失败", "loop_info": None} - - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=action_planner_info.action_message, # type: ignore - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=chosen_action_plan_infos, - selected_expressions=selected_expressions, - quote_message=quote_message, - ) - self.last_active_time = time.time() - return { - "action_type": "reply", - "success": True, - "result": f"你使用reply动作,对' {action_planner_info.action_message.processed_plain_text} '这句话进行了回复,回复内容为: '{reply_text}'", - "loop_info": loop_info, - } - - else: - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, result = await self._handle_action( - action=action_planner_info.action_type, - action_reasoning=action_planner_info.action_reasoning or "", - action_data=action_planner_info.action_data or {}, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - action_message=action_planner_info.action_message, - ) - - self.last_active_time = time.time() - return { - "action_type": action_planner_info.action_type, - "success": success, - "result": result, - } - - except Exception as e: - logger.error(f"{self.log_prefix} 执行动作时出错: {e}") - logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") - return { - "action_type": action_planner_info.action_type, - "success": False, - "result": "", - "loop_info": None, - "error": str(e), - } - - -class TempMethodsHFC: - @staticmethod - def get_talk_value(chat_id: Optional[str]) -> float: - result = global_config.chat.talk_value or 0.0000001 + def _get_talk_value(self, session_id: Optional[str]) -> float: + result = global_config.chat.talk_value or 0.0 if not global_config.chat.enable_talk_value_rules or not global_config.chat.talk_value_rules: return result - import time - local_time = time.localtime() now_min = local_time.tm_hour * 60 + local_time.tm_min - # 先处理特定规则 - if chat_id: + + # 优先匹配会话相关的规则 + if session_id: for rule in global_config.chat.talk_value_rules: if not rule.platform and not rule.item_id: - continue # 一起留空表示全局,跳过 - is_group = rule.rule_type == "group" - from src.chat.message_receive.chat_stream import get_chat_manager - - stream_id = get_chat_manager().get_stream_id(rule.platform, str(rule.item_id), is_group) - if stream_id != chat_id: - continue - parsed_range = TempMethodsHFC._parse_range(rule.time) + continue # 一起留空表示全局 + if rule.rule_type == "group": + rule_session_id = SessionUtils.calculate_session_id(rule.platform, group_id=str(rule.item_id)) + else: + rule_session_id = SessionUtils.calculate_session_id(rule.platform, user_id=str(rule.item_id)) + if rule_session_id != session_id: + continue # 不匹配的会话ID,跳过 + parsed_range = self._parse_range(rule.time) if not parsed_range: - continue + continue # 无法解析的时间范围,跳过 start_min, end_min = parsed_range in_range: bool = False if start_min <= end_min: in_range = start_min <= now_min <= end_min - else: + else: # 跨天的时间范围 in_range = now_min >= start_min or now_min <= end_min if in_range: - return rule.value or 0.0 - # 再处理全局规则 + return rule.value or 0.0 # 如果规则生效但没有设置值,返回0.0 + + # 没有匹配到会话相关的规则,继续匹配全局规则 for rule in global_config.chat.talk_value_rules: if rule.platform or rule.item_id: - continue # 有指定表示特定,跳过 - parsed_range = TempMethodsHFC._parse_range(rule.time) + continue # 只匹配全局规则 + parsed_range = self._parse_range(rule.time) if not parsed_range: - continue + continue # 无法解析的时间范围,跳过 start_min, end_min = parsed_range in_range: bool = False if start_min <= end_min: in_range = start_min <= now_min <= end_min - else: + else: # 跨天的时间范围 in_range = now_min >= start_min or now_min <= end_min if in_range: - return rule.value or 0.0000001 - return result + return rule.value or 0.0 # 如果规则生效但没有设置值,返回0.0 + return result # 如果没有任何规则生效,返回默认值 - @staticmethod - def _parse_range(range_str: str) -> Optional[tuple[int, int]]: + def _parse_range(self, range_str: str) -> Optional[tuple[int, int]]: """解析 "HH:MM-HH:MM" 到 (start_min, end_min)。""" try: start_str, end_str = [s.strip() for s in range_str.split("-")] diff --git a/src/chat/heart_flow/hfc_utils.py b/src/chat/heart_flow/hfc_utils.py index 9a715a2d..36d9d6fb 100644 --- a/src/chat/heart_flow/hfc_utils.py +++ b/src/chat/heart_flow/hfc_utils.py @@ -1,6 +1,8 @@ -import time +from dataclasses import dataclass from typing import Optional, Dict, Any +import time + from src.config.config import global_config from src.common.logger import get_logger from src.chat.message_receive.chat_stream import get_chat_manager @@ -11,6 +13,13 @@ from src.common.message_repository import count_messages logger = get_logger(__name__) +@dataclass +class CyclePlanInfo: + ... + +@dataclass +class CycleActionInfo: + ... class CycleDetail: """循环信息记录类""" @@ -22,8 +31,8 @@ class CycleDetail: self.end_time: Optional[float] = None self.timers: Dict[str, float] = {} - self.loop_plan_info: Dict[str, Any] = {} - self.loop_action_info: Dict[str, Any] = {} + self.loop_plan_info: CyclePlanInfo = CyclePlanInfo() + self.loop_action_info: CycleActionInfo = CycleActionInfo() def to_dict(self) -> Dict[str, Any]: """将循环信息转换为字典格式""" diff --git a/src/chat/message_receive/__init__.py b/src/chat/message_receive/__init__.py index fad126f4..b58146f8 100644 --- a/src/chat/message_receive/__init__.py +++ b/src/chat/message_receive/__init__.py @@ -1,10 +1,5 @@ from src.chat.emoji_system.emoji_manager import emoji_manager -from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.message_receive.storage import MessageStorage +from src.chat.message_receive.chat_manager import chat_manager -__all__ = [ - "get_chat_manager", - "MessageStorage", - "emoji_manager", -] +__all__ = ["chat_manager", "emoji_manager"] diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 7dbf3688..9cbb3cf9 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -1,21 +1,23 @@ import traceback import os -import re -from typing import Dict, Any, Optional -from maim_message import UserInfo, Seg, GroupInfo +from maim_message import MessageBase +from typing import Dict, Any + from src.common.logger import get_logger -from src.config.config import global_config -from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.message_receive.message import MessageRecv -from src.chat.message_receive.storage import MessageStorage +from src.common.utils.utils_message import MessageUtils +from src.common.utils.utils_session import SessionUtils +from src.chat.message_receive.message_old import MessageRecv from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver from src.chat.brain_chat.PFC.pfc_manager import PFCManager from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.plugin_system.core import component_registry, events_manager, global_announcement_manager from src.plugin_system.base import BaseCommand, EventType +from .message import SessionMessage +from .chat_manager import chat_manager + # 定义日志配置 # 获取项目根目录(假设本文件在src/chat/message_receive/下,根目录为上上上级目录) @@ -25,50 +27,6 @@ PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.. logger = get_logger("chat") -def _check_ban_words(text: str, userinfo: UserInfo, group_info: Optional[GroupInfo] = None) -> bool: - """检查消息是否包含过滤词 - - Args: - text: 待检查的文本 - chat: 聊天对象 - userinfo: 用户信息 - - Returns: - bool: 是否包含过滤词 - """ - for word in global_config.message_receive.ban_words: - if word in text: - chat_name = group_info.group_name if group_info else "私聊" - logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") - logger.info(f"[过滤词识别]消息中含有{word},filtered") - return True - return False - - -def _check_ban_regex(text: str, userinfo: UserInfo, group_info: Optional[GroupInfo] = None) -> bool: - """检查消息是否匹配过滤正则表达式 - - Args: - text: 待检查的文本 - chat: 聊天对象 - userinfo: 用户信息 - - Returns: - bool: 是否匹配过滤正则 - """ - # 检查text是否为None或空字符串 - if text is None or not text: - return False - - for pattern in global_config.message_receive.ban_msgs_regex: - if re.search(pattern, text): - chat_name = group_info.group_name if group_info else "私聊" - logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") - logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") - return True - return False - - class ChatBot: def __init__(self): self.bot = None # bot 实例引用 @@ -100,9 +58,11 @@ class ChatBot: logger.error(f"创建PFC聊天失败: {e}") logger.error(traceback.format_exc()) - async def _process_commands(self, message: MessageRecv): + async def _process_commands(self, message: SessionMessage): # sourcery skip: use-named-expression """使用新插件系统处理命令""" + if not message.processed_plain_text: + return False, None, True # 没有文本内容,继续处理消息 try: text = message.processed_plain_text @@ -112,11 +72,8 @@ class ChatBot: command_class, matched_groups, command_info = command_result plugin_name = command_info.plugin_name command_name = command_info.name - if ( - message.chat_stream - and message.chat_stream.stream_id - and command_name - in global_announcement_manager.get_disabled_chat_commands(message.chat_stream.stream_id) + if message.session_id and command_name in global_announcement_manager.get_disabled_chat_commands( + message.session_id ): logger.info("用户禁用的命令,跳过处理") return False, None, True @@ -269,97 +226,115 @@ class ChatBot: ) # print(message_data) # logger.debug(str(message_data)) - message = MessageRecv(message_data) + maim_raw_message = MessageBase.from_dict(message_data) + message = SessionMessage.from_maim_message(maim_raw_message) group_info = message.message_info.group_info user_info = message.message_info.user_info - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_MESSAGE_PRE_PROCESS, message + session_id = SessionUtils.calculate_session_id( + message.platform, + user_id=message.message_info.user_info.user_id, + group_id=group_info.group_id if group_info else None, ) - if not continue_flag: - return - if modified_message and modified_message._modify_flags.modify_message_segments: - message.message_segment = Seg(type="seglist", data=modified_message.message_segments) - if await self.handle_notice_message(message): - pass + message.session_id = session_id # 正确初始化session_id - # 处理消息内容,生成纯文本 + # TODO: 修复事件预处理部分 + # continue_flag, modified_message = await events_manager.handle_mai_events( + # EventType.ON_MESSAGE_PRE_PROCESS, message + # ) + # if not continue_flag: + # return + # if modified_message and modified_message._modify_flags.modify_message_segments: + # message.message_segment = Seg(type="seglist", data=modified_message.message_segments) + + # TODO: notice消息处理 + # if await self.handle_notice_message(message): + # pass + + # 处理消息内容,识别表情包等二进制数据并转化为文本描述 await message.process() # 平台层的 @ 检测由底层 is_mentioned_bot_in_message 统一处理;此处不做用户名硬编码匹配 # 过滤检查 - if _check_ban_words( - message.processed_plain_text, - user_info, # type: ignore - group_info, - ) or _check_ban_regex( - message.raw_message, # type: ignore - user_info, # type: ignore - group_info, - ): + text = message.processed_plain_text or "" + is_banned, word = MessageUtils.check_ban_words(text) + if is_banned: + chat_name = group_info.group_name if group_info else "私聊" + logger.info(f"[{chat_name}]{user_info.user_nickname}:{text}") + logger.info(f"[过滤词识别]消息中含有{word},filtered") + return + is_banned_regex, pattern = MessageUtils.check_ban_regex(text) + if is_banned_regex: + chat_name = group_info.group_name if group_info else "私聊" + logger.info(f"[{chat_name}]{user_info.user_nickname}:{text}") + logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") return - get_chat_manager().register_message(message) + chat_manager.register_message(message) - chat = await get_chat_manager().get_or_create_stream( - platform=message.message_info.platform, # type: ignore - user_info=user_info, # type: ignore - group_info=group_info, - ) + platform = message.platform + user_id = user_info.user_id + group_id = group_info.group_id if group_info else None + _ = await chat_manager.get_or_create_session(platform, user_id, group_id) # 确保会话存在 - message.update_chat_stream(chat) - - # if await self.check_ban_content(message): - # logger.warning(f"检测到消息中含有违法,色情,暴力,反动,敏感内容,消息内容:{message.processed_plain_text},发送者:{message.message_info.user_info.user_nickname}") - # return + # message.update_chat_stream(chat) + # TODO: 在新命令系统完成后恢复这里 # 命令处理 - 使用新插件系统检查并处理命令 - is_command, cmd_result, continue_process = await self._process_commands(message) + # is_command, cmd_result, continue_process = await self._process_commands(message) # 如果是命令且不需要继续处理,则直接返回 - if is_command and not continue_process: - await MessageStorage.store_message(message, chat) - logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}") - return + # if is_command and not continue_process: + # await MessageStorage.store_message(message, chat) + # logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}") + # return - continue_flag, modified_message = await events_manager.handle_mai_events(EventType.ON_MESSAGE, message) - if not continue_flag: - return - if modified_message and modified_message._modify_flags.modify_plain_text: - message.processed_plain_text = modified_message.plain_text + # continue_flag, modified_message = await events_manager.handle_mai_events(EventType.ON_MESSAGE, message) + # if not continue_flag: + # return + # if modified_message and modified_message._modify_flags.modify_plain_text: + # message.processed_plain_text = modified_message.plain_text - # 确认从接口发来的message是否有自定义的prompt模板信息 - if message.message_info.template_info and not message.message_info.template_info.template_default: - template_group_name: Optional[str] = message.message_info.template_info.template_name # type: ignore - template_items = message.message_info.template_info.template_items - async with global_prompt_manager.async_message_scope(template_group_name): - if isinstance(template_items, dict): - for k in template_items.keys(): - await Prompt.create_async(template_items[k], k) - logger.debug(f"注册{template_items[k]},{k}") - else: - template_group_name = None + # # 确认从接口发来的message是否有自定义的prompt模板信息 + # if message.message_info.template_info and not message.message_info.template_info.template_default: + # template_group_name: Optional[str] = message.message_info.template_info.template_name # type: ignore + # template_items = message.message_info.template_info.template_items + # async with global_prompt_manager.async_message_scope(template_group_name): + # if isinstance(template_items, dict): + # for k in template_items.keys(): + # await Prompt.create_async(template_items[k], k) + # logger.debug(f"注册{template_items[k]},{k}") + # else: + # template_group_name = None + # async def preprocess(): + # # 根据聊天类型路由消息 + # if group_info is None: + # # 私聊消息 -> PFC系统 + # logger.debug("[私聊]检测到私聊消息,路由到PFC系统") + # await MessageStorage.store_message(message, chat) + # await self._create_pfc_chat(message) + # else: + # # 群聊消息 -> HeartFlow系统 + # logger.debug("[群聊]检测到群聊消息,路由到HeartFlow系统") + # await self.heartflow_message_receiver.process_message(message) + + # if template_group_name: + # async with global_prompt_manager.async_message_scope(template_group_name): + # await preprocess() + # else: + # await preprocess() async def preprocess(): - # 根据聊天类型路由消息 if group_info is None: - # 私聊消息 -> PFC系统 logger.debug("[私聊]检测到私聊消息,路由到PFC系统") - await MessageStorage.store_message(message, chat) + MessageUtils.store_message_to_db(message) # 存储消息到数据库 await self._create_pfc_chat(message) else: - # 群聊消息 -> HeartFlow系统 logger.debug("[群聊]检测到群聊消息,路由到HeartFlow系统") await self.heartflow_message_receiver.process_message(message) - if template_group_name: - async with global_prompt_manager.async_message_scope(template_group_name): - await preprocess() - else: - await preprocess() - except Exception as e: logger.error(f"预处理消息失败: {e}") traceback.print_exc()