diff --git a/platforms/chatgpt/constants.py b/platforms/chatgpt/constants.py index a331a63..9f78e09 100644 --- a/platforms/chatgpt/constants.py +++ b/platforms/chatgpt/constants.py @@ -62,6 +62,7 @@ OPENAI_API_ENDPOINTS = { "signup": "https://auth.openai.com/api/accounts/authorize/continue", "register": "https://auth.openai.com/api/accounts/user/register", "password_verify": "https://auth.openai.com/api/accounts/password/verify", + "passwordless_send_otp": "https://auth.openai.com/api/accounts/passwordless/send-otp", "send_otp": "https://auth.openai.com/api/accounts/email-otp/send", "validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate", "create_account": "https://auth.openai.com/api/accounts/create_account", diff --git a/platforms/chatgpt/refresh_token_registration_engine.py b/platforms/chatgpt/refresh_token_registration_engine.py index b832296..c03ca1e 100644 --- a/platforms/chatgpt/refresh_token_registration_engine.py +++ b/platforms/chatgpt/refresh_token_registration_engine.py @@ -6,6 +6,8 @@ import base64 import json import logging +import random +import re import secrets import time import urllib.parse @@ -144,6 +146,7 @@ class RefreshTokenRegistrationEngine: self._otp_sent_at: Optional[float] = None # OTP 发送时间戳 self._device_id: Optional[str] = None # 当前注册流程复用的 Device ID self._used_verification_codes = set() # 已取过的验证码,避免二次登录时捞到旧码 + self._last_verification_code: Optional[str] = None # 最近一次成功校验的验证码 self._is_existing_account: bool = False # 是否为已注册账号(用于自动登录) self._token_acquisition_requires_login: bool = False # 新注册账号需要二次登录拿 token self._post_otp_continue_url: str = "" @@ -339,10 +342,24 @@ class RefreshTokenRegistrationEngine: if not self.session: self.session = self.http_client.session if flow in {"username_password_create", "oauth_create_account"}: + headless = self.browser_mode != "headed" + if self.browser_mode == "headed": + try: + import os + if os.name != "nt": + has_display = bool( + os.environ.get("DISPLAY") + or os.environ.get("WAYLAND_DISPLAY") + ) + if not has_display: + headless = True + self._log("未检测到显示环境,强制使用 headless 模式", "warning") + except Exception: + pass browser_token = get_sentinel_token_via_browser( flow=flow, proxy=self.proxy_url, - headless=self.browser_mode != "headed", + headless=headless, device_id=did, log_fn=lambda msg: self._log(msg), ) @@ -384,6 +401,21 @@ class RefreshTokenRegistrationEngine: SignupFormResult: 提交结果,包含账号状态判断 """ try: + self._log(f"{log_label}: 先访问页面获取 Cloudflare Cookie...") + try: + page_url = referer + nav_headers = self._build_navigation_headers(referer=page_url) + page_resp = self.session.get( + page_url, + headers=nav_headers, + allow_redirects=True, + timeout=15, + ) + self._log(f"{log_label}: 页面访问状态: {page_resp.status_code}") + time.sleep(random.uniform(1.0, 2.5)) + except Exception as page_err: + self._log(f"{log_label}: 页面访问异常(继续尝试): {page_err}", "warning") + request_body = json.dumps({ "username": { "value": self.email, @@ -402,6 +434,8 @@ class RefreshTokenRegistrationEngine: if sen_token: headers["openai-sentinel-token"] = sen_token + time.sleep(random.uniform(0.8, 2.0)) + response = self.session.post( OPENAI_API_ENDPOINTS["signup"], headers=headers, @@ -522,6 +556,40 @@ class RefreshTokenRegistrationEngine: self._log(f"提交登录密码失败: {e}", "error") return SignupFormResult(success=False, error_message=str(e)) + def _send_passwordless_login_otp(self) -> bool: + """触发登录验证码(passwordless)。""" + try: + headers = self._build_json_headers( + referer="https://auth.openai.com/log-in/password", + include_device_id=True, + include_datadog=True, + ) + response = self.session.post( + OPENAI_API_ENDPOINTS["passwordless_send_otp"], + headers=headers, + data=json.dumps({"email": self.email or ""}), + ) + self._log(f"触发登录验证码状态: {response.status_code}") + if response.status_code == 200: + self._otp_sent_at = time.time() + return True + + fallback_resp = self.session.get( + OPENAI_API_ENDPOINTS["send_otp"], + headers=self._build_navigation_headers( + referer="https://auth.openai.com/log-in/password" + ), + ) + self._log(f"触发登录验证码(备用)状态: {fallback_resp.status_code}") + if fallback_resp.status_code == 200: + self._otp_sent_at = time.time() + return True + return False + + except Exception as e: + self._log(f"触发登录验证码失败: {e}", "warning") + return False + def _reset_auth_flow(self) -> None: """重置会话,准备重新发起 OAuth 流程。""" self.http_client.close() @@ -558,15 +626,26 @@ class RefreshTokenRegistrationEngine: def _complete_token_exchange(self, result: RegistrationResult) -> bool: """在登录态已建立后,继续完成 workspace 和 OAuth token 获取。""" self._log("等待登录验证码...") - code = self._get_verification_code() - if not code: - result.error_message = "获取验证码失败" - return False + last_code = str(self._last_verification_code or "").strip() + code = "" + if (self._is_existing_account or self._token_acquisition_requires_login) and last_code: + self._log(f"先尝试上一枚验证码: {last_code}") + if self._validate_verification_code(last_code): + self._log("上一枚验证码校验通过") + code = last_code + else: + self._log("上一枚验证码无效,改为获取新验证码", "warning") - self._log("校验登录验证码...") - if not self._validate_verification_code(code): - result.error_message = "验证码校验失败" - return False + if not code: + code = self._get_verification_code() + if not code: + result.error_message = "获取验证码失败" + return False + + self._log("校验登录验证码...") + if not self._validate_verification_code(code): + result.error_message = "验证码校验失败" + return False self._log("解析 OTP 后的 OAuth 跳转状态...") continue_url = self._resolve_post_otp_continue_url() @@ -623,6 +702,34 @@ class RefreshTokenRegistrationEngine: return False, f"重新登录未进入验证码页面: {password_result.page_type or 'unknown'}" return True, "" + def _restart_passwordless_login_flow(self) -> Tuple[bool, str]: + """已存在账号走验证码登录流程获取 Token。""" + self._token_acquisition_requires_login = True + self._is_existing_account = True + self._log("账号已存在,开始验证码登录以获取 Token...") + self._reset_auth_flow() + + did, sen_token = self._prepare_authorize_flow("重新登录(验证码)") + if not did: + return False, "重新登录时获取 Device ID 失败" + if not sen_token: + return False, "重新登录时 Sentinel POW 验证失败" + + login_start_result = self._submit_login_start(did, sen_token) + if not login_start_result.success: + return False, f"重新登录提交邮箱失败: {login_start_result.error_message}" + + if login_start_result.page_type == OPENAI_PAGE_TYPES["LOGIN_PASSWORD"]: + self._log("登录页为密码页,改用验证码登录", "warning") + if not self._send_passwordless_login_otp(): + return False, "触发登录验证码失败" + return True, "" + + if login_start_result.page_type != OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"]: + return False, f"重新登录未进入验证码页面: {login_start_result.page_type or 'unknown'}" + + return True, "" + def _register_password(self) -> Tuple[bool, Optional[str]]: """注册密码""" try: @@ -631,6 +738,21 @@ class RefreshTokenRegistrationEngine: self.password = password # 保存密码到实例变量 self._log(f"生成密码: {password}") + self._log("提交密码前:先访问页面获取 Cloudflare Cookie...") + try: + page_url = "https://auth.openai.com/create-account/password" + nav_headers = self._build_navigation_headers(referer=page_url) + page_resp = self.session.get( + page_url, + headers=nav_headers, + allow_redirects=True, + timeout=15, + ) + self._log(f"提交密码前:页面访问状态: {page_resp.status_code}") + time.sleep(random.uniform(1.5, 3.0)) + except Exception as page_err: + self._log(f"提交密码前:页面访问异常(继续尝试): {page_err}", "warning") + # 提交密码注册 register_body = json.dumps({ "password": password, @@ -649,6 +771,8 @@ class RefreshTokenRegistrationEngine: if sen_token: headers["openai-sentinel-token"] = sen_token + time.sleep(random.uniform(1.0, 2.5)) + response = self.session.post( OPENAI_API_ENDPOINTS["register"], headers=headers, @@ -786,7 +910,7 @@ class RefreshTokenRegistrationEngine: code = self.email_service.get_verification_code( email=self.email, email_id=email_id, - timeout=30, + timeout=90, pattern=OTP_CODE_PATTERN, otp_sent_at=self._otp_sent_at, exclude_codes=exclude_codes, @@ -809,6 +933,22 @@ class RefreshTokenRegistrationEngine: def _validate_verification_code(self, code: str) -> bool: """验证验证码""" try: + self._log("验证前:访问邮箱验证码页面以模拟用户行为...") + try: + email_verification_url = "https://auth.openai.com/email-verification" + nav_headers = self._build_navigation_headers(referer=email_verification_url) + page_resp = self.session.get( + email_verification_url, + headers=nav_headers, + allow_redirects=True, + timeout=15, + ) + self._log(f"验证前:页面访问状态: {page_resp.status_code}") + time.sleep(random.uniform(2.0, 4.5)) + except Exception as page_err: + self._log(f"验证前:页面访问异常(继续): {page_err}", "warning") + + time.sleep(random.uniform(0.8, 2.0)) code_body = f'{{"code":"{code}"}}' headers = self._build_json_headers( referer="https://auth.openai.com/email-verification", @@ -845,13 +985,17 @@ class RefreshTokenRegistrationEngine: self._log(f"验证码校验后 continue_url: {self._post_otp_continue_url}") if self._post_otp_page_type: self._log(f"验证码校验后页面类型: {self._post_otp_page_type}") + code_value = str(code or "").strip() + if code_value: + self._last_verification_code = code_value + self._used_verification_codes.add(code_value) return True except Exception as e: self._log(f"验证验证码失败: {e}", "error") return False - def _create_user_account(self) -> bool: + def _create_user_account(self) -> Tuple[bool, bool]: """创建用户账户""" try: user_info = generate_random_user_info() @@ -879,11 +1023,30 @@ class RefreshTokenRegistrationEngine: self._log(f"账户创建状态: {response.status_code}") if response.status_code == 200: - return True + return True, False body_preview = response.text[:200] self._log(f"账户创建失败: {body_preview}", "warning") + error_message = "" + error_code = "" + try: + error_json = response.json() or {} + error_message = str((error_json.get("error") or {}).get("message") or "") + error_code = str((error_json.get("error") or {}).get("code") or "") + except Exception: + error_message = body_preview + + if ( + error_code == "user_already_exists" + or "already exists" in error_message.lower() + or "please login" in error_message.lower() + ): + self._log("检测到账号已存在,切换到验证码登录流程", "warning") + self._is_existing_account = True + self.password = "" + return False, True + body_lower = body_preview.lower() should_retry = response.status_code in (400, 403) and any( marker in body_lower @@ -912,14 +1075,33 @@ class RefreshTokenRegistrationEngine: ) self._log(f"账户创建重试状态: {retry_resp.status_code}") if retry_resp.status_code == 200: - return True + return True, False self._log(f"账户创建重试失败: {retry_resp.text[:200]}", "warning") - return False + retry_message = "" + retry_code = "" + try: + retry_json = retry_resp.json() or {} + retry_message = str((retry_json.get("error") or {}).get("message") or "") + retry_code = str((retry_json.get("error") or {}).get("code") or "") + except Exception: + retry_message = retry_resp.text[:200] + + if ( + retry_code == "user_already_exists" + or "already exists" in retry_message.lower() + or "please login" in retry_message.lower() + ): + self._log("重试后检测到账号已存在,切换到验证码登录流程", "warning") + self._is_existing_account = True + self.password = "" + return False, True + + return False, False except Exception as e: self._log(f"创建账户失败: {e}", "error") - return False + return False, False @staticmethod def _decode_cookie_json_value(raw_value: str) -> Optional[Dict[str, Any]]: @@ -956,6 +1138,143 @@ class RefreshTokenRegistrationEngine: return None return self._decode_cookie_json_value(auth_cookie) + def _fetch_consent_page_html(self, consent_url: str) -> str: + try: + headers = self._build_navigation_headers( + referer="https://auth.openai.com/email-verification" + ) + response = self.session.get( + consent_url, + headers=headers, + allow_redirects=False, + timeout=30, + ) + content_type = str(response.headers.get("content-type") or "").lower() + if response.status_code == 200 and "text/html" in content_type: + return response.text + self._log( + f"Workspace 解析: consent HTML 未命中 (status={response.status_code}, type={content_type})", + "warning", + ) + except Exception: + pass + return "" + + def _extract_session_data_from_consent_html(self, html: str) -> Optional[Dict[str, Any]]: + if not html or "workspaces" not in html: + return None + + def _first_match(patterns, text): + for pattern in patterns: + match = re.search(pattern, text, re.S) + if match: + return match.group(1) + return "" + + def _build_from_text(text): + if not text or "workspaces" not in text: + return None + + normalized = text.replace('\\"', '"') + + session_id = _first_match( + [ + r'"session_id","([^"]+)"', + r'"session_id":"([^"]+)"', + ], + normalized, + ) + client_id = _first_match( + [ + r'"openai_client_id","([^"]+)"', + r'"openai_client_id":"([^"]+)"', + ], + normalized, + ) + + start = normalized.find('"workspaces"') + if start < 0: + start = normalized.find("workspaces") + if start < 0: + return None + + end = normalized.find('"openai_client_id"', start) + if end < 0: + end = normalized.find("openai_client_id", start) + if end < 0: + end = min(len(normalized), start + 4000) + else: + end = min(len(normalized), end + 600) + + workspace_chunk = normalized[start:end] + ids = re.findall(r'"id"(?:,|:)"([0-9a-fA-F-]{36})"', workspace_chunk) + if not ids: + return None + + kinds = re.findall(r'"kind"(?:,|:)"([^"]+)"', workspace_chunk) + workspaces = [] + seen = set() + for idx, wid in enumerate(ids): + if wid in seen: + continue + seen.add(wid) + item = {"id": wid} + if idx < len(kinds): + item["kind"] = kinds[idx] + workspaces.append(item) + + if not workspaces: + return None + + return { + "session_id": session_id, + "openai_client_id": client_id, + "workspaces": workspaces, + } + + candidates = [html] + for quoted in re.findall( + r'streamController\.enqueue\(("(?:\\.|[^"\\])*")\)', + html, + re.S, + ): + try: + decoded = json.loads(quoted) + except Exception: + continue + if decoded: + candidates.append(decoded) + + if '\\"' in html: + candidates.append(html.replace('\\"', '"')) + + for candidate in candidates: + parsed = _build_from_text(candidate) + if parsed and parsed.get("workspaces"): + return parsed + + return None + + def _load_workspace_session_data(self, consent_url: str) -> Optional[Dict[str, Any]]: + self._log("Workspace 解析: 尝试从 Cookie/consent HTML 提取 session 数据") + session_data = self._decode_auth_session_cookie() + if session_data and session_data.get("workspaces"): + self._log("Workspace 解析: Cookie 命中 workspace 数据") + return session_data + + html = self._fetch_consent_page_html(consent_url) + if not html: + self._log("Workspace 解析: consent HTML 获取失败", "warning") + return session_data + + parsed = self._extract_session_data_from_consent_html(html) + if parsed and parsed.get("workspaces"): + self._log(f"从 consent HTML 提取到 {len(parsed.get('workspaces', []))} 个 workspace") + return parsed + + self._log("Workspace 解析: consent HTML 未提取到 workspace", "warning") + return session_data + def _extract_callback_url_from_candidate(self, candidate: str) -> str: normalized = normalize_flow_url(str(candidate or "").strip(), auth_base="https://auth.openai.com") if not normalized: @@ -1097,8 +1416,8 @@ class RefreshTokenRegistrationEngine: ) page_type = str(self._post_otp_page_type or "").strip().lower() - if continue_url and "about-you" in continue_url: - self._log("OTP 后进入 about-you,按参考 RT 逻辑补齐 consent 跳转...") + if (continue_url and "about-you" in continue_url) or page_type in {"about_you", "about-you"}: + self._log("OTP 后进入 about-you,访问页面补齐 Cookie/consent 跳转...") try: response = self.session.get( "https://auth.openai.com/about-you", @@ -1228,7 +1547,15 @@ class RefreshTokenRegistrationEngine: except Exception as e: self._log(f"加载 consent 页面异常: {e}", "warning") - workspace_id = self._get_workspace_id() or "" + self._log("Workspace 解析: 准备从 consent HTML 抽取 workspace 数据") + session_data = self._load_workspace_session_data(consent_url) + if session_data and session_data.get("workspaces"): + workspace_id = str((session_data.get("workspaces") or [{}])[0].get("id") or "").strip() + if workspace_id: + self._log(f"Workspace ID(HTML): {workspace_id}") + + if not workspace_id: + workspace_id = self._get_workspace_id() or "" if workspace_id: try: ws_response = self.session.post( @@ -1405,6 +1732,7 @@ class RefreshTokenRegistrationEngine: self._post_otp_continue_url = "" self._post_otp_page_type = "" self._used_verification_codes.clear() + self._last_verification_code = None self._log("=" * 60) self._log("注册流程启动") @@ -1470,14 +1798,22 @@ class RefreshTokenRegistrationEngine: return result self._log("9. 创建用户账户...") - if not self._create_user_account(): - result.error_message = "创建用户账户失败" - return result + account_ok, account_exists = self._create_user_account() + if account_exists: + self._log("账号已存在,切换到验证码登录流程", "warning") + login_ready, login_error = self._restart_passwordless_login_flow() + if not login_ready: + result.error_message = login_error + return result + else: + if not account_ok: + result.error_message = "创建用户账户失败" + return result - login_ready, login_error = self._restart_login_flow() - if not login_ready: - result.error_message = login_error - return result + login_ready, login_error = self._restart_login_flow() + if not login_ready: + result.error_message = login_error + return result if not self._complete_token_exchange(result): return result diff --git a/tests/test_chatgpt_register.py b/tests/test_chatgpt_register.py index 3bdd39b..7cd2631 100644 --- a/tests/test_chatgpt_register.py +++ b/tests/test_chatgpt_register.py @@ -167,7 +167,7 @@ class RegistrationEngineFlowTests(unittest.TestCase): mock.patch.object(engine, "_send_verification_code", return_value=True) as send_otp, \ mock.patch.object(engine, "_get_verification_code", return_value="123456") as get_otp, \ mock.patch.object(engine, "_validate_verification_code", return_value=True) as validate_otp, \ - mock.patch.object(engine, "_create_user_account", return_value=True) as create_account, \ + mock.patch.object(engine, "_create_user_account", return_value=(True, False)) as create_account, \ mock.patch.object(engine, "_restart_login_flow", side_effect=fake_restart_login_flow) as restart_login, \ mock.patch.object(engine, "_complete_token_exchange", side_effect=fake_complete_token_exchange) as complete_exchange: result = engine.run()