更新gpt注册rt

This commit is contained in:
zhangchen
2026-03-31 03:57:40 +08:00
parent f06dce11c7
commit f2fb6a9b84
3 changed files with 734 additions and 217 deletions

View File

@@ -0,0 +1,467 @@
"""
OAuth PKCE 注册客户端
完整实现 auth.openai.com 注册状态机 + 登录获取 Token 的全生命周期。
每个步骤封装为独立方法,调用方按编号依次调用即可完成整个注册流程。
"""
import json
import re
import time
import urllib.parse
from typing import Optional
from curl_cffi import requests as curl_requests
from .oauth import (
OAuthStart,
_decode_jwt_segment,
generate_oauth_url,
submit_callback_url,
)
AUTH_BASE = "https://auth.openai.com"
SENTINEL_API = "https://sentinel.openai.com/backend-api/sentinel/req"
SENTINEL_REFERER = "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6"
CLOUDFLARE_TRACE = "https://cloudflare.com/cdn-cgi/trace"
class OAuthPkceClient:
"""
OAuth PKCE 注册客户端
完整注册流程12 步):
1. 检查 IP 地区
2. 访问 OAuth 授权 URL获取 oai-did Cookie
3. 获取 Sentinel Token
4. 提交邮箱 (authorize/continue)
5. 提交密码 (user/register)
6. 发送 OTP (email-otp/send)
7. 验证 OTP (email-otp/validate)
8. 创建账户 (create_account)
9. 注册后重新 OAuth 登录
10. 解析 workspace_id
11. 选择 workspace
12. 跟踪重定向链,交换 OAuth code → access_token
"""
def __init__(self, proxy: Optional[str] = None, log_fn=None):
self.proxy = proxy
self._log = log_fn or (lambda msg: None)
self._proxies = {"http": proxy, "https": proxy} if proxy else None
# 主会话:贯穿整个注册 + 登录流程
self.session = curl_requests.Session(
proxies=self._proxies,
impersonate="chrome",
)
self._device_id: Optional[str] = None
self._sentinel: Optional[str] = None
# ══════════════════════════════════════════════════════════════════
# 内部方法:获取 Sentinel Token极简模式
# ══════════════════════════════════════════════════════════════════
def _fetch_sentinel_token(self, device_id: str, flow: str = "authorize_continue") -> str:
"""
获取 Sentinel Token。
使用独立连接(不复用 session cookie请求体 p 字段留空,
只取响应中的 token 字段拼装为 openai-sentinel-token header 值。
Returns:
JSON 格式的 sentinel token 字符串。
"""
req_body = json.dumps({"p": "", "id": device_id, "flow": flow})
resp = curl_requests.post(
SENTINEL_API,
headers={
"origin": "https://sentinel.openai.com",
"referer": SENTINEL_REFERER,
"content-type": "text/plain;charset=UTF-8",
},
data=req_body,
proxies=self._proxies,
impersonate="chrome",
timeout=15,
)
if resp.status_code != 200:
raise RuntimeError(f"Sentinel 获取失败: HTTP {resp.status_code}")
c_value = resp.json().get("token", "")
if not c_value:
raise RuntimeError("Sentinel 响应缺少 token 字段")
return json.dumps({
"p": "", "t": "", "c": c_value,
"id": device_id, "flow": flow,
}, separators=(",", ":"))
# ══════════════════════════════════════════════════════════════════
# 步骤 1检查 IP 地区
# ══════════════════════════════════════════════════════════════════
def check_ip_region(self) -> str:
"""检查当前 IP 地区CN/HK 不支持。"""
try:
resp = self.session.get(CLOUDFLARE_TRACE, timeout=10)
match = re.search(r"^loc=(.+)$", resp.text, re.MULTILINE)
loc = match.group(1).strip() if match else "UNKNOWN"
self._log(f"当前 IP 地区: {loc}")
if loc in ("CN", "HK"):
raise RuntimeError(f"IP 地区不支持: {loc}")
return loc
except RuntimeError:
raise
except Exception as e:
raise RuntimeError(f"IP 地区检查失败: {e}") from e
# ══════════════════════════════════════════════════════════════════
# 步骤 2访问 OAuth 授权 URL获取 oai-did Cookie
# ══════════════════════════════════════════════════════════════════
def init_oauth_session(self) -> OAuthStart:
"""生成 OAuth PKCE URL 并访问,建立 auth.openai.com 会话。"""
oauth = generate_oauth_url()
self._log("访问 OAuth 授权 URL...")
self.session.get(oauth.auth_url, timeout=15)
self._device_id = self.session.cookies.get("oai-did") or ""
self._log(f"oai-did: {self._device_id[:16]}..." if self._device_id else "oai-did: (未获取到)")
return oauth
# ══════════════════════════════════════════════════════════════════
# 步骤 3获取 Sentinel Token
# ══════════════════════════════════════════════════════════════════
def refresh_sentinel(self) -> str:
"""获取新的 Sentinel Token 并缓存。"""
if not self._device_id:
raise RuntimeError("尚未初始化 oai-did请先调用 init_oauth_session")
self._sentinel = self._fetch_sentinel_token(self._device_id)
self._log("Sentinel Token 已获取")
return self._sentinel
# ══════════════════════════════════════════════════════════════════
# 步骤 4提交邮箱
# ══════════════════════════════════════════════════════════════════
def submit_email(self, email: str) -> dict:
"""向 authorize/continue 提交邮箱,触发注册状态机。"""
if not self._sentinel:
raise RuntimeError("Sentinel Token 未初始化")
payload = json.dumps({
"username": {"value": email, "kind": "email"},
"screen_hint": "signup",
})
self._log(f"提交邮箱: {email}")
resp = self.session.post(
f"{AUTH_BASE}/api/accounts/authorize/continue",
headers={
"referer": f"{AUTH_BASE}/create-account",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": self._sentinel,
},
data=payload,
timeout=30,
)
if resp.status_code != 200:
raise RuntimeError(f"提交邮箱失败: HTTP {resp.status_code} {resp.text[:300]}")
data = resp.json()
self._log(f"邮箱提交成功")
return data
# ══════════════════════════════════════════════════════════════════
# 步骤 5提交密码
# ══════════════════════════════════════════════════════════════════
def submit_password(self, email: str, password: str) -> str:
"""向 user/register 提交密码,返回 continue_url。"""
payload = json.dumps({"password": password, "username": email})
self._log("提交密码...")
resp = self.session.post(
f"{AUTH_BASE}/api/accounts/user/register",
headers={
"referer": f"{AUTH_BASE}/create-account/password",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": self._sentinel or "",
},
data=payload,
timeout=30,
)
if resp.status_code != 200:
raise RuntimeError(f"提交密码失败: HTTP {resp.status_code} {resp.text[:300]}")
continue_url = resp.json().get("continue_url") or ""
self._log(f"密码提交成功{', continue_url 已获取' if continue_url else ''}")
return continue_url
# ══════════════════════════════════════════════════════════════════
# 步骤 6发送 OTP
# ══════════════════════════════════════════════════════════════════
def send_otp(self, continue_url: str = "") -> bool:
"""触发发送邮箱验证码。"""
url = continue_url or f"{AUTH_BASE}/api/accounts/email-otp/send"
self._log(f"发送验证码: {url}")
try:
resp = self.session.post(
url,
headers={
"referer": f"{AUTH_BASE}/create-account/password",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": self._sentinel or "",
},
timeout=30,
)
self._log(f"验证码发送状态: {resp.status_code}")
return resp.status_code == 200
except Exception as e:
self._log(f"发送验证码异常(非致命): {e}")
return False
# ══════════════════════════════════════════════════════════════════
# 步骤 7验证 OTP
# ══════════════════════════════════════════════════════════════════
def validate_otp(self, code: str) -> None:
"""提交邮箱验证码。"""
self._log(f"验证 OTP: {code}")
resp = self.session.post(
f"{AUTH_BASE}/api/accounts/email-otp/validate",
headers={
"referer": f"{AUTH_BASE}/email-verification",
"accept": "application/json",
"content-type": "application/json",
},
data=json.dumps({"code": code}),
timeout=30,
)
if resp.status_code != 200:
raise RuntimeError(f"OTP 验证失败: HTTP {resp.status_code} {resp.text[:300]}")
self._log("OTP 验证通过")
# ══════════════════════════════════════════════════════════════════
# 步骤 8创建账户
# ══════════════════════════════════════════════════════════════════
def create_account(self, name: str, birthdate: str) -> None:
"""提交姓名和生日完成账户创建。"""
self._log(f"创建账户: {name} ({birthdate})")
resp = self.session.post(
f"{AUTH_BASE}/api/accounts/create_account",
headers={
"referer": f"{AUTH_BASE}/about-you",
"accept": "application/json",
"content-type": "application/json",
},
data=json.dumps({"name": name, "birthdate": birthdate}),
timeout=30,
)
if resp.status_code != 200:
raise RuntimeError(f"创建账户失败: HTTP {resp.status_code} {resp.text[:300]}")
self._log("账户创建成功")
# ══════════════════════════════════════════════════════════════════
# 步骤 9注册后重新 OAuth 登录
# ══════════════════════════════════════════════════════════════════
def login_after_register(
self, email: str, password: str, otp_code: str = ""
) -> OAuthStart:
"""
注册完成后重走 OAuth 登录流程。
注册阶段的 session 不含 workspace 信息,必须重新走一次
OAuth 登录获取 oai-client-auth-session Cookie。
Returns:
登录阶段的 OAuthStart含 code_verifier 等,用于步骤 12 Token 交换)。
"""
self._log("=" * 40)
self._log("开始 OAuth 登录(获取 workspace...")
# 9-1. 访问新 OAuth URL
login_oauth = generate_oauth_url()
self.session.get(login_oauth.auth_url, timeout=15)
login_did = self.session.cookies.get("oai-did") or self._device_id or ""
self._log(f"登录阶段 oai-did: {login_did[:16]}..." if login_did else "登录阶段 oai-did: (空)")
# 9-2. 获取登录阶段 Sentinel
login_sentinel = self._fetch_sentinel_token(login_did)
# 9-3. 提交邮箱screen_hint=login
login_email_resp = self.session.post(
f"{AUTH_BASE}/api/accounts/authorize/continue",
headers={
"referer": f"{AUTH_BASE}/sign-in",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": login_sentinel,
},
data=json.dumps({
"username": {"value": email, "kind": "email"},
"screen_hint": "login",
}),
timeout=30,
)
if login_email_resp.status_code != 200:
raise RuntimeError(f"登录提交邮箱失败: HTTP {login_email_resp.status_code}")
page_type = (login_email_resp.json().get("page") or {}).get("type", "")
self._log(f"登录页面类型: {page_type}")
# 9-4. 提交密码login_password 页面)
if "password" in page_type:
self._log("提交密码...")
pwd_resp = self.session.post(
f"{AUTH_BASE}/api/accounts/password/verify",
headers={
"referer": f"{AUTH_BASE}/log-in/password",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": login_sentinel,
},
data=json.dumps({"password": password}),
timeout=30,
)
if pwd_resp.status_code != 200:
raise RuntimeError(f"登录密码验证失败: HTTP {pwd_resp.status_code}")
page_type = (pwd_resp.json().get("page") or {}).get("type", "")
self._log(f"密码验证后页面类型: {page_type}")
# 9-5. 二次 OTP复用注册阶段验证码
if "otp" in page_type or "verification" in page_type:
if not otp_code:
raise RuntimeError("登录需要二次 OTP 验证,但未提供验证码")
self._log(f"提交登录二次验证码: {otp_code}")
# 触发发信请求以满足后端状态机(可忽略报错)
try:
self.session.post(
f"{AUTH_BASE}/api/accounts/passwordless/send-otp",
headers={
"referer": f"{AUTH_BASE}/log-in/password",
"accept": "application/json",
"content-type": "application/json",
},
timeout=10,
)
except Exception:
pass
otp_resp = self.session.post(
f"{AUTH_BASE}/api/accounts/email-otp/validate",
headers={
"referer": f"{AUTH_BASE}/email-verification",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": login_sentinel,
},
data=json.dumps({"code": otp_code}),
timeout=30,
)
if otp_resp.status_code != 200:
raise RuntimeError(f"登录二次 OTP 失败: HTTP {otp_resp.status_code} {otp_resp.text[:200]}")
self._log("登录二次验证通过")
self._log("OAuth 登录流程完成")
return login_oauth
# ══════════════════════════════════════════════════════════════════
# 步骤 10解析 workspace_id
# ══════════════════════════════════════════════════════════════════
def extract_workspace_id(self) -> str:
"""从 oai-client-auth-session CookieJWT中解析 workspace_id。"""
auth_cookie = self.session.cookies.get("oai-client-auth-session") or ""
if not auth_cookie:
raise RuntimeError("未找到 oai-client-auth-session Cookie")
# JWT 段遍历workspace 可能在第一段或第二段)
segments = auth_cookie.split(".")
for i in range(min(len(segments), 2)):
data = _decode_jwt_segment(segments[i])
workspaces = data.get("workspaces") or []
if workspaces:
wid = str((workspaces[0] or {}).get("id") or "").strip()
if wid:
self._log(f"成功解析 workspace_id: {wid}")
return wid
# 调试信息
first_data = _decode_jwt_segment(segments[0]) if segments else {}
self._log(f"Cookie 字段: {list(first_data.keys())}")
raise RuntimeError("无法从 Cookie 中解析 workspace_id")
# ══════════════════════════════════════════════════════════════════
# 步骤 11选择 workspace
# ══════════════════════════════════════════════════════════════════
def select_workspace(self, workspace_id: str) -> str:
"""选择 workspace返回 continue_url。"""
self._log(f"选择 workspace: {workspace_id}")
resp = self.session.post(
f"{AUTH_BASE}/api/accounts/workspace/select",
headers={
"referer": f"{AUTH_BASE}/sign-in-with-chatgpt/codex/consent",
"content-type": "application/json",
},
data=json.dumps({"workspace_id": workspace_id}),
timeout=30,
)
if resp.status_code != 200:
raise RuntimeError(f"workspace/select 失败: HTTP {resp.status_code} {resp.text[:300]}")
continue_url = str((resp.json() or {}).get("continue_url") or "").strip()
if not continue_url:
raise RuntimeError("workspace/select 响应缺少 continue_url")
self._log("workspace 选择成功continue_url 已获取")
return continue_url
# ══════════════════════════════════════════════════════════════════
# 步骤 12跟踪重定向链交换 OAuth code → access_token
# ══════════════════════════════════════════════════════════════════
def follow_redirects_and_exchange_token(
self, continue_url: str, oauth_start: OAuthStart
) -> dict:
"""跟踪重定向链,捕获 code= 回调 URL交换 access_token。"""
current_url = continue_url
for hop in range(8):
resp = self.session.get(current_url, allow_redirects=False, timeout=15)
location = resp.headers.get("Location") or ""
if resp.status_code not in (301, 302, 303, 307, 308) or not location:
break
next_url = urllib.parse.urljoin(current_url, location)
self._log(f"重定向 [{hop + 1}] → {next_url[:100]}...")
if "code=" in next_url and "state=" in next_url:
self._log("捕获到 OAuth 回调 URL交换 Token...")
token_json = submit_callback_url(
callback_url=next_url,
expected_state=oauth_start.state,
code_verifier=oauth_start.code_verifier,
redirect_uri=oauth_start.redirect_uri,
proxy_url=self.proxy,
)
return json.loads(token_json)
current_url = next_url
raise RuntimeError("未能在重定向链中捕获到 OAuth 回调 URL含 code= 参数)")

View File

@@ -1,8 +1,13 @@
"""
注册流程引擎 V2
基于 curl_cffi 的注册状态机,注册成功后直接复用同一会话提取 ChatGPT Session。
采用策略模式封装注册核心OAuthPkceRegisterStrategy
走 auth.openai.com OAuth PKCE 直通注册流程。
外部接口与 plugin.py 完全兼容,无需改动邮箱适配层。
"""
import random
import time
import logging
from datetime import datetime
@@ -11,34 +16,211 @@ from typing import Optional, Callable
from core.base_platform import AccountStatus
from platforms.chatgpt.register import RegistrationResult
from .chatgpt_client import ChatGPTClient
from .oauth_client import OAuthClient
from .oauth_pkce_client import OAuthPkceClient
from .utils import generate_random_name, generate_random_birthday
logger = logging.getLogger(__name__)
class EmailServiceAdapter:
"""\u5c06 V1 \u7684 email_service \u9002\u914d\u6210 V2 \u6240\u9700\u7684\u63a5\u7801\u63a5\u53e3\u3002"""
def __init__(self, email_service, email, log_fn):
self.es = email_service
self.email = email
self.log_fn = log_fn
self._used_codes = set()
def wait_for_verification_code(self, email, timeout=60, otp_sent_at=None, exclude_codes=None):
msg = f"\u6b63\u5728\u7b49\u5f85\u90ae\u7bb1 {email} \u7684\u9a8c\u8bc1\u7801 ({timeout}s)..."
self.log_fn(msg)
code = self.es.get_verification_code(
# ---------------------------------------------------------------------------
# 名字/生日数据
# ---------------------------------------------------------------------------
_FIRST_NAMES = [
"James", "John", "Robert", "Michael", "David", "William", "Richard",
"Joseph", "Thomas", "Charles", "Mary", "Patricia", "Jennifer", "Linda",
"Barbara", "Elizabeth", "Susan", "Jessica", "Sarah", "Karen", "Daniel",
"Matthew", "Anthony", "Mark", "Steven", "Andrew", "Paul", "Emily",
"Emma", "Olivia", "Sophia", "Ava", "Isabella", "Mia",
]
_LAST_NAMES = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller",
"Davis", "Rodriguez", "Martinez", "Anderson", "Taylor", "Thomas",
"Wilson", "Moore", "Jackson", "Martin", "Lee", "Thompson", "White",
]
def _random_name_and_birthday() -> tuple[str, str]:
"""随机生成全名和生日。"""
name = f"{random.choice(_FIRST_NAMES)} {random.choice(_LAST_NAMES)}"
year = random.randint(1985, 2004)
month = random.randint(1, 12)
day = random.randint(1, 28)
return name, f"{year}-{month:02d}-{day:02d}"
# ---------------------------------------------------------------------------
# 策略OAuth PKCE 注册策略
# ---------------------------------------------------------------------------
class OAuthPkceRegisterStrategy:
"""
OAuth PKCE 注册策略(策略模式)
完整注册流程12 步):
1. 检查 IP 地区
2. 访问 OAuth 授权 URL获取 oai-did Cookie
3. 获取 Sentinel Token
4. 提交邮箱 (authorize/continue)
5. 提交密码 (user/register)
6. 发送 OTP (email-otp/send)
7. 验证 OTP (email-otp/validate)
8. 创建账户 (create_account)
9. 注册后重新 OAuth 登录
10. 解析 workspace_id
11. 选择 workspace
12. 跟踪重定向链,交换 OAuth code → access_token
"""
def execute(
self,
client: OAuthPkceClient,
email_service,
email: str,
password: str,
log: Callable[[str], None],
) -> RegistrationResult:
result = RegistrationResult(success=False)
# ── 步骤 1IP 检查 ──────────────────────────────────────────────
log("步骤 1/12: 检查 IP 地区...")
client.check_ip_region()
# ── 步骤 2创建邮箱 ────────────────────────────────────────────
log("步骤 2/12: 创建邮箱接码订单...")
email_data = email_service.create_email()
email_addr = email or (email_data.get("email") if email_data else None)
if not email_addr:
result.error_message = "创建邮箱失败"
return result
result.email = email_addr
result.password = password
log(f"邮箱: {email_addr}")
# ── 步骤 3初始化 OAuth 会话 ────────────────────────────────────
log("步骤 3/12: 访问 OAuth 授权 URL获取 oai-did...")
client.init_oauth_session()
# ── 步骤 4获取 Sentinel Token ──────────────────────────────────
log("步骤 4/12: 获取 Sentinel Token...")
client.refresh_sentinel()
# ── 步骤 5提交邮箱 ────────────────────────────────────────────
log("步骤 5/12: 提交邮箱...")
client.submit_email(email_addr)
# ── 步骤 6提交密码 ────────────────────────────────────────────
log("步骤 6/12: 提交密码...")
continue_url = client.submit_password(email_addr, password)
# ── 步骤 7发送 OTP + 等待验证码 ────────────────────────────────
log("步骤 7/12: 发送验证码...")
otp_sent_at = time.time()
client.send_otp(continue_url)
log("步骤 7/12: 等待邮箱验证码(最多 120s...")
otp_code = email_service.wait_for_verification_code(
email_addr,
timeout=120,
otp_sent_at=otp_sent_at,
)
if not otp_code:
result.error_message = "未收到邮箱验证码"
return result
log(f"验证码: {otp_code}")
# ── 步骤 8验证 OTP ─────────────────────────────────────────────
log("步骤 8/12: 验证 OTP...")
client.validate_otp(otp_code)
# ── 步骤 9创建账户 ────────────────────────────────────────────
log("步骤 9/12: 填写账户信息(姓名/生日)...")
name, birthdate = _random_name_and_birthday()
log(f"账户信息: {name} ({birthdate})")
client.create_account(name, birthdate)
# ── 步骤 10注册后重新 OAuth 登录 ───────────────────────────────
log("步骤 10/12: 注册后重新 OAuth 登录...")
login_oauth = client.login_after_register(email_addr, password, otp_code)
# ── 步骤 11解析 workspace_id ───────────────────────────────────
log("步骤 11/12: 解析 workspace_id...")
workspace_id = client.extract_workspace_id()
result.workspace_id = workspace_id
# ── 步骤 12选择 workspace → 交换 Token ─────────────────────────
log("步骤 12/12: 选择 workspace...")
ws_continue_url = client.select_workspace(workspace_id)
log("步骤 12/12: 跟踪重定向链,交换 OAuth Token...")
token_data = client.follow_redirects_and_exchange_token(ws_continue_url, login_oauth)
# ── 组装结果 ─────────────────────────────────────────────────────
result.success = True
result.access_token = token_data.get("access_token", "")
result.refresh_token = token_data.get("refresh_token", "")
result.id_token = token_data.get("id_token", "")
result.account_id = token_data.get("account_id", "") or workspace_id
result.metadata = {
"type": token_data.get("type", "codex"),
"expired": token_data.get("expired", ""),
}
log("=" * 50)
log("注册流程成功完成!")
log("=" * 50)
return result
# ---------------------------------------------------------------------------
# Email Service 适配器(为 OAuthPkceClient 提供统一接码接口)
# ---------------------------------------------------------------------------
class _EmailServiceAdapter:
"""
将 V1 email_service含 create_email / get_verification_code
适配为 OAuthPkceRegisterStrategy 期待的接口。
接口:
- create_email() → {'email': str, ...}
- wait_for_verification_code(email, timeout, otp_sent_at) → str | None
"""
def __init__(self, email_service, log: Callable[[str], None]):
self._svc = email_service
self._log = log
self._used_codes: set = set()
def create_email(self, config=None):
return self._svc.create_email(config)
def wait_for_verification_code(
self, email: str, timeout: int = 120, otp_sent_at=None
) -> Optional[str]:
self._log(f"等待邮箱 {email} 的验证码timeout={timeout}s...")
code = self._svc.get_verification_code(
email=email,
timeout=timeout,
otp_sent_at=otp_sent_at,
exclude_codes=exclude_codes or self._used_codes,
exclude_codes=self._used_codes,
)
if code:
self._used_codes.add(code)
self.log_fn(f"\u6210\u529f\u83b7\u53d6\u9a8c\u8bc1\u7801: {code}")
self._log(f"成功获取验证码: {code}")
return code
# ---------------------------------------------------------------------------
# 注册引擎(对外暴露给 plugin.py接口完全向后兼容
# ---------------------------------------------------------------------------
class RegistrationEngineV2:
"""
注册引擎 V2外部接口层
plugin.py 通过此类发起注册,不感知内部策略变化。
"""
def __init__(
self,
email_service,
@@ -56,12 +238,12 @@ class RegistrationEngineV2:
self.task_uuid = task_uuid
self.max_retries = max(1, int(max_retries or 1))
self.extra_config = dict(extra_config or {})
self.email = None
self.password = None
self.logs = []
def _log(self, message: str, level: str = "info"):
self.email: Optional[str] = None
self.password: Optional[str] = None
self.logs: list[str] = []
def _log(self, message: str, level: str = "info") -> None:
timestamp = datetime.now().strftime("%H:%M:%S")
log_message = f"[{timestamp}] {message}"
self.logs.append(log_message)
@@ -72,199 +254,63 @@ class RegistrationEngineV2:
else:
logger.info(log_message)
def _format_oauth_failure(self, oauth_client: OAuthClient) -> str:
detail = str(getattr(oauth_client, "last_error", "") or "").strip()
if not detail:
detail = "获取最终 OAuth Tokens 失败"
return f"账号已创建成功,但 {detail}"
def run(self) -> RegistrationResult:
"""执行注册流程,支持整流程重试。"""
result = RegistrationResult(success=False, logs=self.logs)
last_error = ""
def _should_retry(self, message: str) -> bool:
for attempt in range(self.max_retries):
try:
if attempt > 0:
self._log(f"整流程重试 {attempt + 1}/{self.max_retries}...")
time.sleep(2)
adapter = _EmailServiceAdapter(self.email_service, self._log)
client = OAuthPkceClient(proxy=self.proxy_url, log_fn=self._log)
strategy = OAuthPkceRegisterStrategy()
result = strategy.execute(
client=client,
email_service=adapter,
email=self.email or "",
password=self.password or "AAb1234567890!",
log=self._log,
)
result.logs = self.logs
if result.success:
return result
last_error = result.error_message or "注册失败"
self._log(f"注册失败: {last_error}", "error")
if attempt < self.max_retries - 1 and self._should_retry(last_error):
self._log("准备重试...")
continue
return result
except Exception as e:
last_error = str(e)
self._log(f"注册异常: {last_error}", "error")
if attempt < self.max_retries - 1 and self._should_retry(last_error):
continue
result.error_message = last_error
result.logs = self.logs
return result
result.error_message = last_error or "注册失败"
result.logs = self.logs
return result
@staticmethod
def _should_retry(message: str) -> bool:
"""判断是否值得重试。"""
text = str(message or "").lower()
retriable_markers = [
"tls",
"ssl",
"curl: (35)",
"预授权被拦截",
"authorize",
"registration_disallowed",
"http 400",
"创建账号失败",
"未获取到 authorization code",
"consent",
"workspace",
"organization",
"otp",
"验证码",
"session",
"accessToken",
"next-auth",
"tls", "ssl", "curl: (35)",
"ip 地区检查失败", "sentinel",
"timeout", "timed out", "connection",
"验证码", "otp",
]
return any(marker.lower() in text for marker in retriable_markers)
def run(self) -> RegistrationResult:
result = RegistrationResult(success=False, logs=self.logs)
try:
last_error = ""
for attempt in range(self.max_retries):
try:
if attempt == 0:
self._log("=" * 60)
self._log("开始注册流程 V2 (Session 复用直取 AccessToken)")
self._log(f"请求模式: {self.browser_mode}")
self._log("=" * 60)
else:
self._log(f"整流程重试 {attempt + 1}/{self.max_retries} ...")
time.sleep(1)
# 1. 创建邮箱
email_data = self.email_service.create_email()
email_addr = self.email or (email_data.get('email') if email_data else None)
if not email_addr:
result.error_message = "创建邮箱失败"
return result
result.email = email_addr
pwd = self.password or "AAb1234567890!"
result.password = pwd
# 随机姓名、生日
first_name, last_name = generate_random_name()
birthdate = generate_random_birthday()
self._log(f"邮箱: {email_addr}, 密码: {pwd}")
self._log(f"注册信息: {first_name} {last_name}, 生日: {birthdate}")
# 使用包装器为底层客户端提供接码服务
skymail_adapter = EmailServiceAdapter(self.email_service, email_addr, self._log)
# 2. 初始化 V2 客户端
chatgpt_client = ChatGPTClient(
proxy=self.proxy_url,
verbose=False,
browser_mode=self.browser_mode,
)
chatgpt_client._log = self._log
self._log("步骤 1/2: 执行注册状态机...")
success, msg = chatgpt_client.register_complete_flow(
email_addr, pwd, first_name, last_name, birthdate, skymail_adapter
)
if not success:
last_error = f"注册流失败: {msg}"
if attempt < self.max_retries - 1 and self._should_retry(msg):
self._log(f"注册流失败,准备整流程重试: {msg}")
continue
result.error_message = last_error
return result
self._log("步骤 2/2: 优先复用注册会话提取 ChatGPT Session / AccessToken...")
session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens()
if session_ok:
self._log("Token 提取完成!")
result.success = True
result.access_token = session_result.get("access_token", "")
result.session_token = session_result.get("session_token", "")
result.account_id = (
session_result.get("account_id")
or session_result.get("user_id")
or ("v2_acct_" + chatgpt_client.device_id[:8])
)
result.workspace_id = session_result.get("workspace_id", "")
result.metadata = {
"auth_provider": session_result.get("auth_provider", ""),
"expires": session_result.get("expires", ""),
"user_id": session_result.get("user_id", ""),
"user": session_result.get("user") or {},
"account": session_result.get("account") or {},
}
if result.workspace_id:
self._log(f"Session Workspace ID: {result.workspace_id}")
self._log("=" * 60)
self._log("注册流程成功结束!")
self._log("=" * 60)
return result
self._log(f"复用会话失败,回退到 OAuth 登录补全流程: {session_result}")
tokens = None
oauth_client = None
for oauth_attempt in range(2):
if oauth_attempt > 0:
self._log(f"同账号 OAuth 重试 {oauth_attempt + 1}/2 ...")
time.sleep(1)
oauth_client = OAuthClient(
config=self.extra_config,
proxy=self.proxy_url,
verbose=False,
browser_mode=self.browser_mode,
)
oauth_client._log = self._log
oauth_client.session = chatgpt_client.session
tokens = oauth_client.login_and_get_tokens(
email_addr,
pwd,
chatgpt_client.device_id,
chatgpt_client.ua,
chatgpt_client.sec_ch_ua,
chatgpt_client.impersonate,
skymail_adapter,
)
if tokens and tokens.get("access_token"):
break
if oauth_client.last_error and "add_phone" in oauth_client.last_error:
break
if tokens and tokens.get("access_token"):
self._log("OAuth 回退补全成功!")
result.success = True
result.access_token = tokens.get("access_token")
result.refresh_token = tokens.get("refresh_token")
result.id_token = tokens.get("id_token")
result.account_id = "v2_acct_" + chatgpt_client.device_id[:8]
session_data = oauth_client._decode_oauth_session_cookie()
if session_data:
workspaces = session_data.get("workspaces", [])
if workspaces:
result.workspace_id = str((workspaces[0] or {}).get("id") or "")
self._log(f"成功萃取 Workspace ID: {result.workspace_id}")
session_cookie = None
for cookie in oauth_client.session.cookies.jar:
if cookie.name == "__Secure-next-auth.session-token":
session_cookie = cookie.value
break
result.session_token = session_cookie
self._log("=" * 60)
self._log("注册流程成功结束!")
self._log("=" * 60)
return result
last_error = self._format_oauth_failure(oauth_client)
result.error_message = last_error
return result
except Exception as attempt_error:
last_error = str(attempt_error)
if attempt < self.max_retries - 1 and self._should_retry(last_error):
self._log(f"本轮出现异常,准备整流程重试: {last_error}")
continue
raise
result.error_message = last_error or "注册失败"
return result
except Exception as e:
self._log(f"V2 注册全流程执行异常: {e}", "error")
import traceback
traceback.print_exc()
result.error_message = str(e)
return result
return any(m in text for m in retriable_markers)

View File

@@ -4,7 +4,11 @@ from __future__ import annotations
from typing import Any
from services.chatgpt_sync import persist_cpa_sync_result, upload_chatgpt_account_to_cpa
from services.chatgpt_sync import (
_get_account_extra,
persist_cpa_sync_result,
upload_chatgpt_account_to_cpa,
)
def sync_account(account) -> list[dict[str, Any]]:
@@ -20,7 +24,7 @@ def sync_account(account) -> list[dict[str, Any]]:
a = _A()
a.email = account.email
extra = account.extra or {}
extra = _get_account_extra(account)
a.access_token = extra.get("access_token") or account.token
a.refresh_token = extra.get("refresh_token", "")
a.id_token = extra.get("id_token", "")
@@ -40,7 +44,7 @@ def sync_account(account) -> list[dict[str, Any]]:
codex_proxy_url = str(config_store.get("codex_proxy_url", "") or "").strip()
if codex_proxy_url:
upload_type = str(config_store.get("codex_proxy_upload_type", "at") or "at").strip().lower()
extra = account.extra or {}
extra = _get_account_extra(account)
class _CP:
pass