diff --git a/dashboard/src/lib/chat-ws-client.ts b/dashboard/src/lib/chat-ws-client.ts new file mode 100644 index 00000000..88e6fa7c --- /dev/null +++ b/dashboard/src/lib/chat-ws-client.ts @@ -0,0 +1,161 @@ +import { unifiedWsClient, type ConnectionStatus } from './unified-ws' + +interface ChatSessionOpenPayload { + group_id?: string + group_name?: string + person_id?: string + platform?: string + user_id?: string + user_name?: string +} + +type ChatSessionListener = (message: Record) => void + +class ChatWsClient { + private initialized = false + private listeners: Map> = new Map() + private sessionPayloads: Map = new Map() + + private initialize(): void { + if (this.initialized) { + return + } + + unifiedWsClient.addEventListener((message) => { + if (message.domain !== 'chat' || !message.session) { + return + } + + const sessionListeners = this.listeners.get(message.session) + if (!sessionListeners) { + return + } + + sessionListeners.forEach((listener) => { + try { + listener(message.data) + } catch (error) { + console.error('聊天会话监听器执行失败:', error) + } + }) + }) + + unifiedWsClient.onReconnect(() => { + void this.reopenSessions() + }) + + this.initialized = true + } + + private async reopenSessions(): Promise { + const reopenTargets = Array.from(this.sessionPayloads.entries()) + for (const [sessionId, payload] of reopenTargets) { + try { + await unifiedWsClient.call({ + domain: 'chat', + method: 'session.open', + session: sessionId, + data: { + ...payload, + restore: true, + } as Record, + }) + } catch (error) { + console.error(`恢复聊天会话失败 (${sessionId}):`, error) + } + } + } + + async openSession(sessionId: string, payload: ChatSessionOpenPayload): Promise { + this.initialize() + this.sessionPayloads.set(sessionId, payload) + await unifiedWsClient.call({ + domain: 'chat', + method: 'session.open', + session: sessionId, + data: payload as Record, + }) + } + + async closeSession(sessionId: string): Promise { + this.sessionPayloads.delete(sessionId) + if (unifiedWsClient.getStatus() !== 'connected') { + return + } + + try { + await unifiedWsClient.call({ + domain: 'chat', + method: 'session.close', + session: sessionId, + data: {}, + }) + } catch (error) { + console.warn(`关闭聊天会话失败 (${sessionId}):`, error) + } + } + + async sendMessage(sessionId: string, content: string, userName: string): Promise { + await unifiedWsClient.call({ + domain: 'chat', + method: 'message.send', + session: sessionId, + data: { + content, + user_name: userName, + }, + }) + } + + async updateNickname(sessionId: string, userName: string): Promise { + const currentPayload = this.sessionPayloads.get(sessionId) + if (currentPayload) { + this.sessionPayloads.set(sessionId, { + ...currentPayload, + user_name: userName, + }) + } + + await unifiedWsClient.call({ + domain: 'chat', + method: 'session.update_nickname', + session: sessionId, + data: { + user_name: userName, + }, + }) + } + + onSessionMessage(sessionId: string, listener: ChatSessionListener): () => void { + this.initialize() + const sessionListeners = this.listeners.get(sessionId) ?? new Set() + sessionListeners.add(listener) + this.listeners.set(sessionId, sessionListeners) + + return () => { + const currentListeners = this.listeners.get(sessionId) + if (!currentListeners) { + return + } + + currentListeners.delete(listener) + if (currentListeners.size === 0) { + this.listeners.delete(sessionId) + } + } + } + + onConnectionChange(listener: (connected: boolean) => void): () => void { + return unifiedWsClient.onConnectionChange(listener) + } + + onStatusChange(listener: (status: ConnectionStatus) => void): () => void { + return unifiedWsClient.onStatusChange(listener) + } + + async restart(): Promise { + await unifiedWsClient.restart() + } +} + +export const chatWsClient = new ChatWsClient() diff --git a/dashboard/src/lib/log-websocket.ts b/dashboard/src/lib/log-websocket.ts index abf9e7f7..b7645368 100644 --- a/dashboard/src/lib/log-websocket.ts +++ b/dashboard/src/lib/log-websocket.ts @@ -1,13 +1,11 @@ /** * 全局日志 WebSocket 管理器 - * 确保整个应用只有一个 WebSocket 连接 + * 确保整个应用只通过统一连接层订阅日志流 */ import { checkAuthStatus } from './fetch-with-auth' import { getSetting } from './settings-manager' -import { createReconnectingWebSocket } from './ws-utils' - -import { getWsBaseUrl } from '@/lib/api-base' +import { unifiedWsClient } from './unified-ws' export interface LogEntry { id: string @@ -17,165 +15,79 @@ export interface LogEntry { message: string } -type LogCallback = (log: LogEntry) => void +type LogCallback = () => void type ConnectionCallback = (connected: boolean) => void class LogWebSocketManager { - private wsControl: ReturnType | null = null - - // 订阅者 - private logCallbacks: Set = new Set() private connectionCallbacks: Set = new Set() - + private initialized = false private isConnected = false - - // 日志缓存 - 保存所有接收到的日志 private logCache: LogEntry[] = [] + private logCallbacks: Set = new Set() + private subscriptionActive = false - /** - * 获取最大缓存大小(从设置读取) - */ private getMaxCacheSize(): number { return getSetting('logCacheSize') } - /** - * 获取最大重连次数(从设置读取) - */ - private getMaxReconnectAttempts(): number { - return getSetting('wsMaxReconnectAttempts') - } - - /** - * 获取重连间隔(从设置读取) - */ - private getReconnectInterval(): number { - return getSetting('wsReconnectInterval') - } - - /** - * 获取 WebSocket URL(不含 token 参数) - */ - private async getWebSocketUrl(): Promise { - const wsBase = await getWsBaseUrl() - return `${wsBase}/ws/logs` - } - - /** - * 连接 WebSocket(会先检查登录状态) - */ - async connect() { - // 检查是否在登录页面 - if (window.location.pathname === '/auth') { - console.log('📡 在登录页面,跳过 WebSocket 连接') + private initialize(): void { + if (this.initialized) { return } - // 检查登录状态,避免未登录时尝试连接 - const isAuthenticated = await checkAuthStatus() - if (!isAuthenticated) { - console.log('📡 未登录,跳过 WebSocket 连接') - return - } + unifiedWsClient.addEventListener((message) => { + if (message.domain !== 'logs') { + return + } - const wsUrl = await this.getWebSocketUrl() + if (message.event === 'snapshot') { + const entries = Array.isArray(message.data.entries) + ? (message.data.entries as LogEntry[]) + : [] + this.logCache = entries.slice(-this.getMaxCacheSize()) + this.notifyLogChange() + return + } - // 使用 ws-utils 创建 WebSocket - this.wsControl = createReconnectingWebSocket(wsUrl, { - onMessage: (data: string) => { - try { - const log: LogEntry = JSON.parse(data) - this.notifyLog(log) - } catch (error) { - console.error('解析日志消息失败:', error) - } - }, - onOpen: () => { - this.isConnected = true - this.notifyConnection(true) - }, - onClose: () => { - this.isConnected = false - this.notifyConnection(false) - }, - onError: (error) => { - console.error('❌ WebSocket 错误:', error) - this.isConnected = false - this.notifyConnection(false) - }, - heartbeatInterval: 30000, - maxRetries: this.getMaxReconnectAttempts(), - backoffBase: this.getReconnectInterval(), - maxBackoff: 30000, + if (message.event === 'entry' && message.data.entry) { + this.appendLog(message.data.entry as LogEntry) + } }) - // 启动连接 - await this.wsControl.connect() + unifiedWsClient.onConnectionChange((connected) => { + this.isConnected = connected + this.notifyConnection(connected) + }) + + this.initialized = true } - /** - * 断开连接 - */ - disconnect() { - if (this.wsControl) { - this.wsControl.disconnect() - this.wsControl = null - } - - this.isConnected = false - } - - /** - * 订阅日志消息 - */ - onLog(callback: LogCallback) { - this.logCallbacks.add(callback) - return () => this.logCallbacks.delete(callback) - } - - /** - * 订阅连接状态 - */ - onConnectionChange(callback: ConnectionCallback) { - this.connectionCallbacks.add(callback) - // 立即通知当前状态 - callback(this.isConnected) - return () => this.connectionCallbacks.delete(callback) - } - - /** - * 通知所有订阅者新日志 - */ - private notifyLog(log: LogEntry) { - // 检查是否已存在(通过 id 去重) + private appendLog(log: LogEntry): void { const exists = this.logCache.some(existingLog => existingLog.id === log.id) - - if (!exists) { - // 添加到缓存 - this.logCache.push(log) - - // 限制缓存大小(动态读取配置) - const maxCacheSize = this.getMaxCacheSize() - if (this.logCache.length > maxCacheSize) { - this.logCache = this.logCache.slice(-maxCacheSize) - } - - // 只有新日志才通知订阅者 - this.logCallbacks.forEach(callback => { - try { - callback(log) - } catch (error) { - console.error('日志回调执行失败:', error) - } - }) + if (exists) { + return } + + this.logCache.push(log) + const maxCacheSize = this.getMaxCacheSize() + if (this.logCache.length > maxCacheSize) { + this.logCache = this.logCache.slice(-maxCacheSize) + } + this.notifyLogChange() } - /** - * 通知所有订阅者连接状态变化 - */ - private notifyConnection(connected: boolean) { - this.connectionCallbacks.forEach(callback => { + private notifyLogChange(): void { + this.logCallbacks.forEach((callback) => { + try { + callback() + } catch (error) { + console.error('日志回调执行失败:', error) + } + }) + } + + private notifyConnection(connected: boolean): void { + this.connectionCallbacks.forEach((callback) => { try { callback(connected) } catch (error) { @@ -184,35 +96,65 @@ class LogWebSocketManager { }) } - /** - * 获取缓存的所有日志 - */ + async connect(): Promise { + if (window.location.pathname === '/auth') { + return + } + + const isAuthenticated = await checkAuthStatus() + if (!isAuthenticated) { + return + } + + this.initialize() + if (this.subscriptionActive) { + return + } + + try { + await unifiedWsClient.subscribe('logs', 'main', { replay: 100 }) + this.subscriptionActive = true + } catch (error) { + console.error('订阅日志流失败:', error) + } + } + + disconnect(): void { + this.subscriptionActive = false + void unifiedWsClient.unsubscribe('logs', 'main') + this.isConnected = false + this.notifyConnection(false) + } + + onLog(callback: LogCallback): () => void { + this.logCallbacks.add(callback) + return () => this.logCallbacks.delete(callback) + } + + onConnectionChange(callback: ConnectionCallback): () => void { + this.connectionCallbacks.add(callback) + callback(this.isConnected) + return () => this.connectionCallbacks.delete(callback) + } + getAllLogs(): LogEntry[] { return [...this.logCache] } - /** - * 清空日志缓存 - */ - clearLogs() { + clearLogs(): void { this.logCache = [] + this.notifyLogChange() } - /** - * 获取当前连接状态 - */ getConnectionStatus(): boolean { return this.isConnected } } -// 导出单例 export const logWebSocket = new LogWebSocketManager() -// 自动连接(应用启动时) if (typeof window !== 'undefined') { - // 延迟一下确保页面加载完成 setTimeout(() => { - logWebSocket.connect() + void logWebSocket.connect() }, 100) } diff --git a/dashboard/src/lib/plugin-api/marketplace.ts b/dashboard/src/lib/plugin-api/marketplace.ts index 82b8e9c7..a7054088 100644 --- a/dashboard/src/lib/plugin-api/marketplace.ts +++ b/dashboard/src/lib/plugin-api/marketplace.ts @@ -1,9 +1,9 @@ import type { ApiResponse } from '@/types/api' import type { PluginInfo } from '@/types/plugin' -import { getWsBaseUrl } from '@/lib/api-base' import { fetchWithAuth } from '@/lib/fetch-with-auth' import { parseResponse } from '@/lib/api-helpers' +import { pluginProgressClient } from '@/lib/plugin-progress-client' import type { GitStatus, MaimaiVersion } from './types' /** @@ -211,41 +211,13 @@ export function isPluginCompatible( */ export async function connectPluginProgressWebSocket( onProgress: (progress: import('./types').PluginLoadProgress) => void, - onError?: (error: Event) => void -): Promise { - const wsBase = await getWsBaseUrl() - const wsUrl = `${wsBase}/api/webui/ws/plugin-progress` - - // 使用 ws-utils 创建 WebSocket - const { createReconnectingWebSocket } = await import('@/lib/ws-utils') - const wsControl = createReconnectingWebSocket(wsUrl, { - onMessage: (data: string) => { - try { - const progressData = JSON.parse(data) as import('./types').PluginLoadProgress - onProgress(progressData) - } catch (error) { - console.error('Failed to parse progress data:', error) - } - }, - onOpen: () => { - console.log('Plugin progress WebSocket connected') - }, - onClose: () => { - console.log('Plugin progress WebSocket disconnected') - }, - onError: (error) => { - console.error('Plugin progress WebSocket error:', error) - onError?.(error) - }, - heartbeatInterval: 30000, - maxRetries: 10, - backoffBase: 1000, - maxBackoff: 30000, - }) - - // 启动连接 - await wsControl.connect() - - // 返回 WebSocket 实例(用于外部检查连接状态) - return wsControl.getWebSocket() + onError?: (error: Error) => void +): Promise<() => Promise> { + try { + return await pluginProgressClient.subscribe(onProgress) + } catch (error) { + const normalizedError = error instanceof Error ? error : new Error('插件进度订阅失败') + onError?.(normalizedError) + return async () => {} + } } diff --git a/dashboard/src/lib/plugin-progress-client.ts b/dashboard/src/lib/plugin-progress-client.ts new file mode 100644 index 00000000..2aea339d --- /dev/null +++ b/dashboard/src/lib/plugin-progress-client.ts @@ -0,0 +1,58 @@ +import type { PluginLoadProgress } from '@/lib/plugin-api/types' + +import { unifiedWsClient } from './unified-ws' + +type ProgressListener = (progress: PluginLoadProgress) => void + +class PluginProgressClient { + private initialized = false + private listeners: Set = new Set() + private subscriptionActive = false + + private initialize(): void { + if (this.initialized) { + return + } + + unifiedWsClient.addEventListener((message) => { + if (message.domain !== 'plugin_progress') { + return + } + + const progress = message.data.progress as PluginLoadProgress | undefined + if (!progress) { + return + } + + this.listeners.forEach((listener) => { + try { + listener(progress) + } catch (error) { + console.error('插件进度监听器执行失败:', error) + } + }) + }) + + this.initialized = true + } + + async subscribe(listener: ProgressListener): Promise<() => Promise> { + this.initialize() + this.listeners.add(listener) + + if (!this.subscriptionActive) { + await unifiedWsClient.subscribe('plugin_progress', 'main') + this.subscriptionActive = true + } + + return async () => { + this.listeners.delete(listener) + if (this.listeners.size === 0 && this.subscriptionActive) { + this.subscriptionActive = false + await unifiedWsClient.unsubscribe('plugin_progress', 'main') + } + } + } +} + +export const pluginProgressClient = new PluginProgressClient() diff --git a/dashboard/src/lib/unified-ws.ts b/dashboard/src/lib/unified-ws.ts new file mode 100644 index 00000000..3cdafce0 --- /dev/null +++ b/dashboard/src/lib/unified-ws.ts @@ -0,0 +1,495 @@ +import { fetchWithAuth } from './fetch-with-auth' +import { getSetting } from './settings-manager' + +import { getWsBaseUrl } from '@/lib/api-base' + +export type ConnectionStatus = 'idle' | 'connecting' | 'connected' + +export interface WsErrorPayload { + code?: string + message: string +} + +export interface WsEventEnvelope { + op: 'event' + domain: string + event: string + session?: string + topic?: string + data: Record +} + +interface WsResponseEnvelope { + op: 'response' + id?: string + ok: boolean + data?: Record + error?: WsErrorPayload +} + +interface WsPongEnvelope { + op: 'pong' + ts: number +} + +type WsServerEnvelope = WsEventEnvelope | WsPongEnvelope | WsResponseEnvelope + +interface PendingRequest { + reject: (error: Error) => void + resolve: (data: Record) => void + timeoutId: number +} + +interface SubscriptionDefinition { + data?: Record + domain: string + topic: string +} + +type EventListener = (message: WsEventEnvelope) => void +type ConnectionListener = (connected: boolean) => void +type StatusListener = (status: ConnectionStatus) => void +type ReconnectListener = () => void + +function isResponseEnvelope(message: WsServerEnvelope): message is WsResponseEnvelope { + return message.op === 'response' +} + +function isEventEnvelope(message: WsServerEnvelope): message is WsEventEnvelope { + return message.op === 'event' +} + +async function getWsToken(): Promise { + try { + const response = await fetchWithAuth('/api/webui/ws-token', { + method: 'GET', + credentials: 'include', + }) + + if (!response.ok) { + return null + } + + const data = await response.json() + if (data.success && data.token) { + return data.token as string + } + + return null + } catch (error) { + console.error('获取统一 WebSocket token 失败:', error) + return null + } +} + +class UnifiedWebSocketClient { + private connectPromise: Promise | null = null + private connectionListeners: Set = new Set() + private eventListeners: Set = new Set() + private hasConnectedOnce = false + private heartbeatIntervalId: number | null = null + private manualDisconnect = false + private pendingRequests: Map = new Map() + private reconnectAttempts = 0 + private reconnectListeners: Set = new Set() + private reconnectTimeout: number | null = null + private requestCounter = 0 + private status: ConnectionStatus = 'idle' + private statusListeners: Set = new Set() + private subscriptions: Map = new Map() + private ws: WebSocket | null = null + + private getReconnectDelay(): number { + const baseDelay = getSetting('wsReconnectInterval') + return Math.min(baseDelay * Math.max(this.reconnectAttempts, 1), 30000) + } + + private getMaxReconnectAttempts(): number { + return getSetting('wsMaxReconnectAttempts') + } + + private getSubscriptionKey(domain: string, topic: string): string { + return `${domain}:${topic}` + } + + private nextRequestId(): string { + this.requestCounter += 1 + return `ws-${Date.now()}-${this.requestCounter}` + } + + private setStatus(status: ConnectionStatus): void { + if (this.status === status) { + return + } + + this.status = status + this.statusListeners.forEach((listener) => { + try { + listener(status) + } catch (error) { + console.error('WebSocket 状态监听器执行失败:', error) + } + }) + + const connected = status === 'connected' + this.connectionListeners.forEach((listener) => { + try { + listener(connected) + } catch (error) { + console.error('WebSocket 连接监听器执行失败:', error) + } + }) + } + + private stopHeartbeat(): void { + if (this.heartbeatIntervalId !== null) { + clearInterval(this.heartbeatIntervalId) + this.heartbeatIntervalId = null + } + } + + private startHeartbeat(): void { + this.stopHeartbeat() + this.heartbeatIntervalId = window.setInterval(() => { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ op: 'ping' })) + } + }, 30000) + } + + private clearReconnectTimer(): void { + if (this.reconnectTimeout !== null) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + } + + private rejectPendingRequests(error: Error): void { + this.pendingRequests.forEach((pendingRequest, requestId) => { + clearTimeout(pendingRequest.timeoutId) + pendingRequest.reject(error) + this.pendingRequests.delete(requestId) + }) + } + + private scheduleReconnect(): void { + if (this.manualDisconnect) { + return + } + + if (this.reconnectAttempts >= this.getMaxReconnectAttempts()) { + console.warn(`统一 WebSocket 达到最大重连次数 (${this.getMaxReconnectAttempts()}),停止重连`) + return + } + + this.reconnectAttempts += 1 + const delay = this.getReconnectDelay() + this.clearReconnectTimer() + this.reconnectTimeout = window.setTimeout(() => { + void this.connect().catch((error) => { + console.error('统一 WebSocket 重连失败:', error) + }) + }, delay) + } + + private async createWebSocketUrl(): Promise { + const wsBaseUrl = await getWsBaseUrl() + const wsToken = await getWsToken() + if (!wsBaseUrl || !wsToken) { + return null + } + return `${wsBaseUrl}/api/webui/ws?token=${encodeURIComponent(wsToken)}` + } + + private async sendRequest( + payload: Record, + timeoutMs = 10000, + ): Promise> { + if (this.ws?.readyState !== WebSocket.OPEN) { + throw new Error('统一 WebSocket 尚未连接') + } + + const requestId = payload.id as string + return await new Promise>((resolve, reject) => { + const timeoutId = window.setTimeout(() => { + this.pendingRequests.delete(requestId) + reject(new Error(`统一 WebSocket 请求超时: ${requestId}`)) + }, timeoutMs) + + this.pendingRequests.set(requestId, { + resolve, + reject, + timeoutId, + }) + this.ws?.send(JSON.stringify(payload)) + }) + } + + private async restoreState(shouldNotifyReconnect: boolean): Promise { + const subscriptions = Array.from(this.subscriptions.values()) + for (const subscription of subscriptions) { + try { + await this.sendRequest({ + op: 'subscribe', + id: this.nextRequestId(), + domain: subscription.domain, + topic: subscription.topic, + data: subscription.data ?? {}, + }) + } catch (error) { + console.error('恢复统一 WebSocket 订阅失败:', error) + } + } + + if (shouldNotifyReconnect) { + this.reconnectListeners.forEach((listener) => { + try { + listener() + } catch (error) { + console.error('统一 WebSocket 重连监听器执行失败:', error) + } + }) + } + } + + private handleServerMessage(rawData: string): void { + let message: WsServerEnvelope + try { + message = JSON.parse(rawData) as WsServerEnvelope + } catch (error) { + console.error('解析统一 WebSocket 消息失败:', error) + return + } + + if (message.op === 'pong') { + return + } + + if (isResponseEnvelope(message)) { + const requestId = message.id + if (!requestId) { + return + } + + const pendingRequest = this.pendingRequests.get(requestId) + if (!pendingRequest) { + return + } + + clearTimeout(pendingRequest.timeoutId) + this.pendingRequests.delete(requestId) + if (message.ok) { + pendingRequest.resolve(message.data ?? {}) + } else { + pendingRequest.reject(new Error(message.error?.message ?? '统一 WebSocket 请求失败')) + } + return + } + + if (isEventEnvelope(message)) { + this.eventListeners.forEach((listener) => { + try { + listener(message) + } catch (error) { + console.error('统一 WebSocket 事件监听器执行失败:', error) + } + }) + } + } + + private handleClose(event: CloseEvent): void { + this.stopHeartbeat() + this.ws = null + this.connectPromise = null + this.setStatus('idle') + this.rejectPendingRequests(new Error(`统一 WebSocket 已关闭 (${event.code})`)) + + if (event.code === 4001) { + this.manualDisconnect = true + if (window.location.pathname !== '/auth') { + window.location.href = '/auth' + } + return + } + + this.scheduleReconnect() + } + + async connect(): Promise { + if (this.ws?.readyState === WebSocket.OPEN) { + return + } + + if (this.connectPromise) { + return await this.connectPromise + } + + this.manualDisconnect = false + this.setStatus('connecting') + + this.connectPromise = (async () => { + const wsUrl = await this.createWebSocketUrl() + if (!wsUrl) { + this.setStatus('idle') + throw new Error('无法建立统一 WebSocket 连接') + } + + await new Promise((resolve, reject) => { + let settled = false + const socket = new WebSocket(wsUrl) + this.ws = socket + + socket.onopen = () => { + settled = true + const shouldNotifyReconnect = this.hasConnectedOnce + this.hasConnectedOnce = true + this.reconnectAttempts = 0 + this.startHeartbeat() + this.setStatus('connected') + resolve() + void this.restoreState(shouldNotifyReconnect) + } + + socket.onmessage = (event) => { + this.handleServerMessage(event.data) + } + + socket.onerror = () => { + if (!settled) { + settled = true + reject(new Error('统一 WebSocket 连接失败')) + } + } + + socket.onclose = (event) => { + if (!settled) { + settled = true + reject(new Error(`统一 WebSocket 已关闭 (${event.code})`)) + } + this.handleClose(event) + } + }) + })() + + try { + await this.connectPromise + } finally { + if (this.status !== 'connected') { + this.connectPromise = null + } + } + } + + disconnect(): void { + this.manualDisconnect = true + this.clearReconnectTimer() + this.stopHeartbeat() + this.rejectPendingRequests(new Error('统一 WebSocket 已手动断开')) + this.connectPromise = null + if (this.ws) { + this.ws.close() + this.ws = null + } + this.setStatus('idle') + } + + async restart(): Promise { + this.manualDisconnect = false + this.clearReconnectTimer() + if (this.ws) { + this.ws.close() + return + } + await this.connect() + } + + async call(params: { + data?: Record + domain: string + method: string + session?: string + }): Promise> { + await this.connect() + const requestId = this.nextRequestId() + return await this.sendRequest({ + op: 'call', + id: requestId, + domain: params.domain, + method: params.method, + session: params.session, + data: params.data ?? {}, + }) + } + + async subscribe( + domain: string, + topic: string, + data?: Record, + ): Promise> { + await this.connect() + this.subscriptions.set(this.getSubscriptionKey(domain, topic), { + domain, + topic, + data, + }) + + return await this.sendRequest({ + op: 'subscribe', + id: this.nextRequestId(), + domain, + topic, + data: data ?? {}, + }) + } + + async unsubscribe(domain: string, topic: string): Promise | null> { + this.subscriptions.delete(this.getSubscriptionKey(domain, topic)) + if (this.ws?.readyState !== WebSocket.OPEN) { + return null + } + + return await this.sendRequest({ + op: 'unsubscribe', + id: this.nextRequestId(), + domain, + topic, + data: {}, + }) + } + + addEventListener(listener: EventListener): () => void { + this.eventListeners.add(listener) + return () => { + this.eventListeners.delete(listener) + } + } + + onConnectionChange(listener: ConnectionListener): () => void { + this.connectionListeners.add(listener) + listener(this.status === 'connected') + return () => { + this.connectionListeners.delete(listener) + } + } + + onStatusChange(listener: StatusListener): () => void { + this.statusListeners.add(listener) + listener(this.status) + return () => { + this.statusListeners.delete(listener) + } + } + + onReconnect(listener: ReconnectListener): () => void { + this.reconnectListeners.add(listener) + return () => { + this.reconnectListeners.delete(listener) + } + } + + getStatus(): ConnectionStatus { + return this.status + } +} + +export const unifiedWsClient = new UnifiedWebSocketClient() diff --git a/dashboard/src/lib/ws-utils.ts b/dashboard/src/lib/ws-utils.ts deleted file mode 100644 index 3d3b3240..00000000 --- a/dashboard/src/lib/ws-utils.ts +++ /dev/null @@ -1,211 +0,0 @@ -import { fetchWithAuth } from './fetch-with-auth' - -/** - * WebSocket 配置选项 - */ -export interface WebSocketOptions { - onMessage?: (data: string) => void - onOpen?: () => void - onClose?: () => void - onError?: (error: Event) => void - heartbeatInterval?: number // 心跳间隔(毫秒) - maxRetries?: number // 最大重连次数 - backoffBase?: number // 重连基础间隔(毫秒) - maxBackoff?: number // 最大重连间隔(毫秒) -} - -/** - * 获取 WebSocket 临时认证 token - */ -export async function getWsToken(): Promise { - try { - // 使用相对路径,让前端代理处理请求,避免 CORS 问题 - const response = await fetchWithAuth('/api/webui/ws-token', { - method: 'GET', - credentials: 'include', // 携带 Cookie - }) - - if (!response.ok) { - console.error('获取 WebSocket token 失败:', response.status) - return null - } - - const data = await response.json() - if (data.success && data.token) { - return data.token - } - return null - } catch (error) { - console.error('获取 WebSocket token 失败:', error) - return null - } -} - -/** - * 创建带重连、心跳的 WebSocket 封装 - * - * @param url WebSocket URL(不含 token 参数) - * @param options 配置选项 - * @returns WebSocket 控制对象,包含 connect、disconnect、send 方法 - */ -export function createReconnectingWebSocket( - url: string, - options: WebSocketOptions = {} -) { - const { - onMessage, - onOpen, - onClose, - onError, - heartbeatInterval = 30000, - maxRetries = 10, - backoffBase = 1000, - maxBackoff = 30000, - } = options - - let ws: WebSocket | null = null - let reconnectTimeout: number | null = null - let reconnectAttempts = 0 - let heartbeatIntervalId: number | null = null - let isManualDisconnect = false - - /** - * 启动心跳 - */ - function startHeartbeat() { - stopHeartbeat() - heartbeatIntervalId = window.setInterval(() => { - if (ws?.readyState === WebSocket.OPEN) { - ws.send('ping') - } - }, heartbeatInterval) - } - - /** - * 停止心跳 - */ - function stopHeartbeat() { - if (heartbeatIntervalId !== null) { - clearInterval(heartbeatIntervalId) - heartbeatIntervalId = null - } - } - - /** - * 尝试重连 - */ - function attemptReconnect() { - if (isManualDisconnect) { - return - } - - if (reconnectAttempts >= maxRetries) { - console.warn(`WebSocket 达到最大重连次数 (${maxRetries}),停止重连`) - return - } - - reconnectAttempts += 1 - const delay = Math.min(backoffBase * reconnectAttempts, maxBackoff) - - console.log(`WebSocket 将在 ${delay}ms 后重连(第 ${reconnectAttempts} 次)`) - reconnectTimeout = window.setTimeout(() => { - connect() - }, delay) - } - - /** - * 连接 WebSocket - */ - async function connect() { - if (ws?.readyState === WebSocket.OPEN || ws?.readyState === WebSocket.CONNECTING) { - return - } - - // 先获取临时认证 token - const wsToken = await getWsToken() - if (!wsToken) { - console.warn('无法获取 WebSocket token,跳过连接') - return - } - - const wsUrl = `${url}?token=${encodeURIComponent(wsToken)}` - - try { - ws = new WebSocket(wsUrl) - - ws.onopen = () => { - reconnectAttempts = 0 - startHeartbeat() - onOpen?.() - } - - ws.onmessage = (event) => { - // 忽略心跳响应 - if (event.data === 'pong') { - return - } - onMessage?.(event.data) - } - - ws.onerror = (error) => { - console.error('WebSocket 错误:', error) - onError?.(error) - } - - ws.onclose = () => { - stopHeartbeat() - onClose?.() - attemptReconnect() - } - } catch (error) { - console.error('创建 WebSocket 连接失败:', error) - attemptReconnect() - } - } - - /** - * 断开连接 - */ - function disconnect() { - isManualDisconnect = true - - if (reconnectTimeout !== null) { - clearTimeout(reconnectTimeout) - reconnectTimeout = null - } - - stopHeartbeat() - - if (ws) { - ws.close() - ws = null - } - - reconnectAttempts = 0 - } - - /** - * 发送消息 - */ - function send(data: string) { - if (ws?.readyState === WebSocket.OPEN) { - ws.send(data) - } else { - console.warn('WebSocket 未连接,无法发送消息') - } - } - - /** - * 获取当前 WebSocket 实例 - */ - function getWebSocket(): WebSocket | null { - return ws - } - - return { - connect, - disconnect, - send, - getWebSocket, - } -} diff --git a/dashboard/src/routes/chat/index.tsx b/dashboard/src/routes/chat/index.tsx index 5d7044a6..924264f2 100644 --- a/dashboard/src/routes/chat/index.tsx +++ b/dashboard/src/routes/chat/index.tsx @@ -5,7 +5,7 @@ import { Button } from '@/components/ui/button' import { Input } from '@/components/ui/input' import { ScrollArea } from '@/components/ui/scroll-area' import { useToast } from '@/hooks/use-toast' -import { getWsBaseUrl } from '@/lib/api-base' +import { chatWsClient } from '@/lib/chat-ws-client' import { fetchWithAuth } from '@/lib/fetch-with-auth' import { cn } from '@/lib/utils' import { Bot, Edit2, Loader2, RefreshCw, User, Send, Wifi, WifiOff, UserCircle2 } from 'lucide-react' @@ -85,14 +85,17 @@ export function ChatPage() { // 持久化用户 ID const userIdRef = useRef(getOrCreateUserId()) - // 每个标签页的 WebSocket 连接 - const wsMapRef = useRef>(new Map()) const messagesEndRef = useRef(null) - const reconnectTimeoutMapRef = useRef>(new Map()) const messageIdCounterRef = useRef(0) const processedMessagesMapRef = useRef>>(new Map()) + const sessionUnsubscribeMapRef = useRef void>>(new Map()) + const tabsRef = useRef([]) const { toast } = useToast() + useEffect(() => { + tabsRef.current = tabs + }, [tabs]) + // 生成唯一消息 ID const generateMessageId = (prefix: string) => { messageIdCounterRef.current += 1 @@ -197,357 +200,218 @@ export function ChatPage() { } }, [tempVirtualConfig.platform, personSearchQuery, fetchPersons]) - // 加载聊天历史到指定标签页 - const loadChatHistoryForTab = useCallback(async (tabId: string, groupId?: string) => { + const handleSessionMessage = useCallback(( + tabId: string, + tabType: 'webui' | 'virtual', + config: VirtualIdentityConfig | undefined, + data: WsMessage, + ) => { + switch (data.type) { + case 'session_info': + updateTab(tabId, { + sessionInfo: { + session_id: data.session_id, + user_id: data.user_id, + user_name: data.user_name, + bot_name: data.bot_name, + } + }) + break + + case 'system': + addMessageToTab(tabId, { + id: generateMessageId('sys'), + type: 'system', + content: data.content || '', + timestamp: data.timestamp || Date.now() / 1000, + }) + break + + case 'user_message': { + const senderUserId = data.sender?.user_id + const currentUserId = tabType === 'virtual' && config + ? config.userId + : userIdRef.current + + const normalizeSenderId = senderUserId ? senderUserId.replace(/^webui_user_/, '') : '' + const normalizeCurrentId = currentUserId ? currentUserId.replace(/^webui_user_/, '') : '' + if (normalizeSenderId && normalizeCurrentId && normalizeSenderId === normalizeCurrentId) { + break + } + + const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() + const contentHash = `user-${data.content}-${Math.floor((data.timestamp || 0) * 1000)}` + if (processedSet.has(contentHash)) { + break + } + + processedSet.add(contentHash) + processedMessagesMapRef.current.set(tabId, processedSet) + if (processedSet.size > 100) { + const firstKey = processedSet.values().next().value + if (firstKey) processedSet.delete(firstKey) + } + + addMessageToTab(tabId, { + id: data.message_id || generateMessageId('user'), + type: 'user', + content: data.content || '', + timestamp: data.timestamp || Date.now() / 1000, + sender: data.sender, + }) + break + } + + case 'bot_message': { + updateTab(tabId, { isTyping: false }) + const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() + const contentHash = `bot-${data.content}-${Math.floor((data.timestamp || 0) * 1000)}` + if (processedSet.has(contentHash)) { + break + } + + processedSet.add(contentHash) + processedMessagesMapRef.current.set(tabId, processedSet) + if (processedSet.size > 100) { + const firstKey = processedSet.values().next().value + if (firstKey) processedSet.delete(firstKey) + } + + setTabs(prev => prev.map(tab => { + if (tab.id !== tabId) return tab + const filteredMessages = tab.messages.filter(msg => msg.type !== 'thinking') + const newMessage: ChatMessage = { + id: generateMessageId('bot'), + type: 'bot', + content: data.content || '', + message_type: (data.message_type === 'rich' ? 'rich' : 'text') as 'text' | 'rich', + segments: data.segments, + timestamp: data.timestamp || Date.now() / 1000, + sender: data.sender, + } + return { + ...tab, + messages: [...filteredMessages, newMessage] + } + })) + break + } + + case 'typing': + updateTab(tabId, { isTyping: data.is_typing || false }) + break + + case 'error': + setTabs(prev => prev.map(tab => { + if (tab.id !== tabId) return tab + const filteredMessages = tab.messages.filter(msg => msg.type !== 'thinking') + return { + ...tab, + messages: [...filteredMessages, { + id: generateMessageId('error'), + type: 'error' as const, + content: data.content || '发生错误', + timestamp: data.timestamp || Date.now() / 1000, + }] + } + })) + toast({ + title: '错误', + description: data.content, + variant: 'destructive', + }) + break + + case 'history': { + const historyMessages = data.messages || [] + const processedSet = new Set() + const formattedMessages: ChatMessage[] = historyMessages.map((msg: { + id?: string + content: string + timestamp: number + sender_name?: string + sender_id?: string + is_bot?: boolean + }) => { + const isBot = msg.is_bot || false + const msgId = msg.id || generateMessageId(isBot ? 'bot' : 'user') + const contentHash = `${isBot ? 'bot' : 'user'}-${msg.content}-${Math.floor(msg.timestamp * 1000)}` + processedSet.add(contentHash) + return { + id: msgId, + type: isBot ? 'bot' : 'user' as const, + content: msg.content, + timestamp: msg.timestamp, + sender: { + name: msg.sender_name || (isBot ? '麦麦' : '用户'), + user_id: msg.sender_id, + is_bot: isBot, + }, + } + }) + + processedMessagesMapRef.current.set(tabId, processedSet) + updateTab(tabId, { messages: formattedMessages }) + setIsLoadingHistory(false) + break + } + + default: + break + } + }, [addMessageToTab, toast, updateTab]) + + const ensureSessionListener = useCallback(( + tabId: string, + tabType: 'webui' | 'virtual', + config?: VirtualIdentityConfig, + ) => { + if (sessionUnsubscribeMapRef.current.has(tabId)) { + return + } + + const unsubscribe = chatWsClient.onSessionMessage(tabId, (message) => { + handleSessionMessage(tabId, tabType, config, message as unknown as WsMessage) + }) + sessionUnsubscribeMapRef.current.set(tabId, unsubscribe) + }, [handleSessionMessage]) + + const openSessionForTab = useCallback(async ( + tabId: string, + tabType: 'webui' | 'virtual', + config?: VirtualIdentityConfig, + ) => { + ensureSessionListener(tabId, tabType, config) setIsLoadingHistory(true) + try { - const params = new URLSearchParams() - params.append('user_id', userIdRef.current) - params.append('limit', '50') - if (groupId) { - params.append('group_id', groupId) + if (tabType === 'virtual' && config) { + await chatWsClient.openSession(tabId, { + user_id: config.userId, + user_name: config.userName, + platform: config.platform, + person_id: config.personId, + group_name: config.groupName || 'WebUI虚拟群聊', + group_id: config.groupId, + }) + } else { + await chatWsClient.openSession(tabId, { + user_id: userIdRef.current, + user_name: userName, + }) } - const url = `/api/chat/history?${params.toString()}` - console.log('[Chat] 正在加载历史消息:', url) - - const response = await fetchWithAuth(url) - - if (response.ok) { - const text = await response.text() - try { - const data = JSON.parse(text) - - if (data.messages && data.messages.length > 0) { - const historyMessages: ChatMessage[] = data.messages.map((msg: { - id: string - type: string - content: string - timestamp: number - sender_name?: string - user_id?: string - is_bot?: boolean - }) => ({ - id: msg.id, - type: msg.type as 'user' | 'bot' | 'system' | 'error', - content: msg.content, - timestamp: msg.timestamp, - sender: { - name: msg.sender_name || (msg.is_bot ? '麦麦' : 'WebUI用户'), - user_id: msg.user_id, - is_bot: msg.is_bot - } - })) - - // 更新标签页的消息 - updateTab(tabId, { messages: historyMessages }) - - // 将历史消息添加到去重缓存 - const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() - historyMessages.forEach(msg => { - if (msg.type === 'bot') { - const contentHash = `bot-${msg.content}-${Math.floor(msg.timestamp * 1000)}` - processedSet.add(contentHash) - } - }) - processedMessagesMapRef.current.set(tabId, processedSet) - } - } catch (parseError) { - console.error('[Chat] JSON 解析失败:', parseError) - } - } - } catch (e) { - console.error('[Chat] 加载历史消息失败:', e) - } finally { - setIsLoadingHistory(false) - } - }, [updateTab]) - // 为指定标签页连接 WebSocket(异步,需要先获取认证 token) - const connectWebSocketForTab = useCallback(async (tabId: string, tabType: 'webui' | 'virtual', config?: VirtualIdentityConfig) => { - // 如果已经有连接,不要重复创建 - const existingWs = wsMapRef.current.get(tabId) - if (existingWs?.readyState === WebSocket.OPEN || - existingWs?.readyState === WebSocket.CONNECTING) { - console.log(`[Tab ${tabId}] WebSocket 已存在,跳过连接`) - return - } - - setIsConnecting(true) - - // 先获取临时 WebSocket token - let wsToken: string | null = null - try { - const tokenResponse = await fetchWithAuth('/api/webui/ws-token') - if (tokenResponse.ok) { - const tokenData = await tokenResponse.json() - if (tokenData.success && tokenData.token) { - wsToken = tokenData.token - } else { - console.warn(`[Tab ${tabId}] 获取 WebSocket token 失败: ${tokenData.message || '未登录'}`) - setIsConnecting(false) - return - } - } + updateTab(tabId, { isConnected: true }) } catch (error) { - console.error(`[Tab ${tabId}] 获取 WebSocket token 失败:`, error) - setIsConnecting(false) - return + console.error(`[Tab ${tabId}] 打开聊天会话失败:`, error) + setIsLoadingHistory(false) + toast({ + title: '连接失败', + description: '无法建立聊天会话,请稍后重试', + variant: 'destructive', + }) } - - // 此时 wsToken 一定有值(前面已经 return) - if (!wsToken) { - setIsConnecting(false) - return - } - - const wsBase = await getWsBaseUrl() - const params = new URLSearchParams() - - // 添加 token 到参数 - params.append('token', wsToken) - - if (tabType === 'virtual' && config) { - params.append('user_id', config.userId) - params.append('user_name', config.userName) - params.append('platform', config.platform) - params.append('person_id', config.personId) - params.append('group_name', config.groupName || 'WebUI虚拟群聊') - // 传递稳定的 group_id,确保历史记录能正确加载 - if (config.groupId) { - params.append('group_id', config.groupId) - } - } else { - params.append('user_id', userIdRef.current) - params.append('user_name', userName) - } - - const wsUrl = `${wsBase}/api/chat/ws?${params.toString()}` - console.log(`[Tab ${tabId}] 正在连接 WebSocket:`, wsUrl) - - try { - const ws = new WebSocket(wsUrl) - wsMapRef.current.set(tabId, ws) - - ws.onopen = () => { - updateTab(tabId, { isConnected: true }) - setIsConnecting(false) - console.log(`[Tab ${tabId}] WebSocket 已连接`) - } - - ws.onmessage = (event) => { - try { - const data: WsMessage = JSON.parse(event.data) - - switch (data.type) { - case 'session_info': - updateTab(tabId, { - sessionInfo: { - session_id: data.session_id, - user_id: data.user_id, - user_name: data.user_name, - bot_name: data.bot_name, - } - }) - break - - case 'system': - addMessageToTab(tabId, { - id: generateMessageId('sys'), - type: 'system', - content: data.content || '', - timestamp: data.timestamp || Date.now() / 1000, - }) - break - - case 'user_message': { - // 检查是否是自己发的消息(已在发送时显示,跳过广播回来的) - const senderUserId = data.sender?.user_id - const currentUserId = tabType === 'virtual' && config - ? config.userId - : userIdRef.current - - console.log(`[Tab ${tabId}] 收到 user_message, sender: ${senderUserId}, current: ${currentUserId}`) - - // 标准化 user_id(去掉可能的前缀) - const normalizeSenderId = senderUserId ? senderUserId.replace(/^webui_user_/, '') : '' - const normalizeCurrentId = currentUserId ? currentUserId.replace(/^webui_user_/, '') : '' - - // 如果是自己发的消息,跳过(避免重复显示) - if (normalizeSenderId && normalizeCurrentId && normalizeSenderId === normalizeCurrentId) { - console.log(`[Tab ${tabId}] 跳过自己的消息(user_id 匹配)`) - break - } - - // 额外的消息去重:检查内容和时间戳 - const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() - const contentHash = `user-${data.content}-${Math.floor((data.timestamp || 0) * 1000)}` - if (processedSet.has(contentHash)) { - console.log(`[Tab ${tabId}] 跳过自己的消息(内容去重)`) - break - } - processedSet.add(contentHash) - processedMessagesMapRef.current.set(tabId, processedSet) - - if (processedSet.size > 100) { - const firstKey = processedSet.values().next().value - if (firstKey) processedSet.delete(firstKey) - } - - addMessageToTab(tabId, { - id: data.message_id || generateMessageId('user'), - type: 'user', - content: data.content || '', - timestamp: data.timestamp || Date.now() / 1000, - sender: data.sender, - }) - break - } - - case 'bot_message': { - updateTab(tabId, { isTyping: false }) - const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() - const contentHash = `bot-${data.content}-${Math.floor((data.timestamp || 0) * 1000)}` - if (processedSet.has(contentHash)) { - break - } - processedSet.add(contentHash) - processedMessagesMapRef.current.set(tabId, processedSet) - - if (processedSet.size > 100) { - const firstKey = processedSet.values().next().value - if (firstKey) processedSet.delete(firstKey) - } - - // 移除"思考中"占位消息,添加真实的机器人回复 - setTabs(prev => prev.map(tab => { - if (tab.id !== tabId) return tab - // 过滤掉 thinking 类型的消息 - const filteredMessages = tab.messages.filter(msg => msg.type !== 'thinking') - const newMessage: ChatMessage = { - id: generateMessageId('bot'), - type: 'bot', - content: data.content || '', - message_type: (data.message_type === 'rich' ? 'rich' : 'text') as 'text' | 'rich', - segments: data.segments, - timestamp: data.timestamp || Date.now() / 1000, - sender: data.sender, - } - return { - ...tab, - messages: [...filteredMessages, newMessage] - } - })) - break - } - - case 'typing': - updateTab(tabId, { isTyping: data.is_typing || false }) - break - - case 'error': - // 移除"思考中"占位消息,显示错误 - setTabs(prev => prev.map(tab => { - if (tab.id !== tabId) return tab - const filteredMessages = tab.messages.filter(msg => msg.type !== 'thinking') - return { - ...tab, - messages: [...filteredMessages, { - id: generateMessageId('error'), - type: 'error' as const, - content: data.content || '发生错误', - timestamp: data.timestamp || Date.now() / 1000, - }] - } - })) - toast({ - title: '错误', - description: data.content, - variant: 'destructive', - }) - break - - case 'pong': - break - - case 'history': { - // 处理服务端发送的历史消息 - const historyMessages = data.messages || [] - if (historyMessages.length > 0) { - const processedSet = processedMessagesMapRef.current.get(tabId) || new Set() - const formattedMessages: ChatMessage[] = historyMessages.map((msg: { - id?: string - content: string - timestamp: number - sender_name?: string - sender_id?: string - is_bot?: boolean - }) => { - const isBot = msg.is_bot || false - const msgId = msg.id || generateMessageId(isBot ? 'bot' : 'user') - // 添加到去重集合 - const contentHash = `${isBot ? 'bot' : 'user'}-${msg.content}-${Math.floor(msg.timestamp * 1000)}` - processedSet.add(contentHash) - return { - id: msgId, - type: isBot ? 'bot' : 'user' as const, - content: msg.content, - timestamp: msg.timestamp, - sender: { - name: msg.sender_name || (isBot ? '麦麦' : '用户'), - user_id: msg.sender_id, - is_bot: isBot, - }, - } - }) - processedMessagesMapRef.current.set(tabId, processedSet) - // 替换当前标签页的所有消息 - updateTab(tabId, { messages: formattedMessages }) - console.log(`[Tab ${tabId}] 已加载 ${formattedMessages.length} 条历史消息`) - } - break - } - - default: - console.log('未知消息类型:', data.type) - } - } catch (e) { - console.error('解析消息失败:', e) - } - } - - ws.onclose = () => { - updateTab(tabId, { isConnected: false }) - setIsConnecting(false) - wsMapRef.current.delete(tabId) - console.log(`[Tab ${tabId}] WebSocket 已断开`) - - // 清除旧的重连定时器 - const oldTimeout = reconnectTimeoutMapRef.current.get(tabId) - if (oldTimeout) { - clearTimeout(oldTimeout) - } - - // 5秒后尝试重连 - const timeout = window.setTimeout(() => { - if (!isUnmountedRef.current) { - const tab = tabs.find(t => t.id === tabId) - if (tab) { - connectWebSocketForTab(tabId, tab.type, tab.virtualConfig) - } - } - }, 5000) - reconnectTimeoutMapRef.current.set(tabId, timeout) - } - - ws.onerror = (error) => { - console.error(`[Tab ${tabId}] WebSocket 错误:`, error) - setIsConnecting(false) - } - } catch (e) { - console.error(`[Tab ${tabId}] 创建 WebSocket 失败:`, e) - setIsConnecting(false) - } - }, [userName, updateTab, addMessageToTab, toast, tabs]) + }, [ensureSessionListener, toast, updateTab, userName]) // 用于追踪组件是否已卸载 const isUnmountedRef = useRef(false) @@ -555,69 +419,49 @@ export function ChatPage() { // 初始化连接(默认 WebUI 标签页) useEffect(() => { isUnmountedRef.current = false - - // 保存 ref 的当前值,用于清理 - const wsMap = wsMapRef.current - const reconnectTimeoutMap = reconnectTimeoutMapRef.current - const processedMessagesMap = processedMessagesMapRef.current - - // 加载默认标签页历史消息 - loadChatHistoryForTab('webui-default') - - // 延迟连接 - const connectTimer = setTimeout(() => { - if (!isUnmountedRef.current) { - connectWebSocketForTab('webui-default', 'webui') - - // 恢复的虚拟标签页也需要建立连接 - tabs.forEach(tab => { - if (tab.type === 'virtual' && tab.virtualConfig) { - // 初始化去重缓存 - processedMessagesMap.set(tab.id, new Set()) - // 建立 WebSocket 连接 - setTimeout(() => { - if (!isUnmountedRef.current) { - connectWebSocketForTab(tab.id, 'virtual', tab.virtualConfig) - } - }, 200) - } - }) - } - }, 100) - // 心跳定时器 - 向所有活动连接发送 - const heartbeat = setInterval(() => { - wsMap.forEach((ws) => { - if (ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'ping' })) - } - }) - }, 30000) + const unsubscribeConnection = chatWsClient.onConnectionChange((connected) => { + if (isUnmountedRef.current) { + return + } + + setTabs(prev => prev.map(tab => ({ + ...tab, + isConnected: connected, + }))) + }) + + const unsubscribeStatus = chatWsClient.onStatusChange((status) => { + if (!isUnmountedRef.current) { + setIsConnecting(status === 'connecting') + } + }) + + tabs.forEach(tab => { + processedMessagesMapRef.current.set(tab.id, new Set()) + void openSessionForTab(tab.id, tab.type, tab.virtualConfig) + }) return () => { isUnmountedRef.current = true - clearTimeout(connectTimer) - clearInterval(heartbeat) - - // 清理所有重连定时器 - reconnectTimeoutMap.forEach((timeout) => { - clearTimeout(timeout) + unsubscribeConnection() + unsubscribeStatus() + + sessionUnsubscribeMapRef.current.forEach((unsubscribe) => { + unsubscribe() }) - reconnectTimeoutMap.clear() - - // 关闭所有 WebSocket 连接 - wsMap.forEach((ws) => { - ws.close() + sessionUnsubscribeMapRef.current.clear() + + tabsRef.current.forEach(tab => { + void chatWsClient.closeSession(tab.id) }) - wsMap.clear() } // eslint-disable-next-line react-hooks/exhaustive-deps }, []) // 发送消息到当前活动标签页 - const sendMessage = useCallback(() => { - const ws = wsMapRef.current.get(activeTabId) - if (!inputValue.trim() || !ws || ws.readyState !== WebSocket.OPEN) { + const sendMessage = useCallback(async () => { + if (!inputValue.trim() || !activeTab?.isConnected) { return } @@ -628,12 +472,6 @@ export function ChatPage() { const messageContent = inputValue.trim() const currentTimestamp = Date.now() / 1000 - ws.send(JSON.stringify({ - type: 'message', - content: messageContent, - user_name: displayName, - })) - // 添加到去重缓存,防止服务器广播回来的消息重复显示 const processedSet = processedMessagesMapRef.current.get(activeTabId) || new Set() const contentHash = `user-${messageContent}-${Math.floor(currentTimestamp * 1000)}` @@ -672,13 +510,32 @@ export function ChatPage() { addMessageToTab(activeTabId, thinkingMessage) setInputValue('') - }, [inputValue, userName, activeTabId, activeTab, addMessageToTab]) + + try { + await chatWsClient.sendMessage(activeTabId, messageContent, displayName) + } catch (error) { + console.error('发送聊天消息失败:', error) + setTabs(prev => prev.map(tab => { + if (tab.id !== activeTabId) return tab + return { + ...tab, + isTyping: false, + messages: tab.messages.filter(msg => msg.type !== 'thinking') + } + })) + toast({ + title: '发送失败', + description: '当前聊天会话不可用,请稍后重试', + variant: 'destructive', + }) + } + }, [activeTab, activeTabId, addMessageToTab, inputValue, toast, userName]) // 处理键盘事件 const handleKeyDown = (e: React.KeyboardEvent) => { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault() - sendMessage() + void sendMessage() } } @@ -693,13 +550,9 @@ export function ChatPage() { setUserName(newName) saveUserName(newName) setIsEditingName(false) - // 通知当前标签页的后端昵称变更 - const ws = wsMapRef.current.get(activeTabId) - if (ws?.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ - type: 'update_nickname', - user_name: newName - })) + + if (activeTab?.isConnected) { + void chatWsClient.updateNickname(activeTabId, newName) } } @@ -719,12 +572,7 @@ export function ChatPage() { // 重新连接当前标签页 const handleReconnect = () => { - const ws = wsMapRef.current.get(activeTabId) - if (ws) { - ws.close() - wsMapRef.current.delete(activeTabId) - } - connectWebSocketForTab(activeTabId, activeTab?.type || 'webui', activeTab?.virtualConfig) + void chatWsClient.restart() } // 打开虚拟身份配置对话框(新建标签页用) @@ -795,10 +643,10 @@ export function ChatPage() { // 初始化去重缓存 processedMessagesMapRef.current.set(newTabId, new Set()) - // 连接 WebSocket - setTimeout(() => { - connectWebSocketForTab(newTabId, 'virtual', tempVirtualConfig) - }, 100) + void openSessionForTab(newTabId, 'virtual', { + ...tempVirtualConfig, + groupId: stableGroupId, + }) toast({ title: '虚拟身份标签页', @@ -814,20 +662,14 @@ export function ChatPage() { if (tabId === 'webui-default') { return } - - // 关闭 WebSocket 连接 - const ws = wsMapRef.current.get(tabId) - if (ws) { - ws.close() - wsMapRef.current.delete(tabId) - } - - // 清理重连定时器 - const timeout = reconnectTimeoutMapRef.current.get(tabId) - if (timeout) { - clearTimeout(timeout) - reconnectTimeoutMapRef.current.delete(tabId) + + const unsubscribe = sessionUnsubscribeMapRef.current.get(tabId) + if (unsubscribe) { + unsubscribe() + sessionUnsubscribeMapRef.current.delete(tabId) } + + void chatWsClient.closeSession(tabId) // 清理去重缓存 processedMessagesMapRef.current.delete(tabId) @@ -1133,7 +975,7 @@ export function ChatPage() { className="flex-1 h-10 sm:h-10" />