diff --git a/pytests/test_adapter_runtime_state.py b/pytests/test_adapter_runtime_state.py deleted file mode 100644 index e82f4c8c..00000000 --- a/pytests/test_adapter_runtime_state.py +++ /dev/null @@ -1,162 +0,0 @@ -"""适配器运行时状态同步测试。""" - -from typing import Any, Dict - -import pytest - -from src.platform_io.manager import PlatformIOManager -from src.platform_io.types import RouteKey -from src.plugin_runtime.host.supervisor import PluginSupervisor -from src.plugin_runtime.protocol.envelope import ( - AdapterDeclarationPayload, - Envelope, - MessageType, -) - - -def _make_request(plugin_id: str, payload: Dict[str, Any]) -> Envelope: - """构造一个适配器状态更新 RPC 请求。 - - Args: - plugin_id: 目标适配器插件 ID。 - payload: 请求载荷。 - - Returns: - Envelope: 标准 RPC 请求信封。 - """ - return Envelope( - request_id=1, - message_type=MessageType.REQUEST, - method="host.update_adapter_state", - plugin_id=plugin_id, - payload=payload, - ) - - -@pytest.mark.asyncio -async def test_adapter_runtime_state_binds_and_unbinds_routes(monkeypatch: pytest.MonkeyPatch) -> None: - """连接建立后应绑定路由,断开后应撤销路由。""" - import src.plugin_runtime.host.supervisor as supervisor_module - - platform_io_manager = PlatformIOManager() - monkeypatch.setattr(supervisor_module, "get_platform_io_manager", lambda: platform_io_manager) - - supervisor = PluginSupervisor(plugin_dirs=[]) - adapter = AdapterDeclarationPayload(platform="qq", protocol="napcat") - await supervisor._register_adapter_driver("napcat_adapter_builtin", adapter) - - response = await supervisor._handle_update_adapter_state( - _make_request( - "napcat_adapter_builtin", - { - "connected": True, - "account_id": "10001", - "scope": "", - "metadata": {}, - }, - ) - ) - - assert response.error is None - assert response.payload["accepted"] is True - assert ( - platform_io_manager.route_table.get_active_binding( - RouteKey(platform="qq", account_id="10001"), - exact_only=True, - ).driver_id - == "adapter:napcat_adapter_builtin" - ) - assert ( - platform_io_manager.route_table.get_active_binding( - RouteKey(platform="qq"), - exact_only=True, - ).driver_id - == "adapter:napcat_adapter_builtin" - ) - - response = await supervisor._handle_update_adapter_state( - _make_request( - "napcat_adapter_builtin", - { - "connected": False, - "account_id": "", - "scope": "", - "metadata": {}, - }, - ) - ) - - assert response.error is None - assert response.payload["accepted"] is True - assert platform_io_manager.route_table.get_active_binding( - RouteKey(platform="qq", account_id="10001"), - exact_only=True, - ) is None - assert platform_io_manager.route_table.get_active_binding(RouteKey(platform="qq"), exact_only=True) is None - - -@pytest.mark.asyncio -async def test_platform_default_route_is_removed_when_multiple_exact_routes_exist( - monkeypatch: pytest.MonkeyPatch, -) -> None: - """同一平台存在多个精确路由时不应保留默认平台路由。""" - import src.plugin_runtime.host.supervisor as supervisor_module - - platform_io_manager = PlatformIOManager() - monkeypatch.setattr(supervisor_module, "get_platform_io_manager", lambda: platform_io_manager) - - supervisor = PluginSupervisor(plugin_dirs=[]) - adapter = AdapterDeclarationPayload(platform="qq", protocol="napcat") - await supervisor._register_adapter_driver("adapter_a", adapter) - await supervisor._register_adapter_driver("adapter_b", adapter) - - await supervisor._handle_update_adapter_state( - _make_request( - "adapter_a", - { - "connected": True, - "account_id": "10001", - "scope": "", - "metadata": {}, - }, - ) - ) - assert ( - platform_io_manager.route_table.get_active_binding( - RouteKey(platform="qq"), - exact_only=True, - ).driver_id - == "adapter:adapter_a" - ) - - await supervisor._handle_update_adapter_state( - _make_request( - "adapter_b", - { - "connected": True, - "account_id": "10002", - "scope": "", - "metadata": {}, - }, - ) - ) - assert platform_io_manager.route_table.get_active_binding(RouteKey(platform="qq"), exact_only=True) is None - - await supervisor._handle_update_adapter_state( - _make_request( - "adapter_b", - { - "connected": False, - "account_id": "", - "scope": "", - "metadata": {}, - }, - ) - ) - assert ( - platform_io_manager.route_table.get_active_binding( - RouteKey(platform="qq"), - exact_only=True, - ).driver_id - == "adapter:adapter_a" - ) diff --git a/pytests/test_message_gateway_runtime.py b/pytests/test_message_gateway_runtime.py new file mode 100644 index 00000000..9650bc10 --- /dev/null +++ b/pytests/test_message_gateway_runtime.py @@ -0,0 +1,170 @@ +"""消息网关运行时状态同步测试。""" + +from typing import Any, Dict + +import pytest + +from src.platform_io.manager import PlatformIOManager +from src.platform_io.types import RouteKey +from src.plugin_runtime.host.supervisor import PluginSupervisor +from src.plugin_runtime.protocol.envelope import Envelope, MessageType + + +def _make_request(method: str, plugin_id: str, payload: Dict[str, Any]) -> Envelope: + """构造一个 RPC 请求信封。 + + Args: + method: RPC 方法名。 + plugin_id: 目标插件 ID。 + payload: 请求载荷。 + + Returns: + Envelope: 标准 RPC 请求信封。 + """ + + return Envelope( + request_id=1, + message_type=MessageType.REQUEST, + method=method, + plugin_id=plugin_id, + payload=payload, + ) + + +@pytest.mark.asyncio +async def test_message_gateway_runtime_state_binds_send_and_receive_routes( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """消息网关就绪后应同时绑定发送表和接收表。""" + + import src.plugin_runtime.host.supervisor as supervisor_module + + platform_io_manager = PlatformIOManager() + monkeypatch.setattr(supervisor_module, "get_platform_io_manager", lambda: platform_io_manager) + + supervisor = PluginSupervisor(plugin_dirs=[]) + register_response = await supervisor._handle_register_plugin( + _make_request( + "plugin.register_components", + "napcat_plugin", + { + "plugin_id": "napcat_plugin", + "plugin_version": "1.0.0", + "components": [ + { + "name": "napcat_gateway", + "component_type": "MESSAGE_GATEWAY", + "plugin_id": "napcat_plugin", + "metadata": { + "route_type": "duplex", + "platform": "qq", + "protocol": "napcat", + }, + } + ], + "capabilities_required": [], + }, + ) + ) + + assert register_response.error is None + response = await supervisor._handle_update_message_gateway_state( + _make_request( + "host.update_message_gateway_state", + "napcat_plugin", + { + "gateway_name": "napcat_gateway", + "ready": True, + "platform": "qq", + "account_id": "10001", + "scope": "primary", + "metadata": {}, + }, + ) + ) + + assert response.error is None + assert response.payload["accepted"] is True + + send_bindings = platform_io_manager.send_route_table.resolve_bindings( + RouteKey(platform="qq", account_id="10001", scope="primary") + ) + receive_bindings = platform_io_manager.receive_route_table.resolve_bindings( + RouteKey(platform="qq", account_id="10001", scope="primary") + ) + + assert [binding.driver_id for binding in send_bindings] == ["gateway:napcat_plugin:napcat_gateway"] + assert [binding.driver_id for binding in receive_bindings] == ["gateway:napcat_plugin:napcat_gateway"] + + +@pytest.mark.asyncio +async def test_message_gateway_runtime_state_unbinds_routes_when_not_ready( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """消息网关断开后应撤销发送表和接收表中的绑定。""" + + import src.plugin_runtime.host.supervisor as supervisor_module + + platform_io_manager = PlatformIOManager() + monkeypatch.setattr(supervisor_module, "get_platform_io_manager", lambda: platform_io_manager) + + supervisor = PluginSupervisor(plugin_dirs=[]) + await supervisor._handle_register_plugin( + _make_request( + "plugin.register_components", + "napcat_plugin", + { + "plugin_id": "napcat_plugin", + "plugin_version": "1.0.0", + "components": [ + { + "name": "napcat_gateway", + "component_type": "MESSAGE_GATEWAY", + "plugin_id": "napcat_plugin", + "metadata": { + "route_type": "duplex", + "platform": "qq", + "protocol": "napcat", + }, + } + ], + "capabilities_required": [], + }, + ) + ) + + await supervisor._handle_update_message_gateway_state( + _make_request( + "host.update_message_gateway_state", + "napcat_plugin", + { + "gateway_name": "napcat_gateway", + "ready": True, + "platform": "qq", + "account_id": "10001", + "scope": "primary", + "metadata": {}, + }, + ) + ) + response = await supervisor._handle_update_message_gateway_state( + _make_request( + "host.update_message_gateway_state", + "napcat_plugin", + { + "gateway_name": "napcat_gateway", + "ready": False, + "platform": "qq", + "account_id": "", + "scope": "", + "metadata": {}, + }, + ) + ) + + assert response.error is None + assert response.payload["accepted"] is True + assert platform_io_manager.send_route_table.resolve_bindings(RouteKey(platform="qq", account_id="10001")) == [] + assert ( + platform_io_manager.receive_route_table.resolve_bindings(RouteKey(platform="qq", account_id="10001")) == [] + ) diff --git a/pytests/test_platform_io_dedupe.py b/pytests/test_platform_io_dedupe.py index 4a3cbb44..68ae95c6 100644 --- a/pytests/test_platform_io_dedupe.py +++ b/pytests/test_platform_io_dedupe.py @@ -159,6 +159,51 @@ class TestPlatformIODedupe: session_message_envelope = _build_envelope(session_message_id="session-1") payload_only_envelope = _build_envelope(payload={"message": "hello"}) - assert PlatformIOManager._build_inbound_dedupe_key(explicit_envelope) == "qq:10001:main:dedupe-1" - assert PlatformIOManager._build_inbound_dedupe_key(session_message_envelope) == "qq:10001:main:session-1" + assert PlatformIOManager._build_inbound_dedupe_key(explicit_envelope) == "plugin.napcat:dedupe-1" + assert PlatformIOManager._build_inbound_dedupe_key(session_message_envelope) == "plugin.napcat:session-1" assert PlatformIOManager._build_inbound_dedupe_key(payload_only_envelope) is None + + @pytest.mark.asyncio + async def test_send_message_fans_out_to_all_matching_routes(self) -> None: + """同一路由命中多条发送链路时应全部发送。""" + + manager = PlatformIOManager() + first_driver = _StubPlatformIODriver( + DriverDescriptor( + driver_id="plugin.gateway_a", + kind=DriverKind.PLUGIN, + platform="qq", + ) + ) + second_driver = _StubPlatformIODriver( + DriverDescriptor( + driver_id="plugin.gateway_b", + kind=DriverKind.PLUGIN, + platform="qq", + ) + ) + manager.register_driver(first_driver) + manager.register_driver(second_driver) + manager.bind_send_route( + RouteBinding( + route_key=RouteKey(platform="qq"), + driver_id=first_driver.driver_id, + driver_kind=first_driver.descriptor.kind, + ) + ) + manager.bind_send_route( + RouteBinding( + route_key=RouteKey(platform="qq"), + driver_id=second_driver.driver_id, + driver_kind=second_driver.descriptor.kind, + ) + ) + + message = SimpleNamespace(message_id="internal-msg-1") + result = await manager.send_message(message, RouteKey(platform="qq")) + + assert result.has_success is True + assert [receipt.driver_id for receipt in result.sent_receipts] == [ + "plugin.gateway_a", + "plugin.gateway_b", + ] diff --git a/pytests/test_plugin_runtime_action_bridge.py b/pytests/test_plugin_runtime_action_bridge.py new file mode 100644 index 00000000..f2364094 --- /dev/null +++ b/pytests/test_plugin_runtime_action_bridge.py @@ -0,0 +1,138 @@ +from types import SimpleNamespace +from typing import Any + +import pytest + +from src.core.component_registry import component_registry as core_component_registry +from src.plugin_runtime.host.supervisor import PluginSupervisor +from src.plugin_runtime.protocol.envelope import ComponentDeclaration, RegisterPluginPayload + + +def _build_action_payload(plugin_id: str, action_name: str) -> RegisterPluginPayload: + """构造用于测试的 runtime Action 注册载荷。 + + Args: + plugin_id: 插件 ID。 + action_name: Action 名称。 + + Returns: + RegisterPluginPayload: 测试用注册载荷。 + """ + return RegisterPluginPayload( + plugin_id=plugin_id, + plugin_version="1.0.0", + components=[ + ComponentDeclaration( + name=action_name, + component_type="ACTION", + plugin_id=plugin_id, + metadata={ + "description": "发送一个测试回复", + "enabled": True, + "activation_type": "keyword", + "activation_probability": 0.25, + "activation_keywords": ["测试", "hello"], + "action_parameters": {"target": "目标对象"}, + "action_require": ["需要发送回复时使用"], + "associated_types": ["text"], + "parallel_action": True, + }, + ) + ], + ) + + +@pytest.mark.asyncio +async def test_runtime_actions_are_mirrored_into_core_registry_and_invoked(monkeypatch: pytest.MonkeyPatch) -> None: + """运行时 Action 应镜像到旧核心注册表,并可由旧 Planner 执行。""" + plugin_id = "runtime_action_bridge_plugin" + action_name = "runtime_action_bridge_test" + payload = _build_action_payload(plugin_id=plugin_id, action_name=action_name) + supervisor = PluginSupervisor(plugin_dirs=[]) + captured: dict[str, Any] = {} + + core_component_registry.remove_action(action_name) + + async def fake_invoke_plugin( + method: str, + plugin_id: str, + component_name: str, + args: dict[str, Any] | None = None, + timeout_ms: int = 30000, + ) -> Any: + """模拟 plugin runtime Action 调用。 + + Args: + method: RPC 方法名。 + plugin_id: 插件 ID。 + component_name: 组件名称。 + args: 调用参数。 + timeout_ms: RPC 超时时间。 + + Returns: + Any: 伪造的 RPC 响应对象。 + """ + captured["method"] = method + captured["plugin_id"] = plugin_id + captured["component_name"] = component_name + captured["args"] = args or {} + captured["timeout_ms"] = timeout_ms + return SimpleNamespace(payload={"success": True, "result": (True, "runtime action executed")}) + + monkeypatch.setattr(supervisor, "invoke_plugin", fake_invoke_plugin) + + try: + supervisor._mirror_runtime_actions_to_core_registry(payload) + + action_info = core_component_registry.get_action_info(action_name) + assert action_info is not None + assert action_info.plugin_name == plugin_id + assert action_info.description == "发送一个测试回复" + assert action_info.activation_keywords == ["测试", "hello"] + assert action_info.random_activation_probability == 0.25 + assert action_info.parallel_action is True + + executor = core_component_registry.get_action_executor(action_name) + assert executor is not None + + success, reason = await executor( + action_data={"target": "MaiBot"}, + action_reasoning="当前适合使用这个动作", + cycle_timers={"planner": 0.1}, + thinking_id="tid-1", + chat_stream=SimpleNamespace(session_id="stream-1"), + log_prefix="[test]", + shutting_down=False, + plugin_config={"enabled": True}, + ) + + assert success is True + assert reason == "runtime action executed" + assert captured["method"] == "plugin.invoke_action" + assert captured["plugin_id"] == plugin_id + assert captured["component_name"] == action_name + assert captured["args"]["stream_id"] == "stream-1" + assert captured["args"]["chat_id"] == "stream-1" + assert captured["args"]["reasoning"] == "当前适合使用这个动作" + assert captured["args"]["target"] == "MaiBot" + assert captured["args"]["action_data"] == {"target": "MaiBot"} + finally: + supervisor._remove_core_action_mirrors(plugin_id) + core_component_registry.remove_action(action_name) + + +def test_clear_runner_state_removes_mirrored_runtime_actions() -> None: + """清理 Runner 状态时应同步移除旧核心注册表中的镜像 Action。""" + plugin_id = "runtime_action_bridge_cleanup_plugin" + action_name = "runtime_action_bridge_cleanup_test" + payload = _build_action_payload(plugin_id=plugin_id, action_name=action_name) + supervisor = PluginSupervisor(plugin_dirs=[]) + + core_component_registry.remove_action(action_name) + + supervisor._mirror_runtime_actions_to_core_registry(payload) + assert core_component_registry.get_action_info(action_name) is not None + + supervisor._clear_runner_state() + + assert core_component_registry.get_action_info(action_name) is None diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index 17d5d6d5..df74e459 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -125,23 +125,27 @@ async def _send_message(message: SessionMessage, show_log: bool = True) -> bool: return True try: - from src.platform_io import DeliveryStatus from src.plugin_runtime.integration import get_plugin_runtime_manager - receipt = await get_plugin_runtime_manager().try_send_message_via_platform_io(message) - if receipt is not None: - if receipt.status == DeliveryStatus.SENT: + delivery_batch = await get_plugin_runtime_manager().try_send_message_via_platform_io(message) + if delivery_batch is not None: + if delivery_batch.has_success: + successful_driver_ids = [ + receipt.driver_id or "unknown" + for receipt in delivery_batch.sent_receipts + ] if show_log: logger.info( f"已通过 Platform IO 将消息 '{message_preview}' 发往平台'{platform}' " - f"(driver: {receipt.driver_id or 'unknown'})" + f"(drivers: {', '.join(successful_driver_ids)})" ) return True - logger.warning( - f"Platform IO 发送失败: platform={platform} driver={receipt.driver_id} " - f"status={receipt.status} error={receipt.error}" - ) + failed_details = "; ".join( + f"driver={receipt.driver_id} status={receipt.status} error={receipt.error}" + for receipt in delivery_batch.failed_receipts + ) or "未命中任何发送路由" + logger.warning(f"Platform IO 发送失败: platform={platform} {failed_details}") return False except Exception as exc: logger.warning(f"检查 Platform IO 出站链路时出现异常,将回退旧发送链: {exc}") diff --git a/src/platform_io/__init__.py b/src/platform_io/__init__.py index 380ecbb6..c91535d1 100644 --- a/src/platform_io/__init__.py +++ b/src/platform_io/__init__.py @@ -6,8 +6,9 @@ from .manager import PlatformIOManager, get_platform_io_manager from .route_key_factory import RouteKeyFactory -from .routing import RouteBindingConflictError, RouteTable +from .routing import RouteTable from .types import ( + DeliveryBatch, DeliveryReceipt, DeliveryStatus, DriverDescriptor, @@ -15,10 +16,10 @@ from .types import ( InboundMessageEnvelope, RouteBinding, RouteKey, - RouteMode, ) __all__ = [ + "DeliveryBatch", "DeliveryReceipt", "DeliveryStatus", "DriverDescriptor", @@ -27,9 +28,7 @@ __all__ = [ "PlatformIOManager", "RouteKeyFactory", "RouteBinding", - "RouteBindingConflictError", "RouteKey", - "RouteMode", "RouteTable", "get_platform_io_manager", ] diff --git a/src/platform_io/drivers/plugin_driver.py b/src/platform_io/drivers/plugin_driver.py index dff980f8..c03204ad 100644 --- a/src/platform_io/drivers/plugin_driver.py +++ b/src/platform_io/drivers/plugin_driver.py @@ -1,4 +1,4 @@ -"""提供 Platform IO 的插件适配器驱动实现。""" +"""提供 Platform IO 的插件消息网关驱动实现。""" from typing import TYPE_CHECKING, Any, Dict, Optional, Protocol @@ -9,45 +9,49 @@ if TYPE_CHECKING: from src.chat.message_receive.message import SessionMessage -class _AdapterSupervisorProtocol(Protocol): - """适配器驱动依赖的 Supervisor 最小协议。""" +class _GatewaySupervisorProtocol(Protocol): + """消息网关驱动依赖的 Supervisor 最小协议。""" - async def invoke_adapter( + async def invoke_message_gateway( self, plugin_id: str, - method_name: str, + component_name: str, args: Optional[Dict[str, Any]] = None, timeout_ms: int = 30000, ) -> Any: - """调用适配器插件专用方法。""" + """调用插件声明的消息网关方法。""" class PluginPlatformDriver(PlatformIODriver): - """面向适配器插件链路的 Platform IO 驱动。""" + """面向插件消息网关链路的 Platform IO 驱动。""" def __init__( self, driver_id: str, platform: str, - supervisor: _AdapterSupervisorProtocol, - send_method: str = "send_to_platform", + supervisor: _GatewaySupervisorProtocol, + component_name: str, + *, + supports_send: bool, account_id: Optional[str] = None, scope: Optional[str] = None, plugin_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> None: - """初始化一个插件适配器驱动。 + """初始化一个插件消息网关驱动。 Args: driver_id: Broker 内的唯一驱动 ID。 - platform: 该适配器负责的平台名称。 - supervisor: 持有该适配器插件的 Supervisor。 - send_method: 出站发送时要调用的插件方法名。 + platform: 该消息网关负责的平台名称。 + supervisor: 持有该插件的 Supervisor。 + component_name: 出站时要调用的网关组件名称。 + supports_send: 当前驱动是否具备出站能力。 account_id: 可选的账号 ID 或 self ID。 scope: 可选的额外路由作用域。 - plugin_id: 拥有该适配器实现的插件 ID。 + plugin_id: 拥有该实现的插件 ID。 metadata: 可选的额外驱动元数据。 """ + descriptor = DriverDescriptor( driver_id=driver_id, kind=DriverKind.PLUGIN, @@ -59,7 +63,8 @@ class PluginPlatformDriver(PlatformIODriver): ) super().__init__(descriptor) self._supervisor = supervisor - self._send_method = send_method + self._component_name = component_name + self._supports_send = supports_send async def send_message( self, @@ -67,16 +72,27 @@ class PluginPlatformDriver(PlatformIODriver): route_key: RouteKey, metadata: Optional[Dict[str, Any]] = None, ) -> DeliveryReceipt: - """通过适配器插件发送消息。 + """通过插件消息网关发送消息。 Args: message: 要投递的内部会话消息。 route_key: Broker 为本次投递选择的路由键。 - metadata: 本次出站投递可选的 Broker 侧元数据。 + metadata: 可选的发送元数据。 Returns: - DeliveryReceipt: 由驱动返回的规范化回执。 + DeliveryReceipt: 规范化后的发送回执。 """ + + if not self._supports_send: + return DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=self.driver_id, + driver_kind=self.descriptor.kind, + error="当前消息网关仅支持接收,不支持发送", + ) + from src.plugin_runtime.host.message_utils import PluginMessageUtils plugin_id = self.descriptor.plugin_id or "" @@ -87,14 +103,14 @@ class PluginPlatformDriver(PlatformIODriver): status=DeliveryStatus.FAILED, driver_id=self.driver_id, driver_kind=self.descriptor.kind, - error="插件适配器驱动缺少 plugin_id", + error="插件消息网关驱动缺少 plugin_id", ) try: message_dict = PluginMessageUtils._session_message_to_dict(message) - response = await self._supervisor.invoke_adapter( + response = await self._supervisor.invoke_message_gateway( plugin_id=plugin_id, - method_name=self._send_method, + component_name=self._component_name, args={ "message": message_dict, "route": { @@ -119,7 +135,7 @@ class PluginPlatformDriver(PlatformIODriver): return self._build_receipt(message.message_id, route_key, response) def _build_receipt(self, internal_message_id: str, route_key: RouteKey, response: Any) -> DeliveryReceipt: - """将适配器调用响应归一化为出站回执。 + """将网关调用响应归一化为出站回执。 Args: internal_message_id: 内部消息 ID。 @@ -129,8 +145,9 @@ class PluginPlatformDriver(PlatformIODriver): Returns: DeliveryReceipt: 标准化后的出站回执。 """ + if getattr(response, "error", None): - error = response.error.get("message", "适配器发送失败") + error = response.error.get("message", "消息网关发送失败") return DeliveryReceipt( internal_message_id=internal_message_id, route_key=route_key, @@ -149,7 +166,7 @@ class PluginPlatformDriver(PlatformIODriver): status=DeliveryStatus.FAILED, driver_id=self.driver_id, driver_kind=self.descriptor.kind, - error=str(payload.get("result", "适配器发送失败")) if isinstance(payload, dict) else "适配器发送失败", + error=str(payload.get("result", "消息网关发送失败")) if isinstance(payload, dict) else "消息网关发送失败", ) result = payload.get("result") if isinstance(payload, dict) else None @@ -161,7 +178,7 @@ class PluginPlatformDriver(PlatformIODriver): status=DeliveryStatus.FAILED, driver_id=self.driver_id, driver_kind=self.descriptor.kind, - error=str(result.get("error", "适配器发送失败")), + error=str(result.get("error", "消息网关发送失败")), metadata=result.get("metadata", {}) if isinstance(result.get("metadata"), dict) else {}, ) external_message_id = str(result.get("external_message_id") or result.get("message_id") or "") or None diff --git a/src/platform_io/manager.py b/src/platform_io/manager.py index b1fe3bdc..c96a9ddd 100644 --- a/src/platform_io/manager.py +++ b/src/platform_io/manager.py @@ -10,7 +10,7 @@ from .outbound_tracker import OutboundTracker from .route_key_factory import RouteKeyFactory from .registry import DriverRegistry from .routing import RouteTable -from .types import DeliveryReceipt, DeliveryStatus, InboundMessageEnvelope, RouteBinding, RouteKey +from .types import DeliveryBatch, DeliveryReceipt, DeliveryStatus, InboundMessageEnvelope, RouteBinding, RouteKey if TYPE_CHECKING: from src.chat.message_receive.message import SessionMessage @@ -21,17 +21,21 @@ InboundDispatcher = Callable[[InboundMessageEnvelope], Awaitable[None]] class PlatformIOManager: - """统一协调双路径平台消息 IO 的路由、去重与状态跟踪。 + """统一协调平台消息 IO 的路由、去重与状态跟踪。 - 这个管理器预期会成为 legacy 适配器链路与 plugin 适配器链路之间的 - 唯一裁决点。当前地基阶段,它只提供共享状态和 Broker 侧契约,还没有 - 真正把生产流量切到新中间层。 + 与旧实现不同,这个管理器不再负责“多条链路谁该接管平台”的裁决, + 只维护发送表和接收表两张轻量路由表: + + - 发送时:解析所有命中的发送绑定并全部投递。 + - 接收时:只校验当前驱动是否已登记为可接收链路,然后全部放行给上层。 + - 去重时:仅对单条链路做技术性重放抑制,不做跨链路语义去重。 """ def __init__(self) -> None: """初始化 Broker 管理器及其内存状态。""" self._driver_registry = DriverRegistry() - self._route_table = RouteTable() + self._send_route_table = RouteTable() + self._receive_route_table = RouteTable() self._deduplicator = MessageDeduplicator() self._outbound_tracker = OutboundTracker() self._inbound_dispatcher: Optional[InboundDispatcher] = None @@ -152,13 +156,22 @@ class PlatformIOManager: return self._driver_registry @property - def route_table(self) -> RouteTable: - """返回管理器持有的路由绑定表。 + def send_route_table(self) -> RouteTable: + """返回发送路由表。""" - Returns: - RouteTable: 用于归属解析的路由绑定表。 - """ - return self._route_table + return self._send_route_table + + @property + def receive_route_table(self) -> RouteTable: + """返回接收路由表。""" + + return self._receive_route_table + + @property + def route_table(self) -> RouteTable: + """兼容旧接口,返回发送路由表。""" + + return self._send_route_table @property def deduplicator(self) -> MessageDeduplicator: @@ -257,15 +270,15 @@ class PlatformIOManager: return None removed_driver.clear_inbound_handler() - self._route_table.remove_bindings_by_driver(driver_id) + self._send_route_table.remove_bindings_by_driver(driver_id) + self._receive_route_table.remove_bindings_by_driver(driver_id) return removed_driver - def bind_route(self, binding: RouteBinding, *, replace: bool = False) -> None: - """为某个路由键绑定驱动。 + def bind_send_route(self, binding: RouteBinding) -> None: + """为某个路由键绑定发送驱动。 Args: binding: 要保存的路由绑定。 - replace: 是否允许替换已有的精确 active owner。 Raises: ValueError: 当绑定引用了不存在的驱动,或者绑定与驱动描述不一致时抛出。 @@ -275,30 +288,78 @@ class PlatformIOManager: raise ValueError(f"驱动 {binding.driver_id} 未注册,无法绑定路由") self._validate_binding_against_driver(binding, driver) - self._route_table.bind(binding, replace=replace) + self._send_route_table.bind(binding) - def unbind_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: - """移除一个或多个路由绑定。 + def bind_receive_route(self, binding: RouteBinding) -> None: + """为某个路由键绑定接收驱动。 + + Args: + binding: 要保存的路由绑定。 + + Raises: + ValueError: 当绑定引用了不存在的驱动,或者绑定与驱动描述不一致时抛出。 + """ + driver = self._driver_registry.get(binding.driver_id) + if driver is None: + raise ValueError(f"驱动 {binding.driver_id} 未注册,无法绑定路由") + + self._validate_binding_against_driver(binding, driver) + self._receive_route_table.bind(binding) + + def bind_route(self, binding: RouteBinding) -> None: + """兼容旧接口,默认同时绑定发送表和接收表。""" + + self.bind_send_route(binding) + self.bind_receive_route(binding) + + def unbind_send_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: + """移除发送路由绑定。 Args: route_key: 要移除绑定的路由键。 driver_id: 可选的特定驱动 ID。 """ - self._route_table.unbind(route_key, driver_id) - def resolve_driver(self, route_key: RouteKey) -> Optional[PlatformIODriver]: - """解析某个路由键当前的 active 驱动。 + self._send_route_table.unbind(route_key, driver_id) + + def unbind_receive_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: + """移除接收路由绑定。 + + Args: + route_key: 要移除绑定的路由键。 + driver_id: 可选的特定驱动 ID。 + """ + + self._receive_route_table.unbind(route_key, driver_id) + + def unbind_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: + """兼容旧接口,默认同时从发送表和接收表解绑。""" + + self.unbind_send_route(route_key, driver_id) + self.unbind_receive_route(route_key, driver_id) + + def resolve_drivers(self, route_key: RouteKey) -> List[PlatformIODriver]: + """解析某个路由键当前命中的全部发送驱动。 Args: route_key: 要解析的路由键。 Returns: - Optional[PlatformIODriver]: 若存在 active 驱动,则返回该驱动实例。 + List[PlatformIODriver]: 当前命中的全部发送驱动。 """ - active_binding = self._route_table.get_active_binding(route_key) - if active_binding is None: - return None - return self._driver_registry.get(active_binding.driver_id) + + drivers: List[PlatformIODriver] = [] + for binding in self._send_route_table.resolve_bindings(route_key): + driver = self._driver_registry.get(binding.driver_id) + if driver is not None: + drivers.append(driver) + return drivers + + def resolve_driver(self, route_key: RouteKey) -> Optional[PlatformIODriver]: + """兼容旧接口,返回首个命中的发送驱动。""" + + drivers = self.resolve_drivers(route_key) + return drivers[0] if drivers else None @staticmethod def build_route_key_from_message(message: "SessionMessage") -> RouteKey: @@ -335,9 +396,9 @@ class PlatformIOManager: 否则返回 ``False``。 """ - if not self._route_table.accepts_inbound(envelope.route_key, envelope.driver_id): + if not self._receive_route_table.has_binding_for_driver(envelope.route_key, envelope.driver_id): logger.info( - "忽略非 active owner 的入站消息: route=%s driver=%s", + "忽略未登记到接收路由表的入站消息: route=%s driver=%s", envelope.route_key, envelope.driver_id, ) @@ -361,8 +422,8 @@ class PlatformIOManager: message: "SessionMessage", route_key: RouteKey, metadata: Optional[Dict[str, Any]] = None, - ) -> DeliveryReceipt: - """通过 Broker 选中的驱动发送一条消息。 + ) -> DeliveryBatch: + """通过 Broker 选中的全部发送驱动广播一条消息。 Args: message: 要投递的内部会话消息。 @@ -370,61 +431,54 @@ class PlatformIOManager: metadata: 可选的额外 Broker 侧元数据。 Returns: - DeliveryReceipt: 规范化后的出站回执。若路由不存在、驱动缺失, - 或同一消息已存在未完成的出站跟踪,也会返回失败回执而不是抛异常。 + DeliveryBatch: 规范化后的批量出站回执。 """ + drivers = self.resolve_drivers(route_key) + if not drivers: + return DeliveryBatch(internal_message_id=message.message_id, route_key=route_key) - active_binding = self._route_table.get_active_binding(route_key) - if active_binding is None: - return DeliveryReceipt( - internal_message_id=message.message_id, - route_key=route_key, - status=DeliveryStatus.FAILED, - error="未找到 active 路由绑定", - ) + receipts: List[DeliveryReceipt] = [] + for driver in drivers: + try: + self._outbound_tracker.begin_tracking( + internal_message_id=message.message_id, + route_key=route_key, + driver_id=driver.driver_id, + metadata=metadata, + ) + except ValueError as exc: + receipts.append( + DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=driver.driver_id, + driver_kind=driver.descriptor.kind, + error=str(exc), + ) + ) + continue - driver = self._driver_registry.get(active_binding.driver_id) - if driver is None: - return DeliveryReceipt( - internal_message_id=message.message_id, - route_key=route_key, - status=DeliveryStatus.FAILED, - driver_id=active_binding.driver_id, - driver_kind=active_binding.driver_kind, - error="active 路由绑定对应的驱动不存在", - ) + try: + receipt = await driver.send_message(message=message, route_key=route_key, metadata=metadata) + except Exception as exc: + receipt = DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=driver.driver_id, + driver_kind=driver.descriptor.kind, + error=str(exc), + ) - try: - self._outbound_tracker.begin_tracking( - internal_message_id=message.message_id, - route_key=route_key, - driver_id=driver.driver_id, - metadata=metadata, - ) - except ValueError as exc: - return DeliveryReceipt( - internal_message_id=message.message_id, - route_key=route_key, - status=DeliveryStatus.FAILED, - driver_id=driver.driver_id, - driver_kind=driver.descriptor.kind, - error=str(exc), - ) + self._outbound_tracker.finish_tracking(receipt) + receipts.append(receipt) - try: - receipt = await driver.send_message(message=message, route_key=route_key, metadata=metadata) - except Exception as exc: - receipt = DeliveryReceipt( - internal_message_id=message.message_id, - route_key=route_key, - status=DeliveryStatus.FAILED, - driver_id=driver.driver_id, - driver_kind=driver.descriptor.kind, - error=str(exc), - ) - - self._outbound_tracker.finish_tracking(receipt) - return receipt + return DeliveryBatch( + internal_message_id=message.message_id, + route_key=route_key, + receipts=receipts, + ) @staticmethod def _build_inbound_dedupe_key(envelope: InboundMessageEnvelope) -> Optional[str]: @@ -453,7 +507,7 @@ class PlatformIOManager: if not normalized_dedupe_key: return None - return f"{envelope.route_key.to_dedupe_scope()}:{normalized_dedupe_key}" + return f"{envelope.driver_id}:{normalized_dedupe_key}" @staticmethod def _validate_binding_against_driver(binding: RouteBinding, driver: PlatformIODriver) -> None: diff --git a/src/platform_io/routing.py b/src/platform_io/routing.py index 7f85bbfa..2a9b41ef 100644 --- a/src/platform_io/routing.py +++ b/src/platform_io/routing.py @@ -1,52 +1,29 @@ -"""提供 Platform IO 的路由绑定存储与归属解析能力。""" +"""提供 Platform IO 的轻量路由绑定表。""" from typing import Dict, List, Optional -from .types import RouteBinding, RouteKey, RouteMode - - -class RouteBindingConflictError(ValueError): - """当同一路由键出现多个 active owner 竞争时抛出。""" +from .types import RouteBinding, RouteKey class RouteTable: - """维护路由绑定并解析路由归属。 + """维护单张路由绑定表。 - 这个表刻意保持轻量,只负责归属规则本身,不掺杂具体发送或接收逻辑。 - 它决定某个路由键当前由哪个驱动 active 接管,哪些驱动仅以 shadow - 方式旁路观测。 + 该实现不负责裁决“唯一 owner”,只负责保存绑定,并按 + ``RouteKey.resolution_order()`` 解析出候选绑定列表。 """ def __init__(self) -> None: - """初始化一个空的路由绑定表。""" + """初始化空路由绑定表。""" + self._bindings: Dict[RouteKey, Dict[str, RouteBinding]] = {} - def bind(self, binding: RouteBinding, *, replace: bool = False) -> None: + def bind(self, binding: RouteBinding) -> None: """注册或更新一条路由绑定。 Args: - binding: 要注册的绑定对象。 - replace: 当精确路由键上已经存在 active owner 时,是否允许替换。 - - Raises: - RouteBindingConflictError: 当精确路由键上已存在其他 active owner, - 且 ``replace`` 为 ``False`` 时抛出。 + binding: 要保存的路由绑定。 """ - if binding.mode == RouteMode.DISABLED: - self.unbind(binding.route_key, binding.driver_id) - return - - if binding.mode == RouteMode.ACTIVE: - active_binding = self.get_active_binding(binding.route_key, exact_only=True) - if active_binding and active_binding.driver_id != binding.driver_id: - if not replace: - raise RouteBindingConflictError( - f"RouteKey {binding.route_key} 已由 {active_binding.driver_id} 接管," - f"拒绝绑定到 {binding.driver_id}" - ) - self.unbind(binding.route_key, active_binding.driver_id) - self._bindings.setdefault(binding.route_key, {})[binding.driver_id] = binding def unbind(self, route_key: RouteKey, driver_id: Optional[str] = None) -> List[RouteBinding]: @@ -54,7 +31,7 @@ class RouteTable: Args: route_key: 要移除绑定的路由键。 - driver_id: 可选的特定驱动 ID;若为空,则移除该路由键上的全部绑定。 + driver_id: 可选的驱动 ID;为空时移除该路由键下全部绑定。 Returns: List[RouteBinding]: 被移除的绑定列表。 @@ -67,15 +44,15 @@ class RouteTable: if driver_id is None: removed = list(binding_map.values()) self._bindings.pop(route_key, None) - return removed + return self._sort_bindings(removed) removed_binding = binding_map.pop(driver_id, None) if not binding_map: self._bindings.pop(route_key, None) - return [removed_binding] if removed_binding else [] + return [removed_binding] if removed_binding is not None else [] def remove_bindings_by_driver(self, driver_id: str) -> List[RouteBinding]: - """移除某个驱动在所有路由键上的绑定。 + """移除某个驱动在整张表上的全部绑定。 Args: driver_id: 要移除绑定的驱动 ID。 @@ -83,9 +60,9 @@ class RouteTable: Returns: List[RouteBinding]: 被移除的绑定列表。 """ + removed_bindings: List[RouteBinding] = [] empty_route_keys: List[RouteKey] = [] - for route_key, binding_map in self._bindings.items(): removed_binding = binding_map.pop(driver_id, None) if removed_binding is not None: @@ -99,13 +76,13 @@ class RouteTable: return self._sort_bindings(removed_bindings) def list_bindings(self, route_key: Optional[RouteKey] = None) -> List[RouteBinding]: - """列出当前绑定。 + """列出当前路由表中的绑定。 Args: - route_key: 可选的路由键过滤条件;若为空,则返回全部路由键上的绑定。 + route_key: 可选的路由键过滤条件。 Returns: - List[RouteBinding]: 按优先级降序排列的绑定列表。 + List[RouteBinding]: 当前绑定列表。 """ if route_key is None: @@ -117,51 +94,38 @@ class RouteTable: binding_map = self._bindings.get(route_key, {}) return self._sort_bindings(list(binding_map.values())) - def get_active_binding(self, route_key: RouteKey, *, exact_only: bool = False) -> Optional[RouteBinding]: - """获取某个路由键当前生效的 active 绑定。 + def resolve_bindings(self, route_key: RouteKey) -> List[RouteBinding]: + """按从具体到宽泛的顺序解析路由候选绑定。 Args: - route_key: 要解析的路由键。 - exact_only: 是否只检查精确路由键而不做回退解析。 + route_key: 待解析的路由键。 Returns: - Optional[RouteBinding]: 若存在 active owner,则返回对应绑定。 + List[RouteBinding]: 去重后的候选绑定列表。 """ - candidate_keys = [route_key] if exact_only else route_key.resolution_order() - for candidate_key in candidate_keys: - binding_map = self._bindings.get(candidate_key, {}) - active_binding = self._pick_best_binding(binding_map, RouteMode.ACTIVE) - if active_binding is not None: - return active_binding - return None + resolved_bindings: List[RouteBinding] = [] + seen_driver_ids: set[str] = set() + for candidate_key in route_key.resolution_order(): + for binding in self.list_bindings(candidate_key): + if binding.driver_id in seen_driver_ids: + continue + seen_driver_ids.add(binding.driver_id) + resolved_bindings.append(binding) + return resolved_bindings - def get_shadow_bindings(self, route_key: RouteKey) -> List[RouteBinding]: - """获取某个精确路由键上的 shadow 绑定。 + def has_binding_for_driver(self, route_key: RouteKey, driver_id: str) -> bool: + """判断指定驱动是否在当前路由键解析结果中。 Args: - route_key: 要查看的路由键。 + route_key: 待解析的路由键。 + driver_id: 目标驱动 ID。 Returns: - List[RouteBinding]: 按优先级降序排列的 shadow 绑定列表。 - """ - binding_map = self._bindings.get(route_key, {}) - shadow_bindings = [binding for binding in binding_map.values() if binding.mode == RouteMode.SHADOW] - return self._sort_bindings(shadow_bindings) - - def accepts_inbound(self, route_key: RouteKey, driver_id: str) -> bool: - """判断某个驱动是否是当前允许入 Core 的 active owner。 - - Args: - route_key: 入站消息对应的路由键。 - driver_id: 希望将消息送入 Core 的驱动 ID。 - - Returns: - bool: 若该驱动是解析结果中的 active owner,则返回 ``True``。 + bool: 若驱动存在于解析结果中则返回 ``True``。 """ - active_binding = self.get_active_binding(route_key) - return active_binding is not None and active_binding.driver_id == driver_id + return any(binding.driver_id == driver_id for binding in self.resolve_bindings(route_key)) @staticmethod def _sort_bindings(bindings: List[RouteBinding]) -> List[RouteBinding]: @@ -173,30 +137,5 @@ class RouteTable: Returns: List[RouteBinding]: 排序后的绑定列表。 """ + return sorted(bindings, key=lambda item: item.priority, reverse=True) - - @staticmethod - def _pick_best_binding( - binding_map: Dict[str, RouteBinding], - mode: RouteMode, - ) -> Optional[RouteBinding]: - """从绑定映射中挑选指定模式下优先级最高的一条绑定。 - - Args: - binding_map: 某个精确 ``RouteKey`` 对应的绑定映射。 - mode: 需要挑选的绑定模式。 - - Returns: - Optional[RouteBinding]: 若存在匹配模式的绑定,则返回优先级最高的一条。 - - Notes: - 这里使用单次线性扫描代替“先过滤成列表再排序”的做法,以减少 - 高频路由解析路径上的临时对象分配和排序开销。 - """ - best_binding: Optional[RouteBinding] = None - for binding in binding_map.values(): - if binding.mode != mode: - continue - if best_binding is None or binding.priority > best_binding.priority: - best_binding = binding - return best_binding diff --git a/src/platform_io/types.py b/src/platform_io/types.py index 8729b637..200eca51 100644 --- a/src/platform_io/types.py +++ b/src/platform_io/types.py @@ -19,14 +19,6 @@ class DriverKind(str, Enum): PLUGIN = "plugin" -class RouteMode(str, Enum): - """路由归属模式枚举。""" - - ACTIVE = "active" - SHADOW = "shadow" - DISABLED = "disabled" - - class DeliveryStatus(str, Enum): """统一出站回执状态枚举。""" @@ -158,21 +150,19 @@ class DriverDescriptor: @dataclass(frozen=True, slots=True) class RouteBinding: - """表示一条从路由键到驱动的归属绑定关系。 + """表示一条从路由键到驱动的绑定关系。 Attributes: route_key: 该绑定覆盖的路由键。 - driver_id: 拥有或旁路观察该路由的驱动 ID。 + driver_id: 拥有该路由的驱动 ID。 driver_kind: 绑定驱动的类型。 - mode: 绑定模式,例如 active owner 或 shadow observer。 - priority: 当同模式下存在多条绑定时使用的相对优先级。 + priority: 当同一路由键存在多条绑定时使用的相对优先级。 metadata: 预留给未来路由策略的额外绑定元数据。 """ route_key: RouteKey driver_id: str driver_kind: DriverKind - mode: RouteMode = RouteMode.ACTIVE priority: int = 0 metadata: Dict[str, Any] = field(default_factory=dict) @@ -239,3 +229,36 @@ class DeliveryReceipt: external_message_id: Optional[str] = None error: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class DeliveryBatch: + """表示一次广播式出站投递的批量结果。 + + Attributes: + internal_message_id: 内部消息 ID。 + route_key: 本次投递使用的路由键。 + receipts: 各条路由的独立投递回执列表。 + """ + + internal_message_id: str + route_key: RouteKey + receipts: List[DeliveryReceipt] = field(default_factory=list) + + @property + def sent_receipts(self) -> List[DeliveryReceipt]: + """返回全部发送成功的回执。""" + + return [receipt for receipt in self.receipts if receipt.status == DeliveryStatus.SENT] + + @property + def failed_receipts(self) -> List[DeliveryReceipt]: + """返回全部发送失败的回执。""" + + return [receipt for receipt in self.receipts if receipt.status != DeliveryStatus.SENT] + + @property + def has_success(self) -> bool: + """返回当前批量投递是否至少命中一条成功回执。""" + + return bool(self.sent_receipts) diff --git a/src/plugin_runtime/host/component_registry.py b/src/plugin_runtime/host/component_registry.py index 95da0052..08b0ea3b 100644 --- a/src/plugin_runtime/host/component_registry.py +++ b/src/plugin_runtime/host/component_registry.py @@ -119,12 +119,52 @@ class MessageGatewayEntry(ComponentEntry): """MessageGateway 组件条目""" def __init__(self, name: str, component_type: str, plugin_id: str, metadata: Dict[str, Any]) -> None: - platform = metadata.get("platform") - if not platform or not isinstance(platform, str): - raise ValueError(f"MessageGateway 组件 {plugin_id}.{name} 缺少有效的 platform 字段") - self.platform: str = platform + self.route_type: str = self._normalize_route_type(metadata.get("route_type", "")) + self.platform: str = str(metadata.get("platform", "") or "").strip() + self.protocol: str = str(metadata.get("protocol", "") or "").strip() + self.account_id: str = str(metadata.get("account_id", "") or "").strip() + self.scope: str = str(metadata.get("scope", "") or "").strip() super().__init__(name, component_type, plugin_id, metadata) + @staticmethod + def _normalize_route_type(raw_value: Any) -> str: + """规范化消息网关路由类型。 + + Args: + raw_value: 原始路由类型值。 + + Returns: + str: 规范化后的路由类型。 + + Raises: + ValueError: 当路由类型不受支持时抛出。 + """ + + normalized_value = str(raw_value or "").strip().lower() + route_type_aliases = { + "send": "send", + "receive": "receive", + "recv": "receive", + "recive": "receive", + "duplex": "duplex", + } + route_type = route_type_aliases.get(normalized_value) + if route_type is None: + raise ValueError(f"MessageGateway 路由类型不合法: {raw_value}") + return route_type + + @property + def supports_send(self) -> bool: + """返回当前网关是否支持出站。""" + + return self.route_type in {"send", "duplex"} + + @property + def supports_receive(self) -> bool: + """返回当前网关是否支持入站。""" + + return self.route_type in {"receive", "duplex"} + class ComponentRegistry: """Host-side 组件注册表 @@ -404,26 +444,71 @@ class ComponentRegistry: handlers.sort(key=lambda c: c.priority, reverse=True) return handlers - def get_message_gateways( - self, platform: str, *, enabled_only: bool = True, session_id: Optional[str] = None + def get_message_gateway( + self, + plugin_id: str, + name: str, + *, + enabled_only: bool = True, + session_id: Optional[str] = None, ) -> Optional[MessageGatewayEntry]: - """查询消息网关组件。 + """按插件和组件名获取单个消息网关。 Args: - platform (str): 平台名称 - enabled_only (bool): 是否仅返回启用的组件 - session_id (Optional[str]): 可选的会话ID,若提供则考虑会话禁用状态 + plugin_id: 插件 ID。 + name: 网关组件名称。 + enabled_only: 是否仅返回启用的组件。 + session_id: 可选的会话 ID。 + Returns: - gateway (Optional[MessageGatewayEntry]): 符合条件的 MessageGateway 组件,可能不存在 + Optional[MessageGatewayEntry]: 若存在则返回消息网关条目。 """ + component = self._components.get(f"{plugin_id}.{name}") + if not isinstance(component, MessageGatewayEntry): + return None + if enabled_only and not self.check_component_enabled(component, session_id): + return None + return component + + def get_message_gateways( + self, + *, + plugin_id: Optional[str] = None, + platform: str = "", + route_type: str = "", + enabled_only: bool = True, + session_id: Optional[str] = None, + ) -> List[MessageGatewayEntry]: + """查询消息网关组件列表。 + + Args: + plugin_id: 可选的插件 ID 过滤条件。 + platform: 可选的平台过滤条件。 + route_type: 可选的路由类型过滤条件。 + enabled_only: 是否仅返回启用的组件。 + session_id: 可选的会话 ID。 + + Returns: + List[MessageGatewayEntry]: 符合条件的消息网关组件列表。 + """ + + normalized_platform = str(platform or "").strip() + normalized_route_type = str(route_type or "").strip().lower() + gateways: List[MessageGatewayEntry] = [] for comp in self._by_type.get(ComponentTypes.MESSAGE_GATEWAY, {}).values(): if not isinstance(comp, MessageGatewayEntry): continue + if plugin_id and comp.plugin_id != plugin_id: + continue if enabled_only and not self.check_component_enabled(comp, session_id): continue - if comp.platform == platform: - return comp # 返回第一个 + if normalized_platform and comp.platform != normalized_platform: + continue + if normalized_route_type and comp.route_type != normalized_route_type: + continue + gateways.append(comp) + return gateways def get_tools(self, *, enabled_only: bool = True, session_id: Optional[str] = None) -> List[ToolEntry]: """查询所有工具组件。 diff --git a/src/plugin_runtime/host/message_gateway.py b/src/plugin_runtime/host/message_gateway.py index 9e8e9be6..90f94493 100644 --- a/src/plugin_runtime/host/message_gateway.py +++ b/src/plugin_runtime/host/message_gateway.py @@ -1,12 +1,9 @@ -""" -Message Gateway 模块 -适配器专用,用于将其他平台的消息转换为系统内部的消息格式,并将系统消息转换为其他平台的格式。 -""" +"""Host 侧消息网关包装器。""" from typing import TYPE_CHECKING, Any, Dict from src.common.logger import get_logger -from src.platform_io import DeliveryStatus, get_platform_io_manager +from src.platform_io import get_platform_io_manager from .message_utils import PluginMessageUtils @@ -50,7 +47,7 @@ class MessageGateway: internal_message: 内部消息对象。 Returns: - Dict[str, Any]: 供适配器插件消费的标准消息字典。 + Dict[str, Any]: 供消息网关插件消费的标准消息字典。 """ return dict(PluginMessageUtils._session_message_to_dict(internal_message)) @@ -83,7 +80,7 @@ class MessageGateway: Args: internal_message: 系统内部的 ``SessionMessage`` 对象。 supervisor: 当前持有该消息网关的 Supervisor。 - enabled_only: 兼容旧签名的保留参数,当前由 Platform IO 统一裁决。 + enabled_only: 兼容旧签名的保留参数,当前未使用。 save_to_db: 发送成功后是否写入数据库。 Returns: @@ -98,12 +95,13 @@ class MessageGateway: return False route_key = platform_io_manager.build_route_key_from_message(internal_message) - receipt = await platform_io_manager.send_message(internal_message, route_key) - if receipt.status != DeliveryStatus.SENT: - logger.warning(f"通过适配器链路发送消息失败: {receipt.error or receipt.status}") + delivery_batch = await platform_io_manager.send_message(internal_message, route_key) + if not delivery_batch.has_success: + logger.warning("通过消息网关链路发送消息失败: 未命中任何成功回执") return False - internal_message.message_id = receipt.external_message_id or internal_message.message_id + first_successful_receipt = delivery_batch.sent_receipts[0] + internal_message.message_id = first_successful_receipt.external_message_id or internal_message.message_id if save_to_db: try: from src.common.utils.utils_message import MessageUtils diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 8a26af11..3588934e 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -9,24 +9,24 @@ import sys from src.common.logger import get_logger from src.config.config import global_config -from src.platform_io import DriverKind, InboundMessageEnvelope, RouteBinding, RouteKey, RouteMode, get_platform_io_manager +from src.core.component_registry import component_registry as core_component_registry +from src.core.types import ActionActivationType, ActionInfo, ComponentType as CoreComponentType +from src.platform_io import DriverKind, InboundMessageEnvelope, RouteBinding, RouteKey, get_platform_io_manager from src.platform_io.drivers import PluginPlatformDriver from src.platform_io.route_key_factory import RouteKeyFactory -from src.platform_io.routing import RouteBindingConflictError from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.protocol.envelope import ( - AdapterDeclarationPayload, - AdapterStateUpdatePayload, - AdapterStateUpdateResultPayload, BootstrapPluginPayload, ConfigUpdatedPayload, Envelope, HealthPayload, + MessageGatewayStateUpdatePayload, + MessageGatewayStateUpdateResultPayload, PROTOCOL_VERSION, - ReceiveExternalMessagePayload, ReceiveExternalMessageResultPayload, RegisterPluginPayload, ReloadPluginResultPayload, + RouteMessagePayload, RunnerReadyPayload, ShutdownPayload, UnregisterPluginPayload, @@ -49,15 +49,12 @@ if TYPE_CHECKING: logger = get_logger("plugin_runtime.host.runner_manager") -_ADAPTER_BINDING_ROLE_RUNTIME_EXACT = "runtime_exact" -_ADAPTER_BINDING_ROLE_PLATFORM_DEFAULT = "platform_default" - - @dataclass(slots=True) -class _AdapterRuntimeState: - """保存适配器插件当前的运行时连接状态。""" +class _MessageGatewayRuntimeState: + """保存消息网关当前的运行时连接状态。""" - connected: bool = False + ready: bool = False + platform: Optional[str] = None account_id: Optional[str] = None scope: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) @@ -109,8 +106,8 @@ class PluginRunnerSupervisor: self._runner_process: Optional[asyncio.subprocess.Process] = None self._registered_plugins: Dict[str, RegisterPluginPayload] = {} - self._registered_adapters: Dict[str, AdapterDeclarationPayload] = {} - self._adapter_runtime_states: Dict[str, _AdapterRuntimeState] = {} + self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {} + self._mirrored_core_actions: Dict[str, List[str]] = {} self._runner_ready_events: asyncio.Event = asyncio.Event() self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload() self._health_task: Optional[asyncio.Task[None]] = None @@ -289,28 +286,29 @@ class PluginRunnerSupervisor: timeout_ms, ) - async def invoke_adapter( + async def invoke_message_gateway( self, plugin_id: str, - method_name: str, + component_name: str, args: Optional[Dict[str, Any]] = None, timeout_ms: int = 30000, ) -> Envelope: - """调用适配器插件的专用方法。 + """调用插件声明的消息网关方法。 Args: - plugin_id: 目标适配器插件 ID。 - method_name: 要调用的插件方法名,例如 ``send_to_platform``。 - args: 传递给插件方法的关键字参数。 + plugin_id: 目标插件 ID。 + component_name: 消息网关组件名称。 + args: 传递给网关方法的关键字参数。 timeout_ms: RPC 超时时间,单位毫秒。 Returns: Envelope: Runner 返回的响应信封。 """ + return await self.invoke_plugin( - method="plugin.invoke_adapter", + method="plugin.invoke_message_gateway", plugin_id=plugin_id, - component_name=method_name, + component_name=component_name, args=args, timeout_ms=timeout_ms, ) @@ -468,8 +466,8 @@ class PluginRunnerSupervisor: def _register_internal_methods(self) -> None: """注册 Host 侧内部 RPC 方法。""" self._rpc_server.register_method("cap.call", self._capability_service.handle_capability_request) - self._rpc_server.register_method("host.receive_external_message", self._handle_receive_external_message) - self._rpc_server.register_method("host.update_adapter_state", self._handle_update_adapter_state) + self._rpc_server.register_method("host.route_message", self._handle_route_message) + self._rpc_server.register_method("host.update_message_gateway_state", self._handle_update_message_gateway_state) self._rpc_server.register_method("plugin.bootstrap", self._handle_bootstrap_plugin) self._rpc_server.register_method("plugin.register_components", self._handle_register_plugin) self._rpc_server.register_method("plugin.register_plugin", self._handle_register_plugin) @@ -512,30 +510,26 @@ class PluginRunnerSupervisor: except Exception as exc: return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) + self._remove_core_action_mirrors(payload.plugin_id) self._component_registry.remove_components_by_plugin(payload.plugin_id) - if payload.plugin_id in self._registered_adapters: - await self._unregister_adapter_driver(payload.plugin_id) - - try: - if payload.adapter is not None: - await self._register_adapter_driver(payload.plugin_id, payload.adapter) - except RouteBindingConflictError as exc: - return envelope.make_error_response(ErrorCode.E_METHOD_NOT_ALLOWED.value, str(exc)) - except Exception as exc: - return envelope.make_error_response(ErrorCode.E_UNKNOWN.value, str(exc)) + await self._unregister_all_message_gateway_drivers_for_plugin(payload.plugin_id) registered_count = self._component_registry.register_plugin_components( payload.plugin_id, [component.model_dump() for component in payload.components], ) self._registered_plugins[payload.plugin_id] = payload + self._message_gateway_states[payload.plugin_id] = {} + self._mirror_runtime_actions_to_core_registry(payload) return envelope.make_response( payload={ "accepted": True, "plugin_id": payload.plugin_id, "registered_components": registered_count, - "adapter_registered": payload.adapter is not None, + "message_gateways": len( + self._component_registry.get_message_gateways(plugin_id=payload.plugin_id, enabled_only=False) + ), } ) @@ -556,7 +550,9 @@ class PluginRunnerSupervisor: removed_components = self._component_registry.remove_components_by_plugin(payload.plugin_id) self._authorization.revoke_permission_token(payload.plugin_id) removed_registration = self._registered_plugins.pop(payload.plugin_id, None) is not None - await self._unregister_adapter_driver(payload.plugin_id) + self._remove_core_action_mirrors(payload.plugin_id) + await self._unregister_all_message_gateway_drivers_for_plugin(payload.plugin_id) + self._message_gateway_states.pop(payload.plugin_id, None) return envelope.make_response( payload={ @@ -569,41 +565,321 @@ class PluginRunnerSupervisor: ) @staticmethod - def _build_adapter_driver_id(plugin_id: str) -> str: - """构造适配器驱动 ID。 + def _coerce_action_activation_type(raw_value: Any) -> ActionActivationType: + """将运行时 Action 激活类型转换为旧核心枚举。 Args: - plugin_id: 适配器插件 ID。 + raw_value: 插件运行时声明中的激活类型值。 + + Returns: + ActionActivationType: 可供旧 Planner 使用的激活类型枚举。 + """ + normalized_value = str(raw_value or ActionActivationType.ALWAYS.value).strip().lower() + try: + return ActionActivationType(normalized_value) + except ValueError: + return ActionActivationType.ALWAYS + + @staticmethod + def _coerce_float(value: Any, default: float = 0.0) -> float: + """将任意输入尽量转换为浮点数。 + + Args: + value: 待转换的值。 + default: 转换失败时使用的默认值。 + + Returns: + float: 转换结果。 + """ + try: + return float(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _build_core_action_info(plugin_id: str, component_name: str, metadata: Dict[str, Any]) -> ActionInfo: + """将运行时 Action 元数据映射为旧核心 ActionInfo。 + + Args: + plugin_id: 插件 ID。 + component_name: 组件名称。 + metadata: 运行时组件元数据。 + + Returns: + ActionInfo: 兼容旧 Planner 的动作定义。 + """ + activation_keywords = [ + str(item) + for item in (metadata.get("activation_keywords") or []) + if item is not None and str(item).strip() + ] + action_require = [ + str(item) + for item in (metadata.get("action_require") or []) + if item is not None and str(item).strip() + ] + associated_types = [ + str(item) + for item in (metadata.get("associated_types") or []) + if item is not None and str(item).strip() + ] + raw_action_parameters = metadata.get("action_parameters") or {} + action_parameters = { + str(param_name): str(param_description) + for param_name, param_description in raw_action_parameters.items() + } if isinstance(raw_action_parameters, dict) else {} + + return ActionInfo( + name=component_name, + component_type=CoreComponentType.ACTION, + description=str(metadata.get("description", "") or ""), + enabled=bool(metadata.get("enabled", True)), + plugin_name=plugin_id, + metadata=dict(metadata), + action_parameters=action_parameters, + action_require=action_require, + associated_types=associated_types, + activation_type=PluginRunnerSupervisor._coerce_action_activation_type(metadata.get("activation_type")), + random_activation_probability=PluginRunnerSupervisor._coerce_float( + metadata.get("activation_probability"), + 0.0, + ), + activation_keywords=activation_keywords, + parallel_action=bool(metadata.get("parallel_action", False)), + ) + + @staticmethod + def _extract_stream_id_from_action_kwargs(kwargs: Dict[str, Any]) -> str: + """从旧 ActionManager 传入参数中提取聊天流 ID。 + + Args: + kwargs: 旧动作执行器收到的关键字参数。 + + Returns: + str: 可用于新运行时 Action 的 ``stream_id``。 + """ + chat_stream = kwargs.get("chat_stream") + if chat_stream is not None: + try: + return str(chat_stream.session_id) + except AttributeError: + pass + + raw_stream_id = kwargs.get("stream_id", "") + return str(raw_stream_id or "") + + def _build_runtime_action_executor( + self, + plugin_id: str, + component_name: str, + ) -> Any: + """构造一个转发到 plugin runtime 的旧核心 Action 执行器。 + + Args: + plugin_id: 目标插件 ID。 + component_name: 目标 Action 组件名称。 + + Returns: + Callable[..., Coroutine[Any, Any, tuple[bool, str]]]: 兼容旧 ActionManager 的执行器。 + """ + + async def _executor(**kwargs: Any) -> tuple[bool, str]: + """将旧 Planner 的动作调用桥接到 plugin runtime。 + + Args: + **kwargs: 旧 ActionManager 传入的运行时上下文参数。 + + Returns: + tuple[bool, str]: ``(是否成功, 动作说明)``。 + """ + invoke_args: Dict[str, Any] = {} + action_data = kwargs.get("action_data") + if isinstance(action_data, dict): + invoke_args.update(action_data) + + stream_id = self._extract_stream_id_from_action_kwargs(kwargs) + invoke_args["action_data"] = action_data if isinstance(action_data, dict) else {} + invoke_args["stream_id"] = stream_id + invoke_args["chat_id"] = stream_id + invoke_args["reasoning"] = str(kwargs.get("action_reasoning", "") or "") + + thinking_id = kwargs.get("thinking_id") + if thinking_id is not None: + invoke_args["thinking_id"] = str(thinking_id) + + cycle_timers = kwargs.get("cycle_timers") + if isinstance(cycle_timers, dict): + invoke_args["cycle_timers"] = cycle_timers + + plugin_config = kwargs.get("plugin_config") + if isinstance(plugin_config, dict): + invoke_args["plugin_config"] = plugin_config + + log_prefix = kwargs.get("log_prefix") + if isinstance(log_prefix, str): + invoke_args["log_prefix"] = log_prefix + + shutting_down = kwargs.get("shutting_down") + if isinstance(shutting_down, bool): + invoke_args["shutting_down"] = shutting_down + + try: + response = await self.invoke_plugin( + method="plugin.invoke_action", + plugin_id=plugin_id, + component_name=component_name, + args=invoke_args, + timeout_ms=30000, + ) + except Exception as exc: + logger.error(f"运行时 Action {plugin_id}.{component_name} 执行失败: {exc}", exc_info=True) + return False, str(exc) + + payload = response.payload if isinstance(response.payload, dict) else {} + success = bool(payload.get("success", False)) + result = payload.get("result") + + if isinstance(result, (list, tuple)): + if len(result) >= 2: + return bool(result[0]), "" if result[1] is None else str(result[1]) + if len(result) == 1: + return bool(result[0]), "" + + if success: + return True, "" if result is None else str(result) + return False, "" if result is None else str(result) + + return _executor + + def _mirror_runtime_actions_to_core_registry(self, payload: RegisterPluginPayload) -> None: + """将 plugin runtime 中声明的 Action 镜像到旧核心注册表。 + + Args: + payload: 当前插件的注册载荷。 + """ + mirrored_action_names: List[str] = [] + + for component in payload.components: + if str(component.component_type).upper() != CoreComponentType.ACTION.name: + continue + + action_info = self._build_core_action_info( + plugin_id=payload.plugin_id, + component_name=component.name, + metadata=component.metadata, + ) + action_executor = self._build_runtime_action_executor( + plugin_id=payload.plugin_id, + component_name=component.name, + ) + registered = core_component_registry.register_action(action_info, action_executor) + if not registered: + logger.warning( + f"运行时 Action {payload.plugin_id}.{component.name} 无法镜像到旧核心注册表," + "可能与现有 Action 重名" + ) + continue + mirrored_action_names.append(component.name) + + if mirrored_action_names: + self._mirrored_core_actions[payload.plugin_id] = mirrored_action_names + + def _remove_core_action_mirrors(self, plugin_id: str) -> None: + """移除某个插件镜像到旧核心注册表的所有 Action。 + + Args: + plugin_id: 目标插件 ID。 + """ + mirrored_action_names = self._mirrored_core_actions.pop(plugin_id, []) + for action_name in mirrored_action_names: + core_component_registry.remove_action(action_name) + + @staticmethod + def _build_message_gateway_driver_id(plugin_id: str, gateway_name: str) -> str: + """构造消息网关驱动 ID。 + + Args: + plugin_id: 插件 ID。 + gateway_name: 网关组件名称。 Returns: str: 对应 Platform IO 中的驱动 ID。 """ - return f"adapter:{plugin_id}" - async def _register_adapter_driver(self, plugin_id: str, adapter: AdapterDeclarationPayload) -> None: - """将适配器插件驱动注册到 Platform IO。 + return f"gateway:{plugin_id}:{gateway_name}" + + @staticmethod + def _normalize_runtime_route_value(value: str) -> Optional[str]: + """规范化运行时路由字段。 Args: - plugin_id: 适配器插件 ID。 - adapter: 经过校验的适配器声明。 + value: 待规范化的原始字符串。 - Raises: - ValueError: 当驱动注册失败时抛出。 + Returns: + Optional[str]: 规范化后非空则返回字符串,否则返回 ``None``。 """ - await self._unregister_adapter_driver(plugin_id) + + normalized_value = str(value or "").strip() + return normalized_value or None + + def _resolve_message_gateway_entry( + self, + plugin_id: str, + gateway_name: str, + ) -> Optional[Any]: + """解析指定插件的消息网关组件。 + + Args: + plugin_id: 插件 ID。 + gateway_name: 网关组件名称;为空时按兼容规则推断。 + + Returns: + Optional[Any]: 匹配到的消息网关组件条目。 + """ + + if gateway_name: + return self._component_registry.get_message_gateway( + plugin_id=plugin_id, + name=gateway_name, + enabled_only=False, + ) + + gateways = self._component_registry.get_message_gateways(plugin_id=plugin_id, enabled_only=False) + if len(gateways) == 1: + return gateways[0] + + return None + + async def _register_message_gateway_driver( + self, + plugin_id: str, + gateway_entry: Any, + route_key: RouteKey, + ) -> None: + """为消息网关注册驱动并绑定发送/接收路由。 + + Args: + plugin_id: 插件 ID。 + gateway_entry: 消息网关组件条目。 + route_key: 当前链路对应的路由键。 + """ + + await self._unregister_message_gateway_driver(plugin_id, gateway_entry.name) platform_io_manager = get_platform_io_manager() driver = PluginPlatformDriver( - driver_id=self._build_adapter_driver_id(plugin_id), - platform=adapter.platform, - account_id=adapter.account_id or None, - scope=adapter.scope or None, + driver_id=self._build_message_gateway_driver_id(plugin_id, gateway_entry.name), + platform=route_key.platform, + account_id=route_key.account_id, + scope=route_key.scope, plugin_id=plugin_id, - send_method=adapter.send_method, + component_name=gateway_entry.name, + supports_send=bool(gateway_entry.supports_send), supervisor=self, metadata={ - "protocol": adapter.protocol, - **adapter.metadata, + "protocol": gateway_entry.protocol, + "route_type": gateway_entry.route_type, + **gateway_entry.metadata, }, ) @@ -620,20 +896,36 @@ class PluginRunnerSupervisor: platform_io_manager.unregister_driver(driver.driver_id) raise - self._registered_adapters[plugin_id] = adapter - self._adapter_runtime_states[plugin_id] = _AdapterRuntimeState() + binding_metadata = { + "plugin_id": plugin_id, + "gateway_name": gateway_entry.name, + "protocol": gateway_entry.protocol, + "route_type": gateway_entry.route_type, + **gateway_entry.metadata, + } + binding = RouteBinding( + route_key=route_key, + driver_id=driver.driver_id, + driver_kind=DriverKind.PLUGIN, + metadata=binding_metadata, + ) + if gateway_entry.supports_send: + platform_io_manager.bind_send_route(binding) + if gateway_entry.supports_receive: + platform_io_manager.bind_receive_route(binding) - async def _unregister_adapter_driver(self, plugin_id: str) -> None: - """从 Platform IO 注销一个适配器驱动。 + async def _unregister_message_gateway_driver(self, plugin_id: str, gateway_name: str) -> None: + """从 Platform IO 注销单个消息网关驱动。 Args: - plugin_id: 适配器插件 ID。 + plugin_id: 插件 ID。 + gateway_name: 网关组件名称。 """ - platform_io_manager = get_platform_io_manager() - driver_id = self._build_adapter_driver_id(plugin_id) - adapter = self._registered_adapters.get(plugin_id) - self._remove_adapter_route_bindings(plugin_id) + platform_io_manager = get_platform_io_manager() + driver_id = self._build_message_gateway_driver_id(plugin_id, gateway_name) + platform_io_manager.send_route_table.remove_bindings_by_driver(driver_id) + platform_io_manager.receive_route_table.remove_bindings_by_driver(driver_id) with contextlib.suppress(Exception): if platform_io_manager.is_started: @@ -641,204 +933,83 @@ class PluginRunnerSupervisor: else: platform_io_manager.unregister_driver(driver_id) - if adapter is not None: - self._refresh_platform_default_route(adapter.platform) - - self._registered_adapters.pop(plugin_id, None) - self._adapter_runtime_states.pop(plugin_id, None) - - async def _unregister_all_adapter_drivers(self) -> None: - """注销当前 Supervisor 管理的全部适配器驱动。""" - plugin_ids = list(self._registered_adapters.keys()) - for plugin_id in plugin_ids: - await self._unregister_adapter_driver(plugin_id) - - def _remove_adapter_route_bindings(self, plugin_id: str) -> None: - """移除某个适配器驱动当前持有的全部路由绑定。 + async def _unregister_all_message_gateway_drivers_for_plugin(self, plugin_id: str) -> None: + """注销指定插件的全部消息网关驱动。 Args: - plugin_id: 适配器插件 ID。 + plugin_id: 插件 ID。 """ - platform_io_manager = get_platform_io_manager() - platform_io_manager.route_table.remove_bindings_by_driver(self._build_adapter_driver_id(plugin_id)) - @staticmethod - def _normalize_runtime_route_value(value: str) -> Optional[str]: - """规范化适配器运行时路由字段。 + gateway_names = list(self._message_gateway_states.get(plugin_id, {}).keys()) + for gateway_name in gateway_names: + await self._unregister_message_gateway_driver(plugin_id, gateway_name) - Args: - value: 待规范化的原始字符串。 - - Returns: - Optional[str]: 规范化后非空则返回字符串,否则返回 ``None``。 - """ - normalized_value = str(value).strip() - return normalized_value or None - - def _build_runtime_route_key( + def _build_message_gateway_route_key( self, - adapter: AdapterDeclarationPayload, - payload: AdapterStateUpdatePayload, + gateway_entry: Any, + payload: MessageGatewayStateUpdatePayload, ) -> RouteKey: - """根据运行时状态更新构造适配器生效路由键。 + """根据消息网关运行时状态构造路由键。 Args: - adapter: 当前适配器声明。 - payload: 适配器上报的运行时状态。 + gateway_entry: 消息网关组件条目。 + payload: 网关上报的运行时状态。 Returns: - RouteKey: 当前连接应接管的精确路由键。 + RouteKey: 当前链路对应的路由键。 Raises: - ValueError: 当静态声明与运行时上报的身份信息冲突时抛出。 + ValueError: 当平台信息缺失时抛出。 """ - runtime_account_id = self._normalize_runtime_route_value(payload.account_id) - runtime_scope = self._normalize_runtime_route_value(payload.scope) - if adapter.account_id and runtime_account_id and adapter.account_id != runtime_account_id: - raise ValueError( - f"适配器声明的 account_id={adapter.account_id} 与运行时上报的 {runtime_account_id} 不一致" - ) - if adapter.scope and runtime_scope and adapter.scope != runtime_scope: - raise ValueError(f"适配器声明的 scope={adapter.scope} 与运行时上报的 {runtime_scope} 不一致") + platform = str(payload.platform or gateway_entry.platform or "").strip() + if not platform: + raise ValueError(f"消息网关 {gateway_entry.full_name} 未提供有效的平台名称") return RouteKey( - platform=adapter.platform, - account_id=runtime_account_id or adapter.account_id or None, - scope=runtime_scope or adapter.scope or None, + platform=platform, + account_id=self._normalize_runtime_route_value(payload.account_id) or gateway_entry.account_id or None, + scope=self._normalize_runtime_route_value(payload.scope) or gateway_entry.scope or None, ) - def _bind_runtime_exact_route( + def _apply_message_gateway_state( self, plugin_id: str, - adapter: AdapterDeclarationPayload, - route_key: RouteKey, - ) -> None: - """为适配器连接绑定精确生效路由。 + gateway_entry: Any, + payload: MessageGatewayStateUpdatePayload, + ) -> Tuple[_MessageGatewayRuntimeState, Dict[str, Any]]: + """应用消息网关运行时状态,并同步 Platform IO 路由。 Args: - plugin_id: 适配器插件 ID。 - adapter: 当前适配器声明。 - route_key: 当前连接对应的精确路由键。 + plugin_id: 插件 ID。 + gateway_entry: 消息网关组件条目。 + payload: 网关上报的运行时状态。 - Raises: - RouteBindingConflictError: 当目标路由已被其他 active owner 占用时抛出。 + Returns: + Tuple[_MessageGatewayRuntimeState, Dict[str, Any]]: 更新后的状态与路由键字典。 """ - platform_io_manager = get_platform_io_manager() - platform_io_manager.bind_route( - RouteBinding( - route_key=route_key, - driver_id=self._build_adapter_driver_id(plugin_id), - driver_kind=DriverKind.PLUGIN, - metadata={ - "plugin_id": plugin_id, - "protocol": adapter.protocol, - "binding_role": _ADAPTER_BINDING_ROLE_RUNTIME_EXACT, - }, + + plugin_states = self._message_gateway_states.setdefault(plugin_id, {}) + if not payload.ready: + runtime_state = _MessageGatewayRuntimeState( + ready=False, + platform=self._normalize_runtime_route_value(payload.platform) or gateway_entry.platform or None, + account_id=self._normalize_runtime_route_value(payload.account_id) or gateway_entry.account_id or None, + scope=self._normalize_runtime_route_value(payload.scope) or gateway_entry.scope or None, + metadata=dict(payload.metadata), ) - ) - - def _list_runtime_exact_bindings(self, platform: str) -> List[RouteBinding]: - """列出某个平台上由 Host 动态维护的精确适配器绑定。 - - Args: - platform: 目标平台名称。 - - Returns: - List[RouteBinding]: 当前平台上全部动态精确绑定。 - """ - platform_io_manager = get_platform_io_manager() - return [ - binding - for binding in platform_io_manager.route_table.list_bindings() - if binding.mode == RouteMode.ACTIVE - and binding.route_key.platform == platform - and binding.metadata.get("binding_role") == _ADAPTER_BINDING_ROLE_RUNTIME_EXACT - ] - - def _refresh_platform_default_route(self, platform: str) -> None: - """根据当前精确绑定数量刷新平台级默认路由。 - - 当某个平台恰好只存在一个动态精确绑定时,会为该绑定额外创建一条 - ``RouteKey(platform=)`` 形式的默认路由,方便缺少账号维度的 - 出站消息继续找到唯一 owner。若精确绑定数量变为 0 或大于 1,则撤销 - 由 Host 自动维护的默认路由,避免出现隐式歧义。 - - Args: - platform: 目标平台名称。 - """ - platform_io_manager = get_platform_io_manager() - default_route_key = RouteKey(platform=platform) - existing_default_binding = platform_io_manager.route_table.get_active_binding(default_route_key, exact_only=True) - - if existing_default_binding is not None: - binding_role = existing_default_binding.metadata.get("binding_role") - if binding_role != _ADAPTER_BINDING_ROLE_PLATFORM_DEFAULT: - return - platform_io_manager.unbind_route(default_route_key, existing_default_binding.driver_id) - - exact_bindings = self._list_runtime_exact_bindings(platform) - if len(exact_bindings) != 1: - return - - exact_binding = exact_bindings[0] - if exact_binding.route_key == default_route_key: - return - - platform_io_manager.bind_route( - RouteBinding( - route_key=default_route_key, - driver_id=exact_binding.driver_id, - driver_kind=exact_binding.driver_kind, - metadata={ - "plugin_id": exact_binding.metadata.get("plugin_id", ""), - "protocol": exact_binding.metadata.get("protocol", ""), - "binding_role": _ADAPTER_BINDING_ROLE_PLATFORM_DEFAULT, - }, - ), - replace=True, - ) - - def _apply_adapter_runtime_state( - self, - plugin_id: str, - adapter: AdapterDeclarationPayload, - payload: AdapterStateUpdatePayload, - ) -> Tuple[_AdapterRuntimeState, Dict[str, Any]]: - """应用适配器运行时状态,并同步 Platform IO 路由。 - - Args: - plugin_id: 适配器插件 ID。 - adapter: 当前适配器声明。 - payload: 适配器上报的运行时状态。 - - Returns: - Tuple[_AdapterRuntimeState, Dict[str, Any]]: 更新后的运行时状态,以及 - 供 RPC 响应返回的路由键字典。 - - Raises: - RouteBindingConflictError: 当新的精确路由与其他 active owner 冲突时抛出。 - ValueError: 当运行时路由信息不合法时抛出。 - """ - if not payload.connected: - self._remove_adapter_route_bindings(plugin_id) - self._refresh_platform_default_route(adapter.platform) - runtime_state = _AdapterRuntimeState(connected=False, metadata=dict(payload.metadata)) - self._adapter_runtime_states[plugin_id] = runtime_state + plugin_states[gateway_entry.name] = runtime_state return runtime_state, {} - route_key = self._build_runtime_route_key(adapter, payload) - self._remove_adapter_route_bindings(plugin_id) - self._bind_runtime_exact_route(plugin_id, adapter, route_key) - self._refresh_platform_default_route(adapter.platform) - - runtime_state = _AdapterRuntimeState( - connected=True, + route_key = self._build_message_gateway_route_key(gateway_entry, payload) + runtime_state = _MessageGatewayRuntimeState( + ready=True, + platform=route_key.platform, account_id=route_key.account_id, scope=route_key.scope, metadata=dict(payload.metadata), ) - self._adapter_runtime_states[plugin_id] = runtime_state + plugin_states[gateway_entry.name] = runtime_state return runtime_state, { "platform": route_key.platform, "account_id": route_key.account_id, @@ -856,8 +1027,9 @@ class PluginRunnerSupervisor: Args: session_message: 已构造好的内部消息对象。 route_key: Host 为该消息解析出的标准路由键。 - route_metadata: 适配器通过 RPC 补充的原始路由辅助元数据。 + route_metadata: 插件通过 RPC 补充的原始路由辅助元数据。 """ + additional_config = session_message.message_info.additional_config if not isinstance(additional_config, dict): additional_config = {} @@ -877,45 +1049,49 @@ class PluginRunnerSupervisor: def _build_inbound_route_key( self, - adapter: AdapterDeclarationPayload, + gateway_entry: Any, + runtime_state: _MessageGatewayRuntimeState, message: Dict[str, Any], route_metadata: Dict[str, Any], ) -> RouteKey: - """为适配器入站消息构造归一路由键。 + """为入站消息构造归一路由键。 Args: - adapter: 当前适配器声明。 + gateway_entry: 接收消息的网关组件条目。 + runtime_state: 当前网关的运行时状态。 message: 标准消息字典。 route_metadata: 插件补充的路由辅助元数据。 Returns: RouteKey: 供 Platform IO 使用的规范化路由键。 - - Raises: - ValueError: 消息平台字段与适配器平台声明不一致时抛出。 """ - message_platform = str(message.get("platform") or adapter.platform).strip() - if message_platform != adapter.platform: - raise ValueError( - f"外部消息平台 {message_platform} 与适配器 {adapter.platform} 不一致" - ) + + platform = str( + message.get("platform") + or route_metadata.get("platform") + or runtime_state.platform + or gateway_entry.platform + or "" + ).strip() + if not platform: + raise ValueError(f"消息网关 {gateway_entry.full_name} 的入站消息缺少平台信息") try: route_key = RouteKeyFactory.from_message_dict(message) except Exception: - route_key = RouteKey(platform=message_platform) + route_key = RouteKey(platform=platform) route_account_id, route_scope = RouteKeyFactory.extract_components(route_metadata) - account_id = route_key.account_id or route_account_id or adapter.account_id or None - scope = route_key.scope or route_scope or adapter.scope or None + account_id = route_key.account_id or route_account_id or runtime_state.account_id or gateway_entry.account_id or None + scope = route_key.scope or route_scope or runtime_state.scope or gateway_entry.scope or None return RouteKey( - platform=message_platform, + platform=platform, account_id=account_id, scope=scope, ) - async def _handle_update_adapter_state(self, envelope: Envelope) -> Envelope: - """处理适配器插件上报的运行时状态更新。 + async def _handle_update_message_gateway_state(self, envelope: Envelope) -> Envelope: + """处理消息网关上报的运行时状态更新。 Args: envelope: RPC 请求信封。 @@ -923,38 +1099,42 @@ class PluginRunnerSupervisor: Returns: Envelope: 状态更新处理结果。 """ + try: - payload = AdapterStateUpdatePayload.model_validate(envelope.payload) + payload = MessageGatewayStateUpdatePayload.model_validate(envelope.payload) except Exception as exc: return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) - adapter = self._registered_adapters.get(envelope.plugin_id) - if adapter is None: + gateway_entry = self._resolve_message_gateway_entry(envelope.plugin_id, payload.gateway_name) + if gateway_entry is None: return envelope.make_error_response( ErrorCode.E_METHOD_NOT_ALLOWED.value, - f"插件 {envelope.plugin_id} 未声明为适配器,不能更新运行时状态", + f"插件 {envelope.plugin_id} 未声明消息网关 {payload.gateway_name or ''}", ) try: - runtime_state, route_key_dict = self._apply_adapter_runtime_state( + if payload.ready: + route_key = self._build_message_gateway_route_key(gateway_entry, payload) + await self._register_message_gateway_driver(envelope.plugin_id, gateway_entry, route_key) + else: + await self._unregister_message_gateway_driver(envelope.plugin_id, gateway_entry.name) + runtime_state, route_key_dict = self._apply_message_gateway_state( plugin_id=envelope.plugin_id, - adapter=adapter, + gateway_entry=gateway_entry, payload=payload, ) - except RouteBindingConflictError as exc: - return envelope.make_error_response(ErrorCode.E_METHOD_NOT_ALLOWED.value, str(exc)) except Exception as exc: return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) - response = AdapterStateUpdateResultPayload( + response = MessageGatewayStateUpdateResultPayload( accepted=True, - connected=runtime_state.connected, + ready=runtime_state.ready, route_key=route_key_dict, ) return envelope.make_response(payload=response.model_dump()) - async def _handle_receive_external_message(self, envelope: Envelope) -> Envelope: - """处理适配器插件上报的外部入站消息。 + async def _handle_route_message(self, envelope: Envelope) -> Envelope: + """处理消息网关上报的外部入站消息。 Args: envelope: RPC 请求信封。 @@ -962,21 +1142,33 @@ class PluginRunnerSupervisor: Returns: Envelope: 注入结果响应。 """ + try: - payload = ReceiveExternalMessagePayload.model_validate(envelope.payload) + payload = RouteMessagePayload.model_validate(envelope.payload) except Exception as exc: return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) - adapter = self._registered_adapters.get(envelope.plugin_id) - if adapter is None: + gateway_entry = self._resolve_message_gateway_entry(envelope.plugin_id, payload.gateway_name) + if gateway_entry is None or not bool(gateway_entry.supports_receive): return envelope.make_error_response( ErrorCode.E_METHOD_NOT_ALLOWED.value, - f"插件 {envelope.plugin_id} 未声明为适配器,不能注入外部消息", + f"插件 {envelope.plugin_id} 未声明可接收的消息网关 {payload.gateway_name}", + ) + + runtime_state = self._message_gateway_states.get(envelope.plugin_id, {}).get( + gateway_entry.name, + _MessageGatewayRuntimeState(), + ) + if not runtime_state.ready: + return envelope.make_error_response( + ErrorCode.E_METHOD_NOT_ALLOWED.value, + f"消息网关 {gateway_entry.full_name} 尚未就绪,不能注入外部消息", ) try: route_key = self._build_inbound_route_key( - adapter=adapter, + gateway_entry=gateway_entry, + runtime_state=runtime_state, message=payload.message, route_metadata=payload.route_metadata, ) @@ -989,7 +1181,7 @@ class PluginRunnerSupervisor: accepted = await platform_io_manager.accept_inbound( InboundMessageEnvelope( route_key=route_key, - driver_id=self._build_adapter_driver_id(envelope.plugin_id), + driver_id=self._build_message_gateway_driver_id(envelope.plugin_id, gateway_entry.name), driver_kind=DriverKind.PLUGIN, external_message_id=payload.external_message_id or str(payload.message.get("message_id") or "") or None, dedupe_key=payload.dedupe_key or None, @@ -997,7 +1189,8 @@ class PluginRunnerSupervisor: payload=payload.message, metadata={ "plugin_id": envelope.plugin_id, - "protocol": adapter.protocol, + "gateway_name": gateway_entry.name, + "protocol": gateway_entry.protocol, **payload.route_metadata, }, ) @@ -1138,7 +1331,8 @@ class PluginRunnerSupervisor: await self._stderr_drain_task self._stderr_drain_task = None - await self._unregister_all_adapter_drivers() + for plugin_id in list(self._message_gateway_states.keys()): + await self._unregister_all_message_gateway_drivers_for_plugin(plugin_id) self._clear_runner_state() async def _health_check_loop(self) -> None: @@ -1213,11 +1407,12 @@ class PluginRunnerSupervisor: def _clear_runner_state(self) -> None: """清理当前 Runner 对应的 Host 侧注册状态。""" + for plugin_id in list(self._mirrored_core_actions.keys()): + self._remove_core_action_mirrors(plugin_id) self._authorization.clear() self._component_registry.clear() self._registered_plugins.clear() - self._registered_adapters.clear() - self._adapter_runtime_states.clear() + self._message_gateway_states.clear() self._runner_ready_events = asyncio.Event() self._runner_ready_payloads = RunnerReadyPayload() self._rpc_server.clear_handshake_state() diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index bf85669b..b74b2d46 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -18,7 +18,7 @@ import tomlkit from src.common.logger import get_logger from src.config.config import global_config from src.config.file_watcher import FileChange, FileWatcher -from src.platform_io import DeliveryReceipt, InboundMessageEnvelope, get_platform_io_manager +from src.platform_io import DeliveryBatch, InboundMessageEnvelope, get_platform_io_manager from src.plugin_runtime.capabilities import ( RuntimeComponentCapabilityMixin, RuntimeCoreCapabilityMixin, @@ -351,15 +351,15 @@ class PluginRuntimeManager( async def try_send_message_via_platform_io( self, message: "SessionMessage", - ) -> Optional[DeliveryReceipt]: + ) -> Optional[DeliveryBatch]: """尝试通过 Platform IO 中间层发送消息。 Args: message: 待发送的内部会话消息。 Returns: - Optional[DeliveryReceipt]: 若当前消息存在 active 路由,则返回实际发送 - 结果;若没有可用路由或 Platform IO 尚未启动,则返回 ``None``。 + Optional[DeliveryBatch]: 若当前消息命中了至少一条发送路由,则返回 + 实际发送结果;若没有可用路由或 Platform IO 尚未启动,则返回 ``None``。 """ if not self._started: return None @@ -374,7 +374,7 @@ class PluginRuntimeManager( logger.warning(f"根据消息构造 Platform IO 路由键失败: {exc}") return None - if platform_io_manager.resolve_driver(route_key) is None: + if not platform_io_manager.resolve_drivers(route_key): return None return await platform_io_manager.send_message(message, route_key) diff --git a/src/plugin_runtime/protocol/envelope.py b/src/plugin_runtime/protocol/envelope.py index f68657fa..cbbb71be 100644 --- a/src/plugin_runtime/protocol/envelope.py +++ b/src/plugin_runtime/protocol/envelope.py @@ -156,8 +156,6 @@ class RegisterPluginPayload(BaseModel): """插件版本""" components: List[ComponentDeclaration] = Field(default_factory=list, description="组件列表") """组件列表""" - adapter: Optional["AdapterDeclarationPayload"] = Field(default=None, description="可选的适配器声明") - """可选的适配器声明""" capabilities_required: List[str] = Field(default_factory=list, description="所需能力列表") """所需能力列表""" @@ -287,50 +285,39 @@ class ReloadPluginResultPayload(BaseModel): """重载失败的插件及原因""" -class AdapterDeclarationPayload(BaseModel): - """适配器插件声明载荷。""" +class MessageGatewayStateUpdatePayload(BaseModel): + """消息网关运行时状态更新载荷。""" - platform: str = Field(description="适配器负责的平台名称,例如 qq") - """适配器负责的平台名称,例如 qq""" - protocol: str = Field(default="", description="接入协议或实现名称,例如 napcat") - """接入协议或实现名称,例如 napcat""" - account_id: str = Field(default="", description="可选的账号 ID 或 self_id") - """可选的账号 ID 或 self_id""" - scope: str = Field(default="", description="可选的路由作用域") - """可选的路由作用域""" - send_method: str = Field(default="send_to_platform", description="Host 出站调用的插件方法名") - """Host 出站调用的插件方法名""" - metadata: Dict[str, Any] = Field(default_factory=dict, description="适配器附加元数据") - """适配器附加元数据""" - - -class AdapterStateUpdatePayload(BaseModel): - """适配器运行时状态更新载荷。""" - - connected: bool = Field(description="适配器当前是否已连接并准备接管路由") - """适配器当前是否已连接并准备接管路由""" - account_id: str = Field(default="", description="当前连接对应的账号 ID 或 self_id") - """当前连接对应的账号 ID 或 self_id""" - scope: str = Field(default="", description="当前连接对应的可选路由作用域") - """当前连接对应的可选路由作用域""" + gateway_name: str = Field(description="消息网关组件名称") + """消息网关组件名称""" + ready: bool = Field(description="当前链路是否已经就绪") + """当前链路是否已经就绪""" + platform: str = Field(default="", description="当前链路负责的平台名称") + """当前链路负责的平台名称""" + account_id: str = Field(default="", description="当前链路对应的账号 ID 或 self_id") + """当前链路对应的账号 ID 或 self_id""" + scope: str = Field(default="", description="当前链路对应的可选路由作用域") + """当前链路对应的可选路由作用域""" metadata: Dict[str, Any] = Field(default_factory=dict, description="可选的运行时状态元数据") """可选的运行时状态元数据""" -class AdapterStateUpdateResultPayload(BaseModel): - """适配器运行时状态更新结果载荷。""" +class MessageGatewayStateUpdateResultPayload(BaseModel): + """消息网关运行时状态更新结果载荷。""" accepted: bool = Field(description="Host 是否接受了本次状态更新") """Host 是否接受了本次状态更新""" - connected: bool = Field(description="Host 记录的当前连接状态") - """Host 记录的当前连接状态""" + ready: bool = Field(description="Host 记录的当前就绪状态") + """Host 记录的当前就绪状态""" route_key: Dict[str, Any] = Field(default_factory=dict, description="当前生效的路由键") """当前生效的路由键""" -class ReceiveExternalMessagePayload(BaseModel): - """适配器插件向 Host 注入外部消息的请求载荷。""" +class RouteMessagePayload(BaseModel): + """消息网关向 Host 路由外部消息的请求载荷。""" + gateway_name: str = Field(description="接收消息的网关组件名称") + """接收消息的网关组件名称""" message: Dict[str, Any] = Field(description="符合 MessageDict 结构的标准消息字典") """符合 MessageDict 结构的标准消息字典""" route_metadata: Dict[str, Any] = Field(default_factory=dict, description="可选的路由辅助元数据") diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index 8078c88b..3a50e2f7 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -25,7 +25,6 @@ import tomllib from src.common.logger import get_console_handler, get_logger, initialize_logging from src.plugin_runtime import ENV_HOST_VERSION, ENV_IPC_ADDRESS, ENV_PLUGIN_DIRS, ENV_SESSION_TOKEN from src.plugin_runtime.protocol.envelope import ( - AdapterDeclarationPayload, BootstrapPluginPayload, ComponentDeclaration, Envelope, @@ -219,7 +218,7 @@ class PluginRunner: """为插件实例创建并注入 PluginContext。 对新版 MaiBotPlugin(具有 _set_context 方法):创建 PluginContext 并注入。 - 对旧版 LegacyPluginAdapter(具有 _set_context 方法,由适配器代理):同上。 + 对旧版 LegacyPluginAdapter(具有 _set_context 方法,由兼容代理封装):同上。 """ if not hasattr(instance, "_set_context"): return @@ -293,7 +292,7 @@ class PluginRunner: self._rpc_client.register_method("plugin.invoke_command", self._handle_invoke) self._rpc_client.register_method("plugin.invoke_action", self._handle_invoke) self._rpc_client.register_method("plugin.invoke_tool", self._handle_invoke) - self._rpc_client.register_method("plugin.invoke_adapter", self._handle_invoke) + self._rpc_client.register_method("plugin.invoke_message_gateway", self._handle_invoke) self._rpc_client.register_method("plugin.emit_event", self._handle_event_invoke) self._rpc_client.register_method("plugin.invoke_hook", self._handle_hook_invoke) self._rpc_client.register_method("plugin.invoke_workflow_step", self._handle_workflow_step) @@ -331,29 +330,6 @@ class PluginRunner: """撤销 bootstrap 期间为插件签发的能力令牌。""" await self._bootstrap_plugin(meta, capabilities_required=[]) - def _collect_adapter_declaration(self, meta: PluginMeta) -> Optional[AdapterDeclarationPayload]: - """从插件实例中提取适配器声明。 - - Args: - meta: 待提取声明的插件元数据。 - - Returns: - Optional[AdapterDeclarationPayload]: 若插件声明了适配器角色,则返回 - 经过校验的适配器声明;否则返回 ``None``。 - - Raises: - ValueError: 插件导出的适配器声明结构非法时抛出。 - """ - instance = meta.instance - if not hasattr(instance, "get_adapter_info"): - return None - - adapter_info = instance.get_adapter_info() - if adapter_info is None: - return None - - return AdapterDeclarationPayload.model_validate(adapter_info) - async def _register_plugin(self, meta: PluginMeta) -> bool: """向 Host 注册单个插件。 @@ -379,17 +355,10 @@ class PluginRunner: for comp_info in instance.get_components() ) - try: - adapter = self._collect_adapter_declaration(meta) - except Exception as exc: - logger.error(f"插件 {meta.plugin_id} 适配器声明非法: {exc}", exc_info=True) - return False - reg_payload = RegisterPluginPayload( plugin_id=meta.plugin_id, plugin_version=meta.version, components=components, - adapter=adapter, capabilities_required=meta.capabilities_required, )