diff --git a/check_plugins.py b/check_plugins.py index 3b843e4..95d0d0e 100644 --- a/check_plugins.py +++ b/check_plugins.py @@ -5,7 +5,7 @@ import sys -sys.path.append('.') +sys.path.append(".") from xiaomusic.config import Config from xiaomusic.js_plugin_manager import JSPluginManager @@ -18,9 +18,14 @@ def check_all_plugins(): config.verbose = True class SimpleLogger: - def info(self, msg): print(f"[INFO] {msg}") - def error(self, msg): print(f"[ERROR] {msg}") - def debug(self, msg): print(f"[DEBUG] {msg}") + def info(self, msg): + print(f"[INFO] {msg}") + + def error(self, msg): + print(f"[ERROR] {msg}") + + def debug(self, msg): + print(f"[DEBUG] {msg}") print("1. 创建插件管理器...") manager = JSPluginManager(None) @@ -28,6 +33,7 @@ def check_all_plugins(): manager.log = SimpleLogger() import time + time.sleep(3) # 等待插件加载 print("\n2. 获取所有插件状态...") @@ -39,7 +45,7 @@ def check_all_plugins(): failed_plugins = [] for plugin in plugins: - if plugin.get('loaded', False) and plugin.get('enabled', False): + if plugin.get("loaded", False) and plugin.get("enabled", False): working_plugins.append(plugin) else: failed_plugins.append(plugin) @@ -53,8 +59,9 @@ def check_all_plugins(): print(f" ✗ {plugin['name']}: {plugin.get('error', 'Unknown error')}") # 清理 - if hasattr(manager, 'node_process') and manager.node_process: + if hasattr(manager, "node_process") and manager.node_process: manager.node_process.terminate() + if __name__ == "__main__": check_all_plugins() diff --git a/xiaomusic/httpserver.py b/xiaomusic/httpserver.py index e3de744..22dfb9a 100644 --- a/xiaomusic/httpserver.py +++ b/xiaomusic/httpserver.py @@ -95,7 +95,7 @@ security = HTTPBasic() def verification( - credentials: Annotated[HTTPBasicCredentials, Depends(security)], + credentials: Annotated[HTTPBasicCredentials, Depends(security)], ): current_username_bytes = credentials.username.encode("utf8") correct_username_bytes = config.httpauth_username.encode("utf8") @@ -255,25 +255,28 @@ def searchmusic(name: str = "", Verifcation=Depends(verification)): @app.get("/api/search/online") async def search_online_music( - keyword: str = Query(..., description="搜索关键词"), - plugin: str = Query("all", description="指定插件名称,all表示搜索所有插件"), - page: int = Query(1, description="页码"), - limit: int = Query(20, description="每页数量"), - Verifcation=Depends(verification) + keyword: str = Query(..., description="搜索关键词"), + plugin: str = Query("all", description="指定插件名称,all表示搜索所有插件"), + page: int = Query(1, description="页码"), + limit: int = Query(20, description="每页数量"), + Verifcation=Depends(verification), ): """在线音乐搜索API""" try: if not keyword: return {"success": False, "error": "Keyword required"} - return await xiaomusic.get_music_list_online(keyword=keyword, plugin=plugin, page=page, limit=limit) + return await xiaomusic.get_music_list_online( + keyword=keyword, plugin=plugin, page=page, limit=limit + ) except Exception as e: return {"success": False, "error": str(e)} @app.get("/api/proxy/real-music-url") -async def get_real_music_url(url: str = Query(..., description="音乐下载URL"), - Verifcation=Depends(verification)): +async def get_real_music_url( + url: str = Query(..., description="音乐下载URL"), Verifcation=Depends(verification) +): """通过服务端代理获取真实的音乐播放URL,避免CORS问题""" try: # 获取真实的音乐播放URL @@ -282,17 +285,11 @@ async def get_real_music_url(url: str = Query(..., description="音乐下载URL" except Exception as e: log.error(f"获取真实音乐URL失败: {e}") # 如果代理获取失败,仍然返回原始URL - return { - "success": False, - "realUrl": url, - "error": str(e) - } + return {"success": False, "realUrl": url, "error": str(e)} @app.post("/api/play/getMediaSource") -async def get_media_source( - request: Request, Verifcation=Depends(verification) -): +async def get_media_source(request: Request, Verifcation=Depends(verification)): """获取音乐真实播放URL""" try: # 获取请求数据 @@ -304,9 +301,7 @@ async def get_media_source( @app.post("/api/play/getLyric") -async def get_media_lyric( - request: Request, Verifcation=Depends(verification) -): +async def get_media_lyric(request: Request, Verifcation=Depends(verification)): """获取音乐真实播放URL""" try: # 获取请求数据 @@ -318,23 +313,21 @@ async def get_media_lyric( @app.post("/api/play/online") -async def play_online_music( - request: Request, Verifcation=Depends(verification) -): +async def play_online_music(request: Request, Verifcation=Depends(verification)): """设备端在线播放插件音乐""" try: # 获取请求数据 data = await request.json() - did = data.get('did') + did = data.get("did") openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() if openapi_info.get("enabled", False): - media_source = await xiaomusic.get_real_url_of_openapi(data.get('url')) + media_source = await xiaomusic.get_real_url_of_openapi(data.get("url")) else: # 调用公共函数处理,获取音乐真实播放URL media_source = await xiaomusic.get_media_source_url(data) - if not media_source or not media_source.get('url'): + if not media_source or not media_source.get("url"): return {"success": False, "error": "Failed to get media source URL"} - url = media_source.get('url') + url = media_source.get("url") decoded_url = urllib.parse.unquote(url) return await xiaomusic.play_url(did=did, arg1=decoded_url) except Exception as e: @@ -343,14 +336,18 @@ async def play_online_music( # =====================================插件入口函数=============== + @app.get("/api/js-plugins") def get_js_plugins( - enabled_only: bool = Query(False, description="是否只返回启用的插件"), - Verifcation=Depends(verification) + enabled_only: bool = Query(False, description="是否只返回启用的插件"), + Verifcation=Depends(verification), ): """获取插件列表""" try: - if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): return {"success": False, "error": "JS Plugin Manager not available"} # 重新加载插件 # xiaomusic.js_plugin_manager.reload_plugins() @@ -369,7 +366,10 @@ def get_js_plugins( def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): """启用插件""" try: - if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): return {"success": False, "error": "JS Plugin Manager not available"} success = xiaomusic.js_plugin_manager.enable_plugin(plugin_name) @@ -383,7 +383,10 @@ def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): """禁用插件""" try: - if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): return {"success": False, "error": "JS Plugin Manager not available"} success = xiaomusic.js_plugin_manager.disable_plugin(plugin_name) @@ -397,7 +400,10 @@ def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)): """卸载插件""" try: - if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): return {"success": False, "error": "JS Plugin Manager not available"} success = xiaomusic.js_plugin_manager.uninstall_plugin(plugin_name) @@ -409,29 +415,35 @@ def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)): @app.post("/api/js-plugins/upload") async def upload_js_plugin( - file: UploadFile = File(...), - verification=Depends(verification) + file: UploadFile = File(...), verification=Depends(verification) ): """上传 JS 插件""" try: # 验证文件扩展名 - if not file.filename.endswith('.js'): + if not file.filename.endswith(".js"): raise HTTPException(status_code=400, detail="只允许上传 .js 文件") # 使用 JSPluginManager 中定义的插件目录 - if not hasattr(xiaomusic, 'js_plugin_manager') or not xiaomusic.js_plugin_manager: - raise HTTPException(status_code=500, detail="JS Plugin Manager not available") + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + raise HTTPException( + status_code=500, detail="JS Plugin Manager not available" + ) plugin_dir = xiaomusic.js_plugin_manager.plugins_dir os.makedirs(plugin_dir, exist_ok=True) file_path = os.path.join(plugin_dir, file.filename) # 校验是否已存在同名js插件 存在则提示,停止上传 if os.path.exists(file_path): - raise HTTPException(status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传") + raise HTTPException( + status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传" + ) file_path = os.path.join(plugin_dir, file.filename) # 写入文件内容 - async with aiofiles.open(file_path, 'wb') as f: + async with aiofiles.open(file_path, "wb") as f: content = await file.read() await f.write(content) @@ -450,10 +462,9 @@ async def upload_js_plugin( # =====================================开放接口配置函数=============== + @app.get("/api/openapi/load") -def get_openapi_info( - Verifcation=Depends(verification) -): +def get_openapi_info(Verifcation=Depends(verification)): """获取开放接口配置信息""" try: openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() @@ -472,12 +483,12 @@ def toggle_openapi(Verifcation=Depends(verification)): @app.post("/api/openapi/updateUrl") -async def update_openapi_url(request: Request, Verifcation=Depends(verification)): +async def update_openapi_url(request: Request, Verifcation=Depends(verification)): """更新开放接口地址""" try: request_json = await request.json() - search_url = request_json.get('search_url') - if not request_json or 'search_url' not in request_json: + search_url = request_json.get("search_url") + if not request_json or "search_url" not in request_json: return {"success": False, "error": "Missing 'search_url' in request body"} return xiaomusic.js_plugin_manager.update_openapi_url(search_url) except Exception as e: @@ -486,6 +497,7 @@ async def update_openapi_url(request: Request, Verifcation=Depends(verification # =====================================开放接口函数END=============== + @app.get("/playingmusic") def playingmusic(did: str = "", Verifcation=Depends(verification)): if not xiaomusic.did_exist(did): @@ -577,7 +589,7 @@ async def musiclist(Verifcation=Depends(verification)): @app.get("/musicinfo") async def musicinfo( - name: str, musictag: bool = False, Verifcation=Depends(verification) + name: str, musictag: bool = False, Verifcation=Depends(verification) ): url, _ = await xiaomusic.get_music_url(name) info = { @@ -592,9 +604,9 @@ async def musicinfo( @app.get("/musicinfos") async def musicinfos( - name: list[str] = Query(None), - musictag: bool = False, - Verifcation=Depends(verification), + name: list[str] = Query(None), + musictag: bool = False, + Verifcation=Depends(verification), ): ret = [] for music_name in name: @@ -957,7 +969,7 @@ class PlayListUpdateObj(BaseModel): # 修改歌单名字 @app.post("/playlistupdatename") async def playlistupdatename( - data: PlayListUpdateObj, Verifcation=Depends(verification) + data: PlayListUpdateObj, Verifcation=Depends(verification) ): ret = xiaomusic.play_list_update_name(data.oldname, data.newname) if ret: @@ -1002,7 +1014,7 @@ async def playlistdelmusic(data: PlayListMusicObj, Verifcation=Depends(verificat # 歌单更新歌曲 @app.post("/playlistupdatemusic") async def playlistupdatemusic( - data: PlayListMusicObj, Verifcation=Depends(verification) + data: PlayListMusicObj, Verifcation=Depends(verification) ): ret = xiaomusic.play_list_update_music(data.name, data.music_list) if ret: @@ -1023,7 +1035,7 @@ async def getplaylist(name: str, Verifcation=Depends(verification)): # 更新版本 @app.post("/updateversion") async def updateversion( - version: str = "", lite: bool = True, Verifcation=Depends(verification) + version: str = "", lite: bool = True, Verifcation=Depends(verification) ): ret = await update_version(version, lite) if ret != "OK": @@ -1054,7 +1066,7 @@ def access_key_verification(file_path, key, code): if key is not None: current_key_bytes = key.encode("utf8") correct_key_bytes = ( - config.httpauth_username + config.httpauth_password + config.httpauth_username + config.httpauth_password ).encode("utf8") is_correct_key = secrets.compare_digest(correct_key_bytes, current_key_bytes) if is_correct_key: @@ -1065,7 +1077,7 @@ def access_key_verification(file_path, key, code): correct_code_bytes = ( hashlib.sha256( ( - file_path + config.httpauth_username + config.httpauth_password + file_path + config.httpauth_username + config.httpauth_password ).encode("utf-8") ) .hexdigest() @@ -1250,8 +1262,8 @@ JWT_EXPIRE_SECONDS = 60 * 5 # 5 分钟有效期(足够前端连接和重连 @app.get("/generate_ws_token") def generate_ws_token( - did: str, - _: bool = Depends(verification), # 复用 HTTP Basic 验证 + did: str, + _: bool = Depends(verification), # 复用 HTTP Basic 验证 ): if not xiaomusic.did_exist(did): raise HTTPException(status_code=400, detail="Invalid did") diff --git a/xiaomusic/js_adapter.py b/xiaomusic/js_adapter.py index 267302a..b8bd317 100644 --- a/xiaomusic/js_adapter.py +++ b/xiaomusic/js_adapter.py @@ -14,7 +14,9 @@ class JSAdapter: self.xiaomusic = xiaomusic self.log = logging.getLogger(__name__) - def format_search_results(self, plugin_results: list[dict], plugin_name: str) -> list[str]: + def format_search_results( + self, plugin_results: list[dict], plugin_name: str + ) -> list[str]: """格式化搜索结果为 xiaomusic 格式,返回 ID 列表""" formatted_ids = [] for item in plugin_results: @@ -23,20 +25,24 @@ class JSAdapter: continue # 构造符合 xiaomusic 格式的音乐项 - music_id = self._generate_music_id(plugin_name, item.get('id', ''), item.get('songmid', '')) + music_id = self._generate_music_id( + plugin_name, item.get("id", ""), item.get("songmid", "") + ) music_item = { - 'id': music_id, - 'title': item.get('title', item.get('name', '')), - 'artist': self._extract_artists(item), - 'album': item.get('album', item.get('albumName', '')), - 'source': 'online', - 'plugin_name': plugin_name, - 'original_data': item, - 'duration': item.get('duration', 0), - 'cover': item.get('artwork', item.get('cover', item.get('albumPic', ''))), - 'url': item.get('url', ''), - 'lyric': item.get('lyric', item.get('lrc', '')), - 'quality': item.get('quality', ''), + "id": music_id, + "title": item.get("title", item.get("name", "")), + "artist": self._extract_artists(item), + "album": item.get("album", item.get("albumName", "")), + "source": "online", + "plugin_name": plugin_name, + "original_data": item, + "duration": item.get("duration", 0), + "cover": item.get( + "artwork", item.get("cover", item.get("albumPic", "")) + ), + "url": item.get("url", ""), + "lyric": item.get("lyric", item.get("lrc", "")), + "quality": item.get("quality", ""), } # 添加到 all_music 字典中 @@ -45,15 +51,19 @@ class JSAdapter: return formatted_ids - def format_media_source_result(self, media_source_result: dict, music_item: dict) -> dict: + def format_media_source_result( + self, media_source_result: dict, music_item: dict + ) -> dict: """格式化媒体源结果""" if not media_source_result: return {} formatted = { - 'url': media_source_result.get('url', ''), - 'headers': media_source_result.get('headers', {}), - 'userAgent': media_source_result.get('userAgent', media_source_result.get('user_agent', '')) + "url": media_source_result.get("url", ""), + "headers": media_source_result.get("headers", {}), + "userAgent": media_source_result.get( + "userAgent", media_source_result.get("user_agent", "") + ), } return formatted @@ -61,18 +71,18 @@ class JSAdapter: def format_lyric_result(self, lyric_result: dict) -> str: """格式化歌词结果为 lrc 格式字符串""" if not lyric_result: - return '' + return "" # 获取原始歌词和翻译 - raw_lrc = lyric_result.get('rawLrc', lyric_result.get('raw_lrc', '')) - translation = lyric_result.get('translation', '') + raw_lrc = lyric_result.get("rawLrc", lyric_result.get("raw_lrc", "")) + translation = lyric_result.get("translation", "") # 如果有翻译,合并歌词和翻译 if translation and raw_lrc: # 这里可以实现歌词和翻译的合并逻辑 return f"{raw_lrc}\n{translation}" - return raw_lrc or translation or '' + return raw_lrc or translation or "" def format_album_info_result(self, album_info_result: dict) -> dict: """格式化专辑信息结果""" @@ -80,14 +90,18 @@ class JSAdapter: return {} formatted = { - 'isEnd': album_info_result.get('isEnd', True), - 'musicList': self.format_search_results(album_info_result.get('musicList', []), 'album'), - 'albumItem': { - 'title': album_info_result.get('albumItem', {}).get('title', ''), - 'artist': album_info_result.get('albumItem', {}).get('artist', ''), - 'cover': album_info_result.get('albumItem', {}).get('cover', ''), - 'description': album_info_result.get('albumItem', {}).get('description', ''), - } + "isEnd": album_info_result.get("isEnd", True), + "musicList": self.format_search_results( + album_info_result.get("musicList", []), "album" + ), + "albumItem": { + "title": album_info_result.get("albumItem", {}).get("title", ""), + "artist": album_info_result.get("albumItem", {}).get("artist", ""), + "cover": album_info_result.get("albumItem", {}).get("cover", ""), + "description": album_info_result.get("albumItem", {}).get( + "description", "" + ), + }, } return formatted @@ -98,13 +112,17 @@ class JSAdapter: return {} formatted = { - 'isEnd': music_sheet_result.get('isEnd', True), - 'musicList': self.format_search_results(music_sheet_result.get('musicList', []), 'playlist'), - 'sheetItem': { - 'title': music_sheet_result.get('sheetItem', {}).get('title', ''), - 'cover': music_sheet_result.get('sheetItem', {}).get('cover', ''), - 'description': music_sheet_result.get('sheetItem', {}).get('description', ''), - } + "isEnd": music_sheet_result.get("isEnd", True), + "musicList": self.format_search_results( + music_sheet_result.get("musicList", []), "playlist" + ), + "sheetItem": { + "title": music_sheet_result.get("sheetItem", {}).get("title", ""), + "cover": music_sheet_result.get("sheetItem", {}).get("cover", ""), + "description": music_sheet_result.get("sheetItem", {}).get( + "description", "" + ), + }, } return formatted @@ -115,8 +133,10 @@ class JSAdapter: return {} formatted = { - 'isEnd': artist_works_result.get('isEnd', True), - 'data': self.format_search_results(artist_works_result.get('data', []), 'artist'), + "isEnd": artist_works_result.get("isEnd", True), + "data": self.format_search_results( + artist_works_result.get("data", []), "artist" + ), } return formatted @@ -128,19 +148,16 @@ class JSAdapter: formatted = [] for group in top_lists_result: - formatted_group = { - 'title': group.get('title', ''), - 'data': [] - } + formatted_group = {"title": group.get("title", ""), "data": []} - for item in group.get('data', []): + for item in group.get("data", []): formatted_item = { - 'id': item.get('id', ''), - 'title': item.get('title', ''), - 'description': item.get('description', ''), - 'coverImg': item.get('coverImg', item.get('cover', '')), + "id": item.get("id", ""), + "title": item.get("title", ""), + "description": item.get("description", ""), + "coverImg": item.get("coverImg", item.get("cover", "")), } - formatted_group['data'].append(formatted_item) + formatted_group["data"].append(formatted_item) formatted.append(formatted_group) @@ -152,14 +169,18 @@ class JSAdapter: return {} formatted = { - 'isEnd': top_list_detail_result.get('isEnd', True), - 'musicList': self.format_search_results(top_list_detail_result.get('musicList', []), 'toplist'), - 'topListItem': top_list_detail_result.get('topListItem', {}), + "isEnd": top_list_detail_result.get("isEnd", True), + "musicList": self.format_search_results( + top_list_detail_result.get("musicList", []), "toplist" + ), + "topListItem": top_list_detail_result.get("topListItem", {}), } return formatted - def _generate_music_id(self, plugin_name: str, item_id: str, fallback_id: str = '') -> str: + def _generate_music_id( + self, plugin_name: str, item_id: str, fallback_id: str = "" + ) -> str: """生成唯一音乐ID""" if item_id: return f"online_{plugin_name}_{item_id}" @@ -170,7 +191,7 @@ class JSAdapter: def _extract_artists(self, item: dict) -> str: """提取艺术家信息""" # 尝试多种可能的艺术家字段 - artist_fields = ['artist', 'artists', 'singer', 'author', 'creator', 'singers'] + artist_fields = ["artist", "artists", "singer", "author", "creator", "singers"] for field in artist_fields: if field in item: @@ -180,36 +201,36 @@ class JSAdapter: artists = [] for artist in value: if isinstance(artist, dict): - artists.append(artist.get('name', str(artist))) + artists.append(artist.get("name", str(artist))) else: artists.append(str(artist)) - return ', '.join(artists) + return ", ".join(artists) elif isinstance(value, dict): # 如果是艺术家对象 - return value.get('name', str(value)) + return value.get("name", str(value)) elif value: return str(value) # 如果没有找到艺术家信息,返回默认值 - return '未知艺术家' + return "未知艺术家" def convert_music_item_for_plugin(self, music_item: dict) -> dict: """将 xiaomusic 音乐项转换为插件兼容格式""" # 如果原始数据存在,优先使用原始数据 - if isinstance(music_item, dict) and 'original_data' in music_item: - return music_item['original_data'] + if isinstance(music_item, dict) and "original_data" in music_item: + return music_item["original_data"] # 否则构造一个基本的音乐项 converted = { - 'id': music_item.get('id', ''), - 'title': music_item.get('title', ''), - 'artist': music_item.get('artist', ''), - 'album': music_item.get('album', ''), - 'url': music_item.get('url', ''), - 'duration': music_item.get('duration', 0), - 'artwork': music_item.get('cover', ''), - 'lyric': music_item.get('lyric', ''), - 'quality': music_item.get('quality', ''), + "id": music_item.get("id", ""), + "title": music_item.get("title", ""), + "artist": music_item.get("artist", ""), + "album": music_item.get("album", ""), + "url": music_item.get("url", ""), + "duration": music_item.get("duration", 0), + "artwork": music_item.get("cover", ""), + "lyric": music_item.get("lyric", ""), + "quality": music_item.get("quality", ""), } return converted diff --git a/xiaomusic/js_plugin_manager.py b/xiaomusic/js_plugin_manager.py index 10a01e2..64f03be 100644 --- a/xiaomusic/js_plugin_manager.py +++ b/xiaomusic/js_plugin_manager.py @@ -48,14 +48,14 @@ class JSPluginManager: try: self.node_process = subprocess.Popen( - ['node', '--max-old-space-size=128', runner_path], + ["node", "--max-old-space-size=128", runner_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - encoding='utf-8', - errors='replace', - bufsize=1 # 行缓冲 + encoding="utf-8", + errors="replace", + bufsize=1, # 行缓冲 ) self.log.info("Node.js process started successfully") @@ -88,7 +88,9 @@ class JSPluginManager: self._handle_response(response) except json.JSONDecodeError as e: # 捕获非 JSON 输出(可能是插件的调试信息或错误信息) - self.log.warning(f"Non-JSON output from Node.js process: {line.strip()}, error: {e}") + self.log.warning( + f"Non-JSON output from Node.js process: {line.strip()}, error: {e}" + ) except Exception as e: self.log.error(f"Message handler error: {e}") time.sleep(0.1) @@ -100,7 +102,9 @@ class JSPluginManager: try: error_line = self.node_process.stderr.readline() if error_line: - self.log.error(f"Node.js process error output: {error_line.strip()}") + self.log.error( + f"Node.js process error output: {error_line.strip()}" + ) except Exception as e: self.log.error(f"Error handler error: {e}") time.sleep(0.1) @@ -108,31 +112,35 @@ class JSPluginManager: threading.Thread(target=stdout_handler, daemon=True).start() threading.Thread(target=stderr_handler, daemon=True).start() - def _send_message(self, message: dict[str, Any], timeout: int = 30) -> dict[str, Any]: + def _send_message( + self, message: dict[str, Any], timeout: int = 30 + ) -> dict[str, Any]: """发送消息到 Node.js 子进程""" with self._lock: if not self.node_process or self.node_process.poll() is not None: raise Exception("Node.js process not available") message_id = f"msg_{int(time.time() * 1000)}" - message['id'] = message_id + message["id"] = message_id # 记录发送的消息 self.log.info( - f"JS Plugin Manager sending message: {message.get('action', 'unknown')} for plugin: {message.get('pluginName', 'unknown')}") - if 'params' in message: + f"JS Plugin Manager sending message: {message.get('action', 'unknown')} for plugin: {message.get('pluginName', 'unknown')}" + ) + if "params" in message: self.log.info(f"JS Plugin Manager search params: {message['params']}") - elif 'musicItem' in message: + elif "musicItem" in message: self.log.info(f"JS Plugin Manager music item: {message['musicItem']}") # 发送消息 - self.node_process.stdin.write(json.dumps(message) + '\n') + self.node_process.stdin.write(json.dumps(message) + "\n") self.node_process.stdin.flush() # 等待响应 response = self._wait_for_response(message_id, timeout) self.log.info( - f"JS Plugin Manager received response for message {message_id}: {response.get('success', 'unknown')}") + f"JS Plugin Manager received response for message {message_id}: {response.get('success', 'unknown')}" + ) return response def _wait_for_response(self, message_id: str, timeout: int) -> dict[str, Any]: @@ -149,32 +157,41 @@ class JSPluginManager: def _handle_response(self, response: dict[str, Any]): """处理 Node.js 进程的响应""" - message_id = response.get('id') - self.log.debug(f"JS Plugin Manager received raw response: {response}") # 添加原始响应日志 + message_id = response.get("id") + self.log.debug( + f"JS Plugin Manager received raw response: {response}" + ) # 添加原始响应日志 # 添加更严格的数据验证 if not isinstance(response, dict): - self.log.error(f"JS Plugin Manager received invalid response type: {type(response)}, value: {response}") + self.log.error( + f"JS Plugin Manager received invalid response type: {type(response)}, value: {response}" + ) return - if 'id' not in response: - self.log.error(f"JS Plugin Manager received response without id: {response}") + if "id" not in response: + self.log.error( + f"JS Plugin Manager received response without id: {response}" + ) return # 确保 success 字段存在 - if 'success' not in response: - self.log.warning(f"JS Plugin Manager received response without success field: {response}") - response['success'] = False + if "success" not in response: + self.log.warning( + f"JS Plugin Manager received response without success field: {response}" + ) + response["success"] = False # 如果有 result 字段,验证其结构 - if 'result' in response and response['result'] is not None: - result = response['result'] + if "result" in response and response["result"] is not None: + result = response["result"] if isinstance(result, dict): # 对搜索结果进行特殊处理 - if 'data' in result and not isinstance(result['data'], list): + if "data" in result and not isinstance(result["data"], list): self.log.warning( - f"JS Plugin Manager received result with invalid data type: {type(result['data'])}, setting to empty list") - result['data'] = [] + f"JS Plugin Manager received result with invalid data type: {type(result['data'])}, setting to empty list" + ) + result["data"] = [] if message_id: self.response_handlers[message_id] = response @@ -189,7 +206,7 @@ class JSPluginManager: try: # 读取配置文件中的 OpenAPI 配置信息 if os.path.exists(self.plugins_config_path): - with open(self.plugins_config_path, encoding='utf-8') as f: + with open(self.plugins_config_path, encoding="utf-8") as f: config_data = json.load(f) # 返回 openapi_info 配置项 return config_data.get("openapi_info", {}) @@ -206,7 +223,7 @@ class JSPluginManager: try: # 读取配置文件中的 OpenAPI 配置信息 if os.path.exists(self.plugins_config_path): - with open(self.plugins_config_path, encoding='utf-8') as f: + with open(self.plugins_config_path, encoding="utf-8") as f: config_data = json.load(f) # 获取当前的 openapi_info 配置,如果没有则初始化 @@ -219,7 +236,7 @@ class JSPluginManager: # 更新配置数据 config_data["openapi_info"] = openapi_info # 写回配置文件 - with open(self.plugins_config_path, 'w', encoding='utf-8') as f: + with open(self.plugins_config_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) return {"success": True} else: @@ -229,7 +246,7 @@ class JSPluginManager: # 出错时返回默认配置 return {"success": False, "error": str(e)} - def update_openapi_url(self,openapi_url: str) -> dict[str, Any]: + def update_openapi_url(self, openapi_url: str) -> dict[str, Any]: """更新开放接口地址 Returns: 更新后的配置信息 :type openapi_url: 新的接口地址 @@ -237,7 +254,7 @@ class JSPluginManager: try: # 读取配置文件中的 OpenAPI 配置信息 if os.path.exists(self.plugins_config_path): - with open(self.plugins_config_path, encoding='utf-8') as f: + with open(self.plugins_config_path, encoding="utf-8") as f: config_data = json.load(f) # 获取当前的 openapi_info 配置,如果没有则初始化 @@ -250,7 +267,7 @@ class JSPluginManager: # 更新配置数据 config_data["openapi_info"] = openapi_info # 写回配置文件 - with open(self.plugins_config_path, 'w', encoding='utf-8') as f: + with open(self.plugins_config_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) return {"success": True} else: @@ -270,7 +287,9 @@ class JSPluginManager: # 读取、加载插件配置Json if not os.path.exists(self.plugins_config_path): # 复制 plugins-config-example.json 模板,创建插件配置Json文件 - example_config_path = os.path.join(os.path.dirname(__file__), "plugins-config-example.json") + example_config_path = os.path.join( + os.path.dirname(__file__), "plugins-config-example.json" + ) if os.path.exists(example_config_path): shutil.copy2(example_config_path, self.plugins_config_path) else: @@ -279,12 +298,9 @@ class JSPluginManager: "password": "", "enabled_plugins": [], "plugins_info": [], - "openapi_info": { - "enabled": False, - "search_url": "" - } + "openapi_info": {"enabled": False, "search_url": ""}, } - with open(self.plugins_config_path, 'w', encoding='utf-8') as f: + with open(self.plugins_config_path, "w", encoding="utf-8") as f: json.dump(base_config, f, ensure_ascii=False, indent=2) # 输出文件夹、配置文件地址 self.log.info(f"Plugins directory: {self.plugins_dir}") @@ -294,7 +310,7 @@ class JSPluginManager: # 读取配置文件配置 enabled_plugins = self.get_enabled_plugins() for filename in os.listdir(self.plugins_dir): - if filename.endswith('.js'): + if filename.endswith(".js"): try: plugin_name = os.path.splitext(filename)[0] # 如果是重要插件或没有指定重要插件列表,则加载 @@ -303,31 +319,35 @@ class JSPluginManager: self.log.info(f"Loading plugin: {plugin_name}") self.load_plugin(plugin_name) except Exception as e: - self.log.error(f"Failed to load important plugin {plugin_name}: {e}") + self.log.error( + f"Failed to load important plugin {plugin_name}: {e}" + ) # 即使加载失败也记录插件信息 self.plugins[plugin_name] = { - 'name': plugin_name, - 'enabled': False, - 'loaded': False, - 'error': str(e) + "name": plugin_name, + "enabled": False, + "loaded": False, + "error": str(e), } else: - self.log.debug(f"Skipping plugin (not in important list): {plugin_name}") + self.log.debug( + f"Skipping plugin (not in important list): {plugin_name}" + ) # 标记为未加载但可用 self.plugins[plugin_name] = { - 'name': plugin_name, - 'enabled': False, - 'loaded': False, - 'error': 'Not loaded (not in important plugins list)' + "name": plugin_name, + "enabled": False, + "loaded": False, + "error": "Not loaded (not in important plugins list)", } except Exception as e: self.log.error(f"Failed to load plugin {filename}: {e}") # 即使加载失败也记录插件信息 self.plugins[plugin_name] = { - 'name': plugin_name, - 'enabled': False, - 'loaded': False, - 'error': str(e) + "name": plugin_name, + "enabled": False, + "loaded": False, + "error": str(e), } def load_plugin(self, plugin_name: str) -> bool: @@ -338,25 +358,25 @@ class JSPluginManager: raise FileNotFoundError(f"Plugin file not found: {plugin_file}") try: - with open(plugin_file, encoding='utf-8') as f: + with open(plugin_file, encoding="utf-8") as f: js_code = f.read() - response = self._send_message({ - 'action': 'load', - 'name': plugin_name, - 'code': js_code - }) + response = self._send_message( + {"action": "load", "name": plugin_name, "code": js_code} + ) - if response['success']: + if response["success"]: self.plugins[plugin_name] = { - 'status': 'loaded', - 'load_time': time.time(), - 'enabled': True + "status": "loaded", + "load_time": time.time(), + "enabled": True, } self.log.info(f"Loaded JS plugin: {plugin_name}") return True else: - self.log.error(f"Failed to load JS plugin {plugin_name}: {response['error']}") + self.log.error( + f"Failed to load JS plugin {plugin_name}: {response['error']}" + ) return False except Exception as e: @@ -369,7 +389,7 @@ class JSPluginManager: try: # 读取配置文件中的启用插件列表 if os.path.exists(self.plugins_config_path): - with open(self.plugins_config_path, encoding='utf-8') as f: + with open(self.plugins_config_path, encoding="utf-8") as f: config_data = json.load(f) plugin_infos = config_data.get("plugins_info", []) enabled_plugins = config_data.get("enabled_plugins", []) @@ -380,9 +400,13 @@ class JSPluginManager: # 先按 enabled 属性排序(True 在前) # 再按 enabled_plugins 顺序排序(启用的插件才参与此排序) def sort_key(plugin_info): - name = plugin_info['name'] - is_enabled = plugin_info.get('enabled', False) - order = enabled_order.get(name, len(enabled_plugins)) if is_enabled else len(enabled_plugins) + name = plugin_info["name"] + is_enabled = plugin_info.get("enabled", False) + order = ( + enabled_order.get(name, len(enabled_plugins)) + if is_enabled + else len(enabled_plugins) + ) # (-is_enabled) 将 True(1) 放到前面,False(0) 放到后面 # order 控制启用插件间的相对顺序 return -is_enabled, order @@ -397,7 +421,7 @@ class JSPluginManager: try: # 读取配置文件中的启用插件列表 if os.path.exists(self.plugins_config_path): - with open(self.plugins_config_path, encoding='utf-8') as f: + with open(self.plugins_config_path, encoding="utf-8") as f: config_data = json.load(f) return config_data.get("enabled_plugins", []) else: @@ -411,39 +435,48 @@ class JSPluginManager: if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") - self.log.info(f"JS Plugin Manager starting search in plugin {plugin_name} for keyword: {keyword}") - response = self._send_message({ - 'action': 'search', - 'pluginName': plugin_name, - 'params': { - 'keywords': keyword, - 'page': page, - 'limit': limit + self.log.info( + f"JS Plugin Manager starting search in plugin {plugin_name} for keyword: {keyword}" + ) + response = self._send_message( + { + "action": "search", + "pluginName": plugin_name, + "params": {"keywords": keyword, "page": page, "limit": limit}, } - }) + ) - self.log.debug(f"JS Plugin Manager search response: {response}") # 使用 debug 级别,减少日志量 + self.log.debug( + f"JS Plugin Manager search response: {response}" + ) # 使用 debug 级别,减少日志量 - if not response['success']: - self.log.error(f"JS Plugin Manager search failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager search failed in plugin {plugin_name}: {response['error']}" + ) # 添加详细的错误信息 self.log.error(f"JS Plugin Manager full error response: {response}") raise Exception(f"Search failed: {response['error']}") else: # 检查返回的数据结构 - result_data = response['result'] - self.log.debug(f"JS Plugin Manager search raw result: {result_data}") # 使用 debug 级别 - data_list = result_data.get('data', []) - is_end = result_data.get('isEnd', True) + result_data = response["result"] + self.log.debug( + f"JS Plugin Manager search raw result: {result_data}" + ) # 使用 debug 级别 + data_list = result_data.get("data", []) + is_end = result_data.get("isEnd", True) self.log.info( - f"JS Plugin Manager search completed in plugin {plugin_name}, isEnd: {is_end}, found {len(data_list)} results") + f"JS Plugin Manager search completed in plugin {plugin_name}, isEnd: {is_end}, found {len(data_list)} results" + ) # 检查数据类型是否正确 if not isinstance(data_list, list): self.log.error( - f"JS Plugin Manager search returned invalid data type: {type(data_list)}, value: {data_list}") + f"JS Plugin Manager search returned invalid data type: {type(data_list)}, value: {data_list}" + ) else: self.log.debug( - f"JS Plugin Manager search data sample: {data_list[:2] if len(data_list) > 0 else 'No results'}") + f"JS Plugin Manager search data sample: {data_list[:2] if len(data_list) > 0 else 'No results'}" + ) return result_data async def openapi_search(self, url: str, keyword: str, limit: int = 10): @@ -462,21 +495,19 @@ class JSPluginManager: try: # 如果关键词包含 '-',则提取歌手名、歌名 - if '-' in keyword: - parts = keyword.split('-') + if "-" in keyword: + parts = keyword.split("-") keyword = parts[0] artist = parts[1] else: artist = "" # 构造请求参数 - params = { - 'type': "aggregateSearch", - 'keyword': keyword, - 'limit': limit - } + params = {"type": "aggregateSearch", "keyword": keyword, "limit": limit} # 使用aiohttp发起异步HTTP GET请求 async with aiohttp.ClientSession() as session: - async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as response: + async with session.get( + url, params=params, timeout=aiohttp.ClientTimeout(total=10) + ) as response: response.raise_for_status() # 抛出HTTP错误 # 解析响应数据 raw_data = await response.json() @@ -484,26 +515,28 @@ class JSPluginManager: self.log.info(f"在线接口返回Json: {raw_data}") # 检查API调用是否成功 - if raw_data.get('code') != 200: - raise Exception(f"API request failed with code: {raw_data.get('code', 'unknown')}") + if raw_data.get("code") != 200: + raise Exception( + f"API request failed with code: {raw_data.get('code', 'unknown')}" + ) # 提取实际的搜索结果 - api_data = raw_data.get('data', {}) - results = api_data.get('results', []) + api_data = raw_data.get("data", {}) + results = api_data.get("results", []) # 转换数据格式以匹配插件系统的期望格式 converted_data = [] for item in results: converted_item = { - 'id': item.get('id', ''), - 'title': item.get('name', ''), - 'artist': item.get('artist', ''), - 'album': item.get('album', ''), - 'platform': 'OpenAPI-' + item.get('platform'), - 'isOpenAPI': True, - 'url': item.get('url', ''), - 'artwork': item.get('pic', ''), - 'lrc': item.get('lrc', '') + "id": item.get("id", ""), + "title": item.get("name", ""), + "artist": item.get("artist", ""), + "album": item.get("album", ""), + "platform": "OpenAPI-" + item.get("platform"), + "isOpenAPI": True, + "url": item.get("url", ""), + "artwork": item.get("pic", ""), + "lrc": item.get("lrc", ""), } converted_data.append(converted_item) # 排序筛选 @@ -513,9 +546,9 @@ class JSPluginManager: unified_result, search_keyword=keyword, limit=limit, - search_artist=artist + search_artist=artist, ) - results = optimized_result.get('data', []) + results = optimized_result.get("data", []) # 返回统一格式的数据 return { "success": True, @@ -523,7 +556,7 @@ class JSPluginManager: "total": len(results), "sources": {"OpenAPI": len(results)}, "page": 1, - "limit": limit + "limit": limit, } except asyncio.TimeoutError as e: @@ -535,7 +568,7 @@ class JSPluginManager: "total": 0, "sources": {}, "page": 1, - "limit": limit + "limit": limit, } except Exception as e: self.log.error(f"OpenAPI search error at URL {url}: {e}") @@ -546,15 +579,15 @@ class JSPluginManager: "total": 0, "sources": {}, "page": 1, - "limit": limit + "limit": limit, } def optimize_search_results( - self, - result_data: dict[str, Any], # 搜索结果数据,字典类型,包含任意类型的值 - search_keyword: str = "", # 搜索关键词,默认为空字符串 - search_artist: str = "", # 搜索歌手名,默认为空字符串 - limit: int = 1 # 返回结果数量限制,默认为1 + self, + result_data: dict[str, Any], # 搜索结果数据,字典类型,包含任意类型的值 + search_keyword: str = "", # 搜索关键词,默认为空字符串 + search_artist: str = "", # 搜索歌手名,默认为空字符串 + limit: int = 1, # 返回结果数量限制,默认为1 ) -> dict[str, Any]: # 返回优化后的搜索结果,字典类型,包含任意类型的值 """ 优化搜索结果,根据关键词、歌手名和平台权重对结果进行排序 @@ -566,7 +599,7 @@ class JSPluginManager: 返回: 优化后的搜索结果数据,已根据匹配度和平台权重排序 """ - if not result_data or 'data' not in result_data or not result_data['data']: + if not result_data or "data" not in result_data or not result_data["data"]: return result_data # 清理搜索关键词和歌手名,去除首尾空格 @@ -578,7 +611,7 @@ class JSPluginManager: return result_data # 两者都空才不排序 # 获取待处理的数据列表 - data_list = result_data['data'] + data_list = result_data["data"] self.log.info(f"列表信息::{data_list}") # 预计算平台权重,启用插件列表中的前9个插件有权重,排名越靠前权重越高 enabled_plugins = self.get_enabled_plugins() @@ -593,9 +626,9 @@ class JSPluginManager: 匹配分数,包含标题匹配分、艺术家匹配分和平台加分 """ # 获取并标准化标题、艺术家和平台信息 - title = item.get('title', '').lower() - artist = item.get('artist', '').lower() - platform = item.get('platform', '') + title = item.get("title", "").lower() + artist = item.get("artist", "").lower() + platform = item.get("platform", "") # 标准化搜索关键词和艺术家名 kw = search_keyword.lower() @@ -628,7 +661,7 @@ class JSPluginManager: self.log.info(f"排序后列表信息::{sorted_data}") if 0 < limit < len(sorted_data): sorted_data = sorted_data[:limit] - result_data['data'] = sorted_data + result_data["data"] = sorted_data return result_data def get_media_source(self, plugin_name: str, music_item: dict[str, Any], quality): @@ -637,22 +670,28 @@ class JSPluginManager: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting media source in plugin {plugin_name} for item: {music_item.get('title', 'unknown')} by {music_item.get('artist', 'unknown')}") - response = self._send_message({ - 'action': 'getMediaSource', - 'pluginName': plugin_name, - 'musicItem': music_item, - 'quality': quality - }) + f"JS Plugin Manager getting media source in plugin {plugin_name} for item: {music_item.get('title', 'unknown')} by {music_item.get('artist', 'unknown')}" + ) + response = self._send_message( + { + "action": "getMediaSource", + "pluginName": plugin_name, + "musicItem": music_item, + "quality": quality, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getMediaSource failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getMediaSource failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getMediaSource failed: {response['error']}") else: self.log.debug( - f"JS Plugin Manager getMediaSource completed in plugin {plugin_name}, URL length: {len(response['result'].get('url', '')) if response['result'] else 0}") + f"JS Plugin Manager getMediaSource completed in plugin {plugin_name}, URL length: {len(response['result'].get('url', '')) if response['result'] else 0}" + ) - return response['result'] + return response["result"] def get_lyric(self, plugin_name: str, music_item: dict[str, Any]): """获取歌词""" @@ -660,18 +699,19 @@ class JSPluginManager: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting lyric in plugin {plugin_name} for music: {music_item.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getLyric', - 'pluginName': plugin_name, - 'musicItem': music_item - }) + f"JS Plugin Manager getting lyric in plugin {plugin_name} for music: {music_item.get('title', 'unknown')}" + ) + response = self._send_message( + {"action": "getLyric", "pluginName": plugin_name, "musicItem": music_item} + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getLyric failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getLyric failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getLyric failed: {response['error']}") - return response['result'] + return response["result"] def get_music_info(self, plugin_name: str, music_item: dict[str, Any]): """获取音乐详情""" @@ -679,113 +719,155 @@ class JSPluginManager: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting music info in plugin {plugin_name} for music: {music_item.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getMusicInfo', - 'pluginName': plugin_name, - 'musicItem': music_item - }) + f"JS Plugin Manager getting music info in plugin {plugin_name} for music: {music_item.get('title', 'unknown')}" + ) + response = self._send_message( + { + "action": "getMusicInfo", + "pluginName": plugin_name, + "musicItem": music_item, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getMusicInfo failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getMusicInfo failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getMusicInfo failed: {response['error']}") - return response['result'] + return response["result"] - def get_album_info(self, plugin_name: str, album_info: dict[str, Any], page: int = 1): + def get_album_info( + self, plugin_name: str, album_info: dict[str, Any], page: int = 1 + ): """获取专辑详情""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting album info in plugin {plugin_name} for album: {album_info.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getAlbumInfo', - 'pluginName': plugin_name, - 'albumInfo': album_info - }) + f"JS Plugin Manager getting album info in plugin {plugin_name} for album: {album_info.get('title', 'unknown')}" + ) + response = self._send_message( + { + "action": "getAlbumInfo", + "pluginName": plugin_name, + "albumInfo": album_info, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getAlbumInfo failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getAlbumInfo failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getAlbumInfo failed: {response['error']}") - return response['result'] + return response["result"] - def get_music_sheet_info(self, plugin_name: str, playlist_info: dict[str, Any], page: int = 1): + def get_music_sheet_info( + self, plugin_name: str, playlist_info: dict[str, Any], page: int = 1 + ): """获取歌单详情""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting music sheet info in plugin {plugin_name} for playlist: {playlist_info.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getMusicSheetInfo', - 'pluginName': plugin_name, - 'playlistInfo': playlist_info - }) + f"JS Plugin Manager getting music sheet info in plugin {plugin_name} for playlist: {playlist_info.get('title', 'unknown')}" + ) + response = self._send_message( + { + "action": "getMusicSheetInfo", + "pluginName": plugin_name, + "playlistInfo": playlist_info, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getMusicSheetInfo failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getMusicSheetInfo failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getMusicSheetInfo failed: {response['error']}") - return response['result'] + return response["result"] - def get_artist_works(self, plugin_name: str, artist_item: dict[str, Any], page: int = 1, type_: str = 'music'): + def get_artist_works( + self, + plugin_name: str, + artist_item: dict[str, Any], + page: int = 1, + type_: str = "music", + ): """获取作者作品""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting artist works in plugin {plugin_name} for artist: {artist_item.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getArtistWorks', - 'pluginName': plugin_name, - 'artistItem': artist_item, - 'page': page, - 'type': type_ - }) + f"JS Plugin Manager getting artist works in plugin {plugin_name} for artist: {artist_item.get('title', 'unknown')}" + ) + response = self._send_message( + { + "action": "getArtistWorks", + "pluginName": plugin_name, + "artistItem": artist_item, + "page": page, + "type": type_, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getArtistWorks failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getArtistWorks failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getArtistWorks failed: {response['error']}") - return response['result'] + return response["result"] def import_music_item(self, plugin_name: str, url_like: str): """导入单曲""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") - self.log.debug(f"JS Plugin Manager importing music item in plugin {plugin_name} from: {url_like}") - response = self._send_message({ - 'action': 'importMusicItem', - 'pluginName': plugin_name, - 'urlLike': url_like - }) + self.log.debug( + f"JS Plugin Manager importing music item in plugin {plugin_name} from: {url_like}" + ) + response = self._send_message( + { + "action": "importMusicItem", + "pluginName": plugin_name, + "urlLike": url_like, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager importMusicItem failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager importMusicItem failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"importMusicItem failed: {response['error']}") - return response['result'] + return response["result"] def import_music_sheet(self, plugin_name: str, url_like: str): """导入歌单""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") - self.log.debug(f"JS Plugin Manager importing music sheet in plugin {plugin_name} from: {url_like}") - response = self._send_message({ - 'action': 'importMusicSheet', - 'pluginName': plugin_name, - 'urlLike': url_like - }) + self.log.debug( + f"JS Plugin Manager importing music sheet in plugin {plugin_name} from: {url_like}" + ) + response = self._send_message( + { + "action": "importMusicSheet", + "pluginName": plugin_name, + "urlLike": url_like, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager importMusicSheet failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager importMusicSheet failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"importMusicSheet failed: {response['error']}") - return response['result'] + return response["result"] def get_top_lists(self, plugin_name: str): """获取榜单列表""" @@ -793,41 +875,49 @@ class JSPluginManager: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug(f"JS Plugin Manager getting top lists in plugin {plugin_name}") - response = self._send_message({ - 'action': 'getTopLists', - 'pluginName': plugin_name - }) + response = self._send_message( + {"action": "getTopLists", "pluginName": plugin_name} + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getTopLists failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getTopLists failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getTopLists failed: {response['error']}") - return response['result'] + return response["result"] - def get_top_list_detail(self, plugin_name: str, top_list_item: dict[str, Any], page: int = 1): + def get_top_list_detail( + self, plugin_name: str, top_list_item: dict[str, Any], page: int = 1 + ): """获取榜单详情""" if plugin_name not in self.plugins: raise ValueError(f"Plugin {plugin_name} not found or not loaded") self.log.debug( - f"JS Plugin Manager getting top list detail in plugin {plugin_name} for list: {top_list_item.get('title', 'unknown')}") - response = self._send_message({ - 'action': 'getTopListDetail', - 'pluginName': plugin_name, - 'topListItem': top_list_item, - 'page': page - }) + f"JS Plugin Manager getting top list detail in plugin {plugin_name} for list: {top_list_item.get('title', 'unknown')}" + ) + response = self._send_message( + { + "action": "getTopListDetail", + "pluginName": plugin_name, + "topListItem": top_list_item, + "page": page, + } + ) - if not response['success']: - self.log.error(f"JS Plugin Manager getTopListDetail failed in plugin {plugin_name}: {response['error']}") + if not response["success"]: + self.log.error( + f"JS Plugin Manager getTopListDetail failed in plugin {plugin_name}: {response['error']}" + ) raise Exception(f"getTopListDetail failed: {response['error']}") - return response['result'] + return response["result"] # 启用插件 def enable_plugin(self, plugin_name: str) -> bool: if plugin_name in self.plugins: - self.plugins[plugin_name]['enabled'] = True + self.plugins[plugin_name]["enabled"] = True # 读取、修改 插件配置json文件:① 将plugins_info属性中对于的插件状态改为禁用、2:将 enabled_plugins中对应插件移除 # 同步更新配置文件 try: @@ -836,7 +926,7 @@ class JSPluginManager: # 读取现有配置 if os.path.exists(config_file_path): - with open(config_file_path, encoding='utf-8') as f: + with open(config_file_path, encoding="utf-8") as f: config_data = json.load(f) # 更新plugins_info中对应插件的状态 @@ -853,23 +943,26 @@ class JSPluginManager: config_data["enabled_plugins"].insert(0, plugin_name) # 写回配置文件 - with open(config_file_path, 'w', encoding='utf-8') as f: + with open(config_file_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) - self.log.info(f"Plugin config updated for enabled plugin {plugin_name}") + self.log.info( + f"Plugin config updated for enabled plugin {plugin_name}" + ) # 更新插件引擎 self.reload_plugins() except Exception as e: - self.log.error(f"Failed to update plugin config when enabling {plugin_name}: {e}") + self.log.error( + f"Failed to update plugin config when enabling {plugin_name}: {e}" + ) return True return False # 禁用插件 def disable_plugin(self, plugin_name: str) -> bool: - if plugin_name in self.plugins: - self.plugins[plugin_name]['enabled'] = False + self.plugins[plugin_name]["enabled"] = False # 读取、修改 插件配置json文件:① 将plugins_info属性中对于的插件状态改为禁用、2:将 enabled_plugins中对应插件移除 # 同步更新配置文件 try: @@ -878,7 +971,7 @@ class JSPluginManager: # 读取现有配置 if os.path.exists(config_file_path): - with open(config_file_path, encoding='utf-8') as f: + with open(config_file_path, encoding="utf-8") as f: config_data = json.load(f) # 更新plugins_info中对应插件的状态 @@ -895,14 +988,18 @@ class JSPluginManager: config_data["enabled_plugins"].remove(plugin_name) # 写回配置文件 - with open(config_file_path, 'w', encoding='utf-8') as f: + with open(config_file_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) - self.log.info(f"Plugin config updated for enabled plugin {plugin_name}") + self.log.info( + f"Plugin config updated for enabled plugin {plugin_name}" + ) # 更新插件引擎 self.reload_plugins() except Exception as e: - self.log.error(f"Failed to update plugin config when enabling {plugin_name}: {e}") + self.log.error( + f"Failed to update plugin config when enabling {plugin_name}: {e}" + ) return True return False @@ -919,25 +1016,31 @@ class JSPluginManager: # 读取现有配置 if os.path.exists(config_file_path): - with open(config_file_path, encoding='utf-8') as f: + with open(config_file_path, encoding="utf-8") as f: config_data = json.load(f) # 移除plugins_info属性中对应的插件项目 if "plugins_info" in config_data: config_data["plugins_info"] = [ - plugin_info for plugin_info in config_data["plugins_info"] + plugin_info + for plugin_info in config_data["plugins_info"] if plugin_info.get("name") != plugin_name ] # 从enabled_plugins中移除插件(如果存在) - if "enabled_plugins" in config_data and plugin_name in config_data["enabled_plugins"]: + if ( + "enabled_plugins" in config_data + and plugin_name in config_data["enabled_plugins"] + ): config_data["enabled_plugins"].remove(plugin_name) # 回写配置文件 - with open(config_file_path, 'w', encoding='utf-8') as f: + with open(config_file_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) - self.log.info(f"Plugin config updated for uninstalled plugin {plugin_name}") + self.log.info( + f"Plugin config updated for uninstalled plugin {plugin_name}" + ) # 删除插件文件夹中的指定插件文件 plugin_file_path = os.path.join(self.plugins_dir, f"{plugin_name}.js") @@ -973,13 +1076,13 @@ class JSPluginManager: "account": "", "password": "", "enabled_plugins": [], - "plugins_info": [] + "plugins_info": [], } - with open(config_file_path, 'w', encoding='utf-8') as f: + with open(config_file_path, "w", encoding="utf-8") as f: json.dump(base_config, f, ensure_ascii=False, indent=2) # 读取现有配置 - with open(config_file_path, encoding='utf-8') as f: + with open(config_file_path, encoding="utf-8") as f: config_data = json.load(f) # 检查是否已存在该插件信息 @@ -994,13 +1097,13 @@ class JSPluginManager: new_plugin_info = { "name": plugin_name, "file": plugin_file, - "enabled": False # 默认不启用 + "enabled": False, # 默认不启用 } if "plugins_info" not in config_data: config_data["plugins_info"] = [] config_data["plugins_info"].append(new_plugin_info) # 写回配置文件 - with open(config_file_path, 'w', encoding='utf-8') as f: + with open(config_file_path, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) self.log.info(f"Plugin config updated for {plugin_name}") diff --git a/xiaomusic/utils.py b/xiaomusic/utils.py index be975d4..eac9921 100644 --- a/xiaomusic/utils.py +++ b/xiaomusic/utils.py @@ -200,7 +200,7 @@ def custom_sort_key(s): def _get_depth_path(root, directory, depth): # 计算当前目录的深度 - relative_path = root[len(directory):].strip(os.sep) + relative_path = root[len(directory) :].strip(os.sep) path_parts = relative_path.split(os.sep) if len(path_parts) >= depth: return os.path.join(directory, *path_parts[:depth]) @@ -231,7 +231,7 @@ def traverse_music_directory(directory, depth, exclude_dirs, support_extension): dirs[:] = [d for d in dirs if d not in exclude_dirs] # 计算当前目录的深度 - current_depth = root[len(directory):].count(os.sep) + 1 + current_depth = root[len(directory) :].count(os.sep) + 1 if current_depth > depth: depth_path = _get_depth_path(root, directory, depth - 1) _append_files_result(result, depth_path, root, files, support_extension) @@ -242,14 +242,14 @@ def traverse_music_directory(directory, depth, exclude_dirs, support_extension): # 发送给网页3thplay,用于三者设备播放 async def thdplay( - action, args="/static/3thdplay.mp3", target="HTTP://192.168.1.10:58090/thdaction" + action, args="/static/3thdplay.mp3", target="HTTP://192.168.1.10:58090/thdaction" ): # 接口地址 target,在参数文件指定 data = {"action": action, "args": args} try: async with aiohttp.ClientSession() as session: async with session.post( - target, json=data, timeout=5 + target, json=data, timeout=5 ) as response: # 增加超时以避免长时间挂起 # 如果响应不是200,引发异常 response.raise_for_status() @@ -276,7 +276,7 @@ async def downloadfile(url): # 使用 aiohttp 创建一个客户端会话来发起请求 async with aiohttp.ClientSession() as session: async with session.get( - cleaned_url, timeout=5 + cleaned_url, timeout=5 ) as response: # 增加超时以避免长时间挂起 # 如果响应不是200,引发异常 response.raise_for_status() @@ -345,11 +345,11 @@ async def get_web_music_duration(url, config): cleaned_url = parsed_url.geturl() async with aiohttp.ClientSession() as session: async with session.get( - cleaned_url, - allow_redirects=True, - headers={ - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36" - }, + cleaned_url, + allow_redirects=True, + headers={ + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36" + }, ) as response: url = str(response.url) # 设置总超时时间为3秒 @@ -427,14 +427,18 @@ def get_duration_by_ffprobe(file_path, ffmpeg_location): ) # 输出命令执行结果 - log.info(f"命令执行结果 command result - return code: {result.returncode}, stdout: {result.stdout}") + log.info( + f"命令执行结果 command result - return code: {result.returncode}, stdout: {result.stdout}" + ) # 解析 JSON 输出 ffprobe_output = json.loads(result.stdout) # 获取时长 duration = float(ffprobe_output["format"]["duration"]) - log.info(f"Successfully extracted duration: {duration} seconds for file: {file_path}") + log.info( + f"Successfully extracted duration: {duration} seconds for file: {file_path}" + ) except Exception as e: log.warning(f"Error getting local music {file_path} duration: {e}") @@ -499,8 +503,8 @@ def remove_id3_tags(input_file: str, config) -> str: # 检查是否存在ID3 v2.3或v2.4标签 if not ( - audio.tags - and (audio.tags.version == (2, 3, 0) or audio.tags.version == (2, 4, 0)) + audio.tags + and (audio.tags.version == (2, 3, 0) or audio.tags.version == (2, 4, 0)) ): return None @@ -1148,7 +1152,7 @@ def remove_common_prefix(directory): # 检查文件名是否以共同前缀开头 if filename.startswith(common_prefix): # 构造新的文件名 - new_filename = filename[len(common_prefix):] + new_filename = filename[len(common_prefix) :] match = pattern.search(new_filename.strip()) if match: num = match.group(1) @@ -1339,10 +1343,10 @@ async def fetch_json_get(url, headers, config): async with aiohttp.ClientSession(connector=connector) as session: # 3. 发起带代理的GET请求 async with session.get( - url, - headers=headers, - proxy=proxy, # 传入格式化后的代理参数 - timeout=10, # 超时时间(秒),避免无限等待 + url, + headers=headers, + proxy=proxy, # 传入格式化后的代理参数 + timeout=10, # 超时时间(秒),避免无限等待 ) as response: if response.status == 200: data = await response.json() @@ -1482,7 +1486,7 @@ class MusicUrlCache: async def text_to_mp3( - text: str, save_dir: str, voice: str = "zh-CN-XiaoxiaoNeural" + text: str, save_dir: str, voice: str = "zh-CN-XiaoxiaoNeural" ) -> str: """ 使用edge-tts将文本转换为MP3语音文件 diff --git a/xiaomusic/xiaomusic.py b/xiaomusic/xiaomusic.py index 30d6617..9634d37 100644 --- a/xiaomusic/xiaomusic.py +++ b/xiaomusic/xiaomusic.py @@ -112,6 +112,7 @@ class XiaoMusic: # 初始化 JS 插件管理器 try: from xiaomusic.js_plugin_manager import JSPluginManager + self.js_plugin_manager = JSPluginManager(self) self.log.info("JS Plugin Manager initialized successfully") except Exception as e: @@ -121,6 +122,7 @@ class XiaoMusic: # 初始化 JS 插件适配器 try: from xiaomusic.js_adapter import JSAdapter + self.js_adapter = JSAdapter(self) self.log.info("JS Adapter initialized successfully") except Exception as e: @@ -148,8 +150,15 @@ class XiaoMusic: self.log.warning("配置文件目录和音乐目录建议设置为不同的目录") # 私有方法:调用插件方法的通用封装 - async def __call_plugin_method(self, plugin_name: str, method_name: str, music_item: dict, result_key: str, - required_field: str = None, **kwargs): + async def __call_plugin_method( + self, + plugin_name: str, + method_name: str, + music_item: dict, + result_key: str, + required_field: str = None, + **kwargs, + ): """ 通用方法:调用 JS 插件的方法并返回结果 @@ -177,13 +186,22 @@ class XiaoMusic: try: # 调用插件方法,传递额外参数 - result = getattr(self.js_plugin_manager, method_name)(plugin_name, music_item, **kwargs) - if not result or not result.get(result_key) or result.get(result_key) == 'None': + result = getattr(self.js_plugin_manager, method_name)( + plugin_name, music_item, **kwargs + ) + if ( + not result + or not result.get(result_key) + or result.get(result_key) == "None" + ): return {"success": False, "error": f"Failed to get {result_key}"} # 如果指定了必填字段,则额外校验 if required_field and not result.get(required_field): - return {"success": False, "error": f"Missing required field: {required_field}"} + return { + "success": False, + "error": f"Missing required field: {required_field}", + } # 追加属性后返回 result["success"] = True return result @@ -561,7 +579,7 @@ class XiaoMusic: picture = tags["picture"] if picture: if picture.startswith(self.config.picture_cache_path): - picture = picture[len(self.config.picture_cache_path):] + picture = picture[len(self.config.picture_cache_path) :] picture = picture.replace("\\", "/") if picture.startswith("/"): picture = picture[1:] @@ -712,7 +730,7 @@ class XiaoMusic: # 处理文件路径 if filename.startswith(self.config.music_path): - filename = filename[len(self.config.music_path):] + filename = filename[len(self.config.music_path) :] filename = filename.replace("\\", "/") if filename.startswith("/"): filename = filename[1:] @@ -802,7 +820,7 @@ class XiaoMusic: # TODO: 网络歌曲获取歌曲额外信息 pass elif os.path.exists(file_or_url) and not_in_dirs( - file_or_url, ignore_tag_absolute_dirs + file_or_url, ignore_tag_absolute_dirs ): all_music_tags[name] = extract_audio_metadata( file_or_url, self.config.picture_cache_path @@ -839,7 +857,7 @@ class XiaoMusic: if dir_name == os.path.basename(self.music_path): dir_name = "其他" if self.music_path != self.download_path and dir_name == os.path.basename( - self.download_path + self.download_path ): dir_name = "下载" if dir_name not in all_music_by_dir: @@ -1013,7 +1031,7 @@ class XiaoMusic: self.start_file_watch() analytics_task = asyncio.create_task(self.analytics_task_daily()) assert ( - analytics_task is not None + analytics_task is not None ) # to keep the reference to task, do not remove this async with ClientSession() as session: self.session = session @@ -1126,11 +1144,11 @@ class XiaoMusic: opvalue = self.config.key_word_dict.get(opkey) if ( - (not ctrl_panel) - and (not self.isplaying(did)) - and self.active_cmd - and (opvalue not in self.active_cmd) - and (opkey not in self.active_cmd) + (not ctrl_panel) + and (not self.isplaying(did)) + and self.active_cmd + and (opvalue not in self.active_cmd) + and (opkey not in self.active_cmd) ): self.log.info(f"不在激活命令中 {opvalue}") continue @@ -1241,7 +1259,9 @@ class XiaoMusic: # ===========================MusicFree插件函数================================ # 在线获取歌曲列表 - async def get_music_list_online(self, plugin="all", keyword="", page=1, limit=20, **kwargs): + async def get_music_list_online( + self, plugin="all", keyword="", page=1, limit=20, **kwargs + ): self.log.info("在线获取歌曲列表!") """ 在线获取歌曲列表 @@ -1256,9 +1276,14 @@ class XiaoMusic: dict: 搜索结果 """ openapi_info = self.js_plugin_manager.get_openapi_info() - if openapi_info.get("enabled", False) and openapi_info.get("search_url", "") != "": + if ( + openapi_info.get("enabled", False) + and openapi_info.get("search_url", "") != "" + ): # 开放接口获取 - return await self.js_plugin_manager.openapi_search(openapi_info.get("search_url"), keyword) + return await self.js_plugin_manager.openapi_search( + openapi_info.get("search_url"), keyword + ) else: if not self.js_plugin_manager: return {"success": False, "error": "JS Plugin Manager not available"} @@ -1284,33 +1309,30 @@ class XiaoMusic: # 验证URL格式 parsed_url = urlparse(url) if not parsed_url.scheme or not parsed_url.netloc: - return { - "success": False, - "url": url, - "error": "Invalid URL format" - } + return {"success": False, "url": url, "error": "Invalid URL format"} # 创建aiohttp客户端会话 async with aiohttp.ClientSession() as session: # 发送HEAD请求跟随重定向 - async with session.head(url, allow_redirects=True, - timeout=aiohttp.ClientTimeout(total=timeout)) as response: + async with session.head( + url, + allow_redirects=True, + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: # 获取最终重定向后的URL final_url = str(response.url) return { "success": True, "url": final_url, - "statusCode": response.status + "statusCode": response.status, } except Exception as e: - return { - "success": False, - "url": url, - "error": f"Error occurred: {str(e)}" - } + return {"success": False, "url": url, "error": f"Error occurred: {str(e)}"} # 调用MusicFree插件获取歌曲列表 - async def get_music_list_mf(self, plugin="all", keyword="", page=1, limit=20, **kwargs): + async def get_music_list_mf( + self, plugin="all", keyword="", page=1, limit=20, **kwargs + ): self.log.info("通过MusicFree插件搜索音乐列表!") """ 通过MusicFree插件搜索音乐列表 @@ -1329,8 +1351,8 @@ class XiaoMusic: if not self.js_plugin_manager: return {"success": False, "error": "JS插件管理器不可用"} # 如果关键词包含 '-',则提取歌手名、歌名 - if '-' in keyword: - parts = keyword.split('-') + if "-" in keyword: + parts = keyword.split("-") keyword = parts[0] artist = parts[1] else: @@ -1341,7 +1363,9 @@ class XiaoMusic: return await self._search_all_plugins(keyword, artist, page, limit) else: # 搜索指定插件 - return await self._search_specific_plugin(plugin, keyword, artist, page, limit) + return await self._search_specific_plugin( + plugin, keyword, artist, page, limit + ) except Exception as e: self.log.error(f"搜索音乐时发生错误: {e}") return {"success": False, "error": str(e)} @@ -1380,7 +1404,9 @@ class XiaoMusic: if result and isinstance(result, dict): # 检查是否有错误信息 if "error" in result: - self.log.error(f"插件 {plugin_name} 搜索失败: {result.get('error', '未知错误')}") + self.log.error( + f"插件 {plugin_name} 搜索失败: {result.get('error', '未知错误')}" + ) continue # 处理成功的搜索结果 @@ -1400,9 +1426,9 @@ class XiaoMusic: unified_result, search_keyword=keyword, limit=limit, - search_artist=artist + search_artist=artist, ) - results = optimized_result.get('data', []) + results = optimized_result.get("data", []) return { "success": True, @@ -1410,7 +1436,7 @@ class XiaoMusic: "total": len(results), "sources": sources, "page": page, - "limit": limit + "limit": limit, } async def _search_specific_plugin(self, plugin, keyword, artist, page, limit): @@ -1419,22 +1445,19 @@ class XiaoMusic: results = self.js_plugin_manager.search(plugin, keyword, page, limit) # 额外检查 resources 字段 - data_list = results.get('data', []) + data_list = results.get("data", []) if data_list: # 优化搜索结果排序 results = self.js_plugin_manager.optimize_search_results( - results, - search_keyword=keyword, - limit=limit, - search_artist=artist + results, search_keyword=keyword, limit=limit, search_artist=artist ) return { "success": True, - "data": results.get('data', []), - "total": results.get('total', 0), + "data": results.get("data", []), + "total": results.get("total", 0), "page": page, - "limit": limit + "limit": limit, } except Exception as e: self.log.error(f"插件 {plugin} 搜索失败: {e}") @@ -1449,7 +1472,7 @@ class XiaoMusic: raise e # 调用MusicFree插件获取真实播放url - async def get_media_source_url(self, music_item, quality: str = 'standard'): + async def get_media_source_url(self, music_item, quality: str = "standard"): """获取音乐项的媒体源URL Args: music_item : MusicFree插件定义的 IMusicItem @@ -1458,14 +1481,14 @@ class XiaoMusic: dict: 包含成功状态和URL信息的字典 """ # kwargs可追加 - kwargs = {'quality': quality} + kwargs = {"quality": quality} return await self.__call_plugin_method( - plugin_name=music_item.get('platform'), + plugin_name=music_item.get("platform"), method_name="get_media_source", music_item=music_item, result_key="url", required_field="url", - **kwargs + **kwargs, ) # 调用MusicFree插件获取歌词 @@ -1477,11 +1500,11 @@ class XiaoMusic: dict: 包含成功状态和URL信息的字典 """ return await self.__call_plugin_method( - plugin_name=music_item.get('platform'), + plugin_name=music_item.get("platform"), method_name="get_lyric", music_item=music_item, result_key="rawLrc", - required_field="rawLrc" + required_field="rawLrc", ) # 调用在线搜索歌曲,并优化返回 @@ -1500,26 +1523,30 @@ class XiaoMusic: result = await self.get_music_list_online(keyword=name, limit=10) self.log.info(f"在线搜索歌曲列表: {result}") - if result.get('success') and result.get('total') > 0: + if result.get("success") and result.get("total") > 0: # 打印输出 result.data self.log.info(f"歌曲列表: {result.get('data')}") # 根据搜素关键字,智能搜索出最符合的一条music_item - music_item = await self._search_top_one(result.get('data'), search_key, name) + music_item = await self._search_top_one( + result.get("data"), search_key, name + ) # 验证 music_item 是否为字典类型 if not isinstance(music_item, dict): - self.log.error(f"music_item should be a dict, but got {type(music_item)}: {music_item}") + self.log.error( + f"music_item should be a dict, but got {type(music_item)}: {music_item}" + ) return {"success": False, "error": "Invalid music item format"} # 如果是OpenAPI,则需要转换播放链接 openapi_info = self.js_plugin_manager.get_openapi_info() if openapi_info.get("enabled", False): - return await self.get_real_url_of_openapi(music_item.get('url')) + return await self.get_real_url_of_openapi(music_item.get("url")) else: media_source = await self.get_media_source_url(music_item) - if media_source.get('success'): - return {"success": True, "url": media_source.get('url')} + if media_source.get("success"): + return {"success": True, "url": media_source.get("url")} else: - return {"success": False, "error": media_source.get('error')} + return {"success": False, "error": media_source.get("error")} else: return {"success": False, "error": "未找到歌曲"} @@ -1543,8 +1570,8 @@ class XiaoMusic: # 计算每个项目的匹配分数 def calculate_match_score(item): """计算匹配分数""" - title = item.get('title', '').lower() if item.get('title') else '' - artist = item.get('artist', '').lower() if item.get('artist') else '' + title = item.get("title", "").lower() if item.get("title") else "" + artist = item.get("artist", "").lower() if item.get("artist") else "" keyword = search_key.lower() if not keyword: @@ -1692,7 +1719,7 @@ class XiaoMusic: async def online_play(self, did="", arg1="", **kwargs): # 先推送默认【搜索中】音频,搜索到播放url后推送给小爱 config = self.config - if config and hasattr(config, 'hostname') and hasattr(config, 'public_port'): + if config and hasattr(config, "hostname") and hasattr(config, "public_port"): proxy_base = f"{config.hostname}:{config.public_port}" else: proxy_base = "http://192.168.31.241:8090" @@ -1718,7 +1745,7 @@ class XiaoMusic: # 后台搜索播放 async def do_play( - self, did, name, search_key="", exact=False, update_cur_list=False + self, did, name, search_key="", exact=False, update_cur_list=False ): return await self.devices[did].play(name, search_key, exact, update_cur_list) @@ -2114,7 +2141,7 @@ class XiaoMusicDevice: # 更新总播放列表,为了UI显示 self.xiaomusic.music_list["临时搜索列表"] = copy.copy(self._play_list) elif ( - self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0 + self.device.cur_playlist == "临时搜索列表" and len(self._play_list) == 0 ) or (self.device.cur_playlist not in self.xiaomusic.music_list): self.device.cur_playlist = "全部" else: @@ -2205,13 +2232,13 @@ class XiaoMusicDevice: self.log.info("开始播放下一首") name = self.get_cur_music() if ( - self.device.play_type == PLAY_TYPE_ALL - or self.device.play_type == PLAY_TYPE_RND - or self.device.play_type == PLAY_TYPE_SEQ - or name == "" - or ( + self.device.play_type == PLAY_TYPE_ALL + or self.device.play_type == PLAY_TYPE_RND + or self.device.play_type == PLAY_TYPE_SEQ + or name == "" + or ( (name not in self._play_list) and self.device.play_type != PLAY_TYPE_ONE - ) + ) ): name = self.get_next_music() self.log.info(f"_play_next. name:{name}, cur_music:{self.get_cur_music()}") @@ -2228,10 +2255,10 @@ class XiaoMusicDevice: self.log.info("开始播放上一首") name = self.get_cur_music() if ( - self.device.play_type == PLAY_TYPE_ALL - or self.device.play_type == PLAY_TYPE_RND - or name == "" - or (name not in self._play_list) + self.device.play_type == PLAY_TYPE_ALL + or self.device.play_type == PLAY_TYPE_RND + or name == "" + or (name not in self._play_list) ): name = self.get_prev_music() self.log.info(f"_play_prev. name:{name}, cur_music:{self.get_cur_music()}") @@ -2304,9 +2331,9 @@ class XiaoMusicDevice: self.log.info(f"播放 {name} 失败. 失败次数: {self._play_failed_cnt}") await asyncio.sleep(1) if ( - self.isplaying() - and self._last_cmd != "stop" - and self._play_failed_cnt < 10 + self.isplaying() + and self._last_cmd != "stop" + and self._play_failed_cnt < 10 ): self._play_failed_cnt = self._play_failed_cnt + 1 await self._play_next() @@ -2360,8 +2387,8 @@ class XiaoMusicDevice: self.log.info(playing_info) # WTF xiaomi api is_playing = ( - json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1) - == 1 + json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1) + == 1 ) return is_playing @@ -2480,8 +2507,8 @@ class XiaoMusicDevice: if direction == "next": new_index = index + 1 if ( - self.device.play_type == PLAY_TYPE_SEQ - and new_index >= play_list_len + self.device.play_type == PLAY_TYPE_SEQ + and new_index >= play_list_len ): self.log.info("顺序播放结束") return "" @@ -2568,7 +2595,7 @@ class XiaoMusicDevice: f"play_one_url continue_play device_id:{device_id} ret:{ret} url:{url} audio_id:{audio_id}" ) elif self.config.use_music_api or ( - self.hardware in NEED_USE_PLAY_MUSIC_API + self.hardware in NEED_USE_PLAY_MUSIC_API ): ret = await self.xiaomusic.mina_service.play_by_music_url( device_id, url, audio_id=audio_id @@ -2777,7 +2804,7 @@ class XiaoMusicDevice: return "最近新增" for list_name, play_list in self.xiaomusic.music_list.items(): if (list_name not in ["全部", "所有歌曲", "所有电台", "临时搜索列表"]) and ( - name in play_list + name in play_list ): return list_name if name in self.xiaomusic.music_list.get("所有歌曲", []):