Compare commits

...

2 Commits

Author SHA1 Message Date
Soulter 7d31140c14 chore: bump version to 4.19.4 2026-03-09 11:13:39 +08:00
Soulter 654112ca86 feat(wecomai): implement long connection mode and update configuration options (#5930) 2026-03-09 11:10:32 +08:00
9 changed files with 647 additions and 56 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "4.19.3"
__version__ = "4.19.4"
+33 -14
View File
@@ -5,7 +5,7 @@ from typing import Any, TypedDict
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
VERSION = "4.19.3"
VERSION = "4.19.4"
DB_PATH = os.path.join(get_astrbot_data_path(), "data_v4.db")
WEBHOOK_SUPPORTED_PLATFORMS = [
@@ -342,19 +342,20 @@ CONFIG_METADATA_2 = {
"企业微信智能机器人": {
"id": "wecom_ai_bot",
"type": "wecom_ai_bot",
"hint": "如果发现字段有异常,请重新创建",
"enable": True,
"wecom_ai_bot_connection_mode": "webhook",
"wecom_ai_bot_connection_mode": "long_connection", # long_connection, webhook
"wecom_ai_bot_name": "",
"wecomaibot_ws_bot_id": "",
"wecomaibot_ws_secret": "",
"wecomaibot_token": "",
"wecomaibot_encoding_aes_key": "",
"wecomaibot_init_respond_text": "",
"wecomaibot_friend_message_welcome_text": "",
"wecom_ai_bot_name": "",
"msg_push_webhook_url": "",
"only_use_webhook_url_to_send": False,
"long_connection_bot_id": "",
"long_connection_secret": "",
"long_connection_ws_url": "wss://openws.work.weixin.qq.com",
"long_connection_heartbeat_interval": 30,
"token": "",
"encoding_aes_key": "",
"wecomaibot_ws_url": "wss://openws.work.weixin.qq.com",
"wecomaibot_heartbeat_interval": 30,
"unified_webhook_mode": True,
"webhook_uuid": "",
"callback_server_host": "0.0.0.0",
@@ -754,6 +755,22 @@ CONFIG_METADATA_2 = {
"type": "string",
"hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,留空则不回复。",
},
"wecomaibot_token": {
"description": "企业微信智能机器人 Token",
"type": "string",
"hint": "用于 Webhook 回调模式的身份验证。",
"condition": {
"wecom_ai_bot_connection_mode": "webhook",
},
},
"wecomaibot_encoding_aes_key": {
"description": "企业微信智能机器人 EncodingAESKey",
"type": "string",
"hint": "用于 Webhook 回调模式的消息加密解密。",
"condition": {
"wecom_ai_bot_connection_mode": "webhook",
},
},
"msg_push_webhook_url": {
"description": "企业微信消息推送 Webhook URL",
"type": "string",
@@ -764,7 +781,7 @@ CONFIG_METADATA_2 = {
"type": "bool",
"hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。",
},
"long_connection_bot_id": {
"wecomaibot_ws_bot_id": {
"description": "长连接 BotID",
"type": "string",
"hint": "企业微信智能机器人长连接模式凭证 BotID。",
@@ -772,7 +789,7 @@ CONFIG_METADATA_2 = {
"wecom_ai_bot_connection_mode": "long_connection",
},
},
"long_connection_secret": {
"wecomaibot_ws_secret": {
"description": "长连接 Secret",
"type": "string",
"hint": "企业微信智能机器人长连接模式凭证 Secret。",
@@ -780,17 +797,19 @@ CONFIG_METADATA_2 = {
"wecom_ai_bot_connection_mode": "long_connection",
},
},
"long_connection_ws_url": {
"wecomaibot_ws_url": {
"description": "长连接 WebSocket 地址",
"type": "string",
"invisible": True,
"hint": "默认值为 wss://openws.work.weixin.qq.com,一般无需修改。",
"condition": {
"wecom_ai_bot_connection_mode": "long_connection",
},
},
"long_connection_heartbeat_interval": {
"wecomaibot_heartbeat_interval": {
"description": "长连接心跳间隔",
"type": "int",
"invisible": True,
"hint": "长连接模式心跳间隔(秒),建议 30 秒。",
"condition": {
"wecom_ai_bot_connection_mode": "long_connection",
@@ -840,7 +859,7 @@ CONFIG_METADATA_2 = {
"unified_webhook_mode": {
"description": "统一 Webhook 模式",
"type": "bool",
"hint": "启用后,将使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}",
"hint": "Webhook 模式下使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}",
},
"webhook_uuid": {
"invisible": True,
@@ -1,5 +1,5 @@
"""企业微信智能机器人平台适配器
基于企业微信智能机器人 API 的消息平台适配器,支持 HTTP 回调
基于企业微信智能机器人 API 的消息平台适配器,支持 HTTP 回调与长连接
参考webchat_adapter.py的队列机制,实现异步消息处理和流式响应
"""
@@ -31,6 +31,7 @@ from .wecomai_api import (
WecomAIBotStreamMessageBuilder,
)
from .wecomai_event import WecomAIBotMessageEvent
from .wecomai_long_connection import WecomAIBotLongConnectionClient
from .wecomai_queue_mgr import WecomAIQueueMgr
from .wecomai_server import WecomAIBotServer
from .wecomai_utils import (
@@ -78,8 +79,13 @@ class WecomAIBotAdapter(Platform):
self.settings = platform_settings
# 初始化配置参数
self.token = self.config["token"]
self.encoding_aes_key = self.config["encoding_aes_key"]
self.connection_mode = self.config.get(
"wecom_ai_bot_connection_mode", "webhook"
)
self.token = self.config.get("token", self.config.get("wecomaibot_token", ""))
self.encoding_aes_key = self.config.get(
"encoding_aes_key", self.config.get("wecomaibot_encoding_aes_key", "")
)
self.port = int(self.config["port"])
self.host = self.config.get("callback_server_host", "0.0.0.0")
self.bot_name = self.config.get("wecom_ai_bot_name", "")
@@ -96,25 +102,52 @@ class WecomAIBotAdapter(Platform):
self.only_use_webhook_url_to_send = bool(
self.config.get("only_use_webhook_url_to_send", False),
)
self.long_connection_bot_id = self.config.get(
"wecomaibot_ws_bot_id", self.config.get("long_connection_bot_id", "")
)
self.long_connection_secret = self.config.get(
"wecomaibot_ws_secret", self.config.get("long_connection_secret", "")
)
self.long_connection_ws_url = self.config.get(
"wecomaibot_ws_url",
"wss://openws.work.weixin.qq.com",
)
self.long_connection_heartbeat_interval = int(
self.config.get("wecomaibot_heartbeat_interval", 30),
)
# 平台元数据
self.metadata = PlatformMetadata(
name="wecom_ai_bot",
description="企业微信智能机器人适配器,支持 HTTP 回调接收消息",
description="企业微信智能机器人适配器,支持 HTTP 回调和长连接模式",
id=self.config.get("id", "wecom_ai_bot"),
support_proactive_message=bool(self.msg_push_webhook_url),
)
# 初始化 API 客户端
self.api_client = WecomAIBotAPIClient(self.token, self.encoding_aes_key)
self.api_client: WecomAIBotAPIClient | None = None
self.server: WecomAIBotServer | None = None
self.long_connection_client: WecomAIBotLongConnectionClient | None = None
# 初始化 HTTP 服务器
self.server = WecomAIBotServer(
host=self.host,
port=self.port,
api_client=self.api_client,
message_handler=self._process_message,
)
if self.connection_mode == "long_connection":
if not self.long_connection_bot_id or not self.long_connection_secret:
logger.warning(
"企业微信智能机器人长连接模式缺少 BotID 或 Secret,连接可能失败"
)
self.long_connection_client = WecomAIBotLongConnectionClient(
bot_id=self.long_connection_bot_id,
secret=self.long_connection_secret,
ws_url=self.long_connection_ws_url,
heartbeat_interval=self.long_connection_heartbeat_interval,
message_handler=self._process_long_connection_payload,
)
else:
self.api_client = WecomAIBotAPIClient(self.token, self.encoding_aes_key)
self.server = WecomAIBotServer(
host=self.host,
port=self.port,
api_client=self.api_client,
message_handler=self._process_message,
)
# 事件循环和关闭信号
self.shutdown_event = asyncio.Event()
@@ -161,6 +194,9 @@ class WecomAIBotAdapter(Platform):
加密后的响应消息,无需响应时返回 None
"""
if not self.api_client:
logger.error("Webhook 消息处理失败: API 客户端未初始化")
return None
msgtype = message_data.get("msgtype")
if not msgtype:
logger.warning(f"消息类型未知,忽略: {message_data}")
@@ -320,6 +356,89 @@ class WecomAIBotAdapter(Platform):
logger.error("处理欢迎消息时发生异常: %s", e)
return None
async def _process_long_connection_payload(
self,
payload: dict[str, Any],
) -> None:
"""处理长连接回调消息。"""
cmd = payload.get("cmd")
headers = payload.get("headers") or {}
body = payload.get("body") or {}
req_id = headers.get("req_id")
if not isinstance(body, dict):
return
if cmd == "aibot_msg_callback":
session_id = self._extract_session_id(body)
stream_id = f"{session_id}_{generate_random_string(10)}"
await self._enqueue_message(
body, {"req_id": req_id or ""}, stream_id, session_id
)
self.queue_mgr.set_pending_response(
stream_id,
{
"req_id": req_id or "",
"connection_mode": "long_connection",
},
)
if self.initial_respond_text and req_id:
await self._send_long_connection_respond_msg(
req_id=req_id,
body={
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": False,
"content": self.initial_respond_text,
},
},
)
return
if cmd == "aibot_event_callback":
event = body.get("event") or {}
event_type = event.get("eventtype")
if (
event_type == "enter_chat"
and self.friend_message_welcome_text
and req_id
):
await self._send_long_connection_respond_welcome(req_id)
elif event_type == "disconnected_event":
logger.warning(
"[WecomAI][LongConn] 收到 disconnected_event,旧连接将被关闭"
)
async def _send_long_connection_respond_welcome(self, req_id: str) -> bool:
client = self.long_connection_client
if not client:
return False
return await client.send_command(
cmd="aibot_respond_welcome_msg",
req_id=req_id,
body={
"msgtype": "text",
"text": {
"content": self.friend_message_welcome_text,
},
},
)
async def _send_long_connection_respond_msg(
self,
req_id: str,
body: dict[str, Any],
) -> bool:
client = self.long_connection_client
if not client:
return False
return await client.send_command(
cmd="aibot_respond_msg",
req_id=req_id,
body=body,
)
def _extract_session_id(self, message_data: dict[str, Any]) -> str:
"""从消息数据中提取会话ID"""
user_id = message_data.get("from", {}).get("userid", "default_user")
@@ -355,15 +474,16 @@ class WecomAIBotAdapter(Platform):
content = ""
image_base64 = []
_img_url_to_process = []
_img_url_to_process: list[tuple[str, str | None]] = []
msg_items = []
if msgtype == WecomAIBotConstants.MSG_TYPE_TEXT:
content = WecomAIBotMessageParser.parse_text_message(message_data)
elif msgtype == WecomAIBotConstants.MSG_TYPE_IMAGE:
_img_url_to_process.append(
WecomAIBotMessageParser.parse_image_message(message_data),
)
image_payload = message_data.get("image", {})
image_url = image_payload.get("url", "")
if image_url:
_img_url_to_process.append((image_url, image_payload.get("aeskey")))
elif msgtype == WecomAIBotConstants.MSG_TYPE_MIXED:
# 提取混合消息中的文本内容
msg_items = WecomAIBotMessageParser.parse_mixed_message(message_data)
@@ -374,9 +494,12 @@ class WecomAIBotAdapter(Platform):
if text_content:
text_parts.append(text_content)
elif item.get("msgtype") == WecomAIBotConstants.MSG_TYPE_IMAGE:
image_url = item.get("image", {}).get("url", "")
image_payload = item.get("image", {})
image_url = image_payload.get("url", "")
if image_url:
_img_url_to_process.append(image_url)
_img_url_to_process.append(
(image_url, image_payload.get("aeskey"))
)
content = " ".join(text_parts) if text_parts else ""
else:
content = f"[{msgtype}消息]"
@@ -384,8 +507,8 @@ class WecomAIBotAdapter(Platform):
# 并行处理图片下载和解密
if _img_url_to_process:
tasks = [
process_encrypted_image(url, self.encoding_aes_key)
for url in _img_url_to_process
process_encrypted_image(url, aes_key or self.encoding_aes_key)
for url, aes_key in _img_url_to_process
]
results = await asyncio.gather(*tasks)
for success, result in results:
@@ -459,26 +582,43 @@ class WecomAIBotAdapter(Platform):
"""运行适配器,同时启动HTTP服务器和队列监听器"""
async def run_both() -> None:
# 如果启用统一 webhook 模式,则不启动独立服务器
webhook_uuid = self.config.get("webhook_uuid")
if self.unified_webhook_mode and webhook_uuid:
log_webhook_info(f"{self.meta().id}(企业微信智能机器人)", webhook_uuid)
# 只运行队列监听器
await self.queue_listener.run()
else:
if self.connection_mode == "long_connection":
if not self.long_connection_client:
raise RuntimeError("长连接客户端未初始化")
logger.info(
"启动企业微信智能机器人适配器,监听 %s:%d", self.host, self.port
"启动企业微信智能机器人长连接模式: %s", self.long_connection_ws_url
)
# 同时运行HTTP服务器和队列监听器
await asyncio.gather(
self.server.start_server(),
self.long_connection_client.start(),
self.queue_listener.run(),
)
else:
# 如果启用统一 webhook 模式,则不启动独立服务器
webhook_uuid = self.config.get("webhook_uuid")
if self.unified_webhook_mode and webhook_uuid:
log_webhook_info(
f"{self.meta().id}(企业微信智能机器人)", webhook_uuid
)
# 只运行队列监听器
await self.queue_listener.run()
else:
if not self.server:
raise RuntimeError("Webhook 服务器未初始化")
logger.info(
"启动企业微信智能机器人适配器,监听 %s:%d", self.host, self.port
)
# 同时运行HTTP服务器和队列监听器
await asyncio.gather(
self.server.start_server(),
self.queue_listener.run(),
)
return run_both()
async def webhook_callback(self, request: Any) -> Any:
"""统一 Webhook 回调入口"""
if self.connection_mode == "long_connection" or not self.server:
return "long_connection mode does not accept webhook callbacks", 400
# 根据请求方法分发到不同的处理函数
if request.method == "GET":
return await self.server.handle_verify(request)
@@ -489,7 +629,10 @@ class WecomAIBotAdapter(Platform):
"""终止适配器"""
logger.info("企业微信智能机器人适配器正在关闭...")
self.shutdown_event.set()
await self.server.shutdown()
if self.long_connection_client:
await self.long_connection_client.shutdown()
if self.server:
await self.server.shutdown()
def meta(self) -> PlatformMetadata:
"""获取平台元数据"""
@@ -507,17 +650,22 @@ class WecomAIBotAdapter(Platform):
queue_mgr=self.queue_mgr,
webhook_client=self.webhook_client,
only_use_webhook_url_to_send=self.only_use_webhook_url_to_send,
long_connection_sender=self._send_long_connection_respond_msg,
)
message_event.is_at_or_wake_command = (
True # 企业微信智能机器人默认消息都是 at 或唤醒命令
)
message_event.is_wake = True # 企业微信智能机器人消息默认当做唤醒命令处理
self.commit_event(message_event)
except Exception as e:
logger.error("处理消息时发生异常: %s", e)
def get_client(self) -> WecomAIBotAPIClient:
def get_client(self) -> WecomAIBotAPIClient | None:
"""获取 API 客户端"""
return self.api_client
def get_server(self) -> WecomAIBotServer:
def get_server(self) -> WecomAIBotServer | None:
"""获取 HTTP 服务器实例"""
return self.server
@@ -1,5 +1,7 @@
"""企业微信智能机器人事件处理模块,处理消息事件的发送和接收"""
from collections.abc import Awaitable, Callable
from astrbot.api import logger
from astrbot.api.event import AstrMessageEvent, MessageChain
from astrbot.api.message_components import At, Image, Plain
@@ -18,10 +20,11 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
message_obj,
platform_meta,
session_id: str,
api_client: WecomAIBotAPIClient,
api_client: WecomAIBotAPIClient | None,
queue_mgr: WecomAIQueueMgr,
webhook_client: WecomAIBotWebhookClient | None = None,
only_use_webhook_url_to_send: bool = False,
long_connection_sender: (Callable[[str, dict], Awaitable[bool]] | None) = None,
) -> None:
"""初始化消息事件
@@ -38,6 +41,7 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
self.queue_mgr = queue_mgr
self.webhook_client = webhook_client
self.only_use_webhook_url_to_send = only_use_webhook_url_to_send
self.long_connection_sender = long_connection_sender
async def _mark_stream_complete(self, stream_id: str) -> None:
back_queue = self.queue_mgr.get_or_create_back_queue(stream_id)
@@ -117,6 +121,18 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
return data
@staticmethod
def _extract_plain_text_from_chain(message_chain: MessageChain | None) -> str:
if not message_chain:
return ""
plain_parts: list[str] = []
for comp in message_chain.chain:
if isinstance(comp, At):
plain_parts.append(f"@{comp.name} ")
elif isinstance(comp, Plain):
plain_parts.append(comp.text)
return "".join(plain_parts).strip()
async def send(self, message: MessageChain | None) -> None:
"""发送消息"""
raw = self.message_obj.raw_message
@@ -124,6 +140,44 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
"wecom_ai_bot platform event raw_message should be a dict"
)
stream_id = raw.get("stream_id", self.session_id)
pending_response = self.queue_mgr.get_pending_response(stream_id) or {}
connection_mode = pending_response.get("callback_params", {}).get(
"connection_mode"
)
req_id = pending_response.get("callback_params", {}).get("req_id")
if (
connection_mode == "long_connection"
and self.long_connection_sender
and isinstance(req_id, str)
and req_id
):
if self.only_use_webhook_url_to_send and self.webhook_client and message:
await self.webhook_client.send_message_chain(message)
await super().send(MessageChain([]))
return
if self.webhook_client and message:
await self.webhook_client.send_message_chain(
message,
unsupported_only=True,
)
content = self._extract_plain_text_from_chain(message)
await self.long_connection_sender(
req_id,
{
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": True,
"content": content,
},
},
)
await super().send(MessageChain([]))
return
if self.only_use_webhook_url_to_send and self.webhook_client and message:
await self.webhook_client.send_message_chain(message)
await self._mark_stream_complete(stream_id)
@@ -152,8 +206,77 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
"wecom_ai_bot platform event raw_message should be a dict"
)
stream_id = raw.get("stream_id", self.session_id)
pending_response = self.queue_mgr.get_pending_response(stream_id) or {}
connection_mode = pending_response.get("callback_params", {}).get(
"connection_mode"
)
req_id = pending_response.get("callback_params", {}).get("req_id")
back_queue = self.queue_mgr.get_or_create_back_queue(stream_id)
if (
connection_mode == "long_connection"
and self.long_connection_sender
and isinstance(req_id, str)
and req_id
):
if self.only_use_webhook_url_to_send and self.webhook_client:
merged_chain = MessageChain([])
async for chain in generator:
merged_chain.chain.extend(chain.chain)
merged_chain.squash_plain()
await self.webhook_client.send_message_chain(merged_chain)
await self.long_connection_sender(
req_id,
{
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": True,
"content": "",
},
},
)
await super().send_streaming(generator, use_fallback)
return
increment_plain = ""
async for chain in generator:
if self.webhook_client:
await self.webhook_client.send_message_chain(
chain,
unsupported_only=True,
)
chain.squash_plain()
chunk_text = self._extract_plain_text_from_chain(chain)
if chunk_text:
increment_plain += chunk_text
await self.long_connection_sender(
req_id,
{
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": False,
"content": increment_plain,
},
},
)
await self.long_connection_sender(
req_id,
{
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": True,
"content": increment_plain,
},
},
)
await super().send_streaming(generator, use_fallback)
return
if self.only_use_webhook_url_to_send and self.webhook_client:
merged_chain = MessageChain([])
async for chain in generator:
@@ -0,0 +1,236 @@
"""企业微信智能机器人长连接客户端。"""
import asyncio
import json
import uuid
from collections.abc import Awaitable, Callable
from typing import Any
import aiohttp
from astrbot.api import logger
class WecomAIBotLongConnectionClient:
"""企业微信智能机器人 WebSocket 长连接客户端。"""
def __init__(
self,
bot_id: str,
secret: str,
ws_url: str,
heartbeat_interval: int,
message_handler: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
self.bot_id = bot_id
self.secret = secret
self.ws_url = ws_url
self.heartbeat_interval = max(5, int(heartbeat_interval))
self.message_handler = message_handler
self._session: aiohttp.ClientSession | None = None
self._ws: aiohttp.ClientWebSocketResponse | None = None
self._shutdown_event = asyncio.Event()
self._send_lock = asyncio.Lock()
self._command_lock = asyncio.Lock()
self._response_waiters: dict[str, asyncio.Future[dict[str, Any]]] = {}
@staticmethod
def gen_req_id() -> str:
return uuid.uuid4().hex
async def start(self) -> None:
"""启动长连接并自动重连。"""
reconnect_delay = 1
while not self._shutdown_event.is_set():
try:
await self._run_once()
reconnect_delay = 1
except asyncio.CancelledError:
raise
except Exception as e:
logger.error("[WecomAI][LongConn] 长连接异常: %s", e)
if self._shutdown_event.is_set():
break
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, 30)
async def _run_once(self) -> None:
timeout = aiohttp.ClientTimeout(total=None, sock_connect=15, sock_read=None)
async with aiohttp.ClientSession(timeout=timeout) as session:
self._session = session
logger.info("[WecomAI][LongConn] 正在连接: %s", self.ws_url)
async with session.ws_connect(
self.ws_url, heartbeat=None, autoping=True
) as ws:
self._ws = ws
await self._subscribe()
logger.info("[WecomAI][LongConn] 订阅成功,已建立长连接")
heartbeat_task = asyncio.create_task(self._heartbeat_loop())
try:
while not self._shutdown_event.is_set():
message = await ws.receive()
if message.type == aiohttp.WSMsgType.TEXT:
await self._handle_text_message(message.data)
elif message.type in {
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.ERROR,
}:
break
finally:
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
self._ws = None
async def _subscribe(self) -> None:
"""发送 aibot_subscribe,并等待响应。"""
req_id = self.gen_req_id()
payload = {
"cmd": "aibot_subscribe",
"headers": {"req_id": req_id},
"body": {"bot_id": self.bot_id, "secret": self.secret},
}
await self._send_json(payload)
if not self._ws:
raise RuntimeError("WebSocket 未建立")
reply = await self._ws.receive(timeout=10)
if reply.type != aiohttp.WSMsgType.TEXT:
raise RuntimeError(f"订阅失败: 非文本响应 {reply.type}")
data = json.loads(reply.data)
if data.get("errcode") != 0:
raise RuntimeError(
f"订阅失败 errcode={data.get('errcode')} errmsg={data.get('errmsg')}"
)
async def _heartbeat_loop(self) -> None:
while not self._shutdown_event.is_set():
await asyncio.sleep(self.heartbeat_interval)
if self._shutdown_event.is_set():
break
try:
await self.send_command("ping", self.gen_req_id(), None)
except Exception as e:
logger.warning("[WecomAI][LongConn] 发送心跳失败: %s", e)
return
async def _handle_text_message(self, text: str) -> None:
try:
payload = json.loads(text)
except json.JSONDecodeError:
logger.warning("[WecomAI][LongConn] 收到非 JSON 消息: %s", text)
return
headers = payload.get("headers") or {}
req_id = headers.get("req_id")
if isinstance(req_id, str):
waiter = self._response_waiters.get(req_id)
if waiter and not waiter.done():
waiter.set_result(payload)
return
cmd = payload.get("cmd")
if cmd in {"aibot_msg_callback", "aibot_event_callback"}:
await self.message_handler(payload)
return
if payload.get("errcode") not in (None, 0):
logger.warning(
"[WecomAI][LongConn] 服务端返回错误: errcode=%s errmsg=%s",
payload.get("errcode"),
payload.get("errmsg"),
)
async def send_command(
self,
cmd: str,
req_id: str,
body: dict[str, Any] | None,
) -> bool:
"""发送长连接命令。"""
headers = {"req_id": req_id}
payload: dict[str, Any] = {"cmd": cmd, "headers": headers}
if body is not None:
payload["body"] = body
async with self._command_lock:
max_retries = 3
for attempt in range(max_retries + 1):
response = await self._send_and_wait_response(req_id, payload)
if not response:
if attempt < max_retries:
await asyncio.sleep(min(0.2 * (2**attempt), 2.0))
continue
return False
errcode = response.get("errcode")
if errcode in (0, None):
return True
if errcode == 6000 and attempt < max_retries:
backoff = min(0.2 * (2**attempt), 2.0)
logger.warning(
"[WecomAI][LongConn] 命令冲突(errcode=6000),将重试。cmd=%s req_id=%s attempt=%d",
cmd,
req_id,
attempt + 1,
)
await asyncio.sleep(backoff)
continue
logger.warning(
"[WecomAI][LongConn] 命令失败: cmd=%s req_id=%s errcode=%s errmsg=%s",
cmd,
req_id,
errcode,
response.get("errmsg"),
)
return False
return False
async def _send_and_wait_response(
self,
req_id: str,
payload: dict[str, Any],
timeout: float = 10.0,
) -> dict[str, Any] | None:
loop = asyncio.get_running_loop()
waiter: asyncio.Future[dict[str, Any]] = loop.create_future()
self._response_waiters[req_id] = waiter
try:
await self._send_json(payload)
return await asyncio.wait_for(waiter, timeout=timeout)
except TimeoutError:
logger.warning(
"[WecomAI][LongConn] 等待命令响应超时: cmd=%s req_id=%s",
payload.get("cmd"),
req_id,
)
return None
finally:
self._response_waiters.pop(req_id, None)
async def _send_json(self, payload: dict[str, Any]) -> None:
ws = self._ws
if ws is None or ws.closed:
raise RuntimeError("长连接尚未建立")
async with self._send_lock:
await ws.send_json(payload)
async def shutdown(self) -> None:
self._shutdown_event.set()
ws = self._ws
if ws is not None and not ws.closed:
await ws.close()
session = self._session
if session is not None and not session.closed:
await session.close()
+9
View File
@@ -0,0 +1,9 @@
## What's Changed
### 新增
- 企业微信智能机器人支持长连接模式。[#5930](https://github.com/AstrBotDevs/AstrBot/pull/5930)
### New
- Wecom AI Bot supports long-connection mode(Websockets). [#5930](https://github.com/AstrBotDevs/AstrBot/pull/5930)
@@ -550,6 +550,10 @@
"description": "WeCom AI Bot Name",
"hint": "Must be correct; otherwise some commands won't work."
},
"wecom_ai_bot_connection_mode": {
"description": "WeCom AI Bot Connection Mode",
"hint": "Webhook mode requires Token/EncodingAESKey; long_connection mode requires BotID/Secret."
},
"wecomaibot_friend_message_welcome_text": {
"description": "WeCom AI Bot DM Welcome Message",
"hint": "When a user enters a DM session on that day, reply with a welcome message. Leave empty to disable."
@@ -558,6 +562,30 @@
"description": "WeCom AI Bot Initial Response Text",
"hint": "First reply when the bot receives a message. Leave empty to disable."
},
"wecomaibot_token": {
"description": "WeCom AI Bot Token",
"hint": "Used for authentication in webhook callback mode."
},
"wecomaibot_encoding_aes_key": {
"description": "WeCom AI Bot EncodingAESKey",
"hint": "Used for message encryption/decryption in webhook callback mode."
},
"wecomaibot_ws_bot_id": {
"description": "Long Connection BotID",
"hint": "BotID credential for WeCom AI Bot long connection mode."
},
"wecomaibot_ws_secret": {
"description": "Long Connection Secret",
"hint": "Secret credential for WeCom AI Bot long connection mode."
},
"wecomaibot_ws_url": {
"description": "Long Connection WebSocket URL",
"hint": "Default is wss://openws.work.weixin.qq.com and usually does not need changes."
},
"wecomaibot_heartbeat_interval": {
"description": "Long Connection Heartbeat Interval",
"hint": "Heartbeat interval (seconds) in long connection mode. 30 seconds is recommended."
},
"wpp_active_message_poll": {
"description": "Enable Proactive Message Polling",
"hint": "Only enable if WeChat messages are not syncing to AstrBot on time. Disabled by default."
@@ -543,7 +543,7 @@
},
"unified_webhook_mode": {
"description": "统一 Webhook 模式",
"hint": "启用后,将使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。"
"hint": "Webhook 模式下使用 AstrBot 统一 Webhook 入口,无需单独开启端口。回调地址为 /api/platform/webhook/{webhook_uuid}。"
},
"webhook_uuid": {
"description": "Webhook UUID",
@@ -553,13 +553,41 @@
"description": "企业微信智能机器人的名字",
"hint": "请务必填写正确,否则无法使用一些指令。"
},
"wecom_ai_bot_connection_mode": {
"description": "企业微信智能机器人连接模式",
"hint": "Webhook 回调模式需要配置 Token/EncodingAESKey;长连接模式需要配置 BotID/Secret。"
},
"wecomaibot_friend_message_welcome_text": {
"description": "企业微信智能机器人私聊欢迎语",
"hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,如 “💭 思考中...”。留空则不回复。"
"hint": "可选。当用户当天进入智能机器人单聊会话,回复欢迎语,如 “💭 思考中...”。留空则不回复。"
},
"wecomaibot_init_respond_text": {
"description": "企业微信智能机器人初始响应文本",
"hint": "当机器人收到消息时,首先回复的文本内容。留空则不设置。"
"hint": "可选。当机器人收到消息时,首先回复的文本内容。留空则不设置。"
},
"wecomaibot_token": {
"description": "企业微信智能机器人 Token",
"hint": "用于 Webhook 回调模式的身份验证。"
},
"wecomaibot_encoding_aes_key": {
"description": "企业微信智能机器人 EncodingAESKey",
"hint": "用于 Webhook 回调模式的消息加密解密。"
},
"wecomaibot_ws_bot_id": {
"description": "长连接 BotID",
"hint": "企业微信智能机器人长连接模式凭证 BotID。"
},
"wecomaibot_ws_secret": {
"description": "长连接 Secret",
"hint": "企业微信智能机器人长连接模式凭证 Secret。"
},
"wecomaibot_ws_url": {
"description": "长连接 WebSocket 地址",
"hint": "默认值为 wss://openws.work.weixin.qq.com,一般无需修改。"
},
"wecomaibot_heartbeat_interval": {
"description": "长连接心跳间隔",
"hint": "长连接模式心跳间隔(秒),建议 30 秒。"
},
"wpp_active_message_poll": {
"description": "是否启用主动消息轮询",
@@ -582,11 +610,11 @@
},
"msg_push_webhook_url": {
"description": "企业微信消息推送 Webhook URL",
"hint": "用于主动消息推送,请在企微群->消息推送得到 URL。强烈建议设置此项以带来更好的消息发送体验。"
"hint": "可选。用于主动消息推送,请在企微群->消息推送得到 URL。建议设置此项以带来更好的消息发送体验。"
},
"only_use_webhook_url_to_send": {
"description": "仅使用 Webhook 发送消息",
"hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。如果不需要打字机效果,强烈建议使用此选项。"
"hint": "可选。启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。如果不需要打字机效果,强烈建议使用此选项。"
},
"kook_bot_token": {
"description": "机器人 Token",
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "AstrBot"
version = "4.19.3"
version = "4.19.4"
description = "Easy-to-use multi-platform LLM chatbot and development framework"
readme = "README.md"
requires-python = ">=3.12"