Merge main into pr17-maliapi

Resolve PR #17 conflicts by keeping the MaliAPI mailbox changes alongside
recent main-branch config additions for CFWorker and SMSToMe support.
This commit is contained in:
zhangchen
2026-03-30 23:20:17 +08:00
32 changed files with 3216 additions and 183 deletions

View File

@@ -10,3 +10,11 @@ static/
*.egg-info/
dist/
build/
account_manager.db
backend.stdout.log
backend.stderr.log
services/turnstile_solver/solver.log
smstome_used/
smstome_all_numbers.txt
smstome_uk_deep_numbers.txt
.env

5
.gitignore vendored
View File

@@ -52,4 +52,7 @@ data/
*.swp
.claude/
.docs/superpowers/
CLAUDE.md
CLAUDE.md
.ace-tool/
smstome*_numbers.txt
smstome_used/

52
Dockerfile Normal file
View File

@@ -0,0 +1,52 @@
# syntax=docker/dockerfile:1.7
FROM node:20-bookworm-slim AS frontend-builder
WORKDIR /app/frontend
COPY frontend/package.json frontend/package-lock.json ./
RUN npm ci
COPY frontend/ ./
RUN npm run build
FROM python:3.12-slim AS runtime
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
HOST=0.0.0.0 \
PORT=8000 \
APP_CONDA_ENV=docker \
SOLVER_BROWSER_TYPE=chromium
WORKDIR /app
COPY requirements.txt ./
RUN pip install --upgrade pip \
&& pip install -r requirements.txt \
&& installed=0 \
&& for attempt in 1 2 3; do \
if python -m playwright install --with-deps chromium; then \
installed=1; \
break; \
fi; \
if [ "$attempt" -eq 3 ]; then break; fi; \
echo "playwright browser install failed, retrying ($attempt/3)..." >&2; \
sleep 5; \
done \
&& [ "$installed" -eq 1 ]
COPY . .
COPY --from=frontend-builder /app/static /app/static
RUN chmod +x /app/docker/entrypoint.sh \
&& mkdir -p /runtime /runtime/logs /runtime/smstome_used
EXPOSE 8000 8889
VOLUME ["/runtime"]
ENTRYPOINT ["/app/docker/entrypoint.sh"]

100
README.md
View File

@@ -27,11 +27,11 @@
项目支持按需安装/启动以下 3 个插件。当前代码里配置的 Git 地址如下:
| 项目 | 用途 | Git 地址 | 当前使用说明 |
| -------------------- | -------------------------- | -------------------------------------------------------- | ---------------------------------- |
| CLIProxyAPI | CPA / 代理池管理服务 | `https://github.com/router-for-me/CLIProxyAPI.git` | 当前使用 **GitHub 直连地址**,未额外套 Git 镜像代理 |
| grok2api | Grok token 管理、回填、聊天/API 服务 | `https://github.com/chenyme/grok2api.git` | 当前使用 **GitHub 直连地址**,未额外套 Git 镜像代理 |
| kiro-account-manager | Kiro 账号管理相关插件 | `https://github.com/hj01857655/kiro-account-manager.git` | 当前使用 **GitHub 直连地址**,未额外套 Git 镜像代理 |
| 项目 | 用途 | Git 地址 | 当前使用说明 |
| -------------------- | ------------------------------------ | ---------------------------------------------------------- | -------------------------------------------------------- |
| CLIProxyAPI | CPA / 代理池管理服务 | `https://github.com/router-for-me/CLIProxyAPI.git` | 当前使用**GitHub 直连地址**,未额外套 Git 镜像代理 |
| grok2api | Grok token 管理、回填、聊天/API 服务 | `https://github.com/chenyme/grok2api.git` | 当前使用**GitHub 直连地址**,未额外套 Git 镜像代理 |
| kiro-account-manager | Kiro 账号管理相关插件 | `https://github.com/hj01857655/kiro-account-manager.git` | 当前使用**GitHub 直连地址**,未额外套 Git 镜像代理 |
> 如果后续你要改成 `ghproxy`、`gitclone`、企业 Git 镜像或其他代理地址,需要同步修改:
>
@@ -68,12 +68,12 @@
## 技术栈
| 层级 | 技术 |
| ------ | -------------------------- |
| 后端 | FastAPI + SQLiteSQLModel |
| 前端 | React + TypeScript + Vite |
| HTTP | curl\_cffi |
| 浏览器自动化 | Playwright / Camoufox |
| 层级 | 技术 |
| ------------ | ---------------------------- |
| 后端 | FastAPI + SQLiteSQLModel |
| 前端 | React + TypeScript + Vite |
| HTTP | curl\_cffi |
| 浏览器自动化 | Playwright / Camoufox |
## 环境要求
@@ -102,7 +102,7 @@ any-auto-register
- `ModuleNotFoundError: quart`
- 前端里 Turnstile Solver 一直显示“未运行”
***
---
## 安装
@@ -141,7 +141,7 @@ cd ..
D:\codemodule\ai\any-auto-register\static
```
***
---
## 启动方式
@@ -181,6 +181,36 @@ http://localhost:8000
> 注意:生产/本地构建模式下,前端由 FastAPI 直接托管,访问的是 `8000`,不是 `5173`。
### Docker 启动
如果你只想快速拉起整个项目,可以直接使用仓库根目录的 `Dockerfile``docker-compose.yml`
```bash
docker compose up --build -d
```
默认会暴露:
- Web UI / API`http://localhost:8000`
- Turnstile Solver`http://localhost:8889`
容器内仍然沿用“后端自动拉起本地 Solver”的方式`docker-compose.yml` 默认把 Solver 浏览器切到 `chromium`,避免额外依赖本机 conda/camoufox 环境。
运行时数据会持久化到 compose volume 中,包括:
- SQLite 数据库 `account_manager.db`
- `smstome_used/` 里的已用号 / 黑名单
- `smstome_all_numbers.txt`
如果需要传入 `SMSTOME_COOKIE``OPENAI_*` 等配置,直接写在仓库根目录 `.env` 即可;`docker compose` 会把它们注入到容器环境中。
常用命令:
```bash
docker compose logs -f
docker compose down
```
### 停止后端
#### PowerShell
@@ -200,7 +230,7 @@ stop_backend.bat
- 后端端口:`8000`
- Solver 端口:`8889`
***
---
## 前端开发模式
@@ -227,7 +257,7 @@ http://localhost:5173
Vite 会把 `/api` 代理到本地后端 `http://localhost:8000`
***
---
## Turnstile Solver 说明
@@ -261,7 +291,7 @@ python services/turnstile_solver/start.py --browser_type camoufox --port 8889
D:\codemodule\ai\any-auto-register\services\turnstile_solver\solver.log
```
***
---
## 常见问题排查
@@ -334,7 +364,7 @@ http://localhost:8889/
.\start_backend.ps1
```
***
---
## 邮箱服务配置
@@ -367,35 +397,35 @@ http://localhost:8889/
适合固定邮箱场景。
| 参数 | 说明 |
| ---------- | -------- |
| 邮箱地址 | 完整邮箱地址 |
| Account ID | 邮箱账号 ID |
| 参数 | 说明 |
| ---------- | ---------------- |
| 邮箱地址 | 完整邮箱地址 |
| Account ID | 邮箱账号 ID |
| JWT Token | 登录后的认证令牌 |
### Cloudflare Worker 自建邮箱
| 参数 | 说明 |
| ----------- | ------------- |
| API URL | Worker API 地址 |
| Admin Token | 管理员密码 |
| 域名 | 收件邮箱域名 |
| Fingerprint | 可选 |
| 参数 | 说明 |
| ----------- | ------------------------------------------------------------------------------------------------------------------------------------- |
| API URL | Worker API 地址(注意这是填写[cloudflare workers后端地址](https://temp-mail-docs.awsl.uk/zh/guide/ui/worker.html) !!不是pages前端地址 |
| Admin Token | 管理员密码 |
| 域名 | 收件邮箱域名 |
| Fingerprint | 可选 |
### DuckMail / Freemail
适合临时邮箱场景,部分区域可能需要代理。
***
---
## 验证码服务配置
| 服务 | 说明 |
| ---------- | -------------------------------------------- |
| YesCaptcha | 需填写 Client Key |
| 本地 Solver | 依赖 `camoufox` + `quart`,并要求后端运行在正确 conda 环境中 |
| 服务 | 说明 |
| ----------- | ---------------------------------------------------------------- |
| YesCaptcha | 需填写 Client Key |
| 本地 Solver | 依赖 `camoufox` + `quart`,并要求后端运行在正确 conda 环境中 |
***
---
## 项目结构
@@ -415,7 +445,7 @@ any-auto-register/
└── static/
```
***
---
## Electron 开发说明
@@ -429,7 +459,7 @@ Electron 开发模式不会自动启动 Python 后端。
然后再运行 Electron。
***
---
## License

View File

@@ -52,6 +52,17 @@ def execute_action(
try:
result = instance.execute_action(action_id, account, body.params)
if platform == "chatgpt" and action_id == "upload_cpa":
from services.chatgpt_sync import update_account_model_cpa_sync
sync_msg = result.get("data") or result.get("error") or ""
update_account_model_cpa_sync(
acc_model,
bool(result.get("ok")),
str(sync_msg),
session=session,
commit=False,
)
# 若操作返回了新 token更新数据库
if result.get("ok") and result.get("data", {}) and isinstance(result["data"], dict):
data = result["data"]
@@ -67,7 +78,7 @@ def execute_action(
from datetime import datetime, timezone
acc_model.updated_at = datetime.now(timezone.utc)
session.add(acc_model)
session.commit()
session.commit()
return result
except NotImplementedError as e:
raise HTTPException(400, str(e))

View File

@@ -13,7 +13,9 @@ CONFIG_KEYS = [
"moemail_api_url",
"mail_provider",
"maliapi_base_url", "maliapi_api_key", "maliapi_domain", "maliapi_auto_domain_strategy",
"cfworker_api_url", "cfworker_admin_token", "cfworker_domain", "cfworker_fingerprint",
"cfworker_api_url", "cfworker_admin_token", "cfworker_custom_auth", "cfworker_domain", "cfworker_fingerprint",
"smstome_cookie", "smstome_country_slugs", "smstome_phone_attempts", "smstome_otp_timeout_seconds",
"smstome_poll_interval_seconds", "smstome_sync_max_pages_per_country",
"luckmail_base_url", "luckmail_api_key", "luckmail_email_type", "luckmail_domain",
"cpa_api_url", "cpa_api_key",
"team_manager_url", "team_manager_key",

View File

@@ -9,12 +9,17 @@ from sqlmodel import Session, select
from core.base_platform import Account, AccountStatus
from core.db import AccountModel, engine
from services.external_apps import install, list_status, start, start_all, stop, stop_all
from services.chatgpt_sync import has_cpa_upload_success, upload_account_model_to_cpa
router = APIRouter(prefix="/integrations", tags=["integrations"])
class BackfillRequest(BaseModel):
platforms: list[str] = Field(default_factory=lambda: ["grok", "kiro"])
account_ids: list[int] = Field(default_factory=list)
pending_only: bool = False
status: Optional[str] = None
email: Optional[str] = None
def _to_account(model: AccountModel) -> Account:
@@ -65,58 +70,79 @@ def backfill_integrations(body: BackfillRequest):
summary = {"total": 0, "success": 0, "failed": 0, "items": []}
targets = set(body.platforms or [])
if "grok" in targets:
from services.grok2api_runtime import ensure_grok2api_ready
ok, msg = ensure_grok2api_ready()
if not ok:
return {
"total": 0,
"success": 0,
"failed": 0,
"items": [{"platform": "grok", "email": "", "results": [{"name": "grok2api", "ok": False, "msg": msg}]}],
}
with Session(engine) as s:
rows = s.exec(
select(AccountModel).where(AccountModel.platform.in_(targets))
).all()
q = select(AccountModel)
if body.account_ids:
q = q.where(AccountModel.id.in_(body.account_ids))
if targets:
q = q.where(AccountModel.platform.in_(targets))
elif targets:
q = q.where(AccountModel.platform.in_(targets))
else:
return summary
for row in rows:
item = {"platform": row.platform, "email": row.email, "results": []}
try:
account = _to_account(row)
results = []
if row.platform == "grok":
from core.config_store import config_store
from platforms.grok.grok2api_upload import upload_to_grok2api
if body.status:
q = q.where(AccountModel.status == body.status)
if body.email:
q = q.where(AccountModel.email.contains(body.email))
api_url = str(config_store.get("grok2api_url", "") or "").strip() or "http://127.0.0.1:8011"
app_key = str(config_store.get("grok2api_app_key", "") or "").strip() or "grok2api"
ok, msg = upload_to_grok2api(account, api_url=api_url, app_key=app_key)
results.append({"name": "grok2api", "ok": ok, "msg": msg})
rows = s.exec(q).all()
if body.pending_only:
rows = [row for row in rows if row.platform != "chatgpt" or not has_cpa_upload_success(row)]
elif row.platform == "kiro":
from core.config_store import config_store
from platforms.kiro.account_manager_upload import upload_to_kiro_manager
if any(row.platform == "grok" for row in rows):
from services.grok2api_runtime import ensure_grok2api_ready
configured_path = str(config_store.get("kiro_manager_path", "") or "").strip() or None
ok, msg = upload_to_kiro_manager(account, path=configured_path)
results.append({"name": "Kiro Manager", "ok": ok, "msg": msg})
ok, msg = ensure_grok2api_ready()
if not ok:
return {
"total": 0,
"success": 0,
"failed": 0,
"items": [{"platform": "grok", "email": "", "results": [{"name": "grok2api", "ok": False, "msg": msg}]}],
}
if not results:
item["results"].append({"name": "skip", "ok": False, "msg": "未配置对应导入目标"})
summary["failed"] += 1
else:
item["results"] = results
if all(r.get("ok") for r in results):
summary["success"] += 1
else:
for row in rows:
item = {"platform": row.platform, "email": row.email, "results": []}
try:
results = []
if row.platform == "chatgpt":
ok, msg = upload_account_model_to_cpa(row, session=s, commit=True)
results.append({"name": "CPA", "ok": ok, "msg": msg})
elif row.platform == "grok":
from core.config_store import config_store
from platforms.grok.grok2api_upload import upload_to_grok2api
account = _to_account(row)
api_url = str(config_store.get("grok2api_url", "") or "").strip() or "http://127.0.0.1:8011"
app_key = str(config_store.get("grok2api_app_key", "") or "").strip() or "grok2api"
ok, msg = upload_to_grok2api(account, api_url=api_url, app_key=app_key)
results.append({"name": "grok2api", "ok": ok, "msg": msg})
elif row.platform == "kiro":
from core.config_store import config_store
from platforms.kiro.account_manager_upload import upload_to_kiro_manager
account = _to_account(row)
configured_path = str(config_store.get("kiro_manager_path", "") or "").strip() or None
ok, msg = upload_to_kiro_manager(account, path=configured_path)
results.append({"name": "Kiro Manager", "ok": ok, "msg": msg})
if not results:
item["results"].append({"name": "skip", "ok": False, "msg": "未配置对应导入目标"})
summary["failed"] += 1
except Exception as e:
item["results"].append({"name": "error", "ok": False, "msg": str(e)})
summary["failed"] += 1
summary["items"].append(item)
summary["total"] += 1
else:
item["results"] = results
if all(r.get("ok") for r in results):
summary["success"] += 1
else:
summary["failed"] += 1
except Exception as e:
s.rollback()
item["results"].append({"name": "error", "ok": False, "msg": str(e)})
summary["failed"] += 1
summary["items"].append(item)
summary["total"] += 1
return summary

View File

@@ -168,11 +168,11 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
account.extra.setdefault("luckmail_domain", merged_extra.get("luckmail_domain"))
if merged_extra.get("luckmail_base_url"):
account.extra.setdefault("luckmail_base_url", merged_extra.get("luckmail_base_url"))
save_account(account)
saved_account = save_account(account)
if _proxy: proxy_pool.report_success(_proxy)
_log(task_id, f"✓ 注册成功: {account.email}")
_save_task_log(req.platform, account.email, "success")
_auto_upload_integrations(task_id, account)
_auto_upload_integrations(task_id, saved_account or account)
cashier_url = (account.extra or {}).get("cashier_url", "")
if cashier_url:
_log(task_id, f" [升级链接] {cashier_url}")

View File

@@ -94,9 +94,9 @@ def create_mailbox(provider: str, extra: dict = None, proxy: str = None) -> 'Bas
return TempMailLolMailbox(proxy=proxy)
elif provider == "duckmail":
return DuckMailMailbox(
api_url=extra.get("duckmail_api_url", "https://www.duckmail.sbs"),
provider_url=extra.get("duckmail_provider_url", "https://api.duckmail.sbs"),
bearer=extra.get("duckmail_bearer", "kevin273945"),
api_url=(extra.get("duckmail_api_url") or "https://www.duckmail.sbs"),
provider_url=(extra.get("duckmail_provider_url") or "https://api.duckmail.sbs"),
bearer=(extra.get("duckmail_bearer") or "kevin273945"),
proxy=proxy,
)
elif provider == "freemail":
@@ -126,6 +126,7 @@ def create_mailbox(provider: str, extra: dict = None, proxy: str = None) -> 'Bas
admin_token=extra.get("cfworker_admin_token", ""),
domain=extra.get("cfworker_domain", ""),
fingerprint=extra.get("cfworker_fingerprint", ""),
custom_auth=extra.get("cfworker_custom_auth", ""),
proxy=proxy,
)
elif provider == "luckmail":
@@ -335,9 +336,9 @@ class DuckMailMailbox(BaseMailbox):
provider_url: str = "https://api.duckmail.sbs",
bearer: str = "kevin273945",
proxy: str = None):
self.api = api_url.rstrip("/")
self.provider_url = provider_url
self.bearer = bearer
self.api = (api_url or "https://www.duckmail.sbs").rstrip("/")
self.provider_url = provider_url or "https://api.duckmail.sbs"
self.bearer = bearer or "kevin273945"
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._token = None
self._address = None
@@ -600,11 +601,12 @@ class CFWorkerMailbox(BaseMailbox):
"""Cloudflare Worker 自建临时邮箱服务"""
def __init__(self, api_url: str, admin_token: str = "", domain: str = "",
fingerprint: str = "", proxy: str = None):
fingerprint: str = "", custom_auth: str = "", proxy: str = None):
self.api = api_url.rstrip("/")
self.admin_token = admin_token
self.domain = domain
self.fingerprint = fingerprint
self.custom_auth = custom_auth
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._token = None
@@ -616,8 +618,56 @@ class CFWorkerMailbox(BaseMailbox):
}
if self.fingerprint:
h["x-fingerprint"] = self.fingerprint
if self.custom_auth:
h["x-custom-auth"] = self.custom_auth
return h
def _ensure_api_configured(self) -> None:
if not self.api:
raise RuntimeError("CF Worker API URL 未配置")
def _read_json(self, response, action: str):
try:
return response.json()
except Exception:
body = (response.text or "").strip()
snippet = body[:200] if body else "<empty>"
raise RuntimeError(
f"CF Worker {action} 返回非 JSON 响应: HTTP {response.status_code}, body={snippet}"
)
def _request_json(self, method: str, path: str, *, params: dict | None = None,
payload: dict | None = None, timeout: int = 15):
import requests
url = f"{self.api}{path}"
response = requests.request(
method,
url,
params=params,
json=payload,
headers=self._headers(),
proxies=self.proxy,
timeout=timeout,
)
body = (response.text or "").strip()
preview = body[:200] or "<empty>"
if response.status_code >= 400:
if "private site password" in body.lower():
raise RuntimeError(
"CFWorker API 需要私有站点密码,请配置 cfworker_custom_auth"
)
raise RuntimeError(
f"CFWorker API {path} 失败: HTTP {response.status_code} {preview}"
)
try:
return response.json()
except Exception as e:
raise RuntimeError(
f"CFWorker API {path} 返回非 JSON: HTTP {response.status_code} {preview}"
) from e
def _generate_local_part(self) -> str:
import random, string
# 避免纯数字开头,提高邮箱格式“像真人”的程度
@@ -626,28 +676,28 @@ class CFWorkerMailbox(BaseMailbox):
return f"{prefix}{suffix}"
def get_email(self) -> MailboxAccount:
import requests
self._ensure_api_configured()
name = self._generate_local_part()
payload = {"enablePrefix": True, "name": name}
if self.domain:
payload["domain"] = self.domain
r = requests.post(f"{self.api}/admin/new_address",
json=payload, headers=self._headers(),
proxies=self.proxy, timeout=15)
print(f"[CFWorker] new_address status={r.status_code} resp={r.text[:200]}")
data = r.json()
data = self._request_json("POST", "/admin/new_address", payload=payload, timeout=15)
email = data.get("email", data.get("address", ""))
token = data.get("token", data.get("jwt", ""))
if not email or not token:
raise RuntimeError(f"CFWorker API /admin/new_address 返回缺少 email/jwt: {data}")
self._token = token
print(f"[CFWorker] 生成邮箱: {email} token={token[:40] if token else 'NONE'}...")
return MailboxAccount(email=email, account_id=token)
def _get_mails(self, email: str) -> list:
import requests
r = requests.get(f"{self.api}/admin/mails",
self._ensure_api_configured()
data = self._request_json(
"GET",
"/admin/mails",
params={"limit": 20, "offset": 0, "address": email},
headers=self._headers(), proxies=self.proxy, timeout=10)
data = r.json()
timeout=10,
)
return data.get("results", data) if isinstance(data, dict) else data
def get_current_ids(self, account: MailboxAccount) -> set:

View File

@@ -1,9 +1,118 @@
"""全局配置持久化 - 存储在 SQLite"""
"""全局配置持久化 - 存储在 SQLite,并在缺省时回退到环境变量/.env。"""
import os
import re
from pathlib import Path
from typing import Optional
from sqlmodel import Field, SQLModel, Session, select
from .db import engine
_ENV_FILE = Path(__file__).resolve().parent.parent / ".env"
def _normalize_config_value(value) -> str:
text = str(value or "").strip()
if len(text) >= 2 and text[0] == text[-1] and text[0] in {"'", '"'}:
return text[1:-1]
return text
def _canonical_config_key(key: str) -> str:
value = str(key or "").strip()
if not value:
return ""
return re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_")
def _config_key_candidates(key: str) -> list[str]:
raw = str(key or "").strip()
if not raw:
return []
normalized = re.sub(r"[^A-Za-z0-9]+", "_", raw).strip("_")
candidates: list[str] = []
seen = set()
for item in (
raw,
raw.lower(),
raw.upper(),
normalized,
normalized.lower(),
normalized.upper(),
):
value = str(item or "").strip()
if value and value not in seen:
seen.add(value)
candidates.append(value)
return candidates
def _load_env_file(path: Path | str | None = None) -> dict[str, str]:
env_path = Path(path or _ENV_FILE)
if not env_path.exists():
return {}
try:
lines = env_path.read_text(encoding="utf-8", errors="ignore").splitlines()
except Exception:
return {}
values: dict[str, str] = {}
for raw_line in lines:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.lower().startswith("export "):
line = line[7:].strip()
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
if not key:
continue
values[key] = _normalize_config_value(value)
return values
def _runtime_env_values() -> dict[str, str]:
values: dict[str, str] = {}
for key, value in _load_env_file().items():
text = _normalize_config_value(value)
if text:
values[key] = text
for key, value in os.environ.items():
text = _normalize_config_value(value)
if text:
values[key] = text
return values
def _get_env_fallback_value(key: str, env_values: Optional[dict[str, str]] = None) -> str:
values = env_values if env_values is not None else _runtime_env_values()
for candidate in _config_key_candidates(key):
text = str(values.get(candidate, "") or "").strip()
if text:
return text
return ""
def _merge_env_fallback(values: dict[str, str], env_values: Optional[dict[str, str]] = None) -> dict[str, str]:
merged = dict(values or {})
runtime_values = env_values if env_values is not None else _runtime_env_values()
for env_key, env_value in runtime_values.items():
text = str(env_value or "").strip()
if not text:
continue
canonical_key = _canonical_config_key(env_key)
for target_key in (env_key, canonical_key):
if not target_key:
continue
if str(merged.get(target_key, "") or "").strip():
continue
merged[target_key] = text
return merged
class ConfigItem(SQLModel, table=True):
__tablename__ = "configs"
key: str = Field(primary_key=True)
@@ -14,9 +123,14 @@ class ConfigStore:
"""简单 key-value 配置存储"""
def get(self, key: str, default: str = "") -> str:
env_values = _runtime_env_values()
with Session(engine) as s:
item = s.get(ConfigItem, key)
return item.value if item else default
value = str(item.value if item else "" or "").strip()
if value:
return value
fallback = _get_env_fallback_value(key, env_values=env_values)
return fallback or default
def set(self, key: str, value: str) -> None:
with Session(engine) as s:
@@ -31,7 +145,8 @@ class ConfigStore:
def get_all(self) -> dict:
with Session(engine) as s:
items = s.exec(select(ConfigItem)).all()
return {i.key: i.value for i in items}
values = {i.key: i.value for i in items}
return _merge_env_fallback(values)
def set_many(self, data: dict) -> None:
with Session(engine) as s:

26
docker-compose.yml Normal file
View File

@@ -0,0 +1,26 @@
services:
app:
build:
context: .
dockerfile: Dockerfile
container_name: any-auto-register
init: true
restart: unless-stopped
env_file:
- .env
environment:
HOST: 0.0.0.0
PORT: "8000"
APP_RELOAD: "0"
APP_CONDA_ENV: docker
APP_RUNTIME_DIR: /runtime
SOLVER_PORT: "8889"
SOLVER_BROWSER_TYPE: chromium
ports:
- "8000:8000"
- "8889:8889"
volumes:
- app_runtime:/runtime
volumes:
app_runtime:

20
docker/entrypoint.sh Normal file
View File

@@ -0,0 +1,20 @@
#!/bin/sh
set -eu
APP_DIR="/app"
RUNTIME_DIR="${APP_RUNTIME_DIR:-/runtime}"
mkdir -p "${RUNTIME_DIR}" "${RUNTIME_DIR}/logs" "${RUNTIME_DIR}/smstome_used"
touch \
"${RUNTIME_DIR}/account_manager.db" \
"${RUNTIME_DIR}/smstome_all_numbers.txt" \
"${RUNTIME_DIR}/smstome_uk_deep_numbers.txt" \
"${RUNTIME_DIR}/logs/solver.log"
ln -sfn "${RUNTIME_DIR}/account_manager.db" "${APP_DIR}/account_manager.db"
ln -sfn "${RUNTIME_DIR}/smstome_used" "${APP_DIR}/smstome_used"
ln -sfn "${RUNTIME_DIR}/smstome_all_numbers.txt" "${APP_DIR}/smstome_all_numbers.txt"
ln -sfn "${RUNTIME_DIR}/smstome_uk_deep_numbers.txt" "${APP_DIR}/smstome_uk_deep_numbers.txt"
ln -sfn "${RUNTIME_DIR}/logs/solver.log" "${APP_DIR}/services/turnstile_solver/solver.log"
exec python main.py

View File

@@ -14,6 +14,7 @@ import {
Popconfirm,
Dropdown,
Typography,
Alert,
} from 'antd'
import type { MenuProps } from 'antd'
import {
@@ -39,6 +40,29 @@ const STATUS_COLORS: Record<string, string> = {
invalid: 'error',
}
function parseExtraJson(raw: string | undefined) {
if (!raw) return {}
try {
const parsed = JSON.parse(raw)
return parsed && typeof parsed === 'object' ? parsed : {}
} catch {
return {}
}
}
function normalizeAccount(account: any) {
const extra = parseExtraJson(account.extra_json)
const syncStatuses = extra.sync_statuses && typeof extra.sync_statuses === 'object' ? extra.sync_statuses : {}
const cpaSync = syncStatuses.cpa && typeof syncStatuses.cpa === 'object' ? syncStatuses.cpa : {}
return { ...account, extra, cpaSync }
}
function formatSyncTime(value?: string) {
if (!value) return ''
const date = new Date(value)
return Number.isNaN(date.getTime()) ? value : date.toLocaleString()
}
function LogPanel({ taskId, onDone }: { taskId: string; onDone: () => void }) {
const [lines, setLines] = useState<string[]>([])
const [done, setDone] = useState(false)
@@ -119,6 +143,11 @@ function LogPanel({ taskId, onDone }: { taskId: string; onDone: () => void }) {
function ActionMenu({ acc, onRefresh }: { acc: any; onRefresh: () => void }) {
const [actions, setActions] = useState<any[]>([])
const [resultOpen, setResultOpen] = useState(false)
const [resultTitle, setResultTitle] = useState('')
const [resultStatus, setResultStatus] = useState<'success' | 'error'>('success')
const [resultText, setResultText] = useState('')
const [resultUrl, setResultUrl] = useState('')
useEffect(() => {
apiFetch(`/actions/${acc.platform}`)
@@ -126,40 +155,129 @@ function ActionMenu({ acc, onRefresh }: { acc: any; onRefresh: () => void }) {
.catch(() => {})
}, [acc.platform])
const showResult = (title: string, status: 'success' | 'error', text: string, url = '') => {
setResultTitle(title)
setResultStatus(status)
setResultText(text)
setResultUrl(url)
setResultOpen(true)
}
const copyResultUrl = async () => {
if (!resultUrl) return
try {
await navigator.clipboard.writeText(resultUrl)
message.success('链接已复制')
} catch {
message.error('复制失败')
}
}
const handleAction = async (actionId: string) => {
const actionLabel = actions.find((item) => item.id === actionId)?.label || actionId
try {
const r = await apiFetch(`/actions/${acc.platform}/${acc.id}/${actionId}`, {
method: 'POST',
body: JSON.stringify({ params: {} }),
})
if (!r.ok) {
message.error(r.error || '操作失败')
showResult(actionLabel, 'error', r.error || '操作失败')
return
}
const data = r.data || {}
if (data.url || data.checkout_url || data.cashier_url) {
window.open(data.url || data.checkout_url || data.cashier_url, '_blank')
const targetUrl = data.url || data.checkout_url || data.cashier_url
message.success('链接已生成')
showResult(actionLabel, 'success', '操作成功,请在弹窗中打开或复制链接。', targetUrl)
} else {
message.success(data.message || '操作成功')
const text =
typeof data === 'string'
? data
: Object.keys(data).length > 0
? JSON.stringify(data, null, 2)
: '操作成功'
showResult(actionLabel, 'success', text)
}
onRefresh()
} catch {
message.error('请求失败')
} catch (e: any) {
const detail = e?.message ? String(e.message) : '请求失败'
message.error(detail)
showResult(actionLabel, 'error', detail)
}
}
const menuItems: MenuProps['items'] = actions.map((a) => ({
key: a.id,
label: a.label,
onClick: () => handleAction(a.id),
}))
if (actions.length === 0) return null
return (
<Dropdown menu={{ items: menuItems }}>
<Button type="link" size="small" icon={<MoreOutlined />} />
</Dropdown>
<>
<Dropdown
menu={{
items: menuItems,
onClick: ({ key }) => handleAction(String(key)),
}}
>
<Button type="link" size="small" icon={<MoreOutlined />} />
</Dropdown>
<Modal
title={resultTitle}
open={resultOpen}
onCancel={() => setResultOpen(false)}
footer={[
resultUrl ? (
<Button key="copy" onClick={copyResultUrl}>
</Button>
) : null,
resultUrl ? (
<Button
key="open"
type="primary"
onClick={() => window.open(resultUrl, '_blank', 'noopener,noreferrer')}
>
</Button>
) : null,
<Button key="ok" type={resultUrl ? 'default' : 'primary'} onClick={() => setResultOpen(false)}>
</Button>,
].filter(Boolean)}
maskClosable={false}
>
<Alert
type={resultStatus}
showIcon
message={resultStatus === 'success' ? '操作完成' : '操作失败'}
style={{ marginBottom: 12 }}
/>
{resultUrl ? (
<Space direction="vertical" style={{ width: '100%' }}>
<Text copyable={{ text: resultUrl }} style={{ wordBreak: 'break-all' }}>
{resultUrl}
</Text>
</Space>
) : null}
{resultText ? (
<pre
style={{
margin: 0,
whiteSpace: 'pre-wrap',
wordBreak: 'break-word',
fontFamily: 'monospace',
fontSize: 12,
}}
>
{resultText}
</pre>
) : null}
</Modal>
</>
)
}
@@ -186,6 +304,7 @@ export default function Accounts() {
const [importLoading, setImportLoading] = useState(false)
const [taskId, setTaskId] = useState<string | null>(null)
const [registerLoading, setRegisterLoading] = useState(false)
const [cpaSyncLoading, setCpaSyncLoading] = useState<'pending' | 'selected' | ''>('')
useEffect(() => {
if (platform) setCurrentPlatform(platform)
@@ -198,7 +317,7 @@ export default function Accounts() {
if (search) params.set('email', search)
if (filterStatus) params.set('status', filterStatus)
const data = await apiFetch(`/accounts?${params}`)
setAccounts(data.items)
setAccounts((data.items || []).map(normalizeAccount))
setTotal(data.total)
} finally {
setLoading(false)
@@ -313,8 +432,15 @@ export default function Accounts() {
freemail_password: cfg.freemail_password,
cfworker_api_url: cfg.cfworker_api_url,
cfworker_admin_token: cfg.cfworker_admin_token,
cfworker_custom_auth: cfg.cfworker_custom_auth,
cfworker_domain: cfg.cfworker_domain,
cfworker_fingerprint: cfg.cfworker_fingerprint,
smstome_cookie: cfg.smstome_cookie,
smstome_country_slugs: cfg.smstome_country_slugs,
smstome_phone_attempts: cfg.smstome_phone_attempts,
smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds,
smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds,
smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country,
},
}),
})
@@ -335,6 +461,96 @@ export default function Accounts() {
load()
}
const showCpaSyncResult = (title: string, result: any) => {
const lines = (result.items || [])
.flatMap((item: any) =>
(item.results || []).map((syncResult: any) => ({
email: item.email,
platform: item.platform,
ok: Boolean(syncResult.ok),
name: syncResult.name || 'CPA',
msg: syncResult.msg || '',
})),
)
.filter((item: any) => !item.ok)
.map((item: any) => `[${item.platform}] ${item.email || '-'} / ${item.name}: ${item.msg || '失败'}`)
if (lines.length === 0) return
Modal.info({
title,
width: 760,
content: (
<pre
style={{
margin: 0,
maxHeight: 360,
overflow: 'auto',
padding: 12,
borderRadius: 8,
background: 'rgba(127,127,127,0.08)',
fontSize: 12,
lineHeight: 1.5,
whiteSpace: 'pre-wrap',
wordBreak: 'break-word',
}}
>
{lines.join('\n')}
</pre>
),
})
}
const handleCpaBackfill = async (mode: 'pending' | 'selected') => {
if (currentPlatform !== 'chatgpt') return
const body: Record<string, unknown> = {
platforms: ['chatgpt'],
}
if (mode === 'selected') {
const accountIds = Array.from(selectedRowKeys)
.map((value) => Number(value))
.filter((value) => Number.isInteger(value) && value > 0)
if (accountIds.length === 0) {
message.warning('请先选择要上传的账号')
return
}
body.account_ids = accountIds
} else {
body.pending_only = true
if (filterStatus) body.status = filterStatus
if (search) body.email = search
}
setCpaSyncLoading(mode)
try {
const result = await apiFetch('/integrations/backfill', {
method: 'POST',
body: JSON.stringify(body),
})
const actionLabel = mode === 'selected' ? '所选账号 CPA 上传' : '未上传账号 CPA 补传'
if (!result.total) {
message.info('没有可处理的账号')
} else if (!result.failed) {
message.success(`${actionLabel}完成:成功 ${result.success} / ${result.total}`)
} else if (!result.success) {
message.error(`${actionLabel}失败:成功 ${result.success} / ${result.total}`)
} else {
message.warning(`${actionLabel}部分完成:成功 ${result.success} / ${result.total}`)
}
showCpaSyncResult(`${actionLabel}结果`, result)
await load()
} catch (e: any) {
message.error(`CPA 上传失败: ${e.message}`)
} finally {
setCpaSyncLoading('')
}
}
const columns: any[] = [
{
title: '邮箱',
@@ -408,6 +624,37 @@ export default function Accounts() {
},
]
if (currentPlatform === 'chatgpt') {
columns.splice(4, 0, {
title: 'CPA',
key: 'cpa_sync',
render: (_: any, record: any) => {
const sync = record.cpaSync || {}
const uploaded = Boolean(sync.uploaded || sync.uploaded_at)
const attempted = Boolean(sync.last_attempt_at)
const color = uploaded ? 'success' : attempted ? 'error' : 'default'
const label = uploaded ? '已上传' : attempted ? '最近失败' : '未上传'
const time = uploaded ? sync.uploaded_at : sync.last_attempt_at
return (
<div style={{ display: 'flex', flexDirection: 'column', gap: 4, minWidth: 140 }}>
<Tag color={color}>{label}</Tag>
{time ? (
<Text type="secondary" style={{ fontSize: 12 }}>
{formatSyncTime(time)}
</Text>
) : null}
{sync.last_message ? (
<Text type="secondary" ellipsis={{ tooltip: sync.last_message }} style={{ maxWidth: 220, fontSize: 12 }}>
{sync.last_message}
</Text>
) : null}
</div>
)
},
})
}
return (
<div>
<div style={{ marginBottom: 16, display: 'flex', justifyContent: 'space-between', flexWrap: 'wrap', gap: 8 }}>
@@ -437,6 +684,26 @@ export default function Accounts() {
)}
</Space>
<Space>
{currentPlatform === 'chatgpt' && selectedRowKeys.length > 0 && (
<Popconfirm
title={`确认上传选中的 ${selectedRowKeys.length} 个账号到 CPA`}
onConfirm={() => handleCpaBackfill('selected')}
>
<Button loading={cpaSyncLoading === 'selected'} icon={<UploadOutlined />}>
CPA
</Button>
</Popconfirm>
)}
{currentPlatform === 'chatgpt' && (
<Popconfirm
title="确认补传当前筛选范围内尚未成功上传 CPA 的账号?"
onConfirm={() => handleCpaBackfill('pending')}
>
<Button loading={cpaSyncLoading === 'pending'} icon={<UploadOutlined />} disabled={total === 0}>
CPA
</Button>
</Popconfirm>
)}
{selectedRowKeys.length > 0 && (
<Popconfirm title={`确认删除选中的 ${selectedRowKeys.length} 个账号?`} onConfirm={handleBatchDelete}>
<Button danger icon={<DeleteOutlined />}> {selectedRowKeys.length} </Button>

View File

@@ -52,8 +52,15 @@ export default function Register() {
freemail_password: cfg.freemail_password || '',
cfworker_api_url: cfg.cfworker_api_url || '',
cfworker_admin_token: cfg.cfworker_admin_token || '',
cfworker_custom_auth: cfg.cfworker_custom_auth || '',
cfworker_domain: cfg.cfworker_domain || '',
cfworker_fingerprint: cfg.cfworker_fingerprint || '',
smstome_cookie: cfg.smstome_cookie || '',
smstome_country_slugs: cfg.smstome_country_slugs || '',
smstome_phone_attempts: cfg.smstome_phone_attempts || '',
smstome_otp_timeout_seconds: cfg.smstome_otp_timeout_seconds || '',
smstome_poll_interval_seconds: cfg.smstome_poll_interval_seconds || '',
smstome_sync_max_pages_per_country: cfg.smstome_sync_max_pages_per_country || '',
luckmail_base_url: cfg.luckmail_base_url || 'https://mails.luckyous.com/',
luckmail_api_key: cfg.luckmail_api_key || '',
luckmail_email_type: cfg.luckmail_email_type || '',
@@ -94,8 +101,15 @@ export default function Register() {
freemail_password: values.freemail_password,
cfworker_api_url: values.cfworker_api_url,
cfworker_admin_token: values.cfworker_admin_token,
cfworker_custom_auth: values.cfworker_custom_auth,
cfworker_domain: values.cfworker_domain,
cfworker_fingerprint: values.cfworker_fingerprint,
smstome_cookie: values.smstome_cookie,
smstome_country_slugs: values.smstome_country_slugs,
smstome_phone_attempts: values.smstome_phone_attempts,
smstome_otp_timeout_seconds: values.smstome_otp_timeout_seconds,
smstome_poll_interval_seconds: values.smstome_poll_interval_seconds,
smstome_sync_max_pages_per_country: values.smstome_sync_max_pages_per_country,
luckmail_base_url: values.luckmail_base_url,
luckmail_api_key: values.luckmail_api_key,
luckmail_email_type: values.luckmail_email_type,
@@ -254,6 +268,9 @@ export default function Register() {
<Form.Item name="cfworker_admin_token" label="Admin Token">
<Input placeholder="abc123,,,abc" />
</Form.Item>
<Form.Item name="cfworker_custom_auth" label="Site Password">
<Input.Password placeholder="private site password" />
</Form.Item>
<Form.Item name="cfworker_domain" label="域名">
<Input placeholder="example.com" />
</Form.Item>
@@ -280,6 +297,32 @@ export default function Register() {
)}
</Card>
{platform === 'chatgpt' && (
<Card title="ChatGPT 手机验证" style={{ marginBottom: 16 }}>
<Text type="secondary" style={{ display: 'block', marginBottom: 12 }}>
OAuth `add_phone` 使
</Text>
<Form.Item name="smstome_cookie" label="SMSToMe Cookie">
<Input.Password placeholder="cf_clearance=...; PHPSESSID=..." />
</Form.Item>
<Form.Item name="smstome_country_slugs" label="国家列表">
<Input placeholder="united-kingdom,poland,finland" />
</Form.Item>
<Form.Item name="smstome_phone_attempts" label="手机号尝试次数">
<Input placeholder="3" />
</Form.Item>
<Form.Item name="smstome_otp_timeout_seconds" label="短信等待秒数">
<Input placeholder="45" />
</Form.Item>
<Form.Item name="smstome_poll_interval_seconds" label="轮询间隔秒数">
<Input placeholder="5" />
</Form.Item>
<Form.Item name="smstome_sync_max_pages_per_country" label="每国同步页数">
<Input placeholder="5" />
</Form.Item>
</Card>
)}
{captchaSolver === 'yescaptcha' && (
<Card title="验证码配置" style={{ marginBottom: 16 }}>
<Form.Item name="yescaptcha_key" label="YesCaptcha Key">

View File

@@ -118,6 +118,7 @@ const TAB_ITEMS = [
fields: [
{ key: 'cfworker_api_url', label: 'API URL', placeholder: 'https://apimail.example.com' },
{ key: 'cfworker_admin_token', label: '管理员 Token', secret: true },
{ key: 'cfworker_custom_auth', label: '站点密码', secret: true },
{ key: 'cfworker_domain', label: '邮箱域名', placeholder: 'example.com' },
{ key: 'cfworker_fingerprint', label: 'Fingerprint', placeholder: '6703363b...' },
],
@@ -170,6 +171,18 @@ const TAB_ITEMS = [
{ key: 'team_manager_key', label: 'API Key', secret: true },
],
},
{
title: 'SMSToMe 手机验证',
desc: 'ChatGPT add_phone 阶段自动取号并轮询短信验证码',
fields: [
{ key: 'smstome_cookie', label: 'SMSToMe Cookie', secret: true },
{ key: 'smstome_country_slugs', label: '国家列表', placeholder: 'united-kingdom,poland' },
{ key: 'smstome_phone_attempts', label: '手机号尝试次数', placeholder: '3' },
{ key: 'smstome_otp_timeout_seconds', label: '短信等待秒数', placeholder: '45' },
{ key: 'smstome_poll_interval_seconds', label: '轮询间隔秒数', placeholder: '5' },
{ key: 'smstome_sync_max_pages_per_country', label: '每国同步页数', placeholder: '5' },
],
},
],
},
{

View File

@@ -823,7 +823,7 @@ class ChatGPTClient:
if self._state_is_email_otp(state):
self._log("等待邮箱验证码...")
otp_code = skymail_client.wait_for_verification_code(email, timeout=30)
otp_code = skymail_client.wait_for_verification_code(email, timeout=90)
if not otp_code:
return False, "未收到验证码"

View File

@@ -11,6 +11,7 @@ try:
except ImportError:
import requests as curl_requests
from .phone_service import SMSToMePhoneService
from .utils import (
FlowState,
build_browser_headers,
@@ -38,12 +39,14 @@ class OAuthClient:
verbose: 是否输出详细日志
browser_mode: protocol | headless | headed
"""
self.oauth_issuer = config.get("oauth_issuer", "https://auth.openai.com")
self.oauth_client_id = config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
self.oauth_redirect_uri = config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback")
self.config = dict(config or {})
self.oauth_issuer = self.config.get("oauth_issuer", "https://auth.openai.com")
self.oauth_client_id = self.config.get("oauth_client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
self.oauth_redirect_uri = self.config.get("oauth_redirect_uri", "http://localhost:1455/auth/callback")
self.proxy = proxy
self.verbose = verbose
self.browser_mode = browser_mode or "protocol"
self.last_error = ""
# 创建 session
self.session = curl_requests.Session()
@@ -55,11 +58,110 @@ class OAuthClient:
if self.verbose:
print(f" [OAuth] {msg}")
def _set_error(self, message):
self.last_error = str(message or "").strip()
if self.last_error:
self._log(self.last_error)
def _browser_pause(self, low=0.15, high=0.4):
"""在 headed 模式下注入轻微延迟,模拟真实浏览器操作节奏。"""
if self.browser_mode == "headed":
random_delay(low, high)
@staticmethod
def _iter_text_fragments(value):
if isinstance(value, str):
text = value.strip()
if text:
yield text
return
if isinstance(value, dict):
for item in value.values():
yield from OAuthClient._iter_text_fragments(item)
return
if isinstance(value, (list, tuple, set)):
for item in value:
yield from OAuthClient._iter_text_fragments(item)
@classmethod
def _should_blacklist_phone_failure(cls, detail="", state: FlowState | None = None):
fragments = [str(detail or "").strip()]
if state is not None:
fragments.extend(
cls._iter_text_fragments(
{
"page_type": state.page_type,
"continue_url": state.continue_url,
"current_url": state.current_url,
"payload": state.payload,
"raw": state.raw,
}
)
)
combined = " | ".join(fragment for fragment in fragments if fragment).lower()
if not combined:
return False
non_blacklist_markers = (
"whatsapp",
"未收到短信验证码",
"手机号验证码错误",
"phone-otp/resend",
"phone-otp/validate 异常",
"phone-otp/validate 响应不是 json",
"phone-otp/validate 失败",
"timeout",
"timed out",
"network",
"connection",
"proxy",
"ssl",
"tls",
"captcha",
"too many phone",
"too many phone numbers",
"too many verification requests",
"验证请求过多",
"接受短信次数过多",
"session limit",
"rate limit",
)
if any(marker in combined for marker in non_blacklist_markers):
return False
blacklist_markers = (
"phone number is invalid",
"invalid phone number",
"invalid phone",
"phone number invalid",
"sms verification failed",
"send sms verification failed",
"unable to send sms",
"not a valid mobile number",
"unsupported phone number",
"phone number not supported",
"carrier not supported",
"电话号码无效",
"手机号无效",
"发送短信验证失败",
"号码无效",
"号码不支持",
"手机号不支持",
)
return any(marker in combined for marker in blacklist_markers)
def _blacklist_phone_if_needed(self, phone_service, entry, detail="", state: FlowState | None = None):
if not entry or not self._should_blacklist_phone_failure(detail, state):
return False
try:
phone_service.mark_blacklisted(entry.phone)
self._log(f"已将手机号加入黑名单: {entry.phone}")
return True
except Exception as e:
self._log(f"写入手机号黑名单失败: {e}")
return False
def _headers(
self,
url,
@@ -142,6 +244,10 @@ class OAuthClient:
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "email_otp_verification" or "email-verification" in target or "email-otp" in target
def _state_is_add_phone(self, state: FlowState):
target = f"{state.continue_url} {state.current_url}".lower()
return state.page_type == "add_phone" or "add-phone" in target
def _state_requires_navigation(self, state: FlowState):
method = (state.method or "GET").upper()
if method != "GET":
@@ -335,7 +441,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not sentinel_token:
self._log("无法获取 sentinel token (authorize_continue)")
self._set_error("无法获取 sentinel token (authorize_continue)")
return None
request_url = f"{self.oauth_issuer}/api/accounts/authorize/continue"
@@ -391,7 +497,7 @@ class OAuthClient:
self._log(f"/authorize/continue(重试) -> {r.status_code}")
if r.status_code != 200:
self._log(f"提交邮箱失败: {r.text[:180]}")
self._set_error(f"提交邮箱失败: {r.status_code} - {r.text[:180]}")
return None
data = r.json()
@@ -399,7 +505,7 @@ class OAuthClient:
self._log(describe_flow_state(flow_state))
return flow_state
except Exception as e:
self._log(f"提交邮箱异常: {e}")
self._set_error(f"提交邮箱异常: {e}")
return None
def _submit_password_verify(self, password, device_id, *, user_agent=None, sec_ch_ua=None, impersonate=None, referer=None):
@@ -415,7 +521,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not sentinel_pwd:
self._log("无法获取 sentinel token (password_verify)")
self._set_error("无法获取 sentinel token (password_verify)")
return None
request_url = f"{self.oauth_issuer}/api/accounts/password/verify"
@@ -445,7 +551,7 @@ class OAuthClient:
self._log(f"/password/verify -> {r.status_code}")
if r.status_code != 200:
self._log(f"密码验证失败: {r.text[:180]}")
self._set_error(f"密码验证失败: {r.status_code} - {r.text[:180]}")
return None
data = r.json()
@@ -453,7 +559,7 @@ class OAuthClient:
self._log(f"verify {describe_flow_state(flow_state)}")
return flow_state
except Exception as e:
self._log(f"密码验证异常: {e}")
self._set_error(f"密码验证异常: {e}")
return None
def login_and_get_tokens(self, email, password, device_id, user_agent=None, sec_ch_ua=None, impersonate=None, skymail_client=None):
@@ -472,6 +578,7 @@ class OAuthClient:
Returns:
dict: tokens 字典,包含 access_token, refresh_token, id_token
"""
self.last_error = ""
self._log("开始 OAuth 登录流程...")
code_verifier, code_challenge = generate_pkce()
@@ -499,7 +606,7 @@ class OAuthClient:
impersonate=impersonate,
)
if not authorize_final_url:
self._log("Bootstrap 失败")
self._set_error("Bootstrap 失败")
return None
continue_referer = (
@@ -519,6 +626,8 @@ class OAuthClient:
authorize_params=authorize_params,
)
if not state:
if not self.last_error:
self._set_error("提交邮箱后未进入有效的 OAuth 状态")
return None
self._log(f"OAuth 状态起点: {describe_flow_state(state)}")
@@ -529,7 +638,7 @@ class OAuthClient:
signature = self._state_signature(state)
seen_states[signature] = seen_states.get(signature, 0) + 1
if seen_states[signature] > 2:
self._log(f"OAuth 状态卡住: {describe_flow_state(state)}")
self._set_error(f"OAuth 状态卡住: {describe_flow_state(state)}")
return None
code = self._extract_code_from_state(state)
@@ -553,6 +662,8 @@ class OAuthClient:
referer=state.current_url or state.continue_url or referer,
)
if not next_state:
if not self.last_error:
self._set_error("密码验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
@@ -560,7 +671,7 @@ class OAuthClient:
if self._state_is_email_otp(state):
if not skymail_client:
self._log("当前流程需要邮箱 OTP但缺少接码客户端")
self._set_error("当前流程需要邮箱 OTP但缺少接码客户端")
return None
next_state = self._handle_otp_verification(
email,
@@ -572,6 +683,24 @@ class OAuthClient:
state,
)
if not next_state:
if not self.last_error:
self._set_error("邮箱 OTP 验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
continue
if self._state_is_add_phone(state):
next_state = self._handle_add_phone_verification(
device_id,
user_agent,
sec_ch_ua,
impersonate,
state,
)
if not next_state:
if not self.last_error:
self._set_error("手机号验证后未进入下一步 OAuth 状态")
return None
referer = state.current_url or referer
state = next_state
@@ -621,10 +750,14 @@ class OAuthClient:
self._log(f"workspace state -> {describe_flow_state(state)}")
continue
self._log(f"未支持的 OAuth 状态: {describe_flow_state(state)}")
if not self.last_error:
self._set_error(f"workspace/org 选择失败: {describe_flow_state(state)}")
return None
self._set_error(f"未支持的 OAuth 状态: {describe_flow_state(state)}")
return None
self._log("OAuth 状态机超出最大步数")
self._set_error("OAuth 状态机超出最大步数")
return None
def _extract_code_from_url(self, url):
@@ -664,17 +797,17 @@ class OAuthClient:
self._log(f"无法获取 consent session 数据 (尝试 {attempt + 1}/{max_retries})")
time.sleep(0.3)
else:
self._log("无法获取 consent session 数据")
self._set_error("无法获取 consent session 数据")
return None, None
workspaces = session_data.get("workspaces", [])
if not workspaces:
self._log("session 中没有 workspace 信息")
self._set_error("session 中没有 workspace 信息")
return None, None
workspace_id = (workspaces[0] or {}).get("id")
if not workspace_id:
self._log("workspace_id 为空")
self._set_error("workspace_id 为空")
return None, None
self._log(f"选择 workspace: {workspace_id}")
@@ -794,7 +927,7 @@ class OAuthClient:
return self._extract_code_from_state(org_state), org_state
return None, org_state
except Exception as e:
self._log(f"解析 organization/select 响应异常: {e}")
self._set_error(f"解析 organization/select 响应异常: {e}")
# 如果有 continue_url跟随它
if continue_url:
@@ -804,10 +937,12 @@ class OAuthClient:
return None, workspace_state
except Exception as e:
self._log(f"处理 workspace/select 响应异常: {e}")
self._set_error(f"处理 workspace/select 响应异常: {e}")
return None, None
except Exception as e:
self._log(f"workspace/select 异常: {e}")
self._set_error(f"workspace/select 异常: {e}")
return None, None
return None, None
@@ -951,9 +1086,6 @@ class OAuthClient:
def _decode_oauth_session_cookie(self):
"""解码 oai-client-auth-session cookie"""
import json
import base64
try:
for cookie in self.session.cookies:
try:
@@ -961,19 +1093,44 @@ class OAuthClient:
if name == "oai-client-auth-session":
value = cookie.value if hasattr(cookie, 'value') else self.session.cookies.get(name)
if value:
padded = value + "=" * (-len(value) % 4)
try:
decoded = base64.b64decode(padded).decode('utf-8')
except Exception:
decoded = base64.urlsafe_b64decode(padded).decode('utf-8')
data = json.loads(decoded)
return data
data = self._decode_cookie_json_value(value)
if data:
return data
except Exception:
continue
except Exception:
pass
return None
@staticmethod
def _decode_cookie_json_value(value):
import base64
import json
raw_value = str(value or "").strip()
if not raw_value:
return None
candidates = [raw_value]
if "." in raw_value:
candidates.insert(0, raw_value.split(".", 1)[0])
for candidate in candidates:
candidate = candidate.strip()
if not candidate:
continue
padded = candidate + "=" * (-len(candidate) % 4)
for decoder in (base64.urlsafe_b64decode, base64.b64decode):
try:
decoded = decoder(padded).decode("utf-8")
parsed = json.loads(decoded)
except Exception:
continue
if isinstance(parsed, dict):
return parsed
return None
def _exchange_code_for_tokens(self, code, code_verifier, user_agent, impersonate):
"""用 authorization code 换取 tokens"""
@@ -1008,12 +1165,222 @@ class OAuthClient:
if r.status_code == 200:
return r.json()
else:
self._log(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}")
self._set_error(f"换取 tokens 失败: {r.status_code} - {r.text[:200]}")
except Exception as e:
self._log(f"换取 tokens 异常: {e}")
self._set_error(f"换取 tokens 异常: {e}")
return None
def _send_phone_number(self, phone, device_id, user_agent, sec_ch_ua, impersonate):
request_url = f"{self.oauth_issuer}/api/accounts/add-phone/send"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=f"{self.oauth_issuer}/add-phone",
origin=self.oauth_issuer,
content_type="application/json",
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {
"json": {"phone_number": phone},
"headers": headers,
"timeout": 30,
"allow_redirects": False,
}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, None, f"add-phone/send 异常: {e}"
self._log(f"/add-phone/send -> {resp.status_code}")
if resp.status_code != 200:
return False, None, f"add-phone/send 失败: {resp.status_code} - {resp.text[:180]}"
try:
data = resp.json()
except Exception:
return False, None, "add-phone/send 响应不是 JSON"
next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url)
self._log(f"add-phone/send {describe_flow_state(next_state)}")
return True, next_state, ""
def _resend_phone_otp(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/resend"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification",
origin=self.oauth_issuer,
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {"headers": headers, "timeout": 30, "allow_redirects": False}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, f"phone-otp/resend 异常: {e}"
self._log(f"/phone-otp/resend -> {resp.status_code}")
if resp.status_code == 200:
return True, ""
return False, f"phone-otp/resend 失败: {resp.status_code} - {resp.text[:180]}"
def _validate_phone_otp(self, code, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
request_url = f"{self.oauth_issuer}/api/accounts/phone-otp/validate"
headers = self._headers(
request_url,
user_agent=user_agent,
sec_ch_ua=sec_ch_ua,
accept="application/json",
referer=state.current_url or state.continue_url or f"{self.oauth_issuer}/phone-verification",
origin=self.oauth_issuer,
content_type="application/json",
fetch_site="same-origin",
extra_headers={"oai-device-id": device_id},
)
headers.update(generate_datadog_trace())
try:
kwargs = {
"json": {"code": code},
"headers": headers,
"timeout": 30,
"allow_redirects": False,
}
if impersonate:
kwargs["impersonate"] = impersonate
self._browser_pause(0.12, 0.25)
resp = self.session.post(request_url, **kwargs)
except Exception as e:
return False, None, f"phone-otp/validate 异常: {e}"
self._log(f"/phone-otp/validate -> {resp.status_code}")
if resp.status_code != 200:
if resp.status_code == 401:
return False, None, "手机号验证码错误"
return False, None, f"phone-otp/validate 失败: {resp.status_code} - {resp.text[:180]}"
try:
data = resp.json()
except Exception:
return False, None, "phone-otp/validate 响应不是 JSON"
next_state = self._state_from_payload(data, current_url=str(resp.url) or request_url)
self._log(f"手机号 OTP 验证通过 {describe_flow_state(next_state)}")
return True, next_state, ""
def _handle_add_phone_verification(self, device_id, user_agent, sec_ch_ua, impersonate, state: FlowState):
phone_service = SMSToMePhoneService(self.config, log_fn=self._log)
if not phone_service.enabled:
self._set_error("OAuth 登录被 add_phone 阻断,当前账号需要手机号验证;未配置可用的 SMSToMe 号码池")
return None
excluded_prefixes = set()
last_failure = ""
for attempt in range(phone_service.max_attempts):
try:
entry = phone_service.acquire_phone(exclude_prefixes=excluded_prefixes)
except Exception as e:
last_failure = f"获取手机号失败: {e}"
self._log(last_failure)
break
if not entry:
last_failure = last_failure or "SMSToMe 号码池中无可用手机号"
break
prefix = phone_service.prefix_hint(entry.phone)
self._log(
f"步骤5: add_phone 选择手机号 {attempt + 1}/{phone_service.max_attempts}: {entry.phone} ({entry.country_slug})"
)
sent, next_state, detail = self._send_phone_number(
entry.phone,
device_id,
user_agent,
sec_ch_ua,
impersonate,
)
if not sent or not next_state:
last_failure = detail or "add-phone/send 未返回有效状态"
self._log(last_failure)
self._blacklist_phone_if_needed(phone_service, entry, last_failure)
excluded_prefixes.add(prefix)
continue
if next_state.page_type != "phone_otp_verification" and "phone-verification" not in f"{next_state.continue_url} {next_state.current_url}".lower():
last_failure = f"add-phone/send 未进入手机验证码页: {describe_flow_state(next_state)}"
self._log(last_failure)
self._blacklist_phone_if_needed(phone_service, entry, last_failure, next_state)
excluded_prefixes.add(prefix)
continue
session_data = self._decode_oauth_session_cookie() or {}
verification_channel = str(session_data.get("phone_verification_channel") or "sms").strip().lower() or "sms"
bound_phone = str(session_data.get("phone_number") or entry.phone).strip() or entry.phone
self._log(f"add_phone 发码成功: phone={bound_phone}, channel={verification_channel}")
if verification_channel != "sms":
last_failure = f"add_phone 已切到 {verification_channel} 通道,当前 SMSToMe 仅支持短信接码"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
code = phone_service.wait_for_code(entry)
if not code:
self._log("手机号验证码暂未收到,尝试重发一次...")
resend_ok, resend_detail = self._resend_phone_otp(
device_id,
user_agent,
sec_ch_ua,
impersonate,
next_state,
)
if resend_ok:
code = phone_service.wait_for_code(entry)
if not code:
last_failure = resend_detail or f"手机号 {entry.phone} 未收到短信验证码"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
valid, validated_state, detail = self._validate_phone_otp(
code,
device_id,
user_agent,
sec_ch_ua,
impersonate,
next_state,
)
if not valid or not validated_state:
last_failure = detail or "手机号 OTP 验证失败"
self._log(last_failure)
excluded_prefixes.add(prefix)
continue
return validated_state
self._set_error(f"add_phone 阶段失败: {last_failure or '未完成手机号验证'}")
return None
def _handle_otp_verification(self, email, device_id, user_agent, sec_ch_ua, impersonate, skymail_client, state):
"""处理 OAuth 阶段的邮箱 OTP 验证,返回服务端声明的下一步状态。"""
@@ -1039,7 +1406,7 @@ class OAuthClient:
skymail_client._used_codes = set()
tried_codes = set(getattr(skymail_client, "_used_codes", set()))
otp_deadline = time.time() + 30
otp_deadline = time.time() + 60
otp_sent_at = time.time()
def validate_otp(code):
@@ -1085,7 +1452,7 @@ class OAuthClient:
self._log("使用 wait_for_verification_code 进行阻塞式获取新验证码...")
while time.time() < otp_deadline:
remaining = max(1, int(otp_deadline - time.time()))
wait_time = min(8, remaining)
wait_time = min(10, remaining)
try:
code = skymail_client.wait_for_verification_code(
email,
@@ -1099,6 +1466,8 @@ class OAuthClient:
if not code:
self._log("暂未收到新的 OTP继续等待...")
if self.last_error:
break
continue
if code in tried_codes:
@@ -1108,6 +1477,8 @@ class OAuthClient:
next_state = validate_otp(code)
if next_state:
return next_state
if self.last_error:
break
else:
while time.time() < otp_deadline:
messages = skymail_client.fetch_emails(email) or []
@@ -1120,8 +1491,8 @@ class OAuthClient:
candidate_codes.append(code)
if not candidate_codes:
elapsed = int(30 - max(0, otp_deadline - time.time()))
self._log(f"等待新的 OTP... ({elapsed}s/30s)")
elapsed = int(60 - max(0, otp_deadline - time.time()))
self._log(f"等待新的 OTP... ({elapsed}s/60s)")
time.sleep(2)
continue
@@ -1131,6 +1502,9 @@ class OAuthClient:
return next_state
time.sleep(2)
if self.last_error:
break
self._log(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码")
if not self.last_error:
self._set_error(f"OAuth 阶段 OTP 验证失败,已尝试 {len(tried_codes)} 个验证码")
return None

View File

@@ -2,6 +2,8 @@
支付核心逻辑 — 生成 Plus/Team 支付链接、无痕打开浏览器、检测订阅状态
"""
from __future__ import annotations
import logging
import subprocess
import sys
@@ -258,4 +260,4 @@ def check_subscription_status(account: Account, proxy: Optional[str] = None) ->
if settings_.get("workspace_plan_type") in ("team", "enterprise"):
return "team"
return "free"
return "free"

View File

@@ -0,0 +1,98 @@
from __future__ import annotations
from pathlib import Path
from typing import Callable, Iterable, Optional
from smstome_tool import (
PhoneEntry,
get_unused_phone,
mark_phone_blacklisted,
parse_country_slugs,
update_global_phone_list,
wait_for_otp,
)
def _to_positive_int(value, default: int, *, minimum: int = 1) -> int:
try:
parsed = int(str(value).strip())
except Exception:
return default
return parsed if parsed >= minimum else default
def _prefix_hint(phone: str, width: int = 7) -> str:
value = str(phone or "").strip()
return value[: min(len(value), width)] if value else ""
class SMSToMePhoneService:
def __init__(self, config: Optional[dict] = None, log_fn: Optional[Callable[[str], None]] = None):
self.config = dict(config or {})
self.log_fn = log_fn or (lambda _msg: None)
self.cookie_header = str(self.config.get("smstome_cookie", "") or "").strip() or None
self.country_slugs = parse_country_slugs(self.config.get("smstome_country_slugs"))
self.global_file = Path(str(self.config.get("smstome_global_file") or "smstome_all_numbers.txt"))
self.used_numbers_dir = Path(str(self.config.get("smstome_used_numbers_dir") or "smstome_used"))
self.task_name = str(self.config.get("smstome_task_name") or "chatgpt_add_phone").strip() or "chatgpt_add_phone"
self.max_attempts = _to_positive_int(self.config.get("smstome_phone_attempts"), 3)
self.otp_timeout_seconds = _to_positive_int(self.config.get("smstome_otp_timeout_seconds"), 45, minimum=10)
self.poll_interval_seconds = _to_positive_int(self.config.get("smstome_poll_interval_seconds"), 5, minimum=1)
self.sync_max_pages_per_country = _to_positive_int(
self.config.get("smstome_sync_max_pages_per_country"),
5,
)
@property
def enabled(self) -> bool:
return self._has_pool_file() or bool(self.cookie_header)
def prefix_hint(self, phone: str) -> str:
return _prefix_hint(phone)
def _has_pool_file(self) -> bool:
try:
return self.global_file.exists() and self.global_file.stat().st_size > 0
except OSError:
return False
def ensure_pool_ready(self) -> None:
if self._has_pool_file():
return
if not self.cookie_header:
raise RuntimeError("未找到 SMSToMe 号码池文件,且未配置 smstome_cookie")
self.log_fn("SMSToMe 号码池不存在,开始自动同步...")
count = update_global_phone_list(
cookie_header=self.cookie_header,
countries=self.country_slugs or None,
output_path=self.global_file,
max_pages_per_country=self.sync_max_pages_per_country,
)
if count <= 0:
raise RuntimeError("SMSToMe 号码池同步后为空")
self.log_fn(f"SMSToMe 号码池同步完成,共 {count} 个号码")
def acquire_phone(self, *, exclude_prefixes: Optional[Iterable[str]] = None) -> Optional[PhoneEntry]:
self.ensure_pool_ready()
return get_unused_phone(
self.task_name,
country_slug=self.country_slugs or None,
global_file=self.global_file,
used_numbers_dir=self.used_numbers_dir,
exclude_prefixes=exclude_prefixes,
)
def mark_blacklisted(self, phone: str) -> None:
mark_phone_blacklisted(self.task_name, phone, used_numbers_dir=self.used_numbers_dir)
def wait_for_code(self, entry: PhoneEntry, *, timeout: Optional[int] = None) -> Optional[str]:
wait_seconds = _to_positive_int(timeout, self.otp_timeout_seconds, minimum=10)
return wait_for_otp(
entry,
cookie_header=self.cookie_header,
timeout=wait_seconds,
poll_interval=self.poll_interval_seconds,
trace=lambda message: self.log_fn(f"[SMSToMe] {message}"),
raise_on_timeout=False,
)

View File

@@ -84,6 +84,7 @@ class ChatGPTPlatform(BasePlatform):
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
extra_config=(self.config.extra or {}),
)
engine.email = email
engine.password = password
@@ -116,6 +117,7 @@ class ChatGPTPlatform(BasePlatform):
browser_mode=browser_mode,
callback_logger=log_fn,
max_retries=max_retries,
extra_config=(self.config.extra or {}),
)
if email:
engine.email = email
@@ -193,7 +195,14 @@ class ChatGPTPlatform(BasePlatform):
if plan == "plus":
url = generate_plus_link(a, proxy=proxy, country=country)
else:
url = generate_team_link(a, proxy=proxy, country=country)
url = generate_team_link(
a,
workspace_name=params.get("workspace_name", "MyTeam"),
price_interval=params.get("price_interval", "month"),
seat_quantity=int(params.get("seat_quantity", 5) or 5),
proxy=proxy,
country=country,
)
return {"ok": bool(url), "data": {"url": url}}
elif action_id == "upload_cpa":

View File

@@ -12,6 +12,7 @@ from core.base_platform import AccountStatus
from platforms.chatgpt.register import RegistrationResult
from .chatgpt_client import ChatGPTClient
from .oauth_client import OAuthClient
from .utils import generate_random_name, generate_random_birthday
logger = logging.getLogger(__name__)
@@ -46,6 +47,7 @@ class RegistrationEngineV2:
callback_logger: Optional[Callable[[str], None]] = None,
task_uuid: Optional[str] = None,
max_retries: int = 3,
extra_config: Optional[dict] = None,
):
self.email_service = email_service
self.proxy_url = proxy_url
@@ -53,6 +55,7 @@ class RegistrationEngineV2:
self.callback_logger = callback_logger
self.task_uuid = task_uuid
self.max_retries = max(1, int(max_retries or 1))
self.extra_config = dict(extra_config or {})
self.email = None
self.password = None
@@ -69,6 +72,12 @@ class RegistrationEngineV2:
else:
logger.info(log_message)
def _format_oauth_failure(self, oauth_client: OAuthClient) -> str:
detail = str(getattr(oauth_client, "last_error", "") or "").strip()
if not detail:
detail = "获取最终 OAuth Tokens 失败"
return f"账号已创建成功,但 {detail}"
def _should_retry(self, message: str) -> bool:
text = str(message or "").lower()
retriable_markers = [
@@ -151,7 +160,7 @@ class RegistrationEngineV2:
result.error_message = last_error
return result
self._log("步骤 2/2: 复用注册会话,直接获取 ChatGPT Session / AccessToken...")
self._log("步骤 2/2: 优先复用注册会话取 ChatGPT Session / AccessToken...")
session_ok, session_result = chatgpt_client.reuse_session_and_get_tokens()
if session_ok:
@@ -181,10 +190,66 @@ class RegistrationEngineV2:
self._log("=" * 60)
return result
last_error = f"注册成功,但复用会话获取 AccessToken 失败: {session_result}"
if attempt < self.max_retries - 1:
self._log(f"{last_error},准备整流程重试")
continue
self._log(f"复用会话失败,回退到 OAuth 登录补全流程: {session_result}")
tokens = None
oauth_client = None
for oauth_attempt in range(2):
if oauth_attempt > 0:
self._log(f"同账号 OAuth 重试 {oauth_attempt + 1}/2 ...")
time.sleep(1)
oauth_client = OAuthClient(
config=self.extra_config,
proxy=self.proxy_url,
verbose=False,
browser_mode=self.browser_mode,
)
oauth_client._log = self._log
oauth_client.session = chatgpt_client.session
tokens = oauth_client.login_and_get_tokens(
email_addr,
pwd,
chatgpt_client.device_id,
chatgpt_client.ua,
chatgpt_client.sec_ch_ua,
chatgpt_client.impersonate,
skymail_adapter,
)
if tokens and tokens.get("access_token"):
break
if oauth_client.last_error and "add_phone" in oauth_client.last_error:
break
if tokens and tokens.get("access_token"):
self._log("OAuth 回退补全成功!")
result.success = True
result.access_token = tokens.get("access_token")
result.refresh_token = tokens.get("refresh_token")
result.id_token = tokens.get("id_token")
result.account_id = "v2_acct_" + chatgpt_client.device_id[:8]
session_data = oauth_client._decode_oauth_session_cookie()
if session_data:
workspaces = session_data.get("workspaces", [])
if workspaces:
result.workspace_id = str((workspaces[0] or {}).get("id") or "")
self._log(f"成功萃取 Workspace ID: {result.workspace_id}")
session_cookie = None
for cookie in oauth_client.session.cookies.jar:
if cookie.name == "__Secure-next-auth.session-token":
session_cookie = cookie.value
break
result.session_token = session_cookie
self._log("=" * 60)
self._log("注册流程成功结束!")
self._log("=" * 60)
return result
last_error = self._format_oauth_failure(oauth_client)
result.error_message = last_error
return result
except Exception as attempt_error:

View File

@@ -3,6 +3,8 @@ Token 刷新模块
支持 Session Token 和 OAuth Refresh Token 两种刷新方式
"""
from __future__ import annotations
import logging
import json
import time
@@ -331,4 +333,4 @@ def validate_account_token(account_id: int, proxy_url: Optional[str] = None) ->
return False, "账号没有 access_token"
manager = TokenRefreshManager(proxy_url=proxy_url)
return manager.validate_token(account.access_token)
return manager.validate_token(account.access_token)

View File

@@ -13,3 +13,5 @@ camoufox>=0.4.0
aiofiles>=23.0.0
rich>=13.7.1
pyinstaller>=6.0.0
httpx>=0.27.0
selectolax>=0.3.21

View File

@@ -0,0 +1,146 @@
#!/usr/bin/env python3
from __future__ import annotations
import sys
from pathlib import Path
from sqlmodel import Session, select
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from core.base_platform import Account, AccountStatus, RegisterConfig
from core.db import AccountModel, engine
from core.registry import get, load_all
def load_accounts(ids: list[int]) -> list[AccountModel]:
with Session(engine) as session:
items = []
for account_id in ids:
acc = session.get(AccountModel, account_id)
if acc and acc.platform == "chatgpt":
items.append(acc)
return items
def to_account_model(acc: AccountModel) -> Account:
return Account(
platform=acc.platform,
email=acc.email,
password=acc.password,
user_id=acc.user_id,
token=acc.token,
status=AccountStatus(acc.status),
extra=acc.get_extra(),
)
def parse_id_list(raw: str) -> list[int]:
ids = []
for part in raw.split(","):
text = part.strip()
if not text:
continue
ids.append(int(text))
return ids
def choose_account(items: list[AccountModel]) -> AccountModel:
print("\n可选 ChatGPT 账号:")
for idx, acc in enumerate(items, start=1):
extra = acc.get_extra()
print(
f"{idx}. id={acc.id} email={acc.email} "
f"refresh={'Y' if extra.get('refresh_token') else 'N'} "
f"session={'Y' if extra.get('session_token') else 'N'} "
f"cookies={'Y' if extra.get('cookies') else 'N'}"
)
while True:
raw = input("\n选择序号: ").strip()
try:
pos = int(raw)
except ValueError:
print("请输入数字序号。")
continue
if 1 <= pos <= len(items):
return items[pos - 1]
print("序号超出范围。")
def main() -> None:
load_all()
platform_cls = get("chatgpt")
instance = platform_cls(config=RegisterConfig())
raw_ids = input("输入 ChatGPT 账号 id 列表,逗号分隔: ").strip()
if not raw_ids:
print("未输入账号 id。")
return
try:
ids = parse_id_list(raw_ids)
except ValueError:
print("账号 id 列表格式错误。示例: 22,21,18")
return
accounts = load_accounts(ids)
if not accounts:
print("没有找到匹配的 ChatGPT 账号。")
return
selected = choose_account(accounts)
plan = input("套餐 [plus/team],默认 plus: ").strip().lower() or "plus"
if plan not in {"plus", "team"}:
print("不支持的套餐。")
return
country = input("地区代码,默认 US: ").strip().upper() or "US"
params: dict[str, object] = {"plan": plan, "country": country}
if plan == "team":
workspace_name = input("Workspace 名称,默认 MyTeam: ").strip() or "MyTeam"
price_interval = input("周期 [month/year],默认 month: ").strip().lower() or "month"
seat_quantity_raw = input("席位数,默认 5: ").strip() or "5"
try:
seat_quantity = int(seat_quantity_raw)
except ValueError:
print("席位数必须是整数。")
return
params.update(
{
"workspace_name": workspace_name,
"price_interval": price_interval,
"seat_quantity": seat_quantity,
}
)
account = to_account_model(selected)
print(f"\n使用账号 id={selected.id} email={selected.email}")
refresh_answer = input("先刷新 token? [Y/n]: ").strip().lower()
if refresh_answer in {"", "y", "yes"}:
refresh_result = instance.execute_action("refresh_token", account, {})
print("refresh_result =", refresh_result)
if not refresh_result.get("ok"):
return
data = refresh_result.get("data") or {}
if data.get("access_token"):
account.token = data["access_token"]
account.extra["access_token"] = data["access_token"]
if data.get("refresh_token"):
account.extra["refresh_token"] = data["refresh_token"]
result = instance.execute_action("payment_link", account, params)
print("payment_result =", result)
if result.get("ok"):
url = ((result.get("data") or {}).get("url") or "").strip()
if url:
print("\n支付链接:")
print(url)
if __name__ == "__main__":
main()

143
services/chatgpt_sync.py Normal file
View File

@@ -0,0 +1,143 @@
"""ChatGPT 账号与 CPA 的同步辅助逻辑。"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from sqlmodel import Session
from core.db import AccountModel, engine
CPA_SYNC_NAME = "cpa"
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _utcnow_iso() -> str:
return _utcnow().isoformat()
def _get_account_extra(account: Any) -> dict[str, Any]:
if hasattr(account, "get_extra"):
try:
extra = account.get_extra()
if isinstance(extra, dict):
return extra
except Exception:
pass
extra = getattr(account, "extra", {})
return extra if isinstance(extra, dict) else {}
def get_cpa_sync_state(extra_or_account: Any) -> dict[str, Any]:
extra = extra_or_account if isinstance(extra_or_account, dict) else _get_account_extra(extra_or_account)
sync_statuses = extra.get("sync_statuses", {})
if not isinstance(sync_statuses, dict):
return {}
state = sync_statuses.get(CPA_SYNC_NAME, {})
return state if isinstance(state, dict) else {}
def has_cpa_upload_success(extra_or_account: Any) -> bool:
state = get_cpa_sync_state(extra_or_account)
return bool(state.get("uploaded") or state.get("uploaded_at"))
def record_cpa_sync_result(extra: dict[str, Any], ok: bool, msg: str) -> dict[str, Any]:
sync_statuses = extra.get("sync_statuses")
if not isinstance(sync_statuses, dict):
sync_statuses = {}
state = sync_statuses.get(CPA_SYNC_NAME)
if not isinstance(state, dict):
state = {}
now = _utcnow_iso()
state["last_attempt_ok"] = bool(ok)
state["last_message"] = msg
state["last_attempt_at"] = now
state["uploaded"] = bool(state.get("uploaded")) or bool(ok)
if ok:
state["uploaded_at"] = now
sync_statuses[CPA_SYNC_NAME] = state
extra["sync_statuses"] = sync_statuses
return state
def build_chatgpt_sync_account(account: Any):
extra = _get_account_extra(account)
class _SyncAccount:
pass
obj = _SyncAccount()
obj.email = getattr(account, "email", "")
obj.access_token = extra.get("access_token") or getattr(account, "token", "")
obj.refresh_token = extra.get("refresh_token", "")
obj.id_token = extra.get("id_token", "")
obj.session_token = extra.get("session_token", "")
obj.client_id = extra.get("client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
obj.cookies = extra.get("cookies", "")
return obj
def upload_chatgpt_account_to_cpa(account: Any, api_url: str | None = None, api_key: str | None = None) -> tuple[bool, str]:
try:
sync_account = build_chatgpt_sync_account(account)
if not getattr(sync_account, "access_token", ""):
return False, "账号缺少 access_token"
from platforms.chatgpt.cpa_upload import generate_token_json, upload_to_cpa
token_data = generate_token_json(sync_account)
return upload_to_cpa(token_data, api_url=api_url, api_key=api_key)
except Exception as exc:
return False, f"上传异常: {exc}"
def update_account_model_cpa_sync(
account: AccountModel,
ok: bool,
msg: str,
session: Session | None = None,
commit: bool = True,
) -> dict[str, Any]:
extra = account.get_extra()
state = record_cpa_sync_result(extra, ok, msg)
account.set_extra(extra)
account.updated_at = _utcnow()
if session is not None:
session.add(account)
if commit:
session.commit()
session.refresh(account)
return state
def persist_cpa_sync_result(account: Any, ok: bool, msg: str) -> None:
if isinstance(account, AccountModel) and account.id is not None:
with Session(engine) as session:
row = session.get(AccountModel, account.id)
if row:
update_account_model_cpa_sync(row, ok, msg, session=session, commit=True)
return
extra = getattr(account, "extra", None)
if isinstance(extra, dict):
record_cpa_sync_result(extra, ok, msg)
def upload_account_model_to_cpa(
account: AccountModel,
session: Session | None = None,
api_url: str | None = None,
api_key: str | None = None,
commit: bool = True,
) -> tuple[bool, str]:
ok, msg = upload_chatgpt_account_to_cpa(account, api_url=api_url, api_key=api_key)
update_account_model_cpa_sync(account, ok, msg, session=session, commit=commit)
return ok, msg

View File

@@ -300,6 +300,19 @@ def _conda_exe() -> str | None:
return None
def _uv_exe() -> str | None:
candidate = shutil.which("uv")
if candidate and Path(candidate).exists():
return candidate
return None
def _venv_python(repo: Path) -> Path:
if os.name == "nt":
return repo / ".venv" / "Scripts" / "python.exe"
return repo / ".venv" / "bin" / "python"
def _resolve_kiro_exe() -> str | None:
try:
from core.config_store import config_store
@@ -398,6 +411,34 @@ def _ensure_grok2api_conda_env(repo: Path) -> str:
return env_name
def _ensure_grok2api_uv_env(repo: Path) -> str:
uv = _uv_exe()
if not uv:
raise RuntimeError("未找到 uv无法为 grok2api 自动创建项目虚拟环境")
marker = repo / ".grok2api-env-ready"
if not marker.exists() or marker.read_text(encoding="utf-8").strip() != "uv":
subprocess.run(
[
uv,
"sync",
"--frozen",
"--no-dev",
"--no-install-project",
"--python",
"3.13",
],
cwd=str(repo),
check=True,
creationflags=_creationflags(),
)
marker.write_text("uv", encoding="utf-8")
venv_python = _venv_python(repo)
if not venv_python.exists():
raise RuntimeError("grok2api 的 uv 环境创建失败,未找到 .venv/python")
return str(venv_python)
def _ensure_cliproxyapi_runtime_config(repo: Path):
config_path = repo / "config.local.yaml"
if not config_path.exists():
@@ -479,15 +520,32 @@ def _build_command(name: str) -> tuple[list[str], Path]:
if name == "grok2api":
_ensure_grok2api_runtime_config(repo)
env_name = _ensure_grok2api_conda_env(repo)
conda = _conda_exe()
if conda:
env_name = _ensure_grok2api_conda_env(repo)
return [
conda,
"run",
"--no-capture-output",
"-n",
env_name,
"python",
"-m",
"granian",
"--interface",
"asgi",
"--host",
"127.0.0.1",
"--port",
"8011",
"--workers",
"1",
"main:app",
], repo
python_exe = _ensure_grok2api_uv_env(repo)
return [
conda,
"run",
"--no-capture-output",
"-n",
env_name,
"python",
python_exe,
"-m",
"granian",
"--interface",

View File

@@ -4,6 +4,8 @@ from __future__ import annotations
from typing import Any
from services.chatgpt_sync import persist_cpa_sync_result, upload_chatgpt_account_to_cpa
def sync_account(account) -> list[dict[str, Any]]:
"""根据平台将账号同步到外部系统。"""
@@ -15,19 +17,8 @@ def sync_account(account) -> list[dict[str, Any]]:
if platform == "chatgpt":
cpa_url = config_store.get("cpa_api_url", "")
if cpa_url:
from platforms.chatgpt.cpa_upload import generate_token_json, upload_to_cpa
class _A:
pass
a = _A()
a.email = account.email
extra = account.extra or {}
a.access_token = extra.get("access_token") or account.token
a.refresh_token = extra.get("refresh_token", "")
a.id_token = extra.get("id_token", "")
ok, msg = upload_to_cpa(generate_token_json(a))
ok, msg = upload_chatgpt_account_to_cpa(account)
persist_cpa_sync_result(account, ok, msg)
results.append({"name": "CPA", "ok": ok, "msg": msg})
elif platform == "grok":

View File

@@ -6,8 +6,9 @@ import time
import threading
import requests
SOLVER_PORT = 8889
SOLVER_PORT = int(os.getenv("SOLVER_PORT", "8889"))
SOLVER_URL = f"http://localhost:{SOLVER_PORT}"
SOLVER_BROWSER_TYPE = os.getenv("SOLVER_BROWSER_TYPE", "camoufox").strip() or "camoufox"
_proc: subprocess.Popen = None
_log_file = None
_lock = threading.Lock()
@@ -40,7 +41,7 @@ def start():
"-u",
solver_script,
"--browser_type",
"camoufox",
SOLVER_BROWSER_TYPE,
"--port",
str(SOLVER_PORT),
],

1122
smstome_tool.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,79 @@
import unittest
from unittest.mock import patch
from core.base_mailbox import MailboxAccount, create_mailbox
class CFWorkerMailboxTests(unittest.TestCase):
def _build_mailbox(self):
return create_mailbox(
"cfworker",
extra={
"cfworker_api_url": "https://example.invalid",
"cfworker_admin_token": "admin-token",
"cfworker_domain": "mail.example",
},
)
@patch("requests.request")
def test_get_email_issues_single_request_via_factory_mailbox(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.text = '{"email":"user@mail.example","token":"token-123"}'
mock_request.return_value.json.return_value = {
"email": "user@mail.example",
"token": "token-123",
}
mailbox = self._build_mailbox()
account = mailbox.get_email()
self.assertEqual(account.email, "user@mail.example")
self.assertEqual(account.account_id, "token-123")
mock_request.assert_called_once_with(
"POST",
"https://example.invalid/admin/new_address",
params=None,
json={"enablePrefix": True, "name": unittest.mock.ANY, "domain": "mail.example"},
headers={
"accept": "application/json, text/plain, */*",
"content-type": "application/json",
"x-admin-auth": "admin-token",
},
proxies=None,
timeout=15,
)
@patch("requests.request")
def test_get_current_ids_issues_single_request_via_factory_mailbox(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.text = '{"results":[{"id":101},{"id":202}]}'
mock_request.return_value.json.return_value = {
"results": [
{"id": 101},
{"id": 202},
]
}
mailbox = self._build_mailbox()
account = MailboxAccount(email="user@mail.example")
ids = mailbox.get_current_ids(account)
self.assertEqual(ids, {"101", "202"})
mock_request.assert_called_once_with(
"GET",
"https://example.invalid/admin/mails",
params={"limit": 20, "offset": 0, "address": "user@mail.example"},
json=None,
headers={
"accept": "application/json, text/plain, */*",
"content-type": "application/json",
"x-admin-auth": "admin-token",
},
proxies=None,
timeout=10,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,203 @@
import base64
import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from platforms.chatgpt.oauth_client import OAuthClient
from platforms.chatgpt.phone_service import SMSToMePhoneService
from platforms.chatgpt.utils import FlowState
from smstome_tool import PhoneEntry, parse_country_slugs
class OAuthCookieDecodeTests(unittest.TestCase):
def test_decode_signed_cookie_payload(self):
payload = {
"email": "demo@example.com",
"phone_number": "+447456344799",
"phone_verification_channel": "whatsapp",
}
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8").rstrip("=")
cookie_value = f"{encoded}.sig-a.sig-b"
self.assertEqual(OAuthClient._decode_cookie_json_value(cookie_value), payload)
def test_decode_invalid_cookie_payload(self):
self.assertIsNone(OAuthClient._decode_cookie_json_value("not-a-valid-cookie"))
class SMSToMeConfigTests(unittest.TestCase):
def test_parse_country_slugs_accepts_csv_and_iterables(self):
self.assertEqual(
parse_country_slugs("united-kingdom, poland;finland"),
["united-kingdom", "poland", "finland"],
)
self.assertEqual(
parse_country_slugs(["united-kingdom", "poland", "united_kingdom"]),
["united-kingdom", "poland"],
)
def test_phone_service_enabled_when_pool_file_exists(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
pool_path.write_text("+447456344799\tunited-kingdom\thttps://example.com\n", encoding="utf-8")
service = SMSToMePhoneService({"smstome_global_file": str(pool_path)})
self.assertTrue(service.enabled)
def test_phone_service_disabled_for_empty_pool_without_cookie(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
pool_path.write_text("", encoding="utf-8")
service = SMSToMePhoneService({"smstome_global_file": str(pool_path)})
self.assertFalse(service.enabled)
def test_wait_for_code_forwards_cookie_timeout_and_poll_interval(self):
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447456344799",
detail_url="https://example.com/phone/1",
)
service = SMSToMePhoneService(
{
"smstome_cookie": "cf_clearance=demo",
"smstome_otp_timeout_seconds": "66",
"smstome_poll_interval_seconds": "7",
}
)
with mock.patch("platforms.chatgpt.phone_service.wait_for_otp", return_value="123456") as mocked:
code = service.wait_for_code(entry)
self.assertEqual(code, "123456")
mocked.assert_called_once()
kwargs = mocked.call_args.kwargs
self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo")
self.assertEqual(kwargs["timeout"], 66)
self.assertEqual(kwargs["poll_interval"], 7)
self.assertFalse(kwargs["raise_on_timeout"])
def test_ensure_pool_ready_syncs_with_configured_page_limit(self):
with tempfile.TemporaryDirectory() as tmp_dir:
pool_path = Path(tmp_dir) / "phones.txt"
service = SMSToMePhoneService(
{
"smstome_cookie": "cf_clearance=demo",
"smstome_country_slugs": "united-kingdom",
"smstome_global_file": str(pool_path),
"smstome_sync_max_pages_per_country": "9",
}
)
with mock.patch("platforms.chatgpt.phone_service.update_global_phone_list", return_value=3) as mocked:
service.ensure_pool_ready()
mocked.assert_called_once()
kwargs = mocked.call_args.kwargs
self.assertEqual(kwargs["cookie_header"], "cf_clearance=demo")
self.assertEqual(kwargs["countries"], ["united-kingdom"])
self.assertEqual(kwargs["output_path"], pool_path)
self.assertEqual(kwargs["max_pages_per_country"], 9)
class OAuthPhoneBlacklistTests(unittest.TestCase):
def test_should_blacklist_explicit_phone_rejection(self):
state = FlowState(
page_type="add_phone",
payload={"error": {"message": "phone number is invalid"}},
)
self.assertTrue(
OAuthClient._should_blacklist_phone_failure(
"add-phone/send 失败: 400 - phone number is invalid",
state,
)
)
def test_should_not_blacklist_whatsapp_or_delivery_failures(self):
self.assertFalse(
OAuthClient._should_blacklist_phone_failure(
"add_phone 已切到 whatsapp 通道,当前 SMSToMe 仅支持短信接码"
)
)
self.assertFalse(
OAuthClient._should_blacklist_phone_failure("手机号 +447000000001 未收到短信验证码")
)
def test_handle_add_phone_blacklists_explicitly_rejected_number(self):
client = OAuthClient(config={}, verbose=False)
client._log = lambda _msg: None
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447000000001",
detail_url="https://example.com/phone/1",
)
phone_service = mock.Mock()
phone_service.enabled = True
phone_service.max_attempts = 1
phone_service.acquire_phone.return_value = entry
phone_service.prefix_hint.return_value = "+447000"
with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service):
with mock.patch.object(
client,
"_send_phone_number",
return_value=(False, None, "add-phone/send 失败: 400 - phone number is invalid"),
):
state = client._handle_add_phone_verification(
"device-id",
"Mozilla/5.0",
None,
None,
FlowState(page_type="add_phone"),
)
self.assertIsNone(state)
phone_service.mark_blacklisted.assert_called_once_with(entry.phone)
self.assertIn("add_phone 阶段失败", client.last_error)
def test_handle_add_phone_does_not_blacklist_whatsapp_channel(self):
client = OAuthClient(config={}, verbose=False)
client._log = lambda _msg: None
entry = PhoneEntry(
country_slug="united-kingdom",
phone="+447000000002",
detail_url="https://example.com/phone/2",
)
phone_service = mock.Mock()
phone_service.enabled = True
phone_service.max_attempts = 1
phone_service.acquire_phone.return_value = entry
phone_service.prefix_hint.return_value = "+447000"
next_state = FlowState(
page_type="phone_otp_verification",
continue_url="https://auth.openai.com/phone-verification",
)
with mock.patch("platforms.chatgpt.oauth_client.SMSToMePhoneService", return_value=phone_service):
with mock.patch.object(client, "_send_phone_number", return_value=(True, next_state, "")):
with mock.patch.object(
client,
"_decode_oauth_session_cookie",
return_value={
"phone_verification_channel": "whatsapp",
"phone_number": entry.phone,
},
):
state = client._handle_add_phone_verification(
"device-id",
"Mozilla/5.0",
None,
None,
FlowState(page_type="add_phone"),
)
self.assertIsNone(state)
phone_service.mark_blacklisted.assert_not_called()
self.assertIn("whatsapp", client.last_error)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,72 @@
import tempfile
import unittest
from pathlib import Path
from core.config_store import (
_canonical_config_key,
_get_env_fallback_value,
_load_env_file,
_merge_env_fallback,
_normalize_config_value,
)
class ConfigStoreEnvFallbackTests(unittest.TestCase):
def test_normalize_config_value_strips_matching_quotes(self):
self.assertEqual(_normalize_config_value('"quoted"'), "quoted")
self.assertEqual(_normalize_config_value("'quoted'"), "quoted")
self.assertEqual(_normalize_config_value("plain"), "plain")
def test_load_env_file_supports_export_and_quotes(self):
with tempfile.TemporaryDirectory() as tmp_dir:
env_path = Path(tmp_dir) / ".env"
env_path.write_text(
"\n".join(
[
"# comment",
"export SMSTOME_COOKIE='cf_clearance=demo'",
'cfworker_custom_auth="secret-pass"',
]
),
encoding="utf-8",
)
values = _load_env_file(env_path)
self.assertEqual(values["SMSTOME_COOKIE"], "cf_clearance=demo")
self.assertEqual(values["cfworker_custom_auth"], "secret-pass")
def test_get_env_fallback_value_matches_uppercase_env_names(self):
env_values = {
"SMSTOME_COOKIE": "cf_clearance=demo",
"CFWORKER_CUSTOM_AUTH": "secret-pass",
}
self.assertEqual(
_get_env_fallback_value("smstome_cookie", env_values=env_values),
"cf_clearance=demo",
)
self.assertEqual(
_get_env_fallback_value("cfworker_custom_auth", env_values=env_values),
"secret-pass",
)
def test_merge_env_fallback_uses_canonical_key_without_overriding_db(self):
merged = _merge_env_fallback(
{
"smstome_cookie": "",
"cfworker_custom_auth": "db-value",
},
env_values={
"SMSTOME_COOKIE": "cf_clearance=demo",
"CFWORKER_CUSTOM_AUTH": "env-value",
},
)
self.assertEqual(_canonical_config_key("SMSTOME_COOKIE"), "smstome_cookie")
self.assertEqual(merged["smstome_cookie"], "cf_clearance=demo")
self.assertEqual(merged["cfworker_custom_auth"], "db-value")
if __name__ == "__main__":
unittest.main()