Files
any-auto-register/core/base_mailbox.py
流云 cba60a2aa3 feat: 新增 LuckMail 平台及自动分配邮箱渠道支持
- 引入 LuckMail 核心 SDK
- 封装 \LuckMailMailbox\ 适配器接入注册工厂
- 支持智能匹配各类项目的子渠道编码
- 完善前端 UI 配置选项
2026-03-28 22:02:22 +08:00

761 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""邮箱池基类 - 抽象临时邮箱/收件服务"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Any
@dataclass
class MailboxAccount:
email: str
account_id: str = ""
extra: dict = None # 平台额外信息
class BaseMailbox(ABC):
def _log(self, message: str) -> None:
log_fn = getattr(self, "_log_fn", None)
if callable(log_fn):
log_fn(message)
@abstractmethod
def get_email(self) -> MailboxAccount:
"""获取一个可用邮箱"""
...
@abstractmethod
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None,
code_pattern: str = None, **kwargs) -> str:
"""等待并返回验证码code_pattern 为自定义正则默认匹配6位数字"""
...
def _safe_extract(self, text: str, pattern: str = None) -> Optional[str]:
"""通用验证码提取逻辑:若有捕获组则返回 group(1),否则返回 group(0)"""
import re
text = str(text or "")
if not text:
return None
patterns = []
if pattern:
patterns.append(pattern)
# 先匹配带明显语义的验证码,避免误提取 MIME boundary、时间戳等 6 位数字。
patterns.extend([
r'(?is)(?:verification\s+code|one[-\s]*time\s+(?:password|code)|security\s+code|login\s+code|验证码|校验码|动态码|認證碼|驗證碼)[^0-9]{0,30}(\d{6})',
r'(?is)\bcode\b[^0-9]{0,12}(\d{6})',
r'(?<!#)(?<!\d)(\d{6})(?!\d)',
])
for regex in patterns:
m = re.search(regex, text)
if m:
# 兼容逻辑:若 pattern 中有捕获组则取 group(1),否则取 group(0)
return m.group(1) if m.groups() else m.group(0)
return None
def _decode_raw_content(self, raw: str) -> str:
"""解析邮件原始文本 (借鉴自 Fugle),处理 Quoted-Printable 和 HTML 实体"""
import quopri, html, re
text = str(raw or "")
if not text: return ""
# 简单切分 Header 和 Body
if "\r\n\r\n" in text:
text = text.split("\r\n\r\n", 1)[1]
elif "\n\n" in text:
text = text.split("\n\n", 1)[1]
try:
# 处理 Quoted-Printable
decoded_bytes = quopri.decodestring(text)
text = decoded_bytes.decode("utf-8", errors="ignore")
except Exception:
pass
# 清除 HTML 标签并反转义
text = html.unescape(text)
text = re.sub(r'(?im)^content-(?:type|transfer-encoding):.*$', ' ', text)
text = re.sub(r'(?im)^--+[_=\w.-]+$', ' ', text)
text = re.sub(r'(?i)----=_part_[\w.]+', ' ', text)
text = re.sub(r'<[^>]+>', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
@abstractmethod
def get_current_ids(self, account: MailboxAccount) -> set:
"""返回当前邮件 ID 集合(用于过滤旧邮件)"""
...
def create_mailbox(provider: str, extra: dict = None, proxy: str = None) -> 'BaseMailbox':
"""工厂方法:根据 provider 创建对应的 mailbox 实例"""
extra = extra or {}
if provider == "tempmail_lol":
return TempMailLolMailbox(proxy=proxy)
elif provider == "duckmail":
return DuckMailMailbox(
api_url=extra.get("duckmail_api_url", "https://www.duckmail.sbs"),
provider_url=extra.get("duckmail_provider_url", "https://api.duckmail.sbs"),
bearer=extra.get("duckmail_bearer", "kevin273945"),
proxy=proxy,
)
elif provider == "freemail":
return FreemailMailbox(
api_url=extra.get("freemail_api_url", ""),
admin_token=extra.get("freemail_admin_token", ""),
username=extra.get("freemail_username", ""),
password=extra.get("freemail_password", ""),
proxy=proxy,
)
elif provider == "moemail":
return MoeMailMailbox(
api_url=extra.get("moemail_api_url", "https://sall.cc"),
proxy=proxy,
)
elif provider == "cfworker":
return CFWorkerMailbox(
api_url=extra.get("cfworker_api_url", ""),
admin_token=extra.get("cfworker_admin_token", ""),
domain=extra.get("cfworker_domain", ""),
fingerprint=extra.get("cfworker_fingerprint", ""),
proxy=proxy,
)
elif provider == "luckmail":
return LuckMailMailbox(
base_url=extra.get("luckmail_base_url") or "https://mails.luckyous.com/",
api_key=extra.get("luckmail_api_key", ""),
project_code=extra.get("luckmail_project_code", ""),
email_type=extra.get("luckmail_email_type", ""),
)
else: # laoudo
return LaoudoMailbox(
auth_token=extra.get("laoudo_auth", ""),
email=extra.get("laoudo_email", ""),
account_id=extra.get("laoudo_account_id", ""),
)
class LaoudoMailbox(BaseMailbox):
"""laoudo.com 邮箱服务"""
def __init__(self, auth_token: str, email: str, account_id: str):
self.auth = auth_token
self._email = email
self._account_id = account_id
self.api = "https://laoudo.com/api/email"
self._ua = "Mozilla/5.0"
def get_email(self) -> MailboxAccount:
if not self._email:
raise RuntimeError(
"Laoudo 邮箱未配置或已失效,请检查 laoudo_auth、laoudo_email、laoudo_account_id 配置,"
"或切换到 tempmail_lol无需配置"
)
return MailboxAccount(email=self._email, account_id=self._account_id)
def get_current_ids(self, account: MailboxAccount) -> set:
from curl_cffi import requests as curl_requests
try:
r = curl_requests.get(
f"{self.api}/list",
params={"accountId": account.account_id, "allReceive": 0,
"emailId": 0, "timeSort": 1, "size": 50, "type": 0},
headers={"authorization": self.auth, "user-agent": self._ua},
timeout=15, impersonate="chrome131"
)
if r.status_code == 200:
mails = r.json().get("data", {}).get("list", []) or []
return {m.get("id") or m.get("emailId") for m in mails if m.get("id") or m.get("emailId")}
except Exception:
pass
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "trae",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re, time
from curl_cffi import requests as curl_requests
seen = set(before_ids) if before_ids else set()
start = time.time()
h = {"authorization": self.auth, "user-agent": self._ua}
while time.time() - start < timeout:
try:
r = curl_requests.get(
f"{self.api}/list",
params={"accountId": account.account_id, "allReceive": 0,
"emailId": 0, "timeSort": 1, "size": 50, "type": 0},
headers=h, timeout=15, impersonate="chrome131"
)
if r.status_code == 200:
mails = r.json().get("data", {}).get("list", []) or []
for mail in mails:
mid = mail.get("id") or mail.get("emailId")
if not mid or mid in seen:
continue
seen.add(mid)
text = (str(mail.get("subject", "")) + " " +
str(mail.get("content") or mail.get("html") or ""))
if keyword and keyword.lower() not in text.lower():
continue
code = self._safe_extract(text, code_pattern)
if code:
return code
except Exception:
pass
time.sleep(4)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")
class AitreMailbox(BaseMailbox):
"""mail.aitre.cc 临时邮箱"""
def __init__(self, email: str):
self._email = email
self.api = "https://mail.aitre.cc/api/tempmail"
def get_email(self) -> MailboxAccount:
return MailboxAccount(email=self._email)
def get_current_ids(self, account: MailboxAccount) -> set:
import requests
try:
r = requests.get(f"{self.api}/emails", params={"email": account.email}, timeout=10)
emails = r.json().get("emails", [])
return {str(m["id"]) for m in emails if "id" in m}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "trae",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re, time, requests
seen = set(before_ids) if before_ids else set()
last_check = None
start = time.time()
while time.time() - start < timeout:
params = {"email": account.email}
if last_check:
params["lastCheck"] = last_check
try:
r = requests.get(f"{self.api}/poll", params=params, timeout=10)
data = r.json()
last_check = data.get("lastChecked")
if data.get("count", 0) > 0:
r2 = requests.get(f"{self.api}/emails", params={"email": account.email}, timeout=10)
for mail in r2.json().get("emails", []):
mid = str(mail.get("id", ""))
if mid in seen:
continue
seen.add(mid)
text = mail.get("preview", "") + mail.get("content", "")
if keyword and keyword.lower() not in text.lower():
continue
code = self._safe_extract(text, code_pattern)
if code:
return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")
class TempMailLolMailbox(BaseMailbox):
"""tempmail.lol 免费临时邮箱(无需注册,自动生成)"""
def __init__(self, proxy: str = None):
self.api = "https://api.tempmail.lol/v2"
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._token = None
self._email = None
def get_email(self) -> MailboxAccount:
import requests
r = requests.post(f"{self.api}/inbox/create",
json={},
proxies=self.proxy, timeout=15)
data = r.json()
email = data.get("address") or data.get("email", "")
if not email:
raise RuntimeError(f"tempmail.lol API 返回空邮箱: {data}")
self._email = email
self._token = data.get("token", "")
print(f"[TempMailLol] 生成邮箱: {self._email}")
return MailboxAccount(email=self._email, account_id=self._token)
def get_current_ids(self, account: MailboxAccount) -> set:
import requests
try:
r = requests.get(f"{self.api}/inbox",
params={"token": account.account_id},
proxies=self.proxy, timeout=10)
return {str(m["id"]) for m in r.json().get("emails", [])}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re, time, requests
seen = set(before_ids or [])
otp_sent_at = kwargs.get("otp_sent_at")
otp_cutoff = float(otp_sent_at) - 2 if otp_sent_at else None
start = time.time()
while time.time() - start < timeout:
try:
r = requests.get(f"{self.api}/inbox",
params={"token": account.account_id},
proxies=self.proxy, timeout=10)
for mail in sorted(r.json().get("emails", []), key=lambda x: x.get("date", 0), reverse=True):
mid = str(mail.get("id", ""))
if mid in seen:
continue
if otp_sent_at and mail.get("date", 0) / 1000 < otp_sent_at:
continue
seen.add(mid)
text = mail.get("subject", "") + " " + mail.get("body", "") + " " + mail.get("html", "")
if keyword and keyword.lower() not in text.lower():
continue
code = self._safe_extract(text, code_pattern)
if code:
return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")
class DuckMailMailbox(BaseMailbox):
"""DuckMail 自动生成邮箱(随机创建账号)"""
def __init__(self, api_url: str = "https://www.duckmail.sbs",
provider_url: str = "https://api.duckmail.sbs",
bearer: str = "kevin273945",
proxy: str = None):
self.api = api_url.rstrip("/")
self.provider_url = provider_url
self.bearer = bearer
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._token = None
self._address = None
def _common_headers(self) -> dict:
return {
"authorization": f"Bearer {self.bearer}",
"content-type": "application/json",
"x-api-provider-base-url": self.provider_url,
}
def get_email(self) -> MailboxAccount:
import requests, random, string
username = "".join(random.choices(string.ascii_lowercase + string.digits, k=10))
password = "Test" + "".join(random.choices(string.digits, k=8)) + "!"
domain = self.provider_url.replace("https://api.", "").replace("https://", "")
address = f"{username}@{domain}"
# 创建账号
r = requests.post(f"{self.api}/api/mail?endpoint=%2Faccounts",
json={"address": address, "password": password},
headers=self._common_headers(), proxies=self.proxy, timeout=15)
data = r.json()
self._address = data.get("address", address)
# 登录获取 token
r2 = requests.post(f"{self.api}/api/mail?endpoint=%2Ftoken",
json={"address": self._address, "password": password},
headers=self._common_headers(), proxies=self.proxy, timeout=15)
self._token = r2.json().get("token", "")
return MailboxAccount(email=self._address, account_id=self._token)
def get_current_ids(self, account: MailboxAccount) -> set:
import requests
try:
r = requests.get(f"{self.api}/api/mail?endpoint=%2Fmessages%3Fpage%3D1",
headers={"authorization": f"Bearer {account.account_id}",
"x-api-provider-base-url": self.provider_url},
proxies=self.proxy, timeout=10)
return {str(m["id"]) for m in r.json().get("hydra:member", [])}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re, time, requests
seen = set(before_ids or [])
start = time.time()
while time.time() - start < timeout:
try:
r = requests.get(f"{self.api}/api/mail?endpoint=%2Fmessages%3Fpage%3D1",
headers={"authorization": f"Bearer {account.account_id}",
"x-api-provider-base-url": self.provider_url},
proxies=self.proxy, timeout=10)
msgs = r.json().get("hydra:member", [])
for msg in msgs:
mid = str(msg.get("id") or msg.get("msgid") or "")
if mid in seen: continue
seen.add(mid)
# 请求邮件详情获取完整 text
try:
r2 = requests.get(f"{self.api}/api/mail?endpoint=%2Fmessages%2F{mid}",
headers={"authorization": f"Bearer {account.account_id}",
"x-api-provider-base-url": self.provider_url},
proxies=self.proxy, timeout=10)
detail = r2.json()
body = str(detail.get("text") or "") + " " + str(detail.get("subject") or "")
except Exception:
body = str(msg.get("subject") or "")
body = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', body)
code = self._safe_extract(body, code_pattern)
if code: return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")
class CFWorkerMailbox(BaseMailbox):
"""Cloudflare Worker 自建临时邮箱服务"""
def __init__(self, api_url: str, admin_token: str = "", domain: str = "",
fingerprint: str = "", proxy: str = None):
self.api = api_url.rstrip("/")
self.admin_token = admin_token
self.domain = domain
self.fingerprint = fingerprint
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._token = None
def _headers(self) -> dict:
h = {
"accept": "application/json, text/plain, */*",
"content-type": "application/json",
"x-admin-auth": self.admin_token,
}
if self.fingerprint:
h["x-fingerprint"] = self.fingerprint
return h
def _generate_local_part(self) -> str:
import random, string
# 避免纯数字开头,提高邮箱格式“像真人”的程度
prefix = "".join(random.choices(string.ascii_lowercase, k=6))
suffix = "".join(random.choices(string.digits, k=4))
return f"{prefix}{suffix}"
def get_email(self) -> MailboxAccount:
import requests
name = self._generate_local_part()
payload = {"enablePrefix": True, "name": name}
if self.domain:
payload["domain"] = self.domain
r = requests.post(f"{self.api}/admin/new_address",
json=payload, headers=self._headers(),
proxies=self.proxy, timeout=15)
print(f"[CFWorker] new_address status={r.status_code} resp={r.text[:200]}")
data = r.json()
email = data.get("email", data.get("address", ""))
token = data.get("token", data.get("jwt", ""))
self._token = token
print(f"[CFWorker] 生成邮箱: {email} token={token[:40] if token else 'NONE'}...")
return MailboxAccount(email=email, account_id=token)
def _get_mails(self, email: str) -> list:
import requests
r = requests.get(f"{self.api}/admin/mails",
params={"limit": 20, "offset": 0, "address": email},
headers=self._headers(), proxies=self.proxy, timeout=10)
data = r.json()
return data.get("results", data) if isinstance(data, dict) else data
def get_current_ids(self, account: MailboxAccount) -> set:
try:
mails = self._get_mails(account.email)
return {str(m.get("id", "")) for m in mails}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re
import time
from datetime import datetime, timezone
seen = set(before_ids or [])
exclude_codes = set(kwargs.get("exclude_codes") or [])
otp_sent_at = kwargs.get("otp_sent_at")
otp_cutoff = float(otp_sent_at) - 2 if otp_sent_at else None
start = time.time()
while time.time() - start < timeout:
try:
mails = self._get_mails(account.email)
for mail in sorted(mails, key=lambda x: x.get("id", 0), reverse=True):
mid = str(mail.get("id", ""))
if not mid or mid in seen:
continue
seen.add(mid)
created_at = str(mail.get("created_at", "") or "").strip()
if otp_cutoff and created_at:
try:
mail_ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc).timestamp()
if mail_ts < otp_cutoff:
self._log(f"[CFWorker] \u8df3\u8fc7\u65e7\u90ae\u4ef6 id={mid} created_at={created_at}")
continue
except Exception:
pass
raw = str(mail.get("raw", ""))
subject = str(mail.get("subject", ""))
search_text = f"{subject} {self._decode_raw_content(raw)}".strip()
search_text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', search_text)
search_text = re.sub(r'm=\+\d+\.\d+', '', search_text)
search_text = re.sub(r'\bt=\d+\b', '', search_text)
if keyword and keyword.lower() not in search_text.lower():
continue
code = self._safe_extract(search_text, code_pattern)
if code and code in exclude_codes:
self._log(f"[CFWorker] \u8df3\u8fc7\u5df2\u7528\u9a8c\u8bc1\u7801 id={mid} created_at={created_at} code={code}")
continue
if code:
self._log(f"[CFWorker] \u547d\u4e2d\u65b0\u9a8c\u8bc1\u7801 id={mid} created_at={created_at} code={code}")
return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"\u7b49\u5f85\u9a8c\u8bc1\u7801\u8d85\u65f6 ({timeout}s)")
class MoeMailMailbox(BaseMailbox):
"""MoeMail (sall.cc) 邮箱服务 - 自动注册账号并生成临时邮箱"""
def __init__(self, api_url: str = "https://sall.cc", proxy: str = None):
self.api = api_url.rstrip("/")
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._session_token = None
self._email = None
def _register_and_login(self) -> str:
import requests, random, string
s = requests.Session()
s.proxies = self.proxy
ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
s.headers.update({"user-agent": ua, "origin": self.api, "referer": f"{self.api}/zh-CN/login"})
# 注册
username = "".join(random.choices(string.ascii_lowercase + string.digits, k=12))
password = "Test" + "".join(random.choices(string.digits, k=8)) + "!"
print(f"[MoeMail] 注册账号: {username} / {password}")
r_reg = s.post(f"{self.api}/api/auth/register",
json={"username": username, "password": password, "turnstileToken": ""},
timeout=15)
print(f"[MoeMail] 注册结果: {r_reg.status_code} {r_reg.text[:80]}")
# 获取 CSRF
csrf_r = s.get(f"{self.api}/api/auth/csrf", timeout=10)
csrf = csrf_r.json().get("csrfToken", "")
# 登录
s.post(f"{self.api}/api/auth/callback/credentials",
headers={"content-type": "application/x-www-form-urlencoded"},
data=f"username={username}&password={password}&csrfToken={csrf}&redirect=false&callbackUrl={self.api}",
allow_redirects=True, timeout=15)
self._session = s
for cookie in s.cookies:
if "session-token" in cookie.name:
self._session_token = cookie.value
print(f"[MoeMail] 登录成功")
return cookie.value
print(f"[MoeMail] 登录失败cookies: {[c.name for c in s.cookies]}")
return ""
def get_email(self) -> MailboxAccount:
# 每次调用都重新注册新账号,保证邮箱唯一
self._session_token = None
self._register_and_login()
import random, string
name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
# 获取可用域名列表,随机选一个
domain = "sall.cc"
try:
cfg_r = self._session.get(f"{self.api}/api/config", timeout=10)
domains = [d.strip() for d in cfg_r.json().get("emailDomains", "sall.cc").split(",") if d.strip()]
if domains:
domain = random.choice(domains)
except Exception:
pass
r = self._session.post(f"{self.api}/api/emails/generate",
json={"name": name, "domain": domain, "expiryTime": 86400000},
timeout=15)
data = r.json()
self._email = data.get("email", data.get("address", ""))
email_id = data.get("id", "")
print(f"[MoeMail] 生成邮箱: {self._email} id={email_id} domain={domain} status={r.status_code}")
if not email_id:
print(f"[MoeMail] 生成失败: {data}")
if email_id:
self._email_count = getattr(self, '_email_count', 0) + 1
return MailboxAccount(email=self._email, account_id=str(email_id))
def get_current_ids(self, account: MailboxAccount) -> set:
try:
r = self._session.get(f"{self.api}/api/emails/{account.account_id}", timeout=10)
return {str(m.get("id", "")) for m in r.json().get("messages", [])}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None,
code_pattern: str = None, **kwargs) -> str:
import re, time
seen = set(before_ids or [])
start = time.time()
pattern = re.compile(code_pattern) if code_pattern else None
while time.time() - start < timeout:
try:
r = self._session.get(f"{self.api}/api/emails/{account.account_id}",
timeout=10)
msgs = r.json().get("messages", [])
for msg in msgs:
mid = str(msg.get("id", ""))
if not mid or mid in seen: continue
seen.add(mid)
body = str(msg.get("content") or msg.get("text") or msg.get("body") or msg.get("html") or "") + " " + str(msg.get("subject") or "")
body = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', body)
code = self._safe_extract(body, code_pattern)
if code: return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")
class LuckMailMailbox(BaseMailbox):
"""LuckMail 付费接码平台 - 通过 SDK 创建订单并获取验证码"""
def __init__(self, base_url: str, api_key: str,
project_code: str = "", email_type: str = ""):
if not base_url or not api_key:
raise RuntimeError(
"LuckMail 未配置:请在全局设置中填写 luckmail_base_url 和 luckmail_api_key"
)
from .luckmail import LuckMailClient
self._client = LuckMailClient(
base_url=base_url,
api_key=api_key,
)
self._project_code = project_code
self._email_type = email_type or None
self._order_no = None
def get_email(self) -> MailboxAccount:
try:
body = {"project_code": self._project_code}
if self._email_type:
body["email_type"] = self._email_type
order = self._client.user._sync_create_order(body)
except Exception as e:
raise RuntimeError(f"LuckMail 创建订单失败: {e}") from e
self._order_no = order.order_no
email = order.email_address
self._log(f"[LuckMail] 订单 {order.order_no} 分配邮箱: {email}")
self._log(f"[LuckMail] 超时时间: {order.expired_at}")
return MailboxAccount(email=email, account_id=order.order_no)
def get_current_ids(self, account: MailboxAccount) -> set:
# LuckMail 由服务端管理邮件,本地无需维护 ID 集合
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None,
code_pattern: str = None, **kwargs) -> str:
order_no = account.account_id or self._order_no
if not order_no:
raise RuntimeError("LuckMail 未创建订单,无法等待验证码")
def on_poll(result):
self._log(f"[LuckMail] 轮询中... 状态: {result.status}")
try:
code_result = self._client.user._sync_wait_for_code(
order_no=order_no,
timeout=timeout,
interval=3.0,
on_poll=on_poll,
)
except Exception as e:
raise TimeoutError(f"LuckMail 等待验证码失败: {e}") from e
if code_result.status == "success" and code_result.verification_code:
code = code_result.verification_code
self._log(f"[LuckMail] 收到验证码: {code}")
return code
raise TimeoutError(
f"LuckMail 等待验证码超时 ({timeout}s),最终状态: {code_result.status}"
)
class FreemailMailbox(BaseMailbox):
"""
Freemail 自建邮箱服务(基于 Cloudflare Worker
项目: https://github.com/idinging/freemail
支持管理员令牌或账号密码两种认证方式
"""
def __init__(self, api_url: str, admin_token: str = "",
username: str = "", password: str = "",
proxy: str = None):
self.api = api_url.rstrip("/")
self.admin_token = admin_token
self.username = username
self.password = password
self.proxy = {"http": proxy, "https": proxy} if proxy else None
self._session = None
self._email = None
def _get_session(self):
import requests
s = requests.Session()
s.proxies = self.proxy
if self.admin_token:
s.headers.update({"Authorization": f"Bearer {self.admin_token}"})
elif self.username and self.password:
s.post(f"{self.api}/api/login",
json={"username": self.username, "password": self.password},
timeout=15)
self._session = s
return s
def get_email(self) -> MailboxAccount:
if not self._session:
self._get_session()
import requests
r = self._session.get(f"{self.api}/api/generate", timeout=15)
data = r.json()
email = data.get("email", "")
self._email = email
print(f"[Freemail] 生成邮箱: {email}")
return MailboxAccount(email=email, account_id=email)
def get_current_ids(self, account: MailboxAccount) -> set:
try:
r = self._session.get(f"{self.api}/api/emails",
params={"mailbox": account.email, "limit": 50}, timeout=10)
return {str(m["id"]) for m in r.json() if "id" in m}
except Exception:
return set()
def wait_for_code(self, account: MailboxAccount, keyword: str = "",
timeout: int = 120, before_ids: set = None, code_pattern: str = None, **kwargs) -> str:
import re, time
seen = set(before_ids or [])
start = time.time()
while time.time() - start < timeout:
try:
r = self._session.get(f"{self.api}/api/emails",
params={"mailbox": account.email, "limit": 20}, timeout=10)
for msg in r.json():
mid = str(msg.get("id", ""))
if not mid or mid in seen: continue
seen.add(mid)
# 直接用 verification_code 字段
code = str(msg.get("verification_code") or "")
if code and code != "None":
return code
# 兜底:从 preview 提取
text = str(msg.get("preview", "")) + " " + str(msg.get("subject", ""))
code = self._safe_extract(text, code_pattern)
if code: return code
except Exception:
pass
time.sleep(3)
raise TimeoutError(f"等待验证码超时 ({timeout}s)")