Skip to content

Commit

Permalink
clean up sync code
Browse files Browse the repository at this point in the history
  • Loading branch information
mgineer85 committed Nov 1, 2024
1 parent 717f03b commit f9fe9bc
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 81 deletions.
55 changes: 29 additions & 26 deletions node/services/backends/cameras/abstractbackend.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import io
import logging
from abc import ABC, abstractmethod
from queue import Queue
from threading import Barrier, Condition, Event
from dataclasses import dataclass
from threading import Barrier, BrokenBarrierError, Condition, Event

logger = logging.getLogger(__name__)

Expand All @@ -25,27 +25,36 @@ def write(self, buf):
self.condition.notify_all()


@dataclass
class TimestampSet:
"""Set of timestamps that shall be aligned to each other."""

reference: int # in nanoseconds
camera: int # in nanoseconds


class AbstractCameraBackend(ABC):
def __init__(self):
# declare common abstract props
self._nominal_framerate: int = None
self._queue_timestamp_monotonic_ns: Queue = None
self._timestamp_monotonic_ns: int = None
self._event_request_tick: Event = None
self._capture: Event = None
self._capture_in_progress: bool = None

self._barrier = Barrier(3, action=self.get_timestamps)
self._barrier: Barrier = None
self._current_timestampset: TimestampSet = None
self._align_timestampset: TimestampSet = None

def __repr__(self):
return f"{self.__class__}"

def get_timestamps(self):
capture_time_timestamp_ns = self._camera_timestamp_ns or 0
def get_timestamps_to_align(self) -> TimestampSet:
assert self._current_timestampset.reference is not None
assert self._current_timestampset.camera is not None

capture_time_assigned_timestamp_ns = self._timestamp_monotonic_ns or 0
capture_time_assigned_timestamp_ns -= (1.0 / self._nominal_framerate) * 1e9
self._align_timestamps = (capture_time_timestamp_ns, capture_time_assigned_timestamp_ns)
# shift reference to align with camera cycle
_current_timestampset_reference = self._current_timestampset.reference
# _current_timestampset_reference -= (1.0 / self._nominal_framerate) * 1e9

self._align_timestampset = TimestampSet(reference=_current_timestampset_reference, camera=self._current_timestampset.camera)

@abstractmethod
def start(self, nominal_framerate: int = None):
Expand All @@ -57,11 +66,11 @@ def start(self, nominal_framerate: int = None):

# init common abstract props
self._nominal_framerate = nominal_framerate
self._timestamp_monotonic_ns: int = None
self._queue_timestamp_monotonic_ns: Queue = Queue(maxsize=1)
self._event_request_tick: Event = Event()
self._capture = Event()
self._capture_in_progress = False
self._barrier = Barrier(3, action=self.get_timestamps_to_align)
self._current_timestampset = TimestampSet(None, None)
self._align_timestampset = TimestampSet(None, None)

@abstractmethod
def stop(self):
Expand All @@ -75,17 +84,11 @@ def do_capture(self, filename: str = None, number_frames: int = 1):
self._capture.set()

def sync_tick(self, timestamp_ns: int):
self._timestamp_monotonic_ns = timestamp_ns

self._barrier.wait()

# try:
# self._queue_timestamp_monotonic_ns.put_nowait(timestamp_ns)
# except Full:
# logger.info("could not queue timestamp - camera_thread not started, busy, overload or nominal fps to close to cameras max mode fps?")

def request_tick(self):
self._event_request_tick.set()
self._current_timestampset.reference = timestamp_ns
try:
self._barrier.wait()
except BrokenBarrierError:
logger.debug("sync barrier broke")

@abstractmethod
def start_stream(self):
Expand Down
78 changes: 35 additions & 43 deletions node/services/backends/cameras/picamera2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from threading import current_thread
from threading import BrokenBarrierError, current_thread

from libcamera import Transform, controls
from picamera2 import Picamera2, Preview
Expand All @@ -29,24 +29,24 @@ def __init__(self, config: ConfigBackendPicamera2):
# private props
self._picamera2: Picamera2 = None
self._nominal_framerate: float = None
self._adjust_sync_offset: int = 0
self._camera_thread: StoppableThread = None
self._processing_thread: StoppableThread = None
self._align_thread: StoppableThread = None
self._queue_processing: Queue = None
self._queue_camera_timestamp_ns: Queue = None
self._streaming_output: StreamingOutput = None

# initialize private props
self._streaming_output: StreamingOutput = StreamingOutput()
self._queue_processing: Queue = Queue()
self._align_thread: StoppableThread = None
self._camera_timestamp_ns: int = None

logger.info(f"global_camera_info {Picamera2.global_camera_info()}")

def start(self, nominal_framerate: int = None):
"""To start the backend, configure picamera2"""
super().start(nominal_framerate=nominal_framerate)

# initialize private props
self._streaming_output: StreamingOutput = StreamingOutput()
self._queue_processing: Queue = Queue()
self._camera_timestamp_ns: int = None

# https://github.com/raspberrypi/picamera2/issues/576
if self._picamera2:
self._picamera2.close()
Expand Down Expand Up @@ -93,9 +93,6 @@ def start(self, nominal_framerate: int = None):

self._init_autofocus()

self._queue_camera_timestamp_ns: Queue = Queue(maxsize=1)
self._camera_timestamp_ns: int = None

self._processing_thread = StoppableThread(name="_processing_thread", target=self._processing_fun, args=(), daemon=True)
self._processing_thread.start()

Expand All @@ -115,6 +112,9 @@ def start(self, nominal_framerate: int = None):
def stop(self):
super().stop()

if self._barrier:
self._barrier.abort()

if self._picamera2:
self._picamera2.stop()
self._picamera2.close() # need to close camera so it can be used by other processes also (or be started again)
Expand Down Expand Up @@ -203,14 +203,10 @@ def clamp(n, min_value, max_value):

def recover(self):
tms = time.time()
for _ in range(4): # 2 is size of buffer so tick->clear no processing.
try:
pass
# self._queue_timestamp_monotonic_ns.get(block=True, timeout=2.0)
# self._queue_camera_timestamp_ns.get(block=True, timeout=2.0)
self._picamera2.drop_frames(1)
except Exception:
pass
try:
self._picamera2.drop_frames(2)
except Exception:
pass

logger.info(f"recovered, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")

Expand All @@ -225,25 +221,17 @@ def _align_fun(self):
self.recover()

while not current_thread().stopped():
# capture_time_assigned_timestamp_ns = 0
# capture_time_timestamp_ns = 0

if self._capture_in_progress:
adjust_cycle_counter = 0 # keep counter 0 until something is in progress and wait X_CYCLES until adjustment is done afterwards

try:
self._barrier.wait()
# print(self._align_timestamps)
# capture_time_timestamp_ns = self._camera_timestamp_ns or 0
# capture_time_assigned_timestamp_ns = self._timestamp_monotonic_ns or 0
except Empty:
logger.error("queue was empty despite barrier was hit!")
break
except TimeoutError:
logger.error("TimeoutError")
# at this point we got an updated self._align_timestampset set in barriers action.
except BrokenBarrierError:
logger.debug("sync barrier broke")
break

timestamp_delta_ns = self._align_timestamps[0] - self._align_timestamps[1] # in ns
timestamp_delta_ns = self._align_timestampset.camera - self._align_timestampset.reference # in ns

# if abs(timestamp_delta_ns) > (1.1e9 * nominal_frame_duration):
# logger.info("delta big, recovering...")
Expand All @@ -267,9 +255,9 @@ def _align_fun(self):
if abs(timestamp_delta_ns / 1.0e6) > THRESHOLD_LOG:
# even in debug reduce verbosity a bit if all is fine and within 2ms tolerance
logger.debug(
f"🕑 clk/sensor/Δ/adjust=( "
f"{(self._align_timestamps[1])/1e6:.1f} / "
f"{self._align_timestamps[0]/1e6:.1f} / "
f"🕑 clk/cam/Δ/adjust=( "
f"{(self._align_timestampset.reference)/1e6:.1f} / "
f"{self._align_timestampset.camera/1e6:.1f} / "
f"{timestamp_delta_ns/1e6:5.1f} / "
f"{adjust_amount_us/1e3:5.1f}) ms"
# f"FrameDuration={round(picam_metadata['FrameDuration']/1e3,1)} ms "
Expand All @@ -278,7 +266,7 @@ def _align_fun(self):
pass
# silent

logger.info("_camera_fun left")
logger.info("_align_fun left")

def _camera_fun(self):
logger.debug("starting _camera_fun")
Expand All @@ -294,14 +282,18 @@ def _camera_fun(self):
self._capture_in_progress = False

else:
picam_metadata = self._picamera2.capture_metadata(wait=2.0)

self._camera_timestamp_ns = picam_metadata["SensorTimestamp"]
self._barrier.wait()
# try:
# self._queue_camera_timestamp_ns.put_nowait(picam_metadata["SensorTimestamp"])
# except Full:
# logger.warning("could not queue camera timestamp!")
try:
picam_metadata = self._picamera2.capture_metadata(wait=2.0)
self._current_timestampset.camera = picam_metadata["SensorTimestamp"]
except TimeoutError as exc:
logger.warning(f"camera timed out: {exc}")
break

try:
self._barrier.wait()
except BrokenBarrierError:
logger.debug("sync barrier broke")
break

logger.info("_camera_fun left")

Expand Down
3 changes: 0 additions & 3 deletions node/services/backends/cameras/virtualcamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,3 @@ def sync_tick(self, timestamp_ns: int):
if self._tick_tock_counter > 10:
self._tick_tock_counter = 0
logger.debug("tick")

def request_tick(self):
pass
9 changes: 0 additions & 9 deletions node/services/sync_acquisition_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,6 @@ def _sync_fun(self):
else:
self._camera_backend.sync_tick(timestamp_ns)

try:
self._gpio_backend.wait_for_clock_fall_signal(timeout=1)
except TimeoutError:
# stop devices when no clock is avail, supervisor enables again after clock is received, derives new framerate ans starts backends
logger.error("clock signal missing.")
break
else:
self._camera_backend.request_tick()

logger.info("left _sync_fun") # if left, it allows supervisor to restart if needed.

def _trigger_in_fun(self):
Expand Down

0 comments on commit f9fe9bc

Please sign in to comment.