From 4ad0440d10867b7cfa18d195a646220d53007081 Mon Sep 17 00:00:00 2001 From: miili Date: Sun, 5 Nov 2023 14:24:48 +0100 Subject: [PATCH] fixes --- lassie/images/__init__.py | 10 +++++++++- lassie/waveforms/squirrel.py | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/lassie/images/__init__.py b/lassie/images/__init__.py index 0e2c7e8e..c6139409 100644 --- a/lassie/images/__init__.py +++ b/lassie/images/__init__.py @@ -41,6 +41,7 @@ class ImageFunctions(RootModel): root: list[ImageFunctionType] = [PhaseNet()] _queue: asyncio.Queue[Tuple[WaveformImages, WaveformBatch] | None] = PrivateAttr() + _processed_images: int = PrivateAttr(0) def model_post_init(self, __context: Any) -> None: # Check if phases are provided twice @@ -71,10 +72,16 @@ async def iter_images( """ async def worker() -> None: - logger.info("start prefetching data, queue size %d", self._queue.maxsize) + logger.info( + "start pre-processing images, queue size %d", self._queue.maxsize + ) async for batch in batch_iterator: images = await self.process_traces(batch.traces) + if self._queue.empty() and self._processed_images: + logger.warning("image queue ran empty, prefetching is too slow") + self._processed_images += 1 await self._queue.put((images, batch)) + await self._queue.put(None) task = asyncio.create_task(worker()) @@ -82,6 +89,7 @@ async def worker() -> None: while True: ret = await self._queue.get() if ret is None: + logger.debug("image function finished") break yield ret diff --git a/lassie/waveforms/squirrel.py b/lassie/waveforms/squirrel.py index b4fec49e..719b3b80 100644 --- a/lassie/waveforms/squirrel.py +++ b/lassie/waveforms/squirrel.py @@ -81,7 +81,7 @@ def filter_freqs(batch: Batch) -> Batch: await asyncio.to_thread(filter_freqs, batch) logger.debug("prefetched waveforms in %s", datetime_now() - start) if self.queue.empty() and self._fetched_batches: - logger.warning("queue ran empty, prefetching is too slow") + logger.warning("waveform queue ran empty, prefetching is too slow") self._fetched_batches += 1 await self.queue.put(batch) @@ -126,7 +126,10 @@ class PyrockoSquirrel(WaveformProvider): description="Channel selector for waveforms, " "use e.g. `EN?` for selection of all accelerometer data.", ) - async_prefetch_batches: PositiveInt = 4 + async_prefetch_batches: PositiveInt = Field( + 4, + description="Queue size for asynchronous pre-fetcher.", + ) _squirrel: Squirrel | None = PrivateAttr(None) _stations: Stations = PrivateAttr()