Skip to content

Commit

Permalink
finished implementation on picam device.
Browse files Browse the repository at this point in the history
  • Loading branch information
mgineer85 committed Nov 3, 2024
1 parent d2bf3c1 commit c220dae
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 71 deletions.
5 changes: 4 additions & 1 deletion wigglecam/services/acquisitionservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ def wait_for_trigger_job(self, timeout: float = None):
self._flag_trigger_job.clear()
return val

def clear_trigger_job_flag(self):
self._flag_trigger_job.clear()

def _device_start(self, derived_fps: int):
logger.info("starting device")

Expand Down Expand Up @@ -275,7 +278,7 @@ def _trigger_in_fun(self):
filepath = PATH_STANDALONE / filename

with open(filepath, "wb") as f:
f.write(self.encode_frame_to_image(frame, "jpg"))
f.write(self.encode_frame_to_image(frame, "jpeg"))

logger.info(f"image saved to {filepath}")

Expand Down
133 changes: 65 additions & 68 deletions wigglecam/services/backends/cameras/picamera2.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
import dataclasses
import io
import logging
import time
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from threading import BrokenBarrierError, current_thread
from threading import BrokenBarrierError, Condition, Event, current_thread

from libcamera import Transform, controls
from picamera2 import Picamera2, Preview
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput

from ....utils.stoppablethread import StoppableThread
from ...config.models import ConfigBackendPicamera2
from .abstractbackend import AbstractCameraBackend, BackendItem, BackendRequest, StreamingOutput
from .abstractbackend import AbstractCameraBackend, StreamingOutput

logger = logging.getLogger(__name__)


ADJUST_EVERY_X_CYCLE = 10


@dataclasses.dataclass
class HiresData:
# dataframe
frame: object = None
# signal to producer that requesting thread is ready to be notified
request_hires_still: Event = None
# condition when frame is avail
condition: Condition = None


class Picamera2Backend(AbstractCameraBackend):
def __init__(self, config: ConfigBackendPicamera2):
super().__init__()
Expand All @@ -29,9 +37,8 @@ def __init__(self, config: ConfigBackendPicamera2):
# private props
self._picamera2: Picamera2 = None
self._nominal_framerate: float = None
self._processing_thread: StoppableThread = None
self._queue_processing: Queue[tuple[BackendRequest, object]] = None
self._streaming_output: StreamingOutput = None
self._hires_data: HiresData = None

logger.info(f"global_camera_info {Picamera2.global_camera_info()}")

Expand All @@ -41,7 +48,7 @@ def start(self, nominal_framerate: int = None):

# initialize private props
self._streaming_output: StreamingOutput = StreamingOutput()
self._queue_processing: Queue[tuple[BackendRequest, object]] = Queue()
self._hires_data: HiresData = HiresData(frame=None, request_hires_still=Event(), condition=Condition())

# https://github.com/raspberrypi/picamera2/issues/576
if self._picamera2:
Expand Down Expand Up @@ -89,9 +96,6 @@ def start(self, nominal_framerate: int = None):

self._init_autofocus()

self._processing_thread = StoppableThread(name="_processing_thread", target=self._processing_fun, args=(), daemon=True)
self._processing_thread.start()

logger.info(f"camera_config: {self._picamera2.camera_config}")
logger.info(f"camera_controls: {self._picamera2.camera_controls}")
logger.info(f"controls: {self._picamera2.controls}")
Expand All @@ -108,17 +112,12 @@ def stop(self):
self._picamera2.stop()
self._picamera2.close() # need to close camera so it can be used by other processes also (or be started again)

if self._processing_thread and self._processing_thread.is_alive():
self._processing_thread.stop()
self._processing_thread.join()

logger.debug(f"{self.__module__} stopped")

def camera_alive(self) -> bool:
super_alive = super().camera_alive()
processing_alive = self._processing_thread and self._processing_thread.is_alive()

return super_alive and processing_alive
return super_alive and True

def start_stream(self):
self._picamera2.stop_recording()
Expand All @@ -143,6 +142,34 @@ def wait_for_lores_image(self):

return self._streaming_output.frame

def wait_for_hires_frame(self):
with self._hires_data.condition:
self._hires_data.request_hires_still.set()

if not self._hires_data.condition.wait(timeout=2.0):
raise TimeoutError("timeout receiving frames")

self._hires_data.request_hires_still.clear()
return self._hires_data.frame

def wait_for_hires_image(self, format: str) -> bytes:
return super().wait_for_hires_image(format=format)

def encode_frame_to_image(self, frame, format: str) -> bytes:
# for picamera2 frame is a == jpeg data, so no convertion needed.
if format == "jpeg":
tms = time.time()

bytes_io = io.BytesIO()
image = self._picamera2.helpers.make_image(frame, self._picamera2.camera_config["main"])
image.save(bytes_io, format="jpeg", quality=self._config.original_still_quality)
logger.info(f"jpg encode finished, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")

return bytes_io.getbuffer()

else:
raise NotImplementedError

def _check_framerate(self):
assert self._nominal_framerate is not None

Expand Down Expand Up @@ -247,64 +274,34 @@ def _camera_fun(self):
self._started_evt.wait(timeout=10) # we wait very long, it would usually not time out except there is a bug and this unstalls

while not current_thread().stopped():
backendrequest = None
if self._hires_data.request_hires_still.is_set():
# only capture one pic and return, overlying classes are responsible to ask again if needed fast enough
self._hires_data.request_hires_still.clear()

try:
backendrequest = self._queue_in.get_nowait()
except Empty:
pass # no actual job to process...

if backendrequest:
# TODO: check if capable to keep up with the framerate or something get lost? for now we only use 1 frame so it's ok
tms = time.time()
buffer = self._picamera2.capture_buffer("main", wait=2.0)
self._queue_processing.put((backendrequest, buffer))
logger.info(f"queued up buffer to process image, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")
# capture hq picture
with self._picamera2.captured_request(wait=1.5) as request:
self._hires_data.frame = request.make_buffer("main")
picam_metadata = request.get_metadata()

logger.info("got buffer from cam to send to waiting threads")

with self._hires_data.condition:
self._hires_data.condition.notify_all()
else:
# capture metadata blocks until new metadata is avail
try:
picam_metadata = self._picamera2.capture_metadata(wait=2.0)
self._current_timestampset.camera = picam_metadata["SensorTimestamp"]

except TimeoutError as exc:
logger.warning(f"camera timed out: {exc}")
break

try:
self._barrier.wait()
except BrokenBarrierError:
logger.debug("sync barrier broke")
break

logger.info("_camera_fun left")
self._current_timestampset.camera = picam_metadata["SensorTimestamp"]

def _processing_fun(self):
# TODO: this might be better in multiprocessing or use some lib that is in c++ releasing the gil during processing...
logger.debug("starting _processing_fun")
buffer_to_proc = None

while not current_thread().stopped():
try:
(backendrequest, buffer_to_proc) = self._queue_processing.get(block=True, timeout=1.0)
logger.info("got img off queue, jpg proc start")
except Empty:
continue # just continue but allow .stopped to exit after 1.0 sec latest...

# start processing here...
folder = Path("./tmp/")
filename = Path(f"img_{datetime.now().astimezone().strftime('%Y%m%d-%H%M%S-%f')}")
filepath = folder / filename
logger.info(f"{filepath=}")

tms = time.time()
image = self._picamera2.helpers.make_image(buffer_to_proc, self._picamera2.camera_config["main"])
image.save(filepath.with_suffix(".jpg"), quality=self._config.original_still_quality)
logger.info(f"jpg compression finished, time taken: {round((time.time() - tms)*1.0e3, 0)}ms")

backenditem = BackendItem(
filepath=filepath,
)
self._queue_out.put(backenditem)
logger.info(f"result item put on output queue: {backenditem}")

self._queue_in.task_done()
self._barrier.wait()
except BrokenBarrierError:
logger.debug("sync barrier broke")
break

logger.info("_processing_fun left")
logger.info("_camera_fun left")
2 changes: 1 addition & 1 deletion wigglecam/services/backends/cameras/virtualcamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def wait_for_hires_image(self, format: str):

def encode_frame_to_image(self, frame, format: str) -> bytes:
# for virtualcamera frame == jpeg data, so no convertion needed.
if format in ("jpg", "jpeg"):
if format == "jpeg":
return frame
else:
raise NotImplementedError
Expand Down
4 changes: 3 additions & 1 deletion wigglecam/services/jobservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ def setup_job_request(self, jobrequest: JobRequest) -> JobItem:
if self._current_job:
raise ConnectionRefusedError("there is already an unprocessed job! reset first to queue a new job or process it")

self._acquisition_service.clear_trigger_job_flag() # reset, otherwise if it was set, the job is processed immediately

self._current_job = JobItem(request=jobrequest)
self.db_add_jobitem(self._current_job)

Expand Down Expand Up @@ -172,7 +174,7 @@ def _jobprocessor_fun(self):
filename = Path(f"img_{frame.captured_time}_{frame.seq:>03}").with_suffix(".jpg")
filepath = PATH_ORIGINAL / filename
with open(filepath, "wb") as f:
f.write(self._acquisition_service.encode_frame_to_image(frame.frame, "jpg"))
f.write(self._acquisition_service.encode_frame_to_image(frame.frame, "jpeg"))

self._current_job.filepaths.append(filepath)
logger.info(f"image saved to {filepath}")
Expand Down

0 comments on commit c220dae

Please sign in to comment.