Files
mai-bot/pytests/test_napcat_adapter_sdk.py

511 lines
18 KiB
Python

"""NapCat 插件与新 SDK 对接测试。"""
from importlib import import_module, util
from pathlib import Path
from typing import Any, Dict, List, Tuple
import logging
import sys
from types import SimpleNamespace
import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
PLUGINS_ROOT = PROJECT_ROOT / "plugins"
SDK_ROOT = PROJECT_ROOT / "packages" / "maibot-plugin-sdk"
NAPCAT_PLUGIN_DIR = PLUGINS_ROOT / "MaiBot-Napcat-Adapter"
NAPCAT_TEST_MODULE = "_test_napcat_adapter"
for import_path in (str(SDK_ROOT),):
if import_path not in sys.path:
sys.path.insert(0, import_path)
class _FakeGatewayCapability:
"""用于捕获消息网关状态上报的测试替身。"""
def __init__(self) -> None:
"""初始化测试替身。"""
self.calls: List[Dict[str, Any]] = []
async def update_state(
self,
gateway_name: str,
*,
ready: bool,
platform: str = "",
account_id: str = "",
scope: str = "",
metadata: Dict[str, Any] | None = None,
) -> bool:
"""记录一次状态上报请求。
Args:
gateway_name: 网关组件名称。
ready: 当前是否就绪。
platform: 平台名称。
account_id: 账号 ID。
scope: 路由作用域。
metadata: 附加元数据。
Returns:
bool: 始终返回 ``True``,模拟 Host 接受状态更新。
"""
self.calls.append(
{
"gateway_name": gateway_name,
"ready": ready,
"platform": platform,
"account_id": account_id,
"scope": scope,
"metadata": metadata or {},
}
)
return True
class _FakeNapCatQueryService:
"""用于驱动 NapCat 入站编解码测试的查询服务替身。"""
def __init__(self, forward_payloads: Dict[str, Any] | None = None) -> None:
"""初始化查询服务替身。
Args:
forward_payloads: 预置的合并转发响应映射。
"""
self._forward_payloads = forward_payloads or {}
async def download_binary(self, url: str) -> bytes | None:
"""模拟下载远程二进制资源。
Args:
url: 资源地址。
Returns:
bytes | None: 测试中默认不返回二进制内容。
"""
del url
return None
async def get_message_detail(self, message_id: str) -> Dict[str, Any] | None:
"""模拟获取消息详情。
Args:
message_id: 消息 ID。
Returns:
Dict[str, Any] | None: 测试中默认不返回详情。
"""
del message_id
return None
async def get_forward_message(self, message_id: str) -> Any:
"""模拟获取合并转发消息详情。
Args:
message_id: 转发消息 ID。
Returns:
Any: 预置的合并转发消息详情。
"""
return self._forward_payloads.get(message_id)
async def get_record_detail(self, file_name: str, file_id: str | None = None) -> Dict[str, Any] | None:
"""模拟获取语音详情。
Args:
file_name: 文件名。
file_id: 文件 ID。
Returns:
Dict[str, Any] | None: 测试中默认不返回语音详情。
"""
del file_name
del file_id
return None
class _FakeNapCatActionService:
"""用于驱动 NapCat 查询服务测试的动作服务替身。"""
def __init__(self, response_data: Any) -> None:
"""初始化动作服务替身。
Args:
response_data: 预置的 ``safe_call_action_data`` 返回值。
"""
self._response_data = response_data
async def safe_call_action_data(self, action_name: str, params: Dict[str, Any]) -> Any:
"""模拟安全调用 OneBot 动作。
Args:
action_name: 动作名称。
params: 动作参数。
Returns:
Any: 预置返回值。
"""
del action_name
del params
return self._response_data
def _load_napcat_sdk_modules() -> Tuple[Any, Any, Any, Any]:
"""动态加载 NapCat 插件测试所需的模块。
Returns:
tuple[Any, Any, Any, Any]:
依次返回常量模块、配置模块、插件模块和运行时状态模块。
"""
if NAPCAT_TEST_MODULE not in sys.modules:
plugin_path = NAPCAT_PLUGIN_DIR / "plugin.py"
spec = util.spec_from_file_location(
NAPCAT_TEST_MODULE,
plugin_path,
submodule_search_locations=[str(NAPCAT_PLUGIN_DIR)],
)
if spec is None or spec.loader is None:
raise ImportError(f"无法为 NapCat 插件创建模块规格: {plugin_path}")
module = util.module_from_spec(spec)
sys.modules[NAPCAT_TEST_MODULE] = module
try:
spec.loader.exec_module(module)
except Exception:
sys.modules.pop(NAPCAT_TEST_MODULE, None)
raise
return (
import_module(f"{NAPCAT_TEST_MODULE}.constants"),
import_module(f"{NAPCAT_TEST_MODULE}.config"),
import_module(f"{NAPCAT_TEST_MODULE}.plugin"),
import_module(f"{NAPCAT_TEST_MODULE}.runtime_state"),
)
def _load_napcat_sdk_symbols() -> Tuple[Any, Any, Any, Any]:
"""动态加载 NapCat 插件测试所需的符号。
Returns:
tuple[Any, Any, Any, Any]:
依次返回网关名常量、配置类、插件类和运行时状态管理器类。
"""
constants_module, config_module, plugin_module, runtime_state_module = _load_napcat_sdk_modules()
return (
constants_module.NAPCAT_GATEWAY_NAME,
config_module.NapCatServerConfig,
plugin_module.NapCatAdapterPlugin,
runtime_state_module.NapCatRuntimeStateManager,
)
def _load_napcat_inbound_codec_cls() -> Any:
"""动态加载 NapCat 入站编解码器类。
Returns:
Any: ``NapCatInboundCodec`` 类对象。
"""
_load_napcat_sdk_modules()
codec_module = import_module(f"{NAPCAT_TEST_MODULE}.codecs.inbound.message_codec")
return codec_module.NapCatInboundCodec
def _load_napcat_query_service_cls() -> Any:
"""动态加载 NapCat 查询服务类。
Returns:
Any: ``NapCatQueryService`` 类对象。
"""
_load_napcat_sdk_modules()
query_service_module = import_module(f"{NAPCAT_TEST_MODULE}.services.query_service")
return query_service_module.NapCatQueryService
def test_napcat_plugin_collects_duplex_message_gateway() -> None:
"""NapCat 插件应声明新的双工消息网关组件。"""
napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
plugin = napcat_plugin_cls()
components = plugin.get_components()
gateway_components = [
component
for component in components
if component.get("type") == "MESSAGE_GATEWAY"
]
assert len(gateway_components) == 1
gateway_component = gateway_components[0]
assert gateway_component["name"] == napcat_gateway_name
assert gateway_component["metadata"]["route_type"] == "duplex"
assert gateway_component["metadata"]["platform"] == "qq"
assert gateway_component["metadata"]["protocol"] == "napcat"
def test_napcat_plugin_uses_sdk_config_model() -> None:
"""NapCat 插件应声明 SDK 配置模型并暴露默认配置与 Schema。"""
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
plugin = plugin_module.NapCatAdapterPlugin()
default_config = plugin.get_default_config()
schema = plugin.get_webui_config_schema(plugin_id="maibot-team.napcat-adapter")
assert default_config["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
assert default_config["chat"]["ban_qq_bot"] is False
assert default_config["filters"]["ignore_self_message"] is True
assert schema["plugin_id"] == "maibot-team.napcat-adapter"
assert schema["sections"]["chat"]["fields"]["group_list"]["type"] == "array"
assert schema["sections"]["chat"]["fields"]["group_list_type"]["choices"] == ["whitelist", "blacklist"]
def test_napcat_plugin_normalizes_legacy_config_values() -> None:
"""NapCat 插件应兼容旧配置字段并输出规范化结果。"""
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
plugin = plugin_module.NapCatAdapterPlugin()
plugin.set_plugin_config(
{
"plugin": {"enabled": True, "config_version": constants_module.SUPPORTED_CONFIG_VERSION},
"connection": {
"access_token": "secret-token",
"heartbeat_sec": "45",
"ws_url": "ws://10.0.0.8:3012/onebot/v11/ws",
},
"chat": {
"ban_qq_bot": True,
"ban_user_id": ["42", 42, ""],
"group_list": [123, " 456 ", None, "123"],
"group_list_type": "whitelist",
"private_list": "invalid",
"private_list_type": "unexpected",
},
"filters": {"ignore_self_message": True},
}
)
config_data = plugin.get_plugin_config_data()
assert "connection" not in config_data
assert config_data["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
assert config_data["napcat_server"]["host"] == "10.0.0.8"
assert config_data["napcat_server"]["port"] == 3012
assert config_data["napcat_server"]["token"] == "secret-token"
assert config_data["napcat_server"]["heartbeat_interval"] == 45.0
assert config_data["chat"]["group_list"] == ["123", "456"]
assert config_data["chat"]["private_list"] == []
assert config_data["chat"]["private_list_type"] == constants_module.DEFAULT_CHAT_LIST_TYPE
assert plugin.config.napcat_server.build_ws_url() == "ws://10.0.0.8:3012"
@pytest.mark.asyncio
async def test_runtime_state_reports_via_gateway_capability() -> None:
"""NapCat 运行时状态应通过新的消息网关能力上报。"""
napcat_gateway_name, napcat_server_config_cls, _napcat_plugin_cls, runtime_state_cls = _load_napcat_sdk_symbols()
gateway_capability = _FakeGatewayCapability()
runtime_state_manager = runtime_state_cls(
gateway_capability=gateway_capability,
logger=logging.getLogger("test.napcat_adapter"),
gateway_name=napcat_gateway_name,
)
connected = await runtime_state_manager.report_connected(
"10001",
napcat_server_config_cls(connection_id="primary"),
)
await runtime_state_manager.report_disconnected()
assert connected is True
assert gateway_capability.calls[0]["gateway_name"] == napcat_gateway_name
assert gateway_capability.calls[0]["ready"] is True
assert gateway_capability.calls[0]["platform"] == "qq"
assert gateway_capability.calls[0]["account_id"] == "10001"
assert gateway_capability.calls[0]["scope"] == "primary"
assert gateway_capability.calls[1]["gateway_name"] == napcat_gateway_name
assert gateway_capability.calls[1]["ready"] is False
assert gateway_capability.calls[1]["platform"] == "qq"
@pytest.mark.asyncio
async def test_napcat_plugin_send_result_contains_message_id_echo_callback() -> None:
"""NapCat 插件发送成功后应显式返回消息 ID 回调数据。"""
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
plugin = napcat_plugin_cls()
class _FakeOutboundCodec:
"""用于测试的出站编码器替身。"""
@staticmethod
def build_outbound_action(message: Dict[str, Any], route: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
"""返回固定动作与参数。"""
del message
del route
return "send_msg", {"message": "hello"}
class _FakeTransport:
"""用于测试的传输层替身。"""
@staticmethod
async def call_action(action_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""返回带平台消息 ID 的成功响应。"""
del action_name
del params
return {
"status": "ok",
"data": {
"message_id": "platform-message-id",
},
}
plugin._require_runtime_bundle = lambda: SimpleNamespace( # type: ignore[method-assign]
outbound_codec=_FakeOutboundCodec(),
transport=_FakeTransport(),
)
result = await plugin.handle_napcat_gateway(
message={"message_id": "internal-message-id"},
route={},
)
assert result["success"] is True
assert result["external_message_id"] == "platform-message-id"
assert result["metadata"]["adapter_callbacks"] == [
{
"name": "message_id_echo",
"payload": {
"content": {
"type": "echo",
"echo": "internal-message-id",
"actual_id": "platform-message-id",
}
},
}
]
@pytest.mark.asyncio
async def test_inbound_codec_parses_forward_nodes_from_legacy_message_field() -> None:
"""入站编解码器应兼容旧版 ``sender + message`` 转发节点结构。"""
inbound_codec_cls = _load_napcat_inbound_codec_cls()
codec = inbound_codec_cls(
logger=logging.getLogger("test.napcat_adapter.forward_legacy"),
query_service=_FakeNapCatQueryService(
forward_payloads={
"forward-1": {
"messages": [
{
"sender": {"user_id": "10001", "nickname": "张三", "card": "群名片"},
"message_id": "node-1",
"message": [{"type": "text", "data": {"text": "第一条转发"}}],
}
]
}
}
),
)
segments, is_at = await codec.convert_segments(
{"message": [{"type": "forward", "data": {"id": "forward-1"}}]},
"",
)
assert is_at is False
assert len(segments) == 1
assert segments[0]["type"] == "forward"
assert segments[0]["data"][0]["user_id"] == "10001"
assert segments[0]["data"][0]["user_nickname"] == "张三"
assert segments[0]["data"][0]["user_cardname"] == "群名片"
assert segments[0]["data"][0]["content"] == [{"type": "text", "data": "第一条转发"}]
@pytest.mark.asyncio
async def test_inbound_codec_parses_nested_inline_forward_content() -> None:
"""入站编解码器应支持内联 ``content`` 形式的嵌套合并转发。"""
inbound_codec_cls = _load_napcat_inbound_codec_cls()
codec = inbound_codec_cls(
logger=logging.getLogger("test.napcat_adapter.forward_nested"),
query_service=_FakeNapCatQueryService(
forward_payloads={
"forward-outer": {
"messages": [
{
"sender": {"user_id": "10001", "nickname": "张三"},
"message_id": "node-outer",
"message": [
{
"type": "forward",
"data": {
"content": [
{
"sender": {"user_id": "10002", "nickname": "李四"},
"message_id": "node-inner",
"message": [{"type": "text", "data": {"text": "内层消息"}}],
}
]
},
}
],
}
]
}
}
),
)
segments, _ = await codec.convert_segments(
{"message": [{"type": "forward", "data": {"id": "forward-outer"}}]},
"",
)
assert len(segments) == 1
assert segments[0]["type"] == "forward"
outer_content = segments[0]["data"][0]["content"]
assert len(outer_content) == 1
assert outer_content[0]["type"] == "forward"
nested_nodes = outer_content[0]["data"]
assert nested_nodes[0]["user_id"] == "10002"
assert nested_nodes[0]["user_nickname"] == "李四"
assert nested_nodes[0]["content"] == [{"type": "text", "data": "内层消息"}]
@pytest.mark.asyncio
async def test_query_service_normalizes_forward_payload_list() -> None:
"""查询服务应兼容 ``get_forward_msg`` 直接返回节点列表。"""
query_service_cls = _load_napcat_query_service_cls()
query_service = query_service_cls(
action_service=_FakeNapCatActionService(
[
{
"sender": {"user_id": "10001", "nickname": "张三"},
"message_id": "node-1",
"message": [{"type": "text", "data": {"text": "列表返回"}}],
}
]
),
logger=logging.getLogger("test.napcat_adapter.query_service"),
)
forward_payload = await query_service.get_forward_message("forward-1")
assert forward_payload == {
"messages": [
{
"sender": {"user_id": "10001", "nickname": "张三"},
"message_id": "node-1",
"message": [{"type": "text", "data": {"text": "列表返回"}}],
}
]
}