feat: 过滤器支持限定权限组;支持指定白名单是否忽略管理

This commit is contained in:
Soulter
2024-12-10 20:14:13 +08:00
parent f963194124
commit 47e70bd086
14 changed files with 109 additions and 33 deletions
+3 -1
View File
@@ -4,7 +4,9 @@ from astrbot.core.star.register import (
register_event_message_type as event_message_type,
register_regex as regex,
register_platform_adapter_type as platform_adapter_type,
register_permission_type as permission_type
)
from astrbot.core.star.filter.event_message_type import EventMessageTypeFilter, EventMessageType
from astrbot.core.star.filter.platform_adapter_type import PlatformAdapterTypeFilter, PlatformAdapterType
from astrbot.core.star.filter.platform_adapter_type import PlatformAdapterTypeFilter, PlatformAdapterType
from astrbot.core.star.filter.permission import PermissionTypeFilter, PermissionType
+13 -3
View File
@@ -17,7 +17,9 @@ DEFAULT_CONFIG = {
},
"reply_prefix": "",
"forward_threshold": 200,
"id_whitelist": []
"id_whitelist": [],
"wl_ignore_admin_on_group": True,
"wl_ignore_admin_on_friend": True
},
"provider": [],
"provider_settings": {
@@ -160,7 +162,13 @@ ADAPTER_CONFIG_TEMPLATE = {
# 配置项的中文描述、值类型
CONFIG_METADATA_2 = {
"config_version": {"description": "配置版本", "type": "int"},
"config_version": {"description": "配置版本", "type": "int", "invisible": True},
"dashboard": {
"invisible": True,
"description": "",
"type": "object",
"items": {}
},
"platform": {
"description": "平台配置",
"type": "list",
@@ -196,7 +204,9 @@ CONFIG_METADATA_2 = {
},
"reply_prefix": {"description": "回复前缀", "type": "string", "hint": "机器人回复消息时带有的前缀。"},
"forward_threshold": {"description": "转发消息的字数阈值", "type": "int", "hint": "超过一定字数后,机器人会将消息折叠成 QQ 群聊的 “转发消息”,以防止刷屏。目前仅 QQ 平台适配器适用。"},
"id_whitelist": {"description": "ID 白名单", "type": "list", "items": {"type": "int"}, "hint": "填写后,将只处理所填写的 ID 发来的消息事件。为空时表示不启用白名单过滤。可以使用 /myid 指令获取在某个平台上的 ID。"},
"id_whitelist": {"description": "ID 白名单", "type": "list", "items": {"type": "int"}, "hint": "填写后,将只处理所填写的 ID 发来的消息事件。为空时表示不启用白名单过滤。可以使用 /myid 指令获取在某个平台上的会话 ID。也可在 AstrBot 日志内获取会话 ID,当一条消息没通过白名单时,会输出 INFO 级别的日志。会话 ID 类似 aiocqhttp:GroupMessage:547540978"},
"wl_ignore_admin_on_group": {"description": "管理员群组消息无视 ID 白名单", "type": "bool"},
"wl_ignore_admin_on_friend": {"description": "管理员私聊消息无视 ID 白名单", "type": "bool"}
}
},
"provider": {
@@ -38,7 +38,11 @@ class ResultDecorateStage:
break
if plain_str and len(plain_str) > 150:
render_start = time.time()
url = await html_renderer.render_t2i(plain_str, return_url=True)
try:
url = await html_renderer.render_t2i(plain_str, return_url=True)
except BaseException:
logger.error("文本转图片失败,使用文本发送。")
return
if time.time() - render_start > 3:
logger.warning(f"文本转图片耗时超过了 3 秒,如果觉得很慢可以使用 /t2i 关闭文本转图片模式。")
if url:
+12 -5
View File
@@ -58,20 +58,27 @@ class WakingCheckStage(Stage):
handlers_parsed_params = {} # 注册了指令的 handler
for handler in star_handlers_registry:
# filter 需要满足 AND 的逻辑关系
passed = False
print(handler.handler_full_name)
passed = True
child_command_handler_md = None
if len(handler.event_filters) == 0:
# 不可能有这种情况, 也不允许有这种情况
continue
for filter in handler.event_filters:
try:
if isinstance(filter, CommandGroupFilter):
'''如果指令组过滤成功, 会返回叶子指令的 StarHandlerMetadata'''
ok, child_command_handler_md = filter.filter(event, self.ctx.astrbot_config)
if ok:
passed = True
if not ok:
passed = False
else:
handler = child_command_handler_md # handler 覆盖
break
else:
if filter.filter(event, self.ctx.astrbot_config):
passed = True
if not filter.filter(event, self.ctx.astrbot_config):
passed = False
break
except Exception as e:
# event.set_result(MessageEventResult().message(f"插件 {handler.handler_full_name} 报错:{e}"))
@@ -11,11 +11,17 @@ class WhitelistCheckStage(Stage):
'''
async def initialize(self, ctx: PipelineContext) -> None:
self.whitelist = ctx.astrbot_config['platform_settings']['id_whitelist']
self.wl_ignore_admin_on_group = ctx.astrbot_config['platform_settings']['wl_ignore_admin_on_group']
self.wl_ignore_admin_on_friend = ctx.astrbot_config['platform_settings']['wl_ignore_admin_on_friend']
async def process(self, event: AstrMessageEvent) -> Union[None, AsyncGenerator[None, None]]:
# 检查是否在白名单
if event.role == 'admin' and event.get_message_type() == MessageType.FRIEND_MESSAGE:
return
if self.wl_ignore_admin_on_group:
if event.role == 'admin' and event.get_message_type() == MessageType.GROUP_MESSAGE:
return
if self.wl_ignore_admin_on_friend:
if event.role == 'admin' and event.get_message_type() == MessageType.FRIEND_MESSAGE:
return
if event.unified_msg_origin not in self.whitelist:
logger.info(f"会话 {event.unified_msg_origin} 不在会话白名单中,已终止事件传播。")
event.stop_event()
+6 -5
View File
@@ -211,14 +211,15 @@ class AstrMessageEvent(abc.ABC):
def is_wake_up(self) -> bool:
'''
是否是唤醒机器人的事件。
机器人被唤醒的条件:
1. 消息以用户设置的唤醒前缀开头,默认是 `/`.
2. 消息中有 at 机器人的消息。
3. 是私聊。
'''
return self.is_wake
def is_admin(self) -> bool:
'''
是否是管理员。
'''
return self.role == "admin"
async def send(self, message: MessageChain):
'''
发送消息到消息平台。
@@ -60,8 +60,9 @@ class ProviderOpenAIOfficial(Provider):
try:
models_str = []
models = await self.client.models.list()
models = models.data
for model in models:
models_str.append(model['id'])
models_str.append(model.id)
return models_str
except NotFoundError as e:
raise Exception(f"获取模型列表失败:{e}")
+2 -1
View File
@@ -59,9 +59,10 @@ class CommandFilter(HandlerFilter, ParameterValidationMixin):
params = {}
try:
params = self.validate_and_convert_params(ls, self.handler_params)
# 解析完成咱也不能丢掉呀,留着给后面的用
except ValueError as e:
raise e
event.set_extra("parsed_params", params)
return True
+26
View File
@@ -0,0 +1,26 @@
import enum
from . import HandlerFilter
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.config import AstrBotConfig
from astrbot.core.platform.message_type import MessageType
class PermissionType(enum.Flag):
'''权限类型。当选择 MEMBER,ADMIN 也可以通过。
'''
ADMIN = "admin"
MEMBER = "member"
class PermissionTypeFilter(HandlerFilter):
def __init__(self, permission_type: PermissionType, raise_error: bool = True):
self.permission_type = permission_type
self.raise_error = raise_error
def filter(self, event: AstrMessageEvent, cfg: AstrBotConfig) -> bool:
'''过滤器
'''
if self.permission_type == PermissionType.ADMIN:
if not event.is_admin():
event.stop_event()
raise ValueError("您没有权限执行此操作。")
return True
+2 -1
View File
@@ -4,5 +4,6 @@ from .star_handler import (
register_command_group,
register_event_message_type,
register_platform_adapter_type,
register_regex
register_regex,
register_permission_type
)
+19 -3
View File
@@ -5,6 +5,7 @@ from ..filter.command import CommandFilter
from ..filter.command_group import CommandGroupFilter
from ..filter.event_message_type import EventMessageTypeFilter, EventMessageType
from ..filter.platform_adapter_type import PlatformAdapterTypeFilter, PlatformAdapterType
from ..filter.permission import PermissionTypeFilter, PermissionType
from ..filter.regex import RegexFilter
from typing import Awaitable, List, Dict
@@ -13,7 +14,7 @@ def get_handler_full_name(awatable: Awaitable) -> str:
'''获取 Handler 的全名'''
return f"{awatable.__module__}_{awatable.__name__}"
def get_handler_or_create(handler: Awaitable) -> StarHandlerMetadata:
def get_handler_or_create(handler: Awaitable, dont_add = False) -> StarHandlerMetadata:
'''获取 Handler 或者创建一个新的 Handler'''
handler_full_name = get_handler_full_name(handler)
if handler_full_name in star_handlers_map:
@@ -26,8 +27,9 @@ def get_handler_or_create(handler: Awaitable) -> StarHandlerMetadata:
handler=handler,
event_filters=[]
)
star_handlers_registry.append(md)
star_handlers_map[handler_full_name] = md
if not dont_add:
star_handlers_registry.append(md)
star_handlers_map[handler_full_name] = md
return md
def register_command(command_name: str = None, *args):
@@ -112,4 +114,18 @@ def register_regex(regex: str):
handler_md.event_filters.append(RegexFilter(regex))
return awatable
return decorator
def register_permission_type(permission_type: PermissionType, raise_error: bool = True):
'''注册一个 PermissionType
Args:
permission_type: PermissionType
raise_error: 如果没有权限,是否抛出错误到消息平台,并且停止事件传播。默认为 True
'''
def decorator(awatable):
handler_md = get_handler_or_create(awatable)
handler_md.event_filters.append(PermissionTypeFilter(permission_type, raise_error))
return awatable
return decorator
+1 -1
View File
@@ -24,7 +24,7 @@ class ParameterValidationMixin:
else:
result[param_name] = params[i]
else:
result[param_name] = type(param_type_or_default_val)(params[i])
result[param_name] = param_type_or_default_val(params[i])
except ValueError:
raise ValueError(f"参数 {param_name} 类型错误")
print(result)
-4
View File
@@ -84,7 +84,6 @@ def save_extension_config(post_config: dict):
class ConfigRoute(Route):
def __init__(self, context: RouteContext, core_lifecycle: AstrBotCoreLifecycle) -> None:
super().__init__(context)
self.config_key_dont_show = ['dashboard', 'config_version']
self.core_lifecycle = core_lifecycle
self.routes = {
'/config/get': ('GET', self.get_configs),
@@ -120,9 +119,6 @@ class ConfigRoute(Route):
async def _get_astrbot_config(self):
config = self.config
for key in self.config_key_dont_show:
if key in config:
del config[key]
return {
"metadata": CONFIG_METADATA_2,
"config": config,
+9 -4
View File
@@ -25,7 +25,7 @@ class Main(star.Star):
except BaseException as e:
pass
msg = "已注册的 AstrBot 内置指令:"
msg = "已注册的 AstrBot 内置指令:\n"
msg += f"""[System]
/plugin: 插件管理
/t2i: 开启/关闭文本转图片模式
@@ -76,14 +76,16 @@ class Main(star.Star):
user_id = str(event.get_sender_id())
ret = f"""SID: {sid} 此 ID 可用于设置会话白名单。/wl <SID> 添加白名单, /dwl <SID> 删除白名单。
UID: {user_id} 此 ID 可用于设置管理员。/op <UID> 授权管理员, /deop <UID> 取消管理员。"""
event.set_result(MessageEventResult().message(ret))
event.set_result(MessageEventResult().message(ret).use_t2i(False))
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("op")
async def op(self, event: AstrMessageEvent, admin_id: str):
self.context.get_config()['admins_id'].append(admin_id)
self.context.get_config().save_config()
event.set_result(MessageEventResult().message("授权成功。"))
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("deop")
async def deop(self, event: AstrMessageEvent, admin_id: str):
try:
@@ -92,13 +94,15 @@ UID: {user_id} 此 ID 可用于设置管理员。/op <UID> 授权管理员, /deo
event.set_result(MessageEventResult().message("取消授权成功。"))
except ValueError:
event.set_result(MessageEventResult().message("此用户 ID 不在管理员名单内。"))
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("wl")
async def wl(self, event: AstrMessageEvent, sid: str):
self.context.get_config()['platform_settings']['id_whitelist'].append(sid)
self.context.get_config().save_config()
event.set_result(MessageEventResult().message("添加白名单成功。"))
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("dwl")
async def dwl(self, event: AstrMessageEvent, sid: str):
try:
@@ -142,7 +146,7 @@ UID: {user_id} 此 ID 可用于设置管理员。/op <UID> 授权管理员, /deo
try:
models = await self.context.get_using_provider().get_models()
except BaseException as e:
message.set_result(MessageEventResult().message("获取模型列表失败: " + str(e)))
message.set_result(MessageEventResult().message("获取模型列表失败: " + str(e)).use_t2i(False))
return
i = 1
ret = "下面列出了此服务提供商可用模型:"
@@ -193,6 +197,7 @@ UID: {user_id} 此 ID 可用于设置管理员。/op <UID> 授权管理员, /deo
message.set_result(MessageEventResult().message(ret).use_t2i(False))
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("key")
async def key(self, message: AstrMessageEvent, index: int=None):