mirror of
https://github.com/luguoyixiazi/test_nine.git
synced 2025-12-06 14:52:49 +08:00
Fixes #5 添加dinov3、任务头及yolo11n用于通过二者的验证码 同时更新fullpage.9.2.0-guwyxh.js中新的常量 ``` json { captcha_token":"2064329542", "tsfq":"xovrayel" } ```
325 lines
12 KiB
Python
325 lines
12 KiB
Python
import os
|
||
import json
|
||
import time
|
||
import httpx
|
||
import shutil
|
||
import random
|
||
import uvicorn
|
||
import logging
|
||
import logging.handlers
|
||
LOG_DIR = "logs"
|
||
os.makedirs(LOG_DIR, exist_ok=True)
|
||
LOG_FILENAME = os.path.join(LOG_DIR, "app.log")
|
||
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||
# --- 日志配置字典 ---
|
||
LOGGING_CONFIG = {
|
||
"version": 1,
|
||
"disable_existing_loggers": False,
|
||
"formatters": {
|
||
# 定义一个名为 "default" 的格式化器
|
||
"default": {
|
||
"()": "logging.Formatter", # 使用标准的 Formatter
|
||
"fmt": "[%(levelname)s - %(filename)s:%(lineno)d | %(funcName)s] - %(asctime)s - %(message)s",
|
||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||
},
|
||
},
|
||
"handlers": {
|
||
# 控制台输出的 Handler
|
||
"console": {
|
||
"class": "logging.StreamHandler",
|
||
"formatter": "default", # 使用上面定义的 default 格式
|
||
"stream": "ext://sys.stderr",
|
||
},
|
||
# 文件输出和轮转的 Handler
|
||
"file_rotating": {
|
||
"class": "logging.handlers.TimedRotatingFileHandler",
|
||
"formatter": "default", # 也使用上面定义的 default 格式
|
||
"filename": LOG_FILENAME, # 日志文件路径
|
||
"when": "D", # 按天轮转 ('D' for Day)
|
||
"interval": 1, # 每天轮转一次
|
||
"backupCount": 2, # 保留2个旧的日志文件 (加上当前文件,总共覆盖3天)
|
||
"encoding": "utf-8",
|
||
},
|
||
},
|
||
"loggers": {
|
||
# 根日志记录器
|
||
"": {
|
||
# 同时将日志发送到 console 和 file_rotating 两个 Handler
|
||
"handlers": ["console", "file_rotating"],
|
||
"level": log_level,
|
||
},
|
||
# 针对 uvicorn 的日志记录器进行配置,确保它们也使用我们的设置
|
||
"uvicorn": {
|
||
"handlers": ["console", "file_rotating"],
|
||
"level": "WARNING",
|
||
"propagate": False, # 阻止 uvicorn 日志向上传播到根 logger,避免重复记录
|
||
},
|
||
"uvicorn.error": {
|
||
"level": "WARNING",
|
||
"propagate": True, # uvicorn.error 应该传播,以便根记录器可以捕获它
|
||
},
|
||
"uvicorn.access": {
|
||
"handlers": ["console", "file_rotating"],
|
||
"level": log_level,
|
||
"propagate": False,
|
||
},
|
||
},
|
||
}
|
||
logging.config.dictConfig(LOGGING_CONFIG)
|
||
logger = logging.getLogger(__name__)
|
||
from crack import Crack
|
||
from typing import Optional, Dict, Any
|
||
from contextlib import asynccontextmanager
|
||
from fastapi.responses import JSONResponse
|
||
from fastapi import FastAPI,Query, HTTPException
|
||
from predict import (predict_onnx,
|
||
predict_onnx_pdl,
|
||
predict_onnx_dfine,
|
||
predict_dino_classify_pipeline,
|
||
load_by,
|
||
unload,
|
||
get_models,
|
||
get_available_models)
|
||
from crop_image import crop_image_v3,save_path,save_fail_path,save_pass_path,validate_path
|
||
|
||
PORT = 9645
|
||
|
||
def get_available_hosts() -> set[str]:
|
||
"""获取本机所有可用的IPv4地址。"""
|
||
import socket
|
||
hosts = {"127.0.0.1"}
|
||
try:
|
||
hostname = socket.gethostname()
|
||
addr_info = socket.getaddrinfo(hostname, None, socket.AF_INET)
|
||
hosts.update({info[4][0] for info in addr_info})
|
||
except socket.gaierror:
|
||
try:
|
||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
||
s.connect(("8.8.8.8", 80))
|
||
hosts.add(s.getsockname()[0])
|
||
except OSError:
|
||
pass
|
||
return hosts
|
||
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
logger.info("="*50)
|
||
logger.info("启动服务中...")
|
||
# 从 uvicorn 配置中获取 host 和 port
|
||
server = app.servers[0] if app.servers else None
|
||
host = server.config.host if server else "0.0.0.0"
|
||
port = server.config.port if server else PORT
|
||
if host == "0.0.0.0":
|
||
available_hosts = get_available_hosts()
|
||
logger.info(f"服务地址(依需求选用,docker中使用宿主机host:{port},若使用Uvicorn运行则基于命令):")
|
||
for h in sorted(list(available_hosts)):
|
||
logger.info(f" - http://{h}:{port}")
|
||
else:
|
||
logger.info(f"服务地址: http://{host}:{port}")
|
||
logger.info(f"可用服务路径如下:")
|
||
for route in app.routes:
|
||
logger.info(f" -{route.methods} {route.path}")
|
||
logger.info(f"具体api使用可以查看/docs")
|
||
logger.info("="*50)
|
||
|
||
yield
|
||
logger.info("="*50)
|
||
logger.info("服务关闭")
|
||
logger.info("="*50)
|
||
|
||
app = FastAPI(title="极验V3图标点选+九宫格", lifespan=lifespan)
|
||
|
||
def prepare(gt: str, challenge: str) -> tuple[Crack, bytes, str, str]:
|
||
"""获取信息。"""
|
||
logger.info(f"开始获取:\ngt:{gt}\nchallenge:{challenge}")
|
||
crack = Crack(gt, challenge)
|
||
logger.debug(f"初次获得{crack.gettype()}")
|
||
crack.get_c_s()
|
||
time.sleep(random.uniform(0.4,0.6))
|
||
crack.ajax()
|
||
pic_content,pic_name,pic_type = crack.get_pic()
|
||
return crack,pic_content,pic_name,pic_type
|
||
|
||
def do_pass_nine(pic_content: bytes, use_v3_model: bool, point: Optional[str]) -> list[str]:
|
||
"""处理九宫格验证码,返回坐标点列表。"""
|
||
crop_image_v3(pic_content)
|
||
if use_v3_model:
|
||
result_list = predict_onnx_pdl(validate_path)
|
||
else:
|
||
with open(f"{validate_path}/cropped_9.jpg", "rb") as rb:
|
||
icon_image = rb.read()
|
||
with open(f"{validate_path}/nine.jpg", "rb") as rb:
|
||
bg_image = rb.read()
|
||
result_list = predict_onnx(icon_image, bg_image, point)
|
||
return [f"{col}_{row}" for row, col in result_list]
|
||
|
||
def do_pass_icon(pic:Any, draw_result: bool) -> list[str]:
|
||
"""处理图标点选验证码,返回坐标点列表。"""
|
||
result_list = predict_onnx_dfine(pic,draw_result)
|
||
logger.debug(result_list)
|
||
return [f"{round(x / 333 * 10000)}_{round(y / 333 * 10000)}" for x, y in result_list]
|
||
|
||
def do_pass_icon0(pic:Any, draw_result: bool) -> list[str]:
|
||
"""处理图标点选验证码,返回坐标点列表。"""
|
||
result_list = predict_dino_classify_pipeline(pic,draw_result)
|
||
return [f"{round(x / 333 * 10000)}_{round(y / 333 * 10000)}" for x, y in result_list]
|
||
|
||
def save_image_for_train(pic_name,pic_type,passed):
|
||
shutil.move(os.path.join(validate_path,pic_name),os.path.join(save_path,pic_name))
|
||
if passed:
|
||
path_2_save = os.path.join(save_pass_path,pic_name.split('.')[0])
|
||
else:
|
||
path_2_save = os.path.join(save_fail_path,pic_name.split('.')[0])
|
||
os.makedirs(path_2_save,exist_ok=True)
|
||
for pic in os.listdir(validate_path):
|
||
if pic_type == "nine" and pic.startswith('cropped'):
|
||
shutil.move(os.path.join(validate_path,pic),os.path.join(path_2_save,pic))
|
||
if pic_type == "icon" and pic.startswith('icon'):
|
||
shutil.move(os.path.join(validate_path,pic),os.path.join(path_2_save,pic))
|
||
|
||
|
||
def handle_pass_request(gt: str, challenge: str, save_result: bool, **kwargs) -> JSONResponse:
|
||
"""
|
||
统一处理所有验证码请求的核心函数。
|
||
"""
|
||
start_time = time.monotonic()
|
||
try:
|
||
# 1. 准备
|
||
crack, pic_content, pic_name, pic_type = prepare(gt, challenge)
|
||
|
||
# 2. 识别
|
||
logger.debug(f"接收图片类型{pic_type}")
|
||
if pic_type == "nine":
|
||
point_list = do_pass_nine(
|
||
pic_content,
|
||
use_v3_model=kwargs.get("use_v3_model", True),
|
||
point=kwargs.get("point",None)
|
||
)
|
||
elif pic_type == 'icon': # dino
|
||
point_list = do_pass_icon0(pic_content, save_result)
|
||
elif pic_type == "icon1": # d-fine
|
||
point_list = do_pass_icon(pic_content, save_result)
|
||
else:
|
||
raise HTTPException(status_code=400, detail=f"Unknown picture type: {pic_type}")
|
||
|
||
# 3. 验证
|
||
elapsed = time.monotonic() - start_time
|
||
wait_time = max(0, 4.0 - elapsed)
|
||
time.sleep(wait_time)
|
||
|
||
response_str = crack.verify(point_list)
|
||
result = json.loads(response_str)
|
||
|
||
# 4. 后处理
|
||
passed = 'validate' in result.get('data', {})
|
||
if save_result:
|
||
save_image_for_train(pic_name, pic_type, passed)
|
||
else:
|
||
os.remove(os.path.join(validate_path,pic_name))
|
||
|
||
total_time = time.monotonic() - start_time
|
||
logger.info(
|
||
f"请求完成,耗时: {total_time:.2f}s (等待 {wait_time:.2f}s). "
|
||
f"结果: {result}"
|
||
)
|
||
return JSONResponse(content=result)
|
||
|
||
except Exception as e:
|
||
logger.error(f"服务错误: {e}", exc_info=True)
|
||
return JSONResponse(
|
||
status_code=500,
|
||
content={"error": "An internal server error occurred.", "detail": str(e)}
|
||
)
|
||
|
||
|
||
|
||
@app.get("/pass_nine")
|
||
def pass_nine(gt: str = Query(...),
|
||
challenge: str = Query(...),
|
||
point: str = Query(default=None),
|
||
use_v3_model = Query(default=True),
|
||
save_result = Query(default=False)
|
||
):
|
||
return handle_pass_request(
|
||
gt, challenge, save_result,
|
||
use_v3_model=use_v3_model, point=point
|
||
)
|
||
|
||
@app.get("/pass_icon")
|
||
def pass_icon(gt: str = Query(...),
|
||
challenge: str = Query(...),
|
||
save_result = Query(default=False)
|
||
):
|
||
return handle_pass_request(gt, challenge, save_result)
|
||
|
||
@app.get("/pass_uni")
|
||
def pass_uni(gt: str = Query(...),
|
||
challenge: str = Query(...),
|
||
save_result = Query(default=False)
|
||
):
|
||
return handle_pass_request(gt, challenge, save_result)
|
||
|
||
@app.get("/pass_hutao")
|
||
def pass_hutao(gt: str = Query(...),
|
||
challenge: str = Query(...),
|
||
save_result = Query(default=False)):
|
||
try:
|
||
# 调用原函数获取返回值
|
||
response = handle_pass_request(gt, challenge, save_result)
|
||
# 获取原始状态码和内容
|
||
original_status_code = response.status_code
|
||
original_content = json.loads(response.body.decode("utf-8"))
|
||
if original_status_code == 200 and original_content.get("status",False)=="success" and "validate" in original_content.get("data",{}):
|
||
rebuild_content = {"code":0,"data":{"gt":gt,"challenge":challenge,"validate":original_content["data"]["validate"]}}
|
||
else:
|
||
rebuild_content = {"code":1,"data":{"gt":gt,"challenge":challenge,"validate":original_content}}
|
||
return JSONResponse(content=rebuild_content, status_code=original_status_code)
|
||
|
||
except Exception as e:
|
||
logger.error(f"修改路由错误: {e}", exc_info=True)
|
||
return JSONResponse(
|
||
status_code=500,
|
||
content={"error": "An internal server error occurred.", "detail": str(e)}
|
||
)
|
||
|
||
@app.get("/list_model")
|
||
def list_model():
|
||
return JSONResponse(content = get_models())
|
||
@app.get("/list_all_model")
|
||
def list_model():
|
||
return JSONResponse(content = get_available_models())
|
||
@app.get("/load_model")
|
||
def load_model(name: str = Query(...)):
|
||
return JSONResponse(content = get_models())
|
||
@app.get("/unload_model")
|
||
def unload_model(name: str = Query(...)):
|
||
return JSONResponse(content = get_models())
|
||
|
||
@app.get("/set_log_level")
|
||
def set_log_level(level: str = Query(...)):
|
||
"""
|
||
在服务运行时动态修改所有主要 logger 的日志级别。
|
||
例如: /set_log_level?level=DEBUG
|
||
"""
|
||
# 将字符串级别转换为 logging 模块对应的整数值
|
||
level_str = str(level).upper()
|
||
numeric_level = getattr(logging, level_str, None)
|
||
if not isinstance(numeric_level, int):
|
||
raise HTTPException(status_code=400, detail=f"无效的日志级别: {level}")
|
||
|
||
# 获取并修改您配置中所有关键 logger 的级别
|
||
# 1. 修改根 logger
|
||
logging.getLogger().setLevel(numeric_level)
|
||
# 2. 修改 uvicorn logger
|
||
logging.getLogger("uvicorn").setLevel(numeric_level)
|
||
logging.getLogger("uvicorn.error").setLevel(numeric_level)
|
||
logging.getLogger("uvicorn.access").setLevel(numeric_level)
|
||
|
||
# 记录一条高级别的日志,确保能被看到
|
||
logger.warning(f"所有 logger 的日志级别已被动态修改为: {level_str}")
|
||
|
||
return JSONResponse(content = f"Log level successfully set to {level_str}")
|
||
|
||
if __name__ == "__main__":
|
||
uvicorn.run(app,port=PORT,log_config=LOGGING_CONFIG)
|
||
|