diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index bfa00cbf..82a5970b 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -1,98 +1,41 @@ -"""Supervisor - 插件生命周期管理 - -负责: -1. 拉起 Runner 子进程 -2. 健康检查 + 崩溃自动重启 -3. 代码热重载(generation 切换) -4. 优雅关停 -""" - -from typing import Any, Dict, List, Optional, Tuple +from pathlib import Path +from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING import asyncio -import contextlib -import logging as stdlib_logging -import os -import sys -from pathlib import Path + from src.common.logger import get_logger -from src.config.config import MMC_VERSION, global_config -from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN -from src.plugin_runtime.host.capability_service import CapabilityService -from src.plugin_runtime.host.component_registry import ComponentRegistry -from src.plugin_runtime.host.event_dispatcher import EventDispatcher -from src.plugin_runtime.host.policy_engine import PolicyEngine -from src.plugin_runtime.host.rpc_server import RPCServer -from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, WorkflowContext, WorkflowResult +from src.config.config import global_config +from src.plugin_runtime.transport.factory import create_transport_server from src.plugin_runtime.protocol.envelope import ( BootstrapPluginPayload, ConfigUpdatedPayload, Envelope, HealthPayload, LogBatchPayload, - RegisterComponentsPayload, + RegisterPluginPayload, RunnerReadyPayload, ShutdownPayload, ) -from src.plugin_runtime.protocol.errors import ErrorCode, RPCError -from src.plugin_runtime.transport.factory import create_transport_server -logger = get_logger("plugin_runtime.host.supervisor") +from .authorization import AuthorizationManager +from .capability_service import CapabilityService +from .rpc_server import RPCServer +from .logger_bridge import RunnerLogBridge +from .component_registry import ComponentRegistry +from .event_dispatcher import EventDispatcher +from .hook_dispatcher import HookDispatcher +from .message_gateway import MessageGateway +from .message_utils import PluginMessageUtils + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + +logger = get_logger("plugin_runtime.host.runner_manager") -# ─── 日志桥 ────────────────────────────────────────────────────── - - -class RunnerLogBridge: - """将 Runner 进程上报的批量日志重放到主进程的 Logger 中。 - - Runner 通过 ``runner.log_batch`` IPC 事件批量到达。 - 每条 LogEntry 被重建为一个真实的 :class:`logging.LogRecord` 并直接 - 调用 ``logging.getLogger(entry.logger_name).handle(record)``, - 从而接入主进程已配置好的 structlog Handler 链。 - """ - - async def handle_log_batch(self, envelope: Envelope) -> Envelope: - """IPC 事件处理器:解析批量日志并重放到主进程 Logger。 - - Args: - envelope: 方法名为 ``runner.log_batch`` 的 IPC 事件信封。 - - Returns: - 空响应信封(事件模式下将被忽略)。 - """ - try: - batch = LogBatchPayload.model_validate(envelope.payload) - except Exception as exc: - return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) - - for entry in batch.entries: - # 重建一个与原始日志尽量相符的 LogRecord - record = stdlib_logging.LogRecord( - name=entry.logger_name, - level=entry.level, - pathname="", - lineno=0, - msg=entry.message, - args=(), - exc_info=None, - ) - record.created = entry.timestamp_ms / 1000.0 - record.msecs = entry.timestamp_ms % 1000 - if entry.exception_text: - record.exc_text = entry.exception_text - - stdlib_logging.getLogger(entry.logger_name).handle(record) - - return envelope.make_response(payload={"accepted": True, "count": len(batch.entries)}) - - -class PluginSupervisor: - """插件 Supervisor - - Host 端的核心管理器,负责整个插件 Runner 进程的生命周期。 - """ +class PluginRunnerSupervisor: + """插件的Runner管理器,负责管理Runner的生命周期""" def __init__( self, @@ -103,45 +46,34 @@ class PluginSupervisor: runner_spawn_timeout_sec: Optional[float] = None, ): _cfg = global_config.plugin_runtime - self._plugin_dirs = plugin_dirs or [] - self._health_interval = ( - health_check_interval_sec if health_check_interval_sec is not None else _cfg.health_check_interval_sec - ) - self._runner_spawn_timeout = ( - runner_spawn_timeout_sec if runner_spawn_timeout_sec is not None else _cfg.runner_spawn_timeout_sec - ) + self._plugin_dirs: List[Path] = plugin_dirs or [] + self._health_interval = health_check_interval_sec or _cfg.health_check_interval_sec or 30.0 + self._runner_spawn_timeout = runner_spawn_timeout_sec or _cfg.runner_spawn_timeout_sec or 30.0 # 基础设施 self._transport = create_transport_server(socket_path=socket_path) - self._policy = PolicyEngine() - self._capability_service = CapabilityService(self._policy) + self._authorization = AuthorizationManager() + self._capability_service = CapabilityService(self._authorization) self._component_registry = ComponentRegistry() self._event_dispatcher = EventDispatcher(self._component_registry) - self._workflow_executor = WorkflowExecutor(self._component_registry) + self._hook_dispatcher = HookDispatcher(self._component_registry) + self._message_gateway = MessageGateway(self._component_registry) - # 编解码 + # 编解码和服务器 from src.plugin_runtime.protocol.codec import MsgPackCodec codec = MsgPackCodec() - - self._rpc_server = RPCServer( - transport=self._transport, - codec=codec, - ) + self._rpc_server = RPCServer(transport=self._transport, codec=codec) # Runner 子进程 self._runner_process: Optional[asyncio.subprocess.Process] = None - self._runner_generation: int = 0 - self._max_restart_attempts: int = ( - max_restart_attempts if max_restart_attempts is not None else _cfg.max_restart_attempts - ) + self._max_restart_attempts: int = max_restart_attempts or _cfg.max_restart_attempts or 3 self._restart_count: int = 0 # 已注册的插件组件信息 - self._registered_plugins: Dict[str, RegisterComponentsPayload] = {} - self._staged_registered_plugins: Dict[str, RegisterComponentsPayload] = {} - self._runner_ready_events: Dict[int, asyncio.Event] = {} - self._runner_ready_payloads: Dict[int, RunnerReadyPayload] = {} + self._registered_plugins: Dict[str, RegisterPluginPayload] = {} + self._runner_ready_events: asyncio.Event = asyncio.Event() + self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload() # 后台任务 self._health_task: Optional[asyncio.Task] = None @@ -153,11 +85,11 @@ class PluginSupervisor: self._log_bridge: RunnerLogBridge = RunnerLogBridge() # 注册内部 RPC 方法 - self._register_internal_methods() + self._register_internal_methods() # TODO: 完成内部方法注册 @property - def policy_engine(self) -> PolicyEngine: - return self._policy + def authorization_manager(self) -> AuthorizationManager: + return self._authorization @property def capability_service(self) -> CapabilityService: @@ -172,8 +104,12 @@ class PluginSupervisor: return self._event_dispatcher @property - def workflow_executor(self) -> WorkflowExecutor: - return self._workflow_executor + def hook_dispatcher(self) -> HookDispatcher: + return self._hook_dispatcher + + @property + def message_gateway(self) -> MessageGateway: + return self._message_gateway @property def rpc_server(self) -> RPCServer: @@ -182,64 +118,26 @@ class PluginSupervisor: async def dispatch_event( self, event_type: str, - message: Optional[Dict[str, Any]] = None, + message: Optional["SessionMessage"] = None, extra_args: Optional[Dict[str, Any]] = None, - ) -> Tuple[bool, Optional[Dict[str, Any]]]: + ) -> Tuple[bool, Optional["SessionMessage"]]: """分发事件到所有对应 handler 的快捷方法。""" + return await self._event_dispatcher.dispatch_event(event_type, self, message, extra_args) - async def _invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]: - resp = await self.invoke_plugin( - method="plugin.emit_event", - plugin_id=plugin_id, - component_name=component_name, - args=args, - ) - return resp.payload + async def dispatch_hook(self, stage: str, **kwargs): + """分发Hook事件到所有对应 handler 的快捷方法。""" + return await self._hook_dispatcher.hook_dispatch(stage, self, **kwargs) - return await self._event_dispatcher.dispatch_event( - event_type=event_type, - invoke_fn=_invoke, - message=message, - extra_args=extra_args, - ) - - async def execute_workflow( + async def send_message_to_external( self, - message: Optional[Dict[str, Any]] = None, - stream_id: Optional[str] = None, - context: Optional[WorkflowContext] = None, - ) -> Tuple[WorkflowResult, Optional[Dict[str, Any]], WorkflowContext]: - """执行 Workflow Pipeline 的快捷方法。""" - - async def _invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]: - resp = await self.invoke_plugin( - method="plugin.invoke_workflow_step", - plugin_id=plugin_id, - component_name=component_name, - args=args, - ) - payload = resp.payload - if payload.get("success"): - result = payload.get("result") - return result if isinstance(result, dict) else {} - raise RuntimeError(payload.get("result", "workflow step invoke failed")) - - async def _command_invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]: - """命令走 plugin.invoke_command,保留原始返回值结构。""" - resp = await self.invoke_plugin( - method="plugin.invoke_command", - plugin_id=plugin_id, - component_name=component_name, - args=args, - ) - return resp.payload - - return await self._workflow_executor.execute( - invoke_fn=_invoke, - message=message, - stream_id=stream_id, - context=context, - command_invoke_fn=_command_invoke, + internal_message: "SessionMessage", + *, + enabled_only: bool = True, + save_to_db: bool = True, + ) -> bool: + """发送系统内部消息到外部平台的快捷方法。""" + return await self._message_gateway.send_message_to_external( + internal_message, self, enabled_only=enabled_only, save_to_db=save_to_db ) async def start(self) -> None: @@ -253,17 +151,13 @@ class PluginSupervisor: # 启动 RPC Server await self._rpc_server.start() - - # 计算预期 generation(与 reload_plugins 保持一致) - expected_generation = self._rpc_server.runner_generation + 1 - # 拉起 Runner 进程 await self._spawn_runner() # 等待 Runner 完成连接和初始化,避免 start() 返回时 Runner 尚未就绪 try: - await self._wait_for_runner_generation(expected_generation, timeout_sec=self._runner_spawn_timeout) - await self._wait_for_runner_ready(expected_generation, timeout_sec=self._runner_spawn_timeout) + await self._wait_for_runner_connection(timeout_sec=self._runner_spawn_timeout) + await self._wait_for_runner_ready(timeout_sec=self._runner_spawn_timeout) except TimeoutError: if not self._rpc_server.is_connected: logger.warning(f"Runner 未在 {self._runner_spawn_timeout}s 内完成连接,后续操作可能失败") @@ -279,6 +173,10 @@ class PluginSupervisor: """停止 Supervisor""" self._running = False + # 停止组件 + await self._event_dispatcher.stop() + await self._hook_dispatcher.stop() + # 停止健康检查 if self._health_task: self._health_task.cancel() @@ -305,439 +203,34 @@ class PluginSupervisor: 由主进程业务逻辑调用,通过 RPC 转发给 Runner。 """ return await self._rpc_server.send_request( - method=method, - plugin_id=plugin_id, - payload={ - "component_name": component_name, - "args": args or {}, - }, - timeout_ms=timeout_ms, + method, + plugin_id, + {"component_name": component_name, "args": args or {}}, + timeout_ms, ) - async def reload_plugins(self, reason: str = "manual") -> bool: - """热重载所有插件(进程级 generation 切换) + async def reload_plugin(self, plugin_id: str, reason: str = "manual") -> bool: + raise NotImplementedError("等待SDK完成") # TODO: 完成对应的调用和请求逻辑 - 1. 拉起新 Runner - 2. 等待新 Runner 完成注册和健康检查 - 3. 关停旧 Runner - """ - logger.info(f"开始热重载插件,原因: {reason}") + async def _wait_for_runner_connection(self, timeout_sec: float) -> None: + """等待 Runner 连接上 RPC Server""" - # 保存旧进程引用和旧 session token(回滚时需要恢复) - old_process = self._runner_process - old_registered_plugins = dict(self._registered_plugins) - old_session_token = self._rpc_server.session_token - expected_generation = self._rpc_server.runner_generation + 1 + async def wait_for_connection(): + while self._running and not self._rpc_server.is_connected: + await asyncio.sleep(0.1) - # 允许新 Runner 以 staged 方式接入,验证通过后再切换活跃连接 - self._rpc_server.begin_staged_takeover() - self._staged_registered_plugins.clear() - - # 重新生成 session token,防止被终止的旧 Runner 重连 - self._rpc_server.reset_session_token() - - # 注意:不在此处调用 _clear_runtime_state()。 - # 旧组件在新 Runner 完成注册前继续提供服务,避免热重载窗口期内 - # dispatch_event / execute_workflow 找不到任何组件导致消息静默丢失。 - # ComponentRegistry.register_component 对同名组件是覆盖式写入,安全。 - - # 拉起新 Runner try: - await self._spawn_runner() - await self._wait_for_runner_generation( - expected_generation, - timeout_sec=self._runner_spawn_timeout, - allow_staged=True, - ) - await self._wait_for_runner_ready(expected_generation, timeout_sec=self._runner_spawn_timeout) - resp = await self._rpc_server.send_request( - "plugin.health", - timeout_ms=5000, - target_generation=expected_generation, - ) - health = HealthPayload.model_validate(resp.payload) - if not health.healthy: - raise RPCError(ErrorCode.E_PLUGIN_CRASHED, "新 Runner 健康检查失败") - await self._rpc_server.commit_staged_takeover() - except Exception as e: - logger.error(f"新 Runner 健康检查失败: {e},回滚") - await self._terminate_process(self._runner_process, old_process) - await self._rpc_server.rollback_staged_takeover() - self._runner_process = old_process - self._rpc_server.restore_session_token(old_session_token) - self._staged_registered_plugins.clear() - self._registered_plugins = dict(old_registered_plugins) - self._rebuild_runtime_state() - return False + await asyncio.wait_for(wait_for_connection(), timeout=timeout_sec) + logger.info("Runner 已连接到 RPC Server") + except asyncio.TimeoutError as e: + raise TimeoutError(f"等待 Runner 连接超时({timeout_sec}s)") from e - self._runner_generation = self._rpc_server.runner_generation - self._registered_plugins = dict(self._staged_registered_plugins) - self._staged_registered_plugins.clear() - self._rebuild_runtime_state() + async def _wait_for_runner_ready(self, timeout_sec: float = 30.0) -> RunnerReadyPayload: + """等待 Runner 完成初始化并上报就绪""" - # 关停旧 Runner - if old_process and old_process.returncode is None: - try: - old_process.terminate() - await asyncio.wait_for(old_process.wait(), timeout=10.0) - except asyncio.TimeoutError: - old_process.kill() - - logger.info("热重载完成") - return True - - async def notify_plugin_config_updated( - self, - plugin_id: str, - config_data: Dict[str, Any], - config_version: str = "", - ) -> bool: - """通知指定插件其配置已更新。""" - if plugin_id not in self._registered_plugins: - return False - - payload = ConfigUpdatedPayload( - plugin_id=plugin_id, - config_version=config_version, - config_data=config_data, - ) - await self._rpc_server.send_request( - "plugin.config_updated", - plugin_id=plugin_id, - payload=payload.model_dump(), - timeout_ms=5000, - ) - return True - - # ─── 内部方法 ────────────────────────────────────────────── - - def _register_internal_methods(self) -> None: - """注册 Host 端的 RPC 方法处理器""" - # Runner -> Host 的能力调用统一走 capability_service - self._rpc_server.register_method("cap.request", self._capability_service.handle_capability_request) - self._rpc_server.register_method("plugin.bootstrap", self._handle_bootstrap_plugin) - # 插件注册 - self._rpc_server.register_method("plugin.register_components", self._handle_register_components) - self._rpc_server.register_method("runner.ready", self._handle_runner_ready) - # Runner 日志批量上报 - self._rpc_server.register_method("runner.log_batch", self._log_bridge.handle_log_batch) - - async def _handle_bootstrap_plugin(self, envelope: Envelope) -> Envelope: - """处理插件 bootstrap 请求,仅同步能力令牌。""" try: - bootstrap = BootstrapPluginPayload.model_validate(envelope.payload) - except Exception as e: - return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e)) - - active_generation = self._rpc_server.runner_generation - staged_generation = self._rpc_server.staged_generation - if envelope.generation not in {active_generation, staged_generation}: - return envelope.make_error_response( - ErrorCode.E_GENERATION_MISMATCH.value, - f"插件 bootstrap generation 过期: {envelope.generation} 不在已知代际中", - ) - - if bootstrap.capabilities_required: - self._policy.register_plugin( - plugin_id=bootstrap.plugin_id, - generation=envelope.generation, - capabilities=bootstrap.capabilities_required, - ) - else: - self._policy.revoke_plugin(bootstrap.plugin_id, generation=envelope.generation) - - return envelope.make_response(payload={"accepted": True}) - - async def _handle_register_components(self, envelope: Envelope) -> Envelope: - """处理插件组件注册请求""" - try: - reg = RegisterComponentsPayload.model_validate(envelope.payload) - except Exception as e: - return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e)) - - active_generation = self._rpc_server.runner_generation - staged_generation = self._rpc_server.staged_generation - if envelope.generation not in {active_generation, staged_generation}: - return envelope.make_error_response( - ErrorCode.E_GENERATION_MISMATCH.value, - f"组件注册 generation 过期: {envelope.generation} 不在已知代际中", - ) - - if envelope.generation == staged_generation and staged_generation != 0: - self._staged_registered_plugins[reg.plugin_id] = reg - logger.info( - f"插件 {reg.plugin_id} v{reg.plugin_version} staged 注册成功," - f"组件数: {len(reg.components)}, 能力需求: {reg.capabilities_required}" - ) - return envelope.make_response(payload={"accepted": True, "staged": True}) - - self._registered_plugins[reg.plugin_id] = reg - - # 在策略引擎中注册插件 - self._policy.register_plugin( - plugin_id=reg.plugin_id, - generation=envelope.generation, - capabilities=reg.capabilities_required or [], - ) - - # 同 generation 下重新注册时,以本次声明为准,避免残留幽灵组件 - self._component_registry.remove_components_by_plugin(reg.plugin_id) - self._component_registry.register_plugin_components( - plugin_id=reg.plugin_id, - components=[c.model_dump() for c in reg.components], - ) - - stats = self._component_registry.get_stats() - logger.info( - f"插件 {reg.plugin_id} v{reg.plugin_version} 注册成功," - f"组件数: {len(reg.components)}, 能力需求: {reg.capabilities_required}," - f"注册表总计: {stats}" - ) - - return envelope.make_response(payload={"accepted": True}) - - async def _handle_runner_ready(self, envelope: Envelope) -> Envelope: - """处理 Runner 初始化完成信号。""" - try: - ready = RunnerReadyPayload.model_validate(envelope.payload) - except Exception as e: - return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e)) - - event = self._runner_ready_events.setdefault(envelope.generation, asyncio.Event()) - self._runner_ready_payloads[envelope.generation] = ready - event.set() - logger.info( - f"Runner generation={envelope.generation} 已就绪,成功插件数: {len(ready.loaded_plugins)}," - f"失败插件数: {len(ready.failed_plugins)}" - ) - return envelope.make_response(payload={"accepted": True}) - - async def _spawn_runner(self) -> None: - """拉起 Runner 子进程""" - runner_module = "src.plugin_runtime.runner.runner_main" - address = self._transport.get_address() - token = self._rpc_server.session_token - - env = os.environ.copy() - env[ENV_IPC_ADDRESS] = address - env[ENV_SESSION_TOKEN] = token - env[ENV_PLUGIN_DIRS] = os.pathsep.join(str(p) for p in self._plugin_dirs) - env[ENV_HOST_VERSION] = MMC_VERSION - - self._runner_process = await asyncio.create_subprocess_exec( - sys.executable, - "-m", - runner_module, - env=env, - # stdout 不捕获:Runner 的日志均通过 IPC 传㛹(RunnerIPCLogHandler) - stdout=None, - # stderr 捕获为 PIPE,仅用于 IPC 建立前的进程级致命错误输出 - stderr=asyncio.subprocess.PIPE, - ) - - self._attach_stderr_drain(self._runner_process) - self._runner_generation = self._rpc_server.runner_generation - logger.info(f"Runner 子进程已启动: pid={self._runner_process.pid}, generation={self._runner_generation}") - - async def _shutdown_runner(self) -> None: - """优雅关停 Runner""" - if not self._runner_process or self._runner_process.returncode is not None: - return - - # 发送 prepare_shutdown - try: - if self._rpc_server.is_connected: - shutdown_payload = ShutdownPayload(reason="host_shutdown", drain_timeout_ms=5000) - await self._rpc_server.send_request( - "plugin.prepare_shutdown", - payload=shutdown_payload.model_dump(), - timeout_ms=5000, - ) - await self._rpc_server.send_request( - "plugin.shutdown", - payload=shutdown_payload.model_dump(), - timeout_ms=5000, - ) - except Exception as e: - logger.warning(f"发送关停命令失败: {e}") - - # 等待进程退出 - try: - await asyncio.wait_for(self._runner_process.wait(), timeout=10.0) - except asyncio.TimeoutError: - logger.warning("Runner 未在超时内退出,强制终止") - self._runner_process.kill() - await self._runner_process.wait() - - await self._cleanup_stderr_drain() - - async def _health_check_loop(self) -> None: - """周期性健康检查 + 崩溃自动重启""" - while self._running: - await asyncio.sleep(self._health_interval) - - # 检查 Runner 进程是否意外退出 - if self._runner_process and self._runner_process.returncode is not None: - exit_code = self._runner_process.returncode - logger.warning(f"Runner 进程已退出 (exit_code={exit_code})") - - if self._restart_count < self._max_restart_attempts: - self._restart_count += 1 - logger.info(f"尝试重启 Runner ({self._restart_count}/{self._max_restart_attempts})") - # 清理旧的组件注册 - for plugin_id in list(self._registered_plugins.keys()): - self._component_registry.remove_components_by_plugin(plugin_id) - self._policy.revoke_plugin(plugin_id) - self._registered_plugins.clear() - - try: - self._clear_runtime_state() - # 重新生成 session token,防止旧 Runner 僵尸进程用旧 token 重连 - self._rpc_server.reset_session_token() - await self._spawn_runner() - except Exception as e: - logger.error(f"Runner 重启失败: {e}", exc_info=True) - else: - logger.error(f"Runner 连续崩溃 {self._max_restart_attempts} 次,停止重启") - continue - - if not self._rpc_server.is_connected: - logger.warning("Runner 未连接,跳过健康检查") - continue - - try: - resp = await self._rpc_server.send_request("plugin.health", timeout_ms=5000) - health = HealthPayload.model_validate(resp.payload) - if not health.healthy: - logger.warning(f"Runner 健康检查异常: {health}") - else: - # 健康检查成功,重置重启计数 - self._restart_count = 0 - except RPCError as e: - logger.error(f"健康检查失败: {e}") - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"健康检查异常: {e}") - - async def _wait_for_runner_generation( - self, - expected_generation: int, - timeout_sec: float, - allow_staged: bool = False, - ) -> None: - """等待指定代际的 Runner 完成连接。""" - deadline = asyncio.get_running_loop().time() + timeout_sec - while asyncio.get_running_loop().time() < deadline: - if allow_staged and self._rpc_server.has_generation(expected_generation): - return - if self._rpc_server.is_connected and self._rpc_server.runner_generation >= expected_generation: - self._runner_generation = self._rpc_server.runner_generation - return - await asyncio.sleep(0.1) - raise TimeoutError(f"等待 Runner generation {expected_generation} 超时") - - async def _wait_for_runner_ready(self, expected_generation: int, timeout_sec: float) -> RunnerReadyPayload: - """等待指定代际的 Runner 完成初始化。""" - event = self._runner_ready_events.setdefault(expected_generation, asyncio.Event()) - await asyncio.wait_for(event.wait(), timeout=timeout_sec) - return self._runner_ready_payloads.get(expected_generation, RunnerReadyPayload()) - - def _clear_runtime_state(self) -> None: - """清空当前插件注册态。""" - self._component_registry.clear() - self._policy.clear() - self._registered_plugins.clear() - self._staged_registered_plugins.clear() - - def _rebuild_runtime_state(self) -> None: - """根据已记录的插件注册信息重建运行时状态。""" - self._component_registry.clear() - self._policy.clear() - for reg in self._registered_plugins.values(): - self._policy.register_plugin( - plugin_id=reg.plugin_id, - generation=self._rpc_server.runner_generation, - capabilities=reg.capabilities_required or [], - ) - self._component_registry.register_plugin_components( - plugin_id=reg.plugin_id, - components=[c.model_dump() for c in reg.components], - ) - - def _attach_stderr_drain(self, process: asyncio.subprocess.Process) -> None: - """为 Runner stderr 创建排空任务,捕获 IPC 建立前的进程级错误输出。 - - stderr 中的内容通常是: - - Runner 启动早期(握手完成之前)的日志 - - 进程级致命错误(ImportError、SyntaxError等) - - 异常进程退出前的最后输出 - - 握手成功后,插件的所有日志均经由 RunnerIPCLogHandler 通过 IPC 传输。 - """ - if process.stderr is None: - return - task = asyncio.create_task( - self._drain_runner_stderr(process.stderr, process.pid), - name=f"runner_stderr_drain:{process.pid}", - ) - self._stderr_drain_task = task - task.add_done_callback( - lambda done_task: None if self._stderr_drain_task is not done_task else self._clear_stderr_drain_task() - ) - - def _clear_stderr_drain_task(self) -> None: - self._stderr_drain_task = None - - async def _drain_runner_stderr( - self, - stream: asyncio.StreamReader, - pid: int, - ) -> None: - """持续读取 Runner stderr 并转发到 Host Logger,防止 PIPE 锡死子进程。 - - Args: - stream: Runner 子进程的 stderr 流。 - pid: 子进程 PID,仅用于日志上下文。 - """ - try: - while True: - line = await stream.readline() - if not line: - break - if message := line.decode(errors="replace").rstrip(): - # 将 stderr 输出以 WARNING 级展示: - # 如果 Runner 正常运行,此流应当无输出; - # 有输出说明进程级错误发生,需要出现在主进程日志中 - logger.warning(f"[runner:{pid}:stderr] {message}") - except asyncio.CancelledError: - raise - except Exception as exc: - logger.debug(f"读取 Runner stderr 失败 (pid={pid}): {exc}") - - async def _cleanup_stderr_drain(self) -> None: - """等待并取消 stderr 排空任务。""" - if self._stderr_drain_task is None: - return - task = self._stderr_drain_task - self._stderr_drain_task = None - if not task.done(): - task.cancel() - with contextlib.suppress(Exception): - await asyncio.gather(task, return_exceptions=True) - - @staticmethod - async def _terminate_process( - process: Optional[asyncio.subprocess.Process], - keep_process: Optional[asyncio.subprocess.Process] = None, - ) -> None: - """终止指定进程,但跳过需要保留的旧进程引用。""" - if process is None or process is keep_process or process.returncode is not None: - return - - process.terminate() - try: - await asyncio.wait_for(process.wait(), timeout=10.0) - except asyncio.TimeoutError: - process.kill() - await process.wait() + await asyncio.wait_for(self._runner_ready_events.wait(), timeout=timeout_sec) + logger.info("Runner 已完成初始化并上报就绪") + return self._runner_ready_payloads + except asyncio.TimeoutError as e: + raise TimeoutError(f"等待 Runner 就绪超时({timeout_sec}s)") from e