diff --git a/src/config/official_configs.py b/src/config/official_configs.py index fb96da9a..29d6e976 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -2425,14 +2425,14 @@ class MCPServerItemConfig(ConfigBase): ) """是否启用当前 MCP 服务器""" - transport: Literal["stdio", "streamable_http"] = Field( + transport: Literal["stdio", "streamable_http", "sse"] = Field( default="stdio", json_schema_extra={ "x-widget": "select", "x-icon": "shuffle", }, ) - """传输方式,可选 `stdio` 或 `streamable_http`""" + """传输方式,可选 `stdio`、`streamable_http` 或 `sse`""" command: str = Field( default="", @@ -2521,6 +2521,9 @@ class MCPServerItemConfig(ConfigBase): if self.transport == "streamable_http" and not self.url.strip(): raise ValueError(f"MCP 服务器 {self.name} 使用 streamable_http 时必须填写 url") + if self.transport == "sse" and not self.url.strip(): + raise ValueError(f"MCP 服务器 {self.name} 使用 sse 时必须填写 url") + return super().model_post_init(context) diff --git a/src/mcp_module/config.py b/src/mcp_module/config.py index 4d4d73af..2bc2621c 100644 --- a/src/mcp_module/config.py +++ b/src/mcp_module/config.py @@ -50,7 +50,7 @@ class MCPServerRuntimeConfig: """单个 MCP 服务器的运行时配置。""" name: str - transport: Literal["stdio", "streamable_http"] = "stdio" + transport: Literal["stdio", "streamable_http", "sse"] = "stdio" command: str = "" args: list[str] = field(default_factory=list) env: dict[str, str] = field(default_factory=dict) @@ -65,13 +65,15 @@ class MCPServerRuntimeConfig: """返回当前服务器的传输类型。 Returns: - str: ``stdio``、``streamable_http`` 或 ``unknown``。 + str: ``stdio``、``streamable_http``、``sse`` 或 ``unknown``。 """ if self.transport == "stdio" and self.command: return "stdio" if self.transport == "streamable_http" and self.url: return "streamable_http" + if self.transport == "sse" and self.url: + return "sse" return "unknown" def build_http_headers(self) -> dict[str, str]: diff --git a/src/mcp_module/connection.py b/src/mcp_module/connection.py index 10a1ea5e..02a3823d 100644 --- a/src/mcp_module/connection.py +++ b/src/mcp_module/connection.py @@ -43,16 +43,26 @@ try: from mcp.client.stdio import stdio_client from mcp.client.streamable_http import streamable_http_client + try: + from mcp.client.sse import sse_client + + SSE_AVAILABLE = True + except ImportError: + SSE_AVAILABLE = False + sse_client = None # type: ignore[assignment] + MCP_AVAILABLE = True STREAMABLE_HTTP_AVAILABLE = True except ImportError: MCP_AVAILABLE = False STREAMABLE_HTTP_AVAILABLE = False + SSE_AVAILABLE = False ClientSession = None # type: ignore[assignment,misc] StdioServerParameters = None # type: ignore[assignment,misc] mcp_types = None # type: ignore[assignment] stdio_client = None # type: ignore[assignment] streamable_http_client = None # type: ignore[assignment] + sse_client = None # type: ignore[assignment] class MCPConnection: @@ -139,6 +149,8 @@ class MCPConnection: return await self._connect_stdio() if self.config.transport_type == "streamable_http": return await self._connect_streamable_http() + if self.config.transport_type == "sse": + return await self._connect_sse() raise ValueError(f"MCP 服务器 '{self.config.name}' 使用了未知传输类型: {self.config.transport}") @@ -184,16 +196,53 @@ class MCPConnection: self._session_id_getter = session_id_getter return read_stream, write_stream - def _build_http_client(self) -> httpx.AsyncClient: - """构建 Streamable HTTP 使用的 `httpx` 客户端。 + async def _connect_sse(self) -> tuple[Any, Any]: + """建立 SSE 传输连接。 + + Returns: + tuple[Any, Any]: 读写流对象。 + """ + + if not SSE_AVAILABLE or sse_client is None: + raise ImportError("当前环境未安装可用的 MCP SSE 客户端") + if not self.config.url: + raise ValueError(f"MCP 服务器 '{self.config.name}' 缺少 SSE url 配置") + + read_stream, write_stream = await self._exit_stack.enter_async_context( + sse_client( + url=self.config.url, + headers=self.config.build_http_headers(), + timeout=self.config.http_timeout_seconds, + sse_read_timeout=self.config.read_timeout_seconds, + httpx_client_factory=self._build_http_client, + ) + ) + return read_stream, write_stream + + def _build_http_client( + self, + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + """构建 httpx 客户端。 + + Args: + headers: 合并到配置请求头的额外请求头。 + timeout: 覆盖的 httpx 超时配置。 + auth: 附加认证。 Returns: httpx.AsyncClient: 预配置的异步 HTTP 客户端。 """ + del auth + merged_headers = self.config.build_http_headers() + if headers: + merged_headers.update(headers) return httpx.AsyncClient( - headers=self.config.build_http_headers(), - timeout=httpx.Timeout(self.config.http_timeout_seconds), + headers=merged_headers, + timeout=timeout or httpx.Timeout(self.config.http_timeout_seconds), ) async def _create_client_session(self, read_stream: Any, write_stream: Any) -> Any: diff --git a/src/mcp_module/manager.py b/src/mcp_module/manager.py index 53c4dbc4..f0d054e7 100644 --- a/src/mcp_module/manager.py +++ b/src/mcp_module/manager.py @@ -309,7 +309,7 @@ class MCPManager: provider_type="mcp", icons=[build_tool_icon(item) for item in getattr(tool, "icons", []) or []], annotation=build_tool_annotation(getattr(tool, "annotations", None)), - metadata={"server_name": server_name} | getattr(tool, "meta", {}), + metadata={"server_name": server_name} | (getattr(tool, "meta", {}) or {}), ) ) return tool_specs