feat: Enhance plugin loading and management

- Added module_name parameter to PluginMeta for better module tracking.
- Improved documentation for PluginMeta and PluginLoader methods.
- Introduced methods for managing loaded plugins: set_loaded_plugin, remove_loaded_plugin, and purge_plugin_modules.
- Enhanced dependency resolution in PluginLoader with resolve_dependencies method.
- Implemented candidate discovery and loading in PluginLoader.
- Added support for plugin reloading with _reload_plugin_by_id in PluginRunner.
- Improved error handling and logging throughout the RPCClient and PluginRunner.
- Added support for handling hook invocations in PluginRunner.
- Refactored plugin registration and unregistration processes for clarity and efficiency.
This commit is contained in:
DrSmoothl
2026-03-20 22:23:47 +08:00
parent 07256182fb
commit e4850c469f
9 changed files with 1351 additions and 333 deletions

View File

@@ -1,32 +1,38 @@
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import asyncio
import contextlib
import os
import sys
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_runtime.transport.factory import create_transport_server
from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN
from src.plugin_runtime.protocol.envelope import (
BootstrapPluginPayload,
ConfigUpdatedPayload,
Envelope,
HealthPayload,
LogBatchPayload,
PROTOCOL_VERSION,
RegisterPluginPayload,
ReloadPluginResultPayload,
RunnerReadyPayload,
ShutdownPayload,
UnregisterPluginPayload,
)
from src.plugin_runtime.protocol.codec import MsgPackCodec
from src.plugin_runtime.protocol.errors import ErrorCode, RPCError
from src.plugin_runtime.transport.factory import create_transport_server
from .authorization import AuthorizationManager
from .capability_service import CapabilityService
from .rpc_server import RPCServer
from .logger_bridge import RunnerLogBridge
from .component_registry import ComponentRegistry
from .event_dispatcher import EventDispatcher
from .hook_dispatcher import HookDispatcher
from .logger_bridge import RunnerLogBridge
from .message_gateway import MessageGateway
from .message_utils import PluginMessageUtils
from .rpc_server import RPCServer
if TYPE_CHECKING:
from src.chat.message_receive.message import SessionMessage
@@ -35,7 +41,11 @@ logger = get_logger("plugin_runtime.host.runner_manager")
class PluginRunnerSupervisor:
"""插件Runner管理器负责管理Runner的生命周期"""
"""插件 Runner 监督器。
负责 Host 侧与单个 Runner 子进程之间的生命周期、内部 RPC、
健康检查和插件级重载协调。
"""
def __init__(
self,
@@ -44,13 +54,24 @@ class PluginRunnerSupervisor:
health_check_interval_sec: Optional[float] = None,
max_restart_attempts: Optional[int] = None,
runner_spawn_timeout_sec: Optional[float] = None,
):
_cfg = global_config.plugin_runtime
self._plugin_dirs: List[Path] = plugin_dirs or []
self._health_interval = health_check_interval_sec or _cfg.health_check_interval_sec or 30.0
self._runner_spawn_timeout = runner_spawn_timeout_sec or _cfg.runner_spawn_timeout_sec or 30.0
) -> None:
"""初始化 Supervisor。
Args:
plugin_dirs: 由当前 Runner 负责加载的插件目录列表。
socket_path: 自定义 IPC 地址;留空时由传输层自动生成。
health_check_interval_sec: 健康检查间隔,单位秒。
max_restart_attempts: 自动重启 Runner 的最大次数。
runner_spawn_timeout_sec: 等待 Runner 建连并就绪的超时时间,单位秒。
"""
runtime_config = global_config.plugin_runtime
self._plugin_dirs: List[Path] = plugin_dirs or []
self._health_interval: float = health_check_interval_sec or runtime_config.health_check_interval_sec or 30.0
self._runner_spawn_timeout: float = (
runner_spawn_timeout_sec or runtime_config.runner_spawn_timeout_sec or 30.0
)
self._max_restart_attempts: int = max_restart_attempts or runtime_config.max_restart_attempts or 3
# 基础设施
self._transport = create_transport_server(socket_path=socket_path)
self._authorization = AuthorizationManager()
self._capability_service = CapabilityService(self._authorization)
@@ -58,61 +79,55 @@ class PluginRunnerSupervisor:
self._event_dispatcher = EventDispatcher(self._component_registry)
self._hook_dispatcher = HookDispatcher(self._component_registry)
self._message_gateway = MessageGateway(self._component_registry)
# 编解码和服务器
from src.plugin_runtime.protocol.codec import MsgPackCodec
self._log_bridge = RunnerLogBridge()
codec = MsgPackCodec()
self._rpc_server = RPCServer(transport=self._transport, codec=codec)
# Runner 子进程
self._runner_process: Optional[asyncio.subprocess.Process] = None
self._max_restart_attempts: int = max_restart_attempts or _cfg.max_restart_attempts or 3
self._restart_count: int = 0
# 已注册的插件组件信息
self._registered_plugins: Dict[str, RegisterPluginPayload] = {}
self._runner_ready_events: asyncio.Event = asyncio.Event()
self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload()
self._health_task: Optional[asyncio.Task[None]] = None
self._stderr_drain_task: Optional[asyncio.Task[None]] = None
self._restart_count: int = 0
self._running: bool = False
# 后台任务
self._health_task: Optional[asyncio.Task] = None
# Runner stderr 流排空任务(仅保留 stderr用于 IPC 建立前的启动日志倒空、致命错误输出等场景)
self._stderr_drain_task: Optional[asyncio.Task] = None
self._running = False
# Runner 日志桥(将 Runner 上报的批量日志重放到主进程 Logger
self._log_bridge: RunnerLogBridge = RunnerLogBridge()
# 注册内部 RPC 方法
self._register_internal_methods() # TODO: 完成内部方法注册
self._register_internal_methods()
@property
def authorization_manager(self) -> AuthorizationManager:
"""返回授权管理器。"""
return self._authorization
@property
def capability_service(self) -> CapabilityService:
"""返回能力服务。"""
return self._capability_service
@property
def component_registry(self) -> ComponentRegistry:
"""返回组件注册表。"""
return self._component_registry
@property
def event_dispatcher(self) -> EventDispatcher:
"""返回事件分发器。"""
return self._event_dispatcher
@property
def hook_dispatcher(self) -> HookDispatcher:
"""返回 Hook 分发器。"""
return self._hook_dispatcher
@property
def message_gateway(self) -> MessageGateway:
"""返回消息网关。"""
return self._message_gateway
@property
def rpc_server(self) -> RPCServer:
"""返回底层 RPC 服务端。"""
return self._rpc_server
async def dispatch_event(
@@ -121,11 +136,28 @@ class PluginRunnerSupervisor:
message: Optional["SessionMessage"] = None,
extra_args: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, Optional["SessionMessage"]]:
"""分发事件到所有对应 handler 的快捷方法。"""
"""分发事件到已注册的事件处理器。
Args:
event_type: 事件类型。
message: 可选的消息对象。
extra_args: 附加参数。
Returns:
Tuple[bool, Optional[SessionMessage]]: 是否继续处理,以及插件可能修改后的消息。
"""
return await self._event_dispatcher.dispatch_event(event_type, self, message, extra_args)
async def dispatch_hook(self, stage: str, **kwargs):
"""分发Hook事件到所有对应 handler 的快捷方法。"""
async def dispatch_hook(self, stage: str, **kwargs: Any) -> Dict[str, Any]:
"""分发 Hook 到已注册的 Hook 处理器。
Args:
stage: Hook 阶段名称。
**kwargs: 传递给 Hook 的关键字参数。
Returns:
Dict[str, Any]: 经 Hook 修改后的参数字典。
"""
return await self._hook_dispatcher.hook_dispatch(stage, self, **kwargs)
async def send_message_to_external(
@@ -135,60 +167,68 @@ class PluginRunnerSupervisor:
enabled_only: bool = True,
save_to_db: bool = True,
) -> bool:
"""发送系统内部消息到外部平台的快捷方法。"""
"""通过插件消息网关发送外部消息。
Args:
internal_message: 系统内部消息对象。
enabled_only: 是否仅使用启用的网关组件。
save_to_db: 发送成功后是否写入数据库。
Returns:
bool: 是否发送成功。
"""
return await self._message_gateway.send_message_to_external(
internal_message, self, enabled_only=enabled_only, save_to_db=save_to_db
internal_message,
self,
enabled_only=enabled_only,
save_to_db=save_to_db,
)
async def start(self) -> None:
"""启动 Supervisor
"""启动 Supervisor"""
if self._running:
logger.warning("PluginRunnerSupervisor 已在运行,跳过重复启动")
return
1. 启动 RPC Server
2. 拉起 Runner 子进程
3. 启动健康检查
"""
self._running = True
self._restart_count = 0
self._clear_runner_state()
# 启动 RPC Server
await self._rpc_server.start()
# 拉起 Runner 进程
await self._spawn_runner()
# 等待 Runner 完成连接和初始化,避免 start() 返回时 Runner 尚未就绪
try:
await self._wait_for_runner_connection(timeout_sec=self._runner_spawn_timeout)
await self._wait_for_runner_ready(timeout_sec=self._runner_spawn_timeout)
except TimeoutError:
if not self._rpc_server.is_connected:
logger.warning(f"Runner 未在 {self._runner_spawn_timeout}s 内完成连接,后续操作可能失败")
logger.warning("Runner 未在限定时间内完成连接,后续操作可能失败")
else:
logger.warning(f"Runner 未在 {self._runner_spawn_timeout}s 内完成初始化,后续操作可能失败")
logger.warning("Runner 未在限定时间内完成初始化,后续操作可能失败")
# 启动健康检查
self._health_task = asyncio.create_task(self._health_check_loop())
logger.info("PluginSupervisor 已启动")
self._health_task = asyncio.create_task(self._health_check_loop(), name="PluginRunnerSupervisor.health")
logger.info("PluginRunnerSupervisor 已启动")
async def stop(self) -> None:
"""停止 Supervisor"""
"""停止 Supervisor"""
if not self._running:
return
self._running = False
# 停止组件
await self._event_dispatcher.stop()
await self._hook_dispatcher.stop()
# 停止健康检查
if self._health_task:
if self._health_task is not None:
self._health_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._health_task
self._health_task = None
# 优雅关停 Runner
await self._shutdown_runner()
# 停止 RPC Server
await self._event_dispatcher.stop()
await self._hook_dispatcher.stop()
await self._shutdown_runner(reason="host_stop")
await self._rpc_server.stop()
self._clear_runner_state()
logger.info("PluginSupervisor 已停止")
logger.info("PluginRunnerSupervisor 已停止")
async def invoke_plugin(
self,
@@ -198,9 +238,17 @@ class PluginRunnerSupervisor:
args: Optional[Dict[str, Any]] = None,
timeout_ms: int = 30000,
) -> Envelope:
"""调用插件组件
"""调用 Runner 内的插件组件
由主进程业务逻辑调用,通过 RPC 转发给 Runner。
Args:
method: RPC 方法名。
plugin_id: 目标插件 ID。
component_name: 组件名。
args: 调用参数。
timeout_ms: RPC 超时时间,单位毫秒。
Returns:
Envelope: RPC 响应信封。
"""
return await self._rpc_server.send_request(
method,
@@ -210,27 +258,421 @@ class PluginRunnerSupervisor:
)
async def reload_plugin(self, plugin_id: str, reason: str = "manual") -> bool:
raise NotImplementedError("等待SDK完成") # TODO: 完成对应的调用和请求逻辑
"""按插件 ID 触发精确重载。
Args:
plugin_id: 目标插件 ID。
reason: 重载原因。
Returns:
bool: 是否重载成功。
"""
try:
response = await self._rpc_server.send_request(
"plugin.reload",
plugin_id=plugin_id,
payload={"plugin_id": plugin_id, "reason": reason},
timeout_ms=max(int(self._runner_spawn_timeout * 1000), 10000),
)
except Exception as exc:
logger.error(f"插件 {plugin_id} 重载请求失败: {exc}")
return False
result = ReloadPluginResultPayload.model_validate(response.payload)
if not result.success:
logger.warning(f"插件 {plugin_id} 重载失败: {result.failed_plugins}")
return result.success
async def reload_plugins(
self,
plugin_ids: Optional[List[str]] = None,
reason: str = "manual",
) -> bool:
"""批量重载插件。
Args:
plugin_ids: 目标插件 ID 列表;为空时重载当前已注册的全部插件。
reason: 重载原因。
Returns:
bool: 是否全部重载成功。
"""
target_plugin_ids = plugin_ids or list(self._registered_plugins.keys())
ordered_plugin_ids = list(dict.fromkeys(target_plugin_ids))
success = True
for plugin_id in ordered_plugin_ids:
reloaded = await self.reload_plugin(plugin_id=plugin_id, reason=reason)
success = success and reloaded
return success
async def notify_plugin_config_updated(
self,
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
config_version: str = "",
) -> bool:
"""向 Runner 推送插件配置更新。
Args:
plugin_id: 目标插件 ID。
config_data: 配置内容。
config_version: 配置版本号。
Returns:
bool: 请求是否成功送达并被 Runner 接受。
"""
payload = ConfigUpdatedPayload(
plugin_id=plugin_id,
config_version=config_version,
config_data=config_data or {},
)
try:
response = await self._rpc_server.send_request(
"plugin.config_updated",
plugin_id=plugin_id,
payload=payload.model_dump(),
timeout_ms=10000,
)
except Exception as exc:
logger.warning(f"插件 {plugin_id} 配置更新通知失败: {exc}")
return False
return bool(response.payload.get("acknowledged", False))
async def _wait_for_runner_connection(self, timeout_sec: float) -> None:
"""等待 Runner 连接上 RPC Server"""
"""等待 Runner 建立 RPC 连接。
async def wait_for_connection():
Args:
timeout_sec: 超时时间,单位秒。
Raises:
TimeoutError: 在超时时间内 Runner 未完成连接。
"""
async def wait_for_connection() -> None:
"""轮询等待 RPC 连接建立。"""
while self._running and not self._rpc_server.is_connected:
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(wait_for_connection(), timeout=timeout_sec)
logger.info("Runner 已连接到 RPC Server")
except asyncio.TimeoutError as e:
raise TimeoutError(f"等待 Runner 连接超时({timeout_sec}s") from e
except asyncio.TimeoutError as exc:
raise TimeoutError(f"等待 Runner 连接超时({timeout_sec}s") from exc
async def _wait_for_runner_ready(self, timeout_sec: float = 30.0) -> RunnerReadyPayload:
"""等待 Runner 完成初始化并上报就绪"""
"""等待 Runner 完成启动初始化
Args:
timeout_sec: 超时时间,单位秒。
Returns:
RunnerReadyPayload: Runner 上报的就绪信息。
Raises:
TimeoutError: 在超时时间内 Runner 未完成初始化。
"""
try:
await asyncio.wait_for(self._runner_ready_events.wait(), timeout=timeout_sec)
logger.info("Runner 已完成初始化并上报就绪")
return self._runner_ready_payloads
except asyncio.TimeoutError as e:
raise TimeoutError(f"等待 Runner 就绪超时({timeout_sec}s") from e
except asyncio.TimeoutError as exc:
raise TimeoutError(f"等待 Runner 就绪超时({timeout_sec}s") from exc
def _register_internal_methods(self) -> None:
"""注册 Host 侧内部 RPC 方法。"""
self._rpc_server.register_method("cap.call", self._capability_service.handle_capability_request)
self._rpc_server.register_method("plugin.bootstrap", self._handle_bootstrap_plugin)
self._rpc_server.register_method("plugin.register_components", self._handle_register_plugin)
self._rpc_server.register_method("plugin.register_plugin", self._handle_register_plugin)
self._rpc_server.register_method("plugin.unregister", self._handle_unregister_plugin)
self._rpc_server.register_method("runner.log_batch", self._log_bridge.handle_log_batch)
self._rpc_server.register_method("runner.ready", self._handle_runner_ready)
async def _handle_bootstrap_plugin(self, envelope: Envelope) -> Envelope:
"""处理插件 bootstrap 请求。
Args:
envelope: RPC 请求信封。
Returns:
Envelope: RPC 响应信封。
"""
try:
payload = BootstrapPluginPayload.model_validate(envelope.payload)
except Exception as exc:
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
if payload.capabilities_required:
self._authorization.register_plugin(payload.plugin_id, payload.capabilities_required)
else:
self._authorization.revoke_permission_token(payload.plugin_id)
return envelope.make_response(payload={"accepted": True, "plugin_id": payload.plugin_id})
async def _handle_register_plugin(self, envelope: Envelope) -> Envelope:
"""处理插件组件注册请求。
Args:
envelope: RPC 请求信封。
Returns:
Envelope: RPC 响应信封。
"""
try:
payload = RegisterPluginPayload.model_validate(envelope.payload)
except Exception as exc:
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
self._component_registry.remove_components_by_plugin(payload.plugin_id)
registered_count = self._component_registry.register_plugin_components(
payload.plugin_id,
[component.model_dump() for component in payload.components],
)
self._registered_plugins[payload.plugin_id] = payload
return envelope.make_response(
payload={
"accepted": True,
"plugin_id": payload.plugin_id,
"registered_components": registered_count,
}
)
async def _handle_unregister_plugin(self, envelope: Envelope) -> Envelope:
"""处理插件注销请求。
Args:
envelope: RPC 请求信封。
Returns:
Envelope: RPC 响应信封。
"""
try:
payload = UnregisterPluginPayload.model_validate(envelope.payload)
except Exception as exc:
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
removed_components = self._component_registry.remove_components_by_plugin(payload.plugin_id)
self._authorization.revoke_permission_token(payload.plugin_id)
removed_registration = self._registered_plugins.pop(payload.plugin_id, None) is not None
return envelope.make_response(
payload={
"accepted": True,
"plugin_id": payload.plugin_id,
"reason": payload.reason,
"removed_components": removed_components,
"removed_registration": removed_registration,
}
)
async def _handle_runner_ready(self, envelope: Envelope) -> Envelope:
"""处理 Runner 就绪通知。
Args:
envelope: RPC 请求信封。
Returns:
Envelope: RPC 响应信封。
"""
try:
payload = RunnerReadyPayload.model_validate(envelope.payload)
except Exception as exc:
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
self._runner_ready_payloads = payload
self._runner_ready_events.set()
return envelope.make_response(payload={"accepted": True})
def _build_runner_environment(self) -> Dict[str, str]:
"""构建拉起 Runner 所需的环境变量。
Returns:
Dict[str, str]: 传递给 Runner 进程的环境变量映射。
"""
return {
ENV_HOST_VERSION: PROTOCOL_VERSION,
ENV_IPC_ADDRESS: self._transport.get_address(),
ENV_PLUGIN_DIRS: os.pathsep.join(str(path) for path in self._plugin_dirs),
ENV_SESSION_TOKEN: self._rpc_server.session_token,
}
async def _spawn_runner(self) -> None:
"""拉起 Runner 子进程。"""
if self._runner_process is not None and self._runner_process.returncode is None:
logger.warning("Runner 已在运行,跳过重复拉起")
return
self._clear_runner_state()
env = os.environ.copy()
env.update(self._build_runner_environment())
self._runner_process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"src.plugin_runtime.runner.runner_main",
env=env,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
if self._runner_process.stderr is not None:
self._stderr_drain_task = asyncio.create_task(
self._drain_runner_stderr(self._runner_process.stderr),
name="PluginRunnerSupervisor.stderr",
)
logger.info(f"Runner 已拉起pid={self._runner_process.pid}")
async def _drain_runner_stderr(self, stream: asyncio.StreamReader) -> None:
"""持续排空 Runner 的 stderr。
Args:
stream: Runner 的 stderr 流。
"""
try:
while True:
line = await stream.readline()
if not line:
return
message = line.decode("utf-8", errors="replace").rstrip()
if message:
logger.warning(f"[runner-stderr] {message}")
except asyncio.CancelledError:
raise
except Exception as exc:
logger.warning(f"排空 Runner stderr 失败: {exc}")
async def _shutdown_runner(self, reason: str = "normal") -> None:
"""优雅关闭 Runner 子进程。
Args:
reason: 关停原因。
"""
process = self._runner_process
if process is None:
return
payload = ShutdownPayload(reason=reason)
if process.returncode is None and self._rpc_server.is_connected:
with contextlib.suppress(Exception):
await self._rpc_server.send_request(
"plugin.prepare_shutdown",
payload=payload.model_dump(),
timeout_ms=payload.drain_timeout_ms,
)
with contextlib.suppress(Exception):
await self._rpc_server.send_request(
"plugin.shutdown",
payload=payload.model_dump(),
timeout_ms=payload.drain_timeout_ms,
)
if process.returncode is None:
try:
await asyncio.wait_for(process.wait(), timeout=max(payload.drain_timeout_ms / 1000.0, 1.0))
except asyncio.TimeoutError:
logger.warning("Runner 优雅退出超时,尝试 terminate")
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Runner terminate 超时,尝试 kill")
process.kill()
with contextlib.suppress(Exception):
await asyncio.wait_for(process.wait(), timeout=5.0)
self._runner_process = None
if self._stderr_drain_task is not None:
self._stderr_drain_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._stderr_drain_task
self._stderr_drain_task = None
self._clear_runner_state()
async def _health_check_loop(self) -> None:
"""周期性检查 Runner 健康状态,并在必要时重启。"""
timeout_ms = max(int(self._health_interval * 1000), 1000)
while self._running:
try:
await asyncio.sleep(self._health_interval)
except asyncio.CancelledError:
return
if not self._running:
return
process = self._runner_process
if process is None or process.returncode is not None:
reason = "runner_process_exited" if process is not None else "runner_process_missing"
restarted = await self._restart_runner(reason=reason)
if not restarted:
return
continue
try:
response = await self._rpc_server.send_request("plugin.health", timeout_ms=timeout_ms)
health = HealthPayload.model_validate(response.payload)
if not health.healthy:
restarted = await self._restart_runner(reason="health_check_unhealthy")
if not restarted:
return
except asyncio.CancelledError:
return
except (RPCError, Exception) as exc:
logger.warning(f"Runner 健康检查失败: {exc}")
restarted = await self._restart_runner(reason="health_check_failed")
if not restarted:
return
async def _restart_runner(self, reason: str) -> bool:
"""在 Runner 异常时执行整进程级重启。
Args:
reason: 触发重启的原因。
Returns:
bool: 是否重启成功。
"""
if not self._running:
return False
if self._restart_count >= self._max_restart_attempts:
logger.error(f"Runner 自动重启次数已达上限停止重启。reason={reason}")
return False
self._restart_count += 1
logger.warning(f"准备重启 Runner{self._restart_count}reason={reason}")
await self._shutdown_runner(reason=reason)
try:
await self._spawn_runner()
await self._wait_for_runner_connection(timeout_sec=self._runner_spawn_timeout)
await self._wait_for_runner_ready(timeout_sec=self._runner_spawn_timeout)
except Exception as exc:
logger.error(f"Runner 重启失败: {exc}", exc_info=True)
return False
self._restart_count = 0
logger.info("Runner 已成功重启")
return True
def _clear_runner_state(self) -> None:
"""清理当前 Runner 对应的 Host 侧注册状态。"""
self._authorization.clear()
self._component_registry.clear()
self._registered_plugins.clear()
self._runner_ready_events = asyncio.Event()
self._runner_ready_payloads = RunnerReadyPayload()
PluginSupervisor = PluginRunnerSupervisor