适配一个个人微信适配器——wechatpadpro

This commit is contained in:
xiamuceer
2025-05-15 17:26:31 +08:00
parent 2ecb52a9b2
commit bf7fc02c8d
5 changed files with 710 additions and 1 deletions
+8
View File
@@ -155,6 +155,14 @@ CONFIG_METADATA_2 = {
"host": "这里填写你的局域网IP或者公网服务器IP",
"port": 11451,
},
"wechatpadpro(微信)": {
"id": "wechatpadpro",
"type": "wechatpadpro",
"enable": False,
"admin_key": "stay33",
"host": "这里填写你的局域网IP或者公网服务器IP",
"port": 8059,
},
"weixin_official_account(微信公众平台)": {
"id": "weixin_official_account",
"type": "weixin_official_account",
+4
View File
@@ -62,6 +62,10 @@ class PlatformManager:
from .sources.gewechat.gewechat_platform_adapter import (
GewechatPlatformAdapter, # noqa: F401
)
case "wechatpadpro":
from .sources.wechatpadpro.wechatpadpro_adapter import (
WeChatPadProAdapter,
)
case "lark":
from .sources.lark.lark_adapter import LarkPlatformAdapter # noqa: F401
case "dingtalk":
@@ -0,0 +1,513 @@
import asyncio
import aiohttp
import json
import os
import websockets
from typing import Awaitable, Any, Optional, Coroutine
from astrbot.api.message_components import Plain, Image, At, Record, Video
from astrbot.api.platform import Platform, PlatformMetadata
from astrbot.api.event import MessageChain
from astrbot.core.platform.astr_message_event import MessageSesion
from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageMember, MessageType
from ...register import register_platform_adapter
from astrbot import logger
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
from .wechatpadpro_message_event import WeChatPadProMessageEvent
@register_platform_adapter("wechatpadpro", "WeChatPadPro 消息平台适配器")
class WeChatPadProAdapter(Platform):
def __init__(
self, platform_config: dict, platform_settings: dict, event_queue: asyncio.Queue
) -> None:
super().__init__(event_queue)
self.config = platform_config
self.settings = platform_settings
self.unique_session = platform_settings.get("unique_session", False)
self.metadata = PlatformMetadata(
name="wechatpadpro",
description="WeChatPadPro 消息平台适配器",
id=self.config.get("id", "wechatpadpro"),
)
# 保存配置信息
self.admin_key = self.config.get("admin_key")
self.host = self.config.get("host")
self.port = self.config.get("port")
self.base_url = f"http://{self.host}:{self.port}"
self.auth_key = None # 用于保存生成的授权码
self.wxid = None # 用于保存登录成功后的 wxid
self.credentials_file = os.path.join(get_astrbot_data_path(), "wechatpadpro_credentials.json") # 持久化文件路径
self._websocket = None # 用于保存 WebSocket 连接
async def run(self) -> None:
"""
启动平台适配器的运行实例。
"""
logger.info("WeChatPadPro 适配器正在启动...")
# 尝试从文件中加载凭据
loaded_credentials = self.load_credentials()
if loaded_credentials:
self.auth_key = loaded_credentials.get("auth_key")
self.wxid = loaded_credentials.get("wxid")
# 检查在线状态
if self.auth_key and await self.check_online_status():
logger.info("WeChatPadPro 设备已在线,跳过扫码登录。")
# 如果在线,连接 WebSocket 接收消息
asyncio.create_task(self.connect_websocket())
else:
logger.info("WeChatPadPro 设备不在线或无可用凭据,开始扫码登录流程。")
# 1. 生成授权码
await self.generate_auth_key()
if not self.auth_key:
logger.error("无法获取授权码,WeChatPadPro 适配器启动失败。")
return
# 2. 获取登录二维码
qr_code_url = await self.get_login_qr_code()
if qr_code_url:
logger.info(f"请扫描以下二维码登录: {qr_code_url}")
else:
logger.error("无法获取登录二维码。")
return
# 3. 检测扫码状态
login_successful = await self.check_login_status()
if login_successful:
# 登录成功后,连接 WebSocket 接收消息
asyncio.create_task(self.connect_websocket())
else:
logger.warning("登录失败或超时,WeChatPadPro 适配器将关闭。")
await self.terminate()
return
# 示例:保持运行直到终止事件被设置
self._shutdown_event = asyncio.Event()
await self._shutdown_event.wait()
logger.info("WeChatPadPro 适配器已停止。")
def load_credentials(self):
"""
从文件中加载 auth_key 和 wxid。
"""
if os.path.exists(self.credentials_file):
try:
with open(self.credentials_file, "r") as f:
credentials = json.load(f)
logger.info("成功加载 WeChatPadPro 凭据。")
return credentials
except Exception as e:
logger.error(f"加载 WeChatPadPro 凭据失败: {e}")
return None
def save_credentials(self):
"""
将 auth_key 和 wxid 保存到文件。
"""
credentials = {
"auth_key": self.auth_key,
"wxid": self.wxid,
}
try:
# 确保数据目录存在
data_dir = os.path.dirname(self.credentials_file)
os.makedirs(data_dir, exist_ok=True)
with open(self.credentials_file, "w") as f:
json.dump(credentials, f)
logger.info("成功保存 WeChatPadPro 凭据。")
except Exception as e:
logger.error(f"保存 WeChatPadPro 凭据失败: {e}")
async def check_online_status(self):
"""
检查 WeChatPadPro 设备是否在线。
"""
url = f"{self.base_url}/login/GetLoginStatus"
params = {"key": self.auth_key}
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, params=params) as response:
response_data = await response.json()
# 根据提供的在线接口返回示例,成功状态码是 200,loginState 为 1 表示在线
if response.status == 200 and response_data.get("Code") == 200:
login_state = response_data.get("Data", {}).get("loginState")
if login_state == 1:
logger.info("WeChatPadPro 设备当前在线。")
return True
else:
logger.info(f"WeChatPadPro 设备不在线,登录状态: {login_state}")
return False
else:
logger.error(f"检查在线状态失败: {response.status}, {response_data}")
return False
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
return False
except Exception as e:
logger.error(f"检查在线状态时发生错误: {e}")
return False
async def generate_auth_key(self):
"""
生成授权码。
"""
url = f"{self.base_url}/admin/GenAuthKey1"
params = {"key": self.admin_key}
payload = {"Count": 1, "Days": 30} # 生成一个有效期30天的授权码
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
# 修正成功判断条件和授权码提取路径
if response.status == 200 and response_data.get("Code") == 200:
# 授权码在 Data 字段的列表中
if response_data.get("Data") and isinstance(response_data["Data"], list) and len(response_data["Data"]) > 0:
self.auth_key = response_data["Data"][0]
logger.info(f"成功获取授权码: {self.auth_key}")
else:
logger.error(f"生成授权码成功但未找到授权码: {response_data}")
else:
logger.error(f"生成授权码失败: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
except Exception as e:
logger.error(f"生成授权码时发生错误: {e}")
async def get_login_qr_code(self):
"""
获取登录二维码地址。
"""
url = f"{self.base_url}/login/GetLoginQrCodeNew"
params = {"key": self.auth_key}
payload = {} # 根据文档,这个接口的 body 可以为空
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
# 修正成功判断条件和数据提取路径
if response.status == 200 and response_data.get("Code") == 200:
# 二维码地址在 Data.QrCodeUrl 字段中
if response_data.get("Data") and response_data["Data"].get("QrCodeUrl"):
return response_data["Data"]["QrCodeUrl"]
else:
logger.error(f"获取登录二维码成功但未找到二维码地址: {response_data}")
return None
else:
logger.error(f"获取登录二维码失败: {response.status}, {response_data}")
return None
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
return None
except Exception as e:
logger.error(f"获取登录二维码时发生错误: {e}")
return None
async def check_login_status(self):
"""
循环检测扫码状态。
尝试 6 次后跳出循环,添加倒计时。
返回 True 如果登录成功,否则返回 False。
"""
url = f"{self.base_url}/login/CheckLoginStatus"
params = {"key": self.auth_key}
attempts = 0 # 初始化尝试次数
max_attempts = 6 # 最大尝试次数
countdown = 30 # 倒计时时长
logger.info(f"请在 {countdown} 秒内扫码登录!!!")
while attempts < max_attempts:
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, params=params) as response:
response_data = await response.json()
# 成功判断条件和数据提取路径
if response.status == 200 and response_data.get("Code") == 200:
if response_data.get("Data") and response_data["Data"].get("state") is not None:
status = response_data["Data"]["state"]
logger.info(f"{attempts + 1} 次尝试,当前登录状态: {status},还剩{countdown-attempts*5}")
if status == 2: # 状态 2 表示登录成功
logger.info("登录成功!")
self.wxid = response_data["Data"].get("wxid")
self.wxnewpass = response_data["Data"].get("wxnewpass")
logger.info(f"登录成功,wxid: {self.wxid}, wxnewpass: {self.wxnewpass}")
self.save_credentials() # 登录成功后保存凭据
return True
elif status == -2: # 二维码过期
logger.error("二维码已过期,请重新获取。")
return False
else:
logger.error(f"检测登录状态成功但未找到登录状态: {response_data}")
else:
logger.info(f"检测登录状态失败: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
await asyncio.sleep(5)
attempts += 1
continue
except Exception as e:
logger.error(f"检测登录状态时发生错误: {e}")
attempts += 1
continue
attempts += 1
await asyncio.sleep(5) # 每隔5秒检测一次
logger.warning("登录检测超过最大尝试次数,退出检测。")
return False
async def connect_websocket(self):
"""
建立 WebSocket 连接并处理接收到的消息。
"""
os.environ["no_proxy"] = f"localhost,127.0.0.1,{self.host}"
ws_url = f"ws://{self.host}:{self.port}/ws/GetSyncMsg?key={self.auth_key}"
logger.info(f"正在连接 WebSocket: {ws_url}")
while True:
try:
async with websockets.connect(ws_url) as websocket:
self._websocket = websocket
logger.info("WebSocket 连接成功。")
while True:
try:
message = await websocket.recv()
asyncio.create_task(self.handle_websocket_message(message))
except websockets.exceptions.ConnectionClosedOK:
logger.info("WebSocket 连接正常关闭。")
break
except Exception as e:
logger.error(f"处理 WebSocket 消息时发生错误: {e}")
# 在这里可以添加重连逻辑
break
except Exception as e:
logger.error(f"WebSocket 连接失败: {e}")
# 在这里可以添加重连逻辑
await asyncio.sleep(5) # 等待一段时间后重试
async def handle_websocket_message(self, message: str):
"""
处理从 WebSocket 接收到的消息。
"""
logger.info(f"收到 WebSocket 消息: {message}")
try:
message_data = json.loads(message)
# 检查消息结构,确保是有效的消息推送
if message_data.get("msg_id") is not None and message_data.get("from_user_name") is not None:
abm = await self.convert_message(message_data)
if abm:
# 创建 WeChatPadProMessageEvent 实例
message_event = WeChatPadProMessageEvent(
message_str=abm.message_str,
message_obj=abm,
platform_meta=self.meta(),
session_id=abm.session_id,
# 传递适配器实例,以便在事件中调用 send 方法
adapter=self,
)
# 提交事件到事件队列
self.commit_event(message_event)
else:
logger.warning(f"收到未知结构的 WebSocket 消息: {message_data}")
except json.JSONDecodeError:
logger.error(f"无法解析 WebSocket 消息为 JSON: {message}")
except Exception as e:
logger.error(f"处理 WebSocket 消息时发生错误: {e}")
async def convert_message(self, raw_message: dict) -> AstrBotMessage:
"""
将 WeChatPadPro 原始消息转换为 AstrBotMessage。
"""
abm = AstrBotMessage()
abm.raw_message = raw_message
abm.message_id = str(raw_message.get("msg_id"))
abm.timestamp = raw_message.get("create_time")
abm.self_id = self.wxid # 机器人的 wxid
from_user_name = raw_message.get("from_user_name", {}).get("str", "")
to_user_name = raw_message.get("to_user_name", {}).get("str", "")
content = raw_message.get("content", {}).get("str", "")
msg_type = raw_message.get("msg_type")
abm.message_str = content # 纯文本消息内容 (初始值)
abm.message = [] # Initialize message components list
# 先判断群聊/私聊并设置基本属性
await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content)
# 如果是机器人自己发送的消息,忽略
if from_user_name == self.wxid:
return None
# 再根据消息类型处理消息内容
self._process_message_content(abm, raw_message, msg_type, content)
return abm
async def _process_chat_type(self, abm: AstrBotMessage, raw_message: dict, from_user_name: str, to_user_name: str, content: str):
"""
判断消息是群聊还是私聊,并设置 AstrBotMessage 的基本属性。
"""
if "@chatroom" in from_user_name:
abm.type = MessageType.GROUP_MESSAGE
abm.group_id = from_user_name # 群聊 ID
parts = content.split(":\n", 1)
sender_wxid = ""
if len(parts) == 2:
sender_wxid = parts[0]
sender_name_from_content = parts[1]
abm.sender = MessageMember(user_id=sender_wxid, nickname="")
# 如果需要更准确的群昵称,调用 GetChatroomMemberDetail 接口
if sender_wxid: # 只有当发送者 wxid 可用时才尝试获取更准确的昵称
accurate_nickname = await self._get_group_member_nickname(abm.group_id, sender_wxid)
if accurate_nickname:
abm.sender.nickname = accurate_nickname
# 对于群聊,session_id 可以是群聊 ID 或发送者 ID + 群聊 ID (如果 unique_session 为 True)
if self.unique_session:
# 需要获取发送者的 wxid,这可能需要额外的接口调用或从消息中解析
# 暂时使用 from_user_name 作为 session_id 的一部分
abm.session_id = f"{from_user_name}_{to_user_name}" # 示例,可能需要调整
else:
abm.session_id = from_user_name
# logger.info("跳过群消息")
# pass
else:
abm.type = MessageType.FRIEND_MESSAGE
abm.group_id = "" # 私聊没有群组 ID
abm.sender = MessageMember(user_id=from_user_name, nickname="") # 暂时没有私聊发送者的昵称
abm.session_id = from_user_name # 私聊的 session_id 是发送者 ID
# 如果是来自 'weixin' 的消息,忽略
if from_user_name == 'weixin':
logger.info("收到来自 'weixin' 的消息,忽略!")
return
async def _get_group_member_nickname(self, group_id: str, member_wxid: str) -> Optional[str]:
"""
通过接口获取群成员的昵称。
"""
url = f"{self.base_url}/group/GetChatroomMemberDetail"
params = {"key": self.auth_key}
payload = {
"ChatRoomName": group_id,
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("Code") == 200:
# 从返回数据中查找对应成员的昵称
member_list = response_data.get("Data", {}).get("member_data", {}).get("chatroom_member_list", [])
for member in member_list:
if member.get("user_name") == member_wxid:
return member.get("nick_name")
logger.warning(f"在群 {group_id} 中未找到成员 {member_wxid} 的昵称")
return None
else:
logger.error(f"获取群成员详情失败: {response.status}, {response_data}")
return None
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
return None
except Exception as e:
logger.error(f"获取群成员详情时发生错误: {e}")
return None
@staticmethod
def _process_message_content(abm: AstrBotMessage, raw_message: dict, msg_type: int, content: str):
"""
根据消息类型处理消息内容,填充 AstrBotMessage 的 message 列表。
"""
if msg_type == 1: # 文本消息
# 对于群聊消息,从 content 中提取实际消息内容
if abm.type == MessageType.GROUP_MESSAGE:
parts = content.split(":\n", 1)
if len(parts) == 2:
abm.message_str = parts[1] # 更新纯文本消息内容为实际消息内容
abm.message.append(Plain(abm.message_str))
else:
# 如果群聊消息格式不符合预期,仍然使用原始 content
abm.message.append(Plain(abm.message_str))
else: # 私聊消息
abm.message.append(Plain(abm.message_str))
elif msg_type == 3: # 图片消息
# TODO: 从 raw_message 中提取图片信息并创建 Image 组件
logger.warning(f"收到图片消息,待实现处理: {raw_message}")
# 示例:abm.message.append(Image(file="图片文件路径或URL"))
pass
elif msg_type == 47: # 视频消息 (注意:表情消息也是 47,需要区分)
# TODO: 从 raw_message 中提取视频信息并创建 Video 组件
logger.warning(f"收到视频消息,待实现处理: {raw_message}")
# 示例:abm.message.append(Video(file="视频文件路径或URL"))
pass
elif msg_type == 50: # 语音/视频 (根据上下文判断是语音还是视频)
# TODO: 从 raw_message 中提取语音信息并创建 Record 组件
logger.warning(f"收到语音/视频消息,待实现处理: {raw_message}")
# 示例:abm.message.append(Record(file="语音文件路径或URL"))
pass
elif msg_type == 49: # 引用消息
# TODO: 解析 content 中的 XML,提取引用内容和发送者信息
logger.warning(f"收到引用消息,待实现处理: {raw_message}")
# 示例:abm.message.append(Reply(id="被引用消息ID", sender_id="被引用消息发送者ID"))
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(content)
# 示例:提取被引用消息的发送者和内容
# referenced_sender = root.find('.//dataitemsource/fromusr').text
# referenced_content = root.find('.//datadesc').text
# logger.info(f"引用消息解析结果 - 发送者: {referenced_sender}, 内容: {referenced_content}")
# 根据需要创建 Reply 组件或其他组件
except Exception as e:
logger.error(f"解析引用消息 XML 失败: {e}")
pass
# elif msg_type == ... # Add handling for other message types here
else:
logger.warning(f"收到未处理的消息类型: {msg_type}, 原始消息: {raw_message}")
# abm.message remains empty [] for unhandled types
async def terminate(self):
"""
终止一个平台的运行实例。
"""
logger.info("正在终止 WeChatPadPro 适配器...")
# 关闭 WebSocket 连接
if self._websocket:
await self._websocket.close()
# 在这里实现终止 WeChatPadPro 客户端或连接的逻辑
# await self.client.stop()
if hasattr(self, '_shutdown_event'):
self._shutdown_event.set()
def meta(self) -> PlatformMetadata:
"""
得到一个平台的元数据。
"""
return self.metadata
async def send_by_session(
self, session: MessageSesion, message_chain: MessageChain
) -> Awaitable[Any]:
"""
通过会话发送消息。
"""
logger.info(f"向会话 {session} 发送消息: {message_chain}")
# 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑
# 例如:
# message_text = "".join([comp.text for comp in message_chain if isinstance(comp, Plain)])
# await self.client.send_message(session.session_id, message_text)
pass # 待实现
@@ -0,0 +1,183 @@
import time
import aiohttp
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageType
from astrbot.core.platform.platform_metadata import PlatformMetadata
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.message.components import Plain, Image # Import Image
from astrbot import logger
import base64
from PIL import Image as PILImage # 使用别名避免冲突
import io
class WeChatPadProMessageEvent(AstrMessageEvent):
def __init__(
self,
message_str: str,
message_obj: AstrBotMessage,
platform_meta: PlatformMetadata,
session_id: str,
# 添加平台特定的参数,例如适配器实例
adapter: object, # 传递适配器实例
):
# logger.info(f"WeChatPadProMessageEvent __init__ called with:")
# logger.info(f" message_str: {message_str}")
# logger.info(f" message_obj: {message_obj}")
# logger.info(f" message_obj.message: {message_obj.message}") # Log the message components list
# logger.info(f" platform_meta: {platform_meta}")
# logger.info(f" session_id: {session_id}")
# logger.info(f" adapter: {adapter}")
# Pass the message components list to the parent class constructor
# Pass message_str to the parent class constructor, similar to gewechat adapter
# Pass message_str and message_obj to the parent class constructor, similar to fake adapter
super().__init__(message_str, message_obj, platform_meta, session_id)
self.message_obj = message_obj # Save the full message object
self.adapter = adapter # Save the adapter instance
async def send(self, message: MessageChain):
"""
发送消息到 WeChatPadPro 平台。
"""
# 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑
# 遍历消息链,处理不同类型的消息组件
for component in message.chain:
# logger.info(f"Processing component: {component}") # Log the component
# logger.info(f"Type of component: {type(component)}") # Log the type of the component
# logger.info(f"Image class in scope: {Image}") # Log the Image class itself
time.sleep(1)
if isinstance(component, Plain):
# 发送文本消息
message_text = component.text
# 实现 reply_with_mention 功能
if (
self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息
and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention
and self.message_obj.sender # 确保有发送者信息
and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称
):
# 在文本消息前加上 @ 消息发送者的信息
# 优先使用 nickname,如果没有则使用 user_id
mention_text = self.message_obj.sender.nickname if self.message_obj.sender.nickname else self.message_obj.sender.user_id
message_text = f"@{mention_text} {message_text}"
logger.info(f"已添加 @ 信息: {message_text}")
if message_text:
payload = {
"MsgItem": [
{
"MsgType": 1, # 1 for Text
"TextContent": message_text,
"ToUserName": self.session_id, # 接收者 wxid
}
]
}
url = f"{self.adapter.base_url}/message/SendTextMessage" # 使用文本消息发送接口
params = {"key": self.adapter.auth_key}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("Code") == 200:
logger.info(f"成功发送文本消息到 {self.session_id}: {message_text}")
else:
logger.error(f"发送文本消息失败到 {self.session_id}: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
except Exception as e:
logger.error(f"发送文本消息时发生错误: {e}")
elif isinstance(component, Image):
# 发送图片消息
try:
# 假设 Image 对象有 to_base64() 方法
image_base64 = await component.convert_to_base64() # 需要 Image 组件支持转为 base64
# logger.info(f"转换后的base64图片:{image_base64}")
# Base64图片格式校验
try:
image_data = base64.b64decode(image_base64, validate=True)
logger.info("Base64图片格式校验成功。")
except (base64.binascii.Error, ValueError) as e:
logger.error(f"Base64图片格式校验失败: {e}")
await self.send(MessageChain([Plain("发送图片失败:图片编码格式不正确。")]))
continue # 跳过发送此图片
# 图片压缩处理
try:
img = PILImage.open(io.BytesIO(image_data)) # 使用别名 PILImage
# 示例压缩:对于 JPEG 格式,降低质量;对于其他格式,转换为 JPEG 并降低质量
output_buffer = io.BytesIO()
if img.format == 'JPEG':
img.save(output_buffer, format='JPEG', quality=80) # 降低JPEG质量到80
else:
# 尝试转换为JPEG进行压缩,如果图片是透明的,先转换为RGB
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
img.save(output_buffer, format='JPEG', quality=80) # 转换为JPEG并降低质量
compressed_image_base64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8')
logger.info(f"图片压缩成功,原大小: {len(image_base64)} bytes, 压缩后大小: {len(compressed_image_base64)} bytes")
image_base64_to_send = compressed_image_base64 # 使用压缩后的base64
except Exception as e:
logger.error(f"图片压缩处理失败: {e}")
# 如果压缩失败,可以选择发送原图或者跳过
# 这里选择发送原图,或者可以根据需求发送错误消息并跳过
image_base64_to_send = image_base64 # 压缩失败,发送原图
logger.warning("图片压缩失败,将尝试发送原图。")
payload = {
"MsgItem": [
{
"AtWxIDList": [], # 根据需要添加 @ 的用户 wxid 列表
"ImageContent": image_base64_to_send, # 使用处理后的base64
"MsgType": 3, # 图片消息类型
"TextContent": "",
"ToUserName": self.session_id, # 接收者 wxid
}
]
}
url = f"{self.adapter.base_url}/message/SendImageNewMessage" # 使用新的图片发送接口
params = {"key": self.adapter.auth_key}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
logger.info(response_data)
if response.status == 200 and response_data.get("Code") == 200:
logger.info(f"成功发送图片消息到 {self.session_id}")
else:
logger.error(f"发送图片消息失败到 {self.session_id}: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
except Exception as e:
logger.error(f"发送图片消息时发生错误: {e}")
except Exception as e:
logger.error(f"处理图片消息失败: {e}")
# 可以选择发送一个错误提示文本消息
await self.send(MessageChain([Plain("发送图片失败。")]))
# TODO: 添加对其他消息组件类型的处理 (Record, Video, At等)
# elif isinstance(component, Record):
# pass
# elif isinstance(component, Video):
# pass
# elif isinstance(component, At):
# pass
# ...
await super().send(message) # 调用父类的 send 方法进行指标上报等操作
# 根据 WeChatPadPro 的事件特点,可能需要重写 AstrMessageEvent 中的其他方法
# 例如:
# def get_sender_id(self) -> str:
# # 从 self.message_obj 中获取发送者 ID
# return self.message_obj.sender.user_id
#
# def is_private_chat(self) -> bool:
# # 根据 self.message_obj 判断是否是私聊
# return self.message_obj.type == MessageType.FRIEND_MESSAGE
+2 -1
View File
@@ -33,4 +33,5 @@ telegramify-markdown
google-genai
click
filelock
watchfiles
watchfiles
websockets