Skip to content

Commit

Permalink
Merge pull request #165 from annawoodard/fix-parsl-hangs-164
Browse files Browse the repository at this point in the history
Fix hangs in Parsl executor
  • Loading branch information
lgray authored Sep 19, 2019
2 parents e7a4165 + 4c78c87 commit 36b3fd0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 41 deletions.
18 changes: 3 additions & 15 deletions coffea/processor/parsl/detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from parsl.executors import HighThroughputExecutor

from ..executor import futures_handler
from .timeout import timeout

try:
from collections.abc import Sequence
Expand Down Expand Up @@ -45,29 +46,16 @@ def _parsl_stop(dfk):
parsl.clear()


@timeout
@python_app
def derive_chunks(filename, treename, chunksize, ds, timeout=10):
import uproot
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor, TimeoutError

uproot.XRootDSource.defaults["parallel"] = False

afile = None
for i in range(5):
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(uproot.open, filename)
try:
afile = future.result(timeout=timeout)
except TimeoutError:
afile = None
else:
break

if afile is None:
raise Exception('unable to open: %s' % filename)

afile = uproot.open(filename)

tree = None
if isinstance(treename, str):
tree = afile[treename]
Expand Down
17 changes: 3 additions & 14 deletions coffea/processor/parsl/parsl_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
lz4_clevel = 1


@timeout
@python_app
def coffea_pyapp(dataset, fn, treename, chunksize, index, procstr, timeout=None, flatten=True):
import uproot
Expand All @@ -25,7 +26,6 @@ def coffea_pyapp(dataset, fn, treename, chunksize, index, procstr, timeout=None,
import lz4.frame as lz4f
from coffea import hist, processor
from coffea.processor.accumulator import value_accumulator
from concurrent.futures import ThreadPoolExecutor, TimeoutError

uproot.XRootDSource.defaults["parallel"] = False

Expand All @@ -43,19 +43,8 @@ def _read(self, chunkindex):

processor_instance = cpkl.loads(lz4f.decompress(procstr))

afile = None
for i in range(5):
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(uproot.open, fn)
try:
afile = future.result(timeout=timeout)
except TimeoutError:
afile = None
else:
break

if afile is None:
raise Exception('unable to open: %s' % fn)
afile = uproot.open(fn)

tree = None
if isinstance(treename, str):
tree = afile[treename]
Expand Down
18 changes: 6 additions & 12 deletions coffea/processor/parsl/timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,18 @@ def timeout(func):

@wraps(func)
def wrapper(*args, **kwargs):

import signal
import datetime

def _timeout_handler(signum, frame):
raise Exception("Timeout hit")

signal.signal(signal.SIGALRM, _timeout_handler)
if kwargs.get('timeout', None):
signal.alarm(kwargs.get('timeout'))
print("[Timeout wrapper:{}] Timeout {}".format(datetime.datetime.now(),
kwargs.get('timeout')))
if kwargs.get('timeout'):
signal.alarm(max(1, int(kwargs['timeout'])))
try:
retval = func(*args, **kwargs)
return retval
except Exception as e:
print("[Timeout wrapper:{}] Caught an exception {}".format(datetime.datetime.now(),
e))
raise
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result

return wrapper

0 comments on commit 36b3fd0

Please sign in to comment.