From 343e0b54b94bacf96e4b2221593aacc4fa37ad65 Mon Sep 17 00:00:00 2001
From: Soulter <905617992@qq.com>
Date: Tue, 27 May 2025 15:39:02 +0800
Subject: [PATCH] feat: MCP supports Streamable HTTP transport method
fixes: #1637 #1342
---
astrbot/core/provider/func_tool_manager.py | 60 +++++++++++++++++-----
dashboard/src/views/ToolUsePage.vue | 2 +-
2 files changed, 47 insertions(+), 15 deletions(-)
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index 416e74386..8d79b1de8 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -4,6 +4,7 @@ import textwrap
import os
import asyncio
import logging
+from datetime import timedelta
from typing import Dict, List, Awaitable, Literal, Any
from dataclasses import dataclass
@@ -20,6 +21,13 @@ try:
except (ModuleNotFoundError, ImportError):
logger.warning("警告: 缺少依赖库 'mcp',将无法使用 MCP 服务。")
+try:
+ from mcp.client.streamable_http import streamablehttp_client
+except (ModuleNotFoundError, ImportError):
+ logger.warning(
+ "警告: 缺少依赖库 'mcp' 或者 mcp 库版本过低,无法使用 Streamable HTTP 连接方式。"
+ )
+
DEFAULT_MCP_CONFIG = {"mcpServers": {}}
SUPPORTED_TYPES = [
@@ -96,7 +104,10 @@ class MCPClient:
async def connect_to_server(self, mcp_server_config: dict, name: str):
"""连接到 MCP 服务器
- 如果 `url` 参数存在,则使用 SSE 的方式连接到 MCP 服务。
+ 如果 `url` 参数存在:
+ 1. 当 transport 指定为 `streamable_http` 时,使用 Streamable HTTP 连接方式。
+ 1. 当 transport 指定为 `sse` 时,使用 SSE 连接方式。
+ 2. 如果没有指定,默认使用 SSE 的方式连接到 MCP 服务。
Args:
mcp_server_config (dict): Configuration for the MCP server. See https://modelcontextprotocol.io/quickstart/server
@@ -108,20 +119,41 @@ class MCPClient:
cfg.pop("active", None) # Remove active flag from config
if "url" in cfg:
- # SSE transport method
- self._streams_context = sse_client(
- url=cfg["url"],
- headers=cfg.get("headers", {}),
- timeout=cfg.get("timeout", 5),
- sse_read_timeout=cfg.get("sse_read_timeout", 60 * 5),
- )
- streams = await self._streams_context.__aenter__()
+ is_sse = True
+ if cfg.get("transport") == "streamable_http":
+ is_sse = False
+ if is_sse:
+ # SSE transport method
+ self._streams_context = sse_client(
+ url=cfg["url"],
+ headers=cfg.get("headers", {}),
+ timeout=cfg.get("timeout", 5),
+ sse_read_timeout=cfg.get("sse_read_timeout", 60 * 5),
+ )
+ streams = await self._streams_context.__aenter__()
- # Create a new client session
- # self.session = await self._session_context.__aenter__()
- self.session = await self.exit_stack.enter_async_context(
- mcp.ClientSession(*streams)
- )
+ # Create a new client session
+ self.session = await self.exit_stack.enter_async_context(
+ mcp.ClientSession(*streams)
+ )
+ else:
+ timeout = timedelta(seconds=cfg.get("timeout", 30))
+ sse_read_timeout = timedelta(
+ seconds=cfg.get("sse_read_timeout", 60 * 5)
+ )
+ self._streams_context = streamablehttp_client(
+ url=cfg["url"],
+ headers=cfg.get("headers", {}),
+ timeout=timeout,
+ sse_read_timeout=sse_read_timeout,
+ terminate_on_close=cfg.get("terminate_on_close", True),
+ )
+ read_s, write_s, _ = await self._streams_context.__aenter__()
+
+ # Create a new client session
+ self.session = await self.exit_stack.enter_async_context(
+ mcp.ClientSession(read_stream=read_s, write_stream=write_s)
+ )
else:
server_params = mcp.StdioServerParameters(
diff --git a/dashboard/src/views/ToolUsePage.vue b/dashboard/src/views/ToolUsePage.vue
index 312fca87b..a8a74082f 100644
--- a/dashboard/src/views/ToolUsePage.vue
+++ b/dashboard/src/views/ToolUsePage.vue
@@ -327,7 +327,7 @@
1. 某些 MCP 服务器可能需要按照其要求在 env 中填充 `API_KEY` 或 `TOKEN` 等信息,请注意检查是否填写。
- 2. 当配置中带有 url 参数时,将使用 SSE 的方式连接到服务器。
+ 2. 当配置中指定 url 参数时:如果还同时指定 `transport` 参数的值为 `streamable_http`,则使用 Steamable HTTP,否则使用 SSE 连接。