mirror of
https://github.com/kxgx/2.13-Ink-screen-clock.git
synced 2026-04-03 10:15:06 +08:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8b3668ed7 | ||
|
|
b840257988 | ||
|
|
3541ecc135 | ||
|
|
aa6df92479 | ||
|
|
da65b28227 |
65
bin/clock.py
65
bin/clock.py
@@ -202,43 +202,40 @@ def Partial_refresh(): # 局刷函数
|
|||||||
power_str1 = power_battery()
|
power_str1 = power_battery()
|
||||||
power_str = power_str1
|
power_str = power_str1
|
||||||
Local_strong_brush() # 局部强刷
|
Local_strong_brush() # 局部强刷
|
||||||
|
|
||||||
try:
|
|
||||||
##################屏幕初始化#########################
|
|
||||||
epd = epd2in13_V4.EPD() # 初始化
|
|
||||||
epd.init() # 设定屏幕刷新模式
|
|
||||||
# epd.Clear(0xFF) # 清除屏幕内容
|
|
||||||
##################屏幕初始化#########################
|
|
||||||
logging.info("Width = %s, Height = %s", format(epd.width), format(epd.height)) # 打印屏幕高度及宽度
|
|
||||||
logging.info("Initialize and clear the display") # 屏幕开始准备相关展示
|
|
||||||
info_image = Image.new('1', (epd.height, epd.width), 255) # 画布创建准备
|
|
||||||
draw = ImageDraw.Draw(info_image)
|
|
||||||
Basic_refresh() # 全局刷新
|
|
||||||
Partial_refresh() # 局部刷新
|
|
||||||
epd.init()
|
|
||||||
epd.Clear(0xFF)
|
|
||||||
epd.sleep()
|
|
||||||
except IOError as e:
|
|
||||||
logging.info(e)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
logging.info("Keyboard interrupt detected, exiting gracefully.")
|
|
||||||
epd.init()
|
|
||||||
epd.Clear(0xFF) # 清除屏幕内容
|
|
||||||
epd.sleep() # 使屏幕进入休眠状态
|
|
||||||
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
|
||||||
exit()
|
|
||||||
|
|
||||||
except Exception as e:
|
retry_interval = 180 # 设置重试间隔时间(秒)
|
||||||
logging.error("An unexpected error occurred: %s", e)
|
while True:
|
||||||
epd.init()
|
try:
|
||||||
epd.Clear(0xFF) # 清除屏幕内容
|
##################屏幕初始化#########################
|
||||||
epd.sleep() # 使屏幕进入休眠状态
|
epd = epd2in13_V4.EPD() #初始化
|
||||||
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
epd.init()#设定屏幕刷新模式
|
||||||
exit()
|
#epd.Clear(0xFF) #清除屏幕内容
|
||||||
|
##################屏幕初始化#########################
|
||||||
|
logging.info("Width = %s, Height = %s", format(epd.width), format(epd.height)) #打印屏幕高度及宽度
|
||||||
|
logging.info("初始化并清空显示屏")#屏幕开始准备相关展示
|
||||||
|
info_image = Image.new('1', (epd.height, epd.width), 255) #画布创建准备
|
||||||
|
draw = ImageDraw.Draw(info_image)
|
||||||
|
Basic_refresh() #全局刷新
|
||||||
|
Partial_refresh() #局部刷新
|
||||||
|
epd.init()
|
||||||
|
epd.Clear(0xFF)
|
||||||
|
epd.sleep()
|
||||||
|
time.sleep(300)
|
||||||
|
break # 如果脚本执行成功,则退出循环
|
||||||
|
except (OSError, Exception) as e: # 捕获你提到的异常
|
||||||
|
logging.error("发生了错误: %s", e)
|
||||||
|
time.sleep(retry_interval) # 等待一段时间后重试
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("检测到键盘中断,正在清理并退出")
|
||||||
|
epd.init()
|
||||||
|
epd.Clear(0xFF) # 清除屏幕内容
|
||||||
|
epd.sleep() # 使屏幕进入休眠状态
|
||||||
|
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
||||||
|
exit()
|
||||||
|
|
||||||
# 脚本正常结束后的清理操作
|
# 脚本正常结束后的清理操作
|
||||||
epd.init()
|
epd.init()
|
||||||
epd.Clear(0xFF) # 清除屏幕内容
|
epd.Clear(0xFF) # 清除屏幕内容
|
||||||
epd.sleep() # 使屏幕进入休眠状态
|
epd.sleep() # 使屏幕进入休眠状态
|
||||||
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
||||||
exit()
|
exit()
|
||||||
102
bin/main.py
102
bin/main.py
@@ -55,28 +55,28 @@ def set_system_time_from_hwclock(utc=True):
|
|||||||
hwclock_args = ['sudo', 'hwclock', '--hctosys']
|
hwclock_args = ['sudo', 'hwclock', '--hctosys']
|
||||||
if not utc:
|
if not utc:
|
||||||
hwclock_args.append('--localtime')
|
hwclock_args.append('--localtime')
|
||||||
|
|
||||||
logging.debug(f"Executing hwclock command: {' '.join(hwclock_args)}")
|
logging.debug(f"Executing hwclock command: {' '.join(hwclock_args)}")
|
||||||
|
|
||||||
# 使用 subprocess.run 执行 hwclock --hctosys 并捕获输出
|
# 使用 subprocess.run 执行 hwclock --hctosys 并捕获输出
|
||||||
result = subprocess.run(hwclock_args,
|
result = subprocess.run(hwclock_args,
|
||||||
check=True, # 如果命令失败,则抛出 CalledProcessError 异常
|
check=True, # 如果命令失败,则抛出 CalledProcessError 异常
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
text=True)
|
text=True)
|
||||||
|
|
||||||
# 等待一小段时间以确保时间更新完成
|
# 等待一小段时间以确保时间更新完成
|
||||||
time.sleep(0.1) # 根据需要调整
|
time.sleep(0.1) # 根据需要调整
|
||||||
|
|
||||||
# 记录调用 hwclock 后的时间
|
# 记录调用 hwclock 后的时间
|
||||||
after_time = datetime.datetime.now()
|
after_time = datetime.datetime.now()
|
||||||
logging.debug("System time successfully set from hardware clock.")
|
logging.debug("System time successfully set from hardware clock.")
|
||||||
logging.debug(f"System time after hwclock call: {after_time}")
|
logging.debug(f"System time after hwclock call: {after_time}")
|
||||||
|
|
||||||
# 检查时间是否发生了倒退
|
# 检查时间是否发生了倒退
|
||||||
if after_time < before_time:
|
if after_time < before_time:
|
||||||
logging.warning(f"Time went backwards after hwclock call: before={before_time}, after={after_time}")
|
logging.warning(f"Time went backwards after hwclock call: before={before_time}, after={after_time}")
|
||||||
|
|
||||||
return True
|
return True
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
logging.error(f"Failed to set system time from hardware clock: {e.stderr}")
|
logging.error(f"Failed to set system time from hardware clock: {e.stderr}")
|
||||||
@@ -87,18 +87,17 @@ def set_system_time_from_hwclock(utc=True):
|
|||||||
|
|
||||||
def get_time():
|
def get_time():
|
||||||
global has_set_system_time
|
global has_set_system_time
|
||||||
|
|
||||||
if not has_set_system_time:
|
if not has_set_system_time:
|
||||||
# 尝试从硬件时钟设置系统时间,默认假设 RTC 是 UTC
|
# 尝试从硬件时钟设置系统时间,默认假设 RTC 是 UTC
|
||||||
success = set_system_time_from_hwclock(utc=True)
|
success = set_system_time_from_hwclock(utc=True)
|
||||||
# 无论成功与否,都更新标志变量以避免重复尝试
|
# 无论成功与否,都更新标志变量以避免重复尝试
|
||||||
has_set_system_time = True
|
has_set_system_time = True
|
||||||
|
|
||||||
# 获取并返回当前时间,格式为 HH:MM 大写
|
# 获取并返回当前时间,格式为 HH:MM 大写
|
||||||
current_time = time.strftime('%H:%M').upper()
|
current_time = time.strftime('%H:%M').upper()
|
||||||
logging.debug(f"Returning current time: {current_time}")
|
logging.debug(f"Returning current time: {current_time}")
|
||||||
return current_time
|
return current_time
|
||||||
|
|
||||||
def Get_ipv4_address(): # 获取当前的IP地址
|
def Get_ipv4_address(): # 获取当前的IP地址
|
||||||
try:
|
try:
|
||||||
ip_output = subprocess.check_output(
|
ip_output = subprocess.check_output(
|
||||||
@@ -108,14 +107,36 @@ def Get_ipv4_address(): # 获取当前的IP地址
|
|||||||
return filtered_ips[0] if filtered_ips else "地址获取失败"
|
return filtered_ips[0] if filtered_ips else "地址获取失败"
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
return "获取失败"
|
return "获取失败"
|
||||||
|
# 全局变量声明
|
||||||
|
last_power = None # 用于存储上一次获取的电量值
|
||||||
|
last_power_time = 0 # 用于存储上一次获取电量的时间戳
|
||||||
|
|
||||||
def power_battery(): # 获取当前电池电量
|
def power_battery(): # 获取当前电池电量
|
||||||
return str(int(subprocess.check_output(
|
global last_power, last_power_time
|
||||||
u"echo \"get battery\" | nc -q 0 127.0.0.1 8423|awk -F':' '{print int($2)}'",
|
current_time = time.time() # 获取当前时间戳
|
||||||
shell=True).decode('gbk'))) + u'%'
|
|
||||||
|
|
||||||
|
# 每3分钟或首次获取时更新
|
||||||
|
if (current_time - last_power_time >= 180) or (last_power is None):
|
||||||
|
try:
|
||||||
|
# 执行命令获取电池电量
|
||||||
|
result = subprocess.check_output(
|
||||||
|
u"echo \"get battery\" | nc -q 0 127.0.0.1 8423 | awk -F':' '{print int($2)}'",
|
||||||
|
shell=True, stderr=subprocess.STDOUT
|
||||||
|
).decode('gbk').strip()
|
||||||
|
new_power = f"{int(result)}%" # 格式化电量值
|
||||||
|
|
||||||
|
# 仅在电量变化或首次时更新显示
|
||||||
|
if new_power != last_power or last_power is None:
|
||||||
|
logging.info(f"电池电量更新: {new_power}")
|
||||||
|
last_power = new_power # 更新缓存值
|
||||||
|
last_power_time = current_time # 更新获取时间
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"电量获取失败: {str(e)}")
|
||||||
|
new_power = last_power if last_power else "0%" # 失败时使用缓存值或默认值
|
||||||
|
else:
|
||||||
|
new_power = last_power # 未到更新时间,使用缓存值
|
||||||
|
|
||||||
|
return new_power
|
||||||
def Bottom_edge(): # 在图片中添加底边内容
|
def Bottom_edge(): # 在图片中添加底边内容
|
||||||
draw.rectangle((0, 105, 250, 122), 'black', 'black')
|
draw.rectangle((0, 105, 250, 122), 'black', 'black')
|
||||||
'''电池图标画图'''
|
'''电池图标画图'''
|
||||||
@@ -253,24 +274,39 @@ def Partial_refresh(): # 局刷函数
|
|||||||
power_str = power_str1
|
power_str = power_str1
|
||||||
Local_strong_brush() # 局部强刷
|
Local_strong_brush() # 局部强刷
|
||||||
|
|
||||||
|
retry_interval = 180 # 设置重试间隔时间(秒)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
##################屏幕初始化#########################
|
||||||
|
epd = epd2in13_V4.EPD() #初始化
|
||||||
|
epd.init()#设定屏幕刷新模式
|
||||||
|
#epd.Clear(0xFF) #清除屏幕内容
|
||||||
|
##################屏幕初始化#########################
|
||||||
|
logging.info("Width = %s, Height = %s", format(epd.width), format(epd.height)) #打印屏幕高度及宽度
|
||||||
|
logging.info("初始化并清空显示屏")#屏幕开始准备相关展示
|
||||||
|
info_image = Image.new('1', (epd.height, epd.width), 255) #画布创建准备
|
||||||
|
draw = ImageDraw.Draw(info_image)
|
||||||
|
Basic_refresh() #全局刷新
|
||||||
|
Partial_refresh() #局部刷新
|
||||||
|
epd.init()
|
||||||
|
epd.Clear(0xFF)
|
||||||
|
epd.sleep()
|
||||||
|
time.sleep(300)
|
||||||
|
break # 如果脚本执行成功,则退出循环
|
||||||
|
except (OSError, Exception) as e: # 捕获你提到的异常
|
||||||
|
logging.error("发生了错误: %s", e)
|
||||||
|
time.sleep(retry_interval) # 等待一段时间后重试
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("检测到键盘中断,正在清理并退出")
|
||||||
|
epd.init()
|
||||||
|
epd.Clear(0xFF) # 清除屏幕内容
|
||||||
|
epd.sleep() # 使屏幕进入休眠状态
|
||||||
|
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
||||||
|
exit()
|
||||||
|
|
||||||
try:
|
# 脚本正常结束后的清理操作
|
||||||
##################屏幕初始化#########################
|
epd.init()
|
||||||
epd = epd2in13_V4.EPD() # 初始化
|
epd.Clear(0xFF) # 清除屏幕内容
|
||||||
epd.init() # 设定屏幕刷新模式
|
epd.sleep() # 使屏幕进入休眠状态
|
||||||
logging.info("Width = %s, Height = %s", format(epd.width), format(epd.height)) # 打印屏幕高度及宽度
|
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
||||||
logging.info("初始化并清空显示屏") # 屏幕开始准备相关展示
|
exit()
|
||||||
info_image = Image.new('1', (epd.height, epd.width), 255) # 画布创建准备
|
|
||||||
draw = ImageDraw.Draw(info_image)
|
|
||||||
Basic_refresh() # 全局刷新
|
|
||||||
Partial_refresh() # 局部刷新
|
|
||||||
except OSError as e:
|
|
||||||
logging.info(e)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
logging.info("检测到键盘中断,正在清理并退出")
|
|
||||||
finally:
|
|
||||||
epd.init()
|
|
||||||
epd.Clear(0xFF) # 清除屏幕内容
|
|
||||||
epd.sleep() # 使屏幕进入休眠状态
|
|
||||||
epd2in13_V4.epdconfig.module_exit() # 清理资源
|
|
||||||
exit()
|
|
||||||
@@ -11,4 +11,4 @@ do
|
|||||||
kill -9 $id
|
kill -9 $id
|
||||||
done
|
done
|
||||||
nohup /usr/bin/python3 -u $dir/$f_name > $logdir/info.log 2>&1 &
|
nohup /usr/bin/python3 -u $dir/$f_name > $logdir/info.log 2>&1 &
|
||||||
nohup /usr/bin/python3 -u $dir/$f1_name > $logdir/info-wenter.log 2>&1 &
|
nohup /usr/bin/python3 -u $dir/$f1_name > $logdir/info-weather.log 2>&1 &
|
||||||
|
|||||||
127
bin/weather.py
127
bin/weather.py
@@ -7,83 +7,92 @@ import logging
|
|||||||
import requests
|
import requests
|
||||||
from threading import Timer
|
from threading import Timer
|
||||||
|
|
||||||
white = 255 # 颜色
|
logging.basicConfig(
|
||||||
black = 0
|
level=logging.INFO,
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') # 设置日志级别
|
format='%(asctime)s - %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S'
|
||||||
|
)
|
||||||
|
|
||||||
def get_area_id(city_name):
|
def get_area_id(city_name):
|
||||||
"""从city.js中检索AREAID,无限重试直到成功"""
|
"""从city.js中检索AREAID,无限重试直到成功"""
|
||||||
url = "https://j.i8tq.com/weather2020/search/city.js"
|
url = "https://j.i8tq.com/weather2020/search/city.js"
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
response = requests.get(url)
|
resp = requests.get(url, timeout=10)
|
||||||
response.raise_for_status() # 检查请求是否成功
|
resp.encoding = 'utf-8' # 显式设置编码
|
||||||
city_data_text = response.text.strip().split('var city_data = ')[-1].rstrip(';')
|
city_data = json.loads(resp.text.split('=', 1)[1].rstrip(';'))
|
||||||
if city_data_text:
|
for province in city_data.values():
|
||||||
city_data = json.loads(city_data_text)
|
for city in province.values():
|
||||||
for province, cities in city_data.items():
|
for district, info in city.items():
|
||||||
for city, districts in cities.items():
|
if info['NAMECN'] == city_name:
|
||||||
for district, info in districts.items():
|
return info['AREAID']
|
||||||
if info['NAMECN'] == city_name:
|
logging.error("未找到城市: %s", city_name)
|
||||||
return info['AREAID']
|
except Exception as e:
|
||||||
logging.error("城市名称 '%s' 在城市数据中未找到", city_name)
|
logging.error("获取城市ID失败: %s", str(e))
|
||||||
else:
|
time.sleep(180)
|
||||||
logging.error("从city.js接收到的数据为空")
|
|
||||||
except (requests.RequestException, json.JSONDecodeError) as e:
|
|
||||||
logging.error("检索或解析城市数据时发生错误: %s", e)
|
|
||||||
time.sleep(180) # 重试前等待
|
|
||||||
|
|
||||||
def get_current_city():
|
def get_current_city():
|
||||||
"""获取当前城市名称,无限重试直到成功"""
|
"""获取当前定位城市并去除'市'后缀"""
|
||||||
url = "http://ip-api.com/json/?lang=zh-CN"
|
url = "http://ip-api.com/json/?lang=zh-CN"
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
response = requests.get(url)
|
resp = requests.get(url, timeout=10)
|
||||||
response.raise_for_status() # 检查请求是否成功
|
resp.encoding = 'utf-8' # 显式设置编码
|
||||||
data = response.json()
|
data = resp.json()
|
||||||
if data['status'] == 'success':
|
if data['status'] == 'success':
|
||||||
return data['city']
|
return data['city'].rstrip('市')
|
||||||
else:
|
logging.error("定位失败: %s", data.get('message'))
|
||||||
logging.error("获取当前城市失败: %s", data['message'])
|
except Exception as e:
|
||||||
except (requests.RequestException, json.JSONDecodeError) as e:
|
logging.error("定位异常: %s", str(e))
|
||||||
logging.error("检索或解析当前城市时出现错误: %s", e)
|
time.sleep(180)
|
||||||
time.sleep(180) # 重试前等待
|
|
||||||
|
|
||||||
def schedule_getWeath():
|
def schedule_getWeath():
|
||||||
"""调度 getWeath 函数每3分钟执行一次"""
|
"""定时任务调度"""
|
||||||
getWeath()
|
try:
|
||||||
Timer(180, schedule_getWeath).start() # 重新设置定时器
|
getWeath()
|
||||||
|
finally:
|
||||||
|
Timer(180, schedule_getWeath).start()
|
||||||
|
|
||||||
def getWeath(city='101060101'):
|
def getWeath(default_city='101060101'):
|
||||||
|
"""获取天气数据核心函数"""
|
||||||
|
city_name = get_current_city()
|
||||||
|
area_id = default_city # 设置默认值
|
||||||
|
|
||||||
|
if city_name:
|
||||||
|
try:
|
||||||
|
area_id = get_area_id(city_name) or default_city
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("获取区域ID失败: %s", str(e))
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
||||||
'Referer': 'https://www.weather.com.cn/'
|
'Referer': 'https://www.weather.com.cn/'
|
||||||
}
|
}
|
||||||
current_city = get_current_city()
|
|
||||||
if current_city:
|
|
||||||
area_id = get_area_id(current_city)
|
|
||||||
if area_id:
|
|
||||||
city = area_id
|
|
||||||
try:
|
try:
|
||||||
response = requests.get(f'https://d1.weather.com.cn/sk_2d/{city}.html', headers=headers)
|
resp = requests.get(
|
||||||
response.raise_for_status() # 检查请求是否成功
|
f'https://d1.weather.com.cn/sk_2d/{area_id}.html',
|
||||||
response.encoding = 'utf-8'
|
headers=headers,
|
||||||
weather_data = response.text[11:]
|
timeout=15
|
||||||
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w') as fileHandle:
|
)
|
||||||
fileHandle.write(weather_data)
|
resp.encoding = 'utf-8' # 关键修改:强制设置响应编码
|
||||||
print("天气文件更新")
|
resp.raise_for_status()
|
||||||
except requests.RequestException as e:
|
|
||||||
logging.error("获取天气数据时出现网络错误: %s", e)
|
# 直接处理原始字节数据
|
||||||
|
weather_data = resp.content[11:].decode('utf-8')
|
||||||
|
|
||||||
|
# 使用utf-8编码写入文件
|
||||||
|
with open('/root/2.13-Ink-screen-clock/bin/weather.json', 'w', encoding='utf-8') as f:
|
||||||
|
f.write(weather_data)
|
||||||
|
|
||||||
|
logging.info("天气数据更新成功")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error("发生了意外错误: %s", e)
|
logging.error("天气更新失败: %s", str(e))
|
||||||
|
|
||||||
try:
|
if __name__ == "__main__":
|
||||||
schedule_getWeath() # 开始调度天气获取函数
|
try:
|
||||||
except KeyboardInterrupt:
|
schedule_getWeath()
|
||||||
logging.info("检测到键盘中断,正在退出")
|
while True:
|
||||||
except Exception as e:
|
time.sleep(1) # 保持主线程存活
|
||||||
logging.error("发生了意外错误: %s", e)
|
except KeyboardInterrupt:
|
||||||
exit()
|
logging.info("程序已终止")
|
||||||
|
|
||||||
# 脚本正常结束后的清理操作
|
|
||||||
exit()
|
|
||||||
Reference in New Issue
Block a user