feat:修改图片展示模式,合并no_reply和stop

This commit is contained in:
SengokuCola
2026-04-01 14:26:03 +08:00
parent d713aa9576
commit 5b92942194
8 changed files with 427 additions and 71 deletions

View File

@@ -14,7 +14,7 @@
你可以使用这些工具:
- wait(seconds) - 暂时停止对话,等待(seconds)秒,把话语权交给用户,等待对方新的发言。
- no_reply() - 当你判断{bot_name}现在不应该发言,结束对话,不进行任何回复,直到对方有新消息。
- reply():当你判断{bot_name}现在应该正式对用户发出一条可见回复时调用。调用后系统会基于你当前这轮的想法生成一条真正展示给用户的回复。
- reply():当你判断{bot_name}现在应该正式对用户发出一条可见回复时调用。调用后系统会基于你当前这轮的想法生成一条真正展示给用户的回复。你可以针对某个用户回复,也可以对所有用户回复。
- query_jargon():当你认为某些词的含义不明确,或用户询问某些词的含义,需要进行查询
- 其他定义的工具,你可以视情况合适使用

View File

@@ -33,7 +33,6 @@ from src.maisaka.message_adapter import format_speaker_content
from src.maisaka.tool_handlers import (
ToolHandlerContext,
handle_mcp_tool,
handle_stop,
handle_unknown_tool,
handle_wait,
)
@@ -230,9 +229,8 @@ class BufferCLI:
Each round may produce internal thoughts and optionally call tools:
- reply(msg_id): generate a visible reply for the current round
- no_reply(): skip visible output and continue the loop
- no_reply(): pause the inner loop until a new user message arrives
- wait(seconds): wait for new user input
- stop(): stop the current inner loop and return to idle
"""
if self._chat_loop_service is None:
return
@@ -329,11 +327,7 @@ class BufferCLI:
tool_context = self._build_tool_context()
for tool_call in response.tool_calls:
if tool_call.func_name == "stop":
await handle_stop(tool_call, chat_history)
should_stop = True
elif tool_call.func_name == "reply":
if tool_call.func_name == "reply":
reply = await self._generate_visible_reply(chat_history, response.content or "")
chat_history.append(
ToolResultMessage(
@@ -354,15 +348,16 @@ class BufferCLI:
elif tool_call.func_name == "no_reply":
if global_config.maisaka.show_thinking:
console.print("[muted]本轮未发送可见回复。[/muted]")
console.print("[muted]对话已暂停,等待新的输入...[/muted]")
chat_history.append(
ToolResultMessage(
content="本轮未发送可见回复",
content="当前对话循环已暂停,等待新消息到来",
timestamp=datetime.now(),
tool_call_id=tool_call.call_id,
tool_name=tool_call.func_name,
)
)
should_stop = True
elif tool_call.func_name == "wait":
tool_result = await handle_wait(tool_call, chat_history, tool_context)

View File

@@ -1559,6 +1559,15 @@ class MaiSakaConfig(ConfigBase):
)
"""是否渲染低分辨率终端预览图片"""
terminal_image_display_mode: Literal["legacy", "path_link"] = Field(
default="legacy",
json_schema_extra={
"x-widget": "select",
"x-icon": "image",
},
)
"""图片展示模式legacy仅显示元信息/ path_link可点击本地路径"""
terminal_image_preview_width: int = Field(
default=24,
ge=8,

View File

@@ -73,7 +73,7 @@ def create_builtin_tool_specs() -> List[ToolSpec]:
},
"quote": {
"type": "boolean",
"description": "是否以引用回复的方式发送。",
"description": "当有非常明确的回复目标时,以引用回复的方式发送。",
"default": True,
},
"unknown_words": {

View File

@@ -0,0 +1,335 @@
"""CLI 下的 Prompt 可视化渲染模块。"""
from __future__ import annotations
import hashlib
from base64 import b64decode
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from pathlib import Path
from urllib.parse import quote
from typing import Any, Dict, List, Literal, Optional
import tempfile
from PIL import Image as PILImage
from pydantic import BaseModel, Field as PydanticField
from rich.console import Group, RenderableType
from rich.pretty import Pretty
from rich.panel import Panel
from rich.text import Text
class PromptImageDisplayMode(str, Enum):
    """How an image embedded in a prompt is presented in the terminal."""

    # Keep the original metadata-only rendering; no extra link is added.
    LEGACY = "legacy"
    # Persist the image to a temp directory and print a clickable path.
    PATH_LINK = "path_link"
class PromptImageDisplaySettings(BaseModel):
    """Validated image-display parameters for prompt visualization."""
    # Rendering strategy for embedded images (see PromptImageDisplayMode).
    display_mode: PromptImageDisplayMode = PydanticField(default=PromptImageDisplayMode.LEGACY)
    """图片展示模式。"""
    # Whether to render the low-resolution ASCII preview in the terminal.
    enable_terminal_preview: bool = PydanticField(default=False)
    """是否开启低分辨率终端 ASCII 预览。"""
    # Preview width in character cells; pydantic enforces >= 1.
    terminal_preview_width: int = PydanticField(default=24, ge=1)
    """终端预览宽度(字符数)。"""
@dataclass(slots=True)
class _MessageRenderResult:
    """Render result for one message: its panel plus any tool-call panels."""
    # Panel holding the role badge, content, and tool-call-id line.
    message_panel: Panel
    # One panel per tool call attached to the message (may be empty).
    tool_call_panels: List[Panel]
class PromptCLIVisualizer:
    """Builds every Rich renderable needed to visualize a prompt in the CLI.

    The class is a stateless namespace: all members are static or class
    methods, so callers never construct an instance.
    """

    # Dark-to-bright character ramp used by the ASCII terminal preview.
    _ASCII_CHARS = " .:-=+*#%@"

    @staticmethod
    def _get_role_badge_style(role: str) -> str:
        """Return the Rich style string for the badge of the given role."""
        if role == "system":
            return "bold white on blue"
        if role == "user":
            return "bold black on green"
        if role == "assistant":
            return "bold black on yellow"
        if role == "tool":
            return "bold white on magenta"
        return "bold white on bright_black"

    @staticmethod
    def _get_role_badge_label(role: str) -> str:
        """Return the (Chinese) badge label for the given message role."""
        if role == "system":
            return "系统"
        if role == "user":
            return "用户"
        if role == "assistant":
            return "助手"
        if role == "tool":
            return "工具"
        return "未知"

    @staticmethod
    def _format_token_count(token_count: int) -> str:
        """Format a token count compactly, e.g. 12345 -> '12.3k'."""
        if token_count >= 10_000:
            return f"{token_count / 1000:.1f}k"
        return str(token_count)

    @classmethod
    def build_prompt_stats_text(
        cls,
        *,
        selected_history_count: int,
        built_message_count: int,
        prompt_tokens: int,
        completion_tokens: int,
        total_tokens: int,
    ) -> str:
        """Build the one-line prompt statistics text shown in the CLI."""
        return (
            f"上下文消息数量={selected_history_count} "
            f"已构建消息数={built_message_count} "
            f"实际输入Token={cls._format_token_count(prompt_tokens)} "
            f"输出Token={cls._format_token_count(completion_tokens)} "
            f"总Token={cls._format_token_count(total_tokens)}"
        )

    @staticmethod
    def _normalize_image_format(image_format: str) -> str:
        """Normalize an image extension: trim, lowercase, map 'jpg' to 'jpeg'."""
        normalized = image_format.strip().lower()
        if normalized == "jpg":
            return "jpeg"
        return normalized

    @staticmethod
    def _build_terminal_image_preview(image_base64: str, *, preview_width: int) -> Optional[str]:
        """Build a low-resolution ASCII preview from a base64-encoded image.

        Returns None when the payload cannot be decoded or opened (best
        effort: the caller simply omits the preview).
        """
        try:
            image_bytes = b64decode(image_base64)
            with PILImage.open(BytesIO(image_bytes)) as image:
                grayscale = image.convert("L")
                width, height = grayscale.size
                if width <= 0 or height <= 0:
                    return None
                preview_width = max(8, preview_width)
                # The 0.5 factor compensates for terminal cells being roughly
                # twice as tall as they are wide.
                preview_height = max(1, int(height * (preview_width / width) * 0.5))
                resized = grayscale.resize((preview_width, preview_height))
                pixels = list(resized.tobytes())
        except Exception:
            # Malformed base64 / unsupported image format: skip the preview.
            return None
        rows: List[str] = []
        for row_index in range(preview_height):
            row_pixels = pixels[row_index * preview_width : (row_index + 1) * preview_width]
            row = "".join(
                PromptCLIVisualizer._ASCII_CHARS[min(len(PromptCLIVisualizer._ASCII_CHARS) - 1, pixel * len(PromptCLIVisualizer._ASCII_CHARS) // 256)]
                for pixel in row_pixels
            )
            rows.append(row)
        return "\n".join(rows)

    @staticmethod
    def _build_image_cache_path(image_format: str, image_base64: str) -> Path:
        """Return a content-addressed cache path under the system temp dir."""
        image_format = PromptCLIVisualizer._normalize_image_format(image_format)
        root = Path(tempfile.gettempdir()) / "maisaka_prompt_images"
        root.mkdir(parents=True, exist_ok=True)
        # SHA-256 of the base64 text keeps identical payloads at one path.
        digest = hashlib.sha256(image_base64.encode("utf-8")).hexdigest()
        return root / f"{digest}.{image_format}"

    @staticmethod
    def _build_file_uri(file_path: Path) -> str:
        """Return a clickable file:// URI for *file_path*.

        Uses ``Path.as_uri``, which percent-encodes correctly and handles
        Windows drive letters. The previous manual concatenation prefixed
        ``file:///`` onto absolute POSIX paths (which already start with
        ``/``), producing a malformed ``file:////...`` URI.
        """
        try:
            return file_path.as_uri()
        except ValueError:
            # as_uri() rejects relative paths; keep the legacy fallback.
            normalized = file_path.as_posix()
            return f"file:///{quote(normalized, safe='/:')}"

    @staticmethod
    def _build_image_file_link(image_format: str, image_base64: str) -> tuple[str, Path] | None:
        """Write the image into the temp cache and return (uri, path).

        Returns None when the base64 payload cannot be decoded or the file
        cannot be written; the caller then omits the link.
        """
        normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) or "bin"
        try:
            image_bytes = b64decode(image_base64)
        except Exception:
            return None
        path = PromptCLIVisualizer._build_image_cache_path(normalized_format, image_base64)
        if not path.exists():
            try:
                path.write_bytes(image_bytes)
            except Exception:
                return None
        return PromptCLIVisualizer._build_file_uri(path), path

    @classmethod
    def _render_image_item(cls, image_format: str, image_base64: str, settings: PromptImageDisplaySettings) -> Panel:
        """Render one embedded image as a panel per the display settings."""
        normalized_format = cls._normalize_image_format(image_format)
        # Rough decoded size: 4 base64 chars encode 3 bytes (padding ignored).
        approx_size = max(0, len(image_base64) * 3 // 4)
        size_text = f"{approx_size / 1024:.1f} KB" if approx_size >= 1024 else f"{approx_size} B"
        preview_parts: List[RenderableType] = [
            Text(f"图片格式 image/{normalized_format} {size_text}", style="magenta")
        ]
        if settings.display_mode == PromptImageDisplayMode.PATH_LINK:
            path_result = cls._build_image_file_link(image_format, image_base64)
            if path_result is not None:
                file_uri, file_path = path_result
                preview_parts.append(Text.from_markup(f"\n[link={file_uri}]点击打开图片[/link]", style="cyan"))
                preview_parts.append(Text(f"\n{file_path}", style="dim"))
        if settings.enable_terminal_preview:
            preview_text = cls._build_terminal_image_preview(
                image_base64,
                preview_width=settings.terminal_preview_width,
            )
            if preview_text:
                preview_parts.append(Text(preview_text, style="white"))
        return Panel(
            Group(*preview_parts),
            border_style="magenta",
            padding=(0, 1),
        )

    @classmethod
    def _render_message_content(cls, content: Any, settings: PromptImageDisplaySettings) -> RenderableType:
        """Render message content: str, list of mixed parts, None, or other."""
        if isinstance(content, str):
            return Text(content)
        if isinstance(content, list):
            parts: List[RenderableType] = []
            for item in content:
                if isinstance(item, str):
                    parts.append(Text(item))
                    continue
                # A (format, base64) string pair is treated as an inline image.
                if isinstance(item, tuple) and len(item) == 2:
                    image_format, image_base64 = item
                    if isinstance(image_format, str) and isinstance(image_base64, str):
                        parts.append(cls._render_image_item(image_format, image_base64, settings))
                        continue
                if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str):
                    parts.append(Text(item["text"]))
                else:
                    # Anything unrecognized is pretty-printed verbatim.
                    parts.append(Pretty(item, expand_all=True))
            return Group(*parts) if parts else Text("")
        if content is None:
            return Text("")
        return Pretty(content, expand_all=True)

    @classmethod
    def format_tool_call_for_display(cls, tool_call: Any) -> Dict[str, Any]:
        """Normalize a tool call (dict or object form) into a display dict."""
        if isinstance(tool_call, dict):
            function_info = tool_call.get("function", {})
            return {
                "id": tool_call.get("id"),
                "name": function_info.get("name", tool_call.get("name")),
                "arguments": function_info.get("arguments", tool_call.get("arguments")),
            }
        # Object form: tolerate both call_id/func_name/args and id/name/arguments.
        return {
            "id": getattr(tool_call, "call_id", getattr(tool_call, "id", None)),
            "name": getattr(tool_call, "func_name", getattr(tool_call, "name", None)),
            "arguments": getattr(tool_call, "args", getattr(tool_call, "arguments", None)),
        }

    @classmethod
    def _render_tool_call_panel(cls, tool_call: Any, index: int, parent_index: int) -> Panel:
        """Render one tool call as a panel numbered '#parent.index'."""
        title = Text.assemble(
            Text(" 工具调用 ", style="bold white on magenta"),
            Text(f" #{parent_index}.{index}", style="muted"),
        )
        return Panel(
            Pretty(cls.format_tool_call_for_display(tool_call), expand_all=True),
            title=title,
            border_style="magenta",
            padding=(0, 1),
        )

    @classmethod
    def _render_message_panel(cls, message: Any, index: int, settings: PromptImageDisplaySettings) -> _MessageRenderResult:
        """Render one message (dict or object) plus its tool-call panels."""
        if isinstance(message, dict):
            raw_role = message.get("role", "unknown")
            content = message.get("content")
            tool_call_id = message.get("tool_call_id")
        else:
            raw_role = getattr(message, "role", "unknown")
            content = getattr(message, "content", None)
            tool_call_id = getattr(message, "tool_call_id", None)
        # Enum roles expose .value; anything else is stringified.
        role = raw_role.value if hasattr(raw_role, "value") else str(raw_role)
        title = Text.assemble(
            Text(f" {cls._get_role_badge_label(role)} ", style=cls._get_role_badge_style(role)),
            Text(f" #{index}", style="muted"),
        )
        parts: List[RenderableType] = []
        if content not in (None, "", []):
            parts.append(Text(" 内容 ", style="bold cyan"))
            parts.append(cls._render_message_content(content, settings))
        if tool_call_id:
            parts.append(
                Text.assemble(
                    Text(" 工具调用ID ", style="bold magenta"),
                    Text(" "),
                    Text(str(tool_call_id), style="magenta"),
                )
            )
        if not parts:
            parts.append(Text("[空]", style="muted"))
        message_panel = Panel(
            Group(*parts),
            title=title,
            border_style="dim",
            padding=(0, 1),
        )
        tool_call_panels: List[Panel] = []
        # NOTE(review): getattr on a dict message always yields None here, so
        # dict-form tool_calls are never rendered — confirm this is intended.
        tool_calls = getattr(message, "tool_calls", None)
        if tool_calls:
            for tool_call_index, tool_call in enumerate(tool_calls, start=1):
                tool_call_panels.append(cls._render_tool_call_panel(tool_call, tool_call_index, index))
        return _MessageRenderResult(message_panel=message_panel, tool_call_panels=tool_call_panels)

    @classmethod
    def build_prompt_panels(
        cls,
        messages: list[Any],
        *,
        image_display_mode: Literal["legacy", "path_link"],
        enable_terminal_image_preview: bool,
        terminal_image_preview_width: int,
    ) -> List[Panel]:
        """Build the full ordered list of panels visualizing *messages*."""
        # Unknown modes silently fall back to legacy rendering.
        if image_display_mode not in {mode.value for mode in PromptImageDisplayMode}:
            image_display_mode = PromptImageDisplayMode.LEGACY
        settings = PromptImageDisplaySettings(
            display_mode=PromptImageDisplayMode(image_display_mode),
            enable_terminal_preview=enable_terminal_image_preview,
            terminal_preview_width=terminal_image_preview_width,
        )
        ordered_panels: List[Panel] = []
        for index, message in enumerate(messages, start=1):
            message_render_result = cls._render_message_panel(message, index, settings)
            ordered_panels.append(message_render_result.message_panel)
            ordered_panels.extend(message_render_result.tool_call_panels)
        return ordered_panels

View File

@@ -67,7 +67,6 @@ class MaisakaReasoningEngine:
"query_jargon": self._invoke_query_jargon_tool,
"query_person_info": self._invoke_query_person_info_tool,
"wait": self._invoke_wait_tool,
"stop": self._invoke_stop_tool,
"send_emoji": self._invoke_send_emoji_tool,
}
@@ -132,6 +131,10 @@ class MaisakaReasoningEngine:
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
)
self._runtime._chat_history.append(response.raw_message)
if response.tool_calls:
@@ -570,7 +573,7 @@ class MaisakaReasoningEngine:
wait_seconds = invocation.arguments.get("seconds", 30)
return f"你让当前对话先等待 {wait_seconds} 秒。"
if invocation.tool_name == "stop":
if invocation.tool_name == "no_reply":
return "你暂停了当前对话循环,等待新的外部消息。"
if invocation.tool_name == "query_jargon":
@@ -760,7 +763,12 @@ class MaisakaReasoningEngine:
"""执行 no_reply 内置工具。"""
del context
return self._build_tool_success_result(invocation.tool_name, "本轮未发送可见回复。")
self._runtime._enter_stop_state()
return self._build_tool_success_result(
invocation.tool_name,
"当前对话循环已暂停,等待新消息到来。",
metadata={"pause_execution": True},
)
async def _invoke_query_jargon_tool(
self,
@@ -803,21 +811,6 @@ class MaisakaReasoningEngine:
metadata={"pause_execution": True},
)
async def _invoke_stop_tool(
self,
invocation: ToolInvocation,
context: Optional[ToolExecutionContext] = None,
) -> ToolExecutionResult:
"""执行 stop 内置工具。"""
del context
self._runtime._enter_stop_state()
return self._build_tool_success_result(
invocation.tool_name,
"当前对话循环已暂停,等待新消息到来。",
metadata={"pause_execution": True},
)
async def _invoke_send_emoji_tool(
self,
invocation: ToolInvocation,
@@ -1017,36 +1010,35 @@ class MaisakaReasoningEngine:
.order_by(col(PersonInfo.last_known_time).desc(), col(PersonInfo.id).desc())
.limit(limit)
).all()
persons: list[dict[str, Any]] = []
for record in records:
memory_points: list[str] = []
if record.memory_points:
try:
parsed_points = json.loads(record.memory_points)
if isinstance(parsed_points, list):
memory_points = [str(point).strip() for point in parsed_points if str(point).strip()]
except (json.JSONDecodeError, TypeError, ValueError):
memory_points = []
persons: list[dict[str, Any]] = []
for record in records:
memory_points: list[str] = []
if record.memory_points:
try:
parsed_points = json.loads(record.memory_points)
if isinstance(parsed_points, list):
memory_points = [str(point).strip() for point in parsed_points if str(point).strip()]
except (json.JSONDecodeError, TypeError, ValueError):
memory_points = []
persons.append(
{
"person_id": record.person_id,
"person_name": record.person_name or "",
"user_nickname": record.user_nickname,
"user_id": record.user_id,
"platform": record.platform,
"name_reason": record.name_reason or "",
"is_known": record.is_known,
"know_counts": record.know_counts,
"memory_points": memory_points[:20],
"last_known_time": (
record.last_known_time.isoformat() if record.last_known_time is not None else None
),
}
)
persons.append(
{
"person_id": record.person_id,
"person_name": record.person_name or "",
"user_nickname": record.user_nickname,
"user_id": record.user_id,
"platform": record.platform,
"name_reason": record.name_reason or "",
"is_known": record.is_known,
"know_counts": record.know_counts,
"memory_points": memory_points[:20],
"last_known_time": (
record.last_known_time.isoformat() if record.last_known_time is not None else None
),
}
)
return persons
return persons
def _query_related_knowledge(
self,

View File

@@ -5,6 +5,10 @@ from typing import Literal, Optional
import asyncio
import time
from rich.panel import Panel
from rich.text import Text
from src.cli.console import console
from src.chat.heart_flow.heartFC_utils import CycleDetail
from src.chat.message_receive.chat_manager import BotChatSession, chat_manager
from src.chat.message_receive.message import SessionMessage
@@ -431,6 +435,40 @@ class MaisakaHeartFlowChatting:
return GroupInfo(group_id=group_info.group_id, group_name=group_info.group_name)
@staticmethod
def _format_token_count(token_count: int) -> str:
"""格式化 token 数量展示文本。"""
if token_count >= 10_000:
return f"{token_count / 1000:.1f}k"
return str(token_count)
def _render_context_usage_panel(
self,
*,
selected_history_count: int,
prompt_tokens: int,
) -> None:
"""在终端展示当前聊天流的上下文占用情况。"""
if not global_config.maisaka.show_thinking:
return
session_name = chat_manager.get_session_name(self.session_id) or self.session_id
body = "\n".join(
[
f"聊天流: {session_name}",
f"Chat ID: {self.session_id}",
f"上下文占用: {selected_history_count}条 / {self._format_token_count(prompt_tokens)}",
]
)
console.print(
Panel(
Text(body),
title="MaiSaka 上下文占用",
border_style="bright_blue",
padding=(0, 1),
)
)
def _log_cycle_started(self, cycle_detail: CycleDetail, round_index: int) -> None:
logger.info(
f"{self.log_prefix} MaiSaka 轮次开始: 循环编号={cycle_detail.cycle_id} "

View File

@@ -32,19 +32,6 @@ class ToolHandlerContext:
self.last_user_input_time: Optional[datetime] = None
async def handle_stop(tc: ToolCall, chat_history: list[LLMContextMessage]) -> None:
"""处理 stop 工具。"""
console.print("[accent]调用工具: stop()[/accent]")
chat_history.append(
ToolResultMessage(
content="当前轮次结束后将停止对话循环。",
timestamp=datetime.now(),
tool_call_id=tc.call_id,
tool_name=tc.func_name,
)
)
async def handle_wait(tc: ToolCall, chat_history: list[LLMContextMessage], ctx: ToolHandlerContext) -> str:
"""处理 wait 工具。"""
seconds = (tc.args or {}).get("seconds", 30)