From 620eea99ed1c1e33ed9811e9d0737c152e8085ff Mon Sep 17 00:00:00 2001 From: Josuah Demangeon Date: Wed, 21 Jun 2023 19:11:13 -0400 Subject: [PATCH 1/2] data-serial-console: implement data on the serial console with Ctrl-V --- tools/serial_console.py | 75 ++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/tools/serial_console.py b/tools/serial_console.py index a4604cc0..d271d578 100755 --- a/tools/serial_console.py +++ b/tools/serial_console.py @@ -11,6 +11,7 @@ import os import tty import termios +import binascii from itertools import count, takewhile from typing import Iterator @@ -18,12 +19,24 @@ from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData +from bleak.uuids import register_uuids -UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" -UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" -UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" -# TIP: you can get this function and more from the ``more-itertools`` package. +UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" +UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" +UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" + +DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d" +DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d" +DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d" + +register_uuids({ + DATA_SERVICE_UUID: "Monocle Raw Serivce", + DATA_TX_CHAR_UUID: "Monocle Raw TX", + DATA_RX_CHAR_UUID: "Monocle Raw RX", +}) + +# You can get this function and more from the ``more-itertools`` package. def sliced(data: bytes, n: int) -> Iterator[bytes]: """ Slices *data* into chunks of size *n*. The last slice may be smaller than @@ -32,7 +45,7 @@ def sliced(data: bytes, n: int) -> Iterator[bytes]: return takewhile(len, (data[i : i + n] for i in count(0, n))) -async def uart_terminal(): +async def repl_terminal(): """This is a simple "terminal" program that uses the Nordic Semiconductor (nRF) UART service. It reads from stdin and sends each line of data to the remote device. Any data received from the device is printed to stdout. @@ -41,14 +54,14 @@ async def uart_terminal(): # opens sandard output in binary mode stdout = os.fdopen(1, 'wb') - def match_nus_uuid(device: BLEDevice, adv: AdvertisementData): + def match_repl_uuid(device: BLEDevice, adv: AdvertisementData): # This assumes that the device includes the UART service UUID in the # advertising data. This test may need to be adjusted depending on the # actual advertising data supplied by the device. sys.stderr.write(f"uuids={adv.service_uuids}\n") return UART_SERVICE_UUID.lower() in adv.service_uuids - device = await BleakScanner.find_device_by_filter(match_nus_uuid) + device = await BleakScanner.find_device_by_filter(match_repl_uuid) if device is None: sys.stderr.write("no matching device found\n") @@ -61,36 +74,50 @@ def handle_disconnect(_: BleakClient): for task in asyncio.all_tasks(): task.cancel() - def handle_rx(_: BleakGATTCharacteristic, data: bytearray): + def handle_repl_rx(_: BleakGATTCharacteristic, data: bytearray): stdout.write(data) stdout.flush() + def handle_data_rx(_: BleakGATTCharacteristic, data: bytearray): + hex = data.hex(' ', 1) + sys.stderr.write(f'RX: {hex} {data}\r\n') + sys.stderr.flush() + + def prompt(): + global saved_term + if sys.stdin.isatty(): + termios.tcsetattr(0, termios.TCSANOW, saved_term) + line = sys.stdin.buffer.readline() + tty.setraw(0) + return line + async with BleakClient(device, disconnected_callback=handle_disconnect) as client: - await client.start_notify(UART_TX_CHAR_UUID, handle_rx) + await client.start_notify(UART_TX_CHAR_UUID, handle_repl_rx) + await client.start_notify(DATA_TX_CHAR_UUID, handle_data_rx) loop = asyncio.get_running_loop() - nus = client.services.get_service(UART_SERVICE_UUID) - rx_char = nus.get_characteristic(UART_RX_CHAR_UUID) + repl = client.services.get_service(UART_SERVICE_UUID) + data = client.services.get_service(DATA_SERVICE_UUID) + repl_rx_char = repl.get_characteristic(UART_RX_CHAR_UUID) + data_rx_char = data.get_characteristic(DATA_RX_CHAR_UUID) # set the terminal to raw I/O: no buffering if sys.stdin.isatty(): tty.setraw(0) + # Infinite loop to read the input character until the end while True: - # This waits until you type a line and press ENTER. - # A real terminal program might put stdin in raw mode so that things - # like CTRL+C get passed to the remote device. - data = await loop.run_in_executor(None, sys.stdin.buffer.read, 1) - - # data will be empty on EOF (e.g. CTRL+D on *nix) - if not data: + ch = await loop.run_in_executor(None, sys.stdin.buffer.read, 1) + if not ch: # EOF break + if ch == b'\x16': # Ctrl-V + sys.stderr.write(f'TX: ') + sys.stderr.flush() + line = await loop.run_in_executor(None, prompt) + await client.write_gatt_char(data_rx_char, line) + else: + await client.write_gatt_char(repl_rx_char, ch) - # Writing without response requires that the data can fit in a - # single BLE packet. We can use the max_write_without_response_size - # property to split the data into chunks that will fit. - for s in sliced(data, rx_char.max_write_without_response_size): - await client.write_gatt_char(rx_char, s) if __name__ == "__main__": # save the terminal I/O state @@ -98,7 +125,7 @@ def handle_rx(_: BleakGATTCharacteristic, data: bytearray): saved_term = termios.tcgetattr(0) try: - asyncio.run(uart_terminal()) + asyncio.run(repl_terminal()) except asyncio.CancelledError: # task is cancelled on disconnect, so we ignore this error pass From 2f314ce17f49a36bd545a865529b2851ab99af2a Mon Sep 17 00:00:00 2001 From: Josuah Demangeon Date: Wed, 21 Jun 2023 19:14:24 -0400 Subject: [PATCH 2/2] data-serial-console: add introduction explanation text --- tools/serial_console.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/serial_console.py b/tools/serial_console.py index d271d578..64d2a41e 100755 --- a/tools/serial_console.py +++ b/tools/serial_console.py @@ -105,6 +105,8 @@ def prompt(): if sys.stdin.isatty(): tty.setraw(0) + sys.stderr.write('Ctrl-V + "input text" + Enter: data to raw service\r\n') + # Infinite loop to read the input character until the end while True: ch = await loop.run_in_executor(None, sys.stdin.buffer.read, 1)