Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8132ce24eb | |||
| 38e99cf65c | |||
| 2cfe4288b2 | |||
| 4924739423 | |||
| ec9f7403d5 | |||
| 0de7ae8481 |
@@ -4,6 +4,7 @@ import asyncio
|
|||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import urllib.parse
|
||||||
from collections.abc import AsyncGenerator, Awaitable, Callable
|
from collections.abc import AsyncGenerator, Awaitable, Callable
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -212,15 +213,93 @@ class FunctionToolManager:
|
|||||||
open(mcp_json_file, encoding="utf-8"),
|
open(mcp_json_file, encoding="utf-8"),
|
||||||
)["mcpServers"]
|
)["mcpServers"]
|
||||||
|
|
||||||
for name in mcp_server_json_obj:
|
tasks: dict[str, asyncio.Task] = {}
|
||||||
cfg = mcp_server_json_obj[name]
|
ready_futures: dict[str, asyncio.Future] = {}
|
||||||
|
|
||||||
|
for name, cfg in mcp_server_json_obj.items():
|
||||||
if cfg.get("active", True):
|
if cfg.get("active", True):
|
||||||
event = asyncio.Event()
|
event = asyncio.Event()
|
||||||
asyncio.create_task(
|
ready_future = asyncio.get_running_loop().create_future()
|
||||||
self._init_mcp_client_task_wrapper(name, cfg, event),
|
task = asyncio.create_task(
|
||||||
|
self._init_mcp_client_task_wrapper(
|
||||||
|
name,
|
||||||
|
cfg,
|
||||||
|
event,
|
||||||
|
ready_future,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
tasks[name] = task
|
||||||
|
ready_futures[name] = ready_future
|
||||||
self.mcp_client_event[name] = event
|
self.mcp_client_event[name] = event
|
||||||
|
|
||||||
|
if ready_futures:
|
||||||
|
logger.info(f"等待 {len(ready_futures)} 个 MCP 服务初始化...")
|
||||||
|
|
||||||
|
_, pending_futures = await asyncio.wait(
|
||||||
|
ready_futures.values(),
|
||||||
|
timeout=20.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
pending_services = {
|
||||||
|
name
|
||||||
|
for name, ready_future in ready_futures.items()
|
||||||
|
if ready_future in pending_futures
|
||||||
|
}
|
||||||
|
|
||||||
|
if pending_services:
|
||||||
|
logger.warning(
|
||||||
|
"MCP 服务初始化超时(20秒),部分服务可能未完全加载。"
|
||||||
|
"建议检查 MCP 服务器配置和网络连接。"
|
||||||
|
)
|
||||||
|
for name in pending_services:
|
||||||
|
task = tasks[name]
|
||||||
|
task.cancel()
|
||||||
|
await asyncio.gather(
|
||||||
|
*(tasks[name] for name in pending_services),
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
success_count = 0
|
||||||
|
failed_services: list[str] = []
|
||||||
|
|
||||||
|
for name, ready_future in ready_futures.items():
|
||||||
|
if name in pending_services:
|
||||||
|
logger.error(f"MCP 服务 {name} 初始化超时")
|
||||||
|
failed_services.append(name)
|
||||||
|
self.mcp_client_event.pop(name, None)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if ready_future.cancelled():
|
||||||
|
logger.error(f"MCP 服务 {name} 初始化已取消")
|
||||||
|
failed_services.append(name)
|
||||||
|
self.mcp_client_event.pop(name, None)
|
||||||
|
continue
|
||||||
|
|
||||||
|
exc = ready_future.exception()
|
||||||
|
if exc is not None:
|
||||||
|
logger.error(f"MCP 服务 {name} 初始化失败: {exc}")
|
||||||
|
# 仅在 debug 级别输出完整配置,避免在生产日志中泄露敏感信息
|
||||||
|
cfg = mcp_server_json_obj.get(name, {})
|
||||||
|
if "command" in cfg:
|
||||||
|
logger.debug(f" 命令: {cfg['command']}")
|
||||||
|
if "args" in cfg:
|
||||||
|
logger.debug(f" 参数: {cfg['args']}")
|
||||||
|
elif "url" in cfg:
|
||||||
|
parsed = urllib.parse.urlparse(cfg["url"])
|
||||||
|
logger.debug(f" 主机: {parsed.scheme}://{parsed.netloc}")
|
||||||
|
failed_services.append(name)
|
||||||
|
self.mcp_client_event.pop(name, None)
|
||||||
|
else:
|
||||||
|
success_count += 1
|
||||||
|
|
||||||
|
if failed_services:
|
||||||
|
logger.warning(
|
||||||
|
f"以下 MCP 服务初始化失败: {', '.join(failed_services)}。"
|
||||||
|
f"请检查配置文件 mcp_server.json 和服务器可用性。"
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"MCP 服务初始化完成: {success_count}/{len(tasks)} 成功")
|
||||||
|
|
||||||
async def _init_mcp_client_task_wrapper(
|
async def _init_mcp_client_task_wrapper(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
@@ -229,20 +308,29 @@ class FunctionToolManager:
|
|||||||
ready_future: asyncio.Future | None = None,
|
ready_future: asyncio.Future | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""初始化 MCP 客户端的包装函数,用于捕获异常"""
|
"""初始化 MCP 客户端的包装函数,用于捕获异常"""
|
||||||
|
initialized = False
|
||||||
try:
|
try:
|
||||||
await self._init_mcp_client(name, cfg)
|
await self._init_mcp_client(name, cfg)
|
||||||
tools = await self.mcp_client_dict[name].list_tools_and_save()
|
initialized = True
|
||||||
if ready_future and not ready_future.done():
|
if ready_future and not ready_future.done():
|
||||||
# tell the caller we are ready
|
ready_future.set_result(True)
|
||||||
ready_future.set_result(tools)
|
|
||||||
await event.wait()
|
await event.wait()
|
||||||
logger.info(f"收到 MCP 客户端 {name} 终止信号")
|
logger.info(f"收到 MCP 客户端 {name} 终止信号")
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
if ready_future and not ready_future.done():
|
||||||
|
ready_future.set_exception(
|
||||||
|
asyncio.TimeoutError("MCP 客户端初始化超时"),
|
||||||
|
)
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
|
|
||||||
if ready_future and not ready_future.done():
|
if ready_future and not ready_future.done():
|
||||||
ready_future.set_exception(e)
|
ready_future.set_exception(e)
|
||||||
|
if not initialized:
|
||||||
|
# 初始化阶段失败,记录错误并向上抛出让 task.exception() 捕获
|
||||||
|
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
|
||||||
|
raise
|
||||||
|
# 初始化已成功,此处异常来自 event.wait() 被取消,属于正常终止流程
|
||||||
finally:
|
finally:
|
||||||
# 无论如何都能清理
|
|
||||||
await self._terminate_mcp_client(name)
|
await self._terminate_mcp_client(name)
|
||||||
|
|
||||||
async def _init_mcp_client(self, name: str, config: dict) -> None:
|
async def _init_mcp_client(self, name: str, config: dict) -> None:
|
||||||
@@ -340,22 +428,22 @@ class FunctionToolManager:
|
|||||||
if not event:
|
if not event:
|
||||||
event = asyncio.Event()
|
event = asyncio.Event()
|
||||||
if not ready_future:
|
if not ready_future:
|
||||||
ready_future = asyncio.Future()
|
ready_future = asyncio.get_running_loop().create_future()
|
||||||
if name in self.mcp_client_dict:
|
if name in self.mcp_client_dict:
|
||||||
return
|
return
|
||||||
asyncio.create_task(
|
init_task = asyncio.create_task(
|
||||||
self._init_mcp_client_task_wrapper(name, config, event, ready_future),
|
self._init_mcp_client_task_wrapper(name, config, event, ready_future),
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(ready_future, timeout=timeout)
|
await asyncio.wait_for(ready_future, timeout=timeout)
|
||||||
finally:
|
except asyncio.TimeoutError:
|
||||||
|
init_task.cancel()
|
||||||
|
await asyncio.gather(init_task, return_exceptions=True)
|
||||||
|
self.mcp_client_event.pop(name, None)
|
||||||
|
raise
|
||||||
|
else:
|
||||||
self.mcp_client_event[name] = event
|
self.mcp_client_event[name] = event
|
||||||
|
|
||||||
if ready_future.done() and ready_future.exception():
|
|
||||||
exc = ready_future.exception()
|
|
||||||
if exc is not None:
|
|
||||||
raise exc
|
|
||||||
|
|
||||||
async def disable_mcp_server(
|
async def disable_mcp_server(
|
||||||
self,
|
self,
|
||||||
name: str | None = None,
|
name: str | None = None,
|
||||||
|
|||||||
@@ -274,8 +274,8 @@ class ProviderManager:
|
|||||||
if not self.curr_tts_provider_inst and self.tts_provider_insts:
|
if not self.curr_tts_provider_inst and self.tts_provider_insts:
|
||||||
self.curr_tts_provider_inst = self.tts_provider_insts[0]
|
self.curr_tts_provider_inst = self.tts_provider_insts[0]
|
||||||
|
|
||||||
# 初始化 MCP Client 连接
|
# 初始化 MCP Client 连接(等待完成以确保工具可用)
|
||||||
asyncio.create_task(self.llm_tools.init_mcp_clients(), name="init_mcp_clients")
|
await self.llm_tools.init_mcp_clients()
|
||||||
|
|
||||||
def dynamic_import_provider(self, type: str):
|
def dynamic_import_provider(self, type: str):
|
||||||
"""动态导入提供商适配器模块
|
"""动态导入提供商适配器模块
|
||||||
|
|||||||
Reference in New Issue
Block a user