Skip to content

Commit

Permalink
Merge pull request #203 from lgray/topic_use_offsets_spark
Browse files Browse the repository at this point in the history
Use offsets to build jagged arrays in spark, rather than counts
  • Loading branch information
lgray authored Nov 11, 2019
2 parents 5272fcc + 35b9f2a commit 9d3f41b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
14 changes: 9 additions & 5 deletions coffea/processor/templates/spark.py.tmpl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
global coffea_udf


@fn.pandas_udf(BinaryType(), fn.PandasUDFType.SCALAR)
def coffea_udf(dataset, {% for col in cols %}{{col}}{{ "," if not loop.last }}{% endfor %}):
global processor_instance, lz4_clevel
Expand All @@ -10,19 +11,21 @@ def coffea_udf(dataset, {% for col in cols %}{{col}}{{ "," if not loop.last }}{%
size = dataset.size
items = {}

counts_store = {}
offsets_store = {}

for i, col in enumerate(columns):
#numpy array
if columns[i].array[0].base is None:
items[names[i]] = columns[i].values
else:
prefix = names[i].split('_')[0] # this is specific to nanoaod, sadness
if prefix not in counts_store.keys():
counts_store[prefix] = columns[i].str.len().values
counts = counts_store[prefix]
if prefix not in offsets_store.keys():
counts = columns[i].str.len().values
offsets_store[prefix] = np.zeros(shape=(counts.size + 1, ), dtype=np.int64)
offsets_store[prefix][1:] = np.cumsum(counts)
offsets = offsets_store[prefix]
contents = columns[i].array[0].base
items[names[i]] = awkward.JaggedArray.fromcounts(counts, contents)
items[names[i]] = awkward.JaggedArray.fromoffsets(offsets, contents)

df = processor.PreloadedDataFrame(size=size, items=items)
df['dataset'] = dataset[0]
Expand All @@ -36,6 +39,7 @@ def coffea_udf(dataset, {% for col in cols %}{{col}}{{ "," if not loop.last }}{%

return pd.Series(outs)


@fn.pandas_udf(BinaryType(), fn.PandasUDFType.SCALAR)
def coffea_udf_flat(dataset, {% for col in cols %}{{col}}{{ "," if not loop.last }}{% endfor %}):
global processor_instance, lz4_clevel
Expand Down
2 changes: 1 addition & 1 deletion coffea/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import re

__version__ = "0.6.16"
__version__ = "0.6.17"
version = __version__
version_info = tuple(re.split(r"[-\.]", __version__))

Expand Down

0 comments on commit 9d3f41b

Please sign in to comment.