refactor(lib): unify WebSocket utilities into ws-utils

This commit is contained in:
DrSmoothl
2026-03-01 16:57:45 +08:00
parent 1533ec505b
commit e1c8a30376
3 changed files with 270 additions and 198 deletions

View File

@@ -3,8 +3,9 @@
* 确保整个应用只有一个 WebSocket 连接 * 确保整个应用只有一个 WebSocket 连接
*/ */
import { fetchWithAuth, checkAuthStatus } from './fetch-with-auth' import { checkAuthStatus } from './fetch-with-auth'
import { getSetting } from './settings-manager' import { getSetting } from './settings-manager'
import { createReconnectingWebSocket } from './ws-utils'
export interface LogEntry { export interface LogEntry {
id: string id: string
@@ -18,10 +19,7 @@ type LogCallback = (log: LogEntry) => void
type ConnectionCallback = (connected: boolean) => void type ConnectionCallback = (connected: boolean) => void
class LogWebSocketManager { class LogWebSocketManager {
private ws: WebSocket | null = null private wsControl: ReturnType<typeof createReconnectingWebSocket> | null = null
private reconnectTimeout: number | null = null
private reconnectAttempts = 0
private heartbeatInterval: number | null = null
// 订阅者 // 订阅者
private logCallbacks: Set<LogCallback> = new Set() private logCallbacks: Set<LogCallback> = new Set()
@@ -54,9 +52,9 @@ class LogWebSocketManager {
} }
/** /**
* 获取 WebSocket URL * 获取 WebSocket URL(不含 token 参数)
*/ */
private getWebSocketUrl(token?: string): string { private getWebSocketUrl(): string {
let baseUrl: string let baseUrl: string
if (import.meta.env.DEV) { if (import.meta.env.DEV) {
// 开发模式:连接到 WebUI 后端服务器 // 开发模式:连接到 WebUI 后端服务器
@@ -67,49 +65,13 @@ class LogWebSocketManager {
const host = window.location.host const host = window.location.host
baseUrl = `${protocol}//${host}/ws/logs` baseUrl = `${protocol}//${host}/ws/logs`
} }
// 如果有 token添加到 URL 参数
if (token) {
return `${baseUrl}?token=${encodeURIComponent(token)}`
}
return baseUrl return baseUrl
} }
/**
* 获取 WebSocket 临时认证 token
*/
private async getWsToken(): Promise<string | null> {
try {
// 使用相对路径,让前端代理处理请求,避免 CORS 问题
const response = await fetchWithAuth('/api/webui/ws-token', {
method: 'GET',
credentials: 'include', // 携带 Cookie
})
if (!response.ok) {
console.error('获取 WebSocket token 失败:', response.status)
return null
}
const data = await response.json()
if (data.success && data.token) {
return data.token
}
return null
} catch (error) {
console.error('获取 WebSocket token 失败:', error)
return null
}
}
/** /**
* 连接 WebSocket会先检查登录状态 * 连接 WebSocket会先检查登录状态
*/ */
async connect() { async connect() {
if (this.ws?.readyState === WebSocket.OPEN || this.ws?.readyState === WebSocket.CONNECTING) {
return
}
// 检查是否在登录页面 // 检查是否在登录页面
if (window.location.pathname === '/auth') { if (window.location.pathname === '/auth') {
console.log('📡 在登录页面,跳过 WebSocket 连接') console.log('📡 在登录页面,跳过 WebSocket 连接')
@@ -123,114 +85,51 @@ class LogWebSocketManager {
return return
} }
// 先获取临时认证 token const wsUrl = this.getWebSocketUrl()
const wsToken = await this.getWsToken()
if (!wsToken) {
console.log('📡 无法获取 WebSocket token跳过连接')
return
}
const wsUrl = this.getWebSocketUrl(wsToken)
// 使用 ws-utils 创建 WebSocket
this.wsControl = createReconnectingWebSocket(wsUrl, {
onMessage: (data: string) => {
try { try {
this.ws = new WebSocket(wsUrl) const log: LogEntry = JSON.parse(data)
this.ws.onopen = () => {
this.isConnected = true
this.reconnectAttempts = 0
this.notifyConnection(true)
this.startHeartbeat()
}
this.ws.onmessage = (event) => {
try {
// 忽略心跳响应
if (event.data === 'pong') {
return
}
const log: LogEntry = JSON.parse(event.data)
this.notifyLog(log) this.notifyLog(log)
} catch (error) { } catch (error) {
console.error('解析日志消息失败:', error) console.error('解析日志消息失败:', error)
} }
} },
onOpen: () => {
this.ws.onerror = (error) => { this.isConnected = true
this.notifyConnection(true)
},
onClose: () => {
this.isConnected = false
this.notifyConnection(false)
},
onError: (error) => {
console.error('❌ WebSocket 错误:', error) console.error('❌ WebSocket 错误:', error)
this.isConnected = false this.isConnected = false
this.notifyConnection(false) this.notifyConnection(false)
} },
heartbeatInterval: 30000,
maxRetries: this.getMaxReconnectAttempts(),
backoffBase: this.getReconnectInterval(),
maxBackoff: 30000,
})
this.ws.onclose = () => { // 启动连接
this.isConnected = false await this.wsControl.connect()
this.notifyConnection(false)
this.stopHeartbeat()
this.attemptReconnect()
}
} catch (error) {
console.error('创建 WebSocket 连接失败:', error)
this.attemptReconnect()
}
}
/**
* 尝试重连
*/
private attemptReconnect() {
const maxAttempts = this.getMaxReconnectAttempts()
if (this.reconnectAttempts >= maxAttempts) {
return
}
this.reconnectAttempts += 1
const baseInterval = this.getReconnectInterval()
const delay = Math.min(baseInterval * this.reconnectAttempts, 30000)
this.reconnectTimeout = window.setTimeout(() => {
this.connect() // connect 是 async 但这里不需要 await它内部会处理错误
}, delay)
}
/**
* 启动心跳
*/
private startHeartbeat() {
this.heartbeatInterval = window.setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send('ping')
}
}, 30000) // 每30秒发送一次心跳
}
/**
* 停止心跳
*/
private stopHeartbeat() {
if (this.heartbeatInterval !== null) {
clearInterval(this.heartbeatInterval)
this.heartbeatInterval = null
}
} }
/** /**
* 断开连接 * 断开连接
*/ */
disconnect() { disconnect() {
if (this.reconnectTimeout !== null) { if (this.wsControl) {
clearTimeout(this.reconnectTimeout) this.wsControl.disconnect()
this.reconnectTimeout = null this.wsControl = null
}
this.stopHeartbeat()
if (this.ws) {
this.ws.close()
this.ws = null
} }
this.isConnected = false this.isConnected = false
this.reconnectAttempts = 0
} }
/** /**

View File

@@ -1,6 +1,8 @@
import { fetchWithAuth, getAuthHeaders } from '@/lib/fetch-with-auth' import { fetchWithAuth, getAuthHeaders } from '@/lib/fetch-with-auth'
import type { PluginInfo } from '@/types/plugin' import type { PluginInfo } from '@/types/plugin'
import { createReconnectingWebSocket } from './ws-utils'
/** /**
* Git 安装状态 * Git 安装状态
*/ */
@@ -267,27 +269,6 @@ export function isPluginCompatible(
return true return true
} }
/**
* 获取 WebSocket 临时认证 token
*/
async function getWsToken(): Promise<string | null> {
try {
const response = await fetchWithAuth('/api/webui/ws-token')
if (!response.ok) {
console.error('获取 WebSocket token 失败:', response.status)
return null
}
const data = await response.json()
if (data.success && data.token) {
return data.token
}
return null
} catch (error) {
console.error('获取 WebSocket token 失败:', error)
return null
}
}
/** /**
* 连接插件加载进度 WebSocket * 连接插件加载进度 WebSocket
* *
@@ -297,60 +278,41 @@ export async function connectPluginProgressWebSocket(
onProgress: (progress: PluginLoadProgress) => void, onProgress: (progress: PluginLoadProgress) => void,
onError?: (error: Event) => void onError?: (error: Event) => void
): Promise<WebSocket | null> { ): Promise<WebSocket | null> {
// 先获取临时 token
const wsToken = await getWsToken()
if (!wsToken) {
console.warn('无法获取 WebSocket token可能未登录')
return null
}
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const host = window.location.host const host = window.location.host
const wsUrl = `${protocol}//${host}/api/webui/ws/plugin-progress?token=${encodeURIComponent(wsToken)}` const wsUrl = `${protocol}//${host}/api/webui/ws/plugin-progress`
// 使用 ws-utils 创建 WebSocket
const wsControl = createReconnectingWebSocket(wsUrl, {
onMessage: (data: string) => {
try { try {
const ws = new WebSocket(wsUrl) const progressData = JSON.parse(data) as PluginLoadProgress
onProgress(progressData)
ws.onopen = () => {
console.log('Plugin progress WebSocket connected')
// 发送心跳
const heartbeat = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send('ping')
} else {
clearInterval(heartbeat)
}
}, 30000)
}
ws.onmessage = (event) => {
try {
// 忽略心跳响应
if (event.data === 'pong') {
return
}
const data = JSON.parse(event.data) as PluginLoadProgress
onProgress(data)
} catch (error) { } catch (error) {
console.error('Failed to parse progress data:', error) console.error('Failed to parse progress data:', error)
} }
} },
onOpen: () => {
ws.onerror = (error) => { console.log('Plugin progress WebSocket connected')
},
onClose: () => {
console.log('Plugin progress WebSocket disconnected')
},
onError: (error) => {
console.error('Plugin progress WebSocket error:', error) console.error('Plugin progress WebSocket error:', error)
onError?.(error) onError?.(error)
} },
heartbeatInterval: 30000,
maxRetries: 10,
backoffBase: 1000,
maxBackoff: 30000,
})
ws.onclose = () => { // 启动连接
console.log('Plugin progress WebSocket disconnected') await wsControl.connect()
}
return ws // 返回 WebSocket 实例(用于外部检查连接状态)
} catch (error) { return wsControl.getWebSocket()
console.error('创建 WebSocket 连接失败:', error)
return null
}
} }
/** /**

View File

@@ -0,0 +1,211 @@
import { fetchWithAuth } from './fetch-with-auth'
/**
* WebSocket 配置选项
*/
export interface WebSocketOptions {
onMessage?: (data: string) => void
onOpen?: () => void
onClose?: () => void
onError?: (error: Event) => void
heartbeatInterval?: number // 心跳间隔(毫秒)
maxRetries?: number // 最大重连次数
backoffBase?: number // 重连基础间隔(毫秒)
maxBackoff?: number // 最大重连间隔(毫秒)
}
/**
* 获取 WebSocket 临时认证 token
*/
export async function getWsToken(): Promise<string | null> {
try {
// 使用相对路径,让前端代理处理请求,避免 CORS 问题
const response = await fetchWithAuth('/api/webui/ws-token', {
method: 'GET',
credentials: 'include', // 携带 Cookie
})
if (!response.ok) {
console.error('获取 WebSocket token 失败:', response.status)
return null
}
const data = await response.json()
if (data.success && data.token) {
return data.token
}
return null
} catch (error) {
console.error('获取 WebSocket token 失败:', error)
return null
}
}
/**
* 创建带重连、心跳的 WebSocket 封装
*
* @param url WebSocket URL不含 token 参数)
* @param options 配置选项
* @returns WebSocket 控制对象,包含 connect、disconnect、send 方法
*/
export function createReconnectingWebSocket(
url: string,
options: WebSocketOptions = {}
) {
const {
onMessage,
onOpen,
onClose,
onError,
heartbeatInterval = 30000,
maxRetries = 10,
backoffBase = 1000,
maxBackoff = 30000,
} = options
let ws: WebSocket | null = null
let reconnectTimeout: number | null = null
let reconnectAttempts = 0
let heartbeatIntervalId: number | null = null
let isManualDisconnect = false
/**
* 启动心跳
*/
function startHeartbeat() {
stopHeartbeat()
heartbeatIntervalId = window.setInterval(() => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send('ping')
}
}, heartbeatInterval)
}
/**
* 停止心跳
*/
function stopHeartbeat() {
if (heartbeatIntervalId !== null) {
clearInterval(heartbeatIntervalId)
heartbeatIntervalId = null
}
}
/**
* 尝试重连
*/
function attemptReconnect() {
if (isManualDisconnect) {
return
}
if (reconnectAttempts >= maxRetries) {
console.warn(`WebSocket 达到最大重连次数 (${maxRetries}),停止重连`)
return
}
reconnectAttempts += 1
const delay = Math.min(backoffBase * reconnectAttempts, maxBackoff)
console.log(`WebSocket 将在 ${delay}ms 后重连(第 ${reconnectAttempts} 次)`)
reconnectTimeout = window.setTimeout(() => {
connect()
}, delay)
}
/**
* 连接 WebSocket
*/
async function connect() {
if (ws?.readyState === WebSocket.OPEN || ws?.readyState === WebSocket.CONNECTING) {
return
}
// 先获取临时认证 token
const wsToken = await getWsToken()
if (!wsToken) {
console.warn('无法获取 WebSocket token跳过连接')
return
}
const wsUrl = `${url}?token=${encodeURIComponent(wsToken)}`
try {
ws = new WebSocket(wsUrl)
ws.onopen = () => {
reconnectAttempts = 0
startHeartbeat()
onOpen?.()
}
ws.onmessage = (event) => {
// 忽略心跳响应
if (event.data === 'pong') {
return
}
onMessage?.(event.data)
}
ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
onError?.(error)
}
ws.onclose = () => {
stopHeartbeat()
onClose?.()
attemptReconnect()
}
} catch (error) {
console.error('创建 WebSocket 连接失败:', error)
attemptReconnect()
}
}
/**
* 断开连接
*/
function disconnect() {
isManualDisconnect = true
if (reconnectTimeout !== null) {
clearTimeout(reconnectTimeout)
reconnectTimeout = null
}
stopHeartbeat()
if (ws) {
ws.close()
ws = null
}
reconnectAttempts = 0
}
/**
* 发送消息
*/
function send(data: string) {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(data)
} else {
console.warn('WebSocket 未连接,无法发送消息')
}
}
/**
* 获取当前 WebSocket 实例
*/
function getWebSocket(): WebSocket | null {
return ws
}
return {
connect,
disconnect,
send,
getWebSocket,
}
}