diff --git a/README.md b/README.md index a7f527d..09c8ebe 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ | SkyMail (CloudMail) | `skymail` | 通过 API / Token / 域名使用 | | YYDS Mail / MaliAPI | `maliapi` | 支持域名与自动域名策略 | | GPTMail | `gptmail` | 基于 GPTMail API 生成临时邮箱并轮询邮件,支持已知域名时本地拼装随机地址 | +| OpenTrashMail | `opentrashmail` | 对接自建 OpenTrashMail 服务,支持 `/api/random` 自动取号,也支持配置域名后本地拼装随机地址 | | DuckMail | `duckmail` | 临时邮箱方案 | | Freemail | `freemail` | 自建邮箱服务 | | Laoudo | `laoudo` | 固定邮箱方案 | diff --git a/api/config.py b/api/config.py index 605cb20..e0ce319 100644 --- a/api/config.py +++ b/api/config.py @@ -33,6 +33,9 @@ CONFIG_KEYS = [ "gptmail_base_url", "gptmail_api_key", "gptmail_domain", + "opentrashmail_api_url", + "opentrashmail_domain", + "opentrashmail_password", "cfworker_api_url", "cfworker_admin_token", "cfworker_custom_auth", diff --git a/core/base_mailbox.py b/core/base_mailbox.py index 8cb579e..8387f82 100644 --- a/core/base_mailbox.py +++ b/core/base_mailbox.py @@ -262,6 +262,13 @@ def create_mailbox( domain=extra.get("gptmail_domain", ""), proxy=proxy, ) + elif provider == "opentrashmail": + return OpenTrashMailMailbox( + api_url=extra.get("opentrashmail_api_url", ""), + domain=extra.get("opentrashmail_domain", ""), + password=extra.get("opentrashmail_password", ""), + proxy=proxy, + ) elif provider == "cfworker": return CFWorkerMailbox( api_url=extra.get("cfworker_api_url", ""), @@ -1305,6 +1312,272 @@ class GPTMailMailbox(BaseMailbox): ) +class OpenTrashMailMailbox(BaseMailbox): + """OpenTrashMail 临时邮箱服务""" + + def __init__( + self, + api_url: str = "", + domain: str = "", + password: str = "", + proxy: str = None, + ): + self.api = str(api_url or "").strip().rstrip("/") + self.domain = self._normalize_domain(domain) + self.password = str(password or "").strip() + self.proxy = build_requests_proxy_config(proxy) + + @staticmethod + def _normalize_domain(value: Any) -> str: + domain = str(value or "").strip().lower() + if domain.startswith("@"): + domain = domain[1:] + return domain + + @staticmethod + def _generate_local_part() -> str: + import string + + prefix = "".join(random.choices(string.ascii_lowercase, k=8)) + suffix = "".join(random.choices(string.digits, k=2)) + return f"{prefix}{suffix}" + + def _headers(self) -> dict[str, str]: + return {"accept": "application/json, text/plain, */*"} + + def _request( + self, + method: str, + path: str, + *, + params: dict | None = None, + timeout: int = 15, + ): + import requests + + request_params = dict(params or {}) + if self.password and "password" not in request_params: + request_params["password"] = self.password + + return requests.request( + method, + f"{self.api}{path}", + params=request_params or None, + json=None, + headers=self._headers(), + proxies=self.proxy, + timeout=timeout, + ) + + def _require_api(self) -> None: + if not self.api: + raise RuntimeError( + "OpenTrashMail 未配置 API URL,请检查 opentrashmail_api_url" + ) + + def _build_email_path(self, email: str) -> str: + from urllib.parse import quote + + return quote(str(email or "").strip(), safe="@") + + def _parse_random_email(self, html_text: str) -> str: + import re + + text = str(html_text or "") + if not text: + return "" + + match = re.search(r"/address/([^\"'<>\s]+@[^\"'<>\s]+)", text, re.IGNORECASE) + if match: + return str(match.group(1) or "").strip() + + match = re.search( + r"([a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,})", + text, + re.IGNORECASE, + ) + if match: + return str(match.group(1) or "").strip() + return "" + + def _list_messages(self, email: str) -> list[dict[str, Any]]: + self._require_api() + response = self._request( + "GET", + f"/json/{self._build_email_path(email)}", + timeout=10, + ) + if response.status_code == 404: + return [] + try: + payload = response.json() + except Exception as exc: + preview = (response.text or "")[:200] + raise RuntimeError( + f"OpenTrashMail 收件箱返回非 JSON: HTTP {response.status_code} {preview}" + ) from exc + + if response.status_code >= 400: + if isinstance(payload, dict) and payload.get("error"): + error = payload.get("error") + else: + error = response.text or f"HTTP {response.status_code}" + raise RuntimeError(f"OpenTrashMail 收件箱查询失败: {str(error).strip()}") + + if not payload: + return [] + + messages: list[dict[str, Any]] = [] + if isinstance(payload, dict): + for message_id, item in payload.items(): + if not isinstance(item, dict): + continue + message = dict(item) + message.setdefault("id", str(message_id)) + messages.append(message) + elif isinstance(payload, list): + for item in payload: + if isinstance(item, dict): + messages.append(item) + return messages + + def _get_message_detail(self, email: str, message_id: str) -> dict[str, Any]: + self._require_api() + response = self._request( + "GET", + f"/json/{self._build_email_path(email)}/{message_id}", + timeout=10, + ) + if response.status_code == 404: + return {} + try: + payload = response.json() + except Exception as exc: + preview = (response.text or "")[:200] + raise RuntimeError( + f"OpenTrashMail 邮件详情返回非 JSON: HTTP {response.status_code} {preview}" + ) from exc + + if response.status_code >= 400: + if isinstance(payload, dict) and payload.get("error"): + error = payload.get("error") + else: + error = response.text or f"HTTP {response.status_code}" + raise RuntimeError(f"OpenTrashMail 邮件详情查询失败: {str(error).strip()}") + + return payload if isinstance(payload, dict) else {} + + def get_email(self) -> MailboxAccount: + if self.domain: + email = f"{self._generate_local_part()}@{self.domain}" + self._log(f"[OpenTrashMail] 本地拼装邮箱: {email}") + return MailboxAccount( + email=email, + account_id=email, + extra={ + "provider": "opentrashmail", + "domain": self.domain, + "local_address": True, + }, + ) + + self._require_api() + response = self._request("GET", "/api/random", timeout=15) + if response.status_code >= 400: + raise RuntimeError( + f"OpenTrashMail 随机邮箱生成失败: HTTP {response.status_code}" + ) + + email = self._parse_random_email(response.text) + if not email: + preview = (response.text or "")[:200] + raise RuntimeError(f"OpenTrashMail 未能解析随机邮箱: {preview}") + + self._log(f"[OpenTrashMail] 生成邮箱: {email}") + return MailboxAccount( + email=email, + account_id=email, + extra={"provider": "opentrashmail"}, + ) + + def get_current_ids(self, account: MailboxAccount) -> set: + try: + return { + str(message.get("id")) + for message in self._list_messages(account.email) + if message.get("id") is not None + } + except Exception: + return set() + + def wait_for_code( + self, + account: MailboxAccount, + keyword: str = "", + timeout: int = 120, + before_ids: set = None, + code_pattern: str = None, + **kwargs, + ) -> str: + import re + + seen = {str(mid) for mid in (before_ids or set())} + exclude_codes = { + str(code) for code in (kwargs.get("exclude_codes") or set()) if code + } + + def poll_once() -> Optional[str]: + try: + messages = self._list_messages(account.email) + for message in messages: + message_id = str(message.get("id") or "").strip() + if not message_id or message_id in seen: + continue + seen.add(message_id) + + detail = self._get_message_detail(account.email, message_id) + parsed = detail.get("parsed") if isinstance(detail, dict) else {} + if not isinstance(parsed, dict): + parsed = {} + + decoded_raw = self._decode_raw_content(detail.get("raw") or "") + search_text = " ".join( + [ + str(message.get("subject") or ""), + str(message.get("from") or ""), + str(message.get("body") or ""), + str(detail.get("from") or ""), + str(parsed.get("subject") or ""), + str(parsed.get("body") or ""), + str(parsed.get("htmlbody") or ""), + decoded_raw, + ] + ).strip() + search_text = re.sub( + r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", + "", + search_text, + ) + if keyword and keyword.lower() not in search_text.lower(): + continue + + code = self._safe_extract(search_text, code_pattern) + if code and code in exclude_codes: + continue + if code: + self._log(f"[OpenTrashMail] 收到验证码: {code}") + return code + except Exception: + pass + return None + + return self._run_polling_wait( + timeout=timeout, + poll_interval=3, + poll_once=poll_once, + ) + + class CFWorkerMailbox(BaseMailbox): """Cloudflare Worker 自建临时邮箱服务""" diff --git a/frontend/src/pages/RegisterTaskPage.tsx b/frontend/src/pages/RegisterTaskPage.tsx index 4d918cd..2be8d71 100644 --- a/frontend/src/pages/RegisterTaskPage.tsx +++ b/frontend/src/pages/RegisterTaskPage.tsx @@ -53,6 +53,9 @@ export default function RegisterTaskPage() { gptmail_base_url: cfg.gptmail_base_url || 'https://mail.chatgpt.org.uk', gptmail_api_key: cfg.gptmail_api_key || '', gptmail_domain: cfg.gptmail_domain || '', + opentrashmail_api_url: cfg.opentrashmail_api_url || '', + opentrashmail_domain: cfg.opentrashmail_domain || '', + opentrashmail_password: cfg.opentrashmail_password || '', maliapi_base_url: cfg.maliapi_base_url || 'https://maliapi.215.im/v1', maliapi_api_key: cfg.maliapi_api_key || '', maliapi_domain: cfg.maliapi_domain || '', @@ -95,6 +98,9 @@ export default function RegisterTaskPage() { gptmail_base_url: values.gptmail_base_url, gptmail_api_key: values.gptmail_api_key, gptmail_domain: values.gptmail_domain, + opentrashmail_api_url: values.opentrashmail_api_url, + opentrashmail_domain: values.opentrashmail_domain, + opentrashmail_password: values.opentrashmail_password, maliapi_base_url: values.maliapi_base_url, maliapi_api_key: values.maliapi_api_key, maliapi_domain: values.maliapi_domain, @@ -268,6 +274,7 @@ export default function RegisterTaskPage() { { value: 'skymail', label: 'SkyMail (CloudMail)' }, { value: 'maliapi', label: 'YYDS Mail / MaliAPI' }, { value: 'gptmail', label: 'GPTMail' }, + { value: 'opentrashmail', label: 'OpenTrashMail' }, { value: 'duckmail', label: 'DuckMail' }, { value: 'freemail', label: 'Freemail' }, { value: 'laoudo', label: 'Laoudo' }, @@ -340,6 +347,27 @@ export default function RegisterTaskPage() { )} + {mailProvider === 'opentrashmail' && ( + <> + + + + + + + + + + + )} {mailProvider === 'cfworker' && ( <> diff --git a/frontend/src/pages/Settings.tsx b/frontend/src/pages/Settings.tsx index 9c41d2c..8a217c6 100644 --- a/frontend/src/pages/Settings.tsx +++ b/frontend/src/pages/Settings.tsx @@ -26,6 +26,7 @@ const SELECT_FIELDS: Record = { { label: 'MoeMail (sall.cc)', value: 'moemail' }, { label: 'YYDS Mail / MaliAPI', value: 'maliapi' }, { label: 'GPTMail', value: 'gptmail' }, + { label: 'OpenTrashMail', value: 'opentrashmail' }, { label: 'Freemail(自建 CF Worker)', value: 'freemail' }, { label: 'CF Worker(自建域名)', value: 'cfworker' }, ], @@ -129,6 +130,15 @@ const TAB_ITEMS = [ { key: 'gptmail_domain', label: '邮箱域名(可选)', placeholder: 'example.com' }, ], }, + { + title: 'OpenTrashMail', + desc: '对接 opentrashmail 服务;可直接轮询 /json/,也支持已知域名时本地拼装随机地址', + fields: [ + { key: 'opentrashmail_api_url', label: 'API URL', placeholder: 'http://mail.example.com:8085' }, + { key: 'opentrashmail_domain', label: '邮箱域名(可选)', placeholder: 'xiyoufm.com' }, + { key: 'opentrashmail_password', label: '站点密码(可选)', secret: true, placeholder: '启用 PASSWORD 时填写' }, + ], + }, { title: 'TempMail.lol', desc: '自动生成邮箱,无需配置,需要代理访问(CN IP 被封)', diff --git a/tests/test_opentrashmail_mailbox.py b/tests/test_opentrashmail_mailbox.py new file mode 100644 index 0000000..498496c --- /dev/null +++ b/tests/test_opentrashmail_mailbox.py @@ -0,0 +1,146 @@ +import unittest +from unittest.mock import patch + +from core.base_mailbox import MailboxAccount, create_mailbox + + +class OpenTrashMailMailboxTests(unittest.TestCase): + def _build_mailbox(self, **extra): + config = { + "opentrashmail_api_url": "https://mail.example.com", + "opentrashmail_password": "secret-pass", + } + config.update(extra) + return create_mailbox("opentrashmail", extra=config) + + @patch("requests.request") + def test_get_email_can_compose_local_address_when_domain_configured(self, mock_request): + mailbox = self._build_mailbox(opentrashmail_domain="xiyoufm.com") + + with patch.object(type(mailbox), "_generate_local_part", return_value="demo1234"): + account = mailbox.get_email() + + self.assertEqual(account.email, "demo1234@xiyoufm.com") + self.assertEqual(account.account_id, "demo1234@xiyoufm.com") + self.assertEqual(account.extra["domain"], "xiyoufm.com") + mock_request.assert_not_called() + + @patch("requests.request") + def test_get_email_parses_random_address_from_html(self, mock_request): + mock_request.return_value.status_code = 200 + mock_request.return_value.text = """ + + + """ + + mailbox = self._build_mailbox() + account = mailbox.get_email() + + self.assertEqual(account.email, "ashamed.glove@xiyoufm.com") + self.assertEqual(account.account_id, "ashamed.glove@xiyoufm.com") + mock_request.assert_called_once_with( + "GET", + "https://mail.example.com/api/random", + params={"password": "secret-pass"}, + json=None, + headers={"accept": "application/json, text/plain, */*"}, + proxies=None, + timeout=15, + ) + + @patch("requests.request") + def test_get_current_ids_reads_json_listing(self, mock_request): + mock_request.return_value.status_code = 200 + mock_request.return_value.json.return_value = { + "1775019492111": { + "email": "test@xiyoufm.com", + "subject": "测试", + }, + "1775019500000": { + "email": "test@xiyoufm.com", + "subject": "验证码 123456", + }, + } + mock_request.return_value.text = "" + + mailbox = self._build_mailbox() + ids = mailbox.get_current_ids(MailboxAccount(email="test@xiyoufm.com")) + + self.assertEqual(ids, {"1775019492111", "1775019500000"}) + mock_request.assert_called_once_with( + "GET", + "https://mail.example.com/json/test@xiyoufm.com", + params={"password": "secret-pass"}, + json=None, + headers={"accept": "application/json, text/plain, */*"}, + proxies=None, + timeout=10, + ) + + @patch("time.sleep", return_value=None) + @patch("requests.request") + def test_wait_for_code_reads_detail_and_skips_excluded_codes(self, mock_request, _sleep): + mock_request.side_effect = [ + _response( + { + "m1": { + "email": "test@xiyoufm.com", + "subject": "Your code 111111", + } + } + ), + _response( + { + "raw": "Subject: Your code 111111\r\n\r\n111111", + "parsed": { + "subject": "Your code 111111", + "body": "111111", + }, + } + ), + _response( + { + "m1": { + "email": "test@xiyoufm.com", + "subject": "Your code 111111", + }, + "m2": { + "email": "test@xiyoufm.com", + "subject": "Your code 222222", + }, + } + ), + _response( + { + "raw": "Subject: verification code\r\n\r\n222222", + "parsed": { + "subject": "verification code", + "body": "222222", + }, + } + ), + ] + + mailbox = self._build_mailbox() + code = mailbox.wait_for_code( + MailboxAccount(email="test@xiyoufm.com"), + timeout=5, + exclude_codes={"111111"}, + ) + + self.assertEqual(code, "222222") + self.assertEqual(mock_request.call_count, 4) + + +def _response(payload, status_code=200): + response = unittest.mock.Mock() + response.status_code = status_code + response.json.return_value = payload + response.text = "" + return response + + +if __name__ == "__main__": + unittest.main()