diff --git a/fnal_column_analysis_tools/processor/executor.py b/fnal_column_analysis_tools/processor/executor.py
index a6006bb5d..0f00c8f59 100644
--- a/fnal_column_analysis_tools/processor/executor.py
+++ b/fnal_column_analysis_tools/processor/executor.py
@@ -74,14 +74,21 @@ def _work_function(item):
 @lru_cache(maxsize=128)
 def _get_chunking(filelist, treename, chunksize):
     items = []
-    for fn in filelist:
-        nentries = uproot.numentries(fn, treename)
+    executor = None if len(filelist) < 5 else concurrent.futures.ThreadPoolExecutor(10)
+    for fn, nentries in uproot.numentries(filelist, treename, total=False, executor=executor).items():
         for index in range(nentries // chunksize + 1):
             items.append((fn, chunksize, index))
     return items
 
 
-def run_uproot_job(fileset, treename, processor_instance, executor, executor_args={}, chunksize=500000):
+def _get_chunking_lazy(filelist, treename, chunksize):
+    for fn in filelist:
+        nentries = uproot.numentries(fn, treename)
+        for index in range(nentries // chunksize + 1):
+            yield (fn, chunksize, index)
+
+
+def run_uproot_job(fileset, treename, processor_instance, executor, executor_args={}, chunksize=500000, maxchunks=None):
     '''
     A convenience wrapper to submit jobs for a file set, which is a
     dictionary of dataset: [file list] entries.  Supports only uproot
@@ -97,6 +104,7 @@ def run_uproot_job(fileset, treename, processor_instance, executor, executor_arg
                 for item in items: accumulator += function(item)
         executor_args: extra arguments to pass to executor
         chunksize: number of entries to process at a time in the data frame
+        maxchunks: maximum number of chunks to process per dataset
     '''
     if not isinstance(fileset, Mapping):
         raise ValueError("Expected fileset to be a mapping dataset: list(files)")
@@ -105,7 +113,13 @@ def run_uproot_job(fileset, treename, processor_instance, executor, executor_arg
 
     items = []
     for dataset, filelist in tqdm(fileset.items(), desc='Preprocessing'):
-        for chunk in _get_chunking(tuple(filelist), treename, chunksize):
+        if maxchunks is not None:
+            chunks = _get_chunking_lazy(tuple(filelist), treename, chunksize)
+        else:
+            chunks = _get_chunking(tuple(filelist), treename, chunksize)
+        for ichunk, chunk in enumerate(chunks):
+            if (maxchunks is not None) and (ichunk >= maxchunks):
+                break
             items.append((dataset, chunk[0], treename, chunk[1], chunk[2], processor_instance))
 
     output = processor_instance.accumulator.identity()
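
For context, a minimal usage sketch of the new `maxchunks` argument. The dataset name, file paths, and `my_processor` instance below are placeholders, `serial_executor` is written to follow the `for item in items: accumulator += function(item)` contract quoted in the docstring rather than taken from the package's own executors, and the calling convention `executor(items, function, accumulator, **executor_args)` plus the return of the filled accumulator are inferred from the surrounding code:

```python
from fnal_column_analysis_tools.processor.executor import run_uproot_job

# Hypothetical fileset: the dataset name and file paths are placeholders.
fileset = {
    "ZJetsToNuNu": ["/path/to/file1.root", "/path/to/file2.root"],
}


def serial_executor(items, function, accumulator):
    # Minimal executor matching the behavior described in the docstring:
    # apply the work function to each item and add the result into the accumulator.
    for item in items:
        accumulator += function(item)


# my_processor is assumed to be an instance of a user-defined ProcessorABC
# subclass, defined elsewhere in user code.
output = run_uproot_job(
    fileset,
    treename="Events",
    processor_instance=my_processor,
    executor=serial_executor,
    executor_args={},
    chunksize=500000,
    maxchunks=2,  # process at most 2 chunks per dataset, e.g. for a quick test run
)
```

With `maxchunks` set, the file chunking is generated lazily via `_get_chunking_lazy`, so only the files actually needed are opened during preprocessing; with the default `maxchunks=None`, the cached, thread-parallel `_get_chunking` path is used as before.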