diff --git a/xiaomusic/js_plugin_manager.py b/xiaomusic/js_plugin_manager.py index ac6742e..bea71c8 100644 --- a/xiaomusic/js_plugin_manager.py +++ b/xiaomusic/js_plugin_manager.py @@ -1434,6 +1434,7 @@ class JSPluginManager: url: str, song_info: dict[str, Any], quality: str = "320k", + req_id: str | None = None, lx_server_info: dict[str, Any] | None = None, ): """直接调用LX Server接口获取音乐URL @@ -1442,6 +1443,7 @@ class JSPluginManager: url (str): 在线搜索接口地址 song_info (dict[str, Any]): 歌曲信息 quality (str): 音质,默认为320k + req_id (str): SSE进度订阅请求ID lx_server_info (dict): LX Server 配置信息 Returns: @@ -1451,6 +1453,12 @@ class JSPluginManager: headers = ( self._build_lx_server_headers(lx_server_info) if lx_server_info else None ) + if headers: + headers = dict(headers) + else: + headers = {} + if req_id: + headers["x-req-id"] = req_id result = await self._http_request( "POST", url, json_data=json_data, headers=headers ) @@ -1469,6 +1477,150 @@ class JSPluginManager: } return raw_data + def _build_lx_server_cache_check_params( + self, + song_info: dict[str, Any], + quality: str = "320k", + exact_quality: bool = False, + ) -> dict[str, str]: + if not isinstance(song_info, dict): + song_info = {} + meta = song_info.get("meta") + if not isinstance(meta, dict): + meta = {} + + songmid = ( + song_info.get("songmid") + or meta.get("songmid") + or meta.get("songId") + or "" + ) + song_id = song_info.get("songId") or meta.get("songId") or song_info.get("id") + + params = { + "name": str(song_info.get("name") or ""), + "singer": str(song_info.get("singer") or ""), + "source": str(song_info.get("source") or ""), + "songmid": str(songmid or ""), + "songId": str(song_id or songmid or ""), + "quality": str(quality or ""), + } + if exact_quality: + params["exactQuality"] = "1" + return params + + async def lx_server_music_cache_check( + self, + url: str, + song_info: dict[str, Any], + quality: str = "320k", + exact_quality: bool = False, + lx_server_info: dict[str, Any] | None = None, + ): + """检查 LX Server 是否已有服务端音乐缓存。""" + params = self._build_lx_server_cache_check_params( + song_info=song_info, + quality=quality, + exact_quality=exact_quality, + ) + headers = ( + self._build_lx_server_headers(lx_server_info) if lx_server_info else None + ) + result = await self._http_request("GET", url, params=params, headers=headers) + if not result["success"]: + self.log.warning(f"LX Server缓存检查失败: {result['error']}") + return {"exists": False, "success": False, "error": result["error"]} + + raw_data = result["data"] + self.log.info(f"LX Server缓存检查返回原始Json: {raw_data}") + if isinstance(raw_data, dict): + return raw_data + return { + "exists": False, + "success": False, + "error": f"API request failed: {raw_data}", + } + + async def lx_server_music_progress( + self, + url: str, + req_id: str, + lx_server_info: dict[str, Any] | None = None, + timeout: int = 15, + ): + """订阅 LX Server 音乐解析进度,直到收到 status=success。""" + import aiohttp + + headers = ( + self._build_lx_server_headers(lx_server_info) if lx_server_info else None + ) + connector = aiohttp.TCPConnector(ssl=False) + client_timeout = aiohttp.ClientTimeout(total=timeout) + attempts = [] + last_error = "" + + try: + async with aiohttp.ClientSession(connector=connector) as session: + async with session.get( + url, + params={"reqId": req_id}, + timeout=client_timeout, + headers=headers, + ) as response: + response.raise_for_status() + async for raw_line in response.content: + line = raw_line.decode("utf-8", errors="ignore").strip() + if not line or not line.startswith("data:"): + continue + + payload = line.removeprefix("data:").strip() + try: + attempt = json.loads(payload) + except json.JSONDecodeError: + self.log.warning( + f"LX Server解析进度返回非JSON数据: {payload}" + ) + continue + + attempts.append(attempt) + status = attempt.get("status") + if status == "success": + return { + "success": True, + "data": attempt, + "attempts": attempts, + } + if status in ("fail", "failed", "error"): + last_error = ( + attempt.get("message") + or attempt.get("error") + or "LX Server解析音乐直链失败" + ) + + except aiohttp.ClientResponseError as e: + self.log.error(f"LX Server解析进度HTTP错误: {e.status} {e.message}") + return { + "success": False, + "error": f"HTTP {e.status}: {e.message}", + "attempts": attempts, + } + except asyncio.TimeoutError: + self.log.error(f"LX Server解析进度等待超时: {url}") + return { + "success": False, + "error": last_error or "等待LX Server解析进度超时", + "attempts": attempts, + } + except Exception as e: + self.log.error(f"LX Server解析进度请求失败: {e}") + return {"success": False, "error": str(e), "attempts": attempts} + + return { + "success": False, + "error": last_error or "未收到LX Server解析成功进度", + "attempts": attempts, + } + async def lx_server_music_lyric( self, url: str, @@ -1491,7 +1643,7 @@ class JSPluginManager: params = { k: v for k, v in params.items() - if v is not None and not (isinstance(v, (dict, list)) and not v) + if v is not None and not (isinstance(v, dict | list) and not v) } self.log.info(f"LX Server 歌词接口请求参数:{params}") diff --git a/xiaomusic/online_music.py b/xiaomusic/online_music.py index 20dcd11..ccd5f12 100644 --- a/xiaomusic/online_music.py +++ b/xiaomusic/online_music.py @@ -8,12 +8,15 @@ import base64 import ipaddress import json import socket -from urllib.parse import urlparse +import uuid +from urllib.parse import urljoin, urlparse import aiohttp from xiaomusic.const import PLAY_TYPE_ALL +LX_QUALITY_PRIORITY = ["master", "flac24bit", "flac", "320k", "192k", "128k"] + def _build_keyword(song_name, artist): """ @@ -213,21 +216,434 @@ class OnlineMusicService: else: return {"success": False, "error": "LX Server 接口未配置!"} - async def _execute_lx_server_music_url(self, song_info): + async def _execute_lx_server_music_url(self, song_info, quality=None): """执行LX Server获取音乐播放直链""" lx_server_info = self.js_plugin_manager.get_lx_server_info() - if lx_server_info.get("base_url", "") != "": - # LX Server接口获取 - result_data = await self.js_plugin_manager.lx_server_music_url( - url=lx_server_info.get("base_url") + "/music/url", - song_info=song_info, - lx_server_info=lx_server_info, - ) - else: + base_url = lx_server_info.get("base_url", "") + if base_url == "": return {"success": False, "error": "LX Server接口未配置!"} + preferred_quality = self._get_lx_server_music_quality(song_info, quality) + quality = self._get_lx_server_best_quality(song_info, preferred_quality) + return await self._resolve_lx_server_music_url( + lx_server_info=lx_server_info, + base_url=base_url, + song_info=song_info, + quality=quality, + preferred_quality=preferred_quality, + allow_auto_switch=True, + is_retry=False, + ) + + async def _resolve_lx_server_music_url( + self, + lx_server_info, + base_url, + song_info, + quality, + preferred_quality, + allow_auto_switch=True, + is_retry=False, + ): + """解析 LX Server 播放链接:失败降级,最后尝试换源。""" + result_data = await self._fetch_lx_server_music_url_once( + lx_server_info=lx_server_info, + base_url=base_url, + song_info=song_info, + quality=quality, + exact_quality=is_retry, + ) + if result_data.get("url") and not result_data.get("errorMsg"): + return result_data + + error_msg = self._get_lx_server_result_error(result_data) + if not self._is_lx_platform_not_supported_error(error_msg): + next_quality = self._get_lx_server_next_lower_quality(quality, song_info) + if next_quality: + self.log.info( + "LX Server解析失败,尝试降低音质: " + f"{self._get_lx_quality_display_name(quality)} -> " + f"{self._get_lx_quality_display_name(next_quality)}" + ) + return await self._resolve_lx_server_music_url( + lx_server_info=lx_server_info, + base_url=base_url, + song_info=song_info, + quality=next_quality, + preferred_quality=preferred_quality, + allow_auto_switch=allow_auto_switch, + is_retry=True, + ) + + if allow_auto_switch: + song_name = self._get_lx_song_name(song_info) + self.log.info(f"LX Server原始源解析失败,准备自动尝试换源: {song_name}") + matched_song = await self._find_lx_server_other_source_match( + lx_server_info=lx_server_info, + base_url=base_url, + song_info=song_info, + ) + if matched_song: + matched_quality = self._get_lx_server_best_quality( + matched_song, preferred_quality + ) + self.log.info( + "LX Server找到备选源,尝试播放: " + f"{self._get_lx_song_source(matched_song)} " + f"({matched_quality})" + ) + return await self._fetch_lx_server_music_url_once( + lx_server_info=lx_server_info, + base_url=base_url, + song_info=matched_song, + quality=matched_quality, + exact_quality=True, + ) + return result_data + async def _fetch_lx_server_music_url_once( + self, + lx_server_info, + base_url, + song_info, + quality, + exact_quality=False, + ): + """执行一次 LX Server 直链解析请求。""" + cache_result = await self.js_plugin_manager.lx_server_music_cache_check( + url=base_url + "/music/cache/check", + song_info=song_info, + quality=quality, + exact_quality=exact_quality, + lx_server_info=lx_server_info, + ) + cache_hit = ( + cache_result.get("exists") + and cache_result.get("url") + and not cache_result.get("isCollision") + ) + self._log_lx_server_cache_check(song_info, quality, cache_result, cache_hit) + if cache_hit: + cache_result = dict(cache_result) + cache_result["url"] = self._normalize_lx_server_url( + base_url, cache_result["url"] + ) + return cache_result + + req_id = uuid.uuid4().hex + progress_task = asyncio.create_task( + self.js_plugin_manager.lx_server_music_progress( + url=base_url + "/music/progress", + req_id=req_id, + lx_server_info=lx_server_info, + ) + ) + + try: + # 先让 SSE 连接注册到 LX Server,再用同一个 reqId 请求解析直链。 + await asyncio.sleep(0.05) + result_data = await self.js_plugin_manager.lx_server_music_url( + url=base_url + "/music/url", + song_info=song_info, + quality=quality, + req_id=req_id, + lx_server_info=lx_server_info, + ) + if not result_data.get("url"): + return result_data + + return result_data + finally: + if not progress_task.done(): + progress_task.cancel() + try: + await progress_task + except asyncio.CancelledError: + pass + except Exception: + pass + + def _get_lx_server_music_quality(self, song_info, quality=None): + """获取 LX Server 播放直链请求音质,保持旧逻辑默认 320k。""" + if quality and quality != "standard": + return quality + if not isinstance(song_info, dict): + return "320k" + return song_info.get("quality") or song_info.get("type") or "320k" + + def _get_lx_server_best_quality(self, song_info, user_preference="320k"): + """选择歌曲实际可用音质。""" + if not song_info: + return "128k" + + available_qualities = self._get_lx_server_available_qualities(song_info) + if not available_qualities: + self.log.warning("LX Server歌曲无音质信息,使用默认 128k") + return "128k" + + start_index = LX_QUALITY_PRIORITY.index(user_preference) if user_preference in LX_QUALITY_PRIORITY else -1 + if start_index == -1: + self.log.warning(f"LX Server无效的音质偏好: {user_preference}") + return available_qualities[0] or "128k" + + for quality in LX_QUALITY_PRIORITY[start_index:]: + if quality in available_qualities: + self.log.info(f"LX Server选择音质: {quality} (偏好: {user_preference})") + return quality + + self.log.warning(f"LX Server无匹配音质,使用第一个可用: {available_qualities[0]}") + return available_qualities[0] or "128k" + + def _get_lx_server_next_lower_quality(self, current_quality, song_info): + """获取下一级可用音质。""" + if current_quality not in LX_QUALITY_PRIORITY: + return None + + available_qualities = self._get_lx_server_available_qualities(song_info) + start_index = LX_QUALITY_PRIORITY.index(current_quality) + 1 + for quality in LX_QUALITY_PRIORITY[start_index:]: + if not available_qualities or quality in available_qualities: + return quality + return None + + def _get_lx_server_available_qualities(self, song_info): + """读取歌曲可用音质。""" + if not isinstance(song_info, dict): + return ["128k"] + + meta = song_info.get("meta") + if not isinstance(meta, dict): + meta = {} + types = ( + song_info.get("types") + or song_info.get("_types") + or song_info.get("qualitys") + or song_info.get("_qualitys") + or meta.get("qualitys") + or meta.get("_qualitys") + or meta.get("types") + or meta.get("_types") + or {} + ) + + if isinstance(types, list): + available = [] + for item in types: + quality = item.get("type") if isinstance(item, dict) else item + if quality: + available.append(quality) + return available + if isinstance(types, dict): + return [quality for quality, enabled in types.items() if enabled] + return [] + + def _get_lx_quality_display_name(self, quality): + names = { + "master": "Master", + "flac24bit": "Hi-Res", + "flac": "SQ 无损", + "320k": "HQ 高品质", + "192k": "标准", + "128k": "标准", + } + return names.get(quality, str(quality).upper()) + + def _get_lx_server_result_error(self, result_data): + if not isinstance(result_data, dict): + return str(result_data) + return ( + result_data.get("errorMsg") + or result_data.get("error") + or result_data.get("message") + or "服务器未返回播放链接" + ) + + def _is_lx_platform_not_supported_error(self, error_msg): + return bool( + error_msg + and ("未找到支持" in error_msg or "not supported" in error_msg) + ) + + async def _find_lx_server_other_source_match( + self, + lx_server_info, + base_url, + song_info, + ): + """跨平台寻找同歌。""" + name = self._get_lx_song_name(song_info) + singer = self._get_lx_song_singer(song_info) + if not name or not singer: + return None + + query = f"{name} {singer}" + original_source = self._get_lx_song_source(song_info) + search_sources = self._get_lx_server_auto_switch_sources( + lx_server_info, original_source + ) + if not search_sources: + self.log.info("LX Server自动换源未配置可用平台") + return None + search_tasks = [ + self.js_plugin_manager.lx_server_search( + url=base_url + "/music/search", + keyword=query, + artist="", + page=1, + limit=20, + source=source, + lx_server_info=lx_server_info, + ) + for source in search_sources + ] + search_results = await asyncio.gather(*search_tasks, return_exceptions=True) + + for result in search_results: + if isinstance(result, Exception): + self.log.warning(f"LX Server自动换源搜索失败: {result}") + continue + if not isinstance(result, dict): + continue + for item in result.get("data", []): + candidate = self._normalize_lx_search_candidate(item) + if self._is_lx_same_song_match(song_info, candidate): + duration_diff = abs( + self._time_to_seconds(self._get_lx_song_interval(song_info)) + - self._time_to_seconds(self._get_lx_song_interval(candidate)) + ) + self.log.info( + "LX Server自动换源匹配成功: " + f"{self._get_lx_song_name(candidate)} via " + f"{self._get_lx_song_source(candidate)} " + f"(时长误差: {duration_diff}s)" + ) + return candidate + + self.log.info("LX Server自动换源未找到合适匹配结果") + return None + + def _get_lx_server_auto_switch_sources(self, lx_server_info, original_source): + if not isinstance(lx_server_info, dict): + return [] + + platforms = lx_server_info.get("platforms") + if not isinstance(platforms, dict): + return [] + + return [ + str(source) + for source in platforms + if source and str(source) != original_source + ] + + def _normalize_lx_search_candidate(self, item): + """兼容 lx_server_search 转换后的结构和 LX 原始歌曲结构。""" + if not isinstance(item, dict): + return {} + + raw = item.get("_raw") + if isinstance(raw, dict): + candidate = dict(raw) + else: + candidate = dict(item) + + if not candidate.get("name") and item.get("title"): + candidate["name"] = item["title"] + if not candidate.get("singer") and item.get("artist"): + candidate["singer"] = item["artist"] + if not candidate.get("interval") and item.get("duration"): + candidate["interval"] = item["duration"] + if not candidate.get("source"): + candidate["source"] = item.get("source") or item.get("platform", "") + return candidate + + def _is_lx_same_song_match(self, target_song, candidate): + target_duration = self._time_to_seconds(self._get_lx_song_interval(target_song)) + candidate_duration = self._time_to_seconds( + self._get_lx_song_interval(candidate) + ) + if abs(target_duration - candidate_duration) > 5: + return False + + target_name = self._get_lx_song_name(target_song).lower().strip() + candidate_name = self._get_lx_song_name(candidate).lower().strip() + if not target_name or not candidate_name: + return False + if target_name not in candidate_name and candidate_name not in target_name: + return False + + target_singer = self._get_lx_song_singer(target_song).lower() + candidate_singer = self._get_lx_song_singer(candidate).lower() + if target_singer and candidate_singer: + return ( + target_singer in candidate_singer + or candidate_singer in target_singer + ) + return True + + def _time_to_seconds(self, time_str): + if not isinstance(time_str, str) or ":" not in time_str: + return 0 + parts = time_str.split(":") + if len(parts) != 2: + return 0 + try: + return int(parts[0]) * 60 + int(parts[1]) + except ValueError: + return 0 + + def _get_lx_song_name(self, song_info): + if not isinstance(song_info, dict): + return "" + return str(song_info.get("name") or song_info.get("title") or "") + + def _get_lx_song_singer(self, song_info): + if not isinstance(song_info, dict): + return "" + return str(song_info.get("singer") or song_info.get("artist") or "") + + def _get_lx_song_source(self, song_info): + if not isinstance(song_info, dict): + return "" + return str(song_info.get("source") or song_info.get("platform") or "") + + def _get_lx_song_interval(self, song_info): + if not isinstance(song_info, dict): + return "" + return str(song_info.get("interval") or song_info.get("duration") or "") + + def _normalize_lx_server_url(self, base_url, url): + """将 LX Server 返回的相对路径转成可直接播放的完整 URL。""" + if not isinstance(url, str) or not url: + return url + parsed_url = urlparse(url) + if parsed_url.scheme and parsed_url.netloc: + return url + return urljoin(base_url, url) + + def _log_lx_server_cache_check(self, song_info, quality, cache_result, cache_hit): + """打印 LX Server 缓存检查结果。""" + if not isinstance(song_info, dict): + song_info = {} + song_name = song_info.get("name") or song_info.get("title") or "未知歌曲" + singer = song_info.get("singer") or song_info.get("artist") or "未知歌手" + + if cache_hit: + self.log.info( + "LX Server缓存检查: " + f"{song_name} - {singer} ({cache_result.get('quality') or quality}) " + f"命中缓存: 是, " + f"filename: {cache_result.get('filename', '')}, " + f"foundIn: {cache_result.get('foundIn', '')}, " + f"folder: {cache_result.get('folder', '')}" + ) + else: + self.log.info( + "LX Server缓存检查: " + f"{song_name} - {singer} ({quality}) 命中缓存: 否" + ) + async def _execute_lx_server_music_lyric(self, song_info): """执行LX Server获取音乐歌词""" lx_server_info = self.js_plugin_manager.get_lx_server_info() @@ -1005,6 +1421,7 @@ class OnlineMusicService: # LX Server在线搜索 return await self._execute_lx_server_music_url( song_info=music_item.get("_raw"), + quality=quality, ) else: # kwargs可追加