mirror of
https://github.com/adminlove520/AI-Account-Toolkit.git
synced 2026-05-16 09:26:46 +08:00
- 新增 GPT_register+duckmail+CPA+autouploadsub2api (DuckMail + OAuth + Sub2Api 注册工具) - 新增 team_all-in-one (ChatGPT Team 一键注册工具) - 新增 Code-Patch 项目 - 新增 ABCard 子模块 (ChatGPT Business/Plus 自动开通) - 新增 cloudflare_temp_email 子模块 (Cloudflare 临时邮箱服务) - 添加 .gitignore 文件 - 更新 README.md (新增项目导航、子模块说明) - 添加 CHANGELOG.md
2136 lines
90 KiB
Python
2136 lines
90 KiB
Python
"""
|
||
ChatGPT 批量自动注册工具 (纯协议版) - DuckMail 临时邮箱 + Team 邀请 + Codex OAuth(CPA、SUB2API)
|
||
依赖: pip install curl_cffi
|
||
功能: 纯协议实现注册 → Team 邀请 → Codex OAuth 全流程,无需浏览器
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import uuid
|
||
import json
|
||
import random
|
||
import string
|
||
import time
|
||
import sys
|
||
import threading
|
||
import traceback
|
||
import secrets
|
||
import hashlib
|
||
import base64
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from urllib.parse import urlparse, parse_qs, urlencode, quote
|
||
|
||
from curl_cffi import requests as curl_requests
|
||
|
||
|
||
# ================= 加载配置 =================
|
||
def _load_config():
    """Build the runtime configuration dictionary.

    Resolution order, lowest to highest priority:
    built-in defaults, then ``config.json`` located next to this file (if it
    exists and parses), then environment variables.

    Returns:
        dict: the merged configuration.

    Raises:
        ValueError: if ``CMAIL_EXPIRY_DAYS`` / ``TOTAL_ACCOUNTS`` (or the
            corresponding config values) cannot be converted with ``int``.
    """
    settings = {
        "total_accounts": 4,
        "mail_provider": "gptmail",
        "gptmail_api_key": "gpt-test",
        "gptmail_base": "https://mail.chatgpt.org.uk",
        "npcmail_api_key": "",
        "npcmail_base": "https://dash.xphdfs.me",
        "npcmail_domain": "",
        "cmail_api_key": "",
        "cmail_base": "",
        "cmail_domain": "",
        "cmail_expiry_days": 7,
        "proxy": "",
        "output_file": "registered_accounts.txt",
        "csv_file": "registered_accounts.csv",
        "invite_tracker_file": "invite_tracker.json",
        "enable_oauth": True,
        "oauth_required": True,
        "oauth_issuer": "https://auth.openai.com",
        "oauth_client_id": "app_EMoamEEZ73f0CkXaXp7hrann",
        "oauth_redirect_uri": "http://localhost:1455/auth/callback",
        "ak_file": "ak.txt",
        "rk_file": "rk.txt",
        "token_json_dir": "codex_tokens",
        "upload_api_url": "",
        "upload_api_token": "",
        "SUB2API_URL": "",
        "SUB2API_TOKEN": "",
        "teams": [],
    }

    json_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
    if os.path.exists(json_path):
        try:
            with open(json_path, "r", encoding="utf-8") as fh:
                settings.update(json.load(fh))
        except Exception as e:
            # A broken config file is reported but does not abort start-up.
            print(f"⚠️ 加载 config.json 失败: {e}")

    # (environment variable, config key, optional converter), applied in order.
    env_overrides = (
        ("MAIL_PROVIDER", "mail_provider", None),
        ("GPTMAIL_API_KEY", "gptmail_api_key", None),
        ("GPTMAIL_BASE", "gptmail_base", None),
        ("NPCMAIL_API_KEY", "npcmail_api_key", None),
        ("NPCMAIL_BASE", "npcmail_base", None),
        ("NPCMAIL_DOMAIN", "npcmail_domain", None),
        ("CMAIL_API_KEY", "cmail_api_key", None),
        ("CMAIL_BASE", "cmail_base", None),
        ("CMAIL_DOMAIN", "cmail_domain", None),
        ("CMAIL_EXPIRY_DAYS", "cmail_expiry_days", int),
        ("PROXY", "proxy", None),
        ("TOTAL_ACCOUNTS", "total_accounts", int),
        ("ENABLE_OAUTH", "enable_oauth", None),
        ("OAUTH_REQUIRED", "oauth_required", None),
        ("OAUTH_ISSUER", "oauth_issuer", None),
        ("OAUTH_CLIENT_ID", "oauth_client_id", None),
        ("OAUTH_REDIRECT_URI", "oauth_redirect_uri", None),
        ("AK_FILE", "ak_file", None),
        ("RK_FILE", "rk_file", None),
        ("TOKEN_JSON_DIR", "token_json_dir", None),
        ("UPLOAD_API_URL", "upload_api_url", None),
        ("UPLOAD_API_TOKEN", "upload_api_token", None),
        ("SUB2API_URL", "SUB2API_URL", None),
        ("SUB2API_TOKEN", "SUB2API_TOKEN", None),
    )
    for env_name, key, convert in env_overrides:
        resolved = os.environ.get(env_name, settings[key])
        settings[key] = convert(resolved) if convert is not None else resolved

    return settings
|
||
|
||
|
||
def _as_bool(value):
    """Coerce a config or environment value to ``bool``.

    ``bool`` inputs are returned unchanged and ``None`` maps to ``False``.
    Anything else is stringified, stripped and lower-cased; it is ``True``
    only for "1", "true", "yes", "y" or "on".
    """
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    normalized = str(value).strip().lower()
    return normalized in ("1", "true", "yes", "y", "on")
|
||
|
||
|
||
# Configuration is resolved once at import time; the constants below are
# read-only views of it used throughout the module.
_CONFIG = _load_config()

# --- Mail provider ---
MAIL_PROVIDER = (_CONFIG.get("mail_provider") or "gptmail").strip().lower()
GPTMAIL_API_KEY = _CONFIG.get("gptmail_api_key", "gpt-test")
GPTMAIL_BASE = _CONFIG.get("gptmail_base", "https://mail.chatgpt.org.uk").rstrip("/")
# For each CMAIL_* value the npcmail_* key wins; the cmail_* key is the fallback.
CMAIL_API_KEY = _CONFIG.get("npcmail_api_key") or _CONFIG.get("cmail_api_key", "")
CMAIL_BASE = (_CONFIG.get("npcmail_base") or _CONFIG.get("cmail_base", "") or "").rstrip("/")
CMAIL_DOMAIN = (_CONFIG.get("npcmail_domain") or _CONFIG.get("cmail_domain", "") or "").strip()
CMAIL_EXPIRY_DAYS = int(_CONFIG.get("cmail_expiry_days", 7) or 7)

# --- Run defaults and output files ---
DEFAULT_TOTAL_ACCOUNTS = _CONFIG["total_accounts"]
DEFAULT_PROXY = _CONFIG["proxy"]
DEFAULT_OUTPUT_FILE = _CONFIG["output_file"]
CSV_FILE = _CONFIG.get("csv_file", "registered_accounts.csv")
INVITE_TRACKER_FILE = _CONFIG.get("invite_tracker_file", "invite_tracker.json")

# --- OAuth ---
ENABLE_OAUTH = _as_bool(_CONFIG.get("enable_oauth", True))
OAUTH_REQUIRED = _as_bool(_CONFIG.get("oauth_required", True))
OAUTH_ISSUER = _CONFIG["oauth_issuer"].rstrip("/")
OAUTH_CLIENT_ID = _CONFIG["oauth_client_id"]
OAUTH_REDIRECT_URI = _CONFIG["oauth_redirect_uri"]

# --- Token persistence and upload targets ---
AK_FILE = _CONFIG["ak_file"]
RK_FILE = _CONFIG["rk_file"]
TOKEN_JSON_DIR = _CONFIG["token_json_dir"]
UPLOAD_API_URL = _CONFIG["upload_api_url"]
UPLOAD_API_TOKEN = _CONFIG["upload_api_token"]
SUB2API_URL = _CONFIG["SUB2API_URL"]
SUB2API_TOKEN = _CONFIG["SUB2API_TOKEN"]

# --- Teams ---
TEAMS = _CONFIG.get("teams", [])

if not TEAMS:
    print("⚠️ 警告: 未设置 teams,请在 config.json 中配置 Team 信息")

# Global thread locks: one serializes console output, the other file writes.
_print_lock = threading.Lock()
_file_lock = threading.Lock()
|
||
|
||
|
||
def _mail_provider_name():
    """Return the canonical provider name for the configured ``MAIL_PROVIDER``.

    "cmail" and "npcmail" both map to "npcmail"; every other value maps to
    "gptmail".
    """
    return "npcmail" if MAIL_PROVIDER in ("cmail", "npcmail") else "gptmail"
|
||
|
||
|
||
# ================= Chrome 指纹配置 =================
|
||
# Each profile ties a Chrome major version to its impersonation target name,
# build number, an inclusive patch-number range and the matching sec-ch-ua value.
_CHROME_PROFILES = [
    {
        "major": 131,
        "impersonate": "chrome131",
        "build": 6778,
        "patch_range": (69, 205),
        "sec_ch_ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
    },
    {
        "major": 133,
        "impersonate": "chrome133a",
        "build": 6943,
        "patch_range": (33, 153),
        "sec_ch_ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
    },
    {
        "major": 136,
        "impersonate": "chrome136",
        "build": 7103,
        "patch_range": (48, 175),
        "sec_ch_ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
    },
    {
        "major": 142,
        "impersonate": "chrome142",
        "build": 7540,
        "patch_range": (30, 150),
        "sec_ch_ua": '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
    },
]


def _random_chrome_version():
    """Pick a random Chrome profile and build matching version strings.

    Returns:
        tuple: ``(impersonate, major, full_version, user_agent, sec_ch_ua)``
        where ``full_version`` is ``"<major>.0.<build>.<patch>"`` with the patch
        drawn uniformly from the profile's inclusive ``patch_range``.
    """
    chosen = random.choice(_CHROME_PROFILES)
    patch_low, patch_high = chosen["patch_range"]
    patch_number = random.randint(patch_low, patch_high)
    version = "{}.0.{}.{}".format(chosen["major"], chosen["build"], patch_number)
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/" + version + " Safari/537.36"
    )
    return chosen["impersonate"], chosen["major"], version, user_agent, chosen["sec_ch_ua"]
|
||
|
||
|
||
def _random_delay(low=0.3, high=1.0):
    """Sleep for a duration drawn uniformly from ``[low, high]`` seconds."""
    pause_seconds = random.uniform(low, high)
    time.sleep(pause_seconds)
|
||
|
||
|
||
def _make_trace_headers():
    """Return a dict of randomized tracing headers.

    The ``traceparent`` value is ``00-<uuid4 hex>-<parent id as 16 hex digits>-01``;
    the ``x-datadog-*`` ids are the decimal forms of two random 18-digit integers.
    """
    id_low, id_high = 10**17, 10**18 - 1
    trace_id = random.randint(id_low, id_high)
    parent_id = random.randint(id_low, id_high)
    traceparent = "-".join(("00", uuid.uuid4().hex, format(parent_id, "016x"), "01"))
    headers = {
        "traceparent": traceparent,
        "tracestate": "dd=s:1;o:rum",
        "x-datadog-origin": "rum",
        "x-datadog-sampling-priority": "1",
        "x-datadog-trace-id": str(trace_id),
        "x-datadog-parent-id": str(parent_id),
    }
    return headers
|
||
|
||
|
||
def _generate_pkce():
    """Generate a PKCE ``(code_verifier, code_challenge)`` pair.

    The verifier is 64 random bytes, URL-safe base64 encoded without padding;
    the challenge is the unpadded URL-safe base64 of the verifier's SHA-256 digest.
    """

    def _b64url_nopad(raw):
        # URL-safe base64 with the trailing "=" padding stripped.
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

    verifier = _b64url_nopad(secrets.token_bytes(64))
    challenge = _b64url_nopad(hashlib.sha256(verifier.encode("ascii")).digest())
    return verifier, challenge
|
||
|
||
|
||
def _generate_password(length=14):
    """Generate a random password containing all four character classes.

    The result always includes at least one lowercase letter, one uppercase
    letter, one digit and one of ``!@#$%&*``; the remaining characters are
    drawn from the union of those classes and the whole sequence is shuffled.

    Args:
        length: desired password length. Values below 4 still yield a
            4-character password, because one character per class is mandatory.

    Returns:
        str: the generated password.
    """
    # Passwords are credentials: draw from the OS CSPRNG rather than the
    # predictable Mersenne Twister behind the module-level ``random`` functions.
    rng = secrets.SystemRandom()

    lower = string.ascii_lowercase
    upper = string.ascii_uppercase
    digits = string.digits
    special = "!@#$%&*"

    # One guaranteed character from each class.
    chars = [rng.choice(pool) for pool in (lower, upper, digits, special)]

    all_chars = lower + upper + digits + special
    chars.extend(rng.choice(all_chars) for _ in range(length - 4))

    # Shuffle so the guaranteed characters are not always the first four.
    rng.shuffle(chars)
    return "".join(chars)
|
||
|
||
|
||
def _random_name():
    """Return a random ``"<first name> <last name>"`` string from fixed pools."""
    given_pool = (
        "James", "Emma", "Liam", "Olivia", "Noah", "Ava", "Ethan", "Sophia",
        "Lucas", "Mia", "Mason", "Isabella", "Logan", "Charlotte", "Alexander",
        "Amelia", "Benjamin", "Harper", "William", "Evelyn", "Henry", "Abigail",
        "Sebastian", "Emily", "Jack", "Elizabeth", "Michael", "Robert", "David",
        "Joseph", "Thomas", "Christopher", "Daniel", "Matthew", "Anthony",
        "Mary", "Patricia", "Jennifer", "Linda", "Barbara", "Susan", "Jessica",
        "Sarah", "Karen", "Lisa", "Nancy", "Betty", "Margaret", "Sandra",
        "Ashley", "Kimberly", "Donna", "Michelle", "Dorothy", "Carol",
        "Amanda", "Melissa", "Deborah", "Stephanie", "Rebecca", "Sharon",
    )
    family_pool = (
        "Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor",
        "Clark", "Hall", "Young", "Anderson", "Thomas", "Jackson", "White",
        "Harris", "Martin", "Thompson", "Garcia", "Robinson", "Lewis",
        "Walker", "Allen", "King", "Wright", "Scott", "Green", "Adams",
        "Nelson", "Baker", "Rivera", "Campbell", "Mitchell", "Carter",
        "Roberts", "Phillips", "Evans", "Turner", "Diaz", "Parker",
    )
    # The given name is drawn first, then the family name.
    given = random.choice(given_pool)
    family = random.choice(family_pool)
    return given + " " + family
|
||
|
||
|
||
def _random_birthdate():
    """Return a random date string ``YYYY-MM-DD``.

    The year is in 1980-2002, the month in 1-12 and the day in 1-28, so the
    result is a valid calendar date for every month.
    """
    year = random.randint(1980, 2002)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    return "{}-{:02d}-{:02d}".format(year, month, day)
|
||
|
||
|
||
# ================= Sentinel Token (PoW) =================
|
||
|
||
class SentinelTokenGenerator:
    """Pure-Python sentinel token generator (proof-of-work).

    A token is a prefix ("gAAAAAB" or "gAAAAAC") followed by the base64 of a
    JSON-encoded list of environment fields produced by ``_get_config``.
    """

    # Upper bound on nonces tried by ``generate_token`` before giving up.
    MAX_ATTEMPTS = 500000
    # Marker embedded in the token returned when no nonce satisfied the difficulty.
    ERROR_PREFIX = "wQ8Lk5FbGpA2NcR9dShT6gYjU7VxZ4D"

    def __init__(self, device_id=None, user_agent=None):
        """Store the device id / user agent, generating defaults when falsy."""
        if not device_id:
            device_id = str(uuid.uuid4())
        self.device_id = device_id
        if not user_agent:
            user_agent = (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/145.0.0.0 Safari/537.36"
            )
        self.user_agent = user_agent
        self.requirements_seed = str(random.random())
        self.sid = str(uuid.uuid4())

    @staticmethod
    def _fnv1a_32(text: str):
        """Hash ``text`` to 8 lowercase hex digits.

        Runs a 32-bit FNV-1a style loop over the code points, then a
        shift/multiply finalization, all masked to 32 bits.
        """
        mask = 0xFFFFFFFF
        acc = 2166136261
        for code_point in map(ord, text):
            acc = ((acc ^ code_point) * 16777619) & mask
        # Finalization rounds: xor-shift then multiply.
        for shift, multiplier in ((16, 2246822507), (13, 3266489909)):
            acc ^= acc >> shift
            acc = (acc * multiplier) & mask
        acc ^= acc >> 16
        return format(acc & mask, "08x")

    def _get_config(self):
        """Build the 19-element field list that is serialized into a token.

        Index 3 and index 9 are overwritten by the callers (nonce and elapsed
        time). Several entries are randomized on every call.
        """
        timestamp = time.strftime(
            "%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)",
            time.gmtime(),
        )
        perf_now = random.uniform(1000, 50000)
        time_origin = time.time() * 1000 - perf_now
        navigator_keys = [
            "vendorSub", "productSub", "vendor", "maxTouchPoints",
            "scheduling", "userActivation", "doNotTrack", "geolocation",
            "connection", "plugins", "mimeTypes", "pdfViewerEnabled",
            "webkitTemporaryStorage", "webkitPersistentStorage",
            "hardwareConcurrency", "cookieEnabled", "credentials",
            "mediaDevices", "permissions", "locks", "ink",
        ]
        nav_val = random.choice(navigator_keys) + "-undefined"

        # Draw the remaining random fields in the order they appear in the list.
        slot_3 = random.random()
        slot_10 = random.random()
        document_key = random.choice(["location", "implementation", "URL", "documentURI", "compatMode"])
        window_key = random.choice(["Object", "Function", "Array", "Number", "parseFloat", "undefined"])
        core_count = random.choice([4, 8, 12, 16])

        return [
            "1920x1080",
            timestamp,
            4294705152,
            slot_3,
            self.user_agent,
            "https://sentinel.openai.com/sentinel/20260124ceb8/sdk.js",
            None,
            None,
            "en-US",
            "en-US,en",
            slot_10,
            nav_val,
            document_key,
            window_key,
            perf_now,
            self.sid,
            "",
            core_count,
            time_origin,
        ]

    @staticmethod
    def _base64_encode(data):
        """Serialize ``data`` as compact UTF-8 JSON and return its base64 text."""
        compact_json = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
        return base64.b64encode(compact_json.encode("utf-8")).decode("ascii")

    def _run_check(self, start_time, seed, difficulty, config, nonce):
        """Try one nonce; return ``"<payload>~S"`` on success, else ``None``.

        ``config`` is mutated in place: index 3 receives the nonce and index 9
        the elapsed milliseconds since ``start_time``. The attempt succeeds when
        the leading hex digits of the hash of ``seed + payload`` compare
        ``<=`` to ``difficulty`` as strings.
        """
        config[3] = nonce
        config[9] = round((time.time() - start_time) * 1000)
        payload = self._base64_encode(config)
        digest = self._fnv1a_32(seed + payload)
        if digest[:len(difficulty)] <= difficulty:
            return payload + "~S"
        return None

    def generate_token(self, seed=None, difficulty=None):
        """Search for a nonce meeting ``difficulty`` and return the token.

        ``seed`` defaults to the instance's ``requirements_seed``; a falsy
        ``difficulty`` becomes ``"0"``. When ``MAX_ATTEMPTS`` nonces fail, a
        token built from ``ERROR_PREFIX`` is returned instead.
        """
        if seed is None:
            seed = self.requirements_seed
        target = str(difficulty or "0")
        started = time.time()
        fields = self._get_config()
        for nonce in range(self.MAX_ATTEMPTS):
            solved = self._run_check(started, seed, target, fields, nonce)
            if solved:
                return "gAAAAAB" + solved
        return "gAAAAAB" + self.ERROR_PREFIX + self._base64_encode(str(None))

    def generate_requirements_token(self):
        """Return a "gAAAAAC"-prefixed token with a fixed nonce and no search."""
        fields = self._get_config()
        fields[3] = 1
        fields[9] = round(random.uniform(5, 50))
        return "gAAAAAC" + self._base64_encode(fields)
|
||
|
||
|
||
def fetch_sentinel_challenge(session, device_id, flow="authorize_continue", user_agent=None,
                             sec_ch_ua=None, impersonate=None):
    """POST a requirements token to the sentinel endpoint and return its JSON.

    Args:
        session: HTTP session object exposing ``post``.
        device_id: device identifier sent as ``id``.
        flow: flow name sent as ``flow``.
        user_agent: User-Agent header value; "Mozilla/5.0" when falsy.
        sec_ch_ua: ``sec-ch-ua`` header value; a built-in default when falsy.
        impersonate: forwarded as the ``impersonate`` keyword when truthy.

    Returns:
        The decoded JSON body, or ``None`` when the request raises, the status
        is not 200, or the body cannot be decoded.
    """
    requirements = SentinelTokenGenerator(
        device_id=device_id, user_agent=user_agent
    ).generate_requirements_token()
    body = json.dumps({"p": requirements, "id": device_id, "flow": flow})

    request_kwargs = {
        "data": body,
        "headers": {
            "Content-Type": "text/plain;charset=UTF-8",
            "Referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html",
            "Origin": "https://sentinel.openai.com",
            "User-Agent": user_agent or "Mozilla/5.0",
            "sec-ch-ua": sec_ch_ua or '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        },
        "timeout": 20,
    }
    if impersonate:
        request_kwargs["impersonate"] = impersonate

    try:
        resp = session.post("https://sentinel.openai.com/backend-api/sentinel/req", **request_kwargs)
    except Exception:
        return None

    if resp.status_code != 200:
        return None

    try:
        return resp.json()
    except Exception:
        return None
|
||
|
||
|
||
def build_sentinel_token(session, device_id, flow="authorize_continue", user_agent=None,
                         sec_ch_ua=None, impersonate=None):
    """Fetch a sentinel challenge and assemble the compact JSON token string.

    Returns:
        A JSON string with keys ``p``, ``t``, ``c``, ``id`` and ``flow``, or
        ``None`` when no challenge could be fetched or it carries no ``token``.
    """
    challenge = fetch_sentinel_challenge(
        session,
        device_id,
        flow=flow,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        impersonate=impersonate,
    )
    if not challenge:
        return None

    challenge_token = challenge.get("token", "")
    if not challenge_token:
        return None

    pow_info = challenge.get("proofofwork") or {}
    solver = SentinelTokenGenerator(device_id=device_id, user_agent=user_agent)

    # Solve the proof-of-work only when the server requires it and supplied a
    # seed; otherwise fall back to a plain requirements token.
    if pow_info.get("required") and pow_info.get("seed"):
        proof = solver.generate_token(
            seed=pow_info.get("seed"),
            difficulty=pow_info.get("difficulty", "0"),
        )
    else:
        proof = solver.generate_requirements_token()

    token_fields = {"p": proof, "t": "", "c": challenge_token, "id": device_id, "flow": flow}
    return json.dumps(token_fields, separators=(",", ":"))
|
||
|
||
|
||
def _extract_code_from_url(url: str):
    """Return the first ``code`` query parameter of ``url``.

    Returns ``None`` when ``url`` is falsy, does not contain ``"code="``, has
    no ``code`` parameter in its query string, or cannot be parsed.
    """
    if url and "code=" in url:
        try:
            query_params = parse_qs(urlparse(url).query)
            return query_params.get("code", [None])[0]
        except Exception:
            return None
    return None
|
||
|
||
|
||
def _decode_jwt_payload(token: str):
    """Decode the payload (middle) segment of a JWT without verifying it.

    Args:
        token: a JWT in ``header.payload.signature`` form.

    Returns:
        dict: the decoded claims, or ``{}`` when ``token`` is not a string,
        does not have exactly three dot-separated segments, is not valid
        base64/JSON, or decodes to something other than a JSON object.
        Callers use ``.get`` on the result, so a dict is always returned.
    """
    if not isinstance(token, str):
        return {}

    segments = token.split(".")
    if len(segments) != 3:
        return {}

    body = segments[1]
    # JWT segments drop base64 padding; restore it to a multiple of four.
    body += "=" * (-len(body) % 4)

    try:
        claims = json.loads(base64.urlsafe_b64decode(body))
    except (ValueError, TypeError, RecursionError):
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses.
        return {}

    # A payload such as "[1, 2]" is valid JSON but not a claims object.
    return claims if isinstance(claims, dict) else {}
|
||
|
||
|
||
# ================= Token 保存与上传 =================
|
||
def _build_default_model_mapping() -> dict:
    """Return a fresh identity mapping ``{model_name: model_name}``.

    Every supported model name maps to itself; insertion order follows the
    tuple below.
    """
    model_names = (
        "gpt-3.5-turbo",
        "gpt-3.5-turbo-0125",
        "gpt-3.5-turbo-1106",
        "gpt-3.5-turbo-16k",
        "gpt-4",
        "gpt-4-turbo",
        "gpt-4-turbo-preview",
        "gpt-4o",
        "gpt-4o-2024-08-06",
        "gpt-4o-2024-11-20",
        "gpt-4o-mini",
        "gpt-4o-mini-2024-07-18",
        "gpt-4.5-preview",
        "gpt-4.1",
        "gpt-4.1-mini",
        "gpt-4.1-nano",
        "o1",
        "o1-preview",
        "o1-mini",
        "o1-pro",
        "o3",
        "o3-mini",
        "o3-pro",
        "o4-mini",
        "gpt-5",
        "gpt-5-2025-08-07",
        "gpt-5-chat",
        "gpt-5-chat-latest",
        "gpt-5-codex",
        "gpt-5.3-codex-spark",
        "gpt-5-pro",
        "gpt-5-pro-2025-10-06",
        "gpt-5-mini",
        "gpt-5-mini-2025-08-07",
        "gpt-5-nano",
        "gpt-5-nano-2025-08-07",
        "gpt-5.1",
        "gpt-5.1-2025-11-13",
        "gpt-5.1-chat-latest",
        "gpt-5.1-codex",
        "gpt-5.1-codex-max",
        "gpt-5.1-codex-mini",
        "gpt-5.2",
        "gpt-5.2-2025-12-11",
        "gpt-5.2-chat-latest",
        "gpt-5.2-codex",
        "gpt-5.2-pro",
        "gpt-5.2-pro-2025-12-11",
        "gpt-5.4",
        "gpt-5.4-2026-03-05",
        "gpt-5.3-codex",
        "chatgpt-4o-latest",
        "gpt-4o-audio-preview",
        "gpt-4o-realtime-preview",
    )
    return {model: model for model in model_names}
|
||
|
||
|
||
def _build_codex_account_payload(email: str, tokens: dict) -> dict:
    """Convert OAuth tokens into the account payload posted to the SUB2API endpoint.

    Args:
        email: account e-mail; used as the account ``name`` and in ``extra``.
        tokens: token response; ``access_token``, ``refresh_token``,
            ``id_token`` and ``expires_in`` are read from it.

    Returns:
        dict: the payload, with account ids and expiry taken from the JWT
        claims where available.
    """
    auth_claim = "https://api.openai.com/auth"

    access_token = tokens.get("access_token", "")
    refresh_token = tokens.get("refresh_token", "")
    id_token = tokens.get("id_token", "")
    expires_in = tokens.get("expires_in", 863999)

    # Fields carried by the access_token JWT.
    access_claims = _decode_jwt_payload(access_token) if access_token else {}
    access_auth = access_claims.get(auth_claim, {})
    chatgpt_account_id = access_auth.get("chatgpt_account_id", "")
    chatgpt_user_id = access_auth.get("chatgpt_user_id", "")

    # Prefer the JWT's own expiry; otherwise derive it from expires_in.
    exp_claim = access_claims.get("exp", 0)
    if isinstance(exp_claim, int) and exp_claim > 0:
        expires_at = exp_claim
    else:
        expires_at = int(time.time()) + expires_in

    # organization_id comes from the id_token JWT, falling back to the first
    # entry of its "organizations" list.
    id_claims = _decode_jwt_payload(id_token) if id_token else {}
    id_auth = id_claims.get(auth_claim, {})
    organization_id = id_auth.get("organization_id", "")
    if not organization_id:
        organizations = id_auth.get("organizations", [])
        if organizations:
            organization_id = (organizations[0] or {}).get("id", "")

    credentials = {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": expires_in,
        "expires_at": expires_at,
        "client_id": OAUTH_CLIENT_ID,
        "chatgpt_account_id": chatgpt_account_id,
        "chatgpt_user_id": chatgpt_user_id,
        "organization_id": organization_id,
        "model_mapping": _build_default_model_mapping(),
    }
    extra = {
        "email": email,
        "openai_oauth_responses_websockets_v2_mode": "off",
        "openai_oauth_responses_websockets_v2_enabled": False,
    }
    return {
        "name": email,
        "notes": "",
        "platform": "openai",
        "type": "oauth",
        "credentials": credentials,
        "extra": extra,
        "proxy_id": None,
        "concurrency": 10,
        "priority": 1,
        "rate_multiplier": 1,
        "group_ids": [2],  # adjust the group ids to match your deployment
        "expires_at": None,
        "auto_pause_on_expired": True,
    }
|
||
|
||
def _save_codex_tokens(email: str, tokens: dict):
    """Persist Codex OAuth tokens locally and push them to configured services.

    Steps:
      1. Append the access token to ``AK_FILE`` and the refresh token to
         ``RK_FILE`` (each only when present).
      2. Without an access token, stop here.
      3. Write ``<TOKEN_JSON_DIR>/<email>.json`` with the tokens plus metadata
         taken from the access-token JWT.
      4. Upload that JSON file when ``UPLOAD_API_URL`` is set.
      5. POST an account payload to ``SUB2API_URL`` when both the URL and
         ``SUB2API_TOKEN`` are set.

    Args:
        email: account e-mail; also used as the JSON file name.
        tokens: token response with ``access_token``, ``refresh_token`` and
            ``id_token``.
    """
    from datetime import datetime, timezone

    access_token = tokens.get("access_token", "")
    refresh_token = tokens.get("refresh_token", "")
    id_token = tokens.get("id_token", "")

    if access_token:
        with _file_lock:
            with open(AK_FILE, "a", encoding="utf-8") as f:
                f.write(f"{access_token}\n")

    if refresh_token:
        with _file_lock:
            with open(RK_FILE, "a", encoding="utf-8") as f:
                f.write(f"{refresh_token}\n")

    if not access_token:
        return

    payload = _decode_jwt_payload(access_token)
    auth_info = payload.get("https://api.openai.com/auth", {})
    account_id = auth_info.get("chatgpt_account_id", "")

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    exp_timestamp = payload.get("exp")
    expired_str = ""
    if isinstance(exp_timestamp, int) and exp_timestamp > 0:
        expired_str = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc).strftime(timestamp_format)

    token_data = {
        "id_token": id_token,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "account_id": account_id,
        "last_refresh": datetime.now(tz=timezone.utc).strftime(timestamp_format),
        "email": email,
        "type": "codex",
        "expired": expired_str,
        "uploaded_platforms": [],
        "cpa_uploaded": False,
        "cpa_synced": False,
        "uploaded_at": {},
    }

    # A relative TOKEN_JSON_DIR is resolved against this file's directory.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    token_dir = TOKEN_JSON_DIR if os.path.isabs(TOKEN_JSON_DIR) else os.path.join(base_dir, TOKEN_JSON_DIR)
    os.makedirs(token_dir, exist_ok=True)
    token_path = os.path.join(token_dir, f"{email}.json")
    with _file_lock:
        with open(token_path, "w", encoding="utf-8") as f:
            json.dump(token_data, f, ensure_ascii=False)

    if UPLOAD_API_URL:
        # Push to CPA.
        _upload_token_json(token_path)

    if not (SUB2API_URL and SUB2API_TOKEN):
        with _print_lock:
            print("[SUB2API] 未配置 SUB2API_URL 或 SUB2API_TOKEN,跳过推送")
        return

    # Push to SUB2API.
    try:
        api_payload = _build_codex_account_payload(email, tokens)

        # Log a redacted copy: the real payload carries live access/refresh
        # tokens, which must not end up in console output or captured logs.
        loggable = dict(api_payload)
        redacted_credentials = dict(loggable.get("credentials") or {})
        for secret_key in ("access_token", "refresh_token"):
            if redacted_credentials.get(secret_key):
                redacted_credentials[secret_key] = "***"
        loggable["credentials"] = redacted_credentials
        with _print_lock:
            print("[SUB2API] 开始推送至 SUB2API:")
            print(loggable)

        resp = curl_requests.post(
            SUB2API_URL,
            headers={
                "Authorization": f"Bearer {SUB2API_TOKEN}",
                "Content-Type": "application/json",
                "Accept": "application/json, text/plain, */*",
                "Referer": SUB2API_URL.replace("/api/v1/admin/accounts", "/admin/accounts"),
            },
            json=api_payload,
            timeout=30,
        )
        with _print_lock:
            print(f"[SUB2API] POST {SUB2API_URL} -> {resp.status_code}")
            if resp.status_code not in (200, 201):
                print(f"[SUB2API] 响应: {resp.text[:300]}")
    except Exception as e:
        # Best-effort push: a failure is reported but never aborts the caller.
        with _print_lock:
            print(f"[SUB2API] 请求失败: {e}")
|
||
|
||
|
||
def _upload_token_json(filepath):
    """Upload a token JSON file to ``UPLOAD_API_URL`` as multipart form data.

    The file is sent as the ``file`` part with ``UPLOAD_API_TOKEN`` as a
    Bearer token, through ``DEFAULT_PROXY`` when one is configured. The
    outcome is printed; every exception is caught and reported, so an upload
    failure never propagates to the caller.

    Args:
        filepath: path of the JSON file to upload.
    """
    mp = None
    session = None
    try:
        from curl_cffi import CurlMime

        filename = os.path.basename(filepath)
        mp = CurlMime()
        mp.addpart(name="file", content_type="application/json", filename=filename, local_path=filepath)

        session = curl_requests.Session()
        if DEFAULT_PROXY:
            session.proxies = {"http": DEFAULT_PROXY, "https": DEFAULT_PROXY}

        # NOTE(review): verify=False disables TLS certificate checks while a
        # Bearer token is being sent; kept for compatibility, but confirm it is
        # really required for the target deployment.
        resp = session.post(
            UPLOAD_API_URL,
            multipart=mp,
            headers={"Authorization": f"Bearer {UPLOAD_API_TOKEN}"},
            verify=False,
            timeout=30,
        )

        if resp.status_code == 200:
            with _print_lock:
                print(f" [CPA] Token JSON 已上传到 CPA 管理平台")
        else:
            with _print_lock:
                print(f" [CPA] 上传失败: {resp.status_code} - {resp.text[:200]}")
    except Exception as e:
        with _print_lock:
            print(f" [CPA] 上传异常: {e}")
    finally:
        # Release the HTTP session as well as the multipart handle; the session
        # was previously left open after every upload.
        if session is not None:
            try:
                session.close()
            except Exception:
                pass
        if mp:
            mp.close()
|
||
|
||
|
||
# ================= Team 邀请 =================
|
||
|
||
def load_invite_tracker():
    """Load the invite tracker from ``INVITE_TRACKER_FILE``.

    Returns:
        dict: the file's content when it is a JSON object containing a
        ``"teams"`` key; otherwise a fresh tracker holding an empty invite
        list for every configured team. Read or parse errors are printed and
        also yield the fresh tracker.
    """
    fallback = {"teams": {team["account_id"]: [] for team in TEAMS}}

    if not os.path.exists(INVITE_TRACKER_FILE):
        return fallback

    try:
        with open(INVITE_TRACKER_FILE, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
    except Exception as e:
        print(f"⚠️ Failed to load invite tracker: {e}")
        return fallback

    if isinstance(loaded, dict) and "teams" in loaded:
        return loaded
    return fallback
|
||
|
||
|
||
def save_invite_tracker(tracker):
    """Write the invite tracker to ``INVITE_TRACKER_FILE``.

    The caller is expected to already hold ``_file_lock``.

    The data is serialized to a sibling ``.tmp`` file and then moved over the
    target with ``os.replace``, so a failed or interrupted dump leaves the
    previous tracker intact instead of a truncated file. Failures are printed
    and not raised.

    Args:
        tracker: JSON-serializable tracker dictionary.
    """
    tmp_path = f"{INVITE_TRACKER_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(tracker, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, INVITE_TRACKER_FILE)
    except Exception as e:
        print(f"⚠️ Failed to save invite tracker: {e}")
        # Do not leave a partial temp file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
||
|
||
|
||
def get_available_team(tracker):
    """Return the first configured team with invite capacity left, else ``None``.

    A team has capacity when the number of e-mails recorded for its
    ``account_id`` in ``tracker["teams"]`` is below its ``max_invites``.
    """
    candidates = (
        team
        for team in TEAMS
        if len(tracker["teams"].get(team["account_id"], [])) < team["max_invites"]
    )
    return next(candidates, None)
|
||
|
||
|
||
def _get_fresh_access_token(team: dict, tag: str = ""):
    """Exchange a team's session cookie for a fresh access token.

    Requests ``/api/auth/session`` with the team's ``session_token`` set as
    the ``__Secure-next-auth.session-token`` cookie.

    Args:
        team: team config; ``session_token`` and ``auth_token`` are read.
        tag: optional label prefixed to log lines.

    Returns:
        str: ``"Bearer <accessToken>"`` on success. Otherwise the team's
        ``auth_token`` (``""`` when absent) — used when there is no
        ``session_token``, the response is unusable, or the request raises.
    """
    prefix = f"[{tag}] " if tag else ""

    cookie_value = team.get("session_token", "")
    if not cookie_value:
        # Fallback: use auth_token directly.
        return team.get("auth_token", "")

    client = curl_requests.Session(verify=False, impersonate="chrome120")
    if DEFAULT_PROXY:
        client.proxies = {"http": DEFAULT_PROXY, "https": DEFAULT_PROXY}
    client.cookies.set("__Secure-next-auth.session-token", cookie_value, domain="chatgpt.com")

    try:
        resp = client.get("https://chatgpt.com/api/auth/session", timeout=30)
        if resp.status_code == 200:
            session_info = resp.json()
            access = session_info.get("accessToken", "")
            if access:
                with _print_lock:
                    print(f"{prefix}🔑 获取 fresh access_token 成功 (expires: {session_info.get('expires', '?')})")
                return f"Bearer {access}"
        # Reached on a non-200 status or a 200 response without accessToken.
        with _print_lock:
            print(f"{prefix}⚠️ 获取 fresh token 失败 (status={resp.status_code}),降级使用 auth_token")
    except Exception as e:
        with _print_lock:
            print(f"{prefix}⚠️ 获取 fresh token 异常: {e},降级使用 auth_token")

    return team.get("auth_token", "")
|
||
|
||
|
||
def invite_to_team(email: str, team: dict, tag: str = ""):
    """Send a Team invitation for ``email`` at the protocol level.

    A fresh access token is obtained first via ``_get_fresh_access_token``.

    Args:
        email: address to invite.
        team: team config; ``account_id`` and ``name`` are read here.
        tag: optional label prefixed to log lines.

    Returns:
        bool: ``True`` when the 200 response lists ``account_invites`` or has
        an unrecognized shape (treated as sent); ``False`` when it lists
        ``errored_emails``, the status is not 200, or the request raises.
    """
    prefix = f"[{tag}] " if tag else ""

    # Obtain a fresh access token.
    auth_token = _get_fresh_access_token(team, tag)

    session = curl_requests.Session(verify=False, impersonate="chrome120")
    if DEFAULT_PROXY:
        session.proxies = {"http": DEFAULT_PROXY, "https": DEFAULT_PROXY}

    account_id = team["account_id"]
    request_headers = {
        "accept": "*/*",
        "accept-language": "en-US,en;q=0.9",
        "authorization": auth_token,
        "chatgpt-account-id": account_id,
        "content-type": "application/json",
        "origin": "https://chatgpt.com",
        "referer": "https://chatgpt.com/",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
    }
    request_body = {
        "email_addresses": [email],
        "role": "standard-user",
        "resend_emails": True,
    }
    invite_url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites"

    try:
        response = session.post(invite_url, headers=request_headers, json=request_body, timeout=30)

        if response.status_code != 200:
            with _print_lock:
                print(f"{prefix}❌ Failed to invite {email}: HTTP {response.status_code}")
                print(f"{prefix} Response: {response.text[:200]}")
            return False

        result = response.json()
        with _print_lock:
            print(f"{prefix}[Invite] 响应: {json.dumps(result, ensure_ascii=False)[:300]}")

        if result.get("account_invites"):
            with _print_lock:
                print(f"{prefix}✅ Successfully invited {email} to {team['name']}")
            return True

        if result.get("errored_emails"):
            with _print_lock:
                print(f"{prefix}⚠️ Invite error for {email}: {result['errored_emails']}")
            return False

        # HTTP 200 with an unexpected structure is treated as success (the
        # endpoint sometimes returns an empty object).
        with _print_lock:
            print(f"{prefix}⚠️ Invite 响应结构未知,视为已发送: {email}")
        return True
    except Exception as e:
        with _print_lock:
            print(f"{prefix}❌ Invite request failed: {e}")
        return False
|
||
|
||
|
||
def auto_invite_to_team(email: str, tag: str = ""):
    """Pick an available team automatically and invite ``email`` to it.

    Returns:
        bool: ``True`` when the invitation was sent and recorded in the
        tracker; ``False`` when no teams are configured, ``email`` is already
        recorded, every team is full, or the invitation failed.
    """
    if not TEAMS:
        with _print_lock:
            print(f"[{tag}] ⚠️ 未配置 teams,跳过邀请" if tag else "⚠️ 未配置 teams,跳过邀请")
        return False

    # Phase 1 (under the file lock): reject duplicates and choose a team.
    with _file_lock:
        tracker = load_invite_tracker()
        already_invited = any(email in invited for invited in tracker["teams"].values())
        if already_invited:
            with _print_lock:
                print(f"[{tag}] ⚠️ {email} already invited, skipping" if tag else f"⚠️ {email} already invited")
            return False
        team = get_available_team(tracker)

    if not team:
        with _print_lock:
            print(f"[{tag}] ❌ All teams are full" if tag else "❌ All teams are full")
        return False

    # Phase 2 (no lock held): the network call. The lock is released here, so
    # the tracker is re-read in phase 3 rather than reusing the phase-1 copy.
    if not invite_to_team(email, team, tag=tag):
        return False

    # Phase 3 (under the file lock): record the invitation.
    with _file_lock:
        tracker = load_invite_tracker()
        account_id = team["account_id"]
        if account_id not in tracker["teams"]:
            tracker["teams"][account_id] = []
        tracker["teams"][account_id].append(email)
        save_invite_tracker(tracker)
        count = len(tracker["teams"][account_id])

    with _print_lock:
        print(f"[{tag}] Team status: {team['name']} has {count}/{team['max_invites']} invites"
              if tag else f" Team status: {team['name']} has {count}/{team['max_invites']} invites")
    return True
|
||
|
||
|
||
# ================= CSV 保存 =================
|
||
|
||
def save_to_csv(email: str, password: str, dm_password: str = "", oauth_status: str = ""):
    """Append one account row to ``CSV_FILE``, writing the header when needed.

    Args:
        email: account e-mail.
        password: account password.
        dm_password: mailbox password (column ``duckmail_password``).
        oauth_status: OAuth outcome label.
    """
    import csv

    with _file_lock:
        # Decide about the header while holding the lock: if the check ran
        # before acquiring it, two threads could both see "no file yet" and
        # each write a header. An empty existing file also needs a header.
        needs_header = not os.path.exists(CSV_FILE) or os.path.getsize(CSV_FILE) == 0
        with open(CSV_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(["email", "password", "duckmail_password", "oauth_status", "timestamp"])
            writer.writerow([email, password, dm_password, oauth_status, time.strftime("%Y-%m-%d %H:%M:%S")])
|
||
|
||
|
||
# ================= ChatGPTRegister 核心类 =================
|
||
|
||
class ChatGPTRegister:
|
||
BASE = "https://chatgpt.com"
|
||
AUTH = "https://auth.openai.com"
|
||
|
||
def __init__(self, proxy: str = None, tag: str = ""):
|
||
self.tag = tag
|
||
self.device_id = str(uuid.uuid4())
|
||
self.auth_session_logging_id = str(uuid.uuid4())
|
||
self.impersonate, self.chrome_major, self.chrome_full, self.ua, self.sec_ch_ua = _random_chrome_version()
|
||
|
||
self.session = curl_requests.Session(impersonate=self.impersonate, verify=False)
|
||
self.proxy = proxy
|
||
if self.proxy:
|
||
self.session.proxies = {"http": self.proxy, "https": self.proxy}
|
||
|
||
self.session.headers.update({
|
||
"User-Agent": self.ua,
|
||
"Accept-Language": random.choice([
|
||
"en-US,en;q=0.9", "en-US,en;q=0.9,zh-CN;q=0.8",
|
||
"en,en-US;q=0.9", "en-US,en;q=0.8",
|
||
]),
|
||
"sec-ch-ua": self.sec_ch_ua, "sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": '"Windows"', "sec-ch-ua-arch": '"x86"',
|
||
"sec-ch-ua-bitness": '"64"',
|
||
"sec-ch-ua-full-version": f'"{self.chrome_full}"',
|
||
"sec-ch-ua-platform-version": f'"{random.randint(10, 15)}.0.0"',
|
||
})
|
||
self.session.cookies.set("oai-did", self.device_id, domain="chatgpt.com")
|
||
self._callback_url = None
|
||
|
||
def _print(self, msg):
    """Print ``msg`` under the shared print lock, prefixed with the tag if one is set."""
    label = ""
    if self.tag:
        label = f"[{self.tag}] "
    with _print_lock:
        print(f"{label}{msg}")
|
||
|
||
def _log(self, step, method, url, status, body=None):
    """Print a framed summary of one HTTP step.

    Args:
        step: Human-readable step label.
        method: HTTP method name.
        url: Request URL.
        status: Response status code.
        body: Optional response payload; rendered as JSON when possible,
            otherwise via ``str()``, and cut to 1000 characters.
    """
    tag_part = f"[{self.tag}] " if self.tag else ""
    rule = "=" * 60
    report = [
        f"\n{rule}",
        f"{tag_part}[Step] {step}",
        f"{tag_part}[{method}] {url}",
        f"{tag_part}[Status] {status}",
    ]
    if body:
        try:
            rendered = json.dumps(body, indent=2, ensure_ascii=False)[:1000]
        except Exception:
            # Payload is not JSON-serialisable; fall back to its string form.
            rendered = str(body)[:1000]
        report.append(f"{tag_part}[Response] {rendered}")
    report.append(f"{rule}")
    with _print_lock:
        print("\n".join(report))
|
||
|
||
# ---- DuckMail (使用标准 requests,避免 curl_cffi TLS 超时) ----
|
||
|
||
def _create_duckmail_session(self):
    """Build a plain ``requests`` session with a retry policy (kept consistent with cpa.py).

    Returns:
        A ``requests.Session`` with retrying adapters mounted for both
        schemes, JSON headers preset and the instance proxy applied when set.
    """
    import requests as std_requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    retrying_adapter = HTTPAdapter(max_retries=Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "POST", "OPTIONS"],
    ))
    mail_session = std_requests.Session()
    for scheme in ("https://", "http://"):
        mail_session.mount(scheme, retrying_adapter)
    mail_session.headers.update({
        "User-Agent": self.ua, "Accept": "application/json", "Content-Type": "application/json",
    })
    if self.proxy:
        mail_session.proxies = {"http": self.proxy, "https": self.proxy}
    return mail_session
|
||
|
||
def create_temp_email(self):
    """Create a temporary mailbox through the GPTMail API.

    NOTE(review): another ``create_temp_email`` is defined further down in
    this file; if both belong to the same class the later one replaces this
    definition — confirm before relying on this code path.

    Returns:
        ``(email, password, mail_token)`` where ``mail_token`` is the e-mail
        address itself (the inbox is queried by address).

    Raises:
        Exception: When every attempt failed.
    """
    mail_session = self._create_duckmail_session()
    api_headers = {"X-API-Key": GPTMAIL_API_KEY}
    max_retries = 5
    for attempt in range(max_retries):
        try:
            alphabet = string.ascii_lowercase + string.digits
            stamp = int(time.time()) % 100000
            prefix = f"t{stamp}" + "".join(random.choices(alphabet, k=8))
            print(f" GPTMail 生成邮箱 (第{attempt+1}次), prefix={prefix}")
            res = mail_session.post(
                f"{GPTMAIL_BASE}/api/generate-email",
                json={"prefix": prefix},
                headers=api_headers,
                timeout=30,
            )
            if res.status_code != 200:
                raise Exception(f"GPTMail HTTP {res.status_code}: {res.text[:200]}")
            data = res.json()
            if not (data.get("success") and data.get("data", {}).get("email")):
                raise Exception(f"GPTMail 返回异常: {res.text[:200]}")
            email = data["data"]["email"]
            password = _generate_password()
            print(f" ✅ GPTMail 邮箱生成成功: {email}")
            # The mail token is simply the address.
            return email, password, email
        except Exception as e:
            # Any failure above (including the ones raised on purpose) triggers a retry.
            print(f" ⚠️ GPTMail 重试 {attempt+1}/{max_retries}: {e}")
            time.sleep(1)
    raise Exception("GPTMail 创建邮箱失败: 超过最大重试次数")
|
||
|
||
def _fetch_emails_duckmail(self, mail_token: str):
    """List the messages of a mailbox through the GPTMail API.

    Args:
        mail_token: The mailbox e-mail address (used as the ``email`` query
            parameter).

    Returns:
        The ``emails`` list from the response, or ``[]`` on a non-success
        reply, a non-200 status or any exception.
    """
    try:
        res = self._create_duckmail_session().get(
            f"{GPTMAIL_BASE}/api/emails",
            params={"email": mail_token},
            headers={"X-API-Key": GPTMAIL_API_KEY},
            timeout=30,
        )
        if res.status_code != 200:
            print(f" [DEBUG] Fetch emails failed: {res.status_code} {res.text[:100]}")
            return []
        payload = res.json()
        if not payload.get("success"):
            return []
        inbox = payload.get("data", {}).get("emails", [])
        if inbox:
            print(f" [DEBUG] Fetched {len(inbox)} email(s)")
        return inbox
    except Exception as e:
        print(f" [DEBUG] _fetch_emails_duckmail error: {e}")
    return []
|
||
|
||
def _REMOVED_fetch_emails_duckmail_OLD(self, mail_token: str):
|
||
# 已废弃,占位保留
|
||
return []
|
||
|
||
def _fetch_email_detail_duckmail(self, mail_token: str, msg_id: str):
    """Read a single message through the GPTMail API.

    Args:
        mail_token: Unused; kept so the signature stays compatible.
        msg_id: Identifier of the message to load.

    Returns:
        The message dict with normalised ``html`` and ``text`` keys added,
        or ``None`` when the request failed or reported no success.
    """
    try:
        res = self._create_duckmail_session().get(
            f"{GPTMAIL_BASE}/api/email/{msg_id}",
            headers={"X-API-Key": GPTMAIL_API_KEY},
            timeout=30,
        )
        if res.status_code != 200:
            return None
        payload = res.json()
        if not payload.get("success"):
            return None
        detail = payload.get("data", {})
        # Normalise the body fields: html falls back to the plain content.
        plain_body = detail.get("content", "")
        detail["html"] = detail.get("html_content") or plain_body
        detail["text"] = plain_body
        return detail
    except Exception as e:
        print(f" [DEBUG] _fetch_email_detail_duckmail error: {e}")
    return None
|
||
|
||
def _extract_verification_code(self, email_content: str):
|
||
if not email_content:
|
||
return None
|
||
patterns = [
|
||
r"Verification code:?\s*(\d{6})", r"code is\s*(\d{6})",
|
||
r"代码为[::]?\s*(\d{6})", r"验证码[::]?\s*(\d{6})",
|
||
r">\s*(\d{6})\s*<", r"(?<![#&])\b(\d{6})\b",
|
||
]
|
||
for pattern in patterns:
|
||
matches = re.findall(pattern, email_content, re.IGNORECASE)
|
||
for code in matches:
|
||
if code == "177010":
|
||
continue
|
||
return code
|
||
return None
|
||
|
||
def wait_for_verification_email(self, mail_token: str, timeout: int = 120):
    """Poll the GPTMail inbox until a verification code shows up.

    NOTE(review): ``wait_for_verification_email`` is defined again further
    down in this file; if those definitions are in the same class, the last
    one replaces this method — confirm before relying on this code path.

    Args:
        mail_token: Mailbox identifier passed to the fetch helpers.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The code as a string, or ``None`` after the timeout.
    """
    def _first_truthy(record, keys):
        # Mirrors an ``a or b or ... or ""`` chain over record.get(key).
        for key in keys:
            value = record.get(key)
            if value:
                return value
        return ""

    self._print(f"[OTP] 等待验证码邮件 (最多 {timeout}s)...")
    started = time.time()
    while time.time() - started < timeout:
        inbox = self._fetch_emails_duckmail(mail_token)
        if inbox and len(inbox) > 0:
            newest = inbox[0]
            print(f" [DEBUG] Message keys: {list(newest.keys())}")
            # Several id field names are tolerated.
            msg_id = _first_truthy(newest, ("id", "@id", "message_id")) or None

            # First attempt: the list entry itself may already carry the body.
            inline_content = _first_truthy(newest, ("text", "html", "raw", "source"))
            if inline_content:
                otp = self._extract_verification_code(inline_content)
                if otp:
                    self._print(f"[OTP] 验证码: {otp}")
                    return otp

            # Second attempt: load the full message by id.
            if msg_id:
                detail = self._fetch_email_detail_duckmail(mail_token, str(msg_id))
                if detail:
                    body = _first_truthy(detail, ("text", "html", "source"))
                    otp = self._extract_verification_code(body)
                    if otp:
                        self._print(f"[OTP] 验证码: {otp}")
                        return otp
        elapsed = int(time.time() - started)
        self._print(f"[OTP] 等待中... ({elapsed}s/{timeout}s)")
        time.sleep(3)
    self._print(f"[OTP] 超时 ({timeout}s)")
    return None
|
||
|
||
# ---- 注册流程 ----
|
||
|
||
def create_temp_email(self):
    """Create a temporary mailbox with the configured mail provider.

    For the ``npcmail`` provider a single request is made and failures raise
    immediately; for the default GPTMail provider up to five attempts are
    made.

    Returns:
        ``(address, secret, mail_token)``. For NPCmail ``secret`` is the
        returned ``pin_code``; for GPTMail it is a freshly generated
        password. ``mail_token`` is the address itself in both cases.

    Raises:
        Exception: On missing NPCmail configuration, an NPCmail error reply,
            or when all GPTMail attempts failed.
    """
    if _mail_provider_name() == "npcmail":
        if not (CMAIL_BASE and CMAIL_API_KEY):
            raise Exception("NPCmail 未配置 API 域名或 API Key,请在 config.json 中补充 npcmail_base,并确认前端已保存 npcmail_api_key")
        npc_session = self._create_duckmail_session()
        payload = {"count": 1, "expiryDays": CMAIL_EXPIRY_DAYS}
        if CMAIL_DOMAIN:
            payload["domain"] = CMAIL_DOMAIN
        res = npc_session.post(
            f"{CMAIL_BASE}/api/public/batch-create-emails",
            json=payload,
            headers={"X-API-Key": CMAIL_API_KEY},
            timeout=30,
        )
        if res.status_code != 200:
            raise Exception(f"NPCmail HTTP {res.status_code}: {res.text[:200]}")
        data = res.json()
        emails = data.get("emails") or []
        if not (data.get("success") and emails):
            raise Exception(f"NPCmail response error: {res.text[:200]}")
        created = emails[0]
        address = (created.get("address") or "").strip()
        if not address:
            raise Exception("NPCmail 创建邮箱成功但未返回 address")
        pin_code = (created.get("pin_code") or "").strip()
        print(f" [OK] NPCmail email created: {address}")
        return address, pin_code, address

    gpt_session = self._create_duckmail_session()
    api_headers = {"X-API-Key": GPTMAIL_API_KEY}
    max_retries = 5
    for attempt in range(max_retries):
        try:
            alphabet = string.ascii_lowercase + string.digits
            stamp = int(time.time()) % 100000
            prefix = f"t{stamp}" + "".join(random.choices(alphabet, k=8))
            print(f" GPTMail creating email ({attempt+1}/{max_retries}), prefix={prefix}")
            res = gpt_session.post(
                f"{GPTMAIL_BASE}/api/generate-email",
                json={"prefix": prefix},
                headers=api_headers,
                timeout=30,
            )
            if res.status_code != 200:
                raise Exception(f"GPTMail HTTP {res.status_code}: {res.text[:200]}")
            data = res.json()
            if not (data.get("success") and data.get("data", {}).get("email")):
                raise Exception(f"GPTMail response error: {res.text[:200]}")
            email = data["data"]["email"]
            password = _generate_password()
            print(f" [OK] GPTMail email created: {email}")
            return email, password, email
        except Exception as e:
            # Every failure above, deliberate or not, leads to another attempt.
            print(f" [WARN] GPTMail retry {attempt+1}/{max_retries}: {e}")
            time.sleep(1)
    raise Exception("GPTMail create email failed: exceeded max retries")
|
||
|
||
def _fetch_emails(self, mail_token: str):
    """List mailbox messages using whichever mail provider is configured.

    Args:
        mail_token: The mailbox address.

    Returns:
        For NPCmail: the JSON list returned by the API, or ``[]`` on any
        failure or non-list reply. Otherwise the result of
        ``_fetch_emails_duckmail``.
    """
    if _mail_provider_name() != "npcmail":
        return self._fetch_emails_duckmail(mail_token)

    try:
        npc_session = self._create_duckmail_session()
        # The address is a path segment, so every reserved character is escaped.
        encoded_address = quote(mail_token, safe="")
        res = npc_session.get(
            f"{CMAIL_BASE}/api/public/emails/{encoded_address}/messages",
            headers={"X-API-Key": CMAIL_API_KEY},
            timeout=30,
        )
        if res.status_code != 200:
            print(f" [DEBUG] Fetch NPCmail emails failed: {res.status_code} {res.text[:100]}")
        else:
            listing = res.json()
            if isinstance(listing, list):
                if listing:
                    print(f" [DEBUG] Fetched {len(listing)} email(s)")
                return listing
    except Exception as e:
        print(f" [DEBUG] _fetch_emails_npcmail error: {e}")
    return []
|
||
|
||
def _fetch_email_detail(self, mail_token: str, msg_id: str):
    """Load one message; returns ``None`` for NPCmail, otherwise delegates to the GPTMail reader."""
    using_npcmail = _mail_provider_name() == "npcmail"
    return None if using_npcmail else self._fetch_email_detail_duckmail(mail_token, msg_id)
|
||
|
||
def _extract_codes_cmail(self, addresses: list[str]):
    """Ask the NPCmail ``extract-codes`` endpoint for codes of the given addresses.

    Args:
        addresses: Mailbox addresses to query.

    Returns:
        The JSON list returned by the API; ``[]`` for empty input, a
        non-list reply, a non-200 status or any exception.
    """
    if not addresses:
        return []
    try:
        res = self._create_duckmail_session().post(
            f"{CMAIL_BASE}/api/public/extract-codes",
            json={"addresses": addresses},
            headers={"X-API-Key": CMAIL_API_KEY},
            timeout=30,
        )
        if res.status_code != 200:
            print(f" [DEBUG] Extract NPCmail codes failed: {res.status_code} {res.text[:100]}")
        else:
            parsed = res.json()
            return parsed if isinstance(parsed, list) else []
    except Exception as e:
        print(f" [DEBUG] _extract_codes_npcmail error: {e}")
    return []
|
||
|
||
def wait_for_verification_email(self, mail_token: str, timeout: int = 120):
    """Poll the mailbox until a verification code arrives or the timeout elapses.

    The progress messages in this definition had been stored as mis-decoded
    text (UTF-8 read as GBK); they are restored to readable Chinese here.

    NOTE(review): ``wait_for_verification_email`` is defined once more
    further down in this file; if both are in the same class, that later
    definition replaces this one.

    Args:
        mail_token: Mailbox address / token handed to the fetch helpers.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The code as a string, or ``None`` after the timeout.
    """
    self._print(f"[OTP] 等待验证码邮件 (最多 {timeout}s)...")
    start_time = time.time()
    while time.time() - start_time < timeout:
        if _mail_provider_name() == "npcmail":
            # NPCmail offers a server-side code extraction endpoint; try it first.
            extracted = self._extract_codes_cmail([mail_token])
            if extracted:
                match = extracted[0]
                code = match.get("code")
                if code:
                    self._print(f"[OTP] 验证码: {code}")
                    return str(code)

        messages = self._fetch_emails(mail_token)
        if messages:
            first_msg = messages[0]
            print(f" [DEBUG] Message keys: {list(first_msg.keys())}")
            msg_id = first_msg.get("id") or first_msg.get("@id") or first_msg.get("message_id")
            inline_content = first_msg.get("text") or first_msg.get("body") or first_msg.get("html") or first_msg.get("raw") or first_msg.get("source") or ""
            if inline_content:
                code = self._extract_verification_code(inline_content)
                if code:
                    self._print(f"[OTP] 验证码: {code}")
                    return code
            if msg_id:
                detail = self._fetch_email_detail(mail_token, str(msg_id))
                if detail:
                    content = detail.get("text") or detail.get("body") or detail.get("html") or detail.get("source") or ""
                    code = self._extract_verification_code(content)
                    if code:
                        self._print(f"[OTP] 验证码: {code}")
                        return code
        elapsed = int(time.time() - start_time)
        self._print(f"[OTP] 等待中... ({elapsed}s/{timeout}s)")
        time.sleep(3)
    self._print(f"[OTP] 超时 ({timeout}s)")
    return None
|
||
|
||
def visit_homepage(self):
    """Load the site root (following redirects) and log the status plus cookie count."""
    home_url = f"{self.BASE}/"
    browser_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Upgrade-Insecure-Requests": "1",
    }
    resp = self.session.get(home_url, headers=browser_headers, allow_redirects=True)
    cookie_summary = {"cookies_count": len(self.session.cookies)}
    self._log("0. Visit homepage", "GET", home_url, resp.status_code, cookie_summary)
|
||
|
||
def wait_for_verification_email(self, mail_token: str, timeout: int = 120):
    """Poll the mailbox until a verification code arrives or the timeout elapses.

    Each polling round first asks the NPCmail code-extraction endpoint (when
    that provider is configured) and then scans the listed messages. Every
    listed message is examined in order — not just the first — so a code is
    still found when another e-mail sits ahead of the OTP message.

    Args:
        mail_token: Mailbox address / token handed to the fetch helpers.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The code as a string, or ``None`` after the timeout.
    """
    self._print(f"[OTP] 等待验证码邮件 (最多 {timeout}s)...")
    start_time = time.time()
    while time.time() - start_time < timeout:
        if _mail_provider_name() == "npcmail":
            extracted = self._extract_codes_cmail([mail_token])
            if extracted:
                match = extracted[0]
                code = match.get("code")
                if code:
                    self._print(f"[OTP] 验证码: {code}")
                    return str(code)

        messages = self._fetch_emails(mail_token)
        if messages:
            print(f" [DEBUG] Message keys: {list(messages[0].keys())}")
        for msg in messages or []:
            msg_id = msg.get("id") or msg.get("@id") or msg.get("message_id")
            # The list entry may already embed the body under one of these keys.
            inline_content = msg.get("text") or msg.get("body") or msg.get("html") or msg.get("raw") or msg.get("source") or ""
            code = self._extract_verification_code(inline_content) if inline_content else None
            if not code and msg_id:
                # Otherwise load the full message by id.
                detail = self._fetch_email_detail(mail_token, str(msg_id))
                if detail:
                    content = detail.get("text") or detail.get("body") or detail.get("html") or detail.get("source") or ""
                    code = self._extract_verification_code(content)
            if code:
                self._print(f"[OTP] 验证码: {code}")
                return code
        elapsed = int(time.time() - start_time)
        self._print(f"[OTP] 等待中... ({elapsed}s/{timeout}s)")
        time.sleep(3)
    self._print(f"[OTP] 超时 ({timeout}s)")
    return None
|
||
|
||
def get_csrf(self) -> str:
    """Fetch the CSRF token from ``/api/auth/csrf``.

    A reply that is not JSON (or not a JSON object) is logged and reported
    as a missing token instead of surfacing a raw decode error, matching
    how the other request helpers in this class treat non-JSON bodies.

    Returns:
        The ``csrfToken`` value.

    Raises:
        Exception: When no token could be obtained.
    """
    url = f"{self.BASE}/api/auth/csrf"
    r = self.session.get(url, headers={"Accept": "application/json", "Referer": f"{self.BASE}/"})
    try:
        data = r.json()
    except Exception:
        data = {"text": r.text[:500]}
    self._log("1. Get CSRF", "GET", url, r.status_code, data)
    token = data.get("csrfToken", "") if isinstance(data, dict) else ""
    if not token:
        raise Exception("Failed to get CSRF token")
    return token
|
||
|
||
def signin(self, email: str, csrf: str) -> str:
    """Start the sign-in flow and obtain the authorize URL.

    A reply that is not JSON (or not a JSON object) is logged and reported
    as a missing URL instead of surfacing a raw decode error, matching how
    the other request helpers in this class treat non-JSON bodies.

    Args:
        email: Address passed as ``login_hint``.
        csrf: CSRF token sent in the form body.

    Returns:
        The ``url`` field of the response.

    Raises:
        Exception: When the response carries no authorize URL.
    """
    url = f"{self.BASE}/api/auth/signin/openai"
    params = {
        "prompt": "login", "ext-oai-did": self.device_id,
        "auth_session_logging_id": self.auth_session_logging_id,
        "screen_hint": "login_or_signup", "login_hint": email,
    }
    form_data = {"callbackUrl": f"{self.BASE}/", "csrfToken": csrf, "json": "true"}
    r = self.session.post(url, params=params, data=form_data, headers={
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json", "Referer": f"{self.BASE}/", "Origin": self.BASE,
    })
    try:
        data = r.json()
    except Exception:
        data = {"text": r.text[:500]}
    self._log("2. Signin", "POST", url, r.status_code, data)
    authorize_url = data.get("url", "") if isinstance(data, dict) else ""
    if not authorize_url:
        raise Exception("Failed to get authorize URL")
    return authorize_url
|
||
|
||
def authorize(self, url: str) -> str:
    """Open the authorize URL, follow redirects and return the URL finally reached."""
    nav_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Referer": f"{self.BASE}/",
        "Upgrade-Insecure-Requests": "1",
    }
    resp = self.session.get(url, headers=nav_headers, allow_redirects=True)
    landed_on = str(resp.url)
    self._log("3. Authorize", "GET", url, resp.status_code, {"final_url": landed_on})
    return landed_on
|
||
|
||
def register(self, email: str, password: str):
    """Submit the e-mail/password pair to the register endpoint.

    Returns:
        ``(status_code, data)`` where ``data`` is the parsed JSON body or,
        when the body is not JSON, ``{"text": <first 500 chars>}``.
    """
    endpoint = f"{self.AUTH}/api/accounts/user/register"
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Referer": f"{self.AUTH}/create-account/password",
        "Origin": self.AUTH,
    }
    request_headers.update(_make_trace_headers())
    credentials = {"username": email, "password": password}
    resp = self.session.post(endpoint, json=credentials, headers=request_headers)
    try:
        parsed = resp.json()
    except Exception:
        parsed = {"text": resp.text[:500]}
    self._log("4. Register", "POST", endpoint, resp.status_code, parsed)
    return resp.status_code, parsed
|
||
|
||
def send_otp(self):
    """Request the e-mail OTP endpoint, following redirects.

    Returns:
        ``(status_code, data)`` where ``data`` is the parsed JSON body or,
        when the body is not JSON, the final URL and status.
    """
    endpoint = f"{self.AUTH}/api/accounts/email-otp/send"
    nav_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Referer": f"{self.AUTH}/create-account/password",
        "Upgrade-Insecure-Requests": "1",
    }
    resp = self.session.get(endpoint, headers=nav_headers, allow_redirects=True)
    try:
        parsed = resp.json()
    except Exception:
        parsed = {"final_url": str(resp.url), "status": resp.status_code}
    self._log("5. Send OTP", "GET", endpoint, resp.status_code, parsed)
    return resp.status_code, parsed
|
||
|
||
def validate_otp(self, code: str):
    """Submit the OTP code for validation.

    Returns:
        ``(status_code, data)`` where ``data`` is the parsed JSON body or,
        when the body is not JSON, ``{"text": <first 500 chars>}``.
    """
    endpoint = f"{self.AUTH}/api/accounts/email-otp/validate"
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Referer": f"{self.AUTH}/email-verification",
        "Origin": self.AUTH,
    }
    request_headers.update(_make_trace_headers())
    resp = self.session.post(endpoint, json={"code": code}, headers=request_headers)
    try:
        parsed = resp.json()
    except Exception:
        parsed = {"text": resp.text[:500]}
    self._log("6. Validate OTP", "POST", endpoint, resp.status_code, parsed)
    return resp.status_code, parsed
|
||
|
||
def create_account(self, name: str, birthdate: str):
    """Submit the profile data and remember any follow-up URL returned.

    When the JSON reply contains ``continue_url``, ``url`` or
    ``redirect_url`` (checked in that order), the first non-empty value is
    stored in ``self._callback_url``.

    Returns:
        ``(status_code, data)`` where ``data`` is the parsed JSON body or,
        when the body is not JSON, ``{"text": <first 500 chars>}``.
    """
    endpoint = f"{self.AUTH}/api/accounts/create_account"
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Referer": f"{self.AUTH}/about-you",
        "Origin": self.AUTH,
    }
    request_headers.update(_make_trace_headers())
    profile = {"name": name, "birthdate": birthdate}
    resp = self.session.post(endpoint, json=profile, headers=request_headers)
    try:
        parsed = resp.json()
    except Exception:
        parsed = {"text": resp.text[:500]}
    self._log("7. Create Account", "POST", endpoint, resp.status_code, parsed)
    if isinstance(parsed, dict):
        for key in ("continue_url", "url", "redirect_url"):
            next_url = parsed.get(key)
            if next_url:
                self._callback_url = next_url
                break
    return resp.status_code, parsed
|
||
|
||
def callback(self, url: str = None):
    """Visit the callback URL (explicit or remembered) and follow redirects.

    Args:
        url: URL to visit; when empty, ``self._callback_url`` is used.

    Returns:
        ``(status_code, {"final_url": ...})``, or ``(None, None)`` when no
        URL is available.
    """
    target = url or self._callback_url
    if not target:
        self._print("[!] No callback URL, skipping.")
        return None, None
    nav_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Upgrade-Insecure-Requests": "1",
    }
    resp = self.session.get(target, headers=nav_headers, allow_redirects=True)
    self._log("8. Callback", "GET", target, resp.status_code, {"final_url": str(resp.url)})
    return resp.status_code, {"final_url": str(resp.url)}
|
||
|
||
# ---- 注册主流程 ----
|
||
|
||
def run_register(self, email, password, name, birthdate, mail_token):
    """Drive the whole registration flow for one account.

    The step to resume from is chosen by the path the authorize request
    lands on: password creation, OTP verification, profile form, or an
    already finished account.

    Args:
        email: Address to register.
        password: Password to set.
        name: Profile name submitted in the final step.
        birthdate: Profile birthdate submitted in the final step.
        mail_token: Mailbox token used to poll for the OTP e-mail.

    Returns:
        ``True`` when the flow completed.

    Raises:
        Exception: When registration, OTP retrieval/validation or account
            creation fails.
    """
    self.visit_homepage()
    _random_delay(0.3, 0.8)
    csrf = self.get_csrf()
    _random_delay(0.2, 0.5)
    auth_url = self.signin(email, csrf)
    _random_delay(0.3, 0.8)
    final_url = self.authorize(auth_url)
    final_path = urlparse(final_url).path
    _random_delay(0.3, 0.8)
    self._print(f"Authorize → {final_path}")
    need_otp = False

    if "create-account/password" in final_path:
        self._print("全新注册流程")
        _random_delay(0.5, 1.0)
        status, data = self.register(email, password)
        if status != 200:
            raise Exception(f"Register 失败 ({status}): {data}")
        _random_delay(0.3, 0.8)
        self.send_otp()
        need_otp = True
    elif "email-verification" in final_path or "email-otp" in final_path:
        self._print("跳到 OTP 验证阶段")
        need_otp = True
    elif "about-you" in final_path:
        self._print("跳到填写信息阶段")
        _random_delay(0.5, 1.0)
        # Check the outcome here as well: previously this shortcut reported
        # success even when account creation was rejected, unlike the main
        # path at the end of this method.
        status, data = self.create_account(name, birthdate)
        if status != 200:
            raise Exception(f"Create account 失败 ({status}): {data}")
        _random_delay(0.3, 0.5)
        self.callback()
        return True
    elif "callback" in final_path or "chatgpt.com" in final_url:
        self._print("账号已完成注册")
        return True
    else:
        # Unrecognised landing page: attempt the full flow on a best-effort
        # basis; the register result is intentionally not checked here.
        self._print(f"未知跳转: {final_url}")
        self.register(email, password)
        self.send_otp()
        need_otp = True

    if need_otp:
        otp_code = self.wait_for_verification_email(mail_token)
        if not otp_code:
            raise Exception("未能获取验证码")
        _random_delay(0.3, 0.8)
        status, data = self.validate_otp(otp_code)
        if status != 200:
            # One retry: request a fresh code and validate again.
            self._print("验证码失败,重试...")
            self.send_otp()
            _random_delay(1.0, 2.0)
            otp_code = self.wait_for_verification_email(mail_token, timeout=60)
            if not otp_code:
                raise Exception("重试后仍未获取验证码")
            _random_delay(0.3, 0.8)
            status, data = self.validate_otp(otp_code)
            if status != 200:
                raise Exception(f"验证码失败 ({status}): {data}")

    _random_delay(0.5, 1.5)
    status, data = self.create_account(name, birthdate)
    if status != 200:
        raise Exception(f"Create account 失败 ({status}): {data}")
    _random_delay(0.2, 0.5)
    self.callback()
    return True
|
||
|
||
# ---- OAuth helpers ----
|
||
|
||
def _decode_oauth_session_cookie(self):
|
||
jar = getattr(self.session.cookies, "jar", None)
|
||
cookie_items = list(jar) if jar is not None else []
|
||
for c in cookie_items:
|
||
name = getattr(c, "name", "") or ""
|
||
if "oai-client-auth-session" not in name:
|
||
continue
|
||
raw_val = (getattr(c, "value", "") or "").strip()
|
||
if not raw_val:
|
||
continue
|
||
candidates = [raw_val]
|
||
try:
|
||
from urllib.parse import unquote
|
||
decoded = unquote(raw_val)
|
||
if decoded != raw_val:
|
||
candidates.append(decoded)
|
||
except Exception:
|
||
pass
|
||
for val in candidates:
|
||
try:
|
||
if (val.startswith('"') and val.endswith('"')) or (val.startswith("'") and val.endswith("'")):
|
||
val = val[1:-1]
|
||
part = val.split(".")[0] if "." in val else val
|
||
pad = 4 - len(part) % 4
|
||
if pad != 4:
|
||
part += "=" * pad
|
||
raw = base64.urlsafe_b64decode(part)
|
||
data = json.loads(raw.decode("utf-8"))
|
||
if isinstance(data, dict):
|
||
return data
|
||
except Exception:
|
||
continue
|
||
return None
|
||
|
||
def _oauth_allow_redirect_extract_code(self, url: str, referer: str = None):
    """Request ``url`` with automatic redirects and look for an OAuth code.

    The final URL is checked first, then each response in the redirect
    history (its ``Location`` header, then its own URL). If the request
    raises, a localhost URL embedded in the error text is checked instead.

    Args:
        url: Starting URL.
        referer: Optional ``Referer`` header value.

    Returns:
        The code, or ``None`` when none was found.
    """
    request_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Upgrade-Insecure-Requests": "1", "User-Agent": self.ua,
    }
    if referer:
        request_headers["Referer"] = referer
    try:
        resp = self.session.get(url, headers=request_headers, allow_redirects=True,
                                timeout=30, impersonate=self.impersonate)
        found = _extract_code_from_url(str(resp.url))
        if found:
            return found
        for step in getattr(resp, "history", []) or []:
            location = step.headers.get("Location", "")
            found = _extract_code_from_url(location) or _extract_code_from_url(str(step.url))
            if found:
                return found
    except Exception as e:
        # The error message may quote the localhost callback URL.
        local_hit = re.search(r'(https?://localhost[^\s\'\"]+)', str(e))
        if local_hit:
            found = _extract_code_from_url(local_hit.group(1))
            if found:
                return found
    return None
|
||
|
||
def _oauth_follow_for_code(self, start_url: str, referer: str = None, max_hops: int = 16):
    """Follow a redirect chain one hop at a time, looking for an OAuth code.

    Redirects are not followed automatically so that each ``Location``
    value can be inspected. A relative ``Location`` is resolved against the
    URL of the response that issued it.

    Args:
        start_url: First URL to request.
        referer: Optional ``Referer`` for the first request; later hops use
            the previous URL.
        max_hops: Upper bound on the number of requests.

    Returns:
        ``(code, url)`` when a code was found, otherwise
        ``(None, last_url)`` with the last URL that was reached.
    """
    from urllib.parse import urljoin

    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Upgrade-Insecure-Requests": "1", "User-Agent": self.ua,
    }
    if referer:
        headers["Referer"] = referer
    current_url = start_url
    last_url = start_url
    for _ in range(max_hops):
        try:
            resp = self.session.get(current_url, headers=headers, allow_redirects=False,
                                    timeout=30, impersonate=self.impersonate)
        except Exception as e:
            # The error message may quote the localhost callback URL.
            maybe_localhost = re.search(r'(https?://localhost[^\s\'\"]+)', str(e))
            if maybe_localhost:
                code = _extract_code_from_url(maybe_localhost.group(1))
                if code:
                    return code, maybe_localhost.group(1)
            return None, last_url

        last_url = str(resp.url)
        code = _extract_code_from_url(last_url)
        if code:
            return code, last_url

        if resp.status_code not in (301, 302, 303, 307, 308):
            return None, last_url
        loc = resp.headers.get("Location", "")
        if not loc:
            return None, last_url
        # Resolve against the responding URL rather than a fixed host, so
        # relative redirects issued by any host in the chain stay correct;
        # absolute targets pass through unchanged.
        loc = urljoin(last_url, loc)
        code = _extract_code_from_url(loc)
        if code:
            return code, loc
        current_url = loc
        headers["Referer"] = last_url
    return None, last_url
|
||
|
||
def _oauth_submit_workspace_and_org(self, consent_url: str):
    """Select the first workspace (and organisation, if offered) and chase the OAuth code.

    The workspace id comes from the decoded ``oai-client-auth-session``
    cookie. After ``workspace/select`` the reply is either a redirect
    (followed for the code) or JSON that may list organisations; in the
    latter case ``organization/select`` is called with the first
    organisation and its first project, and the resulting redirect or
    ``continue_url`` is followed. If that yields no path to follow, the
    workspace ``continue_url`` is used.

    Args:
        consent_url: URL of the consent page, used as ``Referer``.

    Returns:
        The authorization code, or ``None`` when it could not be obtained.
    """
    redirect_statuses = (301, 302, 303, 307, 308)

    def _absolute(target):
        # Paths starting with "/" are taken to be relative to the issuer.
        return f"{OAUTH_ISSUER}{target}" if target.startswith("/") else target

    def _chase(target, referer):
        # Manual hop-by-hop follow first, auto-redirect request as fallback.
        found, _ = self._oauth_follow_for_code(target, referer=referer)
        if not found:
            found = self._oauth_allow_redirect_extract_code(target, referer=referer)
        return found

    def _code_from_redirect(response, referer):
        location = _absolute(response.headers.get("Location", ""))
        direct = _extract_code_from_url(location)
        if direct:
            return direct
        return _chase(location, referer)

    session_data = self._decode_oauth_session_cookie()
    if not session_data:
        self._print("[OAuth] 无法解码 oai-client-auth-session")
        return None
    workspaces = session_data.get("workspaces", [])
    if not workspaces:
        self._print("[OAuth] session 中没有 workspace 信息")
        return None
    workspace_id = (workspaces[0] or {}).get("id")
    if not workspace_id:
        return None

    base_headers = {"Accept": "application/json", "Content-Type": "application/json",
                    "Origin": OAUTH_ISSUER, "Referer": consent_url,
                    "User-Agent": self.ua, "oai-device-id": self.device_id}
    base_headers.update(_make_trace_headers())

    ws_resp = self.session.post(f"{OAUTH_ISSUER}/api/accounts/workspace/select",
                                json={"workspace_id": workspace_id}, headers=base_headers,
                                allow_redirects=False, timeout=30, impersonate=self.impersonate)
    self._print(f"[OAuth] workspace/select -> {ws_resp.status_code}")

    if ws_resp.status_code in redirect_statuses:
        return _code_from_redirect(ws_resp, consent_url)
    if ws_resp.status_code != 200:
        return None
    try:
        ws_data = ws_resp.json()
    except Exception:
        return None

    ws_next = ws_data.get("continue_url", "")
    orgs = ws_data.get("data", {}).get("orgs", [])

    org_id = None
    project_id = None
    if orgs:
        first_org = orgs[0] or {}
        org_id = first_org.get("id")
        projects = first_org.get("projects", [])
        if projects:
            project_id = (projects[0] or {}).get("id")

    if org_id:
        org_body = {"org_id": org_id}
        if project_id:
            org_body["project_id"] = project_id
        org_headers = dict(base_headers)
        if ws_next:
            org_headers["Referer"] = ws_next if ws_next.startswith("http") else f"{OAUTH_ISSUER}{ws_next}"
        org_referer = org_headers.get("Referer")
        org_resp = self.session.post(f"{OAUTH_ISSUER}/api/accounts/organization/select",
                                     json=org_body, headers=org_headers, allow_redirects=False,
                                     timeout=30, impersonate=self.impersonate)
        self._print(f"[OAuth] organization/select -> {org_resp.status_code}")
        if org_resp.status_code in redirect_statuses:
            return _code_from_redirect(org_resp, org_referer)
        if org_resp.status_code == 200:
            try:
                org_data = org_resp.json()
            except Exception:
                return None
            org_next = org_data.get("continue_url", "")
            if org_next:
                return _chase(_absolute(org_next), org_referer)

    # No organisation step, or it produced nothing to follow.
    if ws_next:
        return _chase(_absolute(ws_next), consent_url)
    return None
|
||
|
||
# ---- Codex OAuth 纯协议 ----
|
||
|
||
def perform_codex_oauth_login_http(self, email: str, password: str, mail_token: str = None):
|
||
self._print("[OAuth] 开始执行 Codex OAuth 纯协议流程...")
|
||
self.session.cookies.set("oai-did", self.device_id, domain=".auth.openai.com")
|
||
self.session.cookies.set("oai-did", self.device_id, domain="auth.openai.com")
|
||
|
||
code_verifier, code_challenge = _generate_pkce()
|
||
state = secrets.token_urlsafe(24)
|
||
authorize_params = {
|
||
"response_type": "code", "client_id": OAUTH_CLIENT_ID,
|
||
"redirect_uri": OAUTH_REDIRECT_URI, "scope": "openid profile email offline_access",
|
||
"code_challenge": code_challenge, "code_challenge_method": "S256", "state": state,
|
||
}
|
||
authorize_url = f"{OAUTH_ISSUER}/oauth/authorize?{urlencode(authorize_params)}"
|
||
|
||
def _oauth_json_headers(referer: str):
|
||
h = {"Accept": "application/json", "Content-Type": "application/json",
|
||
"Origin": OAUTH_ISSUER, "Referer": referer,
|
||
"User-Agent": self.ua, "oai-device-id": self.device_id}
|
||
h.update(_make_trace_headers())
|
||
return h
|
||
|
||
def _bootstrap_oauth_session():
|
||
self._print("[OAuth] 1/7 GET /oauth/authorize")
|
||
try:
|
||
r = self.session.get(authorize_url, headers={
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
"Referer": f"{self.BASE}/", "Upgrade-Insecure-Requests": "1", "User-Agent": self.ua,
|
||
}, allow_redirects=True, timeout=30, impersonate=self.impersonate)
|
||
except Exception as e:
|
||
self._print(f"[OAuth] /oauth/authorize 异常: {e}")
|
||
return False, ""
|
||
final_url = str(r.url)
|
||
self._print(f"[OAuth] /oauth/authorize -> {r.status_code}, final={final_url[:140]}")
|
||
has_login = any(getattr(c, "name", "") == "login_session" for c in self.session.cookies)
|
||
if not has_login:
|
||
try:
|
||
r2 = self.session.get(f"{OAUTH_ISSUER}/api/oauth/oauth2/auth",
|
||
headers={"Accept": "text/html", "Referer": authorize_url,
|
||
"User-Agent": self.ua},
|
||
params=authorize_params, allow_redirects=True,
|
||
timeout=30, impersonate=self.impersonate)
|
||
final_url = str(r2.url)
|
||
except Exception:
|
||
pass
|
||
has_login = any(getattr(c, "name", "") == "login_session" for c in self.session.cookies)
|
||
return has_login, final_url
|
||
|
||
def _post_authorize_continue(referer_url: str):
|
||
sentinel = build_sentinel_token(self.session, self.device_id, flow="authorize_continue",
|
||
user_agent=self.ua, sec_ch_ua=self.sec_ch_ua, impersonate=self.impersonate)
|
||
if not sentinel:
|
||
self._print("[OAuth] authorize_continue sentinel 失败")
|
||
return None
|
||
headers = _oauth_json_headers(referer_url)
|
||
headers["openai-sentinel-token"] = sentinel
|
||
try:
|
||
return self.session.post(f"{OAUTH_ISSUER}/api/accounts/authorize/continue",
|
||
json={"username": {"kind": "email", "value": email}},
|
||
headers=headers, timeout=30, allow_redirects=False,
|
||
impersonate=self.impersonate)
|
||
except Exception as e:
|
||
self._print(f"[OAuth] authorize/continue 异常: {e}")
|
||
return None
|
||
|
||
has_login_session, authorize_final_url = _bootstrap_oauth_session()
|
||
if not authorize_final_url:
|
||
return None
|
||
|
||
continue_referer = authorize_final_url if authorize_final_url.startswith(OAUTH_ISSUER) else f"{OAUTH_ISSUER}/log-in"
|
||
|
||
self._print("[OAuth] 2/7 POST /api/accounts/authorize/continue")
|
||
resp_continue = _post_authorize_continue(continue_referer)
|
||
if resp_continue is None:
|
||
return None
|
||
|
||
if resp_continue.status_code == 400 and "invalid_auth_step" in (resp_continue.text or ""):
|
||
self._print("[OAuth] invalid_auth_step, 重新 bootstrap")
|
||
has_login_session, authorize_final_url = _bootstrap_oauth_session()
|
||
if not authorize_final_url:
|
||
return None
|
||
continue_referer = authorize_final_url if authorize_final_url.startswith(OAUTH_ISSUER) else f"{OAUTH_ISSUER}/log-in"
|
||
resp_continue = _post_authorize_continue(continue_referer)
|
||
if resp_continue is None:
|
||
return None
|
||
|
||
if resp_continue.status_code != 200:
|
||
self._print(f"[OAuth] 邮箱提交失败: {resp_continue.text[:180]}")
|
||
return None
|
||
|
||
try:
|
||
continue_data = resp_continue.json()
|
||
except Exception:
|
||
return None
|
||
|
||
continue_url = continue_data.get("continue_url", "")
|
||
page_type = (continue_data.get("page") or {}).get("type", "")
|
||
|
||
self._print("[OAuth] 3/7 POST /api/accounts/password/verify")
|
||
sentinel_pwd = build_sentinel_token(self.session, self.device_id, flow="password_verify",
|
||
user_agent=self.ua, sec_ch_ua=self.sec_ch_ua, impersonate=self.impersonate)
|
||
if not sentinel_pwd:
|
||
return None
|
||
|
||
headers_verify = _oauth_json_headers(f"{OAUTH_ISSUER}/log-in/password")
|
||
headers_verify["openai-sentinel-token"] = sentinel_pwd
|
||
|
||
try:
|
||
resp_verify = self.session.post(f"{OAUTH_ISSUER}/api/accounts/password/verify",
|
||
json={"password": password}, headers=headers_verify,
|
||
timeout=30, allow_redirects=False, impersonate=self.impersonate)
|
||
except Exception as e:
|
||
self._print(f"[OAuth] password/verify 异常: {e}")
|
||
return None
|
||
|
||
if resp_verify.status_code != 200:
|
||
self._print(f"[OAuth] 密码校验失败: {resp_verify.text[:180]}")
|
||
return None
|
||
|
||
try:
|
||
verify_data = resp_verify.json()
|
||
except Exception:
|
||
return None
|
||
|
||
continue_url = verify_data.get("continue_url", "") or continue_url
|
||
page_type = (verify_data.get("page") or {}).get("type", "") or page_type
|
||
|
||
# OTP 阶段
|
||
need_oauth_otp = (page_type == "email_otp_verification"
|
||
or "email-verification" in (continue_url or "")
|
||
or "email-otp" in (continue_url or ""))
|
||
|
||
if need_oauth_otp:
|
||
self._print("[OAuth] 4/7 检测到邮箱 OTP 验证")
|
||
if not mail_token:
|
||
self._print("[OAuth] 需要 OTP 但未提供 mail_token")
|
||
return None
|
||
headers_otp = _oauth_json_headers(f"{OAUTH_ISSUER}/email-verification")
|
||
tried_codes = set()
|
||
otp_success = False
|
||
otp_deadline = time.time() + 120
|
||
while time.time() < otp_deadline and not otp_success:
|
||
messages = self._fetch_emails(mail_token) or []
|
||
candidate_codes = []
|
||
for msg in messages[:12]:
|
||
code = None
|
||
# Try inline content first (like workers do)
|
||
inline_content = msg.get("text") or msg.get("html") or msg.get("raw") or msg.get("source") or ""
|
||
if inline_content:
|
||
code = self._extract_verification_code(inline_content)
|
||
|
||
# Try fetching detail if inline extraction failed
|
||
if not code:
|
||
msg_id = msg.get("id") or msg.get("@id") or msg.get("message_id")
|
||
if msg_id:
|
||
detail = self._fetch_email_detail(mail_token, str(msg_id))
|
||
if detail:
|
||
content = detail.get("text") or detail.get("html") or detail.get("source") or ""
|
||
code = self._extract_verification_code(content)
|
||
|
||
if code and code not in tried_codes:
|
||
candidate_codes.append(code)
|
||
if not candidate_codes:
|
||
time.sleep(2)
|
||
continue
|
||
for otp_code in candidate_codes:
|
||
tried_codes.add(otp_code)
|
||
self._print(f"[OAuth] 尝试 OTP: {otp_code}")
|
||
try:
|
||
resp_otp = self.session.post(f"{OAUTH_ISSUER}/api/accounts/email-otp/validate",
|
||
json={"code": otp_code}, headers=headers_otp,
|
||
timeout=30, allow_redirects=False, impersonate=self.impersonate)
|
||
except Exception:
|
||
continue
|
||
if resp_otp.status_code != 200:
|
||
continue
|
||
try:
|
||
otp_data = resp_otp.json()
|
||
except Exception:
|
||
continue
|
||
continue_url = otp_data.get("continue_url", "") or continue_url
|
||
page_type = (otp_data.get("page") or {}).get("type", "") or page_type
|
||
otp_success = True
|
||
break
|
||
if not otp_success:
|
||
time.sleep(2)
|
||
if not otp_success:
|
||
self._print(f"[OAuth] OTP 验证失败")
|
||
return None
|
||
|
||
# 提取 code
|
||
code = None
|
||
consent_url = continue_url
|
||
if consent_url and consent_url.startswith("/"):
|
||
consent_url = f"{OAUTH_ISSUER}{consent_url}"
|
||
if not consent_url and "consent" in page_type:
|
||
consent_url = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent"
|
||
if consent_url:
|
||
code = _extract_code_from_url(consent_url)
|
||
|
||
if not code and consent_url:
|
||
self._print("[OAuth] 5/7 跟随 continue_url 提取 code")
|
||
code, _ = self._oauth_follow_for_code(consent_url, referer=f"{OAUTH_ISSUER}/log-in/password")
|
||
|
||
consent_hint = (("consent" in (consent_url or "")) or ("sign-in-with-chatgpt" in (consent_url or ""))
|
||
or ("workspace" in (consent_url or "")) or ("organization" in (consent_url or ""))
|
||
or ("consent" in page_type) or ("organization" in page_type))
|
||
|
||
if not code and consent_hint:
|
||
if not consent_url:
|
||
consent_url = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent"
|
||
self._print("[OAuth] 6/7 执行 workspace/org 选择")
|
||
code = self._oauth_submit_workspace_and_org(consent_url)
|
||
|
||
if not code:
|
||
fallback_consent = f"{OAUTH_ISSUER}/sign-in-with-chatgpt/codex/consent"
|
||
self._print("[OAuth] 6/7 回退 consent 路径重试")
|
||
code = self._oauth_submit_workspace_and_org(fallback_consent)
|
||
if not code:
|
||
code, _ = self._oauth_follow_for_code(fallback_consent, referer=f"{OAUTH_ISSUER}/log-in/password")
|
||
|
||
if not code:
|
||
self._print("[OAuth] 未获取到 authorization code")
|
||
return None
|
||
|
||
self._print("[OAuth] 7/7 POST /oauth/token")
|
||
token_resp = self.session.post(f"{OAUTH_ISSUER}/oauth/token",
|
||
headers={"Content-Type": "application/x-www-form-urlencoded", "User-Agent": self.ua},
|
||
data={"grant_type": "authorization_code", "code": code,
|
||
"redirect_uri": OAUTH_REDIRECT_URI, "client_id": OAUTH_CLIENT_ID,
|
||
"code_verifier": code_verifier},
|
||
timeout=60, impersonate=self.impersonate)
|
||
self._print(f"[OAuth] /oauth/token -> {token_resp.status_code}")
|
||
|
||
if token_resp.status_code != 200:
|
||
self._print(f"[OAuth] token 交换失败: {token_resp.text[:200]}")
|
||
return None
|
||
|
||
try:
|
||
data = token_resp.json()
|
||
except Exception:
|
||
return None
|
||
if not data.get("access_token"):
|
||
return None
|
||
|
||
self._print("[OAuth] Codex Token 获取成功 ✅")
|
||
return data
|
||
|
||
|
||
# ================= Team 母号注册 =================
|
||
|
||
def register_team_master(proxy=None):
    """Register the Team master account and collect its session credentials.

    Creates a temporary mailbox, runs the registration flow, then looks for
    the ``__Secure-next-auth.session-token`` cookie in the registrar's session
    and uses that session to request an access token from
    ``/api/auth/session``. The Codex OAuth step is not performed here.

    Args:
        proxy: Optional proxy URL forwarded to ``ChatGPTRegister``.

    Returns:
        A dict with the keys ``email``, ``password``, ``email_password``,
        ``access_token`` and ``session_token``. ``session_token`` is ``None``
        when no matching cookie exists; ``access_token`` is ``None`` when the
        session endpoint could not be queried.
    """
    registrar = ChatGPTRegister(proxy=proxy, tag="master")

    # 1. Create the temporary mailbox; its local part becomes the log tag.
    registrar._print("[GPTMail] 创建临时邮箱...")
    email, email_pwd, mail_token = registrar.create_temp_email()
    registrar.tag = email.split("@")[0]

    chatgpt_password = _generate_password()
    name = _random_name()
    birthdate = _random_birthdate()

    summary_lines = [
        f"\n{'='*60}",
        f" [母号注册] {email}",
        f" ChatGPT密码: {chatgpt_password}",
        f" 邮箱密码: {email_pwd}",
        f" 姓名: {name} | 生日: {birthdate}",
        f"{'='*60}",
    ]
    with _print_lock:
        print("\n".join(summary_lines))

    # 2. Run the registration flow.
    registrar.run_register(email, chatgpt_password, name, birthdate, mail_token)

    # 3. Pull the session token out of the cookie jar (first match wins),
    #    then trade it for an access token. Codex OAuth is skipped.
    cookie_jar = getattr(registrar.session.cookies, "jar", None)
    cookies = [] if cookie_jar is None else list(cookie_jar)
    session_token_value = next(
        (
            getattr(ck, "value", "")
            for ck in cookies
            if "__Secure-next-auth.session-token" in (getattr(ck, "name", "") or "")
        ),
        None,
    )

    access_token = None
    if not session_token_value:
        registrar._print("⚠️ 未找到 session token cookie")
    else:
        registrar._print("🔑 获取到 Session Token")
        try:
            resp = registrar.session.get("https://chatgpt.com/api/auth/session", timeout=30)
            if resp.status_code != 200:
                registrar._print(f"⚠️ 获取 session 失败: HTTP {resp.status_code}")
            else:
                access_token = resp.json().get("accessToken", "")
                if access_token:
                    registrar._print("🔑 获取 Access Token 成功 ✅")
                else:
                    registrar._print("⚠️ Session 响应中无 accessToken")
        except Exception as exc:
            registrar._print(f"⚠️ 获取 session 异常: {exc}")

    # 4. Persist the registration record.
    record = f"{email}----{chatgpt_password}----{email_pwd}----master\n"
    with _file_lock, open(DEFAULT_OUTPUT_FILE, "a", encoding="utf-8") as out:
        out.write(record)
    # NOTE(review): the original indentation was lost in this copy of the
    # file; save_to_csv is called after _file_lock is released here - confirm
    # that it performs its own locking if it needs any.
    save_to_csv(email, chatgpt_password, email_pwd, oauth_status="master")

    registrar._print(f"✅ 母号注册完成: {email}")
    return {
        "email": email,
        "password": chatgpt_password,
        "email_password": email_pwd,
        "access_token": access_token,
        "session_token": session_token_value,
    }
|
||
|
||
|
||
def get_team_info_from_session(session_token, proxy=None):
    """Resolve Team workspace details from a ChatGPT session token.

    Opens a throw-away HTTP session carrying ``session_token`` as the
    ``__Secure-next-auth.session-token`` cookie, requests a fresh access token
    from ``/api/auth/session``, then queries ``accounts/check`` for the first
    workspace whose ``structure`` is ``"workspace"`` or whose ``plan_type`` is
    ``"team"``. If no such workspace is found, the account id is taken from
    the access token's JWT payload instead.

    Args:
        session_token: Value of the session-token cookie to authenticate with.
        proxy: Optional proxy URL used for both HTTP and HTTPS.

    Returns:
        A dict with ``name``, ``account_id``, ``auth_token`` (the access
        token) and ``session_token`` (the cookie value after the request,
        falling back to the input), or ``None`` when the access token could
        not be obtained for any reason, including network and parse errors.
    """
    s = curl_requests.Session(verify=False, impersonate="chrome120")
    try:
        if proxy:
            s.proxies = {"http": proxy, "https": proxy}
        s.cookies.set("__Secure-next-auth.session-token", session_token, domain="chatgpt.com")

        # 1. Fetch a fresh access token. Transport and parse failures are
        #    reported and mapped to None, like the other failure paths here.
        try:
            r = s.get("https://chatgpt.com/api/auth/session", timeout=30)
        except Exception as e:
            print(f"[TeamInfo] 获取 session 异常: {e}")
            return None
        if r.status_code != 200:
            print(f"[TeamInfo] 获取 session 失败: HTTP {r.status_code}")
            return None

        try:
            data = r.json()
        except Exception as e:
            print(f"[TeamInfo] session 响应解析失败: {e}")
            return None
        access_token = data.get("accessToken", "") if isinstance(data, dict) else ""
        if not access_token:
            print("[TeamInfo] Session 响应中无 accessToken")
            return None

        # Pick up the session token from the cookie jar in case the server
        # rotated it; keep the caller's value when nothing usable is found.
        new_session_token = session_token
        jar = getattr(s.cookies, "jar", None)
        if jar:
            for c in list(jar):
                cname = getattr(c, "name", "") or ""
                if "__Secure-next-auth.session-token" in cname:
                    val = getattr(c, "value", "")
                    if val:
                        new_session_token = val
                        break

        # 2. Look up the workspace/team information.
        headers = {
            "authorization": f"Bearer {access_token}",
            "accept": "*/*",
            "content-type": "application/json",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
        }

        team_name = ""
        account_id = ""
        try:
            r2 = s.get("https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
                       headers=headers, timeout=30)
            if r2.status_code == 200:
                accounts_data = r2.json()
                accounts = accounts_data.get("accounts", {})
                print(f"[TeamInfo] 找到 {len(accounts)} 个 workspace")
                for acc_id, acc_info in accounts.items():
                    account = acc_info.get("account", {})
                    structure = account.get("structure", "")
                    plan_type = account.get("plan_type", "")
                    print(f"[TeamInfo] {acc_id}: structure={structure}, plan_type={plan_type}")
                    if structure == "workspace" or plan_type == "team":
                        account_id = acc_id
                        team_name = account.get("name", "") or account.get("display_name", "")
                        break
            else:
                print(f"[TeamInfo] accounts/check 失败: HTTP {r2.status_code}")
        except Exception as e:
            # Best effort: fall through to the JWT-based lookup below.
            print(f"[TeamInfo] accounts/check 异常: {e}")

        # Fallback: read the account id from the access token's JWT payload.
        # The `or` guards tolerate a missing/None payload or claim.
        if not account_id:
            payload = _decode_jwt_payload(access_token) or {}
            auth_info = payload.get("https://api.openai.com/auth", {}) or {}
            account_id = auth_info.get("chatgpt_account_id", "") or ""
            print(f"[TeamInfo] 从 JWT 获取 account_id: {account_id}")

        return {
            "name": team_name or f"Team-{account_id[:8]}",
            "account_id": account_id,
            "auth_token": access_token,
            "session_token": new_session_token,
        }
    finally:
        # Release the underlying connection on every exit path.
        s.close()
|
||
|
||
|
||
# ================= 并发批量注册 =================
|
||
|
||
def _register_one(idx, total, proxy, output_file):
    """Run the whole pipeline for a single account inside a worker thread.

    Steps: create a temporary mailbox, register the ChatGPT account, send a
    Team invitation, optionally perform the Codex OAuth login, then append the
    outcome to ``output_file`` and to the CSV log.

    Args:
        idx: 1-based position of this account within the batch (used in logs).
        total: Batch size, shown in the progress banner.
        proxy: Optional proxy URL forwarded to ``ChatGPTRegister``.
        output_file: Path of the text file that receives one line per account.

    Returns:
        ``(True, email, None)`` on success, or ``(False, None, message)`` when
        any step raised; the exception is reported here and not re-raised.
    """
    try:
        worker = ChatGPTRegister(proxy=proxy, tag=f"{idx}")

        # 1. Create the temporary mailbox; its local part becomes the log tag.
        worker._print("[DuckMail] 创建临时邮箱...")
        email, email_pwd, mail_token = worker.create_temp_email()
        tag = email.split("@")[0]
        worker.tag = tag

        chatgpt_password = _generate_password()
        name = _random_name()
        birthdate = _random_birthdate()

        summary_lines = [
            f"\n{'='*60}",
            f" [{idx}/{total}] 注册: {email}",
            f" ChatGPT密码: {chatgpt_password}",
            f" 邮箱密码: {email_pwd}",
            f" 姓名: {name} | 生日: {birthdate}",
            f"{'='*60}",
        ]
        with _print_lock:
            print("\n".join(summary_lines))

        # 2. Run the registration flow.
        worker.run_register(email, chatgpt_password, name, birthdate, mail_token)

        # 3. Team invitation; pause briefly when it was accepted.
        worker._print("📨 发送 Team 邀请...")
        if auto_invite_to_team(email, tag=tag):
            worker._print("⏳ 等待邀请生效...")
            time.sleep(5)

        # 4. Codex OAuth (counts as successful when the feature is disabled).
        oauth_ok = True
        if ENABLE_OAUTH:
            worker._print("[OAuth] 开始获取 Codex Token...")
            tokens = worker.perform_codex_oauth_login_http(
                email, chatgpt_password, mail_token=mail_token
            )
            oauth_ok = bool(tokens and tokens.get("access_token"))
            if not oauth_ok and OAUTH_REQUIRED:
                raise Exception("OAuth 获取失败(oauth_required=true)")
            if oauth_ok:
                _save_codex_tokens(email, tokens)
                worker._print("[OAuth] Token 已保存 ✅")
            else:
                worker._print("[OAuth] 获取失败(按配置继续)")

        # 5. Persist the result.
        oauth_status = "ok" if oauth_ok else "fail"
        record = f"{email}----{chatgpt_password}----{email_pwd}----oauth={oauth_status}\n"
        with _file_lock, open(output_file, "a", encoding="utf-8") as out:
            out.write(record)

        # NOTE(review): the original indentation was lost in this copy of the
        # file; save_to_csv is called after _file_lock is released here -
        # confirm that it performs its own locking if it needs any.
        save_to_csv(email, chatgpt_password, email_pwd, oauth_status=oauth_status)

        with _print_lock:
            print(f"\n[OK] [{tag}] {email} 注册成功! 🎉")
        return True, email, None

    except Exception as exc:
        error_msg = str(exc)
        with _print_lock:
            print(f"\n[FAIL] [{idx}] 注册失败: {error_msg}")
            traceback.print_exc()
        return False, None, error_msg
|
||
|
||
|
||
def run_batch(total_accounts: int = 4, output_file="registered_accounts.txt",
              max_workers=1, proxy=None):
    """Register a batch of accounts concurrently and print a summary.

    Submits one ``_register_one`` task per account to a thread pool, tallies
    successes and failures as the tasks complete, and prints the elapsed time
    and per-account average at the end.

    Args:
        total_accounts: Number of accounts to register. Zero or a negative
            value submits no tasks and only prints the banner and summary.
        output_file: Path of the text file that receives the account lines.
        max_workers: Upper bound on concurrent workers; the effective value is
            capped at ``total_accounts`` and never drops below 1.
        proxy: Optional proxy URL forwarded to every worker.
    """
    # ThreadPoolExecutor raises ValueError for max_workers <= 0, so clamp to 1
    # even when there is nothing to do or the caller passed a bad value.
    actual_workers = max(1, min(max_workers, total_accounts))
    print(f"\n{'#'*60}")
    print(f" ChatGPT 批量自动注册 (纯协议版)")
    print(f" 注册数量: {total_accounts} | 并发数: {actual_workers}")
    print(f" GPTMail: {GPTMAIL_BASE}")
    print(f" Teams: {len(TEAMS)} 个")
    print(f" OAuth: {'开启' if ENABLE_OAUTH else '关闭'} | required: {'是' if OAUTH_REQUIRED else '否'}")
    if ENABLE_OAUTH:
        print(f" OAuth Issuer: {OAUTH_ISSUER}")
        print(f" OAuth Client: {OAUTH_CLIENT_ID}")
        print(f" Token输出: {TOKEN_JSON_DIR}/, {AK_FILE}, {RK_FILE}")
    print(f" 输出文件: {output_file}")
    print(f"{'#'*60}\n")

    success_count = 0
    fail_count = 0
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=actual_workers) as executor:
        # Map each future back to its 1-based account index for reporting.
        futures = {
            executor.submit(_register_one, idx, total_accounts, proxy, output_file): idx
            for idx in range(1, total_accounts + 1)
        }

        for future in as_completed(futures):
            idx = futures[future]
            try:
                ok, email, err = future.result()
            except Exception as e:
                fail_count += 1
                with _print_lock:
                    print(f"[FAIL] 账号 {idx} 线程异常: {e}")
                continue
            if ok:
                success_count += 1
            else:
                fail_count += 1
                # Other workers may still be printing; take the shared lock
                # so this line is not interleaved with their output.
                with _print_lock:
                    print(f" [账号 {idx}] 失败: {err}")

    elapsed = time.time() - start_time
    avg = elapsed / total_accounts if total_accounts else 0
    print(f"\n{'#'*60}")
    print(f" 注册完成! 耗时 {elapsed:.1f} 秒")
    print(f" 总数: {total_accounts} | 成功: {success_count} | 失败: {fail_count}")
    print(f" 平均速度: {avg:.1f} 秒/个")
    if success_count > 0:
        print(f" 结果文件: {output_file}")
    print(f"{'#'*60}")
|
||
|
||
|
||
def main():
    """Interactive entry point: pick a proxy, ask for batch size, run it.

    Proxy resolution order: ``DEFAULT_PROXY``, then the ``HTTPS_PROXY`` /
    ``https_proxy`` / ``ALL_PROXY`` / ``all_proxy`` environment variables,
    then a prompt (empty input means no proxy). The account count and worker
    count are then read from stdin, and ``run_batch`` is started with
    ``DEFAULT_OUTPUT_FILE`` as the output file.
    """

    def _ask_positive_int(prompt, default):
        """Prompt once and return a positive int, or ``default`` otherwise."""
        raw = input(prompt).strip()
        if not raw.isdigit():
            return default
        # str.isdigit() is also true for characters such as "²" that int()
        # rejects, so the conversion itself still has to be guarded.
        try:
            value = int(raw)
        except ValueError:
            return default
        return value if value > 0 else default

    print("=" * 60)
    print(" ChatGPT 批量自动注册工具 (纯协议版)")
    print(" 注册 → Team 邀请 → Codex OAuth 全流程自动化")
    print("=" * 60)

    proxy = DEFAULT_PROXY
    if not proxy:
        env_proxy = (os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy")
                     or os.environ.get("ALL_PROXY") or os.environ.get("all_proxy"))
        if env_proxy:
            print(f"[Info] 检测到环境变量代理: {env_proxy}")
            proxy = env_proxy
        else:
            proxy = input("输入代理地址 (留空=不使用代理): ").strip() or None

    # Report the effective proxy exactly once, whichever source supplied it.
    if proxy:
        print(f"[Info] 使用代理: {proxy}")

    total_accounts = _ask_positive_int(
        f"\n注册账号数量 (默认 {DEFAULT_TOTAL_ACCOUNTS}): ", DEFAULT_TOTAL_ACCOUNTS
    )
    max_workers = _ask_positive_int("并发数 (默认 1): ", 1)

    run_batch(total_accounts=total_accounts, output_file=DEFAULT_OUTPUT_FILE,
              max_workers=max_workers, proxy=proxy)


if __name__ == "__main__":
    main()
|