From 7d325ab56bbcda5c7371790064ea8c9048848f6d Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Sat, 4 Apr 2026 16:05:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E9=80=82=E9=85=8D?= =?UTF-8?q?=E5=99=A8=E5=9B=9E=E8=B0=83=E5=A4=84=E7=90=86=E5=92=8C=E6=88=90?= =?UTF-8?q?=E5=8A=9F=E5=9B=9E=E6=89=A7=E6=B6=88=E6=81=AF=20ID=20=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytests/test_napcat_adapter_sdk.py | 61 ++++++++++++++++++++++ pytests/test_send_service.py | 45 +++++++++++++++- src/plugin_runtime/host/message_gateway.py | 4 +- src/services/send_service.py | 58 ++++++++++++++++++++ 4 files changed, 166 insertions(+), 2 deletions(-) diff --git a/pytests/test_napcat_adapter_sdk.py b/pytests/test_napcat_adapter_sdk.py index 170219e5..82d0ad9f 100644 --- a/pytests/test_napcat_adapter_sdk.py +++ b/pytests/test_napcat_adapter_sdk.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Tuple import logging import sys +from types import SimpleNamespace import pytest @@ -331,6 +332,66 @@ async def test_runtime_state_reports_via_gateway_capability() -> None: assert gateway_capability.calls[1]["platform"] == "qq" +@pytest.mark.asyncio +async def test_napcat_plugin_send_result_contains_message_id_echo_callback() -> None: + """NapCat 插件发送成功后应显式返回消息 ID 回调数据。""" + + _napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols() + plugin = napcat_plugin_cls() + + class _FakeOutboundCodec: + """用于测试的出站编码器替身。""" + + @staticmethod + def build_outbound_action(message: Dict[str, Any], route: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: + """返回固定动作与参数。""" + + del message + del route + return "send_msg", {"message": "hello"} + + class _FakeTransport: + """用于测试的传输层替身。""" + + @staticmethod + async def call_action(action_name: str, params: Dict[str, Any]) -> Dict[str, Any]: + """返回带平台消息 ID 的成功响应。""" + + del action_name + del params + return { + "status": "ok", + "data": { + "message_id": "platform-message-id", + }, + } + + plugin._require_runtime_bundle = lambda: SimpleNamespace( # type: ignore[method-assign] + outbound_codec=_FakeOutboundCodec(), + transport=_FakeTransport(), + ) + + result = await plugin.handle_napcat_gateway( + message={"message_id": "internal-message-id"}, + route={}, + ) + + assert result["success"] is True + assert result["external_message_id"] == "platform-message-id" + assert result["metadata"]["adapter_callbacks"] == [ + { + "name": "message_id_echo", + "payload": { + "content": { + "type": "echo", + "echo": "internal-message-id", + "actual_id": "platform-message-id", + } + }, + } + ] + + @pytest.mark.asyncio async def test_inbound_codec_parses_forward_nodes_from_legacy_message_field() -> None: """入站编解码器应兼容旧版 ``sender + message`` 转发节点结构。""" diff --git a/pytests/test_send_service.py b/pytests/test_send_service.py index 44f77090..476d6be8 100644 --- a/pytests/test_send_service.py +++ b/pytests/test_send_service.py @@ -29,6 +29,7 @@ class _FakePlatformIOManager: self.sent_messages.append( { "message": message, + "message_id_before_send": str(getattr(message, "message_id", "") or ""), "route_key": route_key, "metadata": metadata, } @@ -67,16 +68,43 @@ def test_inherit_platform_io_route_metadata_falls_back_to_bot_account( @pytest.mark.asyncio async def test_text_to_stream_delegates_to_platform_io(monkeypatch: pytest.MonkeyPatch) -> None: + import src.common.message_server.api as message_server_api + fake_manager = _FakePlatformIOManager( delivery_batch=SimpleNamespace( has_success=True, - sent_receipts=[SimpleNamespace(driver_id="plugin.qq.sender")], + sent_receipts=[ + SimpleNamespace( + driver_id="plugin.qq.sender", + external_message_id="real-message-id", + metadata={ + "adapter_callbacks": [ + { + "name": "message_id_echo", + "payload": { + "content": { + "type": "echo", + "echo": "send_api_test", + "actual_id": "real-message-id", + } + }, + } + ] + }, + ) + ], failed_receipts=[], route_key=SimpleNamespace(platform="qq"), ) ) + callback_payloads: List[Dict[str, Any]] = [] stored_messages: List[Any] = [] + async def fake_echo_handler(payload: Dict[str, Any]) -> None: + """记录发送成功后的消息 ID 回调。""" + + callback_payloads.append(payload) + monkeypatch.setattr(send_service, "get_platform_io_manager", lambda: fake_manager) monkeypatch.setattr(send_service, "get_bot_account", lambda platform: "bot-qq") monkeypatch.setattr( @@ -89,6 +117,11 @@ async def test_text_to_stream_delegates_to_platform_io(monkeypatch: pytest.Monke "store_message_to_db", lambda message: stored_messages.append(message), ) + monkeypatch.setattr( + message_server_api, + "global_api", + SimpleNamespace(_custom_message_handlers={"message_id_echo": fake_echo_handler}), + ) result = await send_service.text_to_stream(text="你好", stream_id="test-session") @@ -97,6 +130,16 @@ async def test_text_to_stream_delegates_to_platform_io(monkeypatch: pytest.Monke assert len(fake_manager.sent_messages) == 1 assert fake_manager.sent_messages[0]["metadata"] == {"show_log": False} assert len(stored_messages) == 1 + assert stored_messages[0].message_id == "real-message-id" + assert callback_payloads == [ + { + "content": { + "type": "echo", + "echo": "send_api_test", + "actual_id": "real-message-id", + } + } + ] @pytest.mark.asyncio diff --git a/src/plugin_runtime/host/message_gateway.py b/src/plugin_runtime/host/message_gateway.py index 90f94493..d1799648 100644 --- a/src/plugin_runtime/host/message_gateway.py +++ b/src/plugin_runtime/host/message_gateway.py @@ -101,7 +101,9 @@ class MessageGateway: return False first_successful_receipt = delivery_batch.sent_receipts[0] - internal_message.message_id = first_successful_receipt.external_message_id or internal_message.message_id + external_message_id = str(first_successful_receipt.external_message_id or "").strip() + if external_message_id: + internal_message.message_id = external_message_id if save_to_db: try: from src.common.utils.utils_message import MessageUtils diff --git a/src/services/send_service.py b/src/services/send_service.py index 52223de8..53a58acf 100644 --- a/src/services/send_service.py +++ b/src/services/send_service.py @@ -637,6 +637,61 @@ def _store_sent_message(message: SessionMessage) -> None: MessageUtils.store_message_to_db(message) +async def _apply_successful_delivery_receipt(message: SessionMessage, delivery_batch: DeliveryBatch) -> None: + """将成功回执中的平台消息 ID 回填到内部消息。 + + Args: + message: 已发送成功的内部消息对象。 + delivery_batch: Platform IO 返回的批量回执。 + """ + if not delivery_batch.sent_receipts: + return + + original_message_id = str(message.message_id or "").strip() + external_message_id = str(delivery_batch.sent_receipts[0].external_message_id or "").strip() + if not external_message_id or external_message_id == original_message_id: + return + + message.message_id = external_message_id + + +async def _dispatch_adapter_callbacks(delivery_batch: DeliveryBatch) -> None: + """分发适配器随成功回执返回的自定义回调。 + + Args: + delivery_batch: Platform IO 返回的批量回执。 + """ + try: + from src.common.message_server import api as message_server_api + + global_api = getattr(message_server_api, "global_api", None) + custom_handlers = getattr(global_api, "_custom_message_handlers", None) + if not isinstance(custom_handlers, dict): + return + + for receipt in delivery_batch.sent_receipts: + raw_callbacks = receipt.metadata.get("adapter_callbacks") + if not isinstance(raw_callbacks, list): + continue + + for raw_callback in raw_callbacks: + if not isinstance(raw_callback, dict): + continue + + callback_name = str(raw_callback.get("name") or "").strip() + payload = raw_callback.get("payload") + if not callback_name or not isinstance(payload, dict): + continue + + handler = custom_handlers.get(callback_name) + if handler is None: + continue + + await handler(payload) + except Exception as exc: + logger.warning(f"[SendService] 分发适配器回调失败: {exc}") + + async def _notify_memory_automation_on_message_sent(message: SessionMessage) -> None: """在发送成功后通知长期记忆自动化服务。 @@ -740,6 +795,9 @@ async def _send_via_platform_io( return False sent = bool(delivery_batch.has_success) + if sent: + await _apply_successful_delivery_receipt(message, delivery_batch) + await _dispatch_adapter_callbacks(delivery_batch) await _invoke_send_hook( "send_service.after_send", message,