Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
miili committed Nov 5, 2023
1 parent f299f3f commit 4ad0440
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
10 changes: 9 additions & 1 deletion lassie/images/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ImageFunctions(RootModel):
root: list[ImageFunctionType] = [PhaseNet()]

_queue: asyncio.Queue[Tuple[WaveformImages, WaveformBatch] | None] = PrivateAttr()
_processed_images: int = PrivateAttr(0)

def model_post_init(self, __context: Any) -> None:
# Check if phases are provided twice
Expand Down Expand Up @@ -71,17 +72,24 @@ async def iter_images(
"""

async def worker() -> None:
logger.info("start prefetching data, queue size %d", self._queue.maxsize)
logger.info(
"start pre-processing images, queue size %d", self._queue.maxsize
)
async for batch in batch_iterator:
images = await self.process_traces(batch.traces)
if self._queue.empty() and self._processed_images:
logger.warning("image queue ran empty, prefetching is too slow")
self._processed_images += 1
await self._queue.put((images, batch))

await self._queue.put(None)

task = asyncio.create_task(worker())

while True:
ret = await self._queue.get()
if ret is None:
logger.debug("image function finished")
break
yield ret

Expand Down
7 changes: 5 additions & 2 deletions lassie/waveforms/squirrel.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def filter_freqs(batch: Batch) -> Batch:
await asyncio.to_thread(filter_freqs, batch)
logger.debug("prefetched waveforms in %s", datetime_now() - start)
if self.queue.empty() and self._fetched_batches:
logger.warning("queue ran empty, prefetching is too slow")
logger.warning("waveform queue ran empty, prefetching is too slow")

self._fetched_batches += 1
await self.queue.put(batch)
Expand Down Expand Up @@ -126,7 +126,10 @@ class PyrockoSquirrel(WaveformProvider):
description="Channel selector for waveforms, "
"use e.g. `EN?` for selection of all accelerometer data.",
)
async_prefetch_batches: PositiveInt = 4
async_prefetch_batches: PositiveInt = Field(
4,
description="Queue size for asynchronous pre-fetcher.",
)

_squirrel: Squirrel | None = PrivateAttr(None)
_stations: Stations = PrivateAttr()
Expand Down

0 comments on commit 4ad0440

Please sign in to comment.