From 1dc19b200dfc991927c36c6062ca3fb946bff094 Mon Sep 17 00:00:00 2001 From: Gabor Csapo Date: Thu, 30 Jun 2022 17:43:17 +0800 Subject: [PATCH] Add integration tests for the stm32h743xx HIC's UDB flavour UDB has a number of custom features that need to be tested before any UDB FW or HW release. ## How to run the tests? All test are stadalone with no additional hardware setup required. Run these quick tests before committing changes. Setup using: `python3 -m pip install -r "{path_to_test_folder}/requirements.txt"`. Then run: `python udb_test_main.py --test_bin_path {path-to-binary-running-on-device-that-you-want-test} --dummy_bin_path {path-to-binary-that-has-a-different-version-from-the-test-image} --serial_port_path {path-to-serial-port}` Example with full paths: `python source/hic_hal/stm32/stm32h743ii/extended_features/test/udb_test_main.py --test_bin_path ~/Downloads/test_0.12_local_stm32h743ii_udb_if_crc.bin --dummy_bin_path ~/Downloads/old_0.11_udb_stm32h743ii_if_crc.bin --serial_port_path /dev/serial/by-id/usb-Arm_DAPLink_CMSIS-DAP_00000081004400413330511331373438a5a5a5a597969940-if02` You can additionally add -d for detailed logs and --run-all to run tests that require special attention. ## How to add new tests? The test suite uses the standard python unittest library with the same standards. Add your tests to the test/udb_integration_test/tests folder. Each test needs to extend the TestCase class or a subclass of it. You can use any of the udb devices that are implemented to make it easier to talk to UDB through DAP, serial or the file system. Before submitting your changes run mypy on the tests for type checking. Some libaries might have errors, but the test suite should have none. Co-authored-by: Yangte Chen Co-authored-by: Eric Lee --- test/udb_integration_test/README | 12 ++ test/udb_integration_test/requirements.txt | 2 + .../tests/test_dap_commands.py | 26 +++ .../tests/test_shell_commands.py | 148 +++++++++++++++++ .../tests/test_software_update.py | 54 ++++++ .../tests/test_usb_stress.py | 35 ++++ test/udb_integration_test/udb_dap_device.py | 61 +++++++ .../udb_mass_storage_device.py | 85 ++++++++++ .../udb_integration_test/udb_serial_device.py | 157 ++++++++++++++++++ test/udb_integration_test/udb_test_helper.py | 81 +++++++++ test/udb_integration_test/udb_test_main.py | 44 +++++ 11 files changed, 705 insertions(+) create mode 100644 test/udb_integration_test/README create mode 100644 test/udb_integration_test/requirements.txt create mode 100644 test/udb_integration_test/tests/test_dap_commands.py create mode 100644 test/udb_integration_test/tests/test_shell_commands.py create mode 100644 test/udb_integration_test/tests/test_software_update.py create mode 100644 test/udb_integration_test/tests/test_usb_stress.py create mode 100644 test/udb_integration_test/udb_dap_device.py create mode 100644 test/udb_integration_test/udb_mass_storage_device.py create mode 100644 test/udb_integration_test/udb_serial_device.py create mode 100644 test/udb_integration_test/udb_test_helper.py create mode 100644 test/udb_integration_test/udb_test_main.py diff --git a/test/udb_integration_test/README b/test/udb_integration_test/README new file mode 100644 index 0000000000..1f0c65ed41 --- /dev/null +++ b/test/udb_integration_test/README @@ -0,0 +1,12 @@ +## How to run the tests? +All test are stadalone with no additional hardware setup required. Run these quick tests before committing changes. + +Setup using: `python3 -m pip install -r "{path_to_test_folder}/requirements.txt"`. +Then run: `python udb_test_main.py --test_bin_path {path-to-binary-running-on-device-that-you-want-test} --dummy_bin_path {path-to-binary-that-has-a-different-version-from-the-test-image} --serial_port_path {path-to-serial-port}` +Example with full paths: `python source/hic_hal/stm32/stm32h743ii/extended_features/test/udb_test_main.py --test_bin_path ~/Downloads/test_0.12_local_stm32h743ii_udb_if_crc.bin --dummy_bin_path ~/Downloads/old_0.11_udb_stm32h743ii_if_crc.bin --serial_port_path /dev/serial/by-id/usb-Arm_DAPLink*-if02` +You can additionally add -d for detailed logs and --run-all to run tests that require special attention. + +## How to add new tests? +The test suite uses the standard python unittest library with the same standards. Add your tests to the test/udb_integration_test/tests folder. Each test needs to extend the TestCase class or a subclass of it. You can use any of the udb devices that are implemented to make it easier to talk to UDB through DAP, serial or the file system. + +Before submitting your changes run mypy on the tests for type checking. Some libaries might have errors, but the test suite should have none. \ No newline at end of file diff --git a/test/udb_integration_test/requirements.txt b/test/udb_integration_test/requirements.txt new file mode 100644 index 0000000000..2a097b786b --- /dev/null +++ b/test/udb_integration_test/requirements.txt @@ -0,0 +1,2 @@ +pyOCD==0.34.* +pexpect diff --git a/test/udb_integration_test/tests/test_dap_commands.py b/test/udb_integration_test/tests/test_dap_commands.py new file mode 100644 index 0000000000..c401abc2bd --- /dev/null +++ b/test/udb_integration_test/tests/test_dap_commands.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +from typing import ClassVar, Generator +from udb_dap_device import UDBDapTestDevice +from udb_test_helper import ContextTest +from pyocd.probe.pydapaccess.cmsis_dap_core import Capabilities +import logging + +logger = logging.getLogger("test.udb_integration_test") + +class DAPCommandTest(ContextTest): + udb: UDBDapTestDevice + + def context(self) -> Generator: + with UDBDapTestDevice() as self.udb: + yield + + def test_dap_cmd_info_for_swd_capability(self) -> None: + # Verify the DAP info command returned the expected data when the pyocd device was + # initialized + self.assertTrue((self.udb.get_device_dap_capabilities() & Capabilities.SWD) != 0, + "No SWD capability returned in DAP info") + + def test_dap_vendor_command_version(self) -> None: + # Verify device responds to vendor commands + self.assertTrue((self.udb.get_udb_interface_version()[1:5] == "udb_", + "Wrong version returned")) diff --git a/test/udb_integration_test/tests/test_shell_commands.py b/test/udb_integration_test/tests/test_shell_commands.py new file mode 100644 index 0000000000..84589c4db0 --- /dev/null +++ b/test/udb_integration_test/tests/test_shell_commands.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +from udb_test_helper import UDBTestResources, ContextTest +from unittest import TestCase, skipUnless +from udb_serial_device import UDBSerialTestDevice, OopsError +from typing import Generator +import time +import re + +class ShellCommandTest(ContextTest): + udb_serial: UDBSerialTestDevice + + def context(self) -> Generator: + with UDBSerialTestDevice() as self.udb_serial: + yield + + def test_help(self) -> None: + output = self.udb_serial.command("help") + self.assertRegex(output, "show available commands") + + def test_gpio(self) -> None: + output = self.udb_serial.command("gpio read E 9") + self.assertRegex(output, "GPIO port E pin 9 is 0") + output = self.udb_serial.command("gpio pp_set E 9") + self.assertRegex(output, "GPIO port E pin 9 is 1") + output = self.udb_serial.command("gpio read E 9") + self.assertRegex(output, "GPIO port E pin 9 is 1") + output = self.udb_serial.command("gpio pp_clear E 9") + self.assertRegex(output, "GPIO port E pin 9 is 0") + output = self.udb_serial.command("gpio read E 9") + self.assertRegex(output, "GPIO port E pin 9 is 0") + + def test_pwm(self) -> None: + self.udb_serial.command("pwm start 50 60") + self.udb_serial.command("pwm stop") + + def test_version(self) -> None: + output = self.udb_serial.command("version") + self.assertRegex(output, "Interface ver: udb_(.*)_hw:([0-9]*)\r\nBootloader " \ + "ver: (.*)\r\n\r\nDONE 0\r") + + def test_adapter_type(self) -> None: + self.udb_serial.command("adapter_type") + + def test_i2c_probe(self) -> None: + output = self.udb_serial.command("i2c probe 2") + self.assertRegex(output, "probing...\r\n0x17\r\n0x50\r\n0x51\r\n0x52\r\n" \ + "0x53\r\n0x54\r\n0x55\r\n0x56\r\n0x57") + + def test_measure_power(self) -> None: + output = self.udb_serial.command("measure_power") + result = re.search("Target: Mainboard USB\r\n\tvoltage: ([0-9]*) mV\r\n\tcurrent: ([0-9]*) uA\r\n", output) + if result != None: + # needs this assert otherwise typing complains cause result is of type + # Optional[Match] + assert result is not None + self.assertLess(int(result.group(1)), 5150, "Voltage is unexpectedly large") + self.assertGreater(int(result.group(1)), 4850, "Voltage is unexpectedly small") + self.assertLess(int(result.group(2)), 150000, "Current is unexpectedly large") + self.assertGreater(int(result.group(2)), 113000, "Current is unexpectedly small") + else: + self.assertTrue(False, "Can't find expected output") + + def test_uptime(self) -> None: + test_seconds = 10 + + output = self.udb_serial.command("uptime") + result = re.search("([0-9]*) mins ([0-9]*) secs\r\n", output) + prev_secs = int(result.group(1)) * 60 + int(result.group(2)) + + time.sleep(test_seconds) + + output = self.udb_serial.command("uptime") + result = re.search("([0-9]*) mins ([0-9]*) secs\r\n", output) + secs = int(result.group(1)) * 60 + int(result.group(2)) + + # secs may wrap around in seconds + self.assertEqual((prev_secs + test_seconds) % 3600, secs, "uptime is not accurate") + + def test_ext_relay(self) -> None: + self.udb_serial.command("ext_relay on") + output = self.udb_serial.command("ext_relay status") + self.assertRegex(output, "external relay is on") + self.udb_serial.command("ext_relay off") + output = self.udb_serial.command("ext_relay status") + self.assertRegex(output, "external relay is off") + + def test_swd_dut(self) -> None: + self.udb_serial.command("swd_dut 0") + output = self.udb_serial.command("swd_dut") + self.assertRegex(output, "DUT 0") + self.udb_serial.command("swd_dut 1") + output = self.udb_serial.command("swd_dut") + self.assertRegex(output, "DUT 1") + + def test_btn(self) -> None: + for btn in ['RST0_L', 'BOOT0_L', 'BTN0_L', 'RST1', 'BOOT1', 'BTN1']: + self.udb_serial.command("btn {btn_name} press".format(btn_name=btn)) + self.udb_serial.command("btn {btn_name} release".format(btn_name=btn)) + self.udb_serial.command("btn {btn_name} tap".format(btn_name=btn)) + +class ShellCommandWithResetTest(TestCase): + def test_reset(self) -> None: + with UDBSerialTestDevice() as udb_serial: + try: + output = udb_serial.command("reset") + self.assertTrue(False, f"Expected UDB to reset, but it didn't. Serial " \ + f"output: {output}") + except OSError: + pass + with UDBSerialTestDevice() as udb_serial: + self.assertLess(udb_serial.get_time_to_open(), + UDBTestResources.get_expected_boot_timedelta(), + msg="Regression in boot time") + + @skipUnless(UDBTestResources.should_run_all_tests(), + "this test runs only with the --run-all flag and you have to diconnect your " \ + "debugger from UDB otherwise the assert will halt UDB") + def test_watchdog(self) -> None: + with UDBSerialTestDevice() as udb_serial: + try: + output = udb_serial.command("fault test_watchdog") + time.sleep(10) + self.assertTrue(False, f"Expected UDB to reset by watchdog, but it didn't. Serial " \ + f"output: {output}") + except OSError: + pass + with UDBSerialTestDevice() as udb_serial: + self.assertLess(udb_serial.get_time_to_open(), + UDBTestResources.get_expected_boot_timedelta(), + msg="Regression in boot time") + + @skipUnless(UDBTestResources.should_run_all_tests(), + "this test runs only with the --run-all flag and you have to diconnect your " \ + "debugger from UDB otherwise the assert will halt UDB") + def test_assert(self) -> None: + with UDBSerialTestDevice() as udb_serial: + try: + output = udb_serial.command("fault test_assert") + self.assertTrue(False, "Expected UDB to reset, but it didn't. Please make sure " \ + "there is no debugger connected to UDB, because then the " \ + "assert will cause UDB to halt! Serial output: " \ + f"{output}") + except (OopsError, OSError): + pass + with UDBSerialTestDevice() as udb_serial: + self.assertLess(udb_serial.get_time_to_open(), + UDBTestResources.get_expected_boot_timedelta(), + msg="Regression in boot time") diff --git a/test/udb_integration_test/tests/test_software_update.py b/test/udb_integration_test/tests/test_software_update.py new file mode 100644 index 0000000000..1a3de39943 --- /dev/null +++ b/test/udb_integration_test/tests/test_software_update.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +from udb_serial_device import UDBSerialTestDevice +from udb_dap_device import UDBDapTestDevice +from datetime import datetime +from udb_mass_storage_device import UDBMassStorageDevice +from unittest import TestCase +from udb_test_helper import UDBTestResources, indent_string +import logging + +logger = logging.getLogger("test.udb_integration_test") + +class SoftwareUpdateTest(TestCase): + def test_swu(self) -> None: + with UDBDapTestDevice() as udb_dap: + version_1 = udb_dap.get_udb_interface_version() + logger.info("\n" + indent_string(f"Version before test: {version_1}")) + with UDBSerialTestDevice() as udb: + logger.info(indent_string("Resetting into SWU mode...")) + udb.command_no_wait("\nreset_into_swu_mode") + with UDBMassStorageDevice() as udb: + start = datetime.now() + logger.info(indent_string("Copying dummy binary with different version...")) + udb.copy_firmware(UDBTestResources.get_path_to_binary_with_diff_version()) + with UDBSerialTestDevice() as udb: + self.assertLess(udb.get_time_to_open(), + UDBTestResources.get_expected_boot_timedelta(), + msg="Regression in boot time") + with UDBDapTestDevice() as udb_dap: + version_2 = udb_dap.get_udb_interface_version() + swu_time_taken = datetime.now() - start + logger.info(indent_string(f"Version after update:{version_2}")) + with UDBSerialTestDevice() as udb: + logger.info(indent_string("Resetting into SWU mode...")) + udb.command_no_wait("\nreset_into_swu_mode") + with UDBMassStorageDevice() as udb: + logger.info(indent_string("Copying back the original test binary...")) + udb.copy_firmware(UDBTestResources.get_path_to_current_binary()) + with UDBSerialTestDevice() as udb: + self.assertLess(udb.get_time_to_open(), + UDBTestResources.get_expected_boot_timedelta(), + msg="Regression in boot time") + with UDBDapTestDevice() as udb_dap: + version_3 = udb_dap.get_udb_interface_version() + logger.info(indent_string(f"Version after the test:{version_3}")) + + logger.info(indent_string(f"The software update test took {swu_time_taken.seconds}s")) + + expected_swu_time_sec = 30 + self.assertLess(swu_time_taken.seconds, expected_swu_time_sec, f"SWU took too long") + self.assertEqual(version_1, version_3, "The firmware version is not the same after the " \ + "test as before the test. You probably provide the wrong binary.") + self.assertNotEqual(version_1, version_2, "The version after the test software update " \ + "is the same as before, SWU probably failed or the wrong dummy " \ + "binary was provided.") diff --git a/test/udb_integration_test/tests/test_usb_stress.py b/test/udb_integration_test/tests/test_usb_stress.py new file mode 100644 index 0000000000..26c778740a --- /dev/null +++ b/test/udb_integration_test/tests/test_usb_stress.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +from typing import Generator +from udb_serial_device import UDBSerialTestDevice +from udb_dap_device import UDBDapTestDevice +from datetime import datetime +from unittest import skipUnless +import logging +from udb_test_helper import UDBTestResources, ContextTest, indent_string + +logger = logging.getLogger("test.udb_integration_test") + +class USBStressTest(ContextTest): + udb_dap: UDBDapTestDevice + udb_serial: UDBSerialTestDevice + + def context(self) -> Generator: + with UDBSerialTestDevice(baudrate=3000000) as self.udb_serial: + with UDBDapTestDevice() as self.udb_dap: + yield + + def test_usb(self) -> None: + start = datetime.now() + count = 0 + while True: + self.udb_serial.command_no_wait("TEST1TEST2TEST3TEST4TEST5TEST6TEST7TEST8TEST9TEST" \ + "TEST1TEST2TEST3TEST4TEST5TEST6TEST7TEST8TEST9TEST") + self.assertEqual(self.udb_dap.get_udb_interface_version()[1:5], + "udb_", "DAP commandreplies are bad") + count += 1 + if (datetime.now() - start).seconds > 15: + break + self.udb_serial.flush() + output = self.udb_serial.command("help") + self.assertRegex(output, "show available commands") + logger.info("\n" + indent_string(f"Read and wrote {count} times...")) diff --git a/test/udb_integration_test/udb_dap_device.py b/test/udb_integration_test/udb_dap_device.py new file mode 100644 index 0000000000..9a334721d5 --- /dev/null +++ b/test/udb_integration_test/udb_dap_device.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +from typing import ClassVar +from pyocd.probe import pydapaccess +from datetime import datetime, timedelta +import logging +from udb_test_helper import UDBTestResources + +logger = logging.getLogger("test.udb_integration_test") + +class DapDeviceError(Exception): + pass + +class UDBDapTestDevice: + timeout: ClassVar[timedelta] = timedelta(seconds=0.5) + + def __enter__(self): + self.device = None + unique_id = None + start = datetime.now() + while unique_id == None: + for dev in pydapaccess.DAPAccess.get_connected_devices(): + if dev.product_name[0:7] == "DAPLink": + if unique_id is None: + unique_id = dev.get_unique_id() + else: + logger.warning("WARNING: multiple DAPLinks are connected! Picking the" \ + " first one...") + if datetime.now() - start > UDBTestResources.get_expected_boot_timedelta(): + raise DapDeviceError("Timeout: Can't find any DAPLink device connected") + + self.device = pydapaccess.DAPAccess.get_device(unique_id) + if self.device is None: + raise DapDeviceError("Can't get DAPLink device object") + + self.device.open() + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.device.close() + + def run_dap_test(self, + command_number: int, + send_data: list[int], + expected_data: list[int]=None, + expected_data_bit_mask: list[int]=None) -> list: + result = self.device.vendor(command_number, send_data) + logger.debug("Result: ", result) + if expected_data and expected_data_bit_mask: + # Not yet implemented + assert False + return result + + def get_device_dap_capabilities(self) -> int: + return self.device._capabilities + + def get_udb_interface_version(self) -> str: + vendor_cmd_id_get_version = 36 + result = self.device.vendor(vendor_cmd_id_get_version, [0]) + version = "".join(map(chr, result)) + logger.debug("".join(map(chr, result))) + return version diff --git a/test/udb_integration_test/udb_mass_storage_device.py b/test/udb_integration_test/udb_mass_storage_device.py new file mode 100644 index 0000000000..aadeb6d9e5 --- /dev/null +++ b/test/udb_integration_test/udb_mass_storage_device.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +import subprocess +from datetime import datetime +from time import sleep +import os +import logging +from udb_test_helper import UDBTestResources, indent_string + +logger = logging.getLogger("test.udb_integration_test") + +class UDBMassStorageError(Exception): + pass + +class UDBMassStorageDevice: + temp_folder_name: str = "./temp_udb_drive" + + def __enter__(self): + start = datetime.now() + self.path_to_udb_drive = None + self.tried_mounting = False + drive_name_to_search = "DAPLINK" + logger.info("\tWaiting for DAPLINK_BL drive to automount...") + while self.path_to_udb_drive == None: + output = self.run_cmd_and_find_string("mount", drive_name_to_search) + if output: + self.path_to_udb_drive = output[2] + search_dur_s = (datetime.now() - start).seconds + logger.info(f"\tFound UDB drive at {self.path_to_udb_drive} after {search_dur_s}s") + break + if datetime.now() - start > UDBTestResources.get_expected_boot_timedelta(): + if not self.tried_mounting: + self.tried_mounting = True + logger.info("\tAutomount failed, mounting manually") + drive_name_to_search = self.mount_udb_device() + else: + raise UDBMassStorageError("Timeout: can't find UDB drive") + sleep(0.1) + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + if self.tried_mounting: + self.run_shell_command(f"sudo umount {self.temp_folder_name}") + sleep(0.1) + self.run_shell_command(f"rm -rf {self.temp_folder_name}") + + def run_shell_command(self, cmd: str, secs_before_kill: float =0.1) -> str: + proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) + try: + proc_out, err = proc.communicate(timeout=secs_before_kill) + proc.kill() + except subprocess.TimeoutExpired: + proc.kill() + proc_out, err = proc.communicate() + + if err is not None: + logger.error(str(proc_out[:60]) + "...") + raise UDBMassStorageError(f"Error running command \"{cmd}\": ", err) + elif len(proc_out) > 0: + logger.debug(indent_string(str(proc_out[:120]) + "...")) + return proc_out.decode("utf-8") + + def run_cmd_and_find_string(self, command: str, string: str) -> list[str]: + out = self.run_shell_command(command) + drive_list = out.split("\n") + for drive in drive_list: + if string in drive: + return drive.split(" ") + return [] + + def mount_udb_device(self) -> str: + path_to_udb_device = self.run_cmd_and_find_string('lsblk -o LABEL,PATH', "DAPLINK")[1] + if path_to_udb_device is None: + raise UDBMassStorageError("Can't find unmounted UDB mass storage devices") + logger.warning(f"\tRunning \"sudo mount\" to mount UDB mass storage device " \ + f"{path_to_udb_device} to {self.temp_folder_name}") + self.run_shell_command(f"mkdir {self.temp_folder_name}") + + self.run_shell_command(f"sudo mount {path_to_udb_device} {self.temp_folder_name} " \ + f"--options nosuid,nodev,flush,showexec,uid={os.getuid()}," \ + f"gid={os.getgid()}", 25) + + return path_to_udb_device + + def copy_firmware(self, path_to_binary: str) -> None: + self.run_shell_command(f"cp {path_to_binary} {self.path_to_udb_drive}", 5) diff --git a/test/udb_integration_test/udb_serial_device.py b/test/udb_integration_test/udb_serial_device.py new file mode 100644 index 0000000000..d948775a66 --- /dev/null +++ b/test/udb_integration_test/udb_serial_device.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python +import pexpect +from psutil import boot_time +from serial import Serial, SerialException +from pexpect.spawnbase import SpawnBase +import sys +import re +from datetime import datetime, timedelta + +from udb_test_helper import UDBTestResources + +class WatchdogError(Exception): + pass + +class OopsError(Exception): + pass + +class UDBSerialTestDeviceError(Exception): + pass + +class SerialExpect(SpawnBase): + ''' Implementation of pexpect's SpawnBase for use with pyserial ''' + + def __init__(self, ser: Serial, **kwargs) -> None: + SpawnBase.__init__(self, **kwargs) + + self.closed = False + self.name = ser.name + self.ser = ser + + def close(self) -> None: + self.flush() + self.ser.close() + + def isalive(self) -> bool: + return self.ser.is_open + + def send(self, s: str) -> int: + s = self._coerce_send_string(s) + self._log(s, 'send') + + s = self._encoder.encode(s, final=False) + return self.ser.write(s) + + def sendline(self, s: str) -> int: + s = self._coerce_send_string(s) + return self.send(s + self.linesep) + + def write(self, s: str) -> None: + self.send(s) + + def writelines(self, sequence: list[str]) -> None: + for s in sequence: + self.write(s) + + def read_nonblocking(self, size: int=1, timeout: float=-1) -> str: + if timeout == -1: + timeout = self.timeout + + to_read = min(self.ser.in_waiting, size) + data = self.ser.read(to_read) + data = self._decoder.decode(data, final=False) + self._log(data, 'read') + return data + +class UDBSerialTestDevice: + ser: Serial + dev: SerialExpect + cached_output: str + time_to_open: timedelta + + def __init__(self, baudrate: int=1000000, debug: bool=False) -> None: + start = datetime.now() + while True: + try: + self.ser = Serial(UDBTestResources.get_serial_port_path(), baudrate) + break + except (SerialException, FileNotFoundError, UDBSerialTestDeviceError, OSError) as e: + if datetime.now() - start > UDBTestResources.get_expected_boot_timedelta() * 2: + raise UDBSerialTestDeviceError("Timeout and error while trying to open " \ + "serial port:", e) + continue + self.time_to_open = datetime.now() - start + self.dev = SerialExpect(self.ser) + + # Log data coming in to stdout + if debug: + self.dev.logfile = sys.stdout.buffer + + def __enter__(self): + self.flush() + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.dev.close() + + def get_time_to_open(self): + return self.time_to_open + + def wait_for(self, string: str, timeout: float=10) -> str: + idx = self.dev.expect([ + # 0, EOF + pexpect.EOF, + # 1, TIMEOUT + pexpect.TIMEOUT, + # 2, watchdog happened + '!!wdog!!', + # 3, Oops occurred + 'Oops!', + # 4, got expected string. No errors! + string], + timeout=timeout) + + output = self.dev.before.decode('utf-8') + + if idx == 0: + raise EOFError(output) + elif idx == 1: + raise TimeoutError(output) + elif idx == 2: + raise WatchdogError(output) + elif idx == 3: + raise OopsError(output) + else: + match = re.search('DONE ([0-9-]*)\r', output) + if match: + code = int(match.group(1)) + if code != 0: + raise ValueError + return output + + def wait_for_prompt(self, timeout: float=10) -> str: + return self.wait_for('\n> ', timeout=timeout) + + def command(self, cmd: str, timeout: float=10) -> str: + self.command_no_wait(cmd) + return self.wait_for_prompt(timeout=timeout) + + def commands(self, cmds: str, timeout: float=10) -> list[str]: + return [self.command(cmd, timeout) for cmd in cmds] + + def command_no_wait(self, cmd: str) -> None: + self.dev.send(cmd + '\n') + + def update_baud_rate(self, baud: int) -> None: + self.ser.baudrate = baud + + def flush(self) -> None: + self.ser.flush() + if sys.platform.lower() != 'darwin': + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + try: + while(True): + self.wait_for_prompt(0.1) + except Exception: + pass diff --git a/test/udb_integration_test/udb_test_helper.py b/test/udb_integration_test/udb_test_helper.py new file mode 100644 index 0000000000..a1e30c0d87 --- /dev/null +++ b/test/udb_integration_test/udb_test_helper.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +from typing import ClassVar, Generator, Optional +from datetime import timedelta +from unittest import TestCase +import logging + +logger = logging.getLogger("test.udb_integration_test") + +def indent_string(string: str) -> str: + return '\t' + '\t'.join(string.splitlines(True)) + +class ContextTest(TestCase): + __context: Generator + + # A unit test where setUp/tearDown are combined into a single generator to make sure + # closing happens even if an exception is raised during the test and to minimize the amount of + # code needed + def context(self) -> Generator: + # Put both setUp and tearDown code in this generator method with a single `yield` + # between + yield + + def setUp(self) -> None: + self.__context = self.context() + next(self.__context) + + def tearDown(self) -> None: + for _ in self.__context: + raise RuntimeError("Context method should only yield once") + +class UDBTestResources: + serial_port_path: ClassVar[Optional[str]] = None + path_to_binary_with_diff_version: ClassVar[Optional[str]] = None + path_to_current_binary: ClassVar[Optional[str]] = None + boot_timedelta: ClassVar[timedelta] = timedelta(seconds=20) + run_all_tests: ClassVar[bool] = False + + @classmethod + def set_serial_port_path(cls, + serial_port_path: str) -> None: + cls.serial_port_path = serial_port_path + + @classmethod + def get_serial_port_path(cls) -> Optional[str]: + if cls.serial_port_path is None: + raise Exception("Error: serial port path needs to be set before trying to access it") + return cls.serial_port_path + + @classmethod + def set_binary_paths(cls, path_to_current_binary: str, + path_to_binary_with_diff_version: str) -> None: + cls.path_to_current_binary = path_to_current_binary + cls.path_to_binary_with_diff_version = path_to_binary_with_diff_version + + @classmethod + def get_path_to_binary_with_diff_version(cls) -> Optional[str]: + if cls.path_to_binary_with_diff_version is None: + raise Exception("Error: path_to_binary_with_diff_version needs to be set before trying to access it") + return cls.path_to_binary_with_diff_version + + @classmethod + def get_path_to_current_binary(cls) -> Optional[str]: + if cls.path_to_current_binary is None: + raise Exception("Error: path_to_current_binary needs to be set before trying to access it") + return cls.path_to_current_binary + + @classmethod + def set_expected_boot_timedelta(cls, timeout: timedelta) -> None: + cls.boot_timedelta = timeout + + @classmethod + def get_expected_boot_timedelta(cls) -> timedelta: + return cls.boot_timedelta + + @classmethod + def set_should_run_all_tests(cls, run_all_tests: bool) -> None: + cls.run_all_tests = run_all_tests + + @classmethod + def should_run_all_tests(cls) -> bool: + return cls.run_all_tests diff --git a/test/udb_integration_test/udb_test_main.py b/test/udb_integration_test/udb_test_main.py new file mode 100644 index 0000000000..02aacafd72 --- /dev/null +++ b/test/udb_integration_test/udb_test_main.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +from datetime import timedelta +from udb_test_helper import UDBTestResources +from argparse import ArgumentParser +import unittest +import logging +import os + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("--serial_port_path", dest="serial_port_path", required=True, + help="Path to the UDB debug console serial port") + parser.add_argument("--test_bin_path", dest="test_bin", required=True, + help="Path to the binary you are testing") + parser.add_argument("--dummy_bin_path", dest="dummy_bin", required=True, + help="Path to a dummy binary that has a different version from the " \ + "binary under test") + parser.add_argument("-d", "--debug", + action="store_true", dest="debug", default=False, + help="don't print logs") + parser.add_argument("-a", "--run-all", + action="store_true", dest="run_all", default=False, + help="With this flag, the script won't skip less convinient tests such " \ + "as the assert test, which requires the debugger connected to UDB " \ + "to be detached.") + args = parser.parse_args() + + logging.basicConfig(format='%(message)s') + logger = logging.getLogger("test.udb_integration_test") + if args.debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + max_allowed_boot_time = timedelta(seconds=13) + + UDBTestResources.set_binary_paths(args.test_bin, args.dummy_bin) + UDBTestResources.set_should_run_all_tests(args.run_all) + UDBTestResources.set_expected_boot_timedelta(max_allowed_boot_time) + UDBTestResources.set_serial_port_path(args.serial_port_path) + + suite = unittest.TestLoader().discover(os.path.dirname(os.path.realpath(__file__))+"/tests", + pattern = "*") + unittest.TextTestRunner(verbosity=2, failfast=True).run(suite)