From e4bb12ed3e0a446effac420349470dc2aa5dfb44 Mon Sep 17 00:00:00 2001 From: Icy Cloud Date: Fri, 12 Jan 2024 23:44:55 +0800 Subject: [PATCH] [TESTED] now with UART support --- .idea/CH347PythonLib.iml | 2 +- .idea/misc.xml | 2 +- README.md | 9 ++- ch347api/__device.py | 106 ++++++++++++++++++++++++++-- ch347api/__init__.py | 1 + ch347api/uart.py | 125 ++++++++++++++++++++++++++++++++++ demo.py | 50 ++++++++++++-- setup.py | 6 +- test.py => test_deprecated.py | 0 9 files changed, 284 insertions(+), 17 deletions(-) rename test.py => test_deprecated.py (100%) diff --git a/.idea/CH347PythonLib.iml b/.idea/CH347PythonLib.iml index e95e312..4e2fd05 100644 --- a/.idea/CH347PythonLib.iml +++ b/.idea/CH347PythonLib.iml @@ -2,7 +2,7 @@ - + diff --git a/.idea/misc.xml b/.idea/misc.xml index 2fe0af5..9bde80f 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -6,5 +6,5 @@ - + \ No newline at end of file diff --git a/README.md b/README.md index c4ebf96..0bf13e7 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,11 @@ _+ [CH347-HIDAPI Github Page](https://github.com/i2cy/ch347-hidapi) +_

## Abstract -This project is the API library of CH347 USB-SPI/I2C bridge chip based on Python. +This project is the API library of CH347 USB-SPI/I2C/UART bridge chip based on Python. `Standard USB-HID mode setting of CH347 chip supported only` -This library provides full access of SPI/I2C settings and communication with CH347 USB-SPI +This library provides full access of SPI/I2C/UART settings and communication with CH347 USB-SPI/I2C/UART bridge chip in Python language. __For demonstration and code reference please refer to the `demo.py` file in [source page](https://github.com/i2cy/CH347-HIDAPI/blob/master/demo.py).__ @@ -45,6 +45,11 @@ THUS, THIS API MAY NOT FULLY CAPABLE OF EVERY FUNCTION IN OFFICIAL API FROM CH34 ## Update Notes +#### 2024-01-12 + 1. Now with fully compatible UART (UART1 with pins TXD1/RXD1/RTS1/CTS1/DTR1) support under mode 3 (which is HID mode), + 2. Baudrate supports ranging from 1.2K to 9M + 3. Multithread receiver for UART (optional, default is on) to receive the data in parallel + #### 2024-01-08 1. Added independent I2C interface class objects (I2CDevice) and SPI interface class objects (SPIDevice) 2. Added new demo file `demo.py` to demonstrate the usage of classes added above (simplified code) diff --git a/ch347api/__device.py b/ch347api/__device.py index 042e99c..dc24304 100644 --- a/ch347api/__device.py +++ b/ch347api/__device.py @@ -12,31 +12,129 @@ from .__i2c import convert_i2c_address, convert_int_to_bytes from typing import Tuple, Any from functools import wraps +import warnings VENDOR_ID: int = 6790 PRODUCT_ID: int = 21980 +class CH347HIDUART1(hid.device): + + def __init__(self, vendor_id=VENDOR_ID, product_id=PRODUCT_ID): + """ + Class of CH347 UART1 interface based on hidapi + :param vendor_id: the vender ID of the device + :type vendor_id: int + :param product_id: the product ID of the device + :type product_id: int + """ + super(CH347HIDUART1, self).__init__() + target = None + for ele in hid.enumerate(): + if ele["vendor_id"] == vendor_id and ele["product_id"] == product_id: + if ele['interface_number'] == 0: # UART interface ID: 0 + target = ele['path'] + + self.open_path(target) + + def init_UART(self, baudrate: int = 115200, stop_bits: int = 1, verify_bits: int = 0, timeout: int = 32) -> bool: + """ + Initialize the configuration of the UART interface + :param baudrate: the baudrate of the device, default is 115200 + :type baudrate: int + :param stop_bits: number of stop bits, default is 1 + :type stop_bits: int + :param verify_bits: number of verify bits, default is 0 + :type verify_bits: int + :param timeout: timeout in milliseconds, default is 32, this value should not exceed the maximum of 255 + :type timeout: int + :return: operation status + :rtype: bool + """ + header = b"\x00\xcb\x08\x00" + stop_bits = stop_bits * 2 - 2 + + if baudrate < 1200 or baudrate > 7_500_000: + raise ValueError("Invalid baudrate, correct value should ranging from 1200 to 7500000") + + if timeout > 255: + raise Exception("timeout should not exceed the maximum number of 255") + + payload = header + struct.pack(" int: + """ + Write data to the device + :param data: data to write + :type data: bytes + :return: wrote length + :rtype: int + """ + offset = 0 + while len(data) - offset > 510: + payload = struct.pack(" list: + """ + Read data from the device if any byte is available + :param length: maximum length of the data, default is -1 which means read all bytes that received + :type length: int + :return: list of bytes read + :rtype: list + """ + self.set_nonblocking(1) + + ret = [] + while len(ret) < length or length < 0: + chunk = self.read(512) + if chunk: + chunk_len = struct.unpack(" None: + if self.__multithreading: + self.__live = False + try: + self.__data_lock.release() + self.__thread.join() + except RuntimeError: + pass + + def kill(self): + """ + Kill receiver thread if multithreading is on + :return: + """ + if self.__multithreading: + self.__live = False + try: + self.__data_lock.release() + self.__thread.join() + except RuntimeError: + pass + + def write(self, data: bytes) -> int: + """ + Write data to the device + :param data: data to write + :type data: bytes + :return: wrote length + :rtype: int + """ + return self.dev.write_raw(data) + + def read(self, length: int = -1, timeout: int = 5) -> list: + """ + Read data from the device if any byte is available + :param length: maximum length of the data, default is -1 which means read all bytes that received + :type length: int + :param timeout: timeout in seconds, default is 5, set to 0 means return data from buffer immediately instead of + waiting for the buffer to collect enough data (available only for multithreading) + :type timeout: int + :return: list of bytes read + :rtype: list + """ + if self.__multithreading: + # wait for data + t0 = time.time() + + if length < 0: + while time.time() - t0 < timeout and len(self.__received_data) == 0: + time.sleep(0.002) + else: + while time.time() - t0 < timeout and length > len(self.__received_data): + time.sleep(0.002) + + self.__data_lock.acquire() + + if len(self.__received_data) > length > 0: + ret = self.__received_data[:length] + self.__received_data = self.__received_data[length:] + + else: + ret = self.__received_data + self.__received_data = [] + + self.__data_lock.release() + + else: + ret = self.dev.read_raw(length) + + return ret diff --git a/demo.py b/demo.py index 1aa090d..a44b94a 100644 --- a/demo.py +++ b/demo.py @@ -8,7 +8,7 @@ import random import time -from ch347api import CH347HIDDev, I2CDevice, SPIDevice, SPIClockFreq, I2CClockFreq +from ch347api import CH347HIDDev, I2CDevice, SPIDevice, UARTDevice, SPIClockFreq, I2CClockFreq def i2c_demo(): @@ -61,7 +61,7 @@ def spi_demo(): # i2c = I2CDevice(addr=0x68, ch347_device=dev) # write test (activate CS -> write data -> deactivate CS) - print("performing SPI write test") + print("[SPI] performing SPI write test") spi.write_CS1(b"hello world") spi.write_CS2(b"this is ch347") spi.write_CS1([0, 1, 2, 3]) @@ -72,16 +72,54 @@ def spi_demo(): spi.write_CS1(b"this is ch347") # read test (activate CS -> read data -> deactivate CS) - print("performing SPI read test") - print("received 16 bytes from SPI bus on CS1:", bytes(spi.read_CS1(16))) + print("[SPI] performing SPI read test") + print("[SPI] received 16 bytes from SPI bus on CS1:", bytes(spi.read_CS1(16))) # write&read test (activate CS -> read data -> deactivate CS) - random_bytes = random.randbytes(512) - print("write read test result (with MOSI, MISO short connected): {}".format( + random_bytes = b"\xa5\x5a\x5a\xa5" * 128 + print("[SPI] write read test result (with MOSI, MISO short connected): {}".format( bytes(spi.writeRead_CS1(random_bytes)) == random_bytes )) +def uart_demo(): + # while performing this test please make sure TX and RX pin short connected + + # initialize an uart communication object + # -*- Way 1 -*- + uart = UARTDevice(baudrate=7_500_000) + + # -*- Way 2 -*- (with no multithreading receiver) + # uart = UARTDevice(baudrate=115200, stop_bits=1, verify_bits=0, timeout=128, multithreading=False) + + # uart write test + test_b1 = b"hello world, this is ch347. " + test_b2 = b"using CH347-HIDAPI" + test_b3 = b"\xa5\x5a\x5a\xa5\x00\x01\x02\x03\xfc\xfd\xfe\xff\xa5\x5a\x5a\xa5" + wrote = uart.write(test_b1) + print("[UART] wrote {} bytes with content \"{}\"".format(wrote, test_b1.decode("utf-8"))) + wrote = uart.write(test_b2) + print("[UART] wrote {} bytes with content \"{}\"".format(wrote, test_b2.decode("utf-8"))) + + # uart read test + read = uart.read(len(test_b1 + test_b2)) + print("[UART] read {} bytes of data test result: {}".format(len(read), bytes(read) == test_b1 + test_b2)) + print("[UART] received: {}".format(bytes(read))) + + # uart accuracy test + print("[UART] continuous sending and receiving test with 4MB size in progress..") + payload = test_b3 * 64 * 1024 * 4 + t0 = time.time() + uart.write(payload) + read = uart.read(len(payload), timeout=15) + print("[UART] 4MB payload received, time spent: {:.2f} ms, accuracy test result: {}".format( + (time.time() - t0) * 1000, bytes(read) == payload)) + + # [VITAL] kill sub-thread(receiver thread) for safe exit + uart.kill() + + if __name__ == "__main__": i2c_demo() spi_demo() + uart_demo() diff --git a/setup.py b/setup.py index a0c16bd..5a19aa1 100644 --- a/setup.py +++ b/setup.py @@ -12,11 +12,11 @@ setuptools.setup( name="ch347api", - version="0.2.0", + version="0.3.0", author="I2cy Cloud", author_email="i2cy@outlook.com", - description="A Python Library provides full access of SPI/I2C settings and communication" - " with CH347 USB-SPI bridge chip in Python language.", + description="A Python Library provides full access of SPI/I2C/UART settings and communication" + " with CH347 USB-SPI/I2C/UART bridge chip in Python language.", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/i2cy/ch347-hidapi", diff --git a/test.py b/test_deprecated.py similarity index 100% rename from test.py rename to test_deprecated.py