Files
any-auto-register/platforms/chatgpt/plugin.py
2026-04-03 11:30:39 +08:00

380 lines
14 KiB
Python

"""ChatGPT / Codex CLI 平台插件"""
import random
import string
from core.base_mailbox import BaseMailbox
from core.base_platform import Account, BasePlatform, RegisterConfig
from core.registry import register
from platforms.chatgpt.chatgpt_registration_mode_adapter import (
ChatGPTRegistrationContext,
build_chatgpt_registration_mode_adapter,
)
@register
class ChatGPTPlatform(BasePlatform):
name = "chatgpt"
display_name = "ChatGPT"
version = "1.0.0"
def __init__(self, config: RegisterConfig = None, mailbox: BaseMailbox = None):
super().__init__(config)
self.mailbox = mailbox
def check_valid(self, account: Account) -> bool:
try:
from platforms.chatgpt.payment import check_subscription_status
class _A:
pass
a = _A()
extra = account.extra or {}
a.access_token = extra.get("access_token") or account.token
a.cookies = extra.get("cookies", "")
status = check_subscription_status(a, proxy=self.config.proxy if self.config else None)
return status not in ("expired", "invalid", "banned", None)
except Exception:
return False
def register(self, email: str = None, password: str = None) -> Account:
if not password:
password = "".join(random.choices(string.ascii_letters + string.digits + "!@#$", k=16))
proxy = self.config.proxy if self.config else None
browser_mode = (self.config.executor_type if self.config else None) or "protocol"
extra_config = (self.config.extra or {}) if self.config and getattr(self.config, "extra", None) else {}
log_fn = getattr(self, "_log_fn", print)
max_retries = 3
try:
max_retries = int(extra_config.get("register_max_retries", 3) or 3)
except Exception:
max_retries = 3
if self.mailbox:
_mailbox = self.mailbox
_fixed_email = email
def _resolve_email(candidate_email: str = "") -> str:
resolved_email = str(_fixed_email or candidate_email or "").strip()
if not resolved_email:
raise RuntimeError("custom_provider 返回空邮箱地址")
return resolved_email
class GenericEmailService:
service_type = type("ST", (), {"value": "custom_provider"})()
def __init__(self):
self._acct = None
self._email = _fixed_email
self._before_ids = set()
def create_email(self, config=None):
if self._email and self._acct and _fixed_email:
return {"email": self._email, "service_id": self._acct.account_id, "token": ""}
self._acct = _mailbox.get_email()
get_current_ids = getattr(_mailbox, "get_current_ids", None)
if callable(get_current_ids):
self._before_ids = set(get_current_ids(self._acct) or [])
else:
self._before_ids = set()
generated_email = getattr(self._acct, "email", "")
if not self._email:
self._email = _resolve_email(generated_email)
elif not _fixed_email:
self._email = _resolve_email(generated_email)
return {"email": self._email, "service_id": self._acct.account_id, "token": ""}
def get_verification_code(
self,
email=None,
email_id=None,
timeout=120,
pattern=None,
otp_sent_at=None,
exclude_codes=None,
):
if not self._acct:
raise RuntimeError("邮箱账户尚未创建,无法获取验证码")
return _mailbox.wait_for_code(
self._acct,
keyword="",
timeout=timeout,
before_ids=self._before_ids,
otp_sent_at=otp_sent_at,
exclude_codes=exclude_codes,
)
def update_status(self, success, error=None):
pass
@property
def status(self):
return None
email_service = GenericEmailService()
else:
from core.base_mailbox import TempMailLolMailbox
_tmail = TempMailLolMailbox(proxy=proxy)
_tmail._task_control = getattr(self, "_task_control", None)
class TempMailEmailService:
service_type = type("ST", (), {"value": "tempmail_lol"})()
def __init__(self):
self._acct = None
self._before_ids = set()
def create_email(self, config=None):
acct = _tmail.get_email()
self._acct = acct
self._before_ids = set(_tmail.get_current_ids(acct) or [])
resolved_email = str(getattr(acct, "email", "") or "").strip()
if not resolved_email:
raise RuntimeError("tempmail_lol 返回空邮箱地址")
return {"email": resolved_email, "service_id": acct.account_id, "token": acct.account_id}
def get_verification_code(
self,
email=None,
email_id=None,
timeout=120,
pattern=None,
otp_sent_at=None,
exclude_codes=None,
):
return _tmail.wait_for_code(
self._acct,
keyword="",
timeout=timeout,
before_ids=self._before_ids,
otp_sent_at=otp_sent_at,
exclude_codes=exclude_codes,
)
def update_status(self, success, error=None):
pass
@property
def status(self):
return None
email_service = TempMailEmailService()
adapter = build_chatgpt_registration_mode_adapter(extra_config)
context = ChatGPTRegistrationContext(
email_service=email_service,
proxy_url=proxy,
callback_logger=log_fn,
email=email,
password=password,
browser_mode=browser_mode,
max_retries=max_retries,
extra_config=extra_config,
)
result = adapter.run(context)
if not result or not result.success:
raise RuntimeError(result.error_message if result else "注册失败")
return adapter.build_account(result, password)
def get_platform_actions(self) -> list:
return [
{"id": "probe_local_status", "label": "探测本地状态", "params": []},
{"id": "sync_cliproxyapi_status", "label": "同步 CLIProxyAPI 状态", "params": []},
{"id": "refresh_token", "label": "刷新 Token", "params": []},
{
"id": "payment_link",
"label": "生成支付链接",
"params": [
{"key": "country", "label": "地区", "type": "select", "options": ["US", "SG", "TR", "HK", "JP", "GB", "AU", "CA"]},
{"key": "plan", "label": "套餐", "type": "select", "options": ["plus", "team"]},
],
},
{
"id": "upload_cpa",
"label": "上传 CPA",
"params": [
{"key": "api_url", "label": "CPA API URL", "type": "text"},
{"key": "api_key", "label": "CPA API Key", "type": "text"},
],
},
{
"id": "upload_sub2api",
"label": "上传 Sub2API",
"params": [
{"key": "api_url", "label": "Sub2API API URL", "type": "text"},
{"key": "api_key", "label": "Sub2API API Key", "type": "text"},
],
},
{
"id": "upload_tm",
"label": "上传 Team Manager",
"params": [
{"key": "api_url", "label": "TM API URL", "type": "text"},
{"key": "api_key", "label": "TM API Key", "type": "text"},
],
},
{
"id": "upload_codex_proxy",
"label": "上传 CodexProxy",
"params": [
{"key": "api_url", "label": "API URL", "type": "text"},
{"key": "api_key", "label": "Admin Key", "type": "text"},
],
},
]
def execute_action(self, action_id: str, account: Account, params: dict) -> dict:
proxy = self.config.proxy if self.config else None
extra = account.extra or {}
class _A:
pass
a = _A()
a.email = account.email
a.access_token = extra.get("access_token") or account.token
a.refresh_token = extra.get("refresh_token", "")
a.id_token = extra.get("id_token", "")
a.session_token = extra.get("session_token", "")
a.client_id = extra.get("client_id", "app_EMoamEEZ73f0CkXaXp7hrann")
a.cookies = extra.get("cookies", "")
a.user_id = account.user_id
if action_id == "probe_local_status":
from platforms.chatgpt.status_probe import probe_local_chatgpt_status
probe_result = probe_local_chatgpt_status(a, proxy=proxy)
summary = (
f"认证={probe_result.get('auth', {}).get('state', 'unknown')}, "
f"订阅={probe_result.get('subscription', {}).get('plan', 'unknown')}, "
f"Codex={probe_result.get('codex', {}).get('state', 'unknown')}"
)
return {
"ok": True,
"data": {
"message": f"本地状态探测完成:{summary}",
"probe": probe_result,
},
"account_extra_patch": {
"chatgpt_local": probe_result,
},
}
if action_id == "sync_cliproxyapi_status":
from services.cliproxyapi_sync import sync_chatgpt_cliproxyapi_status
sync_result = sync_chatgpt_cliproxyapi_status(a)
ok = bool(sync_result.get("uploaded")) and sync_result.get("remote_state") not in {"unreachable", "not_found"}
summary = (
f"远端状态={sync_result.get('status') or 'not_found'}, "
f"探测={sync_result.get('remote_state') or 'not_checked'}"
)
return {
"ok": ok,
"data": {
"message": f"CLIProxyAPI 状态同步完成:{summary}",
"sync": sync_result,
},
"error": sync_result.get("message") if not ok else "",
"account_extra_patch": {
"sync_statuses": {
"cliproxyapi": sync_result,
},
},
}
if action_id == "refresh_token":
from platforms.chatgpt.token_refresh import TokenRefreshManager
manager = TokenRefreshManager(proxy_url=proxy)
result = manager.refresh_account(a)
if result.success:
return {
"ok": True,
"data": {
"access_token": result.access_token,
"refresh_token": result.refresh_token,
},
}
return {"ok": False, "error": result.error_message}
if action_id == "payment_link":
from platforms.chatgpt.payment import generate_plus_link, generate_team_link
plan = params.get("plan", "plus")
country = params.get("country", "US")
if plan == "plus":
url = generate_plus_link(a, proxy=proxy, country=country)
else:
url = generate_team_link(
a,
workspace_name=params.get("workspace_name", "MyTeam"),
price_interval=params.get("price_interval", "month"),
seat_quantity=int(params.get("seat_quantity", 5) or 5),
proxy=proxy,
country=country,
)
return {"ok": bool(url), "data": {"url": url}}
if action_id == "upload_cpa":
from platforms.chatgpt.cpa_upload import generate_token_json, upload_to_cpa
token_data = generate_token_json(a)
ok, msg = upload_to_cpa(
token_data,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
return {"ok": ok, "data": msg}
if action_id == "upload_sub2api":
from platforms.chatgpt.sub2api_upload import upload_to_sub2api
ok, msg = upload_to_sub2api(
a,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
return {"ok": ok, "data": msg}
if action_id == "upload_tm":
from platforms.chatgpt.cpa_upload import upload_to_team_manager
ok, msg = upload_to_team_manager(
a,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
return {"ok": ok, "data": msg}
if action_id == "upload_codex_proxy":
upload_type = str(
params.get("upload_type")
or (self.config.extra or {}).get("codex_proxy_upload_type")
or "at"
).strip().lower()
if upload_type == "rt":
from platforms.chatgpt.cpa_upload import upload_to_codex_proxy
ok, msg = upload_to_codex_proxy(
a,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
else:
from platforms.chatgpt.cpa_upload import upload_at_to_codex_proxy
ok, msg = upload_at_to_codex_proxy(
a,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
return {"ok": ok, "data": msg}
raise NotImplementedError(f"未知操作: {action_id}")