Skip to content

Commit

Permalink
add documentation to fpga-related functions
Browse files Browse the repository at this point in the history
  • Loading branch information
phildong committed Dec 20, 2023
1 parent 0176dc0 commit 95d75bb
Showing 1 changed file with 144 additions and 13 deletions.
157 changes: 144 additions & 13 deletions miniscope_io/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

class MetadataHeaderFormat(BaseModel):
"""
Positions for header fields returned by :meth:`.okDev.readData`
Format model used to parse header at the beginning of every buffer.
The model attributes are key-value pairs mapping the variable/information in the header to their corresponding position (in bits) in the header.
.. todo::
Expand Down Expand Up @@ -60,9 +62,6 @@ class MetadataHeader(BaseModel):
"""
Container for FPGA header data, structured by :class:`.MetadataHeaderFormat`
.. note::
Assuming these are all ints for now like they are in the SDCard Headers, fixmeplz -jonny
"""

linked_list: Any
Expand Down Expand Up @@ -97,6 +96,38 @@ def __init__(
buffer_npix: Tuple[int] = (20432, 20432, 20432, 20432, 10688),
pix_depth: int = 8,
) -> None:
"""
Constructer for the class.
Currently supports UART and FPGA source.
Parameters
----------
frame_width : int, optional
Width of the frame (in pixels), by default 304.
frame_height : int, optional
Height of the frame (in pixels), by default 304.
preamble : bytes, optional
Preamble string at the beginning of every buffer header, by default b"\x12\x34\x56".
header_fmt : MetadataHeaderFormat, optional
Header format used to parse information from buffer header, by default `MetadataHeaderFormat()`.
header_len : int, optional
Length of header in (32-bit) words, by default 11.
This is useful when not all the variable/words in the header are defined in :class:`.MetadataHeaderFormat`.
The user is responsible to ensure that `header_len * 32` is larger than the largest bit position defined in :class:`.MetadataHeaderFormat` otherwise unexpected behavior might occur.
LSB : bool, optional
Whether the sourse is in "LSB" mode or not, by default True.
If `not LSB`, then the incoming bitstream is expected to be in Most Significant Bit first mode and data are transmitted in normal order.
If `LSB`, then the incoming bitstream is in the format that each 32-bit words are bit-wise reversed on its own.
Furthermore, the order of 32-bit words in the pixel data within the buffer is reversed (but the order of words in the header is preserved).
Note that this format does not correspond to the usual LSB-first convention and the parameter name is chosen for the lack of better words.
buffer_npix : Tuple[int], optional
A tuple defining how pixels within a single frame is split across multiple buffers, by default (20432, 20432, 20432, 20432, 10688).
Each number in the tuple represents how many pixels are contained in each buffer.
The length of the tuple represents the number of buffers a frame is split across.
pix_depth : int, optional
Bit-depth of each pixel, by default 8.
"""
self.frame_width = frame_width
self.frame_height = frame_height
self.preamble = preamble
Expand All @@ -107,13 +138,33 @@ def __init__(
else:
self.LSB = False
self.buffer_npix = buffer_npix
assert frame_height * frame_width == sum(self.buffer_npix)
assert frame_height * frame_width == sum(
self.buffer_npix
), "Number of pixels defined by frame width and height must agree with total number of pixels in `buffer_npix`!"
self.nbuffer_per_fm = len(self.buffer_npix)
self.pix_depth = pix_depth

def _parse_header(
self, buffer: BitArray, truncate: Literal["preamble", "header", False] = False
) -> Tuple[MetadataHeader, BitArray]:
self, buffer: bytes, truncate: Literal["preamble", "header", False] = False
) -> Tuple[MetadataHeader, bytes]:
"""
Function to parse header from each buffer.
Parameters
----------
buffer : bytes
Input buffer.
truncate : Literal[preamble, header, False], optional
Whether the parsed header should be truncated from the returned buffer.
If `"preamble"`, then only the preamble is truncated.
If `"header"`, then the full header is truncated.
If `False`, then `buffer` is returned untouched.
Returns
-------
Tuple[MetadataHeader, bytes]
The returned header data and (optionally truncated) buffer data.
"""
pre = Bits(self.preamble)
if self.LSB:
pre = pre[::-1]
Expand All @@ -136,8 +187,21 @@ def _parse_header(
else:
return header_data, buffer

# Receive buffers and push into serial_buffer_queue
def _uart_recv(self, serial_buffer_queue, comport: str, baudrate: int):
def _uart_recv(
self, serial_buffer_queue: multiprocessing.Queue, comport: str, baudrate: int
):
"""
Receive buffers and push into serial_buffer_queue
Parameters
----------
serial_buffer_queue : multiprocessing.Queue
_description_
comport : str
_description_
baudrate : int
_description_
"""
# set up logger
locallogs = logging.getLogger(__name__)
locallogs.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -183,6 +247,28 @@ def _fpga_recv(
read_length: int = None,
pre_first: bool = True,
) -> None:
"""
Function to read bitstream from OpalKelly device and store buffer in `serial_buffer_queue`.
The bits data are read in fixed chunks defined by `read_length`.
Then we concatenate the chunks and try to look for `self.preamble` in the data.
The data between every pair of `self.preamble` is considered to be a single buffer and stored in `serial_buffer_queue`.
Parameters
----------
serial_buffer_queue : multiprocessing.Queue[bytes]
The queue holding the buffer data.
read_length : int, optional
Length of data to read in chunks (in number of bytes), by default None.
If `None`, an optimal length is estimated so that it roughly covers a single buffer and is an integer multiple of 16 bytes (as recommended by OpalKelly).
pre_first : bool, optional
Whether preamble/header is returned at the beginning of each buffer, by default True.
Raises
------
RuntimeError
If the OpalKelly device library cannot be found
"""
if not HAVE_OK:
raise RuntimeError(
"Couldnt import OpalKelly device. Check the docs for install instructions!"
Expand Down Expand Up @@ -231,13 +317,25 @@ def _fpga_recv(
serial_buffer_queue.put(cur_buffer[buf_start:buf_stop].tobytes())
cur_buffer = cur_buffer[pre_pos[-1] :]

# Pull out data buffers from serial_buffer_queue
# Make a list of buffers forming a frame and push into frame_buffer_queue
def _buffer_to_frame(
self,
serial_buffer_queue: multiprocessing.Queue[bytes],
frame_buffer_queue: multiprocessing.Queue[bytes],
frame_buffer_queue: multiprocessing.Queue[list[bytes]],
):
"""
Group buffers together to make frames.
Pull out buffers in `serial_buffer_queue`, then get frame and buffer index by parsing headers in the buffer.
The buffers belonging to the same frame are put in the same list at corresponding buffer index.
The lists representing each frame are then put into `frame_buffer_queue`.
Parameters
----------
serial_buffer_queue : multiprocessing.Queue[bytes]
Input buffer queue.
frame_buffer_queue : multiprocessing.Queue[list[bytes]]
Output frame queue.
"""
# set up logger
locallogs = logging.getLogger(__name__)
locallogs.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -314,9 +412,23 @@ def _buffer_to_frame(

def _format_frame(
self,
frame_buffer_queue: multiprocessing.Queue[bytes],
frame_buffer_queue: multiprocessing.Queue[list[bytes]],
imagearray: multiprocessing.Queue[np.ndarray],
):
"""
Construct frame from grouped buffers.
Each frame data is concatenated from a list of buffers in `frame_buffer_queue` according to `buffer_npix`.
If there is any mismatch between the expected length of each buffer (defined by `buffer_npix`) and the actual length, then the buffer is either truncated or zero-padded at the end to make the length appropriate, and a warning is thrown.
Finally, the concatenated buffer data are converted into a 1d numpy array with uint8 dtype and put into `imagearray` queue.
Parameters
----------
frame_buffer_queue : multiprocessing.Queue[list[bytes]]
Input buffer queue.
imagearray : multiprocessing.Queue[np.ndarray]
Output image array queue.
"""
locallogs = logging.getLogger(__name__)
locallogs.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -407,6 +519,25 @@ def capture(
baudrate: int = 1200000,
read_length: Optional[int] = None,
):
"""
Entry point to start frame capture.
Parameters
----------
source : Literal[uart, fpga]
Device source.
comport : str, optional
Passed to :function:`~miniscope_io.stream_daq.stream_daq._uart_recv` when `source == "uart"`, by default "COM3".
baudrate : int, optional
Passed to :function:`~miniscope_io.stream_daq.stream_daq._uart_recv` when `source == "uart"`, by default 1200000.
read_length : Optional[int], optional
Passed to :function:`~miniscope_io.stream_daq.stream_daq._fpga_recv` when `source == "fpga"`, by default None.
Raises
------
ValueError
If `source` is not in `("uart", "fpga")`.
"""
logdirectories = [
"log",
"log/uart_recv",
Expand Down

0 comments on commit 95d75bb

Please sign in to comment.