From d09b70b295be88452040c3eb4fd52455cdb0692a Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Tue, 20 May 2025 01:38:13 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=BE=AE=E4=BF=A1?= =?UTF-8?q?=E5=85=AC=E4=BC=97=E5=8F=B7=EF=BC=88=E4=B8=AA=E4=BA=BA=E8=AE=A4?= =?UTF-8?q?=E8=AF=81=EF=BC=89=E4=B8=8B=E6=97=A0=E6=B3=95=E5=9B=9E=E5=A4=8D?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- astrbot/core/config/default.py | 6 ++ .../weixin_offacc_adapter.py | 52 +++++++++++++---- .../weixin_offacc_event.py | 56 ++++++++++++++++--- packages/astrbot/main.py | 6 ++ 4 files changed, 101 insertions(+), 19 deletions(-) diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index f05e17f43..c78bd0480 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -176,6 +176,7 @@ CONFIG_METADATA_2 = { "api_base_url": "https://api.weixin.qq.com/cgi-bin/", "callback_server_host": "0.0.0.0", "port": 6194, + "active_send_mode": False }, "wecom(企业微信)": { "id": "wecom", @@ -220,6 +221,11 @@ CONFIG_METADATA_2 = { }, }, "items": { + "active_send_mode": { + "description": "是否换用主动发送接口", + "type": "bool", + "desc": "只有企业认证的公众号才能主动发送。主动发送接口的限制会少一些。" + }, "wpp_active_message_poll": { "description": "是否启用主动消息轮询", "type": "bool", diff --git a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py index 5ed589516..04186ff9d 100644 --- a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py +++ b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_adapter.py @@ -20,7 +20,7 @@ from requests import Response from wechatpy.utils import check_signature from wechatpy.crypto import WeChatCrypto from wechatpy import WeChatClient -from wechatpy.messages import TextMessage, ImageMessage, VoiceMessage +from wechatpy.messages import TextMessage, ImageMessage, VoiceMessage, BaseMessage from wechatpy.exceptions import InvalidSignatureException from wechatpy import parse_message from .weixin_offacc_event import WeixinOfficialAccountPlatformEvent @@ -87,7 +87,11 @@ class WecomServer: logger.info(f"解析成功: {msg}") if self.callback: - await self.callback(msg) + result_xml = await self.callback(msg) + if not result_xml: + return "success" + if isinstance(result_xml, str): + return result_xml return "success" @@ -117,6 +121,7 @@ class WeixinOfficialAccountPlatformAdapter(Platform): self.api_base_url = platform_config.get( "api_base_url", "https://api.weixin.qq.com/cgi-bin/" ) + self.active_send_mode = self.config.get("active_send_mode", False) if not self.api_base_url: self.api_base_url = "https://api.weixin.qq.com/cgi-bin/" @@ -138,9 +143,29 @@ class WeixinOfficialAccountPlatformAdapter(Platform): self.client.API_BASE_URL = self.api_base_url - async def callback(msg): + # 微信公众号必须 5 秒内进行回复,否则会重试 3 次,我们需要对其进行消息排重 + # msgid -> Future + self.wexin_event_workers: dict[str, asyncio.Future] = {} + + async def callback(msg: BaseMessage): try: - await self.convert_message(msg) + if self.active_send_mode: + await self.convert_message(msg, None) + else: + if msg.id in self.wexin_event_workers: + future = self.wexin_event_workers[msg.id] + logger.debug(f"duplicate message id checked: {msg.id}") + else: + future = asyncio.get_event_loop().create_future() + self.wexin_event_workers[msg.id] = future + await self.convert_message(msg, future) + # I love shield so much! + result = await asyncio.wait_for(asyncio.shield(future), 60) # wait for 60s + logger.debug(f"Got future result: {result}") + self.wexin_event_workers.pop(msg.id, None) + return result # xml. see weixin_offacc_event.py + except asyncio.TimeoutError: + pass except Exception as e: logger.error(f"转换消息时出现异常: {e}") @@ -163,7 +188,9 @@ class WeixinOfficialAccountPlatformAdapter(Platform): async def run(self): await self.server.start_polling() - async def convert_message(self, msg) -> AstrBotMessage | None: + async def convert_message( + self, msg, future: asyncio.Future = None + ) -> AstrBotMessage | None: abm = AstrBotMessage() if isinstance(msg, TextMessage): abm.message_str = msg.content @@ -177,7 +204,6 @@ class WeixinOfficialAccountPlatformAdapter(Platform): abm.message_id = msg.id abm.timestamp = msg.time abm.session_id = abm.sender.user_id - abm.raw_message = msg elif msg.type == "image": assert isinstance(msg, ImageMessage) abm.message_str = "[图片]" @@ -191,7 +217,6 @@ class WeixinOfficialAccountPlatformAdapter(Platform): abm.message_id = msg.id abm.timestamp = msg.time abm.session_id = abm.sender.user_id - abm.raw_message = msg elif msg.type == "voice": assert isinstance(msg, VoiceMessage) @@ -209,7 +234,9 @@ class WeixinOfficialAccountPlatformAdapter(Platform): audio = AudioSegment.from_file(path) audio.export(path_wav, format="wav") except Exception as e: - logger.error(f"转换音频失败: {e}。如果没有安装 pydub 和 ffmpeg 请先安装。") + logger.error( + f"转换音频失败: {e}。如果没有安装 pydub 和 ffmpeg 请先安装。" + ) path_wav = path return @@ -224,11 +251,16 @@ class WeixinOfficialAccountPlatformAdapter(Platform): abm.message_id = msg.id abm.timestamp = msg.time abm.session_id = abm.sender.user_id - abm.raw_message = msg else: logger.warning(f"暂未实现的事件: {msg.type}") + future.set_result(None) return - + # 很不优雅 :( + abm.raw_message = { + "message": msg, + "future": future, + "active_send_mode": self.active_send_mode, + } logger.info(f"abm: {abm}") await self.handle_msg(abm) diff --git a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_event.py b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_event.py index 9519cd497..102812705 100644 --- a/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_event.py +++ b/astrbot/core/platform/sources/weixin_official_account/weixin_offacc_event.py @@ -4,6 +4,8 @@ from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.platform import AstrBotMessage, PlatformMetadata from astrbot.api.message_components import Plain, Image, Record from wechatpy import WeChatClient +from wechatpy.replies import TextReply, ImageReply, VoiceReply + from astrbot.api import logger @@ -82,12 +84,23 @@ class WeixinOfficialAccountPlatformEvent(AstrMessageEvent): async def send(self, message: MessageChain): message_obj = self.message_obj + active_send_mode = message_obj.raw_message.get("active_send_mode", False) for comp in message.chain: if isinstance(comp, Plain): # Split long text messages if needed plain_chunks = await self.split_plain(comp.text) for chunk in plain_chunks: - self.client.message.send_text(message_obj.sender.user_id, chunk) + if active_send_mode: + self.client.message.send_text(message_obj.sender.user_id, chunk) + else: + reply = TextReply( + content=chunk, + message=self.message_obj.raw_message["message"], + ) + xml = reply.render() + future = self.message_obj.raw_message["future"] + assert isinstance(future, asyncio.Future) + future.set_result(xml) await asyncio.sleep(0.5) # Avoid sending too fast elif isinstance(comp, Image): img_path = await comp.convert_to_file_path() @@ -102,10 +115,22 @@ class WeixinOfficialAccountPlatformEvent(AstrMessageEvent): ) return logger.debug(f"微信公众平台上传图片返回: {response}") - self.client.message.send_image( - message_obj.sender.user_id, - response["media_id"], - ) + + if active_send_mode: + self.client.message.send_image( + message_obj.sender.user_id, + response["media_id"], + ) + else: + reply = ImageReply( + media_id=response["media_id"], + message=self.message_obj.raw_message["message"], + ) + xml = reply.render() + future = self.message_obj.raw_message["future"] + assert isinstance(future, asyncio.Future) + future.set_result(xml) + elif isinstance(comp, Record): record_path = await comp.convert_to_file_path() # 转成amr @@ -124,10 +149,23 @@ class WeixinOfficialAccountPlatformEvent(AstrMessageEvent): ) return logger.info(f"微信公众平台上传语音返回: {response}") - self.client.message.send_voice( - message_obj.sender.user_id, - response["media_id"], - ) + + + if active_send_mode: + self.client.message.send_voice( + message_obj.sender.user_id, + response["media_id"], + ) + else: + reply = VoiceReply( + media_id=response["media_id"], + message=self.message_obj.raw_message["message"], + ) + xml = reply.render() + future = self.message_obj.raw_message["future"] + assert isinstance(future, asyncio.Future) + future.set_result(xml) + else: logger.warning(f"还没实现这个消息类型的发送逻辑: {comp.type}。") diff --git a/packages/astrbot/main.py b/packages/astrbot/main.py index 04d4cadb7..92ef6bfea 100644 --- a/packages/astrbot/main.py +++ b/packages/astrbot/main.py @@ -1462,3 +1462,9 @@ UID: {user_id} 此 ID 可用于设置管理员。 plugin_cfg["reset"] = reset_cfg alter_cmd_cfg["astrbot"] = plugin_cfg sp.put("alter_cmd", alter_cmd_cfg) + + @filter.command("test") + async def test_to(self, event: AstrMessageEvent): + import asyncio + await asyncio.sleep(10) + yield event.plain_result("OK")