fix: remove nc ada

This commit is contained in:
UnCLAS-Prommer
2026-03-22 12:50:09 +08:00
committed by DrSmoothl
parent a0c653de45
commit 0066224251
12 changed files with 0 additions and 2390 deletions

View File

@@ -1 +0,0 @@
"""NapCat 内置适配器插件包。"""

View File

@@ -1,30 +0,0 @@
{
"manifest_version": 1,
"name": "napcat_adapter_builtin",
"version": "0.1.0",
"description": "Built-in NapCat adapter plugin for MVP message forwarding.",
"author": {
"name": "OpenAI Codex"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "1.0.0"
},
"keywords": [
"adapter",
"built-in",
"napcat",
"onebot",
"qq"
],
"categories": [
"Adapter",
"Built-in"
],
"default_locale": "en-US",
"plugin_info": {
"is_built_in": true,
"plugin_type": "adapter"
},
"capabilities": []
}

View File

@@ -1,510 +0,0 @@
"""NapCat 入站消息编解码。"""
from typing import Any, Dict, List, Mapping, Optional, Tuple
from uuid import uuid4
import hashlib
import json
import re
import time
from napcat_adapter.qq_queries import NapCatQueryService
_CQ_SEGMENT_PATTERN = re.compile(r"\[CQ:(?P<type>[a-zA-Z0-9_]+)(?P<params>(?:,[^\]]*)?)\]")
class NapCatInboundCodec:
"""NapCat 入站消息编码器。"""
def __init__(self, logger: Any, query_service: NapCatQueryService) -> None:
"""初始化入站消息编码器。
Args:
logger: 插件日志对象。
query_service: QQ 查询服务。
"""
self._logger = logger
self._query_service = query_service
async def build_message_dict(
self,
payload: Mapping[str, Any],
self_id: str,
sender_user_id: str,
sender: Mapping[str, Any],
) -> Dict[str, Any]:
"""构造 Host 侧可接受的 ``MessageDict``。
Args:
payload: NapCat 原始消息事件。
self_id: 当前机器人账号 ID。
sender_user_id: 发送者用户 ID。
sender: 发送者信息字典。
Returns:
Dict[str, Any]: 规范化后的 ``MessageDict``。
"""
message_type = str(payload.get("message_type") or "").strip() or "private"
group_id = str(payload.get("group_id") or "").strip()
group_name = str(payload.get("group_name") or "").strip() or (f"group_{group_id}" if group_id else "")
user_nickname = str(sender.get("nickname") or sender.get("card") or sender_user_id).strip() or sender_user_id
user_cardname = str(sender.get("card") or "").strip() or None
raw_message, is_at = await self.convert_segments(payload, self_id)
raw_message_text = str(payload.get("raw_message") or "").strip()
if not raw_message:
raw_message = [{"type": "text", "data": raw_message_text or "[unsupported]"}]
plain_text = self.build_plain_text(raw_message, raw_message_text)
timestamp_seconds = payload.get("time")
if not isinstance(timestamp_seconds, (int, float)):
timestamp_seconds = time.time()
additional_config: Dict[str, Any] = {"self_id": self_id, "napcat_message_type": message_type}
if group_id:
additional_config["platform_io_target_group_id"] = group_id
else:
additional_config["platform_io_target_user_id"] = sender_user_id
message_info: Dict[str, Any] = {
"user_info": {
"user_id": sender_user_id,
"user_nickname": user_nickname,
"user_cardname": user_cardname,
},
"additional_config": additional_config,
}
if group_id:
message_info["group_info"] = {"group_id": group_id, "group_name": group_name}
message_id = str(payload.get("message_id") or f"napcat-{uuid4().hex}").strip()
return {
"message_id": message_id,
"timestamp": str(float(timestamp_seconds)),
"platform": "qq",
"message_info": message_info,
"raw_message": raw_message,
"is_mentioned": is_at,
"is_at": is_at,
"is_emoji": False,
"is_picture": False,
"is_command": plain_text.startswith("/"),
"is_notify": False,
"session_id": "",
"processed_plain_text": plain_text,
"display_message": plain_text,
}
async def convert_segments(self, payload: Mapping[str, Any], self_id: str) -> Tuple[List[Dict[str, Any]], bool]:
"""将 OneBot 消息段转换为 Host 消息段结构。
Args:
payload: OneBot 原始消息事件。
self_id: 当前机器人账号 ID。
Returns:
Tuple[List[Dict[str, Any]], bool]: 转换后的消息段列表,以及是否 @ 到当前机器人。
"""
message_payload = payload.get("message")
if isinstance(message_payload, str):
parsed_message_payload = self._parse_cq_message_text(message_payload)
if parsed_message_payload:
message_payload = parsed_message_payload
else:
normalized_text = self._decode_cq_entities(message_payload).strip()
return ([{"type": "text", "data": normalized_text}] if normalized_text else []), False
if not isinstance(message_payload, list):
return [], False
converted_segments: List[Dict[str, Any]] = []
is_at = False
for segment in message_payload:
if not isinstance(segment, Mapping):
continue
segment_type = str(segment.get("type") or "").strip()
segment_data = segment.get("data", {})
if not isinstance(segment_data, Mapping):
segment_data = {}
if segment_type == "text":
if text_value := str(segment_data.get("text") or ""):
converted_segments.append({"type": "text", "data": text_value})
continue
if segment_type == "at":
if target_user_id := str(segment_data.get("qq") or "").strip():
converted_segments.append(
{
"type": "at",
"data": {
"target_user_id": target_user_id,
"target_user_nickname": None,
"target_user_cardname": None,
},
}
)
if self_id and target_user_id == self_id:
is_at = True
continue
if segment_type == "reply":
if reply_segment := await self._build_reply_segment(segment_data):
converted_segments.append(reply_segment)
continue
if segment_type == "face":
converted_segments.append({"type": "text", "data": "[face]"})
continue
if segment_type == "image":
converted_segments.append(await self._build_image_like_segment(segment_data, is_emoji=False))
continue
if segment_type == "record":
converted_segments.append(await self._build_record_segment(segment_data))
continue
if segment_type == "video":
converted_segments.append({"type": "text", "data": "[video]"})
continue
if segment_type == "file":
converted_segments.append({"type": "text", "data": "[file]"})
continue
if segment_type == "json":
converted_segments.append(self._build_json_text_segment(segment_data))
continue
if segment_type == "forward":
if forward_segment := await self._build_forward_segment(segment_data):
converted_segments.append(forward_segment)
continue
if segment_type in {"xml", "share"}:
converted_segments.append({"type": "text", "data": f"[{segment_type}]"})
return converted_segments, is_at
async def _build_reply_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
"""构造回复消息段。
Args:
segment_data: OneBot ``reply`` 段的 ``data`` 字典。
Returns:
Optional[Dict[str, Any]]: 转换后的回复消息段;缺少消息 ID 时返回 ``None``。
"""
target_message_id = str(segment_data.get("id") or "").strip()
if not target_message_id:
return None
message_detail = await self._query_service.get_message_detail(target_message_id)
reply_payload: Dict[str, Any] = {"target_message_id": target_message_id}
if message_detail is not None:
sender = message_detail.get("sender", {})
if not isinstance(sender, Mapping):
sender = {}
reply_payload["target_message_content"] = str(message_detail.get("raw_message") or "").strip() or None
reply_payload["target_message_sender_id"] = str(
message_detail.get("user_id") or sender.get("user_id") or ""
).strip() or None
reply_payload["target_message_sender_nickname"] = str(sender.get("nickname") or "").strip() or None
reply_payload["target_message_sender_cardname"] = str(sender.get("card") or "").strip() or None
return {"type": "reply", "data": reply_payload}
async def _build_image_like_segment(
self,
segment_data: Mapping[str, Any],
is_emoji: bool,
) -> Dict[str, Any]:
"""构造图片或表情消息段。
Args:
segment_data: OneBot ``image`` 段的 ``data`` 字典。
is_emoji: 是否按表情组件处理。
Returns:
Dict[str, Any]: 转换后的图片或表情消息段。
"""
subtype = self._normalize_numeric_segment_value(segment_data.get("sub_type"))
actual_is_emoji = is_emoji or (subtype is not None and subtype not in {0, 4, 9})
image_url = str(segment_data.get("url") or "").strip()
binary_data = await self._query_service.download_binary(image_url)
if not binary_data:
return {"type": "text", "data": "[emoji]" if actual_is_emoji else "[image]"}
return {
"type": "emoji" if actual_is_emoji else "image",
"data": "",
"hash": hashlib.sha256(binary_data).hexdigest(),
"binary_data_base64": self._encode_binary(binary_data),
}
async def _build_record_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]:
"""构造语音消息段。
Args:
segment_data: OneBot ``record`` 段的 ``data`` 字典。
Returns:
Dict[str, Any]: 转换后的语音或占位文本消息段。
"""
file_name = str(segment_data.get("file") or "").strip()
file_id = str(segment_data.get("file_id") or "").strip() or None
if not file_name:
return {"type": "text", "data": "[voice]"}
record_detail = await self._query_service.get_record_detail(file_name=file_name, file_id=file_id)
if record_detail is None:
return {"type": "text", "data": "[voice]"}
record_base64 = str(record_detail.get("base64") or "").strip()
if not record_base64:
return {"type": "text", "data": "[voice]"}
try:
binary_data = self._decode_binary(record_base64)
except Exception:
return {"type": "text", "data": "[voice]"}
return {
"type": "voice",
"data": "",
"hash": hashlib.sha256(binary_data).hexdigest(),
"binary_data_base64": self._encode_binary(binary_data),
}
async def _build_forward_segment(self, segment_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
"""构造合并转发消息段。
Args:
segment_data: OneBot ``forward`` 段的 ``data`` 字典。
Returns:
Optional[Dict[str, Any]]: 转换后的合并转发消息段;失败时返回 ``None``。
"""
message_id = str(segment_data.get("id") or "").strip()
if not message_id:
return None
forward_detail = await self._query_service.get_forward_message(message_id)
if forward_detail is None:
return {"type": "text", "data": "[forward]"}
messages = forward_detail.get("messages", [])
if not isinstance(messages, list):
return {"type": "text", "data": "[forward]"}
forward_nodes: List[Dict[str, Any]] = []
for forward_message in messages:
if not isinstance(forward_message, Mapping):
continue
raw_content = forward_message.get("content", [])
content_segments = await self._convert_forward_content(raw_content, "")
sender = forward_message.get("sender", {})
if not isinstance(sender, Mapping):
sender = {}
forward_nodes.append(
{
"user_id": str(sender.get("user_id") or sender.get("uin") or "").strip() or None,
"user_nickname": str(sender.get("nickname") or sender.get("name") or "未知用户"),
"user_cardname": str(sender.get("card") or "").strip() or None,
"message_id": str(forward_message.get("message_id") or uuid4().hex),
"content": content_segments or [{"type": "text", "data": "[empty]"}],
}
)
if not forward_nodes:
return {"type": "text", "data": "[forward]"}
return {"type": "forward", "data": forward_nodes}
async def _convert_forward_content(self, raw_content: Any, self_id: str) -> List[Dict[str, Any]]:
"""转换转发节点内部的消息段列表。
Args:
raw_content: 转发节点原始内容。
self_id: 当前机器人账号 ID。
Returns:
List[Dict[str, Any]]: 转换后的消息段列表。
"""
pseudo_payload: Dict[str, Any] = {"message": raw_content}
segments, _ = await self.convert_segments(pseudo_payload, self_id)
return segments
def _build_json_text_segment(self, segment_data: Mapping[str, Any]) -> Dict[str, Any]:
"""将 JSON 卡片最佳努力转换为文本占位。
Args:
segment_data: OneBot ``json`` 段的 ``data`` 字典。
Returns:
Dict[str, Any]: 转换后的文本消息段。
"""
json_data = str(segment_data.get("data") or "").strip()
if not json_data:
return {"type": "text", "data": "[json]"}
try:
parsed_json = json.loads(json_data)
except Exception:
return {"type": "text", "data": "[json]"}
app_name = str(parsed_json.get("app") or "").strip()
prompt = ""
if isinstance(parsed_json.get("meta"), Mapping):
prompt = str(parsed_json["meta"].get("prompt") or "").strip()
text = prompt or app_name or "json"
return {"type": "text", "data": f"[json:{text}]"}
@staticmethod
def _encode_binary(binary_data: bytes) -> str:
"""将二进制内容编码为 Base64 字符串。
Args:
binary_data: 待编码的二进制内容。
Returns:
str: Base64 编码字符串。
"""
import base64
return base64.b64encode(binary_data).decode("utf-8")
@staticmethod
def _decode_binary(binary_base64: str) -> bytes:
"""将 Base64 字符串解码为二进制内容。
Args:
binary_base64: Base64 字符串。
Returns:
bytes: 解码后的二进制内容。
"""
import base64
return base64.b64decode(binary_base64)
def build_plain_text(self, raw_message: List[Dict[str, Any]], fallback_text: str) -> str:
"""从标准消息段中提取可展示的纯文本。
Args:
raw_message: 标准化后的消息段列表。
fallback_text: 当无法拼出文本时使用的回退文本。
Returns:
str: 用于 Host 展示和命令判断的纯文本内容。
"""
plain_text_parts: List[str] = []
for item in raw_message:
if not isinstance(item, Mapping):
continue
item_type = str(item.get("type") or "").strip()
item_data = item.get("data")
if item_type == "text":
plain_text_parts.append(str(item_data or ""))
elif item_type == "at" and isinstance(item_data, Mapping):
plain_text_parts.append(f"@{item_data.get('target_user_id') or ''}")
elif item_type == "reply":
plain_text_parts.append("[reply]")
elif item_type == "forward":
plain_text_parts.append("[forward]")
elif item_type in {"image", "emoji", "voice"}:
plain_text_parts.append(f"[{item_type}]")
plain_text = "".join(part for part in plain_text_parts if part).strip()
return plain_text or fallback_text or "[unsupported]"
def _parse_cq_message_text(self, message_text: str) -> List[Dict[str, Any]]:
"""将 CQ 码字符串解析为 OneBot 风格消息段列表。
Args:
message_text: NapCat 在字符串模式下返回的消息内容。
Returns:
List[Dict[str, Any]]: 解析后的 OneBot 风格消息段列表。
"""
parsed_segments: List[Dict[str, Any]] = []
current_index = 0
for match in _CQ_SEGMENT_PATTERN.finditer(message_text):
prefix_text = self._decode_cq_entities(message_text[current_index : match.start()])
if prefix_text:
parsed_segments.append({"type": "text", "data": {"text": prefix_text}})
segment_type = str(match.group("type") or "").strip()
segment_data = self._parse_cq_segment_data(match.group("params") or "")
if segment_type:
parsed_segments.append({"type": segment_type, "data": segment_data})
current_index = match.end()
suffix_text = self._decode_cq_entities(message_text[current_index:])
if suffix_text:
parsed_segments.append({"type": "text", "data": {"text": suffix_text}})
return parsed_segments
def _parse_cq_segment_data(self, raw_params: str) -> Dict[str, Any]:
"""解析单个 CQ 段中的参数串。
Args:
raw_params: 形如 ``,key=value,key2=value2`` 的原始参数字符串。
Returns:
Dict[str, Any]: 解析后的参数字典。
"""
parsed_data: Dict[str, Any] = {}
if not raw_params:
return parsed_data
for item in raw_params.lstrip(",").split(","):
if not item or "=" not in item:
continue
key, value = item.split("=", 1)
normalized_key = key.strip()
if not normalized_key:
continue
decoded_value = self._decode_cq_entities(value)
parsed_data[normalized_key] = self._normalize_numeric_segment_value(decoded_value)
return parsed_data
@staticmethod
def _decode_cq_entities(text: str) -> str:
"""解码 CQ 码中的 HTML 风格转义实体。
Args:
text: 待解码的 CQ 文本。
Returns:
str: 解码后的普通文本。
"""
return (
text.replace("&amp;", "&")
.replace("&#91;", "[")
.replace("&#93;", "]")
.replace("&#44;", ",")
)
@staticmethod
def _normalize_numeric_segment_value(value: Any) -> Any:
"""将可安全识别的数字字符串转为整数。
Args:
value: 原始字段值。
Returns:
Any: 规范化后的字段值。
"""
if isinstance(value, str):
stripped_value = value.strip()
if stripped_value.isdigit():
return int(stripped_value)
return stripped_value
return value

View File

@@ -1,192 +0,0 @@
"""NapCat 出站消息编解码。"""
from typing import Any, Dict, List, Mapping, Tuple
class NapCatOutboundCodec:
"""NapCat 出站消息编码器。"""
def build_outbound_action(
self,
message: Mapping[str, Any],
route: Mapping[str, Any],
) -> Tuple[str, Dict[str, Any]]:
"""为 Host 出站消息构造 OneBot 动作。
Args:
message: Host 侧标准 ``MessageDict``。
route: Platform IO 路由信息。
Returns:
Tuple[str, Dict[str, Any]]: 动作名称与参数字典。
Raises:
ValueError: 当私聊出站缺少目标用户 ID 时抛出。
"""
message_info = message.get("message_info", {})
if not isinstance(message_info, Mapping):
message_info = {}
group_info = message_info.get("group_info", {})
if not isinstance(group_info, Mapping):
group_info = {}
additional_config = message_info.get("additional_config", {})
if not isinstance(additional_config, Mapping):
additional_config = {}
raw_message = message.get("raw_message", [])
segments = self.convert_segments(raw_message)
if target_group_id := str(
group_info.get("group_id") or additional_config.get("platform_io_target_group_id") or ""
).strip():
return "send_group_msg", {"group_id": target_group_id, "message": segments}
target_user_id = str(
additional_config.get("platform_io_target_user_id")
or additional_config.get("target_user_id")
or route.get("target_user_id")
or ""
).strip()
if not target_user_id:
raise ValueError("Outbound private message is missing target_user_id")
return "send_private_msg", {"message": segments, "user_id": target_user_id}
def convert_segments(self, raw_message: Any) -> List[Dict[str, Any]]:
"""将 Host 消息段转换为 OneBot 消息段。
Args:
raw_message: Host 侧 ``raw_message`` 字段。
Returns:
List[Dict[str, Any]]: OneBot 消息段列表。
"""
if not isinstance(raw_message, list):
return [{"type": "text", "data": {"text": ""}}]
outbound_segments: List[Dict[str, Any]] = []
for item in raw_message:
if not isinstance(item, Mapping):
continue
item_type = str(item.get("type") or "").strip()
item_data = item.get("data")
if item_type == "text":
text_value = str(item_data or "")
outbound_segments.append({"type": "text", "data": {"text": text_value}})
continue
if item_type == "at" and isinstance(item_data, Mapping):
if target_user_id := str(item_data.get("target_user_id") or "").strip():
outbound_segments.append({"type": "at", "data": {"qq": target_user_id}})
continue
if item_type == "reply":
if isinstance(item_data, Mapping):
target_message_id = str(item_data.get("target_message_id") or "").strip()
else:
target_message_id = str(item_data or "").strip()
if target_message_id:
outbound_segments.append({"type": "reply", "data": {"id": target_message_id}})
continue
if item_type == "image":
binary_base64 = str(item.get("binary_data_base64") or "").strip()
if binary_base64:
outbound_segments.append(
{
"type": "image",
"data": {"file": f"base64://{binary_base64}", "subtype": 0},
}
)
else:
outbound_segments.append({"type": "text", "data": {"text": "[image]"}})
continue
if item_type == "emoji":
binary_base64 = str(item.get("binary_data_base64") or "").strip()
if binary_base64:
outbound_segments.append(
{
"type": "image",
"data": {
"file": f"base64://{binary_base64}",
"subtype": 1,
"summary": "[动画表情]",
},
}
)
else:
outbound_segments.append({"type": "text", "data": {"text": "[emoji]"}})
continue
if item_type == "voice":
binary_base64 = str(item.get("binary_data_base64") or "").strip()
if binary_base64:
outbound_segments.append({"type": "record", "data": {"file": f"base64://{binary_base64}"}})
else:
outbound_segments.append({"type": "text", "data": {"text": "[voice]"}})
continue
if item_type == "forward" and isinstance(item_data, list):
outbound_segments.extend(self._build_forward_nodes(item_data))
continue
if item_type == "dict" and isinstance(item_data, Mapping):
if dict_segment := self._build_dict_component_segment(item_data):
outbound_segments.append(dict_segment)
continue
fallback_text = f"[unsupported:{item_type or 'unknown'}]"
outbound_segments.append({"type": "text", "data": {"text": fallback_text}})
if not outbound_segments:
outbound_segments.append({"type": "text", "data": {"text": ""}})
return outbound_segments
def _build_forward_nodes(self, forward_nodes: List[Any]) -> List[Dict[str, Any]]:
"""构造 NapCat 转发节点列表。
Args:
forward_nodes: 内部转发节点列表。
Returns:
List[Dict[str, Any]]: NapCat 转发节点列表。
"""
built_nodes: List[Dict[str, Any]] = []
for node in forward_nodes:
if not isinstance(node, Mapping):
continue
raw_content = node.get("content", [])
node_segments = self.convert_segments(raw_content)
built_nodes.append(
{
"type": "node",
"data": {
"name": str(node.get("user_nickname") or node.get("user_cardname") or "QQ用户"),
"uin": str(node.get("user_id") or ""),
"content": node_segments,
},
}
)
return built_nodes
def _build_dict_component_segment(self, item_data: Mapping[str, Any]) -> Dict[str, Any]:
"""尽力将 ``DictComponent`` 转换为 NapCat 消息段。
Args:
item_data: ``DictComponent`` 原始数据。
Returns:
Dict[str, Any]: NapCat 消息段;不支持时返回占位文本段。
"""
raw_type = str(item_data.get("type") or "").strip()
raw_payload = item_data.get("data", item_data)
if raw_type in {"file", "music", "video", "face"} and isinstance(raw_payload, Mapping):
return {"type": raw_type, "data": dict(raw_payload)}
if raw_type in {"image", "record", "reply", "at"} and isinstance(raw_payload, Mapping):
return {"type": raw_type, "data": dict(raw_payload)}
return {"type": "text", "data": {"text": f"[unsupported:{raw_type or 'dict'}]"}}

View File

@@ -1,398 +0,0 @@
"""NapCat 内置适配器配置解析。"""
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping, Optional, Set, Tuple
from urllib.parse import urlparse
from napcat_adapter.constants import (
DEFAULT_ACTION_TIMEOUT_SEC,
DEFAULT_CHAT_LIST_TYPE,
DEFAULT_HEARTBEAT_INTERVAL_SEC,
DEFAULT_NAPCAT_HOST,
DEFAULT_NAPCAT_PORT,
DEFAULT_RECONNECT_DELAY_SEC,
SUPPORTED_CONFIG_VERSION,
)
@dataclass(frozen=True)
class NapCatPluginOptions:
"""插件级配置。"""
enabled: bool = False
config_version: str = ""
def should_connect(self) -> bool:
"""判断当前配置下是否应当启动连接。
Returns:
bool: 若插件连接已启用,则返回 ``True``。
"""
return self.enabled
@dataclass(frozen=True)
class NapCatServerConfig:
"""NapCat 正向 WebSocket 连接配置。"""
host: str = DEFAULT_NAPCAT_HOST
port: int = DEFAULT_NAPCAT_PORT
token: str = ""
heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL_SEC
reconnect_delay_sec: float = DEFAULT_RECONNECT_DELAY_SEC
action_timeout_sec: float = DEFAULT_ACTION_TIMEOUT_SEC
connection_id: str = ""
def build_ws_url(self) -> str:
"""构造正向 WebSocket 地址。
Returns:
str: 供适配器作为客户端连接的 NapCat WebSocket 地址。
"""
return f"ws://{self.host}:{self.port}"
@dataclass(frozen=True)
class NapCatChatConfig:
"""聊天名单配置。"""
group_list_type: str = DEFAULT_CHAT_LIST_TYPE
group_list: Set[str] = field(default_factory=set)
private_list_type: str = DEFAULT_CHAT_LIST_TYPE
private_list: Set[str] = field(default_factory=set)
ban_user_id: Set[str] = field(default_factory=set)
@dataclass(frozen=True)
class NapCatFilterConfig:
"""消息过滤配置。"""
ignore_self_message: bool = True
@dataclass(frozen=True)
class NapCatPluginSettings:
"""NapCat 插件完整配置。"""
plugin: NapCatPluginOptions = field(default_factory=NapCatPluginOptions)
napcat_server: NapCatServerConfig = field(default_factory=NapCatServerConfig)
chat: NapCatChatConfig = field(default_factory=NapCatChatConfig)
filters: NapCatFilterConfig = field(default_factory=NapCatFilterConfig)
@classmethod
def from_mapping(cls, raw_config: Mapping[str, Any], logger: Any) -> "NapCatPluginSettings":
"""从 Runner 注入的原始配置字典解析插件配置。
Args:
raw_config: Runner 注入的原始配置内容。
logger: 插件日志对象。
Returns:
NapCatPluginSettings: 规范化后的插件配置。
"""
plugin_section = _as_mapping(raw_config.get("plugin"))
server_section = _as_mapping(raw_config.get("napcat_server"))
legacy_connection_section = _as_mapping(raw_config.get("connection"))
chat_section = _as_mapping(raw_config.get("chat"))
filters_section = _as_mapping(raw_config.get("filters"))
if not server_section and legacy_connection_section:
logger.warning("NapCat 适配器检测到旧版 [connection] 配置段,请尽快迁移到 [napcat_server]")
server_section = legacy_connection_section
legacy_host, legacy_port = _read_legacy_host_port(server_section, legacy_connection_section, logger)
parsed_host = _read_string(server_section, "host") or legacy_host or DEFAULT_NAPCAT_HOST
parsed_port = _read_positive_int(
mapping=server_section,
key="port",
default=legacy_port or DEFAULT_NAPCAT_PORT,
logger=logger,
setting_name="napcat_server.port",
)
return cls(
plugin=NapCatPluginOptions(
enabled=_read_bool(plugin_section, "enabled", False),
config_version=_read_string(plugin_section, "config_version"),
),
napcat_server=NapCatServerConfig(
host=parsed_host,
port=parsed_port,
token=_read_string(server_section, "token") or _read_string(server_section, "access_token"),
heartbeat_interval=_read_positive_float(
mapping=server_section,
key="heartbeat_interval",
default=_read_positive_float(
mapping=server_section,
key="heartbeat_sec",
default=DEFAULT_HEARTBEAT_INTERVAL_SEC,
logger=logger,
setting_name="napcat_server.heartbeat_interval",
),
logger=logger,
setting_name="napcat_server.heartbeat_interval",
),
reconnect_delay_sec=_read_positive_float(
mapping=server_section,
key="reconnect_delay_sec",
default=DEFAULT_RECONNECT_DELAY_SEC,
logger=logger,
setting_name="napcat_server.reconnect_delay_sec",
),
action_timeout_sec=_read_positive_float(
mapping=server_section,
key="action_timeout_sec",
default=DEFAULT_ACTION_TIMEOUT_SEC,
logger=logger,
setting_name="napcat_server.action_timeout_sec",
),
connection_id=_read_string(server_section, "connection_id"),
),
chat=NapCatChatConfig(
group_list_type=_read_list_mode(
mapping=chat_section,
key="group_list_type",
default=DEFAULT_CHAT_LIST_TYPE,
logger=logger,
setting_name="chat.group_list_type",
),
group_list=_read_string_set(chat_section, "group_list"),
private_list_type=_read_list_mode(
mapping=chat_section,
key="private_list_type",
default=DEFAULT_CHAT_LIST_TYPE,
logger=logger,
setting_name="chat.private_list_type",
),
private_list=_read_string_set(chat_section, "private_list"),
ban_user_id=_read_string_set(chat_section, "ban_user_id"),
),
filters=NapCatFilterConfig(
ignore_self_message=_read_bool(filters_section, "ignore_self_message", True),
),
)
def should_connect(self) -> bool:
"""判断当前配置下是否应当启动连接。
Returns:
bool: 若插件连接已启用,则返回 ``True``。
"""
return self.plugin.should_connect()
def validate(self, logger: Any) -> bool:
"""校验当前配置是否满足启动连接的前提条件。
Args:
logger: 插件日志对象。
Returns:
bool: 若配置满足启动连接的前提条件,则返回 ``True``。
"""
config_version = self.plugin.config_version
if not config_version:
logger.error(
f"NapCat 适配器配置缺少 plugin.config_version当前插件要求版本 {SUPPORTED_CONFIG_VERSION}"
)
return False
if config_version != SUPPORTED_CONFIG_VERSION:
logger.error(
"NapCat 适配器配置版本不兼容: "
f"当前为 {config_version},当前插件要求 {SUPPORTED_CONFIG_VERSION}"
)
return False
if not self.napcat_server.host:
logger.warning("NapCat 适配器已启用,但 napcat_server.host 为空")
return False
if self.napcat_server.port <= 0:
logger.warning("NapCat 适配器已启用,但 napcat_server.port 不是正整数")
return False
return True
def _as_mapping(value: Any) -> Dict[str, Any]:
"""将任意值安全转换为字典。
Args:
value: 待转换的值。
Returns:
Dict[str, Any]: 若原值是映射,则返回普通字典;否则返回空字典。
"""
return dict(value) if isinstance(value, Mapping) else {}
def _read_bool(mapping: Mapping[str, Any], key: str, default: bool) -> bool:
"""安全读取布尔配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
default: 读取失败时的默认值。
Returns:
bool: 解析后的布尔值。
"""
value = mapping.get(key, default)
return value if isinstance(value, bool) else default
def _read_string(mapping: Mapping[str, Any], key: str) -> str:
"""安全读取字符串配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
Returns:
str: 去除首尾空白后的字符串值。
"""
value = mapping.get(key)
return "" if value is None else str(value).strip()
def _read_positive_float(
mapping: Mapping[str, Any],
key: str,
default: float,
logger: Any,
setting_name: str,
) -> float:
"""安全读取正浮点数配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
default: 读取失败时的默认值。
logger: 插件日志对象。
setting_name: 用于日志输出的完整配置名。
Returns:
float: 合法的正浮点数;否则返回默认值。
"""
value = mapping.get(key, default)
if isinstance(value, (int, float)) and float(value) > 0:
return float(value)
if key in mapping:
logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}")
return default
def _read_positive_int(
mapping: Mapping[str, Any],
key: str,
default: int,
logger: Any,
setting_name: str,
) -> int:
"""安全读取正整数配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
default: 读取失败时的默认值。
logger: 插件日志对象。
setting_name: 用于日志输出的完整配置名。
Returns:
int: 合法的正整数;否则返回默认值。
"""
value = mapping.get(key, default)
if isinstance(value, int) and value > 0:
return value
if isinstance(value, str) and value.isdigit() and int(value) > 0:
return int(value)
if key in mapping:
logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}")
return default
def _read_list_mode(
mapping: Mapping[str, Any],
key: str,
default: str,
logger: Any,
setting_name: str,
) -> str:
"""安全读取名单模式配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
default: 读取失败时的默认值。
logger: 插件日志对象。
setting_name: 用于日志输出的完整配置名。
Returns:
str: 合法的名单模式字符串。
"""
value = mapping.get(key, default)
if isinstance(value, str):
normalized_value = value.strip()
if normalized_value in {"whitelist", "blacklist"}:
return normalized_value
if key in mapping:
logger.warning(f"NapCat 适配器配置项取值无效,已回退到默认值: {setting_name}={value!r},默认值为 {default}")
return default
def _read_string_set(mapping: Mapping[str, Any], key: str) -> Set[str]:
"""安全读取字符串集合配置值。
Args:
mapping: 待读取的配置字典。
key: 目标键名。
Returns:
Set[str]: 规范化后的字符串集合。
"""
value = mapping.get(key, [])
if not isinstance(value, list):
return set()
normalized_values: Set[str] = set()
for item in value:
item_text = "" if item is None else str(item).strip()
if item_text:
normalized_values.add(item_text)
return normalized_values
def _read_legacy_host_port(
server_section: Mapping[str, Any],
legacy_connection_section: Mapping[str, Any],
logger: Any,
) -> Tuple[str, Optional[int]]:
"""从旧版 ``ws_url`` 配置中提取主机与端口。
Args:
server_section: 新版 ``napcat_server`` 配置段。
legacy_connection_section: 旧版 ``connection`` 配置段。
logger: 插件日志对象。
Returns:
Tuple[str, Optional[int]]: 解析到的主机与端口;若未找到,则返回空主机与 ``None``。
"""
legacy_ws_url = _read_string(server_section, "ws_url") or _read_string(legacy_connection_section, "ws_url")
if not legacy_ws_url:
return "", None
parsed_url = urlparse(legacy_ws_url)
parsed_host = parsed_url.hostname or ""
parsed_port = parsed_url.port
logger.warning(
"NapCat 适配器检测到旧版 ws_url 配置,已临时兼容解析,请尽快迁移到 napcat_server.host/port"
)
if parsed_url.path not in {"", "/"}:
logger.warning("NapCat 适配器旧版 ws_url 包含路径,新的 napcat_server 配置不会保留该路径")
return parsed_host, parsed_port

View File

@@ -1,9 +0,0 @@
"""NapCat 内置适配器共享常量。"""
SUPPORTED_CONFIG_VERSION = "0.1.0"
DEFAULT_NAPCAT_HOST = "127.0.0.1"
DEFAULT_NAPCAT_PORT = 3001
DEFAULT_RECONNECT_DELAY_SEC = 5.0
DEFAULT_HEARTBEAT_INTERVAL_SEC = 30.0
DEFAULT_ACTION_TIMEOUT_SEC = 15.0
DEFAULT_CHAT_LIST_TYPE = "whitelist"

View File

@@ -1,68 +0,0 @@
"""NapCat 入站消息过滤。"""
from typing import Any, Set
from napcat_adapter.config import NapCatChatConfig
class NapCatChatFilter:
"""NapCat 聊天名单过滤器。"""
def __init__(self, logger: Any) -> None:
"""初始化聊天名单过滤器。
Args:
logger: 插件日志对象。
"""
self._logger = logger
def is_inbound_chat_allowed(
self,
sender_user_id: str,
group_id: str,
chat_config: NapCatChatConfig,
) -> bool:
"""检查入站消息是否通过聊天名单过滤。
Args:
sender_user_id: 发送者用户 ID。
group_id: 群聊 ID私聊时为空字符串。
chat_config: 当前生效的聊天配置。
Returns:
bool: 若消息允许继续进入 Host则返回 ``True``。
"""
if sender_user_id in chat_config.ban_user_id:
self._logger.warning(f"NapCat 用户 {sender_user_id} 在全局禁止名单中,消息被丢弃")
return False
if group_id:
if not self._is_id_allowed_by_list_policy(group_id, chat_config.group_list_type, chat_config.group_list):
self._logger.warning(f"NapCat 群聊 {group_id} 未通过聊天名单过滤,消息被丢弃")
return False
return True
if not self._is_id_allowed_by_list_policy(
sender_user_id,
chat_config.private_list_type,
chat_config.private_list,
):
self._logger.warning(f"NapCat 私聊用户 {sender_user_id} 未通过聊天名单过滤,消息被丢弃")
return False
return True
@staticmethod
def _is_id_allowed_by_list_policy(target_id: str, list_type: str, configured_ids: Set[str]) -> bool:
"""根据白名单或黑名单规则判断目标 ID 是否允许通过。
Args:
target_id: 待检查的目标 ID。
list_type: 名单模式,仅支持 ``whitelist`` 或 ``blacklist``。
configured_ids: 配置中的 ID 集合。
Returns:
bool: 若目标 ID 允许通过,则返回 ``True``。
"""
if list_type == "whitelist":
return target_id in configured_ids
return target_id not in configured_ids

View File

@@ -1,381 +0,0 @@
"""内置 NapCat 适配器插件。
当前实现维持 MVP 范围,目标是跑通基础消息收发链路:
1. 作为客户端连接 NapCat / OneBot v11 WebSocket 服务。
2. 将入站消息事件转换为 Host 侧的 ``MessageDict``。
3. 将 Host 出站消息转换为 OneBot 动作并发送。
当前范围刻意收敛为:
- 单连接
- 文本、@、reply 基础转发
- 暂不处理 ``notice`` / ``meta_event`` 的完整语义归一化
- 暂不支持图片、语音、文件等复杂媒体
"""
from __future__ import annotations
from typing import Any, Dict, Mapping, Optional
import asyncio
from maibot_sdk import Adapter, MaiBotPlugin
from napcat_adapter.codec_inbound import NapCatInboundCodec
from napcat_adapter.codec_outbound import NapCatOutboundCodec
from napcat_adapter.config import NapCatPluginSettings
from napcat_adapter.filters import NapCatChatFilter
from napcat_adapter.qq_notice import NapCatNoticeCodec
from napcat_adapter.qq_queries import NapCatQueryService
from napcat_adapter.runtime_state import NapCatRuntimeStateManager
from napcat_adapter.transport import NapCatTransportClient
@Adapter(platform="qq", protocol="napcat", send_method="send_to_platform")
class NapCatAdapterPlugin(MaiBotPlugin):
"""NapCat 适配器 MVP 实现。"""
def __init__(self) -> None:
"""初始化 NapCat 适配器插件实例。"""
super().__init__()
self._plugin_config: Dict[str, Any] = {}
self._settings: Optional[NapCatPluginSettings] = None
self._inbound_codec: Optional[NapCatInboundCodec] = None
self._outbound_codec = NapCatOutboundCodec()
self._chat_filter: Optional[NapCatChatFilter] = None
self._query_service: Optional[NapCatQueryService] = None
self._notice_codec: Optional[NapCatNoticeCodec] = None
self._runtime_state: Optional[NapCatRuntimeStateManager] = None
self._transport: Optional[NapCatTransportClient] = None
def set_plugin_config(self, config: Dict[str, Any]) -> None:
"""设置插件配置内容。
Args:
config: Runner 注入的 ``config.toml`` 解析结果。
"""
self._plugin_config = config if isinstance(config, dict) else {}
async def on_load(self) -> None:
"""在插件加载时根据配置决定是否启动连接。"""
await self._restart_connection_if_needed()
async def on_unload(self) -> None:
"""在插件卸载时关闭连接并清理运行时状态。"""
await self._stop_connection()
async def on_config_update(self, new_config: Dict[str, Any], version: str) -> None:
"""在配置更新后重载连接状态。
Args:
new_config: 最新的插件配置。
version: 配置版本号。
"""
self.set_plugin_config(new_config)
self._settings = None
if version:
self.ctx.logger.debug(f"NapCat 适配器收到配置更新通知: {version}")
await self._restart_connection_if_needed()
async def send_to_platform(
self,
message: Dict[str, Any],
route: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""将 Host 出站消息发送到 NapCat。
Args:
message: Host 侧标准 ``MessageDict``。
route: Platform IO 生成的路由信息。
metadata: Platform IO 附带的投递元数据。
**kwargs: 预留的扩展参数。
Returns:
Dict[str, Any]: 标准化后的发送结果。
"""
del metadata
del kwargs
self._ensure_runtime_components()
transport = self._transport
if transport is None:
return {"success": False, "error": "NapCat transport is not initialized"}
try:
action_name, params = self._outbound_codec.build_outbound_action(message, route or {})
response = await transport.call_action(action_name, params)
except Exception as exc:
return {"success": False, "error": str(exc)}
if str(response.get("status", "")).lower() != "ok":
return {
"success": False,
"error": str(response.get("wording") or response.get("message") or "NapCat send failed"),
"metadata": {"retcode": response.get("retcode")},
}
response_data = response.get("data", {})
external_message_id = ""
if isinstance(response_data, Mapping):
external_message_id = str(response_data.get("message_id") or "")
return {
"success": True,
"external_message_id": external_message_id or None,
"metadata": {"action": action_name},
}
def _ensure_runtime_components(self) -> None:
"""确保运行时依赖对象已经完成初始化。"""
if self._chat_filter is None:
self._chat_filter = NapCatChatFilter(self.ctx.logger)
if self._transport is None:
self._transport = NapCatTransportClient(
logger=self.ctx.logger,
on_connection_opened=self._bootstrap_adapter_runtime_state,
on_connection_closed=self._handle_transport_disconnected,
on_payload=self._handle_transport_payload,
)
if self._query_service is None:
self._query_service = NapCatQueryService(self.ctx.logger, self._transport)
if self._inbound_codec is None:
self._inbound_codec = NapCatInboundCodec(self.ctx.logger, self._query_service)
if self._notice_codec is None:
self._notice_codec = NapCatNoticeCodec(self.ctx.logger, self._query_service)
if self._runtime_state is None:
self._runtime_state = NapCatRuntimeStateManager(self.ctx.adapter, self.ctx.logger)
def _reload_settings(self) -> NapCatPluginSettings:
"""重新解析当前插件配置。
Returns:
NapCatPluginSettings: 最新的规范化配置。
"""
self._settings = NapCatPluginSettings.from_mapping(self._plugin_config, self.ctx.logger)
return self._settings
async def _restart_connection_if_needed(self) -> None:
"""根据当前配置重启连接循环。"""
self._ensure_runtime_components()
settings = self._reload_settings()
await self._stop_connection()
if not settings.should_connect():
self.ctx.logger.info("NapCat 适配器保持空闲状态,因为插件或配置未启用")
return
if not settings.validate(self.ctx.logger):
return
transport = self._transport
assert transport is not None
if not transport.is_available():
self.ctx.logger.error("NapCat 适配器依赖 aiohttp但当前环境未安装该依赖")
return
transport.configure(settings.napcat_server)
await transport.start()
async def _stop_connection(self) -> None:
"""停止当前连接。"""
transport = self._transport
if transport is not None:
await transport.stop()
return
runtime_state = self._runtime_state
if runtime_state is not None:
await runtime_state.report_disconnected()
async def _handle_transport_payload(self, payload: Dict[str, Any]) -> None:
"""处理来自传输层的非 echo 载荷。
Args:
payload: NapCat 推送的原始事件数据。
"""
post_type = str(payload.get("post_type") or "").strip()
if post_type == "message":
await self._handle_inbound_message(payload)
return
if post_type == "notice":
await self._handle_notice_event(payload)
return
if post_type == "meta_event":
await self._handle_meta_event(payload)
async def _handle_inbound_message(self, payload: Dict[str, Any]) -> None:
"""处理单条 NapCat 入站消息并注入 Host。
Args:
payload: NapCat / OneBot 推送的原始消息事件。
"""
self._ensure_runtime_components()
settings = self._settings or self._reload_settings()
chat_filter = self._chat_filter
inbound_codec = self._inbound_codec
runtime_state = self._runtime_state
assert chat_filter is not None
assert inbound_codec is not None
assert runtime_state is not None
self_id = str(payload.get("self_id") or "").strip()
if self_id:
await runtime_state.report_connected(self_id, settings.napcat_server)
sender = payload.get("sender", {})
if not isinstance(sender, Mapping):
sender = {}
sender_user_id = str(payload.get("user_id") or sender.get("user_id") or "").strip()
if not sender_user_id:
return
group_id = str(payload.get("group_id") or "").strip()
if self_id and sender_user_id == self_id and settings.filters.ignore_self_message:
return
if not chat_filter.is_inbound_chat_allowed(sender_user_id, group_id, settings.chat):
return
message_dict = await inbound_codec.build_message_dict(payload, self_id, sender_user_id, sender)
route_metadata: Dict[str, Any] = {}
if self_id:
route_metadata["self_id"] = self_id
if settings.napcat_server.connection_id:
route_metadata["connection_id"] = settings.napcat_server.connection_id
external_message_id = str(payload.get("message_id") or "").strip()
accepted = await self.ctx.adapter.receive_external_message(
message_dict,
route_metadata=route_metadata,
external_message_id=external_message_id,
dedupe_key=external_message_id,
)
if not accepted:
self.ctx.logger.debug(f"Host 丢弃了 NapCat 入站消息: {external_message_id or '无消息 ID'}")
async def _handle_notice_event(self, payload: Dict[str, Any]) -> None:
"""处理 NapCat ``notice`` 事件并注入 Host。
Args:
payload: NapCat 推送的通知事件。
"""
self._ensure_runtime_components()
notice_codec = self._notice_codec
runtime_state = self._runtime_state
settings = self._settings or self._reload_settings()
assert notice_codec is not None
assert runtime_state is not None
self_id = str(payload.get("self_id") or "").strip()
if self_id:
await runtime_state.report_connected(self_id, settings.napcat_server)
message_dict = await notice_codec.build_notice_message_dict(payload)
if message_dict is None:
return
route_metadata: Dict[str, Any] = {}
if self_id:
route_metadata["self_id"] = self_id
if settings.napcat_server.connection_id:
route_metadata["connection_id"] = settings.napcat_server.connection_id
external_message_id = str(payload.get("message_id") or payload.get("notice_type") or "").strip()
accepted = await self.ctx.adapter.receive_external_message(
message_dict,
route_metadata=route_metadata,
external_message_id=external_message_id or None,
dedupe_key=external_message_id or None,
)
if not accepted:
self.ctx.logger.debug(f"Host 丢弃了 NapCat 通知事件: {external_message_id or '无消息 ID'}")
async def _handle_meta_event(self, payload: Dict[str, Any]) -> None:
"""处理 NapCat ``meta_event`` 事件。
Args:
payload: NapCat 推送的元事件。
"""
self._ensure_runtime_components()
notice_codec = self._notice_codec
runtime_state = self._runtime_state
settings = self._settings or self._reload_settings()
assert notice_codec is not None
assert runtime_state is not None
self_id = str(payload.get("self_id") or "").strip()
if self_id:
await runtime_state.report_connected(self_id, settings.napcat_server)
await notice_codec.handle_meta_event(payload)
async def _bootstrap_adapter_runtime_state(self) -> None:
"""在连接建立后主动获取账号信息并激活适配器路由。"""
transport = self._transport
query_service = self._query_service
runtime_state = self._runtime_state
settings = self._settings or self._reload_settings()
if transport is None or query_service is None or runtime_state is None:
return
max_attempts = 3
last_error: Optional[Exception] = None
for attempt in range(1, max_attempts + 1):
try:
login_info = await query_service.get_login_info()
self_id = self._extract_self_id_from_login_response(login_info)
await runtime_state.report_connected(self_id, settings.napcat_server)
return
except asyncio.CancelledError:
raise
except Exception as exc:
last_error = exc
self.ctx.logger.warning(
f"NapCat 适配器获取登录信息失败,第 {attempt}/{max_attempts} 次重试: {exc}"
)
if attempt < max_attempts:
await asyncio.sleep(1.0)
if last_error is not None:
self.ctx.logger.error(f"NapCat 适配器未能完成路由激活,连接将保持只接收状态: {last_error}")
async def _handle_transport_disconnected(self) -> None:
"""处理传输层断开事件。"""
runtime_state = self._runtime_state
if runtime_state is not None:
await runtime_state.report_disconnected()
@staticmethod
def _extract_self_id_from_login_response(response: Optional[Dict[str, Any]]) -> str:
"""从 ``get_login_info`` 查询结果中提取当前账号 ID。
Args:
response: NapCat 返回的登录信息字典。
Returns:
str: 规范化后的账号 ID 字符串。
Raises:
ValueError: 当响应中缺少有效账号 ID 时抛出。
"""
if not isinstance(response, Mapping):
raise ValueError("get_login_info 响应缺少 data 字段")
self_id = str(response.get("user_id") or "").strip()
if not self_id:
raise ValueError("get_login_info 响应缺少有效的 user_id")
return self_id
def create_plugin() -> NapCatAdapterPlugin:
"""创建插件实例。
Returns:
NapCatAdapterPlugin: NapCat 内置适配器插件实例。
"""
return NapCatAdapterPlugin()

View File

@@ -1,224 +0,0 @@
"""NapCat QQ 平台通知与元事件处理。"""
from typing import Any, Dict, Mapping, Optional
from uuid import uuid4
import time
from napcat_adapter.qq_queries import NapCatQueryService
class NapCatNoticeCodec:
"""NapCat QQ 通知事件编码器。"""
def __init__(self, logger: Any, query_service: NapCatQueryService) -> None:
"""初始化通知事件编码器。
Args:
logger: 插件日志对象。
query_service: QQ 查询服务。
"""
self._logger = logger
self._query_service = query_service
async def build_notice_message_dict(self, payload: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
"""将 NapCat ``notice`` 事件转换为 Host 可接受的消息字典。
Args:
payload: NapCat 推送的原始通知事件。
Returns:
Optional[Dict[str, Any]]: 成功时返回标准 ``MessageDict``;无法识别时返回 ``None``。
"""
notice_type = str(payload.get("notice_type") or "").strip()
if not notice_type:
return None
group_id = str(payload.get("group_id") or "").strip()
user_id = str(payload.get("user_id") or payload.get("operator_id") or "").strip()
self_id = str(payload.get("self_id") or "").strip()
user_info = await self._build_user_info(group_id=group_id, user_id=user_id)
group_info = await self._build_group_info(group_id)
notice_text = self._build_notice_text(payload, user_info.get("user_nickname", user_id or "系统"))
if not notice_text:
return None
additional_config: Dict[str, Any] = {
"self_id": self_id,
"napcat_notice_type": notice_type,
"napcat_notice_sub_type": str(payload.get("sub_type") or "").strip(),
"napcat_notice_payload": dict(payload),
}
if group_id:
additional_config["platform_io_target_group_id"] = group_id
elif user_id:
additional_config["platform_io_target_user_id"] = user_id
message_info: Dict[str, Any] = {"user_info": user_info, "additional_config": additional_config}
if group_info is not None:
message_info["group_info"] = group_info
timestamp_seconds = payload.get("time")
if not isinstance(timestamp_seconds, (int, float)):
timestamp_seconds = time.time()
return {
"message_id": f"napcat-notice-{uuid4().hex}",
"timestamp": str(float(timestamp_seconds)),
"platform": "qq",
"message_info": message_info,
"raw_message": [{"type": "text", "data": notice_text}],
"is_mentioned": False,
"is_at": False,
"is_emoji": False,
"is_picture": False,
"is_command": False,
"is_notify": True,
"session_id": "",
"processed_plain_text": notice_text,
"display_message": notice_text,
}
async def handle_meta_event(self, payload: Mapping[str, Any]) -> None:
"""处理 ``meta_event`` 事件的日志与状态观测。
Args:
payload: NapCat 推送的原始元事件。
"""
meta_event_type = str(payload.get("meta_event_type") or "").strip()
self_id = str(payload.get("self_id") or "").strip() or "unknown"
if meta_event_type == "lifecycle":
sub_type = str(payload.get("sub_type") or "").strip()
if sub_type == "connect":
self._logger.info(f"NapCat 元事件Bot {self_id} 已建立连接")
else:
self._logger.debug(f"NapCat 生命周期事件: self_id={self_id} sub_type={sub_type}")
return
if meta_event_type == "heartbeat":
status = payload.get("status", {})
if not isinstance(status, Mapping):
status = {}
is_online = bool(status.get("online", False))
is_good = bool(status.get("good", False))
interval_ms = payload.get("interval")
self._logger.debug(
f"NapCat 心跳事件: self_id={self_id} online={is_online} good={is_good} interval={interval_ms}"
)
if not is_online:
self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 已离线")
elif not is_good:
self._logger.warning(f"NapCat 心跳显示 Bot {self_id} 状态异常")
async def _build_user_info(self, group_id: str, user_id: str) -> Dict[str, Optional[str]]:
"""构造通知消息的用户信息。
Args:
group_id: 群号;私聊或系统通知时为空字符串。
user_id: 事件关联用户号。
Returns:
Dict[str, Optional[str]]: 规范化后的用户信息字典。
"""
if not user_id:
return {
"user_id": "notice",
"user_nickname": "系统通知",
"user_cardname": None,
}
member_info: Optional[Dict[str, Any]]
if group_id:
member_info = await self._query_service.get_group_member_info(group_id, user_id)
else:
member_info = await self._query_service.get_stranger_info(user_id)
if member_info is None:
return {
"user_id": user_id,
"user_nickname": user_id,
"user_cardname": None,
}
return {
"user_id": user_id,
"user_nickname": str(member_info.get("nickname") or user_id),
"user_cardname": self._normalize_optional_string(member_info.get("card")),
}
async def _build_group_info(self, group_id: str) -> Optional[Dict[str, str]]:
"""构造通知消息的群信息。
Args:
group_id: 群号。
Returns:
Optional[Dict[str, str]]: 群信息字典;若不是群通知则返回 ``None``。
"""
if not group_id:
return None
group_info = await self._query_service.get_group_info(group_id)
group_name = str(group_info.get("group_name") or f"group_{group_id}") if group_info else f"group_{group_id}"
return {"group_id": group_id, "group_name": group_name}
def _build_notice_text(self, payload: Mapping[str, Any], actor_name: str) -> str:
"""根据 NapCat 通知事件生成可读文本。
Args:
payload: 原始通知事件。
actor_name: 事件操作者显示名。
Returns:
str: 生成的可读通知文本。
"""
notice_type = str(payload.get("notice_type") or "").strip()
sub_type = str(payload.get("sub_type") or "").strip()
target_id = str(payload.get("target_id") or "").strip()
if notice_type in {"group_recall", "friend_recall"}:
return f"{actor_name} 撤回了一条消息"
if notice_type == "notify" and sub_type == "poke":
target_text = f" -> {target_id}" if target_id else ""
return f"{actor_name} 发起了戳一戳{target_text}"
if notice_type == "notify" and sub_type == "group_name":
return f"{actor_name} 修改了群名称"
if notice_type == "group_ban" and sub_type == "ban":
duration = payload.get("duration")
return f"{actor_name} 触发了群禁言,时长 {duration}"
if notice_type == "group_ban" and sub_type == "lift_ban":
return f"{actor_name} 触发了解除禁言"
if notice_type == "group_upload":
file_info = payload.get("file", {})
file_name = ""
if isinstance(file_info, Mapping):
file_name = str(file_info.get("name") or "").strip()
return f"{actor_name} 上传了文件{f'{file_name}' if file_name else ''}"
if notice_type == "group_increase":
return f"{actor_name} 加入了群聊"
if notice_type == "group_decrease":
return f"{actor_name} 离开了群聊"
if notice_type == "group_admin":
return f"{actor_name} 的群管理员状态发生变化"
if notice_type == "essence":
return f"{actor_name} 触发了精华消息事件"
if notice_type == "group_msg_emoji_like":
return f"{actor_name} 给一条消息添加了表情回应"
return f"[notice] {notice_type}.{sub_type}".strip(".")
@staticmethod
def _normalize_optional_string(value: Any) -> Optional[str]:
"""将任意值规范化为可选字符串。
Args:
value: 待规范化的值。
Returns:
Optional[str]: 规范化后的字符串;若值为空则返回 ``None``。
"""
if value is None:
return None
normalized_value = str(value).strip()
return normalized_value if normalized_value else None

View File

@@ -1,170 +0,0 @@
"""NapCat QQ 平台查询能力。"""
from typing import TYPE_CHECKING, Any, Dict, Optional
import asyncio
if TYPE_CHECKING:
from napcat_adapter.transport import NapCatTransportClient
try:
from aiohttp import ClientSession, ClientTimeout
AIOHTTP_AVAILABLE = True
except ImportError:
ClientSession = None # type: ignore[assignment]
ClientTimeout = None # type: ignore[assignment]
AIOHTTP_AVAILABLE = False
class NapCatQueryService:
"""NapCat QQ 平台查询服务。"""
def __init__(self, logger: Any, transport: "NapCatTransportClient") -> None:
"""初始化查询服务。
Args:
logger: 插件日志对象。
transport: NapCat 传输层客户端。
"""
self._logger = logger
self._transport = transport
async def get_login_info(self) -> Optional[Dict[str, Any]]:
"""获取当前登录账号信息。
Returns:
Optional[Dict[str, Any]]: 登录信息字典;失败时返回 ``None``。
"""
return await self._call_query("get_login_info", {})
async def get_group_info(self, group_id: str) -> Optional[Dict[str, Any]]:
"""获取群信息。
Args:
group_id: 群号。
Returns:
Optional[Dict[str, Any]]: 群信息字典;失败时返回 ``None``。
"""
return await self._call_query("get_group_info", {"group_id": group_id})
async def get_group_member_info(self, group_id: str, user_id: str) -> Optional[Dict[str, Any]]:
"""获取群成员信息。
Args:
group_id: 群号。
user_id: 用户号。
Returns:
Optional[Dict[str, Any]]: 群成员信息字典;失败时返回 ``None``。
"""
return await self._call_query(
"get_group_member_info",
{"group_id": group_id, "user_id": user_id, "no_cache": True},
)
async def get_stranger_info(self, user_id: str) -> Optional[Dict[str, Any]]:
"""获取陌生人信息。
Args:
user_id: 用户号。
Returns:
Optional[Dict[str, Any]]: 陌生人信息字典;失败时返回 ``None``。
"""
return await self._call_query("get_stranger_info", {"user_id": user_id})
async def get_message_detail(self, message_id: str) -> Optional[Dict[str, Any]]:
"""获取消息详情。
Args:
message_id: 消息 ID。
Returns:
Optional[Dict[str, Any]]: 消息详情字典;失败时返回 ``None``。
"""
return await self._call_query("get_msg", {"message_id": message_id})
async def get_forward_message(self, message_id: str) -> Optional[Dict[str, Any]]:
"""获取合并转发消息详情。
Args:
message_id: 转发消息 ID。
Returns:
Optional[Dict[str, Any]]: 合并转发消息详情;失败时返回 ``None``。
"""
return await self._call_query("get_forward_msg", {"message_id": message_id})
async def get_record_detail(self, file_name: str, file_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""获取语音文件详情。
Args:
file_name: 语音文件名。
file_id: 可选文件 ID。
Returns:
Optional[Dict[str, Any]]: 语音详情字典;失败时返回 ``None``。
"""
params: Dict[str, Any] = {"file": file_name, "out_format": "wav"}
if file_id:
params["file_id"] = file_id
return await self._call_query("get_record", params)
async def download_binary(self, url: str) -> Optional[bytes]:
"""下载远程二进制资源。
Args:
url: 资源 URL。
Returns:
Optional[bytes]: 下载到的二进制内容;失败时返回 ``None``。
"""
if not url:
return None
if not AIOHTTP_AVAILABLE or ClientSession is None or ClientTimeout is None:
self._logger.warning("NapCat 查询层缺少 aiohttp无法下载远程资源")
return None
try:
timeout = ClientTimeout(total=15)
async with ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
if response.status != 200:
self._logger.warning(f"NapCat 远程资源下载失败: status={response.status} url={url}")
return None
return await response.read()
except asyncio.CancelledError:
raise
except Exception as exc:
self._logger.warning(f"NapCat 远程资源下载失败: {exc}")
return None
async def _call_query(self, action_name: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""调用 OneBot 查询动作并提取 ``data`` 字段。
Args:
action_name: OneBot 动作名。
params: 动作参数。
Returns:
Optional[Dict[str, Any]]: 查询结果中的 ``data`` 字段;失败时返回 ``None``。
"""
try:
response = await self._transport.call_action(action_name, params)
except asyncio.CancelledError:
raise
except Exception as exc:
self._logger.warning(f"NapCat 查询动作执行失败: action={action_name} error={exc}")
return None
if str(response.get("status") or "").lower() != "ok":
self._logger.warning(
f"NapCat 查询动作返回失败: action={action_name} "
f"message={response.get('wording') or response.get('message') or 'unknown'}"
)
return None
response_data = response.get("data")
return response_data if isinstance(response_data, dict) else None

View File

@@ -1,85 +0,0 @@
"""NapCat 运行时路由状态管理。"""
from typing import Any, Optional
from napcat_adapter.config import NapCatServerConfig
class NapCatRuntimeStateManager:
"""NapCat 适配器路由状态上报器。"""
def __init__(self, adapter_capability: Any, logger: Any) -> None:
"""初始化运行时状态管理器。
Args:
adapter_capability: SDK 提供的适配器能力对象。
logger: 插件日志对象。
"""
self._adapter_capability = adapter_capability
self._logger = logger
self._runtime_state_connected: bool = False
self._reported_account_id: Optional[str] = None
self._reported_scope: Optional[str] = None
async def report_connected(self, account_id: str, server_config: NapCatServerConfig) -> bool:
"""向 Host 上报当前连接已就绪。
Args:
account_id: 当前 NapCat 连接对应的机器人账号 ID。
server_config: 当前生效的 NapCat 服务端配置。
Returns:
bool: 若 Host 接受了运行时状态更新,则返回 ``True``。
"""
normalized_account_id = str(account_id).strip()
if not normalized_account_id:
return False
scope = server_config.connection_id or None
if (
self._runtime_state_connected
and self._reported_account_id == normalized_account_id
and self._reported_scope == scope
):
return True
accepted = False
try:
accepted = await self._adapter_capability.update_runtime_state(
connected=True,
account_id=normalized_account_id,
scope=server_config.connection_id,
metadata={"ws_url": server_config.build_ws_url()},
)
except Exception as exc:
self._logger.warning(f"NapCat 适配器上报连接就绪状态失败: {exc}")
return False
if not accepted:
self._logger.warning("NapCat 适配器连接已建立,但 Host 未接受运行时状态更新")
return False
self._runtime_state_connected = True
self._reported_account_id = normalized_account_id
self._reported_scope = scope
self._logger.info(
f"NapCat 适配器已激活路由: platform=qq account_id={normalized_account_id} "
f"scope={self._reported_scope or '*'}"
)
return True
async def report_disconnected(self) -> None:
"""向 Host 上报当前连接已断开,并撤销适配器路由。"""
if not self._runtime_state_connected:
self._reported_account_id = None
self._reported_scope = None
return
try:
await self._adapter_capability.update_runtime_state(connected=False)
except Exception as exc:
self._logger.warning(f"NapCat 适配器上报断开状态失败: {exc}")
finally:
self._runtime_state_connected = False
self._reported_account_id = None
self._reported_scope = None

View File

@@ -1,322 +0,0 @@
"""NapCat 正向 WebSocket 传输层。"""
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Set, cast
from uuid import uuid4
import asyncio
import contextlib
import json
from napcat_adapter.config import NapCatServerConfig
if TYPE_CHECKING:
from aiohttp import ClientWebSocketResponse as AiohttpClientWebSocketResponse
try:
from aiohttp import ClientSession, ClientTimeout, WSMsgType
AIOHTTP_AVAILABLE = True
except ImportError:
ClientSession = cast(Any, None)
ClientTimeout = cast(Any, None)
WSMsgType = cast(Any, None)
AIOHTTP_AVAILABLE = False
if not TYPE_CHECKING:
AiohttpClientWebSocketResponse = Any
class NapCatTransportClient:
"""NapCat 正向 WebSocket 客户端。"""
def __init__(
self,
logger: Any,
on_connection_opened: Callable[[], Awaitable[None]],
on_connection_closed: Callable[[], Awaitable[None]],
on_payload: Callable[[Dict[str, Any]], Awaitable[None]],
) -> None:
"""初始化传输层客户端。
Args:
logger: 插件日志对象。
on_connection_opened: 连接建立后的异步回调。
on_connection_closed: 连接断开后的异步回调。
on_payload: 收到非 echo 载荷后的异步回调。
"""
self._logger = logger
self._on_connection_opened = on_connection_opened
self._on_connection_closed = on_connection_closed
self._on_payload = on_payload
self._server_config: Optional[NapCatServerConfig] = None
self._connection_task: Optional[asyncio.Task[None]] = None
self._pending_actions: Dict[str, asyncio.Future[Dict[str, Any]]] = {}
self._background_tasks: Set[asyncio.Task[Any]] = set()
self._send_lock = asyncio.Lock()
self._ws: Optional[AiohttpClientWebSocketResponse] = None
self._stop_requested: bool = False
self._connection_active: bool = False
@classmethod
def is_available(cls) -> bool:
"""判断当前环境是否安装了传输层依赖。
Returns:
bool: 若已安装 ``aiohttp``,则返回 ``True``。
"""
return AIOHTTP_AVAILABLE
def configure(self, server_config: NapCatServerConfig) -> None:
"""更新当前传输层使用的 NapCat 服务端配置。
Args:
server_config: 最新生效的 NapCat 服务端配置。
"""
self._server_config = server_config
async def start(self) -> None:
"""启动 NapCat 正向 WebSocket 连接循环。
Raises:
RuntimeError: 当缺少配置或依赖时抛出。
"""
if not self.is_available():
raise RuntimeError("NapCat 适配器依赖 aiohttp但当前环境未安装该依赖")
if self._server_config is None:
raise RuntimeError("NapCat 适配器尚未配置 napcat_server")
if self._connection_task is not None and not self._connection_task.done():
return
self._stop_requested = False
self._connection_task = asyncio.create_task(self._connection_loop(), name="napcat_adapter.connection")
async def stop(self) -> None:
"""停止当前连接并清理所有后台任务。"""
self._stop_requested = True
connection_task = self._connection_task
self._connection_task = None
ws = self._ws
if ws is not None and not ws.closed:
with contextlib.suppress(Exception):
await ws.close()
self._ws = None
if connection_task is not None:
connection_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await connection_task
await self._cancel_background_tasks()
await self._notify_connection_closed()
self._fail_pending_actions("NapCat connection closed")
async def call_action(self, action_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""发送 OneBot 动作并等待对应的 echo 响应。
Args:
action_name: OneBot 动作名称。
params: 动作参数。
Returns:
Dict[str, Any]: NapCat 返回的原始响应字典。
Raises:
RuntimeError: 当连接不可用时抛出。
"""
ws = self._ws
server_config = self._server_config
if ws is None or ws.closed or server_config is None:
raise RuntimeError("NapCat is not connected")
echo_id = uuid4().hex
loop = asyncio.get_running_loop()
response_future: asyncio.Future[Dict[str, Any]] = loop.create_future()
self._pending_actions[echo_id] = response_future
request_payload = {"action": action_name, "params": params, "echo": echo_id}
try:
async with self._send_lock:
await ws.send_str(json.dumps(request_payload, ensure_ascii=False))
return await asyncio.wait_for(response_future, timeout=server_config.action_timeout_sec)
finally:
self._pending_actions.pop(echo_id, None)
async def _connection_loop(self) -> None:
"""维护单个 WebSocket 连接,并在断开后按配置重连。"""
assert ClientSession is not None
assert ClientTimeout is not None
while not self._stop_requested:
server_config = self._server_config
if server_config is None:
return
ws_url = server_config.build_ws_url()
timeout = ClientTimeout(total=None, connect=10)
try:
async with ClientSession(headers=self._build_headers(server_config), timeout=timeout) as session:
async with session.ws_connect(ws_url, heartbeat=server_config.heartbeat_interval or None) as ws:
self._ws = ws
self._logger.info(f"NapCat 适配器已连接: {ws_url}")
await self._receive_loop(ws)
except asyncio.CancelledError:
raise
except Exception as exc:
self._logger.warning(f"NapCat 适配器连接失败: {exc}")
finally:
self._ws = None
await self._notify_connection_closed()
self._fail_pending_actions("NapCat connection interrupted")
if self._stop_requested:
break
await asyncio.sleep(server_config.reconnect_delay_sec)
async def _receive_loop(self, ws: AiohttpClientWebSocketResponse) -> None:
"""持续消费 WebSocket 消息并分发处理。
Args:
ws: 当前活跃的 WebSocket 连接对象。
"""
assert WSMsgType is not None
bootstrap_task = self._create_background_task(
self._notify_connection_opened(),
"napcat_adapter.bootstrap",
)
try:
async for ws_message in ws:
if ws_message.type != WSMsgType.TEXT:
if ws_message.type in {WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR}:
break
continue
payload = self._parse_json_message(ws_message.data)
if payload is None:
continue
if echo_id := str(payload.get("echo") or "").strip():
self._resolve_pending_action(echo_id, payload)
continue
self._create_background_task(self._on_payload(payload), "napcat_adapter.payload")
finally:
if bootstrap_task is not None and not bootstrap_task.done():
bootstrap_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await bootstrap_task
def _create_background_task(self, coroutine: Awaitable[Any], name: str) -> asyncio.Task[Any]:
"""创建并跟踪一个后台任务。
Args:
coroutine: 待执行的协程对象。
name: 任务名。
Returns:
asyncio.Task[Any]: 已创建的后台任务。
"""
task = asyncio.create_task(coroutine, name=name)
self._background_tasks.add(task)
task.add_done_callback(self._handle_background_task_completion)
return task
def _handle_background_task_completion(self, task: asyncio.Task[Any]) -> None:
"""处理后台任务结束后的清理与异常记录。
Args:
task: 已结束的后台任务。
"""
self._background_tasks.discard(task)
if task.cancelled():
return
exception = task.exception()
if exception is not None:
self._logger.error(f"NapCat 适配器后台任务异常: {exception}", exc_info=True)
async def _cancel_background_tasks(self) -> None:
"""取消所有仍在运行的后台任务。"""
background_tasks = list(self._background_tasks)
for task in background_tasks:
task.cancel()
if background_tasks:
with contextlib.suppress(Exception):
await asyncio.gather(*background_tasks, return_exceptions=True)
self._background_tasks.clear()
async def _notify_connection_opened(self) -> None:
"""在连接建立后触发上层回调。"""
if self._connection_active:
return
self._connection_active = True
try:
await self._on_connection_opened()
except Exception as exc:
self._logger.warning(f"NapCat 适配器连接建立回调失败: {exc}")
async def _notify_connection_closed(self) -> None:
"""在连接断开后触发上层回调。"""
if not self._connection_active:
return
self._connection_active = False
try:
await self._on_connection_closed()
except Exception as exc:
self._logger.warning(f"NapCat 适配器断连回调失败: {exc}")
def _resolve_pending_action(self, echo_id: str, payload: Dict[str, Any]) -> None:
"""解析等待中的动作响应。
Args:
echo_id: 动作请求对应的 echo 标识。
payload: NapCat 返回的响应载荷。
"""
response_future = self._pending_actions.get(echo_id)
if response_future is None or response_future.done():
return
response_future.set_result(payload)
def _fail_pending_actions(self, error_message: str) -> None:
"""让所有等待中的动作以异常方式结束。
Args:
error_message: 写入异常中的错误信息。
"""
for response_future in self._pending_actions.values():
if not response_future.done():
response_future.set_exception(RuntimeError(error_message))
self._pending_actions.clear()
def _build_headers(self, server_config: NapCatServerConfig) -> Dict[str, str]:
"""构造连接 NapCat 所需的请求头。
Args:
server_config: 当前生效的 NapCat 服务端配置。
Returns:
Dict[str, str]: WebSocket 握手请求头。
"""
return {"Authorization": f"Bearer {server_config.token}"} if server_config.token else {}
def _parse_json_message(self, data: Any) -> Optional[Dict[str, Any]]:
"""解析 WebSocket 文本消息中的 JSON 数据。
Args:
data: WebSocket 收到的原始文本数据。
Returns:
Optional[Dict[str, Any]]: 成功时返回字典,失败时返回 ``None``。
"""
try:
payload = json.loads(str(data))
except Exception as exc:
self._logger.warning(f"NapCat 适配器解析 JSON 载荷失败: {exc}")
return None
return payload if isinstance(payload, dict) else None