feat: add MaiSaka real-time chat flow monitoring component and WebSocket event handling

- Implemented the MaiSakaMonitor component for real-time monitoring of chat flow using WebSocket.
- Created a custom hook `useMaisakaMonitor` to manage WebSocket subscriptions and event states.
- Developed a backend module for broadcasting various monitoring events through WebSocket.
- Added serialization functions for messages and tool calls to optimize data transmission.
- Included event emission functions for session start, message ingestion, cycle start, timing gate results, planner requests/responses, tool executions, and replier requests/responses.
This commit is contained in:
DrSmoothl
2026-04-05 00:23:34 +08:00
parent 2fb911a8d5
commit c816ad4179
18 changed files with 1612 additions and 94 deletions

View File

@@ -36,7 +36,7 @@ export const menuSections: MenuSection[] = [
{ icon: LayoutGrid, label: 'sidebar.menu.configTemplate', path: '/config/pack-market' }, { icon: LayoutGrid, label: 'sidebar.menu.configTemplate', path: '/config/pack-market' },
{ icon: Sliders, label: 'sidebar.menu.pluginConfig', path: '/plugin-config' }, { icon: Sliders, label: 'sidebar.menu.pluginConfig', path: '/plugin-config' },
{ icon: FileSearch, label: 'sidebar.menu.logViewer', path: '/logs', searchDescription: 'search.items.logsDesc' }, { icon: FileSearch, label: 'sidebar.menu.logViewer', path: '/logs', searchDescription: 'search.items.logsDesc' },
{ icon: Activity, label: 'sidebar.menu.plannerMonitor', path: '/planner-monitor' }, { icon: Activity, label: 'sidebar.menu.maisakaMonitor', path: '/planner-monitor' },
{ icon: MessageSquare, label: 'sidebar.menu.localChat', path: '/chat' }, { icon: MessageSquare, label: 'sidebar.menu.localChat', path: '/chat' },
], ],
}, },

View File

@@ -38,7 +38,7 @@
"configTemplate": "Config Templates", "configTemplate": "Config Templates",
"pluginConfig": "Plugin Config", "pluginConfig": "Plugin Config",
"logViewer": "Log Viewer", "logViewer": "Log Viewer",
"plannerMonitor": "Planner & Replier Monitor", "maisakaMonitor": "MaiSaka Chat Monitor",
"localChat": "Local Chat", "localChat": "Local Chat",
"settings": "Settings" "settings": "Settings"
} }

View File

@@ -38,7 +38,7 @@
"configTemplate": "設定テンプレート", "configTemplate": "設定テンプレート",
"pluginConfig": "プラグイン設定", "pluginConfig": "プラグイン設定",
"logViewer": "ログビューア", "logViewer": "ログビューア",
"plannerMonitor": "プランナー & リプライヤー監視", "maisakaMonitor": "MaiSaka チャット監視",
"localChat": "ローカルチャット", "localChat": "ローカルチャット",
"settings": "設定" "settings": "設定"
} }

View File

@@ -38,7 +38,7 @@
"configTemplate": "설정 템플릿", "configTemplate": "설정 템플릿",
"pluginConfig": "플러그인 설정", "pluginConfig": "플러그인 설정",
"logViewer": "로그 뷰어", "logViewer": "로그 뷰어",
"plannerMonitor": "플래너 & 리플라이어 모니터", "maisakaMonitor": "MaiSaka 채팅 모니터",
"localChat": "로컬 채팅", "localChat": "로컬 채팅",
"settings": "설정" "settings": "설정"
} }

View File

@@ -38,7 +38,7 @@
"configTemplate": "配置模板市场", "configTemplate": "配置模板市场",
"pluginConfig": "插件配置", "pluginConfig": "插件配置",
"logViewer": "日志查看器", "logViewer": "日志查看器",
"plannerMonitor": "计划器&回复器监控", "maisakaMonitor": "MaiSaka 聊天流监控",
"localChat": "本地聊天室", "localChat": "本地聊天室",
"settings": "系统设置" "settings": "系统设置"
} }

View File

@@ -30,7 +30,7 @@ export async function getApiBaseUrl(): Promise<string> {
/** /**
* Get WebSocket base URL * Get WebSocket base URL
* - Electron: Convert HTTP/HTTPS URL to WS/WSS * - Electron: Convert HTTP/HTTPS URL to WS/WSS
* - Browser DEV: ws://127.0.0.1:8001 (hardcoded, same as log-websocket.ts) * - Browser DEV: Use same-origin WS URL and let Vite proxy forward requests
* - Browser PROD: Construct WS URL from window.location * - Browser PROD: Construct WS URL from window.location
*/ */
export async function getWsBaseUrl(): Promise<string> { export async function getWsBaseUrl(): Promise<string> {
@@ -47,9 +47,10 @@ export async function getWsBaseUrl(): Promise<string> {
}) })
} }
// Browser DEV: Use hardcoded WebSocket server // Browser DEV: Use same-origin URL so Vite proxy can forward WebSocket requests
if (import.meta.env.DEV) { if (import.meta.env.DEV) {
return 'ws://127.0.0.1:8001' const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
return `${protocol}//${window.location.host}`
} }
// Browser PROD: Construct WS URL from current location // Browser PROD: Construct WS URL from current location

View File

@@ -0,0 +1,223 @@
/**
* MaiSaka 实时监控 WebSocket 客户端
*
* 订阅 maisaka_monitor 主题,接收推理引擎各阶段的实时事件。
*/
import type { WsEventEnvelope } from './unified-ws'
import { unifiedWsClient } from './unified-ws'
// ─── 事件数据类型 ───────────────────────────────────────────────
export interface MaisakaMessage {
role: string
content: string | null
tool_call_id?: string
tool_calls?: MaisakaToolCall[]
}
export interface MaisakaToolCall {
id: string
name: string
arguments?: Record<string, unknown>
arguments_raw?: string
}
export interface SessionStartEvent {
session_id: string
session_name: string
timestamp: number
}
export interface MessageIngestedEvent {
session_id: string
speaker_name: string
content: string
message_id: string
timestamp: number
}
export interface CycleStartEvent {
session_id: string
cycle_id: number
round_index: number
max_rounds: number
history_count: number
timestamp: number
}
export interface TimingGateResultEvent {
session_id: string
cycle_id: number
action: 'continue' | 'wait' | 'no_reply'
content: string | null
tool_calls: MaisakaToolCall[]
messages: MaisakaMessage[]
prompt_tokens: number
selected_history_count: number
duration_ms: number
timestamp: number
}
export interface PlannerRequestEvent {
session_id: string
cycle_id: number
messages: MaisakaMessage[]
tool_count: number
selected_history_count: number
timestamp: number
}
export interface PlannerResponseEvent {
session_id: string
cycle_id: number
content: string | null
tool_calls: MaisakaToolCall[]
prompt_tokens: number
completion_tokens: number
total_tokens: number
duration_ms: number
timestamp: number
}
export interface ToolExecutionEvent {
session_id: string
cycle_id: number
tool_name: string
tool_args: Record<string, unknown>
result_summary: string
success: boolean
duration_ms: number
timestamp: number
}
export interface CycleEndEvent {
session_id: string
cycle_id: number
time_records: Record<string, number>
agent_state: string
timestamp: number
}
export interface ReplierRequestEvent {
session_id: string
messages: MaisakaMessage[]
model_name: string
timestamp: number
}
export interface ReplierResponseEvent {
session_id: string
content: string | null
reasoning: string
model_name: string
prompt_tokens: number
completion_tokens: number
total_tokens: number
duration_ms: number
success: boolean
timestamp: number
}
// ─── 统一事件联合类型 ─────────────────────────────────────────
export type MaisakaMonitorEvent =
| { type: 'session.start'; data: SessionStartEvent }
| { type: 'message.ingested'; data: MessageIngestedEvent }
| { type: 'cycle.start'; data: CycleStartEvent }
| { type: 'timing_gate.result'; data: TimingGateResultEvent }
| { type: 'planner.request'; data: PlannerRequestEvent }
| { type: 'planner.response'; data: PlannerResponseEvent }
| { type: 'tool.execution'; data: ToolExecutionEvent }
| { type: 'cycle.end'; data: CycleEndEvent }
| { type: 'replier.request'; data: ReplierRequestEvent }
| { type: 'replier.response'; data: ReplierResponseEvent }
export type MaisakaEventListener = (event: MaisakaMonitorEvent) => void
// ─── 客户端 ───────────────────────────────────────────────────
class MaisakaMonitorClient {
private initialized = false
private listenerIdCounter = 0
private listeners: Map<number, MaisakaEventListener> = new Map()
private subscriptionActive = false
private subscriptionPromise: Promise<void> | null = null
private deferredUnsubTimer: ReturnType<typeof setTimeout> | null = null
private initialize(): void {
if (this.initialized) {
return
}
unifiedWsClient.addEventListener((message: WsEventEnvelope) => {
if (message.domain !== 'maisaka_monitor') {
return
}
const event: MaisakaMonitorEvent = {
type: message.event as MaisakaMonitorEvent['type'],
data: message.data as never,
}
this.listeners.forEach((listener) => {
try {
listener(event)
} catch (error) {
console.error('MaiSaka 监控事件监听器执行失败:', error)
}
})
})
this.initialized = true
}
private async ensureSubscribed(): Promise<void> {
if (this.subscriptionActive) {
return
}
if (this.subscriptionPromise === null) {
this.subscriptionPromise = unifiedWsClient
.subscribe('maisaka_monitor', 'main')
.then(() => {
this.subscriptionActive = true
})
.finally(() => {
this.subscriptionPromise = null
})
}
await this.subscriptionPromise
}
async subscribe(listener: MaisakaEventListener): Promise<() => Promise<void>> {
this.initialize()
const listenerId = ++this.listenerIdCounter
this.listeners.set(listenerId, listener)
// 如果有待执行的延迟退订取消它React StrictMode 快速卸载/重新挂载)
if (this.deferredUnsubTimer !== null) {
clearTimeout(this.deferredUnsubTimer)
this.deferredUnsubTimer = null
}
await this.ensureSubscribed()
return async () => {
this.listeners.delete(listenerId)
if (this.listeners.size === 0 && this.subscriptionActive) {
// 延迟退订:等待短暂时间再真正退订,避免 StrictMode 导致的竞态
this.deferredUnsubTimer = setTimeout(() => {
this.deferredUnsubTimer = null
if (this.listeners.size === 0 && this.subscriptionActive) {
this.subscriptionActive = false
void unifiedWsClient.unsubscribe('maisaka_monitor', 'main')
}
}, 200)
}
}
}
}
export const maisakaMonitorClient = new MaisakaMonitorClient()

View File

@@ -83,11 +83,14 @@ async function getWsToken(): Promise<string | null> {
} }
class UnifiedWebSocketClient { class UnifiedWebSocketClient {
private readonly heartbeatIntervalMs = 30000
private readonly heartbeatTimeoutMs = 90000
private connectPromise: Promise<void> | null = null private connectPromise: Promise<void> | null = null
private connectionListeners: Set<ConnectionListener> = new Set() private connectionListeners: Set<ConnectionListener> = new Set()
private eventListeners: Set<EventListener> = new Set() private eventListeners: Set<EventListener> = new Set()
private hasConnectedOnce = false private hasConnectedOnce = false
private heartbeatIntervalId: number | null = null private heartbeatIntervalId: number | null = null
private lastPongAt = 0
private manualDisconnect = false private manualDisconnect = false
private pendingRequests: Map<string, PendingRequest> = new Map() private pendingRequests: Map<string, PendingRequest> = new Map()
private reconnectAttempts = 0 private reconnectAttempts = 0
@@ -151,10 +154,21 @@ class UnifiedWebSocketClient {
private startHeartbeat(): void { private startHeartbeat(): void {
this.stopHeartbeat() this.stopHeartbeat()
this.heartbeatIntervalId = window.setInterval(() => { this.heartbeatIntervalId = window.setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) { if (this.ws?.readyState !== WebSocket.OPEN) {
this.ws.send(JSON.stringify({ op: 'ping' })) return
} }
}, 30000)
const now = Date.now()
if (this.lastPongAt > 0 && now - this.lastPongAt > this.heartbeatTimeoutMs) {
console.warn('统一 WebSocket 心跳超时,准备重连')
void this.restart().catch((error) => {
console.error('统一 WebSocket 心跳重连失败:', error)
})
return
}
this.ws.send(JSON.stringify({ op: 'ping' }))
}, this.heartbeatIntervalMs)
} }
private clearReconnectTimer(): void { private clearReconnectTimer(): void {
@@ -252,7 +266,11 @@ class UnifiedWebSocketClient {
} }
} }
private handleServerMessage(rawData: string): void { private handleServerMessage(socket: WebSocket, rawData: string): void {
if (this.ws !== socket) {
return
}
let message: WsServerEnvelope let message: WsServerEnvelope
try { try {
message = JSON.parse(rawData) as WsServerEnvelope message = JSON.parse(rawData) as WsServerEnvelope
@@ -262,6 +280,7 @@ class UnifiedWebSocketClient {
} }
if (message.op === 'pong') { if (message.op === 'pong') {
this.lastPongAt = Date.now()
return return
} }
@@ -297,8 +316,13 @@ class UnifiedWebSocketClient {
} }
} }
private handleClose(event: CloseEvent): void { private handleClose(socket: WebSocket, event: CloseEvent): void {
if (this.ws !== socket) {
return
}
this.stopHeartbeat() this.stopHeartbeat()
this.lastPongAt = 0
this.ws = null this.ws = null
this.connectPromise = null this.connectPromise = null
this.setStatus('idle') this.setStatus('idle')
@@ -340,10 +364,16 @@ class UnifiedWebSocketClient {
this.ws = socket this.ws = socket
socket.onopen = () => { socket.onopen = () => {
if (this.ws !== socket) {
socket.close()
return
}
settled = true settled = true
const shouldNotifyReconnect = this.hasConnectedOnce const shouldNotifyReconnect = this.hasConnectedOnce
this.hasConnectedOnce = true this.hasConnectedOnce = true
this.reconnectAttempts = 0 this.reconnectAttempts = 0
this.lastPongAt = Date.now()
this.startHeartbeat() this.startHeartbeat()
this.setStatus('connected') this.setStatus('connected')
resolve() resolve()
@@ -351,10 +381,14 @@ class UnifiedWebSocketClient {
} }
socket.onmessage = (event) => { socket.onmessage = (event) => {
this.handleServerMessage(event.data) this.handleServerMessage(socket, event.data)
} }
socket.onerror = () => { socket.onerror = () => {
if (this.ws !== socket) {
return
}
if (!settled) { if (!settled) {
settled = true settled = true
reject(new Error('统一 WebSocket 连接失败')) reject(new Error('统一 WebSocket 连接失败'))
@@ -366,7 +400,7 @@ class UnifiedWebSocketClient {
settled = true settled = true
reject(new Error(`统一 WebSocket 已关闭 (${event.code})`)) reject(new Error(`统一 WebSocket 已关闭 (${event.code})`))
} }
this.handleClose(event) this.handleClose(socket, event)
} }
}) })
})() })()
@@ -384,6 +418,7 @@ class UnifiedWebSocketClient {
this.manualDisconnect = true this.manualDisconnect = true
this.clearReconnectTimer() this.clearReconnectTimer()
this.stopHeartbeat() this.stopHeartbeat()
this.lastPongAt = 0
this.rejectPendingRequests(new Error('统一 WebSocket 已手动断开')) this.rejectPendingRequests(new Error('统一 WebSocket 已手动断开'))
this.connectPromise = null this.connectPromise = null
if (this.ws) { if (this.ws) {

View File

@@ -156,7 +156,7 @@ const logsRoute = createRoute({
component: LogViewerPage, component: LogViewerPage,
}) })
// 计划器&恢复器监控路由 // MaiSaka 聊天流监控路由
const plannerMonitorRoute = createRoute({ const plannerMonitorRoute = createRoute({
getParentRoute: () => protectedRoute, getParentRoute: () => protectedRoute,
path: '/planner-monitor', path: '/planner-monitor',

View File

@@ -1,86 +1,30 @@
/** /**
* 监控页面入口 * MaiSaka 聊天流监控页面入口
* 整合规划器监控和回复器监控 *
* 通过 WebSocket 实时渲染 MaiSaka 推理过程。
*/ */
import { Activity, RefreshCw, MessageSquareText } from 'lucide-react' import { Activity } from 'lucide-react'
import { ScrollArea } from '@/components/ui/scroll-area'
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs' import { MaisakaMonitor } from './maisaka-monitor'
import { Button } from '@/components/ui/button'
import { useState, useCallback } from 'react'
import { PlannerMonitor } from './planner-monitor'
import { ReplierMonitor } from './replier-monitor'
export function PlannerMonitorPage() { export function PlannerMonitorPage() {
const [activeTab, setActiveTab] = useState<'planner' | 'replier'>('planner')
const [autoRefresh, setAutoRefresh] = useState(false)
const [refreshKey, setRefreshKey] = useState(0)
const handleManualRefresh = useCallback(() => {
setRefreshKey(k => k + 1)
}, [])
return ( return (
<div className="space-y-4 sm:space-y-6 p-4 sm:p-6"> <div className="space-y-4 sm:space-y-6 p-4 sm:p-6">
{/* 页面标题 */} {/* 页面标题 */}
<div className="flex flex-col sm:flex-row sm:items-center justify-between gap-4"> <div className="flex flex-col sm:flex-row sm:items-center justify-between gap-4">
<div> <div>
<h1 className="text-2xl sm:text-3xl font-bold"> &amp; </h1> <h1 className="text-2xl sm:text-3xl font-bold flex items-center gap-2">
<Activity className="h-6 w-6 sm:h-7 sm:w-7" />
MaiSaka
</h1>
<p className="text-muted-foreground mt-1 sm:mt-2 text-sm sm:text-base"> <p className="text-muted-foreground mt-1 sm:mt-2 text-sm sm:text-base">
MaiSaka
</p> </p>
</div> </div>
<div className="flex items-center gap-2">
<Button
variant={autoRefresh ? "default" : "outline"}
size="sm"
onClick={() => setAutoRefresh(!autoRefresh)}
>
<RefreshCw className={`h-4 w-4 mr-2 ${autoRefresh ? 'animate-spin' : ''}`} />
{autoRefresh ? '自动刷新中' : '自动刷新'}
</Button>
<Button
variant="outline"
size="sm"
onClick={handleManualRefresh}
>
<RefreshCw className="h-4 w-4" />
</Button>
</div>
</div> </div>
{/* 标签页 */} {/* 主体 */}
<Tabs <MaisakaMonitor />
value={activeTab}
onValueChange={(v) => setActiveTab(v as 'planner' | 'replier')}
className="w-full"
>
<TabsList className="grid w-full grid-cols-2 gap-0.5 sm:gap-1 h-auto p-1">
<TabsTrigger value="planner" className="gap-1 sm:gap-2 text-xs sm:text-sm px-2 sm:px-3 py-2">
<Activity className="h-3.5 w-3.5 sm:h-4 sm:w-4" strokeWidth={2} fill="none" />
<span></span>
</TabsTrigger>
<TabsTrigger value="replier" className="gap-1 sm:gap-2 text-xs sm:text-sm px-2 sm:px-3 py-2">
<MessageSquareText className="h-3.5 w-3.5 sm:h-4 sm:w-4" strokeWidth={2} fill="none" />
<span></span>
</TabsTrigger>
</TabsList>
<ScrollArea className="h-[calc(100vh-240px)] sm:h-[calc(100vh-280px)] mt-4 sm:mt-6">
<TabsContent value="planner" className="mt-0">
<PlannerMonitor
autoRefresh={autoRefresh}
refreshKey={refreshKey}
/>
</TabsContent>
<TabsContent value="replier" className="mt-0">
<ReplierMonitor
autoRefresh={autoRefresh}
refreshKey={refreshKey}
/>
</TabsContent>
</ScrollArea>
</Tabs>
</div> </div>
) )
} }

View File

@@ -0,0 +1,553 @@
/**
* MaiSaka 聊天流实时监控组件
*
* 通过 WebSocket 实时接收 MaiSaka 推理引擎事件,
* 以时间线形式展示聊天流的推理过程。
*/
import {
Activity,
ArrowRight,
Bot,
Brain,
CheckCircle2,
ChevronDown,
ChevronRight,
CircleDot,
Clock,
Eraser,
Gauge,
MessageSquare,
PauseCircle,
Timer,
Wrench,
XCircle,
Zap,
} from 'lucide-react'
import { Badge } from '@/components/ui/badge'
import { Button } from '@/components/ui/button'
import { Card, CardHeader, CardTitle } from '@/components/ui/card'
import { ScrollArea } from '@/components/ui/scroll-area'
import { Separator } from '@/components/ui/separator'
import { cn } from '@/lib/utils'
import { useCallback, useEffect, useRef, useState } from 'react'
import type {
CycleEndEvent,
CycleStartEvent,
MaisakaToolCall,
MessageIngestedEvent,
PlannerResponseEvent,
ReplierResponseEvent,
TimingGateResultEvent,
ToolExecutionEvent,
} from '@/lib/maisaka-monitor-client'
import type { SessionInfo, TimelineEntry } from './use-maisaka-monitor'
import { useMaisakaMonitor } from './use-maisaka-monitor'
// ─── 工具函数 ──────────────────────────────────────────────────
function formatMs(ms: number): string {
if (ms < 1000) return `${Math.round(ms)}ms`
return `${(ms / 1000).toFixed(2)}s`
}
function formatTimestamp(ts: number): string {
return new Date(ts * 1000).toLocaleTimeString('zh-CN', {
hour: '2-digit',
minute: '2-digit',
second: '2-digit',
})
}
function formatRelativeTime(ts: number): string {
const diff = Date.now() / 1000 - ts
if (diff < 10) return '刚刚'
if (diff < 60) return `${Math.round(diff)}秒前`
if (diff < 3600) return `${Math.round(diff / 60)}分钟前`
return `${Math.round(diff / 3600)}小时前`
}
// ─── 会话侧边栏 ──────────────────────────────────────────────
function SessionSidebar({
sessions,
selectedSession,
onSelect,
}: {
sessions: Map<string, SessionInfo>
selectedSession: string | null
onSelect: (id: string) => void
}) {
const sortedSessions = Array.from(sessions.values()).sort(
(a, b) => b.lastActivity - a.lastActivity,
)
if (sortedSessions.length === 0) {
return (
<div className="flex flex-col items-center justify-center h-full text-muted-foreground gap-2 p-4">
<Bot className="h-8 w-8 opacity-40" />
<p className="text-sm text-center"> MaiSaka </p>
</div>
)
}
return (
<div className="flex flex-col gap-1 p-2">
{sortedSessions.map((session) => (
<button
key={session.sessionId}
onClick={() => onSelect(session.sessionId)}
className={cn(
'flex flex-col items-start gap-0.5 rounded-lg px-3 py-2 text-left text-sm transition-colors',
'hover:bg-accent/50',
selectedSession === session.sessionId && 'bg-accent text-accent-foreground',
)}
>
<div className="flex w-full items-center justify-between">
<span className="font-medium truncate max-w-35">
{session.sessionName}
</span>
<Badge variant="secondary" className="text-[10px] h-4 px-1">
{session.eventCount}
</Badge>
</div>
<span className="text-xs text-muted-foreground">
{formatRelativeTime(session.lastActivity)}
</span>
</button>
))}
</div>
)
}
// ─── 单条时间线事件渲染 ──────────────────────────────────────
function MessageIngestedCard({ data }: { data: MessageIngestedEvent }) {
return (
<div className="flex items-start gap-3">
<div className="mt-1 flex h-7 w-7 shrink-0 items-center justify-center rounded-full bg-blue-500/15 text-blue-500">
<MessageSquare className="h-3.5 w-3.5" />
</div>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 mb-1">
<span className="font-medium text-sm">{data.speaker_name}</span>
<span className="text-xs text-muted-foreground">{formatTimestamp(data.timestamp)}</span>
</div>
<p className="text-sm text-foreground/80 whitespace-pre-wrap wrap-break-word leading-relaxed">
{data.content || '[空消息]'}
</p>
</div>
</div>
)
}
function CycleStartCard({ data }: { data: CycleStartEvent }) {
return (
<div className="flex items-center gap-3">
<div className="mt-0.5 flex h-7 w-7 shrink-0 items-center justify-center rounded-full bg-violet-500/15 text-violet-500">
<Zap className="h-3.5 w-3.5" />
</div>
<div className="flex items-center gap-2 flex-wrap">
<span className="text-sm font-medium"> #{data.cycle_id}</span>
<Badge variant="outline" className="text-[10px]">
{data.round_index + 1}/{data.max_rounds}
</Badge>
<Badge variant="secondary" className="text-[10px]">
{data.history_count}
</Badge>
</div>
</div>
)
}
function TimingGateCard({ data }: { data: TimingGateResultEvent }) {
const actionConfig: Record<string, { label: string; variant: 'default' | 'secondary' | 'destructive'; icon: typeof ArrowRight }> = {
continue: { label: '继续执行', variant: 'default', icon: ArrowRight },
wait: { label: '等待', variant: 'secondary', icon: PauseCircle },
no_reply: { label: '不回复', variant: 'destructive', icon: XCircle },
}
const config = actionConfig[data.action] ?? actionConfig.continue
const Icon = config.icon
return (
<div className="flex items-start gap-3">
<div className="mt-1 flex h-7 w-7 shrink-0 items-center justify-center rounded-full bg-amber-500/15 text-amber-500">
<Timer className="h-3.5 w-3.5" />
</div>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 mb-1 flex-wrap">
<span className="text-sm font-medium">Timing Gate</span>
<Badge variant={config.variant} className="text-[10px] gap-0.5">
<Icon className="h-2.5 w-2.5" />
{config.label}
</Badge>
<span className="text-xs text-muted-foreground">{formatMs(data.duration_ms)}</span>
</div>
{data.content && (
<CollapsibleText text={data.content} maxLines={3} />
)}
</div>
</div>
)
}
function PlannerResponseCard({ data }: { data: PlannerResponseEvent }) {
return (
<div className="flex items-start gap-3">
<div className="mt-1 flex h-7 w-7 shrink-0 items-center justify-center rounded-full bg-emerald-500/15 text-emerald-500">
<Brain className="h-3.5 w-3.5" />
</div>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 mb-1 flex-wrap">
<span className="text-sm font-medium"></span>
<span className="text-xs text-muted-foreground">{formatMs(data.duration_ms)}</span>
<Badge variant="outline" className="text-[10px]">
{data.prompt_tokens}+{data.completion_tokens} tokens
</Badge>
</div>
{data.content && (
<CollapsibleText text={data.content} maxLines={6} />
)}
{data.tool_calls.length > 0 && (
<div className="mt-2 flex flex-wrap gap-1.5">
{data.tool_calls.map((tc: MaisakaToolCall, idx: number) => (
<Badge key={idx} variant="secondary" className="text-[10px] gap-1">
<Wrench className="h-2.5 w-2.5" />
{tc.name}
</Badge>
))}
</div>
)}
</div>
</div>
)
}
function ToolExecutionCard({ data }: { data: ToolExecutionEvent }) {
return (
<div className="flex items-start gap-3">
<div className={cn(
'mt-1 flex h-7 w-7 shrink-0 items-center justify-center rounded-full',
data.success
? 'bg-teal-500/15 text-teal-500'
: 'bg-red-500/15 text-red-500',
)}>
<Wrench className="h-3.5 w-3.5" />
</div>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 mb-1 flex-wrap">
<span className="text-sm font-medium font-mono">{data.tool_name}</span>
{data.success
? <CheckCircle2 className="h-3.5 w-3.5 text-teal-500" />
: <XCircle className="h-3.5 w-3.5 text-red-500" />
}
<span className="text-xs text-muted-foreground">{formatMs(data.duration_ms)}</span>
</div>
{Object.keys(data.tool_args).length > 0 && (
<div className="text-xs text-muted-foreground font-mono bg-muted/50 rounded px-2 py-1 mb-1 whitespace-pre-wrap break-all">
{JSON.stringify(data.tool_args, null, 2)}
</div>
)}
{data.result_summary && (
<CollapsibleText text={data.result_summary} maxLines={3} className="text-muted-foreground" />
)}
</div>
</div>
)
}
function CycleEndCard({ data }: { data: CycleEndEvent }) {
const totalTime = Object.values(data.time_records).reduce((a, b) => a + b, 0)
return (
<div className="flex items-center gap-3">
<div className="mt-0.5 flex h-7 w-7 shrink-0 items-center justify-center rounded-full bg-slate-500/15 text-slate-500">
<CircleDot className="h-3.5 w-3.5" />
</div>
<div className="flex items-center gap-2 flex-wrap">
<span className="text-sm text-muted-foreground"></span>
<Badge variant="outline" className="text-[10px]">
{formatMs(totalTime * 1000)}
</Badge>
{Object.entries(data.time_records).map(([name, duration]) => (
<span key={name} className="text-[10px] text-muted-foreground">
{name}: {formatMs(duration * 1000)}
</span>
))}
<Badge
variant={data.agent_state === 'running' ? 'default' : 'secondary'}
className="text-[10px]"
>
{data.agent_state}
</Badge>
</div>
</div>
)
}
// ─── 可折叠文本组件 ────────────────────────────────────────────
function CollapsibleText({
text,
maxLines = 4,
className,
}: {
text: string
maxLines?: number
className?: string
}) {
const [expanded, setExpanded] = useState(false)
const lines = text.split('\n')
const needsCollapse = lines.length > maxLines
if (!needsCollapse || expanded) {
return (
<div className="relative">
<p className={cn(
'text-sm whitespace-pre-wrap wrap-break-word leading-relaxed',
className,
)}>
{text}
</p>
{needsCollapse && (
<button
onClick={() => setExpanded(false)}
className="text-xs text-primary hover:underline mt-1 flex items-center gap-0.5"
>
<ChevronDown className="h-3 w-3" />
</button>
)}
</div>
)
}
return (
<div>
<p className={cn(
'text-sm whitespace-pre-wrap wrap-break-word leading-relaxed',
className,
)}>
{lines.slice(0, maxLines).join('\n')}
</p>
<button
onClick={() => setExpanded(true)}
className="text-xs text-primary hover:underline mt-1 flex items-center gap-0.5"
>
<ChevronRight className="h-3 w-3" /> ({lines.length} )
</button>
</div>
)
}
// ─── 回复器响应卡片 ──────────────────────────────────────────
function ReplierResponseCard({ data }: { data: ReplierResponseEvent }) {
return (
<Card className="border-l-4 border-l-purple-500/60">
<CardHeader className="py-2.5 px-4 space-y-2">
<div className="flex items-center gap-2">
<Bot className="h-4 w-4 text-purple-500" />
<CardTitle className="text-sm font-medium"></CardTitle>
<Badge variant="outline" className="text-xs font-normal ml-auto">
{formatMs(data.duration_ms)}
</Badge>
{data.success ? (
<Badge variant="secondary" className="text-xs gap-1">
<CheckCircle2 className="h-3 w-3" />
</Badge>
) : (
<Badge variant="destructive" className="text-xs gap-1">
<XCircle className="h-3 w-3" />
</Badge>
)}
<span className="text-xs text-muted-foreground">{formatTimestamp(data.timestamp)}</span>
</div>
{data.content && (
<CollapsibleText text={data.content} maxLines={6} className="text-foreground/90" />
)}
{data.reasoning && (
<details className="mt-1">
<summary className="text-xs text-muted-foreground cursor-pointer hover:text-foreground">
</summary>
<CollapsibleText text={data.reasoning} maxLines={8} className="mt-1 text-muted-foreground" />
</details>
)}
{(data.prompt_tokens > 0 || data.completion_tokens > 0) && (
<div className="flex gap-3 text-xs text-muted-foreground mt-1">
{data.model_name && <span>: {data.model_name}</span>}
<span>: {data.prompt_tokens}</span>
<span>: {data.completion_tokens}</span>
<span>: {data.total_tokens}</span>
</div>
)}
</CardHeader>
</Card>
)
}
// ─── 时间线入口渲染器 ──────────────────────────────────────────
function TimelineEventRenderer({ entry }: { entry: TimelineEntry }) {
switch (entry.type) {
case 'message.ingested':
return <MessageIngestedCard data={entry.data as MessageIngestedEvent} />
case 'cycle.start':
return <CycleStartCard data={entry.data as CycleStartEvent} />
case 'timing_gate.result':
return <TimingGateCard data={entry.data as TimingGateResultEvent} />
case 'planner.response':
return <PlannerResponseCard data={entry.data as PlannerResponseEvent} />
case 'tool.execution':
return <ToolExecutionCard data={entry.data as ToolExecutionEvent} />
case 'cycle.end':
return <CycleEndCard data={entry.data as CycleEndEvent} />
case 'replier.response':
return <ReplierResponseCard data={entry.data as ReplierResponseEvent} />
// planner.request, replier.request 和 session.start 通常不需要在 timeline 中主要展示
default:
return null
}
}
// ─── 主组件 ─────────────────────────────────────────────────
export function MaisakaMonitor() {
const {
timeline,
sessions,
selectedSession,
setSelectedSession,
connected,
clearTimeline,
} = useMaisakaMonitor()
const scrollRef = useRef<HTMLDivElement>(null)
const [autoScroll, setAutoScroll] = useState(true)
// 自动滚动到底部
useEffect(() => {
if (autoScroll && scrollRef.current) {
const viewport = scrollRef.current.querySelector('[data-radix-scroll-area-viewport]')
if (viewport) {
viewport.scrollTop = viewport.scrollHeight
}
}
}, [timeline, autoScroll])
const handleScroll = useCallback((e: React.UIEvent<HTMLDivElement>) => {
const target = e.currentTarget.querySelector('[data-radix-scroll-area-viewport]')
if (!target) return
const { scrollTop, scrollHeight, clientHeight } = target as HTMLElement
setAutoScroll(scrollHeight - scrollTop - clientHeight < 80)
}, [])
// 统计当前会话的各事件类型计数
const stats = {
messages: timeline.filter((e) => e.type === 'message.ingested').length,
cycles: timeline.filter((e) => e.type === 'cycle.start').length,
toolCalls: timeline.filter((e) => e.type === 'tool.execution').length,
}
return (
<div className="flex h-[calc(100vh-180px)] gap-4">
{/* 会话侧边栏 */}
<Card className="w-60 shrink-0 flex flex-col">
<CardHeader className="py-3 px-4 space-y-0">
<CardTitle className="text-sm font-medium flex items-center gap-2">
<Activity className="h-4 w-4" />
{connected && (
<span className="ml-auto flex h-2 w-2 rounded-full bg-emerald-500" />
)}
</CardTitle>
</CardHeader>
<Separator />
<ScrollArea className="flex-1">
<SessionSidebar
sessions={sessions}
selectedSession={selectedSession}
onSelect={setSelectedSession}
/>
</ScrollArea>
</Card>
{/* 主时间线区域 */}
<div className="flex-1 flex flex-col min-w-0">
{/* 顶部统计栏 */}
<div className="flex items-center gap-3 mb-3 flex-wrap">
<div className="flex items-center gap-4 text-sm">
<div className="flex items-center gap-1.5 text-muted-foreground">
<MessageSquare className="h-3.5 w-3.5" />
<span>{stats.messages} </span>
</div>
<div className="flex items-center gap-1.5 text-muted-foreground">
<Brain className="h-3.5 w-3.5" />
<span>{stats.cycles} </span>
</div>
<div className="flex items-center gap-1.5 text-muted-foreground">
<Wrench className="h-3.5 w-3.5" />
<span>{stats.toolCalls} </span>
</div>
</div>
<div className="ml-auto flex items-center gap-2">
<Button
variant="ghost"
size="sm"
className="h-7 text-xs"
onClick={() => setAutoScroll(!autoScroll)}
>
<Gauge className={cn('h-3.5 w-3.5 mr-1', autoScroll && 'text-primary')} />
{autoScroll ? '跟踪中' : '已暂停'}
</Button>
<Button
variant="ghost"
size="sm"
className="h-7 text-xs"
onClick={clearTimeline}
>
<Eraser className="h-3.5 w-3.5 mr-1" />
</Button>
</div>
</div>
{/* 时间线 */}
<Card className="flex-1 overflow-hidden">
<ScrollArea
className="h-full"
ref={scrollRef}
onScrollCapture={handleScroll}
>
<div className="p-4 space-y-3">
{timeline.length === 0 ? (
<div className="flex flex-col items-center justify-center py-20 text-muted-foreground gap-3">
<Clock className="h-10 w-10 opacity-30" />
<p className="text-sm"> MaiSaka </p>
<p className="text-xs opacity-60">
MaiSaka
</p>
</div>
) : (
timeline.map((entry) => {
const rendered = <TimelineEventRenderer entry={entry} />
if (!rendered) return null
return (
<div
key={entry.id}
className="animate-in fade-in-0 slide-in-from-bottom-2 duration-300"
>
{rendered}
{entry.type === 'cycle.end' && (
<Separator className="mt-3" />
)}
</div>
)
})
)}
</div>
</ScrollArea>
</Card>
</div>
</div>
)
}

View File

@@ -0,0 +1,144 @@
/**
* MaiSaka 聊天流实时监控 - React Hook
*
* 管理 WebSocket 订阅与事件流的状态。
*/
import { useCallback, useEffect, useRef, useState } from 'react'
import type { MaisakaMonitorEvent } from '@/lib/maisaka-monitor-client'
import { maisakaMonitorClient } from '@/lib/maisaka-monitor-client'
/** 单条时间线事件(前端视图模型) */
export interface TimelineEntry {
/** 唯一 ID */
id: string
/** 事件类型 */
type: MaisakaMonitorEvent['type']
/** 原始事件数据 */
data: MaisakaMonitorEvent['data']
/** 事件时间戳 */
timestamp: number
/** 所属会话 ID */
sessionId: string
}
/** 会话概要信息 */
export interface SessionInfo {
sessionId: string
sessionName: string
lastActivity: number
eventCount: number
}
/** 最大保留的时间线条目数 */
const MAX_TIMELINE_ENTRIES = 500
let entryCounter = 0
export function useMaisakaMonitor() {
const [timeline, setTimeline] = useState<TimelineEntry[]>([])
const [sessions, setSessions] = useState<Map<string, SessionInfo>>(new Map())
const [selectedSession, setSelectedSession] = useState<string | null>(null)
const [connected, setConnected] = useState(false)
const unsubRef = useRef<(() => Promise<void>) | null>(null)
const handleEvent = useCallback((event: MaisakaMonitorEvent) => {
const sessionId = (event.data as unknown as Record<string, unknown>).session_id as string
const timestamp = (event.data as unknown as Record<string, unknown>).timestamp as number
const entry: TimelineEntry = {
id: `evt_${++entryCounter}_${Date.now()}`,
type: event.type,
data: event.data,
timestamp,
sessionId,
}
setTimeline((prev) => {
const next = [...prev, entry]
return next.length > MAX_TIMELINE_ENTRIES
? next.slice(next.length - MAX_TIMELINE_ENTRIES)
: next
})
// 更新会话信息
if (event.type === 'session.start') {
const d = event.data
setSessions((prev) => {
const next = new Map(prev)
next.set(sessionId, {
sessionId,
sessionName: d.session_name,
lastActivity: timestamp,
eventCount: (prev.get(sessionId)?.eventCount ?? 0) + 1,
})
return next
})
} else {
setSessions((prev) => {
const existing = prev.get(sessionId)
if (!existing) {
const next = new Map(prev)
next.set(sessionId, {
sessionId,
sessionName: sessionId.slice(0, 8),
lastActivity: timestamp,
eventCount: 1,
})
return next
}
const next = new Map(prev)
next.set(sessionId, {
...existing,
lastActivity: timestamp,
eventCount: existing.eventCount + 1,
})
return next
})
}
// 自动选中第一个会话
setSelectedSession((current) => current ?? sessionId)
}, [])
useEffect(() => {
let cancelled = false
maisakaMonitorClient.subscribe(handleEvent).then((unsub) => {
if (cancelled) {
void unsub()
return
}
unsubRef.current = unsub
setConnected(true)
})
return () => {
cancelled = true
if (unsubRef.current) {
void unsubRef.current()
unsubRef.current = null
}
setConnected(false)
}
}, [handleEvent])
const clearTimeline = useCallback(() => {
setTimeline([])
}, [])
/** 当前选中会话的时间线 */
const filteredTimeline = selectedSession
? timeline.filter((e) => e.sessionId === selectedSession)
: timeline
return {
timeline: filteredTimeline,
allTimeline: timeline,
sessions,
selectedSession,
setSelectedSession,
connected,
clearTimeline,
}
}

View File

@@ -26,6 +26,7 @@ from src.common.utils.utils_session import SessionUtils
from src.config.config import global_config from src.config.config import global_config
from src.core.types import ActionInfo from src.core.types import ActionInfo
from src.llm_models.payload_content.message import ImageMessagePart, Message, MessageBuilder, RoleType, TextMessagePart from src.llm_models.payload_content.message import ImageMessagePart, Message, MessageBuilder, RoleType, TextMessagePart
from src.maisaka.monitor_events import emit_replier_request, emit_replier_response
from src.services.llm_service import LLMServiceClient from src.services.llm_service import LLMServiceClient
from src.maisaka.context_messages import ( from src.maisaka.context_messages import (
@@ -533,6 +534,14 @@ class MaisakaReplyGenerator:
) )
started_at = time.perf_counter() started_at = time.perf_counter()
# 向监控前端广播回复器请求事件
await emit_replier_request(
session_id=preview_chat_id,
messages=request_messages,
model_name=getattr(self.express_model, "model_name", ""),
)
try: try:
generation_result = await self.express_model.generate_response_with_messages(message_factory=message_factory) generation_result = await self.express_model.generate_response_with_messages(message_factory=message_factory)
except Exception as exc: except Exception as exc:
@@ -556,6 +565,19 @@ class MaisakaReplyGenerator:
overall_ms=round((time.perf_counter() - started_at) * 1000, 2), overall_ms=round((time.perf_counter() - started_at) * 1000, 2),
) )
# 向监控前端广播回复器响应事件
await emit_replier_response(
session_id=preview_chat_id,
content=response_text,
reasoning=generation_result.reasoning or "",
model_name=generation_result.model_name or "",
prompt_tokens=generation_result.prompt_tokens,
completion_tokens=generation_result.completion_tokens,
total_tokens=generation_result.total_tokens,
duration_ms=result.metrics.overall_ms or 0.0,
success=result.success,
)
if global_config.debug.show_replyer_reasoning and result.completion.reasoning_text: if global_config.debug.show_replyer_reasoning and result.completion.reasoning_text:
logger.info(f"Maisaka 回复器思考内容:\n{result.completion.reasoning_text}") logger.info(f"Maisaka 回复器思考内容:\n{result.completion.reasoning_text}")

View File

@@ -0,0 +1,473 @@
"""MaiSaka 实时监控事件广播模块。
通过统一 WebSocket 将 MaiSaka 推理引擎各阶段的状态实时推送给前端监控界面,
无需落盘 HTML/TXT 中间文件即可在 WebUI 中渲染完整的聊天流推理过程。
"""
from typing import Any, Dict, List, Optional
import time
from src.common.logger import get_logger
logger = get_logger("maisaka_monitor")
# WebSocket 广播使用的业务域与主题
MONITOR_DOMAIN = "maisaka_monitor"
MONITOR_TOPIC = "main"
def _serialize_message(message: Any) -> Dict[str, Any]:
"""将单条 LLM 消息序列化为可通过 WebSocket 传输的字典。
对二进制数据(如图片)仅保留元信息,不传输原始字节以减小带宽占用。
Args:
message: 原始消息对象,可以是 dict 或带 role/content 属性的消息实例。
Returns:
Dict[str, Any]: 序列化后的消息字典。
"""
if isinstance(message, dict):
serialized: Dict[str, Any] = {
"role": str(message.get("role", "unknown")),
"content": message.get("content"),
}
if message.get("tool_call_id"):
serialized["tool_call_id"] = message["tool_call_id"]
if message.get("tool_calls"):
serialized["tool_calls"] = _serialize_tool_calls_from_dicts(message["tool_calls"])
return serialized
raw_role = getattr(message, "role", "unknown")
role_str = raw_role.value if hasattr(raw_role, "value") else str(raw_role) # type: ignore[union-attr]
serialized = {
"role": role_str,
"content": _extract_text_content(getattr(message, "content", None)),
}
tool_call_id = getattr(message, "tool_call_id", None)
if tool_call_id:
serialized["tool_call_id"] = str(tool_call_id)
tool_calls = getattr(message, "tool_calls", None)
if tool_calls:
serialized["tool_calls"] = _serialize_tool_calls_from_objects(tool_calls)
return serialized
def _extract_text_content(content: Any) -> Optional[str]:
"""从消息内容中提取纯文本表示。
支持字符串、列表(多模态内容块)等格式,对图片仅保留占位信息。
Args:
content: 消息的原始 content 字段。
Returns:
Optional[str]: 提取后的文本内容。
"""
if content is None:
return None
if isinstance(content, str):
return content
if isinstance(content, list):
text_parts: List[str] = []
for block in content:
if isinstance(block, dict):
block_type = block.get("type", "")
if block_type == "text":
text_parts.append(str(block.get("text", "")))
elif block_type == "image_url":
text_parts.append("[图片]")
else:
text_parts.append(f"[{block_type}]")
elif isinstance(block, str):
text_parts.append(block)
return "\n".join(text_parts) if text_parts else None
return str(content)
def _serialize_tool_calls_from_objects(tool_calls: List[Any]) -> List[Dict[str, Any]]:
"""将工具调用对象列表序列化为字典列表。
Args:
tool_calls: 工具调用对象列表ToolCall 或类似结构)。
Returns:
List[Dict[str, Any]]: 序列化后的工具调用列表。
"""
result: List[Dict[str, Any]] = []
for tc in tool_calls:
serialized: Dict[str, Any] = {
"id": getattr(tc, "id", None) or getattr(tc, "tool_call_id", ""),
"name": getattr(tc, "func_name", None) or getattr(tc, "name", "unknown"),
}
args = getattr(tc, "args", None) or getattr(tc, "arguments", None)
if isinstance(args, dict):
serialized["arguments"] = args
elif isinstance(args, str):
serialized["arguments_raw"] = args
result.append(serialized)
return result
def _serialize_tool_calls_from_dicts(tool_calls: List[Any]) -> List[Dict[str, Any]]:
"""将工具调用字典列表标准化为可传输格式。
Args:
tool_calls: 工具调用字典列表。
Returns:
List[Dict[str, Any]]: 标准化后的工具调用列表。
"""
result: List[Dict[str, Any]] = []
for tc in tool_calls:
if isinstance(tc, dict):
result.append({
"id": tc.get("id", ""),
"name": tc.get("name", tc.get("func_name", "unknown")),
"arguments": tc.get("arguments", tc.get("args", {})),
})
else:
result.append({
"id": getattr(tc, "id", ""),
"name": getattr(tc, "func_name", "unknown"),
"arguments": getattr(tc, "args", {}),
})
return result
def _serialize_messages(messages: List[Any]) -> List[Dict[str, Any]]:
"""批量序列化消息列表。
Args:
messages: 原始消息列表。
Returns:
List[Dict[str, Any]]: 序列化后的消息字典列表。
"""
return [_serialize_message(msg) for msg in messages]
async def _broadcast(event: str, data: Dict[str, Any]) -> None:
"""通过统一 WebSocket 管理器向所有订阅了 maisaka_monitor 主题的连接广播事件。
延迟导入 websocket_manager 以避免循环依赖。
Args:
event: 事件名称。
data: 事件数据。
"""
try:
from src.webui.routers.websocket.manager import websocket_manager
subscription_key = f"{MONITOR_DOMAIN}:{MONITOR_TOPIC}"
total_connections = len(websocket_manager.connections)
subscriber_count = sum(
1 for conn in websocket_manager.connections.values()
if subscription_key in conn.subscriptions
)
# 诊断:打印 manager 对象 id 和连接状态
logger.info(
f"[诊断] _broadcast: manager_id={id(websocket_manager)} "
f"总连接={total_connections} 订阅者={subscriber_count} event={event}"
)
if subscriber_count == 0 and total_connections > 0:
for cid, conn in websocket_manager.connections.items():
logger.info(
f"[诊断] 连接={cid[:8]}… 订阅={conn.subscriptions}"
)
await websocket_manager.broadcast_to_topic(
domain=MONITOR_DOMAIN,
topic=MONITOR_TOPIC,
event=event,
data=data,
)
except Exception as exc:
logger.warning(f"MaiSaka 监控事件广播失败: {exc}", exc_info=True)
async def emit_session_start(session_id: str, session_name: str) -> None:
"""广播会话开始事件。
Args:
session_id: 聊天流 ID。
session_name: 聊天流显示名称。
"""
await _broadcast("session.start", {
"session_id": session_id,
"session_name": session_name,
"timestamp": time.time(),
})
async def emit_message_ingested(
session_id: str,
speaker_name: str,
content: str,
message_id: str,
timestamp: float,
) -> None:
"""广播新消息注入事件。
当新的用户消息被纳入 MaiSaka 推理上下文时触发。
Args:
session_id: 聊天流 ID。
speaker_name: 发言者名称。
content: 消息文本内容。
message_id: 消息 ID。
timestamp: 消息时间戳。
"""
await _broadcast("message.ingested", {
"session_id": session_id,
"speaker_name": speaker_name,
"content": content,
"message_id": message_id,
"timestamp": timestamp,
})
async def emit_cycle_start(
session_id: str,
cycle_id: int,
round_index: int,
max_rounds: int,
history_count: int,
) -> None:
"""广播推理循环开始事件。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
round_index: 当前回合索引(从 0 开始)。
max_rounds: 最大回合数。
history_count: 当前上下文消息数。
"""
await _broadcast("cycle.start", {
"session_id": session_id,
"cycle_id": cycle_id,
"round_index": round_index,
"max_rounds": max_rounds,
"history_count": history_count,
"timestamp": time.time(),
})
async def emit_timing_gate_result(
session_id: str,
cycle_id: int,
action: str,
content: Optional[str],
tool_calls: List[Any],
messages: List[Any],
prompt_tokens: int,
selected_history_count: int,
duration_ms: float,
) -> None:
"""广播 Timing Gate 子代理结果事件。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
action: 控制决策continue/wait/no_reply
content: Timing Gate 返回的文本内容。
tool_calls: 工具调用列表。
messages: 发送给 Timing Gate 的消息列表。
prompt_tokens: 输入 Token 数。
selected_history_count: 已选上下文消息数。
duration_ms: 执行耗时(毫秒)。
"""
await _broadcast("timing_gate.result", {
"session_id": session_id,
"cycle_id": cycle_id,
"action": action,
"content": content,
"tool_calls": _serialize_tool_calls_from_objects(tool_calls),
"messages": _serialize_messages(messages),
"prompt_tokens": prompt_tokens,
"selected_history_count": selected_history_count,
"duration_ms": duration_ms,
"timestamp": time.time(),
})
async def emit_planner_request(
session_id: str,
cycle_id: int,
messages: List[Any],
tool_count: int,
selected_history_count: int,
) -> None:
"""广播规划器请求开始事件。
携带完整的消息列表,前端可以增量渲染新增消息。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
messages: 发送给规划器的完整消息列表。
tool_count: 可用工具数量。
selected_history_count: 已选上下文消息数。
"""
await _broadcast("planner.request", {
"session_id": session_id,
"cycle_id": cycle_id,
"messages": _serialize_messages(messages),
"tool_count": tool_count,
"selected_history_count": selected_history_count,
"timestamp": time.time(),
})
async def emit_planner_response(
session_id: str,
cycle_id: int,
content: Optional[str],
tool_calls: List[Any],
prompt_tokens: int,
completion_tokens: int,
total_tokens: int,
duration_ms: float,
) -> None:
"""广播规划器响应事件。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
content: 规划器返回的思考文本。
tool_calls: 规划器返回的工具调用列表。
prompt_tokens: 输入 Token 数。
completion_tokens: 输出 Token 数。
total_tokens: 总 Token 数。
duration_ms: 执行耗时(毫秒)。
"""
await _broadcast("planner.response", {
"session_id": session_id,
"cycle_id": cycle_id,
"content": content,
"tool_calls": _serialize_tool_calls_from_objects(tool_calls),
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"duration_ms": duration_ms,
"timestamp": time.time(),
})
async def emit_tool_execution(
session_id: str,
cycle_id: int,
tool_name: str,
tool_args: Dict[str, Any],
result_summary: str,
success: bool,
duration_ms: float,
) -> None:
"""广播工具执行结果事件。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
tool_name: 工具名称。
tool_args: 工具参数。
result_summary: 执行结果摘要。
success: 是否成功。
duration_ms: 执行耗时(毫秒)。
"""
await _broadcast("tool.execution", {
"session_id": session_id,
"cycle_id": cycle_id,
"tool_name": tool_name,
"tool_args": tool_args,
"result_summary": result_summary,
"success": success,
"duration_ms": duration_ms,
"timestamp": time.time(),
})
async def emit_cycle_end(
session_id: str,
cycle_id: int,
time_records: Dict[str, float],
agent_state: str,
) -> None:
"""广播推理循环结束事件。
Args:
session_id: 聊天流 ID。
cycle_id: 循环编号。
time_records: 各阶段耗时记录。
agent_state: 循环结束后的代理状态。
"""
await _broadcast("cycle.end", {
"session_id": session_id,
"cycle_id": cycle_id,
"time_records": time_records,
"agent_state": agent_state,
"timestamp": time.time(),
})
async def emit_replier_request(
session_id: str,
messages: List[Any],
model_name: str = "",
) -> None:
"""广播回复器请求开始事件。
Args:
session_id: 聊天流 ID。
messages: 发送给回复器的消息列表。
model_name: 使用的模型名称。
"""
await _broadcast("replier.request", {
"session_id": session_id,
"messages": _serialize_messages(messages),
"model_name": model_name,
"timestamp": time.time(),
})
async def emit_replier_response(
session_id: str,
content: Optional[str],
reasoning: str,
model_name: str,
prompt_tokens: int,
completion_tokens: int,
total_tokens: int,
duration_ms: float,
success: bool,
) -> None:
"""广播回复器响应事件。
Args:
session_id: 聊天流 ID。
content: 回复器生成的文本。
reasoning: 回复器的思考过程文本。
model_name: 使用的模型名称。
prompt_tokens: 输入 Token 数。
completion_tokens: 输出 Token 数。
total_tokens: 总 Token 数。
duration_ms: 执行耗时(毫秒)。
success: 是否生成成功。
"""
await _broadcast("replier.response", {
"session_id": session_id,
"content": content,
"reasoning": reasoning,
"model_name": model_name,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"duration_ms": duration_ms,
"success": success,
"timestamp": time.time(),
})

View File

@@ -38,6 +38,14 @@ from .message_adapter import (
clone_message_sequence, clone_message_sequence,
format_speaker_content, format_speaker_content,
) )
from .monitor_events import (
emit_cycle_end,
emit_cycle_start,
emit_message_ingested,
emit_planner_response,
emit_timing_gate_result,
emit_tool_execution,
)
from .planner_message_utils import build_planner_user_prefix_from_session_message from .planner_message_utils import build_planner_user_prefix_from_session_message
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -279,11 +287,30 @@ class MaisakaReasoningEngine:
for round_index in range(self._runtime._max_internal_rounds): for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle() cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index) self._runtime._log_cycle_started(cycle_detail, round_index)
await emit_cycle_start(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
round_index=round_index,
max_rounds=self._runtime._max_internal_rounds,
history_count=len(self._runtime._chat_history),
)
planner_started_at = 0.0 planner_started_at = 0.0
try: try:
timing_started_at = time.time() timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message) timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
cycle_detail.time_records["timing_gate"] = time.time() - timing_started_at timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
self._runtime._render_context_usage_panel( self._runtime._render_context_usage_panel(
selected_history_count=timing_response.selected_history_count, selected_history_count=timing_response.selected_history_count,
prompt_tokens=timing_response.prompt_tokens, prompt_tokens=timing_response.prompt_tokens,
@@ -310,12 +337,23 @@ class MaisakaReasoningEngine:
response = await self._run_interruptible_planner( response = await self._run_interruptible_planner(
tool_definitions=action_tool_definitions, tool_definitions=action_tool_definitions,
) )
cycle_detail.time_records["planner"] = time.time() - planner_started_at planner_duration_ms = (time.time() - planner_started_at) * 1000
cycle_detail.time_records["planner"] = planner_duration_ms / 1000
logger.info( logger.info(
f"{self._runtime.log_prefix} 规划器执行完成: " f"{self._runtime.log_prefix} 规划器执行完成: "
f"回合={round_index + 1} " f"回合={round_index + 1} "
f"耗时={cycle_detail.time_records['planner']:.3f}" f"耗时={cycle_detail.time_records['planner']:.3f}"
) )
await emit_planner_response(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
content=response.content,
tool_calls=response.tool_calls,
prompt_tokens=response.prompt_tokens,
completion_tokens=response.completion_tokens,
total_tokens=response.total_tokens,
duration_ms=planner_duration_ms,
)
reasoning_content = response.content or "" reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content): if self._should_replace_reasoning(reasoning_content):
@@ -367,6 +405,12 @@ class MaisakaReasoningEngine:
break break
finally: finally:
self._end_cycle(cycle_detail) self._end_cycle(cycle_detail)
await emit_cycle_end(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
time_records=dict(cycle_detail.time_records),
agent_state=self._runtime._agent_state,
)
finally: finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING: if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP self._runtime._agent_state = self._runtime._STATE_STOP
@@ -426,6 +470,17 @@ class MaisakaReasoningEngine:
self._insert_chat_history_message(history_message) self._insert_chat_history_message(history_message)
self._trim_chat_history() self._trim_chat_history()
# 向监控前端广播新消息注入事件
user_info = message.message_info.user_info
speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id
await emit_message_ingested(
session_id=self._runtime.session_id,
speaker_name=speaker_name,
content=(message.processed_plain_text or "").strip(),
message_id=message.message_id,
timestamp=message.timestamp.timestamp(),
)
async def _build_history_message(self, message: SessionMessage) -> Optional[LLMContextMessage]: async def _build_history_message(self, message: SessionMessage) -> Optional[LLMContextMessage]:
"""根据真实消息构造对应的上下文消息。""" """根据真实消息构造对应的上下文消息。"""
@@ -986,7 +1041,9 @@ class MaisakaReasoningEngine:
} }
for tool_call in tool_calls: for tool_call in tool_calls:
invocation = self._build_tool_invocation(tool_call, latest_thought) invocation = self._build_tool_invocation(tool_call, latest_thought)
tool_started_at = time.time()
result = await self._runtime._tool_registry.invoke(invocation, execution_context) result = await self._runtime._tool_registry.invoke(invocation, execution_context)
tool_duration_ms = (time.time() - tool_started_at) * 1000
await self._store_tool_execution_record( await self._store_tool_execution_record(
invocation, invocation,
result, result,
@@ -995,6 +1052,18 @@ class MaisakaReasoningEngine:
self._append_tool_execution_result(tool_call, result) self._append_tool_execution_result(tool_call, result)
tool_result_summaries.append(self._build_tool_result_summary(tool_call, result)) tool_result_summaries.append(self._build_tool_result_summary(tool_call, result))
# 向监控前端广播工具执行结果
cycle_id = self._runtime._current_cycle_detail.cycle_id if self._runtime._current_cycle_detail else 0
await emit_tool_execution(
session_id=self._runtime.session_id,
cycle_id=cycle_id,
tool_name=tool_call.func_name,
tool_args=invocation.arguments if isinstance(invocation.arguments, dict) else {},
result_summary=result.content[:500] if result.content else (result.error_message or "")[:500],
success=result.success,
duration_ms=tool_duration_ms,
)
if not result.success and tool_call.func_name == "reply": if not result.success and tool_call.func_name == "reply":
logger.warning(f"{self._runtime.log_prefix} 回复工具未生成可见消息,将继续下一轮循环") logger.warning(f"{self._runtime.log_prefix} 回复工具未生成可见消息,将继续下一轮循环")

View File

@@ -14,11 +14,9 @@ def get_api_router() -> APIRouter:
def get_all_routers() -> List[APIRouter]: def get_all_routers() -> List[APIRouter]:
"""获取所有需要独立注册的路由器列表""" """获取所有需要独立注册的路由器列表"""
from src.webui.api.planner import router as planner_router
from src.webui.api.replier import router as replier_router
from src.webui.routers.chat import router as chat_router from src.webui.routers.chat import router as chat_router
from src.webui.routers.memory import compat_router as memory_compat_router
from src.webui.routers.knowledge import router as knowledge_router from src.webui.routers.knowledge import router as knowledge_router
from src.webui.routers.memory import compat_router as memory_compat_router
from src.webui.routes import router as main_router from src.webui.routes import router as main_router
return [ return [
@@ -26,8 +24,6 @@ def get_all_routers() -> List[APIRouter]:
memory_compat_router, memory_compat_router,
knowledge_router, knowledge_router,
chat_router, chat_router,
planner_router,
replier_router,
] ]

View File

@@ -1,10 +1,12 @@
"""统一 WebSocket 连接管理器。""" """统一 WebSocket 连接管理器。"""
import asyncio
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Set from typing import Any, Dict, Optional, Set
import asyncio
from fastapi import WebSocket from fastapi import WebSocket
from starlette.websockets import WebSocketState
from src.common.logger import get_logger from src.common.logger import get_logger
@@ -42,6 +44,24 @@ class UnifiedWebSocketManager:
""" """
return f"{domain}:{topic}" return f"{domain}:{topic}"
async def _close_websocket(self, connection: WebSocketConnection) -> None:
"""显式关闭底层 WebSocket 连接。
某些异常退出路径只会执行清理逻辑,但不会自动向客户端发送关闭帧。
这里主动关闭底层连接,确保浏览器能够及时感知断线并触发重连。
Args:
connection: 目标连接上下文。
"""
websocket = connection.websocket
if (
websocket.client_state == WebSocketState.DISCONNECTED
or websocket.application_state == WebSocketState.DISCONNECTED
):
return
await websocket.close()
async def _sender_loop(self, connection: WebSocketConnection) -> None: async def _sender_loop(self, connection: WebSocketConnection) -> None:
"""串行发送指定连接的出站消息。 """串行发送指定连接的出站消息。
@@ -85,6 +105,11 @@ class UnifiedWebSocketManager:
if connection is None: if connection is None:
return return
try:
await self._close_websocket(connection)
except Exception as exc:
logger.debug("关闭统一 WebSocket 底层连接时出现异常: connection=%s, error=%s", connection_id, exc)
await connection.send_queue.put(None) await connection.send_queue.put(None)
if connection.sender_task is not None: if connection.sender_task is not None:
try: try:

View File

@@ -1,6 +1,7 @@
"""统一 WebSocket 路由。""" """统一 WebSocket 路由。"""
from typing import Any, Dict, Optional, Set, cast from typing import Any, Dict, Optional, Set, cast
import asyncio import asyncio
import time import time
import uuid import uuid
@@ -140,6 +141,26 @@ async def _handle_plugin_progress_subscribe(connection_id: str, request_id: Opti
) )
async def _handle_maisaka_monitor_subscribe(connection_id: str, request_id: Optional[str]) -> None:
"""处理 MaiSaka 监控域订阅请求。
Args:
connection_id: 连接 ID。
request_id: 请求 ID。
"""
logger.info(
f"MaiSaka 监控订阅请求: connection_id={connection_id} "
f"manager_id={id(websocket_manager)}"
)
websocket_manager.subscribe(connection_id, domain="maisaka_monitor", topic="main")
await websocket_manager.send_response(
connection_id,
request_id=request_id,
ok=True,
data={"domain": "maisaka_monitor", "topic": "main"},
)
async def _handle_subscribe(connection_id: str, message: Dict[str, Any]) -> None: async def _handle_subscribe(connection_id: str, message: Dict[str, Any]) -> None:
"""处理主题订阅请求。 """处理主题订阅请求。
@@ -160,6 +181,10 @@ async def _handle_subscribe(connection_id: str, message: Dict[str, Any]) -> None
await _handle_plugin_progress_subscribe(connection_id, request_id) await _handle_plugin_progress_subscribe(connection_id, request_id)
return return
if domain == "maisaka_monitor" and topic == "main":
await _handle_maisaka_monitor_subscribe(connection_id, request_id)
return
await websocket_manager.send_response( await websocket_manager.send_response(
connection_id, connection_id,
request_id=request_id, request_id=request_id,
@@ -541,8 +566,16 @@ async def websocket_endpoint(websocket: WebSocket, token: Optional[str] = Query(
await handle_client_message(connection_id, cast(Dict[str, Any], raw_message)) await handle_client_message(connection_id, cast(Dict[str, Any], raw_message))
except WebSocketDisconnect: except WebSocketDisconnect:
logger.info("统一 WebSocket 客户端已断开: connection=%s", connection_id) logger.info("统一 WebSocket 客户端已断开: connection=%s", connection_id)
except asyncio.CancelledError:
logger.warning("统一 WebSocket 连接处理被取消: connection=%s", connection_id)
raise
except Exception as exc: except Exception as exc:
logger.error(f"统一 WebSocket 处理失败: {exc}") logger.error("统一 WebSocket 处理失败: connection=%s, error=%s", connection_id, exc, exc_info=True)
finally: finally:
chat_manager.disconnect_connection(connection_id) chat_manager.disconnect_connection(connection_id)
await websocket_manager.disconnect(connection_id) await websocket_manager.disconnect(connection_id)
logger.info(
"统一 WebSocket 连接清理完成: connection=%s, 剩余连接=%s",
connection_id,
len(websocket_manager.connections),
)