diff --git a/src/maisaka/cli.py b/src/maisaka/cli.py index c76d24ed..d036a77c 100644 --- a/src/maisaka/cli.py +++ b/src/maisaka/cli.py @@ -25,13 +25,10 @@ from .message_adapter import build_message, format_speaker_content, remove_last_ from .mcp_client import MCPManager from .tool_handlers import ( ToolHandlerContext, - handle_list_files, handle_mcp_tool, - handle_read_file, handle_stop, handle_unknown_tool, handle_wait, - handle_write_file, ) @@ -270,15 +267,6 @@ class BufferCLI: if tool_result.startswith("[[QUIT]]"): should_stop = True - elif tc.func_name == "write_file": - await handle_write_file(tc, chat_history) - - elif tc.func_name == "read_file": - await handle_read_file(tc, chat_history) - - elif tc.func_name == "list_files": - await handle_list_files(tc, chat_history) - elif self._mcp_manager and self._mcp_manager.is_mcp_tool(tc.func_name): await handle_mcp_tool(tc, chat_history, self._mcp_manager) diff --git a/src/maisaka/llm_service.py b/src/maisaka/llm_service.py index d9c3cc8f..78d61a50 100644 --- a/src/maisaka/llm_service.py +++ b/src/maisaka/llm_service.py @@ -98,8 +98,6 @@ class MaiSakaLLMService: # 回复生成使用 replyer 模型 self._llm_replyer = LLMServiceClient(task_name="replyer", request_type="maisaka_replyer") - # 尝试修复数据库 schema(忽略错误) - self._try_fix_database_schema() # 构建人设信息 personality_prompt = self._build_personality_prompt() @@ -125,29 +123,6 @@ class MaiSakaLLMService: logger.warning(f"获取当前 Maisaka 模型名称失败: {exc}") return "未配置" - def _try_fix_database_schema(self) -> None: - """尝试修复数据库 schema。 - - Returns: - None: 该方法仅执行数据库修复副作用。 - """ - try: - from src.common.database.database_client import get_db_session - from sqlalchemy import text - - with get_db_session() as session: - # 检查 model_api_provider_name 列是否存在 - result = session.execute(text("PRAGMA table_info(llm_usage)")) - columns = [row[1] for row in result.fetchall()] - - if "model_api_provider_name" not in columns: - # 添加缺失的列 - session.execute(text("ALTER TABLE llm_usage ADD COLUMN model_api_provider_name VARCHAR(255)")) - session.commit() - logger.info("数据库结构已修复:已添加 model_api_provider_name 列") - except Exception: - # 静默忽略任何错误,不影响正常流程 - pass def _build_personality_prompt(self) -> str: """构建当前人设提示词。 @@ -208,11 +183,11 @@ class MaiSakaLLMService: try: tools_section = "" - if global_config.maisaka.enable_write_file: + if False and global_config.maisaka.enable_write_file: tools_section += "\n• write_file(filename, content) — 在 mai_files 目录下写入文件。" - if global_config.maisaka.enable_read_file: + if False and global_config.maisaka.enable_read_file: tools_section += "\n• read_file(filename) — 读取 mai_files 目录下的文件内容。" - if global_config.maisaka.enable_list_files: + if False and global_config.maisaka.enable_list_files: tools_section += "\n• list_files() — 获取 mai_files 目录下所有文件的元信息列表。" self._chat_system_prompt = load_prompt( "maidairy_chat", diff --git a/src/maisaka/mcp_client/manager.py b/src/maisaka/mcp_client/manager.py index 68aa1834..1efba099 100644 --- a/src/maisaka/mcp_client/manager.py +++ b/src/maisaka/mcp_client/manager.py @@ -16,9 +16,6 @@ BUILTIN_TOOL_NAMES = frozenset( "no_reply", "wait", "stop", - "write_file", - "read_file", - "list_files", "create_table", "list_tables", "view_table", diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py new file mode 100644 index 00000000..5226fd79 --- /dev/null +++ b/src/maisaka/reasoning_engine.py @@ -0,0 +1,291 @@ +"""Maisaka 推理引擎。""" + +import asyncio +import time +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from src.chat.heart_flow.heartFC_utils import CycleDetail +from src.chat.message_receive.message import SessionMessage +from src.common.data_models.mai_message_data_model import UserInfo +from src.common.data_models.message_component_data_model import MessageSequence +from src.config.config import global_config +from src.llm_models.payload_content.tool_option import ToolCall +from src.services import send_service + +from .message_adapter import ( + build_message, + build_visible_text_from_sequence, + clone_message_sequence, + format_speaker_content, + get_message_role, +) +from .tool_handlers import ( + handle_mcp_tool, + handle_unknown_tool, +) + +if TYPE_CHECKING: + from .runtime import MaisakaHeartFlowChatting + + +class MaisakaReasoningEngine: + """负责内部思考、推理与工具执行。""" + + def __init__(self, runtime: "MaisakaHeartFlowChatting") -> None: + self._runtime = runtime + + async def run_loop(self) -> None: + """独立消费消息批次,并执行对应的内部思考轮次。""" + try: + while self._runtime._running: + cached_messages = await self._runtime._internal_turn_queue.get() + if not cached_messages: + self._runtime._internal_turn_queue.task_done() + continue + + self._runtime._agent_state = self._runtime._STATE_RUNNING + await self._ingest_messages(cached_messages) + + anchor_message = cached_messages[-1] + try: + for round_index in range(self._runtime._max_internal_rounds): + cycle_detail = self._start_cycle() + self._runtime._log_cycle_started(cycle_detail, round_index) + try: + planner_started_at = time.time() + response = await self._runtime._llm_service.chat_loop_step(self._runtime._chat_history) + cycle_detail.time_records["planner"] = time.time() - planner_started_at + + response.raw_message.platform = anchor_message.platform + response.raw_message.session_id = self._runtime.session_id + response.raw_message.message_info.group_info = self._runtime._build_group_info(anchor_message) + self._runtime._chat_history.append(response.raw_message) + + if response.tool_calls: + tool_started_at = time.time() + should_pause = await self._handle_tool_calls( + response.tool_calls, + response.content or "", + anchor_message, + ) + cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at + if should_pause: + break + continue + + if response.content: + continue + + break + finally: + self._end_cycle(cycle_detail) + finally: + if self._runtime._agent_state == self._runtime._STATE_RUNNING: + self._runtime._agent_state = self._runtime._STATE_STOP + self._runtime._internal_turn_queue.task_done() + except asyncio.CancelledError: + self._runtime._log_internal_loop_cancelled() + raise + + async def _ingest_messages(self, messages: list[SessionMessage]) -> None: + """处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。""" + for message in messages: + # 构建用户消息序列 + user_sequence = await self._build_message_sequence(message) + visible_text = build_visible_text_from_sequence(user_sequence).strip() + if not user_sequence.components: + continue + + history_message = build_message( + role="user", + content=visible_text, + source="user", + timestamp=message.timestamp, + platform=message.platform, + session_id=self._runtime.session_id, + group_info=self._runtime._build_group_info(message), + user_info=self._runtime._build_runtime_user_info(), + raw_message=user_sequence, + display_text=visible_text, + ) + self._runtime._chat_history.append(history_message) + self._trim_chat_history() + + async def _build_message_sequence(self, message: SessionMessage) -> MessageSequence: + message_sequence = MessageSequence([]) + user_info = message.message_info.user_info + speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id + message_sequence.text(format_speaker_content(speaker_name, "", message.timestamp, message.message_id)) + + appended_component = False + if global_config.maisaka.direct_image_input: + source_sequence = getattr(message, "maisaka_original_raw_message", message.raw_message) + else: + source_sequence = message.raw_message + + for component in clone_message_sequence(source_sequence).components: + message_sequence.components.append(component) + appended_component = True + + if not appended_component: + if not message.processed_plain_text: + await message.process() + content = (message.processed_plain_text or "").strip() + if content: + message_sequence.text(content) + + return message_sequence + + def _start_cycle(self) -> CycleDetail: + """开始一轮 Maisaka 思考循环。""" + self._runtime._cycle_counter += 1 + self._runtime._current_cycle_detail = CycleDetail(cycle_id=self._runtime._cycle_counter) + self._runtime._current_cycle_detail.thinking_id = f"maisaka_tid{round(time.time(), 2)}" + return self._runtime._current_cycle_detail + + def _end_cycle(self, cycle_detail: CycleDetail, only_long_execution: bool = True) -> CycleDetail: + """结束并记录一轮 Maisaka 思考循环。""" + cycle_detail.end_time = time.time() + self._runtime.history_loop.append(cycle_detail) + + timer_strings = [ + f"{name}: {duration:.2f}s" + for name, duration in cycle_detail.time_records.items() + if not only_long_execution or duration >= 0.1 + ] + self._runtime._log_cycle_completed(cycle_detail, timer_strings) + return cycle_detail + + def _trim_chat_history(self) -> None: + """裁剪聊天历史,保证用户消息数量不超过配置限制。""" + user_message_count = sum(1 for message in self._runtime._chat_history if get_message_role(message) == "user") + if user_message_count <= self._runtime._max_context_size: + return + + trimmed_history = list(self._runtime._chat_history) + removed_count = 0 + + while user_message_count >= self._runtime._max_context_size and trimmed_history: + removed_message = trimmed_history.pop(0) + removed_count += 1 + if get_message_role(removed_message) == "user": + user_message_count -= 1 + + self._runtime._chat_history = trimmed_history + self._runtime._log_history_trimmed(removed_count, user_message_count) + + async def _handle_tool_calls( + self, + tool_calls: list[ToolCall], + latest_thought: str, + anchor_message: SessionMessage, + ) -> bool: + for tool_call in tool_calls: + if tool_call.func_name == "reply": + reply_sent = await self._handle_reply(tool_call, latest_thought, anchor_message) + if reply_sent: + return True + continue + + if tool_call.func_name == "no_reply": + self._runtime._chat_history.append( + self._build_tool_message( + tool_call, + "No visible reply was sent for this round.", + ) + ) + continue + + if tool_call.func_name == "wait": + seconds = (tool_call.args or {}).get("seconds", 30) + try: + wait_seconds = int(seconds) + except (TypeError, ValueError): + wait_seconds = 30 + wait_seconds = max(0, wait_seconds) + self._runtime._chat_history.append( + self._build_tool_message( + tool_call, + f"Waiting for future input for up to {wait_seconds} seconds.", + ) + ) + self._runtime._enter_wait_state(seconds=wait_seconds) + return True + + if tool_call.func_name == "stop": + self._runtime._chat_history.append( + self._build_tool_message( + tool_call, + "Conversation loop paused until a new message arrives.", + ) + ) + self._runtime._enter_stop_state() + return True + + if self._runtime._mcp_manager and self._runtime._mcp_manager.is_mcp_tool(tool_call.func_name): + await handle_mcp_tool(tool_call, self._runtime._chat_history, self._runtime._mcp_manager) + continue + + await handle_unknown_tool(tool_call, self._runtime._chat_history) + + return False + + async def _handle_reply( + self, + tool_call: ToolCall, + latest_thought: str, + anchor_message: SessionMessage, + ) -> bool: + target_message_id = str((tool_call.args or {}).get("message_id", "")).strip() + if not target_message_id: + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "reply requires a valid message_id argument.") + ) + return False + + target_message = self._runtime._source_messages_by_id.get(target_message_id) + if target_message is None: + self._runtime._chat_history.append( + self._build_tool_message(tool_call, f"reply target message_id not found: {target_message_id}") + ) + return False + + reply_text = await self._runtime._llm_service.generate_reply(latest_thought, self._runtime._chat_history) + sent = await send_service.text_to_stream( + text=reply_text, + stream_id=self._runtime.session_id, + set_reply=True, + reply_message=target_message, + typing=False, + ) + tool_result = "Visible reply generated and sent." if sent else "Visible reply generation succeeded but send failed." + self._runtime._chat_history.append(self._build_tool_message(tool_call, tool_result)) + if not sent: + return False + + bot_name = global_config.bot.nickname.strip() or "MaiSaka" + self._runtime._chat_history.append( + build_message( + role="user", + content=format_speaker_content(bot_name, reply_text, datetime.now()), + source="guided_reply", + platform=target_message.platform or anchor_message.platform, + session_id=self._runtime.session_id, + group_info=self._runtime._build_group_info(target_message), + user_info=self._runtime._build_runtime_user_info(), + ) + ) + return True + + def _build_tool_message(self, tool_call: ToolCall, content: str) -> SessionMessage: + return build_message( + role="tool", + content=content, + source="tool", + tool_call_id=tool_call.call_id, + platform=self._runtime.chat_stream.platform, + session_id=self._runtime.session_id, + group_info=self._runtime._build_group_info(), + user_info=UserInfo(user_id="maisaka_tool", user_nickname="tool", user_cardname=None), + ) diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index 8d616bef..c570a496 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -27,6 +27,7 @@ from .message_adapter import ( format_speaker_content, get_message_role, ) +from .reasoning_engine import MaisakaReasoningEngine from .tool_handlers import ( handle_list_files, handle_mcp_tool, @@ -71,6 +72,7 @@ class MaisakaHeartFlowChatting: self._max_context_size = max(1, int(global_config.chat.max_context_size)) self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP self._wait_until: Optional[float] = None + self._reasoning_engine = MaisakaReasoningEngine(self) async def start(self) -> None: """Start the runtime loop.""" @@ -81,7 +83,7 @@ class MaisakaHeartFlowChatting: await self._init_mcp() self._running = True - self._internal_loop_task = asyncio.create_task(self._internal_loop()) + self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop()) self._loop_task = asyncio.create_task(self._main_loop()) logger.info(f"{self.log_prefix} MaiSaka 启动") @@ -429,15 +431,15 @@ class MaisakaHeartFlowChatting: self._enter_stop_state() return True - if tool_call.func_name == "write_file" and global_config.maisaka.enable_write_file: + if False and tool_call.func_name == "write_file" and global_config.maisaka.enable_write_file: await handle_write_file(tool_call, self._chat_history) continue - if tool_call.func_name == "read_file" and global_config.maisaka.enable_read_file: + if False and tool_call.func_name == "read_file" and global_config.maisaka.enable_read_file: await handle_read_file(tool_call, self._chat_history) continue - if tool_call.func_name == "list_files" and global_config.maisaka.enable_list_files: + if False and tool_call.func_name == "list_files" and global_config.maisaka.enable_list_files: await handle_list_files(tool_call, self._chat_history) continue @@ -530,3 +532,26 @@ class MaisakaHeartFlowChatting: return None return GroupInfo(group_id=group_info.group_id, group_name=group_info.group_name) + + def _log_cycle_started(self, cycle_detail: CycleDetail, round_index: int) -> None: + logger.info( + f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} " + f"round={round_index + 1}/{self._max_internal_rounds} " + f"context_size={len(self._chat_history)}" + ) + + def _log_cycle_completed(self, cycle_detail: CycleDetail, timer_strings: list[str]) -> None: + logger.info( + f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} completed " + f"in {cycle_detail.end_time - cycle_detail.start_time:.2f}s; " + f"stages={', '.join(timer_strings) if timer_strings else 'none'}" + ) + + def _log_history_trimmed(self, removed_count: int, user_message_count: int) -> None: + logger.info( + f"{self.log_prefix} Trimmed {removed_count} history messages; " + f"remaining_user_messages={user_message_count}" + ) + + def _log_internal_loop_cancelled(self) -> None: + logger.info(f"{self.log_prefix} Maisaka internal loop cancelled")