diff --git a/src/plugins/built_in/napcat_adapter/__init__.py b/src/plugins/built_in/napcat_adapter/__init__.py deleted file mode 100644 index fa82860f..00000000 --- a/src/plugins/built_in/napcat_adapter/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""NapCat 内置适配器插件包。""" diff --git a/src/plugins/built_in/napcat_adapter/_manifest.json b/src/plugins/built_in/napcat_adapter/_manifest.json deleted file mode 100644 index 6f7e68fd..00000000 --- a/src/plugins/built_in/napcat_adapter/_manifest.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "manifest_version": 1, - "name": "napcat_adapter_builtin", - "version": "0.1.0", - "description": "Built-in NapCat adapter plugin for MVP message forwarding.", - "author": { - "name": "OpenAI Codex" - }, - "license": "GPL-v3.0-or-later", - "host_application": { - "min_version": "1.0.0" - }, - "keywords": [ - "adapter", - "built-in", - "napcat", - "onebot", - "qq" - ], - "categories": [ - "Adapter", - "Built-in" - ], - "default_locale": "en-US", - "plugin_info": { - "is_built_in": true, - "plugin_type": "adapter" - }, - "capabilities": [] -} diff --git a/src/plugins/built_in/napcat_adapter/codec_inbound.py b/src/plugins/built_in/napcat_adapter/codec_inbound.py deleted file mode 100644 index 8fb020dc..00000000 --- a/src/plugins/built_in/napcat_adapter/codec_inbound.py +++ /dev/null @@ -1,510 +0,0 @@ -"""NapCat 入站消息编解码。""" - -from typing import Any, Dict, List, Mapping, Optional, Tuple -from uuid import uuid4 - -import hashlib -import json -import re -import time - -from napcat_adapter.qq_queries import NapCatQueryService - - -_CQ_SEGMENT_PATTERN = re.compile(r"\[CQ:(?P[a-zA-Z0-9_]+)(?P(?:,[^\]]*)?)\]") - - -class NapCatInboundCodec: - """NapCat 入站消息编码器。""" - - def __init__(self, logger: Any, query_service: NapCatQueryService) -> None: - """初始化入站消息编码器。 - - Args: - logger: 插件日志对象。 - query_service: QQ 查询服务。 - """ - self._logger = logger - self._query_service = query_service - - async def build_message_dict( - self, - payload: Mapping[str, Any], - self_id: str, - sender_user_id: str, - sender: Mapping[str, Any], - ) -> Dict[str, Any]: - """构造 Host 侧可接受的 ``MessageDict``。 - - Args: - payload: NapCat 原始消息事件。 - self_id: 当前机器人账号 ID。 - sender_user_id: 发送者用户 ID。 - sender: 发送者信息字典。 - - Returns: - Dict[str, Any]: 规范化后的 ``MessageDict``。 - """ - message_type = str(payload.get("message_type") or "").strip() or "private" - group_id = str(payload.get("group_id") or "").strip() - group_name = str(payload.get("group_name") or "").strip() or (f"group_{group_id}" if group_id else "") - user_nickname = str(sender.get("nickname") or sender.get("card") or sender_user_id).strip() or sender_user_id - user_cardname = str(sender.get("card") or "").strip() or None - - raw_message, is_at = await self.convert_segments(payload, self_id) - raw_message_text = str(payload.get("raw_message") or "").strip() - if not raw_message: - raw_message = [{"type": "text", "data": raw_message_text or "[unsupported]"}] - - plain_text = self.build_plain_text(raw_message, raw_message_text) - timestamp_seconds = payload.get("time") - if not isinstance(timestamp_seconds, (int, float)): - timestamp_seconds = time.time() - - additional_config: Dict[str, Any] = {"self_id": self_id, "napcat_message_type": message_type} - if group_id: - additional_config["platform_io_target_group_id"] = group_id - else: - additional_config["platform_io_target_user_id"] = sender_user_id - - message_info: Dict[str, Any] = { - "user_info": { - "user_id": sender_user_id, - "user_nickname": user_nickname, - "user_cardname": user_cardname, - }, - "additional_config": additional_config, - } - if group_id: - message_info["group_info"] = {"group_id": group_id, "group_name": group_name} - - message_id = str(payload.get("message_id") or f"napcat-{uuid4().hex}").strip() - return { - "message_id": message_id, - "timestamp": str(float(timestamp_seconds)), - "platform": "qq", - "message_info": message_info, - "raw_message": raw_message, - "is_mentioned": is_at, - "is_at": is_at, - "is_emoji": False, - "is_picture": False, - "is_command": plain_text.startswith("/"), - "is_notify": False, - "session_id": "", - "processed_plain_text": plain_text, - "display_message": plain_text, - } - - async def convert_segments(self, payload: Mapping[str, Any], self_id: str) -> Tuple[List[Dict[str, Any]], bool]: - """将 OneBot 消息段转换为 Host 消息段结构。 - - Args: - payload: OneBot 原始消息事件。 - self_id: 当前机器人账号 ID。 - - Returns: - Tuple[List[Dict[str, Any]], bool]: 转换后的消息段列表,以及是否 @ 到当前机器人。 - """ - message_payload = payload.get("message") - if isinstance(message_payload, str): - parsed_message_payload = self._parse_cq_message_text(message_payload) - if parsed_message_payload: - message_payload = parsed_message_payload - else: - normalized_text = self._decode_cq_entities(message_payload).strip() - return ([{"type": "text", "data": normalized_text}] if normalized_text else []), False - - if not isinstance(message_payload, list): - return [], False - - converted_segments: List[Dict[str, Any]] = [] - is_at = False - for segment in message_payload: - if not isinstance(segment, Mapping): - continue - - segment_type = str(segment.get("type") or "").strip() - segment_data = segment.get("data", {}) - if not isinstance(segment_data, Mapping): - segment_data = {} - - if segment_type == "text": - if text_value := str(segment_data.get("text") or ""): - converted_segments.append({"type": "text", "data": text_value}) - continue - - if segment_type == "at": - if target_user_id := str(segment_data.get("qq") or "").strip(): - converted_segments.append( - { - "type": "at", - "data": { - "target_user_id": target_user_id, - "target_user_nickname": None, - "target_user_cardname": None, - }, - } - ) - if self_id and target_user_id == self_id: - is_at = True - continue - - if segment_type == "reply": - if reply_segment := await self._build_reply_segment(segment_data): - converted_segments.append(reply_segment) - continue - - if segment_type == "face": - converted_segments.append({"type": "text", "data": "[face]"}) - continue - - if segment_type == "image": - converted_segments.append(await self._build_image_like_segment(segment_data, is_emoji=False)) - continue - - if segment_type == "record": - converted_segments.append(await self._build_record_segment(segment_data)) - continue - - if segment_type == "video": - converted_segments.append({"type": "text", "data": "[video]"}) - continue - - if segment_type == "file": - converted_segments.append({"type": "text", "data": "[file]"}) - continue - - if segment_type == "json": - converted_segments.append(self._build_json_text_segment(segment_data)) - continue - - if segment_type == "forward": - if forward_segment := await self._build_forward_segment(segment_data): - converted_segments.append(forward_segment) - continue - - if segment_type in {"xml", "share"}: - converted_segments.append({"type": "text", "data": f"[{segment_type}]"}) - - return converted_segments, is_at - - async def _build_reply_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]: - """构造回复消息段。 - - Args: - segment_data: OneBot ``reply`` 段的 ``data`` 字典。 - - Returns: - Optional[Dict[str, Any]]: 转换后的回复消息段;缺少消息 ID 时返回 ``None``。 - """ - target_message_id = str(segment_data.get("id") or "").strip() - if not target_message_id: - return None - - message_detail = await self._query_service.get_message_detail(target_message_id) - reply_payload: Dict[str, Any] = {"target_message_id": target_message_id} - if message_detail is not None: - sender = message_detail.get("sender", {}) - if not isinstance(sender, Mapping): - sender = {} - reply_payload["target_message_content"] = str(message_detail.get("raw_message") or "").strip() or None - reply_payload["target_message_sender_id"] = str( - message_detail.get("user_id") or sender.get("user_id") or "" - ).strip() or None - reply_payload["target_message_sender_nickname"] = str(sender.get("nickname") or "").strip() or None - reply_payload["target_message_sender_cardname"] = str(sender.get("card") or "").strip() or None - - return {"type": "reply", "data": reply_payload} - - async def _build_image_like_segment( - self, - segment_data: Mapping[str, Any], - is_emoji: bool, - ) -> Dict[str, Any]: - """构造图片或表情消息段。 - - Args: - segment_data: OneBot ``image`` 段的 ``data`` 字典。 - is_emoji: 是否按表情组件处理。 - - Returns: - Dict[str, Any]: 转换后的图片或表情消息段。 - """ - subtype = self._normalize_numeric_segment_value(segment_data.get("sub_type")) - actual_is_emoji = is_emoji or (subtype is not None and subtype not in {0, 4, 9}) - - image_url = str(segment_data.get("url") or "").strip() - binary_data = await self._query_service.download_binary(image_url) - if not binary_data: - return {"type": "text", "data": "[emoji]" if actual_is_emoji else "[image]"} - - return { - "type": "emoji" if actual_is_emoji else "image", - "data": "", - "hash": hashlib.sha256(binary_data).hexdigest(), - "binary_data_base64": self._encode_binary(binary_data), - } - - async def _build_record_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]: - """构造语音消息段。 - - Args: - segment_data: OneBot ``record`` 段的 ``data`` 字典。 - - Returns: - Dict[str, Any]: 转换后的语音或占位文本消息段。 - """ - file_name = str(segment_data.get("file") or "").strip() - file_id = str(segment_data.get("file_id") or "").strip() or None - if not file_name: - return {"type": "text", "data": "[voice]"} - - record_detail = await self._query_service.get_record_detail(file_name=file_name, file_id=file_id) - if record_detail is None: - return {"type": "text", "data": "[voice]"} - - record_base64 = str(record_detail.get("base64") or "").strip() - if not record_base64: - return {"type": "text", "data": "[voice]"} - - try: - binary_data = self._decode_binary(record_base64) - except Exception: - return {"type": "text", "data": "[voice]"} - - return { - "type": "voice", - "data": "", - "hash": hashlib.sha256(binary_data).hexdigest(), - "binary_data_base64": self._encode_binary(binary_data), - } - - async def _build_forward_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]: - """构造合并转发消息段。 - - Args: - segment_data: OneBot ``forward`` 段的 ``data`` 字典。 - - Returns: - Optional[Dict[str, Any]]: 转换后的合并转发消息段;失败时返回 ``None``。 - """ - message_id = str(segment_data.get("id") or "").strip() - if not message_id: - return None - - forward_detail = await self._query_service.get_forward_message(message_id) - if forward_detail is None: - return {"type": "text", "data": "[forward]"} - - messages = forward_detail.get("messages", []) - if not isinstance(messages, list): - return {"type": "text", "data": "[forward]"} - - forward_nodes: List[Dict[str, Any]] = [] - for forward_message in messages: - if not isinstance(forward_message, Mapping): - continue - raw_content = forward_message.get("content", []) - content_segments = await self._convert_forward_content(raw_content, "") - sender = forward_message.get("sender", {}) - if not isinstance(sender, Mapping): - sender = {} - forward_nodes.append( - { - "user_id": str(sender.get("user_id") or sender.get("uin") or "").strip() or None, - "user_nickname": str(sender.get("nickname") or sender.get("name") or "未知用户"), - "user_cardname": str(sender.get("card") or "").strip() or None, - "message_id": str(forward_message.get("message_id") or uuid4().hex), - "content": content_segments or [{"type": "text", "data": "[empty]"}], - } - ) - - if not forward_nodes: - return {"type": "text", "data": "[forward]"} - return {"type": "forward", "data": forward_nodes} - - async def _convert_forward_content(self, raw_content: Any, self_id: str) -> List[Dict[str, Any]]: - """转换转发节点内部的消息段列表。 - - Args: - raw_content: 转发节点原始内容。 - self_id: 当前机器人账号 ID。 - - Returns: - List[Dict[str, Any]]: 转换后的消息段列表。 - """ - pseudo_payload: Dict[str, Any] = {"message": raw_content} - segments, _ = await self.convert_segments(pseudo_payload, self_id) - return segments - - def _build_json_text_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]: - """将 JSON 卡片最佳努力转换为文本占位。 - - Args: - segment_data: OneBot ``json`` 段的 ``data`` 字典。 - - Returns: - Dict[str, Any]: 转换后的文本消息段。 - """ - json_data = str(segment_data.get("data") or "").strip() - if not json_data: - return {"type": "text", "data": "[json]"} - - try: - parsed_json = json.loads(json_data) - except Exception: - return {"type": "text", "data": "[json]"} - - app_name = str(parsed_json.get("app") or "").strip() - prompt = "" - if isinstance(parsed_json.get("meta"), Mapping): - prompt = str(parsed_json["meta"].get("prompt") or "").strip() - text = prompt or app_name or "json" - return {"type": "text", "data": f"[json:{text}]"} - - @staticmethod - def _encode_binary(binary_data: bytes) -> str: - """将二进制内容编码为 Base64 字符串。 - - Args: - binary_data: 待编码的二进制内容。 - - Returns: - str: Base64 编码字符串。 - """ - import base64 - - return base64.b64encode(binary_data).decode("utf-8") - - @staticmethod - def _decode_binary(binary_base64: str) -> bytes: - """将 Base64 字符串解码为二进制内容。 - - Args: - binary_base64: Base64 字符串。 - - Returns: - bytes: 解码后的二进制内容。 - """ - import base64 - - return base64.b64decode(binary_base64) - - def build_plain_text(self, raw_message: List[Dict[str, Any]], fallback_text: str) -> str: - """从标准消息段中提取可展示的纯文本。 - - Args: - raw_message: 标准化后的消息段列表。 - fallback_text: 当无法拼出文本时使用的回退文本。 - - Returns: - str: 用于 Host 展示和命令判断的纯文本内容。 - """ - plain_text_parts: List[str] = [] - for item in raw_message: - if not isinstance(item, Mapping): - continue - item_type = str(item.get("type") or "").strip() - item_data = item.get("data") - if item_type == "text": - plain_text_parts.append(str(item_data or "")) - elif item_type == "at" and isinstance(item_data, Mapping): - plain_text_parts.append(f"@{item_data.get('target_user_id') or ''}") - elif item_type == "reply": - plain_text_parts.append("[reply]") - elif item_type == "forward": - plain_text_parts.append("[forward]") - elif item_type in {"image", "emoji", "voice"}: - plain_text_parts.append(f"[{item_type}]") - - plain_text = "".join(part for part in plain_text_parts if part).strip() - return plain_text or fallback_text or "[unsupported]" - - def _parse_cq_message_text(self, message_text: str) -> List[Dict[str, Any]]: - """将 CQ 码字符串解析为 OneBot 风格消息段列表。 - - Args: - message_text: NapCat 在字符串模式下返回的消息内容。 - - Returns: - List[Dict[str, Any]]: 解析后的 OneBot 风格消息段列表。 - """ - parsed_segments: List[Dict[str, Any]] = [] - current_index = 0 - - for match in _CQ_SEGMENT_PATTERN.finditer(message_text): - prefix_text = self._decode_cq_entities(message_text[current_index : match.start()]) - if prefix_text: - parsed_segments.append({"type": "text", "data": {"text": prefix_text}}) - - segment_type = str(match.group("type") or "").strip() - segment_data = self._parse_cq_segment_data(match.group("params") or "") - if segment_type: - parsed_segments.append({"type": segment_type, "data": segment_data}) - current_index = match.end() - - suffix_text = self._decode_cq_entities(message_text[current_index:]) - if suffix_text: - parsed_segments.append({"type": "text", "data": {"text": suffix_text}}) - - return parsed_segments - - def _parse_cq_segment_data(self, raw_params: str) -> Dict[str, Any]: - """解析单个 CQ 段中的参数串。 - - Args: - raw_params: 形如 ``,key=value,key2=value2`` 的原始参数字符串。 - - Returns: - Dict[str, Any]: 解析后的参数字典。 - """ - parsed_data: Dict[str, Any] = {} - if not raw_params: - return parsed_data - - for item in raw_params.lstrip(",").split(","): - if not item or "=" not in item: - continue - key, value = item.split("=", 1) - normalized_key = key.strip() - if not normalized_key: - continue - decoded_value = self._decode_cq_entities(value) - parsed_data[normalized_key] = self._normalize_numeric_segment_value(decoded_value) - - return parsed_data - - @staticmethod - def _decode_cq_entities(text: str) -> str: - """解码 CQ 码中的 HTML 风格转义实体。 - - Args: - text: 待解码的 CQ 文本。 - - Returns: - str: 解码后的普通文本。 - """ - return ( - text.replace("&", "&") - .replace("[", "[") - .replace("]", "]") - .replace(",", ",") - ) - - @staticmethod - def _normalize_numeric_segment_value(value: Any) -> Any: - """将可安全识别的数字字符串转为整数。 - - Args: - value: 原始字段值。 - - Returns: - Any: 规范化后的字段值。 - """ - if isinstance(value, str): - stripped_value = value.strip() - if stripped_value.isdigit(): - return int(stripped_value) - return stripped_value - return value diff --git a/src/plugins/built_in/napcat_adapter/codec_outbound.py b/src/plugins/built_in/napcat_adapter/codec_outbound.py deleted file mode 100644 index 6adcb622..00000000 --- a/src/plugins/built_in/napcat_adapter/codec_outbound.py +++ /dev/null @@ -1,192 +0,0 @@ -"""NapCat 出站消息编解码。""" - -from typing import Any, Dict, List, Mapping, Tuple - - -class NapCatOutboundCodec: - """NapCat 出站消息编码器。""" - - def build_outbound_action( - self, - message: Mapping[str, Any], - route: Mapping[str, Any], - ) -> Tuple[str, Dict[str, Any]]: - """为 Host 出站消息构造 OneBot 动作。 - - Args: - message: Host 侧标准 ``MessageDict``。 - route: Platform IO 路由信息。 - - Returns: - Tuple[str, Dict[str, Any]]: 动作名称与参数字典。 - - Raises: - ValueError: 当私聊出站缺少目标用户 ID 时抛出。 - """ - message_info = message.get("message_info", {}) - if not isinstance(message_info, Mapping): - message_info = {} - - group_info = message_info.get("group_info", {}) - if not isinstance(group_info, Mapping): - group_info = {} - - additional_config = message_info.get("additional_config", {}) - if not isinstance(additional_config, Mapping): - additional_config = {} - - raw_message = message.get("raw_message", []) - segments = self.convert_segments(raw_message) - - if target_group_id := str( - group_info.get("group_id") or additional_config.get("platform_io_target_group_id") or "" - ).strip(): - return "send_group_msg", {"group_id": target_group_id, "message": segments} - - target_user_id = str( - additional_config.get("platform_io_target_user_id") - or additional_config.get("target_user_id") - or route.get("target_user_id") - or "" - ).strip() - if not target_user_id: - raise ValueError("Outbound private message is missing target_user_id") - - return "send_private_msg", {"message": segments, "user_id": target_user_id} - - def convert_segments(self, raw_message: Any) -> List[Dict[str, Any]]: - """将 Host 消息段转换为 OneBot 消息段。 - - Args: - raw_message: Host 侧 ``raw_message`` 字段。 - - Returns: - List[Dict[str, Any]]: OneBot 消息段列表。 - """ - if not isinstance(raw_message, list): - return [{"type": "text", "data": {"text": ""}}] - - outbound_segments: List[Dict[str, Any]] = [] - for item in raw_message: - if not isinstance(item, Mapping): - continue - - item_type = str(item.get("type") or "").strip() - item_data = item.get("data") - - if item_type == "text": - text_value = str(item_data or "") - outbound_segments.append({"type": "text", "data": {"text": text_value}}) - continue - - if item_type == "at" and isinstance(item_data, Mapping): - if target_user_id := str(item_data.get("target_user_id") or "").strip(): - outbound_segments.append({"type": "at", "data": {"qq": target_user_id}}) - continue - - if item_type == "reply": - if isinstance(item_data, Mapping): - target_message_id = str(item_data.get("target_message_id") or "").strip() - else: - target_message_id = str(item_data or "").strip() - if target_message_id: - outbound_segments.append({"type": "reply", "data": {"id": target_message_id}}) - continue - - if item_type == "image": - binary_base64 = str(item.get("binary_data_base64") or "").strip() - if binary_base64: - outbound_segments.append( - { - "type": "image", - "data": {"file": f"base64://{binary_base64}", "subtype": 0}, - } - ) - else: - outbound_segments.append({"type": "text", "data": {"text": "[image]"}}) - continue - - if item_type == "emoji": - binary_base64 = str(item.get("binary_data_base64") or "").strip() - if binary_base64: - outbound_segments.append( - { - "type": "image", - "data": { - "file": f"base64://{binary_base64}", - "subtype": 1, - "summary": "[动画表情]", - }, - } - ) - else: - outbound_segments.append({"type": "text", "data": {"text": "[emoji]"}}) - continue - - if item_type == "voice": - binary_base64 = str(item.get("binary_data_base64") or "").strip() - if binary_base64: - outbound_segments.append({"type": "record", "data": {"file": f"base64://{binary_base64}"}}) - else: - outbound_segments.append({"type": "text", "data": {"text": "[voice]"}}) - continue - - if item_type == "forward" and isinstance(item_data, list): - outbound_segments.extend(self._build_forward_nodes(item_data)) - continue - - if item_type == "dict" and isinstance(item_data, Mapping): - if dict_segment := self._build_dict_component_segment(item_data): - outbound_segments.append(dict_segment) - continue - - fallback_text = f"[unsupported:{item_type or 'unknown'}]" - outbound_segments.append({"type": "text", "data": {"text": fallback_text}}) - - if not outbound_segments: - outbound_segments.append({"type": "text", "data": {"text": ""}}) - return outbound_segments - - def _build_forward_nodes(self, forward_nodes: List[Any]) -> List[Dict[str, Any]]: - """构造 NapCat 转发节点列表。 - - Args: - forward_nodes: 内部转发节点列表。 - - Returns: - List[Dict[str, Any]]: NapCat 转发节点列表。 - """ - built_nodes: List[Dict[str, Any]] = [] - for node in forward_nodes: - if not isinstance(node, Mapping): - continue - raw_content = node.get("content", []) - node_segments = self.convert_segments(raw_content) - built_nodes.append( - { - "type": "node", - "data": { - "name": str(node.get("user_nickname") or node.get("user_cardname") or "QQ用户"), - "uin": str(node.get("user_id") or ""), - "content": node_segments, - }, - } - ) - return built_nodes - - def _build_dict_component_segment(self, item_data: Mapping[str, Any]) -> Dict[str, Any]: - """尽力将 ``DictComponent`` 转换为 NapCat 消息段。 - - Args: - item_data: ``DictComponent`` 原始数据。 - - Returns: - Dict[str, Any]: NapCat 消息段;不支持时返回占位文本段。 - """ - raw_type = str(item_data.get("type") or "").strip() - raw_payload = item_data.get("data", item_data) - if raw_type in {"file", "music", "video", "face"} and isinstance(raw_payload, Mapping): - return {"type": raw_type, "data": dict(raw_payload)} - if raw_type in {"image", "record", "reply", "at"} and isinstance(raw_payload, Mapping): - return {"type": raw_type, "data": dict(raw_payload)} - return {"type": "text", "data": {"text": f"[unsupported:{raw_type or 'dict'}]"}} diff --git a/src/plugins/built_in/napcat_adapter/config.py b/src/plugins/built_in/napcat_adapter/config.py deleted file mode 100644 index eeb4acab..00000000 --- a/src/plugins/built_in/napcat_adapter/config.py +++ /dev/null @@ -1,398 +0,0 @@ -"""NapCat 内置适配器配置解析。""" - -from dataclasses import dataclass, field -from typing import Any, Dict, Mapping, Optional, Set, Tuple -from urllib.parse import urlparse - -from napcat_adapter.constants import ( - DEFAULT_ACTION_TIMEOUT_SEC, - DEFAULT_CHAT_LIST_TYPE, - DEFAULT_HEARTBEAT_INTERVAL_SEC, - DEFAULT_NAPCAT_HOST, - DEFAULT_NAPCAT_PORT, - DEFAULT_RECONNECT_DELAY_SEC, - SUPPORTED_CONFIG_VERSION, -) - - -@dataclass(frozen=True) -class NapCatPluginOptions: - """插件级配置。""" - - enabled: bool = False - config_version: str = "" - - def should_connect(self) -> bool: - """判断当前配置下是否应当启动连接。 - - Returns: - bool: 若插件连接已启用,则返回 ``True``。 - """ - return self.enabled - - -@dataclass(frozen=True) -class NapCatServerConfig: - """NapCat 正向 WebSocket 连接配置。""" - - host: str = DEFAULT_NAPCAT_HOST - port: int = DEFAULT_NAPCAT_PORT - token: str = "" - heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL_SEC - reconnect_delay_sec: float = DEFAULT_RECONNECT_DELAY_SEC - action_timeout_sec: float = DEFAULT_ACTION_TIMEOUT_SEC - connection_id: str = "" - - def build_ws_url(self) -> str: - """构造正向 WebSocket 地址。 - - Returns: - str: 供适配器作为客户端连接的 NapCat WebSocket 地址。 - """ - return f"ws://{self.host}:{self.port}" - - -@dataclass(frozen=True) -class NapCatChatConfig: - """聊天名单配置。""" - - group_list_type: str = DEFAULT_CHAT_LIST_TYPE - group_list: Set[str] = field(default_factory=set) - private_list_type: str = DEFAULT_CHAT_LIST_TYPE - private_list: Set[str] = field(default_factory=set) - ban_user_id: Set[str] = field(default_factory=set) - - -@dataclass(frozen=True) -class NapCatFilterConfig: - """消息过滤配置。""" - - ignore_self_message: bool = True - - -@dataclass(frozen=True) -class NapCatPluginSettings: - """NapCat 插件完整配置。""" - - plugin: NapCatPluginOptions = field(default_factory=NapCatPluginOptions) - napcat_server: NapCatServerConfig = field(default_factory=NapCatServerConfig) - chat: NapCatChatConfig = field(default_factory=NapCatChatConfig) - filters: NapCatFilterConfig = field(default_factory=NapCatFilterConfig) - - @classmethod - def from_mapping(cls, raw_config: Mapping[str, Any], logger: Any) -> "NapCatPluginSettings": - """从 Runner 注入的原始配置字典解析插件配置。 - - Args: - raw_config: Runner 注入的原始配置内容。 - logger: 插件日志对象。 - - Returns: - NapCatPluginSettings: 规范化后的插件配置。 - """ - plugin_section = _as_mapping(raw_config.get("plugin")) - server_section = _as_mapping(raw_config.get("napcat_server")) - legacy_connection_section = _as_mapping(raw_config.get("connection")) - chat_section = _as_mapping(raw_config.get("chat")) - filters_section = _as_mapping(raw_config.get("filters")) - - if not server_section and legacy_connection_section: - logger.warning("NapCat 适配器检测到旧版 [connection] 配置段,请尽快迁移到 [napcat_server]") - server_section = legacy_connection_section - - legacy_host, legacy_port = _read_legacy_host_port(server_section, legacy_connection_section, logger) - parsed_host = _read_string(server_section, "host") or legacy_host or DEFAULT_NAPCAT_HOST - parsed_port = _read_positive_int( - mapping=server_section, - key="port", - default=legacy_port or DEFAULT_NAPCAT_PORT, - logger=logger, - setting_name="napcat_server.port", - ) - - return cls( - plugin=NapCatPluginOptions( - enabled=_read_bool(plugin_section, "enabled", False), - config_version=_read_string(plugin_section, "config_version"), - ), - napcat_server=NapCatServerConfig( - host=parsed_host, - port=parsed_port, - token=_read_string(server_section, "token") or _read_string(server_section, "access_token"), - heartbeat_interval=_read_positive_float( - mapping=server_section, - key="heartbeat_interval", - default=_read_positive_float( - mapping=server_section, - key="heartbeat_sec", - default=DEFAULT_HEARTBEAT_INTERVAL_SEC, - logger=logger, - setting_name="napcat_server.heartbeat_interval", - ), - logger=logger, - setting_name="napcat_server.heartbeat_interval", - ), - reconnect_delay_sec=_read_positive_float( - mapping=server_section, - key="reconnect_delay_sec", - default=DEFAULT_RECONNECT_DELAY_SEC, - logger=logger, - setting_name="napcat_server.reconnect_delay_sec", - ), - action_timeout_sec=_read_positive_float( - mapping=server_section, - key="action_timeout_sec", - default=DEFAULT_ACTION_TIMEOUT_SEC, - logger=logger, - setting_name="napcat_server.action_timeout_sec", - ), - connection_id=_read_string(server_section, "connection_id"), - ), - chat=NapCatChatConfig( - group_list_type=_read_list_mode( - mapping=chat_section, - key="group_list_type", - default=DEFAULT_CHAT_LIST_TYPE, - logger=logger, - setting_name="chat.group_list_type", - ), - group_list=_read_string_set(chat_section, "group_list"), - private_list_type=_read_list_mode( - mapping=chat_section, - key="private_list_type", - default=DEFAULT_CHAT_LIST_TYPE, - logger=logger, - setting_name="chat.private_list_type", - ), - private_list=_read_string_set(chat_section, "private_list"), - ban_user_id=_read_string_set(chat_section, "ban_user_id"), - ), - filters=NapCatFilterConfig( - ignore_self_message=_read_bool(filters_section, "ignore_self_message", True), - ), - ) - - def should_connect(self) -> bool: - """判断当前配置下是否应当启动连接。 - - Returns: - bool: 若插件连接已启用,则返回 ``True``。 - """ - return self.plugin.should_connect() - - def validate(self, logger: Any) -> bool: - """校验当前配置是否满足启动连接的前提条件。 - - Args: - logger: 插件日志对象。 - - Returns: - bool: 若配置满足启动连接的前提条件,则返回 ``True``。 - """ - config_version = self.plugin.config_version - if not config_version: - logger.error( - f"NapCat 适配器配置缺少 plugin.config_version,当前插件要求版本 {SUPPORTED_CONFIG_VERSION}" - ) - return False - - if config_version != SUPPORTED_CONFIG_VERSION: - logger.error( - "NapCat 适配器配置版本不兼容: " - f"当前为 {config_version},当前插件要求 {SUPPORTED_CONFIG_VERSION}" - ) - return False - - if not self.napcat_server.host: - logger.warning("NapCat 适配器已启用,但 napcat_server.host 为空") - return False - - if self.napcat_server.port <= 0: - logger.warning("NapCat 适配器已启用,但 napcat_server.port 不是正整数") - return False - - return True - - -def _as_mapping(value: Any) -> Dict[str, Any]: - """将任意值安全转换为字典。 - - Args: - value: 待转换的值。 - - Returns: - Dict[str, Any]: 若原值是映射,则返回普通字典;否则返回空字典。 - """ - return dict(value) if isinstance(value, Mapping) else {} - - -def _read_bool(mapping: Mapping[str, Any], key: str, default: bool) -> bool: - """安全读取布尔配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - - Returns: - bool: 解析后的布尔值。 - """ - value = mapping.get(key, default) - return value if isinstance(value, bool) else default - - -def _read_string(mapping: Mapping[str, Any], key: str) -> str: - """安全读取字符串配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - - Returns: - str: 去除首尾空白后的字符串值。 - """ - value = mapping.get(key) - return "" if value is None else str(value).strip() - - -def _read_positive_float( - mapping: Mapping[str, Any], - key: str, - default: float, - logger: Any, - setting_name: str, -) -> float: - """安全读取正浮点数配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - logger: 插件日志对象。 - setting_name: 用于日志输出的完整配置名。 - - Returns: - float: 合法的正浮点数;否则返回默认值。 - """ - value = mapping.get(key, default) - if isinstance(value, (int, float)) and float(value) > 0: - return float(value) - - if key in mapping: - logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") - return default - - -def _read_positive_int( - mapping: Mapping[str, Any], - key: str, - default: int, - logger: Any, - setting_name: str, -) -> int: - """安全读取正整数配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - logger: 插件日志对象。 - setting_name: 用于日志输出的完整配置名。 - - Returns: - int: 合法的正整数;否则返回默认值。 - """ - value = mapping.get(key, default) - if isinstance(value, int) and value > 0: - return value - - if isinstance(value, str) and value.isdigit() and int(value) > 0: - return int(value) - - if key in mapping: - logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") - return default - - -def _read_list_mode( - mapping: Mapping[str, Any], - key: str, - default: str, - logger: Any, - setting_name: str, -) -> str: - """安全读取名单模式配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - default: 读取失败时的默认值。 - logger: 插件日志对象。 - setting_name: 用于日志输出的完整配置名。 - - Returns: - str: 合法的名单模式字符串。 - """ - value = mapping.get(key, default) - if isinstance(value, str): - normalized_value = value.strip() - if normalized_value in {"whitelist", "blacklist"}: - return normalized_value - - if key in mapping: - logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}") - return default - - -def _read_string_set(mapping: Mapping[str, Any], key: str) -> Set[str]: - """安全读取字符串集合配置值。 - - Args: - mapping: 待读取的配置字典。 - key: 目标键名。 - - Returns: - Set[str]: 规范化后的字符串集合。 - """ - value = mapping.get(key, []) - if not isinstance(value, list): - return set() - - normalized_values: Set[str] = set() - for item in value: - item_text = "" if item is None else str(item).strip() - if item_text: - normalized_values.add(item_text) - return normalized_values - - -def _read_legacy_host_port( - server_section: Mapping[str, Any], - legacy_connection_section: Mapping[str, Any], - logger: Any, -) -> Tuple[str, Optional[int]]: - """从旧版 ``ws_url`` 配置中提取主机与端口。 - - Args: - server_section: 新版 ``napcat_server`` 配置段。 - legacy_connection_section: 旧版 ``connection`` 配置段。 - logger: 插件日志对象。 - - Returns: - Tuple[str, Optional[int]]: 解析到的主机与端口;若未找到,则返回空主机与 ``None``。 - """ - legacy_ws_url = _read_string(server_section, "ws_url") or _read_string(legacy_connection_section, "ws_url") - if not legacy_ws_url: - return "", None - - parsed_url = urlparse(legacy_ws_url) - parsed_host = parsed_url.hostname or "" - parsed_port = parsed_url.port - - logger.warning( - "NapCat 适配器检测到旧版 ws_url 配置,已临时兼容解析,请尽快迁移到 napcat_server.host/port" - ) - if parsed_url.path not in {"", "/"}: - logger.warning("NapCat 适配器旧版 ws_url 包含路径,新的 napcat_server 配置不会保留该路径") - - return parsed_host, parsed_port diff --git a/src/plugins/built_in/napcat_adapter/constants.py b/src/plugins/built_in/napcat_adapter/constants.py deleted file mode 100644 index bdddde6f..00000000 --- a/src/plugins/built_in/napcat_adapter/constants.py +++ /dev/null @@ -1,9 +0,0 @@ -"""NapCat 内置适配器共享常量。""" - -SUPPORTED_CONFIG_VERSION = "0.1.0" -DEFAULT_NAPCAT_HOST = "127.0.0.1" -DEFAULT_NAPCAT_PORT = 3001 -DEFAULT_RECONNECT_DELAY_SEC = 5.0 -DEFAULT_HEARTBEAT_INTERVAL_SEC = 30.0 -DEFAULT_ACTION_TIMEOUT_SEC = 15.0 -DEFAULT_CHAT_LIST_TYPE = "whitelist" diff --git a/src/plugins/built_in/napcat_adapter/filters.py b/src/plugins/built_in/napcat_adapter/filters.py deleted file mode 100644 index 141cda85..00000000 --- a/src/plugins/built_in/napcat_adapter/filters.py +++ /dev/null @@ -1,68 +0,0 @@ -"""NapCat 入站消息过滤。""" - -from typing import Any, Set - -from napcat_adapter.config import NapCatChatConfig - - -class NapCatChatFilter: - """NapCat 聊天名单过滤器。""" - - def __init__(self, logger: Any) -> None: - """初始化聊天名单过滤器。 - - Args: - logger: 插件日志对象。 - """ - self._logger = logger - - def is_inbound_chat_allowed( - self, - sender_user_id: str, - group_id: str, - chat_config: NapCatChatConfig, - ) -> bool: - """检查入站消息是否通过聊天名单过滤。 - - Args: - sender_user_id: 发送者用户 ID。 - group_id: 群聊 ID;私聊时为空字符串。 - chat_config: 当前生效的聊天配置。 - - Returns: - bool: 若消息允许继续进入 Host,则返回 ``True``。 - """ - if sender_user_id in chat_config.ban_user_id: - self._logger.warning(f"NapCat 用户 {sender_user_id} 在全局禁止名单中,消息被丢弃") - return False - - if group_id: - if not self._is_id_allowed_by_list_policy(group_id, chat_config.group_list_type, chat_config.group_list): - self._logger.warning(f"NapCat 群聊 {group_id} 未通过聊天名单过滤,消息被丢弃") - return False - return True - - if not self._is_id_allowed_by_list_policy( - sender_user_id, - chat_config.private_list_type, - chat_config.private_list, - ): - self._logger.warning(f"NapCat 私聊用户 {sender_user_id} 未通过聊天名单过滤,消息被丢弃") - return False - return True - - @staticmethod - def _is_id_allowed_by_list_policy(target_id: str, list_type: str, configured_ids: Set[str]) -> bool: - """根据白名单或黑名单规则判断目标 ID 是否允许通过。 - - Args: - target_id: 待检查的目标 ID。 - list_type: 名单模式,仅支持 ``whitelist`` 或 ``blacklist``。 - configured_ids: 配置中的 ID 集合。 - - Returns: - bool: 若目标 ID 允许通过,则返回 ``True``。 - """ - if list_type == "whitelist": - return target_id in configured_ids - return target_id not in configured_ids diff --git a/src/plugins/built_in/napcat_adapter/plugin.py b/src/plugins/built_in/napcat_adapter/plugin.py deleted file mode 100644 index 50900c5d..00000000 --- a/src/plugins/built_in/napcat_adapter/plugin.py +++ /dev/null @@ -1,381 +0,0 @@ -"""内置 NapCat 适配器插件。 - -当前实现维持 MVP 范围,目标是跑通基础消息收发链路: -1. 作为客户端连接 NapCat / OneBot v11 WebSocket 服务。 -2. 将入站消息事件转换为 Host 侧的 ``MessageDict``。 -3. 将 Host 出站消息转换为 OneBot 动作并发送。 - -当前范围刻意收敛为: -- 单连接 -- 文本、@、reply 基础转发 -- 暂不处理 ``notice`` / ``meta_event`` 的完整语义归一化 -- 暂不支持图片、语音、文件等复杂媒体 -""" - -from __future__ import annotations - -from typing import Any, Dict, Mapping, Optional - -import asyncio - -from maibot_sdk import Adapter, MaiBotPlugin - -from napcat_adapter.codec_inbound import NapCatInboundCodec -from napcat_adapter.codec_outbound import NapCatOutboundCodec -from napcat_adapter.config import NapCatPluginSettings -from napcat_adapter.filters import NapCatChatFilter -from napcat_adapter.qq_notice import NapCatNoticeCodec -from napcat_adapter.qq_queries import NapCatQueryService -from napcat_adapter.runtime_state import NapCatRuntimeStateManager -from napcat_adapter.transport import NapCatTransportClient - - -@Adapter(platform="qq", protocol="napcat", send_method="send_to_platform") -class NapCatAdapterPlugin(MaiBotPlugin): - """NapCat 适配器 MVP 实现。""" - - def __init__(self) -> None: - """初始化 NapCat 适配器插件实例。""" - super().__init__() - self._plugin_config: Dict[str, Any] = {} - self._settings: Optional[NapCatPluginSettings] = None - self._inbound_codec: Optional[NapCatInboundCodec] = None - self._outbound_codec = NapCatOutboundCodec() - self._chat_filter: Optional[NapCatChatFilter] = None - self._query_service: Optional[NapCatQueryService] = None - self._notice_codec: Optional[NapCatNoticeCodec] = None - self._runtime_state: Optional[NapCatRuntimeStateManager] = None - self._transport: Optional[NapCatTransportClient] = None - - def set_plugin_config(self, config: Dict[str, Any]) -> None: - """设置插件配置内容。 - - Args: - config: Runner 注入的 ``config.toml`` 解析结果。 - """ - self._plugin_config = config if isinstance(config, dict) else {} - - async def on_load(self) -> None: - """在插件加载时根据配置决定是否启动连接。""" - await self._restart_connection_if_needed() - - async def on_unload(self) -> None: - """在插件卸载时关闭连接并清理运行时状态。""" - await self._stop_connection() - - async def on_config_update(self, new_config: Dict[str, Any], version: str) -> None: - """在配置更新后重载连接状态。 - - Args: - new_config: 最新的插件配置。 - version: 配置版本号。 - """ - self.set_plugin_config(new_config) - self._settings = None - if version: - self.ctx.logger.debug(f"NapCat 适配器收到配置更新通知: {version}") - await self._restart_connection_if_needed() - - async def send_to_platform( - self, - message: Dict[str, Any], - route: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Dict[str, Any]: - """将 Host 出站消息发送到 NapCat。 - - Args: - message: Host 侧标准 ``MessageDict``。 - route: Platform IO 生成的路由信息。 - metadata: Platform IO 附带的投递元数据。 - **kwargs: 预留的扩展参数。 - - Returns: - Dict[str, Any]: 标准化后的发送结果。 - """ - del metadata - del kwargs - - self._ensure_runtime_components() - transport = self._transport - if transport is None: - return {"success": False, "error": "NapCat transport is not initialized"} - - try: - action_name, params = self._outbound_codec.build_outbound_action(message, route or {}) - response = await transport.call_action(action_name, params) - except Exception as exc: - return {"success": False, "error": str(exc)} - - if str(response.get("status", "")).lower() != "ok": - return { - "success": False, - "error": str(response.get("wording") or response.get("message") or "NapCat send failed"), - "metadata": {"retcode": response.get("retcode")}, - } - - response_data = response.get("data", {}) - external_message_id = "" - if isinstance(response_data, Mapping): - external_message_id = str(response_data.get("message_id") or "") - - return { - "success": True, - "external_message_id": external_message_id or None, - "metadata": {"action": action_name}, - } - - def _ensure_runtime_components(self) -> None: - """确保运行时依赖对象已经完成初始化。""" - if self._chat_filter is None: - self._chat_filter = NapCatChatFilter(self.ctx.logger) - - if self._transport is None: - self._transport = NapCatTransportClient( - logger=self.ctx.logger, - on_connection_opened=self._bootstrap_adapter_runtime_state, - on_connection_closed=self._handle_transport_disconnected, - on_payload=self._handle_transport_payload, - ) - - if self._query_service is None: - self._query_service = NapCatQueryService(self.ctx.logger, self._transport) - - if self._inbound_codec is None: - self._inbound_codec = NapCatInboundCodec(self.ctx.logger, self._query_service) - - if self._notice_codec is None: - self._notice_codec = NapCatNoticeCodec(self.ctx.logger, self._query_service) - - if self._runtime_state is None: - self._runtime_state = NapCatRuntimeStateManager(self.ctx.adapter, self.ctx.logger) - - def _reload_settings(self) -> NapCatPluginSettings: - """重新解析当前插件配置。 - - Returns: - NapCatPluginSettings: 最新的规范化配置。 - """ - self._settings = NapCatPluginSettings.from_mapping(self._plugin_config, self.ctx.logger) - return self._settings - - async def _restart_connection_if_needed(self) -> None: - """根据当前配置重启连接循环。""" - self._ensure_runtime_components() - settings = self._reload_settings() - - await self._stop_connection() - if not settings.should_connect(): - self.ctx.logger.info("NapCat 适配器保持空闲状态,因为插件或配置未启用") - return - if not settings.validate(self.ctx.logger): - return - - transport = self._transport - assert transport is not None - if not transport.is_available(): - self.ctx.logger.error("NapCat 适配器依赖 aiohttp,但当前环境未安装该依赖") - return - - transport.configure(settings.napcat_server) - await transport.start() - - async def _stop_connection(self) -> None: - """停止当前连接。""" - transport = self._transport - if transport is not None: - await transport.stop() - return - - runtime_state = self._runtime_state - if runtime_state is not None: - await runtime_state.report_disconnected() - - async def _handle_transport_payload(self, payload: Dict[str, Any]) -> None: - """处理来自传输层的非 echo 载荷。 - - Args: - payload: NapCat 推送的原始事件数据。 - """ - post_type = str(payload.get("post_type") or "").strip() - if post_type == "message": - await self._handle_inbound_message(payload) - return - if post_type == "notice": - await self._handle_notice_event(payload) - return - if post_type == "meta_event": - await self._handle_meta_event(payload) - - async def _handle_inbound_message(self, payload: Dict[str, Any]) -> None: - """处理单条 NapCat 入站消息并注入 Host。 - - Args: - payload: NapCat / OneBot 推送的原始消息事件。 - """ - self._ensure_runtime_components() - settings = self._settings or self._reload_settings() - chat_filter = self._chat_filter - inbound_codec = self._inbound_codec - runtime_state = self._runtime_state - assert chat_filter is not None - assert inbound_codec is not None - assert runtime_state is not None - - self_id = str(payload.get("self_id") or "").strip() - if self_id: - await runtime_state.report_connected(self_id, settings.napcat_server) - - sender = payload.get("sender", {}) - if not isinstance(sender, Mapping): - sender = {} - - sender_user_id = str(payload.get("user_id") or sender.get("user_id") or "").strip() - if not sender_user_id: - return - - group_id = str(payload.get("group_id") or "").strip() - if self_id and sender_user_id == self_id and settings.filters.ignore_self_message: - return - if not chat_filter.is_inbound_chat_allowed(sender_user_id, group_id, settings.chat): - return - - message_dict = await inbound_codec.build_message_dict(payload, self_id, sender_user_id, sender) - route_metadata: Dict[str, Any] = {} - if self_id: - route_metadata["self_id"] = self_id - if settings.napcat_server.connection_id: - route_metadata["connection_id"] = settings.napcat_server.connection_id - - external_message_id = str(payload.get("message_id") or "").strip() - accepted = await self.ctx.adapter.receive_external_message( - message_dict, - route_metadata=route_metadata, - external_message_id=external_message_id, - dedupe_key=external_message_id, - ) - if not accepted: - self.ctx.logger.debug(f"Host 丢弃了 NapCat 入站消息: {external_message_id or '无消息 ID'}") - - async def _handle_notice_event(self, payload: Dict[str, Any]) -> None: - """处理 NapCat ``notice`` 事件并注入 Host。 - - Args: - payload: NapCat 推送的通知事件。 - """ - self._ensure_runtime_components() - notice_codec = self._notice_codec - runtime_state = self._runtime_state - settings = self._settings or self._reload_settings() - assert notice_codec is not None - assert runtime_state is not None - - self_id = str(payload.get("self_id") or "").strip() - if self_id: - await runtime_state.report_connected(self_id, settings.napcat_server) - - message_dict = await notice_codec.build_notice_message_dict(payload) - if message_dict is None: - return - - route_metadata: Dict[str, Any] = {} - if self_id: - route_metadata["self_id"] = self_id - if settings.napcat_server.connection_id: - route_metadata["connection_id"] = settings.napcat_server.connection_id - - external_message_id = str(payload.get("message_id") or payload.get("notice_type") or "").strip() - accepted = await self.ctx.adapter.receive_external_message( - message_dict, - route_metadata=route_metadata, - external_message_id=external_message_id or None, - dedupe_key=external_message_id or None, - ) - if not accepted: - self.ctx.logger.debug(f"Host 丢弃了 NapCat 通知事件: {external_message_id or '无消息 ID'}") - - async def _handle_meta_event(self, payload: Dict[str, Any]) -> None: - """处理 NapCat ``meta_event`` 事件。 - - Args: - payload: NapCat 推送的元事件。 - """ - self._ensure_runtime_components() - notice_codec = self._notice_codec - runtime_state = self._runtime_state - settings = self._settings or self._reload_settings() - assert notice_codec is not None - assert runtime_state is not None - - self_id = str(payload.get("self_id") or "").strip() - if self_id: - await runtime_state.report_connected(self_id, settings.napcat_server) - - await notice_codec.handle_meta_event(payload) - - async def _bootstrap_adapter_runtime_state(self) -> None: - """在连接建立后主动获取账号信息并激活适配器路由。""" - transport = self._transport - query_service = self._query_service - runtime_state = self._runtime_state - settings = self._settings or self._reload_settings() - if transport is None or query_service is None or runtime_state is None: - return - - max_attempts = 3 - last_error: Optional[Exception] = None - for attempt in range(1, max_attempts + 1): - try: - login_info = await query_service.get_login_info() - self_id = self._extract_self_id_from_login_response(login_info) - await runtime_state.report_connected(self_id, settings.napcat_server) - return - except asyncio.CancelledError: - raise - except Exception as exc: - last_error = exc - self.ctx.logger.warning( - f"NapCat 适配器获取登录信息失败,第 {attempt}/{max_attempts} 次重试: {exc}" - ) - if attempt < max_attempts: - await asyncio.sleep(1.0) - - if last_error is not None: - self.ctx.logger.error(f"NapCat 适配器未能完成路由激活,连接将保持只接收状态: {last_error}") - - async def _handle_transport_disconnected(self) -> None: - """处理传输层断开事件。""" - runtime_state = self._runtime_state - if runtime_state is not None: - await runtime_state.report_disconnected() - - @staticmethod - def _extract_self_id_from_login_response(response: Optional[Dict[str, Any]]) -> str: - """从 ``get_login_info`` 查询结果中提取当前账号 ID。 - - Args: - response: NapCat 返回的登录信息字典。 - - Returns: - str: 规范化后的账号 ID 字符串。 - - Raises: - ValueError: 当响应中缺少有效账号 ID 时抛出。 - """ - if not isinstance(response, Mapping): - raise ValueError("get_login_info 响应缺少 data 字段") - - self_id = str(response.get("user_id") or "").strip() - if not self_id: - raise ValueError("get_login_info 响应缺少有效的 user_id") - return self_id - - -def create_plugin() -> NapCatAdapterPlugin: - """创建插件实例。 - - Returns: - NapCatAdapterPlugin: NapCat 内置适配器插件实例。 - """ - return NapCatAdapterPlugin() diff --git a/src/plugins/built_in/napcat_adapter/qq_notice.py b/src/plugins/built_in/napcat_adapter/qq_notice.py deleted file mode 100644 index f577cf98..00000000 --- a/src/plugins/built_in/napcat_adapter/qq_notice.py +++ /dev/null @@ -1,224 +0,0 @@ -"""NapCat QQ 平台通知与元事件处理。""" - -from typing import Any, Dict, Mapping, Optional -from uuid import uuid4 - -import time - -from napcat_adapter.qq_queries import NapCatQueryService - - -class NapCatNoticeCodec: - """NapCat QQ 通知事件编码器。""" - - def __init__(self, logger: Any, query_service: NapCatQueryService) -> None: - """初始化通知事件编码器。 - - Args: - logger: 插件日志对象。 - query_service: QQ 查询服务。 - """ - self._logger = logger - self._query_service = query_service - - async def build_notice_message_dict(self, payload: Mapping[str, Any]) -> Optional[Dict[str, Any]]: - """将 NapCat ``notice`` 事件转换为 Host 可接受的消息字典。 - - Args: - payload: NapCat 推送的原始通知事件。 - - Returns: - Optional[Dict[str, Any]]: 成功时返回标准 ``MessageDict``;无法识别时返回 ``None``。 - """ - notice_type = str(payload.get("notice_type") or "").strip() - if not notice_type: - return None - - group_id = str(payload.get("group_id") or "").strip() - user_id = str(payload.get("user_id") or payload.get("operator_id") or "").strip() - self_id = str(payload.get("self_id") or "").strip() - - user_info = await self._build_user_info(group_id=group_id, user_id=user_id) - group_info = await self._build_group_info(group_id) - notice_text = self._build_notice_text(payload, user_info.get("user_nickname", user_id or "系统")) - if not notice_text: - return None - - additional_config: Dict[str, Any] = { - "self_id": self_id, - "napcat_notice_type": notice_type, - "napcat_notice_sub_type": str(payload.get("sub_type") or "").strip(), - "napcat_notice_payload": dict(payload), - } - if group_id: - additional_config["platform_io_target_group_id"] = group_id - elif user_id: - additional_config["platform_io_target_user_id"] = user_id - - message_info: Dict[str, Any] = {"user_info": user_info, "additional_config": additional_config} - if group_info is not None: - message_info["group_info"] = group_info - - timestamp_seconds = payload.get("time") - if not isinstance(timestamp_seconds, (int, float)): - timestamp_seconds = time.time() - - return { - "message_id": f"napcat-notice-{uuid4().hex}", - "timestamp": str(float(timestamp_seconds)), - "platform": "qq", - "message_info": message_info, - "raw_message": [{"type": "text", "data": notice_text}], - "is_mentioned": False, - "is_at": False, - "is_emoji": False, - "is_picture": False, - "is_command": False, - "is_notify": True, - "session_id": "", - "processed_plain_text": notice_text, - "display_message": notice_text, - } - - async def handle_meta_event(self, payload: Mapping[str, Any]) -> None: - """处理 ``meta_event`` 事件的日志与状态观测。 - - Args: - payload: NapCat 推送的原始元事件。 - """ - meta_event_type = str(payload.get("meta_event_type") or "").strip() - self_id = str(payload.get("self_id") or "").strip() or "unknown" - - if meta_event_type == "lifecycle": - sub_type = str(payload.get("sub_type") or "").strip() - if sub_type == "connect": - self._logger.info(f"NapCat 元事件:Bot {self_id} 已建立连接") - else: - self._logger.debug(f"NapCat 生命周期事件: self_id={self_id} sub_type={sub_type}") - return - - if meta_event_type == "heartbeat": - status = payload.get("status", {}) - if not isinstance(status, Mapping): - status = {} - is_online = bool(status.get("online", False)) - is_good = bool(status.get("good", False)) - interval_ms = payload.get("interval") - self._logger.debug( - f"NapCat 心跳事件: self_id={self_id} online={is_online} good={is_good} interval={interval_ms}" - ) - if not is_online: - self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 已离线") - elif not is_good: - self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 状态异常") - - async def _build_user_info(self, group_id: str, user_id: str) -> Dict[str, Optional[str]]: - """构造通知消息的用户信息。 - - Args: - group_id: 群号;私聊或系统通知时为空字符串。 - user_id: 事件关联用户号。 - - Returns: - Dict[str, Optional[str]]: 规范化后的用户信息字典。 - """ - if not user_id: - return { - "user_id": "notice", - "user_nickname": "系统通知", - "user_cardname": None, - } - - member_info: Optional[Dict[str, Any]] - if group_id: - member_info = await self._query_service.get_group_member_info(group_id, user_id) - else: - member_info = await self._query_service.get_stranger_info(user_id) - - if member_info is None: - return { - "user_id": user_id, - "user_nickname": user_id, - "user_cardname": None, - } - - return { - "user_id": user_id, - "user_nickname": str(member_info.get("nickname") or user_id), - "user_cardname": self._normalize_optional_string(member_info.get("card")), - } - - async def _build_group_info(self, group_id: str) -> Optional[Dict[str, str]]: - """构造通知消息的群信息。 - - Args: - group_id: 群号。 - - Returns: - Optional[Dict[str, str]]: 群信息字典;若不是群通知则返回 ``None``。 - """ - if not group_id: - return None - - group_info = await self._query_service.get_group_info(group_id) - group_name = str(group_info.get("group_name") or f"group_{group_id}") if group_info else f"group_{group_id}" - return {"group_id": group_id, "group_name": group_name} - - def _build_notice_text(self, payload: Mapping[str, Any], actor_name: str) -> str: - """根据 NapCat 通知事件生成可读文本。 - - Args: - payload: 原始通知事件。 - actor_name: 事件操作者显示名。 - - Returns: - str: 生成的可读通知文本。 - """ - notice_type = str(payload.get("notice_type") or "").strip() - sub_type = str(payload.get("sub_type") or "").strip() - target_id = str(payload.get("target_id") or "").strip() - - if notice_type in {"group_recall", "friend_recall"}: - return f"{actor_name} 撤回了一条消息" - if notice_type == "notify" and sub_type == "poke": - target_text = f" -> {target_id}" if target_id else "" - return f"{actor_name} 发起了戳一戳{target_text}" - if notice_type == "notify" and sub_type == "group_name": - return f"{actor_name} 修改了群名称" - if notice_type == "group_ban" and sub_type == "ban": - duration = payload.get("duration") - return f"{actor_name} 触发了群禁言,时长 {duration} 秒" - if notice_type == "group_ban" and sub_type == "lift_ban": - return f"{actor_name} 触发了解除禁言" - if notice_type == "group_upload": - file_info = payload.get("file", {}) - file_name = "" - if isinstance(file_info, Mapping): - file_name = str(file_info.get("name") or "").strip() - return f"{actor_name} 上传了文件{f':{file_name}' if file_name else ''}" - if notice_type == "group_increase": - return f"{actor_name} 加入了群聊" - if notice_type == "group_decrease": - return f"{actor_name} 离开了群聊" - if notice_type == "group_admin": - return f"{actor_name} 的群管理员状态发生变化" - if notice_type == "essence": - return f"{actor_name} 触发了精华消息事件" - if notice_type == "group_msg_emoji_like": - return f"{actor_name} 给一条消息添加了表情回应" - return f"[notice] {notice_type}.{sub_type}".strip(".") - - @staticmethod - def _normalize_optional_string(value: Any) -> Optional[str]: - """将任意值规范化为可选字符串。 - - Args: - value: 待规范化的值。 - - Returns: - Optional[str]: 规范化后的字符串;若值为空则返回 ``None``。 - """ - if value is None: - return None - normalized_value = str(value).strip() - return normalized_value if normalized_value else None diff --git a/src/plugins/built_in/napcat_adapter/qq_queries.py b/src/plugins/built_in/napcat_adapter/qq_queries.py deleted file mode 100644 index 7d29803a..00000000 --- a/src/plugins/built_in/napcat_adapter/qq_queries.py +++ /dev/null @@ -1,170 +0,0 @@ -"""NapCat QQ 平台查询能力。""" - -from typing import TYPE_CHECKING, Any, Dict, Optional - -import asyncio - -if TYPE_CHECKING: - from napcat_adapter.transport import NapCatTransportClient - -try: - from aiohttp import ClientSession, ClientTimeout - - AIOHTTP_AVAILABLE = True -except ImportError: - ClientSession = None # type: ignore[assignment] - ClientTimeout = None # type: ignore[assignment] - AIOHTTP_AVAILABLE = False - - -class NapCatQueryService: - """NapCat QQ 平台查询服务。""" - - def __init__(self, logger: Any, transport: "NapCatTransportClient") -> None: - """初始化查询服务。 - - Args: - logger: 插件日志对象。 - transport: NapCat 传输层客户端。 - """ - self._logger = logger - self._transport = transport - - async def get_login_info(self) -> Optional[Dict[str, Any]]: - """获取当前登录账号信息。 - - Returns: - Optional[Dict[str, Any]]: 登录信息字典;失败时返回 ``None``。 - """ - return await self._call_query("get_login_info", {}) - - async def get_group_info(self, group_id: str) -> Optional[Dict[str, Any]]: - """获取群信息。 - - Args: - group_id: 群号。 - - Returns: - Optional[Dict[str, Any]]: 群信息字典;失败时返回 ``None``。 - """ - return await self._call_query("get_group_info", {"group_id": group_id}) - - async def get_group_member_info(self, group_id: str, user_id: str) -> Optional[Dict[str, Any]]: - """获取群成员信息。 - - Args: - group_id: 群号。 - user_id: 用户号。 - - Returns: - Optional[Dict[str, Any]]: 群成员信息字典;失败时返回 ``None``。 - """ - return await self._call_query( - "get_group_member_info", - {"group_id": group_id, "user_id": user_id, "no_cache": True}, - ) - - async def get_stranger_info(self, user_id: str) -> Optional[Dict[str, Any]]: - """获取陌生人信息。 - - Args: - user_id: 用户号。 - - Returns: - Optional[Dict[str, Any]]: 陌生人信息字典;失败时返回 ``None``。 - """ - return await self._call_query("get_stranger_info", {"user_id": user_id}) - - async def get_message_detail(self, message_id: str) -> Optional[Dict[str, Any]]: - """获取消息详情。 - - Args: - message_id: 消息 ID。 - - Returns: - Optional[Dict[str, Any]]: 消息详情字典;失败时返回 ``None``。 - """ - return await self._call_query("get_msg", {"message_id": message_id}) - - async def get_forward_message(self, message_id: str) -> Optional[Dict[str, Any]]: - """获取合并转发消息详情。 - - Args: - message_id: 转发消息 ID。 - - Returns: - Optional[Dict[str, Any]]: 合并转发消息详情;失败时返回 ``None``。 - """ - return await self._call_query("get_forward_msg", {"message_id": message_id}) - - async def get_record_detail(self, file_name: str, file_id: Optional[str] = None) -> Optional[Dict[str, Any]]: - """获取语音文件详情。 - - Args: - file_name: 语音文件名。 - file_id: 可选文件 ID。 - - Returns: - Optional[Dict[str, Any]]: 语音详情字典;失败时返回 ``None``。 - """ - params: Dict[str, Any] = {"file": file_name, "out_format": "wav"} - if file_id: - params["file_id"] = file_id - return await self._call_query("get_record", params) - - async def download_binary(self, url: str) -> Optional[bytes]: - """下载远程二进制资源。 - - Args: - url: 资源 URL。 - - Returns: - Optional[bytes]: 下载到的二进制内容;失败时返回 ``None``。 - """ - if not url: - return None - if not AIOHTTP_AVAILABLE or ClientSession is None or ClientTimeout is None: - self._logger.warning("NapCat 查询层缺少 aiohttp,无法下载远程资源") - return None - - try: - timeout = ClientTimeout(total=15) - async with ClientSession(timeout=timeout) as session: - async with session.get(url) as response: - if response.status != 200: - self._logger.warning(f"NapCat 远程资源下载失败: status={response.status} url={url}") - return None - return await response.read() - except asyncio.CancelledError: - raise - except Exception as exc: - self._logger.warning(f"NapCat 远程资源下载失败: {exc}") - return None - - async def _call_query(self, action_name: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """调用 OneBot 查询动作并提取 ``data`` 字段。 - - Args: - action_name: OneBot 动作名。 - params: 动作参数。 - - Returns: - Optional[Dict[str, Any]]: 查询结果中的 ``data`` 字段;失败时返回 ``None``。 - """ - try: - response = await self._transport.call_action(action_name, params) - except asyncio.CancelledError: - raise - except Exception as exc: - self._logger.warning(f"NapCat 查询动作执行失败: action={action_name} error={exc}") - return None - - if str(response.get("status") or "").lower() != "ok": - self._logger.warning( - f"NapCat 查询动作返回失败: action={action_name} " - f"message={response.get('wording') or response.get('message') or 'unknown'}" - ) - return None - - response_data = response.get("data") - return response_data if isinstance(response_data, dict) else None diff --git a/src/plugins/built_in/napcat_adapter/runtime_state.py b/src/plugins/built_in/napcat_adapter/runtime_state.py deleted file mode 100644 index b4dbfa09..00000000 --- a/src/plugins/built_in/napcat_adapter/runtime_state.py +++ /dev/null @@ -1,85 +0,0 @@ -"""NapCat 运行时路由状态管理。""" - -from typing import Any, Optional - -from napcat_adapter.config import NapCatServerConfig - - -class NapCatRuntimeStateManager: - """NapCat 适配器路由状态上报器。""" - - def __init__(self, adapter_capability: Any, logger: Any) -> None: - """初始化运行时状态管理器。 - - Args: - adapter_capability: SDK 提供的适配器能力对象。 - logger: 插件日志对象。 - """ - self._adapter_capability = adapter_capability - self._logger = logger - self._runtime_state_connected: bool = False - self._reported_account_id: Optional[str] = None - self._reported_scope: Optional[str] = None - - async def report_connected(self, account_id: str, server_config: NapCatServerConfig) -> bool: - """向 Host 上报当前连接已就绪。 - - Args: - account_id: 当前 NapCat 连接对应的机器人账号 ID。 - server_config: 当前生效的 NapCat 服务端配置。 - - Returns: - bool: 若 Host 接受了运行时状态更新,则返回 ``True``。 - """ - normalized_account_id = str(account_id).strip() - if not normalized_account_id: - return False - - scope = server_config.connection_id or None - if ( - self._runtime_state_connected - and self._reported_account_id == normalized_account_id - and self._reported_scope == scope - ): - return True - - accepted = False - try: - accepted = await self._adapter_capability.update_runtime_state( - connected=True, - account_id=normalized_account_id, - scope=server_config.connection_id, - metadata={"ws_url": server_config.build_ws_url()}, - ) - except Exception as exc: - self._logger.warning(f"NapCat 适配器上报连接就绪状态失败: {exc}") - return False - - if not accepted: - self._logger.warning("NapCat 适配器连接已建立,但 Host 未接受运行时状态更新") - return False - - self._runtime_state_connected = True - self._reported_account_id = normalized_account_id - self._reported_scope = scope - self._logger.info( - f"NapCat 适配器已激活路由: platform=qq account_id={normalized_account_id} " - f"scope={self._reported_scope or '*'}" - ) - return True - - async def report_disconnected(self) -> None: - """向 Host 上报当前连接已断开,并撤销适配器路由。""" - if not self._runtime_state_connected: - self._reported_account_id = None - self._reported_scope = None - return - - try: - await self._adapter_capability.update_runtime_state(connected=False) - except Exception as exc: - self._logger.warning(f"NapCat 适配器上报断开状态失败: {exc}") - finally: - self._runtime_state_connected = False - self._reported_account_id = None - self._reported_scope = None diff --git a/src/plugins/built_in/napcat_adapter/transport.py b/src/plugins/built_in/napcat_adapter/transport.py deleted file mode 100644 index d20de097..00000000 --- a/src/plugins/built_in/napcat_adapter/transport.py +++ /dev/null @@ -1,322 +0,0 @@ -"""NapCat 正向 WebSocket 传输层。""" - -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Set, cast -from uuid import uuid4 - -import asyncio -import contextlib -import json - -from napcat_adapter.config import NapCatServerConfig - -if TYPE_CHECKING: - from aiohttp import ClientWebSocketResponse as AiohttpClientWebSocketResponse - -try: - from aiohttp import ClientSession, ClientTimeout, WSMsgType - - AIOHTTP_AVAILABLE = True -except ImportError: - ClientSession = cast(Any, None) - ClientTimeout = cast(Any, None) - WSMsgType = cast(Any, None) - AIOHTTP_AVAILABLE = False - -if not TYPE_CHECKING: - AiohttpClientWebSocketResponse = Any - - -class NapCatTransportClient: - """NapCat 正向 WebSocket 客户端。""" - - def __init__( - self, - logger: Any, - on_connection_opened: Callable[[], Awaitable[None]], - on_connection_closed: Callable[[], Awaitable[None]], - on_payload: Callable[[Dict[str, Any]], Awaitable[None]], - ) -> None: - """初始化传输层客户端。 - - Args: - logger: 插件日志对象。 - on_connection_opened: 连接建立后的异步回调。 - on_connection_closed: 连接断开后的异步回调。 - on_payload: 收到非 echo 载荷后的异步回调。 - """ - self._logger = logger - self._on_connection_opened = on_connection_opened - self._on_connection_closed = on_connection_closed - self._on_payload = on_payload - self._server_config: Optional[NapCatServerConfig] = None - self._connection_task: Optional[asyncio.Task[None]] = None - self._pending_actions: Dict[str, asyncio.Future[Dict[str, Any]]] = {} - self._background_tasks: Set[asyncio.Task[Any]] = set() - self._send_lock = asyncio.Lock() - self._ws: Optional[AiohttpClientWebSocketResponse] = None - self._stop_requested: bool = False - self._connection_active: bool = False - - @classmethod - def is_available(cls) -> bool: - """判断当前环境是否安装了传输层依赖。 - - Returns: - bool: 若已安装 ``aiohttp``,则返回 ``True``。 - """ - return AIOHTTP_AVAILABLE - - def configure(self, server_config: NapCatServerConfig) -> None: - """更新当前传输层使用的 NapCat 服务端配置。 - - Args: - server_config: 最新生效的 NapCat 服务端配置。 - """ - self._server_config = server_config - - async def start(self) -> None: - """启动 NapCat 正向 WebSocket 连接循环。 - - Raises: - RuntimeError: 当缺少配置或依赖时抛出。 - """ - if not self.is_available(): - raise RuntimeError("NapCat 适配器依赖 aiohttp,但当前环境未安装该依赖") - if self._server_config is None: - raise RuntimeError("NapCat 适配器尚未配置 napcat_server") - if self._connection_task is not None and not self._connection_task.done(): - return - - self._stop_requested = False - self._connection_task = asyncio.create_task(self._connection_loop(), name="napcat_adapter.connection") - - async def stop(self) -> None: - """停止当前连接并清理所有后台任务。""" - self._stop_requested = True - connection_task = self._connection_task - self._connection_task = None - - ws = self._ws - if ws is not None and not ws.closed: - with contextlib.suppress(Exception): - await ws.close() - self._ws = None - - if connection_task is not None: - connection_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await connection_task - - await self._cancel_background_tasks() - await self._notify_connection_closed() - self._fail_pending_actions("NapCat connection closed") - - async def call_action(self, action_name: str, params: Dict[str, Any]) -> Dict[str, Any]: - """发送 OneBot 动作并等待对应的 echo 响应。 - - Args: - action_name: OneBot 动作名称。 - params: 动作参数。 - - Returns: - Dict[str, Any]: NapCat 返回的原始响应字典。 - - Raises: - RuntimeError: 当连接不可用时抛出。 - """ - ws = self._ws - server_config = self._server_config - if ws is None or ws.closed or server_config is None: - raise RuntimeError("NapCat is not connected") - - echo_id = uuid4().hex - loop = asyncio.get_running_loop() - response_future: asyncio.Future[Dict[str, Any]] = loop.create_future() - self._pending_actions[echo_id] = response_future - - request_payload = {"action": action_name, "params": params, "echo": echo_id} - try: - async with self._send_lock: - await ws.send_str(json.dumps(request_payload, ensure_ascii=False)) - return await asyncio.wait_for(response_future, timeout=server_config.action_timeout_sec) - finally: - self._pending_actions.pop(echo_id, None) - - async def _connection_loop(self) -> None: - """维护单个 WebSocket 连接,并在断开后按配置重连。""" - assert ClientSession is not None - assert ClientTimeout is not None - - while not self._stop_requested: - server_config = self._server_config - if server_config is None: - return - - ws_url = server_config.build_ws_url() - timeout = ClientTimeout(total=None, connect=10) - - try: - async with ClientSession(headers=self._build_headers(server_config), timeout=timeout) as session: - async with session.ws_connect(ws_url, heartbeat=server_config.heartbeat_interval or None) as ws: - self._ws = ws - self._logger.info(f"NapCat 适配器已连接: {ws_url}") - await self._receive_loop(ws) - except asyncio.CancelledError: - raise - except Exception as exc: - self._logger.warning(f"NapCat 适配器连接失败: {exc}") - finally: - self._ws = None - await self._notify_connection_closed() - self._fail_pending_actions("NapCat connection interrupted") - - if self._stop_requested: - break - - await asyncio.sleep(server_config.reconnect_delay_sec) - - async def _receive_loop(self, ws: AiohttpClientWebSocketResponse) -> None: - """持续消费 WebSocket 消息并分发处理。 - - Args: - ws: 当前活跃的 WebSocket 连接对象。 - """ - assert WSMsgType is not None - - bootstrap_task = self._create_background_task( - self._notify_connection_opened(), - "napcat_adapter.bootstrap", - ) - try: - async for ws_message in ws: - if ws_message.type != WSMsgType.TEXT: - if ws_message.type in {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR}: - break - continue - - payload = self._parse_json_message(ws_message.data) - if payload is None: - continue - - if echo_id := str(payload.get("echo") or "").strip(): - self._resolve_pending_action(echo_id, payload) - continue - - self._create_background_task(self._on_payload(payload), "napcat_adapter.payload") - finally: - if bootstrap_task is not None and not bootstrap_task.done(): - bootstrap_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await bootstrap_task - - def _create_background_task(self, coroutine: Awaitable[Any], name: str) -> asyncio.Task[Any]: - """创建并跟踪一个后台任务。 - - Args: - coroutine: 待执行的协程对象。 - name: 任务名。 - - Returns: - asyncio.Task[Any]: 已创建的后台任务。 - """ - task = asyncio.create_task(coroutine, name=name) - self._background_tasks.add(task) - task.add_done_callback(self._handle_background_task_completion) - return task - - def _handle_background_task_completion(self, task: asyncio.Task[Any]) -> None: - """处理后台任务结束后的清理与异常记录。 - - Args: - task: 已结束的后台任务。 - """ - self._background_tasks.discard(task) - if task.cancelled(): - return - - exception = task.exception() - if exception is not None: - self._logger.error(f"NapCat 适配器后台任务异常: {exception}", exc_info=True) - - async def _cancel_background_tasks(self) -> None: - """取消所有仍在运行的后台任务。""" - background_tasks = list(self._background_tasks) - for task in background_tasks: - task.cancel() - if background_tasks: - with contextlib.suppress(Exception): - await asyncio.gather(*background_tasks, return_exceptions=True) - self._background_tasks.clear() - - async def _notify_connection_opened(self) -> None: - """在连接建立后触发上层回调。""" - if self._connection_active: - return - - self._connection_active = True - try: - await self._on_connection_opened() - except Exception as exc: - self._logger.warning(f"NapCat 适配器连接建立回调失败: {exc}") - - async def _notify_connection_closed(self) -> None: - """在连接断开后触发上层回调。""" - if not self._connection_active: - return - - self._connection_active = False - try: - await self._on_connection_closed() - except Exception as exc: - self._logger.warning(f"NapCat 适配器断连回调失败: {exc}") - - def _resolve_pending_action(self, echo_id: str, payload: Dict[str, Any]) -> None: - """解析等待中的动作响应。 - - Args: - echo_id: 动作请求对应的 echo 标识。 - payload: NapCat 返回的响应载荷。 - """ - response_future = self._pending_actions.get(echo_id) - if response_future is None or response_future.done(): - return - response_future.set_result(payload) - - def _fail_pending_actions(self, error_message: str) -> None: - """让所有等待中的动作以异常方式结束。 - - Args: - error_message: 写入异常中的错误信息。 - """ - for response_future in self._pending_actions.values(): - if not response_future.done(): - response_future.set_exception(RuntimeError(error_message)) - self._pending_actions.clear() - - def _build_headers(self, server_config: NapCatServerConfig) -> Dict[str, str]: - """构造连接 NapCat 所需的请求头。 - - Args: - server_config: 当前生效的 NapCat 服务端配置。 - - Returns: - Dict[str, str]: WebSocket 握手请求头。 - """ - return {"Authorization": f"Bearer {server_config.token}"} if server_config.token else {} - - def _parse_json_message(self, data: Any) -> Optional[Dict[str, Any]]: - """解析 WebSocket 文本消息中的 JSON 数据。 - - Args: - data: WebSocket 收到的原始文本数据。 - - Returns: - Optional[Dict[str, Any]]: 成功时返回字典,失败时返回 ``None``。 - """ - try: - payload = json.loads(str(data)) - except Exception as exc: - self._logger.warning(f"NapCat 适配器解析 JSON 载荷失败: {exc}") - return None - - return payload if isinstance(payload, dict) else None