Skip to content

Commit

Permalink
tests/psoc6/hw_ext/multi_blocking: WIP i2s tests.
Browse files Browse the repository at this point in the history
Signed-off-by: enriquezgarc <enriquezgarcia.external@infineon.com>
  • Loading branch information
jaenrig-ifx committed Apr 23, 2024
1 parent 183a1d7 commit e81e918
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 117 deletions.
176 changes: 96 additions & 80 deletions tests/psoc6/hw_ext/multi_blocking/i2s_rx.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from machine import I2S
from machine import I2S, Pin
import binascii
import time
import array
Expand All @@ -11,66 +11,53 @@
sck_rx_pin = "P5_4"
ws_rx_pin = "P5_5"
sd_rx_pin = "P5_6"
# while extmod/refactoring
# print("SKIP")
# raise SystemExit
rx_ready_signal_pin_name = "P13_5"
elif "CY8CPROTO-063-BLE" in machine:
# This would be the right pins for this test, but unfortunately
# These would be the right pins for this test, but unfortunately
# the P5_1 is allocated for the UART serial comm terminal communication.
# So this tests is not currently possible for this board.
# Thus this tests is not currently possible for this board.
# sck_rx_pin = "P5_4"
# ws_rx_pin = "P5_5"
# sd_rx_pin = "P5_6"
print("SKIP")
raise SystemExit

test_rates = [8000, 16000, 32000, 48000, 22050, 44100]
test_formats = [I2S.MONO, I2S.STEREO]
test_bits = [16, 32]
test_bits_resolution = [8, 12, 16, 20, 32]
rx_ready_signal_pin = None

rx_buf = bytearray([0] * 2560)

# test_data_mono
# test_data_stereo
# test_data_resolution_not_channel_width
#

# for each test values
# create_instance
# Trigger sending from other board
# read data
# validate data received
# deinit instance

audio_in = I2S(
0,
sck=sck_rx_pin,
ws=ws_rx_pin,
sd=sd_rx_pin,
mode=I2S.RX,
bits=16,
format=I2S.STEREO,
rate=16000,
ibuf=20000,
)

# while True:
num_read = audio_in.readinto(rx_buf)
print(num_read)
# print(rx_buf)


def count_occurences(rx_buf, num_read):
i = 0
same_value_count = 0
while i < (num_read - 1):
if rx_buf[i] == rx_buf[i + 1]:
same_value_count += 1
else:
print(f"Value {hex(rx_buf[i])} sequence length is {same_value_count}")
same_value_count = 1
i += 1

def rx_ready_signal_init():
global rx_ready_signal_pin
rx_ready_signal_pin = Pin(rx_ready_signal_pin_name, Pin.OUT, value=0)
rx_ready_signal_pin.low()


def rx_ready_signal_deinit():
global rx_ready_signal_pin
rx_ready_signal_pin.deinit()


def notify_readiness_to_tx():
global rx_ready_signal_pin
rx_ready_signal_pin.high()
time.sleep(0.5)
rx_ready_signal_pin.low()


def make_expected_sequence():
len = 0xFF
expected_frame = bytearray(len)
for i in range(len):
expected_frame[i] = i

return expected_frame


def find_sublist_in_list(full_list, sublist):
for i in range(len(full_list) - len(sublist) - 1):
if full_list[i : i + len(sublist)] == sublist:
return True

return False


def print_i2s_format(raw_buf, bits):
Expand Down Expand Up @@ -104,36 +91,65 @@ def print_i2s_format(raw_buf, bits):
print(s_tuple)


# print_i2s_format(rx_buf, 16)


def make_expected_sequence():
len = 0xFF
expected_frame = bytearray(len)
for i in range(len):
expected_frame[i] = i

return expected_frame


def find_sublist_in_list(full_list, sublist):
for i in range(len(full_list) - len(sublist) - 1):
if full_list[i : i + len(sublist)] == sublist:
return True

return False


exp_seq = make_expected_sequence()

print("Frame sequence detected : ", find_sublist_in_list(rx_buf, exp_seq))

test_rates = [8000, 16000, 32000, 48000] # 22050, 44100]
test_formats = [I2S.MONO, I2S.STEREO]
test_bits = [16, 32]
test_bits_resolution = [8, 12, 16, 20, 32]

# rx_ascii = binascii.hexlify(rx_buf)
# print(f'rx_buf = {rx_ascii}')
# _format = I2S.STEREO
# _bits = 16
# _rate = 8000
###############################################################################

rx_ready_signal_init()

for _format in test_formats:
for _bits in test_bits:
for _rate in test_rates:
audio_in = I2S(
0,
sck=sck_rx_pin,
ws=ws_rx_pin,
sd=sd_rx_pin,
mode=I2S.RX,
bits=_bits,
format=_format,
rate=_rate,
ibuf=20000,
)
rx_buf = bytearray([0] * 2560)

notify_readiness_to_tx()

# Read i2s data
num_read = audio_in.readinto(rx_buf)
audio_in.deinit()

# validate data
exp_seq = make_expected_sequence()
is_seq_received = find_sublist_in_list(rx_buf, exp_seq)
print(
f"data received for format = {_format}, bits = {_bits}, rate = {_rate} : {is_seq_received}"
)
# print_i2s_format(rx_buf, 16)

notify_readiness_to_tx()

# time.sleep(1)

rx_ready_signal_deinit()

# def count_occurences(rx_buf, num_read):
# i = 0
# same_value_count = 0
# while i < (num_read - 1):
# if rx_buf[i] == rx_buf[i + 1]:
# same_value_count += 1
# else:
# print(f"Value {hex(rx_buf[i])} sequence length is {same_value_count}")
# same_value_count = 1
# i += 1

# time.sleep(5)
# audio_in.deinit()

# tx_done = False
# def tx_complete_irq(obj):
Expand Down
31 changes: 27 additions & 4 deletions tests/psoc6/hw_ext/multi_blocking/i2s_rx.py.exp
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
tx Buffer
b'\x01\x00\x17\x15\x16D'
Rx Buffer
bytearray(b'\x00\x00\x00\x00\x01\x00\x17\x15\x16D')
data received for format = 0, bits = 16, rate = 8000 : True
data received for format = 0, bits = 16, rate = 16000 : True
data received for format = 0, bits = 16, rate = 32000 : True
data received for format = 0, bits = 16, rate = 48000 : True
data received for format = 0, bits = 16, rate = 22050 : True
data received for format = 0, bits = 16, rate = 44100 : True

data received for format = 0, bits = 32, rate = 8000 : True
data received for format = 0, bits = 32, rate = 16000 : True
data received for format = 0, bits = 32, rate = 32000 : True
data received for format = 0, bits = 32, rate = 48000 : True
data received for format = 0, bits = 32, rate = 22050 : True
data received for format = 0, bits = 32, rate = 44100 : True

data received for format = 1, bits = 16, rate = 8000 : True
data received for format = 1, bits = 16, rate = 16000 : True
data received for format = 1, bits = 16, rate = 32000 : True
data received for format = 1, bits = 16, rate = 48000 : True
data received for format = 1, bits = 16, rate = 22050 : True
data received for format = 1, bits = 16, rate = 44100 : True

data received for format = 1, bits = 32, rate = 8000 : True
data received for format = 1, bits = 32, rate = 16000 : True
data received for format = 1, bits = 32, rate = 32000 : True
data received for format = 1, bits = 32, rate = 48000 : True
data received for format = 1, bits = 32, rate = 22050 : True
data received for format = 1, bits = 32, rate = 44100 : True
112 changes: 79 additions & 33 deletions tests/psoc6/hw_ext/multi_blocking/i2s_tx.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from machine import I2S
from machine import I2S, Pin
import binascii
import time
import array
Expand All @@ -11,6 +11,7 @@
sck_tx_pin = "P13_1"
ws_tx_pin = "P13_2"
sd_tx_pin = "P13_3"
wait_signal_pin_name = "P13_5"
# while extmod/refactoring
# print("SKIP")
# raise SystemExit
Expand All @@ -24,22 +25,48 @@
print("SKIP")
raise SystemExit

signal_received = False
wait_signal_pin = None

# test_rates = [8000, 16000, 32000, 48000, 22050, 44100]
# test_formats = [I2S.MONO, I2S.STEREO]
# test_bits = [16,32]
# test_bits_resolution = [8,12,16,20,32]

# test_data_mono
# test_data_stereo
# test_data_resolution_not_channel_width
#
def signal_irq(arg):
global signal_received
signal_received = True

# for each test values
# wait for trigger
# create_instance
# send multiple times the data
# deinit instance

# Initialize wait_signal pin
def wait_signal_init():
global wait_signal_pin
wait_signal_pin = Pin(wait_signal_pin_name, Pin.IN)
wait_signal_pin.irq(handler=signal_irq, trigger=Pin.IRQ_RISING)


def wait_signal_deinit():
global wait_signal_pin
wait_signal_pin.deinit()


def is_rx_done():
# The second interrupt notifies rx completion
global signal_received

if signal_received:
# Clear flag
signal_received = False
print("rx done")
return True
else:
return False


def wait_for_rx_ready():
global signal_received

while not signal_received:
pass

signal_received = False
print("rx ready")


def make_i2s_tx_frame_detect_pattern():
Expand Down Expand Up @@ -123,22 +150,41 @@ def make_i2s_tx_data_pattern():


buf = make_i2s_tx_frame_detect_pattern()
# print_i2s_format(buf,16)

# buf = make_i2s_tx_data_pattern()

audio_out = I2S(
0,
sck=sck_tx_pin,
ws=ws_tx_pin,
sd=sd_tx_pin,
mode=I2S.TX,
bits=16,
format=I2S.STEREO,
rate=16000,
ibuf=20000,
)

while True:
num_written = audio_out.write(buf)
print(num_written)

test_rates = [8000, 16000, 32000, 48000] # 22050, 44100]
test_formats = [I2S.MONO, I2S.STEREO]
test_bits = [16, 32]
test_bits_resolution = [8, 12, 16, 20, 32]

# _format = I2S.STEREO
# _bits = 16
# _rate = 8000
###############################################################################

wait_signal_init()

for _format in test_formats:
for _bits in test_bits:
for _rate in test_rates:
wait_for_rx_ready()

audio_out = I2S(
0,
sck=sck_tx_pin,
ws=ws_tx_pin,
sd=sd_tx_pin,
mode=I2S.TX,
bits=_bits,
format=_format,
rate=_rate,
ibuf=20000,
)

while not is_rx_done():
num_written = audio_out.write(buf)

# wait_for_rx_ready()

audio_out.deinit()

wait_signal_deinit()

0 comments on commit e81e918

Please sign in to comment.