feat: 支持设置所有指令的权限

feat: 插件指令支持设置指令描述
feat: plugin 指令支持查看插件的指令
This commit is contained in:
Soulter
2025-02-04 15:41:45 +08:00
parent ef05ff4abd
commit 9f9073c0ff
3 changed files with 148 additions and 31 deletions
+11 -7
View File
@@ -17,7 +17,7 @@ def get_handler_full_name(awaitable: Awaitable) -> str:
'''获取 Handler 的全名'''
return f"{awaitable.__module__}_{awaitable.__name__}"
def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add = False) -> StarHandlerMetadata:
def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add = False, **kwargs) -> StarHandlerMetadata:
'''获取 Handler 或者创建一个新的 Handler'''
handler_full_name = get_handler_full_name(handler)
md = star_handlers_registry.get_handler_by_full_name(handler_full_name)
@@ -30,14 +30,17 @@ def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add =
handler_name=handler.__name__,
handler_module_path=handler.__module__,
handler=handler,
event_filters=[]
event_filters=[],
)
if handler.__doc__:
md.desc = handler.__doc__.strip()
if not dont_add:
star_handlers_registry.append(md)
return md
def register_command(command_name: str = None, *args):
'''注册一个 Command'''
'''注册一个 Command.
'''
new_command = None
add_to_event_filters = False
@@ -61,8 +64,9 @@ def register_command(command_name: str = None, *args):
return decorator
def register_command_group(command_group_name: str = None, *args):
'''注册一个 CommandGroup'''
def register_command_group(command_group_name: str = None, desc: str = "", *args):
'''注册一个 CommandGroup
'''
new_group = None
add_to_event_filters = False
@@ -111,10 +115,10 @@ def register_platform_adapter_type(platform_adapter_type: PlatformAdapterType):
return decorator
def register_regex(regex: str):
def register_regex(regex: str, desc: str = ""):
'''注册一个 Regex'''
def decorator(awaitable):
handler_md = get_handler_or_create(awaitable, EventType.AdapterMessageEvent)
handler_md = get_handler_or_create(awaitable, EventType.AdapterMessageEvent, desc=desc)
handler_md.event_filters.append(RegexFilter(regex))
return awaitable
+26 -2
View File
@@ -19,6 +19,8 @@ from .star import star_registry, star_map
from .star_handler import star_handlers_registry
from astrbot.core.provider.register import llm_tools
from .filter.permission import PermissionTypeFilter, PermissionType
class PluginManager:
def __init__(
self,
@@ -159,6 +161,8 @@ class PluginManager:
inactivated_plugins: list = sp.get("inactivated_plugins", [])
inactivated_llm_tools: list = sp.get("inactivated_llm_tools", [])
alter_cmd = sp.get("alter_cmd", {})
# 导入插件模块,并尝试实例化插件类
for plugin_module in plugin_modules:
try:
@@ -213,12 +217,12 @@ class PluginManager:
metadata.root_dir_name = root_dir_name
metadata.reserved = reserved
# 绑定 handler
related_handlers = star_handlers_registry.get_handlers_by_module_name(metadata.module_path)
for handler in related_handlers:
logger.debug(f"bind handler {handler.handler_name} to {metadata.name}")
# handler.handler.__self__ = star_metadata.star_cls # 绑定 handler 的 self
handler.handler = functools.partial(handler.handler, metadata.star_cls)
# llm_tool
# 绑定 llm_tool handler
for func_tool in llm_tools.func_list:
if func_tool.handler.__module__ == metadata.module_path:
func_tool.handler_module_path = metadata.module_path
@@ -253,9 +257,29 @@ class PluginManager:
star_registry.append(metadata)
logger.debug(f"插件 {root_dir_name} 载入成功。")
# 禁用/启用插件
if metadata.module_path in inactivated_plugins:
metadata.activated = False
# 检查并且植入自定义的权限过滤器(alter_cmd)
for handler in star_handlers_registry.get_handlers_by_module_name(metadata.module_path):
if metadata.name in alter_cmd and handler.handler_name in alter_cmd[metadata.name]:
# 注入权限过滤器
cmd_type = alter_cmd[metadata.name][handler.handler_name].get("permission", "member")
found_permission_filter = False
for filter_ in handler.event_filters:
if isinstance(filter_, PermissionTypeFilter):
if cmd_type == "admin":
filter_.permission_type = PermissionType.ADMIN
else:
filter_.permission_type = PermissionType.MEMBER
found_permission_filter = True
break
if not found_permission_filter:
handler.event_filters.append(PermissionTypeFilter(PermissionType.ADMIN if cmd_type == "admin" else PermissionType.MEMBER))
logger.debug(f"插入权限过滤器 {cmd_type}{metadata.name}{handler.handler_name} 方法。")
# 执行 initialize() 方法
if hasattr(metadata.star_cls, "initialize"):
await metadata.star_cls.initialize()
+111 -22
View File
@@ -6,10 +6,13 @@ import astrbot.api.event.filter as filter
from astrbot.api.event import AstrMessageEvent, MessageEventResult
from astrbot.api import sp
from astrbot.api.provider import Personality, ProviderRequest, LLMResponse
from astrbot.api.platform import MessageType
from astrbot.core.utils.io import download_dashboard, get_dashboard_version
from astrbot.core.star.star_handler import star_handlers_registry, StarHandlerMetadata
from astrbot.core.star.star import star_map
from astrbot.core.star.filter.command import CommandFilter
from astrbot.core.star.filter.command_group import CommandGroupFilter
from astrbot.core.star.filter.permission import PermissionTypeFilter
from astrbot.core.config.default import VERSION
from collections import defaultdict
from .long_term_memory import LongTermMemory
from astrbot.core import logger
@@ -41,6 +44,7 @@ class Main(star.Star):
@filter.command("help")
async def help(self, event: AstrMessageEvent):
'''查看帮助'''
notice = ""
try:
notice = await self._query_astrbot_notice()
@@ -50,31 +54,32 @@ class Main(star.Star):
dashboard_version = await get_dashboard_version()
msg = f"""AstrBot v{VERSION}(WebUI: {dashboard_version})
已注册的 AstrBot 内置指令:
AstrBot 指令:
[System]
/plugin: 查看注册的插件、插件帮助
/t2i: 开启/关闭文本转图片模式
/sid: 获取当前会话 ID
/plugin: 查看插件、插件帮助
/t2i: 开文本转图片
/sid: 获取会话 ID
/op <admin_id>: 授权管理员
/deop <admin_id>: 取消管理员
/wl <sid>: 添加会话白名单
/dwl <sid>: 删除会话白名单
/wl <sid>: 添加白名单
/dwl <sid>: 删除白名单
/dashboard_update: 更新管理面板
/alter_cmd: 设置指令权限
[大模型]
/provider: 查看、切换大模型提供商
/model: 查看、切换提供商模型列表
/key: 查看、切换 API Key
/provider: 大模型提供商
/model: 模型列表
/key: API Key
/reset: 重置 LLM 会话
/history: 获取会话历史记录
/persona: 情境人格设置
/tool ls: 查看、激活、停用当前注册的函数工具
/history: 对话记录
/persona: 人格情景
/tool ls: 函数工具
[其他]
/set <变量名> <值>: 为当前会话定义一个变量。适用于 Dify 工作流输入。
/unset <变量名>: 删除当前会话的变量。
/set <变量名> <值>: 为会话定义一个变量。适用于 Dify 工作流输入。
/unset <变量名>: 删除会话的变量。
提示:如要查看插件指令,请输入 /plugin 查看具体信息。
提示:如要查看插件指令,请输入 /plugin 查看具体信息。
{notice}"""
event.set_result(MessageEventResult().message(msg).use_t2i(False))
@@ -124,7 +129,7 @@ class Main(star.Star):
if plugin_list_info.strip() == "":
plugin_list_info = "没有加载任何插件。"
plugin_list_info += "\n使用 /plugin <插件名> 查看插件帮助。\n使用 /plugin on/off <插件名> 启用或者禁用插件。"
plugin_list_info += "\n使用 /plugin <插件名> 查看插件帮助和加载的指令\n使用 /plugin on/off <插件名> 启用或者禁用插件。"
event.set_result(MessageEventResult().message(f"{plugin_list_info}").use_t2i(False))
else:
if oper1 == "off":
@@ -147,10 +152,34 @@ class Main(star.Star):
plugin = self.context.get_registered_star(oper1)
if plugin is None:
event.set_result(MessageEventResult().message("未找到此插件。"))
else:
help_msg = plugin.star_cls.__doc__ if plugin.star_cls.__doc__ else "该插件未提供帮助信息"
ret = f"插件 {oper1} 帮助信息:\n" + help_msg
event.set_result(MessageEventResult().message(ret).use_t2i(False))
return
help_msg = plugin.star_cls.__doc__ if plugin.star_cls.__doc__ else "帮助信息: 未提供"
help_msg += f"\n\n作者: {plugin.author}\n版本: {plugin.version}"
command_handlers = []
command_names = []
for handler in star_handlers_registry:
assert isinstance(handler, StarHandlerMetadata)
if handler.handler_module_path != plugin.module_path:
continue
for filter_ in handler.event_filters:
if isinstance(filter_, CommandFilter):
command_handlers.append(handler)
command_names.append(filter_.command_name)
break
elif isinstance(filter_, CommandGroupFilter):
command_handlers.append(handler)
command_names.append(filter_.group_name)
if len(command_handlers) > 0:
help_msg += "\n\n指令列表:\n"
for i in range(len(command_handlers)):
help_msg += f"{command_names[i]}: {command_handlers[i].desc}\n"
help_msg += "\nTip: 指令的触发需要添加唤醒前缀,默认为 /。"
ret = f"插件 {oper1} 帮助信息:\n" + help_msg
ret += "更多帮助信息请查看插件仓库 README。"
event.set_result(MessageEventResult().message(ret).use_t2i(False))
@filter.command("t2i")
async def t2i(self, event: AstrMessageEvent):
@@ -540,6 +569,66 @@ UID: {user_id} 此 ID 可用于设置管理员。/op <UID> 授权管理员, /deo
await self.ltm.after_req_llm(event)
except BaseException as e:
logger.error(f"ltm: {e}")
@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("alter_cmd")
async def alter_cmd(self, event: AstrMessageEvent):
# token = event.message_str.split(" ")
token = self.parse_commands(event.message_str)
if token.len < 2:
yield event.plain_result("可设置所有其他指令是否需要管理员权限。\n格式: /alter_cmd <cmd_name> <admin/member>\n 例如: /alter_cmd provider admin 将 provider 设置为管理员指令")
return
cmd_name = token.get(1)
cmd_type = token.get(2)
if cmd_type not in ["admin", "member"]:
yield event.plain_result("指令类型错误,可选类型有 admin, member")
return
# 查找指令
found_command = None
for handler in star_handlers_registry:
assert isinstance(handler, StarHandlerMetadata)
for filter_ in handler.event_filters:
if isinstance(filter_, CommandFilter):
if filter_.command_name == cmd_name:
found_command = handler
break
elif isinstance(filter_, CommandGroupFilter):
if cmd_name == filter_.group_name:
found_command = handler
break
if not found_command:
yield event.plain_result("未找到该指令")
return
found_plugin = star_map[found_command.handler_module_path]
alter_cmd_cfg = sp.get("alter_cmd", {})
plugin_ = alter_cmd_cfg.get(found_plugin.name, {})
cfg = plugin_.get(found_command.handler_name, {})
cfg["permission"] = cmd_type
plugin_[found_command.handler_name] = cfg
alter_cmd_cfg[found_plugin.name] = plugin_
sp.put("alter_cmd", alter_cmd_cfg)
# 注入权限过滤器
found_permission_filter = False
for filter_ in found_command.event_filters:
if isinstance(filter_, PermissionTypeFilter):
if cmd_type == "admin":
filter_.permission_type = filter.PermissionType.ADMIN
else:
filter_.permission_type = filter.PermissionType.MEMBER
found_permission_filter = True
break
if not found_permission_filter:
found_command.event_filters.insert(0, PermissionTypeFilter(filter.PermissionType.ADMIN if cmd_type == "admin" else filter.PermissionType.MEMBER))
yield event.plain_result(f"已将 {cmd_name} 设置为 {cmd_type} 指令")
# @filter.command_group("kdb")
# def kdb(self):