Files
any-auto-register/services/external_sync.py
2026-04-14 15:00:55 +08:00

238 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""外部系统同步(自动导入 / 回填)"""
from __future__ import annotations
from typing import Any
from services.chatgpt_sync import (
_get_account_extra,
persist_cpa_sync_result,
persist_sub2api_sync_result,
upload_chatgpt_account_to_cpa,
)
def _is_config_enabled(value: Any, default: bool = False) -> bool:
normalized = str(value or "").strip().lower()
if not normalized:
return default
return normalized in {"1", "true", "yes", "on", "enabled"}
def _pick_text(source: Any, *keys: str, default: str = "") -> str:
if not isinstance(source, dict):
return default
for key in keys:
value = source.get(key)
if value is None:
continue
text = value.strip() if isinstance(value, str) else str(value).strip()
if text:
return text
return default
def sync_account(account) -> list[dict[str, Any]]:
"""根据平台将账号同步到外部系统。"""
from core.config_store import config_store
platform = getattr(account, "platform", "")
results: list[dict[str, Any]] = []
def _build_chatgpt_upload_account():
class _A:
pass
a = _A()
a.email = account.email
extra = _get_account_extra(account)
a.access_token = _pick_text(extra, "access_token", "accessToken") or account.token
a.refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
a.id_token = _pick_text(extra, "id_token", "idToken")
a.session_token = _pick_text(extra, "session_token", "sessionToken")
a.client_id = _pick_text(extra, "client_id", "clientId", default="app_EMoamEEZ73f0CkXaXp7hrann")
return a
if platform == "chatgpt":
upload_account = _build_chatgpt_upload_account()
# 贡献模式优先级最高:开启后仅上传到贡献服务器,避免重复上报到其它平台。
contribution_enabled = _is_config_enabled(config_store.get("contribution_enabled", "0"))
if contribution_enabled:
contribution_mode = str(config_store.get("contribution_mode", "codex") or "codex").strip().lower()
if contribution_mode == "custom":
# 自定义贡献系统模式
custom_url = str(config_store.get("custom_contribution_url", "") or "").strip()
custom_token = str(config_store.get("custom_contribution_token", "") or "").strip()
if not custom_url:
msg = "自定义贡献服务器地址未配置"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
if not custom_token:
msg = "自定义贡献系统 token 未配置(请先绑定邮箱)"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
try:
import requests
from platforms.chatgpt.cpa_upload import generate_token_json
# 生成完整的 token JSON
extra = _get_account_extra(account)
token_json = generate_token_json(account)
# 如果 token_json 中没有 refresh_token从 extra 获取
if not token_json.get("refresh_token"):
refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
print(f"[DEBUG] extra keys: {list(extra.keys())}")
print(f"[DEBUG] refresh_token from extra: {refresh_token[:20] if refresh_token else 'EMPTY'}")
if refresh_token:
token_json["refresh_token"] = refresh_token
if not token_json.get("access_token"):
access_token = _pick_text(extra, "access_token", "accessToken") or getattr(account, "token", "")
if access_token:
token_json["access_token"] = access_token
if not token_json.get("id_token"):
id_token = _pick_text(extra, "id_token", "idToken")
if id_token:
token_json["id_token"] = id_token
if not token_json.get("client_id"):
client_id = _pick_text(extra, "client_id", "clientId")
if client_id:
token_json["client_id"] = client_id
refresh_token = str(token_json.get("refresh_token") or "").strip()
access_token = str(token_json.get("access_token") or "").strip()
# 验证必须有 refresh_token
print(f"[DEBUG] Final token_json keys: {list(token_json.keys())}")
print(f"[DEBUG] Final refresh_token: {refresh_token[:20] if refresh_token else 'EMPTY'}")
if not refresh_token:
msg = "账号缺少 refresh_token"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
resp = requests.post(
f"{custom_url.rstrip('/')}/api/upload",
json={
"email": account.email,
"refresh_token": refresh_token,
"access_token": access_token,
"token_json": token_json,
},
headers={"Authorization": f"Bearer {custom_token}"},
timeout=15,
)
data = resp.json()
if resp.status_code >= 400:
msg = data.get("error") or data.get("message") or str(data)
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
msg = f"上传成功: {data.get('message', '')}"
persist_cpa_sync_result(account, True, msg)
results.append({"name": "CustomContribution", "ok": True, "msg": msg})
return results
except Exception as exc:
msg = f"上传到自定义贡献系统失败: {exc}"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "CustomContribution", "ok": False, "msg": msg})
return results
else:
# codex2api 模式(原有逻辑)
contribution_url = str(config_store.get("contribution_server_url", "") or "").strip()
contribution_key = str(config_store.get("contribution_key", "") or "").strip()
if not contribution_url:
msg = "Contribution 服务器地址未配置"
persist_cpa_sync_result(account, False, msg)
results.append({"name": "Contribution", "ok": False, "msg": msg})
return results
ok, msg = upload_chatgpt_account_to_cpa(
account,
api_url=contribution_url,
api_key=contribution_key or None,
)
persist_cpa_sync_result(account, ok, msg)
results.append({"name": "Contribution", "ok": ok, "msg": msg})
return results
cpa_url = str(config_store.get("cpa_api_url", "") or "").strip()
cpa_enabled = _is_config_enabled(
config_store.get("cpa_enabled", ""),
default=bool(cpa_url),
)
if cpa_enabled and cpa_url:
ok, msg = upload_chatgpt_account_to_cpa(account)
persist_cpa_sync_result(account, ok, msg)
results.append({"name": "CPA", "ok": ok, "msg": msg})
codex_proxy_url = str(config_store.get("codex_proxy_url", "") or "").strip()
if codex_proxy_url:
upload_type = str(config_store.get("codex_proxy_upload_type", "at") or "at").strip().lower()
extra = _get_account_extra(account)
class _CP:
pass
cp = _CP()
cp.access_token = _pick_text(extra, "access_token", "accessToken") or account.token
cp.refresh_token = _pick_text(extra, "refresh_token", "refreshToken")
if upload_type == "rt":
from platforms.chatgpt.cpa_upload import upload_to_codex_proxy
ok, msg = upload_to_codex_proxy(cp)
results.append({"name": "CodexProxy(RT)", "ok": ok, "msg": msg})
else:
from platforms.chatgpt.cpa_upload import upload_at_to_codex_proxy
ok, msg = upload_at_to_codex_proxy(cp)
results.append({"name": "CodexProxy(AT)", "ok": ok, "msg": msg})
# 关键逻辑ChatGPT 现在支持同时回填 CPA 和 Sub2API互不覆盖、分别上报结果。
sub2api_url = str(config_store.get("sub2api_api_url", "") or "").strip()
sub2api_key = str(config_store.get("sub2api_api_key", "") or "").strip()
sub2api_enabled = _is_config_enabled(
config_store.get("sub2api_enabled", ""),
default=bool(sub2api_url and sub2api_key),
)
if sub2api_enabled and sub2api_url and sub2api_key:
from platforms.chatgpt.sub2api_upload import upload_to_sub2api
ok, msg = upload_to_sub2api(
upload_account,
api_url=sub2api_url,
api_key=sub2api_key,
)
persist_sub2api_sync_result(account, ok, msg)
results.append({"name": "Sub2API", "ok": ok, "msg": msg})
elif platform == "grok":
grok2api_url = str(config_store.get("grok2api_url", "") or "").strip()
if grok2api_url:
from services.grok2api_runtime import ensure_grok2api_ready
from platforms.grok.grok2api_upload import upload_to_grok2api
ready, ready_msg = ensure_grok2api_ready()
if not ready:
results.append({"name": "grok2api", "ok": False, "msg": ready_msg})
return results
ok, msg = upload_to_grok2api(account)
results.append({"name": "grok2api", "ok": ok, "msg": msg})
elif platform == "kiro":
from platforms.kiro.account_manager_upload import resolve_manager_path, upload_to_kiro_manager
configured_path = str(config_store.get("kiro_manager_path", "") or "").strip()
target_path = resolve_manager_path(configured_path or None)
if configured_path or target_path.parent.exists() or target_path.exists():
ok, msg = upload_to_kiro_manager(account, path=configured_path or None)
results.append({"name": "Kiro Manager", "ok": ok, "msg": msg})
return results