1
0
mirror of https://github.com/hanxi/xiaomusic.git synced 2026-06-01 12:15:48 +08:00

feat: 完善 LX Server 播放链接解析流程 (#886)

* fix: 修复 Python 3.10 环境下 asyncio.timeout 导致的 AttributeError

* feat: 完善 LX Server 播放链接解析流程

- 支持 LX Server 服务端缓存检查,命中缓存时直接返回播放地址
- 播放链接解析失败时支持按可用音质自动降级
- 音质降级耗尽后支持跨平台自动换源匹配同歌
This commit is contained in:
ALITTLESEEDX
2026-05-26 00:27:57 +08:00
committed by GitHub
parent 6c40909155
commit 4b60e73a0b
2 changed files with 580 additions and 11 deletions

View File

@@ -1434,6 +1434,7 @@ class JSPluginManager:
url: str,
song_info: dict[str, Any],
quality: str = "320k",
req_id: str | None = None,
lx_server_info: dict[str, Any] | None = None,
):
"""直接调用LX Server接口获取音乐URL
@@ -1442,6 +1443,7 @@ class JSPluginManager:
url (str): 在线搜索接口地址
song_info (dict[str, Any]): 歌曲信息
quality (str): 音质默认为320k
req_id (str): SSE进度订阅请求ID
lx_server_info (dict): LX Server 配置信息
Returns:
@@ -1451,6 +1453,12 @@ class JSPluginManager:
headers = (
self._build_lx_server_headers(lx_server_info) if lx_server_info else None
)
if headers:
headers = dict(headers)
else:
headers = {}
if req_id:
headers["x-req-id"] = req_id
result = await self._http_request(
"POST", url, json_data=json_data, headers=headers
)
@@ -1469,6 +1477,150 @@ class JSPluginManager:
}
return raw_data
def _build_lx_server_cache_check_params(
self,
song_info: dict[str, Any],
quality: str = "320k",
exact_quality: bool = False,
) -> dict[str, str]:
if not isinstance(song_info, dict):
song_info = {}
meta = song_info.get("meta")
if not isinstance(meta, dict):
meta = {}
songmid = (
song_info.get("songmid")
or meta.get("songmid")
or meta.get("songId")
or ""
)
song_id = song_info.get("songId") or meta.get("songId") or song_info.get("id")
params = {
"name": str(song_info.get("name") or ""),
"singer": str(song_info.get("singer") or ""),
"source": str(song_info.get("source") or ""),
"songmid": str(songmid or ""),
"songId": str(song_id or songmid or ""),
"quality": str(quality or ""),
}
if exact_quality:
params["exactQuality"] = "1"
return params
async def lx_server_music_cache_check(
self,
url: str,
song_info: dict[str, Any],
quality: str = "320k",
exact_quality: bool = False,
lx_server_info: dict[str, Any] | None = None,
):
"""检查 LX Server 是否已有服务端音乐缓存。"""
params = self._build_lx_server_cache_check_params(
song_info=song_info,
quality=quality,
exact_quality=exact_quality,
)
headers = (
self._build_lx_server_headers(lx_server_info) if lx_server_info else None
)
result = await self._http_request("GET", url, params=params, headers=headers)
if not result["success"]:
self.log.warning(f"LX Server缓存检查失败: {result['error']}")
return {"exists": False, "success": False, "error": result["error"]}
raw_data = result["data"]
self.log.info(f"LX Server缓存检查返回原始Json: {raw_data}")
if isinstance(raw_data, dict):
return raw_data
return {
"exists": False,
"success": False,
"error": f"API request failed: {raw_data}",
}
async def lx_server_music_progress(
self,
url: str,
req_id: str,
lx_server_info: dict[str, Any] | None = None,
timeout: int = 15,
):
"""订阅 LX Server 音乐解析进度,直到收到 status=success。"""
import aiohttp
headers = (
self._build_lx_server_headers(lx_server_info) if lx_server_info else None
)
connector = aiohttp.TCPConnector(ssl=False)
client_timeout = aiohttp.ClientTimeout(total=timeout)
attempts = []
last_error = ""
try:
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(
url,
params={"reqId": req_id},
timeout=client_timeout,
headers=headers,
) as response:
response.raise_for_status()
async for raw_line in response.content:
line = raw_line.decode("utf-8", errors="ignore").strip()
if not line or not line.startswith("data:"):
continue
payload = line.removeprefix("data:").strip()
try:
attempt = json.loads(payload)
except json.JSONDecodeError:
self.log.warning(
f"LX Server解析进度返回非JSON数据: {payload}"
)
continue
attempts.append(attempt)
status = attempt.get("status")
if status == "success":
return {
"success": True,
"data": attempt,
"attempts": attempts,
}
if status in ("fail", "failed", "error"):
last_error = (
attempt.get("message")
or attempt.get("error")
or "LX Server解析音乐直链失败"
)
except aiohttp.ClientResponseError as e:
self.log.error(f"LX Server解析进度HTTP错误: {e.status} {e.message}")
return {
"success": False,
"error": f"HTTP {e.status}: {e.message}",
"attempts": attempts,
}
except asyncio.TimeoutError:
self.log.error(f"LX Server解析进度等待超时: {url}")
return {
"success": False,
"error": last_error or "等待LX Server解析进度超时",
"attempts": attempts,
}
except Exception as e:
self.log.error(f"LX Server解析进度请求失败: {e}")
return {"success": False, "error": str(e), "attempts": attempts}
return {
"success": False,
"error": last_error or "未收到LX Server解析成功进度",
"attempts": attempts,
}
async def lx_server_music_lyric(
self,
url: str,
@@ -1491,7 +1643,7 @@ class JSPluginManager:
params = {
k: v
for k, v in params.items()
if v is not None and not (isinstance(v, (dict, list)) and not v)
if v is not None and not (isinstance(v, dict | list) and not v)
}
self.log.info(f"LX Server 歌词接口请求参数:{params}")

View File

@@ -8,12 +8,15 @@ import base64
import ipaddress
import json
import socket
from urllib.parse import urlparse
import uuid
from urllib.parse import urljoin, urlparse
import aiohttp
from xiaomusic.const import PLAY_TYPE_ALL
LX_QUALITY_PRIORITY = ["master", "flac24bit", "flac", "320k", "192k", "128k"]
def _build_keyword(song_name, artist):
"""
@@ -213,21 +216,434 @@ class OnlineMusicService:
else:
return {"success": False, "error": "LX Server 接口未配置!"}
async def _execute_lx_server_music_url(self, song_info):
async def _execute_lx_server_music_url(self, song_info, quality=None):
"""执行LX Server获取音乐播放直链"""
lx_server_info = self.js_plugin_manager.get_lx_server_info()
if lx_server_info.get("base_url", "") != "":
# LX Server接口获取
result_data = await self.js_plugin_manager.lx_server_music_url(
url=lx_server_info.get("base_url") + "/music/url",
song_info=song_info,
lx_server_info=lx_server_info,
)
else:
base_url = lx_server_info.get("base_url", "")
if base_url == "":
return {"success": False, "error": "LX Server接口未配置"}
preferred_quality = self._get_lx_server_music_quality(song_info, quality)
quality = self._get_lx_server_best_quality(song_info, preferred_quality)
return await self._resolve_lx_server_music_url(
lx_server_info=lx_server_info,
base_url=base_url,
song_info=song_info,
quality=quality,
preferred_quality=preferred_quality,
allow_auto_switch=True,
is_retry=False,
)
async def _resolve_lx_server_music_url(
self,
lx_server_info,
base_url,
song_info,
quality,
preferred_quality,
allow_auto_switch=True,
is_retry=False,
):
"""解析 LX Server 播放链接:失败降级,最后尝试换源。"""
result_data = await self._fetch_lx_server_music_url_once(
lx_server_info=lx_server_info,
base_url=base_url,
song_info=song_info,
quality=quality,
exact_quality=is_retry,
)
if result_data.get("url") and not result_data.get("errorMsg"):
return result_data
error_msg = self._get_lx_server_result_error(result_data)
if not self._is_lx_platform_not_supported_error(error_msg):
next_quality = self._get_lx_server_next_lower_quality(quality, song_info)
if next_quality:
self.log.info(
"LX Server解析失败尝试降低音质: "
f"{self._get_lx_quality_display_name(quality)} -> "
f"{self._get_lx_quality_display_name(next_quality)}"
)
return await self._resolve_lx_server_music_url(
lx_server_info=lx_server_info,
base_url=base_url,
song_info=song_info,
quality=next_quality,
preferred_quality=preferred_quality,
allow_auto_switch=allow_auto_switch,
is_retry=True,
)
if allow_auto_switch:
song_name = self._get_lx_song_name(song_info)
self.log.info(f"LX Server原始源解析失败准备自动尝试换源: {song_name}")
matched_song = await self._find_lx_server_other_source_match(
lx_server_info=lx_server_info,
base_url=base_url,
song_info=song_info,
)
if matched_song:
matched_quality = self._get_lx_server_best_quality(
matched_song, preferred_quality
)
self.log.info(
"LX Server找到备选源尝试播放: "
f"{self._get_lx_song_source(matched_song)} "
f"({matched_quality})"
)
return await self._fetch_lx_server_music_url_once(
lx_server_info=lx_server_info,
base_url=base_url,
song_info=matched_song,
quality=matched_quality,
exact_quality=True,
)
return result_data
async def _fetch_lx_server_music_url_once(
self,
lx_server_info,
base_url,
song_info,
quality,
exact_quality=False,
):
"""执行一次 LX Server 直链解析请求。"""
cache_result = await self.js_plugin_manager.lx_server_music_cache_check(
url=base_url + "/music/cache/check",
song_info=song_info,
quality=quality,
exact_quality=exact_quality,
lx_server_info=lx_server_info,
)
cache_hit = (
cache_result.get("exists")
and cache_result.get("url")
and not cache_result.get("isCollision")
)
self._log_lx_server_cache_check(song_info, quality, cache_result, cache_hit)
if cache_hit:
cache_result = dict(cache_result)
cache_result["url"] = self._normalize_lx_server_url(
base_url, cache_result["url"]
)
return cache_result
req_id = uuid.uuid4().hex
progress_task = asyncio.create_task(
self.js_plugin_manager.lx_server_music_progress(
url=base_url + "/music/progress",
req_id=req_id,
lx_server_info=lx_server_info,
)
)
try:
# 先让 SSE 连接注册到 LX Server再用同一个 reqId 请求解析直链。
await asyncio.sleep(0.05)
result_data = await self.js_plugin_manager.lx_server_music_url(
url=base_url + "/music/url",
song_info=song_info,
quality=quality,
req_id=req_id,
lx_server_info=lx_server_info,
)
if not result_data.get("url"):
return result_data
return result_data
finally:
if not progress_task.done():
progress_task.cancel()
try:
await progress_task
except asyncio.CancelledError:
pass
except Exception:
pass
def _get_lx_server_music_quality(self, song_info, quality=None):
"""获取 LX Server 播放直链请求音质,保持旧逻辑默认 320k。"""
if quality and quality != "standard":
return quality
if not isinstance(song_info, dict):
return "320k"
return song_info.get("quality") or song_info.get("type") or "320k"
def _get_lx_server_best_quality(self, song_info, user_preference="320k"):
"""选择歌曲实际可用音质。"""
if not song_info:
return "128k"
available_qualities = self._get_lx_server_available_qualities(song_info)
if not available_qualities:
self.log.warning("LX Server歌曲无音质信息使用默认 128k")
return "128k"
start_index = LX_QUALITY_PRIORITY.index(user_preference) if user_preference in LX_QUALITY_PRIORITY else -1
if start_index == -1:
self.log.warning(f"LX Server无效的音质偏好: {user_preference}")
return available_qualities[0] or "128k"
for quality in LX_QUALITY_PRIORITY[start_index:]:
if quality in available_qualities:
self.log.info(f"LX Server选择音质: {quality} (偏好: {user_preference})")
return quality
self.log.warning(f"LX Server无匹配音质使用第一个可用: {available_qualities[0]}")
return available_qualities[0] or "128k"
def _get_lx_server_next_lower_quality(self, current_quality, song_info):
"""获取下一级可用音质。"""
if current_quality not in LX_QUALITY_PRIORITY:
return None
available_qualities = self._get_lx_server_available_qualities(song_info)
start_index = LX_QUALITY_PRIORITY.index(current_quality) + 1
for quality in LX_QUALITY_PRIORITY[start_index:]:
if not available_qualities or quality in available_qualities:
return quality
return None
def _get_lx_server_available_qualities(self, song_info):
"""读取歌曲可用音质。"""
if not isinstance(song_info, dict):
return ["128k"]
meta = song_info.get("meta")
if not isinstance(meta, dict):
meta = {}
types = (
song_info.get("types")
or song_info.get("_types")
or song_info.get("qualitys")
or song_info.get("_qualitys")
or meta.get("qualitys")
or meta.get("_qualitys")
or meta.get("types")
or meta.get("_types")
or {}
)
if isinstance(types, list):
available = []
for item in types:
quality = item.get("type") if isinstance(item, dict) else item
if quality:
available.append(quality)
return available
if isinstance(types, dict):
return [quality for quality, enabled in types.items() if enabled]
return []
def _get_lx_quality_display_name(self, quality):
names = {
"master": "Master",
"flac24bit": "Hi-Res",
"flac": "SQ 无损",
"320k": "HQ 高品质",
"192k": "标准",
"128k": "标准",
}
return names.get(quality, str(quality).upper())
def _get_lx_server_result_error(self, result_data):
if not isinstance(result_data, dict):
return str(result_data)
return (
result_data.get("errorMsg")
or result_data.get("error")
or result_data.get("message")
or "服务器未返回播放链接"
)
def _is_lx_platform_not_supported_error(self, error_msg):
return bool(
error_msg
and ("未找到支持" in error_msg or "not supported" in error_msg)
)
async def _find_lx_server_other_source_match(
self,
lx_server_info,
base_url,
song_info,
):
"""跨平台寻找同歌。"""
name = self._get_lx_song_name(song_info)
singer = self._get_lx_song_singer(song_info)
if not name or not singer:
return None
query = f"{name} {singer}"
original_source = self._get_lx_song_source(song_info)
search_sources = self._get_lx_server_auto_switch_sources(
lx_server_info, original_source
)
if not search_sources:
self.log.info("LX Server自动换源未配置可用平台")
return None
search_tasks = [
self.js_plugin_manager.lx_server_search(
url=base_url + "/music/search",
keyword=query,
artist="",
page=1,
limit=20,
source=source,
lx_server_info=lx_server_info,
)
for source in search_sources
]
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
for result in search_results:
if isinstance(result, Exception):
self.log.warning(f"LX Server自动换源搜索失败: {result}")
continue
if not isinstance(result, dict):
continue
for item in result.get("data", []):
candidate = self._normalize_lx_search_candidate(item)
if self._is_lx_same_song_match(song_info, candidate):
duration_diff = abs(
self._time_to_seconds(self._get_lx_song_interval(song_info))
- self._time_to_seconds(self._get_lx_song_interval(candidate))
)
self.log.info(
"LX Server自动换源匹配成功: "
f"{self._get_lx_song_name(candidate)} via "
f"{self._get_lx_song_source(candidate)} "
f"(时长误差: {duration_diff}s)"
)
return candidate
self.log.info("LX Server自动换源未找到合适匹配结果")
return None
def _get_lx_server_auto_switch_sources(self, lx_server_info, original_source):
if not isinstance(lx_server_info, dict):
return []
platforms = lx_server_info.get("platforms")
if not isinstance(platforms, dict):
return []
return [
str(source)
for source in platforms
if source and str(source) != original_source
]
def _normalize_lx_search_candidate(self, item):
"""兼容 lx_server_search 转换后的结构和 LX 原始歌曲结构。"""
if not isinstance(item, dict):
return {}
raw = item.get("_raw")
if isinstance(raw, dict):
candidate = dict(raw)
else:
candidate = dict(item)
if not candidate.get("name") and item.get("title"):
candidate["name"] = item["title"]
if not candidate.get("singer") and item.get("artist"):
candidate["singer"] = item["artist"]
if not candidate.get("interval") and item.get("duration"):
candidate["interval"] = item["duration"]
if not candidate.get("source"):
candidate["source"] = item.get("source") or item.get("platform", "")
return candidate
def _is_lx_same_song_match(self, target_song, candidate):
target_duration = self._time_to_seconds(self._get_lx_song_interval(target_song))
candidate_duration = self._time_to_seconds(
self._get_lx_song_interval(candidate)
)
if abs(target_duration - candidate_duration) > 5:
return False
target_name = self._get_lx_song_name(target_song).lower().strip()
candidate_name = self._get_lx_song_name(candidate).lower().strip()
if not target_name or not candidate_name:
return False
if target_name not in candidate_name and candidate_name not in target_name:
return False
target_singer = self._get_lx_song_singer(target_song).lower()
candidate_singer = self._get_lx_song_singer(candidate).lower()
if target_singer and candidate_singer:
return (
target_singer in candidate_singer
or candidate_singer in target_singer
)
return True
def _time_to_seconds(self, time_str):
if not isinstance(time_str, str) or ":" not in time_str:
return 0
parts = time_str.split(":")
if len(parts) != 2:
return 0
try:
return int(parts[0]) * 60 + int(parts[1])
except ValueError:
return 0
def _get_lx_song_name(self, song_info):
if not isinstance(song_info, dict):
return ""
return str(song_info.get("name") or song_info.get("title") or "")
def _get_lx_song_singer(self, song_info):
if not isinstance(song_info, dict):
return ""
return str(song_info.get("singer") or song_info.get("artist") or "")
def _get_lx_song_source(self, song_info):
if not isinstance(song_info, dict):
return ""
return str(song_info.get("source") or song_info.get("platform") or "")
def _get_lx_song_interval(self, song_info):
if not isinstance(song_info, dict):
return ""
return str(song_info.get("interval") or song_info.get("duration") or "")
def _normalize_lx_server_url(self, base_url, url):
"""将 LX Server 返回的相对路径转成可直接播放的完整 URL。"""
if not isinstance(url, str) or not url:
return url
parsed_url = urlparse(url)
if parsed_url.scheme and parsed_url.netloc:
return url
return urljoin(base_url, url)
def _log_lx_server_cache_check(self, song_info, quality, cache_result, cache_hit):
"""打印 LX Server 缓存检查结果。"""
if not isinstance(song_info, dict):
song_info = {}
song_name = song_info.get("name") or song_info.get("title") or "未知歌曲"
singer = song_info.get("singer") or song_info.get("artist") or "未知歌手"
if cache_hit:
self.log.info(
"LX Server缓存检查: "
f"{song_name} - {singer} ({cache_result.get('quality') or quality}) "
f"命中缓存: 是, "
f"filename: {cache_result.get('filename', '')}, "
f"foundIn: {cache_result.get('foundIn', '')}, "
f"folder: {cache_result.get('folder', '')}"
)
else:
self.log.info(
"LX Server缓存检查: "
f"{song_name} - {singer} ({quality}) 命中缓存: 否"
)
async def _execute_lx_server_music_lyric(self, song_info):
"""执行LX Server获取音乐歌词"""
lx_server_info = self.js_plugin_manager.get_lx_server_info()
@@ -1005,6 +1421,7 @@ class OnlineMusicService:
# LX Server在线搜索
return await self._execute_lx_server_music_url(
song_info=music_item.get("_raw"),
quality=quality,
)
else:
# kwargs可追加