Refactor plugin loader and runner to support enhanced manifest structure

- Updated the PluginMeta class to utilize a strongly typed PluginManifest, improving type safety and clarity.
- Refactored dependency extraction logic to streamline the handling of plugin dependencies.
- Modified the PluginLoader to accommodate new manifest versioning and validation processes.
- Enhanced the PluginRunner to work with a dictionary for external available plugins, allowing for version mapping.
- Updated built-in plugins' manifest files to version 2, adding URLs and SDK versioning for better integration and documentation.
- Improved error handling and logging for plugin loading and dependency resolution processes.
This commit is contained in:
DrSmoothl
2026-03-23 22:59:01 +08:00
parent 0c508995dd
commit 1f02171a63
15 changed files with 1676 additions and 711 deletions

View File

@@ -18,7 +18,7 @@ ENV_HOST_VERSION = "MAIBOT_HOST_VERSION"
"""Runner 读取的 Host 应用版本号,用于 manifest 兼容性校验"""
ENV_EXTERNAL_PLUGIN_IDS = "MAIBOT_EXTERNAL_PLUGIN_IDS"
"""Runner 启动时可视为已满足的外部插件依赖列表JSON 数组"""
"""Runner 启动时可视为已满足的外部插件依赖版本映射JSON 对象"""
ENV_GLOBAL_CONFIG_SNAPSHOT = "MAIBOT_GLOBAL_CONFIG_SNAPSHOT"
"""Runner 启动时注入的全局配置快照JSON 对象)"""

View File

@@ -191,7 +191,7 @@ class RuntimeComponentCapabilityMixin:
return None, None, "缺少必要参数 api_name"
if "." in normalized_api_name:
target_plugin_id, target_api_name = normalized_api_name.split(".", 1)
target_plugin_id, target_api_name = normalized_api_name.rsplit(".", 1)
try:
supervisor = self._get_supervisor_for_plugin(target_plugin_id)
except RuntimeError as exc:
@@ -282,7 +282,7 @@ class RuntimeComponentCapabilityMixin:
return None, None, "缺少必要参数 name"
if "." in normalized_name:
plugin_id, api_name = normalized_name.split(".", 1)
plugin_id, api_name = normalized_name.rsplit(".", 1)
try:
supervisor = self._get_supervisor_for_plugin(plugin_id)
except RuntimeError as exc:

View File

@@ -116,7 +116,7 @@ class PluginRunnerSupervisor:
self._runner_process: Optional[asyncio.subprocess.Process] = None
self._registered_plugins: Dict[str, RegisterPluginPayload] = {}
self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {}
self._external_available_plugin_ids: List[str] = []
self._external_available_plugins: Dict[str, str] = {}
self._runner_ready_events: asyncio.Event = asyncio.Event()
self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload()
self._health_task: Optional[asyncio.Task[None]] = None
@@ -166,21 +166,34 @@ class PluginRunnerSupervisor:
"""返回底层 RPC 服务端。"""
return self._rpc_server
def set_external_available_plugin_ids(self, plugin_ids: List[str]) -> None:
"""设置当前 Runner 启动/重载时可视为已满足的外部依赖列表。"""
def set_external_available_plugins(self, plugin_versions: Dict[str, str]) -> None:
"""设置当前 Runner 启动/重载时可视为已满足的外部依赖版本映射。
normalized_plugin_ids = {
str(plugin_id or "").strip()
for plugin_id in plugin_ids
if str(plugin_id or "").strip()
Args:
plugin_versions: 外部插件版本映射,键为插件 ID值为插件版本。
"""
self._external_available_plugins = {
str(plugin_id or "").strip(): str(plugin_version or "").strip()
for plugin_id, plugin_version in plugin_versions.items()
if str(plugin_id or "").strip() and str(plugin_version or "").strip()
}
self._external_available_plugin_ids = sorted(normalized_plugin_ids)
def get_loaded_plugin_ids(self) -> List[str]:
"""返回当前 Supervisor 已注册的插件 ID 列表。"""
return sorted(self._registered_plugins.keys())
def get_loaded_plugin_versions(self) -> Dict[str, str]:
"""返回当前 Supervisor 已注册插件的版本映射。
Returns:
Dict[str, str]: 已注册插件版本映射,键为插件 ID值为插件版本。
"""
return {
plugin_id: registration.plugin_version
for plugin_id, registration in self._registered_plugins.items()
}
async def dispatch_event(
self,
event_type: str,
@@ -373,14 +386,14 @@ class PluginRunnerSupervisor:
self,
plugin_id: str,
reason: str = "manual",
external_available_plugins: Optional[List[str]] = None,
external_available_plugins: Optional[Dict[str, str]] = None,
) -> bool:
"""按插件 ID 触发精确重载。
Args:
plugin_id: 目标插件 ID。
reason: 重载原因。
external_available_plugins: 视为已满足的外部依赖插件 ID 列表
external_available_plugins: 视为已满足的外部依赖插件版本映射
Returns:
bool: 是否重载成功。
@@ -392,7 +405,7 @@ class PluginRunnerSupervisor:
payload={
"plugin_id": plugin_id,
"reason": reason,
"external_available_plugins": external_available_plugins or self._external_available_plugin_ids,
"external_available_plugins": external_available_plugins or self._external_available_plugins,
},
timeout_ms=max(int(self._runner_spawn_timeout * 1000), 10000),
)
@@ -409,14 +422,14 @@ class PluginRunnerSupervisor:
self,
plugin_ids: Optional[List[str]] = None,
reason: str = "manual",
external_available_plugins: Optional[List[str]] = None,
external_available_plugins: Optional[Dict[str, str]] = None,
) -> bool:
"""批量重载插件。
Args:
plugin_ids: 目标插件 ID 列表;为空时重载当前已注册的全部插件。
reason: 重载原因。
external_available_plugins: 视为已满足的外部依赖插件 ID 列表
external_available_plugins: 视为已满足的外部依赖插件版本映射
Returns:
bool: 是否全部重载成功。
@@ -1136,7 +1149,7 @@ class PluginRunnerSupervisor:
global_config_snapshot = config_manager.get_global_config().model_dump()
global_config_snapshot["model"] = config_manager.get_model_config().model_dump()
return {
ENV_EXTERNAL_PLUGIN_IDS: json.dumps(self._external_available_plugin_ids, ensure_ascii=False),
ENV_EXTERNAL_PLUGIN_IDS: json.dumps(self._external_available_plugins, ensure_ascii=False),
ENV_GLOBAL_CONFIG_SNAPSHOT: json.dumps(global_config_snapshot, ensure_ascii=False),
ENV_HOST_VERSION: PROTOCOL_VERSION,
ENV_IPC_ADDRESS: self._transport.get_address(),

View File

@@ -11,7 +11,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Dict, Iterable, List, Optional, Sequence, Set, Tuple
import asyncio
import json
import tomlkit
@@ -26,6 +25,7 @@ from src.plugin_runtime.capabilities import (
)
from src.plugin_runtime.capabilities.registry import register_capability_impls
from src.plugin_runtime.host.message_utils import MessageDict, PluginMessageUtils
from src.plugin_runtime.runner.manifest_validator import ManifestValidator
if TYPE_CHECKING:
from src.chat.message_receive.message import SessionMessage
@@ -69,6 +69,7 @@ class PluginRuntimeManager(
self._plugin_source_watcher_subscription_id: Optional[str] = None
self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {}
self._plugin_path_cache: Dict[str, Path] = {}
self._manifest_validator: ManifestValidator = ManifestValidator()
self._config_reload_callback: Callable[[Sequence[str]], Awaitable[None]] = self._handle_main_config_reload
self._config_reload_callback_registered: bool = False
@@ -102,46 +103,11 @@ class PluginRuntimeManager(
candidate = Path("plugins").resolve()
return [candidate] if candidate.is_dir() else []
@staticmethod
def _extract_manifest_dependencies(manifest: Dict[str, Any]) -> List[str]:
"""从插件 manifest 中提取规范化后的依赖插件 ID 列表。"""
dependencies: List[str] = []
for dependency in manifest.get("dependencies", []):
if isinstance(dependency, str):
normalized_dependency = dependency.strip()
elif isinstance(dependency, dict):
normalized_dependency = str(dependency.get("name", "") or "").strip()
else:
normalized_dependency = ""
if normalized_dependency:
dependencies.append(normalized_dependency)
return dependencies
@classmethod
def _discover_plugin_dependency_map(cls, plugin_dirs: Iterable[Path]) -> Dict[str, List[str]]:
"""扫描指定插件目录集合,返回 ``plugin_id -> dependencies`` 映射。"""
dependency_map: Dict[str, List[str]] = {}
for plugin_dir in cls._iter_candidate_plugin_paths(plugin_dirs):
manifest_path = plugin_dir / "_manifest.json"
entrypoint_path = plugin_dir / "plugin.py"
if not manifest_path.is_file() or not entrypoint_path.is_file():
continue
try:
with manifest_path.open("r", encoding="utf-8") as manifest_file:
manifest = json.load(manifest_file)
except Exception:
continue
if not isinstance(manifest, dict):
continue
plugin_id = str(manifest.get("name", plugin_dir.name) or "").strip() or plugin_dir.name
dependency_map[plugin_id] = cls._extract_manifest_dependencies(manifest)
return dependency_map
validator = ManifestValidator()
return validator.build_plugin_dependency_map(plugin_dirs)
@classmethod
def _build_group_start_order(
@@ -243,12 +209,12 @@ class PluginRuntimeManager(
if supervisor is None:
continue
external_plugin_ids = [
plugin_id
external_plugin_versions = {
plugin_id: plugin_version
for started_supervisor in started_supervisors
for plugin_id in started_supervisor.get_loaded_plugin_ids()
]
supervisor.set_external_available_plugin_ids(external_plugin_ids)
for plugin_id, plugin_version in started_supervisor.get_loaded_plugin_versions().items()
}
supervisor.set_external_available_plugins(external_plugin_versions)
await supervisor.start()
started_supervisors.append(supervisor)
@@ -366,23 +332,22 @@ class PluginRuntimeManager(
for plugin_id in supervisor.get_loaded_plugin_ids()
}
def _build_external_available_plugins_for_supervisor(self, target_supervisor: "PluginSupervisor") -> List[str]:
"""收集某个 Supervisor 可用的外部插件 ID 列表"""
def _build_external_available_plugins_for_supervisor(self, target_supervisor: "PluginSupervisor") -> Dict[str, str]:
"""收集某个 Supervisor 可用的外部插件版本映射"""
external_plugin_ids: Set[str] = set()
external_plugin_versions: Dict[str, str] = {}
for supervisor in self.supervisors:
if supervisor is target_supervisor:
continue
external_plugin_ids.update(supervisor.get_loaded_plugin_ids())
return sorted(external_plugin_ids)
external_plugin_versions.update(supervisor.get_loaded_plugin_versions())
return external_plugin_versions
def _find_supervisor_by_plugin_directory(self, plugin_id: str) -> Optional["PluginSupervisor"]:
"""根据插件目录推断应负责该插件重载的 Supervisor。"""
for supervisor in self.supervisors:
for plugin_dir in supervisor._plugin_dirs:
if (Path(plugin_dir) / plugin_id).is_dir():
return supervisor
if self._get_plugin_path_for_supervisor(supervisor, plugin_id) is not None:
return supervisor
return None
def _warn_skipped_cross_supervisor_reload(
@@ -740,30 +705,13 @@ class PluginRuntimeManager(
external_available_plugins=self._build_external_available_plugins_for_supervisor(supervisor),
)
@staticmethod
def _find_duplicate_plugin_ids(plugin_dirs: List[Path]) -> Dict[str, List[Path]]:
@classmethod
def _find_duplicate_plugin_ids(cls, plugin_dirs: List[Path]) -> Dict[str, List[Path]]:
"""扫描插件目录,找出被多个目录重复声明的插件 ID。"""
plugin_locations: Dict[str, List[Path]] = {}
for base_dir in plugin_dirs:
if not base_dir.is_dir():
continue
for entry in base_dir.iterdir():
if not entry.is_dir():
continue
manifest_path = entry / "_manifest.json"
plugin_path = entry / "plugin.py"
if not manifest_path.exists() or not plugin_path.exists():
continue
plugin_id = entry.name
try:
with open(manifest_path, "r", encoding="utf-8") as manifest_file:
manifest = json.load(manifest_file)
plugin_id = str(manifest.get("name", entry.name)).strip() or entry.name
except Exception:
continue
plugin_locations.setdefault(plugin_id, []).append(entry)
validator = ManifestValidator()
for plugin_path, manifest in validator.iter_plugin_manifests(plugin_dirs):
plugin_locations.setdefault(manifest.id, []).append(plugin_path)
return {
plugin_id: sorted(dict.fromkeys(paths), key=lambda p: str(p))
@@ -831,8 +779,7 @@ class PluginRuntimeManager(
if entry.is_dir():
yield entry.resolve()
@staticmethod
def _read_plugin_id_from_plugin_path(plugin_path: Path) -> Optional[str]:
def _read_plugin_id_from_plugin_path(self, plugin_path: Path) -> Optional[str]:
"""从单个插件目录中读取 manifest 声明的插件 ID。
Args:
@@ -841,22 +788,7 @@ class PluginRuntimeManager(
Returns:
Optional[str]: 解析成功时返回插件 ID否则返回 ``None``。
"""
manifest_path = plugin_path / "_manifest.json"
entrypoint_path = plugin_path / "plugin.py"
if not manifest_path.is_file() or not entrypoint_path.is_file():
return None
try:
with open(manifest_path, "r", encoding="utf-8") as manifest_file:
manifest = json.load(manifest_file)
except Exception:
return None
if not isinstance(manifest, dict):
return None
plugin_id = str(manifest.get("name", plugin_path.name)).strip() or plugin_path.name
return plugin_id or None
return self._manifest_validator.read_plugin_id_from_plugin_path(plugin_path)
def _iter_discovered_plugin_paths(self, plugin_dirs: Iterable[Path]) -> Iterable[Tuple[str, Path]]:
"""迭代目录中可解析到的插件 ID 与实际目录路径。

View File

@@ -282,8 +282,11 @@ class ReloadPluginPayload(BaseModel):
"""目标插件 ID"""
reason: str = Field(default="manual", description="重载原因")
"""重载原因"""
external_available_plugins: List[str] = Field(default_factory=list, description="可视为已满足的外部依赖插件 ID")
"""可视为已满足的外部依赖插件 ID"""
external_available_plugins: Dict[str, str] = Field(
default_factory=dict,
description="可视为已满足的外部依赖插件版本映射",
)
"""可视为已满足的外部依赖插件版本映射"""
class ReloadPluginResultPayload(BaseModel):

File diff suppressed because it is too large Load Diff

View File

@@ -13,16 +13,16 @@ from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
import contextlib
import importlib
import importlib.util
import json
import os
import re
import sys
from src.common.logger import get_logger
from src.plugin_runtime.runner.manifest_validator import ManifestValidator
from src.plugin_runtime.runner.manifest_validator import ManifestValidator, PluginManifest
logger = get_logger("plugin_runtime.runner.plugin_loader")
PluginCandidate = Tuple[Path, Dict[str, Any], Path]
PluginCandidate = Tuple[Path, PluginManifest, Path]
class PluginMeta:
@@ -34,7 +34,7 @@ class PluginMeta:
plugin_dir: str,
module_name: str,
plugin_instance: Any,
manifest: Dict[str, Any],
manifest: PluginManifest,
) -> None:
"""初始化插件元数据。
@@ -43,36 +43,16 @@ class PluginMeta:
plugin_dir: 插件目录绝对路径。
module_name: 插件入口模块名。
plugin_instance: 插件实例对象。
manifest: 解析后的 manifest 内容
manifest: 解析后的强类型 Manifest。
"""
self.plugin_id = plugin_id
self.plugin_dir = plugin_dir
self.module_name = module_name
self.instance = plugin_instance
self.manifest = manifest
self.version = manifest.get("version", "1.0.0")
self.capabilities_required = manifest.get("capabilities", [])
self.dependencies: List[str] = self._extract_dependencies(manifest)
@staticmethod
def _extract_dependencies(manifest: Dict[str, Any]) -> List[str]:
"""从 manifest 中提取依赖列表。
Args:
manifest: 插件 manifest。
Returns:
List[str]: 规范化后的依赖插件 ID 列表。
"""
raw = manifest.get("dependencies", [])
result: List[str] = []
for dep in raw:
if isinstance(dep, str):
result.append(dep.strip())
elif isinstance(dep, dict):
if name := str(dep.get("name", "")).strip():
result.append(name)
return result
self.version = manifest.version
self.capabilities_required = list(manifest.capabilities)
self.dependencies: List[str] = list(manifest.plugin_dependency_ids)
class PluginLoader:
@@ -98,13 +78,13 @@ class PluginLoader:
def discover_and_load(
self,
plugin_dirs: List[str],
extra_available: Optional[Set[str]] = None,
extra_available: Optional[Dict[str, str]] = None,
) -> List[PluginMeta]:
"""扫描多个目录并加载所有插件。
Args:
plugin_dirs: 插件目录列表。
extra_available: 额外视为已满足的外部依赖插件 ID 集合
extra_available: 额外视为已满足的外部依赖插件版本映射
Returns:
List[PluginMeta]: 成功加载的插件元数据列表,按依赖顺序排列。
@@ -164,26 +144,17 @@ class PluginLoader:
def _discover_single_candidate(self, plugin_dir: Path) -> Optional[Tuple[str, PluginCandidate]]:
"""发现并校验单个插件目录。"""
manifest_path = plugin_dir / "_manifest.json"
plugin_path = plugin_dir / "plugin.py"
if not manifest_path.exists() or not plugin_path.exists():
if not plugin_path.exists():
return None
try:
with manifest_path.open("r", encoding="utf-8") as manifest_file:
manifest: Dict[str, Any] = json.load(manifest_file)
except Exception as e:
self._failed_plugins[plugin_dir.name] = f"manifest 解析失败: {e}"
logger.error(f"插件 {plugin_dir.name} manifest 解析失败: {e}")
return None
if not self._manifest_validator.validate(manifest):
manifest = self._manifest_validator.load_from_plugin_path(plugin_dir)
if manifest is None:
errors = "; ".join(self._manifest_validator.errors)
self._failed_plugins[plugin_dir.name] = f"manifest 校验失败: {errors}"
return None
plugin_id = str(manifest.get("name", plugin_dir.name)).strip() or plugin_dir.name
plugin_id = manifest.id
return plugin_id, (plugin_dir, manifest, plugin_path)
def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None:
@@ -253,7 +224,7 @@ class PluginLoader:
"""
removed_modules: List[str] = []
plugin_path = Path(plugin_dir).resolve()
synthetic_module_name = f"_maibot_plugin_{plugin_id}"
synthetic_module_name = self._build_safe_module_name(plugin_id)
for module_name, module in list(sys.modules.items()):
if module_name == synthetic_module_name:
@@ -277,6 +248,21 @@ class PluginLoader:
importlib.invalidate_caches()
return removed_modules
@staticmethod
def _build_safe_module_name(plugin_id: str) -> str:
"""将插件 ID 转换为可用于动态导入的安全模块名。
Args:
plugin_id: 原始插件 ID。
Returns:
str: 仅包含字母、数字和下划线的合成模块名。
"""
normalized_plugin_id = re.sub(r"[^0-9A-Za-z_]", "_", str(plugin_id or "").strip())
if normalized_plugin_id and normalized_plugin_id[0].isdigit():
normalized_plugin_id = f"_{normalized_plugin_id}"
return f"_maibot_plugin_{normalized_plugin_id or 'plugin'}"
def list_plugins(self) -> List[str]:
"""列出所有已加载的插件 ID"""
return list(self._loaded_plugins.keys())
@@ -286,18 +272,27 @@ class PluginLoader:
"""返回当前记录的失败插件原因映射。"""
return dict(self._failed_plugins)
@property
def manifest_validator(self) -> ManifestValidator:
"""返回当前加载器持有的 Manifest 校验器。
Returns:
ManifestValidator: 当前使用的 Manifest 校验器实例。
"""
return self._manifest_validator
# ──── 依赖解析 ────────────────────────────────────────────
def resolve_dependencies(
self,
candidates: Dict[str, PluginCandidate],
extra_available: Optional[Set[str]] = None,
extra_available: Optional[Dict[str, str]] = None,
) -> Tuple[List[str], Dict[str, str]]:
"""解析候选插件的依赖顺序。
Args:
candidates: 待加载的候选插件集合。
extra_available: 视为已满足的外部依赖插件 ID 集合
extra_available: 视为已满足的外部依赖插件版本映射
Returns:
Tuple[List[str], Dict[str, str]]: 可加载顺序和失败原因映射。
@@ -320,36 +315,71 @@ class PluginLoader:
def _resolve_dependencies(
self,
candidates: Dict[str, PluginCandidate],
extra_available: Optional[Set[str]] = None,
extra_available: Optional[Dict[str, str]] = None,
) -> Tuple[List[str], Dict[str, str]]:
"""拓扑排序解析加载顺序,返回 (有序列表, 失败项 {id: reason})。"""
available = set(candidates.keys())
satisfied_dependencies = set(extra_available or set())
satisfied_dependencies = {
str(plugin_id or "").strip(): str(plugin_version or "").strip()
for plugin_id, plugin_version in (extra_available or {}).items()
if str(plugin_id or "").strip() and str(plugin_version or "").strip()
}
dep_graph: Dict[str, Set[str]] = {}
failed: Dict[str, str] = {}
for pid, (_, manifest, _) in candidates.items():
raw_deps = manifest.get("dependencies", [])
resolved: Set[str] = set()
missing: List[str] = []
for dep in raw_deps:
dep_name = dep if isinstance(dep, str) else str(dep.get("name", ""))
dep_name = dep_name.strip()
if not dep_name or dep_name == pid:
missing_or_incompatible: List[str] = []
for dependency in manifest.plugin_dependencies:
dependency_id = dependency.id
if dependency_id in available:
dependency_manifest = candidates[dependency_id][1]
if not self._manifest_validator.is_plugin_dependency_satisfied(
dependency,
dependency_manifest.version,
):
missing_or_incompatible.append(
f"{dependency_id} (需要 {dependency.version_spec},当前 {dependency_manifest.version})"
)
continue
resolved.add(dependency_id)
continue
if dep_name in available:
resolved.add(dep_name)
elif dep_name in satisfied_dependencies:
external_dependency_version = satisfied_dependencies.get(dependency_id)
if external_dependency_version is None:
missing_or_incompatible.append(f"{dependency_id} (未找到依赖插件)")
continue
else:
missing.append(dep_name)
if missing:
failed[pid] = f"缺少依赖: {', '.join(missing)}"
if not self._manifest_validator.is_plugin_dependency_satisfied(
dependency,
external_dependency_version,
):
missing_or_incompatible.append(
f"{dependency_id} (需要 {dependency.version_spec},当前 {external_dependency_version})"
)
if missing_or_incompatible:
failed[pid] = f"依赖未满足: {', '.join(missing_or_incompatible)}"
dep_graph[pid] = resolved
# 移除失败项
for pid in failed:
dep_graph.pop(pid, None)
# 迭代传播“依赖自身加载失败”到上游依赖方,避免误报为循环依赖
changed = True
while changed:
changed = False
failed_plugin_ids = set(failed)
for pid, dependencies in list(dep_graph.items()):
if pid in failed:
dep_graph.pop(pid, None)
continue
failed_dependencies = sorted(dependency for dependency in dependencies if dependency in failed_plugin_ids)
if not failed_dependencies:
continue
failed[pid] = f"依赖未满足: {', '.join(f'{dependency} (依赖插件加载失败)' for dependency in failed_dependencies)}"
dep_graph.pop(pid, None)
changed = True
# Kahn 拓扑排序
indegree = {pid: len(deps) for pid, deps in dep_graph.items()}
@@ -382,7 +412,7 @@ class PluginLoader:
self,
plugin_id: str,
plugin_dir: Path,
manifest: Dict[str, Any],
manifest: PluginManifest,
plugin_path: Path,
) -> Optional[PluginMeta]:
"""加载单个插件"""
@@ -390,7 +420,7 @@ class PluginLoader:
self._ensure_compat_hook()
# 动态导入插件模块
module_name = f"_maibot_plugin_{plugin_id}"
module_name = self._build_safe_module_name(plugin_id)
spec = importlib.util.spec_from_file_location(module_name, str(plugin_path))
if spec is None or spec.loader is None:
logger.error(f"无法创建模块 spec: {plugin_path}")
@@ -409,7 +439,7 @@ class PluginLoader:
if create_plugin is not None:
instance = create_plugin()
self._validate_sdk_plugin_contract(plugin_id, instance)
logger.info(f"插件 {plugin_id} v{manifest.get('version', '?')} 加载成功")
logger.info(f"插件 {plugin_id} v{manifest.version} 加载成功")
return PluginMeta(
plugin_id=plugin_id,
plugin_dir=str(plugin_dir),
@@ -422,7 +452,7 @@ class PluginLoader:
instance = self._try_load_legacy_plugin(module, plugin_id)
if instance is not None:
logger.info(
f"插件 {plugin_id} v{manifest.get('version', '?')} 通过旧版兼容层加载成功(请尽快迁移到 maibot_sdk"
f"插件 {plugin_id} v{manifest.version} 通过旧版兼容层加载成功(请尽快迁移到 maibot_sdk"
)
return PluginMeta(
plugin_id=plugin_id,

View File

@@ -10,7 +10,7 @@
"""
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, Tuple, cast
from typing import Any, Callable, Dict, List, Optional, Protocol, Set, cast
import asyncio
import contextlib
@@ -47,7 +47,7 @@ from src.plugin_runtime.protocol.envelope import (
)
from src.plugin_runtime.protocol.errors import ErrorCode
from src.plugin_runtime.runner.log_handler import RunnerIPCLogHandler
from src.plugin_runtime.runner.plugin_loader import PluginLoader, PluginMeta
from src.plugin_runtime.runner.plugin_loader import PluginCandidate, PluginLoader, PluginMeta
from src.plugin_runtime.runner.rpc_client import RPCClient
logger = get_logger("plugin_runtime.runner.main")
@@ -119,7 +119,7 @@ class PluginRunner:
host_address: str,
session_token: str,
plugin_dirs: List[str],
external_available_plugin_ids: Optional[List[str]] = None,
external_available_plugins: Optional[Dict[str, str]] = None,
) -> None:
"""初始化 Runner。
@@ -127,15 +127,15 @@ class PluginRunner:
host_address: Host 的 IPC 地址。
session_token: 握手用会话令牌。
plugin_dirs: 当前 Runner 负责扫描的插件目录列表。
external_available_plugin_ids: 视为已满足的外部依赖插件 ID 列表
external_available_plugins: 视为已满足的外部依赖插件版本映射
"""
self._host_address: str = host_address
self._session_token: str = session_token
self._plugin_dirs: List[str] = plugin_dirs
self._external_available_plugin_ids: Set[str] = {
str(plugin_id or "").strip()
for plugin_id in (external_available_plugin_ids or [])
if str(plugin_id or "").strip()
self._external_available_plugins: Dict[str, str] = {
str(plugin_id or "").strip(): str(plugin_version or "").strip()
for plugin_id, plugin_version in (external_available_plugins or {}).items()
if str(plugin_id or "").strip() and str(plugin_version or "").strip()
}
self._rpc_client: RPCClient = RPCClient(host_address, session_token)
@@ -166,7 +166,7 @@ class PluginRunner:
# 3. 加载插件
plugins = self._loader.discover_and_load(
self._plugin_dirs,
extra_available=self._external_available_plugin_ids,
extra_available=self._external_available_plugins,
)
logger.info(f"已加载 {len(plugins)} 个插件")
@@ -611,14 +611,14 @@ class PluginRunner:
self,
plugin_id: str,
reason: str,
external_available_plugins: Optional[Set[str]] = None,
external_available_plugins: Optional[Dict[str, str]] = None,
) -> ReloadPluginResultPayload:
"""按插件 ID 在 Runner 进程内执行精确重载。
Args:
plugin_id: 目标插件 ID。
reason: 重载原因。
external_available_plugins: 视为已满足的外部依赖插件 ID 集合
external_available_plugins: 视为已满足的外部依赖插件版本映射
Returns:
ReloadPluginResultPayload: 结构化重载结果。
@@ -626,9 +626,9 @@ class PluginRunner:
candidates, duplicate_candidates = self._loader.discover_candidates(self._plugin_dirs)
failed_plugins: Dict[str, str] = {}
normalized_external_available = {
str(candidate_plugin_id or "").strip()
for candidate_plugin_id in (external_available_plugins or set())
if str(candidate_plugin_id or "").strip()
str(candidate_plugin_id or "").strip(): str(candidate_plugin_version or "").strip()
for candidate_plugin_id, candidate_plugin_version in (external_available_plugins or {}).items()
if str(candidate_plugin_id or "").strip() and str(candidate_plugin_version or "").strip()
}
if plugin_id in duplicate_candidates:
@@ -668,7 +668,7 @@ class PluginRunner:
self._loader.purge_plugin_modules(unload_plugin_id, meta.plugin_dir)
unloaded_plugins.append(unload_plugin_id)
reload_candidates: Dict[str, Tuple[Path, Dict[str, Any], Path]] = {}
reload_candidates: Dict[str, PluginCandidate] = {}
for target_plugin_id in target_plugin_ids:
candidate = candidates.get(target_plugin_id)
if candidate is None:
@@ -678,11 +678,25 @@ class PluginRunner:
load_order, dependency_failures = self._loader.resolve_dependencies(
reload_candidates,
extra_available=retained_plugin_ids | normalized_external_available,
extra_available={
**normalized_external_available,
**{
retained_plugin_id: retained_meta.version
for retained_plugin_id in retained_plugin_ids
if (retained_meta := self._loader.get_plugin(retained_plugin_id)) is not None
},
},
)
failed_plugins.update(dependency_failures)
available_plugins = set(retained_plugin_ids) | normalized_external_available
available_plugins = {
**normalized_external_available,
**{
retained_plugin_id: retained_meta.version
for retained_plugin_id in retained_plugin_ids
if (retained_meta := self._loader.get_plugin(retained_plugin_id)) is not None
},
}
reloaded_plugins: List[str] = []
for load_plugin_id in load_order:
@@ -694,10 +708,12 @@ class PluginRunner:
continue
_, manifest, _ = candidate
dependencies = PluginMeta._extract_dependencies(manifest)
missing_dependencies = [dependency for dependency in dependencies if dependency not in available_plugins]
if missing_dependencies:
failed_plugins[load_plugin_id] = f"依赖未满足: {', '.join(missing_dependencies)}"
unsatisfied_dependencies = self._loader.manifest_validator.get_unsatisfied_plugin_dependencies(
manifest,
available_plugin_versions=available_plugins,
)
if unsatisfied_dependencies:
failed_plugins[load_plugin_id] = f"依赖未满足: {', '.join(unsatisfied_dependencies)}"
continue
meta = self._loader.load_candidate(load_plugin_id, candidate)
@@ -710,7 +726,7 @@ class PluginRunner:
failed_plugins[load_plugin_id] = "插件初始化失败"
continue
available_plugins.add(load_plugin_id)
available_plugins[load_plugin_id] = meta.version
reloaded_plugins.append(load_plugin_id)
if failed_plugins:
@@ -1079,7 +1095,7 @@ class PluginRunner:
result = await self._reload_plugin_by_id(
payload.plugin_id,
payload.reason,
external_available_plugins=set(payload.external_available_plugins),
external_available_plugins=dict(payload.external_available_plugins),
)
return envelope.make_response(payload=result.model_dump())
@@ -1185,13 +1201,13 @@ async def _async_main() -> None:
plugin_dirs = [d for d in plugin_dirs_str.split(os.pathsep) if d]
try:
external_plugin_ids = json.loads(external_plugin_ids_raw) if external_plugin_ids_raw else []
external_plugin_ids = json.loads(external_plugin_ids_raw) if external_plugin_ids_raw else {}
except json.JSONDecodeError:
logger.warning("解析外部依赖插件列表失败,已回退为空列表")
external_plugin_ids = []
if not isinstance(external_plugin_ids, list):
logger.warning("外部依赖插件列表格式非法,已回退为空列表")
external_plugin_ids = []
logger.warning("解析外部依赖插件版本映射失败,已回退为空映射")
external_plugin_ids = {}
if not isinstance(external_plugin_ids, dict):
logger.warning("外部依赖插件版本映射格式非法,已回退为空映射")
external_plugin_ids = {}
# sys.path 隔离: 只保留标准库、SDK 包、插件目录
_isolate_sys_path(plugin_dirs)
@@ -1200,7 +1216,10 @@ async def _async_main() -> None:
host_address,
session_token,
plugin_dirs,
external_available_plugin_ids=[str(plugin_id) for plugin_id in external_plugin_ids],
external_available_plugins={
str(plugin_id): str(plugin_version)
for plugin_id, plugin_version in external_plugin_ids.items()
},
)
# 注册信号处理