From 9ad5d834ad68602a792c6947125a68bb43f39c5c Mon Sep 17 00:00:00 2001 From: RoCry Date: Sat, 18 May 2024 07:19:34 +0800 Subject: [PATCH] feat: add python cli to upload image. (#11) --- tools/cli.py | 188 +++++++++++++++++++++++++++++++++++++++++ tools/process_image.py | 170 +++++++++++++++++++++++++++++++++++++ tools/requirements.txt | 5 ++ 3 files changed, 363 insertions(+) create mode 100644 tools/cli.py create mode 100644 tools/process_image.py create mode 100644 tools/requirements.txt diff --git a/tools/cli.py b/tools/cli.py new file mode 100644 index 0000000..bd3762c --- /dev/null +++ b/tools/cli.py @@ -0,0 +1,188 @@ +import argparse +import asyncio +from contextlib import asynccontextmanager +import logging +import time +import fire + +from rich.progress import track + + +from bleak import BleakClient, BleakScanner +from bleak.uuids import normalize_uuid_16, uuid16_dict + +# commands support by firmware +EPD_CMD_CLR = 1 +EPD_CMD_MODE = 2 +EPD_CMD_BUF = 3 +EPD_CMD_BUF_CONT = 4 +EPD_CMD_LUT = 5 +EPD_CMD_RST = 6 +EPD_CMD_BW = 7 +EPD_CMD_RED = 8 +EPD_CMD_DP = 9 +EPD_CMD_FILL = 10 +EPD_CMD_BUF_PUT = 11 +EPD_CMD_BUF_GET = 12 +EPD_CMD_SNV_WRITE = 13 +EPD_CMD_SNV_READ = 14 +EPD_CMD_SAVE_CFG = 15 + + +class CLI(object): + def __init__( + self, + name_prefix="C26_", + log_level=logging.DEBUG, + timeout=30, + ): + self.name_prefix = name_prefix + self._logger = self._setup_logger(log_level) + self.timeout = timeout + + def _filter_device(self, device, advertisement_data): + if not device.name: + return False + result = device.name.startswith(self.name_prefix) + if result: + self._logger.debug(f"found device: {device}, {advertisement_data}") + return result + + def _setup_logger(self, log_level): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s - [%(levelname)s] > %(message)s", + ) + logger = logging.getLogger(__name__) + logger.level = log_level + return logger + + @asynccontextmanager + async def _ble_client(self): + self._logger.info("starting scan...") + + device = await BleakScanner.find_device_by_filter( + filterfunc=self._filter_device, + timeout=self.timeout, + ) + if device is None: + self._logger.error(f"could not find device with name: {self.name_prefix}") + raise Exception("device not found") + + self._logger.info("connecting to device...") + + async with BleakClient(device, timeout=self.timeout) as client: + self._logger.info( + f"connected to: {device.name if device.name else device.address}" + ) + yield client + self._logger.info("disconnecting...") + await client.disconnect() + + self._logger.info("disconnected") + + ############################################################################################################## + # Private CLI commands + ############################################################################################################## + async def _do_cmd(self, client, cmd, payload=None): + data = [cmd] + + if cmd in [EPD_CMD_MODE, EPD_CMD_DP]: + data.append(payload) + elif cmd in [EPD_CMD_CLR, EPD_CMD_RST, EPD_CMD_BW, EPD_CMD_RED]: + # no need payload + pass + elif cmd == EPD_CMD_BUF: + chunk_size = 60 + for i in track(range(0, len(payload), chunk_size), "Sending buf..."): + chunk = payload[i : i + chunk_size] + cmd = cmd if i == 0 else EPD_CMD_BUF_CONT + # logger.debug(f"sending chunk={i+len(chunk)} of data={len(payload)}") + await client.write_gatt_char( + normalize_uuid_16(0xFFFE), bytes([cmd] + chunk) + ) + return + else: + raise Exception(f"unsupported cmd: {cmd}, payload: {payload}") + + self._logger.debug(f"do cmd: {data}") + await client.write_gatt_char(normalize_uuid_16(0xFFFE), bytes(data)) + + async def _upload_image_bwr_data(self, client, bw_data, red_data): + await self._do_cmd(client, EPD_CMD_RST) + time.sleep(2) + + if bw_data: + await self._do_cmd(client, EPD_CMD_BUF, bw_data) + await self._do_cmd(client, EPD_CMD_BW) + if red_data: + await self._do_cmd(client, EPD_CMD_BUF, red_data) + await self._do_cmd(client, EPD_CMD_RED) + # display with lut 0 + await self._do_cmd(client, EPD_CMD_DP, 0) + + ############################################################################################################## + # Public CLI commands + ############################################################################################################## + async def read_etag(self): + async with self._ble_client() as client: + host_epoch = int(round(time.time())) + + # read current time + value = await client.read_gatt_char(normalize_uuid_16(0xFFF1)) + epoch = int.from_bytes(value, byteorder="little", signed=False) + + # read time zone + value = await client.read_gatt_char(normalize_uuid_16(0xFFF2)) + tz_min = int.from_bytes(value, byteorder="little", signed=True) + self._logger.info( + f"# host ts: {host_epoch}, etag ts: {epoch}, diff ({epoch - host_epoch})s, tz: {tz_min // 60}h" + ) + + # battery + value = await client.read_gatt_char(normalize_uuid_16(0xFFF3)) + battery = int.from_bytes(value, byteorder="little", signed=False) + + # temperature + value = await client.read_gatt_char(normalize_uuid_16(0xFFF4)) + temp = int.from_bytes(value, byteorder="little", signed=True) + self._logger.info(f"# battery: {battery}mV, temperature: {temp}°C") + + # RTC collaborate + value = await client.read_gatt_char(normalize_uuid_16(0xFFF5)) + rtc = int.from_bytes(value, byteorder="little", signed=False) + self._logger.info(f"# rtc: {rtc}") + + async def set_time(self): + async with self._ble_client() as client: + epoch = int(round(time.time())) + self._logger.info(f"setting time: {epoch}") + + # set current time + await client.write_gatt_char( + normalize_uuid_16(0xFFF1), epoch.to_bytes(4, byteorder="little") + ) + + # 0: Date mode + # 1: Image mode + async def change_mode(self, mode: int): + if mode not in [0, 1]: + raise ValueError(f"invalid mode: {mode}") + async with self._ble_client() as client: + await self._do_cmd(client, EPD_CMD_MODE, mode) + + # image coule be one of the following: + # 1. image file path + # 2. image url + async def upload_image(self, image: str, width: int = 296, height: int = 152): + async with self._ble_client() as client: + from process_image import download_image_if_needed, image_to_bwr_data + + image_path = download_image_if_needed(image) + # convert 6608697102119889260_296x152.jpg -dither FloydSteinberg -define dither:diffusion-amount=85% -remap palette.png bmp:output.bmp + bw, red = image_to_bwr_data(image_path, width=width, height=height) + await self._upload_image_bwr_data(client, bw, red) + + +if __name__ == "__main__": + fire.Fire(CLI) diff --git a/tools/process_image.py b/tools/process_image.py new file mode 100644 index 0000000..60c501c --- /dev/null +++ b/tools/process_image.py @@ -0,0 +1,170 @@ +from PIL import Image +import fire +import os + + +def generate_temp_output_path(input_path: str, ext: str = None) -> str: + filename = os.path.basename(input_path) + name_without_ext, original_ext = os.path.splitext(filename) + if ext is None: + ext = original_ext + if not ext.startswith("."): + ext = f".{ext}" + # /tmp/blabla.ext + import tempfile + + return os.path.join(tempfile.gettempdir(), f"{name_without_ext}{ext}") + + +# image could be filepath or image url +def download_image_if_needed(image: str) -> str: + if image.startswith("http"): + import requests + + response = requests.get(image) + output_path = generate_temp_output_path(image) + with open(output_path, "wb") as f: + f.write(response.content) + return output_path + return image + + +# resize the image to the given width and height, aspect to fill +def resize_image( + input_path: str, width: int, height: int, output_path: str = None +) -> str: + if output_path is None: + output_path = generate_temp_output_path(input_path) + + # Open the image file + with Image.open(input_path) as img: + # Calculate the required aspect ratio + desired_aspect_ratio = width / height + img_aspect_ratio = img.width / img.height + + # Resize and crop logic + if desired_aspect_ratio < img_aspect_ratio: + # The requested aspect ratio is wider than the image's aspect ratio + resize_height = height + resize_width = int(height * img_aspect_ratio) + else: + # The requested aspect ratio is taller than the image's aspect ratio + resize_width = width + resize_height = int(width / img_aspect_ratio) + + # Resize the image + img = img.resize((resize_width, resize_height), Image.LANCZOS) + + # Calculate cropping area + left = (resize_width - width) / 2 + top = (resize_height - height) / 2 + right = (resize_width + width) / 2 + bottom = (resize_height + height) / 2 + + # Crop the image + img = img.crop((left, top, right, bottom)) + + # Save the resized image + img.save(output_path) + + return output_path + + +# palette.png generate with below command +# convert -size 1x3 xc:black xc:white xc:red +append palette.png +# fmt: off +PALETTE_BWR = [ + 0, 0, 0, # black + 255, 255, 255, # white + 255, 0, 0, # red +] +# fmt: on + + +# remap the image to the given palette +# convert input.jpg -dither FloydSteinberg -define dither:diffusion-amount=85% -remap palette.png bmp:output.bmp +def remap_image( + input_path: str, + palette: [int] = None, + output_path: str = None, + dither=Image.Dither.FLOYDSTEINBERG, +) -> str: + if palette is None: + palette = PALETTE_BWR + if output_path is None: + output_path = generate_temp_output_path(input_path, ".bmp") + + with Image.open(input_path) as original_image: + original_image = original_image.convert("RGB") + + # Create a new image using the 'P' mode (palette-based) with the same + # dimensions as the original. + palette_image = Image.new("P", original_image.size) + + # Put the custom palette into the image + palette_image.putpalette(palette) + + # Convert the original image to 'P' mode with our custom palette. + # The .quantize() method maps colors to the nearest color in the palette. + # You can play around with the 'dither' and 'colors' parameters if necessary. + converted_image = original_image.quantize(palette=palette_image, dither=dither) + + # Save or display your image + converted_image.save(output_path) + + return output_path + + # If you want to work with the RGB values, convert it back to 'RGB' mode. + # rgb_image = converted_image.convert("RGB") + + +# convert image to bwr data, specific for BWR EPD +def image_to_bwr_data( + image_path: str, + width: int, + height: int, + dither=Image.Dither.FLOYDSTEINBERG, +): + # logger.debug(f"processing image: {image_path}") + fp = resize_image(image_path, width, height) + # logger.debug(f"resized image: {fp}") + fp = remap_image(fp, dither=dither) + # logger.debug(f"remapped image: {fp}") + + img = Image.open(fp).convert("RGB") + width, height = img.size + bw, red = [], [] + + # Process pixels + # logger.debug(f"generate bw/red data: {width}x{height}") + for y in range(0, height, 8): + for x in range(width): + # logger.debug(f"processing pixel: {x}, {y}") + bw_byte, red_byte = 0, 0 + for i in range(8): + if y + i >= height: + break + r, g, b = img.getpixel((x, y + i)) + # three possibilities: black, white, red + # black: 0x00, 0x00, 0x00 + # red: 0xff, 0x00, 0x00 + # white: 0xff, 0xff, 0xff + if r == 0x00 and g == 0x00 and b == 0x00: + # black, bw=0, red=0 + pass + elif r == 0xFF and g == 0x00 and b == 0x00: + # red, bw=1, red=1 + bw_byte |= 1 << (8 - i - 1) + red_byte |= 1 << (8 - i - 1) + elif r == 0xFF and g == 0xFF and b == 0xFF: + # white, bw=1, red=0 + bw_byte |= 1 << (8 - i - 1) + else: + raise Exception(f"invalid pixel: {r}, {g}, {b}") + bw.append(bw_byte) + red.append(red_byte) + return bw, red + + +if __name__ == "__main__": + fire.Fire() diff --git a/tools/requirements.txt b/tools/requirements.txt new file mode 100644 index 0000000..2aa659e --- /dev/null +++ b/tools/requirements.txt @@ -0,0 +1,5 @@ +bleak==0.22.1 +fire==0.6.0 +Pillow==10.3.0 +Requests==2.31.0 +rich==13.7.1