"""插件运行时统一组件查询服务。 该模块统一从插件运行时的 Host ComponentRegistry 中聚合只读视图, 供 HFC、ToolExecutor 和运行时能力层查询与调用。 """ from __future__ import annotations from copy import deepcopy from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast from src.common.logger import get_logger from src.core.tooling import ( ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec, build_tool_detailed_description, ) from src.core.types import ActionActivationType, ActionInfo, CommandInfo, ComponentInfo, ComponentType, ToolInfo from src.llm_models.payload_content.tool_option import normalize_tool_option if TYPE_CHECKING: from src.plugin_runtime.host.component_registry import ActionEntry, CommandEntry, ComponentEntry, ToolEntry from src.plugin_runtime.host.supervisor import PluginSupervisor from src.plugin_runtime.integration import PluginRuntimeManager logger = get_logger("plugin_runtime.component_query") ActionExecutor = Callable[..., Awaitable[Any]] CommandExecutor = Callable[..., Awaitable[Tuple[bool, Optional[str], bool]]] ToolExecutor = Callable[..., Awaitable[Any]] _HOST_COMPONENT_TYPE_MAP: Dict[ComponentType, str] = { ComponentType.ACTION: "ACTION", ComponentType.COMMAND: "COMMAND", ComponentType.TOOL: "TOOL", } class ComponentQueryService: """插件运行时统一组件查询服务。 该对象不维护独立状态,只读取插件系统中的注册结果。 所有注册、删除、配置写入等写操作都被显式禁用。 """ @staticmethod def _get_runtime_manager() -> "PluginRuntimeManager": """获取插件运行时管理器单例。 Returns: PluginRuntimeManager: 当前全局插件运行时管理器。 """ from src.plugin_runtime.integration import get_plugin_runtime_manager return get_plugin_runtime_manager() def _iter_supervisors(self) -> list["PluginSupervisor"]: """获取当前所有活跃的插件运行时监督器。 Returns: list[PluginSupervisor]: 当前运行中的监督器列表。 """ runtime_manager = self._get_runtime_manager() return list(runtime_manager.supervisors) def _iter_component_entries( self, component_type: ComponentType, *, enabled_only: bool = True, ) -> list[tuple["PluginSupervisor", "ComponentEntry"]]: """遍历指定类型的全部组件条目。 Args: component_type: 目标组件类型。 enabled_only: 是否仅返回启用状态的组件。 Returns: list[tuple[PluginSupervisor, ComponentEntry]]: ``(监督器, 组件条目)`` 列表。 """ host_component_type = _HOST_COMPONENT_TYPE_MAP.get(component_type) if host_component_type is None: return [] collected_entries: list[tuple["PluginSupervisor", "ComponentEntry"]] = [] for supervisor in self._iter_supervisors(): for component in supervisor.component_registry.get_components_by_type( host_component_type, enabled_only=enabled_only, ): collected_entries.append((supervisor, component)) return collected_entries @staticmethod def _coerce_action_activation_type(raw_value: Any) -> ActionActivationType: """规范化动作激活类型。 Args: raw_value: 原始激活类型值。 Returns: ActionActivationType: 规范化后的激活类型枚举。 """ normalized_value = str(raw_value or "").strip().lower() if normalized_value == ActionActivationType.NEVER.value: return ActionActivationType.NEVER if normalized_value == ActionActivationType.RANDOM.value: return ActionActivationType.RANDOM if normalized_value == ActionActivationType.KEYWORD.value: return ActionActivationType.KEYWORD return ActionActivationType.ALWAYS @staticmethod def _coerce_float(value: Any, default: float = 0.0) -> float: """将任意值安全转换为浮点数。 Args: value: 待转换的输入值。 default: 转换失败时返回的默认值。 Returns: float: 转换后的浮点结果。 """ try: return float(value) except (TypeError, ValueError): return default @staticmethod def _build_action_info(entry: "ActionEntry") -> ActionInfo: """将运行时 Action 条目转换为核心动作信息。 Args: entry: 插件运行时中的 Action 条目。 Returns: ActionInfo: 供核心 Planner 使用的动作信息。 """ metadata = dict(entry.metadata) raw_action_parameters = metadata.get("action_parameters") action_parameters = ( {str(param_name): str(param_description) for param_name, param_description in raw_action_parameters.items()} if isinstance(raw_action_parameters, dict) else {} ) action_require = [ str(item) for item in (metadata.get("action_require") or []) if item is not None and str(item).strip() ] associated_types = [ str(item) for item in (metadata.get("associated_types") or []) if item is not None and str(item).strip() ] activation_keywords = [ str(item) for item in (metadata.get("activation_keywords") or []) if item is not None and str(item).strip() ] return ActionInfo( name=entry.name, description=str(metadata.get("description", "") or ""), enabled=bool(entry.enabled), plugin_name=entry.plugin_id, action_parameters=action_parameters, action_require=action_require, associated_types=associated_types, activation_type=ComponentQueryService._coerce_action_activation_type(metadata.get("activation_type")), random_activation_probability=ComponentQueryService._coerce_float( metadata.get("activation_probability"), 0.0, ), activation_keywords=activation_keywords, parallel_action=bool(metadata.get("parallel_action", False)), ) @staticmethod def _build_command_info(entry: "CommandEntry") -> CommandInfo: """将运行时 Command 条目转换为核心命令信息。 Args: entry: 插件运行时中的 Command 条目。 Returns: CommandInfo: 供核心命令链使用的命令信息。 """ metadata = dict(entry.metadata) return CommandInfo( name=entry.name, description=str(metadata.get("description", "") or ""), enabled=bool(entry.enabled), plugin_name=entry.plugin_id, ) @staticmethod def _build_tool_definition(entry: "ToolEntry") -> dict[str, Any]: """将运行时 Tool 条目转换为原始工具定义字典。 Args: entry: 插件运行时中的 Tool 条目。 Returns: dict[str, Any]: 可交给 `normalize_tool_option()` 的原始工具定义。 """ raw_definition: dict[str, Any] = { "name": entry.name, "description": entry.description, } if isinstance(entry.parameters_raw, dict) and entry.parameters_raw: raw_definition["parameters_schema"] = entry.parameters_raw return raw_definition if isinstance(entry.parameters, list) and entry.parameters: raw_definition["parameters"] = entry.parameters return raw_definition if isinstance(entry.parameters_raw, list) and entry.parameters_raw: raw_definition["parameters"] = entry.parameters_raw return raw_definition return raw_definition @staticmethod def _build_tool_parameters_schema(entry: "ToolEntry") -> dict[str, Any] | None: """将运行时 Tool 条目转换为对象级参数 Schema。 Args: entry: 插件运行时中的 Tool 条目。 Returns: dict[str, Any] | None: 规范化后的对象级参数 Schema。 """ normalized_option = normalize_tool_option(ComponentQueryService._build_tool_definition(entry)) return normalized_option.parameters_schema @staticmethod def _build_tool_info(entry: "ToolEntry") -> ToolInfo: """将运行时 Tool 条目转换为核心工具信息。 Args: entry: 插件运行时中的 Tool 条目。 Returns: ToolInfo: 供 ToolExecutor 与能力层使用的工具信息。 """ return ToolInfo( name=entry.name, description=entry.brief_description or entry.description, enabled=bool(entry.enabled), plugin_name=entry.plugin_id, parameters_schema=ComponentQueryService._build_tool_parameters_schema(entry), ) @staticmethod def _build_tool_spec(entry: "ToolEntry") -> ToolSpec: """将运行时 Tool 条目转换为统一工具声明。 Args: entry: 插件运行时中的 Tool 条目。 Returns: ToolSpec: 统一工具声明。 """ parameters_schema = ComponentQueryService._build_tool_parameters_schema(entry) return ToolSpec( name=entry.name, brief_description=entry.brief_description or entry.description or f"工具 {entry.name}", detailed_description=entry.detailed_description or build_tool_detailed_description(parameters_schema), parameters_schema=parameters_schema, provider_name=entry.plugin_id, provider_type="plugin", metadata={ "plugin_id": entry.plugin_id, "invoke_method": entry.invoke_method, "legacy_component_type": entry.legacy_component_type, }, ) @staticmethod def _log_duplicate_component(component_type: ComponentType, component_name: str) -> None: """记录重复组件名称冲突。 Args: component_type: 组件类型。 component_name: 发生冲突的组件名称。 """ logger.warning(f"检测到重复{component_type.value}名称 {component_name},将只保留首个匹配项") def _get_unique_component_entry( self, component_type: ComponentType, name: str, ) -> Optional[tuple["PluginSupervisor", "ComponentEntry"]]: """按组件短名解析唯一条目。 Args: component_type: 目标组件类型。 name: 组件短名。 Returns: Optional[tuple[PluginSupervisor, ComponentEntry]]: 唯一命中的组件条目。 """ matched_entries = [ (supervisor, entry) for supervisor, entry in self._iter_component_entries(component_type) if entry.name == name ] if not matched_entries: return None if len(matched_entries) > 1: self._log_duplicate_component(component_type, name) return matched_entries[0] def _collect_unique_component_infos( self, component_type: ComponentType, ) -> Dict[str, ComponentInfo]: """收集某类组件的唯一信息视图。 Args: component_type: 目标组件类型。 Returns: Dict[str, ComponentInfo]: 组件名到核心组件信息的映射。 """ collected_components: Dict[str, ComponentInfo] = {} for _supervisor, entry in self._iter_component_entries(component_type): if entry.name in collected_components: self._log_duplicate_component(component_type, entry.name) continue if component_type == ComponentType.ACTION: collected_components[entry.name] = self._build_action_info(entry) # type: ignore[arg-type] elif component_type == ComponentType.COMMAND: collected_components[entry.name] = self._build_command_info(entry) # type: ignore[arg-type] elif component_type == ComponentType.TOOL: collected_components[entry.name] = self._build_tool_info(entry) # type: ignore[arg-type] return collected_components @staticmethod def _extract_stream_id_from_action_kwargs(kwargs: Dict[str, Any]) -> str: """从旧 ActionManager 参数中提取聊天流 ID。 Args: kwargs: 旧动作执行器收到的关键字参数。 Returns: str: 提取出的 ``stream_id``。 """ chat_stream = kwargs.get("chat_stream") if chat_stream is not None: try: return str(chat_stream.session_id) except AttributeError: pass return str(kwargs.get("stream_id", "") or "") @staticmethod def _build_action_executor(supervisor: "PluginSupervisor", plugin_id: str, component_name: str) -> ActionExecutor: """构造动作执行 RPC 闭包。 Args: supervisor: 负责该组件的监督器。 plugin_id: 插件 ID。 component_name: 组件名称。 Returns: ActionExecutor: 兼容旧 Planner 的异步执行器。 """ async def _executor(**kwargs: Any) -> tuple[bool, str]: """将核心动作调用桥接到插件运行时。 Args: **kwargs: 旧 ActionManager 传入的上下文参数。 Returns: tuple[bool, str]: ``(是否成功, 结果说明)``。 """ invoke_args: Dict[str, Any] = {} action_data = kwargs.get("action_data") if isinstance(action_data, dict): invoke_args.update(action_data) stream_id = ComponentQueryService._extract_stream_id_from_action_kwargs(kwargs) invoke_args["action_data"] = action_data if isinstance(action_data, dict) else {} invoke_args["stream_id"] = stream_id invoke_args["chat_id"] = stream_id invoke_args["reasoning"] = str(kwargs.get("action_reasoning", "") or "") if (thinking_id := kwargs.get("thinking_id")) is not None: invoke_args["thinking_id"] = str(thinking_id) if isinstance(kwargs.get("cycle_timers"), dict): invoke_args["cycle_timers"] = kwargs["cycle_timers"] if isinstance(kwargs.get("plugin_config"), dict): invoke_args["plugin_config"] = kwargs["plugin_config"] if isinstance(kwargs.get("log_prefix"), str): invoke_args["log_prefix"] = kwargs["log_prefix"] if isinstance(kwargs.get("shutting_down"), bool): invoke_args["shutting_down"] = kwargs["shutting_down"] try: response = await supervisor.invoke_plugin( method="plugin.invoke_action", plugin_id=plugin_id, component_name=component_name, args=invoke_args, timeout_ms=30000, ) except Exception as exc: logger.error(f"运行时 Action {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) return False, str(exc) payload = response.payload if isinstance(response.payload, dict) else {} success = bool(payload.get("success", False)) result = payload.get("result") if isinstance(result, (list, tuple)): if len(result) >= 2: return bool(result[0]), "" if result[1] is None else str(result[1]) if len(result) == 1: return bool(result[0]), "" if success: return True, "" if result is None else str(result) return False, "" if result is None else str(result) return _executor @staticmethod def _build_command_executor( supervisor: "PluginSupervisor", plugin_id: str, component_name: str, metadata: Dict[str, Any], ) -> CommandExecutor: """构造命令执行 RPC 闭包。 Args: supervisor: 负责该组件的监督器。 plugin_id: 插件 ID。 component_name: 组件名称。 metadata: 命令组件元数据。 Returns: CommandExecutor: 兼容旧消息命令链的执行器。 """ async def _executor(**kwargs: Any) -> tuple[bool, Optional[str], bool]: """将核心命令调用桥接到插件运行时。 Args: **kwargs: 命令执行上下文参数。 Returns: tuple[bool, Optional[str], bool]: ``(是否成功, 返回文本, 是否拦截后续消息)``。 """ message = kwargs.get("message") matched_groups = kwargs.get("matched_groups") plugin_config = kwargs.get("plugin_config") message_info = getattr(message, "message_info", None) group_info = getattr(message_info, "group_info", None) user_info = getattr(message_info, "user_info", None) invoke_args: Dict[str, Any] = { "text": str(getattr(message, "processed_plain_text", "") or ""), "stream_id": str(getattr(message, "session_id", "") or ""), "group_id": str(getattr(group_info, "group_id", "") or ""), "user_id": str(getattr(user_info, "user_id", "") or ""), "matched_groups": matched_groups if isinstance(matched_groups, dict) else {}, } if isinstance(plugin_config, dict): invoke_args["plugin_config"] = plugin_config try: response = await supervisor.invoke_plugin( method="plugin.invoke_command", plugin_id=plugin_id, component_name=component_name, args=invoke_args, timeout_ms=30000, ) except Exception as exc: logger.error(f"运行时 Command {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) return False, str(exc), True payload = response.payload if isinstance(response.payload, dict) else {} success = bool(payload.get("success", False)) result = payload.get("result") intercept = bool(metadata.get("intercept_message_level", 0)) response_text: Optional[str] if isinstance(result, (list, tuple)) and len(result) >= 3: response_text = None if result[1] is None else str(result[1]) intercept = bool(result[2]) else: response_text = None if result is None else str(result) return success, response_text, intercept return _executor @staticmethod def _build_tool_executor( supervisor: "PluginSupervisor", plugin_id: str, component_name: str, invoke_method: str = "plugin.invoke_tool", ) -> ToolExecutor: """构造工具执行 RPC 闭包。 Args: supervisor: 负责该组件的监督器。 plugin_id: 插件 ID。 component_name: 组件名称。 Returns: ToolExecutor: 兼容旧 ToolExecutor 的异步执行器。 """ async def _executor(function_args: Dict[str, Any]) -> Any: """将核心工具调用桥接到插件运行时。 Args: function_args: 工具调用参数。 Returns: Any: 插件工具返回结果;若结果不是字典,则会包装为 ``{"content": ...}``。 """ try: response = await supervisor.invoke_plugin( method=invoke_method, plugin_id=plugin_id, component_name=component_name, args=function_args, timeout_ms=30000, ) except Exception as exc: logger.error(f"运行时 Tool {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) return {"content": f"工具 {component_name} 执行失败: {exc}"} payload = response.payload if isinstance(response.payload, dict) else {} result = payload.get("result") if isinstance(result, dict): return result return {"content": "" if result is None else str(result)} return _executor def get_action_info(self, name: str) -> Optional[ActionInfo]: """获取指定动作的信息。 Args: name: 动作名称。 Returns: Optional[ActionInfo]: 匹配到的动作信息。 """ matched_entry = self._get_unique_component_entry(ComponentType.ACTION, name) if matched_entry is None: return None _supervisor, entry = matched_entry return self._build_action_info(entry) # type: ignore[arg-type] def get_action_executor(self, name: str) -> Optional[ActionExecutor]: """获取指定动作的执行器。 Args: name: 动作名称。 Returns: Optional[ActionExecutor]: 运行时 RPC 执行闭包。 """ matched_entry = self._get_unique_component_entry(ComponentType.ACTION, name) if matched_entry is None: return None supervisor, entry = matched_entry return self._build_action_executor(supervisor, entry.plugin_id, entry.name) def get_default_actions(self) -> Dict[str, ActionInfo]: """获取当前默认启用的动作集合。 Returns: Dict[str, ActionInfo]: 动作名到动作信息的映射。 """ action_infos = self._collect_unique_component_infos(ComponentType.ACTION) return {name: info for name, info in action_infos.items() if isinstance(info, ActionInfo) and info.enabled} def find_command_by_text(self, text: str) -> Optional[Tuple[CommandExecutor, dict, CommandInfo]]: """根据文本查找匹配的命令。 Args: text: 待匹配的文本内容。 Returns: Optional[Tuple[CommandExecutor, dict, CommandInfo]]: 匹配结果。 """ for supervisor in self._iter_supervisors(): match_result = supervisor.component_registry.find_command_by_text(text) if match_result is None: continue entry, matched_groups = match_result command_info = self._build_command_info(entry) # type: ignore[arg-type] command_executor = self._build_command_executor( supervisor, entry.plugin_id, entry.name, dict(entry.metadata), ) return command_executor, matched_groups, command_info return None def get_tool_info(self, name: str) -> Optional[ToolInfo]: """获取指定工具的信息。 Args: name: 工具名称。 Returns: Optional[ToolInfo]: 匹配到的工具信息。 """ matched_entry = self._get_unique_component_entry(ComponentType.TOOL, name) if matched_entry is None: return None _supervisor, entry = matched_entry return self._build_tool_info(entry) # type: ignore[arg-type] def get_tool_executor(self, name: str) -> Optional[ToolExecutor]: """获取指定工具的执行器。 Args: name: 工具名称。 Returns: Optional[ToolExecutor]: 运行时 RPC 执行闭包。 """ matched_entry = self._get_unique_component_entry(ComponentType.TOOL, name) if matched_entry is None: return None supervisor, entry = matched_entry tool_entry = cast("ToolEntry", entry) return self._build_tool_executor(supervisor, tool_entry.plugin_id, tool_entry.name, tool_entry.invoke_method) def get_llm_available_tool_specs(self) -> Dict[str, ToolSpec]: """获取当前可供 LLM 使用的统一工具声明集合。 Returns: Dict[str, ToolSpec]: 工具名到工具声明的映射。 """ collected_specs: Dict[str, ToolSpec] = {} for _supervisor, entry in self._iter_component_entries(ComponentType.TOOL): if entry.name in collected_specs: self._log_duplicate_component(ComponentType.TOOL, entry.name) continue collected_specs[entry.name] = self._build_tool_spec(entry) # type: ignore[arg-type] return collected_specs @staticmethod def _build_tool_invocation_payload( entry: "ToolEntry", invocation: ToolInvocation, context: Optional[ToolExecutionContext], ) -> Dict[str, Any]: """构造插件工具执行时发送给 Runner 的参数。 Args: entry: 目标工具条目。 invocation: 统一工具调用请求。 context: 统一工具执行上下文。 Returns: Dict[str, Any]: 发往 Runner 的参数字典。 """ payload = dict(invocation.arguments) if entry.invoke_method == "plugin.invoke_action": stream_id = context.stream_id if context is not None else invocation.stream_id reasoning = context.reasoning if context is not None else invocation.reasoning payload = { **payload, "stream_id": stream_id, "chat_id": stream_id, "reasoning": reasoning, "action_data": dict(invocation.arguments), } return payload @staticmethod def _parse_tool_invoke_result( entry: "ToolEntry", result: Any, ) -> ToolExecutionResult: """将插件组件返回值转换为统一工具执行结果。 Args: entry: 目标工具条目。 result: 插件组件原始返回值。 Returns: ToolExecutionResult: 统一执行结果。 """ if isinstance(result, dict): success = bool(result.get("success", True)) content = str(result.get("content", result.get("message", "")) or "").strip() error_message = "" if not success: error_message = str(result.get("error", result.get("message", "插件工具执行失败")) or "").strip() return ToolExecutionResult( tool_name=entry.name, success=success, content=content, error_message=error_message, structured_content=result, metadata={"plugin_id": entry.plugin_id}, ) if isinstance(result, (list, tuple)) and result: if isinstance(result[0], bool): success = bool(result[0]) message = "" if len(result) < 2 or result[1] is None else str(result[1]).strip() return ToolExecutionResult( tool_name=entry.name, success=success, content=message if success else "", error_message="" if success else message, structured_content=list(result), metadata={"plugin_id": entry.plugin_id}, ) normalized_content = "" if result is None else str(result).strip() return ToolExecutionResult( tool_name=entry.name, success=True, content=normalized_content, structured_content=result, metadata={"plugin_id": entry.plugin_id}, ) async def invoke_tool_as_tool( self, invocation: ToolInvocation, context: Optional[ToolExecutionContext] = None, ) -> ToolExecutionResult: """按统一工具语义执行插件工具。 Args: invocation: 统一工具调用请求。 context: 执行上下文。 Returns: ToolExecutionResult: 统一工具执行结果。 """ matched_entry = self._get_unique_component_entry(ComponentType.TOOL, invocation.tool_name) if matched_entry is None: return ToolExecutionResult( tool_name=invocation.tool_name, success=False, error_message=f"未找到插件工具:{invocation.tool_name}", ) supervisor, entry = matched_entry tool_entry = cast("ToolEntry", entry) invoke_payload = self._build_tool_invocation_payload(tool_entry, invocation, context) try: response = await supervisor.invoke_plugin( method=tool_entry.invoke_method, plugin_id=tool_entry.plugin_id, component_name=tool_entry.name, args=invoke_payload, timeout_ms=30000, ) except Exception as exc: logger.error(f"运行时工具 {tool_entry.plugin_id}.{tool_entry.name} 执行失败: {exc}", exc_info=True) return ToolExecutionResult( tool_name=tool_entry.name, success=False, error_message=str(exc), metadata={"plugin_id": tool_entry.plugin_id}, ) payload = response.payload if isinstance(response.payload, dict) else {} transport_success = bool(payload.get("success", False)) result = payload.get("result") if not transport_success: return ToolExecutionResult( tool_name=tool_entry.name, success=False, error_message="" if result is None else str(result), structured_content=result, metadata={"plugin_id": tool_entry.plugin_id}, ) return self._parse_tool_invoke_result(tool_entry, result) def get_llm_available_tools(self) -> Dict[str, ToolInfo]: """获取当前可供 LLM 选择的工具集合。 Returns: Dict[str, ToolInfo]: 工具名到工具信息的映射。 """ tool_infos = self._collect_unique_component_infos(ComponentType.TOOL) return {name: info for name, info in tool_infos.items() if isinstance(info, ToolInfo) and info.enabled} def get_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]: """获取某类组件的全部信息。 Args: component_type: 组件类型。 Returns: Dict[str, ComponentInfo]: 组件名到组件信息的映射。 """ return self._collect_unique_component_infos(component_type) def get_plugin_config(self, plugin_name: str) -> Optional[dict]: """读取指定插件的配置文件内容。 Args: plugin_name: 插件名称。 Returns: Optional[dict]: 读取成功时返回配置字典;未找到时返回 ``None``。 """ runtime_manager = self._get_runtime_manager() try: supervisor = runtime_manager._get_supervisor_for_plugin(plugin_name) except RuntimeError as exc: logger.error(f"读取插件配置失败: {exc}") return None if supervisor is None: return None try: return runtime_manager._load_plugin_config_for_supervisor(supervisor, plugin_name) except Exception as exc: logger.error(f"读取插件 {plugin_name} 配置失败: {exc}", exc_info=True) return None def get_plugin_default_config(self, plugin_name: str) -> Optional[dict]: """获取指定插件注册时上报的默认配置。 Args: plugin_name: 插件名称。 Returns: Optional[dict]: 默认配置字典;未找到时返回 ``None``。 """ runtime_manager = self._get_runtime_manager() try: supervisor = runtime_manager._get_supervisor_for_plugin(plugin_name) except RuntimeError as exc: logger.error(f"读取插件默认配置失败: {exc}") return None if supervisor is None: return None registration = supervisor._registered_plugins.get(plugin_name) if registration is None: return None return dict(registration.default_config) def get_plugin_config_schema(self, plugin_name: str) -> Optional[dict]: """获取指定插件注册时上报的配置 Schema。 Args: plugin_name: 插件名称。 Returns: Optional[dict]: 配置 Schema;未找到时返回 ``None``。 """ runtime_manager = self._get_runtime_manager() try: supervisor = runtime_manager._get_supervisor_for_plugin(plugin_name) except RuntimeError as exc: logger.error(f"读取插件配置 Schema 失败: {exc}") return None if supervisor is None: return None registration = supervisor._registered_plugins.get(plugin_name) if registration is None: return None return dict(registration.config_schema) def list_hook_specs(self) -> list[dict[str, Any]]: """返回当前运行时公开的 Hook 规格清单。 Returns: list[dict[str, Any]]: 可直接序列化给 WebUI 的 Hook 规格列表。 """ runtime_manager = self._get_runtime_manager() return [ { "name": spec.name, "description": spec.description, "parameters_schema": deepcopy(spec.parameters_schema), "default_timeout_ms": spec.default_timeout_ms, "allow_blocking": spec.allow_blocking, "allow_observe": spec.allow_observe, "allow_abort": spec.allow_abort, "allow_kwargs_mutation": spec.allow_kwargs_mutation, } for spec in runtime_manager.list_hook_specs() ] component_query_service = ComponentQueryService()