From d92aa800a3f9e74c54a218c1a86257ccfbfd7127 Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Fri, 13 Mar 2026 15:47:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=9B=B4=E6=96=B0=E9=80=9A=E7=9F=A5=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E6=94=AF=E6=8C=81=E7=83=AD=E9=87=8D=E8=BD=BD?= =?UTF-8?q?=E4=B8=8E=E6=96=87=E4=BB=B6=E5=8F=98=E5=8C=96=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytests/test_plugin_runtime.py | 76 ++++++++++ src/plugin_runtime/host/supervisor.py | 24 ++++ src/plugin_runtime/integration.py | 175 +++++++++++++++++++++++- src/plugin_runtime/protocol/envelope.py | 6 +- 4 files changed, 276 insertions(+), 5 deletions(-) diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index b00c6142..ac1f5b9e 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -1690,3 +1690,79 @@ class TestIntegration: assert manager.is_running is False assert len(instances) == 2 assert instances[0].stopped is True + + @pytest.mark.asyncio + async def test_handle_plugin_file_changes_routes_reload_and_config_update(self, monkeypatch, tmp_path): + from pathlib import Path + + from src.config.file_watcher import FileChange + from src.plugin_runtime import integration as integration_module + + builtin_root = tmp_path / "src" / "plugins" / "built_in" + thirdparty_root = tmp_path / "plugins" + alpha_dir = builtin_root / "alpha" + beta_dir = thirdparty_root / "beta" + alpha_dir.mkdir(parents=True) + beta_dir.mkdir(parents=True) + (alpha_dir / "config.toml").write_text("enabled = true\n", encoding="utf-8") + (beta_dir / "config.toml").write_text("enabled = false\n", encoding="utf-8") + + monkeypatch.chdir(tmp_path) + + class FakeSupervisor: + def __init__(self, plugin_dirs, registered_plugins): + self._plugin_dirs = plugin_dirs + self._registered_plugins = registered_plugins + self.reload_reasons = [] + self.config_updates = [] + + async def reload_plugins(self, reason="manual"): + self.reload_reasons.append(reason) + + async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""): + self.config_updates.append((plugin_id, config_data, config_version)) + return True + + manager = integration_module.PluginRuntimeManager() + manager._started = True + manager._builtin_supervisor = FakeSupervisor([str(builtin_root)], {"alpha": object()}) + manager._thirdparty_supervisor = FakeSupervisor([str(thirdparty_root)], {"beta": object()}) + + changes = [ + FileChange(change_type=1, path=alpha_dir / "config.toml"), + FileChange(change_type=1, path=beta_dir / "plugin.py"), + ] + + await manager._handle_plugin_file_changes(changes) + + assert manager._builtin_supervisor.reload_reasons == [] + assert manager._thirdparty_supervisor.reload_reasons == ["file_watcher"] + assert manager._builtin_supervisor.config_updates == [ + ("alpha", {"enabled": True}, "") + ] + assert manager._thirdparty_supervisor.config_updates == [] + + @pytest.mark.asyncio + async def test_handle_config_reload_notifies_all_registered_plugins(self): + from src.plugin_runtime import integration as integration_module + + class FakeSupervisor: + def __init__(self, plugins): + self._registered_plugins = {plugin_id: object() for plugin_id in plugins} + + manager = integration_module.PluginRuntimeManager() + manager._started = True + manager._builtin_supervisor = FakeSupervisor(["alpha"]) + manager._thirdparty_supervisor = FakeSupervisor(["beta", "gamma"]) + + notified = [] + + async def fake_notify(plugin_id, config_data=None, config_version=""): + notified.append((plugin_id, config_version)) + return True + + manager.notify_plugin_config_updated = fake_notify + + await manager.handle_config_reload() + + assert notified == [("alpha", ""), ("beta", ""), ("gamma", "")] diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 1ed44e5a..13ef4ff0 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -25,6 +25,7 @@ from src.plugin_runtime.host.policy_engine import PolicyEngine from src.plugin_runtime.host.rpc_server import RPCServer from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, WorkflowContext, WorkflowResult from src.plugin_runtime.protocol.envelope import ( + ConfigUpdatedPayload, Envelope, HealthPayload, LogBatchPayload, @@ -375,6 +376,29 @@ class PluginSupervisor: logger.info("热重载完成") + async def notify_plugin_config_updated( + self, + plugin_id: str, + config_data: Dict[str, Any], + config_version: str = "", + ) -> bool: + """通知指定插件其配置已更新。""" + if plugin_id not in self._registered_plugins: + return False + + payload = ConfigUpdatedPayload( + plugin_id=plugin_id, + config_version=config_version, + config_data=config_data, + ) + await self._rpc_server.send_request( + "plugin.config_updated", + plugin_id=plugin_id, + payload=payload.model_dump(), + timeout_ms=5000, + ) + return True + # ─── 内部方法 ────────────────────────────────────────────── def _register_internal_methods(self) -> None: diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index d4e02b6b..e8d25eaf 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -7,14 +7,16 @@ 4. 提供统一的能力实现注册接口,使插件可以调用主程序功能 """ -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple import asyncio import os +from pathlib import Path from src.chat.message_receive.chat_manager import BotChatSession from src.common.logger import get_logger -from src.config.config import global_config +from src.config.config import config_manager, global_config +from src.config.file_watcher import FileChange, FileWatcher logger = get_logger("plugin_runtime.integration") @@ -45,6 +47,9 @@ class PluginRuntimeManager: self._builtin_supervisor: Optional[PluginSupervisor] = None self._thirdparty_supervisor: Optional[PluginSupervisor] = None self._started: bool = False + self._config_reload_callback_registered: bool = False + self._plugin_file_watcher: Optional[FileWatcher] = None + self._plugin_file_watcher_subscription_id: Optional[str] = None # ─── 插件目录 ───────────────────────────────────────────── @@ -112,10 +117,14 @@ class PluginRuntimeManager: if self._thirdparty_supervisor: await self._thirdparty_supervisor.start() started_supervisors.append(self._thirdparty_supervisor) + self._register_config_reload_callback() + await self._start_plugin_file_watcher() self._started = True logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or '无'}, 第三方: {thirdparty_dirs or '无'}") except Exception as e: logger.error(f"插件运行时启动失败: {e}", exc_info=True) + await self._stop_plugin_file_watcher() + self._unregister_config_reload_callback() await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True) self._started = False self._builtin_supervisor = None @@ -126,6 +135,9 @@ class PluginRuntimeManager: if not self._started: return + await self._stop_plugin_file_watcher() + self._unregister_config_reload_callback() + coros = [] if self._builtin_supervisor: coros.append(self._builtin_supervisor.stop()) @@ -151,6 +163,44 @@ class PluginRuntimeManager: """获取所有活跃的 Supervisor""" return [s for s in (self._builtin_supervisor, self._thirdparty_supervisor) if s is not None] + async def notify_plugin_config_updated( + self, + plugin_id: str, + config_data: Optional[Dict[str, Any]] = None, + config_version: str = "", + ) -> bool: + """向拥有该插件的 Supervisor 推送配置更新事件。""" + if not self._started: + return False + + for sv in self.supervisors: + if plugin_id in sv._registered_plugins: + config_payload = ( + config_data + if config_data is not None + else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", [])) + ) + await sv.notify_plugin_config_updated( + plugin_id=plugin_id, + config_data=config_payload, + config_version=config_version, + ) + return True + return False + + async def handle_config_reload(self) -> None: + """处理主配置热重载后的插件配置通知。""" + if not self._started: + return + + tasks = [ + self.notify_plugin_config_updated(plugin_id) + for sv in self.supervisors + for plugin_id in list(sv._registered_plugins.keys()) + ] + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + # ─── 事件桥接 ────────────────────────────────────────────── async def bridge_event( @@ -228,6 +278,127 @@ class PluginRuntimeManager: ) raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册") + def _register_config_reload_callback(self) -> None: + if self._config_reload_callback_registered: + return + config_manager.register_reload_callback(self.handle_config_reload) + self._config_reload_callback_registered = True + + def _unregister_config_reload_callback(self) -> None: + if not self._config_reload_callback_registered: + return + config_manager.unregister_reload_callback(self.handle_config_reload) + self._config_reload_callback_registered = False + + async def _start_plugin_file_watcher(self) -> None: + if self._plugin_file_watcher is not None and self._plugin_file_watcher.running: + return + + watch_paths = [Path(path).resolve() for path in self._iter_plugin_dirs() if os.path.isdir(path)] + if not watch_paths: + return + + watcher = FileWatcher( + paths=watch_paths, + debounce_ms=600, + callback_timeout_s=15.0, + callback_failure_threshold=3, + callback_cooldown_s=30.0, + ) + subscription_id = watcher.subscribe(self._handle_plugin_file_changes, paths=watch_paths) + await watcher.start() + self._plugin_file_watcher = watcher + self._plugin_file_watcher_subscription_id = subscription_id + + async def _stop_plugin_file_watcher(self) -> None: + if self._plugin_file_watcher is None: + return + if self._plugin_file_watcher_subscription_id is not None: + self._plugin_file_watcher.unsubscribe(self._plugin_file_watcher_subscription_id) + self._plugin_file_watcher_subscription_id = None + await self._plugin_file_watcher.stop() + self._plugin_file_watcher = None + + def _iter_plugin_dirs(self) -> Iterable[str]: + for supervisor in self.supervisors: + for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + yield plugin_dir + + async def _handle_plugin_file_changes(self, changes: List[FileChange]) -> None: + if not self._started or not changes: + return + + reload_supervisors: List[Any] = [] + config_updates: Dict[str, set[str]] = {} + changed_paths = [change.path.resolve() for change in changes] + + for supervisor in self.supervisors: + plugin_ids_for_config = config_updates.setdefault(self._get_supervisor_key(supervisor), set()) + for path in changed_paths: + plugin_id = self._match_plugin_id_for_supervisor(supervisor, path) + if plugin_id is None: + continue + if path.name == "config.toml": + plugin_ids_for_config.add(plugin_id) + elif path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py": + if supervisor not in reload_supervisors: + reload_supervisors.append(supervisor) + + for supervisor in reload_supervisors: + await supervisor.reload_plugins(reason="file_watcher") + + for supervisor in self.supervisors: + if supervisor in reload_supervisors: + continue + for plugin_id in config_updates.get(self._get_supervisor_key(supervisor), set()): + try: + await supervisor.notify_plugin_config_updated( + plugin_id=plugin_id, + config_data=self._load_plugin_config_for_supervisor( + plugin_id, getattr(supervisor, "_plugin_dirs", []) + ), + ) + except Exception as exc: + logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}") + + @staticmethod + def _get_supervisor_key(supervisor: Any) -> str: + return str(id(supervisor)) + + @staticmethod + def _plugin_dir_matches(path: Path, plugin_dir: str) -> bool: + plugin_root = Path(plugin_dir).resolve() + return path == plugin_root or path.is_relative_to(plugin_root) + + def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]: + for plugin_id, reg in getattr(supervisor, "_registered_plugins", {}).items(): + for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + candidate_dir = Path(plugin_dir).resolve() / plugin_id + if path == candidate_dir or path.is_relative_to(candidate_dir): + return plugin_id + + for plugin_dir in getattr(supervisor, "_plugin_dirs", []): + plugin_root = Path(plugin_dir).resolve() + if self._plugin_dir_matches(path, plugin_dir): + relative_parts = path.relative_to(plugin_root).parts + if relative_parts: + return relative_parts[0] + return None + + @staticmethod + def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[str]) -> Dict[str, Any]: + import tomlkit + + for plugin_dir in plugin_dirs: + plugin_path = Path(plugin_dir).resolve() / plugin_id + if plugin_path.is_dir(): + config_path = plugin_path / "config.toml" + if not config_path.exists(): + return {} + with open(config_path, "r", encoding="utf-8") as handle: + return dict(tomlkit.load(handle)) + return {} + # ─── 能力实现注册 ────────────────────────────────────────── def _register_capability_impls(self, supervisor: Any) -> None: diff --git a/src/plugin_runtime/protocol/envelope.py b/src/plugin_runtime/protocol/envelope.py index 61a45ed8..4ff72c81 100644 --- a/src/plugin_runtime/protocol/envelope.py +++ b/src/plugin_runtime/protocol/envelope.py @@ -194,9 +194,9 @@ class HealthPayload(BaseModel): # ─── 配置更新 ────────────────────────────────────────────────────── -# TODO: Host 侧尚未实现配置变更检测与推送。Runner 端的 _handle_config_updated -# 已就绪,但当前无任何调用方通过 RPC 发送 plugin.config_updated 消息。 -# 需要在 Supervisor 或 CapabilityService 中监听配置文件变化并主动推送。 +# Host 侧现已支持配置更新推送: +# - 总配置热重载完成后,PluginRuntimeManager 会向已加载插件推送配置更新事件。 +# - 插件目录下的 config.toml 变化由现有 FileWatcher 监听并转发为 plugin.config_updated。 class ConfigUpdatedPayload(BaseModel): """plugin.config_updated 事件 payload"""