Files
any-auto-register/core/executors/protocol.py
2026-03-31 17:18:40 +08:00

51 lines
1.7 KiB
Python

"""纯协议执行器 - 基于 curl_cffi"""
from curl_cffi import requests as curl_requests
from ..base_executor import BaseExecutor, Response
from ..proxy_utils import build_requests_proxy_config
class ProtocolExecutor(BaseExecutor):
    """Executor that sends HTTP requests through a single ``curl_cffi`` session.

    Every request goes through the session created in ``__init__``, so the
    headers, proxy configuration and cookies set on it are shared by all
    calls made on the same instance.
    """

    def __init__(self, proxy: str = None, impersonate: str = "chrome124"):
        """Create the underlying session and apply its defaults.

        Args:
            proxy: Optional proxy string. It is forwarded to the base class
                and, when truthy, converted with
                ``build_requests_proxy_config`` and installed on the session.
            impersonate: Value stored on the session's ``impersonate``
                attribute.
        """
        super().__init__(proxy)

        session = curl_requests.Session()
        self.s = session
        session.impersonate = impersonate

        if proxy:
            session.proxies = build_requests_proxy_config(proxy)

        # NOTE(review): this User-Agent is fixed to Chrome 124 on Windows no
        # matter which ``impersonate`` value was passed -- confirm intended.
        user_agent = " ".join(
            (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
                "AppleWebKit/537.36 (KHTML, like Gecko)",
                "Chrome/124.0.0.0 Safari/537.36",
            )
        )
        session.headers.update({"user-agent": user_agent})

    def _wrap(self, r) -> Response:
        """Convert a session response object into a ``Response``.

        The ``cookies`` field is filled from the session's cookie jar, not
        from ``r`` itself, so it contains every cookie currently held by the
        session. Cookies sharing a name collapse to the last one iterated.
        """
        jar_snapshot = {}
        for cookie in self.s.cookies.jar:
            jar_snapshot[cookie.name] = cookie.value

        return Response(
            status_code=r.status_code,
            text=r.text,
            headers=dict(r.headers),
            cookies=jar_snapshot,
        )

    def get(self, url, *, headers=None, params=None) -> Response:
        """Issue a GET request on the session and return the wrapped result."""
        return self._wrap(self.s.get(url, headers=headers, params=params))

    def post(self, url, *, headers=None, params=None, data=None, json=None) -> Response:
        """Issue a POST request on the session and return the wrapped result."""
        raw = self.s.post(
            url,
            headers=headers,
            params=params,
            data=data,
            json=json,
        )
        return self._wrap(raw)

    def get_cookies(self) -> dict:
        """Return the session's cookie jar as a ``{name: value}`` dict."""
        return dict((cookie.name, cookie.value) for cookie in self.s.cookies.jar)

    def set_cookies(self, cookies: dict) -> None:
        """Store each ``name -> value`` pair from ``cookies`` on the session."""
        for name, value in cookies.items():
            self.s.cookies.set(name, value)

    def close(self) -> None:
        """Close the underlying session."""
        self.s.close()