"""CLI 下的 Prompt 可视化渲染模块。""" from __future__ import annotations from base64 import b64decode from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Any, Dict, List, Literal from urllib.parse import quote import hashlib import html import json import tempfile from pydantic import BaseModel, Field as PydanticField from rich.console import Group, RenderableType from rich.panel import Panel from rich.pretty import Pretty from rich.text import Text from .display_utils import ( format_token_count, format_tool_call_for_display as normalize_tool_call_for_display, get_request_panel_style as get_shared_request_panel_style, get_role_badge_label as get_shared_role_badge_label, get_role_badge_style as get_shared_role_badge_style, ) from .prompt_preview_logger import PromptPreviewLogger PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute().resolve() DATA_IMAGE_DIR = PROJECT_ROOT / "data" / "images" class PromptImageDisplayMode(str, Enum): """图片在终端中的展示模式。""" LEGACY = "legacy" """不新增链接,仅保留原有的元信息展示。""" PATH_LINK = "path_link" """把图片落盘到临时目录并输出可点击路径。""" class PromptImageDisplaySettings(BaseModel): """图片展示参数。""" display_mode: PromptImageDisplayMode = PydanticField(default=PromptImageDisplayMode.LEGACY) """图片展示模式。""" @dataclass(slots=True) class _MessageRenderResult: """可渲染结果与是否有工具调用信息。""" message_panel: Panel tool_call_panels: List[Panel] class PromptCLIVisualizer: """负责构建 CLI 下 prompt 展示所需的所有可视化组件。""" @staticmethod def get_request_panel_style(request_kind: str) -> tuple[str, str]: """返回不同请求类型对应的标题与边框颜色。""" return get_shared_request_panel_style(request_kind) @staticmethod def _get_role_badge_style(role: str) -> str: return get_shared_role_badge_style(role) @staticmethod def _get_role_badge_label(role: str) -> str: return get_shared_role_badge_label(role) @staticmethod def _format_token_count(token_count: int) -> str: return format_token_count(token_count) @classmethod def build_prompt_stats_text( cls, *, selected_history_count: int, built_message_count: int, prompt_tokens: int, completion_tokens: int, total_tokens: int, ) -> str: """构造 prompt 统计文本。""" return ( f"上下文消息数量={selected_history_count} " f"已构建消息数={built_message_count} " f"实际输入Token={cls._format_token_count(prompt_tokens)} " f"输出Token={cls._format_token_count(completion_tokens)} " f"总Token={cls._format_token_count(total_tokens)}" ) @staticmethod def _normalize_image_format(image_format: str) -> str: """归一化图片扩展名。""" normalized = image_format.strip().lower() if normalized == "jpg": return "jpeg" return normalized @staticmethod def _build_image_cache_path(image_format: str, image_base64: str) -> Path: image_format = PromptCLIVisualizer._normalize_image_format(image_format) root = Path(tempfile.gettempdir()) / "maisaka_prompt_images" root.mkdir(parents=True, exist_ok=True) digest = hashlib.sha256(image_base64.encode("utf-8")).hexdigest() return root / f"{digest}.{image_format}" @staticmethod def _build_file_uri(file_path: Path) -> str: normalized = file_path.resolve().as_posix() return f"file:///{quote(normalized, safe='/:')}" @staticmethod def _build_official_image_path(image_format: str, image_base64: str) -> Path | None: normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) try: image_bytes = b64decode(image_base64) except Exception: return None digest = hashlib.sha256(image_bytes).hexdigest() official_path = DATA_IMAGE_DIR / f"{digest}.{normalized_format}" if official_path.exists(): return official_path return None @staticmethod def _build_image_file_link(image_format: str, image_base64: str) -> tuple[str, Path] | None: """优先返回正式图片路径;不存在时回退到临时缓存路径。""" normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) or "bin" official_path = PromptCLIVisualizer._build_official_image_path(image_format, image_base64) if official_path is not None: return PromptCLIVisualizer._build_file_uri(official_path), official_path try: image_bytes = b64decode(image_base64) except Exception: return None path = PromptCLIVisualizer._build_image_cache_path(normalized_format, image_base64) if not path.exists(): try: path.write_bytes(image_bytes) except Exception: return None return PromptCLIVisualizer._build_file_uri(path), path @classmethod def _render_image_item(cls, image_format: str, image_base64: str, settings: PromptImageDisplaySettings) -> Panel: normalized_format = cls._normalize_image_format(image_format) approx_size = max(0, len(image_base64) * 3 // 4) size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B" preview_parts: List[RenderableType] = [ Text(f"图片格式 image/{normalized_format} {size_text}", style="magenta") ] if settings.display_mode == PromptImageDisplayMode.PATH_LINK: path_result = cls._build_image_file_link(image_format, image_base64) if path_result is not None: file_uri, file_path = path_result preview_parts: List[RenderableType] = [ Text(f"图片格式 image/{normalized_format} {size_text} 路径:{file_path}", style="magenta") ] preview_parts.append(Text.from_markup(f"[link={file_uri}]点击打开图片[/link]", style="cyan")) return Panel( Group(*preview_parts), border_style="magenta", padding=(0, 1), ) @staticmethod def _extract_image_pair(item: Any) -> tuple[str, str] | None: """兼容图片片段被序列化为 tuple 或 list 的两种形式。""" if isinstance(item, (tuple, list)) and len(item) == 2: image_format, image_base64 = item if isinstance(image_format, str) and isinstance(image_base64, str): return image_format, image_base64 return None @classmethod def _render_message_content(cls, content: Any, settings: PromptImageDisplaySettings) -> RenderableType: if isinstance(content, str): return Text(content) if isinstance(content, list): parts: List[RenderableType] = [] for item in content: if isinstance(item, str): parts.append(Text(item)) continue image_pair = cls._extract_image_pair(item) if image_pair is not None: image_format, image_base64 = image_pair parts.append(cls._render_image_item(image_format, image_base64, settings)) continue if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): parts.append(Text(item["text"])) else: parts.append(Pretty(item, expand_all=True)) return Group(*parts) if parts else Text("") if content is None: return Text("") return Pretty(content, expand_all=True) @classmethod def _serialize_message_content_for_dump(cls, content: Any) -> str: if isinstance(content, str): return content if isinstance(content, list): parts: List[str] = [] for item in content: if isinstance(item, str): parts.append(item) continue image_pair = cls._extract_image_pair(item) if image_pair is not None: image_format, image_base64 = image_pair approx_size = max(0, len(str(image_base64)) * 3 // 4) parts.append(f"[图片 image/{image_format} {approx_size} B]") continue if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): parts.append(item["text"]) continue try: parts.append(json.dumps(item, ensure_ascii=False, indent=2, default=str)) except Exception: parts.append(str(item)) return "\n".join(part for part in parts if part).strip() if content is None: return "" try: return json.dumps(content, ensure_ascii=False, indent=2, default=str) except Exception: return str(content) @classmethod def format_tool_call_for_display(cls, tool_call: Any) -> Dict[str, Any]: return normalize_tool_call_for_display(tool_call) @classmethod def _build_tool_card_title(cls, tool_call: Any) -> str: """构建 HTML 中工具卡片的折叠标题。""" normalized_tool_call = cls.format_tool_call_for_display(tool_call) tool_name = str(normalized_tool_call.get("name") or "").strip() return tool_name or "unknown" @classmethod def _build_tool_call_html(cls, tool_call: Any) -> str: """将单个工具调用渲染为默认折叠的 HTML 卡片。""" normalized_tool_call = cls.format_tool_call_for_display(tool_call) tool_name = cls._build_tool_card_title(tool_call) tool_call_id = str(normalized_tool_call.get("id") or "").strip() tool_arguments = normalized_tool_call.get("arguments") tool_meta_html = "" if tool_call_id: tool_meta_html = ( "
" "调用 ID" f"{html.escape(tool_call_id)}" "
" ) return ( "
" "" f"{html.escape(tool_name)}" "" "
" f"{tool_meta_html}" f"
{html.escape(json.dumps(tool_arguments, ensure_ascii=False, indent=2, default=str))}
" "
" "
" ) @classmethod def _extract_tool_definition_fields(cls, tool_definition: dict[str, Any]) -> tuple[str, str, Any]: """提取工具定义中的名称、描述和详情内容。""" function_info = tool_definition.get("function") if isinstance(function_info, dict): tool_name = str(function_info.get("name") or "").strip() or "unknown" description = str(function_info.get("description") or "").strip() detail_payload = function_info else: tool_name = str(tool_definition.get("name") or "").strip() or "unknown" description = str(tool_definition.get("description") or "").strip() detail_payload = tool_definition return tool_name, description, detail_payload @classmethod def _build_tool_definition_html(cls, tool_definition: dict[str, Any]) -> str: """将单个传入工具定义渲染为默认折叠的 HTML 卡片。""" tool_name, description, detail_payload = cls._extract_tool_definition_fields(tool_definition) description_html = "" if description: description_html = ( "
" "说明" f"{html.escape(description)}" "
" ) return ( "
" "" f"{html.escape(tool_name)}" "" "
" f"{description_html}" f"
{html.escape(json.dumps(detail_payload, ensure_ascii=False, indent=2, default=str))}
" "
" "
" ) @classmethod def _render_tool_call_panel(cls, tool_call: Any, index: int, parent_index: int) -> Panel: title = Text.assemble( Text(" 工具调用 ", style="bold white on magenta"), Text(f" #{parent_index}.{index}", style="muted"), ) return Panel( Pretty(cls.format_tool_call_for_display(tool_call), expand_all=True), title=title, border_style="magenta", padding=(0, 1), ) @classmethod def _build_prompt_dump_text(cls, messages: list[Any]) -> str: sections: List[str] = [] for index, message in enumerate(messages, start=1): if isinstance(message, dict): raw_role = message.get("role", "unknown") content = message.get("content") tool_call_id = message.get("tool_call_id") tool_calls = message.get("tool_calls") or [] else: raw_role = getattr(message, "role", "unknown") content = getattr(message, "content", None) tool_call_id = getattr(message, "tool_call_id", None) tool_calls = getattr(message, "tool_calls", None) or [] role = raw_role.value if hasattr(raw_role, "value") else str(raw_role) block_lines = [f"[{index}] role={role}"] if tool_call_id: block_lines.append(f"tool_call_id={tool_call_id}") normalized_content = cls._serialize_message_content_for_dump(content) if normalized_content: block_lines.append("") block_lines.append(normalized_content) if tool_calls: block_lines.append("") block_lines.append("tool_calls:") for tool_call in tool_calls: normalized_tool_call = cls.format_tool_call_for_display(tool_call) block_lines.append(json.dumps(normalized_tool_call, ensure_ascii=False, indent=2, default=str)) sections.append("\n".join(block_lines).strip()) return "\n\n" + ("\n\n" + ("=" * 80) + "\n\n").join(sections) if sections else "[空 Prompt]" @classmethod def _build_tool_definition_dump_text(cls, tool_definitions: list[dict[str, Any]] | None) -> str: """构建传入工具定义的文本备份内容。""" if not tool_definitions: return "" sections: List[str] = ["[tool_definitions]"] for index, tool_definition in enumerate(tool_definitions, start=1): tool_name, _, detail_payload = cls._extract_tool_definition_fields(tool_definition) sections.append(f"[{index}] name={tool_name}") sections.append(json.dumps(detail_payload, ensure_ascii=False, indent=2, default=str)) return "\n\n".join(sections).strip() @classmethod def _render_message_content_html(cls, content: Any) -> str: if isinstance(content, str): return f"
{html.escape(content)}
" if isinstance(content, list): parts: List[str] = [] for item in content: if isinstance(item, str): parts.append(f"
{html.escape(item)}
") continue image_pair = cls._extract_image_pair(item) if image_pair is not None: image_format, image_base64 = image_pair image_html = cls._render_image_item_html(str(image_format), str(image_base64)) parts.append(image_html) continue if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): parts.append(f"
{html.escape(item['text'])}
") continue parts.append(f"
{html.escape(json.dumps(item, ensure_ascii=False, indent=2, default=str))}
") return "".join(parts) if parts else "
"

        if content is None:
            return "
"

        return f"
{html.escape(json.dumps(content, ensure_ascii=False, indent=2, default=str))}
" @classmethod def _render_image_item_html(cls, image_format: str, image_base64: str) -> str: normalized_format = cls._normalize_image_format(image_format) approx_size = max(0, len(image_base64) * 3 // 4) size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B" path_result = cls._build_image_file_link(image_format, image_base64) if path_result is None: return ( "
" f"
图片 image/{html.escape(normalized_format)} {html.escape(size_text)}
" "
" ) file_uri, file_path = path_result return ( "
" f"
图片 image/{html.escape(normalized_format)} {html.escape(size_text)}
" f"
{html.escape(str(file_path))}
" f"打开图片" "
" ) @classmethod def _build_html_role_class(cls, role: str) -> str: return { "system": "system", "user": "user", "assistant": "assistant", "tool": "tool", }.get(role, "unknown") @classmethod def _build_prompt_viewer_html( cls, messages: list[dict[str, Any]], *, request_kind: str, selection_reason: str, tool_definitions: list[dict[str, Any]] | None = None, ) -> str: panel_title, _ = cls.get_request_panel_style(request_kind) message_cards: List[str] = [] for index, message in enumerate(messages, start=1): raw_role = message.get("role", "unknown") role = raw_role.value if hasattr(raw_role, "value") else str(raw_role) role_label = cls._get_role_badge_label(role) role_class = cls._build_html_role_class(role) content_html = cls._render_message_content_html(message.get("content")) tool_call_id = message.get("tool_call_id") tool_call_html = "" if tool_call_id: tool_call_html = ( "
" "工具调用 ID" f"{html.escape(str(tool_call_id))}" "
" ) tool_panels = "" raw_tool_calls = message.get("tool_calls") or [] if isinstance(raw_tool_calls, list) and raw_tool_calls: tool_panels = ( "
" "
工具调用
" f"{''.join(cls._build_tool_call_html(tool_call) for tool_call in raw_tool_calls)}" "
" ) message_cards.append( "
" "
" f"{html.escape(role_label)}" f"#{index}" "
" f"
{content_html}
" f"{tool_call_html}" f"{tool_panels}" "
" ) subtitle_html = "" if selection_reason.strip(): subtitle_html = f"
{html.escape(selection_reason)}
" tool_definition_section_html = "" if tool_definitions: tool_definition_section_html = ( "
" "
" "全部工具" f"{len(tool_definitions)} 个" "
" "
" "
本次送入模型的工具定义
" f"{''.join(cls._build_tool_definition_html(tool_definition) for tool_definition in tool_definitions)}" "
" "
" ) return f""" {html.escape(panel_title)}
{html.escape(panel_title)}
{subtitle_html}
{''.join(message_cards)} {tool_definition_section_html}
""" @classmethod def build_prompt_access_panel( cls, messages: list[Any], *, category: str, chat_id: str, request_kind: str, selection_reason: str, image_display_mode: Literal["legacy", "path_link"], tool_definitions: list[dict[str, Any]] | None = None, ) -> RenderableType: """构建用于查看完整 prompt 的折叠入口内容。""" viewer_messages: list[dict[str, Any]] = [] for message in messages: if isinstance(message, dict): viewer_messages.append(dict(message)) continue normalized_message = { "content": getattr(message, "content", None), "role": getattr(getattr(message, "role", "unknown"), "value", getattr(message, "role", "unknown")), } tool_call_id = getattr(message, "tool_call_id", None) if tool_call_id: normalized_message["tool_call_id"] = tool_call_id tool_calls = getattr(message, "tool_calls", None) if tool_calls: normalized_message["tool_calls"] = [ cls.format_tool_call_for_display(tool_call) for tool_call in tool_calls ] viewer_messages.append(normalized_message) prompt_dump_text = cls._build_prompt_dump_text(messages) tool_definition_dump_text = cls._build_tool_definition_dump_text(tool_definitions) if tool_definition_dump_text: prompt_dump_text = f"{prompt_dump_text}\n\n{'=' * 80}\n\n{tool_definition_dump_text}" viewer_html_text = cls._build_prompt_viewer_html( viewer_messages, request_kind=request_kind, selection_reason=selection_reason, tool_definitions=tool_definitions, ) saved_paths = PromptPreviewLogger.save_preview_files( chat_id, category, { ".html": viewer_html_text, ".txt": prompt_dump_text, }, ) viewer_html_path = saved_paths[".html"] prompt_dump_path = saved_paths[".txt"] viewer_uri = cls._build_file_uri(viewer_html_path) dump_uri = cls._build_file_uri(prompt_dump_path) body = Group( Text.from_markup( f"[bold green]html预览:{viewer_html_path}[/bold green] " f"[link={viewer_uri}]在浏览器打开 Prompt [/link]" ), Text.from_markup( f"[magenta]原始文本:{prompt_dump_path}[/magenta] " f"[cyan][link={dump_uri}]点击打开 Prompt 文本[/link][/cyan]" ), ) return body @classmethod def build_prompt_section( cls, messages: list[Any], *, category: str, chat_id: str, request_kind: str, selection_reason: str, image_display_mode: Literal["legacy", "path_link"], folded: bool, tool_definitions: list[dict[str, Any]] | None = None, ) -> Panel: """构建用于嵌入结果面板中的 Prompt 区块。""" panel_title, panel_border_style = cls.get_request_panel_style(request_kind) if folded: prompt_renderable = cls.build_prompt_access_panel( messages, category=category, chat_id=chat_id, request_kind=request_kind, selection_reason=selection_reason, image_display_mode=image_display_mode, tool_definitions=tool_definitions, ) else: ordered_panels = cls.build_prompt_panels( messages, image_display_mode=image_display_mode, ) prompt_renderable = Group(*ordered_panels) return Panel( prompt_renderable, title=panel_title, subtitle=selection_reason, border_style=panel_border_style, padding=(0, 1), ) @classmethod def _build_text_preview_html( cls, content: str, *, request_kind: str, subtitle: str, ) -> str: panel_title, _ = cls.get_request_panel_style(request_kind) subtitle_html = f"
{html.escape(subtitle)}
" if subtitle.strip() else "" return f""" {html.escape(panel_title)}
{html.escape(panel_title)}
{subtitle_html}
{html.escape(content)}
""" @classmethod def build_text_access_panel( cls, content: str, *, category: str, chat_id: str, request_kind: str, subtitle: str, ) -> RenderableType: """构建文本型 Prompt 的折叠入口内容。""" html_content = cls._build_text_preview_html(content, request_kind=request_kind, subtitle=subtitle) saved_paths = PromptPreviewLogger.save_preview_files( chat_id, category, { ".html": html_content, ".txt": content, }, ) viewer_html_path = saved_paths[".html"] text_dump_path = saved_paths[".txt"] viewer_uri = cls._build_file_uri(viewer_html_path) dump_uri = cls._build_file_uri(text_dump_path) body = Group( Text.from_markup( f"[bold green]富文本预览:{viewer_html_path}[/bold green] " f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]" ), Text.from_markup( f"[magenta]原始文本备份:{text_dump_path}[/magenta] " f"[cyan][link={dump_uri}]点击直接打开 Prompt 文本[/link][/cyan]" ), ) return body @classmethod def build_text_section( cls, content: str, *, category: str, chat_id: str, request_kind: str, subtitle: str, folded: bool, ) -> Panel: """构建文本型 Prompt 的嵌入区块。""" panel_title, panel_border_style = cls.get_request_panel_style(request_kind) if folded: prompt_renderable = cls.build_text_access_panel( content, category=category, chat_id=chat_id, request_kind=request_kind, subtitle=subtitle, ) else: prompt_renderable = Text(content) return Panel( prompt_renderable, title=panel_title, subtitle=subtitle, border_style=panel_border_style, padding=(0, 1), ) @classmethod def _render_message_panel(cls, message: Any, index: int, settings: PromptImageDisplaySettings) -> _MessageRenderResult: if isinstance(message, dict): raw_role = message.get("role", "unknown") content = message.get("content") tool_call_id = message.get("tool_call_id") else: raw_role = getattr(message, "role", "unknown") content = getattr(message, "content", None) tool_call_id = getattr(message, "tool_call_id", None) role = raw_role.value if hasattr(raw_role, "value") else str(raw_role) title = Text.assemble( Text(f" {cls._get_role_badge_label(role)} ", style=cls._get_role_badge_style(role)), Text(f" #{index}", style="muted"), ) parts: List[RenderableType] = [] if content not in (None, "", []): parts.append(cls._render_message_content(content, settings)) if tool_call_id: parts.append( Text.assemble( Text(" 工具调用ID ", style="bold magenta"), Text(" "), Text(str(tool_call_id), style="magenta"), ) ) if not parts: parts.append(Text("[空]", style="muted")) message_panel = Panel( Group(*parts), title=title, border_style="dim", padding=(0, 1), ) tool_call_panels: List[Panel] = [] tool_calls = getattr(message, "tool_calls", None) if tool_calls: for tool_call_index, tool_call in enumerate(tool_calls, start=1): tool_call_panels.append(cls._render_tool_call_panel(tool_call, tool_call_index, index)) return _MessageRenderResult(message_panel=message_panel, tool_call_panels=tool_call_panels) @classmethod def build_prompt_panels( cls, messages: list[Any], *, image_display_mode: Literal["legacy", "path_link"], ) -> List[Panel]: """构建完整 prompt 可视化面板。""" if image_display_mode not in {mode.value for mode in PromptImageDisplayMode}: image_display_mode = PromptImageDisplayMode.LEGACY settings = PromptImageDisplaySettings( display_mode=PromptImageDisplayMode(image_display_mode), ) ordered_panels: List[Panel] = [] for index, message in enumerate(messages, start=1): message_render_result = cls._render_message_panel(message, index, settings) ordered_panels.append(message_render_result.message_panel) ordered_panels.extend(message_render_result.tool_call_panels) return ordered_panels