Skip to content

Commit

Permalink
put native bytes into queue instead of bitstring objects
Browse files Browse the repository at this point in the history
  • Loading branch information
phildong committed Nov 2, 2023
1 parent a91de46 commit dcb88a3
Showing 1 changed file with 39 additions and 34 deletions.
73 changes: 39 additions & 34 deletions miniscope_io/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import os
import sys
import time
from datetime import datetime
import warnings
from typing import Literal, Tuple, Any, Optional
from datetime import datetime
from typing import Any, Literal, Optional, Tuple

import coloredlogs
import cv2
Expand All @@ -17,7 +17,7 @@

# Parsers for daq inputs
daqParser = argparse.ArgumentParser("stream_image_capture")
daqParser.add_argument("source", help="Input source; [\"UART\", \"OK\"]")
daqParser.add_argument("source", help='Input source; ["UART", "OK"]')
daqParser.add_argument("--port", help="serial port: string")
daqParser.add_argument("--baudrate", help="baudrate: int")

Expand All @@ -28,6 +28,7 @@
updateDeviceParser.add_argument("module", help="module to update")
updateDeviceParser.add_argument("value", help="LED value")


class MetadataHeaderFormat(BaseModel):
"""
Positions for header fields returned by :meth:`.okDev.readData`
Expand All @@ -46,13 +47,15 @@ class MetadataHeaderFormat(BaseModel):
eg. :class:`~miniscope_io.sdcard.BufferHeaderPositions` and :class:`~miniscope_io.sdcard.DataHeader`
"""

linked_list: Tuple[int, int] = (0, 32)
frame_num: Tuple[int, int] = (32, 64)
buffer_count: Tuple[int, int] = (64, 96)
frame_buffer_count: Tuple[int, int] = (96, 128)
timestamp: Tuple[int,int] = (192, 224)
timestamp: Tuple[int, int] = (192, 224)
pixel_count: Tuple[int, int] = (224, 256)


class MetadataHeader(BaseModel):
"""
Container for FPGA header data, structured by :class:`.MetadataHeaderFormat`
Expand All @@ -61,6 +64,7 @@ class MetadataHeader(BaseModel):
Assuming these are all ints for now like they are in the SDCard Headers, fixmeplz -jonny
"""

linked_list: Any
"""
Not sure what this is!
Expand All @@ -81,6 +85,7 @@ class stream_daq:
Phil/Takuya - docstrings for stream daq: what devices these correspond to, how to configure them, usage examples, tests
"""

def __init__(
self,
frame_width: int = 304,
Expand All @@ -94,7 +99,7 @@ def __init__(
):
self.frame_width = frame_width
self.frame_height = frame_height
self.preamble = Bits(preamble)
self.preamble = preamble
self.header_fmt = header_fmt
self.header_len = header_len * 32
if LSB:
Expand All @@ -106,16 +111,14 @@ def __init__(
self.nbuffer_per_fm = len(self.buffer_npix)
self.pix_depth = pix_depth

def _parse_header(self,
buffer: BitArray,
truncate: Literal['preamble', 'header', False] = False
) -> Tuple[MetadataHeader, BitArray]:

pre_len = len(self.preamble)
def _parse_header(
self, buffer: BitArray, truncate: Literal["preamble", "header", False] = False
) -> Tuple[MetadataHeader, BitArray]:
pre = Bits(self.preamble)
if self.LSB:
assert buffer[:pre_len][::-1] == self.preamble
else:
assert buffer[:pre_len] == self.preamble
pre = pre[::-1]
pre_len = len(pre)
assert buffer[:pre_len] == pre
header_data = dict()
for hd, bit_range in self.header_fmt.model_dump().items():
b = buffer[pre_len + bit_range[0] : pre_len + bit_range[1]]
Expand Down Expand Up @@ -163,23 +166,22 @@ def _uart_recv(self, serial_buffer_queue, comport: str, baudrate: int):
uart_bites = serial_port.read_until(pre_bytes)
log_uart_buffer = BitArray([x for x in uart_bites])


while 1:
# read UART data until preamble and put into queue
uart_bites = serial_port.read_until(pre_bytes)
log_uart_buffer = ([x for x in uart_bites])
log_uart_buffer = [x for x in uart_bites]
serial_buffer_queue.put(log_uart_buffer)

time.sleep(1) # time for ending other process
serial_port.close()
print("Close serial port")
sys.exit(1)

def _fpga_recv(
self, serial_buffer_queue, read_length=None, pre_first=True
):
def _fpga_recv(self, serial_buffer_queue, read_length=None, pre_first=True):
if not HAVE_OK:
raise RuntimeError('Couldnt import OpalKelly device. Check the docs for install instructions!')
raise RuntimeError(
"Couldnt import OpalKelly device. Check the docs for install instructions!"
)
# determine length
if read_length is None:
read_length = int(max(self.buffer_npix) * self.pix_depth / 8 / 16) * 16
Expand Down Expand Up @@ -210,7 +212,9 @@ def _fpga_recv(
dev.setWire(0x00, 0b0)
# read loop
cur_buffer = BitArray()
pre = Bits(self.preamble)[::-1]
pre = Bits(self.preamble)
if self.LSB:
pre = pre[::-1]
while True:
buf = dev.readData(read_length)
dat = BitArray(buf)
Expand All @@ -219,7 +223,7 @@ def _fpga_recv(
for buf_start, buf_stop in zip(pre_pos[:-1], pre_pos[1:]):
if not pre_first:
buf_start, buf_stop = buf_start + len(pre), buf_stop + len(pre)
serial_buffer_queue.put(cur_buffer[buf_start:buf_stop])
serial_buffer_queue.put(cur_buffer[buf_start:buf_stop].tobytes())
cur_buffer = cur_buffer[pre_pos[-1] :]

# Pull out data buffers from serial_buffer_queue
Expand Down Expand Up @@ -250,7 +254,9 @@ def _buffer_to_frame(self, serial_buffer_queue, frame_buffer_queue):
if (
serial_buffer_queue.qsize() > 0
): # Higher is safe but lower should be faster.
serial_buffer = serial_buffer_queue.get() # grab one buffer from queue
serial_buffer = Bits(
serial_buffer_queue.get()
) # grab one buffer from queue

header_data, serial_buffer = self._parse_header(serial_buffer)

Expand All @@ -273,7 +279,7 @@ def _buffer_to_frame(self, serial_buffer_queue, frame_buffer_queue):
cur_fm_buffer_index = header_data.frame_buffer_count

# update data
frame_buffer[cur_fm_buffer_index] = serial_buffer
frame_buffer[cur_fm_buffer_index] = serial_buffer.tobytes()

if cur_fm_buffer_index != 0:
locallogs.warning(
Expand All @@ -288,7 +294,7 @@ def _buffer_to_frame(self, serial_buffer_queue, frame_buffer_queue):
and header_data.frame_buffer_count > cur_fm_buffer_index
):
cur_fm_buffer_index = header_data.frame_buffer_count
frame_buffer[cur_fm_buffer_index] = serial_buffer
frame_buffer[cur_fm_buffer_index] = serial_buffer.tobytes()
locallogs.debug(
"----buffer #" + str(cur_fm_buffer_index) + " stored"
)
Expand Down Expand Up @@ -326,7 +332,7 @@ def _format_frame(self, frame_buffer_queue, imagearray):
for i, npix_expected in enumerate(self.buffer_npix):
if frame_data[i] is not None:
header_data, fm_dat = self._parse_header(
frame_data[i], truncate="header"
Bits(frame_data[i]), truncate="header"
)
else:
frame_data[i] = Bits(
Expand Down Expand Up @@ -371,22 +377,20 @@ def _format_frame(self, frame_buffer_queue, imagearray):
"uint:32",
[
pixel_vector[i : i + 32][::-1].uint
for i in range(0, len(pixel_vector, 32))
for i in range(0, len(pixel_vector), 32)
],
)
img = np.frombuffer(pixel_vector.tobytes(), dtype=np.uint8)
imagearray.put(img)

locallogs.info(
"frame: {}, bits lost: {}".format(
header_data.frame_num, nbit_lost
)
"frame: {}, bits lost: {}".format(header_data.frame_num, nbit_lost)
)

# COM port should probably be automatically found but not sure yet how to distinguish with other devices.
def capture(
self,
source: Literal['uart', 'fpga'],
source: Literal["uart", "fpga"],
comport: str = "COM3",
baudrate: int = 1200000,
mode: str = "DEBUG",
Expand Down Expand Up @@ -445,7 +449,7 @@ def capture(
),
)
else:
raise ValueError(f'source can be one of uart or fpga. Got {source}')
raise ValueError(f"source can be one of uart or fpga. Got {source}")

p_buffer_to_frame = multiprocessing.Process(
target=self._buffer_to_frame,
Expand Down Expand Up @@ -622,7 +626,6 @@ def main():
args = daqParser.parse_args()

if args.source == "UART":

try:
assert len(vars(args)) == 3
except AssertionError as msg:
Expand All @@ -647,11 +650,13 @@ def main():
HAVE_OK = False
try:
from miniscope_io.devices.opalkelly import okDev

HAVE_OK = True
except (ImportError, ModuleNotFoundError) as e:
warnings.warn(f'Cannot import OpalKelly device, got exception {e}')
warnings.warn(f"Cannot import OpalKelly device, got exception {e}")

daq_inst.capture(source="fpga")


if __name__ == "__main__":
main()

0 comments on commit dcb88a3

Please sign in to comment.