From 8f7f31a16446e4514e3a46b6d9bae855c8aa612d Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Fri, 13 Mar 2026 00:47:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E9=80=BB=E8=BE=91=EF=BC=8C=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E5=8C=B9=E9=85=8D=E5=92=8C=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E5=A2=9E=E5=BC=BA=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/message_receive/bot.py | 20 ++++----- src/plugin_runtime/integration.py | 20 +++++++++ src/plugin_runtime/protocol/envelope.py | 3 ++ src/plugin_runtime/runner/runner_main.py | 56 +++++++++++++++++++++++- 4 files changed, 88 insertions(+), 11 deletions(-) diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index eec9b3c0..e7a28858 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -134,21 +134,21 @@ class ChatBot: from src.plugin_runtime.integration import get_plugin_runtime_manager prm = get_plugin_runtime_manager() - if not prm.is_running or prm.supervisor is None: + if not prm.is_running: return None - matched = prm.supervisor.component_registry.find_command_by_text(message.processed_plain_text) + matched = prm.find_command_by_text(message.processed_plain_text) if matched is None: return None message.is_command = True - logger.info(f"[新运行时] 匹配命令: {matched.full_name}") + logger.info(f"[新运行时] 匹配命令: {matched['full_name']}") try: - resp = await prm.supervisor.invoke_plugin( + resp = await prm.invoke_plugin( method="plugin.invoke_command", - plugin_id=matched.plugin_id, - component_name=matched.name, + plugin_id=matched["plugin_id"], + component_name=matched["name"], args={ "text": message.processed_plain_text, "stream_id": message.session_id or "", @@ -159,17 +159,17 @@ class ChatBot: payload = resp.payload success = payload.get("success", False) result = payload.get("result", "") - intercept = bool(matched.metadata.get("intercept_message_level", 0)) + intercept = bool(matched["metadata"].get("intercept_message_level", 0)) if success: - logger.info(f"[新运行时] 命令执行成功: {matched.full_name}") + logger.info(f"[新运行时] 命令执行成功: {matched['full_name']}") else: - logger.warning(f"[新运行时] 命令执行失败: {matched.full_name} - {result}") + logger.warning(f"[新运行时] 命令执行失败: {matched['full_name']} - {result}") return True, result, not intercept except Exception as e: - logger.error(f"[新运行时] 执行命令 {matched.full_name} 异常: {e}", exc_info=True) + logger.error(f"[新运行时] 执行命令 {matched['full_name']} 异常: {e}", exc_info=True) return True, str(e), True async def handle_notice_message(self, message: SessionMessage): diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index b306acb7..2f6afc4a 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -207,6 +207,26 @@ class PluginRuntimeManager: } return None + async def invoke_plugin( + self, + method: str, + plugin_id: str, + component_name: str, + args: Optional[Dict[str, Any]] = None, + timeout_ms: int = 30000, + ) -> Any: + """将插件调用路由到拥有该插件的 Supervisor""" + for sv in self.supervisors: + if sv.component_registry.get_components_by_plugin(plugin_id): + return await sv.invoke_plugin( + method=method, + plugin_id=plugin_id, + component_name=component_name, + args=args, + timeout_ms=timeout_ms, + ) + raise RuntimeError(f"插件 {plugin_id} 未在任何 Supervisor 中注册") + # ─── 能力实现注册 ────────────────────────────────────────── def _register_capability_impls(self, supervisor: Any) -> None: diff --git a/src/plugin_runtime/protocol/envelope.py b/src/plugin_runtime/protocol/envelope.py index 40d720f6..41fcf764 100644 --- a/src/plugin_runtime/protocol/envelope.py +++ b/src/plugin_runtime/protocol/envelope.py @@ -172,6 +172,9 @@ class HealthPayload(BaseModel): # ─── 配置更新 ────────────────────────────────────────────────────── +# TODO: Host 侧尚未实现配置变更检测与推送。Runner 端的 _handle_config_updated +# 已就绪,但当前无任何调用方通过 RPC 发送 plugin.config_updated 消息。 +# 需要在 Supervisor 或 CapabilityService 中监听配置文件变化并主动推送。 class ConfigUpdatedPayload(BaseModel): """plugin.config_updated 事件 payload""" plugin_id: str = Field(description="插件 ID") diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index 41d91a1f..14fc5ba9 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -177,7 +177,7 @@ class PluginRunner: self._rpc_client.register_method("plugin.invoke_command", self._handle_invoke) self._rpc_client.register_method("plugin.invoke_action", self._handle_invoke) self._rpc_client.register_method("plugin.invoke_tool", self._handle_invoke) - self._rpc_client.register_method("plugin.emit_event", self._handle_invoke) + self._rpc_client.register_method("plugin.emit_event", self._handle_event_invoke) self._rpc_client.register_method("plugin.invoke_workflow_step", self._handle_workflow_step) self._rpc_client.register_method("plugin.health", self._handle_health) self._rpc_client.register_method("plugin.prepare_shutdown", self._handle_prepare_shutdown) @@ -270,6 +270,60 @@ class PluginRunner: resp_payload = InvokeResultPayload(success=False, result=str(e)) return envelope.make_response(payload=resp_payload.model_dump()) + async def _handle_event_invoke(self, envelope: Envelope) -> Envelope: + """处理 EventHandler 调用请求 + + 与通用 invoke 不同,会将返回值规范化为 + {success, continue_processing, modified_message, custom_result} 格式, + 使 EventDispatcher 可直接从 payload 顶层读取这些字段。 + """ + try: + invoke = InvokePayload.model_validate(envelope.payload) + except Exception as e: + return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(e)) + + plugin_id = envelope.plugin_id + meta = self._loader.get_plugin(plugin_id) + if meta is None: + return envelope.make_error_response( + ErrorCode.E_PLUGIN_NOT_FOUND.value, + f"插件 {plugin_id} 未加载", + ) + + instance = meta.instance + component_name = invoke.component_name + handler_method = getattr(instance, f"handle_{component_name}", None) + if handler_method is None: + handler_method = getattr(instance, component_name, None) + + if handler_method is None or not callable(handler_method): + return envelope.make_error_response( + ErrorCode.E_METHOD_NOT_ALLOWED.value, + f"插件 {plugin_id} 无组件: {component_name}", + ) + + try: + raw = await handler_method(**invoke.args) if inspect.iscoroutinefunction(handler_method) else handler_method(**invoke.args) + + # 规范化返回值:将 EventHandler 返回展平到 payload 顶层 + if raw is None: + result = {"success": True, "continue_processing": True} + elif isinstance(raw, dict): + result = { + "success": True, + # 兼容 guide.md 中文档的 {"blocked": True} 写法 + "continue_processing": not raw.get("blocked", False) if "blocked" in raw else raw.get("continue_processing", True), + "modified_message": raw.get("modified_message"), + "custom_result": raw.get("custom_result"), + } + else: + result = {"success": True, "continue_processing": True, "custom_result": raw} + + return envelope.make_response(payload=result) + except Exception as e: + logger.error(f"插件 {plugin_id} event_handler {component_name} 执行异常: {e}", exc_info=True) + return envelope.make_response(payload={"success": False, "continue_processing": True}) + async def _handle_workflow_step(self, envelope: Envelope) -> Envelope: """处理 WorkflowStep 调用请求