5 Commits

Author SHA1 Message Date
星光-k
9268438fd1 Update main.py 2025-05-09 00:45:42 +08:00
星光-k
8c247e2fc6 Update weather.py 2025-05-09 00:44:58 +08:00
星光-k
dd011c3955 Update weather.py 2025-05-09 00:44:32 +08:00
星光-k
e85188dda8 Update weather.py 2025-03-15 11:01:29 +08:00
星光-k
7c73382851 Update README.md 2025-02-08 20:14:55 +08:00
3 changed files with 192 additions and 84 deletions

View File

@@ -1,6 +1,6 @@
# 墨水屏展示当前时间及天气数据 # 墨水屏展示当前时间及天气数据
本项目复刻自[Seek-Huang](https://github.com/Seek-Huang/2.13-Ink-screen-clock)的代码仓库 本项目复刻自[Seek-Huang](https://github.com/Seek-Huang)的[代码仓库](https://github.com/Seek-Huang/2.13-Ink-screen-clock)
并在此基础上进行改进 并在此基础上进行改进
## 本仓库已添加[一键安装部署脚本](https://github.com/kxgx/2.13-Ink-screen-clock#%E4%BD%BF%E7%94%A8%E8%84%9A%E6%9C%AC%E7%9B%B4%E6%8E%A5%E5%AE%89%E8%A3%85%E6%8E%A8%E8%8D%90) ## 本仓库已添加[一键安装部署脚本](https://github.com/kxgx/2.13-Ink-screen-clock#%E4%BD%BF%E7%94%A8%E8%84%9A%E6%9C%AC%E7%9B%B4%E6%8E%A5%E5%AE%89%E8%A3%85%E6%8E%A8%E8%8D%90)

View File

@@ -159,26 +159,39 @@ def Bottom_edge(): # 在图片中添加底边内容
draw.text((10, 107), f"IP:{local_addr}", font=font05, fill=255) # 显示当前IP地址 draw.text((10, 107), f"IP:{local_addr}", font=font05, fill=255) # 显示当前IP地址
def Weather(): # 在图片中添加天气内容 def Weather():
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'r') as file: try:
weather_data = json.load(file) with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'r') as Weather_json:
global Weather_position, temperature, weather, wind_direction, weather_update, weather_date, humidity Weather_data = Weather_json.read()
Weather_position = weather_data['cityname'] # 定位位置 if not Weather_data.strip(): # 检查文件是否为空
temperature = weather_data['temp'] + u'°C' # 温度 logging.error("天气数据文件为空")
weather = weather_data['weather'] # 天气情况 return
wind_direction = weather_data['WD'] # 风向
weather_update = weather_data['time'] # 天气更新时间 Weather_text = json.loads(Weather_data)
weather_date = weather_data['date'] # 日期 global Weather_position, temperature, weather, wind_direction, weather_update, weather_date, humidity
humidity = weather_data['SD'] # 湿度
draw.text((150, 25), "天气:", font=font06, fill=0) # 显示当前天气前缀 Weather_position = Weather_text.get('cityname', '未知') # 使用get方法提供默认值
draw.text((150, 45), "温度:", font=font06, fill=0) # 显示当前温度前缀 temperature = f"{Weather_text.get('temp', '--')}°C"
draw.text((150, 65), "湿度:", font=font06, fill=0) # 显示当前湿度前缀 weather = Weather_text.get('weather', '未知')
draw.text((150, 85), "城市:", font=font06, fill=0) # 显示当前城市前缀 wind_direction = Weather_text.get('WD', '未知')
draw.text((191, 25), weather, font=font06, fill=0) weather_update = Weather_text.get('time', '未知')
draw.text((191, 45), temperature, font=font06, fill=0) weather_date = Weather_text.get('date', '未知')
draw.text((191, 65), humidity, font=font06, fill=0) humidity = Weather_text.get('SD', '未知')
draw.text((191, 85), Weather_position, font=font06, fill=0) draw.text((150,25),"天气:",font = font06,fill =0)#显示当前天气前缀
draw.text((211, 107), weather_update, font=font05, fill=255) # 显示天气更新时间 draw.text((150,45),"温度:",font = font06,fill =0)#显示当前温度前缀
draw.text((150,65),"湿度:",font = font06,fill =0)#显示当前湿度前缀
draw.text((150,85),"城市:",font = font06,fill =0)#显示当前城市前缀
draw.text((191,25),weather,font = font06,fill =0)
draw.text((191,45),temperature,font = font06,fill =0)
draw.text((191,65),humidity,font = font06,fill =0)
draw.text((191,85),Weather_position,font = font06,fill =0)
draw.text((211,107),weather_update,font = font05,fill =255) #显示天气更新时间
except FileNotFoundError:
logging.error("天气数据文件未找到")
except json.JSONDecodeError as e:
logging.error(f"天气数据解析失败: {str(e)}")
except Exception as e:
logging.error(f"获取天气信息时发生错误: {str(e)}")
def Basic_refresh(): # 全刷函数 def Basic_refresh(): # 全刷函数
@@ -309,4 +322,4 @@ epd.init()
epd.Clear(0xFF) # 清除屏幕内容 epd.Clear(0xFF) # 清除屏幕内容
epd.sleep() # 使屏幕进入休眠状态 epd.sleep() # 使屏幕进入休眠状态
epd2in13_V4.epdconfig.module_exit() # 清理资源 epd2in13_V4.epdconfig.module_exit() # 清理资源
exit() exit()

View File

@@ -5,6 +5,9 @@ import json
import time import time
import logging import logging
import requests import requests
import random
import socket
from functools import wraps
from threading import Timer from threading import Timer
logging.basicConfig( logging.basicConfig(
@@ -13,13 +16,163 @@ logging.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
def check_network_connection():
"""检查网络连接状态"""
try:
# 尝试连接一个可靠的公共DNS服务器
socket.create_connection(("223.5.5.5", 53), timeout=5)
return True
except OSError:
return False
def get_ip():
"""改进的IP获取函数"""
if not check_network_connection():
logging.warning("网络连接不可用")
return None
services = [
{"url": "https://api.ipify.org?format=json", "field": "ip"},
{"url": "https://ipinfo.io/json", "field": "ip"},
{"url": "https://ifconfig.me/all.json", "field": "ip_addr"}
]
for service in services:
try:
resp = requests.get(service["url"], timeout=10)
data = resp.json()
return data.get(service["field"])
except Exception:
continue
logging.error("所有IP服务尝试失败")
return None
def get_ip():
"""从ip.cn获取当前IP地址"""
url = "https://ip.cn/api/index?ip=&type=0"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
try:
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
data = resp.json()
if data.get('code') == 'Success':
return data.get('ip', '')
logging.error("获取IP失败: %s", data.get('msg', '未知错误'))
except Exception as e:
logging.error("获取IP异常: %s", str(e))
return None
def get_current_city():
"""通过IP地址获取当前定位城市并去除''后缀"""
ip = get_ip()
if not ip:
return None
url = f"http://ip-api.com/json/{ip}?fields=city&lang=zh-CN"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
while True:
try:
resp = requests.get(url, headers=headers, timeout=10)
data = resp.json()
if data.get('status') == 'success':
return data.get('city', '').replace('', '')
logging.error("定位失败: %s", data.get('message', '未知错误'))
except Exception as e:
logging.error("定位异常: %s", str(e))
time.sleep(180)
def schedule_getWeath():
"""改进的定时任务调度"""
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
getWeath()
break
except Exception as e:
retry_count += 1
wait_time = min(300, retry_count * 60) # 指数退避
logging.error(f"天气更新失败({retry_count}/{max_retries}): {str(e)}")
time.sleep(wait_time)
# 无论成功与否,都安排下一次执行
Timer(1800, schedule_getWeath).start() # 30分钟间隔
def getWeath(default_city='101060101'):
"""改进的天气获取函数"""
# 1. 检查网络连接
if not check_network_connection():
logging.error("网络不可用,跳过天气更新")
return
# 2. 获取城市信息
city_name = None
try:
city_name = get_current_city()
except Exception as e:
logging.error(f"获取城市失败: {str(e)}")
# 3. 确定区域ID
area_id = default_city
if city_name:
try:
found_id = get_area_id(city_name)
if found_id:
area_id = found_id
logging.info(f"使用城市区域ID: {area_id} ({city_name})")
except Exception as e:
logging.error(f"获取区域ID失败: {str(e)}")
# 4. 获取天气数据
weather_apis = [
f'https://d1.weather.com.cn/sk_2d/{area_id}.html',
f'https://www.weather.com.cn/weather1d/{area_id}.shtml'
]
for api_url in weather_apis:
try:
resp = requests.get(
api_url,
headers={'User-Agent': 'Mozilla/5.0'},
timeout=15
)
resp.raise_for_status()
# 处理不同API的响应格式
if 'sk_2d' in api_url:
weather_data = resp.content[11:].decode('utf-8')
else:
weather_data = parse_html_weather(resp.text)
# 验证数据有效性
if validate_weather_data(weather_data):
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w') as f:
json.dump(weather_data, f)
logging.info("天气数据更新成功")
return
except Exception as e:
logging.warning(f"天气API {api_url} 失败: {str(e)}")
continue
logging.error("所有天气API尝试失败")
def get_area_id(city_name): def get_area_id(city_name):
"""从city.js中检索AREAID无限重试直到成功""" """从city.js中检索AREAID无限重试直到成功"""
url = "https://j.i8tq.com/weather2020/search/city.js" url = "https://j.i8tq.com/weather2020/search/city.js"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
while True: while True:
try: try:
resp = requests.get(url, timeout=10) resp = requests.get(url, headers=headers, timeout=10)
resp.encoding = 'utf-8' # 显式设置编码 resp.encoding = 'utf-8'
city_data = json.loads(resp.text.split('=', 1)[1].rstrip(';')) city_data = json.loads(resp.text.split('=', 1)[1].rstrip(';'))
for province in city_data.values(): for province in city_data.values():
for city in province.values(): for city in province.values():
@@ -31,68 +184,10 @@ def get_area_id(city_name):
logging.error("获取城市ID失败: %s", str(e)) logging.error("获取城市ID失败: %s", str(e))
time.sleep(180) time.sleep(180)
def get_current_city():
"""获取当前定位城市并去除''后缀"""
url = "http://ip-api.com/json/?lang=zh-CN"
while True:
try:
resp = requests.get(url, timeout=10)
resp.encoding = 'utf-8' # 显式设置编码
data = resp.json()
if data['status'] == 'success':
return data['city'].rstrip('')
logging.error("定位失败: %s", data.get('message'))
except Exception as e:
logging.error("定位异常: %s", str(e))
time.sleep(180)
def schedule_getWeath():
"""定时任务调度"""
try:
getWeath()
finally:
Timer(180, schedule_getWeath).start()
def getWeath(default_city='101060101'):
"""获取天气数据核心函数"""
city_name = get_current_city()
area_id = default_city # 设置默认值
if city_name:
try:
area_id = get_area_id(city_name) or default_city
except Exception as e:
logging.error("获取区域ID失败: %s", str(e))
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Referer': 'https://www.weather.com.cn/'
}
try:
resp = requests.get(
f'https://d1.weather.com.cn/sk_2d/{area_id}.html',
headers=headers,
timeout=15
)
resp.encoding = 'utf-8' # 关键修改:强制设置响应编码
resp.raise_for_status()
# 直接处理原始字节数据
weather_data = resp.content[11:].decode('utf-8')
# 使用utf-8编码写入文件
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w', encoding='utf-8') as f:
f.write(weather_data)
logging.info("天气数据更新成功")
except Exception as e:
logging.error("天气更新失败: %s", str(e))
if __name__ == "__main__": if __name__ == "__main__":
try: try:
schedule_getWeath() schedule_getWeath()
while True: while True:
time.sleep(1) # 保持主线程存活 time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("程序已终止") logging.info("程序已终止")