Merge remote-tracking branch 'upstream/r-dev' into sync/pr-1564-upstream-20260331

# Conflicts:
#	src/chat/brain_chat/PFC/conversation.py
#	src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py
#	src/chat/knowledge/lpmm_ops.py
This commit is contained in:
A-Dawn
2026-03-31 10:43:55 +08:00
179 changed files with 21829 additions and 20118 deletions

View File

@@ -1,30 +0,0 @@
# 运行时配置(包含用户敏感信息)
config.toml
# 备份文件
*.backup.*
*.bak
# 日志
logs/
*.log
*.jsonl
# Python 缓存
__pycache__/
*.py[cod]
*$py.class
*.so
# 本地测试脚本(仓库不提交)
test_*.py
# IDE
.idea/
.vscode/
*.swp
*.swo
# 系统文件
.DS_Store
Thumbs.db

View File

@@ -1,24 +0,0 @@
# Changelog
本文件记录 `MaiBot_MCPBridgePlugin` 的用户可感知变更。
## 2.0.0
- 配置入口统一MCP 服务器仅使用 Claude Desktop `mcpServers` JSON`servers.claude_config_json`
- 兼容迁移:自动识别旧版 `servers.list` 并迁移为 `mcpServers`(需在 WebUI 保存一次固化)
- 保持功能不变:保留 Workflow硬流程/工具链)与 ReAct软流程双轨制能力
- 精简实现:移除旧的 WebUI 导入导出/快速添加服务器实现与 `tomlkit` 依赖
- 易用性:完善 Workflow 变量替换(支持数组下标与 bracket 写法),并优化 WebUI 配置区顺序
## 1.9.0
- 双轨制架构ReAct软流程+ Workflow硬流程/工具链)
## 1.8.0
- Workflow工具链多工具顺序执行、变量替换、自定义 Workflow 并注册为组合工具
## 1.7.0
- 断路器模式、状态刷新、工具搜索等易用性增强

View File

@@ -1,356 +0,0 @@
# MCP 桥接插件开发文档
本文档面向开发者,介绍插件的架构设计、核心模块和扩展方式。
## 架构概览
```
MaiBot_MCPBridgePlugin/
├── plugin.py # 主插件文件,包含所有核心逻辑
├── mcp_client.py # MCP 客户端封装
├── tool_chain.py # 工具链Workflow模块
├── core/
│ └── claude_config.py # Claude Desktop mcpServers 解析/迁移
├── config.toml # 运行时配置
└── _manifest.json # 插件元数据
```
## 核心模块
### 1. MCP 客户端 (`mcp_client.py`)
封装了与 MCP 服务器的通信逻辑。
```python
from .mcp_client import mcp_manager, MCPServerConfig, TransportType
# 添加服务器
config = MCPServerConfig(
name="my-server",
transport=TransportType.STREAMABLE_HTTP,
url="https://mcp.example.com/mcp"
)
await mcp_manager.add_server(config)
# 调用工具
result = await mcp_manager.call_tool("server_tool_name", {"param": "value"})
if result.success:
print(result.content)
```
**支持的传输类型:**
- `STDIO`: 本地进程通信
- `SSE`: Server-Sent Events
- `HTTP`: HTTP 请求
- `STREAMABLE_HTTP`: 流式 HTTP推荐
### 2. 工具注册系统
MCP 工具通过动态类创建注册到 MaiBot
```python
# 创建工具代理类
class MCPToolProxy(BaseTool):
name = "mcp_server_tool"
description = "工具描述"
parameters = [("param", ToolParamType.STRING, "参数描述", True, None)]
available_for_llm = True
async def execute(self, function_args):
result = await mcp_manager.call_tool(self._mcp_tool_key, function_args)
return {"name": self.name, "content": result.content}
```
### 3. 工具链模块 (`tool_chain.py`)
实现 Workflow 硬流程,支持多工具顺序执行。
```python
from .tool_chain import ToolChainDefinition, ToolChainStep, tool_chain_manager
# 定义工具链
chain = ToolChainDefinition(
name="search_and_detail",
description="搜索并获取详情",
input_params={"query": "搜索关键词"},
steps=[
ToolChainStep(
tool_name="mcp_server_search",
args_template={"keyword": "${input.query}"},
output_key="search_result"
),
ToolChainStep(
tool_name="mcp_server_detail",
args_template={"id": "${prev}"}
)
]
)
# 注册并执行
tool_chain_manager.add_chain(chain)
result = await tool_chain_manager.execute_chain("search_and_detail", {"query": "test"})
```
**变量替换语法:**
- `${input.参数名}`: 用户输入
- `${step.输出键}`: 指定步骤的输出
- `${prev}`: 上一步输出
- `${prev.字段}`: 上一步输出JSON的字段
- `${step.geo.return.0.location}` / `${step.geo.return[0].location}`: 数组下标访问
- `${step.geo['return'][0]['location']}`: bracket 写法(最通用)
## 双轨制架构
### ReAct 软流程
将 MCP 工具注册到 MaiBot 的记忆检索 ReAct 系统LLM 自主决策调用。
```python
def _register_tools_to_react(self) -> int:
from src.memory_system.retrieval_tools import register_memory_retrieval_tool
def make_execute_func(tool_key: str):
async def execute_func(**kwargs) -> str:
result = await mcp_manager.call_tool(tool_key, kwargs)
return result.content if result.success else f"失败: {result.error}"
return execute_func
register_memory_retrieval_tool(
name="mcp_tool_name",
description="工具描述",
parameters=[{"name": "param", "type": "string", "required": True}],
execute_func=make_execute_func("tool_key")
)
```
### Workflow 硬流程
用户预定义的固定执行流程,注册为组合工具。
```python
def _register_tool_chains(self) -> None:
from src.plugin_system.core.component_registry import component_registry
for chain_name, chain in tool_chain_manager.get_enabled_chains().items():
info, tool_class = tool_chain_registry.register_chain(chain)
info.plugin_name = self.plugin_name
component_registry.register_component(info, tool_class)
```
## 配置系统
### MCP 服务器配置Claude Desktop 规范)
插件只接受 Claude Desktop 的 `mcpServers` JSON`core/claude_config.py`)。配置入口统一为:
- WebUI/配置文件:`[servers].claude_config_json`
- 命令:`/mcp import`(合并 `mcpServers`)与 `/mcp export`(导出当前 `mcpServers`
兼容迁移:
- 若检测到旧版 `servers.list`,会自动迁移为 `servers.claude_config_json`(仅迁移到内存配置,需 WebUI 保存一次固化)。
### WebUI 配置 Schema
使用 `ConfigField` 定义 WebUI 配置项:
```python
config_schema = {
"section_name": {
"field_name": ConfigField(
type=str, # 类型: str, bool, int, float
default="default_value", # 默认值
description="字段描述",
label="显示标签",
input_type="textarea", # 输入类型: text, textarea, password
rows=5, # textarea 行数
disabled=True, # 只读
choices=["a", "b"], # 下拉选项
hint="提示信息",
order=1, # 排序
),
},
}
```
### 配置读取
```python
# 在组件中读取配置
value = self.get_config("section.key", default="fallback")
# 在插件类中读取
value = self.config.get("section", {}).get("key", "default")
```
## 事件处理
### 启动事件
```python
class MCPStartupHandler(BaseEventHandler):
event_type = EventType.ON_START
handler_name = "mcp_startup"
async def execute(self, message):
global _plugin_instance
if _plugin_instance:
await _plugin_instance._async_connect_servers()
return (True, True, None, None, None)
```
### 停止事件
```python
class MCPStopHandler(BaseEventHandler):
event_type = EventType.ON_STOP
handler_name = "mcp_stop"
async def execute(self, message):
await mcp_manager.shutdown()
return (True, True, None, None, None)
```
## 命令系统
```python
class MCPStatusCommand(BaseCommand):
command_name = "mcp_status"
command_pattern = r"^/mcp(?:\s+(?P<action>\S+))?(?:\s+(?P<arg>.+))?$"
async def execute(self) -> Tuple[bool, str, bool]:
action = self.matched_groups.get("action", "")
arg = self.matched_groups.get("arg", "")
if action == "tools":
await self.send_text("工具列表...")
elif action == "reconnect":
await self._handle_reconnect(arg)
return (True, None, True) # (成功, 消息, 拦截)
```
## 高级功能
### 调用追踪
```python
from plugin import tool_call_tracer, ToolCallRecord
# 记录调用
record = ToolCallRecord(
call_id="xxx",
timestamp=time.time(),
tool_name="tool",
server_name="server",
arguments={"key": "value"},
success=True,
duration_ms=100.0
)
tool_call_tracer.record(record)
# 查询记录
recent = tool_call_tracer.get_recent(10)
by_tool = tool_call_tracer.get_by_tool("tool_name")
```
### 调用缓存
```python
from plugin import tool_call_cache
# 配置缓存
tool_call_cache.configure(
enabled=True,
ttl=300, # 秒
max_entries=200,
exclude_tools="mcp_*_time_*" # 排除模式
)
# 使用缓存
cached = tool_call_cache.get("tool_name", {"param": "value"})
if cached is None:
result = await call_tool(...)
tool_call_cache.set("tool_name", {"param": "value"}, result)
```
### 权限控制
```python
from plugin import permission_checker
# 配置权限
permission_checker.configure(
enabled=True,
default_mode="allow_all", # 或 "deny_all"
rules_json='[{"tool": "mcp_*_delete_*", "denied": ["qq:123:group"]}]',
quick_deny_groups="123456789",
quick_allow_users="111111111"
)
# 检查权限
allowed = permission_checker.check(
tool_name="mcp_server_delete",
chat_id="123456",
user_id="789",
is_group=True
)
```
### 断路器模式
MCP 客户端内置断路器,故障服务器快速失败:
- 连续失败 N 次后熔断
- 熔断期间直接返回错误
- 定期尝试恢复
## 扩展开发
### 添加新的传输类型
1.`mcp_client.py` 中添加 `TransportType` 枚举值
2. 实现对应的连接逻辑
3. 更新 `_create_transport()` 方法
### 添加新的工具类型
1. 继承 `BaseTool` 创建新类
2.`get_plugin_components()` 中注册
3. 实现 `execute()` 方法
### 添加新的命令
1.`MCPStatusCommand.execute()` 中添加新的 action 分支
2. 或创建新的 `BaseCommand` 子类
## 调试技巧
### 日志级别
```python
from src.common.logger import get_logger
logger = get_logger("mcp_bridge_plugin")
logger.debug("详细调试信息")
logger.info("一般信息")
logger.warning("警告")
logger.error("错误")
```
### 常用调试命令
```bash
/mcp # 查看状态
/mcp tools # 查看工具列表
/mcp trace # 查看调用记录
/mcp cache # 查看缓存状态
/mcp chain # 查看工具链
```
## 更新日志
`plugins/MaiBot_MCPBridgePlugin/CHANGELOG.md`
## 开发约定
- 本仓库不提交测试脚本/临时复现文件;如需本地验证,可自行在工作区创建未跟踪文件(建议放到 `.local/` 并加入 `.gitignore`)。

View File

@@ -1,357 +0,0 @@
# MCP 桥接插件
将 [MCP (Model Context Protocol)](https://modelcontextprotocol.io/) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具。
<img width="3012" height="1794" alt="image" src="https://github.com/user-attachments/assets/ece56404-301a-4abf-b16d-87bd430fc977" />
## 🚀 快速开始
### 1. 安装
```bash
# 克隆到 MaiBot 插件目录
cd /path/to/MaiBot/plugins
git clone https://github.com/CharTyr/MaiBot_MCPBridgePlugin.git MCPBridgePlugin
# 安装依赖
pip install mcp
# 复制配置文件
cd MCPBridgePlugin
cp config.example.toml config.toml
```
### 2. 添加服务器
编辑 `config.toml`,在 `[servers]``claude_config_json` 中填写 Claude Desktop 的 `mcpServers` JSON
```toml
[servers]
claude_config_json = '''
{
"mcpServers": {
"time": { "transport": "streamable_http", "url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time" },
"my-server": { "transport": "streamable_http", "url": "https://mcp.xxx.com/mcp", "headers": { "Authorization": "Bearer 你的密钥" } },
"fetch": { "command": "uvx", "args": ["mcp-server-fetch"] }
}
}
'''
```
### 3. 启动
重启 MaiBot或发送 `/mcp reconnect`
---
## 📚 去哪找 MCP 服务器?
| 平台 | 说明 |
|------|------|
| [mcp.modelscope.cn](https://mcp.modelscope.cn/) | 魔搭 ModelScope免费推荐 |
| [smithery.ai](https://smithery.ai/) | MCP 服务器注册中心 |
| [github.com/modelcontextprotocol/servers](https://github.com/modelcontextprotocol/servers) | 官方服务器列表 |
---
## 💡 常用命令
| 命令 | 说明 |
|------|------|
| `/mcp` | 查看连接状态 |
| `/mcp tools` | 查看可用工具 |
| `/mcp reconnect` | 重连服务器 |
| `/mcp trace` | 查看调用记录 |
| `/mcp cache` | 查看缓存状态 |
| `/mcp perm` | 查看权限配置 |
| `/mcp import <json>` | 🆕 导入 Claude Desktop 配置 |
| `/mcp export` | 🆕 导出配置 |
| `/mcp search <关键词>` | 🆕 搜索工具 |
| `/mcp chain` | 🆕 查看工具链 |
| `/mcp chain <名称>` | 🆕 查看工具链详情 |
| `/mcp chain test <名称> <参数>` | 🆕 测试执行工具链 |
---
## ✨ 功能特性
### 核心功能
- 🔌 多服务器同时连接
- 📡 支持 stdio / SSE / HTTP / Streamable HTTP
- 🔄 自动重试、心跳检测、断线重连
- 🖥️ WebUI 完整配置支持
### 双轨制架构
- 🔄 **ReAct软流程**LLM 自主决策,多轮动态调用 MCP 工具(适合探索式场景)
- 🔗 **Workflow硬流程/工具链)**:用户预定义步骤顺序与参数传递(适合可控可复用场景)
### 高级功能
- 📦 Resources 支持(实验性)
- 📝 Prompts 支持(实验性)
- 🔄 结果后处理LLM 摘要提炼)
- 🔍 调用追踪 / 🗄️ 调用缓存 / 🔐 权限控制 / 🚫 工具禁用
### 更新日志
-`plugins/MaiBot_MCPBridgePlugin/CHANGELOG.md`
---
## ⚙️ 配置说明
### 服务器配置
```json
{
"mcpServers": {
"server_name": {
"transport": "streamable_http",
"url": "https://..."
}
}
}
```
| 字段 | 说明 |
|------|------|
| `mcpServers.<name>` | 服务器名称(唯一) |
| `enabled` | 是否启用(可选,默认 true |
| `transport` | `stdio` / `sse` / `http` / `streamable_http` |
| `url` | 远程服务器地址 |
| `headers` | 🆕 鉴权头(如 `{"Authorization": "Bearer xxx"}` |
| `command` / `args` | 本地服务器启动命令 |
### 权限控制
**快捷配置(推荐):**
```toml
[permissions]
perm_enabled = true
quick_deny_groups = "123456789" # 禁用的群号
quick_allow_users = "111111111" # 管理员白名单
```
**高级规则:**
```json
[{"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}]
```
### 工具禁用
```toml
[tools]
disabled_tools = '''
mcp_filesystem_delete_file
mcp_filesystem_write_file
'''
```
### 调用缓存
```toml
[settings]
cache_enabled = true
cache_ttl = 300
cache_exclude_tools = "mcp_*_time_*"
```
---
## ❓ 常见问题
**Q: 工具没有注册?**
- 检查 `enabled = true`
- 检查 MaiBot 日志错误信息
- 确认 `pip install mcp`
**Q: JSON 格式报错?**
- 多行 JSON 用 `'''` 三引号包裹
- 使用英文双引号 `"`
**Q: 如何手动重连?**
- `/mcp reconnect``/mcp reconnect 服务器名`
---
## 📥 配置导入导出Claude mcpServers
### 从 Claude Desktop 导入
如果你已有 Claude Desktop 的 MCP 配置,可以直接导入:
```
/mcp import {"mcpServers":{"time":{"command":"uvx","args":["mcp-server-time"]},"fetch":{"command":"uvx","args":["mcp-server-fetch"]}}}
```
支持的格式:
- Claude Desktop 格式(`mcpServers` 对象)
- 兼容旧版MaiBot servers 列表数组(将自动迁移为 `mcpServers`
### 导出配置
```
/mcp export # 导出为 Claude Desktop 格式(默认)
/mcp export claude # 导出为 Claude Desktop 格式
```
### 注意事项
- 导入时会自动跳过同名服务器
- 导入后需要发送 `/mcp reconnect` 使配置生效
- 支持 stdio、sse、http、streamable_http 全部传输类型
---
## 🔗 Workflow硬流程/工具链)
工具链允许你将多个 MCP 工具按顺序执行,后续工具可以使用前序工具的输出作为输入。
### 1 分钟上手(推荐 WebUI
1. 先完成 MCP 服务器配置并 `/mcp reconnect`
2. 发送 `/mcp tools`,复制你要用的工具名
3. 打开 WebUI → 「Workflow硬流程/工具链)」→ 用“快速添加”表单填入:
- 名称/描述
- 输入参数(每行 `参数名=描述`
- 执行步骤(每行 `工具名|参数JSON|输出键`
4. 在“确认添加”中输入 `ADD` 并保存
### 快速添加工具链(推荐)
在 WebUI 的「工具链」配置区,使用表单快速添加:
1. **名称**: 填写工具链名称(英文,如 `search_and_detail`
2. **描述**: 填写工具链用途(供 LLM 理解何时使用)
3. **输入参数**: 每行一个,格式 `参数名=描述`
```
query=搜索关键词
max_results=最大结果数
```
4. **执行步骤**: 每行一个,格式 `工具名|参数JSON|输出键`
```
mcp_server_search|{"keyword":"${input.query}"}|search_result
mcp_server_detail|{"id":"${prev}"}|
```
5. **确认添加**: 输入 `ADD` 并保存
### JSON 配置方式
也可以直接在「工具链列表」中编写 JSON
```json
[
{
"name": "search_and_detail",
"description": "先搜索模组,再获取详情",
"input_params": {
"query": "搜索关键词"
},
"steps": [
{
"tool_name": "mcp_mcmod_search_mod",
"args_template": {"keyword": "${input.query}", "limit": 1},
"output_key": "search_result",
"description": "搜索模组"
},
{
"tool_name": "mcp_mcmod_get_mod_detail",
"args_template": {"mod_id": "${prev}"},
"description": "获取详情"
}
]
}
]
```
### 变量替换
| 变量格式 | 说明 |
|---------|------|
| `${input.参数名}` | 用户输入的参数 |
| `${step.输出键}` | 某个步骤的输出(通过 `output_key` 指定) |
| `${prev}` | 上一步的输出 |
| `${prev.字段}` | 上一步输出JSON的某个字段 |
| `${step.geo.return.0.location}` | 数组下标访问dot |
| `${step.geo.return[0].location}` | 数组下标访问([] |
| `${step.geo['return'][0]['location']}` | bracket 写法(最通用) |
### 工具链字段说明
| 字段 | 说明 |
|------|------|
| `name` | 工具链名称,将生成 `chain_xxx` 工具 |
| `description` | 描述,供 LLM 理解何时使用 |
| `input_params` | 输入参数定义 `{参数名: 描述}` |
| `steps` | 执行步骤数组 |
| `steps[].tool_name` | 要调用的工具名 |
| `steps[].args_template` | 参数模板,支持变量替换 |
| `steps[].output_key` | 输出存储键名(可选) |
| `steps[].optional` | 是否可选,失败时继续执行(默认 false |
### 命令
```bash
/mcp chain # 查看所有工具链
/mcp chain list # 列出工具链
/mcp chain <名称> # 查看详情
/mcp chain test <名称> {"query": "JEI"} # 测试执行
/mcp chain reload # 重新加载配置
```
---
## 🔄 双轨制架构
MCP 桥接插件支持两种工具调用模式,可根据场景选择:
### ReAct 软流程
LLM 自主决策的多轮工具调用模式,适合复杂、不确定的场景。
**工作原理:**
1. 用户提问 → LLM 分析需要什么信息
2. LLM 选择调用工具 → 获取结果
3. LLM 观察结果 → 决定是否需要更多信息
4. 重复 2-3 直到信息足够 → 生成最终回答
**启用方式:**
在 WebUI「ReAct (软流程)」配置区启用MCP 工具将自动注册到 MaiBot 的记忆检索 ReAct 系统。
**适用场景:**
- 复杂问题需要多步推理
- 不确定需要调用哪些工具
- 需要根据中间结果动态调整
### Workflow 硬流程
用户预定义的工作流,固定执行顺序,适合可靠、可控的场景。
**工作原理:**
1. 用户定义步骤顺序和参数传递
2. 按顺序执行每个步骤
3. 后续步骤可使用前序步骤的输出
4. 返回最终结果
**适用场景:**
- 流程固定、可预测
- 需要可靠、可重复的执行
- 希望精确控制工具调用顺序
### 对比
| 特性 | ReAct 软流程 | Workflow 硬流程 |
|------|-------------|----------------|
| 决策者 | LLM 自主决策 | 用户预定义 |
| 灵活性 | 高,动态调整 | 低,固定流程 |
| 可预测性 | 低 | 高 |
| 适用场景 | 复杂、探索性任务 | 固定、重复性任务 |
| 配置方式 | 启用即可 | 需要定义步骤 |
---
## 📋 依赖
- MaiBot >= 0.11.6
- Python >= 3.10
- mcp >= 1.0.0
## 📄 许可证
AGPL-3.0

View File

@@ -1,44 +0,0 @@
"""
MCP 桥接插件
将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
v1.1.0 新增功能:
- 心跳检测和自动重连
- 调用统计(次数、成功率、耗时)
- 更好的错误处理
v1.2.0 新增功能:
- Resources 支持(资源读取)
- Prompts 支持(提示模板)
"""
from .plugin import MCPBridgePlugin, mcp_tool_registry, MCPStartupHandler, MCPStopHandler
from .mcp_client import (
mcp_manager,
MCPClientManager,
MCPServerConfig,
TransportType,
MCPCallResult,
MCPToolInfo,
MCPResourceInfo,
MCPPromptInfo,
ToolCallStats,
ServerStats,
)
__all__ = [
"MCPBridgePlugin",
"mcp_tool_registry",
"mcp_manager",
"MCPClientManager",
"MCPServerConfig",
"TransportType",
"MCPCallResult",
"MCPToolInfo",
"MCPResourceInfo",
"MCPPromptInfo",
"ToolCallStats",
"ServerStats",
"MCPStartupHandler",
"MCPStopHandler",
]

View File

@@ -1,42 +0,0 @@
{
"manifest_version": 2,
"version": "2.0.0",
"name": "MCP桥接插件",
"description": "将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot使麦麦能够调用外部 MCP 工具。",
"author": {
"name": "CharTyr",
"url": "https://github.com/CharTyr"
},
"license": "AGPL-3.0",
"urls": {
"repository": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"homepage": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"documentation": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin",
"issues": "https://github.com/CharTyr/MaiBot_MCPBridgePlugin/issues"
},
"host_application": {
"min_version": "0.11.6",
"max_version": "1.0.0"
},
"sdk": {
"min_version": "2.0.0",
"max_version": "2.99.99"
},
"dependencies": [
{
"type": "python_package",
"name": "mcp",
"version_spec": ">=0.0.0"
}
],
"capabilities": [
"send.text"
],
"i18n": {
"default_locale": "zh-CN",
"supported_locales": [
"zh-CN"
]
},
"id": "chartyr.mcpbridge-plugin"
}

View File

@@ -1,309 +0,0 @@
# MCP桥接插件 - 配置文件示例
# 将 MCP (Model Context Protocol) 服务器的工具桥接到 MaiBot
#
# 使用方法:复制此文件为 config.toml然后根据需要修改配置
#
# ============================================================
# 🎯 快速开始(三步)
# ============================================================
# 1. 在下方 [servers] 添加 MCP 服务器配置
# 2. 将 enabled 改为 true 启用服务器
# 3. 重启 MaiBot 或发送 /mcp reconnect
#
# ============================================================
# 📚 去哪找 MCP 服务器?
# ============================================================
#
# 【远程服务(推荐新手)】
# - ModelScope: https://mcp.modelscope.cn/ (免费,推荐)
# - Smithery: https://smithery.ai/
# - Glama: https://glama.ai/mcp/servers
#
# 【本地服务(需要 npx 或 uvx
# - 官方列表: https://github.com/modelcontextprotocol/servers
#
# ============================================================
# ============================================================
# 🔌 MCP 服务器配置
# ============================================================
#
# ⚠️ 重要配置格式Claude Desktop 规范)
# ────────────────────────────────────────────────────────────
# 统一使用 Claude Desktop 的 mcpServers JSON。
#
# claude_config_json 的内容应为 JSON 对象:
# {
# "mcpServers": {
# "server_name": { ...server config... },
# "another": { ... }
# }
# }
#
# 每个服务器支持字段:
# transport - 传输方式: "stdio" / "sse" / "http" / "streamable_http"(可选)
# url - 服务器地址sse/http/streamable_http 模式)
# command - 启动命令stdio 模式,如 "npx" / "uvx"
# args - 命令参数数组stdio 模式)
# env - 环境变量对象stdio 模式,可选)
# headers - 鉴权头(可选,如 {"Authorization": "Bearer xxx"}
# enabled - 是否启用(可选,默认 true
# post_process - 服务器级别后处理配置(可选)
#
# ============================================================
[servers]
claude_config_json = '''
{
"mcpServers": {
"time-mcp-server": {
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.cn/server/mcp-server-time"
},
"my-auth-server": {
"enabled": false,
"transport": "streamable_http",
"url": "https://mcp.api-inference.modelscope.net/xxxxxx/mcp",
"headers": {
"Authorization": "Bearer ms-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
},
"fetch-local": {
"enabled": false,
"command": "uvx",
"args": ["mcp-server-fetch"]
}
}
}
'''
# ============================================================
# 插件基本信息
# ============================================================
[plugin]
name = "mcp_bridge_plugin"
version = "2.0.0"
config_version = "2.0.0"
enabled = false # 默认禁用,在 WebUI 中启用
# ============================================================
# Workflow硬流程/工具链)
# ============================================================
#
# 作用:把多个工具按顺序执行;后续步骤可引用前序输出。
#
# ✅ 推荐配置方式WebUI「Workflow硬流程/工具链)」里用“快速添加”表单。
# ✅ 也可以直接写 chains_listJSON 数组)。
#
# 变量替换:
# ${input.xxx} - 用户输入
# ${step.<output_key>} - 指定步骤输出(需设置 output_key
# ${prev} - 上一步输出
# ${prev.字段} - 上一步输出JSON的字段
# ${step.geo.return.0.location} - 数组/下标访问dot
# ${step.geo.return[0].location} - 数组/下标访问([]
# ${step.geo['return'][0]['location']} - bracket 写法
#
# ============================================================
[tool_chains]
chains_enabled = true
chains_list = '''
[
{
"name": "search_and_detail",
"description": "先搜索,再根据结果获取详情",
"input_params": { "query": "搜索关键词" },
"steps": [
{ "tool_name": "把这里替换成你的搜索工具名", "args_template": { "keyword": "${input.query}" }, "output_key": "search" },
{ "tool_name": "把这里替换成你的详情工具名", "args_template": { "id": "${prev}" } }
]
}
]
'''
# ============================================================
# ReAct软流程
# ============================================================
#
# 作用:把 MCP 工具注册到 MaiBot 的 ReAct 系统LLM 可自主多轮调用。
#
# 注意ReAct 适合“探索式/不确定”场景Workflow 适合“固定/可控”场景。
#
# ============================================================
[react]
react_enabled = false
filter_mode = "whitelist" # whitelist / blacklist
tool_filter = "" # 每行一个工具名,支持通配符 *
# ============================================================
# 全局设置(高级设置建议保持默认)
# ============================================================
[settings]
# 🏷️ 工具前缀 - 用于区分 MCP 工具和原生工具
tool_prefix = "mcp"
# ⏱️ 连接超时(秒)
connect_timeout = 30.0
# ⏱️ 调用超时(秒)
call_timeout = 60.0
# 🔄 自动连接 - 启动时自动连接所有已启用的服务器
auto_connect = true
# 🔁 重试次数 - 连接失败时的重试次数
retry_attempts = 3
# ⏳ 重试间隔(秒)
retry_interval = 5.0
# 💓 心跳检测 - 定期检测服务器连接状态
heartbeat_enabled = true
# 💓 心跳间隔(秒)- 建议 30-120 秒
heartbeat_interval = 60.0
# 🔄 自动重连 - 检测到断开时自动尝试重连
auto_reconnect = true
# 🔄 最大重连次数 - 连续重连失败后暂停重连
max_reconnect_attempts = 3
# ============================================================
# 高级功能(实验性)
# ============================================================
# 📦 启用 Resources - 允许读取 MCP 服务器提供的资源
enable_resources = false
# 📝 启用 Prompts - 允许使用 MCP 服务器提供的提示模板
enable_prompts = false
# ============================================================
# 结果后处理功能
# ============================================================
# 当 MCP 工具返回的内容过长时,使用 LLM 对结果进行摘要提炼
# 🔄 启用结果后处理
post_process_enabled = false
# 📏 后处理阈值(字符数)- 结果长度超过此值才触发后处理
post_process_threshold = 500
# 🔢 后处理输出限制 - LLM 摘要输出的最大 token 数
post_process_max_tokens = 500
# 🤖 后处理模型(可选)- 留空则使用 utils 模型组
post_process_model = ""
# 🧠 后处理提示词模板
post_process_prompt = '''用户问题:{query}
工具返回内容:
{result}
请从上述内容中提取与用户问题最相关的关键信息,简洁准确地输出:'''
# ============================================================
# 调用链路追踪
# ============================================================
# 记录工具调用详情,便于调试和分析
# 🔍 启用调用追踪
trace_enabled = true
# 📊 追踪记录上限 - 内存中保留的最大记录数
trace_max_records = 50
# 📝 追踪日志文件 - 是否将追踪记录写入日志文件
# 启用后记录写入 plugins/MaiBot_MCPBridgePlugin/logs/trace.jsonl
trace_log_enabled = false
# ============================================================
# 工具调用缓存
# ============================================================
# 缓存相同参数的调用结果,减少重复请求
# 🗄️ 启用调用缓存
cache_enabled = false
# ⏱️ 缓存有效期(秒)
cache_ttl = 300
# 📦 最大缓存条目 - 超出后 LRU 淘汰
cache_max_entries = 200
# 🚫 缓存排除列表 - 即不缓存的工具(每行一个,支持通配符 *
# 时间类、随机类工具建议排除
cache_exclude_tools = '''
mcp_*_time_*
mcp_*_random_*
'''
# ============================================================
# 工具管理
# ============================================================
[tools]
# 📋 工具清单(只读)- 启动后自动生成
tool_list = "(启动后自动生成)"
# 🚫 禁用工具列表 - 要禁用的工具名(每行一个)
# 从上方工具清单复制工具名,禁用后该工具不会被 LLM 调用
# 示例:
# disabled_tools = '''
# mcp_filesystem_delete_file
# mcp_filesystem_write_file
# '''
disabled_tools = ""
# ============================================================
# 权限控制
# ============================================================
[permissions]
# 🔐 启用权限控制 - 按群/用户限制工具使用
perm_enabled = false
# 📋 默认模式
# allow_all: 未配置规则的工具默认允许
# deny_all: 未配置规则的工具默认禁止
perm_default_mode = "allow_all"
# ────────────────────────────────────────────────────────────
# 🚀 快捷配置(推荐新手使用)
# ────────────────────────────────────────────────────────────
# 🚫 禁用群列表 - 这些群无法使用任何 MCP 工具(每行一个群号)
# 示例:
# quick_deny_groups = '''
# 123456789
# 987654321
# '''
quick_deny_groups = ""
# ✅ 管理员白名单 - 这些用户始终可以使用所有工具每行一个QQ号
# 示例:
# quick_allow_users = '''
# 111111111
# '''
quick_allow_users = ""
# ────────────────────────────────────────────────────────────
# 📜 高级权限规则(可选,针对特定工具配置)
# ────────────────────────────────────────────────────────────
# 格式: qq:ID:group/private/user工具名支持通配符 *
# 示例:
# perm_rules = '''
# [
# {"tool": "mcp_*_delete_*", "denied": ["qq:123456:group"]}
# ]
# '''
perm_rules = "[]"
# ============================================================
# 状态显示(只读)
# ============================================================
[status]
connection_status = "未初始化"

View File

@@ -1 +0,0 @@
"""Core helpers for MCP Bridge Plugin."""

View File

@@ -1,169 +0,0 @@
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional
class ClaudeConfigError(ValueError):
pass
Transport = Literal["stdio", "sse", "http", "streamable_http"]
@dataclass(frozen=True)
class ClaudeMcpServer:
name: str
transport: Transport
command: str = ""
args: List[str] = field(default_factory=list)
env: Dict[str, str] = field(default_factory=dict)
url: str = ""
headers: Dict[str, str] = field(default_factory=dict)
enabled: bool = True
def _normalize_transport(value: Optional[str]) -> Transport:
if not value:
return "streamable_http"
v = value.strip().lower().replace("-", "_")
if v in ("streamable_http", "streamablehttp", "streamable"):
return "streamable_http"
if v in ("http",):
return "http"
if v in ("sse",):
return "sse"
if v in ("stdio",):
return "stdio"
raise ClaudeConfigError(f"unsupported transport: {value}")
def _coerce_str_list(value: Any, field_name: str) -> List[str]:
if value is None:
return []
if isinstance(value, list):
return [str(v) for v in value]
raise ClaudeConfigError(f"{field_name} must be a list")
def _coerce_str_dict(value: Any, field_name: str) -> Dict[str, str]:
if value is None:
return {}
if isinstance(value, dict):
return {str(k): str(v) for k, v in value.items()}
raise ClaudeConfigError(f"{field_name} must be an object")
def parse_claude_mcp_config(config_json: str) -> List[ClaudeMcpServer]:
"""Parse Claude Desktop style MCP config JSON.
Supported:
- Full object: {"mcpServers": {...}}
- Direct mapping: {...} treated as mcpServers
"""
text = (config_json or "").strip()
if not text:
return []
try:
data = json.loads(text)
except json.JSONDecodeError as e:
raise ClaudeConfigError(f"invalid JSON: {e}") from e
if not isinstance(data, dict):
raise ClaudeConfigError("config must be a JSON object")
servers_obj = data.get("mcpServers", data)
if not isinstance(servers_obj, dict):
raise ClaudeConfigError("mcpServers must be an object")
servers: List[ClaudeMcpServer] = []
for name, raw in servers_obj.items():
if not isinstance(name, str) or not name.strip():
raise ClaudeConfigError("server name must be a non-empty string")
if not isinstance(raw, dict):
raise ClaudeConfigError(f"server '{name}' must be an object")
enabled = bool(raw.get("enabled", True))
command = str(raw.get("command", "") or "")
url = str(raw.get("url", "") or "")
args = _coerce_str_list(raw.get("args"), "args")
env = _coerce_str_dict(raw.get("env"), "env")
headers = _coerce_str_dict(raw.get("headers"), "headers")
transport_hint = raw.get("transport", raw.get("type"))
if command:
transport: Transport = "stdio"
elif url:
try:
transport = _normalize_transport(str(transport_hint) if transport_hint is not None else None)
except ClaudeConfigError:
transport = "streamable_http"
else:
raise ClaudeConfigError(f"server '{name}' must have either 'command' or 'url'")
servers.append(
ClaudeMcpServer(
name=name,
transport=transport,
command=command,
args=args,
env=env,
url=url,
headers=headers,
enabled=enabled,
)
)
return servers
def legacy_servers_list_to_claude_config(servers_list_json: str) -> str:
"""Convert legacy v1.x servers list (JSON array) to Claude mcpServers JSON.
Legacy item schema:
{"name","enabled","transport","url","headers","command","args","env"}
"""
text = (servers_list_json or "").strip()
if not text:
return ""
try:
data = json.loads(text)
except json.JSONDecodeError:
return ""
if isinstance(data, dict):
data = [data]
if not isinstance(data, list):
return ""
mcp_servers: Dict[str, Any] = {}
for item in data:
if not isinstance(item, dict):
continue
name = str(item.get("name", "") or "").strip()
if not name:
continue
enabled = bool(item.get("enabled", True))
transport = str(item.get("transport", "") or "").strip().lower().replace("-", "_")
if transport == "stdio" or item.get("command"):
entry: Dict[str, Any] = {
"enabled": enabled,
"command": item.get("command", "") or "",
"args": item.get("args", []) or [],
}
if item.get("env"):
entry["env"] = item.get("env")
mcp_servers[name] = entry
continue
entry = {"enabled": enabled, "url": item.get("url", "") or ""}
if item.get("headers"):
entry["headers"] = item.get("headers")
if transport:
entry["transport"] = transport
mcp_servers[name] = entry
if not mcp_servers:
return ""
return json.dumps({"mcpServers": mcp_servers}, ensure_ascii=False, indent=2)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,2 +0,0 @@
# MCP 桥接插件依赖
mcp>=1.0.0

View File

@@ -1,584 +0,0 @@
"""
MCP Workflow 模块 v1.9.0
支持用户自定义工作流(硬流程),将多个 MCP 工具按顺序执行
双轨制架构:
- 软流程 (ReAct): LLM 自主决策,动态多轮调用工具,灵活但不可预测
- 硬流程 (Workflow): 用户预定义的工作流,固定流程,可靠可控
功能:
- Workflow 定义和管理
- 顺序执行多个工具(硬流程)
- 支持变量替换(使用前序工具的输出)
- 自动注册为组合工具供 LLM 调用
- 与 ReAct 软流程互补,用户可选择合适的执行方式
"""
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
try:
from src.common.logger import get_logger
logger = get_logger("mcp_tool_chain")
except ImportError:
import logging
logger = logging.getLogger("mcp_tool_chain")
@dataclass
class ToolChainStep:
"""工具链步骤"""
tool_name: str # 要调用的工具名(如 mcp_server_tool
args_template: Dict[str, Any] = field(default_factory=dict) # 参数模板,支持变量替换
output_key: str = "" # 输出存储的键名,供后续步骤引用
description: str = "" # 步骤描述
optional: bool = False # 是否可选(失败时继续执行)
def to_dict(self) -> Dict[str, Any]:
return {
"tool_name": self.tool_name,
"args_template": self.args_template,
"output_key": self.output_key,
"description": self.description,
"optional": self.optional,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ToolChainStep":
return cls(
tool_name=data.get("tool_name", ""),
args_template=data.get("args_template", {}),
output_key=data.get("output_key", ""),
description=data.get("description", ""),
optional=data.get("optional", False),
)
@dataclass
class ToolChainDefinition:
"""工具链定义"""
name: str # 工具链名称(将作为组合工具的名称)
description: str # 工具链描述(供 LLM 理解)
steps: List[ToolChainStep] = field(default_factory=list) # 执行步骤
input_params: Dict[str, str] = field(default_factory=dict) # 输入参数定义 {参数名: 描述}
enabled: bool = True # 是否启用
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"steps": [step.to_dict() for step in self.steps],
"input_params": self.input_params,
"enabled": self.enabled,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ToolChainDefinition":
steps = [ToolChainStep.from_dict(s) for s in data.get("steps", [])]
return cls(
name=data.get("name", ""),
description=data.get("description", ""),
steps=steps,
input_params=data.get("input_params", {}),
enabled=data.get("enabled", True),
)
@dataclass
class ChainExecutionResult:
"""工具链执行结果"""
success: bool
final_output: str # 最终输出(最后一个步骤的结果)
step_results: List[Dict[str, Any]] = field(default_factory=list) # 每个步骤的结果
error: str = ""
total_duration_ms: float = 0.0
def to_summary(self) -> str:
"""生成执行摘要"""
lines = []
for i, step in enumerate(self.step_results):
status = "" if step.get("success") else ""
tool = step.get("tool_name", "unknown")
duration = step.get("duration_ms", 0)
lines.append(f"{status} 步骤{i + 1}: {tool} ({duration:.0f}ms)")
if not step.get("success") and step.get("error"):
lines.append(f" 错误: {step['error'][:50]}")
return "\n".join(lines)
class ToolChainExecutor:
"""工具链执行器"""
# 变量替换模式: ${step.output_key} 或 ${input.param_name} 或 ${prev}
VAR_PATTERN = re.compile(r"\$\{([^}]+)\}")
def __init__(self, mcp_manager):
self._mcp_manager = mcp_manager
def _resolve_tool_key(self, tool_name: str) -> Optional[str]:
"""解析工具名,返回有效的 tool_key
支持:
- 直接使用 tool_key如 mcp_server_tool
- 使用注册后的工具名(会自动转换 - 和 . 为 _
"""
all_tools = self._mcp_manager.all_tools
# 直接匹配
if tool_name in all_tools:
return tool_name
# 尝试转换后匹配(用户可能使用了注册后的名称)
normalized = tool_name.replace("-", "_").replace(".", "_")
if normalized in all_tools:
return normalized
# 尝试查找包含该名称的工具
for key in all_tools.keys():
if key.endswith(f"_{tool_name}") or key.endswith(f"_{normalized}"):
return key
return None
async def execute(
self,
chain: ToolChainDefinition,
input_args: Dict[str, Any],
) -> ChainExecutionResult:
"""执行工具链
Args:
chain: 工具链定义
input_args: 用户输入的参数
Returns:
ChainExecutionResult: 执行结果
"""
start_time = time.time()
step_results = []
context = {
"input": input_args or {}, # 用户输入,确保不为 None
"step": {}, # 各步骤输出,按 output_key 存储
"prev": "", # 上一步的输出
}
final_output = ""
# 验证必需的输入参数
missing_params = []
for param_name in chain.input_params.keys():
if param_name not in context["input"]:
missing_params.append(param_name)
if missing_params:
return ChainExecutionResult(
success=False,
final_output="",
error=f"缺少必需参数: {', '.join(missing_params)}",
total_duration_ms=(time.time() - start_time) * 1000,
)
for i, step in enumerate(chain.steps):
step_start = time.time()
step_result = {
"step_index": i,
"tool_name": step.tool_name,
"success": False,
"output": "",
"error": "",
"duration_ms": 0,
}
try:
# 替换参数中的变量
resolved_args = self._resolve_args(step.args_template, context)
step_result["resolved_args"] = resolved_args
# 解析工具名
tool_key = self._resolve_tool_key(step.tool_name)
if not tool_key:
step_result["error"] = f"工具 {step.tool_name} 不存在"
logger.warning(f"工具链步骤 {i + 1}: 工具 {step.tool_name} 不存在")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1}: 工具 {step.tool_name} 不存在",
total_duration_ms=(time.time() - start_time) * 1000,
)
step_results.append(step_result)
continue
logger.debug(f"工具链步骤 {i + 1}: 调用 {tool_key},参数: {resolved_args}")
# 调用工具
result = await self._mcp_manager.call_tool(tool_key, resolved_args)
step_duration = (time.time() - step_start) * 1000
step_result["duration_ms"] = step_duration
if result.success:
step_result["success"] = True
# 确保 content 不为 None
content = result.content if result.content is not None else ""
step_result["output"] = content
# 更新上下文
context["prev"] = content
if step.output_key:
context["step"][step.output_key] = content
final_output = content
content_preview = content[:100] if content else "(空)"
logger.debug(f"工具链步骤 {i + 1} 成功: {content_preview}...")
else:
step_result["error"] = result.error or "未知错误"
logger.warning(f"工具链步骤 {i + 1} 失败: {result.error}")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1} ({step.tool_name}) 失败: {result.error}",
total_duration_ms=(time.time() - start_time) * 1000,
)
except Exception as e:
step_duration = (time.time() - step_start) * 1000
step_result["duration_ms"] = step_duration
step_result["error"] = str(e)
logger.error(f"工具链步骤 {i + 1} 异常: {e}")
if not step.optional:
step_results.append(step_result)
return ChainExecutionResult(
success=False,
final_output="",
step_results=step_results,
error=f"步骤 {i + 1} ({step.tool_name}) 异常: {e}",
total_duration_ms=(time.time() - start_time) * 1000,
)
step_results.append(step_result)
total_duration = (time.time() - start_time) * 1000
return ChainExecutionResult(
success=True,
final_output=final_output,
step_results=step_results,
total_duration_ms=total_duration,
)
def _resolve_args(self, args_template: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""解析参数模板,替换变量
支持的变量格式:
- ${input.param_name}: 用户输入的参数
- ${step.output_key}: 某个步骤的输出
- ${prev}: 上一步的输出
- ${prev.field}: 上一步输出JSON的某个字段
"""
resolved = {}
for key, value in args_template.items():
if isinstance(value, str):
resolved[key] = self._substitute_vars(value, context)
elif isinstance(value, dict):
resolved[key] = self._resolve_args(value, context)
elif isinstance(value, list):
resolved[key] = [self._substitute_vars(v, context) if isinstance(v, str) else v for v in value]
else:
resolved[key] = value
return resolved
def _substitute_vars(self, template: str, context: Dict[str, Any]) -> str:
"""替换字符串中的变量"""
def replacer(match):
var_path = match.group(1)
return self._get_var_value(var_path, context)
return self.VAR_PATTERN.sub(replacer, template)
def _get_var_value(self, var_path: str, context: Dict[str, Any]) -> str:
"""获取变量值
Args:
var_path: 变量路径,如 "input.query", "step.search_result", "prev", "prev.id"
context: 上下文
"""
parts = self._parse_var_path(var_path)
if not parts:
return ""
# 获取根对象
root = parts[0]
if root not in context:
logger.warning(f"变量 {var_path} 的根 '{root}' 不存在")
return ""
value = context[root]
# 遍历路径
for part in parts[1:]:
if isinstance(value, str):
parsed = self._try_parse_json(value)
if parsed is not None:
value = parsed
if isinstance(value, dict):
value = value.get(part, "")
elif isinstance(value, list):
if part.isdigit():
idx = int(part)
value = value[idx] if 0 <= idx < len(value) else ""
else:
value = ""
else:
value = ""
# 确保返回字符串
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
if value is None:
return ""
if value == "":
return ""
return str(value)
def _try_parse_json(self, value: str) -> Optional[Any]:
"""尝试将字符串解析为 JSON 对象,失败则返回 None。"""
if not value:
return None
try:
return json.loads(value)
except json.JSONDecodeError:
return None
def _parse_var_path(self, var_path: str) -> List[str]:
"""解析变量路径,支持点号与下标写法。
支持:
- step.geo.return.0.location
- step.geo.return[0].location
- step.geo['return'][0]['location']
"""
if not var_path:
return []
tokens: List[str] = []
buf: List[str] = []
in_bracket = False
in_quote = False
quote_char = ""
def flush_buf() -> None:
if buf:
token = "".join(buf).strip()
if token:
tokens.append(token)
buf.clear()
i = 0
while i < len(var_path):
ch = var_path[i]
if not in_bracket and ch == ".":
flush_buf()
i += 1
continue
if not in_bracket and ch == "[":
flush_buf()
in_bracket = True
in_quote = False
quote_char = ""
i += 1
continue
if in_bracket and not in_quote and ch == "]":
flush_buf()
in_bracket = False
i += 1
continue
if in_bracket and ch in ("'", '"'):
if not in_quote:
in_quote = True
quote_char = ch
i += 1
continue
if quote_char == ch:
in_quote = False
quote_char = ""
i += 1
continue
if in_bracket and not in_quote:
if ch.isspace():
i += 1
continue
if ch == ",":
i += 1
continue
buf.append(ch)
i += 1
flush_buf()
if in_bracket or in_quote:
return [p for p in var_path.split(".") if p]
return tokens
class ToolChainManager:
"""工具链管理器"""
_instance: Optional["ToolChainManager"] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._chains: Dict[str, ToolChainDefinition] = {}
self._executor: Optional[ToolChainExecutor] = None
def set_executor(self, mcp_manager) -> None:
"""设置执行器"""
self._executor = ToolChainExecutor(mcp_manager)
def add_chain(self, chain: ToolChainDefinition) -> bool:
"""添加工具链"""
if not chain.name:
logger.error("工具链名称不能为空")
return False
if chain.name in self._chains:
logger.warning(f"工具链 {chain.name} 已存在,将被覆盖")
self._chains[chain.name] = chain
logger.info(f"已添加工具链: {chain.name} ({len(chain.steps)} 个步骤)")
return True
def remove_chain(self, name: str) -> bool:
"""移除工具链"""
if name in self._chains:
del self._chains[name]
logger.info(f"已移除工具链: {name}")
return True
return False
def get_chain(self, name: str) -> Optional[ToolChainDefinition]:
"""获取工具链"""
return self._chains.get(name)
def get_all_chains(self) -> Dict[str, ToolChainDefinition]:
"""获取所有工具链"""
return self._chains.copy()
def get_enabled_chains(self) -> Dict[str, ToolChainDefinition]:
"""获取所有启用的工具链"""
return {name: chain for name, chain in self._chains.items() if chain.enabled}
async def execute_chain(
self,
chain_name: str,
input_args: Dict[str, Any],
) -> ChainExecutionResult:
"""执行工具链"""
chain = self._chains.get(chain_name)
if not chain:
return ChainExecutionResult(
success=False,
final_output="",
error=f"工具链 {chain_name} 不存在",
)
if not chain.enabled:
return ChainExecutionResult(
success=False,
final_output="",
error=f"工具链 {chain_name} 已禁用",
)
if not self._executor:
return ChainExecutionResult(
success=False,
final_output="",
error="工具链执行器未初始化",
)
return await self._executor.execute(chain, input_args)
def load_from_json(self, json_str: str) -> Tuple[int, List[str]]:
"""从 JSON 字符串加载工具链配置
Returns:
(成功加载数量, 错误列表)
"""
errors = []
loaded = 0
try:
data = json.loads(json_str) if json_str.strip() else []
except json.JSONDecodeError as e:
return 0, [f"JSON 解析失败: {e}"]
if not isinstance(data, list):
data = [data]
for i, item in enumerate(data):
try:
chain = ToolChainDefinition.from_dict(item)
if not chain.name:
errors.append(f"{i + 1} 个工具链缺少名称")
continue
if not chain.steps:
errors.append(f"工具链 {chain.name} 没有步骤")
continue
self.add_chain(chain)
loaded += 1
except Exception as e:
errors.append(f"{i + 1} 个工具链解析失败: {e}")
return loaded, errors
def export_to_json(self, pretty: bool = True) -> str:
"""导出所有工具链为 JSON"""
chains_data = [chain.to_dict() for chain in self._chains.values()]
if pretty:
return json.dumps(chains_data, ensure_ascii=False, indent=2)
return json.dumps(chains_data, ensure_ascii=False)
def clear(self) -> None:
"""清空所有工具链"""
self._chains.clear()
# 全局工具链管理器实例
tool_chain_manager = ToolChainManager()