From 96b19dfab42383ddc331f8f71b5d02e49f60bbfc Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Fri, 6 Oct 2023 22:25:14 +0100 Subject: [PATCH] feat: implement `shape_touched` optimisation (#381) * scaffolding * separation of data and shape touched * checkpoint * wip * wip: latest * refactor: drop reference to reports * wip * wip * fix: don't try to read root column * fix: support fallback case * fix: support typetracing sample * test: fix tests * fix: don't touch data buffers for sizes Originally, if we saw a non-metadata buffer, we'd look to see if the current node has any unknown-length attributes that need computing. But, only NumpyArray has `data`, and it has no unknown-length attributes to read. * chore: appease pre-commit * chore: remove debug statement * feat!: remove `necessary_buffers` * chore: add type hints * fix: properly "deep" copy forms * fix: typo Co-authored-by: Doug Davis * test: drop local changes * fix: correct LSP * docs: add docstring * refactor: separate mocking from projection more cleanly * feat: return reports for later consumption * feat: expose `dak.report_necessary_buffers` * fix: ensure we only check input layers * feat: make default buffer key nicer * fix: restore ability to detect serialised blockwise layers * fix: remove `_meta` when serialising IO function * test: restore original test file * docs: use new name for necessary_columns * fix: restore wildcard projection for column-at-a-time readers * refactor: use DFS to find deepest field * fix: remove old code * refactor: remove two-phase abstraction * docs: add brief comment * test: add note about broken test * feat: add `necessary_columns` interface * feat: add `necessary_columns` interface * docs: improve notes * refactor: add implementation for mixin * docs: add note about `report_necessary_buffers` * fix: support `dak.necessary_columns` * refactor: export utils * chore: require newer uproot * chore: add local test file * chore: bump awkward dependency * Update docs/api/inspect.rst --------- Co-authored-by: Doug Davis Co-authored-by: Doug Davis --- .github/workflows/awkward-main.yml | 1 + .github/workflows/conda-tests.yml | 1 + .github/workflows/coverage.yml | 1 + .github/workflows/pypi-release.yml | 1 + .github/workflows/pypi-tests.yml | 1 + .github/workflows/uproot-main.yml | 1 + docs/api/inspect.rst | 3 +- docs/more/optimization.rst | 15 +- pyproject.toml | 4 +- src/dask_awkward/__init__.py | 10 +- src/dask_awkward/layers/__init__.py | 8 + src/dask_awkward/layers/layers.py | 264 +++++++++------- src/dask_awkward/lib/__init__.py | 7 +- src/dask_awkward/lib/_utils.py | 58 ---- src/dask_awkward/lib/core.py | 6 - src/dask_awkward/lib/inspect.py | 193 ++++++++++-- src/dask_awkward/lib/io/columnar.py | 158 ++++++++++ src/dask_awkward/lib/io/io.py | 57 ++-- src/dask_awkward/lib/io/json.py | 120 +++---- src/dask_awkward/lib/io/parquet.py | 74 ++--- src/dask_awkward/lib/optimize.py | 464 +++++++++++----------------- src/dask_awkward/lib/utils.py | 159 ++++++++++ tests/.gitattributes | 1 + tests/test-uproot/nano_dy.root | 3 + tests/test_core.py | 4 +- tests/test_inspect.py | 51 ++- tests/test_io.py | 3 - tests/test_layers.py | 12 - tests/test_str.py | 9 +- 29 files changed, 996 insertions(+), 693 deletions(-) delete mode 100644 src/dask_awkward/lib/_utils.py create mode 100644 src/dask_awkward/lib/io/columnar.py create mode 100644 src/dask_awkward/lib/utils.py create mode 100644 tests/.gitattributes create mode 100644 tests/test-uproot/nano_dy.root delete mode 100644 tests/test_layers.py diff --git
a/.github/workflows/awkward-main.yml b/.github/workflows/awkward-main.yml index 15a1b00a..623da14b 100644 --- a/.github/workflows/awkward-main.yml +++ b/.github/workflows/awkward-main.yml @@ -23,6 +23,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: Setup Python uses: actions/setup-python@v4 with: diff --git a/.github/workflows/conda-tests.yml b/.github/workflows/conda-tests.yml index 8214be7f..8f34e4aa 100644 --- a/.github/workflows/conda-tests.yml +++ b/.github/workflows/conda-tests.yml @@ -24,6 +24,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: Setup Conda Environment uses: conda-incubator/setup-miniconda@v2 with: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index fc70799b..c022bd2c 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -23,6 +23,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: Setup Python uses: actions/setup-python@v4 with: diff --git a/.github/workflows/pypi-release.yml b/.github/workflows/pypi-release.yml index 89879e62..1ae56776 100644 --- a/.github/workflows/pypi-release.yml +++ b/.github/workflows/pypi-release.yml @@ -14,6 +14,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: Setup Python uses: actions/setup-python@v4 diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml index c974b187..211ef066 100644 --- a/.github/workflows/pypi-tests.yml +++ b/.github/workflows/pypi-tests.yml @@ -25,6 +25,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: setup Python ${{matrix.python-version}} uses: actions/setup-python@v4 with: diff --git a/.github/workflows/uproot-main.yml b/.github/workflows/uproot-main.yml index 544c0d46..a7f3eaef 100644 --- a/.github/workflows/uproot-main.yml +++ b/.github/workflows/uproot-main.yml @@ -23,6 +23,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + lfs: true - name: Setup Python uses: actions/setup-python@v4 with: diff --git a/docs/api/inspect.rst b/docs/api/inspect.rst index e6e1bf02..6033f960 100644 --- a/docs/api/inspect.rst +++ b/docs/api/inspect.rst @@ -8,7 +8,8 @@ Inspection partition_compatibility PartitionCompatibility - necessary_columns + report_necessary_buffers + report_necessary_columns sample .. raw:: html diff --git a/docs/more/optimization.rst b/docs/more/optimization.rst index 9a50cec2..7e2b0b5e 100644 --- a/docs/more/optimization.rst +++ b/docs/more/optimization.rst @@ -127,14 +127,14 @@ will only grab ``foo`` and ``bar.x``. :py:func:`ak.from_parquet` function at compute time. You can see which columns are determined to be necessary by calling -:func:`dask_awkward.necessary_columns` on the collection of interest +:func:`dask_awkward.report_necessary_columns` on the collection of interest (it returns a mapping that pairs an input layer with the list of necessary columns): .. code:: pycon >>> import dask_awkward as dak - >>> dak.necessary_columns(result) + >>> dak.report_necessary_columns(result) {"some-layer-name": ["foo", "bar.x"]} The optimization is performed by relying on upstream Awkward-Array @@ -156,7 +156,7 @@ parameter: One can also use the ``columns=`` argument (with :func:`~dask_awkward.from_parquet`, for example) to manually define which columns should be read from disk. The -:func:`~dask_awkward.necessary_columns` function can be used to +:func:`~dask_awkward.report_necessary_columns` function can be used to determine how one should use the ``columns=`` argument. 
Using our above example, we write @@ -179,3 +179,12 @@ workflow). + + +.. note:: + + Under the hood, the columns optimization is implemented as a *buffers* optimization; dask-awkward determines the + buffers necessary to read from a columnar source, before translating these to column names. Some IO sources might + not support :func:`~dask_awkward.report_necessary_columns`, e.g. if the source directly reads buffers from a container. + + For these IO sources, :func:`~dask_awkward.report_necessary_buffers` can be used instead. diff --git a/pyproject.toml b/pyproject.toml index a5117393..fbf459b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ classifiers = [ "Topic :: Software Development", ] dependencies = [ - "awkward >=2.4.4", + "awkward >=2.4.5", "dask >=2023.04.0", "typing_extensions >=4.8.0", ] @@ -70,7 +70,7 @@ test = [ "pytest >=6.0", "pytest-cov >=3.0.0", "requests >=2.27.1", - "uproot", + "uproot >=5.1.0rc1", ] [project.entry-points."dask.sizeof"] diff --git a/src/dask_awkward/__init__.py b/src/dask_awkward/__init__.py index a99140e6..00e2db40 100644 --- a/src/dask_awkward/__init__.py +++ b/src/dask_awkward/__init__.py @@ -8,6 +8,7 @@ import dask_awkward.lib.reducers as reducers import dask_awkward.lib.str as str import dask_awkward.lib.structure as structure +import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type from dask_awkward.lib.core import ( @@ -16,7 +17,14 @@ partition_compatibility, ) from dask_awkward.lib.describe import fields -from dask_awkward.lib.inspect import necessary_columns, sample +from dask_awkward.lib.inspect import ( + report_necessary_buffers, + report_necessary_columns, + sample, +) + +necessary_columns = report_necessary_columns # Export for backwards compatibility. 
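The module-level alias above keeps existing `dak.necessary_columns` call sites working across the rename. A quick check of the re-export (hypothetical interpreter session, assuming a build of this branch is installed):

    >>> import dask_awkward as dak
    >>> dak.necessary_columns is dak.report_necessary_columns
    True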
+ from dask_awkward.lib.io.io import ( ImplementsFormTransformation, from_awkward, diff --git a/src/dask_awkward/layers/__init__.py b/src/dask_awkward/layers/__init__.py index ebdeb516..d4ba4c5e 100644 --- a/src/dask_awkward/layers/__init__.py +++ b/src/dask_awkward/layers/__init__.py @@ -3,6 +3,10 @@ AwkwardInputLayer, AwkwardMaterializedLayer, AwkwardTreeReductionLayer, + ImplementsIOFunction, + ImplementsProjection, + IOFunctionWithMocking, + io_func_implements_projection, ) __all__ = ( @@ -10,4 +14,8 @@ "AwkwardBlockwiseLayer", "AwkwardMaterializedLayer", "AwkwardTreeReductionLayer", + "ImplementsProjection", + "ImplementsIOFunction", + "IOFunctionWithMocking", + "io_func_implements_projection", ) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 5af5c971..747ff80b 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -1,9 +1,8 @@ from __future__ import annotations import copy -import pickle from collections.abc import Callable, Mapping -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer @@ -12,39 +11,94 @@ from dask_awkward.utils import LazyInputsDict if TYPE_CHECKING: - from awkward.typetracer import TypeTracerReport + from awkward import Array as AwkwardArray + from awkward._nplikes.typetracer import TypeTracerReport class AwkwardBlockwiseLayer(Blockwise): """Just like upstream Blockwise, except we override pickling""" + has_been_unpickled: bool = False + @classmethod def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer: ob = object.__new__(cls) ob.__dict__.update(layer.__dict__) return ob - def mock(self) -> tuple[AwkwardBlockwiseLayer, Any | None]: + def mock(self) -> AwkwardBlockwiseLayer: layer = copy.copy(self) nb = layer.numblocks layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()} layer.__dict__.pop("_dims", None) - return layer, None + return layer def __getstate__(self) -> dict: - d = self.__dict__.copy() - try: - pickle.dumps(d["_meta"]) - except (ValueError, TypeError, KeyError): - d.pop( - "_meta", None - ) # must be a typetracer, does not pickle and not needed on scheduler - return d + # Indicator that this layer has been serialised + state = self.__dict__.copy() + state["has_been_unpickled"] = True + return state def __repr__(self) -> str: return "Awkward" + super().__repr__() +class ImplementsIOFunction(Protocol): + def __call__(self, *args, **kwargs) -> AwkwardArray: + ... + + +T = TypeVar("T") + + +class ImplementsMocking(Protocol): + def mock(self) -> AwkwardArray: + ... + + +class ImplementsProjection(ImplementsMocking, Protocol[T]): + def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: + ... + + def project(self, report: TypeTracerReport, state: T) -> ImplementsIOFunction: + ... + + +class ImplementsNecessaryColumns(ImplementsProjection[T], Protocol): + def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]: + ... 
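These protocols are purely structural: the `io_func_implements_*` helpers below probe with `hasattr` rather than `isinstance`, so any object with matching methods qualifies. As a minimal sketch (the class `RangeIOFunction` and its record layout are illustrative assumptions, not part of this patch), an IO function satisfying `ImplementsMocking` might look like:

    import awkward as ak
    from awkward import Array as AwkwardArray

    class RangeIOFunction:
        """Hypothetical IO function producing partitions of {"x": float} records."""

        def __init__(self, per_partition: int) -> None:
            self.per_partition = per_partition
            # Capture the form of the arrays this function produces.
            self.form = ak.Array([{"x": 0.0}]).layout.form

        def __call__(self, *args, **kwargs) -> AwkwardArray:
            # Real read: materialise one partition of data.
            return ak.Array([{"x": float(i)} for i in range(self.per_partition)])

        def mock(self) -> AwkwardArray:
            # Data-less typetracer array with the same form as the real output;
            # this is the method that `io_func_implements_mocking` probes for.
            return ak.typetracer.typetracer_from_form(self.form)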
+ + +class IOFunctionWithMocking(ImplementsMocking, ImplementsIOFunction): + def __init__(self, meta: AwkwardArray, io_func: ImplementsIOFunction): + self._meta = meta + self._io_func = io_func + + def __getstate__(self) -> dict: + state = self.__dict__.copy() + state["_meta"] = None + return state + + def __call__(self, *args, **kwargs) -> AwkwardArray: + return self._io_func(*args, **kwargs) + + def mock(self) -> AwkwardArray: + assert self._meta is not None + return self._meta + + +def io_func_implements_projection(func: ImplementsIOFunction) -> bool: + return hasattr(func, "prepare_for_projection") + + +def io_func_implements_mocking(func: ImplementsIOFunction) -> bool: + return hasattr(func, "mock") + + +def io_func_implements_columnar(func: ImplementsIOFunction) -> bool: + return hasattr(func, "necessary_columns") + + class AwkwardInputLayer(AwkwardBlockwiseLayer): """A layer known to perform IO and produce Awkward arrays @@ -55,26 +109,20 @@ def __init__( self, *, name: str, - columns: str | list[str] | None, inputs: Any, - io_func: Callable, - meta: Any, - behavior: dict | None, + io_func: ImplementsIOFunction, label: str | None = None, produces_tasks: bool = False, creation_info: dict | None = None, annotations: Mapping[str, Any] | None = None, ) -> None: self.name = name - self.columns = columns self.inputs = inputs self.io_func = io_func self.label = label self.produces_tasks = produces_tasks self.annotations = annotations self.creation_info = creation_info - self._meta = meta - self._behavior = behavior io_arg_map = BlockwiseDepDict( mapping=LazyInputsDict(self.inputs), # type: ignore @@ -93,111 +141,104 @@ def __init__( def __repr__(self) -> str: return f"AwkwardInputLayer<{self.output}>" - def mock(self) -> tuple[AwkwardInputLayer, TypeTracerReport]: - """Mock the input layer as starting with a dataless typetracer. + @property + def is_projectable(self) -> bool: + # isinstance(self.io_func, ImplementsProjection) + return ( + io_func_implements_projection(self.io_func) and not self.has_been_unpickled + ) + + @property + def is_mockable(self) -> bool: + # isinstance(self.io_func, ImplementsMocking) + return io_func_implements_mocking(self.io_func) + + @property + def is_columnar(self) -> bool: + return io_func_implements_columnar(self.io_func) + + def mock(self) -> AwkwardInputLayer: + assert self.is_mockable + return AwkwardInputLayer( + name=self.name, + inputs=[None][: int(list(self.numblocks.values())[0][0])], + io_func=lambda *_, **__: cast(ImplementsMocking, self.io_func).mock(), + label=self.label, + produces_tasks=self.produces_tasks, + creation_info=self.creation_info, + annotations=self.annotations, + ) + + def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T]: + """Mock the input layer as starting with a data-less typetracer. This method is used to create new dask task graphs that operate purely on typetracer Arrays (that is, array with awkward structure but without real data buffers). This allows us to test which parts of a real awkward array will be used in a real computation. We do this by running a graph which starts - with mocked AwkwardInputLayers + with mocked AwkwardInputLayers. We mock an AwkwardInputLayer in these steps: - - 1. Copy the original ``_meta`` form. - 2. Create a new typetracer array from that form. - 3. Take the form from the new typetracer array. - 4. Label the components of the new form. - 5. Pass the new labelled form to the typetracer_with_report - function from upstream awkward. 
This creates a report - object that tells us which buffers in a form get used. - 6. Create a new typetracer array that represents an array that - would come from a real input layer, and make that the - result of the input layer. - 7. Return the new layer (which only results in a typetracer - array) along with the mutable report object. + 1. Ask the IO function to prepare a new meta array, and return + any transient state. + 2. Build a new AwkwardInputLayer whose IO function just returns + this meta (typetracer) array. + 3. Return the new input layer and the transient state. When this new layer is added to a dask task graph and that graph is computed, the report object will be mutated. Inspecting the report object after the compute tells us which buffers from the original form would be required for a real compute with the same graph. - Returns ------- AwkwardInputLayer Copy of the input layer with data-less input. TypeTracerReport - The mutable report object that is updated upon computation - of a graph starting with the new AwkwardInputLayer. - + The report object used to track touched buffers. + Any + The black-box state object returned by the IO function. """ - import awkward as ak - - from dask_awkward.lib._utils import set_form_keys - - starting_form = copy.deepcopy(self._meta.layout.form) - set_form_keys(starting_form, key=self.name) - - new_meta_array, report = ak.typetracer.typetracer_with_report( - starting_form, highlevel=True, behavior=self._behavior - ) + assert self.is_projectable + new_meta_array, report, state = cast( + ImplementsProjection, self.io_func + ).prepare_for_projection() new_input_layer = AwkwardInputLayer( name=self.name, - columns=self.columns, inputs=[None][: int(list(self.numblocks.values())[0][0])], io_func=lambda *_, **__: new_meta_array, label=self.label, produces_tasks=self.produces_tasks, creation_info=self.creation_info, annotations=self.annotations, - meta=new_meta_array, - behavior=self._behavior, ) - return new_input_layer, report - - def project_columns(self, columns: list[str]) -> AwkwardInputLayer: - if hasattr(self.io_func, "project_columns"): - # TODO: make project_columns call sites never pass in an - # empty list. - if len(columns) == 0: - columns = self._meta.fields[:1] - - # original form - original_form = self._meta.layout.form - - # original columns before column projection - original_form_columns = original_form.columns() - - # make sure that the requested columns match the order of - # the original columns; tack on "new" columns that are - # likely the wildcard columns.
- original = [c for c in original_form_columns if c in columns] - new = [c for c in columns if c not in original_form_columns] - columns = original + new - - try: - io_func = self.io_func.project_columns( - columns, - original_form=original_form, - ) - except TypeError: - io_func = self.io_func.project_columns(columns) - return AwkwardInputLayer( - name=self.name, - columns=columns, - inputs=self.inputs, - io_func=io_func, - label=self.label, - produces_tasks=self.produces_tasks, - creation_info=self.creation_info, - annotations=self.annotations, - meta=self._meta, - behavior=self._behavior, - ) - return self + return new_input_layer, report, state + + def project( + self, + report: TypeTracerReport, + state: T, + ): + assert self.is_projectable + return AwkwardInputLayer( + name=self.name, + inputs=self.inputs, + io_func=cast(ImplementsProjection, self.io_func).project( + report=report, state=state + ), + label=self.label, + produces_tasks=self.produces_tasks, + creation_info=self.creation_info, + annotations=self.annotations, + ) + + def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]: + assert self.is_columnar + return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns( + report=report, state=state + ) class AwkwardMaterializedLayer(MaterializedLayer): @@ -213,11 +254,11 @@ def __init__( self.fn = fn super().__init__(mapping, **kwargs) - def mock(self) -> tuple[MaterializedLayer, Any | None]: + def mock(self) -> MaterializedLayer: mapping = copy.copy(self.mapping) if not mapping: # no partitions at all - return self, None + return self name = next(iter(mapping))[0] # one previous layer name @@ -240,8 +281,8 @@ def mock(self) -> tuple[MaterializedLayer, Any | None]: # just want the first partition. if len(task) == 2 and task[1] > 0: task = (task[0], 0) - return MaterializedLayer({(name, 0): task}), None - return self, None + return MaterializedLayer({(name, 0): task}) + return self # more than one previous_layer_names # @@ -254,21 +295,18 @@ def mock(self) -> tuple[MaterializedLayer, Any | None]: ) name0s = tuple((name, 0) for name in self.previous_layer_names) task = (self.fn, *name0s) - return MaterializedLayer({(name, 0): task}), None + return MaterializedLayer({(name, 0): task}) class AwkwardTreeReductionLayer(DataFrameTreeReduction): - def mock(self) -> tuple[AwkwardTreeReductionLayer, Any | None]: - return ( - AwkwardTreeReductionLayer( - name=self.name, - name_input=self.name_input, - npartitions_input=1, - concat_func=self.concat_func, - tree_node_func=self.tree_node_func, - finalize_func=self.finalize_func, - split_every=self.split_every, - tree_node_name=self.tree_node_name, - ), - None, + def mock(self) -> AwkwardTreeReductionLayer: + return AwkwardTreeReductionLayer( + name=self.name, + name_input=self.name_input, + npartitions_input=1, + concat_func=self.concat_func, + tree_node_func=self.tree_node_func, + finalize_func=self.finalize_func, + split_every=self.split_every, + tree_node_name=self.tree_node_name, ) diff --git a/src/dask_awkward/lib/__init__.py b/src/dask_awkward/lib/__init__.py index 7a51116b..7554af91 100644 --- a/src/dask_awkward/lib/__init__.py +++ b/src/dask_awkward/lib/__init__.py @@ -1,4 +1,5 @@ import dask_awkward.lib.str as str +import dask_awkward.lib.utils as utils from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type from dask_awkward.lib.core import ( @@ -7,7 +8,11 @@ partition_compatibility, ) from dask_awkward.lib.describe import 
fields -from dask_awkward.lib.inspect import necessary_columns, sample +from dask_awkward.lib.inspect import ( + report_necessary_buffers, + report_necessary_columns, + sample, +) from dask_awkward.lib.io.io import ( ImplementsFormTransformation, from_awkward, diff --git a/src/dask_awkward/lib/_utils.py b/src/dask_awkward/lib/_utils.py deleted file mode 100644 index 6b6dd533..00000000 --- a/src/dask_awkward/lib/_utils.py +++ /dev/null @@ -1,58 +0,0 @@ -from __future__ import annotations - -from typing import Final - -from awkward.forms.form import Form - -LIST_KEY: Final = "__list__" - - -def set_form_keys(form: Form, *, key: str) -> Form: - """Recursive function to apply key labels to `form`. - - Parameters - ---------- - form : awkward.forms.form.Form - Awkward Array form object to mutate. - key : str - Label to apply. If recursion is triggered by passing in a - Record Form, the key is used as a prefix for a specific - field. - - Returns - ------- - awkward.forms.form.Form - Mutated Form object. - - """ - - # If the form is a record we need to loop over all fields in the - # record and set form that include the field name; this will keep - # recursing as well. - if form.is_record: - for field in form.fields: - full_key = f"{key}.{field}" - set_form_keys(form.content(field), key=full_key) - - # If the form is a list (e.g. ListOffsetArray) we append a - # __list__ suffix to notify the optimization pass that we only - # touched the offsets and not the data buffer for this kind of - # identified form; keep recursing - elif form.is_list: - form.form_key = f"{key}.{LIST_KEY}" - set_form_keys(form.content, key=key) - - # NumPy like array is easy - elif form.is_numpy: - form.form_key = key - - elif form.is_union: - for entry in form.contents: - set_form_keys(entry, key=key) - - # Anything else grab the content and keep recursing - else: - set_form_keys(form.content, key=key) - - # Return the now mutated Form object. - return form diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 90c97f60..93fcbf7d 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -493,12 +493,6 @@ def __init__( divisions: tuple[int | None, ...], ) -> None: self._dask: HighLevelGraph = dsk - if hasattr(dsk, "layers"): - # i.e., NOT matrializes/persisted state - # output typetracer - lay = dsk.layers[dsk._toposort_layers()[-1]] - if isinstance(lay, AwkwardBlockwiseLayer): - lay._meta = meta # type: ignore self._name: str = name self._divisions: tuple[int | None, ...] = divisions self._meta: ak.Array = meta diff --git a/src/dask_awkward/lib/inspect.py b/src/dask_awkward/lib/inspect.py index 280d9162..030a5012 100644 --- a/src/dask_awkward/lib/inspect.py +++ b/src/dask_awkward/lib/inspect.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, NamedTuple import numpy as np from dask.base import unpack_collections @@ -9,12 +9,20 @@ from dask_awkward.layers import AwkwardInputLayer if TYPE_CHECKING: + from awkward.highlevel import Array as AwkArray + from dask_awkward.lib.core import Array -def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]]: - r"""Determine the columns necessary to compute a collection. 
+class NecessaryBuffers(NamedTuple): + data_and_shape: frozenset[str] + shape_only: frozenset[str] + +def report_necessary_buffers( + *args: Any, traverse: bool = True +) -> dict[str, NecessaryBuffers | None]: + r"""Determine the buffer keys necessary to compute a collection. Parameters ---------- *args : Dask collections or HighLevelGraphs @@ -23,31 +31,24 @@ def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]] traverse : bool, optional If True (default), builtin Python collections are traversed looking for any Dask collections they might contain. - Returns ------- - dict[str, list[str]] - Mapping that pairs the input layers in the graph to the - columns that have been determined necessary from that layer. - These are not necessarily in the same order as the original - input. - + dict[str, NecessaryBuffers | None] + Mapping that pairs the input layers in the graph to objects + describing the data and shape buffers that have been tagged + as required by column optimisation of the given layer. Examples -------- If we have a hypothetical parquet dataset (``ds``) with the fields - - "foo" - "bar" - "baz" - And the "baz" field has fields - "x" - "y" - The calculation of ``ds.bar + ds.baz.x`` will only require the ``bar`` and ``baz.x`` columns from the parquet file. - >>> import dask_awkward as dak >>> ds = dak.from_parquet("some-dataset") >>> ds.fields @@ -55,11 +56,12 @@ def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]] >>> ds.baz.fields ["x", "y"] >>> x = ds.bar + ds.baz.x - >>> dak.necessary_columns(x) - {"from-parquet-abc123": ["bar", "baz.x"]} - - Notice that ``foo`` and ``baz.y`` are not determined to be - necessary. + >>> dak.report_necessary_buffers(x) + { + "from-parquet-abc123": NecessaryBuffers( + data_and_shape=frozenset(...), shape_only=frozenset(...) 
+ ) + } """ import dask_awkward.lib.optimize as o @@ -68,23 +70,149 @@ def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]] if not collections: return {} - out: dict[str, list[str]] = {} + seen_names = set() + + name_to_necessary_buffers: dict[str, NecessaryBuffers | None] = {} for obj in collections: dsk = obj if isinstance(obj, HighLevelGraph) else obj.dask - cols_this_dsk = o._necessary_columns(dsk) + projection_data = o._prepare_buffer_projection(dsk) + + # If the projection failed, or there are no input layers + if projection_data is None: + # Ensure that we have a record of the seen layers, if they're inputs + for name, layer in dsk.items(): + if isinstance(layer, AwkwardInputLayer): + seen_names.add(name) + continue + + # Unpack projection information + layer_to_reports, _ = projection_data + for name, report in layer_to_reports.items(): + existing_buffers = name_to_necessary_buffers.setdefault( + name, NecessaryBuffers(frozenset(), frozenset()) + ) + # Compute the shape-only keys in addition to the data and shape + data_and_shape = frozenset(report.data_touched) + shape_only = frozenset(report.shape_touched) - data_and_shape + + # Update set of touched keys + name_to_necessary_buffers[name] = NecessaryBuffers( + data_and_shape=existing_buffers.data_and_shape | data_and_shape, + shape_only=existing_buffers.shape_only | shape_only, + ) + + # Populate result with names of seen layers + for k in seen_names: + name_to_necessary_buffers.setdefault(k, None) + return name_to_necessary_buffers + + +def report_necessary_columns( + *args: Any, traverse: bool = True +) -> dict[str, frozenset[str] | None]: + r"""Determine the columns necessary to compute a collection built from + a columnar source. + + Parameters + ---------- + *args : Dask collections or HighLevelGraphs + The collection (or collection graph) of interest. These can be + individual objects, lists, sets, or dictionaries. + traverse : bool, optional + If True (default), builtin Python collections are traversed + looking for any Dask collections they might contain. + Returns + ------- + dict[str, frozenset[str] | None] + Mapping that pairs the input layers in the graph to the + set of necessary IO columns that have been identified by column + optimisation of the given layer. If the layer is not backed by a + columnar source, then None is returned instead of a set. + Examples + -------- + If we have a hypothetical parquet dataset (``ds``) with the fields + - "foo" + - "bar" + - "baz" + And the "baz" field has fields - for name in cols_this_dsk: - neccols = cols_this_dsk[name] - if not isinstance(dsk.layers[name], AwkwardInputLayer): - raise TypeError(f"Layer {name} should be an AwkwardInputLayer.") - cols_this_dsk[name] = o._prune_wildcards(neccols, dsk.layers[name]._meta) + - "x" + - "y" + The calculation of ``ds.bar + ds.baz.x`` will only require the + ``bar`` and ``baz.x`` columns from the parquet file. 
+ >>> import dask_awkward as dak + >>> ds = dak.from_parquet("some-dataset") + >>> ds.fields + ["foo", "bar", "baz"] + >>> ds.baz.fields + ["x", "y"] + >>> x = ds.bar + ds.baz.x + >>> dak.report_necessary_columns(x) + { + "from-parquet-abc123": frozenset({"bar", "baz.x"}) + } + + """ + import dask_awkward.lib.optimize as o - for key, cols in cols_this_dsk.items(): - prev = out.get(key, []) - update = list(set(prev + cols)) - out[key] = update + collections, _ = unpack_collections(*args, traverse=traverse) + if not collections: + return {} - return out + seen_names = set() + + name_to_necessary_columns: dict[str, frozenset] = {} + for obj in collections: + dsk = obj if isinstance(obj, HighLevelGraph) else obj.dask + projection_data = o._prepare_buffer_projection(dsk) + + # If the projection failed, or there are no input layers + if projection_data is None: + # Ensure that we have a record of the seen layers, if they're inputs + for name, layer in dsk.items(): + if isinstance(layer, AwkwardInputLayer): + seen_names.add(name) + continue + + # Unpack projection information + layer_to_reports, layer_to_projection_state = projection_data + for name, report in layer_to_reports.items(): + layer = dsk.layers[name] + if not (isinstance(layer, AwkwardInputLayer) and layer.is_columnar): + continue + + existing_columns = name_to_necessary_columns.setdefault(name, frozenset()) + + # Update set of touched keys + name_to_necessary_columns[ + name + ] = existing_columns | layer.necessary_columns( + report=report, state=layer_to_projection_state[name] + ) + + # Populate result with names of seen layers + for k in seen_names: + name_to_necessary_columns.setdefault(k, None) + return name_to_necessary_columns + + +def _random_boolean_like(array_like: AwkArray, probability: float) -> AwkArray: + import awkward as ak + + backend = ak.backend(array_like) + layout = ak.to_layout(array_like) + + if ak.backend(array_like) == "typetracer": + return ak.Array( + ak.to_layout(np.empty(0, dtype=np.bool_)).to_typetracer(forget_length=True), + behavior=array_like.behavior, + ) + else: + return ak.Array( + np.random.random(layout.length) < probability, + behavior=array_like.behavior, + backend=backend, + ) def sample( @@ -115,6 +243,5 @@ def sample( return arr.map_partitions(lambda x: x[::factor], meta=arr._meta) else: return arr.map_partitions( - lambda x: x[np.random.random(len(x)) < probability], # type: ignore - meta=arr._meta, + lambda x: x[_random_boolean_like(x, probability)], meta=arr._meta ) diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py new file mode 100644 index 00000000..88b8bb02 --- /dev/null +++ b/src/dask_awkward/lib/io/columnar.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Protocol, TypeVar, cast + +import awkward as ak +from awkward import Array as AwkwardArray +from awkward.forms import Form + +from dask_awkward.layers.layers import ImplementsNecessaryColumns +from dask_awkward.lib.utils import ( + METADATA_ATTRIBUTES, + FormStructure, + buffer_keys_required_to_compute_shapes, + form_with_unique_keys, + parse_buffer_key, + render_buffer_key, + trace_form_structure, + walk_graph_depth_first, +) + +if TYPE_CHECKING: + from awkward._nplikes.typetracer import TypeTracerReport + +log = logging.getLogger(__name__) + +T = TypeVar("T") + + +class ImplementsColumnProjectionMixin(ImplementsNecessaryColumns, Protocol): + @property + def form(self) -> Form: + ... 
+ + @property + def behavior(self) -> dict | None: + ... + + def project_columns(self: T, columns: frozenset[str]) -> T: + ... + + +S = TypeVar("S", bound=ImplementsColumnProjectionMixin) + + +class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]): + """A mixin to add column-centric buffer projection to an IO function. + + Classes that inherit from this mixin are assumed to be able to read at the + granularity of _fields_ in a form. As such, the buffer projection is performed + such that the smallest possible number of fields (columns) are read, even + when only metadata buffers are required. + """ + + def mock(self: S) -> AwkwardArray: + return ak.typetracer.typetracer_from_form(self.form, behavior=self.behavior) + + def prepare_for_projection( + self: S, + ) -> tuple[AwkwardArray, TypeTracerReport, FormStructure]: + form = form_with_unique_keys(self.form, "@") + + # Build typetracer and associated report object + (meta, report) = ak.typetracer.typetracer_with_report( + form, + highlevel=True, + behavior=self.behavior, + buffer_key=render_buffer_key, + ) + + return ( + cast(AwkwardArray, meta), + report, + trace_form_structure(form, buffer_key=render_buffer_key), + ) + + def necessary_columns( + self: S, + report: TypeTracerReport, + state: FormStructure, + ) -> frozenset[str]: + ## Read from stash + # Form hierarchy information + form_key_to_parent_form_key = state["form_key_to_parent_form_key"] + form_key_to_child_form_keys: dict[str, list[str]] = {} + for child_key, parent_key in form_key_to_parent_form_key.items(): + form_key_to_child_form_keys.setdefault(parent_key, []).append(child_key) + form_key_to_form = state["form_key_to_form"] + # Buffer hierarchy information + form_key_to_buffer_keys = state["form_key_to_buffer_keys"] + # Column hierarchy information + form_key_to_path = state["form_key_to_path"] + + # Require the data of metadata buffers above shape-only requests + data_buffers = { + *report.data_touched, + *buffer_keys_required_to_compute_shapes( + parse_buffer_key, + report.shape_touched, + form_key_to_parent_form_key, + form_key_to_buffer_keys, + ), + } + + # We can't read buffers directly, but if we encounter a metadata + # buffer, then we should be able to pick any child. + paths = set() + wildcard_form_key = set() + for buffer_key in data_buffers: + form_key, attribute = parse_buffer_key(buffer_key) + if attribute in METADATA_ATTRIBUTES: + wildcard_form_key.add(form_key) + else: + paths.add(form_key_to_path[form_key]) + + # Select the most appropriate column for each wildcard + for form_key in wildcard_form_key: + # Find (DFS) any non-empty record form in any child + recursive_child_forms = ( + form_key_to_form[k] + for k in walk_graph_depth_first(form_key, form_key_to_child_form_keys) + ) + record_form_keys_with_contents = ( + f.form_key + for f in recursive_child_forms + if isinstance(f, ak.forms.RecordForm) and f.contents + ) + # Now find the deepest of such records + try: + last_record_form_key = next(record_form_keys_with_contents) + except StopIteration: + # This is a leaf! Therefore, we read this column + paths.add(form_key_to_path[form_key]) + continue + else: + # Ensure we get the "actual" last form key + for last_record_form_key in record_form_keys_with_contents: + ... 
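+ # NB: the bare `...` loop body above merely exhausts the generator, + # leaving `last_record_form_key` bound to the deepest record form key.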
+ + # First see if any child is already included + for any_child_form_key in form_key_to_child_form_keys[last_record_form_key]: + any_child_path = form_key_to_path[any_child_form_key] + if any_child_path in paths: + break + # Otherwise, add the last child + else: + paths.add(any_child_path) + return frozenset({".".join(p) for p in paths if p}) + + def project( + self: S, + report: TypeTracerReport, + state: FormStructure, + ) -> S: + if not self.use_optimization: + return self + + return self.project_columns(self.necessary_columns(report, state)) diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index baa7f655..ef6eb404 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -1,9 +1,10 @@ from __future__ import annotations import math +import warnings from collections.abc import Iterable from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Mapping, Protocol +from typing import TYPE_CHECKING, Any, Callable, Mapping, Protocol, cast import awkward as ak import numpy as np @@ -13,8 +14,17 @@ from dask.utils import funcname, is_integer, parse_bytes from fsspec.utils import infer_compression -from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer -from dask_awkward.layers.layers import AwkwardMaterializedLayer +from dask_awkward.layers import ( + AwkwardBlockwiseLayer, + AwkwardInputLayer, + ImplementsIOFunction, +) +from dask_awkward.layers.layers import ( + AwkwardMaterializedLayer, + ImplementsMocking, + IOFunctionWithMocking, + io_func_implements_mocking, +) from dask_awkward.lib.core import ( empty_typetracer, map_partitions, @@ -153,6 +163,7 @@ def from_lists(source: list, behavior: dict | None = None) -> Array: lists, meta=typetracer_array(ak.Array(lists[0])), divisions=divs, + behavior=behavior, label="from-lists", ) @@ -458,7 +469,7 @@ def __call__(self, packed_arg): def from_map( - func: Callable, + func: ImplementsIOFunction, *iterables: Iterable, args: tuple[Any, ...] | None = None, label: str | None = None, @@ -547,6 +558,8 @@ def from_map( name = f"{label}-{token}" # Define io_func + + # FIXME: projection etc. if packed or args or kwargs: func = PackedArgCallable( func, @@ -555,24 +568,32 @@ def from_map( packed=packed, ) - dsk = AwkwardInputLayer( - name=name, - columns=None, - inputs=inputs, - io_func=func, - meta=meta, - behavior=behavior, - ) + # Special `io_func` implementations can implement mocking and optionally + # support buffer projection. + if io_func_implements_mocking(func): + io_func = func + array_meta = cast(ImplementsMocking, func).mock() + # If we know the meta, we can spoof mocking + elif meta is not None: + io_func = IOFunctionWithMocking(meta, func) + array_meta = meta + # Without `meta`, the meta will be computed by executing the graph + else: + io_func = func + array_meta = None + + dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func) + + if behavior is not None: + warnings.warn( + "The `behavior` argument is deprecated for `from_map`, and consequently ignored." 
+ ) hlg = HighLevelGraph.from_collections(name, dsk) if divisions is not None: - result = new_array_object( - hlg, name, meta=meta, behavior=dsk._behavior, divisions=divisions - ) + result = new_array_object(hlg, name, meta=array_meta, divisions=divisions) else: - result = new_array_object( - hlg, name, meta=meta, behavior=dsk._behavior, npartitions=len(inputs) - ) + result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs)) return result diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py index 65f8ade4..fcb0a9ec 100644 --- a/src/dask_awkward/lib/io/json.py +++ b/src/dask_awkward/lib/io/json.py @@ -3,7 +3,7 @@ import abc import logging import math -from collections.abc import Callable, Sequence +from collections.abc import Callable from typing import TYPE_CHECKING, Any, Literal, overload import awkward as ak @@ -18,6 +18,7 @@ from fsspec.utils import infer_compression, read_block from dask_awkward.lib.core import map_partitions, new_scalar_object, typetracer_array +from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import ( _bytes_with_sample, _BytesReadingInstructions, @@ -27,30 +28,21 @@ if TYPE_CHECKING: from awkward.contents.content import Content from fsspec.spec import AbstractFileSystem - from typing_extensions import Self from dask_awkward.lib.core import Array, Scalar - log = logging.getLogger(__name__) -def _use_optimization() -> bool: - return "json" in dask.config.get( - "awkward.optimization.columns-opt-formats", - default=[], - ) - - -class FromJsonFn: +class FromJsonFn(ColumnProjectionMixin): def __init__( self, *, storage: AbstractFileSystem, + form: Form, compression: str | None = None, schema: str | dict | list | None = None, - form: Form | None = None, - original_form: Form | None = None, + behavior: dict | None = None, **kwargs: Any, ) -> None: self.compression = compression @@ -58,35 +50,36 @@ def __init__( self.schema = schema self.kwargs = kwargs self.form = form - self.original_form = original_form + self.behavior = behavior @abc.abstractmethod def __call__(self, source: Any) -> ak.Array: ... 
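A reader built on `ColumnProjectionMixin` only has to expose `form`, `behavior`, `use_optimization`, and `project_columns`; the mixin derives `mock`, `prepare_for_projection`, `necessary_columns`, and `project` from those. A minimal sketch of such a reader over flat records (the class `ListOfDictsFn` is an illustrative assumption, not part of this patch):

    import awkward as ak
    from awkward.forms import Form

    from dask_awkward.lib.io.columnar import ColumnProjectionMixin

    class ListOfDictsFn(ColumnProjectionMixin):
        """Hypothetical columnar reader over in-memory lists of flat dicts."""

        def __init__(self, form: Form, behavior: dict | None = None) -> None:
            self.form = form
            self.behavior = behavior

        @property
        def use_optimization(self) -> bool:
            # Always opt in; the JSON and Parquet readers consult dask config here.
            return True

        def __call__(self, rows: list[dict]) -> ak.Array:
            # Read only the (possibly projected) top-level fields.
            keep = set(self.form.columns())
            return ak.Array([{k: row[k] for k in keep} for row in rows])

        def project_columns(self, columns: frozenset[str]) -> "ListOfDictsFn":
            return type(self)(self.form.select_columns(columns), self.behavior)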
- def _default_project_columns( - self, - columns: Sequence[str], - original_form: Form | None = None, - ) -> Self: - if self.schema is not None: - return self - - if self.form is not None: - form = self.form.select_columns(columns) - assert form is not None - schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) - - return type(self)( - schema=schema, - form=form, - storage=self.storage, - compression=self.compression, - original_form=original_form, - **self.kwargs, + @property + def use_optimization(self) -> bool: + return ( + "json" + in dask.config.get( + "awkward.optimization.columns-opt-formats", + default=[], ) + and self.schema is None + ) + + def project_columns(self, columns: set[str]): + form = self.form.select_columns(columns) + assert form is not None + schema = layout_to_jsonschema(form.length_zero_array(highlevel=False)) - return self + return type(self)( + schema=schema, + form=self.form, + storage=self.storage, + compression=self.compression, + behavior=self.behavior, + **self.kwargs, + ) class FromJsonLineDelimitedFn(FromJsonFn): @@ -94,10 +87,10 @@ def __init__( self, *, storage: AbstractFileSystem, + form: Form, compression: str | None = None, schema: str | dict | list | None = None, - form: Form | None = None, - original_form: Form | None = None, + behavior: dict | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -105,7 +98,7 @@ def __init__( compression=compression, schema=schema, form=form, - original_form=original_form, + behavior=behavior, **kwargs, ) @@ -122,29 +115,16 @@ def __call__(self, source: str) -> ak.Array: return array # return ak.Array(unproject_layout(self.original_form, array.layout)) - def project_columns( - self, - columns: Sequence[str], - original_form: Form | None = None, - ) -> Self: - if not _use_optimization(): - return self - - return self._default_project_columns( - columns=columns, - original_form=original_form, - ) - class FromJsonSingleObjPerFile(FromJsonFn): def __init__( self, *, storage: AbstractFileSystem, + form: Form, compression: str | None = None, schema: str | dict | list | None = None, - form: Form | None = None, - original_form: Form | None = None, + behavior: dict | None = None, **kwargs: Any, ) -> None: super().__init__( @@ -152,7 +132,7 @@ def __init__( compression=compression, schema=schema, form=form, - original_form=original_form, + behavior=behavior, **kwargs, ) @@ -173,37 +153,24 @@ def __call__(self, source: str) -> ak.Array: return array # return ak.Array(unproject_layout(self.original_form, array.layout)) - def project_columns( - self, - columns: Sequence[str], - original_form: Form | None = None, - ) -> Self: - if not _use_optimization(): - return self - - return self._default_project_columns( - columns=columns, - original_form=original_form, - ) - class FromJsonBytesFn(FromJsonFn): def __init__( self, *, storage: AbstractFileSystem, + form: Form, compression: str | None = None, schema: str | dict | list | None = None, - form: Form | None = None, - original_form: Form | None = None, + behavior: dict | None = None, **kwargs: Any, ) -> None: super().__init__( storage=storage, compression=compression, schema=schema, + behavior=behavior, form=form, - original_form=original_form, **kwargs, ) @@ -232,19 +199,6 @@ def __call__(self, instructions: _BytesReadingInstructions) -> ak.Array: return array # return ak.Array(unproject_layout(self.original_form, array.layout)) - def project_columns( - self, - columns: Sequence[str], - original_form: Form | None = None, - ) -> Self: - if not 
_use_optimization(): - return self - - return self._default_project_columns( - columns=columns, - original_form=original_form, - ) - def meta_from_single_file( *, diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index 8b840c54..df03e42c 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -5,8 +5,7 @@ import logging import math import operator -from collections.abc import Sequence -from typing import Any, Literal +from typing import TYPE_CHECKING, Any, Literal, TypeVar import awkward as ak import awkward.operations.ak_from_parquet as ak_from_parquet @@ -18,27 +17,21 @@ from fsspec import AbstractFileSystem from fsspec.core import get_fs_token_paths, url_to_fs -from dask_awkward.lib.core import ( - Array, - Scalar, - map_partitions, - new_scalar_object, - typetracer_array, -) +from dask_awkward.lib.core import Array, Scalar, map_partitions, new_scalar_object +from dask_awkward.lib.io.columnar import ColumnProjectionMixin from dask_awkward.lib.io.io import from_map from dask_awkward.lib.unproject_layout import unproject_layout +if TYPE_CHECKING: + pass + log = logging.getLogger(__name__) -def _use_optimization() -> bool: - return "parquet" in dask.config.get( - "awkward.optimization.columns-opt-formats", - default=[], - ) +T = TypeVar("T") -class _FromParquetFn: +class _FromParquetFn(ColumnProjectionMixin): def __init__( self, *, @@ -66,13 +59,16 @@ def __call__(self, source: Any) -> ak.Array: ... @abc.abstractmethod - def project_columns( - self, - columns: Sequence[str] | None, - orignal_form: Form | None = None, - ) -> _FromParquetFn: + def project_columns(self, columns: set[str]): ... + @property + def use_optimization(self) -> bool: + return "parquet" in dask.config.get( + "awkward.optimization.columns-opt-formats", + default=[], + ) + def __repr__(self) -> str: s = ( "\nFromParquetFn(\n" @@ -126,28 +122,16 @@ def __call__(self, source: Any) -> Any: ) return ak.Array(unproject_layout(self.original_form, array.layout)) - def project_columns( - self, - columns: Sequence[str] | None, - original_form: Form | None = None, - ) -> _FromParquetFileWiseFn: - if not _use_optimization(): - return self - - if columns is None: - return self - - new_form = self.form.select_columns(columns) - new = _FromParquetFileWiseFn( + def project_columns(self, columns: set[str]): + return _FromParquetFileWiseFn( fs=self.fs, - form=new_form, + form=self.form.select_columns(columns), listsep=self.listsep, unnamed_root=self.unnamed_root, - original_form=original_form, + original_form=self.form, behavior=self.behavior, **self.kwargs, ) - return new class _FromParquetFragmentWiseFn(_FromParquetFn): @@ -188,22 +172,12 @@ def __call__(self, pair: Any) -> ak.Array: ) return ak.Array(unproject_layout(self.original_form, array.layout)) - def project_columns( - self, - columns: Sequence[str] | None, - original_form: Form | None = None, - ) -> _FromParquetFragmentWiseFn: - if not _use_optimization(): - return self - - if columns is None: - return self - + def project_columns(self, columns: set[str]): return _FromParquetFragmentWiseFn( fs=self.fs, form=self.form.select_columns(columns), unnamed_root=self.unnamed_root, - original_form=original_form, + original_form=self.form, behavior=self.behavior, **self.kwargs, ) @@ -319,8 +293,6 @@ def from_parquet( if split_row_groups is None: split_row_groups = row_counts is not None and len(row_counts) > 1 - meta = ak.typetracer.typetracer_from_form(subform, behavior=behavior) - if split_row_groups is False or 
subrg is None: # file-wise return from_map( @@ -338,7 +310,6 @@ def from_parquet( actual_paths, label=label, token=token, - meta=meta, ) else: # row-group wise @@ -378,7 +349,6 @@ def from_parquet( label=label, token=token, divisions=tuple(divisions), - meta=typetracer_array(meta), ) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index f52067f6..24656899 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -3,8 +3,8 @@ import copy import logging import warnings -from collections.abc import Hashable, Mapping -from typing import TYPE_CHECKING, Any +from collections.abc import Hashable, Iterable, Mapping +from typing import TYPE_CHECKING, Any, cast import dask.config from awkward.typetracer import touch_data @@ -14,14 +14,11 @@ from dask.local import get_sync from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer -from dask_awkward.lib._utils import LIST_KEY - -log = logging.getLogger(__name__) - if TYPE_CHECKING: - from awkward import Array as AwkwardArray + from awkward._nplikes.typetracer import TypeTracerReport +log = logging.getLogger(__name__) COLUMN_OPT_FAILED_WARNING_MSG = """The necessary columns optimization failed; exception raised: @@ -87,6 +84,88 @@ def optimize( return dsk +def _prepare_buffer_projection( + dsk: HighLevelGraph, +) -> tuple[dict[str, TypeTracerReport], dict[str, Any]] | None: + """Pair layer names with their typetracer reports and projection states.""" + import awkward as ak + + if not _has_projectable_awkward_io_layer(dsk): + return None + + layer_to_projection_state: dict[str, Any] = {} + layer_to_reports: dict[str, TypeTracerReport] = {} + projection_layers = dict(dsk.layers) + + for name, lay in dsk.layers.items(): + if isinstance(lay, AwkwardInputLayer): + if lay.is_projectable: + # Insert mocked array into layers, replacing generation func + # Keep track of mocked state + ( + projection_layers[name], + layer_to_reports[name], + layer_to_projection_state[name], + ) = lay.prepare_for_projection() + elif lay.is_mockable: + projection_layers[name] = lay.mock() + elif hasattr(lay, "mock"): + projection_layers[name] = lay.mock() + + for name in _ak_output_layer_names(dsk): + projection_layers[name] = _mock_output(projection_layers[name]) + + for name in _opt_touch_all_layer_names(dsk): + projection_layers[name] = _touch_and_call(projection_layers[name]) + + hlg = HighLevelGraph(projection_layers, dsk.dependencies) + + # this loop builds up the possible final leaf nodes by + # inspecting the dependents dictionary. If something does not have + # a dependent, it must be the end of a graph. These are the things + # we need to compute for; we only use a single partition (the + # first). For a single collection `.compute()` this list will just + # be length 1; but if we are using `dask.compute` to pass in + # multiple collections to be computed simultaneously, this list + # will increase in length. + leaf_layers_keys = [ + (k, 0) for k, v in dsk.dependents.items() if isinstance(v, set) and len(v) == 0 + ] + + # now we try to compute for each possible output layer key (leaf + # node on partition 0); this will cause the typetracer reports to + # get correct fields/columns touched. If the result is a record or + # an array we of course want to touch all of the data/fields.
+ try: + for layer in hlg.layers.values(): + layer.__dict__.pop("_cached_dict", None) + results = get_sync(hlg, leaf_layers_keys) + for out in results: + if isinstance(out, (ak.Array, ak.Record)): + touch_data(out) + except Exception as err: + on_fail = dask.config.get("awkward.optimization.on-fail") + # this is the default, throw a warning but skip the optimization. + if on_fail == "warn": + warnings.warn( + COLUMN_OPT_FAILED_WARNING_MSG.format(exception=type(err), message=err) + ) + # option "pass" means do not throw warning but skip the optimization. + elif on_fail == "pass": + log.debug("Column projection optimization failed; optimization skipped.") + # option "raise" to raise the exception here + elif on_fail == "raise": + raise + else: + raise ValueError( + f"Invalid awkward.optimization.on-fail option: {on_fail}.\n" + "Valid options are 'warn', 'pass', or 'raise'." + ) + return None + else: + return layer_to_reports, layer_to_projection_state + + def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph: """Run column projection optimization. @@ -116,17 +195,91 @@ def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph: New, optimized task graph with column-projected ``AwkwardInputLayer``. """ - layers = dsk.layers.copy() # type: ignore - deps = dsk.dependencies.copy() # type: ignore + projection_data = _prepare_buffer_projection(dsk) + if projection_data is None: + return dsk + + # Unpack result + layer_to_reports, layer_to_projection_state = projection_data + + # Project layers using projection state + layers = dict(dsk.layers) + for name, state in layer_to_projection_state.items(): + layers[name] = cast(AwkwardInputLayer, layers[name]).project( + report=layer_to_reports[name], state=state + ) - layer_to_necessary_columns = _necessary_columns(dsk) + return HighLevelGraph(layers, dsk.dependencies) - for name, neccols in layer_to_necessary_columns.items(): - meta = layers[name]._meta - neccols = _prune_wildcards(neccols, meta) - layers[name] = layers[name].project_columns(neccols) - return HighLevelGraph(layers, deps) +def _layers_with_annotation(dsk: HighLevelGraph, key: str) -> list[str]: + return [n for n, v in dsk.layers.items() if (v.annotations or {}).get(key)] + + +def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]: + """Get a list of output layer names. + + Output layer names are annotated with 'ak_output'. + + Parameters + ---------- + dsk : HighLevelGraph + Graph of interest. + + Returns + ------- + list[str] + Names of the output layers.
+ + """ + return _layers_with_annotation(dsk, "ak_output") + + +def _opt_touch_all_layer_names(dsk: HighLevelGraph) -> list[str]: + return [n for n, v in dsk.layers.items() if hasattr(v, "_opt_touch_all")] + # return _layers_with_annotation(dsk, "ak_touch_all") + + +def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool: + """Check if a graph at least one AwkwardInputLayer that is project-able.""" + for _, v in dsk.layers.items(): + if isinstance(v, AwkwardInputLayer) and v.is_projectable: + return True + return False + + +def _touch_all_data(*args, **kwargs): + """Mock writing an ak.Array to disk by touching data buffers.""" + for arg in args + tuple(kwargs.values()): + touch_data(arg) + + +def _mock_output(layer): + """Update a layer to run the _touch_all_data.""" + assert len(layer.dsk) == 1 + + new_layer = copy.deepcopy(layer) + mp = new_layer.dsk.copy() + for k in iter(mp.keys()): + mp[k] = (_touch_all_data,) + mp[k][1:] + new_layer.dsk = mp + return new_layer + + +def _touch_and_call_fn(fn, *args, **kwargs): + _touch_all_data(*args, **kwargs) + return fn(*args, **kwargs) + + +def _touch_and_call(layer): + assert len(layer.dsk) == 1 + + new_layer = copy.deepcopy(layer) + mp = new_layer.dsk.copy() + for k in iter(mp.keys()): + mp[k] = (_touch_and_call_fn,) + mp[k] + new_layer.dsk = mp + return new_layer def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: @@ -251,100 +404,6 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: return HighLevelGraph(layers, deps) -def _projectable_input_layer_names(dsk: HighLevelGraph) -> list[str]: - """Get list of column-projectable AwkwardInputLayer names. - - Parameters - ---------- - dsk : HighLevelGraph - Task graph of interest - - Returns - ------- - list[str] - Names of the AwkwardInputLayers in the graph that are - column-projectable. - - """ - return [ - n - for n, v in dsk.layers.items() - if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns") - # following condition means dep/pickled layers cannot be optimised - and hasattr(v, "_meta") - ] - - -def _layers_with_annotation(dsk: HighLevelGraph, key: str) -> list[str]: - return [n for n, v in dsk.layers.items() if (v.annotations or {}).get(key)] - - -def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]: - """Get a list output layer names. - - Output layer names are annotated with 'ak_output'. - - Parameters - ---------- - dsk : HighLevelGraph - Graph of interest. - - Returns - ------- - list[str] - Names of the output layers. 
- - """ - return _layers_with_annotation(dsk, "ak_output") - - -def _opt_touch_all_layer_names(dsk: HighLevelGraph) -> list[str]: - return [n for n, v in dsk.layers.items() if hasattr(v, "_opt_touch_all")] - # return _layers_with_annotation(dsk, "ak_touch_all") - - -def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool: - """Check if a graph at least one AwkwardInputLayer that is project-able.""" - for _, v in dsk.layers.items(): - if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns"): - return True - return False - - -def _touch_all_data(*args, **kwargs): - """Mock writing an ak.Array to disk by touching data buffers.""" - for arg in args + tuple(kwargs.values()): - touch_data(arg) - - -def _mock_output(layer): - """Update a layer to run the _touch_all_data.""" - assert len(layer.dsk) == 1 - - new_layer = copy.deepcopy(layer) - mp = new_layer.dsk.copy() - for k in iter(mp.keys()): - mp[k] = (_touch_all_data,) + mp[k][1:] - new_layer.dsk = mp - return new_layer - - -def _touch_and_call_fn(fn, *args, **kwargs): - _touch_all_data(*args, **kwargs) - return fn(*args, **kwargs) - - -def _touch_and_call(layer): - assert len(layer.dsk) == 1 - - new_layer = copy.deepcopy(layer) - mp = new_layer.dsk.copy() - for k in iter(mp.keys()): - mp[k] = (_touch_and_call_fn,) + mp[k] - new_layer.dsk = mp - return new_layer - - def _recursive_replace(args, layer, parent, indices): args2 = [] for arg in args: @@ -370,186 +429,7 @@ def _recursive_replace(args, layer, parent, indices): return args2 -def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]: - """Get the TypeTracerReport for each input layer in a task graph.""" - if not _has_projectable_awkward_io_layer(dsk): - return {} - - import awkward as ak - - layers = dsk.layers.copy() # type: ignore - deps = dsk.dependencies.copy() # type: ignore - dependents = dsk.dependents - - reports = {} - - # make labelled report - projectable = _projectable_input_layer_names(dsk) - for name, lay in dsk.layers.items(): - if name in projectable and hasattr(lay, "mock"): - layers[name], report = lay.mock() - reports[name] = report - elif hasattr(lay, "mock"): - layers[name], _ = lay.mock() - - for name in _ak_output_layer_names(dsk): - layers[name] = _mock_output(layers[name]) - - for name in _opt_touch_all_layer_names(dsk): - layers[name] = _touch_and_call(layers[name]) - - hlg = HighLevelGraph(layers, deps) - - # this loop builds up what are the possible final leaf nodes by - # inspecting the dependents dictionary. If something does not have - # a dependent, it must be the end of a graph. These are the things - # we need to compute for; we only use a single partition (the - # first). for a single collection `.compute()` this list will just - # be length 1; but if we are using `dask.compute` to pass in - # multiple collections to be computed simultaneously, this list - # will increase in length. - leaf_layers_keys = [ - (k, 0) for k, v in dependents.items() if isinstance(v, set) and len(v) == 0 - ] - - # now we try to compute for each possible output layer key (leaf - # node on partition 0); this will cause the typetacer reports to - # get correct fields/columns touched. If the result is a record or - # an array we of course want to touch all of the data/fields. 
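# A minimal sketch (not from this patch) of the leaf-key selection described
# in the comments above, assuming a dependents mapping shaped like
# HighLevelGraph.dependents:
#
#     dependents = {"read": {"add"}, "add": {"sum"}, "sum": set()}
#     leaf_layers_keys = [
#         (k, 0) for k, v in dependents.items() if isinstance(v, set) and len(v) == 0
#     ]
#     assert leaf_layers_keys == [("sum", 0)]  # only partition 0 of each leaf layer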
- try: - for layer in hlg.layers.values(): - layer.__dict__.pop("_cached_dict", None) - results = get_sync(hlg, leaf_layers_keys) - for out in results: - if isinstance(out, (ak.Array, ak.Record)): - touch_data(out) - except Exception as err: - on_fail = dask.config.get("awkward.optimization.on-fail") - # this is the default, throw a warning but skip the optimization. - if on_fail == "warn": - warnings.warn( - COLUMN_OPT_FAILED_WARNING_MSG.format(exception=type(err), message=err) - ) - return {} - # option "pass" means do not throw warning but skip the optimization. - elif on_fail == "pass": - log.debug("Column projection optimization failed; optimization skipped.") - return {} - # option "raise" to raise the exception here - elif on_fail == "raise": - raise - else: - raise ValueError( - f"Invalid awkward.optimization.on-fail option: {on_fail}.\n" - "Valid options are 'warn', 'pass', or 'raise'." - ) - - return reports - - -def _necessary_columns(dsk: HighLevelGraph) -> dict[str, list[str]]: - """Pair layer names with lists of necessary columns.""" - layer_to_columns = {} - for name, report in _get_column_reports(dsk).items(): - touched_data_keys = {_ for _ in report.data_touched if _ is not None} - - necessary_columns = [] - for key in sorted(touched_data_keys): - if key == name or key == "None": - continue - - layer, column = key.split(".", 1) - if layer != name: - continue - - # List offsets are tagged as {key}.{LIST_KEY}. This routine resolve - # _columns_, so we use a wildcard to indicate that we want to load - # *any* child column of this list. If the list contains no records, - # then we load - if column.endswith(LIST_KEY): - list_parent_path = column[: -(len(LIST_KEY) + 1)].rstrip(".") - if list_parent_path not in necessary_columns: - necessary_columns.append(f"{list_parent_path}.*") - else: - necessary_columns.append(column) - layer_to_columns[name] = necessary_columns - return layer_to_columns - - -def _prune_wildcards(columns: list[str], meta: AwkwardArray) -> list[str]: - """Prune wildcard '.*' suffix from necessary columns results. - - The _necessary_columns logic will provide some results of the - form: - - "foo.bar.*" - - This function will eliminate the wildcard in one of two ways - (continuing to use "foo.bar.*" as an example): - - 1. If "foo.bar" has leaves (subfields) "x", "y" and "z", and _any_ - of those (so "foo.bar.x", for example) also appears in the - columns list, then essentially nothing will happen (except we - drop the wildcard string), because we can be sure that a leaf - of "foo.bar" will be read (in this case it's "foo.bar.x"). - - 2. If "foo.bar" has multiple leaves but none of them appear in the - columns list, we will just pick the first one that we find - (that is, foo.bar.fields[0]). - - Parameters - ---------- - columns : list[str] - The "raw" columns deemed necessary by the necessary columns - logic; can still contain the wildcard syntax we've adopted. - meta : ak.Array - The metadata (typetracer array) from the AwkwardInputLayer - that is getting optimized. - - Returns - ------- - list[str] - Columns with the wildcard syntax pruned and (also augmented - with a leaf node if necessary). - - """ - - good_columns: list[str] = [] - wildcard_columns: list[str] = [] - for column in columns: - if column.endswith(".*"): - wildcard_columns.append(column) - else: - good_columns.append(column) - - for column in wildcard_columns: - definite_column = column[:-2] - # each time we meet a wildcard column we need to start back - # with the original meta array. 
-            imeta = meta
-            reverse_column_parts = [*definite_column.split(".")]
-            reverse_column_parts.reverse()
-
-            while reverse_column_parts:
-                part = reverse_column_parts.pop()
-                # for unnamed roots part may be an empty string, so we
-                # need this if statement.
-                if part:
-                    imeta = imeta[part]
-            # The given wildcard column contains no sub-columns, so load
-            # the column itself
-            if not imeta.fields:
-                good_columns.append(definite_column)
-
-            # Otherwise, prefer a column that we already need to load
-            else:
-                for field in imeta.fields:
-                    field_column = f"{definite_column}.{field}"
-                    if field_column in good_columns:
-                        break
-                # Or, pick an arbitrary (first) column if no other fields are yet
-                # required
-                else:
-                    good_columns.append(f"{definite_column}.{imeta.fields[0]}")
-
-    return good_columns
+def _buffer_keys_for_layer(
+    buffer_keys: Iterable[str], known_buffer_keys: frozenset[str]
+) -> set[str]:
+    return {k for k in buffer_keys if k in known_buffer_keys}
diff --git a/src/dask_awkward/lib/utils.py b/src/dask_awkward/lib/utils.py
new file mode 100644
index 00000000..2448dfb9
--- /dev/null
+++ b/src/dask_awkward/lib/utils.py
@@ -0,0 +1,159 @@
+from __future__ import annotations
+
+__all__ = ("trace_form_structure", "buffer_keys_required_to_compute_shapes")
+
+from collections.abc import Callable, Iterable, Iterator
+from typing import TYPE_CHECKING, TypedDict, TypeVar
+
+import awkward as ak
+
+if TYPE_CHECKING:
+    from awkward.forms import Form
+
+KNOWN_LENGTH_ATTRIBUTES = frozenset(("mask",))
+UNKNOWN_LENGTH_ATTRIBUTES = frozenset(("offsets", "starts", "stops", "index", "tags"))
+DATA_ATTRIBUTES = frozenset(("data",))
+METADATA_ATTRIBUTES = UNKNOWN_LENGTH_ATTRIBUTES | KNOWN_LENGTH_ATTRIBUTES
+
+
+class FormStructure(TypedDict):
+    form_key_to_form: dict[str, Form]
+    form_key_to_parent_form_key: dict[str, str | None]
+    form_key_to_path: dict[str, tuple[str, ...]]
+    form_key_to_buffer_keys: dict[str, tuple[str, ...]]
+
+
+def trace_form_structure(form: Form, buffer_key: Callable) -> FormStructure:
+    form_key_to_form: dict[str, Form] = {}
+    form_key_to_parent_form_key: dict[str, str | None] = {}
+    form_key_to_path: dict[str, tuple[str, ...]] = {}
+    form_key_to_buffer_keys: dict[str, tuple[str, ...]] = {}
+
+    def impl_with_parent(form: Form, parent_form: Form | None, column_path):
+        # Associate child form key with parent form key
+        form_key_to_parent_form_key[form.form_key] = (
+            None if parent_form is None else parent_form.form_key
+        )
+        # Keep track of column-level path
+        form_key_to_path[form.form_key] = column_path
+        # Identify each form with a form key
+        form_key_to_form[form.form_key] = form
+        # Pre-compute the buffer keys for each form
+        form_key_to_buffer_keys[form.form_key] = form.expected_from_buffers(
+            recursive=False, buffer_key=buffer_key
+        )
+        if form.is_union:
+            for entry in form.contents:
+                impl_with_parent(entry, form, column_path)
+        elif form.is_indexed:
+            impl_with_parent(form.content, form, column_path)
+        elif form.is_list:
+            impl_with_parent(form.content, form, column_path)
+        elif form.is_option:
+            impl_with_parent(form.content, form, column_path)
+        elif form.is_record:
+            for field in form.fields:
+                next_column_path = column_path + (field,)
+                # Recurse
+                impl_with_parent(form.content(field), form, next_column_path)
+        elif form.is_unknown or form.is_numpy:
+            pass
+        else:
+            raise AssertionError(form)
+
+    impl_with_parent(form, None, ())
+
+    return {
+        "form_key_to_form": form_key_to_form,
+        "form_key_to_parent_form_key": form_key_to_parent_form_key,
+        "form_key_to_path": form_key_to_path,
+        "form_key_to_buffer_keys": form_key_to_buffer_keys,
+    }
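# A minimal sketch (not from this patch) of tracing a tiny record-of-list form;
# `form_with_unique_keys` and `render_buffer_key` are defined later in this
# module, and the example array is hypothetical:
#
#     import awkward as ak
#     from dask_awkward.lib.utils import (
#         form_with_unique_keys,
#         render_buffer_key,
#         trace_form_structure,
#     )
#
#     form = form_with_unique_keys(ak.Array([{"x": [1, 2]}]).layout.form, "@")
#     structure = trace_form_structure(form, buffer_key=render_buffer_key)
#     # form_key_to_parent_form_key maps "@.x.content" -> "@.x" -> "@" -> None,
#     # while form_key_to_path maps both the list and its leaf to ("x",)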
+
+
+T = TypeVar("T")
+
+
+def walk_bijective_graph(node: T, graph: dict[T, T | None]) -> Iterator[T]:
+    while (node := graph.get(node)) is not None:
+        yield node
+
+
+def walk_graph_breadth_first(
+    node: T, graph: dict[T, Iterable[T] | None]
+) -> Iterator[T]:
+    children = graph.get(node)
+    if children is None:
+        return
+    yield from children
+    for node in children:
+        yield from walk_graph_breadth_first(node, graph)
+
+
+def walk_graph_depth_first(node: T, graph: dict[T, Iterable[T] | None]) -> Iterator[T]:
+    children = graph.get(node)
+    if children is None:
+        return
+    for node in children:
+        yield node
+        yield from walk_graph_depth_first(node, graph)
+
+
+def buffer_keys_required_to_compute_shapes(
+    parse_buffer_key: Callable[[str], tuple[str, str]],
+    shape_buffers: Iterable[str],
+    form_key_to_parent_key: dict[str, str | None],
+    form_key_to_buffer_keys: dict[str, Iterable[str]],
+) -> Iterator[str]:
+    # Buffers needing known shapes must traverse all the way up the tree.
+    for buffer_key in shape_buffers:
+        form_key, attribute = parse_buffer_key(buffer_key)
+
+        # For impacted form keys above this node
+        for impacted_form_key in walk_bijective_graph(form_key, form_key_to_parent_key):
+            # Identify the associated buffers
+            for impacted_buffer_key in form_key_to_buffer_keys[impacted_form_key]:
+                _, other_attribute = parse_buffer_key(impacted_buffer_key)
+
+                # Would the omission of this key lead to unknown lengths?
+                if other_attribute in UNKNOWN_LENGTH_ATTRIBUTES:
+                    # If so, touch the buffers
+                    yield impacted_buffer_key
+
+
+def render_buffer_key(form: Form, form_key: str, attribute: str) -> str:
+    return f"{form_key}-{attribute}"
+
+
+def parse_buffer_key(buffer_key: str) -> tuple[str, str]:
+    form_key, attribute = buffer_key.rsplit("-", maxsplit=1)
+    return form_key, attribute
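# A minimal sketch (not from this patch) of how the helpers above cooperate:
# the buffer-key helpers are inverses of one another, and requesting a leaf's
# data forces the offsets of every ancestor list, since without them the
# leaf's length would be unknown. The form keys here are hypothetical:
#
#     from dask_awkward.lib.utils import (
#         buffer_keys_required_to_compute_shapes,
#         parse_buffer_key,
#         render_buffer_key,
#     )
#
#     key = render_buffer_key(None, "node0.x", "data")  # form argument unused
#     assert key == "node0.x-data"
#     assert parse_buffer_key(key) == ("node0.x", "data")
#
#     parents = {"node0.x": "node0", "node0": None}
#     buffer_keys = {"node0": ("node0-offsets",), "node0.x": ("node0.x-data",)}
#     needed = set(
#         buffer_keys_required_to_compute_shapes(
#             parse_buffer_key, ["node0.x-data"], parents, buffer_keys
#         )
#     )
#     assert needed == {"node0-offsets"}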
+
+
+def form_with_unique_keys(form: Form, key: str) -> Form:
+    def impl(form: Form, key: str):
+        # Set form key
+        form.form_key = key
+
+        # If the form is a record, we need to loop over all fields in the
+        # record and set form keys that include the field name; this will
+        # keep recursing as well.
+        if form.is_record:
+            for field in form.fields:
+                impl(form.content(field), f"{key}.{field}")
+
+        elif form.is_union:
+            for i, entry in enumerate(form.contents):
+                impl(entry, f"{key}#{i}")
+
+        # NumPy-like and unknown arrays have no content to recurse into
+        elif form.is_numpy or form.is_unknown:
+            pass
+
+        # Anything else: grab the content and keep recursing
+        else:
+            impl(form.content, f"{key}.content")
+
+    # Perform a "deep" copy without preserving references
+    form = ak.forms.from_dict(form.to_dict())
+    impl(form, key)
+    return form
diff --git a/tests/.gitattributes b/tests/.gitattributes
new file mode 100644
index 00000000..c6e58634
--- /dev/null
+++ b/tests/.gitattributes
@@ -0,0 +1 @@
+*.root filter=lfs diff=lfs merge=lfs -text
diff --git a/tests/test-uproot/nano_dy.root b/tests/test-uproot/nano_dy.root
new file mode 100644
index 00000000..fdb513e6
--- /dev/null
+++ b/tests/test-uproot/nano_dy.root
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:159b17b34b4d13ea4960d36fb9b01f2a505cc1a5a8de0fb3cfa185154543c81c
+size 352599
diff --git a/tests/test_core.py b/tests/test_core.py
index a0a0789e..c3924e7d 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -335,7 +335,9 @@ def test_record_to_delayed(daa: Array) -> None:
     r = daa[0]
     assert type(r) == dak.Record
     d = r.to_delayed()
-    assert r.compute().tolist() == d.compute().tolist()
+    x = r.compute().tolist()
+    y = d.compute().tolist()
+    assert x == y
 
 
 def test_record_fields(daa: Array) -> None:
diff --git a/tests/test_inspect.py b/tests/test_inspect.py
index 25e265dd..66a2e9cf 100644
--- a/tests/test_inspect.py
+++ b/tests/test_inspect.py
@@ -7,23 +7,53 @@
 
 import dask_awkward as dak
 
+test_uproot_path = Path(__file__).parent / "test-uproot"
 
-def test_necessary_columns(
+
+def test_necessary_buffers(
     daa: dak.Array, tmpdir_factory: pytest.TempdirFactory
 ) -> None:
     z = daa.points.x + daa.points.y
-    for k, v in dak.necessary_columns(z).items():
-        assert set(v) == {"points.x", "points.y"}
+    for k, v in dak.report_necessary_buffers(z).items():
+        assert v == (
+            frozenset(
+                {
+                    "@.points-offsets",
+                    "@.points.content.y-data",
+                    "@.points.content.x-data",
+                }
+            ),
+            frozenset(),
+        )
 
     w = dak.to_parquet(
         daa.points.x, str(Path(tmpdir_factory.mktemp("pq")) / "out"), compute=False
     )
-    for k, v in dak.necessary_columns(w).items():
-        assert set(v) == {"points.x"}
+    for k, v in dak.report_necessary_buffers(w).items():
+        assert v == (
+            frozenset({"@.points-offsets", "@.points.content.x-data"}),
+            frozenset(),
+        )
 
     q = {"z": z, "w": w}
-    for k, v in dak.necessary_columns(q).items():
-        assert set(v) == {"points.x", "points.y"}
+    for k, v in dak.report_necessary_buffers(q).items():
+        assert v == (
+            frozenset(
+                {
+                    "@.points-offsets",
+                    "@.points.content.x-data",
+                    "@.points.content.y-data",
+                }
+            ),
+            frozenset(),
+        )
+
+    z = dak.zeros_like(daa.points.x)
+    for k, v in dak.report_necessary_buffers(z).items():
+        assert v == (
+            frozenset({"@.points-offsets"}),
+            frozenset({"@.points.content.x-data"}),
+        )
 
 
 def test_visualize_works(daa):
@@ -34,17 +64,14 @@ def test_visualize_works(daa):
     dask.compute(query, optimize_graph=True)
 
 
-def test_basic_root_works(daa):
+def test_basic_root_works():
     pytest.importorskip("hist")
     pytest.importorskip("uproot")
     import hist.dask as hda
    import uproot
 
     events = uproot.dask(
-        {
-            "https://github.com/CoffeaTeam/coffea/blob/master/"
-            "tests/samples/nano_dy.root?raw=true": "Events"
-        },
+        {test_uproot_path / "nano_dy.root": "Events"},
         steps_per_file=3,
     )
diff --git a/tests/test_io.py b/tests/test_io.py
index 63a50f76..dd149482 100644
--- a/tests/test_io.py +++ b/tests/test_io.py @@ -85,9 +85,6 @@ def assert_2(arr): c = b.map_partitions(assert_1)[["b", "a"]].map_partitions(assert_2) - # arbitrary order here - assert set(list(dak.necessary_columns(c).values())[0]) == {"b", "a"} - arr = c.compute() assert arr.fields == ["b", "a"] # output has required order diff --git a/tests/test_layers.py b/tests/test_layers.py deleted file mode 100644 index f47e0182..00000000 --- a/tests/test_layers.py +++ /dev/null @@ -1,12 +0,0 @@ -import dask_awkward as dak -from dask_awkward.layers import AwkwardInputLayer - - -def test_idempotent_layer_column_project(caa): - daa = dak.from_awkward(caa, npartitions=2) - n = 0 - for _, v in daa.dask.layers.items(): - if isinstance(v, AwkwardInputLayer): - n += 1 - assert v is v.project_columns(["abc"]) - assert n > 0 diff --git a/tests/test_str.py b/tests/test_str.py index 2643530d..4fc20b82 100644 --- a/tests/test_str.py +++ b/tests/test_str.py @@ -2,14 +2,18 @@ import pytest -pytest.importorskip("pyarrow") +pyarrow = pytest.importorskip("pyarrow") import awkward as ak import awkward.operations.str as akstr +from packaging.version import Version import dask_awkward as dak from dask_awkward.lib.testutils import assert_eq +PYARROW_GT_13 = Version(pyarrow.__version__) >= Version("13.0") + + lines1 = [ "this is line one", "123", @@ -254,12 +258,13 @@ def test_rtrim_whitespace() -> None: assert_eq(akstr.rtrim_whitespace(daa), akstr.rtrim_whitespace(caa)) +@pytest.mark.skipif(not PYARROW_GT_13, reason="ak.str.slice is broken for pyarrow<13") @pytest.mark.parametrize( "args", [ (2, 10, 2), - (3, None, 1), (0, None, 3), + (3, None, 1), ], ) def test_slice(args: tuple) -> None:
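# A minimal sketch (not from this patch) of consuming the new
# `report_necessary_buffers` output exercised in the tests above; the parquet
# path and field names are hypothetical:
#
#     import dask_awkward as dak
#
#     arr = dak.from_parquet("points.parquet")
#     buffers = dak.report_necessary_buffers(arr.points.x + arr.points.y)
#     for layer_name, (data_needed, shape_only) in buffers.items():
#         # first frozenset: buffers whose data must be read; second frozenset:
#         # buffers where only the shape (e.g. offsets) is required
#         print(layer_name, sorted(data_needed), sorted(shape_only))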