diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index 7c383cbb..2c703161 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -2235,7 +2235,7 @@ class TestIntegration: assert instances[0].stopped is True @pytest.mark.asyncio - async def test_handle_plugin_file_changes_routes_reload_and_config_update(self, monkeypatch, tmp_path): + async def test_handle_plugin_source_changes_only_reload_matching_supervisor(self, monkeypatch, tmp_path): from src.config.file_watcher import FileChange from src.plugin_runtime import integration as integration_module @@ -2266,47 +2266,107 @@ class TestIntegration: manager = integration_module.PluginRuntimeManager() manager._started = True - manager._builtin_supervisor = FakeSupervisor([str(builtin_root)], {"alpha": object()}) - manager._thirdparty_supervisor = FakeSupervisor([str(thirdparty_root)], {"beta": object()}) + manager._builtin_supervisor = FakeSupervisor([builtin_root], {"alpha": object()}) + manager._third_party_supervisor = FakeSupervisor([thirdparty_root], {"beta": object()}) changes = [ - FileChange(change_type=1, path=alpha_dir / "config.toml"), FileChange(change_type=1, path=beta_dir / "plugin.py"), ] - await manager._handle_plugin_file_changes(changes) + refresh_calls = [] + + def fake_refresh() -> None: + refresh_calls.append(True) + + manager._refresh_plugin_config_watch_subscriptions = fake_refresh + + await manager._handle_plugin_source_changes(changes) assert manager._builtin_supervisor.reload_reasons == [] - assert manager._thirdparty_supervisor.reload_reasons == ["file_watcher"] - assert manager._builtin_supervisor.config_updates == [ - ("alpha", {"enabled": True}, "") - ] - assert manager._thirdparty_supervisor.config_updates == [] + assert manager._third_party_supervisor.reload_reasons == ["file_watcher"] + assert manager._builtin_supervisor.config_updates == [] + assert manager._third_party_supervisor.config_updates == [] + assert refresh_calls == [True] @pytest.mark.asyncio - async def test_handle_config_reload_notifies_all_registered_plugins(self): + async def test_handle_plugin_config_changes_only_notify_target_plugin(self, monkeypatch, tmp_path): from src.plugin_runtime import integration as integration_module + from src.config.file_watcher import FileChange + + builtin_root = tmp_path / "src" / "plugins" / "built_in" + thirdparty_root = tmp_path / "plugins" + alpha_dir = builtin_root / "alpha" + beta_dir = thirdparty_root / "beta" + alpha_dir.mkdir(parents=True) + beta_dir.mkdir(parents=True) + (alpha_dir / "config.toml").write_text("enabled = true\n", encoding="utf-8") + (beta_dir / "config.toml").write_text("enabled = false\n", encoding="utf-8") + + monkeypatch.chdir(tmp_path) class FakeSupervisor: - def __init__(self, plugins): + def __init__(self, plugin_dirs, plugins): + self._plugin_dirs = plugin_dirs self._registered_plugins = {plugin_id: object() for plugin_id in plugins} + self.config_updates = [] + + async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""): + self.config_updates.append((plugin_id, config_data, config_version)) + return True manager = integration_module.PluginRuntimeManager() manager._started = True - manager._builtin_supervisor = FakeSupervisor(["alpha"]) - manager._thirdparty_supervisor = FakeSupervisor(["beta", "gamma"]) + manager._builtin_supervisor = FakeSupervisor([builtin_root], ["alpha"]) + manager._third_party_supervisor = FakeSupervisor([thirdparty_root], ["beta"]) - notified = [] + await manager._handle_plugin_config_changes( + "alpha", + [FileChange(change_type=1, path=alpha_dir / "config.toml")], + ) - async def fake_notify(plugin_id, config_data=None, config_version=""): - notified.append((plugin_id, config_version)) - return True + assert manager._builtin_supervisor.config_updates == [("alpha", {"enabled": True}, "")] + assert manager._third_party_supervisor.config_updates == [] - manager.notify_plugin_config_updated = fake_notify + def test_refresh_plugin_config_watch_subscriptions_registers_per_plugin(self, tmp_path): + from src.plugin_runtime import integration as integration_module - await manager.handle_config_reload() + builtin_root = tmp_path / "src" / "plugins" / "built_in" + thirdparty_root = tmp_path / "plugins" + alpha_dir = builtin_root / "alpha" + beta_dir = thirdparty_root / "beta" + alpha_dir.mkdir(parents=True) + beta_dir.mkdir(parents=True) - assert notified == [("alpha", ""), ("beta", ""), ("gamma", "")] + class FakeWatcher: + def __init__(self): + self.subscriptions = [] + self.unsubscribed = [] + + def subscribe(self, callback, *, paths=None, change_types=None): + subscription_id = f"sub-{len(self.subscriptions) + 1}" + self.subscriptions.append({"id": subscription_id, "callback": callback, "paths": tuple(paths or ())}) + return subscription_id + + def unsubscribe(self, subscription_id): + self.unsubscribed.append(subscription_id) + return True + + class FakeSupervisor: + def __init__(self, plugin_dirs, plugins): + self._plugin_dirs = plugin_dirs + self._registered_plugins = {plugin_id: object() for plugin_id in plugins} + + manager = integration_module.PluginRuntimeManager() + manager._plugin_file_watcher = FakeWatcher() + manager._builtin_supervisor = FakeSupervisor([builtin_root], ["alpha"]) + manager._third_party_supervisor = FakeSupervisor([thirdparty_root], ["beta"]) + + manager._refresh_plugin_config_watch_subscriptions() + + assert set(manager._plugin_config_watcher_subscriptions.keys()) == {"alpha", "beta"} + assert { + subscription["paths"][0] for subscription in manager._plugin_file_watcher.subscriptions + } == {alpha_dir / "config.toml", beta_dir / "config.toml"} @pytest.mark.asyncio async def test_component_reload_plugin_returns_failure_when_reload_rolls_back(self, monkeypatch): diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index ffd3b4c6..04c8e324 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -8,14 +8,14 @@ """ from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Coroutine +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Dict, Iterable, List, Optional, Sequence, Tuple import asyncio import json import tomlkit from src.common.logger import get_logger -from src.config.config import config_manager, global_config +from src.config.config import global_config from src.config.file_watcher import FileChange, FileWatcher from src.plugin_runtime.capabilities import ( RuntimeComponentCapabilityMixin, @@ -60,9 +60,9 @@ class PluginRuntimeManager( self._builtin_supervisor: Optional[PluginSupervisor] = None self._third_party_supervisor: Optional[PluginSupervisor] = None self._started: bool = False - self._config_reload_callback_registered: bool = False self._plugin_file_watcher: Optional[FileWatcher] = None - self._plugin_file_watcher_subscription_id: Optional[str] = None + self._plugin_source_watcher_subscription_id: Optional[str] = None + self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {} # ─── 插件目录 ───────────────────────────────────────────── @@ -138,14 +138,12 @@ class PluginRuntimeManager( if self._third_party_supervisor: await self._third_party_supervisor.start() started_supervisors.append(self._third_party_supervisor) - self._register_config_reload_callback() await self._start_plugin_file_watcher() self._started = True logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or '无'}, 第三方: {third_party_dirs or '无'}") except Exception as e: logger.error(f"插件运行时启动失败: {e}", exc_info=True) await self._stop_plugin_file_watcher() - self._unregister_config_reload_callback() await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True) self._started = False self._builtin_supervisor = None @@ -157,7 +155,6 @@ class PluginRuntimeManager( return await self._stop_plugin_file_watcher() - self._unregister_config_reload_callback() coroutines: List[Coroutine[Any, Any, None]] = [] if self._builtin_supervisor: @@ -180,7 +177,7 @@ class PluginRuntimeManager( return self._started @property - def supervisors(self): + def supervisors(self) -> List["PluginSupervisor"]: """获取所有活跃的 Supervisor""" return [s for s in (self._builtin_supervisor, self._third_party_supervisor) if s is not None] @@ -221,22 +218,6 @@ class PluginRuntimeManager( ) return True - async def handle_config_reload(self) -> None: - """响应全局配置热重载,并向当前已注册插件重新推送各自配置。 - - 该回调不负责重载插件运行时本身;它只在 config_manager 完成配置重载后, - 遍历所有已注册插件并触发一次 plugin.config_updated 通知。 - """ - if not self._started: - return - - if tasks := [ - self.notify_plugin_config_updated(plugin_id) - for sv in self.supervisors - for plugin_id in list(sv._registered_plugins.keys()) - ]: - await asyncio.gather(*tasks, return_exceptions=True) - # ─── 事件桥接 ────────────────────────────────────────────── async def bridge_event( @@ -314,10 +295,20 @@ class PluginRuntimeManager( timeout_ms=timeout_ms, ) - def _get_supervisors_for_plugin(self, plugin_id: str): + def _get_supervisors_for_plugin(self, plugin_id: str) -> List["PluginSupervisor"]: + """返回当前持有指定插件的所有 Supervisor。 + + 该辅助函数主要用于检测插件是否被重复注册到多个运行时分组, + 供后续单路由选择和冲突检查使用。 + """ return [supervisor for supervisor in self.supervisors if plugin_id in supervisor._registered_plugins] - def _get_supervisor_for_plugin(self, plugin_id: str): + def _get_supervisor_for_plugin(self, plugin_id: str) -> Optional["PluginSupervisor"]: + """返回负责指定插件的唯一 Supervisor。 + + 如果同一个插件同时出现在多个 Supervisor 中,说明运行时状态异常, + 此时直接抛出错误,避免把请求路由到错误的子进程。 + """ matches = self._get_supervisors_for_plugin(plugin_id) if len(matches) > 1: raise RuntimeError(f"插件 {plugin_id} 同时存在于多个 Supervisor 中,无法安全路由") @@ -325,6 +316,7 @@ class PluginRuntimeManager( @staticmethod def _find_duplicate_plugin_ids(plugin_dirs: List[Path]) -> Dict[str, List[Path]]: + """扫描插件目录,找出被多个目录重复声明的插件 ID。""" plugin_locations: Dict[str, List[Path]] = {} for base_dir in plugin_dirs: if not base_dir.is_dir(): @@ -353,19 +345,8 @@ class PluginRuntimeManager( if len(set(paths)) > 1 } - def _register_config_reload_callback(self) -> None: - if self._config_reload_callback_registered: - return - config_manager.register_reload_callback(self.handle_config_reload) - self._config_reload_callback_registered = True - - def _unregister_config_reload_callback(self) -> None: - if not self._config_reload_callback_registered: - return - config_manager.unregister_reload_callback(self.handle_config_reload) - self._config_reload_callback_registered = False - async def _start_plugin_file_watcher(self) -> None: + """启动插件文件监视器,并建立源码与配置两类订阅。""" if self._plugin_file_watcher is not None and self._plugin_file_watcher.running: return @@ -380,25 +361,111 @@ class PluginRuntimeManager( callback_failure_threshold=3, callback_cooldown_s=30.0, ) - subscription_id = watcher.subscribe(self._handle_plugin_file_changes, paths=watch_paths) + subscription_id = watcher.subscribe(self._handle_plugin_source_changes, paths=watch_paths) await watcher.start() self._plugin_file_watcher = watcher - self._plugin_file_watcher_subscription_id = subscription_id + self._plugin_source_watcher_subscription_id = subscription_id + self._refresh_plugin_config_watch_subscriptions() async def _stop_plugin_file_watcher(self) -> None: + """停止插件文件监视器,并清理所有已注册订阅。""" if self._plugin_file_watcher is None: return - if self._plugin_file_watcher_subscription_id is not None: - self._plugin_file_watcher.unsubscribe(self._plugin_file_watcher_subscription_id) - self._plugin_file_watcher_subscription_id = None + for _plugin_id, (_config_path, subscription_id) in list(self._plugin_config_watcher_subscriptions.items()): + self._plugin_file_watcher.unsubscribe(subscription_id) + self._plugin_config_watcher_subscriptions.clear() + if self._plugin_source_watcher_subscription_id is not None: + self._plugin_file_watcher.unsubscribe(self._plugin_source_watcher_subscription_id) + self._plugin_source_watcher_subscription_id = None await self._plugin_file_watcher.stop() self._plugin_file_watcher = None def _iter_plugin_dirs(self) -> Iterable[Path]: + """迭代所有 Supervisor 当前管理的插件根目录。""" for supervisor in self.supervisors: yield from getattr(supervisor, "_plugin_dirs", []) - async def _handle_plugin_file_changes(self, changes: Sequence[FileChange]) -> None: + def _refresh_plugin_config_watch_subscriptions(self) -> None: + """按当前已注册插件集合刷新 config.toml 的单插件订阅。 + + 当插件热重载后,插件集合或目录位置可能发生变化,因此需要重新对齐 + watcher 的订阅,确保每个插件配置变更只触发对应 plugin_id。 + """ + if self._plugin_file_watcher is None: + return + + desired_config_paths = dict(self._iter_registered_plugin_config_paths()) + + for plugin_id, (_old_path, subscription_id) in list(self._plugin_config_watcher_subscriptions.items()): + if desired_config_paths.get(plugin_id) == self._plugin_config_watcher_subscriptions[plugin_id][0]: + continue + self._plugin_file_watcher.unsubscribe(subscription_id) + del self._plugin_config_watcher_subscriptions[plugin_id] + + for plugin_id, config_path in desired_config_paths.items(): + existing_subscription = self._plugin_config_watcher_subscriptions.get(plugin_id) + if existing_subscription is not None and existing_subscription[0] == config_path: + continue + subscription_id = self._plugin_file_watcher.subscribe( + self._build_plugin_config_change_callback(plugin_id), + paths=[config_path], + ) + self._plugin_config_watcher_subscriptions[plugin_id] = (config_path, subscription_id) + + def _build_plugin_config_change_callback( + self, plugin_id: str + ) -> Callable[[Sequence[FileChange]], Awaitable[None]]: + """为指定插件生成配置文件变更回调。""" + + async def _callback(changes: Sequence[FileChange]) -> None: + await self._handle_plugin_config_changes(plugin_id, changes) + + return _callback + + def _iter_registered_plugin_config_paths(self) -> Iterable[Tuple[str, Path]]: + """迭代当前所有已注册插件的 config.toml 路径。""" + for supervisor in self.supervisors: + for plugin_id in getattr(supervisor, "_registered_plugins", {}).keys(): + if config_path := self._get_plugin_config_path_for_supervisor(supervisor, plugin_id): + yield plugin_id, config_path + + def _get_plugin_config_path_for_supervisor(self, supervisor: Any, plugin_id: str) -> Optional[Path]: + """从指定 Supervisor 的插件目录中定位某个插件的 config.toml。""" + for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + plugin_dir = Path(plugin_dir) + plugin_path = plugin_dir.resolve() / plugin_id + if plugin_path.is_dir(): + return plugin_path / "config.toml" + return None + + async def _handle_plugin_config_changes(self, plugin_id: str, changes: Sequence[FileChange]) -> None: + """处理单个插件配置文件变化,并仅向目标插件推送配置更新。""" + if not self._started or not changes: + return + + try: + supervisor = self._get_supervisor_for_plugin(plugin_id) + except RuntimeError as exc: + logger.warning(f"插件 {plugin_id} 配置监听匹配失败: {exc}") + return + + if supervisor is None: + return + + try: + await supervisor.notify_plugin_config_updated( + plugin_id=plugin_id, + config_data=self._load_plugin_config_for_supervisor(plugin_id, getattr(supervisor, "_plugin_dirs", [])), + ) + except Exception as exc: + logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}") + + async def _handle_plugin_source_changes(self, changes: Sequence[FileChange]) -> None: + """处理插件源码相关变化。 + + 这里仅负责源码、清单等会影响插件装载状态的文件;配置文件的变化会由 + 单独的 per-plugin watcher 处理,避免把单插件配置更新放大成全量 reload。 + """ if not self._started or not changes: return @@ -411,55 +478,39 @@ class PluginRuntimeManager( return reload_supervisors: List[Any] = [] - config_updates: Dict[int, set[str]] = {} changed_paths = [change.path.resolve() for change in changes] for supervisor in self.supervisors: - plugin_ids_for_config = config_updates.setdefault(self._get_supervisor_key(supervisor), set()) for path in changed_paths: plugin_id = self._match_plugin_id_for_supervisor(supervisor, path) if plugin_id is None: continue - if path.name == "config.toml": - plugin_ids_for_config.add(plugin_id) - elif path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py": - if supervisor not in reload_supervisors: - reload_supervisors.append(supervisor) + if (path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py") and supervisor not in reload_supervisors: + reload_supervisors.append(supervisor) for supervisor in reload_supervisors: await supervisor.reload_plugins(reason="file_watcher") - for supervisor in self.supervisors: - if supervisor in reload_supervisors: - continue - for plugin_id in config_updates.get(self._get_supervisor_key(supervisor), set()): - try: - await supervisor.notify_plugin_config_updated( - plugin_id=plugin_id, - config_data=self._load_plugin_config_for_supervisor( - plugin_id, getattr(supervisor, "_plugin_dirs", []) - ), - ) - except Exception as exc: - logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}") - - @staticmethod - def _get_supervisor_key(supervisor: Any) -> int: - return id(supervisor) + if reload_supervisors: + self._refresh_plugin_config_watch_subscriptions() @staticmethod def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool: + """判断某个文件路径是否落在指定插件根目录内。""" plugin_root = plugin_dir.resolve() return path == plugin_root or path.is_relative_to(plugin_root) def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]: + """根据变更路径为指定 Supervisor 推断受影响的插件 ID。""" for plugin_id, _reg in getattr(supervisor, "_registered_plugins", {}).items(): for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + plugin_dir = Path(plugin_dir) candidate_dir = plugin_dir.resolve() / plugin_id if path == candidate_dir or path.is_relative_to(candidate_dir): return plugin_id for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + plugin_dir = Path(plugin_dir) plugin_root = plugin_dir.resolve() if self._plugin_dir_matches(path, plugin_dir) and (relative_parts := path.relative_to(plugin_root).parts): return relative_parts[0] @@ -467,6 +518,7 @@ class PluginRuntimeManager( @staticmethod def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[Path]) -> Dict[str, Any]: + """从给定插件目录集合中读取目标插件的配置内容。""" for plugin_dir in plugin_dirs: plugin_path = plugin_dir.resolve() / plugin_id if plugin_path.is_dir(): diff --git a/src/webui/routers/plugin/config_routes.py b/src/webui/routers/plugin/config_routes.py index a84362bf..128e86a8 100644 --- a/src/webui/routers/plugin/config_routes.py +++ b/src/webui/routers/plugin/config_routes.py @@ -206,7 +206,7 @@ async def update_plugin_config_raw( file_obj.write(request.config) logger.info(f"已更新插件原始配置: {plugin_id}") - return {"success": True, "message": "配置已保存", "note": "配置更改将在插件重新加载后生效"} + return {"success": True, "message": "配置已保存", "note": "配置更改将自动热更新到对应插件"} except HTTPException: raise except Exception as e: @@ -266,7 +266,7 @@ async def update_plugin_config( save_toml_with_format(config_data, str(config_path)) logger.info(f"已更新插件配置: {plugin_id}") - return {"success": True, "message": "配置已保存", "note": "配置更改将在插件重新加载后生效"} + return {"success": True, "message": "配置已保存", "note": "配置更改将自动热更新到对应插件"} except HTTPException: raise except Exception as e: @@ -290,7 +290,7 @@ async def reset_plugin_config(plugin_id: str, maibot_session: Optional[str] = Co backup_path = backup_file(config_path, "reset", move_file=True) logger.info(f"已重置插件配置: {plugin_id},备份: {backup_path}") - return {"success": True, "message": "配置已重置,下次加载插件时将使用默认配置", "backup": str(backup_path)} + return {"success": True, "message": "配置已重置,运行时将自动刷新为默认配置", "backup": str(backup_path)} except HTTPException: raise except Exception as e: