diff --git a/src/qseek/apps/qseek.py b/src/qseek/apps/qseek.py
index 9745d6e9..59056af1 100644
--- a/src/qseek/apps/qseek.py
+++ b/src/qseek/apps/qseek.py
@@ -224,14 +224,14 @@ async def extract() -> None:
         iterator = asyncio.as_completed(
             tuple(
                 search.add_magnitude_and_features(detection)
-                for detection in search._detections
+                for detection in search._catalog
             )
         )

         for result in track(
             iterator,
             description="Extracting features",
-            total=search._detections.n_detections,
+            total=search._catalog.n_events,
         ):
             event = await result
             if event.magnitudes:
@@ -239,8 +239,8 @@ async def extract() -> None:
                     print(f"{mag.magnitude} {mag.average:.2f}±{mag.error:.2f}")
                 print("--")

-        await search._detections.save()
-        await search._detections.export_detections(
+        await search._catalog.save()
+        await search._catalog.export_detections(
             jitter_location=search.octree.smallest_node_size()
         )

diff --git a/src/qseek/models/catalog.py b/src/qseek/models/catalog.py
index d35ee13a..f2803628 100644
--- a/src/qseek/models/catalog.py
+++ b/src/qseek/models/catalog.py
@@ -40,16 +40,16 @@ def _populate_table(self, table: Table) -> None:

 class EventCatalog(BaseModel):
     rundir: Path
-    detections: list[EventDetection] = []
+    events: list[EventDetection] = []
     _stats: EventCatalogStats = PrivateAttr(default_factory=EventCatalogStats)

     def model_post_init(self, __context: Any) -> None:
         EventDetection.set_rundir(self.rundir)

     @property
-    def n_detections(self) -> int:
-        """Number of detections"""
-        return len(self.detections)
+    def n_events(self) -> int:
+        """Number of events"""
+        return len(self.events)

     @property
     def markers_dir(self) -> Path:
@@ -64,18 +64,18 @@ def csv_dir(self) -> Path:
         return dir

     async def add(self, detection: EventDetection) -> None:
-        detection.set_index(self.n_detections)
+        detection.set_index(self.n_events)

         markers_file = self.markers_dir / f"{time_to_path(detection.time)}.list"
         self.markers_dir.mkdir(exist_ok=True)
         detection.export_pyrocko_markers(markers_file)

-        self.detections.append(detection)
+        self.events.append(detection)
         logger.info(
             "%s event detection %d %s: %.5f°, %.5f°, depth %.1f m, "
             "border distance %.1f m, semblance %.3f, magnitude %.2f",
             Symbols.Target,
-            self.n_detections,
+            self.n_events,
             detection.time,
             *detection.effective_lat_lon,
             detection.depth,
@@ -116,24 +116,24 @@ def load_rundir(cls, rundir: Path) -> EventCatalog:
             for idx, line in enumerate(f):
                 detection = EventDetection.model_validate_json(line)
                 detection.set_index(idx)
-                catalog.detections.append(detection)
+                catalog.events.append(detection)

-        logger.info("loaded %d detections", catalog.n_detections)
+        logger.info("loaded %d detections", catalog.n_events)

         stats = catalog._stats
-        stats.n_detections = catalog.n_detections
+        stats.n_detections = catalog.n_events
         if catalog:
             stats.max_semblance = max(detection.semblance for detection in catalog)
         return catalog

     async def save(self) -> None:
         """Save catalog to current rundir."""
-        logger.debug("saving %d detections", self.n_detections)
+        logger.debug("saving %d detections", self.n_events)

         lines_events = []
         lines_recv = []
-        # Has to be the unsorted
-        for detection in self.detections:
+        # Keep the unsorted order
+        for detection in self.events:
             lines_events.append(f"{detection.model_dump_json(exclude={'receivers'})}\n")
             lines_recv.append(f"{detection.receivers.model_dump_json()}\n")
@@ -179,7 +179,7 @@ async def export_csv(self, file: Path, jitter_location: float = 0.0) -> None:
         if jitter_location:
             detections = [det.jitter_location(jitter_location) for det in self]
         else:
-            detections = self.detections
+            detections = self.events

         csv_dicts: list[dict] = []
         for detection in detections:
@@ -207,7 +207,7 @@ def export_pyrocko_events(
             filename (Path): output filename
         """
         logger.info("saving Pyrocko events to %s", filename)
-        detections = self.detections
+        detections = self.events
         if jitter_location:
             detections = [det.jitter_location(jitter_location) for det in detections]
         dump_events(
@@ -229,4 +229,4 @@ def export_pyrocko_markers(self, filename: Path) -> None:
         marker.save_markers(pyrocko_markers, str(filename))

     def __iter__(self) -> Iterator[EventDetection]:
-        return iter(sorted(self.detections, key=lambda d: d.time))
+        return iter(sorted(self.events, key=lambda d: d.time))
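The rename above is mechanical but changes the public surface of EventCatalog. A minimal usage sketch of the renamed API (assuming an existing rundir written by a previous qseek run; the path is a placeholder):

    from pathlib import Path

    from qseek.models.catalog import EventCatalog

    catalog = EventCatalog.load_rundir(Path("my-rundir"))
    print(catalog.n_events)    # was catalog.n_detections
    first = catalog.events[0]  # was catalog.detections[0]
    for event in catalog:      # iteration is still sorted by event time
        print(event.time, event.semblance)
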
diff --git a/src/qseek/models/semblance.py b/src/qseek/models/semblance.py
index d91c2633..1fba5c83 100644
--- a/src/qseek/models/semblance.py
+++ b/src/qseek/models/semblance.py
@@ -93,10 +93,12 @@ def __init__(
         start_time: datetime,
         sampling_rate: float,
         padding_samples: int = 0,
+        exponent: float = 1.0,
     ) -> None:
         self.sampling_rate = sampling_rate
         self.padding_samples = padding_samples
         self.n_samples_unpadded = n_samples
+        self.exponent = exponent

         self._start_time = start_time
         self._node_hashes = [node.hash() for node in nodes]
@@ -149,6 +151,8 @@ def maximum_semblance(self) -> np.ndarray:
         """
         if self._max_semblance is None:
             self._max_semblance = self.semblance.max(axis=0)
+            if self.exponent != 1.0:
+                self._max_semblance **= self.exponent
         return self._max_semblance

     @property
@@ -164,10 +168,46 @@ def get_cache(self) -> dict[bytes, np.ndarray]:
             for i, node_hash in enumerate(self._node_hashes)
         }

+    def get_semblance(self, time_idx: int) -> np.ndarray:
+        """
+        Get the semblance values at a specific time index.
+
+        Args:
+            time_idx (int): The index of the desired time.
+
+        Returns:
+            np.ndarray: The semblance values at the specified time index.
+        """
+        semblance = self.semblance[:, time_idx]
+        if self.exponent != 1.0:
+            semblance **= self.exponent
+        return semblance
+
     def get_cache_mask(self, cache: dict[bytes, np.ndarray]) -> np.ndarray:
+        """
+        Returns a boolean mask indicating whether each node hash
+        in self._node_hashes is present in the cache.
+
+        Args:
+            cache (dict[bytes, np.ndarray]): The cache dictionary containing node
+                hashes as keys.
+
+        Returns:
+            np.ndarray: A boolean mask indicating whether each node hash is
+                present in the cache.
+        """
         return np.array([hash in cache for hash in self._node_hashes])

-    def apply_cache(self, cache: dict[bytes, np.ndarray]):
+    def apply_cache(self, cache: dict[bytes, np.ndarray]) -> None:
+        """
+        Applies the cached data to the `semblance_unpadded` array.
+
+        Args:
+            cache (dict[bytes, np.ndarray]): The cache containing the cached data.
+
+        Returns:
+            None
+        """
         if not cache:
             return
         mask = self.get_cache_mask(cache)
@@ -176,7 +216,10 @@ def apply_cache(self, cache: dict[bytes, np.ndarray]):
         self.semblance_unpadded[mask, :] = np.stack(data)

     def maximum_node_semblance(self) -> np.ndarray:
-        return self.semblance.max(axis=1)
+        semblance = self.semblance.max(axis=1)
+        if self.exponent != 1.0:
+            semblance **= self.exponent
+        return semblance

     async def maxima_node_idx(self, nparallel: int = 6) -> np.ndarray:
         """Indices of maximum semblance at any time step.
@@ -219,7 +262,7 @@ def apply_exponent(self, exponent: float) -> None:
         """
         if exponent == 1.0:
             return
-        self.semblance_unpadded **= exponent
+        np.power(self.semblance_unpadded, exponent, out=self.semblance_unpadded)
         self._clear_cache()

     def median_mask(self, level: float = 3.0) -> np.ndarray:
@@ -256,9 +299,16 @@ async def find_peaks(
         Returns:
             tuple[np.ndarray, np.ndarray]: Indices of peaks and peak values.
         """
+        max_semblance_unpadded = await asyncio.to_thread(
+            self.semblance_unpadded.max,
+            axis=0,
+        )
+        if self.exponent != 1.0:
+            max_semblance_unpadded **= self.exponent
+
         detection_idx, _ = await asyncio.to_thread(
             signal.find_peaks,
-            self.semblance_unpadded.max(axis=0),
+            max_semblance_unpadded,
             height=height,
             prominence=prominence,
             distance=distance,
@@ -269,7 +319,10 @@ async def find_peaks(
             detection_idx = detection_idx[detection_idx < self.maximum_semblance.size]
             semblance = self.maximum_semblance[detection_idx]
         else:
-            maximum_semblance = self.semblance_unpadded.max(axis=0)
+            maximum_semblance = await asyncio.to_thread(
+                self.semblance_unpadded.max,
+                axis=0,
+            )
             semblance = maximum_semblance[detection_idx]

         return detection_idx, semblance
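Storing the exponent on Semblance and applying it lazily in maximum_semblance, get_semblance, maximum_node_semblance and find_peaks avoids exponentiating the full node-by-sample array up front. Peak locations are unaffected by when the exponent is applied, since x ** e is strictly increasing for x >= 0; only peak heights change, which is why find_peaks exponentiates the collapsed 1-D trace before thresholding. A standalone check of that property (toy data, not qseek code):

    import numpy as np
    from scipy import signal

    rng = np.random.default_rng(42)
    stack = rng.random((16, 1000))   # nodes x samples, like semblance_unpadded
    trace = stack.max(axis=0)        # collapse to 1-D before exponentiating

    exponent = 1.0 / 2.0             # e.g. image_mean_p = 2
    peaks_a, _ = signal.find_peaks(trace**exponent, height=0.97)
    peaks_b, _ = signal.find_peaks(trace, height=0.97 ** (1 / exponent))
    assert np.array_equal(peaks_a, peaks_b)  # same indices, different heights

On the apply_exponent change: for a plain ndarray attribute, `arr **= e` and `np.power(arr, e, out=arr)` are equivalent, but the np.power spelling also works if semblance_unpadded is ever a property, since augmented assignment on an attribute implies a re-assignment of that attribute.
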
diff --git a/src/qseek/models/station.py b/src/qseek/models/station.py
index e4ffc10a..25712509 100644
--- a/src/qseek/models/station.py
+++ b/src/qseek/models/station.py
@@ -43,7 +43,21 @@ def from_pyrocko_station(cls, station: PyrockoStation) -> Station:
         )

     def as_pyrocko_station(self) -> PyrockoStation:
-        return PyrockoStation(**self.model_dump(exclude={"effective_lat_lon"}))
+        return PyrockoStation(
+            **self.model_dump(
+                include={
+                    "network",
+                    "station",
+                    "location",
+                    "lat",
+                    "lon",
+                    "north_shift",
+                    "east_shift",
+                    "depth",
+                    "elevation",
+                }
+            )
+        )

     @property
     def nsl(self) -> NSL:
diff --git a/src/qseek/octree.py b/src/qseek/octree.py
index 6dc21c0a..74162f01 100644
--- a/src/qseek/octree.py
+++ b/src/qseek/octree.py
@@ -363,8 +363,8 @@ def set_level(self, level: int):
             raise ValueError(
                 f"invalid level {level}, expected level <= {self.n_levels()}"
             )
-        logger.debug("setting tree to level %d", level)
         self.reset()
+        logger.debug("setting tree to level %d", level)
         for _ in range(level):
             for node in self:
                 node.split()
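Switching as_pyrocko_station from an exclude set to an explicit include set is the defensive choice: fields added to Station later, declared or computed, can no longer leak into the PyrockoStation constructor. A toy illustration of the difference (generic pydantic v2, not qseek's Station model):

    from pydantic import BaseModel, computed_field

    class Location(BaseModel):
        lat: float
        lon: float

        @computed_field  # serialized by model_dump() unless filtered out
        @property
        def effective_lat_lon(self) -> tuple[float, float]:
            return (self.lat, self.lon)

    loc = Location(lat=52.5, lon=13.4)
    print(loc.model_dump(include={"lat", "lon"}))  # {'lat': 52.5, 'lon': 13.4}
    print(loc.model_dump())  # also carries effective_lat_lon
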
diff --git a/src/qseek/search.py b/src/qseek/search.py
index 434616d6..aef9a174 100644
--- a/src/qseek/search.py
+++ b/src/qseek/search.py
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import asyncio
+import cProfile
 import logging
 from collections import deque
 from datetime import datetime, timedelta, timezone
@@ -53,6 +54,7 @@
 logger = logging.getLogger(__name__)

 SamplingRate = Literal[10, 20, 25, 50, 100]
+p = cProfile.Profile()


 class SearchStats(Stats):
@@ -269,7 +271,7 @@ class Search(BaseModel):
     ] = PrivateAttr({})
     _last_detection_export: int = 0

-    _detections: EventCatalog = PrivateAttr()
+    _catalog: EventCatalog = PrivateAttr()
     _config_stem: str = PrivateAttr("")
     _rundir: Path = PrivateAttr()

@@ -304,7 +306,11 @@ def init_rundir(self, force: bool = False) -> None:

         self._init_logging()
         logger.info("created new rundir %s", rundir)
-        self._detections = EventCatalog(rundir=rundir)
+        self._catalog = EventCatalog(rundir=rundir)
+
+    @property
+    def catalog(self) -> EventCatalog:
+        return self._catalog

     def _init_logging(self) -> None:
         file_logger = logging.FileHandler(self._rundir / "qseek.log")
@@ -329,7 +335,7 @@ def set_progress(self, time: datetime) -> None:
         progress_file = self._rundir / "progress.json"
         progress_file.write_text(self._progress.model_dump_json())

-    def init_boundaries(self) -> None:
+    async def init_boundaries(self) -> None:
         """Initialise search."""
         # Grid/receiver distances
         distances = self.octree.distances_stations(self.stations)
@@ -339,7 +345,11 @@ def init_boundaries(self) -> None:
         for phase, tracer in self.ray_tracers.iter_phase_tracer(
             phases=self.image_functions.get_phases()
         ):
-            traveltimes = tracer.get_travel_times(phase, self.octree, self.stations)
+            traveltimes = await tracer.get_travel_times(
+                phase,
+                self.octree,
+                self.stations,
+            )
             self._travel_time_ranges[phase] = (
                 timedelta(seconds=np.nanmin(traveltimes)),
                 timedelta(seconds=np.nanmax(traveltimes)),
@@ -410,7 +420,7 @@ async def prepare(self) -> None:
         )
         for magnitude in self.magnitudes:
             await magnitude.prepare(self.octree, self.stations)
-        self.init_boundaries()
+        await self.init_boundaries()

     async def start(self, force_rundir: bool = False) -> None:
         if not self.has_rundir():
@@ -435,6 +445,7 @@ async def start(self, force_rundir: bool = False) -> None:
         )

         console = asyncio.create_task(RuntimeStats.live_view())
+        p.enable()

         async for images, batch in self.image_functions.iter_images(waveform_iterator):
             batch_processing_start = datetime_now()
@@ -449,7 +460,7 @@ async def start(self, force_rundir: bool = False) -> None:

             detections, semblance_trace = await search_block.search()

-            self._detections.save_semblance_trace(semblance_trace)
+            self._catalog.save_semblance_trace(semblance_trace)
             if detections:
                 BackgroundTasks.create_task(self.new_detections(detections))
@@ -460,12 +471,14 @@ async def start(self, force_rundir: bool = False) -> None:
             )
             self.set_progress(batch.end_time)

+            p.dump_stats("qseek.prof")
+        p.disable()

-        await BackgroundTasks.wait_all()
+        # await BackgroundTasks.wait_all()
         console.cancel()
-        await self._detections.export_detections(jitter_location=self.octree.size_limit)
+        await self._catalog.export_detections(jitter_location=self.octree.size_limit)
         logger.info("finished search in %s", datetime_now() - processing_start)
-        logger.info("found %d detections", self._detections.n_detections)
+        logger.info("found %d detections", self._catalog.n_events)

     async def new_detections(self, detections: list[EventDetection]) -> None:
         """
@@ -479,17 +492,17 @@ async def new_detections(self, detections: list[EventDetection]) -> None:
         )

         for detection in detections:
-            await self._detections.add(detection)
+            await self._catalog.add(detection)
             await self._new_detection.emit(detection)

         if (
-            self._detections.n_detections
-            and self._detections.n_detections - self._last_detection_export > 100
+            self._catalog.n_events
+            and self._catalog.n_events - self._last_detection_export > 100
         ):
-            await self._detections.export_detections(
+            await self._catalog.export_detections(
                 jitter_location=self.octree.smallest_node_size()
             )
-            self._last_detection_export = self._detections.n_detections
+            self._last_detection_export = self._catalog.n_events

     async def add_magnitude_and_features(self, event: EventDetection) -> EventDetection:
         """
@@ -521,7 +534,7 @@ def load_rundir(cls, rundir: Path) -> Self:
         search_file = rundir / "search.json"
         search = cls.model_validate_json(search_file.read_bytes())
         search._rundir = rundir
-        search._detections = EventCatalog.load_rundir(rundir)
+        search._catalog = EventCatalog.load_rundir(rundir)

         progress_file = rundir / "progress.json"
         if progress_file.exists():
@@ -600,7 +613,11 @@ async def calculate_semblance(
         logger.debug("stacking image %s", image.image_function.name)
         parent = self.parent

-        traveltimes = ray_tracer.get_travel_times(image.phase, octree, image.stations)
+        traveltimes = await ray_tracer.get_travel_times(
+            image.phase,
+            octree,
+            image.stations,
+        )

         if parent.station_corrections:
             station_delays = await parent.station_corrections.get_delays(
@@ -686,6 +703,7 @@ async def search(
             start_time=self.start_time,
             sampling_rate=sampling_rate,
             padding_samples=padding_samples,
+            exponent=1.0 / parent.image_mean_p,
         )

         for image in images:
@@ -699,13 +717,12 @@ async def search(

         # Applying the generalized mean to the semblance
         semblance.normalize(images.cumulative_weight())
-        semblance.apply_exponent(1.0 / parent.image_mean_p)

         semblance.apply_cache(semblance_cache or {})  # Apply after normalization

         detection_idx, detection_semblance = await semblance.find_peaks(
-            height=parent.detection_threshold**parent.image_mean_p,
-            prominence=parent.detection_threshold**parent.image_mean_p,
+            height=parent.detection_threshold,
+            prominence=parent.detection_threshold,
             distance=round(parent.detection_blinding.total_seconds() * sampling_rate),
         )

@@ -719,7 +736,7 @@ async def search(
         for time_idx, semblance_detection in zip(
             detection_idx, detection_semblance, strict=True
         ):
-            octree.map_semblance(semblance.semblance[:, time_idx])
+            octree.map_semblance(semblance.get_semblance(time_idx))

             node_idx = maxima_node_idx[time_idx]
             source_node = octree[node_idx]
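Because Semblance now raises the collapsed trace to 1/image_mean_p itself before peak picking, find_peaks receives values in the same domain as the configured detection_threshold, so the **parent.image_mean_p on height and prominence is dropped. The monotonic equivalence this relies on (toy values, not qseek code):

    # For s, t >= 0 and p > 0:  s ** (1 / p) >= t  <=>  s >= t ** p
    p, t = 4.0, 0.8
    for s in (0.1, 0.3, 0.6, 0.9):
        assert (s ** (1 / p) >= t) == (s >= t ** p)
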
diff --git a/src/qseek/server.py b/src/qseek/server.py
index 31f5d350..36475e55 100644
--- a/src/qseek/server.py
+++ b/src/qseek/server.py
@@ -120,14 +120,14 @@ async def get_index(self, request: web.Request) -> web.Response:

     async def get_detections(self, request: web.Request) -> web.Response:
         return pydantic_response(
-            self.search._detections,
-            exclude={"detections": {"__all__": {"octree", "stations"}}},
+            self.search._catalog,
+            exclude={"events": {"__all__": {"octree", "stations"}}},
         )

     async def get_detection(self, request: web.Request) -> web.Response:
         req = await parse_pydantic_request(request, model=DetectionRequest)
         try:
-            detection = self.search._detections.get(req.uid)
+            detection = self.search._catalog.get(req.uid)
         except KeyError:
             return web.HTTPNotFound()
         return pydantic_response(detection)
diff --git a/src/qseek/test.py b/src/qseek/test.py
index ae8ee712..9be3045f 100644
--- a/src/qseek/test.py
+++ b/src/qseek/test.py
@@ -2,6 +2,6 @@
 from qseek.models.catalog import EventCatalog

 detections = EventCatalog(rundir="test-qseek/")
-detection = detections.detections[0]
+detection = detections.events[0]

 plot.plot_detection(detection)
diff --git a/src/qseek/tracers/base.py b/src/qseek/tracers/base.py
index 40047ecb..36f67d9c 100644
--- a/src/qseek/tracers/base.py
+++ b/src/qseek/tracers/base.py
@@ -61,7 +61,7 @@ def get_travel_times_locations(
             [self.get_travel_time_location(phase, source, recv) for recv in receivers]
         )

-    def get_travel_times(
+    async def get_travel_times(
         self,
         phase: str,
         octree: Octree,
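With RayTracer.get_travel_times now a coroutine, every call site has to await it, as the search.py hunks above and the cake.py hunks below do. A hedged sketch of calling the new API from user code; the class name follows the constant_velocity module, while the field name, phase string, and the octree/stations setup are assumptions:

    import asyncio

    from qseek.tracers.constant_velocity import ConstantVelocityTracer

    async def traveltime_demo(octree, stations) -> None:
        tracer = ConstantVelocityTracer(velocity=5000.0)  # m/s, assumed field name
        # One row per octree node, one column per station:
        traveltimes = await tracer.get_travel_times("constant:P", octree, stations)
        print(traveltimes.shape)

    # asyncio.run(traveltime_demo(octree, stations))
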
diff --git a/src/qseek/tracers/cake.py b/src/qseek/tracers/cake.py
index 900adef4..b631536c 100644
--- a/src/qseek/tracers/cake.py
+++ b/src/qseek/tracers/cake.py
@@ -363,7 +363,7 @@ def _interpolate_traveltimes_sptree(
             coordinates,
         )

-    def init_lut(self, octree: Octree, stations: Stations) -> None:
+    async def init_lut(self, octree: Octree, stations: Stations) -> None:
         logger.debug(
             "initializing LUT for %d stations and %d nodes",
             stations.n_stations,
@@ -373,7 +373,7 @@ def init_lut(self, octree: Octree, stations: Stations) -> None:
         self._cached_station_indeces = {
             sta.nsl.pretty: idx for idx, sta in enumerate(stations)
         }
-        station_traveltimes = self.interpolate_travel_times(octree, stations)
+        station_traveltimes = await self.interpolate_travel_times(octree, stations)

         for node, traveltimes in zip(octree, station_traveltimes, strict=True):
             self._node_lut[node.hash()] = traveltimes.astype(np.float32)
@@ -382,11 +382,11 @@ def lut_fill_level(self) -> float:
         """Return the fill level of the LUT as a float between 0.0 and 1.0"""
         return len(self._node_lut) / self._node_lut.get_size()

-    def fill_lut(self, nodes: Sequence[Node]) -> None:
+    async def fill_lut(self, nodes: Sequence[Node]) -> None:
         logger.debug("filling traveltimes LUT for %d nodes", len(nodes))
         stations = self._cached_stations

-        traveltimes = self._interpolate_travel_times(
+        traveltimes = await self._interpolate_travel_times(
             surface_distances(nodes, stations),
             np.array([sta.effective_depth for sta in stations]),
             np.array([node.as_location().effective_depth for node in nodes]),
@@ -395,7 +395,7 @@ def fill_lut(self, nodes: Sequence[Node]) -> None:
         for node, times in zip(nodes, traveltimes, strict=True):
             self._node_lut[node.hash()] = times.astype(np.float32)

-    def get_travel_times(self, octree: Octree, stations: Stations) -> np.ndarray:
+    async def get_travel_times(self, octree: Octree, stations: Stations) -> np.ndarray:
         try:
             station_indices = np.fromiter(
                 (self._cached_station_indeces[sta.nsl.pretty] for sta in stations),
@@ -418,7 +418,7 @@ def get_travel_times(self, octree: Octree, stations: Stations) -> np.ndarray:
             stations_travel_times.append(node_travel_times)

         if fill_nodes:
-            self.fill_lut(fill_nodes)
+            await self.fill_lut(fill_nodes)

             cache_hits, cache_misses = self._node_lut.get_stats()
             total_hits = cache_hits + cache_misses
@@ -428,11 +428,11 @@ def get_travel_times(self, octree: Octree, stations: Stations) -> np.ndarray:
                 self.lut_fill_level() * 100,
                 cache_hit_rate * 100,
             )
-            return self.get_travel_times(octree, stations)
+            return await self.get_travel_times(octree, stations)

         return np.asarray(stations_travel_times).astype(float, copy=False)

-    def interpolate_travel_times(
+    async def interpolate_travel_times(
         self,
         octree: Octree,
         stations: Stations,
@@ -443,11 +443,11 @@ def interpolate_travel_times(
             [node.as_location().effective_depth for node in octree]
         )

-        return self._interpolate_travel_times(
+        return await self._interpolate_travel_times(
             receiver_distances, receiver_depths, source_depths
         )

-    def _interpolate_travel_times(
+    async def _interpolate_travel_times(
         self,
         receiver_distances: np.ndarray,
         receiver_depths: np.ndarray,
@@ -593,7 +593,7 @@ async def prepare(
             tree = TravelTimeTree.new(timing=timing, **traveltime_tree_args)
             tree.save(self.cache_dir)

-        tree.init_lut(octree, stations)
+        await tree.init_lut(octree, stations)

         self._travel_time_trees[phase_descr] = tree

     def _get_sptree_model(self, phase: str) -> TravelTimeTree:
diff --git a/src/qseek/tracers/constant_velocity.py b/src/qseek/tracers/constant_velocity.py
index 9b2d3dd1..86b1fe5d 100644
--- a/src/qseek/tracers/constant_velocity.py
+++ b/src/qseek/tracers/constant_velocity.py
@@ -6,7 +6,7 @@
 from pydantic import Field, PositiveFloat

 from qseek.tracers.base import ModelledArrival, RayTracer
-from qseek.utils import PhaseDescription, log_call
+from qseek.utils import PhaseDescription

 if TYPE_CHECKING:
     import numpy as np
@@ -43,8 +43,7 @@ def get_travel_time_location(
         self._check_phase(phase)
         return source.distance_to(receiver) / self.velocity

-    @log_call
-    def get_travel_times(
+    async def get_travel_times(
         self,
         phase: str,
         octree: Octree,
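The cake.py changes above keep the node LUT's fill-then-retry shape, just with awaits threaded through. Stripped of qseek specifics, the pattern looks like this (toy code; compute_batch stands in for _interpolate_travel_times):

    # Batch-compute every miss once, then retry; the second pass hits the LUT.
    async def lookup_all(lut, keys, compute_batch):
        misses = [key for key in keys if key not in lut]
        if misses:
            for key, value in zip(misses, await compute_batch(misses)):
                lut[key] = value
            return await lookup_all(lut, keys, compute_batch)
        return [lut[key] for key in keys]
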
diff --git a/test/conftest.py b/test/conftest.py
index 37fb0760..02514da8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -161,4 +161,4 @@ def detections() -> Generator[EventCatalog, None, None]:
         )
         detections.append(detection)
     with TemporaryDirectory() as tmpdir:
-        yield EventCatalog(rundir=Path(tmpdir), detections=detections)
+        yield EventCatalog(rundir=Path(tmpdir), events=detections)