diff --git a/dashboard/src/components/layout/constants.ts b/dashboard/src/components/layout/constants.ts index 6bdcac21..4ed07122 100644 --- a/dashboard/src/components/layout/constants.ts +++ b/dashboard/src/components/layout/constants.ts @@ -36,7 +36,7 @@ export const menuSections: MenuSection[] = [ { icon: LayoutGrid, label: 'sidebar.menu.configTemplate', path: '/config/pack-market' }, { icon: Sliders, label: 'sidebar.menu.pluginConfig', path: '/plugin-config' }, { icon: FileSearch, label: 'sidebar.menu.logViewer', path: '/logs', searchDescription: 'search.items.logsDesc' }, - { icon: Activity, label: 'sidebar.menu.plannerMonitor', path: '/planner-monitor' }, + { icon: Activity, label: 'sidebar.menu.maisakaMonitor', path: '/planner-monitor' }, { icon: MessageSquare, label: 'sidebar.menu.localChat', path: '/chat' }, ], }, diff --git a/dashboard/src/i18n/locales/en.json b/dashboard/src/i18n/locales/en.json index 4b423efd..e8dce641 100644 --- a/dashboard/src/i18n/locales/en.json +++ b/dashboard/src/i18n/locales/en.json @@ -38,7 +38,7 @@ "configTemplate": "Config Templates", "pluginConfig": "Plugin Config", "logViewer": "Log Viewer", - "plannerMonitor": "Planner & Replier Monitor", + "maisakaMonitor": "MaiSaka Chat Monitor", "localChat": "Local Chat", "settings": "Settings" } diff --git a/dashboard/src/i18n/locales/ja.json b/dashboard/src/i18n/locales/ja.json index 18c4d7c7..7ee3dfb2 100644 --- a/dashboard/src/i18n/locales/ja.json +++ b/dashboard/src/i18n/locales/ja.json @@ -38,7 +38,7 @@ "configTemplate": "設定テンプレート", "pluginConfig": "プラグイン設定", "logViewer": "ログビューア", - "plannerMonitor": "プランナー & リプライヤー監視", + "maisakaMonitor": "MaiSaka チャット監視", "localChat": "ローカルチャット", "settings": "設定" } diff --git a/dashboard/src/i18n/locales/ko.json b/dashboard/src/i18n/locales/ko.json index d8226a0e..fb3b071c 100644 --- a/dashboard/src/i18n/locales/ko.json +++ b/dashboard/src/i18n/locales/ko.json @@ -38,7 +38,7 @@ "configTemplate": "설정 템플릿", "pluginConfig": "플러그인 설정", "logViewer": "로그 뷰어", - "plannerMonitor": "플래너 & 리플라이어 모니터", + "maisakaMonitor": "MaiSaka 채팅 모니터", "localChat": "로컬 채팅", "settings": "설정" } diff --git a/dashboard/src/i18n/locales/zh.json b/dashboard/src/i18n/locales/zh.json index e546c933..69cb7557 100644 --- a/dashboard/src/i18n/locales/zh.json +++ b/dashboard/src/i18n/locales/zh.json @@ -38,7 +38,7 @@ "configTemplate": "配置模板市场", "pluginConfig": "插件配置", "logViewer": "日志查看器", - "plannerMonitor": "计划器&回复器监控", + "maisakaMonitor": "MaiSaka 聊天流监控", "localChat": "本地聊天室", "settings": "系统设置" } diff --git a/dashboard/src/lib/api-base.ts b/dashboard/src/lib/api-base.ts index 8d066829..d5b1f651 100644 --- a/dashboard/src/lib/api-base.ts +++ b/dashboard/src/lib/api-base.ts @@ -30,7 +30,7 @@ export async function getApiBaseUrl(): Promise { /** * Get WebSocket base URL * - Electron: Convert HTTP/HTTPS URL to WS/WSS - * - Browser DEV: ws://127.0.0.1:8001 (hardcoded, same as log-websocket.ts) + * - Browser DEV: Use same-origin WS URL and let Vite proxy forward requests * - Browser PROD: Construct WS URL from window.location */ export async function getWsBaseUrl(): Promise { @@ -47,9 +47,10 @@ export async function getWsBaseUrl(): Promise { }) } - // Browser DEV: Use hardcoded WebSocket server + // Browser DEV: Use same-origin URL so Vite proxy can forward WebSocket requests if (import.meta.env.DEV) { - return 'ws://127.0.0.1:8001' + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + return `${protocol}//${window.location.host}` } // Browser PROD: Construct WS URL from current location diff --git a/dashboard/src/lib/maisaka-monitor-client.ts b/dashboard/src/lib/maisaka-monitor-client.ts new file mode 100644 index 00000000..76d3f972 --- /dev/null +++ b/dashboard/src/lib/maisaka-monitor-client.ts @@ -0,0 +1,223 @@ +/** + * MaiSaka 实时监控 WebSocket 客户端 + * + * 订阅 maisaka_monitor 主题,接收推理引擎各阶段的实时事件。 + */ +import type { WsEventEnvelope } from './unified-ws' + +import { unifiedWsClient } from './unified-ws' + +// ─── 事件数据类型 ─────────────────────────────────────────────── + +export interface MaisakaMessage { + role: string + content: string | null + tool_call_id?: string + tool_calls?: MaisakaToolCall[] +} + +export interface MaisakaToolCall { + id: string + name: string + arguments?: Record + arguments_raw?: string +} + +export interface SessionStartEvent { + session_id: string + session_name: string + timestamp: number +} + +export interface MessageIngestedEvent { + session_id: string + speaker_name: string + content: string + message_id: string + timestamp: number +} + +export interface CycleStartEvent { + session_id: string + cycle_id: number + round_index: number + max_rounds: number + history_count: number + timestamp: number +} + +export interface TimingGateResultEvent { + session_id: string + cycle_id: number + action: 'continue' | 'wait' | 'no_reply' + content: string | null + tool_calls: MaisakaToolCall[] + messages: MaisakaMessage[] + prompt_tokens: number + selected_history_count: number + duration_ms: number + timestamp: number +} + +export interface PlannerRequestEvent { + session_id: string + cycle_id: number + messages: MaisakaMessage[] + tool_count: number + selected_history_count: number + timestamp: number +} + +export interface PlannerResponseEvent { + session_id: string + cycle_id: number + content: string | null + tool_calls: MaisakaToolCall[] + prompt_tokens: number + completion_tokens: number + total_tokens: number + duration_ms: number + timestamp: number +} + +export interface ToolExecutionEvent { + session_id: string + cycle_id: number + tool_name: string + tool_args: Record + result_summary: string + success: boolean + duration_ms: number + timestamp: number +} + +export interface CycleEndEvent { + session_id: string + cycle_id: number + time_records: Record + agent_state: string + timestamp: number +} + +export interface ReplierRequestEvent { + session_id: string + messages: MaisakaMessage[] + model_name: string + timestamp: number +} + +export interface ReplierResponseEvent { + session_id: string + content: string | null + reasoning: string + model_name: string + prompt_tokens: number + completion_tokens: number + total_tokens: number + duration_ms: number + success: boolean + timestamp: number +} + +// ─── 统一事件联合类型 ───────────────────────────────────────── + +export type MaisakaMonitorEvent = + | { type: 'session.start'; data: SessionStartEvent } + | { type: 'message.ingested'; data: MessageIngestedEvent } + | { type: 'cycle.start'; data: CycleStartEvent } + | { type: 'timing_gate.result'; data: TimingGateResultEvent } + | { type: 'planner.request'; data: PlannerRequestEvent } + | { type: 'planner.response'; data: PlannerResponseEvent } + | { type: 'tool.execution'; data: ToolExecutionEvent } + | { type: 'cycle.end'; data: CycleEndEvent } + | { type: 'replier.request'; data: ReplierRequestEvent } + | { type: 'replier.response'; data: ReplierResponseEvent } + +export type MaisakaEventListener = (event: MaisakaMonitorEvent) => void + +// ─── 客户端 ─────────────────────────────────────────────────── + +class MaisakaMonitorClient { + private initialized = false + private listenerIdCounter = 0 + private listeners: Map = new Map() + private subscriptionActive = false + private subscriptionPromise: Promise | null = null + private deferredUnsubTimer: ReturnType | null = null + + private initialize(): void { + if (this.initialized) { + return + } + + unifiedWsClient.addEventListener((message: WsEventEnvelope) => { + if (message.domain !== 'maisaka_monitor') { + return + } + + const event: MaisakaMonitorEvent = { + type: message.event as MaisakaMonitorEvent['type'], + data: message.data as never, + } + + this.listeners.forEach((listener) => { + try { + listener(event) + } catch (error) { + console.error('MaiSaka 监控事件监听器执行失败:', error) + } + }) + }) + + this.initialized = true + } + + private async ensureSubscribed(): Promise { + if (this.subscriptionActive) { + return + } + + if (this.subscriptionPromise === null) { + this.subscriptionPromise = unifiedWsClient + .subscribe('maisaka_monitor', 'main') + .then(() => { + this.subscriptionActive = true + }) + .finally(() => { + this.subscriptionPromise = null + }) + } + + await this.subscriptionPromise + } + + async subscribe(listener: MaisakaEventListener): Promise<() => Promise> { + this.initialize() + const listenerId = ++this.listenerIdCounter + this.listeners.set(listenerId, listener) + + // 如果有待执行的延迟退订,取消它(React StrictMode 快速卸载/重新挂载) + if (this.deferredUnsubTimer !== null) { + clearTimeout(this.deferredUnsubTimer) + this.deferredUnsubTimer = null + } + + await this.ensureSubscribed() + + return async () => { + this.listeners.delete(listenerId) + if (this.listeners.size === 0 && this.subscriptionActive) { + // 延迟退订:等待短暂时间再真正退订,避免 StrictMode 导致的竞态 + this.deferredUnsubTimer = setTimeout(() => { + this.deferredUnsubTimer = null + if (this.listeners.size === 0 && this.subscriptionActive) { + this.subscriptionActive = false + void unifiedWsClient.unsubscribe('maisaka_monitor', 'main') + } + }, 200) + } + } + } +} + +export const maisakaMonitorClient = new MaisakaMonitorClient() diff --git a/dashboard/src/lib/unified-ws.ts b/dashboard/src/lib/unified-ws.ts index 3cdafce0..6438288d 100644 --- a/dashboard/src/lib/unified-ws.ts +++ b/dashboard/src/lib/unified-ws.ts @@ -83,11 +83,14 @@ async function getWsToken(): Promise { } class UnifiedWebSocketClient { + private readonly heartbeatIntervalMs = 30000 + private readonly heartbeatTimeoutMs = 90000 private connectPromise: Promise | null = null private connectionListeners: Set = new Set() private eventListeners: Set = new Set() private hasConnectedOnce = false private heartbeatIntervalId: number | null = null + private lastPongAt = 0 private manualDisconnect = false private pendingRequests: Map = new Map() private reconnectAttempts = 0 @@ -151,10 +154,21 @@ class UnifiedWebSocketClient { private startHeartbeat(): void { this.stopHeartbeat() this.heartbeatIntervalId = window.setInterval(() => { - if (this.ws?.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify({ op: 'ping' })) + if (this.ws?.readyState !== WebSocket.OPEN) { + return } - }, 30000) + + const now = Date.now() + if (this.lastPongAt > 0 && now - this.lastPongAt > this.heartbeatTimeoutMs) { + console.warn('统一 WebSocket 心跳超时,准备重连') + void this.restart().catch((error) => { + console.error('统一 WebSocket 心跳重连失败:', error) + }) + return + } + + this.ws.send(JSON.stringify({ op: 'ping' })) + }, this.heartbeatIntervalMs) } private clearReconnectTimer(): void { @@ -252,7 +266,11 @@ class UnifiedWebSocketClient { } } - private handleServerMessage(rawData: string): void { + private handleServerMessage(socket: WebSocket, rawData: string): void { + if (this.ws !== socket) { + return + } + let message: WsServerEnvelope try { message = JSON.parse(rawData) as WsServerEnvelope @@ -262,6 +280,7 @@ class UnifiedWebSocketClient { } if (message.op === 'pong') { + this.lastPongAt = Date.now() return } @@ -297,8 +316,13 @@ class UnifiedWebSocketClient { } } - private handleClose(event: CloseEvent): void { + private handleClose(socket: WebSocket, event: CloseEvent): void { + if (this.ws !== socket) { + return + } + this.stopHeartbeat() + this.lastPongAt = 0 this.ws = null this.connectPromise = null this.setStatus('idle') @@ -340,10 +364,16 @@ class UnifiedWebSocketClient { this.ws = socket socket.onopen = () => { + if (this.ws !== socket) { + socket.close() + return + } + settled = true const shouldNotifyReconnect = this.hasConnectedOnce this.hasConnectedOnce = true this.reconnectAttempts = 0 + this.lastPongAt = Date.now() this.startHeartbeat() this.setStatus('connected') resolve() @@ -351,10 +381,14 @@ class UnifiedWebSocketClient { } socket.onmessage = (event) => { - this.handleServerMessage(event.data) + this.handleServerMessage(socket, event.data) } socket.onerror = () => { + if (this.ws !== socket) { + return + } + if (!settled) { settled = true reject(new Error('统一 WebSocket 连接失败')) @@ -366,7 +400,7 @@ class UnifiedWebSocketClient { settled = true reject(new Error(`统一 WebSocket 已关闭 (${event.code})`)) } - this.handleClose(event) + this.handleClose(socket, event) } }) })() @@ -384,6 +418,7 @@ class UnifiedWebSocketClient { this.manualDisconnect = true this.clearReconnectTimer() this.stopHeartbeat() + this.lastPongAt = 0 this.rejectPendingRequests(new Error('统一 WebSocket 已手动断开')) this.connectPromise = null if (this.ws) { diff --git a/dashboard/src/router.tsx b/dashboard/src/router.tsx index 26b70d8e..5fcb42e7 100644 --- a/dashboard/src/router.tsx +++ b/dashboard/src/router.tsx @@ -156,7 +156,7 @@ const logsRoute = createRoute({ component: LogViewerPage, }) -// 计划器&恢复器监控路由 +// MaiSaka 聊天流监控路由 const plannerMonitorRoute = createRoute({ getParentRoute: () => protectedRoute, path: '/planner-monitor', diff --git a/dashboard/src/routes/monitor/index.tsx b/dashboard/src/routes/monitor/index.tsx index cf6c2f7a..73d8d4fe 100644 --- a/dashboard/src/routes/monitor/index.tsx +++ b/dashboard/src/routes/monitor/index.tsx @@ -1,86 +1,30 @@ /** - * 监控页面主入口 - * 整合规划器监控和回复器监控 + * MaiSaka 聊天流监控页面入口 + * + * 通过 WebSocket 实时渲染 MaiSaka 推理过程。 */ -import { Activity, RefreshCw, MessageSquareText } from 'lucide-react' -import { ScrollArea } from '@/components/ui/scroll-area' -import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs' -import { Button } from '@/components/ui/button' -import { useState, useCallback } from 'react' -import { PlannerMonitor } from './planner-monitor' -import { ReplierMonitor } from './replier-monitor' +import { Activity } from 'lucide-react' + +import { MaisakaMonitor } from './maisaka-monitor' export function PlannerMonitorPage() { - const [activeTab, setActiveTab] = useState<'planner' | 'replier'>('planner') - const [autoRefresh, setAutoRefresh] = useState(false) - const [refreshKey, setRefreshKey] = useState(0) - - const handleManualRefresh = useCallback(() => { - setRefreshKey(k => k + 1) - }, []) - return (
{/* 页面标题 */}
-

计划器 & 回复器监控

+

+ + MaiSaka 聊天流监控 +

- 实时监控麦麦的任务计划器和回复生成器运行状态 + 实时追踪 MaiSaka 推理引擎的完整思考过程

-
- - -
- {/* 标签页 */} - setActiveTab(v as 'planner' | 'replier')} - className="w-full" - > - - - - 计划器监控 - - - - 回复器监控 - - - - - - - - - - - - - + {/* 主体 */} +
) } diff --git a/dashboard/src/routes/monitor/maisaka-monitor.tsx b/dashboard/src/routes/monitor/maisaka-monitor.tsx new file mode 100644 index 00000000..8591c3eb --- /dev/null +++ b/dashboard/src/routes/monitor/maisaka-monitor.tsx @@ -0,0 +1,553 @@ +/** + * MaiSaka 聊天流实时监控组件 + * + * 通过 WebSocket 实时接收 MaiSaka 推理引擎事件, + * 以时间线形式展示聊天流的推理过程。 + */ +import { + Activity, + ArrowRight, + Bot, + Brain, + CheckCircle2, + ChevronDown, + ChevronRight, + CircleDot, + Clock, + Eraser, + Gauge, + MessageSquare, + PauseCircle, + Timer, + Wrench, + XCircle, + Zap, +} from 'lucide-react' +import { Badge } from '@/components/ui/badge' +import { Button } from '@/components/ui/button' +import { Card, CardHeader, CardTitle } from '@/components/ui/card' +import { ScrollArea } from '@/components/ui/scroll-area' +import { Separator } from '@/components/ui/separator' +import { cn } from '@/lib/utils' +import { useCallback, useEffect, useRef, useState } from 'react' + +import type { + CycleEndEvent, + CycleStartEvent, + MaisakaToolCall, + MessageIngestedEvent, + PlannerResponseEvent, + ReplierResponseEvent, + TimingGateResultEvent, + ToolExecutionEvent, +} from '@/lib/maisaka-monitor-client' +import type { SessionInfo, TimelineEntry } from './use-maisaka-monitor' +import { useMaisakaMonitor } from './use-maisaka-monitor' + +// ─── 工具函数 ────────────────────────────────────────────────── + +function formatMs(ms: number): string { + if (ms < 1000) return `${Math.round(ms)}ms` + return `${(ms / 1000).toFixed(2)}s` +} + +function formatTimestamp(ts: number): string { + return new Date(ts * 1000).toLocaleTimeString('zh-CN', { + hour: '2-digit', + minute: '2-digit', + second: '2-digit', + }) +} + +function formatRelativeTime(ts: number): string { + const diff = Date.now() / 1000 - ts + if (diff < 10) return '刚刚' + if (diff < 60) return `${Math.round(diff)}秒前` + if (diff < 3600) return `${Math.round(diff / 60)}分钟前` + return `${Math.round(diff / 3600)}小时前` +} + +// ─── 会话侧边栏 ────────────────────────────────────────────── + +function SessionSidebar({ + sessions, + selectedSession, + onSelect, +}: { + sessions: Map + selectedSession: string | null + onSelect: (id: string) => void +}) { + const sortedSessions = Array.from(sessions.values()).sort( + (a, b) => b.lastActivity - a.lastActivity, + ) + + if (sortedSessions.length === 0) { + return ( +
+ +

等待 MaiSaka 会话…

+
+ ) + } + + return ( +
+ {sortedSessions.map((session) => ( + + ))} +
+ ) +} + +// ─── 单条时间线事件渲染 ────────────────────────────────────── + +function MessageIngestedCard({ data }: { data: MessageIngestedEvent }) { + return ( +
+
+ +
+
+
+ {data.speaker_name} + {formatTimestamp(data.timestamp)} +
+

+ {data.content || '[空消息]'} +

+
+
+ ) +} + +function CycleStartCard({ data }: { data: CycleStartEvent }) { + return ( +
+
+ +
+
+ 推理循环 #{data.cycle_id} + + 回合 {data.round_index + 1}/{data.max_rounds} + + + 上下文 {data.history_count} 条 + +
+
+ ) +} + +function TimingGateCard({ data }: { data: TimingGateResultEvent }) { + const actionConfig: Record = { + continue: { label: '继续执行', variant: 'default', icon: ArrowRight }, + wait: { label: '等待', variant: 'secondary', icon: PauseCircle }, + no_reply: { label: '不回复', variant: 'destructive', icon: XCircle }, + } + const config = actionConfig[data.action] ?? actionConfig.continue + const Icon = config.icon + + return ( +
+
+ +
+
+
+ Timing Gate + + + {config.label} + + {formatMs(data.duration_ms)} +
+ {data.content && ( + + )} +
+
+ ) +} + +function PlannerResponseCard({ data }: { data: PlannerResponseEvent }) { + return ( +
+
+ +
+
+
+ 规划器思考 + {formatMs(data.duration_ms)} + + {data.prompt_tokens}+{data.completion_tokens} tokens + +
+ {data.content && ( + + )} + {data.tool_calls.length > 0 && ( +
+ {data.tool_calls.map((tc: MaisakaToolCall, idx: number) => ( + + + {tc.name} + + ))} +
+ )} +
+
+ ) +} + +function ToolExecutionCard({ data }: { data: ToolExecutionEvent }) { + return ( +
+
+ +
+
+
+ {data.tool_name} + {data.success + ? + : + } + {formatMs(data.duration_ms)} +
+ {Object.keys(data.tool_args).length > 0 && ( +
+ {JSON.stringify(data.tool_args, null, 2)} +
+ )} + {data.result_summary && ( + + )} +
+
+ ) +} + +function CycleEndCard({ data }: { data: CycleEndEvent }) { + const totalTime = Object.values(data.time_records).reduce((a, b) => a + b, 0) + return ( +
+
+ +
+
+ 循环结束 + + 总耗时 {formatMs(totalTime * 1000)} + + {Object.entries(data.time_records).map(([name, duration]) => ( + + {name}: {formatMs(duration * 1000)} + + ))} + + {data.agent_state} + +
+
+ ) +} + +// ─── 可折叠文本组件 ──────────────────────────────────────────── + +function CollapsibleText({ + text, + maxLines = 4, + className, +}: { + text: string + maxLines?: number + className?: string +}) { + const [expanded, setExpanded] = useState(false) + const lines = text.split('\n') + const needsCollapse = lines.length > maxLines + + if (!needsCollapse || expanded) { + return ( +
+

+ {text} +

+ {needsCollapse && ( + + )} +
+ ) + } + + return ( +
+

+ {lines.slice(0, maxLines).join('\n')} +

+ +
+ ) +} + +// ─── 回复器响应卡片 ────────────────────────────────────────── + +function ReplierResponseCard({ data }: { data: ReplierResponseEvent }) { + return ( + + +
+ + 回复器响应 + + {formatMs(data.duration_ms)} + + {data.success ? ( + + 成功 + + ) : ( + + 失败 + + )} + {formatTimestamp(data.timestamp)} +
+ {data.content && ( + + )} + {data.reasoning && ( +
+ + 思考过程 + + +
+ )} + {(data.prompt_tokens > 0 || data.completion_tokens > 0) && ( +
+ {data.model_name && 模型: {data.model_name}} + 输入: {data.prompt_tokens} + 输出: {data.completion_tokens} + 总计: {data.total_tokens} +
+ )} +
+
+ ) +} + +// ─── 时间线入口渲染器 ────────────────────────────────────────── + +function TimelineEventRenderer({ entry }: { entry: TimelineEntry }) { + switch (entry.type) { + case 'message.ingested': + return + case 'cycle.start': + return + case 'timing_gate.result': + return + case 'planner.response': + return + case 'tool.execution': + return + case 'cycle.end': + return + case 'replier.response': + return + // planner.request, replier.request 和 session.start 通常不需要在 timeline 中主要展示 + default: + return null + } +} + +// ─── 主组件 ───────────────────────────────────────────────── + +export function MaisakaMonitor() { + const { + timeline, + sessions, + selectedSession, + setSelectedSession, + connected, + clearTimeline, + } = useMaisakaMonitor() + + const scrollRef = useRef(null) + const [autoScroll, setAutoScroll] = useState(true) + + // 自动滚动到底部 + useEffect(() => { + if (autoScroll && scrollRef.current) { + const viewport = scrollRef.current.querySelector('[data-radix-scroll-area-viewport]') + if (viewport) { + viewport.scrollTop = viewport.scrollHeight + } + } + }, [timeline, autoScroll]) + + const handleScroll = useCallback((e: React.UIEvent) => { + const target = e.currentTarget.querySelector('[data-radix-scroll-area-viewport]') + if (!target) return + const { scrollTop, scrollHeight, clientHeight } = target as HTMLElement + setAutoScroll(scrollHeight - scrollTop - clientHeight < 80) + }, []) + + // 统计当前会话的各事件类型计数 + const stats = { + messages: timeline.filter((e) => e.type === 'message.ingested').length, + cycles: timeline.filter((e) => e.type === 'cycle.start').length, + toolCalls: timeline.filter((e) => e.type === 'tool.execution').length, + } + + return ( +
+ {/* 会话侧边栏 */} + + + + + 聊天流 + {connected && ( + + )} + + + + + + + + + {/* 主时间线区域 */} +
+ {/* 顶部统计栏 */} +
+
+
+ + {stats.messages} 消息 +
+
+ + {stats.cycles} 循环 +
+
+ + {stats.toolCalls} 工具调用 +
+
+
+ + +
+
+ + {/* 时间线 */} + + +
+ {timeline.length === 0 ? ( +
+ +

等待 MaiSaka 推理事件…

+

+ 当 MaiSaka 处理新消息时,推理过程会实时展示在这里 +

+
+ ) : ( + timeline.map((entry) => { + const rendered = + if (!rendered) return null + return ( +
+ {rendered} + {entry.type === 'cycle.end' && ( + + )} +
+ ) + }) + )} +
+
+
+
+
+ ) +} diff --git a/dashboard/src/routes/monitor/use-maisaka-monitor.ts b/dashboard/src/routes/monitor/use-maisaka-monitor.ts new file mode 100644 index 00000000..3ccf705d --- /dev/null +++ b/dashboard/src/routes/monitor/use-maisaka-monitor.ts @@ -0,0 +1,144 @@ +/** + * MaiSaka 聊天流实时监控 - React Hook + * + * 管理 WebSocket 订阅与事件流的状态。 + */ +import { useCallback, useEffect, useRef, useState } from 'react' + +import type { MaisakaMonitorEvent } from '@/lib/maisaka-monitor-client' +import { maisakaMonitorClient } from '@/lib/maisaka-monitor-client' + +/** 单条时间线事件(前端视图模型) */ +export interface TimelineEntry { + /** 唯一 ID */ + id: string + /** 事件类型 */ + type: MaisakaMonitorEvent['type'] + /** 原始事件数据 */ + data: MaisakaMonitorEvent['data'] + /** 事件时间戳 */ + timestamp: number + /** 所属会话 ID */ + sessionId: string +} + +/** 会话概要信息 */ +export interface SessionInfo { + sessionId: string + sessionName: string + lastActivity: number + eventCount: number +} + +/** 最大保留的时间线条目数 */ +const MAX_TIMELINE_ENTRIES = 500 + +let entryCounter = 0 + +export function useMaisakaMonitor() { + const [timeline, setTimeline] = useState([]) + const [sessions, setSessions] = useState>(new Map()) + const [selectedSession, setSelectedSession] = useState(null) + const [connected, setConnected] = useState(false) + const unsubRef = useRef<(() => Promise) | null>(null) + + const handleEvent = useCallback((event: MaisakaMonitorEvent) => { + const sessionId = (event.data as unknown as Record).session_id as string + const timestamp = (event.data as unknown as Record).timestamp as number + + const entry: TimelineEntry = { + id: `evt_${++entryCounter}_${Date.now()}`, + type: event.type, + data: event.data, + timestamp, + sessionId, + } + + setTimeline((prev) => { + const next = [...prev, entry] + return next.length > MAX_TIMELINE_ENTRIES + ? next.slice(next.length - MAX_TIMELINE_ENTRIES) + : next + }) + + // 更新会话信息 + if (event.type === 'session.start') { + const d = event.data + setSessions((prev) => { + const next = new Map(prev) + next.set(sessionId, { + sessionId, + sessionName: d.session_name, + lastActivity: timestamp, + eventCount: (prev.get(sessionId)?.eventCount ?? 0) + 1, + }) + return next + }) + } else { + setSessions((prev) => { + const existing = prev.get(sessionId) + if (!existing) { + const next = new Map(prev) + next.set(sessionId, { + sessionId, + sessionName: sessionId.slice(0, 8), + lastActivity: timestamp, + eventCount: 1, + }) + return next + } + const next = new Map(prev) + next.set(sessionId, { + ...existing, + lastActivity: timestamp, + eventCount: existing.eventCount + 1, + }) + return next + }) + } + + // 自动选中第一个会话 + setSelectedSession((current) => current ?? sessionId) + }, []) + + useEffect(() => { + let cancelled = false + + maisakaMonitorClient.subscribe(handleEvent).then((unsub) => { + if (cancelled) { + void unsub() + return + } + unsubRef.current = unsub + setConnected(true) + }) + + return () => { + cancelled = true + if (unsubRef.current) { + void unsubRef.current() + unsubRef.current = null + } + setConnected(false) + } + }, [handleEvent]) + + const clearTimeline = useCallback(() => { + setTimeline([]) + }, []) + + /** 当前选中会话的时间线 */ + const filteredTimeline = selectedSession + ? timeline.filter((e) => e.sessionId === selectedSession) + : timeline + + return { + timeline: filteredTimeline, + allTimeline: timeline, + sessions, + selectedSession, + setSelectedSession, + connected, + clearTimeline, + } +} diff --git a/pytests/test_napcat_adapter_sdk.py b/pytests/test_napcat_adapter_sdk.py index 82d0ad9f..877c6676 100644 --- a/pytests/test_napcat_adapter_sdk.py +++ b/pytests/test_napcat_adapter_sdk.py @@ -69,13 +69,22 @@ class _FakeGatewayCapability: class _FakeNapCatQueryService: """用于驱动 NapCat 入站编解码测试的查询服务替身。""" - def __init__(self, forward_payloads: Dict[str, Any] | None = None) -> None: + def __init__( + self, + forward_payloads: Dict[str, Any] | None = None, + group_member_payloads: Dict[tuple[str, str], Dict[str, Any] | None] | None = None, + stranger_payloads: Dict[str, Dict[str, Any] | None] | None = None, + ) -> None: """初始化查询服务替身。 Args: forward_payloads: 预置的合并转发响应映射。 + group_member_payloads: 预置的群成员资料映射。 + stranger_payloads: 预置的陌生人资料映射。 """ self._forward_payloads = forward_payloads or {} + self._group_member_payloads = group_member_payloads or {} + self._stranger_payloads = stranger_payloads or {} async def download_binary(self, url: str) -> bytes | None: """模拟下载远程二进制资源。 @@ -112,6 +121,20 @@ class _FakeNapCatQueryService: """ return self._forward_payloads.get(message_id) + async def get_group_member_info( + self, + group_id: str, + user_id: str, + no_cache: bool = True, + ) -> Dict[str, Any] | None: + """模拟获取群成员资料。""" + del no_cache + return self._group_member_payloads.get((group_id, user_id)) + + async def get_stranger_info(self, user_id: str) -> Dict[str, Any] | None: + """模拟获取 QQ 昵称资料。""" + return self._stranger_payloads.get(user_id) + async def get_record_detail(self, file_name: str, file_id: str | None = None) -> Dict[str, Any] | None: """模拟获取语音详情。 @@ -479,6 +502,137 @@ async def test_inbound_codec_parses_nested_inline_forward_content() -> None: assert nested_nodes[0]["content"] == [{"type": "text", "data": "内层消息"}] +@pytest.mark.asyncio +async def test_inbound_codec_resolves_at_to_group_cardname() -> None: + """入站编解码器应优先将 ``at`` 解析为群昵称。""" + + inbound_codec_cls = _load_napcat_inbound_codec_cls() + codec = inbound_codec_cls( + logger=logging.getLogger("test.napcat_adapter.at_cardname"), + query_service=_FakeNapCatQueryService( + group_member_payloads={ + ("12345", "1206069534"): { + "nickname": "QQ昵称", + "card": "群昵称", + } + } + ), + ) + + message_dict = await codec.build_message_dict( + payload={ + "message_type": "group", + "group_id": "12345", + "message_id": "msg-1", + "message": [{"type": "at", "data": {"qq": "1206069534"}}], + "sender": {"user_id": "10001", "nickname": "发送者"}, + "time": 1710000000, + }, + self_id="20001", + sender_user_id="10001", + sender={"user_id": "10001", "nickname": "发送者"}, + ) + + assert message_dict["processed_plain_text"] == "@群昵称" + assert message_dict["display_message"] == "@群昵称" + assert message_dict["raw_message"] == [ + { + "type": "at", + "data": { + "target_user_id": "1206069534", + "target_user_nickname": "QQ昵称", + "target_user_cardname": "群昵称", + }, + } + ] + + +@pytest.mark.asyncio +async def test_inbound_codec_falls_back_to_qq_nickname_when_group_cardname_is_empty() -> None: + """入站编解码器在群昵称为空时应回退到 QQ 昵称。""" + + inbound_codec_cls = _load_napcat_inbound_codec_cls() + codec = inbound_codec_cls( + logger=logging.getLogger("test.napcat_adapter.at_nickname"), + query_service=_FakeNapCatQueryService( + group_member_payloads={ + ("12345", "1206069534"): { + "nickname": "QQ昵称", + "card": "", + } + } + ), + ) + + message_dict = await codec.build_message_dict( + payload={ + "message_type": "group", + "group_id": "12345", + "message_id": "msg-2", + "message": [{"type": "at", "data": {"qq": "1206069534"}}], + "sender": {"user_id": "10001", "nickname": "发送者"}, + "time": 1710000000, + }, + self_id="20001", + sender_user_id="10001", + sender={"user_id": "10001", "nickname": "发送者"}, + ) + + assert message_dict["processed_plain_text"] == "@QQ昵称" + assert message_dict["display_message"] == "@QQ昵称" + assert message_dict["raw_message"] == [ + { + "type": "at", + "data": { + "target_user_id": "1206069534", + "target_user_nickname": "QQ昵称", + "target_user_cardname": None, + }, + } + ] + + +@pytest.mark.asyncio +async def test_inbound_codec_falls_back_to_stranger_nickname_when_group_profile_is_missing() -> None: + """入站编解码器在群资料缺失时应继续回退到 QQ 昵称。""" + + inbound_codec_cls = _load_napcat_inbound_codec_cls() + codec = inbound_codec_cls( + logger=logging.getLogger("test.napcat_adapter.at_stranger_nickname"), + query_service=_FakeNapCatQueryService( + group_member_payloads={("12345", "1206069534"): None}, + stranger_payloads={"1206069534": {"nickname": "QQ昵称"}}, + ), + ) + + message_dict = await codec.build_message_dict( + payload={ + "message_type": "group", + "group_id": "12345", + "message_id": "msg-3", + "message": [{"type": "at", "data": {"qq": "1206069534"}}], + "sender": {"user_id": "10001", "nickname": "发送者"}, + "time": 1710000000, + }, + self_id="20001", + sender_user_id="10001", + sender={"user_id": "10001", "nickname": "发送者"}, + ) + + assert message_dict["processed_plain_text"] == "@QQ昵称" + assert message_dict["display_message"] == "@QQ昵称" + assert message_dict["raw_message"] == [ + { + "type": "at", + "data": { + "target_user_id": "1206069534", + "target_user_nickname": "QQ昵称", + "target_user_cardname": None, + }, + } + ] + + @pytest.mark.asyncio async def test_query_service_normalizes_forward_payload_list() -> None: """查询服务应兼容 ``get_forward_msg`` 直接返回节点列表。""" diff --git a/src/chat/replyer/maisaka_generator_multi.py b/src/chat/replyer/maisaka_generator_multi.py index 74631441..eaa001f8 100644 --- a/src/chat/replyer/maisaka_generator_multi.py +++ b/src/chat/replyer/maisaka_generator_multi.py @@ -22,6 +22,7 @@ from src.common.prompt_i18n import load_prompt from src.config.config import global_config from src.core.types import ActionInfo from src.llm_models.payload_content.message import ImageMessagePart, Message, MessageBuilder, RoleType, TextMessagePart +from src.maisaka.monitor_events import emit_replier_request, emit_replier_response from src.services.llm_service import LLMServiceClient from src.maisaka.context_messages import ( @@ -428,6 +429,14 @@ class MaisakaReplyGenerator: ) started_at = time.perf_counter() + + # 向监控前端广播回复器请求事件 + await emit_replier_request( + session_id=preview_chat_id, + messages=request_messages, + model_name=getattr(self.express_model, "model_name", ""), + ) + try: generation_result = await self.express_model.generate_response_with_messages( message_factory=message_factory @@ -453,6 +462,19 @@ class MaisakaReplyGenerator: overall_ms=round((time.perf_counter() - started_at) * 1000, 2), ) + # 向监控前端广播回复器响应事件 + await emit_replier_response( + session_id=preview_chat_id, + content=response_text, + reasoning=generation_result.reasoning or "", + model_name=generation_result.model_name or "", + prompt_tokens=generation_result.prompt_tokens, + completion_tokens=generation_result.completion_tokens, + total_tokens=generation_result.total_tokens, + duration_ms=result.metrics.overall_ms or 0.0, + success=result.success, + ) + if global_config.debug.show_replyer_reasoning and result.completion.reasoning_text: logger.info(f"Maisaka 回复器思考内容:\n{result.completion.reasoning_text}") diff --git a/src/maisaka/monitor_events.py b/src/maisaka/monitor_events.py new file mode 100644 index 00000000..c94baae6 --- /dev/null +++ b/src/maisaka/monitor_events.py @@ -0,0 +1,473 @@ +"""MaiSaka 实时监控事件广播模块。 + +通过统一 WebSocket 将 MaiSaka 推理引擎各阶段的状态实时推送给前端监控界面, +无需落盘 HTML/TXT 中间文件即可在 WebUI 中渲染完整的聊天流推理过程。 +""" + +from typing import Any, Dict, List, Optional + +import time + +from src.common.logger import get_logger + +logger = get_logger("maisaka_monitor") + +# WebSocket 广播使用的业务域与主题 +MONITOR_DOMAIN = "maisaka_monitor" +MONITOR_TOPIC = "main" + + +def _serialize_message(message: Any) -> Dict[str, Any]: + """将单条 LLM 消息序列化为可通过 WebSocket 传输的字典。 + + 对二进制数据(如图片)仅保留元信息,不传输原始字节以减小带宽占用。 + + Args: + message: 原始消息对象,可以是 dict 或带 role/content 属性的消息实例。 + + Returns: + Dict[str, Any]: 序列化后的消息字典。 + """ + if isinstance(message, dict): + serialized: Dict[str, Any] = { + "role": str(message.get("role", "unknown")), + "content": message.get("content"), + } + if message.get("tool_call_id"): + serialized["tool_call_id"] = message["tool_call_id"] + if message.get("tool_calls"): + serialized["tool_calls"] = _serialize_tool_calls_from_dicts(message["tool_calls"]) + return serialized + + raw_role = getattr(message, "role", "unknown") + role_str = raw_role.value if hasattr(raw_role, "value") else str(raw_role) # type: ignore[union-attr] + + serialized = { + "role": role_str, + "content": _extract_text_content(getattr(message, "content", None)), + } + + tool_call_id = getattr(message, "tool_call_id", None) + if tool_call_id: + serialized["tool_call_id"] = str(tool_call_id) + + tool_calls = getattr(message, "tool_calls", None) + if tool_calls: + serialized["tool_calls"] = _serialize_tool_calls_from_objects(tool_calls) + + return serialized + + +def _extract_text_content(content: Any) -> Optional[str]: + """从消息内容中提取纯文本表示。 + + 支持字符串、列表(多模态内容块)等格式,对图片仅保留占位信息。 + + Args: + content: 消息的原始 content 字段。 + + Returns: + Optional[str]: 提取后的文本内容。 + """ + if content is None: + return None + if isinstance(content, str): + return content + if isinstance(content, list): + text_parts: List[str] = [] + for block in content: + if isinstance(block, dict): + block_type = block.get("type", "") + if block_type == "text": + text_parts.append(str(block.get("text", ""))) + elif block_type == "image_url": + text_parts.append("[图片]") + else: + text_parts.append(f"[{block_type}]") + elif isinstance(block, str): + text_parts.append(block) + return "\n".join(text_parts) if text_parts else None + return str(content) + + +def _serialize_tool_calls_from_objects(tool_calls: List[Any]) -> List[Dict[str, Any]]: + """将工具调用对象列表序列化为字典列表。 + + Args: + tool_calls: 工具调用对象列表(ToolCall 或类似结构)。 + + Returns: + List[Dict[str, Any]]: 序列化后的工具调用列表。 + """ + result: List[Dict[str, Any]] = [] + for tc in tool_calls: + serialized: Dict[str, Any] = { + "id": getattr(tc, "id", None) or getattr(tc, "tool_call_id", ""), + "name": getattr(tc, "func_name", None) or getattr(tc, "name", "unknown"), + } + args = getattr(tc, "args", None) or getattr(tc, "arguments", None) + if isinstance(args, dict): + serialized["arguments"] = args + elif isinstance(args, str): + serialized["arguments_raw"] = args + result.append(serialized) + return result + + +def _serialize_tool_calls_from_dicts(tool_calls: List[Any]) -> List[Dict[str, Any]]: + """将工具调用字典列表标准化为可传输格式。 + + Args: + tool_calls: 工具调用字典列表。 + + Returns: + List[Dict[str, Any]]: 标准化后的工具调用列表。 + """ + result: List[Dict[str, Any]] = [] + for tc in tool_calls: + if isinstance(tc, dict): + result.append({ + "id": tc.get("id", ""), + "name": tc.get("name", tc.get("func_name", "unknown")), + "arguments": tc.get("arguments", tc.get("args", {})), + }) + else: + result.append({ + "id": getattr(tc, "id", ""), + "name": getattr(tc, "func_name", "unknown"), + "arguments": getattr(tc, "args", {}), + }) + return result + + +def _serialize_messages(messages: List[Any]) -> List[Dict[str, Any]]: + """批量序列化消息列表。 + + Args: + messages: 原始消息列表。 + + Returns: + List[Dict[str, Any]]: 序列化后的消息字典列表。 + """ + return [_serialize_message(msg) for msg in messages] + + +async def _broadcast(event: str, data: Dict[str, Any]) -> None: + """通过统一 WebSocket 管理器向所有订阅了 maisaka_monitor 主题的连接广播事件。 + + 延迟导入 websocket_manager 以避免循环依赖。 + + Args: + event: 事件名称。 + data: 事件数据。 + """ + try: + from src.webui.routers.websocket.manager import websocket_manager + + subscription_key = f"{MONITOR_DOMAIN}:{MONITOR_TOPIC}" + total_connections = len(websocket_manager.connections) + subscriber_count = sum( + 1 for conn in websocket_manager.connections.values() + if subscription_key in conn.subscriptions + ) + + # 诊断:打印 manager 对象 id 和连接状态 + logger.info( + f"[诊断] _broadcast: manager_id={id(websocket_manager)} " + f"总连接={total_connections} 订阅者={subscriber_count} event={event}" + ) + if subscriber_count == 0 and total_connections > 0: + for cid, conn in websocket_manager.connections.items(): + logger.info( + f"[诊断] 连接={cid[:8]}… 订阅={conn.subscriptions}" + ) + + await websocket_manager.broadcast_to_topic( + domain=MONITOR_DOMAIN, + topic=MONITOR_TOPIC, + event=event, + data=data, + ) + except Exception as exc: + logger.warning(f"MaiSaka 监控事件广播失败: {exc}", exc_info=True) + + +async def emit_session_start(session_id: str, session_name: str) -> None: + """广播会话开始事件。 + + Args: + session_id: 聊天流 ID。 + session_name: 聊天流显示名称。 + """ + await _broadcast("session.start", { + "session_id": session_id, + "session_name": session_name, + "timestamp": time.time(), + }) + + +async def emit_message_ingested( + session_id: str, + speaker_name: str, + content: str, + message_id: str, + timestamp: float, +) -> None: + """广播新消息注入事件。 + + 当新的用户消息被纳入 MaiSaka 推理上下文时触发。 + + Args: + session_id: 聊天流 ID。 + speaker_name: 发言者名称。 + content: 消息文本内容。 + message_id: 消息 ID。 + timestamp: 消息时间戳。 + """ + await _broadcast("message.ingested", { + "session_id": session_id, + "speaker_name": speaker_name, + "content": content, + "message_id": message_id, + "timestamp": timestamp, + }) + + +async def emit_cycle_start( + session_id: str, + cycle_id: int, + round_index: int, + max_rounds: int, + history_count: int, +) -> None: + """广播推理循环开始事件。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + round_index: 当前回合索引(从 0 开始)。 + max_rounds: 最大回合数。 + history_count: 当前上下文消息数。 + """ + await _broadcast("cycle.start", { + "session_id": session_id, + "cycle_id": cycle_id, + "round_index": round_index, + "max_rounds": max_rounds, + "history_count": history_count, + "timestamp": time.time(), + }) + + +async def emit_timing_gate_result( + session_id: str, + cycle_id: int, + action: str, + content: Optional[str], + tool_calls: List[Any], + messages: List[Any], + prompt_tokens: int, + selected_history_count: int, + duration_ms: float, +) -> None: + """广播 Timing Gate 子代理结果事件。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + action: 控制决策(continue/wait/no_reply)。 + content: Timing Gate 返回的文本内容。 + tool_calls: 工具调用列表。 + messages: 发送给 Timing Gate 的消息列表。 + prompt_tokens: 输入 Token 数。 + selected_history_count: 已选上下文消息数。 + duration_ms: 执行耗时(毫秒)。 + """ + await _broadcast("timing_gate.result", { + "session_id": session_id, + "cycle_id": cycle_id, + "action": action, + "content": content, + "tool_calls": _serialize_tool_calls_from_objects(tool_calls), + "messages": _serialize_messages(messages), + "prompt_tokens": prompt_tokens, + "selected_history_count": selected_history_count, + "duration_ms": duration_ms, + "timestamp": time.time(), + }) + + +async def emit_planner_request( + session_id: str, + cycle_id: int, + messages: List[Any], + tool_count: int, + selected_history_count: int, +) -> None: + """广播规划器请求开始事件。 + + 携带完整的消息列表,前端可以增量渲染新增消息。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + messages: 发送给规划器的完整消息列表。 + tool_count: 可用工具数量。 + selected_history_count: 已选上下文消息数。 + """ + await _broadcast("planner.request", { + "session_id": session_id, + "cycle_id": cycle_id, + "messages": _serialize_messages(messages), + "tool_count": tool_count, + "selected_history_count": selected_history_count, + "timestamp": time.time(), + }) + + +async def emit_planner_response( + session_id: str, + cycle_id: int, + content: Optional[str], + tool_calls: List[Any], + prompt_tokens: int, + completion_tokens: int, + total_tokens: int, + duration_ms: float, +) -> None: + """广播规划器响应事件。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + content: 规划器返回的思考文本。 + tool_calls: 规划器返回的工具调用列表。 + prompt_tokens: 输入 Token 数。 + completion_tokens: 输出 Token 数。 + total_tokens: 总 Token 数。 + duration_ms: 执行耗时(毫秒)。 + """ + await _broadcast("planner.response", { + "session_id": session_id, + "cycle_id": cycle_id, + "content": content, + "tool_calls": _serialize_tool_calls_from_objects(tool_calls), + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "duration_ms": duration_ms, + "timestamp": time.time(), + }) + + +async def emit_tool_execution( + session_id: str, + cycle_id: int, + tool_name: str, + tool_args: Dict[str, Any], + result_summary: str, + success: bool, + duration_ms: float, +) -> None: + """广播工具执行结果事件。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + tool_name: 工具名称。 + tool_args: 工具参数。 + result_summary: 执行结果摘要。 + success: 是否成功。 + duration_ms: 执行耗时(毫秒)。 + """ + await _broadcast("tool.execution", { + "session_id": session_id, + "cycle_id": cycle_id, + "tool_name": tool_name, + "tool_args": tool_args, + "result_summary": result_summary, + "success": success, + "duration_ms": duration_ms, + "timestamp": time.time(), + }) + + +async def emit_cycle_end( + session_id: str, + cycle_id: int, + time_records: Dict[str, float], + agent_state: str, +) -> None: + """广播推理循环结束事件。 + + Args: + session_id: 聊天流 ID。 + cycle_id: 循环编号。 + time_records: 各阶段耗时记录。 + agent_state: 循环结束后的代理状态。 + """ + await _broadcast("cycle.end", { + "session_id": session_id, + "cycle_id": cycle_id, + "time_records": time_records, + "agent_state": agent_state, + "timestamp": time.time(), + }) + + +async def emit_replier_request( + session_id: str, + messages: List[Any], + model_name: str = "", +) -> None: + """广播回复器请求开始事件。 + + Args: + session_id: 聊天流 ID。 + messages: 发送给回复器的消息列表。 + model_name: 使用的模型名称。 + """ + await _broadcast("replier.request", { + "session_id": session_id, + "messages": _serialize_messages(messages), + "model_name": model_name, + "timestamp": time.time(), + }) + + +async def emit_replier_response( + session_id: str, + content: Optional[str], + reasoning: str, + model_name: str, + prompt_tokens: int, + completion_tokens: int, + total_tokens: int, + duration_ms: float, + success: bool, +) -> None: + """广播回复器响应事件。 + + Args: + session_id: 聊天流 ID。 + content: 回复器生成的文本。 + reasoning: 回复器的思考过程文本。 + model_name: 使用的模型名称。 + prompt_tokens: 输入 Token 数。 + completion_tokens: 输出 Token 数。 + total_tokens: 总 Token 数。 + duration_ms: 执行耗时(毫秒)。 + success: 是否生成成功。 + """ + await _broadcast("replier.response", { + "session_id": session_id, + "content": content, + "reasoning": reasoning, + "model_name": model_name, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "duration_ms": duration_ms, + "success": success, + "timestamp": time.time(), + }) diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py index c26dd7e3..4270cd81 100644 --- a/src/maisaka/reasoning_engine.py +++ b/src/maisaka/reasoning_engine.py @@ -38,6 +38,14 @@ from .message_adapter import ( clone_message_sequence, format_speaker_content, ) +from .monitor_events import ( + emit_cycle_end, + emit_cycle_start, + emit_message_ingested, + emit_planner_response, + emit_timing_gate_result, + emit_tool_execution, +) from .planner_message_utils import build_planner_user_prefix_from_session_message if TYPE_CHECKING: @@ -291,15 +299,35 @@ class MaisakaReasoningEngine: if self._runtime._pending_wait_tool_call_id: self._runtime._chat_history.append(self._build_wait_timeout_message()) self._trim_chat_history() + try: for round_index in range(self._runtime._max_internal_rounds): cycle_detail = self._start_cycle() self._runtime._log_cycle_started(cycle_detail, round_index) + await emit_cycle_start( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + round_index=round_index, + max_rounds=self._runtime._max_internal_rounds, + history_count=len(self._runtime._chat_history), + ) planner_started_at = 0.0 try: timing_started_at = time.time() timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message) - cycle_detail.time_records["timing_gate"] = time.time() - timing_started_at + timing_duration_ms = (time.time() - timing_started_at) * 1000 + cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000 + await emit_timing_gate_result( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + action=timing_action, + content=timing_response.content, + tool_calls=timing_response.tool_calls, + messages=[], + prompt_tokens=timing_response.prompt_tokens, + selected_history_count=timing_response.selected_history_count, + duration_ms=timing_duration_ms, + ) self._runtime._render_context_usage_panel( selected_history_count=timing_response.selected_history_count, prompt_tokens=timing_response.prompt_tokens, @@ -326,12 +354,23 @@ class MaisakaReasoningEngine: response = await self._run_interruptible_planner( tool_definitions=action_tool_definitions, ) - cycle_detail.time_records["planner"] = time.time() - planner_started_at + planner_duration_ms = (time.time() - planner_started_at) * 1000 + cycle_detail.time_records["planner"] = planner_duration_ms / 1000 logger.info( f"{self._runtime.log_prefix} 规划器执行完成: " f"回合={round_index + 1} " f"耗时={cycle_detail.time_records['planner']:.3f} 秒" ) + await emit_planner_response( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + content=response.content, + tool_calls=response.tool_calls, + prompt_tokens=response.prompt_tokens, + completion_tokens=response.completion_tokens, + total_tokens=response.total_tokens, + duration_ms=planner_duration_ms, + ) reasoning_content = response.content or "" if self._should_replace_reasoning(reasoning_content): @@ -383,6 +422,12 @@ class MaisakaReasoningEngine: break finally: self._end_cycle(cycle_detail) + await emit_cycle_end( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + time_records=dict(cycle_detail.time_records), + agent_state=self._runtime._agent_state, + ) finally: if self._runtime._agent_state == self._runtime._STATE_RUNNING: self._runtime._agent_state = self._runtime._STATE_STOP @@ -470,6 +515,17 @@ class MaisakaReasoningEngine: self._insert_chat_history_message(history_message) self._trim_chat_history() + # 向监控前端广播新消息注入事件 + user_info = message.message_info.user_info + speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id + await emit_message_ingested( + session_id=self._runtime.session_id, + speaker_name=speaker_name, + content=(message.processed_plain_text or "").strip(), + message_id=message.message_id, + timestamp=message.timestamp.timestamp(), + ) + async def _build_history_message(self, message: SessionMessage) -> Optional[LLMContextMessage]: """根据真实消息构造对应的上下文消息。""" @@ -1030,7 +1086,9 @@ class MaisakaReasoningEngine: } for tool_call in tool_calls: invocation = self._build_tool_invocation(tool_call, latest_thought) + tool_started_at = time.time() result = await self._runtime._tool_registry.invoke(invocation, execution_context) + tool_duration_ms = (time.time() - tool_started_at) * 1000 await self._store_tool_execution_record( invocation, result, @@ -1039,6 +1097,18 @@ class MaisakaReasoningEngine: self._append_tool_execution_result(tool_call, result) tool_result_summaries.append(self._build_tool_result_summary(tool_call, result)) + # 向监控前端广播工具执行结果 + cycle_id = self._runtime._current_cycle_detail.cycle_id if self._runtime._current_cycle_detail else 0 + await emit_tool_execution( + session_id=self._runtime.session_id, + cycle_id=cycle_id, + tool_name=tool_call.func_name, + tool_args=invocation.arguments if isinstance(invocation.arguments, dict) else {}, + result_summary=result.content[:500] if result.content else (result.error_message or "")[:500], + success=result.success, + duration_ms=tool_duration_ms, + ) + if not result.success and tool_call.func_name == "reply": logger.warning(f"{self._runtime.log_prefix} 回复工具未生成可见消息,将继续下一轮循环") diff --git a/src/webui/routers/__init__.py b/src/webui/routers/__init__.py index b0c27606..67b4e326 100644 --- a/src/webui/routers/__init__.py +++ b/src/webui/routers/__init__.py @@ -14,11 +14,9 @@ def get_api_router() -> APIRouter: def get_all_routers() -> List[APIRouter]: """获取所有需要独立注册的路由器列表""" - from src.webui.api.planner import router as planner_router - from src.webui.api.replier import router as replier_router from src.webui.routers.chat import router as chat_router - from src.webui.routers.memory import compat_router as memory_compat_router from src.webui.routers.knowledge import router as knowledge_router + from src.webui.routers.memory import compat_router as memory_compat_router from src.webui.routes import router as main_router return [ @@ -26,8 +24,6 @@ def get_all_routers() -> List[APIRouter]: memory_compat_router, knowledge_router, chat_router, - planner_router, - replier_router, ] diff --git a/src/webui/routers/websocket/manager.py b/src/webui/routers/websocket/manager.py index 052567c8..81d6b929 100644 --- a/src/webui/routers/websocket/manager.py +++ b/src/webui/routers/websocket/manager.py @@ -1,10 +1,12 @@ """统一 WebSocket 连接管理器。""" -import asyncio from dataclasses import dataclass, field from typing import Any, Dict, Optional, Set +import asyncio + from fastapi import WebSocket +from starlette.websockets import WebSocketState from src.common.logger import get_logger @@ -42,6 +44,24 @@ class UnifiedWebSocketManager: """ return f"{domain}:{topic}" + async def _close_websocket(self, connection: WebSocketConnection) -> None: + """显式关闭底层 WebSocket 连接。 + + 某些异常退出路径只会执行清理逻辑,但不会自动向客户端发送关闭帧。 + 这里主动关闭底层连接,确保浏览器能够及时感知断线并触发重连。 + + Args: + connection: 目标连接上下文。 + """ + websocket = connection.websocket + if ( + websocket.client_state == WebSocketState.DISCONNECTED + or websocket.application_state == WebSocketState.DISCONNECTED + ): + return + + await websocket.close() + async def _sender_loop(self, connection: WebSocketConnection) -> None: """串行发送指定连接的出站消息。 @@ -85,6 +105,11 @@ class UnifiedWebSocketManager: if connection is None: return + try: + await self._close_websocket(connection) + except Exception as exc: + logger.debug("关闭统一 WebSocket 底层连接时出现异常: connection=%s, error=%s", connection_id, exc) + await connection.send_queue.put(None) if connection.sender_task is not None: try: diff --git a/src/webui/routers/websocket/unified.py b/src/webui/routers/websocket/unified.py index a2d9b7b3..98d2ffc8 100644 --- a/src/webui/routers/websocket/unified.py +++ b/src/webui/routers/websocket/unified.py @@ -1,6 +1,7 @@ """统一 WebSocket 路由。""" from typing import Any, Dict, Optional, Set, cast + import asyncio import time import uuid @@ -140,6 +141,26 @@ async def _handle_plugin_progress_subscribe(connection_id: str, request_id: Opti ) +async def _handle_maisaka_monitor_subscribe(connection_id: str, request_id: Optional[str]) -> None: + """处理 MaiSaka 监控域订阅请求。 + + Args: + connection_id: 连接 ID。 + request_id: 请求 ID。 + """ + logger.info( + f"MaiSaka 监控订阅请求: connection_id={connection_id} " + f"manager_id={id(websocket_manager)}" + ) + websocket_manager.subscribe(connection_id, domain="maisaka_monitor", topic="main") + await websocket_manager.send_response( + connection_id, + request_id=request_id, + ok=True, + data={"domain": "maisaka_monitor", "topic": "main"}, + ) + + async def _handle_subscribe(connection_id: str, message: Dict[str, Any]) -> None: """处理主题订阅请求。 @@ -160,6 +181,10 @@ async def _handle_subscribe(connection_id: str, message: Dict[str, Any]) -> None await _handle_plugin_progress_subscribe(connection_id, request_id) return + if domain == "maisaka_monitor" and topic == "main": + await _handle_maisaka_monitor_subscribe(connection_id, request_id) + return + await websocket_manager.send_response( connection_id, request_id=request_id, @@ -541,8 +566,16 @@ async def websocket_endpoint(websocket: WebSocket, token: Optional[str] = Query( await handle_client_message(connection_id, cast(Dict[str, Any], raw_message)) except WebSocketDisconnect: logger.info("统一 WebSocket 客户端已断开: connection=%s", connection_id) + except asyncio.CancelledError: + logger.warning("统一 WebSocket 连接处理被取消: connection=%s", connection_id) + raise except Exception as exc: - logger.error(f"统一 WebSocket 处理失败: {exc}") + logger.error("统一 WebSocket 处理失败: connection=%s, error=%s", connection_id, exc, exc_info=True) finally: chat_manager.disconnect_connection(connection_id) await websocket_manager.disconnect(connection_id) + logger.info( + "统一 WebSocket 连接清理完成: connection=%s, 剩余连接=%s", + connection_id, + len(websocket_manager.connections), + )