From df088205dd68594e0733b0c9558ca3245a540299 Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Mon, 16 Mar 2026 08:22:42 +0800 Subject: [PATCH] =?UTF-8?q?refactor(plugin=5Floader):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E5=8F=91=E7=8E=B0=E4=B8=8E=E5=8A=A0=E8=BD=BD?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BD=BF=E7=94=A8=20Path=20?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E5=A2=9E=E5=BC=BA=E5=8F=AF=E8=AF=BB=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugin_runtime/runner/plugin_loader.py | 133 ++++++++++++--------- 1 file changed, 79 insertions(+), 54 deletions(-) diff --git a/src/plugin_runtime/runner/plugin_loader.py b/src/plugin_runtime/runner/plugin_loader.py index 1b055dfe..11ba45e7 100644 --- a/src/plugin_runtime/runner/plugin_loader.py +++ b/src/plugin_runtime/runner/plugin_loader.py @@ -7,7 +7,8 @@ """ from collections import deque -from typing import Any, Dict, List, Optional, Set, Tuple +from pathlib import Path +from typing import Any, Dict, Iterator, List, Optional, Set, Tuple import contextlib import importlib @@ -21,6 +22,8 @@ from src.plugin_runtime.runner.manifest_validator import ManifestValidator logger = get_logger("plugin_runtime.runner.plugin_loader") +PluginCandidate = Tuple[Path, Dict[str, Any], Path] + class PluginMeta: """加载后的插件元数据""" @@ -77,66 +80,92 @@ class PluginLoader: Returns: 成功加载的插件元数据列表(按依赖顺序) """ - # 第一阶段:发现并校验 manifest - candidates: Dict[str, Tuple[str, Dict[str, Any], str]] = {} # id -> (dir, manifest, plugin_path) - duplicate_candidates: Dict[str, List[str]] = {} - for base_dir in plugin_dirs: - if not os.path.isdir(base_dir): + candidates, duplicate_candidates = self._discover_candidates(plugin_dirs) + self._record_duplicate_candidates(duplicate_candidates) + + # 第二阶段:依赖解析(拓扑排序) + load_order, failed_deps = self._resolve_dependencies(candidates) + self._record_failed_dependencies(failed_deps) + + # 第三阶段:按依赖顺序加载 + return self._load_plugins_in_order(load_order, candidates) + + def _discover_candidates(self, plugin_dirs: List[str]) -> Tuple[Dict[str, PluginCandidate], Dict[str, List[Path]]]: + """扫描插件目录并收集候选插件。""" + candidates: Dict[str, PluginCandidate] = {} + duplicate_candidates: Dict[str, List[Path]] = {} + + for base_dir_str in plugin_dirs: + base_dir = Path(base_dir_str) + if not base_dir.is_dir(): logger.warning(f"插件目录不存在: {base_dir}") continue - for entry in os.listdir(base_dir): - plugin_dir = os.path.join(base_dir, entry) - if not os.path.isdir(plugin_dir): + for plugin_dir in sorted(entry for entry in base_dir.iterdir() if entry.is_dir()): + discovered = self._discover_single_candidate(plugin_dir) + if discovered is None: continue - manifest_path = os.path.join(plugin_dir, "_manifest.json") - plugin_path = os.path.join(plugin_dir, "plugin.py") - - if not os.path.exists(manifest_path) or not os.path.exists(plugin_path): - continue - - try: - with open(manifest_path, "r", encoding="utf-8") as f: - manifest = json.load(f) - except Exception as e: - self._failed_plugins[entry] = f"manifest 解析失败: {e}" - logger.error(f"插件 {entry} manifest 解析失败: {e}") - continue - - if not self._manifest_validator.validate(manifest): - errors = "; ".join(self._manifest_validator.errors) - self._failed_plugins[entry] = f"manifest 校验失败: {errors}" - continue - - plugin_id = str(manifest.get("name", entry)).strip() or entry + plugin_id, candidate = discovered if plugin_id in duplicate_candidates: - duplicate_candidates[plugin_id].append(plugin_dir) + duplicate_candidates[plugin_id].append(candidate[0]) continue previous = candidates.get(plugin_id) if previous is not None: - duplicate_candidates[plugin_id] = [previous[0], plugin_dir] + duplicate_candidates[plugin_id] = [previous[0], candidate[0]] candidates.pop(plugin_id, None) continue - candidates[plugin_id] = (plugin_dir, manifest, plugin_path) + candidates[plugin_id] = candidate + return candidates, duplicate_candidates + + def _discover_single_candidate(self, plugin_dir: Path) -> Optional[Tuple[str, PluginCandidate]]: + """发现并校验单个插件目录。""" + manifest_path = plugin_dir / "_manifest.json" + plugin_path = plugin_dir / "plugin.py" + + if not manifest_path.exists() or not plugin_path.exists(): + return None + + try: + with manifest_path.open("r", encoding="utf-8") as manifest_file: + manifest: Dict[str, Any] = json.load(manifest_file) + except Exception as e: + self._failed_plugins[plugin_dir.name] = f"manifest 解析失败: {e}" + logger.error(f"插件 {plugin_dir.name} manifest 解析失败: {e}") + return None + + if not self._manifest_validator.validate(manifest): + errors = "; ".join(self._manifest_validator.errors) + self._failed_plugins[plugin_dir.name] = f"manifest 校验失败: {errors}" + return None + + plugin_id = str(manifest.get("name", plugin_dir.name)).strip() or plugin_dir.name + return plugin_id, (plugin_dir, manifest, plugin_path) + + def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None: + """记录重复插件 ID 错误。""" for plugin_id, conflict_dirs in duplicate_candidates.items(): - unique_dirs = sorted(dict.fromkeys(conflict_dirs)) + unique_dirs = sorted({str(path) for path in conflict_dirs}) reason = f"检测到重复插件 ID: {plugin_id} -> {', '.join(unique_dirs)}" self._failed_plugins[plugin_id] = reason logger.error(reason) - # 第二阶段:依赖解析(拓扑排序) - load_order, failed_deps = self._resolve_dependencies(candidates) + def _record_failed_dependencies(self, failed_deps: Dict[str, str]) -> None: + """记录依赖解析失败信息。""" + for plugin_id, reason in failed_deps.items(): + self._failed_plugins[plugin_id] = reason + logger.error(f"插件 {plugin_id} 依赖解析失败: {reason}") - for pid, reason in failed_deps.items(): - self._failed_plugins[pid] = reason - logger.error(f"插件 {pid} 依赖解析失败: {reason}") - - # 第三阶段:按依赖顺序加载 - results = [] + def _load_plugins_in_order( + self, + load_order: List[str], + candidates: Dict[str, PluginCandidate], + ) -> List[PluginMeta]: + """按依赖顺序加载插件。""" + results: List[PluginMeta] = [] for plugin_id in load_order: plugin_dir, manifest, plugin_path = candidates[plugin_id] try: @@ -165,7 +194,7 @@ class PluginLoader: def _resolve_dependencies( self, - candidates: Dict[str, Tuple[str, Dict[str, Any], str]], + candidates: Dict[str, PluginCandidate], ) -> Tuple[List[str], Dict[str, str]]: """拓扑排序解析加载顺序,返回 (有序列表, 失败项 {id: reason})。""" available = set(candidates.keys()) @@ -223,9 +252,9 @@ class PluginLoader: def _load_single_plugin( self, plugin_id: str, - plugin_dir: str, + plugin_dir: Path, manifest: Dict[str, Any], - plugin_path: str, + plugin_path: Path, ) -> Optional[PluginMeta]: """加载单个插件""" # 确保兼容层导入钩子已安装(旧版插件可能 import src.plugin_system) @@ -233,7 +262,7 @@ class PluginLoader: # 动态导入插件模块 module_name = f"_maibot_plugin_{plugin_id}" - spec = importlib.util.spec_from_file_location(module_name, plugin_path) + spec = importlib.util.spec_from_file_location(module_name, str(plugin_path)) if spec is None or spec.loader is None: logger.error(f"无法创建模块 spec: {plugin_path}") return None @@ -241,7 +270,7 @@ class PluginLoader: module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module - plugin_parent_dir = os.path.normpath(os.path.dirname(plugin_dir)) + plugin_parent_dir = plugin_dir.parent with self._temporary_sys_path_entry(plugin_parent_dir): spec.loader.exec_module(module) @@ -252,7 +281,7 @@ class PluginLoader: logger.info(f"插件 {plugin_id} v{manifest.get('version', '?')} 加载成功") return PluginMeta( plugin_id=plugin_id, - plugin_dir=plugin_dir, + plugin_dir=str(plugin_dir), plugin_instance=instance, manifest=manifest, ) @@ -265,7 +294,7 @@ class PluginLoader: ) return PluginMeta( plugin_id=plugin_id, - plugin_dir=plugin_dir, + plugin_dir=str(plugin_dir), plugin_instance=instance, manifest=manifest, ) @@ -275,13 +304,9 @@ class PluginLoader: @staticmethod @contextlib.contextmanager - def _temporary_sys_path_entry(path: str): + def _temporary_sys_path_entry(path: Path) -> Iterator[None]: """临时将路径放入 sys.path 头部,并在离开作用域后恢复。""" - if not path: - yield - return - - normalized_path = os.path.normpath(path) + normalized_path = os.path.normpath(str(path)) existing_paths = {os.path.normpath(entry) for entry in sys.path} inserted = normalized_path not in existing_paths if inserted: