Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate where combine_cpu functions by ndim #1265

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from datashader.antialias import AntialiasCombination, AntialiasStage2
from datashader.utils import isminus1, isnull
from numba import cuda as nb_cuda
from numba.typed import List

try:
from datashader.transfer_functions._cuda_utils import (
Expand Down Expand Up @@ -1868,7 +1867,16 @@ def _build_combine(self, dshape, antialias, cuda, partitioned):
invalid = isminus1 if self.selector.uses_row_index(cuda, partitioned) else isnull

@ngjit
def combine_cpu(aggs, selector_aggs):
def combine_cpu_2d(aggs, selector_aggs):
    """Combine two 2D ``(ny, nx)`` aggregates in place, driven by a selector.

    For every pixel the incoming selector value (``selector_aggs[1]``) is
    offered to the accumulated selector (``selector_aggs[0]``) through
    ``append``.  When ``append`` reports a non-negative result, the
    matching pixel of ``aggs[1]`` is copied into ``aggs[0]``.

    Pixels whose incoming selector value is flagged by ``invalid`` are
    skipped without calling ``append``.
    """
    target, incoming = aggs[0], aggs[1]
    selector_target, selector_incoming = selector_aggs[0], selector_aggs[1]
    height, width = target.shape
    for row in range(height):
        for col in range(width):
            candidate = selector_incoming[row, col]
            if invalid(candidate):
                # Nothing to merge for this pixel.
                continue
            # append takes (x, y, ...) order, i.e. column before row.
            if append(col, row, selector_target, candidate) < 0:
                # Negative result: the accumulated selector was not updated.
                continue
            target[row, col] = incoming[row, col]

@ngjit
def combine_cpu_3d(aggs, selector_aggs):
ny, nx, ncat = aggs[0].shape
for y in range(ny):
for x in range(nx):
Expand All @@ -1878,9 +1886,21 @@ def combine_cpu(aggs, selector_aggs):
aggs[0][y, x, cat] = aggs[1][y, x, cat]

@ngjit
def combine_cpu_n(aggs, selector_aggs):
# Generic solution for combining dask partitions of a where
# reduction with a selector that is a FloatingNReduction.
def combine_cpu_n_3d(aggs, selector_aggs):
    """Combine two 3D ``(ny, nx, n)`` aggregates in place, driven by a selector.

    For each pixel the ``n`` incoming selector values
    (``selector_aggs[1][y, x, :]``) are visited in order.  Each one is
    offered to the accumulated selector (``selector_aggs[0]``) through
    ``append``; the index it returns is where the matching value from
    ``aggs[1]`` is inserted into ``aggs[0][y, x]`` via ``shift_and_insert``.

    Processing of a pixel stops at the first incoming value flagged by
    ``invalid`` or the first negative index returned by ``append``.
    """
    target, incoming = aggs[0], aggs[1]
    selector_target, selector_incoming = selector_aggs[0], selector_aggs[1]
    height, width, depth = target.shape
    for row in range(height):
        for col in range(width):
            for slot in range(depth):
                candidate = selector_incoming[row, col, slot]
                if invalid(candidate):
                    # Stop at the first invalid entry for this pixel.
                    break
                # append takes (x, y, ...) order, i.e. column before row.
                insert_at = append(col, row, selector_target, candidate)
                if insert_at < 0:
                    # Negative index: no insertion; stop processing this pixel.
                    break
                shift_and_insert(target[row, col], incoming[row, col, slot], insert_at)

@ngjit
def combine_cpu_n_4d(aggs, selector_aggs):
ny, nx, ncat, n = aggs[0].shape
for y in range(ny):
for x in range(nx):
Expand Down Expand Up @@ -1955,10 +1975,9 @@ def wrapped_combine(aggs, selector_aggs):
combine_cuda_n_4d[cuda_args(aggs[0].shape[:3])](aggs, selector_aggs)
else:
if ndim == 3:
# 4d view of each agg, note use of numba typed list.
aggs = List([np.expand_dims(agg, 2) for agg in aggs])
selector_aggs = List([np.expand_dims(agg, 2) for agg in selector_aggs])
combine_cpu_n(aggs, selector_aggs)
combine_cpu_n_3d(aggs, selector_aggs)
else:
combine_cpu_n_4d(aggs, selector_aggs)
else:
# ndim is either 2 (ny, nx) or 3 (ny, nx, ncat)
if cuda:
Expand All @@ -1968,10 +1987,9 @@ def wrapped_combine(aggs, selector_aggs):
combine_cuda_3d[cuda_args(aggs[0].shape)](aggs, selector_aggs)
else:
if ndim == 2:
# 3d view of each agg, note use of numba typed list.
aggs = List([np.expand_dims(agg, 2) for agg in aggs])
selector_aggs = List([np.expand_dims(agg, 2) for agg in selector_aggs])
combine_cpu(aggs, selector_aggs)
combine_cpu_2d(aggs, selector_aggs)
else:
combine_cpu_3d(aggs, selector_aggs)

return ret

Expand Down