Adding support for sample argument; some cleaning up. (#10)

douglasdavis authored Oct 26, 2021
1 parent 9832164 commit 989ee59

Showing 4 changed files with 225 additions and 252 deletions.
4 changes: 2 additions & 2 deletions docs/dh_demo.ipynb
@@ -43,7 +43,7 @@
},
"outputs": [],
"source": [
"h = dh.Histogram(dh.axis.Regular(50, -3, 3), dh.axis.Regular(50, -3, 3))\n",
"h = dh.Histogram(dh.axis.Regular(50, -3, 3), dh.axis.Regular(50, -3, 3))\n",
"h_concrete = bh.Histogram(*h.axes)"
]
},
@@ -85,7 +85,7 @@
"metadata": {},
"outputs": [],
"source": [
"h.fill(x, y) # data is still lazy"
"h.fill(x, y) # data is still lazy"
]
},
{
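The notebook change above touches the staged-fill demo. For context, here is a minimal sketch of that workflow; the arrays x and y and the final compute() call are illustrative assumptions, not part of this diff:

import dask.array as da
import dask_histogram as dh

# Two lazy coordinate arrays; nothing is computed when they are defined.
x = da.random.standard_normal(size=(100_000,), chunks=10_000)
y = da.random.standard_normal(size=(100_000,), chunks=10_000)

h = dh.Histogram(dh.axis.Regular(50, -3, 3), dh.axis.Regular(50, -3, 3))
h.fill(x, y)          # data is still lazy; the fill is staged, not executed
result = h.compute()  # materializes the staged fill into concrete counts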
212 changes: 13 additions & 199 deletions src/dask_histogram/boost.py
@@ -10,7 +10,6 @@
 import boost_histogram.axis as axis
 import boost_histogram.storage as storage
 import dask.array as da
-import numpy as np
 from dask.base import is_dask_collection
 from dask.delayed import Delayed, delayed
 from dask.utils import is_arraylike, is_dataframe_like
@@ -20,200 +19,12 @@

if TYPE_CHECKING:
    from .typing import BinArg, BinType, DaskCollection, RangeArg, RangeType
else:
    DaskCollection = object

import dask_histogram

__all__ = ("Histogram", "histogram", "histogram2d", "histogramdd")


@delayed
def _blocked_fill_1d(
    data: Any,
    meta_hist: Histogram,
    weight: Any | None = None,
):
    """Single delayed (1D) histogram concrete fill."""
    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
    hfb.concrete_fill(data, weight=weight)
    return hfb


@delayed
def _blocked_fill_rectangular(
    data: Any,
    meta_hist: Histogram,
    weight: Any | None = None,
):
    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
    hfb.concrete_fill(*(data.T), weight=weight)
    return hfb


@delayed
def _blocked_fill_dataframe(
    data: Any,
    meta_hist: Histogram,
    weight: Any | None = None,
):
    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
    hfb.concrete_fill(*(data[c] for c in data.columns), weight=weight)
    return hfb


@delayed
def _blocked_fill_multiarg(
    *args: np.ndarray,
    meta_hist: Histogram,
    weight: Any | None = None,
):
    """Single delayed (nD) histogram concrete fill."""
    hfb = Histogram(*meta_hist.axes, storage=meta_hist._storage_type())
    hfb.concrete_fill(*args, weight=weight)
    return hfb


@delayed
def _delayed_to_numpy(hist: Histogram, flow: bool = False):
    return hist.to_numpy(flow=flow, dd=True)[0:1]


def _to_dask_array(
    hist: Histogram, dtype: Any, flow: bool = False, dd: bool = False
) -> tuple[da.Array, tuple[da.Array, ...]] | tuple[da.Array, ...]:
    shape = hist.shape
    s1 = hist.to_delayed()  # delayed sum of histogram
    s2 = _delayed_to_numpy(s1, flow=flow)
    arr = da.from_delayed(s2, shape=shape, dtype=dtype)
    edges = (da.asarray(a.edges) for a in hist.axes)
    if dd:
        return (arr, list(edges))
    else:
        return (arr, *(tuple(edges)))


def _fill_1d(
    data: DaskCollection,
    *,
    meta_hist: Histogram,
    weight: DaskCollection | None = None,
) -> Delayed:
    """Fill a one dimensional histogram.

    This function is compatible with dask.array.Array objects and
    dask.dataframe.Series objects.
    """
    data = data.to_delayed()  # type: ignore
    if weight is None:
        hists = [_blocked_fill_1d(a, meta_hist) for a in data]
    else:
        weights = weight.to_delayed()
        hists = [_blocked_fill_1d(a, meta_hist, w) for a, w in zip(data, weights)]
    return delayed(sum)(hists)
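_fill_1d is a map-then-reduce: one delayed fill per chunk, then a delayed sum (histograms support +, which is why sum() combines them, as the code above relies on). The same pattern in miniature, with illustrative names and plain boost-histogram objects:

import boost_histogram as bh
import dask.array as da
from dask import delayed

@delayed
def fill_chunk(chunk):
    # One concrete histogram per chunk of the input data.
    h = bh.Histogram(bh.axis.Regular(50, -3, 3))
    h.fill(chunk)
    return h

x = da.random.standard_normal(size=(40_000,), chunks=10_000)
parts = [fill_chunk(c) for c in x.to_delayed()]  # four delayed fills
total = delayed(sum)(parts)  # bin-wise sum of the per-chunk histograms
print(total.compute())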


def _fill_rectangular(
    data: DaskCollection,
    *,
    meta_hist: Histogram,
    weight: DaskCollection | None = None,
) -> Delayed:
    """Fill an nD histogram given a rectangular (multi-column) dataset.

    Array Input
    -----------
    If a multi-column dask.array.Array is passed to `fill`, we want to
    avoid having to compute the transpose of the _entire collection_
    (this may be an expensive and unnecessary computation).

    For this to work the input data can be chunked only along the row
    axis; we convert the whole collection (nD array) to delayed which
    gives us a 2D array of Delayed objects with a shape of the form
    (n_row_chunks, n_column_chunks). We transpose this and take the
    first and only element along the n_column_chunks dimension. This
    gives us a list of Delayed objects along the row dimension (each
    object wraps a multidimensional NumPy array).

    Finally, we loop over the list of Delayed objects and compute the
    transpose on each chunk _as necessary_ when the materialized array
    (a subset of the original complete collection) is used.

    DataFrame Input
    ---------------
    DataFrames are a bit simpler; we just use to_delayed() and pass
    each partition to the dedicated @delayed fill function.
    """
    if is_arraylike(data):
        data = data.to_delayed().T[0]  # type: ignore
        ff = _blocked_fill_rectangular
    elif is_dataframe_like(data):
        data = data.to_delayed()  # type: ignore
        ff = _blocked_fill_dataframe
    else:
        raise TypeError(
            f"data must be dask array or dataframe like; found {type(data)}"
        )

    if weight is None:
        hists = [ff(s, meta_hist=meta_hist, weight=None) for s in data]
    else:
        weights = weight.to_delayed()
        if len(weights) != len(data):
            raise ValueError("data and weight must have the same number of chunks.")
        hists = [ff(s, meta_hist=meta_hist, weight=w) for s, w in zip(data, weights)]

    return delayed(sum)(hists)
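The block-grid transpose described in the docstring, in isolation; the array shape and chunking here are illustrative assumptions:

import dask.array as da

# Rectangular data chunked only along rows, as the docstring requires.
xy = da.random.standard_normal(size=(100_000, 2), chunks=(25_000, 2))

blocks = xy.to_delayed()  # NumPy object array of Delayed blocks
print(blocks.shape)       # (4, 1): four row chunks, one column chunk
row_blocks = blocks.T[0]  # the four Delayed 2D chunks; no data is transposed
print(len(row_blocks))    # 4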


def _fill_multiarg(
    *data: DaskCollection,
    meta_hist: Histogram,
    weight: DaskCollection | None = None,
) -> Delayed:
    """Fill an nD histogram given a multiarg (vectors) sample.

    This function is compatible with multiple one dimensional
    dask.array.Array objects as well as multiple dask.dataframe.Series
    objects; they need only have equally sized chunks/partitions.
    """
    D = len(data)
    # each entry is data along a specific dimension
    delayed_data = [a.to_delayed() for a in data]
    # check that all dimensions are chunked identically
    npartitions = len(delayed_data[0])
    for i in range(1, D):
        if len(delayed_data[i]) != npartitions:
            raise ValueError("All dimensions must be chunked/partitioned identically.")
    # We need to create a data structure that will connect coordinate
    # chunks. We loop over the number of partitions and connect the
    # ith chunk along each dimension (the loop over j is the loop over
    # the total number of dimensions).
    delayed_data = [
        tuple(delayed_data[j][i] for j in range(D)) for i in range(npartitions)
    ]

    if weight is None:
        hists = [_blocked_fill_multiarg(*d, meta_hist=meta_hist) for d in delayed_data]
    else:
        weights = weight.to_delayed()
        if len(weights) != npartitions:
            raise ValueError(
                "sample and weight must have the same number of chunks/partitions."
            )
        hists = [
            _blocked_fill_multiarg(*d, meta_hist=meta_hist, weight=w)
            for d, w in zip(delayed_data, weights)
        ]

    return delayed(sum)(hists)
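The regrouping step above is equivalent to zipping the per-dimension chunk lists; a tiny illustration with stand-in values:

# Stand-ins for the Delayed chunks of two coordinate arrays.
dim_x = ["x0", "x1", "x2"]
dim_y = ["y0", "y1", "y2"]
delayed_data = [dim_x, dim_y]

D = len(delayed_data)
npartitions = len(delayed_data[0])
# Connect the i-th chunk of every dimension, as in _fill_multiarg.
grouped = [tuple(delayed_data[j][i] for j in range(D)) for i in range(npartitions)]
print(grouped)                              # [('x0', 'y0'), ('x1', 'y1'), ('x2', 'y2')]
print(grouped == list(zip(*delayed_data)))  # True: the regrouping is just a zip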


class Histogram(bh.Histogram, family=dask_histogram):
    """Histogram object capable of lazy computation.
@@ -346,8 +157,10 @@ def fill(
         weight : dask.array.Array or dask.dataframe.Series, optional
             Weights associated with each sample. The weights must be
             chunked/partitioned in a way compatible with the dataset.
-        sample : Any
-            Unsupported argument from boost_histogram.Histogram.fill.
+        sample : dask.array.Array or dask.dataframe.Series, optional
+            Provide samples if the histogram storage allows it. The
+            partitioning/chunking of the samples must be compatible
+            with the input data.
         threads : int, optional
             Ignored argument kept for compatibility with boost-histogram.
             We let Dask have complete control over threads.
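With this change, storages that accept a sample (e.g. bh.storage.Mean) can be filled lazily. A hedged sketch of the new argument in use; the array names and sizes are illustrative assumptions:

import boost_histogram as bh
import dask.array as da
import dask_histogram as dh

x = da.random.standard_normal(size=(50_000,), chunks=10_000)
s = da.random.uniform(size=(50_000,), chunks=10_000)  # one sample per entry

# Mean storage tracks the mean of the sample in each bin.
h = dh.Histogram(dh.axis.Regular(40, -3, 3), storage=bh.storage.Mean())
h.fill(x, sample=s)  # staged; samples must be chunked like the data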
@@ -376,7 +189,7 @@ def fill(
         else:
             raise ValueError(f"Cannot interpret input data: {args}")

-        new_fill = factory(*args, histref=self, weights=weight)
+        new_fill = factory(*args, histref=self, weights=weight, sample=sample)
         if self._staged is not None:
             self._staged += new_fill
         else:
@@ -540,14 +353,15 @@ def to_dask_array(
             The edges for each dimension
         """
-        if self._storage_type is bh.storage.Int64:
-            dtype: object = np.uint64
-        elif self._storage_type is bh.storage.AtomicInt64:
-            dtype = np.uint64
-        else:
-            dtype = float
-
-        return _to_dask_array(self, dtype=dtype, flow=flow, dd=dd)
+        if self._staged is not None:
+            return self._staged.to_dask_array(flow=flow, dd=dd)
+
+        counts, edges = self.to_numpy(flow=flow, dd=True, view=False)
+        counts = da.from_array(counts)
+        edges = [da.from_array(ea) for ea in edges]
+        if dd:
+            return counts, edges
+        return (counts, *edges)


def histogramdd(
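A short sketch of the reworked to_dask_array on a staged histogram, assuming the staged branch mirrors the eager return shape; names here are illustrative:

import dask.array as da
import dask_histogram as dh

x = da.random.standard_normal(size=(20_000,), chunks=5_000)
h = dh.Histogram(dh.axis.Regular(25, -3, 3))
h.fill(x)  # staged

counts, edges = h.to_dask_array(flow=False, dd=False)
print(counts)           # lazy dask array of bin counts
print(edges.compute())  # 26 bin edges for the single Regular axis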
