feat:使用url阅览图片,标记表情包类消息

This commit is contained in:
SengokuCola
2026-04-01 16:21:30 +08:00
parent 5b92942194
commit cecc58a9e7
4 changed files with 49 additions and 339 deletions

View File

@@ -1550,15 +1550,6 @@ class MaiSakaConfig(ConfigBase):
)
"""工具筛选阶段最多保留的非内置工具数量"""
# Boolean toggle, off by default. Per its description string below, it controls
# whether a low-resolution terminal preview of images is rendered. The schema
# extras request a "switch" widget with an "image" icon.
terminal_image_preview: bool = Field(
default=False,
json_schema_extra={
"x-widget": "switch",
"x-icon": "image",
},
)
"""是否渲染低分辨率终端预览图片"""
terminal_image_display_mode: Literal["legacy", "path_link"] = Field(
default="legacy",
json_schema_extra={
@@ -1568,16 +1559,6 @@ class MaiSakaConfig(ConfigBase):
)
"""图片展示模式legacy仅显示元信息/ path_link可点击本地路径"""
# Width, in characters, of the terminal image preview (per the description
# string below). Defaults to 24; validation rejects values below 8 (ge=8).
# The schema extras request an "input" widget with a "columns" icon.
terminal_image_preview_width: int = Field(
default=24,
ge=8,
json_schema_extra={
"x-widget": "input",
"x-icon": "columns",
},
)
"""Maisaka终端图片预览的字符宽度"""
class MCPAuthorizationConfig(ConfigBase):
"""MCP HTTP 认证配置。"""

View File

@@ -1,22 +1,17 @@
"""Maisaka 对话循环服务。"""
from base64 import b64decode
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from time import perf_counter
from typing import Any, Dict, List, Optional, Sequence
from typing import List, Optional, Sequence
import asyncio
import json
import random
from PIL import Image as PILImage
from pydantic import BaseModel, Field as PydanticField
from rich.console import Group, RenderableType
from rich.console import Group
from rich.panel import Panel
from rich.pretty import Pretty
from rich.text import Text
from src.cli.console import console
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
@@ -35,6 +30,7 @@ from src.services.llm_service import LLMServiceClient
from .builtin_tools import get_builtin_tools
from .context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage
from .message_adapter import format_speaker_content
from .prompt_cli_renderer import PromptCLIVisualizer
@dataclass(slots=True)
@@ -44,6 +40,11 @@ class ChatResponse:
content: Optional[str]
tool_calls: List[ToolCall]
raw_message: AssistantMessage
selected_history_count: int
prompt_tokens: int
built_message_count: int
completion_tokens: int
total_tokens: int
class ToolFilterSelection(BaseModel):
@@ -468,259 +469,6 @@ class MaisakaChatLoopService:
return extract_category_ids_from_result(generation_result.response or "")
@staticmethod
def _get_role_badge_style(role: str) -> str:
    """Return the Rich style used for a role badge in the terminal.

    Args:
        role: Message role name.

    Returns:
        str: A style string Rich can interpret. Roles other than
        ``system``, ``user``, ``assistant`` and ``tool`` get a neutral
        fallback style.
    """
    known_styles = (
        ("system", "bold white on blue"),
        ("user", "bold black on green"),
        ("assistant", "bold black on yellow"),
        ("tool", "bold white on magenta"),
    )
    # Equality scan (rather than hashing) keeps the exact comparison
    # semantics of a plain if-chain.
    for known_role, badge_style in known_styles:
        if role == known_role:
            return badge_style
    return "bold white on bright_black"
@staticmethod
def _get_role_badge_label(role: str) -> str:
    """Return the Chinese display label for a role badge in the terminal.

    Args:
        role: Message role name.

    Returns:
        str: The label shown for the role. Roles other than ``system``,
        ``user``, ``assistant`` and ``tool`` map to the "unknown" label.
    """
    known_labels = (
        ("system", "系统"),
        ("user", "用户"),
        ("assistant", "助手"),
        ("tool", "工具"),
    )
    # Equality scan mirrors a plain if-chain, including for odd inputs.
    for known_role, badge_label in known_labels:
        if role == known_role:
            return badge_label
    return "未知"
@staticmethod
def _build_terminal_image_preview(image_base64: str) -> Optional[str]:
    """Build an ASCII-art preview of an image for terminal display.

    The image is decoded from Base64, converted to greyscale, resized to the
    configured preview width (never less than 8 characters) and mapped onto
    a ten-step character ramp.

    Args:
        image_base64: Base64-encoded image data.

    Returns:
        Optional[str]: The preview as newline-joined rows, or ``None`` when
        the image cannot be decoded/processed or has a non-positive size.
    """
    ascii_chars = " .:-=+*#%@"
    try:
        with PILImage.open(BytesIO(b64decode(image_base64))) as source_image:
            gray_image = source_image.convert("L")
            src_width, src_height = gray_image.size
            if src_width <= 0 or src_height <= 0:
                return None

            preview_width = max(8, int(global_config.maisaka.terminal_image_preview_width))
            # The 0.5 factor halves the row count relative to the scaled height.
            preview_height = max(1, int(src_height * (preview_width / src_width) * 0.5))
            pixels = list(gray_image.resize((preview_width, preview_height)).tobytes())
    except Exception:
        # Best effort: any decoding or processing failure means "no preview".
        return None

    ramp_size = len(ascii_chars)
    return "\n".join(
        "".join(
            ascii_chars[min(ramp_size - 1, pixel * ramp_size // 256)]
            for pixel in pixels[row_start : row_start + preview_width]
        )
        for row_start in range(0, preview_height * preview_width, preview_width)
    )
@classmethod
def _render_message_content(cls, content: Any) -> RenderableType:
"""Render message content into an object the terminal can display.

Strings become ``Text``; lists are rendered item by item and stacked in a
``Group``; ``None`` becomes an empty ``Text``; anything else is shown via
``Pretty``.

Args:
content: Raw message content.

Returns:
RenderableType: A Rich renderable.
"""
# Plain strings are shown verbatim.
if isinstance(content, str):
return Text(content)
# Lists are rendered item by item.
if isinstance(content, list):
parts: List[RenderableType] = []
for item in content:
if isinstance(item, str):
parts.append(Text(item))
continue
# A 2-tuple is unpacked as an (image_format, image_base64) pair.
if isinstance(item, tuple) and len(item) == 2:
image_format, image_base64 = item
if isinstance(image_format, str) and isinstance(image_base64, str):
# Size estimate from the Base64 length (3 bytes per 4 characters).
approx_size = max(0, len(image_base64) * 3 // 4)
size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B"
# Only metadata is shown; the Base64 payload itself is never printed.
preview_parts: List[RenderableType] = [
Text(f"图片格式 image/{image_format} {size_text}\nbase64 内容已省略", style="magenta")
]
# The ASCII preview is added only when enabled in config and successfully built.
if global_config.maisaka.terminal_image_preview:
preview_text = cls._build_terminal_image_preview(image_base64)
if preview_text:
preview_parts.append(Text(preview_text, style="white"))
parts.append(
Panel(
Group(*preview_parts),
border_style="magenta",
padding=(0, 1),
)
)
# NOTE(review): indentation is not visible here, so it is unclear whether this
# `continue` applies only to valid (str, str) pairs or to every 2-tuple - confirm.
continue
# Dict items of type "text" are rendered as text; everything else is pretty-printed.
if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str):
parts.append(Text(item["text"]))
else:
parts.append(Pretty(item, expand_all=True))
# An empty list renders as an empty Text rather than an empty Group.
return Group(*parts) if parts else Text("")
if content is None:
return Text("")
# Fallback for any other content type.
return Pretty(content, expand_all=True)
@staticmethod
def _format_tool_call_for_display(tool_call: Any) -> Dict[str, Any]:
    """Normalise a tool call into a small, readable dict for terminal display.

    Dict-shaped tool calls are read from their nested ``function`` entry,
    falling back to the top-level ``name``/``arguments`` keys. Any other
    object is read via attributes, preferring ``call_id``/``func_name``/
    ``args`` and falling back to ``id``/``name``/``arguments``.

    Args:
        tool_call: Raw tool call object or dict.

    Returns:
        Dict[str, Any]: A dict with exactly the keys ``id``, ``name`` and
        ``arguments``; values that cannot be found are ``None``.
    """
    if isinstance(tool_call, dict):
        # ``.get("function", {})`` would return None when the key is present
        # with a null value and crash on the next ``.get``; treat a missing
        # or empty entry the same way.
        function_info = tool_call.get("function") or {}
        return {
            "id": tool_call.get("id"),
            "name": function_info.get("name", tool_call.get("name")),
            "arguments": function_info.get("arguments", tool_call.get("arguments")),
        }

    return {
        "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)),
        "name": getattr(tool_call, "func_name", getattr(tool_call, "name", None)),
        "arguments": getattr(tool_call, "args", getattr(tool_call, "arguments", None)),
    }
def _render_tool_call_panel(self, tool_call: Any, index: int, parent_index: int) -> Panel:
    """Render one tool call as a titled panel.

    Args:
        tool_call: Raw tool call object.
        index: Position of the tool call within its message.
        parent_index: Index of the message the tool call belongs to.

    Returns:
        Panel: Panel showing the normalised tool call, titled with a badge
        and a ``#<parent_index>.<index>`` position marker.
    """
    badge = Text(" 工具调用 ", style="bold white on magenta")
    position = Text(f" #{parent_index}.{index}", style="muted")
    body = Pretty(self._format_tool_call_for_display(tool_call), expand_all=True)
    return Panel(
        body,
        title=Text.assemble(badge, position),
        border_style="magenta",
        padding=(0, 1),
    )
def _render_message_panel(self, message: Any, index: int) -> Panel:
    """Render a single message as a titled panel.

    Args:
        message: Raw message, either a dict or an object exposing ``role``,
            ``content`` and ``tool_call_id`` attributes.
        index: Index of the message, shown in the panel title.

    Returns:
        Panel: Panel containing the rendered content and, when present, the
        tool call id; a placeholder is shown when neither is present.
    """

    def read_field(name: str, default: Any = None) -> Any:
        # Dicts are read by key, everything else by attribute.
        if isinstance(message, dict):
            return message.get(name, default)
        return getattr(message, name, default)

    raw_role = read_field("role", "unknown")
    content = read_field("content")
    tool_call_id = read_field("tool_call_id")

    # Enum roles are unwrapped to their value; anything else is stringified.
    role = raw_role.value if isinstance(raw_role, RoleType) else str(raw_role)

    body_parts: List[RenderableType] = []
    if content not in (None, "", []):
        body_parts.append(Text(" 消息 ", style="bold cyan"))
        body_parts.append(self._render_message_content(content))

    if tool_call_id:
        tool_call_line = Text.assemble(
            Text(" 工具调用编号 ", style="bold magenta"),
            Text(" "),
            Text(str(tool_call_id), style="magenta"),
        )
        body_parts.append(tool_call_line)

    if not body_parts:
        body_parts.append(Text("[空消息]", style="muted"))

    header = Text.assemble(
        Text(f" {self._get_role_badge_label(role)} ", style=self._get_role_badge_style(role)),
        Text(f" #{index}", style="muted"),
    )
    return Panel(
        Group(*body_parts),
        title=header,
        border_style="dim",
        padding=(0, 1),
    )
@staticmethod
def _format_token_count(token_count: int) -> str:
    """Format a token count for display.

    Counts of 10,000 and above are shown in thousands with one decimal and a
    ``k`` suffix; smaller counts are shown as-is.
    """
    return f"{token_count / 1000:.1f}k" if token_count >= 10_000 else str(token_count)
@classmethod
def _build_prompt_stats_text(
    cls,
    *,
    selected_history_count: int,
    built_message_count: int,
    prompt_tokens: int,
    completion_tokens: int,
    total_tokens: int,
) -> str:
    """Build the one-line statistics summary for the current prompt.

    The two message counts are shown verbatim; the three token counts go
    through ``_format_token_count``. Fields are separated by single spaces.
    """
    format_tokens = cls._format_token_count
    segments = (
        f"已选上下文消息数={selected_history_count}",
        f"大模型消息数={built_message_count}",
        f"实际输入Token={format_tokens(prompt_tokens)}",
        f"输出Token={format_tokens(completion_tokens)}",
        f"总Token={format_tokens(total_tokens)}",
    )
    return " ".join(segments)
async def chat_loop_step(self, chat_history: List[LLMContextMessage]) -> ChatResponse:
"""执行一轮 Maisaka 规划器请求。
@@ -756,13 +504,10 @@ class MaisakaChatLoopService:
else:
all_tools = [*get_builtin_tools(), *self._extra_tools]
ordered_panels: List[Panel] = []
for index, msg in enumerate(built_messages, start=1):
ordered_panels.append(self._render_message_panel(msg, index))
tool_calls = getattr(msg, "tool_calls", None)
if tool_calls:
for tool_call_index, tool_call in enumerate(tool_calls, start=1):
ordered_panels.append(self._render_tool_call_panel(tool_call, tool_call_index, index))
ordered_panels = PromptCLIVisualizer.build_prompt_panels(
built_messages,
image_display_mode=global_config.maisaka.terminal_image_display_mode,
)
if global_config.maisaka.show_thinking and ordered_panels:
console.print(
@@ -795,7 +540,7 @@ class MaisakaChatLoopService:
request_elapsed = perf_counter() - request_started_at
logger.info(f"规划器请求完成,耗时={request_elapsed:.3f}")
prompt_stats_text = self._build_prompt_stats_text(
prompt_stats_text = PromptCLIVisualizer.build_prompt_stats_text(
selected_history_count=len(selected_history),
built_message_count=len(built_messages),
prompt_tokens=generation_result.prompt_tokens,
@@ -826,6 +571,11 @@ class MaisakaChatLoopService:
content=generation_result.response,
tool_calls=generation_result.tool_calls or [],
raw_message=raw_message,
selected_history_count=len(selected_history),
prompt_tokens=generation_result.prompt_tokens,
built_message_count=len(built_messages),
completion_tokens=generation_result.completion_tokens,
total_tokens=generation_result.total_tokens,
)
@staticmethod

View File

@@ -27,6 +27,13 @@ def _guess_image_format(image_bytes: bytes) -> Optional[str]:
return None
def _build_binary_component_type_text(component: EmojiComponent | ImageComponent) -> str:
    """Return the explicit message-type marker for an image-like component.

    Emoji components get the sticker marker; any other component passed in
    gets the plain image marker.
    """
    return "[消息类型]表情包" if isinstance(component, EmojiComponent) else "[消息类型]图片"
def _build_message_from_sequence(
role: RoleType,
message_sequence: MessageSequence,
@@ -53,6 +60,7 @@ def _build_message_from_sequence(
if isinstance(component, (EmojiComponent, ImageComponent)):
image_format = _guess_image_format(component.binary_data)
if image_format and component.binary_data:
builder.add_text_content(_build_binary_component_type_text(component))
builder.add_image_content(image_format, base64.b64encode(component.binary_data).decode("utf-8"))
has_content = True
continue

View File

@@ -6,20 +6,21 @@ import hashlib
from base64 import b64decode
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from pathlib import Path
from urllib.parse import quote
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Dict, List, Literal
import tempfile
from PIL import Image as PILImage
from pydantic import BaseModel, Field as PydanticField
from rich.console import Group, RenderableType
from rich.pretty import Pretty
from rich.panel import Panel
from rich.text import Text
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute().resolve()
DATA_IMAGE_DIR = PROJECT_ROOT / "data" / "images"
class PromptImageDisplayMode(str, Enum):
"""图片在终端中的展示模式。"""
@@ -37,11 +38,6 @@ class PromptImageDisplaySettings(BaseModel):
display_mode: PromptImageDisplayMode = PydanticField(default=PromptImageDisplayMode.LEGACY)
"""图片展示模式。"""
enable_terminal_preview: bool = PydanticField(default=False)
"""是否开启低分辨率终端 ASCII 预览。"""
terminal_preview_width: int = PydanticField(default=24, ge=1)
"""终端预览宽度(字符数)。"""
@dataclass(slots=True)
@@ -55,8 +51,6 @@ class _MessageRenderResult:
class PromptCLIVisualizer:
"""负责构建 CLI 下 prompt 展示所需的所有可视化组件。"""
_ASCII_CHARS = " .:-=+*#%@"
@staticmethod
def _get_role_badge_style(role: str) -> str:
if role == "system":
@@ -114,35 +108,6 @@ class PromptCLIVisualizer:
return "jpeg"
return normalized
@staticmethod
def _build_terminal_image_preview(image_base64: str, *, preview_width: int) -> Optional[str]:
    """Build an ASCII preview from Base64-encoded image data.

    Args:
        image_base64: Base64-encoded image data.
        preview_width: Requested preview width in characters; values below 8
            are raised to 8.

    Returns:
        Optional[str]: The preview as newline-joined rows, or ``None`` when
        the image cannot be decoded/processed or has a non-positive size.
    """
    try:
        with PILImage.open(BytesIO(b64decode(image_base64))) as source_image:
            gray_image = source_image.convert("L")
            src_width, src_height = gray_image.size
            if src_width <= 0 or src_height <= 0:
                return None

            columns = max(8, preview_width)
            # The 0.5 factor halves the row count relative to the scaled height.
            line_count = max(1, int(src_height * (columns / src_width) * 0.5))
            pixels = list(gray_image.resize((columns, line_count)).tobytes())
    except Exception:
        # Best effort: any decoding or processing failure means "no preview".
        return None

    ramp = PromptCLIVisualizer._ASCII_CHARS
    ramp_size = len(ramp)
    return "\n".join(
        "".join(
            ramp[min(ramp_size - 1, pixel * ramp_size // 256)]
            for pixel in pixels[row_start : row_start + columns]
        )
        for row_start in range(0, line_count * columns, columns)
    )
@staticmethod
def _build_image_cache_path(image_format: str, image_base64: str) -> Path:
image_format = PromptCLIVisualizer._normalize_image_format(image_format)
@@ -156,10 +121,28 @@ class PromptCLIVisualizer:
normalized = file_path.as_posix()
return f"file:///{quote(normalized, safe='/:')}"
@staticmethod
def _build_official_image_path(image_format: str, image_base64: str) -> Path | None:
    """Locate an already-stored copy of the image under ``DATA_IMAGE_DIR``.

    The candidate file name is the SHA-256 hex digest of the decoded bytes
    plus the normalised image format as extension.

    Args:
        image_format: Image format name; normalised before use.
        image_base64: Base64-encoded image data.

    Returns:
        Path | None: The candidate path when it exists on disk; ``None`` when
        it does not exist or the Base64 data cannot be decoded.
    """
    extension = PromptCLIVisualizer._normalize_image_format(image_format)
    try:
        raw_bytes = b64decode(image_base64)
    except Exception:
        # Undecodable payloads simply have no stored counterpart.
        return None

    candidate = DATA_IMAGE_DIR / f"{hashlib.sha256(raw_bytes).hexdigest()}.{extension}"
    return candidate if candidate.exists() else None
@staticmethod
def _build_image_file_link(image_format: str, image_base64: str) -> tuple[str, Path] | None:
"""把图片内容写入临时目录并返回可点击链接文本"""
"""优先返回正式图片路径;不存在时回退到临时缓存路径"""
normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) or "bin"
official_path = PromptCLIVisualizer._build_official_image_path(image_format, image_base64)
if official_path is not None:
return PromptCLIVisualizer._build_file_uri(official_path), official_path
try:
image_bytes = b64decode(image_base64)
except Exception:
@@ -190,14 +173,6 @@ class PromptCLIVisualizer:
preview_parts.append(Text.from_markup(f"\n[link={file_uri}]点击打开图片[/link]", style="cyan"))
preview_parts.append(Text(f"\n{file_path}", style="dim"))
if settings.enable_terminal_preview:
preview_text = cls._build_terminal_image_preview(
image_base64,
preview_width=settings.terminal_preview_width,
)
if preview_text:
preview_parts.append(Text(preview_text, style="white"))
return Panel(
Group(*preview_parts),
border_style="magenta",
@@ -315,16 +290,12 @@ class PromptCLIVisualizer:
messages: list[Any],
*,
image_display_mode: Literal["legacy", "path_link"],
enable_terminal_image_preview: bool,
terminal_image_preview_width: int,
) -> List[Panel]:
"""构建完整 prompt 可视化面板。"""
if image_display_mode not in {mode.value for mode in PromptImageDisplayMode}:
image_display_mode = PromptImageDisplayMode.LEGACY
settings = PromptImageDisplaySettings(
display_mode=PromptImageDisplayMode(image_display_mode),
enable_terminal_preview=enable_terminal_image_preview,
terminal_preview_width=terminal_image_preview_width,
)
ordered_panels: List[Panel] = []