883 lines
31 KiB
Python
883 lines
31 KiB
Python
"""NapCat 插件与新 SDK 对接测试。"""
|
|
|
|
from importlib import import_module, util
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import logging
|
|
import sys
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
PLUGINS_ROOT = PROJECT_ROOT / "plugins"
|
|
SDK_ROOT = PROJECT_ROOT / "packages" / "maibot-plugin-sdk"
|
|
NAPCAT_PLUGIN_DIR = PLUGINS_ROOT / "MaiBot-Napcat-Adapter"
|
|
NAPCAT_TEST_MODULE = "_test_napcat_adapter"
|
|
|
|
for import_path in (str(SDK_ROOT),):
|
|
if import_path not in sys.path:
|
|
sys.path.insert(0, import_path)
|
|
|
|
|
|
class _FakeGatewayCapability:
|
|
"""用于捕获消息网关状态上报的测试替身。"""
|
|
|
|
def __init__(self) -> None:
|
|
"""初始化测试替身。"""
|
|
|
|
self.calls: List[Dict[str, Any]] = []
|
|
|
|
async def update_state(
|
|
self,
|
|
gateway_name: str,
|
|
*,
|
|
ready: bool,
|
|
platform: str = "",
|
|
account_id: str = "",
|
|
scope: str = "",
|
|
metadata: Dict[str, Any] | None = None,
|
|
) -> bool:
|
|
"""记录一次状态上报请求。
|
|
|
|
Args:
|
|
gateway_name: 网关组件名称。
|
|
ready: 当前是否就绪。
|
|
platform: 平台名称。
|
|
account_id: 账号 ID。
|
|
scope: 路由作用域。
|
|
metadata: 附加元数据。
|
|
|
|
Returns:
|
|
bool: 始终返回 ``True``,模拟 Host 接受状态更新。
|
|
"""
|
|
|
|
self.calls.append(
|
|
{
|
|
"gateway_name": gateway_name,
|
|
"ready": ready,
|
|
"platform": platform,
|
|
"account_id": account_id,
|
|
"scope": scope,
|
|
"metadata": metadata or {},
|
|
}
|
|
)
|
|
return True
|
|
|
|
|
|
class _FakeNapCatQueryService:
|
|
"""用于驱动 NapCat 入站编解码测试的查询服务替身。"""
|
|
|
|
def __init__(
|
|
self,
|
|
forward_payloads: Dict[str, Any] | None = None,
|
|
group_member_payloads: Dict[tuple[str, str], Dict[str, Any] | None] | None = None,
|
|
stranger_payloads: Dict[str, Dict[str, Any] | None] | None = None,
|
|
) -> None:
|
|
"""初始化查询服务替身。
|
|
|
|
Args:
|
|
forward_payloads: 预置的合并转发响应映射。
|
|
group_member_payloads: 预置的群成员资料映射。
|
|
stranger_payloads: 预置的陌生人资料映射。
|
|
"""
|
|
self._forward_payloads = forward_payloads or {}
|
|
self._group_member_payloads = group_member_payloads or {}
|
|
self._stranger_payloads = stranger_payloads or {}
|
|
|
|
async def download_binary(self, url: str) -> bytes | None:
|
|
"""模拟下载远程二进制资源。
|
|
|
|
Args:
|
|
url: 资源地址。
|
|
|
|
Returns:
|
|
bytes | None: 测试中默认不返回二进制内容。
|
|
"""
|
|
del url
|
|
return None
|
|
|
|
async def get_message_detail(self, message_id: str) -> Dict[str, Any] | None:
|
|
"""模拟获取消息详情。
|
|
|
|
Args:
|
|
message_id: 消息 ID。
|
|
|
|
Returns:
|
|
Dict[str, Any] | None: 测试中默认不返回详情。
|
|
"""
|
|
del message_id
|
|
return None
|
|
|
|
async def get_forward_message(
|
|
self,
|
|
message_id: str | None = None,
|
|
forward_id: str | None = None,
|
|
) -> Any:
|
|
"""模拟获取合并转发消息详情。
|
|
|
|
Args:
|
|
message_id: 转发消息 ID。
|
|
forward_id: 兼容字段 ``id``。
|
|
|
|
Returns:
|
|
Any: 预置的合并转发消息详情。
|
|
"""
|
|
return self._forward_payloads.get(forward_id or message_id or "")
|
|
|
|
async def get_group_member_info(
|
|
self,
|
|
group_id: str,
|
|
user_id: str,
|
|
no_cache: bool = True,
|
|
) -> Dict[str, Any] | None:
|
|
"""模拟获取群成员资料。"""
|
|
del no_cache
|
|
return self._group_member_payloads.get((group_id, user_id))
|
|
|
|
async def get_stranger_info(self, user_id: str, no_cache: bool = False) -> Dict[str, Any] | None:
|
|
"""模拟获取 QQ 昵称资料。"""
|
|
del no_cache
|
|
return self._stranger_payloads.get(user_id)
|
|
|
|
async def get_record_detail(
|
|
self,
|
|
file_name: str | None = None,
|
|
file_id: str | None = None,
|
|
out_format: str = "wav",
|
|
) -> Dict[str, Any] | None:
|
|
"""模拟获取语音详情。
|
|
|
|
Args:
|
|
file_name: 文件名。
|
|
file_id: 文件 ID。
|
|
out_format: 输出格式。
|
|
|
|
Returns:
|
|
Dict[str, Any] | None: 测试中默认不返回语音详情。
|
|
"""
|
|
del file_name
|
|
del file_id
|
|
del out_format
|
|
return None
|
|
|
|
|
|
class _FakeNapCatActionService:
|
|
"""用于驱动 NapCat 查询服务测试的动作服务替身。"""
|
|
|
|
def __init__(self, response_data: Any) -> None:
|
|
"""初始化动作服务替身。
|
|
|
|
Args:
|
|
response_data: 预置的 ``safe_call_action_data`` 返回值。
|
|
"""
|
|
self._response_data = response_data
|
|
self.action_calls: List[Dict[str, Any]] = []
|
|
self.action_data_calls: List[Dict[str, Any]] = []
|
|
|
|
async def safe_call_action_data(self, action_name: str, params: Dict[str, Any]) -> Any:
|
|
"""模拟安全调用 OneBot 动作。
|
|
|
|
Args:
|
|
action_name: 动作名称。
|
|
params: 动作参数。
|
|
|
|
Returns:
|
|
Any: 预置返回值。
|
|
"""
|
|
self.action_data_calls.append({"action_name": action_name, "params": dict(params)})
|
|
return self._response_data
|
|
|
|
async def call_action(self, action_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""模拟调用 OneBot 动作并记录参数。"""
|
|
|
|
self.action_calls.append({"action_name": action_name, "params": dict(params)})
|
|
return {"status": "ok", "retcode": 0, "data": {}}
|
|
|
|
|
|
def _load_napcat_sdk_modules() -> Tuple[Any, Any, Any, Any]:
|
|
"""动态加载 NapCat 插件测试所需的模块。
|
|
|
|
Returns:
|
|
tuple[Any, Any, Any, Any]:
|
|
依次返回常量模块、配置模块、插件模块和运行时状态模块。
|
|
"""
|
|
|
|
if NAPCAT_TEST_MODULE not in sys.modules:
|
|
plugin_path = NAPCAT_PLUGIN_DIR / "plugin.py"
|
|
spec = util.spec_from_file_location(
|
|
NAPCAT_TEST_MODULE,
|
|
plugin_path,
|
|
submodule_search_locations=[str(NAPCAT_PLUGIN_DIR)],
|
|
)
|
|
if spec is None or spec.loader is None:
|
|
raise ImportError(f"无法为 NapCat 插件创建模块规格: {plugin_path}")
|
|
|
|
module = util.module_from_spec(spec)
|
|
sys.modules[NAPCAT_TEST_MODULE] = module
|
|
try:
|
|
spec.loader.exec_module(module)
|
|
except Exception:
|
|
sys.modules.pop(NAPCAT_TEST_MODULE, None)
|
|
raise
|
|
|
|
return (
|
|
import_module(f"{NAPCAT_TEST_MODULE}.constants"),
|
|
import_module(f"{NAPCAT_TEST_MODULE}.config"),
|
|
import_module(f"{NAPCAT_TEST_MODULE}.plugin"),
|
|
import_module(f"{NAPCAT_TEST_MODULE}.runtime_state"),
|
|
)
|
|
|
|
|
|
def _load_napcat_sdk_symbols() -> Tuple[Any, Any, Any, Any]:
|
|
"""动态加载 NapCat 插件测试所需的符号。
|
|
|
|
Returns:
|
|
tuple[Any, Any, Any, Any]:
|
|
依次返回网关名常量、配置类、插件类和运行时状态管理器类。
|
|
"""
|
|
|
|
constants_module, config_module, plugin_module, runtime_state_module = _load_napcat_sdk_modules()
|
|
return (
|
|
constants_module.NAPCAT_GATEWAY_NAME,
|
|
config_module.NapCatServerConfig,
|
|
plugin_module.NapCatAdapterPlugin,
|
|
runtime_state_module.NapCatRuntimeStateManager,
|
|
)
|
|
|
|
|
|
def _load_napcat_inbound_codec_cls() -> Any:
|
|
"""动态加载 NapCat 入站编解码器类。
|
|
|
|
Returns:
|
|
Any: ``NapCatInboundCodec`` 类对象。
|
|
"""
|
|
_load_napcat_sdk_modules()
|
|
codec_module = import_module(f"{NAPCAT_TEST_MODULE}.codecs.inbound.message_codec")
|
|
return codec_module.NapCatInboundCodec
|
|
|
|
|
|
def _load_napcat_query_service_cls() -> Any:
|
|
"""动态加载 NapCat 查询服务类。
|
|
|
|
Returns:
|
|
Any: ``NapCatQueryService`` 类对象。
|
|
"""
|
|
_load_napcat_sdk_modules()
|
|
query_service_module = import_module(f"{NAPCAT_TEST_MODULE}.services.query_service")
|
|
return query_service_module.NapCatQueryService
|
|
|
|
|
|
def test_napcat_plugin_collects_duplex_message_gateway() -> None:
|
|
"""NapCat 插件应声明新的双工消息网关组件。"""
|
|
|
|
napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
components = plugin.get_components()
|
|
gateway_components = [
|
|
component
|
|
for component in components
|
|
if component.get("type") == "MESSAGE_GATEWAY"
|
|
]
|
|
|
|
assert len(gateway_components) == 1
|
|
gateway_component = gateway_components[0]
|
|
assert gateway_component["name"] == napcat_gateway_name
|
|
assert gateway_component["metadata"]["route_type"] == "duplex"
|
|
assert gateway_component["metadata"]["platform"] == "qq"
|
|
assert gateway_component["metadata"]["protocol"] == "napcat"
|
|
|
|
|
|
def test_napcat_plugin_uses_sdk_config_model() -> None:
|
|
"""NapCat 插件应声明 SDK 配置模型并暴露默认配置与 Schema。"""
|
|
|
|
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
|
|
plugin = plugin_module.NapCatAdapterPlugin()
|
|
|
|
default_config = plugin.get_default_config()
|
|
schema = plugin.get_webui_config_schema(plugin_id="maibot-team.napcat-adapter")
|
|
|
|
assert default_config["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
|
|
assert default_config["chat"]["ban_qq_bot"] is False
|
|
assert default_config["filters"]["ignore_self_message"] is True
|
|
assert schema["plugin_id"] == "maibot-team.napcat-adapter"
|
|
assert schema["sections"]["chat"]["fields"]["group_list"]["type"] == "array"
|
|
assert schema["sections"]["chat"]["fields"]["group_list_type"]["choices"] == ["whitelist", "blacklist"]
|
|
|
|
|
|
def test_napcat_plugin_normalizes_legacy_config_values() -> None:
|
|
"""NapCat 插件应兼容旧配置字段并输出规范化结果。"""
|
|
|
|
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
|
|
plugin = plugin_module.NapCatAdapterPlugin()
|
|
|
|
plugin.set_plugin_config(
|
|
{
|
|
"plugin": {"enabled": True, "config_version": constants_module.SUPPORTED_CONFIG_VERSION},
|
|
"connection": {
|
|
"access_token": "secret-token",
|
|
"heartbeat_sec": "45",
|
|
"ws_url": "ws://10.0.0.8:3012/onebot/v11/ws",
|
|
},
|
|
"chat": {
|
|
"ban_qq_bot": True,
|
|
"ban_user_id": ["42", 42, ""],
|
|
"group_list": [123, " 456 ", None, "123"],
|
|
"group_list_type": "whitelist",
|
|
"private_list": "invalid",
|
|
"private_list_type": "unexpected",
|
|
},
|
|
"filters": {"ignore_self_message": True},
|
|
}
|
|
)
|
|
|
|
config_data = plugin.get_plugin_config_data()
|
|
|
|
assert "connection" not in config_data
|
|
assert config_data["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
|
|
assert config_data["napcat_server"]["host"] == "10.0.0.8"
|
|
assert config_data["napcat_server"]["port"] == 3012
|
|
assert config_data["napcat_server"]["token"] == "secret-token"
|
|
assert config_data["napcat_server"]["heartbeat_interval"] == 45.0
|
|
assert config_data["chat"]["group_list"] == ["123", "456"]
|
|
assert config_data["chat"]["private_list"] == []
|
|
assert config_data["chat"]["private_list_type"] == constants_module.DEFAULT_CHAT_LIST_TYPE
|
|
assert plugin.config.napcat_server.build_ws_url() == "ws://10.0.0.8:3012"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runtime_state_reports_via_gateway_capability() -> None:
|
|
"""NapCat 运行时状态应通过新的消息网关能力上报。"""
|
|
|
|
napcat_gateway_name, napcat_server_config_cls, _napcat_plugin_cls, runtime_state_cls = _load_napcat_sdk_symbols()
|
|
gateway_capability = _FakeGatewayCapability()
|
|
runtime_state_manager = runtime_state_cls(
|
|
gateway_capability=gateway_capability,
|
|
logger=logging.getLogger("test.napcat_adapter"),
|
|
gateway_name=napcat_gateway_name,
|
|
)
|
|
|
|
connected = await runtime_state_manager.report_connected(
|
|
"10001",
|
|
napcat_server_config_cls(connection_id="primary"),
|
|
)
|
|
await runtime_state_manager.report_disconnected()
|
|
|
|
assert connected is True
|
|
assert gateway_capability.calls[0]["gateway_name"] == napcat_gateway_name
|
|
assert gateway_capability.calls[0]["ready"] is True
|
|
assert gateway_capability.calls[0]["platform"] == "qq"
|
|
assert gateway_capability.calls[0]["account_id"] == "10001"
|
|
assert gateway_capability.calls[0]["scope"] == "primary"
|
|
assert gateway_capability.calls[1]["gateway_name"] == napcat_gateway_name
|
|
assert gateway_capability.calls[1]["ready"] is False
|
|
assert gateway_capability.calls[1]["platform"] == "qq"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_napcat_plugin_send_result_contains_message_id_echo_callback() -> None:
|
|
"""NapCat 插件发送成功后应显式返回消息 ID 回调数据。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
|
|
class _FakeOutboundCodec:
|
|
"""用于测试的出站编码器替身。"""
|
|
|
|
@staticmethod
|
|
def build_outbound_action(message: Dict[str, Any], route: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
|
|
"""返回固定动作与参数。"""
|
|
|
|
del message
|
|
del route
|
|
return "send_msg", {"message": "hello"}
|
|
|
|
class _FakeTransport:
|
|
"""用于测试的传输层替身。"""
|
|
|
|
@staticmethod
|
|
async def call_action(action_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""返回带平台消息 ID 的成功响应。"""
|
|
|
|
del action_name
|
|
del params
|
|
return {
|
|
"status": "ok",
|
|
"data": {
|
|
"message_id": "platform-message-id",
|
|
},
|
|
}
|
|
|
|
plugin._require_runtime_bundle = lambda: SimpleNamespace( # type: ignore[method-assign]
|
|
outbound_codec=_FakeOutboundCodec(),
|
|
transport=_FakeTransport(),
|
|
)
|
|
|
|
result = await plugin.handle_napcat_gateway(
|
|
message={"message_id": "internal-message-id"},
|
|
route={},
|
|
)
|
|
|
|
assert result["success"] is True
|
|
assert result["external_message_id"] == "platform-message-id"
|
|
assert result["metadata"]["adapter_callbacks"] == [
|
|
{
|
|
"name": "message_id_echo",
|
|
"payload": {
|
|
"content": {
|
|
"type": "echo",
|
|
"echo": "internal-message-id",
|
|
"actual_id": "platform-message-id",
|
|
}
|
|
},
|
|
}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inbound_codec_parses_forward_nodes_from_legacy_message_field() -> None:
|
|
"""入站编解码器应兼容旧版 ``sender + message`` 转发节点结构。"""
|
|
|
|
inbound_codec_cls = _load_napcat_inbound_codec_cls()
|
|
codec = inbound_codec_cls(
|
|
logger=logging.getLogger("test.napcat_adapter.forward_legacy"),
|
|
query_service=_FakeNapCatQueryService(
|
|
forward_payloads={
|
|
"forward-1": {
|
|
"messages": [
|
|
{
|
|
"sender": {"user_id": "10001", "nickname": "张三", "card": "群名片"},
|
|
"message_id": "node-1",
|
|
"message": [{"type": "text", "data": {"text": "第一条转发"}}],
|
|
}
|
|
]
|
|
}
|
|
}
|
|
),
|
|
)
|
|
|
|
segments, is_at = await codec.convert_segments(
|
|
{"message": [{"type": "forward", "data": {"id": "forward-1"}}]},
|
|
"",
|
|
)
|
|
|
|
assert is_at is False
|
|
assert len(segments) == 1
|
|
assert segments[0]["type"] == "forward"
|
|
assert segments[0]["data"][0]["user_id"] == "10001"
|
|
assert segments[0]["data"][0]["user_nickname"] == "张三"
|
|
assert segments[0]["data"][0]["user_cardname"] == "群名片"
|
|
assert segments[0]["data"][0]["content"] == [{"type": "text", "data": "第一条转发"}]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inbound_codec_parses_nested_inline_forward_content() -> None:
|
|
"""入站编解码器应支持内联 ``content`` 形式的嵌套合并转发。"""
|
|
|
|
inbound_codec_cls = _load_napcat_inbound_codec_cls()
|
|
codec = inbound_codec_cls(
|
|
logger=logging.getLogger("test.napcat_adapter.forward_nested"),
|
|
query_service=_FakeNapCatQueryService(
|
|
forward_payloads={
|
|
"forward-outer": {
|
|
"messages": [
|
|
{
|
|
"sender": {"user_id": "10001", "nickname": "张三"},
|
|
"message_id": "node-outer",
|
|
"message": [
|
|
{
|
|
"type": "forward",
|
|
"data": {
|
|
"content": [
|
|
{
|
|
"sender": {"user_id": "10002", "nickname": "李四"},
|
|
"message_id": "node-inner",
|
|
"message": [{"type": "text", "data": {"text": "内层消息"}}],
|
|
}
|
|
]
|
|
},
|
|
}
|
|
],
|
|
}
|
|
]
|
|
}
|
|
}
|
|
),
|
|
)
|
|
|
|
segments, _ = await codec.convert_segments(
|
|
{"message": [{"type": "forward", "data": {"id": "forward-outer"}}]},
|
|
"",
|
|
)
|
|
|
|
assert len(segments) == 1
|
|
assert segments[0]["type"] == "forward"
|
|
outer_content = segments[0]["data"][0]["content"]
|
|
assert len(outer_content) == 1
|
|
assert outer_content[0]["type"] == "forward"
|
|
nested_nodes = outer_content[0]["data"]
|
|
assert nested_nodes[0]["user_id"] == "10002"
|
|
assert nested_nodes[0]["user_nickname"] == "李四"
|
|
assert nested_nodes[0]["content"] == [{"type": "text", "data": "内层消息"}]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inbound_codec_resolves_at_to_group_cardname() -> None:
|
|
"""入站编解码器应优先将 ``at`` 解析为群昵称。"""
|
|
|
|
inbound_codec_cls = _load_napcat_inbound_codec_cls()
|
|
codec = inbound_codec_cls(
|
|
logger=logging.getLogger("test.napcat_adapter.at_cardname"),
|
|
query_service=_FakeNapCatQueryService(
|
|
group_member_payloads={
|
|
("12345", "1206069534"): {
|
|
"nickname": "QQ昵称",
|
|
"card": "群昵称",
|
|
}
|
|
}
|
|
),
|
|
)
|
|
|
|
message_dict = await codec.build_message_dict(
|
|
payload={
|
|
"message_type": "group",
|
|
"group_id": "12345",
|
|
"message_id": "msg-1",
|
|
"message": [{"type": "at", "data": {"qq": "1206069534"}}],
|
|
"sender": {"user_id": "10001", "nickname": "发送者"},
|
|
"time": 1710000000,
|
|
},
|
|
self_id="20001",
|
|
sender_user_id="10001",
|
|
sender={"user_id": "10001", "nickname": "发送者"},
|
|
)
|
|
|
|
assert message_dict["processed_plain_text"] == "@群昵称"
|
|
assert message_dict["display_message"] == "@群昵称"
|
|
assert message_dict["raw_message"] == [
|
|
{
|
|
"type": "at",
|
|
"data": {
|
|
"target_user_id": "1206069534",
|
|
"target_user_nickname": "QQ昵称",
|
|
"target_user_cardname": "群昵称",
|
|
},
|
|
}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inbound_codec_falls_back_to_qq_nickname_when_group_cardname_is_empty() -> None:
|
|
"""入站编解码器在群昵称为空时应回退到 QQ 昵称。"""
|
|
|
|
inbound_codec_cls = _load_napcat_inbound_codec_cls()
|
|
codec = inbound_codec_cls(
|
|
logger=logging.getLogger("test.napcat_adapter.at_nickname"),
|
|
query_service=_FakeNapCatQueryService(
|
|
group_member_payloads={
|
|
("12345", "1206069534"): {
|
|
"nickname": "QQ昵称",
|
|
"card": "",
|
|
}
|
|
}
|
|
),
|
|
)
|
|
|
|
message_dict = await codec.build_message_dict(
|
|
payload={
|
|
"message_type": "group",
|
|
"group_id": "12345",
|
|
"message_id": "msg-2",
|
|
"message": [{"type": "at", "data": {"qq": "1206069534"}}],
|
|
"sender": {"user_id": "10001", "nickname": "发送者"},
|
|
"time": 1710000000,
|
|
},
|
|
self_id="20001",
|
|
sender_user_id="10001",
|
|
sender={"user_id": "10001", "nickname": "发送者"},
|
|
)
|
|
|
|
assert message_dict["processed_plain_text"] == "@QQ昵称"
|
|
assert message_dict["display_message"] == "@QQ昵称"
|
|
assert message_dict["raw_message"] == [
|
|
{
|
|
"type": "at",
|
|
"data": {
|
|
"target_user_id": "1206069534",
|
|
"target_user_nickname": "QQ昵称",
|
|
"target_user_cardname": None,
|
|
},
|
|
}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inbound_codec_falls_back_to_stranger_nickname_when_group_profile_is_missing() -> None:
|
|
"""入站编解码器在群资料缺失时应继续回退到 QQ 昵称。"""
|
|
|
|
inbound_codec_cls = _load_napcat_inbound_codec_cls()
|
|
codec = inbound_codec_cls(
|
|
logger=logging.getLogger("test.napcat_adapter.at_stranger_nickname"),
|
|
query_service=_FakeNapCatQueryService(
|
|
group_member_payloads={("12345", "1206069534"): None},
|
|
stranger_payloads={"1206069534": {"nickname": "QQ昵称"}},
|
|
),
|
|
)
|
|
|
|
message_dict = await codec.build_message_dict(
|
|
payload={
|
|
"message_type": "group",
|
|
"group_id": "12345",
|
|
"message_id": "msg-3",
|
|
"message": [{"type": "at", "data": {"qq": "1206069534"}}],
|
|
"sender": {"user_id": "10001", "nickname": "发送者"},
|
|
"time": 1710000000,
|
|
},
|
|
self_id="20001",
|
|
sender_user_id="10001",
|
|
sender={"user_id": "10001", "nickname": "发送者"},
|
|
)
|
|
|
|
assert message_dict["processed_plain_text"] == "@QQ昵称"
|
|
assert message_dict["display_message"] == "@QQ昵称"
|
|
assert message_dict["raw_message"] == [
|
|
{
|
|
"type": "at",
|
|
"data": {
|
|
"target_user_id": "1206069534",
|
|
"target_user_nickname": "QQ昵称",
|
|
"target_user_cardname": None,
|
|
},
|
|
}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_query_service_normalizes_forward_payload_list() -> None:
|
|
"""查询服务应兼容 ``get_forward_msg`` 直接返回节点列表。"""
|
|
|
|
query_service_cls = _load_napcat_query_service_cls()
|
|
query_service = query_service_cls(
|
|
action_service=_FakeNapCatActionService(
|
|
[
|
|
{
|
|
"sender": {"user_id": "10001", "nickname": "张三"},
|
|
"message_id": "node-1",
|
|
"message": [{"type": "text", "data": {"text": "列表返回"}}],
|
|
}
|
|
]
|
|
),
|
|
logger=logging.getLogger("test.napcat_adapter.query_service"),
|
|
)
|
|
|
|
forward_payload = await query_service.get_forward_message("forward-1")
|
|
|
|
assert forward_payload == {
|
|
"messages": [
|
|
{
|
|
"sender": {"user_id": "10001", "nickname": "张三"},
|
|
"message_id": "node-1",
|
|
"message": [{"type": "text", "data": {"text": "列表返回"}}],
|
|
}
|
|
]
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_query_service_supports_official_no_cache_for_get_stranger_info() -> None:
|
|
"""查询服务应按官方字段下发 ``no_cache``。"""
|
|
|
|
action_service = _FakeNapCatActionService({"nickname": "测试用户"})
|
|
query_service_cls = _load_napcat_query_service_cls()
|
|
query_service = query_service_cls(
|
|
action_service=action_service,
|
|
logger=logging.getLogger("test.napcat_adapter.query_service.stranger"),
|
|
)
|
|
|
|
payload = await query_service.get_stranger_info("10001", no_cache=True)
|
|
|
|
assert payload == {"nickname": "测试用户"}
|
|
assert action_service.action_data_calls[-1] == {
|
|
"action_name": "get_stranger_info",
|
|
"params": {"user_id": "10001", "no_cache": True},
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_query_service_supports_official_forward_id_alias() -> None:
|
|
"""查询服务应兼容官方 ``id`` 字段调用 ``get_forward_msg``。"""
|
|
|
|
action_service = _FakeNapCatActionService({"messages": []})
|
|
query_service_cls = _load_napcat_query_service_cls()
|
|
query_service = query_service_cls(
|
|
action_service=action_service,
|
|
logger=logging.getLogger("test.napcat_adapter.query_service.forward_alias"),
|
|
)
|
|
|
|
payload = await query_service.get_forward_message(forward_id="forward-alias")
|
|
|
|
assert payload == {"messages": []}
|
|
assert action_service.action_data_calls[-1] == {
|
|
"action_name": "get_forward_msg",
|
|
"params": {"id": "forward-alias"},
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_query_service_supports_custom_out_format_for_get_record() -> None:
|
|
"""查询服务应按官方字段下发自定义 ``out_format``。"""
|
|
|
|
action_service = _FakeNapCatActionService({"file": "voice.mp3"})
|
|
query_service_cls = _load_napcat_query_service_cls()
|
|
query_service = query_service_cls(
|
|
action_service=action_service,
|
|
logger=logging.getLogger("test.napcat_adapter.query_service.record"),
|
|
)
|
|
|
|
payload = await query_service.get_record_detail(file_id="record-1", out_format="mp3")
|
|
|
|
assert payload == {"file": "voice.mp3"}
|
|
assert action_service.action_data_calls[-1] == {
|
|
"action_name": "get_record",
|
|
"params": {"file_id": "record-1", "out_format": "mp3"},
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_query_service_supports_target_id_for_send_poke() -> None:
|
|
"""查询服务应按官方字段下发 ``target_id``。"""
|
|
|
|
action_service = _FakeNapCatActionService(None)
|
|
query_service_cls = _load_napcat_query_service_cls()
|
|
query_service = query_service_cls(
|
|
action_service=action_service,
|
|
logger=logging.getLogger("test.napcat_adapter.query_service.poke"),
|
|
)
|
|
|
|
response = await query_service.send_poke(user_id=10001, group_id=20002, target_id=30003)
|
|
|
|
assert response["status"] == "ok"
|
|
assert action_service.action_calls[-1] == {
|
|
"action_name": "send_poke",
|
|
"params": {"user_id": 10001, "group_id": 20002, "target_id": 30003},
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_api_send_poke_supports_official_fields_and_legacy_alias() -> None:
|
|
"""公开 API 应同时兼容官方字段和旧版 ``qq_id`` 别名。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
captured: List[Dict[str, Any]] = []
|
|
|
|
class _SpyQueryService:
|
|
async def send_poke(
|
|
self,
|
|
user_id: int,
|
|
group_id: int | None = None,
|
|
target_id: int | None = None,
|
|
) -> Dict[str, Any]:
|
|
captured.append(
|
|
{
|
|
"user_id": user_id,
|
|
"group_id": group_id,
|
|
"target_id": target_id,
|
|
}
|
|
)
|
|
return {"status": "ok", "data": {}}
|
|
|
|
plugin._query_service = _SpyQueryService()
|
|
plugin._ensure_runtime_components = lambda: None # type: ignore[method-assign]
|
|
|
|
await plugin.api_send_poke(user_id="10001", group_id="20002", target_id="30003")
|
|
await plugin.api_send_poke(qq_id="40004")
|
|
|
|
assert captured == [
|
|
{"user_id": 10001, "group_id": 20002, "target_id": 30003},
|
|
{"user_id": 40004, "group_id": None, "target_id": None},
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_api_get_forward_msg_and_get_record_support_official_fields() -> None:
|
|
"""公开 API 应接受官方 ``id`` 和 ``out_format`` 等字段。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
captured: Dict[str, Dict[str, Any]] = {}
|
|
|
|
class _SpyQueryService:
|
|
async def get_forward_message(
|
|
self,
|
|
message_id: str | None = None,
|
|
forward_id: str | None = None,
|
|
) -> Dict[str, Any]:
|
|
captured["forward"] = {"message_id": message_id, "forward_id": forward_id}
|
|
return {"messages": []}
|
|
|
|
async def get_record_detail(
|
|
self,
|
|
file_name: str | None = None,
|
|
file_id: str | None = None,
|
|
out_format: str = "wav",
|
|
) -> Dict[str, Any]:
|
|
captured["record"] = {
|
|
"file_name": file_name,
|
|
"file_id": file_id,
|
|
"out_format": out_format,
|
|
}
|
|
return {"file_id": file_id or "record-1"}
|
|
|
|
plugin._query_service = _SpyQueryService()
|
|
plugin._ensure_runtime_components = lambda: None # type: ignore[method-assign]
|
|
|
|
forward_payload = await plugin.api_get_forward_msg(id="forward-1")
|
|
record_payload = await plugin.api_get_record(file_id="record-1", out_format="mp3")
|
|
|
|
assert forward_payload == {"messages": []}
|
|
assert record_payload == {"file_id": "record-1"}
|
|
assert captured["forward"] == {"message_id": None, "forward_id": "forward-1"}
|
|
assert captured["record"] == {
|
|
"file_name": None,
|
|
"file_id": "record-1",
|
|
"out_format": "mp3",
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_api_send_poke_rejects_conflicting_alias_values() -> None:
|
|
"""公开 ``send_poke`` API 应拒绝互相冲突的别名值。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
plugin._ensure_runtime_components = lambda: None # type: ignore[method-assign]
|
|
|
|
with pytest.raises(ValueError, match="user_id 与 qq_id 不能同时传递不同的值"):
|
|
await plugin.api_send_poke(user_id="10001", qq_id="20002")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_api_get_forward_msg_rejects_conflicting_fields() -> None:
|
|
"""公开 ``get_forward_msg`` API 应拒绝冲突的双字段调用。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
plugin._ensure_runtime_components = lambda: None # type: ignore[method-assign]
|
|
|
|
with pytest.raises(ValueError, match="message_id 与 id 不能同时传递不同的值"):
|
|
await plugin.api_get_forward_msg(message_id="forward-a", id="forward-b")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_public_api_get_record_requires_file_or_file_id() -> None:
|
|
"""公开 ``get_record`` API 至少需要一个官方定位字段。"""
|
|
|
|
_napcat_gateway_name, _napcat_server_config, napcat_plugin_cls, _runtime_state_cls = _load_napcat_sdk_symbols()
|
|
plugin = napcat_plugin_cls()
|
|
plugin._ensure_runtime_components = lambda: None # type: ignore[method-assign]
|
|
|
|
with pytest.raises(ValueError, match="file 或 file_id 至少提供一个"):
|
|
await plugin.api_get_record()
|