From baabe4463ebea09d864b1dff8b78e04e78823751 Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Sun, 22 Mar 2026 00:19:26 +0800 Subject: [PATCH] feat: add NapCat built-in adapter with configuration, filters, and transport layer - Implemented configuration parsing for NapCat adapter including server, chat, and filter settings. - Added message filtering logic to handle inbound chat messages based on user and group lists. - Developed a transport layer for WebSocket communication with the NapCat server. - Created a query service for fetching user and group information from the QQ platform. - Implemented runtime state management to report connection status to the host. - Added notice handling for various QQ platform events. --- pytests/test_napcat_adapter_codec.py | 70 ++ pytests/test_napcat_adapter_config.py | 91 ++ pytests/test_platform_io_dedupe.py | 164 +++ pytests/test_plugin_message_utils_runtime.py | 87 ++ src/platform_io/manager.py | 35 +- src/platform_io/types.py | 5 +- src/plugin_runtime/host/message_utils.py | 268 ++++- .../built_in/napcat_adapter/__init__.py | 1 + .../built_in/napcat_adapter/codec_inbound.py | 414 +++++++ .../built_in/napcat_adapter/codec_outbound.py | 192 +++ src/plugins/built_in/napcat_adapter/config.py | 398 +++++++ .../built_in/napcat_adapter/constants.py | 9 + .../built_in/napcat_adapter/filters.py | 68 ++ src/plugins/built_in/napcat_adapter/plugin.py | 1049 +++-------------- .../built_in/napcat_adapter/qq_notice.py | 224 ++++ .../built_in/napcat_adapter/qq_queries.py | 170 +++ .../built_in/napcat_adapter/runtime_state.py | 85 ++ .../built_in/napcat_adapter/transport.py | 322 +++++ 18 files changed, 2755 insertions(+), 897 deletions(-) create mode 100644 pytests/test_napcat_adapter_codec.py create mode 100644 pytests/test_napcat_adapter_config.py create mode 100644 pytests/test_platform_io_dedupe.py create mode 100644 pytests/test_plugin_message_utils_runtime.py create mode 100644 src/plugins/built_in/napcat_adapter/__init__.py create mode 100644 src/plugins/built_in/napcat_adapter/codec_inbound.py create mode 100644 src/plugins/built_in/napcat_adapter/codec_outbound.py create mode 100644 src/plugins/built_in/napcat_adapter/config.py create mode 100644 src/plugins/built_in/napcat_adapter/constants.py create mode 100644 src/plugins/built_in/napcat_adapter/filters.py create mode 100644 src/plugins/built_in/napcat_adapter/qq_notice.py create mode 100644 src/plugins/built_in/napcat_adapter/qq_queries.py create mode 100644 src/plugins/built_in/napcat_adapter/runtime_state.py create mode 100644 src/plugins/built_in/napcat_adapter/transport.py diff --git a/pytests/test_napcat_adapter_codec.py b/pytests/test_napcat_adapter_codec.py new file mode 100644 index 00000000..6f557e08 --- /dev/null +++ b/pytests/test_napcat_adapter_codec.py @@ -0,0 +1,70 @@ +from pathlib import Path +from typing import Any, Dict + +import importlib +import sys + + +BUILT_IN_PLUGIN_ROOT = Path(__file__).resolve().parents[1] / "src" / "plugins" / "built_in" +if str(BUILT_IN_PLUGIN_ROOT) not in sys.path: + sys.path.insert(0, str(BUILT_IN_PLUGIN_ROOT)) + +NapCatOutboundCodec = importlib.import_module("napcat_adapter.codec_outbound").NapCatOutboundCodec + + +def test_napcat_outbound_codec_supports_binary_and_forward_segments() -> None: + codec = NapCatOutboundCodec() + raw_message = [ + {"type": "text", "data": "hello"}, + {"type": "image", "data": "", "hash": "h1", "binary_data_base64": "aW1hZ2U="}, + {"type": "emoji", "data": "", "hash": "h2", "binary_data_base64": "ZW1vamk="}, + {"type": "voice", "data": "", "hash": "h3", "binary_data_base64": "dm9pY2U="}, + { + "type": "reply", + "data": { + "target_message_id": "origin-1", + "target_message_content": "origin text", + }, + }, + { + "type": "forward", + "data": [ + { + "user_id": "42", + "user_nickname": "alice", + "user_cardname": "Alice", + "message_id": "fwd-1", + "content": [{"type": "text", "data": "node-text"}], + } + ], + }, + ] + + converted = codec.convert_segments(raw_message) + + assert converted[0] == {"type": "text", "data": {"text": "hello"}} + assert converted[1]["type"] == "image" + assert converted[1]["data"]["file"] == "base64://aW1hZ2U=" + assert converted[2]["type"] == "image" + assert converted[2]["data"]["subtype"] == 1 + assert converted[3] == {"type": "record", "data": {"file": "base64://dm9pY2U="}} + assert converted[4] == {"type": "reply", "data": {"id": "origin-1"}} + assert converted[5]["type"] == "node" + assert converted[5]["data"]["name"] == "alice" + assert converted[5]["data"]["content"] == [{"type": "text", "data": {"text": "node-text"}}] + + +def test_napcat_outbound_codec_builds_private_action_from_route_metadata() -> None: + codec = NapCatOutboundCodec() + message: Dict[str, Any] = { + "message_info": { + "user_info": {"user_id": "10001", "user_nickname": "tester"}, + "additional_config": {}, + }, + "raw_message": [{"type": "text", "data": "hello"}], + } + + action_name, params = codec.build_outbound_action(message, {"target_user_id": "30001"}) + + assert action_name == "send_private_msg" + assert params == {"message": [{"type": "text", "data": {"text": "hello"}}], "user_id": "30001"} diff --git a/pytests/test_napcat_adapter_config.py b/pytests/test_napcat_adapter_config.py new file mode 100644 index 00000000..688b1a48 --- /dev/null +++ b/pytests/test_napcat_adapter_config.py @@ -0,0 +1,91 @@ +from pathlib import Path +from typing import List + +import importlib +import sys + + +BUILT_IN_PLUGIN_ROOT = Path(__file__).resolve().parents[1] / "src" / "plugins" / "built_in" +if str(BUILT_IN_PLUGIN_ROOT) not in sys.path: + sys.path.insert(0, str(BUILT_IN_PLUGIN_ROOT)) + +NapCatPluginSettings = importlib.import_module("napcat_adapter.config").NapCatPluginSettings + + +class DummyLogger: + """用于测试的轻量日志对象。""" + + def __init__(self) -> None: + """初始化测试日志对象。""" + self.warnings: List[str] = [] + self.errors: List[str] = [] + + def warning(self, message: str) -> None: + """记录警告日志。 + + Args: + message: 待记录的日志内容。 + """ + self.warnings.append(message) + + def error(self, message: str) -> None: + """记录错误日志。 + + Args: + message: 待记录的日志内容。 + """ + self.errors.append(message) + + +def test_parse_new_napcat_server_config() -> None: + logger = DummyLogger() + settings = NapCatPluginSettings.from_mapping( + { + "plugin": {"enabled": True, "config_version": "0.1.0"}, + "napcat_server": { + "host": "localhost", + "port": 8095, + "token": "secret", + "heartbeat_interval": 45, + "reconnect_delay_sec": 7, + "action_timeout_sec": 18, + "connection_id": "main", + }, + }, + logger, + ) + + assert settings.should_connect() is True + assert settings.napcat_server.host == "localhost" + assert settings.napcat_server.port == 8095 + assert settings.napcat_server.token == "secret" + assert settings.napcat_server.heartbeat_interval == 45.0 + assert settings.napcat_server.reconnect_delay_sec == 7.0 + assert settings.napcat_server.action_timeout_sec == 18.0 + assert settings.napcat_server.connection_id == "main" + assert settings.napcat_server.build_ws_url() == "ws://localhost:8095" + assert settings.validate(logger) is True + + +def test_parse_legacy_connection_ws_url_fallback() -> None: + logger = DummyLogger() + settings = NapCatPluginSettings.from_mapping( + { + "plugin": {"enabled": True, "config_version": "0.1.0"}, + "connection": { + "ws_url": "ws://127.0.0.1:3001", + "access_token": "legacy-token", + "heartbeat_sec": 35, + "action_timeout_sec": 12, + }, + }, + logger, + ) + + assert settings.napcat_server.host == "127.0.0.1" + assert settings.napcat_server.port == 3001 + assert settings.napcat_server.token == "legacy-token" + assert settings.napcat_server.heartbeat_interval == 35.0 + assert settings.napcat_server.action_timeout_sec == 12.0 + assert settings.validate(logger) is True + assert logger.warnings diff --git a/pytests/test_platform_io_dedupe.py b/pytests/test_platform_io_dedupe.py new file mode 100644 index 00000000..4a3cbb44 --- /dev/null +++ b/pytests/test_platform_io_dedupe.py @@ -0,0 +1,164 @@ +"""Platform IO 入站去重策略测试。""" + +from types import SimpleNamespace +from typing import Any, Dict, List, Optional + +import pytest + +from src.platform_io.drivers.base import PlatformIODriver +from src.platform_io.manager import PlatformIOManager +from src.platform_io.types import DeliveryReceipt, DeliveryStatus, DriverDescriptor, DriverKind, InboundMessageEnvelope, RouteBinding, RouteKey + + +def _build_envelope( + *, + dedupe_key: str | None = None, + external_message_id: str | None = None, + session_message_id: str | None = None, + payload: Optional[Dict[str, Any]] = None, +) -> InboundMessageEnvelope: + """构造测试用入站信封。 + + Args: + dedupe_key: 显式去重键。 + external_message_id: 平台侧消息 ID。 + session_message_id: 规范化消息对象上的消息 ID。 + payload: 原始载荷。 + + Returns: + InboundMessageEnvelope: 测试用入站消息信封。 + """ + session_message = None + if session_message_id is not None: + session_message = SimpleNamespace(message_id=session_message_id) + + return InboundMessageEnvelope( + route_key=RouteKey(platform="qq", account_id="10001", scope="main"), + driver_id="plugin.napcat", + driver_kind=DriverKind.PLUGIN, + dedupe_key=dedupe_key, + external_message_id=external_message_id, + session_message=session_message, + payload=payload, + ) + + +class _StubPlatformIODriver(PlatformIODriver): + """测试用 Platform IO 驱动。""" + + async def send_message( + self, + message: Any, + route_key: RouteKey, + metadata: Optional[Dict[str, Any]] = None, + ) -> DeliveryReceipt: + """返回一个固定的成功回执。 + + Args: + message: 待发送的消息对象。 + route_key: 本次发送使用的路由键。 + metadata: 额外发送元数据。 + + Returns: + DeliveryReceipt: 固定的成功回执。 + """ + return DeliveryReceipt( + internal_message_id=str(getattr(message, "message_id", "stub-message-id")), + route_key=route_key, + status=DeliveryStatus.SENT, + driver_id=self.driver_id, + driver_kind=self.descriptor.kind, + ) + + +def _build_manager() -> PlatformIOManager: + """构造带有最小 active owner 的 Broker 管理器。 + + Returns: + PlatformIOManager: 已注册测试驱动并绑定活动路由的 Broker。 + """ + manager = PlatformIOManager() + driver = _StubPlatformIODriver( + DriverDescriptor( + driver_id="plugin.napcat", + kind=DriverKind.PLUGIN, + platform="qq", + account_id="10001", + scope="main", + ) + ) + manager.register_driver(driver) + manager.bind_route( + RouteBinding( + route_key=RouteKey(platform="qq", account_id="10001", scope="main"), + driver_id=driver.driver_id, + driver_kind=driver.descriptor.kind, + ) + ) + return manager + + +class TestPlatformIODedupe: + """Platform IO 去重测试。""" + + @pytest.mark.asyncio + async def test_accept_inbound_dedupes_by_external_message_id(self) -> None: + """相同平台消息 ID 的重复入站应被抑制。""" + manager = _build_manager() + accepted_envelopes: List[InboundMessageEnvelope] = [] + + async def dispatcher(envelope: InboundMessageEnvelope) -> None: + """记录被成功接收的入站消息。 + + Args: + envelope: 被 Broker 接受的入站消息。 + """ + accepted_envelopes.append(envelope) + + manager.set_inbound_dispatcher(dispatcher) + + first_envelope = _build_envelope( + external_message_id="msg-1", + payload={"message": "hello"}, + ) + second_envelope = _build_envelope( + external_message_id="msg-1", + payload={"message": "hello"}, + ) + + assert await manager.accept_inbound(first_envelope) is True + assert await manager.accept_inbound(second_envelope) is False + assert len(accepted_envelopes) == 1 + + @pytest.mark.asyncio + async def test_accept_inbound_without_stable_identity_does_not_guess_duplicate(self) -> None: + """缺少稳定身份时,不应仅凭 payload 内容猜测重复消息。""" + manager = _build_manager() + accepted_envelopes: List[InboundMessageEnvelope] = [] + + async def dispatcher(envelope: InboundMessageEnvelope) -> None: + """记录被成功接收的入站消息。 + + Args: + envelope: 被 Broker 接受的入站消息。 + """ + accepted_envelopes.append(envelope) + + manager.set_inbound_dispatcher(dispatcher) + + first_envelope = _build_envelope(payload={"message": "same-payload"}) + second_envelope = _build_envelope(payload={"message": "same-payload"}) + + assert await manager.accept_inbound(first_envelope) is True + assert await manager.accept_inbound(second_envelope) is True + assert len(accepted_envelopes) == 2 + + def test_build_inbound_dedupe_key_prefers_explicit_identity(self) -> None: + """去重键应只来自显式或稳定的技术身份。""" + explicit_envelope = _build_envelope(dedupe_key="dedupe-1", external_message_id="msg-1") + session_message_envelope = _build_envelope(session_message_id="session-1") + payload_only_envelope = _build_envelope(payload={"message": "hello"}) + + assert PlatformIOManager._build_inbound_dedupe_key(explicit_envelope) == "qq:10001:main:dedupe-1" + assert PlatformIOManager._build_inbound_dedupe_key(session_message_envelope) == "qq:10001:main:session-1" + assert PlatformIOManager._build_inbound_dedupe_key(payload_only_envelope) is None diff --git a/pytests/test_plugin_message_utils_runtime.py b/pytests/test_plugin_message_utils_runtime.py new file mode 100644 index 00000000..cb4b5341 --- /dev/null +++ b/pytests/test_plugin_message_utils_runtime.py @@ -0,0 +1,87 @@ +from datetime import datetime +from pathlib import Path + +import sys + +from src.chat.message_receive.message import SessionMessage +from src.common.data_models.mai_message_data_model import GroupInfo, MessageInfo, UserInfo +from src.common.data_models.message_component_data_model import ( + ForwardComponent, + ForwardNodeComponent, + ImageComponent, + MessageSequence, + ReplyComponent, + TextComponent, + VoiceComponent, +) +from src.plugin_runtime.host.message_utils import PluginMessageUtils + + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + + +def test_plugin_message_utils_preserves_binary_components_and_reply_metadata() -> None: + message = SessionMessage(message_id="msg-1", timestamp=datetime.now(), platform="qq") + message.message_info = MessageInfo( + user_info=UserInfo(user_id="10001", user_nickname="tester"), + group_info=GroupInfo(group_id="20001", group_name="group"), + additional_config={"self_id": "999"}, + ) + message.session_id = "qq:20001:10001" + message.processed_plain_text = "binary payload" + message.display_message = "binary payload" + message.raw_message = MessageSequence( + components=[ + TextComponent("hello"), + ImageComponent(binary_hash="", binary_data=b"image-bytes", content=""), + VoiceComponent(binary_hash="", binary_data=b"voice-bytes", content=""), + ReplyComponent( + target_message_id="origin-1", + target_message_content="origin text", + target_message_sender_id="42", + target_message_sender_nickname="alice", + target_message_sender_cardname="Alice", + ), + ForwardNodeComponent( + forward_components=[ + ForwardComponent( + user_nickname="bob", + user_id="43", + user_cardname="Bob", + message_id="forward-1", + content=[ + TextComponent("node-text"), + ImageComponent(binary_hash="", binary_data=b"node-image", content=""), + ], + ) + ] + ), + ] + ) + + message_dict = PluginMessageUtils._session_message_to_dict(message) + rebuilt_message = PluginMessageUtils._build_session_message_from_dict(dict(message_dict)) + + image_component = rebuilt_message.raw_message.components[1] + voice_component = rebuilt_message.raw_message.components[2] + reply_component = rebuilt_message.raw_message.components[3] + forward_component = rebuilt_message.raw_message.components[4] + + assert isinstance(image_component, ImageComponent) + assert image_component.binary_data == b"image-bytes" + + assert isinstance(voice_component, VoiceComponent) + assert voice_component.binary_data == b"voice-bytes" + + assert isinstance(reply_component, ReplyComponent) + assert reply_component.target_message_id == "origin-1" + assert reply_component.target_message_content == "origin text" + assert reply_component.target_message_sender_id == "42" + assert reply_component.target_message_sender_nickname == "alice" + assert reply_component.target_message_sender_cardname == "Alice" + + assert isinstance(forward_component, ForwardNodeComponent) + assert isinstance(forward_component.forward_components[0].content[1], ImageComponent) + assert forward_component.forward_components[0].content[1].binary_data == b"node-image" diff --git a/src/platform_io/manager.py b/src/platform_io/manager.py index 97835667..b1fe3bdc 100644 --- a/src/platform_io/manager.py +++ b/src/platform_io/manager.py @@ -2,9 +2,6 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional -import hashlib -import json - from src.common.logger import get_logger from src.platform_io.drivers.base import PlatformIODriver @@ -438,12 +435,17 @@ class PlatformIOManager: Returns: Optional[str]: 若可以构造稳定去重键则返回该键,否则返回 ``None``。 + + Notes: + 这里仅接受上游显式提供的稳定消息身份,例如 ``dedupe_key``、 + 平台侧 ``external_message_id`` 或已经完成规范化的 + ``session_message.message_id``。Broker 不再根据 ``payload`` 内容 + 猜测语义去重键,避免把“短时间内两条内容刚好完全相同”的合法消息 + 误判为重复入站。 """ raw_dedupe_key = envelope.dedupe_key or envelope.external_message_id if raw_dedupe_key is None and envelope.session_message is not None: raw_dedupe_key = envelope.session_message.message_id - if raw_dedupe_key is None and envelope.payload is not None: - raw_dedupe_key = PlatformIOManager._build_payload_fingerprint(envelope.payload) if raw_dedupe_key is None: return None @@ -453,29 +455,6 @@ class PlatformIOManager: return f"{envelope.route_key.to_dedupe_scope()}:{normalized_dedupe_key}" - @staticmethod - def _build_payload_fingerprint(payload: Dict[str, Any]) -> Optional[str]: - """根据消息载荷构造稳定指纹。 - - Args: - payload: 待构造指纹的原始载荷字典。 - - Returns: - Optional[str]: 若成功生成指纹则返回十六进制摘要,否则返回 ``None``。 - """ - try: - serialized_payload = json.dumps( - payload, - default=str, - ensure_ascii=True, - separators=(",", ":"), - sort_keys=True, - ) - except Exception: - return None - - return hashlib.sha256(serialized_payload.encode()).hexdigest() - @staticmethod def _validate_binding_against_driver(binding: RouteBinding, driver: PlatformIODriver) -> None: """校验路由绑定与驱动描述是否一致。 diff --git a/src/platform_io/types.py b/src/platform_io/types.py index c74dc246..8729b637 100644 --- a/src/platform_io/types.py +++ b/src/platform_io/types.py @@ -198,8 +198,9 @@ class InboundMessageEnvelope: driver_kind: 产出该消息的驱动类型。 external_message_id: 可选的平台侧消息 ID,用于去重。 dedupe_key: 可选的显式去重键。当外部消息没有稳定 ``message_id`` 时, - 可由上游驱动提供消息指纹。若这里为空,中间层仍可能继续回退到 - ``session_message.message_id`` 或 ``payload`` 指纹。 + 可由上游驱动提供稳定的技术性幂等键。若这里为空,中间层仅会继续 + 回退到 ``external_message_id`` 或 ``session_message.message_id``, + 不会再根据 ``payload`` 内容猜测语义去重键。 session_message: 可选的、已经完成规范化的 ``SessionMessage`` 对象。 payload: 可选的原始字典载荷,供延迟转换或调试使用。 metadata: 额外入站元数据,例如连接信息或追踪上下文。 diff --git a/src/plugin_runtime/host/message_utils.py b/src/plugin_runtime/host/message_utils.py index aaebb529..2f6aa01b 100644 --- a/src/plugin_runtime/host/message_utils.py +++ b/src/plugin_runtime/host/message_utils.py @@ -1,10 +1,25 @@ from datetime import datetime -from typing import Dict, Any, TypedDict, Optional, List +from typing import Any, Dict, List, Optional, TypedDict + +import base64 +import hashlib from src.common.logger import get_logger from src.chat.message_receive.message import SessionMessage from src.common.data_models.mai_message_data_model import UserInfo, GroupInfo, MessageInfo -from src.common.data_models.message_component_data_model import MessageSequence +from src.common.data_models.message_component_data_model import ( + AtComponent, + DictComponent, + EmojiComponent, + ForwardComponent, + ForwardNodeComponent, + ImageComponent, + MessageSequence, + ReplyComponent, + StandardMessageComponents, + TextComponent, + VoiceComponent, +) logger = get_logger("plugin_runtime.host.message_utils") @@ -45,6 +60,251 @@ class MessageDict(TypedDict, total=False): class PluginMessageUtils: + @staticmethod + def _message_sequence_to_dict(message_sequence: MessageSequence) -> List[Dict[str, Any]]: + """将消息组件序列转换为插件运行时使用的字典结构。 + + Args: + message_sequence: 待转换的消息组件序列。 + + Returns: + List[Dict[str, Any]]: 供插件运行时协议使用的消息段字典列表。 + """ + return [PluginMessageUtils._component_to_dict(component) for component in message_sequence.components] + + @staticmethod + def _component_to_dict(component: StandardMessageComponents) -> Dict[str, Any]: + """将单个消息组件转换为插件运行时字典结构。 + + Args: + component: 待转换的消息组件。 + + Returns: + Dict[str, Any]: 序列化后的消息组件字典。 + """ + if isinstance(component, TextComponent): + return {"type": "text", "data": component.text} + + if isinstance(component, ImageComponent): + serialized = { + "type": "image", + "data": component.content, + "hash": component.binary_hash, + } + if component.binary_data: + serialized["binary_data_base64"] = base64.b64encode(component.binary_data).decode("utf-8") + return serialized + + if isinstance(component, EmojiComponent): + serialized = { + "type": "emoji", + "data": component.content, + "hash": component.binary_hash, + } + if component.binary_data: + serialized["binary_data_base64"] = base64.b64encode(component.binary_data).decode("utf-8") + return serialized + + if isinstance(component, VoiceComponent): + serialized = { + "type": "voice", + "data": component.content, + "hash": component.binary_hash, + } + if component.binary_data: + serialized["binary_data_base64"] = base64.b64encode(component.binary_data).decode("utf-8") + return serialized + + if isinstance(component, AtComponent): + return { + "type": "at", + "data": { + "target_user_id": component.target_user_id, + "target_user_nickname": component.target_user_nickname, + "target_user_cardname": component.target_user_cardname, + }, + } + + if isinstance(component, ReplyComponent): + return { + "type": "reply", + "data": { + "target_message_id": component.target_message_id, + "target_message_content": component.target_message_content, + "target_message_sender_id": component.target_message_sender_id, + "target_message_sender_nickname": component.target_message_sender_nickname, + "target_message_sender_cardname": component.target_message_sender_cardname, + }, + } + + if isinstance(component, ForwardNodeComponent): + return { + "type": "forward", + "data": [PluginMessageUtils._forward_component_to_dict(item) for item in component.forward_components], + } + + return {"type": "dict", "data": component.data} + + @staticmethod + def _forward_component_to_dict(component: ForwardComponent) -> Dict[str, Any]: + """将单个转发节点组件转换为字典结构。 + + Args: + component: 待转换的转发节点组件。 + + Returns: + Dict[str, Any]: 序列化后的转发节点字典。 + """ + return { + "user_id": component.user_id, + "user_nickname": component.user_nickname, + "user_cardname": component.user_cardname, + "message_id": component.message_id, + "content": [PluginMessageUtils._component_to_dict(item) for item in component.content], + } + + @staticmethod + def _message_sequence_from_dict(raw_message_data: List[Dict[str, Any]]) -> MessageSequence: + """从插件运行时字典结构恢复消息组件序列。 + + Args: + raw_message_data: 插件运行时消息段字典列表。 + + Returns: + MessageSequence: 恢复后的消息组件序列。 + """ + components = [PluginMessageUtils._component_from_dict(item) for item in raw_message_data] + return MessageSequence(components=components) + + @staticmethod + def _component_from_dict(item: Dict[str, Any]) -> StandardMessageComponents: + """从插件运行时字典结构恢复单个消息组件。 + + Args: + item: 单个消息组件的字典表示。 + + Returns: + StandardMessageComponents: 恢复后的内部消息组件对象。 + """ + item_type = str(item.get("type") or "").strip() + if item_type == "text": + return TextComponent(text=str(item.get("data") or "")) + + if item_type == "image": + return PluginMessageUtils._build_binary_component(ImageComponent, item) + + if item_type == "emoji": + return PluginMessageUtils._build_binary_component(EmojiComponent, item) + + if item_type == "voice": + return PluginMessageUtils._build_binary_component(VoiceComponent, item) + + if item_type == "at": + item_data = item.get("data", {}) + if not isinstance(item_data, dict): + item_data = {} + return AtComponent( + target_user_id=str(item_data.get("target_user_id") or ""), + target_user_nickname=PluginMessageUtils._normalize_optional_string(item_data.get("target_user_nickname")), + target_user_cardname=PluginMessageUtils._normalize_optional_string(item_data.get("target_user_cardname")), + ) + + if item_type == "reply": + reply_data = item.get("data") + if isinstance(reply_data, dict): + return ReplyComponent( + target_message_id=str(reply_data.get("target_message_id") or ""), + target_message_content=PluginMessageUtils._normalize_optional_string( + reply_data.get("target_message_content") + ), + target_message_sender_id=PluginMessageUtils._normalize_optional_string( + reply_data.get("target_message_sender_id") + ), + target_message_sender_nickname=PluginMessageUtils._normalize_optional_string( + reply_data.get("target_message_sender_nickname") + ), + target_message_sender_cardname=PluginMessageUtils._normalize_optional_string( + reply_data.get("target_message_sender_cardname") + ), + ) + return ReplyComponent(target_message_id=str(reply_data or "")) + + if item_type == "forward": + forward_nodes: List[ForwardComponent] = [] + raw_forward_nodes = item.get("data", []) + if isinstance(raw_forward_nodes, list): + for node in raw_forward_nodes: + if not isinstance(node, dict): + continue + raw_content = node.get("content", []) + node_components: List[StandardMessageComponents] = [] + if isinstance(raw_content, list): + node_components = [ + PluginMessageUtils._component_from_dict(content) + for content in raw_content + if isinstance(content, dict) + ] + if not node_components: + node_components = [TextComponent(text="[empty forward node]")] + forward_nodes.append( + ForwardComponent( + user_nickname=str(node.get("user_nickname") or "未知用户"), + user_id=PluginMessageUtils._normalize_optional_string(node.get("user_id")), + user_cardname=PluginMessageUtils._normalize_optional_string(node.get("user_cardname")), + message_id=str(node.get("message_id") or ""), + content=node_components, + ) + ) + if not forward_nodes: + return DictComponent(data={"type": "forward", "data": item.get("data", [])}) + return ForwardNodeComponent(forward_components=forward_nodes) + + component_data = item.get("data") + if isinstance(component_data, dict): + return DictComponent(data=component_data) + return DictComponent(data=item) + + @staticmethod + def _build_binary_component(component_cls: Any, item: Dict[str, Any]) -> StandardMessageComponents: + """从字典构造带二进制负载的消息组件。 + + Args: + component_cls: 目标组件类型。 + item: 消息组件字典。 + + Returns: + StandardMessageComponents: 构造后的组件对象。 + """ + content = str(item.get("data") or "") + binary_hash = str(item.get("hash") or "") + raw_binary_base64 = item.get("binary_data_base64") + binary_data = b"" + if isinstance(raw_binary_base64, str) and raw_binary_base64: + try: + binary_data = base64.b64decode(raw_binary_base64) + except Exception: + binary_data = b"" + + if not binary_hash and binary_data: + binary_hash = hashlib.sha256(binary_data).hexdigest() + + return component_cls(binary_hash=binary_hash, content=content, binary_data=binary_data) + + @staticmethod + def _normalize_optional_string(value: Any) -> Optional[str]: + """将任意值规范化为可选字符串。 + + Args: + value: 待规范化的值。 + + Returns: + Optional[str]: 规范化后的字符串;若值为空则返回 ``None``。 + """ + if value is None: + return None + normalized_value = str(value) + return normalized_value if normalized_value else None + @staticmethod def _message_info_to_dict(message_info: MessageInfo) -> MessageInfoDict: """ @@ -92,7 +352,7 @@ class PluginMessageUtils: timestamp=str(session_message.timestamp.timestamp()), # 转换为时间戳字符串 platform=session_message.platform, message_info=PluginMessageUtils._message_info_to_dict(session_message.message_info), - raw_message=session_message.raw_message.to_dict(), # 复用 MessageSequence.to_dict() + raw_message=PluginMessageUtils._message_sequence_to_dict(session_message.raw_message), is_mentioned=session_message.is_mentioned, is_at=session_message.is_at, is_emoji=session_message.is_emoji, @@ -186,7 +446,7 @@ class PluginMessageUtils: # 构建原始消息组件序列(复用 MessageSequence.from_dict 方法) raw_message_data = message_dict["raw_message"] if isinstance(raw_message_data, list): - session_message.raw_message = MessageSequence.from_dict(raw_message_data) + session_message.raw_message = PluginMessageUtils._message_sequence_from_dict(raw_message_data) else: raise ValueError("消息字典中 'raw_message' 字段必须是一个列表") diff --git a/src/plugins/built_in/napcat_adapter/__init__.py b/src/plugins/built_in/napcat_adapter/__init__.py new file mode 100644 index 00000000..fa82860f --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/__init__.py @@ -0,0 +1 @@ +"""NapCat 内置适配器插件包。""" diff --git a/src/plugins/built_in/napcat_adapter/codec_inbound.py b/src/plugins/built_in/napcat_adapter/codec_inbound.py new file mode 100644 index 00000000..b8065585 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/codec_inbound.py @@ -0,0 +1,414 @@ +"""NapCat 入站消息编解码。""" + +from typing import Any, Dict, List, Mapping, Optional, Tuple +from uuid import uuid4 + +import hashlib +import json +import time + +from napcat_adapter.qq_queries import NapCatQueryService + + +class NapCatInboundCodec: + """NapCat 入站消息编码器。""" + + def __init__(self, logger: Any, query_service: NapCatQueryService) -> None: + """初始化入站消息编码器。 + + Args: + logger: 插件日志对象。 + query_service: QQ 查询服务。 + """ + self._logger = logger + self._query_service = query_service + + async def build_message_dict( + self, + payload: Mapping[str, Any], + self_id: str, + sender_user_id: str, + sender: Mapping[str, Any], + ) -> Dict[str, Any]: + """构造 Host 侧可接受的 ``MessageDict``。 + + Args: + payload: NapCat 原始消息事件。 + self_id: 当前机器人账号 ID。 + sender_user_id: 发送者用户 ID。 + sender: 发送者信息字典。 + + Returns: + Dict[str, Any]: 规范化后的 ``MessageDict``。 + """ + message_type = str(payload.get("message_type") or "").strip() or "private" + group_id = str(payload.get("group_id") or "").strip() + group_name = str(payload.get("group_name") or "").strip() or (f"group_{group_id}" if group_id else "") + user_nickname = str(sender.get("nickname") or sender.get("card") or sender_user_id).strip() or sender_user_id + user_cardname = str(sender.get("card") or "").strip() or None + + raw_message, is_at = await self.convert_segments(payload, self_id) + raw_message_text = str(payload.get("raw_message") or "").strip() + if not raw_message: + raw_message = [{"type": "text", "data": raw_message_text or "[unsupported]"}] + + plain_text = self.build_plain_text(raw_message, raw_message_text) + timestamp_seconds = payload.get("time") + if not isinstance(timestamp_seconds, (int, float)): + timestamp_seconds = time.time() + + additional_config: Dict[str, Any] = {"self_id": self_id, "napcat_message_type": message_type} + if group_id: + additional_config["platform_io_target_group_id"] = group_id + else: + additional_config["platform_io_target_user_id"] = sender_user_id + + message_info: Dict[str, Any] = { + "user_info": { + "user_id": sender_user_id, + "user_nickname": user_nickname, + "user_cardname": user_cardname, + }, + "additional_config": additional_config, + } + if group_id: + message_info["group_info"] = {"group_id": group_id, "group_name": group_name} + + message_id = str(payload.get("message_id") or f"napcat-{uuid4().hex}").strip() + return { + "message_id": message_id, + "timestamp": str(float(timestamp_seconds)), + "platform": "qq", + "message_info": message_info, + "raw_message": raw_message, + "is_mentioned": is_at, + "is_at": is_at, + "is_emoji": False, + "is_picture": False, + "is_command": plain_text.startswith("/"), + "is_notify": False, + "session_id": "", + "processed_plain_text": plain_text, + "display_message": plain_text, + } + + async def convert_segments(self, payload: Mapping[str, Any], self_id: str) -> Tuple[List[Dict[str, Any]], bool]: + """将 OneBot 消息段转换为 Host 消息段结构。 + + Args: + payload: OneBot 原始消息事件。 + self_id: 当前机器人账号 ID。 + + Returns: + Tuple[List[Dict[str, Any]], bool]: 转换后的消息段列表,以及是否 @ 到当前机器人。 + """ + message_payload = payload.get("message") + if isinstance(message_payload, str): + normalized_text = message_payload.strip() + return ([{"type": "text", "data": normalized_text}] if normalized_text else []), False + + if not isinstance(message_payload, list): + return [], False + + converted_segments: List[Dict[str, Any]] = [] + is_at = False + for segment in message_payload: + if not isinstance(segment, Mapping): + continue + + segment_type = str(segment.get("type") or "").strip() + segment_data = segment.get("data", {}) + if not isinstance(segment_data, Mapping): + segment_data = {} + + if segment_type == "text": + if text_value := str(segment_data.get("text") or ""): + converted_segments.append({"type": "text", "data": text_value}) + continue + + if segment_type == "at": + if target_user_id := str(segment_data.get("qq") or "").strip(): + converted_segments.append( + { + "type": "at", + "data": { + "target_user_id": target_user_id, + "target_user_nickname": None, + "target_user_cardname": None, + }, + } + ) + if self_id and target_user_id == self_id: + is_at = True + continue + + if segment_type == "reply": + if reply_segment := await self._build_reply_segment(segment_data): + converted_segments.append(reply_segment) + continue + + if segment_type == "face": + converted_segments.append({"type": "text", "data": "[face]"}) + continue + + if segment_type == "image": + converted_segments.append(await self._build_image_like_segment(segment_data, is_emoji=False)) + continue + + if segment_type == "record": + converted_segments.append(await self._build_record_segment(segment_data)) + continue + + if segment_type == "video": + converted_segments.append({"type": "text", "data": "[video]"}) + continue + + if segment_type == "file": + converted_segments.append({"type": "text", "data": "[file]"}) + continue + + if segment_type == "json": + converted_segments.append(self._build_json_text_segment(segment_data)) + continue + + if segment_type == "forward": + if forward_segment := await self._build_forward_segment(segment_data): + converted_segments.append(forward_segment) + continue + + if segment_type in {"xml", "share"}: + converted_segments.append({"type": "text", "data": f"[{segment_type}]"}) + + return converted_segments, is_at + + async def _build_reply_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]: + """构造回复消息段。 + + Args: + segment_data: OneBot ``reply`` 段的 ``data`` 字典。 + + Returns: + Optional[Dict[str, Any]]: 转换后的回复消息段;缺少消息 ID 时返回 ``None``。 + """ + target_message_id = str(segment_data.get("id") or "").strip() + if not target_message_id: + return None + + message_detail = await self._query_service.get_message_detail(target_message_id) + reply_payload: Dict[str, Any] = {"target_message_id": target_message_id} + if message_detail is not None: + sender = message_detail.get("sender", {}) + if not isinstance(sender, Mapping): + sender = {} + reply_payload["target_message_content"] = str(message_detail.get("raw_message") or "").strip() or None + reply_payload["target_message_sender_id"] = str( + message_detail.get("user_id") or sender.get("user_id") or "" + ).strip() or None + reply_payload["target_message_sender_nickname"] = str(sender.get("nickname") or "").strip() or None + reply_payload["target_message_sender_cardname"] = str(sender.get("card") or "").strip() or None + + return {"type": "reply", "data": reply_payload} + + async def _build_image_like_segment( + self, + segment_data: Mapping[str, Any], + is_emoji: bool, + ) -> Dict[str, Any]: + """构造图片或表情消息段。 + + Args: + segment_data: OneBot ``image`` 段的 ``data`` 字典。 + is_emoji: 是否按表情组件处理。 + + Returns: + Dict[str, Any]: 转换后的图片或表情消息段。 + """ + subtype = segment_data.get("sub_type") + actual_is_emoji = is_emoji or (isinstance(subtype, int) and subtype not in {0, 4, 9}) + + image_url = str(segment_data.get("url") or "").strip() + binary_data = await self._query_service.download_binary(image_url) + if not binary_data: + return {"type": "text", "data": "[emoji]" if actual_is_emoji else "[image]"} + + return { + "type": "emoji" if actual_is_emoji else "image", + "data": "", + "hash": hashlib.sha256(binary_data).hexdigest(), + "binary_data_base64": self._encode_binary(binary_data), + } + + async def _build_record_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]: + """构造语音消息段。 + + Args: + segment_data: OneBot ``record`` 段的 ``data`` 字典。 + + Returns: + Dict[str, Any]: 转换后的语音或占位文本消息段。 + """ + file_name = str(segment_data.get("file") or "").strip() + file_id = str(segment_data.get("file_id") or "").strip() or None + if not file_name: + return {"type": "text", "data": "[voice]"} + + record_detail = await self._query_service.get_record_detail(file_name=file_name, file_id=file_id) + if record_detail is None: + return {"type": "text", "data": "[voice]"} + + record_base64 = str(record_detail.get("base64") or "").strip() + if not record_base64: + return {"type": "text", "data": "[voice]"} + + try: + binary_data = self._decode_binary(record_base64) + except Exception: + return {"type": "text", "data": "[voice]"} + + return { + "type": "voice", + "data": "", + "hash": hashlib.sha256(binary_data).hexdigest(), + "binary_data_base64": self._encode_binary(binary_data), + } + + async def _build_forward_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]: + """构造合并转发消息段。 + + Args: + segment_data: OneBot ``forward`` 段的 ``data`` 字典。 + + Returns: + Optional[Dict[str, Any]]: 转换后的合并转发消息段;失败时返回 ``None``。 + """ + message_id = str(segment_data.get("id") or "").strip() + if not message_id: + return None + + forward_detail = await self._query_service.get_forward_message(message_id) + if forward_detail is None: + return {"type": "text", "data": "[forward]"} + + messages = forward_detail.get("messages", []) + if not isinstance(messages, list): + return {"type": "text", "data": "[forward]"} + + forward_nodes: List[Dict[str, Any]] = [] + for forward_message in messages: + if not isinstance(forward_message, Mapping): + continue + raw_content = forward_message.get("content", []) + content_segments = await self._convert_forward_content(raw_content, "") + sender = forward_message.get("sender", {}) + if not isinstance(sender, Mapping): + sender = {} + forward_nodes.append( + { + "user_id": str(sender.get("user_id") or sender.get("uin") or "").strip() or None, + "user_nickname": str(sender.get("nickname") or sender.get("name") or "未知用户"), + "user_cardname": str(sender.get("card") or "").strip() or None, + "message_id": str(forward_message.get("message_id") or uuid4().hex), + "content": content_segments or [{"type": "text", "data": "[empty]"}], + } + ) + + if not forward_nodes: + return {"type": "text", "data": "[forward]"} + return {"type": "forward", "data": forward_nodes} + + async def _convert_forward_content(self, raw_content: Any, self_id: str) -> List[Dict[str, Any]]: + """转换转发节点内部的消息段列表。 + + Args: + raw_content: 转发节点原始内容。 + self_id: 当前机器人账号 ID。 + + Returns: + List[Dict[str, Any]]: 转换后的消息段列表。 + """ + pseudo_payload: Dict[str, Any] = {"message": raw_content} + segments, _ = await self.convert_segments(pseudo_payload, self_id) + return segments + + def _build_json_text_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]: + """将 JSON 卡片最佳努力转换为文本占位。 + + Args: + segment_data: OneBot ``json`` 段的 ``data`` 字典。 + + Returns: + Dict[str, Any]: 转换后的文本消息段。 + """ + json_data = str(segment_data.get("data") or "").strip() + if not json_data: + return {"type": "text", "data": "[json]"} + + try: + parsed_json = json.loads(json_data) + except Exception: + return {"type": "text", "data": "[json]"} + + app_name = str(parsed_json.get("app") or "").strip() + prompt = "" + if isinstance(parsed_json.get("meta"), Mapping): + prompt = str(parsed_json["meta"].get("prompt") or "").strip() + text = prompt or app_name or "json" + return {"type": "text", "data": f"[json:{text}]"} + + @staticmethod + def _encode_binary(binary_data: bytes) -> str: + """将二进制内容编码为 Base64 字符串。 + + Args: + binary_data: 待编码的二进制内容。 + + Returns: + str: Base64 编码字符串。 + """ + import base64 + + return base64.b64encode(binary_data).decode("utf-8") + + @staticmethod + def _decode_binary(binary_base64: str) -> bytes: + """将 Base64 字符串解码为二进制内容。 + + Args: + binary_base64: Base64 字符串。 + + Returns: + bytes: 解码后的二进制内容。 + """ + import base64 + + return base64.b64decode(binary_base64) + + def build_plain_text(self, raw_message: List[Dict[str, Any]], fallback_text: str) -> str: + """从标准消息段中提取可展示的纯文本。 + + Args: + raw_message: 标准化后的消息段列表。 + fallback_text: 当无法拼出文本时使用的回退文本。 + + Returns: + str: 用于 Host 展示和命令判断的纯文本内容。 + """ + plain_text_parts: List[str] = [] + for item in raw_message: + if not isinstance(item, Mapping): + continue + item_type = str(item.get("type") or "").strip() + item_data = item.get("data") + if item_type == "text": + plain_text_parts.append(str(item_data or "")) + elif item_type == "at" and isinstance(item_data, Mapping): + plain_text_parts.append(f"@{item_data.get('target_user_id') or ''}") + elif item_type == "reply": + plain_text_parts.append("[reply]") + elif item_type == "forward": + plain_text_parts.append("[forward]") + elif item_type in {"image", "emoji", "voice"}: + plain_text_parts.append(f"[{item_type}]") + + plain_text = "".join(part for part in plain_text_parts if part).strip() + return plain_text or fallback_text or "[unsupported]" diff --git a/src/plugins/built_in/napcat_adapter/codec_outbound.py b/src/plugins/built_in/napcat_adapter/codec_outbound.py new file mode 100644 index 00000000..6adcb622 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/codec_outbound.py @@ -0,0 +1,192 @@ +"""NapCat 出站消息编解码。""" + +from typing import Any, Dict, List, Mapping, Tuple + + +class NapCatOutboundCodec: + """NapCat 出站消息编码器。""" + + def build_outbound_action( + self, + message: Mapping[str, Any], + route: Mapping[str, Any], + ) -> Tuple[str, Dict[str, Any]]: + """为 Host 出站消息构造 OneBot 动作。 + + Args: + message: Host 侧标准 ``MessageDict``。 + route: Platform IO 路由信息。 + + Returns: + Tuple[str, Dict[str, Any]]: 动作名称与参数字典。 + + Raises: + ValueError: 当私聊出站缺少目标用户 ID 时抛出。 + """ + message_info = message.get("message_info", {}) + if not isinstance(message_info, Mapping): + message_info = {} + + group_info = message_info.get("group_info", {}) + if not isinstance(group_info, Mapping): + group_info = {} + + additional_config = message_info.get("additional_config", {}) + if not isinstance(additional_config, Mapping): + additional_config = {} + + raw_message = message.get("raw_message", []) + segments = self.convert_segments(raw_message) + + if target_group_id := str( + group_info.get("group_id") or additional_config.get("platform_io_target_group_id") or "" + ).strip(): + return "send_group_msg", {"group_id": target_group_id, "message": segments} + + target_user_id = str( + additional_config.get("platform_io_target_user_id") + or additional_config.get("target_user_id") + or route.get("target_user_id") + or "" + ).strip() + if not target_user_id: + raise ValueError("Outbound private message is missing target_user_id") + + return "send_private_msg", {"message": segments, "user_id": target_user_id} + + def convert_segments(self, raw_message: Any) -> List[Dict[str, Any]]: + """将 Host 消息段转换为 OneBot 消息段。 + + Args: + raw_message: Host 侧 ``raw_message`` 字段。 + + Returns: + List[Dict[str, Any]]: OneBot 消息段列表。 + """ + if not isinstance(raw_message, list): + return [{"type": "text", "data": {"text": ""}}] + + outbound_segments: List[Dict[str, Any]] = [] + for item in raw_message: + if not isinstance(item, Mapping): + continue + + item_type = str(item.get("type") or "").strip() + item_data = item.get("data") + + if item_type == "text": + text_value = str(item_data or "") + outbound_segments.append({"type": "text", "data": {"text": text_value}}) + continue + + if item_type == "at" and isinstance(item_data, Mapping): + if target_user_id := str(item_data.get("target_user_id") or "").strip(): + outbound_segments.append({"type": "at", "data": {"qq": target_user_id}}) + continue + + if item_type == "reply": + if isinstance(item_data, Mapping): + target_message_id = str(item_data.get("target_message_id") or "").strip() + else: + target_message_id = str(item_data or "").strip() + if target_message_id: + outbound_segments.append({"type": "reply", "data": {"id": target_message_id}}) + continue + + if item_type == "image": + binary_base64 = str(item.get("binary_data_base64") or "").strip() + if binary_base64: + outbound_segments.append( + { + "type": "image", + "data": {"file": f"base64://{binary_base64}", "subtype": 0}, + } + ) + else: + outbound_segments.append({"type": "text", "data": {"text": "[image]"}}) + continue + + if item_type == "emoji": + binary_base64 = str(item.get("binary_data_base64") or "").strip() + if binary_base64: + outbound_segments.append( + { + "type": "image", + "data": { + "file": f"base64://{binary_base64}", + "subtype": 1, + "summary": "[动画表情]", + }, + } + ) + else: + outbound_segments.append({"type": "text", "data": {"text": "[emoji]"}}) + continue + + if item_type == "voice": + binary_base64 = str(item.get("binary_data_base64") or "").strip() + if binary_base64: + outbound_segments.append({"type": "record", "data": {"file": f"base64://{binary_base64}"}}) + else: + outbound_segments.append({"type": "text", "data": {"text": "[voice]"}}) + continue + + if item_type == "forward" and isinstance(item_data, list): + outbound_segments.extend(self._build_forward_nodes(item_data)) + continue + + if item_type == "dict" and isinstance(item_data, Mapping): + if dict_segment := self._build_dict_component_segment(item_data): + outbound_segments.append(dict_segment) + continue + + fallback_text = f"[unsupported:{item_type or 'unknown'}]" + outbound_segments.append({"type": "text", "data": {"text": fallback_text}}) + + if not outbound_segments: + outbound_segments.append({"type": "text", "data": {"text": ""}}) + return outbound_segments + + def _build_forward_nodes(self, forward_nodes: List[Any]) -> List[Dict[str, Any]]: + """构造 NapCat 转发节点列表。 + + Args: + forward_nodes: 内部转发节点列表。 + + Returns: + List[Dict[str, Any]]: NapCat 转发节点列表。 + """ + built_nodes: List[Dict[str, Any]] = [] + for node in forward_nodes: + if not isinstance(node, Mapping): + continue + raw_content = node.get("content", []) + node_segments = self.convert_segments(raw_content) + built_nodes.append( + { + "type": "node", + "data": { + "name": str(node.get("user_nickname") or node.get("user_cardname") or "QQ用户"), + "uin": str(node.get("user_id") or ""), + "content": node_segments, + }, + } + ) + return built_nodes + + def _build_dict_component_segment(self, item_data: Mapping[str, Any]) -> Dict[str, Any]: + """尽力将 ``DictComponent`` 转换为 NapCat 消息段。 + + Args: + item_data: ``DictComponent`` 原始数据。 + + Returns: + Dict[str, Any]: NapCat 消息段;不支持时返回占位文本段。 + """ + raw_type = str(item_data.get("type") or "").strip() + raw_payload = item_data.get("data", item_data) + if raw_type in {"file", "music", "video", "face"} and isinstance(raw_payload, Mapping): + return {"type": raw_type, "data": dict(raw_payload)} + if raw_type in {"image", "record", "reply", "at"} and isinstance(raw_payload, Mapping): + return {"type": raw_type, "data": dict(raw_payload)} + return {"type": "text", "data": {"text": f"[unsupported:{raw_type or 'dict'}]"}} diff --git a/src/plugins/built_in/napcat_adapter/config.py b/src/plugins/built_in/napcat_adapter/config.py new file mode 100644 index 00000000..eeb4acab --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/config.py @@ -0,0 +1,398 @@ +"""NapCat 内置适配器配置解析。""" + +from dataclasses import dataclass, field +from typing import Any, Dict, Mapping, Optional, Set, Tuple +from urllib.parse import urlparse + +from napcat_adapter.constants import ( + DEFAULT_ACTION_TIMEOUT_SEC, + DEFAULT_CHAT_LIST_TYPE, + DEFAULT_HEARTBEAT_INTERVAL_SEC, + DEFAULT_NAPCAT_HOST, + DEFAULT_NAPCAT_PORT, + DEFAULT_RECONNECT_DELAY_SEC, + SUPPORTED_CONFIG_VERSION, +) + + +@dataclass(frozen=True) +class NapCatPluginOptions: + """插件级配置。""" + + enabled: bool = False + config_version: str = "" + + def should_connect(self) -> bool: + """判断当前配置下是否应当启动连接。 + + Returns: + bool: 若插件连接已启用,则返回 ``True``。 + """ + return self.enabled + + +@dataclass(frozen=True) +class NapCatServerConfig: + """NapCat 正向 WebSocket 连接配置。""" + + host: str = DEFAULT_NAPCAT_HOST + port: int = DEFAULT_NAPCAT_PORT + token: str = "" + heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL_SEC + reconnect_delay_sec: float = DEFAULT_RECONNECT_DELAY_SEC + action_timeout_sec: float = DEFAULT_ACTION_TIMEOUT_SEC + connection_id: str = "" + + def build_ws_url(self) -> str: + """构造正向 WebSocket 地址。 + + Returns: + str: 供适配器作为客户端连接的 NapCat WebSocket 地址。 + """ + return f"ws://{self.host}:{self.port}" + + +@dataclass(frozen=True) +class NapCatChatConfig: + """聊天名单配置。""" + + group_list_type: str = DEFAULT_CHAT_LIST_TYPE + group_list: Set[str] = field(default_factory=set) + private_list_type: str = DEFAULT_CHAT_LIST_TYPE + private_list: Set[str] = field(default_factory=set) + ban_user_id: Set[str] = field(default_factory=set) + + +@dataclass(frozen=True) +class NapCatFilterConfig: + """消息过滤配置。""" + + ignore_self_message: bool = True + + +@dataclass(frozen=True) +class NapCatPluginSettings: + """NapCat 插件完整配置。""" + + plugin: NapCatPluginOptions = field(default_factory=NapCatPluginOptions) + napcat_server: NapCatServerConfig = field(default_factory=NapCatServerConfig) + chat: NapCatChatConfig = field(default_factory=NapCatChatConfig) + filters: NapCatFilterConfig = field(default_factory=NapCatFilterConfig) + + @classmethod + def from_mapping(cls, raw_config: Mapping[str, Any], logger: Any) -> "NapCatPluginSettings": + """从 Runner 注入的原始配置字典解析插件配置。 + + Args: + raw_config: Runner 注入的原始配置内容。 + logger: 插件日志对象。 + + Returns: + NapCatPluginSettings: 规范化后的插件配置。 + """ + plugin_section = _as_mapping(raw_config.get("plugin")) + server_section = _as_mapping(raw_config.get("napcat_server")) + legacy_connection_section = _as_mapping(raw_config.get("connection")) + chat_section = _as_mapping(raw_config.get("chat")) + filters_section = _as_mapping(raw_config.get("filters")) + + if not server_section and legacy_connection_section: + logger.warning("NapCat 适配器检测到旧版 [connection] 配置段,请尽快迁移到 [napcat_server]") + server_section = legacy_connection_section + + legacy_host, legacy_port = _read_legacy_host_port(server_section, legacy_connection_section, logger) + parsed_host = _read_string(server_section, "host") or legacy_host or DEFAULT_NAPCAT_HOST + parsed_port = _read_positive_int( + mapping=server_section, + key="port", + default=legacy_port or DEFAULT_NAPCAT_PORT, + logger=logger, + setting_name="napcat_server.port", + ) + + return cls( + plugin=NapCatPluginOptions( + enabled=_read_bool(plugin_section, "enabled", False), + config_version=_read_string(plugin_section, "config_version"), + ), + napcat_server=NapCatServerConfig( + host=parsed_host, + port=parsed_port, + token=_read_string(server_section, "token") or _read_string(server_section, "access_token"), + heartbeat_interval=_read_positive_float( + mapping=server_section, + key="heartbeat_interval", + default=_read_positive_float( + mapping=server_section, + key="heartbeat_sec", + default=DEFAULT_HEARTBEAT_INTERVAL_SEC, + logger=logger, + setting_name="napcat_server.heartbeat_interval", + ), + logger=logger, + setting_name="napcat_server.heartbeat_interval", + ), + reconnect_delay_sec=_read_positive_float( + mapping=server_section, + key="reconnect_delay_sec", + default=DEFAULT_RECONNECT_DELAY_SEC, + logger=logger, + setting_name="napcat_server.reconnect_delay_sec", + ), + action_timeout_sec=_read_positive_float( + mapping=server_section, + key="action_timeout_sec", + default=DEFAULT_ACTION_TIMEOUT_SEC, + logger=logger, + setting_name="napcat_server.action_timeout_sec", + ), + connection_id=_read_string(server_section, "connection_id"), + ), + chat=NapCatChatConfig( + group_list_type=_read_list_mode( + mapping=chat_section, + key="group_list_type", + default=DEFAULT_CHAT_LIST_TYPE, + logger=logger, + setting_name="chat.group_list_type", + ), + group_list=_read_string_set(chat_section, "group_list"), + private_list_type=_read_list_mode( + mapping=chat_section, + key="private_list_type", + default=DEFAULT_CHAT_LIST_TYPE, + logger=logger, + setting_name="chat.private_list_type", + ), + private_list=_read_string_set(chat_section, "private_list"), + ban_user_id=_read_string_set(chat_section, "ban_user_id"), + ), + filters=NapCatFilterConfig( + ignore_self_message=_read_bool(filters_section, "ignore_self_message", True), + ), + ) + + def should_connect(self) -> bool: + """判断当前配置下是否应当启动连接。 + + Returns: + bool: 若插件连接已启用,则返回 ``True``。 + """ + return self.plugin.should_connect() + + def validate(self, logger: Any) -> bool: + """校验当前配置是否满足启动连接的前提条件。 + + Args: + logger: 插件日志对象。 + + Returns: + bool: 若配置满足启动连接的前提条件,则返回 ``True``。 + """ + config_version = self.plugin.config_version + if not config_version: + logger.error( + f"NapCat 适配器配置缺少 plugin.config_version,当前插件要求版本 {SUPPORTED_CONFIG_VERSION}" + ) + return False + + if config_version != SUPPORTED_CONFIG_VERSION: + logger.error( + "NapCat 适配器配置版本不兼容: " + f"当前为 {config_version},当前插件要求 {SUPPORTED_CONFIG_VERSION}" + ) + return False + + if not self.napcat_server.host: + logger.warning("NapCat 适配器已启用,但 napcat_server.host 为空") + return False + + if self.napcat_server.port <= 0: + logger.warning("NapCat 适配器已启用,但 napcat_server.port 不是正整数") + return False + + return True + + +def _as_mapping(value: Any) -> Dict[str, Any]: + """将任意值安全转换为字典。 + + Args: + value: 待转换的值。 + + Returns: + Dict[str, Any]: 若原值是映射,则返回普通字典;否则返回空字典。 + """ + return dict(value) if isinstance(value, Mapping) else {} + + +def _read_bool(mapping: Mapping[str, Any], key: str, default: bool) -> bool: + """安全读取布尔配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + default: 读取失败时的默认值。 + + Returns: + bool: 解析后的布尔值。 + """ + value = mapping.get(key, default) + return value if isinstance(value, bool) else default + + +def _read_string(mapping: Mapping[str, Any], key: str) -> str: + """安全读取字符串配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + + Returns: + str: 去除首尾空白后的字符串值。 + """ + value = mapping.get(key) + return "" if value is None else str(value).strip() + + +def _read_positive_float( + mapping: Mapping[str, Any], + key: str, + default: float, + logger: Any, + setting_name: str, +) -> float: + """安全读取正浮点数配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + default: 读取失败时的默认值。 + logger: 插件日志对象。 + setting_name: 用于日志输出的完整配置名。 + + Returns: + float: 合法的正浮点数;否则返回默认值。 + """ + value = mapping.get(key, default) + if isinstance(value, (int, float)) and float(value) > 0: + return float(value) + + if key in mapping: + logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") + return default + + +def _read_positive_int( + mapping: Mapping[str, Any], + key: str, + default: int, + logger: Any, + setting_name: str, +) -> int: + """安全读取正整数配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + default: 读取失败时的默认值。 + logger: 插件日志对象。 + setting_name: 用于日志输出的完整配置名。 + + Returns: + int: 合法的正整数;否则返回默认值。 + """ + value = mapping.get(key, default) + if isinstance(value, int) and value > 0: + return value + + if isinstance(value, str) and value.isdigit() and int(value) > 0: + return int(value) + + if key in mapping: + logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") + return default + + +def _read_list_mode( + mapping: Mapping[str, Any], + key: str, + default: str, + logger: Any, + setting_name: str, +) -> str: + """安全读取名单模式配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + default: 读取失败时的默认值。 + logger: 插件日志对象。 + setting_name: 用于日志输出的完整配置名。 + + Returns: + str: 合法的名单模式字符串。 + """ + value = mapping.get(key, default) + if isinstance(value, str): + normalized_value = value.strip() + if normalized_value in {"whitelist", "blacklist"}: + return normalized_value + + if key in mapping: + logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") + return default + + +def _read_string_set(mapping: Mapping[str, Any], key: str) -> Set[str]: + """安全读取字符串集合配置值。 + + Args: + mapping: 待读取的配置字典。 + key: 目标键名。 + + Returns: + Set[str]: 规范化后的字符串集合。 + """ + value = mapping.get(key, []) + if not isinstance(value, list): + return set() + + normalized_values: Set[str] = set() + for item in value: + item_text = "" if item is None else str(item).strip() + if item_text: + normalized_values.add(item_text) + return normalized_values + + +def _read_legacy_host_port( + server_section: Mapping[str, Any], + legacy_connection_section: Mapping[str, Any], + logger: Any, +) -> Tuple[str, Optional[int]]: + """从旧版 ``ws_url`` 配置中提取主机与端口。 + + Args: + server_section: 新版 ``napcat_server`` 配置段。 + legacy_connection_section: 旧版 ``connection`` 配置段。 + logger: 插件日志对象。 + + Returns: + Tuple[str, Optional[int]]: 解析到的主机与端口;若未找到,则返回空主机与 ``None``。 + """ + legacy_ws_url = _read_string(server_section, "ws_url") or _read_string(legacy_connection_section, "ws_url") + if not legacy_ws_url: + return "", None + + parsed_url = urlparse(legacy_ws_url) + parsed_host = parsed_url.hostname or "" + parsed_port = parsed_url.port + + logger.warning( + "NapCat 适配器检测到旧版 ws_url 配置,已临时兼容解析,请尽快迁移到 napcat_server.host/port" + ) + if parsed_url.path not in {"", "/"}: + logger.warning("NapCat 适配器旧版 ws_url 包含路径,新的 napcat_server 配置不会保留该路径") + + return parsed_host, parsed_port diff --git a/src/plugins/built_in/napcat_adapter/constants.py b/src/plugins/built_in/napcat_adapter/constants.py new file mode 100644 index 00000000..bdddde6f --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/constants.py @@ -0,0 +1,9 @@ +"""NapCat 内置适配器共享常量。""" + +SUPPORTED_CONFIG_VERSION = "0.1.0" +DEFAULT_NAPCAT_HOST = "127.0.0.1" +DEFAULT_NAPCAT_PORT = 3001 +DEFAULT_RECONNECT_DELAY_SEC = 5.0 +DEFAULT_HEARTBEAT_INTERVAL_SEC = 30.0 +DEFAULT_ACTION_TIMEOUT_SEC = 15.0 +DEFAULT_CHAT_LIST_TYPE = "whitelist" diff --git a/src/plugins/built_in/napcat_adapter/filters.py b/src/plugins/built_in/napcat_adapter/filters.py new file mode 100644 index 00000000..141cda85 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/filters.py @@ -0,0 +1,68 @@ +"""NapCat 入站消息过滤。""" + +from typing import Any, Set + +from napcat_adapter.config import NapCatChatConfig + + +class NapCatChatFilter: + """NapCat 聊天名单过滤器。""" + + def __init__(self, logger: Any) -> None: + """初始化聊天名单过滤器。 + + Args: + logger: 插件日志对象。 + """ + self._logger = logger + + def is_inbound_chat_allowed( + self, + sender_user_id: str, + group_id: str, + chat_config: NapCatChatConfig, + ) -> bool: + """检查入站消息是否通过聊天名单过滤。 + + Args: + sender_user_id: 发送者用户 ID。 + group_id: 群聊 ID;私聊时为空字符串。 + chat_config: 当前生效的聊天配置。 + + Returns: + bool: 若消息允许继续进入 Host,则返回 ``True``。 + """ + if sender_user_id in chat_config.ban_user_id: + self._logger.warning(f"NapCat 用户 {sender_user_id} 在全局禁止名单中,消息被丢弃") + return False + + if group_id: + if not self._is_id_allowed_by_list_policy(group_id, chat_config.group_list_type, chat_config.group_list): + self._logger.warning(f"NapCat 群聊 {group_id} 未通过聊天名单过滤,消息被丢弃") + return False + return True + + if not self._is_id_allowed_by_list_policy( + sender_user_id, + chat_config.private_list_type, + chat_config.private_list, + ): + self._logger.warning(f"NapCat 私聊用户 {sender_user_id} 未通过聊天名单过滤,消息被丢弃") + return False + return True + + @staticmethod + def _is_id_allowed_by_list_policy(target_id: str, list_type: str, configured_ids: Set[str]) -> bool: + """根据白名单或黑名单规则判断目标 ID 是否允许通过。 + + Args: + target_id: 待检查的目标 ID。 + list_type: 名单模式,仅支持 ``whitelist`` 或 ``blacklist``。 + configured_ids: 配置中的 ID 集合。 + + Returns: + bool: 若目标 ID 允许通过,则返回 ``True``。 + """ + if list_type == "whitelist": + return target_id in configured_ids + return target_id not in configured_ids diff --git a/src/plugins/built_in/napcat_adapter/plugin.py b/src/plugins/built_in/napcat_adapter/plugin.py index c8bb837b..b1e9bc8c 100644 --- a/src/plugins/built_in/napcat_adapter/plugin.py +++ b/src/plugins/built_in/napcat_adapter/plugin.py @@ -1,6 +1,6 @@ """内置 NapCat 适配器插件。 -当前实现是一个 MVP 版本,目标仅限于跑通基础消息收发链路: +当前实现维持 MVP 范围,目标是跑通基础消息收发链路: 1. 作为客户端连接 NapCat / OneBot v11 WebSocket 服务。 2. 将入站消息事件转换为 Host 侧的 ``MessageDict``。 3. 将 Host 出站消息转换为 OneBot 动作并发送。 @@ -8,45 +8,26 @@ 当前范围刻意收敛为: - 单连接 - 文本、@、reply 基础转发 -- 暂不处理 ``notice`` / ``meta_event`` +- 暂不处理 ``notice`` / ``meta_event`` 的完整语义归一化 - 暂不支持图片、语音、文件等复杂媒体 """ from __future__ import annotations -from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING, cast -from uuid import uuid4 +from typing import Any, Dict, Mapping, Optional import asyncio -import contextlib -import json -import time from maibot_sdk import Adapter, MaiBotPlugin -if TYPE_CHECKING: - from aiohttp import ClientWebSocketResponse as AiohttpClientWebSocketResponse - -try: - from aiohttp import ClientSession, ClientTimeout, ClientWebSocketResponse, WSMsgType - - AIOHTTP_AVAILABLE = True -except ImportError: - ClientSession = cast(Any, None) - ClientTimeout = cast(Any, None) - ClientWebSocketResponse = cast(Any, None) - WSMsgType = cast(Any, None) - AIOHTTP_AVAILABLE = False - -if not TYPE_CHECKING: - AiohttpClientWebSocketResponse = Any - - -SUPPORTED_CONFIG_VERSION = "0.1.0" -DEFAULT_RECONNECT_DELAY_SEC = 5.0 -DEFAULT_HEARTBEAT_SEC = 30.0 -DEFAULT_ACTION_TIMEOUT_SEC = 15.0 -DEFAULT_CHAT_LIST_TYPE = "whitelist" +from napcat_adapter.codec_inbound import NapCatInboundCodec +from napcat_adapter.codec_outbound import NapCatOutboundCodec +from napcat_adapter.config import NapCatPluginSettings +from napcat_adapter.filters import NapCatChatFilter +from napcat_adapter.qq_notice import NapCatNoticeCodec +from napcat_adapter.qq_queries import NapCatQueryService +from napcat_adapter.runtime_state import NapCatRuntimeStateManager +from napcat_adapter.transport import NapCatTransportClient @Adapter(platform="qq", protocol="napcat", send_method="send_to_platform") @@ -57,14 +38,14 @@ class NapCatAdapterPlugin(MaiBotPlugin): """初始化 NapCat 适配器插件实例。""" super().__init__() self._plugin_config: Dict[str, Any] = {} - self._connection_task: Optional[asyncio.Task[None]] = None - self._pending_actions: Dict[str, asyncio.Future[Dict[str, Any]]] = {} - self._background_tasks: Set[asyncio.Task[Any]] = set() - self._reported_account_id: Optional[str] = None - self._reported_scope: Optional[str] = None - self._runtime_state_connected: bool = False - self._send_lock = asyncio.Lock() - self._ws: Optional[AiohttpClientWebSocketResponse] = None + self._settings: Optional[NapCatPluginSettings] = None + self._inbound_codec: Optional[NapCatInboundCodec] = None + self._outbound_codec = NapCatOutboundCodec() + self._chat_filter: Optional[NapCatChatFilter] = None + self._query_service: Optional[NapCatQueryService] = None + self._notice_codec: Optional[NapCatNoticeCodec] = None + self._runtime_state: Optional[NapCatRuntimeStateManager] = None + self._transport: Optional[NapCatTransportClient] = None def set_plugin_config(self, config: Dict[str, Any]) -> None: """设置插件配置内容。 @@ -79,9 +60,8 @@ class NapCatAdapterPlugin(MaiBotPlugin): await self._restart_connection_if_needed() async def on_unload(self) -> None: - """在插件卸载时关闭连接并清理后台任务。""" + """在插件卸载时关闭连接并清理运行时状态。""" await self._stop_connection() - await self._cancel_background_tasks() async def on_config_update(self, new_config: Dict[str, Any], version: str) -> None: """在配置更新后重载连接状态。 @@ -116,13 +96,14 @@ class NapCatAdapterPlugin(MaiBotPlugin): del metadata del kwargs - ws = self._ws - if ws is None or ws.closed: - return {"success": False, "error": "NapCat is not connected"} + self._ensure_runtime_components() + transport = self._transport + if transport is None: + return {"success": False, "error": "NapCat transport is not initialized"} try: - action_name, params = self._build_outbound_action(message, route or {}) - response = await self._call_action(action_name, params) + action_name, params = self._outbound_codec.build_outbound_action(message, route or {}) + response = await transport.call_action(action_name, params) except Exception as exc: return {"success": False, "error": str(exc)} @@ -135,7 +116,7 @@ class NapCatAdapterPlugin(MaiBotPlugin): response_data = response.get("data", {}) external_message_id = "" - if isinstance(response_data, dict): + if isinstance(response_data, Mapping): external_message_id = str(response_data.get("message_id") or "") return { @@ -144,143 +125,109 @@ class NapCatAdapterPlugin(MaiBotPlugin): "metadata": {"action": action_name}, } - async def _restart_connection_if_needed(self) -> None: - """根据当前配置重启连接循环。""" - await self._stop_connection() - if not self._should_connect(): - self.ctx.logger.info("NapCat 适配器保持空闲状态,因为插件或配置未启用") - return - if not self._validate_current_config(): - return - if not AIOHTTP_AVAILABLE: - self.ctx.logger.error("NapCat 适配器依赖 aiohttp,但当前环境未安装该依赖") - return - self._connection_task = asyncio.create_task(self._connection_loop(), name="napcat_adapter.connection") + def _ensure_runtime_components(self) -> None: + """确保运行时依赖对象已经完成初始化。""" + if self._chat_filter is None: + self._chat_filter = NapCatChatFilter(self.ctx.logger) - async def _stop_connection(self) -> None: - """停止当前连接并让所有等待中的动作失败返回。""" - connection_task = self._connection_task - self._connection_task = None - - ws = self._ws - if ws is not None and not ws.closed: - with contextlib.suppress(Exception): - await ws.close() - self._ws = None - - if connection_task is not None: - connection_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await connection_task - - await self._report_adapter_disconnected() - self._fail_pending_actions("NapCat connection closed") - - async def _cancel_background_tasks(self) -> None: - """取消所有仍在运行的入站后台任务。""" - background_tasks = list(self._background_tasks) - for task in background_tasks: - task.cancel() - if background_tasks: - with contextlib.suppress(Exception): - await asyncio.gather(*background_tasks, return_exceptions=True) - self._background_tasks.clear() - - async def _connection_loop(self) -> None: - """维护单个 WebSocket 连接,并在断开后按配置重连。""" - assert ClientSession is not None - assert ClientTimeout is not None - - while self._should_connect(): - ws_url = self._get_string(self._connection_config(), "ws_url") - if not ws_url: - self.ctx.logger.warning("NapCat 适配器已启用,但 connection.ws_url 为空") - return - - headers = self._build_headers() - timeout = ClientTimeout(total=None, connect=10) - heartbeat = self._get_positive_float(self._connection_config(), "heartbeat_sec", DEFAULT_HEARTBEAT_SEC) - - try: - async with ClientSession(headers=headers, timeout=timeout) as session: - async with session.ws_connect(ws_url, heartbeat=heartbeat or None) as ws: - self._ws = ws - self.ctx.logger.info(f"NapCat 适配器已连接: {ws_url}") - await self._receive_loop(ws) - except asyncio.CancelledError: - raise - except Exception as exc: - self.ctx.logger.warning(f"NapCat 适配器连接失败: {exc}") - finally: - self._ws = None - await self._report_adapter_disconnected() - self._fail_pending_actions("NapCat connection interrupted") - - if not self._should_connect(): - break - - await asyncio.sleep( - self._get_positive_float( - self._connection_config(), - "reconnect_delay_sec", - DEFAULT_RECONNECT_DELAY_SEC, - ) + if self._transport is None: + self._transport = NapCatTransportClient( + logger=self.ctx.logger, + on_connection_opened=self._bootstrap_adapter_runtime_state, + on_connection_closed=self._handle_transport_disconnected, + on_payload=self._handle_transport_payload, ) - async def _receive_loop(self, ws: AiohttpClientWebSocketResponse) -> None: - """持续消费 WebSocket 消息并分发处理。 + if self._query_service is None: + self._query_service = NapCatQueryService(self.ctx.logger, self._transport) + + if self._inbound_codec is None: + self._inbound_codec = NapCatInboundCodec(self.ctx.logger, self._query_service) + + if self._notice_codec is None: + self._notice_codec = NapCatNoticeCodec(self.ctx.logger, self._query_service) + + if self._runtime_state is None: + self._runtime_state = NapCatRuntimeStateManager(self.ctx.adapter, self.ctx.logger) + + def _reload_settings(self) -> NapCatPluginSettings: + """重新解析当前插件配置。 + + Returns: + NapCatPluginSettings: 最新的规范化配置。 + """ + self._settings = NapCatPluginSettings.from_mapping(self._plugin_config, self.ctx.logger) + return self._settings + + async def _restart_connection_if_needed(self) -> None: + """根据当前配置重启连接循环。""" + self._ensure_runtime_components() + settings = self._reload_settings() + + await self._stop_connection() + if not settings.should_connect(): + self.ctx.logger.info("NapCat 适配器保持空闲状态,因为插件或配置未启用") + return + if not settings.validate(self.ctx.logger): + return + + transport = self._transport + assert transport is not None + if not transport.is_available(): + self.ctx.logger.error("NapCat 适配器依赖 aiohttp,但当前环境未安装该依赖") + return + + transport.configure(settings.napcat_server) + await transport.start() + + async def _stop_connection(self) -> None: + """停止当前连接。""" + transport = self._transport + if transport is not None: + await transport.stop() + return + + runtime_state = self._runtime_state + if runtime_state is not None: + await runtime_state.report_disconnected() + + async def _handle_transport_payload(self, payload: Dict[str, Any]) -> None: + """处理来自传输层的非 echo 载荷。 Args: - ws: 当前活跃的 WebSocket 连接对象。 + payload: NapCat 推送的原始事件数据。 """ - assert WSMsgType is not None - - bootstrap_task = asyncio.create_task( - self._bootstrap_adapter_runtime_state(), - name="napcat_adapter.bootstrap", - ) - self._background_tasks.add(bootstrap_task) - bootstrap_task.add_done_callback(self._background_tasks.discard) - - try: - async for ws_message in ws: - if ws_message.type != WSMsgType.TEXT: - if ws_message.type in {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR}: - break - continue - - payload = self._parse_json_message(ws_message.data) - if payload is None: - continue - - if echo_id := str(payload.get("echo") or "").strip(): - self._resolve_pending_action(echo_id, payload) - continue - - if str(payload.get("post_type") or "").strip() != "message": - continue - - task = asyncio.create_task(self._handle_inbound_message(payload), name="napcat_adapter.inbound") - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) - finally: - if not bootstrap_task.done(): - bootstrap_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await bootstrap_task + post_type = str(payload.get("post_type") or "").strip() + if post_type == "message": + await self._handle_inbound_message(payload) + return + if post_type == "notice": + await self._handle_notice_event(payload) + return + if post_type == "meta_event": + await self._handle_meta_event(payload) async def _handle_inbound_message(self, payload: Dict[str, Any]) -> None: """处理单条 NapCat 入站消息并注入 Host。 Args: - payload: NapCat / OneBot 推送的原始事件数据。 + payload: NapCat / OneBot 推送的原始消息事件。 """ + self._ensure_runtime_components() + settings = self._settings or self._reload_settings() + chat_filter = self._chat_filter + inbound_codec = self._inbound_codec + runtime_state = self._runtime_state + assert chat_filter is not None + assert inbound_codec is not None + assert runtime_state is not None + self_id = str(payload.get("self_id") or "").strip() if self_id: - await self._report_adapter_connected(self_id) + await runtime_state.report_connected(self_id, settings.napcat_server) sender = payload.get("sender", {}) - if not isinstance(sender, dict): + if not isinstance(sender, Mapping): sender = {} sender_user_id = str(payload.get("user_id") or sender.get("user_id") or "").strip() @@ -288,17 +235,17 @@ class NapCatAdapterPlugin(MaiBotPlugin): return group_id = str(payload.get("group_id") or "").strip() - if self_id and sender_user_id == self_id and self._get_bool(self._filters_config(), "ignore_self_message", True): + if self_id and sender_user_id == self_id and settings.filters.ignore_self_message: return - if not self._is_inbound_chat_allowed(sender_user_id, group_id): + if not chat_filter.is_inbound_chat_allowed(sender_user_id, group_id, settings.chat): return - message_dict = self._build_inbound_message_dict(payload, self_id, sender_user_id, sender) + message_dict = await inbound_codec.build_message_dict(payload, self_id, sender_user_id, sender) route_metadata: Dict[str, Any] = {} if self_id: route_metadata["self_id"] = self_id - if connection_id := self._get_string(self._connection_config(), "connection_id"): - route_metadata["connection_id"] = connection_id + if settings.napcat_server.connection_id: + route_metadata["connection_id"] = settings.napcat_server.connection_id external_message_id = str(payload.get("message_id") or "").strip() accepted = await self.ctx.adapter.receive_external_message( @@ -310,305 +257,78 @@ class NapCatAdapterPlugin(MaiBotPlugin): if not accepted: self.ctx.logger.debug(f"Host 丢弃了 NapCat 入站消息: {external_message_id or '无消息 ID'}") - def _build_inbound_message_dict( - self, - payload: Dict[str, Any], - self_id: str, - sender_user_id: str, - sender: Dict[str, Any], - ) -> Dict[str, Any]: - """构造 Host 侧可接受的 ``MessageDict``。 + async def _handle_notice_event(self, payload: Dict[str, Any]) -> None: + """处理 NapCat ``notice`` 事件并注入 Host。 Args: - payload: NapCat 原始消息事件。 - self_id: 当前机器人账号 ID。 - sender_user_id: 发送者用户 ID。 - sender: 发送者信息字典。 - - Returns: - Dict[str, Any]: 规范化后的 ``MessageDict``。 + payload: NapCat 推送的通知事件。 """ - message_type = str(payload.get("message_type") or "").strip() or "private" - group_id = str(payload.get("group_id") or "").strip() - group_name = str(payload.get("group_name") or "").strip() or (f"group_{group_id}" if group_id else "") - user_nickname = str(sender.get("nickname") or sender.get("card") or sender_user_id).strip() or sender_user_id - user_cardname = str(sender.get("card") or "").strip() or None + self._ensure_runtime_components() + notice_codec = self._notice_codec + runtime_state = self._runtime_state + settings = self._settings or self._reload_settings() + assert notice_codec is not None + assert runtime_state is not None - raw_message, is_at = self._convert_inbound_segments(payload.get("message"), self_id) - raw_message_text = str(payload.get("raw_message") or "").strip() - if not raw_message: - raw_message = [{"type": "text", "data": raw_message_text or "[unsupported]"}] + self_id = str(payload.get("self_id") or "").strip() + if self_id: + await runtime_state.report_connected(self_id, settings.napcat_server) - plain_text = self._build_plain_text(raw_message, raw_message_text) - timestamp_seconds = payload.get("time") - if not isinstance(timestamp_seconds, (int, float)): - timestamp_seconds = time.time() - - additional_config: Dict[str, Any] = {"self_id": self_id, "napcat_message_type": message_type} - if group_id: - additional_config["platform_io_target_group_id"] = group_id - else: - additional_config["platform_io_target_user_id"] = sender_user_id - - message_info: Dict[str, Any] = { - "user_info": { - "user_id": sender_user_id, - "user_nickname": user_nickname, - "user_cardname": user_cardname, - }, - "additional_config": additional_config, - } - if group_id: - message_info["group_info"] = {"group_id": group_id, "group_name": group_name} - - message_id = str(payload.get("message_id") or f"napcat-{uuid4().hex}").strip() - return { - "message_id": message_id, - "timestamp": str(float(timestamp_seconds)), - "platform": "qq", - "message_info": message_info, - "raw_message": raw_message, - "is_mentioned": is_at, - "is_at": is_at, - "is_emoji": False, - "is_picture": False, - "is_command": plain_text.startswith("/"), - "is_notify": False, - "session_id": "", - "processed_plain_text": plain_text, - "display_message": plain_text, - } - - def _convert_inbound_segments(self, message_payload: Any, self_id: str) -> Tuple[List[Dict[str, Any]], bool]: - """将 OneBot 消息段转换为 Host 消息段结构。 - - Args: - message_payload: OneBot 原始 ``message`` 字段。 - self_id: 当前机器人账号 ID。 - - Returns: - Tuple[List[Dict[str, Any]], bool]: 转换后的消息段列表,以及是否 @ 到当前机器人。 - """ - if isinstance(message_payload, str): - normalized_text = message_payload.strip() - return ([{"type": "text", "data": normalized_text}] if normalized_text else []), False - - if not isinstance(message_payload, list): - return [], False - - converted_segments: List[Dict[str, Any]] = [] - is_at = False - placeholder_texts = { - "face": "[face]", - "file": "[file]", - "image": "[image]", - "json": "[json]", - "record": "[voice]", - "video": "[video]", - "xml": "[xml]", - } - - for segment in message_payload: - if not isinstance(segment, dict): - continue - - segment_type = str(segment.get("type") or "").strip() - segment_data = segment.get("data", {}) - if not isinstance(segment_data, dict): - segment_data = {} - - if segment_type == "text": - if text_value := str(segment_data.get("text") or ""): - converted_segments.append({"type": "text", "data": text_value}) - continue - - if segment_type == "at": - if target_user_id := str(segment_data.get("qq") or "").strip(): - converted_segments.append( - { - "type": "at", - "data": { - "target_user_id": target_user_id, - "target_user_nickname": None, - "target_user_cardname": None, - }, - } - ) - if self_id and target_user_id == self_id: - is_at = True - continue - - if segment_type == "reply": - if target_message_id := str(segment_data.get("id") or "").strip(): - converted_segments.append({"type": "reply", "data": target_message_id}) - continue - - if placeholder := placeholder_texts.get(segment_type): - converted_segments.append({"type": "text", "data": placeholder}) - - return converted_segments, is_at - - def _build_outbound_action( - self, - message: Dict[str, Any], - route: Dict[str, Any], - ) -> Tuple[str, Dict[str, Any]]: - """为 Host 出站消息构造 OneBot 动作。 - - Args: - message: Host 侧标准 ``MessageDict``。 - route: Platform IO 路由信息。 - - Returns: - Tuple[str, Dict[str, Any]]: 动作名称与参数字典。 - """ - message_info = message.get("message_info", {}) - if not isinstance(message_info, dict): - message_info = {} - - group_info = message_info.get("group_info", {}) - if not isinstance(group_info, dict): - group_info = {} - - additional_config = message_info.get("additional_config", {}) - if not isinstance(additional_config, dict): - additional_config = {} - - raw_message = message.get("raw_message", []) - segments = self._convert_outbound_segments(raw_message) - - if target_group_id := str( - group_info.get("group_id") or additional_config.get("platform_io_target_group_id") or "" - ).strip(): - return "send_group_msg", {"group_id": target_group_id, "message": segments} - - if not ( - target_user_id := str( - additional_config.get("platform_io_target_user_id") - or additional_config.get("target_user_id") - or route.get("target_user_id") - or "" - ).strip() - ): - raise ValueError("Outbound private message is missing target_user_id") - - return "send_private_msg", {"message": segments, "user_id": target_user_id} - - def _convert_outbound_segments(self, raw_message: Any) -> List[Dict[str, Any]]: - """将 Host 消息段转换为 OneBot 消息段。 - - Args: - raw_message: Host 侧 ``raw_message`` 字段。 - - Returns: - List[Dict[str, Any]]: OneBot 消息段列表。 - """ - if not isinstance(raw_message, list): - return [{"type": "text", "data": {"text": ""}}] - - outbound_segments: List[Dict[str, Any]] = [] - for item in raw_message: - if not isinstance(item, dict): - continue - - item_type = str(item.get("type") or "").strip() - item_data = item.get("data") - - if item_type == "text": - text_value = str(item_data or "") - outbound_segments.append({"type": "text", "data": {"text": text_value}}) - continue - - if item_type == "at" and isinstance(item_data, dict): - if target_user_id := str(item_data.get("target_user_id") or "").strip(): - outbound_segments.append({"type": "at", "data": {"qq": target_user_id}}) - continue - - if item_type == "reply": - if target_message_id := str(item_data or "").strip(): - outbound_segments.append({"type": "reply", "data": {"id": target_message_id}}) - continue - - fallback_text = f"[unsupported:{item_type or 'unknown'}]" - outbound_segments.append({"type": "text", "data": {"text": fallback_text}}) - - if not outbound_segments: - outbound_segments.append({"type": "text", "data": {"text": ""}}) - return outbound_segments - - async def _call_action(self, action_name: str, params: Dict[str, Any]) -> Dict[str, Any]: - """发送 OneBot 动作并等待对应的 echo 响应。 - - Args: - action_name: OneBot 动作名称。 - params: 动作参数。 - - Returns: - Dict[str, Any]: NapCat 返回的原始响应字典。 - """ - ws = self._ws - if ws is None or ws.closed: - raise RuntimeError("NapCat is not connected") - - echo_id = uuid4().hex - loop = asyncio.get_running_loop() - response_future: asyncio.Future[Dict[str, Any]] = loop.create_future() - self._pending_actions[echo_id] = response_future - - request_payload = {"action": action_name, "params": params, "echo": echo_id} - try: - async with self._send_lock: - await ws.send_str(json.dumps(request_payload, ensure_ascii=False)) - timeout_seconds = self._get_positive_float( - self._connection_config(), - "action_timeout_sec", - DEFAULT_ACTION_TIMEOUT_SEC, - ) - return await asyncio.wait_for(response_future, timeout=timeout_seconds) - finally: - self._pending_actions.pop(echo_id, None) - - def _resolve_pending_action(self, echo_id: str, payload: Dict[str, Any]) -> None: - """解析等待中的动作响应。 - - Args: - echo_id: 动作请求对应的 echo 标识。 - payload: NapCat 返回的响应载荷。 - """ - response_future = self._pending_actions.get(echo_id) - if response_future is None or response_future.done(): + message_dict = await notice_codec.build_notice_message_dict(payload) + if message_dict is None: return - response_future.set_result(payload) - def _fail_pending_actions(self, error_message: str) -> None: - """让所有等待中的动作以异常方式结束。 + route_metadata: Dict[str, Any] = {} + if self_id: + route_metadata["self_id"] = self_id + if settings.napcat_server.connection_id: + route_metadata["connection_id"] = settings.napcat_server.connection_id + + external_message_id = str(payload.get("message_id") or payload.get("notice_type") or "").strip() + accepted = await self.ctx.adapter.receive_external_message( + message_dict, + route_metadata=route_metadata, + external_message_id=external_message_id or None, + dedupe_key=external_message_id or None, + ) + if not accepted: + self.ctx.logger.debug(f"Host 丢弃了 NapCat 通知事件: {external_message_id or '无消息 ID'}") + + async def _handle_meta_event(self, payload: Dict[str, Any]) -> None: + """处理 NapCat ``meta_event`` 事件。 Args: - error_message: 写入异常中的错误信息。 + payload: NapCat 推送的元事件。 """ - for response_future in self._pending_actions.values(): - if not response_future.done(): - response_future.set_exception(RuntimeError(error_message)) - self._pending_actions.clear() + self._ensure_runtime_components() + notice_codec = self._notice_codec + runtime_state = self._runtime_state + settings = self._settings or self._reload_settings() + assert notice_codec is not None + assert runtime_state is not None + + self_id = str(payload.get("self_id") or "").strip() + if self_id: + await runtime_state.report_connected(self_id, settings.napcat_server) + + await notice_codec.handle_meta_event(payload) async def _bootstrap_adapter_runtime_state(self) -> None: - """在连接建立后主动获取账号信息并激活适配器路由。 + """在连接建立后主动获取账号信息并激活适配器路由。""" + transport = self._transport + query_service = self._query_service + runtime_state = self._runtime_state + settings = self._settings or self._reload_settings() + if transport is None or query_service is None or runtime_state is None: + return - 该步骤会在 WebSocket 接收循环启动后异步执行,确保 `_call_action()` - 发出的 `get_login_info` 请求能够被同一连接上的接收循环消费到 echo - 响应,从而在真正收到业务消息前就完成 Host 侧 route 激活。 - """ max_attempts = 3 last_error: Optional[Exception] = None for attempt in range(1, max_attempts + 1): - ws = self._ws - if ws is None or ws.closed: - return - try: - response = await self._call_action("get_login_info", {}) - self_id = self._extract_self_id_from_login_response(response) - await self._report_adapter_connected(self_id) + login_info = await query_service.get_login_info() + self_id = self._extract_self_id_from_login_response(login_info) + await runtime_state.report_connected(self_id, settings.napcat_server) return except asyncio.CancelledError: raise @@ -623,430 +343,33 @@ class NapCatAdapterPlugin(MaiBotPlugin): if last_error is not None: self.ctx.logger.error(f"NapCat 适配器未能完成路由激活,连接将保持只接收状态: {last_error}") + async def _handle_transport_disconnected(self) -> None: + """处理传输层断开事件。""" + runtime_state = self._runtime_state + if runtime_state is not None: + await runtime_state.report_disconnected() + @staticmethod - def _extract_self_id_from_login_response(response: Dict[str, Any]) -> str: - """从 `get_login_info` 响应中提取当前账号 ID。 + def _extract_self_id_from_login_response(response: Optional[Dict[str, Any]]) -> str: + """从 ``get_login_info`` 查询结果中提取当前账号 ID。 Args: - response: NapCat 返回的原始动作响应。 + response: NapCat 返回的登录信息字典。 Returns: - str: 规范化后的 `self_id` 字符串。 + str: 规范化后的账号 ID 字符串。 Raises: ValueError: 当响应中缺少有效账号 ID 时抛出。 """ - if str(response.get("status") or "").lower() != "ok": - raise ValueError(str(response.get("wording") or response.get("message") or "get_login_info failed")) - - response_data = response.get("data", {}) - if not isinstance(response_data, dict): + if not isinstance(response, Mapping): raise ValueError("get_login_info 响应缺少 data 字段") - self_id = str(response_data.get("user_id") or "").strip() + self_id = str(response.get("user_id") or "").strip() if not self_id: raise ValueError("get_login_info 响应缺少有效的 user_id") return self_id - async def _report_adapter_connected(self, account_id: str) -> None: - """向 Host 上报当前连接已就绪。 - - Args: - account_id: 当前 NapCat 连接对应的机器人账号 ID。 - """ - normalized_account_id = str(account_id).strip() - if not normalized_account_id: - return - - scope = self._get_string(self._connection_config(), "connection_id").strip() - if ( - self._runtime_state_connected - and self._reported_account_id == normalized_account_id - and self._reported_scope == (scope or None) - ): - return - - accepted = False - try: - accepted = await self.ctx.adapter.update_runtime_state( - connected=True, - account_id=normalized_account_id, - scope=scope, - metadata={"ws_url": self._get_string(self._connection_config(), "ws_url")}, - ) - except Exception as exc: - self.ctx.logger.warning(f"NapCat 适配器上报连接就绪状态失败: {exc}") - return - - if not accepted: - self.ctx.logger.warning("NapCat 适配器连接已建立,但 Host 未接受运行时状态更新") - return - - self._runtime_state_connected = True - self._reported_account_id = normalized_account_id - self._reported_scope = scope or None - self.ctx.logger.info( - f"NapCat 适配器已激活路由: platform=qq account_id={normalized_account_id} " - f"scope={self._reported_scope or '*'}" - ) - - async def _report_adapter_disconnected(self) -> None: - """向 Host 上报当前连接已断开,并撤销适配器路由。""" - if not self._runtime_state_connected: - self._reported_account_id = None - self._reported_scope = None - return - - try: - await self.ctx.adapter.update_runtime_state(connected=False) - except Exception as exc: - self.ctx.logger.warning(f"NapCat 适配器上报断开状态失败: {exc}") - finally: - self._runtime_state_connected = False - self._reported_account_id = None - self._reported_scope = None - - def _build_headers(self) -> Dict[str, str]: - """构造连接 NapCat 所需的请求头。 - - Returns: - Dict[str, str]: WebSocket 握手请求头。 - """ - access_token = self._get_string(self._connection_config(), "access_token") - return {"Authorization": f"Bearer {access_token}"} if access_token else {} - - def _parse_json_message(self, data: Any) -> Optional[Dict[str, Any]]: - """解析 WebSocket 文本消息中的 JSON 数据。 - - Args: - data: WebSocket 收到的原始文本数据。 - - Returns: - Optional[Dict[str, Any]]: 成功时返回字典,失败时返回 ``None``。 - """ - try: - payload = json.loads(str(data)) - except Exception as exc: - self.ctx.logger.warning(f"NapCat 适配器解析 JSON 载荷失败: {exc}") - return None - - return payload if isinstance(payload, dict) else None - - def _build_plain_text(self, raw_message: List[Dict[str, Any]], fallback_text: str) -> str: - """从标准消息段中提取可展示的纯文本。 - - Args: - raw_message: 标准化后的消息段列表。 - fallback_text: 当无法拼出文本时使用的回退文本。 - - Returns: - str: 用于 Host 展示和命令判断的纯文本内容。 - """ - plain_text_parts: List[str] = [] - for item in raw_message: - if not isinstance(item, dict): - continue - item_type = str(item.get("type") or "").strip() - item_data = item.get("data") - if item_type == "text": - plain_text_parts.append(str(item_data or "")) - elif item_type == "at" and isinstance(item_data, dict): - plain_text_parts.append(f"@{item_data.get('target_user_id') or ''}") - elif item_type == "reply": - plain_text_parts.append("[reply]") - - plain_text = "".join(part for part in plain_text_parts if part).strip() - return plain_text or fallback_text or "[unsupported]" - - def _plugin_section(self) -> Dict[str, Any]: - """读取插件配置中的 ``plugin`` 段。 - - Returns: - Dict[str, Any]: ``plugin`` 配置字典。 - """ - plugin_section = self._plugin_config.get("plugin", {}) - return plugin_section if isinstance(plugin_section, dict) else {} - - def _connection_config(self) -> Dict[str, Any]: - """读取插件配置中的 ``connection`` 段。 - - Returns: - Dict[str, Any]: ``connection`` 配置字典。 - """ - connection_config = self._plugin_config.get("connection", {}) - return connection_config if isinstance(connection_config, dict) else {} - - def _filters_config(self) -> Dict[str, Any]: - """读取插件配置中的 ``filters`` 段。 - - Returns: - Dict[str, Any]: ``filters`` 配置字典。 - """ - filters_config = self._plugin_config.get("filters", {}) - return filters_config if isinstance(filters_config, dict) else {} - - def _chat_config(self) -> Dict[str, Any]: - """读取插件配置中的 ``chat`` 段。 - - Returns: - Dict[str, Any]: ``chat`` 配置字典。 - """ - chat_config = self._plugin_config.get("chat", {}) - return chat_config if isinstance(chat_config, dict) else {} - - def _is_inbound_chat_allowed(self, sender_user_id: str, group_id: str) -> bool: - """检查入站消息是否通过聊天名单过滤。 - - Args: - sender_user_id: 发送者用户 ID。 - group_id: 群聊 ID;私聊时为空字符串。 - - Returns: - bool: 若消息允许继续进入 Host,则返回 ``True``。 - """ - chat_config = self._chat_config() - banned_user_ids = self._get_string_list(chat_config, "ban_user_id") - if sender_user_id in banned_user_ids: - self.ctx.logger.warning(f"NapCat 用户 {sender_user_id} 在全局禁止名单中,消息被丢弃") - return False - - if group_id: - group_list_type = self._get_list_mode(chat_config, "group_list_type", DEFAULT_CHAT_LIST_TYPE) - group_id_list = self._get_string_list(chat_config, "group_list") - if not self._is_id_allowed_by_list_policy(group_id, group_list_type, group_id_list): - self.ctx.logger.warning(f"NapCat 群聊 {group_id} 未通过聊天名单过滤,消息被丢弃") - return False - return True - - private_list_type = self._get_list_mode(chat_config, "private_list_type", DEFAULT_CHAT_LIST_TYPE) - private_id_list = self._get_string_list(chat_config, "private_list") - if not self._is_id_allowed_by_list_policy(sender_user_id, private_list_type, private_id_list): - self.ctx.logger.warning(f"NapCat 私聊用户 {sender_user_id} 未通过聊天名单过滤,消息被丢弃") - return False - return True - - def _is_id_allowed_by_list_policy( - self, - target_id: str, - list_type: str, - configured_ids: Set[str], - ) -> bool: - """根据白名单或黑名单规则判断目标 ID 是否允许通过。 - - Args: - target_id: 待检查的目标 ID。 - list_type: 名单模式,仅支持 ``whitelist`` 或 ``blacklist``。 - configured_ids: 配置中的 ID 集合。 - - Returns: - bool: 若目标 ID 允许通过,则返回 ``True``。 - """ - if list_type == "whitelist": - return target_id in configured_ids - return target_id not in configured_ids - - def _validate_current_config(self) -> bool: - """校验当前配置是否满足启动连接的前提条件。 - - Returns: - bool: 配置可用于启动连接时返回 ``True``。 - """ - if not self._validate_plugin_config_version(): - return False - - connection_config = self._connection_config() - ws_url = self._get_string(connection_config, "ws_url") - if not ws_url: - self.ctx.logger.warning("NapCat 适配器已启用,但 connection.ws_url 为空") - return False - - self._validate_positive_float_setting( - connection_config, - "connection", - "reconnect_delay_sec", - DEFAULT_RECONNECT_DELAY_SEC, - ) - self._validate_positive_float_setting( - connection_config, - "connection", - "heartbeat_sec", - DEFAULT_HEARTBEAT_SEC, - ) - self._validate_positive_float_setting( - connection_config, - "connection", - "action_timeout_sec", - DEFAULT_ACTION_TIMEOUT_SEC, - ) - self._validate_list_mode_setting(self._chat_config(), "chat", "group_list_type", DEFAULT_CHAT_LIST_TYPE) - self._validate_list_mode_setting(self._chat_config(), "chat", "private_list_type", DEFAULT_CHAT_LIST_TYPE) - return True - - def _validate_plugin_config_version(self) -> bool: - """校验插件配置版本是否与当前实现兼容。 - - Returns: - bool: 版本兼容时返回 ``True``。 - """ - config_version = self._get_string(self._plugin_section(), "config_version") - if not config_version: - self.ctx.logger.error( - f"NapCat 适配器配置缺少 plugin.config_version,当前插件要求版本 {SUPPORTED_CONFIG_VERSION}" - ) - return False - - if config_version != SUPPORTED_CONFIG_VERSION: - self.ctx.logger.error( - "NapCat 适配器配置版本不兼容: " - f"当前为 {config_version},当前插件要求 {SUPPORTED_CONFIG_VERSION}" - ) - return False - - return True - - def _validate_positive_float_setting( - self, - mapping: Dict[str, Any], - section_name: str, - key: str, - default: float, - ) -> None: - """校验正浮点数配置项,并在非法时输出告警日志。 - - Args: - mapping: 待读取的配置字典。 - section_name: 当前配置段名称。 - key: 目标配置键名。 - default: 配置非法时实际使用的默认值。 - """ - value = mapping.get(key, default) - if isinstance(value, (int, float)) and float(value) > 0: - return - - self.ctx.logger.warning( - "NapCat 适配器配置项取值无效,已回退到默认值: " - f"{section_name}.{key}={value!r},默认值为 {default}" - ) - - def _validate_list_mode_setting( - self, - mapping: Dict[str, Any], - section_name: str, - key: str, - default: str, - ) -> None: - """校验名单模式配置项,并在非法时输出告警日志。 - - Args: - mapping: 待读取的配置字典。 - section_name: 当前配置段名称。 - key: 目标配置键名。 - default: 配置非法时实际使用的默认值。 - """ - value = mapping.get(key, default) - if isinstance(value, str) and value.strip() in {"whitelist", "blacklist"}: - return - - self.ctx.logger.warning( - "NapCat 适配器配置项取值无效,已回退到默认值: " - f"{section_name}.{key}={value!r},默认值为 {default}" - ) - - def _should_connect(self) -> bool: - """判断当前配置下是否应当启动连接。 - - Returns: - bool: 若启用了插件连接则返回 ``True``。 - """ - return self._get_bool(self._plugin_section(), "enabled", False) - - @staticmethod - def _get_bool(mapping: Dict[str, Any], key: str, default: bool) -> bool: - """安全读取布尔配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - - Returns: - bool: 解析后的布尔值。 - """ - value = mapping.get(key, default) - return value if isinstance(value, bool) else default - - @staticmethod - def _get_positive_float(mapping: Dict[str, Any], key: str, default: float) -> float: - """安全读取正浮点数配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - - Returns: - float: 合法的正浮点数;否则返回默认值。 - """ - value = mapping.get(key, default) - if isinstance(value, (int, float)) and float(value) > 0: - return float(value) - return default - - @staticmethod - def _get_string(mapping: Dict[str, Any], key: str) -> str: - """安全读取字符串配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - - Returns: - str: 去除首尾空白后的字符串值。 - """ - value = mapping.get(key) - return "" if value is None else str(value).strip() - - @staticmethod - def _get_list_mode(mapping: Dict[str, Any], key: str, default: str) -> str: - """安全读取名单模式配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - - Returns: - str: 合法的名单模式字符串。 - """ - value = mapping.get(key, default) - if isinstance(value, str): - normalized_value = value.strip() - if normalized_value in {"whitelist", "blacklist"}: - return normalized_value - return default - - @staticmethod - def _get_string_list(mapping: Dict[str, Any], key: str) -> Set[str]: - """安全读取 ID 列表配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - - Returns: - Set[str]: 去重后的字符串 ID 集合。 - """ - value = mapping.get(key, []) - if not isinstance(value, list): - return set() - - normalized_values: Set[str] = set() - for item in value: - item_text = "" if item is None else str(item).strip() - if item_text: - normalized_values.add(item_text) - return normalized_values - def create_plugin() -> NapCatAdapterPlugin: """创建插件实例。 diff --git a/src/plugins/built_in/napcat_adapter/qq_notice.py b/src/plugins/built_in/napcat_adapter/qq_notice.py new file mode 100644 index 00000000..f577cf98 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/qq_notice.py @@ -0,0 +1,224 @@ +"""NapCat QQ 平台通知与元事件处理。""" + +from typing import Any, Dict, Mapping, Optional +from uuid import uuid4 + +import time + +from napcat_adapter.qq_queries import NapCatQueryService + + +class NapCatNoticeCodec: + """NapCat QQ 通知事件编码器。""" + + def __init__(self, logger: Any, query_service: NapCatQueryService) -> None: + """初始化通知事件编码器。 + + Args: + logger: 插件日志对象。 + query_service: QQ 查询服务。 + """ + self._logger = logger + self._query_service = query_service + + async def build_notice_message_dict(self, payload: Mapping[str, Any]) -> Optional[Dict[str, Any]]: + """将 NapCat ``notice`` 事件转换为 Host 可接受的消息字典。 + + Args: + payload: NapCat 推送的原始通知事件。 + + Returns: + Optional[Dict[str, Any]]: 成功时返回标准 ``MessageDict``;无法识别时返回 ``None``。 + """ + notice_type = str(payload.get("notice_type") or "").strip() + if not notice_type: + return None + + group_id = str(payload.get("group_id") or "").strip() + user_id = str(payload.get("user_id") or payload.get("operator_id") or "").strip() + self_id = str(payload.get("self_id") or "").strip() + + user_info = await self._build_user_info(group_id=group_id, user_id=user_id) + group_info = await self._build_group_info(group_id) + notice_text = self._build_notice_text(payload, user_info.get("user_nickname", user_id or "系统")) + if not notice_text: + return None + + additional_config: Dict[str, Any] = { + "self_id": self_id, + "napcat_notice_type": notice_type, + "napcat_notice_sub_type": str(payload.get("sub_type") or "").strip(), + "napcat_notice_payload": dict(payload), + } + if group_id: + additional_config["platform_io_target_group_id"] = group_id + elif user_id: + additional_config["platform_io_target_user_id"] = user_id + + message_info: Dict[str, Any] = {"user_info": user_info, "additional_config": additional_config} + if group_info is not None: + message_info["group_info"] = group_info + + timestamp_seconds = payload.get("time") + if not isinstance(timestamp_seconds, (int, float)): + timestamp_seconds = time.time() + + return { + "message_id": f"napcat-notice-{uuid4().hex}", + "timestamp": str(float(timestamp_seconds)), + "platform": "qq", + "message_info": message_info, + "raw_message": [{"type": "text", "data": notice_text}], + "is_mentioned": False, + "is_at": False, + "is_emoji": False, + "is_picture": False, + "is_command": False, + "is_notify": True, + "session_id": "", + "processed_plain_text": notice_text, + "display_message": notice_text, + } + + async def handle_meta_event(self, payload: Mapping[str, Any]) -> None: + """处理 ``meta_event`` 事件的日志与状态观测。 + + Args: + payload: NapCat 推送的原始元事件。 + """ + meta_event_type = str(payload.get("meta_event_type") or "").strip() + self_id = str(payload.get("self_id") or "").strip() or "unknown" + + if meta_event_type == "lifecycle": + sub_type = str(payload.get("sub_type") or "").strip() + if sub_type == "connect": + self._logger.info(f"NapCat 元事件:Bot {self_id} 已建立连接") + else: + self._logger.debug(f"NapCat 生命周期事件: self_id={self_id} sub_type={sub_type}") + return + + if meta_event_type == "heartbeat": + status = payload.get("status", {}) + if not isinstance(status, Mapping): + status = {} + is_online = bool(status.get("online", False)) + is_good = bool(status.get("good", False)) + interval_ms = payload.get("interval") + self._logger.debug( + f"NapCat 心跳事件: self_id={self_id} online={is_online} good={is_good} interval={interval_ms}" + ) + if not is_online: + self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 已离线") + elif not is_good: + self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 状态异常") + + async def _build_user_info(self, group_id: str, user_id: str) -> Dict[str, Optional[str]]: + """构造通知消息的用户信息。 + + Args: + group_id: 群号;私聊或系统通知时为空字符串。 + user_id: 事件关联用户号。 + + Returns: + Dict[str, Optional[str]]: 规范化后的用户信息字典。 + """ + if not user_id: + return { + "user_id": "notice", + "user_nickname": "系统通知", + "user_cardname": None, + } + + member_info: Optional[Dict[str, Any]] + if group_id: + member_info = await self._query_service.get_group_member_info(group_id, user_id) + else: + member_info = await self._query_service.get_stranger_info(user_id) + + if member_info is None: + return { + "user_id": user_id, + "user_nickname": user_id, + "user_cardname": None, + } + + return { + "user_id": user_id, + "user_nickname": str(member_info.get("nickname") or user_id), + "user_cardname": self._normalize_optional_string(member_info.get("card")), + } + + async def _build_group_info(self, group_id: str) -> Optional[Dict[str, str]]: + """构造通知消息的群信息。 + + Args: + group_id: 群号。 + + Returns: + Optional[Dict[str, str]]: 群信息字典;若不是群通知则返回 ``None``。 + """ + if not group_id: + return None + + group_info = await self._query_service.get_group_info(group_id) + group_name = str(group_info.get("group_name") or f"group_{group_id}") if group_info else f"group_{group_id}" + return {"group_id": group_id, "group_name": group_name} + + def _build_notice_text(self, payload: Mapping[str, Any], actor_name: str) -> str: + """根据 NapCat 通知事件生成可读文本。 + + Args: + payload: 原始通知事件。 + actor_name: 事件操作者显示名。 + + Returns: + str: 生成的可读通知文本。 + """ + notice_type = str(payload.get("notice_type") or "").strip() + sub_type = str(payload.get("sub_type") or "").strip() + target_id = str(payload.get("target_id") or "").strip() + + if notice_type in {"group_recall", "friend_recall"}: + return f"{actor_name} 撤回了一条消息" + if notice_type == "notify" and sub_type == "poke": + target_text = f" -> {target_id}" if target_id else "" + return f"{actor_name} 发起了戳一戳{target_text}" + if notice_type == "notify" and sub_type == "group_name": + return f"{actor_name} 修改了群名称" + if notice_type == "group_ban" and sub_type == "ban": + duration = payload.get("duration") + return f"{actor_name} 触发了群禁言,时长 {duration} 秒" + if notice_type == "group_ban" and sub_type == "lift_ban": + return f"{actor_name} 触发了解除禁言" + if notice_type == "group_upload": + file_info = payload.get("file", {}) + file_name = "" + if isinstance(file_info, Mapping): + file_name = str(file_info.get("name") or "").strip() + return f"{actor_name} 上传了文件{f':{file_name}' if file_name else ''}" + if notice_type == "group_increase": + return f"{actor_name} 加入了群聊" + if notice_type == "group_decrease": + return f"{actor_name} 离开了群聊" + if notice_type == "group_admin": + return f"{actor_name} 的群管理员状态发生变化" + if notice_type == "essence": + return f"{actor_name} 触发了精华消息事件" + if notice_type == "group_msg_emoji_like": + return f"{actor_name} 给一条消息添加了表情回应" + return f"[notice] {notice_type}.{sub_type}".strip(".") + + @staticmethod + def _normalize_optional_string(value: Any) -> Optional[str]: + """将任意值规范化为可选字符串。 + + Args: + value: 待规范化的值。 + + Returns: + Optional[str]: 规范化后的字符串;若值为空则返回 ``None``。 + """ + if value is None: + return None + normalized_value = str(value).strip() + return normalized_value if normalized_value else None diff --git a/src/plugins/built_in/napcat_adapter/qq_queries.py b/src/plugins/built_in/napcat_adapter/qq_queries.py new file mode 100644 index 00000000..7d29803a --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/qq_queries.py @@ -0,0 +1,170 @@ +"""NapCat QQ 平台查询能力。""" + +from typing import TYPE_CHECKING, Any, Dict, Optional + +import asyncio + +if TYPE_CHECKING: + from napcat_adapter.transport import NapCatTransportClient + +try: + from aiohttp import ClientSession, ClientTimeout + + AIOHTTP_AVAILABLE = True +except ImportError: + ClientSession = None # type: ignore[assignment] + ClientTimeout = None # type: ignore[assignment] + AIOHTTP_AVAILABLE = False + + +class NapCatQueryService: + """NapCat QQ 平台查询服务。""" + + def __init__(self, logger: Any, transport: "NapCatTransportClient") -> None: + """初始化查询服务。 + + Args: + logger: 插件日志对象。 + transport: NapCat 传输层客户端。 + """ + self._logger = logger + self._transport = transport + + async def get_login_info(self) -> Optional[Dict[str, Any]]: + """获取当前登录账号信息。 + + Returns: + Optional[Dict[str, Any]]: 登录信息字典;失败时返回 ``None``。 + """ + return await self._call_query("get_login_info", {}) + + async def get_group_info(self, group_id: str) -> Optional[Dict[str, Any]]: + """获取群信息。 + + Args: + group_id: 群号。 + + Returns: + Optional[Dict[str, Any]]: 群信息字典;失败时返回 ``None``。 + """ + return await self._call_query("get_group_info", {"group_id": group_id}) + + async def get_group_member_info(self, group_id: str, user_id: str) -> Optional[Dict[str, Any]]: + """获取群成员信息。 + + Args: + group_id: 群号。 + user_id: 用户号。 + + Returns: + Optional[Dict[str, Any]]: 群成员信息字典;失败时返回 ``None``。 + """ + return await self._call_query( + "get_group_member_info", + {"group_id": group_id, "user_id": user_id, "no_cache": True}, + ) + + async def get_stranger_info(self, user_id: str) -> Optional[Dict[str, Any]]: + """获取陌生人信息。 + + Args: + user_id: 用户号。 + + Returns: + Optional[Dict[str, Any]]: 陌生人信息字典;失败时返回 ``None``。 + """ + return await self._call_query("get_stranger_info", {"user_id": user_id}) + + async def get_message_detail(self, message_id: str) -> Optional[Dict[str, Any]]: + """获取消息详情。 + + Args: + message_id: 消息 ID。 + + Returns: + Optional[Dict[str, Any]]: 消息详情字典;失败时返回 ``None``。 + """ + return await self._call_query("get_msg", {"message_id": message_id}) + + async def get_forward_message(self, message_id: str) -> Optional[Dict[str, Any]]: + """获取合并转发消息详情。 + + Args: + message_id: 转发消息 ID。 + + Returns: + Optional[Dict[str, Any]]: 合并转发消息详情;失败时返回 ``None``。 + """ + return await self._call_query("get_forward_msg", {"message_id": message_id}) + + async def get_record_detail(self, file_name: str, file_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + """获取语音文件详情。 + + Args: + file_name: 语音文件名。 + file_id: 可选文件 ID。 + + Returns: + Optional[Dict[str, Any]]: 语音详情字典;失败时返回 ``None``。 + """ + params: Dict[str, Any] = {"file": file_name, "out_format": "wav"} + if file_id: + params["file_id"] = file_id + return await self._call_query("get_record", params) + + async def download_binary(self, url: str) -> Optional[bytes]: + """下载远程二进制资源。 + + Args: + url: 资源 URL。 + + Returns: + Optional[bytes]: 下载到的二进制内容;失败时返回 ``None``。 + """ + if not url: + return None + if not AIOHTTP_AVAILABLE or ClientSession is None or ClientTimeout is None: + self._logger.warning("NapCat 查询层缺少 aiohttp,无法下载远程资源") + return None + + try: + timeout = ClientTimeout(total=15) + async with ClientSession(timeout=timeout) as session: + async with session.get(url) as response: + if response.status != 200: + self._logger.warning(f"NapCat 远程资源下载失败: status={response.status} url={url}") + return None + return await response.read() + except asyncio.CancelledError: + raise + except Exception as exc: + self._logger.warning(f"NapCat 远程资源下载失败: {exc}") + return None + + async def _call_query(self, action_name: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """调用 OneBot 查询动作并提取 ``data`` 字段。 + + Args: + action_name: OneBot 动作名。 + params: 动作参数。 + + Returns: + Optional[Dict[str, Any]]: 查询结果中的 ``data`` 字段;失败时返回 ``None``。 + """ + try: + response = await self._transport.call_action(action_name, params) + except asyncio.CancelledError: + raise + except Exception as exc: + self._logger.warning(f"NapCat 查询动作执行失败: action={action_name} error={exc}") + return None + + if str(response.get("status") or "").lower() != "ok": + self._logger.warning( + f"NapCat 查询动作返回失败: action={action_name} " + f"message={response.get('wording') or response.get('message') or 'unknown'}" + ) + return None + + response_data = response.get("data") + return response_data if isinstance(response_data, dict) else None diff --git a/src/plugins/built_in/napcat_adapter/runtime_state.py b/src/plugins/built_in/napcat_adapter/runtime_state.py new file mode 100644 index 00000000..b4dbfa09 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/runtime_state.py @@ -0,0 +1,85 @@ +"""NapCat 运行时路由状态管理。""" + +from typing import Any, Optional + +from napcat_adapter.config import NapCatServerConfig + + +class NapCatRuntimeStateManager: + """NapCat 适配器路由状态上报器。""" + + def __init__(self, adapter_capability: Any, logger: Any) -> None: + """初始化运行时状态管理器。 + + Args: + adapter_capability: SDK 提供的适配器能力对象。 + logger: 插件日志对象。 + """ + self._adapter_capability = adapter_capability + self._logger = logger + self._runtime_state_connected: bool = False + self._reported_account_id: Optional[str] = None + self._reported_scope: Optional[str] = None + + async def report_connected(self, account_id: str, server_config: NapCatServerConfig) -> bool: + """向 Host 上报当前连接已就绪。 + + Args: + account_id: 当前 NapCat 连接对应的机器人账号 ID。 + server_config: 当前生效的 NapCat 服务端配置。 + + Returns: + bool: 若 Host 接受了运行时状态更新,则返回 ``True``。 + """ + normalized_account_id = str(account_id).strip() + if not normalized_account_id: + return False + + scope = server_config.connection_id or None + if ( + self._runtime_state_connected + and self._reported_account_id == normalized_account_id + and self._reported_scope == scope + ): + return True + + accepted = False + try: + accepted = await self._adapter_capability.update_runtime_state( + connected=True, + account_id=normalized_account_id, + scope=server_config.connection_id, + metadata={"ws_url": server_config.build_ws_url()}, + ) + except Exception as exc: + self._logger.warning(f"NapCat 适配器上报连接就绪状态失败: {exc}") + return False + + if not accepted: + self._logger.warning("NapCat 适配器连接已建立,但 Host 未接受运行时状态更新") + return False + + self._runtime_state_connected = True + self._reported_account_id = normalized_account_id + self._reported_scope = scope + self._logger.info( + f"NapCat 适配器已激活路由: platform=qq account_id={normalized_account_id} " + f"scope={self._reported_scope or '*'}" + ) + return True + + async def report_disconnected(self) -> None: + """向 Host 上报当前连接已断开,并撤销适配器路由。""" + if not self._runtime_state_connected: + self._reported_account_id = None + self._reported_scope = None + return + + try: + await self._adapter_capability.update_runtime_state(connected=False) + except Exception as exc: + self._logger.warning(f"NapCat 适配器上报断开状态失败: {exc}") + finally: + self._runtime_state_connected = False + self._reported_account_id = None + self._reported_scope = None diff --git a/src/plugins/built_in/napcat_adapter/transport.py b/src/plugins/built_in/napcat_adapter/transport.py new file mode 100644 index 00000000..d20de097 --- /dev/null +++ b/src/plugins/built_in/napcat_adapter/transport.py @@ -0,0 +1,322 @@ +"""NapCat 正向 WebSocket 传输层。""" + +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Set, cast +from uuid import uuid4 + +import asyncio +import contextlib +import json + +from napcat_adapter.config import NapCatServerConfig + +if TYPE_CHECKING: + from aiohttp import ClientWebSocketResponse as AiohttpClientWebSocketResponse + +try: + from aiohttp import ClientSession, ClientTimeout, WSMsgType + + AIOHTTP_AVAILABLE = True +except ImportError: + ClientSession = cast(Any, None) + ClientTimeout = cast(Any, None) + WSMsgType = cast(Any, None) + AIOHTTP_AVAILABLE = False + +if not TYPE_CHECKING: + AiohttpClientWebSocketResponse = Any + + +class NapCatTransportClient: + """NapCat 正向 WebSocket 客户端。""" + + def __init__( + self, + logger: Any, + on_connection_opened: Callable[[], Awaitable[None]], + on_connection_closed: Callable[[], Awaitable[None]], + on_payload: Callable[[Dict[str, Any]], Awaitable[None]], + ) -> None: + """初始化传输层客户端。 + + Args: + logger: 插件日志对象。 + on_connection_opened: 连接建立后的异步回调。 + on_connection_closed: 连接断开后的异步回调。 + on_payload: 收到非 echo 载荷后的异步回调。 + """ + self._logger = logger + self._on_connection_opened = on_connection_opened + self._on_connection_closed = on_connection_closed + self._on_payload = on_payload + self._server_config: Optional[NapCatServerConfig] = None + self._connection_task: Optional[asyncio.Task[None]] = None + self._pending_actions: Dict[str, asyncio.Future[Dict[str, Any]]] = {} + self._background_tasks: Set[asyncio.Task[Any]] = set() + self._send_lock = asyncio.Lock() + self._ws: Optional[AiohttpClientWebSocketResponse] = None + self._stop_requested: bool = False + self._connection_active: bool = False + + @classmethod + def is_available(cls) -> bool: + """判断当前环境是否安装了传输层依赖。 + + Returns: + bool: 若已安装 ``aiohttp``,则返回 ``True``。 + """ + return AIOHTTP_AVAILABLE + + def configure(self, server_config: NapCatServerConfig) -> None: + """更新当前传输层使用的 NapCat 服务端配置。 + + Args: + server_config: 最新生效的 NapCat 服务端配置。 + """ + self._server_config = server_config + + async def start(self) -> None: + """启动 NapCat 正向 WebSocket 连接循环。 + + Raises: + RuntimeError: 当缺少配置或依赖时抛出。 + """ + if not self.is_available(): + raise RuntimeError("NapCat 适配器依赖 aiohttp,但当前环境未安装该依赖") + if self._server_config is None: + raise RuntimeError("NapCat 适配器尚未配置 napcat_server") + if self._connection_task is not None and not self._connection_task.done(): + return + + self._stop_requested = False + self._connection_task = asyncio.create_task(self._connection_loop(), name="napcat_adapter.connection") + + async def stop(self) -> None: + """停止当前连接并清理所有后台任务。""" + self._stop_requested = True + connection_task = self._connection_task + self._connection_task = None + + ws = self._ws + if ws is not None and not ws.closed: + with contextlib.suppress(Exception): + await ws.close() + self._ws = None + + if connection_task is not None: + connection_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await connection_task + + await self._cancel_background_tasks() + await self._notify_connection_closed() + self._fail_pending_actions("NapCat connection closed") + + async def call_action(self, action_name: str, params: Dict[str, Any]) -> Dict[str, Any]: + """发送 OneBot 动作并等待对应的 echo 响应。 + + Args: + action_name: OneBot 动作名称。 + params: 动作参数。 + + Returns: + Dict[str, Any]: NapCat 返回的原始响应字典。 + + Raises: + RuntimeError: 当连接不可用时抛出。 + """ + ws = self._ws + server_config = self._server_config + if ws is None or ws.closed or server_config is None: + raise RuntimeError("NapCat is not connected") + + echo_id = uuid4().hex + loop = asyncio.get_running_loop() + response_future: asyncio.Future[Dict[str, Any]] = loop.create_future() + self._pending_actions[echo_id] = response_future + + request_payload = {"action": action_name, "params": params, "echo": echo_id} + try: + async with self._send_lock: + await ws.send_str(json.dumps(request_payload, ensure_ascii=False)) + return await asyncio.wait_for(response_future, timeout=server_config.action_timeout_sec) + finally: + self._pending_actions.pop(echo_id, None) + + async def _connection_loop(self) -> None: + """维护单个 WebSocket 连接,并在断开后按配置重连。""" + assert ClientSession is not None + assert ClientTimeout is not None + + while not self._stop_requested: + server_config = self._server_config + if server_config is None: + return + + ws_url = server_config.build_ws_url() + timeout = ClientTimeout(total=None, connect=10) + + try: + async with ClientSession(headers=self._build_headers(server_config), timeout=timeout) as session: + async with session.ws_connect(ws_url, heartbeat=server_config.heartbeat_interval or None) as ws: + self._ws = ws + self._logger.info(f"NapCat 适配器已连接: {ws_url}") + await self._receive_loop(ws) + except asyncio.CancelledError: + raise + except Exception as exc: + self._logger.warning(f"NapCat 适配器连接失败: {exc}") + finally: + self._ws = None + await self._notify_connection_closed() + self._fail_pending_actions("NapCat connection interrupted") + + if self._stop_requested: + break + + await asyncio.sleep(server_config.reconnect_delay_sec) + + async def _receive_loop(self, ws: AiohttpClientWebSocketResponse) -> None: + """持续消费 WebSocket 消息并分发处理。 + + Args: + ws: 当前活跃的 WebSocket 连接对象。 + """ + assert WSMsgType is not None + + bootstrap_task = self._create_background_task( + self._notify_connection_opened(), + "napcat_adapter.bootstrap", + ) + try: + async for ws_message in ws: + if ws_message.type != WSMsgType.TEXT: + if ws_message.type in {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR}: + break + continue + + payload = self._parse_json_message(ws_message.data) + if payload is None: + continue + + if echo_id := str(payload.get("echo") or "").strip(): + self._resolve_pending_action(echo_id, payload) + continue + + self._create_background_task(self._on_payload(payload), "napcat_adapter.payload") + finally: + if bootstrap_task is not None and not bootstrap_task.done(): + bootstrap_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await bootstrap_task + + def _create_background_task(self, coroutine: Awaitable[Any], name: str) -> asyncio.Task[Any]: + """创建并跟踪一个后台任务。 + + Args: + coroutine: 待执行的协程对象。 + name: 任务名。 + + Returns: + asyncio.Task[Any]: 已创建的后台任务。 + """ + task = asyncio.create_task(coroutine, name=name) + self._background_tasks.add(task) + task.add_done_callback(self._handle_background_task_completion) + return task + + def _handle_background_task_completion(self, task: asyncio.Task[Any]) -> None: + """处理后台任务结束后的清理与异常记录。 + + Args: + task: 已结束的后台任务。 + """ + self._background_tasks.discard(task) + if task.cancelled(): + return + + exception = task.exception() + if exception is not None: + self._logger.error(f"NapCat 适配器后台任务异常: {exception}", exc_info=True) + + async def _cancel_background_tasks(self) -> None: + """取消所有仍在运行的后台任务。""" + background_tasks = list(self._background_tasks) + for task in background_tasks: + task.cancel() + if background_tasks: + with contextlib.suppress(Exception): + await asyncio.gather(*background_tasks, return_exceptions=True) + self._background_tasks.clear() + + async def _notify_connection_opened(self) -> None: + """在连接建立后触发上层回调。""" + if self._connection_active: + return + + self._connection_active = True + try: + await self._on_connection_opened() + except Exception as exc: + self._logger.warning(f"NapCat 适配器连接建立回调失败: {exc}") + + async def _notify_connection_closed(self) -> None: + """在连接断开后触发上层回调。""" + if not self._connection_active: + return + + self._connection_active = False + try: + await self._on_connection_closed() + except Exception as exc: + self._logger.warning(f"NapCat 适配器断连回调失败: {exc}") + + def _resolve_pending_action(self, echo_id: str, payload: Dict[str, Any]) -> None: + """解析等待中的动作响应。 + + Args: + echo_id: 动作请求对应的 echo 标识。 + payload: NapCat 返回的响应载荷。 + """ + response_future = self._pending_actions.get(echo_id) + if response_future is None or response_future.done(): + return + response_future.set_result(payload) + + def _fail_pending_actions(self, error_message: str) -> None: + """让所有等待中的动作以异常方式结束。 + + Args: + error_message: 写入异常中的错误信息。 + """ + for response_future in self._pending_actions.values(): + if not response_future.done(): + response_future.set_exception(RuntimeError(error_message)) + self._pending_actions.clear() + + def _build_headers(self, server_config: NapCatServerConfig) -> Dict[str, str]: + """构造连接 NapCat 所需的请求头。 + + Args: + server_config: 当前生效的 NapCat 服务端配置。 + + Returns: + Dict[str, str]: WebSocket 握手请求头。 + """ + return {"Authorization": f"Bearer {server_config.token}"} if server_config.token else {} + + def _parse_json_message(self, data: Any) -> Optional[Dict[str, Any]]: + """解析 WebSocket 文本消息中的 JSON 数据。 + + Args: + data: WebSocket 收到的原始文本数据。 + + Returns: + Optional[Dict[str, Any]]: 成功时返回字典,失败时返回 ``None``。 + """ + try: + payload = json.loads(str(data)) + except Exception as exc: + self._logger.warning(f"NapCat 适配器解析 JSON 载荷失败: {exc}") + return None + + return payload if isinstance(payload, dict) else None