Skip to content

Commit

Permalink
finished refactor on dev, now test picam
Browse files Browse the repository at this point in the history
  • Loading branch information
mgineer85 committed Nov 3, 2024
1 parent ae293b4 commit d2bf3c1
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 198 deletions.
6 changes: 2 additions & 4 deletions wigglecam/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from .services.acquisitionservice import AcquisitionService
from .services.baseservice import BaseService
from .services.config import appconfig
from .services.jobconnectedservice import JobConnectedService
from .services.jobstandaloneservice import JobStandaloneService
from .services.jobservice import JobService
from .services.loggingservice import LoggingService

logger = logging.getLogger(__name__)
Expand All @@ -22,8 +21,7 @@ class Container:
# container
logging_service = LoggingService(config=appconfig.logging)
synced_acquisition_service = AcquisitionService(config=appconfig.syncedacquisition)
jobconnectedservice = JobConnectedService(config=appconfig.jobconnected, acquisition_service=synced_acquisition_service)
jobstandaloneservice = JobStandaloneService(config=appconfig.jobstandalone, acquisition_service=synced_acquisition_service)
jobconnectedservice = JobService(config=appconfig.jobconnected, acquisition_service=synced_acquisition_service)

def __init__(self):
# ensure dirs are avail
Expand Down
2 changes: 1 addition & 1 deletion wigglecam/routers/api/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fastapi import APIRouter, HTTPException, status

from ...container import container
from ...services.jobconnectedservice import JobItem, JobRequest
from ...services.jobservice import JobItem, JobRequest

logger = logging.getLogger(__name__)
router = APIRouter(
Expand Down
111 changes: 41 additions & 70 deletions wigglecam/services/acquisitionservice.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from importlib import import_module
from pathlib import Path
from queue import Empty, Queue
from threading import Event, current_thread

from ..utils.stoppablethread import StoppableThread
from .backends.cameras.abstractbackend import AbstractCameraBackend, BackendItem, BackendRequest
from .backends.cameras.abstractbackend import AbstractCameraBackend
from .backends.io.abstractbackend import AbstractIoBackend
from .baseservice import BaseService
from .config.models import ConfigSyncedAcquisition

logger = logging.getLogger(__name__)


@dataclass
class AcqRequest:
seq_no: int
# nothing to align here until today...
DATA_PATH = Path("./media")
# as from image source
PATH_STANDALONE = DATA_PATH / "standalone"


@dataclass
class AcquisitionItem:
# request: AcqRequest
# backenditem: BackendItem
filepath: Path
print(DATA_PATH)
print(PATH_STANDALONE)


class AcquisitionService(BaseService):
Expand All @@ -44,13 +39,16 @@ def __init__(self, config: ConfigSyncedAcquisition):
self._trigger_out_thread: StoppableThread = None
self._supervisor_thread: StoppableThread = None

self._flag_trigger_job: Event = None
self._flag_trigger_out: Event = None
self._device_initialized_once: bool = False

# initialize private properties.
self._flag_trigger_job: Event = Event()
self._flag_trigger_out: Event = Event()
self._queue_in: Queue[AcqRequest] = Queue()
self._queue_out: Queue[AcquisitionItem] = Queue()

# ensure data directories exist
os.makedirs(f"{PATH_STANDALONE}", exist_ok=True)

def start(self):
super().start()
Expand Down Expand Up @@ -101,6 +99,9 @@ def wait_for_hires_frame(self):
def wait_for_hires_image(self, format: str):
return self._camera_backend.wait_for_hires_image(format=format)

def encode_frame_to_image(self, frame, format: str):
return self._camera_backend.encode_frame_to_image(frame, format)

def gen_stream(self):
"""
yield jpeg images to stream to client (if not created otherwise)
Expand Down Expand Up @@ -131,12 +132,15 @@ def trigger_execute_job(self):
# maybe config can be changed in future and so also the _tirgger_out_thread is not started on secondary nodes.
self._flag_trigger_out.set()

def wait_for_trigger_in(self, timeout: float = None):
def wait_for_trigger_job(self, timeout: float = None):
# maybe in future replace by this? lets see... https://superfastpython.com/thread-race-condition-timing/
# there is only one thread allowed to listen to this event: the jobXservice. Otherwise the event could be missed.
val = self._gpio_backend._trigger_in_flag.wait(timeout)
# standalone mode could process it also, so need to clarify in
# TODO: check how to allow two listening threads.
val = self._flag_trigger_job.wait(timeout)
if val:
# if true, directly clear, because we trigger only once!
self._gpio_backend._trigger_in_flag.clear()
self._flag_trigger_job.clear()
return val

def _device_start(self, derived_fps: int):
Expand Down Expand Up @@ -252,61 +256,28 @@ def _sync_fun(self):
logger.info("left _sync_fun") # if left, it allows supervisor to restart if needed.

def _trigger_in_fun(self):
logger.info("_trigger_in_fun started")
while not current_thread().stopped():
time.sleep(1)
continue

if self._gpio_backend._trigger_in_flag.wait(timeout=1.0):
self._gpio_backend._trigger_in_flag.clear() # first clear to avoid endless loops

logger.info("trigger_in received to start processing job")

# this is implementation for wigglecam_minimal to allow working without external job setup.
if self._queue_in.empty() and self._config.allow_standalone_job:
# useful if mobile camera is without any interconnection to a concentrator that could setup a job
self._queue_in.put(AcqRequest(seq_no=0))
logger.info("default job was added to the input queue")

# send down to backend the job in input queue
# the jobs have just to be in the queue, the backend is taking care about the correct timing -
# it might fail if it can not catch up with the framerate
while not current_thread().stopped():
try:
acqrequest = self._queue_in.get_nowait()
logger.info(f"got acquisition request off the queue: {acqrequest}, passing to capture backend.")
backendrequest = BackendRequest()
self._camera_backend._queue_in.put(backendrequest)
except Empty:
logger.info("all capture jobs sent to backend...")
break # leave inner processing loop, continue listen to trigger in outer.

# get back the jobs one by one
# TODO: maybe we don't need to wait later for join...
logger.info("waiting for job to finish")
self._camera_backend._queue_in.join()
logger.info("ok, continue")

while not current_thread().stopped():
try:
backenditem: BackendItem = self._camera_backend._queue_out.get_nowait()
acquisitionitem = AcquisitionItem(
filepath=backenditem.filepath,
)
self._queue_out.put(acquisitionitem)
logger.info(f"put {acquisitionitem} on queue output")
self._queue_in.task_done()
except Empty:
logger.info("all capture jobs received from backend...")
break # leave inner processing loop, continue listen to trigger in outer.
except TimeoutError:
logger.info("timed out waiting for job to finish :(")
break

logger.info("trigger_in finished, waiting for next job")
# if true, directly clear, because we trigger only once!
self._gpio_backend._trigger_in_flag.clear()
# maybe in future replace by this? lets see... https://superfastpython.com/thread-race-condition-timing/

else:
pass
# flag not set, continue
# job is triggered by this flag.
self._flag_trigger_job.set()

if self._config.standalone_mode:
# recommended to disable in production.
logger.info("standalone mode is enabled! to use the job processor and connectivity, disable standalone mode in config!")
frame = self.wait_for_hires_frame()

filename = Path(f"img_{datetime.now().astimezone().strftime('%Y%m%d-%H%M%S-%f')}").with_suffix(".jpg")
filepath = PATH_STANDALONE / filename

with open(filepath, "wb") as f:
f.write(self.encode_frame_to_image(frame, "jpg"))

logger.info(f"image saved to {filepath}")

logger.info("left _trigger_in_fun") # if left, it allows supervisor to restart if needed.

Expand Down
6 changes: 3 additions & 3 deletions wigglecam/services/backends/cameras/abstractbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,19 @@ def stop_stream(self):
pass

@abstractmethod
def wait_for_lores_image(self):
def wait_for_lores_image(self) -> bytes:
pass

@abstractmethod
def wait_for_hires_frame(self):
pass

@abstractmethod
def wait_for_hires_image(self, format: str):
def wait_for_hires_image(self, format: str) -> bytes:
return self.encode_frame_to_image(self.wait_for_hires_frame(), format)

@abstractmethod
def encode_frame_to_image(self, format: str):
def encode_frame_to_image(self, frame, format: str) -> bytes:
pass

@abstractmethod
Expand Down
10 changes: 5 additions & 5 deletions wigglecam/services/backends/cameras/virtualcamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ def stop_stream(self):
def wait_for_hires_frame(self):
return self.wait_for_lores_image()

def wait_for_hires_image(self, *args, **kwargs):
return super().wait_for_hires_image(*args, **kwargs)
def wait_for_hires_image(self, format: str):
return super().wait_for_hires_image(format=format)

def encode_frame_to_image(self, frame, format: str):
def encode_frame_to_image(self, frame, format: str) -> bytes:
# for virtualcamera frame == jpeg data, so no convertion needed.
if format in ("jpg", "jpeg"):
return frame
else:
raise NotImplementedError

def _produce_dummy_image(self):
def _produce_dummy_image(self) -> bytes:
byte_io = io.BytesIO()
imarray = numpy.random.rand(250, 250, 3) * 255
random_image = Image.fromarray(imarray.astype("uint8"), "RGB")
random_image.save(byte_io, format="JPEG", quality=50)

return byte_io.getbuffer()

def wait_for_lores_image(self):
def wait_for_lores_image(self) -> bytes:
"""for other threads to receive a lores JPEG image"""

with self._data_condition:
Expand Down
3 changes: 1 addition & 2 deletions wigglecam/services/config/appconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import PrivateAttr
from pydantic_settings import BaseSettings, SettingsConfigDict

from .models import ConfigJobConnected, ConfigJobStandalone, ConfigLogging, ConfigSyncedAcquisition
from .models import ConfigJobConnected, ConfigLogging, ConfigSyncedAcquisition


class AppConfig(BaseSettings):
Expand All @@ -26,7 +26,6 @@ class AppConfig(BaseSettings):
logging: ConfigLogging = ConfigLogging()
syncedacquisition: ConfigSyncedAcquisition = ConfigSyncedAcquisition()
jobconnected: ConfigJobConnected = ConfigJobConnected()
jobstandalone: ConfigJobStandalone = ConfigJobStandalone()

model_config = SettingsConfigDict(
env_file_encoding="utf-8",
Expand Down
6 changes: 2 additions & 4 deletions wigglecam/services/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,11 @@ class GroupIoBackend(BaseModel):


class ConfigSyncedAcquisition(BaseModel):
standalone_mode: bool = Field(default=True)

camera_backends: GroupCameraBackend = Field(default=GroupCameraBackend())
io_backends: GroupIoBackend = Field(default=GroupIoBackend())


class ConfigJobStandalone(BaseModel):
enabled: bool = Field(default=True)


class ConfigJobConnected(BaseModel):
pass
Loading

0 comments on commit d2bf3c1

Please sign in to comment.