From 8e431e20763ff0be43b73c2f81db7a02f1c4da0d Mon Sep 17 00:00:00 2001 From: Windy_cold Date: Sun, 8 Mar 2026 20:53:56 +0800 Subject: [PATCH 01/21] correct openrouter api_base (#5911) --- astrbot/core/config/default.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 71608b7ad..ad8873e43 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -1123,7 +1123,7 @@ CONFIG_METADATA_2 = { "enable": True, "key": [], "timeout": 120, - "api_base": "https://openrouter.ai/v1", + "api_base": "https://openrouter.ai/api/v1", "proxy": "", "custom_headers": {}, }, From 7f3c0fdeb2b216315849df06657c570749a6c6ef Mon Sep 17 00:00:00 2001 From: Soulter <37870767+Soulter@users.noreply.github.com> Date: Sun, 8 Mar 2026 23:18:56 +0800 Subject: [PATCH 02/21] fix: cannot receive image, file in dingtalk (#5920) fixes: #5916 #5786 --- .../sources/dingtalk/dingtalk_adapter.py | 102 ++++++++++++++++-- 1 file changed, 95 insertions(+), 7 deletions(-) diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py index 2d9b45cc1..105af8c77 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py @@ -11,7 +11,7 @@ from dingtalk_stream import AckMessage from astrbot import logger from astrbot.api.event import MessageChain -from astrbot.api.message_components import At, Image, Plain, Record, Video +from astrbot.api.message_components import At, File, Image, Plain, Record, Video from astrbot.api.platform import ( AstrBotMessage, MessageMember, @@ -178,10 +178,38 @@ class DingtalkPlatformAdapter(Platform): abm.session_id = abm.sender.user_id message_type: str = cast(str, message.message_type) + robot_code = cast(str, message.robot_code or "") + raw_content = cast(dict, message.extensions.get("content") or {}) + if not isinstance(raw_content, dict): + raw_content = {} match message_type: case "text": abm.message_str = message.text.content.strip() abm.message.append(Plain(abm.message_str)) + case "picture": + if not robot_code: + logger.error("钉钉图片消息解析失败: 回调中缺少 robotCode") + await self._remember_sender_binding(message, abm) + return abm + image_content = cast( + dingtalk_stream.ImageContent | None, + message.image_content, + ) + download_code = cast( + str, (image_content.download_code if image_content else "") or "" + ) + if not download_code: + logger.warning("钉钉图片消息缺少 downloadCode,已跳过") + else: + f_path = await self.download_ding_file( + download_code, + robot_code, + "jpg", + ) + if f_path: + abm.message.append(Image.fromFileSystem(f_path)) + else: + logger.warning("钉钉图片消息下载失败,无法解析为图片") case "richText": rtc: dingtalk_stream.RichTextContent = cast( dingtalk_stream.RichTextContent, message.rich_text_content @@ -193,14 +221,64 @@ class DingtalkPlatformAdapter(Platform): plains += content["text"] abm.message.append(Plain(plains)) elif "type" in content and content["type"] == "picture": + download_code = cast(str, content.get("downloadCode") or "") + if not download_code: + logger.warning( + "钉钉富文本图片消息缺少 downloadCode,已跳过" + ) + continue + if not robot_code: + logger.error( + "钉钉富文本图片消息解析失败: 回调中缺少 robotCode" + ) + continue f_path = await self.download_ding_file( - content["downloadCode"], - cast(str, message.robot_code), + download_code, + robot_code, "jpg", ) - abm.message.append(Image.fromFileSystem(f_path)) - case "audio": - pass + if f_path: + abm.message.append(Image.fromFileSystem(f_path)) + case "audio" | "voice": + download_code = cast(str, raw_content.get("downloadCode") or "") + if not download_code: + logger.warning("钉钉语音消息缺少 downloadCode,已跳过") + elif not robot_code: + logger.error("钉钉语音消息解析失败: 回调中缺少 robotCode") + else: + voice_ext = cast(str, raw_content.get("fileExtension") or "") + if not voice_ext: + voice_ext = "amr" + voice_ext = voice_ext.lstrip(".") + f_path = await self.download_ding_file( + download_code, + robot_code, + voice_ext, + ) + if f_path: + abm.message.append(Record.fromFileSystem(f_path)) + case "file": + download_code = cast(str, raw_content.get("downloadCode") or "") + if not download_code: + logger.warning("钉钉文件消息缺少 downloadCode,已跳过") + elif not robot_code: + logger.error("钉钉文件消息解析失败: 回调中缺少 robotCode") + else: + file_name = cast(str, raw_content.get("fileName") or "") + file_ext = Path(file_name).suffix.lstrip(".") if file_name else "" + if not file_ext: + file_ext = cast(str, raw_content.get("fileExtension") or "") + if not file_ext: + file_ext = "file" + f_path = await self.download_ding_file( + download_code, + robot_code, + file_ext, + ) + if f_path: + if not file_name: + file_name = Path(f_path).name + abm.message.append(File(name=file_name, file=f_path)) await self._remember_sender_binding(message, abm) return abm # 别忘了返回转换后的消息对象 @@ -270,7 +348,17 @@ class DingtalkPlatformAdapter(Platform): ) return "" resp_data = await resp.json() - download_url = resp_data["data"]["downloadUrl"] + download_url = cast( + str, + ( + resp_data.get("downloadUrl") + or resp_data.get("data", {}).get("downloadUrl") + or "" + ), + ) + if not download_url: + logger.error(f"下载钉钉文件失败: 未找到 downloadUrl, 响应: {resp_data}") + return "" await download_file(download_url, str(f_path)) return str(f_path) From 537849c1e7b7281473522cbba4a116cecbe98e3a Mon Sep 17 00:00:00 2001 From: Soulter <37870767+Soulter@users.noreply.github.com> Date: Sun, 8 Mar 2026 23:31:11 +0800 Subject: [PATCH 03/21] fix(dingtalk): text is ignored; cannot send file actively (#5921) --- .../sources/dingtalk/dingtalk_adapter.py | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py index 105af8c77..7982a6593 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py @@ -215,11 +215,13 @@ class DingtalkPlatformAdapter(Platform): dingtalk_stream.RichTextContent, message.rich_text_content ) contents: list[dict] = cast(list[dict], rtc.rich_text_list) + plain_parts: list[str] = [] for content in contents: - plains = "" if "text" in content: - plains += content["text"] - abm.message.append(Plain(plains)) + plain_text = cast(str, content.get("text") or "") + if plain_text: + plain_parts.append(plain_text) + abm.message.append(Plain(plain_text)) elif "type" in content and content["type"] == "picture": download_code = cast(str, content.get("downloadCode") or "") if not download_code: @@ -239,6 +241,7 @@ class DingtalkPlatformAdapter(Platform): ) if f_path: abm.message.append(Image.fromFileSystem(f_path)) + abm.message_str = "".join(plain_parts).strip() case "audio" | "voice": download_code = cast(str, raw_content.get("downloadCode") or "") if not download_code: @@ -629,6 +632,28 @@ class DingtalkPlatformAdapter(Platform): self._safe_remove_file(cover_path) if converted_video: self._safe_remove_file(video_path) + elif isinstance(segment, File): + try: + file_path = await segment.get_file() + if not file_path: + logger.warning("钉钉文件发送失败: 无法解析文件路径") + continue + media_id = await self.upload_media(file_path, "file") + if not media_id: + continue + file_name = segment.name or Path(file_path).name + file_type = Path(file_name).suffix.lstrip(".") + await send_message( + msg_key="sampleFile", + msg_param={ + "mediaId": media_id, + "fileName": file_name, + "fileType": file_type, + }, + ) + except Exception as e: + logger.warning(f"钉钉文件发送失败: {e}") + continue async def send_message_chain_to_group( self, From 5808784f07e936194d5b4c7a204e56fc5ddf926a Mon Sep 17 00:00:00 2001 From: sanyekana Date: Sun, 8 Mar 2026 23:46:32 +0800 Subject: [PATCH 04/21] fix: prevent crash on malformed MCP server config (#5666) (#5673) * fix: prevent crash on malformed MCP server config (#5666) * fix: prevent crash on malformed MCP server config (#5666) * fix: validate MCP connection before persisting server config * fix: guard mcpServers type before iterating server list * refactor: use typed empty-config error and extract MCP rollback helper * fix: translate error messages and comments to English for consistency --------- Co-authored-by: Soulter <905617992@qq.com> --- astrbot/dashboard/routes/tools.py | 253 +++++++++++++----- .../extension/McpServersSection.vue | 8 + 2 files changed, 193 insertions(+), 68 deletions(-) diff --git a/astrbot/dashboard/routes/tools.py b/astrbot/dashboard/routes/tools.py index b19385c28..84f8dcc6d 100644 --- a/astrbot/dashboard/routes/tools.py +++ b/astrbot/dashboard/routes/tools.py @@ -12,6 +12,32 @@ from .route import Response, Route, RouteContext DEFAULT_MCP_CONFIG = {"mcpServers": {}} +class EmptyMcpServersError(ValueError): + """Raised when mcpServers is empty.""" + + pass + + +def _extract_mcp_server_config(mcp_servers_value: object) -> dict: + """Extract server configuration from user-submitted mcpServers field. + + Raises: + ValueError: Invalid configuration + """ + if not isinstance(mcp_servers_value, dict): + raise ValueError("mcpServers must be a JSON object") + if not mcp_servers_value: + raise EmptyMcpServersError("mcpServers configuration cannot be empty") + key_0 = next(iter(mcp_servers_value)) + extracted = mcp_servers_value[key_0] + if not isinstance(extracted, dict): + raise ValueError( + "Invalid mcpServers format. Ensure each key in mcpServers is a server name, " + "and each value is an object containing fields like command/url." + ) + return extracted + + class ToolsRoute(Route): def __init__( self, @@ -33,13 +59,37 @@ class ToolsRoute(Route): self.register_routes() self.tool_mgr = self.core_lifecycle.provider_manager.llm_tools + def _rollback_mcp_server(self, name: str) -> bool: + try: + rollback_config = self.tool_mgr.load_mcp_config() + if name in rollback_config["mcpServers"]: + rollback_config["mcpServers"].pop(name) + return self.tool_mgr.save_mcp_config(rollback_config) + return True + except Exception: + logger.error(traceback.format_exc()) + return False + async def get_mcp_servers(self): try: config = self.tool_mgr.load_mcp_config() servers = [] + mcp_servers = config.get("mcpServers", {}) + + if not isinstance(mcp_servers, dict): + logger.warning( + f"Invalid MCP server config type: {type(mcp_servers).__name__}. Expected object/dict; skipped all MCP servers." + ) + mcp_servers = {} # 获取所有服务器并添加它们的工具列表 - for name, server_config in config["mcpServers"].items(): + for name, server_config in mcp_servers.items(): + if not isinstance(server_config, dict): + logger.warning( + f"Invalid config for MCP server '{name}' (type: {type(server_config).__name__}); skipped." + ) + continue + server_info = { "name": name, "active": server_config.get("active", True), @@ -65,7 +115,7 @@ class ToolsRoute(Route): return Response().ok(servers).__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"获取 MCP 服务器列表失败: {e!s}").__dict__ + return Response().error(f"Failed to get MCP server list: {e!s}").__dict__ async def add_mcp_server(self): try: @@ -75,7 +125,7 @@ class ToolsRoute(Route): # 检查必填字段 if not name: - return Response().error("服务器名称不能为空").__dict__ + return Response().error("Server name cannot be empty").__dict__ # 移除特殊字段并检查配置是否有效 has_valid_config = False @@ -85,21 +135,33 @@ class ToolsRoute(Route): for key, value in server_data.items(): if key not in ["name", "active", "tools", "errlogs"]: # 排除特殊字段 if key == "mcpServers": - key_0 = list(server_data["mcpServers"].keys())[ - 0 - ] # 不考虑为空的情况 - server_config = server_data["mcpServers"][key_0] + try: + server_config = _extract_mcp_server_config( + server_data["mcpServers"] + ) + except ValueError as e: + return Response().error(f"{e!s}").__dict__ else: server_config[key] = value has_valid_config = True if not has_valid_config: - return Response().error("必须提供有效的服务器配置").__dict__ + return ( + Response() + .error("A valid server configuration is required") + .__dict__ + ) config = self.tool_mgr.load_mcp_config() if name in config["mcpServers"]: - return Response().error(f"服务器 {name} 已存在").__dict__ + return Response().error(f"Server {name} already exists").__dict__ + + try: + await self.tool_mgr.test_mcp_server_connection(server_config) + except Exception as e: + logger.error(traceback.format_exc()) + return Response().error(f"MCP connection test failed: {e!s}").__dict__ config["mcpServers"][name] = server_config @@ -111,17 +173,27 @@ class ToolsRoute(Route): timeout=30, ) except TimeoutError: - return Response().error(f"启用 MCP 服务器 {name} 超时。").__dict__ + rollback_ok = self._rollback_mcp_server(name) + err_msg = f"Timed out while enabling MCP server {name}." + if not rollback_ok: + err_msg += " Configuration rollback failed. Please check the config manually." + return Response().error(err_msg).__dict__ except Exception as e: logger.error(traceback.format_exc()) - return ( - Response().error(f"启用 MCP 服务器 {name} 失败: {e!s}").__dict__ - ) - return Response().ok(None, f"成功添加 MCP 服务器 {name}").__dict__ - return Response().error("保存配置失败").__dict__ + rollback_ok = self._rollback_mcp_server(name) + err_msg = f"Failed to enable MCP server {name}: {e!s}" + if not rollback_ok: + err_msg += " Configuration rollback failed. Please check the config manually." + return Response().error(err_msg).__dict__ + return ( + Response() + .ok(None, f"Successfully added MCP server {name}") + .__dict__ + ) + return Response().error("Failed to save configuration").__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"添加 MCP 服务器失败: {e!s}").__dict__ + return Response().error(f"Failed to add MCP server: {e!s}").__dict__ async def update_mcp_server(self): try: @@ -131,23 +203,25 @@ class ToolsRoute(Route): old_name = server_data.get("oldName") or name if not name: - return Response().error("服务器名称不能为空").__dict__ + return Response().error("Server name cannot be empty").__dict__ config = self.tool_mgr.load_mcp_config() if old_name not in config["mcpServers"]: - return Response().error(f"服务器 {old_name} 不存在").__dict__ + return Response().error(f"Server {old_name} does not exist").__dict__ is_rename = name != old_name if name in config["mcpServers"] and is_rename: - return Response().error(f"服务器 {name} 已存在").__dict__ + return Response().error(f"Server {name} already exists").__dict__ # 获取活动状态 - active = server_data.get( - "active", - config["mcpServers"][old_name].get("active", True), - ) + old_config = config["mcpServers"][old_name] + if isinstance(old_config, dict): + old_active = old_config.get("active", True) + else: + old_active = True + active = server_data.get("active", old_active) # 创建新的配置对象 server_config = {"active": active} @@ -165,17 +239,19 @@ class ToolsRoute(Route): "oldName", ]: # 排除特殊字段 if key == "mcpServers": - key_0 = list(server_data["mcpServers"].keys())[ - 0 - ] # 不考虑为空的情况 - server_config = server_data["mcpServers"][key_0] + try: + server_config = _extract_mcp_server_config( + server_data["mcpServers"] + ) + except ValueError as e: + return Response().error(f"{e!s}").__dict__ else: server_config[key] = value only_update_active = False # 如果只更新活动状态,保留原始配置 - if only_update_active: - for key, value in config["mcpServers"][old_name].items(): + if only_update_active and isinstance(old_config, dict): + for key, value in old_config.items(): if key != "active": # 除了active之外的所有字段都保留 server_config[key] = value @@ -200,7 +276,7 @@ class ToolsRoute(Route): return ( Response() .error( - f"启用前停用 MCP 服务器时 {old_name} 超时: {e!s}" + f"Timed out while disabling MCP server {old_name} before enabling: {e!s}" ) .__dict__ ) @@ -209,7 +285,7 @@ class ToolsRoute(Route): return ( Response() .error( - f"启用前停用 MCP 服务器时 {old_name} 失败: {e!s}" + f"Failed to disable MCP server {old_name} before enabling: {e!s}" ) .__dict__ ) @@ -221,13 +297,15 @@ class ToolsRoute(Route): ) except TimeoutError: return ( - Response().error(f"启用 MCP 服务器 {name} 超时。").__dict__ + Response() + .error(f"Timed out while enabling MCP server {name}.") + .__dict__ ) except Exception as e: logger.error(traceback.format_exc()) return ( Response() - .error(f"启用 MCP 服务器 {name} 失败: {e!s}") + .error(f"Failed to enable MCP server {name}: {e!s}") .__dict__ ) # 如果要停用服务器 @@ -237,22 +315,26 @@ class ToolsRoute(Route): except TimeoutError: return ( Response() - .error(f"停用 MCP 服务器 {old_name} 超时。") + .error(f"Timed out while disabling MCP server {old_name}.") .__dict__ ) except Exception as e: logger.error(traceback.format_exc()) return ( Response() - .error(f"停用 MCP 服务器 {old_name} 失败: {e!s}") + .error(f"Failed to disable MCP server {old_name}: {e!s}") .__dict__ ) - return Response().ok(None, f"成功更新 MCP 服务器 {name}").__dict__ - return Response().error("保存配置失败").__dict__ + return ( + Response() + .ok(None, f"Successfully updated MCP server {name}") + .__dict__ + ) + return Response().error("Failed to save configuration").__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"更新 MCP 服务器失败: {e!s}").__dict__ + return Response().error(f"Failed to update MCP server: {e!s}").__dict__ async def delete_mcp_server(self): try: @@ -260,12 +342,12 @@ class ToolsRoute(Route): name = server_data.get("name", "") if not name: - return Response().error("服务器名称不能为空").__dict__ + return Response().error("Server name cannot be empty").__dict__ config = self.tool_mgr.load_mcp_config() if name not in config["mcpServers"]: - return Response().error(f"服务器 {name} 不存在").__dict__ + return Response().error(f"Server {name} does not exist").__dict__ del config["mcpServers"][name] @@ -275,51 +357,76 @@ class ToolsRoute(Route): await self.tool_mgr.disable_mcp_server(name, timeout=10) except TimeoutError: return ( - Response().error(f"停用 MCP 服务器 {name} 超时。").__dict__ + Response() + .error(f"Timed out while disabling MCP server {name}.") + .__dict__ ) except Exception as e: logger.error(traceback.format_exc()) return ( Response() - .error(f"停用 MCP 服务器 {name} 失败: {e!s}") + .error(f"Failed to disable MCP server {name}: {e!s}") .__dict__ ) - return Response().ok(None, f"成功删除 MCP 服务器 {name}").__dict__ - return Response().error("保存配置失败").__dict__ + return ( + Response() + .ok(None, f"Successfully deleted MCP server {name}") + .__dict__ + ) + return Response().error("Failed to save configuration").__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"删除 MCP 服务器失败: {e!s}").__dict__ + return Response().error(f"Failed to delete MCP server: {e!s}").__dict__ async def test_mcp_connection(self): - """测试 MCP 服务器连接""" + """Test MCP server connection.""" try: server_data = await request.json config = server_data.get("mcp_server_config", None) if not isinstance(config, dict) or not config: - return Response().error("无效的 MCP 服务器配置").__dict__ + return Response().error("Invalid MCP server configuration").__dict__ if "mcpServers" in config: - keys = list(config["mcpServers"].keys()) - if not keys: - return Response().error("MCP 服务器配置不能为空").__dict__ - if len(keys) > 1: - return Response().error("一次只能配置一个 MCP 服务器配置").__dict__ - config = config["mcpServers"][keys[0]] + mcp_servers = config["mcpServers"] + if isinstance(mcp_servers, dict) and len(mcp_servers) > 1: + return ( + Response() + .error( + "Only one MCP server configuration can be tested at a time" + ) + .__dict__ + ) + try: + config = _extract_mcp_server_config(mcp_servers) + except EmptyMcpServersError: + return ( + Response() + .error("MCP server configuration cannot be empty") + .__dict__ + ) + except ValueError as e: + return Response().error(f"{e!s}").__dict__ elif not config: - return Response().error("MCP 服务器配置不能为空").__dict__ + return ( + Response() + .error("MCP server configuration cannot be empty") + .__dict__ + ) tools_name = await self.tool_mgr.test_mcp_server_connection(config) return ( - Response().ok(data=tools_name, message="🎉 MCP 服务器可用!").__dict__ + Response() + .ok(data=tools_name, message="🎉 MCP server is available!") + .__dict__ ) except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"测试 MCP 连接失败: {e!s}").__dict__ + return Response().error(f"Failed to test MCP connection: {e!s}").__dict__ async def get_tool_list(self): - """获取所有注册的工具列表""" + """Get all registered tools.""" try: tools = self.tool_mgr.func_list tools_dict = [] @@ -349,36 +456,44 @@ class ToolsRoute(Route): return Response().ok(data=tools_dict).__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"获取工具列表失败: {e!s}").__dict__ + return Response().error(f"Failed to get tool list: {e!s}").__dict__ async def toggle_tool(self): - """启用或停用指定的工具""" + """Activate or deactivate a specified tool.""" try: data = await request.json tool_name = data.get("name") action = data.get("activate") # True or False if not tool_name or action is None: - return Response().error("缺少必要参数: name 或 action").__dict__ + return ( + Response() + .error("Missing required parameters: name or activate") + .__dict__ + ) if action: try: ok = self.tool_mgr.activate_llm_tool(tool_name, star_map=star_map) except ValueError as e: - return Response().error(f"启用工具失败: {e!s}").__dict__ + return Response().error(f"Failed to activate tool: {e!s}").__dict__ else: ok = self.tool_mgr.deactivate_llm_tool(tool_name) if ok: - return Response().ok(None, "操作成功。").__dict__ - return Response().error(f"工具 {tool_name} 不存在或操作失败。").__dict__ + return Response().ok(None, "Operation successful.").__dict__ + return ( + Response() + .error(f"Tool {tool_name} does not exist or the operation failed.") + .__dict__ + ) except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"操作工具失败: {e!s}").__dict__ + return Response().error(f"Failed to operate tool: {e!s}").__dict__ async def sync_provider(self): - """同步 MCP 提供者配置""" + """Sync MCP provider configuration.""" try: data = await request.json provider_name = data.get("name") # modelscope, or others @@ -387,9 +502,11 @@ class ToolsRoute(Route): access_token = data.get("access_token", "") await self.tool_mgr.sync_modelscope_mcp_servers(access_token) case _: - return Response().error(f"未知: {provider_name}").__dict__ + return ( + Response().error(f"Unknown provider: {provider_name}").__dict__ + ) - return Response().ok(message="同步成功").__dict__ + return Response().ok(message="Sync completed").__dict__ except Exception as e: logger.error(traceback.format_exc()) - return Response().error(f"同步失败: {e!s}").__dict__ + return Response().error(f"Sync failed: {e!s}").__dict__ diff --git a/dashboard/src/components/extension/McpServersSection.vue b/dashboard/src/components/extension/McpServersSection.vue index 95b679580..d24bcec58 100644 --- a/dashboard/src/components/extension/McpServersSection.vue +++ b/dashboard/src/components/extension/McpServersSection.vue @@ -300,6 +300,10 @@ export default { this.loadingGettingServers = true; axios.get('/api/tools/mcp/servers') .then(response => { + if (response.data.status === 'error') { + this.showError(response.data.message || this.tm('messages.getServersError', { error: 'Unknown error' })); + return; + } this.mcpServers = response.data.data || []; this.mcpServers.forEach(server => { if (!this.mcpServerUpdateLoaders[server.name]) { @@ -372,6 +376,10 @@ export default { axios.post(endpoint, serverData) .then(response => { this.loading = false; + if (response.data.status === 'error') { + this.showError(response.data.message || this.tm('messages.saveError', { error: 'Unknown error' })); + return; + } this.showMcpServerDialog = false; this.addServerDialogMessage = ''; this.getServers(); From 3fd6c4c8a61b310da98a5d323739f1a08792991f Mon Sep 17 00:00:00 2001 From: whatevertogo <149563971+whatevertogo@users.noreply.github.com> Date: Mon, 9 Mar 2026 00:00:13 +0800 Subject: [PATCH 05/21] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20asyncio=20?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=BE=AA=E7=8E=AF=E7=9B=B8=E5=85=B3=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20(#5774)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: 修复 asyncio 事件循环相关的问题 1. components.py: 修复异常处理结构错误 - 将 except Exception 移到正确的内部 try 块 - 确保 _download_file() 异常能被正确捕获和记录 2. session_lock.py: 修复跨事件循环 Lock 绑定问题 - 添加 _access_lock_loop_id 追踪事件循环 - 当事件循环变化时重新创建 Lock Co-Authored-By: Claude Opus 4.6 * fix: 根据代码审查反馈修复问题 1. components.py: 移除 asyncio.set_event_loop() 调用 - 创建临时 event loop 时不再设置为全局 - 避免干扰其他 asyncio 使用 2. session_lock.py: 简化延迟初始化逻辑 - 移除 loop-ID 追踪和 _get_lock 方法 - 使用 setdefault 简化 session lock 创建 - 保留延迟初始化行为 3. wecomai_queue_mgr.py: 使用 time.monotonic() 替代 loop.time() - 同步方法不再依赖活动的 event loop - 避免在非异步上下文中抛出 RuntimeError Co-Authored-By: Claude Opus 4.6 * fix: 优化 asyncio 事件循环管理,使用安全的方式创建和关闭事件循环 * fix: 根据代码审查反馈改进异常处理和事件循环使用 - main.py: 显式处理 check_dashboard_files() 返回 None 的情况 - components.py: 使用 logger.exception 保留异常堆栈信息 - star_manager.py: 添加 Future 异常回调处理 __del__ 执行异常 - bay_manager.py: 缓存事件循环引用避免重复调用 Co-Authored-By: Claude Opus 4.6 * refactor: 简化 SessionLockManager 使用 defaultdict 和 setdefault - 使用 defaultdict(asyncio.Lock) 简化锁的懒创建 - 使用 setdefault 简化 _get_loop_state 逻辑 - 减少 get + if 分支,提升可读性 Co-Authored-By: Claude Opus 4.6 * fix: 降低 webui_dir 检查失败时的日志级别为 warning 改为警告而非退出,允许程序在无 WebUI 的情况下继续运行 Co-Authored-By: Claude Opus 4.6 * refactor: 重构事件循环锁管理,简化锁状态管理逻辑 * 新增对 SessionLockManager 的多事件循环隔离测试 * fix: 修复测试中的变量声明和断言,确保事件循环管理器的正确性 * fix: 修复插件删除时异常处理逻辑,确保正确记录错误信息 * fix: 新增针对多个事件循环的 OneBot 实例的测试,确保锁对象在不同事件循环间不共享 --------- Co-authored-by: whatevertogo Co-authored-by: Claude Opus 4.6 --- .../dashscope/dashscope_agent_runner.py | 4 +- astrbot/core/computer/booters/bay_manager.py | 5 +- astrbot/core/message/components.py | 27 +- .../sources/dingtalk/dingtalk_adapter.py | 4 +- .../qqofficial/qqofficial_message_event.py | 4 +- .../qqofficial_webhook/qo_webhook_server.py | 2 +- .../platform/sources/telegram/tg_event.py | 12 +- .../platform/sources/wecom/wecom_adapter.py | 10 +- .../sources/wecom_ai_bot/wecomai_queue_mgr.py | 9 +- .../weixin_offacc_adapter.py | 4 +- .../core/provider/sources/dashscope_tts.py | 4 +- astrbot/core/provider/sources/genie_tts.py | 4 +- .../sources/sensevoice_selfhosted_source.py | 4 +- .../sources/whisper_selfhosted_source.py | 4 +- astrbot/core/star/star_manager.py | 15 +- astrbot/core/utils/session_lock.py | 28 +- main.py | 33 +- tests/unit/test_session_lock.py | 545 ++++++++++++++++++ 18 files changed, 659 insertions(+), 59 deletions(-) create mode 100644 tests/unit/test_session_lock.py diff --git a/astrbot/core/agent/runners/dashscope/dashscope_agent_runner.py b/astrbot/core/agent/runners/dashscope/dashscope_agent_runner.py index 1aaf6e3b9..8169a678c 100644 --- a/astrbot/core/agent/runners/dashscope/dashscope_agent_runner.py +++ b/astrbot/core/agent/runners/dashscope/dashscope_agent_runner.py @@ -302,7 +302,7 @@ class DashscopeAgentRunner(BaseAgentRunner[TContext]): while True: try: - item_type, item_data = await asyncio.get_event_loop().run_in_executor( + item_type, item_data = await asyncio.get_running_loop().run_in_executor( None, response_queue.get, True, 1 ) except queue.Empty: @@ -388,7 +388,7 @@ class DashscopeAgentRunner(BaseAgentRunner[TContext]): # 发起请求 partial = functools.partial(Application.call, **payload) - response = await asyncio.get_event_loop().run_in_executor(None, partial) + response = await asyncio.get_running_loop().run_in_executor(None, partial) async for resp in self._handle_streaming_response(response, session_id): yield resp diff --git a/astrbot/core/computer/booters/bay_manager.py b/astrbot/core/computer/booters/bay_manager.py index 24fa379e8..61ccc1b3a 100644 --- a/astrbot/core/computer/booters/bay_manager.py +++ b/astrbot/core/computer/booters/bay_manager.py @@ -121,11 +121,12 @@ class BayContainerManager: async def wait_healthy(self, timeout: int = HEALTH_TIMEOUT_S) -> None: """Block until Bay's ``/health`` endpoint returns 200.""" url = f"http://127.0.0.1:{self._host_port}/health" - deadline = asyncio.get_event_loop().time() + timeout + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout last_error: str = "" async with aiohttp.ClientSession() as session: - while asyncio.get_event_loop().time() < deadline: + while loop.time() < deadline: try: async with session.get( url, timeout=aiohttp.ClientTimeout(total=3) diff --git a/astrbot/core/message/components.py b/astrbot/core/message/components.py index 6dbe78ae4..d9ea6aa26 100644 --- a/astrbot/core/message/components.py +++ b/astrbot/core/message/components.py @@ -699,21 +699,24 @@ class File(BaseMessageComponent): if self.url: try: - loop = asyncio.get_event_loop() - if loop.is_running(): - logger.warning( - "不可以在异步上下文中同步等待下载! " - "这个警告通常发生于某些逻辑试图通过 .file 获取文件消息段的文件内容。" - "请使用 await get_file() 代替直接获取 .file 字段", - ) - return "" - # 等待下载完成 - loop.run_until_complete(self._download_file()) + # 检查是否有正在运行的 event loop + asyncio.get_running_loop() + logger.warning( + "不可以在异步上下文中同步等待下载! " + "这个警告通常发生于某些逻辑试图通过 .file 获取文件消息段的文件内容。" + "请使用 await get_file() 代替直接获取 .file 字段", + ) + return "" + except RuntimeError: + # 没有运行中的 event loop,可以同步执行 + try: + # 使用 asyncio.run 安全地创建和关闭事件循环 + asyncio.run(self._download_file()) + except Exception: + logger.exception("文件下载失败") if self.file_ and os.path.exists(self.file_): return os.path.abspath(self.file_) - except Exception as e: - logger.error(f"文件下载失败: {e}") return "" diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py index 7982a6593..37c3b09ab 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py @@ -367,7 +367,7 @@ class DingtalkPlatformAdapter(Platform): async def get_access_token(self) -> str: try: - access_token = await asyncio.get_event_loop().run_in_executor( + access_token = await asyncio.get_running_loop().run_in_executor( None, self.client_.get_access_token, ) @@ -760,7 +760,7 @@ class DingtalkPlatformAdapter(Platform): return logger.error(f"钉钉机器人启动失败: {e}") - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() await loop.run_in_executor(None, start_client, loop) async def terminate(self) -> None: diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py index 868ec8a65..2b417f45f 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py @@ -80,7 +80,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): if isinstance(source, botpy.message.C2CMessage): # 真流式传输 - current_time = asyncio.get_event_loop().time() + current_time = asyncio.get_running_loop().time() time_since_last_edit = current_time - last_edit_time if time_since_last_edit >= throttle_interval: @@ -90,7 +90,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ) stream_payload["index"] += 1 stream_payload["id"] = ret["id"] - last_edit_time = asyncio.get_event_loop().time() + last_edit_time = asyncio.get_running_loop().time() if isinstance(source, botpy.message.C2CMessage): # 结束流式对话,并且传输 buffer 中剩余的消息 diff --git a/astrbot/core/platform/sources/qqofficial_webhook/qo_webhook_server.py b/astrbot/core/platform/sources/qqofficial_webhook/qo_webhook_server.py index 5f35471ee..bcd05faf1 100644 --- a/astrbot/core/platform/sources/qqofficial_webhook/qo_webhook_server.py +++ b/astrbot/core/platform/sources/qqofficial_webhook/qo_webhook_server.py @@ -55,7 +55,7 @@ class QQOfficialWebhook: max_async=1, connect=bot_connect, dispatch=self.client.ws_dispatch, - loop=asyncio.get_event_loop(), + loop=asyncio.get_running_loop(), api=self.api, ) diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index 96c7c5568..43e58960e 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -626,7 +626,7 @@ class TelegramPlatformEvent(AstrMessageEvent): # 发送初始 typing 状态 await self._ensure_typing(user_name, message_thread_id) - last_chat_action_time = asyncio.get_event_loop().time() + last_chat_action_time = asyncio.get_running_loop().time() def _append_text(t: str) -> None: nonlocal delta @@ -657,11 +657,11 @@ class TelegramPlatformEvent(AstrMessageEvent): # 编辑或发送消息 if message_id and len(delta) <= self.MAX_MESSAGE_LENGTH: - current_time = asyncio.get_event_loop().time() + current_time = asyncio.get_running_loop().time() time_since_last_edit = current_time - last_edit_time if time_since_last_edit >= throttle_interval: - current_time = asyncio.get_event_loop().time() + current_time = asyncio.get_running_loop().time() if current_time - last_chat_action_time >= chat_action_interval: await self._ensure_typing(user_name, message_thread_id) last_chat_action_time = current_time @@ -674,9 +674,9 @@ class TelegramPlatformEvent(AstrMessageEvent): current_content = delta except Exception as e: logger.warning(f"编辑消息失败(streaming): {e!s}") - last_edit_time = asyncio.get_event_loop().time() + last_edit_time = asyncio.get_running_loop().time() else: - current_time = asyncio.get_event_loop().time() + current_time = asyncio.get_running_loop().time() if current_time - last_chat_action_time >= chat_action_interval: await self._ensure_typing(user_name, message_thread_id) last_chat_action_time = current_time @@ -688,7 +688,7 @@ class TelegramPlatformEvent(AstrMessageEvent): except Exception as e: logger.warning(f"发送消息失败(streaming): {e!s}") message_id = msg.message_id - last_edit_time = asyncio.get_event_loop().time() + last_edit_time = asyncio.get_running_loop().time() try: if delta and current_content != delta: diff --git a/astrbot/core/platform/sources/wecom/wecom_adapter.py b/astrbot/core/platform/sources/wecom/wecom_adapter.py index 6647db89f..410b30eea 100644 --- a/astrbot/core/platform/sources/wecom/wecom_adapter.py +++ b/astrbot/core/platform/sources/wecom/wecom_adapter.py @@ -200,7 +200,7 @@ class WecomPlatformAdapter(Platform): return msg_list[-1] return None - msg_new = await asyncio.get_event_loop().run_in_executor( + msg_new = await asyncio.get_running_loop().run_in_executor( None, get_latest_msg_item, ) @@ -261,7 +261,7 @@ class WecomPlatformAdapter(Platform): @override async def run(self) -> None: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() if self.kf_name: try: acc_list = ( @@ -339,7 +339,7 @@ class WecomPlatformAdapter(Platform): abm.session_id = abm.sender.user_id abm.raw_message = msg elif isinstance(msg, VoiceMessage): - resp: Response = await asyncio.get_event_loop().run_in_executor( + resp: Response = await asyncio.get_running_loop().run_in_executor( None, self.client.media.download, msg.media_id, @@ -395,7 +395,7 @@ class WecomPlatformAdapter(Platform): abm.message_str = text elif msgtype == "image": media_id = msg.get("image", {}).get("media_id", "") - resp: Response = await asyncio.get_event_loop().run_in_executor( + resp: Response = await asyncio.get_running_loop().run_in_executor( None, self.client.media.download, media_id, @@ -407,7 +407,7 @@ class WecomPlatformAdapter(Platform): abm.message = [Image(file=path, url=path)] elif msgtype == "voice": media_id = msg.get("voice", {}).get("media_id", "") - resp: Response = await asyncio.get_event_loop().run_in_executor( + resp: Response = await asyncio.get_running_loop().run_in_executor( None, self.client.media.download, media_id, diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_queue_mgr.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_queue_mgr.py index 9b6e6b968..efa94b58e 100644 --- a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_queue_mgr.py +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_queue_mgr.py @@ -4,6 +4,7 @@ """ import asyncio +import time from collections.abc import Awaitable, Callable from typing import Any @@ -82,7 +83,7 @@ class WecomAIQueueMgr: del self.pending_responses[session_id] logger.debug(f"[WecomAI] 移除待处理响应: {session_id}") if mark_finished: - self.completed_streams[session_id] = asyncio.get_event_loop().time() + self.completed_streams[session_id] = time.monotonic() logger.debug(f"[WecomAI] 标记流已结束: {session_id}") def remove_queue(self, session_id: str): @@ -135,7 +136,7 @@ class WecomAIQueueMgr: """ self.pending_responses[session_id] = { "callback_params": callback_params, - "timestamp": asyncio.get_event_loop().time(), + "timestamp": time.monotonic(), } logger.debug(f"[WecomAI] 设置待处理响应: {session_id}") @@ -160,7 +161,7 @@ class WecomAIQueueMgr: finished_at = self.completed_streams.get(session_id) if finished_at is None: return False - if asyncio.get_event_loop().time() - finished_at > max_age_seconds: + if time.monotonic() - finished_at > max_age_seconds: self.completed_streams.pop(session_id, None) return False return True @@ -172,7 +173,7 @@ class WecomAIQueueMgr: max_age_seconds: 最大存活时间(秒) """ - current_time = asyncio.get_event_loop().time() + current_time = time.monotonic() expired_sessions = [] for session_id, response_data in self.pending_responses.items(): diff --git a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py index c01355974..bb7061ca1 100644 --- a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py +++ b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py @@ -369,7 +369,7 @@ class WeixinOfficialAccountPlatformAdapter(Platform): if future: logger.debug(f"duplicate message id checked: {msg.id}") else: - future = asyncio.get_event_loop().create_future() + future = asyncio.get_running_loop().create_future() self.wexin_event_workers[msg_id] = future await self.convert_message(msg, future) # I love shield so much! @@ -461,7 +461,7 @@ class WeixinOfficialAccountPlatformAdapter(Platform): elif msg.type == "voice": assert isinstance(msg, VoiceMessage) - resp: Response = await asyncio.get_event_loop().run_in_executor( + resp: Response = await asyncio.get_running_loop().run_in_executor( None, self.client.media.download, msg.media_id, diff --git a/astrbot/core/provider/sources/dashscope_tts.py b/astrbot/core/provider/sources/dashscope_tts.py index 9b6816859..15e763f3e 100644 --- a/astrbot/core/provider/sources/dashscope_tts.py +++ b/astrbot/core/provider/sources/dashscope_tts.py @@ -87,7 +87,7 @@ class ProviderDashscopeTTSAPI(TTSProvider): model: str, text: str, ) -> tuple[bytes | None, str]: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() response = await loop.run_in_executor(None, self._call_qwen_tts, model, text) audio_bytes = await self._extract_audio_from_response(response) if not audio_bytes: @@ -143,7 +143,7 @@ class ProviderDashscopeTTSAPI(TTSProvider): voice=self.voice, format=AudioFormat.WAV_24000HZ_MONO_16BIT, ) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() audio_bytes = await loop.run_in_executor( None, synthesizer.call, diff --git a/astrbot/core/provider/sources/genie_tts.py b/astrbot/core/provider/sources/genie_tts.py index 8f9b6d91d..b76bf6b46 100644 --- a/astrbot/core/provider/sources/genie_tts.py +++ b/astrbot/core/provider/sources/genie_tts.py @@ -59,7 +59,7 @@ class GenieTTSProvider(TTSProvider): filename = f"genie_tts_{uuid.uuid4()}.wav" path = os.path.join(temp_dir, filename) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() def _generate(save_path: str) -> None: assert genie is not None @@ -85,7 +85,7 @@ class GenieTTSProvider(TTSProvider): text_queue: asyncio.Queue[str | None], audio_queue: "asyncio.Queue[bytes | tuple[str, bytes] | None]", ) -> None: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() while True: text = await text_queue.get() diff --git a/astrbot/core/provider/sources/sensevoice_selfhosted_source.py b/astrbot/core/provider/sources/sensevoice_selfhosted_source.py index af6c0f631..d41ebaf62 100644 --- a/astrbot/core/provider/sources/sensevoice_selfhosted_source.py +++ b/astrbot/core/provider/sources/sensevoice_selfhosted_source.py @@ -43,7 +43,7 @@ class ProviderSenseVoiceSTTSelfHost(STTProvider): logger.info("下载或者加载 SenseVoice 模型中,这可能需要一些时间 ...") # 将模型加载放到线程池中执行 - self.model = await asyncio.get_event_loop().run_in_executor( + self.model = await asyncio.get_running_loop().run_in_executor( None, lambda: SenseVoiceSmall(self.model_name, quantize=True, batch_size=16), ) @@ -88,7 +88,7 @@ class ProviderSenseVoiceSTTSelfHost(STTProvider): audio_url = output_path # 使用 run_in_executor 来调用模型进行识别 - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() res = await loop.run_in_executor( None, # 使用默认的线程池 lambda: cast(SenseVoiceSmall, self.model)( diff --git a/astrbot/core/provider/sources/whisper_selfhosted_source.py b/astrbot/core/provider/sources/whisper_selfhosted_source.py index 678deb948..519a64de6 100644 --- a/astrbot/core/provider/sources/whisper_selfhosted_source.py +++ b/astrbot/core/provider/sources/whisper_selfhosted_source.py @@ -31,7 +31,7 @@ class ProviderOpenAIWhisperSelfHost(STTProvider): self.model = None async def initialize(self) -> None: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() logger.info("下载或者加载 Whisper 模型中,这可能需要一些时间 ...") self.model = await loop.run_in_executor( None, @@ -50,7 +50,7 @@ class ProviderOpenAIWhisperSelfHost(STTProvider): return False async def get_text(self, audio_url: str) -> str: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() is_tencent = False diff --git a/astrbot/core/star/star_manager.py b/astrbot/core/star/star_manager.py index 68c58fdae..b812698f2 100644 --- a/astrbot/core/star/star_manager.py +++ b/astrbot/core/star/star_manager.py @@ -1374,10 +1374,23 @@ class PluginManager: return if "__del__" in star_metadata.star_cls_type.__dict__: - asyncio.get_event_loop().run_in_executor( + loop = asyncio.get_running_loop() + future = loop.run_in_executor( None, star_metadata.star_cls.__del__, ) + + def _log_del_exception(fut: asyncio.Future) -> None: + if fut.cancelled(): + return + if (exc := fut.exception()) is not None: + logger.error( + "插件 %s 在 __del__ 中抛出了异常:%r", + star_metadata.name, + exc, + ) + + future.add_done_callback(_log_del_exception) elif "terminate" in star_metadata.star_cls_type.__dict__: await star_metadata.star_cls.terminate() diff --git a/astrbot/core/utils/session_lock.py b/astrbot/core/utils/session_lock.py index 7810d6ce4..732a29b72 100644 --- a/astrbot/core/utils/session_lock.py +++ b/astrbot/core/utils/session_lock.py @@ -1,9 +1,13 @@ import asyncio +import threading +import weakref from collections import defaultdict from contextlib import asynccontextmanager -class SessionLockManager: +class _PerLoopSessionLockManager: + """Per-event-loop session lock manager; keeps original simple semantics.""" + def __init__(self) -> None: self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) self._lock_count: dict[str, int] = defaultdict(int) @@ -26,4 +30,26 @@ class SessionLockManager: self._lock_count.pop(session_id, None) +class SessionLockManager: + """Thread-safe session lock manager with per-event-loop isolation.""" + + def __init__(self) -> None: + self._state_guard = threading.Lock() + self._loop_managers: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, _PerLoopSessionLockManager + ] = weakref.WeakKeyDictionary() + + def _get_loop_manager(self) -> _PerLoopSessionLockManager: + """Get the lock manager for the current event loop.""" + loop = asyncio.get_running_loop() + with self._state_guard: + return self._loop_managers.setdefault(loop, _PerLoopSessionLockManager()) + + @asynccontextmanager + async def acquire_lock(self, session_id: str): + manager = self._get_loop_manager() + async with manager.acquire_lock(session_id): + yield + + session_lock_manager = SessionLockManager() diff --git a/main.py b/main.py index 36c46fca3..1cc900982 100644 --- a/main.py +++ b/main.py @@ -101,6 +101,26 @@ async def check_dashboard_files(webui_dir: str | None = None): return data_dist_path +async def main_async(webui_dir_arg: str | None) -> None: + """主异步入口""" + # 检查仪表板文件 + webui_dir = await check_dashboard_files(webui_dir_arg) + if webui_dir is None: + logger.warning( + "管理面板文件检查失败,WebUI 功能将不可用。" + "请检查网络连接或手动指定 --webui-dir 参数。" + ) + + db = db_helper + + # 打印 logo + logger.info(logo_tmpl) + + core_lifecycle = InitialLoader(db, log_broker) + core_lifecycle.webui_dir = webui_dir + await core_lifecycle.start() + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="AstrBot") parser.add_argument( @@ -117,14 +137,5 @@ if __name__ == "__main__": log_broker = LogBroker() LogManager.set_queue_handler(logger, log_broker) - # 检查仪表板文件 - webui_dir = asyncio.run(check_dashboard_files(args.webui_dir)) - - db = db_helper - - # 打印 logo - logger.info(logo_tmpl) - - core_lifecycle = InitialLoader(db, log_broker) - core_lifecycle.webui_dir = webui_dir - asyncio.run(core_lifecycle.start()) + # 只使用一次 asyncio.run() + asyncio.run(main_async(args.webui_dir)) diff --git a/tests/unit/test_session_lock.py b/tests/unit/test_session_lock.py new file mode 100644 index 000000000..fea686b11 --- /dev/null +++ b/tests/unit/test_session_lock.py @@ -0,0 +1,545 @@ +"""Tests for SessionLockManager with multi-event-loop isolation.""" + +import asyncio +import threading +import time +import weakref +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from astrbot.core.utils.session_lock import SessionLockManager + + +class TestSessionLockManagerBasic: + """Basic functionality tests.""" + + def test_init(self): + """Test manager initialization.""" + manager = SessionLockManager() + assert manager._state_guard is not None + assert manager._loop_managers is not None + + @pytest.mark.asyncio + async def test_acquire_release_lock(self): + """Test basic lock acquire and release.""" + manager = SessionLockManager() + session_id = "test-session" + + async with manager.acquire_lock(session_id): + # Lock acquired successfully + pass + + # Lock should be released and cleaned up + state = manager._get_loop_manager() + assert session_id not in state._locks + assert session_id not in state._lock_count + + @pytest.mark.asyncio + async def test_lock_is_reusable(self): + """Test that locks can be acquired multiple times.""" + manager = SessionLockManager() + session_id = "test-session" + + async with manager.acquire_lock(session_id): + pass + + async with manager.acquire_lock(session_id): + pass + + # Both acquisitions should succeed + + +class TestCrossLoopIsolation: + """Tests for event loop isolation.""" + + @pytest.mark.asyncio + async def test_different_loops_have_different_managers(self): + """Test that different event loops get different per-loop managers.""" + manager = SessionLockManager() + + # Get manager for current loop + manager1 = manager._get_loop_manager() + + # Run in a different event loop + def run_in_new_loop(): + new_loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(new_loop) + + async def get_manager(): + return manager._get_loop_manager() + + return new_loop.run_until_complete(get_manager()) + finally: + new_loop.close() + asyncio.set_event_loop(None) + + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_in_new_loop) + manager2 = future.result() + + # Should be different manager instances + assert manager1 is not manager2 + + @pytest.mark.asyncio + async def test_locks_isolated_across_loops(self): + """Test that locks from different loops are isolated.""" + manager = SessionLockManager() + session_id = "shared-session" + results = [] + + async def acquire_in_loop(loop_id: int): + """Acquire lock in a new event loop.""" + async with manager.acquire_lock(session_id): + results.append(f"loop-{loop_id}-acquired") + await asyncio.sleep(0.05) + results.append(f"loop-{loop_id}-released") + + def run_in_thread(loop_id: int): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(acquire_in_loop(loop_id)) + finally: + loop.close() + asyncio.set_event_loop(None) + + # Run two loops concurrently - they should NOT block each other + # because locks are isolated per-loop + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(run_in_thread, i) for i in range(2)] + for f in futures: + f.result() + + # Both loops should acquire immediately (no blocking between loops) + # Order should show interleaved acquisitions, not sequential + assert len(results) == 4 + + @pytest.mark.asyncio + async def test_same_loop_blocks_on_same_session(self): + """Test that same loop blocks when acquiring same session lock.""" + manager = SessionLockManager() + session_id = "test-session" + execution_order = [] + + async def task1(): + async with manager.acquire_lock(session_id): + execution_order.append("task1-start") + await asyncio.sleep(0.1) + execution_order.append("task1-end") + + async def task2(): + await asyncio.sleep(0.01) # Let task1 start first + async with manager.acquire_lock(session_id): + execution_order.append("task2-start") + execution_order.append("task2-end") + + await asyncio.gather(task1(), task2()) + + # task2 should wait for task1 to finish + assert execution_order.index("task1-start") < execution_order.index("task1-end") + assert execution_order.index("task1-end") < execution_order.index("task2-start") + + +class TestConcurrency: + """Tests for concurrent access.""" + + @pytest.mark.asyncio + async def test_concurrent_acquisitions_same_loop(self): + """Test concurrent lock acquisitions on the same loop.""" + manager = SessionLockManager() + session_id = "concurrent-session" + acquired_count = 0 + max_concurrent = 0 + lock = asyncio.Lock() + + async def acquire_and_check(): + nonlocal acquired_count, max_concurrent + async with manager.acquire_lock(session_id): + async with lock: + acquired_count += 1 + max_concurrent = max(max_concurrent, acquired_count) + await asyncio.sleep(0.01) + async with lock: + acquired_count -= 1 + + # Run multiple concurrent tasks + tasks = [acquire_and_check() for _ in range(5)] + await asyncio.gather(*tasks) + + # Max concurrent should be 1 (lock serializes access) + assert max_concurrent == 1 + + @pytest.mark.asyncio + async def test_thread_safety_of_loop_manager_creation(self): + """Test that _get_loop_manager is thread-safe.""" + manager = SessionLockManager() + managers = [] + errors = [] + + def create_loop_and_get_manager(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def get_mgr(): + return manager._get_loop_manager() + + mgr = loop.run_until_complete(get_mgr()) + managers.append(mgr) + except Exception as e: + errors.append(e) + finally: + loop.close() + asyncio.set_event_loop(None) + + threads = [threading.Thread(target=create_loop_and_get_manager) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(errors) == 0 + # All managers should be valid + for m in managers: + assert hasattr(m, "_locks") + assert hasattr(m, "_access_lock") + + +class TestEventLoopCleanup: + """Tests for event loop cleanup behavior.""" + + @pytest.mark.asyncio + async def test_weakref_cleanup_on_loop_close(self): + """Test that per-loop managers are cleaned up when loop is closed.""" + manager = SessionLockManager() + loop_ref: weakref.ref[asyncio.AbstractEventLoop] | None = None + + def run_in_new_loop(): + nonlocal loop_ref + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop_ref = weakref.ref(loop) + + async def use_lock(): + async with manager.acquire_lock("test-session"): + pass + return manager._get_loop_manager() + + try: + per_loop_mgr = loop.run_until_complete(use_lock()) + # Keep a weak ref to the per-loop manager + return weakref.ref(per_loop_mgr) + finally: + loop.close() + asyncio.set_event_loop(None) + + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_in_new_loop) + per_loop_mgr_ref = future.result() + + # Give time for weakref cleanup + import gc + + gc.collect() + + # The per-loop manager should be cleaned up when the loop is closed + # because WeakKeyDictionary removes entries when the key (loop) is gone + per_loop_mgr = per_loop_mgr_ref() + loop = loop_ref() if loop_ref is not None else None + assert per_loop_mgr is None or loop is None + + @pytest.mark.asyncio + async def test_access_after_loop_close_in_new_loop_works(self): + """Test that accessing from a new loop after old loop closes works.""" + manager = SessionLockManager() + + # Use lock in current loop + async with manager.acquire_lock("session-1"): + pass + + # Simulate old loop being closed and new loop being created + def run_in_new_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def use_lock(): + # Should work without issues in new loop + async with manager.acquire_lock("session-2"): + return "success" + + return loop.run_until_complete(use_lock()) + finally: + loop.close() + asyncio.set_event_loop(None) + + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_in_new_loop) + result = future.result() + + assert result == "success" + + +class TestIssue5464: + """Tests for issue #5464: Multiple OneBot instances with different event loops. + + Issue: Running multiple OneBot adapter instances causes + "is bound to a different event loop" error. + """ + + @pytest.mark.asyncio + async def test_multiple_event_loops_no_cross_loop_error(self): + """Test that multiple event loops don't cause cross-loop binding errors. + + This simulates the scenario where multiple OneBot instances + (each potentially running in different event loops) access the + same SessionLockManager concurrently. + """ + from astrbot.core.utils.session_lock import session_lock_manager + + errors: list[Exception] = [] + results: list[str] = [] + + def simulate_onebot_instance(instance_id: int, session_ids: list[str]): + """Simulate a OneBot instance running in its own event loop.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def process_messages(): + for session_id in session_ids: + try: + async with session_lock_manager.acquire_lock(session_id): + # Simulate message processing + await asyncio.sleep(0.01) + results.append(f"instance-{instance_id}-{session_id}") + except Exception as e: + errors.append(e) + + loop.run_until_complete(process_messages()) + finally: + loop.close() + asyncio.set_event_loop(None) + + # Simulate 4 OneBot instances (as in the issue report) + # Each handles multiple sessions concurrently + threads = [] + for i in range(4): + sessions = [f"session-{i}-1", f"session-{i}-2", f"session-{i}-3"] + t = threading.Thread(target=simulate_onebot_instance, args=(i, sessions)) + threads.append(t) + + for t in threads: + t.start() + for t in threads: + t.join() + + # Should have no errors (especially no "bound to a different event loop") + assert len(errors) == 0, f"Errors occurred: {errors}" + assert len(results) == 12 # 4 instances * 3 sessions each + + @pytest.mark.asyncio + async def test_lock_object_not_shared_across_loops(self): + """Verify that asyncio.Lock objects are not shared across event loops. + + The root cause of issue #5464 was that Lock objects created in one + event loop were being used in another, causing the error. + """ + manager = SessionLockManager() + session_id = "shared-session-id" + lock_ids: set[int] = set() + lock_id_lock = threading.Lock() + + def get_lock_in_new_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def acquire_and_capture(): + # Get the per-loop manager + per_loop_mgr = manager._get_loop_manager() + # Capture the lock object id before acquiring + async with per_loop_mgr._access_lock: + lock = per_loop_mgr._locks[session_id] + with lock_id_lock: + lock_ids.add(id(lock)) + async with manager.acquire_lock(session_id): + await asyncio.sleep(0.01) + + loop.run_until_complete(acquire_and_capture()) + finally: + loop.close() + asyncio.set_event_loop(None) + + # Run multiple loops concurrently + threads = [threading.Thread(target=get_lock_in_new_loop) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Each loop should have its own Lock object + # If locks were shared, we'd only have 1 lock_id + assert len(lock_ids) == 5, "Each event loop should have its own Lock object" + + @pytest.mark.asyncio + async def test_concurrent_access_same_session_different_loops(self): + """Test that same session ID accessed from different loops doesn't block. + + This verifies the fix: locks are isolated per event loop, + so different loops can acquire the "same" session lock concurrently. + """ + from astrbot.core.utils.session_lock import session_lock_manager + + session_id = "global-session" + acquisition_times: list[float] = [] + time_lock = threading.Lock() + + def acquire_lock_in_loop(loop_id: int): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def acquire(): + import time + + start = time.time() + async with session_lock_manager.acquire_lock(session_id): + with time_lock: + acquisition_times.append(start) + await asyncio.sleep(0.1) # Hold the lock + + loop.run_until_complete(acquire()) + finally: + loop.close() + asyncio.set_event_loop(None) + + # Start 3 threads nearly simultaneously + threads = [threading.Thread(target=acquire_lock_in_loop, args=(i,)) for i in range(3)] + + start_time = time.time() + for t in threads: + t.start() + for t in threads: + t.join() + total_time = time.time() - start_time + + # If locks were NOT isolated, we'd need ~0.3s (3 * 0.1s serial) + # With isolation, all should complete in ~0.1s (parallel) + # Allow some overhead, but should be much less than 0.3s + assert total_time < 0.25, ( + f"Locks should be isolated per loop, but took {total_time:.2f}s" + ) + + +class TestEdgeCases: + """Tests for edge cases.""" + + @pytest.mark.asyncio + async def test_empty_session_id(self): + """Test with empty session ID.""" + manager = SessionLockManager() + + async with manager.acquire_lock(""): + pass + + # Should work without issues + + @pytest.mark.asyncio + async def test_special_characters_in_session_id(self): + """Test with special characters in session ID.""" + manager = SessionLockManager() + session_id = "session-with-special-chars!@#$%^&*()" + + async with manager.acquire_lock(session_id): + pass + + # Should work without issues + + @pytest.mark.asyncio + async def test_very_long_session_id(self): + """Test with very long session ID.""" + manager = SessionLockManager() + session_id = "a" * 10000 + + async with manager.acquire_lock(session_id): + pass + + # Should work without issues + + @pytest.mark.asyncio + async def test_lock_not_held_after_context_exit(self): + """Test that lock is released after context manager exit.""" + manager = SessionLockManager() + session_id = "test-session" + + async with manager.acquire_lock(session_id): + state = manager._get_loop_manager() + # Lock should exist and have count 1 + assert session_id in state._locks + assert state._lock_count[session_id] == 1 + + # After exit, lock should be cleaned up + state = manager._get_loop_manager() + assert session_id not in state._locks + assert session_id not in state._lock_count + + @pytest.mark.asyncio + async def test_exception_during_lock(self): + """Test that lock is released even if exception occurs.""" + manager = SessionLockManager() + session_id = "test-session" + + with pytest.raises(ValueError): + async with manager.acquire_lock(session_id): + raise ValueError("test error") + + # Lock should still be released + state = manager._get_loop_manager() + assert session_id not in state._locks + assert session_id not in state._lock_count + + @pytest.mark.asyncio + async def test_nested_lock_different_sessions(self): + """Test nested locks on different sessions.""" + manager = SessionLockManager() + + async with manager.acquire_lock("session-1"): + async with manager.acquire_lock("session-2"): + state = manager._get_loop_manager() + assert "session-1" in state._locks + assert "session-2" in state._locks + assert state._lock_count["session-1"] == 1 + assert state._lock_count["session-2"] == 1 + + state = manager._get_loop_manager() + assert "session-1" not in state._locks + assert "session-2" not in state._locks + + @pytest.mark.asyncio + async def test_reentrant_lock_same_session(self): + """Test reentrant locking on same session (should block).""" + manager = SessionLockManager() + session_id = "test-session" + order = [] + + async def outer(): + async with manager.acquire_lock(session_id): + order.append("outer-acquired") + await asyncio.sleep(0.1) + order.append("outer-done") + + async def inner(): + await asyncio.sleep(0.01) # Let outer acquire first + order.append("inner-attempt") + async with manager.acquire_lock(session_id): + order.append("inner-acquired") + order.append("inner-done") + + await asyncio.gather(outer(), inner()) + + # Inner should wait for outer to complete + assert order.index("outer-acquired") < order.index("outer-done") + assert order.index("outer-done") < order.index("inner-acquired") From a53a1ca49b15c037ef996dc0894f613337821b40 Mon Sep 17 00:00:00 2001 From: Jason <101583541+JasonOA888@users.noreply.github.com> Date: Mon, 9 Mar 2026 00:17:11 +0800 Subject: [PATCH 06/21] fix(provider): handle MiniMax ThinkingBlock when max_tokens reached (#5913) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(provider): handle MiniMax ThinkingBlock when max_tokens reached Fixes #5912 Problem: MiniMax API returns ThinkingBlock when stop_reason='max_tokens', but AstrBot throws 'completion 无法解析' exception because both completion_text and tools_call_args are empty. Root cause: The validation logic didn't consider ThinkingBlock (reasoning_content) as valid content. Fix: When completion_text and tools_call_args are empty but reasoning_content is present, treat it as valid instead of throwing exception. This happens when the model thinks but runs out of tokens before generating the actual response. Impact: MiniMax models now work correctly when responses are truncated due to max_tokens limit. * refactor: address review feedback 1. Use getattr for safe stop_reason access (prevent AttributeError) 2. Use ValueError instead of generic Exception for better error handling Thanks @gemini-code-assist and @sourcery-ai for the review! * refactor: flatten nested if/else with guard clause Address Gemini Code Assist feedback: - Use guard clause for early return - Flattened nested conditional for better readability Logic unchanged, just cleaner code structure. * fix(provider): improve logging for ThinkingBlock completions in ProviderAnthropic --------- Co-authored-by: Soulter <905617992@qq.com> --- .../core/provider/sources/anthropic_source.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/astrbot/core/provider/sources/anthropic_source.py b/astrbot/core/provider/sources/anthropic_source.py index ec3c395a4..be70fdc74 100644 --- a/astrbot/core/provider/sources/anthropic_source.py +++ b/astrbot/core/provider/sources/anthropic_source.py @@ -276,9 +276,24 @@ class ProviderAnthropic(Provider): llm_response.id = completion.id llm_response.usage = self._extract_usage(completion.usage) - # TODO(Soulter): 处理 end_turn 情况 + # Handle cases where completion only contains ThinkingBlock (e.g., MiniMax max_tokens) + # When stop_reason='max_tokens', the model may return only thinking content + # This is valid and should not raise an exception if not llm_response.completion_text and not llm_response.tools_call_args: - raise Exception(f"Anthropic API 返回的 completion 无法解析:{completion}。") + # Guard clause: raise early if no valid content at all + if not llm_response.reasoning_content: + raise ValueError( + f"Anthropic API returned unparsable completion: " + f"no text, tool_use, or thinking content found. " + f"Completion: {completion}" + ) + + # We have reasoning content (ThinkingBlock) - this is valid + stop_reason = getattr(completion, "stop_reason", "unknown") + logger.debug( + f"Completion contains only ThinkingBlock (stop_reason={stop_reason})" + ) + llm_response.completion_text = "" # Ensure empty string, not None return llm_response From 5dd30f9a45f3c52ae27551d33fe578c5d5ccd81d Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Mon, 9 Mar 2026 00:20:33 +0800 Subject: [PATCH 07/21] chore: bump version to 4.19.3 --- astrbot/cli/__init__.py | 2 +- astrbot/core/config/default.py | 46 +++++++++++++++++++++++++++++++++- changelogs/v4.19.3.md | 40 +++++++++++++++++++++++++++++ pyproject.toml | 2 +- 4 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 changelogs/v4.19.3.md diff --git a/astrbot/cli/__init__.py b/astrbot/cli/__init__.py index 2bcafbace..2fe53d9fd 100644 --- a/astrbot/cli/__init__.py +++ b/astrbot/cli/__init__.py @@ -1 +1 @@ -__version__ = "4.19.2" +__version__ = "4.19.3" diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index ad8873e43..e45850378 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -5,7 +5,7 @@ from typing import Any, TypedDict from astrbot.core.utils.astrbot_path import get_astrbot_data_path -VERSION = "4.19.2" +VERSION = "4.19.3" DB_PATH = os.path.join(get_astrbot_data_path(), "data_v4.db") WEBHOOK_SUPPORTED_PLATFORMS = [ @@ -343,11 +343,16 @@ CONFIG_METADATA_2 = { "id": "wecom_ai_bot", "type": "wecom_ai_bot", "enable": True, + "wecom_ai_bot_connection_mode": "webhook", "wecomaibot_init_respond_text": "", "wecomaibot_friend_message_welcome_text": "", "wecom_ai_bot_name": "", "msg_push_webhook_url": "", "only_use_webhook_url_to_send": False, + "long_connection_bot_id": "", + "long_connection_secret": "", + "long_connection_ws_url": "wss://openws.work.weixin.qq.com", + "long_connection_heartbeat_interval": 30, "token": "", "encoding_aes_key": "", "unified_webhook_mode": True, @@ -732,6 +737,13 @@ CONFIG_METADATA_2 = { "type": "string", "hint": "请务必填写正确,否则无法使用一些指令。", }, + "wecom_ai_bot_connection_mode": { + "description": "企业微信智能机器人连接模式", + "type": "string", + "options": ["webhook", "long_connection"], + "labels": ["Webhook 回调", "长连接"], + "hint": "Webhook 回调模式需要配置 Token/EncodingAESKey。长连接模式需要配置 BotID/Secret。", + }, "wecomaibot_init_respond_text": { "description": "企业微信智能机器人初始响应文本", "type": "string", @@ -752,6 +764,38 @@ CONFIG_METADATA_2 = { "type": "bool", "hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。", }, + "long_connection_bot_id": { + "description": "长连接 BotID", + "type": "string", + "hint": "企业微信智能机器人长连接模式凭证 BotID。", + "condition": { + "wecom_ai_bot_connection_mode": "long_connection", + }, + }, + "long_connection_secret": { + "description": "长连接 Secret", + "type": "string", + "hint": "企业微信智能机器人长连接模式凭证 Secret。", + "condition": { + "wecom_ai_bot_connection_mode": "long_connection", + }, + }, + "long_connection_ws_url": { + "description": "长连接 WebSocket 地址", + "type": "string", + "hint": "默认值为 wss://openws.work.weixin.qq.com,一般无需修改。", + "condition": { + "wecom_ai_bot_connection_mode": "long_connection", + }, + }, + "long_connection_heartbeat_interval": { + "description": "长连接心跳间隔", + "type": "int", + "hint": "长连接模式心跳间隔(秒),建议 30 秒。", + "condition": { + "wecom_ai_bot_connection_mode": "long_connection", + }, + }, "lark_bot_name": { "description": "飞书机器人的名字", "type": "string", diff --git a/changelogs/v4.19.3.md b/changelogs/v4.19.3.md new file mode 100644 index 000000000..58bc48595 --- /dev/null +++ b/changelogs/v4.19.3.md @@ -0,0 +1,40 @@ +## What's Changed + +### 新增 + +- 新增技能 ZIP 批量上传能力 ([#5804](https://github.com/AstrBotDevs/AstrBot/pull/5804))。 + +### 修复 + +- 修复 MCP Server 配置异常时可能导致崩溃的问题 ([#5666](https://github.com/AstrBotDevs/AstrBot/pull/5666), [#5673](https://github.com/AstrBotDevs/AstrBot/pull/5673))。 +- 修复钉钉适配器文本消息被忽略、无法主动发送文件的问题 ([#5921](https://github.com/AstrBotDevs/AstrBot/pull/5921))。 +- 修复钉钉适配器无法接收图片与文件的问题 ([#5920](https://github.com/AstrBotDevs/AstrBot/pull/5920))。 +- fix(provider): handle MiniMax ThinkingBlock when max_tokens reached ([#5913](https://github.com/AstrBotDevs/AstrBot/pull/5913))。 +- 修复 OpenRouter `api_base` 配置错误的问题 ([#5911](https://github.com/AstrBotDevs/AstrBot/pull/5911))。 +- 修复插件市场中按展示名搜索已安装插件不生效的问题 ([#5806](https://github.com/AstrBotDevs/AstrBot/pull/5806), [#5811](https://github.com/AstrBotDevs/AstrBot/pull/5811))。 +- 修复仅图片响应未应用 `reply_with_quote` 与 `reply_with_mention` 的问题 ([#5219](https://github.com/AstrBotDevs/AstrBot/pull/5219))。 +- 修复 `RegexFilter` 使用 `re.match` 导致匹配范围不正确的问题 ([#5368](https://github.com/AstrBotDevs/AstrBot/pull/5368))。 +- 修复桌面运行环境检测依赖 frozen Python 的问题 ([#5859](https://github.com/AstrBotDevs/AstrBot/pull/5859))。 +- 修复通过“创建新配置”创建平台机器人后找不到 pipeline scheduler 的问题 ([#5776](https://github.com/AstrBotDevs/AstrBot/pull/5776))。 + +--- + +## What's Changed (EN) + +### New Features + +- Added batch upload support for multiple skill ZIP files ([#5804](https://github.com/AstrBotDevs/AstrBot/pull/5804)). + +### Bug Fixes + +- Fixed potential crash on malformed MCP server config ([#5666](https://github.com/AstrBotDevs/AstrBot/pull/5666), [#5673](https://github.com/AstrBotDevs/AstrBot/pull/5673)). +- Fixed DingTalk adapter issue where text messages were ignored and files could not be sent proactively ([#5921](https://github.com/AstrBotDevs/AstrBot/pull/5921)). +- Fixed DingTalk adapter issue where image and file messages could not be received ([#5920](https://github.com/AstrBotDevs/AstrBot/pull/5920)). +- Fixed incorrect OpenRouter `api_base` configuration ([#5911](https://github.com/AstrBotDevs/AstrBot/pull/5911)). +- Fixed searching installed plugins by display name in extensions ([#5806](https://github.com/AstrBotDevs/AstrBot/pull/5806), [#5811](https://github.com/AstrBotDevs/AstrBot/pull/5811)). +- Fixed image-only responses not applying `reply_with_quote` and `reply_with_mention` ([#5219](https://github.com/AstrBotDevs/AstrBot/pull/5219)). +- Fixed `RegexFilter` using `re.match` instead of `re.search` for expected matching behavior ([#5368](https://github.com/AstrBotDevs/AstrBot/pull/5368)). +- Fixed desktop runtime detection requiring frozen Python ([#5859](https://github.com/AstrBotDevs/AstrBot/pull/5859)). +- Fixed missing pipeline scheduler after creating a platform bot via "create new config" ([#5776](https://github.com/AstrBotDevs/AstrBot/pull/5776)). +- fix(provider): handle MiniMax ThinkingBlock when max_tokens reached ([#5913](https://github.com/AstrBotDevs/AstrBot/pull/5913)) + diff --git a/pyproject.toml b/pyproject.toml index 463da4955..a9e8c060d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "AstrBot" -version = "4.19.2" +version = "4.19.3" description = "Easy-to-use multi-platform LLM chatbot and development framework" readme = "README.md" requires-python = ">=3.12" From 654112ca863e0c1d242fea99138845b5339ba33f Mon Sep 17 00:00:00 2001 From: Soulter <37870767+Soulter@users.noreply.github.com> Date: Mon, 9 Mar 2026 11:10:32 +0800 Subject: [PATCH 08/21] feat(wecomai): implement long connection mode and update configuration options (#5930) --- astrbot/core/config/default.py | 45 +++- .../sources/wecom_ai_bot/wecomai_adapter.py | 216 +++++++++++++--- .../sources/wecom_ai_bot/wecomai_event.py | 125 +++++++++- .../wecom_ai_bot/wecomai_long_connection.py | 236 ++++++++++++++++++ .../en-US/features/config-metadata.json | 28 +++ .../zh-CN/features/config-metadata.json | 38 ++- 6 files changed, 635 insertions(+), 53 deletions(-) create mode 100644 astrbot/core/platform/sources/wecom_ai_bot/wecomai_long_connection.py diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index e45850378..f1280ba7c 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -342,19 +342,20 @@ CONFIG_METADATA_2 = { "企业微信智能机器人": { "id": "wecom_ai_bot", "type": "wecom_ai_bot", + "hint": "如果发现字段有异常,请重新创建", "enable": True, - "wecom_ai_bot_connection_mode": "webhook", + "wecom_ai_bot_connection_mode": "long_connection", # long_connection, webhook + "wecom_ai_bot_name": "", + "wecomaibot_ws_bot_id": "", + "wecomaibot_ws_secret": "", + "wecomaibot_token": "", + "wecomaibot_encoding_aes_key": "", "wecomaibot_init_respond_text": "", "wecomaibot_friend_message_welcome_text": "", - "wecom_ai_bot_name": "", "msg_push_webhook_url": "", "only_use_webhook_url_to_send": False, - "long_connection_bot_id": "", - "long_connection_secret": "", - "long_connection_ws_url": "wss://openws.work.weixin.qq.com", - "long_connection_heartbeat_interval": 30, - "token": "", - "encoding_aes_key": "", + "wecomaibot_ws_url": "wss://openws.work.weixin.qq.com", + "wecomaibot_heartbeat_interval": 30, "unified_webhook_mode": True, "webhook_uuid": "", "callback_server_host": "0.0.0.0", @@ -754,6 +755,22 @@ CONFIG_METADATA_2 = { "type": "string", "hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,留空则不回复。", }, + "wecomaibot_token": { + "description": "企业微信智能机器人 Token", + "type": "string", + "hint": "用于 Webhook 回调模式的身份验证。", + "condition": { + "wecom_ai_bot_connection_mode": "webhook", + }, + }, + "wecomaibot_encoding_aes_key": { + "description": "企业微信智能机器人 EncodingAESKey", + "type": "string", + "hint": "用于 Webhook 回调模式的消息加密解密。", + "condition": { + "wecom_ai_bot_connection_mode": "webhook", + }, + }, "msg_push_webhook_url": { "description": "企业微信消息推送 Webhook URL", "type": "string", @@ -764,7 +781,7 @@ CONFIG_METADATA_2 = { "type": "bool", "hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。", }, - "long_connection_bot_id": { + "wecomaibot_ws_bot_id": { "description": "长连接 BotID", "type": "string", "hint": "企业微信智能机器人长连接模式凭证 BotID。", @@ -772,7 +789,7 @@ CONFIG_METADATA_2 = { "wecom_ai_bot_connection_mode": "long_connection", }, }, - "long_connection_secret": { + "wecomaibot_ws_secret": { "description": "长连接 Secret", "type": "string", "hint": "企业微信智能机器人长连接模式凭证 Secret。", @@ -780,17 +797,19 @@ CONFIG_METADATA_2 = { "wecom_ai_bot_connection_mode": "long_connection", }, }, - "long_connection_ws_url": { + "wecomaibot_ws_url": { "description": "长连接 WebSocket 地址", "type": "string", + "invisible": True, "hint": "默认值为 wss://openws.work.weixin.qq.com,一般无需修改。", "condition": { "wecom_ai_bot_connection_mode": "long_connection", }, }, - "long_connection_heartbeat_interval": { + "wecomaibot_heartbeat_interval": { "description": "长连接心跳间隔", "type": "int", + "invisible": True, "hint": "长连接模式心跳间隔(秒),建议 30 秒。", "condition": { "wecom_ai_bot_connection_mode": "long_connection", @@ -840,7 +859,7 @@ CONFIG_METADATA_2 = { "unified_webhook_mode": { "description": "统一 Webhook 模式", "type": "bool", - "hint": "启用后,将使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。", + "hint": "Webhook 模式下使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。", }, "webhook_uuid": { "invisible": True, diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py index aba60e06c..62f236b57 100644 --- a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py @@ -1,5 +1,5 @@ """企业微信智能机器人平台适配器 -基于企业微信智能机器人 API 的消息平台适配器,支持 HTTP 回调 +基于企业微信智能机器人 API 的消息平台适配器,支持 HTTP 回调与长连接 参考webchat_adapter.py的队列机制,实现异步消息处理和流式响应 """ @@ -31,6 +31,7 @@ from .wecomai_api import ( WecomAIBotStreamMessageBuilder, ) from .wecomai_event import WecomAIBotMessageEvent +from .wecomai_long_connection import WecomAIBotLongConnectionClient from .wecomai_queue_mgr import WecomAIQueueMgr from .wecomai_server import WecomAIBotServer from .wecomai_utils import ( @@ -78,8 +79,13 @@ class WecomAIBotAdapter(Platform): self.settings = platform_settings # 初始化配置参数 - self.token = self.config["token"] - self.encoding_aes_key = self.config["encoding_aes_key"] + self.connection_mode = self.config.get( + "wecom_ai_bot_connection_mode", "webhook" + ) + self.token = self.config.get("token", self.config.get("wecomaibot_token", "")) + self.encoding_aes_key = self.config.get( + "encoding_aes_key", self.config.get("wecomaibot_encoding_aes_key", "") + ) self.port = int(self.config["port"]) self.host = self.config.get("callback_server_host", "0.0.0.0") self.bot_name = self.config.get("wecom_ai_bot_name", "") @@ -96,25 +102,52 @@ class WecomAIBotAdapter(Platform): self.only_use_webhook_url_to_send = bool( self.config.get("only_use_webhook_url_to_send", False), ) + self.long_connection_bot_id = self.config.get( + "wecomaibot_ws_bot_id", self.config.get("long_connection_bot_id", "") + ) + self.long_connection_secret = self.config.get( + "wecomaibot_ws_secret", self.config.get("long_connection_secret", "") + ) + self.long_connection_ws_url = self.config.get( + "wecomaibot_ws_url", + "wss://openws.work.weixin.qq.com", + ) + self.long_connection_heartbeat_interval = int( + self.config.get("wecomaibot_heartbeat_interval", 30), + ) # 平台元数据 self.metadata = PlatformMetadata( name="wecom_ai_bot", - description="企业微信智能机器人适配器,支持 HTTP 回调接收消息", + description="企业微信智能机器人适配器,支持 HTTP 回调和长连接模式", id=self.config.get("id", "wecom_ai_bot"), support_proactive_message=bool(self.msg_push_webhook_url), ) - # 初始化 API 客户端 - self.api_client = WecomAIBotAPIClient(self.token, self.encoding_aes_key) + self.api_client: WecomAIBotAPIClient | None = None + self.server: WecomAIBotServer | None = None + self.long_connection_client: WecomAIBotLongConnectionClient | None = None - # 初始化 HTTP 服务器 - self.server = WecomAIBotServer( - host=self.host, - port=self.port, - api_client=self.api_client, - message_handler=self._process_message, - ) + if self.connection_mode == "long_connection": + if not self.long_connection_bot_id or not self.long_connection_secret: + logger.warning( + "企业微信智能机器人长连接模式缺少 BotID 或 Secret,连接可能失败" + ) + self.long_connection_client = WecomAIBotLongConnectionClient( + bot_id=self.long_connection_bot_id, + secret=self.long_connection_secret, + ws_url=self.long_connection_ws_url, + heartbeat_interval=self.long_connection_heartbeat_interval, + message_handler=self._process_long_connection_payload, + ) + else: + self.api_client = WecomAIBotAPIClient(self.token, self.encoding_aes_key) + self.server = WecomAIBotServer( + host=self.host, + port=self.port, + api_client=self.api_client, + message_handler=self._process_message, + ) # 事件循环和关闭信号 self.shutdown_event = asyncio.Event() @@ -161,6 +194,9 @@ class WecomAIBotAdapter(Platform): 加密后的响应消息,无需响应时返回 None """ + if not self.api_client: + logger.error("Webhook 消息处理失败: API 客户端未初始化") + return None msgtype = message_data.get("msgtype") if not msgtype: logger.warning(f"消息类型未知,忽略: {message_data}") @@ -320,6 +356,89 @@ class WecomAIBotAdapter(Platform): logger.error("处理欢迎消息时发生异常: %s", e) return None + async def _process_long_connection_payload( + self, + payload: dict[str, Any], + ) -> None: + """处理长连接回调消息。""" + cmd = payload.get("cmd") + headers = payload.get("headers") or {} + body = payload.get("body") or {} + req_id = headers.get("req_id") + if not isinstance(body, dict): + return + + if cmd == "aibot_msg_callback": + session_id = self._extract_session_id(body) + stream_id = f"{session_id}_{generate_random_string(10)}" + await self._enqueue_message( + body, {"req_id": req_id or ""}, stream_id, session_id + ) + self.queue_mgr.set_pending_response( + stream_id, + { + "req_id": req_id or "", + "connection_mode": "long_connection", + }, + ) + + if self.initial_respond_text and req_id: + await self._send_long_connection_respond_msg( + req_id=req_id, + body={ + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": False, + "content": self.initial_respond_text, + }, + }, + ) + return + + if cmd == "aibot_event_callback": + event = body.get("event") or {} + event_type = event.get("eventtype") + if ( + event_type == "enter_chat" + and self.friend_message_welcome_text + and req_id + ): + await self._send_long_connection_respond_welcome(req_id) + elif event_type == "disconnected_event": + logger.warning( + "[WecomAI][LongConn] 收到 disconnected_event,旧连接将被关闭" + ) + + async def _send_long_connection_respond_welcome(self, req_id: str) -> bool: + client = self.long_connection_client + if not client: + return False + return await client.send_command( + cmd="aibot_respond_welcome_msg", + req_id=req_id, + body={ + "msgtype": "text", + "text": { + "content": self.friend_message_welcome_text, + }, + }, + ) + + async def _send_long_connection_respond_msg( + self, + req_id: str, + body: dict[str, Any], + ) -> bool: + client = self.long_connection_client + if not client: + return False + return await client.send_command( + cmd="aibot_respond_msg", + req_id=req_id, + body=body, + ) + def _extract_session_id(self, message_data: dict[str, Any]) -> str: """从消息数据中提取会话ID""" user_id = message_data.get("from", {}).get("userid", "default_user") @@ -355,15 +474,16 @@ class WecomAIBotAdapter(Platform): content = "" image_base64 = [] - _img_url_to_process = [] + _img_url_to_process: list[tuple[str, str | None]] = [] msg_items = [] if msgtype == WecomAIBotConstants.MSG_TYPE_TEXT: content = WecomAIBotMessageParser.parse_text_message(message_data) elif msgtype == WecomAIBotConstants.MSG_TYPE_IMAGE: - _img_url_to_process.append( - WecomAIBotMessageParser.parse_image_message(message_data), - ) + image_payload = message_data.get("image", {}) + image_url = image_payload.get("url", "") + if image_url: + _img_url_to_process.append((image_url, image_payload.get("aeskey"))) elif msgtype == WecomAIBotConstants.MSG_TYPE_MIXED: # 提取混合消息中的文本内容 msg_items = WecomAIBotMessageParser.parse_mixed_message(message_data) @@ -374,9 +494,12 @@ class WecomAIBotAdapter(Platform): if text_content: text_parts.append(text_content) elif item.get("msgtype") == WecomAIBotConstants.MSG_TYPE_IMAGE: - image_url = item.get("image", {}).get("url", "") + image_payload = item.get("image", {}) + image_url = image_payload.get("url", "") if image_url: - _img_url_to_process.append(image_url) + _img_url_to_process.append( + (image_url, image_payload.get("aeskey")) + ) content = " ".join(text_parts) if text_parts else "" else: content = f"[{msgtype}消息]" @@ -384,8 +507,8 @@ class WecomAIBotAdapter(Platform): # 并行处理图片下载和解密 if _img_url_to_process: tasks = [ - process_encrypted_image(url, self.encoding_aes_key) - for url in _img_url_to_process + process_encrypted_image(url, aes_key or self.encoding_aes_key) + for url, aes_key in _img_url_to_process ] results = await asyncio.gather(*tasks) for success, result in results: @@ -459,26 +582,43 @@ class WecomAIBotAdapter(Platform): """运行适配器,同时启动HTTP服务器和队列监听器""" async def run_both() -> None: - # 如果启用统一 webhook 模式,则不启动独立服务器 - webhook_uuid = self.config.get("webhook_uuid") - if self.unified_webhook_mode and webhook_uuid: - log_webhook_info(f"{self.meta().id}(企业微信智能机器人)", webhook_uuid) - # 只运行队列监听器 - await self.queue_listener.run() - else: + if self.connection_mode == "long_connection": + if not self.long_connection_client: + raise RuntimeError("长连接客户端未初始化") logger.info( - "启动企业微信智能机器人适配器,监听 %s:%d", self.host, self.port + "启动企业微信智能机器人长连接模式: %s", self.long_connection_ws_url ) - # 同时运行HTTP服务器和队列监听器 await asyncio.gather( - self.server.start_server(), + self.long_connection_client.start(), self.queue_listener.run(), ) + else: + # 如果启用统一 webhook 模式,则不启动独立服务器 + webhook_uuid = self.config.get("webhook_uuid") + if self.unified_webhook_mode and webhook_uuid: + log_webhook_info( + f"{self.meta().id}(企业微信智能机器人)", webhook_uuid + ) + # 只运行队列监听器 + await self.queue_listener.run() + else: + if not self.server: + raise RuntimeError("Webhook 服务器未初始化") + logger.info( + "启动企业微信智能机器人适配器,监听 %s:%d", self.host, self.port + ) + # 同时运行HTTP服务器和队列监听器 + await asyncio.gather( + self.server.start_server(), + self.queue_listener.run(), + ) return run_both() async def webhook_callback(self, request: Any) -> Any: """统一 Webhook 回调入口""" + if self.connection_mode == "long_connection" or not self.server: + return "long_connection mode does not accept webhook callbacks", 400 # 根据请求方法分发到不同的处理函数 if request.method == "GET": return await self.server.handle_verify(request) @@ -489,7 +629,10 @@ class WecomAIBotAdapter(Platform): """终止适配器""" logger.info("企业微信智能机器人适配器正在关闭...") self.shutdown_event.set() - await self.server.shutdown() + if self.long_connection_client: + await self.long_connection_client.shutdown() + if self.server: + await self.server.shutdown() def meta(self) -> PlatformMetadata: """获取平台元数据""" @@ -507,17 +650,22 @@ class WecomAIBotAdapter(Platform): queue_mgr=self.queue_mgr, webhook_client=self.webhook_client, only_use_webhook_url_to_send=self.only_use_webhook_url_to_send, + long_connection_sender=self._send_long_connection_respond_msg, ) + message_event.is_at_or_wake_command = ( + True # 企业微信智能机器人默认消息都是 at 或唤醒命令 + ) + message_event.is_wake = True # 企业微信智能机器人消息默认当做唤醒命令处理 self.commit_event(message_event) except Exception as e: logger.error("处理消息时发生异常: %s", e) - def get_client(self) -> WecomAIBotAPIClient: + def get_client(self) -> WecomAIBotAPIClient | None: """获取 API 客户端""" return self.api_client - def get_server(self) -> WecomAIBotServer: + def get_server(self) -> WecomAIBotServer | None: """获取 HTTP 服务器实例""" return self.server diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py index 0369a82af..b7cf189e1 100644 --- a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py @@ -1,5 +1,7 @@ """企业微信智能机器人事件处理模块,处理消息事件的发送和接收""" +from collections.abc import Awaitable, Callable + from astrbot.api import logger from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.message_components import At, Image, Plain @@ -18,10 +20,11 @@ class WecomAIBotMessageEvent(AstrMessageEvent): message_obj, platform_meta, session_id: str, - api_client: WecomAIBotAPIClient, + api_client: WecomAIBotAPIClient | None, queue_mgr: WecomAIQueueMgr, webhook_client: WecomAIBotWebhookClient | None = None, only_use_webhook_url_to_send: bool = False, + long_connection_sender: (Callable[[str, dict], Awaitable[bool]] | None) = None, ) -> None: """初始化消息事件 @@ -38,6 +41,7 @@ class WecomAIBotMessageEvent(AstrMessageEvent): self.queue_mgr = queue_mgr self.webhook_client = webhook_client self.only_use_webhook_url_to_send = only_use_webhook_url_to_send + self.long_connection_sender = long_connection_sender async def _mark_stream_complete(self, stream_id: str) -> None: back_queue = self.queue_mgr.get_or_create_back_queue(stream_id) @@ -117,6 +121,18 @@ class WecomAIBotMessageEvent(AstrMessageEvent): return data + @staticmethod + def _extract_plain_text_from_chain(message_chain: MessageChain | None) -> str: + if not message_chain: + return "" + plain_parts: list[str] = [] + for comp in message_chain.chain: + if isinstance(comp, At): + plain_parts.append(f"@{comp.name} ") + elif isinstance(comp, Plain): + plain_parts.append(comp.text) + return "".join(plain_parts).strip() + async def send(self, message: MessageChain | None) -> None: """发送消息""" raw = self.message_obj.raw_message @@ -124,6 +140,44 @@ class WecomAIBotMessageEvent(AstrMessageEvent): "wecom_ai_bot platform event raw_message should be a dict" ) stream_id = raw.get("stream_id", self.session_id) + pending_response = self.queue_mgr.get_pending_response(stream_id) or {} + connection_mode = pending_response.get("callback_params", {}).get( + "connection_mode" + ) + req_id = pending_response.get("callback_params", {}).get("req_id") + + if ( + connection_mode == "long_connection" + and self.long_connection_sender + and isinstance(req_id, str) + and req_id + ): + if self.only_use_webhook_url_to_send and self.webhook_client and message: + await self.webhook_client.send_message_chain(message) + await super().send(MessageChain([])) + return + + if self.webhook_client and message: + await self.webhook_client.send_message_chain( + message, + unsupported_only=True, + ) + + content = self._extract_plain_text_from_chain(message) + await self.long_connection_sender( + req_id, + { + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": True, + "content": content, + }, + }, + ) + await super().send(MessageChain([])) + return + if self.only_use_webhook_url_to_send and self.webhook_client and message: await self.webhook_client.send_message_chain(message) await self._mark_stream_complete(stream_id) @@ -152,8 +206,77 @@ class WecomAIBotMessageEvent(AstrMessageEvent): "wecom_ai_bot platform event raw_message should be a dict" ) stream_id = raw.get("stream_id", self.session_id) + pending_response = self.queue_mgr.get_pending_response(stream_id) or {} + connection_mode = pending_response.get("callback_params", {}).get( + "connection_mode" + ) + req_id = pending_response.get("callback_params", {}).get("req_id") back_queue = self.queue_mgr.get_or_create_back_queue(stream_id) + if ( + connection_mode == "long_connection" + and self.long_connection_sender + and isinstance(req_id, str) + and req_id + ): + if self.only_use_webhook_url_to_send and self.webhook_client: + merged_chain = MessageChain([]) + async for chain in generator: + merged_chain.chain.extend(chain.chain) + merged_chain.squash_plain() + await self.webhook_client.send_message_chain(merged_chain) + await self.long_connection_sender( + req_id, + { + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": True, + "content": "", + }, + }, + ) + await super().send_streaming(generator, use_fallback) + return + + increment_plain = "" + async for chain in generator: + if self.webhook_client: + await self.webhook_client.send_message_chain( + chain, + unsupported_only=True, + ) + + chain.squash_plain() + chunk_text = self._extract_plain_text_from_chain(chain) + if chunk_text: + increment_plain += chunk_text + await self.long_connection_sender( + req_id, + { + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": False, + "content": increment_plain, + }, + }, + ) + + await self.long_connection_sender( + req_id, + { + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": True, + "content": increment_plain, + }, + }, + ) + await super().send_streaming(generator, use_fallback) + return + if self.only_use_webhook_url_to_send and self.webhook_client: merged_chain = MessageChain([]) async for chain in generator: diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_long_connection.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_long_connection.py new file mode 100644 index 000000000..1017dd230 --- /dev/null +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_long_connection.py @@ -0,0 +1,236 @@ +"""企业微信智能机器人长连接客户端。""" + +import asyncio +import json +import uuid +from collections.abc import Awaitable, Callable +from typing import Any + +import aiohttp + +from astrbot.api import logger + + +class WecomAIBotLongConnectionClient: + """企业微信智能机器人 WebSocket 长连接客户端。""" + + def __init__( + self, + bot_id: str, + secret: str, + ws_url: str, + heartbeat_interval: int, + message_handler: Callable[[dict[str, Any]], Awaitable[None]], + ) -> None: + self.bot_id = bot_id + self.secret = secret + self.ws_url = ws_url + self.heartbeat_interval = max(5, int(heartbeat_interval)) + self.message_handler = message_handler + + self._session: aiohttp.ClientSession | None = None + self._ws: aiohttp.ClientWebSocketResponse | None = None + self._shutdown_event = asyncio.Event() + self._send_lock = asyncio.Lock() + self._command_lock = asyncio.Lock() + self._response_waiters: dict[str, asyncio.Future[dict[str, Any]]] = {} + + @staticmethod + def gen_req_id() -> str: + return uuid.uuid4().hex + + async def start(self) -> None: + """启动长连接并自动重连。""" + reconnect_delay = 1 + while not self._shutdown_event.is_set(): + try: + await self._run_once() + reconnect_delay = 1 + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("[WecomAI][LongConn] 长连接异常: %s", e) + if self._shutdown_event.is_set(): + break + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 30) + + async def _run_once(self) -> None: + timeout = aiohttp.ClientTimeout(total=None, sock_connect=15, sock_read=None) + async with aiohttp.ClientSession(timeout=timeout) as session: + self._session = session + logger.info("[WecomAI][LongConn] 正在连接: %s", self.ws_url) + async with session.ws_connect( + self.ws_url, heartbeat=None, autoping=True + ) as ws: + self._ws = ws + await self._subscribe() + logger.info("[WecomAI][LongConn] 订阅成功,已建立长连接") + + heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + try: + while not self._shutdown_event.is_set(): + message = await ws.receive() + if message.type == aiohttp.WSMsgType.TEXT: + await self._handle_text_message(message.data) + elif message.type in { + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.CLOSE, + aiohttp.WSMsgType.ERROR, + }: + break + finally: + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass + self._ws = None + + async def _subscribe(self) -> None: + """发送 aibot_subscribe,并等待响应。""" + req_id = self.gen_req_id() + payload = { + "cmd": "aibot_subscribe", + "headers": {"req_id": req_id}, + "body": {"bot_id": self.bot_id, "secret": self.secret}, + } + await self._send_json(payload) + + if not self._ws: + raise RuntimeError("WebSocket 未建立") + + reply = await self._ws.receive(timeout=10) + if reply.type != aiohttp.WSMsgType.TEXT: + raise RuntimeError(f"订阅失败: 非文本响应 {reply.type}") + + data = json.loads(reply.data) + if data.get("errcode") != 0: + raise RuntimeError( + f"订阅失败 errcode={data.get('errcode')} errmsg={data.get('errmsg')}" + ) + + async def _heartbeat_loop(self) -> None: + while not self._shutdown_event.is_set(): + await asyncio.sleep(self.heartbeat_interval) + if self._shutdown_event.is_set(): + break + try: + await self.send_command("ping", self.gen_req_id(), None) + except Exception as e: + logger.warning("[WecomAI][LongConn] 发送心跳失败: %s", e) + return + + async def _handle_text_message(self, text: str) -> None: + try: + payload = json.loads(text) + except json.JSONDecodeError: + logger.warning("[WecomAI][LongConn] 收到非 JSON 消息: %s", text) + return + + headers = payload.get("headers") or {} + req_id = headers.get("req_id") + if isinstance(req_id, str): + waiter = self._response_waiters.get(req_id) + if waiter and not waiter.done(): + waiter.set_result(payload) + return + + cmd = payload.get("cmd") + if cmd in {"aibot_msg_callback", "aibot_event_callback"}: + await self.message_handler(payload) + return + + if payload.get("errcode") not in (None, 0): + logger.warning( + "[WecomAI][LongConn] 服务端返回错误: errcode=%s errmsg=%s", + payload.get("errcode"), + payload.get("errmsg"), + ) + + async def send_command( + self, + cmd: str, + req_id: str, + body: dict[str, Any] | None, + ) -> bool: + """发送长连接命令。""" + headers = {"req_id": req_id} + payload: dict[str, Any] = {"cmd": cmd, "headers": headers} + if body is not None: + payload["body"] = body + + async with self._command_lock: + max_retries = 3 + for attempt in range(max_retries + 1): + response = await self._send_and_wait_response(req_id, payload) + if not response: + if attempt < max_retries: + await asyncio.sleep(min(0.2 * (2**attempt), 2.0)) + continue + return False + + errcode = response.get("errcode") + if errcode in (0, None): + return True + + if errcode == 6000 and attempt < max_retries: + backoff = min(0.2 * (2**attempt), 2.0) + logger.warning( + "[WecomAI][LongConn] 命令冲突(errcode=6000),将重试。cmd=%s req_id=%s attempt=%d", + cmd, + req_id, + attempt + 1, + ) + await asyncio.sleep(backoff) + continue + + logger.warning( + "[WecomAI][LongConn] 命令失败: cmd=%s req_id=%s errcode=%s errmsg=%s", + cmd, + req_id, + errcode, + response.get("errmsg"), + ) + return False + + return False + + async def _send_and_wait_response( + self, + req_id: str, + payload: dict[str, Any], + timeout: float = 10.0, + ) -> dict[str, Any] | None: + loop = asyncio.get_running_loop() + waiter: asyncio.Future[dict[str, Any]] = loop.create_future() + self._response_waiters[req_id] = waiter + try: + await self._send_json(payload) + return await asyncio.wait_for(waiter, timeout=timeout) + except TimeoutError: + logger.warning( + "[WecomAI][LongConn] 等待命令响应超时: cmd=%s req_id=%s", + payload.get("cmd"), + req_id, + ) + return None + finally: + self._response_waiters.pop(req_id, None) + + async def _send_json(self, payload: dict[str, Any]) -> None: + ws = self._ws + if ws is None or ws.closed: + raise RuntimeError("长连接尚未建立") + async with self._send_lock: + await ws.send_json(payload) + + async def shutdown(self) -> None: + self._shutdown_event.set() + ws = self._ws + if ws is not None and not ws.closed: + await ws.close() + + session = self._session + if session is not None and not session.closed: + await session.close() diff --git a/dashboard/src/i18n/locales/en-US/features/config-metadata.json b/dashboard/src/i18n/locales/en-US/features/config-metadata.json index a143678c2..089aca7ad 100644 --- a/dashboard/src/i18n/locales/en-US/features/config-metadata.json +++ b/dashboard/src/i18n/locales/en-US/features/config-metadata.json @@ -550,6 +550,10 @@ "description": "WeCom AI Bot Name", "hint": "Must be correct; otherwise some commands won't work." }, + "wecom_ai_bot_connection_mode": { + "description": "WeCom AI Bot Connection Mode", + "hint": "Webhook mode requires Token/EncodingAESKey; long_connection mode requires BotID/Secret." + }, "wecomaibot_friend_message_welcome_text": { "description": "WeCom AI Bot DM Welcome Message", "hint": "When a user enters a DM session on that day, reply with a welcome message. Leave empty to disable." @@ -558,6 +562,30 @@ "description": "WeCom AI Bot Initial Response Text", "hint": "First reply when the bot receives a message. Leave empty to disable." }, + "wecomaibot_token": { + "description": "WeCom AI Bot Token", + "hint": "Used for authentication in webhook callback mode." + }, + "wecomaibot_encoding_aes_key": { + "description": "WeCom AI Bot EncodingAESKey", + "hint": "Used for message encryption/decryption in webhook callback mode." + }, + "wecomaibot_ws_bot_id": { + "description": "Long Connection BotID", + "hint": "BotID credential for WeCom AI Bot long connection mode." + }, + "wecomaibot_ws_secret": { + "description": "Long Connection Secret", + "hint": "Secret credential for WeCom AI Bot long connection mode." + }, + "wecomaibot_ws_url": { + "description": "Long Connection WebSocket URL", + "hint": "Default is wss://openws.work.weixin.qq.com and usually does not need changes." + }, + "wecomaibot_heartbeat_interval": { + "description": "Long Connection Heartbeat Interval", + "hint": "Heartbeat interval (seconds) in long connection mode. 30 seconds is recommended." + }, "wpp_active_message_poll": { "description": "Enable Proactive Message Polling", "hint": "Only enable if WeChat messages are not syncing to AstrBot on time. Disabled by default." diff --git a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json index 015ce3082..158dbf380 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json +++ b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json @@ -543,7 +543,7 @@ }, "unified_webhook_mode": { "description": "统一 Webhook 模式", - "hint": "启用后,将使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。" + "hint": "Webhook 模式下使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。" }, "webhook_uuid": { "description": "Webhook UUID", @@ -553,13 +553,41 @@ "description": "企业微信智能机器人的名字", "hint": "请务必填写正确,否则无法使用一些指令。" }, + "wecom_ai_bot_connection_mode": { + "description": "企业微信智能机器人连接模式", + "hint": "Webhook 回调模式需要配置 Token/EncodingAESKey;长连接模式需要配置 BotID/Secret。" + }, "wecomaibot_friend_message_welcome_text": { "description": "企业微信智能机器人私聊欢迎语", - "hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,如 “💭 思考中...”。留空则不回复。" + "hint": "可选。当用户当天进入智能机器人单聊会话,回复欢迎语,如 “💭 思考中...”。留空则不回复。" }, "wecomaibot_init_respond_text": { "description": "企业微信智能机器人初始响应文本", - "hint": "当机器人收到消息时,首先回复的文本内容。留空则不设置。" + "hint": "可选。当机器人收到消息时,首先回复的文本内容。留空则不设置。" + }, + "wecomaibot_token": { + "description": "企业微信智能机器人 Token", + "hint": "用于 Webhook 回调模式的身份验证。" + }, + "wecomaibot_encoding_aes_key": { + "description": "企业微信智能机器人 EncodingAESKey", + "hint": "用于 Webhook 回调模式的消息加密解密。" + }, + "wecomaibot_ws_bot_id": { + "description": "长连接 BotID", + "hint": "企业微信智能机器人长连接模式凭证 BotID。" + }, + "wecomaibot_ws_secret": { + "description": "长连接 Secret", + "hint": "企业微信智能机器人长连接模式凭证 Secret。" + }, + "wecomaibot_ws_url": { + "description": "长连接 WebSocket 地址", + "hint": "默认值为 wss://openws.work.weixin.qq.com,一般无需修改。" + }, + "wecomaibot_heartbeat_interval": { + "description": "长连接心跳间隔", + "hint": "长连接模式心跳间隔(秒),建议 30 秒。" }, "wpp_active_message_poll": { "description": "是否启用主动消息轮询", @@ -582,11 +610,11 @@ }, "msg_push_webhook_url": { "description": "企业微信消息推送 Webhook URL", - "hint": "用于主动消息推送,请在企微群->消息推送得到 URL。强烈建议设置此项以带来更好的消息发送体验。" + "hint": "可选。用于主动消息推送,请在企微群->消息推送得到 URL。建议设置此项以带来更好的消息发送体验。" }, "only_use_webhook_url_to_send": { "description": "仅使用 Webhook 发送消息", - "hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。如果不需要打字机效果,强烈建议使用此选项。" + "hint": "可选。启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。如果不需要打字机效果,强烈建议使用此选项。" }, "kook_bot_token": { "description": "机器人 Token", From 7d31140c14ef25ca656cc81c131fad2e2eccd27c Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Mon, 9 Mar 2026 11:13:39 +0800 Subject: [PATCH 09/21] chore: bump version to 4.19.4 --- astrbot/cli/__init__.py | 2 +- astrbot/core/config/default.py | 2 +- changelogs/v4.19.4.md | 9 +++++++++ pyproject.toml | 2 +- 4 files changed, 12 insertions(+), 3 deletions(-) create mode 100644 changelogs/v4.19.4.md diff --git a/astrbot/cli/__init__.py b/astrbot/cli/__init__.py index 2fe53d9fd..26d864175 100644 --- a/astrbot/cli/__init__.py +++ b/astrbot/cli/__init__.py @@ -1 +1 @@ -__version__ = "4.19.3" +__version__ = "4.19.4" diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index f1280ba7c..bdabcd933 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -5,7 +5,7 @@ from typing import Any, TypedDict from astrbot.core.utils.astrbot_path import get_astrbot_data_path -VERSION = "4.19.3" +VERSION = "4.19.4" DB_PATH = os.path.join(get_astrbot_data_path(), "data_v4.db") WEBHOOK_SUPPORTED_PLATFORMS = [ diff --git a/changelogs/v4.19.4.md b/changelogs/v4.19.4.md new file mode 100644 index 000000000..33244ff08 --- /dev/null +++ b/changelogs/v4.19.4.md @@ -0,0 +1,9 @@ +## What's Changed + +### 新增 + +- 企业微信智能机器人支持长连接模式。[#5930](https://github.com/AstrBotDevs/AstrBot/pull/5930) + +### New + +- Wecom AI Bot supports long-connection mode(Websockets). [#5930](https://github.com/AstrBotDevs/AstrBot/pull/5930) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a9e8c060d..6c441f89a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "AstrBot" -version = "4.19.3" +version = "4.19.4" description = "Easy-to-use multi-platform LLM chatbot and development framework" readme = "README.md" requires-python = ">=3.12" From 795aec957803fc50e6107ea9de5caee790c4062a Mon Sep 17 00:00:00 2001 From: ChuwuYo <141227996+ChuwuYo@users.noreply.github.com> Date: Mon, 9 Mar 2026 16:12:22 +0800 Subject: [PATCH 10/21] feat(extension): add filtering and sorting for installed plugins in WebUI (#5923) * feat(extension): add PluginSortControl reusable component for sorting * i18n: add i18n keys for plugin sorting and filtering features * feat(extension): add sorting and status filtering for installed plugins Backend changes (plugin.py): - Add _resolve_plugin_dir method to resolve plugin directory path - Add _get_plugin_installed_at method to get installation time from file mtime - Add installed_at field to plugin API response Frontend changes (InstalledPluginsTab.vue): - Import PluginSortControl component - Add status filter toggle (all/enabled/disabled) using v-btn-toggle - Integrate PluginSortControl for sorting options - Add toolbar layout with actions and controls sections Frontend changes (MarketPluginsTab.vue): - Import PluginSortControl component - Replace v-select + v-btn combination with unified PluginSortControl Frontend changes (useExtensionPage.js): - Add installedStatusFilter, installedSortBy, installedSortOrder refs - Add installedSortItems and installedSortUsesOrder computed properties - Add sortInstalledPlugins function with multi-criteria support - Support sorting by install time, name, author, and update status - Add status filtering in filteredPlugins computed property - Disable default table sorting by setting sortable: false * test: add tests for installed_at field in plugin API - Assert all plugins have installed_at field in get_plugins response - Assert installed_at is not null after plugin installation * fix(extension): add explicit fallbacks for installed plugin sort comparisons * i18n(extension): rename install time label to last modified * fix(extension): cache installed_at parsing and validate timestamp format in tests * test(dashboard): strengthen installed_at coverage for plugin API --- astrbot/dashboard/routes/plugin.py | 32 +++- .../extension/PluginSortControl.vue | 97 ++++++++++++ .../locales/en-US/features/extension.json | 7 + .../locales/zh-CN/features/extension.json | 7 + .../views/extension/InstalledPluginsTab.vue | 118 ++++++++++++--- .../src/views/extension/MarketPluginsTab.vue | 54 +++---- .../src/views/extension/useExtensionPage.js | 139 ++++++++++++++++-- tests/test_dashboard.py | 43 ++++++ 8 files changed, 423 insertions(+), 74 deletions(-) create mode 100644 dashboard/src/components/extension/PluginSortControl.vue diff --git a/astrbot/dashboard/routes/plugin.py b/astrbot/dashboard/routes/plugin.py index bb7769926..d151bbe6f 100644 --- a/astrbot/dashboard/routes/plugin.py +++ b/astrbot/dashboard/routes/plugin.py @@ -5,7 +5,8 @@ import os import ssl import traceback from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone +from pathlib import Path import aiohttp import certifi @@ -352,6 +353,34 @@ class PluginRoute(Route): logger.warning(f"获取插件 Logo 失败: {e}") return None + def _resolve_plugin_dir(self, plugin) -> Path | None: + if not plugin.root_dir_name: + return None + + base_dir = Path( + self.plugin_manager.reserved_plugin_path + if plugin.reserved + else self.plugin_manager.plugin_store_path + ) + plugin_dir = base_dir / plugin.root_dir_name + if not plugin_dir.is_dir(): + return None + return plugin_dir + + def _get_plugin_installed_at(self, plugin) -> str | None: + plugin_dir = self._resolve_plugin_dir(plugin) + if plugin_dir is None: + return None + + try: + return datetime.fromtimestamp( + plugin_dir.stat().st_mtime, + timezone.utc, + ).isoformat() + except OSError as exc: + logger.warning(f"获取插件安装时间失败 {plugin.name}: {exc!s}") + return None + async def get_plugins(self): _plugin_resp = [] plugin_name = request.args.get("name") @@ -377,6 +406,7 @@ class PluginRoute(Route): "logo": f"/api/file/{logo_url}" if logo_url else None, "support_platforms": plugin.support_platforms, "astrbot_version": plugin.astrbot_version, + "installed_at": self._get_plugin_installed_at(plugin), } # 检查是否为全空的幽灵插件 if not any( diff --git a/dashboard/src/components/extension/PluginSortControl.vue b/dashboard/src/components/extension/PluginSortControl.vue new file mode 100644 index 000000000..4596424a7 --- /dev/null +++ b/dashboard/src/components/extension/PluginSortControl.vue @@ -0,0 +1,97 @@ + + + + + diff --git a/dashboard/src/i18n/locales/en-US/features/extension.json b/dashboard/src/i18n/locales/en-US/features/extension.json index c97deaf49..07affcd62 100644 --- a/dashboard/src/i18n/locales/en-US/features/extension.json +++ b/dashboard/src/i18n/locales/en-US/features/extension.json @@ -23,6 +23,9 @@ "placeholder": "Search extensions...", "marketPlaceholder": "Search market extensions..." }, + "filters": { + "all": "All" + }, "views": { "card": "Card View", "list": "List View" @@ -122,10 +125,14 @@ "sourceSafetyWarning": "Even with the default source, plugin stability and security cannot be fully guaranteed. Please verify carefully before use." }, "sort": { + "by": "Sort by", "default": "Default", + "installTime": "Last Modified", + "name": "Name", "stars": "Stars", "author": "Author", "updated": "Last Updated", + "updateStatus": "Update Status", "ascending": "Ascending", "descending": "Descending" }, diff --git a/dashboard/src/i18n/locales/zh-CN/features/extension.json b/dashboard/src/i18n/locales/zh-CN/features/extension.json index a67fef728..f42173ffa 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/extension.json +++ b/dashboard/src/i18n/locales/zh-CN/features/extension.json @@ -23,6 +23,9 @@ "placeholder": "搜索插件...", "marketPlaceholder": "搜索市场插件..." }, + "filters": { + "all": "全部" + }, "views": { "card": "卡片视图", "list": "列表视图" @@ -122,10 +125,14 @@ "sourceSafetyWarning": "即使是默认插件源,我们也不能完全保证插件的稳定性和安全性,使用前请谨慎核查。" }, "sort": { + "by": "排序方式", "default": "默认排序", + "installTime": "最后修改时间", + "name": "名称", "stars": "Star数", "author": "作者名", "updated": "更新时间", + "updateStatus": "更新状态", "ascending": "升序", "descending": "降序" }, diff --git a/dashboard/src/views/extension/InstalledPluginsTab.vue b/dashboard/src/views/extension/InstalledPluginsTab.vue index 442ae2433..f5f62f5e8 100644 --- a/dashboard/src/views/extension/InstalledPluginsTab.vue +++ b/dashboard/src/views/extension/InstalledPluginsTab.vue @@ -1,4 +1,5 @@