1
0
mirror of https://github.com/hanxi/xiaomusic.git synced 2026-03-18 08:43:17 +08:00

Auto-format code 🧹🌟🤖

This commit is contained in:
Formatter [BOT]
2025-12-25 13:47:21 +00:00
parent 67d93feec5
commit dd97299de3
6 changed files with 653 additions and 479 deletions

View File

@@ -5,7 +5,7 @@
import sys
sys.path.append('.')
sys.path.append(".")
from xiaomusic.config import Config
from xiaomusic.js_plugin_manager import JSPluginManager
@@ -18,9 +18,14 @@ def check_all_plugins():
config.verbose = True
class SimpleLogger:
def info(self, msg): print(f"[INFO] {msg}")
def error(self, msg): print(f"[ERROR] {msg}")
def debug(self, msg): print(f"[DEBUG] {msg}")
def info(self, msg):
print(f"[INFO] {msg}")
def error(self, msg):
print(f"[ERROR] {msg}")
def debug(self, msg):
print(f"[DEBUG] {msg}")
print("1. 创建插件管理器...")
manager = JSPluginManager(None)
@@ -28,6 +33,7 @@ def check_all_plugins():
manager.log = SimpleLogger()
import time
time.sleep(3) # 等待插件加载
print("\n2. 获取所有插件状态...")
@@ -39,7 +45,7 @@ def check_all_plugins():
failed_plugins = []
for plugin in plugins:
if plugin.get('loaded', False) and plugin.get('enabled', False):
if plugin.get("loaded", False) and plugin.get("enabled", False):
working_plugins.append(plugin)
else:
failed_plugins.append(plugin)
@@ -53,8 +59,9 @@ def check_all_plugins():
print(f"{plugin['name']}: {plugin.get('error', 'Unknown error')}")
# 清理
if hasattr(manager, 'node_process') and manager.node_process:
if hasattr(manager, "node_process") and manager.node_process:
manager.node_process.terminate()
if __name__ == "__main__":
check_all_plugins()

View File

@@ -95,7 +95,7 @@ security = HTTPBasic()
def verification(
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
):
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = config.httpauth_username.encode("utf8")
@@ -255,25 +255,28 @@ def searchmusic(name: str = "", Verifcation=Depends(verification)):
@app.get("/api/search/online")
async def search_online_music(
keyword: str = Query(..., description="搜索关键词"),
plugin: str = Query("all", description="指定插件名称all表示搜索所有插件"),
page: int = Query(1, description="页码"),
limit: int = Query(20, description="每页数量"),
Verifcation=Depends(verification)
keyword: str = Query(..., description="搜索关键词"),
plugin: str = Query("all", description="指定插件名称all表示搜索所有插件"),
page: int = Query(1, description="页码"),
limit: int = Query(20, description="每页数量"),
Verifcation=Depends(verification),
):
"""在线音乐搜索API"""
try:
if not keyword:
return {"success": False, "error": "Keyword required"}
return await xiaomusic.get_music_list_online(keyword=keyword, plugin=plugin, page=page, limit=limit)
return await xiaomusic.get_music_list_online(
keyword=keyword, plugin=plugin, page=page, limit=limit
)
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/api/proxy/real-music-url")
async def get_real_music_url(url: str = Query(..., description="音乐下载URL"),
Verifcation=Depends(verification)):
async def get_real_music_url(
url: str = Query(..., description="音乐下载URL"), Verifcation=Depends(verification)
):
"""通过服务端代理获取真实的音乐播放URL避免CORS问题"""
try:
# 获取真实的音乐播放URL
@@ -282,17 +285,11 @@ async def get_real_music_url(url: str = Query(..., description="音乐下载URL"
except Exception as e:
log.error(f"获取真实音乐URL失败: {e}")
# 如果代理获取失败仍然返回原始URL
return {
"success": False,
"realUrl": url,
"error": str(e)
}
return {"success": False, "realUrl": url, "error": str(e)}
@app.post("/api/play/getMediaSource")
async def get_media_source(
request: Request, Verifcation=Depends(verification)
):
async def get_media_source(request: Request, Verifcation=Depends(verification)):
"""获取音乐真实播放URL"""
try:
# 获取请求数据
@@ -304,9 +301,7 @@ async def get_media_source(
@app.post("/api/play/getLyric")
async def get_media_lyric(
request: Request, Verifcation=Depends(verification)
):
async def get_media_lyric(request: Request, Verifcation=Depends(verification)):
"""获取音乐真实播放URL"""
try:
# 获取请求数据
@@ -318,23 +313,21 @@ async def get_media_lyric(
@app.post("/api/play/online")
async def play_online_music(
request: Request, Verifcation=Depends(verification)
):
async def play_online_music(request: Request, Verifcation=Depends(verification)):
"""设备端在线播放插件音乐"""
try:
# 获取请求数据
data = await request.json()
did = data.get('did')
did = data.get("did")
openapi_info = xiaomusic.js_plugin_manager.get_openapi_info()
if openapi_info.get("enabled", False):
media_source = await xiaomusic.get_real_url_of_openapi(data.get('url'))
media_source = await xiaomusic.get_real_url_of_openapi(data.get("url"))
else:
# 调用公共函数处理,获取音乐真实播放URL
media_source = await xiaomusic.get_media_source_url(data)
if not media_source or not media_source.get('url'):
if not media_source or not media_source.get("url"):
return {"success": False, "error": "Failed to get media source URL"}
url = media_source.get('url')
url = media_source.get("url")
decoded_url = urllib.parse.unquote(url)
return await xiaomusic.play_url(did=did, arg1=decoded_url)
except Exception as e:
@@ -343,14 +336,18 @@ async def play_online_music(
# =====================================插件入口函数===============
@app.get("/api/js-plugins")
def get_js_plugins(
enabled_only: bool = Query(False, description="是否只返回启用的插件"),
Verifcation=Depends(verification)
enabled_only: bool = Query(False, description="是否只返回启用的插件"),
Verifcation=Depends(verification),
):
"""获取插件列表"""
try:
if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
# 重新加载插件
# xiaomusic.js_plugin_manager.reload_plugins()
@@ -369,7 +366,10 @@ def get_js_plugins(
def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""启用插件"""
try:
if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.enable_plugin(plugin_name)
@@ -383,7 +383,10 @@ def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""禁用插件"""
try:
if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.disable_plugin(plugin_name)
@@ -397,7 +400,10 @@ def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""卸载插件"""
try:
if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.uninstall_plugin(plugin_name)
@@ -409,29 +415,35 @@ def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
@app.post("/api/js-plugins/upload")
async def upload_js_plugin(
file: UploadFile = File(...),
verification=Depends(verification)
file: UploadFile = File(...), verification=Depends(verification)
):
"""上传 JS 插件"""
try:
# 验证文件扩展名
if not file.filename.endswith('.js'):
if not file.filename.endswith(".js"):
raise HTTPException(status_code=400, detail="只允许上传 .js 文件")
# 使用 JSPluginManager 中定义的插件目录
if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager:
raise HTTPException(status_code=500, detail="JS Plugin Manager not available")
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
raise HTTPException(
status_code=500, detail="JS Plugin Manager not available"
)
plugin_dir = xiaomusic.js_plugin_manager.plugins_dir
os.makedirs(plugin_dir, exist_ok=True)
file_path = os.path.join(plugin_dir, file.filename)
# 校验是否已存在同名js插件 存在则提示,停止上传
if os.path.exists(file_path):
raise HTTPException(status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传")
raise HTTPException(
status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传"
)
file_path = os.path.join(plugin_dir, file.filename)
# 写入文件内容
async with aiofiles.open(file_path, 'wb') as f:
async with aiofiles.open(file_path, "wb") as f:
content = await file.read()
await f.write(content)
@@ -450,10 +462,9 @@ async def upload_js_plugin(
# =====================================开放接口配置函数===============
@app.get("/api/openapi/load")
def get_openapi_info(
Verifcation=Depends(verification)
):
def get_openapi_info(Verifcation=Depends(verification)):
"""获取开放接口配置信息"""
try:
openapi_info = xiaomusic.js_plugin_manager.get_openapi_info()
@@ -472,12 +483,12 @@ def toggle_openapi(Verifcation=Depends(verification)):
@app.post("/api/openapi/updateUrl")
async def update_openapi_url(request: Request, Verifcation=Depends(verification)):
async def update_openapi_url(request: Request, Verifcation=Depends(verification)):
"""更新开放接口地址"""
try:
request_json = await request.json()
search_url = request_json.get('search_url')
if not request_json or 'search_url' not in request_json:
search_url = request_json.get("search_url")
if not request_json or "search_url" not in request_json:
return {"success": False, "error": "Missing 'search_url' in request body"}
return xiaomusic.js_plugin_manager.update_openapi_url(search_url)
except Exception as e:
@@ -486,6 +497,7 @@ async def update_openapi_url(request: Request, Verifcation=Depends(verification
# =====================================开放接口函数END===============
@app.get("/playingmusic")
def playingmusic(did: str = "", Verifcation=Depends(verification)):
if not xiaomusic.did_exist(did):
@@ -577,7 +589,7 @@ async def musiclist(Verifcation=Depends(verification)):
@app.get("/musicinfo")
async def musicinfo(
name: str, musictag: bool = False, Verifcation=Depends(verification)
name: str, musictag: bool = False, Verifcation=Depends(verification)
):
url, _ = await xiaomusic.get_music_url(name)
info = {
@@ -592,9 +604,9 @@ async def musicinfo(
@app.get("/musicinfos")
async def musicinfos(
name: list[str] = Query(None),
musictag: bool = False,
Verifcation=Depends(verification),
name: list[str] = Query(None),
musictag: bool = False,
Verifcation=Depends(verification),
):
ret = []
for music_name in name:
@@ -957,7 +969,7 @@ class PlayListUpdateObj(BaseModel):
# 修改歌单名字
@app.post("/playlistupdatename")
async def playlistupdatename(
data: PlayListUpdateObj, Verifcation=Depends(verification)
data: PlayListUpdateObj, Verifcation=Depends(verification)
):
ret = xiaomusic.play_list_update_name(data.oldname, data.newname)
if ret:
@@ -1002,7 +1014,7 @@ async def playlistdelmusic(data: PlayListMusicObj, Verifcation=Depends(verificat
# 歌单更新歌曲
@app.post("/playlistupdatemusic")
async def playlistupdatemusic(
data: PlayListMusicObj, Verifcation=Depends(verification)
data: PlayListMusicObj, Verifcation=Depends(verification)
):
ret = xiaomusic.play_list_update_music(data.name, data.music_list)
if ret:
@@ -1023,7 +1035,7 @@ async def getplaylist(name: str, Verifcation=Depends(verification)):
# 更新版本
@app.post("/updateversion")
async def updateversion(
version: str = "", lite: bool = True, Verifcation=Depends(verification)
version: str = "", lite: bool = True, Verifcation=Depends(verification)
):
ret = await update_version(version, lite)
if ret != "OK":
@@ -1054,7 +1066,7 @@ def access_key_verification(file_path, key, code):
if key is not None:
current_key_bytes = key.encode("utf8")
correct_key_bytes = (
config.httpauth_username + config.httpauth_password
config.httpauth_username + config.httpauth_password
).encode("utf8")
is_correct_key = secrets.compare_digest(correct_key_bytes, current_key_bytes)
if is_correct_key:
@@ -1065,7 +1077,7 @@ def access_key_verification(file_path, key, code):
correct_code_bytes = (
hashlib.sha256(
(
file_path + config.httpauth_username + config.httpauth_password
file_path + config.httpauth_username + config.httpauth_password
).encode("utf-8")
)
.hexdigest()
@@ -1250,8 +1262,8 @@ JWT_EXPIRE_SECONDS = 60 * 5 # 5 分钟有效期(足够前端连接和重连
@app.get("/generate_ws_token")
def generate_ws_token(
did: str,
_: bool = Depends(verification), # 复用 HTTP Basic 验证
did: str,
_: bool = Depends(verification), # 复用 HTTP Basic 验证
):
if not xiaomusic.did_exist(did):
raise HTTPException(status_code=400, detail="Invalid did")

View File

@@ -14,7 +14,9 @@ class JSAdapter:
self.xiaomusic = xiaomusic
self.log = logging.getLogger(__name__)
def format_search_results(self, plugin_results: list[dict], plugin_name: str) -> list[str]:
def format_search_results(
self, plugin_results: list[dict], plugin_name: str
) -> list[str]:
"""格式化搜索结果为 xiaomusic 格式,返回 ID 列表"""
formatted_ids = []
for item in plugin_results:
@@ -23,20 +25,24 @@ class JSAdapter:
continue
# 构造符合 xiaomusic 格式的音乐项
music_id = self._generate_music_id(plugin_name, item.get('id', ''), item.get('songmid', ''))
music_id = self._generate_music_id(
plugin_name, item.get("id", ""), item.get("songmid", "")
)
music_item = {
'id': music_id,
'title': item.get('title', item.get('name', '')),
'artist': self._extract_artists(item),
'album': item.get('album', item.get('albumName', '')),
'source': 'online',
'plugin_name': plugin_name,
'original_data': item,
'duration': item.get('duration', 0),
'cover': item.get('artwork', item.get('cover', item.get('albumPic', ''))),
'url': item.get('url', ''),
'lyric': item.get('lyric', item.get('lrc', '')),
'quality': item.get('quality', ''),
"id": music_id,
"title": item.get("title", item.get("name", "")),
"artist": self._extract_artists(item),
"album": item.get("album", item.get("albumName", "")),
"source": "online",
"plugin_name": plugin_name,
"original_data": item,
"duration": item.get("duration", 0),
"cover": item.get(
"artwork", item.get("cover", item.get("albumPic", ""))
),
"url": item.get("url", ""),
"lyric": item.get("lyric", item.get("lrc", "")),
"quality": item.get("quality", ""),
}
# 添加到 all_music 字典中
@@ -45,15 +51,19 @@ class JSAdapter:
return formatted_ids
def format_media_source_result(self, media_source_result: dict, music_item: dict) -> dict:
def format_media_source_result(
self, media_source_result: dict, music_item: dict
) -> dict:
"""格式化媒体源结果"""
if not media_source_result:
return {}
formatted = {
'url': media_source_result.get('url', ''),
'headers': media_source_result.get('headers', {}),
'userAgent': media_source_result.get('userAgent', media_source_result.get('user_agent', ''))
"url": media_source_result.get("url", ""),
"headers": media_source_result.get("headers", {}),
"userAgent": media_source_result.get(
"userAgent", media_source_result.get("user_agent", "")
),
}
return formatted
@@ -61,18 +71,18 @@ class JSAdapter:
def format_lyric_result(self, lyric_result: dict) -> str:
"""格式化歌词结果为 lrc 格式字符串"""
if not lyric_result:
return ''
return ""
# 获取原始歌词和翻译
raw_lrc = lyric_result.get('rawLrc', lyric_result.get('raw_lrc', ''))
translation = lyric_result.get('translation', '')
raw_lrc = lyric_result.get("rawLrc", lyric_result.get("raw_lrc", ""))
translation = lyric_result.get("translation", "")
# 如果有翻译,合并歌词和翻译
if translation and raw_lrc:
# 这里可以实现歌词和翻译的合并逻辑
return f"{raw_lrc}\n{translation}"
return raw_lrc or translation or ''
return raw_lrc or translation or ""
def format_album_info_result(self, album_info_result: dict) -> dict:
"""格式化专辑信息结果"""
@@ -80,14 +90,18 @@ class JSAdapter:
return {}
formatted = {
'isEnd': album_info_result.get('isEnd', True),
'musicList': self.format_search_results(album_info_result.get('musicList', []), 'album'),
'albumItem': {
'title': album_info_result.get('albumItem', {}).get('title', ''),
'artist': album_info_result.get('albumItem', {}).get('artist', ''),
'cover': album_info_result.get('albumItem', {}).get('cover', ''),
'description': album_info_result.get('albumItem', {}).get('description', ''),
}
"isEnd": album_info_result.get("isEnd", True),
"musicList": self.format_search_results(
album_info_result.get("musicList", []), "album"
),
"albumItem": {
"title": album_info_result.get("albumItem", {}).get("title", ""),
"artist": album_info_result.get("albumItem", {}).get("artist", ""),
"cover": album_info_result.get("albumItem", {}).get("cover", ""),
"description": album_info_result.get("albumItem", {}).get(
"description", ""
),
},
}
return formatted
@@ -98,13 +112,17 @@ class JSAdapter:
return {}
formatted = {
'isEnd': music_sheet_result.get('isEnd', True),
'musicList': self.format_search_results(music_sheet_result.get('musicList', []), 'playlist'),
'sheetItem': {
'title': music_sheet_result.get('sheetItem', {}).get('title', ''),
'cover': music_sheet_result.get('sheetItem', {}).get('cover', ''),
'description': music_sheet_result.get('sheetItem', {}).get('description', ''),
}
"isEnd": music_sheet_result.get("isEnd", True),
"musicList": self.format_search_results(
music_sheet_result.get("musicList", []), "playlist"
),
"sheetItem": {
"title": music_sheet_result.get("sheetItem", {}).get("title", ""),
"cover": music_sheet_result.get("sheetItem", {}).get("cover", ""),
"description": music_sheet_result.get("sheetItem", {}).get(
"description", ""
),
},
}
return formatted
@@ -115,8 +133,10 @@ class JSAdapter:
return {}
formatted = {
'isEnd': artist_works_result.get('isEnd', True),
'data': self.format_search_results(artist_works_result.get('data', []), 'artist'),
"isEnd": artist_works_result.get("isEnd", True),
"data": self.format_search_results(
artist_works_result.get("data", []), "artist"
),
}
return formatted
@@ -128,19 +148,16 @@ class JSAdapter:
formatted = []
for group in top_lists_result:
formatted_group = {
'title': group.get('title', ''),
'data': []
}
formatted_group = {"title": group.get("title", ""), "data": []}
for item in group.get('data', []):
for item in group.get("data", []):
formatted_item = {
'id': item.get('id', ''),
'title': item.get('title', ''),
'description': item.get('description', ''),
'coverImg': item.get('coverImg', item.get('cover', '')),
"id": item.get("id", ""),
"title": item.get("title", ""),
"description": item.get("description", ""),
"coverImg": item.get("coverImg", item.get("cover", "")),
}
formatted_group['data'].append(formatted_item)
formatted_group["data"].append(formatted_item)
formatted.append(formatted_group)
@@ -152,14 +169,18 @@ class JSAdapter:
return {}
formatted = {
'isEnd': top_list_detail_result.get('isEnd', True),
'musicList': self.format_search_results(top_list_detail_result.get('musicList', []), 'toplist'),
'topListItem': top_list_detail_result.get('topListItem', {}),
"isEnd": top_list_detail_result.get("isEnd", True),
"musicList": self.format_search_results(
top_list_detail_result.get("musicList", []), "toplist"
),
"topListItem": top_list_detail_result.get("topListItem", {}),
}
return formatted
def _generate_music_id(self, plugin_name: str, item_id: str, fallback_id: str = '') -> str:
def _generate_music_id(
self, plugin_name: str, item_id: str, fallback_id: str = ""
) -> str:
"""生成唯一音乐ID"""
if item_id:
return f"online_{plugin_name}_{item_id}"
@@ -170,7 +191,7 @@ class JSAdapter:
def _extract_artists(self, item: dict) -> str:
"""提取艺术家信息"""
# 尝试多种可能的艺术家字段
artist_fields = ['artist', 'artists', 'singer', 'author', 'creator', 'singers']
artist_fields = ["artist", "artists", "singer", "author", "creator", "singers"]
for field in artist_fields:
if field in item:
@@ -180,36 +201,36 @@ class JSAdapter:
artists = []
for artist in value:
if isinstance(artist, dict):
artists.append(artist.get('name', str(artist)))
artists.append(artist.get("name", str(artist)))
else:
artists.append(str(artist))
return ', '.join(artists)
return ", ".join(artists)
elif isinstance(value, dict):
# 如果是艺术家对象
return value.get('name', str(value))
return value.get("name", str(value))
elif value:
return str(value)
# 如果没有找到艺术家信息,返回默认值
return '未知艺术家'
return "未知艺术家"
def convert_music_item_for_plugin(self, music_item: dict) -> dict:
"""将 xiaomusic 音乐项转换为插件兼容格式"""
# 如果原始数据存在,优先使用原始数据
if isinstance(music_item, dict) and 'original_data' in music_item:
return music_item['original_data']
if isinstance(music_item, dict) and "original_data" in music_item:
return music_item["original_data"]
# 否则构造一个基本的音乐项
converted = {
'id': music_item.get('id', ''),
'title': music_item.get('title', ''),
'artist': music_item.get('artist', ''),
'album': music_item.get('album', ''),
'url': music_item.get('url', ''),
'duration': music_item.get('duration', 0),
'artwork': music_item.get('cover', ''),
'lyric': music_item.get('lyric', ''),
'quality': music_item.get('quality', ''),
"id": music_item.get("id", ""),
"title": music_item.get("title", ""),
"artist": music_item.get("artist", ""),
"album": music_item.get("album", ""),
"url": music_item.get("url", ""),
"duration": music_item.get("duration", 0),
"artwork": music_item.get("cover", ""),
"lyric": music_item.get("lyric", ""),
"quality": music_item.get("quality", ""),
}
return converted

File diff suppressed because it is too large Load Diff

View File

@@ -200,7 +200,7 @@ def custom_sort_key(s):
def _get_depth_path(root, directory, depth):
# 计算当前目录的深度
relative_path = root[len(directory):].strip(os.sep)
relative_path = root[len(directory) :].strip(os.sep)
path_parts = relative_path.split(os.sep)
if len(path_parts) >= depth:
return os.path.join(directory, *path_parts[:depth])
@@ -231,7 +231,7 @@ def traverse_music_directory(directory, depth, exclude_dirs, support_extension):
dirs[:] = [d for d in dirs if d not in exclude_dirs]
# 计算当前目录的深度
current_depth = root[len(directory):].count(os.sep) + 1
current_depth = root[len(directory) :].count(os.sep) + 1
if current_depth > depth:
depth_path = _get_depth_path(root, directory, depth - 1)
_append_files_result(result, depth_path, root, files, support_extension)
@@ -242,14 +242,14 @@ def traverse_music_directory(directory, depth, exclude_dirs, support_extension):
# 发送给网页3thplay用于三者设备播放
async def thdplay(
action, args="/static/3thdplay.mp3", target="HTTP://192.168.1.10:58090/thdaction"
action, args="/static/3thdplay.mp3", target="HTTP://192.168.1.10:58090/thdaction"
):
# 接口地址 target,在参数文件指定
data = {"action": action, "args": args}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
target, json=data, timeout=5
target, json=data, timeout=5
) as response: # 增加超时以避免长时间挂起
# 如果响应不是200引发异常
response.raise_for_status()
@@ -276,7 +276,7 @@ async def downloadfile(url):
# 使用 aiohttp 创建一个客户端会话来发起请求
async with aiohttp.ClientSession() as session:
async with session.get(
cleaned_url, timeout=5
cleaned_url, timeout=5
) as response: # 增加超时以避免长时间挂起
# 如果响应不是200引发异常
response.raise_for_status()
@@ -345,11 +345,11 @@ async def get_web_music_duration(url, config):
cleaned_url = parsed_url.geturl()
async with aiohttp.ClientSession() as session:
async with session.get(
cleaned_url,
allow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36"
},
cleaned_url,
allow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36"
},
) as response:
url = str(response.url)
# 设置总超时时间为3秒
@@ -427,14 +427,18 @@ def get_duration_by_ffprobe(file_path, ffmpeg_location):
)
# 输出命令执行结果
log.info(f"命令执行结果 command result - return code: {result.returncode}, stdout: {result.stdout}")
log.info(
f"命令执行结果 command result - return code: {result.returncode}, stdout: {result.stdout}"
)
# 解析 JSON 输出
ffprobe_output = json.loads(result.stdout)
# 获取时长
duration = float(ffprobe_output["format"]["duration"])
log.info(f"Successfully extracted duration: {duration} seconds for file: {file_path}")
log.info(
f"Successfully extracted duration: {duration} seconds for file: {file_path}"
)
except Exception as e:
log.warning(f"Error getting local music {file_path} duration: {e}")
@@ -499,8 +503,8 @@ def remove_id3_tags(input_file: str, config) -> str:
# 检查是否存在ID3 v2.3或v2.4标签
if not (
audio.tags
and (audio.tags.version == (2, 3, 0) or audio.tags.version == (2, 4, 0))
audio.tags
and (audio.tags.version == (2, 3, 0) or audio.tags.version == (2, 4, 0))
):
return None
@@ -1148,7 +1152,7 @@ def remove_common_prefix(directory):
# 检查文件名是否以共同前缀开头
if filename.startswith(common_prefix):
# 构造新的文件名
new_filename = filename[len(common_prefix):]
new_filename = filename[len(common_prefix) :]
match = pattern.search(new_filename.strip())
if match:
num = match.group(1)
@@ -1339,10 +1343,10 @@ async def fetch_json_get(url, headers, config):
async with aiohttp.ClientSession(connector=connector) as session:
# 3. 发起带代理的GET请求
async with session.get(
url,
headers=headers,
proxy=proxy, # 传入格式化后的代理参数
timeout=10, # 超时时间(秒),避免无限等待
url,
headers=headers,
proxy=proxy, # 传入格式化后的代理参数
timeout=10, # 超时时间(秒),避免无限等待
) as response:
if response.status == 200:
data = await response.json()
@@ -1482,7 +1486,7 @@ class MusicUrlCache:
async def text_to_mp3(
text: str, save_dir: str, voice: str = "zh-CN-XiaoxiaoNeural"
text: str, save_dir: str, voice: str = "zh-CN-XiaoxiaoNeural"
) -> str:
"""
使用edge-tts将文本转换为MP3语音文件

View File

@@ -112,6 +112,7 @@ class XiaoMusic:
# 初始化 JS 插件管理器
try:
from xiaomusic.js_plugin_manager import JSPluginManager
self.js_plugin_manager = JSPluginManager(self)
self.log.info("JS Plugin Manager initialized successfully")
except Exception as e:
@@ -121,6 +122,7 @@ class XiaoMusic:
# 初始化 JS 插件适配器
try:
from xiaomusic.js_adapter import JSAdapter
self.js_adapter = JSAdapter(self)
self.log.info("JS Adapter initialized successfully")
except Exception as e:
@@ -148,8 +150,15 @@ class XiaoMusic:
self.log.warning("配置文件目录和音乐目录建议设置为不同的目录")
# 私有方法:调用插件方法的通用封装
async def __call_plugin_method(self, plugin_name: str, method_name: str, music_item: dict, result_key: str,
required_field: str = None, **kwargs):
async def __call_plugin_method(
self,
plugin_name: str,
method_name: str,
music_item: dict,
result_key: str,
required_field: str = None,
**kwargs,
):
"""
通用方法:调用 JS 插件的方法并返回结果
@@ -177,13 +186,22 @@ class XiaoMusic:
try:
# 调用插件方法,传递额外参数
result = getattr(self.js_plugin_manager, method_name)(plugin_name, music_item, **kwargs)
if not result or not result.get(result_key) or result.get(result_key) == 'None':
result = getattr(self.js_plugin_manager, method_name)(
plugin_name, music_item, **kwargs
)
if (
not result
or not result.get(result_key)
or result.get(result_key) == "None"
):
return {"success": False, "error": f"Failed to get {result_key}"}
# 如果指定了必填字段,则额外校验
if required_field and not result.get(required_field):
return {"success": False, "error": f"Missing required field: {required_field}"}
return {
"success": False,
"error": f"Missing required field: {required_field}",
}
# 追加属性后返回
result["success"] = True
return result
@@ -561,7 +579,7 @@ class XiaoMusic:
picture = tags["picture"]
if picture:
if picture.startswith(self.config.picture_cache_path):
picture = picture[len(self.config.picture_cache_path):]
picture = picture[len(self.config.picture_cache_path) :]
picture = picture.replace("\\", "/")
if picture.startswith("/"):
picture = picture[1:]
@@ -712,7 +730,7 @@ class XiaoMusic:
# 处理文件路径
if filename.startswith(self.config.music_path):
filename = filename[len(self.config.music_path):]
filename = filename[len(self.config.music_path) :]
filename = filename.replace("\\", "/")
if filename.startswith("/"):
filename = filename[1:]
@@ -802,7 +820,7 @@ class XiaoMusic:
# TODO: 网络歌曲获取歌曲额外信息
pass
elif os.path.exists(file_or_url) and not_in_dirs(
file_or_url, ignore_tag_absolute_dirs
file_or_url, ignore_tag_absolute_dirs
):
all_music_tags[name] = extract_audio_metadata(
file_or_url, self.config.picture_cache_path
@@ -839,7 +857,7 @@ class XiaoMusic:
if dir_name == os.path.basename(self.music_path):
dir_name = "其他"
if self.music_path != self.download_path and dir_name == os.path.basename(
self.download_path
self.download_path
):
dir_name = "下载"
if dir_name not in all_music_by_dir:
@@ -1013,7 +1031,7 @@ class XiaoMusic:
self.start_file_watch()
analytics_task = asyncio.create_task(self.analytics_task_daily())
assert (
analytics_task is not None
analytics_task is not None
) # to keep the reference to task, do not remove this
async with ClientSession() as session:
self.session = session
@@ -1126,11 +1144,11 @@ class XiaoMusic:
opvalue = self.config.key_word_dict.get(opkey)
if (
(not ctrl_panel)
and (not self.isplaying(did))
and self.active_cmd
and (opvalue not in self.active_cmd)
and (opkey not in self.active_cmd)
(not ctrl_panel)
and (not self.isplaying(did))
and self.active_cmd
and (opvalue not in self.active_cmd)
and (opkey not in self.active_cmd)
):
self.log.info(f"不在激活命令中 {opvalue}")
continue
@@ -1241,7 +1259,9 @@ class XiaoMusic:
# ===========================MusicFree插件函数================================
# 在线获取歌曲列表
async def get_music_list_online(self, plugin="all", keyword="", page=1, limit=20, **kwargs):
async def get_music_list_online(
self, plugin="all", keyword="", page=1, limit=20, **kwargs
):
self.log.info("在线获取歌曲列表!")
"""
在线获取歌曲列表
@@ -1256,9 +1276,14 @@ class XiaoMusic:
dict: 搜索结果
"""
openapi_info = self.js_plugin_manager.get_openapi_info()
if openapi_info.get("enabled", False) and openapi_info.get("search_url", "") != "":
if (
openapi_info.get("enabled", False)
and openapi_info.get("search_url", "") != ""
):
# 开放接口获取
return await self.js_plugin_manager.openapi_search(openapi_info.get("search_url"), keyword)
return await self.js_plugin_manager.openapi_search(
openapi_info.get("search_url"), keyword
)
else:
if not self.js_plugin_manager:
return {"success": False, "error": "JS Plugin Manager not available"}
@@ -1284,33 +1309,30 @@ class XiaoMusic:
# 验证URL格式
parsed_url = urlparse(url)
if not parsed_url.scheme or not parsed_url.netloc:
return {
"success": False,
"url": url,
"error": "Invalid URL format"
}
return {"success": False, "url": url, "error": "Invalid URL format"}
# 创建aiohttp客户端会话
async with aiohttp.ClientSession() as session:
# 发送HEAD请求跟随重定向
async with session.head(url, allow_redirects=True,
timeout=aiohttp.ClientTimeout(total=timeout)) as response:
async with session.head(
url,
allow_redirects=True,
timeout=aiohttp.ClientTimeout(total=timeout),
) as response:
# 获取最终重定向后的URL
final_url = str(response.url)
return {
"success": True,
"url": final_url,
"statusCode": response.status
"statusCode": response.status,
}
except Exception as e:
return {
"success": False,
"url": url,
"error": f"Error occurred: {str(e)}"
}
return {"success": False, "url": url, "error": f"Error occurred: {str(e)}"}
# 调用MusicFree插件获取歌曲列表
async def get_music_list_mf(self, plugin="all", keyword="", page=1, limit=20, **kwargs):
async def get_music_list_mf(
self, plugin="all", keyword="", page=1, limit=20, **kwargs
):
self.log.info("通过MusicFree插件搜索音乐列表!")
"""
通过MusicFree插件搜索音乐列表
@@ -1329,8 +1351,8 @@ class XiaoMusic:
if not self.js_plugin_manager:
return {"success": False, "error": "JS插件管理器不可用"}
# 如果关键词包含 '-',则提取歌手名、歌名
if '-' in keyword:
parts = keyword.split('-')
if "-" in keyword:
parts = keyword.split("-")
keyword = parts[0]
artist = parts[1]
else:
@@ -1341,7 +1363,9 @@ class XiaoMusic:
return await self._search_all_plugins(keyword, artist, page, limit)
else:
# 搜索指定插件
return await self._search_specific_plugin(plugin, keyword, artist, page, limit)
return await self._search_specific_plugin(
plugin, keyword, artist, page, limit
)
except Exception as e:
self.log.error(f"搜索音乐时发生错误: {e}")
return {"success": False, "error": str(e)}
@@ -1380,7 +1404,9 @@ class XiaoMusic:
if result and isinstance(result, dict):
# 检查是否有错误信息
if "error" in result:
self.log.error(f"插件 {plugin_name} 搜索失败: {result.get('error', '未知错误')}")
self.log.error(
f"插件 {plugin_name} 搜索失败: {result.get('error', '未知错误')}"
)
continue
# 处理成功的搜索结果
@@ -1400,9 +1426,9 @@ class XiaoMusic:
unified_result,
search_keyword=keyword,
limit=limit,
search_artist=artist
search_artist=artist,
)
results = optimized_result.get('data', [])
results = optimized_result.get("data", [])
return {
"success": True,
@@ -1410,7 +1436,7 @@ class XiaoMusic:
"total": len(results),
"sources": sources,
"page": page,
"limit": limit
"limit": limit,
}
async def _search_specific_plugin(self, plugin, keyword, artist, page, limit):
@@ -1419,22 +1445,19 @@ class XiaoMusic:
results = self.js_plugin_manager.search(plugin, keyword, page, limit)
# 额外检查 resources 字段
data_list = results.get('data', [])
data_list = results.get("data", [])
if data_list:
# 优化搜索结果排序
results = self.js_plugin_manager.optimize_search_results(
results,
search_keyword=keyword,
limit=limit,
search_artist=artist
results, search_keyword=keyword, limit=limit, search_artist=artist
)
return {
"success": True,
"data": results.get('data', []),
"total": results.get('total', 0),
"data": results.get("data", []),
"total": results.get("total", 0),
"page": page,
"limit": limit
"limit": limit,
}
except Exception as e:
self.log.error(f"插件 {plugin} 搜索失败: {e}")
@@ -1449,7 +1472,7 @@ class XiaoMusic:
raise e
# 调用MusicFree插件获取真实播放url
async def get_media_source_url(self, music_item, quality: str = 'standard'):
async def get_media_source_url(self, music_item, quality: str = "standard"):
"""获取音乐项的媒体源URL
Args:
music_item : MusicFree插件定义的 IMusicItem
@@ -1458,14 +1481,14 @@ class XiaoMusic:
dict: 包含成功状态和URL信息的字典
"""
# kwargs可追加
kwargs = {'quality': quality}
kwargs = {"quality": quality}
return await self.__call_plugin_method(
plugin_name=music_item.get('platform'),
plugin_name=music_item.get("platform"),
method_name="get_media_source",
music_item=music_item,
result_key="url",
required_field="url",
**kwargs
**kwargs,
)
# 调用MusicFree插件获取歌词
@@ -1477,11 +1500,11 @@ class XiaoMusic:
dict: 包含成功状态和URL信息的字典
"""
return await self.__call_plugin_method(
plugin_name=music_item.get('platform'),
plugin_name=music_item.get("platform"),
method_name="get_lyric",
music_item=music_item,
result_key="rawLrc",
required_field="rawLrc"
required_field="rawLrc",
)
# 调用在线搜索歌曲,并优化返回
@@ -1500,26 +1523,30 @@ class XiaoMusic:
result = await self.get_music_list_online(keyword=name, limit=10)
self.log.info(f"在线搜索歌曲列表: {result}")
if result.get('success') and result.get('total') > 0:
if result.get("success") and result.get("total") > 0:
# 打印输出 result.data
self.log.info(f"歌曲列表: {result.get('data')}")
# 根据搜素关键字智能搜索出最符合的一条music_item
music_item = await self._search_top_one(result.get('data'), search_key, name)
music_item = await self._search_top_one(
result.get("data"), search_key, name
)
# 验证 music_item 是否为字典类型
if not isinstance(music_item, dict):
self.log.error(f"music_item should be a dict, but got {type(music_item)}: {music_item}")
self.log.error(
f"music_item should be a dict, but got {type(music_item)}: {music_item}"
)
return {"success": False, "error": "Invalid music item format"}
# 如果是OpenAPI则需要转换播放链接
openapi_info = self.js_plugin_manager.get_openapi_info()
if openapi_info.get("enabled", False):
return await self.get_real_url_of_openapi(music_item.get('url'))
return await self.get_real_url_of_openapi(music_item.get("url"))
else:
media_source = await self.get_media_source_url(music_item)
if media_source.get('success'):
return {"success": True, "url": media_source.get('url')}
if media_source.get("success"):
return {"success": True, "url": media_source.get("url")}
else:
return {"success": False, "error": media_source.get('error')}
return {"success": False, "error": media_source.get("error")}
else:
return {"success": False, "error": "未找到歌曲"}
@@ -1543,8 +1570,8 @@ class XiaoMusic:
# 计算每个项目的匹配分数
def calculate_match_score(item):
"""计算匹配分数"""
title = item.get('title', '').lower() if item.get('title') else ''
artist = item.get('artist', '').lower() if item.get('artist') else ''
title = item.get("title", "").lower() if item.get("title") else ""
artist = item.get("artist", "").lower() if item.get("artist") else ""
keyword = search_key.lower()
if not keyword:
@@ -1692,7 +1719,7 @@ class XiaoMusic:
async def online_play(self, did="", arg1="", **kwargs):
# 先推送默认【搜索中】音频搜索到播放url后推送给小爱
config = self.config
if config and hasattr(config, 'hostname') and hasattr(config, 'public_port'):
if config and hasattr(config, "hostname") and hasattr(config, "public_port"):
proxy_base = f"{config.hostname}:{config.public_port}"
else:
proxy_base = "http://192.168.31.241:8090"
@@ -1718,7 +1745,7 @@ class XiaoMusic:
# 后台搜索播放
async def do_play(
self, did, name, search_key="", exact=False, update_cur_list=False
self, did, name, search_key="", exact=False, update_cur_list=False
):
return await self.devices[did].play(name, search_key, exact, update_cur_list)
@@ -2114,7 +2141,7 @@ class XiaoMusicDevice:
# 更新总播放列表为了UI显示
self.xiaomusic.music_list["临时搜索列表"] = copy.copy(self._play_list)
elif (
self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0
self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0
) or (self.device.cur_playlist not in self.xiaomusic.music_list):
self.device.cur_playlist = "全部"
else:
@@ -2205,13 +2232,13 @@ class XiaoMusicDevice:
self.log.info("开始播放下一首")
name = self.get_cur_music()
if (
self.device.play_type == PLAY_TYPE_ALL
or self.device.play_type == PLAY_TYPE_RND
or self.device.play_type == PLAY_TYPE_SEQ
or name == ""
or (
self.device.play_type == PLAY_TYPE_ALL
or self.device.play_type == PLAY_TYPE_RND
or self.device.play_type == PLAY_TYPE_SEQ
or name == ""
or (
(name not in self._play_list) and self.device.play_type != PLAY_TYPE_ONE
)
)
):
name = self.get_next_music()
self.log.info(f"_play_next. name:{name}, cur_music:{self.get_cur_music()}")
@@ -2228,10 +2255,10 @@ class XiaoMusicDevice:
self.log.info("开始播放上一首")
name = self.get_cur_music()
if (
self.device.play_type == PLAY_TYPE_ALL
or self.device.play_type == PLAY_TYPE_RND
or name == ""
or (name not in self._play_list)
self.device.play_type == PLAY_TYPE_ALL
or self.device.play_type == PLAY_TYPE_RND
or name == ""
or (name not in self._play_list)
):
name = self.get_prev_music()
self.log.info(f"_play_prev. name:{name}, cur_music:{self.get_cur_music()}")
@@ -2304,9 +2331,9 @@ class XiaoMusicDevice:
self.log.info(f"播放 {name} 失败. 失败次数: {self._play_failed_cnt}")
await asyncio.sleep(1)
if (
self.isplaying()
and self._last_cmd != "stop"
and self._play_failed_cnt < 10
self.isplaying()
and self._last_cmd != "stop"
and self._play_failed_cnt < 10
):
self._play_failed_cnt = self._play_failed_cnt + 1
await self._play_next()
@@ -2360,8 +2387,8 @@ class XiaoMusicDevice:
self.log.info(playing_info)
# WTF xiaomi api
is_playing = (
json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1)
== 1
json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1)
== 1
)
return is_playing
@@ -2480,8 +2507,8 @@ class XiaoMusicDevice:
if direction == "next":
new_index = index + 1
if (
self.device.play_type == PLAY_TYPE_SEQ
and new_index >= play_list_len
self.device.play_type == PLAY_TYPE_SEQ
and new_index >= play_list_len
):
self.log.info("顺序播放结束")
return ""
@@ -2568,7 +2595,7 @@ class XiaoMusicDevice:
f"play_one_url continue_play device_id:{device_id} ret:{ret} url:{url} audio_id:{audio_id}"
)
elif self.config.use_music_api or (
self.hardware in NEED_USE_PLAY_MUSIC_API
self.hardware in NEED_USE_PLAY_MUSIC_API
):
ret = await self.xiaomusic.mina_service.play_by_music_url(
device_id, url, audio_id=audio_id
@@ -2777,7 +2804,7 @@ class XiaoMusicDevice:
return "最近新增"
for list_name, play_list in self.xiaomusic.music_list.items():
if (list_name not in ["全部", "所有歌曲", "所有电台", "临时搜索列表"]) and (
name in play_list
name in play_list
):
return list_name
if name in self.xiaomusic.music_list.get("所有歌曲", []):