From 17d62a9af7872d469cf3e28897701d90ca9b9945 Mon Sep 17 00:00:00 2001
From: Soulter <37870767+Soulter@users.noreply.github.com>
Date: Sun, 20 Jul 2025 15:53:13 +0800
Subject: [PATCH] refactor: mcp server reload mechanism (#2161)
* refactor: mcp server reload mechanism
* fix: wait for client events
* fix: all other mcp servers are terminated when disable selected server
* fix: resolve type hinting issues in MCPClient and FuncCall methods
* perf: optimize mcp server loaders
* perf: improve MCP client connection testing
* perf: improve error message
* perf: clean code
* perf: increase default timeout for MCP connection and reset dialog message on close
---------
Co-authored-by: Raven95676
---
astrbot/core/provider/func_tool_manager.py | 359 ++++++++++++------
astrbot/core/provider/manager.py | 15 +-
astrbot/dashboard/routes/tools.py | 123 ++++--
dashboard/src/components/shared/ItemCard.vue | 6 +
.../i18n/locales/en-US/features/tool-use.json | 11 +-
.../i18n/locales/zh-CN/features/tool-use.json | 11 +-
dashboard/src/views/ToolUsePage.vue | 223 +++++++----
7 files changed, 509 insertions(+), 239 deletions(-)
diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py
index cea2e4f38..07a0fbd8f 100644
--- a/astrbot/core/provider/func_tool_manager.py
+++ b/astrbot/core/provider/func_tool_manager.py
@@ -39,6 +39,72 @@ SUPPORTED_TYPES = [
] # json schema 支持的数据类型
+def _prepare_config(config: dict) -> dict:
+ """准备配置,处理嵌套格式"""
+ if "mcpServers" in config and config["mcpServers"]:
+ first_key = next(iter(config["mcpServers"]))
+ config = config["mcpServers"][first_key]
+ config.pop("active", None)
+ return config
+
+
+async def _quick_test_mcp_connection(config: dict) -> tuple[bool, str]:
+ """快速测试 MCP 服务器可达性"""
+ import aiohttp
+
+ cfg = _prepare_config(config.copy())
+
+ url = cfg["url"]
+ headers = cfg.get("headers", {})
+ timeout = cfg.get("timeout", 10)
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ if cfg.get("transport") == "streamable_http":
+ test_payload = {
+ "jsonrpc": "2.0",
+ "method": "initialize",
+ "id": 0,
+ "params": {
+ "protocolVersion": "2024-11-05",
+ "capabilities": {},
+ "clientInfo": {"name": "test-client", "version": "1.2.3"},
+ },
+ }
+ async with session.post(
+ url,
+ headers={
+ **headers,
+ "Content-Type": "application/json",
+ "Accept": "application/json, text/event-stream",
+ },
+ json=test_payload,
+ timeout=aiohttp.ClientTimeout(total=timeout),
+ ) as response:
+ if response.status == 200:
+ return True, ""
+ else:
+ return False, f"HTTP {response.status}: {response.reason}"
+ else:
+ async with session.get(
+ url,
+ headers={
+ **headers,
+ "Accept": "application/json, text/event-stream",
+ },
+ timeout=aiohttp.ClientTimeout(total=timeout),
+ ) as response:
+ if response.status == 200:
+ return True, ""
+ else:
+ return False, f"HTTP {response.status}: {response.reason}"
+
+ except asyncio.TimeoutError:
+ return False, f"连接超时: {timeout}秒"
+ except Exception as e:
+ return False, f"{e!s}"
+
+
@dataclass
class FuncTool:
"""
@@ -80,12 +146,10 @@ class FuncTool:
if not self.mcp_client or not self.mcp_client.session:
raise Exception(f"MCP client for {self.name} is not available")
# 使用name属性而不是额外的mcp_tool_name
- if ":" in self.name:
- # 如果名字是格式为 mcp:server:tool_name,提取实际的工具名
- actual_tool_name = self.name.split(":")[-1]
- return await self.mcp_client.session.call_tool(actual_tool_name, args)
- else:
- return await self.mcp_client.session.call_tool(self.name, args)
+ actual_tool_name = (
+ self.name.split(":")[-1] if ":" in self.name else self.name
+ )
+ return await self.mcp_client.session.call_tool(actual_tool_name, args)
else:
raise Exception(f"Unknown function origin: {self.origin}")
@@ -100,6 +164,7 @@ class MCPClient:
self.active: bool = True
self.tools: List[mcp.Tool] = []
self.server_errlogs: List[str] = []
+ self.running_event = asyncio.Event()
async def connect_to_server(self, mcp_server_config: dict, name: str):
"""连接到 MCP 服务器
@@ -112,17 +177,19 @@ class MCPClient:
Args:
mcp_server_config (dict): Configuration for the MCP server. See https://modelcontextprotocol.io/quickstart/server
"""
- cfg = mcp_server_config.copy()
- if "mcpServers" in cfg and len(cfg["mcpServers"]) > 0:
- key_0 = list(cfg["mcpServers"].keys())[0]
- cfg = cfg["mcpServers"][key_0]
- cfg.pop("active", None) # Remove active flag from config
+ cfg = _prepare_config(mcp_server_config.copy())
+
+ def logging_callback(msg: str):
+ # 处理 MCP 服务的错误日志
+ print(f"MCP Server {name} Error: {msg}")
+ self.server_errlogs.append(msg)
if "url" in cfg:
- is_sse = True
- if cfg.get("transport") == "streamable_http":
- is_sse = False
- if is_sse:
+ success, error_msg = await _quick_test_mcp_connection(cfg)
+ if not success:
+ raise Exception(error_msg)
+
+ if cfg.get("transport") != "streamable_http":
# SSE transport method
self._streams_context = sse_client(
url=cfg["url"],
@@ -130,11 +197,18 @@ class MCPClient:
timeout=cfg.get("timeout", 5),
sse_read_timeout=cfg.get("sse_read_timeout", 60 * 5),
)
- streams = await self.exit_stack.enter_async_context(self._streams_context)
+ streams = await self.exit_stack.enter_async_context(
+ self._streams_context
+ )
# Create a new client session
+ read_timeout = timedelta(seconds=cfg.get("session_read_timeout", 20))
self.session = await self.exit_stack.enter_async_context(
- mcp.ClientSession(*streams)
+ mcp.ClientSession(
+ *streams,
+ read_timeout_seconds=read_timeout,
+ logging_callback=logging_callback, # type: ignore
+ )
)
else:
timeout = timedelta(seconds=cfg.get("timeout", 30))
@@ -148,11 +222,19 @@ class MCPClient:
sse_read_timeout=sse_read_timeout,
terminate_on_close=cfg.get("terminate_on_close", True),
)
- read_s, write_s, _ = await self.exit_stack.enter_async_context(self._streams_context)
+ read_s, write_s, _ = await self.exit_stack.enter_async_context(
+ self._streams_context
+ )
# Create a new client session
+ read_timeout = timedelta(seconds=cfg.get("session_read_timeout", 20))
self.session = await self.exit_stack.enter_async_context(
- mcp.ClientSession(read_stream=read_s, write_stream=write_s)
+ mcp.ClientSession(
+ read_stream=read_s,
+ write_stream=write_s,
+ read_timeout_seconds=read_timeout,
+ logging_callback=logging_callback, # type: ignore
+ )
)
else:
@@ -172,7 +254,7 @@ class MCPClient:
logger=logger,
identifier=f"MCPServer-{name}",
callback=callback,
- ),
+ ), # type: ignore
),
)
@@ -180,19 +262,18 @@ class MCPClient:
self.session = await self.exit_stack.enter_async_context(
mcp.ClientSession(*stdio_transport)
)
-
await self.session.initialize()
async def list_tools_and_save(self) -> mcp.ListToolsResult:
"""List all tools from the server and save them to self.tools"""
response = await self.session.list_tools()
- logger.debug(f"MCP server {self.name} list tools response: {response}")
self.tools = response.tools
return response
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
+ self.running_event.set() # Set the running event to indicate cleanup is done
class FuncCall:
@@ -201,8 +282,6 @@ class FuncCall:
"""内部加载的 func tools"""
self.mcp_client_dict: Dict[str, MCPClient] = {}
"""MCP 服务列表"""
- self.mcp_service_queue = asyncio.Queue()
- """用于外部控制 MCP 服务的启停"""
self.mcp_client_event: Dict[str, asyncio.Event] = {}
def empty(self) -> bool:
@@ -258,7 +337,7 @@ class FuncCall:
return f
return None
- async def _init_mcp_clients(self) -> None:
+ async def init_mcp_clients(self) -> None:
"""从项目根目录读取 mcp_server.json 文件,初始化 MCP 服务列表。文件格式如下:
```
{
@@ -300,115 +379,64 @@ class FuncCall:
)
self.mcp_client_event[name] = event
- async def mcp_service_selector(self):
- """为了避免在不同异步任务中控制 MCP 服务导致的报错,整个项目统一通过这个 Task 来控制
-
- 使用 self.mcp_service_queue.put_nowait() 来控制 MCP 服务的启停,数据格式如下:
-
- {"type": "init"} 初始化所有MCP客户端
-
- {"type": "init", "name": "mcp_server_name", "cfg": {...}} 初始化指定的MCP客户端
-
- {"type": "terminate"} 终止所有MCP客户端
-
- {"type": "terminate", "name": "mcp_server_name"} 终止指定的MCP客户端
- """
- while True:
- data = await self.mcp_service_queue.get()
- if data["type"] == "init":
- if "name" in data:
- event = asyncio.Event()
- asyncio.create_task(
- self._init_mcp_client_task_wrapper(
- data["name"], data["cfg"], event
- )
- )
- self.mcp_client_event[data["name"]] = event
- else:
- await self._init_mcp_clients()
- elif data["type"] == "terminate":
- if "name" in data:
- # await self._terminate_mcp_client(data["name"])
- if data["name"] in self.mcp_client_event:
- self.mcp_client_event[data["name"]].set()
- self.mcp_client_event.pop(data["name"], None)
- self.func_list = [
- f
- for f in self.func_list
- if not (
- f.origin == "mcp" and f.mcp_server_name == data["name"]
- )
- ]
- else:
- for name in self.mcp_client_dict.keys():
- # await self._terminate_mcp_client(name)
- # self.mcp_client_event[name].set()
- if name in self.mcp_client_event:
- self.mcp_client_event[name].set()
- self.mcp_client_event.pop(name, None)
- self.func_list = [f for f in self.func_list if f.origin != "mcp"]
-
async def _init_mcp_client_task_wrapper(
- self, name: str, cfg: dict, event: asyncio.Event
+ self,
+ name: str,
+ cfg: dict,
+ event: asyncio.Event,
+ ready_future: asyncio.Future = None,
) -> None:
"""初始化 MCP 客户端的包装函数,用于捕获异常"""
try:
await self._init_mcp_client(name, cfg)
+ tools = await self.mcp_client_dict[name].list_tools_and_save()
+ if ready_future and not ready_future.done():
+ # tell the caller we are ready
+ ready_future.set_result(tools)
await event.wait()
logger.info(f"收到 MCP 客户端 {name} 终止信号")
except Exception as e:
- import traceback
-
- traceback.print_exc()
- logger.error(f"初始化 MCP 客户端 {name} 失败: {e}")
+ logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
+ if ready_future and not ready_future.done():
+ ready_future.set_exception(e)
finally:
# 无论如何都能清理
await self._terminate_mcp_client(name)
async def _init_mcp_client(self, name: str, config: dict) -> None:
"""初始化单个MCP客户端"""
- try:
- # 先清理之前的客户端,如果存在
- if name in self.mcp_client_dict:
- await self._terminate_mcp_client(name)
+ # 先清理之前的客户端,如果存在
+ if name in self.mcp_client_dict:
+ await self._terminate_mcp_client(name)
- mcp_client = MCPClient()
- mcp_client.name = name
- self.mcp_client_dict[name] = mcp_client
- await mcp_client.connect_to_server(config, name)
- tools_res = await mcp_client.list_tools_and_save()
- tool_names = [tool.name for tool in tools_res.tools]
+ mcp_client = MCPClient()
+ mcp_client.name = name
+ self.mcp_client_dict[name] = mcp_client
+ await mcp_client.connect_to_server(config, name)
+ tools_res = await mcp_client.list_tools_and_save()
+ logger.debug(f"MCP server {name} list tools response: {tools_res}")
+ tool_names = [tool.name for tool in tools_res.tools]
- # 移除该MCP服务之前的工具(如有)
- self.func_list = [
- f
- for f in self.func_list
- if not (f.origin == "mcp" and f.mcp_server_name == name)
- ]
+ # 移除该MCP服务之前的工具(如有)
+ self.func_list = [
+ f
+ for f in self.func_list
+ if not (f.origin == "mcp" and f.mcp_server_name == name)
+ ]
- # 将 MCP 工具转换为 FuncTool 并添加到 func_list
- for tool in mcp_client.tools:
- func_tool = FuncTool(
- name=tool.name,
- parameters=tool.inputSchema,
- description=tool.description,
- origin="mcp",
- mcp_server_name=name,
- mcp_client=mcp_client,
- )
- self.func_list.append(func_tool)
+ # 将 MCP 工具转换为 FuncTool 并添加到 func_list
+ for tool in mcp_client.tools:
+ func_tool = FuncTool(
+ name=tool.name,
+ parameters=tool.inputSchema,
+ description=tool.description,
+ origin="mcp",
+ mcp_server_name=name,
+ mcp_client=mcp_client,
+ )
+ self.func_list.append(func_tool)
- logger.info(f"已连接 MCP 服务 {name}, Tools: {tool_names}")
- return
- except Exception as e:
- import traceback
-
- logger.error(traceback.format_exc())
- logger.error(f"初始化 MCP 客户端 {name} 失败: {e}")
- # 发生错误时确保客户端被清理
- if name in self.mcp_client_dict:
- await self._terminate_mcp_client(name)
- return
+ logger.info(f"已连接 MCP 服务 {name}, Tools: {tool_names}")
async def _terminate_mcp_client(self, name: str) -> None:
"""关闭并清理MCP客户端"""
@@ -418,7 +446,7 @@ class FuncCall:
await self.mcp_client_dict[name].cleanup()
self.mcp_client_dict.pop(name)
except Exception as e:
- logger.info(f"清空 MCP 客户端资源 {name}: {e}。")
+ logger.error(f"清空 MCP 客户端资源 {name}: {e}。")
# 移除关联的FuncTool
self.func_list = [
f
@@ -427,6 +455,103 @@ class FuncCall:
]
logger.info(f"已关闭 MCP 服务 {name}")
+ @staticmethod
+ async def test_mcp_server_connection(config: dict) -> list[str]:
+ if "url" in config:
+ success, error_msg = await _quick_test_mcp_connection(config)
+ if not success:
+ raise Exception(error_msg)
+
+ mcp_client = MCPClient()
+ try:
+ logger.debug(f"testing MCP server connection with config: {config}")
+ await mcp_client.connect_to_server(config, "test")
+ tools_res = await mcp_client.list_tools_and_save()
+ tool_names = [tool.name for tool in tools_res.tools]
+ finally:
+ logger.debug("Cleaning up MCP client after testing connection.")
+ await mcp_client.cleanup()
+ return tool_names
+
+ async def enable_mcp_server(
+ self,
+ name: str,
+ config: dict,
+ event: asyncio.Event | None = None,
+ ready_future: asyncio.Future | None = None,
+ timeout: int = 30,
+ ) -> None:
+ """Enable_mcp_server a new MCP server to the manager and initialize it.
+
+ Args:
+ name (str): The name of the MCP server.
+ config (dict): Configuration for the MCP server.
+ event (asyncio.Event): Event to signal when the MCP client is ready.
+ ready_future (asyncio.Future): Future to signal when the MCP client is ready.
+ timeout (int): Timeout for the initialization.
+ Raises:
+ TimeoutError: If the initialization does not complete within the specified timeout.
+ Exception: If there is an error during initialization.
+ """
+ if not event:
+ event = asyncio.Event()
+ if not ready_future:
+ ready_future = asyncio.Future()
+ if name in self.mcp_client_dict:
+ return
+ asyncio.create_task(
+ self._init_mcp_client_task_wrapper(name, config, event, ready_future)
+ )
+ try:
+ await asyncio.wait_for(ready_future, timeout=timeout)
+ finally:
+ self.mcp_client_event[name] = event
+
+ if ready_future.done() and ready_future.exception():
+ exc = ready_future.exception()
+ if exc is not None:
+ raise exc
+
+ async def disable_mcp_server(
+ self, name: str | None = None, timeout: float = 10
+ ) -> None:
+ """Disable an MCP server by its name.
+
+ Args:
+ name (str): The name of the MCP server to disable. If None, ALL MCP servers will be disabled.
+ timeout (int): Timeout.
+ """
+ if name:
+ if name not in self.mcp_client_event:
+ return
+ client = self.mcp_client_dict.get(name)
+ self.mcp_client_event[name].set()
+ if not client:
+ return
+ client_running_event = client.running_event
+ try:
+ await asyncio.wait_for(client_running_event.wait(), timeout=timeout)
+ finally:
+ self.mcp_client_event.pop(name, None)
+ self.func_list = [
+ f
+ for f in self.func_list
+ if f.origin != "mcp" or f.mcp_server_name != name
+ ]
+ else:
+ running_events = [
+ client.running_event.wait() for client in self.mcp_client_dict.values()
+ ]
+ for key, event in self.mcp_client_event.items():
+ event.set()
+ # waiting for all clients to finish
+ try:
+ await asyncio.wait_for(asyncio.gather(*running_events), timeout=timeout)
+ finally:
+ self.mcp_client_event.clear()
+ self.mcp_client_dict.clear()
+ self.func_list = [f for f in self.func_list if f.origin != "mcp"]
+
def get_func_desc_openai_style(self, omit_empty_parameter_field=False) -> list:
"""
获得 OpenAI API 风格的**已经激活**的工具描述
diff --git a/astrbot/core/provider/manager.py b/astrbot/core/provider/manager.py
index df21e6a12..370c5322b 100644
--- a/astrbot/core/provider/manager.py
+++ b/astrbot/core/provider/manager.py
@@ -169,10 +169,7 @@ class ProviderManager:
self.curr_tts_provider_inst = self.tts_provider_insts[0]
# 初始化 MCP Client 连接
- asyncio.create_task(
- self.llm_tools.mcp_service_selector(), name="mcp-service-handler"
- )
- self.llm_tools.mcp_service_queue.put_nowait({"type": "init"})
+ asyncio.create_task(self.llm_tools.init_mcp_clients(), name="init_mcp_clients")
async def load_provider(self, provider_config: dict):
if not provider_config["enable"]:
@@ -422,7 +419,7 @@ class ProviderManager:
self.curr_tts_provider_inst = None
if getattr(self.inst_map[provider_id], "terminate", None):
- await self.inst_map[provider_id].terminate() # type: ignore
+ await self.inst_map[provider_id].terminate() # type: ignore
logger.info(
f"{provider_id} 提供商适配器已终止({len(self.provider_insts)}, {len(self.stt_provider_insts)}, {len(self.tts_provider_insts)})"
@@ -432,6 +429,8 @@ class ProviderManager:
async def terminate(self):
for provider_inst in self.provider_insts:
if hasattr(provider_inst, "terminate"):
- await provider_inst.terminate() # type: ignore
- # 清理 MCP Client 连接
- await self.llm_tools.mcp_service_queue.put({"type": "terminate"})
+ await provider_inst.terminate() # type: ignore
+ try:
+ await self.llm_tools.disable_mcp_server()
+ except Exception:
+ logger.error("Error while disabling MCP servers", exc_info=True)
diff --git a/astrbot/dashboard/routes/tools.py b/astrbot/dashboard/routes/tools.py
index d38014c71..5dad2576b 100644
--- a/astrbot/dashboard/routes/tools.py
+++ b/astrbot/dashboard/routes/tools.py
@@ -26,6 +26,7 @@ class ToolsRoute(Route):
"/tools/mcp/update": ("POST", self.update_mcp_server),
"/tools/mcp/delete": ("POST", self.delete_mcp_server),
"/tools/mcp/market": ("GET", self.get_mcp_markets),
+ "/tools/mcp/test": ("POST", self.test_mcp_connection),
}
self.register_routes()
self.tool_mgr = self.core_lifecycle.provider_manager.llm_tools
@@ -132,12 +133,19 @@ class ToolsRoute(Route):
config["mcpServers"][name] = server_config
if self.save_mcp_config(config):
- # 动态初始化新MCP客户端
- await self.tool_mgr.mcp_service_queue.put({
- "type": "init",
- "name": name,
- "cfg": config["mcpServers"][name],
- })
+ try:
+ await self.tool_mgr.enable_mcp_server(
+ name, server_config, timeout=30
+ )
+ except TimeoutError:
+ return Response().error(f"启用 MCP 服务器 {name} 超时。").__dict__
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return (
+ Response()
+ .error(f"启用 MCP 服务器 {name} 失败: {str(e)}")
+ .__dict__
+ )
return Response().ok(None, f"成功添加 MCP 服务器 {name}").__dict__
else:
return Response().error("保存配置失败").__dict__
@@ -193,31 +201,55 @@ class ToolsRoute(Route):
if self.save_mcp_config(config):
# 处理MCP客户端状态变化
if active:
- # 如果要激活服务器或者配置已更改
if name in self.tool_mgr.mcp_client_dict or not only_update_active:
- await self.tool_mgr.mcp_service_queue.put({
- "type": "terminate",
- "name": name,
- })
- await self.tool_mgr.mcp_service_queue.put({
- "type": "init",
- "name": name,
- "cfg": config["mcpServers"][name],
- })
- else:
- # 客户端不存在,初始化
- await self.tool_mgr.mcp_service_queue.put({
- "type": "init",
- "name": name,
- "cfg": config["mcpServers"][name],
- })
+ try:
+ await self.tool_mgr.disable_mcp_server(name, timeout=10)
+ except TimeoutError as e:
+ return (
+ Response()
+ .error(f"启用前停用 MCP 服务器时 {name} 超时: {str(e)}")
+ .__dict__
+ )
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return (
+ Response()
+ .error(f"启用前停用 MCP 服务器时 {name} 失败: {str(e)}")
+ .__dict__
+ )
+ try:
+ await self.tool_mgr.enable_mcp_server(
+ name, config["mcpServers"][name], timeout=30
+ )
+ except TimeoutError:
+ return (
+ Response().error(f"启用 MCP 服务器 {name} 超时。").__dict__
+ )
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return (
+ Response()
+ .error(f"启用 MCP 服务器 {name} 失败: {str(e)}")
+ .__dict__
+ )
else:
# 如果要停用服务器
if name in self.tool_mgr.mcp_client_dict:
- self.tool_mgr.mcp_service_queue.put_nowait({
- "type": "terminate",
- "name": name,
- })
+ try:
+ await self.tool_mgr.disable_mcp_server(name, timeout=10)
+ except TimeoutError:
+ return (
+ Response()
+ .error(f"停用 MCP 服务器 {name} 超时。")
+ .__dict__
+ )
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return (
+ Response()
+ .error(f"停用 MCP 服务器 {name} 失败: {str(e)}")
+ .__dict__
+ )
return Response().ok(None, f"成功更新 MCP 服务器 {name}").__dict__
else:
@@ -239,17 +271,23 @@ class ToolsRoute(Route):
if name not in config["mcpServers"]:
return Response().error(f"服务器 {name} 不存在").__dict__
- # 删除服务器配置
del config["mcpServers"][name]
if self.save_mcp_config(config):
- # 关闭并删除MCP客户端
if name in self.tool_mgr.mcp_client_dict:
- self.tool_mgr.mcp_service_queue.put_nowait({
- "type": "terminate",
- "name": name,
- })
-
+ try:
+ await self.tool_mgr.disable_mcp_server(name, timeout=10)
+ except TimeoutError:
+ return (
+ Response().error(f"停用 MCP 服务器 {name} 超时。").__dict__
+ )
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return (
+ Response()
+ .error(f"停用 MCP 服务器 {name} 失败: {str(e)}")
+ .__dict__
+ )
return Response().ok(None, f"成功删除 MCP 服务器 {name}").__dict__
else:
return Response().error("保存配置失败").__dict__
@@ -281,3 +319,20 @@ class ToolsRoute(Route):
except Exception as _:
logger.error(traceback.format_exc())
return Response().error("获取市场数据失败").__dict__
+
+ async def test_mcp_connection(self):
+ """
+ 测试 MCP 服务器连接
+ """
+ try:
+ server_data = await request.json
+ config = server_data.get("mcp_server_config", None)
+
+ tools_name = await self.tool_mgr.test_mcp_server_connection(config)
+ return (
+ Response().ok(data=tools_name, message="🎉 MCP 服务器可用!").__dict__
+ )
+
+ except Exception as e:
+ logger.error(traceback.format_exc())
+ return Response().error(f"测试 MCP 连接失败: {str(e)}").__dict__
diff --git a/dashboard/src/components/shared/ItemCard.vue b/dashboard/src/components/shared/ItemCard.vue
index ff790cb7b..6152c531f 100644
--- a/dashboard/src/components/shared/ItemCard.vue
+++ b/dashboard/src/components/shared/ItemCard.vue
@@ -9,6 +9,8 @@
hide-details
density="compact"
:model-value="getItemEnabled()"
+ :loading="loading"
+ :disabled="loading"
v-bind="props"
@update:model-value="toggleEnabled"
>
@@ -77,6 +79,10 @@ export default {
bglogo: {
type: String,
default: null
+ },
+ loading: {
+ type: Boolean,
+ default: false
}
},
emits: ['toggle-enabled', 'delete', 'edit'],
diff --git a/dashboard/src/i18n/locales/en-US/features/tool-use.json b/dashboard/src/i18n/locales/en-US/features/tool-use.json
index fad67a0d5..bd36fd68a 100644
--- a/dashboard/src/i18n/locales/en-US/features/tool-use.json
+++ b/dashboard/src/i18n/locales/en-US/features/tool-use.json
@@ -15,7 +15,9 @@
"buttons": {
"refresh": "Refresh",
"add": "Add Server",
- "useTemplate": "Use Template"
+ "useTemplateStdio": "Stdio Template",
+ "useTemplateStreamableHttp": "Streamable HTTP Template",
+ "useTemplateSse": "SSE Template"
},
"empty": "No MCP servers available, click Add Server to add one",
"status": {
@@ -68,10 +70,6 @@
"enable": "Enable Server",
"config": "Server Configuration"
},
- "configNotes": {
- "note1": "1. Some MCP servers may require filling in `API_KEY` or `TOKEN` information in env according to their requirements, please check if filled.",
- "note2": "2. When url parameter is specified in configuration: if `transport` parameter is also specified as `streamable_http`, Streamable HTTP is used, otherwise SSE connection is used."
- },
"errors": {
"configEmpty": "Configuration cannot be empty",
"jsonFormat": "JSON format error: {error}",
@@ -79,7 +77,8 @@
},
"buttons": {
"cancel": "Cancel",
- "save": "Save"
+ "save": "Save",
+ "testConnection": "Test Connection"
}
},
"serverDetail": {
diff --git a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
index f44a16d59..c9e8e858d 100644
--- a/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
+++ b/dashboard/src/i18n/locales/zh-CN/features/tool-use.json
@@ -15,7 +15,9 @@
"buttons": {
"refresh": "刷新",
"add": "新增服务器",
- "useTemplate": "使用模板"
+ "useTemplateStdio": "Stdio 模板",
+ "useTemplateStreamableHttp": "Streamable HTTP 模板",
+ "useTemplateSse": "SSE 模板"
},
"empty": "暂无 MCP 服务器,点击 新增服务器 添加",
"status": {
@@ -68,10 +70,6 @@
"enable": "启用服务器",
"config": "服务器配置"
},
- "configNotes": {
- "note1": "1. 某些 MCP 服务器可能需要按照其要求在 env 中填充 `API_KEY` 或 `TOKEN` 等信息,请注意检查是否填写。",
- "note2": "2. 当配置中指定 url 参数时:如果还同时指定 `transport` 参数的值为 `streamable_http`,则使用 Steamable HTTP,否则使用 SSE 连接。"
- },
"errors": {
"configEmpty": "配置不能为空",
"jsonFormat": "JSON 格式错误: {error}",
@@ -79,7 +77,8 @@
},
"buttons": {
"cancel": "取消",
- "save": "保存"
+ "save": "保存",
+ "testConnection": "测试连接"
}
},
"serverDetail": {
diff --git a/dashboard/src/views/ToolUsePage.vue b/dashboard/src/views/ToolUsePage.vue
index cc70c415b..93f53924c 100644
--- a/dashboard/src/views/ToolUsePage.vue
+++ b/dashboard/src/views/ToolUsePage.vue
@@ -20,7 +20,8 @@
-
+
{{ tm('mcpServers.buttons.add') }}
@@ -49,7 +50,8 @@
mdi-server
{{ tm('mcpServers.title') }}
-
+
{{ tm('mcpServers.buttons.refresh') }}
-
+
mdi-file-code
-
+
{{ getServerConfigSummary(item) }}
-
-
-
mdi-tools
-
{{ tm('mcpServers.status.availableTools') }} ({{ item.tools.length }})
+
+
+
+
+
+
mdi-tools
+
+
+
+ {{ tm('mcpServers.status.availableTools', { count: item.tools.length }) }} ({{
+ item.tools.length }})
+
+
+
+
+
+ {{ tm('mcpServers.status.availableTools') }}
+
+
+
+
+
+
+ Close
+
+
+
+
+
+
+
+
+
+
+ mdi-alert-circle
+ {{ tm('mcpServers.status.noTools') }}
+
+
+
+
-
-
- {{ tool }}
-
-
-
-
- mdi-alert-circle
- {{ tm('mcpServers.status.noTools') }}
+
+
@@ -131,8 +162,9 @@
-
+
mdi-store
{{ tm('marketplace.title') }}
-
+
{{ tm('marketplace.buttons.refresh') }}
@@ -256,7 +288,8 @@
mdi-tools
- {{ tm('marketplace.status.availableTools', { count: server.tools ? server.tools.length : 0 }) }}
+ {{ tm('marketplace.status.availableTools', { count: server.tools ? server.tools.length : 0 })
+ }}
@@ -310,31 +343,25 @@
-
-
-
+
{{ tm('dialogs.addServer.fields.config') }}
-
-
- mdi-information
-
-
- {{ tm('tooltip.serverConfig') }}
-
-
-
- {{ tm('mcpServers.buttons.useTemplate') }}
+
+ {{ tm('mcpServers.buttons.useTemplateStdio') }}
+
+
+ {{ tm('mcpServers.buttons.useTemplateStreamableHttp') }}
+
+
+ {{ tm('mcpServers.buttons.useTemplateSse') }}
- {{ tm('dialogs.addServer.configNotes.note1') }}
-
- {{ tm('dialogs.addServer.configNotes.note2') }}
-
+
+ {{ addServerDialogMessage }}
+
-
+
{{ tm('dialogs.addServer.buttons.cancel') }}
+
+ {{ tm('dialogs.addServer.buttons.testConnection') }}
+
{{ tm('dialogs.addServer.buttons.save') }}
@@ -504,8 +536,11 @@ export default {
tools: [],
showMcpServerDialog: false,
showServerDetailDialog: false,
+ addServerDialogMessage: "",
showTools: true,
loading: false,
+ loadingGettingServers: false,
+ mcpServerUpdateLoaders: {}, // record loading state for each server update
isEditMode: false,
serverConfigJson: '',
jsonError: null,
@@ -575,10 +610,10 @@ export default {
if (!this.marketplaceSearch.trim()) {
return this.marketplaceServers;
}
-
+
const searchTerm = this.marketplaceSearch.toLowerCase();
- return this.marketplaceServers.filter(server =>
- server.name.toLowerCase().includes(searchTerm) ||
+ return this.marketplaceServers.filter(server =>
+ server.name.toLowerCase().includes(searchTerm) ||
(server.name_h && server.name_h.toLowerCase().includes(searchTerm)) ||
(server.description && server.description.toLowerCase().includes(searchTerm))
);
@@ -618,17 +653,21 @@ export default {
},
getServers() {
- this.loading = true
+ this.loadingGettingServers = true;
axios.get('/api/tools/mcp/servers')
.then(response => {
this.mcpServers = response.data.data || [];
+ this.mcpServers.forEach(server => {
+ // Ensure each server has a loader state
+ if (!this.mcpServerUpdateLoaders[server.name]) {
+ this.mcpServerUpdateLoaders[server.name] = false;
+ }
+ });
})
.catch(error => {
this.showError(this.tm('messages.getServersError', { error: error.message }));
}).finally(() => {
- setTimeout(() => {
- this.loading = false;
- }, 500);
+ this.loadingGettingServers = false;
});
},
@@ -658,14 +697,28 @@ export default {
}
},
- setConfigTemplate() {
- // 设置一个基本的配置模板
- const template = {
- command: "python",
- args: ["-m", "your_module"],
- // 可以添加其他 MCP 支持的配置项
- };
-
+ setConfigTemplate(type = 'stdio') {
+ let template = {};
+ if (type === 'streamable_http') {
+ template = {
+ transport: "streamable_http",
+ url: "your mcp server url",
+ headers: {},
+ timeout: 30,
+ };
+ } else if (type === 'sse') {
+ template = {
+ transport: "sse",
+ url: "your mcp server url",
+ headers: {},
+ timeout: 30,
+ };
+ } else {
+ template = {
+ command: "python",
+ args: ["-m", "your_module"],
+ };
+ }
this.serverConfigJson = JSON.stringify(template, null, 2);
},
@@ -693,6 +746,7 @@ export default {
.then(response => {
this.loading = false;
this.showMcpServerDialog = false;
+ this.addServerDialogMessage = "";
this.getServers();
this.getTools();
this.showSuccess(response.data.message || this.tm('messages.saveSuccess'));
@@ -753,6 +807,7 @@ export default {
updateServerStatus(server) {
// 切换服务器状态
+ this.mcpServerUpdateLoaders[server.name] = true;
server.active = !server.active;
axios.post('/api/tools/mcp/update', server)
.then(response => {
@@ -761,16 +816,48 @@ export default {
})
.catch(error => {
this.showError(this.tm('messages.updateError', { error: error.response?.data?.message || error.message }));
- // 回滚状态
server.active = !server.active;
+ })
+ .finally(() => {
+ this.mcpServerUpdateLoaders[server.name] = false;
});
},
closeServerDialog() {
this.showMcpServerDialog = false;
+ this.addServerDialogMessage = '';
this.resetForm();
},
+ testServerConnection() {
+ if (!this.validateJson()) {
+ return;
+ }
+
+ this.loading = true;
+
+ let configObj;
+ try {
+ configObj = JSON.parse(this.serverConfigJson);
+ } catch (e) {
+ this.loading = false;
+ this.showError(this.tm('dialogs.addServer.errors.jsonParse', { error: e.message }));
+ return;
+ }
+
+ axios.post('/api/tools/mcp/test', {
+ "mcp_server_config": configObj,
+ })
+ .then(response => {
+ this.loading = false;
+ this.addServerDialogMessage = `${response.data.message} (tools: ${response.data.data})`;
+ })
+ .catch(error => {
+ this.loading = false;
+ this.showError(this.tm('messages.testError', { error: error.response?.data?.message || error.message }));
+ });
+ },
+
resetForm() {
this.currentServer = {
name: '',
@@ -939,7 +1026,7 @@ export default {
.monaco-container {
border: 1px solid rgba(0, 0, 0, 0.1);
- border-radius: 4px;
+ border-radius: 8px;
height: 300px;
margin-top: 4px;
overflow: hidden;