Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Isken committed Jan 6, 2024
1 parent 432bbc6 commit 65b8aef
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 27 deletions.
53 changes: 38 additions & 15 deletions src/qseek/magnitudes/moment_magnitude.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from pyrocko.trace import Trace

from qseek.models.detection import EventDetection, Receiver
from qseek.models.station import Stations
from qseek.octree import Octree


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -158,7 +160,7 @@ def get_magnitude(

try:
modelled_amplitude = await store.get_amplitude(
source_depth=event.depth,
source_depth=event.effective_depth,
distance=station_amplitudes.distance_epi,
n_amplitudes=25,
peak_amplitude=peak_amplitude,
Expand Down Expand Up @@ -211,7 +213,7 @@ class MomentMagnitudeExtractor(EventMagnitudeCalculator):

seconds_before: PositiveFloat = 10.0
seconds_after: PositiveFloat = 10.0
padding_seconds: PositiveFloat = 10.0
padding_seconds: PositiveFloat = 5.0

gf_store_dirs: list[DirectoryPath] = [Path(".")]

Expand All @@ -231,6 +233,17 @@ def model_post_init(self, __context: Any) -> None:
logger.info("Loading peak amplitude stores...")
self._stores = [cache.get_store(definition) for definition in self.models]

async def prepare(self, octree: Octree, stations: Stations) -> None:
logger.info("Preparing peak amplitude stores...")
root_nodes = octree.get_root_nodes(octree.size_initial)
depths = np.array([node.as_location().effective_depth for node in root_nodes])

for store in self._stores:
for depth in depths:
if store.has_depth(depth):
continue
await store.fill_source_depth(depth)

async def add_magnitude(
self,
squirrel: Squirrel,
Expand All @@ -241,27 +254,36 @@ async def add_magnitude(
receivers = list(event.receivers)
for store, definition in zip(self._stores, self.models, strict=True):
store_receivers = definition.filter_receivers(receivers)
if not store_receivers:
continue
for rcv in store_receivers:
receivers.remove(rcv)

if not store_receivers:
continue

if definition.max_distance is not None:
store_receivers = [
rcv
for rcv in store_receivers
if rcv.surface_distance_to(event) <= definition.max_distance
]

traces = await event.receivers.get_waveforms_restituted(
squirrel,
receivers=store_receivers,
quantity=store.quantity,
seconds_before=self.seconds_before,
seconds_after=self.seconds_after,
demean=True,
cut_off_fade=True,
seconds_fade=self.padding_seconds,
cut_off_fade=True,
)
if not traces:
logger.warning("No restituted traces found for event %s", event.time)
return

for tr in traces:
tr.highpass(4, store.frequency_range.min)
tr.lowpass(4, store.frequency_range.max)
if store.frequency_range.min:
tr.highpass(4, store.frequency_range.min, demean=True)
tr.lowpass(4, store.frequency_range.max, demean=True)

grouped_traces = []
receivers = []
Expand All @@ -270,13 +292,14 @@ async def add_magnitude(
):
grouped_traces.append(list(grp_traces))
receivers.append(event.receivers.get_receiver(nsl))
await moment_magnitude.add_traces(
store=store,
event=event,
receivers=receivers,
traces=grouped_traces,
peak_amplitude=definition.peak_amplitude,
)

await moment_magnitude.add_traces(
store=store,
event=event,
receivers=receivers,
traces=grouped_traces,
peak_amplitude=definition.peak_amplitude,
)

if not moment_magnitude.average:
logger.warning("No moment magnitude found for event %s", event.time)
Expand Down
72 changes: 63 additions & 9 deletions src/qseek/magnitudes/moment_magnitude_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,19 @@ def from_selector(cls, selector: PeakAmplitudesBase) -> Self:
def source_depth_range(self) -> Range:
return Range.from_list([sa.source_depth for sa in self.site_amplitudes])

def has_depth(self, depth: float) -> bool:
"""
Check if the moment magnitude store has a given depth.
Args:
depth (float): The depth to check.
Returns:
bool: True if the store has the given depth, False otherwise.
"""
depths = [sa.source_depth for sa in self.site_amplitudes]
return depth in depths

def get_store(self) -> gf.Store:
"""
Load the GF store for the given store ID.
Expand All @@ -489,12 +502,10 @@ def get_store(self) -> gf.Store:
config = store.config
if not isinstance(config, gf.ConfigTypeA):
raise EnvironmentError("GF store is not of type ConfigTypeA.")

if 1.0 / config.deltat < self.frequency_range.max:
raise ValueError(
f"Pyrocko GF store frequency {1.0 / config.deltat} too low."
)

return store

def is_suited(self, selector: PeakAmplitudesBase) -> bool:
Expand All @@ -513,13 +524,17 @@ def is_suited(self, selector: PeakAmplitudesBase) -> bool:
and self.gf_interpolation == selector.gf_interpolation
and self.quantity == selector.quantity
and self.reference_magnitude == selector.reference_magnitude
and self.rupture_velocities.min <= selector.rupture_velocities.min
and self.rupture_velocities.max >= selector.rupture_velocities.max
and self.stress_drop.min <= selector.stress_drop.min
and self.stress_drop.max >= selector.stress_drop.max
and self.rupture_velocities.min == selector.rupture_velocities.min
and self.rupture_velocities.max == selector.rupture_velocities.max
and self.stress_drop.min == selector.stress_drop.min
and self.stress_drop.max == selector.stress_drop.max
)
if selector.frequency_range:
result = result and self.frequency_range.max <= selector.frequency_range.max
result = (
result
and self.frequency_range.min == selector.frequency_range.min
and self.frequency_range.max == selector.frequency_range.max
)
return result

def _get_random_source(self, depth: float) -> gf.MTSource:
Expand Down Expand Up @@ -822,25 +837,64 @@ def save(self, path: Path | None = None) -> None:
file.write_text(self.model_dump_json())


class CacheStats(NamedTuple):
    """Aggregate statistics for a peak amplitude store cache directory."""
    path: Path  # the cache directory that was scanned
    n_stores: int  # number of cached store files (counted as *.json files)
    bytes: int  # total size of the cached store files in bytes
    # NOTE(review): the field name `bytes` shadows the builtin of the same
    # name inside this tuple's scope; kept for interface compatibility.


class PeakAmplitudeStoreCache:
cache_dir: Path
engine: gf.LocalEngine

def __init__(self, cache_dir: Path, engine: gf.LocalEngine | None = None) -> None:
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
logger.info("cache stats: %s", self.cache_stats())
self.clean_cache()

self.engine = engine or gf.LocalEngine(store_superdirs=["."])
PeakAmplitudesStore.set_engine(engine)

def clear_cache(self):
"""
Clear the cache directory.
This method deletes all files in the cache directory.
"""
cache_dir = self.cache_dir / "site_amplitudes"
for file in cache_dir.glob("*"):
logger.info("clearing cache directory %s", self.cache_dir)
for file in self.cache_dir.glob("*"):
file.unlink()

def clean_cache(self, keep_files: int = 100) -> None:
"""
Clean the cache directory.
Args:
keep_files (int, optional): The number of most recent files to keep in the
cache directory. Defaults to 100.
"""
files = sorted(self.cache_dir.glob("*"), key=lambda f: f.stat().st_mtime)
if len(files) <= keep_files:
return
logger.info("cleaning cache directory %s", self.cache_dir)
for file in files[keep_files:]:
file.unlink()

def cache_stats(self) -> CacheStats:
"""
Get the cache statistics.
Returns:
CacheStats: The cache statistics.
"""
n_stores = 0
nbytes = 0
for file in self.cache_dir.glob("*.json"):
n_stores += 1
nbytes += file.stat().st_size
return CacheStats(path=self.cache_dir, n_stores=n_stores, bytes=nbytes)

def get_cached_stores(
self, store_id: str, quantity: MeasurementUnit
) -> list[PeakAmplitudesStore]:
Expand Down
6 changes: 3 additions & 3 deletions src/qseek/octree.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def check_limits(self) -> Octree:

def model_post_init(self, __context: Any) -> None:
"""Initialize octree. This method is called by the pydantic model"""
self._root_nodes = self._get_root_nodes(self.size_initial)
self._root_nodes = self.get_root_nodes(self.size_initial)
logger.info(
"initializing octree volume with %d nodes and %.1f km³,"
" smallest node size: %.1f m",
Expand All @@ -303,7 +303,7 @@ def extent(self) -> tuple[float, float, float]:
self.depth_bounds.max - self.depth_bounds.min,
)

def _get_root_nodes(self, length: float) -> list[Node]:
def get_root_nodes(self, length: float) -> list[Node]:
ln = length
ext_east, ext_north, ext_depth = self.extent()
east_nodes = np.arange(ext_east // ln) * ln + ln / 2 + self.east_bounds.min
Expand Down Expand Up @@ -346,7 +346,7 @@ def reset(self) -> Self:
"""Reset the octree to its initial state"""
logger.debug("resetting tree")
self._clear_cache()
self._root_nodes = self._get_root_nodes(self.size_initial)
self._root_nodes = self.get_root_nodes(self.size_initial)
return self

def reduce_surface(self, accumulator: Callable = np.max) -> np.ndarray:
Expand Down
2 changes: 2 additions & 0 deletions src/qseek/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ async def prepare(self) -> None:
phases=self.image_functions.get_phases(),
rundir=self._rundir,
)
for magnitude in self.magnitudes:
await magnitude.prepare(self.octree, self.stations)
self.init_boundaries()

async def start(self, force_rundir: bool = False) -> None:
Expand Down

0 comments on commit 65b8aef

Please sign in to comment.