mirror of
https://github.com/hanxi/xiaomusic.git
synced 2025-12-06 14:52:50 +08:00
feat: 实现更新接口
This commit is contained in:
10
test/test_update.py
Normal file
10
test/test_update.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from xiaomusic.utils import (
|
||||
download_and_extract,
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
|
||||
url = "https://github.hanxi.cc/proxy/hanxi/xiaomusic/releases/download/main/app-amd64-lite.tar.gz"
|
||||
target_directory = "./tmp/app"
|
||||
asyncio.run(download_and_extract(url, target_directory))
|
||||
@@ -11,11 +11,13 @@ import json
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import platform
|
||||
import random
|
||||
import re
|
||||
import shutil
|
||||
import string
|
||||
import subprocess
|
||||
import tarfile
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
from collections.abc import AsyncIterator
|
||||
@@ -1027,19 +1029,70 @@ def _restart_xiaomusic():
|
||||
return True
|
||||
|
||||
|
||||
def update_version(version):
|
||||
async def update_version(version: str, lite: bool):
|
||||
if not is_docker():
|
||||
ret = "xiaomusic 更新只能在 docker 中进行"
|
||||
log.info(ret)
|
||||
return ret
|
||||
|
||||
lite_tag = ""
|
||||
if lite:
|
||||
lite_tag = "-lite"
|
||||
arch = get_os_architecture()
|
||||
if "unknown" in arch:
|
||||
log.warning(f"update_version failed: {arch}")
|
||||
return arch
|
||||
# https://github.com/hanxi/xiaomusic/releases/download/main/app-amd64-lite.tar.gz
|
||||
url = f"https://github.hanxi.cc/proxy/hanxi/xiaomusic/releases/download/{version}/app-{arch}{lite_tag}.tar.gz"
|
||||
target_directory = "/app"
|
||||
await download_and_extract(url, target_directory)
|
||||
|
||||
ok, ret = _restart_xiaomusic()
|
||||
if not ok:
|
||||
return ret
|
||||
|
||||
|
||||
def get_os_architecture():
|
||||
"""
|
||||
获取操作系统架构类型:amd64、arm64、arm-v7。
|
||||
|
||||
Returns:
|
||||
str: 架构类型
|
||||
"""
|
||||
arch = platform.machine().lower()
|
||||
|
||||
if arch in ("x86_64", "amd64"):
|
||||
return "amd64"
|
||||
elif arch in ("aarch64", "arm64"):
|
||||
return "arm64"
|
||||
elif "arm" in arch or "armv7" in arch:
|
||||
return "arm-v7"
|
||||
else:
|
||||
return f"unknown architecture: {arch}"
|
||||
|
||||
|
||||
async def download_and_extract(url: str, target_directory: str):
|
||||
# 创建目标目录
|
||||
os.makedirs(target_directory, exist_ok=True)
|
||||
|
||||
# 使用 aiohttp 异步下载文件
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
file_name = os.path.join(target_directory, url.split("/")[-1])
|
||||
with open(file_name, "wb") as f:
|
||||
# 以块的方式下载文件,防止内存占用过大
|
||||
async for chunk in response.content.iter_any():
|
||||
f.write(chunk)
|
||||
log.info(f"文件下载完成: {file_name}")
|
||||
# 解压下载的文件
|
||||
if file_name.endswith(".tar.gz"):
|
||||
with tarfile.open(file_name, "r:gz") as tar:
|
||||
tar.extractall(path=target_directory)
|
||||
log.info(f"文件解压完成到: {target_directory}")
|
||||
else:
|
||||
log.warning(f"下载失败, 状态码: {response.status}")
|
||||
|
||||
|
||||
def chmodfile(file_path: str):
|
||||
try:
|
||||
os.chmod(file_path, 0o775)
|
||||
|
||||
Reference in New Issue
Block a user