Skip to content

Commit

Permalink
iterator based loops
Browse files Browse the repository at this point in the history
  • Loading branch information
sneakers-the-rat committed Jun 22, 2024
1 parent 58a2ebc commit b9138dd
Showing 1 changed file with 57 additions and 65 deletions.
122 changes: 57 additions & 65 deletions miniscope_io/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
import time
from pathlib import Path
from typing import List, Literal, Optional, Tuple
from typing import Any, Callable, Generator, List, Literal, Optional, Tuple, Union

import cv2
import numpy as np
Expand Down Expand Up @@ -41,6 +41,19 @@
daqParser.add_argument("-c", "--config", help="YAML config file path: string")


def exact_iter(f: Callable, sentinel: Any) -> Generator[Any, None, None]:
"""
A version of :func:`iter` that compares with `is` rather than `==`
because truth value of numpy arrays is ambiguous.
"""
while True:
val = f()
if val is sentinel:
break
else:
yield val


class StreamDaq:
"""
A combined class for configuring and reading frames from a UART and FPGA source.
Expand Down Expand Up @@ -88,7 +101,7 @@ def __init__(
self.preamble = self.config.preamble
self._buffer_npix: Optional[List[int]] = None
self._nbuffer_per_fm: Optional[int] = None
self.terminate = multiprocessing.Value("b", False)
self.terminate: multiprocessing.Event = multiprocessing.Event()

@property
def buffer_npix(self) -> List[int]:
Expand Down Expand Up @@ -193,6 +206,20 @@ def _uart_recv(
self.logger.info("Close serial port")
sys.exit(1)

def _init_okdev(self, BIT_FILE: Path) -> Union[okDev, okDevMock]:

# FIXME: when multiprocessing bug resolved, remove this and just mock in tests
dev = okDevMock() if os.environ.get("PYTEST_CURRENT_TEST") is not None else okDev()

dev.uploadBit(str(BIT_FILE))
dev.setWire(0x00, 0b0010)
time.sleep(0.01)
dev.setWire(0x00, 0b0)
dev.setWire(0x00, 0b1000)
time.sleep(0.01)
dev.setWire(0x00, 0b0)
return dev

def _fpga_recv(
self,
serial_buffer_queue: multiprocessing.Queue,
Expand Down Expand Up @@ -240,26 +267,20 @@ def _fpga_recv(
raise RuntimeError(f"Configured to use bitfile at {BIT_FILE} but no such file exists")
# set up fpga devices

# FIXME: when multiprocessing bug resolved, remove this and just mock in tests
dev = okDevMock() if os.environ.get("PYTEST_CURRENT_TEST") is not None else okDev()
dev = self._init_okdev(BIT_FILE)

dev.uploadBit(str(BIT_FILE))
dev.setWire(0x00, 0b0010)
time.sleep(0.01)
dev.setWire(0x00, 0b0)
dev.setWire(0x00, 0b1000)
time.sleep(0.01)
dev.setWire(0x00, 0b0)
# read loop
cur_buffer = BitArray()
pre = Bits(self.preamble)
if self.config.LSB:
pre = pre[::-1]
while True:

while not self.terminate.is_set():
try:
buf = dev.readData(read_length)
except (EndOfRecordingException, StreamReadError):
self.terminate.value = True
except (EndOfRecordingException, StreamReadError, KeyboardInterrupt):
self.terminate.set()
serial_buffer_queue.put(None)
break

if capture_binary:
Expand Down Expand Up @@ -306,9 +327,9 @@ def _buffer_to_frame(

frame_buffer = [None] * self.nbuffer_per_fm

while 1:
if serial_buffer_queue.qsize() > 0: # Higher is safe but lower should be faster.
serial_buffer = Bits(serial_buffer_queue.get()) # grab one buffer from queue
try:
for serial_buffer in exact_iter(serial_buffer_queue.get, None):
serial_buffer = Bits(serial_buffer)

header_data, serial_buffer = self._parse_header(serial_buffer)

Expand Down Expand Up @@ -350,6 +371,8 @@ def _buffer_to_frame(
# if lost frame from buffer -> reset index
else:
cur_fm_buffer_index = 0
finally:
frame_buffer_queue.put(None)

def _format_frame(
self,
Expand Down Expand Up @@ -377,12 +400,10 @@ def _format_frame(
"""
locallogs = init_logger("streamDaq.frame")
header_data = None

while 1:
if frame_buffer_queue.qsize() > 0: # Higher is safe but lower is fast.
try:
for frame_data in exact_iter(frame_buffer_queue.get, None):
locallogs.debug("Found frame in queue")

frame_data = frame_buffer_queue.get() # pixel data for single frame
nbit_lost = 0

for i, npix_expected in enumerate(self.buffer_npix):
Expand Down Expand Up @@ -437,6 +458,8 @@ def _format_frame(

if header_data is not None:
locallogs.info(f"frame: {header_data.frame_num}, bits lost: {nbit_lost}")
finally:
imagearray.put(None)

def init_video(self, path: Path, fourcc: str = "Y800", **kwargs: dict) -> cv2.VideoWriter:
"""
Expand Down Expand Up @@ -487,6 +510,7 @@ def capture(
ValueError
If `source` is not in `("uart", "fpga")`.
"""
self.terminate.clear()

# Queue size is hard coded
queue_manager = multiprocessing.Manager()
Expand Down Expand Up @@ -549,55 +573,23 @@ def capture(
p_format_frame.start()
# p_terminate.start()
try:
while not self.terminate.value:
if imagearray.qsize() > 0:
imagearray_plot = imagearray.get()
image = imagearray_plot.reshape(
self.config.frame_width, self.config.frame_height
)
if self.config.show_video is True:
cv2.imshow("image", image)
if writer:
picture = cv2.cvtColor(
image, cv2.COLOR_GRAY2BGR
) # If your image is grayscale
writer.write(picture)
if cv2.waitKey(1) == 27 and self.config.show_video is True:
cv2.destroyAllWindows()
cv2.waitKey(100)
break # esc to quit
for imagearray_plot in exact_iter(imagearray.get, None):
image = imagearray_plot.reshape(self.config.frame_width, self.config.frame_height)
if self.config.show_video is True:
cv2.imshow("image", image)
if writer:
picture = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) # If your image is grayscale
writer.write(picture)
except KeyboardInterrupt:
self.terminate.set()
finally:
if writer:
writer.release()
self.logger.debug("VideoWriter released")
self.logger.debug("End capture")
if self.config.show_video:
cv2.destroyAllWindows()

while True:
self.logger.debug("[Terminating] uart/fpga_recv()")
p_recv.terminate()
time.sleep(0.1)
if not p_recv.is_alive():
p_recv.join(timeout=1.0)
self.logger.debug("[Terminated] uart/fpga_recv()")
break # watchdog process daemon gets [Terminated]

while True:
self.logger.debug("[Terminating] buffer_to_frame()")
p_buffer_to_frame.terminate()
time.sleep(0.1)
if not p_buffer_to_frame.is_alive():
p_buffer_to_frame.join(timeout=1.0)
self.logger.debug("[Terminated] buffer_to_frame()")
break # watchdog process daemon gets [Terminated]

while True:
self.logger.debug("[Terminating] format_frame()")
p_format_frame.terminate()
time.sleep(0.1)
if not p_format_frame.is_alive():
p_format_frame.join(timeout=1.0)
self.logger.debug("[Terminated] format_frame()")
break # watchdog process daemon gets [Terminated]
self.logger.debug("End capture")


def main() -> None: # noqa: D103
Expand Down

0 comments on commit b9138dd

Please sign in to comment.