Skip to content

Commit

Permalink
Add CUDA support for std and var reductions (#1267)
Browse files Browse the repository at this point in the history
* Add UsesCudaMutex enum

* Implement global cuda mutex locking

* Add tests for CUDA std and var reductions

* Update reduction table in docs
  • Loading branch information
ianthomas23 authored Aug 15, 2023
1 parent 812bcf6 commit fe567d0
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 62 deletions.
39 changes: 24 additions & 15 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import xarray as xr

from .antialias import AntialiasCombination
from .reductions import SpecialColumn, by, category_codes, summary
from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary
from .utils import (isnull, ngjit, parallel_fill,
nanmax_in_place, nanmin_in_place, nansum_in_place, nanfirst_in_place, nanlast_in_place,
nanmax_n_in_place_3d, nanmax_n_in_place_4d, nanmin_n_in_place_3d, nanmin_n_in_place_4d,
Expand Down Expand Up @@ -134,8 +134,8 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
combine_temps = list(pluck(5, calls))

create = make_create(bases, dshapes, cuda)
append, uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, uses_cuda_mutex)
append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, any_uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)

Expand Down Expand Up @@ -252,7 +252,7 @@ def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, part
base.nan_check_column, # column used to check for NaNs in some where reductions
base._build_temps(cuda), # temps
base._build_combine_temps(cuda, partitioned), # combine temps
cuda and base.uses_cuda_mutex(), # uses cuda mutex
base.uses_cuda_mutex() if cuda else UsesCudaMutex.No, # uses cuda mutex
base.is_categorical(),
)

Expand All @@ -267,7 +267,7 @@ def make_create(bases, dshapes, cuda):
return lambda shape: tuple(c(shape, array_module) for c in creators)


def make_info(cols, uses_cuda_mutex):
def make_info(cols, uses_cuda_mutex: bool):
def info(df, canvas_shape):
ret = tuple(c.apply(df) for c in cols)
if uses_cuda_mutex:
Expand All @@ -291,7 +291,8 @@ def make_append(bases, cols, calls, glyph, antialias):
need_isnull = any(call[3] for call in calls)
if need_isnull:
namespace["isnull"] = isnull
any_uses_cuda_mutex = any(call[6] for call in calls)
global_cuda_mutex = any(call[6] == UsesCudaMutex.Global for call in calls)
any_uses_cuda_mutex = any(call[6] != UsesCudaMutex.No for call in calls)
if any_uses_cuda_mutex:
# This adds an argument to the append() function that is the cuda mutex
# generated in make_info.
Expand All @@ -308,11 +309,16 @@ def make_append(bases, cols, calls, glyph, antialias):
subscript = ', '.join(['i' + str(n) for n in range(ndims)])
else:
subscript = None
prev_cuda_mutex = False
prev_local_cuda_mutex = False
categorical_args = {} # Reuse categorical arguments if used in more than one reduction

def get_cuda_mutex_call(lock: bool) -> str:
func = "cuda_mutex_lock" if lock else "cuda_mutex_unlock"
return f'{func}({arg_lk["_cuda_mutex"]}, (y, x))'

for index, (func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex, categorical) \
in enumerate(calls):
local_cuda_mutex = not global_cuda_mutex and uses_cuda_mutex == UsesCudaMutex.Local
local_lk.update(zip(temps, (next(names) for i in temps)))
func_name = next(names)
namespace[func_name] = func
Expand Down Expand Up @@ -345,7 +351,7 @@ def make_append(bases, cols, calls, glyph, antialias):
if antialias:
args.append("aa_factor")

if uses_cuda_mutex and prev_cuda_mutex:
if local_cuda_mutex and prev_local_cuda_mutex:
# Avoid unnecessary mutex unlock and lock cycle
body.pop()

Expand All @@ -357,8 +363,8 @@ def make_append(bases, cols, calls, glyph, antialias):
# where reduction needs access to the return of the contained
# reduction, which is the preceding one here.
prev_body = body.pop()
if uses_cuda_mutex and not prev_cuda_mutex:
body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
if local_cuda_mutex and not prev_local_cuda_mutex:
body.append(get_cuda_mutex_call(True))
body.append(f'{update_index_arg_name} = {prev_body}')

# If nan_check_column is defined then need to check if value of
Expand All @@ -376,23 +382,26 @@ def make_append(bases, cols, calls, glyph, antialias):
body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
body.append(f' {whitespace}{func_name}(x, y, {", ".join(args)})')
else:
if uses_cuda_mutex and not prev_cuda_mutex:
body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
if local_cuda_mutex and not prev_local_cuda_mutex:
body.append(get_cuda_mutex_call(True))
if nan_check_column:
var = f"{arg_lk[nan_check_column]}[{subscript}]"
body.append(f'if not isnull({var}):')
body.append(f' {func_name}(x, y, {", ".join(args)})')
else:
body.append(f'{func_name}(x, y, {", ".join(args)})')

if uses_cuda_mutex:
body.append(f'cuda_mutex_unlock({arg_lk["_cuda_mutex"]}, (y, x))')
if local_cuda_mutex:
body.append(get_cuda_mutex_call(False))

prev_cuda_mutex = uses_cuda_mutex
prev_local_cuda_mutex = local_cuda_mutex

body = head + ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
for agg, name in local_lk.items()] + body

if global_cuda_mutex:
body = [get_cuda_mutex_call(True)] + body + [get_cuda_mutex_call(False)]

if antialias:
signature.insert(0, "aa_factor")

Expand Down
62 changes: 45 additions & 17 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ class SpecialColumn(Enum):
RowIndex = 1


class UsesCudaMutex(Enum):
"""
Enum that encapsulates the need for a Reduction to use a CUDA mutex to
operate correctly on a GPU. Possible values:
No: the Reduction append_cuda function is atomic and no mutex is required.
Local: Reduction append_cuda needs wrapping in a mutex.
Global: the overall compiled append function needs wrapping in a mutex.
"""
No = 0
Local = 1
Global = 2


class Preprocess(Expr):
"""Base clase for preprocessing steps."""
def __init__(self, column: str | SpecialColumn | None):
Expand Down Expand Up @@ -292,15 +306,15 @@ def nan_check_column(self):
else:
return None

def uses_cuda_mutex(self):
def uses_cuda_mutex(self) -> UsesCudaMutex:
"""Return ``True`` if this Reduction needs to use a CUDA mutex to
ensure that it is threadsafe across CUDA threads.
If the CUDA append functions are all atomic (i.e. using functions from
the numba.cuda.atomic module) then this is ``False``, otherwise it is
``True``.
"""
return False
return UsesCudaMutex.No

def uses_row_index(self, cuda, partitioned):
"""Return ``True`` if this Reduction uses a row index virtual column.
Expand Down Expand Up @@ -707,7 +721,7 @@ def is_where(self):
def nan_check_column(self):
return self.reduction.nan_check_column

def uses_cuda_mutex(self):
def uses_cuda_mutex(self) -> UsesCudaMutex:
return self.reduction.uses_cuda_mutex()

def uses_row_index(self, cuda, partitioned):
Expand Down Expand Up @@ -1044,10 +1058,10 @@ class m2(FloatingReduction):
Name of the column to aggregate over. Column data type must be numeric.
``NaN`` values in the column are skipped.
"""
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Global

def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
if cuda:
raise ValueError("""\
The 'std' and 'var' reduction operations are not yet supported on the GPU""")
return super(m2, self)._build_append(dshape, schema, cuda, antialias, self_intersect)

def _build_create(self, required_dshape):
Expand All @@ -1056,6 +1070,7 @@ def _build_create(self, required_dshape):
def _build_temps(self, cuda=False):
return (_sum_zero(self.column), count(self.column))

# CPU append functions
@staticmethod
@ngjit
def _append(x, y, m2, field, sum, count):
Expand All @@ -1069,6 +1084,20 @@ def _append(x, y, m2, field, sum, count):
return 0
return -1

# GPU append functions
@staticmethod
@nb_cuda.jit(device=True)
def _append_cuda(x, y, m2, field, sum, count):
# sum & count are the results of sum[y, x], count[y, x] before being
# updated by field
if not isnull(field):
if count > 0:
u1 = np.float64(sum) / count
u = np.float64(sum + field) / (count + 1)
m2[y, x] += (field - u1) * (field - u)
return 0
return -1

@staticmethod
def _combine(Ms, sums, ns):
with np.errstate(divide='ignore', invalid='ignore'):
Expand Down Expand Up @@ -1519,8 +1548,8 @@ def _create_row_index_selector(self):


class max_n(FloatingNReduction):
def uses_cuda_mutex(self):
return True
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Local

def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
return (AntialiasStage2(AntialiasCombination.MAX, array_module.nan, n_reduction=True),)
Expand Down Expand Up @@ -1596,8 +1625,8 @@ def _combine_cuda(aggs):


class min_n(FloatingNReduction):
def uses_cuda_mutex(self):
return True
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Local

def _antialias_requires_2_stages(self):
return True
Expand Down Expand Up @@ -1759,8 +1788,8 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned):
else:
return dshape(ct.float64)

def uses_cuda_mutex(self):
return True
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Local

def uses_row_index(self, cuda, partitioned):
return self.column == SpecialColumn.RowIndex or self.selector.uses_row_index(cuda, partitioned)
Expand Down Expand Up @@ -2163,8 +2192,8 @@ def _antialias_requires_2_stages(self):
def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]:
return (AntialiasStage2(AntialiasCombination.MIN, -1),)

def uses_cuda_mutex(self):
return True
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Local

# CPU append functions
@staticmethod
Expand Down Expand Up @@ -2235,13 +2264,12 @@ def __init__(self, n=1):
def out_dshape(self, in_dshape, antialias, cuda, partitioned):
return dshape(ct.int64)

def uses_cuda_mutex(self):
return True
def uses_cuda_mutex(self) -> UsesCudaMutex:
return UsesCudaMutex.Local

def uses_row_index(self, cuda, partitioned):
return True


def _build_combine(self, dshape, antialias, cuda, partitioned):
if cuda:
return self._combine_cuda
Expand Down
16 changes: 0 additions & 16 deletions datashader/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,6 @@ def test_mean(ddf, npartitions):
@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_var(ddf, npartitions):
if dask_cudf and isinstance(ddf, dask_cudf.DataFrame):
pytest.skip("var not supported with cudf")

ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
out = xr.DataArray(
Expand All @@ -832,9 +829,6 @@ def test_var(ddf, npartitions):
@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_std(ddf, npartitions):
if dask_cudf and isinstance(ddf, dask_cudf.DataFrame):
pytest.skip("std not supported with cudf")

ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
out = xr.DataArray(
Expand Down Expand Up @@ -1030,10 +1024,6 @@ def test_categorical_mean_binning(ddf, npartitions):
@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_var(ddf, npartitions):
if cudf and isinstance(ddf._meta, cudf.DataFrame):
pytest.skip(
"The 'var' reduction is yet supported on the GPU"
)
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
sol = np.array([[[ 2.5, nan, nan, nan],
Expand Down Expand Up @@ -1074,10 +1064,6 @@ def test_categorical_var(ddf, npartitions):
@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_categorical_std(ddf, npartitions):
if cudf and isinstance(ddf._meta, cudf.DataFrame):
pytest.skip(
"The 'std' reduction is yet supported on the GPU"
)
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
sol = np.sqrt(np.array([
Expand Down Expand Up @@ -1120,8 +1106,6 @@ def test_categorical_std(ddf, npartitions):
@pytest.mark.parametrize('ddf', ddfs)
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_multiple_aggregates(ddf, npartitions):
if dask_cudf and isinstance(ddf, dask_cudf.DataFrame):
pytest.skip("std not supported with cudf")
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions
agg = c.points(ddf, 'x', 'y',
Expand Down
14 changes: 2 additions & 12 deletions datashader/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ def test_mean(df):
assert_eq_xr(c.points(df, 'x', 'y', ds.mean('f64')), out)


@pytest.mark.parametrize('df', [df_pd])
@pytest.mark.parametrize('df', dfs)
def test_var(df):
out = xr.DataArray(values(df.i32).reshape((2, 2, 5)).var(axis=2, dtype='f8').T,
coords=coords, dims=dims)
Expand All @@ -708,7 +708,7 @@ def test_var(df):
assert_eq_xr(c.points(df, 'x', 'y', ds.var('f64')), out)


@pytest.mark.parametrize('df', [df_pd])
@pytest.mark.parametrize('df', dfs)
def test_std(df):
out = xr.DataArray(values(df.i32).reshape((2, 2, 5)).std(axis=2, dtype='f8').T,
coords=coords, dims=dims)
Expand Down Expand Up @@ -917,11 +917,6 @@ def test_categorical_mean_binning(df):

@pytest.mark.parametrize('df', dfs)
def test_categorical_var(df):
if cudf and isinstance(df, cudf.DataFrame):
pytest.skip(
"The 'var' reduction is yet supported on the GPU"
)

sol = np.array([[[ 2.5, nan, nan, nan],
[ nan, nan, 2., nan]],
[[ nan, 2., nan, nan],
Expand Down Expand Up @@ -953,11 +948,6 @@ def test_categorical_var(df):

@pytest.mark.parametrize('df', dfs)
def test_categorical_std(df):
if cudf and isinstance(df, cudf.DataFrame):
pytest.skip(
"The 'std' reduction is yet supported on the GPU"
)

sol = np.sqrt(np.array([
[[ 2.5, nan, nan, nan],
[ nan, nan, 2., nan]],
Expand Down
4 changes: 2 additions & 2 deletions doc/reduction.csv
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
:class:`~datashader.reductions.mean`, yes, yes, yes, yes, yes,
:class:`~datashader.reductions.min`, yes, yes, yes, yes, yes, yes
:class:`~datashader.reductions.min_n`, yes, yes, yes, yes, , yes
:class:`~datashader.reductions.std`, yes, yes, , , ,
:class:`~datashader.reductions.std`, yes, yes, yes, yes, ,
:class:`~datashader.reductions.sum`, yes, yes, yes, yes, yes,
:class:`~datashader.reductions.var`, yes, yes, , , ,
:class:`~datashader.reductions.var`, yes, yes, yes, yes, ,

0 comments on commit fe567d0

Please sign in to comment.