更新 weather.py

This commit is contained in:
星光-k
2025-02-08 01:17:31 +00:00
committed by GitHub
parent b840257988
commit e8b3668ed7

View File

@@ -7,83 +7,92 @@ import logging
import requests import requests
from threading import Timer from threading import Timer
white = 255 # 颜色 logging.basicConfig(
black = 0 level=logging.INFO,
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') # 设置日志级别 format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def get_area_id(city_name): def get_area_id(city_name):
"""从city.js中检索AREAID无限重试直到成功""" """从city.js中检索AREAID无限重试直到成功"""
url = "https://j.i8tq.com/weather2020/search/city.js" url = "https://j.i8tq.com/weather2020/search/city.js"
while True: while True:
try: try:
response = requests.get(url) resp = requests.get(url, timeout=10)
response.raise_for_status() # 检查请求是否成功 resp.encoding = 'utf-8' # 显式设置编码
city_data_text = response.text.strip().split('var city_data = ')[-1].rstrip(';') city_data = json.loads(resp.text.split('=', 1)[1].rstrip(';'))
if city_data_text: for province in city_data.values():
city_data = json.loads(city_data_text) for city in province.values():
for province, cities in city_data.items(): for district, info in city.items():
for city, districts in cities.items(): if info['NAMECN'] == city_name:
for district, info in districts.items(): return info['AREAID']
if info['NAMECN'] == city_name: logging.error("未找到城市: %s", city_name)
return info['AREAID'] except Exception as e:
logging.error("城市名称 '%s' 在城市数据中未找到", city_name) logging.error("获取城市ID失败: %s", str(e))
else: time.sleep(180)
logging.error("从city.js接收到的数据为空")
except (requests.RequestException, json.JSONDecodeError) as e:
logging.error("检索或解析城市数据时发生错误: %s", e)
time.sleep(180) # 重试前等待
def get_current_city(): def get_current_city():
"""获取当前城市名称,无限重试直到成功""" """获取当前定位城市并去除''后缀"""
url = "http://ip-api.com/json/?lang=zh-CN" url = "http://ip-api.com/json/?lang=zh-CN"
while True: while True:
try: try:
response = requests.get(url) resp = requests.get(url, timeout=10)
response.raise_for_status() # 检查请求是否成功 resp.encoding = 'utf-8' # 显式设置编码
data = response.json() data = resp.json()
if data['status'] == 'success': if data['status'] == 'success':
return data['city'] return data['city'].rstrip('')
else: logging.error("定位失败: %s", data.get('message'))
logging.error("获取当前城市失败: %s", data['message']) except Exception as e:
except (requests.RequestException, json.JSONDecodeError) as e: logging.error("定位异常: %s", str(e))
logging.error("检索或解析当前城市时出现错误: %s", e) time.sleep(180)
time.sleep(180) # 重试前等待
def schedule_getWeath(): def schedule_getWeath():
"""调度 getWeath 函数每3分钟执行一次""" """定时任务调度"""
getWeath() try:
Timer(180, schedule_getWeath).start() # 重新设置定时器 getWeath()
finally:
Timer(180, schedule_getWeath).start()
def getWeath(city='101060101'): def getWeath(default_city='101060101'):
"""获取天气数据核心函数"""
city_name = get_current_city()
area_id = default_city # 设置默认值
if city_name:
try:
area_id = get_area_id(city_name) or default_city
except Exception as e:
logging.error("获取区域ID失败: %s", str(e))
headers = { headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Referer': 'https://www.weather.com.cn/' 'Referer': 'https://www.weather.com.cn/'
} }
current_city = get_current_city()
if current_city:
area_id = get_area_id(current_city)
if area_id:
city = area_id
try: try:
response = requests.get(f'https://d1.weather.com.cn/sk_2d/{city}.html', headers=headers) resp = requests.get(
response.raise_for_status() # 检查请求是否成功 f'https://d1.weather.com.cn/sk_2d/{area_id}.html',
response.encoding = 'utf-8' headers=headers,
weather_data = response.text[11:] timeout=15
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w') as fileHandle: )
fileHandle.write(weather_data) resp.encoding = 'utf-8' # 关键修改:强制设置响应编码
print("天气文件更新") resp.raise_for_status()
except requests.RequestException as e:
logging.error("获取天气数据时出现网络错误: %s", e) # 直接处理原始字节数据
weather_data = resp.content[11:].decode('utf-8')
# 使用utf-8编码写入文件
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w', encoding='utf-8') as f:
f.write(weather_data)
logging.info("天气数据更新成功")
except Exception as e: except Exception as e:
logging.error("发生了意外错误: %s", e) logging.error("天气更新失败: %s", str(e))
try: if __name__ == "__main__":
schedule_getWeath() # 开始调度天气获取函数 try:
except KeyboardInterrupt: schedule_getWeath()
logging.info("检测到键盘中断,正在退出") while True:
except Exception as e: time.sleep(1) # 保持主线程存活
logging.error("发生了意外错误: %s", e) except KeyboardInterrupt:
exit() logging.info("程序已终止")
# 脚本正常结束后的清理操作
exit()