Skip to content

Commit

Permalink
Added Store.getsize (#2426)
Browse files Browse the repository at this point in the history
* Added Store.getsize

Closes #2420

* fixups

* lint

* wip

* Use prefix

* fixup

* Maybe fixup

* lint

* revert buffer chnages

* fixup

* fixup

* Remove AsyncIterable support

* fixup

---------

Co-authored-by: Davis Bennett <davis.v.bennett@gmail.com>
  • Loading branch information
TomAugspurger and d-v-b authored Nov 14, 2024
1 parent 7ba5296 commit f74e53a
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 3 deletions.
68 changes: 68 additions & 0 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from itertools import starmap
from typing import TYPE_CHECKING, Protocol, runtime_checkable

from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from types import TracebackType
Expand Down Expand Up @@ -344,6 +348,70 @@ async def _get_many(
for req in requests:
yield (req[0], await self.get(*req))

async def getsize(self, key: str) -> int:
"""
Return the size, in bytes, of a value in a Store.
Parameters
----------
key : str
Returns
-------
nbytes : int
The size of the value (in bytes).
Raises
------
FileNotFoundError
When the given key does not exist in the store.
"""
# Note to implementers: this default implementation is very inefficient since
# it requires reading the entire object. Many systems will have ways to get the
# size of an object without reading it.
value = await self.get(key, prototype=default_buffer_prototype())
if value is None:
raise FileNotFoundError(key)
return len(value)

async def getsize_prefix(self, prefix: str) -> int:
"""
Return the size, in bytes, of all values under a prefix.
Parameters
----------
prefix : str
The prefix of the directory to measure.
Returns
-------
nbytes : int
The sum of the sizes of the values in the directory (in bytes).
See Also
--------
zarr.Array.nbytes_stored
Store.getsize
Notes
-----
``getsize_prefix`` is just provided as a potentially faster alternative to
listing all the keys under a prefix calling :meth:`Store.getsize` on each.
In general, ``prefix`` should be the path of an Array or Group in the Store.
Implementations may differ on the behavior when some other ``prefix``
is provided.
"""
# TODO: Overlap listing keys with getsize calls.
# Currently, we load the list of keys into memory and only then move
# on to getting sizes. Ideally we would overlap those two, which should
# improve tail latency and might reduce memory pressure (since not all keys
# would be in memory at once).
keys = [(x,) async for x in self.list_prefix(prefix)]
limit = config.get("async.concurrency")
sizes = await concurrent_map(keys, self.getsize, limit=limit)
return sum(sizes)


@runtime_checkable
class ByteGetter(Protocol):
Expand Down
13 changes: 13 additions & 0 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,9 @@ async def nchunks_initialized(self) -> int:
"""
return len(await chunks_initialized(self))

async def nbytes_stored(self) -> int:
return await self.store_path.store.getsize_prefix(self.store_path.path)

def _iter_chunk_coords(
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
) -> Iterator[ChunkCoords]:
Expand Down Expand Up @@ -1727,6 +1730,16 @@ def nchunks_initialized(self) -> int:
"""
return sync(self._async_array.nchunks_initialized())

def nbytes_stored(self) -> int:
"""
Determine the size, in bytes, of the array actually written to the store.
Returns
-------
size : int
"""
return sync(self._async_array.nbytes_stored())

def _iter_chunk_keys(
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
) -> Iterator[str]:
Expand Down
4 changes: 3 additions & 1 deletion src/zarr/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def product(tup: ChunkCoords) -> int:


async def concurrent_map(
items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None
items: Iterable[T],
func: Callable[..., Awaitable[V]],
limit: int | None = None,
) -> list[V]:
if limit is None:
return await asyncio.gather(*list(starmap(func, items)))
Expand Down
9 changes: 8 additions & 1 deletion src/zarr/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import asyncio
import io
import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer import Buffer
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map

if TYPE_CHECKING:
Expand Down Expand Up @@ -124,10 +126,12 @@ def __eq__(self, other: object) -> bool:
async def get(
self,
key: str,
prototype: BufferPrototype,
prototype: BufferPrototype | None = None,
byte_range: tuple[int | None, int | None] | None = None,
) -> Buffer | None:
# docstring inherited
if prototype is None:
prototype = default_buffer_prototype()
if not self._is_open:
await self._open()
assert isinstance(key, str)
Expand Down Expand Up @@ -222,3 +226,6 @@ async def list_dir(self, prefix: str) -> AsyncIterator[str]:
yield key.relative_to(base).as_posix()
except (FileNotFoundError, NotADirectoryError):
pass

async def getsize(self, key: str) -> int:
return os.path.getsize(self.root / key)
8 changes: 8 additions & 0 deletions src/zarr/storage/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,11 @@ async def delete_dir(self, prefix: str) -> None:
# docstring inherited
with self.log(prefix):
await self._store.delete_dir(prefix=prefix)

async def getsize(self, key: str) -> int:
with self.log(key):
return await self._store.getsize(key)

async def getsize_prefix(self, prefix: str) -> int:
with self.log(prefix):
return await self._store.getsize_prefix(prefix)
13 changes: 13 additions & 0 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,16 @@ async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
):
yield onefile.removeprefix(f"{self.path}/")

async def getsize(self, key: str) -> int:
path = _dereference_path(self.path, key)
info = await self.fs._info(path)

size = info.get("size")

if size is None:
# Not all filesystems support size. Fall back to reading the entire object
return await super().getsize(key)
else:
# fsspec doesn't have typing. We'll need to assume or verify this is true
return int(size)
23 changes: 23 additions & 0 deletions src/zarr/testing/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,26 @@ async def test_set_if_not_exists(self, store: S) -> None:

result = await store.get("k2", default_buffer_prototype())
assert result == new

async def test_getsize(self, store: S) -> None:
key = "k"
data = self.buffer_cls.from_bytes(b"0" * 10)
await self.set(store, key, data)

result = await store.getsize(key)
assert isinstance(result, int)
assert result > 0

async def test_getsize_raises(self, store: S) -> None:
with pytest.raises(FileNotFoundError):
await store.getsize("not-a-real-key")

async def test_getsize_prefix(self, store: S) -> None:
prefix = "array/c/"
for i in range(10):
data = self.buffer_cls.from_bytes(b"0" * 10)
await self.set(store, f"{prefix}/{i}", data)

result = await store.getsize_prefix(prefix)
assert isinstance(result, int)
assert result > 0
24 changes: 24 additions & 0 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,30 @@ async def test_chunks_initialized() -> None:
assert observed == expected


def test_nbytes_stored() -> None:
arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4")
result = arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
arr[:50] = 1
result = arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
arr[50:] = 2
result = arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.


async def test_nbytes_stored_async() -> None:
arr = await zarr.api.asynchronous.create(shape=(100,), chunks=(10,), dtype="i4")
result = await arr.nbytes_stored()
assert result == 366 # the size of the metadata document. This is a fragile test.
await arr.setitem(slice(50), 1)
result = await arr.nbytes_stored()
assert result == 566 # the size with 5 chunks filled.
await arr.setitem(slice(50, 100), 2)
result = await arr.nbytes_stored()
assert result == 766 # the size with all chunks filled.


def test_default_fill_values() -> None:
a = Array.create(MemoryStore(), shape=5, chunk_shape=5, dtype="<U4")
assert a.fill_value == ""
Expand Down
1 change: 1 addition & 0 deletions tests/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
if TYPE_CHECKING:
from collections.abc import AsyncGenerator

from zarr.core.buffer import BufferPrototype
from zarr.core.buffer.core import Buffer
from zarr.core.common import ChunkCoords

Expand Down
2 changes: 1 addition & 1 deletion tests/test_metadata/test_consolidated.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
open,
open_consolidated,
)
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.buffer import default_buffer_prototype
from zarr.core.group import ConsolidatedMetadata, GroupMetadata
from zarr.core.metadata import ArrayV3Metadata
from zarr.core.metadata.v2 import ArrayV2Metadata
Expand Down
1 change: 1 addition & 0 deletions tests/test_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from numcodecs.blosc import Blosc

import zarr
import zarr.core.buffer
import zarr.storage
from zarr import Array
from zarr.storage import MemoryStore, StorePath
Expand Down

0 comments on commit f74e53a

Please sign in to comment.