diff --git a/openquake/baselib/parallel.py b/openquake/baselib/parallel.py index f5313975c80b..583ed26e2176 100644 --- a/openquake/baselib/parallel.py +++ b/openquake/baselib/parallel.py @@ -768,6 +768,7 @@ def __init__(self, task_func, task_args=(), distribute=None, match = re.search(r'(\d+)', os.path.basename(h5.filename)) self.calc_id = int(match.group(1)) else: + # TODO: see if we can forbid this case self.calc_id = None h5 = hdf5.File(gettemp(suffix='.hdf5'), 'w') init_performance(h5) diff --git a/openquake/baselib/workerpool.py b/openquake/baselib/workerpool.py index 69bd10e6ee16..20dc60519e0e 100644 --- a/openquake/baselib/workerpool.py +++ b/openquake/baselib/workerpool.py @@ -232,10 +232,10 @@ def call(func, args, taskno, mon, executing): def errback(job_id, task_no, exc): + # NB: job_id can be None if the Starmap was invoked without h5 from openquake.commonlib.logs import dbcmd dbcmd('log', job_id, datetime.utcnow(), 'ERROR', '%s/%s' % (job_id, task_no), str(exc)) - raise exc e = exc.__class__('in job %d, task %d' % (job_id, task_no)) raise e.with_traceback(exc.__traceback__) @@ -287,9 +287,9 @@ def start(self): ctrlsock.send(' '.join(executing)) elif isinstance(cmd, tuple): func, args, taskno, mon = cmd - eback = functools.partial(errback, mon.calc_id, taskno) - self.pool.apply_async(call, cmd + (self.executing,), - error_callback=eback) + self.pool.apply_async( + call, cmd + (self.executing,), + error_callback=functools.partial(errback, mon.calc_id, taskno)) ctrlsock.send('submitted') else: ctrlsock.send('unknown command') diff --git a/openquake/calculators/event_based.py b/openquake/calculators/event_based.py index cecd418bc845..0247e4a5672c 100644 --- a/openquake/calculators/event_based.py +++ b/openquake/calculators/event_based.py @@ -446,6 +446,7 @@ def build_events_from_sources(self): nrups = parallel.Starmap( # weighting the heavy sources count_ruptures, [(src,) for src in sources if src.code in b'AMSC'], + h5=self.datastore.hdf5, progress=logging.debug).reduce() # NB: multifault sources must be considered light to avoid a large # data transfer, even if .count_ruptures can be slow diff --git a/openquake/commonlib/calc.py b/openquake/commonlib/calc.py index 143057891360..b8fb99217e7e 100644 --- a/openquake/commonlib/calc.py +++ b/openquake/commonlib/calc.py @@ -240,7 +240,10 @@ def _save_events(self, rup_array, rgetters): else: self.datastore.swmr_on() # before the Starmap acc = parallel.Starmap( - self.get_eid_rlz, iterargs, progress=logging.debug).reduce() + self.get_eid_rlz, iterargs, + h5=self.datastore, + progress=logging.debug + ).reduce() i = 0 for ordinal, eid_rlz in sorted(acc.items()): for er in eid_rlz: