Files
AstrBot/model/platform/qqchan.py
T
Soulter 37488118a6 feat: 1. keyword指令支持记录图片;
2. qq频道转gocq数据结构兼容层实现;
perf: 1. 优化代码结构;
2. log 支持环境变量指定log等级
2023-09-02 00:24:13 +08:00

128 lines
5.1 KiB
Python

import io
import botpy
from PIL import Image as PILImage
from botpy.message import Message, DirectMessage
import re
import asyncio
import requests
from cores.qqbot.personality import personalities
from util import general_utils as gu
from nakuru.entities.components import Plain, At, Image
class NakuruGuildMember:
    """Plain attribute holder describing the sender of a guild message.

    The class only declares annotated attribute names; it defines no
    ``__init__`` and no defaults, so every field has to be assigned on the
    instance before it is read.
    """

    # Sender identifiers. Elsewhere in this file both are filled from the
    # same author id.
    tiny_id: int
    user_id: int

    # Display information for the sender.
    title: str
    nickname: str

    # Numeric role of the sender.
    # NOTE(review): the meaning of individual values is not defined here.
    role: int

    # Avatar location of the sender.
    icon_url: str
class NakuruGuildMessage:
    """Guild message container with gocq/nakuru-style field names.

    Only ``type`` has a class-level default; all other fields are bare
    annotations and must be assigned on the instance before being read.
    """

    # Constant tag shared by all instances (class attribute, so it is not
    # part of an instance's ``__dict__``).
    type: str = "GuildMessage"

    # Identifiers of the receiving bot account.
    self_id: int
    self_tiny_id: int

    sub_type: str
    message_id: str

    # Where the message was posted and who posted it.
    guild_id: int
    channel_id: int
    user_id: int

    # Message chain: a list of message components.
    message: list
    sender: NakuruGuildMember

    # The platform message object this instance was built from.
    raw_message: Message

    def __str__(self) -> str:
        """Return the textual form of the instance attribute dictionary."""
        return repr(vars(self))
class QQChan():
    """Bridge between a QQ guild bot client and gocq/nakuru-style messages.

    The instance keeps the bot client passed to :meth:`run_bot` in
    ``self.client``; :meth:`send_qq_msg` uses ``self.client.loop`` to schedule
    replies, so :meth:`run_bot` must have been called first.
    """

    def run_bot(self, botclient, appid, token):
        """Remember ``botclient`` and start it.

        Args:
            botclient: Client object exposing ``run(appid=..., token=...)``.
            appid: Application id forwarded to ``botclient.run``.
            token: Bot token forwarded to ``botclient.run``.
        """
        # NOTE(review): an Intents object used to be built here but was never
        # passed anywhere; presumably the caller configures intents when it
        # constructs ``botclient`` - confirm.
        self.client = botclient
        self.client.run(appid=appid, token=token)

    # gocq <-> guild SDK compatibility layer (outgoing)
    def gocq_compatible_send(self, gocq_message_chain: list):
        """Flatten a gocq-style message chain into text plus one image.

        Args:
            gocq_message_chain: Iterable of message components. ``Plain``
                components contribute their ``text``; the first ``Image``
                whose ``path`` is not ``None`` supplies the image. Every other
                component is ignored.

        Returns:
            tuple: ``(plain_text, image_path)`` where ``image_path`` is
            ``None`` when the chain carries no usable image.
        """
        text_parts = []
        image_path = None  # only one image is supported per message
        for component in gocq_message_chain:
            if isinstance(component, Plain):
                text_parts.append(component.text)
            elif isinstance(component, Image) and image_path is None:
                image_path = component.path
        return "".join(text_parts), image_path

    # gocq <-> guild SDK compatibility layer (incoming)
    def gocq_compatible_receive(self, message: "Message") -> "NakuruGuildMessage":
        """Convert a guild SDK message into a ``NakuruGuildMessage``.

        Args:
            message: Incoming platform message. ``id``, ``guild_id``,
                ``channel_id``, ``author``, ``content`` and ``attachments``
                are read; ``mentions`` is read when present.

        Returns:
            NakuruGuildMessage: Message whose chain holds one ``Plain`` with
            the text (bot mention removed, surrounding whitespace stripped)
            followed by one ``Image`` per image attachment.
        """
        ngm = NakuruGuildMessage()

        # The first mention is taken to be the bot itself. Messages without
        # any mention fall back to 0 instead of raising.
        mentions = getattr(message, "mentions", None) or []
        bot_id = mentions[0].id if mentions else 0
        ngm.self_id = bot_id
        ngm.self_tiny_id = bot_id

        ngm.sub_type = "normal"
        ngm.message_id = message.id
        # The guild id comes from the guild field, not from the channel id.
        ngm.guild_id = int(message.guild_id)
        ngm.channel_id = int(message.channel_id)
        ngm.user_id = int(message.author.id)

        # Tolerate a message without text (for example attachments only).
        text = message.content or ""
        if mentions:
            text = text.replace("<@!" + str(bot_id) + ">", "")
        chain = [Plain(text.strip())]

        for attachment in message.attachments or []:
            if not attachment.content_type.startswith("image"):
                continue
            url = attachment.url
            # Attachment URLs may come without a scheme.
            if not url.startswith("http"):
                url = "https://" + url
            chain.append(Image.fromURL(url))
        ngm.message = chain

        sender = NakuruGuildMember()
        sender.tiny_id = int(message.author.id)
        sender.user_id = int(message.author.id)
        sender.title = ""
        sender.nickname = message.author.username
        sender.role = 0
        sender.icon_url = message.author.avatar
        ngm.sender = sender

        ngm.raw_message = message
        return ngm

    def _reply_and_wait(self, message, **reply_kwargs):
        """Run ``message.raw_message.reply(**reply_kwargs)`` on the client loop.

        Blocks until the reply coroutine finishes and re-raises whatever it
        raised, so callers can react to a failed send. Must not be called from
        the thread that runs ``self.client.loop`` (waiting there would block
        the loop that has to execute the coroutine).
        """
        future = asyncio.run_coroutine_threadsafe(
            message.raw_message.reply(**reply_kwargs), self.client.loop
        )
        return future.result()

    def send_qq_msg(self, message: "NakuruGuildMessage", res, msg_ref = None):
        """Reply to ``message`` with ``res``, degrading step by step on failure.

        Args:
            message: Message being answered; its ``raw_message.reply`` is used
                to send.
            res: Either a ``str`` or a gocq-style message chain (``list``).
                Any other type results in an empty text reply.
            msg_ref: Optional message reference forwarded to ``reply``.

        Send strategy, each step only runs when the previous one raised:
            1. Send the text (and image) as is.
            2. If the error mentions "msg over length", send the text in two
               halves (the image goes with the first half only) and stop.
            3. Otherwise resend with dots spaced out, to get past the
               platform's content filter.
            4. Resend with a space between every character.
            5. Send the last error text with links and dots masked; an error
               here propagates to the caller.
        """
        gu.log("回复QQ频道消息: "+str(res), level=gu.LEVEL_INFO, tag="QQ频道", max_len=500)

        plain_text = ""
        image_path = None
        if isinstance(res, list):
            # gocq compatibility
            plain_text, image_path = self.gocq_compatible_send(res)
        elif isinstance(res, str):
            plain_text = res

        try:
            self._reply_and_wait(
                message,
                content=str(plain_text),
                message_reference=msg_ref,
                file_image=image_path,
            )
            return
        except Exception as e:
            first_error = e

        # Split a message that is too long into two halves.
        if "msg over length" in str(first_error):
            middle = len(plain_text) // 2
            self._reply_and_wait(
                message,
                content=str(plain_text[:middle]),
                message_reference=msg_ref,
                file_image=image_path,
            )
            # The image was already delivered with the first half.
            self._reply_and_wait(
                message,
                content=str(plain_text[middle:]),
                message_reference=msg_ref,
            )
            return

        # Keep the platform from filtering the message (dots look like links).
        plain_text = plain_text.replace(".", " . ")
        try:
            self._reply_and_wait(
                message,
                content=str(plain_text),
                message_reference=msg_ref,
                file_image=image_path,
            )
            return
        except Exception as e:
            print("QQ频道API错误: \n"+str(e))

        try:
            # Last attempt with the real text: one space between characters.
            self._reply_and_wait(
                message,
                content=" ".join(plain_text),
                message_reference=msg_ref,
                file_image=image_path,
            )
            return
        except Exception as e:
            last_error = e

        # Report the error itself, with links and dots masked so the report
        # is not filtered as well.
        report = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(last_error), flags=re.MULTILINE)
        report = report.replace(".", "·")
        self._reply_and_wait(message, content=report)