From cab502e727f2b4f3f8afd1e5bcd2e8000d056f00 Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sun, 15 Mar 2026 23:48:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8B=BC=E5=86=99=EF=BC=9B?= =?UTF-8?q?=E5=B0=86str=E6=8D=A2=E4=B8=BA=E8=B7=AF=E5=BE=84=E4=BD=BF?= =?UTF-8?q?=E5=85=B6=E6=9B=B4=E6=B8=85=E6=99=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 1 + src/plugin_runtime/capabilities/components.py | 13 +- src/plugin_runtime/host/supervisor.py | 5 +- src/plugin_runtime/integration.py | 139 +++++++++--------- 4 files changed, 80 insertions(+), 78 deletions(-) diff --git a/bot.py b/bot.py index 68acbcd2..ac3f0a1c 100644 --- a/bot.py +++ b/bot.py @@ -8,6 +8,7 @@ import asyncio import hashlib import os import platform + # import shutil import subprocess import sys diff --git a/src/plugin_runtime/capabilities/components.py b/src/plugin_runtime/capabilities/components.py index 61bb0e39..aa7ceb46 100644 --- a/src/plugin_runtime/capabilities/components.py +++ b/src/plugin_runtime/capabilities/components.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Protocol from src.common.logger import get_logger @@ -19,9 +20,9 @@ class _RuntimeComponentManagerProtocol(Protocol): self, name: str, component_type: str ) -> tuple[Optional["RegisteredComponent"], Optional[str]]: ... - def _find_duplicate_plugin_ids(self, plugin_dirs: List[str]) -> Dict[str, List[str]]: ... + def _find_duplicate_plugin_ids(self, plugin_dirs: List[Path]) -> Dict[str, List[Path]]: ... - def _iter_plugin_dirs(self) -> Iterable[str]: ... + def _iter_plugin_dirs(self) -> Iterable[Path]: ... class RuntimeComponentCapabilityMixin: @@ -159,11 +160,9 @@ class RuntimeComponentCapabilityMixin: if not plugin_name: return {"success": False, "error": "缺少必要参数 plugin_name"} - import os - if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): details = "; ".join( - f"{conflict_plugin_id}: {', '.join(paths)}" + f"{conflict_plugin_id}: {', '.join(str(path) for path in paths)}" for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items()) ) return {"success": False, "error": f"检测到重复插件 ID,拒绝热重载: {details}"} @@ -185,7 +184,7 @@ class RuntimeComponentCapabilityMixin: for sv in self.supervisors: for pdir in sv._plugin_dirs: - if os.path.isdir(os.path.join(pdir, plugin_name)): + if (pdir / plugin_name).is_dir(): try: reloaded = await sv.reload_plugins(reason=f"load {plugin_name}") if reloaded: @@ -211,7 +210,7 @@ class RuntimeComponentCapabilityMixin: if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): details = "; ".join( - f"{conflict_plugin_id}: {', '.join(paths)}" + f"{conflict_plugin_id}: {', '.join(str(path) for path in paths)}" for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items()) ) return {"success": False, "error": f"检测到重复插件 ID,拒绝热重载: {details}"} diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index fe969213..bfa00cbf 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -14,6 +14,7 @@ import contextlib import logging as stdlib_logging import os import sys +from pathlib import Path from src.common.logger import get_logger from src.config.config import MMC_VERSION, global_config @@ -95,7 +96,7 @@ class PluginSupervisor: def __init__( self, - plugin_dirs: Optional[List[str]] = None, + plugin_dirs: Optional[List[Path]] = None, socket_path: Optional[str] = None, health_check_interval_sec: Optional[float] = None, max_restart_attempts: Optional[int] = None, @@ -520,7 +521,7 @@ class PluginSupervisor: env = os.environ.copy() env[ENV_IPC_ADDRESS] = address env[ENV_SESSION_TOKEN] = token - env[ENV_PLUGIN_DIRS] = os.pathsep.join(self._plugin_dirs) + env[ENV_PLUGIN_DIRS] = os.pathsep.join(str(p) for p in self._plugin_dirs) env[ENV_HOST_VERSION] = MMC_VERSION self._runner_process = await asyncio.create_subprocess_exec( diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 125aad8d..acb96abd 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -8,11 +8,11 @@ """ from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Coroutine import asyncio import json -import os +import tomlkit from src.common.logger import get_logger from src.config.config import config_manager, global_config @@ -58,7 +58,7 @@ class PluginRuntimeManager( from src.plugin_runtime.host.supervisor import PluginSupervisor self._builtin_supervisor: Optional[PluginSupervisor] = None - self._thirdparty_supervisor: Optional[PluginSupervisor] = None + self._third_party_supervisor: Optional[PluginSupervisor] = None self._started: bool = False self._config_reload_callback_registered: bool = False self._plugin_file_watcher: Optional[FileWatcher] = None @@ -67,16 +67,16 @@ class PluginRuntimeManager( # ─── 插件目录 ───────────────────────────────────────────── @staticmethod - def _get_builtin_plugin_dirs() -> List[str]: - """内置插件目录: src/plugins/built_in/""" - candidate = os.path.abspath(os.path.join("src", "plugins", "built_in")) - return [candidate] if os.path.isdir(candidate) else [] + def _get_builtin_plugin_dirs() -> List[Path]: + """内置插件目录:src/plugins/built_in/""" + candidate = Path("src", "plugins", "built_in").resolve() + return [candidate] if candidate.is_dir() else [] @staticmethod - def _get_thirdparty_plugin_dirs() -> List[str]: - """第三方插件目录: plugins/""" - candidate = os.path.abspath("plugins") - return [candidate] if os.path.isdir(candidate) else [] + def _get_third_party_plugin_dirs() -> List[Path]: + """第三方插件目录:plugins/""" + candidate = Path("plugins").resolve() + return [candidate] if candidate.is_dir() else [] # ─── 生命周期 ───────────────────────────────────────────── @@ -94,16 +94,17 @@ class PluginRuntimeManager( from src.plugin_runtime.host.supervisor import PluginSupervisor builtin_dirs = self._get_builtin_plugin_dirs() - thirdparty_dirs = self._get_thirdparty_plugin_dirs() + third_party_dirs = self._get_third_party_plugin_dirs() - if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + thirdparty_dirs): + if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs): details = "; ".join( - f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items()) + f"{plugin_id}: {', '.join(str(p) for p in paths)}" + for plugin_id, paths in sorted(duplicate_plugin_ids.items()) ) logger.error(f"检测到重复插件 ID,拒绝启动插件运行时: {details}") return - if not builtin_dirs and not thirdparty_dirs: + if not builtin_dirs and not third_party_dirs: logger.info("未找到任何插件目录,跳过插件运行时启动") return @@ -112,7 +113,7 @@ class PluginRuntimeManager( # 当用户指定了自定义路径时,为两个 Supervisor 添加后缀以避免 UDS 冲突 builtin_socket = f"{socket_path_base}-builtin" if socket_path_base else None - thirdparty_socket = f"{socket_path_base}-thirdparty" if socket_path_base else None + third_party_socket = f"{socket_path_base}-third_party" if socket_path_base else None # 创建两个 Supervisor,各自拥有独立的 socket / Runner 子进程 if builtin_dirs: @@ -122,25 +123,25 @@ class PluginRuntimeManager( ) self._register_capability_impls(self._builtin_supervisor) - if thirdparty_dirs: - self._thirdparty_supervisor = PluginSupervisor( - plugin_dirs=thirdparty_dirs, - socket_path=thirdparty_socket, + if third_party_dirs: + self._third_party_supervisor = PluginSupervisor( + plugin_dirs=third_party_dirs, + socket_path=third_party_socket, ) - self._register_capability_impls(self._thirdparty_supervisor) + self._register_capability_impls(self._third_party_supervisor) - started_supervisors = [] + started_supervisors: List[PluginSupervisor] = [] try: if self._builtin_supervisor: await self._builtin_supervisor.start() started_supervisors.append(self._builtin_supervisor) - if self._thirdparty_supervisor: - await self._thirdparty_supervisor.start() - started_supervisors.append(self._thirdparty_supervisor) + if self._third_party_supervisor: + await self._third_party_supervisor.start() + started_supervisors.append(self._third_party_supervisor) self._register_config_reload_callback() await self._start_plugin_file_watcher() self._started = True - logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or '无'}, 第三方: {thirdparty_dirs or '无'}") + logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or '无'}, 第三方: {third_party_dirs or '无'}") except Exception as e: logger.error(f"插件运行时启动失败: {e}", exc_info=True) await self._stop_plugin_file_watcher() @@ -148,7 +149,7 @@ class PluginRuntimeManager( await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True) self._started = False self._builtin_supervisor = None - self._thirdparty_supervisor = None + self._third_party_supervisor = None async def stop(self) -> None: """停止所有插件运行时""" @@ -158,30 +159,30 @@ class PluginRuntimeManager( await self._stop_plugin_file_watcher() self._unregister_config_reload_callback() - coros = [] + coroutines: List[Coroutine[Any, Any, None]] = [] if self._builtin_supervisor: - coros.append(self._builtin_supervisor.stop()) - if self._thirdparty_supervisor: - coros.append(self._thirdparty_supervisor.stop()) + coroutines.append(self._builtin_supervisor.stop()) + if self._third_party_supervisor: + coroutines.append(self._third_party_supervisor.stop()) try: - await asyncio.gather(*coros, return_exceptions=True) + await asyncio.gather(*coroutines, return_exceptions=True) logger.info("插件运行时已停止") except Exception as e: logger.error(f"插件运行时停止失败: {e}", exc_info=True) finally: self._started = False self._builtin_supervisor = None - self._thirdparty_supervisor = None + self._third_party_supervisor = None @property def is_running(self) -> bool: return self._started @property - def supervisors(self) -> List[Any]: + def supervisors(self): """获取所有活跃的 Supervisor""" - return [s for s in (self._builtin_supervisor, self._thirdparty_supervisor) if s is not None] + return [s for s in (self._builtin_supervisor, self._third_party_supervisor) if s is not None] async def notify_plugin_config_updated( self, @@ -189,7 +190,13 @@ class PluginRuntimeManager( config_data: Optional[Dict[str, Any]] = None, config_version: str = "", ) -> bool: - """向拥有该插件的 Supervisor 推送配置更新事件。""" + """向拥有该插件的 Supervisor 推送配置更新事件。 + + Args: + plugin_id: 插件 ID + config_data: 可选的配置数据(如果为 None 则由 Supervisor 从磁盘加载) + config_version: 可选的配置版本字符串,供 Supervisor 进行版本控制 + """ if not self._started: return False @@ -205,7 +212,7 @@ class PluginRuntimeManager( config_payload = ( config_data if config_data is not None - else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", [])) + else self._load_plugin_config_for_supervisor(plugin_id, plugin_dirs=sv._plugin_dirs) ) await sv.notify_plugin_config_updated( plugin_id=plugin_id, @@ -303,46 +310,41 @@ class PluginRuntimeManager( timeout_ms=timeout_ms, ) - def _get_supervisors_for_plugin(self, plugin_id: str) -> List[Any]: - return [ - supervisor - for supervisor in self.supervisors - if plugin_id in getattr(supervisor, "_registered_plugins", {}) - ] + def _get_supervisors_for_plugin(self, plugin_id: str): + return [supervisor for supervisor in self.supervisors if plugin_id in supervisor._registered_plugins] - def _get_supervisor_for_plugin(self, plugin_id: str) -> Optional[Any]: + def _get_supervisor_for_plugin(self, plugin_id: str): matches = self._get_supervisors_for_plugin(plugin_id) if len(matches) > 1: raise RuntimeError(f"插件 {plugin_id} 同时存在于多个 Supervisor 中,无法安全路由") return matches[0] if matches else None @staticmethod - def _find_duplicate_plugin_ids(plugin_dirs: List[str]) -> Dict[str, List[str]]: - plugin_locations: Dict[str, List[str]] = {} + def _find_duplicate_plugin_ids(plugin_dirs: List[Path]) -> Dict[str, List[Path]]: + plugin_locations: Dict[str, List[Path]] = {} for base_dir in plugin_dirs: - if not os.path.isdir(base_dir): + if not base_dir.is_dir(): continue - for entry in os.listdir(base_dir): - plugin_dir = os.path.join(base_dir, entry) - if not os.path.isdir(plugin_dir): + for entry in base_dir.iterdir(): + if not entry.is_dir(): continue - manifest_path = os.path.join(plugin_dir, "_manifest.json") - plugin_path = os.path.join(plugin_dir, "plugin.py") - if not os.path.exists(manifest_path) or not os.path.exists(plugin_path): + manifest_path = entry / "_manifest.json" + plugin_path = entry / "plugin.py" + if not manifest_path.exists() or not plugin_path.exists(): continue - plugin_id = entry + plugin_id = entry.name try: with open(manifest_path, "r", encoding="utf-8") as manifest_file: manifest = json.load(manifest_file) - plugin_id = str(manifest.get("name", entry)).strip() or entry + plugin_id = str(manifest.get("name", entry.name)).strip() or entry.name except Exception: continue - plugin_locations.setdefault(plugin_id, []).append(plugin_dir) + plugin_locations.setdefault(plugin_id, []).append(entry) return { - plugin_id: sorted(dict.fromkeys(paths)) + plugin_id: sorted(dict.fromkeys(paths), key=lambda p: str(p)) for plugin_id, paths in plugin_locations.items() if len(set(paths)) > 1 } @@ -363,7 +365,7 @@ class PluginRuntimeManager( if self._plugin_file_watcher is not None and self._plugin_file_watcher.running: return - watch_paths = [Path(path).resolve() for path in self._iter_plugin_dirs() if os.path.isdir(path)] + watch_paths = [path.resolve() for path in self._iter_plugin_dirs() if path.is_dir()] if not watch_paths: return @@ -388,7 +390,7 @@ class PluginRuntimeManager( await self._plugin_file_watcher.stop() self._plugin_file_watcher = None - def _iter_plugin_dirs(self) -> Iterable[str]: + def _iter_plugin_dirs(self) -> Iterable[Path]: for supervisor in self.supervisors: yield from getattr(supervisor, "_plugin_dirs", []) @@ -398,7 +400,8 @@ class PluginRuntimeManager( if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): details = "; ".join( - f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items()) + f"{plugin_id}: {', '.join(str(path) for path in paths)}" + for plugin_id, paths in sorted(duplicate_plugin_ids.items()) ) logger.error(f"检测到重复插件 ID,跳过本次插件热重载: {details}") return @@ -441,35 +444,33 @@ class PluginRuntimeManager( return id(supervisor) @staticmethod - def _plugin_dir_matches(path: Path, plugin_dir: str) -> bool: - plugin_root = Path(plugin_dir).resolve() + def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool: + plugin_root = plugin_dir.resolve() return path == plugin_root or path.is_relative_to(plugin_root) def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]: for plugin_id, _reg in getattr(supervisor, "_registered_plugins", {}).items(): for plugin_dir in getattr(supervisor, "_plugin_dirs", []): - candidate_dir = Path(plugin_dir).resolve() / plugin_id + candidate_dir = plugin_dir.resolve() / plugin_id if path == candidate_dir or path.is_relative_to(candidate_dir): return plugin_id for plugin_dir in getattr(supervisor, "_plugin_dirs", []): - plugin_root = Path(plugin_dir).resolve() + plugin_root = plugin_dir.resolve() if self._plugin_dir_matches(path, plugin_dir) and (relative_parts := path.relative_to(plugin_root).parts): return relative_parts[0] return None @staticmethod - def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[str]) -> Dict[str, Any]: - import tomlkit - + def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[Path]) -> Dict[str, Any]: for plugin_dir in plugin_dirs: - plugin_path = Path(plugin_dir).resolve() / plugin_id + plugin_path = plugin_dir.resolve() / plugin_id if plugin_path.is_dir(): config_path = plugin_path / "config.toml" if not config_path.exists(): return {} with open(config_path, "r", encoding="utf-8") as handle: - return dict(tomlkit.load(handle)) + return tomlkit.load(handle).unwrap() return {} # ─── 能力实现注册 ──────────────────────────────────────────