Files
any-auto-register/api/config.py

202 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter
from pydantic import BaseModel
from core.config_store import config_store
router = APIRouter(prefix="/config", tags=["config"])
CONFIG_KEYS = [
"laoudo_auth",
"laoudo_email",
"laoudo_account_id",
"yescaptcha_key",
"twocaptcha_key",
"default_executor",
"default_captcha_solver",
"duckmail_api_url",
"duckmail_provider_url",
"duckmail_bearer",
"duckmail_domain",
"duckmail_api_key",
"freemail_api_url",
"freemail_admin_token",
"freemail_username",
"freemail_password",
"freemail_domain",
"moemail_api_url",
"moemail_api_key",
"skymail_api_base",
"skymail_token",
"skymail_domain",
"cloudmail_api_base",
"cloudmail_admin_email",
"cloudmail_admin_password",
"cloudmail_domain",
"cloudmail_subdomain",
"cloudmail_timeout",
"mail_provider",
"mailbox_otp_timeout_seconds",
"maliapi_base_url",
"maliapi_api_key",
"maliapi_domain",
"maliapi_auto_domain_strategy",
"applemail_base_url",
"applemail_pool_dir",
"applemail_pool_file",
"applemail_mailboxes",
"gptmail_base_url",
"gptmail_api_key",
"gptmail_domain",
"opentrashmail_api_url",
"opentrashmail_domain",
"opentrashmail_password",
"cfworker_api_url",
"cfworker_admin_token",
"cfworker_custom_auth",
"cfworker_domain",
"cfworker_domains",
"cfworker_enabled_domains",
"cfworker_subdomain",
"cfworker_random_subdomain",
"cfworker_random_name_subdomain",
"cfworker_fingerprint",
"smstome_cookie",
"smstome_country_slugs",
"smstome_phone_attempts",
"smstome_otp_timeout_seconds",
"smstome_poll_interval_seconds",
"smstome_sync_max_pages_per_country",
"luckmail_base_url",
"luckmail_api_key",
"luckmail_email_type",
"luckmail_domain",
"cpa_enabled",
"cpa_api_url",
"cpa_api_key",
"cpa_cleanup_enabled",
"cpa_cleanup_interval_minutes",
"cpa_cleanup_threshold",
"cpa_cleanup_concurrency",
"cpa_cleanup_register_delay_seconds",
"sub2api_enabled",
"sub2api_api_url",
"sub2api_api_key",
"sub2api_group_ids",
"team_manager_url",
"team_manager_key",
"codex_proxy_url",
"codex_proxy_key",
"codex_proxy_upload_type",
"cliproxyapi_base_url",
"cliproxyapi_management_key",
"grok2api_url",
"grok2api_app_key",
"grok2api_pool",
"grok2api_quota",
"kiro_manager_path",
"kiro_manager_exe",
"contribution_enabled",
"contribution_server_url",
"contribution_key",
]
class ConfigUpdate(BaseModel):
data: dict
class AppleMailImportRequest(BaseModel):
content: str
filename: str = ""
pool_dir: str = ""
bind_to_config: bool = True
@router.get("")
def get_config():
all_cfg = config_store.get_all()
if not all_cfg.get("mail_provider"):
all_cfg["mail_provider"] = "luckmail"
if not all_cfg.get("applemail_base_url"):
all_cfg["applemail_base_url"] = "https://www.appleemail.top"
if not all_cfg.get("applemail_pool_dir"):
all_cfg["applemail_pool_dir"] = "mail"
if not all_cfg.get("applemail_mailboxes"):
all_cfg["applemail_mailboxes"] = "INBOX,Junk"
if not all_cfg.get("gptmail_base_url"):
all_cfg["gptmail_base_url"] = "https://mail.chatgpt.org.uk"
if not all_cfg.get("luckmail_base_url"):
all_cfg["luckmail_base_url"] = "https://mails.luckyous.com/"
if not all_cfg.get("contribution_server_url"):
all_cfg["contribution_server_url"] = "http://new.xem8k5.top:7317/"
# 只返回已知 key未设置的返回空字符串
return {k: all_cfg.get(k, "") for k in CONFIG_KEYS}
@router.put("")
def update_config(body: ConfigUpdate):
# 只允许更新已知 key
safe = {k: v for k, v in body.data.items() if k in CONFIG_KEYS}
config_store.set_many(safe)
return {"ok": True, "updated": list(safe.keys())}
@router.post("/applemail/import")
def import_applemail_pool(body: AppleMailImportRequest):
from core.applemail_pool import load_applemail_pool_snapshot, save_applemail_pool_json
pool_dir = str(body.pool_dir or config_store.get("applemail_pool_dir", "mail")).strip() or "mail"
result = save_applemail_pool_json(
body.content,
pool_dir=pool_dir,
filename=body.filename,
)
if body.bind_to_config:
config_store.set_many(
{
"applemail_pool_dir": pool_dir,
"applemail_pool_file": result["filename"],
}
)
snapshot = load_applemail_pool_snapshot(
pool_file=result["filename"],
pool_dir=pool_dir,
)
return {
**result,
"pool_dir": pool_dir,
"bound_to_config": body.bind_to_config,
"items": snapshot["items"],
"truncated": snapshot["truncated"],
}
@router.get("/applemail/pool")
def get_applemail_pool_snapshot(
pool_dir: str = "",
pool_file: str = "",
):
from core.applemail_pool import load_applemail_pool_snapshot
resolved_pool_dir = str(pool_dir or config_store.get("applemail_pool_dir", "mail")).strip() or "mail"
resolved_pool_file = str(pool_file or config_store.get("applemail_pool_file", "")).strip()
try:
snapshot = load_applemail_pool_snapshot(
pool_file=resolved_pool_file,
pool_dir=resolved_pool_dir,
)
except Exception:
snapshot = {
"filename": resolved_pool_file,
"path": "",
"count": 0,
"items": [],
"truncated": False,
}
return {
**snapshot,
"pool_dir": resolved_pool_dir,
}