From d905b2f7d8992f4813216cdc4429f2b8ea5d3451 Mon Sep 17 00:00:00 2001 From: highkay Date: Mon, 30 Mar 2026 17:41:24 +0800 Subject: [PATCH] Add ChatGPT phone verification with SMSToMe --- .gitignore | 5 +- api/config.py | 1 + core/config_store.py | 121 ++- frontend/src/pages/Accounts.tsx | 2 + frontend/src/pages/Register.tsx | 30 + frontend/src/pages/Settings.tsx | 2 + platforms/chatgpt/chatgpt_client.py | 2 +- platforms/chatgpt/oauth_client.py | 448 ++++++++- platforms/chatgpt/phone_service.py | 98 ++ platforms/chatgpt/plugin.py | 2 + platforms/chatgpt/register_v2.py | 75 +- smstome_tool.py | 1122 +++++++++++++++++++++++ tests/test_chatgpt_phone_flow.py | 203 ++++ tests/test_config_store_env_fallback.py | 72 ++ 14 files changed, 2136 insertions(+), 47 deletions(-) create mode 100644 platforms/chatgpt/phone_service.py create mode 100644 smstome_tool.py create mode 100644 tests/test_chatgpt_phone_flow.py create mode 100644 tests/test_config_store_env_fallback.py diff --git a/.gitignore b/.gitignore index d27c268..d87880b 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,7 @@ data/ *.swp .claude/ .docs/superpowers/ -CLAUDE.md \ No newline at end of file +CLAUDE.md +.ace-tool/ +smstome*_numbers.txt +smstome_used/ diff --git a/api/config.py b/api/config.py index bd71bbb..f59f2b8 100644 --- a/api/config.py +++ b/api/config.py @@ -14,6 +14,7 @@ CONFIG_KEYS = [ "mail_provider", "cfworker_api_url", "cfworker_admin_token", "cfworker_custom_auth", "cfworker_domain", "cfworker_fingerprint", "smstome_cookie", "smstome_country_slugs", "smstome_phone_attempts", "smstome_otp_timeout_seconds", + "smstome_poll_interval_seconds", "smstome_sync_max_pages_per_country", "luckmail_base_url", "luckmail_api_key", "luckmail_email_type", "luckmail_domain", "cpa_api_url", "cpa_api_key", "team_manager_url", "team_manager_key", diff --git a/core/config_store.py b/core/config_store.py index 198271d..cb7da7d 100644 --- a/core/config_store.py +++ b/core/config_store.py @@ -1,9 +1,118 @@ -"""全局配置持久化 - 存储在 SQLite""" +"""全局配置持久化 - 存储在 SQLite,并在缺省时回退到环境变量/.env。""" +import os +import re +from pathlib import Path from typing import Optional from sqlmodel import Field, SQLModel, Session, select from .db import engine +_ENV_FILE = Path(__file__).resolve().parent.parent / ".env" + + +def _normalize_config_value(value) -> str: + text = str(value or "").strip() + if len(text) >= 2 and text[0] == text[-1] and text[0] in {"'", '"'}: + return text[1:-1] + return text + + +def _canonical_config_key(key: str) -> str: + value = str(key or "").strip() + if not value: + return "" + return re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_") + + +def _config_key_candidates(key: str) -> list[str]: + raw = str(key or "").strip() + if not raw: + return [] + + normalized = re.sub(r"[^A-Za-z0-9]+", "_", raw).strip("_") + candidates: list[str] = [] + seen = set() + for item in ( + raw, + raw.lower(), + raw.upper(), + normalized, + normalized.lower(), + normalized.upper(), + ): + value = str(item or "").strip() + if value and value not in seen: + seen.add(value) + candidates.append(value) + return candidates + + +def _load_env_file(path: Path | str | None = None) -> dict[str, str]: + env_path = Path(path or _ENV_FILE) + if not env_path.exists(): + return {} + + try: + lines = env_path.read_text(encoding="utf-8", errors="ignore").splitlines() + except Exception: + return {} + + values: dict[str, str] = {} + for raw_line in lines: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.lower().startswith("export "): + line = line[7:].strip() + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + if not key: + continue + values[key] = _normalize_config_value(value) + return values + + +def _runtime_env_values() -> dict[str, str]: + values: dict[str, str] = {} + for key, value in _load_env_file().items(): + text = _normalize_config_value(value) + if text: + values[key] = text + for key, value in os.environ.items(): + text = _normalize_config_value(value) + if text: + values[key] = text + return values + + +def _get_env_fallback_value(key: str, env_values: Optional[dict[str, str]] = None) -> str: + values = env_values if env_values is not None else _runtime_env_values() + for candidate in _config_key_candidates(key): + text = str(values.get(candidate, "") or "").strip() + if text: + return text + return "" + + +def _merge_env_fallback(values: dict[str, str], env_values: Optional[dict[str, str]] = None) -> dict[str, str]: + merged = dict(values or {}) + runtime_values = env_values if env_values is not None else _runtime_env_values() + for env_key, env_value in runtime_values.items(): + text = str(env_value or "").strip() + if not text: + continue + canonical_key = _canonical_config_key(env_key) + for target_key in (env_key, canonical_key): + if not target_key: + continue + if str(merged.get(target_key, "") or "").strip(): + continue + merged[target_key] = text + return merged + + class ConfigItem(SQLModel, table=True): __tablename__ = "configs" key: str = Field(primary_key=True) @@ -14,9 +123,14 @@ class ConfigStore: """简单 key-value 配置存储""" def get(self, key: str, default: str = "") -> str: + env_values = _runtime_env_values() with Session(engine) as s: item = s.get(ConfigItem, key) - return item.value if item else default + value = str(item.value if item else "" or "").strip() + if value: + return value + fallback = _get_env_fallback_value(key, env_values=env_values) + return fallback or default def set(self, key: str, value: str) -> None: with Session(engine) as s: @@ -31,7 +145,8 @@ class ConfigStore: def get_all(self) -> dict: with Session(engine) as s: items = s.exec(select(ConfigItem)).all() - return {i.key: i.value for i in items} + values = {i.key: i.value for i in items} + return _merge_env_fallback(values) def set_many(self, data: dict) -> None: with Session(engine) as s: diff --git a/frontend/src/pages/Accounts.tsx b/frontend/src/pages/Accounts.tsx index 0afe5bd..ac3365c 100644 --- a/frontend/src/pages/Accounts.tsx +++ b/frontend/src/pages/Accounts.tsx @@ -316,6 +316,8 @@ export default function Accounts() { smstome_country_slugs: cfg.smstome_country_slugs, smstome_phone_attempts: cfg.smstome_phone_attempts, smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds, + smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds, + smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country, }, }), }) diff --git a/frontend/src/pages/Register.tsx b/frontend/src/pages/Register.tsx index bffb502..ab95a35 100644 --- a/frontend/src/pages/Register.tsx +++ b/frontend/src/pages/Register.tsx @@ -55,6 +55,8 @@ export default function Register() { smstome_country_slugs: cfg.smstome_country_slugs || '', smstome_phone_attempts: cfg.smstome_phone_attempts || '', smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds || '', + smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds || '', + smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country || '', luckmail_base_url: cfg.luckmail_base_url || 'https://mails.luckyous.com/', luckmail_api_key: cfg.luckmail_api_key || '', luckmail_email_type: cfg.luckmail_email_type || '', @@ -98,6 +100,8 @@ export default function Register() { smstome_country_slugs: values.smstome_country_slugs, smstome_phone_attempts: values.smstome_phone_attempts, smstome_otp_timeout_seconds: values.smstome_otp_timeout_seconds, + smstome_poll_interval_seconds: values.smstome_poll_interval_seconds, + smstome_sync_max_pages_per_country: values.smstome_sync_max_pages_per_country, luckmail_base_url: values.luckmail_base_url, luckmail_api_key: values.luckmail_api_key, luckmail_email_type: values.luckmail_email_type, @@ -260,6 +264,32 @@ export default function Register() { )} + {platform === 'chatgpt' && ( + + + 仅在 OAuth 流程进入 `add_phone` 时使用,用于自动取号并轮询短信验证码。 + + + + + + + + + + + + + + + + + + + + + )} + {captchaSolver === 'yescaptcha' && ( diff --git a/frontend/src/pages/Settings.tsx b/frontend/src/pages/Settings.tsx index d59bfb1..42b8bc1 100644 --- a/frontend/src/pages/Settings.tsx +++ b/frontend/src/pages/Settings.tsx @@ -163,6 +163,8 @@ const TAB_ITEMS = [ { key: 'smstome_country_slugs', label: '国家列表', placeholder: 'united-kingdom,poland' }, { key: 'smstome_phone_attempts', label: '手机号尝试次数', placeholder: '3' }, { key: 'smstome_otp_timeout_seconds', label: '短信等待秒数', placeholder: '45' }, + { key: 'smstome_poll_interval_seconds', label: '轮询间隔秒数', placeholder: '5' }, + { key: 'smstome_sync_max_pages_per_country', label: '每国同步页数', placeholder: '5' }, ], }, ], diff --git a/platforms/chatgpt/chatgpt_client.py b/platforms/chatgpt/chatgpt_client.py index 59ec3d7..0f4cb82 100644 --- a/platforms/chatgpt/chatgpt_client.py +++ b/platforms/chatgpt/chatgpt_client.py @@ -823,7 +823,7 @@ class ChatGPTClient: if self._state_is_email_otp(state): self._log("等待邮箱验证码...") - otp_code = skymail_client.wait_for_verification_code(email, timeout=30) + otp_code = skymail_client.wait_for_verification_code(email, timeout=90) if not otp_code: return False, "未收到验证码" diff --git a/platforms/chatgpt/oauth_client.py b/platforms/chatgpt/oauth_client.py index d6b4726..0b735b8 100644 --- a/platforms/chatgpt/oauth_client.py +++ b/platforms/chatgpt/oauth_client.py @@ -11,6 +11,7 @@ try: except ImportError: import requests as curl_requests +from .phone_service import SMSToMePhoneService from .utils import ( FlowState, build_browser_headers, @@ -38,12 +39,14 @@ class OAuthClient: verbose: 是否输出详细日志 browser_mode: protocol | headless | headed """ - self.oauth_issuer = config.get("oauth_issuer", "https://auth.openai.com") - self.oauth_client_id = config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann") - self.oauth_redirect_uri = config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback") + self.config = dict(config or {}) + self.oauth_issuer = self.config.get("oauth_issuer", "https://auth.openai.com") + self.oauth_client_id = self.config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann") + self.oauth_redirect_uri = self.config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback") self.proxy = proxy self.verbose = verbose self.browser_mode = browser_mode or "protocol" + self.last_error = "" # 创建 session self.session = curl_requests.Session() @@ -55,11 +58,110 @@ class OAuthClient: if self.verbose: print(f" [OAuth] {msg}") + def _set_error(self, message): + self.last_error = str(message or "").strip() + if self.last_error: + self._log(self.last_error) + def _browser_pause(self, low=0.15, high=0.4): """在 headed 模式下注入轻微延迟,模拟真实浏览器操作节奏。""" if self.browser_mode == "headed": random_delay(low, high) + @staticmethod + def _iter_text_fragments(value): + if isinstance(value, str): + text = value.strip() + if text: + yield text + return + if isinstance(value, dict): + for item in value.values(): + yield from OAuthClient._iter_text_fragments(item) + return + if isinstance(value, (list, tuple, set)): + for item in value: + yield from OAuthClient._iter_text_fragments(item) + + @classmethod + def _should_blacklist_phone_failure(cls, detail="", state: FlowState | None = None): + fragments = [str(detail or "").strip()] + if state is not None: + fragments.extend( + cls._iter_text_fragments( + { + "page_type": state.page_type, + "continue_url": state.continue_url, + "current_url": state.current_url, + "payload": state.payload, + "raw": state.raw, + } + ) + ) + + combined = " | ".join(fragment for fragment in fragments if fragment).lower() + if not combined: + return False + + non_blacklist_markers = ( + "whatsapp", + "未收到短信验证码", + "手机号验证码错误", + "phone-otp/resend", + "phone-otp/validate 异常", + "phone-otp/validate 响应不是 json", + "phone-otp/validate 失败", + "timeout", + "timed out", + "network", + "connection", + "proxy", + "ssl", + "tls", + "captcha", + "too many phone", + "too many phone numbers", + "too many verification requests", + "验证请求过多", + "接受短信次数过多", + "session limit", + "rate limit", + ) + if any(marker in combined for marker in non_blacklist_markers): + return False + + blacklist_markers = ( + "phone number is invalid", + "invalid phone number", + "invalid phone", + "phone number invalid", + "sms verification failed", + "send sms verification failed", + "unable to send sms", + "not a valid mobile number", + "unsupported phone number", + "phone number not supported", + "carrier not supported", + "电话号码无效", + "手机号无效", + "发送短信验证失败", + "号码无效", + "号码不支持", + "手机号不支持", + ) + return any(marker in combined for marker in blacklist_markers) + + def _blacklist_phone_if_needed(self, phone_service, entry, detail="", state: FlowState | None = None): + if not entry or not self._should_blacklist_phone_failure(detail, state): + return False + try: + phone_service.mark_blacklisted(entry.phone) + self._log(f"已将手机号加入黑名单: {entry.phone}") + return True + except Exception as e: + self._log(f"写入手机号黑名单失败: {e}") + return False + def _headers( self, url, @@ -142,6 +244,10 @@ class OAuthClient: target = f"{state.continue_url} {state.current_url}".lower() return state.page_type == "email_otp_verification" or "email-verification" in target or "email-otp" in target + def _state_is_add_phone(self, state: FlowState): + target = f"{state.continue_url} {state.current_url}".lower() + return state.page_type == "add_phone" or "add-phone" in target + def _state_requires_navigation(self, state: FlowState): method = (state.method or "GET").upper() if method != "GET": @@ -335,7 +441,7 @@ class OAuthClient: impersonate=impersonate, ) if not sentinel_token: - self._log("无法获取 sentinel token (authorize_continue)") + self._set_error("无法获取 sentinel token (authorize_continue)") return None request_url = f"{self.oauth_issuer}/api/accounts/authorize/continue" @@ -391,7 +497,7 @@ class OAuthClient: self._log(f"/authorize/continue(重试) -> {r.status_code}") if r.status_code != 200: - self._log(f"提交邮箱失败: {r.text[:180]}") + self._set_error(f"提交邮箱失败: {r.status_code} - {r.text[:180]}") return None data = r.json() @@ -399,7 +505,7 @@ class OAuthClient: self._log(describe_flow_state(flow_state)) return flow_state except Exception as e: - self._log(f"提交邮箱异常: {e}") + self._set_error(f"提交邮箱异常: {e}") return None def _submit_password_verify(self, password, device_id, *, user_agent=None, sec_ch_ua=None, impersonate=None, referer=None): @@ -415,7 +521,7 @@ class OAuthClient: impersonate=impersonate, ) if not sentinel_pwd: - self._log("无法获取 sentinel token (password_verify)") + self._set_error("无法获取 sentinel token (password_verify)") return None request_url = f"{self.oauth_issuer}/api/accounts/password/verify" @@ -445,7 +551,7 @@ class OAuthClient: self._log(f"/password/verify -> {r.status_code}") if r.status_code != 200: - self._log(f"密码验证失败: {r.text[:180]}") + self._set_error(f"密码验证失败: {r.status_code} - {r.text[:180]}") return None data = r.json() @@ -453,7 +559,7 @@ class OAuthClient: self._log(f"verify {describe_flow_state(flow_state)}") return flow_state except Exception as e: - self._log(f"密码验证异常: {e}") + self._set_error(f"密码验证异常: {e}") return None def login_and_get_tokens(self, email, password, device_id, user_agent=None, sec_ch_ua=None, impersonate=None, skymail_client=None): @@ -472,6 +578,7 @@ class OAuthClient: Returns: dict: tokens 字典,包含 access_token, refresh_token, id_token """ + self.last_error = "" self._log("开始 OAuth 登录流程...") code_verifier, code_challenge = generate_pkce() @@ -499,7 +606,7 @@ class OAuthClient: impersonate=impersonate, ) if not authorize_final_url: - self._log("Bootstrap 失败") + self._set_error("Bootstrap 失败") return None continue_referer = ( @@ -519,6 +626,8 @@ class OAuthClient: authorize_params=authorize_params, ) if not state: + if not self.last_error: + self._set_error("提交邮箱后未进入有效的 OAuth 状态") return None self._log(f"OAuth 状态起点: {describe_flow_state(state)}") @@ -529,7 +638,7 @@ class OAuthClient: signature = self._state_signature(state) seen_states[signature] = seen_states.get(signature, 0) + 1 if seen_states[signature] > 2: - self._log(f"OAuth 状态卡住: {describe_flow_state(state)}") + self._set_error(f"OAuth 状态卡住: {describe_flow_state(state)}") return None code = self._extract_code_from_state(state) @@ -553,6 +662,8 @@ class OAuthClient: referer=state.current_url or state.continue_url or referer, ) if not next_state: + if not self.last_error: + self._set_error("密码验证后未进入下一步 OAuth 状态") return None referer = state.current_url or referer state = next_state @@ -560,7 +671,7 @@ class OAuthClient: if self._state_is_email_otp(state): if not skymail_client: - self._log("当前流程需要邮箱 OTP,但缺少接码客户端") + self._set_error("当前流程需要邮箱 OTP,但缺少接码客户端") return None next_state = self._handle_otp_verification( email, @@ -572,6 +683,24 @@ class OAuthClient: state, ) if not next_state: + if not self.last_error: + self._set_error("邮箱 OTP 验证后未进入下一步 OAuth 状态") + return None + referer = state.current_url or referer + state = next_state + continue + + if self._state_is_add_phone(state): + next_state = self._handle_add_phone_verification( + device_id, + user_agent, + sec_ch_ua, + impersonate, + state, + ) + if not next_state: + if not self.last_error: + self._set_error("手机号验证后未进入下一步 OAuth 状态") return None referer = state.current_url or referer state = next_state @@ -621,10 +750,14 @@ class OAuthClient: self._log(f"workspace state -> {describe_flow_state(state)}") continue - self._log(f"未支持的 OAuth 状态: {describe_flow_state(state)}") + if not self.last_error: + self._set_error(f"workspace/org 选择失败: {describe_flow_state(state)}") + return None + + self._set_error(f"未支持的 OAuth 状态: {describe_flow_state(state)}") return None - self._log("OAuth 状态机超出最大步数") + self._set_error("OAuth 状态机超出最大步数") return None def _extract_code_from_url(self, url): @@ -664,17 +797,17 @@ class OAuthClient: self._log(f"无法获取 consent session 数据 (尝试 {attempt + 1}/{max_retries})") time.sleep(0.3) else: - self._log("无法获取 consent session 数据") + self._set_error("无法获取 consent session 数据") return None, None workspaces = session_data.get("workspaces", []) if not workspaces: - self._log("session 中没有 workspace 信息") + self._set_error("session 中没有 workspace 信息") return None, None workspace_id = (workspaces[0] or {}).get("id") if not workspace_id: - self._log("workspace_id 为空") + self._set_error("workspace_id 为空") return None, None self._log(f"选择 workspace: {workspace_id}") @@ -794,7 +927,7 @@ class OAuthClient: return self._extract_code_from_state(org_state), org_state return None, org_state except Exception as e: - self._log(f"解析 organization/select 响应异常: {e}") + self._set_error(f"解析 organization/select 响应异常: {e}") # 如果有 continue_url,跟随它 if continue_url: @@ -804,10 +937,12 @@ class OAuthClient: return None, workspace_state except Exception as e: - self._log(f"处理 workspace/select 响应异常: {e}") + self._set_error(f"处理 workspace/select 响应异常: {e}") + return None, None except Exception as e: - self._log(f"workspace/select 异常: {e}") + self._set_error(f"workspace/select 异常: {e}") + return None, None return None, None @@ -951,9 +1086,6 @@ class OAuthClient: def _decode_oauth_session_cookie(self): """解码 oai-client-auth-session cookie""" - import json - import base64 - try: for cookie in self.session.cookies: try: @@ -961,19 +1093,44 @@ class OAuthClient: if name == "oai-client-auth-session": value = cookie.value if hasattr(cookie, 'value') else self.session.cookies.get(name) if value: - padded = value + "=" * (-len(value) % 4) - try: - decoded = base64.b64decode(padded).decode('utf-8') - except Exception: - decoded = base64.urlsafe_b64decode(padded).decode('utf-8') - data = json.loads(decoded) - return data + data = self._decode_cookie_json_value(value) + if data: + return data except Exception: continue except Exception: pass return None + + @staticmethod + def _decode_cookie_json_value(value): + import base64 + import json + + raw_value = str(value or "").strip() + if not raw_value: + return None + + candidates = [raw_value] + if "." in raw_value: + candidates.insert(0, raw_value.split(".", 1)[0]) + + for candidate in candidates: + candidate = candidate.strip() + if not candidate: + continue + padded = candidate + "=" * (-len(candidate) % 4) + for decoder in (base64.urlsafe_b64decode, base64.b64decode): + try: + decoded = decoder(padded).decode("utf-8") + parsed = json.loads(decoded) + except Exception: + continue + if isinstance(parsed, dict): + return parsed + + return None def _exchange_code_for_tokens(self, code, code_verifier, user_agent, impersonate): """用 authorization code 换取 tokens""" @@ -1008,12 +1165,222 @@ class OAuthClient: if r.status_code == 200: return r.json() else: - self._log(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}") + self._set_error(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}") except Exception as e: - self._log(f"换取 tokens 异常: {e}") + self._set_error(f"换取 tokens 异常: {e}") return None + + def _send_phone_number(self, phone, device_id, user_agent, sec_ch_ua, impersonate): + request_url = f"{self.oauth_issuer}/api/accounts/add-phone/send" + headers = self._headers( + request_url, + user_agent=user_agent, + sec_ch_ua=sec_ch_ua, + accept="application/json", + referer=f"{self.oauth_issuer}/add-phone", + origin=self.oauth_issuer, + content_type="application/json", + fetch_site="same-origin", + extra_headers={"oai-device-id": device_id}, + ) + headers.update(generate_datadog_trace()) + + try: + kwargs = { + "json": {"phone_number": phone}, + "headers": headers, + "timeout": 30, + "allow_redirects": False, + } + if impersonate: + kwargs["impersonate"] = impersonate + + self._browser_pause(0.12, 0.25) + resp = self.session.post(request_url, **kwargs) + except Exception as e: + return False, None, f"add-phone/send 异常: {e}" + + self._log(f"/add-phone/send -> {resp.status_code}") + if resp.status_code != 200: + return False, None, f"add-phone/send 失败: {resp.status_code} - {resp.text[:180]}" + + try: + data = resp.json() + except Exception: + return False, None, "add-phone/send 响应不是 JSON" + + next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url) + self._log(f"add-phone/send {describe_flow_state(next_state)}") + return True, next_state, "" + + def _resend_phone_otp(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState): + request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/resend" + headers = self._headers( + request_url, + user_agent=user_agent, + sec_ch_ua=sec_ch_ua, + accept="application/json", + referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification", + origin=self.oauth_issuer, + fetch_site="same-origin", + extra_headers={"oai-device-id": device_id}, + ) + headers.update(generate_datadog_trace()) + + try: + kwargs = {"headers": headers, "timeout": 30, "allow_redirects": False} + if impersonate: + kwargs["impersonate"] = impersonate + self._browser_pause(0.12, 0.25) + resp = self.session.post(request_url, **kwargs) + except Exception as e: + return False, f"phone-otp/resend 异常: {e}" + + self._log(f"/phone-otp/resend -> {resp.status_code}") + if resp.status_code == 200: + return True, "" + return False, f"phone-otp/resend 失败: {resp.status_code} - {resp.text[:180]}" + + def _validate_phone_otp(self, code, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState): + request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/validate" + headers = self._headers( + request_url, + user_agent=user_agent, + sec_ch_ua=sec_ch_ua, + accept="application/json", + referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification", + origin=self.oauth_issuer, + content_type="application/json", + fetch_site="same-origin", + extra_headers={"oai-device-id": device_id}, + ) + headers.update(generate_datadog_trace()) + + try: + kwargs = { + "json": {"code": code}, + "headers": headers, + "timeout": 30, + "allow_redirects": False, + } + if impersonate: + kwargs["impersonate"] = impersonate + self._browser_pause(0.12, 0.25) + resp = self.session.post(request_url, **kwargs) + except Exception as e: + return False, None, f"phone-otp/validate 异常: {e}" + + self._log(f"/phone-otp/validate -> {resp.status_code}") + if resp.status_code != 200: + if resp.status_code == 401: + return False, None, "手机号验证码错误" + return False, None, f"phone-otp/validate 失败: {resp.status_code} - {resp.text[:180]}" + + try: + data = resp.json() + except Exception: + return False, None, "phone-otp/validate 响应不是 JSON" + + next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url) + self._log(f"手机号 OTP 验证通过 {describe_flow_state(next_state)}") + return True, next_state, "" + + def _handle_add_phone_verification(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState): + phone_service = SMSToMePhoneService(self.config, log_fn=self._log) + if not phone_service.enabled: + self._set_error("OAuth 登录被 add_phone 阻断,当前账号需要手机号验证;未配置可用的 SMSToMe 号码池") + return None + + excluded_prefixes = set() + last_failure = "" + + for attempt in range(phone_service.max_attempts): + try: + entry = phone_service.acquire_phone(exclude_prefixes=excluded_prefixes) + except Exception as e: + last_failure = f"获取手机号失败: {e}" + self._log(last_failure) + break + + if not entry: + last_failure = last_failure or "SMSToMe 号码池中无可用手机号" + break + + prefix = phone_service.prefix_hint(entry.phone) + self._log( + f"步骤5: add_phone 选择手机号 {attempt + 1}/{phone_service.max_attempts}: {entry.phone} ({entry.country_slug})" + ) + + sent, next_state, detail = self._send_phone_number( + entry.phone, + device_id, + user_agent, + sec_ch_ua, + impersonate, + ) + if not sent or not next_state: + last_failure = detail or "add-phone/send 未返回有效状态" + self._log(last_failure) + self._blacklist_phone_if_needed(phone_service, entry, last_failure) + excluded_prefixes.add(prefix) + continue + + if next_state.page_type != "phone_otp_verification" and "phone-verification" not in f"{next_state.continue_url} {next_state.current_url}".lower(): + last_failure = f"add-phone/send 未进入手机验证码页: {describe_flow_state(next_state)}" + self._log(last_failure) + self._blacklist_phone_if_needed(phone_service, entry, last_failure, next_state) + excluded_prefixes.add(prefix) + continue + + session_data = self._decode_oauth_session_cookie() or {} + verification_channel = str(session_data.get("phone_verification_channel") or "sms").strip().lower() or "sms" + bound_phone = str(session_data.get("phone_number") or entry.phone).strip() or entry.phone + self._log(f"add_phone 发码成功: phone={bound_phone}, channel={verification_channel}") + + if verification_channel != "sms": + last_failure = f"add_phone 已切到 {verification_channel} 通道,当前 SMSToMe 仅支持短信接码" + self._log(last_failure) + excluded_prefixes.add(prefix) + continue + + code = phone_service.wait_for_code(entry) + if not code: + self._log("手机号验证码暂未收到,尝试重发一次...") + resend_ok, resend_detail = self._resend_phone_otp( + device_id, + user_agent, + sec_ch_ua, + impersonate, + next_state, + ) + if resend_ok: + code = phone_service.wait_for_code(entry) + if not code: + last_failure = resend_detail or f"手机号 {entry.phone} 未收到短信验证码" + self._log(last_failure) + excluded_prefixes.add(prefix) + continue + + valid, validated_state, detail = self._validate_phone_otp( + code, + device_id, + user_agent, + sec_ch_ua, + impersonate, + next_state, + ) + if not valid or not validated_state: + last_failure = detail or "手机号 OTP 验证失败" + self._log(last_failure) + excluded_prefixes.add(prefix) + continue + + return validated_state + + self._set_error(f"add_phone 阶段失败: {last_failure or '未完成手机号验证'}") + return None def _handle_otp_verification(self, email, device_id, user_agent, sec_ch_ua, impersonate, skymail_client, state): """处理 OAuth 阶段的邮箱 OTP 验证,返回服务端声明的下一步状态。""" @@ -1039,7 +1406,7 @@ class OAuthClient: skymail_client._used_codes = set() tried_codes = set(getattr(skymail_client, "_used_codes", set())) - otp_deadline = time.time() + 30 + otp_deadline = time.time() + 60 otp_sent_at = time.time() def validate_otp(code): @@ -1085,7 +1452,7 @@ class OAuthClient: self._log("使用 wait_for_verification_code 进行阻塞式获取新验证码...") while time.time() < otp_deadline: remaining = max(1, int(otp_deadline - time.time())) - wait_time = min(8, remaining) + wait_time = min(10, remaining) try: code = skymail_client.wait_for_verification_code( email, @@ -1099,6 +1466,8 @@ class OAuthClient: if not code: self._log("暂未收到新的 OTP,继续等待...") + if self.last_error: + break continue if code in tried_codes: @@ -1108,6 +1477,8 @@ class OAuthClient: next_state = validate_otp(code) if next_state: return next_state + if self.last_error: + break else: while time.time() < otp_deadline: messages = skymail_client.fetch_emails(email) or [] @@ -1120,8 +1491,8 @@ class OAuthClient: candidate_codes.append(code) if not candidate_codes: - elapsed = int(30 - max(0, otp_deadline - time.time())) - self._log(f"等待新的 OTP... ({elapsed}s/30s)") + elapsed = int(60 - max(0, otp_deadline - time.time())) + self._log(f"等待新的 OTP... ({elapsed}s/60s)") time.sleep(2) continue @@ -1131,6 +1502,9 @@ class OAuthClient: return next_state time.sleep(2) + if self.last_error: + break - self._log(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码") + if not self.last_error: + self._set_error(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码") return None diff --git a/platforms/chatgpt/phone_service.py b/platforms/chatgpt/phone_service.py new file mode 100644 index 0000000..2d8be05 --- /dev/null +++ b/platforms/chatgpt/phone_service.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Callable, Iterable, Optional + +from smstome_tool import ( + PhoneEntry, + get_unused_phone, + mark_phone_blacklisted, + parse_country_slugs, + update_global_phone_list, + wait_for_otp, +) + + +def _to_positive_int(value, default: int, *, minimum: int = 1) -> int: + try: + parsed = int(str(value).strip()) + except Exception: + return default + return parsed if parsed >= minimum else default + + +def _prefix_hint(phone: str, width: int = 7) -> str: + value = str(phone or "").strip() + return value[: min(len(value), width)] if value else "" + + +class SMSToMePhoneService: + def __init__(self, config: Optional[dict] = None, log_fn: Optional[Callable[[str], None]] = None): + self.config = dict(config or {}) + self.log_fn = log_fn or (lambda _msg: None) + self.cookie_header = str(self.config.get("smstome_cookie", "") or "").strip() or None + self.country_slugs = parse_country_slugs(self.config.get("smstome_country_slugs")) + self.global_file = Path(str(self.config.get("smstome_global_file") or "smstome_all_numbers.txt")) + self.used_numbers_dir = Path(str(self.config.get("smstome_used_numbers_dir") or "smstome_used")) + self.task_name = str(self.config.get("smstome_task_name") or "chatgpt_add_phone").strip() or "chatgpt_add_phone" + self.max_attempts = _to_positive_int(self.config.get("smstome_phone_attempts"), 3) + self.otp_timeout_seconds = _to_positive_int(self.config.get("smstome_otp_timeout_seconds"), 45, minimum=10) + self.poll_interval_seconds = _to_positive_int(self.config.get("smstome_poll_interval_seconds"), 5, minimum=1) + self.sync_max_pages_per_country = _to_positive_int( + self.config.get("smstome_sync_max_pages_per_country"), + 5, + ) + + @property + def enabled(self) -> bool: + return self._has_pool_file() or bool(self.cookie_header) + + def prefix_hint(self, phone: str) -> str: + return _prefix_hint(phone) + + def _has_pool_file(self) -> bool: + try: + return self.global_file.exists() and self.global_file.stat().st_size > 0 + except OSError: + return False + + def ensure_pool_ready(self) -> None: + if self._has_pool_file(): + return + if not self.cookie_header: + raise RuntimeError("未找到 SMSToMe 号码池文件,且未配置 smstome_cookie") + + self.log_fn("SMSToMe 号码池不存在,开始自动同步...") + count = update_global_phone_list( + cookie_header=self.cookie_header, + countries=self.country_slugs or None, + output_path=self.global_file, + max_pages_per_country=self.sync_max_pages_per_country, + ) + if count <= 0: + raise RuntimeError("SMSToMe 号码池同步后为空") + self.log_fn(f"SMSToMe 号码池同步完成,共 {count} 个号码") + + def acquire_phone(self, *, exclude_prefixes: Optional[Iterable[str]] = None) -> Optional[PhoneEntry]: + self.ensure_pool_ready() + return get_unused_phone( + self.task_name, + country_slug=self.country_slugs or None, + global_file=self.global_file, + used_numbers_dir=self.used_numbers_dir, + exclude_prefixes=exclude_prefixes, + ) + + def mark_blacklisted(self, phone: str) -> None: + mark_phone_blacklisted(self.task_name, phone, used_numbers_dir=self.used_numbers_dir) + + def wait_for_code(self, entry: PhoneEntry, *, timeout: Optional[int] = None) -> Optional[str]: + wait_seconds = _to_positive_int(timeout, self.otp_timeout_seconds, minimum=10) + return wait_for_otp( + entry, + cookie_header=self.cookie_header, + timeout=wait_seconds, + poll_interval=self.poll_interval_seconds, + trace=lambda message: self.log_fn(f"[SMSToMe] {message}"), + raise_on_timeout=False, + ) diff --git a/platforms/chatgpt/plugin.py b/platforms/chatgpt/plugin.py index e0583ea..c10adb3 100644 --- a/platforms/chatgpt/plugin.py +++ b/platforms/chatgpt/plugin.py @@ -84,6 +84,7 @@ class ChatGPTPlatform(BasePlatform): browser_mode=browser_mode, callback_logger=log_fn, max_retries=max_retries, + extra_config=(self.config.extra or {}), ) engine.email = email engine.password = password @@ -116,6 +117,7 @@ class ChatGPTPlatform(BasePlatform): browser_mode=browser_mode, callback_logger=log_fn, max_retries=max_retries, + extra_config=(self.config.extra or {}), ) if email: engine.email = email diff --git a/platforms/chatgpt/register_v2.py b/platforms/chatgpt/register_v2.py index 33b99b0..e83446c 100644 --- a/platforms/chatgpt/register_v2.py +++ b/platforms/chatgpt/register_v2.py @@ -12,6 +12,7 @@ from core.base_platform import AccountStatus from platforms.chatgpt.register import RegistrationResult from .chatgpt_client import ChatGPTClient +from .oauth_client import OAuthClient from .utils import generate_random_name, generate_random_birthday logger = logging.getLogger(__name__) @@ -46,6 +47,7 @@ class RegistrationEngineV2: callback_logger: Optional[Callable[[str], None]] = None, task_uuid: Optional[str] = None, max_retries: int = 3, + extra_config: Optional[dict] = None, ): self.email_service = email_service self.proxy_url = proxy_url @@ -53,6 +55,7 @@ class RegistrationEngineV2: self.callback_logger = callback_logger self.task_uuid = task_uuid self.max_retries = max(1, int(max_retries or 1)) + self.extra_config = dict(extra_config or {}) self.email = None self.password = None @@ -69,6 +72,12 @@ class RegistrationEngineV2: else: logger.info(log_message) + def _format_oauth_failure(self, oauth_client: OAuthClient) -> str: + detail = str(getattr(oauth_client, "last_error", "") or "").strip() + if not detail: + detail = "获取最终 OAuth Tokens 失败" + return f"账号已创建成功,但 {detail}" + def _should_retry(self, message: str) -> bool: text = str(message or "").lower() retriable_markers = [ @@ -151,7 +160,7 @@ class RegistrationEngineV2: result.error_message = last_error return result - self._log("步骤 2/2: 复用注册会话,直接获取 ChatGPT Session / AccessToken...") + self._log("步骤 2/2: 优先复用注册会话提取 ChatGPT Session / AccessToken...") session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens() if session_ok: @@ -181,10 +190,66 @@ class RegistrationEngineV2: self._log("=" * 60) return result - last_error = f"注册成功,但复用会话获取 AccessToken 失败: {session_result}" - if attempt < self.max_retries - 1: - self._log(f"{last_error},准备整流程重试") - continue + self._log(f"复用会话失败,回退到 OAuth 登录补全流程: {session_result}") + tokens = None + oauth_client = None + for oauth_attempt in range(2): + if oauth_attempt > 0: + self._log(f"同账号 OAuth 重试 {oauth_attempt + 1}/2 ...") + time.sleep(1) + + oauth_client = OAuthClient( + config=self.extra_config, + proxy=self.proxy_url, + verbose=False, + browser_mode=self.browser_mode, + ) + oauth_client._log = self._log + oauth_client.session = chatgpt_client.session + + tokens = oauth_client.login_and_get_tokens( + email_addr, + pwd, + chatgpt_client.device_id, + chatgpt_client.ua, + chatgpt_client.sec_ch_ua, + chatgpt_client.impersonate, + skymail_adapter, + ) + if tokens and tokens.get("access_token"): + break + + if oauth_client.last_error and "add_phone" in oauth_client.last_error: + break + + if tokens and tokens.get("access_token"): + self._log("OAuth 回退补全成功!") + result.success = True + result.access_token = tokens.get("access_token") + result.refresh_token = tokens.get("refresh_token") + result.id_token = tokens.get("id_token") + result.account_id = "v2_acct_" + chatgpt_client.device_id[:8] + + session_data = oauth_client._decode_oauth_session_cookie() + if session_data: + workspaces = session_data.get("workspaces", []) + if workspaces: + result.workspace_id = str((workspaces[0] or {}).get("id") or "") + self._log(f"成功萃取 Workspace ID: {result.workspace_id}") + + session_cookie = None + for cookie in oauth_client.session.cookies.jar: + if cookie.name == "__Secure-next-auth.session-token": + session_cookie = cookie.value + break + result.session_token = session_cookie + + self._log("=" * 60) + self._log("注册流程成功结束!") + self._log("=" * 60) + return result + + last_error = self._format_oauth_failure(oauth_client) result.error_message = last_error return result except Exception as attempt_error: diff --git a/smstome_tool.py b/smstome_tool.py new file mode 100644 index 0000000..fbd3f39 --- /dev/null +++ b/smstome_tool.py @@ -0,0 +1,1122 @@ +from __future__ import annotations + +"""SMSToMe phone pool + OTP helper. + +该文件是一个**单独的工具脚本**,负责: + +1. `update_global_phone_list`:抓取多个国家的全部可用手机号,写入本地 txt。 +2. `get_unused_phone`:针对某个任务名,返回一个尚未使用过的手机号。 +3. `wait_for_otp`:轮询该手机号的短信页面,提取验证码。 + +实现细节: + - 基于 `httpx` + `selectolax` 的 HTTP + HTML 解析方案; + - 默认使用浏览器风格 UA,禁用系统代理 (`trust_env=False`),避免影响 Tavily 相关代理行为; + - 支持通过环境变量 `SMSTOME_COOKIE`、仓库根目录 `config.yaml` 或显式参数注入 Cookie; + - 使用简单的循环 + 退避重试,避免额外引入 tenacity 依赖。 + +注意: + - txt 持久化仅做简单记录,不做数据库级别的状态管理; + - 全量号码文件中会额外保存国家 slug 与详情页 URL,方便后续获取验证码; + - 每个任务的“已使用号码列表”是独立的 txt 文件,仅按手机号一行记录。 +""" + +import os +import random +import re +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Dict, Iterable, List, Optional + +import httpx +from selectolax.parser import HTMLParser +from urllib.parse import urljoin, urlsplit + +try: + from runtime_support import get_nonempty_str, load_yaml_config +except ImportError: + def get_nonempty_str(mapping, *keys): + data = mapping if isinstance(mapping, dict) else {} + for key in keys: + value = str(data.get(key, "") or "").strip() + if value: + return value + return "" + + def load_yaml_config(config_path): + path = Path(config_path) + if not path.exists(): + return {} + try: + import yaml + except ImportError: + return {} + try: + loaded = yaml.safe_load(path.read_text(encoding="utf-8")) + except Exception: + return {} + return loaded if isinstance(loaded, dict) else {} + + +SMSTOME_BASE_URL = "https://smstome.com" +DEFAULT_CONFIG_PATH = Path(__file__).with_name("config.yaml") + +# 当前支持的国家 slug(来自站点 URL) +DEFAULT_COUNTRY_SLUGS: List[str] = [ + "poland", + "united-kingdom", + "slovenia", + "sweden", + "finland", + "belgium", +] + + +# 全量号码列表文件(每行:phone\tcountry_slug\tdetail_url) +GLOBAL_PHONE_FILE = Path("smstome_all_numbers.txt") +DEFAULT_SYNC_MAX_PAGES_PER_COUNTRY = 5 + +# 每个任务自己的“已使用号码”目录(文件名:_used_numbers.txt) +USED_NUMBERS_DIR = Path("smstome_used") +BLACKLISTED_NUMBERS_SUFFIX = "_blacklisted_numbers.txt" +USED_NUMBERS_SUFFIX = "_used_numbers.txt" +PHONE_PREFIX_WIDTH = 7 + + +DEFAULT_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/123.0.0.0 Safari/537.36" + ), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.9", + "Connection": "keep-alive", +} + +OTP_SEPARATOR_CHARS = r"[\s\-]" +OTP_BIDI_CHARS_RE = re.compile(r"[\u200e\u200f\u202a-\u202e\u2066-\u2069]") +OTP_SPLIT_CANDIDATE_RE = re.compile(r"(? int: + value = int(start_page or 1) + if value < 1: + raise ValueError(f"start_page must be >= 1, got {start_page}") + return value + + +def _resolve_country_page_window( + *, + detected_max_page: int, + start_page: int = 1, + max_pages_per_country: Optional[int] = DEFAULT_SYNC_MAX_PAGES_PER_COUNTRY, +) -> list[int]: + start = _normalize_start_page(start_page) + if detected_max_page < start: + return [] + if max_pages_per_country is None: + end_page = detected_max_page + else: + if max_pages_per_country < 1: + raise ValueError(f"max_pages_per_country must be >= 1, got {max_pages_per_country}") + end_page = min(detected_max_page, start + max_pages_per_country - 1) + return list(range(start, end_page + 1)) + + +def _normalize_message_text_for_otp(message_text: str) -> str: + text = OTP_BIDI_CHARS_RE.sub("", message_text or "") + return text.strip() + + +def _extract_otp_from_text( + message_text: str, + *, + min_digits: int = 4, + max_digits: int = 8, +) -> Optional[str]: + text = _normalize_message_text_for_otp(message_text) + if not text: + return None + + for match in OTP_SPLIT_CANDIDATE_RE.finditer(text): + digits = re.sub(OTP_SEPARATOR_CHARS, "", match.group(1)) + if min_digits <= len(digits) <= max_digits: + return digits + return None + + +def _extract_recent_6digit_otp(message_text: str, received_text: str) -> Optional[str]: + """优先匹配“最近约 1 分钟内”的 6 位验证码。""" + + msg = (message_text or "").strip() + recv = (received_text or "").strip().lower() + if not msg: + return None + + recent_markers = ( + "just now", + "few seconds", + "second ago", + "seconds ago", + "sec ago", + "secs ago", + "now", + ) + is_recent = any(marker in recv for marker in recent_markers) + + if not is_recent: + # 兼容 "1 min ago" / "1 minute ago" 等形式 + minute_match = re.search(r"(\d+)\s*(m|min|mins|minute|minutes)\b", recv) + if minute_match: + is_recent = int(minute_match.group(1)) <= 1 + + if not is_recent: + return None + + return _extract_otp_from_text(msg, min_digits=6, max_digits=6) + + +def _parse_received_age_minutes(received_text: str) -> Optional[float]: + recv = (received_text or "").strip().lower() + if not recv: + return None + + immediate_markers = ( + "just now", + "few seconds", + "second ago", + "seconds ago", + "sec ago", + "secs ago", + "moments ago", + "now", + ) + if any(marker in recv for marker in immediate_markers): + return 0.0 + + if re.search(r"\ban?\s+(m|min|mins|minute|minutes)\b", recv): + return 1.0 + if re.search(r"\ban?\s+(h|hr|hrs|hour|hours)\b", recv): + return 60.0 + if "yesterday" in recv: + return 24.0 * 60.0 + + match = re.search( + r"(\d+)\s*(s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)\b", + recv, + ) + if not match: + return None + + value = int(match.group(1)) + unit = match.group(2) + if unit.startswith("s"): + return value / 60.0 + if unit.startswith("m"): + return float(value) + if unit.startswith("h"): + return float(value) * 60.0 + if unit.startswith("d"): + return float(value) * 24.0 * 60.0 + return None + + +@dataclass(frozen=True) +class PhoneEntry: + """代表一个 SMSToMe 手机号记录。""" + + country_slug: str + phone: str # e.g. "+48573583699" + detail_url: str # e.g. "https://smstome.com/poland/phone/48573583699/sms/14642" + + +@dataclass(frozen=True) +class SmsMessage: + """单条短信记录。""" + + from_label: str + received_text: str + message_text: str + + +class SmsOtpPollingError(RuntimeError): + pass + + +class SmsInboxEmptyError(SmsOtpPollingError): + pass + + +class SmsOtpTimeoutError(SmsOtpPollingError): + pass + + +class SmsOtpFetchError(SmsOtpPollingError): + pass + + +def _summarize_sms_message(message: SmsMessage | None, *, max_len: int = 96) -> str: + if message is None: + return "none" + snippet = " ".join((message.message_text or "").split()) + if len(snippet) > max_len: + snippet = snippet[: max_len - 3] + "..." + return ( + f"from={message.from_label!r}, received={message.received_text!r}, " + f"text={snippet!r}" + ) + + +def _classify_timeout_state( + *, + latest_message: SmsMessage | None, + unmatched_new_message_count: int, +) -> str: + if latest_message is None: + return "empty-inbox" + if unmatched_new_message_count > 0: + return "new-messages-no-otp" + return "stale-inbox-no-new-messages" + + +def _has_recent_sms_history( + messages: Iterable[SmsMessage], + *, + max_age_minutes: float = DEFAULT_RECENT_HISTORY_MINUTES, +) -> bool: + for message in messages: + age_minutes = _parse_received_age_minutes(message.received_text) + if age_minutes is None: + continue + if age_minutes <= max_age_minutes: + return True + return False + + +def _parse_cookie_header(cookie_header: str) -> Dict[str, str]: + """将浏览器复制的 Cookie 字符串解析为字典。 + + 例如: + "a=1; b=2; cf_clearance=xxx" -> {"a": "1", "b": "2", "cf_clearance": "xxx"} + """ + + cookies: Dict[str, str] = {} + for part in cookie_header.split(";"): + part = part.strip() + if not part or "=" not in part: + continue + name, value = part.split("=", 1) + name = name.strip() + value = value.strip() + if name: + cookies[name] = value + return cookies + + +def _load_cookie_from_config(config_path: Path | str | None = None) -> Optional[str]: + try: + from core.config_store import config_store + + stored = str(config_store.get("smstome_cookie", "") or "").strip() + if stored: + return stored + except Exception: + pass + + config = load_yaml_config(config_path or DEFAULT_CONFIG_PATH) + return get_nonempty_str(config, "SMSTOME_COOKIE", "smstome_cookie") + + +def _resolve_cookie_header(cookie_header: Optional[str]) -> str: + explicit_cookie = (cookie_header or "").strip() + if explicit_cookie: + return explicit_cookie + + env_cookie = os.getenv("SMSTOME_COOKIE", "").strip() + if env_cookie: + return env_cookie + + return _load_cookie_from_config() or "" + + +def _build_client(*, cookie_header: Optional[str], timeout: float) -> httpx.Client: + """构造 httpx.Client,注入 UA 和可选 Cookie,禁用系统代理。""" + + headers = dict(DEFAULT_HEADERS) + cookie_header = _resolve_cookie_header(cookie_header) + + cookies: Dict[str, str] = {} + if cookie_header: + cookies.update(_parse_cookie_header(cookie_header)) + + client = httpx.Client( + headers=headers, + cookies=cookies, + timeout=timeout, + follow_redirects=True, + trust_env=False, # 不继承环境代理,避免影响 Tavily 流量策略 + ) + return client + + +def _polite_sleep(base_delay: float, jitter: float) -> None: + """在请求之间添加一点随机延迟,用于简单规避风控。 + + Args: + base_delay: 基础延迟秒数,<=0 表示不等待。 + jitter: 抖动上限秒数,>0 时会在 [0, jitter] 之间随机增加额外延迟。 + """ + + if base_delay <= 0: + return + extra = random.uniform(0, jitter) if jitter > 0 else 0.0 + time.sleep(base_delay + extra) + + +def _fetch_with_retries( + client: httpx.Client, + url: str, + *, + max_attempts: int = 3, + backoff_factor: float = 0.5, +) -> str: + """带简单重试的 GET 请求,返回文本内容。 + + - 对网络异常 / 5xx 做有限次重试; + - 对 4xx(例如 403/404)不做额外特殊处理,直接抛出。 + """ + + last_exc: Optional[Exception] = None + for attempt in range(1, max_attempts + 1): + try: + resp = client.get(url) + resp.raise_for_status() + return resp.text + except (httpx.RequestError, httpx.HTTPStatusError) as exc: # noqa: PERF203 + last_exc = exc + # 4xx 错误通常不需要重试 + status = getattr(exc, "response", None) + status_code = getattr(status, "status_code", None) + if isinstance(status_code, int) and 400 <= status_code < 500: + raise + + if attempt >= max_attempts: + raise + sleep_s = backoff_factor * attempt + time.sleep(sleep_s) + + # 正常逻辑不会走到这里 + raise RuntimeError(f"Failed to fetch {url!r}: {last_exc}") + + +def _detect_max_page(tree: HTMLParser) -> int: + """从国家列表页中解析最大页码,若没有分页则返回 1。""" + + max_page = 1 + # 仅关注包含 `?page=` 的链接,避免抓到其它数字 + for a in tree.css("a[href*='?page=']"): + text = (a.text() or "").strip() + if text.isdigit(): + try: + value = int(text) + except ValueError: + continue + if value > max_page: + max_page = value + return max_page + + +def _collect_numbers_from_country_page( + tree: HTMLParser, + country_slug: str, + phone_map: Dict[str, PhoneEntry], +) -> None: + """从单个国家页解析所有号码并写入 phone_map。""" + + for article in tree.css("article"): + link = article.css_first("a[href*='/phone/']") + if link is None: + continue + phone_text = (link.text() or "").strip() + if not phone_text: + continue + href = (link.attributes.get("href") or "").strip() + if not href: + continue + + detail_url = urljoin(SMSTOME_BASE_URL + "/", href) + # 以手机号去重,后出现的记录会覆盖之前的(一般无影响) + phone_map[phone_text] = PhoneEntry( + country_slug=country_slug, + phone=phone_text, + detail_url=detail_url, + ) + + +def _find_phone_entry_on_country_page( + tree: HTMLParser, + *, + phone: str, + country_slug: str, +) -> Optional[PhoneEntry]: + target_phone = (phone or "").strip() + if not target_phone: + return None + + phone_map: Dict[str, PhoneEntry] = {} + _collect_numbers_from_country_page(tree, country_slug, phone_map) + return phone_map.get(target_phone) + + +def resolve_live_phone_entry( + entry: PhoneEntry, + *, + cookie_header: Optional[str] = None, + request_timeout: float = 20.0, + http_max_attempts: int = 3, + max_pages_per_country: Optional[int] = None, + start_page: int = 1, + per_page_delay: float = 0.0, + jitter: float = 0.0, +) -> Optional[PhoneEntry]: + detail_host = (urlsplit(entry.detail_url).netloc or "").lower() + if "smstome.com" not in detail_host: + return entry + + client = _build_client(cookie_header=cookie_header, timeout=request_timeout) + try: + first_url = f"{SMSTOME_BASE_URL}/country/{entry.country_slug}" + first_page_html = _fetch_with_retries(client, first_url, max_attempts=http_max_attempts) + first_tree = HTMLParser(first_page_html) + page_window = _resolve_country_page_window( + detected_max_page=_detect_max_page(first_tree), + start_page=start_page, + max_pages_per_country=max_pages_per_country, + ) + if not page_window: + return entry + + for index, page in enumerate(page_window): + if page == 1: + html = first_page_html + else: + if index > 0: + _polite_sleep(per_page_delay, jitter) + html = _fetch_with_retries( + client, + f"{first_url}?page={page}", + max_attempts=http_max_attempts, + ) + tree = HTMLParser(html) + resolved = _find_phone_entry_on_country_page( + tree, + phone=entry.phone, + country_slug=entry.country_slug, + ) + if resolved is not None: + return resolved + if index + 1 < len(page_window): + _polite_sleep(per_page_delay, jitter) + return entry + finally: + client.close() + + +def update_global_phone_list( + *, + cookie_header: Optional[str] = None, + countries: Optional[Iterable[str]] = None, + output_path: Path | str = GLOBAL_PHONE_FILE, + request_timeout: float = 20.0, + http_max_attempts: int = 3, + max_pages_per_country: Optional[int] = DEFAULT_SYNC_MAX_PAGES_PER_COUNTRY, + start_page: int = 1, + per_page_delay: float = 1.0, + per_country_delay: float = 3.0, + jitter: float = 0.5, + require_recent_history: bool = True, + recent_history_minutes: float = DEFAULT_RECENT_HISTORY_MINUTES, +) -> int: + """抓取多个国家的号码并写入 txt 文件。 + + txt 格式:每行 `phone\tcountry_slug\tdetail_url`,例如: + + +48573583699 poland https://smstome.com/poland/phone/48573583699/sms/14642 + + Args: + cookie_header: 可选的 Cookie 字符串;若为 None,则尝试从 + `SMSTOME_COOKIE` 环境变量,再回退到仓库根目录 `config.yaml` + 读取。 + countries: 需要同步的国家 slug 列表;若为 None,则使用 + DEFAULT_COUNTRY_SLUGS。 + output_path: 全量号码 txt 文件路径。 + request_timeout: HTTP 请求超时时间(秒)。 + http_max_attempts: 单个请求的最大重试次数。 + max_pages_per_country: 从 start_page 开始,最多抓取多少页,默认 5。 + start_page: 每个国家从第几页开始抓,默认 1。 + per_page_delay: 每翻一页之间的基础延迟(秒),默认 1s。 + per_country_delay: 每个国家抓取完成后的基础延迟(秒),默认 3s。 + jitter: 额外抖动上限(秒),会在 [0, jitter] 内随机增加到延迟上, + 用于让访问节奏更“人类化”。 + + Returns: + 写入文件的去重后手机号数量。 + """ + + if countries is None: + countries = DEFAULT_COUNTRY_SLUGS + + client = _build_client(cookie_header=cookie_header, timeout=request_timeout) + try: + phone_map: Dict[str, PhoneEntry] = {} + + for country_slug in countries: + first_url = f"{SMSTOME_BASE_URL}/country/{country_slug}" + first_page_html = _fetch_with_retries(client, first_url, max_attempts=http_max_attempts) + first_tree = HTMLParser(first_page_html) + page_window = _resolve_country_page_window( + detected_max_page=_detect_max_page(first_tree), + start_page=start_page, + max_pages_per_country=max_pages_per_country, + ) + + for index, page in enumerate(page_window): + if page == 1: + html = first_page_html + else: + if index > 0: + _polite_sleep(per_page_delay, jitter) + url = f"{first_url}?page={page}" + html = _fetch_with_retries(client, url, max_attempts=http_max_attempts) + tree = HTMLParser(html) + _collect_numbers_from_country_page(tree, country_slug, phone_map) + if page == 1 and index + 1 < len(page_window): + _polite_sleep(per_page_delay, jitter) + + # 每个国家抓取完后再稍微停顿一下 + _polite_sleep(per_country_delay, jitter) + + if require_recent_history: + filtered_phone_map: Dict[str, PhoneEntry] = {} + for phone in sorted(phone_map.keys()): + entry = phone_map[phone] + try: + messages = _fetch_sms_messages( + client, + entry.detail_url, + http_max_attempts=http_max_attempts, + ) + except Exception: + continue + if _has_recent_sms_history( + messages, + max_age_minutes=recent_history_minutes, + ): + filtered_phone_map[phone] = entry + phone_map = filtered_phone_map + + output = Path(output_path) + output.parent.mkdir(parents=True, exist_ok=True) + + # 仅要求“记录全量号码”,但为了后续方便,额外保存国家与详情 URL。 + with output.open("w", encoding="utf-8") as f: + for phone in sorted(phone_map.keys()): + entry = phone_map[phone] + f.write(f"{entry.phone}\t{entry.country_slug}\t{entry.detail_url}\n") + + return len(phone_map) + finally: + client.close() + + +def load_global_phone_index(path: Path | str = GLOBAL_PHONE_FILE) -> Dict[str, PhoneEntry]: + """从全量号码 txt 文件中加载索引。""" + + phone_index: Dict[str, PhoneEntry] = {} + file_path = Path(path) + if not file_path.exists(): + raise FileNotFoundError(f"Global phone list not found: {file_path}") + + with file_path.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("\t") + if len(parts) < 3: + continue + phone, country_slug, detail_url = parts[0], parts[1], parts[2] + phone_index[phone] = PhoneEntry( + country_slug=country_slug, + phone=phone, + detail_url=detail_url, + ) + + return phone_index + + +def _sanitize_task_name(task_name: str) -> str: + """将任务名转换为适合作为文件名的形式。""" + + return re.sub(r"[^a-zA-Z0-9_.-]", "_", task_name) + + +def _used_numbers_file(task_name: str, *, base_dir: Path | str = USED_NUMBERS_DIR) -> Path: + """返回某个任务对应的“已使用号码”文件路径。""" + + safe_name = _sanitize_task_name(task_name) + directory = Path(base_dir) + directory.mkdir(parents=True, exist_ok=True) + return directory / f"{safe_name}{USED_NUMBERS_SUFFIX}" + + +def _blacklisted_numbers_file(task_name: str, *, base_dir: Path | str = USED_NUMBERS_DIR) -> Path: + """返回某个任务对应的“黑名单号码”文件路径。""" + + safe_name = _sanitize_task_name(task_name) + directory = Path(base_dir) + directory.mkdir(parents=True, exist_ok=True) + return directory / f"{safe_name}{BLACKLISTED_NUMBERS_SUFFIX}" + + +def _load_phone_set(path: Path) -> set[str]: + values: set[str] = set() + if not path.exists(): + return values + with path.open("r", encoding="utf-8") as f: + for line in f: + value = line.strip() + if value: + values.add(value) + return values + + +def _phone_prefix_hint(phone: str, *, width: int = PHONE_PREFIX_WIDTH) -> str: + value = (phone or "").strip() + if not value: + return "" + return value[: min(len(value), width)] + + +def mark_phone_blacklisted( + task_name: str, + phone: str, + *, + used_numbers_dir: Path | str = USED_NUMBERS_DIR, +) -> None: + phone_value = (phone or "").strip() + if not phone_value: + return + + blacklist_file = _blacklisted_numbers_file(task_name, base_dir=used_numbers_dir) + existing = _load_phone_set(blacklist_file) + if phone_value in existing: + return + with blacklist_file.open("a", encoding="utf-8") as f: + f.write(phone_value + "\n") + + +def parse_country_slugs(country_slug: Optional[str | Iterable[str]]) -> list[str]: + if country_slug is None: + return [] + + if isinstance(country_slug, str): + raw_parts = re.split(r"[\s,;|]+", country_slug.strip()) + else: + raw_parts = [] + for item in country_slug: + raw_parts.extend(re.split(r"[\s,;|]+", str(item).strip())) + + normalized: list[str] = [] + seen: set[str] = set() + for part in raw_parts: + value = part.strip().lower().replace("_", "-") + if not value or value in seen: + continue + seen.add(value) + normalized.append(value) + return normalized + + +def get_unused_phone( + task_name: str, + *, + country_slug: Optional[str | Iterable[str]] = None, + global_file: Path | str = GLOBAL_PHONE_FILE, + used_numbers_dir: Path | str = USED_NUMBERS_DIR, + exclude_prefixes: Optional[Iterable[str]] = None, +) -> Optional[PhoneEntry]: + """返回一个对指定任务尚未使用过的手机号,并立即标记为已使用。 + + 调用者应在调用前先运行一次 `update_global_phone_list`,确保 + `global_file` 是最新的。 + + Args: + task_name: 任务名称(例如目标站点标识),用于区分不同任务的 + 使用记录文件。 + country_slug: 若指定,则仅从该国家或国家列表中选择;支持单个 + slug、逗号分隔字符串或可迭代 slug 集合。为 None 表示任意国家。 + global_file: 全量号码文件路径。 + used_numbers_dir: 每个任务“已使用号码”文件所在目录。 + exclude_prefixes: 可选的手机号前缀黑名单;用于在单次流程里避开 + 已明确被目标站点拒绝的号段。 + + Returns: + 未使用过的 PhoneEntry;若没有可用号码则返回 None。 + """ + + phone_index = load_global_phone_index(global_file) + + used_file = _used_numbers_file(task_name, base_dir=used_numbers_dir) + blacklist_file = _blacklisted_numbers_file(task_name, base_dir=used_numbers_dir) + used_numbers = _load_phone_set(used_file) + blacklisted_numbers = _load_phone_set(blacklist_file) + excluded_prefixes = { + prefix + for prefix in (_phone_prefix_hint(value) for value in (exclude_prefixes or ())) + if prefix + } + + country_slugs = parse_country_slugs(country_slug) + + candidates = [ + entry + for entry in phone_index.values() + if (not country_slugs or entry.country_slug in country_slugs) + and entry.phone not in used_numbers + and entry.phone not in blacklisted_numbers + and _phone_prefix_hint(entry.phone) not in excluded_prefixes + ] + if not candidates: + return None + + remaining = list(candidates) + while remaining: + entry = random.choice(remaining) + remaining.remove(entry) + try: + refreshed_entry = resolve_live_phone_entry(entry) + except Exception: + refreshed_entry = entry + if refreshed_entry is None: + continue + with used_file.open("a", encoding="utf-8") as f: + f.write(refreshed_entry.phone + "\n") + return refreshed_entry + return None + + +def _fetch_sms_messages( + client: httpx.Client, + detail_url: str, + *, + http_max_attempts: int, +) -> List[SmsMessage]: + """抓取某个号码主页(第一页)的短信列表。""" + + html = _fetch_with_retries(client, detail_url, max_attempts=http_max_attempts) + tree = HTMLParser(html) + + # 页面中只有一个主要的短信表格,这里直接取第一个 table 即可。 + table = tree.css_first("table") + if table is None: + return [] + + messages: List[SmsMessage] = [] + for tr in table.css("tr"): + # 跳过表头行(包含 th) + if tr.css_first("th") is not None: + continue + tds = tr.css("td") + if len(tds) < 3: + continue + from_label = tds[0].text(strip=True) + received_text = tds[1].text(strip=True) + message_text = tds[2].text(separator=" ", strip=True) + if not message_text: + continue + messages.append( + SmsMessage( + from_label=from_label, + received_text=received_text, + message_text=message_text, + ) + ) + + return messages + + +def wait_for_otp( + entry: PhoneEntry, + *, + cookie_header: Optional[str] = None, + timeout: float = 120.0, + poll_interval: float = 5.0, + otp_regex: str = r"\b(\d{4,8})\b", + http_max_attempts: int = 3, + trace: Callable[[str], None] | None = None, + raise_on_timeout: bool = False, +) -> Optional[str]: + """轮询指定手机号短信,提取验证码并返回。 + + 基本逻辑: + 1. 启动时抓取一次当前短信列表,记录为已见; + 2. 在给定 `timeout` 内,每隔 `poll_interval` 秒重新抓取; + 3. 对每条“未见过”的短信,用 `otp_regex` 匹配验证码; + 4. 匹配成功则返回第一个验证码;超时则返回 None。 + + Args: + entry: 通过 `get_unused_phone` 或其它方式得到的 PhoneEntry。 + cookie_header: 可选 Cookie 字符串;若为 None,则尝试从 + `SMSTOME_COOKIE` 环境变量,再回退到仓库根目录 `config.yaml` + 读取。 + timeout: 最大等待时间(秒)。 + poll_interval: 轮询间隔(秒)。 + otp_regex: 用于从短信中提取验证码的正则,默认匹配 4–8 位数字。 + http_max_attempts: 每次抓取短信时的 HTTP 重试次数。 + trace: 可选日志回调;若提供,会输出每轮轮询的诊断摘要。 + raise_on_timeout: 若为 True,超时后抛出更具体的异常,而不是返回 None。 + + Returns: + 匹配到的验证码字符串;若超时未获得则返回 None。 + """ + + client = _build_client(cookie_header=cookie_header, timeout=timeout) + pattern = re.compile(otp_regex) + emit = trace or (lambda _msg: None) + + seen_messages: set[str] = set() + unmatched_new_message_count = 0 + latest_unmatched_message: SmsMessage | None = None + + def _fetch_messages(phase: str, *, poll_number: int | None = None) -> List[SmsMessage]: + try: + return _fetch_sms_messages( + client, entry.detail_url, http_max_attempts=http_max_attempts + ) + except Exception as exc: + label = f"{phase} fetch-error" + if poll_number is not None: + label += f" poll={poll_number}" + emit(f"{label} type={type(exc).__name__} error={exc}") + raise SmsOtpFetchError( + f"SMSToMe {phase} fetch failed for {entry.phone}: {exc}" + ) from exc + + # 初始抓取,避免把历史短信误当成“新短信” + initial_messages = _fetch_messages("initial") + latest_message = initial_messages[0] if initial_messages else None + latest_snapshot = ( + latest_message.from_label, + latest_message.received_text, + latest_message.message_text, + ) if latest_message else None + emit( + f"poll start phone={entry.phone} messages={len(initial_messages)} " + f"latest={_summarize_sms_message(latest_message)}" + ) + if initial_messages: + quick_otp = _extract_recent_6digit_otp( + latest_message.message_text, + latest_message.received_text, + ) + if quick_otp: + emit( + "matched quick recent OTP " + f"code={quick_otp} latest={_summarize_sms_message(latest_message)}" + ) + return quick_otp + + for msg in initial_messages: + seen_messages.add(msg.message_text) + + deadline = time.monotonic() + timeout + poll_count = 0 + + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + timeout_state = _classify_timeout_state( + latest_message=latest_message, + unmatched_new_message_count=unmatched_new_message_count, + ) + summary = ( + f"final state={timeout_state} polls={poll_count} " + f"latest={_summarize_sms_message(latest_message)}" + ) + if latest_unmatched_message is not None: + summary += ( + " first_unmatched_new=" + + _summarize_sms_message(latest_unmatched_message) + ) + emit(summary) + emit( + f"timeout after {poll_count} poll(s); latest={_summarize_sms_message(latest_message)}" + ) + if raise_on_timeout: + if latest_message is None: + raise SmsInboxEmptyError( + f"SMSToMe inbox stayed empty for {entry.phone} after {poll_count} poll(s)" + ) + raise SmsOtpTimeoutError( + f"SMSToMe OTP timeout state={timeout_state} for {entry.phone} " + f"after {poll_count} poll(s); latest={_summarize_sms_message(latest_message)}" + ) + return None + + sleep_s = min(poll_interval, max(remaining, 0)) + if sleep_s > 0: + time.sleep(sleep_s) + + poll_count += 1 + messages = _fetch_messages("poll", poll_number=poll_count) + latest_message = messages[0] if messages else None + current_snapshot = ( + latest_message.from_label, + latest_message.received_text, + latest_message.message_text, + ) if latest_message else None + new_count = sum(1 for msg in messages if msg.message_text not in seen_messages) + if poll_count <= 3 or current_snapshot != latest_snapshot or new_count: + emit( + f"poll {poll_count}: messages={len(messages)} new={new_count} " + f"latest={_summarize_sms_message(latest_message)}" + ) + latest_snapshot = current_snapshot + if messages: + quick_otp = _extract_recent_6digit_otp( + latest_message.message_text, + latest_message.received_text, + ) + if quick_otp: + emit( + "matched quick recent OTP " + f"code={quick_otp} latest={_summarize_sms_message(latest_message)}" + ) + return quick_otp + + for msg in messages: + if msg.message_text in seen_messages: + continue + seen_messages.add(msg.message_text) + unmatched_new_message_count += 1 + latest_unmatched_message = msg + normalized_text = _normalize_message_text_for_otp(msg.message_text) + match = pattern.search(normalized_text) + if match: + code = re.sub(OTP_SEPARATOR_CHARS, "", match.group(1)) + emit(f"matched regex OTP code={code} message={_summarize_sms_message(msg)}") + return code + fallback_otp = _extract_otp_from_text(msg.message_text) + if fallback_otp: + emit( + f"matched fallback OTP code={fallback_otp} " + f"message={_summarize_sms_message(msg)}" + ) + return fallback_otp + if new_count and latest_unmatched_message is not None: + emit( + "new messages arrived without OTP match " + f"count={new_count} sample={_summarize_sms_message(latest_unmatched_message)}" + ) + + +if __name__ == "__main__": # pragma: no cover - 简单调试入口 + import argparse + + parser = argparse.ArgumentParser( + description="SMSToMe phone pool & OTP helper", + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + sync_parser = subparsers.add_parser( + "sync", help="同步全量手机号到 txt 文件", + ) + sync_parser.add_argument( + "--cookie", + dest="cookie", + help="可选 Cookie 字符串;为空则使用 SMSTOME_COOKIE 环境变量或 config.yaml", + ) + sync_parser.add_argument( + "--max-pages-per-country", + dest="max_pages_per_country", + type=int, + default=DEFAULT_SYNC_MAX_PAGES_PER_COUNTRY, + help=( + "从起始页开始,每个国家最多抓取多少页;" + f"默认 {DEFAULT_SYNC_MAX_PAGES_PER_COUNTRY}" + ), + ) + sync_parser.add_argument( + "--start-page", + dest="start_page", + type=int, + default=1, + help="每个国家从第几页开始抓;默认 1", + ) + sync_parser.add_argument( + "--countries", + dest="countries", + help="可选国家 slug 列表;支持单个 slug 或逗号分隔,例如 united-kingdom,sweden", + ) + sync_parser.add_argument( + "--output", + dest="output_path", + default=str(GLOBAL_PHONE_FILE), + help=f"同步结果输出文件;默认 {GLOBAL_PHONE_FILE}", + ) + sync_parser.add_argument( + "--skip-history-check", + dest="skip_history_check", + action="store_true", + help="不同步详情页历史活跃度;默认会过滤掉没有分钟级历史短信的号码", + ) + sync_parser.add_argument( + "--recent-history-minutes", + dest="recent_history_minutes", + type=float, + default=DEFAULT_RECENT_HISTORY_MINUTES, + help=( + "同步时仅保留最近 N 分钟内有历史短信的号码;" + f"默认 {int(DEFAULT_RECENT_HISTORY_MINUTES)}" + ), + ) + + pick_parser = subparsers.add_parser( + "pick", help="为某个任务选择一个未使用的手机号", + ) + pick_parser.add_argument("task", help="任务名称,用于区分已使用号码文件") + pick_parser.add_argument( + "--country", + dest="country", + help="可选国家 slug(例如 poland、sweden)", + ) + + args = parser.parse_args() + + if args.command == "sync": + count = update_global_phone_list( + cookie_header=args.cookie, + countries=parse_country_slugs(args.countries) or None, + output_path=args.output_path, + max_pages_per_country=args.max_pages_per_country, + start_page=args.start_page, + require_recent_history=not args.skip_history_check, + recent_history_minutes=args.recent_history_minutes, + ) + print(f"Synced {count} phone numbers into {args.output_path}") + elif args.command == "pick": + entry = get_unused_phone( + task_name=args.task, + country_slug=args.country, + ) + if entry is None: + print("No unused phone available.") + else: + print( + f"Task={args.task} -> {entry.phone} " + f"(country={entry.country_slug})", + ) diff --git a/tests/test_chatgpt_phone_flow.py b/tests/test_chatgpt_phone_flow.py new file mode 100644 index 0000000..3a0e502 --- /dev/null +++ b/tests/test_chatgpt_phone_flow.py @@ -0,0 +1,203 @@ +import base64 +import json +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +from platforms.chatgpt.oauth_client import OAuthClient +from platforms.chatgpt.phone_service import SMSToMePhoneService +from platforms.chatgpt.utils import FlowState +from smstome_tool import PhoneEntry, parse_country_slugs + + +class OAuthCookieDecodeTests(unittest.TestCase): + def test_decode_signed_cookie_payload(self): + payload = { + "email": "demo@example.com", + "phone_number": "+447456344799", + "phone_verification_channel": "whatsapp", + } + encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8").rstrip("=") + cookie_value = f"{encoded}.sig-a.sig-b" + + self.assertEqual(OAuthClient._decode_cookie_json_value(cookie_value), payload) + + def test_decode_invalid_cookie_payload(self): + self.assertIsNone(OAuthClient._decode_cookie_json_value("not-a-valid-cookie")) + + +class SMSToMeConfigTests(unittest.TestCase): + def test_parse_country_slugs_accepts_csv_and_iterables(self): + self.assertEqual( + parse_country_slugs("united-kingdom, poland;finland"), + ["united-kingdom", "poland", "finland"], + ) + self.assertEqual( + parse_country_slugs(["united-kingdom", "poland", "united_kingdom"]), + ["united-kingdom", "poland"], + ) + + def test_phone_service_enabled_when_pool_file_exists(self): + with tempfile.TemporaryDirectory() as tmp_dir: + pool_path = Path(tmp_dir) / "phones.txt" + pool_path.write_text("+447456344799\tunited-kingdom\thttps://example.com\n", encoding="utf-8") + + service = SMSToMePhoneService({"smstome_global_file": str(pool_path)}) + self.assertTrue(service.enabled) + + def test_phone_service_disabled_for_empty_pool_without_cookie(self): + with tempfile.TemporaryDirectory() as tmp_dir: + pool_path = Path(tmp_dir) / "phones.txt" + pool_path.write_text("", encoding="utf-8") + + service = SMSToMePhoneService({"smstome_global_file": str(pool_path)}) + self.assertFalse(service.enabled) + + def test_wait_for_code_forwards_cookie_timeout_and_poll_interval(self): + entry = PhoneEntry( + country_slug="united-kingdom", + phone="+447456344799", + detail_url="https://example.com/phone/1", + ) + service = SMSToMePhoneService( + { + "smstome_cookie": "cf_clearance=demo", + "smstome_otp_timeout_seconds": "66", + "smstome_poll_interval_seconds": "7", + } + ) + + with mock.patch("platforms.chatgpt.phone_service.wait_for_otp", return_value="123456") as mocked: + code = service.wait_for_code(entry) + + self.assertEqual(code, "123456") + mocked.assert_called_once() + kwargs = mocked.call_args.kwargs + self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo") + self.assertEqual(kwargs["timeout"], 66) + self.assertEqual(kwargs["poll_interval"], 7) + self.assertFalse(kwargs["raise_on_timeout"]) + + def test_ensure_pool_ready_syncs_with_configured_page_limit(self): + with tempfile.TemporaryDirectory() as tmp_dir: + pool_path = Path(tmp_dir) / "phones.txt" + service = SMSToMePhoneService( + { + "smstome_cookie": "cf_clearance=demo", + "smstome_country_slugs": "united-kingdom", + "smstome_global_file": str(pool_path), + "smstome_sync_max_pages_per_country": "9", + } + ) + + with mock.patch("platforms.chatgpt.phone_service.update_global_phone_list", return_value=3) as mocked: + service.ensure_pool_ready() + + mocked.assert_called_once() + kwargs = mocked.call_args.kwargs + self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo") + self.assertEqual(kwargs["countries"], ["united-kingdom"]) + self.assertEqual(kwargs["output_path"], pool_path) + self.assertEqual(kwargs["max_pages_per_country"], 9) + + +class OAuthPhoneBlacklistTests(unittest.TestCase): + def test_should_blacklist_explicit_phone_rejection(self): + state = FlowState( + page_type="add_phone", + payload={"error": {"message": "phone number is invalid"}}, + ) + self.assertTrue( + OAuthClient._should_blacklist_phone_failure( + "add-phone/send 失败: 400 - phone number is invalid", + state, + ) + ) + + def test_should_not_blacklist_whatsapp_or_delivery_failures(self): + self.assertFalse( + OAuthClient._should_blacklist_phone_failure( + "add_phone 已切到 whatsapp 通道,当前 SMSToMe 仅支持短信接码" + ) + ) + self.assertFalse( + OAuthClient._should_blacklist_phone_failure("手机号 +447000000001 未收到短信验证码") + ) + + def test_handle_add_phone_blacklists_explicitly_rejected_number(self): + client = OAuthClient(config={}, verbose=False) + client._log = lambda _msg: None + entry = PhoneEntry( + country_slug="united-kingdom", + phone="+447000000001", + detail_url="https://example.com/phone/1", + ) + phone_service = mock.Mock() + phone_service.enabled = True + phone_service.max_attempts = 1 + phone_service.acquire_phone.return_value = entry + phone_service.prefix_hint.return_value = "+447000" + + with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service): + with mock.patch.object( + client, + "_send_phone_number", + return_value=(False, None, "add-phone/send 失败: 400 - phone number is invalid"), + ): + state = client._handle_add_phone_verification( + "device-id", + "Mozilla/5.0", + None, + None, + FlowState(page_type="add_phone"), + ) + + self.assertIsNone(state) + phone_service.mark_blacklisted.assert_called_once_with(entry.phone) + self.assertIn("add_phone 阶段失败", client.last_error) + + def test_handle_add_phone_does_not_blacklist_whatsapp_channel(self): + client = OAuthClient(config={}, verbose=False) + client._log = lambda _msg: None + entry = PhoneEntry( + country_slug="united-kingdom", + phone="+447000000002", + detail_url="https://example.com/phone/2", + ) + phone_service = mock.Mock() + phone_service.enabled = True + phone_service.max_attempts = 1 + phone_service.acquire_phone.return_value = entry + phone_service.prefix_hint.return_value = "+447000" + + next_state = FlowState( + page_type="phone_otp_verification", + continue_url="https://auth.openai.com/phone-verification", + ) + + with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service): + with mock.patch.object(client, "_send_phone_number", return_value=(True, next_state, "")): + with mock.patch.object( + client, + "_decode_oauth_session_cookie", + return_value={ + "phone_verification_channel": "whatsapp", + "phone_number": entry.phone, + }, + ): + state = client._handle_add_phone_verification( + "device-id", + "Mozilla/5.0", + None, + None, + FlowState(page_type="add_phone"), + ) + + self.assertIsNone(state) + phone_service.mark_blacklisted.assert_not_called() + self.assertIn("whatsapp", client.last_error) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config_store_env_fallback.py b/tests/test_config_store_env_fallback.py new file mode 100644 index 0000000..71fc122 --- /dev/null +++ b/tests/test_config_store_env_fallback.py @@ -0,0 +1,72 @@ +import tempfile +import unittest +from pathlib import Path + +from core.config_store import ( + _canonical_config_key, + _get_env_fallback_value, + _load_env_file, + _merge_env_fallback, + _normalize_config_value, +) + + +class ConfigStoreEnvFallbackTests(unittest.TestCase): + def test_normalize_config_value_strips_matching_quotes(self): + self.assertEqual(_normalize_config_value('"quoted"'), "quoted") + self.assertEqual(_normalize_config_value("'quoted'"), "quoted") + self.assertEqual(_normalize_config_value("plain"), "plain") + + def test_load_env_file_supports_export_and_quotes(self): + with tempfile.TemporaryDirectory() as tmp_dir: + env_path = Path(tmp_dir) / ".env" + env_path.write_text( + "\n".join( + [ + "# comment", + "export SMSTOME_COOKIE='cf_clearance=demo'", + 'cfworker_custom_auth="secret-pass"', + ] + ), + encoding="utf-8", + ) + + values = _load_env_file(env_path) + + self.assertEqual(values["SMSTOME_COOKIE"], "cf_clearance=demo") + self.assertEqual(values["cfworker_custom_auth"], "secret-pass") + + def test_get_env_fallback_value_matches_uppercase_env_names(self): + env_values = { + "SMSTOME_COOKIE": "cf_clearance=demo", + "CFWORKER_CUSTOM_AUTH": "secret-pass", + } + + self.assertEqual( + _get_env_fallback_value("smstome_cookie", env_values=env_values), + "cf_clearance=demo", + ) + self.assertEqual( + _get_env_fallback_value("cfworker_custom_auth", env_values=env_values), + "secret-pass", + ) + + def test_merge_env_fallback_uses_canonical_key_without_overriding_db(self): + merged = _merge_env_fallback( + { + "smstome_cookie": "", + "cfworker_custom_auth": "db-value", + }, + env_values={ + "SMSTOME_COOKIE": "cf_clearance=demo", + "CFWORKER_CUSTOM_AUTH": "env-value", + }, + ) + + self.assertEqual(_canonical_config_key("SMSTOME_COOKIE"), "smstome_cookie") + self.assertEqual(merged["smstome_cookie"], "cf_clearance=demo") + self.assertEqual(merged["cfworker_custom_auth"], "db-value") + + +if __name__ == "__main__": + unittest.main()