Files
AI-Account-Toolkit/cloudflare_temp_email/smtp_proxy_server/smtp_server.py

184 lines
6.3 KiB
Python

import asyncio
import logging
import email
import ssl
import httpx
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Session, Envelope, AuthResult, LoginPassword
from config import settings
# Module-level logger named after this module; its own threshold is INFO, so
# DEBUG records issued through it are dropped.
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
def _safe_decode_payload(payload, charset):
if payload is None:
return ""
try:
return payload.decode(charset or "utf-8", errors="replace")
except LookupError:
return payload.decode("utf-8", errors="replace")
class CustomSMTPHandler:
    """aiosmtpd handler that relays each received message to the HTTP send-mail API.

    No credential check happens in this class: any LOGIN/PLAIN credentials are
    accepted, and the AUTH password is forwarded as the ``token`` field of the
    API request.
    """

    # Only these body types are forwarded; every other MIME part is skipped.
    _BODY_TYPES = ("text/plain", "text/html")

    def authenticator(self, server, session, envelope, mechanism, auth_data):
        """Accept LOGIN/PLAIN credentials and attach them to the session.

        Returns:
            ``AuthResult(success=True, auth_data=auth_data)`` when the
            mechanism is LOGIN or PLAIN and *auth_data* is a
            ``LoginPassword``; otherwise
            ``AuthResult(success=False, handled=False)``.
        """
        fail_nothandled = AuthResult(success=False, handled=False)
        if mechanism not in ("LOGIN", "PLAIN"):
            _logger.warning(f"Unsupported mechanism {mechanism}")
            return fail_nothandled
        if not isinstance(auth_data, LoginPassword):
            _logger.warning(f"Invalid auth data {auth_data}")
            return fail_nothandled
        return AuthResult(success=True, auth_data=auth_data)

    @staticmethod
    def _decode_header_text(raw):
        """Return *raw* with RFC 2047 encoded-words decoded.

        An absent header (``None``) yields ``""``. If an encoded-word cannot
        be decoded (unknown charset, malformed base64), the raw text is
        returned unchanged rather than raising.
        """
        if raw is None:
            return ""
        try:
            return str(email.header.make_header(email.header.decode_header(raw)))
        except (LookupError, UnicodeError, email.errors.HeaderParseError):
            return str(raw)

    @classmethod
    def _collect_bodies(cls, msg):
        """Return the non-empty text/plain and text/html bodies found in *msg*.

        Each entry is ``{"type": <content type>, "value": <text>}``.
        ``walk()`` also yields the message itself, so single-part and
        multipart messages share this one code path.
        """
        bodies = []
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in cls._BODY_TYPES:
                _logger.warning(f"Skipping {content_type}")
                continue
            charset = part.get_content_charset()
            cte = str(part.get('content-transfer-encoding', '')).strip().lower()
            if cte == "8bit":
                # The message was parsed from an already-decoded string, so
                # the 8bit payload is taken as-is instead of being re-encoded
                # and decoded again.
                value = part.get_payload(decode=False)
            else:
                value = _safe_decode_payload(part.get_payload(decode=True), charset)
            _logger.debug("Parsed content charset=%s", charset)
            if not value:
                continue
            bodies.append({"type": content_type, "value": value})
        return bodies

    async def handle_DATA(self, server: SMTP, session: Session, envelope: Envelope) -> str:
        """Forward the received message to ``{settings.proxy_url}/external/api/send_mail``.

        Returns:
            An SMTP reply string: ``250 OK`` on success, ``530`` when the
            session carries no ``LoginPassword``, and ``500`` for more than
            one recipient, a message without a text body, or an API failure.
        """
        _logger.info(
            f"handle_DATA from {envelope.mail_from} to {envelope.rcpt_tos}"
        )
        if not isinstance(session.auth_data, LoginPassword):
            return '530 Authentication required'
        if len(envelope.rcpt_tos) != 1:
            return '500 Only one recipient allowed'
        to_mail = envelope.rcpt_tos[0]

        msg = email.message_from_string(envelope.content)
        content_list = self._collect_bodies(msg)
        if not content_list:
            return '500 Invalid content'
        # Prefer HTML over plain text; among equals, take the longest body.
        body = max(
            content_list,
            key=lambda x: (x["type"] == "text/html", len(x["value"]))
        )

        # Addresses are split BEFORE decoding encoded-words: a decoded display
        # name may itself contain commas or angle brackets.
        raw_from = msg['From']
        raw_from_name, _ = email.utils.parseaddr(
            "" if raw_from is None else str(raw_from)
        )
        from_name = self._decode_header_text(raw_from_name)

        to_mail_map = {}
        to_headers = [str(value) for value in msg.get_all('To', [])]
        for raw_to_name, tmp_to_mail in email.utils.getaddresses(to_headers):
            if tmp_to_mail:
                to_mail_map[tmp_to_mail] = self._decode_header_text(raw_to_name)
        _logger.info(f"Parsed mail from {from_name} to {to_mail_map}")

        send_body = {
            "token": session.auth_data.password.decode(),
            "from_name": from_name,
            "to_name": to_mail_map.get(to_mail),
            "to_mail": to_mail,
            "subject": self._decode_header_text(msg['Subject']),
            "is_html": body["type"] == "text/html",
            "content": body["value"],
        }
        _logger.info(f"Send mail {dict(send_body, token='***')}")
        try:
            # Async client: a blocking httpx.post() here would stall the event
            # loop that runs this coroutine for the duration of the request.
            async with httpx.AsyncClient() as client:
                res = await client.post(
                    f"{settings.proxy_url}/external/api/send_mail",
                    json=send_body, headers={
                        "Content-Type": "application/json"
                    }
                )
            if res.status_code != 200:
                _logger.error(
                    "Failed to send mail "
                    f"code=[{res.status_code}] text=[{res.text}]"
                )
                # An SMTP reply is a single line; collapse any CR/LF in the
                # upstream body so it cannot break or spoof the response.
                detail = " ".join(res.text.split())
                return f'500 Internal server error code=[{res.status_code}] text=[{detail}]'
        except Exception:
            # Boundary with the SMTP client: log the traceback and answer with
            # a generic failure instead of letting the error propagate.
            _logger.exception("Failed to send mail")
            return '500 Internal server error'
        return '250 OK'
def start_smtp_server():
    """Start the SMTP proxy and block the calling thread until interrupted.

    STARTTLS is offered when both ``settings.smtp_tls_cert`` and
    ``settings.smtp_tls_key`` are set; in that case AUTH is only permitted
    over TLS (``auth_require_tls``).

    Raises:
        ValueError: If only one of the certificate/key settings is provided.
    """
    handler = CustomSMTPHandler()
    tls_context = None
    has_cert = bool(settings.smtp_tls_cert)
    has_key = bool(settings.smtp_tls_key)
    if has_cert != has_key:
        raise ValueError(
            "Both smtp_tls_cert and smtp_tls_key must be set together"
        )
    if has_cert and has_key:
        _logger.info("TLS enabled for SMTP (STARTTLS)")
        # Purpose.CLIENT_AUTH builds a server-side context.
        tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        tls_context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        tls_context.load_cert_chain(settings.smtp_tls_cert, settings.smtp_tls_key)
    server = Controller(
        handler,
        hostname="",
        port=settings.port,
        auth_require_tls=bool(tls_context),
        decode_data=True,
        authenticator=handler.authenticator,
        auth_exclude_mechanism=["DONT"],
        tls_context=tls_context,
    )
    _logger.info(
        "Starting SMTP server on port %s tls=%s",
        settings.port, bool(tls_context),
    )
    server.start()
    # Execution continues past server.start(), so this otherwise idle loop is
    # what keeps the calling thread (and the process) alive.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        _logger.info("Got KeyboardInterrupt, stopping")
    finally:
        # Release the listener and the loop on every exit path, not only on
        # Ctrl-C.
        server.stop()
        loop.close()
if __name__ == "__main__":
    # Script entry point: log the effective settings once, then hand control
    # to the blocking server loop.
    _logger.info(
        "Starting SMTP server proxy_url=%s port=%s tls=%s",
        settings.proxy_url,
        settings.port,
        bool(settings.smtp_tls_cert and settings.smtp_tls_key),
    )
    start_smtp_server()