1
0
mirror of https://github.com/hanxi/xiaomusic.git synced 2025-12-06 14:52:50 +08:00

feat: 支持LX歌单

This commit is contained in:
涵曦
2025-09-08 19:20:35 +08:00
parent 2aae9ad0bc
commit 89cb0923ee
2 changed files with 248 additions and 45 deletions

View File

@@ -18,7 +18,9 @@ import shutil
import string
import subprocess
import tempfile
import time
import urllib.parse
from collections import OrderedDict
from collections.abc import AsyncIterator
from dataclasses import asdict, dataclass
from http.cookies import SimpleCookie
@@ -1308,3 +1310,140 @@ def chmoddir(dir_path: str):
log.info(f"Changed permissions of file: {item_path}")
except Exception as e:
log.info(f"chmoddir failed: {e}")
async def fetch_json_get(url, headers):
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
log.info(f"fetch_json_get: {url} success {data}")
# 可选:确保是 dict
if isinstance(data, dict):
return data
else:
log.warning(f"Expected dict, but got {type(data)}: {data}")
return {}
else:
log.error(f"HTTP Error: {response.status} {url}")
return {}
except aiohttp.ClientError as e:
log.error(f"ClientError fetching {url}: {e}")
return {}
except asyncio.TimeoutError:
log.error(f"Timeout fetching {url}")
return {}
except Exception as e:
log.error(f"Unexpected error fetching {url}: {e}")
return {}
class LRUCache(OrderedDict):
def __init__(self, max_size=1000):
super().__init__()
self.max_size = max_size
def __setitem__(self, key, value):
if key in self:
# 移动到末尾(最近使用)
self.move_to_end(key)
super().__setitem__(key, value)
# 如果超出大小限制,删除最早使用的项
if len(self) > self.max_size:
self.popitem(last=False)
def __getitem__(self, key):
# 访问时移动到末尾(最近使用)
if key in self:
self.move_to_end(key)
return super().__getitem__(key)
class MusicUrlCache:
def __init__(self, default_expire_days=1, max_size=1000):
self.cache = LRUCache(max_size)
self.default_expire_days = default_expire_days
self.log = logging.getLogger(__name__)
def get(self, url: str, headers: dict = None) -> str:
"""获取URL(优先从缓存获取,没有则请求API)
Args:
url: 原始URL
headers: API请求需要的headers
Returns:
str: 真实播放URL
"""
# 先查询缓存
cached_url = self._get_from_cache(url)
if cached_url:
self.log.info(f"Using cached url: {cached_url}")
return cached_url
# 缓存未命中,请求API
return self._fetch_from_api(url, headers)
def _get_from_cache(self, url: str) -> str:
"""从缓存中获取URL"""
try:
cached_url, expire_time = self.cache[url]
if time.time() > expire_time:
# 缓存过期,删除
del self.cache[url]
return ""
return cached_url
except KeyError:
return ""
def _fetch_from_api(self, url: str, headers: dict = None) -> str:
"""从API获取真实URL"""
data = fetch_json_get(url, headers or {})
if not isinstance(data, dict):
self.log.error(f"Invalid API response format: {data}")
return ""
real_url = data.get("url")
if not real_url:
self.log.error(f"No url in API response: {data}")
return ""
# 获取过期时间
expire_time = self._parse_expire_time(data)
# 缓存结果
self._set_cache(url, real_url, expire_time)
self.log.info(
f"Cached url, expire_time: {expire_time}, cache size: {len(self.cache)}"
)
return real_url
def _parse_expire_time(self, data: dict) -> float | None:
"""解析API返回的过期时间"""
try:
extra = data.get("extra", {})
expire_info = extra.get("expire", {})
if expire_info and expire_info.get("canExpire"):
expire_time = expire_info.get("time")
if expire_time:
return float(expire_time)
except Exception as e:
self.log.warning(f"Failed to parse expire time: {e}")
return None
def _set_cache(self, url: str, real_url: str, expire_time: float = None):
"""设置缓存"""
if expire_time is None:
expire_time = time.time() + (self.default_expire_days * 24 * 3600)
self.cache[url] = (real_url, expire_time)
def clear(self):
"""清空缓存"""
self.cache.clear()
@property
def size(self) -> int:
"""当前缓存大小"""
return len(self.cache)

View File

@@ -48,6 +48,7 @@ from xiaomusic.crontab import Crontab
from xiaomusic.plugin import PluginManager
from xiaomusic.utils import (
Metadata,
MusicUrlCache,
chinese_to_number,
chmodfile,
custom_sort_key,
@@ -82,9 +83,11 @@ class XiaoMusic:
self.miio_service = None
self.polling_event = asyncio.Event()
self.new_record_event = asyncio.Event()
self.url_cache = MusicUrlCache()
self.all_music = {}
self._all_radio = {} # 电台列表
self._web_music_api = {} # 需要通过api获取播放链接的列表
self.music_list = {} # 播放列表 key 为目录名, value 为 play_list
self.default_music_list_names = [] # 非自定义个歌单
self.devices = {} # key 为 did
@@ -466,35 +469,9 @@ class XiaoMusic:
url = self.all_music[name]
return url.startswith(("http://", "https://"))
# 获取歌曲播放时长,播放地址
async def get_music_sec_url(self, name):
sec = 0
url = self.get_music_url(name)
self.log.info(f"get_music_sec_url. name:{name} url:{url}")
if self.is_web_radio_music(name):
self.log.info("电台不会有播放时长")
return 0, url
if self.is_web_music(name):
origin_url = url
if self.config.web_music_proxy:
origin_url = self.all_music[name]
# 代理播放模式使用原始地址获取歌曲时长
duration, _ = await get_web_music_duration(origin_url, self.config)
else:
duration, url = await get_web_music_duration(origin_url, self.config)
sec = math.ceil(duration)
self.log.info(f"网络歌曲 {name} : {origin_url} {url} 的时长 {sec}")
else:
filename = self.get_filename(name)
self.log.info(f"get_music_sec_url. name:{name} filename:{filename}")
duration = await get_local_music_duration(filename, self.config)
sec = math.ceil(duration)
self.log.info(f"本地歌曲 {name} : {filename} {url} 的时长 {sec}")
if sec <= 0:
self.log.warning(f"获取歌曲时长失败 {name} {url}")
return sec, url
# 是否是需要通过api获取播放链接的网络歌曲
def is_need_use_play_music_api(self, name):
return name in self._web_music_api
def get_music_tags(self, name):
tags = copy.copy(self.all_music_tags.get(name, asdict(Metadata())))
@@ -535,21 +512,106 @@ class XiaoMusic:
self.try_save_tag_cache()
return "OK"
def get_music_url(self, name):
if self.is_web_music(name):
url = self.all_music[name]
self.log.info(f"get_music_url web music. name:{name}, url:{url}")
if self.config.web_music_proxy:
urlb64 = base64.b64encode(url.encode("utf-8")).decode("utf-8")
url = f"{self.hostname}:{self.public_port}/proxy?urlb64={urlb64}"
self.log.info(
f"get_music_url web music by proxy. name:{name}, url:{url}"
)
return url
async def get_music_sec_url(self, name):
"""获取歌曲播放时长和播放地址
Args:
name: 歌曲名称
Returns:
tuple: (播放时长(秒), 播放地址)
"""
url, origin_url = self.get_music_url(name)
self.log.info(f"get_music_sec_url. name:{name} url:{url}")
# 电台直接返回
if self.is_web_radio_music(name):
self.log.info("电台不会有播放时长")
return 0, url
# 获取播放时长
if self.is_web_music(name):
sec = await self._get_web_music_duration(name, url, origin_url)
else:
sec = await self._get_local_music_duration(name, url)
if sec <= 0:
self.log.warning(f"获取歌曲时长失败 {name} {url}")
return sec, url
async def _get_web_music_duration(self, name, url, origin_url):
"""获取网络音乐时长"""
if not origin_url:
origin_url = url if url else self.all_music[name]
if self.config.web_music_proxy:
# 代理模式使用原始地址获取时长
duration, _ = await get_web_music_duration(origin_url, self.config)
else:
duration, url = await get_web_music_duration(origin_url, self.config)
sec = math.ceil(duration)
self.log.info(f"网络歌曲 {name} : {origin_url} {url} 的时长 {sec}")
return sec
async def _get_local_music_duration(self, name, url):
"""获取本地音乐时长"""
filename = self.get_filename(name)
self.log.info(f"get_music_sec_url. name:{name} filename:{filename}")
duration = await get_local_music_duration(filename, self.config)
sec = math.ceil(duration)
self.log.info(f"本地歌曲 {name} : {filename} {url} 的时长 {sec}")
return sec
def get_music_url(self, name):
"""获取音乐播放地址
Args:
name: 歌曲名称
Returns:
tuple: (播放地址, 原始地址) - 网络音乐时可能有原始地址
"""
if self.is_web_music(name):
return self._get_web_music_url(name)
return self._get_local_music_url(name), None
def _get_web_music_url(self, name):
"""获取网络音乐播放地址"""
url = self.all_music[name]
self.log.info(f"get_music_url web music. name:{name}, url:{url}")
# 需要通过API获取真实播放地址
if self.is_need_use_play_music_api(name):
url = self._get_url_from_api(name, url)
if not url:
return "", None
# 是否需要代理
if self.config.web_music_proxy:
proxy_url = self._get_proxy_url(url)
return proxy_url, url
return url, None
def _get_url_from_api(self, name, url):
"""通过API获取真实播放地址"""
headers = self._all_web_music_api[name].get("headers", {})
url = self.url_cache.get(url, headers)
if not url:
self.log.error(f"get_music_url use api fail. name:{name}, url:{url}")
return url
def _get_proxy_url(self, origin_url):
"""获取代理URL"""
urlb64 = base64.b64encode(origin_url.encode("utf-8")).decode("utf-8")
proxy_url = f"{self.hostname}:{self.public_port}/proxy?urlb64={urlb64}"
self.log.info(f"Using proxy url: {proxy_url}")
return proxy_url
def _get_local_music_url(self, name):
"""获取本地音乐播放地址"""
filename = self.get_filename(name)
# 构造音乐文件的URL
# 处理文件路径
if filename.startswith(self.config.music_path):
filename = filename[len(self.config.music_path) :]
filename = filename.replace("\\", "/")
@@ -558,11 +620,10 @@ class XiaoMusic:
self.log.info(f"get_music_url local music. name:{name}, filename:{filename}")
# 构造URL
encoded_name = urllib.parse.quote(filename)
return try_add_access_control_param(
self.config,
f"{self.hostname}:{self.public_port}/music/{encoded_name}",
)
url = f"{self.hostname}:{self.public_port}/music/{encoded_name}"
return try_add_access_control_param(self.config, url)
# 给前端调用
def refresh_music_tag(self):
@@ -771,6 +832,7 @@ class XiaoMusic:
return
self._all_radio = {}
self._web_music_api = {}
music_list = json.loads(self.config.music_list_json)
try:
for item in music_list:
@@ -791,6 +853,8 @@ class XiaoMusic:
# 处理电台列表
if music_type == "radio":
self._all_radio[name] = url
if music.get("api"):
self._web_music_api[name] = music
self.log.debug(one_music_list)
# 歌曲名字相同会覆盖
self.music_list[list_name] = one_music_list