mirror of
https://github.com/zc-zhangchen/any-auto-register.git
synced 2026-05-08 16:24:07 +08:00
272 lines
8.1 KiB
Python
272 lines
8.1 KiB
Python
import json
|
|
import re
|
|
import threading
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Held while the cursor table below is read and updated, so that concurrent
# callers of take_next_applemail_record() never observe the same cursor value.
_POOL_CURSOR_LOCK = threading.Lock()

# Rotation state per pool file: resolved file path (as a string) -> number of
# records that have been handed out from that file so far.
_POOL_CURSORS: dict[str, int] = {}
|
|
|
|
|
|
def _project_root() -> Path:
    """Return the directory that relative pool paths are anchored to.

    This is the current working directory of the process at call time.
    """
    working_dir = Path.cwd()
    return working_dir
|
|
|
|
|
|
def _normalize_pool_dir(pool_dir: str | None = None) -> Path:
|
|
raw = str(pool_dir or "mail").strip() or "mail"
|
|
path = Path(raw)
|
|
if path.is_absolute():
|
|
return path
|
|
return _project_root() / path
|
|
|
|
|
|
def _normalize_filename(filename: str | None = None) -> str:
|
|
raw = Path(str(filename or "").strip() or "").name
|
|
if not raw:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
return f"applemail_{timestamp}.json"
|
|
|
|
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", raw).strip("._")
|
|
if not safe:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
safe = f"applemail_{timestamp}"
|
|
if not safe.lower().endswith(".json"):
|
|
safe += ".json"
|
|
return safe
|
|
|
|
|
|
def _extract_first(values: dict[str, Any], *keys: str) -> str:
    """Return the first non-blank value found under any of ``keys``.

    Keys are tried in the order given. Each value is stringified and trimmed;
    missing keys and falsy values count as blank. Returns ``""`` when no key
    yields text.
    """
    trimmed = (str(values.get(name) or "").strip() for name in keys)
    return next((text for text in trimmed if text), "")
|
|
|
|
|
|
def _normalize_mailbox(value: Any) -> str:
    """Return the trimmed mailbox name, or ``"INBOX"`` when it is blank."""
    cleaned = str(value or "").strip()
    if cleaned:
        return cleaned
    return "INBOX"
|
|
|
|
|
|
def _normalize_record(entry: Any) -> dict[str, str]:
    """Convert one raw pool entry into the canonical record dict.

    Strings go to ``_normalize_text_record`` and lists/tuples to
    ``_normalize_sequence_record``. A dict is read through key aliases:

    * email: ``email``, ``mail``, ``address``, ``username``
    * client id: ``client_id``, ``clientId``, ``clientID``
    * refresh token: ``refresh_token``, ``refreshToken``, ``rt``
    * mailbox: ``mailbox``, ``folder``, ``mail_folder``, ``mailFolder``
      (defaults to ``"INBOX"``)
    * password (optional): ``password``, ``pass``, ``pwd``

    Returns:
        A dict with ``email``, ``client_id``, ``refresh_token`` and
        ``mailbox``, plus ``password`` when one was supplied.

    Raises:
        ValueError: for an unsupported entry type, or when the email, client
            id or refresh token is missing.
    """
    if isinstance(entry, str):
        return _normalize_text_record(entry)
    if isinstance(entry, (list, tuple)):
        return _normalize_sequence_record(entry)
    if not isinstance(entry, dict):
        raise ValueError(f"不支持的邮箱记录格式: {type(entry).__name__}")

    email = _extract_first(entry, "email", "mail", "address", "username")
    app_id = _extract_first(entry, "client_id", "clientId", "clientID")
    token = _extract_first(entry, "refresh_token", "refreshToken", "rt")
    folder = _normalize_mailbox(
        _extract_first(entry, "mailbox", "folder", "mail_folder", "mailFolder")
    )
    secret = _extract_first(entry, "password", "pass", "pwd")

    if not email:
        raise ValueError("缺少 email")
    if not (app_id and token):
        raise ValueError(f"{email} 缺少 client_id 或 refresh_token")

    # The password key is only present when a password was actually given.
    optional = {"password": secret} if secret else {}
    return {
        "email": email,
        "client_id": app_id,
        "refresh_token": token,
        "mailbox": folder,
        **optional,
    }
|
|
|
|
|
|
def _normalize_sequence_record(entry: list[Any] | tuple[Any, ...]) -> dict[str, str]:
|
|
values = [str(item or "").strip() for item in entry if str(item or "").strip()]
|
|
if not values:
|
|
raise ValueError("空邮箱记录")
|
|
if len(values) < 3:
|
|
raise ValueError(f"邮箱记录字段不足: {values}")
|
|
if len(values) >= 4:
|
|
email, password, client_id, refresh_token = values[:4]
|
|
record = {
|
|
"email": email,
|
|
"client_id": client_id,
|
|
"refresh_token": refresh_token,
|
|
"mailbox": "INBOX",
|
|
}
|
|
if password:
|
|
record["password"] = password
|
|
if len(values) >= 5 and values[4]:
|
|
record["mailbox"] = _normalize_mailbox(values[4])
|
|
return record
|
|
|
|
email, client_id, refresh_token = values[:3]
|
|
record = {
|
|
"email": email,
|
|
"client_id": client_id,
|
|
"refresh_token": refresh_token,
|
|
"mailbox": "INBOX",
|
|
}
|
|
if len(values) >= 4 and values[3]:
|
|
record["mailbox"] = _normalize_mailbox(values[3])
|
|
return record
|
|
|
|
|
|
def _normalize_text_record(line: str) -> dict[str, str]:
    """Parse one text line into a record via ``_normalize_sequence_record``.

    The line is split on the first delimiter that occurs in it, tried in this
    order: ``----``, a tab, then any run of whitespace.

    Raises:
        ValueError: when the line is blank (or as raised by the sequence
            parser for the resulting fields).
    """
    stripped = str(line or "").strip()
    if not stripped:
        raise ValueError("空邮箱记录")

    for delimiter in ("----", "\t"):
        if delimiter in stripped:
            return _normalize_sequence_record(stripped.split(delimiter))
    return _normalize_sequence_record(stripped.split())
|
|
|
|
|
|
def _unwrap_json_records(payload: Any) -> list[Any]:
    """Locate the list of raw records inside a decoded JSON document.

    A list is returned as-is. For a dict, the first of the keys ``data``,
    ``items``, ``accounts``, ``list``, ``emails``, ``mails`` that holds a list
    supplies the records; if none does, the dict itself is treated as a single
    record.

    Raises:
        ValueError: when the payload is neither a list nor a dict.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        raise ValueError(f"JSON 根节点必须是对象或数组,当前为 {type(payload).__name__}")

    wrapper_keys = ("data", "items", "accounts", "list", "emails", "mails")
    nested = next(
        (payload[name] for name in wrapper_keys if isinstance(payload.get(name), list)),
        None,
    )
    return [payload] if nested is None else nested
|
|
|
|
|
|
def parse_applemail_pool_content(content: str) -> list[dict[str, str]]:
    """Parse pool content (JSON or line-based text) into canonical records.

    Content whose first character is ``[`` or ``{`` is decoded as JSON and
    unwrapped with ``_unwrap_json_records``. Anything else is read line by
    line, skipping blank lines and lines starting with ``#``.

    Args:
        content: Raw pool text. A leading UTF-8 byte-order mark is ignored.

    Returns:
        One normalised record per entry, in input order.

    Raises:
        ValueError: when the content is empty, yields no records, is invalid
            JSON, or contains a record that cannot be normalised.
    """
    # A BOM is not whitespace, so strip() alone leaves it in place. It would
    # then hide the leading "[" / "{" from the JSON check below, or end up
    # glued to the first email of a text-format pool.
    text = str(content or "").lstrip("\ufeff").strip()
    if not text:
        raise ValueError("邮箱池内容为空")

    if text[:1] in {"[", "{"}:
        payload = json.loads(text)
        items = _unwrap_json_records(payload)
        records = [_normalize_record(item) for item in items]
    else:
        lines = [
            line.strip()
            for line in text.splitlines()
            if line.strip() and not line.strip().startswith("#")
        ]
        records = [_normalize_text_record(line) for line in lines]

    if not records:
        raise ValueError("邮箱池内容为空")
    return records
|
|
|
|
|
|
def resolve_applemail_pool_path(
    *,
    pool_file: str | None = None,
    pool_dir: str | None = None,
) -> Path:
    """Work out which pool file to use.

    The pool directory (see ``_normalize_pool_dir``) is created if missing.

    * With ``pool_file`` given: an absolute path is used directly. A relative
      one is looked up by its file name inside the pool directory and, if not
      found there, as given under ``_project_root()``.
    * Without ``pool_file``: the ``*.json`` / ``*.txt`` / ``*.csv`` file in the
      pool directory with the newest modification time wins (ties broken by
      the greater file name).

    Raises:
        RuntimeError: when the requested file does not exist, or the pool
            directory holds no candidate file.
    """
    base_dir = _normalize_pool_dir(pool_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    requested = str(pool_file or "").strip()
    if requested:
        requested_path = Path(requested)
        if requested_path.is_absolute():
            resolved = requested_path
        else:
            resolved = base_dir / requested_path.name
            if not resolved.exists():
                # Second chance: the path exactly as given, under the root.
                from_root = _project_root() / requested
                if from_root.exists():
                    resolved = from_root
        if not resolved.exists():
            raise RuntimeError(f"小苹果邮箱池文件不存在: {resolved}")
        return resolved

    found: list[Path] = []
    for pattern in ("*.json", "*.txt", "*.csv"):
        found.extend(path for path in base_dir.glob(pattern) if path.is_file())
    if not found:
        raise RuntimeError(f"mail 目录下未找到可用的小苹果邮箱池文件: {base_dir}")
    return max(found, key=lambda item: (item.stat().st_mtime, item.name))
|
|
|
|
|
|
def load_applemail_pool_records(
    *,
    pool_file: str | None = None,
    pool_dir: str | None = None,
) -> tuple[Path, list[dict[str, str]]]:
    """Resolve the pool file, read it and parse its records.

    Args:
        pool_file: Optional file name or path, forwarded to
            ``resolve_applemail_pool_path``.
        pool_dir: Optional pool directory, forwarded likewise.

    Returns:
        ``(path, records)``: the file that was read and its parsed records.

    Raises:
        RuntimeError: when no pool file can be resolved.
        ValueError: when the file content cannot be parsed.
    """
    path = resolve_applemail_pool_path(pool_file=pool_file, pool_dir=pool_dir)
    # "utf-8-sig" decodes like UTF-8 but drops a leading byte-order mark, which
    # would otherwise stay in the text and break format detection in the
    # parser. Undecodable bytes are still skipped, as before.
    content = path.read_text(encoding="utf-8-sig", errors="ignore")
    records = parse_applemail_pool_content(content)
    return path, records
|
|
|
|
|
|
def load_applemail_pool_snapshot(
    *,
    pool_file: str | None = None,
    pool_dir: str | None = None,
    preview_limit: int = 100,
) -> dict[str, Any]:
    """Summarise a pool file without exposing credentials.

    Returns a dict with the file's ``filename`` and ``path``, the total record
    ``count``, a preview list ``items`` (1-based ``index``, ``email`` and
    ``mailbox`` for at most ``preview_limit`` records) and a ``truncated``
    flag that is true when not every record is in the preview. A falsy or
    negative ``preview_limit`` is treated as 0.
    """
    path, records = load_applemail_pool_records(pool_file=pool_file, pool_dir=pool_dir)
    limit = max(int(preview_limit or 0), 0)
    total = len(records)

    preview = []
    for position, record in enumerate(records[:limit], start=1):
        preview.append(
            {
                "index": position,
                "email": record["email"],
                "mailbox": record.get("mailbox") or "INBOX",
            }
        )

    return {
        "filename": path.name,
        "path": str(path),
        "count": total,
        "items": preview,
        # limit is never negative, so one comparison covers limit == 0 too.
        "truncated": total > limit,
    }
|
|
|
|
|
|
def take_next_applemail_record(
    *,
    pool_file: str | None = None,
    pool_dir: str | None = None,
) -> tuple[Path, dict[str, str]]:
    """Return the next record of a pool file in round-robin order.

    The file is loaded on every call. A per-file cursor, kept in
    ``_POOL_CURSORS`` under the resolved path, selects the record (wrapping
    around at the end of the list) and is then advanced by one. The cursor is
    read and updated while holding ``_POOL_CURSOR_LOCK``.

    Returns:
        ``(path, record)``: the pool file used and the selected record.
    """
    path, records = load_applemail_pool_records(pool_file=pool_file, pool_dir=pool_dir)
    cursor_key = str(path.resolve())
    with _POOL_CURSOR_LOCK:
        position = _POOL_CURSORS.get(cursor_key, 0)
        chosen = records[position % len(records)]
        _POOL_CURSORS[cursor_key] = position + 1
    return path, chosen
|
|
|
|
|
|
def save_applemail_pool_json(
    content: str,
    *,
    pool_dir: str | None = None,
    filename: str | None = None,
) -> dict[str, Any]:
    """Validate pool content and store it as a normalised JSON file.

    The content is parsed first, so invalid input raises before anything is
    written. The pool directory is created if needed, the file name is
    sanitised with ``_normalize_filename``, and the records are written as
    indented UTF-8 JSON (non-ASCII characters kept as-is).

    Returns:
        A dict with the stored ``filename``, its full ``path`` and the record
        ``count``.
    """
    records = parse_applemail_pool_content(content)

    target_dir = _normalize_pool_dir(pool_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    safe_name = _normalize_filename(filename)
    destination = target_dir / safe_name
    serialized = json.dumps(records, ensure_ascii=False, indent=2)
    destination.write_text(serialized, encoding="utf-8")

    return {
        "filename": safe_name,
        "path": str(destination),
        "count": len(records),
    }
|