fix: 修复错误的插件重载机制,改为插件级区分

test: 补充新版本插件系统重载机制的测试
This commit is contained in:
DrSmoothl
2026-03-16 13:27:10 +08:00
parent e7ac064a80
commit a5a6d2cb26
3 changed files with 204 additions and 92 deletions

View File

@@ -8,14 +8,14 @@
"""
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Coroutine
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Dict, Iterable, List, Optional, Sequence, Tuple
import asyncio
import json
import tomlkit
from src.common.logger import get_logger
from src.config.config import config_manager, global_config
from src.config.config import global_config
from src.config.file_watcher import FileChange, FileWatcher
from src.plugin_runtime.capabilities import (
RuntimeComponentCapabilityMixin,
@@ -60,9 +60,9 @@ class PluginRuntimeManager(
self._builtin_supervisor: Optional[PluginSupervisor] = None
self._third_party_supervisor: Optional[PluginSupervisor] = None
self._started: bool = False
self._config_reload_callback_registered: bool = False
self._plugin_file_watcher: Optional[FileWatcher] = None
self._plugin_file_watcher_subscription_id: Optional[str] = None
self._plugin_source_watcher_subscription_id: Optional[str] = None
self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {}
# ─── 插件目录 ─────────────────────────────────────────────
@@ -138,14 +138,12 @@ class PluginRuntimeManager(
if self._third_party_supervisor:
await self._third_party_supervisor.start()
started_supervisors.append(self._third_party_supervisor)
self._register_config_reload_callback()
await self._start_plugin_file_watcher()
self._started = True
logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or ''}, 第三方: {third_party_dirs or ''}")
except Exception as e:
logger.error(f"插件运行时启动失败: {e}", exc_info=True)
await self._stop_plugin_file_watcher()
self._unregister_config_reload_callback()
await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True)
self._started = False
self._builtin_supervisor = None
@@ -157,7 +155,6 @@ class PluginRuntimeManager(
return
await self._stop_plugin_file_watcher()
self._unregister_config_reload_callback()
coroutines: List[Coroutine[Any, Any, None]] = []
if self._builtin_supervisor:
@@ -180,7 +177,7 @@ class PluginRuntimeManager(
return self._started
@property
def supervisors(self):
def supervisors(self) -> List["PluginSupervisor"]:
"""获取所有活跃的 Supervisor"""
return [s for s in (self._builtin_supervisor, self._third_party_supervisor) if s is not None]
@@ -221,22 +218,6 @@ class PluginRuntimeManager(
)
return True
async def handle_config_reload(self) -> None:
"""响应全局配置热重载,并向当前已注册插件重新推送各自配置。
该回调不负责重载插件运行时本身;它只在 config_manager 完成配置重载后,
遍历所有已注册插件并触发一次 plugin.config_updated 通知。
"""
if not self._started:
return
if tasks := [
self.notify_plugin_config_updated(plugin_id)
for sv in self.supervisors
for plugin_id in list(sv._registered_plugins.keys())
]:
await asyncio.gather(*tasks, return_exceptions=True)
# ─── 事件桥接 ──────────────────────────────────────────────
async def bridge_event(
@@ -314,10 +295,20 @@ class PluginRuntimeManager(
timeout_ms=timeout_ms,
)
def _get_supervisors_for_plugin(self, plugin_id: str):
def _get_supervisors_for_plugin(self, plugin_id: str) -> List["PluginSupervisor"]:
"""返回当前持有指定插件的所有 Supervisor。
该辅助函数主要用于检测插件是否被重复注册到多个运行时分组,
供后续单路由选择和冲突检查使用。
"""
return [supervisor for supervisor in self.supervisors if plugin_id in supervisor._registered_plugins]
def _get_supervisor_for_plugin(self, plugin_id: str):
def _get_supervisor_for_plugin(self, plugin_id: str) -> Optional["PluginSupervisor"]:
"""返回负责指定插件的唯一 Supervisor。
如果同一个插件同时出现在多个 Supervisor 中,说明运行时状态异常,
此时直接抛出错误,避免把请求路由到错误的子进程。
"""
matches = self._get_supervisors_for_plugin(plugin_id)
if len(matches) > 1:
raise RuntimeError(f"插件 {plugin_id} 同时存在于多个 Supervisor 中,无法安全路由")
@@ -325,6 +316,7 @@ class PluginRuntimeManager(
@staticmethod
def _find_duplicate_plugin_ids(plugin_dirs: List[Path]) -> Dict[str, List[Path]]:
"""扫描插件目录,找出被多个目录重复声明的插件 ID。"""
plugin_locations: Dict[str, List[Path]] = {}
for base_dir in plugin_dirs:
if not base_dir.is_dir():
@@ -353,19 +345,8 @@ class PluginRuntimeManager(
if len(set(paths)) > 1
}
def _register_config_reload_callback(self) -> None:
if self._config_reload_callback_registered:
return
config_manager.register_reload_callback(self.handle_config_reload)
self._config_reload_callback_registered = True
def _unregister_config_reload_callback(self) -> None:
if not self._config_reload_callback_registered:
return
config_manager.unregister_reload_callback(self.handle_config_reload)
self._config_reload_callback_registered = False
async def _start_plugin_file_watcher(self) -> None:
"""启动插件文件监视器,并建立源码与配置两类订阅。"""
if self._plugin_file_watcher is not None and self._plugin_file_watcher.running:
return
@@ -380,25 +361,111 @@ class PluginRuntimeManager(
callback_failure_threshold=3,
callback_cooldown_s=30.0,
)
subscription_id = watcher.subscribe(self._handle_plugin_file_changes, paths=watch_paths)
subscription_id = watcher.subscribe(self._handle_plugin_source_changes, paths=watch_paths)
await watcher.start()
self._plugin_file_watcher = watcher
self._plugin_file_watcher_subscription_id = subscription_id
self._plugin_source_watcher_subscription_id = subscription_id
self._refresh_plugin_config_watch_subscriptions()
async def _stop_plugin_file_watcher(self) -> None:
"""停止插件文件监视器,并清理所有已注册订阅。"""
if self._plugin_file_watcher is None:
return
if self._plugin_file_watcher_subscription_id is not None:
self._plugin_file_watcher.unsubscribe(self._plugin_file_watcher_subscription_id)
self._plugin_file_watcher_subscription_id = None
for _plugin_id, (_config_path, subscription_id) in list(self._plugin_config_watcher_subscriptions.items()):
self._plugin_file_watcher.unsubscribe(subscription_id)
self._plugin_config_watcher_subscriptions.clear()
if self._plugin_source_watcher_subscription_id is not None:
self._plugin_file_watcher.unsubscribe(self._plugin_source_watcher_subscription_id)
self._plugin_source_watcher_subscription_id = None
await self._plugin_file_watcher.stop()
self._plugin_file_watcher = None
def _iter_plugin_dirs(self) -> Iterable[Path]:
"""迭代所有 Supervisor 当前管理的插件根目录。"""
for supervisor in self.supervisors:
yield from getattr(supervisor, "_plugin_dirs", [])
async def _handle_plugin_file_changes(self, changes: Sequence[FileChange]) -> None:
def _refresh_plugin_config_watch_subscriptions(self) -> None:
"""按当前已注册插件集合刷新 config.toml 的单插件订阅。
当插件热重载后,插件集合或目录位置可能发生变化,因此需要重新对齐
watcher 的订阅,确保每个插件配置变更只触发对应 plugin_id。
"""
if self._plugin_file_watcher is None:
return
desired_config_paths = dict(self._iter_registered_plugin_config_paths())
for plugin_id, (_old_path, subscription_id) in list(self._plugin_config_watcher_subscriptions.items()):
if desired_config_paths.get(plugin_id) == self._plugin_config_watcher_subscriptions[plugin_id][0]:
continue
self._plugin_file_watcher.unsubscribe(subscription_id)
del self._plugin_config_watcher_subscriptions[plugin_id]
for plugin_id, config_path in desired_config_paths.items():
existing_subscription = self._plugin_config_watcher_subscriptions.get(plugin_id)
if existing_subscription is not None and existing_subscription[0] == config_path:
continue
subscription_id = self._plugin_file_watcher.subscribe(
self._build_plugin_config_change_callback(plugin_id),
paths=[config_path],
)
self._plugin_config_watcher_subscriptions[plugin_id] = (config_path, subscription_id)
def _build_plugin_config_change_callback(
self, plugin_id: str
) -> Callable[[Sequence[FileChange]], Awaitable[None]]:
"""为指定插件生成配置文件变更回调。"""
async def _callback(changes: Sequence[FileChange]) -> None:
await self._handle_plugin_config_changes(plugin_id, changes)
return _callback
def _iter_registered_plugin_config_paths(self) -> Iterable[Tuple[str, Path]]:
"""迭代当前所有已注册插件的 config.toml 路径。"""
for supervisor in self.supervisors:
for plugin_id in getattr(supervisor, "_registered_plugins", {}).keys():
if config_path := self._get_plugin_config_path_for_supervisor(supervisor, plugin_id):
yield plugin_id, config_path
def _get_plugin_config_path_for_supervisor(self, supervisor: Any, plugin_id: str) -> Optional[Path]:
"""从指定 Supervisor 的插件目录中定位某个插件的 config.toml。"""
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
plugin_dir = Path(plugin_dir)
plugin_path = plugin_dir.resolve() / plugin_id
if plugin_path.is_dir():
return plugin_path / "config.toml"
return None
async def _handle_plugin_config_changes(self, plugin_id: str, changes: Sequence[FileChange]) -> None:
"""处理单个插件配置文件变化,并仅向目标插件推送配置更新。"""
if not self._started or not changes:
return
try:
supervisor = self._get_supervisor_for_plugin(plugin_id)
except RuntimeError as exc:
logger.warning(f"插件 {plugin_id} 配置监听匹配失败: {exc}")
return
if supervisor is None:
return
try:
await supervisor.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=self._load_plugin_config_for_supervisor(plugin_id, getattr(supervisor, "_plugin_dirs", [])),
)
except Exception as exc:
logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}")
async def _handle_plugin_source_changes(self, changes: Sequence[FileChange]) -> None:
"""处理插件源码相关变化。
这里仅负责源码、清单等会影响插件装载状态的文件;配置文件的变化会由
单独的 per-plugin watcher 处理,避免把单插件配置更新放大成全量 reload。
"""
if not self._started or not changes:
return
@@ -411,55 +478,39 @@ class PluginRuntimeManager(
return
reload_supervisors: List[Any] = []
config_updates: Dict[int, set[str]] = {}
changed_paths = [change.path.resolve() for change in changes]
for supervisor in self.supervisors:
plugin_ids_for_config = config_updates.setdefault(self._get_supervisor_key(supervisor), set())
for path in changed_paths:
plugin_id = self._match_plugin_id_for_supervisor(supervisor, path)
if plugin_id is None:
continue
if path.name == "config.toml":
plugin_ids_for_config.add(plugin_id)
elif path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py":
if supervisor not in reload_supervisors:
reload_supervisors.append(supervisor)
if (path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py") and supervisor not in reload_supervisors:
reload_supervisors.append(supervisor)
for supervisor in reload_supervisors:
await supervisor.reload_plugins(reason="file_watcher")
for supervisor in self.supervisors:
if supervisor in reload_supervisors:
continue
for plugin_id in config_updates.get(self._get_supervisor_key(supervisor), set()):
try:
await supervisor.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=self._load_plugin_config_for_supervisor(
plugin_id, getattr(supervisor, "_plugin_dirs", [])
),
)
except Exception as exc:
logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}")
@staticmethod
def _get_supervisor_key(supervisor: Any) -> int:
return id(supervisor)
if reload_supervisors:
self._refresh_plugin_config_watch_subscriptions()
@staticmethod
def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool:
"""判断某个文件路径是否落在指定插件根目录内。"""
plugin_root = plugin_dir.resolve()
return path == plugin_root or path.is_relative_to(plugin_root)
def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]:
"""根据变更路径为指定 Supervisor 推断受影响的插件 ID。"""
for plugin_id, _reg in getattr(supervisor, "_registered_plugins", {}).items():
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
plugin_dir = Path(plugin_dir)
candidate_dir = plugin_dir.resolve() / plugin_id
if path == candidate_dir or path.is_relative_to(candidate_dir):
return plugin_id
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
plugin_dir = Path(plugin_dir)
plugin_root = plugin_dir.resolve()
if self._plugin_dir_matches(path, plugin_dir) and (relative_parts := path.relative_to(plugin_root).parts):
return relative_parts[0]
@@ -467,6 +518,7 @@ class PluginRuntimeManager(
@staticmethod
def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[Path]) -> Dict[str, Any]:
"""从给定插件目录集合中读取目标插件的配置内容。"""
for plugin_dir in plugin_dirs:
plugin_path = plugin_dir.resolve() / plugin_id
if plugin_path.is_dir():