diff --git a/astrbot/core/star/register/star_handler.py b/astrbot/core/star/register/star_handler.py index 0d31d0814..b4b2ee918 100644 --- a/astrbot/core/star/register/star_handler.py +++ b/astrbot/core/star/register/star_handler.py @@ -17,7 +17,7 @@ def get_handler_full_name(awaitable: Awaitable) -> str: '''获取 Handler 的全名''' return f"{awaitable.__module__}_{awaitable.__name__}" -def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add = False) -> StarHandlerMetadata: +def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add = False, **kwargs) -> StarHandlerMetadata: '''获取 Handler 或者创建一个新的 Handler''' handler_full_name = get_handler_full_name(handler) md = star_handlers_registry.get_handler_by_full_name(handler_full_name) @@ -30,14 +30,17 @@ def get_handler_or_create(handler: Awaitable, event_type: EventType, dont_add = handler_name=handler.__name__, handler_module_path=handler.__module__, handler=handler, - event_filters=[] + event_filters=[], ) + if handler.__doc__: + md.desc = handler.__doc__.strip() if not dont_add: star_handlers_registry.append(md) return md def register_command(command_name: str = None, *args): - '''注册一个 Command''' + '''注册一个 Command. + ''' new_command = None add_to_event_filters = False @@ -61,8 +64,9 @@ def register_command(command_name: str = None, *args): return decorator -def register_command_group(command_group_name: str = None, *args): - '''注册一个 CommandGroup''' +def register_command_group(command_group_name: str = None, desc: str = "", *args): + '''注册一个 CommandGroup + ''' new_group = None add_to_event_filters = False @@ -111,10 +115,10 @@ def register_platform_adapter_type(platform_adapter_type: PlatformAdapterType): return decorator -def register_regex(regex: str): +def register_regex(regex: str, desc: str = ""): '''注册一个 Regex''' def decorator(awaitable): - handler_md = get_handler_or_create(awaitable, EventType.AdapterMessageEvent) + handler_md = get_handler_or_create(awaitable, EventType.AdapterMessageEvent, desc=desc) handler_md.event_filters.append(RegexFilter(regex)) return awaitable diff --git a/astrbot/core/star/star_manager.py b/astrbot/core/star/star_manager.py index 1854c5490..a7a694fce 100644 --- a/astrbot/core/star/star_manager.py +++ b/astrbot/core/star/star_manager.py @@ -19,6 +19,8 @@ from .star import star_registry, star_map from .star_handler import star_handlers_registry from astrbot.core.provider.register import llm_tools +from .filter.permission import PermissionTypeFilter, PermissionType + class PluginManager: def __init__( self, @@ -159,6 +161,8 @@ class PluginManager: inactivated_plugins: list = sp.get("inactivated_plugins", []) inactivated_llm_tools: list = sp.get("inactivated_llm_tools", []) + alter_cmd = sp.get("alter_cmd", {}) + # 导入插件模块,并尝试实例化插件类 for plugin_module in plugin_modules: try: @@ -213,12 +217,12 @@ class PluginManager: metadata.root_dir_name = root_dir_name metadata.reserved = reserved + # 绑定 handler related_handlers = star_handlers_registry.get_handlers_by_module_name(metadata.module_path) for handler in related_handlers: logger.debug(f"bind handler {handler.handler_name} to {metadata.name}") - # handler.handler.__self__ = star_metadata.star_cls # 绑定 handler 的 self handler.handler = functools.partial(handler.handler, metadata.star_cls) - # llm_tool + # 绑定 llm_tool handler for func_tool in llm_tools.func_list: if func_tool.handler.__module__ == metadata.module_path: func_tool.handler_module_path = metadata.module_path @@ -253,9 +257,29 @@ class PluginManager: star_registry.append(metadata) logger.debug(f"插件 {root_dir_name} 载入成功。") + # 禁用/启用插件 if metadata.module_path in inactivated_plugins: metadata.activated = False + # 检查并且植入自定义的权限过滤器(alter_cmd) + for handler in star_handlers_registry.get_handlers_by_module_name(metadata.module_path): + if metadata.name in alter_cmd and handler.handler_name in alter_cmd[metadata.name]: + # 注入权限过滤器 + cmd_type = alter_cmd[metadata.name][handler.handler_name].get("permission", "member") + found_permission_filter = False + for filter_ in handler.event_filters: + if isinstance(filter_, PermissionTypeFilter): + if cmd_type == "admin": + filter_.permission_type = PermissionType.ADMIN + else: + filter_.permission_type = PermissionType.MEMBER + found_permission_filter = True + break + if not found_permission_filter: + handler.event_filters.append(PermissionTypeFilter(PermissionType.ADMIN if cmd_type == "admin" else PermissionType.MEMBER)) + + logger.debug(f"插入权限过滤器 {cmd_type} 到 {metadata.name} 的 {handler.handler_name} 方法。") + # 执行 initialize() 方法 if hasattr(metadata.star_cls, "initialize"): await metadata.star_cls.initialize() diff --git a/packages/astrbot/main.py b/packages/astrbot/main.py index d70010ee9..95ceae5fa 100644 --- a/packages/astrbot/main.py +++ b/packages/astrbot/main.py @@ -6,10 +6,13 @@ import astrbot.api.event.filter as filter from astrbot.api.event import AstrMessageEvent, MessageEventResult from astrbot.api import sp from astrbot.api.provider import Personality, ProviderRequest, LLMResponse -from astrbot.api.platform import MessageType from astrbot.core.utils.io import download_dashboard, get_dashboard_version +from astrbot.core.star.star_handler import star_handlers_registry, StarHandlerMetadata +from astrbot.core.star.star import star_map +from astrbot.core.star.filter.command import CommandFilter +from astrbot.core.star.filter.command_group import CommandGroupFilter +from astrbot.core.star.filter.permission import PermissionTypeFilter from astrbot.core.config.default import VERSION -from collections import defaultdict from .long_term_memory import LongTermMemory from astrbot.core import logger @@ -41,6 +44,7 @@ class Main(star.Star): @filter.command("help") async def help(self, event: AstrMessageEvent): + '''查看帮助''' notice = "" try: notice = await self._query_astrbot_notice() @@ -50,31 +54,32 @@ class Main(star.Star): dashboard_version = await get_dashboard_version() msg = f"""AstrBot v{VERSION}(WebUI: {dashboard_version}) -已注册的 AstrBot 内置指令: +AstrBot 指令: [System] -/plugin: 查看注册的插件、插件帮助 -/t2i: 开启/关闭文本转图片模式 -/sid: 获取当前会话的 ID +/plugin: 查看插件、插件帮助 +/t2i: 开关文本转图片 +/sid: 获取会话 ID /op : 授权管理员 /deop : 取消管理员 -/wl : 添加会话白名单 -/dwl : 删除会话白名单 +/wl : 添加白名单 +/dwl : 删除白名单 /dashboard_update: 更新管理面板 +/alter_cmd: 设置指令权限 [大模型] -/provider: 查看、切换大模型提供商 -/model: 查看、切换提供商模型列表 -/key: 查看、切换 API Key +/provider: 大模型提供商 +/model: 模型列表 +/key: API Key /reset: 重置 LLM 会话 -/history: 获取会话历史记录 -/persona: 情境人格设置 -/tool ls: 查看、激活、停用当前注册的函数工具 +/history: 对话记录 +/persona: 人格情景 +/tool ls: 函数工具 [其他] -/set <变量名> <值>: 为当前会话定义一个变量。适用于 Dify 工作流输入。 -/unset <变量名>: 删除当前会话的变量。 +/set <变量名> <值>: 为会话定义一个变量。适用于 Dify 工作流输入。 +/unset <变量名>: 删除会话的变量。 -提示:如果要查看插件指令,请输入 /plugin 查看具体信息。 +提示:如要查看插件指令,请输入 /plugin 查看具体信息。 {notice}""" event.set_result(MessageEventResult().message(msg).use_t2i(False)) @@ -124,7 +129,7 @@ class Main(star.Star): if plugin_list_info.strip() == "": plugin_list_info = "没有加载任何插件。" - plugin_list_info += "\n使用 /plugin <插件名> 查看插件帮助。\n使用 /plugin on/off <插件名> 启用或者禁用插件。" + plugin_list_info += "\n使用 /plugin <插件名> 查看插件帮助和加载的指令。\n使用 /plugin on/off <插件名> 启用或者禁用插件。" event.set_result(MessageEventResult().message(f"{plugin_list_info}").use_t2i(False)) else: if oper1 == "off": @@ -147,10 +152,34 @@ class Main(star.Star): plugin = self.context.get_registered_star(oper1) if plugin is None: event.set_result(MessageEventResult().message("未找到此插件。")) - else: - help_msg = plugin.star_cls.__doc__ if plugin.star_cls.__doc__ else "该插件未提供帮助信息" - ret = f"插件 {oper1} 帮助信息:\n" + help_msg - event.set_result(MessageEventResult().message(ret).use_t2i(False)) + return + help_msg = plugin.star_cls.__doc__ if plugin.star_cls.__doc__ else "帮助信息: 未提供" + help_msg += f"\n\n作者: {plugin.author}\n版本: {plugin.version}" + command_handlers = [] + command_names = [] + for handler in star_handlers_registry: + assert isinstance(handler, StarHandlerMetadata) + if handler.handler_module_path != plugin.module_path: + continue + for filter_ in handler.event_filters: + if isinstance(filter_, CommandFilter): + command_handlers.append(handler) + command_names.append(filter_.command_name) + break + elif isinstance(filter_, CommandGroupFilter): + command_handlers.append(handler) + command_names.append(filter_.group_name) + + if len(command_handlers) > 0: + help_msg += "\n\n指令列表:\n" + for i in range(len(command_handlers)): + help_msg += f"{command_names[i]}: {command_handlers[i].desc}\n" + + help_msg += "\nTip: 指令的触发需要添加唤醒前缀,默认为 /。" + + ret = f"插件 {oper1} 帮助信息:\n" + help_msg + ret += "更多帮助信息请查看插件仓库 README。" + event.set_result(MessageEventResult().message(ret).use_t2i(False)) @filter.command("t2i") async def t2i(self, event: AstrMessageEvent): @@ -540,6 +569,66 @@ UID: {user_id} 此 ID 可用于设置管理员。/op 授权管理员, /deo await self.ltm.after_req_llm(event) except BaseException as e: logger.error(f"ltm: {e}") + + @filter.permission_type(filter.PermissionType.ADMIN) + @filter.command("alter_cmd") + async def alter_cmd(self, event: AstrMessageEvent): + # token = event.message_str.split(" ") + token = self.parse_commands(event.message_str) + if token.len < 2: + yield event.plain_result("可设置所有其他指令是否需要管理员权限。\n格式: /alter_cmd \n 例如: /alter_cmd provider admin 将 provider 设置为管理员指令") + return + + cmd_name = token.get(1) + cmd_type = token.get(2) + + if cmd_type not in ["admin", "member"]: + yield event.plain_result("指令类型错误,可选类型有 admin, member") + return + + # 查找指令 + found_command = None + for handler in star_handlers_registry: + assert isinstance(handler, StarHandlerMetadata) + for filter_ in handler.event_filters: + if isinstance(filter_, CommandFilter): + if filter_.command_name == cmd_name: + found_command = handler + break + elif isinstance(filter_, CommandGroupFilter): + if cmd_name == filter_.group_name: + found_command = handler + break + + if not found_command: + yield event.plain_result("未找到该指令") + return + + found_plugin = star_map[found_command.handler_module_path] + + alter_cmd_cfg = sp.get("alter_cmd", {}) + plugin_ = alter_cmd_cfg.get(found_plugin.name, {}) + cfg = plugin_.get(found_command.handler_name, {}) + cfg["permission"] = cmd_type + plugin_[found_command.handler_name] = cfg + alter_cmd_cfg[found_plugin.name] = plugin_ + + sp.put("alter_cmd", alter_cmd_cfg) + + # 注入权限过滤器 + found_permission_filter = False + for filter_ in found_command.event_filters: + if isinstance(filter_, PermissionTypeFilter): + if cmd_type == "admin": + filter_.permission_type = filter.PermissionType.ADMIN + else: + filter_.permission_type = filter.PermissionType.MEMBER + found_permission_filter = True + break + if not found_permission_filter: + found_command.event_filters.insert(0, PermissionTypeFilter(filter.PermissionType.ADMIN if cmd_type == "admin" else filter.PermissionType.MEMBER)) + + yield event.plain_result(f"已将 {cmd_name} 设置为 {cmd_type} 指令") # @filter.command_group("kdb") # def kdb(self):