feat: 添加 Host 应用版本号支持,优化插件加载和热重载逻辑,检测重复插件 ID

This commit is contained in:
DrSmoothl
2026-03-13 17:35:35 +08:00
parent 8da1b6d93f
commit 2f3519411a
6 changed files with 189 additions and 65 deletions

View File

@@ -13,3 +13,6 @@ ENV_SESSION_TOKEN = "MAIBOT_SESSION_TOKEN"
ENV_PLUGIN_DIRS = "MAIBOT_PLUGIN_DIRS"
"""Runner 需要加载的插件目录列表os.pathsep 分隔)"""
ENV_HOST_VERSION = "MAIBOT_HOST_VERSION"
"""Runner 读取的 Host 应用版本号,用于 manifest 兼容性校验"""

View File

@@ -11,6 +11,7 @@ from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
import asyncio
import contextlib
import re
import secrets
from src.common.logger import get_logger
@@ -561,13 +562,22 @@ class RPCServer:
def _check_sdk_version(sdk_version: str) -> bool:
"""检查 SDK 版本是否在支持范围内"""
try:
sdk_parts = [int(x) for x in sdk_version.split(".")]
min_parts = [int(x) for x in MIN_SDK_VERSION.split(".")]
max_parts = [int(x) for x in MAX_SDK_VERSION.split(".")]
sdk_parts = RPCServer._parse_version_tuple(sdk_version)
min_parts = RPCServer._parse_version_tuple(MIN_SDK_VERSION)
max_parts = RPCServer._parse_version_tuple(MAX_SDK_VERSION)
return min_parts <= sdk_parts <= max_parts
except (ValueError, AttributeError):
return False
@staticmethod
def _parse_version_tuple(version: str) -> Tuple[int, int, int]:
base_version = re.split(r"[-.](?:snapshot|dev|alpha|beta|rc)", version or "", flags=re.IGNORECASE)[0]
base_version = base_version.split("+", 1)[0]
parts = [part for part in base_version.split(".") if part != ""]
while len(parts) < 3:
parts.append("0")
return (int(parts[0]), int(parts[1]), int(parts[2]))
def _get_connection_for_generation(self, generation: int) -> Optional[Connection]:
if generation == self._runner_generation:
return self._connection

View File

@@ -16,8 +16,8 @@ import os
import sys
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
from src.config.config import MMC_VERSION, global_config
from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
from src.plugin_runtime.host.capability_service import CapabilityService
from src.plugin_runtime.host.component_registry import ComponentRegistry
from src.plugin_runtime.host.event_dispatcher import EventDispatcher
@@ -521,6 +521,7 @@ class PluginSupervisor:
env[ENV_IPC_ADDRESS] = address
env[ENV_SESSION_TOKEN] = token
env[ENV_PLUGIN_DIRS] = os.pathsep.join(self._plugin_dirs)
env[ENV_HOST_VERSION] = MMC_VERSION
self._runner_process = await asyncio.create_subprocess_exec(
sys.executable,

View File

@@ -10,6 +10,7 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple
import asyncio
import json
import os
from pathlib import Path
@@ -83,6 +84,13 @@ class PluginRuntimeManager:
builtin_dirs = self._get_builtin_plugin_dirs()
thirdparty_dirs = self._get_thirdparty_plugin_dirs()
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + thirdparty_dirs):
details = "; ".join(
f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items())
)
logger.error(f"检测到重复插件 ID拒绝启动插件运行时: {details}")
return
if not builtin_dirs and not thirdparty_dirs:
logger.info("未找到任何插件目录,跳过插件运行时启动")
return
@@ -173,20 +181,26 @@ class PluginRuntimeManager:
if not self._started:
return False
for sv in self.supervisors:
if plugin_id in sv._registered_plugins:
config_payload = (
config_data
if config_data is not None
else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", []))
)
await sv.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=config_payload,
config_version=config_version,
)
return True
return False
try:
sv = self._get_supervisor_for_plugin(plugin_id)
except RuntimeError as exc:
logger.error(f"推送插件配置更新失败: {exc}")
return False
if sv is None:
return False
config_payload = (
config_data
if config_data is not None
else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", []))
)
await sv.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=config_payload,
config_version=config_version,
)
return True
async def handle_config_reload(self) -> None:
"""处理主配置热重载后的插件配置通知。"""
@@ -267,16 +281,60 @@ class PluginRuntimeManager:
timeout_ms: int = 30000,
) -> Any:
"""将插件调用路由到拥有该插件的 Supervisor"""
for sv in self.supervisors:
if sv.component_registry.get_components_by_plugin(plugin_id):
return await sv.invoke_plugin(
method=method,
plugin_id=plugin_id,
component_name=component_name,
args=args,
timeout_ms=timeout_ms,
)
raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册")
sv = self._get_supervisor_for_plugin(plugin_id)
if sv is None:
raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册")
return await sv.invoke_plugin(
method=method,
plugin_id=plugin_id,
component_name=component_name,
args=args,
timeout_ms=timeout_ms,
)
def _get_supervisors_for_plugin(self, plugin_id: str) -> List[Any]:
return [
supervisor
for supervisor in self.supervisors
if plugin_id in getattr(supervisor, "_registered_plugins", {})
]
def _get_supervisor_for_plugin(self, plugin_id: str) -> Optional[Any]:
matches = self._get_supervisors_for_plugin(plugin_id)
if len(matches) > 1:
raise RuntimeError(f"插件 {plugin_id} 同时存在于多个 Supervisor 中,无法安全路由")
return matches[0] if matches else None
@staticmethod
def _find_duplicate_plugin_ids(plugin_dirs: List[str]) -> Dict[str, List[str]]:
plugin_locations: Dict[str, List[str]] = {}
for base_dir in plugin_dirs:
if not os.path.isdir(base_dir):
continue
for entry in os.listdir(base_dir):
plugin_dir = os.path.join(base_dir, entry)
if not os.path.isdir(plugin_dir):
continue
manifest_path = os.path.join(plugin_dir, "_manifest.json")
plugin_path = os.path.join(plugin_dir, "plugin.py")
if not os.path.exists(manifest_path) or not os.path.exists(plugin_path):
continue
plugin_id = entry
try:
with open(manifest_path, "r", encoding="utf-8") as manifest_file:
manifest = json.load(manifest_file)
plugin_id = str(manifest.get("name", entry)).strip() or entry
except Exception:
continue
plugin_locations.setdefault(plugin_id, []).append(plugin_dir)
return {
plugin_id: sorted(dict.fromkeys(paths))
for plugin_id, paths in plugin_locations.items()
if len(set(paths)) > 1
}
def _register_config_reload_callback(self) -> None:
if self._config_reload_callback_registered:
@@ -321,13 +379,19 @@ class PluginRuntimeManager:
def _iter_plugin_dirs(self) -> Iterable[str]:
for supervisor in self.supervisors:
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
yield plugin_dir
yield from getattr(supervisor, "_plugin_dirs", [])
async def _handle_plugin_file_changes(self, changes: List[FileChange]) -> None:
if not self._started or not changes:
return
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())):
details = "; ".join(
f"{plugin_id}: {', '.join(paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items())
)
logger.error(f"检测到重复插件 ID跳过本次插件热重载: {details}")
return
reload_supervisors: List[Any] = []
config_updates: Dict[str, set[str]] = {}
changed_paths = [change.path.resolve() for change in changes]
@@ -362,8 +426,8 @@ class PluginRuntimeManager:
logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}")
@staticmethod
def _get_supervisor_key(supervisor: Any) -> str:
return str(id(supervisor))
def _get_supervisor_key(supervisor: Any) -> int:
return id(supervisor)
@staticmethod
def _plugin_dir_matches(path: Path, plugin_dir: str) -> bool:
@@ -371,7 +435,7 @@ class PluginRuntimeManager:
return path == plugin_root or path.is_relative_to(plugin_root)
def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]:
for plugin_id, reg in getattr(supervisor, "_registered_plugins", {}).items():
for plugin_id, _reg in getattr(supervisor, "_registered_plugins", {}).items():
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
candidate_dir = Path(plugin_dir).resolve() / plugin_id
if path == candidate_dir or path.is_relative_to(candidate_dir):
@@ -379,10 +443,8 @@ class PluginRuntimeManager:
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
plugin_root = Path(plugin_dir).resolve()
if self._plugin_dir_matches(path, plugin_dir):
relative_parts = path.relative_to(plugin_root).parts
if relative_parts:
return relative_parts[0]
if self._plugin_dir_matches(path, plugin_dir) and (relative_parts := path.relative_to(plugin_root).parts):
return relative_parts[0]
return None
@staticmethod
@@ -1589,6 +1651,9 @@ class PluginRuntimeManager:
result: Dict[str, Any] = {}
for sv in mgr.supervisors:
for pid, reg in sv._registered_plugins.items():
if pid in result:
logger.error(f"检测到重复插件 ID {pid}component.get_all_plugins 结果已拒绝聚合")
return {"success": False, "error": f"检测到重复插件 ID: {pid}"}
# 从 ComponentRegistry 中获取该插件的所有组件
comps = sv.component_registry.get_components_by_plugin(pid, enabled_only=False)
components_list = [
@@ -1619,7 +1684,12 @@ class PluginRuntimeManager:
"""
plugin_name: str = args.get("plugin_name", plugin_id)
mgr = get_plugin_runtime_manager()
for sv in mgr.supervisors:
try:
sv = mgr._get_supervisor_for_plugin(plugin_name)
except RuntimeError as exc:
return {"success": False, "error": str(exc)}
if sv is not None:
reg = sv._registered_plugins.get(plugin_name)
if reg is not None:
return {
@@ -1667,9 +1737,11 @@ class PluginRuntimeManager:
if comp is not None and comp.component_type == component_type:
return comp, None
for candidate in sv.component_registry.get_components_by_type(component_type, enabled_only=False):
if candidate.name == name:
short_name_matches.append(candidate)
short_name_matches.extend(
candidate
for candidate in sv.component_registry.get_components_by_type(component_type, enabled_only=False)
if candidate.name == name
)
if len(short_name_matches) == 1:
return short_name_matches[0], None
@@ -1737,18 +1809,28 @@ class PluginRuntimeManager:
import os
mgr = get_plugin_runtime_manager()
if duplicate_plugin_ids := mgr._find_duplicate_plugin_ids(list(mgr._iter_plugin_dirs())):
details = "; ".join(
f"{conflict_plugin_id}: {', '.join(paths)}"
for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items())
)
return {"success": False, "error": f"检测到重复插件 ID拒绝热重载: {details}"}
# 优先查找已注册该插件的 Supervisor
for sv in mgr.supervisors:
if plugin_name in sv._registered_plugins:
try:
reloaded = await sv.reload_plugins(reason=f"load {plugin_name}")
if reloaded:
return {"success": True, "count": 1}
return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"}
except Exception as e:
logger.error(f"[cap.component.load_plugin] 热重载失败: {e}")
return {"success": False, "error": str(e)}
try:
registered_supervisor = mgr._get_supervisor_for_plugin(plugin_name)
except RuntimeError as exc:
return {"success": False, "error": str(exc)}
if registered_supervisor is not None:
try:
reloaded = await registered_supervisor.reload_plugins(reason=f"load {plugin_name}")
if reloaded:
return {"success": True, "count": 1}
return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"}
except Exception as e:
logger.error(f"[cap.component.load_plugin] 热重载失败: {e}")
return {"success": False, "error": str(e)}
# 插件尚未注册,检查是否有 Supervisor 的 plugin_dirs 下包含该插件目录
for sv in mgr.supervisors:
@@ -1784,16 +1866,27 @@ class PluginRuntimeManager:
return {"success": False, "error": "缺少必要参数 plugin_name"}
mgr = get_plugin_runtime_manager()
for sv in mgr.supervisors:
if plugin_name in sv._registered_plugins:
try:
reloaded = await sv.reload_plugins(reason=f"reload {plugin_name}")
if reloaded:
return {"success": True}
return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"}
except Exception as e:
logger.error(f"[cap.component.reload_plugin] 热重载失败: {e}")
return {"success": False, "error": str(e)}
if duplicate_plugin_ids := mgr._find_duplicate_plugin_ids(list(mgr._iter_plugin_dirs())):
details = "; ".join(
f"{conflict_plugin_id}: {', '.join(paths)}"
for conflict_plugin_id, paths in sorted(duplicate_plugin_ids.items())
)
return {"success": False, "error": f"检测到重复插件 ID拒绝热重载: {details}"}
try:
sv = mgr._get_supervisor_for_plugin(plugin_name)
except RuntimeError as exc:
return {"success": False, "error": str(exc)}
if sv is not None:
try:
reloaded = await sv.reload_plugins(reason=f"reload {plugin_name}")
if reloaded:
return {"success": True}
return {"success": False, "error": f"插件 {plugin_name} 热重载失败,已回滚"}
except Exception as e:
logger.error(f"[cap.component.reload_plugin] 热重载失败: {e}")
return {"success": False, "error": str(e)}
return {"success": False, "error": f"未找到插件: {plugin_name}"}
# ═════════════════════════════════════════════════════════

View File

@@ -78,6 +78,7 @@ class PluginLoader:
"""
# 第一阶段:发现并校验 manifest
candidates: Dict[str, Tuple[str, Dict[str, Any], str]] = {} # id -> (dir, manifest, plugin_path)
duplicate_candidates: Dict[str, List[str]] = {}
for base_dir in plugin_dirs:
if not os.path.isdir(base_dir):
logger.warning(f"插件目录不存在: {base_dir}")
@@ -107,9 +108,25 @@ class PluginLoader:
self._failed_plugins[entry] = f"manifest 校验失败: {errors}"
continue
plugin_id = manifest.get("name", entry)
plugin_id = str(manifest.get("name", entry)).strip() or entry
if plugin_id in duplicate_candidates:
duplicate_candidates[plugin_id].append(plugin_dir)
continue
previous = candidates.get(plugin_id)
if previous is not None:
duplicate_candidates[plugin_id] = [previous[0], plugin_dir]
candidates.pop(plugin_id, None)
continue
candidates[plugin_id] = (plugin_dir, manifest, plugin_path)
for plugin_id, conflict_dirs in duplicate_candidates.items():
unique_dirs = sorted(dict.fromkeys(conflict_dirs))
reason = f"检测到重复插件 ID: {plugin_id} -> {', '.join(unique_dirs)}"
self._failed_plugins[plugin_id] = reason
logger.error(reason)
# 第二阶段:依赖解析(拓扑排序)
load_order, failed_deps = self._resolve_dependencies(candidates)

View File

@@ -24,7 +24,7 @@ import time
import tomllib
from src.common.logger import get_console_handler, get_logger, initialize_logging
from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
from src.plugin_runtime.protocol.envelope import (
BootstrapPluginPayload,
ComponentDeclaration,
@@ -68,7 +68,7 @@ class PluginRunner:
self._plugin_dirs: list[str] = plugin_dirs
self._rpc_client: RPCClient = RPCClient(host_address, session_token)
self._loader: PluginLoader = PluginLoader()
self._loader: PluginLoader = PluginLoader(host_version=os.getenv(ENV_HOST_VERSION, ""))
self._start_time: float = time.monotonic()
self._shutting_down: bool = False