修复chatgpt注册流程

This commit is contained in:
zhangchen
2026-03-29 22:49:31 +08:00
parent 7dbe07dbbd
commit 7c31609ceb
12 changed files with 1590 additions and 741 deletions

1
.gitignore vendored
View File

@@ -50,3 +50,4 @@ data/
.idea/
.vscode/
*.swp
.claude/

View File

@@ -5,7 +5,13 @@ export const EXECUTOR_OPTIONS = [
] as const
const PLATFORM_EXECUTORS: Record<string, string[]> = {
chatgpt: ['protocol', 'headless', 'headed'],
cursor: ['protocol', 'headless', 'headed'],
grok: ['protocol', 'headless', 'headed'],
kiro: ['protocol', 'headless', 'headed'],
tavily: ['protocol', 'headless', 'headed'],
trae: ['protocol', 'headless', 'headed'],
openblocklabs: ['protocol'],
}
export function getSupportedExecutors(platform?: string) {

View File

@@ -84,7 +84,8 @@ function LogPanel({ taskId, onDone }: { taskId: string; onDone: () => void }) {
style={{
flex: 1,
overflow: 'auto',
background: 'rgba(0,0,0,0.4)',
background: '#ffffff',
border: '1px solid #e5e7eb',
borderRadius: 8,
padding: 12,
fontFamily: 'monospace',
@@ -97,13 +98,13 @@ function LogPanel({ taskId, onDone }: { taskId: string; onDone: () => void }) {
whiteSpace: 'pre-wrap',
}}
>
{lines.length === 0 && <div style={{ color: '#7a8ba3' }}>...</div>}
{lines.length === 0 && <div style={{ color: '#9ca3af' }}>...</div>}
{lines.map((l, i) => (
<div
key={i}
style={{
lineHeight: 1.5,
color: l.includes('✓') || l.includes('成功') ? '#10b981' : l.includes('✗') || l.includes('失败') || l.includes('错误') ? '#ef4444' : '#b0bcd4',
color: l.includes('✓') || l.includes('成功') ? '#059669' : l.includes('✗') || l.includes('失败') || l.includes('错误') ? '#dc2626' : '#1f2937',
}}
>
{l}

View File

@@ -149,10 +149,13 @@ export default function Register() {
<Form.Item name="platform" label="平台" rules={[{ required: true }]}>
<Select
options={[
{ value: 'chatgpt', label: 'ChatGPT' },
{ value: 'trae', label: 'Trae.ai' },
{ value: 'cursor', label: 'Cursor' },
{ value: 'kiro', label: 'Kiro' },
{ value: 'grok', label: 'Grok' },
{ value: 'tavily', label: 'Tavily' },
{ value: 'openblocklabs', label: 'OpenBlockLabs' },
]}
/>
</Form.Item>

View File

@@ -253,7 +253,7 @@ function ConfigField({ field }: { field: FieldConfig }) {
const options = SELECT_FIELDS[field.key]
const helpText =
field.key === 'default_executor'
? '仅对支持的平台生效;当前只有 Trae 支持浏览器模式,其他平台会自动回退为纯协议。'
? '仅对支持的平台生效;ChatGPT、Cursor、Grok、Kiro、Tavily、Trae 支持浏览器模式OpenBlockLabs 仅支持纯协议。'
: undefined
return (

View File

@@ -6,7 +6,7 @@ ChatGPT 注册客户端模块
import random
import uuid
import time
from urllib.parse import urlparse, parse_qs
from urllib.parse import urlparse
try:
from curl_cffi import requests as curl_requests
@@ -16,7 +16,17 @@ except ImportError:
sys.exit(1)
from .sentinel_token import build_sentinel_token
from .utils import generate_datadog_trace
from .utils import (
FlowState,
build_browser_headers,
decode_jwt_payload,
describe_flow_state,
extract_flow_state,
generate_datadog_trace,
normalize_flow_url,
random_delay,
seed_oai_device_cookie,
)
# Chrome 指纹配置
@@ -56,17 +66,25 @@ class ChatGPTClient:
BASE = "https://chatgpt.com"
AUTH = "https://auth.openai.com"
def __init__(self, proxy=None, verbose=True):
def __init__(self, proxy=None, verbose=True, browser_mode="protocol"):
"""
初始化 ChatGPT 客户端
Args:
proxy: 代理地址
verbose: 是否输出详细日志
browser_mode: protocol | headless | headed
"""
self.proxy = proxy
self.verbose = verbose
self.browser_mode = browser_mode or "protocol"
self.device_id = str(uuid.uuid4())
self.accept_language = random.choice([
"en-US,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9",
"en-US,en;q=0.8",
])
# 随机 Chrome 版本
self.impersonate, self.chrome_major, self.chrome_full, self.ua, self.sec_ch_ua = _random_chrome_version()
@@ -80,10 +98,7 @@ class ChatGPTClient:
# 设置基础 headers
self.session.headers.update({
"User-Agent": self.ua,
"Accept-Language": random.choice([
"en-US,en;q=0.9", "en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9", "en-US,en;q=0.8",
]),
"Accept-Language": self.accept_language,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
@@ -94,17 +109,61 @@ class ChatGPTClient:
})
# 设置 oai-did cookie
self.session.cookies.set("oai-did", self.device_id, domain="chatgpt.com")
seed_oai_device_cookie(self.session, self.device_id)
self.last_registration_state = FlowState()
def _log(self, msg):
"""输出日志"""
if self.verbose:
print(f" {msg}")
def _browser_pause(self, low=0.15, high=0.45):
"""在 headed 模式下加入轻微停顿,模拟有头浏览器节奏。"""
if self.browser_mode == "headed":
random_delay(low, high)
def _headers(
self,
url,
*,
accept,
referer=None,
origin=None,
content_type=None,
navigation=False,
fetch_mode=None,
fetch_dest=None,
fetch_site=None,
extra_headers=None,
):
return build_browser_headers(
url=url,
user_agent=self.ua,
sec_ch_ua=self.sec_ch_ua,
chrome_full_version=self.chrome_full,
accept=accept,
accept_language=self.accept_language,
referer=referer,
origin=origin,
content_type=content_type,
navigation=navigation,
fetch_mode=fetch_mode,
fetch_dest=fetch_dest,
fetch_site=fetch_site,
headed=self.browser_mode == "headed",
extra_headers=extra_headers,
)
def _reset_session(self):
"""重置浏览器指纹与会话,用于绕过偶发的 Cloudflare/SPA 中间页。"""
self.device_id = str(uuid.uuid4())
self.impersonate, self.chrome_major, self.chrome_full, self.ua, self.sec_ch_ua = _random_chrome_version()
self.accept_language = random.choice([
"en-US,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9",
"en-US,en;q=0.8",
])
self.session = curl_requests.Session(impersonate=self.impersonate)
if self.proxy:
@@ -112,10 +171,7 @@ class ChatGPTClient:
self.session.headers.update({
"User-Agent": self.ua,
"Accept-Language": random.choice([
"en-US,en;q=0.9", "en-US,en;q=0.9,zh-CN;q=0.8",
"en,en-US;q=0.9", "en-US,en;q=0.8",
]),
"Accept-Language": self.accept_language,
"sec-ch-ua": self.sec_ch_ua,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
@@ -124,18 +180,225 @@ class ChatGPTClient:
"sec-ch-ua-full-version": f'"{self.chrome_full}"',
"sec-ch-ua-platform-version": f'"{random.randint(10, 15)}.0.0"',
})
self.session.cookies.set("oai-did", self.device_id, domain=".openai.com")
self.session.cookies.set("oai-did", self.device_id, domain="chatgpt.com")
seed_oai_device_cookie(self.session, self.device_id)
def _state_from_url(self, url, method="GET"):
state = extract_flow_state(
current_url=normalize_flow_url(url, auth_base=self.AUTH),
auth_base=self.AUTH,
default_method=method,
)
if method:
state.method = str(method).upper()
return state
def _state_from_payload(self, data, current_url=""):
return extract_flow_state(
data=data,
current_url=current_url,
auth_base=self.AUTH,
)
def _state_signature(self, state: FlowState):
return (
state.page_type or "",
state.method or "",
state.continue_url or "",
state.current_url or "",
)
def _is_registration_complete_state(self, state: FlowState):
current_url = (state.current_url or "").lower()
continue_url = (state.continue_url or "").lower()
page_type = state.page_type or ""
return (
page_type in {"callback", "chatgpt_home", "oauth_callback"}
or ("chatgpt.com" in current_url and "redirect_uri" not in current_url)
or ("chatgpt.com" in continue_url and "redirect_uri" not in continue_url and page_type != "external_url")
)
def _state_is_password_registration(self, state: FlowState):
return state.page_type in {"create_account_password", "password"}
def _state_is_email_otp(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "email_otp_verification" or "email-verification" in target or "email-otp" in target
def _state_is_about_you(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "about_you" or "about-you" in target
def _state_requires_navigation(self, state: FlowState):
if (state.method or "GET").upper() != "GET":
return False
if state.page_type == "external_url" and state.continue_url:
return True
if state.continue_url and state.continue_url != state.current_url:
return True
return False
def _follow_flow_state(self, state: FlowState, referer=None):
"""跟随服务端返回的 continue_url推进注册状态机。"""
target_url = state.continue_url or state.current_url
if not target_url:
return False, "缺少可跟随的 continue_url"
try:
self._browser_pause()
r = self.session.get(
target_url,
headers=self._headers(
target_url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=referer,
navigation=True,
),
allow_redirects=True,
timeout=30,
)
final_url = str(r.url)
self._log(f"follow -> {r.status_code} {final_url}")
content_type = (r.headers.get("content-type", "") or "").lower()
if "application/json" in content_type:
try:
next_state = self._state_from_payload(r.json(), current_url=final_url)
except Exception:
next_state = self._state_from_url(final_url)
else:
next_state = self._state_from_url(final_url)
self._log(f"follow state -> {describe_flow_state(next_state)}")
return True, next_state
except Exception as e:
self._log(f"跟随 continue_url 失败: {e}")
return False, str(e)
def _get_cookie_value(self, name, domain_hint=None):
"""读取当前会话中的 Cookie。"""
for cookie in self.session.cookies.jar:
if cookie.name != name:
continue
if domain_hint and domain_hint not in (cookie.domain or ""):
continue
return cookie.value
return ""
def get_next_auth_session_token(self):
"""获取 ChatGPT next-auth 会话 Cookie。"""
return self._get_cookie_value("__Secure-next-auth.session-token", "chatgpt.com")
def fetch_chatgpt_session(self):
"""请求 ChatGPT Session 接口并返回原始会话数据。"""
url = f"{self.BASE}/api/auth/session"
self._browser_pause()
response = self.session.get(
url,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
fetch_site="same-origin",
),
timeout=30,
)
if response.status_code != 200:
return False, f"/api/auth/session -> HTTP {response.status_code}"
try:
data = response.json()
except Exception as exc:
return False, f"/api/auth/session 返回非 JSON: {exc}"
access_token = str(data.get("accessToken") or "").strip()
if not access_token:
return False, "/api/auth/session 未返回 accessToken"
return True, data
def reuse_session_and_get_tokens(self):
"""
复用注册阶段已建立的 ChatGPT 会话,直接读取 Session / AccessToken。
Returns:
tuple[bool, dict|str]: 成功时返回标准化 token/session 数据;失败时返回错误信息。
"""
state = self.last_registration_state or FlowState()
self._log("步骤 1/4: 跟随注册回调 external_url ...")
if state.page_type == "external_url" or self._state_requires_navigation(state):
ok, followed = self._follow_flow_state(
state,
referer=state.current_url or f"{self.AUTH}/about-you",
)
if not ok:
return False, f"注册回调落地失败: {followed}"
self.last_registration_state = followed
else:
self._log("注册回调已落地,跳过额外跟随")
self._log("步骤 2/4: 检查 __Secure-next-auth.session-token ...")
session_cookie = self.get_next_auth_session_token()
if not session_cookie:
return False, "缺少 __Secure-next-auth.session-token注册回调可能未落地"
self._log("步骤 3/4: 请求 ChatGPT /api/auth/session ...")
ok, session_or_error = self.fetch_chatgpt_session()
if not ok:
return False, session_or_error
session_data = session_or_error
access_token = str(session_data.get("accessToken") or "").strip()
session_token = str(session_data.get("sessionToken") or session_cookie or "").strip()
user = session_data.get("user") or {}
account = session_data.get("account") or {}
jwt_payload = decode_jwt_payload(access_token)
auth_payload = jwt_payload.get("https://api.openai.com/auth") or {}
account_id = (
str(account.get("id") or "").strip()
or str(auth_payload.get("chatgpt_account_id") or "").strip()
)
user_id = (
str(user.get("id") or "").strip()
or str(auth_payload.get("chatgpt_user_id") or "").strip()
or str(auth_payload.get("user_id") or "").strip()
)
normalized = {
"access_token": access_token,
"session_token": session_token,
"account_id": account_id,
"user_id": user_id,
"workspace_id": account_id,
"expires": session_data.get("expires"),
"user": user,
"account": account,
"auth_provider": session_data.get("authProvider"),
"raw_session": session_data,
}
self._log("步骤 4/4: 已从复用会话中提取 accessToken")
if account_id:
self._log(f"Session Account ID: {account_id}")
if user_id:
self._log(f"Session User ID: {user_id}")
return True, normalized
def visit_homepage(self):
"""访问首页,建立 session"""
self._log("访问 ChatGPT 首页...")
url = f"{self.BASE}/"
try:
r = self.session.get(url, headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
}, allow_redirects=True, timeout=30)
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
return r.status_code == 200
except Exception as e:
self._log(f"访问首页失败: {e}")
@@ -146,10 +409,16 @@ class ChatGPTClient:
self._log("获取 CSRF token...")
url = f"{self.BASE}/api/auth/csrf"
try:
r = self.session.get(url, headers={
"Accept": "application/json",
"Referer": f"{self.BASE}/"
}, timeout=30)
r = self.session.get(
url,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
fetch_site="same-origin",
),
timeout=30,
)
if r.status_code == 200:
data = r.json()
@@ -185,20 +454,21 @@ class ChatGPTClient:
"csrfToken": csrf_token,
"json": "true",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"Referer": f"{self.BASE}/",
"Origin": self.BASE,
}
try:
self._browser_pause()
r = self.session.post(
url,
params=params,
data=form_data,
headers=headers,
headers=self._headers(
url,
accept="application/json",
referer=f"{self.BASE}/",
origin=self.BASE,
content_type="application/x-www-form-urlencoded",
fetch_site="same-origin",
),
timeout=30
)
@@ -213,27 +483,6 @@ class ChatGPTClient:
return None
def authorize(self, url):
"""获取 CSRF token"""
self._log("获取 CSRF token...")
url = f"{self.BASE}/api/auth/csrf"
try:
r = self.session.get(url, headers={
"Accept": "application/json",
"Referer": f"{self.BASE}/"
}, timeout=30)
if r.status_code == 200:
data = r.json()
token = data.get("csrfToken", "")
if token:
self._log(f"CSRF token: {token[:20]}...")
return token
except Exception as e:
self._log(f"获取 CSRF token 失败: {e}")
return None
def authorize(self, url, max_retries=3):
"""
访问 authorize URL跟随重定向带重试机制
@@ -249,12 +498,19 @@ class ChatGPTClient:
time.sleep(1) # 重试前等待
else:
self._log("访问 authorize URL...")
r = self.session.get(url, headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": f"{self.BASE}/",
"Upgrade-Insecure-Requests": "1",
}, allow_redirects=True, timeout=30)
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
referer=f"{self.BASE}/",
navigation=True,
),
allow_redirects=True,
timeout=30,
)
final_url = str(r.url)
self._log(f"重定向到: {final_url}")
@@ -273,19 +529,15 @@ class ChatGPTClient:
return ""
def callback(self):
def callback(self, callback_url=None, referer=None):
"""完成注册回调"""
self._log("执行回调...")
url = f"{self.AUTH}/api/accounts/authorize/callback"
try:
r = self.session.get(url, headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": f"{self.AUTH}/about-you",
}, allow_redirects=True, timeout=30)
return r.status_code == 200
except Exception as e:
self._log(f"回调失败: {e}")
return False
url = callback_url or f"{self.AUTH}/api/accounts/authorize/callback"
ok, _ = self._follow_flow_state(
self._state_from_url(url),
referer=referer or f"{self.AUTH}/about-you",
)
return ok
def register_user(self, email, password):
"""
@@ -297,12 +549,14 @@ class ChatGPTClient:
self._log(f"注册用户: {email}")
url = f"{self.AUTH}/api/accounts/user/register"
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Referer": f"{self.AUTH}/create-account/password",
"Origin": self.AUTH,
}
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/create-account/password",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
)
headers.update(generate_datadog_trace())
payload = {
@@ -311,6 +565,7 @@ class ChatGPTClient:
}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
@@ -334,21 +589,26 @@ class ChatGPTClient:
"""触发发送邮箱验证码"""
self._log("触发发送验证码...")
url = f"{self.AUTH}/api/accounts/email-otp/send"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": f"{self.AUTH}/create-account/password",
"Upgrade-Insecure-Requests": "1",
}
try:
r = self.session.get(url, headers=headers, allow_redirects=True, timeout=30)
self._browser_pause()
r = self.session.get(
url,
headers=self._headers(
url,
accept="application/json, text/plain, */*",
referer=f"{self.AUTH}/create-account/password",
fetch_site="same-origin",
),
allow_redirects=True,
timeout=30,
)
return r.status_code == 200
except Exception as e:
self._log(f"发送验证码失败: {e}")
return False
def verify_email_otp(self, otp_code):
def verify_email_otp(self, otp_code, return_state=False):
"""
验证邮箱 OTP 码
@@ -361,22 +621,30 @@ class ChatGPTClient:
self._log(f"验证 OTP 码: {otp_code}")
url = f"{self.AUTH}/api/accounts/email-otp/validate"
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Referer": f"{self.AUTH}/email-verification",
"Origin": self.AUTH,
}
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/email-verification",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
)
headers.update(generate_datadog_trace())
payload = {"code": otp_code}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
self._log("验证成功")
return True, "验证成功"
try:
data = r.json()
except Exception:
data = {}
next_state = self._state_from_payload(data, current_url=str(r.url) or f"{self.AUTH}/about-you")
self._log(f"验证成功 {describe_flow_state(next_state)}")
return (True, next_state) if return_state else (True, "验证成功")
else:
error_msg = r.text[:200]
self._log(f"验证失败: {r.status_code} - {error_msg}")
@@ -386,7 +654,7 @@ class ChatGPTClient:
self._log(f"验证异常: {e}")
return False, str(e)
def create_account(self, first_name, last_name, birthdate):
def create_account(self, first_name, last_name, birthdate, return_state=False):
"""
完成账号创建(提交姓名和生日)
@@ -415,14 +683,17 @@ class ChatGPTClient:
else:
self._log("create_account: 未生成 sentinel token降级继续请求")
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Referer": f"{self.AUTH}/about-you",
"Origin": self.AUTH,
"oai-device-id": self.device_id,
"User-Agent": self.ua,
}
headers = self._headers(
url,
accept="application/json",
referer=f"{self.AUTH}/about-you",
origin=self.AUTH,
content_type="application/json",
fetch_site="same-origin",
extra_headers={
"oai-device-id": self.device_id,
},
)
if sentinel_token:
headers["openai-sentinel-token"] = sentinel_token
headers.update(generate_datadog_trace())
@@ -433,11 +704,17 @@ class ChatGPTClient:
}
try:
self._browser_pause()
r = self.session.post(url, json=payload, headers=headers, timeout=30)
if r.status_code == 200:
self._log("账号创建成功")
return True, "账号创建成功"
try:
data = r.json()
except Exception:
data = {}
next_state = self._state_from_payload(data, current_url=str(r.url) or self.BASE)
self._log(f"账号创建成功 {describe_flow_state(next_state)}")
return (True, next_state) if return_state else (True, "账号创建成功")
else:
error_msg = r.text[:200]
self._log(f"创建失败: {r.status_code} - {error_msg}")
@@ -512,58 +789,84 @@ class ChatGPTClient:
break
# 5. 根据最终 URL 判断状态
need_otp = False
if "create-account/password" in final_path:
self._log("全新注册流程")
success, msg = self.register_user(email, password)
if not success:
return False, f"注册失败: {msg}"
self.send_email_otp()
need_otp = True
elif "email-verification" in final_path or "email-otp" in final_path:
self._log("跳到 OTP 验证阶段")
need_otp = True
elif "about-you" in final_path:
self._log("跳到填写信息阶段")
success, msg = self.create_account(first_name, last_name, birthdate)
if not success:
return False, f"创建账号失败: {msg}"
self.callback()
return True, "注册成功"
elif "callback" in final_path or ("chatgpt.com" in final_url and "redirect_uri" not in final_url):
self._log("账号已完成注册")
return True, "账号已完成注册"
else:
self._log(f"未知跳转: {final_url}")
success, msg = self.register_user(email, password)
if not success:
return False, f"注册失败: {msg}"
self.send_email_otp()
need_otp = True
# 6. 处理 OTP 验证
if need_otp:
self._log("等待邮箱验证码...")
otp_code = skymail_client.wait_for_verification_code(email, timeout=30)
if not otp_code:
return False, "未收到验证码"
success, msg = self.verify_email_otp(otp_code)
if not success:
return False, f"验证码失败: {msg}"
# 7. 完成账号创建
success, msg = self.create_account(first_name, last_name, birthdate)
if not success:
return False, f"创建账号失败: {msg}"
self.callback()
self._log("✅ 注册流程完成")
return True, "注册成功"
state = self._state_from_url(final_url)
self._log(f"注册状态起点: {describe_flow_state(state)}")
register_submitted = False
otp_verified = False
account_created = False
seen_states = {}
for _ in range(12):
signature = self._state_signature(state)
seen_states[signature] = seen_states.get(signature, 0) + 1
if seen_states[signature] > 2:
return False, f"注册状态卡住: {describe_flow_state(state)}"
if self._is_registration_complete_state(state):
self.last_registration_state = state
self._log("✅ 注册流程完成")
return True, "注册成功"
if self._state_is_password_registration(state):
self._log("全新注册流程")
if register_submitted:
return False, "注册密码阶段重复进入"
success, msg = self.register_user(email, password)
if not success:
return False, f"注册失败: {msg}"
register_submitted = True
if not self.send_email_otp():
self._log("发送验证码接口返回失败,继续等待邮箱中的验证码...")
state = self._state_from_url(f"{self.AUTH}/email-verification")
continue
if self._state_is_email_otp(state):
self._log("等待邮箱验证码...")
otp_code = skymail_client.wait_for_verification_code(email, timeout=30)
if not otp_code:
return False, "未收到验证码"
success, next_state = self.verify_email_otp(otp_code, return_state=True)
if not success:
return False, f"验证码失败: {next_state}"
otp_verified = True
state = next_state
self.last_registration_state = state
continue
if self._state_is_about_you(state):
if account_created:
return False, "填写信息阶段重复进入"
success, next_state = self.create_account(
first_name,
last_name,
birthdate,
return_state=True,
)
if not success:
return False, f"创建账号失败: {next_state}"
account_created = True
state = next_state
self.last_registration_state = state
continue
if self._state_requires_navigation(state):
success, next_state = self._follow_flow_state(
state,
referer=state.current_url or f"{self.AUTH}/about-you",
)
if not success:
return False, f"跳转失败: {next_state}"
state = next_state
self.last_registration_state = state
continue
if (not register_submitted) and (not otp_verified) and (not account_created):
self._log(f"未知起始状态,回退为全新注册流程: {describe_flow_state(state)}")
state = self._state_from_url(f"{self.AUTH}/create-account/password")
continue
return False, f"未支持的注册状态: {describe_flow_state(state)}"
return False, "注册状态机超出最大步数"

View File

@@ -7,6 +7,7 @@ import base64
import logging
from typing import Tuple
from datetime import datetime, timezone, timedelta
import hashlib
from curl_cffi import requests as cffi_requests
from curl_cffi import CurlMime
@@ -29,6 +30,122 @@ def _decode_jwt_payload(token: str) -> dict:
return {}
def _b64url_json(data: dict) -> str:
raw = json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _b64url_bytes(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _derive_display_name(email: str) -> str:
local = (email or "").split("@", 1)[0].replace(".", " ").replace("_", " ").replace("-", " ")
parts = [part for part in local.split() if part]
if not parts:
return "OpenAI User"
return " ".join(part[:1].upper() + part[1:] for part in parts[:3])
def _get_auth_info(payload: dict) -> dict:
nested = payload.get("https://api.openai.com/auth", {})
if isinstance(nested, dict) and nested:
return nested
flat = {}
for key, value in payload.items():
if key.startswith("https://api.openai.com/auth."):
flat[key.split(".", 4)[-1]] = value
return flat
def _build_compat_id_token(
*,
access_token: str,
email: str,
) -> str:
"""
基于 access_token 构造一个仅供本地 CPA/玩具环境解析的兼容 id_token。
注意:该 token 仅用于不校验签名、只解析 payload 的本地兼容场景。
"""
payload = _decode_jwt_payload(access_token)
if not payload:
return ""
auth_info = _get_auth_info(payload)
email_from_token = ((payload.get("https://api.openai.com/profile") or {}).get("email") or payload.get("email") or email or "").strip()
email_verified = bool(
((payload.get("https://api.openai.com/profile") or {}).get("email_verified"))
if isinstance(payload.get("https://api.openai.com/profile"), dict)
else payload.get("email_verified", True)
)
account_id = str(auth_info.get("chatgpt_account_id") or auth_info.get("account_id") or "").strip()
user_id = str(
auth_info.get("chatgpt_user_id")
or auth_info.get("user_id")
or payload.get("sub")
or ""
).strip()
iat = int(payload.get("iat") or 0)
exp = int(payload.get("exp") or 0)
auth_time = int(payload.get("pwd_auth_time") or payload.get("auth_time") or iat or 0)
session_id = str(payload.get("session_id") or f"compat_session_{(account_id or user_id or 'unknown').replace('-', '')[:24]}").strip()
plan_type = str(auth_info.get("chatgpt_plan_type") or "free").strip() or "free"
organization_id = str(auth_info.get("organization_id") or f"org-{hashlib.sha1((account_id or email_from_token or user_id).encode('utf-8')).hexdigest()[:24]}")
project_id = str(auth_info.get("project_id") or f"proj_{hashlib.sha1((organization_id + ':' + (account_id or user_id)).encode('utf-8')).hexdigest()[:24]}")
compat_auth = {
"chatgpt_account_id": account_id,
"chatgpt_plan_type": plan_type,
"chatgpt_subscription_active_start": auth_info.get("chatgpt_subscription_active_start"),
"chatgpt_subscription_active_until": auth_info.get("chatgpt_subscription_active_until"),
"chatgpt_subscription_last_checked": auth_info.get("chatgpt_subscription_last_checked"),
"chatgpt_user_id": user_id,
"completed_platform_onboarding": bool(auth_info.get("completed_platform_onboarding", False)),
"groups": auth_info.get("groups", []),
"is_org_owner": bool(auth_info.get("is_org_owner", True)),
"localhost": bool(auth_info.get("localhost", True)),
"organization_id": organization_id,
"organizations": auth_info.get("organizations") or [
{
"id": organization_id,
"is_default": True,
"role": "owner",
"title": "Personal",
}
],
"project_id": project_id,
"user_id": str(auth_info.get("user_id") or user_id or "").strip(),
}
compat_payload = {
"amr": ["pwd", "otp", "mfa", "urn:openai:amr:otp_email"],
"at_hash": hashlib.sha256(access_token.encode("utf-8")).hexdigest()[:22],
"aud": ["app_EMoamEEZ73f0CkXaXp7hrann"],
"auth_provider": "password",
"auth_time": auth_time,
"email": email_from_token,
"email_verified": email_verified,
"exp": exp,
"https://api.openai.com/auth": compat_auth,
"iat": iat,
"iss": payload.get("iss") or "https://auth.openai.com",
"jti": f"compat-{hashlib.sha1(access_token.encode('utf-8')).hexdigest()[:32]}",
"name": _derive_display_name(email_from_token),
"rat": auth_time,
"sid": session_id,
"sub": payload.get("sub") or user_id,
}
header = {
"alg": "RS256",
"typ": "JWT",
"kid": "compat",
}
signature = _b64url_bytes(b"compat_signature_for_cpa_parsing_only")
return f"{_b64url_json(header)}.{_b64url_json(compat_payload)}.{signature}"
def _get_config_value(key: str) -> str:
try:
from core.config_store import config_store
@@ -47,12 +164,14 @@ def generate_token_json(account) -> dict:
access_token = getattr(account, "access_token", "")
refresh_token = getattr(account, "refresh_token", "")
id_token = getattr(account, "id_token", "")
if access_token and not id_token:
id_token = _build_compat_id_token(access_token=access_token, email=email)
expired_str = ""
account_id = ""
if access_token:
payload = _decode_jwt_payload(access_token)
auth_info = payload.get("https://api.openai.com/auth", {})
auth_info = _get_auth_info(payload)
account_id = auth_info.get("chatgpt_account_id", "")
exp_timestamp = payload.get("exp")
if isinstance(exp_timestamp, int) and exp_timestamp > 0:

File diff suppressed because it is too large Load Diff

View File

@@ -34,6 +34,7 @@ class ChatGPTPlatform(BasePlatform):
string.ascii_letters + string.digits + "!@#$", k=16))
proxy = self.config.proxy if self.config else None
browser_mode = (self.config.executor_type if self.config else None) or "protocol"
log_fn = getattr(self, '_log_fn', print)
from platforms.chatgpt.register_v2 import RegistrationEngineV2 as RegistrationEngine
log_fn = getattr(self, '_log_fn', print)
@@ -79,7 +80,11 @@ class ChatGPTPlatform(BasePlatform):
engine = RegistrationEngine(
email_service=GenericEmailService(),
proxy_url=proxy, callback_logger=log_fn, max_retries=max_retries)
proxy_url=proxy,
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
)
engine.email = email
engine.password = password
else:
@@ -107,7 +112,11 @@ class ChatGPTPlatform(BasePlatform):
engine = RegistrationEngine(
email_service=TempMailEmailService(),
proxy_url=proxy, callback_logger=log_fn, max_retries=max_retries)
proxy_url=proxy,
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
)
if email:
engine.email = email
engine.password = password

View File

@@ -1,11 +1,8 @@
"""
注册流程引擎 V2
使用完全独立的基于 curl_cffi 和 OAuth redirect 流程,完美绕过 add_phone 问题
重用 chatgpt_register_v2_by_AI 中的逻辑。
基于 curl_cffi 的注册状态机,注册成功后直接复用同一会话提取 ChatGPT Session
"""
import sys
import os
import time
import logging
from datetime import datetime
@@ -14,13 +11,7 @@ from typing import Optional, Callable
from core.base_platform import AccountStatus
from platforms.chatgpt.register import RegistrationResult
# 将 chatgpt_register_v2_by_AI 目录加入 Python 路径,方便导入
V2_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "chatgpt_register_v2_by_AI")
if V2_PATH not in sys.path:
sys.path.append(V2_PATH)
from .chatgpt_client import ChatGPTClient
from .oauth_client import OAuthClient
from .utils import generate_random_name, generate_random_birthday
logger = logging.getLogger(__name__)
@@ -51,12 +42,14 @@ class RegistrationEngineV2:
self,
email_service,
proxy_url: Optional[str] = None,
browser_mode: str = "protocol",
callback_logger: Optional[Callable[[str], None]] = None,
task_uuid: Optional[str] = None,
max_retries: int = 3,
):
self.email_service = email_service
self.proxy_url = proxy_url
self.browser_mode = browser_mode or "protocol"
self.callback_logger = callback_logger
self.task_uuid = task_uuid
self.max_retries = max(1, int(max_retries or 1))
@@ -79,7 +72,6 @@ class RegistrationEngineV2:
def _should_retry(self, message: str) -> bool:
text = str(message or "").lower()
retriable_markers = [
"oauth",
"tls",
"ssl",
"curl: (35)",
@@ -94,6 +86,9 @@ class RegistrationEngineV2:
"organization",
"otp",
"验证码",
"session",
"accessToken",
"next-auth",
]
return any(marker.lower() in text for marker in retriable_markers)
@@ -105,7 +100,8 @@ class RegistrationEngineV2:
try:
if attempt == 0:
self._log("=" * 60)
self._log("开始注册流程 V2 (OAuth Curl_cffi绕过风控)")
self._log("开始注册流程 V2 (Session 复用直取 AccessToken)")
self._log(f"请求模式: {self.browser_mode}")
self._log("=" * 60)
else:
self._log(f"整流程重试 {attempt + 1}/{self.max_retries} ...")
@@ -134,10 +130,14 @@ class RegistrationEngineV2:
skymail_adapter = EmailServiceAdapter(self.email_service, email_addr, self._log)
# 2. 初始化 V2 客户端
chatgpt_client = ChatGPTClient(proxy=self.proxy_url, verbose=False)
chatgpt_client = ChatGPTClient(
proxy=self.proxy_url,
verbose=False,
browser_mode=self.browser_mode,
)
chatgpt_client._log = self._log
self._log("开始执行完整注册认证流(OAuth Redirect)...")
self._log("步骤 1/2: 执行注册状态机...")
success, msg = chatgpt_client.register_complete_flow(
email_addr, pwd, first_name, last_name, birthdate, skymail_adapter
@@ -151,52 +151,37 @@ class RegistrationEngineV2:
result.error_message = last_error
return result
self._log("新账号已创建,注册流完成,开始无缝获取 OAuth AccessToken...")
# 3. 初始化 OAuth V2 客户端
oauth_client = OAuthClient(config={}, proxy=self.proxy_url, verbose=False)
oauth_client._log = self._log
oauth_client.session = chatgpt_client.session
self._log("步骤 2/2: 复用注册会话,直接获取 ChatGPT Session / AccessToken...")
session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens()
tokens = oauth_client.login_and_get_tokens(
email_addr, pwd,
chatgpt_client.device_id,
chatgpt_client.ua,
chatgpt_client.sec_ch_ua,
chatgpt_client.impersonate,
skymail_adapter
)
if tokens and tokens.get("access_token"):
self._log("Token 换取完成!")
if session_ok:
self._log("Token 提取完成!")
result.success = True
result.access_token = tokens.get("access_token")
result.refresh_token = tokens.get("refresh_token")
result.id_token = tokens.get("id_token")
result.account_id = "v2_acct_" + chatgpt_client.device_id[:8]
result.access_token = session_result.get("access_token", "")
result.session_token = session_result.get("session_token", "")
result.account_id = (
session_result.get("account_id")
or session_result.get("user_id")
or ("v2_acct_" + chatgpt_client.device_id[:8])
)
result.workspace_id = session_result.get("workspace_id", "")
result.metadata = {
"auth_provider": session_result.get("auth_provider", ""),
"expires": session_result.get("expires", ""),
"user_id": session_result.get("user_id", ""),
"user": session_result.get("user") or {},
"account": session_result.get("account") or {},
}
# 从认证后的 Cookie 结构体里直接解析 Workspace
session_data = oauth_client._decode_oauth_session_cookie()
if session_data:
workspaces = session_data.get("workspaces", [])
if workspaces:
result.workspace_id = str((workspaces[0] or {}).get("id") or "")
self._log(f"成功萃取 Workspace ID: {result.workspace_id}")
else:
self._log("oai-client-auth-session 中仍无 workspace信息但这通常是正常情况重试即可", "warning")
session_cookie = None
for cookie in oauth_client.session.cookies.jar:
if cookie.name == "__Secure-next-auth.session-token":
session_cookie = cookie.value
break
result.session_token = session_cookie
if result.workspace_id:
self._log(f"Session Workspace ID: {result.workspace_id}")
self._log("=" * 60)
self._log("注册流程成功结束!")
self._log("=" * 60)
return result
last_error = "成功创建了账号但获取最终 OAuth Tokens 失败"
last_error = f"注册成功,但复用会话获取 AccessToken 失败: {session_result}"
if attempt < self.max_retries - 1:
self._log(f"{last_error},准备整流程重试")
continue

View File

@@ -2,12 +2,29 @@
通用工具函数模块
"""
from dataclasses import dataclass, field
import random
import string
import secrets
import hashlib
import base64
import uuid
import re
from urllib.parse import urlparse
from typing import Any, Dict
@dataclass
class FlowState:
"""OpenAI Auth/Registration 流程中的页面状态。"""
page_type: str = ""
continue_url: str = ""
method: str = "GET"
current_url: str = ""
source: str = ""
payload: Dict[str, Any] = field(default_factory=dict)
raw: Dict[str, Any] = field(default_factory=dict)
def generate_device_id():
@@ -105,7 +122,241 @@ def extract_code_from_url(url):
return None
def normalize_page_type(value):
"""将 page.type 归一化为便于分支判断的 snake_case。"""
return str(value or "").strip().lower().replace("-", "_").replace("/", "_").replace(" ", "_")
def normalize_flow_url(url, auth_base="https://auth.openai.com"):
"""将 continue_url / payload.url 归一化成绝对 URL。"""
value = str(url or "").strip()
if not value:
return ""
if value.startswith("//"):
return f"https:{value}"
if value.startswith("/"):
return f"{auth_base.rstrip('/')}{value}"
return value
def infer_page_type_from_url(url):
"""从 URL 推断流程状态,用于服务端未返回 page.type 时兜底。"""
if not url:
return ""
try:
parsed = urlparse(url)
except Exception:
return ""
host = (parsed.netloc or "").lower()
path = (parsed.path or "").lower()
if "code=" in (parsed.query or ""):
return "oauth_callback"
if "chatgpt.com" in host and "/api/auth/callback/" in path:
return "callback"
if "create-account/password" in path:
return "create_account_password"
if "email-verification" in path or "email-otp" in path:
return "email_otp_verification"
if "about-you" in path:
return "about_you"
if "log-in/password" in path:
return "login_password"
if "sign-in-with-chatgpt" in path and "consent" in path:
return "consent"
if "workspace" in path and "select" in path:
return "workspace_selection"
if "organization" in path and "select" in path:
return "organization_selection"
if "add-phone" in path:
return "add_phone"
if "callback" in path:
return "callback"
if "chatgpt.com" in host and path in {"", "/"}:
return "chatgpt_home"
if path:
return normalize_page_type(path.strip("/").replace("/", "_"))
return ""
def extract_flow_state(data=None, current_url="", auth_base="https://auth.openai.com", default_method="GET"):
"""从 API 响应或 URL 中提取统一的流程状态。"""
raw = data if isinstance(data, dict) else {}
page = raw.get("page") or {}
payload = page.get("payload") or {}
continue_url = normalize_flow_url(
raw.get("continue_url") or payload.get("url") or "",
auth_base=auth_base,
)
effective_current_url = continue_url if raw and continue_url else current_url
current = normalize_flow_url(effective_current_url or continue_url, auth_base=auth_base)
page_type = normalize_page_type(page.get("type")) or infer_page_type_from_url(continue_url or current)
method = str(raw.get("method") or payload.get("method") or default_method or "GET").upper()
return FlowState(
page_type=page_type,
continue_url=continue_url,
method=method,
current_url=current,
source="api" if raw else "url",
payload=payload if isinstance(payload, dict) else {},
raw=raw,
)
def describe_flow_state(state: FlowState):
"""生成简短的流程状态描述,便于记录日志。"""
target = state.continue_url or state.current_url or "-"
return f"page={state.page_type or '-'} method={state.method or '-'} next={target[:80]}..."
def random_delay(low=0.3, high=1.0):
"""随机延迟"""
import time
time.sleep(random.uniform(low, high))
def extract_chrome_full_version(user_agent):
"""从 UA 中提取完整的 Chrome 版本号。"""
if not user_agent:
return ""
match = re.search(r"Chrome/([0-9.]+)", user_agent)
return match.group(1) if match else ""
def _registrable_domain(hostname):
"""粗略提取可注册域名,用于推断 Sec-Fetch-Site。"""
if not hostname:
return ""
host = hostname.split(":")[0].strip(".").lower()
parts = [part for part in host.split(".") if part]
if len(parts) <= 2:
return ".".join(parts)
return ".".join(parts[-2:])
def infer_sec_fetch_site(url, referer=None, navigation=False):
"""根据目标 URL 和 Referer 推断 Sec-Fetch-Site。"""
if not referer:
return "none" if navigation else "same-origin"
try:
target = urlparse(url or "")
source = urlparse(referer or "")
if not target.scheme or not target.netloc or not source.netloc:
return "none" if navigation else "same-origin"
if (target.scheme, target.netloc) == (source.scheme, source.netloc):
return "same-origin"
if _registrable_domain(target.hostname) == _registrable_domain(source.hostname):
return "same-site"
except Exception:
pass
return "cross-site"
def build_sec_ch_ua_full_version_list(sec_ch_ua, chrome_full_version):
"""根据 sec-ch-ua 生成 sec-ch-ua-full-version-list。"""
if not sec_ch_ua or not chrome_full_version:
return ""
entries = []
for brand, version in re.findall(r'"([^"]+)";v="([^"]+)"', sec_ch_ua):
full_version = chrome_full_version if brand in {"Chromium", "Google Chrome"} else f"{version}.0.0.0"
entries.append(f'"{brand}";v="{full_version}"')
return ", ".join(entries)
def build_browser_headers(
*,
url,
user_agent,
sec_ch_ua=None,
chrome_full_version=None,
accept=None,
accept_language="en-US,en;q=0.9",
referer=None,
origin=None,
content_type=None,
navigation=False,
fetch_mode=None,
fetch_dest=None,
fetch_site=None,
headed=False,
extra_headers=None,
):
"""构造更接近真实 Chrome 有头浏览器的请求头。"""
chrome_full = chrome_full_version or extract_chrome_full_version(user_agent)
full_version_list = build_sec_ch_ua_full_version_list(sec_ch_ua, chrome_full)
headers = {
"User-Agent": user_agent or "Mozilla/5.0",
"Accept-Language": accept_language,
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
}
if accept:
headers["Accept"] = accept
if referer:
headers["Referer"] = referer
if origin:
headers["Origin"] = origin
if content_type:
headers["Content-Type"] = content_type
if sec_ch_ua:
headers["sec-ch-ua"] = sec_ch_ua
if chrome_full:
headers["sec-ch-ua-full-version"] = f'"{chrome_full}"'
headers["sec-ch-ua-platform-version"] = '"15.0.0"'
if full_version_list:
headers["sec-ch-ua-full-version-list"] = full_version_list
if navigation:
headers["Sec-Fetch-Dest"] = "document"
headers["Sec-Fetch-Mode"] = "navigate"
headers["Sec-Fetch-User"] = "?1"
headers["Upgrade-Insecure-Requests"] = "1"
headers["Cache-Control"] = "max-age=0"
else:
headers["Sec-Fetch-Dest"] = fetch_dest or "empty"
headers["Sec-Fetch-Mode"] = fetch_mode or "cors"
headers["Sec-Fetch-Site"] = fetch_site or infer_sec_fetch_site(url, referer, navigation=navigation)
if headed:
headers.setdefault("Priority", "u=0, i" if navigation else "u=1, i")
headers.setdefault("DNT", "1")
headers.setdefault("Sec-GPC", "1")
if extra_headers:
for key, value in extra_headers.items():
if value is not None:
headers[key] = value
return headers
def seed_oai_device_cookie(session, device_id):
"""在 ChatGPT / OpenAI 相关域上同步设置 oai-did。"""
for domain in (
"chatgpt.com",
".chatgpt.com",
"openai.com",
".openai.com",
"auth.openai.com",
".auth.openai.com",
):
try:
session.cookies.set("oai-did", device_id, domain=domain)
except Exception:
continue

View File

@@ -65,19 +65,38 @@ function Stop-ProcessTreeSafe {
}
Write-Host "[INFO] 尝试优雅停止 PID=$ProcessId"
cmd /c "taskkill /PID $ProcessId /T" *> $null
try {
& taskkill.exe /PID $ProcessId /T *> $null
} catch {
Write-Warning "taskkill 优雅停止返回异常: $($_.Exception.Message)"
}
if (Wait-ProcessExit -ProcessId $ProcessId -TimeoutSeconds 6) {
Write-Host "[OK] 已停止 PID=$ProcessId"
return $true
}
Write-Warning "PID=$ProcessId 未在预期时间退出,改为强制停止"
cmd /c "taskkill /PID $ProcessId /T /F" *> $null
try {
& taskkill.exe /PID $ProcessId /T /F *> $null
} catch {
Write-Warning "taskkill 强制停止返回异常: $($_.Exception.Message)"
}
if (Wait-ProcessExit -ProcessId $ProcessId -TimeoutSeconds 6) {
Write-Host "[OK] 已强制停止 PID=$ProcessId"
return $true
}
Write-Warning "taskkill 未能完全停止 PID=$ProcessId,尝试使用 Stop-Process -Force"
try {
Stop-Process -Id $ProcessId -Force -ErrorAction Stop
} catch {
Write-Warning "Stop-Process -Force 失败: $($_.Exception.Message)"
}
if (Wait-ProcessExit -ProcessId $ProcessId -TimeoutSeconds 6) {
Write-Host "[OK] 已通过 Stop-Process 强制停止 PID=$ProcessId"
return $true
}
Write-Warning "PID=$ProcessId 停止失败"
return $false
}