From 20bab798727a3483f04fa196fe4f0b19c6fc9dda Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Sun, 29 Mar 2026 01:00:43 +0800
Subject: [PATCH] ref: remove some redundant modules, add a new maisaka
 replyer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/chat/heart_flow/heartflow_manager.py |  55 +-
 src/chat/replyer/maisaka_generator.py    | 256 +++++++++
 src/chat/replyer/replyer_manager.py      |  95 ++--
 src/maisaka/chat_loop_service.py         | 389 +++++++++++++
 src/maisaka/cli.py                       | 226 ++++----
 src/maisaka/knowledge.py                 |   4 +-
 src/maisaka/llm_service.py               | 660 -----------------------
 src/maisaka/reasoning_engine.py          | 102 +++-
 src/maisaka/replyer.py                   | 115 ----
 src/maisaka/runtime.py                   |  22 +-
 src/maisaka/tool_handlers.py             |   3 -
 11 files changed, 957 insertions(+), 970 deletions(-)
 create mode 100644 src/chat/replyer/maisaka_generator.py
 create mode 100644 src/maisaka/chat_loop_service.py
 delete mode 100644 src/maisaka/llm_service.py
 delete mode 100644 src/maisaka/replyer.py

diff --git a/src/chat/heart_flow/heartflow_manager.py b/src/chat/heart_flow/heartflow_manager.py
index 0b83d819..3bbc6ec3 100644
--- a/src/chat/heart_flow/heartflow_manager.py
+++ b/src/chat/heart_flow/heartflow_manager.py
@@ -1,7 +1,8 @@
-from typing import Dict
-
+import asyncio
 import traceback
+from typing import Dict
+
 from src.chat.message_receive.chat_manager import chat_manager
 from src.common.logger import get_logger
 from src.maisaka.runtime import MaisakaHeartFlowChatting
@@ -10,46 +11,38 @@ logger = get_logger("heartflow")
 
 
 class HeartflowManager:
-    """Main heart-flow coordinator.
-
-    Group chats currently all use the Maisaka runtime as the core message-loop implementation.
-    """
+    """Manages session-level Maisaka heart-flow instances."""
 
     def __init__(self) -> None:
-        """Initialize the heart-flow chat instance cache."""
         self.heartflow_chat_list: Dict[str, MaisakaHeartFlowChatting] = {}
+        self._chat_create_locks: Dict[str, asyncio.Lock] = {}
 
     async def get_or_create_heartflow_chat(self, session_id: str) -> MaisakaHeartFlowChatting:
-        """Get or create the heart-flow instance for a group chat.
-
-        Args:
-            session_id: Chat session ID.
-
-        Returns:
-            MaisakaHeartFlowChatting: The Maisaka runtime bound to the current session.
-        """
+        """Get or create the Maisaka runtime for the given session."""
         try:
             if chat := self.heartflow_chat_list.get(session_id):
                 return chat
-            chat_session = chat_manager.get_session_by_session_id(session_id)
-            if not chat_session:
-                raise ValueError(f"Chat stream for session_id={session_id} not found")
-            new_chat = MaisakaHeartFlowChatting(session_id=session_id)
-            await new_chat.start()
-            self.heartflow_chat_list[session_id] = new_chat
-            return new_chat
-        except Exception as e:
-            logger.error(f"Failed to create heartflow chat {session_id}: {e}", exc_info=True)
+
+            create_lock = self._chat_create_locks.setdefault(session_id, asyncio.Lock())
+            async with create_lock:
+                if chat := self.heartflow_chat_list.get(session_id):
+                    return chat
+
+                chat_session = chat_manager.get_session_by_session_id(session_id)
+                if not chat_session:
+                    raise ValueError(f"No chat stream found for session_id={session_id}")
+
+                new_chat = MaisakaHeartFlowChatting(session_id=session_id)
+                await new_chat.start()
+                self.heartflow_chat_list[session_id] = new_chat
+                return new_chat
+        except Exception as exc:
+            logger.error(f"Failed to create heartflow chat {session_id}: {exc}", exc_info=True)
             traceback.print_exc()
-            raise e
+            raise
 
     def adjust_talk_frequency(self, session_id: str, frequency: float) -> None:
-        """Adjust the talk frequency of the given chat stream.
-
-        Args:
-            session_id: Chat session ID.
-            frequency: Target frequency factor.
-        """
+        """Adjust the talk frequency of the given chat stream."""
         chat = self.heartflow_chat_list.get(session_id)
         if chat:
             chat.adjust_talk_frequency(frequency)

diff --git a/src/chat/replyer/maisaka_generator.py b/src/chat/replyer/maisaka_generator.py
new file mode 100644
index 00000000..76bbc93f
--- /dev/null
+++ b/src/chat/replyer/maisaka_generator.py
@@ -0,0 +1,256 @@
+from datetime import datetime
+from typing import Dict, List, Optional, Tuple
+
+import random
+import time
+
+from src.chat.message_receive.chat_manager import BotChatSession
+from src.chat.message_receive.message import SessionMessage
+from src.common.data_models.reply_generation_data_models import (
+    GenerationMetrics,
+    LLMCompletionResult,
+    ReplyGenerationResult,
+)
+from src.common.logger import get_logger
+from src.common.prompt_i18n import load_prompt
+from src.config.config import global_config
+from src.core.types import ActionInfo
+from src.services.llm_service import LLMServiceClient
+
+from src.maisaka.message_adapter import (
+    get_message_kind,
+    get_message_role,
+    get_message_text,
+    is_perception_message,
+    parse_speaker_content,
+)
+
+logger = get_logger("maisaka_replyer")
+
+
+class MaisakaReplyGenerator:
+    """Generates Maisaka's visible replies."""
+
+    def __init__(
+        self,
+        chat_stream: Optional[BotChatSession] = None,
+        request_type: str = "maisaka_replyer",
+    ) -> None:
+        self.chat_stream = chat_stream
+        self.request_type = request_type
+        self.express_model = LLMServiceClient(
+            task_name="replyer",
+            request_type=request_type,
+        )
+        self._personality_prompt = self._build_personality_prompt()
+
+    def _build_personality_prompt(self) -> str:
+        """Build the persona description used by the replyer."""
+        try:
+            bot_name = global_config.bot.nickname
+            alias_names = global_config.bot.alias_names
+            bot_aliases = f",也有人叫你{','.join(alias_names)}" if alias_names else ""
+
+            prompt_personality = global_config.personality.personality
+            if (
+                hasattr(global_config.personality, "states")
+                and global_config.personality.states
+                and hasattr(global_config.personality, "state_probability")
+                and global_config.personality.state_probability > 0
+                and random.random() < global_config.personality.state_probability
+            ):
+                prompt_personality = random.choice(global_config.personality.states)
+
+            return f"你的名字是{bot_name}{bot_aliases},你{prompt_personality};"
+        except Exception as exc:
+            logger.warning(f"Failed to build Maisaka personality prompt: {exc}")
+            return "你的名字是麦麦,你是一个活泼可爱的 AI 助手。"
+
+    @staticmethod
+    def _normalize_content(content: str, limit: int = 500) -> str:
+        normalized = " ".join((content or "").split())
+        if len(normalized) > limit:
+            return normalized[:limit] + "..."
+        return normalized
+
+    @staticmethod
+    def _format_message_time(message: SessionMessage) -> str:
+        return message.timestamp.strftime("%H:%M:%S")
+
+    @staticmethod
+    def _extract_visible_assistant_reply(message: SessionMessage) -> str:
+        if is_perception_message(message):
+            return ""
+        return ""
+
+    def _extract_guided_bot_reply(self, message: SessionMessage) -> str:
+        speaker_name, body = parse_speaker_content(get_message_text(message).strip())
+        bot_nickname = global_config.bot.nickname.strip() or "Bot"
+        if speaker_name == bot_nickname:
+            return self._normalize_content(body.strip())
+        return ""
+
+    @staticmethod
+    def _split_user_message_segments(raw_content: str) -> list[tuple[Optional[str], str]]:
+        """Split a user message into per-speaker segments."""
+        segments: list[tuple[Optional[str], str]] = []
+        current_speaker: Optional[str] = None
+        current_lines: list[str] = []
+
+        for raw_line in raw_content.splitlines():
+            speaker_name, content_body = parse_speaker_content(raw_line)
+            if speaker_name is not None:
+                if current_lines:
+                    segments.append((current_speaker, "\n".join(current_lines)))
+                current_speaker = speaker_name
+                current_lines = [content_body]
+                continue
+
+            current_lines.append(raw_line)
+
+        if current_lines:
+            segments.append((current_speaker, "\n".join(current_lines)))
+
+        return segments
+
+    def _format_chat_history(self, messages: list[SessionMessage]) -> str:
+        """Format the visible chat history used by the replyer."""
+        bot_nickname = global_config.bot.nickname.strip() or "Bot"
+        parts: list[str] = []
+
+        for message in messages:
+            role = get_message_role(message)
+            timestamp = self._format_message_time(message)
+
+            if role == "user":
+                guided_reply = self._extract_guided_bot_reply(message)
+                if guided_reply:
+                    parts.append(f"{timestamp} {bot_nickname}(you): {guided_reply}")
+                    continue
+
+                raw_content = get_message_text(message)
+                for speaker_name, content_body in self._split_user_message_segments(raw_content):
+                    content = self._normalize_content(content_body)
+                    if not content:
+                        continue
+                    visible_speaker = speaker_name or global_config.maisaka.user_name.strip() or "User"
+                    parts.append(f"{timestamp} {visible_speaker}: {content}")
+                continue
+
+            if role == "assistant":
+                visible_reply = self._extract_visible_assistant_reply(message)
+                if visible_reply:
+                    parts.append(f"{timestamp} {bot_nickname}(you): {visible_reply}")
+
+        return "\n".join(parts)
+
+    def _build_prompt(self, chat_history: List[SessionMessage], reply_reason: str) -> str:
+        """Build the Maisaka replyer prompt."""
+        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        formatted_history = self._format_chat_history(chat_history)
+
+        try:
+            system_prompt = load_prompt(
+                "maidairy_replyer",
+                bot_name=global_config.bot.nickname,
+                identity=self._personality_prompt,
+                reply_style=global_config.personality.reply_style,
+            )
+        except Exception:
+            system_prompt = "你是一个友好的 AI 助手,请根据用户的想法生成自然的回复。"
+
+        user_prompt = (
+            f"当前时间:{current_time}\n\n"
+            f"【聊天记录】\n{formatted_history}\n\n"
+            f"【你的想法】\n{reply_reason}\n\n"
+            "现在,你说:"
+        )
+        return f"System: {system_prompt}\n\nUser: {user_prompt}"
+
+    async def generate_reply_with_context(
+        self,
+        extra_info: str = "",
+        reply_reason: str = "",
+        available_actions: Optional[Dict[str, ActionInfo]] = None,
+        chosen_actions: Optional[List[object]] = None,
+        enable_tool: bool = True,
+        from_plugin: bool = True,
+        stream_id: Optional[str] = None,
+        reply_message: Optional[SessionMessage] = None,
+        reply_time_point: Optional[float] = None,
+        think_level: int = 1,
+        unknown_words: Optional[List[str]] = None,
+        log_reply: bool = True,
+        chat_history: Optional[List[SessionMessage]] = None,
+    ) -> Tuple[bool, ReplyGenerationResult]:
+        """Generate Maisaka's final visible reply from the given context."""
+        del available_actions
+        del chosen_actions
+        del enable_tool
+        del extra_info
+        del from_plugin
+        del log_reply
+        del reply_time_point
+        del think_level
+        del unknown_words
+
+        result = ReplyGenerationResult()
+        if not reply_reason or chat_history is None:
+            result.error_message = "reply_reason or chat_history is empty"
+            return False, result
+
+        logger.info(
+            f"Maisaka replyer start: stream_id={stream_id} reply_reason={reply_reason!r} "
+            f"history_size={len(chat_history)} target_message_id="
+            f"{reply_message.message_id if reply_message else None}"
+        )
+
+        filtered_history = [
+            message
+            for message in chat_history
+            if get_message_role(message) != "system" and get_message_kind(message) != "perception"
+        ]
+        prompt = self._build_prompt(filtered_history, reply_reason)
+        result.completion.request_prompt = prompt
+
+        if global_config.debug.show_replyer_prompt:
+            logger.info(f"\nMaisaka replyer prompt:\n{prompt}\n")
+
+        started_at = time.perf_counter()
+        try:
+            generation_result = await self.express_model.generate_response(prompt)
+        except Exception as exc:
+            logger.exception("Maisaka replyer call failed")
+            result.error_message = str(exc)
+            result.metrics = GenerationMetrics(
+                overall_ms=round((time.perf_counter() - started_at) * 1000, 2),
+            )
+            return False, result
+
+        response_text = (generation_result.response or "").strip()
+        result.success = bool(response_text)
+        result.completion = LLMCompletionResult(
+            request_prompt=prompt,
+            response_text=response_text,
+            reasoning_text=generation_result.reasoning or "",
+            model_name=generation_result.model_name or "",
+            tool_calls=generation_result.tool_calls or [],
+        )
+        result.metrics = GenerationMetrics(
+            overall_ms=round((time.perf_counter() - started_at) * 1000, 2),
+        )
+
+        if global_config.debug.show_replyer_reasoning and result.completion.reasoning_text:
+            logger.info(f"Maisaka replyer reasoning:\n{result.completion.reasoning_text}")
+
+        if not result.success:
+            result.error_message = "replyer returned empty content"
+            logger.warning("Maisaka replyer returned empty content")
+            return False, result
+
+        logger.info(
+            f"Maisaka replyer success: response_text={response_text!r} "
+            f"overall_ms={result.metrics.overall_ms}"
+        )
+        result.text_fragments = [response_text]
+        return True, result
diff --git a/src/chat/replyer/replyer_manager.py b/src/chat/replyer/replyer_manager.py
index eb430585..6ba9ce02 100644
--- a/src/chat/replyer/replyer_manager.py
+++ b/src/chat/replyer/replyer_manager.py
@@ -1,65 +1,82 @@
-from typing import Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional
 
-from src.common.logger import get_logger
 from src.chat.message_receive.chat_manager import BotChatSession, chat_manager as _chat_manager
-from src.chat.replyer.group_generator import DefaultReplyer
-from src.chat.replyer.private_generator import PrivateReplyer
+from src.common.logger import get_logger
+
+if TYPE_CHECKING:
+    from src.chat.replyer.group_generator import DefaultReplyer
+    from src.chat.replyer.maisaka_generator import MaisakaReplyGenerator
+    from src.chat.replyer.private_generator import PrivateReplyer
 
 logger = get_logger("ReplyerManager")
 
 
 class ReplyerManager:
-    def __init__(self):
-        self._repliers: Dict[str, DefaultReplyer | PrivateReplyer] = {}
+    """Manages the different types of reply generators in one place."""
+
+    def __init__(self) -> None:
+        self._repliers: Dict[str, Any] = {}
 
     def get_replyer(
         self,
         chat_stream: Optional[BotChatSession] = None,
         chat_id: Optional[str] = None,
         request_type: str = "replyer",
-    ) -> Optional[DefaultReplyer | PrivateReplyer]:
-        """
-        Get or create a replyer instance.
-
-        model_configs only takes effect the first time an instance is created
-        for a given chat_id/stream_id. Subsequent calls return the cached
-        instance and ignore the model_configs argument.
-        """
+        replyer_type: str = "default",
+    ) -> Optional["DefaultReplyer | MaisakaReplyGenerator | PrivateReplyer"]:
+        """Fetch an instance by session and replyer type."""
        stream_id = chat_stream.session_id if chat_stream else chat_id
        if not stream_id:
-            logger.warning("[ReplyerManager] Missing stream_id; cannot get a replyer.")
+            logger.warning("[ReplyerManager] Missing stream_id; cannot get replyer")
             return None
 
-        # Return the cached instance directly if one exists
-        if stream_id in self._repliers:
-            logger.debug(f"[ReplyerManager] Returning existing replyer instance for stream_id '{stream_id}'.")
-            return self._repliers[stream_id]
+        cache_key = f"{replyer_type}:{stream_id}"
+        if cache_key in self._repliers:
+            logger.info(f"[ReplyerManager] Replyer cache hit: cache_key={cache_key}")
+            return self._repliers[cache_key]
 
-        # No cached instance; create a new one (first-time initialization)
-        logger.debug(f"[ReplyerManager] Creating and caching a new replyer instance for stream_id '{stream_id}'.")
-
-        target_stream = chat_stream
+        target_stream = chat_stream or _chat_manager.get_session_by_session_id(stream_id)
         if not target_stream:
-            target_stream = _chat_manager.get_session_by_session_id(stream_id)
-
-        if not target_stream:
-            logger.warning(f"[ReplyerManager] No chat stream found for stream_id='{stream_id}'; cannot create a replyer.")
+            logger.warning(f"[ReplyerManager] Session not found: stream_id={stream_id}")
             return None
 
-        # model_configs only takes effect here (at initialization)
-        if target_stream.is_group_session:
-            replyer = DefaultReplyer(
-                chat_stream=target_stream,
-                request_type=request_type,
-            )
-        else:
-            replyer = PrivateReplyer(
-                chat_stream=target_stream,
-                request_type=request_type,
-            )
+        logger.info(
+            f"[ReplyerManager] Creating replyer: cache_key={cache_key}, "
+            f"replyer_type={replyer_type}, is_group_session={target_stream.is_group_session}"
+        )
 
-        self._repliers[stream_id] = replyer
+        try:
+            if replyer_type == "maisaka":
+                logger.info("[ReplyerManager] importing MaisakaReplyGenerator")
+                from src.chat.replyer.maisaka_generator import MaisakaReplyGenerator
+
+                replyer = MaisakaReplyGenerator(
+                    chat_stream=target_stream,
+                    request_type=request_type,
+                )
+            elif target_stream.is_group_session:
+                logger.info("[ReplyerManager] importing DefaultReplyer")
+                from src.chat.replyer.group_generator import DefaultReplyer
+
+                replyer = DefaultReplyer(
+                    chat_stream=target_stream,
+                    request_type=request_type,
+                )
+            else:
+                logger.info("[ReplyerManager] importing PrivateReplyer")
+                from src.chat.replyer.private_generator import PrivateReplyer
+
+                replyer = PrivateReplyer(
+                    chat_stream=target_stream,
+                    request_type=request_type,
+                )
+        except Exception:
+            logger.exception(f"[ReplyerManager] Failed to create replyer: cache_key={cache_key}")
+            raise
+
+        self._repliers[cache_key] = replyer
+        logger.info(f"[ReplyerManager] Replyer created: cache_key={cache_key}")
         return replyer
 
 
-# Create a global singleton instance
 replyer_manager = ReplyerManager()
diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py
new file mode 100644
index 00000000..edaa61ae
--- /dev/null
+++ b/src/maisaka/chat_loop_service.py
@@ -0,0 +1,389 @@
+from dataclasses import dataclass
+from base64 import b64decode
+from datetime import datetime
+from io import BytesIO
+from time import perf_counter
+from typing import Any, Dict, List, Optional
+
+import asyncio
+import random
+
+from PIL import Image as PILImage
+from rich.console import Group
+from rich.panel import Panel
+from rich.pretty import Pretty
+from rich.text import Text
+
+from src.chat.message_receive.message import SessionMessage
+from
src.common.data_models.llm_service_data_models import LLMGenerationOptions +from src.common.logger import get_logger +from src.common.prompt_i18n import load_prompt +from src.config.config import global_config +from src.llm_models.model_client.base_client import BaseClient +from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType +from src.llm_models.payload_content.tool_option import ToolCall, ToolDefinitionInput, ToolOption, normalize_tool_options +from src.services.llm_service import LLMServiceClient + +from .builtin_tools import get_builtin_tools +from .console import console +from .knowledge import extract_category_ids_from_result +from .message_adapter import ( + build_message, + format_speaker_content, + to_llm_message, +) + + +@dataclass(slots=True) +class ChatResponse: + """LLM 对话循环单步响应。""" + + content: Optional[str] + tool_calls: List[ToolCall] + raw_message: SessionMessage + + +logger = get_logger("maisaka_chat_loop") + + +class MaisakaChatLoopService: + """负责 Maisaka 主对话循环、系统提示词和终端渲染。""" + + def __init__( + self, + chat_system_prompt: Optional[str] = None, + temperature: float = 0.5, + max_tokens: int = 2048, + ) -> None: + self._temperature = temperature + self._max_tokens = max_tokens + self._extra_tools: List[ToolOption] = [] + self._prompts_loaded = False + self._prompt_load_lock = asyncio.Lock() + self._personality_prompt = self._build_personality_prompt() + if chat_system_prompt is None: + self._chat_system_prompt = f"{self._personality_prompt}\n\nYou are a helpful AI assistant." + else: + self._chat_system_prompt = chat_system_prompt + self._llm_chat = LLMServiceClient(task_name="planner", request_type="maisaka_planner") + + @property + def personality_prompt(self) -> str: + return self._personality_prompt + + def _build_personality_prompt(self) -> str: + try: + bot_name = global_config.bot.nickname + if global_config.bot.alias_names: + bot_nickname = f", also known as {','.join(global_config.bot.alias_names)}" + else: + bot_nickname = "" + + prompt_personality = global_config.personality.personality + if ( + hasattr(global_config.personality, "states") + and global_config.personality.states + and hasattr(global_config.personality, "state_probability") + and global_config.personality.state_probability > 0 + and random.random() < global_config.personality.state_probability + ): + prompt_personality = random.choice(global_config.personality.states) + + return f"Your name is {bot_name}{bot_nickname}; persona: {prompt_personality};" + except Exception: + return "Your name is MaiMai; persona: lively and cute AI assistant." + + async def ensure_chat_prompt_loaded(self, tools_section: str = "") -> None: + if self._prompts_loaded: + return + + async with self._prompt_load_lock: + if self._prompts_loaded: + return + + try: + self._chat_system_prompt = load_prompt( + "maidairy_chat", + file_tools_section=tools_section, + bot_name=global_config.bot.nickname, + identity=self._personality_prompt, + ) + except Exception: + self._chat_system_prompt = f"{self._personality_prompt}\n\nYou are a helpful AI assistant." 
+ + self._prompts_loaded = True + + def set_extra_tools(self, tools: List[ToolDefinitionInput]) -> None: + self._extra_tools = normalize_tool_options(tools) or [] + + async def analyze_knowledge_need( + self, + chat_history: List[SessionMessage], + categories_summary: str, + ) -> List[str]: + """分析当前对话是否需要检索知识库分类。""" + visible_history: List[str] = [] + for message in chat_history[-8:]: + if not message.content: + continue + role = getattr(message, "role", "") + visible_history.append(f"{role}: {message.content}") + + if not visible_history or not categories_summary.strip(): + return [] + + prompt = ( + "你需要判断当前对话是否需要查询知识库。\n" + "请只返回最相关的分类编号,多个编号用空格分隔;如果完全不需要,返回 none。\n\n" + f"【可用分类】\n{categories_summary}\n\n" + f"【最近对话】\n{chr(10).join(visible_history)}" + ) + + try: + generation_result = await self._llm_chat.generate_response( + prompt=prompt, + options=LLMGenerationOptions( + temperature=0.1, + max_tokens=64, + ), + ) + except Exception: + return [] + + return extract_category_ids_from_result(generation_result.response or "") + + @staticmethod + def _get_role_badge_style(role: str) -> str: + if role == "system": + return "bold white on blue" + if role == "user": + return "bold black on green" + if role == "assistant": + return "bold black on yellow" + if role == "tool": + return "bold white on magenta" + return "bold white on bright_black" + + @staticmethod + def _build_terminal_image_preview(image_base64: str) -> Optional[str]: + ascii_chars = " .:-=+*#%@" + + try: + image_bytes = b64decode(image_base64) + with PILImage.open(BytesIO(image_bytes)) as image: + grayscale = image.convert("L") + width, height = grayscale.size + if width <= 0 or height <= 0: + return None + + preview_width = max(8, int(global_config.maisaka.terminal_image_preview_width)) + preview_height = max(1, int(height * (preview_width / width) * 0.5)) + resized = grayscale.resize((preview_width, preview_height)) + pixels = list(resized.getdata()) + except Exception: + return None + + rows: List[str] = [] + for row_index in range(preview_height): + row_pixels = pixels[row_index * preview_width : (row_index + 1) * preview_width] + row = "".join(ascii_chars[min(len(ascii_chars) - 1, pixel * len(ascii_chars) // 256)] for pixel in row_pixels) + rows.append(row) + + return "\n".join(rows) + + @classmethod + def _render_message_content(cls, content: Any) -> object: + if isinstance(content, str): + return Text(content) + + if isinstance(content, list): + parts: List[object] = [] + for item in content: + if isinstance(item, str): + parts.append(Text(item)) + continue + if isinstance(item, tuple) and len(item) == 2: + image_format, image_base64 = item + if isinstance(image_format, str) and isinstance(image_base64, str): + approx_size = max(0, len(image_base64) * 3 // 4) + size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B" + preview_parts: List[object] = [ + Text(f"image/{image_format} {size_text}\nbase64 omitted", style="magenta") + ] + if global_config.maisaka.terminal_image_preview: + preview_text = cls._build_terminal_image_preview(image_base64) + if preview_text: + preview_parts.append(Text(preview_text, style="white")) + parts.append( + Panel( + Group(*preview_parts), + border_style="magenta", + padding=(0, 1), + ) + ) + continue + if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): + parts.append(Text(item["text"])) + else: + parts.append(Pretty(item, expand_all=True)) + return Group(*parts) if parts else Text("") + + if content is None: + 
return Text("") + + return Pretty(content, expand_all=True) + + @staticmethod + def _format_tool_call_for_display(tool_call: Any) -> Dict[str, Any]: + if isinstance(tool_call, dict): + function_info = tool_call.get("function", {}) + return { + "id": tool_call.get("id"), + "name": function_info.get("name", tool_call.get("name")), + "arguments": function_info.get("arguments", tool_call.get("arguments")), + } + + return { + "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)), + "name": getattr(tool_call, "func_name", getattr(tool_call, "name", None)), + "arguments": getattr(tool_call, "args", getattr(tool_call, "arguments", None)), + } + + def _render_tool_call_panel(self, tool_call: Any, index: int, parent_index: int) -> Panel: + title = Text.assemble( + Text(" TOOL CALL ", style="bold white on magenta"), + Text(f" #{parent_index}.{index}", style="muted"), + ) + return Panel( + Pretty(self._format_tool_call_for_display(tool_call), expand_all=True), + title=title, + border_style="magenta", + padding=(0, 1), + ) + + def _render_message_panel(self, message: Any, index: int) -> Panel: + if isinstance(message, dict): + raw_role = message.get("role", "unknown") + content = message.get("content") + tool_call_id = message.get("tool_call_id") + else: + raw_role = getattr(message, "role", "unknown") + content = getattr(message, "content", None) + tool_call_id = getattr(message, "tool_call_id", None) + + role = raw_role.value if hasattr(raw_role, "value") else str(raw_role) + title = Text.assemble( + Text(f" {role.upper()} ", style=self._get_role_badge_style(role)), + Text(f" #{index}", style="muted"), + ) + + parts: List[object] = [] + if content not in (None, "", []): + parts.append(Text(" message ", style="bold cyan")) + parts.append(self._render_message_content(content)) + + if tool_call_id: + parts.append( + Text.assemble( + Text(" tool_call_id ", style="bold magenta"), + Text(" "), + Text(str(tool_call_id), style="magenta"), + ) + ) + + if not parts: + parts.append(Text("[empty message]", style="muted")) + + return Panel( + Group(*parts), + title=title, + border_style="dim", + padding=(0, 1), + ) + + async def chat_loop_step(self, chat_history: List[SessionMessage]) -> ChatResponse: + await self.ensure_chat_prompt_loaded() + + def message_factory(_client: BaseClient) -> List[Message]: + messages: List[Message] = [] + system_msg = MessageBuilder().set_role(RoleType.System) + system_msg.add_text_content(self._chat_system_prompt) + messages.append(system_msg.build()) + + for msg in chat_history: + llm_message = to_llm_message(msg) + if llm_message is not None: + messages.append(llm_message) + + return messages + + all_tools = [*get_builtin_tools(), *self._extra_tools] + built_messages = message_factory(None) + + ordered_panels: List[Panel] = [] + for index, msg in enumerate(built_messages, start=1): + ordered_panels.append(self._render_message_panel(msg, index)) + tool_calls = getattr(msg, "tool_calls", None) + if tool_calls: + for tool_call_index, tool_call in enumerate(tool_calls, start=1): + ordered_panels.append(self._render_tool_call_panel(tool_call, tool_call_index, index)) + + if global_config.maisaka.show_thinking and ordered_panels: + console.print( + Panel( + Group(*ordered_panels), + title="MaiSaka LLM Request - chat_loop_step", + border_style="cyan", + padding=(0, 1), + ) + ) + + request_started_at = perf_counter() + generation_result = await self._llm_chat.generate_response_with_messages( + message_factory=message_factory, + options=LLMGenerationOptions( + 
tool_options=all_tools if all_tools else None, + temperature=self._temperature, + max_tokens=self._max_tokens, + ), + ) + _ = perf_counter() - request_started_at + + tool_call_summaries = [ + { + "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)), + "name": getattr(tool_call, "func_name", getattr(tool_call, "name", None)), + "args": getattr(tool_call, "args", getattr(tool_call, "arguments", None)), + } + for tool_call in (generation_result.tool_calls or []) + ] + logger.info( + f"Maisaka planner returned content={generation_result.response or ''!r} " + f"tool_calls={tool_call_summaries}" + ) + + raw_message = build_message( + role=RoleType.Assistant.value, + content=generation_result.response or "", + source="assistant", + tool_calls=generation_result.tool_calls or None, + ) + return ChatResponse( + content=generation_result.response, + tool_calls=generation_result.tool_calls or [], + raw_message=raw_message, + ) + + @staticmethod + def build_chat_context(user_text: str) -> List[SessionMessage]: + return [ + build_message( + role=RoleType.User.value, + content=format_speaker_content( + global_config.maisaka.user_name.strip() or "用户", + user_text, + datetime.now(), + ), + source="user", + ) + ] diff --git a/src/maisaka/cli.py b/src/maisaka/cli.py index 790265da..8c84b997 100644 --- a/src/maisaka/cli.py +++ b/src/maisaka/cli.py @@ -14,13 +14,14 @@ from rich.panel import Panel from rich.text import Text from src.chat.message_receive.message import SessionMessage -from src.config.config import global_config +from src.chat.replyer.maisaka_generator import MaisakaReplyGenerator +from src.config.config import config_manager, global_config +from .chat_loop_service import MaisakaChatLoopService from .console import console from .input_reader import InputReader from .knowledge import retrieve_relevant_knowledge from .knowledge_store import get_knowledge_store -from .llm_service import MaiSakaLLMService from .message_adapter import build_message, format_speaker_content, remove_last_perception from .mcp_client import MCPManager from .tool_handlers import ( @@ -33,10 +34,11 @@ from .tool_handlers import ( class BufferCLI: - """Command line interface for Maisaka.""" + """Maisaka 命令行交互入口。""" - def __init__(self): - self.llm_service: Optional[MaiSakaLLMService] = None + def __init__(self) -> None: + self._chat_loop_service: Optional[MaisakaChatLoopService] = None + self._reply_generator = MaisakaReplyGenerator() self._reader = InputReader() self._chat_history: Optional[list[SessionMessage]] = None self._knowledge_store = get_knowledge_store() @@ -55,32 +57,38 @@ class BufferCLI: self._init_llm() def _init_llm(self) -> None: - """从主项目配置初始化 LLM 服务。""" + """初始化 Maisaka 使用的聊天服务。""" thinking_env = os.getenv("ENABLE_THINKING", "").strip().lower() enable_thinking: Optional[bool] = True if thinking_env == "true" else False if thinking_env == "false" else None - self.llm_service = MaiSakaLLMService( - api_key="", - base_url=None, - model="", - enable_thinking=enable_thinking, - ) + _ = enable_thinking + self._chat_loop_service = MaisakaChatLoopService() - model_name = self.llm_service.get_current_model_name() + model_name = self._get_current_model_name() console.print(f"[success][OK] LLM service initialized[/success] [muted](model: {model_name})[/muted]") + @staticmethod + def _get_current_model_name() -> str: + """读取当前 planner 模型名。""" + try: + model_task_config = config_manager.get_model_config().model_task_config + if model_task_config.planner.model_list: + return 
model_task_config.planner.model_list[0] + except Exception: + pass + return "unconfigured" + def _build_tool_context(self) -> ToolHandlerContext: - """Build the shared tool handler context.""" - ctx = ToolHandlerContext( - llm_service=self.llm_service, + """构建工具处理的共享上下文。""" + tool_context = ToolHandlerContext( reader=self._reader, user_input_times=self._user_input_times, ) - ctx.last_user_input_time = self._last_user_input_time - return ctx + tool_context.last_user_input_time = self._last_user_input_time + return tool_context - def _show_banner(self): - """Render the startup banner.""" + def _show_banner(self) -> None: + """渲染启动横幅。""" banner = Text() banner.append("MaiSaka", style="bold cyan") banner.append(" v2.0\n", style="muted") @@ -89,9 +97,9 @@ class BufferCLI: console.print(Panel(banner, box=box.DOUBLE_EDGE, border_style="cyan", padding=(1, 2))) console.print() - async def _start_chat(self, user_text: str): - """Append user input and continue the inner loop.""" - if not self.llm_service: + async def _start_chat(self, user_text: str) -> None: + """追加用户输入并继续内部循环。""" + if self._chat_loop_service is None: console.print("[warning]LLM service is not initialized; skipping chat.[/warning]") return @@ -102,13 +110,13 @@ class BufferCLI: if self._chat_history is None: self._chat_start_time = now self._last_assistant_response_time = None - self._chat_history = self.llm_service.build_chat_context(user_text) + self._chat_history = self._chat_loop_service.build_chat_context(user_text) else: self._chat_history.append( build_message( role="user", content=format_speaker_content( - global_config.maisaka.user_name.strip() or "用户", + global_config.maisaka.user_name.strip() or "User", user_text, now, ), @@ -117,7 +125,7 @@ class BufferCLI: await self._run_llm_loop(self._chat_history) - async def _run_llm_loop(self, chat_history: list[SessionMessage]): + async def _run_llm_loop(self, chat_history: list[SessionMessage]) -> None: """ Main inner loop for the Maisaka planner. @@ -126,12 +134,10 @@ class BufferCLI: - no_reply(): skip visible output and continue the loop - wait(seconds): wait for new user input - stop(): stop the current inner loop and return to idle - - Per round: - 1. Run enabled perception modules in parallel when the previous round used tools. - 2. Call the planner model with the current history. - 3. Append the assistant thought and execute any requested tools. 
""" + if self._chat_loop_service is None: + return + consecutive_errors = 0 last_had_tool_calls = True @@ -141,7 +147,7 @@ class BufferCLI: status_text_parts = [] if global_config.maisaka.enable_knowledge_module: - tasks.append(("knowledge", retrieve_relevant_knowledge(self.llm_service, chat_history))) + tasks.append(("knowledge", retrieve_relevant_knowledge(self._chat_loop_service, chat_history))) status_text_parts.append("knowledge") with console.status( @@ -183,13 +189,12 @@ class BufferCLI: source="assistant", ) ) - else: - if global_config.maisaka.show_thinking: - console.print("[muted]Skipping module analysis because the last round used no tools.[/muted]") + elif global_config.maisaka.show_thinking: + console.print("[muted]Skipping module analysis because the last round used no tools.[/muted]") with console.status("[info]AI is thinking...[/info]", spinner="dots"): try: - response = await self.llm_service.chat_loop_step(chat_history) + response = await self._chat_loop_service.chat_loop_step(chat_history) consecutive_errors = 0 except Exception as exc: consecutive_errors += 1 @@ -217,83 +222,83 @@ class BufferCLI: last_had_tool_calls = False continue - if response.tool_calls: - should_stop = False - ctx = self._build_tool_context() - - for tc in response.tool_calls: - if tc.func_name == "stop": - await handle_stop(tc, chat_history) - should_stop = True - - elif tc.func_name == "reply": - reply = await self._generate_visible_reply(chat_history, response.content) - chat_history.append( - build_message( - role="tool", - content="Visible reply generated and recorded.", - source="tool", - tool_call_id=tc.call_id, - ) - ) - chat_history.append( - build_message( - role="user", - content=format_speaker_content( - global_config.bot.nickname.strip() or "MaiSaka", - reply, - datetime.now(), - ), - source="guided_reply", - ) - ) - - elif tc.func_name == "no_reply": - if global_config.maisaka.show_thinking: - console.print("[muted]No visible reply this round.[/muted]") - chat_history.append( - build_message( - role="tool", - content="No visible reply was sent for this round.", - source="tool", - tool_call_id=tc.call_id, - ) - ) - - elif tc.func_name == "wait": - tool_result = await handle_wait(tc, chat_history, ctx) - if ctx.last_user_input_time != self._last_user_input_time: - self._last_user_input_time = ctx.last_user_input_time - if tool_result.startswith("[[QUIT]]"): - should_stop = True - - elif self._mcp_manager and self._mcp_manager.is_mcp_tool(tc.func_name): - await handle_mcp_tool(tc, chat_history, self._mcp_manager) - - else: - await handle_unknown_tool(tc, chat_history) - - if should_stop: - console.print("[muted]Conversation paused. 
Waiting for new input...[/muted]\n") - break - - last_had_tool_calls = True - else: + if not response.tool_calls: last_had_tool_calls = False continue - async def _init_mcp(self): - """Initialize MCP servers and register exposed tools.""" + should_stop = False + tool_context = self._build_tool_context() + + for tool_call in response.tool_calls: + if tool_call.func_name == "stop": + await handle_stop(tool_call, chat_history) + should_stop = True + + elif tool_call.func_name == "reply": + reply = await self._generate_visible_reply(chat_history, response.content) + chat_history.append( + build_message( + role="tool", + content="Visible reply generated and recorded.", + source="tool", + tool_call_id=tool_call.call_id, + ) + ) + chat_history.append( + build_message( + role="user", + content=format_speaker_content( + global_config.bot.nickname.strip() or "MaiSaka", + reply, + datetime.now(), + ), + source="guided_reply", + ) + ) + + elif tool_call.func_name == "no_reply": + if global_config.maisaka.show_thinking: + console.print("[muted]No visible reply this round.[/muted]") + chat_history.append( + build_message( + role="tool", + content="No visible reply was sent for this round.", + source="tool", + tool_call_id=tool_call.call_id, + ) + ) + + elif tool_call.func_name == "wait": + tool_result = await handle_wait(tool_call, chat_history, tool_context) + if tool_context.last_user_input_time != self._last_user_input_time: + self._last_user_input_time = tool_context.last_user_input_time + if tool_result.startswith("[[QUIT]]"): + should_stop = True + + elif self._mcp_manager and self._mcp_manager.is_mcp_tool(tool_call.func_name): + await handle_mcp_tool(tool_call, chat_history, self._mcp_manager) + + else: + await handle_unknown_tool(tool_call, chat_history) + + if should_stop: + console.print("[muted]Conversation paused. Waiting for new input...[/muted]\n") + break + + last_had_tool_calls = True + + async def _init_mcp(self) -> None: + """初始化 MCP 服务并注册暴露的工具。""" config_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "mcp_config.json", ) self._mcp_manager = await MCPManager.from_config(config_path) - if self._mcp_manager and self.llm_service: + if self._mcp_manager and self._chat_loop_service: mcp_tools = self._mcp_manager.get_openai_tools() if mcp_tools: - self.llm_service.set_extra_tools(mcp_tools) + self._chat_loop_service.set_extra_tools(mcp_tools) summary = self._mcp_manager.get_tool_summary() console.print( Panel( @@ -305,12 +310,19 @@ class BufferCLI: ) async def _generate_visible_reply(self, chat_history: list[SessionMessage], latest_thought: str) -> str: - """Generate and emit a visible reply based on the latest thought.""" - if not self.llm_service or not latest_thought: + """根据最新思考生成并输出可见回复。""" + if not latest_thought: return "" with console.status("[info]Generating visible reply...[/info]", spinner="dots"): - reply = await self.llm_service.generate_reply(latest_thought, chat_history) + success, result = await self._reply_generator.generate_reply_with_context( + reply_reason=latest_thought, + chat_history=chat_history, + ) + if success and result.text_fragments: + reply = result.text_fragments[0] + else: + reply = "..." 
console.print( Panel( @@ -323,8 +335,8 @@ class BufferCLI: return reply - async def run(self): - """Main interactive loop.""" + async def run(self) -> None: + """主交互循环。""" if global_config.maisaka.enable_mcp: await self._init_mcp() else: diff --git a/src/maisaka/knowledge.py b/src/maisaka/knowledge.py index b6fc5773..bb8d340b 100644 --- a/src/maisaka/knowledge.py +++ b/src/maisaka/knowledge.py @@ -42,7 +42,7 @@ def extract_category_ids_from_result(result: str) -> List[str]: async def retrieve_relevant_knowledge( - llm_service, + knowledge_analyzer, chat_history: List[SessionMessage], ) -> str: """Retrieve formatted knowledge snippets relevant to the current chat history.""" @@ -50,7 +50,7 @@ async def retrieve_relevant_knowledge( categories_summary = store.get_categories_summary() try: - category_ids = await llm_service.analyze_knowledge_need(chat_history, categories_summary) + category_ids = await knowledge_analyzer.analyze_knowledge_need(chat_history, categories_summary) if not category_ids: return "" return store.get_formatted_knowledge(category_ids) diff --git a/src/maisaka/llm_service.py b/src/maisaka/llm_service.py deleted file mode 100644 index 78d61a50..00000000 --- a/src/maisaka/llm_service.py +++ /dev/null @@ -1,660 +0,0 @@ -"""MaiSaka LLM 服务。 - -该模块基于主项目服务层封装 MaiSaka 所需的对话与工具调用接口。 -""" - -from base64 import b64decode -from dataclasses import dataclass -from datetime import datetime -from io import BytesIO -from time import perf_counter -from typing import Any, Dict, List, Optional - -import asyncio -import random - -from PIL import Image as PILImage -from rich.console import Group -from rich.panel import Panel -from rich.pretty import Pretty -from rich.text import Text - -from src.chat.message_receive.message import SessionMessage -from src.common.data_models.llm_service_data_models import LLMGenerationOptions -from src.common.logger import get_logger -from src.common.prompt_i18n import load_prompt -from src.config.config import config_manager, global_config -from src.llm_models.model_client.base_client import BaseClient -from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType -from src.llm_models.payload_content.tool_option import ( - ToolCall, - ToolDefinitionInput, - ToolOption, - normalize_tool_options, -) -from src.services.llm_service import LLMServiceClient - -from .builtin_tools import get_builtin_tools -from .console import console -from .message_adapter import ( - build_message, - format_speaker_content, - get_message_kind, - get_message_role, - get_message_text, - get_tool_calls, - to_llm_message, -) - -logger = get_logger("maisaka_llm") - -@dataclass(slots=True) -class ChatResponse: - """LLM 对话循环单步响应。""" - - content: Optional[str] - tool_calls: List[ToolCall] - raw_message: SessionMessage - - -class MaiSakaLLMService: - """MaiSaka LLM 服务 - 适配主项目 LLM 系统""" - - def __init__( - self, - api_key: Optional[str] = None, - base_url: Optional[str] = None, - model: Optional[str] = None, - chat_system_prompt: Optional[str] = None, - temperature: float = 0.5, - max_tokens: int = 2048, - enable_thinking: Optional[bool] = None, - ) -> None: - """初始化 MaiSaka LLM 服务。 - - Args: - api_key: 兼容旧接口保留的参数,当前不使用。 - base_url: 兼容旧接口保留的参数,当前不使用。 - model: 兼容旧接口保留的参数,当前不使用。 - chat_system_prompt: 可选的系统提示词覆盖值。 - temperature: 默认温度参数。 - max_tokens: 默认最大输出 token 数。 - enable_thinking: 是否启用思考模式。 - """ - del api_key, base_url, model - self._temperature = temperature - self._max_tokens = max_tokens - self._enable_thinking = enable_thinking - self._extra_tools: 
List[ToolOption] = [] - self._prompts_loaded = False - self._prompt_load_lock = asyncio.Lock() - - # 初始化服务层 LLM 门面(按任务名实时解析配置,确保热重载生效) - self._llm_tool_use = LLMServiceClient(task_name="tool_use", request_type="maisaka_tool_use") - # 主对话也使用 planner 模型 - self._llm_planner = LLMServiceClient(task_name="planner", request_type="maisaka_planner") - self._llm_chat = self._llm_planner - self._llm_utils = self._llm_tool_use - # 回复生成使用 replyer 模型 - self._llm_replyer = LLMServiceClient(task_name="replyer", request_type="maisaka_replyer") - - - # 构建人设信息 - personality_prompt = self._build_personality_prompt() - self._personality_prompt = personality_prompt - - # 提示词在真正调用 LLM 前异步懒加载,避免在已有事件循环中嵌套 run_until_complete - if chat_system_prompt is None: - self._chat_system_prompt = f"{personality_prompt}\n\n你是一个友好的 AI 助手。" - else: - self._chat_system_prompt = chat_system_prompt - - def get_current_model_name(self) -> str: - """获取当前 Maisaka 对话主模型名称。 - - Returns: - str: 当前 planner 任务的首选模型名;未配置时返回 ``未配置``。 - """ - try: - model_task_config = config_manager.get_model_config().model_task_config - if model_task_config.planner.model_list: - return model_task_config.planner.model_list[0] - except Exception as exc: - logger.warning(f"获取当前 Maisaka 模型名称失败: {exc}") - return "未配置" - - - def _build_personality_prompt(self) -> str: - """构建当前人设提示词。 - - Returns: - str: 最终用于系统提示词的人设描述。 - """ - try: - bot_name = global_config.bot.nickname - if global_config.bot.alias_names: - bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" - else: - bot_nickname = "" - - # 获取基础personality - prompt_personality = global_config.personality.personality - - # 检查是否需要随机替换为状态(personality 本体) - if ( - hasattr(global_config.personality, "states") - and global_config.personality.states - and hasattr(global_config.personality, "state_probability") - and global_config.personality.state_probability > 0 - and random.random() < global_config.personality.state_probability - ): - # 随机选择一个状态替换personality - selected_state = random.choice(global_config.personality.states) - prompt_personality = selected_state - - prompt_personality = f"{prompt_personality};" - return f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}" - except Exception as e: - logger.warning(f"构建人设信息失败: {e}") - # 返回默认人设 - return "你的名字是麦麦,你是一个活泼可爱的AI助手。" - - def set_extra_tools(self, tools: List[ToolDefinitionInput]) -> None: - """设置额外工具定义。 - - Args: - tools: 外部传入的工具定义列表,例如 MCP 暴露的 OpenAI-compatible 工具。 - """ - self._extra_tools = normalize_tool_options(tools) or [] - logger.info(f"已为 Maisaka 加载 {len(self._extra_tools)} 个额外工具") - - async def _ensure_prompts_loaded(self) -> None: - """异步懒加载提示词。 - - Returns: - None: 该方法仅刷新内部提示词缓存。 - """ - if self._prompts_loaded: - return - - async with self._prompt_load_lock: - if self._prompts_loaded: - return - - try: - tools_section = "" - if False and global_config.maisaka.enable_write_file: - tools_section += "\n• write_file(filename, content) — 在 mai_files 目录下写入文件。" - if False and global_config.maisaka.enable_read_file: - tools_section += "\n• read_file(filename) — 读取 mai_files 目录下的文件内容。" - if False and global_config.maisaka.enable_list_files: - tools_section += "\n• list_files() — 获取 mai_files 目录下所有文件的元信息列表。" - self._chat_system_prompt = load_prompt( - "maidairy_chat", - file_tools_section=tools_section if tools_section else "", - bot_name=global_config.bot.nickname, - identity=self._personality_prompt, - ) - logger.info(f"系统提示词已渲染,长度: {len(self._chat_system_prompt)}") - except Exception as e: - logger.error(f"加载系统提示词失败: {e}") - 
self._chat_system_prompt = f"{self._personality_prompt}\n\n你是一个友好的 AI 助手。" - - self._prompts_loaded = True - - @staticmethod - def _get_role_badge_style(role: str) -> str: - """为不同角色返回终端标签样式。 - - Args: - role: 消息角色名称。 - - Returns: - str: Rich 可识别的样式字符串。 - """ - if role == "system": - return "bold white on blue" - if role == "user": - return "bold black on green" - if role == "assistant": - return "bold black on yellow" - if role == "tool": - return "bold white on magenta" - return "bold white on bright_black" - - @staticmethod - def _build_terminal_image_preview(image_base64: str) -> Optional[str]: - """构建终端 ASCII 图片预览。 - - Args: - image_base64: 图片的 Base64 数据。 - - Returns: - Optional[str]: 可渲染的 ASCII 预览文本;失败时返回 `None`。 - """ - ascii_chars = " .:-=+*#%@" - - try: - image_bytes = b64decode(image_base64) - with PILImage.open(BytesIO(image_bytes)) as image: - grayscale = image.convert("L") - width, height = grayscale.size - if width <= 0 or height <= 0: - return None - - preview_width = max(8, int(global_config.maisaka.terminal_image_preview_width)) - preview_height = max(1, int(height * (preview_width / width) * 0.5)) - resized = grayscale.resize((preview_width, preview_height)) - pixels = list(resized.getdata()) - except Exception: - return None - - rows: List[str] = [] - for row_index in range(preview_height): - row_pixels = pixels[row_index * preview_width : (row_index + 1) * preview_width] - row = "".join(ascii_chars[min(len(ascii_chars) - 1, pixel * len(ascii_chars) // 256)] for pixel in row_pixels) - rows.append(row) - - return "\n".join(rows) - - @staticmethod - def _render_message_content(content: Any) -> object: - """将消息内容转换为 Rich 可渲染对象。 - - Args: - content: 原始消息内容。 - - Returns: - object: Rich 可渲染对象。 - """ - if isinstance(content, str): - return Text(content) - - if isinstance(content, list): - parts: List[object] = [] - for item in content: - if isinstance(item, str): - parts.append(Text(item)) - continue - if isinstance(item, tuple) and len(item) == 2: - image_format, image_base64 = item - if isinstance(image_format, str) and isinstance(image_base64, str): - approx_size = max(0, len(image_base64) * 3 // 4) - size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B" - preview_parts: List[object] = [ - Text(f"image/{image_format} {size_text}\nbase64 omitted", style="magenta") - ] - if global_config.maisaka.terminal_image_preview: - preview_text = MaiSakaLLMService._build_terminal_image_preview(image_base64) - if preview_text: - preview_parts.append(Text(preview_text, style="white")) - parts.append( - Panel( - Group(*preview_parts), - border_style="magenta", - padding=(0, 1), - ) - ) - continue - if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): - parts.append(Text(item["text"])) - else: - parts.append(Pretty(item, expand_all=True)) - return Group(*parts) if parts else Text("") - - if content is None: - return Text("") - - return Pretty(content, expand_all=True) - - @staticmethod - def _format_tool_call_for_display(tool_call: Any) -> Dict[str, Any]: - """将工具调用转换为 CLI 展示结构。 - - Args: - tool_call: 原始工具调用对象或字典。 - - Returns: - Dict[str, Any]: 统一后的展示字典。 - """ - if isinstance(tool_call, dict): - function_info = tool_call.get("function", {}) - return { - "id": tool_call.get("id"), - "name": function_info.get("name", tool_call.get("name")), - "arguments": function_info.get("arguments", tool_call.get("arguments")), - } - - return { - "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)), - "name": 
getattr(tool_call, "func_name", getattr(tool_call, "name", None)), - "arguments": getattr(tool_call, "args", getattr(tool_call, "arguments", None)), - } - - def _render_tool_call_panel(self, tool_call: Any, index: int, parent_index: int) -> Panel: - """渲染单个工具调用面板。 - - Args: - tool_call: 原始工具调用对象或字典。 - index: 当前工具调用在父消息中的序号。 - parent_index: 父消息在消息列表中的序号。 - - Returns: - Panel: 可直接打印的工具调用面板。 - """ - title = Text.assemble( - Text(" TOOL CALL ", style="bold white on magenta"), - Text(f" #{parent_index}.{index}", style="muted"), - ) - return Panel( - Pretty(self._format_tool_call_for_display(tool_call), expand_all=True), - title=title, - border_style="magenta", - padding=(0, 1), - ) - - def _render_message_panel(self, message: Any, index: int) -> Panel: - """渲染主循环 Prompt 中的一条消息。 - - Args: - message: 原始消息对象或字典。 - index: 当前消息序号。 - - Returns: - Panel: 可直接打印的消息面板。 - """ - if isinstance(message, dict): - raw_role = message.get("role", "unknown") - content = message.get("content") - tool_call_id = message.get("tool_call_id") - else: - raw_role = getattr(message, "role", "unknown") - content = getattr(message, "content", None) - tool_call_id = getattr(message, "tool_call_id", None) - - role = raw_role.value if hasattr(raw_role, "value") else str(raw_role) - title = Text.assemble( - Text(f" {role.upper()} ", style=self._get_role_badge_style(role)), - Text(f" #{index}", style="muted"), - ) - - parts: List[object] = [] - if content not in (None, "", []): - parts.append(Text(" message ", style="bold cyan")) - parts.append(self._render_message_content(content)) - - if tool_call_id: - parts.append( - Text.assemble( - Text(" tool_call_id ", style="bold magenta"), - Text(" "), - Text(str(tool_call_id), style="magenta"), - ) - ) - - if not parts: - parts.append(Text("[empty message]", style="muted")) - - return Panel( - Group(*parts), - title=title, - border_style="dim", - padding=(0, 1), - ) - - async def chat_loop_step(self, chat_history: List[SessionMessage]) -> ChatResponse: - """执行主对话循环的一步。 - - Args: - chat_history: 当前对话历史。 - - Returns: - ChatResponse: 本轮对话生成结果。 - """ - await self._ensure_prompts_loaded() - - def message_factory(_client: BaseClient) -> List[Message]: - """将 MaiSaka 对话历史转换为内部消息列表。 - - Args: - _client: 当前底层客户端实例。 - - Returns: - List[Message]: 规范化后的消息列表。 - """ - messages: List[Message] = [] - - # 首先添加系统提示词 - system_msg = MessageBuilder().set_role(RoleType.System) - system_msg.add_text_content(self._chat_system_prompt) - messages.append(system_msg.build()) - - # 然后添加对话历史 - for msg in chat_history: - llm_message = to_llm_message(msg) - if llm_message is not None: - messages.append(llm_message) - - return messages - - # 调用 LLM(使用带消息的接口) - # 合并内置工具和额外工具,统一交给底层规范化流程处理。 - all_tools = [*get_builtin_tools(), *self._extra_tools] - - # 打印消息列表 - built_messages = message_factory(None) - - ordered_panels: List[Panel] = [] - for index, msg in enumerate(built_messages, start=1): - ordered_panels.append(self._render_message_panel(msg, index)) - tool_calls = getattr(msg, "tool_calls", None) - if tool_calls: - for tool_call_index, tool_call in enumerate(tool_calls, start=1): - ordered_panels.append(self._render_tool_call_panel(tool_call, tool_call_index, index)) - - if global_config.maisaka.show_thinking and ordered_panels: - console.print( - Panel( - Group(*ordered_panels), - title="MaiSaka LLM Request - chat_loop_step", - border_style="cyan", - padding=(0, 1), - ) - ) - logger.info(f"对话循环步骤的提示展示已完成(共 {len(built_messages)} 条消息,{len(all_tools)} 个工具)") - - - request_started_at = perf_counter() - 
logger.info("对话循环步骤正在调用规划模型生成响应") - generation_result = await self._llm_chat.generate_response_with_messages( - message_factory=message_factory, - options=LLMGenerationOptions( - tool_options=all_tools if all_tools else None, - temperature=self._temperature, - max_tokens=self._max_tokens, - ), - ) - response = generation_result.response - model = generation_result.model_name - tool_calls = generation_result.tool_calls - elapsed = perf_counter() - request_started_at - logger.info( - f"对话循环步骤中的规划模型已返回,耗时 {elapsed:.2f} 秒" - f"(模型={model},工具调用数={len(tool_calls or [])},回复长度={len(response or '')})" - ) - raw_message = build_message( - role=RoleType.Assistant.value, - content=response or "", - source="assistant", - tool_calls=tool_calls or None, - ) - logger.info("已将规划模型响应转换为 SessionMessage") - - return ChatResponse( - content=response, - tool_calls=tool_calls or [], - raw_message=raw_message, - ) - - def _filter_for_api(self, chat_history: List[SessionMessage]) -> str: - """将对话历史过滤为简单文本格式。 - - Args: - chat_history: 当前对话历史。 - - Returns: - str: 过滤后的文本上下文。 - """ - parts = [] - for msg in chat_history: - role = get_message_role(msg) - content = get_message_text(msg) - - # 跳过内部字段 - if get_message_kind(msg) == "perception" or role == RoleType.Tool.value: - continue - - if role == RoleType.System.value: - parts.append(f"System: {content}") - elif role == RoleType.User.value: - parts.append(f"User: {content}") - elif role == RoleType.Assistant.value: - # 处理工具调用 - tool_calls = get_tool_calls(msg) - if tool_calls: - tool_desc = ", ".join([tc.func_name for tc in tool_calls if tc.func_name]) - parts.append(f"Assistant (called tools: {tool_desc})") - else: - parts.append(f"Assistant: {content}") - - return "\n\n".join(parts) - - def build_chat_context(self, user_text: str) -> List[SessionMessage]: - """构建新的对话上下文。 - - Args: - user_text: 用户输入文本。 - - Returns: - List[SessionMessage]: 初始对话上下文消息列表。 - """ - return [ - build_message( - role=RoleType.User.value, - content=format_speaker_content( - global_config.maisaka.user_name.strip() or "用户", - user_text, - datetime.now(), - ), - source="user", - ) - ] - - async def _removed_analyze_timing(self, chat_history: List[SessionMessage], timing_info: str) -> str: - """执行时间节奏分析。 - - Args: - chat_history: 当前对话历史。 - timing_info: 外部传入的时间信息摘要。 - - Returns: - str: 时间分析文本。 - """ - await self._ensure_prompts_loaded() - filtered = [ - m - for m in chat_history - if get_message_kind(m) != "perception" and get_message_role(m) != RoleType.System.value - ] - - # 使用加载的系统提示词 - system_prompt = self._timing_prompt or "请分析以下对话的时间节奏和用户状态:" - - prompt_parts = [f"{system_prompt}\n\n【系统时间戳信息】\n{timing_info}\n\n【当前对话记录】\n"] - for msg in filtered: - role = get_message_role(msg) - content = get_message_text(msg) - if role == RoleType.User.value: - prompt_parts.append(f"{global_config.maisaka.user_name.strip() or '用户'}: {content}") - elif role == RoleType.Assistant.value: - prompt_parts.append(f"助手: {content}") - - prompt = "\n".join(prompt_parts) - - if False: - print("\n" + "=" * 60) - print("MaiSaka LLM Request - analyze_timing:") - print(f" {prompt}") - print("=" * 60 + "\n") - - try: - generation_result = await self._llm_utils.generate_response( - prompt=prompt, - options=LLMGenerationOptions(temperature=0.3, max_tokens=512), - ) - response = generation_result.response - - return response - except Exception as e: - logger.error(f"时间分析 LLM 调用出错: {e}") - return "" - - # ──────── 回复生成(使用 replyer 模型) ──────── - - async def generate_reply(self, reason: str, chat_history: List[SessionMessage]) -> 
str: - """生成最终回复文本。 - - Args: - reason: 当前轮次的内部想法或回复理由。 - chat_history: 当前对话历史。 - - Returns: - str: 最终回复文本。 - """ - await self._ensure_prompts_loaded() - from datetime import datetime - from .replyer import format_chat_history - - current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # 格式化对话历史 - filtered_history = [ - msg - for msg in chat_history - if get_message_role(msg) != RoleType.System.value and get_message_kind(msg) != "perception" - ] - formatted_history = format_chat_history(filtered_history) - - # 获取回复提示词 - try: - system_prompt = load_prompt( - "maidairy_replyer", - bot_name=global_config.bot.nickname, - identity=self._personality_prompt, - reply_style=global_config.personality.reply_style, - ) - except Exception: - system_prompt = "你是一个友好的 AI 助手,请根据用户的想法生成自然的回复。" - - user_prompt = ( - f"当前时间:{current_time}\n\n【聊天记录】\n{formatted_history}\n\n【你的想法】\n{reason}\n\n现在,你说:" - ) - - messages = f"System: {system_prompt}\n\nUser: {user_prompt}" - - if global_config.maisaka.show_thinking: - print("\n" + "=" * 60) - print("MaiSaka LLM Request - generate_reply:") - print(f" {messages}") - print("=" * 60 + "\n") - - try: - generation_result = await self._llm_replyer.generate_response( - prompt=messages, - options=LLMGenerationOptions(temperature=0.8, max_tokens=512), - ) - response = generation_result.response - return response.strip() if response else "..." - except Exception as e: - logger.error(f"回复生成 LLM 调用出错: {e}") - return "..." diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py index 43786afa..f3b38171 100644 --- a/src/maisaka/reasoning_engine.py +++ b/src/maisaka/reasoning_engine.py @@ -7,8 +7,10 @@ from typing import TYPE_CHECKING, Optional from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.message import SessionMessage +from src.chat.replyer.replyer_manager import replyer_manager from src.common.data_models.mai_message_data_model import UserInfo from src.common.data_models.message_component_data_model import MessageSequence +from src.common.logger import get_logger from src.config.config import global_config from src.llm_models.payload_content.tool_option import ToolCall from src.services import send_service @@ -28,6 +30,8 @@ from .tool_handlers import ( if TYPE_CHECKING: from .runtime import MaisakaHeartFlowChatting +logger = get_logger("maisaka_reasoning_engine") + class MaisakaReasoningEngine: """负责内部思考、推理与工具执行。""" @@ -54,7 +58,7 @@ class MaisakaReasoningEngine: self._runtime._log_cycle_started(cycle_detail, round_index) try: planner_started_at = time.time() - response = await self._runtime._llm_service.chat_loop_step(self._runtime._chat_history) + response = await self._runtime._chat_loop_service.chat_loop_step(self._runtime._chat_history) cycle_detail.time_records["planner"] = time.time() - planner_started_at response.raw_message.platform = anchor_message.platform @@ -87,6 +91,9 @@ class MaisakaReasoningEngine: except asyncio.CancelledError: self._runtime._log_internal_loop_cancelled() raise + except Exception: + logger.exception("%s Maisaka internal loop crashed", self._runtime.log_prefix) + raise async def _ingest_messages(self, messages: list[SessionMessage]) -> None: """处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。""" @@ -252,13 +259,92 @@ class MaisakaReasoningEngine: ) return False - reply_text = await self._runtime._llm_service.generate_reply(latest_thought, self._runtime._chat_history) - sent = await send_service.text_to_stream( - text=reply_text, - stream_id=self._runtime.session_id, - set_reply=True, - 
diff --git a/src/maisaka/replyer.py b/src/maisaka/replyer.py
deleted file mode 100644
index ef2b54f2..00000000
--- a/src/maisaka/replyer.py
+++ /dev/null
@@ -1,115 +0,0 @@
-"""
-MaiSaka reply helper.
-"""
-
-from typing import Optional
-
-from src.chat.message_receive.message import SessionMessage
-from src.config.config import global_config
-
-from .llm_service import MaiSakaLLMService
-from .message_adapter import get_message_role, get_message_text, is_perception_message, parse_speaker_content
-
-
-def _normalize_content(content: str, limit: int = 500) -> str:
-    normalized = " ".join((content or "").split())
-    if len(normalized) > limit:
-        return normalized[:limit] + "..."
-    return normalized
-
-
-def _format_message_time(message: SessionMessage) -> str:
-    return message.timestamp.strftime("%H:%M:%S")
-
-
-def _extract_visible_assistant_reply(message: SessionMessage) -> str:
-    if is_perception_message(message):
-        return ""
-    return ""
-
-
-def _extract_guided_bot_reply(message: SessionMessage) -> str:
-    speaker_name, body = parse_speaker_content(get_message_text(message).strip())
-    bot_nickname = global_config.bot.nickname.strip() or "Bot"
-    if speaker_name == bot_nickname:
-        return _normalize_content(body.strip())
-    return ""
-
-
-def _split_user_message_segments(raw_content: str) -> list[tuple[Optional[str], str]]:
-    """Split a user message into speaker-labeled segments.
-
-    A new segment only starts when a line explicitly begins with `[speaker]`.
-    Continuation lines remain part of the current speaker's message.
-    """
-    segments: list[tuple[Optional[str], str]] = []
-    current_speaker: Optional[str] = None
-    current_lines: list[str] = []
-
-    for raw_line in raw_content.splitlines():
-        speaker_name, content_body = parse_speaker_content(raw_line)
-        if speaker_name is not None:
-            if current_lines:
-                segments.append((current_speaker, "\n".join(current_lines)))
-            current_speaker = speaker_name
-            current_lines = [content_body]
-            continue
-
-        current_lines.append(raw_line)
-
-    if current_lines:
-        segments.append((current_speaker, "\n".join(current_lines)))
-
-    return segments
-
-
-def format_chat_history(messages: list[SessionMessage]) -> str:
-    """Format visible chat history for reply generation."""
-    bot_nickname = global_config.bot.nickname.strip() or "Bot"
-    parts: list[str] = []
-
-    for message in messages:
-        role = get_message_role(message)
-        timestamp = _format_message_time(message)
-
-        if role == "user":
-            guided_reply = _extract_guided_bot_reply(message)
-            if guided_reply:
-                parts.append(f"{timestamp} {bot_nickname}(you): {guided_reply}")
-                continue
-
-            raw_content = get_message_text(message)
-            for speaker_name, content_body in _split_user_message_segments(raw_content):
-                content = _normalize_content(content_body)
-                if not content:
-                    continue
-                visible_speaker = speaker_name or global_config.maisaka.user_name.strip() or "用户"
-                parts.append(f"{timestamp} {visible_speaker}: {content}")
-            continue
-
-        if role == "assistant":
-            visible_reply = _extract_visible_assistant_reply(message)
-            if visible_reply:
-                parts.append(f"{timestamp} {bot_nickname}(you): {visible_reply}")
-
-    return "\n".join(parts)
-
-
-class Replyer:
-    """Generate visible replies from thoughts and context."""
-
-    def __init__(self, llm_service: Optional[MaiSakaLLMService] = None):
-        self._llm_service = llm_service
-        self._enabled = True
-
-    def set_llm_service(self, llm_service: MaiSakaLLMService) -> None:
-        self._llm_service = llm_service
-
-    def set_enabled(self, enabled: bool) -> None:
-        self._enabled = enabled
-
-    async def reply(self, reason: str, chat_history: list[SessionMessage]) -> str:
-        if not self._enabled or not reason or self._llm_service is None:
-            return "..."
-
-        return await self._llm_service.generate_reply(reason, chat_history)
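The deleted `_split_user_message_segments` is worth keeping in mind when reading the new generator: a segment opens only when a line carries an explicit speaker label, and unlabeled lines stay with the current speaker. A self-contained sketch with a simplified bracket parser standing in for `parse_speaker_content` (the `[name] text` label format is an assumption for the demo, not confirmed by this patch):

```python
from typing import Optional


def parse_speaker(line: str) -> tuple[Optional[str], str]:
    # Simplified stand-in for message_adapter.parse_speaker_content.
    if line.startswith("[") and "]" in line:
        name, _, rest = line[1:].partition("]")
        return name, rest.lstrip()
    return None, line


def split_segments(raw: str) -> list[tuple[Optional[str], str]]:
    segments: list[tuple[Optional[str], str]] = []
    speaker: Optional[str] = None
    lines: list[str] = []
    for raw_line in raw.splitlines():
        name, body = parse_speaker(raw_line)
        if name is not None:
            if lines:  # close the previous speaker's segment
                segments.append((speaker, "\n".join(lines)))
            speaker, lines = name, [body]
        else:
            lines.append(raw_line)  # continuation line, same speaker
    if lines:
        segments.append((speaker, "\n".join(lines)))
    return segments


print(split_segments("[Alice] hi\nstill Alice\n[Bob] hello"))
# [('Alice', 'hi\nstill Alice'), ('Bob', 'hello')]
```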
-""" - -from typing import Optional - -from src.chat.message_receive.message import SessionMessage -from src.config.config import global_config - -from .llm_service import MaiSakaLLMService -from .message_adapter import get_message_role, get_message_text, is_perception_message, parse_speaker_content - - -def _normalize_content(content: str, limit: int = 500) -> str: - normalized = " ".join((content or "").split()) - if len(normalized) > limit: - return normalized[:limit] + "..." - return normalized - - -def _format_message_time(message: SessionMessage) -> str: - return message.timestamp.strftime("%H:%M:%S") - - -def _extract_visible_assistant_reply(message: SessionMessage) -> str: - if is_perception_message(message): - return "" - return "" - - -def _extract_guided_bot_reply(message: SessionMessage) -> str: - speaker_name, body = parse_speaker_content(get_message_text(message).strip()) - bot_nickname = global_config.bot.nickname.strip() or "Bot" - if speaker_name == bot_nickname: - return _normalize_content(body.strip()) - return "" - - -def _split_user_message_segments(raw_content: str) -> list[tuple[Optional[str], str]]: - """Split a user message into speaker-labeled segments. - - A new segment only starts when a line explicitly begins with `[speaker]`. - Continuation lines remain part of the current speaker's message. - """ - segments: list[tuple[Optional[str], str]] = [] - current_speaker: Optional[str] = None - current_lines: list[str] = [] - - for raw_line in raw_content.splitlines(): - speaker_name, content_body = parse_speaker_content(raw_line) - if speaker_name is not None: - if current_lines: - segments.append((current_speaker, "\n".join(current_lines))) - current_speaker = speaker_name - current_lines = [content_body] - continue - - current_lines.append(raw_line) - - if current_lines: - segments.append((current_speaker, "\n".join(current_lines))) - - return segments - - -def format_chat_history(messages: list[SessionMessage]) -> str: - """Format visible chat history for reply generation.""" - bot_nickname = global_config.bot.nickname.strip() or "Bot" - parts: list[str] = [] - - for message in messages: - role = get_message_role(message) - timestamp = _format_message_time(message) - - if role == "user": - guided_reply = _extract_guided_bot_reply(message) - if guided_reply: - parts.append(f"{timestamp} {bot_nickname}(you): {guided_reply}") - continue - - raw_content = get_message_text(message) - for speaker_name, content_body in _split_user_message_segments(raw_content): - content = _normalize_content(content_body) - if not content: - continue - visible_speaker = speaker_name or global_config.maisaka.user_name.strip() or "用户" - parts.append(f"{timestamp} {visible_speaker}: {content}") - continue - - if role == "assistant": - visible_reply = _extract_visible_assistant_reply(message) - if visible_reply: - parts.append(f"{timestamp} {bot_nickname}(you): {visible_reply}") - - return "\n".join(parts) - - -class Replyer: - """Generate visible replies from thoughts and context.""" - - def __init__(self, llm_service: Optional[MaiSakaLLMService] = None): - self._llm_service = llm_service - self._enabled = True - - def set_llm_service(self, llm_service: MaiSakaLLMService) -> None: - self._llm_service = llm_service - - def set_enabled(self, enabled: bool) -> None: - self._enabled = enabled - - async def reply(self, reason: str, chat_history: list[SessionMessage]) -> str: - if not self._enabled or not reason or self._llm_service is None: - return "..." 
diff --git a/src/maisaka/tool_handlers.py b/src/maisaka/tool_handlers.py
index 20cafcdb..6cd8c2d6 100644
--- a/src/maisaka/tool_handlers.py
+++ b/src/maisaka/tool_handlers.py
@@ -16,7 +16,6 @@ from src.llm_models.payload_content.tool_option import ToolCall
 
 from .console import console
 from .input_reader import InputReader
-from .llm_service import MaiSakaLLMService
 from .message_adapter import build_message
 
 if TYPE_CHECKING:
@@ -31,11 +30,9 @@ class ToolHandlerContext:
 
     def __init__(
         self,
-        llm_service: MaiSakaLLMService,
         reader: InputReader,
         user_input_times: list[datetime],
     ) -> None:
-        self.llm_service = llm_service
         self.reader = reader
         self.user_input_times = user_input_times
         self.last_user_input_time: Optional[datetime] = None
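With `llm_service` removed from `ToolHandlerContext`, tool handlers no longer hold a model handle at all; the model is now reached only through `MaisakaChatLoopService` and the replyer manager. A hypothetical call site after this patch (assuming `InputReader` takes no constructor arguments, which this diff does not show):

```python
from datetime import datetime

from src.maisaka.input_reader import InputReader
from src.maisaka.tool_handlers import ToolHandlerContext

# The context now only tracks console input state, not LLM access.
context = ToolHandlerContext(
    reader=InputReader(),
    user_input_times=[datetime.now()],
)
```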