From 65b8aef500c389e0b9651f9f3b0a9ba829704020 Mon Sep 17 00:00:00 2001 From: Marius Isken Date: Sat, 6 Jan 2024 12:50:24 +0100 Subject: [PATCH] upd --- src/qseek/magnitudes/moment_magnitude.py | 53 ++++++++++---- .../magnitudes/moment_magnitude_store.py | 72 ++++++++++++++++--- src/qseek/octree.py | 6 +- src/qseek/search.py | 2 + 4 files changed, 106 insertions(+), 27 deletions(-) diff --git a/src/qseek/magnitudes/moment_magnitude.py b/src/qseek/magnitudes/moment_magnitude.py index e19df90e..f34e68d0 100644 --- a/src/qseek/magnitudes/moment_magnitude.py +++ b/src/qseek/magnitudes/moment_magnitude.py @@ -27,6 +27,8 @@ from pyrocko.trace import Trace from qseek.models.detection import EventDetection, Receiver + from qseek.models.station import Stations + from qseek.octree import Octree logger = logging.getLogger(__name__) @@ -158,7 +160,7 @@ def get_magnitude( try: modelled_amplitude = await store.get_amplitude( - source_depth=event.depth, + source_depth=event.effective_depth, distance=station_amplitudes.distance_epi, n_amplitudes=25, peak_amplitude=peak_amplitude, @@ -211,7 +213,7 @@ class MomentMagnitudeExtractor(EventMagnitudeCalculator): seconds_before: PositiveFloat = 10.0 seconds_after: PositiveFloat = 10.0 - padding_seconds: PositiveFloat = 10.0 + padding_seconds: PositiveFloat = 5.0 gf_store_dirs: list[DirectoryPath] = [Path(".")] @@ -231,6 +233,17 @@ def model_post_init(self, __context: Any) -> None: logger.info("Loading peak amplitude stores...") self._stores = [cache.get_store(definition) for definition in self.models] + async def prepare(self, octree: Octree, stations: Stations) -> None: + logger.info("Preparing peak amplitude stores...") + root_nodes = octree.get_root_nodes(octree.size_initial) + depths = np.array([node.as_location().effective_depth for node in root_nodes]) + + for store in self._stores: + for depth in depths: + if store.has_depth(depth): + continue + await store.fill_source_depth(depth) + async def add_magnitude( self, squirrel: Squirrel, @@ -241,27 +254,36 @@ async def add_magnitude( receivers = list(event.receivers) for store, definition in zip(self._stores, self.models, strict=True): store_receivers = definition.filter_receivers(receivers) - if not store_receivers: - continue for rcv in store_receivers: receivers.remove(rcv) + if not store_receivers: + continue + + if definition.max_distance is not None: + store_receivers = [ + rcv + for rcv in store_receivers + if rcv.surface_distance_to(event) <= definition.max_distance + ] + traces = await event.receivers.get_waveforms_restituted( squirrel, + receivers=store_receivers, quantity=store.quantity, seconds_before=self.seconds_before, seconds_after=self.seconds_after, demean=True, - cut_off_fade=True, seconds_fade=self.padding_seconds, + cut_off_fade=True, ) if not traces: - logger.warning("No restituted traces found for event %s", event.time) return for tr in traces: - tr.highpass(4, store.frequency_range.min) - tr.lowpass(4, store.frequency_range.max) + if store.frequency_range.min: + tr.highpass(4, store.frequency_range.min, demean=True) + tr.lowpass(4, store.frequency_range.max, demean=True) grouped_traces = [] receivers = [] @@ -270,13 +292,14 @@ async def add_magnitude( ): grouped_traces.append(list(grp_traces)) receivers.append(event.receivers.get_receiver(nsl)) - await moment_magnitude.add_traces( - store=store, - event=event, - receivers=receivers, - traces=grouped_traces, - peak_amplitude=definition.peak_amplitude, - ) + + await moment_magnitude.add_traces( + store=store, + event=event, + receivers=receivers, + traces=grouped_traces, + peak_amplitude=definition.peak_amplitude, + ) if not moment_magnitude.average: logger.warning("No moment magnitude found for event %s", event.time) diff --git a/src/qseek/magnitudes/moment_magnitude_store.py b/src/qseek/magnitudes/moment_magnitude_store.py index 5dfd829d..c718c1c7 100644 --- a/src/qseek/magnitudes/moment_magnitude_store.py +++ b/src/qseek/magnitudes/moment_magnitude_store.py @@ -472,6 +472,19 @@ def from_selector(cls, selector: PeakAmplitudesBase) -> Self: def source_depth_range(self) -> Range: return Range.from_list([sa.source_depth for sa in self.site_amplitudes]) + def has_depth(self, depth: float) -> bool: + """ + Check if the moment magnitude store has a given depth. + + Args: + depth (float): The depth to check. + + Returns: + bool: True if the store has the given depth, False otherwise. + """ + depths = [sa.source_depth for sa in self.site_amplitudes] + return depth in depths + def get_store(self) -> gf.Store: """ Load the GF store for the given store ID. @@ -489,12 +502,10 @@ def get_store(self) -> gf.Store: config = store.config if not isinstance(config, gf.ConfigTypeA): raise EnvironmentError("GF store is not of type ConfigTypeA.") - if 1.0 / config.deltat < self.frequency_range.max: raise ValueError( f"Pyrocko GF store frequency {1.0 / config.deltat} too low." ) - return store def is_suited(self, selector: PeakAmplitudesBase) -> bool: @@ -513,13 +524,17 @@ def is_suited(self, selector: PeakAmplitudesBase) -> bool: and self.gf_interpolation == selector.gf_interpolation and self.quantity == selector.quantity and self.reference_magnitude == selector.reference_magnitude - and self.rupture_velocities.min <= selector.rupture_velocities.min - and self.rupture_velocities.max >= selector.rupture_velocities.max - and self.stress_drop.min <= selector.stress_drop.min - and self.stress_drop.max >= selector.stress_drop.max + and self.rupture_velocities.min == selector.rupture_velocities.min + and self.rupture_velocities.max == selector.rupture_velocities.max + and self.stress_drop.min == selector.stress_drop.min + and self.stress_drop.max == selector.stress_drop.max ) if selector.frequency_range: - result = result and self.frequency_range.max <= selector.frequency_range.max + result = ( + result + and self.frequency_range.min == selector.frequency_range.min + and self.frequency_range.max == selector.frequency_range.max + ) return result def _get_random_source(self, depth: float) -> gf.MTSource: @@ -822,6 +837,12 @@ def save(self, path: Path | None = None) -> None: file.write_text(self.model_dump_json()) +class CacheStats(NamedTuple): + path: Path + n_stores: int + bytes: int + + class PeakAmplitudeStoreCache: cache_dir: Path engine: gf.LocalEngine @@ -829,6 +850,8 @@ class PeakAmplitudeStoreCache: def __init__(self, cache_dir: Path, engine: gf.LocalEngine | None = None) -> None: self.cache_dir = cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) + logger.info("cache stats: %s", self.cache_stats()) + self.clean_cache() self.engine = engine or gf.LocalEngine(store_superdirs=["."]) PeakAmplitudesStore.set_engine(engine) @@ -836,11 +859,42 @@ def __init__(self, cache_dir: Path, engine: gf.LocalEngine | None = None) -> Non def clear_cache(self): """ Clear the cache directory. + + This method deletes all files in the cache directory. """ - cache_dir = self.cache_dir / "site_amplitudes" - for file in cache_dir.glob("*"): + logger.info("clearing cache directory %s", self.cache_dir) + for file in self.cache_dir.glob("*"): file.unlink() + def clean_cache(self, keep_files: int = 100) -> None: + """ + Clean the cache directory. + + Args: + keep_files (int, optional): The number of most recent files to keep in the + cache directory. Defaults to 100. + """ + files = sorted(self.cache_dir.glob("*"), key=lambda f: f.stat().st_mtime) + if len(files) <= keep_files: + return + logger.info("cleaning cache directory %s", self.cache_dir) + for file in files[keep_files:]: + file.unlink() + + def cache_stats(self) -> CacheStats: + """ + Get the cache statistics. + + Returns: + CacheStats: The cache statistics. + """ + n_stores = 0 + nbytes = 0 + for file in self.cache_dir.glob("*.json"): + n_stores += 1 + nbytes += file.stat().st_size + return CacheStats(path=self.cache_dir, n_stores=n_stores, bytes=nbytes) + def get_cached_stores( self, store_id: str, quantity: MeasurementUnit ) -> list[PeakAmplitudesStore]: diff --git a/src/qseek/octree.py b/src/qseek/octree.py index 4d36912a..cd069816 100644 --- a/src/qseek/octree.py +++ b/src/qseek/octree.py @@ -282,7 +282,7 @@ def check_limits(self) -> Octree: def model_post_init(self, __context: Any) -> None: """Initialize octree. This method is called by the pydantic model""" - self._root_nodes = self._get_root_nodes(self.size_initial) + self._root_nodes = self.get_root_nodes(self.size_initial) logger.info( "initializing octree volume with %d nodes and %.1f kmĀ³," " smallest node size: %.1f m", @@ -303,7 +303,7 @@ def extent(self) -> tuple[float, float, float]: self.depth_bounds.max - self.depth_bounds.min, ) - def _get_root_nodes(self, length: float) -> list[Node]: + def get_root_nodes(self, length: float) -> list[Node]: ln = length ext_east, ext_north, ext_depth = self.extent() east_nodes = np.arange(ext_east // ln) * ln + ln / 2 + self.east_bounds.min @@ -346,7 +346,7 @@ def reset(self) -> Self: """Reset the octree to its initial state""" logger.debug("resetting tree") self._clear_cache() - self._root_nodes = self._get_root_nodes(self.size_initial) + self._root_nodes = self.get_root_nodes(self.size_initial) return self def reduce_surface(self, accumulator: Callable = np.max) -> np.ndarray: diff --git a/src/qseek/search.py b/src/qseek/search.py index d6031376..cb4f6cad 100644 --- a/src/qseek/search.py +++ b/src/qseek/search.py @@ -384,6 +384,8 @@ async def prepare(self) -> None: phases=self.image_functions.get_phases(), rundir=self._rundir, ) + for magnitude in self.magnitudes: + await magnitude.prepare(self.octree, self.stations) self.init_boundaries() async def start(self, force_rundir: bool = False) -> None: