Merge branch 'r-dev' of https://github.com/A-Dawn/MaiBot into r-dev

This commit is contained in:
DawnARC
2026-04-13 13:10:02 +08:00
83 changed files with 3789 additions and 3693 deletions

View File

@@ -10,6 +10,8 @@ from src.llm_models.payload_content.tool_option import ToolDefinitionInput
from .context import BuiltinToolRuntimeContext
from .continue_tool import get_tool_spec as get_continue_tool_spec
from .continue_tool import handle_tool as handle_continue_tool
from .finish import get_tool_spec as get_finish_tool_spec
from .finish import handle_tool as handle_finish_tool
from .no_reply import get_tool_spec as get_no_reply_tool_spec
from .no_reply import handle_tool as handle_no_reply_tool
from .query_jargon import get_tool_spec as get_query_jargon_tool_spec
@@ -22,6 +24,8 @@ from .reply import get_tool_spec as get_reply_tool_spec
from .reply import handle_tool as handle_reply_tool
from .send_emoji import get_tool_spec as get_send_emoji_tool_spec
from .send_emoji import handle_tool as handle_send_emoji_tool
from .tool_search import get_tool_spec as get_tool_search_tool_spec
from .tool_search import handle_tool as handle_tool_search_tool
from .view_complex_message import get_tool_spec as get_view_complex_message_tool_spec
from .view_complex_message import handle_tool as handle_view_complex_message_tool
from .wait import get_tool_spec as get_wait_tool_spec
@@ -44,11 +48,13 @@ def get_action_tool_specs() -> List[ToolSpec]:
"""获取 Action Loop 阶段可用的内置工具声明。"""
return [
get_finish_tool_spec(),
get_reply_tool_spec(),
get_view_complex_message_tool_spec(),
get_query_jargon_tool_spec(),
get_query_memory_tool_spec(enabled=bool(global_config.memory.enable_memory_query_tool)),
get_send_emoji_tool_spec(),
get_tool_search_tool_spec(),
]
@@ -63,12 +69,14 @@ def get_all_builtin_tool_specs() -> List[ToolSpec]:
return [
*get_timing_tool_specs(),
get_finish_tool_spec(),
get_reply_tool_spec(),
get_view_complex_message_tool_spec(),
get_query_jargon_tool_spec(),
get_query_memory_tool_spec(enabled=True),
get_query_person_info_tool_spec(),
get_send_emoji_tool_spec(),
get_tool_search_tool_spec(),
]
@@ -95,6 +103,7 @@ def build_builtin_tool_handlers(tool_ctx: BuiltinToolRuntimeContext) -> Dict[str
return {
"continue": lambda invocation, context=None: handle_continue_tool(tool_ctx, invocation, context),
"finish": lambda invocation, context=None: handle_finish_tool(tool_ctx, invocation, context),
"reply": lambda invocation, context=None: handle_reply_tool(tool_ctx, invocation, context),
"no_reply": lambda invocation, context=None: handle_no_reply_tool(tool_ctx, invocation, context),
"query_jargon": lambda invocation, context=None: handle_query_jargon_tool(tool_ctx, invocation, context),
@@ -106,6 +115,7 @@ def build_builtin_tool_handlers(tool_ctx: BuiltinToolRuntimeContext) -> Dict[str
),
"wait": lambda invocation, context=None: handle_wait_tool(tool_ctx, invocation, context),
"send_emoji": lambda invocation, context=None: handle_send_emoji_tool(tool_ctx, invocation, context),
"tool_search": lambda invocation, context=None: handle_tool_search_tool(tool_ctx, invocation, context),
"view_complex_message": lambda invocation, context=None: handle_view_complex_message_tool(
tool_ctx,
invocation,

View File

@@ -0,0 +1,34 @@
"""finish 内置工具。"""
from typing import Optional
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from .context import BuiltinToolRuntimeContext
def get_tool_spec() -> ToolSpec:
    """Build the declaration for the builtin ``finish`` tool."""
    spec = ToolSpec(
        name="finish",
        brief_description="结束本轮思考,等待后续新的外部消息再继续。",
        provider_name="maisaka_builtin",
        provider_type="builtin",
    )
    return spec
async def handle_tool(
    tool_ctx: BuiltinToolRuntimeContext,
    invocation: ToolInvocation,
    context: Optional[ToolExecutionContext] = None,
) -> ToolExecutionResult:
    """Execute the builtin ``finish`` tool.

    Puts the runtime into its stop state and reports success with a
    ``pause_execution`` metadata flag.
    """
    del context  # unused; accepted for the uniform handler signature
    # NOTE(review): calls a private runtime method — confirm this is the
    # intended entry point for stopping the loop.
    tool_ctx.runtime._enter_stop_state()
    result_text = "当前对话循环已结束本轮思考,等待新的消息到来。"
    return tool_ctx.build_success_result(
        invocation.tool_name,
        result_text,
        metadata={"pause_execution": True},
    )

View File

@@ -29,6 +29,6 @@ async def handle_tool(
tool_ctx.runtime._enter_stop_state()
return tool_ctx.build_success_result(
invocation.tool_name,
"当前对话循环已暂停,等待新消息到来。",
"当前暂时停止思考,等待新消息到来。",
metadata={"pause_execution": True},
)

View File

@@ -91,10 +91,6 @@ async def handle_tool(
f"未找到要回复的目标消息msg_id={target_message_id}",
)
logger.info(
f"{tool_ctx.runtime.log_prefix} 已触发回复工具,"
f"目标消息编号={target_message_id} 引用回复={set_quote} 最新思考={latest_thought!r}"
)
try:
replyer = replyer_manager.get_replyer(
chat_stream=tool_ctx.runtime.chat_stream,

View File

@@ -2,11 +2,11 @@
from datetime import datetime
from io import BytesIO
import math
from random import sample
from typing import Any, Dict, Optional
import asyncio
import math
from PIL import Image as PILImage
from PIL import ImageDraw, ImageFont
@@ -20,12 +20,14 @@ from src.common.logger import get_logger
from src.config.config import global_config
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from src.llm_models.payload_content.resp_format import RespFormat, RespFormatType
from src.llm_models.payload_content.message import MessageBuilder, RoleType
from src.maisaka.context_messages import (
LLMContextMessage,
ReferenceMessage,
ReferenceMessageType,
SessionBackedMessage,
)
from src.plugin_runtime.hook_payloads import serialize_prompt_messages
from .context import BuiltinToolRuntimeContext
@@ -242,34 +244,9 @@ def _build_emoji_candidate_summary(emojis: list[MaiEmoji]) -> str:
return "\n".join(summary_lines).strip()
def _build_send_emoji_prompt_preview(
*,
system_prompt: str,
requested_emotion: str,
grid_rows: int,
grid_columns: int,
sampled_emojis: list[MaiEmoji],
) -> str:
"""构建表情选择子代理的文本预览。"""
task_text = (
"[选择任务]\n"
f"requested_emotion: {requested_emotion or '未指定'}\n"
f"候选总数: {len(sampled_emojis)}\n"
f"拼图布局: {grid_rows}x{grid_columns}\n"
"请只输出 JSON。"
)
candidate_summary = _build_emoji_candidate_summary(sampled_emojis)
return (
f"[System Prompt]\n{system_prompt}\n\n"
f"{task_text}\n\n"
f"[候选表情摘要]\n{candidate_summary or '无候选表情'}"
).strip()
def _build_send_emoji_monitor_detail(
*,
prompt_text: str = "",
request_messages: Optional[list[dict[str, Any]]] = None,
reasoning_text: str = "",
output_text: str = "",
metrics: Optional[Dict[str, Any]] = None,
@@ -278,8 +255,8 @@ def _build_send_emoji_monitor_detail(
"""构建 emotion tool 统一监控详情。"""
detail: Dict[str, Any] = {}
if prompt_text.strip():
detail["prompt_text"] = prompt_text.strip()
if isinstance(request_messages, list) and request_messages:
detail["request_messages"] = request_messages
if reasoning_text.strip():
detail["reasoning_text"] = reasoning_text.strip()
if output_text.strip():
@@ -387,13 +364,16 @@ async def _select_emoji_with_sub_agent(
remaining_uses_value=1,
display_prefix="[表情包选择任务]",
)
prompt_preview = _build_send_emoji_prompt_preview(
system_prompt=system_prompt,
requested_emotion=requested_emotion,
grid_rows=grid_rows,
grid_columns=grid_columns,
sampled_emojis=sampled_emojis,
)
request_messages = [
MessageBuilder().set_role(RoleType.System).add_text_content(system_prompt).build(),
]
prompt_llm_message = prompt_message.to_llm_message()
if prompt_llm_message is not None:
request_messages.append(prompt_llm_message)
candidate_llm_message = candidate_message.to_llm_message()
if candidate_llm_message is not None:
request_messages.append(candidate_llm_message)
serialized_request_messages = serialize_prompt_messages(request_messages)
selection_started_at = datetime.now()
response = await tool_ctx.runtime.run_sub_agent(
@@ -421,7 +401,7 @@ async def _select_emoji_with_sub_agent(
logger.warning(f"{tool_ctx.runtime.log_prefix} 表情包子代理结果解析失败,将回退到候选首项: {exc}")
if selection_metadata is not None:
selection_metadata["monitor_detail"] = _build_send_emoji_monitor_detail(
prompt_text=prompt_preview,
request_messages=serialized_request_messages,
output_text=response.content or "",
metrics=selection_metrics,
extra_sections=[{
@@ -435,7 +415,7 @@ async def _select_emoji_with_sub_agent(
if selection_metadata is not None:
selection_metadata["reason"] = selection.reason.strip()
selection_metadata["monitor_detail"] = _build_send_emoji_monitor_detail(
prompt_text=prompt_preview,
request_messages=serialized_request_messages,
reasoning_text=selection.reason,
output_text=response.content or "",
metrics=selection_metrics,

View File

@@ -0,0 +1,106 @@
"""tool_search 内置工具。"""
from typing import Any, Dict, List, Optional
import json
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from .context import BuiltinToolRuntimeContext
def get_tool_spec() -> ToolSpec:
    """Build the declaration for the builtin ``tool_search`` tool."""
    # JSON-schema for the tool's arguments: a required query string and an
    # optional positive integer limit.
    parameter_schema: Dict[str, Any] = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "要搜索的工具名、前缀或关键词。",
            },
            "limit": {
                "type": "integer",
                "description": "最多返回多少个匹配工具。",
                "minimum": 1,
            },
        },
        "required": ["query"],
    }
    return ToolSpec(
        name="tool_search",
        brief_description="在 deferred tools 列表中按名称或关键词搜索工具,并将命中的工具加入后续轮次的可用工具列表。",
        detailed_description=(
            "参数说明:\n"
            "- queryString必填。工具名、前缀或关键词。\n"
            "- limitInteger可选。最多返回多少个匹配工具默认为 5。"
        ),
        parameters_schema=parameter_schema,
        provider_name="maisaka_builtin",
        provider_type="builtin",
    )
async def handle_tool(
    tool_ctx: BuiltinToolRuntimeContext,
    invocation: ToolInvocation,
    context: Optional[ToolExecutionContext] = None,
) -> ToolExecutionResult:
    """Execute the builtin ``tool_search`` tool.

    Searches the deferred-tool specs by name/prefix/keyword and registers any
    hits so they become available in subsequent turns.
    """
    del context  # unused; accepted for the uniform handler signature

    query_argument = invocation.arguments.get("query")
    if not isinstance(query_argument, str) or not query_argument.strip():
        return tool_ctx.build_failure_result(
            invocation.tool_name,
            "tool_search 需要提供非空的 `query` 字符串参数。",
        )

    # Coerce the optional limit to a positive int; fall back to 5 on bad input.
    limit_argument = invocation.arguments.get("limit", 5)
    try:
        effective_limit = max(1, int(limit_argument))
    except (TypeError, ValueError):
        effective_limit = 5

    hits = tool_ctx.runtime.search_deferred_tool_specs(query_argument, limit=effective_limit)
    hit_names = [spec.name for spec in hits]
    fresh_names = tool_ctx.runtime.discover_deferred_tools(hit_names)

    structured_content: Dict[str, Any] = {
        "query": query_argument.strip(),
        "matched_tool_names": hit_names,
        "newly_discovered_tool_names": fresh_names,
    }

    if not hit_names:
        return tool_ctx.build_success_result(
            invocation.tool_name,
            "未找到匹配的 deferred tools请尝试更完整的工具名、前缀或其他关键词。",
            structured_content=structured_content,
            metadata={"record_display_prompt": "tool_search 未找到匹配工具。"},
        )

    summary_lines: List[str] = [
        f"已找到 {len(hit_names)} 个 deferred tools它们会在后续轮次中加入可用工具列表"
    ]
    summary_lines.extend(f"- {name}" for name in hit_names)
    summary_lines.append("")
    if fresh_names:
        summary_lines.append("本次新发现的工具:")
        summary_lines.extend(f"- {name}" for name in fresh_names)
    else:
        summary_lines.append("这些工具此前已经发现过,无需重复展开。")

    return tool_ctx.build_success_result(
        invocation.tool_name,
        "\n".join(summary_lines),
        structured_content=structured_content,
        metadata={
            "matched_tool_names": hit_names,
            "newly_discovered_tool_names": fresh_names,
            "record_display_prompt": json.dumps(structured_content, ensure_ascii=False),
        },
    )

View File

@@ -12,8 +12,8 @@ def get_tool_spec() -> ToolSpec:
return ToolSpec(
name="wait",
brief_description="暂停当前对话并等待用户新的输入",
detailed_description="参数说明:\n- secondsinteger必填。等待的秒数。",
brief_description="暂停当前对话并固定等待一段时间,期间不因新消息提前恢复",
detailed_description="参数说明:\n- secondsinteger必填。等待的秒数。等待期间收到的新消息只会暂存,直到超时后再继续处理。",
parameters_schema={
"type": "object",
"properties": {
@@ -46,6 +46,6 @@ async def handle_tool(
tool_ctx.runtime._enter_wait_state(seconds=wait_seconds, tool_call_id=invocation.call_id)
return tool_ctx.build_success_result(
invocation.tool_name,
f"当前对话循环进入等待状态,最长等待 {wait_seconds} 秒。",
f"当前对话循环进入等待状态,将固定等待 {wait_seconds};期间收到的新消息不会提前打断本次等待",
metadata={"pause_execution": True},
)

View File

@@ -5,20 +5,18 @@ from datetime import datetime
from typing import Any, List, Optional, Sequence
import asyncio
import json
import random
from pydantic import BaseModel, Field as PydanticField
from rich.console import RenderableType
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
from src.common.logger import get_logger
from src.common.prompt_i18n import load_prompt
from src.common.utils.utils_session import SessionUtils
from src.config.config import global_config
from src.core.tooling import ToolRegistry, ToolSpec
from src.core.tooling import ToolRegistry
from src.llm_models.model_client.base_client import BaseClient
from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType
from src.llm_models.payload_content.resp_format import RespFormat, RespFormatType
from src.llm_models.payload_content.resp_format import RespFormat
from src.llm_models.payload_content.tool_option import ToolCall, ToolDefinitionInput, ToolOption, normalize_tool_options
from src.plugin_runtime.hook_payloads import (
deserialize_prompt_messages,
@@ -32,9 +30,11 @@ from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistr
from src.services.llm_service import LLMServiceClient
from .builtin_tool import get_builtin_tools
from .context_messages import AssistantMessage, LLMContextMessage
from .context_messages import AssistantMessage, LLMContextMessage, ToolResultMessage
from .history_utils import drop_orphan_tool_results
from .prompt_cli_renderer import PromptCLIVisualizer
from .display.prompt_cli_renderer import PromptCLIVisualizer
TIMING_GATE_TOOL_NAMES = {"continue", "no_reply", "wait"}
@dataclass(slots=True)
@@ -54,13 +54,6 @@ class ChatResponse:
prompt_section: Optional[RenderableType] = None
class ToolFilterSelection(BaseModel):
"""工具筛选响应。"""
selected_tool_names: list[str] = PydanticField(default_factory=list)
"""经过预筛后保留的候选工具名称列表。"""
logger = get_logger("maisaka_chat_loop")
@@ -217,10 +210,6 @@ class MaisakaChatLoopService:
else:
self._chat_system_prompt = chat_system_prompt
self._llm_chat = LLMServiceClient(task_name="planner", request_type="maisaka_planner")
self._tool_filter_llm = LLMServiceClient(
task_name=global_config.maisaka.tool_filter_task_name,
request_type="maisaka_tool_filter",
)
@property
def personality_prompt(self) -> str:
@@ -303,8 +292,15 @@ class MaisakaChatLoopService:
"file_tools_section": tools_section,
"group_chat_attention_block": self._build_group_chat_attention_block(),
"identity": self._personality_prompt,
"time_block": self._build_time_block(),
}
@staticmethod
def _build_time_block() -> str:
    """Build the current-time prompt block, formatted as YYYY-MM-DD HH:MM:SS."""
    return f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
def _build_group_chat_attention_block(self) -> str:
"""构建当前聊天场景下的额外注意事项块。"""
@@ -399,6 +395,7 @@ class MaisakaChatLoopService:
self,
selected_history: List[LLMContextMessage],
*,
injected_user_messages: Sequence[str] | None = None,
system_prompt: Optional[str] = None,
) -> List[Message]:
"""构造发给大模型的消息列表。
@@ -420,254 +417,49 @@ class MaisakaChatLoopService:
if llm_message is not None:
messages.append(llm_message)
normalized_injected_messages: List[Message] = []
for injected_message in injected_user_messages or []:
normalized_message = str(injected_message or "").strip()
if not normalized_message:
continue
normalized_injected_messages.append(
MessageBuilder()
.set_role(RoleType.User)
.add_text_content(normalized_message)
.build()
)
if normalized_injected_messages:
insertion_index = self._resolve_injected_user_messages_insertion_index(messages)
messages[insertion_index:insertion_index] = normalized_injected_messages
return messages
@staticmethod
def _is_builtin_tool_spec(tool_spec: ToolSpec) -> bool:
"""判断一个工具是否属于默认内置工具
def _resolve_injected_user_messages_insertion_index(messages: Sequence[Message]) -> int:
"""计算 injected meta user messages 在请求中的插入位置
Args:
tool_spec: 待判断的工具声明。
Returns:
bool: 是否为默认内置工具
规则与 deferred attachment 更接近:
- 从尾部向前寻找最近的 stopping point
- stopping point 为 assistant 消息或 tool 结果消息;
- 找到后插入到其后面;
- 若不存在 stopping point则退回到 system 消息之后
"""
return tool_spec.provider_type == "builtin" or tool_spec.provider_name == "maisaka_builtin"
for index in range(len(messages) - 1, -1, -1):
message = messages[index]
if message.role in {RoleType.Assistant, RoleType.Tool}:
return index + 1
@classmethod
def _split_builtin_and_candidate_tools(
cls,
tool_specs: List[ToolSpec],
) -> tuple[List[ToolSpec], List[ToolSpec]]:
"""拆分内置工具与可筛选工具列表。
Args:
tool_specs: 当前全部工具声明。
Returns:
tuple[List[ToolSpec], List[ToolSpec]]: `(内置工具, 可筛选工具)`。
"""
builtin_tool_specs: List[ToolSpec] = []
candidate_tool_specs: List[ToolSpec] = []
for tool_spec in tool_specs:
if cls._is_builtin_tool_spec(tool_spec):
builtin_tool_specs.append(tool_spec)
else:
candidate_tool_specs.append(tool_spec)
return builtin_tool_specs, candidate_tool_specs
@staticmethod
def _truncate_tool_filter_text(text: str, max_length: int = 180) -> str:
"""截断工具筛选阶段展示的文本。
Args:
text: 原始文本。
max_length: 最长保留字符数。
Returns:
str: 截断后的文本。
"""
normalized_text = text.strip()
if len(normalized_text) <= max_length:
return normalized_text
return f"{normalized_text[: max_length - 1]}"
def _build_tool_filter_prompt(
self,
selected_history: List[LLMContextMessage],
candidate_tool_specs: List[ToolSpec],
max_keep: int,
) -> str:
"""构造小模型工具预筛选提示词。
Args:
selected_history: 已选中的对话上下文。
candidate_tool_specs: 非内置候选工具列表。
max_keep: 最多保留的候选工具数量。
Returns:
str: 用于工具预筛的小模型提示词。
"""
history_lines: List[str] = []
for message in selected_history[-10:]:
plain_text = message.processed_plain_text.strip()
if not plain_text:
continue
history_lines.append(
f"- {message.role}: {self._truncate_tool_filter_text(plain_text, max_length=200)}"
)
if history_lines:
history_section = "\n".join(history_lines)
else:
history_section = "- 当前没有可用的对话上下文。"
tool_lines = [
f"- {tool_spec.name}: {tool_spec.brief_description.strip() or '无简要描述'}"
for tool_spec in candidate_tool_specs
]
tool_section = "\n".join(tool_lines) if tool_lines else "- 当前没有候选工具。"
return (
"你是 Maisaka 的工具预筛选器。\n"
"你的任务是在正式进入 planner 前,根据当前情景从候选工具中挑出最可能马上会用到的工具。\n"
"默认内置工具已经自动保留,不在候选列表中,你不需要再次选择它们。\n"
"你只能参考工具的简要描述,不要假设未描述的隐藏能力。\n"
f"最多保留 {max_keep} 个候选工具;如果都不合适,可以返回空数组。\n"
"请严格返回 JSON 对象,格式为:"
'{"selected_tool_names":["工具名1","工具名2"]}\n\n'
f"【最近对话】\n{history_section}\n\n"
f"【候选工具(仅简要描述)】\n{tool_section}"
)
@staticmethod
def _parse_tool_filter_response(
response_text: str,
candidate_tool_specs: List[ToolSpec],
max_keep: int,
) -> List[ToolSpec] | None:
"""解析工具预筛选响应。
Args:
response_text: 小模型返回的原始文本。
candidate_tool_specs: 非内置候选工具列表。
max_keep: 最多保留的候选工具数量。
Returns:
List[ToolSpec] | None: 成功解析时返回筛选后的工具列表;解析失败时返回 ``None``。
"""
normalized_response = response_text.strip()
if not normalized_response:
return None
selected_tool_names: List[str]
try:
selected_tool_names = ToolFilterSelection.model_validate_json(normalized_response).selected_tool_names
except Exception:
try:
parsed_payload = json.loads(normalized_response)
except json.JSONDecodeError:
return None
if isinstance(parsed_payload, dict):
raw_tool_names = parsed_payload.get("selected_tool_names", [])
elif isinstance(parsed_payload, list):
raw_tool_names = parsed_payload
else:
return None
if not isinstance(raw_tool_names, list):
return None
selected_tool_names = []
for item in raw_tool_names:
normalized_name = str(item).strip()
if normalized_name:
selected_tool_names.append(normalized_name)
candidate_map = {tool_spec.name: tool_spec for tool_spec in candidate_tool_specs}
filtered_tool_specs: List[ToolSpec] = []
seen_names: set[str] = set()
for tool_name in selected_tool_names:
normalized_name = tool_name.strip()
if not normalized_name or normalized_name in seen_names:
continue
tool_spec = candidate_map.get(normalized_name)
if tool_spec is None:
continue
seen_names.add(normalized_name)
filtered_tool_specs.append(tool_spec)
if len(filtered_tool_specs) >= max_keep:
break
return filtered_tool_specs
async def _filter_tool_specs_for_planner(
self,
selected_history: List[LLMContextMessage],
tool_specs: List[ToolSpec],
) -> List[ToolSpec]:
"""在将工具交给 planner 前进行快速预筛选。
Args:
selected_history: 已选中的对话上下文。
tool_specs: 当前全部可用工具声明。
Returns:
List[ToolSpec]: 最终交给 planner 的工具声明列表。
"""
threshold = max(1, int(global_config.maisaka.tool_filter_threshold))
max_keep = max(1, int(global_config.maisaka.tool_filter_max_keep))
if len(tool_specs) <= threshold:
return tool_specs
builtin_tool_specs, candidate_tool_specs = self._split_builtin_and_candidate_tools(tool_specs)
if not candidate_tool_specs:
return tool_specs
if len(candidate_tool_specs) <= max_keep:
return [*builtin_tool_specs, *candidate_tool_specs]
filter_prompt = self._build_tool_filter_prompt(selected_history, candidate_tool_specs, max_keep)
logger.info(
"工具预筛选开始: "
f"总工具数={len(tool_specs)} "
f"内置工具数={len(builtin_tool_specs)} "
f"候选工具数={len(candidate_tool_specs)} "
f"最多保留候选数={max_keep}"
)
try:
generation_result = await self._tool_filter_llm.generate_response(
prompt=filter_prompt,
options=LLMGenerationOptions(
temperature=0.0,
max_tokens=256,
response_format=RespFormat(
format_type=RespFormatType.JSON_SCHEMA,
schema=ToolFilterSelection,
),
),
)
except Exception as exc:
logger.warning(f"工具预筛选失败,保留全部工具。错误={exc}")
return tool_specs
filtered_candidate_tool_specs = self._parse_tool_filter_response(
generation_result.response or "",
candidate_tool_specs,
max_keep,
)
if filtered_candidate_tool_specs is None:
logger.warning(
"工具预筛选返回结果无法解析,保留全部工具。"
f" 原始返回={generation_result.response or ''!r}"
)
return tool_specs
filtered_tool_specs = [*builtin_tool_specs, *filtered_candidate_tool_specs]
if not filtered_tool_specs:
logger.warning("工具预筛选得到空结果,保留全部工具以避免主流程失去工具能力。")
return tool_specs
logger.info(
"工具预筛选完成: "
f"筛选前总数={len(tool_specs)} "
f"筛选后总数={len(filtered_tool_specs)} "
f"保留候选工具={[tool_spec.name for tool_spec in filtered_candidate_tool_specs]}"
)
return filtered_tool_specs
if messages and messages[0].role == RoleType.System:
return 1
return 0
async def chat_loop_step(
self,
chat_history: List[LLMContextMessage],
*,
injected_user_messages: Sequence[str] | None = None,
request_kind: str = "planner",
response_format: RespFormat | None = None,
tool_definitions: Sequence[ToolDefinitionInput] | None = None,
@@ -683,8 +475,14 @@ class MaisakaChatLoopService:
if not self._prompts_loaded:
await self.ensure_chat_prompt_loaded()
selected_history, selection_reason = self.select_llm_context_messages(chat_history)
built_messages = self._build_request_messages(selected_history)
selected_history, selection_reason = self.select_llm_context_messages(
chat_history,
request_kind=request_kind,
)
built_messages = self._build_request_messages(
selected_history,
injected_user_messages=injected_user_messages,
)
def message_factory(_client: BaseClient) -> List[Message]:
"""返回当前轮次已经构建好的请求消息。
@@ -704,8 +502,7 @@ class MaisakaChatLoopService:
all_tools = list(tool_definitions)
elif self._tool_registry is not None:
tool_specs = await self._tool_registry.list_tools()
filtered_tool_specs = await self._filter_tool_specs_for_planner(selected_history, tool_specs)
all_tools = [tool_spec.to_llm_definition() for tool_spec in filtered_tool_specs]
all_tools = [tool_spec.to_llm_definition() for tool_spec in tool_specs]
else:
all_tools = [*get_builtin_tools(), *self._extra_tools]
@@ -740,15 +537,9 @@ class MaisakaChatLoopService:
selection_reason=selection_reason,
image_display_mode=image_display_mode,
folded=global_config.debug.fold_maisaka_thinking,
tool_definitions=list(all_tools),
)
logger.info(
"规划器请求开始: "
f"已选上下文消息数={len(selected_history)} "
f"大模型消息数={len(built_messages)} "
f"工具数={len(all_tools)} "
f"启用打断={self._interrupt_flag is not None}"
)
generation_result = await self._llm_chat.generate_response_with_messages(
message_factory=message_factory,
options=LLMGenerationOptions(
@@ -760,15 +551,6 @@ class MaisakaChatLoopService:
),
)
prompt_stats_text = PromptCLIVisualizer.build_prompt_stats_text(
selected_history_count=len(selected_history),
built_message_count=len(built_messages),
prompt_tokens=generation_result.prompt_tokens,
completion_tokens=generation_result.completion_tokens,
total_tokens=generation_result.total_tokens,
)
logger.info(f"本轮Prompt统计: {prompt_stats_text}")
final_response = generation_result.response or ""
final_tool_calls = list(generation_result.tool_calls or [])
after_response_result = await self._get_runtime_manager().invoke_hook(
@@ -822,16 +604,21 @@ class MaisakaChatLoopService:
def select_llm_context_messages(
chat_history: List[LLMContextMessage],
*,
request_kind: str = "planner",
max_context_size: Optional[int] = None,
) -> tuple[List[LLMContextMessage], str]:
"""??????? LLM ???????"""
"""选择LLM上下文消息"""
filtered_history = MaisakaChatLoopService._filter_history_for_request_kind(
chat_history,
request_kind=request_kind,
)
effective_context_size = max(1, int(max_context_size or global_config.chat.max_context_size))
selected_indices: List[int] = []
counted_message_count = 0
for index in range(len(chat_history) - 1, -1, -1):
message = chat_history[index]
for index in range(len(filtered_history) - 1, -1, -1):
message = filtered_history[index]
if message.to_llm_message() is None:
continue
@@ -842,10 +629,10 @@ class MaisakaChatLoopService:
break
if not selected_indices:
return [], f"???????? {effective_context_size} ? user/assistant??? 0 ??"
return [], f"没有选择到上下文消息,实际发送 {effective_context_size} user/assistant 消息"
selected_indices.reverse()
selected_history = [chat_history[index] for index in selected_indices]
selected_history = [filtered_history[index] for index in selected_indices]
selected_history, hidden_assistant_count = MaisakaChatLoopService._hide_early_assistant_messages(selected_history)
selected_history, _ = drop_orphan_tool_results(selected_history)
selection_reason = (
@@ -860,45 +647,43 @@ class MaisakaChatLoopService:
)
@staticmethod
def _select_llm_context_messages(chat_history: List[LLMContextMessage]) -> tuple[List[LLMContextMessage], str]:
"""选择真正发送给 LLM 的上下文消息。
def _filter_history_for_request_kind(
selected_history: List[LLMContextMessage],
*,
request_kind: str,
) -> List[LLMContextMessage]:
"""按请求类型过滤不应暴露的历史工具链。"""
Args:
chat_history: 当前全部对话历史。
if request_kind != "planner":
return selected_history
Returns:
tuple[List[LLMContextMessage], str]: `(已选上下文, 选择说明)`。
"""
max_context_size = max(1, int(global_config.chat.max_context_size))
selected_indices: List[int] = []
counted_message_count = 0
for index in range(len(chat_history) - 1, -1, -1):
message = chat_history[index]
if message.to_llm_message() is None:
filtered_history: List[LLMContextMessage] = []
for message in selected_history:
if isinstance(message, ToolResultMessage) and message.tool_name in TIMING_GATE_TOOL_NAMES:
continue
selected_indices.append(index)
if message.count_in_context:
counted_message_count += 1
if counted_message_count >= max_context_size:
break
if isinstance(message, AssistantMessage) and message.tool_calls:
kept_tool_calls = [
tool_call
for tool_call in message.tool_calls
if tool_call.func_name not in TIMING_GATE_TOOL_NAMES
]
if not kept_tool_calls:
continue
if len(kept_tool_calls) != len(message.tool_calls):
filtered_history.append(
AssistantMessage(
content=message.content,
timestamp=message.timestamp,
tool_calls=kept_tool_calls,
source_kind=message.source_kind,
)
)
continue
if not selected_indices:
return [], f"上下文判定:最近 {max_context_size} 条 user/assistant当前 0 条)"
filtered_history.append(message)
selected_indices.reverse()
selected_history = [chat_history[index] for index in selected_indices]
selected_history, hidden_assistant_count = MaisakaChatLoopService._hide_early_assistant_messages(selected_history)
selected_history, _ = drop_orphan_tool_results(selected_history)
return (
selected_history,
(
f"上下文判定:最近 {max_context_size} 条 user/assistant"
f"展示并发送窗口内消息 {len(selected_history)}"
),
)
return filtered_history
@staticmethod
def _hide_early_assistant_messages(

View File

@@ -51,7 +51,9 @@ def _append_emoji_component(builder: MessageBuilder, component: EmojiComponent)
if component.content:
builder.add_text_content(component.content)
return True
return False
builder.add_text_content("[表情包]")
return True
def _append_image_component(builder: MessageBuilder, component: ImageComponent) -> bool:
@@ -65,7 +67,9 @@ def _append_image_component(builder: MessageBuilder, component: ImageComponent)
if component.content:
builder.add_text_content(component.content)
return True
return False
builder.add_text_content("[图片]")
return True
def _append_reply_component(builder: MessageBuilder, component: ReplyComponent) -> bool:

View File

@@ -0,0 +1,33 @@
"""Maisaka 展示模块。"""
from .display_utils import (
build_tool_call_summary_lines,
format_token_count,
format_tool_call_for_display,
get_request_panel_style,
get_role_badge_label,
get_role_badge_style,
)
from .prompt_cli_renderer import PromptCLIVisualizer
from .prompt_preview_logger import PromptPreviewLogger
from .stage_status_board import (
disable_stage_status_board,
enable_stage_status_board,
remove_stage_status,
update_stage_status,
)
__all__ = [
"PromptCLIVisualizer",
"PromptPreviewLogger",
"build_tool_call_summary_lines",
"disable_stage_status_board",
"enable_stage_status_board",
"format_token_count",
"format_tool_call_for_display",
"get_request_panel_style",
"get_role_badge_label",
"get_role_badge_style",
"remove_stage_status",
"update_stage_status",
]

View File

@@ -4,14 +4,15 @@ from typing import Any
_REQUEST_PANEL_STYLE_MAP: dict[str, tuple[str, str]] = {
"timing_gate": ("\u004d\u0061\u0069\u0053\u0061\u006b\u0061 \u5927\u6a21\u578b\u8bf7\u6c42 - Timing Gate \u5b50\u4ee3\u7406", "bright_magenta"),
"replyer": ("\u004d\u0061\u0069\u0053\u0061\u006b\u0061 \u56de\u590d\u5668 Prompt", "bright_yellow"),
"planner": ("MaiSaka 大模型请求 - 对话单步", "green"),
"timing_gate": ("MaiSaka 大模型请求 - Timing Gate 子代理", "bright_magenta"),
"replyer": ("MaiSaka 回复器 Prompt", "bright_yellow"),
"emotion": ("MaiSaka Emotion Tool Prompt", "bright_cyan"),
"sub_agent": ("\u004d\u0061\u0069\u0053\u0061\u006b\u0061 \u5927\u6a21\u578b\u8bf7\u6c42 - \u5b50\u4ee3\u7406", "bright_blue"),
"sub_agent": ("MaiSaka 大模型请求 - 子代理", "bright_blue"),
}
_DEFAULT_REQUEST_PANEL_STYLE: tuple[str, str] = (
"\u004d\u0061\u0069\u0053\u0061\u006b\u0061 \u5927\u6a21\u578b\u8bf7\u6c42 - \u5bf9\u8bdd\u5355\u6b65",
"MaiSaka 大模型请求 - 对话单步",
"cyan",
)
@@ -23,10 +24,10 @@ _ROLE_BADGE_STYLE_MAP: dict[str, str] = {
}
_ROLE_BADGE_LABEL_MAP: dict[str, str] = {
"system": "\u7cfb\u7edf",
"user": "\u7528\u6237",
"assistant": "\u52a9\u624b",
"tool": "\u5de5\u5177",
"system": "系统",
"user": "用户",
"assistant": "助手",
"tool": "工具",
}
@@ -54,7 +55,7 @@ def get_role_badge_style(role: str) -> str:
def get_role_badge_label(role: str) -> str:
"""返回角色标签对应的展示文案。"""
return _ROLE_BADGE_LABEL_MAP.get(role, "\u672a\u77e5")
return _ROLE_BADGE_LABEL_MAP.get(role, "未知")
def format_tool_call_for_display(tool_call: Any) -> dict[str, Any]:

View File

@@ -0,0 +1,58 @@
"""Maisaka Prompt 预览路径工具。"""
from __future__ import annotations
from pathlib import Path
from urllib.parse import quote
import re
from src.chat.message_receive.chat_manager import chat_manager
REPO_ROOT = Path(__file__).parent.parent.parent.parent.absolute().resolve()
SAFE_NAME_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")
def normalize_preview_name(value: str) -> str:
    """Sanitize *value* into a filesystem-safe name.

    Runs of characters outside ``[A-Za-z0-9._-]`` collapse to a single ``_``,
    leading/trailing dots and underscores are stripped, and an empty result
    falls back to ``"unknown"``.
    """
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", str(value or "").strip()).strip("._")
    return cleaned if cleaned else "unknown"
def normalize_platform_name(platform: str) -> str:
    """Lower-case *platform*, apply known short aliases, and sanitize it."""
    canonical = str(platform or "").strip().lower()
    aliases = {"telegram": "tg"}
    return normalize_preview_name(aliases.get(canonical, canonical))
def build_preview_chat_dir_name(chat_id: str) -> str:
    """Derive a safe directory name for a chat's prompt previews.

    Prefers ``<platform>_group_<id>`` or ``<platform>_private_<id>`` when a
    session is resolvable; otherwise falls back to the sanitized chat id, or
    ``"unknown_chat"`` when nothing usable remains.
    """
    session = chat_manager.get_session_by_session_id(chat_id)
    if session is not None:
        platform_part = normalize_platform_name(session.platform)
        if session.is_group_session and session.group_id:
            group_part = normalize_preview_name(session.group_id)
            return f"{platform_part}_group_{group_part}"
        if session.user_id:
            user_part = normalize_preview_name(session.user_id)
            return f"{platform_part}_private_{user_part}"
    fallback = normalize_preview_name(chat_id)
    return fallback if fallback != "unknown" else "unknown_chat"
def build_display_path(file_path: Path) -> str:
    """Return a display-friendly path, relative to the repo root when inside it."""
    absolute_path = file_path.resolve()
    try:
        return absolute_path.relative_to(REPO_ROOT).as_posix()
    except ValueError:
        # Outside the repository: show the absolute POSIX path instead.
        return absolute_path.as_posix()
def build_file_uri(file_path: Path) -> str:
    """Return a ``file:///`` URI for *file_path*, percent-encoding unsafe characters."""
    posix_form = file_path.resolve().as_posix()
    return "file:///" + quote(posix_form, safe="/:")

View File

@@ -7,7 +7,6 @@ from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Literal
from urllib.parse import quote
import hashlib
import html
@@ -27,10 +26,10 @@ from .display_utils import (
get_role_badge_label as get_shared_role_badge_label,
get_role_badge_style as get_shared_role_badge_style,
)
from .preview_path_utils import build_display_path, build_file_uri, REPO_ROOT
from .prompt_preview_logger import PromptPreviewLogger
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute().resolve()
DATA_IMAGE_DIR = PROJECT_ROOT / "data" / "images"
DATA_IMAGE_DIR = REPO_ROOT / "data" / "images"
class PromptImageDisplayMode(str, Enum):
@@ -115,11 +114,6 @@ class PromptCLIVisualizer:
digest = hashlib.sha256(image_base64.encode("utf-8")).hexdigest()
return root / f"{digest}.{image_format}"
@staticmethod
def _build_file_uri(file_path: Path) -> str:
normalized = file_path.resolve().as_posix()
return f"file:///{quote(normalized, safe='/:')}"
@staticmethod
def _build_official_image_path(image_format: str, image_base64: str) -> Path | None:
normalized_format = PromptCLIVisualizer._normalize_image_format(image_format)
@@ -140,7 +134,7 @@ class PromptCLIVisualizer:
normalized_format = PromptCLIVisualizer._normalize_image_format(image_format) or "bin"
official_path = PromptCLIVisualizer._build_official_image_path(image_format, image_base64)
if official_path is not None:
return PromptCLIVisualizer._build_file_uri(official_path), official_path
return build_file_uri(official_path), official_path
try:
image_bytes = b64decode(image_base64)
@@ -153,7 +147,7 @@ class PromptCLIVisualizer:
path.write_bytes(image_bytes)
except Exception:
return None
return PromptCLIVisualizer._build_file_uri(path), path
return build_file_uri(path), path
@classmethod
def _render_image_item(cls, image_format: str, image_base64: str, settings: PromptImageDisplaySettings) -> Panel:
@@ -169,8 +163,9 @@ class PromptCLIVisualizer:
path_result = cls._build_image_file_link(image_format, image_base64)
if path_result is not None:
file_uri, file_path = path_result
display_path = build_display_path(file_path)
preview_parts: List[RenderableType] = [
Text(f"图片格式 image/{normalized_format} {size_text} 路径:{file_path}", style="magenta")
Text(f"图片格式 image/{normalized_format} {size_text} 路径:{display_path}", style="magenta")
]
preview_parts.append(Text.from_markup(f"[link={file_uri}]点击打开图片[/link]", style="cyan"))
@@ -181,6 +176,16 @@ class PromptCLIVisualizer:
padding=(0, 1),
)
@staticmethod
def _extract_image_pair(item: Any) -> tuple[str, str] | None:
"""兼容图片片段被序列化为 tuple 或 list 的两种形式。"""
if isinstance(item, (tuple, list)) and len(item) == 2:
image_format, image_base64 = item
if isinstance(image_format, str) and isinstance(image_base64, str):
return image_format, image_base64
return None
@classmethod
def _render_message_content(cls, content: Any, settings: PromptImageDisplaySettings) -> RenderableType:
if isinstance(content, str):
@@ -192,11 +197,11 @@ class PromptCLIVisualizer:
if isinstance(item, str):
parts.append(Text(item))
continue
if isinstance(item, tuple) and len(item) == 2:
image_format, image_base64 = item
if isinstance(image_format, str) and isinstance(image_base64, str):
parts.append(cls._render_image_item(image_format, image_base64, settings))
continue
image_pair = cls._extract_image_pair(item)
if image_pair is not None:
image_format, image_base64 = image_pair
parts.append(cls._render_image_item(image_format, image_base64, settings))
continue
if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str):
parts.append(Text(item["text"]))
else:
@@ -218,8 +223,9 @@ class PromptCLIVisualizer:
if isinstance(item, str):
parts.append(item)
continue
if isinstance(item, tuple) and len(item) == 2:
image_format, image_base64 = item
image_pair = cls._extract_image_pair(item)
if image_pair is not None:
image_format, image_base64 = image_pair
approx_size = max(0, len(str(image_base64)) * 3 // 4)
parts.append(f"[图片 image/{image_format} {approx_size} B]")
continue
@@ -242,6 +248,85 @@ class PromptCLIVisualizer:
def format_tool_call_for_display(cls, tool_call: Any) -> Dict[str, Any]:
return normalize_tool_call_for_display(tool_call)
@classmethod
def _build_tool_card_title(cls, tool_call: Any) -> str:
"""构建 HTML 中工具卡片的折叠标题。"""
normalized_tool_call = cls.format_tool_call_for_display(tool_call)
tool_name = str(normalized_tool_call.get("name") or "").strip()
return tool_name or "unknown"
@classmethod
def _build_tool_call_html(cls, tool_call: Any) -> str:
"""将单个工具调用渲染为默认折叠的 HTML 卡片。"""
normalized_tool_call = cls.format_tool_call_for_display(tool_call)
tool_name = cls._build_tool_card_title(tool_call)
tool_call_id = str(normalized_tool_call.get("id") or "").strip()
tool_arguments = normalized_tool_call.get("arguments")
tool_meta_html = ""
if tool_call_id:
tool_meta_html = (
"<div class='tool-card-meta'>"
"<span class='tool-card-meta-label'>调用 ID</span>"
f"<code>{html.escape(tool_call_id)}</code>"
"</div>"
)
return (
"<details class='tool-card tool-call-card'>"
"<summary class='tool-card-summary'>"
f"<span class='tool-card-name'>{html.escape(tool_name)}</span>"
"</summary>"
"<div class='tool-card-body'>"
f"{tool_meta_html}"
f"<pre>{html.escape(json.dumps(tool_arguments, ensure_ascii=False, indent=2, default=str))}</pre>"
"</div>"
"</details>"
)
@classmethod
def _extract_tool_definition_fields(cls, tool_definition: dict[str, Any]) -> tuple[str, str, Any]:
"""提取工具定义中的名称、描述和详情内容。"""
function_info = tool_definition.get("function")
if isinstance(function_info, dict):
tool_name = str(function_info.get("name") or "").strip() or "unknown"
description = str(function_info.get("description") or "").strip()
detail_payload = function_info
else:
tool_name = str(tool_definition.get("name") or "").strip() or "unknown"
description = str(tool_definition.get("description") or "").strip()
detail_payload = tool_definition
return tool_name, description, detail_payload
@classmethod
def _build_tool_definition_html(cls, tool_definition: dict[str, Any]) -> str:
"""将单个传入工具定义渲染为默认折叠的 HTML 卡片。"""
tool_name, description, detail_payload = cls._extract_tool_definition_fields(tool_definition)
description_html = ""
if description:
description_html = (
"<div class='tool-card-meta'>"
"<span class='tool-card-meta-label'>说明</span>"
f"<span>{html.escape(description)}</span>"
"</div>"
)
return (
"<details class='tool-card tool-definition-card'>"
"<summary class='tool-card-summary'>"
f"<span class='tool-card-name'>{html.escape(tool_name)}</span>"
"</summary>"
"<div class='tool-card-body'>"
f"{description_html}"
f"<pre>{html.escape(json.dumps(detail_payload, ensure_ascii=False, indent=2, default=str))}</pre>"
"</div>"
"</details>"
)
@classmethod
def _render_tool_call_panel(cls, tool_call: Any, index: int, parent_index: int) -> Panel:
title = Text.assemble(
@@ -291,6 +376,20 @@ class PromptCLIVisualizer:
return "\n\n" + ("\n\n" + ("=" * 80) + "\n\n").join(sections) if sections else "[空 Prompt]"
@classmethod
def _build_tool_definition_dump_text(cls, tool_definitions: list[dict[str, Any]] | None) -> str:
"""构建传入工具定义的文本备份内容。"""
if not tool_definitions:
return ""
sections: List[str] = ["[tool_definitions]"]
for index, tool_definition in enumerate(tool_definitions, start=1):
tool_name, _, detail_payload = cls._extract_tool_definition_fields(tool_definition)
sections.append(f"[{index}] name={tool_name}")
sections.append(json.dumps(detail_payload, ensure_ascii=False, indent=2, default=str))
return "\n\n".join(sections).strip()
@classmethod
def _render_message_content_html(cls, content: Any) -> str:
if isinstance(content, str):
@@ -302,8 +401,9 @@ class PromptCLIVisualizer:
if isinstance(item, str):
parts.append(f"<pre>{html.escape(item)}</pre>")
continue
if isinstance(item, tuple) and len(item) == 2:
image_format, image_base64 = item
image_pair = cls._extract_image_pair(item)
if image_pair is not None:
image_format, image_base64 = image_pair
image_html = cls._render_image_item_html(str(image_format), str(image_base64))
parts.append(image_html)
continue
@@ -332,14 +432,44 @@ class PromptCLIVisualizer:
)
file_uri, file_path = path_result
display_path = build_display_path(file_path)
return (
"<div class='image-card'>"
f"<div class='image-meta'>图片 image/{html.escape(normalized_format)} {html.escape(size_text)}</div>"
f"<div class='image-path'>{html.escape(str(file_path))}</div>"
f"<a class='image-preview-link' href='{html.escape(file_uri, quote=True)}'>"
f"<img class='image-preview' src='{html.escape(file_uri, quote=True)}' alt='图片预览' />"
"</a>"
f"<div class='image-path'>{html.escape(display_path)}</div>"
f"<a class='image-link' href='{html.escape(file_uri, quote=True)}'>打开图片</a>"
"</div>"
)
@staticmethod
def _build_preview_access_body(
*,
viewer_label: str,
viewer_path: Path,
viewer_link_text: str,
dump_label: str,
dump_path: Path,
dump_link_text: str,
) -> RenderableType:
viewer_uri = build_file_uri(viewer_path)
dump_uri = build_file_uri(dump_path)
viewer_display_path = build_display_path(viewer_path)
dump_display_path = build_display_path(dump_path)
return Group(
Text.from_markup(
f"[bold green]{viewer_label}{viewer_display_path}[/bold green] "
f"[link={viewer_uri}]{viewer_link_text}[/link]"
),
Text.from_markup(
f"[magenta]{dump_label}{dump_display_path}[/magenta] "
f"[cyan][link={dump_uri}]{dump_link_text}[/link][/cyan]"
),
)
@classmethod
def _build_html_role_class(cls, role: str) -> str:
return {
@@ -356,6 +486,7 @@ class PromptCLIVisualizer:
*,
request_kind: str,
selection_reason: str,
tool_definitions: list[dict[str, Any]] | None = None,
) -> str:
panel_title, _ = cls.get_request_panel_style(request_kind)
message_cards: List[str] = []
@@ -378,16 +509,12 @@ class PromptCLIVisualizer:
tool_panels = ""
raw_tool_calls = message.get("tool_calls") or []
if isinstance(raw_tool_calls, list) and raw_tool_calls:
tool_items = []
for tool_call_index, tool_call in enumerate(raw_tool_calls, start=1):
normalized_tool_call = cls.format_tool_call_for_display(tool_call)
tool_items.append(
"<div class='tool-panel'>"
f"<div class='tool-panel-title'>工具调用 #{index}.{tool_call_index}</div>"
f"<pre>{html.escape(json.dumps(normalized_tool_call, ensure_ascii=False, indent=2, default=str))}</pre>"
"</div>"
)
tool_panels = "".join(tool_items)
tool_panels = (
"<div class='tool-list'>"
"<div class='tool-list-title'>工具调用</div>"
f"{''.join(cls._build_tool_call_html(tool_call) for tool_call in raw_tool_calls)}"
"</div>"
)
message_cards.append(
"<section class='message-card'>"
@@ -405,6 +532,21 @@ class PromptCLIVisualizer:
if selection_reason.strip():
subtitle_html = f"<div class='subtitle'>{html.escape(selection_reason)}</div>"
tool_definition_section_html = ""
if tool_definitions:
tool_definition_section_html = (
"<section class='message-card tool-definition-section'>"
"<div class='message-head'>"
"<span class='role-badge tool'>全部工具</span>"
f"<span class='message-index'>{len(tool_definitions)} 个</span>"
"</div>"
"<div class='tool-list'>"
"<div class='tool-list-title'>本次送入模型的工具定义</div>"
f"{''.join(cls._build_tool_definition_html(tool_definition) for tool_definition in tool_definitions)}"
"</div>"
"</section>"
)
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
@@ -491,7 +633,7 @@ class PromptCLIVisualizer:
font-weight: 600;
}}
.message-content pre,
.tool-panel pre {{
.tool-card pre {{
margin: 0;
white-space: pre-wrap;
word-break: break-word;
@@ -517,18 +659,81 @@ class PromptCLIVisualizer:
border-radius: 8px;
padding: 3px 8px;
}}
.tool-panel {{
.tool-list {{
margin-top: 14px;
}}
.tool-list-title {{
color: #86198f;
font-size: 13px;
font-weight: 800;
margin-bottom: 10px;
}}
.tool-card {{
margin-top: 12px;
background: #fcf4ff;
border: 1px solid #f0d7fb;
border-radius: 14px;
padding: 12px 14px;
overflow: hidden;
}}
.tool-panel-title {{
color: #a21caf;
.tool-call-card {{
border-color: #ff8700;
}}
.tool-card:first-of-type {{
margin-top: 0;
}}
.tool-card-summary {{
list-style: none;
cursor: pointer;
display: flex;
align-items: center;
justify-content: space-between;
padding: 12px 14px;
color: #86198f;
font-size: 13px;
font-weight: 800;
}}
.tool-card-summary::-webkit-details-marker {{
display: none;
}}
.tool-card-summary::after {{
content: "展开";
color: #a21caf;
font-size: 12px;
font-weight: 700;
margin-bottom: 8px;
}}
.tool-card[open] .tool-card-summary::after {{
content: "收起";
}}
.tool-card-name {{
word-break: break-word;
}}
.tool-card-body {{
border-top: 1px solid #f0d7fb;
padding: 12px 14px;
background: rgba(255, 255, 255, 0.52);
}}
.tool-call-card .tool-card-body {{
border-top-color: #ff8700;
}}
.tool-card-meta {{
margin-bottom: 10px;
color: #a21caf;
display: flex;
gap: 10px;
align-items: center;
flex-wrap: wrap;
}}
.tool-card-meta-label {{
font-weight: 700;
}}
.tool-card-meta code {{
background: #faf5ff;
border: 1px solid #e9d5ff;
border-radius: 8px;
padding: 3px 8px;
}}
.tool-card pre {{
color: #3b0764;
}}
.image-card {{
background: #f8fafc;
@@ -547,6 +752,22 @@ class PromptCLIVisualizer:
font-family: "Cascadia Mono", "JetBrains Mono", "Consolas", monospace;
word-break: break-all;
}}
.image-preview-link {{
display: block;
margin-top: 10px;
}}
.image-preview {{
display: block;
max-width: min(100%, 560px);
max-height: 420px;
width: auto;
height: auto;
border-radius: 12px;
border: 1px solid #dbe4f0;
background: #fff;
box-shadow: 0 8px 20px rgba(15, 23, 42, 0.08);
object-fit: contain;
}}
.image-link {{
display: inline-block;
margin-top: 8px;
@@ -564,6 +785,7 @@ class PromptCLIVisualizer:
{subtitle_html}
</header>
{''.join(message_cards)}
{tool_definition_section_html}
</main>
</body>
</html>"""
@@ -578,6 +800,7 @@ class PromptCLIVisualizer:
request_kind: str,
selection_reason: str,
image_display_mode: Literal["legacy", "path_link"],
tool_definitions: list[dict[str, Any]] | None = None,
) -> RenderableType:
"""构建用于查看完整 prompt 的折叠入口内容。"""
@@ -603,10 +826,14 @@ class PromptCLIVisualizer:
viewer_messages.append(normalized_message)
prompt_dump_text = cls._build_prompt_dump_text(messages)
tool_definition_dump_text = cls._build_tool_definition_dump_text(tool_definitions)
if tool_definition_dump_text:
prompt_dump_text = f"{prompt_dump_text}\n\n{'=' * 80}\n\n{tool_definition_dump_text}"
viewer_html_text = cls._build_prompt_viewer_html(
viewer_messages,
request_kind=request_kind,
selection_reason=selection_reason,
tool_definitions=tool_definitions,
)
saved_paths = PromptPreviewLogger.save_preview_files(
chat_id,
@@ -618,18 +845,13 @@ class PromptCLIVisualizer:
)
viewer_html_path = saved_paths[".html"]
prompt_dump_path = saved_paths[".txt"]
viewer_uri = cls._build_file_uri(viewer_html_path)
dump_uri = cls._build_file_uri(prompt_dump_path)
body = Group(
Text.from_markup(
f"[bold green]富文本预览:{viewer_html_path}[/bold green] "
f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]"
),
Text.from_markup(
f"[magenta]原始文本备份:{prompt_dump_path}[/magenta] "
f"[cyan][link={dump_uri}]点击直接打开 Prompt 文本[/link][/cyan]"
),
body = cls._build_preview_access_body(
viewer_label="html预览",
viewer_path=viewer_html_path,
viewer_link_text="在浏览器打开 Prompt",
dump_label="原始文本",
dump_path=prompt_dump_path,
dump_link_text="点击打开 Prompt 文本",
)
return body
@@ -644,6 +866,7 @@ class PromptCLIVisualizer:
selection_reason: str,
image_display_mode: Literal["legacy", "path_link"],
folded: bool,
tool_definitions: list[dict[str, Any]] | None = None,
) -> Panel:
"""构建用于嵌入结果面板中的 Prompt 区块。"""
@@ -656,6 +879,7 @@ class PromptCLIVisualizer:
request_kind=request_kind,
selection_reason=selection_reason,
image_display_mode=image_display_mode,
tool_definitions=tool_definitions,
)
else:
ordered_panels = cls.build_prompt_panels(
@@ -782,18 +1006,13 @@ class PromptCLIVisualizer:
)
viewer_html_path = saved_paths[".html"]
text_dump_path = saved_paths[".txt"]
viewer_uri = cls._build_file_uri(viewer_html_path)
dump_uri = cls._build_file_uri(text_dump_path)
body = Group(
Text.from_markup(
f"[bold green]富文本预览:{viewer_html_path}[/bold green] "
f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]"
),
Text.from_markup(
f"[magenta]原始文本备份:{text_dump_path}[/magenta] "
f"[cyan][link={dump_uri}]点击直接打开 Prompt 文本[/link][/cyan]"
),
body = cls._build_preview_access_body(
viewer_label="富文本预览",
viewer_path=viewer_html_path,
viewer_link_text="点击在浏览器打开富文本 Prompt 视图",
dump_label="原始文本备份",
dump_path=text_dump_path,
dump_link_text="点击直接打开 Prompt 文本",
)
return body

View File

@@ -2,34 +2,29 @@
from __future__ import annotations
import re
import time
from pathlib import Path
from typing import Dict
from uuid import uuid4
from src.config.config import global_config
from .preview_path_utils import build_preview_chat_dir_name, normalize_preview_name
class PromptPreviewLogger:
"""负责保存 Maisaka Prompt 预览文件并控制目录容量。"""
_BASE_DIR = Path("logs") / "maisaka_prompt"
_MAX_PREVIEW_GROUPS_PER_CHAT = 1024
_TRIM_COUNT = 100
_SAFE_NAME_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")
@classmethod
def _get_max_per_chat(cls) -> int:
"""从配置中获取每个聊天流最大保存的预览数量。"""
return getattr(global_config.chat, "plan_reply_log_max_per_chat", 1000)
@classmethod
def _normalize_chat_id(cls, chat_id: str) -> str:
normalized_chat_id = cls._SAFE_NAME_PATTERN.sub("_", str(chat_id or "").strip()).strip("._")
if normalized_chat_id:
return normalized_chat_id
return "unknown_chat"
def _build_file_stem(cls, chat_dir: Path) -> str:
base_stem = str(int(time.time() * 1000))
candidate_stem = base_stem
suffix_index = 1
while any((chat_dir / f"{candidate_stem}{suffix}").exists() for suffix in (".html", ".txt")):
candidate_stem = f"{base_stem}_{suffix_index}"
suffix_index += 1
return candidate_stem
@classmethod
def save_preview_files(
@@ -40,10 +35,10 @@ class PromptPreviewLogger:
) -> Dict[str, Path]:
"""保存同一份 Prompt 预览的多个文件并执行超量清理。"""
normalized_category = cls._normalize_chat_id(category)
chat_dir = (cls._BASE_DIR / normalized_category / cls._normalize_chat_id(chat_id)).resolve()
normalized_category = normalize_preview_name(category)
chat_dir = (cls._BASE_DIR / normalized_category / build_preview_chat_dir_name(chat_id)).resolve()
chat_dir.mkdir(parents=True, exist_ok=True)
stem = f"{int(time.time() * 1000)}_{uuid4().hex[:8]}"
stem = cls._build_file_stem(chat_dir)
saved_paths: Dict[str, Path] = {}
try:
for suffix, content in files.items():
@@ -65,15 +60,14 @@ class PromptPreviewLogger:
continue
grouped_files.setdefault(file_path.stem, []).append(file_path)
max_per_chat = cls._get_max_per_chat()
if len(grouped_files) <= max_per_chat:
if len(grouped_files) <= cls._MAX_PREVIEW_GROUPS_PER_CHAT:
return
sorted_groups = sorted(
grouped_files.items(),
key=lambda item: min(path.stat().st_mtime for path in item[1]),
)
overflow_count = len(grouped_files) - max_per_chat
overflow_count = len(grouped_files) - cls._MAX_PREVIEW_GROUPS_PER_CHAT
trim_count = min(len(sorted_groups), max(cls._TRIM_COUNT, overflow_count))
for _, file_group in sorted_groups[:trim_count]:
for old_file in file_group:

View File

@@ -0,0 +1,163 @@
"""Maisaka 阶段状态看板。"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Optional
import json
import os
import subprocess
import sys
import threading
import time
class MaisakaStageStatusBoard:
    """Maintain per-session Maisaka stage status and display it in a separate terminal.

    State is mirrored to a JSON file under ``temp/`` which a spawned viewer
    process (``stage_status_viewer.py``) polls and renders. All mutation happens
    under ``self._lock``; the ``*_locked`` helpers must only be called while
    holding that lock.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._enabled = False
        # session_id -> status payload (see ``update`` for the schema written).
        self._entries: dict[str, dict[str, Any]] = {}
        self._viewer_process: Optional[subprocess.Popen[Any]] = None
        self._state_file = Path("temp") / "maisaka_stage_status.json"
        self._state_file.parent.mkdir(parents=True, exist_ok=True)

    def enable(self) -> None:
        """Enable the stage status board (idempotent) and start the viewer process."""
        with self._lock:
            if self._enabled:
                return
            self._enabled = True
            self._write_state_locked()
            self._ensure_viewer_process_locked()

    def disable(self) -> None:
        """Disable the board, clear all entries and terminate the viewer process."""
        with self._lock:
            self._enabled = False
            self._entries.clear()
            # Persist the disabled flag first so the viewer exits on its own too.
            self._write_state_locked()
            process = self._viewer_process
            self._viewer_process = None
            if process is not None and process.poll() is None:
                try:
                    process.terminate()
                except Exception:
                    # Best effort: the viewer may already be gone.
                    pass

    def update(
        self,
        *,
        session_id: str,
        session_name: str,
        stage: str,
        detail: str = "",
        round_text: str = "",
        agent_state: str = "",
    ) -> None:
        """Update one session's stage status; no-op while the board is disabled."""
        with self._lock:
            if not self._enabled:
                return
            now = time.time()
            current = self._entries.get(session_id, {})
            previous_stage = str(current.get("stage") or "").strip()
            stage_started_at = float(current.get("stage_started_at") or now)
            # Restart the stage timer whenever the stage name changes.
            if previous_stage != stage:
                stage_started_at = now
            self._entries[session_id] = {
                "session_id": session_id,
                "session_name": session_name,
                "stage": stage,
                "detail": detail,
                "round_text": round_text,
                "agent_state": agent_state,
                "stage_started_at": stage_started_at,
                "updated_at": now,
            }
            self._write_state_locked()

    def remove(self, session_id: str) -> None:
        """Remove one session's stage status; no-op while the board is disabled."""
        with self._lock:
            if not self._enabled:
                return
            self._entries.pop(session_id, None)
            self._write_state_locked()

    def _write_state_locked(self) -> None:
        """Persist the current state to the JSON file (caller must hold the lock)."""
        payload = {
            "enabled": self._enabled,
            "host_pid": os.getpid(),
            "updated_at": time.time(),
            "entries": list(self._entries.values()),
        }
        # Write-then-rename so the polling viewer never reads a half-written file.
        tmp_file = self._state_file.with_suffix(".tmp")
        tmp_file.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        tmp_file.replace(self._state_file)

    def _ensure_viewer_process_locked(self) -> None:
        """Spawn the console viewer in a new window if one is not already running.

        Windows-only: relies on CREATE_NEW_CONSOLE, so this is a no-op on other
        platforms. Caller must hold the lock.
        """
        if not sys.platform.startswith("win"):
            return
        if self._viewer_process is not None and self._viewer_process.poll() is None:
            return
        creationflags = getattr(subprocess, "CREATE_NEW_CONSOLE", 0)
        viewer_script = Path(__file__).resolve().with_name("stage_status_viewer.py")
        self._viewer_process = subprocess.Popen(
            [
                sys.executable,
                str(viewer_script),
                str(self._state_file.resolve()),
            ],
            creationflags=creationflags,
            cwd=str(Path.cwd()),
        )
# Process-wide singleton backing the module-level facade functions below.
_stage_board = MaisakaStageStatusBoard()


def enable_stage_status_board() -> None:
    """Enable the console stage status board."""
    _stage_board.enable()


def disable_stage_status_board() -> None:
    """Disable the console stage status board."""
    _stage_board.disable()


def update_stage_status(
    *,
    session_id: str,
    session_name: str,
    stage: str,
    detail: str = "",
    round_text: str = "",
    agent_state: str = "",
) -> None:
    """Update the console stage status for one session (delegates to the singleton)."""
    _stage_board.update(
        session_id=session_id,
        session_name=session_name,
        stage=stage,
        detail=detail,
        round_text=round_text,
        agent_state=agent_state,
    )


def remove_stage_status(session_id: str) -> None:
    """Remove one session's console stage status (delegates to the singleton)."""
    _stage_board.remove(session_id)

View File

@@ -0,0 +1,93 @@
"""Maisaka 阶段状态看板查看器。"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import json
import os
import sys
import time
import traceback
def _clear_screen() -> None:
    """Clear the terminal using the platform-appropriate shell command."""
    if sys.platform.startswith("win"):
        os.system("cls")
    else:
        os.system("clear")
def _load_state(state_file: Path) -> dict[str, Any]:
if not state_file.exists():
return {}
try:
return json.loads(state_file.read_text(encoding="utf-8"))
except Exception:
return {}
def _render(state: dict[str, Any]) -> str:
entries = state.get("entries")
if not isinstance(entries, list):
entries = []
lines = ["Maisaka 阶段看板", "=" * 72, ""]
if not entries:
lines.append("当前没有活跃会话。")
return "\n".join(lines)
entries = sorted(
[entry for entry in entries if isinstance(entry, dict)],
key=lambda item: str(item.get("session_name") or item.get("session_id") or ""),
)
now = time.time()
for entry in entries:
session_name = str(entry.get("session_name") or entry.get("session_id") or "").strip() or "unknown"
session_id = str(entry.get("session_id") or "").strip()
stage = str(entry.get("stage") or "").strip() or "未知"
detail = str(entry.get("detail") or "").strip() or "-"
round_text = str(entry.get("round_text") or "").strip()
agent_state = str(entry.get("agent_state") or "").strip() or "-"
stage_started_at = float(entry.get("stage_started_at") or now)
elapsed = max(0.0, now - stage_started_at)
lines.append(f"Chat: {session_name}")
if session_id and session_id != session_name:
lines.append(f"ID: {session_id}")
lines.append(f"阶段: {stage}")
if round_text:
lines.append(f"轮次: {round_text}")
lines.append(f"详情: {detail}")
lines.append(f"状态: {agent_state}")
lines.append(f"阶段耗时: {elapsed:.1f}s")
lines.append("-" * 72)
return "\n".join(lines)
def main() -> int:
    """Poll the state file and re-render the dashboard until the board is disabled.

    Returns 0 when the host process marks the board disabled (or the state file
    is unreadable), 1 when invoked without the state-file argument.
    """
    if len(sys.argv) < 2:
        # Usage: stage_status_viewer.py <state_file>
        return 1
    state_file = Path(sys.argv[1]).resolve()
    log_file = state_file.with_name("maisaka_stage_status_viewer.log")
    last_render = ""
    while True:
        try:
            state = _load_state(state_file)
            if not state.get("enabled", False):
                # Host disabled the board (or state vanished): exit cleanly.
                return 0
            rendered = _render(state)
            if rendered != last_render:
                # Only repaint when the content actually changed, to avoid flicker.
                _clear_screen()
                print(rendered, flush=True)
                last_render = rendered
            time.sleep(0.5)
        except Exception:
            # Keep the viewer alive; dump the traceback for post-mortem debugging.
            log_file.write_text(traceback.format_exc(), encoding="utf-8")
            time.sleep(3)
    return 1  # NOTE(review): unreachable — the loop above only exits via return.


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,125 @@
"""Maisaka 历史消息轮次结束后处理。"""
from dataclasses import dataclass
from .context_messages import AssistantMessage, LLMContextMessage, ToolResultMessage
from .history_utils import drop_leading_orphan_tool_results, drop_orphan_tool_results
# Tool names whose assistant->tool-result chains count as timing/gating noise.
TIMING_HISTORY_TOOL_NAMES = {"continue", "finish", "no_reply", "wait"}
# Fraction of the earliest matching records each trimming pass removes.
EARLY_TRIM_RATIO = 0.2


@dataclass(slots=True)
class HistoryPostProcessResult:
    """Result of the post-cycle history trimming."""

    # History after trimming/cleanup, in original order.
    history: list[LLMContextMessage]
    # Total number of messages removed across all trimming passes.
    removed_count: int
    # Messages still counted toward the context window after processing.
    remaining_context_count: int
def process_chat_history_after_cycle(
    chat_history: list[LLMContextMessage],
    *,
    max_context_size: int,
) -> HistoryPostProcessResult:
    """Run the unified history trimming/cleanup at the end of each cycle.

    Passes, in order, over a working copy (the input list is not modified):
      1. Drop the earliest timing/gating tool-call chains.
      2. Drop the earliest standalone assistant "thought" messages.
      3. Drop tool results whose originating tool call is gone.
      4. Evict oldest messages until the context-counted total fits
         ``max_context_size``, then drop tool results left orphaned at the head.
    """
    processed_history = list(chat_history)
    removed_timing_tool_count = _remove_early_timing_tool_records(processed_history)
    removed_assistant_thought_count = _remove_early_assistant_thoughts(processed_history)
    processed_history, orphan_removed_count = drop_orphan_tool_results(processed_history)
    remaining_context_count = sum(1 for message in processed_history if message.count_in_context)
    removed_overflow_count = 0
    while remaining_context_count > max_context_size and processed_history:
        # Evict from the front (oldest first); non-context messages are removed
        # too, but only context-counted ones reduce the running total.
        removed_message = processed_history.pop(0)
        removed_overflow_count += 1
        if removed_message.count_in_context:
            remaining_context_count -= 1
    processed_history, leading_orphan_removed_count = drop_leading_orphan_tool_results(processed_history)
    removed_overflow_count += leading_orphan_removed_count
    # Recount: leading-orphan removal may have dropped context-counted messages.
    remaining_context_count = sum(1 for message in processed_history if message.count_in_context)
    removed_count = (
        removed_timing_tool_count
        + removed_assistant_thought_count
        + orphan_removed_count
        + removed_overflow_count
    )
    return HistoryPostProcessResult(
        history=processed_history,
        removed_count=removed_count,
        remaining_context_count=remaining_context_count,
    )
def _remove_early_timing_tool_records(chat_history: list[LLMContextMessage]) -> int:
    """Drop the earliest 20% of gating/finishing tool-call chains, in place.

    Removes the selected assistant messages together with the tool results that
    answer their call ids; returns the number of messages removed.
    """
    timing_assistant_indexes = [
        position
        for position, entry in enumerate(chat_history)
        if _is_timing_tool_assistant_message(entry)
    ]
    trim_count = int(len(timing_assistant_indexes) * EARLY_TRIM_RATIO)
    if trim_count <= 0:
        return 0
    indexes_to_drop = set(timing_assistant_indexes[:trim_count])
    call_ids_to_drop = {
        tool_call.call_id
        for position in indexes_to_drop
        for tool_call in chat_history[position].tool_calls
        if tool_call.call_id
    }
    kept_messages: list[LLMContextMessage] = []
    dropped_total = 0
    for position, entry in enumerate(chat_history):
        if position in indexes_to_drop:
            dropped_total += 1
            continue
        if isinstance(entry, ToolResultMessage) and entry.tool_call_id in call_ids_to_drop:
            dropped_total += 1
            continue
        kept_messages.append(entry)
    chat_history[:] = kept_messages
    return dropped_total
def _remove_early_assistant_thoughts(chat_history: list[LLMContextMessage]) -> int:
    """Drop the earliest 20% of tool-free assistant thought messages, in place."""
    thought_indexes = [
        position
        for position, entry in enumerate(chat_history)
        if isinstance(entry, AssistantMessage)
        and not entry.tool_calls
        and entry.source_kind != "perception"
        and bool(entry.content.strip())
    ]
    trim_count = int(len(thought_indexes) * EARLY_TRIM_RATIO)
    if trim_count <= 0:
        return 0
    indexes_to_drop = set(thought_indexes[:trim_count])
    kept_messages = [
        entry for position, entry in enumerate(chat_history) if position not in indexes_to_drop
    ]
    dropped_total = len(chat_history) - len(kept_messages)
    chat_history[:] = kept_messages
    return dropped_total
def _is_timing_tool_assistant_message(message: LLMContextMessage) -> bool:
    """True when *message* is an assistant message whose tool calls are all timing tools."""
    if not isinstance(message, AssistantMessage):
        return False
    if not message.tool_calls:
        return False
    return all(call.func_name in TIMING_HISTORY_TOOL_NAMES for call in message.tool_calls)

View File

@@ -14,7 +14,7 @@ from src.chat.message_receive.message import SessionMessage
from src.common.data_models.message_component_data_model import EmojiComponent, ImageComponent, MessageSequence
from src.common.logger import get_logger
from src.common.prompt_i18n import load_prompt
from src.config.config import global_config
from src.config.config import config_manager, global_config
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from src.llm_models.exceptions import ReqAbortException
from src.llm_models.payload_content.tool_option import ToolCall
@@ -35,7 +35,8 @@ from .context_messages import (
ToolResultMessage,
contains_complex_message,
)
from .history_utils import build_prefixed_message_sequence, build_session_message_visible_text, drop_leading_orphan_tool_results
from .history_post_processor import process_chat_history_after_cycle
from .history_utils import build_prefixed_message_sequence, build_session_message_visible_text
from .monitor_events import (
emit_cycle_start,
emit_message_ingested,
@@ -53,7 +54,7 @@ logger = get_logger("maisaka_reasoning_engine")
TIMING_GATE_CONTEXT_LIMIT = 24
TIMING_GATE_MAX_TOKENS = 384
TIMING_GATE_TOOL_NAMES = {"continue", "no_reply", "wait"}
ACTION_HIDDEN_TOOL_NAMES = {"continue", "no_reply", "wait"}
ACTION_HIDDEN_TOOL_NAMES = {"continue", "no_reply"}
ACTION_BUILTIN_TOOL_NAMES = {tool_spec.name for tool_spec in get_action_tool_specs()}
@@ -94,6 +95,7 @@ class MaisakaReasoningEngine:
async def _run_interruptible_planner(
self,
*,
injected_user_messages: Optional[list[str]] = None,
tool_definitions: Optional[list[dict[str, Any]]] = None,
) -> Any:
"""运行一轮可被新消息打断的主 planner 请求。"""
@@ -105,6 +107,7 @@ class MaisakaReasoningEngine:
try:
return await self._runtime._chat_loop_service.chat_loop_step(
self._runtime._chat_history,
injected_user_messages=injected_user_messages,
tool_definitions=tool_definitions,
)
except ReqAbortException:
@@ -117,36 +120,27 @@ class MaisakaReasoningEngine:
)
self._runtime._chat_loop_service.set_interrupt_flag(None)
async def _run_interruptible_sub_agent(
async def _run_timing_gate_sub_agent(
self,
*,
context_message_limit: int,
system_prompt: str,
tool_definitions: list[dict[str, Any]],
) -> Any:
"""运行一轮可被新消息打断的临时子代理请求。"""
"""运行一轮 Timing Gate 子代理请求。
interrupt_flag = asyncio.Event()
interrupted = False
self._runtime._bind_planner_interrupt_flag(interrupt_flag)
try:
return await self._runtime.run_sub_agent(
context_message_limit=context_message_limit,
system_prompt=system_prompt,
request_kind="timing_gate",
interrupt_flag=interrupt_flag,
max_tokens=TIMING_GATE_MAX_TOKENS,
temperature=0.1,
tool_definitions=tool_definitions,
)
except ReqAbortException:
interrupted = True
raise
finally:
self._runtime._unbind_planner_interrupt_flag(
interrupt_flag,
interrupted=interrupted,
)
Timing Gate 阶段不再响应新的 planner 打断,只有主 planner 阶段允许被打断。
"""
return await self._runtime.run_sub_agent(
context_message_limit=context_message_limit,
system_prompt=system_prompt,
request_kind="timing_gate",
interrupt_flag=None,
max_tokens=TIMING_GATE_MAX_TOKENS,
temperature=0.1,
tool_definitions=tool_definitions,
)
@staticmethod
def _build_timing_gate_fallback_prompt() -> str:
@@ -174,22 +168,34 @@ class MaisakaReasoningEngine:
except Exception:
return self._build_timing_gate_fallback_prompt()
async def _build_action_tool_definitions(self) -> list[dict[str, Any]]:
"""构造 Action Loop 阶段可见的工具定义。"""
async def _build_action_tool_definitions(self) -> tuple[list[dict[str, Any]], str]:
"""构造 Action Loop 阶段可见的工具定义与 deferred tools 提示"""
if self._runtime._tool_registry is None:
return []
self._runtime.update_deferred_tool_specs([])
self._runtime.set_current_action_tool_names([])
return [], ""
tool_specs = await self._runtime._tool_registry.list_tools()
return [
tool_spec.to_llm_definition()
for tool_spec in tool_specs
if tool_spec.name not in ACTION_HIDDEN_TOOL_NAMES
and (
tool_spec.provider_name != "maisaka_builtin"
or tool_spec.name in ACTION_BUILTIN_TOOL_NAMES
)
]
visible_builtin_tool_specs: list[ToolSpec] = []
deferred_tool_specs: list[ToolSpec] = []
for tool_spec in tool_specs:
if tool_spec.name in ACTION_HIDDEN_TOOL_NAMES:
continue
if tool_spec.provider_name == "maisaka_builtin":
if tool_spec.name in ACTION_BUILTIN_TOOL_NAMES:
visible_builtin_tool_specs.append(tool_spec)
continue
deferred_tool_specs.append(tool_spec)
self._runtime.update_deferred_tool_specs(deferred_tool_specs)
discovered_deferred_tool_specs = self._runtime.get_discovered_deferred_tool_specs()
visible_tool_specs = [*visible_builtin_tool_specs, *discovered_deferred_tool_specs]
self._runtime.set_current_action_tool_names([tool_spec.name for tool_spec in visible_tool_specs])
return (
[tool_spec.to_llm_definition() for tool_spec in visible_tool_specs],
self._runtime.build_deferred_tools_reminder(),
)
async def _invoke_tool_call(
self,
@@ -227,18 +233,19 @@ class MaisakaReasoningEngine:
async def _run_timing_gate(
self,
anchor_message: SessionMessage,
) -> tuple[Literal["continue", "no_reply", "wait"], Any, list[str]]:
) -> tuple[Literal["continue", "no_reply", "wait"], Any, list[str], list[dict[str, Any]]]:
"""运行 Timing Gate 子代理并返回控制决策。"""
if self._runtime._force_continue_until_reply:
if self._runtime._force_next_timing_continue:
return self._build_forced_continue_timing_result()
response = await self._run_interruptible_sub_agent(
response = await self._run_timing_gate_sub_agent(
context_message_limit=TIMING_GATE_CONTEXT_LIMIT,
system_prompt=self._build_timing_gate_system_prompt(),
tool_definitions=get_timing_tools(),
)
tool_result_summaries: list[str] = []
tool_monitor_results: list[dict[str, Any]] = []
selected_tool_call: Optional[ToolCall] = None
for tool_call in response.tool_calls:
if tool_call.func_name in TIMING_GATE_TOOL_NAMES:
@@ -247,11 +254,11 @@ class MaisakaReasoningEngine:
if selected_tool_call is None:
logger.warning(f"{self._runtime.log_prefix} Timing Gate 未返回有效控制工具,默认继续执行 Action Loop")
return "continue", response, tool_result_summaries
return "continue", response, tool_result_summaries, tool_monitor_results
append_history = selected_tool_call.func_name != "continue"
append_history = False
store_record = selected_tool_call.func_name != "continue"
_, result, _ = await self._invoke_tool_call(
invocation, result, tool_spec = await self._invoke_tool_call(
selected_tool_call,
response.content or "",
anchor_message,
@@ -259,19 +266,31 @@ class MaisakaReasoningEngine:
store_record=store_record,
)
tool_result_summaries.append(self._build_tool_result_summary(selected_tool_call, result))
tool_monitor_results.append(
self._build_tool_monitor_result(
selected_tool_call,
invocation,
result,
duration_ms=0.0,
tool_spec=tool_spec,
)
)
self._append_timing_gate_execution_result(response, selected_tool_call, result)
timing_action = str(result.metadata.get("timing_action") or selected_tool_call.func_name).strip()
if timing_action not in TIMING_GATE_TOOL_NAMES:
logger.warning(
f"{self._runtime.log_prefix} Timing Gate 返回未知动作 {timing_action!r},将按 continue 处理"
)
return "continue", response, tool_result_summaries
return timing_action, response, tool_result_summaries
return "continue", response, tool_result_summaries, tool_monitor_results
return timing_action, response, tool_result_summaries, tool_monitor_results
def _build_forced_continue_timing_result(self) -> tuple[Literal["continue"], ChatResponse, list[str]]:
def _build_forced_continue_timing_result(
self,
) -> tuple[Literal["continue"], ChatResponse, list[str], list[dict[str, Any]]]:
"""构造跳过 Timing Gate 时使用的伪 continue 结果。"""
reason = self._runtime._build_force_continue_timing_reason()
reason = self._runtime._consume_force_next_timing_continue_reason() or "本轮直接跳过 Timing Gate 并视作 continue。"
logger.info(f"{self._runtime.log_prefix} {reason}")
return (
"continue",
@@ -296,8 +315,24 @@ class MaisakaReasoningEngine:
prompt_section=None,
),
[f"- continue [强制跳过]: {reason}"],
[],
)
@staticmethod
def _mark_timing_gate_completed(timing_action: str) -> bool:
"""根据门控动作决定下一轮是否还需要重新执行 timing。"""
return timing_action != "continue"
@staticmethod
def _should_retry_planner_after_interrupt(
*,
round_index: int,
max_internal_rounds: int,
has_pending_messages: bool,
) -> bool:
return has_pending_messages and round_index + 1 < max_internal_rounds
async def run_loop(self) -> None:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
try:
@@ -314,13 +349,20 @@ class MaisakaReasoningEngine:
if self._runtime._has_pending_messages()
else []
)
if not timeout_triggered and not cached_messages and not message_triggered:
if not timeout_triggered and not cached_messages:
continue
self._runtime._agent_state = self._runtime._STATE_RUNNING
self._runtime._update_stage_status(
"消息整理",
f"待处理消息 {len(cached_messages)}" if cached_messages else "准备复用超时锚点",
)
if cached_messages:
asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages))
self._append_wait_interrupted_message_if_needed()
if timeout_triggered:
self._runtime._chat_history.append(
self._build_wait_completed_message(has_new_messages=True)
)
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
@@ -332,13 +374,16 @@ class MaisakaReasoningEngine:
continue
logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考")
if self._runtime._pending_wait_tool_call_id:
self._runtime._chat_history.append(self._build_wait_timeout_message())
self._trim_chat_history()
self._runtime._chat_history.append(
self._build_wait_completed_message(has_new_messages=False)
)
try:
timing_gate_required = True
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
round_text = f"{round_index + 1}/{self._runtime._max_internal_rounds}"
self._runtime._log_cycle_started(cycle_detail, round_index)
self._runtime._update_stage_status("启动循环", f"循环 {cycle_detail.cycle_id}", round_text=round_text)
await emit_cycle_start(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
@@ -349,10 +394,14 @@ class MaisakaReasoningEngine:
planner_started_at = 0.0
planner_duration_ms = 0.0
timing_duration_ms = 0.0
current_stage_started_at = 0.0
timing_action: Optional[str] = None
timing_response: Optional[ChatResponse] = None
timing_tool_results: Optional[list[str]] = None
timing_tool_monitor_results: Optional[list[dict[str, Any]]] = None
response: Optional[ChatResponse] = None
action_tool_definitions: list[dict[str, Any]] = []
planner_extra_lines: list[str] = []
tool_result_summaries: list[str] = []
tool_monitor_results: list[dict[str, Any]] = []
try:
@@ -364,30 +413,46 @@ class MaisakaReasoningEngine:
f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息"
)
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
if timing_gate_required:
self._runtime._update_stage_status("Timing Gate", "等待门控决策", round_text=round_text)
current_stage_started_at = time.time()
timing_started_at = time.time()
(
timing_action,
timing_response,
timing_tool_results,
timing_tool_monitor_results,
) = await self._run_timing_gate(anchor_message)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
timing_gate_required = self._mark_timing_gate_completed(timing_action)
if timing_action != "continue":
logger.debug(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
)
break
else:
logger.info(
f"{self._runtime.log_prefix} 跳过 Timing Gate继续执行 Planner: "
f"回合={round_index + 1}"
)
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
current_stage_started_at = planner_started_at
self._runtime._update_stage_status("Planner", "组织上下文并请求模型", round_text=round_text)
action_tool_definitions, deferred_tools_reminder = await self._build_action_tool_definitions()
logger.info(
f"{self._runtime.log_prefix} 规划器开始执行: "
f"回合={round_index + 1} "
@@ -395,6 +460,7 @@ class MaisakaReasoningEngine:
f"开始时间={planner_started_at:.3f}"
)
response = await self._run_interruptible_planner(
injected_user_messages=[deferred_tools_reminder] if deferred_tools_reminder else None,
tool_definitions=action_tool_definitions,
)
planner_duration_ms = (time.time() - planner_started_at) * 1000
@@ -406,8 +472,8 @@ class MaisakaReasoningEngine:
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后直接输出我的想法"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后直接输出我的想法"
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
@@ -428,20 +494,73 @@ class MaisakaReasoningEngine:
if not response.content:
break
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} 规划器打断成功: "
f"回合={round_index + 1} "
f"开始时间={planner_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
except ReqAbortException as exc:
self._runtime._update_stage_status(
"Planner 已打断",
str(exc) or "收到外部中断信号",
round_text=round_text,
)
break
interrupted_at = time.time()
interrupted_stage_label = "Planner"
interrupted_text = "Planner 收到新消息,开始重新决策"
interrupted_response = ChatResponse(
content=interrupted_text or None,
tool_calls=[],
request_messages=[],
raw_message=AssistantMessage(
content=interrupted_text,
timestamp=datetime.now(),
tool_calls=[],
source_kind="perception",
),
selected_history_count=len(self._runtime._chat_history),
tool_count=len(action_tool_definitions),
prompt_tokens=0,
built_message_count=0,
completion_tokens=0,
total_tokens=0,
prompt_section=None,
)
interrupted_extra_lines = [
"状态:已被新消息打断",
f"打断位置:{interrupted_stage_label} 请求流式响应阶段",
f"打断耗时:{interrupted_at - current_stage_started_at:.3f}",
]
response = interrupted_response
planner_extra_lines = interrupted_extra_lines
logger.info(
f"{self._runtime.log_prefix} {interrupted_stage_label} 打断成功: "
f"回合={round_index + 1} "
f"开始时间={current_stage_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - current_stage_started_at:.3f}"
)
if not self._should_retry_planner_after_interrupt(
round_index=round_index,
max_internal_rounds=self._runtime._max_internal_rounds,
has_pending_messages=self._runtime._has_pending_messages(),
):
break
await self._runtime._wait_for_message_quiet_period()
self._runtime._message_turn_scheduled = False
interrupted_messages = self._runtime._collect_pending_messages()
if not interrupted_messages:
break
asyncio.create_task(self._runtime._trigger_batch_learning(interrupted_messages))
await self._ingest_messages(interrupted_messages)
anchor_message = interrupted_messages[-1]
logger.info(
f"{self._runtime.log_prefix} 淇濇寔娲昏穬鐘舵€侊紝璺宠繃 Timing Gate 鐩存帴閲嶈瘯 Planner: "
f"鍥炲悎={round_index + 2}"
)
continue
finally:
completed_cycle = self._end_cycle(cycle_detail)
self._runtime._render_context_usage_panel(
cycle_id=cycle_detail.cycle_id,
time_records=dict(completed_cycle.time_records),
timing_selected_history_count=(
timing_response.selected_history_count if timing_response is not None else None
),
@@ -452,6 +571,7 @@ class MaisakaReasoningEngine:
timing_response=timing_response.content or "" if timing_response is not None else "",
timing_tool_calls=timing_response.tool_calls if timing_response is not None else None,
timing_tool_results=timing_tool_results,
timing_tool_detail_results=timing_tool_monitor_results,
timing_prompt_section=(
timing_response.prompt_section if timing_response is not None else None
),
@@ -464,6 +584,7 @@ class MaisakaReasoningEngine:
planner_tool_results=tool_result_summaries,
planner_tool_detail_results=tool_monitor_results,
planner_prompt_section=response.prompt_section if response is not None else None,
planner_extra_lines=planner_extra_lines,
)
await emit_planner_finalized(
session_id=self._runtime.session_id,
@@ -505,6 +626,8 @@ class MaisakaReasoningEngine:
finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP
if self._runtime._running:
self._runtime._update_stage_status("等待消息", "本轮处理结束")
except asyncio.CancelledError:
self._runtime._log_internal_loop_cancelled()
raise
@@ -543,33 +666,22 @@ class MaisakaReasoningEngine:
return self._runtime.message_cache[-1]
return None
def _build_wait_timeout_message(self) -> ToolResultMessage:
"""构造 wait 超时后的工具结果消息。"""
def _build_wait_completed_message(self, *, has_new_messages: bool) -> ToolResultMessage:
"""构造 wait 完成后的工具结果消息。"""
tool_call_id = self._runtime._pending_wait_tool_call_id or "wait_timeout"
self._runtime._pending_wait_tool_call_id = None
content = (
"等待已结束,期间收到了新的用户输入。请结合这些新消息继续下一轮思考。"
if has_new_messages
else "等待已超时,期间没有收到新的用户输入。请基于现有上下文继续下一轮思考。"
)
return ToolResultMessage(
content="等待已超时,期间没有收到新的用户输入。请基于现有上下文继续下一轮思考。",
content=content,
timestamp=datetime.now(),
tool_call_id=tool_call_id,
tool_name="wait",
)
def _append_wait_interrupted_message_if_needed(self) -> None:
"""如果 wait 被新消息打断,则补一条对应的工具结果消息。"""
tool_call_id = self._runtime._pending_wait_tool_call_id
if not tool_call_id:
return
self._runtime._pending_wait_tool_call_id = None
self._runtime._chat_history.append(
ToolResultMessage(
content="等待过程被新的用户输入打断,已继续处理最新消息。",
timestamp=datetime.now(),
tool_call_id=tool_call_id,
tool_name="wait",
)
)
async def _ingest_messages(self, messages: list[SessionMessage]) -> None:
"""处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。"""
for message in messages:
@@ -578,7 +690,6 @@ class MaisakaReasoningEngine:
continue
self._insert_chat_history_message(history_message)
self._trim_chat_history()
# 向监控前端广播新消息注入事件
user_info = message.message_info.user_info
@@ -628,10 +739,47 @@ class MaisakaReasoningEngine:
planner_prefix: str,
) -> MessageSequence:
message_sequence = build_prefixed_message_sequence(message.raw_message, planner_prefix)
if global_config.visual.multimodal_planner:
if self._resolve_enable_visual_planner():
await self._hydrate_visual_components(message_sequence.components)
return message_sequence
@staticmethod
def _resolve_enable_visual_planner() -> bool:
planner_mode = global_config.visual.planner_mode
planner_task_config = config_manager.get_model_config().model_task_config.planner
models_by_name = {model.name: model for model in config_manager.get_model_config().models}
if planner_mode == "text":
return False
planner_models: list[str] = list(planner_task_config.model_list)
missing_models = [model_name for model_name in planner_models if model_name not in models_by_name]
non_visual_models = [
model_name for model_name in planner_models if model_name in models_by_name and not models_by_name[model_name].visual
]
if planner_mode == "multimodal":
if missing_models:
raise ValueError(
"planner_mode=multimodal但 planner 任务存在未定义的模型:"
f"{', '.join(missing_models)}"
)
if non_visual_models:
raise ValueError(
"planner_mode=multimodal但 planner 任务存在未开启 visual 的模型:"
f"{', '.join(non_visual_models)}"
)
return True
if missing_models:
logger.warning(
"planner_mode=auto 时发现 planner 任务存在未定义模型:"
f"{', '.join(missing_models)},将退化为纯文本 planner"
)
return False
return bool(planner_models) and not non_visual_models
async def _hydrate_visual_components(self, planner_components: list[object]) -> None:
"""在 Maisaka 真正需要图片或表情时,按需回填二进制数据。"""
load_tasks: list[asyncio.Task[None]] = []
@@ -681,6 +829,7 @@ class MaisakaReasoningEngine:
"""结束并记录一轮 Maisaka 思考循环。"""
cycle_detail.end_time = time.time()
self._runtime.history_loop.append(cycle_detail)
self._post_process_chat_history_after_cycle()
timer_strings = [
f"{name}: {duration:.2f}s"
@@ -690,26 +839,20 @@ class MaisakaReasoningEngine:
self._runtime._log_cycle_completed(cycle_detail, timer_strings)
return cycle_detail
def _trim_chat_history(self) -> None:
def _post_process_chat_history_after_cycle(self) -> None:
"""裁剪聊天历史,保证用户消息数量不超过配置限制。"""
conversation_message_count = sum(1 for message in self._runtime._chat_history if message.count_in_context)
if conversation_message_count <= self._runtime._max_context_size:
process_result = process_chat_history_after_cycle(
self._runtime._chat_history,
max_context_size=self._runtime._max_context_size,
)
if process_result.removed_count <= 0:
return
trimmed_history = list(self._runtime._chat_history)
removed_count = 0
while conversation_message_count > self._runtime._max_context_size and trimmed_history:
removed_message = trimmed_history.pop(0)
removed_count += 1
if removed_message.count_in_context:
conversation_message_count -= 1
trimmed_history, pruned_orphan_count = drop_leading_orphan_tool_results(trimmed_history)
removed_count += pruned_orphan_count
self._runtime._chat_history = trimmed_history
self._runtime._log_history_trimmed(removed_count, conversation_message_count)
self._runtime._chat_history = process_result.history
self._runtime._log_history_trimmed(
process_result.removed_count,
process_result.remaining_context_count,
)
@staticmethod
def _calculate_similarity(text1: str, text2: str) -> float:
@@ -934,6 +1077,9 @@ class MaisakaReasoningEngine:
if invocation.tool_name == "no_reply":
return "你暂停了当前对话循环,等待新的外部消息。"
if invocation.tool_name == "finish":
return "你结束了本轮思考,等待新的外部消息后再继续。"
if invocation.tool_name == "continue":
return "你允许当前对话继续进入下一轮完整思考与工具执行。"
@@ -1065,6 +1211,24 @@ class MaisakaReasoningEngine:
)
)
def _append_timing_gate_execution_result(
self,
response: ChatResponse,
tool_call: ToolCall,
result: ToolExecutionResult,
) -> None:
"""将 Timing Gate 的决策链写入历史,供后续门控复用。"""
self._runtime._chat_history.append(
AssistantMessage(
content=response.content or "",
timestamp=response.raw_message.timestamp,
tool_calls=[tool_call],
source_kind="timing_gate",
)
)
self._append_tool_execution_result(tool_call, result)
def _build_tool_result_summary(self, tool_call: ToolCall, result: ToolExecutionResult) -> str:
"""构建用于终端展示的工具结果摘要。"""
@@ -1084,6 +1248,7 @@ class MaisakaReasoningEngine:
invocation: ToolInvocation,
result: ToolExecutionResult,
duration_ms: float,
tool_spec: Optional[ToolSpec] = None,
) -> dict[str, Any]:
"""构建 planner.finalized 中单个工具的监控结果。"""
@@ -1092,9 +1257,20 @@ class MaisakaReasoningEngine:
if monitor_detail is not None:
normalized_detail = self._normalize_tool_record_value(monitor_detail)
monitor_card = result.metadata.get("monitor_card")
normalized_card = None
if monitor_card is not None:
normalized_card = self._normalize_tool_record_value(monitor_card)
monitor_sub_cards = result.metadata.get("monitor_sub_cards")
normalized_sub_cards = None
if monitor_sub_cards is not None:
normalized_sub_cards = self._normalize_tool_record_value(monitor_sub_cards)
return {
"tool_call_id": tool_call.call_id,
"tool_name": tool_call.func_name,
"tool_title": tool_spec.title.strip() if tool_spec is not None and tool_spec.title.strip() else "",
"tool_args": self._normalize_tool_record_value(
invocation.arguments if isinstance(invocation.arguments, dict) else {}
),
@@ -1102,6 +1278,8 @@ class MaisakaReasoningEngine:
"duration_ms": round(duration_ms, 2),
"summary": self._build_tool_result_summary(tool_call, result),
"detail": normalized_detail,
"card": normalized_card,
"sub_cards": normalized_sub_cards,
}
async def _handle_tool_calls(
@@ -1137,7 +1315,7 @@ class MaisakaReasoningEngine:
self._append_tool_execution_result(tool_call, result)
tool_result_summaries.append(self._build_tool_result_summary(tool_call, result))
tool_monitor_results.append(
self._build_tool_monitor_result(tool_call, invocation, result, duration_ms=0.0)
self._build_tool_monitor_result(tool_call, invocation, result, duration_ms=0.0, tool_spec=None)
)
return False, tool_result_summaries, tool_monitor_results
@@ -1146,10 +1324,25 @@ class MaisakaReasoningEngine:
tool_spec.name: tool_spec
for tool_spec in await self._runtime._tool_registry.list_tools()
}
for tool_call in tool_calls:
total_tool_count = len(tool_calls)
for tool_index, tool_call in enumerate(tool_calls, start=1):
invocation = self._build_tool_invocation(tool_call, latest_thought)
self._runtime._update_stage_status(
f"工具执行 · {invocation.tool_name}",
f"{tool_index}/{total_tool_count} 个工具",
)
tool_started_at = time.time()
result = await self._runtime._tool_registry.invoke(invocation, execution_context)
if not self._runtime.is_action_tool_currently_available(invocation.tool_name):
result = ToolExecutionResult(
tool_name=invocation.tool_name,
success=False,
error_message=(
f"工具 {invocation.tool_name} 当前未直接暴露给 planner。"
"如果它在 deferred tools 提示中,请先调用 tool_search。"
),
)
else:
result = await self._runtime._tool_registry.invoke(invocation, execution_context)
tool_duration_ms = (time.time() - tool_started_at) * 1000
await self._store_tool_execution_record(
invocation,
@@ -1159,7 +1352,13 @@ class MaisakaReasoningEngine:
self._append_tool_execution_result(tool_call, result)
tool_result_summaries.append(self._build_tool_result_summary(tool_call, result))
tool_monitor_results.append(
self._build_tool_monitor_result(tool_call, invocation, result, tool_duration_ms)
self._build_tool_monitor_result(
tool_call,
invocation,
result,
tool_duration_ms,
tool_spec=tool_spec_map.get(invocation.tool_name),
)
)
if not result.success and tool_call.func_name == "reply":

View File

@@ -21,7 +21,7 @@ from src.common.data_models.mai_message_data_model import GroupInfo, UserInfo
from src.common.logger import get_logger
from src.common.utils.utils_config import ChatConfigUtils, ExpressionConfigUtils
from src.config.config import global_config
from src.core.tooling import ToolRegistry
from src.core.tooling import ToolRegistry, ToolSpec
from src.learners.expression_learner import ExpressionLearner
from src.learners.jargon_miner import JargonMiner
from src.llm_models.payload_content.resp_format import RespFormat
@@ -30,11 +30,13 @@ from src.mcp_module import MCPManager
from src.mcp_module.host_llm_bridge import MCPHostLLMBridge
from src.mcp_module.provider import MCPToolProvider
from src.plugin_runtime.tool_provider import PluginToolProvider
from src.plugin_runtime.hook_payloads import deserialize_prompt_messages
from .chat_loop_service import ChatResponse, MaisakaChatLoopService
from .context_messages import LLMContextMessage
from .display_utils import build_tool_call_summary_lines, format_token_count
from .prompt_cli_renderer import PromptCLIVisualizer
from .display.display_utils import build_tool_call_summary_lines, format_token_count
from .display.prompt_cli_renderer import PromptCLIVisualizer
from .display.stage_status_board import remove_stage_status, update_stage_status
from .reasoning_engine import MaisakaReasoningEngine
from .tool_provider import MaisakaBuiltinToolProvider
@@ -92,14 +94,16 @@ class MaisakaHeartFlowChatting:
self._max_internal_rounds = MAX_INTERNAL_ROUNDS
self._max_context_size = max(1, int(global_config.chat.max_context_size))
self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP
self._wait_until: Optional[float] = None
self._pending_wait_tool_call_id: Optional[str] = None
self._force_continue_until_reply = False
self._force_continue_trigger_message_id = ""
self._force_continue_trigger_reason = ""
self._force_next_timing_continue = False
self._force_next_timing_message_id = ""
self._force_next_timing_reason = ""
self._planner_interrupt_flag: Optional[asyncio.Event] = None
self._planner_interrupt_requested = False
self._planner_interrupt_consecutive_count = 0
self._current_action_tool_names: set[str] = set()
self.discovered_tool_names: set[str] = set()
self.deferred_tool_specs_by_name: dict[str, ToolSpec] = {}
self._planner_interrupt_max_consecutive_count = max(
0,
int(global_config.chat.planner_interrupt_max_consecutive_count),
@@ -118,6 +122,18 @@ class MaisakaHeartFlowChatting:
self._tool_registry = ToolRegistry()
self._register_tool_providers()
def _update_stage_status(self, stage: str, detail: str = "", *, round_text: str = "") -> None:
"""更新当前会话的阶段状态。"""
update_stage_status(
session_id=self.session_id,
session_name=self.session_name,
stage=stage,
detail=detail,
round_text=round_text,
agent_state=self._agent_state,
)
async def start(self) -> None:
"""启动运行时主循环。"""
if self._running:
@@ -130,6 +146,7 @@ class MaisakaHeartFlowChatting:
self._running = True
self._ensure_background_tasks_running()
self._schedule_message_turn()
self._update_stage_status("空闲", "等待消息触发")
logger.info(f"{self.log_prefix} Maisaka 运行时已启动")
async def stop(self) -> None:
@@ -157,6 +174,7 @@ class MaisakaHeartFlowChatting:
await self._tool_registry.close()
self._mcp_manager = None
self._mcp_host_bridge = None
remove_stage_status(self.session_id)
logger.info(f"{self.log_prefix} Maisaka 运行时已停止")
@@ -175,9 +193,6 @@ class MaisakaHeartFlowChatting:
self.message_cache.append(message)
self._message_received_at_by_id[message.message_id] = received_at
self._source_messages_by_id[message.message_id] = message
if self._agent_state == self._STATE_WAIT:
self._cancel_wait_timeout_task()
self._wait_until = None
if self._agent_state == self._STATE_RUNNING:
self._message_debounce_required = True
if self._agent_state == self._STATE_RUNNING and self._planner_interrupt_flag is not None:
@@ -248,7 +263,6 @@ class MaisakaHeartFlowChatting:
def _record_reply_sent(self) -> None:
"""在成功发送 reply 后记录本轮消息回复时长。"""
self._clear_force_continue_until_reply()
if self._reply_latency_measurement_started_at is None:
return
@@ -308,26 +322,26 @@ class MaisakaHeartFlowChatting:
if not message.is_at and not message.is_mentioned:
return
self._arm_force_continue_until_reply(
self._arm_force_next_timing_continue(
message,
is_at=message.is_at,
is_mentioned=message.is_mentioned,
)
def _arm_force_continue_until_reply(
def _arm_force_next_timing_continue(
self,
message: SessionMessage,
*,
is_at: bool,
is_mentioned: bool,
) -> None:
"""在检测到 @ 或提及时,要求后续轮次跳过 Timing Gate 直到成功 reply"""
"""在检测到 @ 或提及时,要求下一次 Timing Gate 直接 continue"""
trigger_reason = "@消息" if is_at else "提及消息" if is_mentioned else "触发消息"
was_armed = self._force_continue_until_reply
self._force_continue_until_reply = True
self._force_continue_trigger_message_id = message.message_id
self._force_continue_trigger_reason = trigger_reason
was_armed = self._force_next_timing_continue
self._force_next_timing_continue = True
self._force_next_timing_message_id = message.message_id
self._force_next_timing_reason = trigger_reason
if was_armed:
logger.info(
@@ -337,34 +351,31 @@ class MaisakaHeartFlowChatting:
return
logger.info(
f"{self.log_prefix} 检测到{trigger_reason}将跳过 Timing Gate 直到成功发送一条 reply"
f"{self.log_prefix} 检测到{trigger_reason}下一次 Timing Gate 将直接视作 continue"
f"消息编号={message.message_id}"
)
def _clear_force_continue_until_reply(self) -> None:
"""在成功发送 reply 后清理强制 continue 状态。"""
def _consume_force_next_timing_continue_reason(self) -> str | None:
"""消费一次性 Timing Gate continue 状态,并返回原因描述"""
if not self._force_continue_until_reply:
return
if not self._force_next_timing_continue:
return None
logger.info(
f"{self.log_prefix} 已成功发送 reply恢复 Timing Gate"
f"触发原因={self._force_continue_trigger_reason or '未知'} "
f"触发消息编号={self._force_continue_trigger_message_id or 'unknown'}"
)
self._force_continue_until_reply = False
self._force_continue_trigger_message_id = ""
self._force_continue_trigger_reason = ""
def _build_force_continue_timing_reason(self) -> str:
"""返回当前强制跳过 Timing Gate 的原因描述。"""
trigger_reason = self._force_continue_trigger_reason or "@/提及消息"
trigger_message_id = self._force_continue_trigger_message_id or "unknown"
return (
trigger_reason = self._force_next_timing_reason or "@/提及消息"
trigger_message_id = self._force_next_timing_message_id or "unknown"
reason = (
f"检测到新的{trigger_reason}(消息编号={trigger_message_id}"
"本轮直接跳过 Timing Gate 并视作 continue,直到成功发送一条 reply"
"本轮直接跳过 Timing Gate 并视作 continue。"
)
logger.info(
f"{self.log_prefix} 已结束本次强制 continue恢复 Timing Gate"
f"触发原因={trigger_reason} "
f"触发消息编号={trigger_message_id}"
)
self._force_next_timing_continue = False
self._force_next_timing_message_id = ""
self._force_next_timing_reason = ""
return reason
def _bind_planner_interrupt_flag(self, interrupt_flag: asyncio.Event) -> None:
"""绑定当前可打断请求使用的中断标记。"""
@@ -426,6 +437,7 @@ class MaisakaHeartFlowChatting:
selected_history, _ = MaisakaChatLoopService.select_llm_context_messages(
self._chat_history,
request_kind=request_kind,
max_context_size=context_message_limit,
)
sub_agent_history = list(selected_history)
@@ -447,11 +459,133 @@ class MaisakaHeartFlowChatting:
tool_definitions=[] if tool_definitions is None else tool_definitions,
)
def set_current_action_tool_names(self, tool_names: Sequence[str]) -> None:
"""记录当前 Action Loop 已实际暴露给 planner 的工具名集合。"""
self._current_action_tool_names = {tool_name for tool_name in tool_names if str(tool_name).strip()}
def is_action_tool_currently_available(self, tool_name: str) -> bool:
"""判断指定工具在当前 Action Loop 轮次中是否真实可用。"""
normalized_name = str(tool_name).strip()
return bool(normalized_name) and normalized_name in self._current_action_tool_names
def update_deferred_tool_specs(self, deferred_tool_specs: Sequence[ToolSpec]) -> None:
"""刷新当前会话的 deferred tools 池,并清理失效的已发现工具。"""
next_specs_by_name: dict[str, ToolSpec] = {}
for tool_spec in deferred_tool_specs:
normalized_name = tool_spec.name.strip()
if not normalized_name:
continue
next_specs_by_name[normalized_name] = tool_spec
self.deferred_tool_specs_by_name = next_specs_by_name
self.discovered_tool_names.intersection_update(next_specs_by_name.keys())
def get_discovered_deferred_tool_specs(self) -> list[ToolSpec]:
"""返回当前会话中已发现、且仍然有效的 deferred tools。"""
return [
tool_spec
for tool_name, tool_spec in self.deferred_tool_specs_by_name.items()
if tool_name in self.discovered_tool_names
]
def build_deferred_tools_reminder(self) -> str:
"""构造供 planner 使用的 deferred tools 提示消息。"""
undiscovered_tool_specs = [
tool_spec
for tool_name, tool_spec in self.deferred_tool_specs_by_name.items()
if tool_name not in self.discovered_tool_names
]
if not undiscovered_tool_specs:
return ""
tool_lines: list[str] = []
for index, tool_spec in enumerate(undiscovered_tool_specs, start=1):
tool_name = tool_spec.name.strip()
tool_description = tool_spec.brief_description.strip()
if tool_description:
tool_lines.append(f"{index}. {tool_name}: {tool_description}")
else:
tool_lines.append(f"{index}. {tool_name}")
reminder_lines = [
"<system-reminder>",
"以下工具当前未直接暴露给你,但可以通过 tool_search 工具发现并在后续轮次中使用:",
*tool_lines,
"",
"如需其中某个工具,请先调用 tool_search。tool_search 只负责发现工具,不直接执行业务。",
"</system-reminder>",
]
return "\n".join(reminder_lines)
def search_deferred_tool_specs(
    self,
    query: str,
    *,
    limit: int,
) -> list[ToolSpec]:
    """Search deferred tools by name or brief description, best matches first.

    Scoring is additive: whole-query matches dominate (exact name 1000,
    name prefix 300, name substring 200, description substring 100), while
    individual query terms add smaller boosts (25 per name hit, 10 per
    description hit). Ties break alphabetically by tool name.
    """
    cleaned_query = " ".join(query.lower().split()).strip()
    if not cleaned_query:
        return []
    term_source = cleaned_query.replace("_", " ").replace("-", " ")
    terms = [piece for piece in term_source.split() if piece]
    ranked: list[tuple[int, str, ToolSpec]] = []
    for name, spec in self.deferred_tool_specs_by_name.items():
        name_lc = name.lower()
        description_lc = spec.brief_description.lower()
        relevance = 0
        if cleaned_query == name_lc:
            relevance += 1000
        if name_lc.startswith(cleaned_query):
            relevance += 300
        if cleaned_query in name_lc:
            relevance += 200
        if cleaned_query in description_lc:
            relevance += 100
        for term in terms:
            if term in name_lc:
                relevance += 25
            if term in description_lc:
                relevance += 10
        if relevance > 0:
            ranked.append((relevance, name, spec))
    ranked.sort(key=lambda entry: (-entry[0], entry[1]))
    # A limit below 1 still yields the single best match.
    return [spec for _, _, spec in ranked[: max(1, limit)]]
def discover_deferred_tools(self, tool_names: Sequence[str]) -> list[str]:
    """Mark the given deferred tools as discovered and return only the newly discovered names.

    Names are stripped before lookup; blanks, unknown names, and already
    discovered names are silently skipped.
    """
    newly_found: list[str] = []
    for candidate in tool_names:
        name = str(candidate).strip()
        if not name:
            continue
        if name not in self.deferred_tool_specs_by_name:
            continue
        if name in self.discovered_tool_names:
            continue
        self.discovered_tool_names.add(name)
        newly_found.append(name)
    return newly_found
def _has_pending_messages(self) -> bool:
    """Whether the message cache holds entries not yet processed."""
    cached_total = len(self.message_cache)
    return cached_total > self._last_processed_index
def _schedule_message_turn(self) -> None:
"""为当前待处理消息安排一次内部 turn。"""
if self._agent_state == self._STATE_WAIT:
return
if not self._has_pending_messages() or self._message_turn_scheduled:
return
@@ -531,8 +665,9 @@ class MaisakaHeartFlowChatting:
def _enter_wait_state(self, seconds: Optional[float] = None, tool_call_id: Optional[str] = None) -> None:
"""切换到等待状态。"""
self._agent_state = self._STATE_WAIT
self._wait_until = None if seconds is None else time.time() + seconds
self._pending_wait_tool_call_id = tool_call_id
self._message_turn_scheduled = False
self._cancel_deferred_message_turn_task()
self._cancel_wait_timeout_task()
if seconds is not None:
self._wait_timeout_task = asyncio.create_task(
@@ -542,7 +677,6 @@ class MaisakaHeartFlowChatting:
def _enter_stop_state(self) -> None:
"""切换到停止状态。"""
self._agent_state = self._STATE_STOP
self._wait_until = None
self._pending_wait_tool_call_id = None
self._cancel_wait_timeout_task()
@@ -567,7 +701,6 @@ class MaisakaHeartFlowChatting:
logger.info(f"{self.log_prefix} Maisaka 等待已超时")
self._agent_state = self._STATE_RUNNING
self._wait_until = None
await self._internal_turn_queue.put("timeout")
except asyncio.CancelledError:
return
@@ -616,7 +749,7 @@ class MaisakaHeartFlowChatting:
return True
async def _trigger_expression_learning(self, messages: list[SessionMessage]) -> None:
"""?????????????????"""
"""触发表达方式学习"""
pending_count = self._expression_learner.get_pending_count(self.message_cache)
if not self._should_trigger_learning(
enabled=self._enable_expression_learning,
@@ -629,21 +762,21 @@ class MaisakaHeartFlowChatting:
self._last_expression_extraction_time = time.time()
logger.info(
f"{self.log_prefix} ??????: "
f"??????={len(messages)} ??????={pending_count} "
f"?????={len(self.message_cache)} "
f"??????={self._enable_jargon_learning}"
f"{self.log_prefix} 触发表达方式学习: "
f"消息数量={len(messages)} 待处理消息数量={pending_count} "
f"缓存总量={len(self.message_cache)} "
f"是否启用黑话学习={self._enable_jargon_learning}"
)
try:
jargon_miner = self._jargon_miner if self._enable_jargon_learning else None
learnt_style = await self._expression_learner.learn(self.message_cache, jargon_miner)
if learnt_style:
logger.info(f"{self.log_prefix} ???????")
logger.info(f"{self.log_prefix} 表达方式学习成功")
else:
logger.debug(f"{self.log_prefix} ???????????????")
logger.debug(f"{self.log_prefix} 表达方式学习失败")
except Exception:
logger.exception(f"{self.log_prefix} ??????")
logger.exception(f"{self.log_prefix} 表达方式学习异常")
async def _init_mcp(self) -> None:
"""初始化 MCP 工具并注册到统一工具层。"""
@@ -655,12 +788,12 @@ class MaisakaHeartFlowChatting:
host_callbacks=self._mcp_host_bridge.build_callbacks(),
)
if self._mcp_manager is None:
logger.info(f"{self.log_prefix} MCP 管理器不可用")
logger.info(f"{self.log_prefix} Maisaka MCP 管理器不可用")
return
mcp_tool_specs = self._mcp_manager.get_tool_specs()
if not mcp_tool_specs:
logger.info(f"{self.log_prefix} 没有可供 Maisaka 使用的 MCP 工具")
logger.info(f"{self.log_prefix} Maisaka 没有可供使用的 MCP 工具")
return
self._tool_registry.register_provider(MCPToolProvider(self._mcp_manager))
@@ -694,6 +827,7 @@ class MaisakaHeartFlowChatting:
self,
*,
cycle_id: Optional[int] = None,
time_records: Optional[dict[str, float]] = None,
timing_selected_history_count: Optional[int] = None,
timing_prompt_tokens: Optional[int] = None,
timing_action: str = "",
@@ -709,6 +843,7 @@ class MaisakaHeartFlowChatting:
planner_tool_results: Optional[list[str]] = None,
planner_tool_detail_results: Optional[list[dict[str, Any]]] = None,
planner_prompt_section: Optional[RenderableType] = None,
planner_extra_lines: Optional[list[str]] = None,
) -> None:
"""在终端展示当前聊天流本轮 cycle 的最终结果。"""
if not global_config.debug.show_maisaka_thinking:
@@ -721,6 +856,7 @@ class MaisakaHeartFlowChatting:
if cycle_id is not None:
body_lines.append(f"循环编号:{cycle_id}")
panel_subtitle = self._build_cycle_time_records_text(time_records or {})
renderables: list[RenderableType] = [Text("\n".join(body_lines))]
timing_panel = self._build_cycle_stage_panel(
title="Timing Gate",
@@ -728,33 +864,49 @@ class MaisakaHeartFlowChatting:
selected_history_count=timing_selected_history_count,
prompt_tokens=timing_prompt_tokens,
response_text=timing_response,
tool_calls=timing_tool_calls,
tool_results=timing_tool_results,
tool_detail_results=timing_tool_detail_results,
prompt_section=timing_prompt_section,
extra_lines=[f"门控动作:{timing_action}"] if timing_action.strip() else None,
)
if timing_panel is not None:
renderables.append(timing_panel)
timing_tool_cards = self._build_tool_activity_cards(
stage_title="Timing Tool",
tool_calls=timing_tool_calls,
tool_results=timing_tool_results,
tool_detail_results=timing_tool_detail_results,
planner_style=False,
)
if timing_tool_cards:
renderables.extend(timing_tool_cards)
planner_panel = self._build_cycle_stage_panel(
title="Planner",
border_style="green",
selected_history_count=planner_selected_history_count,
prompt_tokens=planner_prompt_tokens,
response_text=planner_response,
tool_calls=planner_tool_calls,
tool_results=planner_tool_results,
tool_detail_results=planner_tool_detail_results,
prompt_section=planner_prompt_section,
extra_lines=planner_extra_lines,
)
if planner_panel is not None:
renderables.append(planner_panel)
planner_tool_cards = self._build_tool_activity_cards(
stage_title="Planner Tool",
tool_calls=planner_tool_calls,
tool_results=planner_tool_results,
tool_detail_results=planner_tool_detail_results,
planner_style=True,
)
if planner_tool_cards:
renderables.extend(planner_tool_cards)
console.print(
Panel(
Group(*renderables),
title="MaiSaka 循环",
subtitle=panel_subtitle,
border_style="bright_blue",
padding=(0, 1),
)
@@ -768,9 +920,6 @@ class MaisakaHeartFlowChatting:
selected_history_count: Optional[int],
prompt_tokens: Optional[int],
response_text: str = "",
tool_calls: Optional[list[Any]] = None,
tool_results: Optional[list[str]] = None,
tool_detail_results: Optional[list[dict[str, Any]]] = None,
prompt_section: Optional[RenderableType] = None,
extra_lines: Optional[list[str]] = None,
) -> Optional[Panel]:
@@ -780,9 +929,6 @@ class MaisakaHeartFlowChatting:
selected_history_count is not None,
prompt_tokens is not None,
bool(response_text.strip()),
bool(tool_calls),
bool(tool_results),
bool(tool_detail_results),
prompt_section is not None,
bool(extra_lines),
])
@@ -809,40 +955,11 @@ class MaisakaHeartFlowChatting:
Panel(
Text(normalized_response),
title="Maisaka 返回",
border_style="green",
border_style=border_style,
padding=(0, 1),
)
)
normalized_tool_calls = build_tool_call_summary_lines(tool_calls or [])
if normalized_tool_calls:
renderables.append(
Panel(
Text("\n".join(normalized_tool_calls)),
title="工具调用",
border_style="magenta",
padding=(0, 1),
)
)
normalized_tool_results = self._filter_redundant_tool_results(
tool_results=tool_results or [],
tool_detail_results=tool_detail_results or [],
)
if normalized_tool_results:
renderables.append(
Panel(
Text("\n".join(normalized_tool_results)),
title="工具结果",
border_style="yellow",
padding=(0, 1),
)
)
detail_panels = self._build_tool_detail_panels(tool_detail_results or [])
if detail_panels:
renderables.extend(detail_panels)
return Panel(
Group(*renderables),
title=title,
@@ -850,6 +967,75 @@ class MaisakaHeartFlowChatting:
padding=(0, 1),
)
def _build_tool_activity_cards(
self,
*,
stage_title: str,
tool_calls: Optional[list[Any]] = None,
tool_results: Optional[list[str]] = None,
tool_detail_results: Optional[list[dict[str, Any]]] = None,
planner_style: bool = False,
) -> list[RenderableType]:
"""构建与阶段同级的工具执行卡片列表。"""
detail_results = tool_detail_results or []
cards = self._build_tool_detail_cards(
detail_results,
stage_title=stage_title,
planner_style=planner_style,
)
if cards:
return cards
# 兼容旧数据结构:若尚无 detail则降级为简单文本卡片。
fallback_lines = self._filter_redundant_tool_results(
tool_results=tool_results or [],
tool_detail_results=detail_results,
)
if not fallback_lines and tool_calls:
fallback_lines = build_tool_call_summary_lines(tool_calls)
if not fallback_lines:
return []
fallback_border_style = "yellow"
return [
Panel(
Text("\n".join(fallback_lines)),
title=stage_title,
border_style=fallback_border_style,
padding=(0, 1),
)
]
@staticmethod
def _build_cycle_time_records_text(time_records: dict[str, float]) -> str:
"""构建循环最外层面板展示的阶段耗时文本。"""
if not time_records:
return "流程耗时:无"
label_map = {
"timing_gate": "Timing Gate",
"planner": "Planner",
"tool_calls": "工具执行",
}
ordered_keys = ["timing_gate", "planner", "tool_calls"]
parts: list[str] = []
for key in ordered_keys:
duration = time_records.get(key)
if isinstance(duration, (int, float)):
parts.append(f"{label_map.get(key, key)} {float(duration):.2f} s")
for key, duration in time_records.items():
if key in ordered_keys or not isinstance(duration, (int, float)):
continue
parts.append(f"{label_map.get(key, key)} {float(duration):.2f} s")
if not parts:
return "流程耗时:无"
return "流程耗时:" + " | ".join(parts)
@staticmethod
def _filter_redundant_tool_results(
*,
@@ -941,7 +1127,9 @@ class MaisakaHeartFlowChatting:
*,
tool_name: str,
prompt_text: str,
request_messages: Optional[list[Any]] = None,
tool_call_id: str,
border_style: str = "bright_yellow",
) -> Panel:
"""将工具 prompt 渲染为可点击查看的预览入口。"""
@@ -950,6 +1138,26 @@ class MaisakaHeartFlowChatting:
if tool_call_id:
subtitle += f"\n调用ID: {tool_call_id}"
if isinstance(request_messages, list) and request_messages:
try:
normalized_messages = deserialize_prompt_messages(request_messages)
except Exception as exc:
logger.warning(f"工具 {tool_name} 的 request_messages 无法反序列化,已回退为文本预览: {exc}")
else:
return Panel(
PromptCLIVisualizer.build_prompt_access_panel(
normalized_messages,
category=labels["prompt_category"],
chat_id=self.session_id,
request_kind=labels["request_kind"],
selection_reason=subtitle,
image_display_mode="path_link" if global_config.maisaka.show_image_path else "legacy",
),
title=labels["prompt_title"],
border_style=border_style,
padding=(0, 1),
)
return Panel(
PromptCLIVisualizer.build_text_access_panel(
prompt_text,
@@ -959,116 +1167,235 @@ class MaisakaHeartFlowChatting:
subtitle=subtitle,
),
title=labels["prompt_title"],
border_style="bright_yellow",
border_style=border_style,
padding=(0, 1),
)
def _build_tool_detail_panels(self, tool_detail_results: list[dict[str, Any]]) -> list[RenderableType]:
""" tool monitor detail 渲染为 CLI 详情卡片"""
def _normalize_tool_card_body_lines(self, body: Any) -> list[str]:
"""工具卡片正文规范化为行列表"""
if isinstance(body, str):
return [line for line in body.splitlines() if line.strip()]
if isinstance(body, list):
return [
str(item).strip()
for item in body
if str(item).strip()
]
return []
def _build_custom_tool_sub_cards(
self,
sub_cards: Any,
*,
default_border_style: str,
) -> list[RenderableType]:
"""构建工具自定义子卡片。"""
if not isinstance(sub_cards, list):
return []
renderables: list[RenderableType] = []
for sub_card in sub_cards:
if not isinstance(sub_card, dict):
continue
title = str(sub_card.get("title") or "").strip() or "附加信息"
border_style = str(sub_card.get("border_style") or "").strip() or default_border_style
body_lines = self._normalize_tool_card_body_lines(
sub_card.get("body_lines", sub_card.get("content", ""))
)
if not body_lines:
continue
renderables.append(
Panel(
Text("\n".join(body_lines)),
title=title,
border_style=border_style,
padding=(0, 1),
)
)
return renderables
def _build_default_tool_detail_parts(
self,
*,
tool_name: str,
tool_call_id: str,
tool_args: Any,
summary: str,
duration_ms: Any,
detail: dict[str, Any],
planner_style: bool,
) -> list[RenderableType]:
"""构建工具卡片默认内容块。"""
argument_border_style = "yellow"
metrics_border_style = "bright_yellow"
prompt_border_style = "bright_yellow"
reasoning_border_style = "yellow"
output_border_style = "bright_yellow"
extra_info_border_style = "yellow"
detail_labels = self._get_tool_detail_labels(tool_name)
parts: list[RenderableType] = []
header_lines: list[str] = []
if summary:
header_lines.append(summary)
if tool_call_id:
header_lines.append(f"调用ID{tool_call_id}")
if isinstance(duration_ms, (int, float)):
header_lines.append(f"执行耗时:{round(float(duration_ms), 2)} ms")
if header_lines:
parts.append(Text("\n".join(header_lines)))
if isinstance(tool_args, dict) and tool_args:
parts.append(
Panel(
Pretty(tool_args, expand_all=True),
title="工具参数",
border_style=argument_border_style,
padding=(0, 1),
)
)
metrics = detail.get("metrics")
if isinstance(metrics, dict):
metrics_text = self._build_tool_metrics_text(metrics)
if metrics_text:
parts.append(
Panel(
Text(metrics_text),
title="执行指标",
border_style=metrics_border_style,
padding=(0, 1),
)
)
prompt_text = str(detail.get("prompt_text") or "").strip()
if prompt_text:
parts.append(
self._build_tool_prompt_access_panel(
tool_name=tool_name,
prompt_text=prompt_text,
request_messages=detail.get("request_messages") if isinstance(detail.get("request_messages"), list) else None,
tool_call_id=tool_call_id,
border_style=prompt_border_style,
)
)
reasoning_text = str(detail.get("reasoning_text") or "").strip()
if reasoning_text:
parts.append(
Panel(
Text(reasoning_text),
title=detail_labels["reasoning_title"],
border_style=reasoning_border_style,
padding=(0, 1),
)
)
output_text = str(detail.get("output_text") or "").strip()
if output_text:
parts.append(
Panel(
Text(output_text),
title=detail_labels["output_title"],
border_style=output_border_style,
padding=(0, 1),
)
)
extra_sections = detail.get("extra_sections")
if isinstance(extra_sections, list):
for section in extra_sections:
if not isinstance(section, dict):
continue
section_title = str(section.get("title") or "").strip() or "附加信息"
section_content = str(section.get("content") or "").strip()
if not section_content:
continue
parts.append(
Panel(
Text(section_content),
title=section_title,
border_style=extra_info_border_style,
padding=(0, 1),
)
)
return parts
def _build_tool_detail_cards(
self,
tool_detail_results: list[dict[str, Any]],
*,
stage_title: str,
planner_style: bool = False,
) -> list[RenderableType]:
"""将 tool monitor detail 渲染为与 Planner/Timing 平级的工具卡片。"""
detail_panel_border_style = "yellow"
sub_card_border_style = "bright_yellow"
panels: list[RenderableType] = []
for tool_result in tool_detail_results:
detail = tool_result.get("detail")
if not isinstance(detail, dict) or not detail:
continue
detail_dict = detail if isinstance(detail, dict) else {}
tool_name = str(tool_result.get("tool_name") or "unknown").strip() or "unknown"
detail_labels = self._get_tool_detail_labels(tool_name)
tool_title = str(tool_result.get("tool_title") or "").strip() or tool_name
tool_call_id = str(tool_result.get("tool_call_id") or "").strip()
tool_args = tool_result.get("tool_args")
summary = str(tool_result.get("summary") or "").strip()
duration_ms = tool_result.get("duration_ms")
custom_card = tool_result.get("card")
parts: list[RenderableType] = []
header_lines: list[str] = []
if summary:
header_lines.append(summary)
if tool_call_id:
header_lines.append(f"调用ID{tool_call_id}")
if isinstance(duration_ms, (int, float)):
header_lines.append(f"执行耗时:{round(float(duration_ms), 2)} ms")
if header_lines:
parts.append(Text("\n".join(header_lines)))
if isinstance(tool_args, dict) and tool_args:
parts.append(
Panel(
Pretty(tool_args, expand_all=True),
title="工具参数",
border_style="cyan",
padding=(0, 1),
)
custom_title = ""
card_border_style = detail_panel_border_style
replace_default_children = False
if isinstance(custom_card, dict):
custom_title = str(custom_card.get("title") or "").strip()
card_border_style = str(custom_card.get("border_style") or "").strip() or detail_panel_border_style
replace_default_children = bool(custom_card.get("replace_default_children", False))
custom_body_lines = self._normalize_tool_card_body_lines(
custom_card.get("body_lines", custom_card.get("content", ""))
)
if custom_body_lines:
parts.append(Text("\n".join(custom_body_lines)))
metrics = detail.get("metrics")
if isinstance(metrics, dict):
metrics_text = self._build_tool_metrics_text(metrics)
if metrics_text:
parts.append(
Panel(
Text(metrics_text),
title="执行指标",
border_style="bright_cyan",
padding=(0, 1),
)
)
prompt_text = str(detail.get("prompt_text") or "").strip()
if prompt_text:
parts.append(
self._build_tool_prompt_access_panel(
if not replace_default_children:
parts.extend(
self._build_default_tool_detail_parts(
tool_name=tool_name,
prompt_text=prompt_text,
tool_call_id=tool_call_id,
tool_args=tool_args,
summary=summary,
duration_ms=duration_ms,
detail=detail_dict,
planner_style=planner_style,
)
)
reasoning_text = str(detail.get("reasoning_text") or "").strip()
if reasoning_text:
parts.append(
Panel(
Text(reasoning_text),
title=detail_labels["reasoning_title"],
border_style="magenta",
padding=(0, 1),
if isinstance(custom_card, dict):
parts.extend(
self._build_custom_tool_sub_cards(
custom_card.get("sub_cards"),
default_border_style=sub_card_border_style,
)
)
output_text = str(detail.get("output_text") or "").strip()
if output_text:
parts.append(
Panel(
Text(output_text),
title=detail_labels["output_title"],
border_style="green",
padding=(0, 1),
)
parts.extend(
self._build_custom_tool_sub_cards(
tool_result.get("sub_cards"),
default_border_style=sub_card_border_style,
)
extra_sections = detail.get("extra_sections")
if isinstance(extra_sections, list):
for section in extra_sections:
if not isinstance(section, dict):
continue
section_title = str(section.get("title") or "").strip() or "附加信息"
section_content = str(section.get("content") or "").strip()
if not section_content:
continue
parts.append(
Panel(
Text(section_content),
title=section_title,
border_style="white",
padding=(0, 1),
)
)
)
if parts:
panels.append(
Panel(
Group(*parts),
title=f"{tool_name} 工具详情",
border_style="yellow",
title=custom_title or f"{stage_title} · {tool_title}",
border_style=card_border_style,
padding=(0, 1),
)
)