merge: 同步 upstream/r-dev 并解决冲突

This commit is contained in:
DawnARC
2026-04-03 19:56:45 +08:00
186 changed files with 14212 additions and 6705 deletions

View File

@@ -0,0 +1,194 @@
"""HTML 浏览器渲染服务测试。"""
from pathlib import Path
from typing import Any, Dict, List
import pytest
from src.config.official_configs import PluginRuntimeRenderConfig
from src.services import html_render_service as html_render_service_module
from src.services.html_render_service import HTMLRenderService, ManagedBrowserRecord
class _FakeChromium:
"""用于模拟 Playwright Chromium 启动器的测试桩。"""
def __init__(self, effects: List[Any]) -> None:
"""初始化 Chromium 启动测试桩。
Args:
effects: 每次调用 ``launch`` 时依次返回或抛出的结果。
"""
self._effects: List[Any] = list(effects)
self.calls: List[Dict[str, Any]] = []
async def launch(self, **kwargs: Any) -> Any:
"""模拟 Playwright Chromium 的启动过程。
Args:
**kwargs: 浏览器启动参数。
Returns:
Any: 预设的浏览器对象。
Raises:
Exception: 当预设结果为异常对象时抛出。
"""
self.calls.append(dict(kwargs))
effect = self._effects.pop(0)
if isinstance(effect, Exception):
raise effect
return effect
class _FakePlaywright:
"""用于模拟 Playwright 根对象的测试桩。"""
def __init__(self, chromium: _FakeChromium) -> None:
"""初始化 Playwright 测试桩。
Args:
chromium: Chromium 启动器测试桩。
"""
self.chromium = chromium
def _build_render_config(**kwargs: Any) -> PluginRuntimeRenderConfig:
"""构造用于测试的浏览器渲染配置。
Args:
**kwargs: 需要覆盖的配置字段。
Returns:
PluginRuntimeRenderConfig: 测试使用的配置对象。
"""
payload: Dict[str, Any] = {
"auto_download_chromium": True,
"browser_install_root": "data/test-playwright-browsers",
}
payload.update(kwargs)
return PluginRuntimeRenderConfig(**payload)
@pytest.mark.asyncio
async def test_launch_browser_auto_downloads_chromium_when_missing(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""未检测到可用浏览器时,应自动下载 Chromium 并记录状态。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
fake_browser = object()
fake_chromium = _FakeChromium(
[
RuntimeError("browserType.launch: Executable doesn't exist at /tmp/chromium"),
fake_browser,
]
)
install_calls: List[str] = []
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "")
async def fake_install(_config: PluginRuntimeRenderConfig) -> None:
"""模拟 Chromium 自动下载。
Args:
_config: 当前浏览器渲染配置。
"""
install_calls.append(_config.browser_install_root)
browsers_path = service._get_managed_browsers_path(_config)
(browsers_path / "chromium-1234").mkdir(parents=True, exist_ok=True)
monkeypatch.setattr(service, "_install_chromium_browser", fake_install)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert install_calls == ["data/test-playwright-browsers"]
assert len(fake_chromium.calls) == 2
browser_record = service._load_managed_browser_record()
assert browser_record is not None
assert browser_record.install_source == "auto_download"
assert browser_record.browser_name == "chromium"
@pytest.mark.asyncio
async def test_launch_browser_reuses_existing_managed_browser(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""已存在 Playwright 托管浏览器时,不应重复下载。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
browsers_path = service._get_managed_browsers_path(config)
(browsers_path / "chrome-headless-shell-1234").mkdir(parents=True, exist_ok=True)
fake_browser = object()
fake_chromium = _FakeChromium([fake_browser])
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "")
async def fail_install(_config: PluginRuntimeRenderConfig) -> None:
"""若被错误调用则立即失败。
Args:
_config: 当前浏览器渲染配置。
Raises:
AssertionError: 表示本测试不期望进入下载逻辑。
"""
raise AssertionError("不应触发自动下载")
monkeypatch.setattr(service, "_install_chromium_browser", fail_install)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert len(fake_chromium.calls) == 1
browser_record = service._load_managed_browser_record()
assert browser_record is not None
assert browser_record.install_source == "existing_cache"
assert browser_record.browsers_path == str(browsers_path)
@pytest.mark.asyncio
async def test_launch_browser_prefers_local_executable(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""探测到本机浏览器时,应优先使用可执行文件路径启动。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
fake_browser = object()
fake_chromium = _FakeChromium([fake_browser])
executable_path = "/usr/bin/google-chrome"
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: executable_path)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert len(fake_chromium.calls) == 1
assert fake_chromium.calls[0]["executable_path"] == executable_path
assert service._load_managed_browser_record() is None
def test_managed_browser_record_roundtrip() -> None:
"""托管浏览器记录应支持序列化与反序列化。"""
record = ManagedBrowserRecord(
browser_name="chromium",
browsers_path="/tmp/playwright-browsers",
install_source="auto_download",
playwright_version="1.58.0",
recorded_at="2026-04-03T10:00:00+00:00",
last_verified_at="2026-04-03T10:00:01+00:00",
)
restored_record = ManagedBrowserRecord.from_dict(record.to_dict())
assert restored_record == record

View File

@@ -1,9 +1,9 @@
"""NapCat 插件与新 SDK 对接测试。"""
from importlib import import_module, util
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple
import importlib
import logging
import sys
@@ -12,8 +12,10 @@ import pytest
PROJECT_ROOT = Path(__file__).resolve().parents[1]
PLUGINS_ROOT = PROJECT_ROOT / "plugins"
SDK_ROOT = PROJECT_ROOT / "packages" / "maibot-plugin-sdk"
NAPCAT_PLUGIN_DIR = PLUGINS_ROOT / "MaiBot-Napcat-Adapter"
NAPCAT_TEST_MODULE = "_test_napcat_adapter"
for import_path in (str(PLUGINS_ROOT), str(SDK_ROOT)):
for import_path in (str(SDK_ROOT),):
if import_path not in sys.path:
sys.path.insert(0, import_path)
@@ -63,7 +65,41 @@ class _FakeGatewayCapability:
return True
def _load_napcat_sdk_symbols() -> tuple[Any, Any, Any, Any]:
def _load_napcat_sdk_modules() -> Tuple[Any, Any, Any, Any]:
"""动态加载 NapCat 插件测试所需的模块。
Returns:
tuple[Any, Any, Any, Any]:
依次返回常量模块、配置模块、插件模块和运行时状态模块。
"""
if NAPCAT_TEST_MODULE not in sys.modules:
plugin_path = NAPCAT_PLUGIN_DIR / "plugin.py"
spec = util.spec_from_file_location(
NAPCAT_TEST_MODULE,
plugin_path,
submodule_search_locations=[str(NAPCAT_PLUGIN_DIR)],
)
if spec is None or spec.loader is None:
raise ImportError(f"无法为 NapCat 插件创建模块规格: {plugin_path}")
module = util.module_from_spec(spec)
sys.modules[NAPCAT_TEST_MODULE] = module
try:
spec.loader.exec_module(module)
except Exception:
sys.modules.pop(NAPCAT_TEST_MODULE, None)
raise
return (
import_module(f"{NAPCAT_TEST_MODULE}.constants"),
import_module(f"{NAPCAT_TEST_MODULE}.config"),
import_module(f"{NAPCAT_TEST_MODULE}.plugin"),
import_module(f"{NAPCAT_TEST_MODULE}.runtime_state"),
)
def _load_napcat_sdk_symbols() -> Tuple[Any, Any, Any, Any]:
"""动态加载 NapCat 插件测试所需的符号。
Returns:
@@ -71,10 +107,7 @@ def _load_napcat_sdk_symbols() -> tuple[Any, Any, Any, Any]:
依次返回网关名常量、配置类、插件类和运行时状态管理器类。
"""
constants_module = importlib.import_module("napcat_adapter.constants")
config_module = importlib.import_module("napcat_adapter.config")
plugin_module = importlib.import_module("napcat_adapter.plugin")
runtime_state_module = importlib.import_module("napcat_adapter.runtime_state")
constants_module, config_module, plugin_module, runtime_state_module = _load_napcat_sdk_modules()
return (
constants_module.NAPCAT_GATEWAY_NAME,
config_module.NapCatServerConfig,
@@ -103,6 +136,63 @@ def test_napcat_plugin_collects_duplex_message_gateway() -> None:
assert gateway_component["metadata"]["protocol"] == "napcat"
def test_napcat_plugin_uses_sdk_config_model() -> None:
"""NapCat 插件应声明 SDK 配置模型并暴露默认配置与 Schema。"""
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
plugin = plugin_module.NapCatAdapterPlugin()
default_config = plugin.get_default_config()
schema = plugin.get_webui_config_schema(plugin_id="maibot-team.napcat-adapter")
assert default_config["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
assert default_config["chat"]["ban_qq_bot"] is False
assert default_config["filters"]["ignore_self_message"] is True
assert schema["plugin_id"] == "maibot-team.napcat-adapter"
assert schema["sections"]["chat"]["fields"]["group_list"]["type"] == "array"
assert schema["sections"]["chat"]["fields"]["group_list_type"]["choices"] == ["whitelist", "blacklist"]
def test_napcat_plugin_normalizes_legacy_config_values() -> None:
"""NapCat 插件应兼容旧配置字段并输出规范化结果。"""
constants_module, _config_module, plugin_module, _runtime_state_module = _load_napcat_sdk_modules()
plugin = plugin_module.NapCatAdapterPlugin()
plugin.set_plugin_config(
{
"plugin": {"enabled": True, "config_version": ""},
"connection": {
"access_token": "secret-token",
"heartbeat_sec": "45",
"ws_url": "ws://10.0.0.8:3012/onebot/v11/ws",
},
"chat": {
"ban_qq_bot": True,
"ban_user_id": ["42", 42, ""],
"group_list": [123, " 456 ", None, "123"],
"group_list_type": "whitelist",
"private_list": "invalid",
"private_list_type": "unexpected",
},
"filters": {"ignore_self_message": True},
}
)
config_data = plugin.get_plugin_config_data()
assert "connection" not in config_data
assert config_data["plugin"]["config_version"] == constants_module.SUPPORTED_CONFIG_VERSION
assert config_data["napcat_server"]["host"] == "10.0.0.8"
assert config_data["napcat_server"]["port"] == 3012
assert config_data["napcat_server"]["token"] == "secret-token"
assert config_data["napcat_server"]["heartbeat_interval"] == 45.0
assert config_data["chat"]["group_list"] == ["123", "456"]
assert config_data["chat"]["private_list"] == []
assert config_data["chat"]["private_list_type"] == constants_module.DEFAULT_CHAT_LIST_TYPE
assert plugin.config.napcat_server.build_ws_url() == "ws://10.0.0.8:3012"
@pytest.mark.asyncio
async def test_runtime_state_reports_via_gateway_capability() -> None:
"""NapCat 运行时状态应通过新的消息网关能力上报。"""

View File

@@ -0,0 +1,436 @@
"""插件配置运行时测试。"""
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, Mapping, Optional, Tuple, cast
import tomllib
import pytest
from src.plugin_runtime.component_query import component_query_service
from src.plugin_runtime.protocol.envelope import (
Envelope,
InspectPluginConfigPayload,
MessageType,
RegisterPluginPayload,
ValidatePluginConfigPayload,
)
from src.plugin_runtime.runner.runner_main import PluginRunner
from src.webui.routers.plugin.config_routes import get_plugin_config, get_plugin_config_schema, update_plugin_config
from src.webui.routers.plugin.schemas import UpdatePluginConfigRequest
class _DemoConfigPlugin:
"""用于测试 Runner 配置归一化流程的伪插件。"""
def __init__(self) -> None:
"""初始化测试插件状态。"""
self.received_config: Dict[str, Any] = {}
def normalize_plugin_config(self, config_data: Optional[Mapping[str, Any]]) -> Tuple[Dict[str, Any], bool]:
"""补齐测试插件的默认配置。
Args:
config_data: 原始配置数据。
Returns:
Tuple[Dict[str, Any], bool]: 补齐后的配置,以及是否发生变更。
"""
current_config = dict(config_data or {})
plugin_section = dict(current_config.get("plugin", {}))
changed = "retry_count" not in plugin_section
plugin_section.setdefault("enabled", True)
plugin_section.setdefault("retry_count", 3)
return {"plugin": plugin_section}, changed
def set_plugin_config(self, config: Dict[str, Any]) -> None:
"""记录 Runner 注入的配置内容。
Args:
config: 当前最新配置。
"""
self.received_config = config
def get_default_config(self) -> Dict[str, Any]:
"""返回测试插件的默认配置。
Returns:
Dict[str, Any]: 默认配置字典。
"""
return {"plugin": {"enabled": True, "retry_count": 3}}
def get_webui_config_schema(
self,
*,
plugin_id: str = "",
plugin_name: str = "",
plugin_version: str = "",
plugin_description: str = "",
plugin_author: str = "",
) -> Dict[str, Any]:
"""返回测试插件的 WebUI 配置 Schema。
Args:
plugin_id: 插件 ID。
plugin_name: 插件名称。
plugin_version: 插件版本。
plugin_description: 插件描述。
plugin_author: 插件作者。
Returns:
Dict[str, Any]: 测试配置 Schema。
"""
del plugin_name, plugin_description, plugin_author
return {
"plugin_id": plugin_id,
"plugin_info": {
"name": "Demo",
"version": plugin_version,
"description": "",
"author": "",
},
"sections": {
"plugin": {
"fields": {
"enabled": {
"type": "boolean",
"label": "启用",
"default": True,
"ui_type": "switch",
}
}
}
},
"layout": {"type": "auto", "tabs": []},
}
class _StrictConfigPlugin:
"""用于测试配置校验错误的伪插件。"""
def normalize_plugin_config(self, config_data: Optional[Mapping[str, Any]]) -> Tuple[Dict[str, Any], bool]:
"""校验重试次数不能为负数。
Args:
config_data: 原始配置数据。
Returns:
Tuple[Dict[str, Any], bool]: 规范化配置结果。
Raises:
ValueError: 当重试次数为负数时抛出。
"""
current_config = dict(config_data or {})
plugin_section = dict(current_config.get("plugin", {}))
retry_count = int(plugin_section.get("retry_count", 0))
if retry_count < 0:
raise ValueError("重试次数不能小于 0")
plugin_section.setdefault("enabled", True)
return {"plugin": plugin_section}, False
def set_plugin_config(self, config: Dict[str, Any]) -> None:
"""兼容 Runner 配置注入接口。
Args:
config: 当前配置字典。
"""
del config
def test_runner_apply_plugin_config_generates_config_file(tmp_path: Path) -> None:
"""Runner 注入配置时应自动补齐并落盘 config.toml。"""
plugin = _DemoConfigPlugin()
runner = PluginRunner(
host_address="ipc://unused",
session_token="session-token",
plugin_dirs=[],
)
meta = SimpleNamespace(plugin_id="demo.plugin", plugin_dir=str(tmp_path), instance=plugin)
runner._apply_plugin_config(cast(Any, meta), config_data={"plugin": {"enabled": False}})
config_path = tmp_path / "config.toml"
assert config_path.exists()
assert plugin.received_config == {"plugin": {"enabled": False, "retry_count": 3}}
with config_path.open("rb") as handle:
saved_config = tomllib.load(handle)
assert saved_config == {"plugin": {"enabled": False, "retry_count": 3}}
def test_component_query_service_returns_plugin_config_schema(monkeypatch: Any) -> None:
"""组件查询服务应支持按插件 ID 返回配置 Schema。"""
payload = RegisterPluginPayload(
plugin_id="demo.plugin",
plugin_version="1.0.0",
default_config={"plugin": {"enabled": True}},
config_schema={
"plugin_id": "demo.plugin",
"plugin_info": {
"name": "Demo",
"version": "1.0.0",
"description": "",
"author": "",
},
"sections": {"plugin": {"fields": {}}},
"layout": {"type": "auto", "tabs": []},
},
)
fake_supervisor = SimpleNamespace(_registered_plugins={"demo.plugin": payload})
fake_manager = SimpleNamespace(_get_supervisor_for_plugin=lambda plugin_id: fake_supervisor)
monkeypatch.setattr(
type(component_query_service),
"_get_runtime_manager",
staticmethod(lambda: fake_manager),
)
assert component_query_service.get_plugin_config_schema("demo.plugin") == payload.config_schema
assert component_query_service.get_plugin_default_config("demo.plugin") == payload.default_config
@pytest.mark.asyncio
async def test_runner_validate_plugin_config_handler_returns_normalized_config(monkeypatch: pytest.MonkeyPatch) -> None:
"""Runner 应返回插件模型归一化后的配置。"""
plugin = _DemoConfigPlugin()
runner = PluginRunner(
host_address="ipc://unused",
session_token="session-token",
plugin_dirs=[],
)
meta = SimpleNamespace(plugin_id="demo.plugin", plugin_dir="", instance=plugin)
monkeypatch.setattr(runner._loader, "get_plugin", lambda plugin_id: meta if plugin_id == "demo.plugin" else None)
envelope = Envelope(
request_id=1,
message_type=MessageType.REQUEST,
method="plugin.validate_config",
plugin_id="demo.plugin",
payload=ValidatePluginConfigPayload(config_data={"plugin": {"enabled": False}}).model_dump(),
)
response = await runner._handle_validate_plugin_config(envelope)
assert response.error is None
assert response.payload["success"] is True
assert response.payload["normalized_config"] == {"plugin": {"enabled": False, "retry_count": 3}}
@pytest.mark.asyncio
async def test_runner_inspect_plugin_config_handler_supports_unloaded_plugin(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Runner 应支持对未加载插件执行冷检查。"""
plugin = _DemoConfigPlugin()
runner = PluginRunner(
host_address="ipc://unused",
session_token="session-token",
plugin_dirs=[],
)
meta = SimpleNamespace(
plugin_id="demo.plugin",
plugin_dir="/tmp/demo-plugin",
instance=plugin,
manifest=SimpleNamespace(
name="Demo",
description="",
author=SimpleNamespace(name="tester"),
),
version="1.0.0",
)
purged_plugins: list[tuple[str, str]] = []
monkeypatch.setattr(
runner,
"_resolve_plugin_meta_for_config_request",
lambda plugin_id: (meta, True, None) if plugin_id == "demo.plugin" else (None, False, "not-found"),
)
monkeypatch.setattr(
runner._loader,
"purge_plugin_modules",
lambda plugin_id, plugin_dir: purged_plugins.append((plugin_id, plugin_dir)),
)
envelope = Envelope(
request_id=1,
message_type=MessageType.REQUEST,
method="plugin.inspect_config",
plugin_id="demo.plugin",
payload=InspectPluginConfigPayload(
config_data={"plugin": {"enabled": False}},
use_provided_config=True,
).model_dump(),
)
response = await runner._handle_inspect_plugin_config(envelope)
assert response.error is None
assert response.payload["success"] is True
assert response.payload["enabled"] is False
assert response.payload["normalized_config"] == {"plugin": {"enabled": False, "retry_count": 3}}
assert response.payload["default_config"] == {"plugin": {"enabled": True, "retry_count": 3}}
assert purged_plugins == [("demo.plugin", "/tmp/demo-plugin")]
@pytest.mark.asyncio
async def test_runner_validate_plugin_config_handler_returns_error_on_invalid_config(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Runner 应在插件拒绝配置时返回错误响应。"""
plugin = _StrictConfigPlugin()
runner = PluginRunner(
host_address="ipc://unused",
session_token="session-token",
plugin_dirs=[],
)
meta = SimpleNamespace(plugin_id="demo.plugin", plugin_dir="", instance=plugin)
monkeypatch.setattr(runner._loader, "get_plugin", lambda plugin_id: meta if plugin_id == "demo.plugin" else None)
envelope = Envelope(
request_id=1,
message_type=MessageType.REQUEST,
method="plugin.validate_config",
plugin_id="demo.plugin",
payload=ValidatePluginConfigPayload(config_data={"plugin": {"retry_count": -1}}).model_dump(),
)
response = await runner._handle_validate_plugin_config(envelope)
assert response.error is not None
assert response.error["message"] == "重试次数不能小于 0"
@pytest.mark.asyncio
async def test_update_plugin_config_prefers_runtime_validation(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""WebUI 保存插件配置时应优先使用运行时校验结果。"""
config_path = tmp_path / "config.toml"
async def _mock_validate_plugin_config(plugin_id: str, config_data: Dict[str, Any]) -> Dict[str, Any] | None:
"""返回运行时归一化后的配置。
Args:
plugin_id: 插件 ID。
config_data: 原始配置。
Returns:
Dict[str, Any] | None: 归一化后的配置。
"""
assert plugin_id == "demo.plugin"
assert config_data == {"plugin": {"enabled": False}}
return {"plugin": {"enabled": False, "retry_count": 3}}
fake_runtime_manager = SimpleNamespace(validate_plugin_config=_mock_validate_plugin_config)
monkeypatch.setattr(
"src.webui.routers.plugin.config_routes.require_plugin_token",
lambda session: session or "session-token",
)
monkeypatch.setattr(
"src.webui.routers.plugin.config_routes.find_plugin_path_by_id",
lambda plugin_id: tmp_path if plugin_id == "demo.plugin" else None,
)
monkeypatch.setattr(
"src.plugin_runtime.integration.get_plugin_runtime_manager",
lambda: fake_runtime_manager,
)
response = await update_plugin_config(
"demo.plugin",
UpdatePluginConfigRequest(config={"plugin.enabled": False}),
maibot_session="session-token",
)
assert response["success"] is True
with config_path.open("rb") as handle:
saved_config = tomllib.load(handle)
assert saved_config == {"plugin": {"enabled": False, "retry_count": 3}}
@pytest.mark.asyncio
async def test_webui_config_endpoints_use_runtime_inspection_for_unloaded_plugin(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""WebUI 在插件未加载时也应从代码定义返回配置与 Schema。"""
async def _mock_inspect_plugin_config(
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
*,
use_provided_config: bool = False,
) -> SimpleNamespace | None:
"""返回运行时冷检查结果。
Args:
plugin_id: 插件 ID。
config_data: 可选配置。
use_provided_config: 是否使用传入配置。
Returns:
SimpleNamespace | None: 冷检查结果。
"""
del config_data, use_provided_config
if plugin_id != "demo.plugin":
return None
return SimpleNamespace(
config_schema={
"plugin_id": "demo.plugin",
"plugin_info": {
"name": "Demo",
"version": "1.0.0",
"description": "",
"author": "",
},
"sections": {"plugin": {"fields": {}}},
"layout": {"type": "auto", "tabs": []},
},
normalized_config={"plugin": {"enabled": True, "retry_count": 3}},
enabled=True,
)
fake_runtime_manager = SimpleNamespace(inspect_plugin_config=_mock_inspect_plugin_config)
monkeypatch.setattr(
"src.webui.routers.plugin.config_routes.require_plugin_token",
lambda session: session or "session-token",
)
monkeypatch.setattr(
"src.webui.routers.plugin.config_routes.find_plugin_path_by_id",
lambda plugin_id: tmp_path if plugin_id == "demo.plugin" else None,
)
monkeypatch.setattr(
"src.plugin_runtime.integration.get_plugin_runtime_manager",
lambda: fake_runtime_manager,
)
schema_response = await get_plugin_config_schema("demo.plugin", maibot_session="session-token")
config_response = await get_plugin_config("demo.plugin", maibot_session="session-token")
assert schema_response["success"] is True
assert schema_response["schema"]["plugin_id"] == "demo.plugin"
assert config_response == {
"success": True,
"config": {"plugin": {"enabled": True, "retry_count": 3}},
"message": "配置文件不存在,已返回默认配置",
}

View File

@@ -0,0 +1,225 @@
"""插件依赖流水线测试。"""
from pathlib import Path
import json
import pytest
from src.plugin_runtime.dependency_pipeline import PluginDependencyPipeline
def _build_manifest(
plugin_id: str,
*,
dependencies: list[dict[str, str]] | None = None,
) -> dict[str, object]:
"""构造测试用的 Manifest v2 数据。
Args:
plugin_id: 插件 ID。
dependencies: 依赖声明列表。
Returns:
dict[str, object]: 可直接写入 ``_manifest.json`` 的字典。
"""
return {
"manifest_version": 2,
"version": "1.0.0",
"name": plugin_id,
"description": "测试插件",
"author": {
"name": "tester",
"url": "https://example.com/tester",
},
"license": "MIT",
"urls": {
"repository": f"https://example.com/{plugin_id}",
},
"host_application": {
"min_version": "1.0.0",
"max_version": "1.0.0",
},
"sdk": {
"min_version": "2.0.0",
"max_version": "2.99.99",
},
"dependencies": dependencies or [],
"capabilities": [],
"i18n": {
"default_locale": "zh-CN",
"supported_locales": ["zh-CN"],
},
"id": plugin_id,
}
def _write_plugin(
plugin_root: Path,
plugin_name: str,
plugin_id: str,
*,
dependencies: list[dict[str, str]] | None = None,
) -> Path:
"""在临时目录中写入一个测试插件。
Args:
plugin_root: 插件根目录。
plugin_name: 插件目录名。
plugin_id: 插件 ID。
dependencies: Python 依赖声明列表。
Returns:
Path: 插件目录路径。
"""
plugin_dir = plugin_root / plugin_name
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(plugin_dir / "_manifest.json").write_text(
json.dumps(_build_manifest(plugin_id, dependencies=dependencies)),
encoding="utf-8",
)
return plugin_dir
def test_build_plan_blocks_plugin_conflicting_with_host_requirement(tmp_path: Path) -> None:
"""与主程序依赖冲突的插件应被阻止加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"conflict_plugin",
"test.conflict-plugin",
dependencies=[
{
"type": "python_package",
"name": "numpy",
"version_spec": "<1.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
plan = pipeline.build_plan([plugin_root])
assert "test.conflict-plugin" in plan.blocked_plugin_reasons
assert "主程序" in plan.blocked_plugin_reasons["test.conflict-plugin"]
assert plan.install_requirements == ()
def test_build_plan_blocks_plugins_with_conflicting_python_dependencies(tmp_path: Path) -> None:
"""插件之间出现 Python 包版本冲突时应同时阻止双方加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": "<2.0.0",
}
],
)
_write_plugin(
plugin_root,
"plugin_b",
"test.plugin-b",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=3.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
plan = pipeline.build_plan([plugin_root])
assert "test.plugin-a" in plan.blocked_plugin_reasons
assert "test.plugin-b" in plan.blocked_plugin_reasons
assert "test.plugin-b" in plan.blocked_plugin_reasons["test.plugin-a"]
assert "test.plugin-a" in plan.blocked_plugin_reasons["test.plugin-b"]
def test_build_plan_collects_install_requirements_for_missing_packages(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""未安装但无冲突的依赖应进入自动安装计划。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=1.0.0,<2.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
monkeypatch.setattr(
pipeline._manifest_validator,
"get_installed_package_version",
lambda package_name: None if package_name == "demo-package" else "1.0.0",
)
plan = pipeline.build_plan([plugin_root])
assert plan.blocked_plugin_reasons == {}
assert len(plan.install_requirements) == 1
assert plan.install_requirements[0].package_name == "demo-package"
assert plan.install_requirements[0].plugin_ids == ("test.plugin-a",)
assert plan.install_requirements[0].requirement_text == "demo-package>=1.0.0,<2.0.0"
@pytest.mark.asyncio
async def test_execute_blocks_plugins_when_auto_install_fails(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""自动安装失败时,相关插件应被阻止加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=1.0.0,<2.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
monkeypatch.setattr(
pipeline._manifest_validator,
"get_installed_package_version",
lambda package_name: None if package_name == "demo-package" else "1.0.0",
)
async def fake_install(_requirements) -> tuple[bool, str]:
"""模拟依赖安装失败。"""
return False, "network error"
monkeypatch.setattr(pipeline, "_install_requirements", fake_install)
result = await pipeline.execute([plugin_root])
assert result.environment_changed is False
assert "test.plugin-a" in result.blocked_plugin_reasons
assert "自动安装 Python 依赖失败" in result.blocked_plugin_reasons["test.plugin-a"]

View File

@@ -3,9 +3,11 @@
验证协议层、传输层、RPC 通信链路的正确性。
"""
# pyright: reportArgumentType=false, reportAttributeAccessIssue=false, reportCallIssue=false, reportIndexIssue=false, reportMissingImports=false, reportOptionalMemberAccess=false
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Dict, List, Optional
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence
import asyncio
import json
@@ -1405,6 +1407,57 @@ class TestComponentRegistry:
assert warnings
assert "plugin_a.broken" in warnings[0]
def test_register_hook_handler_rejects_unknown_hook(self):
from src.plugin_runtime.host.component_registry import ComponentRegistrationError, ComponentRegistry
from src.plugin_runtime.host.hook_spec_registry import HookSpecRegistry
reg = ComponentRegistry(hook_spec_registry=HookSpecRegistry())
with pytest.raises(ComponentRegistrationError, match="未注册的 Hook"):
reg.register_component(
"broken_hook",
"hook_handler",
"plugin_a",
{
"hook": "chat.receive.unknown",
"mode": "blocking",
},
)
def test_register_plugin_components_is_atomic_when_hook_invalid(self):
from src.plugin_runtime.host.component_registry import ComponentRegistrationError, ComponentRegistry
from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistry
hook_spec_registry = HookSpecRegistry()
hook_spec_registry.register_hook_spec(HookSpec(name="chat.receive.before_process"))
reg = ComponentRegistry(hook_spec_registry=hook_spec_registry)
reg.register_plugin_components(
"plugin_a",
[
{"name": "cmd_old", "component_type": "command", "metadata": {"command_pattern": r"^/old"}},
],
)
with pytest.raises(ComponentRegistrationError, match="未注册的 Hook"):
reg.register_plugin_components(
"plugin_a",
[
{
"name": "hook_ok",
"component_type": "hook_handler",
"metadata": {"hook": "chat.receive.before_process", "mode": "blocking"},
},
{
"name": "hook_bad",
"component_type": "hook_handler",
"metadata": {"hook": "chat.receive.missing", "mode": "blocking"},
},
],
)
assert reg.get_component("plugin_a.cmd_old") is not None
assert reg.get_component("plugin_a.hook_ok") is None
def test_query_by_type(self):
from src.plugin_runtime.host.component_registry import ComponentRegistry
@@ -2142,6 +2195,18 @@ class TestPluginRuntimeHookEntry:
assert result.kwargs["session_id"] == "s-1"
assert ("b1", "builtin_guard") in call_log
def test_manager_lists_builtin_hook_specs(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""PluginRuntimeManager 应暴露内置 Hook 规格清单。"""
_ComponentRegistry, PluginRuntimeManager = self._import_manager_modules(monkeypatch)
manager = PluginRuntimeManager()
hook_names = {spec.name for spec in manager.list_hook_specs()}
assert "chat.receive.before_process" in hook_names
assert "send_service.before_send" in hook_names
assert "maisaka.planner.after_response" in hook_names
class TestRPCServer:
"""RPC Server 代际保护测试"""
@@ -2828,7 +2893,7 @@ class TestIntegration:
assert instances[0].stopped is True
@pytest.mark.asyncio
async def test_handle_plugin_source_changes_only_reload_matching_supervisor(self, monkeypatch, tmp_path):
async def test_handle_plugin_source_changes_restarts_supervisors_after_dependency_sync(self, monkeypatch, tmp_path):
from src.config.file_watcher import FileChange
from src.plugin_runtime import integration as integration_module
import json
@@ -2852,7 +2917,6 @@ class TestIntegration:
def __init__(self, plugin_dirs, registered_plugins):
self._plugin_dirs = plugin_dirs
self._registered_plugins = registered_plugins
self.reload_reasons = []
self.config_updates = []
def get_loaded_plugin_ids(self):
@@ -2861,9 +2925,6 @@ class TestIntegration:
def get_loaded_plugin_versions(self):
return {plugin_id: "1.0.0" for plugin_id in self._registered_plugins}
async def reload_plugins(self, plugin_ids=None, reason="manual", external_available_plugins=None):
self.reload_reasons.append((plugin_ids, reason, external_available_plugins or {}))
async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""):
self.config_updates.append((plugin_id, config_data, config_version))
return True
@@ -2872,27 +2933,37 @@ class TestIntegration:
manager._started = True
manager._builtin_supervisor = FakeSupervisor([builtin_root], {"test.alpha": object()})
manager._third_party_supervisor = FakeSupervisor([thirdparty_root], {"test.beta": object()})
dependency_sync_calls = []
restart_calls = []
async def fake_sync(plugin_dirs: Sequence[Path]) -> Any:
"""记录依赖同步调用。"""
dependency_sync_calls.append(list(plugin_dirs))
return integration_module.DependencySyncState(
blocked_changed_plugin_ids={"test.beta"},
environment_changed=False,
)
async def fake_restart(reason: str) -> bool:
"""记录 Supervisor 重启调用。"""
restart_calls.append(reason)
return True
monkeypatch.setattr(manager, "_sync_plugin_dependencies", fake_sync)
monkeypatch.setattr(manager, "_restart_supervisors", fake_restart)
changes = [
FileChange(change_type=1, path=beta_dir / "plugin.py"),
]
refresh_calls = []
def fake_refresh() -> None:
refresh_calls.append(True)
manager._refresh_plugin_config_watch_subscriptions = fake_refresh
await manager._handle_plugin_source_changes(changes)
assert manager._builtin_supervisor.reload_reasons == []
assert manager._third_party_supervisor.reload_reasons == [
(["test.beta"], "file_watcher", {"test.alpha": "1.0.0"})
]
assert dependency_sync_calls == [[builtin_root, thirdparty_root]]
assert restart_calls == ["file_watcher_blocklist_changed"]
assert manager._builtin_supervisor.config_updates == []
assert manager._third_party_supervisor.config_updates == []
assert refresh_calls == [True]
@pytest.mark.asyncio
async def test_reload_plugins_globally_warns_and_skips_cross_supervisor_dependents(self, monkeypatch):
@@ -2974,6 +3045,16 @@ class TestIntegration:
self._registered_plugins = {plugin_id: object() for plugin_id in plugins}
self.config_updates = []
async def inspect_plugin_config(
self,
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
use_provided_config: bool = False,
) -> SimpleNamespace:
"""返回测试用的配置解析结果。"""
del config_data, use_provided_config
return SimpleNamespace(enabled=True, normalized_config={"enabled": True}, plugin_id=plugin_id)
async def notify_plugin_config_updated(
self,
plugin_id,
@@ -2997,6 +3078,110 @@ class TestIntegration:
assert manager._builtin_supervisor.config_updates == [("test.alpha", {"enabled": True}, "", "self")]
assert manager._third_party_supervisor.config_updates == []
@pytest.mark.asyncio
async def test_handle_plugin_config_changes_loads_unloaded_enabled_plugin(self, monkeypatch, tmp_path):
from src.plugin_runtime import integration as integration_module
from src.config.file_watcher import FileChange
import json
thirdparty_root = tmp_path / "plugins"
alpha_dir = thirdparty_root / "alpha"
alpha_dir.mkdir(parents=True)
(alpha_dir / "config.toml").write_text("[plugin]\nenabled = true\n", encoding="utf-8")
(alpha_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(alpha_dir / "_manifest.json").write_text(json.dumps(build_test_manifest("test.alpha")), encoding="utf-8")
monkeypatch.chdir(tmp_path)
class FakeSupervisor:
def __init__(self, plugin_dirs):
self._plugin_dirs = plugin_dirs
self._registered_plugins = {}
async def inspect_plugin_config(
self,
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
use_provided_config: bool = False,
) -> SimpleNamespace:
"""返回测试用的启用配置快照。"""
del config_data, use_provided_config
return SimpleNamespace(enabled=True, normalized_config={"plugin": {"enabled": True}}, plugin_id=plugin_id)
manager = integration_module.PluginRuntimeManager()
manager._started = True
manager._third_party_supervisor = FakeSupervisor([thirdparty_root])
load_calls = []
async def fake_load_plugin_globally(plugin_id: str, reason: str = "manual") -> bool:
"""记录自动加载调用。"""
load_calls.append((plugin_id, reason))
return True
monkeypatch.setattr(manager, "load_plugin_globally", fake_load_plugin_globally)
await manager._handle_plugin_config_changes(
"test.alpha",
[FileChange(change_type=1, path=alpha_dir / "config.toml")],
)
assert load_calls == [("test.alpha", "config_enabled")]
@pytest.mark.asyncio
async def test_handle_plugin_config_changes_unloads_loaded_disabled_plugin(self, monkeypatch, tmp_path):
from src.plugin_runtime import integration as integration_module
from src.config.file_watcher import FileChange
import json
builtin_root = tmp_path / "src" / "plugins" / "built_in"
alpha_dir = builtin_root / "alpha"
alpha_dir.mkdir(parents=True)
(alpha_dir / "config.toml").write_text("[plugin]\nenabled = false\n", encoding="utf-8")
(alpha_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(alpha_dir / "_manifest.json").write_text(json.dumps(build_test_manifest("test.alpha")), encoding="utf-8")
monkeypatch.chdir(tmp_path)
class FakeSupervisor:
def __init__(self, plugin_dirs, plugins):
self._plugin_dirs = plugin_dirs
self._registered_plugins = {plugin_id: object() for plugin_id in plugins}
async def inspect_plugin_config(
self,
plugin_id: str,
config_data: Optional[Dict[str, Any]] = None,
use_provided_config: bool = False,
) -> SimpleNamespace:
"""返回测试用的禁用配置快照。"""
del config_data, use_provided_config
return SimpleNamespace(
enabled=False,
normalized_config={"plugin": {"enabled": False}},
plugin_id=plugin_id,
)
manager = integration_module.PluginRuntimeManager()
manager._started = True
manager._builtin_supervisor = FakeSupervisor([builtin_root], ["test.alpha"])
reload_calls = []
async def fake_reload_plugins_globally(plugin_ids: Sequence[str], reason: str = "manual") -> bool:
"""记录自动卸载调用。"""
reload_calls.append((list(plugin_ids), reason))
return True
monkeypatch.setattr(manager, "reload_plugins_globally", fake_reload_plugins_globally)
await manager._handle_plugin_config_changes(
"test.alpha",
[FileChange(change_type=1, path=alpha_dir / "config.toml")],
)
assert reload_calls == [(["test.alpha"], "config_disabled")]
@pytest.mark.asyncio
async def test_handle_main_config_reload_only_notifies_subscribers(self, monkeypatch):
from src.plugin_runtime import integration as integration_module
@@ -3108,6 +3293,55 @@ class TestIntegration:
subscription["paths"][0] for subscription in manager._plugin_file_watcher.subscriptions
} == {alpha_dir / "config.toml", beta_dir / "config.toml"}
def test_refresh_plugin_config_watch_subscriptions_includes_unloaded_plugins(self, tmp_path):
from src.plugin_runtime import integration as integration_module
import json
thirdparty_root = tmp_path / "plugins"
alpha_dir = thirdparty_root / "alpha"
beta_dir = thirdparty_root / "beta"
alpha_dir.mkdir(parents=True)
beta_dir.mkdir(parents=True)
(alpha_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(beta_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(alpha_dir / "_manifest.json").write_text(json.dumps(build_test_manifest("test.alpha")), encoding="utf-8")
(beta_dir / "_manifest.json").write_text(json.dumps(build_test_manifest("test.beta")), encoding="utf-8")
class FakeWatcher:
def __init__(self):
self.subscriptions = []
def subscribe(
self,
callback: Any,
*,
paths: Optional[Sequence[Path]] = None,
change_types: Any = None,
) -> str:
"""记录新的监听订阅。"""
del callback, change_types
subscription_id = f"sub-{len(self.subscriptions) + 1}"
self.subscriptions.append({"id": subscription_id, "paths": tuple(paths or ())})
return subscription_id
def unsubscribe(self, subscription_id: str) -> bool:
"""兼容 watcher 取消订阅接口。"""
del subscription_id
return True
class FakeSupervisor:
def __init__(self, plugin_dirs, plugins):
self._plugin_dirs = plugin_dirs
self._registered_plugins = {plugin_id: object() for plugin_id in plugins}
manager = integration_module.PluginRuntimeManager()
manager._plugin_file_watcher = FakeWatcher()
manager._third_party_supervisor = FakeSupervisor([thirdparty_root], ["test.alpha"])
manager._refresh_plugin_config_watch_subscriptions()
assert set(manager._plugin_config_watcher_subscriptions.keys()) == {"test.alpha", "test.beta"}
@pytest.mark.asyncio
async def test_component_reload_plugin_returns_failure_when_reload_rolls_back(self, monkeypatch):
from src.plugin_runtime import integration as integration_module

View File

@@ -0,0 +1,96 @@
"""插件运行时浏览器渲染能力测试。"""
from typing import Optional
import pytest
from src.plugin_runtime.integration import PluginRuntimeManager
from src.plugin_runtime.host.supervisor import PluginSupervisor
from src.services.html_render_service import HtmlRenderRequest, HtmlRenderResult
class _FakeRenderService:
"""用于替代真实浏览器渲染服务的测试桩。"""
def __init__(self) -> None:
"""初始化测试桩。"""
self.last_request: Optional[HtmlRenderRequest] = None
async def render_html_to_png(self, request: HtmlRenderRequest) -> HtmlRenderResult:
"""记录请求并返回固定的渲染结果。
Args:
request: 当前渲染请求。
Returns:
HtmlRenderResult: 固定的测试渲染结果。
"""
self.last_request = request
return HtmlRenderResult(
image_base64="ZmFrZS1pbWFnZQ==",
mime_type="image/png",
width=640,
height=480,
render_ms=12,
)
def test_render_capability_is_registered() -> None:
"""Host 注册能力时应包含 render.html2png。"""
manager = PluginRuntimeManager()
supervisor = PluginSupervisor(plugin_dirs=[])
manager._register_capability_impls(supervisor)
assert "render.html2png" in supervisor.capability_service.list_capabilities()
@pytest.mark.asyncio
async def test_render_capability_forwards_request(monkeypatch: pytest.MonkeyPatch) -> None:
"""render.html2png 应将请求透传给浏览器渲染服务。"""
from src.plugin_runtime.capabilities import render as render_capability_module
fake_service = _FakeRenderService()
monkeypatch.setattr(render_capability_module, "get_html_render_service", lambda: fake_service)
manager = PluginRuntimeManager()
result = await manager._cap_render_html2png(
"demo.plugin",
"render.html2png",
{
"html": "<body><div id='card'>hello</div></body>",
"selector": "#card",
"viewport": {"width": 1024, "height": 768},
"device_scale_factor": 1.5,
"full_page": False,
"omit_background": True,
"wait_until": "networkidle",
"wait_for_selector": "#card",
"wait_for_timeout_ms": 150,
"timeout_ms": 3000,
"allow_network": True,
},
)
assert result == {
"success": True,
"result": {
"image_base64": "ZmFrZS1pbWFnZQ==",
"mime_type": "image/png",
"width": 640,
"height": 480,
"render_ms": 12,
},
}
assert fake_service.last_request is not None
assert fake_service.last_request.selector == "#card"
assert fake_service.last_request.viewport_width == 1024
assert fake_service.last_request.viewport_height == 768
assert fake_service.last_request.device_scale_factor == 1.5
assert fake_service.last_request.omit_background is True
assert fake_service.last_request.wait_until == "networkidle"
assert fake_service.last_request.allow_network is True

View File

@@ -0,0 +1,136 @@
"""业务命名 Hook 集成测试。"""
from types import SimpleNamespace
from typing import Any
import os
import sys
import pytest
# 确保项目根目录在 sys.path 中
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# SDK 包路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "packages", "maibot-plugin-sdk"))
class _FakeHookManager:
"""用于业务 Hook 测试的最小运行时管理器。"""
def __init__(self, responses: dict[str, SimpleNamespace]) -> None:
"""初始化测试管理器。
Args:
responses: 按 Hook 名称预设的返回结果映射。
"""
self._responses = responses
self.calls: list[tuple[str, dict[str, Any]]] = []
async def invoke_hook(self, hook_name: str, **kwargs: Any) -> SimpleNamespace:
"""模拟调用运行时命名 Hook。
Args:
hook_name: 目标 Hook 名称。
**kwargs: 传入 Hook 的参数。
Returns:
SimpleNamespace: 预设的 Hook 返回结果。
"""
self.calls.append((hook_name, dict(kwargs)))
return self._responses.get(hook_name, SimpleNamespace(kwargs=dict(kwargs), aborted=False))
def test_builtin_hook_catalog_includes_new_business_hooks(monkeypatch: pytest.MonkeyPatch) -> None:
"""内置 Hook 目录应包含三个业务系统新增的 Hook。"""
monkeypatch.setattr(sys, "exit", lambda code=0: None)
from src.plugin_runtime.hook_catalog import register_builtin_hook_specs
from src.plugin_runtime.host.hook_spec_registry import HookSpecRegistry
registry = HookSpecRegistry()
hook_names = {spec.name for spec in register_builtin_hook_specs(registry)}
assert "emoji.maisaka.before_select" in hook_names
assert "emoji.register.after_build_emotion" in hook_names
assert "jargon.extract.before_persist" in hook_names
assert "jargon.query.after_search" in hook_names
assert "expression.select.before_select" in hook_names
assert "expression.learn.before_upsert" in hook_names
@pytest.mark.asyncio
async def test_send_emoji_for_maisaka_can_be_aborted_by_hook(monkeypatch: pytest.MonkeyPatch) -> None:
"""表情包系统应允许在选择前被 Hook 中止。"""
from src.chat.emoji_system import maisaka_tool
fake_manager = _FakeHookManager(
{
"emoji.maisaka.before_select": SimpleNamespace(
kwargs={"abort_message": "插件阻止了表情发送。"},
aborted=True,
)
}
)
monkeypatch.setattr(maisaka_tool, "_get_runtime_manager", lambda: fake_manager)
result = await maisaka_tool.send_emoji_for_maisaka(stream_id="stream-1", requested_emotion="开心")
assert result.success is False
assert result.message == "插件阻止了表情发送。"
assert fake_manager.calls[0][0] == "emoji.maisaka.before_select"
@pytest.mark.asyncio
async def test_jargon_extract_can_be_aborted_before_persist(monkeypatch: pytest.MonkeyPatch) -> None:
"""黑话提取结果应允许在写库前被 Hook 中止。"""
from src.learners.jargon_miner import JargonMiner
fake_manager = _FakeHookManager(
{
"jargon.extract.before_persist": SimpleNamespace(
kwargs={"entries": []},
aborted=True,
)
}
)
monkeypatch.setattr(JargonMiner, "_get_runtime_manager", staticmethod(lambda: fake_manager))
miner = JargonMiner(session_id="session-1", session_name="测试会话")
await miner.process_extracted_entries(
[{"content": "yyds", "raw_content": {"[1] yyds 太强了"}}],
)
assert fake_manager.calls[0][0] == "jargon.extract.before_persist"
assert fake_manager.calls[0][1]["session_id"] == "session-1"
@pytest.mark.asyncio
async def test_expression_selection_can_be_aborted_by_hook(monkeypatch: pytest.MonkeyPatch) -> None:
"""表达方式选择流程应允许在开始前被 Hook 中止。"""
from src.learners.expression_selector import ExpressionSelector
fake_manager = _FakeHookManager(
{
"expression.select.before_select": SimpleNamespace(
kwargs={},
aborted=True,
)
}
)
monkeypatch.setattr(ExpressionSelector, "_get_runtime_manager", staticmethod(lambda: fake_manager))
monkeypatch.setattr(ExpressionSelector, "can_use_expression_for_chat", lambda self, chat_id: True)
selector = ExpressionSelector()
selected_expressions, selected_ids = await selector.select_suitable_expressions(
chat_id="session-1",
chat_info="用户刚刚发来一条消息。",
)
assert selected_expressions == []
assert selected_ids == []
assert fake_manager.calls[0][0] == "expression.select.before_select"

View File

@@ -1,4 +1,4 @@
from src.config.official_configs import ChatConfig
from src.config.official_configs import ChatConfig, MessageReceiveConfig
from src.config.config import Config
from src.webui.config_schema import ConfigSchemaGenerator
@@ -60,17 +60,49 @@ def test_nested_model_schema():
def test_field_without_extra_metadata():
"""Test that fields without json_schema_extra still generate valid schema."""
schema = ConfigSchemaGenerator.generate_schema(ChatConfig)
max_context_size = next(f for f in schema["fields"] if f["name"] == "max_context_size")
inevitable_at_reply = next(f for f in schema["fields"] if f["name"] == "inevitable_at_reply")
# Verify basic fields are generated
assert "name" in max_context_size
assert max_context_size["name"] == "max_context_size"
assert "type" in max_context_size
assert max_context_size["type"] == "integer"
assert "label" in max_context_size
assert "required" in max_context_size
assert "name" in inevitable_at_reply
assert inevitable_at_reply["name"] == "inevitable_at_reply"
assert "type" in inevitable_at_reply
assert inevitable_at_reply["type"] == "boolean"
assert "label" in inevitable_at_reply
assert "required" in inevitable_at_reply
# Verify no x-widget or x-icon from json_schema_extra (since field has none)
# These fields should only be present if explicitly defined in json_schema_extra
assert not max_context_size.get("x-widget")
assert not max_context_size.get("x-icon")
assert not inevitable_at_reply.get("x-widget")
assert not inevitable_at_reply.get("x-icon")
def test_all_top_level_sections_have_ui_metadata():
"""所有顶层配置节都必须声明 uiParent 或独立 Tab 的标签与图标。"""
schema = ConfigSchemaGenerator.generate_schema(Config)
for section_name, section_schema in schema["nested"].items():
has_parent = bool(section_schema.get("uiParent"))
has_host_meta = bool(section_schema.get("uiLabel")) and bool(section_schema.get("uiIcon"))
assert has_parent or has_host_meta, f"{section_name} 缺少 UI 元数据"
def test_maisaka_is_host_tab_and_mcp_is_attached_to_it():
"""MaiSaka 应作为独立 TabMCP 作为其子配置挂载。"""
schema = ConfigSchemaGenerator.generate_schema(Config)
maisaka_schema = schema["nested"]["maisaka"]
mcp_schema = schema["nested"]["mcp"]
assert maisaka_schema.get("uiParent") is None
assert maisaka_schema.get("uiLabel") == "MaiSaka"
assert maisaka_schema.get("uiIcon") == "message-circle"
assert mcp_schema.get("uiParent") == "maisaka"
def test_set_field_is_mapped_as_array():
"""set[str] 应映射为前端可识别的 array。"""
schema = ConfigSchemaGenerator.generate_schema(MessageReceiveConfig)
ban_words = next(field for field in schema["fields"] if field["name"] == "ban_words")
assert ban_words["type"] == "array"
assert ban_words["items"]["type"] == "string"