feat:修复孤儿工具报错,为replyer等tool添加统一的控制台展示接口

This commit is contained in:
SengokuCola
2026-04-07 16:21:42 +08:00
parent c5f514946b
commit 6968879a04
11 changed files with 1803 additions and 439 deletions

View File

@@ -1,74 +1,50 @@
"""MaiSaka 实时监控事件广播模块。
通过统一 WebSocket 将 MaiSaka 推理引擎各阶段状态实时推送给前端监控界面
无需落盘 HTML/TXT 中间文件即可在 WebUI 中渲染完整的聊天流推理过程。
通过统一 WebSocket 将 MaiSaka 推理引擎各阶段状态实时推送给前端监控界面
"""
from typing import Any, Dict, List, Optional
from datetime import datetime
import time
from typing import Any, Dict, List, Optional
from src.common.logger import get_logger
# Module-level logger for the monitor broadcaster.
logger = get_logger("maisaka_monitor")
# Business domain and topic used for the WebSocket broadcasts.
MONITOR_DOMAIN = "maisaka_monitor"
MONITOR_TOPIC = "main"
def _serialize_message(message: Any) -> Dict[str, Any]:
"""单条 LLM 消息序列化为可通过 WebSocket 传输的字典。
def _normalize_payload_value(value: Any) -> Any:
"""事件载荷中的任意值规范化为可序列化结构。"""
对二进制数据(如图片)仅保留元信息,不传输原始字节以减小带宽占用。
Args:
message: 原始消息对象,可以是 dict 或带 role/content 属性的消息实例。
Returns:
Dict[str, Any]: 序列化后的消息字典。
"""
if isinstance(message, dict):
serialized: Dict[str, Any] = {
"role": str(message.get("role", "unknown")),
"content": message.get("content"),
}
if message.get("tool_call_id"):
serialized["tool_call_id"] = message["tool_call_id"]
if message.get("tool_calls"):
serialized["tool_calls"] = _serialize_tool_calls_from_dicts(message["tool_calls"])
return serialized
raw_role = getattr(message, "role", "unknown")
role_str = raw_role.value if hasattr(raw_role, "value") else str(raw_role) # type: ignore[union-attr]
serialized = {
"role": role_str,
"content": _extract_text_content(getattr(message, "content", None)),
}
tool_call_id = getattr(message, "tool_call_id", None)
if tool_call_id:
serialized["tool_call_id"] = str(tool_call_id)
tool_calls = getattr(message, "tool_calls", None)
if tool_calls:
serialized["tool_calls"] = _serialize_tool_calls_from_objects(tool_calls)
return serialized
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, dict):
normalized_dict: Dict[str, Any] = {}
for key, item in value.items():
normalized_dict[str(key)] = _normalize_payload_value(item)
return normalized_dict
if isinstance(value, (list, tuple, set)):
return [_normalize_payload_value(item) for item in value]
if hasattr(value, "model_dump"):
try:
return _normalize_payload_value(value.model_dump())
except Exception:
return str(value)
if hasattr(value, "__dict__"):
try:
return _normalize_payload_value(dict(value.__dict__))
except Exception:
return str(value)
return str(value)
def _extract_text_content(content: Any) -> Optional[str]:
"""从消息内容中提取纯文本表示。
"""从消息内容中提取纯文本表示。"""
支持字符串、列表(多模态内容块)等格式,对图片仅保留占位信息。
Args:
content: 消息的原始 content 字段。
Returns:
Optional[str]: 提取后的文本内容。
"""
if content is None:
return None
if isinstance(content, str):
@@ -91,23 +67,17 @@ def _extract_text_content(content: Any) -> Optional[str]:
def _serialize_tool_calls_from_objects(tool_calls: List[Any]) -> List[Dict[str, Any]]:
"""将工具调用对象列表序列化为字典列表。
"""将工具调用对象列表序列化为字典列表。"""
Args:
tool_calls: 工具调用对象列表ToolCall 或类似结构)。
Returns:
List[Dict[str, Any]]: 序列化后的工具调用列表。
"""
result: List[Dict[str, Any]] = []
for tc in tool_calls:
for tool_call in tool_calls:
serialized: Dict[str, Any] = {
"id": getattr(tc, "id", None) or getattr(tc, "tool_call_id", ""),
"name": getattr(tc, "func_name", None) or getattr(tc, "name", "unknown"),
"id": getattr(tool_call, "id", None) or getattr(tool_call, "call_id", ""),
"name": getattr(tool_call, "func_name", None) or getattr(tool_call, "name", "unknown"),
}
args = getattr(tc, "args", None) or getattr(tc, "arguments", None)
args = getattr(tool_call, "args", None) or getattr(tool_call, "arguments", None)
if isinstance(args, dict):
serialized["arguments"] = args
serialized["arguments"] = _normalize_payload_value(args)
elif isinstance(args, str):
serialized["arguments_raw"] = args
result.append(serialized)
@@ -115,73 +85,101 @@ def _serialize_tool_calls_from_objects(tool_calls: List[Any]) -> List[Dict[str,
def _serialize_tool_calls_from_dicts(tool_calls: List[Any]) -> List[Dict[str, Any]]:
    """Standardize a list of tool calls into a transferable format.

    Dict entries are read by key (``id``; ``name`` then ``func_name``;
    ``arguments`` then ``args``). Any other entry is read by attribute
    (``id`` then ``call_id``; ``func_name`` then ``name``; ``args`` then
    ``arguments``). A field that is present but ``None`` is treated like a
    missing one, so the next candidate or the default is used.

    Args:
        tool_calls: Tool calls as dicts or attribute-style objects.

    Returns:
        List[Dict[str, Any]]: One ``{"id", "name", "arguments"}`` dict per
        entry, with ``id`` and ``name`` as strings and ``arguments``
        normalized by ``_normalize_payload_value``.
    """

    def _first_set(*candidates: Any, default: Any) -> Any:
        # A present-but-None field must not mask the fallback; otherwise
        # str(None) puts the literal text "None" into the payload.
        for candidate in candidates:
            if candidate is not None:
                return candidate
        return default

    result: List[Dict[str, Any]] = []
    for tool_call in tool_calls:
        if isinstance(tool_call, dict):
            call_id = _first_set(tool_call.get("id"), default="")
            name = _first_set(
                tool_call.get("name"),
                tool_call.get("func_name"),
                default="unknown",
            )
            arguments = _first_set(
                tool_call.get("arguments"),
                tool_call.get("args"),
                default={},
            )
        else:
            call_id = _first_set(
                getattr(tool_call, "id", None),
                getattr(tool_call, "call_id", None),
                default="",
            )
            name = _first_set(
                getattr(tool_call, "func_name", None),
                getattr(tool_call, "name", None),
                default="unknown",
            )
            arguments = _first_set(
                getattr(tool_call, "args", None),
                getattr(tool_call, "arguments", None),
                default={},
            )
        result.append({
            "id": str(call_id),
            "name": str(name),
            "arguments": _normalize_payload_value(arguments),
        })
    return result
def _serialize_message(message: Any) -> Dict[str, Any]:
    """Serialize a single message into a dict that can be sent over the WebSocket.

    Args:
        message: Either a dict or an object exposing ``role`` / ``content``
            (and optionally ``tool_call_id`` / ``tool_calls``) attributes.

    Returns:
        Dict[str, Any]: Always carries ``role`` and ``content``;
        ``tool_call_id`` and ``tool_calls`` are added only when truthy.
    """
    if not isinstance(message, dict):
        # Attribute-style message object.
        role = getattr(message, "role", "unknown")
        # Roles exposing ``.value`` (enum-like) are unwrapped, others stringified.
        role_text = role.value if hasattr(role, "value") else str(role)
        payload = {
            "role": role_text,
            "content": _extract_text_content(getattr(message, "content", None)),
        }
        call_id = getattr(message, "tool_call_id", None)
        if call_id:
            payload["tool_call_id"] = str(call_id)
        calls = getattr(message, "tool_calls", None)
        if calls:
            payload["tool_calls"] = _serialize_tool_calls_from_objects(calls)
        return payload

    # Plain dict message.
    payload: Dict[str, Any] = {
        "role": str(message.get("role", "unknown")),
        "content": _extract_text_content(message.get("content")),
    }
    call_id = message.get("tool_call_id")
    if call_id:
        payload["tool_call_id"] = str(call_id)
    calls = message.get("tool_calls")
    if calls:
        payload["tool_calls"] = _serialize_tool_calls_from_dicts(calls)
    return payload
def _serialize_messages(messages: List[Any]) -> List[Dict[str, Any]]:
    """Serialize a list of messages, one output dict per input message, in order.

    Args:
        messages: Raw messages accepted by ``_serialize_message``.

    Returns:
        List[Dict[str, Any]]: The serialized messages.
    """
    serialized: List[Dict[str, Any]] = []
    for entry in messages:
        serialized.append(_serialize_message(entry))
    return serialized
def _serialize_tool_results(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Standardize the tool-result list shown on the final planner card.

    Args:
        tools: Tool-result dicts. The keys read are ``tool_call_id``,
            ``tool_name``, ``tool_args``, ``success``, ``duration_ms``,
            ``summary`` and the optional ``detail``.

    Returns:
        List[Dict[str, Any]]: One dict per tool with string ids/names/summary,
        normalized ``tool_args``, a boolean ``success`` and a float
        ``duration_ms``. ``detail`` is included only when it is not ``None``.
    """

    def _as_text(raw: Any) -> str:
        # A key that is present but None must behave like a missing key;
        # str(None) would put the literal text "None" into the payload.
        return "" if raw is None else str(raw)

    serialized_tools: List[Dict[str, Any]] = []
    for tool in tools:
        tool_args = tool.get("tool_args")
        serialized_tool = {
            "tool_call_id": _as_text(tool.get("tool_call_id")),
            "tool_name": _as_text(tool.get("tool_name")),
            # Same rule for arguments: None falls back to the empty mapping.
            "tool_args": _normalize_payload_value({} if tool_args is None else tool_args),
            "success": bool(tool.get("success", False)),
            # ``or 0.0`` turns an explicit None into 0.0 before the float cast.
            "duration_ms": float(tool.get("duration_ms", 0.0) or 0.0),
            "summary": _as_text(tool.get("summary")),
        }
        detail = tool.get("detail")
        if detail is not None:
            serialized_tool["detail"] = _normalize_payload_value(detail)
        serialized_tools.append(serialized_tool)
    return serialized_tools
async def _broadcast(event: str, data: Dict[str, Any]) -> None:
"""通过统一 WebSocket 管理器向所有订阅了 maisaka_monitor 主题的连接广播事件。
"""通过统一 WebSocket 管理器向监控主题广播事件。"""
延迟导入 websocket_manager 以避免循环依赖。
Args:
event: 事件名称。
data: 事件数据。
"""
try:
from src.webui.routers.websocket.manager import websocket_manager
subscription_key = f"{MONITOR_DOMAIN}:{MONITOR_TOPIC}"
total_connections = len(websocket_manager.connections)
subscriber_count = sum(
1 for conn in websocket_manager.connections.values()
if subscription_key in conn.subscriptions
1
for connection in websocket_manager.connections.values()
if subscription_key in connection.subscriptions
)
# 诊断:打印 manager 对象 id 和连接状态
logger.info(
f"[诊断] _broadcast: manager_id={id(websocket_manager)} "
f"总连接={total_connections} 订阅者={subscriber_count} event={event}"
)
if subscriber_count == 0 and total_connections > 0:
for cid, conn in websocket_manager.connections.items():
logger.info(
f"[诊断] 连接={cid[:8]}… 订阅={conn.subscriptions}"
)
await websocket_manager.broadcast_to_topic(
domain=MONITOR_DOMAIN,
topic=MONITOR_TOPIC,
@@ -193,12 +191,8 @@ async def _broadcast(event: str, data: Dict[str, Any]) -> None:
async def emit_session_start(session_id: str, session_name: str) -> None:
"""广播会话开始事件。
"""广播会话开始事件。"""
Args:
session_id: 聊天流 ID。
session_name: 聊天流显示名称。
"""
await _broadcast("session.start", {
"session_id": session_id,
"session_name": session_name,
@@ -213,17 +207,8 @@ async def emit_message_ingested(
message_id: str,
timestamp: float,
) -> None:
"""广播新消息注入事件。
"""广播新消息注入事件。"""
当新的用户消息被纳入 MaiSaka 推理上下文时触发。
Args:
session_id: 聊天流 ID。
speaker_name: 发言者名称。
content: 消息文本内容。
message_id: 消息 ID。
timestamp: 消息时间戳。
"""
await _broadcast("message.ingested", {
"session_id": session_id,
"speaker_name": speaker_name,
@@ -240,15 +225,8 @@ async def emit_cycle_start(
max_rounds: int,
history_count: int,
) -> None:
"""广播推理循环开始事件。
"""广播推理循环开始事件。"""
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
round_index: 当前回合索引(从 0 开始)。
max_rounds: 最大回合数。
history_count: 当前上下文消息数。
"""
await _broadcast("cycle.start", {
"session_id": session_id,
"cycle_id": cycle_id,
@@ -270,19 +248,8 @@ async def emit_timing_gate_result(
selected_history_count: int,
duration_ms: float,
) -> None:
"""广播 Timing Gate 子代理结果事件。
"""广播 Timing Gate 结果事件。"""
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
action: 控制决策continue/wait/no_reply
content: Timing Gate 返回的文本内容。
tool_calls: 工具调用列表。
messages: 发送给 Timing Gate 的消息列表。
prompt_tokens: 输入 Token 数。
selected_history_count: 已选上下文消息数。
duration_ms: 执行耗时(毫秒)。
"""
await _broadcast("timing_gate.result", {
"session_id": session_id,
"cycle_id": cycle_id,
@@ -297,177 +264,45 @@ async def emit_timing_gate_result(
})
async def emit_planner_finalized(
    *,
    session_id: str,
    cycle_id: int,
    request_messages: List[Any],
    selected_history_count: int,
    tool_count: int,
    planner_content: Optional[str],
    planner_tool_calls: List[Any],
    prompt_tokens: int,
    completion_tokens: int,
    total_tokens: int,
    duration_ms: float,
    tools: List[Dict[str, Any]],
    time_records: Dict[str, float],
    agent_state: str,
) -> None:
    """Broadcast the final aggregated event after one planner round finishes.

    The ``planner.finalized`` payload groups the round into four sections:
    ``request``, ``planner``, ``tools`` and ``final_state``.

    Args:
        session_id: Copied to the top level of the payload.
        cycle_id: Copied to the top level of the payload.
        request_messages: Messages serialized into ``request.messages``.
        selected_history_count: Reported as ``request.selected_history_count``.
        tool_count: Reported as ``request.tool_count``.
        planner_content: Reported as ``planner.content``.
        planner_tool_calls: Tool-call objects serialized into ``planner.tool_calls``.
        prompt_tokens: Reported as ``planner.prompt_tokens``.
        completion_tokens: Reported as ``planner.completion_tokens``.
        total_tokens: Reported as ``planner.total_tokens``.
        duration_ms: Reported as ``planner.duration_ms``.
        tools: Tool-result dicts serialized into ``tools``.
        time_records: Normalized into ``final_state.time_records``.
        agent_state: Reported as ``final_state.agent_state``.
    """
    # Take the timestamp before any serialization work, as the payload literal did.
    emitted_at = time.time()
    request_section = {
        "messages": _serialize_messages(request_messages),
        "selected_history_count": selected_history_count,
        "tool_count": tool_count,
    }
    planner_section = {
        "content": planner_content,
        "tool_calls": _serialize_tool_calls_from_objects(planner_tool_calls),
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "duration_ms": duration_ms,
    }
    tool_results = _serialize_tool_results(tools)
    final_state_section = {
        "time_records": _normalize_payload_value(time_records),
        "agent_state": agent_state,
    }
    payload = {
        "session_id": session_id,
        "cycle_id": cycle_id,
        "timestamp": emitted_at,
        "request": request_section,
        "planner": planner_section,
        "tools": tool_results,
        "final_state": final_state_section,
    }
    await _broadcast("planner.finalized", payload)