feat: 实现插件配置更新通知机制,支持热重载与文件变化监听

This commit is contained in:
DrSmoothl
2026-03-13 15:47:49 +08:00
parent f1e10b4054
commit d92aa800a3
4 changed files with 276 additions and 5 deletions

View File

@@ -1690,3 +1690,79 @@ class TestIntegration:
assert manager.is_running is False assert manager.is_running is False
assert len(instances) == 2 assert len(instances) == 2
assert instances[0].stopped is True assert instances[0].stopped is True
@pytest.mark.asyncio
async def test_handle_plugin_file_changes_routes_reload_and_config_update(self, monkeypatch, tmp_path):
from pathlib import Path
from src.config.file_watcher import FileChange
from src.plugin_runtime import integration as integration_module
builtin_root = tmp_path / "src" / "plugins" / "built_in"
thirdparty_root = tmp_path / "plugins"
alpha_dir = builtin_root / "alpha"
beta_dir = thirdparty_root / "beta"
alpha_dir.mkdir(parents=True)
beta_dir.mkdir(parents=True)
(alpha_dir / "config.toml").write_text("enabled = true\n", encoding="utf-8")
(beta_dir / "config.toml").write_text("enabled = false\n", encoding="utf-8")
monkeypatch.chdir(tmp_path)
class FakeSupervisor:
def __init__(self, plugin_dirs, registered_plugins):
self._plugin_dirs = plugin_dirs
self._registered_plugins = registered_plugins
self.reload_reasons = []
self.config_updates = []
async def reload_plugins(self, reason="manual"):
self.reload_reasons.append(reason)
async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""):
self.config_updates.append((plugin_id, config_data, config_version))
return True
manager = integration_module.PluginRuntimeManager()
manager._started = True
manager._builtin_supervisor = FakeSupervisor([str(builtin_root)], {"alpha": object()})
manager._thirdparty_supervisor = FakeSupervisor([str(thirdparty_root)], {"beta": object()})
changes = [
FileChange(change_type=1, path=alpha_dir / "config.toml"),
FileChange(change_type=1, path=beta_dir / "plugin.py"),
]
await manager._handle_plugin_file_changes(changes)
assert manager._builtin_supervisor.reload_reasons == []
assert manager._thirdparty_supervisor.reload_reasons == ["file_watcher"]
assert manager._builtin_supervisor.config_updates == [
("alpha", {"enabled": True}, "")
]
assert manager._thirdparty_supervisor.config_updates == []
@pytest.mark.asyncio
async def test_handle_config_reload_notifies_all_registered_plugins(self):
from src.plugin_runtime import integration as integration_module
class FakeSupervisor:
def __init__(self, plugins):
self._registered_plugins = {plugin_id: object() for plugin_id in plugins}
manager = integration_module.PluginRuntimeManager()
manager._started = True
manager._builtin_supervisor = FakeSupervisor(["alpha"])
manager._thirdparty_supervisor = FakeSupervisor(["beta", "gamma"])
notified = []
async def fake_notify(plugin_id, config_data=None, config_version=""):
notified.append((plugin_id, config_version))
return True
manager.notify_plugin_config_updated = fake_notify
await manager.handle_config_reload()
assert notified == [("alpha", ""), ("beta", ""), ("gamma", "")]

View File

@@ -25,6 +25,7 @@ from src.plugin_runtime.host.policy_engine import PolicyEngine
from src.plugin_runtime.host.rpc_server import RPCServer from src.plugin_runtime.host.rpc_server import RPCServer
from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, WorkflowContext, WorkflowResult from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, WorkflowContext, WorkflowResult
from src.plugin_runtime.protocol.envelope import ( from src.plugin_runtime.protocol.envelope import (
ConfigUpdatedPayload,
Envelope, Envelope,
HealthPayload, HealthPayload,
LogBatchPayload, LogBatchPayload,
@@ -375,6 +376,29 @@ class PluginSupervisor:
logger.info("热重载完成") logger.info("热重载完成")
async def notify_plugin_config_updated(
self,
plugin_id: str,
config_data: Dict[str, Any],
config_version: str = "",
) -> bool:
"""通知指定插件其配置已更新。"""
if plugin_id not in self._registered_plugins:
return False
payload = ConfigUpdatedPayload(
plugin_id=plugin_id,
config_version=config_version,
config_data=config_data,
)
await self._rpc_server.send_request(
"plugin.config_updated",
plugin_id=plugin_id,
payload=payload.model_dump(),
timeout_ms=5000,
)
return True
# ─── 内部方法 ────────────────────────────────────────────── # ─── 内部方法 ──────────────────────────────────────────────
def _register_internal_methods(self) -> None: def _register_internal_methods(self) -> None:

View File

@@ -7,14 +7,16 @@
4. 提供统一的能力实现注册接口,使插件可以调用主程序功能 4. 提供统一的能力实现注册接口,使插件可以调用主程序功能
""" """
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, Iterable, List, Optional, Tuple
import asyncio import asyncio
import os import os
from pathlib import Path
from src.chat.message_receive.chat_manager import BotChatSession from src.chat.message_receive.chat_manager import BotChatSession
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config from src.config.config import config_manager, global_config
from src.config.file_watcher import FileChange, FileWatcher
logger = get_logger("plugin_runtime.integration") logger = get_logger("plugin_runtime.integration")
@@ -45,6 +47,9 @@ class PluginRuntimeManager:
self._builtin_supervisor: Optional[PluginSupervisor] = None self._builtin_supervisor: Optional[PluginSupervisor] = None
self._thirdparty_supervisor: Optional[PluginSupervisor] = None self._thirdparty_supervisor: Optional[PluginSupervisor] = None
self._started: bool = False self._started: bool = False
self._config_reload_callback_registered: bool = False
self._plugin_file_watcher: Optional[FileWatcher] = None
self._plugin_file_watcher_subscription_id: Optional[str] = None
# ─── 插件目录 ───────────────────────────────────────────── # ─── 插件目录 ─────────────────────────────────────────────
@@ -112,10 +117,14 @@ class PluginRuntimeManager:
if self._thirdparty_supervisor: if self._thirdparty_supervisor:
await self._thirdparty_supervisor.start() await self._thirdparty_supervisor.start()
started_supervisors.append(self._thirdparty_supervisor) started_supervisors.append(self._thirdparty_supervisor)
self._register_config_reload_callback()
await self._start_plugin_file_watcher()
self._started = True self._started = True
logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or ''}, 第三方: {thirdparty_dirs or ''}") logger.info(f"插件运行时已启动 — 内置: {builtin_dirs or ''}, 第三方: {thirdparty_dirs or ''}")
except Exception as e: except Exception as e:
logger.error(f"插件运行时启动失败: {e}", exc_info=True) logger.error(f"插件运行时启动失败: {e}", exc_info=True)
await self._stop_plugin_file_watcher()
self._unregister_config_reload_callback()
await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True) await asyncio.gather(*(sv.stop() for sv in started_supervisors), return_exceptions=True)
self._started = False self._started = False
self._builtin_supervisor = None self._builtin_supervisor = None
@@ -126,6 +135,9 @@ class PluginRuntimeManager:
if not self._started: if not self._started:
return return
await self._stop_plugin_file_watcher()
self._unregister_config_reload_callback()
coros = [] coros = []
if self._builtin_supervisor: if self._builtin_supervisor:
coros.append(self._builtin_supervisor.stop()) coros.append(self._builtin_supervisor.stop())
@@ -151,6 +163,44 @@ class PluginRuntimeManager:
"""获取所有活跃的 Supervisor""" """获取所有活跃的 Supervisor"""
return [s for s in (self._builtin_supervisor, self._thirdparty_supervisor) if s is not None] return [s for s in (self._builtin_supervisor, self._thirdparty_supervisor) if s is not None]
async def notify_plugin_config_updated(
self,
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
config_version: str = "",
) -> bool:
"""向拥有该插件的 Supervisor 推送配置更新事件。"""
if not self._started:
return False
for sv in self.supervisors:
if plugin_id in sv._registered_plugins:
config_payload = (
config_data
if config_data is not None
else self._load_plugin_config_for_supervisor(plugin_id, getattr(sv, "_plugin_dirs", []))
)
await sv.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=config_payload,
config_version=config_version,
)
return True
return False
async def handle_config_reload(self) -> None:
"""处理主配置热重载后的插件配置通知。"""
if not self._started:
return
tasks = [
self.notify_plugin_config_updated(plugin_id)
for sv in self.supervisors
for plugin_id in list(sv._registered_plugins.keys())
]
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
# ─── 事件桥接 ────────────────────────────────────────────── # ─── 事件桥接 ──────────────────────────────────────────────
async def bridge_event( async def bridge_event(
@@ -228,6 +278,127 @@ class PluginRuntimeManager:
) )
raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册") raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册")
def _register_config_reload_callback(self) -> None:
if self._config_reload_callback_registered:
return
config_manager.register_reload_callback(self.handle_config_reload)
self._config_reload_callback_registered = True
def _unregister_config_reload_callback(self) -> None:
if not self._config_reload_callback_registered:
return
config_manager.unregister_reload_callback(self.handle_config_reload)
self._config_reload_callback_registered = False
async def _start_plugin_file_watcher(self) -> None:
if self._plugin_file_watcher is not None and self._plugin_file_watcher.running:
return
watch_paths = [Path(path).resolve() for path in self._iter_plugin_dirs() if os.path.isdir(path)]
if not watch_paths:
return
watcher = FileWatcher(
paths=watch_paths,
debounce_ms=600,
callback_timeout_s=15.0,
callback_failure_threshold=3,
callback_cooldown_s=30.0,
)
subscription_id = watcher.subscribe(self._handle_plugin_file_changes, paths=watch_paths)
await watcher.start()
self._plugin_file_watcher = watcher
self._plugin_file_watcher_subscription_id = subscription_id
async def _stop_plugin_file_watcher(self) -> None:
if self._plugin_file_watcher is None:
return
if self._plugin_file_watcher_subscription_id is not None:
self._plugin_file_watcher.unsubscribe(self._plugin_file_watcher_subscription_id)
self._plugin_file_watcher_subscription_id = None
await self._plugin_file_watcher.stop()
self._plugin_file_watcher = None
def _iter_plugin_dirs(self) -> Iterable[str]:
for supervisor in self.supervisors:
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
yield plugin_dir
async def _handle_plugin_file_changes(self, changes: List[FileChange]) -> None:
if not self._started or not changes:
return
reload_supervisors: List[Any] = []
config_updates: Dict[str, set[str]] = {}
changed_paths = [change.path.resolve() for change in changes]
for supervisor in self.supervisors:
plugin_ids_for_config = config_updates.setdefault(self._get_supervisor_key(supervisor), set())
for path in changed_paths:
plugin_id = self._match_plugin_id_for_supervisor(supervisor, path)
if plugin_id is None:
continue
if path.name == "config.toml":
plugin_ids_for_config.add(plugin_id)
elif path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py":
if supervisor not in reload_supervisors:
reload_supervisors.append(supervisor)
for supervisor in reload_supervisors:
await supervisor.reload_plugins(reason="file_watcher")
for supervisor in self.supervisors:
if supervisor in reload_supervisors:
continue
for plugin_id in config_updates.get(self._get_supervisor_key(supervisor), set()):
try:
await supervisor.notify_plugin_config_updated(
plugin_id=plugin_id,
config_data=self._load_plugin_config_for_supervisor(
plugin_id, getattr(supervisor, "_plugin_dirs", [])
),
)
except Exception as exc:
logger.warning(f"插件 {plugin_id} 配置热更新通知失败: {exc}")
@staticmethod
def _get_supervisor_key(supervisor: Any) -> str:
return str(id(supervisor))
@staticmethod
def _plugin_dir_matches(path: Path, plugin_dir: str) -> bool:
plugin_root = Path(plugin_dir).resolve()
return path == plugin_root or path.is_relative_to(plugin_root)
def _match_plugin_id_for_supervisor(self, supervisor: Any, path: Path) -> Optional[str]:
for plugin_id, reg in getattr(supervisor, "_registered_plugins", {}).items():
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
candidate_dir = Path(plugin_dir).resolve() / plugin_id
if path == candidate_dir or path.is_relative_to(candidate_dir):
return plugin_id
for plugin_dir in getattr(supervisor, "_plugin_dirs", []):
plugin_root = Path(plugin_dir).resolve()
if self._plugin_dir_matches(path, plugin_dir):
relative_parts = path.relative_to(plugin_root).parts
if relative_parts:
return relative_parts[0]
return None
@staticmethod
def _load_plugin_config_for_supervisor(plugin_id: str, plugin_dirs: Iterable[str]) -> Dict[str, Any]:
import tomlkit
for plugin_dir in plugin_dirs:
plugin_path = Path(plugin_dir).resolve() / plugin_id
if plugin_path.is_dir():
config_path = plugin_path / "config.toml"
if not config_path.exists():
return {}
with open(config_path, "r", encoding="utf-8") as handle:
return dict(tomlkit.load(handle))
return {}
# ─── 能力实现注册 ────────────────────────────────────────── # ─── 能力实现注册 ──────────────────────────────────────────
def _register_capability_impls(self, supervisor: Any) -> None: def _register_capability_impls(self, supervisor: Any) -> None:

View File

@@ -194,9 +194,9 @@ class HealthPayload(BaseModel):
# ─── 配置更新 ────────────────────────────────────────────────────── # ─── 配置更新 ──────────────────────────────────────────────────────
# TODO: Host 侧尚未实现配置变更检测与推送。Runner 端的 _handle_config_updated # Host 侧现已支持配置更新推送:
# 已就绪,但当前无任何调用方通过 RPC 发送 plugin.config_updated 消息 # - 总配置热重载完成后PluginRuntimeManager 会向已加载插件推送配置更新事件
# 需要在 Supervisor 或 CapabilityService 中监听配置文件变化并主动推送 # - 插件目录下的 config.toml 变化由现有 FileWatcher 监听并转发为 plugin.config_updated
class ConfigUpdatedPayload(BaseModel): class ConfigUpdatedPayload(BaseModel):
"""plugin.config_updated 事件 payload""" """plugin.config_updated 事件 payload"""