diff --git a/xiaomusic/api/routers/file.py b/xiaomusic/api/routers/file.py index 1fe5929..0e16e1e 100644 --- a/xiaomusic/api/routers/file.py +++ b/xiaomusic/api/routers/file.py @@ -54,6 +54,50 @@ from xiaomusic.utils.system_utils import try_add_access_control_param router = APIRouter() +def _process_m3u8_content(m3u8_content: str, base_url: str, radio: bool = None) -> str: + """处理 m3u8 文件内容,将资源 URL 替换为代理 URL + + Args: + m3u8_content: m3u8 文件内容 + base_url: m3u8 文件的 URL(用于解析相对路径) + radio: 是否为电台直播流 + + Returns: + str: 处理后的 m3u8 内容 + """ + from urllib.parse import urljoin + + lines = m3u8_content.split('\n') + processed_lines = [] + + for line in lines: + stripped_line = line.strip() + + # 跳过注释行和空行 + if not stripped_line or stripped_line.startswith('#'): + processed_lines.append(line) + continue + + # 处理资源行(.ts、.m3u8 等) + # 判断是否为 URL(包含协议或以 / 开头) + if stripped_line.startswith(('http://', 'https://', '/')): + # 绝对 URL,直接使用 + resource_url = stripped_line + else: + # 相对 URL,需要拼接 + resource_url = urljoin(base_url, stripped_line) + + # 将资源 URL 替换为代理 URL + urlb64 = base64.b64encode(resource_url.encode("utf-8")).decode("utf-8") + proxy_url = f"/proxy?urlb64={urlb64}" + if radio is not None: + proxy_url += f"&radio={'true' if radio else 'false'}" + + processed_lines.append(proxy_url) + + return '\n'.join(processed_lines) + + @router.post("/api/file/cleantempdir") async def cleantempdir(Verifcation=Depends(verification)): await clean_temp_dir(xiaomusic.config) @@ -365,7 +409,50 @@ async def proxy(urlb64: str, radio: bool = None): status_code=resp.status, detail=f"下载失败,状态码: {resp.status}" ) from status_exc - # 流式生成器,与download_file的分块逻辑一致 + # 提取文件名,根据URL扩展名智能判断 + filename = parsed_url.path.split("/")[-1].split("?")[0] + content_type = resp.headers.get("Content-Type", "").lower() + + # 判断是否为 m3u8 文件 + is_m3u8 = ( + url.lower().endswith(".m3u8") + or "mpegurl" in content_type + or "m3u8" in content_type + ) + + if not filename: + # 根据URL扩展名或Content-Type设置默认文件名 + path_lower = parsed_url.path.lower() + if path_lower.endswith(".m3u8") or is_m3u8: + filename = "stream.m3u8" + elif path_lower.endswith(".m3u"): + filename = "stream.m3u" + else: + filename = "output.mp3" + + # 如果是 m3u8 文件,需要处理内容,将相对路径替换为代理 URL + if is_m3u8: + try: + # 读取完整的 m3u8 内容 + m3u8_content = await resp.text() + await close_session() + + # 处理 m3u8 内容,替换资源 URL + processed_content = _process_m3u8_content(m3u8_content, url, radio) + + # 返回处理后的内容 + return Response( + content=processed_content, + media_type="application/vnd.apple.mpegurl", + headers={"Content-Disposition": f'inline; filename="{filename}"'}, + ) + except Exception as e: + log.exception(f"处理 m3u8 文件失败: {e}") + # 失败时返回原始内容 + await close_session() + raise + + # 非 m3u8 文件,使用流式传输 async def stream_generator(): try: async for data in resp.content.iter_chunked(4096): @@ -373,23 +460,6 @@ async def proxy(urlb64: str, radio: bool = None): finally: await close_session() - # 提取文件名,根据URL扩展名智能判断 - filename = parsed_url.path.split("/")[-1].split("?")[0] - if not filename: - # 根据URL扩展名或Content-Type设置默认文件名 - path_lower = parsed_url.path.lower() - if path_lower.endswith(".m3u8"): - filename = "stream.m3u8" - elif path_lower.endswith(".m3u"): - filename = "stream.m3u" - else: - # 根据Content-Type推断 - content_type = resp.headers.get("Content-Type", "").lower() - if "mpegurl" in content_type or "m3u8" in content_type: - filename = "stream.m3u8" - else: - filename = "output.mp3" - return StreamingResponse( stream_generator(), media_type=resp.headers.get("Content-Type", "audio/mpeg"),