diff --git a/astrbot/core/pipeline/respond/stage.py b/astrbot/core/pipeline/respond/stage.py index c990e9d8d..275926ba9 100644 --- a/astrbot/core/pipeline/respond/stage.py +++ b/astrbot/core/pipeline/respond/stage.py @@ -32,6 +32,7 @@ class RespondStage(Stage): Comp.Node: lambda comp: bool(comp.content), # 转发节点 Comp.Nodes: lambda comp: bool(comp.nodes), # 多个转发节点 Comp.File: lambda comp: bool(comp.file_ or comp.url), + Comp.WechatEmoji: lambda comp: comp.md5 is not None, # 微信表情 } async def initialize(self, ctx: PipelineContext): diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py index 7e7755501..0ad38819e 100644 --- a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_adapter.py @@ -6,9 +6,8 @@ from typing import Optional import aiohttp import websockets - from astrbot import logger -from astrbot.api.message_components import Plain, Image +from astrbot.api.message_components import Plain, Image, At from astrbot.api.platform import Platform, PlatformMetadata from astrbot.core.message.message_event_result import MessageChain from astrbot.core.platform.astrbot_message import ( @@ -22,6 +21,13 @@ from astrbot.core.platform.astr_message_event import MessageSesion from ...register import register_platform_adapter from .wechatpadpro_message_event import WeChatPadProMessageEvent +try: + from .xml_data_parser import GeweDataParser +except ImportError as e: + logger.warning( + f"警告: 可能未安装 defusedxml 依赖库,将导致无法解析微信的 表情包、引用 类型的消息: {str(e)}" + ) + @register_platform_adapter("wechatpadpro", "WeChatPadPro 消息平台适配器") class WeChatPadProAdapter(Platform): @@ -59,6 +65,18 @@ class WeChatPadProAdapter(Platform): ) # 持久化文件路径 self.ws_handle_task = None + # 添加图片消息缓存,用于引用消息处理 + self.cached_images = {} + """缓存图片消息。key是NewMsgId (对应引用消息的svrid),value是图片的base64数据""" + # 设置缓存大小限制,避免内存占用过大 + self.max_image_cache = 50 + + # 添加文本消息缓存,用于引用消息处理 + self.cached_texts = {} + """缓存文本消息。key是NewMsgId (对应引用消息的svrid),value是消息文本内容""" + # 设置文本缓存大小限制 + self.max_text_cache = 100 + async def run(self) -> None: """ 启动平台适配器的运行实例。 @@ -102,7 +120,7 @@ class WeChatPadProAdapter(Platform): logger.warning("登录失败或超时,WeChatPadPro 适配器将关闭。") await self.terminate() return - + # 登录成功后,连接 WebSocket 接收消息 self.ws_handle_task = asyncio.create_task(self.connect_websocket()) @@ -161,27 +179,21 @@ class WeChatPadProAdapter(Platform): return True # login_state == 3 为离线状态 elif login_state == 3: - logger.info( - "WeChatPadPro 设备不在线。" - ) + logger.info("WeChatPadPro 设备不在线。") return False else: - logger.error( - f"未知的在线状态: {login_state:}" - ) + logger.error(f"未知的在线状态: {login_state:}") return False # Code == 300 为微信退出状态。 elif response.status == 200 and response_data.get("Code") == 300: - logger.info( - "WeChatPadPro 设备已退出。" - ) + logger.info("WeChatPadPro 设备已退出。") return False else: logger.error( f"检查在线状态失败: {response.status}, {response_data}" ) return False - + except aiohttp.ClientConnectorError as e: logger.error(f"连接到 WeChatPadPro 服务失败: {e}") return False @@ -364,7 +376,9 @@ class WeChatPadProAdapter(Platform): logger.error(f"处理 WebSocket 消息时发生错误: {e}") break except Exception as e: - logger.error(f"WebSocket 连接失败: {e}, 请检查WeChatPadPro服务状态,或尝试重启WeChatPadPro适配器。") + logger.error( + f"WebSocket 连接失败: {e}, 请检查WeChatPadPro服务状态,或尝试重启WeChatPadPro适配器。" + ) await asyncio.sleep(5) async def handle_websocket_message(self, message: str): @@ -439,7 +453,7 @@ class WeChatPadProAdapter(Platform): ): # 再根据消息类型处理消息内容 await self._process_message_content(abm, raw_message, msg_type, content) - + return abm return None @@ -457,6 +471,7 @@ class WeChatPadProAdapter(Platform): """ if from_user_name == "weixin": return False + at_me = False if "@chatroom" in from_user_name: abm.type = MessageType.GROUP_MESSAGE abm.group_id = from_user_name @@ -478,6 +493,14 @@ class WeChatPadProAdapter(Platform): abm.session_id = f"{from_user_name}_{to_user_name}" else: abm.session_id = from_user_name + + msg_source = raw_message.get("msg_source", "") + if self.wxid in msg_source: + at_me = True + if "在群聊中@了你" in raw_message.get("push_content", ""): + at_me = True + if at_me: + abm.message.insert(0, At(qq=abm.self_id, name="")) else: abm.type = MessageType.FRIEND_MESSAGE abm.group_id = "" @@ -575,6 +598,25 @@ class WeChatPadProAdapter(Platform): abm.message.append(Plain(abm.message_str)) else: # 私聊消息 abm.message.append(Plain(abm.message_str)) + + # 缓存文本消息,以便引用消息可以查找 + try: + # 获取msg_id作为缓存的key + new_msg_id = raw_message.get("new_msg_id") + if new_msg_id: + # 限制缓存大小 + if ( + len(self.cached_texts) >= self.max_text_cache + and self.cached_texts + ): + # 删除最早的一条缓存 + oldest_key = next(iter(self.cached_texts)) + self.cached_texts.pop(oldest_key) + + logger.debug(f"缓存文本消息,new_msg_id={new_msg_id}") + self.cached_texts[str(new_msg_id)] = content + except Exception as e: + logger.error(f"缓存文本消息失败: {e}") elif msg_type == 3: # 图片消息 from_user_name = raw_message.get("from_user_name", {}).get("str", "") @@ -588,15 +630,57 @@ class WeChatPadProAdapter(Platform): ) if image_bs64_data: abm.message.append(Image.fromBase64(image_bs64_data)) + # 缓存图片,以便引用消息可以查找 + try: + # 获取msg_id作为缓存的key + new_msg_id = raw_message.get("new_msg_id") + if new_msg_id: + # 限制缓存大小 + if ( + len(self.cached_images) >= self.max_image_cache + and self.cached_images + ): + # 删除最早的一条缓存 + oldest_key = next(iter(self.cached_images)) + self.cached_images.pop(oldest_key) + + logger.debug(f"缓存图片消息,new_msg_id={new_msg_id}") + self.cached_images[str(new_msg_id)] = image_bs64_data + except Exception as e: + logger.error(f"缓存图片消息失败: {e}") elif msg_type == 47: # 视频消息 (注意:表情消息也是 47,需要区分) - logger.warning("收到视频消息,待实现。") + data_parser = GeweDataParser( + content=content, + is_private_chat=(abm.type != MessageType.GROUP_MESSAGE), + raw_message=raw_message, + ) + emoji_message = data_parser.parse_emoji() + if emoji_message is not None: + abm.message.append(emoji_message) elif msg_type == 50: # 语音/视频 logger.warning("收到语音/视频消息,待实现。") elif msg_type == 49: - # 引用消息 - logger.warning("收到引用消息,待实现。") + try: + parser = GeweDataParser( + content=content, + is_private_chat=(abm.type != MessageType.GROUP_MESSAGE), + cached_texts=self.cached_texts, + cached_images=self.cached_images, + raw_message=raw_message, + downloader=self._download_raw_image, + ) + components = await parser.parse_mutil_49() + if components: + abm.message.extend(components) + abm.message_str = "\n".join( + c.text for c in components if isinstance(c, Plain) + ) + except Exception as e: + logger.warning(f"msg_type 49 处理失败: {e}") + abm.message.append(Plain("[XML 消息处理失败]")) + abm.message_str = "[XML 消息处理失败]" else: logger.warning(f"收到未处理的消息类型: {msg_type}。") diff --git a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py index 04bb02936..3c37be345 100644 --- a/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py +++ b/astrbot/core/platform/sources/wechatpadpro/wechatpadpro_message_event.py @@ -7,7 +7,7 @@ import aiohttp from PIL import Image as PILImage # 使用别名避免冲突 from astrbot import logger -from astrbot.core.message.components import Image, Plain # Import Image +from astrbot.core.message.components import Image, Plain, WechatEmoji # Import Image from astrbot.core.message.message_event_result import MessageChain from astrbot.core.platform.astr_message_event import AstrMessageEvent from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageType @@ -38,6 +38,8 @@ class WeChatPadProMessageEvent(AstrMessageEvent): await self._send_text(session, comp.text) elif isinstance(comp, Image): await self._send_image(session, comp) + elif isinstance(comp, WechatEmoji): + await self._send_emoji(session, comp) await super().send(message) async def _send_image(self, session: aiohttp.ClientSession, comp: Image): @@ -73,12 +75,29 @@ class WeChatPadProMessageEvent(AstrMessageEvent): message_text = text payload = { "MsgItem": [ - {"MsgType": 1, "TextContent": message_text, "ToUserName": self.session_id} + { + "MsgType": 1, + "TextContent": message_text, + "ToUserName": self.session_id, + } ] } url = f"{self.adapter.base_url}/message/SendTextMessage" await self._post(session, url, payload) + async def _send_emoji(self, session: aiohttp.ClientSession, comp: WechatEmoji): + payload = { + "EmojiList": [ + { + "EmojiMd5": comp.md5, + "EmojiSize": comp.md5_len, + "ToUserName": self.session_id, + } + ] + } + url = f"{self.adapter.base_url}/message/SendEmojiMessage" + await self._post(session, url, payload) + @staticmethod def _validate_base64(b64: str) -> bytes: return base64.b64decode(b64, validate=True) diff --git a/astrbot/core/platform/sources/wechatpadpro/xml_data_parser.py b/astrbot/core/platform/sources/wechatpadpro/xml_data_parser.py new file mode 100644 index 000000000..054ca1b48 --- /dev/null +++ b/astrbot/core/platform/sources/wechatpadpro/xml_data_parser.py @@ -0,0 +1,160 @@ +from defusedxml import ElementTree as eT +from astrbot.api import logger +from astrbot.api.message_components import ( + WechatEmoji as Emoji, + Plain, + Image, + BaseMessageComponent, +) + + +class GeweDataParser: + def __init__( + self, + content: str, + is_private_chat: bool = False, + cached_texts=None, + cached_images=None, + raw_message: dict = None, + downloader=None, + ): + self._xml = None + self.content = content + self.is_private_chat = is_private_chat + self.cached_texts = cached_texts or {} + self.cached_images = cached_images or {} + self.downloader = downloader + + raw_message = raw_message or {} + self.from_user_name = raw_message.get("from_user_name", {}).get("str", "") + self.to_user_name = raw_message.get("to_user_name", {}).get("str", "") + self.msg_id = raw_message.get("msg_id", "") + + def _format_to_xml(self): + if self._xml: + return self._xml + + try: + msg_str = self.content + if not self.is_private_chat: + parts = self.content.split(":\n", 1) + msg_str = parts[1] if len(parts) == 2 else self.content + + self._xml = eT.fromstring(msg_str) + return self._xml + except Exception as e: + logger.error(f"[XML解析失败] {e}") + raise + + async def parse_mutil_49(self) -> list[BaseMessageComponent] | None: + """ + 处理 msg_type == 49 的多种 appmsg 类型(目前支持 type==57) + """ + try: + appmsg_type = self._format_to_xml().findtext(".//appmsg/type") + if appmsg_type == "57": + return await self.parse_reply() + except Exception as e: + logger.warning(f"[parse_mutil_49] 解析失败: {e}") + return None + + async def parse_reply(self) -> list[BaseMessageComponent]: + """ + 处理 type == 57 的引用消息:支持文本(1)、图片(3)、嵌套49(49) + """ + components = [] + + try: + appmsg = self._format_to_xml().find("appmsg") + if appmsg is None: + return [Plain("[引用消息解析失败]")] + + refermsg = appmsg.find("refermsg") + if refermsg is None: + return [Plain("[引用消息解析失败]")] + + quote_type = int(refermsg.findtext("type", "0")) + nickname = refermsg.findtext("displayname", "未知发送者") + quote_content = refermsg.findtext("content", "") + svrid = refermsg.findtext("svrid") + + match quote_type: + case 1: # 文本引用 + quoted_text = self.cached_texts.get(str(svrid), quote_content) + components.append(Plain(f"[引用] {nickname}: {quoted_text}")) + + case 3: # 图片引用 + quoted_image_b64 = self.cached_images.get(str(svrid)) + if not quoted_image_b64: + try: + quote_xml = eT.fromstring(quote_content) + img = quote_xml.find("img") + cdn_url = ( + img.get("cdnbigimgurl") or img.get("cdnmidimgurl") + if img is not None + else None + ) + if cdn_url and self.downloader: + image_resp = await self.downloader( + self.from_user_name, self.to_user_name, self.msg_id + ) + quoted_image_b64 = ( + image_resp.get("Data", {}) + .get("Data", {}) + .get("Buffer") + ) + except Exception as e: + logger.warning(f"[引用图片解析失败] svrid={svrid} err={e}") + + if quoted_image_b64: + components.extend( + [ + Image.fromBase64(quoted_image_b64), + Plain(f"[引用] {nickname}: [引用的图片]"), + ] + ) + else: + components.append( + Plain(f"[引用] {nickname}: [引用的图片 - 未能获取]") + ) + + case 49: # 嵌套引用 + try: + nested_root = eT.fromstring(quote_content) + nested_title = nested_root.findtext(".//appmsg/title", "") + components.append(Plain(f"[引用] {nickname}: {nested_title}")) + except Exception as e: + logger.warning(f"[嵌套引用解析失败] err={e}") + components.append(Plain(f"[引用] {nickname}: [嵌套引用消息]")) + + case _: # 其他未识别类型 + logger.info(f"[未知引用类型] quote_type={quote_type}") + components.append(Plain(f"[引用] {nickname}: [不支持的引用类型]")) + + # 主消息标题 + title = appmsg.findtext("title", "") + if title: + components.append(Plain(title)) + + except Exception as e: + logger.error(f"[parse_reply] 总体解析失败: {e}") + return [Plain("[引用消息解析失败]")] + + return components + + def parse_emoji(self) -> Emoji | None: + """ + 处理 msg_type == 47 的表情消息(emoji) + """ + try: + emoji_element = self._format_to_xml().find(".//emoji") + if emoji_element is not None: + return Emoji( + md5=emoji_element.get("md5"), + md5_len=emoji_element.get("len"), + cdnurl=emoji_element.get("cdnurl"), + ) + except Exception as e: + logger.error(f"[parse_emoji] 解析失败: {e}") + + return None