Merge pull request #86 from aminnj/master
user can specify max chunks to process with lazy chunking
lgray authored May 29, 2019
2 parents 46531c8 + 3ac33cb commit 7f7d6bb
Showing 1 changed file with 18 additions and 4 deletions.
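A hedged usage sketch (not part of this commit) of the new maxchunks argument: the dataset names, file lists, tree name, processor class, and executor below are placeholders, and the import path simply points at the file changed in this diff.

from fnal_column_analysis_tools.processor.executor import run_uproot_job

def my_executor(items, function, accumulator):
    # minimal executor following the accumulate contract quoted in the docstring in the
    # diff below (real executors in this package may also accept keyword arguments
    # forwarded from executor_args)
    for item in items:
        accumulator += function(item)

fileset = {
    "ZJets": ["ZJets_1.root", "ZJets_2.root"],   # placeholder file lists
    "Data": ["Data_1.root"],
}

output = run_uproot_job(
    fileset,
    treename="Events",                 # placeholder tree name
    processor_instance=MyProcessor(),  # placeholder ProcessorABC subclass
    executor=my_executor,
    chunksize=500000,
    maxchunks=5,                       # new in this commit: cap chunks per dataset
)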
fnal_column_analysis_tools/processor/executor.py (22 changes: 18 additions & 4 deletions)
@@ -74,14 +74,21 @@ def _work_function(item):
 @lru_cache(maxsize=128)
 def _get_chunking(filelist, treename, chunksize):
     items = []
-    for fn in filelist:
-        nentries = uproot.numentries(fn, treename)
+    executor = None if len(filelist) < 5 else concurrent.futures.ThreadPoolExecutor(10)
+    for fn, nentries in uproot.numentries(filelist, treename, total=False, executor=executor).items():
         for index in range(nentries // chunksize + 1):
             items.append((fn, chunksize, index))
     return items
 
 
-def run_uproot_job(fileset, treename, processor_instance, executor, executor_args={}, chunksize=500000):
+def _get_chunking_lazy(filelist, treename, chunksize):
+    for fn in filelist:
+        nentries = uproot.numentries(fn, treename)
+        for index in range(nentries // chunksize + 1):
+            yield (fn, chunksize, index)
+
+
+def run_uproot_job(fileset, treename, processor_instance, executor, executor_args={}, chunksize=500000, maxchunks=None):
     '''
     A convenience wrapper to submit jobs for a file set, which is a
     dictionary of dataset: [file list] entries. Supports only uproot
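For illustration only (not part of the diff), the chunk arithmetic shared by _get_chunking and the new _get_chunking_lazy works like this: a file with nentries events yields chunk indices 0 through nentries // chunksize, so the last chunk holds the remainder (and is empty when nentries is an exact multiple of chunksize). The entry counts below are made up; the real code gets them from uproot.numentries.

def illustrate_chunking(nentries_per_file, chunksize):
    # same loop shape as above, with uproot.numentries replaced by a dict lookup
    for fn, nentries in nentries_per_file.items():
        for index in range(nentries // chunksize + 1):
            yield (fn, chunksize, index)

print(list(illustrate_chunking({"a.root": 1200000, "b.root": 400000}, 500000)))
# [('a.root', 500000, 0), ('a.root', 500000, 1), ('a.root', 500000, 2), ('b.root', 500000, 0)]

The eager _get_chunking now batches all numentries lookups through a single uproot call (threaded once the file list reaches five files), while the lazy variant keeps the one-file-at-a-time loop and yields chunks on demand.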
@@ -97,6 +104,7 @@ def run_uproot_job(fileset, treename, processor_instance, executor, executor_arg
                 for item in items: accumulator += function(item)
     executor_args: extra arguments to pass to executor
     chunksize: number of entries to process at a time in the data frame
+    maxchunks: maximum number of chunks to process per dataset
     '''
     if not isinstance(fileset, Mapping):
         raise ValueError("Expected fileset to be a mapping dataset: list(files)")
@@ -105,7 +113,13 @@ def run_uproot_job(fileset, treename, processor_instance, executor, executor_arg
 
     items = []
     for dataset, filelist in tqdm(fileset.items(), desc='Preprocessing'):
-        for chunk in _get_chunking(tuple(filelist), treename, chunksize):
+        if maxchunks is not None:
+            chunks = _get_chunking_lazy(tuple(filelist), treename, chunksize)
+        else:
+            chunks = _get_chunking(tuple(filelist), treename, chunksize)
+        for ichunk, chunk in enumerate(chunks):
+            if (maxchunks is not None) and (ichunk > maxchunks):
+                break
             items.append((dataset, chunk[0], treename, chunk[1], chunk[2], processor_instance))
 
     output = processor_instance.accumulator.identity()
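A small sketch (again not from the commit) of why the lazy generator pairs with maxchunks: the enumerate/break loop above stops requesting chunks once ichunk exceeds maxchunks (so indices 0 through maxchunks, i.e. maxchunks + 1 chunks, are kept per dataset), and a generator therefore only evaluates numentries for the files it actually reaches. The fabricated entry count below stands in for uproot.numentries.

def lazy_chunks(filelist, chunksize):
    # mirrors _get_chunking_lazy with a fake per-file entry count
    for fn in filelist:
        print("inspecting", fn)   # stands in for uproot.numentries(fn, treename)
        nentries = 1000000
        for index in range(nentries // chunksize + 1):
            yield (fn, chunksize, index)

maxchunks = 2
for ichunk, chunk in enumerate(lazy_chunks(["f%d.root" % i for i in range(100)], 500000)):
    if (maxchunks is not None) and (ichunk > maxchunks):
        break
    print(chunk)
# only f0.root and f1.root are ever "inspected" (f1 because the generator advances one
# step past the cap before the break); the remaining 98 files are never touched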
