From cecc58a9e7ec98461eb3c426a1991324505370ff Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 1 Apr 2026 16:21:30 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E4=BD=BF=E7=94=A8url=E9=98=85?= =?UTF-8?q?=E8=A7=88=E5=9B=BE=E7=89=87=EF=BC=8C=E6=A0=87=E8=AE=B0=E8=A1=A8?= =?UTF-8?q?=E6=83=85=E5=8C=85=E7=B1=BB=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/official_configs.py | 19 -- src/maisaka/chat_loop_service.py | 286 ++--------------------------- src/maisaka/context_messages.py | 8 + src/maisaka/prompt_cli_renderer.py | 75 +++----- 4 files changed, 49 insertions(+), 339 deletions(-) diff --git a/src/config/official_configs.py b/src/config/official_configs.py index dceb29cf..1dfad725 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -1550,15 +1550,6 @@ class MaiSakaConfig(ConfigBase): ) """工具筛选阶段最多保留的非内置工具数量""" - terminal_image_preview: bool = Field( - default=False, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "image", - }, - ) - """是否渲染低分辨率终端预览图片""" - terminal_image_display_mode: Literal["legacy", "path_link"] = Field( default="legacy", json_schema_extra={ @@ -1568,16 +1559,6 @@ class MaiSakaConfig(ConfigBase): ) """图片展示模式:legacy(仅显示元信息)/ path_link(可点击本地路径)""" - terminal_image_preview_width: int = Field( - default=24, - ge=8, - json_schema_extra={ - "x-widget": "input", - "x-icon": "columns", - }, - ) - """Maisaka终端图片预览的字符宽度""" - class MCPAuthorizationConfig(ConfigBase): """MCP HTTP 认证配置。""" diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py index 58724a4f..ccdf78cd 100644 --- a/src/maisaka/chat_loop_service.py +++ b/src/maisaka/chat_loop_service.py @@ -1,22 +1,17 @@ """Maisaka 对话循环服务。""" -from base64 import b64decode from dataclasses import dataclass from datetime import datetime -from io import BytesIO from time import perf_counter -from typing import Any, Dict, List, Optional, Sequence +from typing import List, Optional, Sequence import asyncio import json import random -from PIL import Image as PILImage from pydantic import BaseModel, Field as PydanticField -from rich.console import Group, RenderableType +from rich.console import Group from rich.panel import Panel -from rich.pretty import Pretty -from rich.text import Text from src.cli.console import console from src.common.data_models.llm_service_data_models import LLMGenerationOptions @@ -35,6 +30,7 @@ from src.services.llm_service import LLMServiceClient from .builtin_tools import get_builtin_tools from .context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage from .message_adapter import format_speaker_content +from .prompt_cli_renderer import PromptCLIVisualizer @dataclass(slots=True) @@ -44,6 +40,11 @@ class ChatResponse: content: Optional[str] tool_calls: List[ToolCall] raw_message: AssistantMessage + selected_history_count: int + prompt_tokens: int + built_message_count: int + completion_tokens: int + total_tokens: int class ToolFilterSelection(BaseModel): @@ -468,259 +469,6 @@ class MaisakaChatLoopService: return extract_category_ids_from_result(generation_result.response or "") - @staticmethod - def _get_role_badge_style(role: str) -> str: - """返回终端中角色标签的样式。 - - Args: - role: 消息角色名称。 - - Returns: - str: Rich 可识别的样式字符串。 - """ - - if role == "system": - return "bold white on blue" - if role == "user": - return "bold black on green" - if role == "assistant": - return "bold black on yellow" - if role == "tool": - return "bold white on magenta" - return "bold white on bright_black" - - @staticmethod - def _get_role_badge_label(role: str) -> str: - """返回终端中角色标签的中文名称。 - - Args: - role: 消息角色名称。 - - Returns: - str: 用于展示的中文角色名称。 - """ - - if role == "system": - return "系统" - if role == "user": - return "用户" - if role == "assistant": - return "助手" - if role == "tool": - return "工具" - return "未知" - - @staticmethod - def _build_terminal_image_preview(image_base64: str) -> Optional[str]: - """构造终端图片预览字符画。 - - Args: - image_base64: 图片的 Base64 编码。 - - Returns: - Optional[str]: 生成成功时返回字符画文本,否则返回 ``None``。 - """ - - ascii_chars = " .:-=+*#%@" - - try: - image_bytes = b64decode(image_base64) - with PILImage.open(BytesIO(image_bytes)) as image: - grayscale = image.convert("L") - width, height = grayscale.size - if width <= 0 or height <= 0: - return None - - preview_width = max(8, int(global_config.maisaka.terminal_image_preview_width)) - preview_height = max(1, int(height * (preview_width / width) * 0.5)) - resized = grayscale.resize((preview_width, preview_height)) - pixels = list(resized.tobytes()) - except Exception: - return None - - rows: List[str] = [] - for row_index in range(preview_height): - row_pixels = pixels[row_index * preview_width : (row_index + 1) * preview_width] - row = "".join(ascii_chars[min(len(ascii_chars) - 1, pixel * len(ascii_chars) // 256)] for pixel in row_pixels) - rows.append(row) - - return "\n".join(rows) - - @classmethod - def _render_message_content(cls, content: Any) -> RenderableType: - """将消息内容渲染为终端可展示对象。 - - Args: - content: 原始消息内容。 - - Returns: - RenderableType: Rich 可渲染对象。 - """ - - if isinstance(content, str): - return Text(content) - - if isinstance(content, list): - parts: List[RenderableType] = [] - for item in content: - if isinstance(item, str): - parts.append(Text(item)) - continue - if isinstance(item, tuple) and len(item) == 2: - image_format, image_base64 = item - if isinstance(image_format, str) and isinstance(image_base64, str): - approx_size = max(0, len(image_base64) * 3 // 4) - size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B" - preview_parts: List[RenderableType] = [ - Text(f"图片格式 image/{image_format} {size_text}\nbase64 内容已省略", style="magenta") - ] - if global_config.maisaka.terminal_image_preview: - preview_text = cls._build_terminal_image_preview(image_base64) - if preview_text: - preview_parts.append(Text(preview_text, style="white")) - parts.append( - Panel( - Group(*preview_parts), - border_style="magenta", - padding=(0, 1), - ) - ) - continue - if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): - parts.append(Text(item["text"])) - else: - parts.append(Pretty(item, expand_all=True)) - return Group(*parts) if parts else Text("") - - if content is None: - return Text("") - - return Pretty(content, expand_all=True) - - @staticmethod - def _format_tool_call_for_display(tool_call: Any) -> Dict[str, Any]: - """将工具调用对象格式化为易读字典。 - - Args: - tool_call: 原始工具调用对象或字典。 - - Returns: - Dict[str, Any]: 适合终端展示的工具调用字典。 - """ - - if isinstance(tool_call, dict): - function_info = tool_call.get("function", {}) - return { - "id": tool_call.get("id"), - "name": function_info.get("name", tool_call.get("name")), - "arguments": function_info.get("arguments", tool_call.get("arguments")), - } - - return { - "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)), - "name": getattr(tool_call, "func_name", getattr(tool_call, "name", None)), - "arguments": getattr(tool_call, "args", getattr(tool_call, "arguments", None)), - } - - def _render_tool_call_panel(self, tool_call: Any, index: int, parent_index: int) -> Panel: - """渲染单个工具调用面板。 - - Args: - tool_call: 原始工具调用对象。 - index: 工具调用在当前消息中的序号。 - parent_index: 所属消息的序号。 - - Returns: - Panel: 工具调用展示面板。 - """ - - title = Text.assemble( - Text(" 工具调用 ", style="bold white on magenta"), - Text(f" #{parent_index}.{index}", style="muted"), - ) - return Panel( - Pretty(self._format_tool_call_for_display(tool_call), expand_all=True), - title=title, - border_style="magenta", - padding=(0, 1), - ) - - def _render_message_panel(self, message: Any, index: int) -> Panel: - """渲染单条消息面板。 - - Args: - message: 原始消息对象或字典。 - index: 消息序号。 - - Returns: - Panel: 终端展示面板。 - """ - - if isinstance(message, dict): - raw_role = message.get("role", "unknown") - content = message.get("content") - tool_call_id = message.get("tool_call_id") - else: - raw_role = getattr(message, "role", "unknown") - content = getattr(message, "content", None) - tool_call_id = getattr(message, "tool_call_id", None) - - role = raw_role.value if isinstance(raw_role, RoleType) else str(raw_role) - title = Text.assemble( - Text(f" {self._get_role_badge_label(role)} ", style=self._get_role_badge_style(role)), - Text(f" #{index}", style="muted"), - ) - - parts: List[RenderableType] = [] - if content not in (None, "", []): - parts.append(Text(" 消息 ", style="bold cyan")) - parts.append(self._render_message_content(content)) - - if tool_call_id: - parts.append( - Text.assemble( - Text(" 工具调用编号 ", style="bold magenta"), - Text(" "), - Text(str(tool_call_id), style="magenta"), - ) - ) - - if not parts: - parts.append(Text("[空消息]", style="muted")) - - return Panel( - Group(*parts), - title=title, - border_style="dim", - padding=(0, 1), - ) - - @staticmethod - def _format_token_count(token_count: int) -> str: - """格式化 token 数量展示文本。""" - if token_count >= 10_000: - return f"{token_count / 1000:.1f}k" - return str(token_count) - - @classmethod - def _build_prompt_stats_text( - cls, - *, - selected_history_count: int, - built_message_count: int, - prompt_tokens: int, - completion_tokens: int, - total_tokens: int, - ) -> str: - """构造本轮 prompt 的统计信息文本。""" - return ( - f"已选上下文消息数={selected_history_count} " - f"大模型消息数={built_message_count} " - f"实际输入Token={cls._format_token_count(prompt_tokens)} " - f"输出Token={cls._format_token_count(completion_tokens)} " - f"总Token={cls._format_token_count(total_tokens)}" - ) - async def chat_loop_step(self, chat_history: List[LLMContextMessage]) -> ChatResponse: """执行一轮 Maisaka 规划器请求。 @@ -756,13 +504,10 @@ class MaisakaChatLoopService: else: all_tools = [*get_builtin_tools(), *self._extra_tools] - ordered_panels: List[Panel] = [] - for index, msg in enumerate(built_messages, start=1): - ordered_panels.append(self._render_message_panel(msg, index)) - tool_calls = getattr(msg, "tool_calls", None) - if tool_calls: - for tool_call_index, tool_call in enumerate(tool_calls, start=1): - ordered_panels.append(self._render_tool_call_panel(tool_call, tool_call_index, index)) + ordered_panels = PromptCLIVisualizer.build_prompt_panels( + built_messages, + image_display_mode=global_config.maisaka.terminal_image_display_mode, + ) if global_config.maisaka.show_thinking and ordered_panels: console.print( @@ -795,7 +540,7 @@ class MaisakaChatLoopService: request_elapsed = perf_counter() - request_started_at logger.info(f"规划器请求完成,耗时={request_elapsed:.3f} 秒") - prompt_stats_text = self._build_prompt_stats_text( + prompt_stats_text = PromptCLIVisualizer.build_prompt_stats_text( selected_history_count=len(selected_history), built_message_count=len(built_messages), prompt_tokens=generation_result.prompt_tokens, @@ -826,6 +571,11 @@ class MaisakaChatLoopService: content=generation_result.response, tool_calls=generation_result.tool_calls or [], raw_message=raw_message, + selected_history_count=len(selected_history), + prompt_tokens=generation_result.prompt_tokens, + built_message_count=len(built_messages), + completion_tokens=generation_result.completion_tokens, + total_tokens=generation_result.total_tokens, ) @staticmethod diff --git a/src/maisaka/context_messages.py b/src/maisaka/context_messages.py index 8da06a23..174da097 100644 --- a/src/maisaka/context_messages.py +++ b/src/maisaka/context_messages.py @@ -27,6 +27,13 @@ def _guess_image_format(image_bytes: bytes) -> Optional[str]: return None +def _build_binary_component_type_text(component: EmojiComponent | ImageComponent) -> str: + """为图片类消息组件构造显式的消息类型标记。""" + if isinstance(component, EmojiComponent): + return "[消息类型]表情包" + return "[消息类型]图片" + + def _build_message_from_sequence( role: RoleType, message_sequence: MessageSequence, @@ -53,6 +60,7 @@ def _build_message_from_sequence( if isinstance(component, (EmojiComponent, ImageComponent)): image_format = _guess_image_format(component.binary_data) if image_format and component.binary_data: + builder.add_text_content(_build_binary_component_type_text(component)) builder.add_image_content(image_format, base64.b64encode(component.binary_data).decode("utf-8")) has_content = True continue diff --git a/src/maisaka/prompt_cli_renderer.py b/src/maisaka/prompt_cli_renderer.py index 104558be..7b4301c7 100644 --- a/src/maisaka/prompt_cli_renderer.py +++ b/src/maisaka/prompt_cli_renderer.py @@ -6,20 +6,21 @@ import hashlib from base64 import b64decode from dataclasses import dataclass from enum import Enum -from io import BytesIO from pathlib import Path from urllib.parse import quote -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Dict, List, Literal import tempfile -from PIL import Image as PILImage from pydantic import BaseModel, Field as PydanticField from rich.console import Group, RenderableType from rich.pretty import Pretty from rich.panel import Panel from rich.text import Text +PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute().resolve() +DATA_IMAGE_DIR = PROJECT_ROOT / "data" / "images" + class PromptImageDisplayMode(str, Enum): """图片在终端中的展示模式。""" @@ -37,11 +38,6 @@ class PromptImageDisplaySettings(BaseModel): display_mode: PromptImageDisplayMode = PydanticField(default=PromptImageDisplayMode.LEGACY) """图片展示模式。""" - enable_terminal_preview: bool = PydanticField(default=False) - """是否开启低分辨率终端 ASCII 预览。""" - - terminal_preview_width: int = PydanticField(default=24, ge=1) - """终端预览宽度(字符数)。""" @dataclass(slots=True) @@ -55,8 +51,6 @@ class _MessageRenderResult: class PromptCLIVisualizer: """负责构建 CLI 下 prompt 展示所需的所有可视化组件。""" - _ASCII_CHARS = " .:-=+*#%@" - @staticmethod def _get_role_badge_style(role: str) -> str: if role == "system": @@ -114,35 +108,6 @@ class PromptCLIVisualizer: return "jpeg" return normalized - @staticmethod - def _build_terminal_image_preview(image_base64: str, *, preview_width: int) -> Optional[str]: - """从 base64 构建 ASCII 预览。""" - try: - image_bytes = b64decode(image_base64) - with PILImage.open(BytesIO(image_bytes)) as image: - grayscale = image.convert("L") - width, height = grayscale.size - if width <= 0 or height <= 0: - return None - - preview_width = max(8, preview_width) - preview_height = max(1, int(height * (preview_width / width) * 0.5)) - resized = grayscale.resize((preview_width, preview_height)) - pixels = list(resized.tobytes()) - except Exception: - return None - - rows: List[str] = [] - for row_index in range(preview_height): - row_pixels = pixels[row_index * preview_width : (row_index + 1) * preview_width] - row = "".join( - PromptCLIVisualizer._ASCII_CHARS[min(len(PromptCLIVisualizer._ASCII_CHARS) - 1, pixel * len(PromptCLIVisualizer._ASCII_CHARS) // 256)] - for pixel in row_pixels - ) - rows.append(row) - - return "\n".join(rows) - @staticmethod def _build_image_cache_path(image_format: str, image_base64: str) -> Path: image_format = PromptCLIVisualizer._normalize_image_format(image_format) @@ -156,10 +121,28 @@ class PromptCLIVisualizer: normalized = file_path.as_posix() return f"file:///{quote(normalized, safe='/:')}" + @staticmethod + def _build_official_image_path(image_format: str, image_base64: str) -> Path | None: + normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) + try: + image_bytes = b64decode(image_base64) + except Exception: + return None + + digest = hashlib.sha256(image_bytes).hexdigest() + official_path = DATA_IMAGE_DIR / f"{digest}.{normalized_format}" + if official_path.exists(): + return official_path + return None + @staticmethod def _build_image_file_link(image_format: str, image_base64: str) -> tuple[str, Path] | None: - """把图片内容写入临时目录并返回可点击链接文本。""" + """优先返回正式图片路径;不存在时回退到临时缓存路径。""" normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) or "bin" + official_path = PromptCLIVisualizer._build_official_image_path(image_format, image_base64) + if official_path is not None: + return PromptCLIVisualizer._build_file_uri(official_path), official_path + try: image_bytes = b64decode(image_base64) except Exception: @@ -190,14 +173,6 @@ class PromptCLIVisualizer: preview_parts.append(Text.from_markup(f"\n[link={file_uri}]点击打开图片[/link]", style="cyan")) preview_parts.append(Text(f"\n{file_path}", style="dim")) - if settings.enable_terminal_preview: - preview_text = cls._build_terminal_image_preview( - image_base64, - preview_width=settings.terminal_preview_width, - ) - if preview_text: - preview_parts.append(Text(preview_text, style="white")) - return Panel( Group(*preview_parts), border_style="magenta", @@ -315,16 +290,12 @@ class PromptCLIVisualizer: messages: list[Any], *, image_display_mode: Literal["legacy", "path_link"], - enable_terminal_image_preview: bool, - terminal_image_preview_width: int, ) -> List[Panel]: """构建完整 prompt 可视化面板。""" if image_display_mode not in {mode.value for mode in PromptImageDisplayMode}: image_display_mode = PromptImageDisplayMode.LEGACY settings = PromptImageDisplaySettings( display_mode=PromptImageDisplayMode(image_display_mode), - enable_terminal_preview=enable_terminal_image_preview, - terminal_preview_width=terminal_image_preview_width, ) ordered_panels: List[Panel] = []