Files
any-auto-register/platforms/chatgpt/oauth_client.py

3308 lines
125 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OAuth 客户端模块 - 处理 Codex OAuth 登录流程
"""
import time
import secrets
import uuid
import json
import random
from urllib.parse import urlparse, parse_qs
from core.proxy_utils import build_requests_proxy_config
from core.task_runtime import TaskInterruption
try:
from curl_cffi import requests as curl_requests
except ImportError:
import requests as curl_requests
from .phone_service import SMSToMePhoneService
from .utils import (
FlowState,
build_browser_headers,
describe_flow_state,
extract_flow_state,
generate_datadog_trace,
generate_pkce,
normalize_flow_url,
random_delay,
seed_oai_device_cookie,
)
from .sentinel_token import build_sentinel_token
from .sentinel_browser import get_sentinel_token_via_browser
class OAuthClient:
"""OAuth 客户端 - 用于获取 Access Token 和 Refresh Token"""
def __init__(self, config, proxy=None, verbose=True, browser_mode="protocol"):
    """Prepare endpoints, runtime flags, tracking fields and an HTTP session.

    Args:
        config: Optional mapping of settings; a shallow copy is stored. The
            keys read here are ``oauth_issuer``, ``oauth_client_id`` and
            ``oauth_redirect_uri``, each with a built-in fallback value.
        proxy: Optional proxy address; applied to the session when truthy.
        verbose: Whether ``_log`` prints its messages.
        browser_mode: ``protocol`` | ``headless`` | ``headed``. A falsy value
            falls back to ``protocol``.
    """
    settings = dict(config or {})
    self.config = settings
    self.oauth_issuer = settings.get("oauth_issuer", "https://auth.openai.com")
    self.oauth_client_id = settings.get(
        "oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann"
    )
    self.oauth_redirect_uri = settings.get(
        "oauth_redirect_uri", "http://localhost:1455/auth/callback"
    )

    self.proxy = proxy
    self.verbose = verbose
    self.browser_mode = browser_mode if browser_mode else "protocol"

    # Diagnostics describing the most recent flow.
    self.last_error = self.last_workspace_id = ""
    self.last_state = FlowState()
    self.last_stage = ""

    # Browser identity; filled in later when a fingerprint is chosen/adopted.
    self.device_id = self.ua = self.sec_ch_ua = self.impersonate = ""

    # Fresh HTTP session, routed through the proxy when one was given.
    self.session = curl_requests.Session()
    if self.proxy:
        self.session.proxies = build_requests_proxy_config(self.proxy)
def adopt_browser_context(
self,
session,
*,
device_id: str = "",
user_agent: str | None = None,
sec_ch_ua: str | None = None,
accept_language: str | None = None,
):
"""承接前序浏览器上下文,延续已建立的 cookie / session。"""
if session is not None:
self.session = session
if self.proxy:
try:
if not getattr(self.session, "proxies", None):
self.session.proxies = build_requests_proxy_config(self.proxy)
except Exception:
pass
header_updates = {}
if user_agent:
header_updates["User-Agent"] = user_agent
if sec_ch_ua:
header_updates["sec-ch-ua"] = sec_ch_ua
if accept_language:
header_updates["Accept-Language"] = accept_language
if header_updates:
try:
self.session.headers.update(header_updates)
except Exception:
pass
if device_id:
self.device_id = str(device_id or "").strip()
seed_oai_device_cookie(self.session, device_id)
self._log(f"已接入前序浏览器上下文: device_id={device_id}")
if user_agent:
self.ua = str(user_agent or "").strip()
if sec_ch_ua:
self.sec_ch_ua = str(sec_ch_ua or "").strip()
def _log(self, msg):
"""输出日志"""
if self.verbose:
print(f" [OAuth] {msg}")
def _enter_stage(self, stage: str, detail: str = ""):
self.last_stage = str(stage or "").strip()
if self.last_stage:
message = f"[stage={self.last_stage}]"
if detail:
message += f" {detail}"
self._log(message)
def _set_error(self, message):
raw_message = str(message or "").strip()
if self.last_stage and raw_message and f"[stage={self.last_stage}]" not in raw_message:
self.last_error = f"[stage={self.last_stage}] {raw_message}"
else:
self.last_error = raw_message
if self.last_error:
self._log(self.last_error)
def _browser_pause(self, low=0.15, high=0.4):
"""在 headed 模式下注入轻微延迟,模拟真实浏览器操作节奏。"""
if self.browser_mode == "headed":
random_delay(low, high)
@staticmethod
def _random_chrome_fingerprint():
profiles = [
{
"major": 131,
"impersonate": "chrome131",
"build": 6778,
"patch_range": (69, 205),
"sec_ch_ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
},
{
"major": 133,
"impersonate": "chrome133a",
"build": 6943,
"patch_range": (33, 153),
"sec_ch_ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
},
{
"major": 136,
"impersonate": "chrome136",
"build": 7103,
"patch_range": (48, 175),
"sec_ch_ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
},
]
profile = random.choice(profiles)
major = profile["major"]
build = profile["build"]
patch = random.randint(*profile["patch_range"])
full_ver = f"{major}.0.{build}.{patch}"
ua = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
f"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{full_ver} Safari/537.36"
)
return ua, profile["sec_ch_ua"], profile["impersonate"]
def _ensure_oauth_fingerprint(self, user_agent, sec_ch_ua, impersonate):
if user_agent and sec_ch_ua and impersonate:
return user_agent, sec_ch_ua, impersonate
ua, ch_ua, imp = self._random_chrome_fingerprint()
user_agent = user_agent or ua
sec_ch_ua = sec_ch_ua or ch_ua
impersonate = impersonate or imp
self.ua = str(user_agent or "").strip()
self.sec_ch_ua = str(sec_ch_ua or "").strip()
self.impersonate = str(impersonate or "").strip()
try:
self.session.headers.update(
{
"User-Agent": user_agent,
"Accept-Language": random.choice(
[
"en-US,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9",
"en-US,en;q=0.8",
]
),
"sec-ch-ua": sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
}
)
except Exception:
pass
self._log(
f"OAuth 指纹: ua={user_agent.split('Chrome/')[-1][:24]}..., sec-ch-ua={sec_ch_ua}, impersonate={impersonate}"
)
return user_agent, sec_ch_ua, impersonate
@staticmethod
def _iter_text_fragments(value):
if isinstance(value, str):
text = value.strip()
if text:
yield text
return
if isinstance(value, dict):
for item in value.values():
yield from OAuthClient._iter_text_fragments(item)
return
if isinstance(value, (list, tuple, set)):
for item in value:
yield from OAuthClient._iter_text_fragments(item)
@classmethod
def _should_blacklist_phone_failure(cls, detail="", state: FlowState | None = None):
fragments = [str(detail or "").strip()]
if state is not None:
fragments.extend(
cls._iter_text_fragments(
{
"page_type": state.page_type,
"continue_url": state.continue_url,
"current_url": state.current_url,
"payload": state.payload,
"raw": state.raw,
}
)
)
combined = " | ".join(fragment for fragment in fragments if fragment).lower()
if not combined:
return False
non_blacklist_markers = (
"whatsapp",
"未收到短信验证码",
"手机号验证码错误",
"phone-otp/resend",
"phone-otp/validate 异常",
"phone-otp/validate 响应不是 json",
"phone-otp/validate 失败",
"timeout",
"timed out",
"network",
"connection",
"proxy",
"ssl",
"tls",
"captcha",
"too many phone",
"too many phone numbers",
"too many verification requests",
"验证请求过多",
"接受短信次数过多",
"session limit",
"rate limit",
)
if any(marker in combined for marker in non_blacklist_markers):
return False
blacklist_markers = (
"phone number is invalid",
"invalid phone number",
"invalid phone",
"phone number invalid",
"sms verification failed",
"send sms verification failed",
"unable to send sms",
"not a valid mobile number",
"unsupported phone number",
"phone number not supported",
"carrier not supported",
"电话号码无效",
"手机号无效",
"发送短信验证失败",
"号码无效",
"号码不支持",
"手机号不支持",
)
return any(marker in combined for marker in blacklist_markers)
def _blacklist_phone_if_needed(
self, phone_service, entry, detail="", state: FlowState | None = None
):
if not entry or not self._should_blacklist_phone_failure(detail, state):
return False
try:
phone_service.mark_blacklisted(entry.phone)
self._log(f"已将手机号加入黑名单: {entry.phone}")
return True
except Exception as e:
self._log(f"写入手机号黑名单失败: {e}")
return False
def _headers(
    self,
    url,
    *,
    user_agent=None,
    sec_ch_ua=None,
    accept,
    referer=None,
    origin=None,
    content_type=None,
    navigation=False,
    fetch_mode=None,
    fetch_dest=None,
    fetch_site=None,
    extra_headers=None,
):
    """Build browser-like request headers for *url*.

    The ``Accept-Language`` value is taken from the current session headers
    when available, otherwise ``en-US,en;q=0.9``. A missing *user_agent*
    falls back to ``Mozilla/5.0``. Everything else is forwarded unchanged to
    ``build_browser_headers``, with ``headed`` derived from the browser mode.
    """
    try:
        session_language = self.session.headers.get("Accept-Language")
    except Exception:
        session_language = None

    header_options = {
        "url": url,
        "user_agent": user_agent or "Mozilla/5.0",
        "sec_ch_ua": sec_ch_ua,
        "accept": accept,
        "accept_language": session_language or "en-US,en;q=0.9",
        "referer": referer,
        "origin": origin,
        "content_type": content_type,
        "navigation": navigation,
        "fetch_mode": fetch_mode,
        "fetch_dest": fetch_dest,
        "fetch_site": fetch_site,
        "headed": self.browser_mode == "headed",
        "extra_headers": extra_headers,
    }
    return build_browser_headers(**header_options)
def _state_from_url(self, url, method="GET"):
    """Derive a flow state from *url* alone.

    The URL is normalised against the OAuth issuer before extraction. When
    *method* is truthy the resulting state's ``method`` is overwritten with
    its upper-cased string form.
    """
    normalized_url = normalize_flow_url(url, auth_base=self.oauth_issuer)
    flow = extract_flow_state(
        current_url=normalized_url,
        auth_base=self.oauth_issuer,
        default_method=method,
    )
    if not method:
        return flow
    flow.method = str(method).upper()
    return flow
def _state_from_payload(self, data, current_url=""):
    """Derive a flow state from a response payload and the URL it came from."""
    extraction_args = {
        "data": data,
        "current_url": current_url,
        "auth_base": self.oauth_issuer,
    }
    return extract_flow_state(**extraction_args)
def _get_cookie_value(self, name, domain_hint=None):
"""读取当前会话中的 Cookie。"""
try:
for cookie in self.session.cookies:
cookie_name = cookie.name if hasattr(cookie, "name") else str(cookie)
if cookie_name != name:
continue
cookie_domain = cookie.domain if hasattr(cookie, "domain") else ""
if domain_hint and domain_hint not in (cookie_domain or ""):
continue
return cookie.value if hasattr(cookie, "value") else ""
except Exception:
pass
return ""
def _state_signature(self, state: FlowState):
return (
state.page_type or "",
state.method or "",
state.continue_url or "",
state.current_url or "",
)
def _extract_code_from_state(self, state: FlowState):
for candidate in (
state.continue_url,
state.current_url,
(state.payload or {}).get("url", ""),
):
code = self._extract_code_from_url(candidate)
if code:
return code
return None
def _state_is_login_password(self, state: FlowState):
return state.page_type == "login_password"
def _state_is_create_account_password(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "create_account_password" or "create-account/password" in target
def _state_is_email_otp(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return (
state.page_type == "email_otp_verification"
or "email-verification" in target
or "email-otp" in target
)
def _state_is_add_phone(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "add_phone" or "add-phone" in target
def _state_is_about_you(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "about_you" or "about-you" in target
def _state_requires_navigation(self, state: FlowState):
method = (state.method or "GET").upper()
if method != "GET":
return False
if (
state.source == "api"
and state.current_url
and state.page_type not in {"login_password", "email_otp_verification"}
):
return True
if state.page_type == "external_url" and state.continue_url:
return True
if state.continue_url and state.continue_url != state.current_url:
return True
return False
def _state_supports_workspace_resolution(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
if state.page_type in {
"consent",
"workspace_selection",
"organization_selection",
}:
return True
if any(
marker in target
for marker in (
"sign-in-with-chatgpt",
"consent",
"workspace",
"organization",
)
):
return True
session_data = self._decode_oauth_session_cookie() or {}
return bool(session_data.get("workspaces"))
def _follow_flow_state(
    self,
    state: FlowState,
    referer=None,
    user_agent=None,
    impersonate=None,
    max_hops=16,
):
    """Follow the server-provided continue_url / current_url.

    Redirects are handled manually (``allow_redirects=False``) so that every
    intermediate URL can be inspected for an authorization code.

    Args:
        state: Flow state to start from; its continue URL is preferred over
            its current URL.
        referer: Referer sent with the first request.
        user_agent: User-Agent forwarded to ``_headers``.
        impersonate: Passed to the session's ``get`` as ``impersonate`` when
            truthy.
        max_hops: Maximum number of requests issued.

    Returns:
        A ``(code, state)`` tuple. ``code`` is the authorization code when
        one was found, otherwise ``None``; ``state`` is the newest flow
        state known at that point.
    """
    import re
    current_url = state.continue_url or state.current_url
    last_url = current_url or ""
    referer_url = referer
    if not current_url:
        # Nothing to follow: hand the input state back unchanged.
        return None, state
    # The starting URL may already be the callback carrying the code.
    initial_code = self._extract_code_from_url(current_url)
    if initial_code:
        return initial_code, self._state_from_url(current_url)
    for hop in range(max_hops):
        try:
            headers = self._headers(
                current_url,
                user_agent=user_agent,
                accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                referer=referer_url,
                navigation=True,
            )
            kwargs = {"headers": headers, "allow_redirects": False, "timeout": 30}
            if impersonate:
                kwargs["impersonate"] = impersonate
            self._browser_pause(0.12, 0.3)
            r = self.session.get(current_url, **kwargs)
            last_url = str(r.url)
            self._log(f"follow[{hop + 1}] {r.status_code} {last_url[:120]}")
        except Exception as e:
            # A failed request to the localhost callback may still expose the
            # callback URL (and thus the code) inside the exception text.
            maybe_localhost = re.search(r"(https?://localhost[^\s\'\"]+)", str(e))
            if maybe_localhost:
                location = maybe_localhost.group(1)
                code = self._extract_code_from_url(location)
                if code:
                    self._log("从 localhost 异常提取到 authorization code")
                    return code, self._state_from_url(location)
            self._log(f"follow[{hop + 1}] 异常: {str(e)[:160]}")
            return None, self._state_from_url(last_url or current_url)
        code = self._extract_code_from_url(last_url)
        if code:
            return code, self._state_from_url(last_url)
        if r.status_code in (301, 302, 303, 307, 308):
            location = normalize_flow_url(
                r.headers.get("Location", ""), auth_base=self.oauth_issuer
            )
            if not location:
                # Redirect without a usable target: stop where we are.
                return None, self._state_from_url(last_url or current_url)
            code = self._extract_code_from_url(location)
            if code:
                # Code found in the redirect target; no need to request it.
                return code, self._state_from_url(location)
            referer_url = last_url or referer_url
            current_url = location
            continue
        # Non-redirect response: derive the next state from it and stop.
        content_type = (r.headers.get("content-type", "") or "").lower()
        if "application/json" in content_type:
            try:
                next_state = self._state_from_payload(
                    r.json(), current_url=last_url or current_url
                )
            except Exception:
                # Unparseable JSON body: fall back to the URL alone.
                next_state = self._state_from_url(last_url or current_url)
        else:
            next_state = self._state_from_url(last_url or current_url)
        return None, next_state
    # Hop budget exhausted while still being redirected.
    return None, self._state_from_url(last_url or current_url)
def _bootstrap_oauth_session(
    self,
    authorize_url,
    authorize_params,
    device_id=None,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
):
    """Start the OAuth session and try to obtain a ``login_session`` cookie.

    First requests *authorize_url*; when no ``login_session`` cookie shows up
    in the session afterwards, retries once against
    ``{oauth_issuer}/api/oauth/oauth2/auth`` with the same parameters.
    Exceptions from either request are logged, not raised.

    Returns:
        The final URL (after redirects) of the last request that produced a
        response, or ``""`` when neither request did.
    """
    if device_id:
        seed_oai_device_cookie(self.session, device_id)
    has_login_session = False
    authorize_final_url = ""
    try:
        headers = self._headers(
            authorize_url,
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            referer="https://chatgpt.com/",
            navigation=True,
        )
        kwargs = {
            "params": authorize_params,
            "headers": headers,
            "allow_redirects": True,
            "timeout": 30,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.get(authorize_url, **kwargs)
        authorize_final_url = str(r.url)
        redirects = len(getattr(r, "history", []) or [])
        self._log(f"/oauth/authorize -> {r.status_code}, redirects={redirects}")
        # Cookie entries without a ``name`` attribute are compared by their
        # string form.
        has_login_session = any(
            (cookie.name if hasattr(cookie, "name") else str(cookie))
            == "login_session"
            for cookie in self.session.cookies
        )
        self._log(f"login_session: {'已获取' if has_login_session else '未获取'}")
    except Exception as e:
        self._log(f"/oauth/authorize 异常: {e}")
    if has_login_session:
        return authorize_final_url
    # Fallback endpoint, tried once with the same query parameters.
    self._log("未获取到 login_session尝试 /api/oauth/oauth2/auth...")
    try:
        oauth2_url = f"{self.oauth_issuer}/api/oauth/oauth2/auth"
        kwargs = {
            "params": authorize_params,
            "headers": self._headers(
                oauth2_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                referer="https://chatgpt.com/",
                navigation=True,
            ),
            "allow_redirects": True,
            "timeout": 30,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r2 = self.session.get(oauth2_url, **kwargs)
        authorize_final_url = str(r2.url)
        redirects2 = len(getattr(r2, "history", []) or [])
        self._log(
            f"/api/oauth/oauth2/auth -> {r2.status_code}, redirects={redirects2}"
        )
        has_login_session = any(
            (cookie.name if hasattr(cookie, "name") else str(cookie))
            == "login_session"
            for cookie in self.session.cookies
        )
        self._log(
            f"login_session(重试): {'已获取' if has_login_session else '未获取'}"
        )
    except Exception as e:
        self._log(f"/api/oauth/oauth2/auth 异常: {e}")
    # Returned whether or not the cookie was finally obtained.
    return authorize_final_url
def _bootstrap_chatgpt_entry(
    self,
    email: str,
    device_id: str,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
) -> str:
    """Enter via ChatGPT: home page -> CSRF token -> signin/openai -> authorize.

    Each step is best effort: exceptions are logged and the next step still
    runs. The e-mail is sent as ``login_hint`` on the sign-in request.

    Returns:
        ``""`` when no authorize URL could be obtained; otherwise the final
        URL after following the authorize URL, or the authorize URL itself
        when that last request raised.
    """
    homepage_url = "https://chatgpt.com/"
    csrf_url = "https://chatgpt.com/api/auth/csrf"
    signin_url = "https://chatgpt.com/api/auth/signin/openai"
    # NOTE(review): the first three requests do not pass ``impersonate``,
    # only the final authorize request does - confirm this is intentional.
    try:
        self._log("force_chatgpt_entry: 访问 ChatGPT 首页...")
        self._browser_pause()
        r_home = self.session.get(
            homepage_url,
            headers=self._headers(
                homepage_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                navigation=True,
            ),
            allow_redirects=True,
            timeout=30,
        )
        self._log(f"force_chatgpt_entry: 首页状态 {r_home.status_code}")
    except Exception as e:
        self._log(f"force_chatgpt_entry: 首页访问异常: {e}")
    csrf_token = ""
    try:
        self._log("force_chatgpt_entry: 获取 CSRF token...")
        r_csrf = self.session.get(
            csrf_url,
            headers=self._headers(
                csrf_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="application/json",
                referer=homepage_url,
                fetch_site="same-origin",
            ),
            timeout=30,
        )
        if r_csrf.status_code == 200:
            csrf_token = (r_csrf.json() or {}).get("csrfToken", "") or ""
        if csrf_token:
            # Only a 16-character prefix of the token is logged.
            self._log(f"force_chatgpt_entry: CSRF token={csrf_token[:16]}...")
    except Exception as e:
        self._log(f"force_chatgpt_entry: 获取 CSRF 异常: {e}")
    authorize_url = ""
    try:
        self._log("force_chatgpt_entry: 提交邮箱获取 authorize URL...")
        params = {
            "prompt": "login",
            "ext-oai-did": device_id,
            "auth_session_logging_id": str(uuid.uuid4()),
            "screen_hint": "login_or_signup",
            "login_hint": email,
        }
        # The CSRF token may still be empty here if the previous step failed.
        form_data = {
            "callbackUrl": "https://chatgpt.com/",
            "csrfToken": csrf_token,
            "json": "true",
        }
        r_signin = self.session.post(
            signin_url,
            params=params,
            data=form_data,
            headers=self._headers(
                signin_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="application/json",
                referer=homepage_url,
                origin="https://chatgpt.com",
                content_type="application/x-www-form-urlencoded",
                fetch_site="same-origin",
            ),
            timeout=30,
        )
        if r_signin.status_code == 200:
            authorize_url = (r_signin.json() or {}).get("url", "") or ""
        if authorize_url:
            self._log("force_chatgpt_entry: 已获取 authorize URL")
        else:
            self._log(
                f"force_chatgpt_entry: authorize URL 获取失败 {r_signin.status_code}"
            )
    except Exception as e:
        self._log(f"force_chatgpt_entry: 提交邮箱异常: {e}")
    if not authorize_url:
        return ""
    try:
        self._log("force_chatgpt_entry: 访问 authorize URL...")
        self._browser_pause()
        kwargs = {
            "headers": self._headers(
                authorize_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                referer=homepage_url,
                navigation=True,
            ),
            "allow_redirects": True,
            "timeout": 30,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        r_auth = self.session.get(authorize_url, **kwargs)
        final_url = str(r_auth.url)
        self._log(
            f"force_chatgpt_entry: authorize 最终跳转 {final_url[:160]}"
        )
        return final_url
    except Exception as e:
        self._log(f"force_chatgpt_entry: 访问 authorize 异常: {e}")
        # Fall back to the not-yet-followed authorize URL.
        return authorize_url
def _submit_authorize_continue(
    self,
    email,
    device_id,
    continue_referer,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    authorize_url=None,
    authorize_params=None,
    screen_hint=None,
):
    """Submit the e-mail and return the first page state of the OAuth flow.

    A sentinel token is obtained first (browser-based helper, then the HTTP
    helper, up to two rounds). The e-mail is then POSTed to
    ``/api/accounts/authorize/continue``. On a 400 ``invalid_auth_step``
    response, and only when *authorize_url* and *authorize_params* are
    given, the session is re-bootstrapped and the POST is retried once.

    Returns:
        The flow state parsed from a 200 response, or ``None`` on any
        failure (the reason is recorded through ``_set_error``).
    """
    self._enter_stage("authorize_continue", f"email={email}")
    self._log("步骤2: POST /api/accounts/authorize/continue")
    self._log(f"authorize_continue: device_id={device_id}")
    sentinel_token = None
    for _sentinel_attempt in range(2):
        # Preferred source: the browser-based sentinel helper.
        sentinel_token = get_sentinel_token_via_browser(
            flow="authorize_continue",
            proxy=self.proxy,
            page_url=continue_referer or f"{self.oauth_issuer}/log-in",
            headless=self.browser_mode != "headed",
            device_id=device_id,
            log_fn=lambda msg: self._log(f"authorize_continue: {msg}"),
        )
        if sentinel_token:
            self._log("authorize_continue: 已通过 Playwright SentinelSDK 获取 token")
            break
        # Fallback source: token built over the existing HTTP session.
        sentinel_token = build_sentinel_token(
            self.session,
            device_id,
            flow="authorize_continue",
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            impersonate=impersonate,
        )
        if sentinel_token:
            self._log("authorize_continue: 已通过 HTTP PoW 获取 token")
            break
        if _sentinel_attempt == 0:
            self._log("authorize_continue: sentinel token 获取失败,重试一次...")
    if not sentinel_token:
        self._set_error("无法获取 sentinel token (authorize_continue)")
        return None
    request_url = f"{self.oauth_issuer}/api/accounts/authorize/continue"
    headers = self._headers(
        request_url,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=continue_referer,
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={
            "oai-device-id": device_id,
            "openai-sentinel-token": sentinel_token,
        },
    )
    headers.update(generate_datadog_trace())
    payload = {"username": {"kind": "email", "value": email}}
    if screen_hint:
        payload["screen_hint"] = str(screen_hint).strip()
    try:
        kwargs = {
            "json": payload,
            "headers": headers,
            "timeout": 30,
            "allow_redirects": False,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.post(request_url, **kwargs)
        self._log(f"/authorize/continue -> {r.status_code}")
        self._log(
            "authorize_continue 响应: "
            f"referer={(continue_referer or '')[:100]} "
            f"current_url={str(r.url)[:120]}"
        )
        if (
            r.status_code == 400
            and "invalid_auth_step" in (r.text or "")
            and authorize_url
            and authorize_params
        ):
            self._log("invalid_auth_step重新 bootstrap...")
            authorize_final_url = self._bootstrap_oauth_session(
                authorize_url,
                authorize_params,
                device_id=device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
            )
            # Use the bootstrap landing page as referer only when it is on
            # the issuer's origin; otherwise fall back to the log-in page.
            continue_referer = (
                authorize_final_url
                if authorize_final_url.startswith(self.oauth_issuer)
                else f"{self.oauth_issuer}/log-in"
            )
            headers["Referer"] = continue_referer
            headers["Sec-Fetch-Site"] = "same-origin"
            # Fresh trace headers for the retry; the sentinel token is reused.
            headers.update(generate_datadog_trace())
            kwargs = {
                "json": payload,
                "headers": headers,
                "timeout": 30,
                "allow_redirects": False,
            }
            if impersonate:
                kwargs["impersonate"] = impersonate
            self._browser_pause()
            r = self.session.post(request_url, **kwargs)
            self._log(f"/authorize/continue(重试) -> {r.status_code}")
        if r.status_code != 200:
            self._set_error(f"提交邮箱失败: {r.status_code} - {r.text[:180]}")
            return None
        data = r.json()
        flow_state = self._state_from_payload(
            data, current_url=str(r.url) or request_url
        )
        self._log(describe_flow_state(flow_state))
        return flow_state
    except Exception as e:
        self._set_error(f"提交邮箱异常: {e}")
        return None
def _submit_password_verify(
    self,
    password,
    device_id,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    referer=None,
):
    """Submit the password and return the next flow state.

    A sentinel token is required: the browser-based helper is tried first,
    then the HTTP helper. Without a token the request is not sent.

    Returns:
        The flow state parsed from a 200 response, or ``None`` on any
        failure (the reason is recorded through ``_set_error``).
    """
    self._log("步骤3: POST /api/accounts/password/verify")
    self._log(f"password_verify: device_id={device_id}")
    sentinel_pwd = get_sentinel_token_via_browser(
        flow="password_verify",
        proxy=self.proxy,
        page_url=referer or f"{self.oauth_issuer}/log-in/password",
        headless=self.browser_mode != "headed",
        device_id=device_id,
        log_fn=lambda msg: self._log(f"password_verify: {msg}"),
    )
    if sentinel_pwd:
        self._log("password_verify: 已通过 Playwright SentinelSDK 获取 token")
    else:
        # Fallback source: token built over the existing HTTP session.
        sentinel_pwd = build_sentinel_token(
            self.session,
            device_id,
            flow="password_verify",
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            impersonate=impersonate,
        )
        if sentinel_pwd:
            self._log("password_verify: 已通过 HTTP PoW 获取 token")
        else:
            self._set_error("无法获取 sentinel token (password_verify)")
            return None
    request_url = f"{self.oauth_issuer}/api/accounts/password/verify"
    headers = self._headers(
        request_url,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=referer or f"{self.oauth_issuer}/log-in/password",
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={
            "oai-device-id": device_id,
            "openai-sentinel-token": sentinel_pwd,
        },
    )
    headers.update(generate_datadog_trace())
    try:
        kwargs = {
            "json": {"password": password},
            "headers": headers,
            "timeout": 30,
            "allow_redirects": False,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.post(request_url, **kwargs)
        self._log(f"/password/verify -> {r.status_code}")
        if r.status_code != 200:
            self._set_error(f"密码验证失败: {r.status_code} - {r.text[:180]}")
            return None
        data = r.json()
        flow_state = self._state_from_payload(
            data, current_url=str(r.url) or request_url
        )
        self._log(f"verify {describe_flow_state(flow_state)}")
        return flow_state
    except Exception as e:
        self._set_error(f"密码验证异常: {e}")
        return None
def _send_passwordless_login_otp(
    self,
    email,
    device_id,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    referer=None,
):
    """Switch from the login_password state straight to a passwordless OTP.

    POSTs to ``/api/accounts/passwordless/send-otp`` without a request body
    and without a sentinel token. ``email`` is accepted but not used here.

    Returns:
        An e-mail OTP flow state on a 200 response (synthesised from
        ``{oauth_issuer}/email-verification`` when the response does not
        describe one), or ``None`` on failure.
    """
    self._log("步骤3: 命中 login_password按新链路直接触发 passwordless OTP")
    request_url = f"{self.oauth_issuer}/api/accounts/passwordless/send-otp"
    headers = self._headers(
        request_url,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=referer or f"{self.oauth_issuer}/log-in/password",
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={
            "oai-device-id": device_id,
        },
    )
    headers.update(generate_datadog_trace())
    try:
        kwargs = {
            "headers": headers,
            "timeout": 30,
            "allow_redirects": False,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.post(request_url, **kwargs)
        self._log(f"/passwordless/send-otp -> {r.status_code}")
        if r.status_code != 200:
            self._set_error(f"触发 passwordless OTP 失败: {r.status_code} - {r.text[:180]}")
            return None
        try:
            data = r.json()
        except Exception:
            # A non-JSON 200 body is tolerated; the state is derived below.
            data = {}
        flow_state = self._state_from_payload(
            data,
            current_url=str(r.url) or f"{self.oauth_issuer}/email-verification",
        )
        if not self._state_is_email_otp(flow_state):
            # Force the e-mail verification state when the response did not
            # describe one.
            flow_state = self._state_from_url(f"{self.oauth_issuer}/email-verification")
        self._log(f"passwordless OTP 已触发 {describe_flow_state(flow_state)}")
        return flow_state
    except Exception as e:
        self._set_error(f"触发 passwordless OTP 异常: {e}")
        return None
def _submit_signup_register(
    self,
    email,
    password,
    device_id,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    referer=None,
):
    """Submit e-mail + password during the OAuth sign-up flow.

    POSTs to ``/api/accounts/user/register``. A sentinel token is attached
    when one can be obtained; unlike the login steps in this class, the
    request is still sent when no token is available.

    Returns:
        ``True`` on a 200 response, ``False`` otherwise (the reason is
        recorded through ``_set_error``).
    """
    # NOTE(review): the stage name recorded here is "authorize_continue"
    # although this is the register step - confirm this is intended.
    self._enter_stage("authorize_continue", f"register_user email={email}")
    self._log("步骤3: 命中 create_account_password提交注册密码")
    request_url = f"{self.oauth_issuer}/api/accounts/user/register"
    headers = self._headers(
        request_url,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=referer or f"{self.oauth_issuer}/create-account/password",
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={
            "oai-device-id": device_id,
        },
    )
    headers.update(generate_datadog_trace())
    sentinel_token = get_sentinel_token_via_browser(
        flow="username_password_create",
        proxy=self.proxy,
        page_url=referer or f"{self.oauth_issuer}/create-account/password",
        headless=self.browser_mode != "headed",
        device_id=device_id,
        log_fn=lambda msg: self._log(f"username_password_create: {msg}"),
    )
    if sentinel_token:
        self._log("username_password_create: 已通过 Playwright SentinelSDK 获取 token")
    else:
        # Fallback source: token built over the existing HTTP session.
        sentinel_token = build_sentinel_token(
            self.session,
            device_id,
            flow="username_password_create",
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            impersonate=impersonate,
        )
        if sentinel_token:
            self._log("username_password_create: 已通过 HTTP PoW 获取 token")
    if sentinel_token:
        # The header is optional here: it is only set when a token exists.
        headers["openai-sentinel-token"] = sentinel_token
    payload = {
        "username": email,
        "password": password,
    }
    try:
        kwargs = {
            "json": payload,
            "headers": headers,
            "timeout": 30,
            "allow_redirects": False,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.post(request_url, **kwargs)
        self._log(f"/user/register -> {r.status_code}")
        if r.status_code != 200:
            self._set_error(f"注册失败: {r.status_code} - {r.text[:180]}")
            return False
        self._log("注册成功")
        self._log(
            f"signup/register 响应: referer={(referer or '')[:100]} current_url={str(r.url)[:120]}"
        )
        return True
    except Exception as e:
        self._set_error(f"注册异常: {e}")
        return False
def _send_signup_email_otp(
    self,
    device_id,
    *,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    referer=None,
):
    """Trigger the e-mail verification code during the OAuth sign-up flow.

    Issues a navigation-style GET to ``/api/accounts/email-otp/send`` and,
    on a 200 response, a second GET to ``/email-verification`` from which
    the returned flow state is derived. ``device_id`` is accepted but not
    used in this method.

    Returns:
        An e-mail OTP flow state (synthesised from the verification URL when
        the response does not describe one), or ``None`` on failure.
    """
    self._enter_stage("otp", "send signup email otp")
    self._log("步骤4: 触发注册邮箱 OTP")
    request_url = f"{self.oauth_issuer}/api/accounts/email-otp/send"
    headers = self._headers(
        request_url,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        referer=referer or f"{self.oauth_issuer}/create-account/password",
        navigation=True,
        fetch_site="same-origin",
    )
    headers.update(generate_datadog_trace())
    try:
        kwargs = {
            "headers": headers,
            "allow_redirects": True,
            "timeout": 30,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.get(request_url, **kwargs)
        self._log(f"/email-otp/send -> {r.status_code}")
        if r.status_code != 200:
            self._set_error(f"发送注册 OTP 失败: {r.status_code} - {r.text[:180]}")
            return None
        # Second request: load the verification page to derive the state.
        verify_url = f"{self.oauth_issuer}/email-verification"
        verify_headers = self._headers(
            verify_url,
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            referer=referer or f"{self.oauth_issuer}/create-account/password",
            navigation=True,
        )
        verify_kwargs = {
            "headers": verify_headers,
            "allow_redirects": True,
            "timeout": 30,
        }
        if impersonate:
            verify_kwargs["impersonate"] = impersonate
        self._browser_pause(0.12, 0.25)
        r_verify = self.session.get(verify_url, **verify_kwargs)
        # The status code of this second response is logged but not checked.
        self._log(f"/email-verification -> {r_verify.status_code}")
        content_type = (r_verify.headers.get("content-type", "") or "").lower()
        if "application/json" in content_type:
            try:
                flow_state = self._state_from_payload(
                    r_verify.json(),
                    current_url=str(r_verify.url) or verify_url,
                )
            except Exception:
                # Unparseable JSON body: fall back to the URL alone.
                flow_state = self._state_from_url(str(r_verify.url) or verify_url)
        else:
            flow_state = self._state_from_url(str(r_verify.url) or verify_url)
        if not self._state_is_email_otp(flow_state):
            # Force the e-mail verification state when the response landed
            # somewhere else.
            flow_state = self._state_from_url(verify_url)
        self._log(f"注册 OTP 已触发 {describe_flow_state(flow_state)}")
        return flow_state
    except Exception as e:
        self._set_error(f"发送注册 OTP 异常: {e}")
        return None
def signup_and_get_tokens(
    self,
    email,
    password,
    first_name,
    last_name,
    birthdate,
    *,
    device_id="",
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    skymail_client=None,
    allow_phone_verification=False,
    signup_source="",
):
    """Run the single-chain OAuth signup and exchange the result for tokens.

    The method bootstraps the authorize session, submits the email with
    ``screen_hint="signup"`` and then drives a bounded state machine
    (password -> email OTP -> about_you -> optional phone ->
    consent/workspace) until an authorization code is found and exchanged.

    Args:
        email: Account email; also sent as ``login_hint``.
        password: Password submitted at the create-account password step.
        first_name: Given name submitted at the about_you step.
        last_name: Family name submitted at the about_you step.
        birthdate: Birthdate submitted at the about_you step.
        device_id: Device id; a fresh UUID4 is generated when it is blank.
        user_agent: Optional User-Agent, resolved through
            ``_ensure_oauth_fingerprint``.
        sec_ch_ua: Optional sec-ch-ua value, resolved through
            ``_ensure_oauth_fingerprint``.
        impersonate: Optional impersonation profile, resolved through
            ``_ensure_oauth_fingerprint``.
        skymail_client: Client used for the email OTP step; required.
        allow_phone_verification: When false, reaching an add_phone state
            aborts the signup.
        signup_source: Label appended to the first log line only.

    Returns:
        Whatever ``_exchange_code_for_tokens`` returns once a code has been
        obtained, or ``None`` on failure (the reason is recorded through
        ``_set_error``).
    """
    # Reset the per-run bookkeeping that callers can inspect afterwards.
    self.last_error = ""
    self.last_workspace_id = ""
    self.last_state = FlowState()
    self._log(
        "开始 OAuth 注册流程..."
        + (f" (source={signup_source})" if signup_source else "")
    )
    self._log(
        "OAuth 注册策略: 单链路 signup -> otp -> about_you -> phone(如需) -> consent/workspace -> token"
    )
    # The OTP step cannot run without a mail client, so fail before any request.
    if not skymail_client:
        self._set_error("OAuth 注册流程缺少接码客户端")
        return None
    device_id = str(device_id or "").strip() or str(uuid.uuid4())
    self.device_id = device_id
    user_agent, sec_ch_ua, impersonate = self._ensure_oauth_fingerprint(
        user_agent, sec_ch_ua, impersonate
    )
    # PKCE pair: the challenge goes into the authorize request, the verifier
    # is kept for the final token exchange.
    code_verifier, code_challenge = generate_pkce()
    oauth_state = secrets.token_urlsafe(32)
    authorize_params = {
        "response_type": "code",
        "client_id": self.oauth_client_id,
        "audience": "https://api.openai.com/v1",
        "redirect_uri": self.oauth_redirect_uri,
        "scope": "openid profile email offline_access",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "state": oauth_state,
        "prompt": "login",
        "login_hint": email,
        "screen_hint": "login_or_signup",
        "ext-oai-did": device_id,
        "auth_session_logging_id": str(uuid.uuid4()),
        "ext-passkey-client-capabilities": "1111",
        "codex_cli_simplified_flow": "true",
        "id_token_add_organizations": "true",
    }
    authorize_url = f"{self.oauth_issuer}/oauth/authorize"
    seed_oai_device_cookie(self.session, device_id)
    self._log("步骤1: Bootstrap OAuth session...")
    authorize_final_url = self._bootstrap_oauth_session(
        authorize_url,
        authorize_params,
        device_id=device_id,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        impersonate=impersonate,
    )
    if not authorize_final_url:
        self._set_error("Bootstrap 失败")
        return None
    continue_referer = f"{self.oauth_issuer}/create-account"
    state = self._submit_authorize_continue(
        email,
        device_id,
        continue_referer,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        impersonate=impersonate,
        authorize_url=authorize_url,
        authorize_params=authorize_params,
        screen_hint="signup",
    )
    if not state:
        # Keep a more specific error if the helper already recorded one.
        if not self.last_error:
            self._set_error("提交邮箱后未进入有效的 OAuth 注册状态")
        return None
    self._log(f"OAuth 注册状态起点: {describe_flow_state(state)}")
    referer = continue_referer
    # signature -> number of times the loop has seen that state.
    seen_states = {}
    # Guards against submitting the register request twice.
    register_submitted = False
    # Bounded state machine: at most 24 transitions.
    for step in range(24):
        self.last_state = state
        self._log(f"注册状态步进[{step + 1}/24]: {describe_flow_state(state)}")
        signature = self._state_signature(state)
        seen_states[signature] = seen_states.get(signature, 0) + 1
        # Seeing the same signature a third time is treated as a stuck flow.
        if seen_states[signature] > 2:
            self._set_error(f"OAuth 注册状态卡住: {describe_flow_state(state)}")
            return None
        # Terminal case: the current state already carries the code.
        code = self._extract_code_from_state(state)
        if code:
            self._log(f"获取到 authorization code: {code[:20]}...")
            self._log("步骤7: POST /oauth/token")
            tokens = self._exchange_code_for_tokens(
                code, code_verifier, user_agent, impersonate
            )
            if tokens:
                self._log("[OK] OAuth 注册成功")
            else:
                self._log("换取 tokens 失败")
            return tokens
        # Password step: register the account, then trigger the signup OTP.
        if self._state_is_create_account_password(state):
            if register_submitted:
                self._set_error("注册密码阶段重复进入")
                return None
            ok = self._submit_signup_register(
                email,
                password,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not ok:
                return None
            register_submitted = True
            state = self._send_signup_email_otp(
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not state:
                if not self.last_error:
                    self._set_error("注册 OTP 触发后未进入邮箱验证码状态")
                return None
            referer = state.current_url or referer
            continue
        # Email OTP step.
        if self._state_is_email_otp(state):
            next_state = self._handle_otp_verification(
                email,
                device_id,
                user_agent,
                sec_ch_ua,
                impersonate,
                skymail_client,
                state,
                prefer_passwordless_login=False,
                allow_cached_code_retry=False,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("注册 OTP 验证后未进入下一步状态")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Profile step: submit name and birthdate.
        if self._state_is_about_you(state):
            next_state = self._submit_about_you_create_account(
                first_name,
                last_name,
                birthdate,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("about_you 提交后未进入下一步 OAuth 状态")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Phone step: log the raw payload, then verify or abort.
        if self._state_is_add_phone(state):
            try:
                raw_dump = json.dumps(state.raw or {}, ensure_ascii=False)
            except Exception:
                # Logging only; an unserialisable payload must not break the flow.
                raw_dump = ""
            if raw_dump:
                self._log(f"add_phone 状态响应体(raw): {raw_dump}")
            if not allow_phone_verification:
                if not self.last_error:
                    self._set_error("signup 链路命中 add_phone")
                return None
            next_state = self._handle_add_phone_verification(
                device_id,
                user_agent,
                sec_ch_ua,
                impersonate,
                state,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("手机号验证后未进入下一步 OAuth 状态")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Navigation step: follow the state's URL, which may yield the code.
        if self._state_requires_navigation(state):
            code, next_state = self._follow_flow_state(
                state,
                referer=referer,
                user_agent=user_agent,
                impersonate=impersonate,
            )
            if code:
                self._log(f"获取到 authorization code: {code[:20]}...")
                self._log("步骤7: POST /oauth/token")
                tokens = self._exchange_code_for_tokens(
                    code, code_verifier, user_agent, impersonate
                )
                if tokens:
                    self._log("[OK] OAuth 注册成功")
                else:
                    self._log("换取 tokens 失败")
                return tokens
            referer = state.current_url or referer
            # NOTE(review): next_state is used unchecked here; confirm that
            # _follow_flow_state never returns None for it.
            state = next_state
            self._log(f"follow state -> {describe_flow_state(state)}")
            continue
        # Consent step: pick a workspace / organization.
        if self._state_supports_workspace_resolution(state):
            self._log("步骤6: 执行 workspace/org 选择")
            consent_entry = (
                state.continue_url
                or state.current_url
                or f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent"
            )
            # NOTE(review): the add_phone branch above always returns or
            # continues, so this check looks unreachable — confirm.
            if self._state_is_add_phone(state):
                consent_entry = f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent"
                self._log("步骤6: 当前处于 add_phone改用 canonical consent URL 继续")
            code, next_state = self._oauth_submit_workspace_and_org(
                consent_entry,
                device_id,
                user_agent,
                impersonate,
            )
            if code:
                self._log(f"获取到 authorization code: {code[:20]}...")
                self._log("步骤7: POST /oauth/token")
                tokens = self._exchange_code_for_tokens(
                    code, code_verifier, user_agent, impersonate
                )
                if tokens:
                    self._log("[OK] OAuth 注册成功")
                else:
                    self._log("换取 tokens 失败")
                return tokens
            if next_state:
                referer = state.current_url or referer
                state = next_state
                self._log(f"workspace state -> {describe_flow_state(state)}")
                continue
            if not self.last_error:
                self._set_error(f"workspace/org 选择失败: {describe_flow_state(state)}")
            return None
        # No handler matched the current state.
        self._set_error(f"未支持的 OAuth 注册状态: {describe_flow_state(state)}")
        return None
    # The step budget ran out without reaching a code.
    self._set_error("OAuth 注册状态机超出最大步数")
    return None
def _submit_about_you_create_account(
self,
first_name,
last_name,
birthdate,
device_id,
*,
user_agent=None,
sec_ch_ua=None,
impersonate=None,
referer=None,
):
"""在 OAuth 登录态命中 about_you 后提交资料,完成账户创建。"""
self._enter_stage("about_you", "submit create_account")
self._log("步骤5: 命中 about_you提交姓名和生日完成注册")
self._log(
"about_you 参数: "
f"first_name={'已设置' if str(first_name or '').strip() else '缺失'}, "
f"last_name={'已设置' if str(last_name or '').strip() else '缺失'}, "
f"birthdate={str(birthdate or '').strip() or '缺失'}"
)
full_name = f"{str(first_name or '').strip()} {str(last_name or '').strip()}".strip()
if not full_name or not str(birthdate or "").strip():
self._set_error("about_you 资料不完整: 缺少姓名或生日")
return None
about_you_url = f"{self.oauth_issuer}/about-you"
request_url = f"{self.oauth_issuer}/api/accounts/create_account"
payload = {
"name": full_name,
"birthdate": str(birthdate).strip(),
}
self._log("about_you 请求体已构建,准备 POST /api/accounts/create_account")
def _build_create_headers(sentinel_token: str = ""):
extra_headers = {
"oai-device-id": device_id,
}
if sentinel_token:
extra_headers["openai-sentinel-token"] = sentinel_token
headers_local = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=referer or about_you_url,
origin=self.oauth_issuer,
content_type="application/json",
fetch_site="same-origin",
extra_headers=extra_headers,
)
headers_local.update(generate_datadog_trace())
return headers_local
def _post_create(sentinel_token: str = ""):
kwargs = {
"json": payload,
"headers": _build_create_headers(sentinel_token),
"timeout": 30,
"allow_redirects": False,
}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause()
return self.session.post(request_url, **kwargs)
try:
r = _post_create()
self._log(f"/create_account -> {r.status_code}")
self._log(
"about_you 响应: "
f"current_url={str(r.url)[:120]} referer={(referer or '')[:100]}"
)
if (
r.status_code in (401, 403)
or "sentinel" in (r.text or "").lower()
or "challenge" in (r.text or "").lower()
):
self._log("create_account 首次请求需要额外挑战,补发 sentinel 后重试...")
sentinel_token = build_sentinel_token(
self.session,
device_id,
flow="oauth_create_account",
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
impersonate=impersonate,
)
if not sentinel_token:
self._set_error("无法获取 sentinel token (oauth_create_account)")
return None
r = _post_create(sentinel_token)
self._log(f"/create_account(重试) -> {r.status_code}")
self._log(
"about_you 重试响应: "
f"current_url={str(r.url)[:120]} referer={(referer or '')[:100]}"
)
if r.status_code == 400 and "already_exists" in (r.text or ""):
consent_state = self._state_from_url(
f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent"
)
self._log(f"about_you 命中 already_exists转入 {describe_flow_state(consent_state)}")
return consent_state
if r.status_code != 200:
self._set_error(f"about_you 提交失败: {r.status_code} - {r.text[:180]}")
return None
try:
data = r.json()
except Exception:
data = {}
flow_state = self._state_from_payload(
data,
current_url=str(r.url) or request_url,
)
if self._state_is_add_phone(flow_state):
try:
raw_text = r.text or ""
except Exception:
raw_text = ""
try:
raw_json = json.dumps(data, ensure_ascii=False)
except Exception:
raw_json = ""
if raw_text:
self._log("add_phone 触发响应体(raw): " + raw_text)
if raw_json and raw_json != raw_text:
self._log("add_phone 触发响应体(json): " + raw_json)
self._log(f"about_you 提交成功 {describe_flow_state(flow_state)}")
return flow_state
except Exception as e:
self._set_error(f"about_you 提交异常: {e}")
return None
def _recreate_session(self):
    """Replace ``self.session`` with a fresh session object.

    The configured proxy, if any, is applied to the new session.
    """
    self.session = curl_requests.Session()
    proxy = self.proxy
    if not proxy:
        return
    self.session.proxies = build_requests_proxy_config(proxy)
def login_and_get_tokens(
    self,
    email,
    password,
    device_id,
    user_agent=None,
    sec_ch_ua=None,
    impersonate=None,
    skymail_client=None,
    prefer_passwordless_login=False,
    allow_phone_verification=True,
    force_new_browser=False,
    force_password_login=False,
    force_chatgpt_entry=False,
    screen_hint="login",
    complete_about_you_if_needed=False,
    first_name="",
    last_name="",
    birthdate="",
    login_source="",
    stop_after_login=False,
    _continue_depth=0,
):
    """Run the full OAuth login flow and return the resulting tokens.

    Args:
        email: Account email.
        password: Account password.
        device_id: Device id; a new UUID4 is generated when it is falsy or
            when ``force_new_browser`` is set.
        user_agent: User-Agent.
        sec_ch_ua: sec-ch-ua header value.
        impersonate: curl_cffi ``impersonate`` parameter.
        skymail_client: Skymail client used to fetch the email OTP when the
            flow asks for one.
        prefer_passwordless_login: Whether to force the passwordless OTP
            path when the password step is reached.
        allow_phone_verification: Whether the phone verification branch may
            be entered after add_phone.
        force_new_browser: Recreate the session and generate a new device
            id before starting.
        force_password_login: Use password login even when
            ``prefer_passwordless_login`` is true.
        force_chatgpt_entry: Run the ChatGPT entry bootstrap
            (``_bootstrap_chatgpt_entry``) before the OAuth bootstrap.
        screen_hint: ``screen_hint`` for authorize/continue (login/signup).
        complete_about_you_if_needed: Whether to submit the profile
            automatically when about_you is reached.
        first_name: Given name for about_you.
        last_name: Family name for about_you.
        birthdate: Birthdate for about_you, format YYYY-MM-DD.
        login_source: Label for the current login scenario; logging only.
        stop_after_login: When set, stop (returning ``None`` with an error
            recorded) as soon as password or OTP verification leads to a
            state that is none of login-password, email-OTP or
            create-account-password.
        _continue_depth: Internal restart counter; the add_phone fallback
            restarts the flow at most once.

    Returns:
        dict: the tokens returned by ``_exchange_code_for_tokens``
        (access_token, refresh_token, id_token), or ``None`` on failure
        (the reason is recorded through ``_set_error``).
    """
    # Reset the per-run bookkeeping that callers can inspect afterwards.
    self.last_error = ""
    self.last_workspace_id = ""
    self.last_state = FlowState()
    self._log(
        "开始 OAuth 登录流程..."
        + (f" (source={login_source})" if login_source else "")
    )
    self._log(
        "OAuth 策略: "
        f"prefer_passwordless_login={'on' if prefer_passwordless_login else 'off'}, "
        f"allow_phone_verification={'on' if allow_phone_verification else 'off'}, "
        f"complete_about_you_if_needed={'on' if complete_about_you_if_needed else 'off'}, "
        f"force_new_browser={'on' if force_new_browser else 'off'}, "
        f"force_password_login={'on' if force_password_login else 'off'}, "
        f"force_chatgpt_entry={'on' if force_chatgpt_entry else 'off'}, "
        f"screen_hint={screen_hint or 'login'}, "
        f"stop_after_login={'on' if stop_after_login else 'off'}"
    )
    if force_new_browser:
        self._log("force_new_browser: 重新创建 OAuth 会话容器")
        self._recreate_session()
        device_id = str(uuid.uuid4())
        self._log(f"force_new_browser: 新 device_id={device_id}")
    else:
        if not device_id:
            device_id = str(uuid.uuid4())
            self._log(f"OAuth device_id 缺失,已生成新的 device_id={device_id}")
    # NOTE(review): only self.device_id is stripped; the local device_id used
    # for the requests below is left exactly as passed in.
    self.device_id = str(device_id or "").strip()
    user_agent, sec_ch_ua, impersonate = self._ensure_oauth_fingerprint(
        user_agent, sec_ch_ua, impersonate
    )
    # PKCE pair: the challenge goes into the authorize request, the verifier
    # is kept for the final token exchange.
    code_verifier, code_challenge = generate_pkce()
    oauth_state = secrets.token_urlsafe(32)
    authorize_params = {
        "response_type": "code",
        "client_id": self.oauth_client_id,
        "redirect_uri": self.oauth_redirect_uri,
        "scope": "openid profile email offline_access",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "state": oauth_state,
    }
    authorize_url = f"{self.oauth_issuer}/oauth/authorize"
    seed_oai_device_cookie(self.session, device_id)
    if force_chatgpt_entry:
        self._log("force_chatgpt_entry: 启动 ChatGPT 首页链路(不影响 OAuth PKCE")
        # The result is intentionally discarded.
        _ = self._bootstrap_chatgpt_entry(
            email,
            device_id,
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            impersonate=impersonate,
        )
    self._log("步骤1: Bootstrap OAuth session...")
    authorize_final_url = self._bootstrap_oauth_session(
        authorize_url,
        authorize_params,
        device_id=device_id,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        impersonate=impersonate,
    )
    if not authorize_final_url:
        self._set_error("Bootstrap 失败")
        return None
    # Use the bootstrap landing URL as Referer only when it is on the issuer.
    continue_referer = (
        authorize_final_url
        if authorize_final_url.startswith(self.oauth_issuer)
        else f"{self.oauth_issuer}/log-in"
    )
    state = self._submit_authorize_continue(
        email,
        device_id,
        continue_referer,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        impersonate=impersonate,
        authorize_url=authorize_url,
        authorize_params=authorize_params,
        screen_hint=str(screen_hint or "login"),
    )
    if not state:
        # Keep a more specific error if the helper already recorded one.
        if not self.last_error:
            self._set_error("提交邮箱后未进入有效的 OAuth 状态")
        return None
    self._log(f"OAuth 状态起点: {describe_flow_state(state)}")
    # signature -> number of times the loop has seen that state.
    seen_states = {}
    referer = continue_referer
    def _should_stop_after_login(state_to_check: FlowState):
        """Return True when stop_after_login is set and the state is past the credential steps."""
        if not stop_after_login:
            return False
        if self._state_is_login_password(state_to_check):
            return False
        if self._state_is_email_otp(state_to_check):
            return False
        if self._state_is_create_account_password(state_to_check):
            return False
        return True
    # Bounded state machine: at most 20 transitions.
    for step in range(20):
        self.last_state = state
        self._log(f"状态步进[{step + 1}/20]: {describe_flow_state(state)}")
        signature = self._state_signature(state)
        seen_states[signature] = seen_states.get(signature, 0) + 1
        # Seeing the same signature a third time is treated as a stuck flow.
        if seen_states[signature] > 2:
            self._set_error(f"OAuth 状态卡住: {describe_flow_state(state)}")
            return None
        # Terminal case: the current state already carries the code.
        code = self._extract_code_from_state(state)
        if code:
            self._log(f"获取到 authorization code: {code[:20]}...")
            self._log("步骤7: POST /oauth/token")
            tokens = self._exchange_code_for_tokens(
                code, code_verifier, user_agent, impersonate
            )
            if tokens:
                self._log("[OK] OAuth 登录成功")
            else:
                self._log("换取 tokens 失败")
            return tokens
        # Passwordless path: at the password step, request an email OTP instead.
        if prefer_passwordless_login and (not force_password_login) and self._state_is_login_password(state):
            next_state = self._send_passwordless_login_otp(
                email,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("passwordless OTP 触发后未进入邮箱验证码状态")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Forced password login also applies to the create-account password state.
        if self._state_is_create_account_password(state) and force_password_login:
            self._log("命中 create_account_password按强制密码登录路径继续")
            next_state = self._submit_password_verify(
                password,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/log-in/password",
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("密码验证后未进入下一步 OAuth 状态")
                return None
            if _should_stop_after_login(next_state):
                self._log(
                    "登录链路已完成(密码验证后进入下一状态),按要求停止"
                )
                # The requested stop is reported as None plus an error message.
                self.last_state = next_state
                self._set_error("登录链路已完成,按要求停止")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Regular password step.
        if self._state_is_login_password(state):
            next_state = self._submit_password_verify(
                password,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("密码验证后未进入下一步 OAuth 状态")
                return None
            if _should_stop_after_login(next_state):
                self._log(
                    "登录链路已完成(密码验证后进入下一状态),按要求停止"
                )
                self.last_state = next_state
                self._set_error("登录链路已完成,按要求停止")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Passwordless + add_phone with a navigable URL: visit it first,
        # before the generic add_phone handling below gets a chance.
        if (
            prefer_passwordless_login
            and self._state_is_add_phone(state)
            and self._state_requires_navigation(state)
        ):
            self._log("步骤5: OTP 后命中 add_phone先实际访问 continue_url 争取重签 workspace Cookie")
            code, next_state = self._follow_flow_state(
                state,
                referer=referer,
                user_agent=user_agent,
                impersonate=impersonate,
            )
            if code:
                self._log(f"获取到 authorization code: {code[:20]}...")
                self._log("步骤7: POST /oauth/token")
                tokens = self._exchange_code_for_tokens(
                    code, code_verifier, user_agent, impersonate
                )
                if tokens:
                    self._log("[OK] OAuth 登录成功")
                else:
                    self._log("换取 tokens 失败")
                return tokens
            referer = state.current_url or referer
            state = next_state
            continue
        # Email OTP step.
        if self._state_is_email_otp(state):
            if not skymail_client:
                self._set_error("当前流程需要邮箱 OTP但缺少接码客户端")
                return None
            next_state = self._handle_otp_verification(
                email,
                device_id,
                user_agent,
                sec_ch_ua,
                impersonate,
                skymail_client,
                state,
                prefer_passwordless_login=prefer_passwordless_login,
                # Cached-code retry is only enabled on a restarted run.
                allow_cached_code_retry=_continue_depth > 0,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("邮箱 OTP 验证后未进入下一步 OAuth 状态")
                return None
            if _should_stop_after_login(next_state):
                self._log(
                    "登录链路已完成OTP 验证后进入下一状态),按要求停止"
                )
                self.last_state = next_state
                self._set_error("登录链路已完成,按要求停止")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Optional profile completion.
        if complete_about_you_if_needed and self._state_is_about_you(state):
            self._log("步骤5: 命中 about_you执行 interrupt 新链路的资料补全提交")
            next_state = self._submit_about_you_create_account(
                first_name,
                last_name,
                birthdate,
                device_id,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                impersonate=impersonate,
                referer=state.current_url or state.continue_url or referer,
            )
            if not next_state:
                if not self.last_error:
                    self._set_error("about_you 提交后未进入下一步 OAuth 状态")
                return None
            referer = state.current_url or referer
            state = next_state
            continue
        # Phone step: either try to get past it via consent, or verify a phone.
        if self._state_is_add_phone(state):
            try:
                raw_dump = json.dumps(state.raw or {}, ensure_ascii=False)
            except Exception:
                # Logging only; an unserialisable payload must not break the flow.
                raw_dump = ""
            if raw_dump:
                self._log(f"add_phone 状态响应体(raw): {raw_dump}")
            if not allow_phone_verification:
                # Both branches only log; the consent attempt below runs either way.
                if self._state_supports_workspace_resolution(state):
                    self._log(
                        "步骤5: add_phone 命中,但检测到 workspace 线索,继续尝试 workspace/org 选择"
                    )
                else:
                    self._log(
                        "步骤5: add_phone 暂无显式 workspace 线索,先尝试 canonical consent URL 抢救"
                    )
                code, next_state = self._oauth_submit_workspace_and_org(
                    f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent",
                    device_id,
                    user_agent,
                    impersonate,
                )
                if code:
                    self._log(f"获取到 authorization code: {code[:20]}...")
                    self._log("步骤7: POST /oauth/token")
                    tokens = self._exchange_code_for_tokens(
                        code, code_verifier, user_agent, impersonate
                    )
                    if tokens:
                        self._log("[OK] OAuth 登录成功")
                    else:
                        self._log("换取 tokens 失败")
                    return tokens
                if next_state:
                    referer = state.current_url or referer
                    state = next_state
                    self._log(f"add_phone -> workspace state -> {describe_flow_state(state)}")
                    continue
                workspace_error = str(self.last_error or "").strip()
                # Last resort: restart once with a fresh session and new PKCE.
                if prefer_passwordless_login and _continue_depth < 1:
                    self._log(
                        "步骤5: canonical consent 仍未拿到 workspace/callback"
                        + (
                            f" ({workspace_error})"
                            if workspace_error
                            else ""
                        )
                        + ",重启一次全新 OAuth session + 新 PKCE"
                    )
                    self._recreate_session()
                    # NOTE(review): the restart does not forward
                    # force_password_login, force_chatgpt_entry, screen_hint or
                    # stop_after_login, so they fall back to their defaults —
                    # confirm this is intended.
                    return self.login_and_get_tokens(
                        email,
                        password,
                        device_id,
                        user_agent=user_agent,
                        sec_ch_ua=sec_ch_ua,
                        impersonate=impersonate,
                        skymail_client=skymail_client,
                        prefer_passwordless_login=prefer_passwordless_login,
                        allow_phone_verification=allow_phone_verification,
                        complete_about_you_if_needed=complete_about_you_if_needed,
                        first_name=first_name,
                        last_name=last_name,
                        birthdate=birthdate,
                        login_source=(
                            f"{login_source}:add_phone_continue"
                            if login_source
                            else "add_phone_continue"
                        ),
                        _continue_depth=_continue_depth + 1,
                    )
                else:
                    self._set_error(
                        "passwordless 登录后仍停留在 add_phone未获取到 workspace / callback"
                        + (f" ({workspace_error})" if workspace_error else "")
                    )
                    return None
            else:
                next_state = self._handle_add_phone_verification(
                    device_id,
                    user_agent,
                    sec_ch_ua,
                    impersonate,
                    state,
                )
                if not next_state:
                    if not self.last_error:
                        self._set_error("手机号验证后未进入下一步 OAuth 状态")
                    return None
                referer = state.current_url or referer
                state = next_state
                continue
        # Navigation step: follow the state's URL, which may yield the code.
        if self._state_requires_navigation(state):
            code, next_state = self._follow_flow_state(
                state,
                referer=referer,
                user_agent=user_agent,
                impersonate=impersonate,
            )
            if code:
                self._log(f"获取到 authorization code: {code[:20]}...")
                self._log("步骤7: POST /oauth/token")
                tokens = self._exchange_code_for_tokens(
                    code, code_verifier, user_agent, impersonate
                )
                if tokens:
                    self._log("[OK] OAuth 登录成功")
                else:
                    self._log("换取 tokens 失败")
                return tokens
            referer = state.current_url or referer
            state = next_state
            self._log(f"follow state -> {describe_flow_state(state)}")
            continue
        # Consent step: pick a workspace / organization.
        if self._state_supports_workspace_resolution(state):
            self._log("步骤6: 执行 workspace/org 选择")
            consent_entry = (
                state.continue_url
                or state.current_url
                or f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent"
            )
            # NOTE(review): the add_phone branch above always returns or
            # continues, so this check looks unreachable — confirm.
            if self._state_is_add_phone(state):
                consent_entry = (
                    f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent"
                )
                self._log("步骤6: 当前处于 add_phone改用 canonical consent URL 继续")
            code, next_state = self._oauth_submit_workspace_and_org(
                consent_entry,
                device_id,
                user_agent,
                impersonate,
            )
            if code:
                self._log(f"获取到 authorization code: {code[:20]}...")
                self._log("步骤7: POST /oauth/token")
                tokens = self._exchange_code_for_tokens(
                    code, code_verifier, user_agent, impersonate
                )
                if tokens:
                    self._log("[OK] OAuth 登录成功")
                else:
                    self._log("换取 tokens 失败")
                return tokens
            if next_state:
                referer = state.current_url or referer
                state = next_state
                self._log(f"workspace state -> {describe_flow_state(state)}")
                continue
            if not self.last_error:
                self._set_error(
                    f"workspace/org 选择失败: {describe_flow_state(state)}"
                )
            return None
        # No handler matched the current state.
        self._set_error(f"未支持的 OAuth 状态: {describe_flow_state(state)}")
        return None
    # The step budget ran out without reaching a code.
    self._set_error("OAuth 状态机超出最大步数")
    return None
def _extract_code_from_url(self, url):
"""从 URL 中提取 code"""
if not url or "code=" not in url:
return None
try:
return parse_qs(urlparse(url).query).get("code", [None])[0]
except Exception:
return None
def _oauth_follow_for_code(
self, start_url, referer, user_agent, impersonate, max_hops=16
):
"""跟随 URL 获取 authorization code手动跟随重定向"""
code, next_state = self._follow_flow_state(
self._state_from_url(start_url),
referer=referer,
user_agent=user_agent,
impersonate=impersonate,
max_hops=max_hops,
)
return code, (next_state.current_url or next_state.continue_url or start_url)
def _oauth_submit_workspace_and_org(
    self, consent_url, device_id, user_agent, impersonate, max_retries=3
):
    """Submit the workspace and organization selection (with retries).

    The first workspace found in the consent session data is selected. When
    the workspace response lists organizations, the first organization (and
    its first project, if present) is selected as well.

    Args:
        consent_url: Consent page URL; used to load the session data and as
            the Referer of the select requests.
        device_id: Value of the ``oai-device-id`` header.
        user_agent: User-Agent forwarded to ``_headers``.
        impersonate: Optional impersonation profile for the requests.
        max_retries: Number of attempts to load the consent session data.

    Returns:
        A ``(code, state)`` tuple. ``code`` is the authorization code or
        ``None``; ``state`` is the next flow state or ``None``.
    """
    self._enter_stage("workspace_select", consent_url[:120] if consent_url else "")
    session_data = None
    # Try to load the session data up to max_retries times, 0.3s apart.
    for attempt in range(max_retries):
        session_data = self._load_workspace_session_data(
            consent_url=consent_url,
            user_agent=user_agent,
            impersonate=impersonate,
        )
        if session_data:
            break
        if attempt < max_retries - 1:
            self._log(
                f"无法获取 consent session 数据 (尝试 {attempt + 1}/{max_retries})"
            )
            time.sleep(0.3)
        else:
            self._set_error("无法获取 consent session 数据")
            return None, None
    workspaces = session_data.get("workspaces", [])
    if not workspaces:
        self._set_error("session 中没有 workspace 信息")
        return None, None
    # Always pick the first workspace.
    workspace_id = (workspaces[0] or {}).get("id")
    if not workspace_id:
        self._set_error("workspace_id 为空")
        return None, None
    self.last_workspace_id = str(workspace_id).strip()
    self._log(f"选择 workspace: {workspace_id}")
    headers = self._headers(
        f"{self.oauth_issuer}/api/accounts/workspace/select",
        user_agent=user_agent,
        accept="application/json",
        referer=consent_url,
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={
            "oai-device-id": device_id,
        },
    )
    headers.update(generate_datadog_trace())
    try:
        kwargs = {
            "json": {"workspace_id": workspace_id},
            "headers": headers,
            # Redirects are inspected manually so the code can be read from Location.
            "allow_redirects": False,
            "timeout": 30,
        }
        if impersonate:
            kwargs["impersonate"] = impersonate
        self._browser_pause()
        r = self.session.post(
            f"{self.oauth_issuer}/api/accounts/workspace/select", **kwargs
        )
        self._log(f"workspace/select -> {r.status_code}")
        self._log(
            f"workspace/select 请求: workspace_id={workspace_id} consent_url={consent_url[:120]}"
        )
        # Check for a redirect
        if r.status_code in (301, 302, 303, 307, 308):
            location = normalize_flow_url(
                r.headers.get("Location", ""), auth_base=self.oauth_issuer
            )
            if "code=" in location:
                code = self._extract_code_from_url(location)
                if code:
                    self._log("从 workspace/select 重定向获取到 code")
                    return code, self._state_from_url(location)
            if location:
                return None, self._state_from_url(location)
        # On a 200 response, look for orgs in the payload
        if r.status_code == 200:
            try:
                data = r.json()
                orgs = data.get("data", {}).get("orgs", [])
                workspace_state = self._state_from_payload(
                    data, current_url=str(r.url)
                )
                continue_url = workspace_state.continue_url
                if orgs:
                    # Always pick the first organization and its first project.
                    org_id = (orgs[0] or {}).get("id")
                    projects = (orgs[0] or {}).get("projects", [])
                    project_id = (projects[0] or {}).get("id") if projects else None
                    if org_id:
                        self._log(f"选择 organization: {org_id}")
                        org_body = {"org_id": org_id}
                        if project_id:
                            org_body["project_id"] = project_id
                        # Prefer an absolute continue_url as Referer.
                        org_referer = (
                            continue_url
                            if continue_url and continue_url.startswith("http")
                            else consent_url
                        )
                        headers = self._headers(
                            f"{self.oauth_issuer}/api/accounts/organization/select",
                            user_agent=user_agent,
                            accept="application/json",
                            referer=org_referer,
                            origin=self.oauth_issuer,
                            content_type="application/json",
                            fetch_site="same-origin",
                            extra_headers={
                                "oai-device-id": device_id,
                            },
                        )
                        headers.update(generate_datadog_trace())
                        kwargs = {
                            "json": org_body,
                            "headers": headers,
                            "allow_redirects": False,
                            "timeout": 30,
                        }
                        if impersonate:
                            kwargs["impersonate"] = impersonate
                        self._browser_pause()
                        r_org = self.session.post(
                            f"{self.oauth_issuer}/api/accounts/organization/select",
                            **kwargs,
                        )
                        self._log(f"organization/select -> {r_org.status_code}")
                        self._log(
                            f"organization/select 请求: org_id={org_id} project_id={project_id or '-'}"
                        )
                        # Check for a redirect
                        if r_org.status_code in (301, 302, 303, 307, 308):
                            location = normalize_flow_url(
                                r_org.headers.get("Location", ""),
                                auth_base=self.oauth_issuer,
                            )
                            if "code=" in location:
                                code = self._extract_code_from_url(location)
                                if code:
                                    self._log(
                                        "从 organization/select 重定向获取到 code"
                                    )
                                    return code, self._state_from_url(location)
                            if location:
                                return None, self._state_from_url(location)
                        # Check continue_url
                        if r_org.status_code == 200:
                            try:
                                org_state = self._state_from_payload(
                                    r_org.json(), current_url=str(r_org.url)
                                )
                                self._log(
                                    f"organization/select -> {describe_flow_state(org_state)}"
                                )
                                if self._extract_code_from_state(org_state):
                                    return self._extract_code_from_state(
                                        org_state
                                    ), org_state
                                return None, org_state
                            except Exception as e:
                                # The error is recorded, then execution falls
                                # through to the continue_url handling below.
                                self._set_error(
                                    f"解析 organization/select 响应异常: {e}"
                                )
                # If there is a continue_url, follow it
                if continue_url:
                    code, _ = self._oauth_follow_for_code(
                        continue_url, consent_url, user_agent, impersonate
                    )
                    if code:
                        return code, self._state_from_url(continue_url)
                return None, workspace_state
            except Exception as e:
                self._set_error(f"处理 workspace/select 响应异常: {e}")
                return None, None
    except Exception as e:
        self._set_error(f"workspace/select 异常: {e}")
        return None, None
    # Any other status (or a redirect without Location) ends here; no error
    # message is recorded on this path.
    return None, None
def _load_workspace_session_data(self, consent_url, user_agent, impersonate):
"""优先从 cookie 解码 session失败时回退到 consent HTML 中提取 workspace 数据。"""
session_data = self._decode_oauth_session_cookie()
if session_data and session_data.get("workspaces"):
return session_data
html = self._fetch_consent_page_html(consent_url, user_agent, impersonate)
if not html:
return session_data
parsed = self._extract_session_data_from_consent_html(html)
if parsed and parsed.get("workspaces"):
self._log(
f"从 consent HTML 提取到 {len(parsed.get('workspaces', []))} 个 workspace"
)
return parsed
return session_data
def _fetch_consent_page_html(self, consent_url, user_agent, impersonate):
"""获取 consent 页 HTML用于解析 React Router stream 中的 session 数据。"""
try:
headers = self._headers(
consent_url,
user_agent=user_agent,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=f"{self.oauth_issuer}/email-verification",
navigation=True,
)
kwargs = {"headers": headers, "allow_redirects": False, "timeout": 30}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.3)
r = self.session.get(consent_url, **kwargs)
if r.status_code == 200 and "text/html" in (
r.headers.get("content-type", "").lower()
):
return r.text
except Exception:
pass
return ""
def _extract_session_data_from_consent_html(self, html):
"""从 consent HTML 的 React Router stream 中提取 workspace session 数据。"""
import json
import re
if not html or "workspaces" not in html:
return None
def _first_match(patterns, text):
for pattern in patterns:
m = re.search(pattern, text, re.S)
if m:
return m.group(1)
return ""
def _build_from_text(text):
if not text or "workspaces" not in text:
return None
normalized = text.replace('\\"', '"')
session_id = _first_match(
[
r'"session_id","([^"]+)"',
r'"session_id":"([^"]+)"',
],
normalized,
)
client_id = _first_match(
[
r'"openai_client_id","([^"]+)"',
r'"openai_client_id":"([^"]+)"',
],
normalized,
)
start = normalized.find('"workspaces"')
if start < 0:
start = normalized.find("workspaces")
if start < 0:
return None
end = normalized.find('"openai_client_id"', start)
if end < 0:
end = normalized.find("openai_client_id", start)
if end < 0:
end = min(len(normalized), start + 4000)
else:
end = min(len(normalized), end + 600)
workspace_chunk = normalized[start:end]
ids = re.findall(r'"id"(?:,|:)"([0-9a-fA-F-]{36})"', workspace_chunk)
if not ids:
return None
kinds = re.findall(r'"kind"(?:,|:)"([^"]+)"', workspace_chunk)
workspaces = []
seen = set()
for idx, wid in enumerate(ids):
if wid in seen:
continue
seen.add(wid)
item = {"id": wid}
if idx < len(kinds):
item["kind"] = kinds[idx]
workspaces.append(item)
if not workspaces:
return None
return {
"session_id": session_id,
"openai_client_id": client_id,
"workspaces": workspaces,
}
candidates = [html]
for quoted in re.findall(
r'streamController\.enqueue\(("(?:\\.|[^"\\])*")\)',
html,
re.S,
):
try:
decoded = json.loads(quoted)
except Exception:
continue
if decoded:
candidates.append(decoded)
if '\\"' in html:
candidates.append(html.replace('\\"', '"'))
for candidate in candidates:
parsed = _build_from_text(candidate)
if parsed and parsed.get("workspaces"):
return parsed
return None
def _decode_oauth_session_cookie(self):
"""解码 oai-client-auth-session cookie"""
try:
for cookie in self.session.cookies:
try:
name = cookie.name if hasattr(cookie, "name") else str(cookie)
if name == "oai-client-auth-session":
value = (
cookie.value
if hasattr(cookie, "value")
else self.session.cookies.get(name)
)
if value:
data = self._decode_cookie_json_value(value)
if data:
return data
except Exception:
continue
except Exception:
pass
return None
@staticmethod
def _decode_cookie_json_value(value):
import base64
import json
raw_value = str(value or "").strip()
if not raw_value:
return None
candidates = [raw_value]
if "." in raw_value:
candidates.insert(0, raw_value.split(".", 1)[0])
for candidate in candidates:
candidate = candidate.strip()
if not candidate:
continue
padded = candidate + "=" * (-len(candidate) % 4)
for decoder in (base64.urlsafe_b64decode, base64.b64decode):
try:
decoded = decoder(padded).decode("utf-8")
parsed = json.loads(decoded)
except Exception:
continue
if isinstance(parsed, dict):
return parsed
return None
def _exchange_code_for_tokens(self, code, code_verifier, user_agent, impersonate):
    """Exchange an authorization code for tokens.

    Sends a form-encoded ``authorization_code`` grant to
    ``{oauth_issuer}/oauth/token`` with a 60 second timeout.

    Args:
        code: Authorization code returned by the OAuth callback.
        code_verifier: PKCE verifier sent alongside the code.
        user_agent: User-Agent forwarded to the header builder.
        impersonate: Optional impersonation profile; passed to the session
            only when truthy.

    Returns:
        The parsed JSON body on HTTP 200, otherwise ``None`` after the
        failure has been recorded through ``self._set_error``.
    """
    self._enter_stage("token_exchange", f"code={str(code or '')[:24]}...")
    token_url = f"{self.oauth_issuer}/oauth/token"
    form = dict(
        grant_type="authorization_code",
        code=code,
        redirect_uri=self.oauth_redirect_uri,
        client_id=self.oauth_client_id,
        code_verifier=code_verifier,
    )
    request_headers = self._headers(
        token_url,
        user_agent=user_agent,
        accept="application/json",
        referer=f"{self.oauth_issuer}/sign-in-with-chatgpt/codex/consent",
        origin=self.oauth_issuer,
        content_type="application/x-www-form-urlencoded",
        fetch_site="same-origin",
    )
    try:
        post_args = dict(data=form, headers=request_headers, timeout=60)
        if impersonate:
            post_args["impersonate"] = impersonate
        self._browser_pause()
        response = self.session.post(token_url, **post_args)
        if response.status_code != 200:
            self._set_error(
                f"换取 tokens 失败: {response.status_code} - {response.text[:200]}"
            )
            return None
        self._log("token_exchange 成功")
        return response.json()
    except Exception as exc:
        # Covers transport errors as well as a non-JSON 200 body.
        self._set_error(f"换取 tokens 异常: {exc}")
    return None
def _send_phone_number(self, phone, device_id, user_agent, sec_ch_ua, impersonate):
    """Submit a phone number to the add-phone endpoint.

    POSTs ``{"phone_number": phone}`` as JSON to
    ``/api/accounts/add-phone/send`` without following redirects.

    Args:
        phone: Phone number to submit.
        device_id: Value sent in the ``oai-device-id`` header.
        user_agent: User-Agent forwarded to the header builder.
        sec_ch_ua: ``sec-ch-ua`` value forwarded to the header builder.
        impersonate: Optional impersonation profile; passed to the session
            only when truthy.

    Returns:
        A ``(ok, next_state, detail)`` tuple. On success ``ok`` is ``True``,
        ``next_state`` is the flow state built from the JSON response and
        ``detail`` is ``""``. On failure ``ok`` is ``False``, ``next_state``
        is ``None`` and ``detail`` describes what went wrong.
    """
    endpoint = f"{self.oauth_issuer}/api/accounts/add-phone/send"
    request_headers = self._headers(
        endpoint,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=f"{self.oauth_issuer}/add-phone",
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={"oai-device-id": device_id},
    )
    request_headers.update(generate_datadog_trace())

    try:
        post_args = dict(
            json={"phone_number": phone},
            headers=request_headers,
            timeout=30,
            allow_redirects=False,
        )
        if impersonate:
            post_args["impersonate"] = impersonate
        self._browser_pause(0.12, 0.25)
        response = self.session.post(endpoint, **post_args)
    except Exception as exc:
        return False, None, f"add-phone/send 异常: {exc}"

    self._log(f"/add-phone/send -> {response.status_code}")
    if response.status_code != 200:
        failure = f"add-phone/send 失败: {response.status_code} - {response.text[:180]}"
        return False, None, failure

    try:
        body = response.json()
    except Exception:
        return False, None, "add-phone/send 响应不是 JSON"

    # Fall back to the request URL when the response carries no URL.
    following = self._state_from_payload(
        body, current_url=str(response.url) or endpoint
    )
    self._log(f"add-phone/send {describe_flow_state(following)}")
    return True, following, ""
def _resend_phone_otp(
    self,
    phone_number,
    device_id,
    user_agent,
    sec_ch_ua,
    impersonate,
    state: FlowState,
):
    """Ask the add-phone endpoint to send the phone OTP again.

    Re-POSTs the phone number to ``/api/accounts/add-phone/send``. The
    referer is taken from ``state.current_url``, then
    ``state.continue_url``, falling back to the ``/add-phone`` page.

    Args:
        phone_number: Phone number to resend the code to.
        device_id: Value sent in the ``oai-device-id`` header.
        user_agent: User-Agent forwarded to the header builder.
        sec_ch_ua: ``sec-ch-ua`` value forwarded to the header builder.
        impersonate: Optional impersonation profile; passed to the session
            only when truthy.
        state: Current flow state, used only to choose the referer.

    Returns:
        ``(True, "")`` on HTTP 200, otherwise ``(False, detail)``.
    """
    endpoint = f"{self.oauth_issuer}/api/accounts/add-phone/send"
    referer_url = (
        state.current_url or state.continue_url or f"{self.oauth_issuer}/add-phone"
    )
    request_headers = self._headers(
        endpoint,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=referer_url,
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={"oai-device-id": device_id},
    )
    request_headers.update(generate_datadog_trace())

    try:
        post_args = dict(
            json={"phone_number": phone_number},
            headers=request_headers,
            timeout=30,
            allow_redirects=False,
        )
        if impersonate:
            post_args["impersonate"] = impersonate
        self._browser_pause(0.12, 0.25)
        response = self.session.post(endpoint, **post_args)
    except Exception as exc:
        return False, f"add-phone/send 重发异常: {exc}"

    self._log(f"/add-phone/send(resend) -> {response.status_code}")
    if response.status_code != 200:
        return False, f"add-phone/send 重发失败: {response.status_code} - {response.text[:180]}"
    return True, ""
def _get_config_value(self, *keys):
    """Return the first non-blank configuration value among *keys*.

    Each value is read from ``self.config``, coerced to ``str`` and
    stripped; falsy values count as blank. Keys are consulted lazily, in
    the order given.

    Returns:
        The first non-empty stripped string, or ``""`` when none is set.
    """
    cleaned = (str(self.config.get(name, "") or "").strip() for name in keys)
    return next((text for text in cleaned if text), "")
def _get_configured_phone_number(self) -> str:
    """Return the fixed phone number from the config, or ``""`` if unset.

    Keys are consulted in priority order: ``chatgpt_phone_number``,
    ``openai_phone_number``, then ``phone_number``.
    """
    lookup_order = ("chatgpt_phone_number", "openai_phone_number", "phone_number")
    return self._get_config_value(*lookup_order)
def _get_configured_phone_codes(self) -> list[str]:
    """Return the fixed phone OTP codes listed in the config.

    The first non-blank value among the ``*_phone_otp_codes`` /
    ``*_phone_otp_code`` keys is split on commas, semicolons and
    newlines; surrounding whitespace is stripped and empty pieces are
    dropped.

    Returns:
        The codes in configured order, or an empty list when none are set.
    """
    configured = self._get_config_value(
        "chatgpt_phone_otp_codes",
        "chatgpt_phone_otp_code",
        "openai_phone_otp_codes",
        "openai_phone_otp_code",
        "phone_otp_codes",
        "phone_otp_code",
    )
    if not configured:
        return []
    # Normalise every supported separator to a comma before splitting.
    unified = configured.replace("\n", ",").replace(";", ",")
    stripped = (str(piece or "").strip() for piece in unified.split(","))
    return [code for code in stripped if code]
def _validate_phone_otp(
    self, code, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState
):
    """Submit a phone OTP code for validation.

    POSTs ``{"code": code}`` as JSON to ``/api/accounts/phone-otp/validate``
    without following redirects. The referer is taken from
    ``state.current_url``, then ``state.continue_url``, falling back to
    the ``/phone-verification`` page.

    Args:
        code: OTP code to validate.
        device_id: Value sent in the ``oai-device-id`` header.
        user_agent: User-Agent forwarded to the header builder.
        sec_ch_ua: ``sec-ch-ua`` value forwarded to the header builder.
        impersonate: Optional impersonation profile; passed to the session
            only when truthy.
        state: Current flow state, used only to choose the referer.

    Returns:
        A ``(ok, next_state, detail)`` tuple. On success ``ok`` is ``True``,
        ``next_state`` is the flow state built from the JSON response and
        ``detail`` is ``""``. On failure ``ok`` is ``False``, ``next_state``
        is ``None`` and ``detail`` describes what went wrong.
    """
    endpoint = f"{self.oauth_issuer}/api/accounts/phone-otp/validate"
    referer_url = (
        state.current_url
        or state.continue_url
        or f"{self.oauth_issuer}/phone-verification"
    )
    request_headers = self._headers(
        endpoint,
        user_agent=user_agent,
        sec_ch_ua=sec_ch_ua,
        accept="application/json",
        referer=referer_url,
        origin=self.oauth_issuer,
        content_type="application/json",
        fetch_site="same-origin",
        extra_headers={"oai-device-id": device_id},
    )
    request_headers.update(generate_datadog_trace())

    try:
        post_args = dict(
            json={"code": code},
            headers=request_headers,
            timeout=30,
            allow_redirects=False,
        )
        if impersonate:
            post_args["impersonate"] = impersonate
        self._browser_pause(0.12, 0.25)
        response = self.session.post(endpoint, **post_args)
    except Exception as exc:
        return False, None, f"phone-otp/validate 异常: {exc}"

    self._log(f"/phone-otp/validate -> {response.status_code}")
    # 401 gets a dedicated "wrong code" message; other non-200s are generic.
    if response.status_code == 401:
        return False, None, "手机号验证码错误"
    if response.status_code != 200:
        failure = f"phone-otp/validate 失败: {response.status_code} - {response.text[:180]}"
        return False, None, failure

    try:
        body = response.json()
    except Exception:
        return False, None, "phone-otp/validate 响应不是 JSON"

    following = self._state_from_payload(
        body, current_url=str(response.url) or endpoint
    )
    self._log(f"手机号 OTP 验证通过 {describe_flow_state(following)}")
    return True, following, ""
def _handle_add_phone_verification(
    self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState
):
    """Drive the add-phone step and return the flow state that follows it.

    Two strategies are tried, in order:

    1. A phone number fixed in the config, validated with the fixed OTP
       codes from the config.
    2. Numbers acquired from ``SMSToMePhoneService``, one per attempt, for
       up to ``phone_service.max_attempts`` attempts.

    Args:
        device_id: Forwarded to the send / resend / validate helpers.
        user_agent: Forwarded to the send / resend / validate helpers.
        sec_ch_ua: Forwarded to the send / resend / validate helpers.
        impersonate: Forwarded to the send / resend / validate helpers.
        state: Incoming flow state. NOTE(review): not referenced anywhere
            in this method body — confirm whether it is kept only for
            signature parity with the other handlers.

    Returns:
        The next flow state on success, or ``None`` after a message has
        been recorded through ``self._set_error``.
    """
    configured_phone = self._get_configured_phone_number()
    configured_codes = self._get_configured_phone_codes()
    # --- Strategy 1: fixed phone number from the config -------------------
    if configured_phone:
        self._log(f"步骤5: add_phone 使用配置手机号: {configured_phone}")
        sent, next_state, detail = self._send_phone_number(
            configured_phone,
            device_id,
            user_agent,
            sec_ch_ua,
            impersonate,
        )
        if not sent or not next_state:
            self._set_error(detail or "add-phone/send 未返回有效状态")
            return None
        # The server did not land on the phone-OTP page (checked via both
        # the page type and the URLs carried by the state).
        if (
            next_state.page_type != "phone_otp_verification"
            and "phone-verification"
            not in f"{next_state.continue_url} {next_state.current_url}".lower()
        ):
            # If the flow has already moved on to a later step, hand that
            # state back to the caller instead of failing.
            if self._state_supports_workspace_resolution(next_state) or self._state_requires_navigation(next_state):
                self._log(f"add_phone 提交后已进入后续状态: {describe_flow_state(next_state)}")
                return next_state
            self._set_error(
                f"add-phone/send 未进入手机验证码页: {describe_flow_state(next_state)}"
            )
            return None
        if configured_codes:
            # Try each configured code in order; the first accepted wins.
            for idx, code in enumerate(configured_codes, start=1):
                self._log(
                    f"步骤5: 使用配置手机号验证码 {idx}/{len(configured_codes)}: {code}"
                )
                valid, validated_state, detail = self._validate_phone_otp(
                    code,
                    device_id,
                    user_agent,
                    sec_ch_ua,
                    impersonate,
                    next_state,
                )
                if valid and validated_state:
                    return validated_state
                self._log(detail or "手机号 OTP 验证失败")
            self._set_error("配置的手机号验证码未通过验证")
            return None
        # A fixed phone without any fixed code cannot be completed here.
        self._set_error(
            "已提交配置手机号,但未提供 chatgpt_phone_otp_code当前流程无法继续"
        )
        return None
    # --- Strategy 2: numbers from the SMSToMe phone service ---------------
    phone_service = SMSToMePhoneService(self.config, log_fn=self._log)
    if not phone_service.enabled:
        self._set_error(
            "当前链路需要手机号验证但未配置可用的手机号能力SMSToMe 或固定手机号验证码)"
        )
        return None
    # Prefixes of numbers that already failed; passed back to acquire_phone
    # as exclude_prefixes on later attempts.
    excluded_prefixes = set()
    # Most recent failure reason, reported if every attempt fails.
    last_failure = ""
    for attempt in range(phone_service.max_attempts):
        try:
            entry = phone_service.acquire_phone(exclude_prefixes=excluded_prefixes)
        except Exception as e:
            # Acquisition errors end the whole loop rather than being retried.
            last_failure = f"获取手机号失败: {e}"
            self._log(last_failure)
            break
        if not entry:
            last_failure = last_failure or "SMSToMe 号码池中无可用手机号"
            break
        prefix = phone_service.prefix_hint(entry.phone)
        self._log(
            f"步骤5: add_phone 选择手机号 {attempt + 1}/{phone_service.max_attempts}: {entry.phone} ({entry.country_slug})"
        )
        sent, next_state, detail = self._send_phone_number(
            entry.phone,
            device_id,
            user_agent,
            sec_ch_ua,
            impersonate,
        )
        if not sent or not next_state:
            last_failure = detail or "add-phone/send 未返回有效状态"
            self._log(last_failure)
            self._blacklist_phone_if_needed(phone_service, entry, last_failure)
            excluded_prefixes.add(prefix)
            continue
        # Unlike the configured-phone branch above, any state other than
        # the phone-OTP page counts as a failure for this number.
        if (
            next_state.page_type != "phone_otp_verification"
            and "phone-verification"
            not in f"{next_state.continue_url} {next_state.current_url}".lower()
        ):
            last_failure = f"add-phone/send 未进入手机验证码页: {describe_flow_state(next_state)}"
            self._log(last_failure)
            self._blacklist_phone_if_needed(
                phone_service, entry, last_failure, next_state
            )
            excluded_prefixes.add(prefix)
            continue
        # Read the verification channel and bound phone number from the
        # decoded session cookie, defaulting to "sms" / the acquired number.
        session_data = self._decode_oauth_session_cookie() or {}
        verification_channel = (
            str(session_data.get("phone_verification_channel") or "sms")
            .strip()
            .lower()
            or "sms"
        )
        bound_phone = (
            str(session_data.get("phone_number") or entry.phone).strip()
            or entry.phone
        )
        self._log(
            f"add_phone 发码成功: phone={bound_phone}, channel={verification_channel}"
        )
        # Any channel other than "sms" is treated as unusable here.
        if verification_channel != "sms":
            last_failure = f"add_phone 已切到 {verification_channel} 通道,当前 SMSToMe 仅支持短信接码"
            self._log(last_failure)
            excluded_prefixes.add(prefix)
            continue
        code = phone_service.wait_for_code(entry)
        if not code:
            # No code arrived: request one resend and wait once more.
            self._log("手机号验证码暂未收到,尝试重发一次...")
            resend_ok, resend_detail = self._resend_phone_otp(
                entry.phone,
                device_id,
                user_agent,
                sec_ch_ua,
                impersonate,
                next_state,
            )
            if resend_ok:
                code = phone_service.wait_for_code(entry)
            if not code:
                last_failure = (
                    resend_detail or f"手机号 {entry.phone} 未收到短信验证码"
                )
                self._log(last_failure)
                excluded_prefixes.add(prefix)
                continue
        valid, validated_state, detail = self._validate_phone_otp(
            code,
            device_id,
            user_agent,
            sec_ch_ua,
            impersonate,
            next_state,
        )
        if not valid or not validated_state:
            last_failure = detail or "手机号 OTP 验证失败"
            self._log(last_failure)
            excluded_prefixes.add(prefix)
            continue
        return validated_state
    # Every attempt failed, or the loop was cut short.
    self._set_error(f"add_phone 阶段失败: {last_failure or '未完成手机号验证'}")
    return None
def _handle_otp_verification(
    self,
    email,
    device_id,
    user_agent,
    sec_ch_ua,
    impersonate,
    skymail_client,
    state,
    *,
    prefer_passwordless_login=False,
    allow_cached_code_retry=False,
):
    """Handle email OTP verification during the OAuth stage.

    Obtains a sentinel token, optionally retries a recently cached code,
    then waits for new codes from ``skymail_client`` and submits each one
    to ``/api/accounts/email-otp/validate`` until one is accepted or the
    wait window expires.

    Args:
        email: Mailbox address to read verification codes from.
        device_id: Value sent in the ``oai-device-id`` header.
        user_agent: User-Agent forwarded to the header builder.
        sec_ch_ua: ``sec-ch-ua`` value forwarded to the header builder.
        impersonate: Optional impersonation profile; passed to the session
            only when truthy.
        skymail_client: Mail client. It is probed with ``hasattr`` /
            ``getattr`` for ``wait_for_verification_code``,
            ``get_recent_code`` and ``remember_successful_code``; without
            ``wait_for_verification_code`` the ``fetch_emails`` /
            ``extract_verification_code`` polling branch is used.
        state: Current flow state; supplies the referer URLs.
        prefer_passwordless_login: When truthy, resends try the
            passwordless endpoint first and the default resend wait is 45
            seconds instead of 120.
        allow_cached_code_retry: When truthy, a recently cached code is
            tried before waiting for a new one; it also makes resends
            prefer the passwordless endpoint.

    Returns:
        The next flow state declared by the server once a code validates,
        otherwise ``None``. If ``self.last_error`` is still empty at that
        point, an error is recorded through ``self._set_error``.
    """
    self._enter_stage("otp", f"email={email}")
    self._log("步骤4: 检测到邮箱 OTP 验证")
    # Record the OTP send-time baseline. This must happen before slow steps
    # such as fetching the sentinel token; otherwise the mail's created_at
    # would be earlier than otp_cutoff and the code would be misjudged as
    # belonging to an old mail.
    _otp_sent_at_baseline = time.time()

    # Trigger an OTP resend. Tries the passwordless endpoint first when that
    # is preferred, then falls back to GET /email-otp/send. Returns True
    # when either request answers HTTP 200.
    def _resend_email_otp() -> bool:
        prefer_passwordless = bool(
            prefer_passwordless_login
            or allow_cached_code_retry
            or self.config.get("prefer_passwordless_login")
            or self.config.get("force_passwordless_login")
        )
        resend_ok = False
        if prefer_passwordless:
            # request_url is local to this closure; it does not affect the
            # validate URL defined in the enclosing function.
            request_url = f"{self.oauth_issuer}/api/accounts/passwordless/send-otp"
            headers = self._headers(
                request_url,
                user_agent=user_agent,
                sec_ch_ua=sec_ch_ua,
                accept="application/json",
                referer=state.current_url
                or state.continue_url
                or f"{self.oauth_issuer}/log-in/password",
                origin=self.oauth_issuer,
                content_type="application/json",
                fetch_site="same-origin",
                extra_headers={
                    "oai-device-id": device_id,
                },
            )
            headers.update(generate_datadog_trace())
            try:
                # No request body is passed to this POST.
                kwargs = {"headers": headers, "timeout": 30, "allow_redirects": False}
                if impersonate:
                    kwargs["impersonate"] = impersonate
                self._browser_pause()
                resp = self.session.post(request_url, **kwargs)
                self._log(f"/passwordless/send-otp -> {resp.status_code}")
                if resp.status_code == 200:
                    resend_ok = True
            except Exception as e:
                # Best effort: log and fall through to the email-otp endpoint.
                self._log(f"passwordless resend 异常: {e}")
            if resend_ok:
                self._log("已触发 passwordless OTP 重发")
                return True
        # Fallback (or default) resend path: GET /email-otp/send.
        request_url = f"{self.oauth_issuer}/api/accounts/email-otp/send"
        headers = self._headers(
            request_url,
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            accept="application/json, text/plain, */*",
            referer=state.current_url
            or state.continue_url
            or f"{self.oauth_issuer}/email-verification",
            fetch_site="same-origin",
            extra_headers={
                "oai-device-id": device_id,
            },
        )
        headers.update(generate_datadog_trace())
        try:
            kwargs = {"headers": headers, "timeout": 30, "allow_redirects": True}
            if impersonate:
                kwargs["impersonate"] = impersonate
            self._browser_pause()
            resp = self.session.get(request_url, **kwargs)
            self._log(f"/email-otp/send -> {resp.status_code}")
            if resp.status_code == 200:
                self._log("已触发 email-otp 重发")
                return True
            self._log(f"email-otp/send 重发失败: {resp.text[:120]}")
        except Exception as e:
            self._log(f"email-otp/send 重发异常: {e}")
        return False

    request_url = f"{self.oauth_issuer}/api/accounts/email-otp/validate"
    self._log(f"email_otp_validate: device_id={device_id}")
    otp_referer = (
        state.current_url
        or state.continue_url
        or f"{self.oauth_issuer}/email-verification"
    )
    # Sentinel token: try the browser-based provider first, then the
    # HTTP-based builder. Validation proceeds even if neither yields a token.
    sentinel_otp = get_sentinel_token_via_browser(
        flow="email_otp_validate",
        proxy=self.proxy,
        page_url=otp_referer,
        headless=self.browser_mode != "headed",
        device_id=device_id,
        log_fn=lambda msg: self._log(f"email_otp_validate: {msg}"),
    )
    if sentinel_otp:
        self._log("email_otp_validate: 已通过 Playwright SentinelSDK 获取 token")
    else:
        sentinel_otp = build_sentinel_token(
            self.session,
            device_id,
            flow="email_otp_validate",
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            impersonate=impersonate,
        )
        if sentinel_otp:
            self._log("email_otp_validate: 已通过 HTTP PoW 获取 token")
        else:
            self._log("email_otp_validate: 未生成 sentinel token继续尝试")

    # Build fresh headers for each validate request: the sentinel token is
    # attached only when one was obtained, and a new datadog trace is
    # generated per call.
    def _build_otp_headers():
        extra_headers = {
            "oai-device-id": device_id,
        }
        if sentinel_otp:
            extra_headers["openai-sentinel-token"] = sentinel_otp
        headers_otp = self._headers(
            request_url,
            user_agent=user_agent,
            sec_ch_ua=sec_ch_ua,
            accept="application/json",
            referer=otp_referer,
            origin=self.oauth_issuer,
            content_type="application/json",
            fetch_site="same-origin",
            extra_headers=extra_headers,
        )
        headers_otp.update(generate_datadog_trace())
        return headers_otp

    # Make sure the mail client has a _used_codes set; validate_otp adds to
    # it when the client has no remember_successful_code hook.
    if not hasattr(skymail_client, "_used_codes"):
        skymail_client._used_codes = set()
    # Codes already submitted during this call.
    tried_codes = set()
    # Total wait window in seconds: config value (default 600), clamped to
    # the range 30..3600.
    try:
        otp_wait_seconds = int(
            self.config.get(
                "chatgpt_oauth_otp_wait_seconds",
                self.config.get("chatgpt_otp_wait_seconds", 600),
            )
            or 600
        )
    except Exception:
        otp_wait_seconds = 600
    otp_wait_seconds = max(30, min(otp_wait_seconds, 3600))
    # otp_wait_seconds is at least 30 after the clamp above, so this always
    # evaluates to 30.
    otp_poll_window = min(30, max(10, otp_wait_seconds))
    # NOTE(review): otp_resend_wait_seconds is computed and clamped to
    # 30..900 here but is not read again anywhere in this function; resends
    # are driven by the no-response counters further down. Confirm whether
    # it is still needed.
    try:
        default_resend_wait_seconds = 45 if prefer_passwordless_login else 120
        otp_resend_wait_seconds = int(
            self.config.get(
                "chatgpt_oauth_otp_resend_wait_seconds",
                self.config.get(
                    "chatgpt_otp_resend_wait_seconds",
                    default_resend_wait_seconds,
                ),
            )
            or default_resend_wait_seconds
        )
    except Exception:
        otp_resend_wait_seconds = 45 if prefer_passwordless_login else 120
    otp_resend_wait_seconds = max(30, min(otp_resend_wait_seconds, 900))
    otp_deadline = time.time() + otp_wait_seconds
    otp_sent_at = _otp_sent_at_baseline
    self._log(
        f"OAuth OTP 等待窗口: total={otp_wait_seconds}s, poll_window={otp_poll_window}s, "
        f"每轮最多 5 次无响应后重发,最多 3 轮"
    )

    # Submit one code to the validate endpoint. Returns the next flow state
    # on HTTP 200 with a JSON body, otherwise None. The code is added to
    # tried_codes before the request is made.
    def validate_otp(code):
        tried_codes.add(code)
        self._log(f"尝试 OTP: {code}")
        try:
            kwargs = {
                "json": {"code": code},
                "headers": _build_otp_headers(),
                "timeout": 30,
                "allow_redirects": False,
            }
            if impersonate:
                kwargs["impersonate"] = impersonate
            self._browser_pause(0.12, 0.25)
            resp_otp = self.session.post(request_url, **kwargs)
        except Exception as e:
            self._log(f"email-otp/validate 异常: {e}")
            return None
        self._log(f"/email-otp/validate -> {resp_otp.status_code}")
        if resp_otp.status_code != 200:
            self._log(f"OTP 无效: {resp_otp.text[:160]}")
            return None
        try:
            otp_data = resp_otp.json()
        except Exception:
            self._log("email-otp/validate 响应不是 JSON")
            return None
        next_state = self._state_from_payload(
            otp_data,
            current_url=str(resp_otp.url)
            or (state.current_url or state.continue_url or request_url),
        )
        self._log(f"OTP 验证通过 {describe_flow_state(next_state)}")
        self._log(
            f"otp 响应详情: current_url={str(resp_otp.url)[:120]} tried_codes={len(tried_codes)}"
        )
        # Record the accepted code on the mail client: use its own hook
        # when it has one, otherwise write the bookkeeping attributes here.
        remember_successful_code = getattr(
            skymail_client, "remember_successful_code", None
        )
        if callable(remember_successful_code):
            remember_successful_code(code)
        else:
            skymail_client._used_codes.add(code)
            setattr(skymail_client, "_last_success_code", code)
            setattr(skymail_client, "_last_success_code_at", time.time())
        return next_state

    # Optional fast path: retry a recently cached code before waiting for a
    # new mail. The age limit is min(180, otp_wait_seconds) seconds.
    if allow_cached_code_retry:
        cached_code = ""
        cached_age = None
        get_recent_code = getattr(skymail_client, "get_recent_code", None)
        if callable(get_recent_code):
            cached_code = str(
                get_recent_code(
                    max_age_seconds=min(180, otp_wait_seconds),
                    prefer_successful=True,
                )
                or ""
            ).strip()
            # Age is used only for the log line below and is measured from
            # _last_success_code_at.
            cached_age = (
                time.time() - float(getattr(skymail_client, "_last_success_code_at", 0) or 0)
                if cached_code
                else None
            )
        else:
            # No get_recent_code hook: read the client's cached attributes
            # directly and apply the age limit here.
            cached_code = str(
                getattr(skymail_client, "_last_success_code", "")
                or getattr(skymail_client, "_last_code", "")
                or ""
            ).strip()
            cached_ts = float(
                getattr(skymail_client, "_last_success_code_at", 0)
                or getattr(skymail_client, "_last_code_at", 0)
                or 0
            )
            if cached_code and cached_ts:
                cached_age = time.time() - cached_ts
                if cached_age > min(180, otp_wait_seconds):
                    cached_code = ""
        if cached_code:
            age_text = (
                f"{int(max(0, cached_age or 0))}s前"
                if cached_age is not None
                else "近期"
            )
            self._log(
                f"检测到近期缓存 OTP先直接尝试: {cached_code} ({age_text})"
            )
            next_state = validate_otp(cached_code)
            if next_state:
                return next_state
            self._log("缓存 OTP 未通过,继续等待新的 OTP...")
    if hasattr(skymail_client, "wait_for_verification_code"):
        # Blocking path: the mail client waits for a new code itself, in
        # slices of at most otp_poll_window seconds.
        self._log("使用 wait_for_verification_code 进行阻塞式获取新验证码...")
        no_new_count = 0  # consecutive waits that produced no code
        resend_round = 0  # resend rounds triggered so far
        _max_no_new = 5
        _max_resend_rounds = 3
        while time.time() < otp_deadline:
            remaining = max(1, int(otp_deadline - time.time()))
            wait_time = min(otp_poll_window, remaining)
            try:
                code = skymail_client.wait_for_verification_code(
                    email,
                    timeout=wait_time,
                    otp_sent_at=otp_sent_at,
                    exclude_codes=tried_codes,
                )
            except TaskInterruption:
                self._set_error("任务已手动停止")
                return None
            except Exception as e:
                # A manual stop may also surface as a generic exception
                # whose text contains the stop marker.
                if "手动停止" in str(e):
                    self._set_error("任务已手动停止")
                    return None
                self._log(f"等待 OTP 异常: {e}")
                code = None
            if not code:
                no_new_count += 1
                self._log(
                    f"暂未收到新的 OTP继续等待... (本轮第 {no_new_count}/{_max_no_new} 次)"
                )
                if no_new_count >= _max_no_new:
                    if resend_round < _max_resend_rounds:
                        resend_round += 1
                        self._log(
                            f"连续 {_max_no_new} 次未收到新 OTP"
                            f"触发第 {resend_round}/{_max_resend_rounds} 轮重发..."
                        )
                        # Move the send-time baseline only when the resend
                        # request actually succeeded.
                        if _resend_email_otp():
                            otp_sent_at = time.time()
                        no_new_count = 0
                    else:
                        self._log(
                            f"已完成 {_max_resend_rounds} 轮重发仍未收到 OTP放弃等待"
                        )
                        break
                if self.last_error:
                    break
                continue
            if code in tried_codes:
                self._log(f"跳过已尝试验证码: {code}")
                continue
            no_new_count = 0
            next_state = validate_otp(code)
            if next_state:
                return next_state
            if self.last_error:
                break
    else:
        # Polling path: fetch mails and extract codes directly, checking
        # only the first 12 messages per poll.
        while time.time() < otp_deadline:
            messages = skymail_client.fetch_emails(email) or []
            candidate_codes = []
            for msg in messages[:12]:
                content = msg.get("content") or msg.get("text") or ""
                code = skymail_client.extract_verification_code(content)
                # NOTE(review): tried_codes is only updated inside
                # validate_otp, so the same code found in two messages is
                # appended twice here and would be submitted twice below.
                if code and code not in tried_codes:
                    candidate_codes.append(code)
            if not candidate_codes:
                elapsed = int(otp_wait_seconds - max(0, otp_deadline - time.time()))
                self._log(f"等待新的 OTP... ({elapsed}s/{otp_wait_seconds}s)")
                time.sleep(2)
                continue
            for otp_code in candidate_codes:
                next_state = validate_otp(otp_code)
                if next_state:
                    return next_state
            time.sleep(2)
            if self.last_error:
                break
    # Report a generic failure only when nothing more specific was recorded.
    if not self.last_error:
        self._set_error(
            f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码,等待窗口 {otp_wait_seconds}s"
        )
    return None