From 4729f5acdb135589375a5ee898d5d299c764e80c Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Tue, 14 Apr 2026 23:53:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytests/test_maisaka_tool_logging.py | 23 -- pytests/test_mute_plugin_sdk.py | 339 ------------------ pytests/utils_test/test_bot_identity_utils.py | 227 ------------ src/maisaka/chat_loop_service.py | 12 +- 4 files changed, 6 insertions(+), 595 deletions(-) delete mode 100644 pytests/test_maisaka_tool_logging.py delete mode 100644 pytests/test_mute_plugin_sdk.py delete mode 100644 pytests/utils_test/test_bot_identity_utils.py diff --git a/pytests/test_maisaka_tool_logging.py b/pytests/test_maisaka_tool_logging.py deleted file mode 100644 index 0216eb83..00000000 --- a/pytests/test_maisaka_tool_logging.py +++ /dev/null @@ -1,23 +0,0 @@ -from src.maisaka.chat_loop_service import MaisakaChatLoopService - - -def test_build_tool_names_log_text_supports_openai_function_schema() -> None: - tool_definitions = [ - { - "type": "function", - "function": { - "name": "mute_user", - "description": "禁言指定用户", - "parameters": { - "type": "object", - "properties": {}, - }, - }, - }, - { - "name": "reply", - "description": "发送回复", - }, - ] - - assert MaisakaChatLoopService._build_tool_names_log_text(tool_definitions) == "mute_user、reply" diff --git a/pytests/test_mute_plugin_sdk.py b/pytests/test_mute_plugin_sdk.py deleted file mode 100644 index c811cc51..00000000 --- a/pytests/test_mute_plugin_sdk.py +++ /dev/null @@ -1,339 +0,0 @@ -"""MutePlugin SDK 回归测试。""" - -from __future__ import annotations - -from pathlib import Path -from types import SimpleNamespace -from typing import Any, Dict, List - -import pytest - -from maibot_sdk.context import PluginContext -from maibot_sdk.plugin import MaiBotPlugin - -from plugins.MutePlugin.plugin import create_plugin -from src.core.tooling import ToolExecutionContext, ToolInvocation -from src.plugin_runtime.component_query import ComponentQueryService -from src.plugin_runtime.runner.manifest_validator import ManifestValidator - - -def _build_plugin() -> MaiBotPlugin: - """构造已注入默认配置的插件实例。""" - - plugin = create_plugin() - plugin.set_plugin_config(plugin.get_default_config()) - return plugin - - -def test_mute_plugin_manifest_is_valid_v2() -> None: - """MutePlugin 的 manifest 应符合当前运行时要求。""" - - validator = ManifestValidator(host_version="1.0.0", sdk_version="2.3.0") - manifest = validator.load_from_plugin_path(Path("plugins/MutePlugin")) - - assert manifest is not None - assert manifest.id == "sengokucola.mute-plugin" - assert manifest.manifest_version == 2 - - -def test_create_plugin_returns_sdk_plugin() -> None: - """插件入口应返回 SDK 插件实例。""" - - plugin = create_plugin() - - assert isinstance(plugin, MaiBotPlugin) - - -@pytest.mark.asyncio -async def test_mute_command_calls_napcat_group_ban_api() -> None: - """手动禁言命令应通过 NapCat Adapter 新 API 执行。""" - - plugin = _build_plugin() - plugin.set_plugin_config( - { - **plugin.get_default_config(), - "components": { - "enable_smart_mute": True, - "enable_mute_command": True, - }, - } - ) - - capability_calls: List[Dict[str, Any]] = [] - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - capability_calls.append(payload) - - capability = payload["capability"] - if capability == "person.get_id_by_name": - return {"success": True, "person_id": "person-1"} - if capability == "person.get_value": - return {"success": True, "value": "123456"} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info": - return {"success": True, "result": {"role": "member"}} - if capability == "api.call": - return {"success": True, "result": {"status": "ok", "retcode": 0}} - if capability == "send.text": - return {"success": True} - raise AssertionError(f"unexpected capability: {capability}") - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message, intercept = await plugin.handle_mute_command( - stream_id="group-10001", - group_id="10001", - user_id="42", - matched_groups={ - "target": "张三", - "duration": "120", - "reason": "刷屏", - }, - ) - - assert success is True - assert message == "成功禁言 张三" - assert intercept is True - - api_call = next( - call - for call in capability_calls - if call["capability"] == "api.call" - and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban" - ) - assert api_call["args"]["version"] == "1" - assert api_call["args"]["args"] == { - "group_id": "10001", - "user_id": "123456", - "duration": 120, - } - - -@pytest.mark.asyncio -async def test_mute_tool_requires_target_person_name() -> None: - """禁言工具在缺少目标时应直接失败并提示。""" - - plugin = _build_plugin() - capability_calls: List[Dict[str, Any]] = [] - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - capability_calls.append(payload) - return {"success": True} - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message = await plugin.handle_mute_tool( - stream_id="group-10001", - group_id="10001", - target="", - duration="60", - reason="测试", - ) - - assert success is False - assert message == "禁言目标不能为空" - assert capability_calls[-1]["capability"] == "send.text" - assert capability_calls[-1]["args"]["text"] == "没有指定禁言对象哦" - - -@pytest.mark.asyncio -async def test_mute_tool_can_unwrap_nested_person_user_id_response() -> None: - """禁言工具应能兼容解包多层 capability 返回结果。""" - - plugin = _build_plugin() - capability_calls: List[Dict[str, Any]] = [] - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - capability_calls.append(payload) - - capability = payload["capability"] - if capability == "person.get_id_by_name": - return {"success": True, "result": {"success": True, "person_id": "person-1"}} - if capability == "person.get_value": - return {"success": True, "result": {"success": True, "value": "123456"}} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info": - return {"success": True, "result": {"role": "member"}} - if capability == "api.call": - return {"success": True, "result": {"status": "ok"}} - if capability == "send.text": - return {"success": True} - raise AssertionError(f"unexpected capability: {capability}") - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message = await plugin.handle_mute_tool( - stream_id="group-10001", - group_id="10001", - target="张三", - duration=60, - reason="测试", - ) - - assert success is True - assert message == "成功禁言 张三" - - api_call = next( - call - for call in capability_calls - if call["capability"] == "api.call" - and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban" - ) - assert api_call["args"]["args"]["user_id"] == "123456" - - -@pytest.mark.asyncio -async def test_mute_tool_rejects_owner_before_group_ban_call() -> None: - """禁言工具应在检测到群主时提前返回明确提示。""" - - plugin = _build_plugin() - capability_calls: List[Dict[str, Any]] = [] - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - capability_calls.append(payload) - - capability = payload["capability"] - if capability == "person.get_id_by_name": - return {"success": True, "person_id": "person-1"} - if capability == "person.get_value": - return {"success": True, "value": "123456"} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info": - return {"success": True, "result": {"role": "owner"}} - if capability == "send.text": - return {"success": True} - raise AssertionError(f"unexpected capability: {capability}") - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message = await plugin.handle_mute_tool( - stream_id="group-10001", - group_id="10001", - target="张三", - duration=60, - reason="测试", - ) - - assert success is False - assert message == "张三 是群主,不能被禁言" - assert not any( - call["capability"] == "api.call" and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban" - for call in capability_calls - ) - - -@pytest.mark.asyncio -async def test_mute_tool_maps_cannot_ban_owner_error_message() -> None: - """NapCat 返回 cannot ban owner 时应转成明确中文提示。""" - - plugin = _build_plugin() - capability_calls: List[Dict[str, Any]] = [] - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - capability_calls.append(payload) - - capability = payload["capability"] - if capability == "person.get_id_by_name": - return {"success": True, "person_id": "person-1"} - if capability == "person.get_value": - return {"success": True, "value": "123456"} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info": - return {"success": True, "result": {"role": "member"}} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.set_group_ban": - return {"success": False, "error": "NapCat 动作返回失败: action=set_group_ban message=cannot ban owner"} - if capability == "send.text": - return {"success": True} - raise AssertionError(f"unexpected capability: {capability}") - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message = await plugin.handle_mute_tool( - stream_id="group-10001", - group_id="10001", - target="张三", - duration=60, - reason="测试", - ) - - assert success is False - assert message == "张三 是群主,不能被禁言" - - -@pytest.mark.asyncio -async def test_mute_tool_accepts_nested_ok_api_result() -> None: - """嵌套的 success/result/status=ok 返回值也应判定为成功。""" - - plugin = _build_plugin() - - async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]: - assert method == "cap.call" - assert payload is not None - - capability = payload["capability"] - if capability == "person.get_id_by_name": - return {"success": True, "person_id": "person-1"} - if capability == "person.get_value": - return {"success": True, "value": "123456"} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info": - return {"success": True, "result": {"role": "member"}} - if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.set_group_ban": - return { - "success": True, - "result": { - "status": "ok", - "retcode": 0, - "data": None, - "message": "", - "wording": "", - }, - } - if capability == "send.text": - return {"success": True} - raise AssertionError(f"unexpected capability: {capability}") - - plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call)) - - success, message = await plugin.handle_mute_tool( - stream_id="group-10001", - group_id="10001", - target="张三", - duration=60, - reason="测试", - ) - - assert success is True - assert message == "成功禁言 张三" - - -def test_tool_invocation_payload_injects_group_and_user_context() -> None: - """插件工具执行时应自动补齐群聊上下文字段。""" - - entry = SimpleNamespace(invoke_method="plugin.invoke_tool") - anchor_message = SimpleNamespace( - message_info=SimpleNamespace( - group_info=SimpleNamespace(group_id="10001"), - user_info=SimpleNamespace(user_id="20002"), - ) - ) - invocation = ToolInvocation(tool_name="mute", arguments={"target": "张三"}, stream_id="session-1") - context = ToolExecutionContext( - session_id="session-1", - stream_id="session-1", - reasoning="test", - metadata={"anchor_message": anchor_message}, - ) - - payload = ComponentQueryService._build_tool_invocation_payload(entry, invocation, context) - - assert payload["target"] == "张三" - assert payload["stream_id"] == "session-1" - assert payload["chat_id"] == "session-1" - assert payload["group_id"] == "10001" - assert payload["user_id"] == "20002" diff --git a/pytests/utils_test/test_bot_identity_utils.py b/pytests/utils_test/test_bot_identity_utils.py deleted file mode 100644 index c345174b..00000000 --- a/pytests/utils_test/test_bot_identity_utils.py +++ /dev/null @@ -1,227 +0,0 @@ -from pathlib import Path -from types import ModuleType, SimpleNamespace - -import importlib.util -import sys - - -class DummyLogger: - def __init__(self) -> None: - self.warning_messages: list[str] = [] - - def debug(self, _msg: str) -> None: - return - - def info(self, _msg: str) -> None: - return - - def warning(self, msg: str) -> None: - self.warning_messages.append(msg) - - def error(self, _msg: str) -> None: - return - - -def load_utils_module(monkeypatch, qq_account=123456, platforms=None): - logger = DummyLogger() - configured_platforms = platforms or [] - - def _stub_module(name: str) -> ModuleType: - module = ModuleType(name) - monkeypatch.setitem(sys.modules, name, module) - return module - - for package_name in [ - "src", - "src.chat", - "src.chat.message_receive", - "src.chat.utils", - "src.common", - "src.config", - "src.llm_models", - "src.person_info", - ]: - if package_name not in sys.modules: - package_module = ModuleType(package_name) - package_module.__path__ = [] - monkeypatch.setitem(sys.modules, package_name, package_module) - - jieba_module = ModuleType("jieba") - jieba_module.cut = lambda text: list(text) - monkeypatch.setitem(sys.modules, "jieba", jieba_module) - - logger_module = _stub_module("src.common.logger") - logger_module.get_logger = lambda _name: logger - - config_module = _stub_module("src.config.config") - config_module.global_config = SimpleNamespace( - bot=SimpleNamespace( - qq_account=qq_account, - platforms=configured_platforms, - nickname="MaiBot", - alias_names=[], - ), - chat=SimpleNamespace( - at_bot_inevitable_reply=1, - mentioned_bot_reply=1, - ), - ) - config_module.model_config = SimpleNamespace() - - message_module = _stub_module("src.chat.message_receive.message") - - class SessionMessage: - pass - - message_module.SessionMessage = SessionMessage - - chat_manager_module = _stub_module("src.chat.message_receive.chat_manager") - chat_manager_module.chat_manager = SimpleNamespace(get_session_by_session_id=lambda _chat_id: None) - - llm_module = _stub_module("src.llm_models.utils_model") - - class LLMRequest: - def __init__(self, *args, **kwargs) -> None: - del args, kwargs - - llm_module.LLMRequest = LLMRequest - - person_module = _stub_module("src.person_info.person_info") - - class Person: - pass - - person_module.Person = Person - - typo_generator_module = _stub_module("src.chat.utils.typo_generator") - - class ChineseTypoGenerator: - def __init__(self, *args, **kwargs) -> None: - del args, kwargs - - def create_typo_sentence(self, sentence: str): - return sentence, "" - - typo_generator_module.ChineseTypoGenerator = ChineseTypoGenerator - - file_path = Path(__file__).parent.parent.parent / "src" / "chat" / "utils" / "utils.py" - spec = importlib.util.spec_from_file_location("src.chat.utils.utils", file_path) - utils_module = importlib.util.module_from_spec(spec) - utils_module.__package__ = "src.chat.utils" - monkeypatch.setitem(sys.modules, "src.chat.utils.utils", utils_module) - assert spec.loader is not None - spec.loader.exec_module(utils_module) - return utils_module, logger - - -def test_platform_specific_bot_accounts(monkeypatch): - utils_module, _logger = load_utils_module( - monkeypatch, - qq_account=123456, - platforms=[" TG : tg_bot ", "discord: disc_bot"], - ) - - assert utils_module.get_bot_account("qq") == "123456" - assert utils_module.get_bot_account("webui") == "123456" - assert utils_module.get_bot_account("telegram") == "tg_bot" - assert utils_module.get_bot_account("tg") == "tg_bot" - assert utils_module.get_bot_account("discord") == "disc_bot" - - assert utils_module.is_bot_self("qq", "123456") - assert utils_module.is_bot_self("webui", "123456") - assert utils_module.is_bot_self("telegram", "tg_bot") - assert utils_module.is_bot_self(" TG ", "tg_bot") - - -def test_get_all_bot_accounts_includes_runtime_aliases(monkeypatch): - utils_module, _logger = load_utils_module( - monkeypatch, - qq_account=123456, - platforms=["TG:tg_bot", "discord:disc_bot"], - ) - - assert utils_module.get_all_bot_accounts() == { - "qq": "123456", - "webui": "123456", - "telegram": "tg_bot", - "tg": "tg_bot", - "discord": "disc_bot", - } - - -def test_get_all_bot_accounts_keeps_canonical_qq_identity(monkeypatch): - utils_module, _logger = load_utils_module( - monkeypatch, - qq_account=123456, - platforms=["qq:999999", "webui:888888", "TG:tg_bot"], - ) - - assert utils_module.get_all_bot_accounts()["qq"] == "123456" - assert utils_module.get_all_bot_accounts()["webui"] == "123456" - - -def test_unknown_platform_no_longer_falls_back_to_qq(monkeypatch): - utils_module, logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[]) - - assert utils_module.is_bot_self("unknown_platform", "123456") is False - assert logger.warning_messages - assert "unknown_platform" in logger.warning_messages[-1] - - -def test_unknown_platform_warns_only_once(monkeypatch): - utils_module, logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[]) - - assert utils_module.is_bot_self("unknown_platform", "first") is False - assert utils_module.is_bot_self(" unknown_platform ", "second") is False - assert len(logger.warning_messages) == 1 - - -def test_unconfigured_qq_account_disables_qq_and_webui_identity(monkeypatch): - utils_module, _logger = load_utils_module(monkeypatch, qq_account=0, platforms=["telegram:tg_bot"]) - - assert utils_module.get_bot_account("qq") == "" - assert utils_module.get_bot_account("webui") == "" - assert utils_module.is_bot_self("qq", "0") is False - assert utils_module.is_bot_self("webui", "0") is False - - -def test_is_mentioned_bot_in_message_uses_platform_account(monkeypatch): - utils_module, _logger = load_utils_module(monkeypatch, qq_account=123456, platforms=["TG:tg_bot"]) - - message = SimpleNamespace( - processed_plain_text="@tg_bot 你好", - platform="telegram", - is_mentioned=False, - message_segment=None, - message_info=SimpleNamespace( - additional_config={}, - user_info=SimpleNamespace(user_id="user_1"), - ), - ) - - is_mentioned, is_at, reply_probability = utils_module.is_mentioned_bot_in_message(message) - - assert is_mentioned is True - assert is_at is True - assert reply_probability == 1.0 - - -def test_is_mentioned_bot_in_message_normalizes_qq_platform(monkeypatch): - utils_module, _logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[]) - - message = SimpleNamespace( - processed_plain_text="@ 你好", - platform=" QQ ", - is_mentioned=False, - message_segment=None, - message_info=SimpleNamespace( - additional_config={}, - user_info=SimpleNamespace(user_id="user_1"), - ), - ) - - is_mentioned, is_at, reply_probability = utils_module.is_mentioned_bot_in_message(message) - - assert is_mentioned is True - assert is_at is True - assert reply_probability == 1.0 diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py index e45fa2d6..f12fb049 100644 --- a/src/maisaka/chat_loop_service.py +++ b/src/maisaka/chat_loop_service.py @@ -627,18 +627,18 @@ class MaisakaChatLoopService: break if not selected_indices: - return [], f"没有选择到上下文消息,实际发送 {effective_context_size} 条 user/assistant 消息" + return [], "实际发送 0 条消息(tool 0 条,普通消息 0 条)" selected_indices.reverse() selected_history = [filtered_history[index] for index in selected_indices] - selected_history, hidden_assistant_count = MaisakaChatLoopService._hide_early_assistant_messages(selected_history) + selected_history, _ = MaisakaChatLoopService._hide_early_assistant_messages(selected_history) selected_history, _ = drop_orphan_tool_results(selected_history) + tool_message_count = sum(1 for message in selected_history if isinstance(message, ToolResultMessage)) + normal_message_count = len(selected_history) - tool_message_count selection_reason = ( - f"上下文裁剪:最近 {effective_context_size} 条 user/assistant 消息," - f"实际发送 {len(selected_history)} 条" + f"实际发送 {len(selected_history)} 条消息" + f"|消息 {normal_message_count} 条|tool {tool_message_count} 条" ) - if hidden_assistant_count > 0: - selection_reason += f",已隐藏最早 {hidden_assistant_count} 条 assistant 消息" return ( selected_history, selection_reason,