From f2fb6a9b84a0a958084d321b4fa6562e9929a562 Mon Sep 17 00:00:00 2001 From: zhangchen <1987834247@qq.com> Date: Tue, 31 Mar 2026 03:57:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0gpt=E6=B3=A8=E5=86=8Crt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- platforms/chatgpt/oauth_pkce_client.py | 467 ++++++++++++++++++++++++ platforms/chatgpt/register_v2.py | 474 ++++++++++++++----------- services/external_sync.py | 10 +- 3 files changed, 734 insertions(+), 217 deletions(-) create mode 100644 platforms/chatgpt/oauth_pkce_client.py diff --git a/platforms/chatgpt/oauth_pkce_client.py b/platforms/chatgpt/oauth_pkce_client.py new file mode 100644 index 0000000..2aafdd3 --- /dev/null +++ b/platforms/chatgpt/oauth_pkce_client.py @@ -0,0 +1,467 @@ +""" +OAuth PKCE 注册客户端 + +完整实现 auth.openai.com 注册状态机 + 登录获取 Token 的全生命周期。 +每个步骤封装为独立方法,调用方按编号依次调用即可完成整个注册流程。 +""" + +import json +import re +import time +import urllib.parse +from typing import Optional + +from curl_cffi import requests as curl_requests + +from .oauth import ( + OAuthStart, + _decode_jwt_segment, + generate_oauth_url, + submit_callback_url, +) + +AUTH_BASE = "https://auth.openai.com" +SENTINEL_API = "https://sentinel.openai.com/backend-api/sentinel/req" +SENTINEL_REFERER = "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6" +CLOUDFLARE_TRACE = "https://cloudflare.com/cdn-cgi/trace" + + +class OAuthPkceClient: + """ + OAuth PKCE 注册客户端 + + 完整注册流程(12 步): + 1. 检查 IP 地区 + 2. 访问 OAuth 授权 URL,获取 oai-did Cookie + 3. 获取 Sentinel Token + 4. 提交邮箱 (authorize/continue) + 5. 提交密码 (user/register) + 6. 发送 OTP (email-otp/send) + 7. 验证 OTP (email-otp/validate) + 8. 创建账户 (create_account) + 9. 注册后重新 OAuth 登录 + 10. 解析 workspace_id + 11. 选择 workspace + 12. 跟踪重定向链,交换 OAuth code → access_token + """ + + def __init__(self, proxy: Optional[str] = None, log_fn=None): + self.proxy = proxy + self._log = log_fn or (lambda msg: None) + self._proxies = {"http": proxy, "https": proxy} if proxy else None + + # 主会话:贯穿整个注册 + 登录流程 + self.session = curl_requests.Session( + proxies=self._proxies, + impersonate="chrome", + ) + + self._device_id: Optional[str] = None + self._sentinel: Optional[str] = None + + # ══════════════════════════════════════════════════════════════════ + # 内部方法:获取 Sentinel Token(极简模式) + # ══════════════════════════════════════════════════════════════════ + + def _fetch_sentinel_token(self, device_id: str, flow: str = "authorize_continue") -> str: + """ + 获取 Sentinel Token。 + + 使用独立连接(不复用 session cookie),请求体 p 字段留空, + 只取响应中的 token 字段拼装为 openai-sentinel-token header 值。 + + Returns: + JSON 格式的 sentinel token 字符串。 + """ + req_body = json.dumps({"p": "", "id": device_id, "flow": flow}) + + resp = curl_requests.post( + SENTINEL_API, + headers={ + "origin": "https://sentinel.openai.com", + "referer": SENTINEL_REFERER, + "content-type": "text/plain;charset=UTF-8", + }, + data=req_body, + proxies=self._proxies, + impersonate="chrome", + timeout=15, + ) + + if resp.status_code != 200: + raise RuntimeError(f"Sentinel 获取失败: HTTP {resp.status_code}") + + c_value = resp.json().get("token", "") + if not c_value: + raise RuntimeError("Sentinel 响应缺少 token 字段") + + return json.dumps({ + "p": "", "t": "", "c": c_value, + "id": device_id, "flow": flow, + }, separators=(",", ":")) + + # ══════════════════════════════════════════════════════════════════ + # 步骤 1:检查 IP 地区 + # ══════════════════════════════════════════════════════════════════ + + def check_ip_region(self) -> str: + """检查当前 IP 地区,CN/HK 不支持。""" + try: + resp = self.session.get(CLOUDFLARE_TRACE, timeout=10) + match = re.search(r"^loc=(.+)$", resp.text, re.MULTILINE) + loc = match.group(1).strip() if match else "UNKNOWN" + self._log(f"当前 IP 地区: {loc}") + if loc in ("CN", "HK"): + raise RuntimeError(f"IP 地区不支持: {loc}") + return loc + except RuntimeError: + raise + except Exception as e: + raise RuntimeError(f"IP 地区检查失败: {e}") from e + + # ══════════════════════════════════════════════════════════════════ + # 步骤 2:访问 OAuth 授权 URL,获取 oai-did Cookie + # ══════════════════════════════════════════════════════════════════ + + def init_oauth_session(self) -> OAuthStart: + """生成 OAuth PKCE URL 并访问,建立 auth.openai.com 会话。""" + oauth = generate_oauth_url() + self._log("访问 OAuth 授权 URL...") + self.session.get(oauth.auth_url, timeout=15) + self._device_id = self.session.cookies.get("oai-did") or "" + self._log(f"oai-did: {self._device_id[:16]}..." if self._device_id else "oai-did: (未获取到)") + return oauth + + # ══════════════════════════════════════════════════════════════════ + # 步骤 3:获取 Sentinel Token + # ══════════════════════════════════════════════════════════════════ + + def refresh_sentinel(self) -> str: + """获取新的 Sentinel Token 并缓存。""" + if not self._device_id: + raise RuntimeError("尚未初始化 oai-did(请先调用 init_oauth_session)") + self._sentinel = self._fetch_sentinel_token(self._device_id) + self._log("Sentinel Token 已获取") + return self._sentinel + + # ══════════════════════════════════════════════════════════════════ + # 步骤 4:提交邮箱 + # ══════════════════════════════════════════════════════════════════ + + def submit_email(self, email: str) -> dict: + """向 authorize/continue 提交邮箱,触发注册状态机。""" + if not self._sentinel: + raise RuntimeError("Sentinel Token 未初始化") + + payload = json.dumps({ + "username": {"value": email, "kind": "email"}, + "screen_hint": "signup", + }) + self._log(f"提交邮箱: {email}") + + resp = self.session.post( + f"{AUTH_BASE}/api/accounts/authorize/continue", + headers={ + "referer": f"{AUTH_BASE}/create-account", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": self._sentinel, + }, + data=payload, + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError(f"提交邮箱失败: HTTP {resp.status_code} {resp.text[:300]}") + + data = resp.json() + self._log(f"邮箱提交成功") + return data + + # ══════════════════════════════════════════════════════════════════ + # 步骤 5:提交密码 + # ══════════════════════════════════════════════════════════════════ + + def submit_password(self, email: str, password: str) -> str: + """向 user/register 提交密码,返回 continue_url。""" + payload = json.dumps({"password": password, "username": email}) + self._log("提交密码...") + + resp = self.session.post( + f"{AUTH_BASE}/api/accounts/user/register", + headers={ + "referer": f"{AUTH_BASE}/create-account/password", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": self._sentinel or "", + }, + data=payload, + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError(f"提交密码失败: HTTP {resp.status_code} {resp.text[:300]}") + + continue_url = resp.json().get("continue_url") or "" + self._log(f"密码提交成功{', continue_url 已获取' if continue_url else ''}") + return continue_url + + # ══════════════════════════════════════════════════════════════════ + # 步骤 6:发送 OTP + # ══════════════════════════════════════════════════════════════════ + + def send_otp(self, continue_url: str = "") -> bool: + """触发发送邮箱验证码。""" + url = continue_url or f"{AUTH_BASE}/api/accounts/email-otp/send" + self._log(f"发送验证码: {url}") + + try: + resp = self.session.post( + url, + headers={ + "referer": f"{AUTH_BASE}/create-account/password", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": self._sentinel or "", + }, + timeout=30, + ) + self._log(f"验证码发送状态: {resp.status_code}") + return resp.status_code == 200 + except Exception as e: + self._log(f"发送验证码异常(非致命): {e}") + return False + + # ══════════════════════════════════════════════════════════════════ + # 步骤 7:验证 OTP + # ══════════════════════════════════════════════════════════════════ + + def validate_otp(self, code: str) -> None: + """提交邮箱验证码。""" + self._log(f"验证 OTP: {code}") + + resp = self.session.post( + f"{AUTH_BASE}/api/accounts/email-otp/validate", + headers={ + "referer": f"{AUTH_BASE}/email-verification", + "accept": "application/json", + "content-type": "application/json", + }, + data=json.dumps({"code": code}), + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError(f"OTP 验证失败: HTTP {resp.status_code} {resp.text[:300]}") + self._log("OTP 验证通过") + + # ══════════════════════════════════════════════════════════════════ + # 步骤 8:创建账户 + # ══════════════════════════════════════════════════════════════════ + + def create_account(self, name: str, birthdate: str) -> None: + """提交姓名和生日完成账户创建。""" + self._log(f"创建账户: {name} ({birthdate})") + + resp = self.session.post( + f"{AUTH_BASE}/api/accounts/create_account", + headers={ + "referer": f"{AUTH_BASE}/about-you", + "accept": "application/json", + "content-type": "application/json", + }, + data=json.dumps({"name": name, "birthdate": birthdate}), + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError(f"创建账户失败: HTTP {resp.status_code} {resp.text[:300]}") + self._log("账户创建成功") + + # ══════════════════════════════════════════════════════════════════ + # 步骤 9:注册后重新 OAuth 登录 + # ══════════════════════════════════════════════════════════════════ + + def login_after_register( + self, email: str, password: str, otp_code: str = "" + ) -> OAuthStart: + """ + 注册完成后重走 OAuth 登录流程。 + + 注册阶段的 session 不含 workspace 信息,必须重新走一次 + OAuth 登录获取 oai-client-auth-session Cookie。 + + Returns: + 登录阶段的 OAuthStart(含 code_verifier 等,用于步骤 12 Token 交换)。 + """ + self._log("=" * 40) + self._log("开始 OAuth 登录(获取 workspace)...") + + # 9-1. 访问新 OAuth URL + login_oauth = generate_oauth_url() + self.session.get(login_oauth.auth_url, timeout=15) + login_did = self.session.cookies.get("oai-did") or self._device_id or "" + self._log(f"登录阶段 oai-did: {login_did[:16]}..." if login_did else "登录阶段 oai-did: (空)") + + # 9-2. 获取登录阶段 Sentinel + login_sentinel = self._fetch_sentinel_token(login_did) + + # 9-3. 提交邮箱(screen_hint=login) + login_email_resp = self.session.post( + f"{AUTH_BASE}/api/accounts/authorize/continue", + headers={ + "referer": f"{AUTH_BASE}/sign-in", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": login_sentinel, + }, + data=json.dumps({ + "username": {"value": email, "kind": "email"}, + "screen_hint": "login", + }), + timeout=30, + ) + if login_email_resp.status_code != 200: + raise RuntimeError(f"登录提交邮箱失败: HTTP {login_email_resp.status_code}") + + page_type = (login_email_resp.json().get("page") or {}).get("type", "") + self._log(f"登录页面类型: {page_type}") + + # 9-4. 提交密码(login_password 页面) + if "password" in page_type: + self._log("提交密码...") + pwd_resp = self.session.post( + f"{AUTH_BASE}/api/accounts/password/verify", + headers={ + "referer": f"{AUTH_BASE}/log-in/password", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": login_sentinel, + }, + data=json.dumps({"password": password}), + timeout=30, + ) + if pwd_resp.status_code != 200: + raise RuntimeError(f"登录密码验证失败: HTTP {pwd_resp.status_code}") + page_type = (pwd_resp.json().get("page") or {}).get("type", "") + self._log(f"密码验证后页面类型: {page_type}") + + # 9-5. 二次 OTP(复用注册阶段验证码) + if "otp" in page_type or "verification" in page_type: + if not otp_code: + raise RuntimeError("登录需要二次 OTP 验证,但未提供验证码") + self._log(f"提交登录二次验证码: {otp_code}") + # 触发发信请求以满足后端状态机(可忽略报错) + try: + self.session.post( + f"{AUTH_BASE}/api/accounts/passwordless/send-otp", + headers={ + "referer": f"{AUTH_BASE}/log-in/password", + "accept": "application/json", + "content-type": "application/json", + }, + timeout=10, + ) + except Exception: + pass + + otp_resp = self.session.post( + f"{AUTH_BASE}/api/accounts/email-otp/validate", + headers={ + "referer": f"{AUTH_BASE}/email-verification", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": login_sentinel, + }, + data=json.dumps({"code": otp_code}), + timeout=30, + ) + if otp_resp.status_code != 200: + raise RuntimeError(f"登录二次 OTP 失败: HTTP {otp_resp.status_code} {otp_resp.text[:200]}") + self._log("登录二次验证通过") + + self._log("OAuth 登录流程完成") + return login_oauth + + # ══════════════════════════════════════════════════════════════════ + # 步骤 10:解析 workspace_id + # ══════════════════════════════════════════════════════════════════ + + def extract_workspace_id(self) -> str: + """从 oai-client-auth-session Cookie(JWT)中解析 workspace_id。""" + auth_cookie = self.session.cookies.get("oai-client-auth-session") or "" + if not auth_cookie: + raise RuntimeError("未找到 oai-client-auth-session Cookie") + + # JWT 段遍历(workspace 可能在第一段或第二段) + segments = auth_cookie.split(".") + for i in range(min(len(segments), 2)): + data = _decode_jwt_segment(segments[i]) + workspaces = data.get("workspaces") or [] + if workspaces: + wid = str((workspaces[0] or {}).get("id") or "").strip() + if wid: + self._log(f"成功解析 workspace_id: {wid}") + return wid + + # 调试信息 + first_data = _decode_jwt_segment(segments[0]) if segments else {} + self._log(f"Cookie 字段: {list(first_data.keys())}") + raise RuntimeError("无法从 Cookie 中解析 workspace_id") + + # ══════════════════════════════════════════════════════════════════ + # 步骤 11:选择 workspace + # ══════════════════════════════════════════════════════════════════ + + def select_workspace(self, workspace_id: str) -> str: + """选择 workspace,返回 continue_url。""" + self._log(f"选择 workspace: {workspace_id}") + + resp = self.session.post( + f"{AUTH_BASE}/api/accounts/workspace/select", + headers={ + "referer": f"{AUTH_BASE}/sign-in-with-chatgpt/codex/consent", + "content-type": "application/json", + }, + data=json.dumps({"workspace_id": workspace_id}), + timeout=30, + ) + if resp.status_code != 200: + raise RuntimeError(f"workspace/select 失败: HTTP {resp.status_code} {resp.text[:300]}") + + continue_url = str((resp.json() or {}).get("continue_url") or "").strip() + if not continue_url: + raise RuntimeError("workspace/select 响应缺少 continue_url") + self._log("workspace 选择成功,continue_url 已获取") + return continue_url + + # ══════════════════════════════════════════════════════════════════ + # 步骤 12:跟踪重定向链,交换 OAuth code → access_token + # ══════════════════════════════════════════════════════════════════ + + def follow_redirects_and_exchange_token( + self, continue_url: str, oauth_start: OAuthStart + ) -> dict: + """跟踪重定向链,捕获 code= 回调 URL,交换 access_token。""" + current_url = continue_url + + for hop in range(8): + resp = self.session.get(current_url, allow_redirects=False, timeout=15) + location = resp.headers.get("Location") or "" + + if resp.status_code not in (301, 302, 303, 307, 308) or not location: + break + + next_url = urllib.parse.urljoin(current_url, location) + self._log(f"重定向 [{hop + 1}] → {next_url[:100]}...") + + if "code=" in next_url and "state=" in next_url: + self._log("捕获到 OAuth 回调 URL,交换 Token...") + token_json = submit_callback_url( + callback_url=next_url, + expected_state=oauth_start.state, + code_verifier=oauth_start.code_verifier, + redirect_uri=oauth_start.redirect_uri, + proxy_url=self.proxy, + ) + return json.loads(token_json) + + current_url = next_url + + raise RuntimeError("未能在重定向链中捕获到 OAuth 回调 URL(含 code= 参数)") diff --git a/platforms/chatgpt/register_v2.py b/platforms/chatgpt/register_v2.py index e83446c..153647a 100644 --- a/platforms/chatgpt/register_v2.py +++ b/platforms/chatgpt/register_v2.py @@ -1,8 +1,13 @@ """ 注册流程引擎 V2 -基于 curl_cffi 的注册状态机,注册成功后直接复用同一会话提取 ChatGPT Session。 + +采用策略模式封装注册核心(OAuthPkceRegisterStrategy), +走 auth.openai.com OAuth PKCE 直通注册流程。 + +外部接口与 plugin.py 完全兼容,无需改动邮箱适配层。 """ +import random import time import logging from datetime import datetime @@ -11,34 +16,211 @@ from typing import Optional, Callable from core.base_platform import AccountStatus from platforms.chatgpt.register import RegistrationResult -from .chatgpt_client import ChatGPTClient -from .oauth_client import OAuthClient +from .oauth_pkce_client import OAuthPkceClient from .utils import generate_random_name, generate_random_birthday logger = logging.getLogger(__name__) -class EmailServiceAdapter: - """\u5c06 V1 \u7684 email_service \u9002\u914d\u6210 V2 \u6240\u9700\u7684\u63a5\u7801\u63a5\u53e3\u3002""" - def __init__(self, email_service, email, log_fn): - self.es = email_service - self.email = email - self.log_fn = log_fn - self._used_codes = set() - def wait_for_verification_code(self, email, timeout=60, otp_sent_at=None, exclude_codes=None): - msg = f"\u6b63\u5728\u7b49\u5f85\u90ae\u7bb1 {email} \u7684\u9a8c\u8bc1\u7801 ({timeout}s)..." - self.log_fn(msg) - code = self.es.get_verification_code( +# --------------------------------------------------------------------------- +# 名字/生日数据 +# --------------------------------------------------------------------------- + +_FIRST_NAMES = [ + "James", "John", "Robert", "Michael", "David", "William", "Richard", + "Joseph", "Thomas", "Charles", "Mary", "Patricia", "Jennifer", "Linda", + "Barbara", "Elizabeth", "Susan", "Jessica", "Sarah", "Karen", "Daniel", + "Matthew", "Anthony", "Mark", "Steven", "Andrew", "Paul", "Emily", + "Emma", "Olivia", "Sophia", "Ava", "Isabella", "Mia", +] +_LAST_NAMES = [ + "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", + "Davis", "Rodriguez", "Martinez", "Anderson", "Taylor", "Thomas", + "Wilson", "Moore", "Jackson", "Martin", "Lee", "Thompson", "White", +] + + +def _random_name_and_birthday() -> tuple[str, str]: + """随机生成全名和生日。""" + name = f"{random.choice(_FIRST_NAMES)} {random.choice(_LAST_NAMES)}" + year = random.randint(1985, 2004) + month = random.randint(1, 12) + day = random.randint(1, 28) + return name, f"{year}-{month:02d}-{day:02d}" + + +# --------------------------------------------------------------------------- +# 策略:OAuth PKCE 注册策略 +# --------------------------------------------------------------------------- + +class OAuthPkceRegisterStrategy: + """ + OAuth PKCE 注册策略(策略模式) + + 完整注册流程(12 步): + 1. 检查 IP 地区 + 2. 访问 OAuth 授权 URL,获取 oai-did Cookie + 3. 获取 Sentinel Token + 4. 提交邮箱 (authorize/continue) + 5. 提交密码 (user/register) + 6. 发送 OTP (email-otp/send) + 7. 验证 OTP (email-otp/validate) + 8. 创建账户 (create_account) + 9. 注册后重新 OAuth 登录 + 10. 解析 workspace_id + 11. 选择 workspace + 12. 跟踪重定向链,交换 OAuth code → access_token + """ + + def execute( + self, + client: OAuthPkceClient, + email_service, + email: str, + password: str, + log: Callable[[str], None], + ) -> RegistrationResult: + result = RegistrationResult(success=False) + + # ── 步骤 1:IP 检查 ────────────────────────────────────────────── + log("步骤 1/12: 检查 IP 地区...") + client.check_ip_region() + + # ── 步骤 2:创建邮箱 ──────────────────────────────────────────── + log("步骤 2/12: 创建邮箱接码订单...") + email_data = email_service.create_email() + email_addr = email or (email_data.get("email") if email_data else None) + if not email_addr: + result.error_message = "创建邮箱失败" + return result + result.email = email_addr + result.password = password + log(f"邮箱: {email_addr}") + + # ── 步骤 3:初始化 OAuth 会话 ──────────────────────────────────── + log("步骤 3/12: 访问 OAuth 授权 URL,获取 oai-did...") + client.init_oauth_session() + + # ── 步骤 4:获取 Sentinel Token ────────────────────────────────── + log("步骤 4/12: 获取 Sentinel Token...") + client.refresh_sentinel() + + # ── 步骤 5:提交邮箱 ──────────────────────────────────────────── + log("步骤 5/12: 提交邮箱...") + client.submit_email(email_addr) + + # ── 步骤 6:提交密码 ──────────────────────────────────────────── + log("步骤 6/12: 提交密码...") + continue_url = client.submit_password(email_addr, password) + + # ── 步骤 7:发送 OTP + 等待验证码 ──────────────────────────────── + log("步骤 7/12: 发送验证码...") + otp_sent_at = time.time() + client.send_otp(continue_url) + + log("步骤 7/12: 等待邮箱验证码(最多 120s)...") + otp_code = email_service.wait_for_verification_code( + email_addr, + timeout=120, + otp_sent_at=otp_sent_at, + ) + if not otp_code: + result.error_message = "未收到邮箱验证码" + return result + log(f"验证码: {otp_code}") + + # ── 步骤 8:验证 OTP ───────────────────────────────────────────── + log("步骤 8/12: 验证 OTP...") + client.validate_otp(otp_code) + + # ── 步骤 9:创建账户 ──────────────────────────────────────────── + log("步骤 9/12: 填写账户信息(姓名/生日)...") + name, birthdate = _random_name_and_birthday() + log(f"账户信息: {name} ({birthdate})") + client.create_account(name, birthdate) + + # ── 步骤 10:注册后重新 OAuth 登录 ─────────────────────────────── + log("步骤 10/12: 注册后重新 OAuth 登录...") + login_oauth = client.login_after_register(email_addr, password, otp_code) + + # ── 步骤 11:解析 workspace_id ─────────────────────────────────── + log("步骤 11/12: 解析 workspace_id...") + workspace_id = client.extract_workspace_id() + result.workspace_id = workspace_id + + # ── 步骤 12:选择 workspace → 交换 Token ───────────────────────── + log("步骤 12/12: 选择 workspace...") + ws_continue_url = client.select_workspace(workspace_id) + + log("步骤 12/12: 跟踪重定向链,交换 OAuth Token...") + token_data = client.follow_redirects_and_exchange_token(ws_continue_url, login_oauth) + + # ── 组装结果 ───────────────────────────────────────────────────── + result.success = True + result.access_token = token_data.get("access_token", "") + result.refresh_token = token_data.get("refresh_token", "") + result.id_token = token_data.get("id_token", "") + result.account_id = token_data.get("account_id", "") or workspace_id + result.metadata = { + "type": token_data.get("type", "codex"), + "expired": token_data.get("expired", ""), + } + + log("=" * 50) + log("注册流程成功完成!") + log("=" * 50) + return result + + +# --------------------------------------------------------------------------- +# Email Service 适配器(为 OAuthPkceClient 提供统一接码接口) +# --------------------------------------------------------------------------- + +class _EmailServiceAdapter: + """ + 将 V1 email_service(含 create_email / get_verification_code) + 适配为 OAuthPkceRegisterStrategy 期待的接口。 + + 接口: + - create_email() → {'email': str, ...} + - wait_for_verification_code(email, timeout, otp_sent_at) → str | None + """ + + def __init__(self, email_service, log: Callable[[str], None]): + self._svc = email_service + self._log = log + self._used_codes: set = set() + + def create_email(self, config=None): + return self._svc.create_email(config) + + def wait_for_verification_code( + self, email: str, timeout: int = 120, otp_sent_at=None + ) -> Optional[str]: + self._log(f"等待邮箱 {email} 的验证码(timeout={timeout}s)...") + code = self._svc.get_verification_code( + email=email, timeout=timeout, otp_sent_at=otp_sent_at, - exclude_codes=exclude_codes or self._used_codes, + exclude_codes=self._used_codes, ) if code: self._used_codes.add(code) - self.log_fn(f"\u6210\u529f\u83b7\u53d6\u9a8c\u8bc1\u7801: {code}") + self._log(f"成功获取验证码: {code}") return code + +# --------------------------------------------------------------------------- +# 注册引擎(对外暴露给 plugin.py,接口完全向后兼容) +# --------------------------------------------------------------------------- + class RegistrationEngineV2: + """ + 注册引擎 V2(外部接口层) + + plugin.py 通过此类发起注册,不感知内部策略变化。 + """ + def __init__( self, email_service, @@ -56,12 +238,12 @@ class RegistrationEngineV2: self.task_uuid = task_uuid self.max_retries = max(1, int(max_retries or 1)) self.extra_config = dict(extra_config or {}) - - self.email = None - self.password = None - self.logs = [] - - def _log(self, message: str, level: str = "info"): + + self.email: Optional[str] = None + self.password: Optional[str] = None + self.logs: list[str] = [] + + def _log(self, message: str, level: str = "info") -> None: timestamp = datetime.now().strftime("%H:%M:%S") log_message = f"[{timestamp}] {message}" self.logs.append(log_message) @@ -72,199 +254,63 @@ class RegistrationEngineV2: else: logger.info(log_message) - def _format_oauth_failure(self, oauth_client: OAuthClient) -> str: - detail = str(getattr(oauth_client, "last_error", "") or "").strip() - if not detail: - detail = "获取最终 OAuth Tokens 失败" - return f"账号已创建成功,但 {detail}" + def run(self) -> RegistrationResult: + """执行注册流程,支持整流程重试。""" + result = RegistrationResult(success=False, logs=self.logs) + last_error = "" - def _should_retry(self, message: str) -> bool: + for attempt in range(self.max_retries): + try: + if attempt > 0: + self._log(f"整流程重试 {attempt + 1}/{self.max_retries}...") + time.sleep(2) + + adapter = _EmailServiceAdapter(self.email_service, self._log) + client = OAuthPkceClient(proxy=self.proxy_url, log_fn=self._log) + strategy = OAuthPkceRegisterStrategy() + + result = strategy.execute( + client=client, + email_service=adapter, + email=self.email or "", + password=self.password or "AAb1234567890!", + log=self._log, + ) + result.logs = self.logs + + if result.success: + return result + + last_error = result.error_message or "注册失败" + self._log(f"注册失败: {last_error}", "error") + + if attempt < self.max_retries - 1 and self._should_retry(last_error): + self._log("准备重试...") + continue + + return result + + except Exception as e: + last_error = str(e) + self._log(f"注册异常: {last_error}", "error") + if attempt < self.max_retries - 1 and self._should_retry(last_error): + continue + result.error_message = last_error + result.logs = self.logs + return result + + result.error_message = last_error or "注册失败" + result.logs = self.logs + return result + + @staticmethod + def _should_retry(message: str) -> bool: + """判断是否值得重试。""" text = str(message or "").lower() retriable_markers = [ - "tls", - "ssl", - "curl: (35)", - "预授权被拦截", - "authorize", - "registration_disallowed", - "http 400", - "创建账号失败", - "未获取到 authorization code", - "consent", - "workspace", - "organization", - "otp", - "验证码", - "session", - "accessToken", - "next-auth", + "tls", "ssl", "curl: (35)", + "ip 地区检查失败", "sentinel", + "timeout", "timed out", "connection", + "验证码", "otp", ] - return any(marker.lower() in text for marker in retriable_markers) - - def run(self) -> RegistrationResult: - result = RegistrationResult(success=False, logs=self.logs) - try: - last_error = "" - for attempt in range(self.max_retries): - try: - if attempt == 0: - self._log("=" * 60) - self._log("开始注册流程 V2 (Session 复用直取 AccessToken)") - self._log(f"请求模式: {self.browser_mode}") - self._log("=" * 60) - else: - self._log(f"整流程重试 {attempt + 1}/{self.max_retries} ...") - time.sleep(1) - - # 1. 创建邮箱 - email_data = self.email_service.create_email() - email_addr = self.email or (email_data.get('email') if email_data else None) - if not email_addr: - result.error_message = "创建邮箱失败" - return result - - result.email = email_addr - - pwd = self.password or "AAb1234567890!" - result.password = pwd - - # 随机姓名、生日 - first_name, last_name = generate_random_name() - birthdate = generate_random_birthday() - - self._log(f"邮箱: {email_addr}, 密码: {pwd}") - self._log(f"注册信息: {first_name} {last_name}, 生日: {birthdate}") - - # 使用包装器为底层客户端提供接码服务 - skymail_adapter = EmailServiceAdapter(self.email_service, email_addr, self._log) - - # 2. 初始化 V2 客户端 - chatgpt_client = ChatGPTClient( - proxy=self.proxy_url, - verbose=False, - browser_mode=self.browser_mode, - ) - chatgpt_client._log = self._log - - self._log("步骤 1/2: 执行注册状态机...") - - success, msg = chatgpt_client.register_complete_flow( - email_addr, pwd, first_name, last_name, birthdate, skymail_adapter - ) - - if not success: - last_error = f"注册流失败: {msg}" - if attempt < self.max_retries - 1 and self._should_retry(msg): - self._log(f"注册流失败,准备整流程重试: {msg}") - continue - result.error_message = last_error - return result - - self._log("步骤 2/2: 优先复用注册会话提取 ChatGPT Session / AccessToken...") - session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens() - - if session_ok: - self._log("Token 提取完成!") - result.success = True - result.access_token = session_result.get("access_token", "") - result.session_token = session_result.get("session_token", "") - result.account_id = ( - session_result.get("account_id") - or session_result.get("user_id") - or ("v2_acct_" + chatgpt_client.device_id[:8]) - ) - result.workspace_id = session_result.get("workspace_id", "") - result.metadata = { - "auth_provider": session_result.get("auth_provider", ""), - "expires": session_result.get("expires", ""), - "user_id": session_result.get("user_id", ""), - "user": session_result.get("user") or {}, - "account": session_result.get("account") or {}, - } - - if result.workspace_id: - self._log(f"Session Workspace ID: {result.workspace_id}") - - self._log("=" * 60) - self._log("注册流程成功结束!") - self._log("=" * 60) - return result - - self._log(f"复用会话失败,回退到 OAuth 登录补全流程: {session_result}") - tokens = None - oauth_client = None - for oauth_attempt in range(2): - if oauth_attempt > 0: - self._log(f"同账号 OAuth 重试 {oauth_attempt + 1}/2 ...") - time.sleep(1) - - oauth_client = OAuthClient( - config=self.extra_config, - proxy=self.proxy_url, - verbose=False, - browser_mode=self.browser_mode, - ) - oauth_client._log = self._log - oauth_client.session = chatgpt_client.session - - tokens = oauth_client.login_and_get_tokens( - email_addr, - pwd, - chatgpt_client.device_id, - chatgpt_client.ua, - chatgpt_client.sec_ch_ua, - chatgpt_client.impersonate, - skymail_adapter, - ) - if tokens and tokens.get("access_token"): - break - - if oauth_client.last_error and "add_phone" in oauth_client.last_error: - break - - if tokens and tokens.get("access_token"): - self._log("OAuth 回退补全成功!") - result.success = True - result.access_token = tokens.get("access_token") - result.refresh_token = tokens.get("refresh_token") - result.id_token = tokens.get("id_token") - result.account_id = "v2_acct_" + chatgpt_client.device_id[:8] - - session_data = oauth_client._decode_oauth_session_cookie() - if session_data: - workspaces = session_data.get("workspaces", []) - if workspaces: - result.workspace_id = str((workspaces[0] or {}).get("id") or "") - self._log(f"成功萃取 Workspace ID: {result.workspace_id}") - - session_cookie = None - for cookie in oauth_client.session.cookies.jar: - if cookie.name == "__Secure-next-auth.session-token": - session_cookie = cookie.value - break - result.session_token = session_cookie - - self._log("=" * 60) - self._log("注册流程成功结束!") - self._log("=" * 60) - return result - - last_error = self._format_oauth_failure(oauth_client) - result.error_message = last_error - return result - except Exception as attempt_error: - last_error = str(attempt_error) - if attempt < self.max_retries - 1 and self._should_retry(last_error): - self._log(f"本轮出现异常,准备整流程重试: {last_error}") - continue - raise - - result.error_message = last_error or "注册失败" - return result - - except Exception as e: - self._log(f"V2 注册全流程执行异常: {e}", "error") - import traceback - traceback.print_exc() - result.error_message = str(e) - return result + return any(m in text for m in retriable_markers) diff --git a/services/external_sync.py b/services/external_sync.py index 602b79d..72f5460 100644 --- a/services/external_sync.py +++ b/services/external_sync.py @@ -4,7 +4,11 @@ from __future__ import annotations from typing import Any -from services.chatgpt_sync import persist_cpa_sync_result, upload_chatgpt_account_to_cpa +from services.chatgpt_sync import ( + _get_account_extra, + persist_cpa_sync_result, + upload_chatgpt_account_to_cpa, +) def sync_account(account) -> list[dict[str, Any]]: @@ -20,7 +24,7 @@ def sync_account(account) -> list[dict[str, Any]]: a = _A() a.email = account.email - extra = account.extra or {} + extra = _get_account_extra(account) a.access_token = extra.get("access_token") or account.token a.refresh_token = extra.get("refresh_token", "") a.id_token = extra.get("id_token", "") @@ -40,7 +44,7 @@ def sync_account(account) -> list[dict[str, Any]]: codex_proxy_url = str(config_store.get("codex_proxy_url", "") or "").strip() if codex_proxy_url: upload_type = str(config_store.get("codex_proxy_upload_type", "at") or "at").strip().lower() - extra = account.extra or {} + extra = _get_account_extra(account) class _CP: pass