Use Zarr's new getitems() API (#131)
By using the new API in zarr-developers/zarr-python#1131, we no longer have to guess whether to read into host or device memory. That is, no more filtering of specific keys like:
```python
if os.path.basename(fn) in [
    zarr.storage.array_meta_key,
    zarr.storage.group_meta_key,
    zarr.storage.attrs_key,
]:
```
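
Instead, the store now receives a per-key context from Zarr; the `"meta_array"` entry in that context decides whether a key is read into host or device memory. A minimal sketch of calling the new `getitems()` directly (mirroring the tests added in this PR; the store path is illustrative and the keys `"a"` and `"b"` are assumed to already exist in the store):
```python
import cupy
import numpy

import kvikio.zarr

store = kvikio.zarr.GDSStore("data.zarr")  # illustrative path

# One context per key; "meta_array" picks the output array type.
res = store.getitems(
    keys=["a", "b"],
    contexts={
        "a": {"meta_array": cupy.empty(())},   # "a" lands in device memory
        "b": {"meta_array": numpy.empty(())},  # "b" lands in host memory
    },
)
assert isinstance(res["a"], cupy.ndarray)
assert isinstance(res["b"], numpy.ndarray)
```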

Note: this PR is on hold until Zarr v2.15 is released.

Closes #119

UPDATE: Zarr v2.15 has been released.
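
With v2.15 available, the end-to-end flow this PR enables looks roughly like the sketch below (adapted from the new tests; the store path is illustrative): a Zarr array backed by `GDSStore` with `meta_array=cupy.empty(())` has its chunks read directly into device memory via `getitems()`.
```python
import cupy
import zarr

import kvikio.zarr

store = kvikio.zarr.GDSStore("data.zarr")  # illustrative path

a = cupy.arange(100)
z = zarr.array(a, chunks=10, compressor=None, store=store, meta_array=cupy.empty(()))

# Chunks come back as CuPy arrays, read via KvikIO (and GDS when applicable).
assert isinstance(z[:], cupy.ndarray)
cupy.testing.assert_array_equal(a, z[:])
```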

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Jordan Jacobelli (https://github.com/jjacobelli)

URL: #131
madsbk committed Jun 21, 2023
1 parent c29eb24 commit 56d48ef
Showing 9 changed files with 186 additions and 58 deletions.
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -22,6 +22,7 @@ dependencies:
- libcufile=1.4.0.31
- ninja
- numpy>=1.21
- packaging
- pre-commit
- pydata-sphinx-theme
- pytest
1 change: 1 addition & 0 deletions conda/recipes/kvikio/meta.yaml
@@ -59,6 +59,7 @@ requirements:
- numpy >=1.20
- cupy >=12.0.0
- zarr
- packaging
- {{ pin_compatible('cudatoolkit', max_pin='x', min_pin='x') }}

test:
1 change: 1 addition & 0 deletions dependencies.yaml
@@ -203,6 +203,7 @@ dependencies:
packages:
- numpy>=1.21
- zarr
- packaging
- output_types: conda
packages:
- cupy>=12.0.0
1 change: 1 addition & 0 deletions legate/pyproject.toml
@@ -25,6 +25,7 @@ requires-python = ">=3.9"
dependencies = [
"cupy-cuda11x>=12.0.0",
"numpy>=1.21",
"packaging",
"zarr",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../dependencies.yaml and run `rapids-dependency-file-generator`.
classifiers = [
5 changes: 2 additions & 3 deletions python/benchmarks/single-node-io.py
@@ -214,9 +214,8 @@ def run_zarr(args):
import kvikio.zarr

dir_path = args.dir / "zarr"

if not hasattr(zarr.Array, "meta_array"):
raise RuntimeError("requires Zarr v2.13+")
if not kvikio.zarr.supported:
raise RuntimeError(f"requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}")

compressor = None
if args.zarr_compressor is not None:
113 changes: 82 additions & 31 deletions python/kvikio/zarr.py
@@ -1,63 +1,114 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import contextlib
import os
import os.path
from abc import abstractmethod
from typing import Any, Mapping, Sequence

import cupy
import numpy
import numpy as np
import zarr
import zarr.creation
import zarr.storage
from numcodecs.abc import Codec
from numcodecs.compat import ensure_contiguous_ndarray_like
from numcodecs.registry import register_codec
from packaging.version import parse

import kvikio
import kvikio.nvcomp
from kvikio._lib.arr import asarray

MINIMUM_ZARR_VERSION = "2.15"

# Is this version of zarr supported? We depend on the `Context`
# argument introduced in https://github.com/zarr-developers/zarr-python/pull/1131
# in zarr v2.15.
supported = parse(zarr.__version__) >= parse(MINIMUM_ZARR_VERSION)


class GDSStore(zarr.storage.DirectoryStore):
"""GPUDirect Storage (GDS) class using directories and files.
This class works like `zarr.storage.DirectoryStore` but use GPU
buffers and will use GDS when applicable.
The store supports both CPU and GPU buffers but when reading, GPU
buffers are returned always.
This class works like `zarr.storage.DirectoryStore` but implements
getitems() in order to support direct reading into device memory.
It uses KvikIO for reads and writes, which in turn will use GDS
when applicable.
TODO: Write metadata to disk in order to preserve the item types such that
GPU items are read as GPU device buffers and CPU items are read as bytes.
Notes
-----
GDSStore doesn't implement `_fromfile()`, thus non-array data such as
metadata is always read into host memory.
This is because only zarr.Array uses getitems() to retrieve data.
"""

# The default output array type used by getitems().
default_meta_array = numpy.empty(())

def __init__(self, *args, **kwargs) -> None:
if not kvikio.zarr.supported:
raise RuntimeError(
f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}"
)
super().__init__(*args, **kwargs)

def __eq__(self, other):
return isinstance(other, GDSStore) and self.path == other.path

def _fromfile(self, fn):
"""Read `fn` into device memory _unless_ `fn` refers to Zarr metadata"""
if os.path.basename(fn) in [
zarr.storage.array_meta_key,
zarr.storage.group_meta_key,
zarr.storage.attrs_key,
]:
return super()._fromfile(fn)
else:
nbytes = os.path.getsize(fn)
with kvikio.CuFile(fn, "r") as f:
ret = cupy.empty(nbytes, dtype="u1")
read = f.read(ret)
assert read == nbytes
return ret

def _tofile(self, a, fn):
a = asarray(a)
assert a.contiguous
if a.cuda:
with kvikio.CuFile(fn, "w") as f:
written = f.write(a)
assert written == a.nbytes
else:
super()._tofile(a.obj, fn)
with kvikio.CuFile(fn, "w") as f:
written = f.write(a)
assert written == a.nbytes

def getitems(
self,
keys: Sequence[str],
*,
contexts: Mapping[str, Mapping] = {},
) -> Mapping[str, Any]:
"""Retrieve data from multiple keys.
Parameters
----------
keys : Iterable[str]
The keys to retrieve
contexts: Mapping[str, Context]
A mapping of keys to their context. Each context is a mapping of store-specific
information. If the "meta_array" key exists, GDSStore uses its value as the
output array; otherwise GDSStore.default_meta_array is used.
Returns
-------
Mapping
A collection mapping the input keys to their results.
"""
ret = {}
io_results = []

with contextlib.ExitStack() as stack:
for key in keys:
filepath = os.path.join(self.path, key)
if not os.path.isfile(filepath):
continue
try:
meta_array = contexts[key]["meta_array"]
except KeyError:
meta_array = self.default_meta_array

nbytes = os.path.getsize(filepath)
f = stack.enter_context(kvikio.CuFile(filepath, "r"))
ret[key] = numpy.empty_like(meta_array, shape=(nbytes,), dtype="u1")
io_results.append((f.pread(ret[key]), nbytes))

for future, nbytes in io_results:
nbytes_read = future.get()
if nbytes_read != nbytes:
raise RuntimeError(
f"Incomplete read ({nbytes_read}) expected {nbytes}"
)
return ret


class NVCompCompressor(Codec):
1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -25,6 +25,7 @@ requires-python = ">=3.9"
dependencies = [
"cupy-cuda11x>=12.0.0",
"numpy>=1.21",
"packaging",
"zarr",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../dependencies.yaml and run `rapids-dependency-file-generator`.
classifiers = [
8 changes: 4 additions & 4 deletions python/tests/test_benchmarks.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import os
@@ -29,9 +29,9 @@ def test_single_node_io(run_cmd, tmp_path, api):
"""Test benchmarks/single-node-io.py"""

if "zarr" in api:
zarr = pytest.importorskip("zarr")
if not hasattr(zarr.Array, "meta_array"):
pytest.skip("requires Zarr v2.13+")
kz = pytest.importorskip("kvikio.zarr")
if not kz.supported:
pytest.skip(f"requires Zarr >={kz.MINIMUM_ZARR_VERSION}")

retcode = run_cmd(
cmd=[
113 changes: 93 additions & 20 deletions python/tests/test_zarr.py
@@ -10,10 +10,12 @@
zarr = pytest.importorskip("zarr")
kvikio_zarr = pytest.importorskip("kvikio.zarr")

# To support CuPy arrays, we need the `meta_array` argument introduced in
# Zarr v2.13, see <https://github.com/zarr-developers/zarr-python/pull/934>
if not hasattr(zarr.Array, "meta_array"):
pytest.skip("requires Zarr v2.13+", allow_module_level=True)

if not kvikio_zarr.supported:
pytest.skip(
f"requires Zarr >={kvikio_zarr.MINIMUM_ZARR_VERSION}",
allow_module_level=True,
)


@pytest.fixture
@@ -22,46 +22,117 @@ def store(tmp_path):
return kvikio_zarr.GDSStore(tmp_path / "test-file.zarr")


@pytest.mark.parametrize("array_type", ["numpy", "cupy"])
def test_direct_store_access(store, array_type):
def test_direct_store_access(store, xp):
"""Test accessing the GDS Store directly"""

module = pytest.importorskip(array_type)
a = module.arange(5, dtype="u1")
a = xp.arange(5, dtype="u1")
store["a"] = a
b = store["a"]

# Notice, GDSStore always returns a cupy array
assert type(b) is cupy.ndarray
cupy.testing.assert_array_equal(a, b)
# Notice, unless using getitems(), GDSStore always returns bytes
assert isinstance(b, bytes)
assert (xp.frombuffer(b, dtype="u1") == a).all()


def test_array(store):
"""Test Zarr array"""
@pytest.mark.parametrize("xp_write", ["numpy", "cupy"])
@pytest.mark.parametrize("xp_read_a", ["numpy", "cupy"])
@pytest.mark.parametrize("xp_read_b", ["numpy", "cupy"])
def test_direct_store_access_getitems(store, xp_write, xp_read_a, xp_read_b):
"""Test accessing the GDS Store directly using getitems()"""

a = cupy.arange(100)
z = zarr.array(
a, chunks=10, compressor=None, store=store, meta_array=cupy.empty(())
xp_read_a = pytest.importorskip(xp_read_a)
xp_read_b = pytest.importorskip(xp_read_b)
xp_write = pytest.importorskip(xp_write)
a = xp_write.arange(5, dtype="u1")
b = a * 2
store["a"] = a
store["b"] = b

res = store.getitems(
keys=["a", "b"],
contexts={
"a": {"meta_array": xp_read_a.empty(())},
"b": {"meta_array": xp_read_b.empty(())},
},
)
assert isinstance(res["a"], xp_read_a.ndarray)
assert isinstance(res["b"], xp_read_b.ndarray)
cupy.testing.assert_array_equal(res["a"], a)
cupy.testing.assert_array_equal(res["b"], b)


def test_array(store, xp):
"""Test Zarr array"""

a = xp.arange(100)
z = zarr.array(a, chunks=10, compressor=None, store=store, meta_array=xp.empty(()))
assert isinstance(z.meta_array, type(a))
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(a, type(z[:]))
cupy.testing.assert_array_equal(a, z[:])
xp.testing.assert_array_equal(a, z[:])


def test_group(store):
def test_group(store, xp):
"""Test Zarr group"""

g = zarr.open_group(store, meta_array=cupy.empty(()))
g = zarr.open_group(store, meta_array=xp.empty(()))
g.ones("data", shape=(10, 11), dtype=int, compressor=None)
a = g["data"]
assert a.shape == (10, 11)
assert a.dtype == int
assert isinstance(a, zarr.Array)
assert isinstance(a[:], cupy.ndarray)
assert isinstance(a.meta_array, xp.ndarray)
assert isinstance(a[:], xp.ndarray)
assert (a[:] == 1).all()


def test_open_array(store, xp):
"""Test Zarr's open_array()"""

a = xp.arange(10)
z = zarr.open_array(
store,
shape=a.shape,
dtype=a.dtype,
chunks=(10,),
compressor=None,
meta_array=xp.empty(()),
)
z[:] = a
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(a, type(z[:]))
xp.testing.assert_array_equal(a, z[:])


@pytest.mark.parametrize("inline_array", [True, False])
def test_dask_read(store, xp, inline_array):
"""Test Zarr read in Dask"""

da = pytest.importorskip("dask.array")
a = xp.arange(100)
z = zarr.array(a, chunks=10, compressor=None, store=store, meta_array=xp.empty(()))
d = da.from_zarr(z, inline_array=inline_array)
d += 1
xp.testing.assert_array_equal(a + 1, d.compute())


def test_dask_write(store, xp):
"""Test Zarr write in Dask"""

da = pytest.importorskip("dask.array")

# Write dask array to disk using Zarr
a = xp.arange(100)
d = da.from_array(a, chunks=10)
da.to_zarr(d, store, compressor=None, meta_array=xp.empty(()))

# Validate the written Zarr array
z = zarr.open_array(store)
xp.testing.assert_array_equal(a, z[:])


@pytest.mark.parametrize("xp_read", ["numpy", "cupy"])
@pytest.mark.parametrize("xp_write", ["numpy", "cupy"])
@pytest.mark.parametrize("compressor", kvikio_zarr.nvcomp_compressors)