Skip to content

Commit

Permalink
Merge pull request #549 from martindurant/pq-metadata
Browse files Browse the repository at this point in the history
feat:  Only return parquet metadata if intending to write
  • Loading branch information
martindurant authored Nov 20, 2024
2 parents fcae1a2 + 64b3649 commit 1d4d4e9
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/awkward-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
run: |
python3 -m pip install pip wheel -U
python3 -m pip install -q --no-cache-dir -e .[complete,test]
python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main
python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main --no-deps
- name: Run tests
run: |
python3 -m pytest
47 changes: 38 additions & 9 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def __init__(
npartitions: int,
prefix: str | None = None,
storage_options: dict | None = None,
write_metadata: bool = False,
**kwargs: Any,
):
self.fs = fs
Expand All @@ -496,16 +497,19 @@ def __init__(
if isinstance(self.fs.protocol, str)
else self.fs.protocol[0]
)
self.write_metadata = write_metadata
self.kwargs = kwargs

def __call__(self, data, block_index):
    """Write one partition to its own Parquet file.

    Parameters
    ----------
    data
        The partition to write; passed straight to ``ak.to_parquet``.
    block_index
        Index tuple of the partition; only the first element is used,
        zero-padded to ``self.zfill`` digits, to build the file name.

    Returns
    -------
    The value returned by ``ak.to_parquet`` when ``self.write_metadata``
    is true, otherwise ``None``.
    """
    filename = f"part{str(block_index[0]).zfill(self.zfill)}.parquet"
    if self.prefix is not None:
        filename = f"{self.prefix}-{filename}"
    filename = self.fs.unstrip_protocol(f"{self.path}{self.fs.sep}{filename}")
    out = ak.to_parquet(
        data, filename, **self.kwargs, storage_options=self.storage_options
    )
    # Only hand the per-file metadata back when a metadata file will be
    # written; otherwise the task result is deliberately empty.
    if self.write_metadata:
        return out
    return None


def to_parquet(
Expand Down Expand Up @@ -597,7 +601,10 @@ def to_parquet(
storage_options
Storage options passed to ``fsspec``.
write_metadata
Write Parquet metadata.
Write Parquet metadata. Note that when this is True, all the
metadata pieces will be pulled into a single finalizer task. When
False, the whole write graph can be evaluated as a more efficient
tree reduction.
compute
If ``True``, immediately compute the result (write data to
disk). If ``False`` a Scalar collection will be returned such
Expand Down Expand Up @@ -667,6 +674,7 @@ def to_parquet(
parquet_old_int96_timestamps=parquet_old_int96_timestamps,
parquet_compliant_nested=parquet_compliant_nested,
parquet_extra_options=parquet_extra_options,
write_metadata=write_metadata,
),
array,
BlockIndex((array.npartitions,)),
Expand All @@ -681,17 +689,38 @@ def to_parquet(
dsk[(final_name, 0)] = (_metadata_file_from_metas, fs, path) + tuple(
map_res.__dask_keys__()
)
graph = HighLevelGraph.from_collections(
final_name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")
else:
final_name = name + "-finalize"
dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__())
graph = HighLevelGraph.from_collections(
final_name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")
from dask_awkward.layers import AwkwardTreeReductionLayer

layer = AwkwardTreeReductionLayer(
name=final_name,
concat_func=none_to_none,
tree_node_func=none_to_none,
name_input=map_res.name,
npartitions_input=map_res.npartitions,
finalize_func=none_to_none,
)
graph = HighLevelGraph.from_collections(
final_name,
layer,
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")

if compute:
out.compute()
return None
else:
return out


def none_to_none(*_: object) -> None:
    """Dummy reduction function for writes that produce no metadata.

    Accepts any positional arguments, ignores them, and always returns
    ``None``.
    """
    return None

0 comments on commit 1d4d4e9

Please sign in to comment.