diff --git a/api/actions.py b/api/actions.py index ca0ddb0..68ab990 100644 --- a/api/actions.py +++ b/api/actions.py @@ -75,7 +75,7 @@ def _apply_action_result( from datetime import datetime, timezone acc_model.updated_at = datetime.now(timezone.utc) session.add(acc_model) - if platform == "chatgpt" and action_id == "upload_cpa": + if action_id == "upload_cpa": from services.chatgpt_sync import update_account_model_cpa_sync sync_msg = result.get("data") or result.get("error") or "" diff --git a/api/config.py b/api/config.py index fcd1809..2805e17 100644 --- a/api/config.py +++ b/api/config.py @@ -98,6 +98,9 @@ CONFIG_KEYS = [ "grok2api_quota", "kiro_manager_path", "kiro_manager_exe", + "qwen_cpa_enabled", + "qwen_cpa_api_url", + "qwen_cpa_api_key", "external_apps_update_mode", "contribution_enabled", "contribution_server_url", diff --git a/core/base_platform.py b/core/base_platform.py index 06561b6..71f7352 100644 --- a/core/base_platform.py +++ b/core/base_platform.py @@ -49,11 +49,20 @@ class BasePlatform(ABC): def __init__(self, config: RegisterConfig = None): self.config = config or RegisterConfig() self._task_control = None - if self.config.executor_type not in self.supported_executors: - raise NotImplementedError( - f"{self.display_name} 暂不支持 '{self.config.executor_type}' 执行器," - f"当前支持: {self.supported_executors}" + requested_executor = str(self.config.executor_type or "").strip() or "protocol" + if requested_executor not in self.supported_executors: + fallback = ( + "protocol" + if "protocol" in self.supported_executors + else (self.supported_executors[0] if self.supported_executors else "protocol") ) + print( + f"[{self.display_name or self.name}] 执行器 '{requested_executor}' 不受支持," + f"自动切换为 '{fallback}' (支持: {self.supported_executors})" + ) + self.config.executor_type = fallback + else: + self.config.executor_type = requested_executor @abstractmethod def register(self, email: str, password: str = None) -> Account: diff --git a/core/executors/playwright.py b/core/executors/playwright.py index 9694cf4..8de4ad4 100644 --- a/core/executors/playwright.py +++ b/core/executors/playwright.py @@ -52,6 +52,16 @@ class PlaywrightExecutor(BaseExecutor): raise RuntimeError("Playwright context 未初始化") return self._context + @property + def page(self) -> Any: + """兼容平台插件直接访问 executor.page 的用法。""" + return self._require_page() + + @property + def context(self) -> Any: + """兼容平台插件直接访问 executor.context 的用法。""" + return self._require_context() + def get(self, url, *, headers=None, params=None) -> Response: import urllib.parse diff --git a/frontend/src/lib/platformExecutorOptions.ts b/frontend/src/lib/platformExecutorOptions.ts index b5a02b9..dd2f55f 100644 --- a/frontend/src/lib/platformExecutorOptions.ts +++ b/frontend/src/lib/platformExecutorOptions.ts @@ -11,6 +11,7 @@ const PLATFORM_EXECUTORS: Record = { kiro: ['protocol', 'headless', 'headed'], tavily: ['protocol', 'headless', 'headed'], trae: ['protocol', 'headless', 'headed'], + qwen: ['headless', 'headed'], openblocklabs: ['protocol'], } diff --git a/frontend/src/pages/Accounts.tsx b/frontend/src/pages/Accounts.tsx index ae8a663..f7efb24 100644 --- a/frontend/src/pages/Accounts.tsx +++ b/frontend/src/pages/Accounts.tsx @@ -368,6 +368,7 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo const [resultUrl, setResultUrl] = useState('') const [resultProbe, setResultProbe] = useState(null) const [resultCliproxySync, setResultCliproxySync] = useState(null) + const [runningActionId, setRunningActionId] = useState(null) const showResult = (title: string, status: 'success' | 'error', text: string, url = '', probe: any = null, cliproxySync: any = null) => { setResultTitle(title) @@ -390,7 +391,11 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo } const handleAction = async (actionId: string) => { + if (runningActionId) return const actionLabel = actions.find((item) => item.id === actionId)?.label || actionId + const toastKey = `account-action:${acc?.id}:${actionId}` + setRunningActionId(actionId) + message.loading({ content: `${actionLabel}运行中...`, key: toastKey, duration: 0 }) try { const r = await apiFetch(`/actions/${acc.platform}/${acc.id}/${actionId}`, { @@ -401,6 +406,7 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo const data = r.data || {} const probe = typeof data === 'object' && data ? data.probe || null : null const cliproxySync = typeof data === 'object' && data ? data.sync || null : null + message.error({ content: `${actionLabel}失败`, key: toastKey }) showResult(actionLabel, 'error', r.error || data.message || '操作失败', '', probe, cliproxySync) onRefresh() return @@ -408,10 +414,10 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo const data = r.data || {} if (data.url || data.checkout_url || data.cashier_url) { const targetUrl = data.url || data.checkout_url || data.cashier_url - message.success('链接已生成') + message.success({ content: `${actionLabel}完成`, key: toastKey }) showResult(actionLabel, 'success', '操作成功,请在弹窗中打开或复制链接。', targetUrl) } else { - message.success(data.message || '操作成功') + message.success({ content: data.message || `${actionLabel}完成`, key: toastKey }) const probe = typeof data === 'object' && data ? data.probe || null : null const cliproxySync = typeof data === 'object' && data ? data.sync || null : null const text = @@ -429,14 +435,17 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo onRefresh() } catch (e: any) { const detail = e?.message ? String(e.message) : '请求失败' - message.error(detail) + message.error({ content: detail, key: toastKey }) showResult(actionLabel, 'error', detail) + } finally { + setRunningActionId(null) } } const menuItems: MenuProps['items'] = actions.map((a) => ({ key: a.id, - label: a.label, + label: runningActionId === a.id ? `${a.label}(运行中)` : a.label, + disabled: Boolean(runningActionId), })) if (actions.length === 0) return null @@ -449,7 +458,12 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo onClick: ({ key }) => handleAction(String(key)), }} > - )} + {currentPlatform !== 'chatgpt' && hasUploadCpaAction && ( + handleBatchUploadCpa(getUploadCpaScope())} + okText="确认" + cancelText="取消" + > + + + )} {selectedRowKeys.length > 0 && ( { setAddModalOpen(false); addForm.resetFields(); }} onOk={handleAdd} + okText="确定" + cancelText="取消" maskClosable={false} >
@@ -1450,6 +1563,8 @@ export default function Accounts() { open={importModalOpen} onCancel={() => { setImportModalOpen(false); setImportText(''); }} onOk={handleImport} + okText="确定" + cancelText="取消" confirmLoading={importLoading} maskClosable={false} > @@ -1469,6 +1584,8 @@ export default function Accounts() { open={detailModalOpen} onCancel={() => setDetailModalOpen(false)} onOk={handleDetailSave} + okText="保存" + cancelText="取消" maskClosable={false} width={760} styles={{ body: { maxHeight: '72vh', overflowY: 'auto' } }} diff --git a/frontend/src/pages/RegisterTaskPage.tsx b/frontend/src/pages/RegisterTaskPage.tsx index 8da6a87..88ca08d 100644 --- a/frontend/src/pages/RegisterTaskPage.tsx +++ b/frontend/src/pages/RegisterTaskPage.tsx @@ -268,6 +268,7 @@ export default function RegisterTaskPage() { { value: 'grok', label: 'Grok' }, { value: 'tavily', label: 'Tavily' }, { value: 'openblocklabs', label: 'OpenBlockLabs' }, + { value: 'qwen', label: 'Qwen' }, ]} /> diff --git a/frontend/src/pages/Settings.tsx b/frontend/src/pages/Settings.tsx index 89fa707..f878ea4 100644 --- a/frontend/src/pages/Settings.tsx +++ b/frontend/src/pages/Settings.tsx @@ -376,6 +376,22 @@ const TAB_ITEMS = [ }, ], }, + { + key: 'qwen', + label: 'Qwen', + icon: , + sections: [ + { + title: 'CPA 面板', + desc: '注册完成后自动上传到 CPA 管理平台', + fields: [ + { key: 'qwen_cpa_enabled', label: '启用自动上传', type: 'boolean' }, + { key: 'qwen_cpa_api_url', label: 'API URL', placeholder: 'https://your-cpa.example.com(留空则使用 ChatGPT CPA 地址)' }, + { key: 'qwen_cpa_api_key', label: 'API Key', secret: true, placeholder: '留空则使用 ChatGPT CPA Key' }, + ], + }, + ], + }, { key: 'contribution', label: '贡献', @@ -571,7 +587,7 @@ function ConfigField({ field }: { field: FieldConfig }) { const isBooleanField = field.type === 'boolean' const helpText = field.key === 'default_executor' - ? '仅对支持的平台生效;ChatGPT、Cursor、Grok、Kiro、Tavily、Trae 支持浏览器模式,OpenBlockLabs 仅支持纯协议。' + ? '仅对支持的平台生效;ChatGPT、Cursor、Grok、Kiro、Tavily、Trae、Qwen 支持浏览器模式,OpenBlockLabs 仅支持纯协议。' : field.key === 'email_domain_rule_enabled' ? '仅 CF Worker 生效:开启后会校验域名级数,以及域名至少包含 2 个字母和 2 个数字。' : field.key === 'email_domain_level_count' @@ -928,6 +944,8 @@ function IntegrationsPanel() { title={resultModal.title} onCancel={() => setResultModal((v) => ({ ...v, open: false }))} onOk={() => setResultModal((v) => ({ ...v, open: false }))} + okText="确定" + cancelText="取消" width={760} > @@ -1816,6 +1834,10 @@ export default function Settings() { data.sub2api_enabled, Boolean(String(data.sub2api_api_url ?? '').trim() && String(data.sub2api_api_key ?? '').trim()), ) + data.qwen_cpa_enabled = resolveFeatureEnabledConfig( + data.qwen_cpa_enabled, + false, // 默认关闭,不依赖 cpa_api_url + ) data.cfworker_domains = parseStoredDomainList(data.cfworker_domains) data.cfworker_enabled_domains = parseStoredDomainList(data.cfworker_enabled_domains) data.cfworker_random_subdomain = parseBooleanConfigValue(data.cfworker_random_subdomain) @@ -1886,6 +1908,7 @@ export default function Settings() { } values.cpa_enabled = parseBooleanConfigValue(values.cpa_enabled) values.sub2api_enabled = parseBooleanConfigValue(values.sub2api_enabled) + values.qwen_cpa_enabled = parseBooleanConfigValue(values.qwen_cpa_enabled) values.cfworker_random_subdomain = parseBooleanConfigValue(values.cfworker_random_subdomain) values.cfworker_random_name_subdomain = parseBooleanConfigValue(values.cfworker_random_name_subdomain) values.contribution_enabled = parseBooleanConfigValue(values.contribution_enabled) @@ -1909,6 +1932,7 @@ export default function Settings() { mail_import_source: values.mail_provider === 'applemail' ? 'applemail' : 'microsoft', cpa_enabled: values.cpa_enabled, sub2api_enabled: values.sub2api_enabled, + qwen_cpa_enabled: values.qwen_cpa_enabled, cfworker_domains: domains, cfworker_enabled_domains: enabledDomains, cfworker_domain: domains.length > 0 ? '' : values.cfworker_domain, diff --git a/platforms/qwen/core.py b/platforms/qwen/core.py index a70ae69..1815e4d 100644 --- a/platforms/qwen/core.py +++ b/platforms/qwen/core.py @@ -16,8 +16,11 @@ deferred and can be done by visiting the activation URL from email. """ import json +import base64 +import hashlib import random import re +import secrets import string import time from typing import Callable, Optional @@ -28,6 +31,10 @@ from playwright.sync_api import Page QWEN_AUTH_URL = "https://chat.qwen.ai/auth" QWEN_ACTIVATE_URL = "https://chat.qwen.ai/api/v1/auths/activate" +QWEN_OAUTH_DEVICE_CODE_URL = "https://chat.qwen.ai/api/v1/oauth2/device/code" +QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token" +QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56" +QWEN_OAUTH_SCOPE = "openid profile email model.completion" UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " @@ -93,24 +100,385 @@ def call_activation_api(activation_url: str, user_agent: str = UA) -> dict: return {"ok": False, "error": str(e)} +def _b64url_no_padding(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") + + +def _generate_qwen_pkce_pair() -> tuple[str, str]: + verifier = _b64url_no_padding(secrets.token_bytes(32)) + challenge = _b64url_no_padding(hashlib.sha256(verifier.encode("utf-8")).digest()) + return verifier, challenge + + +def _device_flow_request(code_challenge: str, user_agent: str = UA) -> dict: + resp = requests.post( + QWEN_OAUTH_DEVICE_CODE_URL, + data={ + "client_id": QWEN_OAUTH_CLIENT_ID, + "scope": QWEN_OAUTH_SCOPE, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + headers={ + "User-Agent": user_agent, + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + }, + timeout=20, + ) + if resp.status_code != 200: + raise RuntimeError(f"Qwen OAuth device code failed: HTTP {resp.status_code} {resp.text[:180]}") + data = resp.json() if resp.text else {} + if not isinstance(data, dict): + raise RuntimeError("Qwen OAuth device code response invalid") + if not data.get("device_code") or not data.get("verification_uri_complete"): + raise RuntimeError(f"Qwen OAuth device code missing fields: {data}") + return data + + +def _poll_device_flow_token( + *, + device_code: str, + code_verifier: str, + timeout_seconds: int = 30, + user_agent: str = UA, +) -> dict: + deadline = time.time() + max(6, int(timeout_seconds or 30)) + last_err = "" + while time.time() < deadline: + resp = requests.post( + QWEN_OAUTH_TOKEN_URL, + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "client_id": QWEN_OAUTH_CLIENT_ID, + "device_code": device_code, + "code_verifier": code_verifier, + }, + headers={ + "User-Agent": user_agent, + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + }, + timeout=20, + ) + text = resp.text or "" + parsed = {} + if text: + try: + parsed = resp.json() + except Exception: + parsed = {} + if resp.status_code == 200 and isinstance(parsed, dict): + access_token = str(parsed.get("access_token") or "").strip() + refresh_token = str(parsed.get("refresh_token") or "").strip() + if access_token and refresh_token: + return parsed + last_err = f"missing access/refresh token: {parsed}" + break + + if resp.status_code == 400 and isinstance(parsed, dict): + err = str(parsed.get("error") or "") + if err in {"authorization_pending", "slow_down"}: + time.sleep(2 if err == "authorization_pending" else 3) + continue + last_err = f"{err}: {parsed.get('error_description') or ''}".strip(": ") + break + + last_err = f"HTTP {resp.status_code}: {text[:180]}" + time.sleep(2) + + raise RuntimeError(f"Qwen OAuth token polling failed: {last_err or 'timeout'}") + + +def _click_first_confirm_button(page: Page) -> bool: + name_candidates = [ + "确认", + "Confirm", + "Authorize", + "授权", + "同意", + "Allow", + "Approve", + ] + for name in name_candidates: + try: + locator = page.get_by_role("button", name=name, exact=True) + if locator.count() > 0 and locator.first.is_visible(): + locator.first.click() + return True + except Exception: + continue + + selector_candidates = [ + 'button:has-text("确认")', + 'button:has-text("Authorize")', + 'button:has-text("授权")', + 'button:has-text("同意")', + 'button:has-text("Allow")', + ] + for selector in selector_candidates: + try: + locator = page.locator(selector).first + if locator.count() > 0 and locator.is_visible(): + locator.click() + return True + except Exception: + continue + + try: + fallback = page.locator("button").first + if fallback.count() > 0 and fallback.is_visible(): + fallback.click() + return True + except Exception: + pass + return False + + +def _looks_like_oauth_success(page: Page) -> bool: + try: + body_text = page.inner_text("body") + except Exception: + body_text = "" + body_text = str(body_text or "") + return ("认证成功" in body_text) or ("请转到命令行界面" in body_text) + + +def obtain_qwen_oauth_tokens_from_logged_in_page( + page: Page, + *, + log_fn: Callable | None = None, + poll_timeout_seconds: int = 30, +) -> dict: + log = log_fn or (lambda *_args, **_kwargs: None) + try: + code_verifier, code_challenge = _generate_qwen_pkce_pair() + flow = _device_flow_request(code_challenge=code_challenge) + verification_url = str(flow.get("verification_uri_complete") or "").strip() + device_code = str(flow.get("device_code") or "").strip() + if not verification_url or not device_code: + raise RuntimeError(f"Qwen OAuth device flow missing fields: {flow}") + + page.goto(verification_url, wait_until="domcontentloaded", timeout=30000) + for _ in range(12): + if _looks_like_oauth_success(page): + break + try: + if page.locator("button").count() > 0: + break + except Exception: + pass + page.wait_for_timeout(800) + if not _looks_like_oauth_success(page): + clicked = _click_first_confirm_button(page) + if not clicked: + body_preview = "" + try: + body_preview = (page.inner_text("body") or "")[:200].replace("\n", "|") + except Exception: + body_preview = "" + raise RuntimeError( + f"OAuth 授权页未找到可点击的确认按钮; url={page.url}; body={body_preview}" + ) + page.wait_for_timeout(1500) + + token_payload = _poll_device_flow_token( + device_code=device_code, + code_verifier=code_verifier, + timeout_seconds=poll_timeout_seconds, + ) + access_token = str(token_payload.get("access_token") or "").strip() + refresh_token = str(token_payload.get("refresh_token") or "").strip() + resource_url = str(token_payload.get("resource_url") or "").strip() or "portal.qwen.ai" + if not access_token or not refresh_token: + raise RuntimeError("Qwen OAuth token payload missing access/refresh token") + + return { + "oauth_access_token": access_token, + "refresh_token": refresh_token, + "resource_url": resource_url, + "oauth_token_type": str(token_payload.get("token_type") or "").strip(), + "oauth_scope": str(token_payload.get("scope") or "").strip(), + "oauth_expires_in": int(token_payload.get("expires_in") or 0), + } + except Exception as e: + log(f"Qwen OAuth device-flow failed: {e}") + return {} + + +def login_qwen_with_password(page: Page, email: str, password: str, *, log_fn: Callable | None = None) -> bool: + log = log_fn or (lambda *_args, **_kwargs: None) + if not email or not password: + return False + try: + page.goto(f"{QWEN_AUTH_URL}?mode=login", wait_until="domcontentloaded", timeout=30000) + page.wait_for_timeout(800) + page.locator('input[name="email"]').fill(email) + page.locator('input[name="password"]').fill(password) + page.get_by_role("button", name="登录", exact=True).click() + page.wait_for_timeout(3500) + url = str(page.url or "") + if "chat.qwen.ai" not in url: + return False + if "/auth" in url and "mode=login" in url: + return False + body_text = "" + try: + body_text = page.inner_text("body") + except Exception: + body_text = "" + if ("输入您的电子邮箱" in body_text) or ("继续使用 Google 登录" in body_text): + return False + try: + cookies = page.context.cookies("https://chat.qwen.ai") + has_token_cookie = any( + str(item.get("name") or "") == "token" and str(item.get("value") or "").strip() + for item in (cookies or []) + ) + if not has_token_cookie: + return False + except Exception: + return False + return True + except Exception as e: + log(f"Qwen login failed before OAuth flow: {e}") + return False + + +def obtain_qwen_oauth_tokens_with_login( + page: Page, + *, + email: str, + password: str, + log_fn: Callable | None = None, + poll_timeout_seconds: int = 30, +) -> dict: + if not login_qwen_with_password(page, email, password, log_fn=log_fn): + return {} + return obtain_qwen_oauth_tokens_from_logged_in_page( + page, + log_fn=log_fn, + poll_timeout_seconds=poll_timeout_seconds, + ) + + def wait_for_activation_link( mailbox, - mail_acct, + mail_acct=None, + *, + account_email: str = "", timeout: int = 120, before_ids: set = None, + log_fn: Callable | None = None, + max_errors: int = 3, ) -> str | None: - """Poll mailbox for Qwen activation email and extract link.""" + """Poll mailbox for Qwen activation email and extract link. + + Supports: + - CFWorker-style mailbox exposing `_get_mails(email)` + - legacy mailbox exposing `get_messages/get_message_body` + """ + log = log_fn or (lambda *_args, **_kwargs: None) + seen = set(before_ids or []) + + def _decode_mime_raw(raw: str) -> str: + raw = str(raw or "") + if not raw: + return "" + try: + import email + + msg = email.message_from_string(raw) + chunks: list[str] = [] + if msg.is_multipart(): + for part in msg.walk(): + payload = part.get_payload(decode=True) + if payload is None: + continue + charset = part.get_content_charset() or "utf-8" + try: + text = payload.decode(charset, errors="ignore") + except Exception: + text = payload.decode("utf-8", errors="ignore") + if text: + chunks.append(text) + else: + payload = msg.get_payload(decode=True) + if payload is not None: + charset = msg.get_content_charset() or "utf-8" + try: + text = payload.decode(charset, errors="ignore") + except Exception: + text = payload.decode("utf-8", errors="ignore") + if text: + chunks.append(text) + return "\n".join(chunks).strip() + except Exception: + return "" + + def _collect_mail_text(item: dict) -> str: + if not isinstance(item, dict): + return str(item or "") + raw = str(item.get("raw") or "") + decoded_raw = _decode_mime_raw(raw) + return " ".join( + [ + str(item.get("subject") or ""), + raw, + decoded_raw, + str(item.get("text") or ""), + str(item.get("content") or ""), + str(item.get("html") or ""), + str(item.get("body") or ""), + ] + ).strip() + start = time.time() + error_count = 0 while time.time() - start < timeout: time.sleep(5) try: - messages = mailbox.get_messages(mail_acct, before_ids=before_ids) - for msg in messages: + # CFWorker mailbox path + if account_email and hasattr(mailbox, "_get_mails"): + messages = mailbox._get_mails(account_email) or [] + for msg in messages: + mid = str((msg or {}).get("id") or "") + if mid and mid in seen: + continue + if mid: + seen.add(mid) + body = _collect_mail_text(msg) + link = extract_activation_link(body) + if link: + return link + continue + + # Legacy custom mailbox path + if not ( + mail_acct + and hasattr(mailbox, "get_messages") + and hasattr(mailbox, "get_message_body") + ): + log("Activation link polling aborted: mailbox does not expose readable message APIs") + break + + messages = mailbox.get_messages(mail_acct, before_ids=seen) + for msg in messages or []: + mid = str((msg or {}).get("id") or "") + if mid and mid in seen: + continue + if mid: + seen.add(mid) body = mailbox.get_message_body(mail_acct, msg.get("id")) or "" link = extract_activation_link(body) if link: return link - except Exception: + error_count = 0 + except Exception as e: + error_count += 1 + log(f"Activation polling error {error_count}/{max_errors}: {e}") + if error_count >= max(1, int(max_errors or 1)): + break continue return None @@ -124,6 +492,17 @@ class QwenRegister: self.log = log_fn self._max_retries = 2 + @staticmethod + def _resolve_access_token(tokens: dict | None) -> str: + if not isinstance(tokens, dict): + return "" + return str( + tokens.get("token") + or tokens.get("cookie:token") + or tokens.get("access_token") + or "" + ).strip() + def register( self, email: str, @@ -147,21 +526,38 @@ class QwenRegister: "Please select 'headless' or 'headed' executor." ) + last_reason = "no token" for attempt in range(self._max_retries + 1): if attempt > 0: self.log(f" Retry {attempt}/{self._max_retries}...") time.sleep(5) result = self._try_register(page, email, password, full_name) - if result.get("token"): + tokens = result.get("tokens", {}) if isinstance(result, dict) else {} + access_token = self._resolve_access_token(tokens) + if access_token: + if attempt == 0: + self.log(" first-attempt token hit") + else: + self.log(f" token hit on retry {attempt}/{self._max_retries}") return result + last_reason = str(result.get("error") or "no token") if attempt < self._max_retries: - self.log(f" No token, retrying...") + self.log(f" retry reason: {last_reason}") + self.log(" No token, retrying...") else: self.log(f" WARNING: No auth token after {self._max_retries + 1} attempts") + self.log(f" final failure reason(no token): {last_reason}") - return {"email": email, "password": password, "full_name": full_name, "tokens": {}, "status": "failed"} + return { + "email": email, + "password": password, + "full_name": full_name, + "tokens": {}, + "status": "failed", + "error": last_reason, + } def _try_register(self, page: Page, email: str, password: str, full_name: str) -> dict: """One attempt at registration. Returns dict with tokens.""" @@ -242,7 +638,17 @@ class QwenRegister: self.log(f" Current URL: {page.url}") self.log(f" Tokens found: {list(tokens.keys()) if tokens else 'none'}") - if tokens.get("token") or tokens.get("cookie:token"): + if self._resolve_access_token(tokens): + oauth_data = obtain_qwen_oauth_tokens_from_logged_in_page( + page, + log_fn=self.log, + poll_timeout_seconds=20, + ) + if oauth_data: + tokens.update(oauth_data) + self.log(" OAuth refresh_token acquired") + else: + self.log(" OAuth refresh_token not acquired (continue with web token)") return { "email": email, "password": password, @@ -251,11 +657,48 @@ class QwenRegister: "status": "success", } - return {"tokens": {}} + return { + "email": email, + "password": password, + "full_name": full_name, + "tokens": tokens, + "status": "failed", + "error": "no token after submit", + } except Exception as e: - self.log(f" Error: {e}") - return {"tokens": {}} + err = str(e) + self.log(f" Error: {err}") + + # Fallback: even when selector steps fail, page may already have token. + tokens = {} + try: + tokens = self._extract_tokens(page) + self.log( + f" Fallback token check after error: " + f"{list(tokens.keys()) if tokens else 'none'}" + ) + except Exception: + tokens = {} + + if self._resolve_access_token(tokens): + self.log(" token recovered by fallback extraction") + return { + "email": email, + "password": password, + "full_name": full_name, + "tokens": tokens, + "status": "success", + } + + return { + "email": email, + "password": password, + "full_name": full_name, + "tokens": tokens, + "status": "failed", + "error": err, + } # ---- helpers ---- diff --git a/platforms/qwen/cpa_upload.py b/platforms/qwen/cpa_upload.py new file mode 100644 index 0000000..27e2490 --- /dev/null +++ b/platforms/qwen/cpa_upload.py @@ -0,0 +1,85 @@ +"""Qwen 账号导入 CPA 辅助。""" + +from __future__ import annotations + +import base64 +import json +from datetime import datetime, timezone, timedelta +from typing import Any + + +def _decode_jwt_payload(token: str) -> dict[str, Any]: + raw = str(token or "").strip() + if not raw: + return {} + parts = raw.split(".") + if len(parts) < 2: + return {} + payload = parts[1] + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + try: + decoded = base64.urlsafe_b64decode(payload.encode("utf-8")) + data = json.loads(decoded.decode("utf-8", errors="ignore")) + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def generate_token_json(account: Any) -> dict[str, Any]: + """生成适配 CPA 管理端 auth-file 的 Qwen token JSON。""" + email = str(getattr(account, "email", "") or "").strip() + access_token = str(getattr(account, "access_token", "") or "").strip() + refresh_token = str(getattr(account, "refresh_token", "") or "").strip() + resource_url = str(getattr(account, "resource_url", "") or "").strip() + if not resource_url: + resource_url = "portal.qwen.ai" + payload = _decode_jwt_payload(access_token) if access_token else {} + + account_id = str( + payload.get("id") + or payload.get("user_id") + or payload.get("sub") + or "" + ).strip() + + expired_str = "" + exp_timestamp = payload.get("exp") + if isinstance(exp_timestamp, int) and exp_timestamp > 0: + exp_dt = datetime.fromtimestamp(exp_timestamp, tz=timezone(timedelta(hours=8))) + expired_str = exp_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00") + + now = datetime.now(tz=timezone(timedelta(hours=8))) + return { + "type": "qwen", + "provider": "qwen", + "email": email, + "expired": expired_str, + "account_id": account_id, + "access_token": access_token, + "refresh_token": refresh_token, + "resource_url": resource_url, + "last_refresh": now.strftime("%Y-%m-%dT%H:%M:%S+08:00"), + } + + +def upload_to_cpa( + token_data: dict[str, Any], + api_url: str | None = None, + api_key: str | None = None, +) -> tuple[bool, str]: + """复用通用 CPA 上传实现。""" + access_token = str((token_data or {}).get("access_token") or "").strip() + refresh_token = str((token_data or {}).get("refresh_token") or "").strip() + if not access_token: + return False, "Qwen 凭证缺少 access_token" + if not refresh_token: + return ( + False, + "Qwen 凭证缺少 refresh_token(需 Qwen OAuth 设备登录凭证,普通网页 token 不可用)", + ) + + from platforms.chatgpt.cpa_upload import upload_to_cpa as _upload + + return _upload(token_data, api_url=api_url, api_key=api_key) diff --git a/platforms/qwen/plugin.py b/platforms/qwen/plugin.py index 605b3d2..de43dfd 100644 --- a/platforms/qwen/plugin.py +++ b/platforms/qwen/plugin.py @@ -7,11 +7,89 @@ Registration flow (verified): - Activation URL is called via API to complete registration """ +import json + from core.base_platform import BasePlatform, Account, AccountStatus, RegisterConfig from core.base_mailbox import BaseMailbox from core.registry import register +def _normalize_key(key: str) -> str: + return str(key or "").strip().lower().replace("-", "").replace("_", "") + + +def _parse_json_like(raw: str): + text = str(raw or "").strip() + if not text: + return None + if not ((text.startswith("{") and text.endswith("}")) or (text.startswith("[") and text.endswith("]"))): + return None + try: + return json.loads(text) + except Exception: + return None + + +def _find_value_by_keys(obj, key_candidates: set[str], *, depth: int = 0, max_depth: int = 5) -> str: + if depth > max_depth: + return "" + if isinstance(obj, dict): + for key, value in obj.items(): + if _normalize_key(key) in key_candidates: + if isinstance(value, str) and value.strip(): + return value.strip() + if isinstance(value, (dict, list)): + found = _find_value_by_keys(value, key_candidates, depth=depth + 1, max_depth=max_depth) + if found: + return found + elif isinstance(value, str): + parsed = _parse_json_like(value) + if parsed is not None: + found = _find_value_by_keys(parsed, key_candidates, depth=depth + 1, max_depth=max_depth) + if found: + return found + elif isinstance(obj, list): + for item in obj: + found = _find_value_by_keys(item, key_candidates, depth=depth + 1, max_depth=max_depth) + if found: + return found + elif isinstance(obj, str): + parsed = _parse_json_like(obj) + if parsed is not None: + return _find_value_by_keys(parsed, key_candidates, depth=depth + 1, max_depth=max_depth) + return "" + + +def _extract_qwen_oauth_fields(raw_tokens) -> tuple[str, str, str]: + oauth_access_keys = { + "oauthaccesstoken", + "qwenoauthaccesstoken", + "oauth_token", + "oauthaccess", + } + refresh_keys = { + "refreshtoken", + "refreshtokenvalue", + "qwenrefreshtoken", + "oauthrefreshtoken", + } + resource_keys = { + "resourceurl", + "qwenresourceurl", + "oauthresourceurl", + "baseurl", + "apiurl", + } + oauth_access_token = ( + _find_value_by_keys(raw_tokens, oauth_access_keys) + if isinstance(raw_tokens, (dict, list, str)) + else "" + ) + refresh_token = _find_value_by_keys(raw_tokens, refresh_keys) if isinstance(raw_tokens, (dict, list, str)) else "" + resource_url = _find_value_by_keys(raw_tokens, resource_keys) if isinstance(raw_tokens, (dict, list, str)) else "" + return oauth_access_token, refresh_token, resource_url + + @register class QwenPlatform(BasePlatform): name = "qwen" @@ -28,7 +106,8 @@ class QwenPlatform(BasePlatform): from platforms.qwen.core import ( QwenRegister, call_activation_api, - extract_activation_link, + wait_for_activation_link, + obtain_qwen_oauth_tokens_with_login, ) log = getattr(self, "_log_fn", print) @@ -38,7 +117,8 @@ class QwenPlatform(BasePlatform): if not email: raise RuntimeError("Qwen registration requires an email address") - log(f"Email: {email}") + log(f"[Qwen] 开始注册流程") + log(f"[Qwen] 邮箱: {email}") before_ids = self.mailbox.get_current_ids(mail_acct) if mail_acct else set() otp_timeout = self.get_mailbox_otp_timeout() @@ -46,50 +126,100 @@ class QwenPlatform(BasePlatform): reg = QwenRegister(executor=ex, log_fn=log) result = reg.register(email=email, password=password) - if result.get("status") != "success": - return Account( - platform="qwen", - email=email, - password=password or "", - token="", - status=AccountStatus.REGISTERED, - extra={"error": "Registration failed — no token cookie"}, - ) - tokens = result.get("tokens", {}) - # Token is in cookie:token from Qwen access_token = ( tokens.get("token") or tokens.get("cookie:token") or tokens.get("access_token", "") ) - # Try activation if mailbox available + if result.get("status") != "success": + reason = str(result.get("error") or "no token") + log(f"[Qwen] 注册失败: {reason}") + raise RuntimeError(f"Qwen registration failed: {reason}") + if not access_token: + log("[Qwen] 注册失败: 未获取到 access token") + raise RuntimeError("Qwen registration failed: no auth token extracted") + + log("[Qwen] 注册成功,开始激活流程...") + activated = False if self.mailbox and mail_acct: - log("Waiting for activation email...") - activation_link = None - for _ in range(min(10, otp_timeout // 5)): - import time - time.sleep(3) - try: - messages = self.mailbox.get_messages(mail_acct, before_ids=before_ids) - for msg in messages: - body = self.mailbox.get_message_body(mail_acct, msg.get("id")) or "" - link = extract_activation_link(body) - if link: - activation_link = link - break - if activation_link: - break - except Exception: - continue + log("[Qwen] 等待激活邮件...") + activation_link = wait_for_activation_link( + self.mailbox, + mail_acct=mail_acct, + account_email=email, + timeout=max(30, min(120, int(otp_timeout or 120))), + before_ids=before_ids, + log_fn=log, + ) if activation_link: - log(f"Activation link found, activating...") + log(f"[Qwen] 找到激活链接,正在激活...") act_result = call_activation_api(activation_link) activated = act_result.get("ok", False) - log(f"Activation: {'SUCCESS' if activated else 'FAILED'}") + log(f"[Qwen] 激活结果: {'成功' if activated else '失败'}") + else: + log("[Qwen] 未找到激活链接,跳过激活") + else: + log("[Qwen] 无邮箱服务,跳过激活") + + raw_tokens = tokens if isinstance(tokens, dict) else {} + oauth_access_token, refresh_token, resource_url = _extract_qwen_oauth_fields(raw_tokens) + + if not refresh_token: + log("[Qwen] 未获取到 refresh_token,尝试通过登录获取 OAuth tokens...") + with self._make_executor() as ex: + oauth_data = obtain_qwen_oauth_tokens_with_login( + ex.page, + email=str(result.get("email") or email), + password=str(result.get("password") or password or ""), + log_fn=log, + poll_timeout_seconds=20, + ) + if oauth_data: + got_refresh = str(oauth_data.get("refresh_token") or "").strip() + if got_refresh: + refresh_token = got_refresh + oauth_access_token = str( + oauth_data.get("oauth_access_token") + or oauth_data.get("access_token") + or oauth_access_token + or "" + ).strip() + resource_url = str( + oauth_data.get("resource_url") + or resource_url + or "portal.qwen.ai" + ).strip() + log(f"[Qwen] 成功获取 OAuth tokens") + log(f"[Qwen] refresh_token: {refresh_token[:20]}...") + log(f"[Qwen] resource_url: {resource_url}") + else: + log("[Qwen] OAuth 登录成功但未获取到 refresh_token") + else: + log("[Qwen] OAuth 登录失败,无法获取 refresh_token") + else: + log(f"[Qwen] 已获取 refresh_token: {refresh_token[:20]}...") + + extra = { + "activated": activated, + "full_name": result.get("full_name", ""), + "raw_tokens": raw_tokens, + } + if oauth_access_token: + extra["oauth_access_token"] = oauth_access_token + extra["qwen_oauth_access_token"] = oauth_access_token + if refresh_token: + extra["refresh_token"] = refresh_token + extra["qwen_refresh_token"] = refresh_token + if resource_url: + extra["resource_url"] = resource_url + extra["qwen_resource_url"] = resource_url + + log(f"[Qwen] 注册流程完成,激活状态: {'已激活' if activated else '未激活'}") + log(f"[Qwen] refresh_token 状态: {'已获取' if refresh_token else '未获取'}") return Account( platform="qwen", @@ -97,11 +227,7 @@ class QwenPlatform(BasePlatform): password=result["password"], token=access_token, status=AccountStatus.REGISTERED, - extra={ - "activated": activated, - "full_name": result.get("full_name", ""), - "raw_tokens": tokens, - }, + extra=extra, ) def check_valid(self, account: Account) -> bool: @@ -114,7 +240,7 @@ class QwenPlatform(BasePlatform): try: r = curl_req.get( - "https://chat.qwen.ai/api/v1/user/profile", + "https://chat.qwen.ai/api/v1/chats", headers={ "Authorization": f"Bearer {token}", "user-agent": ( @@ -133,76 +259,261 @@ class QwenPlatform(BasePlatform): def get_platform_actions(self) -> list: """Return platform-specific actions.""" return [ - {"id": "activate_account", "label": "Activate Account", "params": []}, - {"id": "get_user_info", "label": "Get User Info", "params": []}, + {"id": "activate_account", "label": "激活账号", "params": []}, + {"id": "get_user_info", "label": "获取用户信息", "params": []}, + { + "id": "upload_cpa", + "label": "导入 CPA", + "params": [ + {"key": "api_url", "label": "CPA API URL", "type": "text"}, + {"key": "api_key", "label": "CPA API Key", "type": "text"}, + ], + }, ] def execute_action(self, action_id: str, account: Account, params: dict) -> dict: """Execute platform-specific actions.""" from curl_cffi import requests as curl_req - from platforms.qwen.core import call_activation_api + from platforms.qwen.core import call_activation_api, wait_for_activation_link if action_id == "activate_account": - # Try to find activation link from email and activate + # Try to find activation link from mailbox and activate if not self.mailbox: - return {"ok": False, "error": "No mailbox configured for activation"} + try: + from core.base_mailbox import create_mailbox - email = account.email - password = account.password or "" - mail_acct = self.mailbox.get_email() - if not mail_acct or mail_acct.email != email: - return {"ok": False, "error": f"No mailbox for {email}"} + cfg_extra = (self.config.extra or {}) if self.config else {} + provider = str(cfg_extra.get("mail_provider", "luckmail") or "luckmail") + self.mailbox = create_mailbox( + provider=provider, + extra=cfg_extra, + proxy=self.config.proxy if self.config else None, + ) + except Exception as e: + return {"ok": False, "error": f"未配置可用邮箱,无法激活: {e}"} try: - from platforms.qwen.core import extract_activation_link - import time + default_wait_seconds = self.get_mailbox_otp_timeout(default=60) + wait_seconds = int(params.get("wait_seconds") or default_wait_seconds) + wait_seconds = max(5, min(120, wait_seconds)) + mail_acct = None + before_ids = None + supports_direct_email_lookup = hasattr(self.mailbox, "_get_mails") + if ( + not supports_direct_email_lookup + and hasattr(self.mailbox, "get_current_ids") + and hasattr(self.mailbox, "get_email") + ): + try: + mail_acct = self.mailbox.get_email() + if mail_acct: + before_ids = self.mailbox.get_current_ids(mail_acct) + except Exception: + mail_acct = None + before_ids = None - before_ids = self.mailbox.get_current_ids(mail_acct) - activation_link = None - for _ in range(24): # 120s - time.sleep(5) - messages = self.mailbox.get_messages(mail_acct, before_ids=before_ids) - for msg in messages: - body = self.mailbox.get_message_body(mail_acct, msg.get("id")) or "" - link = extract_activation_link(body) - if link: - activation_link = link - break - if activation_link: - break + activation_link = wait_for_activation_link( + self.mailbox, + mail_acct=mail_acct, + account_email=account.email or "", + timeout=wait_seconds, + before_ids=before_ids, + log_fn=getattr(self, "_log_fn", print), + ) if activation_link: act_result = call_activation_api(activation_link) if act_result.get("ok"): - return {"ok": True, "message": "Account activated"} - return {"ok": False, "error": f"Activation failed: {act_result}"} - return {"ok": False, "error": "No activation email found"} + return {"ok": True, "message": "账号激活成功"} + return {"ok": False, "error": f"激活请求失败: {act_result}"} + return {"ok": False, "error": f"在 {wait_seconds}s 内未找到激活邮件"} except Exception as e: return {"ok": False, "error": str(e)} if action_id == "get_user_info": token = account.token if not token: - return {"ok": False, "error": "Account missing token"} + return {"ok": False, "error": "账号缺少 token"} + def _decode_jwt_payload(raw_token: str) -> dict: + import base64 + import json as _json + + parts = str(raw_token or "").split(".") + if len(parts) < 2: + return {} + payload = parts[1] + padded = payload + "=" * ((4 - len(payload) % 4) % 4) + try: + decoded = base64.urlsafe_b64decode(padded.encode("utf-8")).decode( + "utf-8", errors="ignore" + ) + obj = _json.loads(decoded) + return obj if isinstance(obj, dict) else {} + except Exception: + return {} + + headers = { + "Authorization": f"Bearer {token}", + "user-agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" + ), + } + candidates = [ + ("users_me", "https://chat.qwen.ai/api/v1/users/me"), + ("users_profile", "https://chat.qwen.ai/api/v1/users/profile"), + ("chats", "https://chat.qwen.ai/api/v1/chats"), + ] + source_labels = { + "users_me": "用户信息接口(me)", + "users_profile": "用户资料接口(profile)", + "chats": "会话列表接口", + } + attempts = [] try: - r = curl_req.get( - "https://chat.qwen.ai/api/v1/user/profile", - headers={ - "Authorization": f"Bearer {token}", - "user-agent": ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/131.0.0.0 Safari/537.36" - ), - }, - impersonate="chrome124", - timeout=15, - ) - if r.status_code == 200: - return {"ok": True, "data": r.json()} - return {"ok": False, "error": f"Failed: HTTP {r.status_code}"} + for source, url in candidates: + r = curl_req.get( + url, + headers=headers, + impersonate="chrome124", + timeout=15, + ) + attempts.append( + { + "source": source, + "url": url, + "status_code": r.status_code, + } + ) + if r.status_code != 200: + continue + + data = None + try: + data = r.json() + except Exception: + data = {"raw": (r.text or "")[:1000]} + + token_payload = _decode_jwt_payload(token) + payload_summary = { + "id": token_payload.get("id"), + "exp": token_payload.get("exp"), + "last_password_change": token_payload.get("last_password_change"), + } + + attempts_cn = [ + { + "接口": source_labels.get(item["source"], item["source"]), + "地址": item["url"], + "状态码": item["status_code"], + } + for item in attempts + ] + + result = { + "来源": source_labels.get(source, source), + "接口地址": url, + "HTTP状态": r.status_code, + "用户ID": payload_summary.get("id"), + "Token过期时间": payload_summary.get("exp"), + "最近改密时间": payload_summary.get("last_password_change"), + "尝试记录": attempts_cn, + "原始数据": data, + } + if source == "chats" and isinstance(data, list): + result["会话数量"] = len(data) + return {"ok": True, "data": result} + + return { + "ok": False, + "error": ( + "查询用户信息失败: " + + ", ".join( + f"{item['source']}={item['status_code']}" for item in attempts + ) + ), + } except Exception as e: return {"ok": False, "error": str(e)} + if action_id == "upload_cpa": + from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa + from platforms.qwen.core import obtain_qwen_oauth_tokens_with_login + + class _A: + pass + + extra = account.extra if isinstance(account.extra, dict) else {} + raw_tokens = extra.get("raw_tokens") if isinstance(extra.get("raw_tokens"), dict) else {} + raw_oa, raw_rt, raw_ru = _extract_qwen_oauth_fields(raw_tokens) + oauth_access_token = str( + extra.get("oauth_access_token") + or extra.get("qwen_oauth_access_token") + or raw_oa + or "" + ).strip() + refresh_token = str( + extra.get("refresh_token") + or extra.get("qwen_refresh_token") + or raw_rt + or "" + ).strip() + resource_url = str( + extra.get("resource_url") + or extra.get("qwen_resource_url") + or raw_ru + or "" + ).strip() + + account_extra_patch = {} + if not refresh_token and account.email and account.password: + with self._make_executor() as ex: + oauth_data = obtain_qwen_oauth_tokens_with_login( + ex.page, + email=str(account.email or ""), + password=str(account.password or ""), + log_fn=getattr(self, "_log_fn", print), + poll_timeout_seconds=20, + ) + got_refresh = str(oauth_data.get("refresh_token") or "").strip() + if got_refresh: + oauth_access_token = str( + oauth_data.get("oauth_access_token") + or oauth_data.get("access_token") + or oauth_access_token + or "" + ).strip() + refresh_token = got_refresh + resource_url = str( + oauth_data.get("resource_url") + or resource_url + or "portal.qwen.ai" + ).strip() + account_extra_patch = { + "oauth_access_token": oauth_access_token, + "qwen_oauth_access_token": oauth_access_token, + "refresh_token": refresh_token, + "qwen_refresh_token": refresh_token, + "resource_url": resource_url, + "qwen_resource_url": resource_url, + } + + a = _A() + a.email = account.email + a.access_token = oauth_access_token or account.token or "" + a.refresh_token = refresh_token + a.resource_url = resource_url + token_data = generate_token_json(a) + ok, msg = upload_to_cpa( + token_data, + api_url=params.get("api_url"), + api_key=params.get("api_key"), + ) + resp = {"ok": ok, "data": msg} + if account_extra_patch: + resp["account_extra_patch"] = account_extra_patch + return resp + raise NotImplementedError(f"Unknown action: {action_id}") diff --git a/services/external_sync.py b/services/external_sync.py index e9ce333..b5bec91 100644 --- a/services/external_sync.py +++ b/services/external_sync.py @@ -234,4 +234,65 @@ def sync_account(account) -> list[dict[str, Any]]: ok, msg = upload_to_kiro_manager(account, path=configured_path or None) results.append({"name": "Kiro Manager", "ok": ok, "msg": msg}) + elif platform == "qwen": + qwen_cpa_url = str(config_store.get("qwen_cpa_api_url", "") or "").strip() + if not qwen_cpa_url: + qwen_cpa_url = str(config_store.get("cpa_api_url", "") or "").strip() + qwen_cpa_enabled = _is_config_enabled( + config_store.get("qwen_cpa_enabled", ""), + default=bool(qwen_cpa_url), + ) + if qwen_cpa_enabled and qwen_cpa_url: + from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa + + class _QwenAccount: + pass + + qwen_account = _QwenAccount() + qwen_account.email = account.email + # 支持 AccountModel (extra_json) 和 Account (extra) 两种格式 + extra = {} + if hasattr(account, "extra"): + extra = account.extra or {} + elif hasattr(account, "get_extra"): + extra = account.get_extra() or {} + elif hasattr(account, "extra_json"): + import json + try: + extra = json.loads(account.extra_json or "{}") + except Exception: + extra = {} + qwen_account.access_token = _pick_text( + extra, + "oauth_access_token", + "qwen_oauth_access_token", + "access_token", + ) or account.token + qwen_account.refresh_token = _pick_text( + extra, + "refresh_token", + "qwen_refresh_token", + ) + qwen_account.resource_url = _pick_text( + extra, + "resource_url", + "qwen_resource_url", + default="portal.qwen.ai", + ) + + if not qwen_account.refresh_token: + msg = "Qwen 账号缺少 refresh_token,无法同步到 CPA" + results.append({"name": "Qwen CPA", "ok": False, "msg": msg}) + else: + token_data = generate_token_json(qwen_account) + qwen_cpa_key = str(config_store.get("qwen_cpa_api_key", "") or "").strip() + if not qwen_cpa_key: + qwen_cpa_key = str(config_store.get("cpa_api_key", "") or "").strip() + ok, msg = upload_to_cpa( + token_data, + api_url=qwen_cpa_url, + api_key=qwen_cpa_key or None, + ) + results.append({"name": "Qwen CPA", "ok": ok, "msg": msg}) + return results diff --git a/tests/test_qwen_registration.py b/tests/test_qwen_registration.py new file mode 100644 index 0000000..c4afeb6 --- /dev/null +++ b/tests/test_qwen_registration.py @@ -0,0 +1,473 @@ +import unittest +from unittest import mock + +from core.base_mailbox import MailboxAccount +from core.base_platform import Account, RegisterConfig +from platforms.qwen.core import QwenRegister, wait_for_activation_link +from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa +from platforms.qwen.plugin import QwenPlatform + + +class _DummyExecutor: + page = object() + + +class _DummyExecutorContext: + def __enter__(self): + return _DummyExecutor() + + def __exit__(self, exc_type, exc, tb): + return False + + +class _SequenceQwenRegister(QwenRegister): + def __init__(self, results, logs): + super().__init__(executor=_DummyExecutor(), log_fn=logs.append) + self._results = list(results) + self.calls = 0 + + def _try_register(self, page, email, password, full_name): + current = self._results[min(self.calls, len(self._results) - 1)] + self.calls += 1 + return dict(current) + + +class _FakeCFWorkerMailbox: + def __init__(self, mails): + self._mails = mails + + def _get_mails(self, _email: str): + return list(self._mails) + + +class _FakeCFWorkerMailboxWithDifferentCurrentEmail(_FakeCFWorkerMailbox): + def get_email(self): + return MailboxAccount(email="other@example.com", account_id="dummy") + + def get_current_ids(self, _account): + return set() + + +class _FakeHttpResp: + def __init__(self, status_code, payload=None, text=""): + self.status_code = status_code + self._payload = payload + self.text = text + + def json(self): + if self._payload is None: + raise ValueError("no json") + return self._payload + + +class QwenRegistrationTests(unittest.TestCase): + def test_get_platform_actions_contains_upload_cpa(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + actions = platform.get_platform_actions() + action_ids = [item.get("id") for item in actions] + self.assertIn("upload_cpa", action_ids) + + def test_register_stops_immediately_when_tokens_exist(self): + logs = [] + reg = _SequenceQwenRegister( + results=[ + { + "email": "demo@example.com", + "password": "Abc123!@#", + "full_name": "Demo", + "tokens": {"cookie:token": "tok_demo"}, + "status": "success", + } + ], + logs=logs, + ) + + result = reg.register(email="demo@example.com", password="Abc123!@#", full_name="Demo") + + self.assertEqual(reg.calls, 1) + self.assertEqual(result.get("status"), "success") + self.assertEqual(result.get("tokens", {}).get("cookie:token"), "tok_demo") + self.assertTrue(any("first-attempt token hit" in msg for msg in logs)) + + def test_register_returns_failed_after_retry_exhausted(self): + logs = [] + reg = _SequenceQwenRegister( + results=[ + {"status": "failed", "tokens": {}, "error": "attempt-1"}, + {"status": "failed", "tokens": {}, "error": "attempt-2"}, + {"status": "failed", "tokens": {}, "error": "attempt-3"}, + ], + logs=logs, + ) + + with mock.patch("platforms.qwen.core.time.sleep", return_value=None): + result = reg.register(email="demo@example.com", password="Abc123!@#", full_name="Demo") + + self.assertEqual(reg.calls, 3) + self.assertEqual(result.get("status"), "failed") + self.assertEqual(result.get("error"), "attempt-3") + self.assertTrue(any("final failure reason(no token): attempt-3" in msg for msg in logs)) + + def test_plugin_success_requires_non_empty_token(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + fake_result = { + "status": "success", + "email": "demo@example.com", + "password": "Abc123!@#", + "tokens": {"cookie:token": "tok_ok"}, + } + + with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()): + with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result): + account = platform.register(email="demo@example.com", password="Abc123!@#") + + self.assertEqual(account.email, "demo@example.com") + self.assertEqual(account.token, "tok_ok") + + def test_plugin_raises_when_registration_status_failed(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + fake_result = { + "status": "failed", + "email": "demo@example.com", + "password": "Abc123!@#", + "tokens": {}, + "error": "no token", + } + + with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()): + with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result): + with self.assertRaises(RuntimeError): + platform.register(email="demo@example.com", password="Abc123!@#") + + def test_plugin_raises_when_success_but_token_missing(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + fake_result = { + "status": "success", + "email": "demo@example.com", + "password": "Abc123!@#", + "tokens": {}, + } + + with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()): + with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result): + with self.assertRaises(RuntimeError): + platform.register(email="demo@example.com", password="Abc123!@#") + + def test_plugin_register_extracts_oauth_fields_from_raw_tokens(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + fake_result = { + "status": "success", + "email": "demo@example.com", + "password": "Abc123!@#", + "tokens": { + "cookie:token": "tok_ok", + "oauth_payload": ( + '{"oauth_access_token":"oa_demo",' + '"refreshToken":"rt_demo","resource_url":"portal.qwen.ai"}' + ), + }, + } + + with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()): + with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result): + account = platform.register(email="demo@example.com", password="Abc123!@#") + + self.assertEqual(account.token, "tok_ok") + self.assertEqual((account.extra or {}).get("oauth_access_token"), "oa_demo") + self.assertEqual((account.extra or {}).get("refresh_token"), "rt_demo") + self.assertEqual((account.extra or {}).get("resource_url"), "portal.qwen.ai") + + def test_activate_action_can_bootstrap_mailbox_and_activate(self): + platform = QwenPlatform( + config=RegisterConfig(extra={"mail_provider": "cfworker"}), + mailbox=None, + ) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token="", + ) + fake_mailbox = _FakeCFWorkerMailbox( + mails=[ + { + "id": 1, + "subject": "Activate your Qwen account", + "raw": ( + "Click to activate: " + "https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def" + ), + } + ] + ) + + with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox): + with mock.patch("platforms.qwen.core.call_activation_api", return_value={"ok": True}): + with mock.patch("platforms.qwen.core.time.sleep", return_value=None): + result = platform.execute_action("activate_account", account, {}) + + self.assertTrue(result.get("ok")) + + def test_activate_action_reports_timeout_with_default_wait_seconds(self): + platform = QwenPlatform( + config=RegisterConfig( + extra={ + "mail_provider": "cfworker", + "mailbox_otp_timeout_seconds": 30, + } + ), + mailbox=None, + ) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token="", + ) + fake_mailbox = _FakeCFWorkerMailbox(mails=[]) + + with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox): + with mock.patch("platforms.qwen.core.wait_for_activation_link", return_value=None): + result = platform.execute_action("activate_account", account, {}) + + self.assertFalse(result.get("ok")) + self.assertIn("在 30s 内未找到激活邮件", str(result.get("error"))) + + def test_activate_action_ignores_current_mailbox_email_for_cfworker_lookup(self): + platform = QwenPlatform( + config=RegisterConfig(extra={"mail_provider": "cfworker"}), + mailbox=None, + ) + account = Account( + platform="qwen", + email="target@example.com", + password="Abc123!@#", + token="", + ) + fake_mailbox = _FakeCFWorkerMailboxWithDifferentCurrentEmail( + mails=[ + { + "id": 1, + "subject": "Activate", + "raw": "https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def", + } + ] + ) + + with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox): + with mock.patch("platforms.qwen.core.call_activation_api", return_value={"ok": True}): + with mock.patch("platforms.qwen.core.time.sleep", return_value=None): + result = platform.execute_action("activate_account", account, {}) + + self.assertTrue(result.get("ok")) + + def test_wait_for_activation_link_can_decode_base64_html_raw_mail(self): + import base64 + + html = ( + '' + 'Activate' + "" + ) + b64 = base64.b64encode(html.encode("utf-8")).decode("ascii") + raw = ( + "MIME-Version: 1.0\r\n" + "Content-Type: text/html; charset=utf-8\r\n" + "Content-Transfer-Encoding: base64\r\n" + "\r\n" + f"{b64}\r\n" + ) + mailbox = _FakeCFWorkerMailbox( + mails=[{"id": 1, "subject": "Activate", "raw": raw}] + ) + + with mock.patch("platforms.qwen.core.time.sleep", return_value=None): + link = wait_for_activation_link( + mailbox, + account_email="target@example.com", + timeout=5, + ) + + self.assertEqual( + link, + "https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def", + ) + + def test_get_user_info_fallbacks_to_chats_endpoint(self): + # Header: {"alg":"HS256","typ":"JWT"} + # Payload: {"id":"u1","exp":1778728455} + token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6InUxIiwiZXhwIjoxNzc4NzI4NDU1fQ.sig" + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token=token, + ) + + responses = [ + _FakeHttpResp(404, {"detail": "Not Found"}), + _FakeHttpResp(403, {"detail": "restricted"}), + _FakeHttpResp(200, []), + ] + + with mock.patch("curl_cffi.requests.get", side_effect=responses): + result = platform.execute_action("get_user_info", account, {}) + + self.assertTrue(result.get("ok")) + data = result.get("data", {}) + self.assertEqual(data.get("来源"), "会话列表接口") + self.assertEqual(data.get("会话数量"), 0) + self.assertEqual(data.get("用户ID"), "u1") + + def test_upload_cpa_action_uses_qwen_uploader(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token="qwen_token_abc", + extra={ + "refresh_token": "qwen_refresh_token_xyz", + "resource_url": "portal.qwen.ai", + }, + ) + + with mock.patch( + "platforms.qwen.cpa_upload.generate_token_json", + return_value={"email": "demo@example.com", "access_token": "qwen_token_abc"}, + ) as build_mock: + with mock.patch( + "platforms.qwen.cpa_upload.upload_to_cpa", + return_value=(True, "上传成功"), + ) as upload_mock: + result = platform.execute_action( + "upload_cpa", + account, + {"api_url": "http://cpa.local", "api_key": "k"}, + ) + + self.assertTrue(result.get("ok")) + self.assertEqual(result.get("data"), "上传成功") + build_arg = build_mock.call_args.args[0] + self.assertEqual(getattr(build_arg, "email", ""), "demo@example.com") + self.assertEqual(getattr(build_arg, "access_token", ""), "qwen_token_abc") + self.assertEqual(getattr(build_arg, "refresh_token", ""), "qwen_refresh_token_xyz") + self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai") + upload_mock.assert_called_once_with( + {"email": "demo@example.com", "access_token": "qwen_token_abc"}, + api_url="http://cpa.local", + api_key="k", + ) + + def test_upload_cpa_action_reads_refresh_from_raw_tokens(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token="qwen_token_abc", + extra={ + "raw_tokens": { + "oauth_payload": '{"refreshToken":"rt_raw","resource_url":"portal.qwen.ai"}', + } + }, + ) + + with mock.patch( + "platforms.qwen.cpa_upload.generate_token_json", + return_value={"email": "demo@example.com", "access_token": "qwen_token_abc"}, + ) as build_mock: + with mock.patch( + "platforms.qwen.cpa_upload.upload_to_cpa", + return_value=(True, "上传成功"), + ): + result = platform.execute_action( + "upload_cpa", + account, + {"api_url": "http://cpa.local", "api_key": "k"}, + ) + + self.assertTrue(result.get("ok")) + build_arg = build_mock.call_args.args[0] + self.assertEqual(getattr(build_arg, "refresh_token", ""), "rt_raw") + self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai") + + def test_upload_cpa_action_bootstraps_oauth_when_refresh_missing(self): + platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None) + account = Account( + platform="qwen", + email="demo@example.com", + password="Abc123!@#", + token="web_token_only", + extra={}, + ) + + with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()): + with mock.patch( + "platforms.qwen.core.obtain_qwen_oauth_tokens_with_login", + return_value={ + "oauth_access_token": "oauth_access_123", + "refresh_token": "oauth_refresh_456", + "resource_url": "portal.qwen.ai", + }, + ): + with mock.patch( + "platforms.qwen.cpa_upload.generate_token_json", + return_value={"email": "demo@example.com", "access_token": "oauth_access_123"}, + ) as build_mock: + with mock.patch( + "platforms.qwen.cpa_upload.upload_to_cpa", + return_value=(True, "上传成功"), + ): + result = platform.execute_action( + "upload_cpa", + account, + {"api_url": "http://cpa.local", "api_key": "k"}, + ) + + self.assertTrue(result.get("ok")) + build_arg = build_mock.call_args.args[0] + self.assertEqual(getattr(build_arg, "access_token", ""), "oauth_access_123") + self.assertEqual(getattr(build_arg, "refresh_token", ""), "oauth_refresh_456") + self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai") + self.assertEqual( + (result.get("account_extra_patch") or {}).get("refresh_token"), + "oauth_refresh_456", + ) + + def test_qwen_cpa_upload_requires_refresh_token(self): + ok, msg = upload_to_cpa( + { + "type": "qwen", + "email": "demo@example.com", + "access_token": "token_only", + "refresh_token": "", + }, + api_url="http://cpa.local", + api_key="k", + ) + self.assertFalse(ok) + self.assertIn("refresh_token", msg) + + def test_qwen_cpa_generate_token_json_contains_oauth_fields(self): + class _A: + pass + + a = _A() + a.email = "demo@example.com" + a.access_token = "token" + a.refresh_token = "rt_demo" + a.resource_url = "portal.qwen.ai" + + token_json = generate_token_json(a) + self.assertEqual(token_json.get("type"), "qwen") + self.assertEqual(token_json.get("provider"), "qwen") + self.assertEqual(token_json.get("email"), "demo@example.com") + self.assertEqual(token_json.get("access_token"), "token") + self.assertEqual(token_json.get("refresh_token"), "rt_demo") + self.assertEqual(token_json.get("resource_url"), "portal.qwen.ai") + + +if __name__ == "__main__": + unittest.main()