feat:重构maisaka的消息类型,添加打断功能

This commit is contained in:
SengokuCola
2026-03-30 00:45:41 +08:00
parent b5408b4550
commit 01ef29aadb
34 changed files with 670 additions and 7782 deletions

View File

@@ -1,30 +0,0 @@
# 运行时配置(包含用户敏感信息)
config.toml
# 备份文件
*.backup.*
*.bak
# 日志
logs/
*.log
*.jsonl
# Python 缓存
__pycache__/
*.py[cod]
*$py.class
*.so
# 本地测试脚本(仓库不提交)
test_*.py
# IDE
.idea/
.vscode/
*.swp
*.swo
# 系统文件
.DS_Store
Thumbs.db

View File

@@ -1,24 +0,0 @@
# Changelog
本文件记录 `MaiBot_MCPBridgePlugin` 的用户可感知变更。
## 2.0.0
- 配置入口统一MCP 服务器仅使用 Claude Desktop `mcpServers` JSON`servers.claude_config_json`
- 兼容迁移:自动识别旧版 `servers.list` 并迁移为 `mcpServers`(需在 WebUI 保存一次固化)
- 保持功能不变:保留 Workflow硬流程/工具链)与 ReAct软流程双轨制能力
- 精简实现:移除旧的 WebUI 导入导出/快速添加服务器实现与 `tomlkit` 依赖
- 易用性:完善 Workflow 变量替换(支持数组下标与 bracket 写法),并优化 WebUI 配置区顺序
## 1.9.0
- 双轨制架构ReAct软流程+ Workflow硬流程/工具链)
## 1.8.0
- Workflow工具链多工具顺序执行、变量替换、自定义 Workflow 并注册为组合工具
## 1.7.0
- 断路器模式、状态刷新、工具搜索等易用性增强

View File

@@ -1,356 +0,0 @@
# MCP 桥接插件开发文档
本文档面向开发者,介绍插件的架构设计、核心模块和扩展方式。
## 架构概览
```
MaiBot_MCPBridgePlugin/
├── plugin.py # 主插件文件,包含所有核心逻辑
├── mcp_client.py # MCP 客户端封装
├── tool_chain.py # 工具链Workflow模块
├── core/
│ └── claude_config.py # Claude Desktop mcpServers 解析/迁移
├── config.toml # 运行时配置
└── _manifest.json # 插件元数据
```
## 核心模块
### 1. MCP 客户端 (`mcp_client.py`)
封装了与 MCP 服务器的通信逻辑。
```python
from .mcp_client import mcp_manager, MCPServerConfig, TransportType
# 添加服务器
config = MCPServerConfig(
name="my-server",
transport=TransportType.STREAMABLE_HTTP,
url="https://mcp.example.com/mcp"
)
await mcp_manager.add_server(config)
# 调用工具
result = await mcp_manager.call_tool("server_tool_name", {"param": "value"})
if result.success:
print(result.content)
```
**支持的传输类型:**
- `STDIO`: 本地进程通信
- `SSE`: Server-Sent Events
- `HTTP`: HTTP 请求
- `STREAMABLE_HTTP`: 流式 HTTP推荐
### 2. 工具注册系统
MCP 工具通过动态类创建注册到 MaiBot
```python
# 创建工具代理类
class MCPToolProxy(BaseTool):
name = "mcp_server_tool"
description = "工具描述"
parameters = [("param", ToolParamType.STRING, "参数描述", True, None)]
available_for_llm = True
async def execute(self, function_args):
result = await mcp_manager.call_tool(self._mcp_tool_key, function_args)
return {"name": self.name, "content": result.content}
```
### 3. 工具链模块 (`tool_chain.py`)
实现 Workflow 硬流程,支持多工具顺序执行。
```python
from .tool_chain import ToolChainDefinition, ToolChainStep, tool_chain_manager
# 定义工具链
chain = ToolChainDefinition(
name="search_and_detail",
description="搜索并获取详情",
input_params={"query": "搜索关键词"},
steps=[
ToolChainStep(
tool_name="mcp_server_search",
args_template={"keyword": "${input.query}"},
output_key="search_result"
),
ToolChainStep(
tool_name="mcp_server_detail",
args_template={"id": "${prev}"}
)
]
)
# 注册并执行
tool_chain_manager.add_chain(chain)
result = await tool_chain_manager.execute_chain("search_and_detail", {"query": "test"})
```
**变量替换语法:**
- `${input.参数名}`: 用户输入
- `${step.输出键}`: 指定步骤的输出
- `${prev}`: 上一步输出
- `${prev.字段}`: 上一步输出JSON的字段
- `${step.geo.return.0.location}` / `${step.geo.return[0].location}`: 数组下标访问
- `${step.geo['return'][0]['location']}`: bracket 写法(最通用)
## 双轨制架构
### ReAct 软流程
将 MCP 工具注册到 MaiBot 的记忆检索 ReAct 系统LLM 自主决策调用。
```python
def _register_tools_to_react(self) -> int:
from src.memory_system.retrieval_tools import register_memory_retrieval_tool
def make_execute_func(tool_key: str):
async def execute_func(**kwargs) -> str:
result = await mcp_manager.call_tool(tool_key, kwargs)
return result.content if result.success else f"失败: {result.error}"
return execute_func
register_memory_retrieval_tool(
name="mcp_tool_name",
description="工具描述",
parameters=[{"name": "param", "type": "string", "required": True}],
execute_func=make_execute_func("tool_key")
)
```
### Workflow 硬流程
用户预定义的固定执行流程,注册为组合工具。
```python
def _register_tool_chains(self) -> None:
from src.plugin_system.core.component_registry import component_registry
for chain_name, chain in tool_chain_manager.get_enabled_chains().items():
info, tool_class = tool_chain_registry.register_chain(chain)
info.plugin_name = self.plugin_name
component_registry.register_component(info, tool_class)
```
## 配置系统
### MCP 服务器配置Claude Desktop 规范)
插件只接受 Claude Desktop 的 `mcpServers` JSON`core/claude_config.py`)。配置入口统一为:
- WebUI/配置文件:`[servers].claude_config_json`
- 命令:`/mcp import`(合并 `mcpServers`)与 `/mcp export`(导出当前 `mcpServers`
兼容迁移:
- 若检测到旧版 `servers.list`,会自动迁移为 `servers.claude_config_json`(仅迁移到内存配置,需 WebUI 保存一次固化)。
### WebUI 配置 Schema
使用 `ConfigField` 定义 WebUI 配置项:
```python
config_schema = {
"section_name": {
"field_name": ConfigField(
type=str, # 类型: str, bool, int, float
default="default_value", # 默认值
description="字段描述",
label="显示标签",
input_type="textarea", # 输入类型: text, textarea, password
rows=5, # textarea 行数
disabled=True, # 只读
choices=["a", "b"], # 下拉选项
hint="提示信息",
order=1, # 排序
),
},
}
```
### 配置读取
```python
# 在组件中读取配置
value = self.get_config("section.key", default="fallback")
# 在插件类中读取
value = self.config.get("section", {}).get("key", "default")
```
## 事件处理
### 启动事件
```python
class MCPStartupHandler(BaseEventHandler):
event_type = EventType.ON_START
handler_name = "mcp_startup"
async def execute(self, message):
global _plugin_instance
if _plugin_instance:
await _plugin_instance._async_connect_servers()
return (True, True, None, None, None)
```
### 停止事件
```python
class MCPStopHandler(BaseEventHandler):
event_type = EventType.ON_STOP
handler_name = "mcp_stop"
async def execute(self, message):
await mcp_manager.shutdown()
return (True, True, None, None, None)
```
## 命令系统
```python
class MCPStatusCommand(BaseCommand):
command_name = "mcp_status"
command_pattern = r"^/mcp(?:\s+(?P<action>\S+))?(?:\s+(?P<arg>.+))?$"
async def execute(self) -> Tuple[bool, str, bool]:
action = self.matched_groups.get("action", "")
arg = self.matched_groups.get("arg", "")
if action == "tools":
await self.send_text("工具列表...")
elif action == "reconnect":
await self._handle_reconnect(arg)
return (True, None, True) # (成功, 消息, 拦截)
```
## 高级功能
### 调用追踪
```python
from plugin import tool_call_tracer, ToolCallRecord
# 记录调用
record = ToolCallRecord(
call_id="xxx",
timestamp=time.time(),
tool_name="tool",
server_name="server",
arguments={"key": "value"},
success=True,
duration_ms=100.0
)
tool_call_tracer.record(record)
# 查询记录
recent = tool_call_tracer.get_recent(10)
by_tool = tool_call_tracer.get_by_tool("tool_name")
```
### 调用缓存
```python
from plugin import tool_call_cache
# 配置缓存
tool_call_cache.configure(
enabled=True,
ttl=300, # 秒
max_entries=200,
exclude_tools="mcp_*_time_*" # 排除模式
)
# 使用缓存
cached = tool_call_cache.get("tool_name", {"param": "value"})
if cached is None:
result = await call_tool(...)
tool_call_cache.set("tool_name", {"param": "value"}, result)
```
### 权限控制
```python
from plugin import permission_checker
# 配置权限
permission_checker.configure(
enabled=True,
default_mode="allow_all", # 或 "deny_all"
rules_json='[{"tool": "mcp_*_delete_*", "denied": ["qq:123:group"]}]',
quick_deny_groups="123456789",
quick_allow_users="111111111"
)
# 检查权限
allowed = permission_checker.check(
tool_name="mcp_server_delete",
chat_id="123456",
user_id="789",
is_group=True
)
```
### 断路器模式
MCP 客户端内置断路器,故障服务器快速失败:
- 连续失败 N 次后熔断
- 熔断期间直接返回错误
- 定期尝试恢复
## 扩展开发
### 添加新的传输类型
1.`mcp_client.py` 中添加 `TransportType` 枚举值
2. 实现对应的连接逻辑
3. 更新 `_create_transport()` 方法
### 添加新的工具类型
1. 继承 `BaseTool` 创建新类
2.`get_plugin_components()` 中注册
3. 实现 `execute()` 方法
### 添加新的命令
1.`MCPStatusCommand.execute()` 中添加新的 action 分支
2. 或创建新的 `BaseCommand` 子类
## 调试技巧
### 日志级别
```python
from src.common.logger import get_logger
logger = get_logger("mcp_bridge_plugin")
logger.debug("详细调试信息")
logger.info("一般信息")
logger.warning("警告")
logger.error("错误")
```
### 常用调试命令
```bash
/mcp # 查看状态
/mcp tools # 查看工具列表
/mcp trace # 查看调用记录
/mcp cache # 查看缓存状态
/mcp chain # 查看工具链
```
## 更新日志
`plugins/MaiBot_MCPBridgePlugin/CHANGELOG.md`
## 开发约定
- 本仓库不提交测试脚本/临时复现文件;如需本地验证,可自行在工作区创建未跟踪文件(建议放到 `.local/` 并加入 `.gitignore`)。

View File

@@ -1,357 +0,0 @@
# MCP 桥接插件
将 [MCP (Model Context Protocol)](https://modelcontextprotocol.io/) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具。
<img width="3012" height="1794" alt="image" src="https://github.com/user-attachments/assets/ece56404-301a-4abf-b16d-87bd430fc977" />
## 🚀 快速开始
### 1. 安装
```bash
# 克隆到 MaiBot 插件目录
cd /path/to/MaiBot/plugins
git clone https://github.com/CharTyr/MaiBot_MCPBridgePlugin.git MCPBridgePlugin
# 安装依赖
pip install mcp
# 复制配置文件
cd MCPBridgePlugin
cp config.example.toml config.toml
```
### 2. 添加服务器
编辑 `config.toml`,在 `[servers]``claude_config_json` 中填写 Claude Desktop 的 `mcpServers` JSON
```toml
[servers]
claude_config_json = '''
{
"mcpServers": {
"time": { "transport": "streamable_http", "url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time" },
"my-server": { "transport": "streamable_http", "url": "https://mcp.xxx.com/mcp", "headers": { "Authorization": "Bearer 你的密钥" } },
"fetch": { "command": "uvx", "args": ["mcp-server-fetch"] }
}
}
'''
```
### 3. 启动
重启 MaiBot或发送 `/mcp reconnect`
---
## 📚 去哪找 MCP 服务器?
| 平台 | 说明 |
|------|------|
| [mcp.modelscope.cn](https://mcp.modelscope.cn/) | 魔搭 ModelScope免费推荐 |
| [smithery.ai](https://smithery.ai/) | MCP 服务器注册中心 |
| [github.com/modelcontextprotocol/servers](https://github.com/modelcontextprotocol/servers) | 官方服务器列表 |
---
## 💡 常用命令
| 命令 | 说明 |
|------|------|
| `/mcp` | 查看连接状态 |
| `/mcp tools` | 查看可用工具 |
| `/mcp reconnect` | 重连服务器 |
| `/mcp trace` | 查看调用记录 |
| `/mcp cache` | 查看缓存状态 |
| `/mcp perm` | 查看权限配置 |
| `/mcp import <json>` | 🆕 导入 Claude Desktop 配置 |
| `/mcp export` | 🆕 导出配置 |
| `/mcp search <关键词>` | 🆕 搜索工具 |
| `/mcp chain` | 🆕 查看工具链 |
| `/mcp chain <名称>` | 🆕 查看工具链详情 |
| `/mcp chain test <名称> <参数>` | 🆕 测试执行工具链 |
---
## ✨ 功能特性
### 核心功能
- 🔌 多服务器同时连接
- 📡 支持 stdio / SSE / HTTP / Streamable HTTP
- 🔄 自动重试、心跳检测、断线重连
- 🖥️ WebUI 完整配置支持
### 双轨制架构
- 🔄 **ReAct软流程**LLM 自主决策,多轮动态调用 MCP 工具(适合探索式场景)
- 🔗 **Workflow硬流程/工具链)**:用户预定义步骤顺序与参数传递(适合可控可复用场景)
### 高级功能
- 📦 Resources 支持(实验性)
- 📝 Prompts 支持(实验性)
- 🔄 结果后处理LLM 摘要提炼)
- 🔍 调用追踪 / 🗄️ 调用缓存 / 🔐 权限控制 / 🚫 工具禁用
### 更新日志
-`plugins/MaiBot_MCPBridgePlugin/CHANGELOG.md`
---
## ⚙️ 配置说明
### 服务器配置
```json
{
"mcpServers": {
"server_name": {
"transport": "streamable_http",
"url": "https://..."
}
}
}
```
| 字段 | 说明 |
|------|------|
| `mcpServers.<name>` | 服务器名称(唯一) |
| `enabled` | 是否启用(可选,默认 true |
| `transport` | `stdio` / `sse` / `http` / `streamable_http` |
| `url` | 远程服务器地址 |
| `headers` | 🆕 鉴权头(如 `{"Authorization": "Bearer xxx"}` |
| `command` / `args` | 本地服务器启动命令 |
### 权限控制
**快捷配置(推荐):**
```toml
[permissions]
perm_enabled = true
quick_deny_groups = "123456789" # 禁用的群号
quick_allow_users = "111111111" # 管理员白名单
```
**高级规则:**
```json
[{"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}]
```
### 工具禁用
```toml
[tools]
disabled_tools = '''
mcp_filesystem_delete_file
mcp_filesystem_write_file
'''
```
### 调用缓存
```toml
[settings]
cache_enabled = true
cache_ttl = 300
cache_exclude_tools = "mcp_*_time_*"
```
---
## ❓ 常见问题
**Q: 工具没有注册?**
- 检查 `enabled = true`
- 检查 MaiBot 日志错误信息
- 确认 `pip install mcp`
**Q: JSON 格式报错?**
- 多行 JSON 用 `'''` 三引号包裹
- 使用英文双引号 `"`
**Q: 如何手动重连?**
- `/mcp reconnect``/mcp reconnect 服务器名`
---
## 📥 配置导入导出Claude mcpServers
### 从 Claude Desktop 导入
如果你已有 Claude Desktop 的 MCP 配置,可以直接导入:
```
/mcp import {"mcpServers":{"time":{"command":"uvx","args":["mcp-server-time"]},"fetch":{"command":"uvx","args":["mcp-server-fetch"]}}}
```
支持的格式:
- Claude Desktop 格式(`mcpServers` 对象)
- 兼容旧版MaiBot servers 列表数组(将自动迁移为 `mcpServers`
### 导出配置
```
/mcp export # 导出为 Claude Desktop 格式(默认)
/mcp export claude # 导出为 Claude Desktop 格式
```
### 注意事项
- 导入时会自动跳过同名服务器
- 导入后需要发送 `/mcp reconnect` 使配置生效
- 支持 stdio、sse、http、streamable_http 全部传输类型
---
## 🔗 Workflow硬流程/工具链)
工具链允许你将多个 MCP 工具按顺序执行,后续工具可以使用前序工具的输出作为输入。
### 1 分钟上手(推荐 WebUI
1. 先完成 MCP 服务器配置并 `/mcp reconnect`
2. 发送 `/mcp tools`,复制你要用的工具名
3. 打开 WebUI → 「Workflow硬流程/工具链)」→ 用“快速添加”表单填入:
- 名称/描述
- 输入参数(每行 `参数名=描述`
- 执行步骤(每行 `工具名|参数JSON|输出键`
4. 在“确认添加”中输入 `ADD` 并保存
### 快速添加工具链(推荐)
在 WebUI 的「工具链」配置区,使用表单快速添加:
1. **名称**: 填写工具链名称(英文,如 `search_and_detail`
2. **描述**: 填写工具链用途(供 LLM 理解何时使用)
3. **输入参数**: 每行一个,格式 `参数名=描述`
```
query=搜索关键词
max_results=最大结果数
```
4. **执行步骤**: 每行一个,格式 `工具名|参数JSON|输出键`
```
mcp_server_search|{"keyword":"${input.query}"}|search_result
mcp_server_detail|{"id":"${prev}"}|
```
5. **确认添加**: 输入 `ADD` 并保存
### JSON 配置方式
也可以直接在「工具链列表」中编写 JSON
```json
[
{
"name": "search_and_detail",
"description": "先搜索模组,再获取详情",
"input_params": {
"query": "搜索关键词"
},
"steps": [
{
"tool_name": "mcp_mcmod_search_mod",
"args_template": {"keyword": "${input.query}", "limit": 1},
"output_key": "search_result",
"description": "搜索模组"
},
{
"tool_name": "mcp_mcmod_get_mod_detail",
"args_template": {"mod_id": "${prev}"},
"description": "获取详情"
}
]
}
]
```
### 变量替换
| 变量格式 | 说明 |
|---------|------|
| `${input.参数名}` | 用户输入的参数 |
| `${step.输出键}` | 某个步骤的输出(通过 `output_key` 指定) |
| `${prev}` | 上一步的输出 |
| `${prev.字段}` | 上一步输出JSON的某个字段 |
| `${step.geo.return.0.location}` | 数组下标访问dot |
| `${step.geo.return[0].location}` | 数组下标访问([] |
| `${step.geo['return'][0]['location']}` | bracket 写法(最通用) |
### 工具链字段说明
| 字段 | 说明 |
|------|------|
| `name` | 工具链名称,将生成 `chain_xxx` 工具 |
| `description` | 描述,供 LLM 理解何时使用 |
| `input_params` | 输入参数定义 `{参数名: 描述}` |
| `steps` | 执行步骤数组 |
| `steps[].tool_name` | 要调用的工具名 |
| `steps[].args_template` | 参数模板,支持变量替换 |
| `steps[].output_key` | 输出存储键名(可选) |
| `steps[].optional` | 是否可选,失败时继续执行(默认 false |
### 命令
```bash
/mcp chain # 查看所有工具链
/mcp chain list # 列出工具链
/mcp chain <名称> # 查看详情
/mcp chain test <名称> {"query": "JEI"} # 测试执行
/mcp chain reload # 重新加载配置
```
---
## 🔄 双轨制架构
MCP 桥接插件支持两种工具调用模式,可根据场景选择:
### ReAct 软流程
LLM 自主决策的多轮工具调用模式,适合复杂、不确定的场景。
**工作原理:**
1. 用户提问 → LLM 分析需要什么信息
2. LLM 选择调用工具 → 获取结果
3. LLM 观察结果 → 决定是否需要更多信息
4. 重复 2-3 直到信息足够 → 生成最终回答
**启用方式:**
在 WebUI「ReAct (软流程)」配置区启用MCP 工具将自动注册到 MaiBot 的记忆检索 ReAct 系统。
**适用场景:**
- 复杂问题需要多步推理
- 不确定需要调用哪些工具
- 需要根据中间结果动态调整
### Workflow 硬流程
用户预定义的工作流,固定执行顺序,适合可靠、可控的场景。
**工作原理:**
1. 用户定义步骤顺序和参数传递
2. 按顺序执行每个步骤
3. 后续步骤可使用前序步骤的输出
4. 返回最终结果
**适用场景:**
- 流程固定、可预测
- 需要可靠、可重复的执行
- 希望精确控制工具调用顺序
### 对比
| 特性 | ReAct 软流程 | Workflow 硬流程 |
|------|-------------|----------------|
| 决策者 | LLM 自主决策 | 用户预定义 |
| 灵活性 | 高,动态调整 | 低,固定流程 |
| 可预测性 | 低 | 高 |
| 适用场景 | 复杂、探索性任务 | 固定、重复性任务 |
| 配置方式 | 启用即可 | 需要定义步骤 |
---
## 📋 依赖
- MaiBot >= 0.11.6
- Python >= 3.10
- mcp >= 1.0.0
## 📄 许可证
AGPL-3.0

View File

@@ -1,44 +0,0 @@
"""
MCP 桥接插件
将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
v1.1.0 新增功能:
- 心跳检测和自动重连
- 调用统计(次数、成功率、耗时)
- 更好的错误处理
v1.2.0 新增功能:
- Resources 支持(资源读取)
- Prompts 支持(提示模板)
"""
from .plugin import MCPBridgePlugin, mcp_tool_registry, MCPStartupHandler, MCPStopHandler
from .mcp_client import (
mcp_manager,
MCPClientManager,
MCPServerConfig,
TransportType,
MCPCallResult,
MCPToolInfo,
MCPResourceInfo,
MCPPromptInfo,
ToolCallStats,
ServerStats,
)
__all__ = [
"MCPBridgePlugin",
"mcp_tool_registry",
"mcp_manager",
"MCPClientManager",
"MCPServerConfig",
"TransportType",
"MCPCallResult",
"MCPToolInfo",
"MCPResourceInfo",
"MCPPromptInfo",
"ToolCallStats",
"ServerStats",
"MCPStartupHandler",
"MCPStopHandler",
]

View File

@@ -1,42 +0,0 @@
{
"manifest_version": 2,
"version": "2.0.0",
"name": "MCP桥接插件",
"description": "将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具。",
"author": {
"name": "CharTyr",
"url": "https://github.com/CharTyr"
},
"license": "AGPL-3.0",
"urls": {
"repository": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"homepage": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"documentation": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"issues": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin/issues"
},
"host_application": {
"min_version": "0.11.6",
"max_version": "1.0.0"
},
"sdk": {
"min_version": "2.0.0",
"max_version": "2.99.99"
},
"dependencies": [
{
"type": "python_package",
"name": "mcp",
"version_spec": ">=0.0.0"
}
],
"capabilities": [
"send.text"
],
"i18n": {
"default_locale": "zh-CN",
"supported_locales": [
"zh-CN"
]
},
"id": "chartyr.mcpbridge-plugin"
}

View File

@@ -1,309 +0,0 @@
# MCP桥接插件 - 配置文件示例
# 将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
#
# 使用方法:复制此文件为 config.toml然后根据需要修改配置
#
# ============================================================
# 🎯 快速开始(三步)
# ============================================================
# 1. 在下方 [servers] 添加 MCP 服务器配置
# 2. 将 enabled 改为 true 启用服务器
# 3. 重启 MaiBot 或发送 /mcp reconnect
#
# ============================================================
# 📚 去哪找 MCP 服务器?
# ============================================================
#
# 【远程服务(推荐新手)】
# - ModelScope: https://mcp.modelscope.cn/ (免费,推荐)
# - Smithery: https://smithery.ai/
# - Glama: https://glama.ai/mcp/servers
#
# 【本地服务(需要 npx 或 uvx
# - 官方列表: https://github.com/modelcontextprotocol/servers
#
# ============================================================
# ============================================================
# 🔌 MCP 服务器配置
# ============================================================
#
# ⚠️ 重要配置格式Claude Desktop 规范)
# ────────────────────────────────────────────────────────────
# 统一使用 Claude Desktop 的 mcpServers JSON。
#
# claude_config_json 的内容应为 JSON 对象:
# {
# "mcpServers": {
# "server_name": { ...server config... },
# "another": { ... }
# }
# }
#
# 每个服务器支持字段:
# transport - 传输方式: "stdio" / "sse" / "http" / "streamable_http"(可选)
# url - 服务器地址sse/http/streamable_http 模式)
# command - 启动命令stdio 模式,如 "npx" / "uvx"
# args - 命令参数数组stdio 模式)
# env - 环境变量对象stdio 模式,可选)
# headers - 鉴权头(可选,如 {"Authorization": "Bearer xxx"}
# enabled - 是否启用(可选,默认 true
# post_process - 服务器级别后处理配置(可选)
#
# ============================================================
[servers]
claude_config_json = '''
{
"mcpServers": {
"time-mcp-server": {
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time"
},
"my-auth-server": {
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.net/xxxxxx/mcp",
"headers": {
"Authorization": "Bearer ms-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
},
"fetch-local": {
"enabled": false,
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}
}
'''
# ============================================================
# 插件基本信息
# ============================================================
[plugin]
name = "mcp_bridge_plugin"
version = "2.0.0"
config_version = "2.0.0"
enabled = false # 默认禁用,在 WebUI 中启用
# ============================================================
# Workflow硬流程/工具链)
# ============================================================
#
# 作用:把多个工具按顺序执行;后续步骤可引用前序输出。
#
# ✅ 推荐配置方式WebUI「Workflow硬流程/工具链)」里用“快速添加”表单。
# ✅ 也可以直接写 chains_listJSON 数组)。
#
# 变量替换:
# ${input.xxx} - 用户输入
# ${step.<output_key>} - 指定步骤输出(需设置 output_key
# ${prev} - 上一步输出
# ${prev.字段} - 上一步输出JSON的字段
# ${step.geo.return.0.location} - 数组/下标访问dot
# ${step.geo.return[0].location} - 数组/下标访问([]
# ${step.geo['return'][0]['location']} - bracket 写法
#
# ============================================================
[tool_chains]
chains_enabled = true
chains_list = '''
[
{
"name": "search_and_detail",
"description": "先搜索,再根据结果获取详情",
"input_params": { "query": "搜索关键词" },
"steps": [
{ "tool_name": "把这里替换成你的搜索工具名", "args_template": { "keyword": "${input.query}" }, "output_key": "search" },
{ "tool_name": "把这里替换成你的详情工具名", "args_template": { "id": "${prev}" } }
]
}
]
'''
# ============================================================
# ReAct软流程
# ============================================================
#
# 作用:把 MCP 工具注册到 MaiBot 的 ReAct 系统LLM 可自主多轮调用。
#
# 注意ReAct 适合“探索式/不确定”场景Workflow 适合“固定/可控”场景。
#
# ============================================================
[react]
react_enabled = false
filter_mode = "whitelist" # whitelist / blacklist
tool_filter = "" # 每行一个工具名,支持通配符 *
# ============================================================
# 全局设置(高级设置建议保持默认)
# ============================================================
[settings]
# 🏷️ 工具前缀 - 用于区分 MCP 工具和原生工具
tool_prefix = "mcp"
# ⏱️ 连接超时(秒)
connect_timeout = 30.0
# ⏱️ 调用超时(秒)
call_timeout = 60.0
# 🔄 自动连接 - 启动时自动连接所有已启用的服务器
auto_connect = true
# 🔁 重试次数 - 连接失败时的重试次数
retry_attempts = 3
# ⏳ 重试间隔(秒)
retry_interval = 5.0
# 💓 心跳检测 - 定期检测服务器连接状态
heartbeat_enabled = true
# 💓 心跳间隔(秒)- 建议 30-120 秒
heartbeat_interval = 60.0
# 🔄 自动重连 - 检测到断开时自动尝试重连
auto_reconnect = true
# 🔄 最大重连次数 - 连续重连失败后暂停重连
max_reconnect_attempts = 3
# ============================================================
# 高级功能(实验性)
# ============================================================
# 📦 启用 Resources - 允许读取 MCP 服务器提供的资源
enable_resources = false
# 📝 启用 Prompts - 允许使用 MCP 服务器提供的提示模板
enable_prompts = false
# ============================================================
# 结果后处理功能
# ============================================================
# 当 MCP 工具返回的内容过长时,使用 LLM 对结果进行摘要提炼
# 🔄 启用结果后处理
post_process_enabled = false
# 📏 后处理阈值(字符数)- 结果长度超过此值才触发后处理
post_process_threshold = 500
# 🔢 后处理输出限制 - LLM 摘要输出的最大 token 数
post_process_max_tokens = 500
# 🤖 后处理模型(可选)- 留空则使用 utils 模型组
post_process_model = ""
# 🧠 后处理提示词模板
post_process_prompt = '''用户问题:{query}
工具返回内容:
{result}
请从上述内容中提取与用户问题最相关的关键信息,简洁准确地输出:'''
# ============================================================
# 调用链路追踪
# ============================================================
# 记录工具调用详情,便于调试和分析
# 🔍 启用调用追踪
trace_enabled = true
# 📊 追踪记录上限 - 内存中保留的最大记录数
trace_max_records = 50
# 📝 追踪日志文件 - 是否将追踪记录写入日志文件
# 启用后记录写入 plugins/MaiBot_MCPBridgePlugin/logs/trace.jsonl
trace_log_enabled = false
# ============================================================
# 工具调用缓存
# ============================================================
# 缓存相同参数的调用结果,减少重复请求
# 🗄️ 启用调用缓存
cache_enabled = false
# ⏱️ 缓存有效期(秒)
cache_ttl = 300
# 📦 最大缓存条目 - 超出后 LRU 淘汰
cache_max_entries = 200
# 🚫 缓存排除列表 - 即不缓存的工具(每行一个,支持通配符 *
# 时间类、随机类工具建议排除
cache_exclude_tools = '''
mcp_*_time_*
mcp_*_random_*
'''
# ============================================================
# 工具管理
# ============================================================
[tools]
# 📋 工具清单(只读)- 启动后自动生成
tool_list = "(启动后自动生成)"
# 🚫 禁用工具列表 - 要禁用的工具名(每行一个)
# 从上方工具清单复制工具名,禁用后该工具不会被 LLM 调用
# 示例:
# disabled_tools = '''
# mcp_filesystem_delete_file
# mcp_filesystem_write_file
# '''
disabled_tools = ""
# ============================================================
# 权限控制
# ============================================================
[permissions]
# 🔐 启用权限控制 - 按群/用户限制工具使用
perm_enabled = false
# 📋 默认模式
# allow_all: 未配置规则的工具默认允许
# deny_all: 未配置规则的工具默认禁止
perm_default_mode = "allow_all"
# ────────────────────────────────────────────────────────────
# 🚀 快捷配置(推荐新手使用)
# ────────────────────────────────────────────────────────────
# 🚫 禁用群列表 - 这些群无法使用任何 MCP 工具(每行一个群号)
# 示例:
# quick_deny_groups = '''
# 123456789
# 987654321
# '''
quick_deny_groups = ""
# ✅ 管理员白名单 - 这些用户始终可以使用所有工具每行一个QQ号
# 示例:
# quick_allow_users = '''
# 111111111
# '''
quick_allow_users = ""
# ────────────────────────────────────────────────────────────
# 📜 高级权限规则(可选,针对特定工具配置)
# ────────────────────────────────────────────────────────────
# 格式: qq:ID:group/private/user工具名支持通配符 *
# 示例:
# perm_rules = '''
# [
# {"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}
# ]
# '''
perm_rules = "[]"
# ============================================================
# 状态显示(只读)
# ============================================================
[status]
connection_status = "未初始化"

View File

@@ -1 +0,0 @@
"""Core helpers for MCP Bridge Plugin."""

View File

@@ -1,169 +0,0 @@
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional
class ClaudeConfigError(ValueError):
pass
Transport = Literal["stdio", "sse", "http", "streamable_http"]
@dataclass(frozen=True)
class ClaudeMcpServer:
name: str
transport: Transport
command: str = ""
args: List[str] = field(default_factory=list)
env: Dict[str, str] = field(default_factory=dict)
url: str = ""
headers: Dict[str, str] = field(default_factory=dict)
enabled: bool = True
def _normalize_transport(value: Optional[str]) -> Transport:
if not value:
return "streamable_http"
v = value.strip().lower().replace("-", "_")
if v in ("streamable_http", "streamablehttp", "streamable"):
return "streamable_http"
if v in ("http",):
return "http"
if v in ("sse",):
return "sse"
if v in ("stdio",):
return "stdio"
raise ClaudeConfigError(f"unsupported transport: {value}")
def _coerce_str_list(value: Any, field_name: str) -> List[str]:
if value is None:
return []
if isinstance(value, list):
return [str(v) for v in value]
raise ClaudeConfigError(f"{field_name} must be a list")
def _coerce_str_dict(value: Any, field_name: str) -> Dict[str, str]:
if value is None:
return {}
if isinstance(value, dict):
return {str(k): str(v) for k, v in value.items()}
raise ClaudeConfigError(f"{field_name} must be an object")
def parse_claude_mcp_config(config_json: str) -> List[ClaudeMcpServer]:
"""Parse Claude Desktop style MCP config JSON.
Supported:
- Full object: {"mcpServers": {...}}
- Direct mapping: {...} treated as mcpServers
"""
text = (config_json or "").strip()
if not text:
return []
try:
data = json.loads(text)
except json.JSONDecodeError as e:
raise ClaudeConfigError(f"invalid JSON: {e}") from e
if not isinstance(data, dict):
raise ClaudeConfigError("config must be a JSON object")
servers_obj = data.get("mcpServers", data)
if not isinstance(servers_obj, dict):
raise ClaudeConfigError("mcpServers must be an object")
servers: List[ClaudeMcpServer] = []
for name, raw in servers_obj.items():
if not isinstance(name, str) or not name.strip():
raise ClaudeConfigError("server name must be a non-empty string")
if not isinstance(raw, dict):
raise ClaudeConfigError(f"server '{name}' must be an object")
enabled = bool(raw.get("enabled", True))
command = str(raw.get("command", "") or "")
url = str(raw.get("url", "") or "")
args = _coerce_str_list(raw.get("args"), "args")
env = _coerce_str_dict(raw.get("env"), "env")
headers = _coerce_str_dict(raw.get("headers"), "headers")
transport_hint = raw.get("transport", raw.get("type"))
if command:
transport: Transport = "stdio"
elif url:
try:
transport = _normalize_transport(str(transport_hint) if transport_hint is not None else None)
except ClaudeConfigError:
transport = "streamable_http"
else:
raise ClaudeConfigError(f"server '{name}' must have either 'command' or 'url'")
servers.append(
ClaudeMcpServer(
name=name,
transport=transport,
command=command,
args=args,
env=env,
url=url,
headers=headers,
enabled=enabled,
)
)
return servers
def legacy_servers_list_to_claude_config(servers_list_json: str) -> str:
"""Convert legacy v1.x servers list (JSON array) to Claude mcpServers JSON.
Legacy item schema:
{"name","enabled","transport","url","headers","command","args","env"}
"""
text = (servers_list_json or "").strip()
if not text:
return ""
try:
data = json.loads(text)
except json.JSONDecodeError:
return ""
if isinstance(data, dict):
data = [data]
if not isinstance(data, list):
return ""
mcp_servers: Dict[str, Any] = {}
for item in data:
if not isinstance(item, dict):
continue
name = str(item.get("name", "") or "").strip()
if not name:
continue
enabled = bool(item.get("enabled", True))
transport = str(item.get("transport", "") or "").strip().lower().replace("-", "_")
if transport == "stdio" or item.get("command"):
entry: Dict[str, Any] = {
"enabled": enabled,
"command": item.get("command", "") or "",
"args": item.get("args", []) or [],
}
if item.get("env"):
entry["env"] = item.get("env")
mcp_servers[name] = entry
continue
entry = {"enabled": enabled, "url": item.get("url", "") or ""}
if item.get("headers"):
entry["headers"] = item.get("headers")
if transport:
entry["transport"] = transport
mcp_servers[name] = entry
if not mcp_servers:
return ""
return json.dumps({"mcpServers": mcp_servers}, ensure_ascii=False, indent=2)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,2 +0,0 @@
# MCP 桥接插件依赖
mcp>=1.0.0

View File

@@ -1,584 +0,0 @@
"""
MCP Workflow 模块 v1.9.0
支持用户自定义工作流(硬流程),将多个 MCP 工具按顺序执行
双轨制架构:
- 软流程 (ReAct): LLM 自主决策,动态多轮调用工具,灵活但不可预测
- 硬流程 (Workflow): 用户预定义的工作流,固定流程,可靠可控
功能:
- Workflow 定义和管理
- 顺序执行多个工具(硬流程)
- 支持变量替换(使用前序工具的输出)
- 自动注册为组合工具供 LLM 调用
- 与 ReAct 软流程互补,用户可选择合适的执行方式
"""
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
try:
from src.common.logger import get_logger
logger = get_logger("mcp_tool_chain")
except ImportError:
import logging
logger = logging.getLogger("mcp_tool_chain")
@dataclass
class ToolChainStep:
"""工具链步骤"""
tool_name: str # 要调用的工具名(如 mcp_server_tool
args_template: Dict[str, Any] = field(default_factory=dict) # 参数模板,支持变量替换
output_key: str = "" # 输出存储的键名,供后续步骤引用
description: str = "" # 步骤描述
optional: bool = False # 是否可选(失败时继续执行)
def to_dict(self) -> Dict[str, Any]:
return {
"tool_name": self.tool_name,
"args_template": self.args_template,
"output_key": self.output_key,
"description": self.description,
"optional": self.optional,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ToolChainStep":
return cls(
tool_name=data.get("tool_name", ""),
args_template=data.get("args_template", {}),
output_key=data.get("output_key", ""),
description=data.get("description", ""),
optional=data.get("optional", False),
)
@dataclass
class ToolChainDefinition:
"""工具链定义"""
name: str # 工具链名称(将作为组合工具的名称)
description: str # 工具链描述(供 LLM 理解)
steps: List[ToolChainStep] = field(default_factory=list) # 执行步骤
input_params: Dict[str, str] = field(default_factory=dict) # 输入参数定义 {参数名: 描述}
enabled: bool = True # 是否启用
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"steps": [step.to_dict() for step in self.steps],
"input_params": self.input_params,
"enabled": self.enabled,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ToolChainDefinition":
steps = [ToolChainStep.from_dict(s) for s in data.get("steps", [])]
return cls(
name=data.get("name", ""),
description=data.get("description", ""),
steps=steps,
input_params=data.get("input_params", {}),
enabled=data.get("enabled", True),
)
@dataclass
class ChainExecutionResult:
"""工具链执行结果"""
success: bool
final_output: str # 最终输出(最后一个步骤的结果)
step_results: List[Dict[str, Any]] = field(default_factory=list) # 每个步骤的结果
error: str = ""
total_duration_ms: float = 0.0
def to_summary(self) -> str:
"""生成执行摘要"""
lines = []
for i, step in enumerate(self.step_results):
status = "" if step.get("success") else ""
tool = step.get("tool_name", "unknown")
duration = step.get("duration_ms", 0)
lines.append(f"{status} 步骤{i + 1}: {tool} ({duration:.0f}ms)")
if not step.get("success") and step.get("error"):
lines.append(f" 错误: {step['error'][:50]}")
return "\n".join(lines)
class ToolChainExecutor:
"""工具链执行器"""
# 变量替换模式: ${step.output_key} 或 ${input.param_name} 或 ${prev}
VAR_PATTERN = re.compile(r"\$\{([^}]+)\}")
def __init__(self, mcp_manager):
self._mcp_manager = mcp_manager
def _resolve_tool_key(self, tool_name: str) -> Optional[str]:
"""解析工具名,返回有效的 tool_key
支持:
- 直接使用 tool_key如 mcp_server_tool
- 使用注册后的工具名(会自动转换 - 和 . 为 _
"""
all_tools = self._mcp_manager.all_tools
# 直接匹配
if tool_name in all_tools:
return tool_name
# 尝试转换后匹配(用户可能使用了注册后的名称)
normalized = tool_name.replace("-", "_").replace(".", "_")
if normalized in all_tools:
return normalized
# 尝试查找包含该名称的工具
for key in all_tools.keys():
if key.endswith(f"_{tool_name}") or key.endswith(f"_{normalized}"):
return key
return None
async def execute(
self,
chain: ToolChainDefinition,
input_args: Dict[str, Any],
) -> ChainExecutionResult:
"""执行工具链
Args:
chain: 工具链定义
input_args: 用户输入的参数
Returns:
ChainExecutionResult: 执行结果
"""
start_time = time.time()
step_results = []
context = {
"input": input_args or {}, # 用户输入,确保不为 None
"step": {}, # 各步骤输出,按 output_key 存储
"prev": "", # 上一步的输出
}
final_output = ""
# 验证必需的输入参数
missing_params = []
for param_name in chain.input_params.keys():
if param_name not in context["input"]:
missing_params.append(param_name)
if missing_params:
return ChainExecutionResult(
success=False,
final_output="",
error=f"缺少必需参数: {', '.join(missing_params)}",
total_duration_ms=(time.time() - start_time) * 1000,
)
for i, step in enumerate(chain.steps):
step_start = time.time()
step_result = {
"step_index": i,
"tool_name": step.tool_name,
"success": False,
"output": "",
"error": "",
"duration_ms": 0,
}
try:
# 替换参数中的变量
resolved_args = self._resolve_args(step.args_template, context)
step_result["resolved_args"] = resolved_args
# 解析工具名
tool_key = self._resolve_tool_key(step.tool_name)
if not tool_key:
step_result["error"] = f"工具 {step.tool_name} 不存在"
logger.warning(f"工具链步骤 {i + 1}: 工具 {step.tool_name} 不存在")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1}: 工具 {step.tool_name} 不存在",
total_duration_ms=(time.time() - start_time) * 1000,
)
step_results.append(step_result)
continue
logger.debug(f"工具链步骤 {i + 1}: 调用 {tool_key},参数: {resolved_args}")
# 调用工具
result = await self._mcp_manager.call_tool(tool_key, resolved_args)
step_duration = (time.time() - step_start) * 1000
step_result["duration_ms"] = step_duration
if result.success:
step_result["success"] = True
# 确保 content 不为 None
content = result.content if result.content is not None else ""
step_result["output"] = content
# 更新上下文
context["prev"] = content
if step.output_key:
context["step"][step.output_key] = content
final_output = content
content_preview = content[:100] if content else "(空)"
logger.debug(f"工具链步骤 {i + 1} 成功: {content_preview}...")
else:
step_result["error"] = result.error or "未知错误"
logger.warning(f"工具链步骤 {i + 1} 失败: {result.error}")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1} ({step.tool_name}) 失败: {result.error}",
total_duration_ms=(time.time() - start_time) * 1000,
)
except Exception as e:
step_duration = (time.time() - step_start) * 1000
step_result["duration_ms"] = step_duration
step_result["error"] = str(e)
logger.error(f"工具链步骤 {i + 1} 异常: {e}")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1} ({step.tool_name}) 异常: {e}",
total_duration_ms=(time.time() - start_time) * 1000,
)
step_results.append(step_result)
total_duration = (time.time() - start_time) * 1000
return ChainExecutionResult(
success=True,
final_output=final_output,
step_results=step_results,
total_duration_ms=total_duration,
)
def _resolve_args(self, args_template: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""解析参数模板,替换变量
支持的变量格式:
- ${input.param_name}: 用户输入的参数
- ${step.output_key}: 某个步骤的输出
- ${prev}: 上一步的输出
- ${prev.field}: 上一步输出JSON的某个字段
"""
resolved = {}
for key, value in args_template.items():
if isinstance(value, str):
resolved[key] = self._substitute_vars(value, context)
elif isinstance(value, dict):
resolved[key] = self._resolve_args(value, context)
elif isinstance(value, list):
resolved[key] = [self._substitute_vars(v, context) if isinstance(v, str) else v for v in value]
else:
resolved[key] = value
return resolved
def _substitute_vars(self, template: str, context: Dict[str, Any]) -> str:
"""替换字符串中的变量"""
def replacer(match):
var_path = match.group(1)
return self._get_var_value(var_path, context)
return self.VAR_PATTERN.sub(replacer, template)
def _get_var_value(self, var_path: str, context: Dict[str, Any]) -> str:
"""获取变量值
Args:
var_path: 变量路径,如 "input.query", "step.search_result", "prev", "prev.id"
context: 上下文
"""
parts = self._parse_var_path(var_path)
if not parts:
return ""
# 获取根对象
root = parts[0]
if root not in context:
logger.warning(f"变量 {var_path} 的根 '{root}' 不存在")
return ""
value = context[root]
# 遍历路径
for part in parts[1:]:
if isinstance(value, str):
parsed = self._try_parse_json(value)
if parsed is not None:
value = parsed
if isinstance(value, dict):
value = value.get(part, "")
elif isinstance(value, list):
if part.isdigit():
idx = int(part)
value = value[idx] if 0 <= idx < len(value) else ""
else:
value = ""
else:
value = ""
# 确保返回字符串
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
if value is None:
return ""
if value == "":
return ""
return str(value)
def _try_parse_json(self, value: str) -> Optional[Any]:
"""尝试将字符串解析为 JSON 对象,失败则返回 None。"""
if not value:
return None
try:
return json.loads(value)
except json.JSONDecodeError:
return None
def _parse_var_path(self, var_path: str) -> List[str]:
"""解析变量路径,支持点号与下标写法。
支持:
- step.geo.return.0.location
- step.geo.return[0].location
- step.geo['return'][0]['location']
"""
if not var_path:
return []
tokens: List[str] = []
buf: List[str] = []
in_bracket = False
in_quote = False
quote_char = ""
def flush_buf() -> None:
if buf:
token = "".join(buf).strip()
if token:
tokens.append(token)
buf.clear()
i = 0
while i < len(var_path):
ch = var_path[i]
if not in_bracket and ch == ".":
flush_buf()
i += 1
continue
if not in_bracket and ch == "[":
flush_buf()
in_bracket = True
in_quote = False
quote_char = ""
i += 1
continue
if in_bracket and not in_quote and ch == "]":
flush_buf()
in_bracket = False
i += 1
continue
if in_bracket and ch in ("'", '"'):
if not in_quote:
in_quote = True
quote_char = ch
i += 1
continue
if quote_char == ch:
in_quote = False
quote_char = ""
i += 1
continue
if in_bracket and not in_quote:
if ch.isspace():
i += 1
continue
if ch == ",":
i += 1
continue
buf.append(ch)
i += 1
flush_buf()
if in_bracket or in_quote:
return [p for p in var_path.split(".") if p]
return tokens
class ToolChainManager:
"""工具链管理器"""
_instance: Optional["ToolChainManager"] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._chains: Dict[str, ToolChainDefinition] = {}
self._executor: Optional[ToolChainExecutor] = None
def set_executor(self, mcp_manager) -> None:
"""设置执行器"""
self._executor = ToolChainExecutor(mcp_manager)
def add_chain(self, chain: ToolChainDefinition) -> bool:
"""添加工具链"""
if not chain.name:
logger.error("工具链名称不能为空")
return False
if chain.name in self._chains:
logger.warning(f"工具链 {chain.name} 已存在,将被覆盖")
self._chains[chain.name] = chain
logger.info(f"已添加工具链: {chain.name} ({len(chain.steps)} 个步骤)")
return True
def remove_chain(self, name: str) -> bool:
"""移除工具链"""
if name in self._chains:
del self._chains[name]
logger.info(f"已移除工具链: {name}")
return True
return False
def get_chain(self, name: str) -> Optional[ToolChainDefinition]:
"""获取工具链"""
return self._chains.get(name)
def get_all_chains(self) -> Dict[str, ToolChainDefinition]:
"""获取所有工具链"""
return self._chains.copy()
def get_enabled_chains(self) -> Dict[str, ToolChainDefinition]:
"""获取所有启用的工具链"""
return {name: chain for name, chain in self._chains.items() if chain.enabled}
async def execute_chain(
self,
chain_name: str,
input_args: Dict[str, Any],
) -> ChainExecutionResult:
"""执行工具链"""
chain = self._chains.get(chain_name)
if not chain:
return ChainExecutionResult(
success=False,
final_output="",
error=f"工具链 {chain_name} 不存在",
)
if not chain.enabled:
return ChainExecutionResult(
success=False,
final_output="",
error=f"工具链 {chain_name} 已禁用",
)
if not self._executor:
return ChainExecutionResult(
success=False,
final_output="",
error="工具链执行器未初始化",
)
return await self._executor.execute(chain, input_args)
def load_from_json(self, json_str: str) -> Tuple[int, List[str]]:
"""从 JSON 字符串加载工具链配置
Returns:
(成功加载数量, 错误列表)
"""
errors = []
loaded = 0
try:
data = json.loads(json_str) if json_str.strip() else []
except json.JSONDecodeError as e:
return 0, [f"JSON 解析失败: {e}"]
if not isinstance(data, list):
data = [data]
for i, item in enumerate(data):
try:
chain = ToolChainDefinition.from_dict(item)
if not chain.name:
errors.append(f"{i + 1} 个工具链缺少名称")
continue
if not chain.steps:
errors.append(f"工具链 {chain.name} 没有步骤")
continue
self.add_chain(chain)
loaded += 1
except Exception as e:
errors.append(f"{i + 1} 个工具链解析失败: {e}")
return loaded, errors
def export_to_json(self, pretty: bool = True) -> str:
"""导出所有工具链为 JSON"""
chains_data = [chain.to_dict() for chain in self._chains.values()]
if pretty:
return json.dumps(chains_data, ensure_ascii=False, indent=2)
return json.dumps(chains_data, ensure_ascii=False)
def clear(self) -> None:
"""清空所有工具链"""
self._chains.clear()
# 全局工具链管理器实例
tool_chain_manager = ToolChainManager()

View File

@@ -1,5 +0,0 @@
{action_name}
动作描述:{action_description}
使用条件{parallel_text}
{action_require}
{{"action":"{action_name}",{action_parameters}, "target_message_id":"消息id(m+数字)"}}

View File

@@ -1 +0,0 @@
你正在qq群里聊天下面是群里正在聊的内容:

View File

@@ -1 +0,0 @@
正在群里聊天

View File

@@ -1 +0,0 @@
你正在和{sender_name}聊天,这是你们之前聊的内容:

View File

@@ -1 +0,0 @@
和{sender_name}聊天

View File

@@ -1,10 +0,0 @@
你是一个专门获取知识的助手。你的名字是{bot_name}。现在是{time_now}。
群里正在进行的聊天内容:
{chat_history}
现在,{sender}发送了内容:{target_message},你想要回复ta。
请仔细分析聊天内容,考虑以下几点:
1. 内容中是否包含需要查询信息的问题
2. 是否有明确的知识获取指令
If you need to use the search tool, please directly call the function "lpmm_search_knowledge". If you do not need to use any tool, simply output "No tool needed".

View File

@@ -1,14 +1,10 @@
的任务是根据内部想法生成一条对用户可见的自然回复。
正在qq群里聊天下面是群里正在聊的内容其中包含聊天记录和聊天中的图片
其中标注 {bot_name}(你) 的发言是你自己的发言,请注意区分:
【参考信息】
{bot_name}的人设:{identity}
回复风格要求:{reply_style}
【参考信息结束】
{time_block}
你正在群里聊天,现在请你读读之前的聊天记录,然后给出日常且口语化的回复,
尽量简短一些。
没必要刻意友好回复,符合你的人格就行。没必要刻意友好回复,符合你的人格就行。没必要刻意友好回复,符合你的人格就行
请注意把握聊天内容,不要回复的太有条理。
你的风格平淡但不失讽刺不过分兴奋很简短。可以参考贴吧知乎和微博的回复风格。很平淡和白话不浮夸不长篇大论b站评论风格但一定注意不要过分修辞和复杂句。
请注意不要输出多余内容(包括不必要的前后缀冒号括号表情包at或 @等 ),只输出发言内容就好。
最好一次对一个话题进行回复,免得啰嗦或者回复内容太乱。
{identity}
你正在群里聊天,现在请你读读之前的聊天记录,把握当前的话题,然后给出日常且口语化的回复,
尽量简短一些。最好一次对一个话题进行回复,免得啰嗦或者回复内容太乱。请注意把握聊天内容
{reply_style}
请注意不要输出多余内容(包括不必要的前后缀冒号括号表情包at或 @等 ),只输出发言内容就好。

View File

@@ -1,14 +0,0 @@
{knowledge_prompt}{tool_info_block}{extra_info_block}
{expression_habits_block}{memory_retrieval}{jargon_explanation}
你正在和{sender_name}聊天,这是你们之前聊的内容:
{time_block}
{dialogue_prompt}
你现在想补充说明你刚刚自己的发言内容:{target},原因是{reason}
请你根据聊天内容,组织一条新回复。注意,{target} 是刚刚你自己的发言,你要在这基础上进一步发言,请按照你自己的角度来继续进行回复。注意保持上下文的连贯性。
{identity}
{chat_prompt}尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。
{reply_style}
请注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
{moderation_prompt}不要输出多余内容(包括冒号和引号括号表情包at或 @等 )。

View File

@@ -1,18 +0,0 @@
{knowledge_prompt}{tool_info_block}{extra_info_block}
{expression_habits_block}{memory_retrieval}{jargon_explanation}
你正在qq群里聊天下面是群里正在聊的内容其中包含聊天记录和聊天中的图片
其中标注 {bot_name}(你) 的发言是你自己的发言,请注意区分:
{time_block}
{dialogue_prompt}
{reply_target_block}。
{planner_reasoning}
{identity}
{chat_prompt}你正在群里聊天,现在请你读读之前的聊天记录,然后给出日常且口语化的回复,
尽量简短一些。{keywords_reaction_prompt}
请注意把握聊天内容,不要回复的太有条理。
{reply_style}
请注意不要输出多余内容(包括不必要的前后缀冒号括号表情包at或 @等 ),只输出发言内容就好。
最好一次对一个话题进行回复,免得啰嗦或者回复内容太乱。
现在,你说:

View File

@@ -1,11 +0,0 @@
你是一个专门执行工具的助手。你的名字是{bot_name}。现在是{time_now}。
群里正在进行的聊天内容:
{chat_history}
现在,{sender}发送了内容:{target_message},你想要回复ta。
请仔细分析聊天内容,考虑以下几点:
1. 内容中是否包含需要查询信息的问题
2. 是否有明确的工具使用指令
你可以选择多个动作
If you need to use tools, please directly call the corresponding tool function. If you do not need to use any tool, simply output "No tool needed".

View File

@@ -8,7 +8,6 @@ import time
from sqlmodel import select
from src.chat.message_receive.chat_manager import BotChatSession
from src.chat.message_receive.message import SessionMessage
from src.common.database.database import get_db_session
from src.common.database.database_model import Expression
from src.common.data_models.reply_generation_data_models import (
@@ -22,15 +21,11 @@ from src.config.config import global_config
from src.core.types import ActionInfo
from src.services.llm_service import LLMServiceClient
from src.maisaka.message_adapter import (
get_message_kind,
get_message_role,
get_message_source,
get_message_text,
parse_speaker_content,
)
from src.chat.message_receive.message import SessionMessage
from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, ReferenceMessage, SessionBackedMessage, ToolResultMessage
from src.maisaka.message_adapter import parse_speaker_content
logger = get_logger("maisaka_replyer")
logger = get_logger("replyer")
@dataclass
@@ -96,16 +91,16 @@ class MaisakaReplyGenerator:
return normalized
@staticmethod
def _format_message_time(message: SessionMessage) -> str:
def _format_message_time(message: LLMContextMessage) -> str:
return message.timestamp.strftime("%H:%M:%S")
@staticmethod
def _extract_visible_assistant_reply(message: SessionMessage) -> str:
def _extract_visible_assistant_reply(message: AssistantMessage) -> str:
del message
return ""
def _extract_guided_bot_reply(self, message: SessionMessage) -> str:
speaker_name, body = parse_speaker_content(get_message_text(message).strip())
def _extract_guided_bot_reply(self, message: SessionBackedMessage) -> str:
speaker_name, body = parse_speaker_content(message.processed_plain_text.strip())
bot_nickname = global_config.bot.nickname.strip() or "Bot"
if speaker_name == bot_nickname:
return self._normalize_content(body.strip())
@@ -134,25 +129,24 @@ class MaisakaReplyGenerator:
return segments
def _format_chat_history(self, messages: List[SessionMessage]) -> str:
def _format_chat_history(self, messages: List[LLMContextMessage]) -> str:
"""格式化 replyer 使用的可见聊天记录。"""
bot_nickname = global_config.bot.nickname.strip() or "Bot"
parts: List[str] = []
for message in messages:
role = get_message_role(message)
timestamp = self._format_message_time(message)
if get_message_source(message) == "user_reference":
if isinstance(message, (ReferenceMessage, ToolResultMessage)):
continue
if role == "user":
if isinstance(message, SessionBackedMessage):
guided_reply = self._extract_guided_bot_reply(message)
if guided_reply:
parts.append(f"{timestamp} {bot_nickname}(you): {guided_reply}")
continue
raw_content = get_message_text(message)
raw_content = message.processed_plain_text
for speaker_name, content_body in self._split_user_message_segments(raw_content):
content = self._normalize_content(content_body)
if not content:
@@ -161,7 +155,7 @@ class MaisakaReplyGenerator:
parts.append(f"{timestamp} {visible_speaker}: {content}")
continue
if role == "assistant":
if isinstance(message, AssistantMessage):
visible_reply = self._extract_visible_assistant_reply(message)
if visible_reply:
parts.append(f"{timestamp} {bot_nickname}(you): {visible_reply}")
@@ -170,7 +164,7 @@ class MaisakaReplyGenerator:
def _build_prompt(
self,
chat_history: List[SessionMessage],
chat_history: List[LLMContextMessage],
reply_reason: str,
expression_habits: str = "",
) -> str:
@@ -182,6 +176,7 @@ class MaisakaReplyGenerator:
system_prompt = load_prompt(
"maidairy_replyer",
bot_name=global_config.bot.nickname,
time_block=f"当前时间:{current_time}",
identity=self._personality_prompt,
reply_style=global_config.personality.reply_style,
)
@@ -214,7 +209,7 @@ class MaisakaReplyGenerator:
async def _build_reply_context(
self,
chat_history: List[SessionMessage],
chat_history: List[LLMContextMessage],
reply_message: Optional[SessionMessage],
reply_reason: str,
stream_id: Optional[str],
@@ -239,7 +234,7 @@ class MaisakaReplyGenerator:
def _build_expression_habits(
self,
session_id: str,
chat_history: List[SessionMessage],
chat_history: List[LLMContextMessage],
reply_message: Optional[SessionMessage],
reply_reason: str,
) -> tuple[str, List[int]]:
@@ -301,7 +296,7 @@ class MaisakaReplyGenerator:
think_level: int = 1,
unknown_words: Optional[List[str]] = None,
log_reply: bool = True,
chat_history: Optional[List[SessionMessage]] = None,
chat_history: Optional[List[LLMContextMessage]] = None,
expression_habits: str = "",
selected_expression_ids: Optional[List[int]] = None,
) -> Tuple[bool, ReplyGenerationResult]:
@@ -330,9 +325,7 @@ class MaisakaReplyGenerator:
filtered_history = [
message
for message in chat_history
if get_message_role(message) != "system"
and get_message_kind(message) != "perception"
and get_message_source(message) != "user_reference"
if not isinstance(message, (ReferenceMessage, ToolResultMessage))
]
logger.debug(f"Maisaka replyer: filtered_history size={len(filtered_history)}")

View File

@@ -23,7 +23,13 @@ from src.config.config import config_manager, global_config
from src.mcp_module import MCPManager
from src.maisaka.chat_loop_service import MaisakaChatLoopService
from src.maisaka.message_adapter import build_message, format_speaker_content, remove_last_perception
from src.maisaka.context_messages import (
AssistantMessage,
LLMContextMessage,
SessionBackedMessage,
ToolResultMessage,
)
from src.maisaka.message_adapter import format_speaker_content
from src.maisaka.tool_handlers import (
ToolHandlerContext,
handle_mcp_tool,
@@ -43,7 +49,7 @@ class BufferCLI:
self._chat_loop_service: Optional[MaisakaChatLoopService] = None
self._reply_generator = MaisakaReplyGenerator()
self._reader = InputReader()
self._chat_history: Optional[list[SessionMessage]] = None
self._chat_history: Optional[list[LLMContextMessage]] = None
self._knowledge_store = get_knowledge_store()
self._knowledge_learner = KnowledgeLearner("maisaka_cli")
self._knowledge_min_messages_for_extraction = 10
@@ -118,22 +124,78 @@ class BufferCLI:
self._chat_start_time = now
self._last_assistant_response_time = None
self._chat_history = self._chat_loop_service.build_chat_context(user_text)
self._trigger_knowledge_learning([self._chat_history[-1]])
self._trigger_knowledge_learning([self._build_cli_session_message(user_text, now)])
else:
self._chat_history.append(
build_message(
role="user",
content=format_speaker_content(
global_config.maisaka.user_name.strip() or "User",
user_text,
now,
),
self._build_cli_context_message(
user_text=user_text,
timestamp=now,
source_kind="user",
)
)
self._trigger_knowledge_learning([self._chat_history[-1]])
self._trigger_knowledge_learning([self._build_cli_session_message(user_text, now)])
await self._run_llm_loop(self._chat_history)
@staticmethod
def _build_cli_context_message(
user_text: str,
timestamp: datetime,
source_kind: str = "user",
speaker_name: Optional[str] = None,
) -> SessionBackedMessage:
"""为 CLI 构造新的上下文消息。"""
resolved_speaker_name = speaker_name or global_config.maisaka.user_name.strip() or "User"
visible_text = format_speaker_content(
resolved_speaker_name,
user_text,
timestamp,
)
planner_prefix = (
f"[时间]{timestamp.strftime('%H:%M:%S')}\n"
f"[用户]{resolved_speaker_name}\n"
"[用户群昵称]\n"
"[msg_id]\n"
"[发言内容]"
)
from src.common.data_models.message_component_data_model import MessageSequence, TextComponent
return SessionBackedMessage(
raw_message=MessageSequence([TextComponent(f"{planner_prefix}{user_text}")]),
visible_text=visible_text,
timestamp=timestamp,
source_kind=source_kind,
)
@staticmethod
def _build_cli_session_message(user_text: str, timestamp: datetime) -> SessionMessage:
"""为 CLI 的知识学习构造兼容 SessionMessage。"""
from src.common.data_models.mai_message_data_model import MessageInfo, UserInfo
from src.common.data_models.message_component_data_model import MessageSequence
message = SessionMessage(message_id=f"maisaka_cli_{int(timestamp.timestamp() * 1000)}", timestamp=timestamp, platform="maisaka")
message.message_info = MessageInfo(
user_info=UserInfo(
user_id="maisaka_user",
user_nickname=global_config.maisaka.user_name.strip() or "User",
user_cardname=None,
),
group_info=None,
additional_config={},
)
message.session_id = "maisaka_cli"
message.raw_message = MessageSequence([])
visible_text = format_speaker_content(
global_config.maisaka.user_name.strip() or "User",
user_text,
timestamp,
)
message.raw_message.text(visible_text)
message.processed_plain_text = visible_text
message.display_message = visible_text
message.initialized = True
return message
def _trigger_knowledge_learning(self, messages: list[SessionMessage]) -> None:
"""在 CLI 会话中按批次触发 knowledge 学习。"""
if not global_config.maisaka.enable_knowledge_module:
@@ -161,7 +223,7 @@ class BufferCLI:
except Exception as exc:
console.print(f"[warning]Knowledge learning failed: {exc}[/warning]")
async def _run_llm_loop(self, chat_history: list[SessionMessage]) -> None:
async def _run_llm_loop(self, chat_history: list[LLMContextMessage]) -> None:
"""
Main inner loop for the Maisaka planner.
@@ -210,7 +272,8 @@ class BufferCLI:
)
)
remove_last_perception(chat_history)
if chat_history and isinstance(chat_history[-1], AssistantMessage) and chat_history[-1].source == "perception":
chat_history.pop()
perception_parts = []
if knowledge_analysis:
@@ -218,11 +281,10 @@ class BufferCLI:
if perception_parts:
chat_history.append(
build_message(
role="assistant",
AssistantMessage(
content="\n\n".join(perception_parts),
message_kind="perception",
source="assistant",
timestamp=datetime.now(),
source_kind="perception",
)
)
elif global_config.maisaka.show_thinking:
@@ -273,22 +335,19 @@ class BufferCLI:
elif tool_call.func_name == "reply":
reply = await self._generate_visible_reply(chat_history, response.content)
chat_history.append(
build_message(
role="tool",
ToolResultMessage(
content="Visible reply generated and recorded.",
source="tool",
timestamp=datetime.now(),
tool_call_id=tool_call.call_id,
tool_name=tool_call.func_name,
)
)
chat_history.append(
build_message(
role="user",
content=format_speaker_content(
global_config.bot.nickname.strip() or "MaiSaka",
reply,
datetime.now(),
),
source="guided_reply",
self._build_cli_context_message(
user_text=reply,
timestamp=datetime.now(),
source_kind="guided_reply",
speaker_name=global_config.bot.nickname.strip() or "MaiSaka",
)
)
@@ -296,11 +355,11 @@ class BufferCLI:
if global_config.maisaka.show_thinking:
console.print("[muted]No visible reply this round.[/muted]")
chat_history.append(
build_message(
role="tool",
ToolResultMessage(
content="No visible reply was sent for this round.",
source="tool",
timestamp=datetime.now(),
tool_call_id=tool_call.call_id,
tool_name=tool_call.func_name,
)
)
@@ -342,7 +401,7 @@ class BufferCLI:
)
)
async def _generate_visible_reply(self, chat_history: list[SessionMessage], latest_thought: str) -> str:
async def _generate_visible_reply(self, chat_history: list[LLMContextMessage], latest_thought: str) -> str:
"""根据最新思考生成并输出可见回复。"""
if not latest_thought:
return ""

View File

@@ -11,10 +11,11 @@ from src.chat.message_receive.message import SessionMessage
from src.chat.utils.utils import is_bot_self
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
from src.common.logger import get_logger
from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage, ToolResultMessage
from src.services.llm_service import LLMServiceClient
from src.know_u.knowledge_store import KNOWLEDGE_CATEGORIES, get_knowledge_store
from src.maisaka.message_adapter import get_message_role, get_message_text, parse_speaker_content
from src.maisaka.message_adapter import parse_speaker_content
logger = get_logger("maisaka_knowledge")
@@ -53,7 +54,7 @@ def extract_category_ids_from_result(result: str) -> List[str]:
async def retrieve_relevant_knowledge(
knowledge_analyzer: Any,
chat_history: List[SessionMessage],
chat_history: List[LLMContextMessage],
) -> str:
"""Retrieve formatted knowledge snippets relevant to the current chat history."""
store = get_knowledge_store()
@@ -156,14 +157,26 @@ class KnowledgeLearner:
"""
lines: List[str] = []
for message in self._messages_cache[-30:]:
if get_message_role(message) == "assistant":
continue
if get_message_role(message) == "tool":
continue
if is_bot_self(message.platform, message.message_info.user_info.user_id):
if isinstance(message, (AssistantMessage, ToolResultMessage)):
continue
if isinstance(message, SessionBackedMessage):
if message.original_message and is_bot_self(
message.original_message.platform,
message.original_message.message_info.user_info.user_id,
):
continue
raw_text = message.processed_plain_text.strip()
fallback_speaker = (
message.original_message.message_info.user_info.user_nickname
if message.original_message is not None
else "用户"
)
else:
if is_bot_self(message.platform, message.message_info.user_info.user_id):
continue
raw_text = message.processed_plain_text.strip()
fallback_speaker = message.message_info.user_info.user_nickname or "用户"
raw_text = get_message_text(message).strip()
if not raw_text:
continue
@@ -172,7 +185,7 @@ class KnowledgeLearner:
if not visible_text:
continue
speaker = speaker_name or message.message_info.user_info.user_nickname or "用户"
speaker = speaker_name or fallback_speaker
lines.append(f"{speaker}: {visible_text}")
return "\n".join(lines)

View File

@@ -3,6 +3,7 @@ from typing import Any, Callable, Coroutine, Generic, Tuple, TypeVar, cast
import asyncio
from src.common.logger import get_logger
from src.config.model_configs import ModelInfo
from .base_client import (
@@ -33,12 +34,14 @@ ProviderStreamResponseHandler = Callable[
ProviderResponseParser = Callable[[RawResponseT], Tuple[APIResponse, UsageTuple | None]]
"""Provider 专用非流式响应解析函数类型。"""
logger = get_logger("llm_adapter_base")
async def await_task_with_interrupt(
task: asyncio.Task[TaskResultT],
interrupt_flag: asyncio.Event | None,
*,
interval_seconds: float = 0.1,
interval_seconds: float = 0.02,
) -> TaskResultT:
"""在支持外部中断的前提下等待异步任务完成。
@@ -55,8 +58,11 @@ async def await_task_with_interrupt(
"""
from src.llm_models.exceptions import ReqAbortException
started_at = asyncio.get_running_loop().time()
while not task.done():
if interrupt_flag and interrupt_flag.is_set():
elapsed = asyncio.get_running_loop().time() - started_at
logger.info(f"LLM 请求检测到中断信号准备取消底层任务elapsed={elapsed:.3f}s")
task.cancel()
raise ReqAbortException("请求被外部信号中断")
await asyncio.sleep(interval_seconds)

View File

@@ -22,6 +22,7 @@ from src.llm_models.exceptions import (
EmptyResponseException,
ModelAttemptFailed,
NetworkConnectionError,
ReqAbortException,
RespNotOkException,
RespParseException,
)
@@ -326,16 +327,7 @@ class LLMOrchestrator:
del raise_when_empty
self._refresh_task_config()
start_time = time.time()
if self.request_type.startswith("maisaka_"):
logger.info(
f"LLMOrchestrator[{self.request_type}] 开始执行 generate_response_with_message_async "
f"(temperature={temperature}, max_tokens={max_tokens}, tools={len(tools or [])})"
)
if self.request_type.startswith("maisaka_"):
logger.info(
f"LLMOrchestrator[{self.request_type}] 正在根据 {len(tools or [])} 个工具构建内部工具选项"
)
tool_built = self._build_tool_options(tools)
if self.request_type.startswith("maisaka_"):
logger.info(f"LLMOrchestrator[{self.request_type}] 已构建 {len(tool_built or [])} 个内部工具选项")
@@ -777,6 +769,9 @@ class LLMOrchestrator:
)
await asyncio.sleep(api_provider.retry_interval)
except ReqAbortException:
raise
except Exception as e:
logger.error(traceback.format_exc())
@@ -881,6 +876,15 @@ class LLMOrchestrator:
self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty - 1)
return LLMExecutionResult(api_response=response, model_info=model_info)
except ReqAbortException as e:
total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty - 1)
if self.request_type.startswith("maisaka_"):
logger.info(
f"LLMOrchestrator[{self.request_type}] 模型 model={model_info.name} 的请求已被外部信号中断"
)
raise e
except ModelAttemptFailed as e:
last_exception = e.original_exception or e
logger.warning(f"模型 '{model_info.name}' 尝试失败,切换到下一个模型。原因: {e}")

View File

@@ -14,9 +14,9 @@ from rich.panel import Panel
from rich.pretty import Pretty
from rich.text import Text
from src.chat.message_receive.message import SessionMessage
from src.cli.console import console
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
from src.common.data_models.message_component_data_model import MessageSequence, TextComponent
from src.common.logger import get_logger
from src.common.prompt_i18n import load_prompt
from src.config.config import global_config
@@ -27,12 +27,8 @@ from src.llm_models.payload_content.tool_option import ToolCall, ToolDefinitionI
from src.services.llm_service import LLMServiceClient
from .builtin_tools import get_builtin_tools
from .message_adapter import (
build_message,
format_speaker_content,
get_message_role,
to_llm_message,
)
from .context_messages import AssistantMessage, LLMContextMessage, SessionBackedMessage
from .message_adapter import format_speaker_content
@dataclass(slots=True)
@@ -41,7 +37,7 @@ class ChatResponse:
content: Optional[str]
tool_calls: List[ToolCall]
raw_message: SessionMessage
raw_message: AssistantMessage
logger = get_logger("maisaka_chat_loop")
@@ -59,6 +55,7 @@ class MaisakaChatLoopService:
self._temperature = temperature
self._max_tokens = max_tokens
self._extra_tools: List[ToolOption] = []
self._interrupt_flag: asyncio.Event | None = None
self._prompts_loaded = False
self._prompt_load_lock = asyncio.Lock()
self._personality_prompt = self._build_personality_prompt()
@@ -117,18 +114,21 @@ class MaisakaChatLoopService:
def set_extra_tools(self, tools: List[ToolDefinitionInput]) -> None:
self._extra_tools = normalize_tool_options(tools) or []
def set_interrupt_flag(self, interrupt_flag: asyncio.Event | None) -> None:
"""设置当前 planner 请求使用的中断标记。"""
self._interrupt_flag = interrupt_flag
async def analyze_knowledge_need(
self,
chat_history: List[SessionMessage],
chat_history: List[LLMContextMessage],
categories_summary: str,
) -> List[str]:
"""分析当前对话是否需要检索知识库分类。"""
visible_history: List[str] = []
for message in chat_history[-8:]:
if not message.content:
if not message.processed_plain_text:
continue
role = getattr(message, "role", "")
visible_history.append(f"{role}: {message.content}")
visible_history.append(f"{message.role}: {message.processed_plain_text}")
if not visible_history or not categories_summary.strip():
return []
@@ -302,7 +302,7 @@ class MaisakaChatLoopService:
padding=(0, 1),
)
async def chat_loop_step(self, chat_history: List[SessionMessage]) -> ChatResponse:
async def chat_loop_step(self, chat_history: List[LLMContextMessage]) -> ChatResponse:
await self.ensure_chat_prompt_loaded()
selected_history, selection_reason = self._select_llm_context_messages(chat_history)
@@ -313,7 +313,7 @@ class MaisakaChatLoopService:
messages.append(system_msg.build())
for msg in selected_history:
llm_message = to_llm_message(msg)
llm_message = msg.to_llm_message()
if llm_message is not None:
messages.append(llm_message)
@@ -342,15 +342,24 @@ class MaisakaChatLoopService:
)
request_started_at = perf_counter()
logger.info(
"planner 请求开始: "
f"selected_history={len(selected_history)} "
f"llm_messages={len(built_messages)} "
f"tool_count={len(all_tools)} "
f"interrupt_enabled={self._interrupt_flag is not None}"
)
generation_result = await self._llm_chat.generate_response_with_messages(
message_factory=message_factory,
options=LLMGenerationOptions(
tool_options=all_tools if all_tools else None,
temperature=self._temperature,
max_tokens=self._max_tokens,
interrupt_flag=self._interrupt_flag,
),
)
_ = perf_counter() - request_started_at
request_elapsed = perf_counter() - request_started_at
logger.info(f"planner 请求完成elapsed={request_elapsed:.3f}s")
tool_call_summaries = [
{
@@ -365,11 +374,10 @@ class MaisakaChatLoopService:
f"tool_calls={tool_call_summaries}"
)
raw_message = build_message(
role=RoleType.Assistant.value,
raw_message = AssistantMessage(
content=generation_result.response or "",
source="assistant",
tool_calls=generation_result.tool_calls or None,
timestamp=datetime.now(),
tool_calls=generation_result.tool_calls or [],
)
return ChatResponse(
content=generation_result.response,
@@ -378,20 +386,19 @@ class MaisakaChatLoopService:
)
@staticmethod
def _select_llm_context_messages(chat_history: List[SessionMessage]) -> tuple[List[SessionMessage], str]:
def _select_llm_context_messages(chat_history: List[LLMContextMessage]) -> tuple[List[LLMContextMessage], str]:
"""选择真正发送给 LLM 的上下文消息。"""
max_context_size = max(1, int(global_config.chat.max_context_size))
counted_roles = {"user", "assistant"}
selected_indices: List[int] = []
counted_message_count = 0
for index in range(len(chat_history) - 1, -1, -1):
message = chat_history[index]
if to_llm_message(message) is None:
if message.to_llm_message() is None:
continue
selected_indices.append(index)
if get_message_role(message) in counted_roles:
if message.count_in_context:
counted_message_count += 1
if counted_message_count >= max_context_size:
break
@@ -410,15 +417,25 @@ class MaisakaChatLoopService:
)
@staticmethod
def build_chat_context(user_text: str) -> List[SessionMessage]:
def build_chat_context(user_text: str) -> List[LLMContextMessage]:
timestamp = datetime.now()
visible_text = format_speaker_content(
global_config.maisaka.user_name.strip() or "用户",
user_text,
timestamp,
)
planner_prefix = (
f"[时间]{timestamp.strftime('%H:%M:%S')}\n"
f"[用户]{global_config.maisaka.user_name.strip() or '用户'}\n"
"[用户群昵称]\n"
"[msg_id]\n"
"[发言内容]"
)
return [
build_message(
role=RoleType.User.value,
content=format_speaker_content(
global_config.maisaka.user_name.strip() or "用户",
user_text,
datetime.now(),
),
source="user",
SessionBackedMessage(
raw_message=MessageSequence([TextComponent(f"{planner_prefix}{user_text}")]),
visible_text=visible_text,
timestamp=timestamp,
source_kind="user",
)
]

View File

@@ -0,0 +1,275 @@
"""Maisaka 内部上下文消息抽象。"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from io import BytesIO
from typing import Optional
import base64
from PIL import Image as PILImage
from src.chat.message_receive.message import SessionMessage
from src.common.data_models.message_component_data_model import EmojiComponent, ImageComponent, MessageSequence, TextComponent
from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType
from src.llm_models.payload_content.tool_option import ToolCall
def _guess_image_format(image_bytes: bytes) -> Optional[str]:
if not image_bytes:
return None
try:
with PILImage.open(BytesIO(image_bytes)) as image:
return image.format.lower() if image.format else None
except Exception:
return None
def _build_message_from_sequence(
role: RoleType,
message_sequence: MessageSequence,
fallback_text: str,
*,
tool_call_id: Optional[str] = None,
tool_calls: Optional[list[ToolCall]] = None,
) -> Optional[Message]:
"""根据消息片段构造统一 LLM 消息。"""
builder = MessageBuilder().set_role(role)
if role == RoleType.Assistant and tool_calls:
builder.set_tool_calls(tool_calls)
if role == RoleType.Tool and tool_call_id:
builder.add_tool_call(tool_call_id)
has_content = False
for component in message_sequence.components:
if isinstance(component, TextComponent):
if component.text:
builder.add_text_content(component.text)
has_content = True
continue
if isinstance(component, (EmojiComponent, ImageComponent)):
image_format = _guess_image_format(component.binary_data)
if image_format and component.binary_data:
builder.add_image_content(image_format, base64.b64encode(component.binary_data).decode("utf-8"))
has_content = True
continue
if component.content:
builder.add_text_content(component.content)
has_content = True
if not has_content and fallback_text:
builder.add_text_content(fallback_text)
has_content = True
if not has_content and not (role == RoleType.Assistant and tool_calls):
return None
return builder.build()
class ReferenceMessageType(str, Enum):
"""参考消息类型。"""
CUSTOM = "custom"
JARGON = "jargon"
KNOWLEDGE = "knowledge"
MEMORY = "memory"
TOOL_HINT = "tool_hint"
class LLMContextMessage(ABC):
"""Maisaka 内部用于组织 LLM 上下文的统一消息抽象。"""
timestamp: datetime
@property
@abstractmethod
def role(self) -> str:
"""返回 LLM 消息角色。"""
@property
@abstractmethod
def processed_plain_text(self) -> str:
"""返回可读的纯文本内容。"""
@property
def count_in_context(self) -> bool:
"""是否占用普通 user/assistant 上下文窗口。"""
return True
@property
def source(self) -> str:
"""返回消息来源。"""
return self.__class__.__name__
@abstractmethod
def to_llm_message(self) -> Optional[Message]:
"""转换为统一 LLM 消息。"""
def consume_once(self) -> bool:
"""消费一次生命周期,返回是否继续保留。"""
return True
@dataclass(slots=True)
class SessionBackedMessage(LLMContextMessage):
"""真实会话上下文消息。"""
raw_message: MessageSequence
visible_text: str
timestamp: datetime
message_id: Optional[str] = None
original_message: Optional[SessionMessage] = None
source_kind: str = "user"
@property
def role(self) -> str:
return RoleType.User.value
@property
def processed_plain_text(self) -> str:
return self.visible_text
@property
def source(self) -> str:
return self.source_kind
def to_llm_message(self) -> Optional[Message]:
return _build_message_from_sequence(
RoleType.User,
self.raw_message,
self.processed_plain_text,
)
@classmethod
def from_session_message(
cls,
session_message: SessionMessage,
*,
raw_message: MessageSequence,
visible_text: str,
source_kind: str = "user",
) -> "SessionBackedMessage":
"""从真实 SessionMessage 构造上下文消息。"""
return cls(
raw_message=raw_message,
visible_text=visible_text,
timestamp=session_message.timestamp,
message_id=session_message.message_id,
original_message=session_message,
source_kind=source_kind,
)
@dataclass(slots=True)
class ReferenceMessage(LLMContextMessage):
"""参考消息。"""
content: str
timestamp: datetime
reference_type: ReferenceMessageType = ReferenceMessageType.CUSTOM
remaining_uses_value: Optional[int] = 1
display_prefix: str = "[参考消息]"
@property
def role(self) -> str:
return RoleType.User.value
@property
def processed_plain_text(self) -> str:
return f"{self.display_prefix}\n{self.content}".strip()
@property
def count_in_context(self) -> bool:
return False
@property
def source(self) -> str:
return self.reference_type.value
def to_llm_message(self) -> Optional[Message]:
message_sequence = MessageSequence([TextComponent(self.processed_plain_text)])
return _build_message_from_sequence(RoleType.User, message_sequence, self.processed_plain_text)
def consume_once(self) -> bool:
if self.remaining_uses_value is None:
return True
self.remaining_uses_value -= 1
return self.remaining_uses_value > 0
@dataclass(slots=True)
class AssistantMessage(LLMContextMessage):
"""内部 assistant 消息。"""
content: str
timestamp: datetime
tool_calls: list[ToolCall] = field(default_factory=list)
source_kind: str = "assistant"
@property
def role(self) -> str:
return RoleType.Assistant.value
@property
def processed_plain_text(self) -> str:
return self.content
@property
def count_in_context(self) -> bool:
return self.source_kind != "perception"
@property
def source(self) -> str:
return self.source_kind
def to_llm_message(self) -> Optional[Message]:
message_sequence = MessageSequence([])
if self.content:
message_sequence.text(self.content)
return _build_message_from_sequence(
RoleType.Assistant,
message_sequence,
self.content,
tool_calls=self.tool_calls or None,
)
@dataclass(slots=True)
class ToolResultMessage(LLMContextMessage):
"""工具返回结果消息。"""
content: str
timestamp: datetime
tool_call_id: str
tool_name: str = ""
success: bool = True
@property
def role(self) -> str:
return RoleType.Tool.value
@property
def processed_plain_text(self) -> str:
return self.content
@property
def count_in_context(self) -> bool:
return False
@property
def source(self) -> str:
return self.tool_name or "tool"
def to_llm_message(self) -> Optional[Message]:
message_sequence = MessageSequence([TextComponent(self.content)])
return _build_message_from_sequence(
RoleType.Tool,
message_sequence,
self.content,
tool_call_id=self.tool_call_id,
)

View File

@@ -1,148 +1,32 @@
"""
MaiSaka 内部消息适配器。
"""
"""Maisaka 文本与消息片段适配工具。"""
from copy import deepcopy
from datetime import datetime
from io import BytesIO
from typing import Optional
from uuid import uuid4
import base64
import re
from PIL import Image as PILImage
from src.chat.message_receive.message import SessionMessage
from src.common.data_models.mai_message_data_model import GroupInfo, MessageInfo, UserInfo
from src.common.data_models.message_component_data_model import EmojiComponent, ImageComponent, MessageSequence, TextComponent
from src.config.config import global_config
from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType
from src.llm_models.payload_content.tool_option import ToolCall
MAISAKA_PLATFORM = "maisaka"
MAISAKA_SESSION_ID = "maisaka_cli"
MESSAGE_KIND_KEY = "maisaka_message_kind"
SOURCE_KEY = "maisaka_source"
LLM_ROLE_KEY = "maisaka_llm_role"
TOOL_CALL_ID_KEY = "maisaka_tool_call_id"
TOOL_CALLS_KEY = "maisaka_tool_calls"
SPEAKER_PREFIX_PATTERN = re.compile(
r"^(?:(?P<timestamp>\d{2}:\d{2}:\d{2}))?(?:\[msg_id:(?P<message_id>[^\]]+)\])?\[(?P<speaker>[^\]]+)\](?P<content>.*)$",
re.DOTALL,
)
def _build_user_info_for_role(role: str) -> UserInfo:
if role == RoleType.User.value:
return UserInfo(
user_id="maisaka_user",
user_nickname=global_config.maisaka.user_name.strip() or "用户",
user_cardname=None,
)
if role == RoleType.Tool.value:
return UserInfo(user_id="maisaka_tool", user_nickname="tool", user_cardname=None)
return UserInfo(
user_id="maisaka_assistant",
user_nickname=global_config.bot.nickname.strip() or "MaiSaka",
user_cardname=None,
)
def _serialize_tool_call(tool_call: ToolCall) -> dict:
return {
"call_id": tool_call.call_id,
"func_name": tool_call.func_name,
"args": tool_call.args or {},
}
def _deserialize_tool_call(data: dict) -> ToolCall:
return ToolCall(
call_id=str(data.get("call_id", "")),
func_name=str(data.get("func_name", "")),
args=data.get("args", {}) or {},
)
def _ensure_message_id_in_speaker_content(content: str, message_id: str) -> str:
"""Ensure speaker-formatted visible text carries a msg_id marker."""
match = SPEAKER_PREFIX_PATTERN.match(content or "")
if not match:
return content
existing_message_id = match.group("message_id")
if existing_message_id:
return content
timestamp_text = match.group("timestamp")
speaker_name = match.group("speaker")
visible_content = match.group("content")
timestamp = datetime.strptime(timestamp_text, "%H:%M:%S") if timestamp_text else None
return format_speaker_content(speaker_name, visible_content, timestamp, message_id)
def build_message(
role: str,
content: str = "",
*,
message_kind: str = "normal",
source: Optional[str] = None,
tool_call_id: Optional[str] = None,
tool_calls: Optional[list[ToolCall]] = None,
timestamp: Optional[datetime] = None,
message_id: Optional[str] = None,
platform: str = MAISAKA_PLATFORM,
session_id: str = MAISAKA_SESSION_ID,
user_info: Optional[UserInfo] = None,
group_info: Optional[GroupInfo] = None,
raw_message: Optional[MessageSequence] = None,
display_text: Optional[str] = None,
) -> SessionMessage:
"""为 MaiSaka 会话历史构建内部 ``SessionMessage``。"""
resolved_timestamp = timestamp or datetime.now()
resolved_role = role.value if isinstance(role, RoleType) else role
message = SessionMessage(
message_id=message_id or f"maisaka_{uuid4().hex}",
timestamp=resolved_timestamp,
platform=platform,
)
normalized_content = _ensure_message_id_in_speaker_content(content, message.message_id) if content else content
message.message_info = MessageInfo(
user_info=user_info or _build_user_info_for_role(resolved_role),
group_info=group_info,
additional_config={
LLM_ROLE_KEY: resolved_role,
MESSAGE_KIND_KEY: message_kind,
SOURCE_KEY: source or resolved_role,
TOOL_CALL_ID_KEY: tool_call_id,
TOOL_CALLS_KEY: [_serialize_tool_call(tool_call) for tool_call in (tool_calls or [])],
},
)
message.session_id = session_id
message.raw_message = raw_message if raw_message is not None else MessageSequence([])
if raw_message is None and normalized_content:
message.raw_message.text(normalized_content)
visible_text = display_text if display_text is not None else normalized_content
message.processed_plain_text = visible_text
message.display_message = visible_text
message.initialized = True
return message
def format_speaker_content(
speaker_name: str,
content: str,
timestamp: Optional[datetime] = None,
message_id: Optional[str] = None,
) -> str:
"""Format visible conversation content with an explicit speaker label."""
"""将可见文本格式化为带说话人前缀的样式。"""
time_prefix = timestamp.strftime("%H:%M:%S") if timestamp is not None else ""
message_id_prefix = f"[msg_id:{message_id}]" if message_id else ""
return f"{time_prefix}{message_id_prefix}[{speaker_name}]{content}"
def parse_speaker_content(content: str) -> tuple[Optional[str], str]:
"""Parse content formatted as [speaker]message."""
"""解析形如 [speaker]message 的可见文本。"""
match = SPEAKER_PREFIX_PATTERN.match(content or "")
if not match:
return None, content or ""
@@ -150,12 +34,12 @@ def parse_speaker_content(content: str) -> tuple[Optional[str], str]:
def clone_message_sequence(message_sequence: MessageSequence) -> MessageSequence:
"""Create a detached copy of a message sequence."""
"""复制消息片段序列。"""
return MessageSequence([deepcopy(component) for component in message_sequence.components])
def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str:
"""Extract visible text from a message sequence without forcing image descriptions."""
"""从消息片段序列提取可见文本。"""
parts: list[str] = []
for component in message_sequence.components:
if isinstance(component, TextComponent):
@@ -181,112 +65,5 @@ def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str:
if isinstance(component, ImageComponent):
parts.append("[图片]")
return "".join(parts)
def _guess_image_format(image_bytes: bytes) -> Optional[str]:
if not image_bytes:
return None
try:
with PILImage.open(BytesIO(image_bytes)) as image:
return image.format.lower() if image.format else None
except Exception:
return None
def get_message_text(message: SessionMessage) -> str:
if message.processed_plain_text is not None:
return message.processed_plain_text
if message.display_message is not None:
return message.display_message
parts: list[str] = []
for component in message.raw_message.components:
text = getattr(component, "text", None)
if isinstance(text, str):
parts.append(text)
return "".join(parts)
def get_message_role(message: SessionMessage) -> str:
return str(message.message_info.additional_config.get(LLM_ROLE_KEY, RoleType.User.value))
def get_message_kind(message: SessionMessage) -> str:
return str(message.message_info.additional_config.get(MESSAGE_KIND_KEY, "normal"))
def get_message_source(message: SessionMessage) -> str:
return str(message.message_info.additional_config.get(SOURCE_KEY, get_message_role(message)))
def is_perception_message(message: SessionMessage) -> bool:
return get_message_kind(message) == "perception"
def get_tool_call_id(message: SessionMessage) -> Optional[str]:
value = message.message_info.additional_config.get(TOOL_CALL_ID_KEY)
return str(value) if value else None
def get_tool_calls(message: SessionMessage) -> list[ToolCall]:
raw_tool_calls = message.message_info.additional_config.get(TOOL_CALLS_KEY, [])
if not isinstance(raw_tool_calls, list):
return []
return [_deserialize_tool_call(item) for item in raw_tool_calls if isinstance(item, dict)]
def remove_last_perception(messages: list[SessionMessage]) -> None:
for index in range(len(messages) - 1, -1, -1):
if is_perception_message(messages[index]):
messages.pop(index)
break
def to_llm_message(message: SessionMessage) -> Optional[Message]:
role = get_message_role(message)
tool_call_id = get_tool_call_id(message)
tool_calls = get_tool_calls(message)
if role == RoleType.System.value:
role_type = RoleType.System
elif role == RoleType.User.value:
role_type = RoleType.User
elif role == RoleType.Assistant.value:
role_type = RoleType.Assistant
elif role == RoleType.Tool.value:
role_type = RoleType.Tool
else:
return None
builder = MessageBuilder().set_role(role_type)
if role_type == RoleType.Assistant and tool_calls:
builder.set_tool_calls(tool_calls)
if role_type == RoleType.Tool and tool_call_id:
builder.add_tool_call(tool_call_id)
has_content = False
for component in message.raw_message.components:
if isinstance(component, TextComponent):
if component.text:
builder.add_text_content(component.text)
has_content = True
continue
if isinstance(component, (ImageComponent, EmojiComponent)):
image_format = _guess_image_format(component.binary_data)
if image_format and component.binary_data:
builder.add_image_content(image_format, base64.b64encode(component.binary_data).decode("utf-8"))
has_content = True
continue
if component.content:
builder.add_text_content(component.content)
has_content = True
if not has_content:
content = get_message_text(message)
if content:
builder.add_text_content(content)
return builder.build()

View File

@@ -6,33 +6,32 @@ from typing import TYPE_CHECKING, Optional
import asyncio
import difflib
import json
import re
import time
import traceback
from sqlmodel import select
from src.chat.heart_flow.heartFC_utils import CycleDetail
from src.chat.message_receive.message import SessionMessage
from src.chat.replyer.replyer_manager import replyer_manager
from src.chat.utils.utils import get_bot_account, process_llm_response
from src.common.database.database import get_db_session
from src.common.database.database_model import Jargon
from src.common.data_models.mai_message_data_model import UserInfo
from src.chat.utils.utils import process_llm_response
from src.common.data_models.message_component_data_model import MessageSequence, TextComponent
from src.common.logger import get_logger
from src.config.config import global_config
from src.learners.jargon_explainer import search_jargon
from src.llm_models.exceptions import ReqAbortException
from src.llm_models.payload_content.tool_option import ToolCall
from src.services import database_service as database_api, send_service
from .context_messages import (
AssistantMessage,
LLMContextMessage,
SessionBackedMessage,
ToolResultMessage,
)
from .message_adapter import (
build_message,
build_visible_text_from_sequence,
clone_message_sequence,
format_speaker_content,
get_message_source,
get_message_text,
get_message_role,
)
from .tool_handlers import (
handle_mcp_tool,
@@ -51,7 +50,6 @@ class MaisakaReasoningEngine:
def __init__(self, runtime: "MaisakaHeartFlowChatting") -> None:
self._runtime = runtime
self._last_reasoning_content: str = ""
self._shown_jargons: set[str] = set() # 已在参考消息中展示过的 jargon
async def run_loop(self) -> None:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
@@ -65,6 +63,7 @@ class MaisakaReasoningEngine:
self._runtime._agent_state = self._runtime._STATE_RUNNING
if cached_messages:
self._append_wait_interrupted_message_if_needed()
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
@@ -76,26 +75,35 @@ class MaisakaReasoningEngine:
self._runtime._internal_turn_queue.task_done()
continue
logger.info(f"{self._runtime.log_prefix} wait 超时后开始新一轮思考")
self._runtime._chat_history.append(self._build_wait_timeout_message(anchor_message))
self._runtime._chat_history.append(self._build_wait_timeout_message())
self._trim_chat_history()
try:
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
try:
# 每次LLM生成前动态添加参考消息到最新位置
reference_added = self._append_jargon_reference_message()
planner_started_at = time.time()
response = await self._runtime._chat_loop_service.chat_loop_step(self._runtime._chat_history)
logger.info(
f"{self._runtime.log_prefix} planner 开始: "
f"round={round_index + 1} "
f"history_size={len(self._runtime._chat_history)} "
f"started_at={planner_started_at:.3f}"
)
interrupt_flag = asyncio.Event()
self._runtime._planner_interrupt_flag = interrupt_flag
self._runtime._chat_loop_service.set_interrupt_flag(interrupt_flag)
try:
response = await self._runtime._chat_loop_service.chat_loop_step(self._runtime._chat_history)
finally:
if self._runtime._planner_interrupt_flag is interrupt_flag:
self._runtime._planner_interrupt_flag = None
self._runtime._chat_loop_service.set_interrupt_flag(None)
cycle_detail.time_records["planner"] = time.time() - planner_started_at
# LLM调用后移除刚才添加的参考消息一次性使用
if reference_added and self._runtime._chat_history:
# 从末尾往前查找并移除参考消息
for i in range(len(self._runtime._chat_history) - 1, -1, -1):
if get_message_source(self._runtime._chat_history[i]) == "user_reference":
self._runtime._chat_history.pop(i)
break
logger.info(
f"{self._runtime.log_prefix} planner 完成: "
f"round={round_index + 1} "
f"elapsed={cycle_detail.time_records['planner']:.3f}s"
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
@@ -104,9 +112,6 @@ class MaisakaReasoningEngine:
logger.info(f"{self._runtime.log_prefix} reasoning content replaced due to high similarity")
self._last_reasoning_content = reasoning_content
response.raw_message.platform = anchor_message.platform
response.raw_message.session_id = self._runtime.session_id
response.raw_message.message_info.group_info = self._runtime._build_group_info(anchor_message)
self._runtime._chat_history.append(response.raw_message)
if response.tool_calls:
@@ -124,6 +129,16 @@ class MaisakaReasoningEngine:
if response.content:
continue
break
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} planner 打断成功: "
f"round={round_index + 1} "
f"started_at={planner_started_at:.3f} "
f"interrupted_at={interrupted_at:.3f} "
f"elapsed={interrupted_at - planner_started_at:.3f}s"
)
break
finally:
self._end_cycle(cycle_detail)
@@ -136,6 +151,7 @@ class MaisakaReasoningEngine:
raise
except Exception:
logger.exception("%s Maisaka internal loop crashed", self._runtime.log_prefix)
logger.error(traceback.format_exc())
raise
def _get_timeout_anchor_message(self) -> Optional[SessionMessage]:
@@ -144,16 +160,31 @@ class MaisakaReasoningEngine:
return self._runtime.message_cache[-1]
return None
def _build_wait_timeout_message(self, anchor_message: SessionMessage) -> SessionMessage:
"""构造 wait 超时后的工具结果消息,用于触发下一轮思考"""
return build_message(
role="tool",
def _build_wait_timeout_message(self) -> ToolResultMessage:
"""构造 wait 超时后的工具结果消息。"""
tool_call_id = self._runtime._pending_wait_tool_call_id or "wait_timeout"
self._runtime._pending_wait_tool_call_id = None
return ToolResultMessage(
content="wait 已超时,期间没有收到新的用户输入。请基于现有上下文继续下一轮思考。",
source="tool",
platform=anchor_message.platform,
session_id=self._runtime.session_id,
group_info=self._runtime._build_group_info(anchor_message),
user_info=UserInfo(user_id="maisaka_tool", user_nickname="tool", user_cardname=None),
timestamp=datetime.now(),
tool_call_id=tool_call_id,
tool_name="wait",
)
def _append_wait_interrupted_message_if_needed(self) -> None:
"""如果 wait 被新消息打断,则补一条对应的工具结果消息。"""
tool_call_id = self._runtime._pending_wait_tool_call_id
if not tool_call_id:
return
self._runtime._pending_wait_tool_call_id = None
self._runtime._chat_history.append(
ToolResultMessage(
content="wait 被新的用户输入打断,已继续处理最新消息。",
timestamp=datetime.now(),
tool_call_id=tool_call_id,
tool_name="wait",
)
)
async def _ingest_messages(self, messages: list[SessionMessage]) -> None:
@@ -164,17 +195,11 @@ class MaisakaReasoningEngine:
if not user_sequence.components:
continue
history_message = build_message(
role="user",
content=visible_text,
source="user",
timestamp=message.timestamp,
platform=message.platform,
session_id=self._runtime.session_id,
group_info=self._runtime._build_group_info(message),
user_info=self._runtime._build_runtime_user_info(),
history_message = SessionBackedMessage.from_session_message(
message,
raw_message=user_sequence,
display_text=visible_text,
visible_text=visible_text,
source_kind="user",
)
self._insert_chat_history_message(history_message)
self._trim_chat_history()
@@ -239,141 +264,10 @@ class MaisakaReasoningEngine:
speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id
return format_speaker_content(speaker_name, content, message.timestamp, message.message_id).strip()
def _insert_chat_history_message(self, message: SessionMessage) -> int:
"""按时间顺序将消息插入聊天历史,同时保留 system 消息在最前"""
if not self._runtime._chat_history:
self._runtime._chat_history.append(message)
return 0
insert_at = len(self._runtime._chat_history)
for index, existing_message in enumerate(self._runtime._chat_history):
if get_message_role(existing_message) == "system":
continue
if existing_message.timestamp > message.timestamp:
insert_at = index
break
self._runtime._chat_history.insert(insert_at, message)
return insert_at
def _append_jargon_reference_message(self) -> bool:
"""每次LLM生成前如果命中了黑话词条则添加一条参考信息消息到聊天历史末尾。
Returns:
bool: 是否添加了参考消息
"""
content = self._build_user_history_corpus()
if not content:
return False
matched_words = self._find_jargon_words_in_text(content)
if not matched_words:
return False
# 记录已展示的 jargon
for word in matched_words:
self._shown_jargons.add(word.lower())
reference_text = (
"[参考信息]\n"
f"{','.join(matched_words)}可能是jargon可以使用query_jargon来查看其含义"
)
reference_sequence = MessageSequence([TextComponent(reference_text)])
# 使用当前时间作为时间戳
reference_message = build_message(
role="user",
content="",
source="user_reference",
timestamp=datetime.now(),
platform=self._runtime.chat_stream.platform,
session_id=self._runtime.session_id,
group_info=self._runtime._build_group_info(),
user_info=self._runtime._build_runtime_user_info(),
raw_message=reference_sequence,
display_text=reference_text,
)
self._runtime._chat_history.append(reference_message)
return True
def _build_user_history_corpus(self) -> str:
"""拼接当前聊天记录内所有用户消息的正文,用于统一匹配黑话。"""
parts: list[str] = []
for history_message in self._runtime._chat_history:
if get_message_role(history_message) != "user":
continue
if get_message_source(history_message) != "user":
continue
text = (get_message_text(history_message) or "").strip()
if not text:
continue
parts.append(text)
return "\n".join(parts)
def _find_jargon_words_in_text(self, content: str) -> list[str]:
"""匹配正文中出现的 jargon 词条。"""
lowered_content = content.lower()
matched_entries: list[tuple[int, int, int, str]] = []
seen_words: set[str] = set()
with get_db_session(auto_commit=False) as session:
query = (
select(Jargon)
.where(Jargon.is_jargon.is_(True))
.order_by(Jargon.count.desc()) # type: ignore[attr-defined]
)
jargons = session.exec(query).all()
for jargon in jargons:
jargon_content = str(jargon.content or "").strip()
if not jargon_content:
continue
# meaning 为空的不匹配
if not str(jargon.meaning or "").strip():
continue
normalized_content = jargon_content.lower()
if normalized_content in seen_words:
continue
# 跳过已经展示过的 jargon
if normalized_content in self._shown_jargons:
continue
if not self._is_visible_jargon(jargon):
continue
match_position = self._get_jargon_match_position(jargon_content, lowered_content, content)
if match_position is None:
continue
seen_words.add(normalized_content)
matched_entries.append((match_position, -len(jargon_content), -int(jargon.count or 0), jargon_content))
matched_entries.sort()
return [matched_content for _, _, _, matched_content in matched_entries[:8]]
def _is_visible_jargon(self, jargon: Jargon) -> bool:
"""判断当前会话是否可见该 jargon。"""
if global_config.expression.all_global_jargon or bool(jargon.is_global):
return True
try:
session_id_dict = json.loads(jargon.session_id_dict or "{}")
except (TypeError, json.JSONDecodeError):
logger.warning(f"Failed to parse jargon.session_id_dict: jargon_id={jargon.id}")
return False
return self._runtime.session_id in session_id_dict
@staticmethod
def _get_jargon_match_position(jargon_content: str, lowered_content: str, original_content: str) -> Optional[int]:
"""返回 jargon 在文本中的首次命中位置,未命中时返回 `None`。"""
if re.search(r"[\u4e00-\u9fff]", jargon_content):
match_index = original_content.lower().find(jargon_content.lower())
return match_index if match_index >= 0 else None
pattern = rf"\b{re.escape(jargon_content.lower())}\b"
match = re.search(pattern, lowered_content)
if match is None:
return None
return match.start()
def _insert_chat_history_message(self, message: LLMContextMessage) -> int:
"""将消息按处理顺序追加到聊天历史末尾"""
self._runtime._chat_history.append(message)
return len(self._runtime._chat_history) - 1
def _start_cycle(self) -> CycleDetail:
"""开始一轮 Maisaka 思考循环。"""
@@ -397,10 +291,7 @@ class MaisakaReasoningEngine:
def _trim_chat_history(self) -> None:
"""裁剪聊天历史,保证用户消息数量不超过配置限制。"""
counted_roles = {"user", "assistant"}
conversation_message_count = sum(
1 for message in self._runtime._chat_history if get_message_role(message) in counted_roles
)
conversation_message_count = sum(1 for message in self._runtime._chat_history if message.count_in_context)
if conversation_message_count <= self._runtime._max_context_size:
return
@@ -410,7 +301,7 @@ class MaisakaReasoningEngine:
while conversation_message_count >= self._runtime._max_context_size and trimmed_history:
removed_message = trimmed_history.pop(0)
removed_count += 1
if get_message_role(removed_message) in counted_roles:
if removed_message.count_in_context:
conversation_message_count -= 1
self._runtime._chat_history = trimmed_history
@@ -441,6 +332,11 @@ class MaisakaReasoningEngine:
bool: 是否需要替换
"""
if not self._last_reasoning_content or not current_content:
logger.info(
f"{self._runtime.log_prefix} reasoning similarity skipped: "
f"last_empty={not bool(self._last_reasoning_content)} "
f"current_empty={not bool(current_content)} similarity=0.00"
)
return False
similarity = self._calculate_similarity(current_content, self._last_reasoning_content)
@@ -495,13 +391,7 @@ class MaisakaReasoningEngine:
except (TypeError, ValueError):
wait_seconds = 30
wait_seconds = max(0, wait_seconds)
self._runtime._chat_history.append(
self._build_tool_message(
tool_call,
f"Waiting for future input for up to {wait_seconds} seconds.",
)
)
self._runtime._enter_wait_state(seconds=wait_seconds)
self._runtime._enter_wait_state(seconds=wait_seconds, tool_call_id=tool_call.call_id)
return True
if tool_call.func_name == "stop":
@@ -743,33 +633,27 @@ class MaisakaReasoningEngine:
tool_reasoning=latest_thought,
)
target_platform = target_message.platform or anchor_message.platform
bot_name = global_config.bot.nickname.strip() or "MaiSaka"
bot_user_info = UserInfo(
user_id=get_bot_account(target_platform) or "maisaka_assistant",
user_nickname=bot_name,
user_cardname=None,
reply_timestamp = datetime.now()
planner_prefix = (
f"[时间]{reply_timestamp.strftime('%H:%M:%S')}\n"
f"[用户]{bot_name}\n"
"[用户群昵称]\n"
"[msg_id]\n"
"[发言内容]"
)
history_message = build_message(
role="user",
content="",
source="guided_reply",
platform=target_platform,
session_id=self._runtime.session_id,
group_info=self._runtime._build_group_info(target_message),
user_info=bot_user_info,
)
history_message.raw_message = MessageSequence(
[TextComponent(f"{self._build_planner_user_prefix(history_message)}{combined_reply_text}")]
history_message = SessionBackedMessage(
raw_message=MessageSequence([TextComponent(f"{planner_prefix}{combined_reply_text}")]),
visible_text="",
timestamp=reply_timestamp,
source_kind="guided_reply",
)
visible_reply_text = format_speaker_content(
bot_name,
combined_reply_text,
history_message.timestamp,
history_message.message_id,
reply_timestamp,
)
history_message.display_message = visible_reply_text
history_message.processed_plain_text = visible_reply_text
history_message.visible_text = visible_reply_text
self._runtime._chat_history.append(history_message)
return True
@@ -871,14 +755,10 @@ class MaisakaReasoningEngine:
self._build_tool_message(tool_call, "Failed to send emoji.")
)
def _build_tool_message(self, tool_call: ToolCall, content: str) -> SessionMessage:
return build_message(
role="tool",
def _build_tool_message(self, tool_call: ToolCall, content: str) -> ToolResultMessage:
return ToolResultMessage(
content=content,
source="tool",
timestamp=datetime.now(),
tool_call_id=tool_call.call_id,
platform=self._runtime.chat_stream.platform,
session_id=self._runtime.session_id,
group_info=self._runtime._build_group_info(),
user_info=UserInfo(user_id="maisaka_tool", user_nickname="tool", user_cardname=None),
tool_name=tool_call.func_name,
)

View File

@@ -19,6 +19,7 @@ from src.learners.jargon_miner import JargonMiner
from src.mcp_module import MCPManager
from .chat_loop_service import MaisakaChatLoopService
from .context_messages import LLMContextMessage
from .reasoning_engine import MaisakaReasoningEngine
logger = get_logger("maisaka_runtime")
@@ -40,7 +41,7 @@ class MaisakaHeartFlowChatting:
session_name = chat_manager.get_session_name(session_id) or session_id
self.log_prefix = f"[{session_name}]"
self._chat_loop_service = MaisakaChatLoopService()
self._chat_history: list[SessionMessage] = []
self._chat_history: list[LLMContextMessage] = []
self.history_loop: list[CycleDetail] = []
# Keep all original messages for batching and later learning.
@@ -60,6 +61,8 @@ class MaisakaHeartFlowChatting:
self._max_context_size = max(1, int(global_config.chat.max_context_size))
self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP
self._wait_until: Optional[float] = None
self._pending_wait_tool_call_id: Optional[str] = None
self._planner_interrupt_flag: Optional[asyncio.Event] = None
expr_use, jargon_learn, expr_learn = ExpressionConfigUtils.get_expression_config_for_chat(session_id)
self._enable_expression_use = expr_use
@@ -78,14 +81,14 @@ class MaisakaHeartFlowChatting:
async def start(self) -> None:
"""Start the runtime loop."""
if self._running:
self._ensure_background_tasks_running()
return
if global_config.maisaka.enable_mcp:
await self._init_mcp()
self._running = True
self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop())
self._loop_task = asyncio.create_task(self._main_loop())
self._ensure_background_tasks_running()
logger.info(f"{self.log_prefix} Maisaka runtime started")
async def stop(self) -> None:
@@ -128,12 +131,48 @@ class MaisakaHeartFlowChatting:
async def register_message(self, message: SessionMessage) -> None:
"""Cache a new message and wake the main loop."""
if self._running:
self._ensure_background_tasks_running()
self.message_cache.append(message)
self._source_messages_by_id[message.message_id] = message
if self._agent_state == self._STATE_RUNNING and self._planner_interrupt_flag is not None:
logger.info(
f"{self.log_prefix} 收到新消息,发起 planner 打断; "
f"msg_id={message.message_id} cache_size={len(self.message_cache)} "
f"timestamp={time.time():.3f}"
)
self._planner_interrupt_flag.set()
if self._agent_state in (self._STATE_WAIT, self._STATE_STOP):
self._agent_state = self._STATE_RUNNING
self._new_message_event.set()
def _ensure_background_tasks_running(self) -> None:
"""确保后台任务仍在运行,若崩溃则自动拉起。"""
if not self._running:
return
if self._internal_loop_task is None or self._internal_loop_task.done():
if self._internal_loop_task is not None and not self._internal_loop_task.cancelled():
try:
exc = self._internal_loop_task.exception()
except Exception:
exc = None
if exc is not None:
logger.error(f"{self.log_prefix} internal loop task exited unexpectedly: {exc}")
self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop())
logger.warning(f"{self.log_prefix} restarted Maisaka internal loop task")
if self._loop_task is None or self._loop_task.done():
if self._loop_task is not None and not self._loop_task.cancelled():
try:
exc = self._loop_task.exception()
except Exception:
exc = None
if exc is not None:
logger.error(f"{self.log_prefix} main loop task exited unexpectedly: {exc}")
self._loop_task = asyncio.create_task(self._main_loop())
logger.warning(f"{self.log_prefix} restarted Maisaka main loop task")
async def _main_loop(self) -> None:
try:
while self._running:
@@ -222,15 +261,17 @@ class MaisakaHeartFlowChatting:
self._wait_until = None
return "timeout"
def _enter_wait_state(self, seconds: Optional[float] = None) -> None:
def _enter_wait_state(self, seconds: Optional[float] = None, tool_call_id: Optional[str] = None) -> None:
"""Enter wait state."""
self._agent_state = self._STATE_WAIT
self._wait_until = None if seconds is None else time.time() + seconds
self._pending_wait_tool_call_id = tool_call_id
def _enter_stop_state(self) -> None:
"""Enter stop state."""
self._agent_state = self._STATE_STOP
self._wait_until = None
self._pending_wait_tool_call_id = None
async def _trigger_batch_learning(self, messages: list[SessionMessage]) -> None:
"""按同一批消息触发表达方式、黑话和 knowledge 学习。"""

View File

@@ -9,12 +9,11 @@ import json as _json
from rich.panel import Panel
from src.chat.message_receive.message import SessionMessage
from src.cli.console import console
from src.cli.input_reader import InputReader
from src.llm_models.payload_content.tool_option import ToolCall
from .message_adapter import build_message
from .context_messages import LLMContextMessage, ToolResultMessage
if TYPE_CHECKING:
from src.mcp_module import MCPManager
@@ -33,22 +32,34 @@ class ToolHandlerContext:
self.last_user_input_time: Optional[datetime] = None
async def handle_stop(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
async def handle_stop(tc: ToolCall, chat_history: list[LLMContextMessage]) -> None:
"""处理 stop 工具。"""
console.print("[accent]调用工具: stop()[/accent]")
chat_history.append(
build_message(role="tool", content="当前轮次结束后将停止对话循环。", tool_call_id=tc.call_id)
ToolResultMessage(
content="当前轮次结束后将停止对话循环。",
timestamp=datetime.now(),
tool_call_id=tc.call_id,
tool_name=tc.func_name,
)
)
async def handle_wait(tc: ToolCall, chat_history: list[SessionMessage], ctx: ToolHandlerContext) -> str:
async def handle_wait(tc: ToolCall, chat_history: list[LLMContextMessage], ctx: ToolHandlerContext) -> str:
"""处理 wait 工具。"""
seconds = (tc.args or {}).get("seconds", 30)
seconds = max(5, min(seconds, 300))
console.print(f"[accent]调用工具: wait({seconds})[/accent]")
tool_result = await _do_wait(seconds, ctx)
chat_history.append(build_message(role="tool", content=tool_result, tool_call_id=tc.call_id))
chat_history.append(
ToolResultMessage(
content=tool_result,
timestamp=datetime.now(),
tool_call_id=tc.call_id,
tool_name=tc.func_name,
)
)
return tool_result
@@ -78,7 +89,7 @@ async def _do_wait(seconds: int, ctx: ToolHandlerContext) -> str:
return f"已收到用户输入: {user_input}"
async def handle_mcp_tool(tc: ToolCall, chat_history: list[SessionMessage], mcp_manager: "MCPManager") -> None:
async def handle_mcp_tool(tc: ToolCall, chat_history: list[LLMContextMessage], mcp_manager: "MCPManager") -> None:
"""处理 MCP 工具调用。"""
args_str = _json.dumps(tc.args or {}, ensure_ascii=False)
args_preview = args_str if len(args_str) <= 120 else args_str[:120] + "..."
@@ -96,10 +107,24 @@ async def handle_mcp_tool(tc: ToolCall, chat_history: list[SessionMessage], mcp_
padding=(0, 1),
)
)
chat_history.append(build_message(role="tool", content=result, tool_call_id=tc.call_id))
chat_history.append(
ToolResultMessage(
content=result,
timestamp=datetime.now(),
tool_call_id=tc.call_id,
tool_name=tc.func_name,
)
)
async def handle_unknown_tool(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
async def handle_unknown_tool(tc: ToolCall, chat_history: list[LLMContextMessage]) -> None:
"""处理未知工具调用。"""
console.print(f"[accent]调用未知工具: {tc.func_name}({tc.args})[/accent]")
chat_history.append(build_message(role="tool", content=f"未知工具: {tc.func_name}", tool_call_id=tc.call_id))
chat_history.append(
ToolResultMessage(
content=f"未知工具: {tc.func_name}",
timestamp=datetime.now(),
tool_call_id=tc.call_id,
tool_name=tc.func_name,
)
)