同步CloudMail与Freemail域名配置支持

This commit is contained in:
cong
2026-04-03 18:40:40 +08:00
parent 31d9a71ec7
commit 6bd52b8e0e
8 changed files with 670 additions and 4 deletions

View File

@@ -130,11 +130,12 @@
| MoeMail | `moemail` | 默认常用方案,自动注册账号并生成邮箱 |
| TempMail.lol | `tempmail_lol` | 临时邮箱方案,部分地区可能需要代理 |
| SkyMail (CloudMail) | `skymail` | 通过 API / Token / 域名使用 |
| CloudMail (genToken) | `cloudmail` | 通过管理员邮箱/口令获取 token直接轮询 `emailList` |
| YYDS Mail / MaliAPI | `maliapi` | 支持域名与自动域名策略 |
| GPTMail | `gptmail` | 基于 GPTMail API 生成临时邮箱并轮询邮件,支持已知域名时本地拼装随机地址 |
| OpenTrashMail | `opentrashmail` | 对接自建 OpenTrashMail 服务,支持 `/api/random` 自动取号,也支持配置域名后本地拼装随机地址 |
| DuckMail | `duckmail` | 临时邮箱方案 |
| Freemail | `freemail` | 自建邮箱服务 |
| Freemail | `freemail` | 自建邮箱服务,支持指定域名生成 |
| Laoudo | `laoudo` | 固定邮箱方案 |
| CF Worker | `cfworker` | Cloudflare Worker 自建邮箱 |

View File

@@ -21,11 +21,18 @@ CONFIG_KEYS = [
"freemail_admin_token",
"freemail_username",
"freemail_password",
"freemail_domain",
"moemail_api_url",
"moemail_api_key",
"skymail_api_base",
"skymail_token",
"skymail_domain",
"cloudmail_api_base",
"cloudmail_admin_email",
"cloudmail_admin_password",
"cloudmail_domain",
"cloudmail_subdomain",
"cloudmail_timeout",
"mail_provider",
"maliapi_base_url",
"maliapi_api_key",

View File

@@ -2,6 +2,7 @@
import json
import random
import threading
import time
from abc import ABC, abstractmethod
@@ -223,6 +224,30 @@ def create_mailbox(
domain=extra.get("skymail_domain", ""),
proxy=proxy,
)
elif provider == "cloudmail":
timeout_raw = extra.get("cloudmail_timeout", extra.get("timeout", 30))
try:
timeout_value = int(timeout_raw)
except (TypeError, ValueError):
timeout_value = 30
return CloudMailMailbox(
api_base=extra.get("cloudmail_api_base")
or extra.get("base_url")
or "",
admin_email=extra.get("cloudmail_admin_email")
or extra.get("admin_email")
or "",
admin_password=extra.get("cloudmail_admin_password")
or extra.get("admin_password")
or extra.get("api_key")
or "",
domain=extra.get("cloudmail_domain") or extra.get("domain") or "",
subdomain=extra.get("cloudmail_subdomain")
or extra.get("subdomain")
or "",
timeout=timeout_value,
proxy=proxy,
)
elif provider == "duckmail":
return DuckMailMailbox(
api_url=(extra.get("duckmail_api_url") or "https://www.duckmail.sbs"),
@@ -240,6 +265,7 @@ def create_mailbox(
admin_token=extra.get("freemail_admin_token", ""),
username=extra.get("freemail_username", ""),
password=extra.get("freemail_password", ""),
domain=extra.get("freemail_domain", ""),
proxy=proxy,
)
elif provider == "moemail":
@@ -721,6 +747,339 @@ class SkyMailMailbox(BaseMailbox):
)
class CloudMailMailbox(BaseMailbox):
"""CloudMail 自建邮箱服务genToken + emailList"""
_token_lock = threading.Lock()
_token_cache: dict[str, tuple[str, float]] = {}
_seen_ids_lock = threading.Lock()
_seen_ids: dict[str, set[str]] = {}
def __init__(
self,
api_base: str,
admin_email: str,
admin_password: str,
domain: Any = "",
subdomain: str = "",
timeout: int = 30,
proxy: str = None,
):
self.api = str(api_base or "").rstrip("/")
self.admin_email = str(admin_email or "").strip()
self.admin_password = str(admin_password or "").strip()
self.domain = domain
self.subdomain = str(subdomain or "").strip()
self.timeout = max(int(timeout or 30), 5)
self.proxy = build_requests_proxy_config(proxy)
@staticmethod
def _extract_domain_from_url(url: str) -> str:
from urllib.parse import urlparse
parsed = urlparse(str(url or ""))
host = (parsed.netloc or parsed.path.split("/")[0] or "").strip()
if ":" in host:
host = host.split(":", 1)[0].strip()
return host
@staticmethod
def _normalize_domain(value: str) -> str:
domain = str(value or "").strip().lstrip("@")
if "://" in domain:
domain = CloudMailMailbox._extract_domain_from_url(domain)
return domain.strip()
def _domain_candidates(self) -> list[str]:
candidates: list[str] = []
if isinstance(self.domain, (list, tuple, set)):
iterable = self.domain
else:
raw = str(self.domain or "").strip()
parsed = None
if raw.startswith("[") and raw.endswith("]"):
try:
parsed = json.loads(raw)
except Exception:
parsed = None
if isinstance(parsed, list):
iterable = parsed
elif raw:
normalized = (
raw.replace(";", "\n")
.replace(",", "\n")
.replace("|", "\n")
.splitlines()
)
iterable = [item for item in normalized if item]
else:
iterable = []
for item in iterable:
normalized = self._normalize_domain(item)
if normalized:
candidates.append(normalized)
if not candidates:
inferred = self._normalize_domain(self._extract_domain_from_url(self.api))
if inferred:
candidates.append(inferred)
return candidates
def _resolve_admin_email(self) -> str:
if self.admin_email:
return self.admin_email
domains = self._domain_candidates()
if domains:
return f"admin@{domains[0]}"
return "admin@example.com"
def _cache_key(self) -> str:
return f"{self.api}|{self._resolve_admin_email()}|{self.admin_password}"
def _ensure_config(self) -> None:
if not self.api or not self.admin_password:
raise RuntimeError(
"CloudMail 未配置完整:请设置 cloudmail_api_base 与 cloudmail_admin_password"
)
def _headers(self, token: str = "") -> dict:
headers = {
"accept": "application/json",
"content-type": "application/json",
}
if token:
headers["authorization"] = token
return headers
def _generate_token(self) -> str:
import requests
self._ensure_config()
payload = {
"email": self._resolve_admin_email(),
"password": self.admin_password,
}
r = requests.post(
f"{self.api}/api/public/genToken",
json=payload,
headers=self._headers(),
proxies=self.proxy,
timeout=self.timeout,
)
if r.status_code != 200:
raise RuntimeError(
f"CloudMail 生成 token 失败: {r.status_code} {str(r.text or '')[:200]}"
)
try:
data = r.json()
except Exception:
data = {}
if data.get("code") != 200:
raise RuntimeError(f"CloudMail 生成 token 失败: {data}")
token = ((data.get("data") or {}).get("token") or "").strip()
if not token:
raise RuntimeError("CloudMail 生成 token 失败: 响应未返回 token")
return token
def _get_token(self, *, force_refresh: bool = False) -> str:
cache_key = self._cache_key()
now = time.time()
with CloudMailMailbox._token_lock:
if not force_refresh:
cached = CloudMailMailbox._token_cache.get(cache_key)
if cached and now < cached[1]:
return cached[0]
token = self._generate_token()
CloudMailMailbox._token_cache[cache_key] = (token, now + 3600)
return token
def _list_mails(self, email: str, *, retry_auth: bool = True) -> list:
import requests
token = self._get_token()
payload = {
"toEmail": email,
"timeSort": "desc",
}
r = requests.post(
f"{self.api}/api/public/emailList",
json=payload,
headers=self._headers(token),
proxies=self.proxy,
timeout=self.timeout,
)
if r.status_code == 401 and retry_auth:
token = self._get_token(force_refresh=True)
r = requests.post(
f"{self.api}/api/public/emailList",
json=payload,
headers=self._headers(token),
proxies=self.proxy,
timeout=self.timeout,
)
if r.status_code != 200:
return []
try:
data = r.json()
except Exception:
data = {}
if data.get("code") != 200:
return []
return data.get("data") or []
def _gen_prefix(self) -> str:
import random
import string
first = random.choice(string.ascii_lowercase)
rest = "".join(random.choices(string.ascii_lowercase + string.digits, k=9))
return first + rest
def _build_email(self) -> str:
domains = self._domain_candidates()
if not domains:
raise RuntimeError("CloudMail 未配置可用域名")
domain = random.choice(domains)
if self.subdomain:
domain = f"{self.subdomain}.{domain}"
return f"{self._gen_prefix()}@{domain}"
@staticmethod
def _parse_message_timestamp(message: dict) -> Optional[float]:
from datetime import datetime
keys = [
"time",
"date",
"created",
"createdAt",
"created_at",
"receivedAt",
"received_at",
"sendTime",
"timestamp",
]
for key in keys:
value = message.get(key)
if value in (None, ""):
continue
if isinstance(value, (int, float)):
numeric = float(value)
return numeric / 1000 if numeric > 10_000_000_000 else numeric
text = str(value).strip()
if not text:
continue
try:
numeric = float(text)
return numeric / 1000 if numeric > 10_000_000_000 else numeric
except (TypeError, ValueError):
pass
try:
return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
except ValueError:
continue
return None
@staticmethod
def _mail_id(message: dict, index: int = 0) -> str:
for key in ("emailId", "id", "mailId", "messageId"):
value = message.get(key)
if value not in (None, ""):
return str(value)
digest = (
str(message.get("date") or message.get("time") or "")
+ "|"
+ str(message.get("subject") or "")
)
return f"idx-{index}-{digest}"
def _remember_seen_id(self, email: str, message_id: str) -> None:
with CloudMailMailbox._seen_ids_lock:
CloudMailMailbox._seen_ids.setdefault(email, set()).add(message_id)
def _load_seen_ids(self, email: str) -> set[str]:
with CloudMailMailbox._seen_ids_lock:
return set(CloudMailMailbox._seen_ids.get(email, set()))
def get_email(self) -> MailboxAccount:
self._ensure_config()
email = self._build_email()
self._log(f"[CloudMail] 生成邮箱: {email}")
return MailboxAccount(email=email, account_id=email)
def get_current_ids(self, account: MailboxAccount) -> set:
target = account.account_id or account.email
try:
mails = self._list_mails(target)
return {self._mail_id(msg, idx) for idx, msg in enumerate(mails)}
except Exception:
return set()
def wait_for_code(
self,
account: MailboxAccount,
keyword: str = "",
timeout: int = 120,
before_ids: set = None,
code_pattern: str = None,
**kwargs,
) -> str:
target = account.account_id or account.email
seen = set(before_ids or set())
seen.update(self._load_seen_ids(target))
otp_sent_at = kwargs.get("otp_sent_at")
exclude_codes = {
str(code).strip()
for code in (kwargs.get("exclude_codes") or set())
if str(code or "").strip()
}
def poll_once() -> Optional[str]:
try:
mails = self._list_mails(target)
for idx, msg in enumerate(mails):
mid = self._mail_id(msg, idx)
if mid in seen:
continue
seen.add(mid)
self._remember_seen_id(target, mid)
msg_ts = self._parse_message_timestamp(msg)
if otp_sent_at and msg_ts and msg_ts < float(otp_sent_at):
continue
content = " ".join(
[
str(msg.get("subject") or ""),
str(msg.get("content") or ""),
str(msg.get("text") or ""),
str(msg.get("html") or ""),
]
)
if keyword and keyword.lower() not in content.lower():
continue
code = self._safe_extract(content, code_pattern)
if code and code in exclude_codes:
continue
if code:
self._log(f"[CloudMail] 命中验证码: {code}")
return code
except Exception:
pass
return None
return self._run_polling_wait(
timeout=timeout,
poll_interval=3,
poll_once=poll_once,
)
class DuckMailMailbox(BaseMailbox):
"""DuckMail 自动生成邮箱(随机创建账号)"""
@@ -2376,15 +2735,18 @@ class FreemailMailbox(BaseMailbox):
admin_token: str = "",
username: str = "",
password: str = "",
domain: str = "",
proxy: str = None,
):
self.api = api_url.rstrip("/")
self.admin_token = admin_token
self.username = username
self.password = password
self.domain = str(domain or "").strip().lstrip("@")
self.proxy = build_requests_proxy_config(proxy)
self._session = None
self._email = None
self._domains = None
def _get_session(self):
import requests
@@ -2405,15 +2767,74 @@ class FreemailMailbox(BaseMailbox):
def get_email(self) -> MailboxAccount:
if not self._session:
self._get_session()
import requests
r = self._session.get(f"{self.api}/api/generate", timeout=15)
target_domain = self.domain
domain_index = 0
if target_domain:
domains = self._ensure_domains()
if domains:
lookup = str(target_domain).lower()
for idx, domain in enumerate(domains):
if str(domain or "").strip().lower() == lookup:
domain_index = idx
break
params = {"domainIndex": domain_index} if target_domain else {}
r = self._session.get(f"{self.api}/api/generate", params=params, timeout=15)
data = r.json()
email = data.get("email", "")
email = str(data.get("email", "") or "")
if target_domain and email and "@" in email:
actual_domain = email.split("@", 1)[1].strip().lower()
if actual_domain != target_domain.lower():
self._log(
f"[Freemail] 指定域名 {target_domain} 未命中,实际返回 {actual_domain}"
)
self._email = email
print(f"[Freemail] 生成邮箱: {email}")
return MailboxAccount(email=email, account_id=email)
def _ensure_domains(self) -> list:
if self._domains is not None:
return self._domains
self._domains = []
if not self._session:
self._get_session()
try:
r = self._session.get(f"{self.api}/api/domains", timeout=15)
payload = r.json()
normalized = []
def _append_domain(value):
domain = str(value or "").strip().lstrip("@")
if domain and domain not in normalized:
normalized.append(domain)
if isinstance(payload, list):
for item in payload:
if isinstance(item, dict):
_append_domain(
item.get("domain")
or item.get("name")
or item.get("value")
)
else:
_append_domain(item)
elif isinstance(payload, dict):
candidates = payload.get("domains") or payload.get("data") or []
if isinstance(candidates, list):
for item in candidates:
if isinstance(item, dict):
_append_domain(
item.get("domain")
or item.get("name")
or item.get("value")
)
else:
_append_domain(item)
self._domains = normalized
except Exception:
self._domains = []
return self._domains
def get_current_ids(self, account: MailboxAccount) -> set:
try:
r = self._session.get(

View File

@@ -645,6 +645,12 @@ export default function Accounts() {
skymail_api_base: cfg.skymail_api_base,
skymail_token: cfg.skymail_token,
skymail_domain: cfg.skymail_domain,
cloudmail_api_base: cfg.cloudmail_api_base,
cloudmail_admin_email: cfg.cloudmail_admin_email,
cloudmail_admin_password: cfg.cloudmail_admin_password,
cloudmail_domain: cfg.cloudmail_domain,
cloudmail_subdomain: cfg.cloudmail_subdomain,
cloudmail_timeout: cfg.cloudmail_timeout,
duckmail_address: cfg.duckmail_address,
duckmail_password: cfg.duckmail_password,
duckmail_api_url: cfg.duckmail_api_url,
@@ -654,6 +660,7 @@ export default function Accounts() {
freemail_admin_token: cfg.freemail_admin_token,
freemail_username: cfg.freemail_username,
freemail_password: cfg.freemail_password,
freemail_domain: cfg.freemail_domain,
cfworker_api_url: cfg.cfworker_api_url,
cfworker_admin_token: cfg.cfworker_admin_token,
cfworker_custom_auth: cfg.cfworker_custom_auth,

View File

@@ -48,6 +48,12 @@ export default function RegisterTaskPage() {
skymail_api_base: cfg.skymail_api_base || 'https://api.skymail.ink',
skymail_token: cfg.skymail_token || '',
skymail_domain: cfg.skymail_domain || '',
cloudmail_api_base: cfg.cloudmail_api_base || '',
cloudmail_admin_email: cfg.cloudmail_admin_email || '',
cloudmail_admin_password: cfg.cloudmail_admin_password || '',
cloudmail_domain: cfg.cloudmail_domain || '',
cloudmail_subdomain: cfg.cloudmail_subdomain || '',
cloudmail_timeout: cfg.cloudmail_timeout || 30,
laoudo_auth: cfg.laoudo_auth || '',
laoudo_email: cfg.laoudo_email || '',
laoudo_account_id: cfg.laoudo_account_id || '',
@@ -68,6 +74,7 @@ export default function RegisterTaskPage() {
freemail_admin_token: cfg.freemail_admin_token || '',
freemail_username: cfg.freemail_username || '',
freemail_password: cfg.freemail_password || '',
freemail_domain: cfg.freemail_domain || '',
cfworker_api_url: cfg.cfworker_api_url || '',
cfworker_admin_token: cfg.cfworker_admin_token || '',
cfworker_custom_auth: cfg.cfworker_custom_auth || '',
@@ -111,6 +118,12 @@ export default function RegisterTaskPage() {
skymail_api_base: values.skymail_api_base,
skymail_token: values.skymail_token,
skymail_domain: values.skymail_domain,
cloudmail_api_base: values.cloudmail_api_base,
cloudmail_admin_email: values.cloudmail_admin_email,
cloudmail_admin_password: values.cloudmail_admin_password,
cloudmail_domain: values.cloudmail_domain,
cloudmail_subdomain: values.cloudmail_subdomain,
cloudmail_timeout: values.cloudmail_timeout,
duckmail_api_url: values.duckmail_api_url,
duckmail_provider_url: values.duckmail_provider_url,
duckmail_bearer: values.duckmail_bearer,
@@ -118,6 +131,7 @@ export default function RegisterTaskPage() {
freemail_admin_token: values.freemail_admin_token,
freemail_username: values.freemail_username,
freemail_password: values.freemail_password,
freemail_domain: values.freemail_domain,
cfworker_api_url: values.cfworker_api_url,
cfworker_admin_token: values.cfworker_admin_token,
cfworker_custom_auth: values.cfworker_custom_auth,
@@ -207,6 +221,7 @@ export default function RegisterTaskPage() {
captcha_solver: 'yescaptcha',
mail_provider: 'luckmail',
gptmail_base_url: 'https://mail.chatgpt.org.uk',
cloudmail_timeout: 30,
count: 1,
concurrency: 1,
register_delay_seconds: 0,
@@ -274,6 +289,7 @@ export default function RegisterTaskPage() {
{ value: 'moemail', label: 'MoeMail (sall.cc)' },
{ value: 'tempmail_lol', label: 'TempMail.lol' },
{ value: 'skymail', label: 'SkyMail (CloudMail)' },
{ value: 'cloudmail', label: 'CloudMail (genToken)' },
{ value: 'maliapi', label: 'YYDS Mail / MaliAPI' },
{ value: 'gptmail', label: 'GPTMail' },
{ value: 'opentrashmail', label: 'OpenTrashMail' },
@@ -297,6 +313,28 @@ export default function RegisterTaskPage() {
</Form.Item>
</>
)}
{mailProvider === 'cloudmail' && (
<>
<Form.Item name="cloudmail_api_base" label="API Base" rules={[{ required: true, message: '请输入 CloudMail API 地址' }]}>
<Input placeholder="https://cloudmail.example.com" />
</Form.Item>
<Form.Item name="cloudmail_admin_email" label="管理员邮箱(可选)" extra="留空自动使用 admin@域名">
<Input placeholder="admin@example.com" />
</Form.Item>
<Form.Item name="cloudmail_admin_password" label="管理员密码" rules={[{ required: true, message: '请输入 CloudMail 管理员密码' }]}>
<Input.Password placeholder="admin password" />
</Form.Item>
<Form.Item name="cloudmail_domain" label="邮箱域名(可选)" extra="支持单个域名,或逗号分隔多个域名">
<Input placeholder="mail.example.com,mail2.example.com" />
</Form.Item>
<Form.Item name="cloudmail_subdomain" label="子域名(可选)">
<Input placeholder="pool-a" />
</Form.Item>
<Form.Item name="cloudmail_timeout" label="请求超时秒数">
<InputNumber min={5} max={120} style={{ width: '100%' }} />
</Form.Item>
</>
)}
{mailProvider === 'laoudo' && (
<>
<Form.Item name="laoudo_email" label="邮箱地址">
@@ -403,6 +441,25 @@ export default function RegisterTaskPage() {
</Form.Item>
</>
)}
{mailProvider === 'freemail' && (
<>
<Form.Item name="freemail_api_url" label="API URL" rules={[{ required: true, message: '请输入 Freemail API 地址' }]}>
<Input placeholder="https://mail.example.com" />
</Form.Item>
<Form.Item name="freemail_admin_token" label="管理员令牌(可选)">
<Input.Password placeholder="JWT_TOKEN" />
</Form.Item>
<Form.Item name="freemail_username" label="用户名(可选)">
<Input placeholder="admin" />
</Form.Item>
<Form.Item name="freemail_password" label="密码(可选)">
<Input.Password placeholder="password" />
</Form.Item>
<Form.Item name="freemail_domain" label="邮箱域名(可选)" extra="填写后会优先使用该域名生成邮箱">
<Input placeholder="example.com" />
</Form.Item>
</>
)}
{mailProvider === 'luckmail' && (
<>
<Form.Item name="luckmail_base_url" label="平台地址">

View File

@@ -22,6 +22,7 @@ const SELECT_FIELDS: Record<string, { label: string; value: string }[]> = {
{ label: 'Laoudo固定邮箱', value: 'laoudo' },
{ label: 'TempMail.lol自动生成', value: 'tempmail_lol' },
{ label: 'SkyMailCloudMail 接口)', value: 'skymail' },
{ label: 'CloudMailgenToken 口令模式)', value: 'cloudmail' },
{ label: 'DuckMail自动生成', value: 'duckmail' },
{ label: 'MoeMail (sall.cc)', value: 'moemail' },
{ label: 'YYDS Mail / MaliAPI', value: 'maliapi' },
@@ -95,6 +96,7 @@ const TAB_ITEMS = [
{ key: 'freemail_admin_token', label: '管理员令牌', secret: true },
{ key: 'freemail_username', label: '用户名(可选)' },
{ key: 'freemail_password', label: '密码(可选)', secret: true },
{ key: 'freemail_domain', label: '邮箱域名(可选)', placeholder: 'example.com' },
],
},
{
@@ -114,6 +116,18 @@ const TAB_ITEMS = [
{ key: 'skymail_domain', label: '邮箱域名', placeholder: 'mail.example.com' },
],
},
{
title: 'CloudMail',
desc: 'CloudMail 口令模式genToken + emailList',
fields: [
{ key: 'cloudmail_api_base', label: 'API Base', placeholder: 'https://cloudmail.example.com' },
{ key: 'cloudmail_admin_email', label: '管理员邮箱(可选)', placeholder: 'admin@example.com' },
{ key: 'cloudmail_admin_password', label: '管理员密码', secret: true },
{ key: 'cloudmail_domain', label: '邮箱域名(可选)', placeholder: 'mail.example.com,mail2.example.com' },
{ key: 'cloudmail_subdomain', label: '子域名(可选)', placeholder: 'pool-a' },
{ key: 'cloudmail_timeout', label: '请求超时秒数', placeholder: '30' },
],
},
{
title: 'YYDS Mail / MaliAPI',
desc: '基于 API Key 创建临时邮箱并轮询收件箱消息',
@@ -1071,6 +1085,9 @@ export default function Settings() {
if (!data.luckmail_base_url) {
data.luckmail_base_url = 'https://mails.luckyous.com/'
}
if (!data.cloudmail_timeout) {
data.cloudmail_timeout = 30
}
data.cfworker_domains = parseStoredDomainList(data.cfworker_domains)
data.cfworker_enabled_domains = parseStoredDomainList(data.cfworker_enabled_domains)
data.cfworker_random_subdomain = parseBooleanConfigValue(data.cfworker_random_subdomain)

View File

@@ -0,0 +1,98 @@
import unittest
from unittest import mock
from core.base_mailbox import CloudMailMailbox, MailboxAccount, create_mailbox
class CloudMailMailboxTests(unittest.TestCase):
def setUp(self):
CloudMailMailbox._token_cache.clear()
CloudMailMailbox._seen_ids.clear()
def test_get_email_uses_configured_domain(self):
mailbox = create_mailbox(
"cloudmail",
extra={
"cloudmail_api_base": "https://cloudmail.example.com",
"cloudmail_admin_password": "secret",
"cloudmail_domain": "mail.example.com",
},
)
account = mailbox.get_email()
self.assertTrue(account.email.endswith("@mail.example.com"))
self.assertEqual(account.account_id, account.email)
def test_get_email_supports_legacy_field_names(self):
mailbox = create_mailbox(
"cloudmail",
extra={
"base_url": "https://cloudmail.example.com",
"admin_password": "secret",
"domain": "mail.example.com",
"subdomain": "pool-a",
},
)
account = mailbox.get_email()
self.assertTrue(account.email.endswith("@pool-a.mail.example.com"))
self.assertEqual(account.account_id, account.email)
@mock.patch("requests.post")
def test_wait_for_code_retries_after_auth_failure(self, mock_post):
mock_post.side_effect = [
_json_response({"code": 200, "data": {"token": "tok-1"}}),
_text_response(401, "unauthorized"),
_json_response({"code": 200, "data": {"token": "tok-2"}}),
_json_response(
{
"code": 200,
"data": [
{
"emailId": "m-1",
"toEmail": "demo@example.com",
"subject": "Your verification code is 654321",
"content": "",
}
],
}
),
]
mailbox = create_mailbox(
"cloudmail",
extra={
"cloudmail_api_base": "https://cloudmail.example.com",
"cloudmail_admin_email": "admin@example.com",
"cloudmail_admin_password": "secret",
"cloudmail_domain": "mail.example.com",
},
)
account = MailboxAccount(email="demo@example.com", account_id="demo@example.com")
code = mailbox.wait_for_code(account, timeout=5)
self.assertEqual(code, "654321")
self.assertEqual(mock_post.call_count, 4)
def _json_response(payload: dict, status_code: int = 200):
response = mock.Mock()
response.status_code = status_code
response.text = str(payload)
response.json.return_value = payload
return response
def _text_response(status_code: int, text: str):
response = mock.Mock()
response.status_code = status_code
response.text = text
response.json.side_effect = ValueError("not json")
return response
if __name__ == "__main__":
unittest.main()

View File

@@ -65,6 +65,64 @@ class FreemailMailboxTests(unittest.TestCase):
self.assertEqual(code, "222222")
self.assertEqual(mailbox._session.get.call_count, 2)
def test_get_email_prefers_configured_domain_index(self):
mailbox = create_mailbox(
"freemail",
extra={
"freemail_api_url": "https://freemail.example",
"freemail_domain": "target.example",
},
)
mailbox._session = mock.Mock()
mailbox._session.get.side_effect = [
_response(["fallback.example", "target.example"]),
_response({"email": "demo@target.example"}),
]
account = mailbox.get_email()
self.assertEqual(account.email, "demo@target.example")
self.assertEqual(mailbox._session.get.call_count, 2)
_, kwargs = mailbox._session.get.call_args
self.assertEqual(kwargs.get("params"), {"domainIndex": 1})
def test_get_email_without_domain_does_not_pass_domain_index(self):
mailbox = create_mailbox(
"freemail",
extra={
"freemail_api_url": "https://freemail.example",
},
)
mailbox._session = mock.Mock()
mailbox._session.get.return_value = _response({"email": "demo@random.example"})
account = mailbox.get_email()
self.assertEqual(account.email, "demo@random.example")
self.assertEqual(mailbox._session.get.call_count, 1)
_, kwargs = mailbox._session.get.call_args
self.assertEqual(kwargs.get("params"), {})
def test_get_email_domain_list_supports_object_items(self):
mailbox = create_mailbox(
"freemail",
extra={
"freemail_api_url": "https://freemail.example",
"freemail_domain": "target.example",
},
)
mailbox._session = mock.Mock()
mailbox._session.get.side_effect = [
_response({"domains": [{"domain": "fallback.example"}, {"domain": "target.example"}]}),
_response({"email": "demo@target.example"}),
]
account = mailbox.get_email()
self.assertEqual(account.email, "demo@target.example")
_, kwargs = mailbox._session.get.call_args
self.assertEqual(kwargs.get("params"), {"domainIndex": 1})
def _response(payload):
response = mock.Mock()