diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index b2489f81..f79ca12d 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -152,21 +152,31 @@ class ChatBot: args={ "text": message.processed_plain_text, "stream_id": message.session_id or "", + "matched_groups": matched.get("matched_groups") or {}, }, timeout_ms=30000, ) payload = resp.payload success = payload.get("success", False) - result = payload.get("result", "") - intercept = bool(matched["metadata"].get("intercept_message_level", 0)) + cmd_result = payload.get("result") + + # 拦截位优先从命令返回值中获取(支持运行时动态决定), + # 回退到组件 metadata 中的静态声明 + if isinstance(cmd_result, (list, tuple)) and len(cmd_result) >= 3: + # 命令返回 (found, response_text, intercept_bool) 三元组 + response_text = cmd_result[1] if cmd_result[1] is not None else "" + intercept = bool(cmd_result[2]) + else: + response_text = cmd_result if cmd_result is not None else "" + intercept = bool(matched["metadata"].get("intercept_message_level", 0)) if success: logger.info(f"[新运行时] 命令执行成功: {matched['full_name']}") else: - logger.warning(f"[新运行时] 命令执行失败: {matched['full_name']} - {result}") + logger.warning(f"[新运行时] 命令执行失败: {matched['full_name']} - {response_text}") - return True, result, not intercept + return True, response_text, not intercept except Exception as e: logger.error(f"[新运行时] 执行命令 {matched['full_name']} 异常: {e}", exc_info=True) diff --git a/src/plugin_runtime/host/component_registry.py b/src/plugin_runtime/host/component_registry.py index 16e307a3..d693883f 100644 --- a/src/plugin_runtime/host/component_registry.py +++ b/src/plugin_runtime/host/component_registry.py @@ -181,18 +181,23 @@ class ComponentRegistry: comps = self._by_plugin.get(plugin_id, []) return [c for c in comps if c.enabled] if enabled_only else list(comps) - def find_command_by_text(self, text: str) -> Optional[RegisteredComponent]: - """通过文本匹配命令正则,返回第一个匹配的 command 组件。""" + def find_command_by_text(self, text: str) -> Optional[tuple[RegisteredComponent, Dict[str, Any]]]: + """通过文本匹配命令正则,返回 (组件, matched_groups) 元组。 + + matched_groups 为正则命名捕获组 dict,别名匹配时为空 dict。 + """ for comp in self._by_type.get("command", {}).values(): if not comp.enabled: continue - if comp._compiled_pattern and comp._compiled_pattern.search(text): - return comp + if comp._compiled_pattern: + m = comp._compiled_pattern.search(text) + if m: + return comp, m.groupdict() # 别名匹配 aliases = comp.metadata.get("aliases", []) for alias in aliases: if text.startswith(alias): - return comp + return comp, {} return None def get_event_handlers( diff --git a/src/plugin_runtime/host/rpc_server.py b/src/plugin_runtime/host/rpc_server.py index c3639c64..687f35f6 100644 --- a/src/plugin_runtime/host/rpc_server.py +++ b/src/plugin_runtime/host/rpc_server.py @@ -225,6 +225,19 @@ class RPCServer: if old_connection and old_connection is not conn and not old_connection.is_closed: logger.info("检测到新 Runner 已接管连接,关闭旧连接") + # 新连接接管后,旧 Runner 的 in-flight 请求不会再收到响应 + # (过期 generation 响应会被 _handle_response 丢弃), + # 在此处立即 fail-fast 所有 pending 请求,避免挂到超时 + stale_count = 0 + for _req_id, future in list(self._pending_requests.items()): + if not future.done(): + future.set_exception( + RPCError(ErrorCode.E_PLUGIN_CRASHED, "Runner 连接已被新 generation 接管") + ) + stale_count += 1 + self._pending_requests.clear() + if stale_count: + logger.info(f"已清理 {stale_count} 个旧 Runner 的 pending 请求") await old_connection.close() # 启动消息接收循环 @@ -237,7 +250,7 @@ class RPCServer: self._connection = None self._runner_id = None # 连接断开时,立即让所有等待中的请求失败,避免挂起至超时 - for req_id, future in list(self._pending_requests.items()): + for _req_id, future in list(self._pending_requests.items()): if not future.done(): future.set_exception(RPCError(ErrorCode.E_PLUGIN_CRASHED, "Runner 连接已断开")) self._pending_requests.clear() diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 2f6afc4a..fd29fa06 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -195,15 +195,17 @@ class PluginRuntimeManager: return None for sv in self.supervisors: - result = sv.component_registry.find_command_by_text(text) - if result is not None: + match_result = sv.component_registry.find_command_by_text(text) + if match_result is not None: + comp, matched_groups = match_result return { - "name": result.name, - "full_name": result.full_name, - "component_type": result.component_type, - "plugin_id": result.plugin_id, - "metadata": result.metadata, - "enabled": result.enabled, + "name": comp.name, + "full_name": comp.full_name, + "component_type": comp.component_type, + "plugin_id": comp.plugin_id, + "metadata": comp.metadata, + "enabled": comp.enabled, + "matched_groups": matched_groups, } return None @@ -1399,17 +1401,30 @@ class PluginRuntimeManager: @staticmethod async def _cap_component_get_all_plugins(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: - """获取所有插件信息(汇总所有 Supervisor 的注册信息)""" + """获取所有插件信息(汇总所有 Supervisor 的注册信息,包含组件列表)""" mgr = get_plugin_runtime_manager() result: Dict[str, Any] = {} for sv in mgr.supervisors: for pid, reg in sv._registered_plugins.items(): + # 从 ComponentRegistry 中获取该插件的所有组件 + comps = sv.component_registry.get_components_by_plugin(pid, enabled_only=False) + components_list = [ + { + "name": c.name, + "full_name": c.full_name, + "type": c.component_type, + "enabled": c.enabled, + "metadata": c.metadata, + } + for c in comps + ] result[pid] = { "name": pid, "version": reg.plugin_version, "description": "", "author": "", "enabled": True, + "components": components_list, } return {"success": True, "plugins": result} @@ -1458,58 +1473,91 @@ class PluginRuntimeManager: async def _cap_component_enable(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: """启用组件 - args: name, component_type + args: name, component_type, scope, stream_id """ name: str = args.get("name", "") component_type: str = args.get("component_type", "") if not name or not component_type: return {"success": False, "error": "缺少必要参数 name 或 component_type"} + # TODO: scope 和 stream_id 参数尚未实现,当前均为全局启用 mgr = get_plugin_runtime_manager() for sv in mgr.supervisors: + # 先尝试按全名查找(plugin_id.component_name) comp = sv.component_registry.get_component(name) - if comp is not None: + if comp is not None and comp.component_type == component_type: comp.enabled = True return {"success": True} - return {"success": False, "error": f"未找到组件: {name}"} + # 回退:按短名 + 类型在该类型索引中搜索 + for c in sv.component_registry.get_components_by_type(component_type, enabled_only=False): + if c.name == name: + c.enabled = True + return {"success": True} + return {"success": False, "error": f"未找到组件: {name} ({component_type})"} @staticmethod async def _cap_component_disable(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: """禁用组件 - args: name, component_type + args: name, component_type, scope, stream_id """ name: str = args.get("name", "") component_type: str = args.get("component_type", "") if not name or not component_type: return {"success": False, "error": "缺少必要参数 name 或 component_type"} + # TODO: scope 和 stream_id 参数尚未实现,当前均为全局禁用 mgr = get_plugin_runtime_manager() for sv in mgr.supervisors: comp = sv.component_registry.get_component(name) - if comp is not None: + if comp is not None and comp.component_type == component_type: comp.enabled = False return {"success": True} - return {"success": False, "error": f"未找到组件: {name}"} + for c in sv.component_registry.get_components_by_type(component_type, enabled_only=False): + if c.name == name: + c.enabled = False + return {"success": True} + return {"success": False, "error": f"未找到组件: {name} ({component_type})"} @staticmethod async def _cap_component_load_plugin(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: """加载插件(在新运行时中通过热重载实现) + 先验证目标插件是否已注册或插件目录是否存在于某个 Supervisor, + 然后只对拥有该插件的 Supervisor 执行热重载。 + args: plugin_name """ plugin_name: str = args.get("plugin_name", "") if not plugin_name: return {"success": False, "error": "缺少必要参数 plugin_name"} + import os + mgr = get_plugin_runtime_manager() + + # 优先查找已注册该插件的 Supervisor for sv in mgr.supervisors: - try: - await sv.reload_plugins(reason=f"load {plugin_name}") - return {"success": True, "count": 1} - except Exception as e: - logger.error(f"[cap.component.load_plugin] 热重载失败: {e}") - return {"success": False, "error": f"无法加载插件: {plugin_name}"} + if plugin_name in sv._registered_plugins: + try: + await sv.reload_plugins(reason=f"load {plugin_name}") + return {"success": True, "count": 1} + except Exception as e: + logger.error(f"[cap.component.load_plugin] 热重载失败: {e}") + return {"success": False, "error": str(e)} + + # 插件尚未注册,检查是否有 Supervisor 的 plugin_dirs 下包含该插件目录 + for sv in mgr.supervisors: + for pdir in sv._plugin_dirs: + if os.path.isdir(os.path.join(pdir, plugin_name)): + try: + await sv.reload_plugins(reason=f"load {plugin_name}") + return {"success": True, "count": 1} + except Exception as e: + logger.error(f"[cap.component.load_plugin] 热重载失败: {e}") + return {"success": False, "error": str(e)} + + return {"success": False, "error": f"未找到插件: {plugin_name}"} @staticmethod async def _cap_component_unload_plugin(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: