"""插件拉取 / 启停管理""" from __future__ import annotations import os import re import stat import shutil import subprocess import sys import threading import time from pathlib import Path from typing import Any import requests _ROOT = Path(__file__).resolve().parents[2] _EXT_ROOT = _ROOT / "_ext_targets" _LOG_ROOT = Path(__file__).resolve().parent / "external_logs" _LOG_ROOT.mkdir(parents=True, exist_ok=True) _REMOTE_URLS = { "cliproxyapi": "https://github.com/router-for-me/CLIProxyAPI.git", "grok2api": "https://github.com/chenyme/grok2api.git", "kiro-manager": "https://github.com/hj01857655/kiro-account-manager.git", } _KIRO_MANAGER_MSI_URL = ( "https://github.com/hj01857655/kiro-account-manager/releases/download/" "v1.8.3/KiroAccountManager_1.8.3_x64_zh-CN.msi" ) _KIRO_MANAGER_MSI = _EXT_ROOT / "KiroAccountManager_1.8.3_x64_zh-CN.msi" _KIRO_MANAGER_EXTRACT_DIR = _EXT_ROOT / "kiro-manager-msi-extract" _KIRO_MANAGER_EXTRACT_EXE = _KIRO_MANAGER_EXTRACT_DIR / "PFiles" / "KiroAccountManager" / "kiro-account-manager.exe" _SERVICE_META = { "cliproxyapi": { "label": "CLIProxyAPI", "repo_name": "CLIProxyAPI", "url": "http://127.0.0.1:8317", "health": "http://127.0.0.1:8317/", "management_url": "http://127.0.0.1:8317/management.html", "port": 8317, "kind": "web", }, "grok2api": { "label": "grok2api", "repo_name": "grok2api", "url": "http://127.0.0.1:8011", "health": "http://127.0.0.1:8011/health", "management_url": "http://127.0.0.1:8011/admin", "port": 8011, "kind": "web", }, "kiro-manager": { "label": "Kiro Account Manager", "repo_name": "kiro-account-manager", "url": "", "health": "", "kind": "desktop", }, } _PROCS: dict[str, subprocess.Popen] = {} _LOG_FILES: dict[str, Any] = {} _LAST_ERROR: dict[str, str] = {} _LOCK = threading.Lock() _SEMVER_TAG_PATTERN = re.compile(r"^v?\d+\.\d+\.\d+(?:-[0-9A-Za-z.-]+)?(?:\+[0-9A-Za-z.-]+)?$") def _get_setting(key: str, default: str = "") -> str: try: from core.config_store import config_store value = str(config_store.get(key, "") or "").strip() return value or default except Exception: return default def _creationflags() -> int: return getattr(subprocess, "CREATE_NO_WINDOW", 0) def _repo_path(name: str) -> Path: return _EXT_ROOT / _SERVICE_META[name]["repo_name"] def _log_path(name: str) -> Path: return _LOG_ROOT / f"{name}.log" def _close_log(name: str): f = _LOG_FILES.pop(name, None) if f: try: f.close() except Exception: pass def _open_log(name: str): _close_log(name) f = open(_log_path(name), "a", encoding="utf-8") _LOG_FILES[name] = f return f def _make_tree_writable(path: Path): if not path.exists(): return for root, dirs, files in os.walk(path): for dirname in dirs: p = Path(root) / dirname try: p.chmod(p.stat().st_mode | stat.S_IWRITE) except Exception: pass for filename in files: p = Path(root) / filename try: p.chmod(p.stat().st_mode | stat.S_IWRITE) except Exception: pass try: path.chmod(path.stat().st_mode | stat.S_IWRITE) except Exception: pass def _kill_processes_touching_path(path: Path): if os.name != "nt": return try: subprocess.run( [ "powershell", "-NoProfile", "-Command", "$p=$args[0]; " "Get-CimInstance Win32_Process | " "Where-Object { (($_.CommandLine -like ('*' + $p + '*')) -or ($_.ExecutablePath -like ('*' + $p + '*'))) } | " "ForEach-Object { Stop-Process -Id $_.ProcessId -Force -ErrorAction SilentlyContinue }", str(path), ], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) except Exception: pass def _external_apps_update_mode() -> str: mode = _get_setting("external_apps_update_mode", "tag").strip().lower() return "branch" if mode == "branch" else "tag" def _git_has_remote_branch(repo: Path, branch: str) -> bool: if not branch: return False check = subprocess.run( ["git", "-C", str(repo), "show-ref", "--verify", "--quiet", f"refs/remotes/origin/{branch}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) return check.returncode == 0 def _origin_default_branch(repo: Path) -> str: try: out = subprocess.check_output( ["git", "-C", str(repo), "symbolic-ref", "--short", "refs/remotes/origin/HEAD"], text=True, creationflags=_creationflags(), ).strip() if out.startswith("origin/"): branch = out.split("/", 1)[1].strip() if branch: return branch except Exception: pass for candidate in ("main", "master"): if _git_has_remote_branch(repo, candidate): return candidate return "main" def _current_local_branch(repo: Path) -> str: try: branch = subprocess.check_output( ["git", "-C", str(repo), "rev-parse", "--abbrev-ref", "HEAD"], text=True, creationflags=_creationflags(), ).strip() except Exception: return "" return branch if branch and branch != "HEAD" else "" def _sync_repo_to_branch_head(repo: Path, preferred_branch: str = ""): candidates = [] preferred = str(preferred_branch or "").strip() if preferred: candidates.append(preferred) local_branch = _current_local_branch(repo) if local_branch and local_branch not in candidates: candidates.append(local_branch) default_branch = _origin_default_branch(repo) if default_branch and default_branch not in candidates: candidates.append(default_branch) for fallback in ("main", "master"): if fallback not in candidates: candidates.append(fallback) for branch in candidates: if not _git_has_remote_branch(repo, branch): continue subprocess.run( ["git", "-C", str(repo), "checkout", "-B", branch, f"origin/{branch}"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) subprocess.run( ["git", "-C", str(repo), "reset", "--hard", f"origin/{branch}"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) subprocess.run( ["git", "-C", str(repo), "clean", "-fd"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) return raise RuntimeError(f"未找到可用远端分支(repo={repo})") def _latest_semver_tag(repo: Path) -> str: try: out = subprocess.check_output( [ "git", "-C", str(repo), "for-each-ref", "refs/tags", "--sort=-version:refname", "--format=%(refname:strip=2)", ], text=True, creationflags=_creationflags(), ) except Exception: return "" for line in out.splitlines(): tag = str(line or "").strip() if not tag: continue if _SEMVER_TAG_PATTERN.fullmatch(tag): return tag return "" def _sync_repo_to_latest_semver_tag(repo: Path) -> bool: tag = _latest_semver_tag(repo) if not tag: return False subprocess.run( ["git", "-C", str(repo), "checkout", "--force", tag], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) subprocess.run( ["git", "-C", str(repo), "reset", "--hard", tag], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) subprocess.run( ["git", "-C", str(repo), "clean", "-fd"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) return True def _sync_repo_to_latest(name: str): repo = _repo_path(name) repo.parent.mkdir(parents=True, exist_ok=True) if not repo.exists(): subprocess.run( ["git", "clone", _REMOTE_URLS[name], str(repo)], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) subprocess.run( ["git", "-C", str(repo), "fetch", "--all", "--tags", "--prune"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) mode = _external_apps_update_mode() if mode == "branch": _sync_repo_to_branch_head(repo) return if not _sync_repo_to_latest_semver_tag(repo): _sync_repo_to_branch_head(repo) def install(name: str) -> dict[str, Any]: with _LOCK: if name not in _SERVICE_META: raise KeyError(name) _sync_repo_to_latest(name) return _status_one(name) def uninstall(name: str) -> dict[str, Any]: if name not in _SERVICE_META: raise KeyError(name) try: stop(name) except Exception: pass with _LOCK: repo = _repo_path(name) if repo.exists(): _kill_processes_touching_path(repo) last_exc: Exception | None = None for _ in range(12): try: _make_tree_writable(repo) shutil.rmtree(repo) last_exc = None break except Exception as exc: last_exc = exc _kill_processes_touching_path(repo) try: subprocess.run( ["attrib", "-R", str(repo / "*"), "/S", "/D"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) except Exception: pass time.sleep(0.5) if repo.exists(): _LAST_ERROR[name] = ( f"卸载失败:目录仍存在 {repo}" + (f",原因:{last_exc}" if last_exc else "") ) raise RuntimeError(_LAST_ERROR[name]) _PROCS.pop(name, None) _LAST_ERROR.pop(name, None) _close_log(name) return _status_one(name) def _health_ok(name: str) -> bool: url = _SERVICE_META[name].get("health") if not url: return False try: r = requests.get(url, timeout=2) return r.status_code < 500 except Exception: return False def _find_pid_by_port(port: int) -> int | None: if not port: return None try: out = subprocess.check_output( ["netstat", "-ano", "-p", "tcp"], text=True, creationflags=_creationflags(), ) except Exception: return None for line in out.splitlines(): parts = line.split() if len(parts) >= 5 and parts[0].upper() == "TCP": local = parts[1] state = parts[3].upper() pid = parts[4] if local.endswith(f":{port}") and state == "LISTENING": try: return int(pid) except Exception: return None return None def _proc_running(name: str) -> bool: proc = _PROCS.get(name) return bool(proc and proc.poll() is None) def _kiro_known_exe_paths() -> list[str]: candidates: list[str] = [] try: from core.config_store import config_store configured = str(config_store.get("kiro_manager_exe", "") or "").strip() if configured and Path(configured).exists(): candidates.append(str(Path(configured).resolve()).lower()) except Exception: pass for item in [ Path(os.environ.get("LOCALAPPDATA", "")) / "Programs" / "KiroAccountManager" / "KiroAccountManager.exe", Path(os.environ.get("ProgramFiles", "")) / "KiroAccountManager" / "KiroAccountManager.exe", Path(os.environ.get("LOCALAPPDATA", "")) / "Programs" / "kiro-account-manager" / "kiro-account-manager.exe", Path(os.environ.get("ProgramFiles", "")) / "kiro-account-manager" / "kiro-account-manager.exe", _KIRO_MANAGER_EXTRACT_EXE, ]: if item.exists(): candidates.append(str(item.resolve()).lower()) return candidates def _find_desktop_pid(name: str) -> int | None: if name != "kiro-manager": return None target_paths = set(_kiro_known_exe_paths()) try: processes = subprocess.check_output( [ "powershell", "-NoProfile", "-Command", "Get-CimInstance Win32_Process | " "Where-Object { $_.Name -in @('KiroAccountManager.exe','kiro-account-manager.exe') } | " "Select-Object ProcessId,ExecutablePath | ConvertTo-Json -Compress", ], text=True, creationflags=_creationflags(), ).strip() except Exception: return None if not processes: return None try: import json data = json.loads(processes) items = data if isinstance(data, list) else [data] for item in items: pid = item.get("ProcessId") exe = str(item.get("ExecutablePath") or "").strip() if not pid: continue if not target_paths: return int(pid) if exe: try: if str(Path(exe).resolve()).lower() in target_paths: return int(pid) except Exception: if exe.lower() in target_paths: return int(pid) except Exception: return None return None def _status_one(name: str) -> dict[str, Any]: meta = _SERVICE_META[name] repo = _repo_path(name) proc = _PROCS.get(name) desktop_pid = _find_desktop_pid(name) if meta["kind"] == "desktop" else None running = _health_ok(name) if meta["kind"] == "web" else bool(desktop_pid or _proc_running(name)) pid = proc.pid if proc and proc.poll() is None else desktop_pid if meta["kind"] == "web" and running: pid = _find_pid_by_port(int(meta.get("port") or 0)) or pid return { "name": name, "label": meta["label"], "repo_path": str(repo), "repo_exists": repo.exists(), "url": meta.get("url", ""), "management_url": meta.get("management_url", ""), "management_key": ( _get_setting("cliproxyapi_management_key", "cliproxyapi") if name == "cliproxyapi" else _get_setting("grok2api_app_key", "grok2api") if name == "grok2api" else "" ), "running": running, "pid": pid, "log_path": str(_log_path(name)), "last_error": _LAST_ERROR.get(name, ""), "kind": meta["kind"], } def list_status() -> list[dict[str, Any]]: return [_status_one(name) for name in _SERVICE_META] def _find_go() -> str | None: candidates = [ shutil.which("go"), str(Path.home() / "go" / "pkg" / "mod" / "golang.org" / "toolchain@v0.0.1-go1.24.10.windows-amd64" / "bin" / "go.exe"), str(Path.home() / "go" / "pkg" / "mod" / "golang.org" / "toolchain@v0.0.1-go1.24.0.windows-amd64" / "bin" / "go.exe"), r"C:\Program Files\Go\bin\go.exe", ] for item in candidates: if item and Path(item).exists(): return item return None def _conda_exe() -> str | None: candidates = [ shutil.which("conda"), r"D:\miniconda\conda3\Scripts\conda.exe", r"D:\miniconda\conda3\Library\bin\conda.bat", ] for item in candidates: if item and Path(item).exists(): return item return None def _uv_exe() -> str | None: candidate = shutil.which("uv") if candidate and Path(candidate).exists(): return candidate return None def _venv_python(repo: Path) -> Path: if os.name == "nt": return repo / ".venv" / "Scripts" / "python.exe" return repo / ".venv" / "bin" / "python" def _resolve_kiro_exe() -> str | None: try: from core.config_store import config_store configured = str(config_store.get("kiro_manager_exe", "") or "").strip() if configured and Path(configured).exists(): return configured except Exception: pass candidates = [ Path(os.environ.get("LOCALAPPDATA", "")) / "Programs" / "KiroAccountManager" / "KiroAccountManager.exe", Path(os.environ.get("ProgramFiles", "")) / "KiroAccountManager" / "KiroAccountManager.exe", Path(os.environ.get("LOCALAPPDATA", "")) / "Programs" / "kiro-account-manager" / "kiro-account-manager.exe", Path(os.environ.get("ProgramFiles", "")) / "kiro-account-manager" / "kiro-account-manager.exe", _KIRO_MANAGER_EXTRACT_EXE, ] for item in candidates: if item.exists(): return str(item) extracted = _ensure_kiro_extracted_exe() if extracted: return extracted return None def _download_file(url: str, dest: Path): dest.parent.mkdir(parents=True, exist_ok=True) with requests.get(url, stream=True, timeout=60) as resp: resp.raise_for_status() with open(dest, "wb") as f: for chunk in resp.iter_content(chunk_size=1024 * 1024): if chunk: f.write(chunk) def _ensure_kiro_extracted_exe() -> str | None: if _KIRO_MANAGER_EXTRACT_EXE.exists(): return str(_KIRO_MANAGER_EXTRACT_EXE) if not _KIRO_MANAGER_MSI.exists(): _download_file(_KIRO_MANAGER_MSI_URL, _KIRO_MANAGER_MSI) _KIRO_MANAGER_EXTRACT_DIR.mkdir(parents=True, exist_ok=True) subprocess.run( [ "msiexec.exe", "/a", str(_KIRO_MANAGER_MSI), f"TARGETDIR={_KIRO_MANAGER_EXTRACT_DIR}", "/qn", ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) if _KIRO_MANAGER_EXTRACT_EXE.exists(): return str(_KIRO_MANAGER_EXTRACT_EXE) return None def _ensure_grok2api_conda_env(repo: Path) -> str: env_name = "grok2api-313" conda = _conda_exe() if not conda: raise RuntimeError("未找到 conda,无法为 grok2api 自动创建 Python 3.13 环境") check = subprocess.run( [conda, "run", "--no-capture-output", "-n", env_name, "python", "--version"], cwd=str(repo), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) if check.returncode != 0: subprocess.run( [conda, "create", "-y", "-n", env_name, "python=3.13"], cwd=str(repo), check=True, creationflags=_creationflags(), ) marker = repo / ".grok2api-env-ready" if not marker.exists(): subprocess.run( [conda, "run", "--no-capture-output", "-n", env_name, "python", "-m", "pip", "install", "--upgrade", "pip"], cwd=str(repo), check=True, creationflags=_creationflags(), ) subprocess.run( [conda, "run", "--no-capture-output", "-n", env_name, "python", "-m", "pip", "install", "."], cwd=str(repo), check=True, creationflags=_creationflags(), ) marker.write_text(env_name, encoding="utf-8") return env_name def _ensure_grok2api_uv_env(repo: Path) -> str: uv = _uv_exe() if not uv: raise RuntimeError("未找到 uv,无法为 grok2api 自动创建项目虚拟环境") marker = repo / ".grok2api-env-ready" if not marker.exists() or marker.read_text(encoding="utf-8").strip() != "uv": subprocess.run( [ uv, "sync", "--frozen", "--no-dev", "--no-install-project", "--python", "3.13", ], cwd=str(repo), check=True, creationflags=_creationflags(), ) marker.write_text("uv", encoding="utf-8") venv_python = _venv_python(repo) if not venv_python.exists(): raise RuntimeError("grok2api 的 uv 环境创建失败,未找到 .venv/python") return str(venv_python) def _ensure_cliproxyapi_runtime_config(repo: Path): config_path = repo / "config.local.yaml" if not config_path.exists(): shutil.copyfile(repo / "config.example.yaml", config_path) secret = _get_setting("cliproxyapi_management_key", "cliproxyapi") lines = config_path.read_text(encoding="utf-8").splitlines() updated_lines = [] replaced = False for line in lines: if line.lstrip().startswith("secret-key:"): indent = line[: len(line) - len(line.lstrip())] updated_lines.append(f'{indent}secret-key: "{secret}"') replaced = True else: updated_lines.append(line) if not replaced: updated_lines.append(f' secret-key: "{secret}"') config_path.write_text("\n".join(updated_lines) + "\n", encoding="utf-8") def _ensure_grok2api_runtime_config(repo: Path): data_dir = repo / "data" data_dir.mkdir(parents=True, exist_ok=True) config_file = data_dir / "config.toml" app_key = _get_setting("grok2api_app_key", "grok2api") default_config = repo / "config.defaults.toml" if not config_file.exists(): if default_config.exists(): config_file.write_text(default_config.read_text(encoding="utf-8"), encoding="utf-8") else: config_file.write_text("[app]\n", encoding="utf-8") lines = config_file.read_text(encoding="utf-8").splitlines() updated_lines: list[str] = [] in_app = False app_section_found = False app_key_written = False for line in lines: stripped = line.strip() if stripped.startswith("[") and stripped.endswith("]"): if in_app and not app_key_written: updated_lines.append(f'app_key = "{app_key}"') app_key_written = True in_app = stripped == "[app]" app_section_found = app_section_found or in_app updated_lines.append(line) continue if in_app and stripped.startswith("app_key"): indent = line[: len(line) - len(line.lstrip())] updated_lines.append(f'{indent}app_key = "{app_key}"') app_key_written = True continue updated_lines.append(line) if not app_section_found: if updated_lines and updated_lines[-1].strip(): updated_lines.append("") updated_lines.append("[app]") updated_lines.append(f'app_key = "{app_key}"') elif in_app and not app_key_written: updated_lines.append(f'app_key = "{app_key}"') config_file.write_text("\n".join(updated_lines) + "\n", encoding="utf-8") def _build_command(name: str) -> tuple[list[str], Path]: repo = _repo_path(name) if name == "cliproxyapi": go_exe = _find_go() if not go_exe: raise RuntimeError("未找到 go,可在设置中先安装 Go 或将 go.exe 加入 PATH") _ensure_cliproxyapi_runtime_config(repo) config_path = repo / "config.local.yaml" return [go_exe, "run", "./cmd/server", "-config", str(config_path)], repo if name == "grok2api": _ensure_grok2api_runtime_config(repo) conda = _conda_exe() if conda: env_name = _ensure_grok2api_conda_env(repo) return [ conda, "run", "--no-capture-output", "-n", env_name, "python", "-m", "granian", "--interface", "asgi", "--host", "127.0.0.1", "--port", "8011", "--workers", "1", "main:app", ], repo python_exe = _ensure_grok2api_uv_env(repo) return [ python_exe, "-m", "granian", "--interface", "asgi", "--host", "127.0.0.1", "--port", "8011", "--workers", "1", "main:app", ], repo if name == "kiro-manager": exe = _resolve_kiro_exe() if exe: return [exe], repo cargo = shutil.which("cargo") if not cargo: raise RuntimeError("未找到 Kiro Account Manager 可执行文件,且系统未安装 Rust/Cargo,无法从源码启动") return ["npm", "run", "tauri", "dev"], repo raise KeyError(name) def start(name: str) -> dict[str, Any]: with _LOCK: if name not in _SERVICE_META: raise KeyError(name) repo = _repo_path(name) if not repo.exists(): raise RuntimeError(f"{_SERVICE_META[name]['label']} 未安装,请先在插件页点击“安装”") if _status_one(name)["running"]: return _status_one(name) log_file = _open_log(name) try: command, cwd = _build_command(name) proc = subprocess.Popen( command, cwd=str(cwd), stdout=log_file, stderr=subprocess.STDOUT, creationflags=_creationflags(), ) _PROCS[name] = proc _LAST_ERROR[name] = "" except Exception as e: _LAST_ERROR[name] = str(e) _close_log(name) raise if _SERVICE_META[name]["kind"] == "web": for _ in range(90): time.sleep(1) if _health_ok(name): return _status_one(name) proc = _PROCS.get(name) if proc and proc.poll() is not None: _LAST_ERROR[name] = f"启动失败,退出码={proc.returncode}" return _status_one(name) _LAST_ERROR[name] = "启动超时" else: time.sleep(2) return _status_one(name) def stop(name: str) -> dict[str, Any]: with _LOCK: proc = _PROCS.get(name) port_pid = None desktop_pid = None if _SERVICE_META[name]["kind"] == "web": port_pid = _find_pid_by_port(int(_SERVICE_META[name].get("port") or 0)) else: desktop_pid = _find_desktop_pid(name) if proc and proc.poll() is None: if os.name == "nt": subprocess.run( ["taskkill", "/PID", str(proc.pid), "/T", "/F"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) else: proc.terminate() try: proc.wait(timeout=8) except Exception: proc.kill() if port_pid and (not proc or port_pid != proc.pid): subprocess.run( ["taskkill", "/PID", str(port_pid), "/T", "/F"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) if desktop_pid and (not proc or desktop_pid != proc.pid): subprocess.run( ["taskkill", "/PID", str(desktop_pid), "/T", "/F"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=_creationflags(), ) _PROCS.pop(name, None) _close_log(name) if _SERVICE_META[name]["kind"] == "web": for _ in range(10): if not _health_ok(name): break time.sleep(1) return _status_one(name) def start_all() -> list[dict[str, Any]]: results = [] for name in _SERVICE_META: try: if not _repo_path(name).exists(): item = _status_one(name) item["last_error"] = "未安装;如需使用请先手动安装" results.append(item) else: results.append(start(name)) except Exception: results.append(_status_one(name)) return results def stop_all() -> list[dict[str, Any]]: return [stop(name) for name in _SERVICE_META]