Files
any-auto-register/platforms/chatgpt/chatgpt_client.py

1167 lines
41 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ChatGPT 注册客户端模块
使用 curl_cffi 模拟浏览器行为
"""
import random
import uuid
import time
from urllib.parse import urlparse
from core.proxy_utils import build_requests_proxy_config
try:
from curl_cffi import requests as curl_requests
except ImportError:
print("[FAIL] 需要安装 curl_cffi: pip install curl_cffi")
import sys
sys.exit(1)
from .sentinel_token import build_sentinel_token
from .sentinel_browser import get_sentinel_token_via_browser
from .utils import (
FlowState,
build_browser_headers,
decode_jwt_payload,
describe_flow_state,
extract_flow_state,
generate_datadog_trace,
normalize_flow_url,
random_delay,
seed_oai_device_cookie,
)
# Chrome 指纹配置
_CHROME_PROFILES = [
{
"major": 131,
"impersonate": "chrome131",
"build": 6778,
"patch_range": (69, 205),
"sec_ch_ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
},
{
"major": 133,
"impersonate": "chrome133a",
"build": 6943,
"patch_range": (33, 153),
"sec_ch_ua": '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
},
{
"major": 136,
"impersonate": "chrome136",
"build": 7103,
"patch_range": (48, 175),
"sec_ch_ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
},
]
def _random_chrome_version():
"""随机选择一个 Chrome 版本"""
profile = random.choice(_CHROME_PROFILES)
major = profile["major"]
build = profile["build"]
patch = random.randint(*profile["patch_range"])
full_ver = f"{major}.0.{build}.{patch}"
ua = f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{full_ver} Safari/537.36"
return profile["impersonate"], major, full_ver, ua, profile["sec_ch_ua"]
class ChatGPTClient:
"""ChatGPT 注册客户端"""
BASE = "https://chatgpt.com"
AUTH = "https://auth.openai.com"
def __init__(self, proxy=None, verbose=True, browser_mode="protocol"):
"""
初始化 ChatGPT 客户端
Args:
proxy: 代理地址
verbose: 是否输出详细日志
browser_mode: protocol | headless | headed
"""
self.proxy = proxy
self.verbose = verbose
self.browser_mode = browser_mode or "protocol"
self.device_id = str(uuid.uuid4())
self.accept_language = random.choice(
[
"en-US,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9",
"en-US,en;q=0.8",
]
)
# 随机 Chrome 版本
(
self.impersonate,
self.chrome_major,
self.chrome_full,
self.ua,
self.sec_ch_ua,
) = _random_chrome_version()
# 创建 session
self.session = curl_requests.Session(impersonate=self.impersonate)
if self.proxy:
self.session.proxies = build_requests_proxy_config(self.proxy)
# 设置基础 headers
self.session.headers.update(
{
"User-Agent": self.ua,
"Accept-Language": self.accept_language,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-full-version": f'"{self.chrome_full}"',
"sec-ch-ua-platform-version": f'"{random.randint(10, 15)}.0.0"',
}
)
# 设置 oai-did cookie
seed_oai_device_cookie(self.session, self.device_id)
self.last_registration_state = FlowState()
self.last_stage = ""
def _get_sentinel_token(self, flow: str, *, page_url: str | None = None):
prefer_browser = flow in {"username_password_create", "oauth_create_account"}
if prefer_browser:
token = get_sentinel_token_via_browser(
flow=flow,
proxy=self.proxy,
page_url=page_url,
headless=self.browser_mode != "headed",
device_id=self.device_id,
log_fn=lambda msg: self._log(msg),
)
if token:
self._log(f"{flow}: 已通过 Playwright SentinelSDK 获取 token")
return token
token = build_sentinel_token(
self.session,
self.device_id,
flow=flow,
user_agent=self.ua,
sec_ch_ua=self.sec_ch_ua,
impersonate=self.impersonate,
)
if token:
self._log(f"{flow}: 已通过 HTTP PoW 获取 token")
return token
def _log(self, msg):
"""输出日志"""
if self.verbose:
print(f" {msg}")
def _enter_stage(self, stage: str, detail: str = ""):
self.last_stage = str(stage or "").strip()
if self.last_stage:
message = f"[stage={self.last_stage}]"
if detail:
message += f" {detail}"
self._log(message)
def _browser_pause(self, low=0.15, high=0.45):
"""在 headed 模式下加入轻微停顿,模拟有头浏览器节奏。"""
if self.browser_mode == "headed":
random_delay(low, high)
def _headers(
self,
url,
*,
accept,
referer=None,
origin=None,
content_type=None,
navigation=False,
fetch_mode=None,
fetch_dest=None,
fetch_site=None,
extra_headers=None,
):
return build_browser_headers(
url=url,
user_agent=self.ua,
sec_ch_ua=self.sec_ch_ua,
chrome_full_version=self.chrome_full,
accept=accept,
accept_language=self.accept_language,
referer=referer,
origin=origin,
content_type=content_type,
navigation=navigation,
fetch_mode=fetch_mode,
fetch_dest=fetch_dest,
fetch_site=fetch_site,
headed=self.browser_mode == "headed",
extra_headers=extra_headers,
)
def _reset_session(self):
"""重置浏览器指纹与会话,用于绕过偶发的 Cloudflare/SPA 中间页。"""
self.device_id = str(uuid.uuid4())
(
self.impersonate,
self.chrome_major,
self.chrome_full,
self.ua,
self.sec_ch_ua,
) = _random_chrome_version()
self.accept_language = random.choice(
[
"en-US,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9",
"en-US,en;q=0.8",
]
)
self.session = curl_requests.Session(impersonate=self.impersonate)
if self.proxy:
self.session.proxies = build_requests_proxy_config(self.proxy)
self.session.headers.update(
{
"User-Agent": self.ua,
"Accept-Language": self.accept_language,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-full-version": f'"{self.chrome_full}"',
"sec-ch-ua-platform-version": f'"{random.randint(10, 15)}.0.0"',
}
)
seed_oai_device_cookie(self.session, self.device_id)
def _state_from_url(self, url, method="GET"):
state = extract_flow_state(
current_url=normalize_flow_url(url, auth_base=self.AUTH),
auth_base=self.AUTH,
default_method=method,
)
if method:
state.method = str(method).upper()
return state
def _state_from_payload(self, data, current_url=""):
return extract_flow_state(
data=data,
current_url=current_url,
auth_base=self.AUTH,
)
def _state_signature(self, state: FlowState):
return (
state.page_type or "",
state.method or "",
state.continue_url or "",
state.current_url or "",
)
def _is_registration_complete_state(self, state: FlowState):
current_url = (state.current_url or "").lower()
continue_url = (state.continue_url or "").lower()
page_type = state.page_type or ""
return (
page_type in {"callback", "chatgpt_home", "oauth_callback"}
or ("chatgpt.com" in current_url and "redirect_uri" not in current_url)
or (
"chatgpt.com" in continue_url
and "redirect_uri" not in continue_url
and page_type != "external_url"
)
)
def _state_is_password_registration(self, state: FlowState):
return state.page_type in {"create_account_password", "password"}
def _state_is_email_otp(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return (
state.page_type == "email_otp_verification"
or "email-verification" in target
or "email-otp" in target
)
def _state_is_about_you(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "about_you" or "about-you" in target
def _state_requires_navigation(self, state: FlowState):
if (state.method or "GET").upper() != "GET":
return False
if state.page_type == "external_url" and state.continue_url:
return True
if state.continue_url and state.continue_url != state.current_url:
return True
return False
def _follow_flow_state(self, state: FlowState, referer=None):
"""跟随服务端返回的 continue_url推进注册状态机。"""
target_url = state.continue_url or state.current_url
if not target_url:
return False, "缺少可跟随的 continue_url"
try:
self._browser_pause()
r = self.session.get(
target_url,
headers=self._headers(
target_url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=referer,
navigation=True,
),
allow_redirects=True,
timeout=30,
)
final_url = str(r.url)
self._log(f"follow -> {r.status_code} {final_url}")
content_type = (r.headers.get("content-type", "") or "").lower()
if "application/json" in content_type:
try:
next_state = self._state_from_payload(
r.json(), current_url=final_url
)
except Exception:
next_state = self._state_from_url(final_url)
else:
next_state = self._state_from_url(final_url)
self._log(f"follow state -> {describe_flow_state(next_state)}")
return True, next_state
except Exception as e:
self._log(f"跟随 continue_url 失败: {e}")
return False, str(e)
def _get_cookie_value(self, name, domain_hint=None):
"""读取当前会话中的 Cookie。"""
for cookie in self.session.cookies.jar:
if cookie.name != name:
continue
if domain_hint and domain_hint not in (cookie.domain or ""):
continue
return cookie.value
return ""
def get_next_auth_session_token(self):
"""获取 ChatGPT next-auth 会话 Cookie。"""
return (
self._get_cookie_value("__Secure-next-auth.session-token", "chatgpt.com")
or self._get_cookie_value("__Secure-authjs.session-token", "chatgpt.com")
)
def fetch_chatgpt_session(self, max_attempts=5, retry_delay=1.2):
"""请求 ChatGPT Session 接口并返回原始会话数据。"""
url = f"{self.BASE}/api/auth/session"
last_error = ""
for attempt in range(max(1, int(max_attempts or 1))):
try:
self._browser_pause()
response = self.session.get(
url,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
fetch_site="same-origin",
),
timeout=30,
)
except Exception as exc:
last_error = f"/api/auth/session 请求异常: {exc}"
if attempt < max_attempts - 1:
self._log(
f"{last_error},等待 {retry_delay:.1f}s 后重试 "
f"({attempt + 1}/{max_attempts})"
)
time.sleep(retry_delay)
continue
return False, last_error
if response.status_code != 200:
last_error = f"/api/auth/session -> HTTP {response.status_code}"
if attempt < max_attempts - 1:
self._log(
f"{last_error},等待 {retry_delay:.1f}s 后重试 "
f"({attempt + 1}/{max_attempts})"
)
time.sleep(retry_delay)
continue
return False, last_error
try:
data = response.json()
except Exception as exc:
last_error = f"/api/auth/session 返回非 JSON: {exc}"
if attempt < max_attempts - 1:
self._log(
f"{last_error},等待 {retry_delay:.1f}s 后重试 "
f"({attempt + 1}/{max_attempts})"
)
time.sleep(retry_delay)
continue
return False, last_error
access_token = str(data.get("accessToken") or "").strip()
if access_token:
return True, data
last_error = "/api/auth/session 未返回 accessToken"
if attempt < max_attempts - 1:
self._log(
f"{last_error},等待 {retry_delay:.1f}s 后重试 "
f"({attempt + 1}/{max_attempts})"
)
try:
self.session.get(
f"{self.BASE}/",
headers=self._headers(
f"{self.BASE}/",
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=f"{self.BASE}/",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
except Exception:
pass
time.sleep(retry_delay)
continue
return False, last_error
return False, last_error or "/api/auth/session 未返回 accessToken"
def reuse_session_and_get_tokens(self):
"""
承接前序阶段已建立的 ChatGPT 会话,直接读取 Session / AccessToken。
Returns:
tuple[bool, dict|str]: 成功时返回标准化 token/session 数据;失败时返回错误信息。
"""
self._enter_stage("token_exchange", "reuse session -> /api/auth/session")
state = self.last_registration_state or FlowState()
self._log("步骤 1/4: 跟随注册回调 external_url ...")
if state.page_type == "external_url" or self._state_requires_navigation(state):
ok, followed = self._follow_flow_state(
state,
referer=state.current_url or f"{self.AUTH}/about-you",
)
if not ok:
return False, f"注册回调落地失败: {followed}"
self.last_registration_state = followed
else:
self._log("注册回调已落地,跳过额外跟随")
self._log("步骤 2/4: 检查 __Secure-next-auth.session-token ...")
session_cookie = ""
for attempt in range(5):
session_cookie = self.get_next_auth_session_token()
if session_cookie:
break
self._log(
f"next-auth session cookie 尚未落地,补一次 ChatGPT 首页触达 "
f"({attempt + 1}/5)"
)
try:
self._browser_pause(0.2, 0.5)
self.session.get(
f"{self.BASE}/",
headers=self._headers(
f"{self.BASE}/",
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=state.current_url or f"{self.AUTH}/about-you",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
except Exception as exc:
self._log(f"补触达 ChatGPT 首页异常: {exc}")
time.sleep(1.0)
if not session_cookie:
return False, "缺少 ChatGPT session-token注册回调可能未完全落地"
self._log("步骤 3/4: 请求 ChatGPT /api/auth/session ...")
ok, session_or_error = self.fetch_chatgpt_session()
if not ok:
return False, session_or_error
session_data = session_or_error
access_token = str(session_data.get("accessToken") or "").strip()
session_token = str(
session_data.get("sessionToken") or session_cookie or ""
).strip()
user = session_data.get("user") or {}
account = session_data.get("account") or {}
jwt_payload = decode_jwt_payload(access_token)
auth_payload = jwt_payload.get("https://api.openai.com/auth") or {}
account_id = (
str(account.get("id") or "").strip()
or str(auth_payload.get("chatgpt_account_id") or "").strip()
)
user_id = (
str(user.get("id") or "").strip()
or str(auth_payload.get("chatgpt_user_id") or "").strip()
or str(auth_payload.get("user_id") or "").strip()
)
normalized = {
"access_token": access_token,
"session_token": session_token,
"account_id": account_id,
"user_id": user_id,
"workspace_id": account_id,
"expires": session_data.get("expires"),
"user": user,
"account": account,
"auth_provider": session_data.get("authProvider"),
"raw_session": session_data,
}
self._log("步骤 4/4: 已从当前会话中提取 accessToken")
if account_id:
self._log(f"Session Account ID: {account_id}")
if user_id:
self._log(f"Session User ID: {user_id}")
return True, normalized
def visit_homepage(self):
"""访问首页,建立 session"""
self._log("访问 ChatGPT 首页...")
url = f"{self.BASE}/"
try:
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
return r.status_code == 200
except Exception as e:
self._log(f"访问首页失败: {e}")
return False
def get_csrf_token(self):
"""获取 CSRF token"""
self._log("获取 CSRF token...")
url = f"{self.BASE}/api/auth/csrf"
try:
r = self.session.get(
url,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
fetch_site="same-origin",
),
timeout=30,
)
if r.status_code == 200:
data = r.json()
token = data.get("csrfToken", "")
if token:
self._log(f"CSRF token: {token[:20]}...")
return token
except Exception as e:
self._log(f"获取 CSRF token 失败: {e}")
return None
def signin(self, email, csrf_token):
"""
提交邮箱,获取 authorize URL
Returns:
str: authorize URL
"""
self._log(f"提交邮箱: {email}")
url = f"{self.BASE}/api/auth/signin/openai"
params = {
"prompt": "login",
"ext-oai-did": self.device_id,
"auth_session_logging_id": str(uuid.uuid4()),
"screen_hint": "login_or_signup",
"login_hint": email,
}
form_data = {
"callbackUrl": f"{self.BASE}/",
"csrfToken": csrf_token,
"json": "true",
}
try:
self._browser_pause()
r = self.session.post(
url,
params=params,
data=form_data,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
origin=self.BASE,
content_type="application/x-www-form-urlencoded",
fetch_site="same-origin",
),
timeout=30,
)
if r.status_code == 200:
data = r.json()
authorize_url = data.get("url", "")
if authorize_url:
self._log(f"获取到 authorize URL")
return authorize_url
except Exception as e:
self._log(f"提交邮箱失败: {e}")
return None
def authorize(self, url, max_retries=3):
"""
访问 authorize URL跟随重定向带重试机制
这是关键步骤,建立 auth.openai.com 的 session
Returns:
str: 最终重定向的 URL
"""
for attempt in range(max_retries):
try:
if attempt > 0:
self._log(
f"访问 authorize URL... (尝试 {attempt + 1}/{max_retries})"
)
time.sleep(1) # 重试前等待
else:
self._log("访问 authorize URL...")
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=f"{self.BASE}/",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
final_url = str(r.url)
self._log(f"重定向到: {final_url}")
return final_url
except Exception as e:
error_msg = str(e)
is_tls_error = (
"TLS" in error_msg
or "SSL" in error_msg
or "curl: (35)" in error_msg
)
if is_tls_error and attempt < max_retries - 1:
self._log(
f"Authorize TLS 错误 (尝试 {attempt + 1}/{max_retries}): {error_msg[:100]}"
)
continue
else:
self._log(f"Authorize 失败: {e}")
return ""
return ""
def callback(self, callback_url=None, referer=None):
"""完成注册回调"""
self._log("执行回调...")
url = callback_url or f"{self.AUTH}/api/accounts/authorize/callback"
ok, _ = self._follow_flow_state(
self._state_from_url(url),
referer=referer or f"{self.AUTH}/about-you",
)
return ok
def register_user(self, email, password):
"""
注册用户(邮箱 + 密码)
Returns:
tuple: (success, message)
"""
self._enter_stage("authorize_continue", f"register_user email={email}")
self._log(f"注册用户: {email}")
url = f"{self.AUTH}/api/accounts/user/register"
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/create-account/password",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
)
headers.update(generate_datadog_trace())
headers["oai-device-id"] = self.device_id
sentinel_token = self._get_sentinel_token(
"username_password_create",
page_url=f"{self.AUTH}/create-account/password",
)
if sentinel_token:
headers["openai-sentinel-token"] = sentinel_token
payload = {
"username": email,
"password": password,
}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
data = r.json()
self._log("注册成功")
self._log(f"authorize_continue/register_user 响应 URL: {str(r.url)[:120]}")
return True, "注册成功"
else:
try:
error_data = r.json()
error_msg = error_data.get("error", {}).get("message", r.text[:200])
except:
error_msg = r.text[:200]
self._log(f"注册失败: {r.status_code} - {error_msg}")
return False, f"HTTP {r.status_code}: {error_msg}"
except Exception as e:
self._log(f"注册异常: {e}")
return False, str(e)
def send_email_otp(self, referer=None):
"""触发发送邮箱验证码"""
self._enter_stage("otp", "send email otp")
self._log("触发发送验证码...")
url = f"{self.AUTH}/api/accounts/email-otp/send"
try:
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="application/json, text/plain, */*",
referer=referer or f"{self.AUTH}/create-account/password",
fetch_site="same-origin",
),
allow_redirects=True,
timeout=30,
)
self._log(f"验证码发送状态: {r.status_code}")
if r.status_code != 200:
self._log(f"验证码发送失败响应: {r.text[:180]}")
return False
try:
payload = r.json()
except Exception:
payload = {}
if isinstance(payload, dict) and payload:
next_state = self._state_from_payload(payload, current_url=str(r.url) or url)
self._log(f"验证码发送响应: {describe_flow_state(next_state)}")
self._log(f"otp/send 当前 URL: {str(r.url)[:120]}")
else:
self._log("验证码发送响应: 非 JSON按已触发处理")
return True
except Exception as e:
self._log(f"发送验证码失败: {e}")
return False
def verify_email_otp(self, otp_code, return_state=False):
"""
验证邮箱 OTP 码
Args:
otp_code: 6位验证码
Returns:
tuple: (success, message)
"""
self._enter_stage("otp", f"verify email otp code={otp_code}")
self._log(f"验证 OTP 码: {otp_code}")
url = f"{self.AUTH}/api/accounts/email-otp/validate"
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/email-verification",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
)
headers.update(generate_datadog_trace())
payload = {"code": otp_code}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
try:
data = r.json()
except Exception:
data = {}
next_state = self._state_from_payload(
data, current_url=str(r.url) or f"{self.AUTH}/about-you"
)
self._log(f"验证成功 {describe_flow_state(next_state)}")
self._log(f"otp/validate 当前 URL: {str(r.url)[:120]}")
return (True, next_state) if return_state else (True, "验证成功")
else:
error_msg = r.text[:200]
self._log(f"验证失败: {r.status_code} - {error_msg}")
return False, f"HTTP {r.status_code}"
except Exception as e:
self._log(f"验证异常: {e}")
return False, str(e)
def create_account(self, first_name, last_name, birthdate, return_state=False):
"""
完成账号创建(提交姓名和生日)
Args:
first_name: 名
last_name: 姓
birthdate: 生日 (YYYY-MM-DD)
Returns:
tuple: (success, message)
"""
self._enter_stage("about_you", "register create_account")
name = f"{first_name} {last_name}"
self._log(f"完成账号创建: {name}")
url = f"{self.AUTH}/api/accounts/create_account"
sentinel_token = self._get_sentinel_token(
"oauth_create_account",
page_url=f"{self.AUTH}/about-you",
)
if sentinel_token:
self._log("create_account: 已生成 sentinel token")
else:
self._log("create_account: 未生成 sentinel token降级继续请求")
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/about-you",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
extra_headers={
"oai-device-id": self.device_id,
},
)
if sentinel_token:
headers["openai-sentinel-token"] = sentinel_token
headers.update(generate_datadog_trace())
payload = {
"name": name,
"birthdate": birthdate,
}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
try:
data = r.json()
except Exception:
data = {}
next_state = self._state_from_payload(
data, current_url=str(r.url) or self.BASE
)
self._log(f"账号创建成功 {describe_flow_state(next_state)}")
self._log(f"about_you/create_account 当前 URL: {str(r.url)[:120]}")
return (True, next_state) if return_state else (True, "账号创建成功")
else:
error_code = ""
error_msg = r.text[:200]
try:
error_data = r.json() or {}
error_info = error_data.get("error") or {}
error_code = str(error_info.get("code") or "").strip()
error_msg = str(error_info.get("message") or error_msg).strip()
except Exception:
pass
detail = f"HTTP {r.status_code}"
if error_code:
detail += f": {error_code}"
elif error_msg:
detail += f": {error_msg}"
self._log(f"创建失败: {detail} - {error_msg[:200]}")
return False, detail
except Exception as e:
self._log(f"创建异常: {e}")
return False, str(e)
def register_complete_flow(
self,
email,
password,
first_name,
last_name,
birthdate,
skymail_client,
stop_before_about_you_submission=False,
otp_wait_timeout=600,
otp_resend_wait_timeout=300,
):
"""
完整的注册流程(基于原版 run_register 方法)
Args:
email: 邮箱
password: 密码
first_name: 名
last_name: 姓
birthdate: 生日
skymail_client: Skymail 客户端(用于获取验证码)
Returns:
tuple: (success, message)
"""
from urllib.parse import urlparse
self._log(
"注册状态机参数: "
f"stop_before_about_you_submission={'on' if stop_before_about_you_submission else 'off'}, "
f"otp_wait_timeout={otp_wait_timeout}s, otp_resend_wait_timeout={otp_resend_wait_timeout}s"
)
try:
otp_wait_timeout = max(30, int(otp_wait_timeout or 600))
except Exception:
otp_wait_timeout = 600
try:
otp_resend_wait_timeout = max(30, int(otp_resend_wait_timeout or 300))
except Exception:
otp_resend_wait_timeout = 300
max_auth_attempts = 3
final_url = ""
final_path = ""
for auth_attempt in range(max_auth_attempts):
if auth_attempt > 0:
self._log(f"预授权阶段重试 {auth_attempt + 1}/{max_auth_attempts}...")
self._reset_session()
# 1. 访问首页
if not self.visit_homepage():
if auth_attempt < max_auth_attempts - 1:
continue
return False, "访问首页失败"
# 2. 获取 CSRF token
csrf_token = self.get_csrf_token()
if not csrf_token:
if auth_attempt < max_auth_attempts - 1:
continue
return False, "获取 CSRF token 失败"
# 3. 提交邮箱,获取 authorize URL
auth_url = self.signin(email, csrf_token)
if not auth_url:
if auth_attempt < max_auth_attempts - 1:
continue
return False, "提交邮箱失败"
# 4. 访问 authorize URL关键步骤
final_url = self.authorize(auth_url)
if not final_url:
if auth_attempt < max_auth_attempts - 1:
continue
return False, "Authorize 失败"
final_path = urlparse(final_url).path
self._log(f"Authorize → {final_path}")
# /api/accounts/authorize 实际上常对应 Cloudflare 403 中间页,不要继续走 authorize_continue。
if "api/accounts/authorize" in final_path or final_path == "/error":
self._log(
f"检测到 Cloudflare/SPA 中间页,准备重试预授权: {final_url[:160]}..."
)
if auth_attempt < max_auth_attempts - 1:
continue
return False, f"预授权被拦截: {final_path}"
break
state = self._state_from_url(final_url)
self._log(f"注册状态起点: {describe_flow_state(state)}")
register_submitted = False
otp_verified = False
account_created = False
seen_states = {}
otp_send_attempts = 0
for _ in range(12):
signature = self._state_signature(state)
seen_states[signature] = seen_states.get(signature, 0) + 1
self._log(
f"注册状态推进: step={sum(seen_states.values())} "
f"state={describe_flow_state(state)} seen={seen_states[signature]}"
)
if seen_states[signature] > 2:
return False, f"注册状态卡住: {describe_flow_state(state)}"
if self._is_registration_complete_state(state):
self.last_registration_state = state
self._log("[OK] 注册流程完成")
return True, "注册成功"
if self._state_is_password_registration(state):
self._enter_stage("authorize_continue", describe_flow_state(state))
self._log("全新注册流程")
if register_submitted:
return False, "注册密码阶段重复进入"
success, msg = self.register_user(email, password)
if not success:
return False, f"注册失败: {msg}"
register_submitted = True
otp_send_attempts += 1
self._log(f"发送注册验证码: attempt={otp_send_attempts}")
if not self.send_email_otp(
referer=state.current_url or state.continue_url or f"{self.AUTH}/create-account/password"
):
self._log("发送验证码接口返回失败,继续等待邮箱中的验证码...")
else:
self._log("发送注册验证码成功,进入收码阶段")
state = self._state_from_url(f"{self.AUTH}/email-verification")
continue
if self._state_is_email_otp(state):
self._enter_stage("otp", describe_flow_state(state))
self._log("等待邮箱验证码...")
otp_code = skymail_client.wait_for_verification_code(
email, timeout=otp_wait_timeout
)
if not otp_code:
self._log(
"首次等待未收到验证码,尝试重发一次 email-otp/send "
f"后再等待 {otp_resend_wait_timeout}s"
)
otp_send_attempts += 1
resend_ok = self.send_email_otp(
referer=state.current_url or state.continue_url or f"{self.AUTH}/email-verification"
)
if resend_ok:
self._log(f"重发验证码成功: attempt={otp_send_attempts}")
else:
self._log(f"重发验证码失败: attempt={otp_send_attempts}")
otp_code = skymail_client.wait_for_verification_code(
email, timeout=otp_resend_wait_timeout
)
if not otp_code:
return False, "未收到验证码"
success, next_state = self.verify_email_otp(otp_code, return_state=True)
if not success:
return False, f"验证码失败: {next_state}"
otp_verified = True
state = next_state
self.last_registration_state = state
continue
if self._state_is_about_you(state):
self._enter_stage("about_you", describe_flow_state(state))
if stop_before_about_you_submission:
self.last_registration_state = state
self._log(
"注册链路已到 about_you按 interrupt 流程停止。"
"下一步交由 OAuth 新会话提交姓名+生日。"
)
return True, "pending_about_you_submission"
if account_created:
return False, "填写信息阶段重复进入"
success, next_state = self.create_account(
first_name,
last_name,
birthdate,
return_state=True,
)
if not success:
return False, f"创建账号失败: {next_state}"
account_created = True
state = next_state
self.last_registration_state = state
continue
if self._state_requires_navigation(state):
if "workspace" in f"{state.continue_url} {state.current_url}".lower() or "consent" in f"{state.continue_url} {state.current_url}".lower():
self._enter_stage("workspace_select", describe_flow_state(state))
elif state.page_type == "external_url":
self._enter_stage("token_exchange", describe_flow_state(state))
success, next_state = self._follow_flow_state(
state,
referer=state.current_url or f"{self.AUTH}/about-you",
)
if not success:
return False, f"跳转失败: {next_state}"
state = next_state
self.last_registration_state = state
continue
if (
(not register_submitted)
and (not otp_verified)
and (not account_created)
):
self._log(
f"未知起始状态,回退为全新注册流程: {describe_flow_state(state)}"
)
state = self._state_from_url(f"{self.AUTH}/create-account/password")
continue
return False, f"未支持的注册状态: {describe_flow_state(state)}"
return False, "注册状态机超出最大步数"