1
0
mirror of https://github.com/hanxi/xiaomusic.git synced 2026-04-14 20:53:41 +08:00

refactor: 重构拆分 httpserver 文件

This commit is contained in:
涵曦
2026-01-06 09:24:44 +08:00
parent ddf2aef7b7
commit b38154738d
14 changed files with 1461 additions and 1282 deletions

View File

@@ -0,0 +1,5 @@
"""API 模块统一入口"""
from xiaomusic.api.app import HttpInit, app
__all__ = ["app", "HttpInit"]

79
xiaomusic/api/app.py Normal file
View File

@@ -0,0 +1,79 @@
"""FastAPI 应用实例和中间件配置"""
import asyncio
import os
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
from xiaomusic import __version__
from xiaomusic.api.dependencies import (
AuthStaticFiles,
reset_http_server,
)
if TYPE_CHECKING:
from xiaomusic.xiaomusic import XiaoMusic
# 导入全局变量引用(将在 HttpInit 中初始化)
import xiaomusic.api.dependencies as deps
@asynccontextmanager
async def app_lifespan(app):
"""应用生命周期管理"""
if deps.xiaomusic is not None:
asyncio.create_task(deps.xiaomusic.run_forever())
try:
yield
except Exception as e:
deps.log.exception(f"Execption {e}")
# 创建 FastAPI 应用实例
app = FastAPI(
lifespan=app_lifespan,
version=__version__,
docs_url=None,
redoc_url=None,
openapi_url=None,
)
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许访问的源
allow_credentials=False, # 支持 cookie
allow_methods=["*"], # 允许使用的请求方法
allow_headers=["*"], # 允许携带的 Headers
)
# 添加 GZip 中间件
app.add_middleware(GZipMiddleware, minimum_size=500)
def HttpInit(_xiaomusic: "XiaoMusic"):
"""初始化 HTTP 服务器
Args:
_xiaomusic: XiaoMusic 实例
"""
# 设置全局变量
deps.xiaomusic = _xiaomusic
deps.config = _xiaomusic.config
deps.log = _xiaomusic.log
# 挂载静态文件
folder = os.path.dirname(os.path.dirname(__file__)) # xiaomusic 目录
app.mount("/static", AuthStaticFiles(directory=f"{folder}/static"), name="static")
# 注册所有路由
from xiaomusic.api.routers import register_routers
register_routers(app)
# 重置 HTTP 服务器配置
reset_http_server(app)

View File

@@ -0,0 +1,102 @@
"""依赖注入和认证相关功能"""
import hashlib
import secrets
from typing import TYPE_CHECKING, Annotated
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.staticfiles import StaticFiles
if TYPE_CHECKING:
from xiaomusic.xiaomusic import XiaoMusic
# 全局变量
xiaomusic: "XiaoMusic" = None
config = None
log = None
security = HTTPBasic()
def verification(
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
):
"""HTTP Basic 认证"""
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = config.httpauth_username.encode("utf8")
is_correct_username = secrets.compare_digest(
current_username_bytes, correct_username_bytes
)
current_password_bytes = credentials.password.encode("utf8")
correct_password_bytes = config.httpauth_password.encode("utf8")
is_correct_password = secrets.compare_digest(
current_password_bytes, correct_password_bytes
)
if not (is_correct_username and is_correct_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Basic"},
)
return True
def no_verification():
"""无认证模式"""
return True
def access_key_verification(file_path: str, key: str, code: str) -> bool:
"""访问密钥验证"""
if config.disable_httpauth:
return True
log.debug(f"访问限制接收端[{file_path}, {key}, {code}]")
if key is not None:
current_key_bytes = key.encode("utf8")
correct_key_bytes = (
config.httpauth_username + config.httpauth_password
).encode("utf8")
is_correct_key = secrets.compare_digest(correct_key_bytes, current_key_bytes)
if is_correct_key:
return True
if code is not None:
current_code_bytes = code.encode("utf8")
correct_code_bytes = (
hashlib.sha256(
(
file_path + config.httpauth_username + config.httpauth_password
).encode("utf-8")
)
.hexdigest()
.encode("utf-8")
)
is_correct_code = secrets.compare_digest(correct_code_bytes, current_code_bytes)
if is_correct_code:
return True
return False
class AuthStaticFiles(StaticFiles):
"""需要认证的静态文件服务"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
async def __call__(self, scope, receive, send) -> None:
request = Request(scope, receive)
if not config.disable_httpauth:
assert verification(await security(request))
await super().__call__(scope, receive, send)
def reset_http_server(app):
"""重置 HTTP 服务器配置"""
log.info(f"disable_httpauth:{config.disable_httpauth}")
if config.disable_httpauth:
app.dependency_overrides[verification] = no_verification
else:
app.dependency_overrides = {}

68
xiaomusic/api/models.py Normal file
View File

@@ -0,0 +1,68 @@
"""Pydantic 数据模型定义"""
from pydantic import BaseModel
class DidVolume(BaseModel):
did: str
volume: int = 0
class DidCmd(BaseModel):
did: str
cmd: str
class MusicInfoObj(BaseModel):
musicname: str
title: str = ""
artist: str = ""
album: str = ""
year: str = ""
genre: str = ""
lyrics: str = ""
picture: str = "" # base64
class MusicItem(BaseModel):
name: str
class UrlInfo(BaseModel):
url: str
class DidPlayMusic(BaseModel):
did: str
musicname: str = ""
searchkey: str = ""
class DidPlayMusicList(BaseModel):
did: str
listname: str = ""
musicname: str = ""
class DownloadPlayList(BaseModel):
dirname: str
url: str
class DownloadOneMusic(BaseModel):
name: str = ""
url: str
class PlayListObj(BaseModel):
name: str = "" # 歌单名
class PlayListUpdateObj(BaseModel):
oldname: str # 旧歌单名字
newname: str # 新歌单名字
class PlayListMusicObj(BaseModel):
name: str = "" # 歌单名
music_list: list[str] # 歌曲名列表

View File

@@ -0,0 +1,27 @@
"""路由注册"""
from xiaomusic.api import websocket
from xiaomusic.api.routers import (
device,
file,
music,
playlist,
plugin,
system,
)
def register_routers(app):
"""注册所有路由到应用
Args:
app: FastAPI 应用实例
"""
# 注册各个路由模块
app.include_router(system.router, tags=["系统管理"])
app.include_router(device.router, tags=["设备控制"])
app.include_router(music.router, tags=["音乐管理"])
app.include_router(playlist.router, tags=["播放列表"])
app.include_router(plugin.router, tags=["插件管理"])
app.include_router(file.router, tags=["文件操作"])
app.include_router(websocket.router, tags=["WebSocket"])

View File

@@ -0,0 +1,84 @@
"""设备控制路由"""
import asyncio
import urllib.parse
from fastapi import APIRouter, Depends
from xiaomusic.api.dependencies import log, verification, xiaomusic
from xiaomusic.api.models import DidCmd, DidVolume
router = APIRouter()
@router.get("/getvolume")
async def getvolume(did: str = "", Verifcation=Depends(verification)):
"""获取音量"""
if not xiaomusic.did_exist(did):
return {"volume": 0}
volume = await xiaomusic.get_volume(did=did)
return {"volume": volume}
@router.post("/setvolume")
async def setvolume(data: DidVolume, Verifcation=Depends(verification)):
"""设置音量"""
did = data.did
volume = data.volume
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
log.info(f"set_volume {did} {volume}")
await xiaomusic.set_volume(did=did, arg1=volume)
return {"ret": "OK", "volume": volume}
@router.post("/cmd")
async def do_cmd(data: DidCmd, Verifcation=Depends(verification)):
"""执行命令"""
did = data.did
cmd = data.cmd
log.info(f"docmd. did:{did} cmd:{cmd}")
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
if len(cmd) > 0:
try:
await xiaomusic.cancel_all_tasks()
task = asyncio.create_task(xiaomusic.do_check_cmd(did=did, query=cmd))
xiaomusic.append_running_task(task)
except Exception as e:
log.warning(f"Execption {e}")
return {"ret": "OK"}
return {"ret": "Unknow cmd"}
@router.get("/cmdstatus")
async def cmd_status(Verifcation=Depends(verification)):
"""命令状态"""
finish = await xiaomusic.is_task_finish()
if finish:
return {"ret": "OK", "status": "finish"}
return {"ret": "OK", "status": "running"}
@router.get("/playurl")
async def playurl(did: str, url: str, Verifcation=Depends(verification)):
"""播放 URL"""
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
decoded_url = urllib.parse.unquote(url)
log.info(f"playurl did: {did} url: {decoded_url}")
return await xiaomusic.play_url(did=did, arg1=decoded_url)
@router.get("/playtts")
async def playtts(did: str, text: str, Verifcation=Depends(verification)):
"""播放 TTS"""
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
log.info(f"tts {did} {text}")
await xiaomusic.do_tts(did=did, value=text)
return {"ret": "OK"}

View File

@@ -0,0 +1,349 @@
"""文件操作路由"""
import asyncio
import base64
import os
import shutil
from urllib.parse import urlparse
import aiohttp
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import (
FileResponse,
RedirectResponse,
Response,
StreamingResponse,
)
from starlette.background import BackgroundTask
from xiaomusic.api.dependencies import (
access_key_verification,
config,
log,
verification,
xiaomusic,
)
from xiaomusic.api.models import DownloadOneMusic, DownloadPlayList, UrlInfo
from xiaomusic.utils import (
check_bili_fav_list,
chmoddir,
convert_file_to_mp3,
download_one_music,
download_playlist,
downloadfile,
is_mp3,
remove_common_prefix,
remove_id3_tags,
safe_join_path,
try_add_access_control_param,
)
router = APIRouter()
@router.post("/downloadjson")
async def downloadjson(data: UrlInfo, Verifcation=Depends(verification)):
"""下载 JSON"""
log.info(data)
url = data.url
content = ""
try:
ret = "OK"
content = await downloadfile(url)
except Exception as e:
log.exception(f"Execption {e}")
ret = "Download JSON file failed."
return {
"ret": ret,
"content": content,
}
@router.post("/downloadplaylist")
async def downloadplaylist(data: DownloadPlayList, Verifcation=Depends(verification)):
"""下载歌单"""
try:
bili_fav_list = await check_bili_fav_list(data.url)
download_proc_list = []
if bili_fav_list:
for bvid, title in bili_fav_list.items():
bvurl = f"https://www.bilibili.com/video/{bvid}"
download_proc_list[title] = await download_one_music(
config, bvurl, os.path.join(data.dirname, title)
)
for title, download_proc_sigle in download_proc_list.items():
exit_code = await download_proc_sigle.wait()
log.info(f"Download completed {title} with exit code {exit_code}")
dir_path = safe_join_path(config.download_path, data.dirname)
log.debug(f"Download dir_path: {dir_path}")
# 可能只是部分失败,都需要整理下载目录
remove_common_prefix(dir_path)
chmoddir(dir_path)
return {"ret": "OK"}
else:
download_proc = await download_playlist(config, data.url, data.dirname)
async def check_download_proc():
# 等待子进程完成
exit_code = await download_proc.wait()
log.info(f"Download completed with exit code {exit_code}")
dir_path = safe_join_path(config.download_path, data.dirname)
log.debug(f"Download dir_path: {dir_path}")
# 可能只是部分失败,都需要整理下载目录
remove_common_prefix(dir_path)
chmoddir(dir_path)
asyncio.create_task(check_download_proc())
return {"ret": "OK"}
except Exception as e:
log.exception(f"Execption {e}")
return {"ret": "Failed download"}
@router.post("/downloadonemusic")
async def downloadonemusic(data: DownloadOneMusic, Verifcation=Depends(verification)):
"""下载单首歌曲"""
try:
download_proc = await download_one_music(config, data.url, data.name)
async def check_download_proc():
# 等待子进程完成
exit_code = await download_proc.wait()
log.info(f"Download completed with exit code {exit_code}")
chmoddir(config.download_path)
asyncio.create_task(check_download_proc())
return {"ret": "OK"}
except Exception as e:
log.exception(f"Execption {e}")
return {"ret": "Failed download"}
@router.post("/uploadytdlpcookie")
async def upload_yt_dlp_cookie(file: UploadFile = File(...)):
"""上传 yt-dlp cookies"""
with open(config.yt_dlp_cookies_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {
"ret": "OK",
"filename": file.filename,
"file_location": config.yt_dlp_cookies_path,
}
@router.post("/uploadmusic")
async def upload_music(playlist: str = Form(...), file: UploadFile = File(...)):
"""上传音乐文件到当前播放列表对应的目录"""
try:
# 选择目标目录:优先尝试由播放列表中已有歌曲推断目录
dest_dir = xiaomusic.music_path
# 特殊歌单映射
if playlist == "下载":
dest_dir = xiaomusic.download_path
elif playlist == "其他":
dest_dir = xiaomusic.music_path
else:
# 如果播放列表中存在歌曲,从其中任意一首推断目录
musics = xiaomusic.music_list.get(playlist, [])
if musics and len(musics) > 0:
first = musics[0]
filepath = xiaomusic.all_music.get(first, "")
if filepath:
dest_dir = os.path.dirname(filepath)
# 确保目录存在
if not os.path.exists(dest_dir):
os.makedirs(dest_dir, exist_ok=True)
# 保存文件,避免路径穿越
filename = os.path.basename(file.filename)
if filename == "":
raise HTTPException(status_code=400, detail="Invalid filename")
dest_path = os.path.join(dest_dir, filename)
# 避免覆盖已有文件,简单地添加序号后缀
base, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(dest_path):
filename = f"{base}_{counter}{ext}"
dest_path = os.path.join(dest_dir, filename)
counter += 1
with open(dest_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 修复权限并刷新列表索引
try:
chmoddir(dest_dir)
except Exception:
pass
# 重新生成音乐列表索引
try:
xiaomusic._gen_all_music_list()
except Exception:
pass
return {"ret": "OK", "filename": filename}
except HTTPException:
raise
except Exception as e:
log.exception(f"upload music failed: {e}")
raise HTTPException(status_code=500, detail="Upload failed") from e
def safe_redirect(url):
"""安全重定向"""
url = try_add_access_control_param(config, url)
url = url.replace("\\", "")
if not urlparse(url).netloc and not urlparse(url).scheme:
log.debug(f"redirect to {url}")
return RedirectResponse(url=url)
return None
@router.get("/music/{file_path:path}")
async def music_file(request: Request, file_path: str, key: str = "", code: str = ""):
"""音乐文件访问"""
if not access_key_verification(f"/music/{file_path}", key, code):
raise HTTPException(status_code=404, detail="File not found")
absolute_path = os.path.abspath(config.music_path)
absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path))
if not absolute_file_path.startswith(absolute_path):
raise HTTPException(status_code=404, detail="File not found")
if not os.path.exists(absolute_file_path):
raise HTTPException(status_code=404, detail="File not found")
# 移除MP3 ID3 v2标签和填充
if config.remove_id3tag and is_mp3(file_path):
log.info(f"remove_id3tag:{config.remove_id3tag}, is_mp3:True ")
temp_mp3_file = remove_id3_tags(absolute_file_path, config)
if temp_mp3_file:
log.info(f"ID3 tag removed {absolute_file_path} to {temp_mp3_file}")
redirect = safe_redirect(f"/music/{temp_mp3_file}")
if redirect:
return redirect
else:
log.info(f"No ID3 tag remove needed: {absolute_file_path}")
if config.convert_to_mp3 and not is_mp3(file_path):
temp_mp3_file = convert_file_to_mp3(absolute_file_path, config)
if temp_mp3_file:
log.info(f"Converted file: {absolute_file_path} to {temp_mp3_file}")
redirect = safe_redirect(f"/music/{temp_mp3_file}")
if redirect:
return redirect
else:
log.warning(f"Failed to convert file to MP3 format: {absolute_file_path}")
return FileResponse(absolute_file_path)
@router.options("/music/{file_path:path}")
async def music_options():
"""音乐文件 OPTIONS"""
headers = {
"Accept-Ranges": "bytes",
}
return Response(headers=headers)
@router.get("/picture/{file_path:path}")
async def get_picture(request: Request, file_path: str, key: str = "", code: str = ""):
"""图片文件访问"""
if not access_key_verification(f"/picture/{file_path}", key, code):
raise HTTPException(status_code=404, detail="File not found")
absolute_path = os.path.abspath(config.picture_cache_path)
absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path))
if not absolute_file_path.startswith(absolute_path):
raise HTTPException(status_code=404, detail="File not found")
if not os.path.exists(absolute_file_path):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(absolute_file_path)
@router.get("/proxy", summary="基于正常下载逻辑的代理接口")
async def proxy(urlb64: str):
"""代理接口"""
try:
# 将Base64编码的URL解码为字符串
url_bytes = base64.b64decode(urlb64)
url = url_bytes.decode("utf-8")
print(f"解码后的代理请求: {url}")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Base64解码失败: {str(e)}") from e
log.info(f"代理请求: {url}")
parsed_url = urlparse(url)
if not parsed_url.scheme or not parsed_url.netloc:
# Fixed: Use a new exception instance since 'e' from previous block is out of scope
invalid_url_exc = ValueError("URL缺少协议或域名")
raise HTTPException(
status_code=400, detail="无效的URL格式"
) from invalid_url_exc
# 创建会话并确保关闭
session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=600),
connector=aiohttp.TCPConnector(ssl=True),
)
# 复用经过验证的请求头配置
def get_wget_headers(parsed_url):
return {
"User-Agent": "Wget/1.21.3",
"Accept": "*/*",
"Accept-Encoding": "identity",
"Connection": "Keep-Alive",
}
async def close_session():
if not session.closed:
await session.close()
try:
# 复用download_file中的请求逻辑
headers = get_wget_headers(parsed_url)
resp = await session.get(url, headers=headers, allow_redirects=True)
if resp.status not in (200, 206):
await close_session()
status_exc = ValueError(f"服务器返回状态码: {resp.status}")
raise HTTPException(
status_code=resp.status, detail=f"下载失败,状态码: {resp.status}"
) from status_exc
# 流式生成器与download_file的分块逻辑一致
async def stream_generator():
try:
async for data in resp.content.iter_chunked(4096):
yield data
finally:
await close_session()
# 提取文件名
filename = parsed_url.path.split("/")[-1].split("?")[0] or "output.mp3"
return StreamingResponse(
stream_generator(),
media_type=resp.headers.get("Content-Type", "audio/mpeg"),
headers={"Content-Disposition": f'inline; filename="{filename}"'},
background=BackgroundTask(close_session),
)
except aiohttp.ClientConnectionError as e:
await close_session()
raise HTTPException(status_code=502, detail=f"连接错误: {str(e)}") from e
except asyncio.TimeoutError as e:
await close_session()
raise HTTPException(status_code=504, detail="下载超时") from e
except Exception as e:
await close_session()
raise HTTPException(status_code=500, detail=f"发生错误: {str(e)}") from e

View File

@@ -0,0 +1,215 @@
"""音乐管理路由"""
import json
import urllib.parse
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from xiaomusic.api.dependencies import log, verification, xiaomusic
from xiaomusic.api.models import (
DidPlayMusic,
MusicInfoObj,
MusicItem,
)
router = APIRouter()
@router.get("/searchmusic")
def searchmusic(name: str = "", Verifcation=Depends(verification)):
"""搜索音乐"""
return xiaomusic.searchmusic(name)
@router.get("/api/search/online")
async def search_online_music(
keyword: str = Query(..., description="搜索关键词"),
plugin: str = Query("all", description="指定插件名称all表示搜索所有插件"),
page: int = Query(1, description="页码"),
limit: int = Query(20, description="每页数量"),
Verifcation=Depends(verification),
):
"""在线音乐搜索API"""
try:
if not keyword:
return {"success": False, "error": "Keyword required"}
return await xiaomusic.get_music_list_online(
keyword=keyword, plugin=plugin, page=page, limit=limit
)
except Exception as e:
return {"success": False, "error": str(e)}
@router.get("/api/proxy/real-music-url")
async def get_real_music_url(
url: str = Query(..., description="音乐下载URL"), Verifcation=Depends(verification)
):
"""通过服务端代理获取真实的音乐播放URL避免CORS问题"""
try:
# 获取真实的音乐播放URL
return await xiaomusic.get_real_url_of_openapi(url)
except Exception as e:
log.error(f"获取真实音乐URL失败: {e}")
# 如果代理获取失败仍然返回原始URL
return {"success": False, "realUrl": url, "error": str(e)}
@router.post("/api/play/getMediaSource")
async def get_media_source(request: Request, Verifcation=Depends(verification)):
"""获取音乐真实播放URL"""
try:
# 获取请求数据
data = await request.json()
# 调用公共函数处理
return await xiaomusic.get_media_source_url(data)
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/play/getLyric")
async def get_media_lyric(request: Request, Verifcation=Depends(verification)):
"""获取音乐歌词"""
try:
# 获取请求数据
data = await request.json()
# 调用公共函数处理
return await xiaomusic.get_media_lyric(data)
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/play/online")
async def play_online_music(request: Request, Verifcation=Depends(verification)):
"""设备端在线播放插件音乐"""
try:
# 获取请求数据
data = await request.json()
did = data.get("did")
openapi_info = xiaomusic.js_plugin_manager.get_openapi_info()
if openapi_info.get("enabled", False):
media_source = await xiaomusic.get_real_url_of_openapi(data.get("url"))
else:
# 调用公共函数处理,获取音乐真实播放URL
media_source = await xiaomusic.get_media_source_url(data)
if not media_source or not media_source.get("url"):
return {"success": False, "error": "Failed to get media source URL"}
url = media_source.get("url")
decoded_url = urllib.parse.unquote(url)
return await xiaomusic.play_url(did=did, arg1=decoded_url)
except Exception as e:
return {"success": False, "error": str(e)}
@router.get("/playingmusic")
def playingmusic(did: str = "", Verifcation=Depends(verification)):
"""当前播放音乐"""
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
is_playing = xiaomusic.isplaying(did)
cur_music = xiaomusic.playingmusic(did)
cur_playlist = xiaomusic.get_cur_play_list(did)
# 播放进度
offset, duration = xiaomusic.get_offset_duration(did)
return {
"ret": "OK",
"is_playing": is_playing,
"cur_music": cur_music,
"cur_playlist": cur_playlist,
"offset": offset,
"duration": duration,
}
@router.get("/musiclist")
async def musiclist(Verifcation=Depends(verification)):
"""音乐列表"""
return xiaomusic.get_music_list()
@router.get("/musicinfo")
async def musicinfo(
name: str, musictag: bool = False, Verifcation=Depends(verification)
):
"""音乐信息"""
url, _ = await xiaomusic.get_music_url(name)
info = {
"ret": "OK",
"name": name,
"url": url,
}
if musictag:
info["tags"] = xiaomusic.get_music_tags(name)
return info
@router.get("/musicinfos")
async def musicinfos(
name: list[str] = Query(None),
musictag: bool = False,
Verifcation=Depends(verification),
):
"""批量音乐信息"""
ret = []
for music_name in name:
url, _ = await xiaomusic.get_music_url(music_name)
info = {
"name": music_name,
"url": url,
}
if musictag:
info["tags"] = xiaomusic.get_music_tags(music_name)
ret.append(info)
return ret
@router.post("/setmusictag")
async def setmusictag(info: MusicInfoObj, Verifcation=Depends(verification)):
"""设置音乐标签"""
ret = xiaomusic.set_music_tag(info.musicname, info)
return {"ret": ret}
@router.post("/delmusic")
async def delmusic(data: MusicItem, Verifcation=Depends(verification)):
"""删除音乐"""
log.info(data)
await xiaomusic.del_music(data.name)
return "success"
@router.post("/playmusic")
async def playmusic(data: DidPlayMusic, Verifcation=Depends(verification)):
"""播放音乐"""
did = data.did
musicname = data.musicname
searchkey = data.searchkey
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
log.info(f"playmusic {did} musicname:{musicname} searchkey:{searchkey}")
await xiaomusic.do_play(did, musicname, searchkey)
return {"ret": "OK"}
@router.post("/refreshmusictag")
async def refreshmusictag(Verifcation=Depends(verification)):
"""刷新音乐标签"""
xiaomusic.refresh_music_tag()
return {
"ret": "OK",
}
@router.post("/debug_play_by_music_url")
async def debug_play_by_music_url(request: Request, Verifcation=Depends(verification)):
"""调试播放音乐URL"""
try:
data = await request.body()
data_dict = json.loads(data.decode("utf-8"))
log.info(f"data:{data_dict}")
return await xiaomusic.debug_play_by_music_url(arg1=data_dict)
except json.JSONDecodeError as err:
raise HTTPException(status_code=400, detail="Invalid JSON") from err

View File

@@ -0,0 +1,114 @@
"""播放列表路由"""
from fastapi import APIRouter, Depends
from xiaomusic.api.dependencies import log, verification, xiaomusic
from xiaomusic.api.models import (
DidPlayMusicList,
PlayListMusicObj,
PlayListObj,
PlayListUpdateObj,
)
router = APIRouter()
@router.get("/curplaylist")
async def curplaylist(did: str = "", Verifcation=Depends(verification)):
"""当前播放列表"""
if not xiaomusic.did_exist(did):
return ""
return xiaomusic.get_cur_play_list(did)
@router.post("/playmusiclist")
async def playmusiclist(data: DidPlayMusicList, Verifcation=Depends(verification)):
"""播放音乐列表"""
did = data.did
listname = data.listname
musicname = data.musicname
if not xiaomusic.did_exist(did):
return {"ret": "Did not exist"}
log.info(f"playmusiclist {did} listname:{listname} musicname:{musicname}")
await xiaomusic.do_play_music_list(did, listname, musicname)
return {"ret": "OK"}
@router.post("/playlistadd")
async def playlistadd(data: PlayListObj, Verifcation=Depends(verification)):
"""新增歌单"""
ret = xiaomusic.play_list_add(data.name)
if ret:
return {"ret": "OK"}
return {"ret": "Add failed, may be already exist."}
@router.post("/playlistdel")
async def playlistdel(data: PlayListObj, Verifcation=Depends(verification)):
"""移除歌单"""
ret = xiaomusic.play_list_del(data.name)
if ret:
return {"ret": "OK"}
return {"ret": "Del failed, may be not exist."}
@router.post("/playlistupdatename")
async def playlistupdatename(
data: PlayListUpdateObj, Verifcation=Depends(verification)
):
"""修改歌单名字"""
ret = xiaomusic.play_list_update_name(data.oldname, data.newname)
if ret:
return {"ret": "OK"}
return {"ret": "Update failed, may be not exist."}
@router.get("/playlistnames")
async def getplaylistnames(Verifcation=Depends(verification)):
"""获取所有自定义歌单"""
names = xiaomusic.get_play_list_names()
log.info(f"names {names}")
return {
"ret": "OK",
"names": names,
}
@router.post("/playlistaddmusic")
async def playlistaddmusic(data: PlayListMusicObj, Verifcation=Depends(verification)):
"""歌单新增歌曲"""
ret = xiaomusic.play_list_add_music(data.name, data.music_list)
if ret:
return {"ret": "OK"}
return {"ret": "Add failed, may be playlist not exist."}
@router.post("/playlistdelmusic")
async def playlistdelmusic(data: PlayListMusicObj, Verifcation=Depends(verification)):
"""歌单移除歌曲"""
ret = xiaomusic.play_list_del_music(data.name, data.music_list)
if ret:
return {"ret": "OK"}
return {"ret": "Del failed, may be playlist not exist."}
@router.post("/playlistupdatemusic")
async def playlistupdatemusic(
data: PlayListMusicObj, Verifcation=Depends(verification)
):
"""歌单更新歌曲"""
ret = xiaomusic.play_list_update_music(data.name, data.music_list)
if ret:
return {"ret": "OK"}
return {"ret": "Del failed, may be playlist not exist."}
@router.get("/playlistmusics")
async def getplaylist(name: str, Verifcation=Depends(verification)):
"""获取歌单中所有歌曲"""
ret, musics = xiaomusic.play_list_musics(name)
return {
"ret": "OK",
"musics": musics,
}

View File

@@ -0,0 +1,163 @@
"""插件管理路由"""
import os
import aiofiles
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile
from xiaomusic.api.dependencies import verification, xiaomusic
router = APIRouter()
@router.get("/api/js-plugins")
def get_js_plugins(
enabled_only: bool = Query(False, description="是否只返回启用的插件"),
Verifcation=Depends(verification),
):
"""获取插件列表"""
try:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
if enabled_only:
plugins = xiaomusic.js_plugin_manager.get_enabled_plugins()
else:
plugins = xiaomusic.js_plugin_manager.get_plugin_list()
return {"success": True, "data": plugins}
except Exception as e:
return {"success": False, "error": str(e)}
@router.put("/api/js-plugins/{plugin_name}/enable")
def enable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""启用插件"""
try:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.enable_plugin(plugin_name)
return {"success": success}
except Exception as e:
return {"success": False, "error": str(e)}
@router.put("/api/js-plugins/{plugin_name}/disable")
def disable_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""禁用插件"""
try:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.disable_plugin(plugin_name)
return {"success": success}
except Exception as e:
return {"success": False, "error": str(e)}
@router.delete("/api/js-plugins/{plugin_name}/uninstall")
def uninstall_js_plugin(plugin_name: str, Verifcation=Depends(verification)):
"""卸载插件"""
try:
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
return {"success": False, "error": "JS Plugin Manager not available"}
success = xiaomusic.js_plugin_manager.uninstall_plugin(plugin_name)
return {"success": success}
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/js-plugins/upload")
async def upload_js_plugin(
file: UploadFile = File(...), verification_dep=Depends(verification)
):
"""上传 JS 插件"""
try:
# 验证文件扩展名
if not file.filename.endswith(".js"):
raise HTTPException(status_code=400, detail="只允许上传 .js 文件")
# 使用 JSPluginManager 中定义的插件目录
if (
not hasattr(xiaomusic, "js_plugin_manager")
or not xiaomusic.js_plugin_manager
):
raise HTTPException(
status_code=500, detail="JS Plugin Manager not available"
)
plugin_dir = xiaomusic.js_plugin_manager.plugins_dir
os.makedirs(plugin_dir, exist_ok=True)
file_path = os.path.join(plugin_dir, file.filename)
# 校验是否已存在同名js插件 存在则提示,停止上传
if os.path.exists(file_path):
raise HTTPException(
status_code=409, detail=f"插件 {file.filename} 已存在,请重命名后再上传"
)
file_path = os.path.join(plugin_dir, file.filename)
# 写入文件内容
async with aiofiles.open(file_path, "wb") as f:
content = await file.read()
await f.write(content)
# 更新插件配置文件
plugin_name = os.path.splitext(file.filename)[0]
xiaomusic.js_plugin_manager.update_plugin_config(plugin_name, file.filename)
# 重新加载插件
xiaomusic.js_plugin_manager.reload_plugins()
return {"success": True, "message": "插件上传成功"}
except Exception as e:
return {"success": False, "error": str(e)}
@router.get("/api/openapi/load")
def get_openapi_info(Verifcation=Depends(verification)):
"""获取开放接口配置信息"""
try:
openapi_info = xiaomusic.js_plugin_manager.get_openapi_info()
return {"success": True, "data": openapi_info}
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/openapi/toggle")
def toggle_openapi(Verifcation=Depends(verification)):
"""开放接口状态切换"""
try:
return xiaomusic.js_plugin_manager.toggle_openapi()
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/openapi/updateUrl")
async def update_openapi_url(request: Request, Verifcation=Depends(verification)):
"""更新开放接口地址"""
try:
request_json = await request.json()
search_url = request_json.get("search_url")
if not request_json or "search_url" not in request_json:
return {"success": False, "error": "Missing 'search_url' in request body"}
return xiaomusic.js_plugin_manager.update_openapi_url(search_url)
except Exception as e:
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,157 @@
"""系统管理路由"""
import json
import os
import shutil
import tempfile
from dataclasses import asdict
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from xiaomusic import __version__
from xiaomusic.api.dependencies import log, verification, xiaomusic
from xiaomusic.utils import (
deepcopy_data_no_sensitive_info,
get_latest_version,
restart_xiaomusic,
update_version,
)
router = APIRouter()
@router.get("/")
async def read_index(Verifcation=Depends(verification)):
"""首页"""
folder = os.path.dirname(
os.path.dirname(os.path.dirname(__file__))
) # xiaomusic 目录
return FileResponse(f"{folder}/static/index.html")
@router.get("/getversion")
def getversion(Verifcation=Depends(verification)):
"""获取版本"""
log.debug("getversion %s", __version__)
return {"version": __version__}
@router.get("/getsetting")
async def getsetting(need_device_list: bool = False, Verifcation=Depends(verification)):
"""获取设置"""
config_data = xiaomusic.getconfig()
data = asdict(config_data)
data["password"] = "******"
data["httpauth_password"] = "******"
if need_device_list:
device_list = await xiaomusic.getalldevices()
log.info(f"getsetting device_list: {device_list}")
data["device_list"] = device_list
return data
@router.post("/savesetting")
async def savesetting(request: Request, Verifcation=Depends(verification)):
"""保存设置"""
try:
data_json = await request.body()
data = json.loads(data_json.decode("utf-8"))
debug_data = deepcopy_data_no_sensitive_info(data)
log.info(f"saveconfig: {debug_data}")
config_obj = xiaomusic.getconfig()
if data["password"] == "******" or data["password"] == "":
data["password"] = config_obj.password
if data["httpauth_password"] == "******" or data["httpauth_password"] == "":
data["httpauth_password"] = config_obj.httpauth_password
await xiaomusic.saveconfig(data)
# 重置 HTTP 服务器配置
from xiaomusic.api.app import app
from xiaomusic.api.dependencies import reset_http_server
reset_http_server(app)
return "save success"
except json.JSONDecodeError as err:
raise HTTPException(status_code=400, detail="Invalid JSON") from err
@router.get("/downloadlog")
def downloadlog(Verifcation=Depends(verification)):
"""下载日志"""
file_path = xiaomusic.config.log_file
if os.path.exists(file_path):
# 创建一个临时文件来保存日志的快照
temp_file = tempfile.NamedTemporaryFile(delete=False)
try:
with open(file_path, "rb") as f:
shutil.copyfileobj(f, temp_file)
temp_file.close()
# 使用BackgroundTask在响应发送完毕后删除临时文件
def cleanup_temp_file(tmp_file_path):
os.remove(tmp_file_path)
background_task = BackgroundTask(cleanup_temp_file, temp_file.name)
return FileResponse(
temp_file.name,
media_type="text/plain",
filename="xiaomusic.txt",
background=background_task,
)
except Exception as e:
os.remove(temp_file.name)
raise HTTPException(
status_code=500, detail="Error capturing log file"
) from e
else:
return {"message": "File not found."}
@router.get("/latestversion")
async def latest_version(Verifcation=Depends(verification)):
"""获取最新版本"""
version = await get_latest_version("xiaomusic")
if version:
return {"ret": "OK", "version": version}
else:
return {"ret": "Fetch version failed"}
@router.post("/updateversion")
async def updateversion(
version: str = "", lite: bool = True, Verifcation=Depends(verification)
):
"""更新版本"""
import asyncio
ret = await update_version(version, lite)
if ret != "OK":
return {"ret": ret}
asyncio.create_task(restart_xiaomusic())
return {"ret": "OK"}
@router.get("/docs", include_in_schema=False)
async def get_swagger_documentation(Verifcation=Depends(verification)):
"""Swagger 文档"""
return get_swagger_ui_html(openapi_url="/openapi.json", title="docs")
@router.get("/redoc", include_in_schema=False)
async def get_redoc_documentation(Verifcation=Depends(verification)):
"""ReDoc 文档"""
return get_redoc_html(openapi_url="/openapi.json", title="docs")
@router.get("/openapi.json", include_in_schema=False)
async def openapi(Verifcation=Depends(verification)):
"""OpenAPI 规范"""
from xiaomusic.api.app import app
return get_openapi(title=app.title, version=app.version, routes=app.routes)

View File

@@ -0,0 +1,96 @@
"""WebSocket 相关功能"""
import asyncio
import json
import secrets
import time
import jwt
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from xiaomusic.api.dependencies import verification, xiaomusic
router = APIRouter()
# JWT 配置
JWT_SECRET = secrets.token_urlsafe(32)
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_SECONDS = 60 * 5 # 5 分钟有效期(足够前端连接和重连)
@router.get("/generate_ws_token")
def generate_ws_token(
did: str,
_: bool = verification, # 复用 HTTP Basic 验证
):
"""生成 WebSocket token"""
if not xiaomusic.did_exist(did):
raise HTTPException(status_code=400, detail="Invalid did")
payload = {
"did": did,
"exp": time.time() + JWT_EXPIRE_SECONDS,
"iat": time.time(),
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return {
"token": token,
"expire_in": JWT_EXPIRE_SECONDS,
}
@router.websocket("/ws/playingmusic")
async def ws_playingmusic(websocket: WebSocket):
"""WebSocket 播放状态推送"""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=1008, reason="Missing token")
return
try:
# 解码 JWT自动校验签名 + 是否过期)
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
did = payload.get("did")
if not did:
await websocket.close(code=1008, reason="Invalid token")
return
if not xiaomusic.did_exist(did):
await websocket.close(code=1003, reason="Did not exist")
return
await websocket.accept()
# 开始推送状态
while True:
is_playing = xiaomusic.isplaying(did)
cur_music = xiaomusic.playingmusic(did)
cur_playlist = xiaomusic.get_cur_play_list(did)
offset, duration = xiaomusic.get_offset_duration(did)
await websocket.send_text(
json.dumps(
{
"ret": "OK",
"is_playing": is_playing,
"cur_music": cur_music,
"cur_playlist": cur_playlist,
"offset": offset,
"duration": duration,
}
)
)
await asyncio.sleep(1)
except jwt.ExpiredSignatureError:
await websocket.close(code=1008, reason="Token expired")
except jwt.InvalidTokenError:
await websocket.close(code=1008, reason="Invalid token")
except WebSocketDisconnect:
print(f"WebSocket disconnected: {did}")
except Exception as e:
print(f"Error: {e}")
await websocket.close()

View File

@@ -38,9 +38,9 @@ def main():
import uvicorn
from xiaomusic import __version__
from xiaomusic.api import HttpInit
from xiaomusic.api import app as HttpApp
from xiaomusic.config import Config
from xiaomusic.httpserver import HttpInit
from xiaomusic.httpserver import app as HttpApp
from xiaomusic.xiaomusic import XiaoMusic
parser = argparse.ArgumentParser()

File diff suppressed because it is too large Load Diff