From 946595216a3166b63b7cf9a28c3631ec701dd8b4 Mon Sep 17 00:00:00 2001 From: xiamuceer Date: Thu, 15 May 2025 20:43:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96wechapadpro=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wechatpadpro/wechatpadpro_adapter.py | 66 ++---- .../wechatpadpro_message_event.py | 216 +++++------------- 2 files changed, 87 insertions(+), 195 deletions(-) diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py index 1452ee7af..9e5160742 100644 --- a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py @@ -6,8 +6,6 @@ import websockets from typing import Awaitable, Any, Optional, Coroutine from astrbot.api.message_components import Plain, Image, At, Record, Video from astrbot.api.platform import Platform, PlatformMetadata -from astrbot.api.event import MessageChain -from astrbot.core.platform.astr_message_event import MessageSesion from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageMember, MessageType from ...register import register_platform_adapter from astrbot import logger @@ -20,6 +18,8 @@ class WeChatPadProAdapter(Platform): self, platform_config: dict, platform_settings: dict, event_queue: asyncio.Queue ) -> None: super().__init__(event_queue) + self._shutdown_event = None + self.wxnewpass = None self.config = platform_config self.settings = platform_settings self.unique_session = platform_settings.get("unique_session", False) @@ -236,7 +236,6 @@ class WeChatPadProAdapter(Platform): status = response_data["Data"]["state"] logger.info(f"第 {attempts + 1} 次尝试,当前登录状态: {status},还剩{countdown-attempts*5}秒") if status == 2: # 状态 2 表示登录成功 - logger.info("登录成功!") self.wxid = response_data["Data"].get("wxid") self.wxnewpass = response_data["Data"].get("wxnewpass") logger.info(f"登录成功,wxid: {self.wxid}, wxnewpass: {self.wxnewpass}") @@ -324,7 +323,7 @@ class WeChatPadProAdapter(Platform): logger.error(f"处理 WebSocket 消息时发生错误: {e}") - async def convert_message(self, raw_message: dict) -> AstrBotMessage: + async def convert_message(self, raw_message: dict) -> AstrBotMessage | None: """ 将 WeChatPadPro 原始消息转换为 AstrBotMessage。 """ @@ -332,7 +331,7 @@ class WeChatPadProAdapter(Platform): abm.raw_message = raw_message abm.message_id = str(raw_message.get("msg_id")) abm.timestamp = raw_message.get("create_time") - abm.self_id = self.wxid # 机器人的 wxid + abm.self_id = self.wxid from_user_name = raw_message.get("from_user_name", {}).get("str", "") to_user_name = raw_message.get("to_user_name", {}).get("str", "") @@ -342,60 +341,55 @@ class WeChatPadProAdapter(Platform): abm.message_str = content # 纯文本消息内容 (初始值) abm.message = [] # Initialize message components list - # 先判断群聊/私聊并设置基本属性 - await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content) - # 如果是机器人自己发送的消息、回显消息或系统消息,忽略 - is_echo = raw_message.get("is_echo", False) or raw_message.get("msg_source") == "send" - is_system = msg_type in ("system", "sys") - if from_user_name == self.wxid or to_user_name == self.wxid or is_echo or is_system: - return None + if from_user_name == self.wxid: + logger.info("忽略自己发送的消息!!!") + return None - # 再根据消息类型处理消息内容 - self._process_message_content(abm, raw_message, msg_type, content) + # 先判断群聊/私聊并设置基本属性 + if await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content): + # 再根据消息类型处理消息内容 + self._process_message_content(abm, raw_message, msg_type, content) - return abm + return abm + return None async def _process_chat_type(self, abm: AstrBotMessage, raw_message: dict, from_user_name: str, to_user_name: str, content: str): """ 判断消息是群聊还是私聊,并设置 AstrBotMessage 的基本属性。 """ + if from_user_name == 'weixin': + logger.info("忽略微信团队的消息!!!") + return False if "@chatroom" in from_user_name: + logger.info("开始处理群消息!") abm.type = MessageType.GROUP_MESSAGE - abm.group_id = from_user_name # 群聊 ID + abm.group_id = from_user_name parts = content.split(":\n", 1) sender_wxid = "" if len(parts) == 2: sender_wxid = parts[0] - sender_name_from_content = parts[1] abm.sender = MessageMember(user_id=sender_wxid, nickname="") - # 如果需要更准确的群昵称,调用 GetChatroomMemberDetail 接口 - if sender_wxid: # 只有当发送者 wxid 可用时才尝试获取更准确的昵称 + # 获取群聊发送者的nickname + if sender_wxid: accurate_nickname = await self._get_group_member_nickname(abm.group_id, sender_wxid) if accurate_nickname: abm.sender.nickname = accurate_nickname # 对于群聊,session_id 可以是群聊 ID 或发送者 ID + 群聊 ID (如果 unique_session 为 True) if self.unique_session: - # 需要获取发送者的 wxid,这可能需要额外的接口调用或从消息中解析 - # 暂时使用 from_user_name 作为 session_id 的一部分 - abm.session_id = f"{from_user_name}_{to_user_name}" # 示例,可能需要调整 + abm.session_id = f"{from_user_name}_{to_user_name}" else: abm.session_id = from_user_name - # logger.info("跳过群消息") - # pass else: abm.type = MessageType.FRIEND_MESSAGE - abm.group_id = "" # 私聊没有群组 ID + abm.group_id = "" abm.sender = MessageMember(user_id=from_user_name, nickname="") # 暂时没有私聊发送者的昵称 - abm.session_id = from_user_name # 私聊的 session_id 是发送者 ID - # 如果是来自 'weixin' 的消息,忽略 - if from_user_name == 'weixin': - logger.info("收到来自 'weixin' 的消息,忽略!") - return + abm.session_id = from_user_name + return True async def _get_group_member_nickname(self, group_id: str, member_wxid: str) -> Optional[str]: """ @@ -418,10 +412,9 @@ class WeChatPadProAdapter(Platform): if member.get("user_name") == member_wxid: return member.get("nick_name") logger.warning(f"在群 {group_id} 中未找到成员 {member_wxid} 的昵称") - return None else: logger.error(f"获取群成员详情失败: {response.status}, {response_data}") - return None + return None except aiohttp.ClientConnectorError as e: logger.error(f"连接到 WeChatPadPro 服务失败: {e}") return None @@ -476,7 +469,6 @@ class WeChatPadProAdapter(Platform): except Exception as e: logger.error(f"解析引用消息 XML 失败: {e}") pass - # elif msg_type == ... # Add handling for other message types here else: logger.warning(f"收到未处理的消息类型: {msg_type}, 原始消息: {raw_message}") # abm.message remains empty [] for unhandled types @@ -500,11 +492,3 @@ class WeChatPadProAdapter(Platform): 得到一个平台的元数据。 """ return self.metadata - - async def send_by_session( - self, session: MessageSesion, message_chain: MessageChain - ) -> Awaitable[Any]: - """ - 通过会话发送消息。 - """ - logger.info(f"向会话 {session} 发送消息: {message_chain}") diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py index 513a47e34..4adf5bb17 100644 --- a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py @@ -1,4 +1,4 @@ -import time +import asyncio import aiohttp from astrbot.core.platform.astr_message_event import AstrMessageEvent from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageType @@ -21,169 +21,77 @@ class WeChatPadProMessageEvent(AstrMessageEvent): # 添加平台特定的参数,例如适配器实例 adapter: object, # 传递适配器实例 ): - # logger.info(f"WeChatPadProMessageEvent __init__ called with:") - # logger.info(f" message_str: {message_str}") - # logger.info(f" message_obj: {message_obj}") - # logger.info(f" message_obj.message: {message_obj.message}") # Log the message components list - # logger.info(f" platform_meta: {platform_meta}") - # logger.info(f" session_id: {session_id}") - # logger.info(f" adapter: {adapter}") - - # Pass the message components list to the parent class constructor - # Pass message_str to the parent class constructor, similar to gewechat adapter - # Pass message_str and message_obj to the parent class constructor, similar to fake adapter super().__init__(message_str, message_obj, platform_meta, session_id) self.message_obj = message_obj # Save the full message object self.adapter = adapter # Save the adapter instance async def send(self, message: MessageChain): - """ - 发送消息到 WeChatPadPro 平台。 - """ - # 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑 - # 遍历消息链,处理不同类型的消息组件 - for component in message.chain: - # logger.info(f"Processing component: {component}") # Log the component - # logger.info(f"Type of component: {type(component)}") # Log the type of the component - # logger.info(f"Image class in scope: {Image}") # Log the Image class itself - time.sleep(1) - if isinstance(component, Plain): - # 发送文本消息 - message_text = component.text - # 实现 reply_with_mention 功能 - if ( - self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息 - and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention - and self.message_obj.sender # 确保有发送者信息 - and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称 - ): - # 在文本消息前加上 @ 消息发送者的信息 - # 优先使用 nickname,如果没有则使用 user_id - mention_text = self.message_obj.sender.nickname if self.message_obj.sender.nickname else self.message_obj.sender.user_id - message_text = f"@{mention_text} {message_text}" - logger.info(f"已添加 @ 信息: {message_text}") + async with aiohttp.ClientSession() as session: + for comp in message.chain: + await asyncio.sleep(1) + if isinstance(comp, Plain): + await self._send_text(session, comp.text) + elif isinstance(comp, Image): + await self._send_image(session, comp) + await super().send(message) - if message_text: - payload = { - "MsgItem": [ - { - "MsgType": 1, # 1 for Text - "TextContent": message_text, - "ToUserName": self.session_id, # 接收者 wxid - } - ] - } - url = f"{self.adapter.base_url}/message/SendTextMessage" # 使用文本消息发送接口 - params = {"key": self.adapter.auth_key} - async with aiohttp.ClientSession() as session: - try: - async with session.post(url, params=params, json=payload) as response: - response_data = await response.json() - if response.status == 200 and response_data.get("Code") == 200: - logger.info(f"成功发送文本消息到 {self.session_id}: {message_text}") - else: - logger.error(f"发送文本消息失败到 {self.session_id}: {response.status}, {response_data}") - except aiohttp.ClientConnectorError as e: - logger.error(f"连接到 WeChatPadPro 服务失败: {e}") - except Exception as e: - logger.error(f"发送文本消息时发生错误: {e}") + async def _send_image(self, session: aiohttp.ClientSession, comp: Image): + b64 = await comp.convert_to_base64() + raw = self._validate_base64(b64) + b64c = self._compress_image(raw) + payload = {"MsgItem":[{"ImageContent": b64c, "MsgType":3, "ToUserName": self.session_id}]} + url = f"{self.adapter.base_url}/message/SendImageNewMessage" + await self._post(session, url, payload) - elif isinstance(component, Image): - # 发送图片消息 - if hasattr(component, "convert_to_base64"): - try: - image_base64 = await component.convert_to_base64() - except Exception as e: - logger.error(f"Error converting image to base64: {e}") - continue - else: - logger.error("Image component missing convert_to_base64, skipping image send.") - continue - # logger.info(f"转换后的base64图片:{image_base64}") + async def _send_text(self, session: aiohttp.ClientSession, text: str): + if ( + self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息 + and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention + and self.message_obj.sender # 确保有发送者信息 + and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称 + ): + # 优先使用 nickname,如果没有则使用 user_id + mention_text = self.message_obj.sender.nickname or self.message_obj.sender.user_id + message_text = f"@{mention_text} {text}" + logger.info(f"已添加 @ 信息: {message_text}") - # Base64图片格式校验 - try: - image_data = base64.b64decode(image_base64, validate=True) - logger.info("Base64图片格式校验成功。") - except (base64.binascii.Error, ValueError) as e: - logger.error(f"Base64图片格式校验失败: {e}") - await self.send(MessageChain([Plain("发送图片失败:图片编码格式不正确。")])) - continue # 跳过发送此图片 + payload = {"MsgItem": [{"MsgType": 1, "TextContent": text, "ToUserName": self.session_id}]} + url = f"{self.adapter.base_url}/message/SendTextMessage" + await self._post(session, url, payload) - # 图片压缩处理 - try: - img = PILImage.open(io.BytesIO(image_data)) # 使用别名 PILImage + @staticmethod + def _validate_base64(b64: str) -> bytes: + return base64.b64decode(b64, validate=True) - # 示例压缩:对于 JPEG 格式,降低质量;对于其他格式,转换为 JPEG 并降低质量 - output_buffer = io.BytesIO() - if img.format == 'JPEG': - img.save(output_buffer, format='JPEG', quality=80) # 降低JPEG质量到80 - else: - # 尝试转换为JPEG进行压缩,如果图片是透明的,先转换为RGB - if img.mode in ('RGBA', 'P'): - img = img.convert('RGB') - img.save(output_buffer, format='JPEG', quality=80) # 转换为JPEG并降低质量 + @staticmethod + def _compress_image(data: bytes) -> str: + img = PILImage.open(io.BytesIO(data)) + buf = io.BytesIO() + if img.format == "JPEG": + img.save(buf, "JPEG", quality=80) + else: + if img.mode in ("RGBA","P"): + img = img.convert("RGB") + img.save(buf, "JPEG", quality=80) + # logger.info("图片处理完成!!!") + return base64.b64encode(buf.getvalue()).decode() - compressed_image_base64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8') - logger.info(f"图片压缩成功,原大小: {len(image_base64)} bytes, 压缩后大小: {len(compressed_image_base64)} bytes") - image_base64_to_send = compressed_image_base64 # 使用压缩后的base64 - except Exception as e: - logger.error(f"图片压缩处理失败: {e}") - # 如果压缩失败,可以选择发送原图或者跳过 - # 这里选择发送原图,或者可以根据需求发送错误消息并跳过 - image_base64_to_send = image_base64 # 压缩失败,发送原图 - logger.warning("图片压缩失败,将尝试发送原图。") + async def _post(self, session, url, payload): + params = {"key": self.adapter.auth_key} + try: + async with session.post(url, params=params, json=payload) as resp: + data = await resp.json() + if resp.status != 200 or data.get("Code") != 200: + logger.error(f"{url} failed: {resp.status} {data}") + except Exception as e: + logger.error(f"{url} error: {e}") - payload = { - "MsgItem": [ - { - "AtWxIDList": [], # 根据需要添加 @ 的用户 wxid 列表 - "ImageContent": image_base64_to_send, # 使用处理后的base64 - "MsgType": 3, # 图片消息类型 - "TextContent": "", - "ToUserName": self.session_id, # 接收者 wxid - } - ] - } - url = f"{self.adapter.base_url}/message/SendImageNewMessage" # 使用新的图片发送接口 - params = {"key": self.adapter.auth_key} - async with aiohttp.ClientSession() as session: - try: - async with session.post(url, params=params, json=payload) as response: - response_data = await response.json() - logger.info(response_data) - if response.status == 200 and response_data.get("Code") == 200: - logger.info(f"成功发送图片消息到 {self.session_id}") - else: - logger.error(f"发送图片消息失败到 {self.session_id}: {response.status}, {response_data}") - except aiohttp.ClientConnectorError as e: - logger.error(f"连接到 WeChatPadPro 服务失败: {e}") - except Exception as e: - logger.error(f"发送图片消息时发生错误: {e}") - - except Exception as e: - logger.error(f"处理图片消息失败: {e}") - # 可以选择发送一个错误提示文本消息 - await self.send(MessageChain([Plain("发送图片失败。")])) - # TODO: 添加对其他消息组件类型的处理 (Record, Video, At等) - # elif isinstance(component, Record): - # pass - # elif isinstance(component, Video): - # pass - # elif isinstance(component, At): - # pass - # ... - - await super().send(message) # 调用父类的 send 方法进行指标上报等操作 - - - # 根据 WeChatPadPro 的事件特点,可能需要重写 AstrMessageEvent 中的其他方法 - # 例如: - # def get_sender_id(self) -> str: - # # 从 self.message_obj 中获取发送者 ID - # return self.message_obj.sender.user_id - # - # def is_private_chat(self) -> bool: - # # 根据 self.message_obj 判断是否是私聊 - # return self.message_obj.type == MessageType.FRIEND_MESSAGE \ No newline at end of file +# TODO: 添加对其他消息组件类型的处理 (Record, Video, At等) +# elif isinstance(component, Record): +# pass +# elif isinstance(component, Video): +# pass +# elif isinstance(component, At): +# pass +# ...