更新qwen注册机

This commit is contained in:
zhangchen
2026-04-14 14:40:49 +08:00
parent 3e06448cc1
commit e702b38ecb
13 changed files with 1645 additions and 107 deletions

View File

@@ -75,7 +75,7 @@ def _apply_action_result(
from datetime import datetime, timezone
acc_model.updated_at = datetime.now(timezone.utc)
session.add(acc_model)
if platform == "chatgpt" and action_id == "upload_cpa":
if action_id == "upload_cpa":
from services.chatgpt_sync import update_account_model_cpa_sync
sync_msg = result.get("data") or result.get("error") or ""

View File

@@ -98,6 +98,9 @@ CONFIG_KEYS = [
"grok2api_quota",
"kiro_manager_path",
"kiro_manager_exe",
"qwen_cpa_enabled",
"qwen_cpa_api_url",
"qwen_cpa_api_key",
"external_apps_update_mode",
"contribution_enabled",
"contribution_server_url",

View File

@@ -49,11 +49,20 @@ class BasePlatform(ABC):
def __init__(self, config: RegisterConfig = None):
self.config = config or RegisterConfig()
self._task_control = None
if self.config.executor_type not in self.supported_executors:
raise NotImplementedError(
f"{self.display_name} 暂不支持 '{self.config.executor_type}' 执行器,"
f"当前支持: {self.supported_executors}"
requested_executor = str(self.config.executor_type or "").strip() or "protocol"
if requested_executor not in self.supported_executors:
fallback = (
"protocol"
if "protocol" in self.supported_executors
else (self.supported_executors[0] if self.supported_executors else "protocol")
)
print(
f"[{self.display_name or self.name}] 执行器 '{requested_executor}' 不受支持,"
f"自动切换为 '{fallback}' (支持: {self.supported_executors})"
)
self.config.executor_type = fallback
else:
self.config.executor_type = requested_executor
@abstractmethod
def register(self, email: str, password: str = None) -> Account:

View File

@@ -52,6 +52,16 @@ class PlaywrightExecutor(BaseExecutor):
raise RuntimeError("Playwright context 未初始化")
return self._context
@property
def page(self) -> Any:
"""兼容平台插件直接访问 executor.page 的用法。"""
return self._require_page()
@property
def context(self) -> Any:
"""兼容平台插件直接访问 executor.context 的用法。"""
return self._require_context()
def get(self, url, *, headers=None, params=None) -> Response:
import urllib.parse

View File

@@ -11,6 +11,7 @@ const PLATFORM_EXECUTORS: Record<string, string[]> = {
kiro: ['protocol', 'headless', 'headed'],
tavily: ['protocol', 'headless', 'headed'],
trae: ['protocol', 'headless', 'headed'],
qwen: ['headless', 'headed'],
openblocklabs: ['protocol'],
}

View File

@@ -368,6 +368,7 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
const [resultUrl, setResultUrl] = useState('')
const [resultProbe, setResultProbe] = useState<any>(null)
const [resultCliproxySync, setResultCliproxySync] = useState<any>(null)
const [runningActionId, setRunningActionId] = useState<string | null>(null)
const showResult = (title: string, status: 'success' | 'error', text: string, url = '', probe: any = null, cliproxySync: any = null) => {
setResultTitle(title)
@@ -390,7 +391,11 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
}
const handleAction = async (actionId: string) => {
if (runningActionId) return
const actionLabel = actions.find((item) => item.id === actionId)?.label || actionId
const toastKey = `account-action:${acc?.id}:${actionId}`
setRunningActionId(actionId)
message.loading({ content: `${actionLabel}运行中...`, key: toastKey, duration: 0 })
try {
const r = await apiFetch(`/actions/${acc.platform}/${acc.id}/${actionId}`, {
@@ -401,6 +406,7 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
const data = r.data || {}
const probe = typeof data === 'object' && data ? data.probe || null : null
const cliproxySync = typeof data === 'object' && data ? data.sync || null : null
message.error({ content: `${actionLabel}失败`, key: toastKey })
showResult(actionLabel, 'error', r.error || data.message || '操作失败', '', probe, cliproxySync)
onRefresh()
return
@@ -408,10 +414,10 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
const data = r.data || {}
if (data.url || data.checkout_url || data.cashier_url) {
const targetUrl = data.url || data.checkout_url || data.cashier_url
message.success('链接已生成')
message.success({ content: `${actionLabel}完成`, key: toastKey })
showResult(actionLabel, 'success', '操作成功,请在弹窗中打开或复制链接。', targetUrl)
} else {
message.success(data.message || '操作成功')
message.success({ content: data.message || `${actionLabel}完成`, key: toastKey })
const probe = typeof data === 'object' && data ? data.probe || null : null
const cliproxySync = typeof data === 'object' && data ? data.sync || null : null
const text =
@@ -429,14 +435,17 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
onRefresh()
} catch (e: any) {
const detail = e?.message ? String(e.message) : '请求失败'
message.error(detail)
message.error({ content: detail, key: toastKey })
showResult(actionLabel, 'error', detail)
} finally {
setRunningActionId(null)
}
}
const menuItems: MenuProps['items'] = actions.map((a) => ({
key: a.id,
label: a.label,
label: runningActionId === a.id ? `${a.label}(运行中)` : a.label,
disabled: Boolean(runningActionId),
}))
if (actions.length === 0) return null
@@ -449,7 +458,12 @@ function ActionMenu({ acc, onRefresh, actions }: { acc: any; onRefresh: () => vo
onClick: ({ key }) => handleAction(String(key)),
}}
>
<Button type="link" size="small" icon={<MoreOutlined />} />
<Button
type="link"
size="small"
icon={<MoreOutlined />}
loading={Boolean(runningActionId)}
/>
</Dropdown>
<Modal
title={resultTitle}
@@ -549,6 +563,7 @@ export default function Accounts() {
const [taskId, setTaskId] = useState<string | null>(null)
const [registerLoading, setRegisterLoading] = useState(false)
const [cpaSyncLoading, setCpaSyncLoading] = useState<'pending' | 'selected' | ''>('')
const [cpaUploadLoading, setCpaUploadLoading] = useState<'all' | 'selected' | ''>('')
const [statusSyncLoading, setStatusSyncLoading] = useState<'probe_selected' | 'probe_all' | 'remote_selected' | 'remote_all' | ''>('')
useEffect(() => {
@@ -1009,17 +1024,77 @@ export default function Accounts() {
}
}
const handleBatchUploadCpa = async (scope: 'selected' | 'all') => {
const toastKey = `batch-upload-cpa:${scope}`
const scopeLabel = scope === 'selected' ? '所选账号' : '当前筛选账号'
const body: Record<string, unknown> = {
params: {},
}
if (scope === 'selected') {
const accountIds = Array.from(selectedRowKeys)
.map((value) => Number(value))
.filter((value) => Number.isInteger(value) && value > 0)
if (accountIds.length === 0) {
message.warning('请先选择要导入 CPA 的账号')
return
}
body.account_ids = accountIds
} else {
body.all_filtered = true
if (search) body.email = search
if (filterStatus) body.status = filterStatus
}
setCpaUploadLoading(scope)
message.loading({ content: `${scopeLabel}导入 CPA 进行中...`, key: toastKey, duration: 0 })
try {
const result = await apiFetch(`/actions/${currentPlatform}/upload_cpa/batch`, {
method: 'POST',
body: JSON.stringify(body),
})
if (!result.total) {
message.info({ content: '没有可处理的账号', key: toastKey })
} else if (!result.failed) {
message.success({ content: `${scopeLabel}导入 CPA 完成:成功 ${result.success} / ${result.total}`, key: toastKey })
} else if (!result.success) {
message.error({ content: `${scopeLabel}导入 CPA 失败:成功 ${result.success} / ${result.total}`, key: toastKey })
} else {
message.warning({ content: `${scopeLabel}导入 CPA 部分完成:成功 ${result.success} / ${result.total}`, key: toastKey })
}
showBatchActionResult(`${scopeLabel}导入 CPA 结果`, result)
await load()
} catch (e: any) {
message.error({ content: `导入 CPA 失败: ${e.message}`, key: toastKey })
} finally {
setCpaUploadLoading('')
}
}
const getStatusSyncScope = (): 'selected' | 'all' => (selectedRowKeys.length > 0 ? 'selected' : 'all')
const getBackfillScope = (): 'selected' | 'pending' => (selectedRowKeys.length > 0 ? 'selected' : 'pending')
const getUploadCpaScope = (): 'selected' | 'all' => (selectedRowKeys.length > 0 ? 'selected' : 'all')
const backfillButtonLabel = () => {
const scope = getBackfillScope()
const count = scope === 'selected' ? selectedRowKeys.length : total
return scope === 'selected' ? `补传所选远端未发现 (${count})` : `补传远端未发现 (${count})`
}
const uploadCpaButtonLabel = () => {
const scope = getUploadCpaScope()
const count = scope === 'selected' ? selectedRowKeys.length : total
return scope === 'selected' ? `导入所选 CPA (${count})` : `导入筛选 CPA (${count})`
}
const isChatgptPlatform = currentPlatform === 'chatgpt'
const hasUploadCpaAction = platformActions.some((item) => item?.id === 'upload_cpa')
const monospaceStyle: React.CSSProperties = {
fontFamily: 'SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace',
fontSize: 12,
@@ -1167,6 +1242,22 @@ export default function Accounts() {
},
)
} else {
if (hasUploadCpaAction) {
columns.push({
title: 'CPA',
key: 'cpa_sync',
width: 120,
render: (_: any, record: any) => {
const cpaMeta = uploadSyncMeta(record.cpaSync || {})
return (
<Tag color={cpaMeta.color} title={uploadSyncTitle('CPA', record.cpaSync || {})}>
{cpaMeta.label}
</Tag>
)
},
})
}
columns.push(
{
title: '地区',
@@ -1336,6 +1427,26 @@ export default function Accounts() {
</Button>
</Popconfirm>
)}
{currentPlatform !== 'chatgpt' && hasUploadCpaAction && (
<Popconfirm
title={
getUploadCpaScope() === 'selected'
? `确认导入所选 ${selectedRowKeys.length} 个账号到 CPA`
: `确认导入当前筛选范围内 ${total} 个账号到 CPA`
}
onConfirm={() => handleBatchUploadCpa(getUploadCpaScope())}
okText="确认"
cancelText="取消"
>
<Button
loading={cpaUploadLoading === 'selected' || cpaUploadLoading === 'all'}
icon={<UploadOutlined />}
disabled={getUploadCpaScope() === 'selected' ? selectedRowKeys.length === 0 : total === 0}
>
{uploadCpaButtonLabel()}
</Button>
</Popconfirm>
)}
{selectedRowKeys.length > 0 && (
<Popconfirm
title={`确认删除选中的 ${selectedRowKeys.length} 个账号?`}
@@ -1418,6 +1529,8 @@ export default function Accounts() {
open={addModalOpen}
onCancel={() => { setAddModalOpen(false); addForm.resetFields(); }}
onOk={handleAdd}
okText="确定"
cancelText="取消"
maskClosable={false}
>
<Form form={addForm} layout="vertical">
@@ -1450,6 +1563,8 @@ export default function Accounts() {
open={importModalOpen}
onCancel={() => { setImportModalOpen(false); setImportText(''); }}
onOk={handleImport}
okText="确定"
cancelText="取消"
confirmLoading={importLoading}
maskClosable={false}
>
@@ -1469,6 +1584,8 @@ export default function Accounts() {
open={detailModalOpen}
onCancel={() => setDetailModalOpen(false)}
onOk={handleDetailSave}
okText="保存"
cancelText="取消"
maskClosable={false}
width={760}
styles={{ body: { maxHeight: '72vh', overflowY: 'auto' } }}

View File

@@ -268,6 +268,7 @@ export default function RegisterTaskPage() {
{ value: 'grok', label: 'Grok' },
{ value: 'tavily', label: 'Tavily' },
{ value: 'openblocklabs', label: 'OpenBlockLabs' },
{ value: 'qwen', label: 'Qwen' },
]}
/>
</Form.Item>

View File

@@ -376,6 +376,22 @@ const TAB_ITEMS = [
},
],
},
{
key: 'qwen',
label: 'Qwen',
icon: <ApiOutlined />,
sections: [
{
title: 'CPA 面板',
desc: '注册完成后自动上传到 CPA 管理平台',
fields: [
{ key: 'qwen_cpa_enabled', label: '启用自动上传', type: 'boolean' },
{ key: 'qwen_cpa_api_url', label: 'API URL', placeholder: 'https://your-cpa.example.com留空则使用 ChatGPT CPA 地址)' },
{ key: 'qwen_cpa_api_key', label: 'API Key', secret: true, placeholder: '留空则使用 ChatGPT CPA Key' },
],
},
],
},
{
key: 'contribution',
label: '贡献',
@@ -571,7 +587,7 @@ function ConfigField({ field }: { field: FieldConfig }) {
const isBooleanField = field.type === 'boolean'
const helpText =
field.key === 'default_executor'
? '仅对支持的平台生效ChatGPT、Cursor、Grok、Kiro、Tavily、Trae 支持浏览器模式OpenBlockLabs 仅支持纯协议。'
? '仅对支持的平台生效ChatGPT、Cursor、Grok、Kiro、Tavily、Trae、Qwen 支持浏览器模式OpenBlockLabs 仅支持纯协议。'
: field.key === 'email_domain_rule_enabled'
? '仅 CF Worker 生效:开启后会校验域名级数,以及域名至少包含 2 个字母和 2 个数字。'
: field.key === 'email_domain_level_count'
@@ -928,6 +944,8 @@ function IntegrationsPanel() {
title={resultModal.title}
onCancel={() => setResultModal((v) => ({ ...v, open: false }))}
onOk={() => setResultModal((v) => ({ ...v, open: false }))}
okText="确定"
cancelText="取消"
width={760}
>
<Typography.Paragraph style={{ marginBottom: 8, color: resultModal.ok ? '#10b981' : '#ef4444' }}>
@@ -1816,6 +1834,10 @@ export default function Settings() {
data.sub2api_enabled,
Boolean(String(data.sub2api_api_url ?? '').trim() && String(data.sub2api_api_key ?? '').trim()),
)
data.qwen_cpa_enabled = resolveFeatureEnabledConfig(
data.qwen_cpa_enabled,
false, // 默认关闭,不依赖 cpa_api_url
)
data.cfworker_domains = parseStoredDomainList(data.cfworker_domains)
data.cfworker_enabled_domains = parseStoredDomainList(data.cfworker_enabled_domains)
data.cfworker_random_subdomain = parseBooleanConfigValue(data.cfworker_random_subdomain)
@@ -1886,6 +1908,7 @@ export default function Settings() {
}
values.cpa_enabled = parseBooleanConfigValue(values.cpa_enabled)
values.sub2api_enabled = parseBooleanConfigValue(values.sub2api_enabled)
values.qwen_cpa_enabled = parseBooleanConfigValue(values.qwen_cpa_enabled)
values.cfworker_random_subdomain = parseBooleanConfigValue(values.cfworker_random_subdomain)
values.cfworker_random_name_subdomain = parseBooleanConfigValue(values.cfworker_random_name_subdomain)
values.contribution_enabled = parseBooleanConfigValue(values.contribution_enabled)
@@ -1909,6 +1932,7 @@ export default function Settings() {
mail_import_source: values.mail_provider === 'applemail' ? 'applemail' : 'microsoft',
cpa_enabled: values.cpa_enabled,
sub2api_enabled: values.sub2api_enabled,
qwen_cpa_enabled: values.qwen_cpa_enabled,
cfworker_domains: domains,
cfworker_enabled_domains: enabledDomains,
cfworker_domain: domains.length > 0 ? '' : values.cfworker_domain,

View File

@@ -16,8 +16,11 @@ deferred and can be done by visiting the activation URL from email.
"""
import json
import base64
import hashlib
import random
import re
import secrets
import string
import time
from typing import Callable, Optional
@@ -28,6 +31,10 @@ from playwright.sync_api import Page
QWEN_AUTH_URL = "https://chat.qwen.ai/auth"
QWEN_ACTIVATE_URL = "https://chat.qwen.ai/api/v1/auths/activate"
QWEN_OAUTH_DEVICE_CODE_URL = "https://chat.qwen.ai/api/v1/oauth2/device/code"
QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_SCOPE = "openid profile email model.completion"
UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
@@ -93,24 +100,385 @@ def call_activation_api(activation_url: str, user_agent: str = UA) -> dict:
return {"ok": False, "error": str(e)}
def _b64url_no_padding(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _generate_qwen_pkce_pair() -> tuple[str, str]:
verifier = _b64url_no_padding(secrets.token_bytes(32))
challenge = _b64url_no_padding(hashlib.sha256(verifier.encode("utf-8")).digest())
return verifier, challenge
def _device_flow_request(code_challenge: str, user_agent: str = UA) -> dict:
resp = requests.post(
QWEN_OAUTH_DEVICE_CODE_URL,
data={
"client_id": QWEN_OAUTH_CLIENT_ID,
"scope": QWEN_OAUTH_SCOPE,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
},
headers={
"User-Agent": user_agent,
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=20,
)
if resp.status_code != 200:
raise RuntimeError(f"Qwen OAuth device code failed: HTTP {resp.status_code} {resp.text[:180]}")
data = resp.json() if resp.text else {}
if not isinstance(data, dict):
raise RuntimeError("Qwen OAuth device code response invalid")
if not data.get("device_code") or not data.get("verification_uri_complete"):
raise RuntimeError(f"Qwen OAuth device code missing fields: {data}")
return data
def _poll_device_flow_token(
*,
device_code: str,
code_verifier: str,
timeout_seconds: int = 30,
user_agent: str = UA,
) -> dict:
deadline = time.time() + max(6, int(timeout_seconds or 30))
last_err = ""
while time.time() < deadline:
resp = requests.post(
QWEN_OAUTH_TOKEN_URL,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": QWEN_OAUTH_CLIENT_ID,
"device_code": device_code,
"code_verifier": code_verifier,
},
headers={
"User-Agent": user_agent,
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=20,
)
text = resp.text or ""
parsed = {}
if text:
try:
parsed = resp.json()
except Exception:
parsed = {}
if resp.status_code == 200 and isinstance(parsed, dict):
access_token = str(parsed.get("access_token") or "").strip()
refresh_token = str(parsed.get("refresh_token") or "").strip()
if access_token and refresh_token:
return parsed
last_err = f"missing access/refresh token: {parsed}"
break
if resp.status_code == 400 and isinstance(parsed, dict):
err = str(parsed.get("error") or "")
if err in {"authorization_pending", "slow_down"}:
time.sleep(2 if err == "authorization_pending" else 3)
continue
last_err = f"{err}: {parsed.get('error_description') or ''}".strip(": ")
break
last_err = f"HTTP {resp.status_code}: {text[:180]}"
time.sleep(2)
raise RuntimeError(f"Qwen OAuth token polling failed: {last_err or 'timeout'}")
def _click_first_confirm_button(page: Page) -> bool:
name_candidates = [
"确认",
"Confirm",
"Authorize",
"授权",
"同意",
"Allow",
"Approve",
]
for name in name_candidates:
try:
locator = page.get_by_role("button", name=name, exact=True)
if locator.count() > 0 and locator.first.is_visible():
locator.first.click()
return True
except Exception:
continue
selector_candidates = [
'button:has-text("确认")',
'button:has-text("Authorize")',
'button:has-text("授权")',
'button:has-text("同意")',
'button:has-text("Allow")',
]
for selector in selector_candidates:
try:
locator = page.locator(selector).first
if locator.count() > 0 and locator.is_visible():
locator.click()
return True
except Exception:
continue
try:
fallback = page.locator("button").first
if fallback.count() > 0 and fallback.is_visible():
fallback.click()
return True
except Exception:
pass
return False
def _looks_like_oauth_success(page: Page) -> bool:
try:
body_text = page.inner_text("body")
except Exception:
body_text = ""
body_text = str(body_text or "")
return ("认证成功" in body_text) or ("请转到命令行界面" in body_text)
def obtain_qwen_oauth_tokens_from_logged_in_page(
page: Page,
*,
log_fn: Callable | None = None,
poll_timeout_seconds: int = 30,
) -> dict:
log = log_fn or (lambda *_args, **_kwargs: None)
try:
code_verifier, code_challenge = _generate_qwen_pkce_pair()
flow = _device_flow_request(code_challenge=code_challenge)
verification_url = str(flow.get("verification_uri_complete") or "").strip()
device_code = str(flow.get("device_code") or "").strip()
if not verification_url or not device_code:
raise RuntimeError(f"Qwen OAuth device flow missing fields: {flow}")
page.goto(verification_url, wait_until="domcontentloaded", timeout=30000)
for _ in range(12):
if _looks_like_oauth_success(page):
break
try:
if page.locator("button").count() > 0:
break
except Exception:
pass
page.wait_for_timeout(800)
if not _looks_like_oauth_success(page):
clicked = _click_first_confirm_button(page)
if not clicked:
body_preview = ""
try:
body_preview = (page.inner_text("body") or "")[:200].replace("\n", "|")
except Exception:
body_preview = ""
raise RuntimeError(
f"OAuth 授权页未找到可点击的确认按钮; url={page.url}; body={body_preview}"
)
page.wait_for_timeout(1500)
token_payload = _poll_device_flow_token(
device_code=device_code,
code_verifier=code_verifier,
timeout_seconds=poll_timeout_seconds,
)
access_token = str(token_payload.get("access_token") or "").strip()
refresh_token = str(token_payload.get("refresh_token") or "").strip()
resource_url = str(token_payload.get("resource_url") or "").strip() or "portal.qwen.ai"
if not access_token or not refresh_token:
raise RuntimeError("Qwen OAuth token payload missing access/refresh token")
return {
"oauth_access_token": access_token,
"refresh_token": refresh_token,
"resource_url": resource_url,
"oauth_token_type": str(token_payload.get("token_type") or "").strip(),
"oauth_scope": str(token_payload.get("scope") or "").strip(),
"oauth_expires_in": int(token_payload.get("expires_in") or 0),
}
except Exception as e:
log(f"Qwen OAuth device-flow failed: {e}")
return {}
def login_qwen_with_password(page: Page, email: str, password: str, *, log_fn: Callable | None = None) -> bool:
log = log_fn or (lambda *_args, **_kwargs: None)
if not email or not password:
return False
try:
page.goto(f"{QWEN_AUTH_URL}?mode=login", wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(800)
page.locator('input[name="email"]').fill(email)
page.locator('input[name="password"]').fill(password)
page.get_by_role("button", name="登录", exact=True).click()
page.wait_for_timeout(3500)
url = str(page.url or "")
if "chat.qwen.ai" not in url:
return False
if "/auth" in url and "mode=login" in url:
return False
body_text = ""
try:
body_text = page.inner_text("body")
except Exception:
body_text = ""
if ("输入您的电子邮箱" in body_text) or ("继续使用 Google 登录" in body_text):
return False
try:
cookies = page.context.cookies("https://chat.qwen.ai")
has_token_cookie = any(
str(item.get("name") or "") == "token" and str(item.get("value") or "").strip()
for item in (cookies or [])
)
if not has_token_cookie:
return False
except Exception:
return False
return True
except Exception as e:
log(f"Qwen login failed before OAuth flow: {e}")
return False
def obtain_qwen_oauth_tokens_with_login(
page: Page,
*,
email: str,
password: str,
log_fn: Callable | None = None,
poll_timeout_seconds: int = 30,
) -> dict:
if not login_qwen_with_password(page, email, password, log_fn=log_fn):
return {}
return obtain_qwen_oauth_tokens_from_logged_in_page(
page,
log_fn=log_fn,
poll_timeout_seconds=poll_timeout_seconds,
)
def wait_for_activation_link(
mailbox,
mail_acct,
mail_acct=None,
*,
account_email: str = "",
timeout: int = 120,
before_ids: set = None,
log_fn: Callable | None = None,
max_errors: int = 3,
) -> str | None:
"""Poll mailbox for Qwen activation email and extract link."""
"""Poll mailbox for Qwen activation email and extract link.
Supports:
- CFWorker-style mailbox exposing `_get_mails(email)`
- legacy mailbox exposing `get_messages/get_message_body`
"""
log = log_fn or (lambda *_args, **_kwargs: None)
seen = set(before_ids or [])
def _decode_mime_raw(raw: str) -> str:
raw = str(raw or "")
if not raw:
return ""
try:
import email
msg = email.message_from_string(raw)
chunks: list[str] = []
if msg.is_multipart():
for part in msg.walk():
payload = part.get_payload(decode=True)
if payload is None:
continue
charset = part.get_content_charset() or "utf-8"
try:
text = payload.decode(charset, errors="ignore")
except Exception:
text = payload.decode("utf-8", errors="ignore")
if text:
chunks.append(text)
else:
payload = msg.get_payload(decode=True)
if payload is not None:
charset = msg.get_content_charset() or "utf-8"
try:
text = payload.decode(charset, errors="ignore")
except Exception:
text = payload.decode("utf-8", errors="ignore")
if text:
chunks.append(text)
return "\n".join(chunks).strip()
except Exception:
return ""
def _collect_mail_text(item: dict) -> str:
if not isinstance(item, dict):
return str(item or "")
raw = str(item.get("raw") or "")
decoded_raw = _decode_mime_raw(raw)
return " ".join(
[
str(item.get("subject") or ""),
raw,
decoded_raw,
str(item.get("text") or ""),
str(item.get("content") or ""),
str(item.get("html") or ""),
str(item.get("body") or ""),
]
).strip()
start = time.time()
error_count = 0
while time.time() - start < timeout:
time.sleep(5)
try:
messages = mailbox.get_messages(mail_acct, before_ids=before_ids)
for msg in messages:
# CFWorker mailbox path
if account_email and hasattr(mailbox, "_get_mails"):
messages = mailbox._get_mails(account_email) or []
for msg in messages:
mid = str((msg or {}).get("id") or "")
if mid and mid in seen:
continue
if mid:
seen.add(mid)
body = _collect_mail_text(msg)
link = extract_activation_link(body)
if link:
return link
continue
# Legacy custom mailbox path
if not (
mail_acct
and hasattr(mailbox, "get_messages")
and hasattr(mailbox, "get_message_body")
):
log("Activation link polling aborted: mailbox does not expose readable message APIs")
break
messages = mailbox.get_messages(mail_acct, before_ids=seen)
for msg in messages or []:
mid = str((msg or {}).get("id") or "")
if mid and mid in seen:
continue
if mid:
seen.add(mid)
body = mailbox.get_message_body(mail_acct, msg.get("id")) or ""
link = extract_activation_link(body)
if link:
return link
except Exception:
error_count = 0
except Exception as e:
error_count += 1
log(f"Activation polling error {error_count}/{max_errors}: {e}")
if error_count >= max(1, int(max_errors or 1)):
break
continue
return None
@@ -124,6 +492,17 @@ class QwenRegister:
self.log = log_fn
self._max_retries = 2
@staticmethod
def _resolve_access_token(tokens: dict | None) -> str:
if not isinstance(tokens, dict):
return ""
return str(
tokens.get("token")
or tokens.get("cookie:token")
or tokens.get("access_token")
or ""
).strip()
def register(
self,
email: str,
@@ -147,21 +526,38 @@ class QwenRegister:
"Please select 'headless' or 'headed' executor."
)
last_reason = "no token"
for attempt in range(self._max_retries + 1):
if attempt > 0:
self.log(f" Retry {attempt}/{self._max_retries}...")
time.sleep(5)
result = self._try_register(page, email, password, full_name)
if result.get("token"):
tokens = result.get("tokens", {}) if isinstance(result, dict) else {}
access_token = self._resolve_access_token(tokens)
if access_token:
if attempt == 0:
self.log(" first-attempt token hit")
else:
self.log(f" token hit on retry {attempt}/{self._max_retries}")
return result
last_reason = str(result.get("error") or "no token")
if attempt < self._max_retries:
self.log(f" No token, retrying...")
self.log(f" retry reason: {last_reason}")
self.log(" No token, retrying...")
else:
self.log(f" WARNING: No auth token after {self._max_retries + 1} attempts")
self.log(f" final failure reason(no token): {last_reason}")
return {"email": email, "password": password, "full_name": full_name, "tokens": {}, "status": "failed"}
return {
"email": email,
"password": password,
"full_name": full_name,
"tokens": {},
"status": "failed",
"error": last_reason,
}
def _try_register(self, page: Page, email: str, password: str, full_name: str) -> dict:
"""One attempt at registration. Returns dict with tokens."""
@@ -242,7 +638,17 @@ class QwenRegister:
self.log(f" Current URL: {page.url}")
self.log(f" Tokens found: {list(tokens.keys()) if tokens else 'none'}")
if tokens.get("token") or tokens.get("cookie:token"):
if self._resolve_access_token(tokens):
oauth_data = obtain_qwen_oauth_tokens_from_logged_in_page(
page,
log_fn=self.log,
poll_timeout_seconds=20,
)
if oauth_data:
tokens.update(oauth_data)
self.log(" OAuth refresh_token acquired")
else:
self.log(" OAuth refresh_token not acquired (continue with web token)")
return {
"email": email,
"password": password,
@@ -251,11 +657,48 @@ class QwenRegister:
"status": "success",
}
return {"tokens": {}}
return {
"email": email,
"password": password,
"full_name": full_name,
"tokens": tokens,
"status": "failed",
"error": "no token after submit",
}
except Exception as e:
self.log(f" Error: {e}")
return {"tokens": {}}
err = str(e)
self.log(f" Error: {err}")
# Fallback: even when selector steps fail, page may already have token.
tokens = {}
try:
tokens = self._extract_tokens(page)
self.log(
f" Fallback token check after error: "
f"{list(tokens.keys()) if tokens else 'none'}"
)
except Exception:
tokens = {}
if self._resolve_access_token(tokens):
self.log(" token recovered by fallback extraction")
return {
"email": email,
"password": password,
"full_name": full_name,
"tokens": tokens,
"status": "success",
}
return {
"email": email,
"password": password,
"full_name": full_name,
"tokens": tokens,
"status": "failed",
"error": err,
}
# ---- helpers ----

View File

@@ -0,0 +1,85 @@
"""Qwen 账号导入 CPA 辅助。"""
from __future__ import annotations
import base64
import json
from datetime import datetime, timezone, timedelta
from typing import Any
def _decode_jwt_payload(token: str) -> dict[str, Any]:
raw = str(token or "").strip()
if not raw:
return {}
parts = raw.split(".")
if len(parts) < 2:
return {}
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
try:
decoded = base64.urlsafe_b64decode(payload.encode("utf-8"))
data = json.loads(decoded.decode("utf-8", errors="ignore"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def generate_token_json(account: Any) -> dict[str, Any]:
"""生成适配 CPA 管理端 auth-file 的 Qwen token JSON。"""
email = str(getattr(account, "email", "") or "").strip()
access_token = str(getattr(account, "access_token", "") or "").strip()
refresh_token = str(getattr(account, "refresh_token", "") or "").strip()
resource_url = str(getattr(account, "resource_url", "") or "").strip()
if not resource_url:
resource_url = "portal.qwen.ai"
payload = _decode_jwt_payload(access_token) if access_token else {}
account_id = str(
payload.get("id")
or payload.get("user_id")
or payload.get("sub")
or ""
).strip()
expired_str = ""
exp_timestamp = payload.get("exp")
if isinstance(exp_timestamp, int) and exp_timestamp > 0:
exp_dt = datetime.fromtimestamp(exp_timestamp, tz=timezone(timedelta(hours=8)))
expired_str = exp_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
now = datetime.now(tz=timezone(timedelta(hours=8)))
return {
"type": "qwen",
"provider": "qwen",
"email": email,
"expired": expired_str,
"account_id": account_id,
"access_token": access_token,
"refresh_token": refresh_token,
"resource_url": resource_url,
"last_refresh": now.strftime("%Y-%m-%dT%H:%M:%S+08:00"),
}
def upload_to_cpa(
token_data: dict[str, Any],
api_url: str | None = None,
api_key: str | None = None,
) -> tuple[bool, str]:
"""复用通用 CPA 上传实现。"""
access_token = str((token_data or {}).get("access_token") or "").strip()
refresh_token = str((token_data or {}).get("refresh_token") or "").strip()
if not access_token:
return False, "Qwen 凭证缺少 access_token"
if not refresh_token:
return (
False,
"Qwen 凭证缺少 refresh_token需 Qwen OAuth 设备登录凭证,普通网页 token 不可用)",
)
from platforms.chatgpt.cpa_upload import upload_to_cpa as _upload
return _upload(token_data, api_url=api_url, api_key=api_key)

View File

@@ -7,11 +7,89 @@ Registration flow (verified):
- Activation URL is called via API to complete registration
"""
import json
from core.base_platform import BasePlatform, Account, AccountStatus, RegisterConfig
from core.base_mailbox import BaseMailbox
from core.registry import register
def _normalize_key(key: str) -> str:
return str(key or "").strip().lower().replace("-", "").replace("_", "")
def _parse_json_like(raw: str):
text = str(raw or "").strip()
if not text:
return None
if not ((text.startswith("{") and text.endswith("}")) or (text.startswith("[") and text.endswith("]"))):
return None
try:
return json.loads(text)
except Exception:
return None
def _find_value_by_keys(obj, key_candidates: set[str], *, depth: int = 0, max_depth: int = 5) -> str:
if depth > max_depth:
return ""
if isinstance(obj, dict):
for key, value in obj.items():
if _normalize_key(key) in key_candidates:
if isinstance(value, str) and value.strip():
return value.strip()
if isinstance(value, (dict, list)):
found = _find_value_by_keys(value, key_candidates, depth=depth + 1, max_depth=max_depth)
if found:
return found
elif isinstance(value, str):
parsed = _parse_json_like(value)
if parsed is not None:
found = _find_value_by_keys(parsed, key_candidates, depth=depth + 1, max_depth=max_depth)
if found:
return found
elif isinstance(obj, list):
for item in obj:
found = _find_value_by_keys(item, key_candidates, depth=depth + 1, max_depth=max_depth)
if found:
return found
elif isinstance(obj, str):
parsed = _parse_json_like(obj)
if parsed is not None:
return _find_value_by_keys(parsed, key_candidates, depth=depth + 1, max_depth=max_depth)
return ""
def _extract_qwen_oauth_fields(raw_tokens) -> tuple[str, str, str]:
oauth_access_keys = {
"oauthaccesstoken",
"qwenoauthaccesstoken",
"oauth_token",
"oauthaccess",
}
refresh_keys = {
"refreshtoken",
"refreshtokenvalue",
"qwenrefreshtoken",
"oauthrefreshtoken",
}
resource_keys = {
"resourceurl",
"qwenresourceurl",
"oauthresourceurl",
"baseurl",
"apiurl",
}
oauth_access_token = (
_find_value_by_keys(raw_tokens, oauth_access_keys)
if isinstance(raw_tokens, (dict, list, str))
else ""
)
refresh_token = _find_value_by_keys(raw_tokens, refresh_keys) if isinstance(raw_tokens, (dict, list, str)) else ""
resource_url = _find_value_by_keys(raw_tokens, resource_keys) if isinstance(raw_tokens, (dict, list, str)) else ""
return oauth_access_token, refresh_token, resource_url
@register
class QwenPlatform(BasePlatform):
name = "qwen"
@@ -28,7 +106,8 @@ class QwenPlatform(BasePlatform):
from platforms.qwen.core import (
QwenRegister,
call_activation_api,
extract_activation_link,
wait_for_activation_link,
obtain_qwen_oauth_tokens_with_login,
)
log = getattr(self, "_log_fn", print)
@@ -38,7 +117,8 @@ class QwenPlatform(BasePlatform):
if not email:
raise RuntimeError("Qwen registration requires an email address")
log(f"Email: {email}")
log(f"[Qwen] 开始注册流程")
log(f"[Qwen] 邮箱: {email}")
before_ids = self.mailbox.get_current_ids(mail_acct) if mail_acct else set()
otp_timeout = self.get_mailbox_otp_timeout()
@@ -46,50 +126,100 @@ class QwenPlatform(BasePlatform):
reg = QwenRegister(executor=ex, log_fn=log)
result = reg.register(email=email, password=password)
if result.get("status") != "success":
return Account(
platform="qwen",
email=email,
password=password or "",
token="",
status=AccountStatus.REGISTERED,
extra={"error": "Registration failed — no token cookie"},
)
tokens = result.get("tokens", {})
# Token is in cookie:token from Qwen
access_token = (
tokens.get("token")
or tokens.get("cookie:token")
or tokens.get("access_token", "")
)
# Try activation if mailbox available
if result.get("status") != "success":
reason = str(result.get("error") or "no token")
log(f"[Qwen] 注册失败: {reason}")
raise RuntimeError(f"Qwen registration failed: {reason}")
if not access_token:
log("[Qwen] 注册失败: 未获取到 access token")
raise RuntimeError("Qwen registration failed: no auth token extracted")
log("[Qwen] 注册成功,开始激活流程...")
activated = False
if self.mailbox and mail_acct:
log("Waiting for activation email...")
activation_link = None
for _ in range(min(10, otp_timeout // 5)):
import time
time.sleep(3)
try:
messages = self.mailbox.get_messages(mail_acct, before_ids=before_ids)
for msg in messages:
body = self.mailbox.get_message_body(mail_acct, msg.get("id")) or ""
link = extract_activation_link(body)
if link:
activation_link = link
break
if activation_link:
break
except Exception:
continue
log("[Qwen] 等待激活邮件...")
activation_link = wait_for_activation_link(
self.mailbox,
mail_acct=mail_acct,
account_email=email,
timeout=max(30, min(120, int(otp_timeout or 120))),
before_ids=before_ids,
log_fn=log,
)
if activation_link:
log(f"Activation link found, activating...")
log(f"[Qwen] 找到激活链接,正在激活...")
act_result = call_activation_api(activation_link)
activated = act_result.get("ok", False)
log(f"Activation: {'SUCCESS' if activated else 'FAILED'}")
log(f"[Qwen] 激活结果: {'成功' if activated else '失败'}")
else:
log("[Qwen] 未找到激活链接,跳过激活")
else:
log("[Qwen] 无邮箱服务,跳过激活")
raw_tokens = tokens if isinstance(tokens, dict) else {}
oauth_access_token, refresh_token, resource_url = _extract_qwen_oauth_fields(raw_tokens)
if not refresh_token:
log("[Qwen] 未获取到 refresh_token尝试通过登录获取 OAuth tokens...")
with self._make_executor() as ex:
oauth_data = obtain_qwen_oauth_tokens_with_login(
ex.page,
email=str(result.get("email") or email),
password=str(result.get("password") or password or ""),
log_fn=log,
poll_timeout_seconds=20,
)
if oauth_data:
got_refresh = str(oauth_data.get("refresh_token") or "").strip()
if got_refresh:
refresh_token = got_refresh
oauth_access_token = str(
oauth_data.get("oauth_access_token")
or oauth_data.get("access_token")
or oauth_access_token
or ""
).strip()
resource_url = str(
oauth_data.get("resource_url")
or resource_url
or "portal.qwen.ai"
).strip()
log(f"[Qwen] 成功获取 OAuth tokens")
log(f"[Qwen] refresh_token: {refresh_token[:20]}...")
log(f"[Qwen] resource_url: {resource_url}")
else:
log("[Qwen] OAuth 登录成功但未获取到 refresh_token")
else:
log("[Qwen] OAuth 登录失败,无法获取 refresh_token")
else:
log(f"[Qwen] 已获取 refresh_token: {refresh_token[:20]}...")
extra = {
"activated": activated,
"full_name": result.get("full_name", ""),
"raw_tokens": raw_tokens,
}
if oauth_access_token:
extra["oauth_access_token"] = oauth_access_token
extra["qwen_oauth_access_token"] = oauth_access_token
if refresh_token:
extra["refresh_token"] = refresh_token
extra["qwen_refresh_token"] = refresh_token
if resource_url:
extra["resource_url"] = resource_url
extra["qwen_resource_url"] = resource_url
log(f"[Qwen] 注册流程完成,激活状态: {'已激活' if activated else '未激活'}")
log(f"[Qwen] refresh_token 状态: {'已获取' if refresh_token else '未获取'}")
return Account(
platform="qwen",
@@ -97,11 +227,7 @@ class QwenPlatform(BasePlatform):
password=result["password"],
token=access_token,
status=AccountStatus.REGISTERED,
extra={
"activated": activated,
"full_name": result.get("full_name", ""),
"raw_tokens": tokens,
},
extra=extra,
)
def check_valid(self, account: Account) -> bool:
@@ -114,7 +240,7 @@ class QwenPlatform(BasePlatform):
try:
r = curl_req.get(
"https://chat.qwen.ai/api/v1/user/profile",
"https://chat.qwen.ai/api/v1/chats",
headers={
"Authorization": f"Bearer {token}",
"user-agent": (
@@ -133,76 +259,261 @@ class QwenPlatform(BasePlatform):
def get_platform_actions(self) -> list:
"""Return platform-specific actions."""
return [
{"id": "activate_account", "label": "Activate Account", "params": []},
{"id": "get_user_info", "label": "Get User Info", "params": []},
{"id": "activate_account", "label": "激活账号", "params": []},
{"id": "get_user_info", "label": "获取用户信息", "params": []},
{
"id": "upload_cpa",
"label": "导入 CPA",
"params": [
{"key": "api_url", "label": "CPA API URL", "type": "text"},
{"key": "api_key", "label": "CPA API Key", "type": "text"},
],
},
]
def execute_action(self, action_id: str, account: Account, params: dict) -> dict:
"""Execute platform-specific actions."""
from curl_cffi import requests as curl_req
from platforms.qwen.core import call_activation_api
from platforms.qwen.core import call_activation_api, wait_for_activation_link
if action_id == "activate_account":
# Try to find activation link from email and activate
# Try to find activation link from mailbox and activate
if not self.mailbox:
return {"ok": False, "error": "No mailbox configured for activation"}
try:
from core.base_mailbox import create_mailbox
email = account.email
password = account.password or ""
mail_acct = self.mailbox.get_email()
if not mail_acct or mail_acct.email != email:
return {"ok": False, "error": f"No mailbox for {email}"}
cfg_extra = (self.config.extra or {}) if self.config else {}
provider = str(cfg_extra.get("mail_provider", "luckmail") or "luckmail")
self.mailbox = create_mailbox(
provider=provider,
extra=cfg_extra,
proxy=self.config.proxy if self.config else None,
)
except Exception as e:
return {"ok": False, "error": f"未配置可用邮箱,无法激活: {e}"}
try:
from platforms.qwen.core import extract_activation_link
import time
default_wait_seconds = self.get_mailbox_otp_timeout(default=60)
wait_seconds = int(params.get("wait_seconds") or default_wait_seconds)
wait_seconds = max(5, min(120, wait_seconds))
mail_acct = None
before_ids = None
supports_direct_email_lookup = hasattr(self.mailbox, "_get_mails")
if (
not supports_direct_email_lookup
and hasattr(self.mailbox, "get_current_ids")
and hasattr(self.mailbox, "get_email")
):
try:
mail_acct = self.mailbox.get_email()
if mail_acct:
before_ids = self.mailbox.get_current_ids(mail_acct)
except Exception:
mail_acct = None
before_ids = None
before_ids = self.mailbox.get_current_ids(mail_acct)
activation_link = None
for _ in range(24): # 120s
time.sleep(5)
messages = self.mailbox.get_messages(mail_acct, before_ids=before_ids)
for msg in messages:
body = self.mailbox.get_message_body(mail_acct, msg.get("id")) or ""
link = extract_activation_link(body)
if link:
activation_link = link
break
if activation_link:
break
activation_link = wait_for_activation_link(
self.mailbox,
mail_acct=mail_acct,
account_email=account.email or "",
timeout=wait_seconds,
before_ids=before_ids,
log_fn=getattr(self, "_log_fn", print),
)
if activation_link:
act_result = call_activation_api(activation_link)
if act_result.get("ok"):
return {"ok": True, "message": "Account activated"}
return {"ok": False, "error": f"Activation failed: {act_result}"}
return {"ok": False, "error": "No activation email found"}
return {"ok": True, "message": "账号激活成功"}
return {"ok": False, "error": f"激活请求失败: {act_result}"}
return {"ok": False, "error": f"{wait_seconds}s 内未找到激活邮件"}
except Exception as e:
return {"ok": False, "error": str(e)}
if action_id == "get_user_info":
token = account.token
if not token:
return {"ok": False, "error": "Account missing token"}
return {"ok": False, "error": "账号缺少 token"}
def _decode_jwt_payload(raw_token: str) -> dict:
import base64
import json as _json
parts = str(raw_token or "").split(".")
if len(parts) < 2:
return {}
payload = parts[1]
padded = payload + "=" * ((4 - len(payload) % 4) % 4)
try:
decoded = base64.urlsafe_b64decode(padded.encode("utf-8")).decode(
"utf-8", errors="ignore"
)
obj = _json.loads(decoded)
return obj if isinstance(obj, dict) else {}
except Exception:
return {}
headers = {
"Authorization": f"Bearer {token}",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
}
candidates = [
("users_me", "https://chat.qwen.ai/api/v1/users/me"),
("users_profile", "https://chat.qwen.ai/api/v1/users/profile"),
("chats", "https://chat.qwen.ai/api/v1/chats"),
]
source_labels = {
"users_me": "用户信息接口(me)",
"users_profile": "用户资料接口(profile)",
"chats": "会话列表接口",
}
attempts = []
try:
r = curl_req.get(
"https://chat.qwen.ai/api/v1/user/profile",
headers={
"Authorization": f"Bearer {token}",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
},
impersonate="chrome124",
timeout=15,
)
if r.status_code == 200:
return {"ok": True, "data": r.json()}
return {"ok": False, "error": f"Failed: HTTP {r.status_code}"}
for source, url in candidates:
r = curl_req.get(
url,
headers=headers,
impersonate="chrome124",
timeout=15,
)
attempts.append(
{
"source": source,
"url": url,
"status_code": r.status_code,
}
)
if r.status_code != 200:
continue
data = None
try:
data = r.json()
except Exception:
data = {"raw": (r.text or "")[:1000]}
token_payload = _decode_jwt_payload(token)
payload_summary = {
"id": token_payload.get("id"),
"exp": token_payload.get("exp"),
"last_password_change": token_payload.get("last_password_change"),
}
attempts_cn = [
{
"接口": source_labels.get(item["source"], item["source"]),
"地址": item["url"],
"状态码": item["status_code"],
}
for item in attempts
]
result = {
"来源": source_labels.get(source, source),
"接口地址": url,
"HTTP状态": r.status_code,
"用户ID": payload_summary.get("id"),
"Token过期时间": payload_summary.get("exp"),
"最近改密时间": payload_summary.get("last_password_change"),
"尝试记录": attempts_cn,
"原始数据": data,
}
if source == "chats" and isinstance(data, list):
result["会话数量"] = len(data)
return {"ok": True, "data": result}
return {
"ok": False,
"error": (
"查询用户信息失败: "
+ ", ".join(
f"{item['source']}={item['status_code']}" for item in attempts
)
),
}
except Exception as e:
return {"ok": False, "error": str(e)}
if action_id == "upload_cpa":
from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa
from platforms.qwen.core import obtain_qwen_oauth_tokens_with_login
class _A:
pass
extra = account.extra if isinstance(account.extra, dict) else {}
raw_tokens = extra.get("raw_tokens") if isinstance(extra.get("raw_tokens"), dict) else {}
raw_oa, raw_rt, raw_ru = _extract_qwen_oauth_fields(raw_tokens)
oauth_access_token = str(
extra.get("oauth_access_token")
or extra.get("qwen_oauth_access_token")
or raw_oa
or ""
).strip()
refresh_token = str(
extra.get("refresh_token")
or extra.get("qwen_refresh_token")
or raw_rt
or ""
).strip()
resource_url = str(
extra.get("resource_url")
or extra.get("qwen_resource_url")
or raw_ru
or ""
).strip()
account_extra_patch = {}
if not refresh_token and account.email and account.password:
with self._make_executor() as ex:
oauth_data = obtain_qwen_oauth_tokens_with_login(
ex.page,
email=str(account.email or ""),
password=str(account.password or ""),
log_fn=getattr(self, "_log_fn", print),
poll_timeout_seconds=20,
)
got_refresh = str(oauth_data.get("refresh_token") or "").strip()
if got_refresh:
oauth_access_token = str(
oauth_data.get("oauth_access_token")
or oauth_data.get("access_token")
or oauth_access_token
or ""
).strip()
refresh_token = got_refresh
resource_url = str(
oauth_data.get("resource_url")
or resource_url
or "portal.qwen.ai"
).strip()
account_extra_patch = {
"oauth_access_token": oauth_access_token,
"qwen_oauth_access_token": oauth_access_token,
"refresh_token": refresh_token,
"qwen_refresh_token": refresh_token,
"resource_url": resource_url,
"qwen_resource_url": resource_url,
}
a = _A()
a.email = account.email
a.access_token = oauth_access_token or account.token or ""
a.refresh_token = refresh_token
a.resource_url = resource_url
token_data = generate_token_json(a)
ok, msg = upload_to_cpa(
token_data,
api_url=params.get("api_url"),
api_key=params.get("api_key"),
)
resp = {"ok": ok, "data": msg}
if account_extra_patch:
resp["account_extra_patch"] = account_extra_patch
return resp
raise NotImplementedError(f"Unknown action: {action_id}")

View File

@@ -234,4 +234,65 @@ def sync_account(account) -> list[dict[str, Any]]:
ok, msg = upload_to_kiro_manager(account, path=configured_path or None)
results.append({"name": "Kiro Manager", "ok": ok, "msg": msg})
elif platform == "qwen":
qwen_cpa_url = str(config_store.get("qwen_cpa_api_url", "") or "").strip()
if not qwen_cpa_url:
qwen_cpa_url = str(config_store.get("cpa_api_url", "") or "").strip()
qwen_cpa_enabled = _is_config_enabled(
config_store.get("qwen_cpa_enabled", ""),
default=bool(qwen_cpa_url),
)
if qwen_cpa_enabled and qwen_cpa_url:
from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa
class _QwenAccount:
pass
qwen_account = _QwenAccount()
qwen_account.email = account.email
# 支持 AccountModel (extra_json) 和 Account (extra) 两种格式
extra = {}
if hasattr(account, "extra"):
extra = account.extra or {}
elif hasattr(account, "get_extra"):
extra = account.get_extra() or {}
elif hasattr(account, "extra_json"):
import json
try:
extra = json.loads(account.extra_json or "{}")
except Exception:
extra = {}
qwen_account.access_token = _pick_text(
extra,
"oauth_access_token",
"qwen_oauth_access_token",
"access_token",
) or account.token
qwen_account.refresh_token = _pick_text(
extra,
"refresh_token",
"qwen_refresh_token",
)
qwen_account.resource_url = _pick_text(
extra,
"resource_url",
"qwen_resource_url",
default="portal.qwen.ai",
)
if not qwen_account.refresh_token:
msg = "Qwen 账号缺少 refresh_token无法同步到 CPA"
results.append({"name": "Qwen CPA", "ok": False, "msg": msg})
else:
token_data = generate_token_json(qwen_account)
qwen_cpa_key = str(config_store.get("qwen_cpa_api_key", "") or "").strip()
if not qwen_cpa_key:
qwen_cpa_key = str(config_store.get("cpa_api_key", "") or "").strip()
ok, msg = upload_to_cpa(
token_data,
api_url=qwen_cpa_url,
api_key=qwen_cpa_key or None,
)
results.append({"name": "Qwen CPA", "ok": ok, "msg": msg})
return results

View File

@@ -0,0 +1,473 @@
import unittest
from unittest import mock
from core.base_mailbox import MailboxAccount
from core.base_platform import Account, RegisterConfig
from platforms.qwen.core import QwenRegister, wait_for_activation_link
from platforms.qwen.cpa_upload import generate_token_json, upload_to_cpa
from platforms.qwen.plugin import QwenPlatform
class _DummyExecutor:
page = object()
class _DummyExecutorContext:
def __enter__(self):
return _DummyExecutor()
def __exit__(self, exc_type, exc, tb):
return False
class _SequenceQwenRegister(QwenRegister):
def __init__(self, results, logs):
super().__init__(executor=_DummyExecutor(), log_fn=logs.append)
self._results = list(results)
self.calls = 0
def _try_register(self, page, email, password, full_name):
current = self._results[min(self.calls, len(self._results) - 1)]
self.calls += 1
return dict(current)
class _FakeCFWorkerMailbox:
def __init__(self, mails):
self._mails = mails
def _get_mails(self, _email: str):
return list(self._mails)
class _FakeCFWorkerMailboxWithDifferentCurrentEmail(_FakeCFWorkerMailbox):
def get_email(self):
return MailboxAccount(email="other@example.com", account_id="dummy")
def get_current_ids(self, _account):
return set()
class _FakeHttpResp:
def __init__(self, status_code, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
def json(self):
if self._payload is None:
raise ValueError("no json")
return self._payload
class QwenRegistrationTests(unittest.TestCase):
def test_get_platform_actions_contains_upload_cpa(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
actions = platform.get_platform_actions()
action_ids = [item.get("id") for item in actions]
self.assertIn("upload_cpa", action_ids)
def test_register_stops_immediately_when_tokens_exist(self):
logs = []
reg = _SequenceQwenRegister(
results=[
{
"email": "demo@example.com",
"password": "Abc123!@#",
"full_name": "Demo",
"tokens": {"cookie:token": "tok_demo"},
"status": "success",
}
],
logs=logs,
)
result = reg.register(email="demo@example.com", password="Abc123!@#", full_name="Demo")
self.assertEqual(reg.calls, 1)
self.assertEqual(result.get("status"), "success")
self.assertEqual(result.get("tokens", {}).get("cookie:token"), "tok_demo")
self.assertTrue(any("first-attempt token hit" in msg for msg in logs))
def test_register_returns_failed_after_retry_exhausted(self):
logs = []
reg = _SequenceQwenRegister(
results=[
{"status": "failed", "tokens": {}, "error": "attempt-1"},
{"status": "failed", "tokens": {}, "error": "attempt-2"},
{"status": "failed", "tokens": {}, "error": "attempt-3"},
],
logs=logs,
)
with mock.patch("platforms.qwen.core.time.sleep", return_value=None):
result = reg.register(email="demo@example.com", password="Abc123!@#", full_name="Demo")
self.assertEqual(reg.calls, 3)
self.assertEqual(result.get("status"), "failed")
self.assertEqual(result.get("error"), "attempt-3")
self.assertTrue(any("final failure reason(no token): attempt-3" in msg for msg in logs))
def test_plugin_success_requires_non_empty_token(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
fake_result = {
"status": "success",
"email": "demo@example.com",
"password": "Abc123!@#",
"tokens": {"cookie:token": "tok_ok"},
}
with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()):
with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result):
account = platform.register(email="demo@example.com", password="Abc123!@#")
self.assertEqual(account.email, "demo@example.com")
self.assertEqual(account.token, "tok_ok")
def test_plugin_raises_when_registration_status_failed(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
fake_result = {
"status": "failed",
"email": "demo@example.com",
"password": "Abc123!@#",
"tokens": {},
"error": "no token",
}
with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()):
with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result):
with self.assertRaises(RuntimeError):
platform.register(email="demo@example.com", password="Abc123!@#")
def test_plugin_raises_when_success_but_token_missing(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
fake_result = {
"status": "success",
"email": "demo@example.com",
"password": "Abc123!@#",
"tokens": {},
}
with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()):
with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result):
with self.assertRaises(RuntimeError):
platform.register(email="demo@example.com", password="Abc123!@#")
def test_plugin_register_extracts_oauth_fields_from_raw_tokens(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
fake_result = {
"status": "success",
"email": "demo@example.com",
"password": "Abc123!@#",
"tokens": {
"cookie:token": "tok_ok",
"oauth_payload": (
'{"oauth_access_token":"oa_demo",'
'"refreshToken":"rt_demo","resource_url":"portal.qwen.ai"}'
),
},
}
with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()):
with mock.patch("platforms.qwen.core.QwenRegister.register", return_value=fake_result):
account = platform.register(email="demo@example.com", password="Abc123!@#")
self.assertEqual(account.token, "tok_ok")
self.assertEqual((account.extra or {}).get("oauth_access_token"), "oa_demo")
self.assertEqual((account.extra or {}).get("refresh_token"), "rt_demo")
self.assertEqual((account.extra or {}).get("resource_url"), "portal.qwen.ai")
def test_activate_action_can_bootstrap_mailbox_and_activate(self):
platform = QwenPlatform(
config=RegisterConfig(extra={"mail_provider": "cfworker"}),
mailbox=None,
)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token="",
)
fake_mailbox = _FakeCFWorkerMailbox(
mails=[
{
"id": 1,
"subject": "Activate your Qwen account",
"raw": (
"Click to activate: "
"https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def"
),
}
]
)
with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox):
with mock.patch("platforms.qwen.core.call_activation_api", return_value={"ok": True}):
with mock.patch("platforms.qwen.core.time.sleep", return_value=None):
result = platform.execute_action("activate_account", account, {})
self.assertTrue(result.get("ok"))
def test_activate_action_reports_timeout_with_default_wait_seconds(self):
platform = QwenPlatform(
config=RegisterConfig(
extra={
"mail_provider": "cfworker",
"mailbox_otp_timeout_seconds": 30,
}
),
mailbox=None,
)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token="",
)
fake_mailbox = _FakeCFWorkerMailbox(mails=[])
with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox):
with mock.patch("platforms.qwen.core.wait_for_activation_link", return_value=None):
result = platform.execute_action("activate_account", account, {})
self.assertFalse(result.get("ok"))
self.assertIn("在 30s 内未找到激活邮件", str(result.get("error")))
def test_activate_action_ignores_current_mailbox_email_for_cfworker_lookup(self):
platform = QwenPlatform(
config=RegisterConfig(extra={"mail_provider": "cfworker"}),
mailbox=None,
)
account = Account(
platform="qwen",
email="target@example.com",
password="Abc123!@#",
token="",
)
fake_mailbox = _FakeCFWorkerMailboxWithDifferentCurrentEmail(
mails=[
{
"id": 1,
"subject": "Activate",
"raw": "https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def",
}
]
)
with mock.patch("core.base_mailbox.create_mailbox", return_value=fake_mailbox):
with mock.patch("platforms.qwen.core.call_activation_api", return_value={"ok": True}):
with mock.patch("platforms.qwen.core.time.sleep", return_value=None):
result = platform.execute_action("activate_account", account, {})
self.assertTrue(result.get("ok"))
def test_wait_for_activation_link_can_decode_base64_html_raw_mail(self):
import base64
html = (
'<html><body>'
'<a href="https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def">Activate</a>'
"</body></html>"
)
b64 = base64.b64encode(html.encode("utf-8")).decode("ascii")
raw = (
"MIME-Version: 1.0\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
"Content-Transfer-Encoding: base64\r\n"
"\r\n"
f"{b64}\r\n"
)
mailbox = _FakeCFWorkerMailbox(
mails=[{"id": 1, "subject": "Activate", "raw": raw}]
)
with mock.patch("platforms.qwen.core.time.sleep", return_value=None):
link = wait_for_activation_link(
mailbox,
account_email="target@example.com",
timeout=5,
)
self.assertEqual(
link,
"https://chat.qwen.ai/api/v1/auths/activate?id=abc&token=def",
)
def test_get_user_info_fallbacks_to_chats_endpoint(self):
# Header: {"alg":"HS256","typ":"JWT"}
# Payload: {"id":"u1","exp":1778728455}
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6InUxIiwiZXhwIjoxNzc4NzI4NDU1fQ.sig"
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token=token,
)
responses = [
_FakeHttpResp(404, {"detail": "Not Found"}),
_FakeHttpResp(403, {"detail": "restricted"}),
_FakeHttpResp(200, []),
]
with mock.patch("curl_cffi.requests.get", side_effect=responses):
result = platform.execute_action("get_user_info", account, {})
self.assertTrue(result.get("ok"))
data = result.get("data", {})
self.assertEqual(data.get("来源"), "会话列表接口")
self.assertEqual(data.get("会话数量"), 0)
self.assertEqual(data.get("用户ID"), "u1")
def test_upload_cpa_action_uses_qwen_uploader(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token="qwen_token_abc",
extra={
"refresh_token": "qwen_refresh_token_xyz",
"resource_url": "portal.qwen.ai",
},
)
with mock.patch(
"platforms.qwen.cpa_upload.generate_token_json",
return_value={"email": "demo@example.com", "access_token": "qwen_token_abc"},
) as build_mock:
with mock.patch(
"platforms.qwen.cpa_upload.upload_to_cpa",
return_value=(True, "上传成功"),
) as upload_mock:
result = platform.execute_action(
"upload_cpa",
account,
{"api_url": "http://cpa.local", "api_key": "k"},
)
self.assertTrue(result.get("ok"))
self.assertEqual(result.get("data"), "上传成功")
build_arg = build_mock.call_args.args[0]
self.assertEqual(getattr(build_arg, "email", ""), "demo@example.com")
self.assertEqual(getattr(build_arg, "access_token", ""), "qwen_token_abc")
self.assertEqual(getattr(build_arg, "refresh_token", ""), "qwen_refresh_token_xyz")
self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai")
upload_mock.assert_called_once_with(
{"email": "demo@example.com", "access_token": "qwen_token_abc"},
api_url="http://cpa.local",
api_key="k",
)
def test_upload_cpa_action_reads_refresh_from_raw_tokens(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token="qwen_token_abc",
extra={
"raw_tokens": {
"oauth_payload": '{"refreshToken":"rt_raw","resource_url":"portal.qwen.ai"}',
}
},
)
with mock.patch(
"platforms.qwen.cpa_upload.generate_token_json",
return_value={"email": "demo@example.com", "access_token": "qwen_token_abc"},
) as build_mock:
with mock.patch(
"platforms.qwen.cpa_upload.upload_to_cpa",
return_value=(True, "上传成功"),
):
result = platform.execute_action(
"upload_cpa",
account,
{"api_url": "http://cpa.local", "api_key": "k"},
)
self.assertTrue(result.get("ok"))
build_arg = build_mock.call_args.args[0]
self.assertEqual(getattr(build_arg, "refresh_token", ""), "rt_raw")
self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai")
def test_upload_cpa_action_bootstraps_oauth_when_refresh_missing(self):
platform = QwenPlatform(config=RegisterConfig(executor_type="headless"), mailbox=None)
account = Account(
platform="qwen",
email="demo@example.com",
password="Abc123!@#",
token="web_token_only",
extra={},
)
with mock.patch.object(platform, "_make_executor", return_value=_DummyExecutorContext()):
with mock.patch(
"platforms.qwen.core.obtain_qwen_oauth_tokens_with_login",
return_value={
"oauth_access_token": "oauth_access_123",
"refresh_token": "oauth_refresh_456",
"resource_url": "portal.qwen.ai",
},
):
with mock.patch(
"platforms.qwen.cpa_upload.generate_token_json",
return_value={"email": "demo@example.com", "access_token": "oauth_access_123"},
) as build_mock:
with mock.patch(
"platforms.qwen.cpa_upload.upload_to_cpa",
return_value=(True, "上传成功"),
):
result = platform.execute_action(
"upload_cpa",
account,
{"api_url": "http://cpa.local", "api_key": "k"},
)
self.assertTrue(result.get("ok"))
build_arg = build_mock.call_args.args[0]
self.assertEqual(getattr(build_arg, "access_token", ""), "oauth_access_123")
self.assertEqual(getattr(build_arg, "refresh_token", ""), "oauth_refresh_456")
self.assertEqual(getattr(build_arg, "resource_url", ""), "portal.qwen.ai")
self.assertEqual(
(result.get("account_extra_patch") or {}).get("refresh_token"),
"oauth_refresh_456",
)
def test_qwen_cpa_upload_requires_refresh_token(self):
ok, msg = upload_to_cpa(
{
"type": "qwen",
"email": "demo@example.com",
"access_token": "token_only",
"refresh_token": "",
},
api_url="http://cpa.local",
api_key="k",
)
self.assertFalse(ok)
self.assertIn("refresh_token", msg)
def test_qwen_cpa_generate_token_json_contains_oauth_fields(self):
class _A:
pass
a = _A()
a.email = "demo@example.com"
a.access_token = "token"
a.refresh_token = "rt_demo"
a.resource_url = "portal.qwen.ai"
token_json = generate_token_json(a)
self.assertEqual(token_json.get("type"), "qwen")
self.assertEqual(token_json.get("provider"), "qwen")
self.assertEqual(token_json.get("email"), "demo@example.com")
self.assertEqual(token_json.get("access_token"), "token")
self.assertEqual(token_json.get("refresh_token"), "rt_demo")
self.assertEqual(token_json.get("resource_url"), "portal.qwen.ai")
if __name__ == "__main__":
unittest.main()