Overlapped all communication within a single Alltoallw in NCCL backend
Thomas committed Feb 13, 2024
1 parent ceccc7d commit c3c20ff
Showing 3 changed files with 53 additions and 128 deletions.
12 changes: 6 additions & 6 deletions mpi4py_fft/libfft.py
@@ -94,9 +94,9 @@ def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options):
plan_bck = cp.fft.ifftn

s = tuple(np.take(shape, axes))
U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: avoid going via CPU
_V = plan_fwd(U, s=s, axes=axes)
V = cp.array(fftw.aligned_like(_V.get())) # TODO: avoid going via CPU
U = cp.empty(shape=shape, dtype=dtype)
V = plan_fwd(U, s=s, axes=axes)
V = cp.array(V)
M = np.prod(s)

# CuPy has forward transform unscaled and backward scaled with 1/N
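To make that normalization concrete (a standalone check, not part of the module): CuPy leaves the forward transform unscaled and folds the 1/N into ifftn, which is presumably why the backward plan is handed the factor M = prod(s), recovering an unnormalized backward transform in the FFTW style.

import numpy as np
import cupy as cp

axes, s = (1,), (8,)
x = cp.random.random((4, 8)) + 1j * cp.random.random((4, 8))
M = np.prod(s)

X = cp.fft.fftn(x, s=s, axes=axes)                      # forward: no scaling applied
assert cp.allclose(cp.fft.ifftn(X, s=s, axes=axes), x)  # backward: already scaled by 1/M
# multiplying by M recovers the unnormalized inverse (conjugation identity as a cross-check)
assert cp.allclose(M * cp.fft.ifftn(X, s=s, axes=axes),
                   cp.conj(cp.fft.fftn(cp.conj(X), s=s, axes=axes)))
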
@@ -164,9 +164,9 @@ def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options):
plan_bck = cufft.ifftn

s = tuple(np.take(shape, axes))
U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: Skip CPU detour
V = plan_fwd(U, shape=s, axes=axes)
V = cp.array(fftw.aligned_like(V.get())) # TODO: skip CPU detour
U = cp.empty(shape=shape, dtype=dtype)
V = plan_fwd(U, s=s, axes=axes)
V = cp.array(V)
M = np.prod(s)
return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes, 'overwrite_x': True}),
_Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes, 'overwrite_x': True}))
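
Both planners above now follow the same pattern: allocate the input work array directly on the device and obtain the output work array by running the forward transform once, instead of taking a detour through fftw.aligned on the host. A minimal standalone sketch of that pattern using the cp.fft interface (the cupyx.scipy variant is analogous); the shapes and axes here are made up:

import numpy as np
import cupy as cp

shape, axes = (8, 8, 4), (1, 2)
dtype = np.complex128
s = tuple(np.take(shape, axes))                 # transform lengths along the chosen axes

U = cp.empty(shape=shape, dtype=dtype)          # input work array, allocated on the GPU;
                                                # contents are irrelevant, only shape/dtype matter
V = cp.array(cp.fft.fftn(U, s=s, axes=axes))    # output work array, copied so U and V stay distinct
M = np.prod(s)                                  # scaling factor handed to the backward wrapper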
168 changes: 47 additions & 121 deletions mpi4py_fft/pencil.py
@@ -29,6 +29,17 @@ def _subarraytypes(comm, shape, axis, subshape, dtype):
return tuple(datatypes)


def get_slice(subtype):
"""
Extract from the subtype object generated for MPI what shape the buffer
should have and what part of the array we want to send / receive.
"""
decoded = subtype.decode()
subsizes = decoded[2]['subsizes']
starts = decoded[2]['starts']
return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts)))
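
For illustration, a small standalone check of what get_slice consumes (the subarray sizes below are made up): decode() on a subarray datatype returns a tuple whose third element is a dict carrying the sizes, subsizes and starts it was built from.

import numpy as np
from mpi4py import MPI
from mpi4py_fft.pencil import get_slice

sizes, subsizes, starts = [8, 8], [8, 4], [0, 4]
subtype = MPI.DOUBLE.Create_subarray(sizes, subsizes, starts)

decoded = subtype.decode()
assert list(decoded[2]['subsizes']) == subsizes
assert list(decoded[2]['starts']) == starts

local_slice = get_slice(subtype)          # (slice(0, 8), slice(4, 8))
buf = np.zeros(sizes)
assert buf[local_slice].shape == (8, 4)   # the block this rank sends / receives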


class Subcomm(tuple):
r"""Class returning a tuple of subcommunicators of any dimensionality
@@ -236,32 +247,19 @@ def synchronize_stream():
send_to = (rank + i) % size
recv_from = (rank -i + size) % size

sliceA, shapeA = self.get_slice_and_shape(subtypesA[send_to])
sliceB, shapeB = self.get_slice_and_shape(subtypesB[recv_from])
sliceA = get_slice(subtypesA[send_to])
sliceB = get_slice(subtypesB[recv_from])

if send_to == rank:
arrayB[sliceB][:] = arrayA[sliceA][:]
else:
recvbuf = xp.empty(shapeB, dtype=self.dtype)
sendbuf = xp.empty(shapeA, dtype=self.dtype)
sendbuf[:] = arrayA[sliceA][:]
recvbuf = xp.ascontiguousarray(arrayB[sliceB])
sendbuf = xp.ascontiguousarray(arrayA[sliceA])

synchronize_stream()
comm.Sendrecv(sendbuf, send_to, recvbuf=recvbuf, source=recv_from)
arrayB[sliceB][:] = recvbuf[:]
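
The branch just above stages the data through contiguous buffers because a strided slice generally cannot be handed to Sendrecv directly: ascontiguousarray packs it into a dense copy (or returns the view unchanged if it already is contiguous, in which case the final copy-back is a harmless self-assignment). A minimal NumPy sketch of that staging, with the actual Sendrecv call left out:

import numpy as np

arrayA = np.arange(16.0).reshape(4, 4)
sliceA = (slice(0, 4), slice(1, 3))            # a strided, non-contiguous block

view = arrayA[sliceA]
assert not view.flags['C_CONTIGUOUS']          # not usable as an MPI buffer as-is

sendbuf = np.ascontiguousarray(view)           # dense copy with the same values
assert sendbuf.flags['C_CONTIGUOUS']
# comm.Sendrecv(sendbuf, send_to, recvbuf=recvbuf, source=recv_from)  # as in the loop above
# arrayB[sliceB][:] = recvbuf[:]               # unpack the dense block back into the strided slice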

@staticmethod
def get_slice_and_shape(subtype):
"""
Extract from the subtype object generated for MPI what shape the buffer
should have and what part of the array we want to send / receive.
"""
decoded = subtype.decode()
subsizes = decoded[2]['subsizes']
starts = decoded[2]['starts']
return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes



class NCCLTransfer(Transfer):
"""
@@ -275,124 +273,52 @@ def __init__(self, *args, **kwargs):
from cupy.cuda import nccl
self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank)

# determine the NCCL datatype to send with. Complex numbers are sent as two real numbers each.
if self.dtype in [np.dtype('float32'), np.dtype('complex64')]:
self.NCCL_dtype = nccl.NCCL_FLOAT32
elif self.dtype in [np.dtype('float64'), np.dtype('complex128')]:
self.NCCL_dtype = nccl.NCCL_FLOAT64
elif self.dtype in [np.dtype('int32')]:
self.NCCL_dtype = nccl.NCCL_INT32
elif self.dtype in [np.dtype('int64')]:
self.NCCL_dtype = nccl.NCCL_INT64
else:
raise NotImplementedError(f'Don\'t know what NCCL dtype to use to send data of dtype {self.dtype}!')
self.count_modifier = 2 if 'complex' in str(self.dtype) else 1
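
In other words, NCCL only understands real scalar types, so a complex buffer is described to it as twice as many reals of the matching width; count_modifier carries that factor into the send/recv counts below. A standalone sketch of the same bookkeeping, assuming cupy.cuda.nccl is available:

import numpy as np
import cupy as cp
from cupy.cuda import nccl

buf = cp.empty(12, dtype=np.complex64)
count_modifier = 2 if 'complex' in str(buf.dtype) else 1
nccl_dtype = nccl.NCCL_FLOAT32            # complex64 is shipped as pairs of float32
nccl_count = buf.size * count_modifier    # 24 float32 values on the wire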

def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB):
"""
Redistribute arrayA to arrayB.
As NCCL does not have all to all, we replicate it by a bunch of individual send and receives.
As NCCL also does not have complex datatypes, we have to send real and imaginary parts separately.
"""
import cupy as cp
from cupy.cuda import nccl
assert type(arrayA) == cp.ndarray
assert type(arrayB) == cp.ndarray
assert arrayA.dtype == arrayB.dtype
assert self.comm.rank == self.comm_nccl.rank_id(), f'The structure of the communicator has changed unexpectedly'

rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl
iscomplex = cp.iscomplexobj(arrayA)
NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA)
stream = cp.cuda.get_current_stream()

stream = cp.cuda.Stream(null=True)
stream.use()
# initialize dictionaries required to overlap sends
recvbufs = {}
sendbufs = {}
sliceBs = {}

for i in range(size):
# issue all sends and receives within a single NCCL group call so they can overlap
cp.cuda.nccl.groupStart()
for i in range(1, size + 1):
send_to = (rank + i) % size
recv_from = (rank -i + size) % size

# prepare receive buffer
local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from])
recv_buff = self.get_buffer(shape, iscomplex, real_dtype)

# prepare send buffer
send_slice, send_shape = self.get_slice_and_shape(subtypesA[send_to])

# send / receive
if send_to == rank:
self.fill_buffer(recv_buff, arrayA, send_slice, iscomplex)
else:
send_buff = self.get_buffer(send_shape, iscomplex, real_dtype)
self.fill_buffer(send_buff, arrayA, send_slice, iscomplex)

# perform all sends and receives in a single kernel to allow overlap
cp.cuda.nccl.groupStart()
comm.recv(recv_buff.data.ptr, recv_buff.size, NCCL_dtype, recv_from, stream.ptr)
comm.send(send_buff.data.ptr, send_buff.size, NCCL_dtype, send_to, stream.ptr)
cp.cuda.nccl.groupEnd()
sliceA = get_slice(subtypesA[send_to])
sliceBs[i] = get_slice(subtypesB[recv_from])

self.unpack_buffer(recv_buff, arrayB, local_slice, iscomplex)
recvbufs[i] = cp.ascontiguousarray(arrayB[sliceBs[i]])
sendbufs[i] = cp.ascontiguousarray(arrayA[sliceA])

cp.cuda.Stream(null=True).use()
comm.recv(recvbufs[i].data.ptr, recvbufs[i].size * self.count_modifier, self.NCCL_dtype, recv_from, stream.ptr)
comm.send(sendbufs[i].data.ptr, sendbufs[i].size * self.count_modifier, self.NCCL_dtype, send_to, stream.ptr)
cp.cuda.nccl.groupEnd()
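
The pairing in this loop is a ring schedule: in round i, rank r sends to (r + i) % size and receives from (r - i + size) % size, so the rank it receives from is exactly the one targeting r in that same round, and after size rounds every rank (including r itself, in the final round) has been matched exactly once. A quick standalone check of that property:

size = 4
for rank in range(size):
    rounds = [((rank + i) % size, (rank - i + size) % size) for i in range(1, size + 1)]
    send_targets = [s for s, _ in rounds]
    recv_sources = [r for _, r in rounds]
    assert sorted(send_targets) == sorted(recv_sources) == list(range(size))
    for i, (send_to, recv_from) in enumerate(rounds, start=1):
        # whoever we receive from in round i is sending to us in that same round
        assert (recv_from + i) % size == rank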

@staticmethod
def get_slice_and_shape(subtype):
"""
Extract from the subtype object generated for MPI what shape the buffer
should have and what part of the array we want to send / receive.
"""
decoded = subtype.decode()
subsizes = decoded[2]['subsizes']
starts = decoded[2]['starts']
return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes

@staticmethod
def get_nccl_and_real_dtypes(array):
"""
Translate the datatype of the array to a NCCL type for sending with NCCL.
As NCCL does not support complex types, we have to translate them to two
real values.
"""
from cupy.cuda import nccl
nccl_dtypes = {
np.dtype('float32'): nccl.NCCL_FLOAT32,
np.dtype('float64'): nccl.NCCL_FLOAT64,
np.dtype('complex64'): nccl.NCCL_FLOAT32,
np.dtype('complex128'): nccl.NCCL_FLOAT64,
}
real_dtypes = {
np.dtype('float32'): np.dtype('float32'),
np.dtype('float64'): np.dtype('float64'),
np.dtype('complex64'): np.dtype('float32'),
np.dtype('complex128'): np.dtype('float64'),
}
return nccl_dtypes[array.dtype], real_dtypes[array.dtype]

@staticmethod
def get_buffer(shape, iscomplex, real_dtype):
"""
Get a buffer for communication. If complex numbers are used, we send
two real values instead.
"""
import cupy as cp

if iscomplex:
return cp.ndarray(shape=[2,] + shape, dtype=real_dtype)
else:
return cp.ndarray(shape=shape, dtype=real_dtype)

@staticmethod
def fill_buffer(buff, array, local_slice, iscomplex):
"""
Fill buffer for communication. If complex numbers are used, we send
two real values instead.
"""
if iscomplex:
buff[0][:] = array[local_slice].real
buff[1][:] = array[local_slice].imag
else:
buff[:] = array[local_slice][:]

@staticmethod
def unpack_buffer(buff, array, local_slice, iscomplex):
"""
Unpack buffer for communication. If complex numbers are used, we get
two real values instead.
"""
if iscomplex:
array[local_slice].real = buff[0][:]
array[local_slice].imag = buff[1][:]
else:
array[local_slice][:] = buff[:]
# copy the received data from the contiguous buffers back into arrayB
for i in recvbufs.keys():
if arrayB[sliceBs[i]] is not recvbufs[i]:
arrayB[sliceBs[i]][:] = recvbufs[i][:]

def destroy(self):
super().destroy()
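
For orientation, this is roughly how the transfer classes are driven from user code, based on the package's documented pencil example (the subcommunicator layout and array shapes here are illustrative, and selecting the NCCL backend with CuPy arrays is outside this diff):

import numpy as np
from mpi4py import MPI
from mpi4py_fft.pencil import Subcomm, Pencil

comm = MPI.COMM_WORLD
N = (8, 8, 8)                                # global array shape (made up for the example)
subcomm = Subcomm(comm, [0, 0, 1])           # let MPI split the first two axes

p0 = Pencil(subcomm, N, axis=2)              # aligned in the last axis
p1 = p0.pencil(0)                            # same decomposition, aligned in the first axis

a0 = np.zeros(p0.subshape)
a1 = np.zeros(p1.subshape)

transfer = p0.transfer(p1, np.float64)       # builds a Transfer (Alltoallw under the hood)
transfer.forward(a0, a1)                     # redistribute a0 -> a1
transfer.backward(a1, a0)                    # and back again
transfer.destroy()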
1 change: 0 additions & 1 deletion tests/test_pencil.py
@@ -23,7 +23,6 @@ def test_pencil():
from cupy.cuda import nccl
backends += ['NCCL']
xp['NCCL'] = cp
cp.cuda.set_allocator(cp.cuda.MemoryAsyncPool().malloc)
except ImportError:
pass

