Merge pull request #78 from woodnaonly/main

feat: 接入 OpenTrashMail 邮箱服务
This commit is contained in:
zhangchen
2026-04-03 15:42:24 +08:00
committed by GitHub
6 changed files with 461 additions and 0 deletions

View File

@@ -132,6 +132,7 @@
| SkyMail (CloudMail) | `skymail` | 通过 API / Token / 域名使用 |
| YYDS Mail / MaliAPI | `maliapi` | 支持域名与自动域名策略 |
| GPTMail | `gptmail` | 基于 GPTMail API 生成临时邮箱并轮询邮件,支持已知域名时本地拼装随机地址 |
| OpenTrashMail | `opentrashmail` | 对接自建 OpenTrashMail 服务,支持 `/api/random` 自动取号,也支持配置域名后本地拼装随机地址 |
| DuckMail | `duckmail` | 临时邮箱方案 |
| Freemail | `freemail` | 自建邮箱服务 |
| Laoudo | `laoudo` | 固定邮箱方案 |

View File

@@ -33,6 +33,9 @@ CONFIG_KEYS = [
"gptmail_base_url",
"gptmail_api_key",
"gptmail_domain",
"opentrashmail_api_url",
"opentrashmail_domain",
"opentrashmail_password",
"cfworker_api_url",
"cfworker_admin_token",
"cfworker_custom_auth",

View File

@@ -262,6 +262,13 @@ def create_mailbox(
domain=extra.get("gptmail_domain", ""),
proxy=proxy,
)
elif provider == "opentrashmail":
return OpenTrashMailMailbox(
api_url=extra.get("opentrashmail_api_url", ""),
domain=extra.get("opentrashmail_domain", ""),
password=extra.get("opentrashmail_password", ""),
proxy=proxy,
)
elif provider == "cfworker":
return CFWorkerMailbox(
api_url=extra.get("cfworker_api_url", ""),
@@ -1305,6 +1312,272 @@ class GPTMailMailbox(BaseMailbox):
)
class OpenTrashMailMailbox(BaseMailbox):
"""OpenTrashMail 临时邮箱服务"""
def __init__(
self,
api_url: str = "",
domain: str = "",
password: str = "",
proxy: str = None,
):
self.api = str(api_url or "").strip().rstrip("/")
self.domain = self._normalize_domain(domain)
self.password = str(password or "").strip()
self.proxy = build_requests_proxy_config(proxy)
@staticmethod
def _normalize_domain(value: Any) -> str:
domain = str(value or "").strip().lower()
if domain.startswith("@"):
domain = domain[1:]
return domain
@staticmethod
def _generate_local_part() -> str:
import string
prefix = "".join(random.choices(string.ascii_lowercase, k=8))
suffix = "".join(random.choices(string.digits, k=2))
return f"{prefix}{suffix}"
def _headers(self) -> dict[str, str]:
return {"accept": "application/json, text/plain, */*"}
def _request(
self,
method: str,
path: str,
*,
params: dict | None = None,
timeout: int = 15,
):
import requests
request_params = dict(params or {})
if self.password and "password" not in request_params:
request_params["password"] = self.password
return requests.request(
method,
f"{self.api}{path}",
params=request_params or None,
json=None,
headers=self._headers(),
proxies=self.proxy,
timeout=timeout,
)
def _require_api(self) -> None:
if not self.api:
raise RuntimeError(
"OpenTrashMail 未配置 API URL请检查 opentrashmail_api_url"
)
def _build_email_path(self, email: str) -> str:
from urllib.parse import quote
return quote(str(email or "").strip(), safe="@")
def _parse_random_email(self, html_text: str) -> str:
import re
text = str(html_text or "")
if not text:
return ""
match = re.search(r"/address/([^\"'<>\s]+@[^\"'<>\s]+)", text, re.IGNORECASE)
if match:
return str(match.group(1) or "").strip()
match = re.search(
r"([a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,})",
text,
re.IGNORECASE,
)
if match:
return str(match.group(1) or "").strip()
return ""
def _list_messages(self, email: str) -> list[dict[str, Any]]:
self._require_api()
response = self._request(
"GET",
f"/json/{self._build_email_path(email)}",
timeout=10,
)
if response.status_code == 404:
return []
try:
payload = response.json()
except Exception as exc:
preview = (response.text or "")[:200]
raise RuntimeError(
f"OpenTrashMail 收件箱返回非 JSON: HTTP {response.status_code} {preview}"
) from exc
if response.status_code >= 400:
if isinstance(payload, dict) and payload.get("error"):
error = payload.get("error")
else:
error = response.text or f"HTTP {response.status_code}"
raise RuntimeError(f"OpenTrashMail 收件箱查询失败: {str(error).strip()}")
if not payload:
return []
messages: list[dict[str, Any]] = []
if isinstance(payload, dict):
for message_id, item in payload.items():
if not isinstance(item, dict):
continue
message = dict(item)
message.setdefault("id", str(message_id))
messages.append(message)
elif isinstance(payload, list):
for item in payload:
if isinstance(item, dict):
messages.append(item)
return messages
def _get_message_detail(self, email: str, message_id: str) -> dict[str, Any]:
self._require_api()
response = self._request(
"GET",
f"/json/{self._build_email_path(email)}/{message_id}",
timeout=10,
)
if response.status_code == 404:
return {}
try:
payload = response.json()
except Exception as exc:
preview = (response.text or "")[:200]
raise RuntimeError(
f"OpenTrashMail 邮件详情返回非 JSON: HTTP {response.status_code} {preview}"
) from exc
if response.status_code >= 400:
if isinstance(payload, dict) and payload.get("error"):
error = payload.get("error")
else:
error = response.text or f"HTTP {response.status_code}"
raise RuntimeError(f"OpenTrashMail 邮件详情查询失败: {str(error).strip()}")
return payload if isinstance(payload, dict) else {}
def get_email(self) -> MailboxAccount:
if self.domain:
email = f"{self._generate_local_part()}@{self.domain}"
self._log(f"[OpenTrashMail] 本地拼装邮箱: {email}")
return MailboxAccount(
email=email,
account_id=email,
extra={
"provider": "opentrashmail",
"domain": self.domain,
"local_address": True,
},
)
self._require_api()
response = self._request("GET", "/api/random", timeout=15)
if response.status_code >= 400:
raise RuntimeError(
f"OpenTrashMail 随机邮箱生成失败: HTTP {response.status_code}"
)
email = self._parse_random_email(response.text)
if not email:
preview = (response.text or "")[:200]
raise RuntimeError(f"OpenTrashMail 未能解析随机邮箱: {preview}")
self._log(f"[OpenTrashMail] 生成邮箱: {email}")
return MailboxAccount(
email=email,
account_id=email,
extra={"provider": "opentrashmail"},
)
def get_current_ids(self, account: MailboxAccount) -> set:
try:
return {
str(message.get("id"))
for message in self._list_messages(account.email)
if message.get("id") is not None
}
except Exception:
return set()
def wait_for_code(
self,
account: MailboxAccount,
keyword: str = "",
timeout: int = 120,
before_ids: set = None,
code_pattern: str = None,
**kwargs,
) -> str:
import re
seen = {str(mid) for mid in (before_ids or set())}
exclude_codes = {
str(code) for code in (kwargs.get("exclude_codes") or set()) if code
}
def poll_once() -> Optional[str]:
try:
messages = self._list_messages(account.email)
for message in messages:
message_id = str(message.get("id") or "").strip()
if not message_id or message_id in seen:
continue
seen.add(message_id)
detail = self._get_message_detail(account.email, message_id)
parsed = detail.get("parsed") if isinstance(detail, dict) else {}
if not isinstance(parsed, dict):
parsed = {}
decoded_raw = self._decode_raw_content(detail.get("raw") or "")
search_text = " ".join(
[
str(message.get("subject") or ""),
str(message.get("from") or ""),
str(message.get("body") or ""),
str(detail.get("from") or ""),
str(parsed.get("subject") or ""),
str(parsed.get("body") or ""),
str(parsed.get("htmlbody") or ""),
decoded_raw,
]
).strip()
search_text = re.sub(
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"",
search_text,
)
if keyword and keyword.lower() not in search_text.lower():
continue
code = self._safe_extract(search_text, code_pattern)
if code and code in exclude_codes:
continue
if code:
self._log(f"[OpenTrashMail] 收到验证码: {code}")
return code
except Exception:
pass
return None
return self._run_polling_wait(
timeout=timeout,
poll_interval=3,
poll_once=poll_once,
)
class CFWorkerMailbox(BaseMailbox):
"""Cloudflare Worker 自建临时邮箱服务"""

View File

@@ -53,6 +53,9 @@ export default function RegisterTaskPage() {
gptmail_base_url: cfg.gptmail_base_url || 'https://mail.chatgpt.org.uk',
gptmail_api_key: cfg.gptmail_api_key || '',
gptmail_domain: cfg.gptmail_domain || '',
opentrashmail_api_url: cfg.opentrashmail_api_url || '',
opentrashmail_domain: cfg.opentrashmail_domain || '',
opentrashmail_password: cfg.opentrashmail_password || '',
maliapi_base_url: cfg.maliapi_base_url || 'https://maliapi.215.im/v1',
maliapi_api_key: cfg.maliapi_api_key || '',
maliapi_domain: cfg.maliapi_domain || '',
@@ -95,6 +98,9 @@ export default function RegisterTaskPage() {
gptmail_base_url: values.gptmail_base_url,
gptmail_api_key: values.gptmail_api_key,
gptmail_domain: values.gptmail_domain,
opentrashmail_api_url: values.opentrashmail_api_url,
opentrashmail_domain: values.opentrashmail_domain,
opentrashmail_password: values.opentrashmail_password,
maliapi_base_url: values.maliapi_base_url,
maliapi_api_key: values.maliapi_api_key,
maliapi_domain: values.maliapi_domain,
@@ -268,6 +274,7 @@ export default function RegisterTaskPage() {
{ value: 'skymail', label: 'SkyMail (CloudMail)' },
{ value: 'maliapi', label: 'YYDS Mail / MaliAPI' },
{ value: 'gptmail', label: 'GPTMail' },
{ value: 'opentrashmail', label: 'OpenTrashMail' },
{ value: 'duckmail', label: 'DuckMail' },
{ value: 'freemail', label: 'Freemail' },
{ value: 'laoudo', label: 'Laoudo' },
@@ -340,6 +347,27 @@ export default function RegisterTaskPage() {
</Form.Item>
</>
)}
{mailProvider === 'opentrashmail' && (
<>
<Form.Item name="opentrashmail_api_url" label="API URL" rules={[{ required: true, message: '请输入 OpenTrashMail 地址' }]}>
<Input placeholder="http://mail.example.com:8085" />
</Form.Item>
<Form.Item
name="opentrashmail_domain"
label="邮箱域名(可选)"
extra="已知 OpenTrashMail 当前启用域名时可直接本地拼装随机地址;留空则调用 /api/random 自动获取"
>
<Input placeholder="xiyoufm.com" />
</Form.Item>
<Form.Item
name="opentrashmail_password"
label="站点密码(可选)"
extra="当 OpenTrashMail 开启 PASSWORD 保护时填写,会自动追加到 JSON API 查询参数"
>
<Input.Password placeholder="留空表示未启用" />
</Form.Item>
</>
)}
{mailProvider === 'cfworker' && (
<>
<Form.Item name="cfworker_api_url" label="API URL">

View File

@@ -26,6 +26,7 @@ const SELECT_FIELDS: Record<string, { label: string; value: string }[]> = {
{ label: 'MoeMail (sall.cc)', value: 'moemail' },
{ label: 'YYDS Mail / MaliAPI', value: 'maliapi' },
{ label: 'GPTMail', value: 'gptmail' },
{ label: 'OpenTrashMail', value: 'opentrashmail' },
{ label: 'Freemail自建 CF Worker', value: 'freemail' },
{ label: 'CF Worker自建域名', value: 'cfworker' },
],
@@ -129,6 +130,15 @@ const TAB_ITEMS = [
{ key: 'gptmail_domain', label: '邮箱域名(可选)', placeholder: 'example.com' },
],
},
{
title: 'OpenTrashMail',
desc: '对接 opentrashmail 服务;可直接轮询 /json/<email>,也支持已知域名时本地拼装随机地址',
fields: [
{ key: 'opentrashmail_api_url', label: 'API URL', placeholder: 'http://mail.example.com:8085' },
{ key: 'opentrashmail_domain', label: '邮箱域名(可选)', placeholder: 'xiyoufm.com' },
{ key: 'opentrashmail_password', label: '站点密码(可选)', secret: true, placeholder: '启用 PASSWORD 时填写' },
],
},
{
title: 'TempMail.lol',
desc: '自动生成邮箱无需配置需要代理访问CN IP 被封)',

View File

@@ -0,0 +1,146 @@
import unittest
from unittest.mock import patch
from core.base_mailbox import MailboxAccount, create_mailbox
class OpenTrashMailMailboxTests(unittest.TestCase):
def _build_mailbox(self, **extra):
config = {
"opentrashmail_api_url": "https://mail.example.com",
"opentrashmail_password": "secret-pass",
}
config.update(extra)
return create_mailbox("opentrashmail", extra=config)
@patch("requests.request")
def test_get_email_can_compose_local_address_when_domain_configured(self, mock_request):
mailbox = self._build_mailbox(opentrashmail_domain="xiyoufm.com")
with patch.object(type(mailbox), "_generate_local_part", return_value="demo1234"):
account = mailbox.get_email()
self.assertEqual(account.email, "demo1234@xiyoufm.com")
self.assertEqual(account.account_id, "demo1234@xiyoufm.com")
self.assertEqual(account.extra["domain"], "xiyoufm.com")
mock_request.assert_not_called()
@patch("requests.request")
def test_get_email_parses_random_address_from_html(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.text = """
<nav aria-label="breadcrumb">
<ul><li>ashamed.glove@xiyoufm.com</li></ul>
</nav>
<script>history.pushState({urlpath:"/address/ashamed.glove@xiyoufm.com"}, "", "/address/ashamed.glove@xiyoufm.com");</script>
"""
mailbox = self._build_mailbox()
account = mailbox.get_email()
self.assertEqual(account.email, "ashamed.glove@xiyoufm.com")
self.assertEqual(account.account_id, "ashamed.glove@xiyoufm.com")
mock_request.assert_called_once_with(
"GET",
"https://mail.example.com/api/random",
params={"password": "secret-pass"},
json=None,
headers={"accept": "application/json, text/plain, */*"},
proxies=None,
timeout=15,
)
@patch("requests.request")
def test_get_current_ids_reads_json_listing(self, mock_request):
mock_request.return_value.status_code = 200
mock_request.return_value.json.return_value = {
"1775019492111": {
"email": "test@xiyoufm.com",
"subject": "测试",
},
"1775019500000": {
"email": "test@xiyoufm.com",
"subject": "验证码 123456",
},
}
mock_request.return_value.text = ""
mailbox = self._build_mailbox()
ids = mailbox.get_current_ids(MailboxAccount(email="test@xiyoufm.com"))
self.assertEqual(ids, {"1775019492111", "1775019500000"})
mock_request.assert_called_once_with(
"GET",
"https://mail.example.com/json/test@xiyoufm.com",
params={"password": "secret-pass"},
json=None,
headers={"accept": "application/json, text/plain, */*"},
proxies=None,
timeout=10,
)
@patch("time.sleep", return_value=None)
@patch("requests.request")
def test_wait_for_code_reads_detail_and_skips_excluded_codes(self, mock_request, _sleep):
mock_request.side_effect = [
_response(
{
"m1": {
"email": "test@xiyoufm.com",
"subject": "Your code 111111",
}
}
),
_response(
{
"raw": "Subject: Your code 111111\r\n\r\n111111",
"parsed": {
"subject": "Your code 111111",
"body": "111111",
},
}
),
_response(
{
"m1": {
"email": "test@xiyoufm.com",
"subject": "Your code 111111",
},
"m2": {
"email": "test@xiyoufm.com",
"subject": "Your code 222222",
},
}
),
_response(
{
"raw": "Subject: verification code\r\n\r\n222222",
"parsed": {
"subject": "verification code",
"body": "222222",
},
}
),
]
mailbox = self._build_mailbox()
code = mailbox.wait_for_code(
MailboxAccount(email="test@xiyoufm.com"),
timeout=5,
exclude_codes={"111111"},
)
self.assertEqual(code, "222222")
self.assertEqual(mock_request.call_count, 4)
def _response(payload, status_code=200):
response = unittest.mock.Mock()
response.status_code = status_code
response.json.return_value = payload
response.text = ""
return response
if __name__ == "__main__":
unittest.main()