Skip to content

Commit

Permalink
squirrel: adding downsampling
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Isken committed Nov 13, 2023
1 parent b2bab60 commit baf6a54
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions lassie/waveforms/squirrel.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class SquirrelPrefetcher:
queue: asyncio.Queue[Batch | None]
highpass: float | None
lowpass: float | None
downsample_to: float | None
_fetched_batches: int
_task: asyncio.Task[None]

Expand All @@ -45,12 +46,14 @@ class SquirrelPrefetcher:
def __init__(
self,
iterator: Iterator[Batch],
queue_size: int = 5,
queue_size: int = 10,
downsample_to: float | None = None,
highpass: float | None = None,
lowpass: float | None = None,
) -> None:
self.iterator = iterator
self.queue = asyncio.Queue(maxsize=queue_size)
self.downsample_to = downsample_to
self.highpass = highpass
self.lowpass = lowpass
self._fetched_batches = 0
Expand All @@ -60,17 +63,29 @@ def __init__(
async def prefetch_worker(self) -> None:
logger.info("start prefetching data, queue size %d", self.queue.maxsize)

def filter_freqs(batch: Batch) -> Batch:
def post_processing(batch: Batch) -> Batch:
# Filter traces in-place
start = None
if self.downsample_to:
try:
start = datetime_now()
desired_deltat = 1.0 / self.downsample_to
for tr in batch.traces:
if tr.deltat < desired_deltat:
tr.downsample_to(desired_deltat, allow_upsample_max=2)
except Exception as exc:
logger.exception(exc)

if self.highpass:
start = datetime_now()
start = start or datetime_now()
for tr in batch.traces:
tr.highpass(4, corner=self.highpass)

if self.lowpass:
start = start or datetime_now()
for tr in batch.traces:
tr.lowpass(4, corner=self.lowpass)

if start:
logger.debug("filtered traces in %s", datetime_now() - start)
return batch
Expand All @@ -84,7 +99,7 @@ def filter_freqs(batch: Batch) -> Batch:
await self.queue.put(None)
break

await asyncio.to_thread(filter_freqs, batch)
await asyncio.to_thread(post_processing, batch)
logger.debug("prefetched waveforms in %s", self.fetch_time)

self._fetched_batches += 1
Expand Down Expand Up @@ -157,6 +172,10 @@ class PyrockoSquirrel(WaveformProvider):
default=None,
description="Lowpass filter, corner frequency in Hz.",
)
downsample_to: PositiveFloat | None = Field(
default=100.0,
description="Downsample the data to a desired frequency",
)

channel_selector: str = Field(
default="*",
Expand All @@ -165,7 +184,7 @@ class PyrockoSquirrel(WaveformProvider):
"use e.g. `EN?` for selection of all accelerometer data.",
)
async_prefetch_batches: PositiveInt = Field(
default=5,
default=10,
description="Queue size for asynchronous pre-fetcher.",
)

Expand Down Expand Up @@ -246,8 +265,9 @@ async def iter_batches(
prefetcher = SquirrelPrefetcher(
iterator,
self.async_prefetch_batches,
self.highpass,
self.lowpass,
downsample_to=self.downsample_to,
highpass=self.highpass,
lowpass=self.lowpass,
)
stats.set_queue(prefetcher.queue)

Expand Down

0 comments on commit baf6a54

Please sign in to comment.