feat: 使用groupby来合并aiocqhttp连续的文本段

This commit is contained in:
kkjz
2025-04-20 18:09:04 +08:00
parent 98427345cf
commit d3bd775a79
@@ -3,7 +3,7 @@ import time
import asyncio
import logging
import uuid
import re
import itertools
from typing import Awaitable, Any
from aiocqhttp import CQHttp, Event
from astrbot.api.platform import (
@@ -160,14 +160,6 @@ class AiocqhttpAdapter(Platform):
return abm
def _is_url_or_ip(self,text: str) -> bool:
"""
判断一个字符串是否为网址(http/https 开头)或 IP 地址。
"""
url_pattern = r"^(?:http|https)://.+$"
ip_pattern = r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
return bool(re.match(url_pattern, text) or re.match(ip_pattern, text))
async def _convert_handle_message_event(
self, event: Event, get_reply=True
) -> AstrBotMessage:
@@ -211,99 +203,87 @@ class AiocqhttpAdapter(Platform):
return
# 按消息段类型类型适配
for idx, m in enumerate(event.message):
t = m["type"]
for t, m_group in itertools.groupby(event.message, key=lambda x: x["type"]):
a = None
if t == "text":
should_append_new = True # 是否需要正常处理的标签
# 合并相邻文本段的情况
if idx > 0 and event.message[idx - 1]["type"] == "text":
prev_text = event.message[idx - 1]["data"]["text"]
curr_text = m["data"]["text"].strip()
# 处理网址或IP地址的特殊情况,处理两端文本中间为url时的情况
if (prev_text.endswith(" ") and self._is_url_or_ip(curr_text)) or \
(idx > 1 and m["data"]["text"].startswith(" ") and self._is_url_or_ip(prev_text.strip())):
# 合并到前一个文本段
message_str += " " + curr_text
event.message[idx - 1]["data"]["text"] = message_str
a = ComponentTypes[t](**event.message[idx - 1]["data"])
abm.message[-1] = a
should_append_new = False
# 如果不需要合并,添加为新的消息段
if should_append_new:
message_str += m["data"]["text"].strip()
a = ComponentTypes[t](**m["data"])
abm.message.append(a)
for m in m_group:
message_str += m["data"]["text"]
message_str = message_str.strip()
a = ComponentTypes[t](**m["data"])
abm.message.append(a)
elif t == "file":
if m["data"].get("url") and m["data"].get("url").startswith("http"):
# Lagrange
logger.info("guessing lagrange")
for m in m_group:
if m["data"].get("url") and m["data"].get("url").startswith("http"):
# Lagrange
logger.info("guessing lagrange")
file_name = m["data"].get("file_name", "file")
path = os.path.join("data/temp", file_name)
await download_file(m["data"]["url"], path)
file_name = m["data"].get("file_name", "file")
path = os.path.join("data/temp", file_name)
await download_file(m["data"]["url"], path)
m["data"] = {"file": path, "name": file_name}
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
else:
try:
# Napcat, LLBot
ret = await self.bot.call_action(
action="get_file",
file_id=event.message[0]["data"]["file_id"],
)
if not ret.get("file", None):
raise ValueError(f"无法解析文件响应: {ret}")
if not os.path.exists(ret["file"]):
raise FileNotFoundError(
f"文件不存在或者权限问题: {ret['file']}。如果您使用 Docker 部署了 AstrBot 或者消息协议端(Napcat等),请先映射路径。如果路径在 /root 目录下,请用 sudo 打开 AstrBot"
)
m["data"] = {"file": ret["file"], "name": ret["file_name"]}
m["data"] = {"file": path, "name": file_name}
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
except ActionFailed as e:
logger.error(f"获取文件失败: {e},此消息段将被忽略。")
except BaseException as e:
logger.error(f"获取文件失败: {e},此消息段将被忽略。")
else:
try:
# Napcat, LLBot
ret = await self.bot.call_action(
action="get_file",
file_id=event.message[0]["data"]["file_id"],
)
if not ret.get("file", None):
raise ValueError(f"无法解析文件响应: {ret}")
if not os.path.exists(ret["file"]):
raise FileNotFoundError(
f"文件不存在或者权限问题: {ret['file']}。如果您使用 Docker 部署了 AstrBot 或者消息协议端(Napcat等),请先映射路径。如果路径在 /root 目录下,请用 sudo 打开 AstrBot"
)
m["data"] = {"file": ret["file"], "name": ret["file_name"]}
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
except ActionFailed as e:
logger.error(f"获取文件失败: {e},此消息段将被忽略。")
except BaseException as e:
logger.error(f"获取文件失败: {e},此消息段将被忽略。")
elif t == "reply":
if not get_reply:
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
else:
try:
reply_event_data = await self.bot.call_action(
action="get_msg",
message_id=int(m["data"]["id"]),
)
abm_reply = await self._convert_handle_message_event(
Event.from_payload(reply_event_data), get_reply=False
)
reply_seg = Reply(
id=abm_reply.message_id,
chain=abm_reply.message,
sender_id=abm_reply.sender.user_id,
sender_nickname=abm_reply.sender.nickname,
time=abm_reply.timestamp,
message_str=abm_reply.message_str,
text=abm_reply.message_str, # for compatibility
qq=abm_reply.sender.user_id, # for compatibility
)
abm.message.append(reply_seg)
except BaseException as e:
logger.error(f"获取引用消息失败: {e}")
for m in m_group:
if not get_reply:
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
else:
try:
reply_event_data = await self.bot.call_action(
action="get_msg",
message_id=int(m["data"]["id"]),
)
abm_reply = await self._convert_handle_message_event(
Event.from_payload(reply_event_data), get_reply=False
)
reply_seg = Reply(
id=abm_reply.message_id,
chain=abm_reply.message,
sender_id=abm_reply.sender.user_id,
sender_nickname=abm_reply.sender.nickname,
time=abm_reply.timestamp,
message_str=abm_reply.message_str,
text=abm_reply.message_str, # for compatibility
qq=abm_reply.sender.user_id, # for compatibility
)
abm.message.append(reply_seg)
except BaseException as e:
logger.error(f"获取引用消息失败: {e}")
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
else:
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
for m in m_group:
a = ComponentTypes[t](**m["data"]) # noqa: F405
abm.message.append(a)
abm.timestamp = int(time.time())
abm.message_str = message_str