fix(chatgpt): improve workspace recovery and login fallback

This commit is contained in:
cong
2026-04-04 00:08:04 +08:00
parent 6441c93d88
commit 53b0a04279
3 changed files with 363 additions and 26 deletions

View File

@@ -62,6 +62,7 @@ OPENAI_API_ENDPOINTS = {
"signup": "https://auth.openai.com/api/accounts/authorize/continue",
"register": "https://auth.openai.com/api/accounts/user/register",
"password_verify": "https://auth.openai.com/api/accounts/password/verify",
"passwordless_send_otp": "https://auth.openai.com/api/accounts/passwordless/send-otp",
"send_otp": "https://auth.openai.com/api/accounts/email-otp/send",
"validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate",
"create_account": "https://auth.openai.com/api/accounts/create_account",

View File

@@ -6,6 +6,8 @@
import base64
import json
import logging
import random
import re
import secrets
import time
import urllib.parse
@@ -144,6 +146,7 @@ class RefreshTokenRegistrationEngine:
self._otp_sent_at: Optional[float] = None # OTP 发送时间戳
self._device_id: Optional[str] = None # 当前注册流程复用的 Device ID
self._used_verification_codes = set() # 已取过的验证码,避免二次登录时捞到旧码
self._last_verification_code: Optional[str] = None # 最近一次成功校验的验证码
self._is_existing_account: bool = False # 是否为已注册账号(用于自动登录)
self._token_acquisition_requires_login: bool = False # 新注册账号需要二次登录拿 token
self._post_otp_continue_url: str = ""
@@ -339,10 +342,24 @@ class RefreshTokenRegistrationEngine:
if not self.session:
self.session = self.http_client.session
if flow in {"username_password_create", "oauth_create_account"}:
headless = self.browser_mode != "headed"
if self.browser_mode == "headed":
try:
import os
if os.name != "nt":
has_display = bool(
os.environ.get("DISPLAY")
or os.environ.get("WAYLAND_DISPLAY")
)
if not has_display:
headless = True
self._log("未检测到显示环境,强制使用 headless 模式", "warning")
except Exception:
pass
browser_token = get_sentinel_token_via_browser(
flow=flow,
proxy=self.proxy_url,
headless=self.browser_mode != "headed",
headless=headless,
device_id=did,
log_fn=lambda msg: self._log(msg),
)
@@ -384,6 +401,21 @@ class RefreshTokenRegistrationEngine:
SignupFormResult: 提交结果,包含账号状态判断
"""
try:
self._log(f"{log_label}: 先访问页面获取 Cloudflare Cookie...")
try:
page_url = referer
nav_headers = self._build_navigation_headers(referer=page_url)
page_resp = self.session.get(
page_url,
headers=nav_headers,
allow_redirects=True,
timeout=15,
)
self._log(f"{log_label}: 页面访问状态: {page_resp.status_code}")
time.sleep(random.uniform(1.0, 2.5))
except Exception as page_err:
self._log(f"{log_label}: 页面访问异常(继续尝试): {page_err}", "warning")
request_body = json.dumps({
"username": {
"value": self.email,
@@ -402,6 +434,8 @@ class RefreshTokenRegistrationEngine:
if sen_token:
headers["openai-sentinel-token"] = sen_token
time.sleep(random.uniform(0.8, 2.0))
response = self.session.post(
OPENAI_API_ENDPOINTS["signup"],
headers=headers,
@@ -522,6 +556,40 @@ class RefreshTokenRegistrationEngine:
self._log(f"提交登录密码失败: {e}", "error")
return SignupFormResult(success=False, error_message=str(e))
def _send_passwordless_login_otp(self) -> bool:
"""触发登录验证码passwordless"""
try:
headers = self._build_json_headers(
referer="https://auth.openai.com/log-in/password",
include_device_id=True,
include_datadog=True,
)
response = self.session.post(
OPENAI_API_ENDPOINTS["passwordless_send_otp"],
headers=headers,
data=json.dumps({"email": self.email or ""}),
)
self._log(f"触发登录验证码状态: {response.status_code}")
if response.status_code == 200:
self._otp_sent_at = time.time()
return True
fallback_resp = self.session.get(
OPENAI_API_ENDPOINTS["send_otp"],
headers=self._build_navigation_headers(
referer="https://auth.openai.com/log-in/password"
),
)
self._log(f"触发登录验证码(备用)状态: {fallback_resp.status_code}")
if fallback_resp.status_code == 200:
self._otp_sent_at = time.time()
return True
return False
except Exception as e:
self._log(f"触发登录验证码失败: {e}", "warning")
return False
def _reset_auth_flow(self) -> None:
"""重置会话,准备重新发起 OAuth 流程。"""
self.http_client.close()
@@ -558,15 +626,26 @@ class RefreshTokenRegistrationEngine:
def _complete_token_exchange(self, result: RegistrationResult) -> bool:
"""在登录态已建立后,继续完成 workspace 和 OAuth token 获取。"""
self._log("等待登录验证码...")
code = self._get_verification_code()
if not code:
result.error_message = "获取验证码失败"
return False
last_code = str(self._last_verification_code or "").strip()
code = ""
if (self._is_existing_account or self._token_acquisition_requires_login) and last_code:
self._log(f"先尝试上一枚验证码: {last_code}")
if self._validate_verification_code(last_code):
self._log("上一枚验证码校验通过")
code = last_code
else:
self._log("上一枚验证码无效,改为获取新验证码", "warning")
self._log("校验登录验证码...")
if not self._validate_verification_code(code):
result.error_message = "验证码校验失败"
return False
if not code:
code = self._get_verification_code()
if not code:
result.error_message = "获取验证码失败"
return False
self._log("校验登录验证码...")
if not self._validate_verification_code(code):
result.error_message = "验证码校验失败"
return False
self._log("解析 OTP 后的 OAuth 跳转状态...")
continue_url = self._resolve_post_otp_continue_url()
@@ -623,6 +702,34 @@ class RefreshTokenRegistrationEngine:
return False, f"重新登录未进入验证码页面: {password_result.page_type or 'unknown'}"
return True, ""
def _restart_passwordless_login_flow(self) -> Tuple[bool, str]:
"""已存在账号走验证码登录流程获取 Token。"""
self._token_acquisition_requires_login = True
self._is_existing_account = True
self._log("账号已存在,开始验证码登录以获取 Token...")
self._reset_auth_flow()
did, sen_token = self._prepare_authorize_flow("重新登录(验证码)")
if not did:
return False, "重新登录时获取 Device ID 失败"
if not sen_token:
return False, "重新登录时 Sentinel POW 验证失败"
login_start_result = self._submit_login_start(did, sen_token)
if not login_start_result.success:
return False, f"重新登录提交邮箱失败: {login_start_result.error_message}"
if login_start_result.page_type == OPENAI_PAGE_TYPES["LOGIN_PASSWORD"]:
self._log("登录页为密码页,改用验证码登录", "warning")
if not self._send_passwordless_login_otp():
return False, "触发登录验证码失败"
return True, ""
if login_start_result.page_type != OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"]:
return False, f"重新登录未进入验证码页面: {login_start_result.page_type or 'unknown'}"
return True, ""
def _register_password(self) -> Tuple[bool, Optional[str]]:
"""注册密码"""
try:
@@ -631,6 +738,21 @@ class RefreshTokenRegistrationEngine:
self.password = password # 保存密码到实例变量
self._log(f"生成密码: {password}")
self._log("提交密码前:先访问页面获取 Cloudflare Cookie...")
try:
page_url = "https://auth.openai.com/create-account/password"
nav_headers = self._build_navigation_headers(referer=page_url)
page_resp = self.session.get(
page_url,
headers=nav_headers,
allow_redirects=True,
timeout=15,
)
self._log(f"提交密码前:页面访问状态: {page_resp.status_code}")
time.sleep(random.uniform(1.5, 3.0))
except Exception as page_err:
self._log(f"提交密码前:页面访问异常(继续尝试): {page_err}", "warning")
# 提交密码注册
register_body = json.dumps({
"password": password,
@@ -649,6 +771,8 @@ class RefreshTokenRegistrationEngine:
if sen_token:
headers["openai-sentinel-token"] = sen_token
time.sleep(random.uniform(1.0, 2.5))
response = self.session.post(
OPENAI_API_ENDPOINTS["register"],
headers=headers,
@@ -786,7 +910,7 @@ class RefreshTokenRegistrationEngine:
code = self.email_service.get_verification_code(
email=self.email,
email_id=email_id,
timeout=30,
timeout=90,
pattern=OTP_CODE_PATTERN,
otp_sent_at=self._otp_sent_at,
exclude_codes=exclude_codes,
@@ -809,6 +933,22 @@ class RefreshTokenRegistrationEngine:
def _validate_verification_code(self, code: str) -> bool:
"""验证验证码"""
try:
self._log("验证前:访问邮箱验证码页面以模拟用户行为...")
try:
email_verification_url = "https://auth.openai.com/email-verification"
nav_headers = self._build_navigation_headers(referer=email_verification_url)
page_resp = self.session.get(
email_verification_url,
headers=nav_headers,
allow_redirects=True,
timeout=15,
)
self._log(f"验证前:页面访问状态: {page_resp.status_code}")
time.sleep(random.uniform(2.0, 4.5))
except Exception as page_err:
self._log(f"验证前:页面访问异常(继续): {page_err}", "warning")
time.sleep(random.uniform(0.8, 2.0))
code_body = f'{{"code":"{code}"}}'
headers = self._build_json_headers(
referer="https://auth.openai.com/email-verification",
@@ -845,13 +985,17 @@ class RefreshTokenRegistrationEngine:
self._log(f"验证码校验后 continue_url: {self._post_otp_continue_url}")
if self._post_otp_page_type:
self._log(f"验证码校验后页面类型: {self._post_otp_page_type}")
code_value = str(code or "").strip()
if code_value:
self._last_verification_code = code_value
self._used_verification_codes.add(code_value)
return True
except Exception as e:
self._log(f"验证验证码失败: {e}", "error")
return False
def _create_user_account(self) -> bool:
def _create_user_account(self) -> Tuple[bool, bool]:
"""创建用户账户"""
try:
user_info = generate_random_user_info()
@@ -879,11 +1023,30 @@ class RefreshTokenRegistrationEngine:
self._log(f"账户创建状态: {response.status_code}")
if response.status_code == 200:
return True
return True, False
body_preview = response.text[:200]
self._log(f"账户创建失败: {body_preview}", "warning")
error_message = ""
error_code = ""
try:
error_json = response.json() or {}
error_message = str((error_json.get("error") or {}).get("message") or "")
error_code = str((error_json.get("error") or {}).get("code") or "")
except Exception:
error_message = body_preview
if (
error_code == "user_already_exists"
or "already exists" in error_message.lower()
or "please login" in error_message.lower()
):
self._log("检测到账号已存在,切换到验证码登录流程", "warning")
self._is_existing_account = True
self.password = ""
return False, True
body_lower = body_preview.lower()
should_retry = response.status_code in (400, 403) and any(
marker in body_lower
@@ -912,14 +1075,33 @@ class RefreshTokenRegistrationEngine:
)
self._log(f"账户创建重试状态: {retry_resp.status_code}")
if retry_resp.status_code == 200:
return True
return True, False
self._log(f"账户创建重试失败: {retry_resp.text[:200]}", "warning")
return False
retry_message = ""
retry_code = ""
try:
retry_json = retry_resp.json() or {}
retry_message = str((retry_json.get("error") or {}).get("message") or "")
retry_code = str((retry_json.get("error") or {}).get("code") or "")
except Exception:
retry_message = retry_resp.text[:200]
if (
retry_code == "user_already_exists"
or "already exists" in retry_message.lower()
or "please login" in retry_message.lower()
):
self._log("重试后检测到账号已存在,切换到验证码登录流程", "warning")
self._is_existing_account = True
self.password = ""
return False, True
return False, False
except Exception as e:
self._log(f"创建账户失败: {e}", "error")
return False
return False, False
@staticmethod
def _decode_cookie_json_value(raw_value: str) -> Optional[Dict[str, Any]]:
@@ -956,6 +1138,143 @@ class RefreshTokenRegistrationEngine:
return None
return self._decode_cookie_json_value(auth_cookie)
def _fetch_consent_page_html(self, consent_url: str) -> str:
try:
headers = self._build_navigation_headers(
referer="https://auth.openai.com/email-verification"
)
response = self.session.get(
consent_url,
headers=headers,
allow_redirects=False,
timeout=30,
)
content_type = str(response.headers.get("content-type") or "").lower()
if response.status_code == 200 and "text/html" in content_type:
return response.text
self._log(
f"Workspace 解析: consent HTML 未命中 (status={response.status_code}, type={content_type})",
"warning",
)
except Exception:
pass
return ""
def _extract_session_data_from_consent_html(self, html: str) -> Optional[Dict[str, Any]]:
if not html or "workspaces" not in html:
return None
def _first_match(patterns, text):
for pattern in patterns:
match = re.search(pattern, text, re.S)
if match:
return match.group(1)
return ""
def _build_from_text(text):
if not text or "workspaces" not in text:
return None
normalized = text.replace('\\"', '"')
session_id = _first_match(
[
r'"session_id","([^"]+)"',
r'"session_id":"([^"]+)"',
],
normalized,
)
client_id = _first_match(
[
r'"openai_client_id","([^"]+)"',
r'"openai_client_id":"([^"]+)"',
],
normalized,
)
start = normalized.find('"workspaces"')
if start < 0:
start = normalized.find("workspaces")
if start < 0:
return None
end = normalized.find('"openai_client_id"', start)
if end < 0:
end = normalized.find("openai_client_id", start)
if end < 0:
end = min(len(normalized), start + 4000)
else:
end = min(len(normalized), end + 600)
workspace_chunk = normalized[start:end]
ids = re.findall(r'"id"(?:,|:)"([0-9a-fA-F-]{36})"', workspace_chunk)
if not ids:
return None
kinds = re.findall(r'"kind"(?:,|:)"([^"]+)"', workspace_chunk)
workspaces = []
seen = set()
for idx, wid in enumerate(ids):
if wid in seen:
continue
seen.add(wid)
item = {"id": wid}
if idx < len(kinds):
item["kind"] = kinds[idx]
workspaces.append(item)
if not workspaces:
return None
return {
"session_id": session_id,
"openai_client_id": client_id,
"workspaces": workspaces,
}
candidates = [html]
for quoted in re.findall(
r'streamController\.enqueue\(("(?:\\.|[^"\\])*")\)',
html,
re.S,
):
try:
decoded = json.loads(quoted)
except Exception:
continue
if decoded:
candidates.append(decoded)
if '\\"' in html:
candidates.append(html.replace('\\"', '"'))
for candidate in candidates:
parsed = _build_from_text(candidate)
if parsed and parsed.get("workspaces"):
return parsed
return None
def _load_workspace_session_data(self, consent_url: str) -> Optional[Dict[str, Any]]:
self._log("Workspace 解析: 尝试从 Cookie/consent HTML 提取 session 数据")
session_data = self._decode_auth_session_cookie()
if session_data and session_data.get("workspaces"):
self._log("Workspace 解析: Cookie 命中 workspace 数据")
return session_data
html = self._fetch_consent_page_html(consent_url)
if not html:
self._log("Workspace 解析: consent HTML 获取失败", "warning")
return session_data
parsed = self._extract_session_data_from_consent_html(html)
if parsed and parsed.get("workspaces"):
self._log(f"从 consent HTML 提取到 {len(parsed.get('workspaces', []))} 个 workspace")
return parsed
self._log("Workspace 解析: consent HTML 未提取到 workspace", "warning")
return session_data
def _extract_callback_url_from_candidate(self, candidate: str) -> str:
normalized = normalize_flow_url(str(candidate or "").strip(), auth_base="https://auth.openai.com")
if not normalized:
@@ -1097,8 +1416,8 @@ class RefreshTokenRegistrationEngine:
)
page_type = str(self._post_otp_page_type or "").strip().lower()
if continue_url and "about-you" in continue_url:
self._log("OTP 后进入 about-you按参考 RT 逻辑补齐 consent 跳转...")
if (continue_url and "about-you" in continue_url) or page_type in {"about_you", "about-you"}:
self._log("OTP 后进入 about-you访问页面补齐 Cookie/consent 跳转...")
try:
response = self.session.get(
"https://auth.openai.com/about-you",
@@ -1228,7 +1547,15 @@ class RefreshTokenRegistrationEngine:
except Exception as e:
self._log(f"加载 consent 页面异常: {e}", "warning")
workspace_id = self._get_workspace_id() or ""
self._log("Workspace 解析: 准备从 consent HTML 抽取 workspace 数据")
session_data = self._load_workspace_session_data(consent_url)
if session_data and session_data.get("workspaces"):
workspace_id = str((session_data.get("workspaces") or [{}])[0].get("id") or "").strip()
if workspace_id:
self._log(f"Workspace ID(HTML): {workspace_id}")
if not workspace_id:
workspace_id = self._get_workspace_id() or ""
if workspace_id:
try:
ws_response = self.session.post(
@@ -1405,6 +1732,7 @@ class RefreshTokenRegistrationEngine:
self._post_otp_continue_url = ""
self._post_otp_page_type = ""
self._used_verification_codes.clear()
self._last_verification_code = None
self._log("=" * 60)
self._log("注册流程启动")
@@ -1470,14 +1798,22 @@ class RefreshTokenRegistrationEngine:
return result
self._log("9. 创建用户账户...")
if not self._create_user_account():
result.error_message = "创建用户账户失败"
return result
account_ok, account_exists = self._create_user_account()
if account_exists:
self._log("账号已存在,切换到验证码登录流程", "warning")
login_ready, login_error = self._restart_passwordless_login_flow()
if not login_ready:
result.error_message = login_error
return result
else:
if not account_ok:
result.error_message = "创建用户账户失败"
return result
login_ready, login_error = self._restart_login_flow()
if not login_ready:
result.error_message = login_error
return result
login_ready, login_error = self._restart_login_flow()
if not login_ready:
result.error_message = login_error
return result
if not self._complete_token_exchange(result):
return result

View File

@@ -167,7 +167,7 @@ class RegistrationEngineFlowTests(unittest.TestCase):
mock.patch.object(engine, "_send_verification_code", return_value=True) as send_otp, \
mock.patch.object(engine, "_get_verification_code", return_value="123456") as get_otp, \
mock.patch.object(engine, "_validate_verification_code", return_value=True) as validate_otp, \
mock.patch.object(engine, "_create_user_account", return_value=True) as create_account, \
mock.patch.object(engine, "_create_user_account", return_value=(True, False)) as create_account, \
mock.patch.object(engine, "_restart_login_flow", side_effect=fake_restart_login_flow) as restart_login, \
mock.patch.object(engine, "_complete_token_exchange", side_effect=fake_complete_token_exchange) as complete_exchange:
result = engine.run()