小改动

This commit is contained in:
SengokuCola
2026-04-14 23:53:38 +08:00
parent 2471a2c4a4
commit 4729f5acdb
4 changed files with 6 additions and 595 deletions

View File

@@ -1,23 +0,0 @@
from src.maisaka.chat_loop_service import MaisakaChatLoopService
def test_build_tool_names_log_text_supports_openai_function_schema() -> None:
tool_definitions = [
{
"type": "function",
"function": {
"name": "mute_user",
"description": "禁言指定用户",
"parameters": {
"type": "object",
"properties": {},
},
},
},
{
"name": "reply",
"description": "发送回复",
},
]
assert MaisakaChatLoopService._build_tool_names_log_text(tool_definitions) == "mute_user、reply"

View File

@@ -1,339 +0,0 @@
"""MutePlugin SDK 回归测试。"""
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List
import pytest
from maibot_sdk.context import PluginContext
from maibot_sdk.plugin import MaiBotPlugin
from plugins.MutePlugin.plugin import create_plugin
from src.core.tooling import ToolExecutionContext, ToolInvocation
from src.plugin_runtime.component_query import ComponentQueryService
from src.plugin_runtime.runner.manifest_validator import ManifestValidator
def _build_plugin() -> MaiBotPlugin:
"""构造已注入默认配置的插件实例。"""
plugin = create_plugin()
plugin.set_plugin_config(plugin.get_default_config())
return plugin
def test_mute_plugin_manifest_is_valid_v2() -> None:
"""MutePlugin 的 manifest 应符合当前运行时要求。"""
validator = ManifestValidator(host_version="1.0.0", sdk_version="2.3.0")
manifest = validator.load_from_plugin_path(Path("plugins/MutePlugin"))
assert manifest is not None
assert manifest.id == "sengokucola.mute-plugin"
assert manifest.manifest_version == 2
def test_create_plugin_returns_sdk_plugin() -> None:
"""插件入口应返回 SDK 插件实例。"""
plugin = create_plugin()
assert isinstance(plugin, MaiBotPlugin)
@pytest.mark.asyncio
async def test_mute_command_calls_napcat_group_ban_api() -> None:
"""手动禁言命令应通过 NapCat Adapter 新 API 执行。"""
plugin = _build_plugin()
plugin.set_plugin_config(
{
**plugin.get_default_config(),
"components": {
"enable_smart_mute": True,
"enable_mute_command": True,
},
}
)
capability_calls: List[Dict[str, Any]] = []
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability_calls.append(payload)
capability = payload["capability"]
if capability == "person.get_id_by_name":
return {"success": True, "person_id": "person-1"}
if capability == "person.get_value":
return {"success": True, "value": "123456"}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info":
return {"success": True, "result": {"role": "member"}}
if capability == "api.call":
return {"success": True, "result": {"status": "ok", "retcode": 0}}
if capability == "send.text":
return {"success": True}
raise AssertionError(f"unexpected capability: {capability}")
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message, intercept = await plugin.handle_mute_command(
stream_id="group-10001",
group_id="10001",
user_id="42",
matched_groups={
"target": "张三",
"duration": "120",
"reason": "刷屏",
},
)
assert success is True
assert message == "成功禁言 张三"
assert intercept is True
api_call = next(
call
for call in capability_calls
if call["capability"] == "api.call"
and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban"
)
assert api_call["args"]["version"] == "1"
assert api_call["args"]["args"] == {
"group_id": "10001",
"user_id": "123456",
"duration": 120,
}
@pytest.mark.asyncio
async def test_mute_tool_requires_target_person_name() -> None:
"""禁言工具在缺少目标时应直接失败并提示。"""
plugin = _build_plugin()
capability_calls: List[Dict[str, Any]] = []
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability_calls.append(payload)
return {"success": True}
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message = await plugin.handle_mute_tool(
stream_id="group-10001",
group_id="10001",
target="",
duration="60",
reason="测试",
)
assert success is False
assert message == "禁言目标不能为空"
assert capability_calls[-1]["capability"] == "send.text"
assert capability_calls[-1]["args"]["text"] == "没有指定禁言对象哦"
@pytest.mark.asyncio
async def test_mute_tool_can_unwrap_nested_person_user_id_response() -> None:
"""禁言工具应能兼容解包多层 capability 返回结果。"""
plugin = _build_plugin()
capability_calls: List[Dict[str, Any]] = []
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability_calls.append(payload)
capability = payload["capability"]
if capability == "person.get_id_by_name":
return {"success": True, "result": {"success": True, "person_id": "person-1"}}
if capability == "person.get_value":
return {"success": True, "result": {"success": True, "value": "123456"}}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info":
return {"success": True, "result": {"role": "member"}}
if capability == "api.call":
return {"success": True, "result": {"status": "ok"}}
if capability == "send.text":
return {"success": True}
raise AssertionError(f"unexpected capability: {capability}")
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message = await plugin.handle_mute_tool(
stream_id="group-10001",
group_id="10001",
target="张三",
duration=60,
reason="测试",
)
assert success is True
assert message == "成功禁言 张三"
api_call = next(
call
for call in capability_calls
if call["capability"] == "api.call"
and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban"
)
assert api_call["args"]["args"]["user_id"] == "123456"
@pytest.mark.asyncio
async def test_mute_tool_rejects_owner_before_group_ban_call() -> None:
"""禁言工具应在检测到群主时提前返回明确提示。"""
plugin = _build_plugin()
capability_calls: List[Dict[str, Any]] = []
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability_calls.append(payload)
capability = payload["capability"]
if capability == "person.get_id_by_name":
return {"success": True, "person_id": "person-1"}
if capability == "person.get_value":
return {"success": True, "value": "123456"}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info":
return {"success": True, "result": {"role": "owner"}}
if capability == "send.text":
return {"success": True}
raise AssertionError(f"unexpected capability: {capability}")
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message = await plugin.handle_mute_tool(
stream_id="group-10001",
group_id="10001",
target="张三",
duration=60,
reason="测试",
)
assert success is False
assert message == "张三 是群主,不能被禁言"
assert not any(
call["capability"] == "api.call" and call["args"]["api_name"] == "adapter.napcat.group.set_group_ban"
for call in capability_calls
)
@pytest.mark.asyncio
async def test_mute_tool_maps_cannot_ban_owner_error_message() -> None:
"""NapCat 返回 cannot ban owner 时应转成明确中文提示。"""
plugin = _build_plugin()
capability_calls: List[Dict[str, Any]] = []
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability_calls.append(payload)
capability = payload["capability"]
if capability == "person.get_id_by_name":
return {"success": True, "person_id": "person-1"}
if capability == "person.get_value":
return {"success": True, "value": "123456"}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info":
return {"success": True, "result": {"role": "member"}}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.set_group_ban":
return {"success": False, "error": "NapCat 动作返回失败: action=set_group_ban message=cannot ban owner"}
if capability == "send.text":
return {"success": True}
raise AssertionError(f"unexpected capability: {capability}")
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message = await plugin.handle_mute_tool(
stream_id="group-10001",
group_id="10001",
target="张三",
duration=60,
reason="测试",
)
assert success is False
assert message == "张三 是群主,不能被禁言"
@pytest.mark.asyncio
async def test_mute_tool_accepts_nested_ok_api_result() -> None:
"""嵌套的 success/result/status=ok 返回值也应判定为成功。"""
plugin = _build_plugin()
async def fake_rpc_call(method: str, plugin_id: str = "", payload: Dict[str, Any] | None = None) -> Dict[str, Any]:
assert method == "cap.call"
assert payload is not None
capability = payload["capability"]
if capability == "person.get_id_by_name":
return {"success": True, "person_id": "person-1"}
if capability == "person.get_value":
return {"success": True, "value": "123456"}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.get_group_member_info":
return {"success": True, "result": {"role": "member"}}
if capability == "api.call" and payload["args"]["api_name"] == "adapter.napcat.group.set_group_ban":
return {
"success": True,
"result": {
"status": "ok",
"retcode": 0,
"data": None,
"message": "",
"wording": "",
},
}
if capability == "send.text":
return {"success": True}
raise AssertionError(f"unexpected capability: {capability}")
plugin._set_context(PluginContext(plugin_id="mute", rpc_call=fake_rpc_call))
success, message = await plugin.handle_mute_tool(
stream_id="group-10001",
group_id="10001",
target="张三",
duration=60,
reason="测试",
)
assert success is True
assert message == "成功禁言 张三"
def test_tool_invocation_payload_injects_group_and_user_context() -> None:
"""插件工具执行时应自动补齐群聊上下文字段。"""
entry = SimpleNamespace(invoke_method="plugin.invoke_tool")
anchor_message = SimpleNamespace(
message_info=SimpleNamespace(
group_info=SimpleNamespace(group_id="10001"),
user_info=SimpleNamespace(user_id="20002"),
)
)
invocation = ToolInvocation(tool_name="mute", arguments={"target": "张三"}, stream_id="session-1")
context = ToolExecutionContext(
session_id="session-1",
stream_id="session-1",
reasoning="test",
metadata={"anchor_message": anchor_message},
)
payload = ComponentQueryService._build_tool_invocation_payload(entry, invocation, context)
assert payload["target"] == "张三"
assert payload["stream_id"] == "session-1"
assert payload["chat_id"] == "session-1"
assert payload["group_id"] == "10001"
assert payload["user_id"] == "20002"

View File

@@ -1,227 +0,0 @@
from pathlib import Path
from types import ModuleType, SimpleNamespace
import importlib.util
import sys
class DummyLogger:
def __init__(self) -> None:
self.warning_messages: list[str] = []
def debug(self, _msg: str) -> None:
return
def info(self, _msg: str) -> None:
return
def warning(self, msg: str) -> None:
self.warning_messages.append(msg)
def error(self, _msg: str) -> None:
return
def load_utils_module(monkeypatch, qq_account=123456, platforms=None):
logger = DummyLogger()
configured_platforms = platforms or []
def _stub_module(name: str) -> ModuleType:
module = ModuleType(name)
monkeypatch.setitem(sys.modules, name, module)
return module
for package_name in [
"src",
"src.chat",
"src.chat.message_receive",
"src.chat.utils",
"src.common",
"src.config",
"src.llm_models",
"src.person_info",
]:
if package_name not in sys.modules:
package_module = ModuleType(package_name)
package_module.__path__ = []
monkeypatch.setitem(sys.modules, package_name, package_module)
jieba_module = ModuleType("jieba")
jieba_module.cut = lambda text: list(text)
monkeypatch.setitem(sys.modules, "jieba", jieba_module)
logger_module = _stub_module("src.common.logger")
logger_module.get_logger = lambda _name: logger
config_module = _stub_module("src.config.config")
config_module.global_config = SimpleNamespace(
bot=SimpleNamespace(
qq_account=qq_account,
platforms=configured_platforms,
nickname="MaiBot",
alias_names=[],
),
chat=SimpleNamespace(
at_bot_inevitable_reply=1,
mentioned_bot_reply=1,
),
)
config_module.model_config = SimpleNamespace()
message_module = _stub_module("src.chat.message_receive.message")
class SessionMessage:
pass
message_module.SessionMessage = SessionMessage
chat_manager_module = _stub_module("src.chat.message_receive.chat_manager")
chat_manager_module.chat_manager = SimpleNamespace(get_session_by_session_id=lambda _chat_id: None)
llm_module = _stub_module("src.llm_models.utils_model")
class LLMRequest:
def __init__(self, *args, **kwargs) -> None:
del args, kwargs
llm_module.LLMRequest = LLMRequest
person_module = _stub_module("src.person_info.person_info")
class Person:
pass
person_module.Person = Person
typo_generator_module = _stub_module("src.chat.utils.typo_generator")
class ChineseTypoGenerator:
def __init__(self, *args, **kwargs) -> None:
del args, kwargs
def create_typo_sentence(self, sentence: str):
return sentence, ""
typo_generator_module.ChineseTypoGenerator = ChineseTypoGenerator
file_path = Path(__file__).parent.parent.parent / "src" / "chat" / "utils" / "utils.py"
spec = importlib.util.spec_from_file_location("src.chat.utils.utils", file_path)
utils_module = importlib.util.module_from_spec(spec)
utils_module.__package__ = "src.chat.utils"
monkeypatch.setitem(sys.modules, "src.chat.utils.utils", utils_module)
assert spec.loader is not None
spec.loader.exec_module(utils_module)
return utils_module, logger
def test_platform_specific_bot_accounts(monkeypatch):
utils_module, _logger = load_utils_module(
monkeypatch,
qq_account=123456,
platforms=[" TG : tg_bot ", "discord: disc_bot"],
)
assert utils_module.get_bot_account("qq") == "123456"
assert utils_module.get_bot_account("webui") == "123456"
assert utils_module.get_bot_account("telegram") == "tg_bot"
assert utils_module.get_bot_account("tg") == "tg_bot"
assert utils_module.get_bot_account("discord") == "disc_bot"
assert utils_module.is_bot_self("qq", "123456")
assert utils_module.is_bot_self("webui", "123456")
assert utils_module.is_bot_self("telegram", "tg_bot")
assert utils_module.is_bot_self(" TG ", "tg_bot")
def test_get_all_bot_accounts_includes_runtime_aliases(monkeypatch):
utils_module, _logger = load_utils_module(
monkeypatch,
qq_account=123456,
platforms=["TG:tg_bot", "discord:disc_bot"],
)
assert utils_module.get_all_bot_accounts() == {
"qq": "123456",
"webui": "123456",
"telegram": "tg_bot",
"tg": "tg_bot",
"discord": "disc_bot",
}
def test_get_all_bot_accounts_keeps_canonical_qq_identity(monkeypatch):
utils_module, _logger = load_utils_module(
monkeypatch,
qq_account=123456,
platforms=["qq:999999", "webui:888888", "TG:tg_bot"],
)
assert utils_module.get_all_bot_accounts()["qq"] == "123456"
assert utils_module.get_all_bot_accounts()["webui"] == "123456"
def test_unknown_platform_no_longer_falls_back_to_qq(monkeypatch):
utils_module, logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[])
assert utils_module.is_bot_self("unknown_platform", "123456") is False
assert logger.warning_messages
assert "unknown_platform" in logger.warning_messages[-1]
def test_unknown_platform_warns_only_once(monkeypatch):
utils_module, logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[])
assert utils_module.is_bot_self("unknown_platform", "first") is False
assert utils_module.is_bot_self(" unknown_platform ", "second") is False
assert len(logger.warning_messages) == 1
def test_unconfigured_qq_account_disables_qq_and_webui_identity(monkeypatch):
utils_module, _logger = load_utils_module(monkeypatch, qq_account=0, platforms=["telegram:tg_bot"])
assert utils_module.get_bot_account("qq") == ""
assert utils_module.get_bot_account("webui") == ""
assert utils_module.is_bot_self("qq", "0") is False
assert utils_module.is_bot_self("webui", "0") is False
def test_is_mentioned_bot_in_message_uses_platform_account(monkeypatch):
utils_module, _logger = load_utils_module(monkeypatch, qq_account=123456, platforms=["TG:tg_bot"])
message = SimpleNamespace(
processed_plain_text="@tg_bot 你好",
platform="telegram",
is_mentioned=False,
message_segment=None,
message_info=SimpleNamespace(
additional_config={},
user_info=SimpleNamespace(user_id="user_1"),
),
)
is_mentioned, is_at, reply_probability = utils_module.is_mentioned_bot_in_message(message)
assert is_mentioned is True
assert is_at is True
assert reply_probability == 1.0
def test_is_mentioned_bot_in_message_normalizes_qq_platform(monkeypatch):
utils_module, _logger = load_utils_module(monkeypatch, qq_account=123456, platforms=[])
message = SimpleNamespace(
processed_plain_text="@<MaiBot:123456> 你好",
platform=" QQ ",
is_mentioned=False,
message_segment=None,
message_info=SimpleNamespace(
additional_config={},
user_info=SimpleNamespace(user_id="user_1"),
),
)
is_mentioned, is_at, reply_probability = utils_module.is_mentioned_bot_in_message(message)
assert is_mentioned is True
assert is_at is True
assert reply_probability == 1.0

View File

@@ -627,18 +627,18 @@ class MaisakaChatLoopService:
break
if not selected_indices:
return [], f"没有选择到上下文消息,实际发送 {effective_context_size} 条 user/assistant 消息"
return [], "实际发送 0 条消息tool 0 条,普通消息 0 条)"
selected_indices.reverse()
selected_history = [filtered_history[index] for index in selected_indices]
selected_history, hidden_assistant_count = MaisakaChatLoopService._hide_early_assistant_messages(selected_history)
selected_history, _ = MaisakaChatLoopService._hide_early_assistant_messages(selected_history)
selected_history, _ = drop_orphan_tool_results(selected_history)
tool_message_count = sum(1 for message in selected_history if isinstance(message, ToolResultMessage))
normal_message_count = len(selected_history) - tool_message_count
selection_reason = (
f"上下文裁剪:最近 {effective_context_size} 条 user/assistant 消息"
f"实际发送 {len(selected_history)}"
f"实际发送 {len(selected_history)}消息"
f"|消息 {normal_message_count} 条|tool {tool_message_count}"
)
if hidden_assistant_count > 0:
selection_reason += f",已隐藏最早 {hidden_assistant_count} 条 assistant 消息"
return (
selected_history,
selection_reason,