From d3f3ee5f2e7ebb270886bfc2aa7f329690762327 Mon Sep 17 00:00:00 2001 From: adminlove520 <791751568@qq.com> Date: Thu, 26 Feb 2026 10:27:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ChatGPT=20=E6=89=B9=E9=87=8F=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E5=B7=A5=E5=85=B7=20-=20=E6=94=AF=E6=8C=81=20DuckMail?= =?UTF-8?q?=20+=20CPA=20=E9=9D=A2=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 87 ++ chatgpt_register.py | 1866 +++++++++++++++++++++++++++++++ codex/README.md | 60 + codex/config.json | 21 + codex/protocol_keygen.py | 2277 ++++++++++++++++++++++++++++++++++++++ config.json | 22 + 6 files changed, 4333 insertions(+) create mode 100644 README.md create mode 100644 chatgpt_register.py create mode 100644 codex/README.md create mode 100644 codex/config.json create mode 100644 codex/protocol_keygen.py create mode 100644 config.json diff --git a/README.md b/README.md new file mode 100644 index 0000000..b31fb7d --- /dev/null +++ b/README.md @@ -0,0 +1,87 @@ +# ChatGPT 批量自动注册工具 + +> 使用 DuckMail 临时邮箱,并发自动注册 ChatGPT 账号 + +## 功能 + +- 📨 自动创建临时邮箱 (DuckMail) +- 📥 自动获取 OTP 验证码 +- ⚡ 支持并发注册多个账号 +- 🔄 自动处理 OAuth 登录 +- ☁️ 支持代理配置 +- 📤 支持上传账号到 Codex / CPA 面板 + +## 环境 + +```bash +pip install curl_cffi +``` + +## 配置 (config.json) + +```json +{ + "total_accounts": 5, + "duckmail_api_base": "https://api.duckmail.sbs", + "duckmail_bearer": "你的 DuckMail API Token", + "proxy": "http://127.0.0.1:7890", + "output_file": "registered_accounts.txt", + "enable_oauth": true, + "oauth_redirect_uri": "http://localhost:1455/auth/callback", + "ak_file": "ak.txt", + "rk_file": "rk.txt" +} +``` + +| 配置项 | 说明 | +|--------|------| +| total_accounts | 注册账号数量 | +| duckmail_bearer | DuckMail API Token | +| proxy | 代理地址 (可选) | +| output_file | 输出账号文件 | +| enable_oauth | 启用 OAuth 登录 | +| ak_file | Access Key 文件 | +| rk_file | Refresh Key 文件 | + +## CPA 面板集成 + +注册完成后,可以自动上传账号到 CPA 面板: + +| 配置项 | 说明 | 参考 | +|--------|------|------| +| upload_api_url | CPA 面板上传 API 地址 | https://help.router-for.me/cn/ | +| upload_api_token | CPA 面板登录密码 | 你的 CPA 面板密码 | + +> CPA 面板仓库: https://github.com/dongshuyan/CPA-Dashboard + +## 使用 + +```bash +python chatgpt_register.py +``` + +## 输出 + +注册成功的账号会保存到 `registered_accounts.txt` + +## 目录结构 + +``` +chatgpt_register/ +├── chatgpt_register.py # 主程序 +├── config.json # 配置文件 +├── README.md # 本文档 +├── codex/ # Codex 协议密钥生成 +│ ├── config.json +│ └── protocol_keygen.py +├── registered_accounts.txt # 输出的账号 +├── ak.txt # Access Keys +└── rk.txt # Refresh Keys +``` + +## 注意事项 + +- 需要有效的代理才能注册成功 +- DuckMail API Token 需要从 https://duckmail.sbs 获取 +- 建议使用代理避免 IP 被封 +- 使用 CPA 面板需要先部署面板服务 diff --git a/chatgpt_register.py b/chatgpt_register.py new file mode 100644 index 0000000..b0ee5c7 --- /dev/null +++ b/chatgpt_register.py @@ -0,0 +1,1866 @@ +""" +ChatGPT 批量自动注册工具 (并发版) - DuckMail 临时邮箱版 +依赖: pip install curl_cffi +功能: 使用 DuckMail 临时邮箱,并发自动注册 ChatGPT 账号,自动获取 OTP 验证码 +""" + +import os +import re +import uuid +import json +import random +import string +import time +import sys +import threading +import traceback +import secrets +import hashlib +import base64 +from concurrent.futures import ThreadPoolExecutor, as_completed +from urllib.parse import urlparse, parse_qs, urlencode + +from curl_cffi import requests as curl_requests + +# ================= 加载配置 ================= +def _load_config(): + """从 config.json 加载配置,环境变量优先级更高""" + config = { + "total_accounts": 3, + "duckmail_api_base": "https://api.duckmail.sbs", + "duckmail_bearer": "", + "proxy": "", + "output_file": "registered_accounts.txt", + "enable_oauth": True, + "oauth_required": True, + "oauth_issuer": "https://auth.openai.com", + "oauth_client_id": "app_EMoamEEZ73f0CkXaXp7hrann", + "oauth_redirect_uri": "http://localhost:1455/auth/callback", + "ak_file": "ak.txt", + "rk_file": "rk.txt", + "token_json_dir": "codex_tokens", + "upload_api_url": "", + "upload_api_token": "", + } + + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") + if os.path.exists(config_path): + try: + with open(config_path, "r", encoding="utf-8") as f: + file_config = json.load(f) + config.update(file_config) + except Exception as e: + print(f"⚠️ 加载 config.json 失败: {e}") + + # 环境变量优先级更高 + config["duckmail_api_base"] = os.environ.get("DUCKMAIL_API_BASE", config["duckmail_api_base"]) + config["duckmail_bearer"] = os.environ.get("DUCKMAIL_BEARER", config["duckmail_bearer"]) + config["proxy"] = os.environ.get("PROXY", config["proxy"]) + config["total_accounts"] = int(os.environ.get("TOTAL_ACCOUNTS", config["total_accounts"])) + config["enable_oauth"] = os.environ.get("ENABLE_OAUTH", config["enable_oauth"]) + config["oauth_required"] = os.environ.get("OAUTH_REQUIRED", config["oauth_required"]) + config["oauth_issuer"] = os.environ.get("OAUTH_ISSUER", config["oauth_issuer"]) + config["oauth_client_id"] = os.environ.get("OAUTH_CLIENT_ID", config["oauth_client_id"]) + config["oauth_redirect_uri"] = os.environ.get("OAUTH_REDIRECT_URI", config["oauth_redirect_uri"]) + config["ak_file"] = os.environ.get("AK_FILE", config["ak_file"]) + config["rk_file"] = os.environ.get("RK_FILE", config["rk_file"]) + config["token_json_dir"] = os.environ.get("TOKEN_JSON_DIR", config["token_json_dir"]) + config["upload_api_url"] = os.environ.get("UPLOAD_API_URL", config["upload_api_url"]) + config["upload_api_token"] = os.environ.get("UPLOAD_API_TOKEN", config["upload_api_token"]) + + return config + + +def _as_bool(value): + if isinstance(value, bool): + return value + if value is None: + return False + return str(value).strip().lower() in {"1", "true", "yes", "y", "on"} + + +_CONFIG = _load_config() +DUCKMAIL_API_BASE = _CONFIG["duckmail_api_base"] +DUCKMAIL_BEARER = _CONFIG["duckmail_bearer"] +DEFAULT_TOTAL_ACCOUNTS = _CONFIG["total_accounts"] +DEFAULT_PROXY = _CONFIG["proxy"] +DEFAULT_OUTPUT_FILE = _CONFIG["output_file"] +ENABLE_OAUTH = _as_bool(_CONFIG.get("enable_oauth", True)) +OAUTH_REQUIRED = _as_bool(_CONFIG.get("oauth_required", True)) +OAUTH_ISSUER = _CONFIG["oauth_issuer"].rstrip("/") +OAUTH_CLIENT_ID = _CONFIG["oauth_client_id"] +OAUTH_REDIRECT_URI = _CONFIG["oauth_redirect_uri"] +AK_FILE = _CONFIG["ak_file"] +RK_FILE = _CONFIG["rk_file"] +TOKEN_JSON_DIR = _CONFIG["token_json_dir"] +UPLOAD_API_URL = _CONFIG["upload_api_url"] +UPLOAD_API_TOKEN = _CONFIG["upload_api_token"] + +if not DUCKMAIL_BEARER: + print("⚠️ 警告: 未设置 DUCKMAIL_BEARER,请在 config.json 中设置或设置环境变量") + print(" 文件: config.json -> duckmail_bearer") + print(" 环境变量: export DUCKMAIL_BEARER='your_api_key_here'") + +# 全局线程锁 +_print_lock = threading.Lock() +_file_lock = threading.Lock() + + +# Chrome 指纹配置: impersonate 与 sec-ch-ua 必须匹配真实浏览器 +_CHROME_PROFILES = [ + { + "major": 131, "impersonate": "chrome131", + "build": 6778, "patch_range": (69, 205), + "sec_ch_ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"', + }, + { + "major": 133, "impersonate": "chrome133a", + "build": 6943, "patch_range": (33, 153), + "sec_ch_ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"', + }, + { + "major": 136, "impersonate": "chrome136", + "build": 7103, "patch_range": (48, 175), + "sec_ch_ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"', + }, + { + "major": 142, "impersonate": "chrome142", + "build": 7540, "patch_range": (30, 150), + "sec_ch_ua": '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"', + }, +] + + +def _random_chrome_version(): + profile = random.choice(_CHROME_PROFILES) + major = profile["major"] + build = profile["build"] + patch = random.randint(*profile["patch_range"]) + full_ver = f"{major}.0.{build}.{patch}" + ua = f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{full_ver} Safari/537.36" + return profile["impersonate"], major, full_ver, ua, profile["sec_ch_ua"] + + +def _random_delay(low=0.3, high=1.0): + time.sleep(random.uniform(low, high)) + + +def _make_trace_headers(): + trace_id = random.randint(10**17, 10**18 - 1) + parent_id = random.randint(10**17, 10**18 - 1) + tp = f"00-{uuid.uuid4().hex}-{format(parent_id, '016x')}-01" + return { + "traceparent": tp, "tracestate": "dd=s:1;o:rum", + "x-datadog-origin": "rum", "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": str(trace_id), "x-datadog-parent-id": str(parent_id), + } + + +def _generate_pkce(): + code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(64)).rstrip(b"=").decode("ascii") + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + return code_verifier, code_challenge + + +class SentinelTokenGenerator: + """纯 Python 版本 sentinel token 生成器(PoW)""" + + MAX_ATTEMPTS = 500000 + ERROR_PREFIX = "wQ8Lk5FbGpA2NcR9dShT6gYjU7VxZ4D" + + def __init__(self, device_id=None, user_agent=None): + self.device_id = device_id or str(uuid.uuid4()) + self.user_agent = user_agent or ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.0.0 Safari/537.36" + ) + self.requirements_seed = str(random.random()) + self.sid = str(uuid.uuid4()) + + @staticmethod + def _fnv1a_32(text: str): + h = 2166136261 + for ch in text: + h ^= ord(ch) + h = (h * 16777619) & 0xFFFFFFFF + h ^= (h >> 16) + h = (h * 2246822507) & 0xFFFFFFFF + h ^= (h >> 13) + h = (h * 3266489909) & 0xFFFFFFFF + h ^= (h >> 16) + h &= 0xFFFFFFFF + return format(h, "08x") + + def _get_config(self): + now_str = time.strftime( + "%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)", + time.gmtime(), + ) + perf_now = random.uniform(1000, 50000) + time_origin = time.time() * 1000 - perf_now + nav_prop = random.choice([ + "vendorSub", "productSub", "vendor", "maxTouchPoints", + "scheduling", "userActivation", "doNotTrack", "geolocation", + "connection", "plugins", "mimeTypes", "pdfViewerEnabled", + "webkitTemporaryStorage", "webkitPersistentStorage", + "hardwareConcurrency", "cookieEnabled", "credentials", + "mediaDevices", "permissions", "locks", "ink", + ]) + nav_val = f"{nav_prop}-undefined" + + return [ + "1920x1080", + now_str, + 4294705152, + random.random(), + self.user_agent, + "https://sentinel.openai.com/sentinel/20260124ceb8/sdk.js", + None, + None, + "en-US", + "en-US,en", + random.random(), + nav_val, + random.choice(["location", "implementation", "URL", "documentURI", "compatMode"]), + random.choice(["Object", "Function", "Array", "Number", "parseFloat", "undefined"]), + perf_now, + self.sid, + "", + random.choice([4, 8, 12, 16]), + time_origin, + ] + + @staticmethod + def _base64_encode(data): + raw = json.dumps(data, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + return base64.b64encode(raw).decode("ascii") + + def _run_check(self, start_time, seed, difficulty, config, nonce): + config[3] = nonce + config[9] = round((time.time() - start_time) * 1000) + data = self._base64_encode(config) + hash_hex = self._fnv1a_32(seed + data) + diff_len = len(difficulty) + if hash_hex[:diff_len] <= difficulty: + return data + "~S" + return None + + def generate_token(self, seed=None, difficulty=None): + seed = seed if seed is not None else self.requirements_seed + difficulty = str(difficulty or "0") + start_time = time.time() + config = self._get_config() + + for i in range(self.MAX_ATTEMPTS): + result = self._run_check(start_time, seed, difficulty, config, i) + if result: + return "gAAAAAB" + result + return "gAAAAAB" + self.ERROR_PREFIX + self._base64_encode(str(None)) + + def generate_requirements_token(self): + config = self._get_config() + config[3] = 1 + config[9] = round(random.uniform(5, 50)) + data = self._base64_encode(config) + return "gAAAAAC" + data + + +def fetch_sentinel_challenge(session, device_id, flow="authorize_continue", user_agent=None, + sec_ch_ua=None, impersonate=None): + generator = SentinelTokenGenerator(device_id=device_id, user_agent=user_agent) + req_body = { + "p": generator.generate_requirements_token(), + "id": device_id, + "flow": flow, + } + headers = { + "Content-Type": "text/plain;charset=UTF-8", + "Referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html", + "Origin": "https://sentinel.openai.com", + "User-Agent": user_agent or "Mozilla/5.0", + "sec-ch-ua": sec_ch_ua or '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', + } + + kwargs = { + "data": json.dumps(req_body), + "headers": headers, + "timeout": 20, + } + if impersonate: + kwargs["impersonate"] = impersonate + + try: + resp = session.post("https://sentinel.openai.com/backend-api/sentinel/req", **kwargs) + except Exception: + return None + + if resp.status_code != 200: + return None + + try: + return resp.json() + except Exception: + return None + + +def build_sentinel_token(session, device_id, flow="authorize_continue", user_agent=None, + sec_ch_ua=None, impersonate=None): + challenge = fetch_sentinel_challenge( + session, + device_id, + flow=flow, + user_agent=user_agent, + sec_ch_ua=sec_ch_ua, + impersonate=impersonate, + ) + if not challenge: + return None + + c_value = challenge.get("token", "") + if not c_value: + return None + + pow_data = challenge.get("proofofwork") or {} + generator = SentinelTokenGenerator(device_id=device_id, user_agent=user_agent) + + if pow_data.get("required") and pow_data.get("seed"): + p_value = generator.generate_token( + seed=pow_data.get("seed"), + difficulty=pow_data.get("difficulty", "0"), + ) + else: + p_value = generator.generate_requirements_token() + + return json.dumps({ + "p": p_value, + "t": "", + "c": c_value, + "id": device_id, + "flow": flow, + }, separators=(",", ":")) + + +def _extract_code_from_url(url: str): + if not url or "code=" not in url: + return None + try: + return parse_qs(urlparse(url).query).get("code", [None])[0] + except Exception: + return None + + +def _decode_jwt_payload(token: str): + try: + parts = token.split(".") + if len(parts) != 3: + return {} + payload = parts[1] + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + decoded = base64.urlsafe_b64decode(payload) + return json.loads(decoded) + except Exception: + return {} + + +def _save_codex_tokens(email: str, tokens: dict): + access_token = tokens.get("access_token", "") + refresh_token = tokens.get("refresh_token", "") + id_token = tokens.get("id_token", "") + + if access_token: + with _file_lock: + with open(AK_FILE, "a", encoding="utf-8") as f: + f.write(f"{access_token}\n") + + if refresh_token: + with _file_lock: + with open(RK_FILE, "a", encoding="utf-8") as f: + f.write(f"{refresh_token}\n") + + if not access_token: + return + + payload = _decode_jwt_payload(access_token) + auth_info = payload.get("https://api.openai.com/auth", {}) + account_id = auth_info.get("chatgpt_account_id", "") + + exp_timestamp = payload.get("exp") + expired_str = "" + if isinstance(exp_timestamp, int) and exp_timestamp > 0: + from datetime import datetime, timezone, timedelta + + exp_dt = datetime.fromtimestamp(exp_timestamp, tz=timezone(timedelta(hours=8))) + expired_str = exp_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00") + + from datetime import datetime, timezone, timedelta + + now = datetime.now(tz=timezone(timedelta(hours=8))) + token_data = { + "type": "codex", + "email": email, + "expired": expired_str, + "id_token": id_token, + "account_id": account_id, + "access_token": access_token, + "last_refresh": now.strftime("%Y-%m-%dT%H:%M:%S+08:00"), + "refresh_token": refresh_token, + } + + base_dir = os.path.dirname(os.path.abspath(__file__)) + token_dir = TOKEN_JSON_DIR if os.path.isabs(TOKEN_JSON_DIR) else os.path.join(base_dir, TOKEN_JSON_DIR) + os.makedirs(token_dir, exist_ok=True) + + token_path = os.path.join(token_dir, f"{email}.json") + with _file_lock: + with open(token_path, "w", encoding="utf-8") as f: + json.dump(token_data, f, ensure_ascii=False) + + # 上传到 CPA 管理平台 + if UPLOAD_API_URL: + _upload_token_json(token_path) + + +def _upload_token_json(filepath): + """上传 Token JSON 文件到 CPA 管理平台""" + mp = None + try: + from curl_cffi import CurlMime + + filename = os.path.basename(filepath) + mp = CurlMime() + mp.addpart( + name="file", + content_type="application/json", + filename=filename, + local_path=filepath, + ) + + session = curl_requests.Session() + if DEFAULT_PROXY: + session.proxies = {"http": DEFAULT_PROXY, "https": DEFAULT_PROXY} + + resp = session.post( + UPLOAD_API_URL, + multipart=mp, + headers={"Authorization": f"Bearer {UPLOAD_API_TOKEN}"}, + verify=False, + timeout=30, + ) + + if resp.status_code == 200: + with _print_lock: + print(f" [CPA] Token JSON 已上传到 CPA 管理平台") + else: + with _print_lock: + print(f" [CPA] 上传失败: {resp.status_code} - {resp.text[:200]}") + except Exception as e: + with _print_lock: + print(f" [CPA] 上传异常: {e}") + finally: + if mp: + mp.close() + + +def _generate_password(length=14): + lower = string.ascii_lowercase + upper = string.ascii_uppercase + digits = string.digits + special = "!@#$%&*" + pwd = [random.choice(lower), random.choice(upper), + random.choice(digits), random.choice(special)] + all_chars = lower + upper + digits + special + pwd += [random.choice(all_chars) for _ in range(length - 4)] + random.shuffle(pwd) + return "".join(pwd) + + +# ================= DuckMail 邮箱函数 ================= + +def _create_duckmail_session(): + """创建带重试的 DuckMail 请求会话""" + session = curl_requests.Session() + session.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + "Accept": "application/json", + "Content-Type": "application/json", + }) + return session + + +def create_temp_email(): + """创建 DuckMail 临时邮箱,返回 (email, password, mail_token)""" + if not DUCKMAIL_BEARER: + raise Exception("DUCKMAIL_BEARER 未设置,无法创建临时邮箱") + + # 生成随机邮箱前缀 8-13 位 + chars = string.ascii_lowercase + string.digits + length = random.randint(8, 13) + email_local = "".join(random.choice(chars) for _ in range(length)) + email = f"{email_local}@duckmail.sbs" + password = _generate_password() + + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {DUCKMAIL_BEARER}"} + session = _create_duckmail_session() + + try: + # 1. 创建账号 + payload = {"address": email, "password": password} + res = session.post( + f"{api_base}/accounts", + json=payload, + headers=headers, + timeout=15, + impersonate="chrome131" + ) + + if res.status_code not in [200, 201]: + raise Exception(f"创建邮箱失败: {res.status_code} - {res.text[:200]}") + + # 2. 获取 Token(用于读取邮件) + time.sleep(0.5) + token_payload = {"address": email, "password": password} + token_res = session.post( + f"{api_base}/token", + json=token_payload, + timeout=15, + impersonate="chrome131" + ) + + if token_res.status_code == 200: + token_data = token_res.json() + mail_token = token_data.get("token") + if mail_token: + return email, password, mail_token + + raise Exception(f"获取邮件 Token 失败: {token_res.status_code}") + + except Exception as e: + raise Exception(f"DuckMail 创建邮箱失败: {e}") + + +def _fetch_emails_duckmail(mail_token: str): + """从 DuckMail 获取邮件列表""" + try: + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {mail_token}"} + session = _create_duckmail_session() + + res = session.get( + f"{api_base}/messages", + headers=headers, + timeout=15, + impersonate="chrome131" + ) + + if res.status_code == 200: + data = res.json() + # DuckMail API 返回格式可能是 hydra:member 或 member + messages = data.get("hydra:member") or data.get("member") or data.get("data") or [] + return messages + return [] + except Exception as e: + return [] + + +def _fetch_email_detail_duckmail(mail_token: str, msg_id: str): + """获取 DuckMail 单封邮件详情""" + try: + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {mail_token}"} + session = _create_duckmail_session() + + # 处理 msg_id 格式 + if isinstance(msg_id, str) and msg_id.startswith("/messages/"): + msg_id = msg_id.split("/")[-1] + + res = session.get( + f"{api_base}/messages/{msg_id}", + headers=headers, + timeout=15, + impersonate="chrome131" + ) + + if res.status_code == 200: + return res.json() + except Exception: + pass + return None + + +def _extract_verification_code(email_content: str): + """从邮件内容提取 6 位验证码""" + if not email_content: + return None + + patterns = [ + r"Verification code:?\s*(\d{6})", + r"code is\s*(\d{6})", + r"代码为[::]?\s*(\d{6})", + r"验证码[::]?\s*(\d{6})", + r">\s*(\d{6})\s*<", + r"(? 0: + # 获取最新邮件详情 + first_msg = messages[0] + msg_id = first_msg.get("id") or first_msg.get("@id") + + if msg_id: + detail = _fetch_email_detail_duckmail(mail_token, msg_id) + if detail: + # DuckMail 的邮件内容在 text 或 html 字段 + content = detail.get("text") or detail.get("html") or "" + code = _extract_verification_code(content) + if code: + return code + + time.sleep(3) + + return None + + +def _random_name(): + first = random.choice([ + "James", "Emma", "Liam", "Olivia", "Noah", "Ava", "Ethan", "Sophia", + "Lucas", "Mia", "Mason", "Isabella", "Logan", "Charlotte", "Alexander", + "Amelia", "Benjamin", "Harper", "William", "Evelyn", "Henry", "Abigail", + "Sebastian", "Emily", "Jack", "Elizabeth", + ]) + last = random.choice([ + "Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", + "Clark", "Hall", "Young", "Anderson", "Thomas", "Jackson", "White", + "Harris", "Martin", "Thompson", "Garcia", "Robinson", "Lewis", + "Walker", "Allen", "King", "Wright", "Scott", "Green", + ]) + return f"{first} {last}" + + +def _random_birthdate(): + y = random.randint(1985, 2002) + m = random.randint(1, 12) + d = random.randint(1, 28) + return f"{y}-{m:02d}-{d:02d}" + + +class ChatGPTRegister: + BASE = "https://chatgpt.com" + AUTH = "https://auth.openai.com" + + def __init__(self, proxy: str = None, tag: str = ""): + self.tag = tag # 线程标识,用于日志 + self.device_id = str(uuid.uuid4()) + self.auth_session_logging_id = str(uuid.uuid4()) + self.impersonate, self.chrome_major, self.chrome_full, self.ua, self.sec_ch_ua = _random_chrome_version() + + self.session = curl_requests.Session(impersonate=self.impersonate) + + self.proxy = proxy + if self.proxy: + self.session.proxies = {"http": self.proxy, "https": self.proxy} + + self.session.headers.update({ + "User-Agent": self.ua, + "Accept-Language": random.choice([ + "en-US,en;q=0.9", "en-US,en;q=0.9,zh-CN;q=0.8", + "en,en-US;q=0.9", "en-US,en;q=0.8", + ]), + "sec-ch-ua": self.sec_ch_ua, "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', "sec-ch-ua-arch": '"x86"', + "sec-ch-ua-bitness": '"64"', + "sec-ch-ua-full-version": f'"{self.chrome_full}"', + "sec-ch-ua-platform-version": f'"{random.randint(10, 15)}.0.0"', + }) + + self.session.cookies.set("oai-did", self.device_id, domain="chatgpt.com") + self._callback_url = None + + def _log(self, step, method, url, status, body=None): + prefix = f"[{self.tag}] " if self.tag else "" + lines = [ + f"\n{'='*60}", + f"{prefix}[Step] {step}", + f"{prefix}[{method}] {url}", + f"{prefix}[Status] {status}", + ] + if body: + try: + lines.append(f"{prefix}[Response] {json.dumps(body, indent=2, ensure_ascii=False)[:1000]}") + except Exception: + lines.append(f"{prefix}[Response] {str(body)[:1000]}") + lines.append(f"{'='*60}") + with _print_lock: + print("\n".join(lines)) + + def _print(self, msg): + prefix = f"[{self.tag}] " if self.tag else "" + with _print_lock: + print(f"{prefix}{msg}") + + # ==================== DuckMail 临时邮箱 ==================== + + def _create_duckmail_session(self): + """创建带重试的 DuckMail 请求会话""" + session = curl_requests.Session() + session.headers.update({ + "User-Agent": self.ua, + "Accept": "application/json", + "Content-Type": "application/json", + }) + if self.proxy: + session.proxies = {"http": self.proxy, "https": self.proxy} + return session + + def create_temp_email(self): + """创建 DuckMail 临时邮箱,返回 (email, password, mail_token)""" + if not DUCKMAIL_BEARER: + raise Exception("DUCKMAIL_BEARER 未设置,无法创建临时邮箱") + + # 生成随机邮箱前缀 8-13 位 + chars = string.ascii_lowercase + string.digits + length = random.randint(8, 13) + email_local = "".join(random.choice(chars) for _ in range(length)) + email = f"{email_local}@duckmail.sbs" + password = _generate_password() + + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {DUCKMAIL_BEARER}"} + session = self._create_duckmail_session() + + try: + # 1. 创建账号 + payload = {"address": email, "password": password} + res = session.post( + f"{api_base}/accounts", + json=payload, + headers=headers, + timeout=15, + impersonate=self.impersonate + ) + + if res.status_code not in [200, 201]: + raise Exception(f"创建邮箱失败: {res.status_code} - {res.text[:200]}") + + # 2. 获取 Token(用于读取邮件) + time.sleep(0.5) + token_payload = {"address": email, "password": password} + token_res = session.post( + f"{api_base}/token", + json=token_payload, + timeout=15, + impersonate=self.impersonate + ) + + if token_res.status_code == 200: + token_data = token_res.json() + mail_token = token_data.get("token") + if mail_token: + return email, password, mail_token + + raise Exception(f"获取邮件 Token 失败: {token_res.status_code}") + + except Exception as e: + raise Exception(f"DuckMail 创建邮箱失败: {e}") + + def _fetch_emails_duckmail(self, mail_token: str): + """从 DuckMail 获取邮件列表""" + try: + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {mail_token}"} + session = self._create_duckmail_session() + + res = session.get( + f"{api_base}/messages", + headers=headers, + timeout=15, + impersonate=self.impersonate + ) + + if res.status_code == 200: + data = res.json() + messages = data.get("hydra:member") or data.get("member") or data.get("data") or [] + return messages + return [] + except Exception: + return [] + + def _fetch_email_detail_duckmail(self, mail_token: str, msg_id: str): + """获取 DuckMail 单封邮件详情""" + try: + api_base = DUCKMAIL_API_BASE.rstrip("/") + headers = {"Authorization": f"Bearer {mail_token}"} + session = self._create_duckmail_session() + + if isinstance(msg_id, str) and msg_id.startswith("/messages/"): + msg_id = msg_id.split("/")[-1] + + res = session.get( + f"{api_base}/messages/{msg_id}", + headers=headers, + timeout=15, + impersonate=self.impersonate + ) + + if res.status_code == 200: + return res.json() + except Exception: + pass + return None + + def _extract_verification_code(self, email_content: str): + """从邮件内容提取 6 位验证码""" + if not email_content: + return None + + patterns = [ + r"Verification code:?\s*(\d{6})", + r"code is\s*(\d{6})", + r"代码为[::]?\s*(\d{6})", + r"验证码[::]?\s*(\d{6})", + r">\s*(\d{6})\s*<", + r"(? 0: + first_msg = messages[0] + msg_id = first_msg.get("id") or first_msg.get("@id") + + if msg_id: + detail = self._fetch_email_detail_duckmail(mail_token, msg_id) + if detail: + content = detail.get("text") or detail.get("html") or "" + code = self._extract_verification_code(content) + if code: + self._print(f"[OTP] 验证码: {code}") + return code + + elapsed = int(time.time() - start_time) + self._print(f"[OTP] 等待中... ({elapsed}s/{timeout}s)") + time.sleep(3) + + self._print(f"[OTP] 超时 ({timeout}s)") + return None + + # ==================== 注册流程 ==================== + + def visit_homepage(self): + url = f"{self.BASE}/" + r = self.session.get(url, headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "Upgrade-Insecure-Requests": "1", + }, allow_redirects=True) + self._log("0. Visit homepage", "GET", url, r.status_code, + {"cookies_count": len(self.session.cookies)}) + + def get_csrf(self) -> str: + url = f"{self.BASE}/api/auth/csrf" + r = self.session.get(url, headers={"Accept": "application/json", "Referer": f"{self.BASE}/"}) + data = r.json() + token = data.get("csrfToken", "") + self._log("1. Get CSRF", "GET", url, r.status_code, data) + if not token: + raise Exception("Failed to get CSRF token") + return token + + def signin(self, email: str, csrf: str) -> str: + url = f"{self.BASE}/api/auth/signin/openai" + params = { + "prompt": "login", "ext-oai-did": self.device_id, + "auth_session_logging_id": self.auth_session_logging_id, + "screen_hint": "login_or_signup", "login_hint": email, + } + form_data = {"callbackUrl": f"{self.BASE}/", "csrfToken": csrf, "json": "true"} + r = self.session.post(url, params=params, data=form_data, headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", "Referer": f"{self.BASE}/", "Origin": self.BASE, + }) + data = r.json() + authorize_url = data.get("url", "") + self._log("2. Signin", "POST", url, r.status_code, data) + if not authorize_url: + raise Exception("Failed to get authorize URL") + return authorize_url + + def authorize(self, url: str) -> str: + r = self.session.get(url, headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Referer": f"{self.BASE}/", "Upgrade-Insecure-Requests": "1", + }, allow_redirects=True) + final_url = str(r.url) + self._log("3. Authorize", "GET", url, r.status_code, {"final_url": final_url}) + return final_url + + def register(self, email: str, password: str): + url = f"{self.AUTH}/api/accounts/user/register" + headers = {"Content-Type": "application/json", "Accept": "application/json", + "Referer": f"{self.AUTH}/create-account/password", "Origin": self.AUTH} + headers.update(_make_trace_headers()) + r = self.session.post(url, json={"username": email, "password": password}, headers=headers) + try: data = r.json() + except Exception: data = {"text": r.text[:500]} + self._log("4. Register", "POST", url, r.status_code, data) + return r.status_code, data + + def send_otp(self): + url = f"{self.AUTH}/api/accounts/email-otp/send" + r = self.session.get(url, headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Referer": f"{self.AUTH}/create-account/password", "Upgrade-Insecure-Requests": "1", + }, allow_redirects=True) + try: data = r.json() + except Exception: data = {"final_url": str(r.url), "status": r.status_code} + self._log("5. Send OTP", "GET", url, r.status_code, data) + return r.status_code, data + + def validate_otp(self, code: str): + url = f"{self.AUTH}/api/accounts/email-otp/validate" + headers = {"Content-Type": "application/json", "Accept": "application/json", + "Referer": f"{self.AUTH}/email-verification", "Origin": self.AUTH} + headers.update(_make_trace_headers()) + r = self.session.post(url, json={"code": code}, headers=headers) + try: data = r.json() + except Exception: data = {"text": r.text[:500]} + self._log("6. Validate OTP", "POST", url, r.status_code, data) + return r.status_code, data + + def create_account(self, name: str, birthdate: str): + url = f"{self.AUTH}/api/accounts/create_account" + headers = {"Content-Type": "application/json", "Accept": "application/json", + "Referer": f"{self.AUTH}/about-you", "Origin": self.AUTH} + headers.update(_make_trace_headers()) + r = self.session.post(url, json={"name": name, "birthdate": birthdate}, headers=headers) + try: data = r.json() + except Exception: data = {"text": r.text[:500]} + self._log("7. Create Account", "POST", url, r.status_code, data) + if isinstance(data, dict): + cb = data.get("continue_url") or data.get("url") or data.get("redirect_url") + if cb: + self._callback_url = cb + return r.status_code, data + + def callback(self, url: str = None): + if not url: + url = self._callback_url + if not url: + self._print("[!] No callback URL, skipping.") + return None, None + r = self.session.get(url, headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Upgrade-Insecure-Requests": "1", + }, allow_redirects=True) + self._log("8. Callback", "GET", url, r.status_code, {"final_url": str(r.url)}) + return r.status_code, {"final_url": str(r.url)} + + # ==================== 自动注册主流程 ==================== + + def run_register(self, email, password, name, birthdate, mail_token): + """使用 DuckMail 的注册流程""" + self.visit_homepage() + _random_delay(0.3, 0.8) + csrf = self.get_csrf() + _random_delay(0.2, 0.5) + auth_url = self.signin(email, csrf) + _random_delay(0.3, 0.8) + + final_url = self.authorize(auth_url) + final_path = urlparse(final_url).path + _random_delay(0.3, 0.8) + + self._print(f"Authorize → {final_path}") + + need_otp = False + + if "create-account/password" in final_path: + self._print("全新注册流程") + _random_delay(0.5, 1.0) + status, data = self.register(email, password) + if status != 200: + raise Exception(f"Register 失败 ({status}): {data}") + # register 之后可能还需要 send_otp(全新注册流程中 OTP 不一定在 authorize 时发送) + _random_delay(0.3, 0.8) + self.send_otp() + need_otp = True + elif "email-verification" in final_path or "email-otp" in final_path: + self._print("跳到 OTP 验证阶段 (authorize 已触发 OTP,不再重复发送)") + # 不调用 send_otp(),因为 authorize 重定向到 email-verification 时服务器已发送 OTP + need_otp = True + elif "about-you" in final_path: + self._print("跳到填写信息阶段") + _random_delay(0.5, 1.0) + self.create_account(name, birthdate) + _random_delay(0.3, 0.5) + self.callback() + return True + elif "callback" in final_path or "chatgpt.com" in final_url: + self._print("账号已完成注册") + return True + else: + self._print(f"未知跳转: {final_url}") + self.register(email, password) + self.send_otp() + need_otp = True + + if need_otp: + # 使用 DuckMail 等待验证码 + otp_code = self.wait_for_verification_email(mail_token) + if not otp_code: + raise Exception("未能获取验证码") + + _random_delay(0.3, 0.8) + status, data = self.validate_otp(otp_code) + if status != 200: + self._print("验证码失败,重试...") + self.send_otp() + _random_delay(1.0, 2.0) + otp_code = self.wait_for_verification_email(mail_token, timeout=60) + if not otp_code: + raise Exception("重试后仍未获取验证码") + _random_delay(0.3, 0.8) + status, data = self.validate_otp(otp_code) + if status != 200: + raise Exception(f"验证码失败 ({status}): {data}") + + _random_delay(0.5, 1.5) + status, data = self.create_account(name, birthdate) + if status != 200: + raise Exception(f"Create account 失败 ({status}): {data}") + _random_delay(0.2, 0.5) + self.callback() + return True + + def _decode_oauth_session_cookie(self): + jar = getattr(self.session.cookies, "jar", None) + if jar is not None: + cookie_items = list(jar) + else: + cookie_items = [] + + for c in cookie_items: + name = getattr(c, "name", "") or "" + if "oai-client-auth-session" not in name: + continue + + raw_val = (getattr(c, "value", "") or "").strip() + if not raw_val: + continue + + candidates = [raw_val] + try: + from urllib.parse import unquote + + decoded = unquote(raw_val) + if decoded != raw_val: + candidates.append(decoded) + except Exception: + pass + + for val in candidates: + try: + if (val.startswith('"') and val.endswith('"')) or (val.startswith("'") and val.endswith("'")): + val = val[1:-1] + + part = val.split(".")[0] if "." in val else val + pad = 4 - len(part) % 4 + if pad != 4: + part += "=" * pad + raw = base64.urlsafe_b64decode(part) + data = json.loads(raw.decode("utf-8")) + if isinstance(data, dict): + return data + except Exception: + continue + return None + + def _oauth_allow_redirect_extract_code(self, url: str, referer: str = None): + headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Upgrade-Insecure-Requests": "1", + "User-Agent": self.ua, + } + if referer: + headers["Referer"] = referer + + try: + resp = self.session.get( + url, + headers=headers, + allow_redirects=True, + timeout=30, + impersonate=self.impersonate, + ) + final_url = str(resp.url) + code = _extract_code_from_url(final_url) + if code: + self._print("[OAuth] allow_redirect 命中最终 URL code") + return code + + for r in getattr(resp, "history", []) or []: + loc = r.headers.get("Location", "") + code = _extract_code_from_url(loc) + if code: + self._print("[OAuth] allow_redirect 命中 history Location code") + return code + code = _extract_code_from_url(str(r.url)) + if code: + self._print("[OAuth] allow_redirect 命中 history URL code") + return code + except Exception as e: + maybe_localhost = re.search(r'(https?://localhost[^\s\'\"]+)', str(e)) + if maybe_localhost: + code = _extract_code_from_url(maybe_localhost.group(1)) + if code: + self._print("[OAuth] allow_redirect 从 localhost 异常提取 code") + return code + self._print(f"[OAuth] allow_redirect 异常: {e}") + + return None + + def _oauth_follow_for_code(self, start_url: str, referer: str = None, max_hops: int = 16): + headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Upgrade-Insecure-Requests": "1", + "User-Agent": self.ua, + } + if referer: + headers["Referer"] = referer + + current_url = start_url + last_url = start_url + + for hop in range(max_hops): + try: + resp = self.session.get( + current_url, + headers=headers, + allow_redirects=False, + timeout=30, + impersonate=self.impersonate, + ) + except Exception as e: + maybe_localhost = re.search(r'(https?://localhost[^\s\'\"]+)', str(e)) + if maybe_localhost: + code = _extract_code_from_url(maybe_localhost.group(1)) + if code: + self._print(f"[OAuth] follow[{hop + 1}] 命中 localhost 回调") + return code, maybe_localhost.group(1) + self._print(f"[OAuth] follow[{hop + 1}] 请求异常: {e}") + return None, last_url + + last_url = str(resp.url) + self._print(f"[OAuth] follow[{hop + 1}] {resp.status_code} {last_url[:140]}") + code = _extract_code_from_url(last_url) + if code: + return code, last_url + + if resp.status_code in (301, 302, 303, 307, 308): + loc = resp.headers.get("Location", "") + if not loc: + return None, last_url + if loc.startswith("/"): + loc = f"{OAUTH_ISSUER}{loc}" + code = _extract_code_from_url(loc) + if code: + return code, loc + current_url = loc + headers["Referer"] = last_url + continue + + return None, last_url + + return None, last_url + + def _oauth_submit_workspace_and_org(self, consent_url: str): + session_data = self._decode_oauth_session_cookie() + if not session_data: + jar = getattr(self.session.cookies, "jar", None) + if jar is not None: + cookie_names = [getattr(c, "name", "") for c in list(jar)] + else: + cookie_names = list(self.session.cookies.keys()) + self._print(f"[OAuth] 无法解码 oai-client-auth-session, cookies={cookie_names[:12]}") + return None + + workspaces = session_data.get("workspaces", []) + if not workspaces: + self._print("[OAuth] session 中没有 workspace 信息") + return None + + workspace_id = (workspaces[0] or {}).get("id") + if not workspace_id: + self._print("[OAuth] workspace_id 为空") + return None + + h = { + "Accept": "application/json", + "Content-Type": "application/json", + "Origin": OAUTH_ISSUER, + "Referer": consent_url, + "User-Agent": self.ua, + "oai-device-id": self.device_id, + } + h.update(_make_trace_headers()) + + resp = self.session.post( + f"{OAUTH_ISSUER}/api/accounts/workspace/select", + json={"workspace_id": workspace_id}, + headers=h, + allow_redirects=False, + timeout=30, + impersonate=self.impersonate, + ) + self._print(f"[OAuth] workspace/select -> {resp.status_code}") + + if resp.status_code in (301, 302, 303, 307, 308): + loc = resp.headers.get("Location", "") + if loc.startswith("/"): + loc = f"{OAUTH_ISSUER}{loc}" + code = _extract_code_from_url(loc) + if code: + return code + code, _ = self._oauth_follow_for_code(loc, referer=consent_url) + if not code: + code = self._oauth_allow_redirect_extract_code(loc, referer=consent_url) + return code + + if resp.status_code != 200: + self._print(f"[OAuth] workspace/select 失败: {resp.status_code}") + return None + + try: + ws_data = resp.json() + except Exception: + self._print("[OAuth] workspace/select 响应不是 JSON") + return None + + ws_next = ws_data.get("continue_url", "") + orgs = ws_data.get("data", {}).get("orgs", []) + ws_page = (ws_data.get("page") or {}).get("type", "") + self._print(f"[OAuth] workspace/select page={ws_page or '-'} next={(ws_next or '-')[:140]}") + + org_id = None + project_id = None + if orgs: + org_id = (orgs[0] or {}).get("id") + projects = (orgs[0] or {}).get("projects", []) + if projects: + project_id = (projects[0] or {}).get("id") + + if org_id: + org_body = {"org_id": org_id} + if project_id: + org_body["project_id"] = project_id + + h_org = dict(h) + if ws_next: + h_org["Referer"] = ws_next if ws_next.startswith("http") else f"{OAUTH_ISSUER}{ws_next}" + + resp_org = self.session.post( + f"{OAUTH_ISSUER}/api/accounts/organization/select", + json=org_body, + headers=h_org, + allow_redirects=False, + timeout=30, + impersonate=self.impersonate, + ) + self._print(f"[OAuth] organization/select -> {resp_org.status_code}") + if resp_org.status_code in (301, 302, 303, 307, 308): + loc = resp_org.headers.get("Location", "") + if loc.startswith("/"): + loc = f"{OAUTH_ISSUER}{loc}" + code = _extract_code_from_url(loc) + if code: + return code + code, _ = self._oauth_follow_for_code(loc, referer=h_org.get("Referer")) + if not code: + code = self._oauth_allow_redirect_extract_code(loc, referer=h_org.get("Referer")) + return code + + if resp_org.status_code == 200: + try: + org_data = resp_org.json() + except Exception: + self._print("[OAuth] organization/select 响应不是 JSON") + return None + + org_next = org_data.get("continue_url", "") + org_page = (org_data.get("page") or {}).get("type", "") + self._print(f"[OAuth] organization/select page={org_page or '-'} next={(org_next or '-')[:140]}") + if org_next: + if org_next.startswith("/"): + org_next = f"{OAUTH_ISSUER}{org_next}" + code, _ = self._oauth_follow_for_code(org_next, referer=h_org.get("Referer")) + if not code: + code = self._oauth_allow_redirect_extract_code(org_next, referer=h_org.get("Referer")) + return code + + if ws_next: + if ws_next.startswith("/"): + ws_next = f"{OAUTH_ISSUER}{ws_next}" + code, _ = self._oauth_follow_for_code(ws_next, referer=consent_url) + if not code: + code = self._oauth_allow_redirect_extract_code(ws_next, referer=consent_url) + return code + + return None + + def perform_codex_oauth_login_http(self, email: str, password: str, mail_token: str = None): + self._print("[OAuth] 开始执行 Codex OAuth 纯协议流程...") + + # 兼容两种 domain 形式,确保 auth 域也带 oai-did + self.session.cookies.set("oai-did", self.device_id, domain=".auth.openai.com") + self.session.cookies.set("oai-did", self.device_id, domain="auth.openai.com") + + code_verifier, code_challenge = _generate_pkce() + state = secrets.token_urlsafe(24) + + authorize_params = { + "response_type": "code", + "client_id": OAUTH_CLIENT_ID, + "redirect_uri": OAUTH_REDIRECT_URI, + "scope": "openid profile email offline_access", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "state": state, + } + authorize_url = f"{OAUTH_ISSUER}/oauth/authorize?{urlencode(authorize_params)}" + + def _oauth_json_headers(referer: str): + h = { + "Accept": "application/json", + "Content-Type": "application/json", + "Origin": OAUTH_ISSUER, + "Referer": referer, + "User-Agent": self.ua, + "oai-device-id": self.device_id, + } + h.update(_make_trace_headers()) + return h + + def _bootstrap_oauth_session(): + self._print("[OAuth] 1/7 GET /oauth/authorize") + try: + r = self.session.get( + authorize_url, + headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Referer": f"{self.BASE}/", + "Upgrade-Insecure-Requests": "1", + "User-Agent": self.ua, + }, + allow_redirects=True, + timeout=30, + impersonate=self.impersonate, + ) + except Exception as e: + self._print(f"[OAuth] /oauth/authorize 异常: {e}") + return False, "" + + final_url = str(r.url) + redirects = len(getattr(r, "history", []) or []) + self._print(f"[OAuth] /oauth/authorize -> {r.status_code}, final={(final_url or '-')[:140]}, redirects={redirects}") + + has_login = any(getattr(c, "name", "") == "login_session" for c in self.session.cookies) + self._print(f"[OAuth] login_session: {'已获取' if has_login else '未获取'}") + + if not has_login: + self._print("[OAuth] 未拿到 login_session,尝试访问 oauth2 auth 入口") + oauth2_url = f"{OAUTH_ISSUER}/api/oauth/oauth2/auth" + try: + r2 = self.session.get( + oauth2_url, + headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Referer": authorize_url, + "Upgrade-Insecure-Requests": "1", + "User-Agent": self.ua, + }, + params=authorize_params, + allow_redirects=True, + timeout=30, + impersonate=self.impersonate, + ) + final_url = str(r2.url) + redirects2 = len(getattr(r2, "history", []) or []) + self._print(f"[OAuth] /api/oauth/oauth2/auth -> {r2.status_code}, final={(final_url or '-')[:140]}, redirects={redirects2}") + except Exception as e: + self._print(f"[OAuth] /api/oauth/oauth2/auth 异常: {e}") + + has_login = any(getattr(c, "name", "") == "login_session" for c in self.session.cookies) + self._print(f"[OAuth] login_session(重试): {'已获取' if has_login else '未获取'}") + + return has_login, final_url + + def _post_authorize_continue(referer_url: str): + sentinel_authorize = build_sentinel_token( + self.session, + self.device_id, + flow="authorize_continue", + user_agent=self.ua, + sec_ch_ua=self.sec_ch_ua, + impersonate=self.impersonate, + ) + if not sentinel_authorize: + self._print("[OAuth] authorize_continue 的 sentinel token 获取失败") + return None + + headers_continue = _oauth_json_headers(referer_url) + headers_continue["openai-sentinel-token"] = sentinel_authorize + + try: + return self.session.post( + f"{OAUTH_ISSUER}/api/accounts/authorize/continue", + json={"username": {"kind": "email", "value": email}}, + headers=headers_continue, + timeout=30, + allow_redirects=False, + impersonate=self.impersonate, + ) + except Exception as e: + self._print(f"[OAuth] authorize/continue 异常: {e}") + return None + + has_login_session, authorize_final_url = _bootstrap_oauth_session() + if not authorize_final_url: + return None + + continue_referer = authorize_final_url if authorize_final_url.startswith(OAUTH_ISSUER) else f"{OAUTH_ISSUER}/log-in" + + self._print("[OAuth] 2/7 POST /api/accounts/authorize/continue") + resp_continue = _post_authorize_continue(continue_referer) + if resp_continue is None: + return None + + self._print(f"[OAuth] /authorize/continue -> {resp_continue.status_code}") + if resp_continue.status_code == 400 and "invalid_auth_step" in (resp_continue.text or ""): + self._print("[OAuth] invalid_auth_step,重新 bootstrap 后重试一次") + has_login_session, authorize_final_url = _bootstrap_oauth_session() + if not authorize_final_url: + return None + continue_referer = authorize_final_url if authorize_final_url.startswith(OAUTH_ISSUER) else f"{OAUTH_ISSUER}/log-in" + resp_continue = _post_authorize_continue(continue_referer) + if resp_continue is None: + return None + self._print(f"[OAuth] /authorize/continue(重试) -> {resp_continue.status_code}") + + if resp_continue.status_code != 200: + self._print(f"[OAuth] 邮箱提交失败: {resp_continue.text[:180]}") + return None + + try: + continue_data = resp_continue.json() + except Exception: + self._print("[OAuth] authorize/continue 响应解析失败") + return None + + continue_url = continue_data.get("continue_url", "") + page_type = (continue_data.get("page") or {}).get("type", "") + self._print(f"[OAuth] continue page={page_type or '-'} next={(continue_url or '-')[:140]}") + + self._print("[OAuth] 3/7 POST /api/accounts/password/verify") + sentinel_pwd = build_sentinel_token( + self.session, + self.device_id, + flow="password_verify", + user_agent=self.ua, + sec_ch_ua=self.sec_ch_ua, + impersonate=self.impersonate, + ) + if not sentinel_pwd: + self._print("[OAuth] password_verify 的 sentinel token 获取失败") + return None + + headers_verify = _oauth_json_headers(f"{OAUTH_ISSUER}/log-in/password") + headers_verify["openai-sentinel-token"] = sentinel_pwd + + try: + resp_verify = self.session.post( + f"{OAUTH_ISSUER}/api/accounts/password/verify", + json={"password": password}, + headers=headers_verify, + timeout=30, + allow_redirects=False, + impersonate=self.impersonate, + ) + except Exception as e: + self._print(f"[OAuth] password/verify 异常: {e}") + return None + + self._print(f"[OAuth] /password/verify -> {resp_verify.status_code}") + if resp_verify.status_code != 200: + self._print(f"[OAuth] 密码校验失败: {resp_verify.text[:180]}") + return None + + try: + verify_data = resp_verify.json() + except Exception: + self._print("[OAuth] password/verify 响应解析失败") + return None + + continue_url = verify_data.get("continue_url", "") or continue_url + page_type = (verify_data.get("page") or {}).get("type", "") or page_type + self._print(f"[OAuth] verify page={page_type or '-'} next={(continue_url or '-')[:140]}") + + need_oauth_otp = ( + page_type == "email_otp_verification" + or "email-verification" in (continue_url or "") + or "email-otp" in (continue_url or "") + ) + + if need_oauth_otp: + self._print("[OAuth] 4/7 检测到邮箱 OTP 验证") + if not mail_token: + self._print("[OAuth] OAuth 阶段需要邮箱 OTP,但未提供 mail_token") + return None + + headers_otp = _oauth_json_headers(f"{OAUTH_ISSUER}/email-verification") + tried_codes = set() + otp_success = False + otp_deadline = time.time() + 120 + + while time.time() < otp_deadline and not otp_success: + messages = self._fetch_emails_duckmail(mail_token) or [] + candidate_codes = [] + + for msg in messages[:12]: + msg_id = msg.get("id") or msg.get("@id") + if not msg_id: + continue + detail = self._fetch_email_detail_duckmail(mail_token, msg_id) + if not detail: + continue + content = detail.get("text") or detail.get("html") or "" + code = self._extract_verification_code(content) + if code and code not in tried_codes: + candidate_codes.append(code) + + if not candidate_codes: + elapsed = int(120 - max(0, otp_deadline - time.time())) + self._print(f"[OAuth] OTP 等待中... ({elapsed}s/120s)") + time.sleep(2) + continue + + for otp_code in candidate_codes: + tried_codes.add(otp_code) + self._print(f"[OAuth] 尝试 OTP: {otp_code}") + try: + resp_otp = self.session.post( + f"{OAUTH_ISSUER}/api/accounts/email-otp/validate", + json={"code": otp_code}, + headers=headers_otp, + timeout=30, + allow_redirects=False, + impersonate=self.impersonate, + ) + except Exception as e: + self._print(f"[OAuth] email-otp/validate 异常: {e}") + continue + + self._print(f"[OAuth] /email-otp/validate -> {resp_otp.status_code}") + if resp_otp.status_code != 200: + self._print(f"[OAuth] OTP 无效,继续尝试下一条: {resp_otp.text[:160]}") + continue + + try: + otp_data = resp_otp.json() + except Exception: + self._print("[OAuth] email-otp/validate 响应解析失败") + continue + + continue_url = otp_data.get("continue_url", "") or continue_url + page_type = (otp_data.get("page") or {}).get("type", "") or page_type + self._print(f"[OAuth] OTP 验证通过 page={page_type or '-'} next={(continue_url or '-')[:140]}") + otp_success = True + break + + if not otp_success: + time.sleep(2) + + if not otp_success: + self._print(f"[OAuth] OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码") + return None + + code = None + consent_url = continue_url + if consent_url and consent_url.startswith("/"): + consent_url = f"{OAUTH_ISSUER}{consent_url}" + + if not consent_url and "consent" in page_type: + consent_url = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent" + + if consent_url: + code = _extract_code_from_url(consent_url) + + if not code and consent_url: + self._print("[OAuth] 5/7 跟随 continue_url 提取 code") + code, _ = self._oauth_follow_for_code(consent_url, referer=f"{OAUTH_ISSUER}/log-in/password") + + consent_hint = ( + ("consent" in (consent_url or "")) + or ("sign-in-with-chatgpt" in (consent_url or "")) + or ("workspace" in (consent_url or "")) + or ("organization" in (consent_url or "")) + or ("consent" in page_type) + or ("organization" in page_type) + ) + + if not code and consent_hint: + if not consent_url: + consent_url = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent" + self._print("[OAuth] 6/7 执行 workspace/org 选择") + code = self._oauth_submit_workspace_and_org(consent_url) + + if not code: + fallback_consent = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent" + self._print("[OAuth] 6/7 回退 consent 路径重试") + code = self._oauth_submit_workspace_and_org(fallback_consent) + if not code: + code, _ = self._oauth_follow_for_code(fallback_consent, referer=f"{OAUTH_ISSUER}/log-in/password") + + if not code: + self._print("[OAuth] 未获取到 authorization code") + return None + + self._print("[OAuth] 7/7 POST /oauth/token") + token_resp = self.session.post( + f"{OAUTH_ISSUER}/oauth/token", + headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": self.ua}, + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": OAUTH_REDIRECT_URI, + "client_id": OAUTH_CLIENT_ID, + "code_verifier": code_verifier, + }, + timeout=60, + impersonate=self.impersonate, + ) + self._print(f"[OAuth] /oauth/token -> {token_resp.status_code}") + + if token_resp.status_code != 200: + self._print(f"[OAuth] token 交换失败: {token_resp.status_code} {token_resp.text[:200]}") + return None + + try: + data = token_resp.json() + except Exception: + self._print("[OAuth] token 响应解析失败") + return None + + if not data.get("access_token"): + self._print("[OAuth] token 响应缺少 access_token") + return None + + self._print("[OAuth] Codex Token 获取成功") + return data + + +# ==================== 并发批量注册 ==================== + +def _register_one(idx, total, proxy, output_file): + """单个注册任务 (在线程中运行) - 使用 DuckMail 临时邮箱""" + reg = None + try: + reg = ChatGPTRegister(proxy=proxy, tag=f"{idx}") + + # 1. 创建 DuckMail 临时邮箱 + reg._print("[DuckMail] 创建临时邮箱...") + email, email_pwd, mail_token = reg.create_temp_email() + tag = email.split("@")[0] + reg.tag = tag # 更新 tag + + chatgpt_password = _generate_password() + name = _random_name() + birthdate = _random_birthdate() + + with _print_lock: + print(f"\n{'='*60}") + print(f" [{idx}/{total}] 注册: {email}") + print(f" ChatGPT密码: {chatgpt_password}") + print(f" 邮箱密码: {email_pwd}") + print(f" 姓名: {name} | 生日: {birthdate}") + print(f"{'='*60}") + + # 2. 执行注册流程 + reg.run_register(email, chatgpt_password, name, birthdate, mail_token) + + # 3. OAuth(可选) + oauth_ok = True + if ENABLE_OAUTH: + reg._print("[OAuth] 开始获取 Codex Token...") + tokens = reg.perform_codex_oauth_login_http(email, chatgpt_password, mail_token=mail_token) + oauth_ok = bool(tokens and tokens.get("access_token")) + if oauth_ok: + _save_codex_tokens(email, tokens) + reg._print("[OAuth] Token 已保存") + else: + msg = "OAuth 获取失败" + if OAUTH_REQUIRED: + raise Exception(f"{msg}(oauth_required=true)") + reg._print(f"[OAuth] {msg}(按配置继续)") + + # 4. 线程安全写入结果 + with _file_lock: + with open(output_file, "a", encoding="utf-8") as out: + out.write(f"{email}----{chatgpt_password}----{email_pwd}----oauth={'ok' if oauth_ok else 'fail'}\n") + + with _print_lock: + print(f"\n[OK] [{tag}] {email} 注册成功!") + return True, email, None + + except Exception as e: + error_msg = str(e) + with _print_lock: + print(f"\n[FAIL] [{idx}] 注册失败: {error_msg}") + traceback.print_exc() + return False, None, error_msg + + +def run_batch(total_accounts: int = 3, output_file="registered_accounts.txt", + max_workers=3, proxy=None): + """并发批量注册 - DuckMail 临时邮箱版""" + + if not DUCKMAIL_BEARER: + print("❌ 错误: 未设置 DUCKMAIL_BEARER 环境变量") + print(" 请设置: export DUCKMAIL_BEARER='your_api_key_here'") + print(" 或: set DUCKMAIL_BEARER=your_api_key_here (Windows)") + return + + actual_workers = min(max_workers, total_accounts) + print(f"\n{'#'*60}") + print(f" ChatGPT 批量自动注册 (DuckMail 临时邮箱版)") + print(f" 注册数量: {total_accounts} | 并发数: {actual_workers}") + print(f" DuckMail: {DUCKMAIL_API_BASE}") + print(f" OAuth: {'开启' if ENABLE_OAUTH else '关闭'} | required: {'是' if OAUTH_REQUIRED else '否'}") + if ENABLE_OAUTH: + print(f" OAuth Issuer: {OAUTH_ISSUER}") + print(f" OAuth Client: {OAUTH_CLIENT_ID}") + print(f" Token输出: {TOKEN_JSON_DIR}/, {AK_FILE}, {RK_FILE}") + print(f" 输出文件: {output_file}") + print(f"{'#'*60}\n") + + success_count = 0 + fail_count = 0 + start_time = time.time() + + with ThreadPoolExecutor(max_workers=actual_workers) as executor: + futures = {} + for idx in range(1, total_accounts + 1): + future = executor.submit( + _register_one, idx, total_accounts, proxy, output_file + ) + futures[future] = idx + + for future in as_completed(futures): + idx = futures[future] + try: + ok, email, err = future.result() + if ok: + success_count += 1 + else: + fail_count += 1 + print(f" [账号 {idx}] 失败: {err}") + except Exception as e: + fail_count += 1 + with _print_lock: + print(f"[FAIL] 账号 {idx} 线程异常: {e}") + + elapsed = time.time() - start_time + avg = elapsed / total_accounts if total_accounts else 0 + print(f"\n{'#'*60}") + print(f" 注册完成! 耗时 {elapsed:.1f} 秒") + print(f" 总数: {total_accounts} | 成功: {success_count} | 失败: {fail_count}") + print(f" 平均速度: {avg:.1f} 秒/个") + if success_count > 0: + print(f" 结果文件: {output_file}") + print(f"{'#'*60}") + + +def main(): + print("=" * 60) + print(" ChatGPT 批量自动注册工具 (DuckMail 临时邮箱版)") + print("=" * 60) + + # 检查 DuckMail 配置 + if not DUCKMAIL_BEARER: + print("\n⚠️ 警告: 未设置 DUCKMAIL_BEARER") + print(" 请编辑 config.json 设置 duckmail_bearer,或设置环境变量:") + print(" Windows: set DUCKMAIL_BEARER=your_api_key_here") + print(" Linux/Mac: export DUCKMAIL_BEARER='your_api_key_here'") + print("\n 按 Enter 继续尝试运行 (可能会失败)...") + input() + + # 交互式代理配置 + proxy = DEFAULT_PROXY + if proxy: + print(f"[Info] 检测到默认代理: {proxy}") + use_default = input("使用此代理? (Y/n): ").strip().lower() + if use_default == "n": + proxy = input("输入代理地址 (留空=不使用代理): ").strip() or None + else: + env_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") \ + or os.environ.get("ALL_PROXY") or os.environ.get("all_proxy") + if env_proxy: + print(f"[Info] 检测到环境变量代理: {env_proxy}") + use_env = input("使用此代理? (Y/n): ").strip().lower() + if use_env == "n": + proxy = input("输入代理地址 (留空=不使用代理): ").strip() or None + else: + proxy = env_proxy + else: + proxy = input("输入代理地址 (如 http://127.0.0.1:7890,留空=不使用代理): ").strip() or None + + if proxy: + print(f"[Info] 使用代理: {proxy}") + else: + print("[Info] 不使用代理") + + # 输入注册数量 + count_input = input(f"\n注册账号数量 (默认 {DEFAULT_TOTAL_ACCOUNTS}): ").strip() + total_accounts = int(count_input) if count_input.isdigit() and int(count_input) > 0 else DEFAULT_TOTAL_ACCOUNTS + + workers_input = input("并发数 (默认 3): ").strip() + max_workers = int(workers_input) if workers_input.isdigit() and int(workers_input) > 0 else 3 + + run_batch(total_accounts=total_accounts, output_file=DEFAULT_OUTPUT_FILE, + max_workers=max_workers, proxy=proxy) + + +if __name__ == "__main__": + main() diff --git a/codex/README.md b/codex/README.md new file mode 100644 index 0000000..710e545 --- /dev/null +++ b/codex/README.md @@ -0,0 +1,60 @@ +# Codex 协议密钥生成工具 + +> 为 ChatGPT 注册生成 Codex 协议所需的 Access Key 和 Refresh Key + +## 功能 + +- 🔑 自动生成 Access Key (ak) +- 🔄 自动生成 Refresh Key (rk) +- 📤 支持上传到 Codex / CPA 面板 +- ⚡ 支持并发生成 + +## 配置 (config.json) + +```json +{ + "total_accounts": 800, + "concurrent_workers": 8, + "headless": false, + "proxy": "http://127.0.0.1:7890", + "cf_worker_domain": "你的 Cloudflare Worker 域名", + "cf_email_domain": "你的 Cloudflare 邮箱域名", + "cf_admin_password": "你的 Cloudflare 管理密码", + "upload_api_url": "https://你的CPA地址/v0/management/auth-files", + "upload_api_token": "你的CPA密码", + "cli_proxy_api_base": "你的CPA基础URL", + "cli_proxy_management_url": "http://你的CPA地址/management.html#/oauth", + "cli_proxy_password": "你的CPA密码" +} +``` + +| 配置项 | 说明 | +|--------|------| +| total_accounts | 生成账号数量 | +| concurrent_workers | 并发数 | +| proxy | 代理地址 | +| cf_worker_domain | Cloudflare Worker 域名 | +| upload_api_url | CPA 上传 API | +| cli_proxy_api_base | CPA CLI 代理 API | + +## 使用 + +```bash +python protocol_keygen.py +``` + +## 输出 + +- `ak.txt` - Access Keys +- `rk.txt` - Refresh Keys +- `registered_accounts.csv` - CSV 格式账号 + +## 接入 CPA 面板 + +生成后可以自动上传到 CPA 面板: + +1. 部署 CPA 面板: https://github.com/dongshuyan/CPA-Dashboard +2. 配置 `upload_api_url` 和 `upload_api_token` +3. 运行后自动上传 + +> 文档: https://help.router-for.me/cn/ diff --git a/codex/config.json b/codex/config.json new file mode 100644 index 0000000..a9c0ad9 --- /dev/null +++ b/codex/config.json @@ -0,0 +1,21 @@ +{ + "total_accounts": 800, + "concurrent_workers": 8, + "headless": false, + "proxy": "", + "cf_worker_domain": "", + "cf_email_domain": "", + "cf_admin_password": "", + "oauth_issuer": "https://auth.openai.com", + "oauth_client_id": "app_EMoamEEZ73f0CkXaXp7hrann", + "oauth_redirect_uri": "http://localhost:1455/auth/callback", + "upload_api_url": "https://你的CPA地址/v0/management/auth-files", + "upload_api_token": "你的CPA密码", + "cli_proxy_api_base": "你的CPA基础URL", + "cli_proxy_management_url": "http://你的CPA地址/management.html#/oauth", + "cli_proxy_password": "你的CPA密码", + "accounts_file": "accounts.txt", + "csv_file": "registered_accounts.csv", + "ak_file": "ak.txt", + "rk_file": "rk.txt" +} \ No newline at end of file diff --git a/codex/protocol_keygen.py b/codex/protocol_keygen.py new file mode 100644 index 0000000..1d9acc0 --- /dev/null +++ b/codex/protocol_keygen.py @@ -0,0 +1,2277 @@ +""" +OpenAI 协议注册机 (Protocol Keygen) v5 — 全流程纯 HTTP 实现 +======================================================== +协议注册机实现 + +核心架构(全流程纯 HTTP,零浏览器依赖): + + 【注册流程】全步骤纯 HTTP: + 步骤0:GET /oauth/authorize → 获取 login_session cookie(PKCE + screen_hint=signup) + 步骤0:POST /api/accounts/authorize/continue → 提交邮箱(需 sentinel token) + 步骤2:POST /api/accounts/user/register → 注册用户(username+password,需 sentinel) + 步骤3:GET /api/accounts/email-otp/send → 触发验证码发送 + 步骤4:POST /api/accounts/email-otp/validate → 提交邮箱验证码 + 步骤5:POST /api/accounts/create_account → 提交姓名+生日完成注册 + + 【OAuth 登录流程】纯 HTTP(perform_codex_oauth_login_http): + 步骤1:GET /oauth/authorize → 获取 login_session + 步骤2:POST /api/accounts/authorize/continue → 提交邮箱 + 步骤3:POST /api/accounts/password/verify → 提交密码 + 步骤4:consent 多步流程 → 提取 code → POST /oauth/token 换取 tokens + + Sentinel Token PoW 生成(纯 Python,逆向 SDK JS 的 PoW 算法): + - FNV-1a 哈希 + xorshift 混合 + - 伪造浏览器环境数据数组 + - 暴力搜索直到哈希前缀 ≤ 难度阈值 + - t 字段传空字符串(服务端不校验),c 字段从 sentinel API 实时获取 + +关键协议字段(逆向还原): + - oai-client-auth-session: OAuth 流程中由服务端 Set-Cookie 设置的会话 cookie + - openai-sentinel-token: JSON 对象 {p, t, c, id, flow} + - Cookie 链式传递: 每步 Set-Cookie 自动累积 + - oai-did: 设备唯一标识(UUID v4) + +环境依赖: + pip install requests +""" + +import json +import os +import re +import sys +import time +import uuid +import math +import random +import string +import secrets +import hashlib +import base64 +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone, timedelta +from urllib.parse import urlparse, parse_qs, urlencode, quote + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +# =================== 配置加载 =================== + +def load_config(): + """加载外部配置文件""" + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") + if not os.path.exists(config_path): + raise FileNotFoundError(f"config.json 未找到: {config_path}") + with open(config_path, "r", encoding="utf-8") as f: + return json.load(f) + + +_config = load_config() + +# 基础配置 +TOTAL_ACCOUNTS = _config.get("total_accounts", 30) +CONCURRENT_WORKERS = _config.get("concurrent_workers", 1) # 并发数(默认串行) +HEADLESS = _config.get("headless", False) # 是否无头模式运行浏览器 +PROXY = _config.get("proxy", "") + +# 邮箱配置 +CF_WORKER_DOMAIN = _config.get("cf_worker_domain", "email.tuxixilax.cfd") +CF_EMAIL_DOMAIN = _config.get("cf_email_domain", "tuxixilax.cfd") +CF_ADMIN_PASSWORD = _config.get("cf_admin_password", "") + +# OAuth 配置 +OAUTH_ISSUER = _config.get("oauth_issuer", "https://auth.openai.com") +OAUTH_CLIENT_ID = _config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann") +OAUTH_REDIRECT_URI = _config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback") + +# 上传配置 +UPLOAD_API_URL = _config.get("upload_api_url", "") +UPLOAD_API_TOKEN = _config.get("upload_api_token", "") + +# 输出文件 +ACCOUNTS_FILE = _config.get("accounts_file", "accounts.txt") +CSV_FILE = _config.get("csv_file", "registered_accounts.csv") +AK_FILE = _config.get("ak_file", "ak.txt") +RK_FILE = _config.get("rk_file", "rk.txt") + +# 并发文件写入锁(多线程共享文件时防止数据竞争) +_file_lock = threading.Lock() + +# OpenAI 认证域名 +OPENAI_AUTH_BASE = "https://auth.openai.com" + +# ChatGPT 域名(用于 OAuth 登录获取 Token) +CHATGPT_BASE = "https://chatgpt.com" + + +# =================== HTTP 会话管理 =================== + +def create_session(): + """创建带重试策略的 HTTP 会话""" + session = requests.Session() + retry = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + if PROXY: + session.proxies = {"http": PROXY, "https": PROXY} + return session + + +# 使用普通 session(全流程纯 HTTP,无需浏览器) + + +# =================== 工具函数 =================== + +# 浏览器 UA(需与 sec-ch-ua 版本一致) +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.0.0 Safari/537.36" +) + +# API 请求头模板(从 cURL 逆向提取) +COMMON_HEADERS = { + "accept": "application/json", + "accept-language": "en-US,en;q=0.9", + "content-type": "application/json", + "origin": OPENAI_AUTH_BASE, + "user-agent": USER_AGENT, + "sec-ch-ua": '"Google Chrome";v="145", "Not?A_Brand";v="8", "Chromium";v="145"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", +} + +# 页面导航请求头(用于 GET 类请求) +NAVIGATE_HEADERS = { + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", + "accept-language": "en-US,en;q=0.9", + "user-agent": USER_AGENT, + "sec-ch-ua": '"Google Chrome";v="145", "Not?A_Brand";v="8", "Chromium";v="145"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', + "sec-fetch-dest": "document", + "sec-fetch-mode": "navigate", + "sec-fetch-site": "same-origin", + "sec-fetch-user": "?1", + "upgrade-insecure-requests": "1", +} + + +def generate_device_id(): + """生成设备唯一标识(oai-did),UUID v4 格式""" + return str(uuid.uuid4()) + + +def generate_random_password(length=16): + """生成符合 OpenAI 要求的随机密码""" + chars = string.ascii_letters + string.digits + "!@#$%" + pwd = list( + random.choice(string.ascii_uppercase) + + random.choice(string.ascii_lowercase) + + random.choice(string.digits) + + random.choice("!@#$%") + + "".join(random.choice(chars) for _ in range(length - 4)) + ) + random.shuffle(pwd) + return "".join(pwd) + + +def generate_random_name(): + """随机生成自然的英文姓名""" + first = [ + "James", "Robert", "John", "Michael", "David", "William", "Richard", + "Mary", "Jennifer", "Linda", "Elizabeth", "Susan", "Jessica", "Sarah", + "Emily", "Emma", "Olivia", "Sophia", "Liam", "Noah", "Oliver", "Ethan", + ] + last = [ + "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", + "Davis", "Wilson", "Anderson", "Thomas", "Taylor", "Moore", "Martin", + ] + return random.choice(first), random.choice(last) + + +def generate_random_birthday(): + """生成随机生日字符串,格式 YYYY-MM-DD(20~30岁)""" + year = random.randint(1996, 2006) + month = random.randint(1, 12) + day = random.randint(1, 28) + return f"{year:04d}-{month:02d}-{day:02d}" + + +def generate_datadog_trace(): + """生成 Datadog APM 追踪头(从 cURL 中逆向提取的格式)""" + trace_id = str(random.getrandbits(64)) + parent_id = str(random.getrandbits(64)) + trace_hex = format(int(trace_id), '016x') + parent_hex = format(int(parent_id), '016x') + return { + "traceparent": f"00-0000000000000000{trace_hex}-{parent_hex}-01", + "tracestate": "dd=s:1;o:rum", + "x-datadog-origin": "rum", + "x-datadog-parent-id": parent_id, + "x-datadog-sampling-priority": "1", + "x-datadog-trace-id": trace_id, + } + + +def generate_pkce(): + """生成 PKCE code_verifier 和 code_challenge""" + code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(64)).rstrip(b"=").decode("ascii") + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + return code_verifier, code_challenge + + +# =================== Sentinel Token 逆向生成 =================== +# +# 以下代码基于对 sentinel.openai.com 的 SDK JS 代码的逆向分析: +# https://sentinel.openai.com/sentinel/20260124ceb8/sdk.js +# +# 核心算法: +# 1. _getConfig() → 收集浏览器环境数据(18个元素的数组) +# 2. _runCheck(startTime, seed, difficulty, config, nonce) → PoW 计算 +# a) config[3] = nonce(第4个元素设为当前尝试次数) +# b) config[9] = performance.now() - startTime(耗时) +# c) data = base64(JSON.stringify(config)) +# d) hash = fnv1a_32(seed + data) +# e) 若 hash 的 hex 前缀 ≤ difficulty → 返回 data + "~S" +# 3. 最终 token = "gAAAAAB" + answer +# +# FNV-1a 32位哈希: +# offset_basis = 2166136261 +# prime = 16777619 +# for each byte: hash ^= byte; hash = (hash * prime) >>> 0 +# 然后做 xorshift 混合 + 转 8 位 hex +# + +class SentinelTokenGenerator: + """ + Sentinel Token 纯 Python 生成器 + + 通过逆向 sentinel SDK 的 PoW 算法, + 纯 Python 构造合法的 openai-sentinel-token。 + """ + + MAX_ATTEMPTS = 500000 # 最大 PoW 尝试次数 + ERROR_PREFIX = "wQ8Lk5FbGpA2NcR9dShT6gYjU7VxZ4D" # SDK 中的错误前缀常量 + + def __init__(self, device_id=None): + self.device_id = device_id or generate_device_id() + self.requirements_seed = str(random.random()) + self.sid = str(uuid.uuid4()) + + @staticmethod + def _fnv1a_32(text): + """ + FNV-1a 32位哈希算法(从 SDK JS 逆向还原) + + 逆向来源:SDK 中的匿名函数,特征码: + e = 2166136261 (FNV offset basis) + e ^= t.charCodeAt(r) + e = Math.imul(e, 16777619) >>> 0 (FNV prime) + + 最后做 xorshift 混合(murmurhash3 风格的 finalizer): + e ^= e >>> 16 + e = Math.imul(e, 2246822507) >>> 0 + e ^= e >>> 13 + e = Math.imul(e, 3266489909) >>> 0 + e ^= e >>> 16 + """ + h = 2166136261 # FNV offset basis + for ch in text: + code = ord(ch) + h ^= code + # Math.imul(h, 16777619) >>> 0 模拟无符号32位乘法 + h = ((h * 16777619) & 0xFFFFFFFF) + + # xorshift 混合(murmurhash3 finalizer) + h ^= (h >> 16) + h = ((h * 2246822507) & 0xFFFFFFFF) + h ^= (h >> 13) + h = ((h * 3266489909) & 0xFFFFFFFF) + h ^= (h >> 16) + h = h & 0xFFFFFFFF + + # 转为8位 hex 字符串,左补零 + return format(h, '08x') + + def _get_config(self): + """ + 构造浏览器环境数据数组(_getConfig 方法逆向还原) + + SDK 中的元素对应关系(按索引): + [0] screen.width + screen.height → "1920x1080" 格式 + [1] new Date().toString() → 时间字符串 + [2] performance.memory.jsHeapSizeLimit → 内存限制 + [3] Math.random() → 随机数(后被 nonce 覆盖) + [4] navigator.userAgent → UA + [5] 随机 script src → 随机选一个页面 script 的 src + [6] 脚本版本匹配 → script src 匹配 c/[^/]*/_ + [7] document.documentElement.data-build → 构建版本 + [8] navigator.language → 语言 + [9] navigator.languages.join(',') → 语言列表(后被耗时覆盖) + [10] Math.random() → 随机数 + [11] 随机 navigator 属性 → 随机取 navigator 原型链上的一个属性 + [12] Object.keys(document) 随机一个 → document 属性 + [13] Object.keys(window) 随机一个 → window 属性 + [14] performance.now() → 高精度时间 + [15] self.sid → 会话标识 UUID + [16] URLSearchParams 参数 → URL 搜索参数 + [17] navigator.hardwareConcurrency → CPU 核心数 + [18] performance.timeOrigin → 时间起点 + """ + # 模拟真实的浏览器环境数据 + screen_info = f"1920x1080" + now = datetime.now(timezone.utc) + # 格式化为 JS Date.toString() 格式 + date_str = now.strftime("%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)") + js_heap_limit = 4294705152 # Chrome 典型值 + nav_random1 = random.random() + ua = USER_AGENT + # 模拟 sentinel SDK 的 script src + script_src = "https://sentinel.openai.com/sentinel/20260124ceb8/sdk.js" + # 匹配 c/[^/]*/_ + script_version = None + data_build = None + language = "en-US" + languages = "en-US,en" + nav_random2 = random.random() + # 模拟随机 navigator 属性 + nav_props = [ + "vendorSub", "productSub", "vendor", "maxTouchPoints", + "scheduling", "userActivation", "doNotTrack", "geolocation", + "connection", "plugins", "mimeTypes", "pdfViewerEnabled", + "webkitTemporaryStorage", "webkitPersistentStorage", + "hardwareConcurrency", "cookieEnabled", "credentials", + "mediaDevices", "permissions", "locks", "ink", + ] + nav_prop = random.choice(nav_props) + # 模拟属性值 + nav_val = f"{nav_prop}−undefined" # SDK 用 − (U+2212) 而非 - (U+002D) + doc_key = random.choice(["location", "implementation", "URL", "documentURI", "compatMode"]) + win_key = random.choice(["Object", "Function", "Array", "Number", "parseFloat", "undefined"]) + perf_now = random.uniform(1000, 50000) + hardware_concurrency = random.choice([4, 8, 12, 16]) + # 模拟 performance.timeOrigin(毫秒级 Unix 时间戳) + time_origin = time.time() * 1000 - perf_now + + config = [ + screen_info, # [0] 屏幕尺寸 + date_str, # [1] 时间 + js_heap_limit, # [2] 内存限制 + nav_random1, # [3] 占位,后被 nonce 替换 + ua, # [4] UserAgent + script_src, # [5] script src + script_version, # [6] 脚本版本 + data_build, # [7] 构建版本 + language, # [8] 语言 + languages, # [9] 占位,后被耗时替换 + nav_random2, # [10] 随机数 + nav_val, # [11] navigator 属性 + doc_key, # [12] document key + win_key, # [13] window key + perf_now, # [14] performance.now + self.sid, # [15] 会话 UUID + "", # [16] URL 参数 + hardware_concurrency, # [17] CPU 核心数 + time_origin, # [18] 时间起点 + ] + return config + + @staticmethod + def _base64_encode(data): + """ + 模拟 SDK 的 E() 函数:JSON.stringify → TextEncoder.encode → btoa + """ + json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + encoded = json_str.encode('utf-8') + return base64.b64encode(encoded).decode('ascii') + + def _run_check(self, start_time, seed, difficulty, config, nonce): + """ + 单次 PoW 检查(_runCheck 方法逆向还原) + + 参数: + start_time: 起始时间(秒) + seed: PoW 种子字符串 + difficulty: 难度字符串(hex 前缀阈值) + config: 环境配置数组 + nonce: 当前尝试序号 + + 返回: + 成功时返回 base64(config) + "~S" + 失败时返回 None + """ + # 设置 nonce 和耗时 + config[3] = nonce + config[9] = round((time.time() - start_time) * 1000) # 毫秒 + + # base64 编码环境数据 + data = self._base64_encode(config) + + # 计算 FNV-1a 哈希:hash(seed + data) + hash_input = seed + data + hash_hex = self._fnv1a_32(hash_input) + + # 难度校验:哈希前缀 ≤ 难度值 + diff_len = len(difficulty) + if hash_hex[:diff_len] <= difficulty: + return data + "~S" + + return None + + def generate_token(self, seed=None, difficulty=None): + """ + 生成 sentinel token(完整 PoW 流程) + + 参数: + seed: PoW 种子(来自服务端的 proofofwork.seed) + difficulty: 难度值(来自服务端的 proofofwork.difficulty) + + 返回: + 格式为 "gAAAAAB..." 的 sentinel token 字符串 + """ + # 如果没有服务端提供的 seed/difficulty,使用 requirements token 模式 + if seed is None: + seed = self.requirements_seed + difficulty = difficulty or "0" + + + start_time = time.time() + + config = self._get_config() + + for i in range(self.MAX_ATTEMPTS): + result = self._run_check(start_time, seed, difficulty, config, i) + if result: + elapsed = time.time() - start_time + print(f" ✅ PoW 完成: {i+1} 次迭代, 耗时 {elapsed:.2f}s") + return "gAAAAAB" + result + + # PoW 失败(超过最大尝试次数),返回错误 token + print(f" ⚠️ PoW 超过最大尝试次数 ({self.MAX_ATTEMPTS})") + return "gAAAAAB" + self.ERROR_PREFIX + self._base64_encode(str(None)) + + def generate_requirements_token(self): + """ + 生成 requirements token(不需要服务端参数) + + 这是 SDK 中 getRequirementsToken() 的还原。 + 用于不需要服务端 seed 的场景(如注册页面初始化)。 + """ + config = self._get_config() + config[3] = 1 + config[9] = round(random.uniform(5, 50)) # 模拟小延迟 + data = self._base64_encode(config) + return "gAAAAAC" + data # 注意前缀是 C 不是 B + + +# =================== Cloudflare 临时邮箱 =================== + +def create_temp_email(session): + """通过 Cloudflare Worker 创建临时邮箱""" + print("📧 创建临时邮箱...") + name_len = random.randint(10, 14) + name_chars = list(random.choices(string.ascii_lowercase, k=name_len)) + for _ in range(random.choice([1, 2])): + pos = random.randint(2, len(name_chars) - 1) + name_chars.insert(pos, random.choice(string.digits)) + name = "".join(name_chars) + + try: + res = session.post( + f"https://{CF_WORKER_DOMAIN}/admin/new_address", + json={"enablePrefix": True, "name": name, "domain": CF_EMAIL_DOMAIN}, + headers={"x-admin-auth": CF_ADMIN_PASSWORD, "Content-Type": "application/json"}, + timeout=10, verify=False, + ) + if res.status_code == 200: + data = res.json() + email = data.get("address") + token = data.get("jwt") + if email: + print(f" ✅ 邮箱: {email}") + return email, token + print(f" ❌ 创建失败: {res.status_code}") + except Exception as e: + print(f" ❌ 异常: {e}") + return None, None + + +def fetch_emails(session, email, cf_token): + """获取邮箱中的邮件""" + try: + res = session.get( + f"https://{CF_WORKER_DOMAIN}/api/mails", + params={"limit": 10, "offset": 0}, + headers={"Authorization": f"Bearer {cf_token}"}, + verify=False, timeout=30, + ) + if res.status_code == 200: + return res.json().get("results", []) + except Exception: + pass + return [] + + +def extract_verification_code(content): + """从邮件内容提取6位验证码""" + if not content: + return None + # 策略1:HTML body 样式匹配 + m = re.search(r'background-color:\s*#F3F3F3[^>]*>[\s\S]*?(\d{6})[\s\S]*?

', content) + if m: + return m.group(1) + # 策略2:Subject + m = re.search(r'Subject:.*?(\d{6})', content) + if m and m.group(1) != "177010": + return m.group(1) + # 策略3:通用正则 + for pat in [r'>\s*(\d{6})\s*<', r'(? 0: + org_id = ws_orgs[0].get("id") + projects = ws_orgs[0].get("projects", []) + if projects: + project_id = projects[0].get("id") + print(f" ✅ org_id: {org_id}") + print(f" ✅ project_id: {project_id}") + + if org_id: + print(f" [4c] POST organization/select...") + body = {"org_id": org_id} + if project_id: + body["project_id"] = project_id + + h_org = dict(COMMON_HEADERS) + h_org["referer"] = org_url + h_org["oai-device-id"] = device_id + h_org.update(generate_datadog_trace()) + + resp = session.post( + f"{OAUTH_ISSUER}/api/accounts/organization/select", + json=body, headers=h_org, + verify=False, timeout=30, allow_redirects=False, + ) + print(f" 状态码: {resp.status_code}") + + if resp.status_code in (301, 302, 303, 307, 308): + loc = resp.headers.get("Location", "") + auth_code = _extract_code_from_url(loc) + if auth_code: + print(f" ✅ organization/select 获取到 code(长度: {len(auth_code)})") + else: + # 继续跟踪重定向链 + auth_code = _follow_and_extract_code(session, loc) + if auth_code: + print(f" ✅ 跟踪重定向获取到 code(长度: {len(auth_code)})") + elif resp.status_code == 200: + org_data = resp.json() + org_next = org_data.get("continue_url", "") + print(f" org continue_url: {org_next}") + if org_next: + full_next = org_next if org_next.startswith("http") else f"{OAUTH_ISSUER}{org_next}" + auth_code = _follow_and_extract_code(session, full_next) + if auth_code: + print(f" ✅ 跟踪获取到 code(长度: {len(auth_code)})") + else: + print(f" ⚠️ 未找到 org_id,尝试直接跟踪 consent URL...") + auth_code = _follow_and_extract_code(session, org_url) + if auth_code: + print(f" ✅ 直接跟踪获取到 code(长度: {len(auth_code)})") + else: + # workspace/select 返回了非 organization 的 continue_url,直接跟踪 + if ws_next: + full_next = ws_next if ws_next.startswith("http") else f"{OAUTH_ISSUER}{ws_next}" + auth_code = _follow_and_extract_code(session, full_next) + if auth_code: + print(f" ✅ 跟踪获取到 code(长度: {len(auth_code)})") + except Exception as e: + print(f" ⚠️ workspace/select 异常: {e}") + import traceback + traceback.print_exc() + + # ----- 步骤4d: 备用策略 — allow_redirects=True 捕获 ConnectionError ----- + if not auth_code: + print(" [4d] 备用策略: GET consent (allow_redirects=True)...") + try: + resp = session.get(consent_url, headers=NAVIGATE_HEADERS, + verify=False, timeout=30, allow_redirects=True) + print(f" 最终: {resp.status_code}, URL: {resp.url[:200]}") + auth_code = _extract_code_from_url(resp.url) + if auth_code: + print(f" ✅ 最终 URL 中提取到 code") + # 检查重定向链 + if not auth_code and resp.history: + for r in resp.history: + loc = r.headers.get("Location", "") + auth_code = _extract_code_from_url(loc) + if auth_code: + print(f" ✅ 重定向链中提取到 code") + break + except requests.exceptions.ConnectionError as e: + url_match = re.search(r'(https?://localhost[^\s\'"]+)', str(e)) + if url_match: + auth_code = _extract_code_from_url(url_match.group(1)) + if auth_code: + print(f" ✅ ConnectionError 中提取到 code") + except Exception as e: + print(f" ⚠️ 备用策略异常: {e}") + + if not auth_code: + print(" ❌ 未获取到 authorization code") + return None + + # 用 code 换 token(复用已有的 codex_exchange_code 函数) + return codex_exchange_code(auth_code, code_verifier) + + +# =================== Codex OAuth 登录 + CPA 回调(浏览器版,作为 fallback) =================== + +def perform_codex_oauth_login(email, password, registrar_session=None): + """ + 注册成功后,通过浏览器混合模式执行 Codex OAuth 登录获取 Token。 + + 混合架构: + 浏览器层:完成 OAuth 登录全流程(邮箱+密码提交) + - sentinel SDK 在浏览器内自动生成 t/c 字段(反机器人遥测+challenge response) + - 通过 CDP 网络事件监听捕获 authorization code + HTTP 层:用 code 换取 tokens(POST /oauth/token,无需 sentinel) + + 使用 Codex 专用配置(来自 config.json): + client_id: app_EMoamEEZ73f0CkXaXp7hrann(Codex CLI) + redirect_uri: http://localhost:1455/auth/callback + scope: openid profile email offline_access + + 参数: + email: 注册的邮箱 + password: 注册的密码 + registrar_session: 注册时的 requests.Session(含 CF cookies,可选,本模式暂未使用) + 返回: + dict: tokens 字典(含 access_token/refresh_token/id_token),失败返回 None + """ + print("\n🔐 执行 Codex OAuth 登录获取 Token(浏览器混合模式)...") + + # 1. 构造 PKCE 参数 + code_verifier, code_challenge = generate_pkce() + state = secrets.token_urlsafe(32) + + authorize_params = { + "response_type": "code", + "client_id": OAUTH_CLIENT_ID, + "redirect_uri": OAUTH_REDIRECT_URI, + "scope": "openid profile email offline_access", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "state": state, + } + authorize_url = f"{OAUTH_ISSUER}/oauth/authorize?{urlencode(authorize_params)}" + + try: + import undetected_chromedriver as uc + from selenium.webdriver.common.by import By + except ImportError: + print(" ❌ 需要安装 undetected-chromedriver:") + print(" pip install undetected-chromedriver selenium") + return None + + driver = None + try: + # 2. 启动浏览器(带 CDP 网络事件监听) + mode_str = "无头模式" if HEADLESS else "有头模式" + print(f" 🌐 启动浏览器执行 OAuth 登录({mode_str},sentinel SDK 自动处理 t/c 字段)...") + options = uc.ChromeOptions() + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-gpu") + options.add_argument("--window-size=800,600") + options.add_argument(f"--user-agent={USER_AGENT}") + if HEADLESS: + options.add_argument("--headless=new") + if PROXY: + options.add_argument(f"--proxy-server={PROXY}") + + driver = uc.Chrome(version_main=145, options=options, use_subprocess=True) + + # 启用 CDP 网络事件监听(捕获请求中的 authorization code 回调) + driver.execute_cdp_cmd("Network.enable", {}) + + # 注入 JS Hook:拦截所有导航/请求,捕获回调 URL 中的 code + # 由于 redirect_uri 是 localhost:1455(不可达),浏览器会导航失败但 URL 仍可读取 + # 同时注入 sentinel token 拦截 Hook(调试用,可查看 t/c 内容) + hook_js = """ + // 拦截 XHR 请求头,捕获 sentinel token(调试用) + (function() { + window.__sentinel_tokens = []; + const origOpen = XMLHttpRequest.prototype.open; + const origSetHeader = XMLHttpRequest.prototype.setRequestHeader; + XMLHttpRequest.prototype.setRequestHeader = function(name, value) { + if (name === 'openai-sentinel-token') { + try { + window.__sentinel_tokens.push(JSON.parse(value)); + console.log('SENTINEL_CAPTURED:', value.substring(0, 80)); + } catch(e) {} + } + return origSetHeader.call(this, name, value); + }; + + // 同时拦截 fetch + const origFetch = window.fetch; + window.fetch = function(input, init) { + if (init && init.headers) { + let sentinel = null; + if (init.headers instanceof Headers) { + sentinel = init.headers.get('openai-sentinel-token'); + } else if (typeof init.headers === 'object') { + sentinel = init.headers['openai-sentinel-token']; + } + if (sentinel) { + try { + window.__sentinel_tokens.push(JSON.parse(sentinel)); + console.log('SENTINEL_CAPTURED_FETCH:', sentinel.substring(0, 80)); + } catch(e) {} + } + } + return origFetch.apply(this, arguments); + }; + })(); + """ + # 在新文档加载前注入 Hook + driver.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + {"source": hook_js} + ) + + # 3. 导航到 OAuth authorize URL + print(f" 📡 访问 OAuth authorize URL...") + driver.get(authorize_url) + + # 4. 等待 Cloudflare Challenge 完成 + 页面加载 + print(" ⏳ 等待 Cloudflare Challenge + 登录页面加载...") + for i in range(60): + try: + current_url = driver.current_url + # 检查是否已到达回调(极快通过的情况) + if "localhost" in current_url and "code=" in current_url: + print(f" ✅ 快速到达回调(第 {i+1}s)") + break + # 检查是否有输入框或按钮(登录页加载完成) + inputs = driver.find_elements(By.CSS_SELECTOR, "input") + if inputs: + print(f" ✅ 登录页面加载完成(第 {i+1}s)") + break + except Exception: + pass + if i % 15 == 0 and i > 0: + print(f" ... 已等待 {i}s") + time.sleep(1) + + time.sleep(1) + + # 辅助函数:检测并点击错误页面的重试按钮 + def _check_and_retry_error(): + """检测 OAuth 错误页面并点击重试按钮""" + try: + buttons = driver.find_elements(By.TAG_NAME, "button") + for btn in buttons: + try: + btn_text = btn.text.strip().lower() + if btn_text in ["重试", "retry", "try again", "重新尝试"]: + if btn.is_displayed(): + driver.execute_script("arguments[0].click();", btn) + print(f" 🔁 检测到错误页面,已点击重试") + time.sleep(3) + return True + except Exception: + continue + except Exception: + pass + return False + + # 5. 自动化 OAuth 登录流程(邮箱 → 密码 → 确认) + auth_code = None + max_steps = 30 # 最大步骤数(防止无限循环) + + for step_i in range(max_steps): + try: + current_url = driver.current_url + + # ===== 检查是否已到达回调 URL ===== + if ("localhost" in current_url or "callback" in current_url) and "code=" in current_url: + parsed = urlparse(current_url) + params = parse_qs(parsed.query) + auth_code = params.get("code", [None])[0] + if auth_code: + print(f" ✅ 获取到 authorization code(URL 回调,长度: {len(auth_code)})") + break + + # ===== 检是否是错误页面 ===== + if _check_and_retry_error(): + continue + + # ===== 邮箱输入页面 ===== + email_inputs = driver.find_elements( + By.CSS_SELECTOR, + 'input[type="email"], input[name="email"], input[name="username"], input[id="email"]' + ) + visible_email = [e for e in email_inputs if e.is_displayed()] + if visible_email: + print(f" 📧 [OAuth] 输入邮箱: {email}") + inp = visible_email[0] + inp.clear() + inp.send_keys(email) + time.sleep(0.5) + # 点击 Continue/Submit 按钮 + submit_btns = driver.find_elements(By.CSS_SELECTOR, 'button[type="submit"]') + if submit_btns: + driver.execute_script("arguments[0].click();", submit_btns[0]) + else: + # 回退:查找任何按钮 + buttons = driver.find_elements(By.TAG_NAME, "button") + for btn in buttons: + text = btn.text.strip().lower() + if text in ("continue", "继续", "next", "sign in", "log in"): + driver.execute_script("arguments[0].click();", btn) + break + print(" ✅ 邮箱已提交") + time.sleep(3) + continue + + # ===== 密码输入页面 ===== + pwd_inputs = driver.find_elements( + By.CSS_SELECTOR, + 'input[type="password"], input[name="password"]' + ) + visible_pwd = [e for e in pwd_inputs if e.is_displayed()] + if visible_pwd: + print(" 🔑 [OAuth] 输入密码...") + inp = visible_pwd[0] + inp.clear() + # 逐字符输入密码(模拟真实打字,避免反机器人检测) + for char in password: + inp.send_keys(char) + time.sleep(0.03) + time.sleep(0.5) + # 点击 Submit + submit_btns = driver.find_elements(By.CSS_SELECTOR, 'button[type="submit"]') + if submit_btns: + driver.execute_script("arguments[0].click();", submit_btns[0]) + else: + buttons = driver.find_elements(By.TAG_NAME, "button") + for btn in buttons: + text = btn.text.strip().lower() + if text in ("continue", "继续", "log in", "sign in"): + driver.execute_script("arguments[0].click();", btn) + break + print(" ✅ 密码已提交") + time.sleep(3) + continue + + # ===== 授权确认页面 / Continue 按钮 ===== + buttons = driver.find_elements(By.TAG_NAME, "button") + clicked_consent = False + for btn in buttons: + try: + btn_text = btn.text.strip().lower() + if btn_text in ("continue", "继续", "allow", "approve", "accept", "authorize"): + if btn.is_displayed() and btn.is_enabled(): + driver.execute_script("arguments[0].click();", btn) + print(f" ✅ [OAuth] 已点击确认按钮: '{btn.text.strip()}'") + clicked_consent = True + time.sleep(3) + break + except Exception: + continue + + if clicked_consent: + continue + + # ===== 没有可操作的元素,等待页面变化 ===== + time.sleep(2) + + except Exception as e: + print(f" ⚠️ OAuth 步骤异常: {e}") + time.sleep(2) + + # 6. 如果通过 URL 未获取到 code,尝试从网络日志中获取 + if not auth_code: + print(" 🔍 尝试从浏览器网络日志中提取 authorization code...") + try: + # 检查 performance log(如果可用) + logs = driver.get_log("performance") + for entry in logs: + try: + msg = json.loads(entry["message"]) + method = msg.get("message", {}).get("method", "") + if method in ("Network.requestWillBeSent", "Network.responseReceived"): + url = (msg.get("message", {}).get("params", {}) + .get("request", {}).get("url", "") + or msg.get("message", {}).get("params", {}) + .get("response", {}).get("url", "")) + if "code=" in url and "localhost" in url: + parsed = urlparse(url) + params = parse_qs(parsed.query) + auth_code = params.get("code", [None])[0] + if auth_code: + print(f" ✅ 从网络日志中获取到 code(长度: {len(auth_code)})") + break + except Exception: + continue + except Exception: + pass + + # 7. 最后尝试:直接读取当前 URL + if not auth_code: + try: + final_url = driver.current_url + if "code=" in final_url: + parsed = urlparse(final_url) + params = parse_qs(parsed.query) + auth_code = params.get("code", [None])[0] + if auth_code: + print(f" ✅ 从最终 URL 获取到 code(长度: {len(auth_code)})") + except Exception: + pass + + # 调试:打印捕获到的 sentinel tokens(如果有) + try: + captured = driver.execute_script("return window.__sentinel_tokens || [];") + if captured: + print(f" 📋 调试: 共捕获 {len(captured)} 个 sentinel tokens") + for idx, st in enumerate(captured[:3]): # 最多打印3个 + t_val = st.get("t", "") + c_val = st.get("c", "") + flow = st.get("flow", "") + print(f" [{idx}] flow={flow}, t长度={len(t_val)}, c长度={len(c_val)}") + except Exception: + pass + + # 8. 用 authorization code 换取 tokens + if auth_code: + return codex_exchange_code(auth_code, code_verifier) + + print(" ❌ 未获取到 authorization code") + try: + print(f" 最终 URL: {driver.current_url[:200]}") + except Exception: + pass + return None + + except Exception as e: + print(f" ❌ Codex OAuth 登录异常: {e}") + import traceback + traceback.print_exc() + return None + finally: + if driver: + try: + driver.quit() + print(" 🔒 OAuth 浏览器已关闭") + except (OSError, Exception): + pass + + +def codex_exchange_code(code, code_verifier): + """ + 用 authorization code 换取 Codex tokens + + POST https://auth.openai.com/oauth/token + Content-Type: application/x-www-form-urlencoded + """ + print(" 🔄 换取 Codex Token...") + session = create_session() + + for attempt in range(2): + try: + resp = session.post( + f"{OAUTH_ISSUER}/oauth/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": OAUTH_REDIRECT_URI, + "client_id": OAUTH_CLIENT_ID, + "code_verifier": code_verifier, + }, + verify=False, + timeout=60, + ) + break + except Exception as e: + if attempt == 0: + print(f" ⚠️ Token 交换超时,重试...") + time.sleep(2) + continue + print(f" ❌ Token 交换失败: {e}") + return None + + if resp.status_code == 200: + data = resp.json() + print(f" ✅ Codex Token 获取成功!") + print(f" Access Token 长度: {len(data.get('access_token', ''))}") + print(f" Refresh Token: {'✅' if data.get('refresh_token') else '❌'}") + print(f" ID Token: {'✅' if data.get('id_token') else '❌'}") + return data + else: + print(f" ❌ Token 交换失败: {resp.status_code}") + print(f" 响应: {resp.text[:300]}") + return None + + +# =================== Token JSON 保存 + CPA 上传 =================== + +def decode_jwt_payload(token): + """解析 JWT token 的 payload 部分""" + try: + parts = token.split(".") + if len(parts) != 3: + return {} + payload = parts[1] + # 补齐 base64 padding + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + decoded = base64.urlsafe_b64decode(payload) + return json.loads(decoded) + except Exception: + return {} + + +def save_token_json(email, access_token, refresh_token=None, id_token=None): + """ + 保存完整的 Token JSON 文件(格式兼容 Codex),并自动上传到 CPA 管理平台。 + + JSON 格式与 codex_ultimate.py 一致: + { + "type": "codex", + "email": "xxx@xxx.com", + "expired": "2026-02-20T15:30:00+08:00", + "id_token": "...", + "account_id": "...", + "access_token": "...", + "last_refresh": "2026-02-18T15:30:00+08:00", + "refresh_token": "..." + } + """ + try: + from datetime import datetime, timezone, timedelta + + payload = decode_jwt_payload(access_token) + + # 提取 account_id + auth_info = payload.get("https://api.openai.com/auth", {}) + account_id = auth_info.get("chatgpt_account_id", "") + + # 计算过期时间 + exp_timestamp = payload.get("exp", 0) + if exp_timestamp: + exp_dt = datetime.fromtimestamp(exp_timestamp, tz=timezone(timedelta(hours=8))) + expired_str = exp_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00") + else: + expired_str = "" + + now = datetime.now(tz=timezone(timedelta(hours=8))) + last_refresh_str = now.strftime("%Y-%m-%dT%H:%M:%S+08:00") + + token_data = { + "type": "codex", + "email": email, + "expired": expired_str, + "id_token": id_token or "", + "account_id": account_id, + "access_token": access_token, + "last_refresh": last_refresh_str, + "refresh_token": refresh_token or "", + } + + filename = f"{email}.json" + with open(filename, "w", encoding="utf-8") as f: + json.dump(token_data, f, ensure_ascii=False) + print(f" ✅ Token JSON 已保存到 {filename}") + + # 上传到 CPA 管理平台 + if UPLOAD_API_URL: + upload_token_json(filename) + + except Exception as e: + print(f" ❌ 保存 Token JSON 失败: {e}") + + +def upload_token_json(filename): + """上传 Token JSON 文件到 CPA 管理平台""" + try: + session = create_session() + with open(filename, "rb") as f: + files = {"file": (filename, f, "application/json")} + headers = {"Authorization": f"Bearer {UPLOAD_API_TOKEN}"} + + resp = session.post( + UPLOAD_API_URL, + files=files, + headers=headers, + verify=False, + timeout=30, + ) + + if resp.status_code == 200: + print(f" ✅ Token JSON 已上传到 CPA 管理平台") + else: + print(f" ❌ CPA 上传失败: {resp.status_code} - {resp.text[:200]}") + except Exception as e: + print(f" ❌ CPA 上传异常: {e}") + + +def save_tokens(email, tokens): + """保存 tokens 到所有目标(txt + JSON + CPA 上传),线程安全""" + access_token = tokens.get("access_token", "") + refresh_token = tokens.get("refresh_token", "") + id_token = tokens.get("id_token", "") + + with _file_lock: + if access_token: + with open(AK_FILE, "a", encoding="utf-8") as f: + f.write(f"{access_token}\n") + if refresh_token: + with open(RK_FILE, "a", encoding="utf-8") as f: + f.write(f"{refresh_token}\n") + + if access_token: + save_token_json(email, access_token, refresh_token, id_token) + + +# =================== 账号持久化 =================== + +def save_account(email, password): + """保存账号信息(线程安全)""" + try: + with _file_lock: + with open(ACCOUNTS_FILE, "a", encoding="utf-8") as f: + f.write(f"{email}:{password}\n") + file_exists = os.path.exists(CSV_FILE) + with open(CSV_FILE, "a", newline="", encoding="utf-8") as f: + import csv + w = csv.writer(f) + if not file_exists: + w.writerow(["email", "password", "timestamp"]) + w.writerow([email, password, time.strftime("%Y-%m-%d %H:%M:%S")]) + print(f" ✅ 账号已保存") + except Exception as e: + print(f" ⚠️ 保存失败: {e}") + + +# =================== 批量执行入口 =================== + +def register_one(worker_id=0, task_index=0, total=1): + """ + 注册单个账号的完整流程(线程安全) + 返回: (email, password, success, reg_time, total_time) + """ + tag = f"[W{worker_id}]" if CONCURRENT_WORKERS > 1 else "" + t_start = time.time() + session = create_session() + + # 1. 创建临时邮箱 + email, cf_token = create_temp_email(session) + if not email: + return None, None, False, 0, 0 + + password = generate_random_password() + + # 2. 协议注册 + registrar = ProtocolRegistrar() + success, email, password = registrar.register(email, cf_token, password) + save_account(email, password) + + t_reg = time.time() - t_start # 注册耗时 + + if not success: + return email, password, False, t_reg, t_reg + + print(f" 📝 注册耗时: {t_reg:.1f}s") + + # 3. Codex OAuth 登录 + tokens = None + try: + tokens = perform_codex_oauth_login_http( + email, password, + registrar_session=registrar.session, + cf_token=cf_token, + ) + + if not tokens: + print(f"{tag} ❌ 纯 HTTP OAuth 失败") + + t_total = time.time() - t_start + if tokens: + save_tokens(email, tokens) + print(f"{tag} ✅ {email} | 注册 {t_reg:.1f}s + OAuth {t_total - t_reg:.1f}s = 总 {t_total:.1f}s") + else: + print(f"{tag} ⚠️ OAuth 失败(注册已成功)") + except Exception as e: + t_total = time.time() - t_start + print(f"{tag} ⚠️ OAuth 异常: {e}") + + return email, password, True, t_reg, t_total + + +def run_batch(): + """批量注册入口(支持并发)""" + workers = max(1, CONCURRENT_WORKERS) + batch_start = time.time() + + print(f"\n🚀 协议注册机 v5 — {TOTAL_ACCOUNTS} 个账号 | 并发 {workers} | 域名 {CF_EMAIL_DOMAIN}") + + ok = 0 + fail = 0 + results_lock = threading.Lock() + reg_times = [] # 注册耗时列表 + total_times = [] # 总耗时列表 + + if workers == 1: + for i in range(TOTAL_ACCOUNTS): + print(f"\n--- [{i+1}/{TOTAL_ACCOUNTS}] ---") + + email, password, success, t_reg, t_total = register_one( + worker_id=0, task_index=i + 1, total=TOTAL_ACCOUNTS + ) + + if success: + ok += 1 + reg_times.append(t_reg) + total_times.append(t_total) + else: + fail += 1 + + wall = time.time() - batch_start + throughput = wall / ok if ok > 0 else 0 + print(f"📊 {i+1}/{TOTAL_ACCOUNTS} | ✅{ok} ❌{fail} | 吞吐 {throughput:.1f}s/个 | 已用 {wall:.0f}s") + + if i < TOTAL_ACCOUNTS - 1: + wait = random.randint(3, 8) + time.sleep(wait) + else: + print(f"🔀 启动 {workers} 个并发 worker...\n") + + def _worker_task(task_index, worker_id): + if task_index > 1: + jitter = random.uniform(1, 3) * worker_id + time.sleep(jitter) + try: + email, password, success, t_reg, t_total = register_one( + worker_id=worker_id, + task_index=task_index, + total=TOTAL_ACCOUNTS + ) + return task_index, email, password, success, t_reg, t_total + except Exception as e: + print(f"[W{worker_id}] ❌ 异常: {e}") + return task_index, None, None, False, 0, 0 + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {} + for i in range(TOTAL_ACCOUNTS): + worker_id = (i % workers) + 1 + future = executor.submit(_worker_task, i + 1, worker_id) + futures[future] = i + 1 + + for future in as_completed(futures): + task_idx = futures[future] + try: + _, email, password, success, t_reg, t_total = future.result() + with results_lock: + if success: + ok += 1 + reg_times.append(t_reg) + total_times.append(t_total) + else: + fail += 1 + done = ok + fail + wall = time.time() - batch_start + throughput = wall / ok if ok > 0 else 0 + print(f"📊 {done}/{TOTAL_ACCOUNTS} | ✅{ok} ❌{fail} | 吞吐 {throughput:.1f}s/个 | 已用 {wall:.0f}s") + except Exception as e: + with results_lock: + fail += 1 + print(f"❌ 任务 {task_idx} 异常: {e}") + + elapsed = time.time() - batch_start + throughput = elapsed / ok if ok > 0 else 0 + avg_reg = sum(reg_times) / len(reg_times) if reg_times else 0 + avg_total = sum(total_times) / len(total_times) if total_times else 0 + print(f"\n🏁 完成: ✅{ok} ❌{fail} | 总耗时 {elapsed:.1f}s | 吞吐 {throughput:.1f}s/个 | 单号(注册 {avg_reg:.1f}s + OAuth {avg_total - avg_reg:.1f}s = {avg_total:.1f}s)") + + +if __name__ == "__main__": + run_batch() diff --git a/config.json b/config.json new file mode 100644 index 0000000..eb65acc --- /dev/null +++ b/config.json @@ -0,0 +1,22 @@ +{ + "_comment": "ChatGPT(DuckMail)", + + "total_accounts": 5, + + "duckmail_api_base": "https://api.duckmail.sbs", + "duckmail_bearer": "", + + "proxy": "", + + "output_file": "registered_accounts.txt", + "enable_oauth": true, + "oauth_required": true, + "oauth_issuer": "https://auth.openai.com", + "oauth_client_id": "app_EMoamEEZ73f0CkXaXp7hrann", + "oauth_redirect_uri": "http://localhost:1455/auth/callback", + "ak_file": "ak.txt", + "rk_file": "rk.txt", + "token_json_dir": "codex_tokens", + "upload_api_url": "http://localhost:8317/v0/management/auth-files", + "upload_api_token": "你的cpa面板登录密码" +}