Skip to content

Commit

Permalink
Merge pull request #9882 from gem/savemem2
Browse files Browse the repository at this point in the history
Reduced by G times the memory in event_based calculations
  • Loading branch information
micheles authored Aug 1, 2024
2 parents 6fb5dcd + 30b9aff commit 5743bb2
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 159 deletions.
9 changes: 6 additions & 3 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[Michele Simionato]
* Optimized the calculation of mean and stdevs in event based calculations, with
a speedup of 13x for the EUR model

[Chris di Caprio]
* Allowed extrapolation in the Kuehn (2020) GMPEs to solve numeric issues

Expand All @@ -7,10 +11,9 @@
"object has no attribute msparams"
* Fixed a critical memory bug causing over 80 GB per core to be needed for
event based calculations with ~6 million sites
* Made `minimum_magnitude` and `minimum_intensity` mandatory in event based
calculations
* Made `minimum_intensity` mandatory in event based calculations
* Reduced the memory consumption in event_based calculations: now even calculations
with 5 million sites can be run using ~4 GB per core
with 5 million sites can be run with ~2 GB per core
* Reduced the memory occupation in `gen_poes`
* Using half the memory in postclassical by using 32 bit arrays
* Using half the memory on Windows by using half the threads by default
Expand Down
8 changes: 3 additions & 5 deletions openquake/calculators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,11 +914,9 @@ def _read_risk1(self):
if self.sitecol and oq.imtls:
logging.info('Read N=%d hazard sites and L=%d hazard levels',
len(self.sitecol), oq.imtls.size)
if oq.calculation_mode.startswith(('event_based', 'ebrisk')) and self.N > 1000:
if oq.minimum_magnitude == {'default': 0}:
oq.raise_invalid(f'minimum_magnitude must be set, see {EBDOC}')
if len(oq.min_iml) == 0:
oq.raise_invalid(f'minimum_intensity must be set, see {EBDOC}')
if (oq.calculation_mode.startswith(('event_based', 'ebrisk')) and
self.N > 1000 and len(oq.min_iml) == 0):
oq.raise_invalid(f'minimum_intensity must be set, see {EBDOC}')

if oq_hazard:
parent = self.datastore.parent
Expand Down
11 changes: 2 additions & 9 deletions openquake/calculators/event_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,9 @@ def _event_based(proxies, cmaker, stations, srcfilter, shr,
if stations and stations[0] is not None: # conditioned GMFs
assert cmaker.scenario
with shr['mea'] as mea, shr['tau'] as tau, shr['phi'] as phi:
df = computer.compute_all([mea, tau, phi], cmon, umon)
df = computer.compute_all([mea, tau, phi], max_iml, mmon, cmon, umon)
else: # regular GMFs
with mmon:
# shape (4, G, M, N) for 1M sites, 10 IMTs and 10 GSIMs
# gives 3 GB of RAM
mean_stds = cmaker.get_mean_stds(
[computer.ctx], split_by_mag=False)
# avoid numba type error
computer.ctx.flags.writeable = True
df = computer.compute_all(mean_stds, max_iml, cmon, umon)
df = computer.compute_all(None, max_iml, mmon, cmon, umon)
sig_eps.append(computer.build_sig_eps(se_dt))
dt = time.time() - t0
times.append((proxy['id'], computer.ctx.rrup.min(), dt))
Expand Down
61 changes: 1 addition & 60 deletions openquake/hazardlib/calc/conditioned_gmfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,9 @@
from dataclasses import dataclass

import numpy
from openquake.baselib.general import AccumDict
from openquake.baselib.performance import Monitor
from openquake.hazardlib import correlation, cross_correlation
from openquake.hazardlib.imt import from_string
from openquake.hazardlib.calc.gmf import GmfComputer, exp
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.geo.geodetic import geodetic_distance
from openquake.hazardlib.contexts import ContextMaker
Expand Down Expand Up @@ -201,7 +199,6 @@ def __init__(
self.cross_correl_between = (
cross_correl_between or cross_correlation.GodaAtkinson2009())
self.cross_correl_within = cross_correlation.BakerJayaram2008()
self.correlation_cutoff = cmaker.oq.correlation_cutoff
self.rupture = rupture
self.sitecol = sitecol
self.station_sitecol = station_sitecol
Expand All @@ -225,62 +222,6 @@ def get_mea_tau_phi(self):
self.cross_correl_between, self.cross_correl_within,
self.cmaker.maximum_distance, sigma=False)

def compute_all(self, mea_tau_phi, cmon=Monitor(), umon=Monitor()):
"""
:returns: (dict with fields eid, sid, gmv_X, ...), dt
"""
self.init_eid_rlz_sig_eps()
data = AccumDict(accum=[])
rng = numpy.random.default_rng(self.seed)
for g, (gsim, rlzs) in enumerate(self.cmaker.gsims.items()):
mea = mea_tau_phi[0][g]
tau = mea_tau_phi[1][g]
phi = mea_tau_phi[2][g]
with cmon:
array = self.compute(gsim, rlzs, mea, tau, phi, rng)
with umon:
self.update(data, array, rlzs, [mea])
with umon:
df = self.strip_zeros(data)
return df

def compute(self, gsim, rlzs, mea, tau, phi, rng):
"""
:param gsim: GSIM used to compute mean_stds
:param rlzs: realizations associated to the gsim
:param mea: array of shape (M, N, 1)
:param tau: array of shape (M, N, N)
:param phi: array of shape (M, N, N)
:returns: a 32 bit array of shape (N, M, E)
"""
M, N, _ = mea.shape
E = numpy.isin(self.rlz, rlzs).sum()
result = numpy.zeros((M, N, E), F32)
for m, im in enumerate(self.cmaker.imtls):
mu_Y_yD = mea[m]
cov_WY_WY_wD = tau[m]
cov_BY_BY_yD = phi[m]
result[m] = self._compute(
mu_Y_yD, cov_WY_WY_wD, cov_BY_BY_yD, im, E, rng)
if self.amplifier:
self.amplifier.amplify_gmfs(
self.ctx.ampcode, result, self.imts, self.seed)
return result.transpose(1, 0, 2)

def _compute(self, mu_Y, cov_WY_WY, cov_BY_BY, imt, num_events, rng):
eps = self.correlation_cutoff
if self.cmaker.truncation_level <= 1E-9:
gmf = exp(mu_Y, imt != "MMI")
gmf = gmf.repeat(num_events, axis=1)
else:
# add a cutoff to remove negative eigenvalues
cov_Y_Y = cov_WY_WY + cov_BY_BY + numpy.eye(len(cov_WY_WY)) * eps
arr = rng.multivariate_normal(
mu_Y.flatten(), cov_Y_Y, size=num_events,
check_valid="raise", tol=1e-5, method="cholesky")
gmf = exp(arr, imt != "MMI").T
return gmf # shapes (N, E)


@dataclass
class TempResult:
Expand Down
1 change: 0 additions & 1 deletion openquake/hazardlib/calc/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def get_distances(rupture, sites, param):
dist = numpy.zeros_like(sites.lons)
else:
raise ValueError('Unknown distance measure %r' % param)
dist.flags.writeable = False
return dist


Expand Down
120 changes: 69 additions & 51 deletions openquake/hazardlib/calc/gmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ def __init__(self, rupture, sitecol, cmaker, correlation_model=None,
self.cross_correl = cross_correl or NoCrossCorrelation(
cmaker.truncation_level)
self.gmv_fields = [f'gmv_{m}' for m in range(len(cmaker.imts))]
self.mmi_index = -1
for m, imt in enumerate(cmaker.imtls):
if imt == 'MMI':
self.mmi_index = m

def init_eid_rlz_sig_eps(self):
"""
Expand All @@ -252,25 +256,19 @@ def build_sig_eps(self, se_dt):
sig_eps[f'eps_inter_{imt}'] = self.eps[:, m]
return sig_eps

def update(self, data, array, rlzs, mean_stds, max_iml=None):
def update(self, data, array, rlzs, mean, max_iml=None):
"""
Updates the data dictionary with the values coming from the array
of GMVs. Also indirectly updates the arrays .sig and .eps.
"""
min_iml = self.cmaker.min_iml
mag = self.ebrupture.rupture.mag
mean = mean_stds[0]
if len(mean.shape) == 3: # shape (M, N, 1) for conditioned gmfs
mean = mean[:, :, 0]
mmi_index = -1
for m, imt in enumerate(self.cmaker.imtls):
if imt == 'MMI':
mmi_index = m
if max_iml is None:
M = len(self.cmaker.imts)
max_iml = numpy.full(M, numpy.inf, float)
max_iml = numpy.full(self.M, numpy.inf, float)

set_max_min(array, mean, max_iml, min_iml, mmi_index)
set_max_min(array, mean, max_iml, min_iml, self.mmi_index)
data['gmv'].append(array)

if self.sec_perils:
Expand Down Expand Up @@ -310,11 +308,12 @@ def strip_zeros(self, data):
# remove the rows with all zero values
return df[ok]

def compute_all(self, mean_stds, max_iml=None,
cmon=Monitor(), umon=Monitor()):
def compute_all(self, mean_stds=None, max_iml=None,
mmon=Monitor(), cmon=Monitor(), umon=Monitor()):
"""
:returns: DataFrame with fields eid, rlz, sid, gmv_X, ...
"""
conditioned = mean_stds is not None
self.init_eid_rlz_sig_eps()
rng = numpy.random.default_rng(self.seed)
data = AccumDict(accum=[])
Expand All @@ -323,46 +322,62 @@ def compute_all(self, mean_stds, max_iml=None,
E = len(idxs)
if E == 0: # crucial for performance
continue
if mean_stds is None:
with mmon:
ms = self.cmaker.get_4MN([self.ctx], gs)
else: # conditioned
ms = (mean_stds[0][g], mean_stds[1][g], mean_stds[2][g])
with cmon:
array = self.compute(gs, idxs, mean_stds[:, g], rng) # NME
E = len(idxs)
result = numpy.zeros(
(len(self.imts), len(self.ctx.sids), E), F32)
ccdist = self.cross_correl.distribution
if conditioned:
intra_eps = [None] * self.M
else:
# arrays of random numbers of shape (M, N, E) and (M, E)
intra_eps = [ccdist.rvs((self.N, E), rng)
for _ in range(self.M)]
self.eps[idxs] = self.cross_correl.get_inter_eps(
self.imts, E, rng).T
for m, imt in enumerate(self.imts):
try:
result[m] = self._compute(
[arr[m] for arr in ms], m, imt, gs, intra_eps[m],
idxs, rng)
except Exception as exc:
raise RuntimeError(
'(%s, %s, %s): %s' %
(gs, imt, exc.__class__.__name__, exc)
).with_traceback(exc.__traceback__)
if self.amplifier:
self.amplifier.amplify_gmfs(
self.ctx.ampcode, result, self.imts, self.seed)
with umon:
self.update(data, array, rlzs, mean_stds[:, g], max_iml)
result = result.transpose(1, 0, 2)
self.update(data, result, rlzs, ms[0], max_iml)
with umon:
return self.strip_zeros(data)

def compute(self, gsim, idxs, mean_stds, rng):
"""
:param gsim: GSIM used to compute mean_stds
:param idxs: affected indices
:param mean_stds: array of shape (4, M, N)
:param rng: random number generator for the rupture
:returns: a 32 bit array of shape (N, M, E)
"""
# sets self.eps
M = len(self.imts)
E = len(idxs)
result = numpy.zeros(
(len(self.imts), len(self.ctx.sids), E), F32)
ccdist = self.cross_correl.distribution
# build arrays of random numbers of shape (M, N, E) and (M, E)
intra_eps = [ccdist.rvs((self.N, E), rng) for _ in range(M)]
self.eps[idxs] = self.cross_correl.get_inter_eps(self.imts, E, rng).T
for m, imt in enumerate(self.imts):
try:
result[m] = self._compute(
mean_stds[:, m], m, imt, gsim, intra_eps[m], idxs)
except Exception as exc:
raise RuntimeError(
'(%s, %s, %s): %s' %
(gsim, imt, exc.__class__.__name__, exc)
).with_traceback(exc.__traceback__)
if self.amplifier:
self.amplifier.amplify_gmfs(
self.ctx.ampcode, result, self.imts, self.seed)
return result.transpose(1, 0, 2)

def _compute(self, mean_stds, m, imt, gsim, intra_eps, idxs):
# sets self.sig, returns gmf
def _compute(self, mean_stds, m, imt, gsim, intra_eps, idxs, rng=None):
if len(mean_stds) == 3: # conditioned GMFs
# mea, tau, phi with shapes (N,1), (N,N), (N,N)
mu_Y, cov_WY_WY, cov_BY_BY = mean_stds
E = len(idxs)
eps = self.cmaker.oq.correlation_cutoff
if self.cmaker.truncation_level <= 1E-9:
gmf = exp(mu_Y, imt.string != "MMI")
gmf = gmf.repeat(E, axis=1)
else:
# add a cutoff to remove negative eigenvalues
cov_Y_Y = cov_WY_WY + cov_BY_BY + numpy.eye(len(cov_WY_WY)) * eps
arr = rng.multivariate_normal(
mu_Y.flatten(), cov_Y_Y, size=E,
check_valid="raise", tol=1e-5, method="cholesky")
gmf = exp(arr, imt != "MMI").T
return gmf # shapes (N, E)

# regular case, sets self.sig, returns gmf
im = imt.string
mean, sig, tau, phi = mean_stds
if self.cmaker.truncation_level <= 1E-9:
Expand Down Expand Up @@ -454,9 +469,12 @@ def ground_motion_fields(rupture, sites, imts, gsim, truncation_level,
ebr = EBRupture(
rupture, source_id=0, trt_smr=0, n_occ=realizations, id=0, e0=0)
ebr.seed = seed
N, E = len(sites), realizations
gc = GmfComputer(ebr, sites, cmaker, correlation_model)
mean_stds = cmaker.get_mean_stds([gc.ctx])[:, 0] # shape (4, M, N)
gc.init_eid_rlz_sig_eps()
res = gc.compute(gsim, U32([0]), mean_stds,
numpy.random.default_rng(seed))
return {imt: res[:, m] for m, imt in enumerate(gc.imts)}
df = gc.compute_all()
res = {}
for m, imt in enumerate(gc.imts):
res[imt] = arr = numpy.zeros((N, E), F32)
for sid, eid, gmv in zip(df.sid, df.eid, df[f'gmv_{m}']):
arr[sid, eid] = gmv
return res
Loading

0 comments on commit 5743bb2

Please sign in to comment.