Skip to content

Commit

Permalink
Made sure the Starmap always get the datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
micheles committed Oct 21, 2023
1 parent b5eb287 commit 6b82a01
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 5 deletions.
1 change: 1 addition & 0 deletions openquake/baselib/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ def __init__(self, task_func, task_args=(), distribute=None,
match = re.search(r'(\d+)', os.path.basename(h5.filename))
self.calc_id = int(match.group(1))
else:
# TODO: see if we can forbid this case
self.calc_id = None
h5 = hdf5.File(gettemp(suffix='.hdf5'), 'w')
init_performance(h5)
Expand Down
8 changes: 4 additions & 4 deletions openquake/baselib/workerpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ def call(func, args, taskno, mon, executing):


def errback(job_id, task_no, exc):
# NB: job_id can be None if the Starmap was invoked without h5
from openquake.commonlib.logs import dbcmd
dbcmd('log', job_id, datetime.utcnow(), 'ERROR',
'%s/%s' % (job_id, task_no), str(exc))
raise exc
e = exc.__class__('in job %d, task %d' % (job_id, task_no))
raise e.with_traceback(exc.__traceback__)

Expand Down Expand Up @@ -287,9 +287,9 @@ def start(self):
ctrlsock.send(' '.join(executing))
elif isinstance(cmd, tuple):
func, args, taskno, mon = cmd
eback = functools.partial(errback, mon.calc_id, taskno)
self.pool.apply_async(call, cmd + (self.executing,),
error_callback=eback)
self.pool.apply_async(
call, cmd + (self.executing,),
error_callback=functools.partial(errback, mon.calc_id, taskno))
ctrlsock.send('submitted')
else:
ctrlsock.send('unknown command')
Expand Down
1 change: 1 addition & 0 deletions openquake/calculators/event_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ def build_events_from_sources(self):
nrups = parallel.Starmap( # weighting the heavy sources
count_ruptures, [(src,) for src in sources
if src.code in b'AMSC'],
h5=self.datastore.hdf5,
progress=logging.debug).reduce()
# NB: multifault sources must be considered light to avoid a large
# data transfer, even if .count_ruptures can be slow
Expand Down
5 changes: 4 additions & 1 deletion openquake/commonlib/calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ def _save_events(self, rup_array, rgetters):
else:
self.datastore.swmr_on() # before the Starmap
acc = parallel.Starmap(
self.get_eid_rlz, iterargs, progress=logging.debug).reduce()
self.get_eid_rlz, iterargs,
h5=self.datastore,
progress=logging.debug
).reduce()
i = 0
for ordinal, eid_rlz in sorted(acc.items()):
for er in eid_rlz:
Expand Down

0 comments on commit 6b82a01

Please sign in to comment.