Skip to content

Commit

Permalink
process termination, minor bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
t-sasatani committed Jun 25, 2024
1 parent c510c1c commit 7faedb5
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions miniscope_io/stream_daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def _init_okdev(self, BIT_FILE: Path) -> Union[okDev, okDevMock]:
if os.environ.get("PYTEST_CURRENT_TEST") or os.environ.get("STREAMDAQ_MOCKRUN"):
dev = okDevMock()
else:
okDev()
dev = okDev()

dev.uploadBit(str(BIT_FILE))
dev.setWire(0x00, 0b0010)
Expand Down Expand Up @@ -342,6 +342,8 @@ def _buffer_to_frame(

try:
for serial_buffer in exact_iter(serial_buffer_queue.get, None):
if self.terminate.is_set():
break

header_data, serial_buffer = self._parse_header(serial_buffer)
serial_buffer = self._trim(
Expand Down Expand Up @@ -416,6 +418,9 @@ def _format_frame(
locallogs = init_logger("streamDaq.frame")
try:
for frame_data in exact_iter(frame_buffer_queue.get, None):
if self.terminate.is_set():
break

locallogs.debug("Found frame in queue")
if len(frame_data) == 0:
continue
Expand Down Expand Up @@ -521,6 +526,7 @@ def capture(
p_recv = multiprocessing.Process(
target=self._fpga_recv,
args=(serial_buffer_queue, read_length, True, binary),
name="_fpga_recv"
)
else:
raise ValueError(f"source can be one of uart or fpga. Got {source}")
Expand All @@ -540,13 +546,15 @@ def capture(
serial_buffer_queue,
frame_buffer_queue,
),
name="_buffer_to_frame"
)
p_format_frame = multiprocessing.Process(
target=self._format_frame,
args=(
frame_buffer_queue,
imagearray,
),
name="_format_frame"
)
"""
p_terminate = multiprocessing.Process(
Expand All @@ -560,21 +568,36 @@ def capture(
for image in exact_iter(imagearray.get, None):
if self.config.show_video is True:
cv2.imshow("image", image)
cv2.waitKey(1) #needed for updating interactive window. delay 1 ms for now.
if cv2.waitKey(1) == 27: # get out with ESC key
self.terminate.set()
break
if writer:
picture = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) # If your image is grayscale
writer.write(picture)
except KeyboardInterrupt:
self.terminate.set()
except Exception as e:
self.logger.exception(f"Error during capture: {e}")
self.terminate.set()
finally:
if writer:
writer.release()
self.logger.debug("VideoWriter released")
if self.config.show_video:
cv2.destroyAllWindows()

self.logger.debug("End capture")

cv2.waitKey(100)

# Give some time for processes to clean up
time.sleep(1)

# Join child processes with a timeout
for p in [p_recv, p_buffer_to_frame, p_format_frame]:
p.join(timeout=2)
if p.is_alive():
self.logger.warning(f"Process {p.name} did not terminate in time and will be terminated forcefully.")
p.terminate()
p.join()
self.logger.info("Child processes joined. End capture.")

def main() -> None: # noqa: D103
args = daqParser.parse_args()
Expand Down

0 comments on commit 7faedb5

Please sign in to comment.