feat:让Maisaka使用message_cache

This commit is contained in:
SengokuCola
2026-03-27 15:08:11 +08:00
parent 0959193808
commit a3bc145051

View File

@@ -4,10 +4,12 @@ Maisaka runtime for non-CLI integrations.
from datetime import datetime
from pathlib import Path
import time
from typing import Optional
import asyncio
from src.chat.heart_flow.heartFC_utils import CycleDetail
from src.chat.message_receive.chat_manager import BotChatSession, chat_manager
from src.chat.message_receive.message import SessionMessage
from src.common.data_models.mai_message_data_model import GroupInfo, MaiMessage, UserInfo
@@ -55,10 +57,13 @@ class MaisakaHeartFlowChatting:
self.log_prefix = f"[{session_name}]"
self._llm_service = MaiSakaLLMService(api_key="", base_url=None, model="")
self._chat_history: list[MaiMessage] = []
self.history_loop: list[CycleDetail] = []
self.message_cache: list[SessionMessage] = []
self._mcp_manager: Optional[MCPManager] = None
self._pending_messages: list[SessionMessage] = []
self._current_cycle_detail: Optional[CycleDetail] = None
self._source_messages_by_id: dict[str, SessionMessage] = {}
self._running = False
self._cycle_counter = 0
self._loop_task: Optional[asyncio.Task] = None
self._loop_lock = asyncio.Lock()
self._new_message_event = asyncio.Event()
@@ -79,7 +84,7 @@ class MaisakaHeartFlowChatting:
self._running = True
self._loop_task = asyncio.create_task(self._main_loop())
logger.info(f"{self.log_prefix} MaiSaka 运行时已启动")
logger.info(f"{self.log_prefix} MaiSaka runtime started")
async def stop(self) -> None:
"""Stop the runtime loop."""
@@ -102,15 +107,15 @@ class MaisakaHeartFlowChatting:
await self._mcp_manager.close()
self._mcp_manager = None
logger.info(f"{self.log_prefix} MaiSaka 运行时已停止")
logger.info(f"{self.log_prefix} MaiSaka runtime stopped")
def adjust_talk_frequency(self, frequency: float) -> None:
    """No-op retained so Maisaka satisfies the existing manager API.

    The frequency value is intentionally discarded: this runtime does not
    throttle on talk frequency, but callers still invoke the setter.
    """
    del frequency
async def register_message(self, message: SessionMessage) -> None:
"""Queue a newly received message for Maisaka processing."""
self._pending_messages.append(message)
"""Append a newly received message into the HFC-style message cache."""
self.message_cache.append(message)
self._source_messages_by_id[message.message_id] = message
self._new_message_event.set()
@@ -121,17 +126,19 @@ class MaisakaHeartFlowChatting:
self._new_message_event.clear()
async with self._loop_lock:
pending_messages = self._drain_pending_messages()
if not pending_messages:
cached_messages = self._drain_message_cache()
if not cached_messages:
continue
await self._ingest_messages(pending_messages)
await self._run_internal_loop(anchor_message=pending_messages[-1])
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} MaiSaka 运行循环已取消")
def _drain_pending_messages(self) -> list[SessionMessage]:
drained_messages = list(self._pending_messages)
self._pending_messages.clear()
await self._ingest_messages(cached_messages)
await self._run_internal_loop(anchor_message=cached_messages[-1])
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} MaiSaka runtime loop cancelled")
def _drain_message_cache(self) -> list[SessionMessage]:
    """Snapshot the cached messages as one batch and empty the cache in place."""
    batch = self.message_cache[:]
    # Mutate (rather than rebind) the list so any outstanding references
    # to the cache observe the drain as well.
    del self.message_cache[:]
    return batch
async def _init_mcp(self) -> None:
@@ -139,17 +146,17 @@ class MaisakaHeartFlowChatting:
config_path = Path(__file__).with_name("mcp_config.json")
self._mcp_manager = await MCPManager.from_config(str(config_path))
if self._mcp_manager is None:
logger.info(f"{self.log_prefix} MaiSaka 运行时的 MCP 不可用")
logger.info(f"{self.log_prefix} MCP manager is unavailable")
return
mcp_tools = self._mcp_manager.get_openai_tools()
if not mcp_tools:
logger.info(f"{self.log_prefix} MCP 管理器已初始化,但未暴露任何工具")
logger.info(f"{self.log_prefix} No MCP tools were exposed to Maisaka")
return
self._llm_service.set_extra_tools(mcp_tools)
logger.info(
f"{self.log_prefix} 已为 MaiSaka 运行时加载 {len(mcp_tools)} MCP 工具:\n"
f"{self.log_prefix} Loaded {len(mcp_tools)} MCP tools into Maisaka:\n"
f"{self._mcp_manager.get_tool_summary()}"
)
@@ -159,6 +166,7 @@ class MaisakaHeartFlowChatting:
self._last_user_input_time = messages[-1].timestamp
self._user_input_times.extend(message.timestamp for message in messages)
if MERGE_USER_MESSAGES:
merged_sequence = await self._merge_messages(messages)
merged_content = build_visible_text_from_sequence(merged_sequence).strip()
@@ -203,6 +211,7 @@ class MaisakaHeartFlowChatting:
source_sequence = getattr(message, "maisaka_original_raw_message", message.raw_message)
else:
source_sequence = message.raw_message
for component in clone_message_sequence(source_sequence).components:
merged_sequence.components.append(component)
appended_component = True
@@ -263,48 +272,72 @@ class MaisakaHeartFlowChatting:
return message_sequence
async def _run_internal_loop(self, anchor_message: SessionMessage) -> None:
"""Run the Maisaka internal loop, treating each thinking round as one cycle."""
last_had_tool_calls = True
for round_index in range(self._max_internal_rounds):
cycle_detail = self._start_cycle()
logger.info(
f"{self.log_prefix} 内部循环第 {round_index + 1}/{self._max_internal_rounds} 轮已开始"
f"(历史消息数={len(self._chat_history)}"
f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} "
f"round={round_index + 1}/{self._max_internal_rounds} "
f"context_size={len(self._chat_history)}"
)
if last_had_tool_calls:
logger.info(f"{self.log_prefix} 调用规划器前正在构建感知快照")
await self._append_perception_snapshot()
logger.info(f"{self.log_prefix} 感知快照步骤已完成")
try:
if last_had_tool_calls:
perception_started_at = time.time()
await self._append_perception_snapshot()
cycle_detail.time_records["perception"] = time.time() - perception_started_at
logger.info(f"{self.log_prefix} 正在调用 MaiSaka 对话循环步骤")
response = await self._llm_service.chat_loop_step(self._chat_history)
logger.info(
f"{self.log_prefix} 对话循环步骤已返回"
f"(内容长度={len(response.content or '')},工具调用数={len(response.tool_calls)}"
)
response.raw_message.platform = anchor_message.platform
response.raw_message.session_id = self.session_id
response.raw_message.message_info.group_info = self._build_group_info(anchor_message)
self._chat_history.append(response.raw_message)
self._last_assistant_response_time = datetime.now()
planner_started_at = time.time()
response = await self._llm_service.chat_loop_step(self._chat_history)
cycle_detail.time_records["planner"] = time.time() - planner_started_at
if response.tool_calls:
logger.info(f"{self.log_prefix} 正在处理 {len(response.tool_calls)} 个工具调用")
should_pause = await self._handle_tool_calls(response.tool_calls, response.content or "", anchor_message)
logger.info(f"{self.log_prefix} 工具处理已完成(是否应暂停={should_pause}")
if should_pause:
return
last_had_tool_calls = True
continue
response.raw_message.platform = anchor_message.platform
response.raw_message.session_id = self.session_id
response.raw_message.message_info.group_info = self._build_group_info(anchor_message)
self._chat_history.append(response.raw_message)
self._last_assistant_response_time = datetime.now()
if response.content:
logger.info(f"{self.log_prefix} 规划器返回了内容但没有工具调用,继续内部循环")
last_had_tool_calls = False
continue
if response.tool_calls:
tool_started_at = time.time()
should_pause = await self._handle_tool_calls(response.tool_calls, response.content or "", anchor_message)
cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at
if should_pause:
return
last_had_tool_calls = True
continue
logger.info(f"{self.log_prefix} 规划器返回空内容且没有工具调用,退出内部循环")
return
if response.content:
last_had_tool_calls = False
continue
logger.info(f"{self.log_prefix} MaiSaka 内部循环已达到最大轮次并暂停")
return
finally:
self._end_cycle(cycle_detail)
def _start_cycle(self) -> CycleDetail:
    """Open a new thinking cycle, make it current, and return it."""
    self._cycle_counter += 1
    cycle = CycleDetail(cycle_id=self._cycle_counter)
    # Thinking id embeds the wall-clock time (2 decimal places) for traceability.
    cycle.thinking_id = f"maisaka_tid{round(time.time(), 2)}"
    self._current_cycle_detail = cycle
    return cycle
def _end_cycle(self, cycle_detail: CycleDetail, only_long_execution: bool = True) -> CycleDetail:
    """Close a thinking cycle, archive it in history, and log stage timings.

    When ``only_long_execution`` is true, stages shorter than 0.1s are
    omitted from the log line to keep it readable.
    """
    cycle_detail.end_time = time.time()
    self.history_loop.append(cycle_detail)
    timer_strings: list[str] = []
    for stage_name, duration in cycle_detail.time_records.items():
        if only_long_execution and duration < 0.1:
            continue
        timer_strings.append(f"{stage_name}: {duration:.2f}s")
    elapsed = cycle_detail.end_time - cycle_detail.start_time
    stage_summary = ", ".join(timer_strings) if timer_strings else "none"
    logger.info(
        f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} completed "
        f"in {elapsed:.2f}s; "
        f"stages={stage_summary}"
    )
    return cycle_detail
def _trim_chat_history(self) -> None:
"""Trim the oldest history until the user-message count is below the configured limit."""
@@ -323,8 +356,8 @@ class MaisakaHeartFlowChatting:
self._chat_history = trimmed_history
logger.info(
f"{self.log_prefix} 已裁剪 MaiSaka 历史消息 {removed_count} 条;"
f"当前用户消息数为 {user_message_count}"
f"{self.log_prefix} Trimmed {removed_count} history messages; "
f"remaining_user_messages={user_message_count}"
)
async def _append_perception_snapshot(self) -> None:
@@ -340,12 +373,7 @@ class MaisakaHeartFlowChatting:
perception_parts: list[str] = []
for (task_name, _), result in zip(tasks, results):
if isinstance(result, Exception):
analysis_name = {
"emotion": "情绪",
"cognition": "认知",
"knowledge": "知识",
}.get(task_name, task_name)
logger.warning(f"{self.log_prefix} MaiSaka 的{analysis_name}分析失败: {result}")
logger.warning(f"{self.log_prefix} Maisaka {task_name} analysis failed: {result}")
continue
if result:
perception_parts.append(f"{task_name.title()}\n{result}")