修复FreemailMailbox 接码,和device_id,会话复用问题

This commit is contained in:
zhangchen
2026-04-02 22:11:25 +08:00
parent cc270b3a4b
commit 9f440e4e08
7 changed files with 204 additions and 11 deletions

View File

@@ -2094,6 +2094,11 @@ class FreemailMailbox(BaseMailbox):
**kwargs,
) -> str:
seen = set(before_ids or [])
exclude_codes = {
str(code).strip()
for code in (kwargs.get("exclude_codes") or set())
if str(code or "").strip()
}
def poll_once() -> Optional[str]:
try:
@@ -2108,8 +2113,10 @@ class FreemailMailbox(BaseMailbox):
continue
seen.add(mid)
# 直接用 verification_code 字段
code = str(msg.get("verification_code") or "")
code = str(msg.get("verification_code") or "").strip()
if code and code != "None":
if code in exclude_codes:
continue
return code
# 兜底:从 preview 提取
text = (
@@ -2117,6 +2124,8 @@ class FreemailMailbox(BaseMailbox):
)
code = self._safe_extract(text, code_pattern)
if code:
if code in exclude_codes:
continue
return code
except Exception:
pass

View File

@@ -154,6 +154,8 @@ PASSWORD_CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567
DEFAULT_PASSWORD_LENGTH = 12
# 用户信息生成(用于注册)
MIN_REGISTRATION_AGE = 20
MAX_REGISTRATION_AGE = 45
# 常用英文名
FIRST_NAMES = [
@@ -174,9 +176,9 @@ def generate_random_user_info() -> dict:
# 随机选择名字
name = random.choice(FIRST_NAMES)
# 生成随机生日(18-45岁
# 生成随机生日(20-45岁
current_year = datetime.now().year
birth_year = random.randint(current_year - 45, current_year - 18)
birth_year = random.randint(current_year - MAX_REGISTRATION_AGE, current_year - MIN_REGISTRATION_AGE)
birth_month = random.randint(1, 12)
# 根据月份确定天数
if birth_month in [1, 3, 5, 7, 8, 10, 12]:

View File

@@ -18,6 +18,7 @@ from curl_cffi import requests as cffi_requests
from core.task_runtime import TaskInterruption
from .oauth import OAuthManager, OAuthStart
from .http_client import OpenAIHTTPClient, HTTPClientError
from .utils import generate_device_id, seed_oai_device_cookie
# from ..services import EmailServiceFactory, BaseEmailService, EmailServiceType # removed: external dep
# from ..database import crud # removed: external dep
# from ..database.session import get_db # removed: external dep
@@ -133,6 +134,7 @@ class RefreshTokenRegistrationEngine:
self.session_token: Optional[str] = None # 会话令牌
self.logs: list = []
self._otp_sent_at: Optional[float] = None # OTP 发送时间戳
self._device_id: Optional[str] = None # 当前注册流程复用的 Device ID
self._used_verification_codes = set() # 已取过的验证码,避免二次登录时捞到旧码
self._is_existing_account: bool = False # 是否为已注册账号(用于自动登录)
self._token_acquisition_requires_login: bool = False # 新注册账号需要二次登录拿 token
@@ -210,34 +212,40 @@ class RefreshTokenRegistrationEngine:
"""初始化会话"""
try:
self.session = self.http_client.session
if self._device_id:
seed_oai_device_cookie(self.session, self._device_id)
return True
except Exception as e:
self._log(f"初始化会话失败: {e}", "error")
return False
def _get_device_id(self) -> Optional[str]:
"""获取 Device ID"""
"""获取并复用 Device ID,同时访问 OAuth URL 建立当前会话。"""
if not self.oauth_start:
return None
if not self._device_id:
self._device_id = generate_device_id()
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
if not self.session:
self.session = self.http_client.session
seed_oai_device_cookie(self.session, self._device_id)
response = self.session.get(
self.oauth_start.auth_url,
timeout=20
)
did = self.session.cookies.get("oai-did")
if did:
self._log(f"Device ID: {did}")
return did
if response.status_code < 400:
self._log(f"Device ID: {self._device_id}")
return self._device_id
self._log(
f"获取 Device ID 失败: 未返回 oai-did Cookie (HTTP {response.status_code},{attempt}/{max_attempts} 次)",
f"获取 Device ID 失败: 建立 OAuth 会话返回 HTTP {response.status_code} ({attempt}/{max_attempts} 次)",
"warning" if attempt < max_attempts else "error"
)
except Exception as e:
@@ -876,6 +884,7 @@ class RefreshTokenRegistrationEngine:
self._is_existing_account = False
self._token_acquisition_requires_login = False
self._otp_sent_at = None
self._device_id = None
self._used_verification_codes.clear()
self._log("=" * 60)

View File

@@ -13,6 +13,8 @@ import re
from urllib.parse import urlparse
from typing import Any, Dict
from .constants import MAX_REGISTRATION_AGE, MIN_REGISTRATION_AGE
@dataclass
class FlowState:
@@ -61,8 +63,14 @@ def generate_random_name():
def generate_random_birthday():
"""生成随机生日字符串,格式 YYYY-MM-DD20~30岁)"""
year = random.randint(1996, 2006)
"""生成随机生日字符串,格式 YYYY-MM-DD20~45岁)"""
from datetime import datetime
current_year = datetime.now().year
year = random.randint(
current_year - MAX_REGISTRATION_AGE,
current_year - MIN_REGISTRATION_AGE,
)
month = random.randint(1, 12)
day = random.randint(1, 28)
return f"{year:04d}-{month:02d}-{day:02d}"

View File

@@ -0,0 +1,35 @@
import unittest
from datetime import datetime
from platforms.chatgpt.constants import (
MAX_REGISTRATION_AGE,
MIN_REGISTRATION_AGE,
generate_random_user_info,
)
from platforms.chatgpt.utils import generate_random_birthday
class ChatGPTBirthdateRangeTests(unittest.TestCase):
def test_generate_random_user_info_birthdate_stays_within_20_to_45(self):
current_year = datetime.now().year
for _ in range(50):
birthdate = generate_random_user_info()["birthdate"]
birth_year = int(birthdate.split("-", 1)[0])
age = current_year - birth_year
self.assertGreaterEqual(age, MIN_REGISTRATION_AGE)
self.assertLessEqual(age, MAX_REGISTRATION_AGE)
def test_generate_random_birthday_stays_within_20_to_45(self):
current_year = datetime.now().year
for _ in range(50):
birthdate = generate_random_birthday()
birth_year = int(birthdate.split("-", 1)[0])
age = current_year - birth_year
self.assertGreaterEqual(age, MIN_REGISTRATION_AGE)
self.assertLessEqual(age, MAX_REGISTRATION_AGE)
if __name__ == "__main__":
unittest.main()

View File

@@ -29,6 +29,20 @@ class SequenceEmailService(DummyEmailService):
return self.codes.pop(0)
class _DummyHTTPClient:
def __init__(self, sessions):
self._sessions = list(sessions)
self._index = 0
@property
def session(self):
return self._sessions[self._index]
def close(self):
if self._index < len(self._sessions) - 1:
self._index += 1
class RegistrationEngineFlowTests(unittest.TestCase):
def _make_engine(self):
return RefreshTokenRegistrationEngine(
@@ -57,6 +71,46 @@ class RegistrationEngineFlowTests(unittest.TestCase):
self.assertEqual(email_service.calls[1]["exclude_codes"], {"111111"})
self.assertEqual(engine._used_verification_codes, {"111111", "222222"})
@mock.patch("platforms.chatgpt.refresh_token_registration_engine.seed_oai_device_cookie")
@mock.patch(
"platforms.chatgpt.refresh_token_registration_engine.generate_device_id",
return_value="device-fixed",
)
def test_get_device_id_reuses_generated_value_across_auth_flow_reset(
self, _generate_device_id, mock_seed_cookie
):
engine = self._make_engine()
first_session = mock.Mock()
first_session.cookies = mock.Mock()
first_session.get.return_value = mock.Mock(status_code=200)
second_session = mock.Mock()
second_session.cookies = mock.Mock()
second_session.get.return_value = mock.Mock(status_code=200)
engine.http_client = _DummyHTTPClient([first_session, second_session])
engine.oauth_start = mock.Mock(auth_url="https://auth.openai.com/oauth/authorize")
self.assertTrue(engine._init_session())
first_did = engine._get_device_id()
engine._reset_auth_flow()
engine.oauth_start = mock.Mock(auth_url="https://auth.openai.com/oauth/authorize")
self.assertTrue(engine._init_session())
second_did = engine._get_device_id()
self.assertEqual(first_did, "device-fixed")
self.assertEqual(second_did, "device-fixed")
_generate_device_id.assert_called_once()
self.assertEqual(first_session.get.call_count, 1)
self.assertEqual(second_session.get.call_count, 1)
self.assertEqual(
[call.args for call in mock_seed_cookie.call_args_list],
[
(first_session, "device-fixed"),
(second_session, "device-fixed"),
(second_session, "device-fixed"),
],
)
def test_run_restarts_login_after_new_registration(self):
engine = self._make_engine()

View File

@@ -0,0 +1,76 @@
import unittest
from unittest import mock
from core.base_mailbox import MailboxAccount, create_mailbox
class FreemailMailboxTests(unittest.TestCase):
def _build_mailbox(self):
mailbox = create_mailbox(
"freemail",
extra={"freemail_api_url": "https://freemail.example"},
)
mailbox._session = mock.Mock()
return mailbox
@mock.patch("time.sleep", return_value=None)
def test_wait_for_code_skips_excluded_verification_code_field(self, _sleep):
mailbox = self._build_mailbox()
mailbox._session.get.side_effect = [
_response(
[
{"id": "m1", "verification_code": "111111"},
]
),
_response(
[
{"id": "m1", "verification_code": "111111"},
{"id": "m2", "verification_code": "222222"},
]
),
]
code = mailbox.wait_for_code(
MailboxAccount(email="demo@example.com"),
timeout=5,
exclude_codes={"111111"},
)
self.assertEqual(code, "222222")
self.assertEqual(mailbox._session.get.call_count, 2)
@mock.patch("time.sleep", return_value=None)
def test_wait_for_code_skips_excluded_preview_extracted_code(self, _sleep):
mailbox = self._build_mailbox()
mailbox._session.get.side_effect = [
_response(
[
{"id": "m1", "verification_code": None, "preview": "Your verification code is 111111"},
]
),
_response(
[
{"id": "m1", "verification_code": None, "preview": "Your verification code is 111111"},
{"id": "m2", "verification_code": None, "preview": "Your verification code is 222222"},
]
),
]
code = mailbox.wait_for_code(
MailboxAccount(email="demo@example.com"),
timeout=5,
exclude_codes={"111111"},
)
self.assertEqual(code, "222222")
self.assertEqual(mailbox._session.get.call_count, 2)
def _response(payload):
response = mock.Mock()
response.json.return_value = payload
return response
if __name__ == "__main__":
unittest.main()