From fe567d0797257b0ef6873cf62305be45e4724cf7 Mon Sep 17 00:00:00 2001 From: Ian Thomas Date: Tue, 15 Aug 2023 09:17:43 +0100 Subject: [PATCH] Add CUDA support for std and var reductions (#1267) * Add UsesCudaMutex enum * Implement global cuda mutex locking * Add tests for CUDA std and var reductions * Update reduction table in docs --- datashader/compiler.py | 39 +++++++++++++-------- datashader/reductions.py | 62 ++++++++++++++++++++++++--------- datashader/tests/test_dask.py | 16 --------- datashader/tests/test_pandas.py | 14 ++------ doc/reduction.csv | 4 +-- 5 files changed, 73 insertions(+), 62 deletions(-) diff --git a/datashader/compiler.py b/datashader/compiler.py index cb0e518ee..22ee48dfb 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -8,7 +8,7 @@ import xarray as xr from .antialias import AntialiasCombination -from .reductions import SpecialColumn, by, category_codes, summary +from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary from .utils import (isnull, ngjit, parallel_fill, nanmax_in_place, nanmin_in_place, nansum_in_place, nanfirst_in_place, nanlast_in_place, nanmax_n_in_place_3d, nanmax_n_in_place_4d, nanmin_n_in_place_3d, nanmin_n_in_place_4d, @@ -134,8 +134,8 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti combine_temps = list(pluck(5, calls)) create = make_create(bases, dshapes, cuda) - append, uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias) - info = make_info(cols, uses_cuda_mutex) + append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias) + info = make_info(cols, any_uses_cuda_mutex) combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned) finalize = make_finalize(bases, agg, schema, cuda, partitioned) @@ -252,7 +252,7 @@ def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, part base.nan_check_column, # column used to check for NaNs in some where reductions base._build_temps(cuda), # temps base._build_combine_temps(cuda, partitioned), # combine temps - cuda and base.uses_cuda_mutex(), # uses cuda mutex + base.uses_cuda_mutex() if cuda else UsesCudaMutex.No, # uses cuda mutex base.is_categorical(), ) @@ -267,7 +267,7 @@ def make_create(bases, dshapes, cuda): return lambda shape: tuple(c(shape, array_module) for c in creators) -def make_info(cols, uses_cuda_mutex): +def make_info(cols, uses_cuda_mutex: bool): def info(df, canvas_shape): ret = tuple(c.apply(df) for c in cols) if uses_cuda_mutex: @@ -291,7 +291,8 @@ def make_append(bases, cols, calls, glyph, antialias): need_isnull = any(call[3] for call in calls) if need_isnull: namespace["isnull"] = isnull - any_uses_cuda_mutex = any(call[6] for call in calls) + global_cuda_mutex = any(call[6] == UsesCudaMutex.Global for call in calls) + any_uses_cuda_mutex = any(call[6] != UsesCudaMutex.No for call in calls) if any_uses_cuda_mutex: # This adds an argument to the append() function that is the cuda mutex # generated in make_info. @@ -308,11 +309,16 @@ def make_append(bases, cols, calls, glyph, antialias): subscript = ', '.join(['i' + str(n) for n in range(ndims)]) else: subscript = None - prev_cuda_mutex = False + prev_local_cuda_mutex = False categorical_args = {} # Reuse categorical arguments if used in more than one reduction + def get_cuda_mutex_call(lock: bool) -> str: + func = "cuda_mutex_lock" if lock else "cuda_mutex_unlock" + return f'{func}({arg_lk["_cuda_mutex"]}, (y, x))' + for index, (func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex, categorical) \ in enumerate(calls): + local_cuda_mutex = not global_cuda_mutex and uses_cuda_mutex == UsesCudaMutex.Local local_lk.update(zip(temps, (next(names) for i in temps))) func_name = next(names) namespace[func_name] = func @@ -345,7 +351,7 @@ def make_append(bases, cols, calls, glyph, antialias): if antialias: args.append("aa_factor") - if uses_cuda_mutex and prev_cuda_mutex: + if local_cuda_mutex and prev_local_cuda_mutex: # Avoid unnecessary mutex unlock and lock cycle body.pop() @@ -357,8 +363,8 @@ def make_append(bases, cols, calls, glyph, antialias): # where reduction needs access to the return of the contained # reduction, which is the preceding one here. prev_body = body.pop() - if uses_cuda_mutex and not prev_cuda_mutex: - body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))') + if local_cuda_mutex and not prev_local_cuda_mutex: + body.append(get_cuda_mutex_call(True)) body.append(f'{update_index_arg_name} = {prev_body}') # If nan_check_column is defined then need to check if value of @@ -376,8 +382,8 @@ def make_append(bases, cols, calls, glyph, antialias): body.append(f'{whitespace}if {update_index_arg_name} >= 0:') body.append(f' {whitespace}{func_name}(x, y, {", ".join(args)})') else: - if uses_cuda_mutex and not prev_cuda_mutex: - body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))') + if local_cuda_mutex and not prev_local_cuda_mutex: + body.append(get_cuda_mutex_call(True)) if nan_check_column: var = f"{arg_lk[nan_check_column]}[{subscript}]" body.append(f'if not isnull({var}):') @@ -385,14 +391,17 @@ def make_append(bases, cols, calls, glyph, antialias): else: body.append(f'{func_name}(x, y, {", ".join(args)})') - if uses_cuda_mutex: - body.append(f'cuda_mutex_unlock({arg_lk["_cuda_mutex"]}, (y, x))') + if local_cuda_mutex: + body.append(get_cuda_mutex_call(False)) - prev_cuda_mutex = uses_cuda_mutex + prev_local_cuda_mutex = local_cuda_mutex body = head + ['{0} = {1}[y, x]'.format(name, arg_lk[agg]) for agg, name in local_lk.items()] + body + if global_cuda_mutex: + body = [get_cuda_mutex_call(True)] + body + [get_cuda_mutex_call(False)] + if antialias: signature.insert(0, "aa_factor") diff --git a/datashader/reductions.py b/datashader/reductions.py index 1922f3144..65f7913d5 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -54,6 +54,20 @@ class SpecialColumn(Enum): RowIndex = 1 +class UsesCudaMutex(Enum): + """ + Enum that encapsulates the need for a Reduction to use a CUDA mutex to + operate correctly on a GPU. Possible values: + + No: the Reduction append_cuda function is atomic and no mutex is required. + Local: Reduction append_cuda needs wrapping in a mutex. + Global: the overall compiled append function needs wrapping in a mutex. + """ + No = 0 + Local = 1 + Global = 2 + + class Preprocess(Expr): """Base clase for preprocessing steps.""" def __init__(self, column: str | SpecialColumn | None): @@ -292,7 +306,7 @@ def nan_check_column(self): else: return None - def uses_cuda_mutex(self): + def uses_cuda_mutex(self) -> UsesCudaMutex: """Return ``True`` if this Reduction needs to use a CUDA mutex to ensure that it is threadsafe across CUDA threads. @@ -300,7 +314,7 @@ def uses_cuda_mutex(self): the numba.cuda.atomic module) then this is ``False``, otherwise it is ``True``. """ - return False + return UsesCudaMutex.No def uses_row_index(self, cuda, partitioned): """Return ``True`` if this Reduction uses a row index virtual column. @@ -707,7 +721,7 @@ def is_where(self): def nan_check_column(self): return self.reduction.nan_check_column - def uses_cuda_mutex(self): + def uses_cuda_mutex(self) -> UsesCudaMutex: return self.reduction.uses_cuda_mutex() def uses_row_index(self, cuda, partitioned): @@ -1044,10 +1058,10 @@ class m2(FloatingReduction): Name of the column to aggregate over. Column data type must be numeric. ``NaN`` values in the column are skipped. """ + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Global + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): - if cuda: - raise ValueError("""\ -The 'std' and 'var' reduction operations are not yet supported on the GPU""") return super(m2, self)._build_append(dshape, schema, cuda, antialias, self_intersect) def _build_create(self, required_dshape): @@ -1056,6 +1070,7 @@ def _build_create(self, required_dshape): def _build_temps(self, cuda=False): return (_sum_zero(self.column), count(self.column)) + # CPU append functions @staticmethod @ngjit def _append(x, y, m2, field, sum, count): @@ -1069,6 +1084,20 @@ def _append(x, y, m2, field, sum, count): return 0 return -1 + # GPU append functions + @staticmethod + @nb_cuda.jit(device=True) + def _append_cuda(x, y, m2, field, sum, count): + # sum & count are the results of sum[y, x], count[y, x] before being + # updated by field + if not isnull(field): + if count > 0: + u1 = np.float64(sum) / count + u = np.float64(sum + field) / (count + 1) + m2[y, x] += (field - u1) * (field - u) + return 0 + return -1 + @staticmethod def _combine(Ms, sums, ns): with np.errstate(divide='ignore', invalid='ignore'): @@ -1519,8 +1548,8 @@ def _create_row_index_selector(self): class max_n(FloatingNReduction): - def uses_cuda_mutex(self): - return True + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Local def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: return (AntialiasStage2(AntialiasCombination.MAX, array_module.nan, n_reduction=True),) @@ -1596,8 +1625,8 @@ def _combine_cuda(aggs): class min_n(FloatingNReduction): - def uses_cuda_mutex(self): - return True + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Local def _antialias_requires_2_stages(self): return True @@ -1759,8 +1788,8 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned): else: return dshape(ct.float64) - def uses_cuda_mutex(self): - return True + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Local def uses_row_index(self, cuda, partitioned): return self.column == SpecialColumn.RowIndex or self.selector.uses_row_index(cuda, partitioned) @@ -2163,8 +2192,8 @@ def _antialias_requires_2_stages(self): def _antialias_stage_2(self, self_intersect, array_module) -> tuple[AntialiasStage2]: return (AntialiasStage2(AntialiasCombination.MIN, -1),) - def uses_cuda_mutex(self): - return True + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Local # CPU append functions @staticmethod @@ -2235,13 +2264,12 @@ def __init__(self, n=1): def out_dshape(self, in_dshape, antialias, cuda, partitioned): return dshape(ct.int64) - def uses_cuda_mutex(self): - return True + def uses_cuda_mutex(self) -> UsesCudaMutex: + return UsesCudaMutex.Local def uses_row_index(self, cuda, partitioned): return True - def _build_combine(self, dshape, antialias, cuda, partitioned): if cuda: return self._combine_cuda diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index 3de5842e9..e3a3d58df 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -812,9 +812,6 @@ def test_mean(ddf, npartitions): @pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_var(ddf, npartitions): - if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): - pytest.skip("var not supported with cudf") - ddf = ddf.repartition(npartitions) assert ddf.npartitions == npartitions out = xr.DataArray( @@ -832,9 +829,6 @@ def test_var(ddf, npartitions): @pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_std(ddf, npartitions): - if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): - pytest.skip("std not supported with cudf") - ddf = ddf.repartition(npartitions) assert ddf.npartitions == npartitions out = xr.DataArray( @@ -1030,10 +1024,6 @@ def test_categorical_mean_binning(ddf, npartitions): @pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_categorical_var(ddf, npartitions): - if cudf and isinstance(ddf._meta, cudf.DataFrame): - pytest.skip( - "The 'var' reduction is yet supported on the GPU" - ) ddf = ddf.repartition(npartitions) assert ddf.npartitions == npartitions sol = np.array([[[ 2.5, nan, nan, nan], @@ -1074,10 +1064,6 @@ def test_categorical_var(ddf, npartitions): @pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_categorical_std(ddf, npartitions): - if cudf and isinstance(ddf._meta, cudf.DataFrame): - pytest.skip( - "The 'std' reduction is yet supported on the GPU" - ) ddf = ddf.repartition(npartitions) assert ddf.npartitions == npartitions sol = np.sqrt(np.array([ @@ -1120,8 +1106,6 @@ def test_categorical_std(ddf, npartitions): @pytest.mark.parametrize('ddf', ddfs) @pytest.mark.parametrize('npartitions', [1, 2, 3, 4]) def test_multiple_aggregates(ddf, npartitions): - if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): - pytest.skip("std not supported with cudf") ddf = ddf.repartition(npartitions) assert ddf.npartitions == npartitions agg = c.points(ddf, 'x', 'y', diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index dd373f5e0..e5eb58e16 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -696,7 +696,7 @@ def test_mean(df): assert_eq_xr(c.points(df, 'x', 'y', ds.mean('f64')), out) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_var(df): out = xr.DataArray(values(df.i32).reshape((2, 2, 5)).var(axis=2, dtype='f8').T, coords=coords, dims=dims) @@ -708,7 +708,7 @@ def test_var(df): assert_eq_xr(c.points(df, 'x', 'y', ds.var('f64')), out) -@pytest.mark.parametrize('df', [df_pd]) +@pytest.mark.parametrize('df', dfs) def test_std(df): out = xr.DataArray(values(df.i32).reshape((2, 2, 5)).std(axis=2, dtype='f8').T, coords=coords, dims=dims) @@ -917,11 +917,6 @@ def test_categorical_mean_binning(df): @pytest.mark.parametrize('df', dfs) def test_categorical_var(df): - if cudf and isinstance(df, cudf.DataFrame): - pytest.skip( - "The 'var' reduction is yet supported on the GPU" - ) - sol = np.array([[[ 2.5, nan, nan, nan], [ nan, nan, 2., nan]], [[ nan, 2., nan, nan], @@ -953,11 +948,6 @@ def test_categorical_var(df): @pytest.mark.parametrize('df', dfs) def test_categorical_std(df): - if cudf and isinstance(df, cudf.DataFrame): - pytest.skip( - "The 'std' reduction is yet supported on the GPU" - ) - sol = np.sqrt(np.array([ [[ 2.5, nan, nan, nan], [ nan, nan, 2., nan]], diff --git a/doc/reduction.csv b/doc/reduction.csv index f1e3467a3..70b438798 100644 --- a/doc/reduction.csv +++ b/doc/reduction.csv @@ -11,6 +11,6 @@ :class:`~datashader.reductions.mean`, yes, yes, yes, yes, yes, :class:`~datashader.reductions.min`, yes, yes, yes, yes, yes, yes :class:`~datashader.reductions.min_n`, yes, yes, yes, yes, , yes -:class:`~datashader.reductions.std`, yes, yes, , , , +:class:`~datashader.reductions.std`, yes, yes, yes, yes, , :class:`~datashader.reductions.sum`, yes, yes, yes, yes, yes, -:class:`~datashader.reductions.var`, yes, yes, , , , +:class:`~datashader.reductions.var`, yes, yes, yes, yes, ,