Files
Dt8333 7dd95d8a59 chore: auto ann fix by ruff (#4903)
* chore: auto fix by ruff

* refactor: 统一修正返回类型注解为 None/bool 以匹配实现

* refactor: 将 _get_next_page 改为异步并移除多余的请求错误抛出

* refactor: 将 get_client 的返回类型改为 object

* style: 为 LarkMessageEvent 的相关方法添加返回类型注解 None

---------

Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2026-02-09 00:22:24 +08:00

82 lines
3.0 KiB
Python

import os
import shutil
import zipfile
from astrbot.core import logger
from astrbot.core.utils.astrbot_path import get_astrbot_plugin_path
from astrbot.core.utils.io import on_error, remove_dir
from ..star.star import StarMetadata
from ..updator import RepoZipUpdator
class PluginUpdator(RepoZipUpdator):
def __init__(self, repo_mirror: str = "") -> None:
super().__init__(repo_mirror)
self.plugin_store_path = get_astrbot_plugin_path()
def get_plugin_store_path(self) -> str:
return self.plugin_store_path
async def install(self, repo_url: str, proxy="") -> str:
_, repo_name, _ = self.parse_github_url(repo_url)
repo_name = self.format_name(repo_name)
plugin_path = os.path.join(self.plugin_store_path, repo_name)
await self.download_from_repo_url(plugin_path, repo_url, proxy)
self.unzip_file(plugin_path + ".zip", plugin_path)
return plugin_path
async def update(self, plugin: StarMetadata, proxy="") -> str:
repo_url = plugin.repo
if not repo_url:
raise Exception(f"插件 {plugin.name} 没有指定仓库地址。")
if not plugin.root_dir_name:
raise Exception(f"插件 {plugin.name} 的根目录名未指定。")
plugin_path = os.path.join(self.plugin_store_path, plugin.root_dir_name)
logger.info(f"正在更新插件,路径: {plugin_path},仓库地址: {repo_url}")
await self.download_from_repo_url(plugin_path, repo_url, proxy=proxy)
try:
remove_dir(plugin_path)
except BaseException as e:
logger.error(
f"删除旧版本插件 {plugin_path} 文件夹失败: {e!s},使用覆盖安装。",
)
self.unzip_file(plugin_path + ".zip", plugin_path)
return plugin_path
def unzip_file(self, zip_path: str, target_dir: str) -> None:
os.makedirs(target_dir, exist_ok=True)
update_dir = ""
logger.info(f"正在解压压缩包: {zip_path}")
with zipfile.ZipFile(zip_path, "r") as z:
update_dir = z.namelist()[0]
z.extractall(target_dir)
files = os.listdir(os.path.join(target_dir, update_dir))
for f in files:
if os.path.isdir(os.path.join(target_dir, update_dir, f)):
if os.path.exists(os.path.join(target_dir, f)):
shutil.rmtree(os.path.join(target_dir, f), onerror=on_error)
elif os.path.exists(os.path.join(target_dir, f)):
os.remove(os.path.join(target_dir, f))
shutil.move(os.path.join(target_dir, update_dir, f), target_dir)
try:
logger.info(
f"删除临时文件: {zip_path}{os.path.join(target_dir, update_dir)}",
)
shutil.rmtree(os.path.join(target_dir, update_dir), onerror=on_error)
os.remove(zip_path)
except BaseException:
logger.warning(
f"删除更新文件失败,可以手动删除 {zip_path}{os.path.join(target_dir, update_dir)}",
)