refactor: bundled webui static files into wheel and replace astrbot cli log with English (#5665)

* refactor: bundled webui static files into wheel and replace astrbot cli log with English

- Translated and standardized log messages in cmd_conf.py for better clarity.
- Updated initialization logic in cmd_init.py to provide clearer user prompts and error handling.
- Improved plugin management commands in cmd_plug.py with consistent language and error messages.
- Enhanced run command in cmd_run.py with clearer status messages and error handling.
- Updated utility functions in basic.py and plugin.py to improve readability and maintainability.
- Added version comparison logic in version_comparator.py with clearer comments.
- Enhanced logging configuration in log.py to suppress noisy loggers.
- Updated the updater logic in updator.py to provide clearer error messages for users.
- Improved IO utility functions in io.py to handle dashboard versioning more effectively.
- Enhanced dashboard server logic in server.py to prioritize bundled assets and improve user feedback.
- Updated pyproject.toml to include bundled dashboard assets and custom build hooks.
- Added a custom build script (hatch_build.py) to automate dashboard builds during package creation.

* refactor: improve exception messages and formatting in CLI command validation

* perf: change npm install to npm ci for consistent dependency installation

* fix
This commit is contained in:
Soulter
2026-03-03 12:58:59 +08:00
committed by GitHub
parent 0711ec346f
commit 6b642d7674
15 changed files with 310 additions and 187 deletions
+3
View File
@@ -36,6 +36,9 @@ dashboard/dist/
package-lock.json
yarn.lock
# Bundled dashboard dist (generated by hatch_build.py during pip wheel build)
astrbot/dashboard/dist/
# Operating System
**/.DS_Store
.DS_Store
+7 -7
View File
@@ -1,4 +1,4 @@
"""AstrBot CLI入口"""
"""AstrBot CLI entry point"""
import sys
@@ -29,23 +29,23 @@ def cli() -> None:
@click.command()
@click.argument("command_name", required=False, type=str)
def help(command_name: str | None) -> None:
"""显示命令的帮助信息
"""Display help information for commands
如果提供了 COMMAND_NAME,则显示该命令的详细帮助信息。
否则,显示通用帮助信息。
If COMMAND_NAME is provided, display detailed help for that command.
Otherwise, display general help information.
"""
ctx = click.get_current_context()
if command_name:
# 查找指定命令
# Find the specified command
command = cli.get_command(ctx, command_name)
if command:
# 显示特定命令的帮助信息
# Display help for the specific command
click.echo(command.get_help(ctx))
else:
click.echo(f"Unknown command: {command_name}")
sys.exit(1)
else:
# 显示通用帮助信息
# Display general help information
click.echo(cli.get_help(ctx))
+47 -43
View File
@@ -10,57 +10,61 @@ from ..utils import check_astrbot_root, get_astrbot_root
def _validate_log_level(value: str) -> str:
"""验证日志级别"""
"""Validate log level"""
value = value.upper()
if value not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
raise click.ClickException(
"日志级别必须是 DEBUG/INFO/WARNING/ERROR/CRITICAL 之一",
"Log level must be one of DEBUG/INFO/WARNING/ERROR/CRITICAL",
)
return value
def _validate_dashboard_port(value: str) -> int:
"""验证 Dashboard 端口"""
"""Validate Dashboard port"""
try:
port = int(value)
if port < 1 or port > 65535:
raise click.ClickException("端口必须在 1-65535 范围内")
raise click.ClickException("Port must be in range 1-65535")
return port
except ValueError:
raise click.ClickException("端口必须是数字")
raise click.ClickException("Port must be a number")
def _validate_dashboard_username(value: str) -> str:
"""验证 Dashboard 用户名"""
"""Validate Dashboard username"""
if not value:
raise click.ClickException("用户名不能为空")
raise click.ClickException("Username cannot be empty")
return value
def _validate_dashboard_password(value: str) -> str:
"""验证 Dashboard 密码"""
"""Validate Dashboard password"""
if not value:
raise click.ClickException("密码不能为空")
raise click.ClickException("Password cannot be empty")
return hashlib.md5(value.encode()).hexdigest()
def _validate_timezone(value: str) -> str:
"""验证时区"""
"""Validate timezone"""
try:
zoneinfo.ZoneInfo(value)
except Exception:
raise click.ClickException(f"无效的时区: {value},请使用有效的IANA时区名称")
raise click.ClickException(
f"Invalid timezone: {value}. Please use a valid IANA timezone name"
)
return value
def _validate_callback_api_base(value: str) -> str:
"""验证回调接口基址"""
"""Validate callback API base URL"""
if not value.startswith("http://") and not value.startswith("https://"):
raise click.ClickException("回调接口基址必须以 http:// 或 https:// 开头")
raise click.ClickException(
"Callback API base must start with http:// or https://"
)
return value
# 可通过CLI设置的配置项,配置键到验证器函数的映射
# Configuration items settable via CLI, mapping config keys to validator functions
CONFIG_VALIDATORS: dict[str, Callable[[str], Any]] = {
"timezone": _validate_timezone,
"log_level": _validate_log_level,
@@ -72,11 +76,11 @@ CONFIG_VALIDATORS: dict[str, Callable[[str], Any]] = {
def _load_config() -> dict[str, Any]:
"""加载或初始化配置文件"""
"""Load or initialize config file"""
root = get_astrbot_root()
if not check_astrbot_root(root):
raise click.ClickException(
f"{root}不是有效的 AstrBot 根目录,如需初始化请使用 astrbot init",
f"{root} is not a valid AstrBot root directory. Use 'astrbot init' to initialize",
)
config_path = root / "data" / "cmd_config.json"
@@ -91,11 +95,11 @@ def _load_config() -> dict[str, Any]:
try:
return json.loads(config_path.read_text(encoding="utf-8-sig"))
except json.JSONDecodeError as e:
raise click.ClickException(f"配置文件解析失败: {e!s}")
raise click.ClickException(f"Failed to parse config file: {e!s}")
def _save_config(config: dict[str, Any]) -> None:
"""保存配置文件"""
"""Save config file"""
config_path = get_astrbot_root() / "data" / "cmd_config.json"
config_path.write_text(
@@ -105,21 +109,21 @@ def _save_config(config: dict[str, Any]) -> None:
def _set_nested_item(obj: dict[str, Any], path: str, value: Any) -> None:
"""设置嵌套字典中的值"""
"""Set a value in a nested dictionary"""
parts = path.split(".")
for part in parts[:-1]:
if part not in obj:
obj[part] = {}
elif not isinstance(obj[part], dict):
raise click.ClickException(
f"配置路径冲突: {'.'.join(parts[: parts.index(part) + 1])} 不是字典",
f"Config path conflict: {'.'.join(parts[: parts.index(part) + 1])} is not a dict",
)
obj = obj[part]
obj[parts[-1]] = value
def _get_nested_item(obj: dict[str, Any], path: str) -> Any:
"""获取嵌套字典中的值"""
"""Get a value from a nested dictionary"""
parts = path.split(".")
for part in parts:
obj = obj[part]
@@ -128,21 +132,21 @@ def _get_nested_item(obj: dict[str, Any], path: str) -> Any:
@click.group(name="conf")
def conf() -> None:
"""配置管理命令
"""Configuration management commands
支持的配置项:
Supported config keys:
- timezone: 时区设置 (例如: Asia/Shanghai)
- timezone: Timezone setting (e.g. Asia/Shanghai)
- log_level: 日志级别 (DEBUG/INFO/WARNING/ERROR/CRITICAL)
- log_level: Log level (DEBUG/INFO/WARNING/ERROR/CRITICAL)
- dashboard.port: Dashboard 端口
- dashboard.port: Dashboard port
- dashboard.username: Dashboard 用户名
- dashboard.username: Dashboard username
- dashboard.password: Dashboard 密码
- dashboard.password: Dashboard password
- callback_api_base: 回调接口基址
- callback_api_base: Callback API base URL
"""
@@ -150,9 +154,9 @@ def conf() -> None:
@click.argument("key")
@click.argument("value")
def set_config(key: str, value: str) -> None:
"""设置配置项的值"""
"""Set the value of a config item"""
if key not in CONFIG_VALIDATORS:
raise click.ClickException(f"不支持的配置项: {key}")
raise click.ClickException(f"Unsupported config key: {key}")
config = _load_config()
@@ -162,29 +166,29 @@ def set_config(key: str, value: str) -> None:
_set_nested_item(config, key, validated_value)
_save_config(config)
click.echo(f"配置已更新: {key}")
click.echo(f"Config updated: {key}")
if key == "dashboard.password":
click.echo(" 原值: ********")
click.echo(" 新值: ********")
click.echo(" Old value: ********")
click.echo(" New value: ********")
else:
click.echo(f" 原值: {old_value}")
click.echo(f" 新值: {validated_value}")
click.echo(f" Old value: {old_value}")
click.echo(f" New value: {validated_value}")
except KeyError:
raise click.ClickException(f"未知的配置项: {key}")
raise click.ClickException(f"Unknown config key: {key}")
except Exception as e:
raise click.UsageError(f"设置配置失败: {e!s}")
raise click.UsageError(f"Failed to set config: {e!s}")
@conf.command(name="get")
@click.argument("key", required=False)
def get_config(key: str | None = None) -> None:
"""获取配置项的值,不提供key则显示所有可配置项"""
"""Get the value of a config item. If no key is provided, show all configurable items"""
config = _load_config()
if key:
if key not in CONFIG_VALIDATORS:
raise click.ClickException(f"不支持的配置项: {key}")
raise click.ClickException(f"Unsupported config key: {key}")
try:
value = _get_nested_item(config, key)
@@ -192,11 +196,11 @@ def get_config(key: str | None = None) -> None:
value = "********"
click.echo(f"{key}: {value}")
except KeyError:
raise click.ClickException(f"未知的配置项: {key}")
raise click.ClickException(f"Unknown config key: {key}")
except Exception as e:
raise click.UsageError(f"获取配置失败: {e!s}")
raise click.UsageError(f"Failed to get config: {e!s}")
else:
click.echo("当前配置:")
click.echo("Current config:")
for key in CONFIG_VALIDATORS:
try:
value = (
+8 -9
View File
@@ -8,16 +8,12 @@ from ..utils import check_dashboard, get_astrbot_root
async def initialize_astrbot(astrbot_root: Path) -> None:
"""执行 AstrBot 初始化逻辑"""
"""Execute AstrBot initialization logic"""
dot_astrbot = astrbot_root / ".astrbot"
if not dot_astrbot.exists():
click.echo(f"Current Directory: {astrbot_root}")
click.echo(
"如果你确认这是 Astrbot root directory, 你需要在当前目录下创建一个 .astrbot 文件标记该目录为 AstrBot 的数据目录。",
)
if click.confirm(
f"请检查当前目录是否正确,确认正确请回车: {astrbot_root}",
f"Install AstrBot to this directory? {astrbot_root}",
default=True,
abort=True,
):
@@ -40,7 +36,7 @@ async def initialize_astrbot(astrbot_root: Path) -> None:
@click.command()
def init() -> None:
"""初始化 AstrBot"""
"""Initialize AstrBot"""
click.echo("Initializing AstrBot...")
astrbot_root = get_astrbot_root()
lock_file = astrbot_root / "astrbot.lock"
@@ -49,8 +45,11 @@ def init() -> None:
try:
with lock.acquire():
asyncio.run(initialize_astrbot(astrbot_root))
click.echo("Done! You can now run 'astrbot run' to start AstrBot")
except Timeout:
raise click.ClickException("无法获取锁文件,请检查是否有其他实例正在运行")
raise click.ClickException(
"Cannot acquire lock file. Please check if another instance is running"
)
except Exception as e:
raise click.ClickException(f"初始化失败: {e!s}")
raise click.ClickException(f"Initialization failed: {e!s}")
+54 -46
View File
@@ -16,14 +16,14 @@ from ..utils import (
@click.group()
def plug() -> None:
"""插件管理"""
"""Plugin management"""
def _get_data_path() -> Path:
base = get_astrbot_root()
if not check_astrbot_root(base):
raise click.ClickException(
f"{base}不是有效的 AstrBot 根目录,如需初始化请使用 astrbot init",
f"{base} is not a valid AstrBot root directory. Use 'astrbot init' to initialize",
)
return (base / "data").resolve()
@@ -32,7 +32,9 @@ def display_plugins(plugins, title=None, color=None) -> None:
if title:
click.echo(click.style(title, fg=color, bold=True))
click.echo(f"{'名称':<20} {'版本':<10} {'状态':<10} {'作者':<15} {'描述':<30}")
click.echo(
f"{'Name':<20} {'Version':<10} {'Status':<10} {'Author':<15} {'Description':<30}"
)
click.echo("-" * 85)
for p in plugins:
@@ -46,30 +48,30 @@ def display_plugins(plugins, title=None, color=None) -> None:
@plug.command()
@click.argument("name")
def new(name: str) -> None:
"""创建新插件"""
"""Create a new plugin"""
base_path = _get_data_path()
plug_path = base_path / "plugins" / name
if plug_path.exists():
raise click.ClickException(f"插件 {name} 已存在")
raise click.ClickException(f"Plugin {name} already exists")
author = click.prompt("请输入插件作者", type=str)
desc = click.prompt("请输入插件描述", type=str)
version = click.prompt("请输入插件版本", type=str)
author = click.prompt("Enter plugin author", type=str)
desc = click.prompt("Enter plugin description", type=str)
version = click.prompt("Enter plugin version", type=str)
if not re.match(r"^\d+\.\d+(\.\d+)?$", version.lower().lstrip("v")):
raise click.ClickException("版本号必须为 x.y x.y.z 格式")
repo = click.prompt("请输入插件仓库:", type=str)
raise click.ClickException("Version must be in x.y or x.y.z format")
repo = click.prompt("Enter plugin repository URL:", type=str)
if not repo.startswith("http"):
raise click.ClickException("仓库地址必须以 http 开头")
raise click.ClickException("Repository URL must start with http")
click.echo("下载插件模板...")
click.echo("Downloading plugin template...")
get_git_repo(
"https://github.com/Soulter/helloworld",
plug_path,
)
click.echo("重写插件信息...")
# 重写 metadata.yaml
click.echo("Rewriting plugin metadata...")
# Rewrite metadata.yaml
with open(plug_path / "metadata.yaml", "w", encoding="utf-8") as f:
f.write(
f"name: {name}\n"
@@ -79,11 +81,13 @@ def new(name: str) -> None:
f"repo: {repo}\n",
)
# 重写 README.md
# Rewrite README.md
with open(plug_path / "README.md", "w", encoding="utf-8") as f:
f.write(f"# {name}\n\n{desc}\n\n# 支持\n\n[帮助文档](https://astrbot.app)\n")
f.write(
f"# {name}\n\n{desc}\n\n# Support\n\n[Documentation](https://astrbot.app)\n"
)
# 重写 main.py
# Rewrite main.py
with open(plug_path / "main.py", encoding="utf-8") as f:
content = f.read()
@@ -95,54 +99,54 @@ def new(name: str) -> None:
with open(plug_path / "main.py", "w", encoding="utf-8") as f:
f.write(new_content)
click.echo(f"插件 {name} 创建成功")
click.echo(f"Plugin {name} created successfully")
@plug.command()
@click.option("--all", "-a", is_flag=True, help="列出未安装的插件")
@click.option("--all", "-a", is_flag=True, help="List uninstalled plugins")
def list(all: bool) -> None:
"""列出插件"""
"""List plugins"""
base_path = _get_data_path()
plugins = build_plug_list(base_path / "plugins")
# 未发布的插件
# Unpublished plugins
not_published_plugins = [
p for p in plugins if p["status"] == PluginStatus.NOT_PUBLISHED
]
if not_published_plugins:
display_plugins(not_published_plugins, "未发布的插件", "red")
display_plugins(not_published_plugins, "Unpublished Plugins", "red")
# 需要更新的插件
# Plugins needing update
need_update_plugins = [
p for p in plugins if p["status"] == PluginStatus.NEED_UPDATE
]
if need_update_plugins:
display_plugins(need_update_plugins, "需要更新的插件", "yellow")
display_plugins(need_update_plugins, "Plugins Needing Update", "yellow")
# 已安装的插件
# Installed plugins
installed_plugins = [p for p in plugins if p["status"] == PluginStatus.INSTALLED]
if installed_plugins:
display_plugins(installed_plugins, "已安装的插件", "green")
display_plugins(installed_plugins, "Installed Plugins", "green")
# 未安装的插件
# Uninstalled plugins
not_installed_plugins = [
p for p in plugins if p["status"] == PluginStatus.NOT_INSTALLED
]
if not_installed_plugins and all:
display_plugins(not_installed_plugins, "未安装的插件", "blue")
display_plugins(not_installed_plugins, "Uninstalled Plugins", "blue")
if (
not any([not_published_plugins, need_update_plugins, installed_plugins])
and not all
):
click.echo("未安装任何插件")
click.echo("No plugins installed")
@plug.command()
@click.argument("name")
@click.option("--proxy", help="代理服务器地址")
@click.option("--proxy", help="Proxy server address")
def install(name: str, proxy: str | None) -> None:
"""安装插件"""
"""Install a plugin"""
base_path = _get_data_path()
plug_path = base_path / "plugins"
plugins = build_plug_list(base_path / "plugins")
@@ -157,7 +161,7 @@ def install(name: str, proxy: str | None) -> None:
)
if not plugin:
raise click.ClickException(f"未找到可安装的插件 {name},可能是不存在或已安装")
raise click.ClickException(f"Plugin {name} not found or already installed")
manage_plugin(plugin, plug_path, is_update=False, proxy=proxy)
@@ -165,30 +169,32 @@ def install(name: str, proxy: str | None) -> None:
@plug.command()
@click.argument("name")
def remove(name: str) -> None:
"""卸载插件"""
"""Uninstall a plugin"""
base_path = _get_data_path()
plugins = build_plug_list(base_path / "plugins")
plugin = next((p for p in plugins if p["name"] == name), None)
if not plugin or not plugin.get("local_path"):
raise click.ClickException(f"插件 {name} 不存在或未安装")
raise click.ClickException(f"Plugin {name} does not exist or is not installed")
plugin_path = plugin["local_path"]
click.confirm(f"确定要卸载插件 {name} 吗?", default=False, abort=True)
click.confirm(
f"Are you sure you want to uninstall plugin {name}?", default=False, abort=True
)
try:
shutil.rmtree(plugin_path)
click.echo(f"插件 {name} 已卸载")
click.echo(f"Plugin {name} has been uninstalled")
except Exception as e:
raise click.ClickException(f"卸载插件 {name} 失败: {e}")
raise click.ClickException(f"Failed to uninstall plugin {name}: {e}")
@plug.command()
@click.argument("name", required=False)
@click.option("--proxy", help="Github代理地址")
@click.option("--proxy", help="GitHub proxy address")
def update(name: str, proxy: str | None) -> None:
"""更新插件"""
"""Update plugins"""
base_path = _get_data_path()
plug_path = base_path / "plugins"
plugins = build_plug_list(base_path / "plugins")
@@ -204,7 +210,9 @@ def update(name: str, proxy: str | None) -> None:
)
if not plugin:
raise click.ClickException(f"插件 {name} 不需要更新或无法更新")
raise click.ClickException(
f"Plugin {name} does not need updating or cannot be updated"
)
manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)
else:
@@ -213,20 +221,20 @@ def update(name: str, proxy: str | None) -> None:
]
if not need_update_plugins:
click.echo("没有需要更新的插件")
click.echo("No plugins need updating")
return
click.echo(f"发现 {len(need_update_plugins)} 个插件需要更新")
click.echo(f"Found {len(need_update_plugins)} plugin(s) needing update")
for plugin in need_update_plugins:
plugin_name = plugin["name"]
click.echo(f"正在更新插件 {plugin_name}...")
click.echo(f"Updating plugin {plugin_name}...")
manage_plugin(plugin, plug_path, is_update=True, proxy=proxy)
@plug.command()
@click.argument("query")
def search(query: str) -> None:
"""搜索插件"""
"""Search for plugins"""
base_path = _get_data_path()
plugins = build_plug_list(base_path / "plugins")
@@ -239,7 +247,7 @@ def search(query: str) -> None:
]
if not matched_plugins:
click.echo(f"未找到匹配 '{query}' 的插件")
click.echo(f"No plugins matching '{query}' found")
return
display_plugins(matched_plugins, f"搜索结果: '{query}'", "cyan")
display_plugins(matched_plugins, f"Search results: '{query}'", "cyan")
+11 -9
View File
@@ -11,7 +11,7 @@ from ..utils import check_astrbot_root, check_dashboard, get_astrbot_root
async def run_astrbot(astrbot_root: Path) -> None:
"""运行 AstrBot"""
"""Run AstrBot"""
from astrbot.core import LogBroker, LogManager, db_helper, logger
from astrbot.core.initial_loader import InitialLoader
@@ -26,18 +26,18 @@ async def run_astrbot(astrbot_root: Path) -> None:
await core_lifecycle.start()
@click.option("--reload", "-r", is_flag=True, help="插件自动重载")
@click.option("--port", "-p", help="Astrbot Dashboard端口", required=False, type=str)
@click.option("--reload", "-r", is_flag=True, help="Auto-reload plugins")
@click.option("--port", "-p", help="AstrBot Dashboard port", required=False, type=str)
@click.command()
def run(reload: bool, port: str) -> None:
"""运行 AstrBot"""
"""Run AstrBot"""
try:
os.environ["ASTRBOT_CLI"] = "1"
astrbot_root = get_astrbot_root()
if not check_astrbot_root(astrbot_root):
raise click.ClickException(
f"{astrbot_root}不是有效的 AstrBot 根目录,如需初始化请使用 astrbot init",
f"{astrbot_root} is not a valid AstrBot root directory. Use 'astrbot init' to initialize",
)
os.environ["ASTRBOT_ROOT"] = str(astrbot_root)
@@ -47,7 +47,7 @@ def run(reload: bool, port: str) -> None:
os.environ["DASHBOARD_PORT"] = port
if reload:
click.echo("启用插件自动重载")
click.echo("Plugin auto-reload enabled")
os.environ["ASTRBOT_RELOAD"] = "1"
lock_file = astrbot_root / "astrbot.lock"
@@ -55,8 +55,10 @@ def run(reload: bool, port: str) -> None:
with lock.acquire():
asyncio.run(run_astrbot(astrbot_root))
except KeyboardInterrupt:
click.echo("AstrBot 已关闭...")
click.echo("AstrBot has been shut down.")
except Timeout:
raise click.ClickException("无法获取锁文件,请检查是否有其他实例正在运行")
raise click.ClickException(
"Cannot acquire lock file. Please check if another instance is running"
)
except Exception as e:
raise click.ClickException(f"运行时出现错误: {e}\n{traceback.format_exc()}")
raise click.ClickException(f"Runtime error: {e}\n{traceback.format_exc()}")
+21 -13
View File
@@ -2,9 +2,12 @@ from pathlib import Path
import click
# Static assets bundled inside the installed wheel (built by hatch_build.py).
_BUNDLED_DIST = Path(__file__).parent.parent.parent / "dashboard" / "dist"
def check_astrbot_root(path: str | Path) -> bool:
"""检查路径是否为 AstrBot 根目录"""
"""Check if the path is an AstrBot root directory"""
if not isinstance(path, Path):
path = Path(path)
if not path.exists() or not path.is_dir():
@@ -15,43 +18,48 @@ def check_astrbot_root(path: str | Path) -> bool:
def get_astrbot_root() -> Path:
"""获取Astrbot根目录路径"""
"""Get the AstrBot root directory path"""
return Path.cwd()
async def check_dashboard(astrbot_root: Path) -> None:
"""检查是否安装了dashboard"""
"""Check if the dashboard is installed"""
from astrbot.core.config.default import VERSION
from astrbot.core.utils.io import download_dashboard, get_dashboard_version
from .version_comparator import VersionComparator
# If the wheel ships bundled dashboard assets, no network download is needed.
if _BUNDLED_DIST.exists():
click.echo("Dashboard is bundled with the package skipping download.")
return
try:
dashboard_version = await get_dashboard_version()
match dashboard_version:
case None:
click.echo("未安装管理面板")
click.echo("Dashboard is not installed")
if click.confirm(
"是否安装管理面板?",
"Install dashboard?",
default=True,
abort=True,
):
click.echo("正在安装管理面板...")
click.echo("Installing dashboard...")
await download_dashboard(
path="data/dashboard.zip",
extract_path=str(astrbot_root),
version=f"v{VERSION}",
latest=False,
)
click.echo("管理面板安装完成")
click.echo("Dashboard installed successfully")
case str():
if VersionComparator.compare_version(VERSION, dashboard_version) <= 0:
click.echo("管理面板已是最新版本")
click.echo("Dashboard is already up to date")
return
try:
version = dashboard_version.split("v")[1]
click.echo(f"管理面板版本: {version}")
click.echo(f"Dashboard version: {version}")
await download_dashboard(
path="data/dashboard.zip",
extract_path=str(astrbot_root),
@@ -59,10 +67,10 @@ async def check_dashboard(astrbot_root: Path) -> None:
latest=False,
)
except Exception as e:
click.echo(f"下载管理面板失败: {e}")
click.echo(f"Failed to download dashboard: {e}")
return
except FileNotFoundError:
click.echo("初始化管理面板目录...")
click.echo("Initializing dashboard directory...")
try:
await download_dashboard(
path=str(astrbot_root / "dashboard.zip"),
@@ -70,7 +78,7 @@ async def check_dashboard(astrbot_root: Path) -> None:
version=f"v{VERSION}",
latest=False,
)
click.echo("管理面板初始化完成")
click.echo("Dashboard initialized successfully")
except Exception as e:
click.echo(f"下载管理面板失败: {e}")
click.echo(f"Failed to download dashboard: {e}")
return
+47 -43
View File
@@ -13,22 +13,22 @@ from .version_comparator import VersionComparator
class PluginStatus(str, Enum):
INSTALLED = "已安装"
NEED_UPDATE = "需更新"
NOT_INSTALLED = "未安装"
NOT_PUBLISHED = "未发布"
INSTALLED = "installed"
NEED_UPDATE = "needs-update"
NOT_INSTALLED = "not-installed"
NOT_PUBLISHED = "unpublished"
def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
"""从 Git 仓库下载代码并解压到指定路径"""
"""Download code from a Git repository and extract to the specified path"""
temp_dir = Path(tempfile.mkdtemp())
try:
# 解析仓库信息
# Parse repository info
repo_namespace = url.split("/")[-2:]
author = repo_namespace[0]
repo = repo_namespace[1]
# 尝试获取最新的 release
# Try to get the latest release
release_url = f"https://api.github.com/repos/{author}/{repo}/releases"
try:
with httpx.Client(
@@ -40,21 +40,21 @@ def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
releases = resp.json()
if releases:
# 使用最新的 release
# Use the latest release
download_url = releases[0]["zipball_url"]
else:
# 没有 release,使用默认分支
click.echo(f"正在从默认分支下载 {author}/{repo}")
# No release found, use default branch
click.echo(f"Downloading {author}/{repo} from default branch")
download_url = f"https://github.com/{author}/{repo}/archive/refs/heads/master.zip"
except Exception as e:
click.echo(f"获取 release 信息失败: {e},将直接使用提供的 URL")
click.echo(f"Failed to get release info: {e}. Using provided URL directly")
download_url = url
# 应用代理
# Apply proxy
if proxy:
download_url = f"{proxy}/{download_url}"
# 下载并解压
# Download and extract
with httpx.Client(
proxy=proxy if proxy else None,
follow_redirects=True,
@@ -65,7 +65,7 @@ def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
and "archive/refs/heads/master.zip" in download_url
):
alt_url = download_url.replace("master.zip", "main.zip")
click.echo("master 分支不存在,尝试下载 main 分支")
click.echo("Branch 'master' not found, trying 'main' branch")
resp = client.get(alt_url)
resp.raise_for_status()
else:
@@ -84,13 +84,13 @@ def get_git_repo(url: str, target_path: Path, proxy: str | None = None) -> None:
def load_yaml_metadata(plugin_dir: Path) -> dict:
""" metadata.yaml 文件加载插件元数据
"""Load plugin metadata from metadata.yaml file
Args:
plugin_dir: 插件目录路径
plugin_dir: Plugin directory path
Returns:
dict: 包含元数据的字典,如果读取失败则返回空字典
dict: Dictionary containing metadata, or empty dict if loading fails
"""
yaml_path = plugin_dir / "metadata.yaml"
@@ -98,33 +98,33 @@ def load_yaml_metadata(plugin_dir: Path) -> dict:
try:
return yaml.safe_load(yaml_path.read_text(encoding="utf-8")) or {}
except Exception as e:
click.echo(f"读取 {yaml_path} 失败: {e}", err=True)
click.echo(f"Failed to read {yaml_path}: {e}", err=True)
return {}
def build_plug_list(plugins_dir: Path) -> list:
"""构建插件列表,包含本地和在线插件信息
"""Build plugin list containing local and online plugin information
Args:
plugins_dir (Path): 插件目录路径
plugins_dir (Path): Plugin directory path
Returns:
list: 包含插件信息的字典列表
list: List of dicts containing plugin information
"""
# 获取本地插件信息
# Get local plugin info
result = []
if plugins_dir.exists():
for plugin_name in [d.name for d in plugins_dir.glob("*") if d.is_dir()]:
plugin_dir = plugins_dir / plugin_name
# metadata.yaml 加载元数据
# Load metadata from metadata.yaml
metadata = load_yaml_metadata(plugin_dir)
if "desc" not in metadata and "description" in metadata:
metadata["desc"] = metadata["description"]
# 如果成功加载元数据,添加到结果列表
# If metadata loaded successfully, add to result list
if metadata and all(
k in metadata for k in ["name", "desc", "version", "author", "repo"]
):
@@ -140,7 +140,7 @@ def build_plug_list(plugins_dir: Path) -> list:
},
)
# 获取在线插件列表
# Get online plugin list
online_plugins = []
try:
with httpx.Client() as client:
@@ -160,13 +160,13 @@ def build_plug_list(plugins_dir: Path) -> list:
},
)
except Exception as e:
click.echo(f"获取在线插件列表失败: {e}", err=True)
click.echo(f"Failed to get online plugin list: {e}", err=True)
# 与在线插件比对,更新状态
# Compare with online plugins and update status
online_plugin_names = {plugin["name"] for plugin in online_plugins}
for local_plugin in result:
if local_plugin["name"] in online_plugin_names:
# 查找对应的在线插件
# Find the corresponding online plugin
online_plugin = next(
p for p in online_plugins if p["name"] == local_plugin["name"]
)
@@ -179,10 +179,10 @@ def build_plug_list(plugins_dir: Path) -> list:
):
local_plugin["status"] = PluginStatus.NEED_UPDATE
else:
# 本地插件未在线上发布
# Local plugin is not published online
local_plugin["status"] = PluginStatus.NOT_PUBLISHED
# 添加未安装的在线插件
# Add uninstalled online plugins
for online_plugin in online_plugins:
if not any(plugin["name"] == online_plugin["name"] for plugin in result):
result.append(online_plugin)
@@ -196,19 +196,19 @@ def manage_plugin(
is_update: bool = False,
proxy: str | None = None,
) -> None:
"""安装或更新插件
"""Install or update a plugin
Args:
plugin (dict): 插件信息字典
plugins_dir (Path): 插件目录
is_update (bool, optional): 是否为更新操作. 默认为 False
proxy (str, optional): 代理服务器地址
plugin (dict): Plugin info dict
plugins_dir (Path): Plugins directory
is_update (bool, optional): Whether this is an update operation. Defaults to False
proxy (str, optional): Proxy server address
"""
plugin_name = plugin["name"]
repo_url = plugin["repo"]
# 如果是更新且有本地路径,直接使用本地路径
# If updating and local path exists, use it directly
if is_update and plugin.get("local_path"):
target_path = Path(plugin["local_path"])
else:
@@ -216,11 +216,13 @@ def manage_plugin(
backup_path = Path(f"{target_path}_backup") if is_update else None
# 检查插件是否存在
# Check if plugin exists
if is_update and not target_path.exists():
raise click.ClickException(f"插件 {plugin_name} 未安装,无法更新")
raise click.ClickException(
f"Plugin {plugin_name} is not installed and cannot be updated"
)
# 备份现有插件
# Backup existing plugin
if is_update and backup_path is not None and backup_path.exists():
shutil.rmtree(backup_path)
if is_update and backup_path is not None:
@@ -228,19 +230,21 @@ def manage_plugin(
try:
click.echo(
f"正在从 {repo_url} {'更新' if is_update else '下载'}插件 {plugin_name}...",
f"{'Updating' if is_update else 'Downloading'} plugin {plugin_name} from {repo_url}...",
)
get_git_repo(repo_url, target_path, proxy)
# 更新成功,删除备份
# Update succeeded, delete backup
if is_update and backup_path is not None and backup_path.exists():
shutil.rmtree(backup_path)
click.echo(f"插件 {plugin_name} {'更新' if is_update else '安装'}成功")
click.echo(
f"Plugin {plugin_name} {'updated' if is_update else 'installed'} successfully"
)
except Exception as e:
if target_path.exists():
shutil.rmtree(target_path, ignore_errors=True)
if is_update and backup_path is not None and backup_path.exists():
shutil.move(backup_path, target_path)
raise click.ClickException(
f"{'更新' if is_update else '安装'}插件 {plugin_name} 时出错: {e}",
f"Error {'updating' if is_update else 'installing'} plugin {plugin_name}: {e}",
)
+11 -11
View File
@@ -1,4 +1,4 @@
"""拷贝自 astrbot.core.utils.version_comparator"""
"""Copied from astrbot.core.utils.version_comparator"""
import re
@@ -6,11 +6,11 @@ import re
class VersionComparator:
@staticmethod
def compare_version(v1: str, v2: str) -> int:
"""根据 Semver 语义版本规范来比较版本号的大小。支持不仅局限于 3 个数字的版本号,并处理预发布标签。
"""Compare version numbers according to Semver semantics. Supports version numbers with more than 3 digits and handles pre-release tags.
参考: https://semver.org/lang/zh-CN/
Reference: https://semver.org/
返回 1 表示 v1 > v2,返回 -1 表示 v1 < v2,返回 0 表示 v1 = v2
Returns 1 if v1 > v2, -1 if v1 < v2, 0 if v1 == v2.
"""
v1 = v1.lower().replace("v", "")
v2 = v2.lower().replace("v", "")
@@ -24,7 +24,7 @@ class VersionComparator:
return [], None
major_minor_patch = match.group(1).split(".")
prerelease = match.group(2)
# buildmetadata = match.group(3) # 构建元数据在比较时忽略
# buildmetadata = match.group(3) # Build metadata is ignored in comparison
parts = [int(x) for x in major_minor_patch]
prerelease = VersionComparator._split_prerelease(prerelease)
return parts, prerelease
@@ -32,7 +32,7 @@ class VersionComparator:
v1_parts, v1_prerelease = split_version(v1)
v2_parts, v2_prerelease = split_version(v2)
# 比较数字部分
# Compare numeric parts
length = max(len(v1_parts), len(v2_parts))
v1_parts.extend([0] * (length - len(v1_parts)))
v2_parts.extend([0] * (length - len(v2_parts)))
@@ -43,11 +43,11 @@ class VersionComparator:
if v1_parts[i] < v2_parts[i]:
return -1
# 比较预发布标签
# Compare pre-release tags
if v1_prerelease is None and v2_prerelease is not None:
return 1 # 没有预发布标签的版本高于有预发布标签的版本
return 1 # Version without pre-release tag is higher than one with it
if v1_prerelease is not None and v2_prerelease is None:
return -1 # 有预发布标签的版本低于没有预发布标签的版本
return -1 # Version with pre-release tag is lower than one without it
if v1_prerelease is not None and v2_prerelease is not None:
len_pre = max(len(v1_prerelease), len(v2_prerelease))
for i in range(len_pre):
@@ -72,9 +72,9 @@ class VersionComparator:
return 1
if p1 < p2:
return -1
return 0 # 预发布标签完全相同
return 0 # Pre-release tags are identical
return 0 # 数字部分和预发布标签都相同
return 0 # Both numeric parts and pre-release tags are equal
@staticmethod
def _split_prerelease(prerelease):
+4
View File
@@ -175,6 +175,10 @@ class LogManager:
_trace_sink_id: int | None = None
_NOISY_LOGGER_LEVELS: dict[str, int] = {
"aiosqlite": logging.WARNING,
"filelock": logging.WARNING,
"asyncio": logging.WARNING,
"tzlocal": logging.WARNING,
"apscheduler": logging.WARNING,
}
@classmethod
+3 -1
View File
@@ -149,7 +149,9 @@ class AstrBotUpdator(RepoZipUpdator):
file_url = None
if os.environ.get("ASTRBOT_CLI") or os.environ.get("ASTRBOT_LAUNCHER"):
raise Exception("不支持更新此方式启动的AstrBot") # 避免版本管理混乱
raise Exception(
"Error: You are running AstrBot via CLI, please use `pip` or `uv tool upgrade` to update AstrBot."
) # 避免版本管理混乱
if latest:
latest_version = update_data[0]["tag_name"]
+7 -1
View File
@@ -14,7 +14,7 @@ import certifi
import psutil
from PIL import Image
from .astrbot_path import get_astrbot_data_path, get_astrbot_temp_path
from .astrbot_path import get_astrbot_data_path, get_astrbot_path, get_astrbot_temp_path
logger = logging.getLogger("astrbot")
@@ -219,7 +219,13 @@ def get_local_ip_addresses():
async def get_dashboard_version():
# First check user data directory (manually updated / downloaded dashboard).
dist_dir = os.path.join(get_astrbot_data_path(), "dist")
if not os.path.exists(dist_dir):
# Fall back to the dist bundled inside the installed wheel.
_bundled = Path(get_astrbot_path()) / "astrbot" / "dashboard" / "dist"
if _bundled.exists():
dist_dir = str(_bundled)
if os.path.exists(dist_dir):
version_file = os.path.join(dist_dir, "assets", "version")
if os.path.exists(version_file):
+16 -4
View File
@@ -33,6 +33,9 @@ from .routes.session_management import SessionManagementRoute
from .routes.subagent import SubAgentRoute
from .routes.t2i import T2iRoute
# Static assets shipped inside the wheel (built during `hatch build`).
_BUNDLED_DIST = Path(__file__).parent / "dist"
class _AddrWithPort(Protocol):
port: int
@@ -66,13 +69,22 @@ class AstrBotDashboard:
self.config = core_lifecycle.astrbot_config
self.db = db
# 参数指定webui目录
# Path priority:
# 1. Explicit webui_dir argument
# 2. data/dist/ (user-installed / manually updated dashboard)
# 3. astrbot/dashboard/dist/ (bundled with the wheel)
if webui_dir and os.path.exists(webui_dir):
self.data_path = os.path.abspath(webui_dir)
else:
self.data_path = os.path.abspath(
os.path.join(get_astrbot_data_path(), "dist"),
)
user_dist = os.path.join(get_astrbot_data_path(), "dist")
if os.path.exists(user_dist):
self.data_path = os.path.abspath(user_dist)
elif _BUNDLED_DIST.exists():
self.data_path = str(_BUNDLED_DIST)
logger.info("Using bundled dashboard dist: %s", self.data_path)
else:
# Fall back to expected user path (will fail gracefully later)
self.data_path = os.path.abspath(user_dist)
self.app = Quart("dashboard", static_folder=self.data_path, static_url_path="/")
APP = self.app # noqa
+8
View File
@@ -114,6 +114,14 @@ exclude = ["dashboard", "node_modules", "dist", "data", "tests"]
[tool.hatch.metadata]
allow-direct-references = true
# Include bundled dashboard dist even though it is not tracked by VCS.
[tool.hatch.build.targets.wheel]
artifacts = ["astrbot/dashboard/dist/**"]
# Custom build hook: builds the Vue dashboard and copies dist into the package.
[tool.hatch.build.hooks.custom]
path = "scripts/hatch_build.py"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
+63
View File
@@ -0,0 +1,63 @@
"""
Custom Hatchling build hook.
During `hatch build` (or `pip wheel`), this hook:
1. Runs `npm run build` inside the `dashboard/` directory.
2. Copies the resulting `dashboard/dist/` tree into
`astrbot/dashboard/dist/` so the static assets are shipped
inside the Python wheel.
"""
import shutil
import subprocess
import sys
from pathlib import Path
from hatchling.builders.hooks.plugin.interface import BuildHookInterface
class CustomBuildHook(BuildHookInterface):
PLUGIN_NAME = "custom"
def initialize(self, version: str, build_data: dict) -> None:
root = Path(self.root)
dashboard_src = root / "dashboard"
dist_src = dashboard_src / "dist"
dist_target = root / "astrbot" / "dashboard" / "dist"
if not dashboard_src.exists():
print(
"[hatch_build] 'dashboard/' directory not found skipping dashboard build.",
file=sys.stderr,
)
return
# ── Install Node dependencies if node_modules is absent ─────────────
if not (dashboard_src / "node_modules").exists():
print("[hatch_build] Installing dashboard Node dependencies...")
subprocess.run(
["npm", "install"],
cwd=dashboard_src,
check=True,
)
# ── Build the Vue/Vite dashboard ──────────────────────────────────────
print("[hatch_build] Building Vue dashboard (npm run build)...")
subprocess.run(
["npm", "run", "build"],
cwd=dashboard_src,
check=True,
)
if not dist_src.exists():
print(
"[hatch_build] dashboard/dist not found after build skipping copy.",
file=sys.stderr,
)
return
# ── Copy into the Python package tree ────────────────────────────────
if dist_target.exists():
shutil.rmtree(dist_target)
shutil.copytree(dist_src, dist_target)
print(f"[hatch_build] Dashboard dist copied → {dist_target.relative_to(root)}")