From 458e8e0db87bdd3c05b614bf1eae22194367ae70 Mon Sep 17 00:00:00 2001 From: LIghtJUNction Date: Tue, 17 Mar 2026 18:48:19 +0800 Subject: [PATCH] fix(cli): recover flags consumed by -E option and prompt for recipient --- astrbot/cli/commands/cmd_bk.py | 84 +++++++++++++++++++++++++++++++++- tests/cli/test_bk.py | 20 ++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/astrbot/cli/commands/cmd_bk.py b/astrbot/cli/commands/cmd_bk.py index 322c891dc..08a2cad72 100644 --- a/astrbot/cli/commands/cmd_bk.py +++ b/astrbot/cli/commands/cmd_bk.py @@ -109,6 +109,27 @@ def export_data( -> Signs, encrypts for Bob, and generates a SHA256 checksum file. """ + # Handle case where -E consumes the next flag (e.g. -E -S) + if gpg_encrypt and gpg_encrypt.startswith("-"): + consumed_flag = gpg_encrypt + click.echo( + click.style( + f"Warning: Flag '{consumed_flag}' was interpreted as the recipient for -E.", + fg="yellow", + ) + ) + + # Recover flags + if consumed_flag == "-S": + gpg_sign = True + click.echo("Recovered flag -S (Sign).") + elif consumed_flag == "-C": + gpg_symmetric = True + click.echo("Recovered flag -C (Symmetric).") + + # Prompt for the actual recipient + gpg_encrypt = click.prompt("Please enter the GPG recipient (email or key ID)") + async def _run(): if gpg_sign or gpg_encrypt or gpg_symmetric: if not shutil.which("gpg"): @@ -200,16 +221,77 @@ def export_data( @bk.command(name="import") @click.argument("backup_file") @click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompts") -def import_data(backup_file: str, yes: bool): +def import_data_command(backup_file: str, yes: bool): """Import AstrBot data from a backup archive. Automatically handles .zip files and .gpg files (signed or encrypted). If the file is encrypted, you will be prompted for the passphrase. + If a digest file (.sha256, .md5, etc.) exists, it will be verified automatically. """ backup_path = Path(backup_file) if not backup_path.exists(): raise click.ClickException(f"Backup file not found: {backup_file}") + # 1. Verify Digest if exists + def _verify_digest(file_path: Path) -> bool: + supported_digests = ["sha256", "sha512", "md5", "sha1"] + digest_verified = True # Default true if no digest file found + + for algo in supported_digests: + digest_file = file_path.with_name(f"{file_path.name}.{algo}") + if digest_file.exists(): + click.echo(f"Found digest file: {digest_file.name}") + try: + # Parse digest file + content = digest_file.read_text(encoding="utf-8").strip() + # Format: "digest *filename" or "digest filename" + # We expect the hash to be the first part + if " " in content: + expected_digest = content.split()[0].lower() + else: + expected_digest = content.lower() + + click.echo(f"Verifying {algo} digest...") + hash_func = getattr(hashlib, algo)() + with open(file_path, "rb") as f: + while chunk := f.read(8192): + hash_func.update(chunk) + + calculated_digest = hash_func.hexdigest().lower() + + if calculated_digest == expected_digest: + click.echo( + click.style("Digest verification PASSED.", fg="green") + ) + else: + click.echo( + click.style( + "Digest verification FAILED!", fg="red", bold=True + ) + ) + click.echo(f" Expected: {expected_digest}") + click.echo(f" Actual: {calculated_digest}") + digest_verified = False + except Exception as e: + click.echo(click.style(f"Error checking digest: {e}", fg="red")) + digest_verified = False + + return digest_verified + + if not _verify_digest(backup_path): + if not yes: + if not click.confirm( + "Digest verification failed. Abort import?", default=True, abort=True + ): + pass + else: + click.echo( + click.style( + "Warning: Digest verification failed. Continuing due to --yes.", + fg="yellow", + ) + ) + if not yes: click.confirm( "This will OVERWRITE all current data (DB, Config, Plugins). Continue?", diff --git a/tests/cli/test_bk.py b/tests/cli/test_bk.py index 36fe17b5e..a210e6521 100644 --- a/tests/cli/test_bk.py +++ b/tests/cli/test_bk.py @@ -211,3 +211,23 @@ def test_export_gpg_missing(mock_exporter, mock_kb_manager): assert result.exit_code != 0 assert "GPG tool not found" in result.output + + +def test_export_gpg_recipient_recovery(mock_exporter, mock_kb_manager, mock_gpg_tools): + """Test recovery when -E consumes a flag""" + _, mock_exec = mock_gpg_tools + runner = CliRunner() + + with patch("pathlib.Path.unlink"), patch("pathlib.Path.exists", return_value=True): + # input="user@example.com\n" provides the recipient when prompted + result = runner.invoke(bk, ["export", "-E", "-S"], input="user@example.com\n") + + assert result.exit_code == 0 + assert "Warning: Flag '-S' was interpreted as the recipient" in result.output + assert "Recovered flag -S (Sign)" in result.output + + # Verify GPG command has both sign and encrypt with correct recipient + args = mock_exec.call_args[0] + assert "--sign" in args + assert "--encrypt" in args + assert "user@example.com" in args