feat: Enhance plugin configuration management and SDK integration

- Add support for configuration reload scopes in the plugin runtime.
- Implement validation for SDK plugins to ensure required lifecycle methods are overridden.
- Update the configuration update handling to include scope information.
- Introduce tests for expression auto-check task and NapCat adapter SDK integration.
- Refactor configuration management to support callbacks with variable arguments.
- Improve plugin loading and error handling for configuration updates.
- Ensure that plugins can manage their own configuration updates effectively.
This commit is contained in:
DrSmoothl
2026-03-23 20:06:12 +08:00
parent 9dea6b0e6f
commit d13767ee21
16 changed files with 907 additions and 71 deletions

View File

@@ -399,6 +399,7 @@ class PluginRunnerSupervisor:
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
config_version: str = "",
config_scope: str = "self",
) -> bool:
"""向 Runner 推送插件配置更新。
@@ -406,12 +407,14 @@ class PluginRunnerSupervisor:
plugin_id: 目标插件 ID。
config_data: 配置内容。
config_version: 配置版本号。
config_scope: 配置变更范围。
Returns:
bool: 请求是否成功送达并被 Runner 接受。
"""
payload = ConfigUpdatedPayload(
plugin_id=plugin_id,
config_scope=config_scope,
config_version=config_version,
config_data=config_data or {},
)
@@ -428,6 +431,22 @@ class PluginRunnerSupervisor:
return bool(response.payload.get("acknowledged", False))
def get_config_reload_subscribers(self, scope: str) -> List[str]:
"""返回订阅指定全局配置广播的插件列表。
Args:
scope: 配置变更范围,仅支持 ``bot`` 或 ``model``。
Returns:
List[str]: 已声明订阅该范围的插件 ID 列表。
"""
matched_plugins: List[str] = []
for plugin_id, registration in self._registered_plugins.items():
if scope in registration.config_reload_subscriptions:
matched_plugins.append(plugin_id)
return matched_plugins
async def _wait_for_runner_connection(self, timeout_sec: float) -> None:
"""等待 Runner 建立 RPC 连接。

View File

@@ -16,7 +16,7 @@ import json
import tomlkit
from src.common.logger import get_logger
from src.config.config import global_config
from src.config.config import config_manager
from src.config.file_watcher import FileChange, FileWatcher
from src.platform_io import DeliveryBatch, InboundMessageEnvelope, get_platform_io_manager
from src.plugin_runtime.capabilities import (
@@ -69,6 +69,8 @@ class PluginRuntimeManager(
self._plugin_source_watcher_subscription_id: Optional[str] = None
self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {}
self._plugin_path_cache: Dict[str, Path] = {}
self._config_reload_callback: Callable[[Sequence[str]], Awaitable[None]] = self._handle_main_config_reload
self._config_reload_callback_registered: bool = False
async def _dispatch_platform_inbound(self, envelope: InboundMessageEnvelope) -> None:
"""接收 Platform IO 审核后的入站消息并送入主消息链。
@@ -108,7 +110,7 @@ class PluginRuntimeManager(
logger.warning("PluginRuntimeManager 已在运行中,跳过重复启动")
return
_cfg = global_config.plugin_runtime
_cfg = config_manager.get_global_config().plugin_runtime
if not _cfg.enabled:
logger.info("插件运行时已在配置中禁用,跳过启动")
return
@@ -166,11 +168,16 @@ class PluginRuntimeManager(
await self._third_party_supervisor.start()
started_supervisors.append(self._third_party_supervisor)
await self._start_plugin_file_watcher()
config_manager.register_reload_callback(self._config_reload_callback)
self._config_reload_callback_registered = True
self._started = True
logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or ''}, 第三方: {third_party_dirs or ''}")
except Exception as e:
logger.error(f"插件运行时启动失败: {e}", exc_info=True)
await self._stop_plugin_file_watcher()
if self._config_reload_callback_registered:
config_manager.unregister_reload_callback(self._config_reload_callback)
self._config_reload_callback_registered = False
await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True)
platform_io_manager.clear_inbound_dispatcher()
try:
@@ -188,6 +195,9 @@ class PluginRuntimeManager(
platform_io_manager = get_platform_io_manager()
await self._stop_plugin_file_watcher()
if self._config_reload_callback_registered:
config_manager.unregister_reload_callback(self._config_reload_callback)
self._config_reload_callback_registered = False
coroutines: List[Coroutine[Any, Any, None]] = []
if self._builtin_supervisor:
@@ -233,6 +243,7 @@ class PluginRuntimeManager(
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
config_version: str = "",
config_scope: str = "self",
) -> bool:
"""向拥有该插件的 Supervisor 推送配置更新事件。
@@ -240,6 +251,7 @@ class PluginRuntimeManager(
plugin_id: 插件 ID
config_data: 可选的配置数据(如果为 None 则由 Supervisor 从磁盘加载)
config_version: 可选的配置版本字符串,供 Supervisor 进行版本控制
config_scope: 配置变更范围。
"""
if not self._started:
return False
@@ -258,12 +270,67 @@ class PluginRuntimeManager(
if config_data is not None
else self._load_plugin_config_for_supervisor(sv, plugin_id)
)
await sv.notify_plugin_config_updated(
return await sv.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=config_payload,
config_version=config_version,
config_scope=config_scope,
)
return True
@staticmethod
def _normalize_config_reload_scopes(changed_scopes: Sequence[str]) -> tuple[str, ...]:
"""规范化配置热重载范围列表。
Args:
changed_scopes: 原始配置热重载范围列表。
Returns:
tuple[str, ...]: 去重后的有效配置范围元组。
"""
normalized_scopes: list[str] = []
for scope in changed_scopes:
normalized_scope = str(scope or "").strip().lower()
if normalized_scope not in {"bot", "model"}:
continue
if normalized_scope not in normalized_scopes:
normalized_scopes.append(normalized_scope)
return tuple(normalized_scopes)
async def _broadcast_config_reload(self, scope: str, config_data: Dict[str, Any]) -> None:
"""向订阅指定范围的插件广播配置热重载。
Args:
scope: 配置变更范围,仅支持 ``bot`` 或 ``model``。
config_data: 最新配置数据。
"""
for supervisor in self.supervisors:
for plugin_id in supervisor.get_config_reload_subscribers(scope):
delivered = await supervisor.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=config_data,
config_version="",
config_scope=scope,
)
if not delivered:
logger.warning(f"向插件 {plugin_id} 广播 {scope} 配置热重载失败")
async def _handle_main_config_reload(self, changed_scopes: Sequence[str]) -> None:
"""处理 bot/model 主配置热重载广播。
Args:
changed_scopes: 本次热重载命中的配置范围列表。
"""
if not self._started:
return
normalized_scopes = self._normalize_config_reload_scopes(changed_scopes)
if "bot" in normalized_scopes:
await self._broadcast_config_reload("bot", config_manager.get_global_config().model_dump())
if "model" in normalized_scopes:
await self._broadcast_config_reload("model", config_manager.get_model_config().model_dump())
# ─── 事件桥接 ──────────────────────────────────────────────
@@ -612,16 +679,12 @@ class PluginRuntimeManager(
return None if plugin_path is None else plugin_path / "config.toml"
async def _handle_plugin_config_changes(self, plugin_id: str, changes: Sequence[FileChange]) -> None:
"""处理单个插件配置文件变化,并精确重载目标插件
"""处理单个插件配置文件变化,并定向派发自配置热更新
Args:
plugin_id: 发生配置变更的插件 ID。
changes: 当前批次收集到的配置文件变更列表。
Notes:
这里选择“精确重载该插件”,而不是仅推送软性的配置更新通知。
这样可以保证没有实现 ``on_config_update()`` 的插件也能重新执行
``on_load()``,让磁盘上的 ``config.toml`` 修改对插件运行态真正生效。
"""
if not self._started or not changes:
return
@@ -636,15 +699,15 @@ class PluginRuntimeManager(
return
try:
self._load_plugin_config_for_supervisor(supervisor, plugin_id)
reload_success = await supervisor.reload_plugin(
config_payload = self._load_plugin_config_for_supervisor(supervisor, plugin_id)
delivered = await supervisor.notify_plugin_config_updated(
plugin_id=plugin_id,
reason="config_file_changed",
config_data=config_payload,
config_version="",
config_scope="self",
)
if reload_success:
self._refresh_plugin_config_watch_subscriptions()
else:
logger.warning(f"插件 {plugin_id} 配置文件变更后重载失败")
if not delivered:
logger.warning(f"插件 {plugin_id} 配置文件变更后通知失败")
except Exception as exc:
logger.warning(f"插件 {plugin_id} 配置文件变更处理失败: {exc}")
@@ -652,8 +715,8 @@ class PluginRuntimeManager(
"""处理插件源码相关变化。
这里仅负责源码、清单等会影响插件装载状态的文件;配置文件的变化会由
单独的 per-plugin watcher 处理,并精确重载对应插件,避免放大成
不必要的跨插件 reload。
单独的 per-plugin watcher 处理,并定向派发给目标插件的
``on_config_update()``,避免放大成不必要的跨插件 reload。
"""
if not self._started or not changes:
return

View File

@@ -29,6 +29,14 @@ class MessageType(str, Enum):
BROADCAST = "broadcast"
class ConfigReloadScope(str, Enum):
"""配置热重载范围。"""
SELF = "self"
BOT = "bot"
MODEL = "model"
# ====== 请求 ID 生成器 ======
class RequestIdGenerator:
"""单调递增 int64 请求 ID 生成器"""
@@ -158,6 +166,8 @@ class RegisterPluginPayload(BaseModel):
"""组件列表"""
capabilities_required: List[str] = Field(default_factory=list, description="所需能力列表")
"""所需能力列表"""
config_reload_subscriptions: List[str] = Field(default_factory=list, description="订阅的全局配置热重载范围")
"""订阅的全局配置热重载范围"""
class BootstrapPluginPayload(BaseModel):
@@ -236,6 +246,8 @@ class ConfigUpdatedPayload(BaseModel):
plugin_id: str = Field(description="插件 ID")
"""插件 ID"""
config_scope: ConfigReloadScope = Field(description="配置变更范围")
"""配置变更范围"""
config_version: str = Field(description="新配置版本")
"""新配置版本"""
config_data: Dict[str, Any] = Field(default_factory=dict, description="配置内容")

View File

@@ -403,6 +403,7 @@ class PluginLoader:
create_plugin = getattr(module, "create_plugin", None)
if create_plugin is not None:
instance = create_plugin()
self._validate_sdk_plugin_contract(plugin_id, instance)
logger.info(f"插件 {plugin_id} v{manifest.get('version', '?')} 加载成功")
return PluginMeta(
plugin_id=plugin_id,
@@ -432,6 +433,35 @@ class PluginLoader:
logger.error(f"插件 {plugin_id} 缺少 create_plugin 工厂函数且未检测到旧版 BasePlugin")
return None
@staticmethod
def _validate_sdk_plugin_contract(plugin_id: str, instance: Any) -> None:
"""校验 SDK 插件的基础契约。
Args:
plugin_id: 当前插件 ID。
instance: ``create_plugin()`` 返回的插件实例。
Raises:
TypeError: 当插件未覆盖必需生命周期方法或订阅声明不合法时抛出。
"""
try:
from maibot_sdk.plugin import MaiBotPlugin
except ImportError:
return
if not isinstance(instance, MaiBotPlugin):
return
if type(instance).on_load is MaiBotPlugin.on_load:
raise TypeError(f"插件 {plugin_id} 必须实现 on_load()")
if type(instance).on_unload is MaiBotPlugin.on_unload:
raise TypeError(f"插件 {plugin_id} 必须实现 on_unload()")
if type(instance).on_config_update is MaiBotPlugin.on_config_update:
raise TypeError(f"插件 {plugin_id} 必须实现 on_config_update()")
instance.get_config_reload_subscriptions()
@staticmethod
@contextlib.contextmanager
def _temporary_sys_path_entry(path: Path) -> Iterator[None]:

View File

@@ -27,6 +27,7 @@ from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIR
from src.plugin_runtime.protocol.envelope import (
BootstrapPluginPayload,
ComponentDeclaration,
ConfigUpdatedPayload,
Envelope,
HealthPayload,
InvokePayload,
@@ -342,6 +343,7 @@ class PluginRunner:
"""
# 收集插件组件声明
components: List[ComponentDeclaration] = []
config_reload_subscriptions: List[str] = []
instance = meta.instance
# 从插件实例获取组件声明SDK 插件须实现 get_components 方法)
@@ -355,12 +357,15 @@ class PluginRunner:
)
for comp_info in instance.get_components()
)
if hasattr(instance, "get_config_reload_subscriptions"):
config_reload_subscriptions = list(instance.get_config_reload_subscriptions())
reg_payload = RegisterPluginPayload(
plugin_id=meta.plugin_id,
plugin_version=meta.version,
components=components,
capabilities_required=meta.capabilities_required,
config_reload_subscriptions=config_reload_subscriptions,
)
try:
@@ -911,18 +916,28 @@ class PluginRunner:
return envelope.make_response(payload={"acknowledged": True})
async def _handle_config_updated(self, envelope: Envelope) -> Envelope:
"""处理配置更新事件"""
"""处理配置更新事件"""
try:
payload = ConfigUpdatedPayload.model_validate(envelope.payload)
except Exception as exc:
return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
plugin_id = envelope.plugin_id
if meta := self._loader.get_plugin(plugin_id):
try:
config_data = envelope.payload.get("config_data", {})
config_version = envelope.payload.get("config_version", "")
self._apply_plugin_config(meta, config_data=config_data)
if hasattr(meta.instance, "on_config_update"):
ret = meta.instance.on_config_update(config_data, config_version)
# 兼容同步和异步的 on_config_update 实现
if asyncio.iscoroutine(ret):
await ret
config_scope = payload.config_scope.value
if config_scope == "self":
self._apply_plugin_config(meta, config_data=payload.config_data)
if not hasattr(meta.instance, "on_config_update"):
raise AttributeError("插件缺少 on_config_update() 实现")
ret = meta.instance.on_config_update(
config_scope,
payload.config_data,
payload.config_version,
)
if asyncio.iscoroutine(ret):
await ret
except Exception as e:
logger.error(f"插件 {plugin_id} 配置更新失败: {e}")
return envelope.make_error_response(ErrorCode.E_UNKNOWN.value, str(e))