diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index ba1f78ccef..982c69002c 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -22,6 +22,7 @@ dependencies: - libcufile=1.4.0.31 - ninja - numpy>=1.21 +- packaging - pre-commit - pydata-sphinx-theme - pytest diff --git a/conda/recipes/kvikio/meta.yaml b/conda/recipes/kvikio/meta.yaml index 887e9c4d73..d842b94c97 100644 --- a/conda/recipes/kvikio/meta.yaml +++ b/conda/recipes/kvikio/meta.yaml @@ -59,6 +59,7 @@ requirements: - numpy >=1.20 - cupy >=12.0.0 - zarr + - packaging - {{ pin_compatible('cudatoolkit', max_pin='x', min_pin='x') }} test: diff --git a/dependencies.yaml b/dependencies.yaml index ca77f753de..1b53aafc0f 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -203,6 +203,7 @@ dependencies: packages: - numpy>=1.21 - zarr + - packaging - output_types: conda packages: - cupy>=12.0.0 diff --git a/legate/pyproject.toml b/legate/pyproject.toml index 8d64bf1105..80f5b2e10e 100644 --- a/legate/pyproject.toml +++ b/legate/pyproject.toml @@ -25,6 +25,7 @@ requires-python = ">=3.9" dependencies = [ "cupy-cuda11x>=12.0.0", "numpy>=1.21", + "packaging", "zarr", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../dependencies.yaml and run `rapids-dependency-file-generator`. classifiers = [ diff --git a/python/benchmarks/single-node-io.py b/python/benchmarks/single-node-io.py index cceeb62415..72b57300cc 100644 --- a/python/benchmarks/single-node-io.py +++ b/python/benchmarks/single-node-io.py @@ -214,9 +214,8 @@ def run_zarr(args): import kvikio.zarr dir_path = args.dir / "zarr" - - if not hasattr(zarr.Array, "meta_array"): - raise RuntimeError("requires Zarr v2.13+") + if not kvikio.zarr.supported: + raise RuntimeError(f"requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}") compressor = None if args.zarr_compressor is not None: diff --git a/python/kvikio/zarr.py b/python/kvikio/zarr.py index ba5d0769f0..50a6756db8 100644 --- a/python/kvikio/zarr.py +++ b/python/kvikio/zarr.py @@ -1,63 +1,114 @@ # Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. +import contextlib import os import os.path from abc import abstractmethod +from typing import Any, Mapping, Sequence import cupy +import numpy import numpy as np +import zarr import zarr.creation import zarr.storage from numcodecs.abc import Codec from numcodecs.compat import ensure_contiguous_ndarray_like from numcodecs.registry import register_codec +from packaging.version import parse import kvikio import kvikio.nvcomp -from kvikio._lib.arr import asarray + +MINIMUM_ZARR_VERSION = "2.15" + +# Is this version of zarr supported? We depend on the `Context` +# argument introduced in https://github.com/zarr-developers/zarr-python/pull/1131 +# in zarr v2.15. +supported = parse(zarr.__version__) >= parse(MINIMUM_ZARR_VERSION) class GDSStore(zarr.storage.DirectoryStore): """GPUDirect Storage (GDS) class using directories and files. - This class works like `zarr.storage.DirectoryStore` but use GPU - buffers and will use GDS when applicable. - The store supports both CPU and GPU buffers but when reading, GPU - buffers are returned always. + This class works like `zarr.storage.DirectoryStore` but implements + getitems() in order to support direct reading into device memory. + It uses KvikIO for reads and writes, which in turn will use GDS + when applicable. - TODO: Write metadata to disk in order to preserve the item types such that - GPU items are read as GPU device buffers and CPU items are read as bytes. + Notes + ----- + GDSStore doesn't implement `_fromfile()` thus non-array data such as + meta data is always read into host memory. + This is because only zarr.Array use getitems() to retrieve data. """ + # The default output array type used by getitems(). + default_meta_array = numpy.empty(()) + + def __init__(self, *args, **kwargs) -> None: + if not kvikio.zarr.supported: + raise RuntimeError( + f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}" + ) + super().__init__(*args, **kwargs) + def __eq__(self, other): return isinstance(other, GDSStore) and self.path == other.path - def _fromfile(self, fn): - """Read `fn` into device memory _unless_ `fn` refers to Zarr metadata""" - if os.path.basename(fn) in [ - zarr.storage.array_meta_key, - zarr.storage.group_meta_key, - zarr.storage.attrs_key, - ]: - return super()._fromfile(fn) - else: - nbytes = os.path.getsize(fn) - with kvikio.CuFile(fn, "r") as f: - ret = cupy.empty(nbytes, dtype="u1") - read = f.read(ret) - assert read == nbytes - return ret - def _tofile(self, a, fn): - a = asarray(a) - assert a.contiguous - if a.cuda: - with kvikio.CuFile(fn, "w") as f: - written = f.write(a) - assert written == a.nbytes - else: - super()._tofile(a.obj, fn) + with kvikio.CuFile(fn, "w") as f: + written = f.write(a) + assert written == a.nbytes + + def getitems( + self, + keys: Sequence[str], + *, + contexts: Mapping[str, Mapping] = {}, + ) -> Mapping[str, Any]: + """Retrieve data from multiple keys. + + Parameters + ---------- + keys : Iterable[str] + The keys to retrieve + contexts: Mapping[str, Context] + A mapping of keys to their context. Each context is a mapping of store + specific information. If the "meta_array" key exist, GDSStore use its + values as the output array otherwise GDSStore.default_meta_array is used. + + Returns + ------- + Mapping + A collection mapping the input keys to their results. + """ + ret = {} + io_results = [] + + with contextlib.ExitStack() as stack: + for key in keys: + filepath = os.path.join(self.path, key) + if not os.path.isfile(filepath): + continue + try: + meta_array = contexts[key]["meta_array"] + except KeyError: + meta_array = self.default_meta_array + + nbytes = os.path.getsize(filepath) + f = stack.enter_context(kvikio.CuFile(filepath, "r")) + ret[key] = numpy.empty_like(meta_array, shape=(nbytes,), dtype="u1") + io_results.append((f.pread(ret[key]), nbytes)) + + for future, nbytes in io_results: + nbytes_read = future.get() + if nbytes_read != nbytes: + raise RuntimeError( + f"Incomplete read ({nbytes_read}) expected {nbytes}" + ) + return ret class NVCompCompressor(Codec): diff --git a/python/pyproject.toml b/python/pyproject.toml index a876374195..9ff3b6fe4a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -25,6 +25,7 @@ requires-python = ">=3.9" dependencies = [ "cupy-cuda11x>=12.0.0", "numpy>=1.21", + "packaging", "zarr", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../dependencies.yaml and run `rapids-dependency-file-generator`. classifiers = [ diff --git a/python/tests/test_benchmarks.py b/python/tests/test_benchmarks.py index ec2ec2b395..ee0321d40a 100644 --- a/python/tests/test_benchmarks.py +++ b/python/tests/test_benchmarks.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import os @@ -29,9 +29,9 @@ def test_single_node_io(run_cmd, tmp_path, api): """Test benchmarks/single-node-io.py""" if "zarr" in api: - zarr = pytest.importorskip("zarr") - if not hasattr(zarr.Array, "meta_array"): - pytest.skip("requires Zarr v2.13+") + kz = pytest.importorskip("kvikio.zarr") + if not kz.supported: + pytest.skip(f"requires Zarr >={kz.MINIMUM_ZARR_VERSION}") retcode = run_cmd( cmd=[ diff --git a/python/tests/test_zarr.py b/python/tests/test_zarr.py index 99d47c9df1..296c5f1ee6 100644 --- a/python/tests/test_zarr.py +++ b/python/tests/test_zarr.py @@ -10,10 +10,12 @@ zarr = pytest.importorskip("zarr") kvikio_zarr = pytest.importorskip("kvikio.zarr") -# To support CuPy arrays, we need the `meta_array` argument introduced in -# Zarr v2.13, see -if not hasattr(zarr.Array, "meta_array"): - pytest.skip("requires Zarr v2.13+", allow_module_level=True) + +if not kvikio_zarr.supported: + pytest.skip( + f"requires Zarr >={kvikio_zarr.MINIMUM_ZARR_VERSION}", + allow_module_level=True, + ) @pytest.fixture @@ -22,46 +24,117 @@ def store(tmp_path): return kvikio_zarr.GDSStore(tmp_path / "test-file.zarr") -@pytest.mark.parametrize("array_type", ["numpy", "cupy"]) -def test_direct_store_access(store, array_type): +def test_direct_store_access(store, xp): """Test accessing the GDS Store directly""" - module = pytest.importorskip(array_type) - a = module.arange(5, dtype="u1") + a = xp.arange(5, dtype="u1") store["a"] = a b = store["a"] - # Notice, GDSStore always returns a cupy array - assert type(b) is cupy.ndarray - cupy.testing.assert_array_equal(a, b) + # Notice, unless using getitems(), GDSStore always returns bytes + assert isinstance(b, bytes) + assert (xp.frombuffer(b, dtype="u1") == a).all() -def test_array(store): - """Test Zarr array""" +@pytest.mark.parametrize("xp_write", ["numpy", "cupy"]) +@pytest.mark.parametrize("xp_read_a", ["numpy", "cupy"]) +@pytest.mark.parametrize("xp_read_b", ["numpy", "cupy"]) +def test_direct_store_access_getitems(store, xp_write, xp_read_a, xp_read_b): + """Test accessing the GDS Store directly using getitems()""" - a = cupy.arange(100) - z = zarr.array( - a, chunks=10, compressor=None, store=store, meta_array=cupy.empty(()) + xp_read_a = pytest.importorskip(xp_read_a) + xp_read_b = pytest.importorskip(xp_read_b) + xp_write = pytest.importorskip(xp_write) + a = xp_write.arange(5, dtype="u1") + b = a * 2 + store["a"] = a + store["b"] = b + + res = store.getitems( + keys=["a", "b"], + contexts={ + "a": {"meta_array": xp_read_a.empty(())}, + "b": {"meta_array": xp_read_b.empty(())}, + }, ) + assert isinstance(res["a"], xp_read_a.ndarray) + assert isinstance(res["b"], xp_read_b.ndarray) + cupy.testing.assert_array_equal(res["a"], a) + cupy.testing.assert_array_equal(res["b"], b) + + +def test_array(store, xp): + """Test Zarr array""" + + a = xp.arange(100) + z = zarr.array(a, chunks=10, compressor=None, store=store, meta_array=xp.empty(())) + assert isinstance(z.meta_array, type(a)) assert a.shape == z.shape assert a.dtype == z.dtype assert isinstance(a, type(z[:])) - cupy.testing.assert_array_equal(a, z[:]) + xp.testing.assert_array_equal(a, z[:]) -def test_group(store): +def test_group(store, xp): """Test Zarr group""" - g = zarr.open_group(store, meta_array=cupy.empty(())) + g = zarr.open_group(store, meta_array=xp.empty(())) g.ones("data", shape=(10, 11), dtype=int, compressor=None) a = g["data"] assert a.shape == (10, 11) assert a.dtype == int assert isinstance(a, zarr.Array) - assert isinstance(a[:], cupy.ndarray) + assert isinstance(a.meta_array, xp.ndarray) + assert isinstance(a[:], xp.ndarray) assert (a[:] == 1).all() +def test_open_array(store, xp): + """Test Zarr's open_array()""" + + a = xp.arange(10) + z = zarr.open_array( + store, + shape=a.shape, + dtype=a.dtype, + chunks=(10,), + compressor=None, + meta_array=xp.empty(()), + ) + z[:] = a + assert a.shape == z.shape + assert a.dtype == z.dtype + assert isinstance(a, type(z[:])) + xp.testing.assert_array_equal(a, z[:]) + + +@pytest.mark.parametrize("inline_array", [True, False]) +def test_dask_read(store, xp, inline_array): + """Test Zarr read in Dask""" + + da = pytest.importorskip("dask.array") + a = xp.arange(100) + z = zarr.array(a, chunks=10, compressor=None, store=store, meta_array=xp.empty(())) + d = da.from_zarr(z, inline_array=inline_array) + d += 1 + xp.testing.assert_array_equal(a + 1, d.compute()) + + +def test_dask_write(store, xp): + """Test Zarr write in Dask""" + + da = pytest.importorskip("dask.array") + + # Write dask array to disk using Zarr + a = xp.arange(100) + d = da.from_array(a, chunks=10) + da.to_zarr(d, store, compressor=None, meta_array=xp.empty(())) + + # Validate the written Zarr array + z = zarr.open_array(store) + xp.testing.assert_array_equal(a, z[:]) + + @pytest.mark.parametrize("xp_read", ["numpy", "cupy"]) @pytest.mark.parametrize("xp_write", ["numpy", "cupy"]) @pytest.mark.parametrize("compressor", kvikio_zarr.nvcomp_compressors)