refactor: supervisor部分方法重写
This commit is contained in:
committed by
DrSmoothl
parent
6cc7e37b1e
commit
3d22657707
@@ -1,98 +1,41 @@
|
||||
"""Supervisor - 插件生命周期管理
|
||||
|
||||
负责:
|
||||
1. 拉起 Runner 子进程
|
||||
2. 健康检查 + 崩溃自动重启
|
||||
3. 代码热重载(generation 切换)
|
||||
4. 优雅关停
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import logging as stdlib_logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
from src.common.logger import get_logger
|
||||
from src.config.config import MMC_VERSION, global_config
|
||||
from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
|
||||
from src.plugin_runtime.host.capability_service import CapabilityService
|
||||
from src.plugin_runtime.host.component_registry import ComponentRegistry
|
||||
from src.plugin_runtime.host.event_dispatcher import EventDispatcher
|
||||
from src.plugin_runtime.host.policy_engine import PolicyEngine
|
||||
from src.plugin_runtime.host.rpc_server import RPCServer
|
||||
from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, WorkflowContext, WorkflowResult
|
||||
from src.config.config import global_config
|
||||
from src.plugin_runtime.transport.factory import create_transport_server
|
||||
from src.plugin_runtime.protocol.envelope import (
|
||||
BootstrapPluginPayload,
|
||||
ConfigUpdatedPayload,
|
||||
Envelope,
|
||||
HealthPayload,
|
||||
LogBatchPayload,
|
||||
RegisterComponentsPayload,
|
||||
RegisterPluginPayload,
|
||||
RunnerReadyPayload,
|
||||
ShutdownPayload,
|
||||
)
|
||||
from src.plugin_runtime.protocol.errors import ErrorCode, RPCError
|
||||
from src.plugin_runtime.transport.factory import create_transport_server
|
||||
|
||||
logger = get_logger("plugin_runtime.host.supervisor")
|
||||
from .authorization import AuthorizationManager
|
||||
from .capability_service import CapabilityService
|
||||
from .rpc_server import RPCServer
|
||||
from .logger_bridge import RunnerLogBridge
|
||||
from .component_registry import ComponentRegistry
|
||||
from .event_dispatcher import EventDispatcher
|
||||
from .hook_dispatcher import HookDispatcher
|
||||
from .message_gateway import MessageGateway
|
||||
from .message_utils import PluginMessageUtils
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.chat.message_receive.message import SessionMessage
|
||||
|
||||
logger = get_logger("plugin_runtime.host.runner_manager")
|
||||
|
||||
|
||||
# ─── 日志桥 ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class RunnerLogBridge:
|
||||
"""将 Runner 进程上报的批量日志重放到主进程的 Logger 中。
|
||||
|
||||
Runner 通过 ``runner.log_batch`` IPC 事件批量到达。
|
||||
每条 LogEntry 被重建为一个真实的 :class:`logging.LogRecord` 并直接
|
||||
调用 ``logging.getLogger(entry.logger_name).handle(record)``,
|
||||
从而接入主进程已配置好的 structlog Handler 链。
|
||||
"""
|
||||
|
||||
async def handle_log_batch(self, envelope: Envelope) -> Envelope:
|
||||
"""IPC 事件处理器:解析批量日志并重放到主进程 Logger。
|
||||
|
||||
Args:
|
||||
envelope: 方法名为 ``runner.log_batch`` 的 IPC 事件信封。
|
||||
|
||||
Returns:
|
||||
空响应信封(事件模式下将被忽略)。
|
||||
"""
|
||||
try:
|
||||
batch = LogBatchPayload.model_validate(envelope.payload)
|
||||
except Exception as exc:
|
||||
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
|
||||
|
||||
for entry in batch.entries:
|
||||
# 重建一个与原始日志尽量相符的 LogRecord
|
||||
record = stdlib_logging.LogRecord(
|
||||
name=entry.logger_name,
|
||||
level=entry.level,
|
||||
pathname="<runner>",
|
||||
lineno=0,
|
||||
msg=entry.message,
|
||||
args=(),
|
||||
exc_info=None,
|
||||
)
|
||||
record.created = entry.timestamp_ms / 1000.0
|
||||
record.msecs = entry.timestamp_ms % 1000
|
||||
if entry.exception_text:
|
||||
record.exc_text = entry.exception_text
|
||||
|
||||
stdlib_logging.getLogger(entry.logger_name).handle(record)
|
||||
|
||||
return envelope.make_response(payload={"accepted": True, "count": len(batch.entries)})
|
||||
|
||||
|
||||
class PluginSupervisor:
|
||||
"""插件 Supervisor
|
||||
|
||||
Host 端的核心管理器,负责整个插件 Runner 进程的生命周期。
|
||||
"""
|
||||
class PluginRunnerSupervisor:
|
||||
"""插件的Runner管理器,负责管理Runner的生命周期"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -103,45 +46,34 @@ class PluginSupervisor:
|
||||
runner_spawn_timeout_sec: Optional[float] = None,
|
||||
):
|
||||
_cfg = global_config.plugin_runtime
|
||||
self._plugin_dirs = plugin_dirs or []
|
||||
self._health_interval = (
|
||||
health_check_interval_sec if health_check_interval_sec is not None else _cfg.health_check_interval_sec
|
||||
)
|
||||
self._runner_spawn_timeout = (
|
||||
runner_spawn_timeout_sec if runner_spawn_timeout_sec is not None else _cfg.runner_spawn_timeout_sec
|
||||
)
|
||||
self._plugin_dirs: List[Path] = plugin_dirs or []
|
||||
self._health_interval = health_check_interval_sec or _cfg.health_check_interval_sec or 30.0
|
||||
self._runner_spawn_timeout = runner_spawn_timeout_sec or _cfg.runner_spawn_timeout_sec or 30.0
|
||||
|
||||
# 基础设施
|
||||
self._transport = create_transport_server(socket_path=socket_path)
|
||||
self._policy = PolicyEngine()
|
||||
self._capability_service = CapabilityService(self._policy)
|
||||
self._authorization = AuthorizationManager()
|
||||
self._capability_service = CapabilityService(self._authorization)
|
||||
self._component_registry = ComponentRegistry()
|
||||
self._event_dispatcher = EventDispatcher(self._component_registry)
|
||||
self._workflow_executor = WorkflowExecutor(self._component_registry)
|
||||
self._hook_dispatcher = HookDispatcher(self._component_registry)
|
||||
self._message_gateway = MessageGateway(self._component_registry)
|
||||
|
||||
# 编解码
|
||||
# 编解码和服务器
|
||||
from src.plugin_runtime.protocol.codec import MsgPackCodec
|
||||
|
||||
codec = MsgPackCodec()
|
||||
|
||||
self._rpc_server = RPCServer(
|
||||
transport=self._transport,
|
||||
codec=codec,
|
||||
)
|
||||
self._rpc_server = RPCServer(transport=self._transport, codec=codec)
|
||||
|
||||
# Runner 子进程
|
||||
self._runner_process: Optional[asyncio.subprocess.Process] = None
|
||||
self._runner_generation: int = 0
|
||||
self._max_restart_attempts: int = (
|
||||
max_restart_attempts if max_restart_attempts is not None else _cfg.max_restart_attempts
|
||||
)
|
||||
self._max_restart_attempts: int = max_restart_attempts or _cfg.max_restart_attempts or 3
|
||||
self._restart_count: int = 0
|
||||
|
||||
# 已注册的插件组件信息
|
||||
self._registered_plugins: Dict[str, RegisterComponentsPayload] = {}
|
||||
self._staged_registered_plugins: Dict[str, RegisterComponentsPayload] = {}
|
||||
self._runner_ready_events: Dict[int, asyncio.Event] = {}
|
||||
self._runner_ready_payloads: Dict[int, RunnerReadyPayload] = {}
|
||||
self._registered_plugins: Dict[str, RegisterPluginPayload] = {}
|
||||
self._runner_ready_events: asyncio.Event = asyncio.Event()
|
||||
self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload()
|
||||
|
||||
# 后台任务
|
||||
self._health_task: Optional[asyncio.Task] = None
|
||||
@@ -153,11 +85,11 @@ class PluginSupervisor:
|
||||
self._log_bridge: RunnerLogBridge = RunnerLogBridge()
|
||||
|
||||
# 注册内部 RPC 方法
|
||||
self._register_internal_methods()
|
||||
self._register_internal_methods() # TODO: 完成内部方法注册
|
||||
|
||||
@property
|
||||
def policy_engine(self) -> PolicyEngine:
|
||||
return self._policy
|
||||
def authorization_manager(self) -> AuthorizationManager:
|
||||
return self._authorization
|
||||
|
||||
@property
|
||||
def capability_service(self) -> CapabilityService:
|
||||
@@ -172,8 +104,12 @@ class PluginSupervisor:
|
||||
return self._event_dispatcher
|
||||
|
||||
@property
|
||||
def workflow_executor(self) -> WorkflowExecutor:
|
||||
return self._workflow_executor
|
||||
def hook_dispatcher(self) -> HookDispatcher:
|
||||
return self._hook_dispatcher
|
||||
|
||||
@property
|
||||
def message_gateway(self) -> MessageGateway:
|
||||
return self._message_gateway
|
||||
|
||||
@property
|
||||
def rpc_server(self) -> RPCServer:
|
||||
@@ -182,64 +118,26 @@ class PluginSupervisor:
|
||||
async def dispatch_event(
|
||||
self,
|
||||
event_type: str,
|
||||
message: Optional[Dict[str, Any]] = None,
|
||||
message: Optional["SessionMessage"] = None,
|
||||
extra_args: Optional[Dict[str, Any]] = None,
|
||||
) -> Tuple[bool, Optional[Dict[str, Any]]]:
|
||||
) -> Tuple[bool, Optional["SessionMessage"]]:
|
||||
"""分发事件到所有对应 handler 的快捷方法。"""
|
||||
return await self._event_dispatcher.dispatch_event(event_type, self, message, extra_args)
|
||||
|
||||
async def _invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
|
||||
resp = await self.invoke_plugin(
|
||||
method="plugin.emit_event",
|
||||
plugin_id=plugin_id,
|
||||
component_name=component_name,
|
||||
args=args,
|
||||
)
|
||||
return resp.payload
|
||||
async def dispatch_hook(self, stage: str, **kwargs):
|
||||
"""分发Hook事件到所有对应 handler 的快捷方法。"""
|
||||
return await self._hook_dispatcher.hook_dispatch(stage, self, **kwargs)
|
||||
|
||||
return await self._event_dispatcher.dispatch_event(
|
||||
event_type=event_type,
|
||||
invoke_fn=_invoke,
|
||||
message=message,
|
||||
extra_args=extra_args,
|
||||
)
|
||||
|
||||
async def execute_workflow(
|
||||
async def send_message_to_external(
|
||||
self,
|
||||
message: Optional[Dict[str, Any]] = None,
|
||||
stream_id: Optional[str] = None,
|
||||
context: Optional[WorkflowContext] = None,
|
||||
) -> Tuple[WorkflowResult, Optional[Dict[str, Any]], WorkflowContext]:
|
||||
"""执行 Workflow Pipeline 的快捷方法。"""
|
||||
|
||||
async def _invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
|
||||
resp = await self.invoke_plugin(
|
||||
method="plugin.invoke_workflow_step",
|
||||
plugin_id=plugin_id,
|
||||
component_name=component_name,
|
||||
args=args,
|
||||
)
|
||||
payload = resp.payload
|
||||
if payload.get("success"):
|
||||
result = payload.get("result")
|
||||
return result if isinstance(result, dict) else {}
|
||||
raise RuntimeError(payload.get("result", "workflow step invoke failed"))
|
||||
|
||||
async def _command_invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""命令走 plugin.invoke_command,保留原始返回值结构。"""
|
||||
resp = await self.invoke_plugin(
|
||||
method="plugin.invoke_command",
|
||||
plugin_id=plugin_id,
|
||||
component_name=component_name,
|
||||
args=args,
|
||||
)
|
||||
return resp.payload
|
||||
|
||||
return await self._workflow_executor.execute(
|
||||
invoke_fn=_invoke,
|
||||
message=message,
|
||||
stream_id=stream_id,
|
||||
context=context,
|
||||
command_invoke_fn=_command_invoke,
|
||||
internal_message: "SessionMessage",
|
||||
*,
|
||||
enabled_only: bool = True,
|
||||
save_to_db: bool = True,
|
||||
) -> bool:
|
||||
"""发送系统内部消息到外部平台的快捷方法。"""
|
||||
return await self._message_gateway.send_message_to_external(
|
||||
internal_message, self, enabled_only=enabled_only, save_to_db=save_to_db
|
||||
)
|
||||
|
||||
async def start(self) -> None:
|
||||
@@ -253,17 +151,13 @@ class PluginSupervisor:
|
||||
|
||||
# 启动 RPC Server
|
||||
await self._rpc_server.start()
|
||||
|
||||
# 计算预期 generation(与 reload_plugins 保持一致)
|
||||
expected_generation = self._rpc_server.runner_generation + 1
|
||||
|
||||
# 拉起 Runner 进程
|
||||
await self._spawn_runner()
|
||||
|
||||
# 等待 Runner 完成连接和初始化,避免 start() 返回时 Runner 尚未就绪
|
||||
try:
|
||||
await self._wait_for_runner_generation(expected_generation, timeout_sec=self._runner_spawn_timeout)
|
||||
await self._wait_for_runner_ready(expected_generation, timeout_sec=self._runner_spawn_timeout)
|
||||
await self._wait_for_runner_connection(timeout_sec=self._runner_spawn_timeout)
|
||||
await self._wait_for_runner_ready(timeout_sec=self._runner_spawn_timeout)
|
||||
except TimeoutError:
|
||||
if not self._rpc_server.is_connected:
|
||||
logger.warning(f"Runner 未在 {self._runner_spawn_timeout}s 内完成连接,后续操作可能失败")
|
||||
@@ -279,6 +173,10 @@ class PluginSupervisor:
|
||||
"""停止 Supervisor"""
|
||||
self._running = False
|
||||
|
||||
# 停止组件
|
||||
await self._event_dispatcher.stop()
|
||||
await self._hook_dispatcher.stop()
|
||||
|
||||
# 停止健康检查
|
||||
if self._health_task:
|
||||
self._health_task.cancel()
|
||||
@@ -305,439 +203,34 @@ class PluginSupervisor:
|
||||
由主进程业务逻辑调用,通过 RPC 转发给 Runner。
|
||||
"""
|
||||
return await self._rpc_server.send_request(
|
||||
method=method,
|
||||
plugin_id=plugin_id,
|
||||
payload={
|
||||
"component_name": component_name,
|
||||
"args": args or {},
|
||||
},
|
||||
timeout_ms=timeout_ms,
|
||||
method,
|
||||
plugin_id,
|
||||
{"component_name": component_name, "args": args or {}},
|
||||
timeout_ms,
|
||||
)
|
||||
|
||||
async def reload_plugins(self, reason: str = "manual") -> bool:
|
||||
"""热重载所有插件(进程级 generation 切换)
|
||||
async def reload_plugin(self, plugin_id: str, reason: str = "manual") -> bool:
|
||||
raise NotImplementedError("等待SDK完成") # TODO: 完成对应的调用和请求逻辑
|
||||
|
||||
1. 拉起新 Runner
|
||||
2. 等待新 Runner 完成注册和健康检查
|
||||
3. 关停旧 Runner
|
||||
"""
|
||||
logger.info(f"开始热重载插件,原因: {reason}")
|
||||
async def _wait_for_runner_connection(self, timeout_sec: float) -> None:
|
||||
"""等待 Runner 连接上 RPC Server"""
|
||||
|
||||
# 保存旧进程引用和旧 session token(回滚时需要恢复)
|
||||
old_process = self._runner_process
|
||||
old_registered_plugins = dict(self._registered_plugins)
|
||||
old_session_token = self._rpc_server.session_token
|
||||
expected_generation = self._rpc_server.runner_generation + 1
|
||||
async def wait_for_connection():
|
||||
while self._running and not self._rpc_server.is_connected:
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# 允许新 Runner 以 staged 方式接入,验证通过后再切换活跃连接
|
||||
self._rpc_server.begin_staged_takeover()
|
||||
self._staged_registered_plugins.clear()
|
||||
|
||||
# 重新生成 session token,防止被终止的旧 Runner 重连
|
||||
self._rpc_server.reset_session_token()
|
||||
|
||||
# 注意:不在此处调用 _clear_runtime_state()。
|
||||
# 旧组件在新 Runner 完成注册前继续提供服务,避免热重载窗口期内
|
||||
# dispatch_event / execute_workflow 找不到任何组件导致消息静默丢失。
|
||||
# ComponentRegistry.register_component 对同名组件是覆盖式写入,安全。
|
||||
|
||||
# 拉起新 Runner
|
||||
try:
|
||||
await self._spawn_runner()
|
||||
await self._wait_for_runner_generation(
|
||||
expected_generation,
|
||||
timeout_sec=self._runner_spawn_timeout,
|
||||
allow_staged=True,
|
||||
)
|
||||
await self._wait_for_runner_ready(expected_generation, timeout_sec=self._runner_spawn_timeout)
|
||||
resp = await self._rpc_server.send_request(
|
||||
"plugin.health",
|
||||
timeout_ms=5000,
|
||||
target_generation=expected_generation,
|
||||
)
|
||||
health = HealthPayload.model_validate(resp.payload)
|
||||
if not health.healthy:
|
||||
raise RPCError(ErrorCode.E_PLUGIN_CRASHED, "新 Runner 健康检查失败")
|
||||
await self._rpc_server.commit_staged_takeover()
|
||||
except Exception as e:
|
||||
logger.error(f"新 Runner 健康检查失败: {e},回滚")
|
||||
await self._terminate_process(self._runner_process, old_process)
|
||||
await self._rpc_server.rollback_staged_takeover()
|
||||
self._runner_process = old_process
|
||||
self._rpc_server.restore_session_token(old_session_token)
|
||||
self._staged_registered_plugins.clear()
|
||||
self._registered_plugins = dict(old_registered_plugins)
|
||||
self._rebuild_runtime_state()
|
||||
return False
|
||||
await asyncio.wait_for(wait_for_connection(), timeout=timeout_sec)
|
||||
logger.info("Runner 已连接到 RPC Server")
|
||||
except asyncio.TimeoutError as e:
|
||||
raise TimeoutError(f"等待 Runner 连接超时({timeout_sec}s)") from e
|
||||
|
||||
self._runner_generation = self._rpc_server.runner_generation
|
||||
self._registered_plugins = dict(self._staged_registered_plugins)
|
||||
self._staged_registered_plugins.clear()
|
||||
self._rebuild_runtime_state()
|
||||
async def _wait_for_runner_ready(self, timeout_sec: float = 30.0) -> RunnerReadyPayload:
|
||||
"""等待 Runner 完成初始化并上报就绪"""
|
||||
|
||||
# 关停旧 Runner
|
||||
if old_process and old_process.returncode is None:
|
||||
try:
|
||||
old_process.terminate()
|
||||
await asyncio.wait_for(old_process.wait(), timeout=10.0)
|
||||
except asyncio.TimeoutError:
|
||||
old_process.kill()
|
||||
|
||||
logger.info("热重载完成")
|
||||
return True
|
||||
|
||||
async def notify_plugin_config_updated(
|
||||
self,
|
||||
plugin_id: str,
|
||||
config_data: Dict[str, Any],
|
||||
config_version: str = "",
|
||||
) -> bool:
|
||||
"""通知指定插件其配置已更新。"""
|
||||
if plugin_id not in self._registered_plugins:
|
||||
return False
|
||||
|
||||
payload = ConfigUpdatedPayload(
|
||||
plugin_id=plugin_id,
|
||||
config_version=config_version,
|
||||
config_data=config_data,
|
||||
)
|
||||
await self._rpc_server.send_request(
|
||||
"plugin.config_updated",
|
||||
plugin_id=plugin_id,
|
||||
payload=payload.model_dump(),
|
||||
timeout_ms=5000,
|
||||
)
|
||||
return True
|
||||
|
||||
# ─── 内部方法 ──────────────────────────────────────────────
|
||||
|
||||
def _register_internal_methods(self) -> None:
|
||||
"""注册 Host 端的 RPC 方法处理器"""
|
||||
# Runner -> Host 的能力调用统一走 capability_service
|
||||
self._rpc_server.register_method("cap.request", self._capability_service.handle_capability_request)
|
||||
self._rpc_server.register_method("plugin.bootstrap", self._handle_bootstrap_plugin)
|
||||
# 插件注册
|
||||
self._rpc_server.register_method("plugin.register_components", self._handle_register_components)
|
||||
self._rpc_server.register_method("runner.ready", self._handle_runner_ready)
|
||||
# Runner 日志批量上报
|
||||
self._rpc_server.register_method("runner.log_batch", self._log_bridge.handle_log_batch)
|
||||
|
||||
async def _handle_bootstrap_plugin(self, envelope: Envelope) -> Envelope:
|
||||
"""处理插件 bootstrap 请求,仅同步能力令牌。"""
|
||||
try:
|
||||
bootstrap = BootstrapPluginPayload.model_validate(envelope.payload)
|
||||
except Exception as e:
|
||||
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e))
|
||||
|
||||
active_generation = self._rpc_server.runner_generation
|
||||
staged_generation = self._rpc_server.staged_generation
|
||||
if envelope.generation not in {active_generation, staged_generation}:
|
||||
return envelope.make_error_response(
|
||||
ErrorCode.E_GENERATION_MISMATCH.value,
|
||||
f"插件 bootstrap generation 过期: {envelope.generation} 不在已知代际中",
|
||||
)
|
||||
|
||||
if bootstrap.capabilities_required:
|
||||
self._policy.register_plugin(
|
||||
plugin_id=bootstrap.plugin_id,
|
||||
generation=envelope.generation,
|
||||
capabilities=bootstrap.capabilities_required,
|
||||
)
|
||||
else:
|
||||
self._policy.revoke_plugin(bootstrap.plugin_id, generation=envelope.generation)
|
||||
|
||||
return envelope.make_response(payload={"accepted": True})
|
||||
|
||||
async def _handle_register_components(self, envelope: Envelope) -> Envelope:
|
||||
"""处理插件组件注册请求"""
|
||||
try:
|
||||
reg = RegisterComponentsPayload.model_validate(envelope.payload)
|
||||
except Exception as e:
|
||||
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e))
|
||||
|
||||
active_generation = self._rpc_server.runner_generation
|
||||
staged_generation = self._rpc_server.staged_generation
|
||||
if envelope.generation not in {active_generation, staged_generation}:
|
||||
return envelope.make_error_response(
|
||||
ErrorCode.E_GENERATION_MISMATCH.value,
|
||||
f"组件注册 generation 过期: {envelope.generation} 不在已知代际中",
|
||||
)
|
||||
|
||||
if envelope.generation == staged_generation and staged_generation != 0:
|
||||
self._staged_registered_plugins[reg.plugin_id] = reg
|
||||
logger.info(
|
||||
f"插件 {reg.plugin_id} v{reg.plugin_version} staged 注册成功,"
|
||||
f"组件数: {len(reg.components)}, 能力需求: {reg.capabilities_required}"
|
||||
)
|
||||
return envelope.make_response(payload={"accepted": True, "staged": True})
|
||||
|
||||
self._registered_plugins[reg.plugin_id] = reg
|
||||
|
||||
# 在策略引擎中注册插件
|
||||
self._policy.register_plugin(
|
||||
plugin_id=reg.plugin_id,
|
||||
generation=envelope.generation,
|
||||
capabilities=reg.capabilities_required or [],
|
||||
)
|
||||
|
||||
# 同 generation 下重新注册时,以本次声明为准,避免残留幽灵组件
|
||||
self._component_registry.remove_components_by_plugin(reg.plugin_id)
|
||||
self._component_registry.register_plugin_components(
|
||||
plugin_id=reg.plugin_id,
|
||||
components=[c.model_dump() for c in reg.components],
|
||||
)
|
||||
|
||||
stats = self._component_registry.get_stats()
|
||||
logger.info(
|
||||
f"插件 {reg.plugin_id} v{reg.plugin_version} 注册成功,"
|
||||
f"组件数: {len(reg.components)}, 能力需求: {reg.capabilities_required},"
|
||||
f"注册表总计: {stats}"
|
||||
)
|
||||
|
||||
return envelope.make_response(payload={"accepted": True})
|
||||
|
||||
async def _handle_runner_ready(self, envelope: Envelope) -> Envelope:
|
||||
"""处理 Runner 初始化完成信号。"""
|
||||
try:
|
||||
ready = RunnerReadyPayload.model_validate(envelope.payload)
|
||||
except Exception as e:
|
||||
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e))
|
||||
|
||||
event = self._runner_ready_events.setdefault(envelope.generation, asyncio.Event())
|
||||
self._runner_ready_payloads[envelope.generation] = ready
|
||||
event.set()
|
||||
logger.info(
|
||||
f"Runner generation={envelope.generation} 已就绪,成功插件数: {len(ready.loaded_plugins)},"
|
||||
f"失败插件数: {len(ready.failed_plugins)}"
|
||||
)
|
||||
return envelope.make_response(payload={"accepted": True})
|
||||
|
||||
async def _spawn_runner(self) -> None:
|
||||
"""拉起 Runner 子进程"""
|
||||
runner_module = "src.plugin_runtime.runner.runner_main"
|
||||
address = self._transport.get_address()
|
||||
token = self._rpc_server.session_token
|
||||
|
||||
env = os.environ.copy()
|
||||
env[ENV_IPC_ADDRESS] = address
|
||||
env[ENV_SESSION_TOKEN] = token
|
||||
env[ENV_PLUGIN_DIRS] = os.pathsep.join(str(p) for p in self._plugin_dirs)
|
||||
env[ENV_HOST_VERSION] = MMC_VERSION
|
||||
|
||||
self._runner_process = await asyncio.create_subprocess_exec(
|
||||
sys.executable,
|
||||
"-m",
|
||||
runner_module,
|
||||
env=env,
|
||||
# stdout 不捕获:Runner 的日志均通过 IPC 传输(RunnerIPCLogHandler)
|
||||
stdout=None,
|
||||
# stderr 捕获为 PIPE,仅用于 IPC 建立前的进程级致命错误输出
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
self._attach_stderr_drain(self._runner_process)
|
||||
self._runner_generation = self._rpc_server.runner_generation
|
||||
logger.info(f"Runner 子进程已启动: pid={self._runner_process.pid}, generation={self._runner_generation}")
|
||||
|
||||
async def _shutdown_runner(self) -> None:
|
||||
"""优雅关停 Runner"""
|
||||
if not self._runner_process or self._runner_process.returncode is not None:
|
||||
return
|
||||
|
||||
# 发送 prepare_shutdown
|
||||
try:
|
||||
if self._rpc_server.is_connected:
|
||||
shutdown_payload = ShutdownPayload(reason="host_shutdown", drain_timeout_ms=5000)
|
||||
await self._rpc_server.send_request(
|
||||
"plugin.prepare_shutdown",
|
||||
payload=shutdown_payload.model_dump(),
|
||||
timeout_ms=5000,
|
||||
)
|
||||
await self._rpc_server.send_request(
|
||||
"plugin.shutdown",
|
||||
payload=shutdown_payload.model_dump(),
|
||||
timeout_ms=5000,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"发送关停命令失败: {e}")
|
||||
|
||||
# 等待进程退出
|
||||
try:
|
||||
await asyncio.wait_for(self._runner_process.wait(), timeout=10.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Runner 未在超时内退出,强制终止")
|
||||
self._runner_process.kill()
|
||||
await self._runner_process.wait()
|
||||
|
||||
await self._cleanup_stderr_drain()
|
||||
|
||||
async def _health_check_loop(self) -> None:
|
||||
"""周期性健康检查 + 崩溃自动重启"""
|
||||
while self._running:
|
||||
await asyncio.sleep(self._health_interval)
|
||||
|
||||
# 检查 Runner 进程是否意外退出
|
||||
if self._runner_process and self._runner_process.returncode is not None:
|
||||
exit_code = self._runner_process.returncode
|
||||
logger.warning(f"Runner 进程已退出 (exit_code={exit_code})")
|
||||
|
||||
if self._restart_count < self._max_restart_attempts:
|
||||
self._restart_count += 1
|
||||
logger.info(f"尝试重启 Runner ({self._restart_count}/{self._max_restart_attempts})")
|
||||
# 清理旧的组件注册
|
||||
for plugin_id in list(self._registered_plugins.keys()):
|
||||
self._component_registry.remove_components_by_plugin(plugin_id)
|
||||
self._policy.revoke_plugin(plugin_id)
|
||||
self._registered_plugins.clear()
|
||||
|
||||
try:
|
||||
self._clear_runtime_state()
|
||||
# 重新生成 session token,防止旧 Runner 僵尸进程用旧 token 重连
|
||||
self._rpc_server.reset_session_token()
|
||||
await self._spawn_runner()
|
||||
except Exception as e:
|
||||
logger.error(f"Runner 重启失败: {e}", exc_info=True)
|
||||
else:
|
||||
logger.error(f"Runner 连续崩溃 {self._max_restart_attempts} 次,停止重启")
|
||||
continue
|
||||
|
||||
if not self._rpc_server.is_connected:
|
||||
logger.warning("Runner 未连接,跳过健康检查")
|
||||
continue
|
||||
|
||||
try:
|
||||
resp = await self._rpc_server.send_request("plugin.health", timeout_ms=5000)
|
||||
health = HealthPayload.model_validate(resp.payload)
|
||||
if not health.healthy:
|
||||
logger.warning(f"Runner 健康检查异常: {health}")
|
||||
else:
|
||||
# 健康检查成功,重置重启计数
|
||||
self._restart_count = 0
|
||||
except RPCError as e:
|
||||
logger.error(f"健康检查失败: {e}")
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"健康检查异常: {e}")
|
||||
|
||||
async def _wait_for_runner_generation(
|
||||
self,
|
||||
expected_generation: int,
|
||||
timeout_sec: float,
|
||||
allow_staged: bool = False,
|
||||
) -> None:
|
||||
"""等待指定代际的 Runner 完成连接。"""
|
||||
deadline = asyncio.get_running_loop().time() + timeout_sec
|
||||
while asyncio.get_running_loop().time() < deadline:
|
||||
if allow_staged and self._rpc_server.has_generation(expected_generation):
|
||||
return
|
||||
if self._rpc_server.is_connected and self._rpc_server.runner_generation >= expected_generation:
|
||||
self._runner_generation = self._rpc_server.runner_generation
|
||||
return
|
||||
await asyncio.sleep(0.1)
|
||||
raise TimeoutError(f"等待 Runner generation {expected_generation} 超时")
|
||||
|
||||
async def _wait_for_runner_ready(self, expected_generation: int, timeout_sec: float) -> RunnerReadyPayload:
|
||||
"""等待指定代际的 Runner 完成初始化。"""
|
||||
event = self._runner_ready_events.setdefault(expected_generation, asyncio.Event())
|
||||
await asyncio.wait_for(event.wait(), timeout=timeout_sec)
|
||||
return self._runner_ready_payloads.get(expected_generation, RunnerReadyPayload())
|
||||
|
||||
def _clear_runtime_state(self) -> None:
|
||||
"""清空当前插件注册态。"""
|
||||
self._component_registry.clear()
|
||||
self._policy.clear()
|
||||
self._registered_plugins.clear()
|
||||
self._staged_registered_plugins.clear()
|
||||
|
||||
def _rebuild_runtime_state(self) -> None:
|
||||
"""根据已记录的插件注册信息重建运行时状态。"""
|
||||
self._component_registry.clear()
|
||||
self._policy.clear()
|
||||
for reg in self._registered_plugins.values():
|
||||
self._policy.register_plugin(
|
||||
plugin_id=reg.plugin_id,
|
||||
generation=self._rpc_server.runner_generation,
|
||||
capabilities=reg.capabilities_required or [],
|
||||
)
|
||||
self._component_registry.register_plugin_components(
|
||||
plugin_id=reg.plugin_id,
|
||||
components=[c.model_dump() for c in reg.components],
|
||||
)
|
||||
|
||||
def _attach_stderr_drain(self, process: asyncio.subprocess.Process) -> None:
|
||||
"""为 Runner stderr 创建排空任务,捕获 IPC 建立前的进程级错误输出。
|
||||
|
||||
stderr 中的内容通常是:
|
||||
- Runner 启动早期(握手完成之前)的日志
|
||||
- 进程级致命错误(ImportError、SyntaxError等)
|
||||
- 异常进程退出前的最后输出
|
||||
|
||||
握手成功后,插件的所有日志均经由 RunnerIPCLogHandler 通过 IPC 传输。
|
||||
"""
|
||||
if process.stderr is None:
|
||||
return
|
||||
task = asyncio.create_task(
|
||||
self._drain_runner_stderr(process.stderr, process.pid),
|
||||
name=f"runner_stderr_drain:{process.pid}",
|
||||
)
|
||||
self._stderr_drain_task = task
|
||||
task.add_done_callback(
|
||||
lambda done_task: None if self._stderr_drain_task is not done_task else self._clear_stderr_drain_task()
|
||||
)
|
||||
|
||||
def _clear_stderr_drain_task(self) -> None:
|
||||
self._stderr_drain_task = None
|
||||
|
||||
async def _drain_runner_stderr(
|
||||
self,
|
||||
stream: asyncio.StreamReader,
|
||||
pid: int,
|
||||
) -> None:
|
||||
"""持续读取 Runner stderr 并转发到 Host Logger,防止 PIPE 锁死子进程。
|
||||
|
||||
Args:
|
||||
stream: Runner 子进程的 stderr 流。
|
||||
pid: 子进程 PID,仅用于日志上下文。
|
||||
"""
|
||||
try:
|
||||
while True:
|
||||
line = await stream.readline()
|
||||
if not line:
|
||||
break
|
||||
if message := line.decode(errors="replace").rstrip():
|
||||
# 将 stderr 输出以 WARNING 级展示:
|
||||
# 如果 Runner 正常运行,此流应当无输出;
|
||||
# 有输出说明进程级错误发生,需要出现在主进程日志中
|
||||
logger.warning(f"[runner:{pid}:stderr] {message}")
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.debug(f"读取 Runner stderr 失败 (pid={pid}): {exc}")
|
||||
|
||||
async def _cleanup_stderr_drain(self) -> None:
|
||||
"""等待并取消 stderr 排空任务。"""
|
||||
if self._stderr_drain_task is None:
|
||||
return
|
||||
task = self._stderr_drain_task
|
||||
self._stderr_drain_task = None
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
with contextlib.suppress(Exception):
|
||||
await asyncio.gather(task, return_exceptions=True)
|
||||
|
||||
@staticmethod
|
||||
async def _terminate_process(
|
||||
process: Optional[asyncio.subprocess.Process],
|
||||
keep_process: Optional[asyncio.subprocess.Process] = None,
|
||||
) -> None:
|
||||
"""终止指定进程,但跳过需要保留的旧进程引用。"""
|
||||
if process is None or process is keep_process or process.returncode is not None:
|
||||
return
|
||||
|
||||
process.terminate()
|
||||
try:
|
||||
await asyncio.wait_for(process.wait(), timeout=10.0)
|
||||
except asyncio.TimeoutError:
|
||||
process.kill()
|
||||
await process.wait()
|
||||
await asyncio.wait_for(self._runner_ready_events.wait(), timeout=timeout_sec)
|
||||
logger.info("Runner 已完成初始化并上报就绪")
|
||||
return self._runner_ready_payloads
|
||||
except asyncio.TimeoutError as e:
|
||||
raise TimeoutError(f"等待 Runner 就绪超时({timeout_sec}s)") from e
|
||||
|
||||
Reference in New Issue
Block a user