diff --git a/docs/dh_demo.ipynb b/docs/dh_demo.ipynb
index ac05dd3..4eb7f7b 100644
--- a/docs/dh_demo.ipynb
+++ b/docs/dh_demo.ipynb
@@ -43,7 +43,7 @@
    },
    "outputs": [],
    "source": [
-    "h = dh.Histogram(dh.axis.Regular(50, -3, 3), dh.axis.Regular(50, -3, 3))\n",
+    "h = dh.Histogram(dh.axis.Regular(50, -3, 3), dh.axis.Regular(50, -3, 3))\n",
     "h_concrete = bh.Histogram(*h.axes)"
    ]
   },
@@ -85,7 +85,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "h.fill(x, y) # data is still lazy"
+    "h.fill(x, y)  # data is still lazy"
    ]
   },
   {
diff --git a/src/dask_histogram/boost.py b/src/dask_histogram/boost.py
index 1583fc7..fd75df2 100644
--- a/src/dask_histogram/boost.py
+++ b/src/dask_histogram/boost.py
@@ -10,7 +10,6 @@
 import boost_histogram.axis as axis
 import boost_histogram.storage as storage
 import dask.array as da
-import numpy as np
 from dask.base import is_dask_collection
 from dask.delayed import Delayed, delayed
 from dask.utils import is_arraylike, is_dataframe_like
@@ -20,200 +19,12 @@ if TYPE_CHECKING:
     from .typing import BinArg, BinType, DaskCollection, RangeArg, RangeType
-else:
-    DaskCollection = object
 
 import dask_histogram
 
 __all__ = ("Histogram", "histogram", "histogram2d", "histogramdd")
 
 
-@delayed
-def _blocked_fill_1d(
-    data: Any,
-    meta_hist: Histogram,
-    weight: Any | None = None,
-):
-    """Single delayed (1D) histogram concrete fill."""
-    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
-    hfb.concrete_fill(data, weight=weight)
-    return hfb
-
-
-@delayed
-def _blocked_fill_rectangular(
-    data: Any,
-    meta_hist: Histogram,
-    weight: Any | None = None,
-):
-    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
-    hfb.concrete_fill(*(data.T), weight=weight)
-    return hfb
-
-
-@delayed
-def _blocked_fill_dataframe(
-    data: Any,
-    meta_hist: Histogram,
-    weight: Any | None = None,
-):
-    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
-    hfb.concrete_fill(*(data[c] for c in data.columns), weight=weight)
-    return hfb
-
-
-@delayed
-def _blocked_fill_multiarg(
-    *args: np.ndarray,
-    meta_hist: Histogram,
-    weight: Any | None = None,
-):
-    """Single delayed (nD) histogram concrete fill."""
-    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
-    hfb.concrete_fill(*args, weight=weight)
-    return hfb
-
-
-@delayed
-def _delayed_to_numpy(hist: Histogram, flow: bool = False):
-    return hist.to_numpy(flow=flow, dd=True)[0:1]
-
-
-def _to_dask_array(
-    hist: Histogram, dtype: Any, flow: bool = False, dd: bool = False
-) -> tuple[da.Array, tuple[da.Array, ...]] | tuple[da.Array, ...]:
-    shape = hist.shape
-    s1 = hist.to_delayed()  # delayed sum of histogram
-    s2 = _delayed_to_numpy(s1, flow=flow)
-    arr = da.from_delayed(s2, shape=shape, dtype=dtype)
-    edges = (da.asarray(a.edges) for a in hist.axes)
-    if dd:
-        return (arr, list(edges))
-    else:
-        return (arr, *(tuple(edges)))
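The removed `_to_dask_array` helper hinged on `da.from_delayed`, wrapping the delayed histogram sum as a dask array of known shape and dtype. A minimal standalone sketch of that pattern; the names here are illustrative, not library API:

```python
import dask
import dask.array as da
import numpy as np

@dask.delayed
def summed_counts():
    # Stand-in for the delayed sum of per-chunk histograms.
    return np.arange(6, dtype=float)

# Wrap the Delayed result as a dask array with a known shape and dtype.
arr = da.from_delayed(summed_counts(), shape=(6,), dtype=float)
print(arr.compute())  # [0. 1. 2. 3. 4. 5.]
```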
- - """ - data = data.to_delayed() # type: ignore - if weight is None: - hists = [_blocked_fill_1d(a, meta_hist) for a in data] - else: - weights = weight.to_delayed() - hists = [_blocked_fill_1d(a, meta_hist, w) for a, w in zip(data, weights)] - return delayed(sum)(hists) - - -def _fill_rectangular( - data: DaskCollection, - *, - meta_hist: Histogram, - weight: DaskCollection | None = None, -) -> Delayed: - """Fill nD histogram given a rectangular (multi-column) dataset. - - Array Input - ----------- - - If a multi-column dask.array.Array is passed to `fill`, we want to - avoid having to compute the transpose of the _entire collection_ - (this may be an expensive and unncessary computation). - - For this to work the input data can be chunked only along the row - axis; we convert the whole collection (nD array) to delayed which - gives us a 2D array of Delayed objects with a shape of the form - (n_row_chunks, n_column_chunks). We transpose this and take the - first and only element along the n_column_chunks dimension. This - gives us a list of Delayed objects along the row dimension (each - object wraps a multidimensional NumPy array). - - Finally, we loop over the list of Delayed objects and compute the - transpose on each chunk _as necessary_ when the materialized array - (a subset of the original complete collection) is used. - - DataFrame Input - --------------- - - DataFrames are a bit simpler; we just use to_delayed() and pass to - the dedicated @delayed fill function. - - """ - if is_arraylike(data): - data = data.to_delayed().T[0] # type: ignore - ff = _blocked_fill_rectangular - elif is_dataframe_like(data): - data = data.to_delayed() # type: ignore - ff = _blocked_fill_dataframe - else: - raise TypeError( - f"data must be dask array or dataframe like; found {type(data)}" - ) - - if weight is None: - hists = [ff(s, meta_hist=meta_hist, weight=None) for s in data] - else: - weights = weight.to_delayed() - if len(weights) != len(data): - raise ValueError("data and weight must have the same number of chunks.") - hists = [ff(s, meta_hist=meta_hist, weight=w) for s, w in zip(data, weights)] - - return delayed(sum)(hists) - - -def _fill_multiarg( - *data: DaskCollection, - meta_hist: Histogram, - weight: DaskCollection | None = None, -) -> Delayed: - """Fill nD histogram given a multiarg (vectors) sample. - - This function is compatible with multiple one dimensional - dask.array.Array objects as well as multiple dask.dataframe.Series - objects; they just must have equally sized chunks/partitions. - - """ - D = len(data) - # each entry is data along a specific dimension - delayed_data = [a.to_delayed() for a in data] - # check that all dimensions are chunked identically - npartitions = len(delayed_data[0]) - for i in range(1, D): - if len(delayed_data[i]) != npartitions: - raise ValueError("All dimensions must be chunked/partitioned identically.") - # We need to create a data structure that will connect coordinate - # chunks. We loop over the number of partitions and connect the - # ith chunk along each dimension (the loop over j is the loop over - # the total number of dimensions). - delayed_data = [ - tuple(delayed_data[j][i] for j in range(D)) for i in range(npartitions) - ] - - if weight is None: - hists = [_blocked_fill_multiarg(*d, meta_hist=meta_hist) for d in delayed_data] - else: - weights = weight.to_delayed() - if len(weights) != npartitions: - raise ValueError( - "sample and weight must have the same number of chunks/partitions." 
-
-
-def _fill_multiarg(
-    *data: DaskCollection,
-    meta_hist: Histogram,
-    weight: DaskCollection | None = None,
-) -> Delayed:
-    """Fill an nD histogram given a multiarg (vectors) sample.
-
-    This function is compatible with multiple one dimensional
-    dask.array.Array objects as well as multiple dask.dataframe.Series
-    objects; they just must have equally sized chunks/partitions.
-
-    """
-    D = len(data)
-    # each entry is data along a specific dimension
-    delayed_data = [a.to_delayed() for a in data]
-    # check that all dimensions are chunked identically
-    npartitions = len(delayed_data[0])
-    for i in range(1, D):
-        if len(delayed_data[i]) != npartitions:
-            raise ValueError("All dimensions must be chunked/partitioned identically.")
-    # We need to create a data structure that will connect coordinate
-    # chunks. We loop over the number of partitions and connect the
-    # ith chunk along each dimension (the loop over j is the loop over
-    # the total number of dimensions).
-    delayed_data = [
-        tuple(delayed_data[j][i] for j in range(D)) for i in range(npartitions)
-    ]
-
-    if weight is None:
-        hists = [_blocked_fill_multiarg(*d, meta_hist=meta_hist) for d in delayed_data]
-    else:
-        weights = weight.to_delayed()
-        if len(weights) != npartitions:
-            raise ValueError(
-                "sample and weight must have the same number of chunks/partitions."
-            )
-        hists = [
-            _blocked_fill_multiarg(*d, meta_hist=meta_hist, weight=w)
-            for d, w in zip(delayed_data, weights)
-        ]
-
-    return delayed(sum)(hists)
-
-
 class Histogram(bh.Histogram, family=dask_histogram):
     """Histogram object capable of lazy computation.
 
@@ -346,8 +157,10 @@ def fill(
         weight : dask.array.Array or dask.dataframe.Series, optional
             Weights associated with each sample. The weights must be
             chunked/partitioned in a way compatible with the dataset.
-        sample : Any
-            Unsupported argument from boost_histogram.Histogram.fill.
+        sample : dask.array.Array or dask.dataframe.Series, optional
+            Provide samples if the histogram storage allows it. The
+            partitioning/chunking of the samples must be compatible
+            with the input data.
         threads : int, optional
             Ignored argument kept for compatibility with boost-histogram.
             We let Dask have complete control over threads.
@@ -376,7 +189,7 @@ def fill(
         else:
             raise ValueError(f"Cannot interpret input data: {args}")
 
-        new_fill = factory(*args, histref=self, weights=weight)
+        new_fill = factory(*args, histref=self, weights=weight, sample=sample)
         if self._staged is not None:
             self._staged += new_fill
         else:
@@ -540,14 +353,15 @@ def to_dask_array(
            The edges for each dimension
 
        """
-        if self._storage_type is bh.storage.Int64:
-            dtype: object = np.uint64
-        elif self._storage_type is bh.storage.AtomicInt64:
-            dtype = np.uint64
+        if self._staged is not None:
+            return self._staged.to_dask_array(flow=flow, dd=dd)
         else:
-            dtype = float
-
-        return _to_dask_array(self, dtype=dtype, flow=flow, dd=dd)
+            counts, edges = self.to_numpy(flow=flow, dd=True, view=False)
+            counts = da.from_array(counts)
+            edges = [da.from_array(ea) for ea in edges]
+            if dd:
+                return counts, edges
+            return (counts, *edges)
 
 
 def histogramdd(
diff --git a/src/dask_histogram/core.py b/src/dask_histogram/core.py
index 3b041ac..6f6399e 100644
--- a/src/dask_histogram/core.py
+++ b/src/dask_histogram/core.py
@@ -16,11 +16,9 @@
 from dask.utils import is_dataframe_like, key_split
 
 if TYPE_CHECKING:
-    import numpy as np
+    from numpy.typing import NDArray
 
     from .typing import DaskCollection
-else:
-    DaskCollection = object
 
 __all__ = (
     "AggHistogram",
@@ -52,13 +50,42 @@ def clone(histref: bh.Histogram = None) -> bh.Histogram:
     return bh.Histogram(*histref.axes, storage=histref._storage_type())
 
 
+def _blocked_sa(
+    data: Any,
+    *,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; single argument; unweighted; no sample."""
+    if data.ndim == 1:
+        return clone(histref).fill(data)
+    elif data.ndim == 2:
+        return clone(histref).fill(*(data.T))
+    else:
+        raise ValueError("Data must be one or two dimensional.")
+
+
+def _blocked_sa_s(
+    data: Any,
+    sample: Any,
+    *,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; single argument; unweighted; with sample."""
+    if data.ndim == 1:
+        return clone(histref).fill(data, sample=sample)
+    elif data.ndim == 2:
+        return clone(histref).fill(*(data.T), sample=sample)
+    else:
+        raise ValueError("Data must be one or two dimensional.")
+
+
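The new `*_s` helpers forward `sample=` to `boost_histogram.Histogram.fill`, which only storages such as Mean and WeightedMean accept. For reference, a standalone sketch of the underlying boost-histogram behavior:

```python
import boost_histogram as bh
import numpy as np

rng = np.random.default_rng(42)
h = bh.Histogram(bh.axis.Regular(10, -3, 3), storage=bh.storage.Mean())
x = rng.standard_normal(1000)
s = rng.uniform(2, 8, size=1000)
h.fill(x, sample=s)           # Mean storage accumulates the per-bin mean of s
print(h.view()["value"][:3])  # per-bin means
```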
 def _blocked_sa_w(
     data: Any,
     weights: Any,
     *,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    """Blocked calculation; single argument; weighted."""
+    """Blocked calculation; single argument; weighted; no sample."""
     if data.ndim == 1:
         return clone(histref).fill(data, weight=weights)
     elif data.ndim == 2:
@@ -67,36 +94,76 @@ def _blocked_sa_w(
         raise ValueError("Data must be one or two dimensional.")
 
 
-def _blocked_sa(
+def _blocked_sa_w_s(
     data: Any,
+    weights: Any,
+    sample: Any,
     *,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    """Blocked calculation; single argument; unweighted."""
+    """Blocked calculation; single argument; weighted; with sample."""
     if data.ndim == 1:
-        return clone(histref).fill(data, weight=None)
+        return clone(histref).fill(data, weight=weights, sample=sample)
     elif data.ndim == 2:
-        return clone(histref).fill(*(data.T), weight=None)
+        return clone(histref).fill(*(data.T), weight=weights, sample=sample)
     else:
         raise ValueError("Data must be one or two dimensional.")
 
 
+def _blocked_ma(
+    *data: Any,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; multiargument; unweighted; no sample."""
+    return clone(histref).fill(*data)
+
+
+def _blocked_ma_s(
+    *data: Any,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; multiargument; unweighted; with sample."""
+    sample = data[-1]
+    data = data[:-1]
+    return clone(histref).fill(*data, sample=sample)
+
+
 def _blocked_ma_w(
     *data: Any,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    """Blocked calculation; multiargument; unweighted."""
+    """Blocked calculation; multiargument; weighted; no sample."""
     weights = data[-1]
     data = data[:-1]
     return clone(histref).fill(*data, weight=weights)
 
 
-def _blocked_ma(
+def _blocked_ma_w_s(
     *data: Any,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    """Blocked calculation; multiargument; unweighted."""
-    return clone(histref).fill(*data, weight=None)
+    """Blocked calculation; multiargument; weighted; with sample."""
+    weights = data[-2]
+    sample = data[-1]
+    data = data[:-2]
+    return clone(histref).fill(*data, weight=weights, sample=sample)
+
+
+def _blocked_df(
+    data: Any,
+    *,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; DataFrame argument; unweighted; no sample."""
+    return clone(histref).fill(*(data[c] for c in data.columns), weight=None)
+
+
+def _blocked_df_s(
+    data: Any,
+    sample: Any,
+    *,
+    histref: bh.Histogram = None,
+) -> bh.Histogram:
+    """Blocked calculation; DataFrame argument; unweighted; with sample."""
+    return clone(histref).fill(*(data[c] for c in data.columns), sample=sample)
 
 
 def _blocked_df_w(
@@ -105,15 +172,21 @@ def _blocked_df_w(
     *,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    """Blocked calculation; single argument; weighted."""
+    """Blocked calculation; DataFrame argument; weighted; no sample."""
     return clone(histref).fill(*(data[c] for c in data.columns), weight=weights)
 
 
-def _blocked_df(
+def _blocked_df_w_s(
     data: Any,
+    weights: Any,
+    sample: Any,
+    *,
     histref: bh.Histogram = None,
 ) -> bh.Histogram:
-    return clone(histref).fill(*(data[c] for c in data.columns), weight=None)
+    """Blocked calculation; DataFrame argument; weighted; with sample."""
+    return clone(histref).fill(
+        *(data[c] for c in data.columns), weight=weights, sample=sample
+    )
 
 
 class AggHistogram(db.Item):
@@ -216,7 +289,16 @@ def to_boost(self) -> bh.Histogram:
 
         """
         return self.compute()
 
-    def __array__(self) -> np.ndarray:
+    def values(self, flow: bool = False) -> NDArray[Any]:
+        return self.to_boost().values(flow=flow)
+
+    def variances(self, flow: bool = False) -> NDArray[Any] | None:
+        return self.to_boost().variances(flow=flow)
+
+    def counts(self, flow: bool = False) -> NDArray[Any]:
+        return self.to_boost().counts(flow=flow)
+
+    def __array__(self) -> NDArray[Any]:
         return self.compute().__array__()
 
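The new `values`/`variances`/`counts` accessors materialize the aggregated result, which makes an `AggHistogram` usable where a plain histogram is expected. A small usage sketch against the `factory` function changed later in this diff; the data and axis choices are arbitrary:

```python
import boost_histogram as bh
import dask.array as da
import dask_histogram.core as dhc

x = da.random.standard_normal(size=(2000,), chunks=(500,))
w = da.random.uniform(size=(2000,), chunks=(500,))
h = dhc.factory(
    x,
    axes=(bh.axis.Regular(10, -3, 3),),
    storage=bh.storage.Weight(),
    weights=w,
)
print(h.values())     # triggers the computation; per-bin sums of weights
print(h.variances())  # per-bin sums of squared weights (Weight storage)
```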
     def __iadd__(self, other) -> AggHistogram:
@@ -376,51 +458,85 @@ def _reduction(ph: PartitionedHistogram, split_every: int = None) -> AggHistogram:
 
 def _dependencies(
     *args: DaskCollection,
-    weights: DaskCollection = None,
+    weights: DaskCollection | None = None,
+    sample: DaskCollection | None = None,
 ) -> tuple[DaskCollection, ...]:
-    if weights is not None:
+    if weights is not None and sample is None:
         return (*args, weights)
+    elif weights is None and sample is not None:
+        return (*args, sample)
+    elif weights is not None and sample is not None:
+        return (*args, weights, sample)
     return args
 
 
-def _weight_check(*data: DaskCollection, weights: DaskCollection = None) -> int:
-    if weights is None:
+def _weight_sample_check(
+    *data: DaskCollection,
+    weights: DaskCollection | None = None,
+    sample: DaskCollection | None = None,
+) -> int:
+    if weights is None and sample is None:
         return 0
-    if weights.ndim != 1:
-        raise ValueError("weights must be one dimensional.")
-    if data[0].npartitions != weights.npartitions:
-        raise ValueError("weights must have as many partitions as the data.")
+    if weights is not None:
+        if weights.ndim != 1:
+            raise ValueError("weights must be one dimensional.")
+        if data[0].npartitions != weights.npartitions:
+            raise ValueError("weights must have as many partitions as the data.")
+    if sample is not None:
+        if sample.ndim != 1:
+            raise ValueError("sample must be one dimensional.")
+        if data[0].npartitions != sample.npartitions:
+            raise ValueError("sample must have as many partitions as the data.")
     return 0
 
 
 def _partitioned_histogram(
     *data: DaskCollection,
     histref: bh.Histogram,
-    weights: DaskCollection = None,
-    split_every: int = None,
+    weights: DaskCollection | None = None,
+    sample: DaskCollection | None = None,
+    split_every: int | None = None,
 ) -> AggHistogram:
-    name = f"hist-on-block-{tokenize(data, histref, weights)}"
+    name = f"hist-on-block-{tokenize(data, histref, weights, sample)}"
     data_is_df = is_dataframe_like(data[0])
-    _weight_check(*data, weights=weights)
+    _weight_sample_check(*data, weights=weights, sample=sample)
 
     if len(data) == 1 and not data_is_df:
         x = data[0]
-        if weights is not None:
+        if weights is not None and sample is not None:
+            g = partitionwise(
+                _blocked_sa_w_s, name, x, weights, sample, histref=histref
+            )
+        elif weights is not None and sample is None:
             g = partitionwise(_blocked_sa_w, name, x, weights, histref=histref)
+        elif weights is None and sample is not None:
+            g = partitionwise(_blocked_sa_s, name, x, sample, histref=histref)
         else:
             g = partitionwise(_blocked_sa, name, x, histref=histref)
     elif len(data) == 1 and data_is_df:
         x = data[0]
-        if weights is not None:
+        if weights is not None and sample is not None:
+            g = partitionwise(
+                _blocked_df_w_s, name, x, weights, sample, histref=histref
+            )
+        elif weights is not None and sample is None:
             g = partitionwise(_blocked_df_w, name, x, weights, histref=histref)
+        elif weights is None and sample is not None:
+            g = partitionwise(_blocked_df_s, name, x, sample, histref=histref)
         else:
             g = partitionwise(_blocked_df, name, x, histref=histref)
     else:
-        if weights is not None:
+        if weights is not None and sample is not None:
+            g = partitionwise(
+                _blocked_ma_w_s, name, *data, weights, sample, histref=histref
+            )
+        elif weights is not None and sample is None:
             g = partitionwise(_blocked_ma_w, name, *data, weights, histref=histref)
+        elif weights is None and sample is not None:
+            g = partitionwise(_blocked_ma_s, name, *data, sample, histref=histref)
         else:
             g = partitionwise(_blocked_ma, name, *data, histref=histref)
 
-    dependencies = _dependencies(*data, weights=weights)
+    dependencies = _dependencies(*data, weights=weights, sample=sample)
     hlg = HighLevelGraph.from_collections(name, g, dependencies=dependencies)
     return PartitionedHistogram(hlg, name, data[0].npartitions, histref=histref)
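`_weight_sample_check` requires any weights and samples to be one dimensional and to share the data's partitioning. A quick illustration of the constraint; the values are arbitrary:

```python
import dask.array as da

x = da.random.standard_normal(size=(2000,), chunks=(250,))  # 8 partitions
w = da.random.uniform(size=(2000,), chunks=(250,))          # 8 partitions: accepted
s = da.random.uniform(size=(2000,), chunks=(500,))          # 4 partitions: rejected
assert x.npartitions == w.npartitions == 8
assert s.npartitions == 4  # pairing x with s would raise ValueError
```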
@@ -428,11 +544,16 @@
 def _reduced_histogram(
     *data: DaskCollection,
     histref: bh.Histogram,
-    weights: DaskCollection = None,
-    split_every: int = None,
+    weights: DaskCollection | None = None,
+    sample: DaskCollection | None = None,
+    split_every: int | None = None,
 ) -> AggHistogram:
     ph = _partitioned_histogram(
-        *data, histref=histref, weights=weights, split_every=split_every
+        *data,
+        histref=histref,
+        weights=weights,
+        sample=sample,
+        split_every=split_every,
     )
     return ph.to_agg(split_every=split_every)
@@ -524,11 +645,12 @@ def __call__(self, a: AggHistogram, b: AggHistogram) -> AggHistogram:
 
 def factory(
     *data: DaskCollection,
-    histref: bh.Histogram = None,
-    axes: Sequence[bh.axis.Axis] = None,
-    storage: bh.storage.Storage = None,
-    weights: DaskCollection = None,
-    split_every: int = None,
+    histref: bh.Histogram | None = None,
+    axes: Sequence[bh.axis.Axis] | None = None,
+    storage: bh.storage.Storage | None = None,
+    weights: DaskCollection | None = None,
+    sample: DaskCollection | None = None,
+    split_every: int | None = None,
     keep_partitioned: bool = False,
 ) -> AggHistogram | PartitionedHistogram:
     """Daskified Histogram collection factory function.
@@ -565,6 +687,10 @@ def factory(
     weights : DaskCollection, optional
         Weights associated with the `data`. The partitioning/chunking
         of the weights must be compatible with the input data.
+    sample : DaskCollection, optional
+        Provide samples if the histogram storage allows it. The
+        partitioning/chunking of the samples must be compatible with
+        the input data.
     split_every : int, optional
         How many blocks to use in each split during aggregation.
     keep_partitioned : bool, optional
@@ -632,4 +758,10 @@ def factory(
         storage = bh.storage.Double()
     histref = bh.Histogram(*axes, storage=storage)  # type: ignore
     f = _partitioned_histogram if keep_partitioned else _reduced_histogram
-    return f(*data, histref=histref, weights=weights, split_every=split_every)
+    return f(
+        *data,
+        histref=histref,
+        weights=weights,
+        sample=sample,
+        split_every=split_every,
+    )
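With `sample=` plumbed through `factory`, a mean-profile histogram can now be built lazily end to end. A hedged usage sketch, assuming a storage that accepts samples:

```python
import boost_histogram as bh
import dask.array as da
import dask_histogram.core as dhc

x = da.random.standard_normal(size=(2000,), chunks=(500,))
s = da.random.uniform(2, 8, size=(2000,), chunks=(500,))
h = dhc.factory(
    x,
    axes=(bh.axis.Regular(10, -3, 3),),
    storage=bh.storage.Mean(),  # a sample-aware storage
    sample=s,
)
print(h.compute().view()["value"][:3])  # per-bin means of s
```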
diff --git a/tests/test_core.py b/tests/test_core.py
index c8ffd41..e30c664 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -11,20 +11,41 @@
 import dask_histogram.core as dhc
 
 
+def _gen_storage(weights, sample):
+    """Pick the storage that matches the fill arguments under test."""
+    if weights is not None and sample is not None:
+        store = bh.storage.WeightedMean()
+    elif weights is None and sample is not None:
+        store = bh.storage.Mean()
+    elif weights is not None and sample is None:
+        store = bh.storage.Weight()
+    else:
+        store = bh.storage.Double()
+    return store
+
+
 @pytest.mark.parametrize("weights", [True, None])
-def test_1d_array(weights):
-    h = bh.Histogram(bh.axis.Regular(10, -3, 3), storage=bh.storage.Weight())
+@pytest.mark.parametrize("sample", [True, None])
+def test_1d_array(weights, sample):
     if weights is not None:
         weights = da.random.uniform(size=(2000,), chunks=(250,))
+    if sample is not None:
+        sample = da.random.uniform(2, 8, size=(2000,), chunks=(250,))
+    store = _gen_storage(weights, sample)
+    h = bh.Histogram(bh.axis.Regular(10, -3, 3), storage=store)
     x = da.random.standard_normal(size=(2000,), chunks=(250,))
-    dh = dhc.factory(x, histref=h, weights=weights, split_every=4)
-    h.fill(x.compute(), weight=weights.compute() if weights is not None else None)
+    dh = dhc.factory(x, histref=h, weights=weights, split_every=4, sample=sample)
+    h.fill(
+        x.compute(),
+        weight=weights.compute() if weights is not None else None,
+        sample=sample.compute() if sample is not None else None,
+    )
     np.testing.assert_allclose(h.counts(flow=True), dh.compute().counts(flow=True))
 
 
 @pytest.mark.parametrize("weights", [True, None])
 @pytest.mark.parametrize("shape", ((2000,), (2000, 2), (2000, 3), (2000, 4)))
-def test_array_input(weights, shape):
+@pytest.mark.parametrize("sample", [True, None])
+def test_array_input(weights, shape, sample):
     oned = len(shape) < 2
     x = da.random.standard_normal(
         size=shape, chunks=(200,) if oned else (200, shape[1])
@@ -32,12 +53,18 @@ def test_array_input(weights, shape):
     xc = (x.compute(),) if oned else x.compute().T
     ax = bh.axis.Regular(10, -3, 3)
     axes = (ax,) if oned else (ax,) * shape[1]
-    weights = (
-        da.random.uniform(size=(2000,), chunks=(200,)) if weights is not None else None
+    if weights is not None:
+        weights = da.random.uniform(size=(2000,), chunks=(200,))
+    if sample is not None:
+        sample = da.random.uniform(3, 9, size=(2000,), chunks=(200,))
+    store = _gen_storage(weights, sample)
+    h = bh.Histogram(*axes, storage=store)
+    dh = dhc.factory(x, histref=h, weights=weights, split_every=4, sample=sample)
+    h.fill(
+        *xc,
+        weight=weights.compute() if weights is not None else None,
+        sample=sample.compute() if sample is not None else None,
     )
-    h = bh.Histogram(*axes, storage=bh.storage.Weight())
-    dh = dhc.factory(x, histref=h, weights=weights, split_every=4)
-    h.fill(*xc, weight=weights.compute() if weights is not None else None)
     np.testing.assert_allclose(h.counts(flow=True), dh.compute().counts(flow=True))
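`_gen_storage` encodes which boost-histogram storages accept which fill arguments; the mapping can be checked directly against boost-histogram with concrete arrays:

```python
import boost_histogram as bh
import numpy as np

x = np.random.standard_normal(100)
w = np.random.uniform(size=100)
s = np.random.uniform(2, 8, size=100)
ax = bh.axis.Regular(10, -3, 3)

bh.Histogram(ax, storage=bh.storage.Double()).fill(x)
bh.Histogram(ax, storage=bh.storage.Weight()).fill(x, weight=w)
bh.Histogram(ax, storage=bh.storage.Mean()).fill(x, sample=s)
bh.Histogram(ax, storage=bh.storage.WeightedMean()).fill(x, weight=w, sample=s)
```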