diff --git a/src/plugin_runtime/__init__.py b/src/plugin_runtime/__init__.py index 940a4f2b..a881d399 100644 --- a/src/plugin_runtime/__init__.py +++ b/src/plugin_runtime/__init__.py @@ -13,3 +13,6 @@ ENV_SESSION_TOKEN = "MAIBOT_SESSION_TOKEN" ENV_PLUGIN_DIRS = "MAIBOT_PLUGIN_DIRS" """Runner 需要加载的插件目录列表(os.pathsep 分隔)""" + +ENV_HOST_VERSION = "MAIBOT_HOST_VERSION" +"""Runner 读取的 Host 应用版本号,用于 manifest 兼容性校验""" diff --git a/src/plugin_runtime/host/rpc_server.py b/src/plugin_runtime/host/rpc_server.py index ec27ca88..f3196f6b 100644 --- a/src/plugin_runtime/host/rpc_server.py +++ b/src/plugin_runtime/host/rpc_server.py @@ -11,6 +11,7 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple import asyncio import contextlib +import re import secrets from src.common.logger import get_logger @@ -561,13 +562,22 @@ class RPCServer: def _check_sdk_version(sdk_version: str) -> bool: """检查 SDK 版本是否在支持范围内""" try: - sdk_parts = [int(x) for x in sdk_version.split(".")] - min_parts = [int(x) for x in MIN_SDK_VERSION.split(".")] - max_parts = [int(x) for x in MAX_SDK_VERSION.split(".")] + sdk_parts = RPCServer._parse_version_tuple(sdk_version) + min_parts = RPCServer._parse_version_tuple(MIN_SDK_VERSION) + max_parts = RPCServer._parse_version_tuple(MAX_SDK_VERSION) return min_parts <= sdk_parts <= max_parts except (ValueError, AttributeError): return False + @staticmethod + def _parse_version_tuple(version: str) -> Tuple[int, int, int]: + base_version = re.split(r"[-.](?:snapshot|dev|alpha|beta|rc)", version or "", flags=re.IGNORECASE)[0] + base_version = base_version.split("+", 1)[0] + parts = [part for part in base_version.split(".") if part != ""] + while len(parts) < 3: + parts.append("0") + return (int(parts[0]), int(parts[1]), int(parts[2])) + def _get_connection_for_generation(self, generation: int) -> Optional[Connection]: if generation == self._runner_generation: return self._connection diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index ef222c03..fe969213 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -16,8 +16,8 @@ import os import sys from src.common.logger import get_logger -from src.config.config import global_config -from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN +from src.config.config import MMC_VERSION, global_config +from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.host.capability_service import CapabilityService from src.plugin_runtime.host.component_registry import ComponentRegistry from src.plugin_runtime.host.event_dispatcher import EventDispatcher @@ -521,6 +521,7 @@ class PluginSupervisor: env[ENV_IPC_ADDRESS] = address env[ENV_SESSION_TOKEN] = token env[ENV_PLUGIN_DIRS] = os.pathsep.join(self._plugin_dirs) + env[ENV_HOST_VERSION] = MMC_VERSION self._runner_process = await asyncio.create_subprocess_exec( sys.executable, diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 94ebb62d..142aca43 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -10,6 +10,7 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple import asyncio +import json import os from pathlib import Path @@ -83,6 +84,13 @@ class PluginRuntimeManager: builtin_dirs = self._get_builtin_plugin_dirs() thirdparty_dirs = self._get_thirdparty_plugin_dirs() + if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + thirdparty_dirs): + details = "; ".join( + f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items()) + ) + logger.error(f"检测到重复插件 ID,拒绝启动插件运行时: {details}") + return + if not builtin_dirs and not thirdparty_dirs: logger.info("未找到任何插件目录,跳过插件运行时启动") return @@ -173,20 +181,26 @@ class PluginRuntimeManager: if not self._started: return False - for sv in self.supervisors: - if plugin_id in sv._registered_plugins: - config_payload = ( - config_data - if config_data is not None - else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", [])) - ) - await sv.notify_plugin_config_updated( - plugin_id=plugin_id, - config_data=config_payload, - config_version=config_version, - ) - return True - return False + try: + sv = self._get_supervisor_for_plugin(plugin_id) + except RuntimeError as exc: + logger.error(f"推送插件配置更新失败: {exc}") + return False + + if sv is None: + return False + + config_payload = ( + config_data + if config_data is not None + else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", [])) + ) + await sv.notify_plugin_config_updated( + plugin_id=plugin_id, + config_data=config_payload, + config_version=config_version, + ) + return True async def handle_config_reload(self) -> None: """处理主配置热重载后的插件配置通知。""" @@ -267,16 +281,60 @@ class PluginRuntimeManager: timeout_ms: int = 30000, ) -> Any: """将插件调用路由到拥有该插件的 Supervisor""" - for sv in self.supervisors: - if sv.component_registry.get_components_by_plugin(plugin_id): - return await sv.invoke_plugin( - method=method, - plugin_id=plugin_id, - component_name=component_name, - args=args, - timeout_ms=timeout_ms, - ) - raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册") + sv = self._get_supervisor_for_plugin(plugin_id) + if sv is None: + raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册") + return await sv.invoke_plugin( + method=method, + plugin_id=plugin_id, + component_name=component_name, + args=args, + timeout_ms=timeout_ms, + ) + + def _get_supervisors_for_plugin(self, plugin_id: str) -> List[Any]: + return [ + supervisor + for supervisor in self.supervisors + if plugin_id in getattr(supervisor, "_registered_plugins", {}) + ] + + def _get_supervisor_for_plugin(self, plugin_id: str) -> Optional[Any]: + matches = self._get_supervisors_for_plugin(plugin_id) + if len(matches) > 1: + raise RuntimeError(f"插件 {plugin_id} 同时存在于多个 Supervisor 中,无法安全路由") + return matches[0] if matches else None + + @staticmethod + def _find_duplicate_plugin_ids(plugin_dirs: List[str]) -> Dict[str, List[str]]: + plugin_locations: Dict[str, List[str]] = {} + for base_dir in plugin_dirs: + if not os.path.isdir(base_dir): + continue + for entry in os.listdir(base_dir): + plugin_dir = os.path.join(base_dir, entry) + if not os.path.isdir(plugin_dir): + continue + manifest_path = os.path.join(plugin_dir, "_manifest.json") + plugin_path = os.path.join(plugin_dir, "plugin.py") + if not os.path.exists(manifest_path) or not os.path.exists(plugin_path): + continue + + plugin_id = entry + try: + with open(manifest_path, "r", encoding="utf-8") as manifest_file: + manifest = json.load(manifest_file) + plugin_id = str(manifest.get("name", entry)).strip() or entry + except Exception: + continue + + plugin_locations.setdefault(plugin_id, []).append(plugin_dir) + + return { + plugin_id: sorted(dict.fromkeys(paths)) + for plugin_id, paths in plugin_locations.items() + if len(set(paths)) > 1 + } def _register_config_reload_callback(self) -> None: if self._config_reload_callback_registered: @@ -321,13 +379,19 @@ class PluginRuntimeManager: def _iter_plugin_dirs(self) -> Iterable[str]: for supervisor in self.supervisors: - for plugin_dir in getattr(supervisor, "_plugin_dirs", []): - yield plugin_dir + yield from getattr(supervisor, "_plugin_dirs", []) async def _handle_plugin_file_changes(self, changes: List[FileChange]) -> None: if not self._started or not changes: return + if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): + details = "; ".join( + f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items()) + ) + logger.error(f"检测到重复插件 ID,跳过本次插件热重载: {details}") + return + reload_supervisors: List[Any] = [] config_updates: Dict[str, set[str]] = {} changed_paths = [change.path.resolve() for change in changes] @@ -362,8 +426,8 @@ class PluginRuntimeManager: logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}") @staticmethod - def _get_supervisor_key(supervisor: Any) -> str: - return str(id(supervisor)) + def _get_supervisor_key(supervisor: Any) -> int: + return id(supervisor) @staticmethod def _plugin_dir_matches(path: Path, plugin_dir: str) -> bool: @@ -371,7 +435,7 @@ class PluginRuntimeManager: return path == plugin_root or path.is_relative_to(plugin_root) def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]: - for plugin_id, reg in getattr(supervisor, "_registered_plugins", {}).items(): + for plugin_id, _reg in getattr(supervisor, "_registered_plugins", {}).items(): for plugin_dir in getattr(supervisor, "_plugin_dirs", []): candidate_dir = Path(plugin_dir).resolve() / plugin_id if path == candidate_dir or path.is_relative_to(candidate_dir): @@ -379,10 +443,8 @@ class PluginRuntimeManager: for plugin_dir in getattr(supervisor, "_plugin_dirs", []): plugin_root = Path(plugin_dir).resolve() - if self._plugin_dir_matches(path, plugin_dir): - relative_parts = path.relative_to(plugin_root).parts - if relative_parts: - return relative_parts[0] + if self._plugin_dir_matches(path, plugin_dir) and (relative_parts := path.relative_to(plugin_root).parts): + return relative_parts[0] return None @staticmethod @@ -1589,6 +1651,9 @@ class PluginRuntimeManager: result: Dict[str, Any] = {} for sv in mgr.supervisors: for pid, reg in sv._registered_plugins.items(): + if pid in result: + logger.error(f"检测到重复插件 ID {pid},component.get_all_plugins 结果已拒绝聚合") + return {"success": False, "error": f"检测到重复插件 ID: {pid}"} # 从 ComponentRegistry 中获取该插件的所有组件 comps = sv.component_registry.get_components_by_plugin(pid, enabled_only=False) components_list = [ @@ -1619,7 +1684,12 @@ class PluginRuntimeManager: """ plugin_name: str = args.get("plugin_name", plugin_id) mgr = get_plugin_runtime_manager() - for sv in mgr.supervisors: + try: + sv = mgr._get_supervisor_for_plugin(plugin_name) + except RuntimeError as exc: + return {"success": False, "error": str(exc)} + + if sv is not None: reg = sv._registered_plugins.get(plugin_name) if reg is not None: return { @@ -1667,9 +1737,11 @@ class PluginRuntimeManager: if comp is not None and comp.component_type == component_type: return comp, None - for candidate in sv.component_registry.get_components_by_type(component_type, enabled_only=False): - if candidate.name == name: - short_name_matches.append(candidate) + short_name_matches.extend( + candidate + for candidate in sv.component_registry.get_components_by_type(component_type, enabled_only=False) + if candidate.name == name + ) if len(short_name_matches) == 1: return short_name_matches[0], None @@ -1737,18 +1809,28 @@ class PluginRuntimeManager: import os mgr = get_plugin_runtime_manager() + if duplicate_plugin_ids := mgr._find_duplicate_plugin_ids(list(mgr._iter_plugin_dirs())): + details = "; ".join( + f"{conflict_plugin_id}: {', '.join(paths)}" + for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items()) + ) + return {"success": False, "error": f"检测到重复插件 ID,拒绝热重载: {details}"} # 优先查找已注册该插件的 Supervisor - for sv in mgr.supervisors: - if plugin_name in sv._registered_plugins: - try: - reloaded = await sv.reload_plugins(reason=f"load {plugin_name}") - if reloaded: - return {"success": True, "count": 1} - return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"} - except Exception as e: - logger.error(f"[cap.component.load_plugin] 热重载失败: {e}") - return {"success": False, "error": str(e)} + try: + registered_supervisor = mgr._get_supervisor_for_plugin(plugin_name) + except RuntimeError as exc: + return {"success": False, "error": str(exc)} + + if registered_supervisor is not None: + try: + reloaded = await registered_supervisor.reload_plugins(reason=f"load {plugin_name}") + if reloaded: + return {"success": True, "count": 1} + return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"} + except Exception as e: + logger.error(f"[cap.component.load_plugin] 热重载失败: {e}") + return {"success": False, "error": str(e)} # 插件尚未注册,检查是否有 Supervisor 的 plugin_dirs 下包含该插件目录 for sv in mgr.supervisors: @@ -1784,16 +1866,27 @@ class PluginRuntimeManager: return {"success": False, "error": "缺少必要参数 plugin_name"} mgr = get_plugin_runtime_manager() - for sv in mgr.supervisors: - if plugin_name in sv._registered_plugins: - try: - reloaded = await sv.reload_plugins(reason=f"reload {plugin_name}") - if reloaded: - return {"success": True} - return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"} - except Exception as e: - logger.error(f"[cap.component.reload_plugin] 热重载失败: {e}") - return {"success": False, "error": str(e)} + if duplicate_plugin_ids := mgr._find_duplicate_plugin_ids(list(mgr._iter_plugin_dirs())): + details = "; ".join( + f"{conflict_plugin_id}: {', '.join(paths)}" + for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items()) + ) + return {"success": False, "error": f"检测到重复插件 ID,拒绝热重载: {details}"} + + try: + sv = mgr._get_supervisor_for_plugin(plugin_name) + except RuntimeError as exc: + return {"success": False, "error": str(exc)} + + if sv is not None: + try: + reloaded = await sv.reload_plugins(reason=f"reload {plugin_name}") + if reloaded: + return {"success": True} + return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"} + except Exception as e: + logger.error(f"[cap.component.reload_plugin] 热重载失败: {e}") + return {"success": False, "error": str(e)} return {"success": False, "error": f"未找到插件: {plugin_name}"} # ═════════════════════════════════════════════════════════ diff --git a/src/plugin_runtime/runner/plugin_loader.py b/src/plugin_runtime/runner/plugin_loader.py index 1ddc55b7..97ab3284 100644 --- a/src/plugin_runtime/runner/plugin_loader.py +++ b/src/plugin_runtime/runner/plugin_loader.py @@ -78,6 +78,7 @@ class PluginLoader: """ # 第一阶段:发现并校验 manifest candidates: Dict[str, Tuple[str, Dict[str, Any], str]] = {} # id -> (dir, manifest, plugin_path) + duplicate_candidates: Dict[str, List[str]] = {} for base_dir in plugin_dirs: if not os.path.isdir(base_dir): logger.warning(f"插件目录不存在: {base_dir}") @@ -107,9 +108,25 @@ class PluginLoader: self._failed_plugins[entry] = f"manifest 校验失败: {errors}" continue - plugin_id = manifest.get("name", entry) + plugin_id = str(manifest.get("name", entry)).strip() or entry + if plugin_id in duplicate_candidates: + duplicate_candidates[plugin_id].append(plugin_dir) + continue + + previous = candidates.get(plugin_id) + if previous is not None: + duplicate_candidates[plugin_id] = [previous[0], plugin_dir] + candidates.pop(plugin_id, None) + continue + candidates[plugin_id] = (plugin_dir, manifest, plugin_path) + for plugin_id, conflict_dirs in duplicate_candidates.items(): + unique_dirs = sorted(dict.fromkeys(conflict_dirs)) + reason = f"检测到重复插件 ID: {plugin_id} -> {', '.join(unique_dirs)}" + self._failed_plugins[plugin_id] = reason + logger.error(reason) + # 第二阶段:依赖解析(拓扑排序) load_order, failed_deps = self._resolve_dependencies(candidates) diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index d80fe242..e06f76ef 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -24,7 +24,7 @@ import time import tomllib from src.common.logger import get_console_handler, get_logger, initialize_logging -from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN +from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.protocol.envelope import ( BootstrapPluginPayload, ComponentDeclaration, @@ -68,7 +68,7 @@ class PluginRunner: self._plugin_dirs: list[str] = plugin_dirs self._rpc_client: RPCClient = RPCClient(host_address, session_token) - self._loader: PluginLoader = PluginLoader() + self._loader: PluginLoader = PluginLoader(host_version=os.getenv(ENV_HOST_VERSION, "")) self._start_time: float = time.monotonic() self._shutting_down: bool = False