feat: 企业微信智能机器人支持主动消息推送以及发送视频、文件等消息类型支持 (#4999)

This commit is contained in:
Soulter
2026-02-09 22:16:44 +08:00
committed by GitHub
parent 04faf26140
commit d204b92877
7 changed files with 465 additions and 29 deletions
+14 -2
View File
@@ -319,9 +319,11 @@ CONFIG_METADATA_2 = {
"id": "wecom_ai_bot",
"type": "wecom_ai_bot",
"enable": True,
"wecomaibot_init_respond_text": "💭 思考中...",
"wecomaibot_init_respond_text": "",
"wecomaibot_friend_message_welcome_text": "",
"wecom_ai_bot_name": "",
"msg_push_webhook_url": "",
"only_use_webhook_url_to_send": False,
"token": "",
"encoding_aes_key": "",
"unified_webhook_mode": True,
@@ -687,13 +689,23 @@ CONFIG_METADATA_2 = {
"wecomaibot_init_respond_text": {
"description": "企业微信智能机器人初始响应文本",
"type": "string",
"hint": "当机器人收到消息时,首先回复的文本内容。留空则使用默认值",
"hint": "当机器人收到消息时,首先回复的文本内容。留空则不设置",
},
"wecomaibot_friend_message_welcome_text": {
"description": "企业微信智能机器人私聊欢迎语",
"type": "string",
"hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,留空则不回复。",
},
"msg_push_webhook_url": {
"description": "企业微信消息推送 Webhook URL",
"type": "string",
"hint": "用于 send_by_session 主动消息推送。格式示例: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx",
},
"only_use_webhook_url_to_send": {
"description": "仅使用 Webhook 发送消息",
"type": "bool",
"hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。",
},
"lark_bot_name": {
"description": "飞书机器人的名字",
"type": "string",
@@ -39,6 +39,7 @@ from .wecomai_utils import (
generate_random_string,
process_encrypted_image,
)
from .wecomai_webhook import WecomAIBotWebhookClient, WecomAIBotWebhookError
class WecomAIQueueListener:
@@ -84,20 +85,24 @@ class WecomAIBotAdapter(Platform):
self.bot_name = self.config.get("wecom_ai_bot_name", "")
self.initial_respond_text = self.config.get(
"wecomaibot_init_respond_text",
"💭 思考中...",
"",
)
self.friend_message_welcome_text = self.config.get(
"wecomaibot_friend_message_welcome_text",
"",
)
self.unified_webhook_mode = self.config.get("unified_webhook_mode", False)
self.msg_push_webhook_url = self.config.get("msg_push_webhook_url", "").strip()
self.only_use_webhook_url_to_send = bool(
self.config.get("only_use_webhook_url_to_send", False),
)
# 平台元数据
self.metadata = PlatformMetadata(
name="wecom_ai_bot",
description="企业微信智能机器人适配器,支持 HTTP 回调接收消息",
id=self.config.get("id", "wecom_ai_bot"),
support_proactive_message=False,
support_proactive_message=bool(self.msg_push_webhook_url),
)
# 初始化 API 客户端
@@ -123,6 +128,15 @@ class WecomAIBotAdapter(Platform):
self._handle_queued_message,
)
self.webhook_client: WecomAIBotWebhookClient | None = None
if self.msg_push_webhook_url:
try:
self.webhook_client = WecomAIBotWebhookClient(
self.msg_push_webhook_url,
)
except WecomAIBotWebhookError as e:
logger.error("企业微信消息推送 webhook 配置无效: %s", e)
async def _handle_queued_message(self, data: dict) -> None:
"""处理队列中的消息,类似webchat的callback"""
try:
@@ -164,16 +178,19 @@ class WecomAIBotAdapter(Platform):
)
self.queue_mgr.set_pending_response(stream_id, callback_params)
resp = WecomAIBotStreamMessageBuilder.make_text_stream(
stream_id,
self.initial_respond_text,
False,
)
return await self.api_client.encrypt_message(
resp,
callback_params["nonce"],
callback_params["timestamp"],
)
if self.only_use_webhook_url_to_send and self.webhook_client:
return None
if self.initial_respond_text:
resp = WecomAIBotStreamMessageBuilder.make_text_stream(
stream_id,
self.initial_respond_text,
False,
)
return await self.api_client.encrypt_message(
resp,
callback_params["nonce"],
callback_params["timestamp"],
)
except Exception as e:
logger.error("处理消息时发生异常: %s", e)
return None
@@ -393,9 +410,23 @@ class WecomAIBotAdapter(Platform):
session: MessageSesion,
message_chain: MessageChain,
) -> None:
"""通过会话发送消息"""
# 企业微信智能机器人主要通过回调响应,这里记录日志
logger.info("会话发送消息: %s -> %s", session.session_id, message_chain)
"""通过消息推送 webhook 发送消息"""
if not self.webhook_client:
logger.warning(
"主动消息发送失败: 未配置企业微信消息推送 Webhook URL,请前往配置添加。session_id=%s",
session.session_id,
)
await super().send_by_session(session, message_chain)
return
try:
await self.webhook_client.send_message_chain(message_chain)
except Exception as e:
logger.error(
"企业微信消息推送失败(session=%s): %s",
session.session_id,
e,
)
await super().send_by_session(session, message_chain)
def run(self) -> Awaitable[Any]:
@@ -448,6 +479,8 @@ class WecomAIBotAdapter(Platform):
session_id=message.session_id,
api_client=self.api_client,
queue_mgr=self.queue_mgr,
webhook_client=self.webhook_client,
only_use_webhook_url_to_send=self.only_use_webhook_url_to_send,
)
self.commit_event(message_event)
@@ -2,13 +2,11 @@
from astrbot.api import logger
from astrbot.api.event import AstrMessageEvent, MessageChain
from astrbot.api.message_components import (
Image,
Plain,
)
from astrbot.api.message_components import At, Image, Plain
from .wecomai_api import WecomAIBotAPIClient
from .wecomai_queue_mgr import WecomAIQueueMgr
from .wecomai_webhook import WecomAIBotWebhookClient
class WecomAIBotMessageEvent(AstrMessageEvent):
@@ -22,6 +20,8 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
session_id: str,
api_client: WecomAIBotAPIClient,
queue_mgr: WecomAIQueueMgr,
webhook_client: WecomAIBotWebhookClient | None = None,
only_use_webhook_url_to_send: bool = False,
) -> None:
"""初始化消息事件
@@ -36,6 +36,19 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
super().__init__(message_str, message_obj, platform_meta, session_id)
self.api_client = api_client
self.queue_mgr = queue_mgr
self.webhook_client = webhook_client
self.only_use_webhook_url_to_send = only_use_webhook_url_to_send
async def _mark_stream_complete(self, stream_id: str) -> None:
back_queue = self.queue_mgr.get_or_create_back_queue(stream_id)
await back_queue.put(
{
"type": "complete",
"data": "",
"streaming": False,
"session_id": stream_id,
},
)
@staticmethod
async def _send(
@@ -43,6 +56,7 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
stream_id: str,
queue_mgr: WecomAIQueueMgr,
streaming: bool = False,
suppress_unsupported_log: bool = False,
):
back_queue = queue_mgr.get_or_create_back_queue(stream_id)
@@ -58,7 +72,17 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
data = ""
for comp in message_chain.chain:
if isinstance(comp, Plain):
if isinstance(comp, At):
data = f"@{comp.name} "
await back_queue.put(
{
"type": "plain",
"data": data,
"streaming": streaming,
"session_id": stream_id,
},
)
elif isinstance(comp, Plain):
data = comp.text
await back_queue.put(
{
@@ -86,7 +110,10 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
except Exception as e:
logger.error("处理图片消息失败: %s", e)
else:
logger.warning(f"[WecomAI] 不支持的消息组件类型: {type(comp)}, 跳过")
if not suppress_unsupported_log:
logger.warning(
f"[WecomAI] 不支持的消息组件类型: {type(comp)}, 跳过"
)
return data
@@ -97,7 +124,24 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
"wecom_ai_bot platform event raw_message should be a dict"
)
stream_id = raw.get("stream_id", self.session_id)
await WecomAIBotMessageEvent._send(message, stream_id, self.queue_mgr)
if self.only_use_webhook_url_to_send and self.webhook_client and message:
await self.webhook_client.send_message_chain(message)
await self._mark_stream_complete(stream_id)
await super().send(MessageChain([]))
return
if self.webhook_client and message:
await self.webhook_client.send_message_chain(
message,
unsupported_only=True,
)
await WecomAIBotMessageEvent._send(
message,
stream_id,
self.queue_mgr,
suppress_unsupported_log=self.webhook_client is not None,
)
await super().send(MessageChain([]))
async def send_streaming(self, generator, use_fallback=False) -> None:
@@ -110,9 +154,23 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
stream_id = raw.get("stream_id", self.session_id)
back_queue = self.queue_mgr.get_or_create_back_queue(stream_id)
if self.only_use_webhook_url_to_send and self.webhook_client:
merged_chain = MessageChain([])
async for chain in generator:
merged_chain.chain.extend(chain.chain)
merged_chain.squash_plain()
await self.webhook_client.send_message_chain(merged_chain)
await self._mark_stream_complete(stream_id)
await super().send_streaming(generator, use_fallback)
return
# 企业微信智能机器人不支持增量发送,因此我们需要在这里将增量内容累积起来,积累发送
increment_plain = ""
async for chain in generator:
if self.webhook_client:
await self.webhook_client.send_message_chain(
chain, unsupported_only=True
)
# 累积增量内容,并改写 Plain 段
chain.squash_plain()
for comp in chain.chain:
@@ -139,6 +197,7 @@ class WecomAIBotMessageEvent(AstrMessageEvent):
stream_id=stream_id,
queue_mgr=self.queue_mgr,
streaming=True,
suppress_unsupported_log=self.webhook_client is not None,
)
await back_queue.put(
@@ -0,0 +1,225 @@
"""企业微信智能机器人 webhook 推送客户端。"""
from __future__ import annotations
import base64
import hashlib
import mimetypes
from pathlib import Path
from typing import Any, Literal
from urllib.parse import parse_qs, urlencode, urlparse
import aiohttp
from astrbot.api import logger
from astrbot.api.event import MessageChain
from astrbot.api.message_components import At, File, Image, Plain, Record, Video
from astrbot.core.utils.media_utils import convert_audio_format
class WecomAIBotWebhookError(RuntimeError):
"""企业微信 webhook 推送异常。"""
class WecomAIBotWebhookClient:
"""企业微信智能机器人 webhook 消息推送客户端。"""
def __init__(self, webhook_url: str, timeout_seconds: int = 15) -> None:
self.webhook_url = webhook_url.strip()
self.timeout_seconds = timeout_seconds
if not self.webhook_url:
raise WecomAIBotWebhookError("消息推送 webhook URL 不能为空")
self._webhook_key = self._extract_webhook_key()
def _extract_webhook_key(self) -> str:
parsed = urlparse(self.webhook_url)
key = parse_qs(parsed.query).get("key", [""])[0].strip()
if not key:
raise WecomAIBotWebhookError("消息推送 webhook URL 缺少 key 参数")
return key
def _build_upload_url(self, media_type: Literal["file", "voice"]) -> str:
query = urlencode({"key": self._webhook_key, "type": media_type})
return f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?{query}"
@staticmethod
def _split_markdown_v2_content(content: str, max_bytes: int = 4096) -> list[str]:
if not content:
return []
chunks: list[str] = []
buffer: list[str] = []
current_size = 0
for char in content:
char_size = len(char.encode("utf-8"))
if current_size + char_size > max_bytes and buffer:
chunks.append("".join(buffer))
buffer = [char]
current_size = char_size
else:
buffer.append(char)
current_size += char_size
if buffer:
chunks.append("".join(buffer))
return chunks
async def send_payload(self, payload: dict[str, Any]) -> None:
timeout = aiohttp.ClientTimeout(total=self.timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(self.webhook_url, json=payload) as response:
text = await response.text()
if response.status != 200:
raise WecomAIBotWebhookError(
f"Webhook 请求失败: HTTP {response.status}, {text}"
)
result = await response.json(content_type=None)
if result.get("errcode") != 0:
raise WecomAIBotWebhookError(
f"Webhook 返回错误: {result.get('errcode')} {result.get('errmsg')}"
)
logger.debug("企业微信消息推送成功: %s", payload.get("msgtype", "unknown"))
async def send_markdown_v2(self, content: str) -> None:
for chunk in self._split_markdown_v2_content(content):
await self.send_payload(
{
"msgtype": "markdown_v2",
"markdown_v2": {"content": chunk},
}
)
async def send_image_base64(self, image_base64: str) -> None:
image_bytes = base64.b64decode(image_base64)
md5 = hashlib.md5(image_bytes).hexdigest()
await self.send_payload(
{
"msgtype": "image",
"image": {
"base64": image_base64,
"md5": md5,
},
}
)
async def upload_media(
self, file_path: Path, media_type: Literal["file", "voice"]
) -> str:
if not file_path.exists() or not file_path.is_file():
raise WecomAIBotWebhookError(f"文件不存在: {file_path}")
content_type = (
mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
)
form = aiohttp.FormData()
form.add_field(
"media",
file_path.read_bytes(),
filename=file_path.name,
content_type=content_type,
)
timeout = aiohttp.ClientTimeout(total=self.timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
self._build_upload_url(media_type),
data=form,
) as response:
text = await response.text()
if response.status != 200:
raise WecomAIBotWebhookError(
f"上传媒体失败: HTTP {response.status}, {text}"
)
result = await response.json(content_type=None)
if result.get("errcode") != 0:
raise WecomAIBotWebhookError(
f"上传媒体失败: {result.get('errcode')} {result.get('errmsg')}"
)
media_id = result.get("media_id", "")
if not media_id:
raise WecomAIBotWebhookError("上传媒体失败: 返回缺少 media_id")
return str(media_id)
async def send_file(self, file_path: Path) -> None:
media_id = await self.upload_media(file_path, "file")
await self.send_payload(
{
"msgtype": "file",
"file": {"media_id": media_id},
}
)
async def send_voice(self, file_path: Path) -> None:
media_id = await self.upload_media(file_path, "voice")
await self.send_payload(
{
"msgtype": "voice",
"voice": {"media_id": media_id},
}
)
@staticmethod
def is_stream_supported_component(component: Any) -> bool:
return isinstance(component, Plain | Image | At)
async def send_message_chain(
self,
message_chain: MessageChain,
unsupported_only: bool = False,
) -> None:
async def flush_markdown_buffer(parts: list[str]) -> None:
content = "".join(parts).strip()
parts.clear()
if content:
await self.send_markdown_v2(content)
markdown_buffer: list[str] = []
for component in message_chain.chain:
if unsupported_only and self.is_stream_supported_component(component):
continue
if isinstance(component, Plain):
markdown_buffer.append(component.text)
elif isinstance(component, At):
mention_name = component.name or str(component.qq)
markdown_buffer.append(f" @{mention_name} ")
elif isinstance(component, Image):
await flush_markdown_buffer(markdown_buffer)
image_base64 = await component.convert_to_base64()
await self.send_image_base64(image_base64)
elif isinstance(component, File):
await flush_markdown_buffer(markdown_buffer)
file_path = await component.get_file()
if not file_path:
logger.warning("文件消息缺少有效文件路径,已跳过: %s", component)
continue
await self.send_file(Path(file_path))
elif isinstance(component, Video):
await flush_markdown_buffer(markdown_buffer)
video_path = await component.convert_to_file_path()
await self.send_file(Path(video_path))
elif isinstance(component, Record):
await flush_markdown_buffer(markdown_buffer)
source_voice_path = Path(await component.convert_to_file_path())
target_voice_path = source_voice_path
converted = False
if source_voice_path.suffix.lower() != ".amr":
target_voice_path = Path(
await convert_audio_format(str(source_voice_path), "amr"),
)
converted = target_voice_path != source_voice_path
try:
await self.send_voice(target_voice_path)
finally:
if converted and target_voice_path.exists():
try:
target_voice_path.unlink()
except Exception as e:
logger.warning(
"清理临时语音文件失败 %s: %s", target_voice_path, e
)
else:
logger.warning(
"企业微信消息推送暂不支持组件类型 %s,已跳过",
type(component).__name__,
)
await flush_markdown_buffer(markdown_buffer)
@@ -501,7 +501,7 @@
},
"wecomaibot_init_respond_text": {
"description": "WeCom AI Bot Initial Response Text",
"hint": "First reply when the bot receives a message. Leave empty to use default."
"hint": "First reply when the bot receives a message. Leave empty to disable."
},
"wpp_active_message_poll": {
"description": "Enable Proactive Message Polling",
@@ -521,6 +521,14 @@
"ws_reverse_token": {
"description": "Reverse WebSocket Token",
"hint": "Reverse WebSocket token. If not set, token verification is disabled."
},
"msg_push_webhook_url": {
"description": "WeCom Message Push Webhook URL",
"hint": "Used for proactive message push. It is strongly recommended to set this for a better message sending experience."
},
"only_use_webhook_url_to_send": {
"description": "Send Replies via Webhook Only",
"hint": "When enabled, all WeCom AI Bot replies are sent through msg_push_webhook_url. The message push webhook supports more message types (such as images, files, etc.). If you do not need the typing effect, it is strongly recommended to use this option. "
}
},
"general": {
@@ -350,7 +350,7 @@
},
"kf_name": {
"description": "微信客服账号名",
"hint": "可选。微信客服账号名(不是 ID)。可在 https://kf.weixin.qq.com/kf/frame#/accounts 获取"
"hint": "如果填写此项,即代表你将使用企业微信客服,而不是企业微信应用。可在 https://kf.weixin.qq.com/kf/frame#/accounts 获取"
},
"lark_bot_name": {
"description": "飞书机器人的名字",
@@ -500,11 +500,11 @@
},
"wecomaibot_friend_message_welcome_text": {
"description": "企业微信智能机器人私聊欢迎语",
"hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,留空则不回复。"
"hint": "当用户当天进入智能机器人单聊会话,回复欢迎语,如 “💭 思考中...”。留空则不回复。"
},
"wecomaibot_init_respond_text": {
"description": "企业微信智能机器人初始响应文本",
"hint": "当机器人收到消息时,首先回复的文本内容。留空则使用默认值。"
"hint": "当机器人收到消息时,首先回复的文本内容。留空则不设置。"
},
"wpp_active_message_poll": {
"description": "是否启用主动消息轮询",
@@ -524,6 +524,14 @@
"ws_reverse_token": {
"description": "反向 Websocket Token",
"hint": "反向 Websocket Token。未设置则不启用 Token 验证。"
},
"msg_push_webhook_url": {
"description": "企业微信消息推送 Webhook URL",
"hint": "用于主动消息推送,请在企微群->消息推送得到 URL。强烈建议设置此项以带来更好的消息发送体验。"
},
"only_use_webhook_url_to_send": {
"description": "仅使用 Webhook 发送消息",
"hint": "启用后,企业微信智能机器人的所有回复都改为通过消息推送 Webhook 发送。消息推送 Webhook 支持更多的消息类型(如图片、文件等)。如果不需要打字机效果,强烈建议使用此选项。"
}
},
"general": {
+92 -1
View File
@@ -351,8 +351,99 @@ export default {
}
},
findPlatformTemplate(platform) {
const templates = this.metadata?.platform_group?.metadata?.platform?.config_template || {};
if (platform?.type && templates[platform.type]) {
return templates[platform.type];
}
if (platform?.id && templates[platform.id]) {
return templates[platform.id];
}
for (const template of Object.values(templates)) {
if (template?.type === platform?.type) {
return template;
}
}
return null;
},
mergeConfigWithTemplate(sourceConfig, templateConfig) {
const merge = (source, reference) => {
const target = {};
const sourceObj = source && typeof source === 'object' && !Array.isArray(source) ? source : {};
const referenceObj = reference && typeof reference === 'object' && !Array.isArray(reference) ? reference : null;
if (!referenceObj) {
for (const [key, value] of Object.entries(sourceObj)) {
if (Array.isArray(value)) {
target[key] = [...value];
} else if (value && typeof value === 'object') {
target[key] = { ...value };
} else {
target[key] = value;
}
}
return target;
}
// 1) template
for (const [key, refValue] of Object.entries(referenceObj)) {
const hasSourceKey = Object.prototype.hasOwnProperty.call(sourceObj, key);
const sourceValue = sourceObj[key];
if (refValue && typeof refValue === 'object' && !Array.isArray(refValue)) {
target[key] = merge(
hasSourceKey && sourceValue && typeof sourceValue === 'object' && !Array.isArray(sourceValue)
? sourceValue
: {},
refValue
);
continue;
}
if (hasSourceKey) {
if (Array.isArray(sourceValue)) {
target[key] = [...sourceValue];
} else if (sourceValue && typeof sourceValue === 'object') {
target[key] = { ...sourceValue };
} else {
target[key] = sourceValue;
}
} else if (Array.isArray(refValue)) {
target[key] = [...refValue];
} else {
target[key] = refValue;
}
}
// 2) source
for (const [key, value] of Object.entries(sourceObj)) {
if (Object.prototype.hasOwnProperty.call(referenceObj, key)) {
continue;
}
if (Array.isArray(value)) {
target[key] = [...value];
} else if (value && typeof value === 'object') {
target[key] = { ...value };
} else {
target[key] = value;
}
}
return target;
};
return merge(sourceConfig, templateConfig);
},
editPlatform(platform) {
this.updatingPlatformConfig = JSON.parse(JSON.stringify(platform));
const platformCopy = JSON.parse(JSON.stringify(platform));
const template = this.findPlatformTemplate(platformCopy);
this.updatingPlatformConfig = template
? this.mergeConfigWithTemplate(platformCopy, template)
: platformCopy;
this.updatingMode = true;
this.showAddPlatformDialog = true;
this.$nextTick(() => {