From 5f3f2e174adb6ffd84edfa3a2365050929300bce Mon Sep 17 00:00:00 2001 From: tuanaiseo Date: Tue, 7 Apr 2026 08:33:21 +0700 Subject: [PATCH] fix(security): arbitrary code execution via `eval` in plugin exec (#819) The plugin manager executes plugin code strings using `eval(code, ...)` for both sync and async plugin functions. If an attacker can influence `code` (directly or indirectly via command/config/plugin inputs), this allows arbitrary Python execution in the server process. Affected files: plugin.py Signed-off-by: tuanaiseo <221258316+tuanaiseo@users.noreply.github.com> --- xiaomusic/plugin.py | 51 +++++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/xiaomusic/plugin.py b/xiaomusic/plugin.py index f8cc1f3..9a20f78 100644 --- a/xiaomusic/plugin.py +++ b/xiaomusic/plugin.py @@ -1,3 +1,4 @@ +import ast import importlib import inspect import pkgutil @@ -46,28 +47,52 @@ class PluginManager: """返回包含所有插件函数的字典,可以用作 exec 要执行的代码的命名空间""" return self._funcs.copy() + def _parse_plugin_call(self, code): + try: + expression = ast.parse(code, mode="eval") + except SyntaxError as exc: + raise ValueError("Invalid plugin code.") from exc + + if not isinstance(expression.body, ast.Call): + raise ValueError("Plugin code must be a function call.") + + call = expression.body + if not isinstance(call.func, ast.Name): + raise ValueError("Plugin code must call a plugin function directly.") + if call.keywords: + raise ValueError("Keyword arguments are not supported.") + + return call.func.id, [self._parse_plugin_arg(arg) for arg in call.args] + + def _parse_plugin_arg(self, arg): + if isinstance(arg, ast.Constant): + if isinstance(arg.value, (str, int, float, bool, type(None))): + return arg.value + elif isinstance(arg, ast.List): + return [self._parse_plugin_arg(item) for item in arg.elts] + elif isinstance(arg, ast.Tuple): + return tuple(self._parse_plugin_arg(item) for item in arg.elts) + elif isinstance(arg, ast.Dict): + if any(key is None for key in arg.keys): + raise ValueError("Unsupported plugin argument.") + keys = [self._parse_plugin_arg(key) for key in arg.keys] + values = [self._parse_plugin_arg(value) for value in arg.values] + return dict(zip(keys, values)) + + raise ValueError("Unsupported plugin argument.") + async def execute_plugin(self, code): """ 执行指定的插件代码。插件函数可以是同步或异步。 :param code: 需要执行的插件函数代码(例如 'plugin1("hello")') """ - # 分解代码字符串以获取函数名 - func_name = code.split("(")[0] + func_name, args = self._parse_plugin_call(code) - # 根据解析出的函数名从插件字典中获取函数 plugin_func = self.get_func(func_name) - if not plugin_func: raise ValueError(f"No plugin function named '{func_name}' found.") - # 检查函数是否是异步函数 - global_namespace = globals().copy() - local_namespace = self.get_local_namespace() if inspect.iscoroutinefunction(plugin_func): - # 如果是异步函数,构建执行用的协程对象 - coroutine = eval(code, global_namespace, local_namespace) - # 等待协程执行 - await coroutine + await plugin_func(*args) else: - # 如果是普通函数,直接执行代码 - eval(code, global_namespace, local_namespace) + plugin_func(*args)