feat: add WebSocket plugin progress notification module and integrate with plugin management routes

- Implemented a WebSocket endpoint for broadcasting plugin loading progress.
- Created functions to update and broadcast progress status to connected clients.
- Integrated progress updates into plugin management routes for operations like fetching, cloning, installing, uninstalling, and updating plugins.
- Added error handling and logging for progress updates and WebSocket connections.
This commit is contained in:
墨梓柒
2025-11-18 22:54:25 +08:00
parent 8fe90b3642
commit 4141c25fdd
4 changed files with 2009 additions and 0 deletions

View File

@@ -0,0 +1,127 @@
"""WebSocket 插件加载进度推送模块"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import Set, Dict, Any
import json
import asyncio
from src.common.logger import get_logger
logger = get_logger("webui.plugin_progress")
# 创建路由器
router = APIRouter()
# 全局 WebSocket 连接池
active_connections: Set[WebSocket] = set()
# 当前加载进度状态
current_progress: Dict[str, Any] = {
"operation": "idle", # idle, fetch, install, uninstall, update
"stage": "idle", # idle, loading, success, error
"progress": 0, # 0-100
"message": "",
"error": None,
"plugin_id": None, # 当前操作的插件 ID
"total_plugins": 0,
"loaded_plugins": 0
}
async def broadcast_progress(progress_data: Dict[str, Any]):
"""广播进度更新到所有连接的客户端"""
global current_progress
current_progress = progress_data.copy()
if not active_connections:
return
message = json.dumps(progress_data, ensure_ascii=False)
disconnected = set()
for websocket in active_connections:
try:
await websocket.send_text(message)
except Exception as e:
logger.error(f"发送进度更新失败: {e}")
disconnected.add(websocket)
# 移除断开的连接
for websocket in disconnected:
active_connections.discard(websocket)
async def update_progress(
stage: str,
progress: int,
message: str,
operation: str = "fetch",
error: str = None,
plugin_id: str = None,
total_plugins: int = 0,
loaded_plugins: int = 0
):
"""更新并广播进度
Args:
stage: 阶段 (idle, loading, success, error)
progress: 进度百分比 (0-100)
message: 当前消息
operation: 操作类型 (fetch, install, uninstall, update)
error: 错误信息(可选)
plugin_id: 当前操作的插件 ID
total_plugins: 总插件数
loaded_plugins: 已加载插件数
"""
progress_data = {
"operation": operation,
"stage": stage,
"progress": progress,
"message": message,
"error": error,
"plugin_id": plugin_id,
"total_plugins": total_plugins,
"loaded_plugins": loaded_plugins,
"timestamp": asyncio.get_event_loop().time()
}
await broadcast_progress(progress_data)
logger.debug(f"进度更新: [{operation}] {stage} - {progress}% - {message}")
@router.websocket("/ws/plugin-progress")
async def websocket_plugin_progress(websocket: WebSocket):
"""WebSocket 插件加载进度推送端点
客户端连接后会立即收到当前进度状态
"""
await websocket.accept()
active_connections.add(websocket)
logger.info(f"📡 插件进度 WebSocket 客户端已连接,当前连接数: {len(active_connections)}")
try:
# 发送当前进度状态
await websocket.send_text(json.dumps(current_progress, ensure_ascii=False))
# 保持连接并处理客户端消息
while True:
try:
data = await websocket.receive_text()
# 处理客户端心跳
if data == "ping":
await websocket.send_text("pong")
except Exception as e:
logger.error(f"处理客户端消息时出错: {e}")
break
except WebSocketDisconnect:
active_connections.discard(websocket)
logger.info(f"📡 插件进度 WebSocket 客户端已断开,当前连接数: {len(active_connections)}")
except Exception as e:
logger.error(f"❌ WebSocket 错误: {e}")
active_connections.discard(websocket)
def get_progress_router() -> APIRouter:
"""获取插件进度 WebSocket 路由器"""
return router