diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index 823777f3..76dbe316 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -1199,6 +1199,14 @@ class TestSupervisor: def __init__(self): self.runner_generation = 1 self.is_connected = True + self.session_token = "fake-token" + + def reset_session_token(self): + self.session_token = "new-fake-token" + return self.session_token + + def restore_session_token(self, token): + self.session_token = token async def send_request(self, method, timeout_ms=5000, **kwargs): assert self.runner_generation == 2 @@ -1240,6 +1248,14 @@ class TestSupervisor: def __init__(self): self.runner_generation = 1 self.is_connected = True + self.session_token = "fake-token" + + def reset_session_token(self): + self.session_token = "new-fake-token" + return self.session_token + + def restore_session_token(self, token): + self.session_token = token async def send_request(self, method, timeout_ms=5000, **kwargs): raise RuntimeError("new runner unhealthy") diff --git a/src/config/config.py b/src/config/config.py index de196599..a3b81d2d 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -26,6 +26,7 @@ from .official_configs import ( LPMMKnowledgeConfig, MaiSakaConfig, MaimMessageConfig, + PluginRuntimeConfig, MemoryConfig, MessageReceiveConfig, PersonalityConfig, @@ -54,7 +55,7 @@ CONFIG_DIR: Path = PROJECT_ROOT / "config" BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute() MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute() MMC_VERSION: str = "1.0.0" -CONFIG_VERSION: str = "8.0.0" +CONFIG_VERSION: str = "8.1.0" MODEL_CONFIG_VERSION: str = "1.12.0" logger = get_logger("config") @@ -131,6 +132,9 @@ class Config(ConfigBase): maisaka: MaiSakaConfig = Field(default_factory=MaiSakaConfig) """MaiSaka对话系统配置类""" + plugin_runtime: PluginRuntimeConfig = Field(default_factory=PluginRuntimeConfig) + """插件运行时配置类""" + class ModelConfig(ConfigBase): """模型配置类""" diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 60153dac..2de01030 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -1608,3 +1608,82 @@ class MaiSakaConfig(ConfigBase): }, ) """QQ API 密钥""" + + +class PluginRuntimeConfig(ConfigBase): + """插件运行时配置类""" + + __ui_label__ = "插件运行时" + __ui_icon__ = "puzzle" + + enabled: bool = Field( + default=True, + json_schema_extra={ + "x-widget": "switch", + "x-icon": "power", + }, + ) + """启用插件系统""" + + builtin_plugin_dir: str = Field( + default="src/plugins/built_in", + json_schema_extra={ + "x-widget": "input", + "x-icon": "folder", + }, + ) + """内置插件目录(相对于项目根目录)""" + + thirdparty_plugin_dir: str = Field( + default="plugins", + json_schema_extra={ + "x-widget": "input", + "x-icon": "folder-open", + }, + ) + """第三方插件目录(相对于项目根目录)""" + + health_check_interval_sec: float = Field( + default=30.0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "activity", + }, + ) + """健康检查间隔(秒)""" + + max_restart_attempts: int = Field( + default=3, + json_schema_extra={ + "x-widget": "number", + "x-icon": "refresh-cw", + }, + ) + """Runner 崩溃后最大自动重启次数""" + + runner_spawn_timeout_sec: float = Field( + default=30.0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "clock", + }, + ) + """等待 Runner 子进程启动并注册的超时时间(秒)""" + + workflow_blocking_timeout_sec: float = Field( + default=120.0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "timer", + }, + ) + """Workflow 阻塞步骤的全局超时上限(秒)""" + + ipc_socket_path: str = Field( + default="", + json_schema_extra={ + "x-widget": "input", + "x-icon": "link", + }, + ) + """_wrap_\n 自定义 IPC Socket 路径(仅 Linux/macOS 生效)\n 留空则自动生成临时路径""" diff --git a/src/plugin_runtime/__init__.py b/src/plugin_runtime/__init__.py index 8b137891..a4a6768b 100644 --- a/src/plugin_runtime/__init__.py +++ b/src/plugin_runtime/__init__.py @@ -1 +1,16 @@ +"""插件运行时包 + +定义 Host ↔ Runner 子进程间传递的环境变量名称常量。 +这些环境变量用于子进程 IPC 通信,值在运行时动态生成。 +""" + +# Host 端在 spawn Runner 子进程时设置、Runner 端启动时读取的环境变量名 +ENV_IPC_ADDRESS = "MAIBOT_IPC_ADDRESS" +"""IPC 传输层监听地址(UDS socket 路径或 TCP host:port)""" + +ENV_SESSION_TOKEN = "MAIBOT_SESSION_TOKEN" +"""本次会话的认证令牌(每次 spawn / reload 重新生成)""" + +ENV_PLUGIN_DIRS = "MAIBOT_PLUGIN_DIRS" +"""Runner 需要加载的插件目录列表(os.pathsep 分隔)""" diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 23d4bd6b..e9d3b726 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -16,6 +16,8 @@ import os import sys from src.common.logger import get_logger +from src.config.config import global_config +from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.host.capability_service import CapabilityService from src.plugin_runtime.host.component_registry import ComponentRegistry from src.plugin_runtime.host.event_dispatcher import EventDispatcher @@ -93,10 +95,14 @@ class PluginSupervisor: self, plugin_dirs: Optional[List[str]] = None, socket_path: Optional[str] = None, - health_check_interval_sec: float = 30.0, + health_check_interval_sec: Optional[float] = None, + max_restart_attempts: Optional[int] = None, + runner_spawn_timeout_sec: Optional[float] = None, ): + _cfg = global_config.plugin_runtime self._plugin_dirs = plugin_dirs or [] - self._health_interval = health_check_interval_sec + self._health_interval = health_check_interval_sec if health_check_interval_sec is not None else _cfg.health_check_interval_sec + self._runner_spawn_timeout = runner_spawn_timeout_sec if runner_spawn_timeout_sec is not None else _cfg.runner_spawn_timeout_sec # 基础设施 self._transport = create_transport_server(socket_path=socket_path) @@ -118,7 +124,7 @@ class PluginSupervisor: # Runner 子进程 self._runner_process: Optional[asyncio.subprocess.Process] = None self._runner_generation: int = 0 - self._max_restart_attempts: int = 3 + self._max_restart_attempts: int = max_restart_attempts if max_restart_attempts is not None else _cfg.max_restart_attempts self._restart_count: int = 0 # 已注册的插件组件信息 @@ -230,10 +236,10 @@ class PluginSupervisor: # 等待 Runner 完成连接,避免 start() 返回时 Runner 尚未就绪 try: - await self._wait_for_runner_generation(expected_generation, timeout_sec=30.0) + await self._wait_for_runner_generation(expected_generation, timeout_sec=self._runner_spawn_timeout) except TimeoutError: if not self._rpc_server.is_connected: - logger.warning("Runner 未在 30s 内完成连接,后续操作可能失败") + logger.warning(f"Runner 未在 {self._runner_spawn_timeout}s 内完成连接,后续操作可能失败") # 启动健康检查 self._health_task = asyncio.create_task(self._health_check_loop()) @@ -305,7 +311,7 @@ class PluginSupervisor: # 拉起新 Runner try: await self._spawn_runner() - await self._wait_for_runner_generation(expected_generation, timeout_sec=30.0) + await self._wait_for_runner_generation(expected_generation, timeout_sec=self._runner_spawn_timeout) resp = await self._rpc_server.send_request("plugin.health", timeout_ms=5000) health = HealthPayload.model_validate(resp.payload) if not health.healthy: @@ -393,9 +399,9 @@ class PluginSupervisor: token = self._rpc_server.session_token env = os.environ.copy() - env["MAIBOT_IPC_ADDRESS"] = address - env["MAIBOT_SESSION_TOKEN"] = token - env["MAIBOT_PLUGIN_DIRS"] = os.pathsep.join(self._plugin_dirs) + env[ENV_IPC_ADDRESS] = address + env[ENV_SESSION_TOKEN] = token + env[ENV_PLUGIN_DIRS] = os.pathsep.join(self._plugin_dirs) self._runner_process = await asyncio.create_subprocess_exec( sys.executable, "-m", runner_module, diff --git a/src/plugin_runtime/host/workflow_executor.py b/src/plugin_runtime/host/workflow_executor.py index 69b2102a..e191eebb 100644 --- a/src/plugin_runtime/host/workflow_executor.py +++ b/src/plugin_runtime/host/workflow_executor.py @@ -24,6 +24,7 @@ import time import uuid from src.common.logger import get_logger +from src.config.config import global_config from src.plugin_runtime.host.component_registry import ComponentRegistry, RegisteredComponent logger = get_logger("plugin_runtime.host.workflow_executor") @@ -44,7 +45,9 @@ HOOK_SKIP_STAGE = "skip_stage" HOOK_ABORT = "abort" # blocking hook 全局最大超时(秒):即使 hook 声明 timeout_ms=0 也不会无限等待 -GLOBAL_BLOCKING_TIMEOUT_SEC = 120.0 +# 从配置文件读取,允许用户调整 +def _get_blocking_timeout() -> float: + return global_config.plugin_runtime.workflow_blocking_timeout_sec class ModificationRecord: @@ -300,7 +303,7 @@ class WorkflowExecutor: """ timeout_ms = step.metadata.get("timeout_ms", 0) # 使用 hook 声明的超时,但不超过全局安全阀 - timeout_sec = timeout_ms / 1000 if timeout_ms > 0 else GLOBAL_BLOCKING_TIMEOUT_SEC + timeout_sec = timeout_ms / 1000 if timeout_ms > 0 else _get_blocking_timeout() step_key = f"{stage}:{step.full_name}" step_start = time.perf_counter() diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 1d4eced0..dcd04629 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -13,6 +13,7 @@ import asyncio import os from src.common.logger import get_logger +from src.config.config import global_config logger = get_logger("plugin_runtime.integration") @@ -66,6 +67,11 @@ class PluginRuntimeManager: logger.warning("PluginRuntimeManager 已在运行中,跳过重复启动") return + _cfg = global_config.plugin_runtime + if not _cfg.enabled: + logger.info("插件运行时已在配置中禁用,跳过启动") + return + from src.plugin_runtime.host.supervisor import PluginSupervisor builtin_dirs = self._get_builtin_plugin_dirs() @@ -75,18 +81,21 @@ class PluginRuntimeManager: logger.info("未找到任何插件目录,跳过插件运行时启动") return + # 从配置读取自定义 IPC socket 路径(留空则自动生成) + socket_path = _cfg.ipc_socket_path or None + # 创建两个 Supervisor,各自拥有独立的 socket / Runner 子进程 if builtin_dirs: self._builtin_supervisor = PluginSupervisor( plugin_dirs=builtin_dirs, - socket_path=None, # 自动生成 + socket_path=socket_path, ) self._register_capability_impls(self._builtin_supervisor) if thirdparty_dirs: self._thirdparty_supervisor = PluginSupervisor( plugin_dirs=thirdparty_dirs, - socket_path=None, + socket_path=socket_path, ) self._register_capability_impls(self._thirdparty_supervisor) diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index 878b1947..db08bb24 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -23,6 +23,7 @@ import time from typing import Any from src.common.logger import get_logger, initialize_logging +from src.plugin_runtime import ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.protocol.envelope import ( ComponentDeclaration, Envelope, @@ -413,9 +414,7 @@ def _isolate_sys_path(plugin_dirs: List[str]) -> None: _ALLOWED_SRC_PREFIXES = ("src.plugin_runtime", "src.common") def find_module(self, fullname, path=None): - if self._should_block(fullname): - return self - return None + return self if self._should_block(fullname) else None def load_module(self, fullname): raise ImportError( @@ -427,10 +426,10 @@ def _isolate_sys_path(plugin_dirs: List[str]) -> None: if not fullname.startswith("src.") or fullname == "src": return False # 放行白名单前缀 - for prefix in self._ALLOWED_SRC_PREFIXES: - if fullname == prefix or fullname.startswith(prefix + "."): - return False - return True + return not any( + fullname == prefix or fullname.startswith(f"{prefix}.") + for prefix in self._ALLOWED_SRC_PREFIXES + ) sys.meta_path.insert(0, _PluginImportBlocker()) @@ -439,12 +438,12 @@ def _isolate_sys_path(plugin_dirs: List[str]) -> None: async def _async_main() -> None: """异步主入口""" - host_address = os.environ.get("MAIBOT_IPC_ADDRESS", "") - session_token = os.environ.get("MAIBOT_SESSION_TOKEN", "") - plugin_dirs_str = os.environ.get("MAIBOT_PLUGIN_DIRS", "") + host_address = os.environ.get(ENV_IPC_ADDRESS, "") + session_token = os.environ.get(ENV_SESSION_TOKEN, "") + plugin_dirs_str = os.environ.get(ENV_PLUGIN_DIRS, "") if not host_address or not session_token: - logger.error("缺少必要的环境变量: MAIBOT_IPC_ADDRESS, MAIBOT_SESSION_TOKEN") + logger.error(f"缺少必要的环境变量: {ENV_IPC_ADDRESS}, {ENV_SESSION_TOKEN}") sys.exit(1) plugin_dirs = [d for d in plugin_dirs_str.split(os.pathsep) if d]