diff --git a/astrbot/core/log.py b/astrbot/core/log.py index 6609b8246..9b78eaec6 100644 --- a/astrbot/core/log.py +++ b/astrbot/core/log.py @@ -25,6 +25,7 @@ import logging import colorlog import asyncio import os +import sys from collections import deque from asyncio import Queue from typing import List @@ -171,7 +172,9 @@ class LogManager: if logger.hasHandlers(): return logger # 如果logger没有处理器 - console_handler = logging.StreamHandler() # 创建一个StreamHandler用于控制台输出 + console_handler = logging.StreamHandler( + sys.stdout + ) # 创建一个StreamHandler用于控制台输出 console_handler.setLevel( logging.DEBUG ) # 将日志级别设置为DEBUG(最低级别, 显示所有日志), *如果插件没有设置级别, 默认为DEBUG diff --git a/astrbot/core/provider/func_tool_manager.py b/astrbot/core/provider/func_tool_manager.py index 53f5048fa..793d21a12 100644 --- a/astrbot/core/provider/func_tool_manager.py +++ b/astrbot/core/provider/func_tool_manager.py @@ -4,12 +4,14 @@ import textwrap import os import asyncio import copy +import logging from typing import Dict, List, Awaitable, Literal, Any from dataclasses import dataclass from typing import Optional from contextlib import AsyncExitStack from astrbot import logger +from astrbot.core.utils.log_pipe import LogPipe try: import mcp @@ -87,8 +89,9 @@ class MCPClient: self.name = None self.active: bool = True self.tools: List[mcp.Tool] = [] + self.server_errlogs: List[str] = [] - async def connect_to_server(self, mcp_server_config: dict): + async def connect_to_server(self, mcp_server_config: dict, name: str): """Connect to an MCP server Args: @@ -98,19 +101,30 @@ class MCPClient: if "mcpServers" in cfg and len(cfg["mcpServers"]) > 0: key_0 = list(cfg["mcpServers"].keys())[0] cfg = cfg["mcpServers"][key_0] - cfg.pop("active", None) # Remove active flag from config + cfg.pop("active", None) # Remove active flag from config server_params = mcp.StdioServerParameters( **cfg, ) + def callback(msg: str): + # 处理 MCP 服务的错误日志 + self.server_errlogs.append(msg) + stdio_transport = await self.exit_stack.enter_async_context( - mcp.stdio_client(server_params) + mcp.stdio_client( + server_params, + errlog=LogPipe( + level=logging.ERROR, + logger=logger, + identifier=f"MCPServer-{name}", + callback=callback, + ), + ), ) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context( mcp.ClientSession(self.stdio, self.write) ) - await self.session.initialize() async def list_tools_and_save(self) -> mcp.ListToolsResult: @@ -266,7 +280,9 @@ class FuncCall: self.func_list = [ f for f in self.func_list - if not (f.origin == "mcp" and f.mcp_server_name == data["name"]) + if not ( + f.origin == "mcp" and f.mcp_server_name == data["name"] + ) ] else: for name in self.mcp_client_dict.keys(): @@ -275,11 +291,7 @@ class FuncCall: if name in self.mcp_client_event: self.mcp_client_event[name].set() self.mcp_client_event.pop(name, None) - self.func_list = [ - f - for f in self.func_list - if f.origin != "mcp" - ] + self.func_list = [f for f in self.func_list if f.origin != "mcp"] async def _init_mcp_client_task_wrapper( self, name: str, cfg: dict, event: asyncio.Event @@ -291,6 +303,9 @@ class FuncCall: logger.info(f"收到 MCP 客户端 {name} 终止信号") await self._terminate_mcp_client(name) except Exception as e: + import traceback + + traceback.print_exc() logger.error(f"初始化 MCP 客户端 {name} 失败: {e}") async def _init_mcp_client(self, name: str, config: dict) -> None: @@ -302,10 +317,10 @@ class FuncCall: mcp_client = MCPClient() mcp_client.name = name - await mcp_client.connect_to_server(config) + self.mcp_client_dict[name] = mcp_client + await mcp_client.connect_to_server(config, name) tools_res = await mcp_client.list_tools_and_save() tool_names = [tool.name for tool in tools_res.tools] - self.mcp_client_dict[name] = mcp_client # 移除该MCP服务之前的工具(如有) self.func_list = [ @@ -329,6 +344,9 @@ class FuncCall: logger.info(f"已连接 MCP 服务 {name}, Tools: {tool_names}") return True except Exception as e: + import traceback + + logger.error(traceback.format_exc()) logger.error(f"初始化 MCP 客户端 {name} 失败: {e}") # 发生错误时确保客户端被清理 if name in self.mcp_client_dict: @@ -352,7 +370,7 @@ class FuncCall: ] logger.info(f"已关闭 MCP 服务 {name}") - def get_func_desc_openai_style(self, omit_empty_parameter_field = False) -> list: + def get_func_desc_openai_style(self, omit_empty_parameter_field=False) -> list: """ 获得 OpenAI API 风格的**已经激活**的工具描述 """ diff --git a/astrbot/core/utils/log_pipe.py b/astrbot/core/utils/log_pipe.py new file mode 100644 index 000000000..bf5402f17 --- /dev/null +++ b/astrbot/core/utils/log_pipe.py @@ -0,0 +1,36 @@ +import threading +import os +from logging import Logger + + +class LogPipe(threading.Thread): + def __init__( + self, + level, + logger: Logger, + identifier=None, + callback=None, + ): + threading.Thread.__init__(self) + self.daemon = True + self.level = level + self.fd_read, self.fd_write = os.pipe() + self.identifier = identifier + self.logger = logger + self.callback = callback + self.reader = os.fdopen(self.fd_read) + self.start() + + def fileno(self): + return self.fd_write + + def run(self): + for line in iter(self.reader.readline, ""): + if self.callback: + self.callback(line.strip()) + self.logger.log(self.level, f"[{self.identifier}] {line.strip()}") + + self.reader.close() + + def close(self): + os.close(self.fd_write) diff --git a/astrbot/dashboard/routes/tools.py b/astrbot/dashboard/routes/tools.py index 9a3c5aa53..6dd093546 100644 --- a/astrbot/dashboard/routes/tools.py +++ b/astrbot/dashboard/routes/tools.py @@ -1,5 +1,6 @@ import os import json +import aiohttp import traceback from .route import Route, Response, RouteContext from quart import request @@ -20,6 +21,7 @@ class ToolsRoute(Route): "/tools/mcp/add": ("POST", self.add_mcp_server), "/tools/mcp/update": ("POST", self.update_mcp_server), "/tools/mcp/delete": ("POST", self.delete_mcp_server), + "/tools/mcp/market": ("GET", self.get_mcp_markets), } self.register_routes() self.tool_mgr = self.core_lifecycle.provider_manager.llm_tools @@ -78,6 +80,7 @@ class ToolsRoute(Route): ) in self.tool_mgr.mcp_client_dict.items(): if name_key == name: server_info["tools"] = [tool.name for tool in mcp_client.tools] + server_info["errlogs"] = mcp_client.server_errlogs break else: server_info["tools"] = [] @@ -105,9 +108,11 @@ class ToolsRoute(Route): # 复制所有配置字段 for key, value in server_data.items(): - if key not in ["name", "active", "tools"]: # 排除特殊字段 + if key not in ["name", "active", "tools", "errlogs"]: # 排除特殊字段 if key == "mcpServers": - key_0 = list(server_data["mcpServers"].keys())[0] # 不考虑为空的情况 + key_0 = list(server_data["mcpServers"].keys())[ + 0 + ] # 不考虑为空的情况 server_config = server_data["mcpServers"][key_0] else: server_config[key] = value @@ -125,7 +130,7 @@ class ToolsRoute(Route): if self.save_mcp_config(config): # 动态初始化新MCP客户端 - self.tool_mgr.mcp_service_queue.put_nowait( + await self.tool_mgr.mcp_service_queue.put( { "type": "init", "name": name, @@ -166,9 +171,11 @@ class ToolsRoute(Route): # 复制所有配置字段 for key, value in server_data.items(): - if key not in ["name", "active", "tools"]: # 排除特殊字段 + if key not in ["name", "active", "tools", "errlogs"]: # 排除特殊字段 if key == "mcpServers": - key_0 = list(server_data["mcpServers"].keys())[0] # 不考虑为空的情况 + key_0 = list(server_data["mcpServers"].keys())[ + 0 + ] # 不考虑为空的情况 server_config = server_data["mcpServers"][key_0] else: server_config[key] = value @@ -202,7 +209,7 @@ class ToolsRoute(Route): ) else: # 客户端不存在,初始化 - self.tool_mgr.mcp_service_queue.put_nowait( + await self.tool_mgr.mcp_service_queue.put( { "type": "init", "name": name, @@ -258,3 +265,26 @@ class ToolsRoute(Route): except Exception as e: logger.error(traceback.format_exc()) return Response().error(f"删除 MCP 服务器失败: {str(e)}").__dict__ + + async def get_mcp_markets(self): + page = request.args.get("page", 1, type=int) + page_size = request.args.get("page_size", 10, type=int) + BASE_URL = "https://api.soulter.top/astrbot/mcpservers?page={}&page_size={}".format( + page, + page_size, + ) + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"{BASE_URL}") as response: + if response.status == 200: + data = await response.json() + return Response().ok(data["data"]).__dict__ + else: + return ( + Response() + .error(f"获取市场数据失败: HTTP {response.status}") + .__dict__ + ) + except Exception as _: + logger.error(traceback.format_exc()) + return Response().error("获取市场数据失败").__dict__ \ No newline at end of file diff --git a/dashboard/src/components/shared/ExtensionCard.vue b/dashboard/src/components/shared/ExtensionCard.vue index e6573260c..73da2f4b5 100644 --- a/dashboard/src/components/shared/ExtensionCard.vue +++ b/dashboard/src/components/shared/ExtensionCard.vue @@ -81,7 +81,7 @@ const viewReadme = () => {
{{ extension.author }} /
-

+

{{ extension.name }}