Add ChatGPT phone verification with SMSToMe

This commit is contained in:
highkay
2026-03-30 17:41:24 +08:00
parent e09730deb6
commit d905b2f7d8
14 changed files with 2136 additions and 47 deletions

5
.gitignore vendored
View File

@@ -52,4 +52,7 @@ data/
*.swp
.claude/
.docs/superpowers/
CLAUDE.md
CLAUDE.md
.ace-tool/
smstome*_numbers.txt
smstome_used/

View File

@@ -14,6 +14,7 @@ CONFIG_KEYS = [
"mail_provider",
"cfworker_api_url", "cfworker_admin_token", "cfworker_custom_auth", "cfworker_domain", "cfworker_fingerprint",
"smstome_cookie", "smstome_country_slugs", "smstome_phone_attempts", "smstome_otp_timeout_seconds",
"smstome_poll_interval_seconds", "smstome_sync_max_pages_per_country",
"luckmail_base_url", "luckmail_api_key", "luckmail_email_type", "luckmail_domain",
"cpa_api_url", "cpa_api_key",
"team_manager_url", "team_manager_key",

View File

@@ -1,9 +1,118 @@
"""全局配置持久化 - 存储在 SQLite"""
"""全局配置持久化 - 存储在 SQLite,并在缺省时回退到环境变量/.env。"""
import os
import re
from pathlib import Path
from typing import Optional
from sqlmodel import Field, SQLModel, Session, select
from .db import engine
_ENV_FILE = Path(__file__).resolve().parent.parent / ".env"
def _normalize_config_value(value) -> str:
text = str(value or "").strip()
if len(text) >= 2 and text[0] == text[-1] and text[0] in {"'", '"'}:
return text[1:-1]
return text
def _canonical_config_key(key: str) -> str:
value = str(key or "").strip()
if not value:
return ""
return re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_")
def _config_key_candidates(key: str) -> list[str]:
raw = str(key or "").strip()
if not raw:
return []
normalized = re.sub(r"[^A-Za-z0-9]+", "_", raw).strip("_")
candidates: list[str] = []
seen = set()
for item in (
raw,
raw.lower(),
raw.upper(),
normalized,
normalized.lower(),
normalized.upper(),
):
value = str(item or "").strip()
if value and value not in seen:
seen.add(value)
candidates.append(value)
return candidates
def _load_env_file(path: Path | str | None = None) -> dict[str, str]:
env_path = Path(path or _ENV_FILE)
if not env_path.exists():
return {}
try:
lines = env_path.read_text(encoding="utf-8", errors="ignore").splitlines()
except Exception:
return {}
values: dict[str, str] = {}
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.lower().startswith("export "):
line = line[7:].strip()
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
if not key:
continue
values[key] = _normalize_config_value(value)
return values
def _runtime_env_values() -> dict[str, str]:
values: dict[str, str] = {}
for key, value in _load_env_file().items():
text = _normalize_config_value(value)
if text:
values[key] = text
for key, value in os.environ.items():
text = _normalize_config_value(value)
if text:
values[key] = text
return values
def _get_env_fallback_value(key: str, env_values: Optional[dict[str, str]] = None) -> str:
values = env_values if env_values is not None else _runtime_env_values()
for candidate in _config_key_candidates(key):
text = str(values.get(candidate, "") or "").strip()
if text:
return text
return ""
def _merge_env_fallback(values: dict[str, str], env_values: Optional[dict[str, str]] = None) -> dict[str, str]:
merged = dict(values or {})
runtime_values = env_values if env_values is not None else _runtime_env_values()
for env_key, env_value in runtime_values.items():
text = str(env_value or "").strip()
if not text:
continue
canonical_key = _canonical_config_key(env_key)
for target_key in (env_key, canonical_key):
if not target_key:
continue
if str(merged.get(target_key, "") or "").strip():
continue
merged[target_key] = text
return merged
class ConfigItem(SQLModel, table=True):
__tablename__ = "configs"
key: str = Field(primary_key=True)
@@ -14,9 +123,14 @@ class ConfigStore:
"""简单 key-value 配置存储"""
def get(self, key: str, default: str = "") -> str:
env_values = _runtime_env_values()
with Session(engine) as s:
item = s.get(ConfigItem, key)
return item.value if item else default
value = str(item.value if item else "" or "").strip()
if value:
return value
fallback = _get_env_fallback_value(key, env_values=env_values)
return fallback or default
def set(self, key: str, value: str) -> None:
with Session(engine) as s:
@@ -31,7 +145,8 @@ class ConfigStore:
def get_all(self) -> dict:
with Session(engine) as s:
items = s.exec(select(ConfigItem)).all()
return {i.key: i.value for i in items}
values = {i.key: i.value for i in items}
return _merge_env_fallback(values)
def set_many(self, data: dict) -> None:
with Session(engine) as s:

View File

@@ -316,6 +316,8 @@ export default function Accounts() {
smstome_country_slugs: cfg.smstome_country_slugs,
smstome_phone_attempts: cfg.smstome_phone_attempts,
smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds,
smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds,
smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country,
},
}),
})

View File

@@ -55,6 +55,8 @@ export default function Register() {
smstome_country_slugs: cfg.smstome_country_slugs || '',
smstome_phone_attempts: cfg.smstome_phone_attempts || '',
smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds || '',
smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds || '',
smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country || '',
luckmail_base_url: cfg.luckmail_base_url || 'https://mails.luckyous.com/',
luckmail_api_key: cfg.luckmail_api_key || '',
luckmail_email_type: cfg.luckmail_email_type || '',
@@ -98,6 +100,8 @@ export default function Register() {
smstome_country_slugs: values.smstome_country_slugs,
smstome_phone_attempts: values.smstome_phone_attempts,
smstome_otp_timeout_seconds: values.smstome_otp_timeout_seconds,
smstome_poll_interval_seconds: values.smstome_poll_interval_seconds,
smstome_sync_max_pages_per_country: values.smstome_sync_max_pages_per_country,
luckmail_base_url: values.luckmail_base_url,
luckmail_api_key: values.luckmail_api_key,
luckmail_email_type: values.luckmail_email_type,
@@ -260,6 +264,32 @@ export default function Register() {
)}
</Card>
{platform === 'chatgpt' && (
<Card title="ChatGPT 手机验证" style={{ marginBottom: 16 }}>
<Text type="secondary" style={{ display: 'block', marginBottom: 12 }}>
OAuth `add_phone` 使
</Text>
<Form.Item name="smstome_cookie" label="SMSToMe Cookie">
<Input.Password placeholder="cf_clearance=...; PHPSESSID=..." />
</Form.Item>
<Form.Item name="smstome_country_slugs" label="国家列表">
<Input placeholder="united-kingdom,poland,finland" />
</Form.Item>
<Form.Item name="smstome_phone_attempts" label="手机号尝试次数">
<Input placeholder="3" />
</Form.Item>
<Form.Item name="smstome_otp_timeout_seconds" label="短信等待秒数">
<Input placeholder="45" />
</Form.Item>
<Form.Item name="smstome_poll_interval_seconds" label="轮询间隔秒数">
<Input placeholder="5" />
</Form.Item>
<Form.Item name="smstome_sync_max_pages_per_country" label="每国同步页数">
<Input placeholder="5" />
</Form.Item>
</Card>
)}
{captchaSolver === 'yescaptcha' && (
<Card title="验证码配置" style={{ marginBottom: 16 }}>
<Form.Item name="yescaptcha_key" label="YesCaptcha Key">

View File

@@ -163,6 +163,8 @@ const TAB_ITEMS = [
{ key: 'smstome_country_slugs', label: '国家列表', placeholder: 'united-kingdom,poland' },
{ key: 'smstome_phone_attempts', label: '手机号尝试次数', placeholder: '3' },
{ key: 'smstome_otp_timeout_seconds', label: '短信等待秒数', placeholder: '45' },
{ key: 'smstome_poll_interval_seconds', label: '轮询间隔秒数', placeholder: '5' },
{ key: 'smstome_sync_max_pages_per_country', label: '每国同步页数', placeholder: '5' },
],
},
],

View File

@@ -823,7 +823,7 @@ class ChatGPTClient:
if self._state_is_email_otp(state):
self._log("等待邮箱验证码...")
otp_code = skymail_client.wait_for_verification_code(email, timeout=30)
otp_code = skymail_client.wait_for_verification_code(email, timeout=90)
if not otp_code:
return False, "未收到验证码"

View File

@@ -11,6 +11,7 @@ try:
except ImportError:
import requests as curl_requests
from .phone_service import SMSToMePhoneService
from .utils import (
FlowState,
build_browser_headers,
@@ -38,12 +39,14 @@ class OAuthClient:
verbose: 是否输出详细日志
browser_mode: protocol | headless | headed
"""
self.oauth_issuer = config.get("oauth_issuer", "https://auth.openai.com")
self.oauth_client_id = config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
self.oauth_redirect_uri = config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback")
self.config = dict(config or {})
self.oauth_issuer = self.config.get("oauth_issuer", "https://auth.openai.com")
self.oauth_client_id = self.config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
self.oauth_redirect_uri = self.config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback")
self.proxy = proxy
self.verbose = verbose
self.browser_mode = browser_mode or "protocol"
self.last_error = ""
# 创建 session
self.session = curl_requests.Session()
@@ -55,11 +58,110 @@ class OAuthClient:
if self.verbose:
print(f" [OAuth] {msg}")
def _set_error(self, message):
self.last_error = str(message or "").strip()
if self.last_error:
self._log(self.last_error)
def _browser_pause(self, low=0.15, high=0.4):
"""在 headed 模式下注入轻微延迟,模拟真实浏览器操作节奏。"""
if self.browser_mode == "headed":
random_delay(low, high)
@staticmethod
def _iter_text_fragments(value):
if isinstance(value, str):
text = value.strip()
if text:
yield text
return
if isinstance(value, dict):
for item in value.values():
yield from OAuthClient._iter_text_fragments(item)
return
if isinstance(value, (list, tuple, set)):
for item in value:
yield from OAuthClient._iter_text_fragments(item)
@classmethod
def _should_blacklist_phone_failure(cls, detail="", state: FlowState | None = None):
fragments = [str(detail or "").strip()]
if state is not None:
fragments.extend(
cls._iter_text_fragments(
{
"page_type": state.page_type,
"continue_url": state.continue_url,
"current_url": state.current_url,
"payload": state.payload,
"raw": state.raw,
}
)
)
combined = " | ".join(fragment for fragment in fragments if fragment).lower()
if not combined:
return False
non_blacklist_markers = (
"whatsapp",
"未收到短信验证码",
"手机号验证码错误",
"phone-otp/resend",
"phone-otp/validate 异常",
"phone-otp/validate 响应不是 json",
"phone-otp/validate 失败",
"timeout",
"timed out",
"network",
"connection",
"proxy",
"ssl",
"tls",
"captcha",
"too many phone",
"too many phone numbers",
"too many verification requests",
"验证请求过多",
"接受短信次数过多",
"session limit",
"rate limit",
)
if any(marker in combined for marker in non_blacklist_markers):
return False
blacklist_markers = (
"phone number is invalid",
"invalid phone number",
"invalid phone",
"phone number invalid",
"sms verification failed",
"send sms verification failed",
"unable to send sms",
"not a valid mobile number",
"unsupported phone number",
"phone number not supported",
"carrier not supported",
"电话号码无效",
"手机号无效",
"发送短信验证失败",
"号码无效",
"号码不支持",
"手机号不支持",
)
return any(marker in combined for marker in blacklist_markers)
def _blacklist_phone_if_needed(self, phone_service, entry, detail="", state: FlowState | None = None):
if not entry or not self._should_blacklist_phone_failure(detail, state):
return False
try:
phone_service.mark_blacklisted(entry.phone)
self._log(f"已将手机号加入黑名单: {entry.phone}")
return True
except Exception as e:
self._log(f"写入手机号黑名单失败: {e}")
return False
def _headers(
self,
url,
@@ -142,6 +244,10 @@ class OAuthClient:
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "email_otp_verification" or "email-verification" in target or "email-otp" in target
def _state_is_add_phone(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "add_phone" or "add-phone" in target
def _state_requires_navigation(self, state: FlowState):
method = (state.method or "GET").upper()
if method != "GET":
@@ -335,7 +441,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not sentinel_token:
self._log("无法获取 sentinel token (authorize_continue)")
self._set_error("无法获取 sentinel token (authorize_continue)")
return None
request_url = f"{self.oauth_issuer}/api/accounts/authorize/continue"
@@ -391,7 +497,7 @@ class OAuthClient:
self._log(f"/authorize/continue(重试) -> {r.status_code}")
if r.status_code != 200:
self._log(f"提交邮箱失败: {r.text[:180]}")
self._set_error(f"提交邮箱失败: {r.status_code} - {r.text[:180]}")
return None
data = r.json()
@@ -399,7 +505,7 @@ class OAuthClient:
self._log(describe_flow_state(flow_state))
return flow_state
except Exception as e:
self._log(f"提交邮箱异常: {e}")
self._set_error(f"提交邮箱异常: {e}")
return None
def _submit_password_verify(self, password, device_id, *, user_agent=None, sec_ch_ua=None, impersonate=None, referer=None):
@@ -415,7 +521,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not sentinel_pwd:
self._log("无法获取 sentinel token (password_verify)")
self._set_error("无法获取 sentinel token (password_verify)")
return None
request_url = f"{self.oauth_issuer}/api/accounts/password/verify"
@@ -445,7 +551,7 @@ class OAuthClient:
self._log(f"/password/verify -> {r.status_code}")
if r.status_code != 200:
self._log(f"密码验证失败: {r.text[:180]}")
self._set_error(f"密码验证失败: {r.status_code} - {r.text[:180]}")
return None
data = r.json()
@@ -453,7 +559,7 @@ class OAuthClient:
self._log(f"verify {describe_flow_state(flow_state)}")
return flow_state
except Exception as e:
self._log(f"密码验证异常: {e}")
self._set_error(f"密码验证异常: {e}")
return None
def login_and_get_tokens(self, email, password, device_id, user_agent=None, sec_ch_ua=None, impersonate=None, skymail_client=None):
@@ -472,6 +578,7 @@ class OAuthClient:
Returns:
dict: tokens 字典,包含 access_token, refresh_token, id_token
"""
self.last_error = ""
self._log("开始 OAuth 登录流程...")
code_verifier, code_challenge = generate_pkce()
@@ -499,7 +606,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not authorize_final_url:
self._log("Bootstrap 失败")
self._set_error("Bootstrap 失败")
return None
continue_referer = (
@@ -519,6 +626,8 @@ class OAuthClient:
authorize_params=authorize_params,
)
if not state:
if not self.last_error:
self._set_error("提交邮箱后未进入有效的 OAuth 状态")
return None
self._log(f"OAuth 状态起点: {describe_flow_state(state)}")
@@ -529,7 +638,7 @@ class OAuthClient:
signature = self._state_signature(state)
seen_states[signature] = seen_states.get(signature, 0) + 1
if seen_states[signature] > 2:
self._log(f"OAuth 状态卡住: {describe_flow_state(state)}")
self._set_error(f"OAuth 状态卡住: {describe_flow_state(state)}")
return None
code = self._extract_code_from_state(state)
@@ -553,6 +662,8 @@ class OAuthClient:
referer=state.current_url or state.continue_url or referer,
)
if not next_state:
if not self.last_error:
self._set_error("密码验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
@@ -560,7 +671,7 @@ class OAuthClient:
if self._state_is_email_otp(state):
if not skymail_client:
self._log("当前流程需要邮箱 OTP但缺少接码客户端")
self._set_error("当前流程需要邮箱 OTP但缺少接码客户端")
return None
next_state = self._handle_otp_verification(
email,
@@ -572,6 +683,24 @@ class OAuthClient:
state,
)
if not next_state:
if not self.last_error:
self._set_error("邮箱 OTP 验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
continue
if self._state_is_add_phone(state):
next_state = self._handle_add_phone_verification(
device_id,
user_agent,
sec_ch_ua,
impersonate,
state,
)
if not next_state:
if not self.last_error:
self._set_error("手机号验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
@@ -621,10 +750,14 @@ class OAuthClient:
self._log(f"workspace state -> {describe_flow_state(state)}")
continue
self._log(f"未支持的 OAuth 状态: {describe_flow_state(state)}")
if not self.last_error:
self._set_error(f"workspace/org 选择失败: {describe_flow_state(state)}")
return None
self._set_error(f"未支持的 OAuth 状态: {describe_flow_state(state)}")
return None
self._log("OAuth 状态机超出最大步数")
self._set_error("OAuth 状态机超出最大步数")
return None
def _extract_code_from_url(self, url):
@@ -664,17 +797,17 @@ class OAuthClient:
self._log(f"无法获取 consent session 数据 (尝试 {attempt + 1}/{max_retries})")
time.sleep(0.3)
else:
self._log("无法获取 consent session 数据")
self._set_error("无法获取 consent session 数据")
return None, None
workspaces = session_data.get("workspaces", [])
if not workspaces:
self._log("session 中没有 workspace 信息")
self._set_error("session 中没有 workspace 信息")
return None, None
workspace_id = (workspaces[0] or {}).get("id")
if not workspace_id:
self._log("workspace_id 为空")
self._set_error("workspace_id 为空")
return None, None
self._log(f"选择 workspace: {workspace_id}")
@@ -794,7 +927,7 @@ class OAuthClient:
return self._extract_code_from_state(org_state), org_state
return None, org_state
except Exception as e:
self._log(f"解析 organization/select 响应异常: {e}")
self._set_error(f"解析 organization/select 响应异常: {e}")
# 如果有 continue_url跟随它
if continue_url:
@@ -804,10 +937,12 @@ class OAuthClient:
return None, workspace_state
except Exception as e:
self._log(f"处理 workspace/select 响应异常: {e}")
self._set_error(f"处理 workspace/select 响应异常: {e}")
return None, None
except Exception as e:
self._log(f"workspace/select 异常: {e}")
self._set_error(f"workspace/select 异常: {e}")
return None, None
return None, None
@@ -951,9 +1086,6 @@ class OAuthClient:
def _decode_oauth_session_cookie(self):
"""解码 oai-client-auth-session cookie"""
import json
import base64
try:
for cookie in self.session.cookies:
try:
@@ -961,19 +1093,44 @@ class OAuthClient:
if name == "oai-client-auth-session":
value = cookie.value if hasattr(cookie, 'value') else self.session.cookies.get(name)
if value:
padded = value + "=" * (-len(value) % 4)
try:
decoded = base64.b64decode(padded).decode('utf-8')
except Exception:
decoded = base64.urlsafe_b64decode(padded).decode('utf-8')
data = json.loads(decoded)
return data
data = self._decode_cookie_json_value(value)
if data:
return data
except Exception:
continue
except Exception:
pass
return None
@staticmethod
def _decode_cookie_json_value(value):
import base64
import json
raw_value = str(value or "").strip()
if not raw_value:
return None
candidates = [raw_value]
if "." in raw_value:
candidates.insert(0, raw_value.split(".", 1)[0])
for candidate in candidates:
candidate = candidate.strip()
if not candidate:
continue
padded = candidate + "=" * (-len(candidate) % 4)
for decoder in (base64.urlsafe_b64decode, base64.b64decode):
try:
decoded = decoder(padded).decode("utf-8")
parsed = json.loads(decoded)
except Exception:
continue
if isinstance(parsed, dict):
return parsed
return None
def _exchange_code_for_tokens(self, code, code_verifier, user_agent, impersonate):
"""用 authorization code 换取 tokens"""
@@ -1008,12 +1165,222 @@ class OAuthClient:
if r.status_code == 200:
return r.json()
else:
self._log(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}")
self._set_error(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}")
except Exception as e:
self._log(f"换取 tokens 异常: {e}")
self._set_error(f"换取 tokens 异常: {e}")
return None
def _send_phone_number(self, phone, device_id, user_agent, sec_ch_ua, impersonate):
request_url = f"{self.oauth_issuer}/api/accounts/add-phone/send"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=f"{self.oauth_issuer}/add-phone",
origin=self.oauth_issuer,
content_type="application/json",
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {
"json": {"phone_number": phone},
"headers": headers,
"timeout": 30,
"allow_redirects": False,
}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, None, f"add-phone/send 异常: {e}"
self._log(f"/add-phone/send -> {resp.status_code}")
if resp.status_code != 200:
return False, None, f"add-phone/send 失败: {resp.status_code} - {resp.text[:180]}"
try:
data = resp.json()
except Exception:
return False, None, "add-phone/send 响应不是 JSON"
next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url)
self._log(f"add-phone/send {describe_flow_state(next_state)}")
return True, next_state, ""
def _resend_phone_otp(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/resend"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification",
origin=self.oauth_issuer,
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {"headers": headers, "timeout": 30, "allow_redirects": False}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, f"phone-otp/resend 异常: {e}"
self._log(f"/phone-otp/resend -> {resp.status_code}")
if resp.status_code == 200:
return True, ""
return False, f"phone-otp/resend 失败: {resp.status_code} - {resp.text[:180]}"
def _validate_phone_otp(self, code, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/validate"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification",
origin=self.oauth_issuer,
content_type="application/json",
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {
"json": {"code": code},
"headers": headers,
"timeout": 30,
"allow_redirects": False,
}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, None, f"phone-otp/validate 异常: {e}"
self._log(f"/phone-otp/validate -> {resp.status_code}")
if resp.status_code != 200:
if resp.status_code == 401:
return False, None, "手机号验证码错误"
return False, None, f"phone-otp/validate 失败: {resp.status_code} - {resp.text[:180]}"
try:
data = resp.json()
except Exception:
return False, None, "phone-otp/validate 响应不是 JSON"
next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url)
self._log(f"手机号 OTP 验证通过 {describe_flow_state(next_state)}")
return True, next_state, ""
def _handle_add_phone_verification(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
phone_service = SMSToMePhoneService(self.config, log_fn=self._log)
if not phone_service.enabled:
self._set_error("OAuth 登录被 add_phone 阻断,当前账号需要手机号验证;未配置可用的 SMSToMe 号码池")
return None
excluded_prefixes = set()
last_failure = ""
for attempt in range(phone_service.max_attempts):
try:
entry = phone_service.acquire_phone(exclude_prefixes=excluded_prefixes)
except Exception as e:
last_failure = f"获取手机号失败: {e}"
self._log(last_failure)
break
if not entry:
last_failure = last_failure or "SMSToMe 号码池中无可用手机号"
break
prefix = phone_service.prefix_hint(entry.phone)
self._log(
f"步骤5: add_phone 选择手机号 {attempt + 1}/{phone_service.max_attempts}: {entry.phone} ({entry.country_slug})"
)
sent, next_state, detail = self._send_phone_number(
entry.phone,
device_id,
user_agent,
sec_ch_ua,
impersonate,
)
if not sent or not next_state:
last_failure = detail or "add-phone/send 未返回有效状态"
self._log(last_failure)
self._blacklist_phone_if_needed(phone_service, entry, last_failure)
excluded_prefixes.add(prefix)
continue
if next_state.page_type != "phone_otp_verification" and "phone-verification" not in f"{next_state.continue_url} {next_state.current_url}".lower():
last_failure = f"add-phone/send 未进入手机验证码页: {describe_flow_state(next_state)}"
self._log(last_failure)
self._blacklist_phone_if_needed(phone_service, entry, last_failure, next_state)
excluded_prefixes.add(prefix)
continue
session_data = self._decode_oauth_session_cookie() or {}
verification_channel = str(session_data.get("phone_verification_channel") or "sms").strip().lower() or "sms"
bound_phone = str(session_data.get("phone_number") or entry.phone).strip() or entry.phone
self._log(f"add_phone 发码成功: phone={bound_phone}, channel={verification_channel}")
if verification_channel != "sms":
last_failure = f"add_phone 已切到 {verification_channel} 通道,当前 SMSToMe 仅支持短信接码"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
code = phone_service.wait_for_code(entry)
if not code:
self._log("手机号验证码暂未收到,尝试重发一次...")
resend_ok, resend_detail = self._resend_phone_otp(
device_id,
user_agent,
sec_ch_ua,
impersonate,
next_state,
)
if resend_ok:
code = phone_service.wait_for_code(entry)
if not code:
last_failure = resend_detail or f"手机号 {entry.phone} 未收到短信验证码"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
valid, validated_state, detail = self._validate_phone_otp(
code,
device_id,
user_agent,
sec_ch_ua,
impersonate,
next_state,
)
if not valid or not validated_state:
last_failure = detail or "手机号 OTP 验证失败"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
return validated_state
self._set_error(f"add_phone 阶段失败: {last_failure or '未完成手机号验证'}")
return None
def _handle_otp_verification(self, email, device_id, user_agent, sec_ch_ua, impersonate, skymail_client, state):
"""处理 OAuth 阶段的邮箱 OTP 验证,返回服务端声明的下一步状态。"""
@@ -1039,7 +1406,7 @@ class OAuthClient:
skymail_client._used_codes = set()
tried_codes = set(getattr(skymail_client, "_used_codes", set()))
otp_deadline = time.time() + 30
otp_deadline = time.time() + 60
otp_sent_at = time.time()
def validate_otp(code):
@@ -1085,7 +1452,7 @@ class OAuthClient:
self._log("使用 wait_for_verification_code 进行阻塞式获取新验证码...")
while time.time() < otp_deadline:
remaining = max(1, int(otp_deadline - time.time()))
wait_time = min(8, remaining)
wait_time = min(10, remaining)
try:
code = skymail_client.wait_for_verification_code(
email,
@@ -1099,6 +1466,8 @@ class OAuthClient:
if not code:
self._log("暂未收到新的 OTP继续等待...")
if self.last_error:
break
continue
if code in tried_codes:
@@ -1108,6 +1477,8 @@ class OAuthClient:
next_state = validate_otp(code)
if next_state:
return next_state
if self.last_error:
break
else:
while time.time() < otp_deadline:
messages = skymail_client.fetch_emails(email) or []
@@ -1120,8 +1491,8 @@ class OAuthClient:
candidate_codes.append(code)
if not candidate_codes:
elapsed = int(30 - max(0, otp_deadline - time.time()))
self._log(f"等待新的 OTP... ({elapsed}s/30s)")
elapsed = int(60 - max(0, otp_deadline - time.time()))
self._log(f"等待新的 OTP... ({elapsed}s/60s)")
time.sleep(2)
continue
@@ -1131,6 +1502,9 @@ class OAuthClient:
return next_state
time.sleep(2)
if self.last_error:
break
self._log(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码")
if not self.last_error:
self._set_error(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码")
return None

View File

@@ -0,0 +1,98 @@
from __future__ import annotations
from pathlib import Path
from typing import Callable, Iterable, Optional
from smstome_tool import (
PhoneEntry,
get_unused_phone,
mark_phone_blacklisted,
parse_country_slugs,
update_global_phone_list,
wait_for_otp,
)
def _to_positive_int(value, default: int, *, minimum: int = 1) -> int:
try:
parsed = int(str(value).strip())
except Exception:
return default
return parsed if parsed >= minimum else default
def _prefix_hint(phone: str, width: int = 7) -> str:
value = str(phone or "").strip()
return value[: min(len(value), width)] if value else ""
class SMSToMePhoneService:
def __init__(self, config: Optional[dict] = None, log_fn: Optional[Callable[[str], None]] = None):
self.config = dict(config or {})
self.log_fn = log_fn or (lambda _msg: None)
self.cookie_header = str(self.config.get("smstome_cookie", "") or "").strip() or None
self.country_slugs = parse_country_slugs(self.config.get("smstome_country_slugs"))
self.global_file = Path(str(self.config.get("smstome_global_file") or "smstome_all_numbers.txt"))
self.used_numbers_dir = Path(str(self.config.get("smstome_used_numbers_dir") or "smstome_used"))
self.task_name = str(self.config.get("smstome_task_name") or "chatgpt_add_phone").strip() or "chatgpt_add_phone"
self.max_attempts = _to_positive_int(self.config.get("smstome_phone_attempts"), 3)
self.otp_timeout_seconds = _to_positive_int(self.config.get("smstome_otp_timeout_seconds"), 45, minimum=10)
self.poll_interval_seconds = _to_positive_int(self.config.get("smstome_poll_interval_seconds"), 5, minimum=1)
self.sync_max_pages_per_country = _to_positive_int(
self.config.get("smstome_sync_max_pages_per_country"),
5,
)
@property
def enabled(self) -> bool:
return self._has_pool_file() or bool(self.cookie_header)
def prefix_hint(self, phone: str) -> str:
return _prefix_hint(phone)
def _has_pool_file(self) -> bool:
try:
return self.global_file.exists() and self.global_file.stat().st_size > 0
except OSError:
return False
def ensure_pool_ready(self) -> None:
if self._has_pool_file():
return
if not self.cookie_header:
raise RuntimeError("未找到 SMSToMe 号码池文件,且未配置 smstome_cookie")
self.log_fn("SMSToMe 号码池不存在,开始自动同步...")
count = update_global_phone_list(
cookie_header=self.cookie_header,
countries=self.country_slugs or None,
output_path=self.global_file,
max_pages_per_country=self.sync_max_pages_per_country,
)
if count <= 0:
raise RuntimeError("SMSToMe 号码池同步后为空")
self.log_fn(f"SMSToMe 号码池同步完成,共 {count} 个号码")
def acquire_phone(self, *, exclude_prefixes: Optional[Iterable[str]] = None) -> Optional[PhoneEntry]:
self.ensure_pool_ready()
return get_unused_phone(
self.task_name,
country_slug=self.country_slugs or None,
global_file=self.global_file,
used_numbers_dir=self.used_numbers_dir,
exclude_prefixes=exclude_prefixes,
)
def mark_blacklisted(self, phone: str) -> None:
mark_phone_blacklisted(self.task_name, phone, used_numbers_dir=self.used_numbers_dir)
def wait_for_code(self, entry: PhoneEntry, *, timeout: Optional[int] = None) -> Optional[str]:
wait_seconds = _to_positive_int(timeout, self.otp_timeout_seconds, minimum=10)
return wait_for_otp(
entry,
cookie_header=self.cookie_header,
timeout=wait_seconds,
poll_interval=self.poll_interval_seconds,
trace=lambda message: self.log_fn(f"[SMSToMe] {message}"),
raise_on_timeout=False,
)

View File

@@ -84,6 +84,7 @@ class ChatGPTPlatform(BasePlatform):
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
extra_config=(self.config.extra or {}),
)
engine.email = email
engine.password = password
@@ -116,6 +117,7 @@ class ChatGPTPlatform(BasePlatform):
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
extra_config=(self.config.extra or {}),
)
if email:
engine.email = email

View File

@@ -12,6 +12,7 @@ from core.base_platform import AccountStatus
from platforms.chatgpt.register import RegistrationResult
from .chatgpt_client import ChatGPTClient
from .oauth_client import OAuthClient
from .utils import generate_random_name, generate_random_birthday
logger = logging.getLogger(__name__)
@@ -46,6 +47,7 @@ class RegistrationEngineV2:
callback_logger: Optional[Callable[[str], None]] = None,
task_uuid: Optional[str] = None,
max_retries: int = 3,
extra_config: Optional[dict] = None,
):
self.email_service = email_service
self.proxy_url = proxy_url
@@ -53,6 +55,7 @@ class RegistrationEngineV2:
self.callback_logger = callback_logger
self.task_uuid = task_uuid
self.max_retries = max(1, int(max_retries or 1))
self.extra_config = dict(extra_config or {})
self.email = None
self.password = None
@@ -69,6 +72,12 @@ class RegistrationEngineV2:
else:
logger.info(log_message)
def _format_oauth_failure(self, oauth_client: OAuthClient) -> str:
detail = str(getattr(oauth_client, "last_error", "") or "").strip()
if not detail:
detail = "获取最终 OAuth Tokens 失败"
return f"账号已创建成功,但 {detail}"
def _should_retry(self, message: str) -> bool:
text = str(message or "").lower()
retriable_markers = [
@@ -151,7 +160,7 @@ class RegistrationEngineV2:
result.error_message = last_error
return result
self._log("步骤 2/2: 复用注册会话,直接获取 ChatGPT Session / AccessToken...")
self._log("步骤 2/2: 优先复用注册会话取 ChatGPT Session / AccessToken...")
session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens()
if session_ok:
@@ -181,10 +190,66 @@ class RegistrationEngineV2:
self._log("=" * 60)
return result
last_error = f"注册成功,但复用会话获取 AccessToken 失败: {session_result}"
if attempt < self.max_retries - 1:
self._log(f"{last_error},准备整流程重试")
continue
self._log(f"复用会话失败,回退到 OAuth 登录补全流程: {session_result}")
tokens = None
oauth_client = None
for oauth_attempt in range(2):
if oauth_attempt > 0:
self._log(f"同账号 OAuth 重试 {oauth_attempt + 1}/2 ...")
time.sleep(1)
oauth_client = OAuthClient(
config=self.extra_config,
proxy=self.proxy_url,
verbose=False,
browser_mode=self.browser_mode,
)
oauth_client._log = self._log
oauth_client.session = chatgpt_client.session
tokens = oauth_client.login_and_get_tokens(
email_addr,
pwd,
chatgpt_client.device_id,
chatgpt_client.ua,
chatgpt_client.sec_ch_ua,
chatgpt_client.impersonate,
skymail_adapter,
)
if tokens and tokens.get("access_token"):
break
if oauth_client.last_error and "add_phone" in oauth_client.last_error:
break
if tokens and tokens.get("access_token"):
self._log("OAuth 回退补全成功!")
result.success = True
result.access_token = tokens.get("access_token")
result.refresh_token = tokens.get("refresh_token")
result.id_token = tokens.get("id_token")
result.account_id = "v2_acct_" + chatgpt_client.device_id[:8]
session_data = oauth_client._decode_oauth_session_cookie()
if session_data:
workspaces = session_data.get("workspaces", [])
if workspaces:
result.workspace_id = str((workspaces[0] or {}).get("id") or "")
self._log(f"成功萃取 Workspace ID: {result.workspace_id}")
session_cookie = None
for cookie in oauth_client.session.cookies.jar:
if cookie.name == "__Secure-next-auth.session-token":
session_cookie = cookie.value
break
result.session_token = session_cookie
self._log("=" * 60)
self._log("注册流程成功结束!")
self._log("=" * 60)
return result
last_error = self._format_oauth_failure(oauth_client)
result.error_message = last_error
return result
except Exception as attempt_error:

1122
smstome_tool.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,203 @@
import base64
import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from platforms.chatgpt.oauth_client import OAuthClient
from platforms.chatgpt.phone_service import SMSToMePhoneService
from platforms.chatgpt.utils import FlowState
from smstome_tool import PhoneEntry, parse_country_slugs
class OAuthCookieDecodeTests(unittest.TestCase):
def test_decode_signed_cookie_payload(self):
payload = {
"email": "demo@example.com",
"phone_number": "+447456344799",
"phone_verification_channel": "whatsapp",
}
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8").rstrip("=")
cookie_value = f"{encoded}.sig-a.sig-b"
self.assertEqual(OAuthClient._decode_cookie_json_value(cookie_value), payload)
def test_decode_invalid_cookie_payload(self):
self.assertIsNone(OAuthClient._decode_cookie_json_value("not-a-valid-cookie"))
class SMSToMeConfigTests(unittest.TestCase):
def test_parse_country_slugs_accepts_csv_and_iterables(self):
self.assertEqual(
parse_country_slugs("united-kingdom, poland;finland"),
["united-kingdom", "poland", "finland"],
)
self.assertEqual(
parse_country_slugs(["united-kingdom", "poland", "united_kingdom"]),
["united-kingdom", "poland"],
)
def test_phone_service_enabled_when_pool_file_exists(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
pool_path.write_text("+447456344799\tunited-kingdom\thttps://example.com\n", encoding="utf-8")
service = SMSToMePhoneService({"smstome_global_file": str(pool_path)})
self.assertTrue(service.enabled)
def test_phone_service_disabled_for_empty_pool_without_cookie(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
pool_path.write_text("", encoding="utf-8")
service = SMSToMePhoneService({"smstome_global_file": str(pool_path)})
self.assertFalse(service.enabled)
def test_wait_for_code_forwards_cookie_timeout_and_poll_interval(self):
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447456344799",
detail_url="https://example.com/phone/1",
)
service = SMSToMePhoneService(
{
"smstome_cookie": "cf_clearance=demo",
"smstome_otp_timeout_seconds": "66",
"smstome_poll_interval_seconds": "7",
}
)
with mock.patch("platforms.chatgpt.phone_service.wait_for_otp", return_value="123456") as mocked:
code = service.wait_for_code(entry)
self.assertEqual(code, "123456")
mocked.assert_called_once()
kwargs = mocked.call_args.kwargs
self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo")
self.assertEqual(kwargs["timeout"], 66)
self.assertEqual(kwargs["poll_interval"], 7)
self.assertFalse(kwargs["raise_on_timeout"])
def test_ensure_pool_ready_syncs_with_configured_page_limit(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
service = SMSToMePhoneService(
{
"smstome_cookie": "cf_clearance=demo",
"smstome_country_slugs": "united-kingdom",
"smstome_global_file": str(pool_path),
"smstome_sync_max_pages_per_country": "9",
}
)
with mock.patch("platforms.chatgpt.phone_service.update_global_phone_list", return_value=3) as mocked:
service.ensure_pool_ready()
mocked.assert_called_once()
kwargs = mocked.call_args.kwargs
self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo")
self.assertEqual(kwargs["countries"], ["united-kingdom"])
self.assertEqual(kwargs["output_path"], pool_path)
self.assertEqual(kwargs["max_pages_per_country"], 9)
class OAuthPhoneBlacklistTests(unittest.TestCase):
def test_should_blacklist_explicit_phone_rejection(self):
state = FlowState(
page_type="add_phone",
payload={"error": {"message": "phone number is invalid"}},
)
self.assertTrue(
OAuthClient._should_blacklist_phone_failure(
"add-phone/send 失败: 400 - phone number is invalid",
state,
)
)
def test_should_not_blacklist_whatsapp_or_delivery_failures(self):
self.assertFalse(
OAuthClient._should_blacklist_phone_failure(
"add_phone 已切到 whatsapp 通道,当前 SMSToMe 仅支持短信接码"
)
)
self.assertFalse(
OAuthClient._should_blacklist_phone_failure("手机号 +447000000001 未收到短信验证码")
)
def test_handle_add_phone_blacklists_explicitly_rejected_number(self):
client = OAuthClient(config={}, verbose=False)
client._log = lambda _msg: None
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447000000001",
detail_url="https://example.com/phone/1",
)
phone_service = mock.Mock()
phone_service.enabled = True
phone_service.max_attempts = 1
phone_service.acquire_phone.return_value = entry
phone_service.prefix_hint.return_value = "+447000"
with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service):
with mock.patch.object(
client,
"_send_phone_number",
return_value=(False, None, "add-phone/send 失败: 400 - phone number is invalid"),
):
state = client._handle_add_phone_verification(
"device-id",
"Mozilla/5.0",
None,
None,
FlowState(page_type="add_phone"),
)
self.assertIsNone(state)
phone_service.mark_blacklisted.assert_called_once_with(entry.phone)
self.assertIn("add_phone 阶段失败", client.last_error)
def test_handle_add_phone_does_not_blacklist_whatsapp_channel(self):
client = OAuthClient(config={}, verbose=False)
client._log = lambda _msg: None
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447000000002",
detail_url="https://example.com/phone/2",
)
phone_service = mock.Mock()
phone_service.enabled = True
phone_service.max_attempts = 1
phone_service.acquire_phone.return_value = entry
phone_service.prefix_hint.return_value = "+447000"
next_state = FlowState(
page_type="phone_otp_verification",
continue_url="https://auth.openai.com/phone-verification",
)
with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service):
with mock.patch.object(client, "_send_phone_number", return_value=(True, next_state, "")):
with mock.patch.object(
client,
"_decode_oauth_session_cookie",
return_value={
"phone_verification_channel": "whatsapp",
"phone_number": entry.phone,
},
):
state = client._handle_add_phone_verification(
"device-id",
"Mozilla/5.0",
None,
None,
FlowState(page_type="add_phone"),
)
self.assertIsNone(state)
phone_service.mark_blacklisted.assert_not_called()
self.assertIn("whatsapp", client.last_error)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,72 @@
import tempfile
import unittest
from pathlib import Path
from core.config_store import (
_canonical_config_key,
_get_env_fallback_value,
_load_env_file,
_merge_env_fallback,
_normalize_config_value,
)
class ConfigStoreEnvFallbackTests(unittest.TestCase):
def test_normalize_config_value_strips_matching_quotes(self):
self.assertEqual(_normalize_config_value('"quoted"'), "quoted")
self.assertEqual(_normalize_config_value("'quoted'"), "quoted")
self.assertEqual(_normalize_config_value("plain"), "plain")
def test_load_env_file_supports_export_and_quotes(self):
with tempfile.TemporaryDirectory() as tmp_dir:
env_path = Path(tmp_dir) / ".env"
env_path.write_text(
"\n".join(
[
"# comment",
"export SMSTOME_COOKIE='cf_clearance=demo'",
'cfworker_custom_auth="secret-pass"',
]
),
encoding="utf-8",
)
values = _load_env_file(env_path)
self.assertEqual(values["SMSTOME_COOKIE"], "cf_clearance=demo")
self.assertEqual(values["cfworker_custom_auth"], "secret-pass")
def test_get_env_fallback_value_matches_uppercase_env_names(self):
env_values = {
"SMSTOME_COOKIE": "cf_clearance=demo",
"CFWORKER_CUSTOM_AUTH": "secret-pass",
}
self.assertEqual(
_get_env_fallback_value("smstome_cookie", env_values=env_values),
"cf_clearance=demo",
)
self.assertEqual(
_get_env_fallback_value("cfworker_custom_auth", env_values=env_values),
"secret-pass",
)
def test_merge_env_fallback_uses_canonical_key_without_overriding_db(self):
merged = _merge_env_fallback(
{
"smstome_cookie": "",
"cfworker_custom_auth": "db-value",
},
env_values={
"SMSTOME_COOKIE": "cf_clearance=demo",
"CFWORKER_CUSTOM_AUTH": "env-value",
},
)
self.assertEqual(_canonical_config_key("SMSTOME_COOKIE"), "smstome_cookie")
self.assertEqual(merged["smstome_cookie"], "cf_clearance=demo")
self.assertEqual(merged["cfworker_custom_auth"], "db-value")
if __name__ == "__main__":
unittest.main()