Merge branch 'zc-zhangchen:main' into main

This commit is contained in:
Yzzz
2026-04-03 15:37:11 +08:00
committed by GitHub
8 changed files with 873 additions and 170 deletions

View File

@@ -152,7 +152,7 @@ def _auto_upload_integrations(task_id: str, account):
name = result.get("name", "Auto Upload")
ok = bool(result.get("ok"))
msg = result.get("msg", "")
_log(task_id, f" [{name}] {' ' + msg if ok else ' ' + msg}")
_log(task_id, f" [{name}] {'[OK] ' + msg if ok else '[FAIL] ' + msg}")
except Exception as e:
_log(task_id, f" [Auto Upload] 自动导入异常: {e}")
@@ -292,7 +292,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
saved_account = save_account(account)
if _proxy:
proxy_pool.report_success(_proxy)
_log(task_id, f" 注册成功: {account.email}")
_log(task_id, f"[OK] 注册成功: {account.email}")
_save_task_log(req.platform, account.email, "success")
_auto_upload_integrations(task_id, saved_account or account)
cashier_url = (account.extra or {}).get("cashier_url", "")
@@ -301,7 +301,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
_task_store.add_cashier_url(task_id, cashier_url)
return AttemptResult.success()
except SkipCurrentAttemptRequested as e:
_log(task_id, f" 已跳过当前账号: {e}")
_log(task_id, f"[SKIP] 已跳过当前账号: {e}")
_save_task_log(
req.platform,
current_email,
@@ -310,12 +310,12 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
)
return AttemptResult.skipped(str(e))
except StopTaskRequested as e:
_log(task_id, f" {e}")
_log(task_id, f"[STOP] {e}")
return AttemptResult.stopped(str(e))
except Exception as e:
if _proxy and proxy_pool is not None:
proxy_pool.report_fail(_proxy)
_log(task_id, f" 注册失败: {e}")
_log(task_id, f"[FAIL] 注册失败: {e}")
_save_task_log(
req.platform,
current_email,
@@ -338,7 +338,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
except CancelledError:
continue
except Exception as e:
_log(task_id, f" 任务线程异常: {e}")
_log(task_id, f"[ERROR] 任务线程异常: {e}")
errors.append(str(e))
continue
if result.outcome == AttemptOutcome.SUCCESS:

View File

@@ -18,6 +18,7 @@ except ImportError:
sys.exit(1)
from .sentinel_token import build_sentinel_token
from .sentinel_browser import get_sentinel_token_via_browser
from .utils import (
FlowState,
build_browser_headers,
@@ -130,6 +131,33 @@ class ChatGPTClient:
seed_oai_device_cookie(self.session, self.device_id)
self.last_registration_state = FlowState()
def _get_sentinel_token(self, flow: str, *, page_url: str | None = None):
prefer_browser = flow in {"username_password_create", "oauth_create_account"}
if prefer_browser:
token = get_sentinel_token_via_browser(
flow=flow,
proxy=self.proxy,
page_url=page_url,
headless=self.browser_mode != "headed",
device_id=self.device_id,
log_fn=lambda msg: self._log(msg),
)
if token:
self._log(f"{flow}: 已通过 Playwright SentinelSDK 获取 token")
return token
token = build_sentinel_token(
self.session,
self.device_id,
flow=flow,
user_agent=self.ua,
sec_ch_ua=self.sec_ch_ua,
impersonate=self.impersonate,
)
if token:
self._log(f"{flow}: 已通过 HTTP PoW 获取 token")
return token
def _log(self, msg):
"""输出日志"""
if self.verbose:
@@ -606,6 +634,14 @@ class ChatGPTClient:
fetch_site="same-origin",
)
headers.update(generate_datadog_trace())
headers["oai-device-id"] = self.device_id
sentinel_token = self._get_sentinel_token(
"username_password_create",
page_url=f"{self.AUTH}/create-account/password",
)
if sentinel_token:
headers["openai-sentinel-token"] = sentinel_token
payload = {
"username": email,
@@ -720,13 +756,9 @@ class ChatGPTClient:
self._log(f"完成账号创建: {name}")
url = f"{self.AUTH}/api/accounts/create_account"
sentinel_token = build_sentinel_token(
self.session,
self.device_id,
flow="authorize_continue",
user_agent=self.ua,
sec_ch_ua=self.sec_ch_ua,
impersonate=self.impersonate,
sentinel_token = self._get_sentinel_token(
"oauth_create_account",
page_url=f"{self.AUTH}/about-you",
)
if sentinel_token:
self._log("create_account: 已生成 sentinel token")

View File

@@ -112,6 +112,7 @@ class RefreshTokenChatGPTRegistrationAdapter(BaseChatGPTRegistrationModeAdapter)
email_service=context.email_service,
proxy_url=context.proxy_url,
callback_logger=context.callback_logger,
browser_mode=context.browser_mode,
)

View File

@@ -66,6 +66,7 @@ OPENAI_API_ENDPOINTS = {
"validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate",
"create_account": "https://auth.openai.com/api/accounts/create_account",
"select_workspace": "https://auth.openai.com/api/accounts/workspace/select",
"select_organization": "https://auth.openai.com/api/accounts/organization/select",
}
# OpenAI 页面类型(用于判断账号状态)
@@ -150,8 +151,8 @@ OPENAI_VERIFICATION_KEYWORDS = [
]
# 密码生成
PASSWORD_CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
DEFAULT_PASSWORD_LENGTH = 12
PASSWORD_CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%"
DEFAULT_PASSWORD_LENGTH = 16
# 用户信息生成(用于注册)
MIN_REGISTRATION_AGE = 20
@@ -243,7 +244,7 @@ DEFAULT_SETTINGS = [
("proxy.port", "7890", "代理端口", "proxy"),
("registration.max_retries", "3", "最大重试次数", "registration"),
("registration.timeout", "120", "超时时间(秒)", "registration"),
("registration.default_password_length", "12", "默认密码长度", "registration"),
("registration.default_password_length", "16", "默认密码长度", "registration"),
("webui.host", "0.0.0.0", "Web UI 监听主机", "webui"),
("webui.port", "8000", "Web UI 监听端口", "webui"),
("webui.debug", "true", "调试模式", "webui"),

View File

@@ -3,12 +3,12 @@
从 main.py 中提取并重构的注册流程
"""
import re
import base64
import json
import time
import logging
import secrets
import string
import time
import urllib.parse
from typing import Optional, Dict, Any, Tuple, Callable
from dataclasses import dataclass
from datetime import datetime
@@ -17,8 +17,16 @@ from curl_cffi import requests as cffi_requests
from core.task_runtime import TaskInterruption
from .oauth import OAuthManager, OAuthStart
from .http_client import OpenAIHTTPClient, HTTPClientError
from .utils import generate_device_id, seed_oai_device_cookie
from .http_client import OpenAIHTTPClient
from .sentinel_browser import get_sentinel_token_via_browser
from .sentinel_token import build_sentinel_token
from .utils import (
generate_datadog_trace,
generate_device_id,
generate_random_password,
normalize_flow_url,
seed_oai_device_cookie,
)
# from ..services import EmailServiceFactory, BaseEmailService, EmailServiceType # removed: external dep
# from ..database import crud # removed: external dep
# from ..database.session import get_db # removed: external dep
@@ -29,8 +37,6 @@ from .constants import (
OTP_CODE_PATTERN,
DEFAULT_PASSWORD_LENGTH,
PASSWORD_CHARSET,
AccountStatus,
TaskStatus,
)
# from ..config.settings import get_settings # removed: external dep
@@ -95,7 +101,8 @@ class RefreshTokenRegistrationEngine:
email_service,
proxy_url: Optional[str] = None,
callback_logger: Optional[Callable[[str], None]] = None,
task_uuid: Optional[str] = None
task_uuid: Optional[str] = None,
browser_mode: str = "headless",
):
"""
初始化注册引擎
@@ -110,6 +117,7 @@ class RefreshTokenRegistrationEngine:
self.proxy_url = proxy_url
self.callback_logger = callback_logger or (lambda msg: logger.info(msg))
self.task_uuid = task_uuid
self.browser_mode = str(browser_mode or "headless").strip().lower()
# 创建 HTTP 客户端
self.http_client = OpenAIHTTPClient(proxy_url=proxy_url)
@@ -138,6 +146,8 @@ class RefreshTokenRegistrationEngine:
self._used_verification_codes = set() # 已取过的验证码,避免二次登录时捞到旧码
self._is_existing_account: bool = False # 是否为已注册账号(用于自动登录)
self._token_acquisition_requires_login: bool = False # 新注册账号需要二次登录拿 token
self._post_otp_continue_url: str = ""
self._post_otp_page_type: str = ""
def _log(self, message: str, level: str = "info"):
"""记录日志"""
@@ -169,7 +179,8 @@ class RefreshTokenRegistrationEngine:
def _generate_password(self, length: int = DEFAULT_PASSWORD_LENGTH) -> str:
"""生成随机密码"""
return ''.join(secrets.choice(PASSWORD_CHARSET) for _ in range(length))
resolved_length = max(int(length or DEFAULT_PASSWORD_LENGTH), 8)
return generate_random_password(resolved_length)
def _check_ip_location(self) -> Tuple[bool, Optional[str]]:
"""检查 IP 地理位置"""
@@ -270,18 +281,82 @@ class RefreshTokenRegistrationEngine:
return None
def _check_sentinel(self, did: str) -> Optional[str]:
"""检查 Sentinel 拦截"""
def _default_user_agent(self) -> str:
try:
sen_token = self.http_client.check_sentinel(did)
if sen_token:
self._log(f"Sentinel token 获取成功")
return sen_token
self._log("Sentinel 检查失败: 未获取到 token", "warning")
return None
user_agent = str(self.session.headers.get("User-Agent") or "").strip()
if user_agent:
return user_agent
except Exception:
pass
return (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/145.0.0.0 Safari/537.36"
)
def _build_json_headers(
self,
*,
referer: str,
include_device_id: bool = False,
include_datadog: bool = False,
content_type: str = "application/json",
accept: str = "application/json",
) -> Dict[str, str]:
headers = {
"accept": accept,
"accept-language": "en-US,en;q=0.9",
"content-type": content_type,
"origin": "https://auth.openai.com",
"referer": referer,
"user-agent": self._default_user_agent(),
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
}
if include_device_id and self._device_id:
headers["oai-device-id"] = self._device_id
if include_datadog:
headers.update(generate_datadog_trace())
return headers
def _build_navigation_headers(self, *, referer: str) -> Dict[str, str]:
return {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"accept-language": "en-US,en;q=0.9",
"referer": referer,
"user-agent": self._default_user_agent(),
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
}
def _check_sentinel(self, did: str, *, flow: str = "authorize_continue") -> Optional[str]:
"""按参考实现为指定 flow 生成完整 Sentinel token。"""
try:
if not self.session:
self.session = self.http_client.session
if flow in {"username_password_create", "oauth_create_account"}:
browser_token = get_sentinel_token_via_browser(
flow=flow,
proxy=self.proxy_url,
headless=self.browser_mode != "headed",
device_id=did,
log_fn=lambda msg: self._log(msg),
)
if browser_token:
self._log(f"Sentinel Browser token 获取成功 ({flow})")
return browser_token
sen_token = build_sentinel_token(self.session, did, flow=flow)
if sen_token:
self._log(f"Sentinel token 获取成功 ({flow})")
return sen_token
self._log(f"Sentinel 检查失败: 未获取到 token ({flow})", "warning")
return None
except Exception as e:
self._log(f"Sentinel 检查异常: {e}", "warning")
self._log(f"Sentinel 检查异常 ({flow}): {e}", "warning")
return None
def _submit_auth_start(
@@ -309,21 +384,15 @@ class RefreshTokenRegistrationEngine:
"screen_hint": screen_hint,
})
headers = {
"referer": referer,
"accept": "application/json",
"content-type": "application/json",
}
headers = self._build_json_headers(
referer=referer,
include_device_id=True,
include_datadog=True,
)
headers["oai-device-id"] = did
if sen_token:
sentinel = json.dumps({
"p": "",
"t": "",
"c": sen_token,
"id": did,
"flow": "authorize_continue",
})
headers["openai-sentinel-token"] = sentinel
headers["openai-sentinel-token"] = sen_token
response = self.session.post(
OPENAI_API_ENDPOINTS["signup"],
@@ -402,13 +471,18 @@ class RefreshTokenRegistrationEngine:
def _submit_login_password(self) -> SignupFormResult:
"""提交登录密码,进入邮箱验证码页面。"""
try:
headers = self._build_json_headers(
referer="https://auth.openai.com/log-in/password",
include_device_id=True,
include_datadog=True,
)
sen_token = self._check_sentinel(self._device_id or "", flow="password_verify")
if sen_token:
headers["openai-sentinel-token"] = sen_token
response = self.session.post(
OPENAI_API_ENDPOINTS["password_verify"],
headers={
"referer": "https://auth.openai.com/log-in/password",
"accept": "application/json",
"content-type": "application/json",
},
headers=headers,
data=json.dumps({"password": self.password}),
)
@@ -447,6 +521,8 @@ class RefreshTokenRegistrationEngine:
self.oauth_start = None
self.session_token = None
self._otp_sent_at = None
self._post_otp_continue_url = ""
self._post_otp_page_type = ""
def _prepare_authorize_flow(self, label: str) -> Tuple[Optional[str], Optional[str]]:
"""初始化当前阶段的授权流程,返回 device id 和 sentinel token。"""
@@ -484,25 +560,14 @@ class RefreshTokenRegistrationEngine:
result.error_message = "验证码校验失败"
return False
self._log("获取 Workspace ID...")
workspace_id = self._get_workspace_id()
if not workspace_id:
result.error_message = "获取 Workspace ID 失败"
return False
result.workspace_id = workspace_id
self._log("选择 Workspace...")
continue_url = self._select_workspace(workspace_id)
if not continue_url:
result.error_message = "选择 Workspace 失败"
return False
self._log("跟随重定向链...")
callback_url = self._follow_redirects(continue_url)
self._log("解析 OTP 后的 OAuth 跳转状态...")
continue_url = self._resolve_post_otp_continue_url()
self._log("执行 consent/workspace/organization 流程...")
callback_url, workspace_id = self._resolve_oauth_callback_url(continue_url)
if not callback_url:
result.error_message = "跟随重定向链失败"
result.error_message = "未获取到 OAuth 回调地址"
return False
result.workspace_id = workspace_id or ""
self._log("处理 OAuth 回调并获取 Token...")
token_info = self._handle_oauth_callback(callback_url)
@@ -564,13 +629,21 @@ class RefreshTokenRegistrationEngine:
"username": self.email
})
headers = self._build_json_headers(
referer="https://auth.openai.com/create-account/password",
include_device_id=True,
include_datadog=True,
)
sen_token = self._check_sentinel(
self._device_id or "",
flow="username_password_create",
)
if sen_token:
headers["openai-sentinel-token"] = sen_token
response = self.session.post(
OPENAI_API_ENDPOINTS["register"],
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
"content-type": "application/json",
},
headers=headers,
data=register_body,
)
@@ -631,10 +704,9 @@ class RefreshTokenRegistrationEngine:
response = self.session.get(
OPENAI_API_ENDPOINTS["send_otp"],
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
},
headers=self._build_navigation_headers(
referer="https://auth.openai.com/create-account/password"
),
)
self._log(f"验证码发送状态: {response.status_code}")
@@ -687,19 +759,42 @@ class RefreshTokenRegistrationEngine:
"""验证验证码"""
try:
code_body = f'{{"code":"{code}"}}'
headers = self._build_json_headers(
referer="https://auth.openai.com/email-verification",
include_device_id=True,
include_datadog=True,
)
sen_token = self._check_sentinel(
self._device_id or "",
flow="email_otp_validate",
)
if sen_token:
headers["openai-sentinel-token"] = sen_token
response = self.session.post(
OPENAI_API_ENDPOINTS["validate_otp"],
headers={
"referer": "https://auth.openai.com/email-verification",
"accept": "application/json",
"content-type": "application/json",
},
headers=headers,
data=code_body,
)
self._log(f"验证码校验状态: {response.status_code}")
return response.status_code == 200
if response.status_code != 200:
return False
try:
response_data = response.json() or {}
except Exception:
response_data = {}
self._post_otp_continue_url = str(response_data.get("continue_url") or "").strip()
self._post_otp_page_type = str(
((response_data.get("page") or {}).get("type")) or ""
).strip()
if self._post_otp_continue_url:
self._log(f"验证码校验后 continue_url: {self._post_otp_continue_url}")
if self._post_otp_page_type:
self._log(f"验证码校验后页面类型: {self._post_otp_page_type}")
return True
except Exception as e:
self._log(f"验证验证码失败: {e}", "error")
@@ -712,147 +807,462 @@ class RefreshTokenRegistrationEngine:
self._log(f"生成用户信息: {user_info['name']}, 生日: {user_info['birthdate']}")
create_account_body = json.dumps(user_info)
headers = self._build_json_headers(
referer="https://auth.openai.com/about-you",
include_device_id=True,
include_datadog=True,
)
sen_token = self._check_sentinel(
self._device_id or "",
flow="oauth_create_account",
)
if sen_token:
headers["openai-sentinel-token"] = sen_token
response = self.session.post(
OPENAI_API_ENDPOINTS["create_account"],
headers={
"referer": "https://auth.openai.com/about-you",
"accept": "application/json",
"content-type": "application/json",
},
headers=headers,
data=create_account_body,
)
self._log(f"账户创建状态: {response.status_code}")
if response.status_code != 200:
self._log(f"账户创建失败: {response.text[:200]}", "warning")
if response.status_code == 200:
return True
body_preview = response.text[:200]
self._log(f"账户创建失败: {body_preview}", "warning")
should_retry = response.status_code in (400, 403) and (
"sentinel" in body_preview.lower()
or "registration_disallowed" in body_preview.lower()
)
if not should_retry:
return False
return True
self._log("create_account 命中 sentinel 校验,刷新 token 后重试一次...", "warning")
retry_token = self._check_sentinel(
self._device_id or "",
flow="oauth_create_account",
)
if retry_token:
headers["openai-sentinel-token"] = retry_token
retry_resp = self.session.post(
OPENAI_API_ENDPOINTS["create_account"],
headers=headers,
data=create_account_body,
)
self._log(f"账户创建重试状态: {retry_resp.status_code}")
if retry_resp.status_code == 200:
return True
self._log(f"账户创建重试失败: {retry_resp.text[:200]}", "warning")
return False
except Exception as e:
self._log(f"创建账户失败: {e}", "error")
return False
def _get_workspace_id(self) -> Optional[str]:
"""获取 Workspace ID"""
@staticmethod
def _decode_cookie_json_value(raw_value: str) -> Optional[Dict[str, Any]]:
value = str(raw_value or "").strip()
if not value:
return None
candidates = [value]
if "." in value:
parts = value.split(".")
candidates = [parts[0], value, *parts[:2]]
for candidate in candidates:
candidate = str(candidate or "").strip()
if not candidate:
continue
padded = candidate + "=" * (-len(candidate) % 4)
for decoder in (base64.urlsafe_b64decode, base64.b64decode):
try:
decoded = decoder(padded.encode("ascii")).decode("utf-8")
parsed = json.loads(decoded)
except Exception:
continue
if isinstance(parsed, dict):
return parsed
return None
def _decode_auth_session_cookie(self) -> Optional[Dict[str, Any]]:
try:
auth_cookie = self.session.cookies.get("oai-client-auth-session")
if not auth_cookie:
self._log("未能获取到授权 Cookie", "error")
return None
except Exception:
auth_cookie = None
if not auth_cookie:
return None
return self._decode_cookie_json_value(auth_cookie)
# 解码 JWT
import base64
import json as json_module
def _extract_callback_url_from_candidate(self, candidate: str) -> str:
normalized = normalize_flow_url(str(candidate or "").strip(), auth_base="https://auth.openai.com")
if not normalized:
return ""
parsed = urllib.parse.urlparse(normalized)
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
code = str((query.get("code") or [""])[0] or "").strip()
state = str((query.get("state") or [""])[0] or "").strip()
return normalized if code and state else ""
def _follow_and_extract_callback_url(self, start_url: str, max_depth: int = 10) -> str:
current_url = normalize_flow_url(start_url, auth_base="https://auth.openai.com")
referer = "https://auth.openai.com/sign-in-with-chatgpt/codex/consent"
for hop in range(max_depth):
if not current_url:
return ""
callback_url = self._extract_callback_url_from_candidate(current_url)
if callback_url:
return callback_url
self._log(f"OAuth 跟随重定向 {hop + 1}/{max_depth}: {current_url[:120]}...")
try:
segments = auth_cookie.split(".")
if len(segments) < 1:
self._log("授权 Cookie 格式错误", "error")
return None
# 解码第一个 segment
payload = segments[0]
pad = "=" * ((4 - (len(payload) % 4)) % 4)
decoded = base64.urlsafe_b64decode((payload + pad).encode("ascii"))
auth_json = json_module.loads(decoded.decode("utf-8"))
workspaces = auth_json.get("workspaces") or []
if not workspaces:
self._log("授权 Cookie 里没有 workspace 信息", "error")
return None
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
if not workspace_id:
self._log("无法解析 workspace_id", "error")
return None
self._log(f"Workspace ID: {workspace_id}")
return workspace_id
response = self.session.get(
current_url,
headers=self._build_navigation_headers(referer=referer),
allow_redirects=False,
timeout=15,
)
except Exception as e:
self._log(f"解析授权 Cookie 失败: {e}", "error")
self._log(f"OAuth 跟随重定向失败: {e}", "warning")
return ""
referer = current_url
location = str(response.headers.get("Location") or "").strip()
if response.status_code in (301, 302, 303, 307, 308) and location:
next_url = normalize_flow_url(
urllib.parse.urljoin(current_url, location),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(next_url)
if callback_url:
return callback_url
current_url = next_url
continue
callback_url = self._extract_callback_url_from_candidate(str(response.url))
if callback_url:
return callback_url
break
return ""
def _create_account_during_oauth_if_needed(self) -> str:
user_info = generate_random_user_info()
headers = self._build_json_headers(
referer="https://auth.openai.com/about-you",
include_device_id=True,
include_datadog=True,
)
sen_token = self._check_sentinel(self._device_id or "", flow="oauth_create_account")
if sen_token:
headers["openai-sentinel-token"] = sen_token
try:
response = self.session.post(
OPENAI_API_ENDPOINTS["create_account"],
headers=headers,
data=json.dumps(user_info),
)
except Exception as e:
self._log(f"OAuth about-you create_account 失败: {e}", "warning")
return ""
if response.status_code == 200:
try:
response_data = response.json() or {}
except Exception:
response_data = {}
return normalize_flow_url(
str(response_data.get("continue_url") or ""),
auth_base="https://auth.openai.com",
)
body_text = response.text[:200]
if response.status_code == 400 and "already_exists" in body_text.lower():
return "https://auth.openai.com/sign-in-with-chatgpt/codex/consent"
self._log(f"OAuth about-you create_account 失败: {response.status_code} {body_text}", "warning")
return ""
def _resolve_post_otp_continue_url(self) -> str:
continue_url = normalize_flow_url(
self._post_otp_continue_url,
auth_base="https://auth.openai.com",
)
page_type = str(self._post_otp_page_type or "").strip().lower()
if continue_url and "about-you" in continue_url:
self._log("OTP 后进入 about-you按参考 RT 逻辑补齐 consent 跳转...")
try:
response = self.session.get(
"https://auth.openai.com/about-you",
headers=self._build_navigation_headers(
referer="https://auth.openai.com/email-verification"
),
allow_redirects=True,
timeout=30,
)
final_url = normalize_flow_url(
str(response.url or ""),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(final_url)
if callback_url:
return callback_url
if "consent" in final_url or "organization" in final_url:
return final_url
except Exception as e:
self._log(f"GET about-you 失败: {e}", "warning")
created_continue_url = self._create_account_during_oauth_if_needed()
if created_continue_url:
return created_continue_url
if not continue_url and "consent" in page_type:
continue_url = "https://auth.openai.com/sign-in-with-chatgpt/codex/consent"
if continue_url:
return continue_url
return "https://auth.openai.com/sign-in-with-chatgpt/codex/consent"
def _get_workspace_id(self) -> Optional[str]:
"""从 oai-client-auth-session cookie 中解析 workspace_id。"""
try:
auth_json = self._decode_auth_session_cookie()
if not auth_json:
self._log("未能解码 oai-client-auth-session Cookie", "error")
return None
workspaces = auth_json.get("workspaces") or []
if not workspaces:
self._log("授权 Cookie 里没有 workspace 信息", "error")
return None
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
if not workspace_id:
self._log("无法解析 workspace_id", "error")
return None
self._log(f"Workspace ID: {workspace_id}")
return workspace_id
except Exception as e:
self._log(f"获取 Workspace ID 失败: {e}", "error")
return None
def _select_workspace(self, workspace_id: str) -> Optional[str]:
"""选择 Workspace"""
"""兼容旧逻辑:仅提交 workspace 并返回 continue_url。"""
try:
select_body = f'{{"workspace_id":"{workspace_id}"}}'
response = self.session.post(
OPENAI_API_ENDPOINTS["select_workspace"],
headers={
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
"content-type": "application/json",
},
data=select_body,
headers=self._build_json_headers(
referer="https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
include_device_id=True,
include_datadog=True,
),
data=json.dumps({"workspace_id": workspace_id}),
allow_redirects=False,
timeout=30,
)
if response.status_code != 200:
self._log(f"选择 workspace 失败: {response.status_code}", "error")
self._log(f"响应: {response.text[:200]}", "warning")
return None
continue_url = str((response.json() or {}).get("continue_url") or "").strip()
if not continue_url:
self._log("workspace/select 响应里缺少 continue_url", "error")
return None
self._log(f"Continue URL: {continue_url[:100]}...")
return continue_url
except Exception as e:
self._log(f"选择 Workspace 失败: {e}", "error")
return None
def _follow_redirects(self, start_url: str) -> Optional[str]:
"""跟随重定向,寻找回调 URL"""
"""兼容旧逻辑:手动跟随重定向,寻找 OAuth 回调 URL"""
callback_url = self._follow_and_extract_callback_url(start_url)
if callback_url:
return callback_url
self._log("未能在重定向链中找到回调 URL", "error")
return None
def _resolve_oauth_callback_url(self, start_url: str) -> Tuple[str, str]:
consent_url = normalize_flow_url(
start_url or "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
auth_base="https://auth.openai.com",
)
workspace_id = ""
callback_url = self._extract_callback_url_from_candidate(consent_url)
if callback_url:
return callback_url, workspace_id
self._log(f"consent URL: {consent_url}")
try:
current_url = start_url
max_redirects = 6
response = self.session.get(
consent_url,
headers=self._build_navigation_headers(
referer="https://auth.openai.com/email-verification"
),
allow_redirects=False,
timeout=30,
)
if response.status_code in (301, 302, 303, 307, 308):
location = normalize_flow_url(
urllib.parse.urljoin(consent_url, str(response.headers.get("Location") or "")),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(location)
if callback_url:
return callback_url, workspace_id
callback_url = self._follow_and_extract_callback_url(location)
if callback_url:
return callback_url, workspace_id
except Exception as e:
self._log(f"加载 consent 页面异常: {e}", "warning")
for i in range(max_redirects):
self._log(f"重定向 {i+1}/{max_redirects}: {current_url[:100]}...")
response = self.session.get(
current_url,
workspace_id = self._get_workspace_id() or ""
if workspace_id:
try:
ws_response = self.session.post(
OPENAI_API_ENDPOINTS["select_workspace"],
headers=self._build_json_headers(
referer=consent_url,
include_device_id=True,
include_datadog=True,
),
data=json.dumps({"workspace_id": workspace_id}),
allow_redirects=False,
timeout=15
timeout=30,
)
location = response.headers.get("Location") or ""
self._log(f"workspace/select -> {ws_response.status_code}")
# 如果不是重定向状态码,停止
if response.status_code not in [301, 302, 303, 307, 308]:
self._log(f"非重定向状态码: {response.status_code}")
break
if ws_response.status_code in (301, 302, 303, 307, 308):
location = normalize_flow_url(
urllib.parse.urljoin(
consent_url, str(ws_response.headers.get("Location") or "")
),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(location)
if callback_url:
return callback_url, workspace_id
callback_url = self._follow_and_extract_callback_url(location)
if callback_url:
return callback_url, workspace_id
if not location:
self._log("重定向响应缺少 Location 头")
break
if ws_response.status_code == 200:
try:
ws_data = ws_response.json() or {}
except Exception:
ws_data = {}
# 构建下一个 URL
import urllib.parse
next_url = urllib.parse.urljoin(current_url, location)
ws_continue_url = normalize_flow_url(
str(ws_data.get("continue_url") or ""),
auth_base="https://auth.openai.com",
)
orgs = ((ws_data.get("data") or {}).get("orgs")) or []
# 检查是否包含回调参数
if "code=" in next_url and "state=" in next_url:
self._log(f"找到回调 URL: {next_url[:100]}...")
return next_url
if orgs:
first_org = orgs[0] or {}
org_id = str(first_org.get("id") or "").strip()
project_id = str(
(((first_org.get("projects") or [None])[0]) or {}).get("id") or ""
).strip()
if org_id:
org_payload = {"org_id": org_id}
if project_id:
org_payload["project_id"] = project_id
current_url = next_url
org_referer = ws_continue_url or consent_url
org_response = self.session.post(
OPENAI_API_ENDPOINTS["select_organization"],
headers=self._build_json_headers(
referer=org_referer,
include_device_id=True,
include_datadog=True,
),
data=json.dumps(org_payload),
allow_redirects=False,
timeout=30,
)
self._log("未能在重定向链中找到回调 URL", "error")
return None
self._log(f"organization/select -> {org_response.status_code}")
if org_response.status_code in (301, 302, 303, 307, 308):
location = normalize_flow_url(
urllib.parse.urljoin(
org_referer,
str(org_response.headers.get("Location") or ""),
),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(location)
if callback_url:
return callback_url, workspace_id
callback_url = self._follow_and_extract_callback_url(location)
if callback_url:
return callback_url, workspace_id
if org_response.status_code == 200:
try:
org_data = org_response.json() or {}
except Exception:
org_data = {}
org_continue_url = normalize_flow_url(
str(org_data.get("continue_url") or ""),
auth_base="https://auth.openai.com",
)
callback_url = self._extract_callback_url_from_candidate(org_continue_url)
if callback_url:
return callback_url, workspace_id
if org_continue_url:
callback_url = self._follow_and_extract_callback_url(org_continue_url)
if callback_url:
return callback_url, workspace_id
callback_url = self._extract_callback_url_from_candidate(ws_continue_url)
if callback_url:
return callback_url, workspace_id
if ws_continue_url:
callback_url = self._follow_and_extract_callback_url(ws_continue_url)
if callback_url:
return callback_url, workspace_id
except Exception as e:
self._log(f"处理 workspace/select 响应异常: {e}", "warning")
try:
response = self.session.get(
consent_url,
headers=self._build_navigation_headers(
referer="https://auth.openai.com/email-verification"
),
allow_redirects=True,
timeout=30,
)
callback_url = self._extract_callback_url_from_candidate(str(response.url or ""))
if callback_url:
return callback_url, workspace_id
for item in getattr(response, "history", []) or []:
callback_url = self._extract_callback_url_from_candidate(
str((item.headers or {}).get("Location") or "")
)
if callback_url:
return callback_url, workspace_id
except Exception as e:
self._log(f"跟随重定向失败: {e}", "error")
return None
self._log(f"consent fallback 跟随失败: {e}", "warning")
return "", workspace_id
def _handle_oauth_callback(self, callback_url: str) -> Optional[Dict[str, Any]]:
"""处理 OAuth 回调"""
@@ -894,6 +1304,8 @@ class RefreshTokenRegistrationEngine:
self._token_acquisition_requires_login = False
self._otp_sent_at = None
self._device_id = None
self._post_otp_continue_url = ""
self._post_otp_page_type = ""
self._used_verification_codes.clear()
self._log("=" * 60)

View File

@@ -0,0 +1,137 @@
"""Playwright 版 Sentinel SDK token 获取辅助。"""
from __future__ import annotations
import json
from typing import Callable, Optional
from core.proxy_utils import build_playwright_proxy_config
def _flow_page_url(flow: str) -> str:
flow_name = str(flow or "").strip().lower()
mapping = {
"authorize_continue": "https://auth.openai.com/create-account",
"username_password_create": "https://auth.openai.com/create-account/password",
"password_verify": "https://auth.openai.com/log-in/password",
"email_otp_validate": "https://auth.openai.com/email-verification",
"oauth_create_account": "https://auth.openai.com/about-you",
}
return mapping.get(flow_name, "https://auth.openai.com/about-you")
def get_sentinel_token_via_browser(
*,
flow: str,
proxy: Optional[str] = None,
timeout_ms: int = 45000,
page_url: Optional[str] = None,
headless: bool = True,
device_id: Optional[str] = None,
log_fn: Optional[Callable[[str], None]] = None,
) -> Optional[str]:
"""通过浏览器直接调用 SentinelSDK.token(flow) 获取完整 token。"""
logger = log_fn or (lambda _msg: None)
try:
from playwright.sync_api import sync_playwright
except Exception as e:
logger(f"Sentinel Browser 不可用: {e}")
return None
target_url = str(page_url or _flow_page_url(flow)).strip() or _flow_page_url(flow)
launch_args = {
"headless": bool(headless),
"args": [
"--no-sandbox",
"--disable-blink-features=AutomationControlled",
],
}
proxy_config = build_playwright_proxy_config(proxy)
if proxy_config:
launch_args["proxy"] = proxy_config
logger(f"Sentinel Browser 启动: flow={flow}, url={target_url}")
with sync_playwright() as p:
browser = p.chromium.launch(**launch_args)
try:
context = browser.new_context(
viewport={"width": 1440, "height": 900},
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.7103.92 Safari/537.36"
),
ignore_https_errors=True,
)
if device_id:
try:
context.add_cookies(
[
{
"name": "oai-did",
"value": str(device_id),
"url": "https://auth.openai.com/",
"path": "/",
"secure": True,
"sameSite": "Lax",
}
]
)
except Exception:
pass
page = context.new_page()
page.goto(target_url, wait_until="domcontentloaded", timeout=timeout_ms)
page.wait_for_function(
"() => typeof window.SentinelSDK !== 'undefined' && typeof window.SentinelSDK.token === 'function'",
timeout=min(timeout_ms, 15000),
)
result = page.evaluate(
"""
async ({ flow }) => {
try {
const token = await window.SentinelSDK.token(flow);
return { success: true, token };
} catch (e) {
return {
success: false,
error: (e && (e.message || String(e))) || "unknown",
};
}
}
""",
{"flow": flow},
)
if not result or not result.get("success") or not result.get("token"):
logger(
"Sentinel Browser 获取失败: "
+ str((result or {}).get("error") or "no result")
)
return None
token = str(result["token"] or "").strip()
if not token:
logger("Sentinel Browser 返回空 token")
return None
try:
parsed = json.loads(token)
logger(
"Sentinel Browser 成功: "
f"p={'' if parsed.get('p') else ''} "
f"t={'' if parsed.get('t') else ''} "
f"c={'' if parsed.get('c') else ''}"
)
except Exception:
logger(f"Sentinel Browser 成功: len={len(token)}")
return token
except Exception as e:
logger(f"Sentinel Browser 异常: {e}")
return None
finally:
browser.close()

View File

@@ -114,13 +114,13 @@ class TurnstileAPIServer:
self.console.clear()
combined_text = Text()
combined_text.append("\n📢 Channel: ", style="bold white")
combined_text.append("\n[Channel] ", style="bold white")
combined_text.append("https://t.me/D3_vin", style="cyan")
combined_text.append("\n💬 Chat: ", style="bold white")
combined_text.append("\n[Chat] ", style="bold white")
combined_text.append("https://t.me/D3vin_chat", style="cyan")
combined_text.append("\n📁 GitHub: ", style="bold white")
combined_text.append("\n[GitHub] ", style="bold white")
combined_text.append("https://github.com/D3-vin", style="cyan")
combined_text.append("\n📁 Version: ", style="bold white")
combined_text.append("\n[Version] ", style="bold white")
combined_text.append("1.2a", style="green")
combined_text.append("\n")

View File

@@ -1,3 +1,5 @@
import base64
import json
import unittest
from unittest import mock
@@ -51,6 +53,11 @@ class _DummyHTTPClient:
class RegistrationEngineFlowTests(unittest.TestCase):
@staticmethod
def _encode_cookie_payload(data):
raw = json.dumps(data, separators=(",", ":")).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _make_engine(self):
return RefreshTokenRegistrationEngine(
email_service=DummyEmailService(),
@@ -228,6 +235,119 @@ class RegistrationEngineFlowTests(unittest.TestCase):
restart_login.assert_not_called()
complete_exchange.assert_called_once()
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.build_sentinel_token",
return_value='{"flow":"password_verify"}',
)
def test_submit_login_password_uses_password_verify_sentinel(self, mock_build_sentinel):
engine = self._make_engine()
engine._device_id = "device-fixed"
engine.password = "Secret123!"
engine.session = mock.Mock()
response = mock.Mock(status_code=200)
response.json.return_value = {
"page": {"type": "email_otp_verification"},
"continue_url": "/email-verification",
}
engine.session.post.return_value = response
result = engine._submit_login_password()
self.assertTrue(result.success)
mock_build_sentinel.assert_called_once_with(
engine.session, "device-fixed", flow="password_verify"
)
headers = engine.session.post.call_args.kwargs["headers"]
self.assertEqual(headers["openai-sentinel-token"], '{"flow":"password_verify"}')
def test_resolve_oauth_callback_url_handles_organization_select_redirect(self):
engine = self._make_engine()
engine._device_id = "device-fixed"
engine.session = mock.Mock()
cookie_payload = {
"workspaces": [{"id": "ws-123", "kind": "personal"}],
}
engine.session.cookies.get.side_effect = lambda name, default=None: (
self._encode_cookie_payload(cookie_payload)
if name == "oai-client-auth-session"
else default
)
consent_response = mock.Mock(status_code=200, headers={}, url="https://auth.openai.com/sign-in-with-chatgpt/codex/consent")
workspace_response = mock.Mock(status_code=200, headers={}, url="https://auth.openai.com/api/accounts/workspace/select")
workspace_response.json.return_value = {
"continue_url": "/sign-in-with-chatgpt/codex/organization",
"page": {"type": "organization_select"},
"data": {
"orgs": [
{
"id": "org-123",
"projects": [{"id": "proj-123"}],
}
]
},
}
org_response = mock.Mock(
status_code=302,
headers={
"Location": "http://localhost:1455/auth/callback?code=auth-code&state=oauth-state"
},
)
engine.session.get.side_effect = [consent_response]
engine.session.post.side_effect = [workspace_response, org_response]
callback_url, workspace_id = engine._resolve_oauth_callback_url(
"https://auth.openai.com/sign-in-with-chatgpt/codex/consent"
)
self.assertEqual(workspace_id, "ws-123")
self.assertEqual(
callback_url,
"http://localhost:1455/auth/callback?code=auth-code&state=oauth-state",
)
self.assertEqual(engine.session.post.call_count, 2)
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.build_sentinel_token",
return_value='{"source":"pow"}',
)
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.get_sentinel_token_via_browser",
return_value='{"source":"browser"}',
)
def test_check_sentinel_prefers_browser_for_register_and_create_account_flows(
self, mock_browser_token, mock_pow_token
):
engine = self._make_engine()
engine.session = mock.Mock()
token = engine._check_sentinel("device-fixed", flow="username_password_create")
self.assertEqual(token, '{"source":"browser"}')
mock_browser_token.assert_called_once()
mock_pow_token.assert_not_called()
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.build_sentinel_token",
return_value='{"source":"pow"}',
)
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.get_sentinel_token_via_browser",
return_value=None,
)
def test_check_sentinel_falls_back_to_pow_when_browser_token_missing(
self, mock_browser_token, mock_pow_token
):
engine = self._make_engine()
engine.session = mock.Mock()
token = engine._check_sentinel("device-fixed", flow="oauth_create_account")
self.assertEqual(token, '{"source":"pow"}')
mock_browser_token.assert_called_once()
mock_pow_token.assert_called_once_with(
engine.session, "device-fixed", flow="oauth_create_account"
)
if __name__ == "__main__":
unittest.main()