1
0
mirror of https://github.com/hanxi/xiaomusic.git synced 2026-05-02 23:39:51 +08:00

fix(security): arbitrary code execution via eval in plugin exec (#819)

The plugin manager executes plugin code strings using `eval(code, ...)` for both sync and async plugin functions. If an attacker can influence `code` (directly or indirectly via command/config/plugin inputs), this allows arbitrary Python execution in the server process.

Affected files: plugin.py

Signed-off-by: tuanaiseo <221258316+tuanaiseo@users.noreply.github.com>
This commit is contained in:
tuanaiseo
2026-04-07 08:33:21 +07:00
committed by GitHub
parent 178797fd91
commit 5f3f2e174a

View File

@@ -1,3 +1,4 @@
import ast
import importlib
import inspect
import pkgutil
@@ -46,28 +47,52 @@ class PluginManager:
"""返回包含所有插件函数的字典,可以用作 exec 要执行的代码的命名空间"""
return self._funcs.copy()
def _parse_plugin_call(self, code):
try:
expression = ast.parse(code, mode="eval")
except SyntaxError as exc:
raise ValueError("Invalid plugin code.") from exc
if not isinstance(expression.body, ast.Call):
raise ValueError("Plugin code must be a function call.")
call = expression.body
if not isinstance(call.func, ast.Name):
raise ValueError("Plugin code must call a plugin function directly.")
if call.keywords:
raise ValueError("Keyword arguments are not supported.")
return call.func.id, [self._parse_plugin_arg(arg) for arg in call.args]
def _parse_plugin_arg(self, arg):
if isinstance(arg, ast.Constant):
if isinstance(arg.value, (str, int, float, bool, type(None))):
return arg.value
elif isinstance(arg, ast.List):
return [self._parse_plugin_arg(item) for item in arg.elts]
elif isinstance(arg, ast.Tuple):
return tuple(self._parse_plugin_arg(item) for item in arg.elts)
elif isinstance(arg, ast.Dict):
if any(key is None for key in arg.keys):
raise ValueError("Unsupported plugin argument.")
keys = [self._parse_plugin_arg(key) for key in arg.keys]
values = [self._parse_plugin_arg(value) for value in arg.values]
return dict(zip(keys, values))
raise ValueError("Unsupported plugin argument.")
async def execute_plugin(self, code):
"""
执行指定的插件代码。插件函数可以是同步或异步。
:param code: 需要执行的插件函数代码(例如 'plugin1("hello")'
"""
# 分解代码字符串以获取函数名
func_name = code.split("(")[0]
func_name, args = self._parse_plugin_call(code)
# 根据解析出的函数名从插件字典中获取函数
plugin_func = self.get_func(func_name)
if not plugin_func:
raise ValueError(f"No plugin function named '{func_name}' found.")
# 检查函数是否是异步函数
global_namespace = globals().copy()
local_namespace = self.get_local_namespace()
if inspect.iscoroutinefunction(plugin_func):
# 如果是异步函数,构建执行用的协程对象
coroutine = eval(code, global_namespace, local_namespace)
# 等待协程执行
await coroutine
await plugin_func(*args)
else:
# 如果是普通函数,直接执行代码
eval(code, global_namespace, local_namespace)
plugin_func(*args)