diff --git a/README.md b/README.md index 09c8ebe..1a9c238 100644 --- a/README.md +++ b/README.md @@ -130,11 +130,12 @@ | MoeMail | `moemail` | 默认常用方案,自动注册账号并生成邮箱 | | TempMail.lol | `tempmail_lol` | 临时邮箱方案,部分地区可能需要代理 | | SkyMail (CloudMail) | `skymail` | 通过 API / Token / 域名使用 | +| CloudMail (genToken) | `cloudmail` | 通过管理员邮箱/口令获取 token,直接轮询 `emailList` | | YYDS Mail / MaliAPI | `maliapi` | 支持域名与自动域名策略 | | GPTMail | `gptmail` | 基于 GPTMail API 生成临时邮箱并轮询邮件,支持已知域名时本地拼装随机地址 | | OpenTrashMail | `opentrashmail` | 对接自建 OpenTrashMail 服务,支持 `/api/random` 自动取号,也支持配置域名后本地拼装随机地址 | | DuckMail | `duckmail` | 临时邮箱方案 | -| Freemail | `freemail` | 自建邮箱服务 | +| Freemail | `freemail` | 自建邮箱服务,支持指定域名生成 | | Laoudo | `laoudo` | 固定邮箱方案 | | CF Worker | `cfworker` | Cloudflare Worker 自建邮箱 | diff --git a/api/config.py b/api/config.py index 7f4b2c2..59d7de3 100644 --- a/api/config.py +++ b/api/config.py @@ -21,11 +21,18 @@ CONFIG_KEYS = [ "freemail_admin_token", "freemail_username", "freemail_password", + "freemail_domain", "moemail_api_url", "moemail_api_key", "skymail_api_base", "skymail_token", "skymail_domain", + "cloudmail_api_base", + "cloudmail_admin_email", + "cloudmail_admin_password", + "cloudmail_domain", + "cloudmail_subdomain", + "cloudmail_timeout", "mail_provider", "maliapi_base_url", "maliapi_api_key", diff --git a/core/base_mailbox.py b/core/base_mailbox.py index 8823227..742a773 100644 --- a/core/base_mailbox.py +++ b/core/base_mailbox.py @@ -2,6 +2,7 @@ import json import random +import threading import time from abc import ABC, abstractmethod @@ -223,6 +224,30 @@ def create_mailbox( domain=extra.get("skymail_domain", ""), proxy=proxy, ) + elif provider == "cloudmail": + timeout_raw = extra.get("cloudmail_timeout", extra.get("timeout", 30)) + try: + timeout_value = int(timeout_raw) + except (TypeError, ValueError): + timeout_value = 30 + return CloudMailMailbox( + api_base=extra.get("cloudmail_api_base") + or extra.get("base_url") + or "", + admin_email=extra.get("cloudmail_admin_email") + or extra.get("admin_email") + or "", + admin_password=extra.get("cloudmail_admin_password") + or extra.get("admin_password") + or extra.get("api_key") + or "", + domain=extra.get("cloudmail_domain") or extra.get("domain") or "", + subdomain=extra.get("cloudmail_subdomain") + or extra.get("subdomain") + or "", + timeout=timeout_value, + proxy=proxy, + ) elif provider == "duckmail": return DuckMailMailbox( api_url=(extra.get("duckmail_api_url") or "https://www.duckmail.sbs"), @@ -240,6 +265,7 @@ def create_mailbox( admin_token=extra.get("freemail_admin_token", ""), username=extra.get("freemail_username", ""), password=extra.get("freemail_password", ""), + domain=extra.get("freemail_domain", ""), proxy=proxy, ) elif provider == "moemail": @@ -721,6 +747,339 @@ class SkyMailMailbox(BaseMailbox): ) +class CloudMailMailbox(BaseMailbox): + """CloudMail 自建邮箱服务(genToken + emailList)""" + + _token_lock = threading.Lock() + _token_cache: dict[str, tuple[str, float]] = {} + _seen_ids_lock = threading.Lock() + _seen_ids: dict[str, set[str]] = {} + + def __init__( + self, + api_base: str, + admin_email: str, + admin_password: str, + domain: Any = "", + subdomain: str = "", + timeout: int = 30, + proxy: str = None, + ): + self.api = str(api_base or "").rstrip("/") + self.admin_email = str(admin_email or "").strip() + self.admin_password = str(admin_password or "").strip() + self.domain = domain + self.subdomain = str(subdomain or "").strip() + self.timeout = max(int(timeout or 30), 5) + self.proxy = build_requests_proxy_config(proxy) + + @staticmethod + def _extract_domain_from_url(url: str) -> str: + from urllib.parse import urlparse + + parsed = urlparse(str(url or "")) + host = (parsed.netloc or parsed.path.split("/")[0] or "").strip() + if ":" in host: + host = host.split(":", 1)[0].strip() + return host + + @staticmethod + def _normalize_domain(value: str) -> str: + domain = str(value or "").strip().lstrip("@") + if "://" in domain: + domain = CloudMailMailbox._extract_domain_from_url(domain) + return domain.strip() + + def _domain_candidates(self) -> list[str]: + candidates: list[str] = [] + + if isinstance(self.domain, (list, tuple, set)): + iterable = self.domain + else: + raw = str(self.domain or "").strip() + parsed = None + if raw.startswith("[") and raw.endswith("]"): + try: + parsed = json.loads(raw) + except Exception: + parsed = None + if isinstance(parsed, list): + iterable = parsed + elif raw: + normalized = ( + raw.replace(";", "\n") + .replace(",", "\n") + .replace("|", "\n") + .splitlines() + ) + iterable = [item for item in normalized if item] + else: + iterable = [] + + for item in iterable: + normalized = self._normalize_domain(item) + if normalized: + candidates.append(normalized) + + if not candidates: + inferred = self._normalize_domain(self._extract_domain_from_url(self.api)) + if inferred: + candidates.append(inferred) + return candidates + + def _resolve_admin_email(self) -> str: + if self.admin_email: + return self.admin_email + domains = self._domain_candidates() + if domains: + return f"admin@{domains[0]}" + return "admin@example.com" + + def _cache_key(self) -> str: + return f"{self.api}|{self._resolve_admin_email()}|{self.admin_password}" + + def _ensure_config(self) -> None: + if not self.api or not self.admin_password: + raise RuntimeError( + "CloudMail 未配置完整:请设置 cloudmail_api_base 与 cloudmail_admin_password" + ) + + def _headers(self, token: str = "") -> dict: + headers = { + "accept": "application/json", + "content-type": "application/json", + } + if token: + headers["authorization"] = token + return headers + + def _generate_token(self) -> str: + import requests + + self._ensure_config() + payload = { + "email": self._resolve_admin_email(), + "password": self.admin_password, + } + r = requests.post( + f"{self.api}/api/public/genToken", + json=payload, + headers=self._headers(), + proxies=self.proxy, + timeout=self.timeout, + ) + if r.status_code != 200: + raise RuntimeError( + f"CloudMail 生成 token 失败: {r.status_code} {str(r.text or '')[:200]}" + ) + + try: + data = r.json() + except Exception: + data = {} + if data.get("code") != 200: + raise RuntimeError(f"CloudMail 生成 token 失败: {data}") + token = ((data.get("data") or {}).get("token") or "").strip() + if not token: + raise RuntimeError("CloudMail 生成 token 失败: 响应未返回 token") + return token + + def _get_token(self, *, force_refresh: bool = False) -> str: + cache_key = self._cache_key() + now = time.time() + with CloudMailMailbox._token_lock: + if not force_refresh: + cached = CloudMailMailbox._token_cache.get(cache_key) + if cached and now < cached[1]: + return cached[0] + + token = self._generate_token() + CloudMailMailbox._token_cache[cache_key] = (token, now + 3600) + return token + + def _list_mails(self, email: str, *, retry_auth: bool = True) -> list: + import requests + + token = self._get_token() + payload = { + "toEmail": email, + "timeSort": "desc", + } + r = requests.post( + f"{self.api}/api/public/emailList", + json=payload, + headers=self._headers(token), + proxies=self.proxy, + timeout=self.timeout, + ) + if r.status_code == 401 and retry_auth: + token = self._get_token(force_refresh=True) + r = requests.post( + f"{self.api}/api/public/emailList", + json=payload, + headers=self._headers(token), + proxies=self.proxy, + timeout=self.timeout, + ) + if r.status_code != 200: + return [] + + try: + data = r.json() + except Exception: + data = {} + if data.get("code") != 200: + return [] + return data.get("data") or [] + + def _gen_prefix(self) -> str: + import random + import string + + first = random.choice(string.ascii_lowercase) + rest = "".join(random.choices(string.ascii_lowercase + string.digits, k=9)) + return first + rest + + def _build_email(self) -> str: + domains = self._domain_candidates() + if not domains: + raise RuntimeError("CloudMail 未配置可用域名") + domain = random.choice(domains) + if self.subdomain: + domain = f"{self.subdomain}.{domain}" + return f"{self._gen_prefix()}@{domain}" + + @staticmethod + def _parse_message_timestamp(message: dict) -> Optional[float]: + from datetime import datetime + + keys = [ + "time", + "date", + "created", + "createdAt", + "created_at", + "receivedAt", + "received_at", + "sendTime", + "timestamp", + ] + for key in keys: + value = message.get(key) + if value in (None, ""): + continue + if isinstance(value, (int, float)): + numeric = float(value) + return numeric / 1000 if numeric > 10_000_000_000 else numeric + text = str(value).strip() + if not text: + continue + try: + numeric = float(text) + return numeric / 1000 if numeric > 10_000_000_000 else numeric + except (TypeError, ValueError): + pass + try: + return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp() + except ValueError: + continue + return None + + @staticmethod + def _mail_id(message: dict, index: int = 0) -> str: + for key in ("emailId", "id", "mailId", "messageId"): + value = message.get(key) + if value not in (None, ""): + return str(value) + digest = ( + str(message.get("date") or message.get("time") or "") + + "|" + + str(message.get("subject") or "") + ) + return f"idx-{index}-{digest}" + + def _remember_seen_id(self, email: str, message_id: str) -> None: + with CloudMailMailbox._seen_ids_lock: + CloudMailMailbox._seen_ids.setdefault(email, set()).add(message_id) + + def _load_seen_ids(self, email: str) -> set[str]: + with CloudMailMailbox._seen_ids_lock: + return set(CloudMailMailbox._seen_ids.get(email, set())) + + def get_email(self) -> MailboxAccount: + self._ensure_config() + email = self._build_email() + self._log(f"[CloudMail] 生成邮箱: {email}") + return MailboxAccount(email=email, account_id=email) + + def get_current_ids(self, account: MailboxAccount) -> set: + target = account.account_id or account.email + try: + mails = self._list_mails(target) + return {self._mail_id(msg, idx) for idx, msg in enumerate(mails)} + except Exception: + return set() + + def wait_for_code( + self, + account: MailboxAccount, + keyword: str = "", + timeout: int = 120, + before_ids: set = None, + code_pattern: str = None, + **kwargs, + ) -> str: + target = account.account_id or account.email + seen = set(before_ids or set()) + seen.update(self._load_seen_ids(target)) + otp_sent_at = kwargs.get("otp_sent_at") + exclude_codes = { + str(code).strip() + for code in (kwargs.get("exclude_codes") or set()) + if str(code or "").strip() + } + + def poll_once() -> Optional[str]: + try: + mails = self._list_mails(target) + for idx, msg in enumerate(mails): + mid = self._mail_id(msg, idx) + if mid in seen: + continue + seen.add(mid) + self._remember_seen_id(target, mid) + + msg_ts = self._parse_message_timestamp(msg) + if otp_sent_at and msg_ts and msg_ts < float(otp_sent_at): + continue + + content = " ".join( + [ + str(msg.get("subject") or ""), + str(msg.get("content") or ""), + str(msg.get("text") or ""), + str(msg.get("html") or ""), + ] + ) + if keyword and keyword.lower() not in content.lower(): + continue + code = self._safe_extract(content, code_pattern) + if code and code in exclude_codes: + continue + if code: + self._log(f"[CloudMail] 命中验证码: {code}") + return code + except Exception: + pass + return None + + return self._run_polling_wait( + timeout=timeout, + poll_interval=3, + poll_once=poll_once, + ) + + class DuckMailMailbox(BaseMailbox): """DuckMail 自动生成邮箱(随机创建账号)""" @@ -2376,15 +2735,18 @@ class FreemailMailbox(BaseMailbox): admin_token: str = "", username: str = "", password: str = "", + domain: str = "", proxy: str = None, ): self.api = api_url.rstrip("/") self.admin_token = admin_token self.username = username self.password = password + self.domain = str(domain or "").strip().lstrip("@") self.proxy = build_requests_proxy_config(proxy) self._session = None self._email = None + self._domains = None def _get_session(self): import requests @@ -2405,15 +2767,74 @@ class FreemailMailbox(BaseMailbox): def get_email(self) -> MailboxAccount: if not self._session: self._get_session() - import requests - r = self._session.get(f"{self.api}/api/generate", timeout=15) + target_domain = self.domain + domain_index = 0 + if target_domain: + domains = self._ensure_domains() + if domains: + lookup = str(target_domain).lower() + for idx, domain in enumerate(domains): + if str(domain or "").strip().lower() == lookup: + domain_index = idx + break + + params = {"domainIndex": domain_index} if target_domain else {} + r = self._session.get(f"{self.api}/api/generate", params=params, timeout=15) data = r.json() - email = data.get("email", "") + email = str(data.get("email", "") or "") + if target_domain and email and "@" in email: + actual_domain = email.split("@", 1)[1].strip().lower() + if actual_domain != target_domain.lower(): + self._log( + f"[Freemail] 指定域名 {target_domain} 未命中,实际返回 {actual_domain}" + ) + self._email = email print(f"[Freemail] 生成邮箱: {email}") return MailboxAccount(email=email, account_id=email) + def _ensure_domains(self) -> list: + if self._domains is not None: + return self._domains + self._domains = [] + if not self._session: + self._get_session() + try: + r = self._session.get(f"{self.api}/api/domains", timeout=15) + payload = r.json() + normalized = [] + def _append_domain(value): + domain = str(value or "").strip().lstrip("@") + if domain and domain not in normalized: + normalized.append(domain) + if isinstance(payload, list): + for item in payload: + if isinstance(item, dict): + _append_domain( + item.get("domain") + or item.get("name") + or item.get("value") + ) + else: + _append_domain(item) + elif isinstance(payload, dict): + candidates = payload.get("domains") or payload.get("data") or [] + if isinstance(candidates, list): + for item in candidates: + if isinstance(item, dict): + _append_domain( + item.get("domain") + or item.get("name") + or item.get("value") + ) + else: + _append_domain(item) + self._domains = normalized + except Exception: + self._domains = [] + return self._domains + def get_current_ids(self, account: MailboxAccount) -> set: try: r = self._session.get( diff --git a/frontend/src/pages/Accounts.tsx b/frontend/src/pages/Accounts.tsx index e021f70..40a1abe 100644 --- a/frontend/src/pages/Accounts.tsx +++ b/frontend/src/pages/Accounts.tsx @@ -645,6 +645,12 @@ export default function Accounts() { skymail_api_base: cfg.skymail_api_base, skymail_token: cfg.skymail_token, skymail_domain: cfg.skymail_domain, + cloudmail_api_base: cfg.cloudmail_api_base, + cloudmail_admin_email: cfg.cloudmail_admin_email, + cloudmail_admin_password: cfg.cloudmail_admin_password, + cloudmail_domain: cfg.cloudmail_domain, + cloudmail_subdomain: cfg.cloudmail_subdomain, + cloudmail_timeout: cfg.cloudmail_timeout, duckmail_address: cfg.duckmail_address, duckmail_password: cfg.duckmail_password, duckmail_api_url: cfg.duckmail_api_url, @@ -654,6 +660,7 @@ export default function Accounts() { freemail_admin_token: cfg.freemail_admin_token, freemail_username: cfg.freemail_username, freemail_password: cfg.freemail_password, + freemail_domain: cfg.freemail_domain, cfworker_api_url: cfg.cfworker_api_url, cfworker_admin_token: cfg.cfworker_admin_token, cfworker_custom_auth: cfg.cfworker_custom_auth, diff --git a/frontend/src/pages/RegisterTaskPage.tsx b/frontend/src/pages/RegisterTaskPage.tsx index adece51..c631b57 100644 --- a/frontend/src/pages/RegisterTaskPage.tsx +++ b/frontend/src/pages/RegisterTaskPage.tsx @@ -48,6 +48,12 @@ export default function RegisterTaskPage() { skymail_api_base: cfg.skymail_api_base || 'https://api.skymail.ink', skymail_token: cfg.skymail_token || '', skymail_domain: cfg.skymail_domain || '', + cloudmail_api_base: cfg.cloudmail_api_base || '', + cloudmail_admin_email: cfg.cloudmail_admin_email || '', + cloudmail_admin_password: cfg.cloudmail_admin_password || '', + cloudmail_domain: cfg.cloudmail_domain || '', + cloudmail_subdomain: cfg.cloudmail_subdomain || '', + cloudmail_timeout: cfg.cloudmail_timeout || 30, laoudo_auth: cfg.laoudo_auth || '', laoudo_email: cfg.laoudo_email || '', laoudo_account_id: cfg.laoudo_account_id || '', @@ -68,6 +74,7 @@ export default function RegisterTaskPage() { freemail_admin_token: cfg.freemail_admin_token || '', freemail_username: cfg.freemail_username || '', freemail_password: cfg.freemail_password || '', + freemail_domain: cfg.freemail_domain || '', cfworker_api_url: cfg.cfworker_api_url || '', cfworker_admin_token: cfg.cfworker_admin_token || '', cfworker_custom_auth: cfg.cfworker_custom_auth || '', @@ -111,6 +118,12 @@ export default function RegisterTaskPage() { skymail_api_base: values.skymail_api_base, skymail_token: values.skymail_token, skymail_domain: values.skymail_domain, + cloudmail_api_base: values.cloudmail_api_base, + cloudmail_admin_email: values.cloudmail_admin_email, + cloudmail_admin_password: values.cloudmail_admin_password, + cloudmail_domain: values.cloudmail_domain, + cloudmail_subdomain: values.cloudmail_subdomain, + cloudmail_timeout: values.cloudmail_timeout, duckmail_api_url: values.duckmail_api_url, duckmail_provider_url: values.duckmail_provider_url, duckmail_bearer: values.duckmail_bearer, @@ -118,6 +131,7 @@ export default function RegisterTaskPage() { freemail_admin_token: values.freemail_admin_token, freemail_username: values.freemail_username, freemail_password: values.freemail_password, + freemail_domain: values.freemail_domain, cfworker_api_url: values.cfworker_api_url, cfworker_admin_token: values.cfworker_admin_token, cfworker_custom_auth: values.cfworker_custom_auth, @@ -207,6 +221,7 @@ export default function RegisterTaskPage() { captcha_solver: 'yescaptcha', mail_provider: 'luckmail', gptmail_base_url: 'https://mail.chatgpt.org.uk', + cloudmail_timeout: 30, count: 1, concurrency: 1, register_delay_seconds: 0, @@ -274,6 +289,7 @@ export default function RegisterTaskPage() { { value: 'moemail', label: 'MoeMail (sall.cc)' }, { value: 'tempmail_lol', label: 'TempMail.lol' }, { value: 'skymail', label: 'SkyMail (CloudMail)' }, + { value: 'cloudmail', label: 'CloudMail (genToken)' }, { value: 'maliapi', label: 'YYDS Mail / MaliAPI' }, { value: 'gptmail', label: 'GPTMail' }, { value: 'opentrashmail', label: 'OpenTrashMail' }, @@ -297,6 +313,28 @@ export default function RegisterTaskPage() { )} + {mailProvider === 'cloudmail' && ( + <> + + + + + + + + + + + + + + + + + + + + )} {mailProvider === 'laoudo' && ( <> @@ -403,6 +441,25 @@ export default function RegisterTaskPage() { )} + {mailProvider === 'freemail' && ( + <> + + + + + + + + + + + + + + + + + )} {mailProvider === 'luckmail' && ( <> diff --git a/frontend/src/pages/Settings.tsx b/frontend/src/pages/Settings.tsx index f482241..3b20a65 100644 --- a/frontend/src/pages/Settings.tsx +++ b/frontend/src/pages/Settings.tsx @@ -22,6 +22,7 @@ const SELECT_FIELDS: Record = { { label: 'Laoudo(固定邮箱)', value: 'laoudo' }, { label: 'TempMail.lol(自动生成)', value: 'tempmail_lol' }, { label: 'SkyMail(CloudMail 接口)', value: 'skymail' }, + { label: 'CloudMail(genToken 口令模式)', value: 'cloudmail' }, { label: 'DuckMail(自动生成)', value: 'duckmail' }, { label: 'MoeMail (sall.cc)', value: 'moemail' }, { label: 'YYDS Mail / MaliAPI', value: 'maliapi' }, @@ -95,6 +96,7 @@ const TAB_ITEMS = [ { key: 'freemail_admin_token', label: '管理员令牌', secret: true }, { key: 'freemail_username', label: '用户名(可选)' }, { key: 'freemail_password', label: '密码(可选)', secret: true }, + { key: 'freemail_domain', label: '邮箱域名(可选)', placeholder: 'example.com' }, ], }, { @@ -114,6 +116,18 @@ const TAB_ITEMS = [ { key: 'skymail_domain', label: '邮箱域名', placeholder: 'mail.example.com' }, ], }, + { + title: 'CloudMail', + desc: 'CloudMail 口令模式(genToken + emailList)', + fields: [ + { key: 'cloudmail_api_base', label: 'API Base', placeholder: 'https://cloudmail.example.com' }, + { key: 'cloudmail_admin_email', label: '管理员邮箱(可选)', placeholder: 'admin@example.com' }, + { key: 'cloudmail_admin_password', label: '管理员密码', secret: true }, + { key: 'cloudmail_domain', label: '邮箱域名(可选)', placeholder: 'mail.example.com,mail2.example.com' }, + { key: 'cloudmail_subdomain', label: '子域名(可选)', placeholder: 'pool-a' }, + { key: 'cloudmail_timeout', label: '请求超时秒数', placeholder: '30' }, + ], + }, { title: 'YYDS Mail / MaliAPI', desc: '基于 API Key 创建临时邮箱并轮询收件箱消息', @@ -1071,6 +1085,9 @@ export default function Settings() { if (!data.luckmail_base_url) { data.luckmail_base_url = 'https://mails.luckyous.com/' } + if (!data.cloudmail_timeout) { + data.cloudmail_timeout = 30 + } data.cfworker_domains = parseStoredDomainList(data.cfworker_domains) data.cfworker_enabled_domains = parseStoredDomainList(data.cfworker_enabled_domains) data.cfworker_random_subdomain = parseBooleanConfigValue(data.cfworker_random_subdomain) diff --git a/tests/test_cloudmail_mailbox.py b/tests/test_cloudmail_mailbox.py new file mode 100644 index 0000000..91c14ab --- /dev/null +++ b/tests/test_cloudmail_mailbox.py @@ -0,0 +1,98 @@ +import unittest +from unittest import mock + +from core.base_mailbox import CloudMailMailbox, MailboxAccount, create_mailbox + + +class CloudMailMailboxTests(unittest.TestCase): + def setUp(self): + CloudMailMailbox._token_cache.clear() + CloudMailMailbox._seen_ids.clear() + + def test_get_email_uses_configured_domain(self): + mailbox = create_mailbox( + "cloudmail", + extra={ + "cloudmail_api_base": "https://cloudmail.example.com", + "cloudmail_admin_password": "secret", + "cloudmail_domain": "mail.example.com", + }, + ) + + account = mailbox.get_email() + + self.assertTrue(account.email.endswith("@mail.example.com")) + self.assertEqual(account.account_id, account.email) + + def test_get_email_supports_legacy_field_names(self): + mailbox = create_mailbox( + "cloudmail", + extra={ + "base_url": "https://cloudmail.example.com", + "admin_password": "secret", + "domain": "mail.example.com", + "subdomain": "pool-a", + }, + ) + + account = mailbox.get_email() + + self.assertTrue(account.email.endswith("@pool-a.mail.example.com")) + self.assertEqual(account.account_id, account.email) + + @mock.patch("requests.post") + def test_wait_for_code_retries_after_auth_failure(self, mock_post): + mock_post.side_effect = [ + _json_response({"code": 200, "data": {"token": "tok-1"}}), + _text_response(401, "unauthorized"), + _json_response({"code": 200, "data": {"token": "tok-2"}}), + _json_response( + { + "code": 200, + "data": [ + { + "emailId": "m-1", + "toEmail": "demo@example.com", + "subject": "Your verification code is 654321", + "content": "", + } + ], + } + ), + ] + + mailbox = create_mailbox( + "cloudmail", + extra={ + "cloudmail_api_base": "https://cloudmail.example.com", + "cloudmail_admin_email": "admin@example.com", + "cloudmail_admin_password": "secret", + "cloudmail_domain": "mail.example.com", + }, + ) + account = MailboxAccount(email="demo@example.com", account_id="demo@example.com") + + code = mailbox.wait_for_code(account, timeout=5) + + self.assertEqual(code, "654321") + self.assertEqual(mock_post.call_count, 4) + + +def _json_response(payload: dict, status_code: int = 200): + response = mock.Mock() + response.status_code = status_code + response.text = str(payload) + response.json.return_value = payload + return response + + +def _text_response(status_code: int, text: str): + response = mock.Mock() + response.status_code = status_code + response.text = text + response.json.side_effect = ValueError("not json") + return response + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_freemail_mailbox.py b/tests/test_freemail_mailbox.py index 0f42d5f..44f6073 100644 --- a/tests/test_freemail_mailbox.py +++ b/tests/test_freemail_mailbox.py @@ -65,6 +65,64 @@ class FreemailMailboxTests(unittest.TestCase): self.assertEqual(code, "222222") self.assertEqual(mailbox._session.get.call_count, 2) + def test_get_email_prefers_configured_domain_index(self): + mailbox = create_mailbox( + "freemail", + extra={ + "freemail_api_url": "https://freemail.example", + "freemail_domain": "target.example", + }, + ) + mailbox._session = mock.Mock() + mailbox._session.get.side_effect = [ + _response(["fallback.example", "target.example"]), + _response({"email": "demo@target.example"}), + ] + + account = mailbox.get_email() + + self.assertEqual(account.email, "demo@target.example") + self.assertEqual(mailbox._session.get.call_count, 2) + _, kwargs = mailbox._session.get.call_args + self.assertEqual(kwargs.get("params"), {"domainIndex": 1}) + + def test_get_email_without_domain_does_not_pass_domain_index(self): + mailbox = create_mailbox( + "freemail", + extra={ + "freemail_api_url": "https://freemail.example", + }, + ) + mailbox._session = mock.Mock() + mailbox._session.get.return_value = _response({"email": "demo@random.example"}) + + account = mailbox.get_email() + + self.assertEqual(account.email, "demo@random.example") + self.assertEqual(mailbox._session.get.call_count, 1) + _, kwargs = mailbox._session.get.call_args + self.assertEqual(kwargs.get("params"), {}) + + def test_get_email_domain_list_supports_object_items(self): + mailbox = create_mailbox( + "freemail", + extra={ + "freemail_api_url": "https://freemail.example", + "freemail_domain": "target.example", + }, + ) + mailbox._session = mock.Mock() + mailbox._session.get.side_effect = [ + _response({"domains": [{"domain": "fallback.example"}, {"domain": "target.example"}]}), + _response({"email": "demo@target.example"}), + ] + + account = mailbox.get_email() + + self.assertEqual(account.email, "demo@target.example") + _, kwargs = mailbox._session.get.call_args + self.assertEqual(kwargs.get("params"), {"domainIndex": 1}) + def _response(payload): response = mock.Mock()