4e0b5063c6
* feat(backend): add permission update api * feat(useCommandActions): add updatePermission action and translations * feat(dashboard): implement permission editing ui * style: fix import sorting in command.py * refactor(backend): extract permission update logic to service * feat(i18n): add success and failure messages for command updates --------- Co-authored-by: Soulter <905617992@qq.com>
543 lines
19 KiB
Python
543 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from astrbot.api import sp
|
|
from astrbot.core import db_helper, logger
|
|
from astrbot.core.db.po import CommandConfig
|
|
from astrbot.core.star.filter.command import CommandFilter
|
|
from astrbot.core.star.filter.command_group import CommandGroupFilter
|
|
from astrbot.core.star.filter.permission import PermissionType, PermissionTypeFilter
|
|
from astrbot.core.star.star import star_map
|
|
from astrbot.core.star.star_handler import StarHandlerMetadata, star_handlers_registry
|
|
|
|
|
|
@dataclass
|
|
class CommandDescriptor:
|
|
handler: StarHandlerMetadata = field(repr=False)
|
|
filter_ref: CommandFilter | CommandGroupFilter | None = field(
|
|
default=None,
|
|
repr=False,
|
|
)
|
|
handler_full_name: str = ""
|
|
handler_name: str = ""
|
|
plugin_name: str = ""
|
|
plugin_display_name: str | None = None
|
|
module_path: str = ""
|
|
description: str = ""
|
|
command_type: str = "command" # "command" | "group" | "sub_command"
|
|
raw_command_name: str | None = None
|
|
current_fragment: str | None = None
|
|
parent_signature: str = ""
|
|
parent_group_handler: str = ""
|
|
original_command: str | None = None
|
|
effective_command: str | None = None
|
|
aliases: list[str] = field(default_factory=list)
|
|
permission: str = "everyone"
|
|
enabled: bool = True
|
|
is_group: bool = False
|
|
is_sub_command: bool = False
|
|
reserved: bool = False
|
|
config: CommandConfig | None = None
|
|
has_conflict: bool = False
|
|
sub_commands: list[CommandDescriptor] = field(default_factory=list)
|
|
|
|
|
|
async def sync_command_configs() -> None:
|
|
"""同步指令配置,清理过期配置。"""
|
|
descriptors = _collect_descriptors(include_sub_commands=False)
|
|
config_records = await db_helper.get_command_configs()
|
|
config_map = _bind_configs_to_descriptors(descriptors, config_records)
|
|
live_handlers = {desc.handler_full_name for desc in descriptors}
|
|
|
|
stale_configs = [key for key in config_map if key not in live_handlers]
|
|
if stale_configs:
|
|
await db_helper.delete_command_configs(stale_configs)
|
|
|
|
|
|
async def toggle_command(handler_full_name: str, enabled: bool) -> CommandDescriptor:
|
|
descriptor = _build_descriptor_by_full_name(handler_full_name)
|
|
if not descriptor:
|
|
raise ValueError("指定的处理函数不存在或不是指令。")
|
|
|
|
existing_cfg = await db_helper.get_command_config(handler_full_name)
|
|
config = await db_helper.upsert_command_config(
|
|
handler_full_name=handler_full_name,
|
|
plugin_name=descriptor.plugin_name or "",
|
|
module_path=descriptor.module_path,
|
|
original_command=descriptor.original_command or descriptor.handler_name,
|
|
resolved_command=(
|
|
existing_cfg.resolved_command
|
|
if existing_cfg
|
|
else descriptor.current_fragment
|
|
),
|
|
enabled=enabled,
|
|
keep_original_alias=False,
|
|
conflict_key=existing_cfg.conflict_key
|
|
if existing_cfg and existing_cfg.conflict_key
|
|
else descriptor.original_command,
|
|
resolution_strategy=existing_cfg.resolution_strategy if existing_cfg else None,
|
|
note=existing_cfg.note if existing_cfg else None,
|
|
extra_data=existing_cfg.extra_data if existing_cfg else None,
|
|
auto_managed=False,
|
|
)
|
|
_bind_descriptor_with_config(descriptor, config)
|
|
await sync_command_configs()
|
|
return descriptor
|
|
|
|
|
|
async def rename_command(
|
|
handler_full_name: str,
|
|
new_fragment: str,
|
|
aliases: list[str] | None = None,
|
|
) -> CommandDescriptor:
|
|
descriptor = _build_descriptor_by_full_name(handler_full_name)
|
|
if not descriptor:
|
|
raise ValueError("指定的处理函数不存在或不是指令。")
|
|
|
|
new_fragment = new_fragment.strip()
|
|
if not new_fragment:
|
|
raise ValueError("指令名不能为空。")
|
|
|
|
# 校验主指令名
|
|
candidate_full = _compose_command(descriptor.parent_signature, new_fragment)
|
|
if _is_command_in_use(handler_full_name, candidate_full):
|
|
raise ValueError(f"指令名 '{candidate_full}' 已被其他指令占用。")
|
|
|
|
# 校验别名
|
|
if aliases:
|
|
for alias in aliases:
|
|
alias = alias.strip()
|
|
if not alias:
|
|
continue
|
|
alias_full = _compose_command(descriptor.parent_signature, alias)
|
|
if _is_command_in_use(handler_full_name, alias_full):
|
|
raise ValueError(f"别名 '{alias_full}' 已被其他指令占用。")
|
|
|
|
existing_cfg = await db_helper.get_command_config(handler_full_name)
|
|
merged_extra = dict(existing_cfg.extra_data or {}) if existing_cfg else {}
|
|
merged_extra["resolved_aliases"] = aliases or []
|
|
|
|
config = await db_helper.upsert_command_config(
|
|
handler_full_name=handler_full_name,
|
|
plugin_name=descriptor.plugin_name or "",
|
|
module_path=descriptor.module_path,
|
|
original_command=descriptor.original_command or descriptor.handler_name,
|
|
resolved_command=new_fragment,
|
|
enabled=True if descriptor.enabled else False,
|
|
keep_original_alias=False,
|
|
conflict_key=descriptor.original_command,
|
|
resolution_strategy="manual_rename",
|
|
note=None,
|
|
extra_data=merged_extra,
|
|
auto_managed=False,
|
|
)
|
|
_bind_descriptor_with_config(descriptor, config)
|
|
|
|
await sync_command_configs()
|
|
return descriptor
|
|
|
|
|
|
async def update_command_permission(
|
|
handler_full_name: str,
|
|
permission_type: str,
|
|
) -> CommandDescriptor:
|
|
descriptor = _build_descriptor_by_full_name(handler_full_name)
|
|
if not descriptor:
|
|
raise ValueError("指定的处理函数不存在或不是指令。")
|
|
|
|
if permission_type not in ["admin", "member"]:
|
|
raise ValueError("权限类型必须为 admin 或 member。")
|
|
|
|
handler = descriptor.handler
|
|
found_plugin = star_map.get(handler.handler_module_path)
|
|
if not found_plugin:
|
|
raise ValueError("未找到指令所属插件")
|
|
|
|
# 1. Update Persistent Config (alter_cmd)
|
|
alter_cmd_cfg = await sp.global_get("alter_cmd", {})
|
|
plugin_ = alter_cmd_cfg.get(found_plugin.name, {})
|
|
cfg = plugin_.get(handler.handler_name, {})
|
|
cfg["permission"] = permission_type
|
|
plugin_[handler.handler_name] = cfg
|
|
alter_cmd_cfg[found_plugin.name] = plugin_
|
|
|
|
await sp.global_put("alter_cmd", alter_cmd_cfg)
|
|
|
|
# 2. Update Runtime Filter
|
|
found_permission_filter = False
|
|
target_perm_type = (
|
|
PermissionType.ADMIN if permission_type == "admin" else PermissionType.MEMBER
|
|
)
|
|
|
|
for filter_ in handler.event_filters:
|
|
if isinstance(filter_, PermissionTypeFilter):
|
|
filter_.permission_type = target_perm_type
|
|
found_permission_filter = True
|
|
break
|
|
|
|
if not found_permission_filter:
|
|
handler.event_filters.insert(0, PermissionTypeFilter(target_perm_type))
|
|
|
|
# Re-build descriptor to reflect changes
|
|
return _build_descriptor(handler) or descriptor
|
|
|
|
|
|
async def list_commands() -> list[dict[str, Any]]:
|
|
descriptors = _collect_descriptors(include_sub_commands=True)
|
|
config_records = await db_helper.get_command_configs()
|
|
_bind_configs_to_descriptors(descriptors, config_records)
|
|
|
|
conflict_groups = _group_conflicts(descriptors)
|
|
conflict_handler_names: set[str] = {
|
|
d.handler_full_name for group in conflict_groups.values() for d in group
|
|
}
|
|
|
|
# 分类,设置冲突标志,将子指令挂载到父指令组
|
|
group_map: dict[str, CommandDescriptor] = {}
|
|
sub_commands: list[CommandDescriptor] = []
|
|
root_commands: list[CommandDescriptor] = []
|
|
|
|
for desc in descriptors:
|
|
desc.has_conflict = desc.handler_full_name in conflict_handler_names
|
|
if desc.is_group:
|
|
group_map[desc.handler_full_name] = desc
|
|
elif desc.is_sub_command:
|
|
sub_commands.append(desc)
|
|
else:
|
|
root_commands.append(desc)
|
|
|
|
for sub in sub_commands:
|
|
if sub.parent_group_handler and sub.parent_group_handler in group_map:
|
|
group_map[sub.parent_group_handler].sub_commands.append(sub)
|
|
else:
|
|
root_commands.append(sub)
|
|
|
|
# 指令组 + 普通指令,按 effective_command 字母排序
|
|
all_commands = list(group_map.values()) + root_commands
|
|
all_commands.sort(key=lambda d: (d.effective_command or "").lower())
|
|
|
|
result = [_descriptor_to_dict(desc) for desc in all_commands]
|
|
return result
|
|
|
|
|
|
async def list_command_conflicts() -> list[dict[str, Any]]:
|
|
"""列出所有冲突的指令组。"""
|
|
descriptors = _collect_descriptors(include_sub_commands=False)
|
|
config_records = await db_helper.get_command_configs()
|
|
_bind_configs_to_descriptors(descriptors, config_records)
|
|
|
|
conflict_groups = _group_conflicts(descriptors)
|
|
details = [
|
|
{
|
|
"conflict_key": key,
|
|
"handlers": [
|
|
{
|
|
"handler_full_name": item.handler_full_name,
|
|
"plugin": item.plugin_name,
|
|
"current_name": item.effective_command,
|
|
}
|
|
for item in group
|
|
],
|
|
}
|
|
for key, group in conflict_groups.items()
|
|
]
|
|
return details
|
|
|
|
|
|
# Internal helpers ----------------------------------------------------------
|
|
|
|
|
|
def _collect_descriptors(include_sub_commands: bool) -> list[CommandDescriptor]:
|
|
"""收集指令,按需包含子指令。"""
|
|
descriptors: list[CommandDescriptor] = []
|
|
for handler in star_handlers_registry:
|
|
try:
|
|
desc = _build_descriptor(handler)
|
|
if not desc:
|
|
continue
|
|
if not include_sub_commands and desc.is_sub_command:
|
|
continue
|
|
descriptors.append(desc)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"解析指令处理函数 {handler.handler_full_name} 失败,跳过该指令。原因: {e!s}"
|
|
)
|
|
continue
|
|
return descriptors
|
|
|
|
|
|
def _build_descriptor(handler: StarHandlerMetadata) -> CommandDescriptor | None:
|
|
filter_ref = _locate_primary_filter(handler)
|
|
if filter_ref is None:
|
|
return None
|
|
|
|
plugin_meta = star_map.get(handler.handler_module_path)
|
|
plugin_name = (
|
|
plugin_meta.name if plugin_meta else None
|
|
) or handler.handler_module_path
|
|
plugin_display = plugin_meta.display_name if plugin_meta else None
|
|
|
|
is_sub_command = bool(handler.extras_configs.get("sub_command"))
|
|
parent_group_handler = ""
|
|
|
|
if isinstance(filter_ref, CommandFilter):
|
|
raw_fragment = getattr(
|
|
filter_ref, "_original_command_name", filter_ref.command_name
|
|
)
|
|
current_fragment = filter_ref.command_name
|
|
parent_signature = (filter_ref.parent_command_names or [""])[0].strip()
|
|
# 如果是子指令,尝试找到父指令组的 handler_full_name
|
|
if is_sub_command and parent_signature:
|
|
parent_group_handler = _find_parent_group_handler(
|
|
handler.handler_module_path, parent_signature
|
|
)
|
|
else:
|
|
raw_fragment = getattr(
|
|
filter_ref, "_original_group_name", filter_ref.group_name
|
|
)
|
|
current_fragment = filter_ref.group_name
|
|
parent_signature = _resolve_group_parent_signature(filter_ref)
|
|
|
|
original_command = _compose_command(parent_signature, raw_fragment)
|
|
effective_command = _compose_command(parent_signature, current_fragment)
|
|
|
|
# 确定 command_type
|
|
if isinstance(filter_ref, CommandGroupFilter):
|
|
command_type = "group"
|
|
elif is_sub_command:
|
|
command_type = "sub_command"
|
|
else:
|
|
command_type = "command"
|
|
|
|
descriptor = CommandDescriptor(
|
|
handler=handler,
|
|
filter_ref=filter_ref,
|
|
handler_full_name=handler.handler_full_name,
|
|
handler_name=handler.handler_name,
|
|
plugin_name=plugin_name,
|
|
plugin_display_name=plugin_display,
|
|
module_path=handler.handler_module_path,
|
|
description=handler.desc or "",
|
|
command_type=command_type,
|
|
raw_command_name=raw_fragment,
|
|
current_fragment=current_fragment,
|
|
parent_signature=parent_signature,
|
|
parent_group_handler=parent_group_handler,
|
|
original_command=original_command,
|
|
effective_command=effective_command,
|
|
aliases=sorted(getattr(filter_ref, "alias", set())),
|
|
permission=_determine_permission(handler),
|
|
enabled=handler.enabled,
|
|
is_group=isinstance(filter_ref, CommandGroupFilter),
|
|
is_sub_command=is_sub_command,
|
|
reserved=plugin_meta.reserved if plugin_meta else False,
|
|
)
|
|
return descriptor
|
|
|
|
|
|
def _build_descriptor_by_full_name(full_name: str) -> CommandDescriptor | None:
|
|
handler = star_handlers_registry.get_handler_by_full_name(full_name)
|
|
if not handler:
|
|
return None
|
|
return _build_descriptor(handler)
|
|
|
|
|
|
def _locate_primary_filter(
|
|
handler: StarHandlerMetadata,
|
|
) -> CommandFilter | CommandGroupFilter | None:
|
|
for filter_ref in handler.event_filters:
|
|
if isinstance(filter_ref, CommandFilter | CommandGroupFilter):
|
|
return filter_ref
|
|
return None
|
|
|
|
|
|
def _determine_permission(handler: StarHandlerMetadata) -> str:
|
|
for filter_ref in handler.event_filters:
|
|
if isinstance(filter_ref, PermissionTypeFilter):
|
|
return (
|
|
"admin"
|
|
if filter_ref.permission_type == PermissionType.ADMIN
|
|
else "member"
|
|
)
|
|
return "everyone"
|
|
|
|
|
|
def _resolve_group_parent_signature(group_filter: CommandGroupFilter) -> str:
|
|
signatures: list[str] = []
|
|
parent = group_filter.parent_group
|
|
while parent:
|
|
signatures.append(getattr(parent, "_original_group_name", parent.group_name))
|
|
parent = parent.parent_group
|
|
return " ".join(reversed(signatures)).strip()
|
|
|
|
|
|
def _find_parent_group_handler(module_path: str, parent_signature: str) -> str:
|
|
"""根据模块路径和父级签名,找到对应的指令组 handler_full_name。"""
|
|
parent_sig_normalized = parent_signature.strip()
|
|
for handler in star_handlers_registry:
|
|
if handler.handler_module_path != module_path:
|
|
continue
|
|
filter_ref = _locate_primary_filter(handler)
|
|
if not isinstance(filter_ref, CommandGroupFilter):
|
|
continue
|
|
# 检查该指令组的完整指令名是否匹配 parent_signature
|
|
group_names = filter_ref.get_complete_command_names()
|
|
if parent_sig_normalized in group_names:
|
|
return handler.handler_full_name
|
|
return ""
|
|
|
|
|
|
def _compose_command(parent_signature: str, fragment: str | None) -> str:
|
|
fragment = (fragment or "").strip()
|
|
parent_signature = parent_signature.strip()
|
|
if not parent_signature:
|
|
return fragment
|
|
if not fragment:
|
|
return parent_signature
|
|
return f"{parent_signature} {fragment}"
|
|
|
|
|
|
def _bind_descriptor_with_config(
|
|
descriptor: CommandDescriptor,
|
|
config: CommandConfig,
|
|
) -> None:
|
|
_apply_config_to_descriptor(descriptor, config)
|
|
_apply_config_to_runtime(descriptor, config)
|
|
|
|
|
|
def _apply_config_to_descriptor(
|
|
descriptor: CommandDescriptor,
|
|
config: CommandConfig,
|
|
) -> None:
|
|
descriptor.config = config
|
|
descriptor.enabled = config.enabled
|
|
|
|
if config.original_command:
|
|
descriptor.original_command = config.original_command
|
|
|
|
new_fragment = config.resolved_command or descriptor.current_fragment
|
|
descriptor.current_fragment = new_fragment
|
|
descriptor.effective_command = _compose_command(
|
|
descriptor.parent_signature,
|
|
new_fragment,
|
|
)
|
|
|
|
extra = config.extra_data or {}
|
|
resolved_aliases = extra.get("resolved_aliases")
|
|
if isinstance(resolved_aliases, list):
|
|
descriptor.aliases = [str(x) for x in resolved_aliases if str(x).strip()]
|
|
|
|
|
|
def _apply_config_to_runtime(
|
|
descriptor: CommandDescriptor,
|
|
config: CommandConfig,
|
|
) -> None:
|
|
descriptor.handler.enabled = config.enabled
|
|
if descriptor.filter_ref:
|
|
if descriptor.current_fragment:
|
|
_set_filter_fragment(descriptor.filter_ref, descriptor.current_fragment)
|
|
extra = config.extra_data or {}
|
|
resolved_aliases = extra.get("resolved_aliases")
|
|
if isinstance(resolved_aliases, list):
|
|
_set_filter_aliases(
|
|
descriptor.filter_ref,
|
|
[str(x) for x in resolved_aliases if str(x).strip()],
|
|
)
|
|
|
|
|
|
def _bind_configs_to_descriptors(
|
|
descriptors: list[CommandDescriptor],
|
|
config_records: list[CommandConfig],
|
|
) -> dict[str, CommandConfig]:
|
|
config_map = {cfg.handler_full_name: cfg for cfg in config_records}
|
|
for desc in descriptors:
|
|
if cfg := config_map.get(desc.handler_full_name):
|
|
_bind_descriptor_with_config(desc, cfg)
|
|
return config_map
|
|
|
|
|
|
def _group_conflicts(
|
|
descriptors: list[CommandDescriptor],
|
|
) -> dict[str, list[CommandDescriptor]]:
|
|
conflicts: dict[str, list[CommandDescriptor]] = defaultdict(list)
|
|
for desc in descriptors:
|
|
if desc.effective_command and desc.enabled:
|
|
conflicts[desc.effective_command].append(desc)
|
|
return {k: v for k, v in conflicts.items() if len(v) > 1}
|
|
|
|
|
|
def _set_filter_fragment(
|
|
filter_ref: CommandFilter | CommandGroupFilter,
|
|
fragment: str,
|
|
) -> None:
|
|
attr = (
|
|
"group_name" if isinstance(filter_ref, CommandGroupFilter) else "command_name"
|
|
)
|
|
current_value = getattr(filter_ref, attr)
|
|
if fragment == current_value:
|
|
return
|
|
setattr(filter_ref, attr, fragment)
|
|
if hasattr(filter_ref, "_cmpl_cmd_names"):
|
|
filter_ref._cmpl_cmd_names = None
|
|
|
|
|
|
def _set_filter_aliases(
|
|
filter_ref: CommandFilter | CommandGroupFilter,
|
|
aliases: list[str],
|
|
) -> None:
|
|
current_aliases = getattr(filter_ref, "alias", set())
|
|
if set(aliases) == current_aliases:
|
|
return
|
|
setattr(filter_ref, "alias", set(aliases))
|
|
if hasattr(filter_ref, "_cmpl_cmd_names"):
|
|
filter_ref._cmpl_cmd_names = None
|
|
|
|
|
|
def _is_command_in_use(
|
|
target_handler_full_name: str,
|
|
candidate_full_command: str,
|
|
) -> bool:
|
|
candidate = candidate_full_command.strip()
|
|
for handler in star_handlers_registry:
|
|
if handler.handler_full_name == target_handler_full_name:
|
|
continue
|
|
filter_ref = _locate_primary_filter(handler)
|
|
if not filter_ref:
|
|
continue
|
|
names = {name.strip() for name in filter_ref.get_complete_command_names()}
|
|
if candidate in names:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _descriptor_to_dict(desc: CommandDescriptor) -> dict[str, Any]:
|
|
result = {
|
|
"handler_full_name": desc.handler_full_name,
|
|
"handler_name": desc.handler_name,
|
|
"plugin": desc.plugin_name,
|
|
"plugin_display_name": desc.plugin_display_name,
|
|
"module_path": desc.module_path,
|
|
"description": desc.description,
|
|
"type": desc.command_type,
|
|
"parent_signature": desc.parent_signature,
|
|
"parent_group_handler": desc.parent_group_handler,
|
|
"original_command": desc.original_command,
|
|
"current_fragment": desc.current_fragment,
|
|
"effective_command": desc.effective_command,
|
|
"aliases": desc.aliases,
|
|
"permission": desc.permission,
|
|
"enabled": desc.enabled,
|
|
"is_group": desc.is_group,
|
|
"has_conflict": desc.has_conflict,
|
|
"reserved": desc.reserved,
|
|
}
|
|
# 如果是指令组,包含子指令列表
|
|
if desc.is_group and desc.sub_commands:
|
|
result["sub_commands"] = [_descriptor_to_dict(sub) for sub in desc.sub_commands]
|
|
else:
|
|
result["sub_commands"] = []
|
|
return result
|