refactor(plugin_loader): 优化插件发现与加载逻辑,使用 Path 类型增强可读性

This commit is contained in:
DrSmoothl
2026-03-16 08:22:42 +08:00
parent 7136deac93
commit df088205dd

View File

@@ -7,7 +7,8 @@
"""
from collections import deque
from typing import Any, Dict, List, Optional, Set, Tuple
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
import contextlib
import importlib
@@ -21,6 +22,8 @@ from src.plugin_runtime.runner.manifest_validator import ManifestValidator
logger = get_logger("plugin_runtime.runner.plugin_loader")
PluginCandidate = Tuple[Path, Dict[str, Any], Path]
class PluginMeta:
"""加载后的插件元数据"""
@@ -77,66 +80,92 @@ class PluginLoader:
Returns:
成功加载的插件元数据列表(按依赖顺序)
"""
# 第一阶段:发现并校验 manifest
candidates: Dict[str, Tuple[str, Dict[str, Any], str]] = {} # id -> (dir, manifest, plugin_path)
duplicate_candidates: Dict[str, List[str]] = {}
for base_dir in plugin_dirs:
if not os.path.isdir(base_dir):
candidates, duplicate_candidates = self._discover_candidates(plugin_dirs)
self._record_duplicate_candidates(duplicate_candidates)
# 第二阶段:依赖解析(拓扑排序)
load_order, failed_deps = self._resolve_dependencies(candidates)
self._record_failed_dependencies(failed_deps)
# 第三阶段:按依赖顺序加载
return self._load_plugins_in_order(load_order, candidates)
def _discover_candidates(self, plugin_dirs: List[str]) -> Tuple[Dict[str, PluginCandidate], Dict[str, List[Path]]]:
"""扫描插件目录并收集候选插件。"""
candidates: Dict[str, PluginCandidate] = {}
duplicate_candidates: Dict[str, List[Path]] = {}
for base_dir_str in plugin_dirs:
base_dir = Path(base_dir_str)
if not base_dir.is_dir():
logger.warning(f"插件目录不存在: {base_dir}")
continue
for entry in os.listdir(base_dir):
plugin_dir = os.path.join(base_dir, entry)
if not os.path.isdir(plugin_dir):
for plugin_dir in sorted(entry for entry in base_dir.iterdir() if entry.is_dir()):
discovered = self._discover_single_candidate(plugin_dir)
if discovered is None:
continue
manifest_path = os.path.join(plugin_dir, "_manifest.json")
plugin_path = os.path.join(plugin_dir, "plugin.py")
if not os.path.exists(manifest_path) or not os.path.exists(plugin_path):
continue
try:
with open(manifest_path, "r", encoding="utf-8") as f:
manifest = json.load(f)
except Exception as e:
self._failed_plugins[entry] = f"manifest 解析失败: {e}"
logger.error(f"插件 {entry} manifest 解析失败: {e}")
continue
if not self._manifest_validator.validate(manifest):
errors = "; ".join(self._manifest_validator.errors)
self._failed_plugins[entry] = f"manifest 校验失败: {errors}"
continue
plugin_id = str(manifest.get("name", entry)).strip() or entry
plugin_id, candidate = discovered
if plugin_id in duplicate_candidates:
duplicate_candidates[plugin_id].append(plugin_dir)
duplicate_candidates[plugin_id].append(candidate[0])
continue
previous = candidates.get(plugin_id)
if previous is not None:
duplicate_candidates[plugin_id] = [previous[0], plugin_dir]
duplicate_candidates[plugin_id] = [previous[0], candidate[0]]
candidates.pop(plugin_id, None)
continue
candidates[plugin_id] = (plugin_dir, manifest, plugin_path)
candidates[plugin_id] = candidate
return candidates, duplicate_candidates
def _discover_single_candidate(self, plugin_dir: Path) -> Optional[Tuple[str, PluginCandidate]]:
"""发现并校验单个插件目录。"""
manifest_path = plugin_dir / "_manifest.json"
plugin_path = plugin_dir / "plugin.py"
if not manifest_path.exists() or not plugin_path.exists():
return None
try:
with manifest_path.open("r", encoding="utf-8") as manifest_file:
manifest: Dict[str, Any] = json.load(manifest_file)
except Exception as e:
self._failed_plugins[plugin_dir.name] = f"manifest 解析失败: {e}"
logger.error(f"插件 {plugin_dir.name} manifest 解析失败: {e}")
return None
if not self._manifest_validator.validate(manifest):
errors = "; ".join(self._manifest_validator.errors)
self._failed_plugins[plugin_dir.name] = f"manifest 校验失败: {errors}"
return None
plugin_id = str(manifest.get("name", plugin_dir.name)).strip() or plugin_dir.name
return plugin_id, (plugin_dir, manifest, plugin_path)
def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None:
"""记录重复插件 ID 错误。"""
for plugin_id, conflict_dirs in duplicate_candidates.items():
unique_dirs = sorted(dict.fromkeys(conflict_dirs))
unique_dirs = sorted({str(path) for path in conflict_dirs})
reason = f"检测到重复插件 ID: {plugin_id} -> {', '.join(unique_dirs)}"
self._failed_plugins[plugin_id] = reason
logger.error(reason)
# 第二阶段:依赖解析(拓扑排序)
load_order, failed_deps = self._resolve_dependencies(candidates)
def _record_failed_dependencies(self, failed_deps: Dict[str, str]) -> None:
"""记录依赖解析失败信息。"""
for plugin_id, reason in failed_deps.items():
self._failed_plugins[plugin_id] = reason
logger.error(f"插件 {plugin_id} 依赖解析失败: {reason}")
for pid, reason in failed_deps.items():
self._failed_plugins[pid] = reason
logger.error(f"插件 {pid} 依赖解析失败: {reason}")
# 第三阶段:按依赖顺序加载
results = []
def _load_plugins_in_order(
self,
load_order: List[str],
candidates: Dict[str, PluginCandidate],
) -> List[PluginMeta]:
"""按依赖顺序加载插件。"""
results: List[PluginMeta] = []
for plugin_id in load_order:
plugin_dir, manifest, plugin_path = candidates[plugin_id]
try:
@@ -165,7 +194,7 @@ class PluginLoader:
def _resolve_dependencies(
self,
candidates: Dict[str, Tuple[str, Dict[str, Any], str]],
candidates: Dict[str, PluginCandidate],
) -> Tuple[List[str], Dict[str, str]]:
"""拓扑排序解析加载顺序,返回 (有序列表, 失败项 {id: reason})。"""
available = set(candidates.keys())
@@ -223,9 +252,9 @@ class PluginLoader:
def _load_single_plugin(
self,
plugin_id: str,
plugin_dir: str,
plugin_dir: Path,
manifest: Dict[str, Any],
plugin_path: str,
plugin_path: Path,
) -> Optional[PluginMeta]:
"""加载单个插件"""
# 确保兼容层导入钩子已安装(旧版插件可能 import src.plugin_system
@@ -233,7 +262,7 @@ class PluginLoader:
# 动态导入插件模块
module_name = f"_maibot_plugin_{plugin_id}"
spec = importlib.util.spec_from_file_location(module_name, plugin_path)
spec = importlib.util.spec_from_file_location(module_name, str(plugin_path))
if spec is None or spec.loader is None:
logger.error(f"无法创建模块 spec: {plugin_path}")
return None
@@ -241,7 +270,7 @@ class PluginLoader:
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
plugin_parent_dir = os.path.normpath(os.path.dirname(plugin_dir))
plugin_parent_dir = plugin_dir.parent
with self._temporary_sys_path_entry(plugin_parent_dir):
spec.loader.exec_module(module)
@@ -252,7 +281,7 @@ class PluginLoader:
logger.info(f"插件 {plugin_id} v{manifest.get('version', '?')} 加载成功")
return PluginMeta(
plugin_id=plugin_id,
plugin_dir=plugin_dir,
plugin_dir=str(plugin_dir),
plugin_instance=instance,
manifest=manifest,
)
@@ -265,7 +294,7 @@ class PluginLoader:
)
return PluginMeta(
plugin_id=plugin_id,
plugin_dir=plugin_dir,
plugin_dir=str(plugin_dir),
plugin_instance=instance,
manifest=manifest,
)
@@ -275,13 +304,9 @@ class PluginLoader:
@staticmethod
@contextlib.contextmanager
def _temporary_sys_path_entry(path: str):
def _temporary_sys_path_entry(path: Path) -> Iterator[None]:
"""临时将路径放入 sys.path 头部,并在离开作用域后恢复。"""
if not path:
yield
return
normalized_path = os.path.normpath(path)
normalized_path = os.path.normpath(str(path))
existing_paths = {os.path.normpath(entry) for entry in sys.path}
inserted = normalized_path not in existing_paths
if inserted: