Merge branch 'feature/dev' into main

This commit is contained in:
zhangchen
2026-04-10 01:03:15 +08:00
24 changed files with 1773 additions and 261 deletions

View File

@@ -6,6 +6,8 @@ from services.mail_imports import MailImportExecuteRequest, MailImportSnapshotRe
router = APIRouter(prefix="/config", tags=["config"])
CONFIG_KEYS = [
"email_domain_rule_enabled",
"email_domain_level_count",
"laoudo_auth",
"laoudo_email",
"laoudo_account_id",
@@ -100,6 +102,9 @@ CONFIG_KEYS = [
"contribution_enabled",
"contribution_server_url",
"contribution_key",
"contribution_mode",
"custom_contribution_url",
"custom_contribution_token",
]
@@ -137,8 +142,16 @@ def get_config():
all_cfg["contribution_enabled"] = "0"
if not all_cfg.get("contribution_server_url"):
all_cfg["contribution_server_url"] = "http://new.xem8k5.top:7317/"
if not all_cfg.get("contribution_mode"):
all_cfg["contribution_mode"] = "codex"
if not all_cfg.get("custom_contribution_url"):
all_cfg["custom_contribution_url"] = "http://127.0.0.1:5000"
if not all_cfg.get("external_apps_update_mode"):
all_cfg["external_apps_update_mode"] = "tag"
if not str(all_cfg.get("email_domain_rule_enabled", "") or "").strip():
all_cfg["email_domain_rule_enabled"] = "0"
if not str(all_cfg.get("email_domain_level_count", "") or "").strip():
all_cfg["email_domain_level_count"] = "2"
# 只返回已知 key未设置的返回空字符串
return {k: all_cfg.get(k, "") for k in CONFIG_KEYS}
@@ -149,6 +162,19 @@ def update_config(body: ConfigUpdate):
safe = {k: v for k, v in body.data.items() if k in CONFIG_KEYS}
if safe.get("mail_provider") == "outlook":
safe["mail_provider"] = "microsoft"
if "email_domain_rule_enabled" in safe:
enabled = str(safe.get("email_domain_rule_enabled", "")).strip().lower()
safe["email_domain_rule_enabled"] = (
"1" if enabled in {"1", "true", "yes", "on"} else "0"
)
if "email_domain_level_count" in safe:
try:
level_count = int(str(safe.get("email_domain_level_count", "")).strip())
except (TypeError, ValueError) as exc:
raise HTTPException(status_code=400, detail="域名级数必须是整数") from exc
if level_count < 2:
raise HTTPException(status_code=400, detail="域名级数不能小于 2")
safe["email_domain_level_count"] = str(level_count)
config_store.set_many(safe)
return {"ok": True, "updated": list(safe.keys())}

View File

@@ -18,6 +18,10 @@ class ProxyBulkCreate(BaseModel):
region: str = ""
class ProxyBatchDelete(BaseModel):
ids: list[int]
@router.get("")
def list_proxies(session: Session = Depends(get_session)):
items = session.exec(select(ProxyModel)).all()
@@ -61,6 +65,27 @@ def delete_proxy(proxy_id: int, session: Session = Depends(get_session)):
return {"ok": True}
@router.post("/batch-delete")
def batch_delete_proxies(body: ProxyBatchDelete, session: Session = Depends(get_session)):
if not body.ids:
raise HTTPException(400, "代理 ID 列表不能为空")
ids = list(dict.fromkeys(int(i) for i in body.ids))
if len(ids) > 1000:
raise HTTPException(400, "单次最多删除 1000 条代理")
proxies = session.exec(select(ProxyModel).where(ProxyModel.id.in_(ids))).all()
found_ids = {p.id for p in proxies if p.id is not None}
for p in proxies:
session.delete(p)
session.commit()
return {
"deleted": len(found_ids),
"not_found": [pid for pid in ids if pid not in found_ids],
"total_requested": len(ids),
}
@router.patch("/{proxy_id}/toggle")
def toggle_proxy(proxy_id: int, session: Session = Depends(get_session)):
p = session.get(ProxyModel, proxy_id)

View File

@@ -4,7 +4,8 @@ from pydantic import BaseModel, Field
from sqlmodel import Session, select
from typing import Optional
from copy import deepcopy
from core.db import TaskLog, engine
from datetime import datetime, timezone
from core.db import TaskLog, TaskRunModel, engine
from core.task_runtime import (
AttemptOutcome,
AttemptResult,
@@ -42,18 +43,229 @@ class TaskLogBatchDeleteRequest(BaseModel):
ids: list[int]
def _ensure_task_exists(task_id: str) -> None:
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _json_dumps(value, fallback):
try:
return json.dumps(value, ensure_ascii=False)
except Exception:
return json.dumps(fallback, ensure_ascii=False)
def _json_loads(raw: str, fallback):
try:
return json.loads(raw or "")
except Exception:
return fallback
def _to_epoch_seconds(value) -> float:
if isinstance(value, datetime):
return value.timestamp()
try:
return float(value or 0)
except Exception:
return 0.0
def _to_datetime(value) -> datetime:
try:
ts = float(value or 0)
if ts > 1_000_000_000_000:
ts /= 1000
if ts <= 0:
return _utcnow()
return datetime.fromtimestamp(ts, tz=timezone.utc)
except Exception:
return _utcnow()
def _normalize_snapshot(snapshot: dict) -> dict:
return {
"id": str(snapshot.get("id") or ""),
"status": str(snapshot.get("status") or "pending"),
"platform": str(snapshot.get("platform") or ""),
"source": str(snapshot.get("source") or "manual"),
"meta": snapshot.get("meta") if isinstance(snapshot.get("meta"), dict) else {},
"total": int(snapshot.get("total") or 0),
"progress": str(snapshot.get("progress") or "0/0"),
"logs": snapshot.get("logs") if isinstance(snapshot.get("logs"), list) else [],
"success": int(snapshot.get("success") or 0),
"registered": int(snapshot.get("registered") or 0),
"skipped": int(snapshot.get("skipped") or 0),
"errors": snapshot.get("errors") if isinstance(snapshot.get("errors"), list) else [],
"control": snapshot.get("control") if isinstance(snapshot.get("control"), dict) else {},
"cashier_urls": snapshot.get("cashier_urls") if isinstance(snapshot.get("cashier_urls"), list) else [],
"error": str(snapshot.get("error") or ""),
"created_at": _to_epoch_seconds(snapshot.get("created_at")),
"updated_at": _to_epoch_seconds(snapshot.get("updated_at")),
}
def _task_run_to_snapshot(row: TaskRunModel) -> dict:
return _normalize_snapshot(
{
"id": row.id,
"status": row.status,
"platform": row.platform,
"source": row.source,
"meta": _json_loads(row.meta_json, {}),
"total": row.total,
"progress": row.progress,
"logs": _json_loads(row.logs_json, []),
"success": row.success,
"registered": row.registered,
"skipped": row.skipped,
"errors": _json_loads(row.errors_json, []),
"control": _json_loads(row.control_json, {}),
"cashier_urls": _json_loads(row.cashier_urls_json, []),
"error": row.error,
"created_at": row.created_at,
"updated_at": row.updated_at,
}
)
def _upsert_task_run(snapshot: dict) -> None:
normalized = _normalize_snapshot(snapshot)
if not normalized["id"]:
return
with Session(engine) as s:
row = s.get(TaskRunModel, normalized["id"])
if row is None:
row = TaskRunModel(
id=normalized["id"],
platform=normalized["platform"],
source=normalized["source"],
status=normalized["status"],
total=normalized["total"],
progress=normalized["progress"],
success=normalized["success"],
registered=normalized["registered"],
skipped=normalized["skipped"],
error=normalized["error"],
meta_json=_json_dumps(normalized["meta"], {}),
logs_json=_json_dumps(normalized["logs"], []),
errors_json=_json_dumps(normalized["errors"], []),
cashier_urls_json=_json_dumps(normalized["cashier_urls"], []),
control_json=_json_dumps(normalized["control"], {}),
created_at=_to_datetime(normalized["created_at"]),
updated_at=_to_datetime(normalized["updated_at"]),
)
s.add(row)
else:
row.platform = normalized["platform"]
row.source = normalized["source"]
row.status = normalized["status"]
row.total = normalized["total"]
row.progress = normalized["progress"]
row.success = normalized["success"]
row.registered = normalized["registered"]
row.skipped = normalized["skipped"]
row.error = normalized["error"]
row.meta_json = _json_dumps(normalized["meta"], {})
row.logs_json = _json_dumps(normalized["logs"], [])
row.errors_json = _json_dumps(normalized["errors"], [])
row.cashier_urls_json = _json_dumps(normalized["cashier_urls"], [])
row.control_json = _json_dumps(normalized["control"], {})
if row.created_at is None:
row.created_at = _to_datetime(normalized["created_at"])
row.updated_at = _to_datetime(normalized["updated_at"])
s.add(row)
s.commit()
def _persist_task_snapshot(task_id: str) -> None:
if not _task_store.exists(task_id):
return
try:
snapshot = _task_store.snapshot(task_id)
except Exception:
return
_upsert_task_run(snapshot)
def _get_persisted_task(task_id: str) -> Optional[dict]:
with Session(engine) as s:
row = s.get(TaskRunModel, task_id)
if row is None:
return None
return _task_run_to_snapshot(row)
def _list_persisted_tasks() -> list[dict]:
with Session(engine) as s:
rows = s.exec(select(TaskRunModel)).all()
snapshots = [_task_run_to_snapshot(row) for row in rows]
snapshots.sort(
key=lambda item: (
{"running": 0, "pending": 1, "done": 2, "failed": 3, "stopped": 4}.get(
str(item.get("status") or ""),
9,
),
-_to_epoch_seconds(item.get("created_at")),
)
)
return snapshots
def _finalize_orphan_tasks() -> None:
with Session(engine) as s:
rows = s.exec(
select(TaskRunModel).where(TaskRunModel.status.in_(["pending", "running"]))
).all()
if not rows:
return
changed = False
for row in rows:
if _task_store.exists(row.id):
continue
row.status = "stopped"
row.error = row.error or "任务因服务重启中断"
logs = _json_loads(row.logs_json, [])
tip = "[SYSTEM] 任务因服务重启中断,已自动标记为已停止"
if tip not in logs:
ts = datetime.now().strftime("%H:%M:%S")
logs.append(f"[{ts}] {tip}")
row.logs_json = _json_dumps(logs, [])
row.updated_at = _utcnow()
s.add(row)
changed = True
if changed:
s.commit()
def _ensure_task_exists(task_id: str) -> None:
if _task_store.exists(task_id):
return
if _get_persisted_task(task_id) is None:
raise HTTPException(404, "任务不存在")
def _ensure_task_mutable(task_id: str) -> None:
_ensure_task_exists(task_id)
snapshot = _task_store.snapshot(task_id)
if _task_store.exists(task_id):
snapshot = _task_store.snapshot(task_id)
else:
snapshot = _get_persisted_task(task_id) or {}
if snapshot.get("status") in {"done", "failed", "stopped"}:
raise HTTPException(409, "任务已结束,无法再执行控制操作")
def _get_task_snapshot(task_id: str) -> dict:
_ensure_task_exists(task_id)
if _task_store.exists(task_id):
_persist_task_snapshot(task_id)
snapshot = _get_persisted_task(task_id)
if snapshot is None and _task_store.exists(task_id):
snapshot = _normalize_snapshot(_task_store.snapshot(task_id))
if snapshot is None:
raise HTTPException(404, "任务不存在")
return snapshot
def _prepare_register_request(req: RegisterTaskRequest) -> RegisterTaskRequest:
from core.config_store import config_store
@@ -91,6 +303,7 @@ def _create_task_record(
source=source,
meta=meta,
)
_persist_task_snapshot(task_id)
def enqueue_register_task(
@@ -124,37 +337,42 @@ def _log(task_id: str, msg: str):
ts = time.strftime("%H:%M:%S")
entry = f"[{ts}] {msg}"
_task_store.append_log(task_id, entry)
_persist_task_snapshot(task_id)
print(entry)
def _save_task_log(
platform: str, email: str, status: str, error: str = "", detail: dict = None
):
"""Write a TaskLog record to the database."""
with Session(engine) as s:
log = TaskLog(
platform=platform,
email=email,
status=status,
error=error,
detail_json=json.dumps(detail or {}, ensure_ascii=False),
)
s.add(log)
s.commit()
"""Write a TaskLog record to the database (fire-and-forget, non-blocking)."""
def _write():
with Session(engine) as s:
log = TaskLog(
platform=platform,
email=email,
status=status,
error=error,
detail_json=json.dumps(detail or {}, ensure_ascii=False),
)
s.add(log)
s.commit()
threading.Thread(target=_write, daemon=True).start()
def _auto_upload_integrations(task_id: str, account):
"""注册成功后自动导入外部系统。"""
try:
from services.external_sync import sync_account
"""注册成功后自动导入外部系统(后台线程,不阻塞注册流程)"""
def _run():
try:
from services.external_sync import sync_account
for result in sync_account(account):
name = result.get("name", "Auto Upload")
ok = bool(result.get("ok"))
msg = result.get("msg", "")
_log(task_id, f" [{name}] {'[OK] ' + msg if ok else '[FAIL] ' + msg}")
except Exception as e:
_log(task_id, f" [Auto Upload] 自动导入异常: {e}")
for result in sync_account(account):
name = result.get("name", "Auto Upload")
ok = bool(result.get("ok"))
msg = result.get("msg", "")
_log(task_id, f" [{name}] {'[OK] ' + msg if ok else '[FAIL] ' + msg}")
except Exception as e:
_log(task_id, f" [Auto Upload] 自动导入异常: {e}")
threading.Thread(target=_run, daemon=True).start()
def _run_register(task_id: str, req: RegisterTaskRequest):
@@ -166,6 +384,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
control = _task_store.control_for(task_id)
_task_store.mark_running(task_id)
_persist_task_snapshot(task_id)
success = 0
skipped = 0
errors = []
@@ -187,35 +406,53 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
try:
PlatformCls = get(req.platform)
def _build_mailbox(proxy: Optional[str]):
from core.config_store import config_store
# 预先计算 merged_extra所有线程共享只读副本避免每线程重复调用 config_store
from core.config_store import config_store as _cs
_base_extra = _cs.get_all().copy()
_base_extra.update(
{k: v for k, v in req.extra.items() if v is not None and v != ""}
)
merged_extra = config_store.get_all().copy()
merged_extra.update(
{k: v for k, v in req.extra.items() if v is not None and v != ""}
)
# 批量预取代理(无固定代理时),减少每线程单独查 DB
from core.proxy_pool import proxy_pool as _proxy_pool
_prefetched_proxies: list[str] = []
_prefetch_lock = threading.Lock()
if not req.proxy and req.count > 1:
with Session(engine) as _s:
from core.db import ProxyModel
from sqlmodel import select as _sel
_active = _s.exec(
_sel(ProxyModel).where(ProxyModel.is_active == True)
).all()
_prefetched_proxies = [p.url for p in _active if p.url]
def _get_proxy() -> Optional[str]:
if req.proxy:
return req.proxy
if _prefetched_proxies:
with _prefetch_lock:
if _prefetched_proxies:
import random
return random.choice(_prefetched_proxies)
return _proxy_pool.get_next()
def _build_mailbox(proxy: Optional[str]):
return create_mailbox(
provider=merged_extra.get("mail_provider", "luckmail"),
extra=merged_extra,
provider=_base_extra.get("mail_provider", "luckmail"),
extra=_base_extra,
proxy=proxy,
)
def _do_one(i: int):
nonlocal next_start_time
proxy_pool = None
_proxy = None
current_email = req.email or ""
attempt_id: int | None = None
try:
from core.proxy_pool import proxy_pool
control.checkpoint()
attempt_id = control.start_attempt()
control.checkpoint(attempt_id=attempt_id)
_proxy = req.proxy
if not _proxy:
_proxy = proxy_pool.get_next()
_proxy = normalize_proxy_url(_proxy)
_proxy = normalize_proxy_url(_get_proxy())
if req.register_delay_seconds > 0:
with start_gate_lock:
control.checkpoint(attempt_id=attempt_id)
@@ -232,12 +469,8 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
)
next_start_time = time.time() + req.register_delay_seconds
control.checkpoint(attempt_id=attempt_id)
from core.config_store import config_store
merged_extra = config_store.get_all().copy()
merged_extra.update(
{k: v for k, v in req.extra.items() if v is not None and v != ""}
)
merged_extra = _base_extra
_config = RegisterConfig(
executor_type=req.executor_type,
@@ -254,6 +487,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
_platform.mailbox._task_attempt_token = attempt_id
_platform.mailbox._log_fn = _platform._log_fn
_task_store.set_progress(task_id, f"{i + 1}/{req.count}")
_persist_task_snapshot(task_id)
_log(task_id, f"开始注册第 {i + 1}/{req.count} 个账号")
if _proxy:
_log(task_id, f"使用代理: {_proxy}")
@@ -262,6 +496,20 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
password=req.password,
)
current_email = account.email or current_email
if str(merged_extra.get("mail_provider", "")).strip() == "cfworker":
from core.email_domain_policy import validate_email_domain_policy
validate_email_domain_policy(
account.email,
{
"email_domain_rule_enabled": merged_extra.get(
"email_domain_rule_enabled", "0"
),
"email_domain_level_count": merged_extra.get(
"email_domain_level_count", "2"
),
},
)
if isinstance(account.extra, dict):
mail_provider = merged_extra.get("mail_provider", "")
if mail_provider:
@@ -291,7 +539,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
)
saved_account = save_account(account)
if _proxy:
proxy_pool.report_success(_proxy)
_proxy_pool.report_success(_proxy)
_log(task_id, f"[OK] 注册成功: {account.email}")
_save_task_log(req.platform, account.email, "success")
_auto_upload_integrations(task_id, saved_account or account)
@@ -299,6 +547,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
if cashier_url:
_log(task_id, f" [升级链接] {cashier_url}")
_task_store.add_cashier_url(task_id, cashier_url)
_persist_task_snapshot(task_id)
return AttemptResult.success()
except SkipCurrentAttemptRequested as e:
_log(task_id, f"[SKIP] 已跳过当前账号: {e}")
@@ -313,8 +562,8 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
_log(task_id, f"[STOP] {e}")
return AttemptResult.stopped(str(e))
except Exception as e:
if _proxy and proxy_pool is not None:
proxy_pool.report_fail(_proxy)
if _proxy:
_proxy_pool.report_fail(_proxy)
_log(task_id, f"[FAIL] 注册失败: {e}")
_save_task_log(
req.platform,
@@ -328,7 +577,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
from concurrent.futures import CancelledError, ThreadPoolExecutor, as_completed
max_workers = min(req.concurrency, req.count, 5)
max_workers = min(req.concurrency, req.count)
stopped = False
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = [pool.submit(_do_one, i) for i in range(req.count)]
@@ -354,6 +603,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
success=success,
registered=success + skipped + len(errors),
)
_persist_task_snapshot(task_id)
if stopped or control.is_stop_requested():
stopped = True
for pending in futures:
@@ -370,6 +620,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
errors=errors,
error=str(e),
)
_persist_task_snapshot(task_id)
_task_store.cleanup()
return
@@ -389,6 +640,7 @@ def _run_register(task_id: str, req: RegisterTaskRequest):
skipped=skipped,
errors=errors,
)
_persist_task_snapshot(task_id)
_task_store.cleanup()
@@ -403,7 +655,10 @@ def create_register_task(
@router.post("/{task_id}/skip-current")
def skip_current_account(task_id: str):
_finalize_orphan_tasks()
_ensure_task_mutable(task_id)
if not _task_store.exists(task_id):
raise HTTPException(409, "任务已结束或服务已重启,无法跳过当前账号")
control = _task_store.request_skip_current(task_id)
_log(task_id, "收到手动跳过当前账号请求")
return {"ok": True, "task_id": task_id, "control": control}
@@ -411,7 +666,10 @@ def skip_current_account(task_id: str):
@router.post("/{task_id}/stop")
def stop_task(task_id: str):
_finalize_orphan_tasks()
_ensure_task_mutable(task_id)
if not _task_store.exists(task_id):
raise HTTPException(409, "任务已结束或服务已重启,无法停止")
control = _task_store.request_stop(task_id)
_log(task_id, "收到手动停止任务请求")
return {"ok": True, "task_id": task_id, "control": control}
@@ -465,13 +723,21 @@ def batch_delete_logs(body: TaskLogBatchDeleteRequest):
@router.get("/{task_id}/logs/stream")
async def stream_logs(task_id: str, since: int = 0):
"""SSE 实时日志流"""
_finalize_orphan_tasks()
_ensure_task_exists(task_id)
async def event_generator():
sent = since
use_memory = _task_store.exists(task_id)
while True:
logs, status = _task_store.log_state(task_id)
snapshot = _task_store.snapshot(task_id)
if use_memory:
logs, status = _task_store.log_state(task_id)
snapshot = _task_store.snapshot(task_id)
_persist_task_snapshot(task_id)
else:
snapshot = _get_persisted_task(task_id) or {}
logs = snapshot.get("logs") or []
status = snapshot.get("status") or "failed"
counters = {
"success": int(snapshot.get("success") or 0),
"registered": int(snapshot.get("registered") or 0),
@@ -483,6 +749,10 @@ async def stream_logs(task_id: str, since: int = 0):
if status in ("done", "failed", "stopped"):
yield f"data: {json.dumps({'done': True, 'status': status, **counters})}\n\n"
break
if not use_memory:
# 非内存任务仅提供持久化快照,不进入无限轮询
yield f"data: {json.dumps({'done': True, 'status': 'stopped', **counters})}\n\n"
break
await asyncio.sleep(0.5)
return StreamingResponse(
@@ -497,10 +767,27 @@ async def stream_logs(task_id: str, since: int = 0):
@router.get("/{task_id}")
def get_task(task_id: str):
_ensure_task_exists(task_id)
return _task_store.snapshot(task_id)
_finalize_orphan_tasks()
return _get_task_snapshot(task_id)
@router.get("")
def list_tasks():
return _task_store.list_snapshots()
_finalize_orphan_tasks()
# 以 DB 为主返回,避免进程重启导致列表丢失
return _list_persisted_tasks()
@router.delete("/{task_id}")
def delete_task(task_id: str):
_finalize_orphan_tasks()
snapshot = _get_task_snapshot(task_id)
status = str(snapshot.get("status") or "")
if status in {"pending", "running"}:
raise HTTPException(409, "运行中的任务不允许删除,请先停止任务")
with Session(engine) as s:
row = s.get(TaskRunModel, task_id)
if row is not None:
s.delete(row)
s.commit()
return {"ok": True, "task_id": task_id}

View File

@@ -315,6 +315,7 @@ def create_mailbox(
domains=extra.get("cfworker_domains", ""),
enabled_domains=extra.get("cfworker_enabled_domains", ""),
subdomain=extra.get("cfworker_subdomain", ""),
domain_level_count=extra.get("email_domain_level_count", 2),
random_subdomain=extra.get("cfworker_random_subdomain", False),
random_name_subdomain=extra.get("cfworker_random_name_subdomain", False),
fingerprint=extra.get("cfworker_fingerprint", ""),
@@ -2284,6 +2285,7 @@ class CFWorkerMailbox(BaseMailbox):
domains: Any = None,
enabled_domains: Any = None,
subdomain: str = "",
domain_level_count: Any = 2,
random_subdomain: Any = False,
random_name_subdomain: Any = False,
fingerprint: str = "",
@@ -2302,6 +2304,7 @@ class CFWorkerMailbox(BaseMailbox):
else:
self.enabled_domains = raw_enabled_domains
self.subdomain = self._normalize_subdomain(subdomain)
self.domain_level_count = self._parse_domain_level_count(domain_level_count)
self.random_subdomain = self._to_bool(random_subdomain)
self.random_name_subdomain = self._to_bool(random_name_subdomain)
self.fingerprint = fingerprint
@@ -2405,6 +2408,14 @@ class CFWorkerMailbox(BaseMailbox):
text = str(value or "").strip().lower()
return text in {"1", "true", "yes", "on"}
@staticmethod
def _parse_domain_level_count(value: Any) -> int:
try:
parsed = int(str(value or "").strip() or "2")
except (TypeError, ValueError):
return 2
return parsed if parsed >= 2 else 2
@classmethod
def _parse_domains(cls, value: Any) -> list[str]:
if not value:
@@ -2473,6 +2484,13 @@ class CFWorkerMailbox(BaseMailbox):
if self.subdomain:
sub_parts.append(self.subdomain)
base_level_count = len([part for part in domain.split(".") if part])
expected_total_levels = max(self.domain_level_count, 2)
missing_levels = max(expected_total_levels - (base_level_count + len(sub_parts)), 0)
if missing_levels > 0:
fillers = [self._generate_subdomain_label() for _ in range(missing_levels)]
sub_parts = fillers + sub_parts
if not sub_parts:
return domain
return f"{'.'.join(sub_parts)}.{domain}"
@@ -3256,14 +3274,43 @@ class OutlookGraphMailboxBackend(OutlookMailboxBackend):
)
seen: set[str] = set()
for folder in self.mailbox._graph_folder_names:
messages = self.mailbox._graph_list_messages(
access_token=access_token,
folder=folder,
)
for message in messages:
message_id = str(message.get("id") or "").strip()
if message_id:
seen.add(f"{folder}:{message_id}")
try:
messages = self.mailbox._graph_list_messages(
access_token=access_token,
folder=folder,
)
for message in messages:
message_id = str(message.get("id") or "").strip()
if message_id:
seen.add(f"{folder}:{message_id}")
except RuntimeError as exc:
if "HTTP 401" in str(exc):
# 401 → token 失效,强制刷新后重试一次
self.mailbox._log(
f"[微软邮箱][Graph] get_current_ids folder={folder} 遇到 401强制刷新 token"
)
_cache = (account.extra or {}).get("_oauth_token_cache")
if isinstance(_cache, dict):
_cache.pop(
self.mailbox._normalize_backend_name(self.backend_name), None
)
access_token = self.mailbox._get_oauth_access_token(
account,
preferred_backend=self.backend_name,
)
try:
messages = self.mailbox._graph_list_messages(
access_token=access_token,
folder=folder,
)
for message in messages:
message_id = str(message.get("id") or "").strip()
if message_id:
seen.add(f"{folder}:{message_id}")
except Exception:
pass
else:
raise
return seen
def wait_for_code(
@@ -3283,7 +3330,23 @@ class OutlookGraphMailboxBackend(OutlookMailboxBackend):
}
keyword_lower = str(keyword or "").strip().lower()
# 标记是否已做过一次 401 强制刷 token避免无限循环
_token_refreshed = False
def _force_refresh_token() -> str:
"""清除 OAuth 缓存,强制重新获取 access token。"""
_cache = (account.extra or {}).get("_oauth_token_cache")
if isinstance(_cache, dict):
_cache.pop(
self.mailbox._normalize_backend_name(self.backend_name), None
)
return self.mailbox._get_oauth_access_token(
account,
preferred_backend=self.backend_name,
)
def poll_once() -> Optional[str]:
nonlocal _token_refreshed
access_token = self.mailbox._get_oauth_access_token(
account,
preferred_backend=self.backend_name,
@@ -3342,6 +3405,52 @@ class OutlookGraphMailboxBackend(OutlookMailboxBackend):
)
return code
except Exception as exc:
exc_str = str(exc)
# 401 → token 失效,强制刷新后重试一次
if "HTTP 401" in exc_str and not _token_refreshed:
_token_refreshed = True
self.mailbox._log(
f"[微软邮箱][Graph] folder={folder} 遇到 401强制刷新 token 后重试"
)
try:
access_token = _force_refresh_token()
messages = self.mailbox._graph_list_messages(
access_token=access_token,
folder=folder,
)
new_messages = []
for message in messages:
message_id = str(message.get("id") or "").strip()
seen_key = f"{folder}:{message_id}"
if not message_id or seen_key in seen:
continue
seen.add(seen_key)
new_messages.append(message)
for message in new_messages:
subject = str(message.get("subject") or "").strip()
text = self.mailbox._graph_message_text(message)
if keyword_lower and keyword_lower not in text.lower():
continue
code = self.mailbox._safe_extract(text, code_pattern)
if not code:
mid = str(message.get("id") or "").strip()
if mid:
detail = self.mailbox._graph_get_message(
access_token=access_token,
message_id=mid,
)
text = self.mailbox._graph_message_text(detail)
code = self.mailbox._safe_extract(text, code_pattern)
if code and code not in exclude_codes:
self.mailbox._log(
f"[微软邮箱][Graph] folder={folder} 刷新 token 后验证码提取成功: {code}"
)
return code
except Exception as retry_exc:
self.mailbox._log(
f"[微软邮箱][Graph] folder={folder} 刷新 token 后仍然失败: {retry_exc}"
)
continue
self.mailbox._log(
f"[微软邮箱][Graph] folder={folder} 查询异常: {exc}"
)
@@ -3358,6 +3467,9 @@ class OutlookGraphMailboxBackend(OutlookMailboxBackend):
class OutlookMailbox(BaseMailbox):
"""微软邮箱Outlook / Hotmail本地账号池Graph / IMAP 策略)"""
# 类级别锁:保证多线程并发时取号互斥,防止多个实例取到同一个邮箱
_pop_lock = threading.Lock()
def __init__(
self,
imap_server: str = "",
@@ -3414,7 +3526,7 @@ class OutlookMailbox(BaseMailbox):
from sqlmodel import Session, select
from core.db import engine, OutlookAccountModel
with self._lock:
with OutlookMailbox._pop_lock:
with Session(engine) as session:
account = (
session.exec(
@@ -3695,7 +3807,7 @@ class OutlookMailbox(BaseMailbox):
"endpoint": probe.get("endpoint", ""),
}
self._log(f"[微软邮箱] OAuth token 获取失败,回退密码登录: {email}")
return {}
return {"reason": str(probe.get("reason") or "")}
def _fetch_oauth_token(
self,
@@ -3744,7 +3856,9 @@ class OutlookMailbox(BaseMailbox):
)
access_token = str(bundle.get("access_token") or "").strip()
if not access_token:
raise RuntimeError("微软邮箱 OAuth access token 获取失败")
reason = bundle.get("reason", "")
suffix = f" [{reason}]" if reason else ""
raise RuntimeError(f"微软邮箱 OAuth access token 获取失败{suffix}")
expires_in = bundle.get("expires_in")
try:

View File

@@ -49,6 +49,28 @@ class TaskLog(SQLModel, table=True):
created_at: datetime = Field(default_factory=_utcnow)
class TaskRunModel(SQLModel, table=True):
__tablename__ = "task_runs"
id: str = Field(primary_key=True)
platform: str = Field(index=True)
source: str = Field(default="manual", index=True)
status: str = Field(default="pending", index=True)
total: int = 0
progress: str = "0/0"
success: int = 0
registered: int = 0
skipped: int = 0
error: str = ""
meta_json: str = "{}"
logs_json: str = "[]"
errors_json: str = "[]"
cashier_urls_json: str = "[]"
control_json: str = "{}"
created_at: datetime = Field(default_factory=_utcnow, index=True)
updated_at: datetime = Field(default_factory=_utcnow, index=True)
class OutlookAccountModel(SQLModel, table=True):
__tablename__ = "outlook_accounts"

View File

@@ -0,0 +1,53 @@
"""邮箱域名全局策略校验。"""
from __future__ import annotations
import re
from typing import Any
def _to_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
text = str(value or "").strip().lower()
return text in {"1", "true", "yes", "on"}
def _required_level_count(value: Any) -> int:
text = str(value or "").strip()
if not text:
return 2
try:
level_count = int(text)
except (TypeError, ValueError) as exc:
raise ValueError("域名级数必须是整数") from exc
if level_count < 2:
raise ValueError("域名级数不能小于 2")
return level_count
def validate_email_domain_policy(email: str, config: dict[str, Any] | None = None) -> None:
cfg = config or {}
if not _to_bool(cfg.get("email_domain_rule_enabled")):
return
address = str(email or "").strip().lower()
if "@" not in address:
raise ValueError("邮箱格式无效,缺少域名")
_, domain = address.rsplit("@", 1)
domain = domain.strip().strip(".")
if not domain:
raise ValueError("邮箱格式无效,缺少域名")
levels = [part for part in domain.split(".") if part]
required_levels = _required_level_count(cfg.get("email_domain_level_count"))
if len(levels) < required_levels:
raise ValueError(
f"邮箱域名不满足要求:当前 {len(levels)} 级,至少需要 {required_levels}"
)
letters = len(re.findall(r"[a-z]", domain))
digits = len(re.findall(r"\d", domain))
if letters < 2 or digits < 2:
raise ValueError("邮箱域名不满足要求:域名至少包含 2 个英文字母和 2 个数字")

View File

@@ -164,6 +164,8 @@ class RegisterTaskRecord:
"skipped": self.skipped,
"errors": list(self.errors),
"control": self.control.snapshot(),
"created_at": self.created_at,
"updated_at": self.updated_at,
}
if self.cashier_urls:
data["cashier_urls"] = list(self.cashier_urls)

View File

@@ -10,6 +10,7 @@ import {
SunOutlined,
MoonOutlined,
LogoutOutlined,
PlayCircleOutlined,
} from '@ant-design/icons'
import zhCN from 'antd/locale/zh_CN'
import Dashboard from '@/pages/Dashboard'
@@ -18,6 +19,7 @@ import RegisterTaskPage from '@/pages/RegisterTaskPage'
import Proxies from '@/pages/Proxies'
import Settings from '@/pages/Settings'
import TaskHistory from '@/pages/TaskHistory'
import RunningTasks from '@/pages/RunningTasks'
import Login from '@/pages/Login'
import { darkTheme, lightTheme } from './theme'
import { apiFetch, clearToken, getToken } from '@/lib/utils'
@@ -94,6 +96,7 @@ function AppContent() {
if (path === '/history') return ['/history']
if (path === '/proxies') return ['/proxies']
if (path === '/settings') return ['/settings']
if (path === '/running-tasks') return ['/running-tasks']
return ['/']
}
@@ -103,14 +106,21 @@ function AppContent() {
icon: <DashboardOutlined />,
label: '仪表盘',
},
{
key: '/running-tasks',
icon: <PlayCircleOutlined />,
label: '任务运行',
},
{
key: '/accounts',
icon: <UserOutlined />,
label: '平台管理',
children: platforms.map(p => ({
key: `/accounts/${p.key}`,
label: p.label,
})),
children: [
...platforms.map(p => ({
key: `/accounts/${p.key}`,
label: p.label,
})),
],
},
{
key: '/history',
@@ -230,6 +240,7 @@ function AppContent() {
<Route path="/accounts" element={<Accounts />} />
<Route path="/accounts/:platform" element={<Accounts />} />
<Route path="/register" element={<RegisterTaskPage />} />
<Route path="/running-tasks" element={<RunningTasks />} />
<Route path="/history" element={<TaskHistory />} />
<Route path="/proxies" element={<Proxies />} />
<Route path="/settings" element={<Settings />} />

View File

@@ -524,6 +524,8 @@ export default function Accounts() {
const [accounts, setAccounts] = useState<any[]>([])
const [platformActions, setPlatformActions] = useState<any[]>([])
const [total, setTotal] = useState(0)
const [page, setPage] = useState(1)
const [pageSize, setPageSize] = useState(50)
const [loading, setLoading] = useState(false)
const [search, setSearch] = useState('')
const [filterStatus, setFilterStatus] = useState('')
@@ -571,7 +573,7 @@ export default function Accounts() {
setLoading(true)
try {
const params = new URLSearchParams({ platform: currentPlatform, page: '1', page_size: '100' })
const params = new URLSearchParams({ platform: currentPlatform, page: String(page), page_size: String(pageSize) })
if (search) params.set('email', search)
if (filterStatus) params.set('status', filterStatus)
if (createdAtStart) params.set('created_at_start', createdAtStart)
@@ -582,7 +584,7 @@ export default function Accounts() {
} finally {
setLoading(false)
}
}, [currentPlatform, search, filterStatus, createdAtStart, createdAtEnd])
}, [currentPlatform, search, filterStatus, createdAtStart, createdAtEnd, page, pageSize])
useEffect(() => {
load()
@@ -1217,7 +1219,13 @@ export default function Accounts() {
<Button type="link" size="small" onClick={() => { setCurrentAccount(record); setDetailModalOpen(true); }}>
</Button>
<Popconfirm title="确认删除?" onConfirm={() => handleDelete(record.id)}>
<Popconfirm
title="确认删除该账号吗?"
onConfirm={() => handleDelete(record.id)}
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
>
<Button type="link" size="small" danger>
</Button>
@@ -1254,14 +1262,14 @@ export default function Accounts() {
<Input.Search
placeholder="搜索邮箱..."
allowClear
onSearch={setSearch}
onSearch={(v) => { setPage(1); setSearch(v) }}
style={{ width: 200 }}
/>
<Select
placeholder="状态筛选"
allowClear
style={{ width: 120 }}
onChange={setFilterStatus}
onChange={(v) => { setPage(1); setFilterStatus(v) }}
options={[
{ value: 'registered', label: '已注册' },
{ value: 'trial', label: '试用中' },
@@ -1274,13 +1282,13 @@ export default function Accounts() {
showTime
allowClear
placeholder="开始时间"
onChange={(value) => setCreatedAtStart(value ? value.toISOString() : '')}
onChange={(value) => { setPage(1); setCreatedAtStart(value ? value.toISOString() : '') }}
/>
<DatePicker
showTime
allowClear
placeholder="结束时间"
onChange={(value) => setCreatedAtEnd(value ? value.toISOString() : '')}
onChange={(value) => { setPage(1); setCreatedAtEnd(value ? value.toISOString() : '') }}
/>
<Text type="secondary">{total} </Text>
{selectedRowKeys.length > 0 && (
@@ -1316,6 +1324,8 @@ export default function Accounts() {
: '确认补传当前筛选范围内远端未发现且本地状态有效的账号?'
}
onConfirm={() => handleCpaBackfill(getBackfillScope())}
okText="确认"
cancelText="取消"
>
<Button
loading={cpaSyncLoading === 'pending' || cpaSyncLoading === 'selected'}
@@ -1327,7 +1337,13 @@ export default function Accounts() {
</Popconfirm>
)}
{selectedRowKeys.length > 0 && (
<Popconfirm title={`确认删除选中的 ${selectedRowKeys.length} 个账号?`} onConfirm={handleBatchDelete}>
<Popconfirm
title={`确认删除选中的 ${selectedRowKeys.length} 个账号?`}
onConfirm={handleBatchDelete}
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
>
<Button danger icon={<DeleteOutlined />}> {selectedRowKeys.length} </Button>
</Popconfirm>
)}
@@ -1349,7 +1365,7 @@ export default function Accounts() {
selectedRowKeys,
onChange: setSelectedRowKeys,
}}
pagination={{ pageSize: 20, showSizeChanger: false }}
pagination={{ total, current: page, pageSize, showSizeChanger: true, pageSizeOptions: ['20', '50', '100'], onChange: (p, ps) => { setPage(p); setPageSize(ps) } }}
scroll={{ x: isChatgptPlatform ? 1440 : 980 }}
onRow={(record) => ({
onDoubleClick: () => {
@@ -1373,7 +1389,7 @@ export default function Accounts() {
<Input type="number" min={1} />
</Form.Item>
<Form.Item name="concurrency" label="并发数" initialValue={1} rules={[{ required: true }]}>
<Input type="number" min={1} max={5} />
<Input type="number" min={1} />
</Form.Item>
<Form.Item name="register_delay_seconds" label="每个注册延迟(秒)" initialValue={0}>
<InputNumber min={0} precision={1} step={0.5} style={{ width: '100%' }} placeholder="0 = 不延迟" />

View File

@@ -1,5 +1,5 @@
import { useEffect, useState } from 'react'
import { Card, Table, Button, Input, Tag, Space, Popconfirm, message } from 'antd'
import { useEffect, useState, type Key } from 'react'
import { Card, Table, Button, Input, Tag, Space, Popconfirm, message, Modal } from 'antd'
import {
PlusOutlined,
DeleteOutlined,
@@ -17,6 +17,7 @@ export default function Proxies() {
const [region, setRegion] = useState('')
const [checking, setChecking] = useState(false)
const [loading, setLoading] = useState(false)
const [selectedRowKeys, setSelectedRowKeys] = useState<Key[]>([])
const load = async () => {
setLoading(true)
@@ -57,9 +58,47 @@ export default function Proxies() {
}
const del = async (id: number) => {
await apiFetch(`/proxies/${id}`, { method: 'DELETE' })
message.success('删除成功')
load()
try {
await apiFetch(`/proxies/${id}`, { method: 'DELETE' })
message.success('删除成功')
setSelectedRowKeys((prev) => prev.filter((key) => key !== id))
load()
} catch (e: any) {
message.error(`删除失败: ${e.message || '未知错误'}`)
}
}
const batchDel = async () => {
if (selectedRowKeys.length === 0) return
const ids = selectedRowKeys.map((key) => Number(key)).filter((v) => Number.isFinite(v))
try {
const result = await apiFetch('/proxies/batch-delete', {
method: 'POST',
body: JSON.stringify({ ids }),
}) as { deleted: number; not_found?: number[]; total_requested?: number }
setSelectedRowKeys([])
load()
const notFound = (result.not_found || []) as number[]
Modal.success({
title: '批量删除结果',
okText: '知道了',
content: (
<div>
<div>{result.total_requested ?? ids.length} </div>
<div>{result.deleted ?? 0} </div>
<div>{notFound.length} </div>
{notFound.length > 0 && (
<div style={{ marginTop: 8, maxHeight: 120, overflow: 'auto', fontFamily: 'monospace' }}>
{notFound.join(', ')}
</div>
)}
</div>
),
})
} catch (e: any) {
message.error(`批量删除失败: ${e.message || '未知错误'}`)
}
}
const toggle = async (id: number) => {
@@ -121,7 +160,13 @@ export default function Proxies() {
icon={record.is_active ? <SwapLeftOutlined /> : <SwapRightOutlined />}
onClick={() => toggle(record.id)}
/>
<Popconfirm title="确认删除?" onConfirm={() => del(record.id)}>
<Popconfirm
title="确认删除该代理吗?"
onConfirm={() => del(record.id)}
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
>
<Button type="text" size="small" danger icon={<DeleteOutlined />} />
</Popconfirm>
</Space>
@@ -165,11 +210,32 @@ export default function Proxies() {
</Card>
<Card>
<div style={{ marginBottom: 12, display: 'flex', justifyContent: 'space-between' }}>
<div style={{ color: '#7a8ba3' }}>
{selectedRowKeys.length}
</div>
<Popconfirm
title={`确认删除选中的 ${selectedRowKeys.length} 条代理?`}
onConfirm={batchDel}
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
disabled={selectedRowKeys.length === 0}
>
<Button danger icon={<DeleteOutlined />} disabled={selectedRowKeys.length === 0}>
</Button>
</Popconfirm>
</div>
<Table
rowKey="id"
columns={columns}
dataSource={proxies}
loading={loading}
rowSelection={{
selectedRowKeys,
onChange: (keys) => setSelectedRowKeys(keys),
}}
pagination={false}
/>
</Card>

View File

@@ -288,7 +288,7 @@ export default function RegisterTaskPage() {
<Input type="number" min={1} />
</Form.Item>
<Form.Item name="concurrency" label="并发数" style={{ flex: 1 }}>
<Input type="number" min={1} max={5} />
<Input type="number" min={1} />
</Form.Item>
</Space>
<Space style={{ width: '100%' }}>

View File

@@ -0,0 +1,342 @@
import { useCallback, useEffect, useState } from 'react'
import {
Badge,
Button,
Card,
Col,
Drawer,
Empty,
message,
Popconfirm,
Progress,
Row,
Space,
Tag,
Typography,
} from 'antd'
import {
DeleteOutlined,
FileTextOutlined,
LoadingOutlined,
ReloadOutlined,
} from '@ant-design/icons'
import { apiFetch } from '@/lib/utils'
import { TaskLogPanel } from '@/components/TaskLogPanel'
const { Text, Title } = Typography
interface TaskSnapshot {
id: string
platform: string
source: string
status: 'pending' | 'running' | 'done' | 'failed' | 'stopped'
total: number
progress: string
success: number
registered: number
skipped: number
errors: string[]
created_at: number | string | null
updated_at: number | string | null
control: { stop_requested: boolean }
}
const PLATFORM_LABELS: Record<string, string> = {
chatgpt: 'ChatGPT',
trae: 'Trae',
cursor: 'Cursor',
grok: 'Grok',
kiro: 'Kiro',
tavily: 'Tavily',
openblocklabs: 'OpenBlock Labs',
}
const SOURCE_LABELS: Record<string, string> = {
manual: '手动',
api: 'API',
schedule: '调度',
}
const STATUS_CONFIG: Record<string, { color: string; label: string; icon?: React.ReactNode }> = {
pending: { color: 'default', label: '等待中', icon: <LoadingOutlined /> },
running: { color: 'processing', label: '运行中', icon: <LoadingOutlined /> },
done: { color: 'success', label: '已完成' },
failed: { color: 'error', label: '失败' },
stopped: { color: 'warning', label: '已停止' },
}
function toUnixSeconds(value: unknown): number | null {
if (value === null || value === undefined) return null
if (typeof value === 'string') {
const trimmed = value.trim()
if (!trimmed) return null
const maybeNum = Number(trimmed)
if (Number.isFinite(maybeNum)) {
return maybeNum > 1_000_000_000_000 ? maybeNum / 1000 : maybeNum
}
const parsed = Date.parse(trimmed)
if (Number.isFinite(parsed)) return parsed / 1000
return null
}
if (typeof value !== 'number' || !Number.isFinite(value)) return null
// 兼容毫秒时间戳
return value > 1_000_000_000_000 ? value / 1000 : value
}
function formatDuration(startTs: unknown, endTs?: unknown): string {
const start = toUnixSeconds(startTs)
const end = toUnixSeconds(endTs) ?? (Date.now() / 1000)
if (start === null || !Number.isFinite(end)) return '-'
const seconds = Math.max(0, Math.floor(end - start))
if (seconds < 60) return `${seconds}s`
if (seconds < 3600) return `${Math.floor(seconds / 60)}m ${seconds % 60}s`
const h = Math.floor(seconds / 3600)
const m = Math.floor((seconds % 3600) / 60)
return `${h}h ${m}m`
}
export default function RunningTasks() {
const [tasks, setTasks] = useState<TaskSnapshot[]>([])
const [loading, setLoading] = useState(false)
const [logTaskId, setLogTaskId] = useState<string | null>(null)
const [now, setNow] = useState(() => Date.now() / 1000)
const load = useCallback(async () => {
setLoading(true)
try {
const data = (await apiFetch('/tasks')) as TaskSnapshot[]
// sort: running first, then pending, then finished (newest first)
const order = { running: 0, pending: 1, done: 2, failed: 3, stopped: 4 }
const sorted = [...(data || [])].sort((a, b) => {
const oa = order[a.status] ?? 9
const ob = order[b.status] ?? 9
if (oa !== ob) return oa - ob
const ta = toUnixSeconds(a.created_at) ?? 0
const tb = toUnixSeconds(b.created_at) ?? 0
return tb - ta
})
setTasks(sorted)
} finally {
setLoading(false)
}
}, [])
useEffect(() => {
load()
const poll = setInterval(() => {
load()
setNow(Date.now() / 1000)
}, 2500)
// tick every second for live duration
const tick = setInterval(() => setNow(Date.now() / 1000), 1000)
return () => {
clearInterval(poll)
clearInterval(tick)
}
}, [load])
const isActive = (t: TaskSnapshot) => t.status === 'running' || t.status === 'pending'
const activeTasks = tasks.filter(isActive)
const finishedTasks = tasks.filter((t) => !isActive(t))
const handleDelete = async (taskId: string) => {
try {
await apiFetch(`/tasks/${taskId}`, { method: 'DELETE' })
if (logTaskId === taskId) setLogTaskId(null)
setTasks((prev) => prev.filter((t) => t.id !== taskId))
message.success('任务已删除')
} catch (error) {
const detail = error instanceof Error ? error.message : '删除失败'
message.error(detail)
}
}
const renderTask = (task: TaskSnapshot) => {
const cfg = STATUS_CONFIG[task.status] || { color: 'default', label: task.status }
const failed = task.errors?.length ?? 0
const totalRaw = Number(task.total)
const doneRaw = Number(task.registered)
const success = Number.isFinite(Number(task.success)) ? Number(task.success) : 0
const skipped = Number.isFinite(Number(task.skipped)) ? Number(task.skipped) : 0
const total = Number.isFinite(totalRaw) && totalRaw > 0 ? Math.floor(totalRaw) : 0
const done = Number.isFinite(doneRaw) && doneRaw > 0 ? Math.floor(doneRaw) : 0
const pct = total > 0 ? Math.max(0, Math.min(100, Math.round((done / total) * 100))) : 0
const duration = isActive(task)
? formatDuration(task.created_at, now)
: formatDuration(task.created_at, task.updated_at)
return (
<Card
key={task.id}
size="small"
style={{ marginBottom: 12 }}
bodyStyle={{ padding: '12px 16px' }}
>
<Row gutter={[12, 8]} align="middle" wrap>
{/* Task ID + platform */}
<Col flex="220px">
<Space direction="vertical" size={2}>
<Text code style={{ fontSize: 11 }}>
{task.id}
</Text>
<Space size={4}>
<Tag color="blue" style={{ margin: 0 }}>
{PLATFORM_LABELS[task.platform] || task.platform}
</Tag>
<Text type="secondary" style={{ fontSize: 11 }}>
{SOURCE_LABELS[task.source] || task.source || '-'}
</Text>
</Space>
</Space>
</Col>
{/* Status */}
<Col flex="90px">
<Badge status={cfg.color as any} text={cfg.label} />
</Col>
{/* Duration */}
<Col flex="70px">
<Text type="secondary" style={{ fontSize: 12 }}>
{duration}
</Text>
</Col>
{/* Progress bar */}
<Col flex="1" style={{ minWidth: 160 }}>
<Space direction="vertical" size={2} style={{ width: '100%' }}>
<Progress
percent={pct}
size="small"
status={
task.status === 'failed'
? 'exception'
: task.status === 'done'
? 'success'
: task.status === 'stopped'
? 'exception'
: 'active'
}
format={() => `${done}/${total}`}
/>
<Space size={8}>
<Text style={{ fontSize: 11, color: '#10b981' }}>
{success}
</Text>
{failed > 0 && (
<Text style={{ fontSize: 11, color: '#dc2626' }}>
{failed}
</Text>
)}
{skipped > 0 && (
<Text style={{ fontSize: 11, color: '#d97706' }}>
{skipped}
</Text>
)}
</Space>
</Space>
</Col>
{/* Log button */}
<Col>
<Space>
<Button
size="small"
icon={<FileTextOutlined />}
onClick={() => setLogTaskId(task.id)}
>
</Button>
{!isActive(task) && (
<Popconfirm
title="确认删除该任务记录?"
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
onConfirm={() => handleDelete(task.id)}
>
<Button size="small" danger icon={<DeleteOutlined />}>
</Button>
</Popconfirm>
)}
</Space>
</Col>
</Row>
</Card>
)
}
return (
<div>
<div
style={{
display: 'flex',
alignItems: 'center',
justifyContent: 'space-between',
marginBottom: 16,
}}
>
<Title level={4} style={{ margin: 0 }}>
</Title>
<Button icon={<ReloadOutlined />} loading={loading} onClick={load}>
</Button>
</div>
{/* Active tasks */}
{activeTasks.length > 0 && (
<div style={{ marginBottom: 24 }}>
<Text
strong
style={{ display: 'block', marginBottom: 8, fontSize: 13, color: '#6366f1' }}
>
({activeTasks.length})
</Text>
{activeTasks.map(renderTask)}
</div>
)}
{/* Finished tasks */}
{finishedTasks.length > 0 && (
<div>
<Text
strong
style={{ display: 'block', marginBottom: 8, fontSize: 13, color: '#6b7280' }}
>
({finishedTasks.length})
</Text>
{finishedTasks.map(renderTask)}
</div>
)}
{tasks.length === 0 && !loading && (
<Empty description="暂无任务记录" style={{ marginTop: 60 }} />
)}
{/* Log drawer */}
<Drawer
title={
<Space>
<FileTextOutlined />
<span></span>
{logTaskId && (
<Text code style={{ fontSize: 11 }}>
{logTaskId}
</Text>
)}
</Space>
}
open={!!logTaskId}
onClose={() => setLogTaskId(null)}
width={720}
destroyOnClose
bodyStyle={{ padding: 16 }}
>
{logTaskId && <TaskLogPanel taskId={logTaskId} />}
</Drawer>
</div>
)
}

View File

@@ -221,6 +221,8 @@ const TAB_ITEMS = [
{ key: 'cfworker_admin_token', label: '管理员 Token', secret: true },
{ key: 'cfworker_custom_auth', label: '站点密码', secret: true },
{ key: 'cfworker_subdomain', label: '固定子域名', placeholder: 'mail / pool-a' },
{ key: 'email_domain_rule_enabled', label: '启用域名规则', type: 'boolean' },
{ key: 'email_domain_level_count', label: '域名级数N 级)', placeholder: '例如 2 / 3 / 4' },
{ key: 'cfworker_random_subdomain', label: '随机子域名', type: 'boolean' },
{ key: 'cfworker_random_name_subdomain', label: '随机姓名子域名', type: 'boolean' },
{ key: 'cfworker_fingerprint', label: 'Fingerprint', placeholder: '6703363b...' },
@@ -570,6 +572,10 @@ function ConfigField({ field }: { field: FieldConfig }) {
const helpText =
field.key === 'default_executor'
? '仅对支持的平台生效ChatGPT、Cursor、Grok、Kiro、Tavily、Trae 支持浏览器模式OpenBlockLabs 仅支持纯协议。'
: field.key === 'email_domain_rule_enabled'
? '仅 CF Worker 生效:开启后会校验域名级数,以及域名至少包含 2 个字母和 2 个数字。'
: field.key === 'email_domain_level_count'
? '例如 2=example.com3=a.example.com4=a.b.example.com。'
: undefined
return (
@@ -1088,10 +1094,20 @@ function ContributionPanel({
const [statsResponse, setStatsResponse] = useState<Record<string, unknown> | null>(null)
const [redeemResponse, setRedeemResponse] = useState<Record<string, unknown> | null>(null)
const [statsError, setStatsError] = useState('')
const [bindingCustom, setBindingCustom] = useState(false)
const [customEmail, setCustomEmail] = useState('')
const [customStatsResponse, setCustomStatsResponse] = useState<Record<string, unknown> | null>(null)
const [customBalanceResponse, setCustomBalanceResponse] = useState<Record<string, unknown> | null>(null)
const [loadingCustomStats, setLoadingCustomStats] = useState(false)
const contributionEnabled = Form.useWatch('contribution_enabled', form)
const contributionMode = String(Form.useWatch('contribution_mode', form) || 'codex').trim()
const contributionServerUrl = String(Form.useWatch('contribution_server_url', form) || '').trim()
const contributionKey = String(Form.useWatch('contribution_key', form) || '').trim()
const customContributionUrl = String(Form.useWatch('custom_contribution_url', form) || '').trim()
const customContributionToken = String(Form.useWatch('custom_contribution_token', form) || '').trim()
const isCustomMode = contributionMode === 'custom'
const rawData = asRecord(statsResponse?.['data'])
const serverInfo = pickRecord(rawData, ['server_info', 'server', 'server_stats', 'stats']) || rawData
@@ -1235,6 +1251,68 @@ function ContributionPanel({
}
}
const doBindCustom = async () => {
if (!customEmail.trim()) {
message.error('请输入邮箱')
return
}
if (!customContributionUrl) {
message.error('请先填写自定义服务器地址')
return
}
setBindingCustom(true)
try {
const data = await apiFetch('/contribution/custom/bind', {
method: 'POST',
body: JSON.stringify({
email: customEmail.trim(),
server_url: customContributionUrl,
}),
})
const token = pickString(asRecord(data), ['token'])
if (token) {
form.setFieldValue('custom_contribution_token', token)
message.success('绑定成功token 已自动填充,请点击保存配置')
setCustomEmail('')
} else {
message.success('绑定成功')
}
} catch (e: any) {
message.error(String(e?.message || '绑定失败'))
} finally {
setBindingCustom(false)
}
}
const fetchCustomStats = async () => {
if (!contributionEnabled) {
message.warning('请先开启贡献功能')
return
}
if (!customContributionUrl) {
message.error('请先填写自定义服务器地址')
return
}
if (!customContributionToken) {
message.error('请先绑定邮箱获取 token')
return
}
setLoadingCustomStats(true)
try {
const [status, balance] = await Promise.all([
apiFetch(`/contribution/custom/status?server_url=${encodeURIComponent(customContributionUrl)}&token=${encodeURIComponent(customContributionToken)}`),
apiFetch(`/contribution/custom/balance?server_url=${encodeURIComponent(customContributionUrl)}&token=${encodeURIComponent(customContributionToken)}`),
])
setCustomStatsResponse(asRecord(status))
setCustomBalanceResponse(asRecord(balance))
message.success('信息已刷新')
} catch (e: any) {
message.error(String(e?.message || '获取信息失败'))
} finally {
setLoadingCustomStats(false)
}
}
return (
<div style={{ display: 'flex', flexDirection: 'column', gap: 16 }}>
<Card title="配置">
@@ -1255,104 +1333,178 @@ function ContributionPanel({
<Form.Item name="contribution_enabled" label="是否开启" valuePropName="checked">
<Switch checkedChildren="开启" unCheckedChildren="关闭" />
</Form.Item>
<Form.Item
name="contribution_server_url"
label="服务器地址"
rules={[{ required: true, message: '请输入服务器地址' }]}
>
<Input placeholder="http://new.xem8k5.top:7317/" />
</Form.Item>
<Form.Item name="contribution_key" label="API Key">
<Input
placeholder="留空可点击右侧按钮自动创建"
addonAfter={(
<Button
type="link"
size="small"
loading={creatingKey}
onClick={() => { void doGenerateKey() }}
style={{ paddingInline: 0 }}
>
key?
</Button>
)}
/>
<Form.Item name="contribution_mode" label="贡献模式">
<Select>
<Select.Option value="codex">Codex2APIxem中转站</Select.Option>
<Select.Option value="custom"></Select.Option>
</Select>
</Form.Item>
{!isCustomMode ? (
<>
<Form.Item
name="contribution_server_url"
label="服务器地址"
rules={[{ required: true, message: '请输入服务器地址' }]}
>
<Input placeholder="http://new.xem8k5.top:7317/" />
</Form.Item>
<Form.Item name="contribution_key" label="API Key">
<Input
placeholder="留空可点击右侧按钮自动创建"
addonAfter={(
<Button
type="link"
size="small"
loading={creatingKey}
onClick={() => { void doGenerateKey() }}
style={{ paddingInline: 0 }}
>
key?
</Button>
)}
/>
</Form.Item>
</>
) : (
<>
<Form.Item
name="custom_contribution_url"
label="自定义服务器地址"
rules={[{ required: true, message: '请输入服务器地址' }]}
>
<Input placeholder="http://127.0.0.1:5000" />
</Form.Item>
<Form.Item label="绑定邮箱">
<Space.Compact style={{ width: '100%' }}>
<Input
placeholder="输入邮箱以绑定账号"
value={customEmail}
onChange={(e) => setCustomEmail(e.target.value)}
onPressEnter={() => { void doBindCustom() }}
/>
<Button type="primary" loading={bindingCustom} onClick={() => { void doBindCustom() }}>
</Button>
</Space.Compact>
</Form.Item>
<Form.Item name="custom_contribution_token" label="Token">
<Input.TextArea placeholder="绑定邮箱后自动填充" rows={3} />
</Form.Item>
</>
)}
<Button type="primary" icon={<SaveOutlined />} onClick={onSave} loading={saving} block>
{saved ? '已保存 ✓' : '保存配置'}
</Button>
</Card>
<Card
title="信息"
extra={(
<Button loading={loadingStats} onClick={() => { void fetchStats() }}>
</Button>
)}
>
{!contributionEnabled ? (
<Alert type="info" showIcon message="贡献功能已关闭,开启后可获取服务器与 key 信息。" />
) : (
<Space direction="vertical" style={{ width: '100%' }} size={12}>
{statsError ? <Alert type="error" showIcon message={statsError} /> : null}
<div>
<Typography.Text strong></Typography.Text>
<div style={{ marginTop: 8, display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(180px, 1fr))', gap: 8 }}>
<Tag color="blue">: {formatDisplayNumber(serverQuotaAccountCount)}</Tag>
<Tag color="geekblue">: {formatDisplayNumber(serverQuotaTotal)}</Tag>
<Tag color="volcano">: {formatDisplayNumber(serverQuotaUsed)}</Tag>
<Tag color="green">: {formatDisplayNumber(serverQuotaRemaining)}</Tag>
<Tag color="orange">: {formatDisplayPercent(serverQuotaUsedPercent)}</Tag>
<Tag color="cyan">: {formatDisplayPercent(serverQuotaRemainingPercent)}</Tag>
<Tag color="purple">: {formatDisplayNumber(serverQuotaRemainingAccounts, 2)}</Tag>
</div>
</div>
<div>
<Typography.Text strong>API Key</Typography.Text>
<Space style={{ marginLeft: 8 }}>
<Typography.Text copyable={keyFromStats ? { text: keyFromStats } : undefined}>
{keyFromStats || '-'}
</Typography.Text>
{!isCustomMode ? (
<>
<Card
title="信息"
extra={(
<Button loading={loadingStats} onClick={() => { void fetchStats() }}>
</Button>
)}
>
{!contributionEnabled ? (
<Alert type="info" showIcon message="贡献功能已关闭,开启后可获取服务器与 key 信息。" />
) : (
<Space direction="vertical" style={{ width: '100%' }} size={12}>
{statsError ? <Alert type="error" showIcon message={statsError} /> : null}
<div>
<Typography.Text strong></Typography.Text>
<div style={{ marginTop: 8, display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(180px, 1fr))', gap: 8 }}>
<Tag color="blue">: {formatDisplayNumber(serverQuotaAccountCount)}</Tag>
<Tag color="geekblue">: {formatDisplayNumber(serverQuotaTotal)}</Tag>
<Tag color="volcano">: {formatDisplayNumber(serverQuotaUsed)}</Tag>
<Tag color="green">: {formatDisplayNumber(serverQuotaRemaining)}</Tag>
<Tag color="orange">: {formatDisplayPercent(serverQuotaUsedPercent)}</Tag>
<Tag color="cyan">: {formatDisplayPercent(serverQuotaRemainingPercent)}</Tag>
<Tag color="purple">: {formatDisplayNumber(serverQuotaRemainingAccounts, 2)}</Tag>
</div>
</div>
<div>
<Typography.Text strong>API Key</Typography.Text>
<Space style={{ marginLeft: 8 }}>
<Typography.Text copyable={keyFromStats ? { text: keyFromStats } : undefined}>
{keyFromStats || '-'}
</Typography.Text>
</Space>
</div>
<div>
<Typography.Text strong>key </Typography.Text>
<div style={{ marginTop: 8, display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(180px, 1fr))', gap: 8 }}>
<Tag color="blue">: {keyBalance ?? '-'}</Tag>
<Tag color="geekblue">: {keySource}</Tag>
<Tag color="cyan">: {boundAccounts ?? '-'}</Tag>
<Tag color="purple">: {settlementAmount ?? '-'}</Tag>
</div>
</div>
</Space>
</div>
<div>
<Typography.Text strong>key </Typography.Text>
<div style={{ marginTop: 8, display: 'grid', gridTemplateColumns: 'repeat(auto-fit, minmax(180px, 1fr))', gap: 8 }}>
<Tag color="blue">: {keyBalance ?? '-'}</Tag>
<Tag color="geekblue">: {keySource}</Tag>
<Tag color="cyan">: {boundAccounts ?? '-'}</Tag>
<Tag color="purple">: {settlementAmount ?? '-'}</Tag>
</div>
</div>
</Space>
)}
</Card>
)}
</Card>
<Card title="提现">
<Space direction="vertical" style={{ width: '100%' }}>
<Typography.Text>key {keyBalance ?? '-'}</Typography.Text>
<Form.Item label="提现金额" style={{ marginBottom: 0 }}>
<Select
value={redeemAmount}
onChange={setRedeemAmount}
style={{ width: 240 }}
options={CONTRIBUTION_REDEEM_OPTIONS.map((amount) => ({ label: String(amount), value: amount }))}
/>
</Form.Item>
<Button type="primary" danger onClick={() => { void doRedeem() }} loading={redeeming}>
</Button>
{redeemResponse ? (
<Alert
type={redeemResponse.ok === false ? 'error' : 'success'}
showIcon
message={redeemResponse.ok === false ? `提现失败:${String(redeemResponse.error || '-')}` : redeemSuccessText}
description={<pre style={{ margin: 0, whiteSpace: 'pre-wrap' }}>{formatResultText(redeemResponse)}</pre>}
/>
) : null}
</Space>
</Card>
<Card title="提现">
<Space direction="vertical" style={{ width: '100%' }}>
<Typography.Text>key {keyBalance ?? '-'}</Typography.Text>
<Form.Item label="提现金额" style={{ marginBottom: 0 }}>
<Select
value={redeemAmount}
onChange={setRedeemAmount}
style={{ width: 240 }}
options={CONTRIBUTION_REDEEM_OPTIONS.map((amount) => ({ label: String(amount), value: amount }))}
/>
</Form.Item>
<Button type="primary" danger onClick={() => { void doRedeem() }} loading={redeeming}>
</Button>
{redeemResponse ? (
<Alert
type={redeemResponse.ok === false ? 'error' : 'success'}
showIcon
message={redeemResponse.ok === false ? `提现失败:${String(redeemResponse.error || '-')}` : redeemSuccessText}
description={<pre style={{ margin: 0, whiteSpace: 'pre-wrap' }}>{formatResultText(redeemResponse)}</pre>}
/>
) : null}
</Space>
</Card>
</>
) : (
<Card
title="信息"
extra={(
<Button loading={loadingCustomStats} onClick={() => { void fetchCustomStats() }}>
</Button>
)}
>
{!contributionEnabled ? (
<Alert type="info" showIcon message="贡献功能已关闭,开启后可获取信息。" />
) : !customContributionToken ? (
<Alert type="warning" showIcon message="请先绑定邮箱获取 token" />
) : (
<Space direction="vertical" style={{ width: '100%' }} size={12}>
<div>
<Typography.Text strong></Typography.Text>
<div style={{ marginTop: 8 }}>
<Tag color="blue">: {pickNumber(asRecord(customBalanceResponse), ['balance']) ?? '-'}</Tag>
</div>
</div>
<div>
<Typography.Text strong></Typography.Text>
<div style={{ marginTop: 8 }}>
<Tag color="green">: {pickNumber(asRecord(customStatsResponse), ['success_count']) ?? '-'}</Tag>
<Tag color="orange">: {pickNumber(asRecord(customStatsResponse), ['pending_count']) ?? '-'}</Tag>
<Tag color="red">: {pickNumber(asRecord(customStatsResponse), ['failed_count']) ?? '-'}</Tag>
</div>
</div>
</Space>
)}
</Card>
)}
</div>
)
}
@@ -1647,6 +1799,12 @@ export default function Settings() {
if (!data.contribution_server_url) {
data.contribution_server_url = 'http://new.xem8k5.top:7317/'
}
if (!data.contribution_mode) {
data.contribution_mode = 'codex'
}
if (!data.custom_contribution_url) {
data.custom_contribution_url = 'http://127.0.0.1:5000'
}
if (!data.cloudmail_timeout) {
data.cloudmail_timeout = 30
}
@@ -1663,6 +1821,10 @@ export default function Settings() {
data.cfworker_random_subdomain = parseBooleanConfigValue(data.cfworker_random_subdomain)
data.cfworker_random_name_subdomain = parseBooleanConfigValue(data.cfworker_random_name_subdomain)
data.contribution_enabled = parseBooleanConfigValue(data.contribution_enabled)
data.email_domain_rule_enabled = parseBooleanConfigValue(data.email_domain_rule_enabled)
if (!String(data.email_domain_level_count ?? '').trim()) {
data.email_domain_level_count = 2
}
data.mail_import_source = configMailProvider === 'applemail' ? 'applemail' : 'microsoft'
data.mail_provider = isMailImportProvider ? 'mail_import' : configMailProvider
form.setFieldsValue(data)
@@ -1727,6 +1889,19 @@ export default function Settings() {
values.cfworker_random_subdomain = parseBooleanConfigValue(values.cfworker_random_subdomain)
values.cfworker_random_name_subdomain = parseBooleanConfigValue(values.cfworker_random_name_subdomain)
values.contribution_enabled = parseBooleanConfigValue(values.contribution_enabled)
values.email_domain_rule_enabled = parseBooleanConfigValue(values.email_domain_rule_enabled)
const rawDomainLevelCount = Number.parseInt(String(values.email_domain_level_count ?? '').trim(), 10)
if (values.mail_provider === 'cfworker' && values.email_domain_rule_enabled) {
if (!Number.isInteger(rawDomainLevelCount) || rawDomainLevelCount < 2) {
setActiveTab('mailbox')
message.error('域名级数必须是大于等于 2 的整数')
return
}
}
values.email_domain_level_count =
Number.isInteger(rawDomainLevelCount) && rawDomainLevelCount >= 2
? String(rawDomainLevelCount)
: '2'
await apiFetch('/config', { method: 'PUT', body: JSON.stringify({ data: values }) })
form.setFieldsValue({
@@ -1740,6 +1915,11 @@ export default function Settings() {
cfworker_random_subdomain: values.cfworker_random_subdomain,
cfworker_random_name_subdomain: values.cfworker_random_name_subdomain,
contribution_enabled: values.contribution_enabled,
contribution_mode: values.contribution_mode,
custom_contribution_url: values.custom_contribution_url,
custom_contribution_token: values.custom_contribution_token,
email_domain_rule_enabled: values.email_domain_rule_enabled,
email_domain_level_count: values.email_domain_level_count,
})
message.success('保存成功')
setSaved(true)

View File

@@ -121,6 +121,9 @@ export default function TaskHistory() {
<Popconfirm
title={`确认删除选中的 ${selectedRowKeys.length} 条任务历史?`}
onConfirm={handleBatchDelete}
okText="删除"
cancelText="取消"
okButtonProps={{ danger: true }}
>
<Button danger icon={<DeleteOutlined />}>
{selectedRowKeys.length}

View File

@@ -30,7 +30,7 @@ class EmailServiceAdapter:
code = self.es.get_verification_code(
timeout=timeout,
otp_sent_at=otp_sent_at,
exclude_codes=exclude_codes or self._used_codes,
exclude_codes=exclude_codes if exclude_codes is not None else self._used_codes,
)
if code:
self._used_codes.add(code)

View File

@@ -12,7 +12,7 @@ from core.proxy_utils import build_requests_proxy_config
try:
from curl_cffi import requests as curl_requests
except ImportError:
print(" 需要安装 curl_cffi: pip install curl_cffi")
print("[FAIL] 需要安装 curl_cffi: pip install curl_cffi")
import sys
sys.exit(1)
@@ -1055,7 +1055,7 @@ class ChatGPTClient:
if self._is_registration_complete_state(state):
self.last_registration_state = state
self._log(" 注册流程完成")
self._log("[OK] 注册流程完成")
return True, "注册成功"
if self._state_is_password_registration(state):

View File

@@ -71,12 +71,28 @@ class BaseChatGPTRegistrationModeAdapter(ABC):
"""按模式构造底层注册引擎。"""
def run(self, context: ChatGPTRegistrationContext):
engine = self._create_engine(context)
if context.email is not None:
engine.email = context.email
if context.password is not None:
engine.password = context.password
return engine.run()
_MAILBOX_ERROR_MARKERS = ("service_abuse_mode", "oauth_token_failed")
_MAX_ATTEMPTS = 3
result = None
for _attempt in range(_MAX_ATTEMPTS):
engine = self._create_engine(context)
if context.email is not None:
engine.email = context.email
if context.password is not None:
engine.password = context.password
result = engine.run()
if result.success:
return result
err = str(getattr(result, "error_message", "") or "")
matched_marker = next((m for m in _MAILBOX_ERROR_MARKERS if m in err), None)
if matched_marker and _attempt < _MAX_ATTEMPTS - 1:
context.callback_logger(
f"邮箱 OAuth token 已失效({matched_marker}"
f"换用下一个邮箱重试 ({_attempt + 1}/{_MAX_ATTEMPTS - 1})..."
)
continue
break
return result
def build_account(self, result, fallback_password: str) -> Account:
return Account(

View File

@@ -781,17 +781,19 @@ class OAuthClient:
self._log("步骤2: POST /api/accounts/authorize/continue")
self._log(f"authorize_continue: device_id={device_id}")
sentinel_token = get_sentinel_token_via_browser(
flow="authorize_continue",
proxy=self.proxy,
page_url=continue_referer or f"{self.oauth_issuer}/log-in",
headless=self.browser_mode != "headed",
device_id=device_id,
log_fn=lambda msg: self._log(f"authorize_continue: {msg}"),
)
if sentinel_token:
self._log("authorize_continue: 已通过 Playwright SentinelSDK 获取 token")
else:
sentinel_token = None
for _sentinel_attempt in range(2):
sentinel_token = get_sentinel_token_via_browser(
flow="authorize_continue",
proxy=self.proxy,
page_url=continue_referer or f"{self.oauth_issuer}/log-in",
headless=self.browser_mode != "headed",
device_id=device_id,
log_fn=lambda msg: self._log(f"authorize_continue: {msg}"),
)
if sentinel_token:
self._log("authorize_continue: 已通过 Playwright SentinelSDK 获取 token")
break
sentinel_token = build_sentinel_token(
self.session,
device_id,
@@ -802,9 +804,12 @@ class OAuthClient:
)
if sentinel_token:
self._log("authorize_continue: 已通过 HTTP PoW 获取 token")
else:
self._set_error("无法获取 sentinel token (authorize_continue)")
return None
break
if _sentinel_attempt == 0:
self._log("authorize_continue: sentinel token 获取失败,重试一次...")
if not sentinel_token:
self._set_error("无法获取 sentinel token (authorize_continue)")
return None
request_url = f"{self.oauth_issuer}/api/accounts/authorize/continue"
headers = self._headers(
@@ -1324,7 +1329,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 注册成功")
self._log("[OK] OAuth 注册成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -1439,7 +1444,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 注册成功")
self._log("[OK] OAuth 注册成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -1471,7 +1476,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 注册成功")
self._log("[OK] OAuth 注册成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -1809,7 +1814,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 登录成功")
self._log("[OK] OAuth 登录成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -1899,7 +1904,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 登录成功")
self._log("[OK] OAuth 登录成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -1986,7 +1991,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 登录成功")
self._log("[OK] OAuth 登录成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -2065,7 +2070,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 登录成功")
self._log("[OK] OAuth 登录成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -2099,7 +2104,7 @@ class OAuthClient:
code, code_verifier, user_agent, impersonate
)
if tokens:
self._log(" OAuth 登录成功")
self._log("[OK] OAuth 登录成功")
else:
self._log("换取 tokens 失败")
return tokens
@@ -3076,7 +3081,7 @@ class OAuthClient:
if not hasattr(skymail_client, "_used_codes"):
skymail_client._used_codes = set()
tried_codes = set(getattr(skymail_client, "_used_codes", set()))
tried_codes = set()
try:
otp_wait_seconds = int(
self.config.get(
@@ -3106,9 +3111,9 @@ class OAuthClient:
otp_resend_wait_seconds = max(30, min(otp_resend_wait_seconds, 900))
otp_deadline = time.time() + otp_wait_seconds
otp_sent_at = _otp_sent_at_baseline
next_resend_at = time.time() + otp_resend_wait_seconds
self._log(
f"OAuth OTP 等待窗口: total={otp_wait_seconds}s, poll_window={otp_poll_window}s"
f"OAuth OTP 等待窗口: total={otp_wait_seconds}s, poll_window={otp_poll_window}s, "
f"每轮最多 5 次无响应后重发,最多 3 轮"
)
def validate_otp(code):
@@ -3210,6 +3215,10 @@ class OAuthClient:
if hasattr(skymail_client, "wait_for_verification_code"):
self._log("使用 wait_for_verification_code 进行阻塞式获取新验证码...")
no_new_count = 0
resend_round = 0
_max_no_new = 5
_max_resend_rounds = 3
while time.time() < otp_deadline:
remaining = max(1, int(otp_deadline - time.time()))
wait_time = min(otp_poll_window, remaining)
@@ -3231,16 +3240,25 @@ class OAuthClient:
code = None
if not code:
if time.time() >= next_resend_at and not self.last_error:
self._log(
f"暂未收到 OTP触发重发(间隔 {otp_resend_wait_seconds}s"
)
if _resend_email_otp():
otp_sent_at = time.time()
next_resend_at = otp_sent_at + otp_resend_wait_seconds
no_new_count += 1
self._log(
f"暂未收到新的 OTP继续等待... (本轮第 {no_new_count}/{_max_no_new} 次)"
)
if no_new_count >= _max_no_new:
if resend_round < _max_resend_rounds:
resend_round += 1
self._log(
f"连续 {_max_no_new} 次未收到新 OTP"
f"触发第 {resend_round}/{_max_resend_rounds} 轮重发..."
)
if _resend_email_otp():
otp_sent_at = time.time()
no_new_count = 0
else:
next_resend_at = time.time() + otp_resend_wait_seconds
self._log("未收到新的 OTP继续等待...")
self._log(
f"已完成 {_max_resend_rounds} 轮重发仍未收到 OTP放弃等待"
)
break
if self.last_error:
break
continue
@@ -3249,6 +3267,7 @@ class OAuthClient:
self._log(f"跳过已尝试验证码: {code}")
continue
no_new_count = 0
next_state = validate_otp(code)
if next_state:
return next_state

View File

@@ -131,7 +131,7 @@ class EmailServiceAdapter:
otp_sent_at: float | None = None,
exclude_codes=None,
):
excluded = set(exclude_codes or set()) | set(self._used_codes)
excluded = set(exclude_codes) if exclude_codes is not None else set(self._used_codes)
self.log_fn(f"正在等待邮箱 {email} 的验证码 ({timeout}s)...")
code = self.email_service.get_verification_code(
email=email,
@@ -322,6 +322,73 @@ class RefreshTokenRegistrationEngine:
or ""
).strip()
def _parallel_add_phone_retry(
self,
*,
result,
register_client,
email_adapter,
first_name: str,
last_name: str,
birthdate: str,
register_otp_wait_seconds: int,
parallel: int = 3,
):
"""add_phone 阻断后,并行启动多路全新 OAuth session第一个成功的获胜。"""
from concurrent.futures import ThreadPoolExecutor, as_completed
winning_tokens = None
winning_client = None
def _one_attempt(idx):
client = self._build_oauth_client()
client.config.setdefault(
"chatgpt_oauth_otp_wait_seconds", register_otp_wait_seconds
)
self._log(f"add_phone 并行重试 #{idx + 1}/{parallel} 启动...")
t = client.login_and_get_tokens(
result.email,
self.password,
device_id="",
user_agent=getattr(register_client, "ua", None),
sec_ch_ua=getattr(register_client, "sec_ch_ua", None),
impersonate=getattr(register_client, "impersonate", None),
skymail_client=email_adapter,
prefer_passwordless_login=True,
allow_phone_verification=False,
force_new_browser=True,
force_chatgpt_entry=False,
screen_hint="login",
force_password_login=False,
complete_about_you_if_needed=True,
first_name=first_name,
last_name=last_name,
birthdate=birthdate,
login_source=f"add_phone_parallel_{idx}",
)
return t, client
with ThreadPoolExecutor(max_workers=parallel) as executor:
futures = {executor.submit(_one_attempt, i): i for i in range(parallel)}
for future in as_completed(futures):
try:
t, client = future.result()
if t and not winning_tokens:
winning_tokens = t
winning_client = client
self._log(
f"add_phone 并行重试 #{futures[future] + 1} 成功,取消其余..."
)
# 取消尚未开始的 futures
for f in futures:
if f is not future:
f.cancel()
break
except Exception as exc:
self._log(f"add_phone 并行重试异常: {exc}", "warning")
return winning_tokens, winning_client
def _populate_result_from_tokens(
self,
result: RegistrationResult,
@@ -426,19 +493,31 @@ class RefreshTokenRegistrationEngine:
self._log,
)
register_client = self._build_chatgpt_client()
self._log("2. 执行注册状态机interrupt 模式:不在注册阶段提交 about_you...")
registered, registration_message = register_client.register_complete_flow(
result.email,
self.password,
first_name,
last_name,
birthdate,
email_adapter,
stop_before_about_you_submission=True,
otp_wait_timeout=register_otp_wait_seconds,
otp_resend_wait_timeout=register_otp_resend_wait_seconds,
)
_REG_RETRY_MARKERS = ("访问首页失败", "预授权被拦截")
registered = False
registration_message = ""
for _reg_attempt in range(3):
if _reg_attempt > 0:
self._log(
f"注册状态机重试 {_reg_attempt}/2原因: {registration_message}..."
)
register_client = self._build_chatgpt_client()
self._log("2. 执行注册状态机interrupt 模式:不在注册阶段提交 about_you...")
registered, registration_message = register_client.register_complete_flow(
result.email,
self.password,
first_name,
last_name,
birthdate,
email_adapter,
stop_before_about_you_submission=True,
otp_wait_timeout=register_otp_wait_seconds,
otp_resend_wait_timeout=register_otp_resend_wait_seconds,
)
if registered:
break
if not any(m in registration_message for m in _REG_RETRY_MARKERS):
break
if not registered:
if not self._should_switch_to_login_after_register_failure(
@@ -530,8 +609,25 @@ class RefreshTokenRegistrationEngine:
if not tokens:
last_error = oauth_client.last_error or "OAuth 登录状态机失败"
result.error_message = last_error
return result
if "add_phone" in last_error:
self._log(
"OAuth add_phone 阻断,启动并行 OAuth 重试3 路并发)...",
"warning",
)
tokens, oauth_client = self._parallel_add_phone_retry(
result=result,
register_client=register_client,
email_adapter=email_adapter,
first_name=first_name,
last_name=last_name,
birthdate=birthdate,
register_otp_wait_seconds=register_otp_wait_seconds,
)
if not tokens:
last_error = (oauth_client.last_error if oauth_client else None) or last_error
if not tokens:
result.error_message = last_error
return result
self._populate_result_from_tokens(
result=result,

View File

@@ -270,7 +270,7 @@ def _get_sentinel_token_via_quickjs(
separators=(",", ":"),
ensure_ascii=False,
)
logger("Sentinel QuickJS 成功: p= t= c=")
logger("Sentinel QuickJS 成功: p=OK t=OK c=OK")
return token
except Exception as e:
logger(f"Sentinel QuickJS 异常: {e}")
@@ -350,7 +350,7 @@ def get_sentinel_token_via_browser(
{
"name": "oai-did",
"value": str(device_id),
"url": "https://auth.openai.com/",
"domain": "auth.openai.com",
"path": "/",
"secure": True,
"sameSite": "Lax",
@@ -401,9 +401,9 @@ def get_sentinel_token_via_browser(
parsed = json.loads(token)
logger(
"Sentinel Browser 成功: "
f"p={'' if parsed.get('p') else ''} "
f"t={'' if parsed.get('t') else ''} "
f"c={'' if parsed.get('c') else ''}"
f"p={'OK' if parsed.get('p') else 'X'} "
f"t={'OK' if parsed.get('t') else 'X'} "
f"c={'OK' if parsed.get('c') else 'X'}"
)
except Exception:
logger(f"Sentinel Browser 成功: len={len(token)}")

View File

@@ -19,6 +19,19 @@ def _is_config_enabled(value: Any, default: bool = False) -> bool:
return normalized in {"1", "true", "yes", "on", "enabled"}
def _pick_text(source: Any, *keys: str, default: str = "") -> str:
if not isinstance(source, dict):
return default
for key in keys:
value = source.get(key)
if value is None:
continue
text = value.strip() if isinstance(value, str) else str(value).strip()
if text:
return text
return default
def sync_account(account) -> list[dict[str, Any]]:
"""根据平台将账号同步到外部系统。"""
from core.config_store import config_store
@@ -33,11 +46,11 @@ def sync_account(account) -> list[dict[str, Any]]:
a = _A()
a.email = account.email
extra = _get_account_extra(account)
a.access_token = extra.get("access_token") or account.token
a.refresh_token = extra.get("refresh_token", "")
a.id_token = extra.get("id_token", "")
a.session_token = extra.get("session_token", "")
a.client_id = extra.get("client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
a.access_token = _pick_text(extra, "access_token", "accessToken") or account.token
a.refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
a.id_token = _pick_text(extra, "id_token", "idToken")
a.session_token = _pick_text(extra, "session_token", "sessionToken")
a.client_id = _pick_text(extra, "client_id", "clientId", default="app_EMoamEEZ73f0CkXaXp7hrann")
return a
if platform == "chatgpt":
@@ -46,22 +59,108 @@ def sync_account(account) -> list[dict[str, Any]]:
# 贡献模式优先级最高:开启后仅上传到贡献服务器,避免重复上报到其它平台。
contribution_enabled = _is_config_enabled(config_store.get("contribution_enabled", "0"))
if contribution_enabled:
contribution_url = str(config_store.get("contribution_server_url", "") or "").strip()
contribution_key = str(config_store.get("contribution_key", "") or "").strip()
if not contribution_url:
msg = "Contribution 服务器地址未配置"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "Contribution", "ok": False, "msg": msg})
return results
contribution_mode = str(config_store.get("contribution_mode", "codex") or "codex").strip().lower()
ok, msg = upload_chatgpt_account_to_cpa(
account,
api_url=contribution_url,
api_key=contribution_key or None,
)
persist_cpa_sync_result(account, ok, msg)
results.append({"name": "Contribution", "ok": ok, "msg": msg})
return results
if contribution_mode == "custom":
# 自定义贡献系统模式
custom_url = str(config_store.get("custom_contribution_url", "") or "").strip()
custom_token = str(config_store.get("custom_contribution_token", "") or "").strip()
if not custom_url:
msg = "自定义贡献服务器地址未配置"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
if not custom_token:
msg = "自定义贡献系统 token 未配置(请先绑定邮箱)"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
try:
import requests
from platforms.chatgpt.cpa_upload import generate_token_json
# 生成完整的 token JSON
extra = _get_account_extra(account)
token_json = generate_token_json(account)
# 如果 token_json 中没有 refresh_token从 extra 获取
if not token_json.get("refresh_token"):
refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
print(f"[DEBUG] extra keys: {list(extra.keys())}")
print(f"[DEBUG] refresh_token from extra: {refresh_token[:20] if refresh_token else 'EMPTY'}")
if refresh_token:
token_json["refresh_token"] = refresh_token
if not token_json.get("access_token"):
access_token = _pick_text(extra, "access_token", "accessToken") or getattr(account, "token", "")
if access_token:
token_json["access_token"] = access_token
if not token_json.get("id_token"):
id_token = _pick_text(extra, "id_token", "idToken")
if id_token:
token_json["id_token"] = id_token
if not token_json.get("client_id"):
client_id = _pick_text(extra, "client_id", "clientId")
if client_id:
token_json["client_id"] = client_id
refresh_token = str(token_json.get("refresh_token") or "").strip()
access_token = str(token_json.get("access_token") or "").strip()
# 验证必须有 refresh_token
print(f"[DEBUG] Final token_json keys: {list(token_json.keys())}")
print(f"[DEBUG] Final refresh_token: {refresh_token[:20] if refresh_token else 'EMPTY'}")
if not refresh_token:
msg = "账号缺少 refresh_token"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
resp = requests.post(
f"{custom_url.rstrip('/')}/api/upload",
json={
"email": account.email,
"refresh_token": refresh_token,
"access_token": access_token,
"token_json": token_json,
},
headers={"Authorization": f"Bearer {custom_token}"},
timeout=15,
)
data = resp.json()
if resp.status_code >= 400:
msg = data.get("error") or data.get("message") or str(data)
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
msg = f"上传成功: {data.get('message', '')}"
persist_cpa_sync_result(account, True, msg)
results.append({"name": "CustomContribution", "ok": True, "msg": msg})
return results
except Exception as exc:
msg = f"上传到自定义贡献系统失败: {exc}"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
else:
# codex2api 模式(原有逻辑)
contribution_url = str(config_store.get("contribution_server_url", "") or "").strip()
contribution_key = str(config_store.get("contribution_key", "") or "").strip()
if not contribution_url:
msg = "Contribution 服务器地址未配置"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "Contribution", "ok": False, "msg": msg})
return results
ok, msg = upload_chatgpt_account_to_cpa(
account,
api_url=contribution_url,
api_key=contribution_key or None,
)
persist_cpa_sync_result(account, ok, msg)
results.append({"name": "Contribution", "ok": ok, "msg": msg})
return results
cpa_url = str(config_store.get("cpa_api_url", "") or "").strip()
cpa_enabled = _is_config_enabled(
@@ -82,8 +181,8 @@ def sync_account(account) -> list[dict[str, Any]]:
pass
cp = _CP()
cp.access_token = extra.get("access_token") or account.token
cp.refresh_token = extra.get("refresh_token", "")
cp.access_token = _pick_text(extra, "access_token", "accessToken") or account.token
cp.refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
if upload_type == "rt":
from platforms.chatgpt.cpa_upload import upload_to_codex_proxy

View File

@@ -128,6 +128,61 @@ class CFWorkerMailboxTests(unittest.TestCase):
"rand.mail.sub.example",
)
@patch("requests.request")
def test_get_email_auto_fills_to_configured_domain_levels(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.text = '{"email":"user@l1.l2.github163.com","token":"token-123"}'
mock_request.return_value.json.return_value = {
"email": "user@l1.l2.github163.com",
"token": "token-123",
}
mailbox = create_mailbox(
"cfworker",
extra={
"cfworker_api_url": "https://example.invalid",
"cfworker_admin_token": "admin-token",
"cfworker_domain": "github163.com",
"email_domain_level_count": 4,
},
)
with patch.object(type(mailbox), "_generate_subdomain_label", side_effect=["l1", "l2"]):
mailbox.get_email()
self.assertEqual(
mock_request.call_args.kwargs["json"]["domain"],
"l1.l2.github163.com",
)
@patch("requests.request")
def test_get_email_auto_fill_keeps_configured_subdomain(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.text = '{"email":"user@l1.pool.github163.com","token":"token-123"}'
mock_request.return_value.json.return_value = {
"email": "user@l1.pool.github163.com",
"token": "token-123",
}
mailbox = create_mailbox(
"cfworker",
extra={
"cfworker_api_url": "https://example.invalid",
"cfworker_admin_token": "admin-token",
"cfworker_domain": "github163.com",
"cfworker_subdomain": "pool",
"email_domain_level_count": 4,
},
)
with patch.object(type(mailbox), "_generate_subdomain_label", return_value="l1"):
mailbox.get_email()
self.assertEqual(
mock_request.call_args.kwargs["json"]["domain"],
"l1.pool.github163.com",
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,39 @@
import pytest
from core.email_domain_policy import validate_email_domain_policy
def test_policy_disabled_skips_validation():
validate_email_domain_policy("user@example.com", {"email_domain_rule_enabled": "0"})
def test_policy_checks_domain_level_count():
with pytest.raises(ValueError, match="至少需要 4 级"):
validate_email_domain_policy(
"user@a1b2.example.com",
{
"email_domain_rule_enabled": "1",
"email_domain_level_count": "4",
},
)
def test_policy_checks_letter_and_digit_count():
with pytest.raises(ValueError, match="至少包含 2 个英文字母和 2 个数字"):
validate_email_domain_policy(
"user@ab.example.com",
{
"email_domain_rule_enabled": "1",
"email_domain_level_count": "2",
},
)
def test_policy_accepts_valid_n_level_domain():
validate_email_domain_policy(
"user@a1.b2.example.com",
{
"email_domain_rule_enabled": "1",
"email_domain_level_count": "4",
},
)

View File

@@ -71,6 +71,47 @@ class ExternalSyncContributionModeTests(unittest.TestCase):
upload_mock.assert_not_called()
persist_mock.assert_called_once()
def test_custom_contribution_upload_sends_token_json_and_top_level_tokens(self):
account = DummyAccount(
extra={
"refreshToken": "rt-camel-case",
"accessToken": "at-camel-case",
"idToken": "id-camel-case",
"clientId": "client-camel-case",
}
)
cfg = {
"contribution_enabled": "1",
"contribution_mode": "custom",
"custom_contribution_url": "http://custom.local:5000",
"custom_contribution_token": "custom-token",
}
response = mock.Mock()
response.status_code = 200
response.json.return_value = {"message": "queued"}
with mock.patch("core.config_store.config_store.get", side_effect=_config_getter(cfg)):
with mock.patch("platforms.chatgpt.cpa_upload.generate_token_json", return_value={"type": "codex", "email": account.email}):
with mock.patch("requests.post", return_value=response) as post_mock:
with mock.patch("services.external_sync.persist_cpa_sync_result") as persist_mock:
result = sync_account(account)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["name"], "CustomContribution")
self.assertTrue(result[0]["ok"])
persist_mock.assert_called_once_with(account, True, "上传成功: queued")
post_mock.assert_called_once()
payload = post_mock.call_args.kwargs["json"]
self.assertEqual(payload["email"], account.email)
self.assertEqual(payload["refresh_token"], "rt-camel-case")
self.assertEqual(payload["access_token"], "at-camel-case")
self.assertEqual(payload["token_json"]["refresh_token"], "rt-camel-case")
self.assertEqual(payload["token_json"]["access_token"], "at-camel-case")
self.assertEqual(payload["token_json"]["id_token"], "id-camel-case")
self.assertEqual(payload["token_json"]["client_id"], "client-camel-case")
def test_contribution_disabled_keeps_existing_cpa_sync(self):
account = DummyAccount()
cfg = {