mirror of
https://github.com/hanxi/xiaomusic.git
synced 2026-03-19 08:49:45 +08:00
Potential fix for code scanning alert no. 66: Full server-side request forgery (#630)
* Potential fix for code scanning alert no. 66: Full server-side request forgery Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * Auto-format code 🧹🌟🤖 --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: Formatter [BOT] <runner@runnervmh13bl.i3bvc2db5o2uln5sftk5ylwv3b.ex.internal.cloudapp.net>
This commit is contained in:
@@ -2,12 +2,14 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import copy
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import time
|
||||
import urllib.parse
|
||||
from collections import OrderedDict
|
||||
@@ -1305,11 +1307,57 @@ class XiaoMusic:
|
||||
|
||||
import aiohttp
|
||||
|
||||
# 内部辅助函数:检查主机解析到的IP是否安全,防止访问内网/本地地址
|
||||
def _is_safe_hostname(parsed) -> bool:
|
||||
hostname = parsed.hostname
|
||||
if not hostname:
|
||||
return False
|
||||
try:
|
||||
# 解析主机名对应的所有地址
|
||||
addrinfo_list = socket.getaddrinfo(hostname, None)
|
||||
except Exception:
|
||||
return False
|
||||
for family, _, _, _, sockaddr in addrinfo_list:
|
||||
ip_str = (
|
||||
sockaddr[0] if family in (socket.AF_INET, socket.AF_INET6) else None
|
||||
)
|
||||
if not ip_str:
|
||||
continue
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(ip_str)
|
||||
except ValueError:
|
||||
return False
|
||||
# 拒绝内网、回环、链路本地、多播和保留地址
|
||||
if (
|
||||
ip_obj.is_private
|
||||
or ip_obj.is_loopback
|
||||
or ip_obj.is_link_local
|
||||
or ip_obj.is_multicast
|
||||
or ip_obj.is_reserved
|
||||
):
|
||||
return False
|
||||
return True
|
||||
|
||||
try:
|
||||
# 验证URL格式
|
||||
parsed_url = urlparse(url)
|
||||
if not parsed_url.scheme or not parsed_url.netloc:
|
||||
return {"success": False, "url": url, "error": "Invalid URL format"}
|
||||
# 仅允许 http/https
|
||||
if parsed_url.scheme not in ("http", "https"):
|
||||
return {
|
||||
"success": False,
|
||||
"url": url,
|
||||
"error": "Unsupported URL scheme",
|
||||
}
|
||||
# 检查主机是否安全,防止SSRF到内网
|
||||
if not _is_safe_hostname(parsed_url):
|
||||
return {
|
||||
"success": False,
|
||||
"url": url,
|
||||
"error": "Unsafe target host",
|
||||
}
|
||||
|
||||
# 创建aiohttp客户端会话
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# 发送HEAD请求跟随重定向
|
||||
|
||||
Reference in New Issue
Block a user