mirror of
https://github.com/zc-zhangchen/any-auto-register.git
synced 2026-05-08 08:14:04 +08:00
45 lines
1.6 KiB
Python
45 lines
1.6 KiB
Python
"""纯协议执行器 - 基于 curl_cffi"""
|
|
from curl_cffi import requests as curl_requests
|
|
from ..base_executor import BaseExecutor, Response
|
|
|
|
|
|
class ProtocolExecutor(BaseExecutor):
    """Executor that issues HTTP requests through a ``curl_cffi`` session.

    A single ``curl_requests.Session`` is created per instance and reused for
    every call, so cookies and default headers persist between requests.
    """

    # User-Agent header installed on the session at construction time.
    # NOTE(review): this value stays at Chrome 124 even when a different
    # ``impersonate`` target is passed to ``__init__`` -- confirm that the
    # possible mismatch is intended.
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    )

    def __init__(self, proxy: str = None, impersonate: str = "chrome124"):
        """Create the underlying session.

        Args:
            proxy: Proxy URL applied to both ``http`` and ``https`` traffic.
                A falsy value (``None`` or ``""``) leaves the session's proxy
                settings untouched.
            impersonate: Value assigned to the session's ``impersonate``
                attribute.
        """
        super().__init__(proxy)
        self.s = curl_requests.Session()
        self.s.impersonate = impersonate
        if proxy:
            self.s.proxies = {"http": proxy, "https": proxy}
        self.s.headers.update({"user-agent": self._USER_AGENT})

    def _wrap(self, r) -> Response:
        """Convert a session response object into a ``Response``.

        The ``cookies`` field is a snapshot of the whole session cookie jar
        (see ``get_cookies``), not only the cookies set by ``r``.
        """
        return Response(
            status_code=r.status_code,
            text=r.text,
            headers=dict(r.headers),
            # Reuse the public accessor so both code paths build the cookie
            # mapping the same way.
            cookies=self.get_cookies(),
        )

    def get(self, url, *, headers=None, params=None) -> Response:
        """Send a GET request via the session and return the wrapped result."""
        r = self.s.get(url, headers=headers, params=params)
        return self._wrap(r)

    def post(self, url, *, headers=None, params=None, data=None, json=None) -> Response:
        """Send a POST request via the session and return the wrapped result."""
        r = self.s.post(url, headers=headers, params=params, data=data, json=json)
        return self._wrap(r)

    def get_cookies(self) -> dict:
        """Return the session cookie jar as a ``{name: value}`` dict.

        Cookies that share a name collapse to a single entry; the one
        iterated last from the jar wins.
        """
        return {c.name: c.value for c in self.s.cookies.jar}

    def set_cookies(self, cookies: dict) -> None:
        """Store every ``name -> value`` pair in the session cookie jar.

        ``None`` or an empty mapping is accepted and does nothing.
        """
        if not cookies:
            return
        for k, v in cookies.items():
            self.s.cookies.set(k, v)

    def close(self) -> None:
        """Close the underlying session."""
        self.s.close()
|