Skip to content

Commit

Permalink
ports/psoc6: SPI refactoring and tests.
Browse files Browse the repository at this point in the history
Signed-off-by: NikhitaR-IFX <Nikhita.Rajasekhar@infineon.com>
  • Loading branch information
NikhitaR-IFX committed May 6, 2024
1 parent ab76c4e commit 236f1dd
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 81 deletions.
15 changes: 8 additions & 7 deletions ports/psoc6/machine_spi.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,6 @@ static void machine_spi_transfer(mp_obj_base_t *self_in, size_t len, const uint8
tx_buf = tx;
memset(rx_temp_buf, 0x01, len * sizeof(uint8_t));
rx_buf = rx_temp_buf;
// result = cyhal_spi_transfer(&self->spi_obj, tx, len, rx_temp_buf, len, write_fill);
// spi_assert_raise_val("SPI transfer failed with return code %lx !", result);
}
// Case 2: tx and rx equal --> read(), readinto() and write_readinto() with tx and rx same buffers
else {
Expand All @@ -301,8 +299,6 @@ static void machine_spi_transfer(mp_obj_base_t *self_in, size_t len, const uint8
tx_buf = tx_temp_buf;
rx_buf = rx;
write_fill = tx_temp_buf[0];
// result = cyhal_spi_transfer(&self->spi_obj, tx, len, rx, len, write_fill);
// spi_assert_raise_val("SPI read failed with return code %lx !", result);
} else {
tx_buf = tx;
rx_buf = rx;
Expand Down Expand Up @@ -348,27 +344,29 @@ static mp_obj_t machine_spi_slave_deinit(mp_obj_t self_in) {
MP_DEFINE_CONST_FUN_OBJ_1(machine_spi_slave_deinit_obj, machine_spi_slave_deinit);

static mp_obj_t machine_spi_slave_read(mp_obj_t self_in, mp_obj_t buf_in) {
cy_rslt_t result;
machine_spi_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(buf_in, &bufinfo, MP_BUFFER_WRITE);
uint16_t len = bufinfo.len;
uint8_t tx_dummy[len];
memset(tx_dummy, 0x02, len * sizeof(uint8_t));
cy_rslt_t result = cyhal_spi_transfer(&self->spi_obj, tx_dummy, len, bufinfo.buf, len, 0xFF);
result = cyhal_spi_transfer(&self->spi_obj, tx_dummy, len, bufinfo.buf, len, 0xFF);
spi_assert_raise_val("SPI slave read failed with return code %lx !", result);

return mp_const_none;
}
static MP_DEFINE_CONST_FUN_OBJ_2(machine_spi_slave_read_obj, machine_spi_slave_read);

static mp_obj_t machine_spi_slave_write(mp_obj_t self_in, mp_obj_t buf_in) {
cy_rslt_t result;
machine_spi_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(buf_in, &bufinfo, MP_BUFFER_READ);
uint16_t len = bufinfo.len;
uint8_t rx_dummy[len];
memset(rx_dummy, 0x01, len * sizeof(uint8_t));
cy_rslt_t result = cyhal_spi_transfer(&self->spi_obj, bufinfo.buf, len, rx_dummy, len, 0xFF);
result = cyhal_spi_transfer(&self->spi_obj, bufinfo.buf, len, rx_dummy, len, 0xFF);
spi_assert_raise_val("SPI slave write failed with return code %lx !", result);
return mp_const_none;
}
Expand All @@ -379,7 +377,7 @@ static mp_obj_t machine_spi_slave_write_readinto(mp_obj_t self_in, mp_obj_t tx_b
mp_buffer_info_t tx_bufinfo;
mp_buffer_info_t rx_bufinfo;
mp_get_buffer_raise(tx_buf_in, &tx_bufinfo, MP_BUFFER_READ);
mp_get_buffer_raise(rx_buf_in, &rx_bufinfo, MP_BUFFER_READ);
mp_get_buffer_raise(rx_buf_in, &rx_bufinfo, MP_BUFFER_WRITE);
uint16_t len = tx_bufinfo.len;
if (tx_bufinfo.len != rx_bufinfo.len) {
mp_raise_ValueError(MP_ERROR_TEXT("buffers must be the same length"));
Expand All @@ -397,6 +395,9 @@ static const mp_rom_map_elem_t machine_spi_slave_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_write), MP_ROM_PTR(&machine_spi_slave_write_obj) },
{ MP_ROM_QSTR(MP_QSTR_write_readinto), MP_ROM_PTR(&machine_spi_slave_write_readinto_obj) },
{ MP_ROM_QSTR(MP_QSTR_deinit), MP_ROM_PTR(&machine_spi_slave_deinit_obj) },

{ MP_ROM_QSTR(MP_QSTR_MSB), MP_ROM_INT(MICROPY_PY_MACHINE_SPISLAVE_MSB) },
{ MP_ROM_QSTR(MP_QSTR_LSB), MP_ROM_INT(MICROPY_PY_MACHINE_SPISLAVE_LSB) },
};
static MP_DEFINE_CONST_DICT(machine_spi_slave_locals_dict, machine_spi_slave_locals_dict_table);

Expand Down
2 changes: 2 additions & 0 deletions ports/psoc6/mpconfigport.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@
#define MICROPY_PY_MACHINE_SPI_SLAVE (1)
#define MICROPY_PY_MACHINE_SPI_MSB (0)
#define MICROPY_PY_MACHINE_SPI_LSB (1)
#define MICROPY_PY_MACHINE_SPISLAVE_MSB (0)
#define MICROPY_PY_MACHINE_SPISLAVE_LSB (1)
#define MICROPY_PY_MACHINE_SOFTSPI (1)

#define MICROPY_PY_MACHINE_PSOC6_I2S (1)
Expand Down
111 changes: 68 additions & 43 deletions tests/psoc6/hw_ext/spi_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,45 @@
import os

try:
from machine import SPI, SPISlave
from machine import Pin, SPI, SPISlave
except ImportError:
print("SKIP")
raise SystemExit

# Allocate pin based on board
sck_master_pin = "P6_2"
mosi_master_pin = "P6_0"
miso_master_pin = "P6_1"
ssel_master_pin = "P6_3"
machine = os.uname().machine
if "CY8CPROTO-062-4343W" in machine:
master_write_notify_pin = "P12_4" # Sends signals when master wants to write data
master_read_notify_pin = "P12_3" # Polls pin to check if slave is writing data
# Allocate pin based on board
sck_master_pin = "P6_2"
mosi_master_pin = "P6_0"
miso_master_pin = "P6_1"
ssel_master_pin = "P6_3"

success = b"1"
failed = b"2"
signal_received = False


def signal_irq(arg):
global signal_received
signal_received = True


pin_out = Pin(master_write_notify_pin, mode=Pin.OUT, value=True)
pin_in = Pin(master_read_notify_pin, Pin.IN)
pin_in.irq(handler=signal_irq, trigger=Pin.IRQ_RISING)


def _master_ready_to_write():
pin_out.value(0)
time.sleep_ms(10)
pin_out.value(1)


def _wait_for_slave_signal():
global signal_received
while not signal_received:
pass
signal_received = False


def spi_master_configure():
Expand All @@ -30,7 +56,6 @@ def spi_master_configure():
mosi=mosi_master_pin,
miso=miso_master_pin,
)
print("\n", spi_obj)
return spi_obj


Expand All @@ -44,54 +69,54 @@ def _verify_test(test_case, actual_data, exp_data):

def spi_half_duplex_communication(spi_obj, tx, rx):
print("\n*** Half duplex communication ***")
status = bytearray(1)
print("1) master-->write and slave-->read")
print("\nTest Case 1: master-->write and slave-->read")
_master_ready_to_write()
spi_obj.write(tx)
print("tx: ", tx)
status = spi_obj.read(1)
_verify_test("master write", status, success)

print("2) slave-->write and master-->read using readinto()")
print("Master wrote (tx): \t", tx)
_wait_for_slave_signal()
rx = spi_obj.read(8)
print("Slave returned (rx): \t", rx)
# print("Status: ", spi_obj.read(8))
print("Test case successful : \t", rx == tx)

print("\nTest Case 2: slave-->write and master-->read using readinto()")
rx = bytearray(8)
_wait_for_slave_signal()
spi_obj.readinto(rx)
_verify_test("master read ", rx, tx)
exp = b"\x01\x03\x05\x07\x02\x04\x06\x08"
print("Slave wrote (rx): \t", rx)
print("Master expects (exp): \t", exp)
print("Test case successful : \t", rx == exp)

print("3) slave-->write and master-->read using readinto by sending out a write_byte=0x12")
spi_obj.readinto(rx, 0x12)
_verify_test("master read ", rx, tx)
status = spi_obj.read(1)
_verify_test("write_byte received by slave correctly", status, success)
print(
"\nTest Case 3: slave-->write and master-->read using readinto by sending out a write_byte=0x12"
)
exp = b"\x08\x06\x04\x02\x08\x06\x04\x02"
spi_obj.readinto(rx, 5)
# _wait_for_slave_signal()
print("Slave wrote (rx): \t", rx)
print("Master expects (exp): \t", exp)
print("Test case successful : \t", rx == exp)


def spi_full_duplex_communication(spi_obj, tx, rx):
print("*** Full duplex communication ***")
exp_rx = b"\x06\x06\x05\x05"
exp_rx = b"\x06\x06\x05\x05\x06\x06\x05\x05"
print("1) master-->write and slave-->read continuously")
spi_obj.write_readinto(tx, rx)
_verify_test("read value is same as expected", rx, exp_rx)


"""def spi_master_read_verify(spi_obj,tx,rx):
rx = spi_obj.read(len(tx))
print(rx)
rx = spi_obj.read(len(tx),0x11)
print(rx)
print("SPI master write followed by read is successful: ", rx==tx)
# _verify_test("read value is same as expected", rx, exp_rx)

def spi_master_readinto_verify(spi_obj,tx,rx):
spi_obj.write(tx)
print("SPI master write completed")
rx = spi_obj.read(len(tx))
print(rx)
print("SPI master write followed by read is successful: ", rx==tx)"""

print("*** SPI MASTER INSTANCE ***")
print("\n*** SPI MASTER INSTANCE ***")

spi_obj = spi_master_configure()

tx_buf = b"\x08\x01\x08\x02"
rx_buf = bytearray(4)
# tx_buf = b"\x08"
tx_buf = b"\x08\x06\x04\x02\x07\x05\x03\x01"
rx_buf = bytearray(8)
spi_half_duplex_communication(spi_obj, tx_buf, rx_buf)

"""tx_buf = b"\x06\x08\x05\x07"
rx_buf = bytearray(4)
spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)"""
tx_buf = b"\x08\x06\x04\x02\x07\x05\x03\x01"
rx_buf = bytearray(8)
spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)
99 changes: 68 additions & 31 deletions tests/psoc6/hw_ext/spi_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,45 @@
import os

try:
from machine import SPI, SPISlave
from machine import Pin, SPI, SPISlave
except ImportError:
print("SKIP")
raise SystemExit

sck_slave_pin = "P13_2"
mosi_slave_pin = "P13_0"
miso_slave_pin = "P13_1"
ssel_slave_pin = "P13_3"
# Allocate pin based on board
machine = os.uname().machine
if "CY8CPROTO-062-4343W" in machine:
slave_write_notify_pin = "P12_3" # Sends signals when master wants to write data
slave_read_notify_pin = "P12_4" # Polls pin to check if slave is writing data
sck_slave_pin = "P13_2"
mosi_slave_pin = "P13_0"
miso_slave_pin = "P13_1"
ssel_slave_pin = "P13_3"

success = b"1"
failed = b"2"
signal_received = False


def signal_irq(arg):
global signal_received
signal_received = True


pin_out = Pin(slave_write_notify_pin, mode=Pin.OUT, value=True)
pin_in = Pin(slave_read_notify_pin, Pin.IN)
pin_in.irq(handler=signal_irq, trigger=Pin.IRQ_RISING)


def _slave_ready_to_write():
pin_out.value(0)
time.sleep_ms(10)
pin_out.value(1)


def _wait_for_master_signal():
global signal_received
while not signal_received:
pass
signal_received = False


def spi_slave_configure():
Expand All @@ -23,64 +50,74 @@ def spi_slave_configure():
polarity=0,
phase=0,
bits=8,
firstbit=SPI.MSB,
firstbit=SPISlave.MSB,
ssel=ssel_slave_pin,
sck=sck_slave_pin,
mosi=mosi_slave_pin,
miso=miso_slave_pin,
)
print("\n", spi_obj)
return spi_obj


def _verify_test(rx, exp_data):
if rx == exp_data:
print("Successful")
spi_obj.write(success)
spi_obj.write(rx)
else:
print("Failed")
spi_obj.write(failed)


def spi_half_duplex_communication(spi_obj, tx, rx):
print("\n***Half duplex communication ***")
write_byte = "0x12"
write_byte_status = bytearray(1)
print("1) master-->write and slave-->read")
print("\n*** Half duplex communication ***")
write_byte = bytearray(1)

# 1) master-->write and slave-->read
# print("\n1) master-->write and slave-->read")
_wait_for_master_signal()
spi_obj.read(rx)
print("rx: ", rx)
exp_data = tx
_verify_test(rx, exp_data)

# print("rx: ", rx)
# print("slave read successful : ", rx==tx)
_slave_ready_to_write()
spi_obj.write(rx)

# 2) slave-->write and master-->read"
print("2) slave-->write and master-->read using readinto()")
spi_obj.write(tx)
# print("\n2) slave-->write and master-->read using readinto()")
tx_buf = b"\x01\x03\x05\x07\x02\x04\x06\x08"
_slave_ready_to_write()
spi_obj.write(tx_buf)

# 3) slave-->write and master-->read by sending a write_byte=0x12"
print("3) slave-->write and master-->read using readinto by sending out a write_byte=0x12")
print("\n3) slave-->write and master-->read using readinto by sending out a write_byte=0x12")
tx = b"\x08\x06\x04\x02\x08\x06\x04\x02"
spi_obj.read(write_byte)
print("write_byte: ", write_byte)
spi_obj.write(tx)
spi_obj.read(write_byte_status)
_verify_test(write_byte_status, b"0x12")
print("OUT")

# _slave_ready_to_write()


def spi_full_duplex_communication(spi_obj, tx, rx):
print("*** Full duplex communication ***")
exp_rx = b"\x06\x08\x05\x07"
print("1) master-->write and slave-->read continuously")
spi_obj.write_readinto(tx, rx)
exp_rx = b"\x08\x06\x04\x02\x07\x05\x03\x01"
print("\n1) master-->write and slave-->read continuously")
spi_obj.write(tx)
# spi_obj.write_readinto(tx, rx)
spi_obj.read(rx)
print(rx)
# _verify_test("")


print("*** SPI SLAVE INSTANCE ***")
print("\n*** SPI SLAVE INSTANCE ***")

spi_obj = spi_slave_configure()

tx_buf = b"\x08\x01\x08\x02"
rx_buf = bytearray(4)
tx_buf = b"\x08\x06\x04\x02\x08\x06\x04\x02"
rx_buf = bytearray(8)
spi_half_duplex_communication(spi_obj, tx_buf, rx_buf)

"""tx_buf = b"\x06\x06\x05\x05"
rx_buf = bytearray(4)
spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)"""
tx_buf = b"\x06\x06\x05\x05\x06\x06\x05\x05"
rx_buf = bytearray(8)
spi_full_duplex_communication(spi_obj, tx_buf, rx_buf)

0 comments on commit 236f1dd

Please sign in to comment.