diff --git a/xiaomusic/api/__init__.py b/xiaomusic/api/__init__.py new file mode 100644 index 0000000..9c2f603 --- /dev/null +++ b/xiaomusic/api/__init__.py @@ -0,0 +1,5 @@ +"""API 模块统一入口""" + +from xiaomusic.api.app import HttpInit, app + +__all__ = ["app", "HttpInit"] diff --git a/xiaomusic/api/app.py b/xiaomusic/api/app.py new file mode 100644 index 0000000..c28bef1 --- /dev/null +++ b/xiaomusic/api/app.py @@ -0,0 +1,79 @@ +"""FastAPI 应用实例和中间件配置""" + +import asyncio +import os +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from starlette.middleware.gzip import GZipMiddleware + +from xiaomusic import __version__ +from xiaomusic.api.dependencies import ( + AuthStaticFiles, + reset_http_server, +) + +if TYPE_CHECKING: + from xiaomusic.xiaomusic import XiaoMusic + +# 导入全局变量引用(将在 HttpInit 中初始化) +import xiaomusic.api.dependencies as deps + + +@asynccontextmanager +async def app_lifespan(app): + """应用生命周期管理""" + if deps.xiaomusic is not None: + asyncio.create_task(deps.xiaomusic.run_forever()) + try: + yield + except Exception as e: + deps.log.exception(f"Execption {e}") + + +# 创建 FastAPI 应用实例 +app = FastAPI( + lifespan=app_lifespan, + version=__version__, + docs_url=None, + redoc_url=None, + openapi_url=None, +) + +# 添加 CORS 中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # 允许访问的源 + allow_credentials=False, # 支持 cookie + allow_methods=["*"], # 允许使用的请求方法 + allow_headers=["*"], # 允许携带的 Headers +) + +# 添加 GZip 中间件 +app.add_middleware(GZipMiddleware, minimum_size=500) + + +def HttpInit(_xiaomusic: "XiaoMusic"): + """初始化 HTTP 服务器 + + Args: + _xiaomusic: XiaoMusic 实例 + """ + # 设置全局变量 + deps.xiaomusic = _xiaomusic + deps.config = _xiaomusic.config + deps.log = _xiaomusic.log + + # 挂载静态文件 + folder = os.path.dirname(os.path.dirname(__file__)) # xiaomusic 目录 + app.mount("/static", AuthStaticFiles(directory=f"{folder}/static"), name="static") + + # 注册所有路由 + from xiaomusic.api.routers import register_routers + + register_routers(app) + + # 重置 HTTP 服务器配置 + reset_http_server(app) diff --git a/xiaomusic/api/dependencies.py b/xiaomusic/api/dependencies.py new file mode 100644 index 0000000..d856e50 --- /dev/null +++ b/xiaomusic/api/dependencies.py @@ -0,0 +1,102 @@ +"""依赖注入和认证相关功能""" + +import hashlib +import secrets +from typing import TYPE_CHECKING, Annotated + +from fastapi import Depends, HTTPException, Request, status +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from fastapi.staticfiles import StaticFiles + +if TYPE_CHECKING: + from xiaomusic.xiaomusic import XiaoMusic + +# 全局变量 +xiaomusic: "XiaoMusic" = None +config = None +log = None + +security = HTTPBasic() + + +def verification( + credentials: Annotated[HTTPBasicCredentials, Depends(security)], +): + """HTTP Basic 认证""" + current_username_bytes = credentials.username.encode("utf8") + correct_username_bytes = config.httpauth_username.encode("utf8") + is_correct_username = secrets.compare_digest( + current_username_bytes, correct_username_bytes + ) + current_password_bytes = credentials.password.encode("utf8") + correct_password_bytes = config.httpauth_password.encode("utf8") + is_correct_password = secrets.compare_digest( + current_password_bytes, correct_password_bytes + ) + if not (is_correct_username and is_correct_password): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Basic"}, + ) + return True + + +def no_verification(): + """无认证模式""" + return True + + +def access_key_verification(file_path: str, key: str, code: str) -> bool: + """访问密钥验证""" + if config.disable_httpauth: + return True + + log.debug(f"访问限制接收端[{file_path}, {key}, {code}]") + if key is not None: + current_key_bytes = key.encode("utf8") + correct_key_bytes = ( + config.httpauth_username + config.httpauth_password + ).encode("utf8") + is_correct_key = secrets.compare_digest(correct_key_bytes, current_key_bytes) + if is_correct_key: + return True + + if code is not None: + current_code_bytes = code.encode("utf8") + correct_code_bytes = ( + hashlib.sha256( + ( + file_path + config.httpauth_username + config.httpauth_password + ).encode("utf-8") + ) + .hexdigest() + .encode("utf-8") + ) + is_correct_code = secrets.compare_digest(correct_code_bytes, current_code_bytes) + if is_correct_code: + return True + + return False + + +class AuthStaticFiles(StaticFiles): + """需要认证的静态文件服务""" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + async def __call__(self, scope, receive, send) -> None: + request = Request(scope, receive) + if not config.disable_httpauth: + assert verification(await security(request)) + await super().__call__(scope, receive, send) + + +def reset_http_server(app): + """重置 HTTP 服务器配置""" + log.info(f"disable_httpauth:{config.disable_httpauth}") + if config.disable_httpauth: + app.dependency_overrides[verification] = no_verification + else: + app.dependency_overrides = {} diff --git a/xiaomusic/api/models.py b/xiaomusic/api/models.py new file mode 100644 index 0000000..b8e3c93 --- /dev/null +++ b/xiaomusic/api/models.py @@ -0,0 +1,68 @@ +"""Pydantic 数据模型定义""" + +from pydantic import BaseModel + + +class DidVolume(BaseModel): + did: str + volume: int = 0 + + +class DidCmd(BaseModel): + did: str + cmd: str + + +class MusicInfoObj(BaseModel): + musicname: str + title: str = "" + artist: str = "" + album: str = "" + year: str = "" + genre: str = "" + lyrics: str = "" + picture: str = "" # base64 + + +class MusicItem(BaseModel): + name: str + + +class UrlInfo(BaseModel): + url: str + + +class DidPlayMusic(BaseModel): + did: str + musicname: str = "" + searchkey: str = "" + + +class DidPlayMusicList(BaseModel): + did: str + listname: str = "" + musicname: str = "" + + +class DownloadPlayList(BaseModel): + dirname: str + url: str + + +class DownloadOneMusic(BaseModel): + name: str = "" + url: str + + +class PlayListObj(BaseModel): + name: str = "" # 歌单名 + + +class PlayListUpdateObj(BaseModel): + oldname: str # 旧歌单名字 + newname: str # 新歌单名字 + + +class PlayListMusicObj(BaseModel): + name: str = "" # 歌单名 + music_list: list[str] # 歌曲名列表 diff --git a/xiaomusic/api/routers/__init__.py b/xiaomusic/api/routers/__init__.py new file mode 100644 index 0000000..bef8a1d --- /dev/null +++ b/xiaomusic/api/routers/__init__.py @@ -0,0 +1,27 @@ +"""路由注册""" + +from xiaomusic.api import websocket +from xiaomusic.api.routers import ( + device, + file, + music, + playlist, + plugin, + system, +) + + +def register_routers(app): + """注册所有路由到应用 + + Args: + app: FastAPI 应用实例 + """ + # 注册各个路由模块 + app.include_router(system.router, tags=["系统管理"]) + app.include_router(device.router, tags=["设备控制"]) + app.include_router(music.router, tags=["音乐管理"]) + app.include_router(playlist.router, tags=["播放列表"]) + app.include_router(plugin.router, tags=["插件管理"]) + app.include_router(file.router, tags=["文件操作"]) + app.include_router(websocket.router, tags=["WebSocket"]) diff --git a/xiaomusic/api/routers/device.py b/xiaomusic/api/routers/device.py new file mode 100644 index 0000000..fd20d89 --- /dev/null +++ b/xiaomusic/api/routers/device.py @@ -0,0 +1,84 @@ +"""设备控制路由""" + +import asyncio +import urllib.parse + +from fastapi import APIRouter, Depends + +from xiaomusic.api.dependencies import log, verification, xiaomusic +from xiaomusic.api.models import DidCmd, DidVolume + +router = APIRouter() + + +@router.get("/getvolume") +async def getvolume(did: str = "", Verifcation=Depends(verification)): + """获取音量""" + if not xiaomusic.did_exist(did): + return {"volume": 0} + + volume = await xiaomusic.get_volume(did=did) + return {"volume": volume} + + +@router.post("/setvolume") +async def setvolume(data: DidVolume, Verifcation=Depends(verification)): + """设置音量""" + did = data.did + volume = data.volume + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + log.info(f"set_volume {did} {volume}") + await xiaomusic.set_volume(did=did, arg1=volume) + return {"ret": "OK", "volume": volume} + + +@router.post("/cmd") +async def do_cmd(data: DidCmd, Verifcation=Depends(verification)): + """执行命令""" + did = data.did + cmd = data.cmd + log.info(f"docmd. did:{did} cmd:{cmd}") + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + if len(cmd) > 0: + try: + await xiaomusic.cancel_all_tasks() + task = asyncio.create_task(xiaomusic.do_check_cmd(did=did, query=cmd)) + xiaomusic.append_running_task(task) + except Exception as e: + log.warning(f"Execption {e}") + return {"ret": "OK"} + return {"ret": "Unknow cmd"} + + +@router.get("/cmdstatus") +async def cmd_status(Verifcation=Depends(verification)): + """命令状态""" + finish = await xiaomusic.is_task_finish() + if finish: + return {"ret": "OK", "status": "finish"} + return {"ret": "OK", "status": "running"} + + +@router.get("/playurl") +async def playurl(did: str, url: str, Verifcation=Depends(verification)): + """播放 URL""" + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + decoded_url = urllib.parse.unquote(url) + log.info(f"playurl did: {did} url: {decoded_url}") + return await xiaomusic.play_url(did=did, arg1=decoded_url) + + +@router.get("/playtts") +async def playtts(did: str, text: str, Verifcation=Depends(verification)): + """播放 TTS""" + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + log.info(f"tts {did} {text}") + await xiaomusic.do_tts(did=did, value=text) + return {"ret": "OK"} diff --git a/xiaomusic/api/routers/file.py b/xiaomusic/api/routers/file.py new file mode 100644 index 0000000..1dbb55b --- /dev/null +++ b/xiaomusic/api/routers/file.py @@ -0,0 +1,349 @@ +"""文件操作路由""" + +import asyncio +import base64 +import os +import shutil +from urllib.parse import urlparse + +import aiohttp +from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile +from fastapi.responses import ( + FileResponse, + RedirectResponse, + Response, + StreamingResponse, +) +from starlette.background import BackgroundTask + +from xiaomusic.api.dependencies import ( + access_key_verification, + config, + log, + verification, + xiaomusic, +) +from xiaomusic.api.models import DownloadOneMusic, DownloadPlayList, UrlInfo +from xiaomusic.utils import ( + check_bili_fav_list, + chmoddir, + convert_file_to_mp3, + download_one_music, + download_playlist, + downloadfile, + is_mp3, + remove_common_prefix, + remove_id3_tags, + safe_join_path, + try_add_access_control_param, +) + +router = APIRouter() + + +@router.post("/downloadjson") +async def downloadjson(data: UrlInfo, Verifcation=Depends(verification)): + """下载 JSON""" + log.info(data) + url = data.url + content = "" + try: + ret = "OK" + content = await downloadfile(url) + except Exception as e: + log.exception(f"Execption {e}") + ret = "Download JSON file failed." + return { + "ret": ret, + "content": content, + } + + +@router.post("/downloadplaylist") +async def downloadplaylist(data: DownloadPlayList, Verifcation=Depends(verification)): + """下载歌单""" + try: + bili_fav_list = await check_bili_fav_list(data.url) + download_proc_list = [] + if bili_fav_list: + for bvid, title in bili_fav_list.items(): + bvurl = f"https://www.bilibili.com/video/{bvid}" + download_proc_list[title] = await download_one_music( + config, bvurl, os.path.join(data.dirname, title) + ) + for title, download_proc_sigle in download_proc_list.items(): + exit_code = await download_proc_sigle.wait() + log.info(f"Download completed {title} with exit code {exit_code}") + dir_path = safe_join_path(config.download_path, data.dirname) + log.debug(f"Download dir_path: {dir_path}") + # 可能只是部分失败,都需要整理下载目录 + remove_common_prefix(dir_path) + chmoddir(dir_path) + return {"ret": "OK"} + else: + download_proc = await download_playlist(config, data.url, data.dirname) + + async def check_download_proc(): + # 等待子进程完成 + exit_code = await download_proc.wait() + log.info(f"Download completed with exit code {exit_code}") + + dir_path = safe_join_path(config.download_path, data.dirname) + log.debug(f"Download dir_path: {dir_path}") + # 可能只是部分失败,都需要整理下载目录 + remove_common_prefix(dir_path) + chmoddir(dir_path) + + asyncio.create_task(check_download_proc()) + return {"ret": "OK"} + except Exception as e: + log.exception(f"Execption {e}") + + return {"ret": "Failed download"} + + +@router.post("/downloadonemusic") +async def downloadonemusic(data: DownloadOneMusic, Verifcation=Depends(verification)): + """下载单首歌曲""" + try: + download_proc = await download_one_music(config, data.url, data.name) + + async def check_download_proc(): + # 等待子进程完成 + exit_code = await download_proc.wait() + log.info(f"Download completed with exit code {exit_code}") + chmoddir(config.download_path) + + asyncio.create_task(check_download_proc()) + return {"ret": "OK"} + except Exception as e: + log.exception(f"Execption {e}") + + return {"ret": "Failed download"} + + +@router.post("/uploadytdlpcookie") +async def upload_yt_dlp_cookie(file: UploadFile = File(...)): + """上传 yt-dlp cookies""" + with open(config.yt_dlp_cookies_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + return { + "ret": "OK", + "filename": file.filename, + "file_location": config.yt_dlp_cookies_path, + } + + +@router.post("/uploadmusic") +async def upload_music(playlist: str = Form(...), file: UploadFile = File(...)): + """上传音乐文件到当前播放列表对应的目录""" + try: + # 选择目标目录:优先尝试由播放列表中已有歌曲推断目录 + dest_dir = xiaomusic.music_path + # 特殊歌单映射 + if playlist == "下载": + dest_dir = xiaomusic.download_path + elif playlist == "其他": + dest_dir = xiaomusic.music_path + else: + # 如果播放列表中存在歌曲,从其中任意一首推断目录 + musics = xiaomusic.music_list.get(playlist, []) + if musics and len(musics) > 0: + first = musics[0] + filepath = xiaomusic.all_music.get(first, "") + if filepath: + dest_dir = os.path.dirname(filepath) + + # 确保目录存在 + if not os.path.exists(dest_dir): + os.makedirs(dest_dir, exist_ok=True) + + # 保存文件,避免路径穿越 + filename = os.path.basename(file.filename) + if filename == "": + raise HTTPException(status_code=400, detail="Invalid filename") + + dest_path = os.path.join(dest_dir, filename) + # 避免覆盖已有文件,简单地添加序号后缀 + base, ext = os.path.splitext(filename) + counter = 1 + while os.path.exists(dest_path): + filename = f"{base}_{counter}{ext}" + dest_path = os.path.join(dest_dir, filename) + counter += 1 + + with open(dest_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + + # 修复权限并刷新列表索引 + try: + chmoddir(dest_dir) + except Exception: + pass + + # 重新生成音乐列表索引 + try: + xiaomusic._gen_all_music_list() + except Exception: + pass + + return {"ret": "OK", "filename": filename} + except HTTPException: + raise + except Exception as e: + log.exception(f"upload music failed: {e}") + raise HTTPException(status_code=500, detail="Upload failed") from e + + +def safe_redirect(url): + """安全重定向""" + url = try_add_access_control_param(config, url) + url = url.replace("\\", "") + if not urlparse(url).netloc and not urlparse(url).scheme: + log.debug(f"redirect to {url}") + return RedirectResponse(url=url) + return None + + +@router.get("/music/{file_path:path}") +async def music_file(request: Request, file_path: str, key: str = "", code: str = ""): + """音乐文件访问""" + if not access_key_verification(f"/music/{file_path}", key, code): + raise HTTPException(status_code=404, detail="File not found") + + absolute_path = os.path.abspath(config.music_path) + absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path)) + if not absolute_file_path.startswith(absolute_path): + raise HTTPException(status_code=404, detail="File not found") + if not os.path.exists(absolute_file_path): + raise HTTPException(status_code=404, detail="File not found") + + # 移除MP3 ID3 v2标签和填充 + if config.remove_id3tag and is_mp3(file_path): + log.info(f"remove_id3tag:{config.remove_id3tag}, is_mp3:True ") + temp_mp3_file = remove_id3_tags(absolute_file_path, config) + if temp_mp3_file: + log.info(f"ID3 tag removed {absolute_file_path} to {temp_mp3_file}") + redirect = safe_redirect(f"/music/{temp_mp3_file}") + if redirect: + return redirect + else: + log.info(f"No ID3 tag remove needed: {absolute_file_path}") + + if config.convert_to_mp3 and not is_mp3(file_path): + temp_mp3_file = convert_file_to_mp3(absolute_file_path, config) + if temp_mp3_file: + log.info(f"Converted file: {absolute_file_path} to {temp_mp3_file}") + redirect = safe_redirect(f"/music/{temp_mp3_file}") + if redirect: + return redirect + else: + log.warning(f"Failed to convert file to MP3 format: {absolute_file_path}") + + return FileResponse(absolute_file_path) + + +@router.options("/music/{file_path:path}") +async def music_options(): + """音乐文件 OPTIONS""" + headers = { + "Accept-Ranges": "bytes", + } + return Response(headers=headers) + + +@router.get("/picture/{file_path:path}") +async def get_picture(request: Request, file_path: str, key: str = "", code: str = ""): + """图片文件访问""" + if not access_key_verification(f"/picture/{file_path}", key, code): + raise HTTPException(status_code=404, detail="File not found") + + absolute_path = os.path.abspath(config.picture_cache_path) + absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path)) + if not absolute_file_path.startswith(absolute_path): + raise HTTPException(status_code=404, detail="File not found") + if not os.path.exists(absolute_file_path): + raise HTTPException(status_code=404, detail="File not found") + + return FileResponse(absolute_file_path) + + +@router.get("/proxy", summary="基于正常下载逻辑的代理接口") +async def proxy(urlb64: str): + """代理接口""" + try: + # 将Base64编码的URL解码为字符串 + url_bytes = base64.b64decode(urlb64) + url = url_bytes.decode("utf-8") + print(f"解码后的代理请求: {url}") + except Exception as e: + raise HTTPException(status_code=400, detail=f"Base64解码失败: {str(e)}") from e + + log.info(f"代理请求: {url}") + + parsed_url = urlparse(url) + if not parsed_url.scheme or not parsed_url.netloc: + # Fixed: Use a new exception instance since 'e' from previous block is out of scope + invalid_url_exc = ValueError("URL缺少协议或域名") + raise HTTPException( + status_code=400, detail="无效的URL格式" + ) from invalid_url_exc + + # 创建会话并确保关闭 + session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=600), + connector=aiohttp.TCPConnector(ssl=True), + ) + + # 复用经过验证的请求头配置 + def get_wget_headers(parsed_url): + return { + "User-Agent": "Wget/1.21.3", + "Accept": "*/*", + "Accept-Encoding": "identity", + "Connection": "Keep-Alive", + } + + async def close_session(): + if not session.closed: + await session.close() + + try: + # 复用download_file中的请求逻辑 + headers = get_wget_headers(parsed_url) + resp = await session.get(url, headers=headers, allow_redirects=True) + + if resp.status not in (200, 206): + await close_session() + status_exc = ValueError(f"服务器返回状态码: {resp.status}") + raise HTTPException( + status_code=resp.status, detail=f"下载失败,状态码: {resp.status}" + ) from status_exc + + # 流式生成器,与download_file的分块逻辑一致 + async def stream_generator(): + try: + async for data in resp.content.iter_chunked(4096): + yield data + finally: + await close_session() + + # 提取文件名 + filename = parsed_url.path.split("/")[-1].split("?")[0] or "output.mp3" + + return StreamingResponse( + stream_generator(), + media_type=resp.headers.get("Content-Type", "audio/mpeg"), + headers={"Content-Disposition": f'inline; filename="{filename}"'}, + background=BackgroundTask(close_session), + ) + + except aiohttp.ClientConnectionError as e: + await close_session() + raise HTTPException(status_code=502, detail=f"连接错误: {str(e)}") from e + except asyncio.TimeoutError as e: + await close_session() + raise HTTPException(status_code=504, detail="下载超时") from e + except Exception as e: + await close_session() + raise HTTPException(status_code=500, detail=f"发生错误: {str(e)}") from e diff --git a/xiaomusic/api/routers/music.py b/xiaomusic/api/routers/music.py new file mode 100644 index 0000000..32c9edd --- /dev/null +++ b/xiaomusic/api/routers/music.py @@ -0,0 +1,215 @@ +"""音乐管理路由""" + +import json +import urllib.parse + +from fastapi import APIRouter, Depends, HTTPException, Query, Request + +from xiaomusic.api.dependencies import log, verification, xiaomusic +from xiaomusic.api.models import ( + DidPlayMusic, + MusicInfoObj, + MusicItem, +) + +router = APIRouter() + + +@router.get("/searchmusic") +def searchmusic(name: str = "", Verifcation=Depends(verification)): + """搜索音乐""" + return xiaomusic.searchmusic(name) + + +@router.get("/api/search/online") +async def search_online_music( + keyword: str = Query(..., description="搜索关键词"), + plugin: str = Query("all", description="指定插件名称,all表示搜索所有插件"), + page: int = Query(1, description="页码"), + limit: int = Query(20, description="每页数量"), + Verifcation=Depends(verification), +): + """在线音乐搜索API""" + try: + if not keyword: + return {"success": False, "error": "Keyword required"} + + return await xiaomusic.get_music_list_online( + keyword=keyword, plugin=plugin, page=page, limit=limit + ) + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.get("/api/proxy/real-music-url") +async def get_real_music_url( + url: str = Query(..., description="音乐下载URL"), Verifcation=Depends(verification) +): + """通过服务端代理获取真实的音乐播放URL,避免CORS问题""" + try: + # 获取真实的音乐播放URL + return await xiaomusic.get_real_url_of_openapi(url) + + except Exception as e: + log.error(f"获取真实音乐URL失败: {e}") + # 如果代理获取失败,仍然返回原始URL + return {"success": False, "realUrl": url, "error": str(e)} + + +@router.post("/api/play/getMediaSource") +async def get_media_source(request: Request, Verifcation=Depends(verification)): + """获取音乐真实播放URL""" + try: + # 获取请求数据 + data = await request.json() + # 调用公共函数处理 + return await xiaomusic.get_media_source_url(data) + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.post("/api/play/getLyric") +async def get_media_lyric(request: Request, Verifcation=Depends(verification)): + """获取音乐歌词""" + try: + # 获取请求数据 + data = await request.json() + # 调用公共函数处理 + return await xiaomusic.get_media_lyric(data) + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.post("/api/play/online") +async def play_online_music(request: Request, Verifcation=Depends(verification)): + """设备端在线播放插件音乐""" + try: + # 获取请求数据 + data = await request.json() + did = data.get("did") + openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() + if openapi_info.get("enabled", False): + media_source = await xiaomusic.get_real_url_of_openapi(data.get("url")) + else: + # 调用公共函数处理,获取音乐真实播放URL + media_source = await xiaomusic.get_media_source_url(data) + if not media_source or not media_source.get("url"): + return {"success": False, "error": "Failed to get media source URL"} + url = media_source.get("url") + decoded_url = urllib.parse.unquote(url) + return await xiaomusic.play_url(did=did, arg1=decoded_url) + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.get("/playingmusic") +def playingmusic(did: str = "", Verifcation=Depends(verification)): + """当前播放音乐""" + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + is_playing = xiaomusic.isplaying(did) + cur_music = xiaomusic.playingmusic(did) + cur_playlist = xiaomusic.get_cur_play_list(did) + # 播放进度 + offset, duration = xiaomusic.get_offset_duration(did) + return { + "ret": "OK", + "is_playing": is_playing, + "cur_music": cur_music, + "cur_playlist": cur_playlist, + "offset": offset, + "duration": duration, + } + + +@router.get("/musiclist") +async def musiclist(Verifcation=Depends(verification)): + """音乐列表""" + return xiaomusic.get_music_list() + + +@router.get("/musicinfo") +async def musicinfo( + name: str, musictag: bool = False, Verifcation=Depends(verification) +): + """音乐信息""" + url, _ = await xiaomusic.get_music_url(name) + info = { + "ret": "OK", + "name": name, + "url": url, + } + if musictag: + info["tags"] = xiaomusic.get_music_tags(name) + return info + + +@router.get("/musicinfos") +async def musicinfos( + name: list[str] = Query(None), + musictag: bool = False, + Verifcation=Depends(verification), +): + """批量音乐信息""" + ret = [] + for music_name in name: + url, _ = await xiaomusic.get_music_url(music_name) + info = { + "name": music_name, + "url": url, + } + if musictag: + info["tags"] = xiaomusic.get_music_tags(music_name) + ret.append(info) + return ret + + +@router.post("/setmusictag") +async def setmusictag(info: MusicInfoObj, Verifcation=Depends(verification)): + """设置音乐标签""" + ret = xiaomusic.set_music_tag(info.musicname, info) + return {"ret": ret} + + +@router.post("/delmusic") +async def delmusic(data: MusicItem, Verifcation=Depends(verification)): + """删除音乐""" + log.info(data) + await xiaomusic.del_music(data.name) + return "success" + + +@router.post("/playmusic") +async def playmusic(data: DidPlayMusic, Verifcation=Depends(verification)): + """播放音乐""" + did = data.did + musicname = data.musicname + searchkey = data.searchkey + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + log.info(f"playmusic {did} musicname:{musicname} searchkey:{searchkey}") + await xiaomusic.do_play(did, musicname, searchkey) + return {"ret": "OK"} + + +@router.post("/refreshmusictag") +async def refreshmusictag(Verifcation=Depends(verification)): + """刷新音乐标签""" + xiaomusic.refresh_music_tag() + return { + "ret": "OK", + } + + +@router.post("/debug_play_by_music_url") +async def debug_play_by_music_url(request: Request, Verifcation=Depends(verification)): + """调试播放音乐URL""" + try: + data = await request.body() + data_dict = json.loads(data.decode("utf-8")) + log.info(f"data:{data_dict}") + return await xiaomusic.debug_play_by_music_url(arg1=data_dict) + except json.JSONDecodeError as err: + raise HTTPException(status_code=400, detail="Invalid JSON") from err diff --git a/xiaomusic/api/routers/playlist.py b/xiaomusic/api/routers/playlist.py new file mode 100644 index 0000000..dafe538 --- /dev/null +++ b/xiaomusic/api/routers/playlist.py @@ -0,0 +1,114 @@ +"""播放列表路由""" + +from fastapi import APIRouter, Depends + +from xiaomusic.api.dependencies import log, verification, xiaomusic +from xiaomusic.api.models import ( + DidPlayMusicList, + PlayListMusicObj, + PlayListObj, + PlayListUpdateObj, +) + +router = APIRouter() + + +@router.get("/curplaylist") +async def curplaylist(did: str = "", Verifcation=Depends(verification)): + """当前播放列表""" + if not xiaomusic.did_exist(did): + return "" + return xiaomusic.get_cur_play_list(did) + + +@router.post("/playmusiclist") +async def playmusiclist(data: DidPlayMusicList, Verifcation=Depends(verification)): + """播放音乐列表""" + did = data.did + listname = data.listname + musicname = data.musicname + if not xiaomusic.did_exist(did): + return {"ret": "Did not exist"} + + log.info(f"playmusiclist {did} listname:{listname} musicname:{musicname}") + await xiaomusic.do_play_music_list(did, listname, musicname) + return {"ret": "OK"} + + +@router.post("/playlistadd") +async def playlistadd(data: PlayListObj, Verifcation=Depends(verification)): + """新增歌单""" + ret = xiaomusic.play_list_add(data.name) + if ret: + return {"ret": "OK"} + return {"ret": "Add failed, may be already exist."} + + +@router.post("/playlistdel") +async def playlistdel(data: PlayListObj, Verifcation=Depends(verification)): + """移除歌单""" + ret = xiaomusic.play_list_del(data.name) + if ret: + return {"ret": "OK"} + return {"ret": "Del failed, may be not exist."} + + +@router.post("/playlistupdatename") +async def playlistupdatename( + data: PlayListUpdateObj, Verifcation=Depends(verification) +): + """修改歌单名字""" + ret = xiaomusic.play_list_update_name(data.oldname, data.newname) + if ret: + return {"ret": "OK"} + return {"ret": "Update failed, may be not exist."} + + +@router.get("/playlistnames") +async def getplaylistnames(Verifcation=Depends(verification)): + """获取所有自定义歌单""" + names = xiaomusic.get_play_list_names() + log.info(f"names {names}") + return { + "ret": "OK", + "names": names, + } + + +@router.post("/playlistaddmusic") +async def playlistaddmusic(data: PlayListMusicObj, Verifcation=Depends(verification)): + """歌单新增歌曲""" + ret = xiaomusic.play_list_add_music(data.name, data.music_list) + if ret: + return {"ret": "OK"} + return {"ret": "Add failed, may be playlist not exist."} + + +@router.post("/playlistdelmusic") +async def playlistdelmusic(data: PlayListMusicObj, Verifcation=Depends(verification)): + """歌单移除歌曲""" + ret = xiaomusic.play_list_del_music(data.name, data.music_list) + if ret: + return {"ret": "OK"} + return {"ret": "Del failed, may be playlist not exist."} + + +@router.post("/playlistupdatemusic") +async def playlistupdatemusic( + data: PlayListMusicObj, Verifcation=Depends(verification) +): + """歌单更新歌曲""" + ret = xiaomusic.play_list_update_music(data.name, data.music_list) + if ret: + return {"ret": "OK"} + return {"ret": "Del failed, may be playlist not exist."} + + +@router.get("/playlistmusics") +async def getplaylist(name: str, Verifcation=Depends(verification)): + """获取歌单中所有歌曲""" + ret, musics = xiaomusic.play_list_musics(name) + return { + "ret": "OK", + "musics": musics, + } diff --git a/xiaomusic/api/routers/plugin.py b/xiaomusic/api/routers/plugin.py new file mode 100644 index 0000000..1d13e4f --- /dev/null +++ b/xiaomusic/api/routers/plugin.py @@ -0,0 +1,163 @@ +"""插件管理路由""" + +import os + +import aiofiles +from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile + +from xiaomusic.api.dependencies import verification, xiaomusic + +router = APIRouter() + + +@router.get("/api/js-plugins") +def get_js_plugins( + enabled_only: bool = Query(False, description="是否只返回启用的插件"), + Verifcation=Depends(verification), +): + """获取插件列表""" + try: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + return {"success": False, "error": "JS Plugin Manager not available"} + + if enabled_only: + plugins = xiaomusic.js_plugin_manager.get_enabled_plugins() + else: + plugins = xiaomusic.js_plugin_manager.get_plugin_list() + return {"success": True, "data": plugins} + + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.put("/api/js-plugins/{plugin_name}/enable") +def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): + """启用插件""" + try: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + return {"success": False, "error": "JS Plugin Manager not available"} + + success = xiaomusic.js_plugin_manager.enable_plugin(plugin_name) + return {"success": success} + + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.put("/api/js-plugins/{plugin_name}/disable") +def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): + """禁用插件""" + try: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + return {"success": False, "error": "JS Plugin Manager not available"} + + success = xiaomusic.js_plugin_manager.disable_plugin(plugin_name) + return {"success": success} + + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.delete("/api/js-plugins/{plugin_name}/uninstall") +def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)): + """卸载插件""" + try: + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + return {"success": False, "error": "JS Plugin Manager not available"} + + success = xiaomusic.js_plugin_manager.uninstall_plugin(plugin_name) + return {"success": success} + + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.post("/api/js-plugins/upload") +async def upload_js_plugin( + file: UploadFile = File(...), verification_dep=Depends(verification) +): + """上传 JS 插件""" + try: + # 验证文件扩展名 + if not file.filename.endswith(".js"): + raise HTTPException(status_code=400, detail="只允许上传 .js 文件") + + # 使用 JSPluginManager 中定义的插件目录 + if ( + not hasattr(xiaomusic, "js_plugin_manager") + or not xiaomusic.js_plugin_manager + ): + raise HTTPException( + status_code=500, detail="JS Plugin Manager not available" + ) + + plugin_dir = xiaomusic.js_plugin_manager.plugins_dir + os.makedirs(plugin_dir, exist_ok=True) + file_path = os.path.join(plugin_dir, file.filename) + # 校验是否已存在同名js插件 存在则提示,停止上传 + if os.path.exists(file_path): + raise HTTPException( + status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传" + ) + file_path = os.path.join(plugin_dir, file.filename) + + # 写入文件内容 + async with aiofiles.open(file_path, "wb") as f: + content = await file.read() + await f.write(content) + + # 更新插件配置文件 + plugin_name = os.path.splitext(file.filename)[0] + xiaomusic.js_plugin_manager.update_plugin_config(plugin_name, file.filename) + + # 重新加载插件 + xiaomusic.js_plugin_manager.reload_plugins() + + return {"success": True, "message": "插件上传成功"} + + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.get("/api/openapi/load") +def get_openapi_info(Verifcation=Depends(verification)): + """获取开放接口配置信息""" + try: + openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() + return {"success": True, "data": openapi_info} + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.post("/api/openapi/toggle") +def toggle_openapi(Verifcation=Depends(verification)): + """开放接口状态切换""" + try: + return xiaomusic.js_plugin_manager.toggle_openapi() + except Exception as e: + return {"success": False, "error": str(e)} + + +@router.post("/api/openapi/updateUrl") +async def update_openapi_url(request: Request, Verifcation=Depends(verification)): + """更新开放接口地址""" + try: + request_json = await request.json() + search_url = request_json.get("search_url") + if not request_json or "search_url" not in request_json: + return {"success": False, "error": "Missing 'search_url' in request body"} + return xiaomusic.js_plugin_manager.update_openapi_url(search_url) + except Exception as e: + return {"success": False, "error": str(e)} diff --git a/xiaomusic/api/routers/system.py b/xiaomusic/api/routers/system.py new file mode 100644 index 0000000..b4289fc --- /dev/null +++ b/xiaomusic/api/routers/system.py @@ -0,0 +1,157 @@ +"""系统管理路由""" + +import json +import os +import shutil +import tempfile +from dataclasses import asdict + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html +from fastapi.openapi.utils import get_openapi +from fastapi.responses import FileResponse +from starlette.background import BackgroundTask + +from xiaomusic import __version__ +from xiaomusic.api.dependencies import log, verification, xiaomusic +from xiaomusic.utils import ( + deepcopy_data_no_sensitive_info, + get_latest_version, + restart_xiaomusic, + update_version, +) + +router = APIRouter() + + +@router.get("/") +async def read_index(Verifcation=Depends(verification)): + """首页""" + folder = os.path.dirname( + os.path.dirname(os.path.dirname(__file__)) + ) # xiaomusic 目录 + return FileResponse(f"{folder}/static/index.html") + + +@router.get("/getversion") +def getversion(Verifcation=Depends(verification)): + """获取版本""" + log.debug("getversion %s", __version__) + return {"version": __version__} + + +@router.get("/getsetting") +async def getsetting(need_device_list: bool = False, Verifcation=Depends(verification)): + """获取设置""" + config_data = xiaomusic.getconfig() + data = asdict(config_data) + data["password"] = "******" + data["httpauth_password"] = "******" + if need_device_list: + device_list = await xiaomusic.getalldevices() + log.info(f"getsetting device_list: {device_list}") + data["device_list"] = device_list + return data + + +@router.post("/savesetting") +async def savesetting(request: Request, Verifcation=Depends(verification)): + """保存设置""" + try: + data_json = await request.body() + data = json.loads(data_json.decode("utf-8")) + debug_data = deepcopy_data_no_sensitive_info(data) + log.info(f"saveconfig: {debug_data}") + config_obj = xiaomusic.getconfig() + if data["password"] == "******" or data["password"] == "": + data["password"] = config_obj.password + if data["httpauth_password"] == "******" or data["httpauth_password"] == "": + data["httpauth_password"] = config_obj.httpauth_password + await xiaomusic.saveconfig(data) + + # 重置 HTTP 服务器配置 + from xiaomusic.api.app import app + from xiaomusic.api.dependencies import reset_http_server + + reset_http_server(app) + + return "save success" + except json.JSONDecodeError as err: + raise HTTPException(status_code=400, detail="Invalid JSON") from err + + +@router.get("/downloadlog") +def downloadlog(Verifcation=Depends(verification)): + """下载日志""" + file_path = xiaomusic.config.log_file + if os.path.exists(file_path): + # 创建一个临时文件来保存日志的快照 + temp_file = tempfile.NamedTemporaryFile(delete=False) + try: + with open(file_path, "rb") as f: + shutil.copyfileobj(f, temp_file) + temp_file.close() + + # 使用BackgroundTask在响应发送完毕后删除临时文件 + def cleanup_temp_file(tmp_file_path): + os.remove(tmp_file_path) + + background_task = BackgroundTask(cleanup_temp_file, temp_file.name) + return FileResponse( + temp_file.name, + media_type="text/plain", + filename="xiaomusic.txt", + background=background_task, + ) + except Exception as e: + os.remove(temp_file.name) + raise HTTPException( + status_code=500, detail="Error capturing log file" + ) from e + else: + return {"message": "File not found."} + + +@router.get("/latestversion") +async def latest_version(Verifcation=Depends(verification)): + """获取最新版本""" + version = await get_latest_version("xiaomusic") + if version: + return {"ret": "OK", "version": version} + else: + return {"ret": "Fetch version failed"} + + +@router.post("/updateversion") +async def updateversion( + version: str = "", lite: bool = True, Verifcation=Depends(verification) +): + """更新版本""" + import asyncio + + ret = await update_version(version, lite) + if ret != "OK": + return {"ret": ret} + + asyncio.create_task(restart_xiaomusic()) + return {"ret": "OK"} + + +@router.get("/docs", include_in_schema=False) +async def get_swagger_documentation(Verifcation=Depends(verification)): + """Swagger 文档""" + return get_swagger_ui_html(openapi_url="/openapi.json", title="docs") + + +@router.get("/redoc", include_in_schema=False) +async def get_redoc_documentation(Verifcation=Depends(verification)): + """ReDoc 文档""" + return get_redoc_html(openapi_url="/openapi.json", title="docs") + + +@router.get("/openapi.json", include_in_schema=False) +async def openapi(Verifcation=Depends(verification)): + """OpenAPI 规范""" + from xiaomusic.api.app import app + + return get_openapi(title=app.title, version=app.version, routes=app.routes) diff --git a/xiaomusic/api/websocket.py b/xiaomusic/api/websocket.py new file mode 100644 index 0000000..5904f9b --- /dev/null +++ b/xiaomusic/api/websocket.py @@ -0,0 +1,96 @@ +"""WebSocket 相关功能""" + +import asyncio +import json +import secrets +import time + +import jwt +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect + +from xiaomusic.api.dependencies import verification, xiaomusic + +router = APIRouter() + +# JWT 配置 +JWT_SECRET = secrets.token_urlsafe(32) +JWT_ALGORITHM = "HS256" +JWT_EXPIRE_SECONDS = 60 * 5 # 5 分钟有效期(足够前端连接和重连) + + +@router.get("/generate_ws_token") +def generate_ws_token( + did: str, + _: bool = verification, # 复用 HTTP Basic 验证 +): + """生成 WebSocket token""" + if not xiaomusic.did_exist(did): + raise HTTPException(status_code=400, detail="Invalid did") + + payload = { + "did": did, + "exp": time.time() + JWT_EXPIRE_SECONDS, + "iat": time.time(), + } + + token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) + + return { + "token": token, + "expire_in": JWT_EXPIRE_SECONDS, + } + + +@router.websocket("/ws/playingmusic") +async def ws_playingmusic(websocket: WebSocket): + """WebSocket 播放状态推送""" + token = websocket.query_params.get("token") + if not token: + await websocket.close(code=1008, reason="Missing token") + return + + try: + # 解码 JWT(自动校验签名 + 是否过期) + payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) + did = payload.get("did") + + if not did: + await websocket.close(code=1008, reason="Invalid token") + return + + if not xiaomusic.did_exist(did): + await websocket.close(code=1003, reason="Did not exist") + return + + await websocket.accept() + + # 开始推送状态 + while True: + is_playing = xiaomusic.isplaying(did) + cur_music = xiaomusic.playingmusic(did) + cur_playlist = xiaomusic.get_cur_play_list(did) + offset, duration = xiaomusic.get_offset_duration(did) + + await websocket.send_text( + json.dumps( + { + "ret": "OK", + "is_playing": is_playing, + "cur_music": cur_music, + "cur_playlist": cur_playlist, + "offset": offset, + "duration": duration, + } + ) + ) + await asyncio.sleep(1) + + except jwt.ExpiredSignatureError: + await websocket.close(code=1008, reason="Token expired") + except jwt.InvalidTokenError: + await websocket.close(code=1008, reason="Invalid token") + except WebSocketDisconnect: + print(f"WebSocket disconnected: {did}") + except Exception as e: + print(f"Error: {e}") + await websocket.close() diff --git a/xiaomusic/cli.py b/xiaomusic/cli.py index dcddb66..42ca0a6 100644 --- a/xiaomusic/cli.py +++ b/xiaomusic/cli.py @@ -38,9 +38,9 @@ def main(): import uvicorn from xiaomusic import __version__ + from xiaomusic.api import HttpInit + from xiaomusic.api import app as HttpApp from xiaomusic.config import Config - from xiaomusic.httpserver import HttpInit - from xiaomusic.httpserver import app as HttpApp from xiaomusic.xiaomusic import XiaoMusic parser = argparse.ArgumentParser() diff --git a/xiaomusic/httpserver.py b/xiaomusic/httpserver.py deleted file mode 100644 index ea8b7d0..0000000 --- a/xiaomusic/httpserver.py +++ /dev/null @@ -1,1280 +0,0 @@ -import asyncio -import base64 -import hashlib -import json -import os -import secrets -import shutil -import tempfile -import time -import urllib.parse -from contextlib import asynccontextmanager -from dataclasses import asdict -from typing import TYPE_CHECKING, Annotated -from urllib.parse import urlparse - -import jwt -from fastapi import WebSocket, WebSocketDisconnect - -if TYPE_CHECKING: - from xiaomusic.xiaomusic import XiaoMusic -import aiofiles -import aiohttp -from fastapi import ( - Depends, - FastAPI, - File, - Form, - HTTPException, - Query, - Request, - UploadFile, - status, -) -from fastapi.middleware.cors import CORSMiddleware -from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html -from fastapi.openapi.utils import get_openapi -from fastapi.responses import RedirectResponse, StreamingResponse -from fastapi.security import HTTPBasic, HTTPBasicCredentials -from fastapi.staticfiles import StaticFiles -from pydantic import BaseModel -from starlette.background import BackgroundTask -from starlette.middleware.gzip import GZipMiddleware -from starlette.responses import FileResponse, Response - -from xiaomusic import __version__ -from xiaomusic.utils import ( - check_bili_fav_list, - chmoddir, - convert_file_to_mp3, - deepcopy_data_no_sensitive_info, - download_one_music, - download_playlist, - downloadfile, - get_latest_version, - is_mp3, - remove_common_prefix, - remove_id3_tags, - restart_xiaomusic, - safe_join_path, - try_add_access_control_param, - update_version, -) - -xiaomusic: "XiaoMusic" = None -config = None -log = None - - -@asynccontextmanager -async def app_lifespan(app): - if xiaomusic is not None: - asyncio.create_task(xiaomusic.run_forever()) - try: - yield - except Exception as e: - log.exception(f"Execption {e}") - - -security = HTTPBasic() - - -def verification( - credentials: Annotated[HTTPBasicCredentials, Depends(security)], -): - current_username_bytes = credentials.username.encode("utf8") - correct_username_bytes = config.httpauth_username.encode("utf8") - is_correct_username = secrets.compare_digest( - current_username_bytes, correct_username_bytes - ) - current_password_bytes = credentials.password.encode("utf8") - correct_password_bytes = config.httpauth_password.encode("utf8") - is_correct_password = secrets.compare_digest( - current_password_bytes, correct_password_bytes - ) - if not (is_correct_username and is_correct_password): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Basic"}, - ) - return True - - -def no_verification(): - return True - - -app = FastAPI( - lifespan=app_lifespan, - version=__version__, - docs_url=None, - redoc_url=None, - openapi_url=None, -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # 允许访问的源 - allow_credentials=False, # 支持 cookie - allow_methods=["*"], # 允许使用的请求方法 - allow_headers=["*"], # 允许携带的 Headers -) -# 添加 GZip 中间件 -app.add_middleware(GZipMiddleware, minimum_size=500) - - -def reset_http_server(): - log.info(f"disable_httpauth:{config.disable_httpauth}") - if config.disable_httpauth: - app.dependency_overrides[verification] = no_verification - else: - app.dependency_overrides = {} - - -class AuthStaticFiles(StaticFiles): - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - - async def __call__(self, scope, receive, send) -> None: - request = Request(scope, receive) - if not config.disable_httpauth: - assert verification(await security(request)) - await super().__call__(scope, receive, send) - - -def HttpInit(_xiaomusic): - global xiaomusic, config, log, onlines - xiaomusic = _xiaomusic - config = xiaomusic.config - log = xiaomusic.log - onlines = set() - folder = os.path.dirname(__file__) - app.mount("/static", AuthStaticFiles(directory=f"{folder}/static"), name="static") - reset_http_server() - - -@app.get("/") -async def read_index(Verifcation=Depends(verification)): - folder = os.path.dirname(__file__) - return FileResponse(f"{folder}/static/index.html") - - -@app.get("/getversion") -def getversion(Verifcation=Depends(verification)): - log.debug("getversion %s", __version__) - return {"version": __version__} - - -@app.get("/getvolume") -async def getvolume(did: str = "", Verifcation=Depends(verification)): - if not xiaomusic.did_exist(did): - return {"volume": 0} - - volume = await xiaomusic.get_volume(did=did) - return {"volume": volume} - - -class DidVolume(BaseModel): - did: str - volume: int = 0 - - -@app.post("/setvolume") -async def setvolume(data: DidVolume, Verifcation=Depends(verification)): - did = data.did - volume = data.volume - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - log.info(f"set_volume {did} {volume}") - await xiaomusic.set_volume(did=did, arg1=volume) - return {"ret": "OK", "volume": volume} - - -@app.get("/searchmusic") -def searchmusic(name: str = "", Verifcation=Depends(verification)): - return xiaomusic.searchmusic(name) - - -@app.get("/api/search/online") -async def search_online_music( - keyword: str = Query(..., description="搜索关键词"), - plugin: str = Query("all", description="指定插件名称,all表示搜索所有插件"), - page: int = Query(1, description="页码"), - limit: int = Query(20, description="每页数量"), - Verifcation=Depends(verification), -): - """在线音乐搜索API""" - try: - if not keyword: - return {"success": False, "error": "Keyword required"} - - return await xiaomusic.get_music_list_online( - keyword=keyword, plugin=plugin, page=page, limit=limit - ) - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.get("/api/proxy/real-music-url") -async def get_real_music_url( - url: str = Query(..., description="音乐下载URL"), Verifcation=Depends(verification) -): - """通过服务端代理获取真实的音乐播放URL,避免CORS问题""" - try: - # 获取真实的音乐播放URL - return await xiaomusic.get_real_url_of_openapi(url) - - except Exception as e: - log.error(f"获取真实音乐URL失败: {e}") - # 如果代理获取失败,仍然返回原始URL - return {"success": False, "realUrl": url, "error": str(e)} - - -@app.post("/api/play/getMediaSource") -async def get_media_source(request: Request, Verifcation=Depends(verification)): - """获取音乐真实播放URL""" - try: - # 获取请求数据 - data = await request.json() - # 调用公共函数处理 - return await xiaomusic.get_media_source_url(data) - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.post("/api/play/getLyric") -async def get_media_lyric(request: Request, Verifcation=Depends(verification)): - """获取音乐真实播放URL""" - try: - # 获取请求数据 - data = await request.json() - # 调用公共函数处理 - return await xiaomusic.get_media_lyric(data) - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.post("/api/play/online") -async def play_online_music(request: Request, Verifcation=Depends(verification)): - """设备端在线播放插件音乐""" - try: - # 获取请求数据 - data = await request.json() - did = data.get("did") - openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() - if openapi_info.get("enabled", False): - media_source = await xiaomusic.get_real_url_of_openapi(data.get("url")) - else: - # 调用公共函数处理,获取音乐真实播放URL - media_source = await xiaomusic.get_media_source_url(data) - if not media_source or not media_source.get("url"): - return {"success": False, "error": "Failed to get media source URL"} - url = media_source.get("url") - decoded_url = urllib.parse.unquote(url) - return await xiaomusic.play_url(did=did, arg1=decoded_url) - except Exception as e: - return {"success": False, "error": str(e)} - - -# =====================================插件入口函数=============== - - -@app.get("/api/js-plugins") -def get_js_plugins( - enabled_only: bool = Query(False, description="是否只返回启用的插件"), - Verifcation=Depends(verification), -): - """获取插件列表""" - try: - if ( - not hasattr(xiaomusic, "js_plugin_manager") - or not xiaomusic.js_plugin_manager - ): - return {"success": False, "error": "JS Plugin Manager not available"} - # 重新加载插件 - # xiaomusic.js_plugin_manager.reload_plugins() - - if enabled_only: - plugins = xiaomusic.js_plugin_manager.get_enabled_plugins() - else: - plugins = xiaomusic.js_plugin_manager.get_plugin_list() - return {"success": True, "data": plugins} - - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.put("/api/js-plugins/{plugin_name}/enable") -def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): - """启用插件""" - try: - if ( - not hasattr(xiaomusic, "js_plugin_manager") - or not xiaomusic.js_plugin_manager - ): - return {"success": False, "error": "JS Plugin Manager not available"} - - success = xiaomusic.js_plugin_manager.enable_plugin(plugin_name) - return {"success": success} - - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.put("/api/js-plugins/{plugin_name}/disable") -def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)): - """禁用插件""" - try: - if ( - not hasattr(xiaomusic, "js_plugin_manager") - or not xiaomusic.js_plugin_manager - ): - return {"success": False, "error": "JS Plugin Manager not available"} - - success = xiaomusic.js_plugin_manager.disable_plugin(plugin_name) - return {"success": success} - - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.delete("/api/js-plugins/{plugin_name}/uninstall") -def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)): - """卸载插件""" - try: - if ( - not hasattr(xiaomusic, "js_plugin_manager") - or not xiaomusic.js_plugin_manager - ): - return {"success": False, "error": "JS Plugin Manager not available"} - - success = xiaomusic.js_plugin_manager.uninstall_plugin(plugin_name) - return {"success": success} - - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.post("/api/js-plugins/upload") -async def upload_js_plugin( - file: UploadFile = File(...), verification=Depends(verification) -): - """上传 JS 插件""" - try: - # 验证文件扩展名 - if not file.filename.endswith(".js"): - raise HTTPException(status_code=400, detail="只允许上传 .js 文件") - - # 使用 JSPluginManager 中定义的插件目录 - if ( - not hasattr(xiaomusic, "js_plugin_manager") - or not xiaomusic.js_plugin_manager - ): - raise HTTPException( - status_code=500, detail="JS Plugin Manager not available" - ) - - plugin_dir = xiaomusic.js_plugin_manager.plugins_dir - os.makedirs(plugin_dir, exist_ok=True) - file_path = os.path.join(plugin_dir, file.filename) - # 校验是否已存在同名js插件 存在则提示,停止上传 - if os.path.exists(file_path): - raise HTTPException( - status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传" - ) - file_path = os.path.join(plugin_dir, file.filename) - - # 写入文件内容 - async with aiofiles.open(file_path, "wb") as f: - content = await file.read() - await f.write(content) - - # 更新插件配置文件 - plugin_name = os.path.splitext(file.filename)[0] - xiaomusic.js_plugin_manager.update_plugin_config(plugin_name, file.filename) - - # 重新加载插件 - xiaomusic.js_plugin_manager.reload_plugins() - - return {"success": True, "message": "插件上传成功"} - - except Exception as e: - return {"success": False, "error": str(e)} - - -# =====================================开放接口配置函数=============== - - -@app.get("/api/openapi/load") -def get_openapi_info(Verifcation=Depends(verification)): - """获取开放接口配置信息""" - try: - openapi_info = xiaomusic.js_plugin_manager.get_openapi_info() - return {"success": True, "data": openapi_info} - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.post("/api/openapi/toggle") -def toggle_openapi(Verifcation=Depends(verification)): - """开放接口状态切换""" - try: - return xiaomusic.js_plugin_manager.toggle_openapi() - except Exception as e: - return {"success": False, "error": str(e)} - - -@app.post("/api/openapi/updateUrl") -async def update_openapi_url(request: Request, Verifcation=Depends(verification)): - """更新开放接口地址""" - try: - request_json = await request.json() - search_url = request_json.get("search_url") - if not request_json or "search_url" not in request_json: - return {"success": False, "error": "Missing 'search_url' in request body"} - return xiaomusic.js_plugin_manager.update_openapi_url(search_url) - except Exception as e: - return {"success": False, "error": str(e)} - - -# =====================================开放接口函数END=============== - - -@app.get("/playingmusic") -def playingmusic(did: str = "", Verifcation=Depends(verification)): - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - is_playing = xiaomusic.isplaying(did) - cur_music = xiaomusic.playingmusic(did) - cur_playlist = xiaomusic.get_cur_play_list(did) - # 播放进度 - offset, duration = xiaomusic.get_offset_duration(did) - return { - "ret": "OK", - "is_playing": is_playing, - "cur_music": cur_music, - "cur_playlist": cur_playlist, - "offset": offset, - "duration": duration, - } - - -class DidCmd(BaseModel): - did: str - cmd: str - - -@app.post("/cmd") -async def do_cmd(data: DidCmd, Verifcation=Depends(verification)): - did = data.did - cmd = data.cmd - log.info(f"docmd. did:{did} cmd:{cmd}") - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - if len(cmd) > 0: - try: - await xiaomusic.cancel_all_tasks() - task = asyncio.create_task(xiaomusic.do_check_cmd(did=did, query=cmd)) - xiaomusic.append_running_task(task) - except Exception as e: - log.warning(f"Execption {e}") - return {"ret": "OK"} - return {"ret": "Unknow cmd"} - - -@app.get("/cmdstatus") -async def cmd_status(Verifcation=Depends(verification)): - finish = await xiaomusic.is_task_finish() - if finish: - return {"ret": "OK", "status": "finish"} - return {"ret": "OK", "status": "running"} - - -@app.get("/getsetting") -async def getsetting(need_device_list: bool = False, Verifcation=Depends(verification)): - config = xiaomusic.getconfig() - data = asdict(config) - data["password"] = "******" - data["httpauth_password"] = "******" - if need_device_list: - device_list = await xiaomusic.getalldevices() - log.info(f"getsetting device_list: {device_list}") - data["device_list"] = device_list - return data - - -@app.post("/savesetting") -async def savesetting(request: Request, Verifcation=Depends(verification)): - try: - data_json = await request.body() - data = json.loads(data_json.decode("utf-8")) - debug_data = deepcopy_data_no_sensitive_info(data) - log.info(f"saveconfig: {debug_data}") - config = xiaomusic.getconfig() - if data["password"] == "******" or data["password"] == "": - data["password"] = config.password - if data["httpauth_password"] == "******" or data["httpauth_password"] == "": - data["httpauth_password"] = config.httpauth_password - await xiaomusic.saveconfig(data) - reset_http_server() - return "save success" - except json.JSONDecodeError as err: - raise HTTPException(status_code=400, detail="Invalid JSON") from err - - -@app.get("/musiclist") -async def musiclist(Verifcation=Depends(verification)): - return xiaomusic.get_music_list() - - -@app.get("/musicinfo") -async def musicinfo( - name: str, musictag: bool = False, Verifcation=Depends(verification) -): - url, _ = await xiaomusic.get_music_url(name) - info = { - "ret": "OK", - "name": name, - "url": url, - } - if musictag: - info["tags"] = xiaomusic.get_music_tags(name) - return info - - -@app.get("/musicinfos") -async def musicinfos( - name: list[str] = Query(None), - musictag: bool = False, - Verifcation=Depends(verification), -): - ret = [] - for music_name in name: - url, _ = await xiaomusic.get_music_url(music_name) - info = { - "name": music_name, - "url": url, - } - if musictag: - info["tags"] = xiaomusic.get_music_tags(music_name) - ret.append(info) - return ret - - -class MusicInfoObj(BaseModel): - musicname: str - title: str = "" - artist: str = "" - album: str = "" - year: str = "" - genre: str = "" - lyrics: str = "" - picture: str = "" # base64 - - -@app.post("/setmusictag") -async def setmusictag(info: MusicInfoObj, Verifcation=Depends(verification)): - ret = xiaomusic.set_music_tag(info.musicname, info) - return {"ret": ret} - - -@app.get("/curplaylist") -async def curplaylist(did: str = "", Verifcation=Depends(verification)): - if not xiaomusic.did_exist(did): - return "" - return xiaomusic.get_cur_play_list(did) - - -class MusicItem(BaseModel): - name: str - - -@app.post("/delmusic") -async def delmusic(data: MusicItem, Verifcation=Depends(verification)): - log.info(data) - await xiaomusic.del_music(data.name) - return "success" - - -class UrlInfo(BaseModel): - url: str - - -class DidPlayMusic(BaseModel): - did: str - musicname: str = "" - searchkey: str = "" - - -@app.post("/playmusic") -async def playmusic(data: DidPlayMusic, Verifcation=Depends(verification)): - did = data.did - musicname = data.musicname - searchkey = data.searchkey - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - log.info(f"playmusic {did} musicname:{musicname} searchkey:{searchkey}") - await xiaomusic.do_play(did, musicname, searchkey) - return {"ret": "OK"} - - -class DidPlayMusicList(BaseModel): - did: str - listname: str = "" - musicname: str = "" - - -@app.post("/playmusiclist") -async def playmusiclist(data: DidPlayMusicList, Verifcation=Depends(verification)): - did = data.did - listname = data.listname - musicname = data.musicname - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - log.info(f"playmusiclist {did} listname:{listname} musicname:{musicname}") - await xiaomusic.do_play_music_list(did, listname, musicname) - return {"ret": "OK"} - - -@app.post("/downloadjson") -async def downloadjson(data: UrlInfo, Verifcation=Depends(verification)): - log.info(data) - url = data.url - content = "" - try: - ret = "OK" - content = await downloadfile(url) - except Exception as e: - log.exception(f"Execption {e}") - ret = "Download JSON file failed." - return { - "ret": ret, - "content": content, - } - - -@app.get("/downloadlog") -def downloadlog(Verifcation=Depends(verification)): - file_path = xiaomusic.config.log_file - if os.path.exists(file_path): - # 创建一个临时文件来保存日志的快照 - temp_file = tempfile.NamedTemporaryFile(delete=False) - try: - with open(file_path, "rb") as f: - shutil.copyfileobj(f, temp_file) - temp_file.close() - - # 使用BackgroundTask在响应发送完毕后删除临时文件 - def cleanup_temp_file(tmp_file_path): - os.remove(tmp_file_path) - - background_task = BackgroundTask(cleanup_temp_file, temp_file.name) - return FileResponse( - temp_file.name, - media_type="text/plain", - filename="xiaomusic.txt", - background=background_task, - ) - except Exception as e: - os.remove(temp_file.name) - raise HTTPException( - status_code=500, detail="Error capturing log file" - ) from e - else: - return {"message": "File not found."} - - -@app.get("/playurl") -async def playurl(did: str, url: str, Verifcation=Depends(verification)): - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - decoded_url = urllib.parse.unquote(url) - log.info(f"playurl did: {did} url: {decoded_url}") - return await xiaomusic.play_url(did=did, arg1=decoded_url) - - -@app.get("/playtts") -async def playtts(did: str, text: str, Verifcation=Depends(verification)): - if not xiaomusic.did_exist(did): - return {"ret": "Did not exist"} - - log.info(f"tts {did} {text}") - await xiaomusic.do_tts(did=did, value=text) - return {"ret": "OK"} - - -@app.post("/refreshmusictag") -async def refreshmusictag(Verifcation=Depends(verification)): - xiaomusic.refresh_music_tag() - return { - "ret": "OK", - } - - -@app.post("/debug_play_by_music_url") -async def debug_play_by_music_url(request: Request, Verifcation=Depends(verification)): - try: - data = await request.body() - data_dict = json.loads(data.decode("utf-8")) - log.info(f"data:{data_dict}") - return await xiaomusic.debug_play_by_music_url(arg1=data_dict) - except json.JSONDecodeError as err: - raise HTTPException(status_code=400, detail="Invalid JSON") from err - - -@app.get("/latestversion") -async def latest_version(Verifcation=Depends(verification)): - version = await get_latest_version("xiaomusic") - if version: - return {"ret": "OK", "version": version} - else: - return {"ret": "Fetch version failed"} - - -class DownloadPlayList(BaseModel): - dirname: str - url: str - - -# 下载歌单 -@app.post("/downloadplaylist") -async def downloadplaylist(data: DownloadPlayList, Verifcation=Depends(verification)): - try: - bili_fav_list = await check_bili_fav_list(data.url) - download_proc_list = [] - if bili_fav_list: - for bvid, title in bili_fav_list.items(): - bvurl = f"https://www.bilibili.com/video/{bvid}" - download_proc_list[title] = await download_one_music( - config, bvurl, os.path.join(data.dirname, title) - ) - for title, download_proc_sigle in download_proc_list.items(): - exit_code = await download_proc_sigle.wait() - log.info(f"Download completed {title} with exit code {exit_code}") - dir_path = safe_join_path(config.download_path, data.dirname) - log.debug(f"Download dir_path: {dir_path}") - # 可能只是部分失败,都需要整理下载目录 - remove_common_prefix(dir_path) - chmoddir(dir_path) - return {"ret": "OK"} - else: - download_proc = await download_playlist(config, data.url, data.dirname) - - async def check_download_proc(): - # 等待子进程完成 - exit_code = await download_proc.wait() - log.info(f"Download completed with exit code {exit_code}") - - dir_path = safe_join_path(config.download_path, data.dirname) - log.debug(f"Download dir_path: {dir_path}") - # 可能只是部分失败,都需要整理下载目录 - remove_common_prefix(dir_path) - chmoddir(dir_path) - - asyncio.create_task(check_download_proc()) - return {"ret": "OK"} - except Exception as e: - log.exception(f"Execption {e}") - - return {"ret": "Failed download"} - - -class DownloadOneMusic(BaseModel): - name: str = "" - url: str - - -# 下载单首歌曲 -@app.post("/downloadonemusic") -async def downloadonemusic(data: DownloadOneMusic, Verifcation=Depends(verification)): - try: - download_proc = await download_one_music(config, data.url, data.name) - - async def check_download_proc(): - # 等待子进程完成 - exit_code = await download_proc.wait() - log.info(f"Download completed with exit code {exit_code}") - chmoddir(config.download_path) - - asyncio.create_task(check_download_proc()) - return {"ret": "OK"} - except Exception as e: - log.exception(f"Execption {e}") - - return {"ret": "Failed download"} - - -# 上传 yt-dlp cookies -@app.post("/uploadytdlpcookie") -async def upload_yt_dlp_cookie(file: UploadFile = File(...)): - with open(config.yt_dlp_cookies_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - return { - "ret": "OK", - "filename": file.filename, - "file_location": config.yt_dlp_cookies_path, - } - - -@app.post("/uploadmusic") -async def upload_music(playlist: str = Form(...), file: UploadFile = File(...)): - """上传音乐文件到当前播放列表对应的目录。""" - try: - # 选择目标目录:优先尝试由播放列表中已有歌曲推断目录 - dest_dir = xiaomusic.music_path - # 特殊歌单映射 - if playlist == "下载": - dest_dir = xiaomusic.download_path - elif playlist == "其他": - dest_dir = xiaomusic.music_path - else: - # 如果播放列表中存在歌曲,从其中任意一首推断目录 - musics = xiaomusic.music_list.get(playlist, []) - if musics and len(musics) > 0: - first = musics[0] - filepath = xiaomusic.all_music.get(first, "") - if filepath: - dest_dir = os.path.dirname(filepath) - - # 确保目录存在 - if not os.path.exists(dest_dir): - os.makedirs(dest_dir, exist_ok=True) - - # 保存文件,避免路径穿越 - filename = os.path.basename(file.filename) - if filename == "": - raise HTTPException(status_code=400, detail="Invalid filename") - - dest_path = os.path.join(dest_dir, filename) - # 避免覆盖已有文件,简单地添加序号后缀 - base, ext = os.path.splitext(filename) - counter = 1 - while os.path.exists(dest_path): - filename = f"{base}_{counter}{ext}" - dest_path = os.path.join(dest_dir, filename) - counter += 1 - - with open(dest_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - - # 修复权限并刷新列表索引 - try: - chmoddir(dest_dir) - except Exception: - pass - - # 重新生成音乐列表索引 - try: - xiaomusic._gen_all_music_list() - except Exception: - pass - - return {"ret": "OK", "filename": filename} - except HTTPException: - raise - except Exception as e: - log.exception(f"upload music failed: {e}") - raise HTTPException(status_code=500, detail="Upload failed") from e - - -class PlayListObj(BaseModel): - name: str = "" # 歌单名 - - -# 新增歌单 -@app.post("/playlistadd") -async def playlistadd(data: PlayListObj, Verifcation=Depends(verification)): - ret = xiaomusic.play_list_add(data.name) - if ret: - return {"ret": "OK"} - return {"ret": "Add failed, may be already exist."} - - -# 移除歌单 -@app.post("/playlistdel") -async def playlistdel(data: PlayListObj, Verifcation=Depends(verification)): - ret = xiaomusic.play_list_del(data.name) - if ret: - return {"ret": "OK"} - return {"ret": "Del failed, may be not exist."} - - -class PlayListUpdateObj(BaseModel): - oldname: str # 旧歌单名字 - newname: str # 新歌单名字 - - -# 修改歌单名字 -@app.post("/playlistupdatename") -async def playlistupdatename( - data: PlayListUpdateObj, Verifcation=Depends(verification) -): - ret = xiaomusic.play_list_update_name(data.oldname, data.newname) - if ret: - return {"ret": "OK"} - return {"ret": "Update failed, may be not exist."} - - -# 获取所有自定义歌单 -@app.get("/playlistnames") -async def getplaylistnames(Verifcation=Depends(verification)): - names = xiaomusic.get_play_list_names() - log.info(f"names {names}") - return { - "ret": "OK", - "names": names, - } - - -class PlayListMusicObj(BaseModel): - name: str = "" # 歌单名 - music_list: list[str] # 歌曲名列表 - - -# 歌单新增歌曲 -@app.post("/playlistaddmusic") -async def playlistaddmusic(data: PlayListMusicObj, Verifcation=Depends(verification)): - ret = xiaomusic.play_list_add_music(data.name, data.music_list) - if ret: - return {"ret": "OK"} - return {"ret": "Add failed, may be playlist not exist."} - - -# 歌单移除歌曲 -@app.post("/playlistdelmusic") -async def playlistdelmusic(data: PlayListMusicObj, Verifcation=Depends(verification)): - ret = xiaomusic.play_list_del_music(data.name, data.music_list) - if ret: - return {"ret": "OK"} - return {"ret": "Del failed, may be playlist not exist."} - - -# 歌单更新歌曲 -@app.post("/playlistupdatemusic") -async def playlistupdatemusic( - data: PlayListMusicObj, Verifcation=Depends(verification) -): - ret = xiaomusic.play_list_update_music(data.name, data.music_list) - if ret: - return {"ret": "OK"} - return {"ret": "Del failed, may be playlist not exist."} - - -# 获取歌单中所有歌曲 -@app.get("/playlistmusics") -async def getplaylist(name: str, Verifcation=Depends(verification)): - ret, musics = xiaomusic.play_list_musics(name) - return { - "ret": "OK", - "musics": musics, - } - - -# 更新版本 -@app.post("/updateversion") -async def updateversion( - version: str = "", lite: bool = True, Verifcation=Depends(verification) -): - ret = await update_version(version, lite) - if ret != "OK": - return {"ret": ret} - - asyncio.create_task(restart_xiaomusic()) - return {"ret": "OK"} - - -async def file_iterator(file_path, start, end): - async with aiofiles.open(file_path, mode="rb") as file: - await file.seek(start) - chunk_size = 1024 - while start <= end: - read_size = min(chunk_size, end - start + 1) - data = await file.read(read_size) - if not data: - break - start += len(data) - yield data - - -def access_key_verification(file_path, key, code): - if config.disable_httpauth: - return True - - log.debug(f"访问限制接收端[{file_path}, {key}, {code}]") - if key is not None: - current_key_bytes = key.encode("utf8") - correct_key_bytes = ( - config.httpauth_username + config.httpauth_password - ).encode("utf8") - is_correct_key = secrets.compare_digest(correct_key_bytes, current_key_bytes) - if is_correct_key: - return True - - if code is not None: - current_code_bytes = code.encode("utf8") - correct_code_bytes = ( - hashlib.sha256( - ( - file_path + config.httpauth_username + config.httpauth_password - ).encode("utf-8") - ) - .hexdigest() - .encode("utf-8") - ) - is_correct_code = secrets.compare_digest(correct_code_bytes, current_code_bytes) - if is_correct_code: - return True - - return False - - -def safe_redirect(url): - url = try_add_access_control_param(config, url) - url = url.replace("\\", "") - if not urllib.parse.urlparse(url).netloc and not urllib.parse.urlparse(url).scheme: - log.debug(f"redirect to {url}") - return RedirectResponse(url=url) - return None - - -@app.get("/music/{file_path:path}") -async def music_file(request: Request, file_path: str, key: str = "", code: str = ""): - if not access_key_verification(f"/music/{file_path}", key, code): - raise HTTPException(status_code=404, detail="File not found") - - absolute_path = os.path.abspath(config.music_path) - absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path)) - if not absolute_file_path.startswith(absolute_path): - raise HTTPException(status_code=404, detail="File not found") - if not os.path.exists(absolute_file_path): - raise HTTPException(status_code=404, detail="File not found") - - # 移除MP3 ID3 v2标签和填充 - if config.remove_id3tag and is_mp3(file_path): - log.info(f"remove_id3tag:{config.remove_id3tag}, is_mp3:True ") - temp_mp3_file = remove_id3_tags(absolute_file_path, config) - if temp_mp3_file: - log.info(f"ID3 tag removed {absolute_file_path} to {temp_mp3_file}") - redirect = safe_redirect(f"/music/{temp_mp3_file}") - if redirect: - return redirect - else: - log.info(f"No ID3 tag remove needed: {absolute_file_path}") - - if config.convert_to_mp3 and not is_mp3(file_path): - temp_mp3_file = convert_file_to_mp3(absolute_file_path, config) - if temp_mp3_file: - log.info(f"Converted file: {absolute_file_path} to {temp_mp3_file}") - redirect = safe_redirect(f"/music/{temp_mp3_file}") - if redirect: - return redirect - else: - log.warning(f"Failed to convert file to MP3 format: {absolute_file_path}") - - return FileResponse(absolute_file_path) - - -@app.options("/music/{file_path:path}") -async def music_options(): - headers = { - "Accept-Ranges": "bytes", - } - return Response(headers=headers) - - -@app.get("/picture/{file_path:path}") -async def get_picture(request: Request, file_path: str, key: str = "", code: str = ""): - if not access_key_verification(f"/picture/{file_path}", key, code): - raise HTTPException(status_code=404, detail="File not found") - - absolute_path = os.path.abspath(config.picture_cache_path) - absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path)) - if not absolute_file_path.startswith(absolute_path): - raise HTTPException(status_code=404, detail="File not found") - if not os.path.exists(absolute_file_path): - raise HTTPException(status_code=404, detail="File not found") - - return FileResponse(absolute_file_path) - - -@app.get("/docs", include_in_schema=False) -async def get_swagger_documentation(Verifcation=Depends(verification)): - return get_swagger_ui_html(openapi_url="/openapi.json", title="docs") - - -@app.get("/redoc", include_in_schema=False) -async def get_redoc_documentation(Verifcation=Depends(verification)): - return get_redoc_html(openapi_url="/openapi.json", title="docs") - - -@app.get("/openapi.json", include_in_schema=False) -async def openapi(Verifcation=Depends(verification)): - return get_openapi(title=app.title, version=app.version, routes=app.routes) - - -@app.get("/proxy", summary="基于正常下载逻辑的代理接口") -async def proxy(urlb64: str): - try: - # 将Base64编码的URL解码为字符串 - url_bytes = base64.b64decode(urlb64) - url = url_bytes.decode("utf-8") - print(f"解码后的代理请求: {url}") - except Exception as e: - raise HTTPException(status_code=400, detail=f"Base64解码失败: {str(e)}") from e - - log.info(f"代理请求: {url}") - - parsed_url = urlparse(url) - if not parsed_url.scheme or not parsed_url.netloc: - # Fixed: Use a new exception instance since 'e' from previous block is out of scope - invalid_url_exc = ValueError("URL缺少协议或域名") - raise HTTPException( - status_code=400, detail="无效的URL格式" - ) from invalid_url_exc - - # 创建会话并确保关闭 - session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=600), - connector=aiohttp.TCPConnector(ssl=True), - ) - - # 复用经过验证的请求头配置 - def get_wget_headers(parsed_url): - return { - "User-Agent": "Wget/1.21.3", - "Accept": "*/*", - "Accept-Encoding": "identity", - "Connection": "Keep-Alive", - } - - async def close_session(): - if not session.closed: - await session.close() - - try: - # 复用download_file中的请求逻辑 - headers = get_wget_headers(parsed_url) - resp = await session.get(url, headers=headers, allow_redirects=True) - - if resp.status not in (200, 206): - await close_session() - status_exc = ValueError(f"服务器返回状态码: {resp.status}") - raise HTTPException( - status_code=resp.status, detail=f"下载失败,状态码: {resp.status}" - ) from status_exc - - # 流式生成器,与download_file的分块逻辑一致 - async def stream_generator(): - try: - async for data in resp.content.iter_chunked(4096): - yield data - finally: - await close_session() - - # 提取文件名 - filename = parsed_url.path.split("/")[-1].split("?")[0] or "output.mp3" - - return StreamingResponse( - stream_generator(), - media_type=resp.headers.get("Content-Type", "audio/mpeg"), - headers={"Content-Disposition": f'inline; filename="{filename}"'}, - background=BackgroundTask(close_session), - ) - - except aiohttp.ClientConnectionError as e: - await close_session() - raise HTTPException(status_code=502, detail=f"连接错误: {str(e)}") from e - except asyncio.TimeoutError as e: - await close_session() - raise HTTPException(status_code=504, detail="下载超时") from e - except Exception as e: - await close_session() - raise HTTPException(status_code=500, detail=f"发生错误: {str(e)}") from e - - -# 配置 -JWT_SECRET = secrets.token_urlsafe(32) -JWT_ALGORITHM = "HS256" -JWT_EXPIRE_SECONDS = 60 * 5 # 5 分钟有效期(足够前端连接和重连) - - -@app.get("/generate_ws_token") -def generate_ws_token( - did: str, - _: bool = Depends(verification), # 复用 HTTP Basic 验证 -): - if not xiaomusic.did_exist(did): - raise HTTPException(status_code=400, detail="Invalid did") - - payload = { - "did": did, - "exp": time.time() + JWT_EXPIRE_SECONDS, - "iat": time.time(), - } - - token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) - - return { - "token": token, - "expire_in": JWT_EXPIRE_SECONDS, - } - - -@app.websocket("/ws/playingmusic") -async def ws_playingmusic(websocket: WebSocket): - token = websocket.query_params.get("token") - if not token: - await websocket.close(code=1008, reason="Missing token") - return - - try: - # 解码 JWT(自动校验签名 + 是否过期) - payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) - did = payload.get("did") - - if not did: - await websocket.close(code=1008, reason="Invalid token") - return - - if not xiaomusic.did_exist(did): - await websocket.close(code=1003, reason="Did not exist") - return - - await websocket.accept() - - # 开始推送状态 - while True: - is_playing = xiaomusic.isplaying(did) - cur_music = xiaomusic.playingmusic(did) - cur_playlist = xiaomusic.get_cur_play_list(did) - offset, duration = xiaomusic.get_offset_duration(did) - - await websocket.send_text( - json.dumps( - { - "ret": "OK", - "is_playing": is_playing, - "cur_music": cur_music, - "cur_playlist": cur_playlist, - "offset": offset, - "duration": duration, - } - ) - ) - await asyncio.sleep(1) - - except jwt.ExpiredSignatureError: - await websocket.close(code=1008, reason="Token expired") - except jwt.InvalidTokenError: - await websocket.close(code=1008, reason="Invalid token") - except WebSocketDisconnect: - print(f"WebSocket disconnected: {did}") - except Exception as e: - print(f"Error: {e}") - await websocket.close()