优化wechapadpro代码结构

This commit is contained in:
xiamuceer
2025-05-15 20:43:33 +08:00
parent 6ea5b7581f
commit 946595216a
2 changed files with 87 additions and 195 deletions
@@ -6,8 +6,6 @@ import websockets
from typing import Awaitable, Any, Optional, Coroutine
from astrbot.api.message_components import Plain, Image, At, Record, Video
from astrbot.api.platform import Platform, PlatformMetadata
from astrbot.api.event import MessageChain
from astrbot.core.platform.astr_message_event import MessageSesion
from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageMember, MessageType
from ...register import register_platform_adapter
from astrbot import logger
@@ -20,6 +18,8 @@ class WeChatPadProAdapter(Platform):
self, platform_config: dict, platform_settings: dict, event_queue: asyncio.Queue
) -> None:
super().__init__(event_queue)
self._shutdown_event = None
self.wxnewpass = None
self.config = platform_config
self.settings = platform_settings
self.unique_session = platform_settings.get("unique_session", False)
@@ -236,7 +236,6 @@ class WeChatPadProAdapter(Platform):
status = response_data["Data"]["state"]
logger.info(f"{attempts + 1} 次尝试,当前登录状态: {status},还剩{countdown-attempts*5}")
if status == 2: # 状态 2 表示登录成功
logger.info("登录成功!")
self.wxid = response_data["Data"].get("wxid")
self.wxnewpass = response_data["Data"].get("wxnewpass")
logger.info(f"登录成功,wxid: {self.wxid}, wxnewpass: {self.wxnewpass}")
@@ -324,7 +323,7 @@ class WeChatPadProAdapter(Platform):
logger.error(f"处理 WebSocket 消息时发生错误: {e}")
async def convert_message(self, raw_message: dict) -> AstrBotMessage:
async def convert_message(self, raw_message: dict) -> AstrBotMessage | None:
"""
将 WeChatPadPro 原始消息转换为 AstrBotMessage。
"""
@@ -332,7 +331,7 @@ class WeChatPadProAdapter(Platform):
abm.raw_message = raw_message
abm.message_id = str(raw_message.get("msg_id"))
abm.timestamp = raw_message.get("create_time")
abm.self_id = self.wxid # 机器人的 wxid
abm.self_id = self.wxid
from_user_name = raw_message.get("from_user_name", {}).get("str", "")
to_user_name = raw_message.get("to_user_name", {}).get("str", "")
@@ -342,60 +341,55 @@ class WeChatPadProAdapter(Platform):
abm.message_str = content # 纯文本消息内容 (初始值)
abm.message = [] # Initialize message components list
# 先判断群聊/私聊并设置基本属性
await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content)
# 如果是机器人自己发送的消息、回显消息或系统消息,忽略
is_echo = raw_message.get("is_echo", False) or raw_message.get("msg_source") == "send"
is_system = msg_type in ("system", "sys")
if from_user_name == self.wxid or to_user_name == self.wxid or is_echo or is_system:
return None
if from_user_name == self.wxid:
logger.info("忽略自己发送的消息!!!")
return None
# 再根据消息类型处理消息内容
self._process_message_content(abm, raw_message, msg_type, content)
# 先判断群聊/私聊并设置基本属性
if await self._process_chat_type(abm, raw_message, from_user_name, to_user_name, content):
# 再根据消息类型处理消息内容
self._process_message_content(abm, raw_message, msg_type, content)
return abm
return abm
return None
async def _process_chat_type(self, abm: AstrBotMessage, raw_message: dict, from_user_name: str, to_user_name: str, content: str):
"""
判断消息是群聊还是私聊,并设置 AstrBotMessage 的基本属性。
"""
if from_user_name == 'weixin':
logger.info("忽略微信团队的消息!!!")
return False
if "@chatroom" in from_user_name:
logger.info("开始处理群消息!")
abm.type = MessageType.GROUP_MESSAGE
abm.group_id = from_user_name # 群聊 ID
abm.group_id = from_user_name
parts = content.split(":\n", 1)
sender_wxid = ""
if len(parts) == 2:
sender_wxid = parts[0]
sender_name_from_content = parts[1]
abm.sender = MessageMember(user_id=sender_wxid, nickname="")
# 如果需要更准确的群昵称,调用 GetChatroomMemberDetail 接口
if sender_wxid: # 只有当发送者 wxid 可用时才尝试获取更准确的昵称
# 获取群聊发送者的nickname
if sender_wxid:
accurate_nickname = await self._get_group_member_nickname(abm.group_id, sender_wxid)
if accurate_nickname:
abm.sender.nickname = accurate_nickname
# 对于群聊,session_id 可以是群聊 ID 或发送者 ID + 群聊 ID (如果 unique_session 为 True)
if self.unique_session:
# 需要获取发送者的 wxid,这可能需要额外的接口调用或从消息中解析
# 暂时使用 from_user_name 作为 session_id 的一部分
abm.session_id = f"{from_user_name}_{to_user_name}" # 示例,可能需要调整
abm.session_id = f"{from_user_name}_{to_user_name}"
else:
abm.session_id = from_user_name
# logger.info("跳过群消息")
# pass
else:
abm.type = MessageType.FRIEND_MESSAGE
abm.group_id = "" # 私聊没有群组 ID
abm.group_id = ""
abm.sender = MessageMember(user_id=from_user_name, nickname="") # 暂时没有私聊发送者的昵称
abm.session_id = from_user_name # 私聊的 session_id 是发送者 ID
# 如果是来自 'weixin' 的消息,忽略
if from_user_name == 'weixin':
logger.info("收到来自 'weixin' 的消息,忽略!")
return
abm.session_id = from_user_name
return True
async def _get_group_member_nickname(self, group_id: str, member_wxid: str) -> Optional[str]:
"""
@@ -418,10 +412,9 @@ class WeChatPadProAdapter(Platform):
if member.get("user_name") == member_wxid:
return member.get("nick_name")
logger.warning(f"在群 {group_id} 中未找到成员 {member_wxid} 的昵称")
return None
else:
logger.error(f"获取群成员详情失败: {response.status}, {response_data}")
return None
return None
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
return None
@@ -476,7 +469,6 @@ class WeChatPadProAdapter(Platform):
except Exception as e:
logger.error(f"解析引用消息 XML 失败: {e}")
pass
# elif msg_type == ... # Add handling for other message types here
else:
logger.warning(f"收到未处理的消息类型: {msg_type}, 原始消息: {raw_message}")
# abm.message remains empty [] for unhandled types
@@ -500,11 +492,3 @@ class WeChatPadProAdapter(Platform):
得到一个平台的元数据。
"""
return self.metadata
async def send_by_session(
self, session: MessageSesion, message_chain: MessageChain
) -> Awaitable[Any]:
"""
通过会话发送消息。
"""
logger.info(f"向会话 {session} 发送消息: {message_chain}")
@@ -1,4 +1,4 @@
import time
import asyncio
import aiohttp
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageType
@@ -21,169 +21,77 @@ class WeChatPadProMessageEvent(AstrMessageEvent):
# 添加平台特定的参数,例如适配器实例
adapter: object, # 传递适配器实例
):
# logger.info(f"WeChatPadProMessageEvent __init__ called with:")
# logger.info(f" message_str: {message_str}")
# logger.info(f" message_obj: {message_obj}")
# logger.info(f" message_obj.message: {message_obj.message}") # Log the message components list
# logger.info(f" platform_meta: {platform_meta}")
# logger.info(f" session_id: {session_id}")
# logger.info(f" adapter: {adapter}")
# Pass the message components list to the parent class constructor
# Pass message_str to the parent class constructor, similar to gewechat adapter
# Pass message_str and message_obj to the parent class constructor, similar to fake adapter
super().__init__(message_str, message_obj, platform_meta, session_id)
self.message_obj = message_obj # Save the full message object
self.adapter = adapter # Save the adapter instance
async def send(self, message: MessageChain):
"""
发送消息到 WeChatPadPro 平台。
"""
# 在这里实现将 MessageChain 转换为 WeChatPadPro 消息格式并发送的逻辑
# 遍历消息链,处理不同类型的消息组件
for component in message.chain:
# logger.info(f"Processing component: {component}") # Log the component
# logger.info(f"Type of component: {type(component)}") # Log the type of the component
# logger.info(f"Image class in scope: {Image}") # Log the Image class itself
time.sleep(1)
if isinstance(component, Plain):
# 发送文本消息
message_text = component.text
# 实现 reply_with_mention 功能
if (
self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息
and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention
and self.message_obj.sender # 确保有发送者信息
and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称
):
# 在文本消息前加上 @ 消息发送者的信息
# 优先使用 nickname,如果没有则使用 user_id
mention_text = self.message_obj.sender.nickname if self.message_obj.sender.nickname else self.message_obj.sender.user_id
message_text = f"@{mention_text} {message_text}"
logger.info(f"已添加 @ 信息: {message_text}")
async with aiohttp.ClientSession() as session:
for comp in message.chain:
await asyncio.sleep(1)
if isinstance(comp, Plain):
await self._send_text(session, comp.text)
elif isinstance(comp, Image):
await self._send_image(session, comp)
await super().send(message)
if message_text:
payload = {
"MsgItem": [
{
"MsgType": 1, # 1 for Text
"TextContent": message_text,
"ToUserName": self.session_id, # 接收者 wxid
}
]
}
url = f"{self.adapter.base_url}/message/SendTextMessage" # 使用文本消息发送接口
params = {"key": self.adapter.auth_key}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
if response.status == 200 and response_data.get("Code") == 200:
logger.info(f"成功发送文本消息到 {self.session_id}: {message_text}")
else:
logger.error(f"发送文本消息失败到 {self.session_id}: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
except Exception as e:
logger.error(f"发送文本消息时发生错误: {e}")
async def _send_image(self, session: aiohttp.ClientSession, comp: Image):
b64 = await comp.convert_to_base64()
raw = self._validate_base64(b64)
b64c = self._compress_image(raw)
payload = {"MsgItem":[{"ImageContent": b64c, "MsgType":3, "ToUserName": self.session_id}]}
url = f"{self.adapter.base_url}/message/SendImageNewMessage"
await self._post(session, url, payload)
elif isinstance(component, Image):
# 发送图片消息
if hasattr(component, "convert_to_base64"):
try:
image_base64 = await component.convert_to_base64()
except Exception as e:
logger.error(f"Error converting image to base64: {e}")
continue
else:
logger.error("Image component missing convert_to_base64, skipping image send.")
continue
# logger.info(f"转换后的base64图片:{image_base64}")
async def _send_text(self, session: aiohttp.ClientSession, text: str):
if (
self.message_obj.type == MessageType.GROUP_MESSAGE # 确保是群聊消息
and self.adapter.settings.get("reply_with_mention", False) # 检查适配器设置是否启用 reply_with_mention
and self.message_obj.sender # 确保有发送者信息
and (self.message_obj.sender.user_id or self.message_obj.sender.nickname) # 确保发送者有 ID 或昵称
):
# 优先使用 nickname,如果没有则使用 user_id
mention_text = self.message_obj.sender.nickname or self.message_obj.sender.user_id
message_text = f"@{mention_text} {text}"
logger.info(f"已添加 @ 信息: {message_text}")
# Base64图片格式校验
try:
image_data = base64.b64decode(image_base64, validate=True)
logger.info("Base64图片格式校验成功。")
except (base64.binascii.Error, ValueError) as e:
logger.error(f"Base64图片格式校验失败: {e}")
await self.send(MessageChain([Plain("发送图片失败:图片编码格式不正确。")]))
continue # 跳过发送此图片
payload = {"MsgItem": [{"MsgType": 1, "TextContent": text, "ToUserName": self.session_id}]}
url = f"{self.adapter.base_url}/message/SendTextMessage"
await self._post(session, url, payload)
# 图片压缩处理
try:
img = PILImage.open(io.BytesIO(image_data)) # 使用别名 PILImage
@staticmethod
def _validate_base64(b64: str) -> bytes:
return base64.b64decode(b64, validate=True)
# 示例压缩:对于 JPEG 格式,降低质量;对于其他格式,转换为 JPEG 并降低质量
output_buffer = io.BytesIO()
if img.format == 'JPEG':
img.save(output_buffer, format='JPEG', quality=80) # 降低JPEG质量到80
else:
# 尝试转换为JPEG进行压缩,如果图片是透明的,先转换为RGB
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
img.save(output_buffer, format='JPEG', quality=80) # 转换为JPEG并降低质量
@staticmethod
def _compress_image(data: bytes) -> str:
img = PILImage.open(io.BytesIO(data))
buf = io.BytesIO()
if img.format == "JPEG":
img.save(buf, "JPEG", quality=80)
else:
if img.mode in ("RGBA","P"):
img = img.convert("RGB")
img.save(buf, "JPEG", quality=80)
# logger.info("图片处理完成!!!")
return base64.b64encode(buf.getvalue()).decode()
compressed_image_base64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8')
logger.info(f"图片压缩成功,原大小: {len(image_base64)} bytes, 压缩后大小: {len(compressed_image_base64)} bytes")
image_base64_to_send = compressed_image_base64 # 使用压缩后的base64
except Exception as e:
logger.error(f"图片压缩处理失败: {e}")
# 如果压缩失败,可以选择发送原图或者跳过
# 这里选择发送原图,或者可以根据需求发送错误消息并跳过
image_base64_to_send = image_base64 # 压缩失败,发送原图
logger.warning("图片压缩失败,将尝试发送原图。")
async def _post(self, session, url, payload):
params = {"key": self.adapter.auth_key}
try:
async with session.post(url, params=params, json=payload) as resp:
data = await resp.json()
if resp.status != 200 or data.get("Code") != 200:
logger.error(f"{url} failed: {resp.status} {data}")
except Exception as e:
logger.error(f"{url} error: {e}")
payload = {
"MsgItem": [
{
"AtWxIDList": [], # 根据需要添加 @ 的用户 wxid 列表
"ImageContent": image_base64_to_send, # 使用处理后的base64
"MsgType": 3, # 图片消息类型
"TextContent": "",
"ToUserName": self.session_id, # 接收者 wxid
}
]
}
url = f"{self.adapter.base_url}/message/SendImageNewMessage" # 使用新的图片发送接口
params = {"key": self.adapter.auth_key}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, params=params, json=payload) as response:
response_data = await response.json()
logger.info(response_data)
if response.status == 200 and response_data.get("Code") == 200:
logger.info(f"成功发送图片消息到 {self.session_id}")
else:
logger.error(f"发送图片消息失败到 {self.session_id}: {response.status}, {response_data}")
except aiohttp.ClientConnectorError as e:
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
except Exception as e:
logger.error(f"发送图片消息时发生错误: {e}")
except Exception as e:
logger.error(f"处理图片消息失败: {e}")
# 可以选择发送一个错误提示文本消息
await self.send(MessageChain([Plain("发送图片失败。")]))
# TODO: 添加对其他消息组件类型的处理 (Record, Video, At等)
# elif isinstance(component, Record):
# pass
# elif isinstance(component, Video):
# pass
# elif isinstance(component, At):
# pass
# ...
await super().send(message) # 调用父类的 send 方法进行指标上报等操作
# 根据 WeChatPadPro 的事件特点,可能需要重写 AstrMessageEvent 中的其他方法
# 例如:
# def get_sender_id(self) -> str:
# # 从 self.message_obj 中获取发送者 ID
# return self.message_obj.sender.user_id
#
# def is_private_chat(self) -> bool:
# # 根据 self.message_obj 判断是否是私聊
# return self.message_obj.type == MessageType.FRIEND_MESSAGE
# TODO: 添加对其他消息组件类型的处理 (Record, Video, At等)
# elif isinstance(component, Record):
# pass
# elif isinstance(component, Video):
# pass
# elif isinstance(component, At):
# pass
# ...