From bf7fc02c8dc8977095ee5235542b30fe45083442 Mon Sep 17 00:00:00 2001 From: xiamuceer Date: Thu, 15 May 2025 17:26:31 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D=E4=B8=80=E4=B8=AA=E4=B8=AA?= =?UTF-8?q?=E4=BA=BA=E5=BE=AE=E4=BF=A1=E9=80=82=E9=85=8D=E5=99=A8=E2=80=94?= =?UTF-8?q?=E2=80=94wechatpadpro?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- astrbot/core/config/default.py | 8 + astrbot/core/platform/manager.py | 4 + .../wechatpadpro/wechatpadpro_adapter.py | 513 ++++++++++++++++++ .../wechatpadpro_message_event.py | 183 +++++++ requirements.txt | 3 +- 5 files changed, 710 insertions(+), 1 deletion(-) create mode 100644 astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py create mode 100644 astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 56bdd5bac..9a8ba66eb 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -155,6 +155,14 @@ CONFIG_METADATA_2 = { "host": "这里填写你的局域网IP或者公网服务器IP", "port": 11451, }, + "wechatpadpro(微信)": { + "id": "wechatpadpro", + "type": "wechatpadpro", + "enable": False, + "admin_key": "stay33", + "host": "这里填写你的局域网IP或者公网服务器IP", + "port": 8059, + }, "weixin_official_account(微信公众平台)": { "id": "weixin_official_account", "type": "weixin_official_account", diff --git a/astrbot/core/platform/manager.py b/astrbot/core/platform/manager.py index 4ac575446..9473e9199 100644 --- a/astrbot/core/platform/manager.py +++ b/astrbot/core/platform/manager.py @@ -62,6 +62,10 @@ class PlatformManager: from .sources.gewechat.gewechat_platform_adapter import ( GewechatPlatformAdapter, # noqa: F401 ) + case "wechatpadpro": + from .sources.wechatpadpro.wechatpadpro_adapter import ( + WeChatPadProAdapter, + ) case "lark": from .sources.lark.lark_adapter import LarkPlatformAdapter # noqa: F401 case "dingtalk": diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py new file mode 100644 index 000000000..ad0a5fe16 --- /dev/null +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py @@ -0,0 +1,513 @@ +import asyncio +import aiohttp +import json +import os +import websockets +from typing import Awaitable, Any, Optional, Coroutine +from astrbot.api.message_components import Plain, Image, At, Record, Video +from astrbot.api.platform import Platform, PlatformMetadata +from astrbot.api.event import MessageChain +from astrbot.core.platform.astr_message_event import MessageSesion +from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageMember, MessageType +from ...register import register_platform_adapter +from astrbot import logger +from astrbot.core.utils.astrbot_path import get_astrbot_data_path +from .wechatpadpro_message_event import WeChatPadProMessageEvent + +@register_platform_adapter("wechatpadpro", "WeChatPadPro 消息平台适配器") +class WeChatPadProAdapter(Platform): + def __init__( + self, platform_config: dict, platform_settings: dict, event_queue: asyncio.Queue + ) -> None: + super().__init__(event_queue) + self.config = platform_config + self.settings = platform_settings + self.unique_session = platform_settings.get("unique_session", False) + + self.metadata = PlatformMetadata( + name="wechatpadpro", + description="WeChatPadPro 消息平台适配器", + id=self.config.get("id", "wechatpadpro"), + ) + + # 保存配置信息 + self.admin_key = self.config.get("admin_key") + self.host = self.config.get("host") + self.port = self.config.get("port") + self.base_url = f"http://{self.host}:{self.port}" + self.auth_key = None # 用于保存生成的授权码 + self.wxid = None # 用于保存登录成功后的 wxid + self.credentials_file = os.path.join(get_astrbot_data_path(), "wechatpadpro_credentials.json") # 持久化文件路径 + self._websocket = None # 用于保存 WebSocket 连接 + + async def run(self) -> None: + """ + 启动平台适配器的运行实例。 + """ + logger.info("WeChatPadPro 适配器正在启动...") + + # 尝试从文件中加载凭据 + loaded_credentials = self.load_credentials() + if loaded_credentials: + self.auth_key = loaded_credentials.get("auth_key") + self.wxid = loaded_credentials.get("wxid") + + # 检查在线状态 + if self.auth_key and await self.check_online_status(): + logger.info("WeChatPadPro 设备已在线,跳过扫码登录。") + # 如果在线,连接 WebSocket 接收消息 + asyncio.create_task(self.connect_websocket()) + else: + logger.info("WeChatPadPro 设备不在线或无可用凭据,开始扫码登录流程。") + # 1. 生成授权码 + await self.generate_auth_key() + + if not self.auth_key: + logger.error("无法获取授权码,WeChatPadPro 适配器启动失败。") + return + + # 2. 获取登录二维码 + qr_code_url = await self.get_login_qr_code() + + if qr_code_url: + logger.info(f"请扫描以下二维码登录: {qr_code_url}") + else: + logger.error("无法获取登录二维码。") + return + + # 3. 检测扫码状态 + login_successful = await self.check_login_status() + + if login_successful: + # 登录成功后,连接 WebSocket 接收消息 + asyncio.create_task(self.connect_websocket()) + else: + logger.warning("登录失败或超时,WeChatPadPro 适配器将关闭。") + await self.terminate() + return + + + # 示例:保持运行直到终止事件被设置 + self._shutdown_event = asyncio.Event() + await self._shutdown_event.wait() + logger.info("WeChatPadPro 适配器已停止。") + + def load_credentials(self): + """ + 从文件中加载 auth_key 和 wxid。 + """ + if os.path.exists(self.credentials_file): + try: + with open(self.credentials_file, "r") as f: + credentials = json.load(f) + logger.info("成功加载 WeChatPadPro 凭据。") + return credentials + except Exception as e: + logger.error(f"加载 WeChatPadPro 凭据失败: {e}") + return None + + def save_credentials(self): + """ + 将 auth_key 和 wxid 保存到文件。 + """ + credentials = { + "auth_key": self.auth_key, + "wxid": self.wxid, + } + try: + # 确保数据目录存在 + data_dir = os.path.dirname(self.credentials_file) + os.makedirs(data_dir, exist_ok=True) + with open(self.credentials_file, "w") as f: + json.dump(credentials, f) + logger.info("成功保存 WeChatPadPro 凭据。") + except Exception as e: + logger.error(f"保存 WeChatPadPro 凭据失败: {e}") + + async def check_online_status(self): + """ + 检查 WeChatPadPro 设备是否在线。 + """ + url = f"{self.base_url}/login/GetLoginStatus" + params = {"key": self.auth_key} + + async with aiohttp.ClientSession() as session: + try: + async with session.get(url, params=params) as response: + response_data = await response.json() + # 根据提供的在线接口返回示例,成功状态码是 200,loginState 为 1 表示在线 + if response.status == 200 and response_data.get("Code") == 200: + login_state = response_data.get("Data", {}).get("loginState") + if login_state == 1: + logger.info("WeChatPadPro 设备当前在线。") + return True + else: + logger.info(f"WeChatPadPro 设备不在线,登录状态: {login_state}") + return False + else: + logger.error(f"检查在线状态失败: {response.status}, {response_data}") + return False + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + return False + except Exception as e: + logger.error(f"检查在线状态时发生错误: {e}") + return False + + + async def generate_auth_key(self): + """ + 生成授权码。 + """ + url = f"{self.base_url}/admin/GenAuthKey1" + params = {"key": self.admin_key} + payload = {"Count": 1, "Days": 30} # 生成一个有效期30天的授权码 + + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, params=params, json=payload) as response: + response_data = await response.json() + # 修正成功判断条件和授权码提取路径 + if response.status == 200 and response_data.get("Code") == 200: + # 授权码在 Data 字段的列表中 + if response_data.get("Data") and isinstance(response_data["Data"], list) and len(response_data["Data"]) > 0: + self.auth_key = response_data["Data"][0] + logger.info(f"成功获取授权码: {self.auth_key}") + else: + logger.error(f"生成授权码成功但未找到授权码: {response_data}") + else: + logger.error(f"生成授权码失败: {response.status}, {response_data}") + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + except Exception as e: + logger.error(f"生成授权码时发生错误: {e}") + + async def get_login_qr_code(self): + """ + 获取登录二维码地址。 + """ + url = f"{self.base_url}/login/GetLoginQrCodeNew" + params = {"key": self.auth_key} + payload = {} # 根据文档,这个接口的 body 可以为空 + + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, params=params, json=payload) as response: + response_data = await response.json() + # 修正成功判断条件和数据提取路径 + if response.status == 200 and response_data.get("Code") == 200: + # 二维码地址在 Data.QrCodeUrl 字段中 + if response_data.get("Data") and response_data["Data"].get("QrCodeUrl"): + return response_data["Data"]["QrCodeUrl"] + else: + logger.error(f"获取登录二维码成功但未找到二维码地址: {response_data}") + return None + else: + logger.error(f"获取登录二维码失败: {response.status}, {response_data}") + return None + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + return None + except Exception as e: + logger.error(f"获取登录二维码时发生错误: {e}") + return None + + async def check_login_status(self): + """ + 循环检测扫码状态。 + 尝试 6 次后跳出循环,添加倒计时。 + 返回 True 如果登录成功,否则返回 False。 + """ + url = f"{self.base_url}/login/CheckLoginStatus" + params = {"key": self.auth_key} + + attempts = 0 # 初始化尝试次数 + max_attempts = 6 # 最大尝试次数 + countdown = 30 # 倒计时时长 + logger.info(f"请在 {countdown} 秒内扫码登录!!!") + while attempts < max_attempts: + async with aiohttp.ClientSession() as session: + try: + async with session.get(url, params=params) as response: + response_data = await response.json() + # 成功判断条件和数据提取路径 + if response.status == 200 and response_data.get("Code") == 200: + if response_data.get("Data") and response_data["Data"].get("state") is not None: + status = response_data["Data"]["state"] + logger.info(f"第 {attempts + 1} 次尝试,当前登录状态: {status},还剩{countdown-attempts*5}秒") + if status == 2: # 状态 2 表示登录成功 + logger.info("登录成功!") + self.wxid = response_data["Data"].get("wxid") + self.wxnewpass = response_data["Data"].get("wxnewpass") + logger.info(f"登录成功,wxid: {self.wxid}, wxnewpass: {self.wxnewpass}") + self.save_credentials() # 登录成功后保存凭据 + return True + elif status == -2: # 二维码过期 + logger.error("二维码已过期,请重新获取。") + return False + else: + logger.error(f"检测登录状态成功但未找到登录状态: {response_data}") + else: + logger.info(f"检测登录状态失败: {response.status}, {response_data}") + + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + await asyncio.sleep(5) + attempts += 1 + continue + except Exception as e: + logger.error(f"检测登录状态时发生错误: {e}") + attempts += 1 + continue + + attempts += 1 + await asyncio.sleep(5) # 每隔5秒检测一次 + logger.warning("登录检测超过最大尝试次数,退出检测。") + return False + + async def connect_websocket(self): + """ + 建立 WebSocket 连接并处理接收到的消息。 + """ + os.environ["no_proxy"] = f"localhost,127.0.0.1,{self.host}" + ws_url = f"ws://{self.host}:{self.port}/ws/GetSyncMsg?key={self.auth_key}" + logger.info(f"正在连接 WebSocket: {ws_url}") + while True: + try: + async with websockets.connect(ws_url) as websocket: + self._websocket = websocket + logger.info("WebSocket 连接成功。") + while True: + try: + message = await websocket.recv() + asyncio.create_task(self.handle_websocket_message(message)) + except websockets.exceptions.ConnectionClosedOK: + logger.info("WebSocket 连接正常关闭。") + break + except Exception as e: + logger.error(f"处理 WebSocket 消息时发生错误: {e}") + # 在这里可以添加重连逻辑 + break + except Exception as e: + logger.error(f"WebSocket 连接失败: {e}") + # 在这里可以添加重连逻辑 + await asyncio.sleep(5) # 等待一段时间后重试 + + async def handle_websocket_message(self, message: str): + """ + 处理从 WebSocket 接收到的消息。 + """ + logger.info(f"收到 WebSocket 消息: {message}") + try: + message_data = json.loads(message) + # 检查消息结构,确保是有效的消息推送 + if message_data.get("msg_id") is not None and message_data.get("from_user_name") is not None: + abm = await self.convert_message(message_data) + if abm: + # 创建 WeChatPadProMessageEvent 实例 + message_event = WeChatPadProMessageEvent( + message_str=abm.message_str, + message_obj=abm, + platform_meta=self.meta(), + session_id=abm.session_id, + # 传递适配器实例,以便在事件中调用 send 方法 + adapter=self, + ) + # 提交事件到事件队列 + self.commit_event(message_event) + else: + logger.warning(f"收到未知结构的 WebSocket 消息: {message_data}") + + except json.JSONDecodeError: + logger.error(f"无法解析 WebSocket 消息为 JSON: {message}") + except Exception as e: + logger.error(f"处理 WebSocket 消息时发生错误: {e}") + + + async def convert_message(self, raw_message: dict) -> AstrBotMessage: + """ + 将 WeChatPadPro 原始消息转换为 AstrBotMessage。 + """ + abm = AstrBotMessage() + abm.raw_message = raw_message + abm.message_id = str(raw_message.get("msg_id")) + abm.timestamp = raw_message.get("create_time") + abm.self_id = self.wxid # 机器人的 wxid + + from_user_name = raw_message.get("from_user_name", {}).get("str", "") + to_user_name = raw_message.get("to_user_name", {}).get("str", "") + content = raw_message.get("content", {}).get("str", "") + msg_type = raw_message.get("msg_type") + + abm.message_str = content # 纯文本消息内容 (初始值) + abm.message = [] # Initialize message components list + + # 先判断群聊/私聊并设置基本属性 + await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content) + + # 如果是机器人自己发送的消息,忽略 + if from_user_name == self.wxid: + return None + + # 再根据消息类型处理消息内容 + self._process_message_content(abm, raw_message, msg_type, content) + + return abm + + async def _process_chat_type(self, abm: AstrBotMessage, raw_message: dict, from_user_name: str, to_user_name: str, content: str): + """ + 判断消息是群聊还是私聊,并设置 AstrBotMessage 的基本属性。 + """ + if "@chatroom" in from_user_name: + abm.type = MessageType.GROUP_MESSAGE + abm.group_id = from_user_name # 群聊 ID + + parts = content.split(":\n", 1) + sender_wxid = "" + if len(parts) == 2: + sender_wxid = parts[0] + sender_name_from_content = parts[1] + + abm.sender = MessageMember(user_id=sender_wxid, nickname="") + + # 如果需要更准确的群昵称,调用 GetChatroomMemberDetail 接口 + if sender_wxid: # 只有当发送者 wxid 可用时才尝试获取更准确的昵称 + accurate_nickname = await self._get_group_member_nickname(abm.group_id, sender_wxid) + if accurate_nickname: + abm.sender.nickname = accurate_nickname + + # 对于群聊,session_id 可以是群聊 ID 或发送者 ID + 群聊 ID (如果 unique_session 为 True) + if self.unique_session: + # 需要获取发送者的 wxid,这可能需要额外的接口调用或从消息中解析 + # 暂时使用 from_user_name 作为 session_id 的一部分 + abm.session_id = f"{from_user_name}_{to_user_name}" # 示例,可能需要调整 + else: + abm.session_id = from_user_name + # logger.info("跳过群消息") + # pass + else: + abm.type = MessageType.FRIEND_MESSAGE + abm.group_id = "" # 私聊没有群组 ID + abm.sender = MessageMember(user_id=from_user_name, nickname="") # 暂时没有私聊发送者的昵称 + abm.session_id = from_user_name # 私聊的 session_id 是发送者 ID + # 如果是来自 'weixin' 的消息,忽略 + if from_user_name == 'weixin': + logger.info("收到来自 'weixin' 的消息,忽略!") + return + + async def _get_group_member_nickname(self, group_id: str, member_wxid: str) -> Optional[str]: + """ + 通过接口获取群成员的昵称。 + """ + url = f"{self.base_url}/group/GetChatroomMemberDetail" + params = {"key": self.auth_key} + payload = { + "ChatRoomName": group_id, + } + + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, params=params, json=payload) as response: + response_data = await response.json() + if response.status == 200 and response_data.get("Code") == 200: + # 从返回数据中查找对应成员的昵称 + member_list = response_data.get("Data", {}).get("member_data", {}).get("chatroom_member_list", []) + for member in member_list: + if member.get("user_name") == member_wxid: + return member.get("nick_name") + logger.warning(f"在群 {group_id} 中未找到成员 {member_wxid} 的昵称") + return None + else: + logger.error(f"获取群成员详情失败: {response.status}, {response_data}") + return None + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + return None + except Exception as e: + logger.error(f"获取群成员详情时发生错误: {e}") + return None + + @staticmethod + def _process_message_content(abm: AstrBotMessage, raw_message: dict, msg_type: int, content: str): + """ + 根据消息类型处理消息内容,填充 AstrBotMessage 的 message 列表。 + """ + if msg_type == 1: # 文本消息 + # 对于群聊消息,从 content 中提取实际消息内容 + if abm.type == MessageType.GROUP_MESSAGE: + parts = content.split(":\n", 1) + if len(parts) == 2: + abm.message_str = parts[1] # 更新纯文本消息内容为实际消息内容 + abm.message.append(Plain(abm.message_str)) + else: + # 如果群聊消息格式不符合预期,仍然使用原始 content + abm.message.append(Plain(abm.message_str)) + else: # 私聊消息 + abm.message.append(Plain(abm.message_str)) + elif msg_type == 3: # 图片消息 + # TODO: 从 raw_message 中提取图片信息并创建 Image 组件 + logger.warning(f"收到图片消息,待实现处理: {raw_message}") + # 示例:abm.message.append(Image(file="图片文件路径或URL")) + pass + elif msg_type == 47: # 视频消息 (注意:表情消息也是 47,需要区分) + # TODO: 从 raw_message 中提取视频信息并创建 Video 组件 + logger.warning(f"收到视频消息,待实现处理: {raw_message}") + # 示例:abm.message.append(Video(file="视频文件路径或URL")) + pass + elif msg_type == 50: # 语音/视频 (根据上下文判断是语音还是视频) + # TODO: 从 raw_message 中提取语音信息并创建 Record 组件 + logger.warning(f"收到语音/视频消息,待实现处理: {raw_message}") + # 示例:abm.message.append(Record(file="语音文件路径或URL")) + pass + elif msg_type == 49: # 引用消息 + # TODO: 解析 content 中的 XML,提取引用内容和发送者信息 + logger.warning(f"收到引用消息,待实现处理: {raw_message}") + # 示例:abm.message.append(Reply(id="被引用消息ID", sender_id="被引用消息发送者ID")) + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(content) + # 示例:提取被引用消息的发送者和内容 + # referenced_sender = root.find('.//dataitemsource/fromusr').text + # referenced_content = root.find('.//datadesc').text + # logger.info(f"引用消息解析结果 - 发送者: {referenced_sender}, 内容: {referenced_content}") + # 根据需要创建 Reply 组件或其他组件 + except Exception as e: + logger.error(f"解析引用消息 XML 失败: {e}") + pass + # elif msg_type == ... # Add handling for other message types here + else: + logger.warning(f"收到未处理的消息类型: {msg_type}, 原始消息: {raw_message}") + # abm.message remains empty [] for unhandled types + + async def terminate(self): + """ + 终止一个平台的运行实例。 + """ + logger.info("正在终止 WeChatPadPro 适配器...") + # 关闭 WebSocket 连接 + if self._websocket: + await self._websocket.close() + # 在这里实现终止 WeChatPadPro 客户端或连接的逻辑 + # await self.client.stop() + if hasattr(self, '_shutdown_event'): + self._shutdown_event.set() + + + def meta(self) -> PlatformMetadata: + """ + 得到一个平台的元数据。 + """ + return self.metadata + + async def send_by_session( + self, session: MessageSesion, message_chain: MessageChain + ) -> Awaitable[Any]: + """ + 通过会话发送消息。 + """ + logger.info(f"向会话 {session} 发送消息: {message_chain}") + # 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑 + # 例如: + # message_text = "".join([comp.text for comp in message_chain if isinstance(comp, Plain)]) + # await self.client.send_message(session.session_id, message_text) + pass # 待实现 diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py new file mode 100644 index 000000000..203a0f814 --- /dev/null +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py @@ -0,0 +1,183 @@ +import time +import aiohttp +from astrbot.core.platform.astr_message_event import AstrMessageEvent +from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageType +from astrbot.core.platform.platform_metadata import PlatformMetadata +from astrbot.core.message.message_event_result import MessageChain +from astrbot.core.message.components import Plain, Image # Import Image +from astrbot import logger +import base64 +from PIL import Image as PILImage # 使用别名避免冲突 +import io + + +class WeChatPadProMessageEvent(AstrMessageEvent): + def __init__( + self, + message_str: str, + message_obj: AstrBotMessage, + platform_meta: PlatformMetadata, + session_id: str, + # 添加平台特定的参数,例如适配器实例 + adapter: object, # 传递适配器实例 + ): + # logger.info(f"WeChatPadProMessageEvent __init__ called with:") + # logger.info(f" message_str: {message_str}") + # logger.info(f" message_obj: {message_obj}") + # logger.info(f" message_obj.message: {message_obj.message}") # Log the message components list + # logger.info(f" platform_meta: {platform_meta}") + # logger.info(f" session_id: {session_id}") + # logger.info(f" adapter: {adapter}") + + # Pass the message components list to the parent class constructor + # Pass message_str to the parent class constructor, similar to gewechat adapter + # Pass message_str and message_obj to the parent class constructor, similar to fake adapter + super().__init__(message_str, message_obj, platform_meta, session_id) + self.message_obj = message_obj # Save the full message object + self.adapter = adapter # Save the adapter instance + + async def send(self, message: MessageChain): + """ + 发送消息到 WeChatPadPro 平台。 + """ + # 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑 + # 遍历消息链,处理不同类型的消息组件 + for component in message.chain: + # logger.info(f"Processing component: {component}") # Log the component + # logger.info(f"Type of component: {type(component)}") # Log the type of the component + # logger.info(f"Image class in scope: {Image}") # Log the Image class itself + time.sleep(1) + if isinstance(component, Plain): + # 发送文本消息 + message_text = component.text + # 实现 reply_with_mention 功能 + if ( + self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息 + and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention + and self.message_obj.sender # 确保有发送者信息 + and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称 + ): + # 在文本消息前加上 @ 消息发送者的信息 + # 优先使用 nickname,如果没有则使用 user_id + mention_text = self.message_obj.sender.nickname if self.message_obj.sender.nickname else self.message_obj.sender.user_id + message_text = f"@{mention_text} {message_text}" + logger.info(f"已添加 @ 信息: {message_text}") + + if message_text: + payload = { + "MsgItem": [ + { + "MsgType": 1, # 1 for Text + "TextContent": message_text, + "ToUserName": self.session_id, # 接收者 wxid + } + ] + } + url = f"{self.adapter.base_url}/message/SendTextMessage" # 使用文本消息发送接口 + params = {"key": self.adapter.auth_key} + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, params=params, json=payload) as response: + response_data = await response.json() + if response.status == 200 and response_data.get("Code") == 200: + logger.info(f"成功发送文本消息到 {self.session_id}: {message_text}") + else: + logger.error(f"发送文本消息失败到 {self.session_id}: {response.status}, {response_data}") + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + except Exception as e: + logger.error(f"发送文本消息时发生错误: {e}") + + elif isinstance(component, Image): + # 发送图片消息 + try: + # 假设 Image 对象有 to_base64() 方法 + image_base64 = await component.convert_to_base64() # 需要 Image 组件支持转为 base64 + # logger.info(f"转换后的base64图片:{image_base64}") + + # Base64图片格式校验 + try: + image_data = base64.b64decode(image_base64, validate=True) + logger.info("Base64图片格式校验成功。") + except (base64.binascii.Error, ValueError) as e: + logger.error(f"Base64图片格式校验失败: {e}") + await self.send(MessageChain([Plain("发送图片失败:图片编码格式不正确。")])) + continue # 跳过发送此图片 + + # 图片压缩处理 + try: + img = PILImage.open(io.BytesIO(image_data)) # 使用别名 PILImage + + # 示例压缩:对于 JPEG 格式,降低质量;对于其他格式,转换为 JPEG 并降低质量 + output_buffer = io.BytesIO() + if img.format == 'JPEG': + img.save(output_buffer, format='JPEG', quality=80) # 降低JPEG质量到80 + else: + # 尝试转换为JPEG进行压缩,如果图片是透明的,先转换为RGB + if img.mode in ('RGBA', 'P'): + img = img.convert('RGB') + img.save(output_buffer, format='JPEG', quality=80) # 转换为JPEG并降低质量 + + compressed_image_base64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8') + logger.info(f"图片压缩成功,原大小: {len(image_base64)} bytes, 压缩后大小: {len(compressed_image_base64)} bytes") + image_base64_to_send = compressed_image_base64 # 使用压缩后的base64 + except Exception as e: + logger.error(f"图片压缩处理失败: {e}") + # 如果压缩失败,可以选择发送原图或者跳过 + # 这里选择发送原图,或者可以根据需求发送错误消息并跳过 + image_base64_to_send = image_base64 # 压缩失败,发送原图 + logger.warning("图片压缩失败,将尝试发送原图。") + + + payload = { + "MsgItem": [ + { + "AtWxIDList": [], # 根据需要添加 @ 的用户 wxid 列表 + "ImageContent": image_base64_to_send, # 使用处理后的base64 + "MsgType": 3, # 图片消息类型 + "TextContent": "", + "ToUserName": self.session_id, # 接收者 wxid + } + ] + } + url = f"{self.adapter.base_url}/message/SendImageNewMessage" # 使用新的图片发送接口 + params = {"key": self.adapter.auth_key} + async with aiohttp.ClientSession() as session: + try: + async with session.post(url, params=params, json=payload) as response: + response_data = await response.json() + logger.info(response_data) + if response.status == 200 and response_data.get("Code") == 200: + logger.info(f"成功发送图片消息到 {self.session_id}") + else: + logger.error(f"发送图片消息失败到 {self.session_id}: {response.status}, {response_data}") + except aiohttp.ClientConnectorError as e: + logger.error(f"连接到 WeChatPadPro 服务失败: {e}") + except Exception as e: + logger.error(f"发送图片消息时发生错误: {e}") + + except Exception as e: + logger.error(f"处理图片消息失败: {e}") + # 可以选择发送一个错误提示文本消息 + await self.send(MessageChain([Plain("发送图片失败。")])) + # TODO: 添加对其他消息组件类型的处理 (Record, Video, At等) + # elif isinstance(component, Record): + # pass + # elif isinstance(component, Video): + # pass + # elif isinstance(component, At): + # pass + # ... + + await super().send(message) # 调用父类的 send 方法进行指标上报等操作 + + + # 根据 WeChatPadPro 的事件特点,可能需要重写 AstrMessageEvent 中的其他方法 + # 例如: + # def get_sender_id(self) -> str: + # # 从 self.message_obj 中获取发送者 ID + # return self.message_obj.sender.user_id + # + # def is_private_chat(self) -> bool: + # # 根据 self.message_obj 判断是否是私聊 + # return self.message_obj.type == MessageType.FRIEND_MESSAGE \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f800f131e..4bfd9e89d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,4 +33,5 @@ telegramify-markdown google-genai click filelock -watchfiles \ No newline at end of file +watchfiles +websockets \ No newline at end of file