Compare commits

...

6 Commits

Author SHA1 Message Date
邹永赫 8132ce24eb fix: correctly synchronize MCP client initialization 2026-03-01 21:22:41 +09:00
idiotsj 38e99cf65c fix: restore task cancellation on timeout per review feedback
Pending tasks in asyncio.wait are tasks that have NOT completed
initialization within 20s, so cancelling them is safe and correct.
2026-02-27 22:21:47 +08:00
idiotsj 2cfe4288b2 fix: prevent initialized MCP clients from being cleaned up on timeout
- Do not cancel pending tasks on timeout; let them continue running
  in the background waiting for the termination signal (event.set()),
  so successfully initialized services remain available
- Track initialization state with a flag to distinguish init failures
  from post-init cancellations in _init_mcp_client_task_wrapper
2026-02-27 22:18:55 +08:00
idiotsj 4924739423 refactor: simplify MCP init orchestration and improve log security
- Replace Future-based sync with asyncio.wait + name→task mapping
- Explicitly cancel timed-out tasks after 20s timeout
- Downgrade sensitive config details (command/args/URL) to debug level
- Move urllib.parse import to top-level
2026-02-27 21:59:32 +08:00
idiotsj ec9f7403d5 perf: add timeout and better error handling for MCP initialization
- Add 20-second total timeout to prevent slow MCP servers from blocking startup
- Show detailed configuration info when MCP initialization fails
- List all failed services in a summary warning
- Gracefully handle timeout by using already-completed services

This ensures that even if some MCP servers are slow or unreachable,
the system will start within a reasonable time and provide clear
feedback about which services failed and why.
2026-02-27 20:24:17 +08:00
idiotsj 0de7ae8481 fix: resolve MCP tools race condition causing 'completion 无法解析' error
- Wait for MCP client initialization to complete before accepting requests
- Add Future-based synchronization in init_mcp_clients()
- Prevent tool_calls from being rejected due to empty func_list
- Improve error logging for MCP initialization failures

Fixes race condition where AI attempts to call MCP tools before they are
registered, resulting in 'API 返回的 completion 无法解析' exceptions.

The issue occurred because:
1. MCP clients were initialized asynchronously without waiting
2. System accepted user requests immediately after startup
3. AI received empty tool list and attempted to call non-existent tools
4. Tool matching failed, causing parsing errors

This fix ensures all MCP tools are loaded before the system processes
any requests that might use them.
2026-02-27 20:05:13 +08:00
2 changed files with 107 additions and 19 deletions
+105 -17
View File
@@ -4,6 +4,7 @@ import asyncio
import copy
import json
import os
import urllib.parse
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Any
@@ -212,15 +213,93 @@ class FunctionToolManager:
open(mcp_json_file, encoding="utf-8"),
)["mcpServers"]
for name in mcp_server_json_obj:
cfg = mcp_server_json_obj[name]
tasks: dict[str, asyncio.Task] = {}
ready_futures: dict[str, asyncio.Future] = {}
for name, cfg in mcp_server_json_obj.items():
if cfg.get("active", True):
event = asyncio.Event()
asyncio.create_task(
self._init_mcp_client_task_wrapper(name, cfg, event),
ready_future = asyncio.get_running_loop().create_future()
task = asyncio.create_task(
self._init_mcp_client_task_wrapper(
name,
cfg,
event,
ready_future,
),
)
tasks[name] = task
ready_futures[name] = ready_future
self.mcp_client_event[name] = event
if ready_futures:
logger.info(f"等待 {len(ready_futures)} 个 MCP 服务初始化...")
_, pending_futures = await asyncio.wait(
ready_futures.values(),
timeout=20.0,
)
pending_services = {
name
for name, ready_future in ready_futures.items()
if ready_future in pending_futures
}
if pending_services:
logger.warning(
"MCP 服务初始化超时(20秒),部分服务可能未完全加载。"
"建议检查 MCP 服务器配置和网络连接。"
)
for name in pending_services:
task = tasks[name]
task.cancel()
await asyncio.gather(
*(tasks[name] for name in pending_services),
return_exceptions=True,
)
success_count = 0
failed_services: list[str] = []
for name, ready_future in ready_futures.items():
if name in pending_services:
logger.error(f"MCP 服务 {name} 初始化超时")
failed_services.append(name)
self.mcp_client_event.pop(name, None)
continue
if ready_future.cancelled():
logger.error(f"MCP 服务 {name} 初始化已取消")
failed_services.append(name)
self.mcp_client_event.pop(name, None)
continue
exc = ready_future.exception()
if exc is not None:
logger.error(f"MCP 服务 {name} 初始化失败: {exc}")
# 仅在 debug 级别输出完整配置,避免在生产日志中泄露敏感信息
cfg = mcp_server_json_obj.get(name, {})
if "command" in cfg:
logger.debug(f" 命令: {cfg['command']}")
if "args" in cfg:
logger.debug(f" 参数: {cfg['args']}")
elif "url" in cfg:
parsed = urllib.parse.urlparse(cfg["url"])
logger.debug(f" 主机: {parsed.scheme}://{parsed.netloc}")
failed_services.append(name)
self.mcp_client_event.pop(name, None)
else:
success_count += 1
if failed_services:
logger.warning(
f"以下 MCP 服务初始化失败: {', '.join(failed_services)}"
f"请检查配置文件 mcp_server.json 和服务器可用性。"
)
logger.info(f"MCP 服务初始化完成: {success_count}/{len(tasks)} 成功")
async def _init_mcp_client_task_wrapper(
self,
name: str,
@@ -229,20 +308,29 @@ class FunctionToolManager:
ready_future: asyncio.Future | None = None,
) -> None:
"""初始化 MCP 客户端的包装函数,用于捕获异常"""
initialized = False
try:
await self._init_mcp_client(name, cfg)
tools = await self.mcp_client_dict[name].list_tools_and_save()
initialized = True
if ready_future and not ready_future.done():
# tell the caller we are ready
ready_future.set_result(tools)
ready_future.set_result(True)
await event.wait()
logger.info(f"收到 MCP 客户端 {name} 终止信号")
except asyncio.CancelledError:
if ready_future and not ready_future.done():
ready_future.set_exception(
asyncio.TimeoutError("MCP 客户端初始化超时"),
)
raise
except Exception as e:
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
if ready_future and not ready_future.done():
ready_future.set_exception(e)
if not initialized:
# 初始化阶段失败,记录错误并向上抛出让 task.exception() 捕获
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
raise
# 初始化已成功,此处异常来自 event.wait() 被取消,属于正常终止流程
finally:
# 无论如何都能清理
await self._terminate_mcp_client(name)
async def _init_mcp_client(self, name: str, config: dict) -> None:
@@ -340,22 +428,22 @@ class FunctionToolManager:
if not event:
event = asyncio.Event()
if not ready_future:
ready_future = asyncio.Future()
ready_future = asyncio.get_running_loop().create_future()
if name in self.mcp_client_dict:
return
asyncio.create_task(
init_task = asyncio.create_task(
self._init_mcp_client_task_wrapper(name, config, event, ready_future),
)
try:
await asyncio.wait_for(ready_future, timeout=timeout)
finally:
except asyncio.TimeoutError:
init_task.cancel()
await asyncio.gather(init_task, return_exceptions=True)
self.mcp_client_event.pop(name, None)
raise
else:
self.mcp_client_event[name] = event
if ready_future.done() and ready_future.exception():
exc = ready_future.exception()
if exc is not None:
raise exc
async def disable_mcp_server(
self,
name: str | None = None,
+2 -2
View File
@@ -274,8 +274,8 @@ class ProviderManager:
if not self.curr_tts_provider_inst and self.tts_provider_insts:
self.curr_tts_provider_inst = self.tts_provider_insts[0]
# 初始化 MCP Client 连接
asyncio.create_task(self.llm_tools.init_mcp_clients(), name="init_mcp_clients")
# 初始化 MCP Client 连接(等待完成以确保工具可用)
await self.llm_tools.init_mcp_clients()
def dynamic_import_provider(self, type: str):
"""动态导入提供商适配器模块