From a3bc145051395b2e1cd54971516db17098ec5fe7 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Fri, 27 Mar 2026 15:08:11 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E8=AE=A9Maisaka=E4=BD=BF=E7=94=A8?= =?UTF-8?q?message=5Fcache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/maisaka/runtime.py | 142 ++++++++++++++++++++++++----------------- 1 file changed, 85 insertions(+), 57 deletions(-) diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index a7e35037..526927ff 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -4,10 +4,12 @@ Maisaka runtime for non-CLI integrations. from datetime import datetime from pathlib import Path +import time from typing import Optional import asyncio +from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.chat_manager import BotChatSession, chat_manager from src.chat.message_receive.message import SessionMessage from src.common.data_models.mai_message_data_model import GroupInfo, MaiMessage, UserInfo @@ -55,10 +57,13 @@ class MaisakaHeartFlowChatting: self.log_prefix = f"[{session_name}]" self._llm_service = MaiSakaLLMService(api_key="", base_url=None, model="") self._chat_history: list[MaiMessage] = [] + self.history_loop: list[CycleDetail] = [] + self.message_cache: list[SessionMessage] = [] self._mcp_manager: Optional[MCPManager] = None - self._pending_messages: list[SessionMessage] = [] + self._current_cycle_detail: Optional[CycleDetail] = None self._source_messages_by_id: dict[str, SessionMessage] = {} self._running = False + self._cycle_counter = 0 self._loop_task: Optional[asyncio.Task] = None self._loop_lock = asyncio.Lock() self._new_message_event = asyncio.Event() @@ -79,7 +84,7 @@ class MaisakaHeartFlowChatting: self._running = True self._loop_task = asyncio.create_task(self._main_loop()) - logger.info(f"{self.log_prefix} MaiSaka 运行时已启动") + logger.info(f"{self.log_prefix} MaiSaka runtime started") async def stop(self) -> None: """Stop the runtime loop.""" @@ -102,15 +107,15 @@ class MaisakaHeartFlowChatting: await self._mcp_manager.close() self._mcp_manager = None - logger.info(f"{self.log_prefix} MaiSaka 运行时已停止") + logger.info(f"{self.log_prefix} MaiSaka runtime stopped") def adjust_talk_frequency(self, frequency: float) -> None: """Compatibility shim for the existing manager API.""" _ = frequency async def register_message(self, message: SessionMessage) -> None: - """Queue a newly received message for Maisaka processing.""" - self._pending_messages.append(message) + """Append a newly received message into the HFC-style message cache.""" + self.message_cache.append(message) self._source_messages_by_id[message.message_id] = message self._new_message_event.set() @@ -121,17 +126,19 @@ class MaisakaHeartFlowChatting: self._new_message_event.clear() async with self._loop_lock: - pending_messages = self._drain_pending_messages() - if not pending_messages: + cached_messages = self._drain_message_cache() + if not cached_messages: continue - await self._ingest_messages(pending_messages) - await self._run_internal_loop(anchor_message=pending_messages[-1]) - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} MaiSaka 运行循环已取消") - def _drain_pending_messages(self) -> list[SessionMessage]: - drained_messages = list(self._pending_messages) - self._pending_messages.clear() + await self._ingest_messages(cached_messages) + await self._run_internal_loop(anchor_message=cached_messages[-1]) + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} MaiSaka runtime loop cancelled") + + def _drain_message_cache(self) -> list[SessionMessage]: + """Drain the current message cache as one processing batch.""" + drained_messages = list(self.message_cache) + self.message_cache.clear() return drained_messages async def _init_mcp(self) -> None: @@ -139,17 +146,17 @@ class MaisakaHeartFlowChatting: config_path = Path(__file__).with_name("mcp_config.json") self._mcp_manager = await MCPManager.from_config(str(config_path)) if self._mcp_manager is None: - logger.info(f"{self.log_prefix} MaiSaka 运行时的 MCP 不可用") + logger.info(f"{self.log_prefix} MCP manager is unavailable") return mcp_tools = self._mcp_manager.get_openai_tools() if not mcp_tools: - logger.info(f"{self.log_prefix} MCP 管理器已初始化,但未暴露任何工具") + logger.info(f"{self.log_prefix} No MCP tools were exposed to Maisaka") return self._llm_service.set_extra_tools(mcp_tools) logger.info( - f"{self.log_prefix} 已为 MaiSaka 运行时加载 {len(mcp_tools)} 个 MCP 工具:\n" + f"{self.log_prefix} Loaded {len(mcp_tools)} MCP tools into Maisaka:\n" f"{self._mcp_manager.get_tool_summary()}" ) @@ -159,6 +166,7 @@ class MaisakaHeartFlowChatting: self._last_user_input_time = messages[-1].timestamp self._user_input_times.extend(message.timestamp for message in messages) + if MERGE_USER_MESSAGES: merged_sequence = await self._merge_messages(messages) merged_content = build_visible_text_from_sequence(merged_sequence).strip() @@ -203,6 +211,7 @@ class MaisakaHeartFlowChatting: source_sequence = getattr(message, "maisaka_original_raw_message", message.raw_message) else: source_sequence = message.raw_message + for component in clone_message_sequence(source_sequence).components: merged_sequence.components.append(component) appended_component = True @@ -263,48 +272,72 @@ class MaisakaHeartFlowChatting: return message_sequence async def _run_internal_loop(self, anchor_message: SessionMessage) -> None: + """Run the Maisaka internal loop, treating each thinking round as one cycle.""" last_had_tool_calls = True for round_index in range(self._max_internal_rounds): + cycle_detail = self._start_cycle() logger.info( - f"{self.log_prefix} 内部循环第 {round_index + 1}/{self._max_internal_rounds} 轮已开始" - f"(历史消息数={len(self._chat_history)})" + f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} " + f"round={round_index + 1}/{self._max_internal_rounds} " + f"context_size={len(self._chat_history)}" ) - if last_had_tool_calls: - logger.info(f"{self.log_prefix} 调用规划器前正在构建感知快照") - await self._append_perception_snapshot() - logger.info(f"{self.log_prefix} 感知快照步骤已完成") + try: + if last_had_tool_calls: + perception_started_at = time.time() + await self._append_perception_snapshot() + cycle_detail.time_records["perception"] = time.time() - perception_started_at - logger.info(f"{self.log_prefix} 正在调用 MaiSaka 对话循环步骤") - response = await self._llm_service.chat_loop_step(self._chat_history) - logger.info( - f"{self.log_prefix} 对话循环步骤已返回" - f"(内容长度={len(response.content or '')},工具调用数={len(response.tool_calls)})" - ) - response.raw_message.platform = anchor_message.platform - response.raw_message.session_id = self.session_id - response.raw_message.message_info.group_info = self._build_group_info(anchor_message) - self._chat_history.append(response.raw_message) - self._last_assistant_response_time = datetime.now() + planner_started_at = time.time() + response = await self._llm_service.chat_loop_step(self._chat_history) + cycle_detail.time_records["planner"] = time.time() - planner_started_at - if response.tool_calls: - logger.info(f"{self.log_prefix} 正在处理 {len(response.tool_calls)} 个工具调用") - should_pause = await self._handle_tool_calls(response.tool_calls, response.content or "", anchor_message) - logger.info(f"{self.log_prefix} 工具处理已完成(是否应暂停={should_pause})") - if should_pause: - return - last_had_tool_calls = True - continue + response.raw_message.platform = anchor_message.platform + response.raw_message.session_id = self.session_id + response.raw_message.message_info.group_info = self._build_group_info(anchor_message) + self._chat_history.append(response.raw_message) + self._last_assistant_response_time = datetime.now() - if response.content: - logger.info(f"{self.log_prefix} 规划器返回了内容但没有工具调用,继续内部循环") - last_had_tool_calls = False - continue + if response.tool_calls: + tool_started_at = time.time() + should_pause = await self._handle_tool_calls(response.tool_calls, response.content or "", anchor_message) + cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at + if should_pause: + return + last_had_tool_calls = True + continue - logger.info(f"{self.log_prefix} 规划器返回空内容且没有工具调用,退出内部循环") - return + if response.content: + last_had_tool_calls = False + continue - logger.info(f"{self.log_prefix} MaiSaka 内部循环已达到最大轮次并暂停") + return + finally: + self._end_cycle(cycle_detail) + + def _start_cycle(self) -> CycleDetail: + """Start a Maisaka thinking cycle.""" + self._cycle_counter += 1 + self._current_cycle_detail = CycleDetail(cycle_id=self._cycle_counter) + self._current_cycle_detail.thinking_id = f"maisaka_tid{round(time.time(), 2)}" + return self._current_cycle_detail + + def _end_cycle(self, cycle_detail: CycleDetail, only_long_execution: bool = True) -> CycleDetail: + """End and record a Maisaka thinking cycle.""" + cycle_detail.end_time = time.time() + self.history_loop.append(cycle_detail) + + timer_strings = [ + f"{name}: {duration:.2f}s" + for name, duration in cycle_detail.time_records.items() + if not only_long_execution or duration >= 0.1 + ] + logger.info( + f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} completed " + f"in {cycle_detail.end_time - cycle_detail.start_time:.2f}s; " + f"stages={', '.join(timer_strings) if timer_strings else 'none'}" + ) + return cycle_detail def _trim_chat_history(self) -> None: """Trim the oldest history until the user-message count is below the configured limit.""" @@ -323,8 +356,8 @@ class MaisakaHeartFlowChatting: self._chat_history = trimmed_history logger.info( - f"{self.log_prefix} 已裁剪 MaiSaka 历史消息 {removed_count} 条;" - f"当前用户消息数为 {user_message_count}。" + f"{self.log_prefix} Trimmed {removed_count} history messages; " + f"remaining_user_messages={user_message_count}" ) async def _append_perception_snapshot(self) -> None: @@ -340,12 +373,7 @@ class MaisakaHeartFlowChatting: perception_parts: list[str] = [] for (task_name, _), result in zip(tasks, results): if isinstance(result, Exception): - analysis_name = { - "emotion": "情绪", - "cognition": "认知", - "knowledge": "知识", - }.get(task_name, task_name) - logger.warning(f"{self.log_prefix} MaiSaka 的{analysis_name}分析失败: {result}") + logger.warning(f"{self.log_prefix} Maisaka {task_name} analysis failed: {result}") continue if result: perception_parts.append(f"{task_name.title()}\n{result}")