From 236f1dd85582b82279ca5e89c6b95a6e334a3615 Mon Sep 17 00:00:00 2001 From: NikhitaR-IFX Date: Mon, 6 May 2024 13:28:25 +0530 Subject: [PATCH] ports/psoc6: SPI refactoring and tests. Signed-off-by: NikhitaR-IFX --- ports/psoc6/machine_spi.c | 15 +++-- ports/psoc6/mpconfigport.h | 2 + tests/psoc6/hw_ext/spi_master.py | 111 +++++++++++++++++++------------ tests/psoc6/hw_ext/spi_slave.py | 99 ++++++++++++++++++--------- 4 files changed, 146 insertions(+), 81 deletions(-) diff --git a/ports/psoc6/machine_spi.c b/ports/psoc6/machine_spi.c index 01ae490bbf25a..70d36efacfdf6 100644 --- a/ports/psoc6/machine_spi.c +++ b/ports/psoc6/machine_spi.c @@ -291,8 +291,6 @@ static void machine_spi_transfer(mp_obj_base_t *self_in, size_t len, const uint8 tx_buf = tx; memset(rx_temp_buf, 0x01, len * sizeof(uint8_t)); rx_buf = rx_temp_buf; - // result = cyhal_spi_transfer(&self->spi_obj, tx, len, rx_temp_buf, len, write_fill); - // spi_assert_raise_val("SPI transfer failed with return code %lx !", result); } // Case 2: tx and rx equal --> read(), readinto() and write_readinto() with tx and rx same buffers else { @@ -301,8 +299,6 @@ static void machine_spi_transfer(mp_obj_base_t *self_in, size_t len, const uint8 tx_buf = tx_temp_buf; rx_buf = rx; write_fill = tx_temp_buf[0]; - // result = cyhal_spi_transfer(&self->spi_obj, tx, len, rx, len, write_fill); - // spi_assert_raise_val("SPI read failed with return code %lx !", result); } else { tx_buf = tx; rx_buf = rx; @@ -348,13 +344,14 @@ static mp_obj_t machine_spi_slave_deinit(mp_obj_t self_in) { MP_DEFINE_CONST_FUN_OBJ_1(machine_spi_slave_deinit_obj, machine_spi_slave_deinit); static mp_obj_t machine_spi_slave_read(mp_obj_t self_in, mp_obj_t buf_in) { + cy_rslt_t result; machine_spi_obj_t *self = MP_OBJ_TO_PTR(self_in); mp_buffer_info_t bufinfo; mp_get_buffer_raise(buf_in, &bufinfo, MP_BUFFER_WRITE); uint16_t len = bufinfo.len; uint8_t tx_dummy[len]; memset(tx_dummy, 0x02, len * sizeof(uint8_t)); - cy_rslt_t result = cyhal_spi_transfer(&self->spi_obj, tx_dummy, len, bufinfo.buf, len, 0xFF); + result = cyhal_spi_transfer(&self->spi_obj, tx_dummy, len, bufinfo.buf, len, 0xFF); spi_assert_raise_val("SPI slave read failed with return code %lx !", result); return mp_const_none; @@ -362,13 +359,14 @@ static mp_obj_t machine_spi_slave_read(mp_obj_t self_in, mp_obj_t buf_in) { static MP_DEFINE_CONST_FUN_OBJ_2(machine_spi_slave_read_obj, machine_spi_slave_read); static mp_obj_t machine_spi_slave_write(mp_obj_t self_in, mp_obj_t buf_in) { + cy_rslt_t result; machine_spi_obj_t *self = MP_OBJ_TO_PTR(self_in); mp_buffer_info_t bufinfo; mp_get_buffer_raise(buf_in, &bufinfo, MP_BUFFER_READ); uint16_t len = bufinfo.len; uint8_t rx_dummy[len]; memset(rx_dummy, 0x01, len * sizeof(uint8_t)); - cy_rslt_t result = cyhal_spi_transfer(&self->spi_obj, bufinfo.buf, len, rx_dummy, len, 0xFF); + result = cyhal_spi_transfer(&self->spi_obj, bufinfo.buf, len, rx_dummy, len, 0xFF); spi_assert_raise_val("SPI slave write failed with return code %lx !", result); return mp_const_none; } @@ -379,7 +377,7 @@ static mp_obj_t machine_spi_slave_write_readinto(mp_obj_t self_in, mp_obj_t tx_b mp_buffer_info_t tx_bufinfo; mp_buffer_info_t rx_bufinfo; mp_get_buffer_raise(tx_buf_in, &tx_bufinfo, MP_BUFFER_READ); - mp_get_buffer_raise(rx_buf_in, &rx_bufinfo, MP_BUFFER_READ); + mp_get_buffer_raise(rx_buf_in, &rx_bufinfo, MP_BUFFER_WRITE); uint16_t len = tx_bufinfo.len; if (tx_bufinfo.len != rx_bufinfo.len) { mp_raise_ValueError(MP_ERROR_TEXT("buffers must be the same length")); @@ -397,6 +395,9 @@ static const mp_rom_map_elem_t machine_spi_slave_locals_dict_table[] = { { MP_ROM_QSTR(MP_QSTR_write), MP_ROM_PTR(&machine_spi_slave_write_obj) }, { MP_ROM_QSTR(MP_QSTR_write_readinto), MP_ROM_PTR(&machine_spi_slave_write_readinto_obj) }, { MP_ROM_QSTR(MP_QSTR_deinit), MP_ROM_PTR(&machine_spi_slave_deinit_obj) }, + + { MP_ROM_QSTR(MP_QSTR_MSB), MP_ROM_INT(MICROPY_PY_MACHINE_SPISLAVE_MSB) }, + { MP_ROM_QSTR(MP_QSTR_LSB), MP_ROM_INT(MICROPY_PY_MACHINE_SPISLAVE_LSB) }, }; static MP_DEFINE_CONST_DICT(machine_spi_slave_locals_dict, machine_spi_slave_locals_dict_table); diff --git a/ports/psoc6/mpconfigport.h b/ports/psoc6/mpconfigport.h index f06130c917121..322cf87d9de26 100644 --- a/ports/psoc6/mpconfigport.h +++ b/ports/psoc6/mpconfigport.h @@ -128,6 +128,8 @@ #define MICROPY_PY_MACHINE_SPI_SLAVE (1) #define MICROPY_PY_MACHINE_SPI_MSB (0) #define MICROPY_PY_MACHINE_SPI_LSB (1) +#define MICROPY_PY_MACHINE_SPISLAVE_MSB (0) +#define MICROPY_PY_MACHINE_SPISLAVE_LSB (1) #define MICROPY_PY_MACHINE_SOFTSPI (1) #define MICROPY_PY_MACHINE_PSOC6_I2S (1) diff --git a/tests/psoc6/hw_ext/spi_master.py b/tests/psoc6/hw_ext/spi_master.py index 1bfdd67378891..f0d5ee73db91e 100644 --- a/tests/psoc6/hw_ext/spi_master.py +++ b/tests/psoc6/hw_ext/spi_master.py @@ -3,19 +3,45 @@ import os try: - from machine import SPI, SPISlave + from machine import Pin, SPI, SPISlave except ImportError: print("SKIP") raise SystemExit -# Allocate pin based on board -sck_master_pin = "P6_2" -mosi_master_pin = "P6_0" -miso_master_pin = "P6_1" -ssel_master_pin = "P6_3" +machine = os.uname().machine +if "CY8CPROTO-062-4343W" in machine: + master_write_notify_pin = "P12_4" # Sends signals when master wants to write data + master_read_notify_pin = "P12_3" # Polls pin to check if slave is writing data + # Allocate pin based on board + sck_master_pin = "P6_2" + mosi_master_pin = "P6_0" + miso_master_pin = "P6_1" + ssel_master_pin = "P6_3" -success = b"1" -failed = b"2" +signal_received = False + + +def signal_irq(arg): + global signal_received + signal_received = True + + +pin_out = Pin(master_write_notify_pin, mode=Pin.OUT, value=True) +pin_in = Pin(master_read_notify_pin, Pin.IN) +pin_in.irq(handler=signal_irq, trigger=Pin.IRQ_RISING) + + +def _master_ready_to_write(): + pin_out.value(0) + time.sleep_ms(10) + pin_out.value(1) + + +def _wait_for_slave_signal(): + global signal_received + while not signal_received: + pass + signal_received = False def spi_master_configure(): @@ -30,7 +56,6 @@ def spi_master_configure(): mosi=mosi_master_pin, miso=miso_master_pin, ) - print("\n", spi_obj) return spi_obj @@ -44,54 +69,54 @@ def _verify_test(test_case, actual_data, exp_data): def spi_half_duplex_communication(spi_obj, tx, rx): print("\n*** Half duplex communication ***") - status = bytearray(1) - print("1) master-->write and slave-->read") + print("\nTest Case 1: master-->write and slave-->read") + _master_ready_to_write() spi_obj.write(tx) - print("tx: ", tx) - status = spi_obj.read(1) - _verify_test("master write", status, success) - - print("2) slave-->write and master-->read using readinto()") + print("Master wrote (tx): \t", tx) + _wait_for_slave_signal() + rx = spi_obj.read(8) + print("Slave returned (rx): \t", rx) + # print("Status: ", spi_obj.read(8)) + print("Test case successful : \t", rx == tx) + + print("\nTest Case 2: slave-->write and master-->read using readinto()") + rx = bytearray(8) + _wait_for_slave_signal() spi_obj.readinto(rx) - _verify_test("master read ", rx, tx) + exp = b"\x01\x03\x05\x07\x02\x04\x06\x08" + print("Slave wrote (rx): \t", rx) + print("Master expects (exp): \t", exp) + print("Test case successful : \t", rx == exp) - print("3) slave-->write and master-->read using readinto by sending out a write_byte=0x12") - spi_obj.readinto(rx, 0x12) - _verify_test("master read ", rx, tx) - status = spi_obj.read(1) - _verify_test("write_byte received by slave correctly", status, success) + print( + "\nTest Case 3: slave-->write and master-->read using readinto by sending out a write_byte=0x12" + ) + exp = b"\x08\x06\x04\x02\x08\x06\x04\x02" + spi_obj.readinto(rx, 5) + # _wait_for_slave_signal() + print("Slave wrote (rx): \t", rx) + print("Master expects (exp): \t", exp) + print("Test case successful : \t", rx == exp) def spi_full_duplex_communication(spi_obj, tx, rx): print("*** Full duplex communication ***") - exp_rx = b"\x06\x06\x05\x05" + exp_rx = b"\x06\x06\x05\x05\x06\x06\x05\x05" print("1) master-->write and slave-->read continuously") spi_obj.write_readinto(tx, rx) - _verify_test("read value is same as expected", rx, exp_rx) - - -"""def spi_master_read_verify(spi_obj,tx,rx): - rx = spi_obj.read(len(tx)) print(rx) - rx = spi_obj.read(len(tx),0x11) - print(rx) - print("SPI master write followed by read is successful: ", rx==tx) + # _verify_test("read value is same as expected", rx, exp_rx) -def spi_master_readinto_verify(spi_obj,tx,rx): - spi_obj.write(tx) - print("SPI master write completed") - rx = spi_obj.read(len(tx)) - print(rx) - print("SPI master write followed by read is successful: ", rx==tx)""" -print("*** SPI MASTER INSTANCE ***") +print("\n*** SPI MASTER INSTANCE ***") spi_obj = spi_master_configure() -tx_buf = b"\x08\x01\x08\x02" -rx_buf = bytearray(4) +# tx_buf = b"\x08" +tx_buf = b"\x08\x06\x04\x02\x07\x05\x03\x01" +rx_buf = bytearray(8) spi_half_duplex_communication(spi_obj, tx_buf, rx_buf) -"""tx_buf = b"\x06\x08\x05\x07" -rx_buf = bytearray(4) -spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)""" +tx_buf = b"\x08\x06\x04\x02\x07\x05\x03\x01" +rx_buf = bytearray(8) +spi_full_duplex_communication(spi_obj, tx_buf, rx_buf) diff --git a/tests/psoc6/hw_ext/spi_slave.py b/tests/psoc6/hw_ext/spi_slave.py index 3fd4b022dcfe4..8d39589f4bb6f 100644 --- a/tests/psoc6/hw_ext/spi_slave.py +++ b/tests/psoc6/hw_ext/spi_slave.py @@ -3,18 +3,45 @@ import os try: - from machine import SPI, SPISlave + from machine import Pin, SPI, SPISlave except ImportError: print("SKIP") raise SystemExit -sck_slave_pin = "P13_2" -mosi_slave_pin = "P13_0" -miso_slave_pin = "P13_1" -ssel_slave_pin = "P13_3" + # Allocate pin based on board +machine = os.uname().machine +if "CY8CPROTO-062-4343W" in machine: + slave_write_notify_pin = "P12_3" # Sends signals when master wants to write data + slave_read_notify_pin = "P12_4" # Polls pin to check if slave is writing data + sck_slave_pin = "P13_2" + mosi_slave_pin = "P13_0" + miso_slave_pin = "P13_1" + ssel_slave_pin = "P13_3" -success = b"1" -failed = b"2" +signal_received = False + + +def signal_irq(arg): + global signal_received + signal_received = True + + +pin_out = Pin(slave_write_notify_pin, mode=Pin.OUT, value=True) +pin_in = Pin(slave_read_notify_pin, Pin.IN) +pin_in.irq(handler=signal_irq, trigger=Pin.IRQ_RISING) + + +def _slave_ready_to_write(): + pin_out.value(0) + time.sleep_ms(10) + pin_out.value(1) + + +def _wait_for_master_signal(): + global signal_received + while not signal_received: + pass + signal_received = False def spi_slave_configure(): @@ -23,64 +50,74 @@ def spi_slave_configure(): polarity=0, phase=0, bits=8, - firstbit=SPI.MSB, + firstbit=SPISlave.MSB, ssel=ssel_slave_pin, sck=sck_slave_pin, mosi=mosi_slave_pin, miso=miso_slave_pin, ) - print("\n", spi_obj) return spi_obj def _verify_test(rx, exp_data): if rx == exp_data: print("Successful") - spi_obj.write(success) + spi_obj.write(rx) else: print("Failed") spi_obj.write(failed) def spi_half_duplex_communication(spi_obj, tx, rx): - print("\n***Half duplex communication ***") - write_byte = "0x12" - write_byte_status = bytearray(1) - print("1) master-->write and slave-->read") + print("\n*** Half duplex communication ***") + write_byte = bytearray(1) + # 1) master-->write and slave-->read + # print("\n1) master-->write and slave-->read") + _wait_for_master_signal() spi_obj.read(rx) - print("rx: ", rx) - exp_data = tx - _verify_test(rx, exp_data) + + # print("rx: ", rx) + # print("slave read successful : ", rx==tx) + _slave_ready_to_write() + spi_obj.write(rx) # 2) slave-->write and master-->read" - print("2) slave-->write and master-->read using readinto()") - spi_obj.write(tx) + # print("\n2) slave-->write and master-->read using readinto()") + tx_buf = b"\x01\x03\x05\x07\x02\x04\x06\x08" + _slave_ready_to_write() + spi_obj.write(tx_buf) # 3) slave-->write and master-->read by sending a write_byte=0x12" - print("3) slave-->write and master-->read using readinto by sending out a write_byte=0x12") + print("\n3) slave-->write and master-->read using readinto by sending out a write_byte=0x12") + tx = b"\x08\x06\x04\x02\x08\x06\x04\x02" + spi_obj.read(write_byte) + print("write_byte: ", write_byte) spi_obj.write(tx) - spi_obj.read(write_byte_status) - _verify_test(write_byte_status, b"0x12") + print("OUT") + + # _slave_ready_to_write() def spi_full_duplex_communication(spi_obj, tx, rx): print("*** Full duplex communication ***") - exp_rx = b"\x06\x08\x05\x07" - print("1) master-->write and slave-->read continuously") - spi_obj.write_readinto(tx, rx) + exp_rx = b"\x08\x06\x04\x02\x07\x05\x03\x01" + print("\n1) master-->write and slave-->read continuously") + spi_obj.write(tx) + # spi_obj.write_readinto(tx, rx) + spi_obj.read(rx) print(rx) # _verify_test("") -print("*** SPI SLAVE INSTANCE ***") +print("\n*** SPI SLAVE INSTANCE ***") spi_obj = spi_slave_configure() -tx_buf = b"\x08\x01\x08\x02" -rx_buf = bytearray(4) +tx_buf = b"\x08\x06\x04\x02\x08\x06\x04\x02" +rx_buf = bytearray(8) spi_half_duplex_communication(spi_obj, tx_buf, rx_buf) -"""tx_buf = b"\x06\x06\x05\x05" -rx_buf = bytearray(4) -spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)""" +tx_buf = b"\x06\x06\x05\x05\x06\x06\x05\x05" +rx_buf = bytearray(8) +spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)