From 18a0e7664ad23a1b582610557501f3d81923fd8a Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Mon, 23 Mar 2026 16:14:13 +0800 Subject: [PATCH] Refactor plugin runtime components and enhance message handling - Removed unused core action mirror functionality from PluginRunnerSupervisor. - Simplified action and command execution logic in send_service.py. - Introduced ComponentQueryService for unified component querying in plugin runtime. - Enhanced message component handling with new binary component support. - Improved message sequence construction and detection of outbound message flags. - Updated methods for sending messages to streamline the process and improve readability. --- pytests/test_platform_io_dedupe.py | 6 +- pytests/test_plugin_runtime_action_bridge.py | 328 +++++--- src/chat/brain_chat/brain_planner.py | 32 +- src/chat/message_receive/bot.py | 114 +-- src/chat/planner_actions/action_manager.py | 14 +- src/chat/planner_actions/planner.py | 41 +- src/chat/tool_executor.py | 14 +- src/core/component_registry.py | 239 ------ src/platform_io/manager.py | 24 - src/plugin_runtime/capabilities/core.py | 8 +- src/plugin_runtime/capabilities/data.py | 4 +- src/plugin_runtime/component_query.py | 709 ++++++++++++++++++ src/plugin_runtime/host/component_registry.py | 72 +- src/plugin_runtime/host/supervisor.py | 238 ------ src/services/send_service.py | 265 ++++--- 15 files changed, 1255 insertions(+), 853 deletions(-) delete mode 100644 src/core/component_registry.py create mode 100644 src/plugin_runtime/component_query.py diff --git a/pytests/test_platform_io_dedupe.py b/pytests/test_platform_io_dedupe.py index 68ae95c6..d6bdd1dd 100644 --- a/pytests/test_platform_io_dedupe.py +++ b/pytests/test_platform_io_dedupe.py @@ -72,10 +72,10 @@ class _StubPlatformIODriver(PlatformIODriver): def _build_manager() -> PlatformIOManager: - """构造带有最小 active owner 的 Broker 管理器。 + """构造带有最小接收路由的 Broker 管理器。 Returns: - PlatformIOManager: 已注册测试驱动并绑定活动路由的 Broker。 + PlatformIOManager: 已注册测试驱动并绑定接收路由的 Broker。 """ manager = PlatformIOManager() driver = _StubPlatformIODriver( @@ -88,7 +88,7 @@ def _build_manager() -> PlatformIOManager: ) ) manager.register_driver(driver) - manager.bind_route( + manager.bind_receive_route( RouteBinding( route_key=RouteKey(platform="qq", account_id="10001", scope="main"), driver_id=driver.driver_id, diff --git a/pytests/test_plugin_runtime_action_bridge.py b/pytests/test_plugin_runtime_action_bridge.py index f2364094..e13dfaf3 100644 --- a/pytests/test_plugin_runtime_action_bridge.py +++ b/pytests/test_plugin_runtime_action_bridge.py @@ -1,57 +1,109 @@ +"""核心组件查询层与插件运行时聚合测试。""" + from types import SimpleNamespace from typing import Any import pytest -from src.core.component_registry import component_registry as core_component_registry +import src.plugin_runtime.integration as integration_module + +from src.core.types import ActionInfo, ToolInfo +from src.plugin_runtime.component_query import component_query_service from src.plugin_runtime.host.supervisor import PluginSupervisor -from src.plugin_runtime.protocol.envelope import ComponentDeclaration, RegisterPluginPayload -def _build_action_payload(plugin_id: str, action_name: str) -> RegisterPluginPayload: - """构造用于测试的 runtime Action 注册载荷。 +class _FakeRuntimeManager: + """测试用插件运行时管理器。""" + + def __init__(self, supervisor: PluginSupervisor, plugin_id: str, plugin_config: dict[str, Any]) -> None: + """初始化测试用运行时管理器。 + + Args: + supervisor: 持有测试组件的监督器。 + plugin_id: 目标插件 ID。 + plugin_config: 需要返回的插件配置。 + """ + + self.supervisors = [supervisor] + self._plugin_id = plugin_id + self._plugin_config = plugin_config + + def _get_supervisor_for_plugin(self, plugin_id: str) -> PluginSupervisor | None: + """按插件 ID 返回对应监督器。 + + Args: + plugin_id: 目标插件 ID。 + + Returns: + PluginSupervisor | None: 命中时返回监督器。 + """ + + return self.supervisors[0] if plugin_id == self._plugin_id else None + + def _load_plugin_config_for_supervisor(self, supervisor: Any, plugin_id: str) -> dict[str, Any]: + """返回测试配置。 + + Args: + supervisor: 监督器实例。 + plugin_id: 目标插件 ID。 + + Returns: + dict[str, Any]: 测试配置内容。 + """ + + del supervisor + if plugin_id != self._plugin_id: + return {} + return dict(self._plugin_config) + + +def _install_runtime_manager( + monkeypatch: pytest.MonkeyPatch, + supervisor: PluginSupervisor, + plugin_id: str, + plugin_config: dict[str, Any] | None = None, +) -> None: + """为测试安装假的运行时管理器。 Args: - plugin_id: 插件 ID。 - action_name: Action 名称。 - - Returns: - RegisterPluginPayload: 测试用注册载荷。 + monkeypatch: pytest monkeypatch 对象。 + supervisor: 持有测试组件的监督器。 + plugin_id: 测试插件 ID。 + plugin_config: 可选的测试配置内容。 """ - return RegisterPluginPayload( - plugin_id=plugin_id, - plugin_version="1.0.0", - components=[ - ComponentDeclaration( - name=action_name, - component_type="ACTION", - plugin_id=plugin_id, - metadata={ - "description": "发送一个测试回复", - "enabled": True, - "activation_type": "keyword", - "activation_probability": 0.25, - "activation_keywords": ["测试", "hello"], - "action_parameters": {"target": "目标对象"}, - "action_require": ["需要发送回复时使用"], - "associated_types": ["text"], - "parallel_action": True, - }, - ) - ], - ) + + fake_manager = _FakeRuntimeManager(supervisor, plugin_id, plugin_config or {"enabled": True}) + monkeypatch.setattr(integration_module, "get_plugin_runtime_manager", lambda: fake_manager) @pytest.mark.asyncio -async def test_runtime_actions_are_mirrored_into_core_registry_and_invoked(monkeypatch: pytest.MonkeyPatch) -> None: - """运行时 Action 应镜像到旧核心注册表,并可由旧 Planner 执行。""" +async def test_core_component_registry_reads_runtime_action_and_executor( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """核心查询层应直接读取运行时 Action,并返回 RPC 执行闭包。""" + plugin_id = "runtime_action_bridge_plugin" action_name = "runtime_action_bridge_test" - payload = _build_action_payload(plugin_id=plugin_id, action_name=action_name) supervisor = PluginSupervisor(plugin_dirs=[]) captured: dict[str, Any] = {} - core_component_registry.remove_action(action_name) + supervisor.component_registry.register_component( + name=action_name, + component_type="ACTION", + plugin_id=plugin_id, + metadata={ + "description": "发送一个测试回复", + "enabled": True, + "activation_type": "keyword", + "activation_probability": 0.25, + "activation_keywords": ["测试", "hello"], + "action_parameters": {"target": "目标对象"}, + "action_require": ["需要发送回复时使用"], + "associated_types": ["text"], + "parallel_action": True, + }, + ) + _install_runtime_manager(monkeypatch, supervisor, plugin_id, {"enabled": True, "mode": "test"}) async def fake_invoke_plugin( method: str, @@ -60,18 +112,8 @@ async def test_runtime_actions_are_mirrored_into_core_registry_and_invoked(monke args: dict[str, Any] | None = None, timeout_ms: int = 30000, ) -> Any: - """模拟 plugin runtime Action 调用。 + """模拟动作 RPC 调用。""" - Args: - method: RPC 方法名。 - plugin_id: 插件 ID。 - component_name: 组件名称。 - args: 调用参数。 - timeout_ms: RPC 超时时间。 - - Returns: - Any: 伪造的 RPC 响应对象。 - """ captured["method"] = method captured["plugin_id"] = plugin_id captured["component_name"] = component_name @@ -81,58 +123,162 @@ async def test_runtime_actions_are_mirrored_into_core_registry_and_invoked(monke monkeypatch.setattr(supervisor, "invoke_plugin", fake_invoke_plugin) - try: - supervisor._mirror_runtime_actions_to_core_registry(payload) + action_info = component_query_service.get_action_info(action_name) + assert isinstance(action_info, ActionInfo) + assert action_info.plugin_name == plugin_id + assert action_info.description == "发送一个测试回复" + assert action_info.activation_keywords == ["测试", "hello"] + assert action_info.random_activation_probability == 0.25 + assert action_info.parallel_action is True + assert action_name in component_query_service.get_default_actions() + assert component_query_service.get_plugin_config(plugin_id) == {"enabled": True, "mode": "test"} - action_info = core_component_registry.get_action_info(action_name) - assert action_info is not None - assert action_info.plugin_name == plugin_id - assert action_info.description == "发送一个测试回复" - assert action_info.activation_keywords == ["测试", "hello"] - assert action_info.random_activation_probability == 0.25 - assert action_info.parallel_action is True + executor = component_query_service.get_action_executor(action_name) + assert executor is not None - executor = core_component_registry.get_action_executor(action_name) - assert executor is not None + success, reason = await executor( + action_data={"target": "MaiBot"}, + action_reasoning="当前适合使用这个动作", + cycle_timers={"planner": 0.1}, + thinking_id="tid-1", + chat_stream=SimpleNamespace(session_id="stream-1"), + log_prefix="[test]", + shutting_down=False, + plugin_config={"enabled": True}, + ) - success, reason = await executor( - action_data={"target": "MaiBot"}, - action_reasoning="当前适合使用这个动作", - cycle_timers={"planner": 0.1}, - thinking_id="tid-1", - chat_stream=SimpleNamespace(session_id="stream-1"), - log_prefix="[test]", - shutting_down=False, - plugin_config={"enabled": True}, - ) - - assert success is True - assert reason == "runtime action executed" - assert captured["method"] == "plugin.invoke_action" - assert captured["plugin_id"] == plugin_id - assert captured["component_name"] == action_name - assert captured["args"]["stream_id"] == "stream-1" - assert captured["args"]["chat_id"] == "stream-1" - assert captured["args"]["reasoning"] == "当前适合使用这个动作" - assert captured["args"]["target"] == "MaiBot" - assert captured["args"]["action_data"] == {"target": "MaiBot"} - finally: - supervisor._remove_core_action_mirrors(plugin_id) - core_component_registry.remove_action(action_name) + assert success is True + assert reason == "runtime action executed" + assert captured["method"] == "plugin.invoke_action" + assert captured["plugin_id"] == plugin_id + assert captured["component_name"] == action_name + assert captured["args"]["stream_id"] == "stream-1" + assert captured["args"]["chat_id"] == "stream-1" + assert captured["args"]["reasoning"] == "当前适合使用这个动作" + assert captured["args"]["target"] == "MaiBot" + assert captured["args"]["action_data"] == {"target": "MaiBot"} -def test_clear_runner_state_removes_mirrored_runtime_actions() -> None: - """清理 Runner 状态时应同步移除旧核心注册表中的镜像 Action。""" - plugin_id = "runtime_action_bridge_cleanup_plugin" - action_name = "runtime_action_bridge_cleanup_test" - payload = _build_action_payload(plugin_id=plugin_id, action_name=action_name) +@pytest.mark.asyncio +async def test_core_component_registry_reads_runtime_command_and_executor( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """核心查询层应直接使用运行时命令匹配与执行闭包。""" + + plugin_id = "runtime_command_bridge_plugin" + command_name = "runtime_command_bridge_test" + supervisor = PluginSupervisor(plugin_dirs=[]) + captured: dict[str, Any] = {} + + supervisor.component_registry.register_component( + name=command_name, + component_type="COMMAND", + plugin_id=plugin_id, + metadata={ + "description": "测试命令", + "enabled": True, + "command_pattern": r"^/test(?:\s+.+)?$", + "aliases": ["/hello"], + "intercept_message_level": 1, + }, + ) + _install_runtime_manager(monkeypatch, supervisor, plugin_id, {"mode": "command"}) + + async def fake_invoke_plugin( + method: str, + plugin_id: str, + component_name: str, + args: dict[str, Any] | None = None, + timeout_ms: int = 30000, + ) -> Any: + """模拟命令 RPC 调用。""" + + captured["method"] = method + captured["plugin_id"] = plugin_id + captured["component_name"] = component_name + captured["args"] = args or {} + captured["timeout_ms"] = timeout_ms + return SimpleNamespace(payload={"success": True, "result": (True, "command ok", True)}) + + monkeypatch.setattr(supervisor, "invoke_plugin", fake_invoke_plugin) + + matched = component_query_service.find_command_by_text("/test hello") + assert matched is not None + command_executor, matched_groups, command_info = matched + + assert matched_groups == {} + assert command_info.plugin_name == plugin_id + assert command_info.command_pattern == r"^/test(?:\s+.+)?$" + + success, response_text, intercept = await command_executor( + message=SimpleNamespace(processed_plain_text="/test hello", session_id="stream-2"), + plugin_config={"mode": "command"}, + matched_groups=matched_groups, + ) + + assert success is True + assert response_text == "command ok" + assert intercept is True + assert captured["method"] == "plugin.invoke_command" + assert captured["plugin_id"] == plugin_id + assert captured["component_name"] == command_name + assert captured["args"]["text"] == "/test hello" + assert captured["args"]["stream_id"] == "stream-2" + assert captured["args"]["plugin_config"] == {"mode": "command"} + + +@pytest.mark.asyncio +async def test_core_component_registry_reads_runtime_tools_and_executor( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """核心查询层应直接读取运行时 Tool,并返回 RPC 执行闭包。""" + + plugin_id = "runtime_tool_bridge_plugin" + tool_name = "runtime_tool_bridge_test" supervisor = PluginSupervisor(plugin_dirs=[]) - core_component_registry.remove_action(action_name) + supervisor.component_registry.register_component( + name=tool_name, + component_type="TOOL", + plugin_id=plugin_id, + metadata={ + "description": "测试工具", + "enabled": True, + "parameters": [ + { + "name": "query", + "param_type": "string", + "description": "查询词", + "required": True, + } + ], + }, + ) + _install_runtime_manager(monkeypatch, supervisor, plugin_id) - supervisor._mirror_runtime_actions_to_core_registry(payload) - assert core_component_registry.get_action_info(action_name) is not None + async def fake_invoke_plugin( + method: str, + plugin_id: str, + component_name: str, + args: dict[str, Any] | None = None, + timeout_ms: int = 30000, + ) -> Any: + """模拟工具 RPC 调用。""" - supervisor._clear_runner_state() + del timeout_ms + assert method == "plugin.invoke_tool" + assert plugin_id == "runtime_tool_bridge_plugin" + assert component_name == "runtime_tool_bridge_test" + assert args == {"query": "MaiBot"} + return SimpleNamespace(payload={"success": True, "result": {"content": "tool ok"}}) - assert core_component_registry.get_action_info(action_name) is None + monkeypatch.setattr(supervisor, "invoke_plugin", fake_invoke_plugin) + + tool_info = component_query_service.get_tool_info(tool_name) + assert isinstance(tool_info, ToolInfo) + assert tool_info.tool_description == "测试工具" + assert tool_name in component_query_service.get_llm_available_tools() + + executor = component_query_service.get_tool_executor(tool_name) + assert executor is not None + assert await executor({"query": "MaiBot"}) == {"content": "tool ok"} diff --git a/src/chat/brain_chat/brain_planner.py b/src/chat/brain_chat/brain_planner.py index 12b103a0..709be8ee 100644 --- a/src/chat/brain_chat/brain_planner.py +++ b/src/chat/brain_chat/brain_planner.py @@ -1,30 +1,32 @@ +from datetime import datetime +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple + import json -import time -import traceback import random import re -from typing import Dict, Optional, Tuple, List, TYPE_CHECKING -from rich.traceback import install -from datetime import datetime -from json_repair import repair_json +import time +import traceback + +from json_repair import repair_json +from rich.traceback import install -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config, model_config -from src.common.logger import get_logger from src.chat.logger.plan_reply_logger import PlanReplyLogger +from src.chat.message_receive.chat_manager import chat_manager as _chat_manager +from src.chat.planner_actions.action_manager import ActionManager +from src.chat.utils.utils import get_chat_type_and_target_info from src.common.data_models.info_data_model import ActionPlannerInfo +from src.common.logger import get_logger from src.common.utils.utils_action import ActionUtils +from src.config.config import global_config, model_config +from src.core.types import ActionActivationType, ActionInfo, ComponentType +from src.llm_models.utils_model import LLMRequest +from src.plugin_runtime.component_query import component_query_service from src.prompt.prompt_manager import prompt_manager from src.services.message_service import ( build_readable_messages_with_id, get_actions_by_timestamp_with_chat, get_messages_before_time_in_chat, ) -from src.chat.utils.utils import get_chat_type_and_target_info -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.message_receive.chat_manager import chat_manager as _chat_manager -from src.core.types import ActionActivationType, ActionInfo, ComponentType -from src.core.component_registry import component_registry if TYPE_CHECKING: from src.common.data_models.info_data_model import TargetPersonInfo @@ -320,7 +322,7 @@ class BrainPlanner: current_available_actions_dict = self.action_manager.get_using_actions() # 获取完整的动作信息 - all_registered_actions: Dict[str, ActionInfo] = component_registry.get_components_by_type( # type: ignore + all_registered_actions: Dict[str, ActionInfo] = component_query_service.get_components_by_type( # type: ignore ComponentType.ACTION ) current_available_actions = {} diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 23e7de6e..025150fc 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -1,19 +1,19 @@ from contextlib import suppress -import traceback -import os - -from maim_message import MessageBase from typing import Any, Dict, Optional +import os +import traceback +from maim_message import MessageBase + +from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver from src.common.logger import get_logger from src.common.utils.utils_message import MessageUtils from src.common.utils.utils_session import SessionUtils -from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver # from src.chat.brain_chat.PFC.pfc_manager import PFCManager from src.core.announcement_manager import global_announcement_manager -from src.core.component_registry import component_registry +from src.plugin_runtime.component_query import component_query_service from .message import SessionMessage from .chat_manager import chat_manager @@ -58,16 +58,22 @@ class ChatBot: logger.error(f"创建PFC聊天失败: {e}") logger.error(traceback.format_exc()) - async def _process_commands(self, message: SessionMessage): - # sourcery skip: use-named-expression - """使用新插件系统处理命令""" + async def _process_commands(self, message: SessionMessage) -> tuple[bool, Optional[str], bool]: + """使用统一组件注册表处理命令。 + + Args: + message: 当前待处理的会话消息。 + + Returns: + tuple[bool, Optional[str], bool]: ``(是否命中命令, 命令响应文本, 是否继续后续处理)``。 + """ if not message.processed_plain_text: return False, None, True # 没有文本内容,继续处理消息 try: text = message.processed_plain_text - # 使用核心组件注册表查找命令 - command_result = component_registry.find_command_by_text(text) + # 使用插件运行时统一查询服务查找命令 + command_result = component_query_service.find_command_by_text(text) if command_result: command_executor, matched_groups, command_info = command_result plugin_name = command_info.plugin_name @@ -81,7 +87,7 @@ class ChatBot: message.is_command = True # 获取插件配置 - plugin_config = component_registry.get_plugin_config(plugin_name) + plugin_config = component_query_service.get_plugin_config(plugin_name) try: # 调用命令执行器 @@ -112,88 +118,32 @@ class ChatBot: # 命令出错时,根据命令的拦截设置决定是否继续处理消息 return True, str(e), False # 出错时继续处理消息 - # 没有找到旧系统命令,尝试新版本插件运行时 - new_cmd_result = await self._process_new_runtime_command(message) - return new_cmd_result if new_cmd_result is not None else (False, None, True) + return False, None, True except Exception as e: logger.error(f"处理命令时出错: {e}") return False, None, True # 出错时继续处理消息 - async def _process_new_runtime_command(self, message: SessionMessage): - """尝试在新版本插件运行时中查找并执行命令 - - Returns: - (found, response, continue_processing) 三元组, - 或 None 表示新运行时中也未找到匹配命令。 - """ - from src.plugin_runtime.integration import get_plugin_runtime_manager - - prm = get_plugin_runtime_manager() - if not prm.is_running: - return None - - matched = prm.find_command_by_text(message.processed_plain_text) - if matched is None: - return None - - command_name = matched["name"] - if message.session_id and command_name in global_announcement_manager.get_disabled_chat_commands( - message.session_id - ): - logger.info(f"[新运行时] 用户禁用的命令,跳过处理: {matched['full_name']}") - return False, None, True - - message.is_command = True - logger.info(f"[新运行时] 匹配命令: {matched['full_name']}") - - try: - resp = await prm.invoke_plugin( - method="plugin.invoke_command", - plugin_id=matched["plugin_id"], - component_name=matched["name"], - args={ - "text": message.processed_plain_text, - "stream_id": message.session_id or "", - "matched_groups": matched.get("matched_groups") or {}, - }, - timeout_ms=30000, - ) - - payload = resp.payload - success = payload.get("success", False) - cmd_result = payload.get("result") - - # 拦截位优先从命令返回值中获取(支持运行时动态决定), - # 回退到组件 metadata 中的静态声明 - if isinstance(cmd_result, (list, tuple)) and len(cmd_result) >= 3: - # 命令返回 (found, response_text, intercept_bool) 三元组 - response_text = cmd_result[1] if cmd_result[1] is not None else "" - intercept = bool(cmd_result[2]) - else: - response_text = cmd_result if cmd_result is not None else "" - intercept = bool(matched["metadata"].get("intercept_message_level", 0)) - - self._mark_command_message(message, int(intercept)) - - if success: - logger.info(f"[新运行时] 命令执行成功: {matched['full_name']}") - else: - logger.warning(f"[新运行时] 命令执行失败: {matched['full_name']} - {response_text}") - - return True, response_text, not intercept - - except Exception as e: - logger.error(f"[新运行时] 执行命令 {matched['full_name']} 异常: {e}", exc_info=True) - return True, str(e), True - @staticmethod def _mark_command_message(message: SessionMessage, intercept_message_level: int) -> None: + """标记消息已经被命令链消费。 + + Args: + message: 待标记的会话消息。 + intercept_message_level: 命令设置的拦截级别。 + """ + message.is_command = True message.message_info.additional_config["intercept_message_level"] = intercept_message_level @staticmethod def _store_intercepted_command_message(message: SessionMessage) -> None: + """将被命令链拦截的消息写入数据库。 + + Args: + message: 已完成命令处理的会话消息。 + """ + MessageUtils.store_message_to_db(message) async def _handle_command_processing_result( diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index 167cdcab..8133ac18 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -3,8 +3,8 @@ from typing import Dict, Optional, Tuple from src.chat.message_receive.chat_manager import BotChatSession from src.chat.message_receive.message import SessionMessage from src.common.logger import get_logger -from src.core.component_registry import component_registry, ActionExecutor from src.core.types import ActionInfo +from src.plugin_runtime.component_query import ActionExecutor, component_query_service logger = get_logger("action_manager") @@ -28,7 +28,7 @@ class ActionManager: """ 动作管理器,用于管理各种类型的动作 - 使用核心组件注册表的 executor-based 模式。 + 使用插件运行时统一查询服务的 executor-based 模式。 """ def __init__(self): @@ -38,7 +38,7 @@ class ActionManager: self._using_actions: Dict[str, ActionInfo] = {} # 初始化时将默认动作加载到使用中的动作 - self._using_actions = component_registry.get_default_actions() + self._using_actions = component_query_service.get_default_actions() # === 执行Action方法 === @@ -72,17 +72,17 @@ class ActionManager: Optional[ActionHandle]: 执行句柄,如果动作未注册则返回 None """ try: - executor = component_registry.get_action_executor(action_name) + executor = component_query_service.get_action_executor(action_name) if not executor: logger.warning(f"{log_prefix} 未找到Action组件: {action_name}") return None - info = component_registry.get_action_info(action_name) + info = component_query_service.get_action_info(action_name) if not info: logger.warning(f"{log_prefix} 未找到Action组件信息: {action_name}") return None - plugin_config = component_registry.get_plugin_config(info.plugin_name) or {} + plugin_config = component_query_service.get_plugin_config(info.plugin_name) or {} handle = ActionHandle( executor, @@ -133,5 +133,5 @@ class ActionManager: def restore_actions(self) -> None: """恢复到默认动作集""" actions_to_restore = list(self._using_actions.keys()) - self._using_actions = component_registry.get_default_actions() + self._using_actions = component_query_service.get_default_actions() logger.debug(f"恢复动作集: 从 {actions_to_restore} 恢复到默认动作集 {list(self._using_actions.keys())}") diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 5184abcb..b21efa6b 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -1,33 +1,36 @@ +from collections import OrderedDict +from datetime import datetime +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union + +import contextlib import json -import time -import traceback import random import re -import contextlib -from typing import Dict, Optional, Tuple, List, TYPE_CHECKING, Union -from collections import OrderedDict -from rich.traceback import install -from datetime import datetime +import time +import traceback + from json_repair import repair_json -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config, model_config -from src.common.logger import get_logger +from rich.traceback import install + from src.chat.logger.plan_reply_logger import PlanReplyLogger +from src.chat.message_receive.chat_manager import chat_manager as _chat_manager +from src.chat.message_receive.message import SessionMessage +from src.chat.planner_actions.action_manager import ActionManager +from src.chat.utils.utils import get_chat_type_and_target_info, is_bot_self from src.common.data_models.info_data_model import ActionPlannerInfo +from src.common.logger import get_logger +from src.config.config import global_config, model_config +from src.core.types import ActionActivationType, ActionInfo, ComponentType +from src.llm_models.utils_model import LLMRequest +from src.person_info.person_info import Person +from src.plugin_runtime.component_query import component_query_service from src.prompt.prompt_manager import prompt_manager from src.services.message_service import ( build_readable_messages_with_id, - replace_user_references, get_messages_before_time_in_chat, + replace_user_references, translate_pid_to_description, ) -from src.chat.utils.utils import get_chat_type_and_target_info, is_bot_self -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.message_receive.chat_manager import chat_manager as _chat_manager -from src.chat.message_receive.message import SessionMessage -from src.core.types import ActionActivationType, ActionInfo, ComponentType -from src.core.component_registry import component_registry -from src.person_info.person_info import Person if TYPE_CHECKING: from src.common.data_models.info_data_model import TargetPersonInfo @@ -634,7 +637,7 @@ class ActionPlanner: current_available_actions_dict = self.action_manager.get_using_actions() # 获取完整的动作信息 - all_registered_actions: Dict[str, ActionInfo] = component_registry.get_components_by_type( # type: ignore + all_registered_actions: Dict[str, ActionInfo] = component_query_service.get_components_by_type( # type: ignore ComponentType.ACTION ) current_available_actions = {} diff --git a/src/chat/tool_executor.py b/src/chat/tool_executor.py index d449f7a1..aa99fce8 100644 --- a/src/chat/tool_executor.py +++ b/src/chat/tool_executor.py @@ -1,22 +1,20 @@ -""" -工具执行器 +"""工具执行器。 独立的工具执行组件,可以直接输入聊天消息内容, 自动判断并执行相应的工具,返回结构化的工具执行结果。 - -从 src.plugin_system.core.tool_use 迁移,使用新的核心组件注册表。 """ +from typing import Any, Dict, List, Optional, Tuple + import hashlib import time -from typing import Any, Dict, List, Optional, Tuple from src.common.logger import get_logger from src.config.config import global_config, model_config from src.core.announcement_manager import global_announcement_manager -from src.core.component_registry import component_registry from src.llm_models.payload_content import ToolCall from src.llm_models.utils_model import LLMRequest +from src.plugin_runtime.component_query import component_query_service from src.prompt.prompt_manager import prompt_manager logger = get_logger("tool_use") @@ -89,7 +87,7 @@ class ToolExecutor: def _get_tool_definitions(self) -> List[Dict[str, Any]]: """获取 LLM 可用的工具定义列表""" - all_tools = component_registry.get_llm_available_tools() + all_tools = component_query_service.get_llm_available_tools() user_disabled_tools = global_announcement_manager.get_disabled_chat_tools(self.chat_id) return [info.get_llm_definition() for name, info in all_tools.items() if name not in user_disabled_tools] @@ -152,7 +150,7 @@ class ToolExecutor: function_args = tool_call.args or {} function_args["llm_called"] = True - executor = component_registry.get_tool_executor(function_name) + executor = component_query_service.get_tool_executor(function_name) if not executor: logger.warning(f"未知工具名称: {function_name}") return None diff --git a/src/core/component_registry.py b/src/core/component_registry.py deleted file mode 100644 index bb58682a..00000000 --- a/src/core/component_registry.py +++ /dev/null @@ -1,239 +0,0 @@ -""" -核心组件注册表 - -面向最终架构的组件管理: -- Action:注册 ActionInfo + 执行器(本地 callable 或 IPC 路由) -- Command:注册正则模式 + 执行器 -- Tool:注册工具定义 + 执行器 - -不依赖任何插件基类,组件执行器是纯 async callable。 -""" - -import re -from typing import Any, Awaitable, Callable, Dict, Optional, Pattern, Tuple - -from src.common.logger import get_logger -from src.core.types import ( - ActionInfo, - CommandInfo, - ComponentInfo, - ComponentType, - ToolInfo, -) - -logger = get_logger("component_registry") - -# 执行器类型 -ActionExecutor = Callable[..., Awaitable[Any]] -CommandExecutor = Callable[..., Awaitable[Tuple[bool, Optional[str], bool]]] -ToolExecutor = Callable[..., Awaitable[Any]] - - -class ComponentRegistry: - """核心组件注册表 - - 管理 action、command、tool 三类组件。 - 每个组件由「元信息 + 执行器」构成,执行器是 async callable, - 不需要继承任何基类。 - """ - - def __init__(self): - # Action 注册 - self._actions: Dict[str, ActionInfo] = {} - self._action_executors: Dict[str, ActionExecutor] = {} - self._default_actions: Dict[str, ActionInfo] = {} - - # Command 注册 - self._commands: Dict[str, CommandInfo] = {} - self._command_executors: Dict[str, CommandExecutor] = {} - self._command_patterns: Dict[Pattern, str] = {} - - # Tool 注册 - self._tools: Dict[str, ToolInfo] = {} - self._tool_executors: Dict[str, ToolExecutor] = {} - self._llm_available_tools: Dict[str, ToolInfo] = {} - - # 插件配置(plugin_name -> config dict) - self._plugin_configs: Dict[str, dict] = {} - - logger.info("核心组件注册表初始化完成") - - # ========== Action ========== - - def register_action( - self, - info: ActionInfo, - executor: ActionExecutor, - ) -> bool: - """注册 action - - Args: - info: action 元信息 - executor: 执行器,async callable - """ - name = info.name - if name in self._actions: - logger.warning(f"Action {name} 已存在,跳过注册") - return False - - self._actions[name] = info - self._action_executors[name] = executor - - if info.enabled: - self._default_actions[name] = info - - logger.debug(f"注册 Action: {name}") - return True - - def get_action_info(self, name: str) -> Optional[ActionInfo]: - return self._actions.get(name) - - def get_action_executor(self, name: str) -> Optional[ActionExecutor]: - return self._action_executors.get(name) - - def get_default_actions(self) -> Dict[str, ActionInfo]: - return self._default_actions.copy() - - def get_all_actions(self) -> Dict[str, ActionInfo]: - return self._actions.copy() - - def remove_action(self, name: str) -> bool: - if name not in self._actions: - return False - del self._actions[name] - self._action_executors.pop(name, None) - self._default_actions.pop(name, None) - logger.debug(f"移除 Action: {name}") - return True - - # ========== Command ========== - - def register_command( - self, - info: CommandInfo, - executor: CommandExecutor, - ) -> bool: - """注册 command""" - name = info.name - if name in self._commands: - logger.warning(f"Command {name} 已存在,跳过注册") - return False - - self._commands[name] = info - self._command_executors[name] = executor - - if info.enabled and info.command_pattern: - pattern = re.compile(info.command_pattern, re.IGNORECASE | re.DOTALL) - self._command_patterns[pattern] = name - - logger.debug(f"注册 Command: {name}") - return True - - def find_command_by_text(self, text: str) -> Optional[Tuple[CommandExecutor, dict, CommandInfo]]: - """根据文本查找匹配的命令 - - Returns: - (executor, matched_groups, command_info) 或 None - """ - candidates = [p for p in self._command_patterns if p.match(text)] - if not candidates: - return None - if len(candidates) > 1: - logger.warning(f"文本 '{text[:50]}' 匹配到多个命令模式,使用第一个") - pattern = candidates[0] - name = self._command_patterns[pattern] - return ( - self._command_executors[name], - pattern.match(text).groupdict(), # type: ignore - self._commands[name], - ) - - def remove_command(self, name: str) -> bool: - if name not in self._commands: - return False - del self._commands[name] - self._command_executors.pop(name, None) - self._command_patterns = {k: v for k, v in self._command_patterns.items() if v != name} - logger.debug(f"移除 Command: {name}") - return True - - # ========== Tool ========== - - def register_tool( - self, - info: ToolInfo, - executor: ToolExecutor, - ) -> bool: - """注册 tool""" - name = info.name - if name in self._tools: - logger.warning(f"Tool {name} 已存在,跳过注册") - return False - - self._tools[name] = info - self._tool_executors[name] = executor - - if info.enabled: - self._llm_available_tools[name] = info - - logger.debug(f"注册 Tool: {name}") - return True - - def get_tool_info(self, name: str) -> Optional[ToolInfo]: - return self._tools.get(name) - - def get_tool_executor(self, name: str) -> Optional[ToolExecutor]: - return self._tool_executors.get(name) - - def get_llm_available_tools(self) -> Dict[str, ToolInfo]: - return self._llm_available_tools.copy() - - def get_all_tools(self) -> Dict[str, ToolInfo]: - return self._tools.copy() - - def remove_tool(self, name: str) -> bool: - if name not in self._tools: - return False - del self._tools[name] - self._tool_executors.pop(name, None) - self._llm_available_tools.pop(name, None) - logger.debug(f"移除 Tool: {name}") - return True - - # ========== 通用查询 ========== - - def get_component_info(self, name: str, component_type: ComponentType) -> Optional[ComponentInfo]: - """获取组件元信息""" - match component_type: - case ComponentType.ACTION: - return self._actions.get(name) - case ComponentType.COMMAND: - return self._commands.get(name) - case ComponentType.TOOL: - return self._tools.get(name) - case _: - return None - - def get_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]: - """获取某类型的所有组件""" - match component_type: - case ComponentType.ACTION: - return dict(self._actions) - case ComponentType.COMMAND: - return dict(self._commands) - case ComponentType.TOOL: - return dict(self._tools) - case _: - return {} - - # ========== 插件配置 ========== - - def set_plugin_config(self, plugin_name: str, config: dict) -> None: - self._plugin_configs[plugin_name] = config - - def get_plugin_config(self, plugin_name: str) -> Optional[dict]: - return self._plugin_configs.get(plugin_name) - - -# 全局单例 -component_registry = ComponentRegistry() diff --git a/src/platform_io/manager.py b/src/platform_io/manager.py index cb5996b4..be03e35d 100644 --- a/src/platform_io/manager.py +++ b/src/platform_io/manager.py @@ -178,12 +178,6 @@ class PlatformIOManager: return self._receive_route_table - @property - def route_table(self) -> RouteTable: - """兼容旧接口,返回发送路由表。""" - - return self._send_route_table - @property def deduplicator(self) -> MessageDeduplicator: """返回管理器持有的入站去重器。 @@ -369,12 +363,6 @@ class PlatformIOManager: self._validate_binding_against_driver(binding, driver) self._receive_route_table.bind(binding) - def bind_route(self, binding: RouteBinding) -> None: - """兼容旧接口,默认同时绑定发送表和接收表。""" - - self.bind_send_route(binding) - self.bind_receive_route(binding) - def unbind_send_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: """移除发送路由绑定。 @@ -395,12 +383,6 @@ class PlatformIOManager: self._receive_route_table.unbind(route_key, driver_id) - def unbind_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: - """兼容旧接口,默认同时从发送表和接收表解绑。""" - - self.unbind_send_route(route_key, driver_id) - self.unbind_receive_route(route_key, driver_id) - def resolve_drivers(self, route_key: RouteKey) -> List[PlatformIODriver]: """解析某个路由键当前命中的全部发送驱动。 @@ -430,12 +412,6 @@ class PlatformIOManager: return [] return [fallback_driver] - def resolve_driver(self, route_key: RouteKey) -> Optional[PlatformIODriver]: - """兼容旧接口,返回首个命中的发送驱动。""" - - drivers = self.resolve_drivers(route_key) - return drivers[0] if drivers else None - @staticmethod def build_route_key_from_message(message: "SessionMessage") -> RouteKey: """根据 ``SessionMessage`` 构造路由键。 diff --git a/src/plugin_runtime/capabilities/core.py b/src/plugin_runtime/capabilities/core.py index def5f03d..9bb1755b 100644 --- a/src/plugin_runtime/capabilities/core.py +++ b/src/plugin_runtime/capabilities/core.py @@ -238,14 +238,14 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "value": None, "error": str(e)} async def _cap_config_get_plugin(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: - from src.core.component_registry import component_registry as core_registry + from src.plugin_runtime.component_query import component_query_service plugin_name: str = args.get("plugin_name", plugin_id) key: str = args.get("key", "") default = args.get("default") try: - config = core_registry.get_plugin_config(plugin_name) + config = component_query_service.get_plugin_config(plugin_name) if config is None: return {"success": False, "value": default, "error": f"未找到插件 {plugin_name} 的配置"} @@ -258,11 +258,11 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "value": default, "error": str(e)} async def _cap_config_get_all(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: - from src.core.component_registry import component_registry as core_registry + from src.plugin_runtime.component_query import component_query_service plugin_name: str = args.get("plugin_name", plugin_id) try: - config = core_registry.get_plugin_config(plugin_name) + config = component_query_service.get_plugin_config(plugin_name) if config is None: return {"success": True, "value": {}} return {"success": True, "value": config} diff --git a/src/plugin_runtime/capabilities/data.py b/src/plugin_runtime/capabilities/data.py index c4ae0a56..fdf8d898 100644 --- a/src/plugin_runtime/capabilities/data.py +++ b/src/plugin_runtime/capabilities/data.py @@ -648,10 +648,10 @@ class RuntimeDataCapabilityMixin: return {"success": False, "error": str(e)} async def _cap_tool_get_definitions(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: - from src.core.component_registry import component_registry as core_registry + from src.plugin_runtime.component_query import component_query_service try: - tools = core_registry.get_llm_available_tools() + tools = component_query_service.get_llm_available_tools() return { "success": True, "tools": [{"name": name, "definition": info.get_llm_definition()} for name, info in tools.items()], diff --git a/src/plugin_runtime/component_query.py b/src/plugin_runtime/component_query.py new file mode 100644 index 00000000..7d23d202 --- /dev/null +++ b/src/plugin_runtime/component_query.py @@ -0,0 +1,709 @@ +"""插件运行时统一组件查询服务。 + +该模块统一从插件运行时的 Host ComponentRegistry 中聚合只读视图, +供 HFC/PFC、Planner、ToolExecutor 和运行时能力层查询与调用。 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple + +from src.common.logger import get_logger +from src.core.types import ActionActivationType, ActionInfo, CommandInfo, ComponentInfo, ComponentType, ToolInfo +from src.llm_models.payload_content.tool_option import ToolParamType + +if TYPE_CHECKING: + from src.plugin_runtime.host.component_registry import ActionEntry, CommandEntry, ComponentEntry, ToolEntry + from src.plugin_runtime.host.supervisor import PluginSupervisor + from src.plugin_runtime.integration import PluginRuntimeManager + +logger = get_logger("plugin_runtime.component_query") + +ActionExecutor = Callable[..., Awaitable[Any]] +CommandExecutor = Callable[..., Awaitable[Tuple[bool, Optional[str], bool]]] +ToolExecutor = Callable[..., Awaitable[Any]] + +_HOST_COMPONENT_TYPE_MAP: Dict[ComponentType, str] = { + ComponentType.ACTION: "ACTION", + ComponentType.COMMAND: "COMMAND", + ComponentType.TOOL: "TOOL", +} +_TOOL_PARAM_TYPE_MAP: Dict[str, ToolParamType] = { + "string": ToolParamType.STRING, + "integer": ToolParamType.INTEGER, + "float": ToolParamType.FLOAT, + "boolean": ToolParamType.BOOLEAN, + "bool": ToolParamType.BOOLEAN, +} + + +class ComponentQueryService: + """插件运行时统一组件查询服务。 + + 该对象不维护独立状态,只读取插件系统中的注册结果。 + 所有注册、删除、配置写入等写操作都被显式禁用。 + """ + + @staticmethod + def _get_runtime_manager() -> "PluginRuntimeManager": + """获取插件运行时管理器单例。 + + Returns: + PluginRuntimeManager: 当前全局插件运行时管理器。 + """ + + from src.plugin_runtime.integration import get_plugin_runtime_manager + + return get_plugin_runtime_manager() + + def _iter_supervisors(self) -> list["PluginSupervisor"]: + """获取当前所有活跃的插件运行时监督器。 + + Returns: + list[PluginSupervisor]: 当前运行中的监督器列表。 + """ + + runtime_manager = self._get_runtime_manager() + return list(runtime_manager.supervisors) + + def _iter_component_entries( + self, + component_type: ComponentType, + *, + enabled_only: bool = True, + ) -> list[tuple["PluginSupervisor", "ComponentEntry"]]: + """遍历指定类型的全部组件条目。 + + Args: + component_type: 目标组件类型。 + enabled_only: 是否仅返回启用状态的组件。 + + Returns: + list[tuple[PluginSupervisor, ComponentEntry]]: ``(监督器, 组件条目)`` 列表。 + """ + + host_component_type = _HOST_COMPONENT_TYPE_MAP.get(component_type) + if host_component_type is None: + return [] + + collected_entries: list[tuple["PluginSupervisor", "ComponentEntry"]] = [] + for supervisor in self._iter_supervisors(): + for component in supervisor.component_registry.get_components_by_type( + host_component_type, + enabled_only=enabled_only, + ): + collected_entries.append((supervisor, component)) + return collected_entries + + @staticmethod + def _coerce_action_activation_type(raw_value: Any) -> ActionActivationType: + """规范化动作激活类型。 + + Args: + raw_value: 原始激活类型值。 + + Returns: + ActionActivationType: 规范化后的激活类型枚举。 + """ + + normalized_value = str(raw_value or "").strip().lower() + if normalized_value == ActionActivationType.NEVER.value: + return ActionActivationType.NEVER + if normalized_value == ActionActivationType.RANDOM.value: + return ActionActivationType.RANDOM + if normalized_value == ActionActivationType.KEYWORD.value: + return ActionActivationType.KEYWORD + return ActionActivationType.ALWAYS + + @staticmethod + def _coerce_float(value: Any, default: float = 0.0) -> float: + """将任意值安全转换为浮点数。 + + Args: + value: 待转换的输入值。 + default: 转换失败时返回的默认值。 + + Returns: + float: 转换后的浮点结果。 + """ + + try: + return float(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _build_action_info(entry: "ActionEntry") -> ActionInfo: + """将运行时 Action 条目转换为核心动作信息。 + + Args: + entry: 插件运行时中的 Action 条目。 + + Returns: + ActionInfo: 供核心 Planner 使用的动作信息。 + """ + + metadata = dict(entry.metadata) + raw_action_parameters = metadata.get("action_parameters") + action_parameters = ( + { + str(param_name): str(param_description) + for param_name, param_description in raw_action_parameters.items() + } + if isinstance(raw_action_parameters, dict) + else {} + ) + action_require = [ + str(item) + for item in (metadata.get("action_require") or []) + if item is not None and str(item).strip() + ] + associated_types = [ + str(item) + for item in (metadata.get("associated_types") or []) + if item is not None and str(item).strip() + ] + activation_keywords = [ + str(item) + for item in (metadata.get("activation_keywords") or []) + if item is not None and str(item).strip() + ] + + return ActionInfo( + name=entry.name, + component_type=ComponentType.ACTION, + description=str(metadata.get("description", "") or ""), + enabled=bool(entry.enabled), + plugin_name=entry.plugin_id, + metadata=metadata, + action_parameters=action_parameters, + action_require=action_require, + associated_types=associated_types, + activation_type=ComponentQueryService._coerce_action_activation_type(metadata.get("activation_type")), + random_activation_probability=ComponentQueryService._coerce_float( + metadata.get("activation_probability"), + 0.0, + ), + activation_keywords=activation_keywords, + parallel_action=bool(metadata.get("parallel_action", False)), + ) + + @staticmethod + def _build_command_info(entry: "CommandEntry") -> CommandInfo: + """将运行时 Command 条目转换为核心命令信息。 + + Args: + entry: 插件运行时中的 Command 条目。 + + Returns: + CommandInfo: 供核心命令链使用的命令信息。 + """ + + metadata = dict(entry.metadata) + return CommandInfo( + name=entry.name, + component_type=ComponentType.COMMAND, + description=str(metadata.get("description", "") or ""), + enabled=bool(entry.enabled), + plugin_name=entry.plugin_id, + metadata=metadata, + command_pattern=str(metadata.get("command_pattern", "") or ""), + ) + + @staticmethod + def _coerce_tool_param_type(raw_value: Any) -> ToolParamType: + """规范化工具参数类型。 + + Args: + raw_value: 原始工具参数类型值。 + + Returns: + ToolParamType: 规范化后的工具参数类型。 + """ + + normalized_value = str(raw_value or "").strip().lower() + return _TOOL_PARAM_TYPE_MAP.get(normalized_value, ToolParamType.STRING) + + @staticmethod + def _build_tool_parameters(entry: "ToolEntry") -> list[tuple[str, ToolParamType, str, bool, list[str] | None]]: + """将运行时工具参数元数据转换为核心 ToolInfo 参数列表。 + + Args: + entry: 插件运行时中的 Tool 条目。 + + Returns: + list[tuple[str, ToolParamType, str, bool, list[str] | None]]: 转换后的参数列表。 + """ + + structured_parameters = entry.parameters if isinstance(entry.parameters, list) else [] + if not structured_parameters and isinstance(entry.parameters_raw, dict): + structured_parameters = [ + {"name": key, **value} + for key, value in entry.parameters_raw.items() + if isinstance(value, dict) + ] + + normalized_parameters: list[tuple[str, ToolParamType, str, bool, list[str] | None]] = [] + for parameter in structured_parameters: + if not isinstance(parameter, dict): + continue + + parameter_name = str(parameter.get("name", "") or "").strip() + if not parameter_name: + continue + + enum_values = parameter.get("enum") + normalized_enum_values = ( + [str(item) for item in enum_values if item is not None] + if isinstance(enum_values, list) + else None + ) + normalized_parameters.append( + ( + parameter_name, + ComponentQueryService._coerce_tool_param_type(parameter.get("param_type") or parameter.get("type")), + str(parameter.get("description", "") or ""), + bool(parameter.get("required", True)), + normalized_enum_values, + ) + ) + return normalized_parameters + + @staticmethod + def _build_tool_info(entry: "ToolEntry") -> ToolInfo: + """将运行时 Tool 条目转换为核心工具信息。 + + Args: + entry: 插件运行时中的 Tool 条目。 + + Returns: + ToolInfo: 供 ToolExecutor 与能力层使用的工具信息。 + """ + + return ToolInfo( + name=entry.name, + component_type=ComponentType.TOOL, + description=entry.description, + enabled=bool(entry.enabled), + plugin_name=entry.plugin_id, + metadata=dict(entry.metadata), + tool_parameters=ComponentQueryService._build_tool_parameters(entry), + tool_description=entry.description, + ) + + @staticmethod + def _log_duplicate_component(component_type: ComponentType, component_name: str) -> None: + """记录重复组件名称冲突。 + + Args: + component_type: 组件类型。 + component_name: 发生冲突的组件名称。 + """ + + logger.warning(f"检测到重复{component_type.value}名称 {component_name},将只保留首个匹配项") + + def _get_unique_component_entry( + self, + component_type: ComponentType, + name: str, + ) -> Optional[tuple["PluginSupervisor", "ComponentEntry"]]: + """按组件短名解析唯一条目。 + + Args: + component_type: 目标组件类型。 + name: 组件短名。 + + Returns: + Optional[tuple[PluginSupervisor, ComponentEntry]]: 唯一命中的组件条目。 + """ + + matched_entries = [ + (supervisor, entry) + for supervisor, entry in self._iter_component_entries(component_type) + if entry.name == name + ] + if not matched_entries: + return None + if len(matched_entries) > 1: + self._log_duplicate_component(component_type, name) + return matched_entries[0] + + def _collect_unique_component_infos( + self, + component_type: ComponentType, + ) -> Dict[str, ComponentInfo]: + """收集某类组件的唯一信息视图。 + + Args: + component_type: 目标组件类型。 + + Returns: + Dict[str, ComponentInfo]: 组件名到核心组件信息的映射。 + """ + + collected_components: Dict[str, ComponentInfo] = {} + for _supervisor, entry in self._iter_component_entries(component_type): + if entry.name in collected_components: + self._log_duplicate_component(component_type, entry.name) + continue + + if component_type == ComponentType.ACTION: + collected_components[entry.name] = self._build_action_info(entry) # type: ignore[arg-type] + elif component_type == ComponentType.COMMAND: + collected_components[entry.name] = self._build_command_info(entry) # type: ignore[arg-type] + elif component_type == ComponentType.TOOL: + collected_components[entry.name] = self._build_tool_info(entry) # type: ignore[arg-type] + return collected_components + + @staticmethod + def _extract_stream_id_from_action_kwargs(kwargs: Dict[str, Any]) -> str: + """从旧 ActionManager 参数中提取聊天流 ID。 + + Args: + kwargs: 旧动作执行器收到的关键字参数。 + + Returns: + str: 提取出的 ``stream_id``。 + """ + + chat_stream = kwargs.get("chat_stream") + if chat_stream is not None: + try: + return str(chat_stream.session_id) + except AttributeError: + pass + + return str(kwargs.get("stream_id", "") or "") + + @staticmethod + def _build_action_executor(supervisor: "PluginSupervisor", plugin_id: str, component_name: str) -> ActionExecutor: + """构造动作执行 RPC 闭包。 + + Args: + supervisor: 负责该组件的监督器。 + plugin_id: 插件 ID。 + component_name: 组件名称。 + + Returns: + ActionExecutor: 兼容旧 Planner 的异步执行器。 + """ + + async def _executor(**kwargs: Any) -> tuple[bool, str]: + """将核心动作调用桥接到插件运行时。 + + Args: + **kwargs: 旧 ActionManager 传入的上下文参数。 + + Returns: + tuple[bool, str]: ``(是否成功, 结果说明)``。 + """ + + invoke_args: Dict[str, Any] = {} + action_data = kwargs.get("action_data") + if isinstance(action_data, dict): + invoke_args.update(action_data) + + stream_id = ComponentQueryService._extract_stream_id_from_action_kwargs(kwargs) + invoke_args["action_data"] = action_data if isinstance(action_data, dict) else {} + invoke_args["stream_id"] = stream_id + invoke_args["chat_id"] = stream_id + invoke_args["reasoning"] = str(kwargs.get("action_reasoning", "") or "") + + if (thinking_id := kwargs.get("thinking_id")) is not None: + invoke_args["thinking_id"] = str(thinking_id) + if isinstance(kwargs.get("cycle_timers"), dict): + invoke_args["cycle_timers"] = kwargs["cycle_timers"] + if isinstance(kwargs.get("plugin_config"), dict): + invoke_args["plugin_config"] = kwargs["plugin_config"] + if isinstance(kwargs.get("log_prefix"), str): + invoke_args["log_prefix"] = kwargs["log_prefix"] + if isinstance(kwargs.get("shutting_down"), bool): + invoke_args["shutting_down"] = kwargs["shutting_down"] + + try: + response = await supervisor.invoke_plugin( + method="plugin.invoke_action", + plugin_id=plugin_id, + component_name=component_name, + args=invoke_args, + timeout_ms=30000, + ) + except Exception as exc: + logger.error(f"运行时 Action {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) + return False, str(exc) + + payload = response.payload if isinstance(response.payload, dict) else {} + success = bool(payload.get("success", False)) + result = payload.get("result") + if isinstance(result, (list, tuple)): + if len(result) >= 2: + return bool(result[0]), "" if result[1] is None else str(result[1]) + if len(result) == 1: + return bool(result[0]), "" + if success: + return True, "" if result is None else str(result) + return False, "" if result is None else str(result) + + return _executor + + @staticmethod + def _build_command_executor( + supervisor: "PluginSupervisor", + plugin_id: str, + component_name: str, + metadata: Dict[str, Any], + ) -> CommandExecutor: + """构造命令执行 RPC 闭包。 + + Args: + supervisor: 负责该组件的监督器。 + plugin_id: 插件 ID。 + component_name: 组件名称。 + metadata: 命令组件元数据。 + + Returns: + CommandExecutor: 兼容旧消息命令链的执行器。 + """ + + async def _executor(**kwargs: Any) -> tuple[bool, Optional[str], bool]: + """将核心命令调用桥接到插件运行时。 + + Args: + **kwargs: 命令执行上下文参数。 + + Returns: + tuple[bool, Optional[str], bool]: ``(是否成功, 返回文本, 是否拦截后续消息)``。 + """ + + message = kwargs.get("message") + matched_groups = kwargs.get("matched_groups") + plugin_config = kwargs.get("plugin_config") + invoke_args: Dict[str, Any] = { + "text": str(getattr(message, "processed_plain_text", "") or ""), + "stream_id": str(getattr(message, "session_id", "") or ""), + "matched_groups": matched_groups if isinstance(matched_groups, dict) else {}, + } + if isinstance(plugin_config, dict): + invoke_args["plugin_config"] = plugin_config + + try: + response = await supervisor.invoke_plugin( + method="plugin.invoke_command", + plugin_id=plugin_id, + component_name=component_name, + args=invoke_args, + timeout_ms=30000, + ) + except Exception as exc: + logger.error(f"运行时 Command {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) + return False, str(exc), True + + payload = response.payload if isinstance(response.payload, dict) else {} + success = bool(payload.get("success", False)) + result = payload.get("result") + intercept = bool(metadata.get("intercept_message_level", 0)) + response_text: Optional[str] + + if isinstance(result, (list, tuple)) and len(result) >= 3: + response_text = None if result[1] is None else str(result[1]) + intercept = bool(result[2]) + else: + response_text = None if result is None else str(result) + + return success, response_text, intercept + + return _executor + + @staticmethod + def _build_tool_executor(supervisor: "PluginSupervisor", plugin_id: str, component_name: str) -> ToolExecutor: + """构造工具执行 RPC 闭包。 + + Args: + supervisor: 负责该组件的监督器。 + plugin_id: 插件 ID。 + component_name: 组件名称。 + + Returns: + ToolExecutor: 兼容旧 ToolExecutor 的异步执行器。 + """ + + async def _executor(function_args: Dict[str, Any]) -> Any: + """将核心工具调用桥接到插件运行时。 + + Args: + function_args: 工具调用参数。 + + Returns: + Any: 插件工具返回结果;若结果不是字典,则会包装为 ``{"content": ...}``。 + """ + + try: + response = await supervisor.invoke_plugin( + method="plugin.invoke_tool", + plugin_id=plugin_id, + component_name=component_name, + args=function_args, + timeout_ms=30000, + ) + except Exception as exc: + logger.error(f"运行时 Tool {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) + return {"content": f"工具 {component_name} 执行失败: {exc}"} + + payload = response.payload if isinstance(response.payload, dict) else {} + result = payload.get("result") + if isinstance(result, dict): + return result + return {"content": "" if result is None else str(result)} + + return _executor + + def get_action_info(self, name: str) -> Optional[ActionInfo]: + """获取指定动作的信息。 + + Args: + name: 动作名称。 + + Returns: + Optional[ActionInfo]: 匹配到的动作信息。 + """ + + matched_entry = self._get_unique_component_entry(ComponentType.ACTION, name) + if matched_entry is None: + return None + _supervisor, entry = matched_entry + return self._build_action_info(entry) # type: ignore[arg-type] + + def get_action_executor(self, name: str) -> Optional[ActionExecutor]: + """获取指定动作的执行器。 + + Args: + name: 动作名称。 + + Returns: + Optional[ActionExecutor]: 运行时 RPC 执行闭包。 + """ + + matched_entry = self._get_unique_component_entry(ComponentType.ACTION, name) + if matched_entry is None: + return None + supervisor, entry = matched_entry + return self._build_action_executor(supervisor, entry.plugin_id, entry.name) + + def get_default_actions(self) -> Dict[str, ActionInfo]: + """获取当前默认启用的动作集合。 + + Returns: + Dict[str, ActionInfo]: 动作名到动作信息的映射。 + """ + + action_infos = self._collect_unique_component_infos(ComponentType.ACTION) + return {name: info for name, info in action_infos.items() if isinstance(info, ActionInfo) and info.enabled} + + def find_command_by_text(self, text: str) -> Optional[Tuple[CommandExecutor, dict, CommandInfo]]: + """根据文本查找匹配的命令。 + + Args: + text: 待匹配的文本内容。 + + Returns: + Optional[Tuple[CommandExecutor, dict, CommandInfo]]: 匹配结果。 + """ + + for supervisor in self._iter_supervisors(): + match_result = supervisor.component_registry.find_command_by_text(text) + if match_result is None: + continue + + entry, matched_groups = match_result + command_info = self._build_command_info(entry) # type: ignore[arg-type] + command_executor = self._build_command_executor( + supervisor, + entry.plugin_id, + entry.name, + dict(entry.metadata), + ) + return command_executor, matched_groups, command_info + return None + + def get_tool_info(self, name: str) -> Optional[ToolInfo]: + """获取指定工具的信息。 + + Args: + name: 工具名称。 + + Returns: + Optional[ToolInfo]: 匹配到的工具信息。 + """ + + matched_entry = self._get_unique_component_entry(ComponentType.TOOL, name) + if matched_entry is None: + return None + _supervisor, entry = matched_entry + return self._build_tool_info(entry) # type: ignore[arg-type] + + def get_tool_executor(self, name: str) -> Optional[ToolExecutor]: + """获取指定工具的执行器。 + + Args: + name: 工具名称。 + + Returns: + Optional[ToolExecutor]: 运行时 RPC 执行闭包。 + """ + + matched_entry = self._get_unique_component_entry(ComponentType.TOOL, name) + if matched_entry is None: + return None + supervisor, entry = matched_entry + return self._build_tool_executor(supervisor, entry.plugin_id, entry.name) + + def get_llm_available_tools(self) -> Dict[str, ToolInfo]: + """获取当前可供 LLM 选择的工具集合。 + + Returns: + Dict[str, ToolInfo]: 工具名到工具信息的映射。 + """ + + tool_infos = self._collect_unique_component_infos(ComponentType.TOOL) + return {name: info for name, info in tool_infos.items() if isinstance(info, ToolInfo) and info.enabled} + + def get_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]: + """获取某类组件的全部信息。 + + Args: + component_type: 组件类型。 + + Returns: + Dict[str, ComponentInfo]: 组件名到组件信息的映射。 + """ + + return self._collect_unique_component_infos(component_type) + + def get_plugin_config(self, plugin_name: str) -> Optional[dict]: + """读取指定插件的配置文件内容。 + + Args: + plugin_name: 插件名称。 + + Returns: + Optional[dict]: 读取成功时返回配置字典;未找到时返回 ``None``。 + """ + + runtime_manager = self._get_runtime_manager() + try: + supervisor = runtime_manager._get_supervisor_for_plugin(plugin_name) + except RuntimeError as exc: + logger.error(f"读取插件配置失败: {exc}") + return None + + if supervisor is None: + return None + + try: + return runtime_manager._load_plugin_config_for_supervisor(supervisor, plugin_name) + except Exception as exc: + logger.error(f"读取插件 {plugin_name} 配置失败: {exc}", exc_info=True) + return None + + +component_query_service = ComponentQueryService() diff --git a/src/plugin_runtime/host/component_registry.py b/src/plugin_runtime/host/component_registry.py index 08b0ea3b..1c073490 100644 --- a/src/plugin_runtime/host/component_registry.py +++ b/src/plugin_runtime/host/component_registry.py @@ -31,12 +31,12 @@ class ComponentTypes(str, Enum): class StatusDict(TypedDict): total: int - ACTION: int - COMMAND: int - TOOL: int - EVENT_HANDLER: int - HOOK_HANDLER: int - MESSAGE_GATEWAY: int + action: int + command: int + tool: int + event_handler: int + hook_handler: int + message_gateway: int plugins: int @@ -185,6 +185,23 @@ class ComponentRegistry: # 按插件索引 self._by_plugin: Dict[str, List[ComponentEntry]] = {} + @staticmethod + def _normalize_component_type(component_type: str) -> ComponentTypes: + """规范化组件类型输入。 + + Args: + component_type: 原始组件类型字符串。 + + Returns: + ComponentTypes: 规范化后的组件类型枚举。 + + Raises: + ValueError: 当组件类型不受支持时抛出。 + """ + + normalized_value = str(component_type or "").strip().upper() + return ComponentTypes(normalized_value) + def clear(self) -> None: """清空全部组件注册状态。""" self._components.clear() @@ -205,18 +222,19 @@ class ComponentRegistry: success (bool): 是否成功注册(失败原因通常是组件类型无效) """ try: - if component_type == ComponentTypes.ACTION: - comp = ActionEntry(name, component_type, plugin_id, metadata) - elif component_type == ComponentTypes.COMMAND: - comp = CommandEntry(name, component_type, plugin_id, metadata) - elif component_type == ComponentTypes.TOOL: - comp = ToolEntry(name, component_type, plugin_id, metadata) - elif component_type == ComponentTypes.EVENT_HANDLER: - comp = EventHandlerEntry(name, component_type, plugin_id, metadata) - elif component_type == ComponentTypes.HOOK_HANDLER: - comp = HookHandlerEntry(name, component_type, plugin_id, metadata) - elif component_type == ComponentTypes.MESSAGE_GATEWAY: - comp = MessageGatewayEntry(name, component_type, plugin_id, metadata) + normalized_type = self._normalize_component_type(component_type) + if normalized_type == ComponentTypes.ACTION: + comp = ActionEntry(name, normalized_type.value, plugin_id, metadata) + elif normalized_type == ComponentTypes.COMMAND: + comp = CommandEntry(name, normalized_type.value, plugin_id, metadata) + elif normalized_type == ComponentTypes.TOOL: + comp = ToolEntry(name, normalized_type.value, plugin_id, metadata) + elif normalized_type == ComponentTypes.EVENT_HANDLER: + comp = EventHandlerEntry(name, normalized_type.value, plugin_id, metadata) + elif normalized_type == ComponentTypes.HOOK_HANDLER: + comp = HookHandlerEntry(name, normalized_type.value, plugin_id, metadata) + elif normalized_type == ComponentTypes.MESSAGE_GATEWAY: + comp = MessageGatewayEntry(name, normalized_type.value, plugin_id, metadata) else: raise ValueError(f"组件类型 {component_type} 不存在") except ValueError: @@ -304,6 +322,20 @@ class ComponentRegistry: comp.enabled = enabled return True + def set_component_enabled(self, full_name: str, enabled: bool, session_id: Optional[str] = None) -> bool: + """设置指定组件的启用状态。 + + Args: + full_name: 组件全名。 + enabled: 目标启用状态。 + session_id: 可选的会话 ID,仅对该会话生效。 + + Returns: + bool: 是否设置成功。 + """ + + return self.toggle_component_status(full_name, enabled, session_id=session_id) + def toggle_plugin_status(self, plugin_id: str, enabled: bool, session_id: Optional[str] = None) -> int: """批量启用或禁用某插件的所有组件。 @@ -348,7 +380,7 @@ class ComponentRegistry: components (List[ComponentEntry]): 组件条目列表 """ try: - comp_type = ComponentTypes(component_type) + comp_type = self._normalize_component_type(component_type) except ValueError: logger.error(f"组件类型 {component_type} 不存在") raise @@ -536,6 +568,6 @@ class ComponentRegistry: """ stats: StatusDict = {"total": len(self._components)} # type: ignore for comp_type, type_dict in self._by_type.items(): - stats[comp_type.value] = len(type_dict) + stats[comp_type.value.lower()] = len(type_dict) stats["plugins"] = len(self._by_plugin) return stats diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 3588934e..4a9885f8 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -9,8 +9,6 @@ import sys from src.common.logger import get_logger from src.config.config import global_config -from src.core.component_registry import component_registry as core_component_registry -from src.core.types import ActionActivationType, ActionInfo, ComponentType as CoreComponentType from src.platform_io import DriverKind, InboundMessageEnvelope, RouteBinding, RouteKey, get_platform_io_manager from src.platform_io.drivers import PluginPlatformDriver from src.platform_io.route_key_factory import RouteKeyFactory @@ -107,7 +105,6 @@ class PluginRunnerSupervisor: self._runner_process: Optional[asyncio.subprocess.Process] = None self._registered_plugins: Dict[str, RegisterPluginPayload] = {} self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {} - self._mirrored_core_actions: Dict[str, List[str]] = {} self._runner_ready_events: asyncio.Event = asyncio.Event() self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload() self._health_task: Optional[asyncio.Task[None]] = None @@ -510,7 +507,6 @@ class PluginRunnerSupervisor: except Exception as exc: return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) - self._remove_core_action_mirrors(payload.plugin_id) self._component_registry.remove_components_by_plugin(payload.plugin_id) await self._unregister_all_message_gateway_drivers_for_plugin(payload.plugin_id) @@ -520,7 +516,6 @@ class PluginRunnerSupervisor: ) self._registered_plugins[payload.plugin_id] = payload self._message_gateway_states[payload.plugin_id] = {} - self._mirror_runtime_actions_to_core_registry(payload) return envelope.make_response( payload={ @@ -550,7 +545,6 @@ class PluginRunnerSupervisor: removed_components = self._component_registry.remove_components_by_plugin(payload.plugin_id) self._authorization.revoke_permission_token(payload.plugin_id) removed_registration = self._registered_plugins.pop(payload.plugin_id, None) is not None - self._remove_core_action_mirrors(payload.plugin_id) await self._unregister_all_message_gateway_drivers_for_plugin(payload.plugin_id) self._message_gateway_states.pop(payload.plugin_id, None) @@ -564,236 +558,6 @@ class PluginRunnerSupervisor: } ) - @staticmethod - def _coerce_action_activation_type(raw_value: Any) -> ActionActivationType: - """将运行时 Action 激活类型转换为旧核心枚举。 - - Args: - raw_value: 插件运行时声明中的激活类型值。 - - Returns: - ActionActivationType: 可供旧 Planner 使用的激活类型枚举。 - """ - normalized_value = str(raw_value or ActionActivationType.ALWAYS.value).strip().lower() - try: - return ActionActivationType(normalized_value) - except ValueError: - return ActionActivationType.ALWAYS - - @staticmethod - def _coerce_float(value: Any, default: float = 0.0) -> float: - """将任意输入尽量转换为浮点数。 - - Args: - value: 待转换的值。 - default: 转换失败时使用的默认值。 - - Returns: - float: 转换结果。 - """ - try: - return float(value) - except (TypeError, ValueError): - return default - - @staticmethod - def _build_core_action_info(plugin_id: str, component_name: str, metadata: Dict[str, Any]) -> ActionInfo: - """将运行时 Action 元数据映射为旧核心 ActionInfo。 - - Args: - plugin_id: 插件 ID。 - component_name: 组件名称。 - metadata: 运行时组件元数据。 - - Returns: - ActionInfo: 兼容旧 Planner 的动作定义。 - """ - activation_keywords = [ - str(item) - for item in (metadata.get("activation_keywords") or []) - if item is not None and str(item).strip() - ] - action_require = [ - str(item) - for item in (metadata.get("action_require") or []) - if item is not None and str(item).strip() - ] - associated_types = [ - str(item) - for item in (metadata.get("associated_types") or []) - if item is not None and str(item).strip() - ] - raw_action_parameters = metadata.get("action_parameters") or {} - action_parameters = { - str(param_name): str(param_description) - for param_name, param_description in raw_action_parameters.items() - } if isinstance(raw_action_parameters, dict) else {} - - return ActionInfo( - name=component_name, - component_type=CoreComponentType.ACTION, - description=str(metadata.get("description", "") or ""), - enabled=bool(metadata.get("enabled", True)), - plugin_name=plugin_id, - metadata=dict(metadata), - action_parameters=action_parameters, - action_require=action_require, - associated_types=associated_types, - activation_type=PluginRunnerSupervisor._coerce_action_activation_type(metadata.get("activation_type")), - random_activation_probability=PluginRunnerSupervisor._coerce_float( - metadata.get("activation_probability"), - 0.0, - ), - activation_keywords=activation_keywords, - parallel_action=bool(metadata.get("parallel_action", False)), - ) - - @staticmethod - def _extract_stream_id_from_action_kwargs(kwargs: Dict[str, Any]) -> str: - """从旧 ActionManager 传入参数中提取聊天流 ID。 - - Args: - kwargs: 旧动作执行器收到的关键字参数。 - - Returns: - str: 可用于新运行时 Action 的 ``stream_id``。 - """ - chat_stream = kwargs.get("chat_stream") - if chat_stream is not None: - try: - return str(chat_stream.session_id) - except AttributeError: - pass - - raw_stream_id = kwargs.get("stream_id", "") - return str(raw_stream_id or "") - - def _build_runtime_action_executor( - self, - plugin_id: str, - component_name: str, - ) -> Any: - """构造一个转发到 plugin runtime 的旧核心 Action 执行器。 - - Args: - plugin_id: 目标插件 ID。 - component_name: 目标 Action 组件名称。 - - Returns: - Callable[..., Coroutine[Any, Any, tuple[bool, str]]]: 兼容旧 ActionManager 的执行器。 - """ - - async def _executor(**kwargs: Any) -> tuple[bool, str]: - """将旧 Planner 的动作调用桥接到 plugin runtime。 - - Args: - **kwargs: 旧 ActionManager 传入的运行时上下文参数。 - - Returns: - tuple[bool, str]: ``(是否成功, 动作说明)``。 - """ - invoke_args: Dict[str, Any] = {} - action_data = kwargs.get("action_data") - if isinstance(action_data, dict): - invoke_args.update(action_data) - - stream_id = self._extract_stream_id_from_action_kwargs(kwargs) - invoke_args["action_data"] = action_data if isinstance(action_data, dict) else {} - invoke_args["stream_id"] = stream_id - invoke_args["chat_id"] = stream_id - invoke_args["reasoning"] = str(kwargs.get("action_reasoning", "") or "") - - thinking_id = kwargs.get("thinking_id") - if thinking_id is not None: - invoke_args["thinking_id"] = str(thinking_id) - - cycle_timers = kwargs.get("cycle_timers") - if isinstance(cycle_timers, dict): - invoke_args["cycle_timers"] = cycle_timers - - plugin_config = kwargs.get("plugin_config") - if isinstance(plugin_config, dict): - invoke_args["plugin_config"] = plugin_config - - log_prefix = kwargs.get("log_prefix") - if isinstance(log_prefix, str): - invoke_args["log_prefix"] = log_prefix - - shutting_down = kwargs.get("shutting_down") - if isinstance(shutting_down, bool): - invoke_args["shutting_down"] = shutting_down - - try: - response = await self.invoke_plugin( - method="plugin.invoke_action", - plugin_id=plugin_id, - component_name=component_name, - args=invoke_args, - timeout_ms=30000, - ) - except Exception as exc: - logger.error(f"运行时 Action {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) - return False, str(exc) - - payload = response.payload if isinstance(response.payload, dict) else {} - success = bool(payload.get("success", False)) - result = payload.get("result") - - if isinstance(result, (list, tuple)): - if len(result) >= 2: - return bool(result[0]), "" if result[1] is None else str(result[1]) - if len(result) == 1: - return bool(result[0]), "" - - if success: - return True, "" if result is None else str(result) - return False, "" if result is None else str(result) - - return _executor - - def _mirror_runtime_actions_to_core_registry(self, payload: RegisterPluginPayload) -> None: - """将 plugin runtime 中声明的 Action 镜像到旧核心注册表。 - - Args: - payload: 当前插件的注册载荷。 - """ - mirrored_action_names: List[str] = [] - - for component in payload.components: - if str(component.component_type).upper() != CoreComponentType.ACTION.name: - continue - - action_info = self._build_core_action_info( - plugin_id=payload.plugin_id, - component_name=component.name, - metadata=component.metadata, - ) - action_executor = self._build_runtime_action_executor( - plugin_id=payload.plugin_id, - component_name=component.name, - ) - registered = core_component_registry.register_action(action_info, action_executor) - if not registered: - logger.warning( - f"运行时 Action {payload.plugin_id}.{component.name} 无法镜像到旧核心注册表," - "可能与现有 Action 重名" - ) - continue - mirrored_action_names.append(component.name) - - if mirrored_action_names: - self._mirrored_core_actions[payload.plugin_id] = mirrored_action_names - - def _remove_core_action_mirrors(self, plugin_id: str) -> None: - """移除某个插件镜像到旧核心注册表的所有 Action。 - - Args: - plugin_id: 目标插件 ID。 - """ - mirrored_action_names = self._mirrored_core_actions.pop(plugin_id, []) - for action_name in mirrored_action_names: - core_component_registry.remove_action(action_name) - @staticmethod def _build_message_gateway_driver_id(plugin_id: str, gateway_name: str) -> str: """构造消息网关驱动 ID。 @@ -1407,8 +1171,6 @@ class PluginRunnerSupervisor: def _clear_runner_state(self) -> None: """清理当前 Runner 对应的 Host 侧注册状态。""" - for plugin_id in list(self._mirrored_core_actions.keys()): - self._remove_core_action_mirrors(plugin_id) self._authorization.clear() self._component_registry.clear() self._registered_plugins.clear() diff --git a/src/services/send_service.py b/src/services/send_service.py index 7903cdeb..54f2a9de 100644 --- a/src/services/send_service.py +++ b/src/services/send_service.py @@ -8,10 +8,9 @@ 3. 具体走插件链还是 legacy 旧链,由 Platform IO 内部统一决策。 """ +from copy import deepcopy from typing import Any, Dict, List, Optional -from maim_message import Seg - import asyncio import base64 import hashlib @@ -28,6 +27,7 @@ from src.common.data_models.message_component_data_model import ( AtComponent, DictComponent, EmojiComponent, + ForwardNodeComponent, ImageComponent, MessageSequence, ReplyComponent, @@ -72,88 +72,163 @@ def _inherit_platform_io_route_metadata(target_stream: BotChatSession) -> Dict[s if normalized_value: inherited_metadata[key] = value - if target_stream.group_id: - normalized_group_id = str(target_stream.group_id).strip() - if normalized_group_id: - inherited_metadata["platform_io_target_group_id"] = normalized_group_id + if target_stream.group_id and (normalized_group_id := str(target_stream.group_id).strip()): + inherited_metadata["platform_io_target_group_id"] = normalized_group_id - if target_stream.user_id: - normalized_user_id = str(target_stream.user_id).strip() - if normalized_user_id: - inherited_metadata["platform_io_target_user_id"] = normalized_user_id + if target_stream.user_id and (normalized_user_id := str(target_stream.user_id).strip()): + inherited_metadata["platform_io_target_user_id"] = normalized_user_id return inherited_metadata -def _build_component_from_seg(message_segment: Seg) -> StandardMessageComponents: - """将单个消息段转换为内部消息组件。 +def _build_binary_component_from_base64(component_type: str, raw_data: str) -> StandardMessageComponents: + """根据 Base64 数据构造二进制消息组件。 Args: - message_segment: 待转换的消息段。 + component_type: 组件类型名称。 + raw_data: Base64 编码后的二进制数据。 Returns: StandardMessageComponents: 转换后的内部消息组件。 + + Raises: + ValueError: 当组件类型不受支持时抛出。 """ - segment_type = str(message_segment.type or "").strip().lower() - segment_data = message_segment.data + binary_data = base64.b64decode(raw_data) + binary_hash = hashlib.sha256(binary_data).hexdigest() - if segment_type == "text": - return TextComponent(text=str(segment_data or "")) - - if segment_type == "image": - image_binary = base64.b64decode(str(segment_data or "")) - return ImageComponent( - binary_hash=hashlib.sha256(image_binary).hexdigest(), - binary_data=image_binary, - ) - - if segment_type == "emoji": - emoji_binary = base64.b64decode(str(segment_data or "")) - return EmojiComponent( - binary_hash=hashlib.sha256(emoji_binary).hexdigest(), - binary_data=emoji_binary, - ) - - if segment_type == "voice": - voice_binary = base64.b64decode(str(segment_data or "")) - return VoiceComponent( - binary_hash=hashlib.sha256(voice_binary).hexdigest(), - binary_data=voice_binary, - ) - - if segment_type == "at": - return AtComponent(target_user_id=str(segment_data or "")) - - if segment_type == "reply": - return ReplyComponent(target_message_id=str(segment_data or "")) - - if segment_type == "dict" and isinstance(segment_data, dict): - return DictComponent(data=segment_data) - - return DictComponent(data={"type": segment_type, "data": segment_data}) + if component_type == "image": + return ImageComponent(binary_hash=binary_hash, binary_data=binary_data) + if component_type == "emoji": + return EmojiComponent(binary_hash=binary_hash, binary_data=binary_data) + if component_type == "voice": + return VoiceComponent(binary_hash=binary_hash, binary_data=binary_data) + raise ValueError(f"不支持的二进制组件类型: {component_type}") -def _build_message_sequence_from_seg(message_segment: Seg) -> MessageSequence: - """将消息段转换为内部消息组件序列。 +def _build_message_sequence_from_custom_message( + message_type: str, + content: str | Dict[str, Any], +) -> MessageSequence: + """根据自定义消息类型构造内部消息组件序列。 Args: - message_segment: 待转换的消息段。 + message_type: 自定义消息类型。 + content: 自定义消息内容。 Returns: MessageSequence: 转换后的消息组件序列。 """ - if str(message_segment.type or "").strip().lower() == "seglist": - raw_segments = message_segment.data - if not isinstance(raw_segments, list): - raise ValueError("seglist 类型的消息段数据必须是列表") - components = [ - _build_component_from_seg(item) - for item in raw_segments - if isinstance(item, Seg) - ] - return MessageSequence(components=components) + normalized_type = message_type.strip().lower() - return MessageSequence(components=[_build_component_from_seg(message_segment)]) + if normalized_type == "text": + return MessageSequence(components=[TextComponent(text=str(content))]) + + if normalized_type in {"image", "emoji", "voice"}: + return MessageSequence( + components=[_build_binary_component_from_base64(normalized_type, str(content))] + ) + + if normalized_type == "at": + return MessageSequence(components=[AtComponent(target_user_id=str(content))]) + + if normalized_type == "reply": + return MessageSequence(components=[ReplyComponent(target_message_id=str(content))]) + + if normalized_type == "dict" and isinstance(content, dict): + return MessageSequence(components=[DictComponent(data=deepcopy(content))]) + + return MessageSequence( + components=[ + DictComponent( + data={ + "type": normalized_type, + "data": deepcopy(content), + } + ) + ] + ) + + +def _clone_message_sequence(message_sequence: MessageSequence) -> MessageSequence: + """复制消息组件序列,避免原对象被发送流程修改。 + + Args: + message_sequence: 原始消息组件序列。 + + Returns: + MessageSequence: 深拷贝后的消息组件序列。 + """ + return deepcopy(message_sequence) + + +def _detect_outbound_message_flags(message_sequence: MessageSequence) -> Dict[str, bool]: + """根据消息组件序列推断出站消息标记。 + + Args: + message_sequence: 待发送的消息组件序列。 + + Returns: + Dict[str, bool]: 包含 ``is_emoji``、``is_picture``、``is_command`` 的标记字典。 + """ + if len(message_sequence.components) != 1: + return { + "is_emoji": False, + "is_picture": False, + "is_command": False, + } + + component = message_sequence.components[0] + is_command = False + if isinstance(component, DictComponent) and isinstance(component.data, dict): + is_command = str(component.data.get("type") or "").strip().lower() == "command" + + return { + "is_emoji": isinstance(component, EmojiComponent), + "is_picture": isinstance(component, ImageComponent), + "is_command": is_command, + } + + +def _describe_message_sequence(message_sequence: MessageSequence) -> str: + """生成消息组件序列的简短描述文本。 + + Args: + message_sequence: 待描述的消息组件序列。 + + Returns: + str: 适用于日志的简短类型描述。 + """ + if len(message_sequence.components) != 1: + return "message_sequence" + + component = message_sequence.components[0] + if isinstance(component, DictComponent) and isinstance(component.data, dict): + custom_type = str(component.data.get("type") or "").strip() + return custom_type or "dict" + + if isinstance(component, TextComponent): + return component.format_name + + if isinstance(component, ImageComponent): + return component.format_name + + if isinstance(component, EmojiComponent): + return component.format_name + + if isinstance(component, VoiceComponent): + return component.format_name + + if isinstance(component, AtComponent): + return component.format_name + + if isinstance(component, ReplyComponent): + return component.format_name + + if isinstance(component, ForwardNodeComponent): + return component.format_name + + return "unknown" def _build_processed_plain_text(message: SessionMessage) -> str: @@ -204,7 +279,7 @@ def _build_processed_plain_text(message: SessionMessage) -> str: def _build_outbound_session_message( - message_segment: Seg, + message_sequence: MessageSequence, stream_id: str, display_message: str = "", reply_message: Optional[MaiMessage] = None, @@ -213,7 +288,7 @@ def _build_outbound_session_message( """根据目标会话构建待发送的内部消息对象。 Args: - message_segment: 待发送的消息段。 + message_sequence: 待发送的消息组件序列。 stream_id: 目标会话 ID。 display_message: 用于界面展示的文本内容。 reply_message: 被回复的锚点消息。 @@ -268,13 +343,14 @@ def _build_outbound_session_message( group_info=group_info, additional_config=additional_config, ) - outbound_message.raw_message = _build_message_sequence_from_seg(message_segment) + outbound_message.raw_message = _clone_message_sequence(message_sequence) outbound_message.session_id = target_stream.session_id outbound_message.display_message = display_message outbound_message.reply_to = anchor_message.message_id if anchor_message is not None else None - outbound_message.is_emoji = message_segment.type == "emoji" - outbound_message.is_picture = message_segment.type == "image" - outbound_message.is_command = message_segment.type == "command" + message_flags = _detect_outbound_message_flags(outbound_message.raw_message) + outbound_message.is_emoji = message_flags["is_emoji"] + outbound_message.is_picture = message_flags["is_picture"] + outbound_message.is_command = message_flags["is_command"] outbound_message.initialized = True return outbound_message @@ -467,7 +543,7 @@ async def send_session_message( async def _send_to_target( - message_segment: Seg, + message_sequence: MessageSequence, stream_id: str, display_message: str = "", typing: bool = False, @@ -480,7 +556,7 @@ async def _send_to_target( """向指定目标构建并发送消息。 Args: - message_segment: 待发送的消息段。 + message_sequence: 待发送的消息组件序列。 stream_id: 目标会话 ID。 display_message: 用于界面展示的文本内容。 typing: 是否显示输入中状态。 @@ -499,10 +575,10 @@ async def _send_to_target( return False if show_log: - logger.debug(f"[SendService] 发送{message_segment.type}消息到 {stream_id}") + logger.debug(f"[SendService] 发送{_describe_message_sequence(message_sequence)}消息到 {stream_id}") outbound_message = _build_outbound_session_message( - message_segment=message_segment, + message_sequence=message_sequence, stream_id=stream_id, display_message=display_message, reply_message=reply_message, @@ -555,7 +631,7 @@ async def text_to_stream( bool: 发送成功时返回 ``True``。 """ return await _send_to_target( - message_segment=Seg(type="text", data=text), + message_sequence=MessageSequence(components=[TextComponent(text=text)]), stream_id=stream_id, display_message="", typing=typing, @@ -586,7 +662,7 @@ async def emoji_to_stream( bool: 发送成功时返回 ``True``。 """ return await _send_to_target( - message_segment=Seg(type="emoji", data=emoji_base64), + message_sequence=_build_message_sequence_from_custom_message("emoji", emoji_base64), stream_id=stream_id, display_message="", typing=False, @@ -616,7 +692,7 @@ async def image_to_stream( bool: 发送成功时返回 ``True``。 """ return await _send_to_target( - message_segment=Seg(type="image", data=image_base64), + message_sequence=_build_message_sequence_from_custom_message("image", image_base64), stream_id=stream_id, display_message="", typing=False, @@ -654,7 +730,7 @@ async def custom_to_stream( bool: 发送成功时返回 ``True``。 """ return await _send_to_target( - message_segment=Seg(type=message_type, data=content), # type: ignore[arg-type] + message_sequence=_build_message_sequence_from_custom_message(message_type, content), stream_id=stream_id, display_message=display_message, typing=typing, @@ -688,28 +764,15 @@ async def custom_reply_set_to_stream( show_log: 是否输出发送日志。 Returns: - bool: 全部组件发送成功时返回 ``True``。 + bool: 发送成功时返回 ``True``。 """ - success = True - for component in reply_set.components: - if isinstance(component, DictComponent): - message_seg = Seg(type="dict", data=component.data) # type: ignore[arg-type] - else: - message_seg = await component.to_seg() - - status = await _send_to_target( - message_segment=message_seg, - stream_id=stream_id, - display_message=display_message, - typing=typing, - reply_message=reply_message, - set_reply=set_reply, - storage_message=storage_message, - show_log=show_log, - ) - if not status: - success = False - logger.error(f"[SendService] 发送消息组件失败,组件类型:{type(component).__name__}") - set_reply = False - - return success + return await _send_to_target( + message_sequence=reply_set, + stream_id=stream_id, + display_message=display_message, + typing=typing, + reply_message=reply_message, + set_reply=set_reply, + storage_message=storage_message, + show_log=show_log, + )