mcp独立模块

This commit is contained in:
SengokuCola
2026-03-29 14:39:14 +08:00
parent 5876f246c0
commit f32edfa732
6 changed files with 19 additions and 26 deletions

View File

@@ -0,0 +1,18 @@
"""
MCP (Model Context Protocol) 客户端包。
提供 MCPManager 用于管理 MCP 服务器连接、发现工具、调用工具。
用法:
from .manager import MCPManager
manager = await MCPManager.from_config("config/mcp_config.json")
if manager:
tools = manager.get_openai_tools() # 获取 OpenAI 格式工具列表
result = await manager.call_tool(name, args) # 调用工具
await manager.close() # 关闭连接
"""
from .manager import MCPManager
__all__ = ["MCPManager"]

107
src/mcp_module/config.py Normal file
View File

@@ -0,0 +1,107 @@
"""
MCP 配置加载与验证。
从 config/mcp_config.json 读取 MCP 服务器定义,解析为结构化配置对象。
配置格式示例:
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "C:/Users"],
"env": {}
},
"remote-api": {
"url": "http://localhost:8080/sse",
"headers": {"Authorization": "Bearer xxx"}
}
}
}
- command + args: Stdio 传输(启动子进程)
- url: SSE 传输(连接远程服务器)
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import json
import os
from src.maisaka.console import console
DEFAULT_MCP_CONFIG_PATH = Path(__file__).resolve().parents[2] / "config" / "mcp_config.json"
@dataclass
class MCPServerConfig:
"""单个 MCP 服务器配置。"""
name: str
# ── Stdio 传输 ──
command: Optional[str] = None
args: list[str] = field(default_factory=list)
env: Optional[dict[str, str]] = None
# ── SSE 传输 ──
url: Optional[str] = None
headers: dict[str, str] = field(default_factory=dict)
@property
def transport_type(self) -> str:
"""返回传输类型: 'stdio' / 'sse' / 'unknown'"""
if self.command:
return "stdio"
if self.url:
return "sse"
return "unknown"
def load_mcp_config(config_path: str = str(DEFAULT_MCP_CONFIG_PATH)) -> list[MCPServerConfig]:
"""
从配置文件加载 MCP 服务器列表。
Args:
config_path: 配置文件路径
Returns:
解析后的 MCPServerConfig 列表;文件不存在或为空时返回空列表。
"""
if not os.path.isfile(config_path):
return []
try:
with open(config_path, "r", encoding="utf-8") as f:
data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
console.print(f"[warning]⚠️ 读取 MCP 配置失败: {e}[/warning]")
return []
mcp_servers = data.get("mcpServers", {})
if not isinstance(mcp_servers, dict):
console.print("[warning]⚠️ MCP 配置中的 mcpServers 格式无效[/warning]")
return []
configs: list[MCPServerConfig] = []
for name, cfg in mcp_servers.items():
if not isinstance(cfg, dict):
console.print(f"[warning]⚠️ MCP 服务器 '{name}' 配置格式无效,已跳过[/warning]")
continue
server = MCPServerConfig(
name=name,
command=cfg.get("command"),
args=cfg.get("args", []),
env=cfg.get("env"),
url=cfg.get("url"),
headers=cfg.get("headers", {}),
)
if server.transport_type == "unknown":
console.print(f"[warning]⚠️ MCP 服务器 '{name}' 缺少 command 或 url已跳过[/warning]")
continue
configs.append(server)
return configs

View File

@@ -0,0 +1,148 @@
"""
MaiSaka - 单个 MCP 服务器连接管理
封装单个 MCP 服务器的连接生命周期:连接 → 发现工具 → 调用工具 → 断开。
"""
from contextlib import AsyncExitStack
from typing import Any, Optional
from src.maisaka.console import console
from .config import MCPServerConfig
# ──────────────────── MCP SDK 可选导入 ────────────────────
#
# mcp 是可选依赖。如果未安装MCP_AVAILABLE = False
# MCPManager.from_config() 会检测到并返回 None不影响主程序运行。
try:
from mcp import ClientSession
try:
from mcp.client.stdio import StdioServerParameters
except ImportError:
from mcp import StdioServerParameters # type: ignore[attr-defined]
from mcp.client.stdio import stdio_client
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
ClientSession = None # type: ignore[assignment,misc]
StdioServerParameters = None # type: ignore[assignment,misc]
stdio_client = None # type: ignore[assignment]
try:
from mcp.client.sse import sse_client
SSE_AVAILABLE = True
except ImportError:
SSE_AVAILABLE = False
sse_client = None # type: ignore[assignment]
class MCPConnection:
"""
管理单个 MCP 服务器的连接生命周期。
支持两种传输方式:
- Stdio: 启动子进程,通过 stdin/stdout 通信
- SSE: 连接远程 HTTP SSE 端点
"""
def __init__(self, config: MCPServerConfig):
self.config = config
self.session: Optional[Any] = None # mcp.ClientSession
self.tools: list = [] # mcp Tool objects
self._exit_stack = AsyncExitStack()
async def connect(self) -> bool:
"""
连接到 MCP 服务器并发现可用工具。
Returns:
True 表示连接成功False 表示失败。
"""
if not MCP_AVAILABLE:
console.print("[warning]⚠️ 未安装 mcp SDK请运行: pip install mcp[/warning]")
return False
try:
await self._exit_stack.__aenter__()
if self.config.transport_type == "stdio":
read_stream, write_stream = await self._connect_stdio()
elif self.config.transport_type == "sse":
read_stream, write_stream = await self._connect_sse()
else:
console.print(f"[warning]MCP '{self.config.name}': 未知传输类型[/warning]")
return False
# 创建并初始化 MCP 会话
self.session = await self._exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
await self.session.initialize()
# 发现工具
result = await self.session.list_tools()
self.tools = result.tools if hasattr(result, "tools") else []
return True
except Exception as e:
console.print(f"[warning]⚠️ MCP 服务器 '{self.config.name}' 连接失败: {e}[/warning]")
await self.close()
return False
async def _connect_stdio(self):
"""建立 Stdio 传输连接。"""
params = StdioServerParameters(
command=self.config.command,
args=self.config.args,
env=self.config.env,
)
return await self._exit_stack.enter_async_context(stdio_client(params))
async def _connect_sse(self):
"""建立 SSE 传输连接。"""
if not SSE_AVAILABLE:
raise ImportError("SSE 传输需要额外依赖,请运行: pip install mcp[sse]")
return await self._exit_stack.enter_async_context(sse_client(url=self.config.url, headers=self.config.headers))
async def call_tool(self, tool_name: str, arguments: dict) -> str:
"""
调用 MCP 工具并返回结果文本。
Args:
tool_name: 工具名称
arguments: 工具参数字典
Returns:
工具执行结果的文本表示。
"""
if not self.session:
return f"MCP 服务器 '{self.config.name}' 未连接"
result = await self.session.call_tool(tool_name, arguments=arguments)
# 将结果内容转换为文本
parts: list[str] = []
for content in result.content:
if hasattr(content, "text"):
parts.append(content.text)
elif hasattr(content, "data"):
# 二进制/图片内容,展示类型信息
content_type = getattr(content, "mimeType", "unknown")
parts.append(f"[{content_type} 二进制内容]")
elif hasattr(content, "type"):
parts.append(f"[{content.type} 内容]")
return "\n".join(parts) if parts else "工具执行成功(无输出)"
async def close(self):
"""关闭连接并释放资源。"""
try:
await self._exit_stack.aclose()
except Exception:
pass
self.session = None
self.tools = []

215
src/mcp_module/manager.py Normal file
View File

@@ -0,0 +1,215 @@
"""
MaiSaka - MCP 管理器
管理所有 MCP 服务器连接,提供统一的工具发现与调用接口。
"""
from typing import Optional
from src.maisaka.console import console
from .config import DEFAULT_MCP_CONFIG_PATH, MCPServerConfig, load_mcp_config
from .connection import MCPConnection, MCP_AVAILABLE
# 内置工具名称集合 —— MCP 工具不允许与这些名称冲突
BUILTIN_TOOL_NAMES = frozenset(
{
"reply",
"no_reply",
"wait",
"stop",
"create_table",
"list_tables",
"view_table",
}
)
class MCPManager:
"""
MCP 服务器连接管理器。
职责:
- 根据配置文件连接所有 MCP 服务器
- 将 MCP 工具转换为 OpenAI function calling 格式
- 路由工具调用到正确的 MCP 服务器
- 统一管理连接生命周期
"""
def __init__(self):
self._connections: dict[str, MCPConnection] = {} # server_name → connection
self._tool_to_server: dict[str, str] = {} # tool_name → server_name
# ──────── 工厂方法 ────────
@classmethod
async def from_config(
cls,
config_path: str = str(DEFAULT_MCP_CONFIG_PATH),
) -> Optional["MCPManager"]:
"""
从配置文件创建并初始化 MCPManager。
Args:
config_path: mcp_config.json 文件路径
Returns:
初始化完成的 MCPManager无配置或全部连接失败时返回 None。
"""
configs = load_mcp_config(config_path)
if not configs:
return None
if not MCP_AVAILABLE:
console.print("[warning]⚠️ 发现 MCP 配置但未安装 mcp SDK请运行: pip install mcp[/warning]")
return None
manager = cls()
await manager._connect_all(configs)
if not manager._connections:
console.print("[warning]⚠️ 所有 MCP 服务器连接失败[/warning]")
return None
return manager
# ──────── 连接管理 ────────
async def _connect_all(self, configs: list[MCPServerConfig]):
"""连接所有配置的 MCP 服务器,跳过失败的连接。"""
for cfg in configs:
conn = MCPConnection(cfg)
success = await conn.connect()
if not success:
continue
self._connections[cfg.name] = conn
# 注册工具,检查冲突
registered = 0
for tool in conn.tools:
tool_name = tool.name
if tool_name in BUILTIN_TOOL_NAMES:
console.print(
f"[warning]⚠️ MCP 工具 '{tool_name}' (来自 {cfg.name}) 与内置工具冲突,已跳过[/warning]"
)
continue
if tool_name in self._tool_to_server:
existing_server = self._tool_to_server[tool_name]
console.print(
f"[warning]⚠️ MCP 工具 '{tool_name}' "
f"(来自 {cfg.name}) 与 {existing_server} 冲突,已跳过[/warning]"
)
continue
self._tool_to_server[tool_name] = cfg.name
registered += 1
console.print(
f"[success]✓ MCP 服务器 '{cfg.name}' 已连接[/success] [muted]({registered} 个工具已注册)[/muted]"
)
# ──────── 工具发现 ────────
def get_openai_tools(self) -> list[dict]:
"""
将所有已注册的 MCP 工具转换为 OpenAI function calling 格式。
Returns:
OpenAI tools 格式的工具定义列表。
"""
tools: list[dict] = []
for server_name, conn in self._connections.items():
for tool in conn.tools:
# 只包含成功注册的工具
if tool.name not in self._tool_to_server:
continue
if self._tool_to_server[tool.name] != server_name:
continue
# MCP inputSchema → OpenAI parameters
parameters = (
dict(tool.inputSchema)
if hasattr(tool, "inputSchema") and tool.inputSchema
else {"type": "object", "properties": {}}
)
# 移除 $schema 字段(部分 MCP 服务器会带上OpenAI 不接受)
parameters.pop("$schema", None)
tools.append(
{
"type": "function",
"function": {
"name": tool.name,
"description": (tool.description or f"MCP tool from {server_name}"),
"parameters": parameters,
},
}
)
return tools
# ──────── 工具调用 ────────
def is_mcp_tool(self, tool_name: str) -> bool:
"""判断工具名是否为已注册的 MCP 工具。"""
return tool_name in self._tool_to_server
async def call_tool(self, tool_name: str, arguments: dict) -> str:
"""
调用指定的 MCP 工具。
自动路由到正确的 MCP 服务器。
Args:
tool_name: 工具名称
arguments: 工具参数
Returns:
工具执行结果文本。
"""
server_name = self._tool_to_server.get(tool_name)
if not server_name or server_name not in self._connections:
return f"MCP 工具 '{tool_name}' 未找到"
conn = self._connections[server_name]
try:
return await conn.call_tool(tool_name, arguments)
except Exception as e:
return f"MCP 工具 '{tool_name}' 执行失败: {e}"
# ──────── 信息展示 ────────
def get_tool_summary(self) -> str:
"""获取所有已注册 MCP 工具的摘要信息。"""
parts: list[str] = []
for server_name, conn in self._connections.items():
tool_names = [
t.name
for t in conn.tools
if t.name in self._tool_to_server and self._tool_to_server[t.name] == server_name
]
if tool_names:
parts.append(f"{server_name}: {', '.join(tool_names)}")
return "\n".join(parts)
@property
def server_count(self) -> int:
"""已连接的 MCP 服务器数量。"""
return len(self._connections)
@property
def tool_count(self) -> int:
"""已注册的 MCP 工具总数。"""
return len(self._tool_to_server)
# ──────── 生命周期 ────────
async def close(self):
"""关闭所有 MCP 服务器连接。"""
for conn in self._connections.values():
await conn.close()
self._connections.clear()
self._tool_to_server.clear()