From b4c670ee60feb4ceaec800c19d84b5a62648ad5e Mon Sep 17 00:00:00 2001 From: zhangchen <1987834247@qq.com> Date: Sat, 28 Mar 2026 18:31:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dkiro=E6=8A=A5=E9=94=99=20?= =?UTF-8?q?=E5=91=BD=E5=90=8D=E5=8A=A0=E4=B8=8A=E9=9A=8F=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- platforms/kiro/core.py | 253 +++++++++++++++++++++++++++++++---------- 1 file changed, 193 insertions(+), 60 deletions(-) diff --git a/platforms/kiro/core.py b/platforms/kiro/core.py index 7b361fc..44ca8a8 100644 --- a/platforms/kiro/core.py +++ b/platforms/kiro/core.py @@ -11,8 +11,9 @@ import re import hashlib import threading import base64 +import os from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from typing import Tuple, Union +from typing import Tuple, Union, Optional from urllib.parse import urlencode, urlparse, parse_qs from urllib.request import Request, build_opener @@ -41,6 +42,37 @@ KIRO_IDC_SCOPES = [ ] logger = logging.getLogger("kiro.playwright") +_UA_TEMPLATES = [ + { + "name": "win_chrome", + "template": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{ver} Safari/537.36", + }, + { + "name": "mac_chrome", + "template": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_{minor}_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{ver} Safari/537.36", + }, + { + "name": "linux_chrome", + "template": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{ver} Safari/537.36", + }, +] + +_LOCALE_TIMEZONE_POOLS = [ + ("en-US", ["America/New_York", "America/Chicago", "America/Denver", "America/Los_Angeles"]), + ("en-GB", ["Europe/London"]), + ("en-CA", ["America/Toronto", "America/Vancouver"]), + ("en-AU", ["Australia/Sydney", "Australia/Melbourne"]), +] + +_VIEWPORT_PRESETS = [ + (1366, 768), + (1440, 900), + (1536, 864), + (1600, 900), + (1680, 1050), + (1920, 1080), +] + class _DesktopAuthCallbackServer: def __init__(self, expected_state: str): @@ -136,6 +168,45 @@ class KiroRegister: def log(self, msg): self.log_fn(f"[{self.tag}] {msg}") + def _human_sleep(self, min_seconds: float = 0.18, max_seconds: float = 0.65): + time.sleep(random.uniform(min_seconds, max_seconds)) + + def _randomize_name(self, base_name: str) -> str: + base = (base_name or "Kiro User").strip() + suffix = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=3)) + str(random.randint(10, 99)) + return f"{base} {suffix}" + + def _random_chrome_version(self) -> str: + major = random.randint(130, 136) + build = random.randint(6400, 7399) + patch = random.randint(40, 220) + return f"{major}.0.{build}.{patch}" + + def _build_random_profile(self) -> dict: + ua_tmpl = random.choice(_UA_TEMPLATES) + chrome_ver = self._random_chrome_version() + + locale, tz_pool = random.choice(_LOCALE_TIMEZONE_POOLS) + timezone_id = random.choice(tz_pool) + + base_w, base_h = random.choice(_VIEWPORT_PRESETS) + width = max(1100, base_w + random.randint(-72, 72)) + height = max(700, base_h + random.randint(-54, 54)) + + if ua_tmpl["name"] == "mac_chrome": + os_minor = random.choice([14, 15, 16]) + user_agent = ua_tmpl["template"].format(ver=chrome_ver, minor=os_minor) + else: + user_agent = ua_tmpl["template"].format(ver=chrome_ver) + + return { + "name": f"{ua_tmpl['name']}_{chrome_ver}", + "user_agent": user_agent, + "locale": locale, + "timezone_id": timezone_id, + "viewport": {"width": width, "height": height}, + } + def _init_browser(self): self.pw = sync_playwright().start() launch_opts = { @@ -149,12 +220,27 @@ class KiroRegister: launch_opts["proxy"] = {"server": self.proxy} self.browser = self.pw.chromium.launch(**launch_opts) - self.context = self.browser.new_context( - user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", - locale="en-US", - timezone_id="America/New_York", - viewport={"width": 1280, "height": 800} + profile = self._build_random_profile() + + env_locale = os.getenv("KIRO_LOCALE", "").strip() + env_timezone = os.getenv("KIRO_TIMEZONE", "").strip() + locale = env_locale or profile["locale"] + timezone_id = env_timezone or profile["timezone_id"] + viewport = dict(profile["viewport"]) + + self.log( + f"浏览器画像: {profile['name']} / {locale} / {timezone_id} / " + f"{viewport['width']}x{viewport['height']}" ) + self.context = self.browser.new_context( + user_agent=profile["user_agent"], + locale=locale, + timezone_id=timezone_id, + viewport=viewport, + color_scheme=random.choice(["light", "dark"]), + reduced_motion=random.choice(["reduce", "no-preference"]), + ) + self.context.set_extra_http_headers({"Accept-Language": f"{locale},en;q=0.9"}) # 拦截 Kiro 登录成功相关的请求/响应,提取 Token self.context.on("request", self._on_request) @@ -274,6 +360,43 @@ class KiroRegister: if self.pw: self.pw.stop() + def _accept_cookie_banner_if_present(self, page: Page): + try: + if page.locator('text=/cookie/i').count() == 0: + return + + selectors = [ + 'button[data-id*="awsccc"]:has-text("Accept")', + 'button[id*="awsccc-accept"]', + 'button:has-text("Accept")', + ] + for sel in selectors: + btn = page.locator(sel).first + if btn.count() > 0 and btn.is_visible(): + btn.click(timeout=2000) + self.log("已处理 Cookie 横幅(Accept)") + self._human_sleep(0.2, 0.45) + return + except Exception: + pass + + def _get_aws_alert_text(self, page: Page) -> str: + selectors = [ + ".awsui-alert-content", + '[data-testid*="alert"]', + '[role="alert"]', + ] + for sel in selectors: + try: + alert = page.locator(sel).first + if alert.count() > 0 and alert.is_visible(): + text = (alert.text_content() or "").strip() + if text: + return text + except Exception: + continue + return "" + def _type_like_human(self, page: Page, selector_or_locator: Union[str, Locator], text: str): if isinstance(selector_or_locator, str): el = page.locator(selector_or_locator).first @@ -281,10 +404,12 @@ class KiroRegister: # It's already a Locator, use its first match el = selector_or_locator.first - el.click() - for char in text: - page.keyboard.type(char, delay=random.randint(50, 150)) - time.sleep(random.uniform(0.1, 0.4)) + el.click(delay=random.randint(45, 160)) + for idx, char in enumerate(text): + page.keyboard.type(char, delay=random.randint(45, 210)) + if idx > 0 and random.random() < 0.12: + self._human_sleep(0.05, 0.2) + self._human_sleep(0.2, 0.55) def _solve_captcha_if_exists(self, page: Page): try: @@ -297,7 +422,7 @@ class KiroRegister: def _click_primary_button(self, page: Page): # 给予 React 状态同步时间,防止打字太快点击导致验证失效 - time.sleep(0.5) + self._human_sleep(0.45, 1.05) try: # 依优先级测试页面上可能的提要按钮 @@ -307,18 +432,21 @@ class KiroRegister: 'button[data-testid="test-primary-button"]', 'button[type="submit"]:has-text("Continue")', 'button[type="submit"]:has-text("Verify")', - 'button[type="submit"]:has-text("Create")' + 'button[type="submit"]:has-text("Create")', + 'button[type="submit"]:has-text("Next")', ] for sel in selectors: btn = page.locator(sel).first - if btn.is_visible(): + if btn.count() > 0 and btn.is_visible(): btn.click(timeout=2000) + self._human_sleep(0.22, 0.65) return # 最后的退路:查找未带有 awsccc(Cookie Consent)的可见 Submit 按钮 fallback = page.locator('button[type="submit"]:not([data-id*="awsccc"]):visible').first - if fallback.is_visible(): + if fallback.count() > 0 and fallback.is_visible(): fallback.click(timeout=2000) + self._human_sleep(0.22, 0.65) except Exception: pass @@ -353,10 +481,44 @@ class KiroRegister: if error_text: return False, error_text - time.sleep(0.5) + self._human_sleep(0.2, 0.6) return False, "提交验证码后未进入密码设置页" + def _wait_for_otp_step(self, page: Page, timeout_ms: int = 18000) -> Tuple[bool, str, Optional[Locator]]: + deadline = time.time() + (timeout_ms / 1000) + otp_candidates = [ + page.get_by_label("Verification code", exact=True), + page.locator('input[placeholder*="6-digit" i]'), + page.locator('div[data-testid*="code-input"] input'), + page.locator('input[name="code"], input[id*="code"]'), + ] + error_patterns = [ + re.compile(r"error processing your request", re.I), + re.compile(r"couldn't complete your request", re.I), + re.compile(r"verify your email", re.I), + re.compile(r"invalid verification code", re.I), + ] + + while time.time() < deadline: + for locator in otp_candidates: + try: + field = locator.first + if field.count() > 0 and field.is_visible(): + return True, "", field + except Exception: + continue + + error_text = self._get_first_visible_text(page, error_patterns) + if not error_text: + error_text = self._get_aws_alert_text(page) + if error_text: + return False, error_text, None + + self._human_sleep(0.2, 0.6) + + return False, "等待 OTP 输入框超时", None + def _fill_password_fields(self, page: Page, password: str): password_field = page.get_by_label("Password", exact=True) confirm_field = page.get_by_label("Confirm password", exact=True) @@ -506,7 +668,7 @@ class KiroRegister: pass self._type_like_human(page, email_input, email) self._click_primary_button(page) - time.sleep(1) + self._human_sleep(0.7, 1.4) except Exception: pass @@ -521,7 +683,7 @@ class KiroRegister: pass password_input.fill(pwd) self._click_primary_button(page) - time.sleep(1) + self._human_sleep(0.7, 1.4) except Exception: pass @@ -531,7 +693,7 @@ class KiroRegister: if button.count() > 0 and button.is_visible(): self.log(f"桌面授权页点击 {label} ...") button.click(timeout=2000) - time.sleep(1) + self._human_sleep(0.6, 1.2) break except Exception: continue @@ -587,12 +749,12 @@ class KiroRegister: otp_input.fill(str(otp_code)) self._click_primary_button(auth_page) desktop_otp_used = True - time.sleep(1) + self._human_sleep(0.7, 1.5) continue except Exception as otp_error: raise RuntimeError(f"桌面授权验证码处理失败: {otp_error}") from otp_error self._handle_desktop_auth_page(auth_page, email=email, pwd=pwd) - time.sleep(1) + self._human_sleep(0.6, 1.3) callback = callback_server.wait(timeout=5) desktop_token = self._exchange_desktop_token( @@ -646,6 +808,7 @@ class KiroRegister: mail_token: str = None, otp_timeout: int = 120, otp_callback=None) -> Tuple[bool, dict]: if not pwd: pwd = f"Aa!1{uuid.uuid4().hex[:8]}" + name = self._randomize_name(name) self.log(f"开始 Playwright 流程, Email: {email}") page = None @@ -658,7 +821,7 @@ class KiroRegister: self.log("加载 Kiro Login ...") page.goto(KIRO_SIGNIN_URL, wait_until="domcontentloaded") - time.sleep(3) + self._human_sleep(1.9, 3.4) # Debug: dump all buttons to log try: @@ -676,6 +839,7 @@ class KiroRegister: self.log("等待跳转到 AWS SSO ...") page.wait_for_url(re.compile(r"signin\.aws"), timeout=30000) + self._accept_cookie_banner_if_present(page) self._solve_captcha_if_exists(page) # 1. 填写 Email @@ -695,7 +859,7 @@ class KiroRegister: email_input.wait_for(state="visible", timeout=15000) self._type_like_human(page, 'input[placeholder="username@example.com"], input[type="email"]', email) self._click_primary_button(page) - time.sleep(2) + self._human_sleep(1.1, 2.4) self._solve_captcha_if_exists(page) # 2. 填写名字 @@ -710,24 +874,15 @@ class KiroRegister: name_input.first.wait_for(state="visible", timeout=10000) self._type_like_human(page, name_input, name) self._click_primary_button(page) - time.sleep(2) + self._human_sleep(1.1, 2.4) except Exception as e: self.log(f"未出现名字输入框或等待超时,尝试跳过: {e}") # 3. 收集邮箱验证码 self.log("3. 等待触发 OTP...") - otp_sel = 'input[placeholder*="6-digit" i], div[data-testid*="code-input"] input, input[name="code"], input[id*="code"], input[type="text"]:not([disabled])' - - otp_input = None - try: - otp_input = page.get_by_label("Verification code", exact=True) - if otp_input.count() == 0: - otp_input = page.locator(otp_sel).first - - otp_input.wait_for(state="visible", timeout=15000) - except Exception as e: - self.log(f"未能找到 OTP 输入框,页面可能不同: {e}") - pass + otp_ready, otp_wait_error, otp_input = self._wait_for_otp_step(page, timeout_ms=18000) + if not otp_ready: + return False, {"error": f"姓名提交后 AWS 返回错误: {otp_wait_error}"} otp_code = None if otp_callback: @@ -739,7 +894,7 @@ class KiroRegister: self.log(f"获取到验证码: {otp_code},正在填入...") self._type_like_human(page, otp_input, otp_code) self._click_primary_button(page) - time.sleep(2) + self._human_sleep(1.0, 2.2) # 4. 设定与确认密码 self.log("4. 设定与确认密码...") @@ -750,7 +905,7 @@ class KiroRegister: self._fill_password_fields(page, pwd) self._click_primary_button(page) - time.sleep(3) + self._human_sleep(1.3, 2.8) self._solve_captcha_if_exists(page) password_error = self._get_first_visible_text(page, [ @@ -772,7 +927,7 @@ class KiroRegister: try: # 至少等它请求完 CompleteLogin page.wait_for_url(re.compile(r"kiro\.dev"), timeout=30000) - time.sleep(5) + self._human_sleep(3.0, 5.8) except TimeoutError: pass @@ -846,28 +1001,6 @@ class KiroRegister: except Exception as desktop_error: self.log(f"⚠️ 桌面端 Builder ID Token 补抓失败: {desktop_error}") - try: - success_debug = { - "url": page.url, - "tokens": self._captured_tokens, - "cookies": [ - { - "name": c.get("name", ""), - "value": c.get("value", ""), - "domain": c.get("domain", ""), - "path": c.get("path", ""), - } - for c in self.context.cookies() - if "kiro.dev" in c.get("domain", "") or "aws" in c.get("domain", "") - ], - "network_debug": self._network_debug[-20:], - } - with open("kiro_success_debug.json", "w", encoding="utf-8") as f: - json.dump(success_debug, f, ensure_ascii=False, indent=2) - self.log("成功调试信息已保存为 kiro_success_debug.json") - except Exception: - pass - return True, { "email": email, "password": pwd,