feat: add python cli to upload image. (#11)

This commit is contained in:
RoCry
2024-05-18 07:19:34 +08:00
committed by GitHub
parent 7ecacd7ab9
commit 9ad5d834ad
3 changed files with 363 additions and 0 deletions

188
tools/cli.py Normal file
View File

@@ -0,0 +1,188 @@
import argparse
import asyncio
from contextlib import asynccontextmanager
import logging
import time
import fire
from rich.progress import track
from bleak import BleakClient, BleakScanner
from bleak.uuids import normalize_uuid_16, uuid16_dict
# commands support by firmware
EPD_CMD_CLR = 1
EPD_CMD_MODE = 2
EPD_CMD_BUF = 3
EPD_CMD_BUF_CONT = 4
EPD_CMD_LUT = 5
EPD_CMD_RST = 6
EPD_CMD_BW = 7
EPD_CMD_RED = 8
EPD_CMD_DP = 9
EPD_CMD_FILL = 10
EPD_CMD_BUF_PUT = 11
EPD_CMD_BUF_GET = 12
EPD_CMD_SNV_WRITE = 13
EPD_CMD_SNV_READ = 14
EPD_CMD_SAVE_CFG = 15
class CLI(object):
def __init__(
self,
name_prefix="C26_",
log_level=logging.DEBUG,
timeout=30,
):
self.name_prefix = name_prefix
self._logger = self._setup_logger(log_level)
self.timeout = timeout
def _filter_device(self, device, advertisement_data):
if not device.name:
return False
result = device.name.startswith(self.name_prefix)
if result:
self._logger.debug(f"found device: {device}, {advertisement_data}")
return result
def _setup_logger(self, log_level):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s - [%(levelname)s] > %(message)s",
)
logger = logging.getLogger(__name__)
logger.level = log_level
return logger
@asynccontextmanager
async def _ble_client(self):
self._logger.info("starting scan...")
device = await BleakScanner.find_device_by_filter(
filterfunc=self._filter_device,
timeout=self.timeout,
)
if device is None:
self._logger.error(f"could not find device with name: {self.name_prefix}")
raise Exception("device not found")
self._logger.info("connecting to device...")
async with BleakClient(device, timeout=self.timeout) as client:
self._logger.info(
f"connected to: {device.name if device.name else device.address}"
)
yield client
self._logger.info("disconnecting...")
await client.disconnect()
self._logger.info("disconnected")
##############################################################################################################
# Private CLI commands
##############################################################################################################
async def _do_cmd(self, client, cmd, payload=None):
data = [cmd]
if cmd in [EPD_CMD_MODE, EPD_CMD_DP]:
data.append(payload)
elif cmd in [EPD_CMD_CLR, EPD_CMD_RST, EPD_CMD_BW, EPD_CMD_RED]:
# no need payload
pass
elif cmd == EPD_CMD_BUF:
chunk_size = 60
for i in track(range(0, len(payload), chunk_size), "Sending buf..."):
chunk = payload[i : i + chunk_size]
cmd = cmd if i == 0 else EPD_CMD_BUF_CONT
# logger.debug(f"sending chunk={i+len(chunk)} of data={len(payload)}")
await client.write_gatt_char(
normalize_uuid_16(0xFFFE), bytes([cmd] + chunk)
)
return
else:
raise Exception(f"unsupported cmd: {cmd}, payload: {payload}")
self._logger.debug(f"do cmd: {data}")
await client.write_gatt_char(normalize_uuid_16(0xFFFE), bytes(data))
async def _upload_image_bwr_data(self, client, bw_data, red_data):
await self._do_cmd(client, EPD_CMD_RST)
time.sleep(2)
if bw_data:
await self._do_cmd(client, EPD_CMD_BUF, bw_data)
await self._do_cmd(client, EPD_CMD_BW)
if red_data:
await self._do_cmd(client, EPD_CMD_BUF, red_data)
await self._do_cmd(client, EPD_CMD_RED)
# display with lut 0
await self._do_cmd(client, EPD_CMD_DP, 0)
##############################################################################################################
# Public CLI commands
##############################################################################################################
async def read_etag(self):
async with self._ble_client() as client:
host_epoch = int(round(time.time()))
# read current time
value = await client.read_gatt_char(normalize_uuid_16(0xFFF1))
epoch = int.from_bytes(value, byteorder="little", signed=False)
# read time zone
value = await client.read_gatt_char(normalize_uuid_16(0xFFF2))
tz_min = int.from_bytes(value, byteorder="little", signed=True)
self._logger.info(
f"# host ts: {host_epoch}, etag ts: {epoch}, diff ({epoch - host_epoch})s, tz: {tz_min // 60}h"
)
# battery
value = await client.read_gatt_char(normalize_uuid_16(0xFFF3))
battery = int.from_bytes(value, byteorder="little", signed=False)
# temperature
value = await client.read_gatt_char(normalize_uuid_16(0xFFF4))
temp = int.from_bytes(value, byteorder="little", signed=True)
self._logger.info(f"# battery: {battery}mV, temperature: {temp}°C")
# RTC collaborate
value = await client.read_gatt_char(normalize_uuid_16(0xFFF5))
rtc = int.from_bytes(value, byteorder="little", signed=False)
self._logger.info(f"# rtc: {rtc}")
async def set_time(self):
async with self._ble_client() as client:
epoch = int(round(time.time()))
self._logger.info(f"setting time: {epoch}")
# set current time
await client.write_gatt_char(
normalize_uuid_16(0xFFF1), epoch.to_bytes(4, byteorder="little")
)
# 0: Date mode
# 1: Image mode
async def change_mode(self, mode: int):
if mode not in [0, 1]:
raise ValueError(f"invalid mode: {mode}")
async with self._ble_client() as client:
await self._do_cmd(client, EPD_CMD_MODE, mode)
# image coule be one of the following:
# 1. image file path
# 2. image url
async def upload_image(self, image: str, width: int = 296, height: int = 152):
async with self._ble_client() as client:
from process_image import download_image_if_needed, image_to_bwr_data
image_path = download_image_if_needed(image)
# convert 6608697102119889260_296x152.jpg -dither FloydSteinberg -define dither:diffusion-amount=85% -remap palette.png bmp:output.bmp
bw, red = image_to_bwr_data(image_path, width=width, height=height)
await self._upload_image_bwr_data(client, bw, red)
if __name__ == "__main__":
fire.Fire(CLI)

170
tools/process_image.py Normal file
View File

@@ -0,0 +1,170 @@
from PIL import Image
import fire
import os
def generate_temp_output_path(input_path: str, ext: str = None) -> str:
filename = os.path.basename(input_path)
name_without_ext, original_ext = os.path.splitext(filename)
if ext is None:
ext = original_ext
if not ext.startswith("."):
ext = f".{ext}"
# /tmp/blabla.ext
import tempfile
return os.path.join(tempfile.gettempdir(), f"{name_without_ext}{ext}")
# image could be filepath or image url
def download_image_if_needed(image: str) -> str:
if image.startswith("http"):
import requests
response = requests.get(image)
output_path = generate_temp_output_path(image)
with open(output_path, "wb") as f:
f.write(response.content)
return output_path
return image
# resize the image to the given width and height, aspect to fill
def resize_image(
input_path: str, width: int, height: int, output_path: str = None
) -> str:
if output_path is None:
output_path = generate_temp_output_path(input_path)
# Open the image file
with Image.open(input_path) as img:
# Calculate the required aspect ratio
desired_aspect_ratio = width / height
img_aspect_ratio = img.width / img.height
# Resize and crop logic
if desired_aspect_ratio < img_aspect_ratio:
# The requested aspect ratio is wider than the image's aspect ratio
resize_height = height
resize_width = int(height * img_aspect_ratio)
else:
# The requested aspect ratio is taller than the image's aspect ratio
resize_width = width
resize_height = int(width / img_aspect_ratio)
# Resize the image
img = img.resize((resize_width, resize_height), Image.LANCZOS)
# Calculate cropping area
left = (resize_width - width) / 2
top = (resize_height - height) / 2
right = (resize_width + width) / 2
bottom = (resize_height + height) / 2
# Crop the image
img = img.crop((left, top, right, bottom))
# Save the resized image
img.save(output_path)
return output_path
# palette.png generate with below command
# convert -size 1x3 xc:black xc:white xc:red +append palette.png
# fmt: off
PALETTE_BWR = [
0, 0, 0, # black
255, 255, 255, # white
255, 0, 0, # red
]
# fmt: on
# remap the image to the given palette
# convert input.jpg -dither FloydSteinberg -define dither:diffusion-amount=85% -remap palette.png bmp:output.bmp
def remap_image(
input_path: str,
palette: [int] = None,
output_path: str = None,
dither=Image.Dither.FLOYDSTEINBERG,
) -> str:
if palette is None:
palette = PALETTE_BWR
if output_path is None:
output_path = generate_temp_output_path(input_path, ".bmp")
with Image.open(input_path) as original_image:
original_image = original_image.convert("RGB")
# Create a new image using the 'P' mode (palette-based) with the same
# dimensions as the original.
palette_image = Image.new("P", original_image.size)
# Put the custom palette into the image
palette_image.putpalette(palette)
# Convert the original image to 'P' mode with our custom palette.
# The .quantize() method maps colors to the nearest color in the palette.
# You can play around with the 'dither' and 'colors' parameters if necessary.
converted_image = original_image.quantize(palette=palette_image, dither=dither)
# Save or display your image
converted_image.save(output_path)
return output_path
# If you want to work with the RGB values, convert it back to 'RGB' mode.
# rgb_image = converted_image.convert("RGB")
# convert image to bwr data, specific for BWR EPD
def image_to_bwr_data(
image_path: str,
width: int,
height: int,
dither=Image.Dither.FLOYDSTEINBERG,
):
# logger.debug(f"processing image: {image_path}")
fp = resize_image(image_path, width, height)
# logger.debug(f"resized image: {fp}")
fp = remap_image(fp, dither=dither)
# logger.debug(f"remapped image: {fp}")
img = Image.open(fp).convert("RGB")
width, height = img.size
bw, red = [], []
# Process pixels
# logger.debug(f"generate bw/red data: {width}x{height}")
for y in range(0, height, 8):
for x in range(width):
# logger.debug(f"processing pixel: {x}, {y}")
bw_byte, red_byte = 0, 0
for i in range(8):
if y + i >= height:
break
r, g, b = img.getpixel((x, y + i))
# three possibilities: black, white, red
# black: 0x00, 0x00, 0x00
# red: 0xff, 0x00, 0x00
# white: 0xff, 0xff, 0xff
if r == 0x00 and g == 0x00 and b == 0x00:
# black, bw=0, red=0
pass
elif r == 0xFF and g == 0x00 and b == 0x00:
# red, bw=1, red=1
bw_byte |= 1 << (8 - i - 1)
red_byte |= 1 << (8 - i - 1)
elif r == 0xFF and g == 0xFF and b == 0xFF:
# white, bw=1, red=0
bw_byte |= 1 << (8 - i - 1)
else:
raise Exception(f"invalid pixel: {r}, {g}, {b}")
bw.append(bw_byte)
red.append(red_byte)
return bw, red
if __name__ == "__main__":
fire.Fire()

5
tools/requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
bleak==0.22.1
fire==0.6.0
Pillow==10.3.0
Requests==2.31.0
rich==13.7.1