Files
better-genshin-impact/.github/workflows/cnb_release.py
2025-08-20 01:40:02 +08:00

452 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CNB Release Uploader
依赖:
- requests
- tqdm
安装依赖:
pip install requests tqdm
"""
import os
import json
import requests
import time
import sys
import argparse
from typing import List, Dict, Optional
from pathlib import Path
from tqdm import tqdm
class _ProgressFileWrapper:
    """File-like proxy that advances a progress bar as bytes are read.

    Every attribute other than ``read`` is delegated to the wrapped file so
    that HTTP libraries can still query ``fileno``/``tell``/``seek``.
    """

    def __init__(self, file_obj, progress_bar):
        self.file_obj = file_obj
        self.progress_bar = progress_bar

    def read(self, size=-1):
        data = self.file_obj.read(size)
        if data:
            self.progress_bar.update(len(data))
        return data

    def __getattr__(self, name):
        return getattr(self.file_obj, name)


class CNBReleaseUploader:
    """Create a release through the CNB HTTP API and upload asset files to it.

    HTTP failures (``requests`` exceptions) are reported on stdout and
    signalled to the caller through ``None`` / ``False`` return values.
    """

    def __init__(self, token: str, base_url: str = "https://api.cnb.cool"):
        """Store credentials and build the default request headers.

        Args:
            token: Bearer token used for authentication.
            base_url: Base URL of the API.
        """
        from urllib.parse import urlparse

        self.token = token
        self.base_url = base_url
        # Keep the Host header in sync with a custom base_url instead of
        # always sending the default host; userinfo is stripped from netloc.
        host = urlparse(base_url).netloc.rpartition('@')[2] or 'api.cnb.cool'
        self.headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'User-Agent': 'CNB-Release-Uploader/1.0.0',
            'Host': host,
            'Connection': 'keep-alive',
        }

    def _masked_headers(self) -> Dict[str, str]:
        """Return a copy of the headers that is safe to print (token hidden)."""
        return {
            key: ('Bearer ' + '*' * 20) if key.lower() == 'authorization' else value
            for key, value in self.headers.items()
        }

    @staticmethod
    def _print_error_response(error: Exception, status_label: str, body_label: str) -> None:
        """Print the status code and body attached to a failed request, if any."""
        failed = getattr(error, 'response', None)
        if failed is not None:
            print(f" {status_label}: {failed.status_code}")
            print(f" {body_label}: {failed.text}")

    @staticmethod
    def _print_response_body(response) -> None:
        """Print a response body as pretty JSON, falling back to raw text."""
        try:
            response_data = response.json()
            print(f" 响应内容 (JSON): {json.dumps(response_data, indent=2, ensure_ascii=False)}")
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            print(f" 响应内容 (文本): {response.text}")

    def create_release(self, project_path: str, release_data: Dict) -> Optional[Dict]:
        """Create a new release.

        Args:
            project_path: Project path (e.g. "bettergi/better-genshin-impact").
            release_data: Release payload sent as the JSON request body.

        Returns:
            The decoded JSON response describing the release, or None when the
            request fails or the response body is not valid JSON.
        """
        url = f"{self.base_url}/{project_path}/-/releases"
        # Log the request headers, but never the bearer token itself.
        print("\n📋 请求头 (Headers):")
        for key, value in self._masked_headers().items():
            print(f" {key}: {value}")
        try:
            response = requests.post(url, headers=self.headers, json=release_data)
            response.raise_for_status()
            release_info = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"❌ 创建release失败: {e}")
            self._print_error_response(e, "状态码", "响应内容")
            return None
        print(f"✅ Release创建成功: {release_info.get('name', 'N/A')}")
        print(f" Release ID: {release_info.get('id', 'N/A')}")
        print(f" Tag Name: {release_info.get('tag_name', 'N/A')}")
        print(f" Created At: {release_info.get('created_at', 'N/A')}")
        print(f" Is Latest: {release_info.get('is_latest', False)}")
        return release_info

    def get_asset_upload_url(self, project_path: str, release_id: str, asset_name: str,
                             file_size: int, overwrite: bool = True) -> Optional[Dict]:
        """Request an upload URL for one asset.

        Args:
            project_path: Project path.
            release_id: ID of the release the asset belongs to.
            asset_name: Name of the asset.
            file_size: Size of the file in bytes.
            overwrite: Whether an existing asset may be overwritten.

        Returns:
            The decoded JSON response (the caller reads ``upload_url`` and,
            when present, ``verify_url`` from it), or None on failure.
        """
        url = f"{self.base_url}/{project_path}/-/releases/{release_id}/asset-upload-url"
        data = {
            "asset_name": asset_name,
            "overwrite": overwrite,
            "size": file_size,
        }
        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()
            upload_info = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"❌ 获取上传URL失败 ({asset_name}): {e}")
            self._print_error_response(e, "状态码", "响应内容")
            return None
        print("✅ 获取上传URL成功")
        return upload_info

    def upload_asset(self, upload_url: str, file_path: str, show_progress: bool = True) -> bool:
        """Upload a local file with an HTTP PUT.

        Args:
            upload_url: URL to PUT the file contents to.
            file_path: Path of the local file.
            show_progress: Whether to display a progress bar while uploading.

        Returns:
            True when the upload request succeeds, False when the path is not
            a regular file, cannot be read, or the request fails.
        """
        # isfile (not exists) so that directories are rejected up front.
        if not os.path.isfile(file_path):
            print(f"❌ 文件不存在: {file_path}")
            return False
        upload_headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {self.token}',
        }
        file_name = os.path.basename(file_path)
        try:
            file_size = os.path.getsize(file_path)
            with open(file_path, 'rb') as file:
                if show_progress and file_size > 0:
                    progress_bar = tqdm(
                        total=file_size,
                        unit='B',
                        unit_scale=True,
                        unit_divisor=1024,
                        desc=f"📤 上传 {file_name}",
                        ncols=80,
                        bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
                    )
                    try:
                        wrapped_file = _ProgressFileWrapper(file, progress_bar)
                        response = requests.put(upload_url, headers=upload_headers, data=wrapped_file)
                    finally:
                        # Close the bar even when the request raises.
                        progress_bar.close()
                else:
                    response = requests.put(upload_url, headers=upload_headers, data=file)
            response.raise_for_status()
            print(f"📤 上传到 {upload_url} 返回结果: {response.status_code}")
            self._print_response_body(response)
            return True
        except requests.exceptions.RequestException as e:
            print(f"❌ 文件上传失败 ({file_name}): {e}")
            self._print_error_response(e, "响应状态码", "响应内容")
            return False
        except OSError as e:
            # Local read problems (permissions, file vanished) are a failed
            # upload for this file, not a reason to abort the whole run.
            print(f"❌ 文件上传失败 ({file_name}): {e}")
            return False

    def verify_upload(self, verify_url: str) -> bool:
        """Confirm an upload by POSTing to the verification URL.

        Args:
            verify_url: Verification URL.

        Returns:
            True when the verification request succeeds, False otherwise.
        """
        try:
            response = requests.post(verify_url, headers=self.headers)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"❌ 验证上传失败: {e}")
            self._print_error_response(e, "验证状态码", "验证响应内容")
            return False
        print(f"🔍 验证请求返回状态: {response.status_code}")
        self._print_response_body(response)
        return True

    def upload_multiple_assets(self, project_path: str, release_id: str,
                               asset_files: List[str], overwrite: bool = True,
                               show_progress: bool = True) -> List[bool]:
        """Upload several asset files to one release, one after another.

        Args:
            project_path: Project path.
            release_id: ID of the target release.
            asset_files: Local paths of the files to upload.
            overwrite: Whether existing assets may be overwritten.
            show_progress: Whether to display progress information.

        Returns:
            One boolean per entry of ``asset_files``, in the same order,
            telling whether that file was uploaded (and verified, when the
            API supplied a verification URL).
        """
        results = []
        total = len(asset_files)
        print(f"\n📦 开始上传 {total} 个文件到release {release_id}...")
        # Sum the sizes of the files that actually exist for the overview line.
        valid_files = [path for path in asset_files if os.path.isfile(path)]
        total_size = sum(os.path.getsize(path) for path in valid_files)
        if show_progress and valid_files:
            print(f"📊 总计 {len(valid_files)} 个有效文件,总大小: {total_size / 1024 / 1024:.2f} MB")
            print("=" * 60)
        for i, file_path in enumerate(asset_files, 1):
            if not os.path.isfile(file_path):
                print(f"❌ [{i}/{total}] 跳过不存在的文件: {file_path}")
                results.append(False)
                continue
            file_size = os.path.getsize(file_path)
            asset_name = os.path.basename(file_path)
            print(f"\n📁 [{i}/{total}] 处理文件: {asset_name}")
            print(f" 文件大小: {file_size:,} bytes ({file_size / 1024 / 1024:.2f} MB)")
            # 1. Ask the API where to upload this asset.
            upload_info = self.get_asset_upload_url(
                project_path, release_id, asset_name, file_size, overwrite
            )
            if not upload_info:
                results.append(False)
                continue
            upload_url = upload_info.get('upload_url') if isinstance(upload_info, dict) else None
            if not upload_url:
                # An unexpected response shape fails this file only, instead
                # of raising and aborting the remaining uploads.
                print(f"❌ 获取上传URL失败 ({asset_name}): 响应中缺少 upload_url")
                results.append(False)
                continue
            # 2. Upload the file contents.
            upload_success = self.upload_asset(upload_url, file_path, show_progress)
            time.sleep(1)
            # 3. Verify the upload when the API supplied a verification URL.
            final_success = upload_success
            if upload_success and 'verify_url' in upload_info:
                final_success = self.verify_upload(upload_info['verify_url'])
                if final_success:
                    print(f"✅ 文件 {asset_name} 上传并验证成功")
                else:
                    print(f"❌ 文件 {asset_name} 上传成功但验证失败")
            elif upload_success:
                print(f"✅ 文件 {asset_name} 上传成功(无需验证)")
            results.append(final_success)
            # Pause between files to avoid sending requests too quickly.
            if i < total:
                time.sleep(1)
        return results
def load_config_from_json(json_input: str) -> Dict:
    """Load the configuration from a JSON string or from a JSON file path.

    The input is first parsed as inline JSON. If that fails, or yields
    something other than a JSON object, it is treated as a path to a
    UTF-8 encoded JSON file.

    Args:
        json_input: A JSON document, or the path of a JSON file.

    Returns:
        The configuration dictionary.

    Raises:
        ValueError: If no JSON object can be obtained from the input
            (missing or unreadable file, invalid JSON, or a top-level value
            that is not an object).
    """
    inline_error = None
    try:
        parsed = json.loads(json_input)
    except json.JSONDecodeError as e:
        inline_error = e
    else:
        if isinstance(parsed, dict):
            print("✅ 从JSON字符串加载配置成功")
            return parsed

    # Not an inline JSON object: fall back to reading the input as a file.
    if not os.path.isfile(json_input):
        if inline_error is None:
            # Valid JSON, but e.g. a list or a number rather than an object.
            raise ValueError("无法解析JSON配置: 配置必须是JSON对象")
        raise ValueError(f"无法解析JSON配置: 配置文件不存在: {json_input}") from inline_error

    try:
        with open(json_input, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        raise ValueError(f"无法解析JSON配置: {e}") from e
    if not isinstance(config, dict):
        raise ValueError("无法解析JSON配置: 配置必须是JSON对象")
    print(f"✅ 从文件 {json_input} 加载配置成功")
    return config
def validate_config(config: Dict) -> None:
    """Check that the configuration contains the required fields.

    Required top-level keys are ``token``, ``project_path`` and
    ``release_data``; ``release_data`` must be an object containing
    ``tag_name`` and ``name``. When ``asset_files`` is present it must be a
    list of strings.

    Args:
        config: Configuration dictionary.

    Raises:
        ValueError: If the configuration is invalid.
    """
    if not isinstance(config, dict):
        raise ValueError("配置必须是JSON对象")

    for field in ('token', 'project_path', 'release_data'):
        if field not in config:
            raise ValueError(f"缺少必需字段: {field}")

    # Without this check a string would pass via substring matching and
    # None would raise TypeError instead of the documented ValueError.
    release_data = config['release_data']
    if not isinstance(release_data, dict):
        raise ValueError("release_data必须是对象")
    for field in ('tag_name', 'name'):
        if field not in release_data:
            raise ValueError(f"release_data缺少必需字段: {field}")

    if 'asset_files' in config:
        asset_files = config['asset_files']
        if not isinstance(asset_files, list):
            raise ValueError("asset_files必须是数组")
        # Paths are later handed to os.path functions, which interpret
        # integers as file descriptors; only accept strings.
        if not all(isinstance(entry, str) for entry in asset_files):
            raise ValueError("asset_files中的每一项必须是字符串")
def main():
    """Command-line entry point: load the config, create the release, upload assets.

    Returns:
        Process exit code: 0 on success (including dry-run and a release
        without assets), 2 when only some assets were uploaded, 1 when release
        creation fails, every upload fails, or an exception is raised.
    """
    cli = argparse.ArgumentParser(description='CNB Release Uploader - JSON配置版本')
    cli.add_argument('config', help='JSON配置字符串或JSON配置文件路径')
    cli.add_argument('--dry-run', action='store_true', help='只验证配置,不执行上传')
    cli.add_argument('--no-progress', action='store_true', help='禁用上传进度条显示')
    options = cli.parse_args()

    try:
        # Load and validate the configuration.
        print("🔧 加载配置...")
        settings = load_config_from_json(options.config)
        print("🔍 验证配置...")
        validate_config(settings)

        project = settings['project_path']
        release_spec = settings['release_data']
        files = settings.get('asset_files', [])

        # Summarise the configuration; the token is never shown.
        print("\n📋 配置信息:")
        print(f" Token: {'*' * 20}")
        print(f" 项目路径: {project}")
        print(f" Release名称: {release_spec['name']}")
        print(f" Tag名称: {release_spec['tag_name']}")
        print(f" Asset文件数量: {len(files)}")

        if options.dry_run:
            print("\n🧪 Dry-run模式配置验证通过退出程序")
            return 0

        client = CNBReleaseUploader(
            token=settings['token'],
            base_url=settings.get('base_url', 'https://api.cnb.cool'),
        )

        # Step 1: create the release.
        print("\n🚀 开始创建release...")
        created = client.create_release(project, release_spec)
        if not created:
            print("❌ 创建release失败退出程序")
            return 1
        release_id = created.get('id')
        if not release_id:
            print("❌ 无法获取release ID退出程序")
            return 1

        # Step 2: upload the assets, if any were configured.
        if not files:
            print("\n📦 没有指定asset文件只创建了release")
            return 0
        allow_overwrite = settings.get('overwrite', True)
        with_progress = not options.no_progress  # the bar is on unless --no-progress is given
        outcomes = client.upload_multiple_assets(
            project, release_id, files, allow_overwrite, with_progress
        )

        # Step 3: print the per-file summary and choose the exit code.
        divider = "=" * 50
        print("\n" + divider)
        print("📊 上传结果汇总:")
        print(divider)
        succeeded = sum(outcomes)
        attempted = len(outcomes)
        for position, (path, ok) in enumerate(zip(files, outcomes), start=1):
            label = "✅ 成功" if ok else "❌ 失败"
            print(f" [{position}] {label} - {os.path.basename(path)}")
        print(f"\n🎉 完成! 成功上传 {succeeded}/{attempted} 个文件")

        if succeeded == attempted:
            print("🎊 所有文件都上传成功!")
            return 0
        if succeeded > 0:
            print("⚠️ 部分文件上传成功,请检查失败的文件")
            return 2
        print("💥 所有文件上传失败,请检查配置和网络")
        return 1
    except Exception as e:
        print(f"❌ 程序执行失败: {e}")
        return 1
if __name__ == "__main__":
    # Run the CLI and hand its return value to the shell as the exit status.
    sys.exit(main())