change pointer type to void for generalizing datatype
chrishkchris committed Nov 8, 2020
1 parent 3f6e15a commit 993f93b
Showing 2 changed files with 86 additions and 44 deletions.
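The change follows one pattern throughout: buffers declared as float * become void *, and each entry point first inspects the tensor's dtype to set a runtime element size (dataSize) and NCCL datatype (ncclType). Below is a minimal standalone sketch of that dispatch, not SINGA code; the names DType, BufferInfo, elementSize, and copyBytes are hypothetical stand-ins.

// Minimal sketch (hypothetical names): an untyped buffer plus a runtime
// element size lets one code path carry fp32 or fp16 payloads.
#include <cstddef>
#include <cstdio>

enum DType { kFloat32, kFloat16 };

struct BufferInfo {
  void *buff = nullptr;  // untyped buffer, like the new void *sparsSendBuff
  size_t dataSize = 0;   // bytes per element, chosen at runtime
};

// Pick the element size the way the diff picks sizeof(__half) vs
// sizeof(float) from the tensor dtype.
size_t elementSize(DType dt) { return dt == kFloat16 ? 2 : 4; }

// Byte count for copying `num` elements, mirroring `t.Size() * dataSize`.
size_t copyBytes(size_t num, const BufferInfo &b) { return num * b.dataSize; }

int main() {
  BufferInfo b;
  b.dataSize = elementSize(kFloat16);
  std::printf("bytes for 1024 fp16 elements: %zu\n", copyBytes(1024, b));
  return 0;
}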
10 changes: 5 additions & 5 deletions include/singa/io/communicator.h
@@ -106,8 +106,8 @@ class Communicator {
float sparsThreshold, bool topK, Context *ctx);
void _sparsification(Tensor &t, Tensor *accumulation, float sparsThreshold,
bool topK, Context *ctx);
- void valSparsAllReduce(size_t num, float *accumulation, Context *ctx);
- void topKSparsAllReduce(size_t num, float *accumulation, Context *ctx);
+ void valSparsAllReduce(size_t num, void *accumulation, Context *ctx);
+ void topKSparsAllReduce(size_t num, void *accumulation, Context *ctx);

// last group of synchronized memory blocks
std::shared_ptr<Device> device_ = nullptr;
@@ -145,9 +145,9 @@ class Communicator {
int *nnzGPU;
int *nnzAllGPU;
float threshold;
- float *sparsSendBuff;
- float *sparsRecvBuff;
- float *backupBuff;
+ void *sparsSendBuff;
+ void *sparsRecvBuff;
+ void *backupBuff;
int *fusedIndex;
};
} // namespace singa
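Because pointer arithmetic on void * is ill-formed in C++, every offset computation in the .cc changes below must first cast the buffer to its element type; the diff does this with a per-dtype static_cast into offsetPointer. A minimal sketch of that idiom, using uint16_t as a stand-in for CUDA's __half so it compiles off-device:

#include <cstddef>
#include <cstdint>

// Cast to the element type first, then offset; void* arithmetic is illegal.
void *offsetBy(void *base, size_t offset, bool isFp16) {
  if (isFp16)
    return static_cast<void *>(static_cast<uint16_t *>(base) + offset);
  return static_cast<void *>(static_cast<float *>(base) + offset);
}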
120 changes: 81 additions & 39 deletions src/io/communicator.cc
@@ -140,8 +140,8 @@ void Communicator::sparsInit() {

void Communicator::allReduce(int size, void *sendbuff, void *recvbuff,
ncclDataType_t ncclType, Context *ctx) {
- NCCLCHECK(ncclAllReduce((const void *)sendbuff, (void *)recvbuff, size,
-                         ncclType, ncclSum, comm, ctx->s));
+ NCCLCHECK(ncclAllReduce((const void *)sendbuff, recvbuff, size, ncclType,
+                         ncclSum, comm, ctx->s));
}

void Communicator::generateBlocks(Tensor &t) {
@@ -491,6 +491,14 @@ void Communicator::sparsification(Tensor &t, float sparsThreshold, bool topK) {
void Communicator::_sparsification(Tensor &t, Tensor *accumulation,
float sparsThreshold, bool topK,
Context *ctx) {
+ if (t.data_type() == kFloat16) {
+   ncclType = ncclHalf;
+   dataSize = sizeof(__half);
+ } else {
+   ncclType = ncclFloat;
+   dataSize = sizeof(float);
+ }

// threshold for sparsification
threshold = sparsThreshold;

@@ -500,13 +508,13 @@ void Communicator::_sparsification(Tensor &t, Tensor *accumulation,

// memory copy to fusedBuff
CUDA_CHECK(cudaMemcpyAsync(
-     (void *)fusedSendBuff, (const void *)t.block()->mutable_data(),
+     fusedSendBuff, (const void *)t.block()->mutable_data(),
t.Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c1));

- float *accumPtr;
+ void *accumPtr;

if (accumulation != NULL)
-   accumPtr = (float *)accumulation->block()->mutable_data();
+   accumPtr = accumulation->block()->mutable_data();
else
accumPtr = NULL;

@@ -516,9 +524,9 @@ void Communicator::_sparsification(Tensor &t, Tensor *accumulation,
topKSparsAllReduce(t.Size(), accumPtr, ctx);

// copy data back to tensor after allreduce
- CUDA_CHECK(cudaMemcpyAsync(
-     (void *)t.block()->mutable_data(), (const void *)fusedRecvBuff,
-     t.Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c2));
+ CUDA_CHECK(cudaMemcpyAsync((void *)t.block()->mutable_data(),
+                            (const void *)fusedRecvBuff, t.Size() * dataSize,
+                            cudaMemcpyDeviceToDevice, ctx->c2));
}

void Communicator::fusedSparsification(vector<Tensor> &t, Tensor &accumulation,
@@ -551,6 +559,14 @@ void Communicator::fusedSparsification(vector<Tensor> &t, float sparsThreshold,
void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
float sparsThreshold, bool topK,
Context *ctx) {
+ if (t[0].data_type() == kFloat16) {
+   ncclType = ncclHalf;
+   dataSize = sizeof(__half);
+ } else {
+   ncclType = ncclFloat;
+   dataSize = sizeof(float);
+ }

// threshold for sparsification
threshold = sparsThreshold;

@@ -562,17 +578,21 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,

// memory copy to fusedBuff
for (size_t i = 0; i < t.size(); i++) {
+   if (t[0].data_type() == kFloat16) {
+     offsetPointer = (void *)(static_cast<__half *>(fusedSendBuff) + offset);
+   } else {
+     offsetPointer = (void *)(static_cast<float *>(fusedSendBuff) + offset);
+   }
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)(static_cast<float *>(fusedSendBuff) + offset),
-       (const void *)t[i].block()->mutable_data(), t[i].Size() * sizeof(float),
-       cudaMemcpyDeviceToDevice, ctx->c1));
+       offsetPointer, (const void *)t[i].block()->mutable_data(),
+       t[i].Size() * dataSize, cudaMemcpyDeviceToDevice, ctx->c1));
offset += t[i].Size();
}

- float *accumPtr;
+ void *accumPtr;

if (accumulation != NULL)
-   accumPtr = (float *)accumulation->block()->mutable_data();
+   accumPtr = accumulation->block()->mutable_data();
else
accumPtr = NULL;

@@ -584,24 +604,32 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
// copy data back to tensors after allreduce
offset = 0;
for (size_t i = 0; i < t.size(); i++) {
+   if (t[0].data_type() == kFloat16) {
+     offsetPointer = (void *)(static_cast<__half *>(fusedRecvBuff) + offset);
+   } else {
+     offsetPointer = (void *)(static_cast<float *>(fusedRecvBuff) + offset);
+   }
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)t[i].block()->mutable_data(),
-       (const void *)(static_cast<float *>(fusedRecvBuff) + offset),
-       t[i].Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c2));
+       (void *)t[i].block()->mutable_data(), (const void *)(offsetPointer),
+       t[i].Size() * dataSize, cudaMemcpyDeviceToDevice, ctx->c2));
offset += t[i].Size();
}
}

- void Communicator::valSparsAllReduce(size_t num, float *accumulation,
+ void Communicator::valSparsAllReduce(size_t num, void *accumulation,
Context *ctx) {
+ CHECK_EQ(dataSize, sizeof(float))
+     << "This function depends on thrust and support only fp32 currently";

if (sparsInitialized == false) sparsInit();

if (accumulation != NULL) {
// add the previous accumulation
-   cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
+   cuda::add(num, static_cast<float *>(fusedSendBuff),
+             static_cast<float *>(accumulation),
static_cast<float *>(fusedSendBuff), ctx->c1);
// backup the fusedSendBuff
-   CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
+   CUDA_CHECK(cudaMemcpyAsync(backupBuff, (const void *)fusedSendBuff,
sizeof(float) * num, cudaMemcpyDeviceToDevice,
ctx->c1));
}
@@ -612,8 +640,9 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,

// output the gradient accumulation
if (accumulation != NULL)
-   cuda::sub(num, backupBuff, static_cast<float *>(fusedSendBuff),
-             accumulation, ctx->c1);
+   cuda::sub(num, static_cast<float *>(backupBuff),
+             static_cast<float *>(fusedSendBuff),
+             static_cast<float *>(accumulation), ctx->c1);

// produce the index of the sparse array
cuda::sparsindex(num, static_cast<float *>(fusedSendBuff), fusedIndex,
@@ -642,19 +671,20 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,
// remove zero of values to become sprase array
cuda::removezeroval(num, static_cast<float *>(fusedSendBuff), ctx->c1);

- CUDA_CHECK(cudaMemcpyAsync((void *)(sparsSendBuff), (const void *)fusedIndex,
+ CUDA_CHECK(cudaMemcpyAsync(sparsSendBuff, (const void *)fusedIndex,
sizeof(int) * (*nnz), cudaMemcpyDeviceToDevice,
ctx->c1));
- CUDA_CHECK(cudaMemcpyAsync(
-     (void *)(sparsSendBuff + (*nnz)), (const void *)fusedSendBuff,
-     sizeof(float) * (*nnz), cudaMemcpyDeviceToDevice, ctx->c1));
+ CUDA_CHECK(
+     cudaMemcpyAsync((void *)(static_cast<float *>(sparsSendBuff) + (*nnz)),
+                     (const void *)fusedSendBuff, sizeof(float) * (*nnz),
+                     cudaMemcpyDeviceToDevice, ctx->c1));

// wait for the memcpy to complete
CUDA_CHECK(cudaEventRecord(event, ctx->c1));
CUDA_CHECK(cudaStreamWaitEvent(ctx->s, event, 0));

// all-gather all the sparse gradients
- NCCLCHECK(ncclAllGather((const void *)sparsSendBuff, (void *)sparsRecvBuff,
+ NCCLCHECK(ncclAllGather((const void *)sparsSendBuff, sparsRecvBuff,
2 * nnzMax, ncclFloat, comm, ctx->s));

// wait for the all-gather to complete
@@ -673,11 +703,13 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,

for (int i = 0; i < world_size; i++) {
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)xInd, (const void *)(sparsRecvBuff + offset),
+       (void *)xInd,
+       (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
sizeof(int) * nnzAll[i], cudaMemcpyDeviceToDevice, ctx->c2));
offset += nnzAll[i];
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)xVal, (const void *)(sparsRecvBuff + offset),
+       (void *)xVal,
+       (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
sizeof(float) * nnzAll[i], cudaMemcpyDeviceToDevice, ctx->c2));
offset += (2 * nnzMax - nnzAll[i]);
CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzAll[i], &alpha, xVal,
Expand All @@ -686,17 +718,21 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,
}
}

- void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
+ void Communicator::topKSparsAllReduce(size_t num, void *accumulation,
Context *ctx) {
+ CHECK_EQ(dataSize, sizeof(float))
+     << "This function depends on thrust and support only fp32 currently";

if (sparsInitialized == false) sparsInit();

// use gradient accumulation
if (accumulation != NULL) {
// add the previous accumulation
-   cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
+   cuda::add(num, static_cast<float *>(fusedSendBuff),
+             static_cast<float *>(accumulation),
static_cast<float *>(fusedSendBuff), ctx->c1);
// backup the fusedSendBuff
-   CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
+   CUDA_CHECK(cudaMemcpyAsync(backupBuff, (const void *)fusedSendBuff,
sizeof(float) * num, cudaMemcpyDeviceToDevice,
ctx->c1));
}
@@ -716,17 +752,21 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
CUSPARSE_CHECK(cusparseSetStream(cusparse_handle, ctx->c1));
CUSPARSE_CHECK(cusparseSaxpyi(
cusparse_handle, nnzMax, &alpha, static_cast<float *>(fusedSendBuff),
-       fusedIndex, accumulation, CUSPARSE_INDEX_BASE_ONE));
-   cuda::sub(num, backupBuff, accumulation, accumulation, ctx->c1);
+       fusedIndex, static_cast<float *>(accumulation),
+       CUSPARSE_INDEX_BASE_ONE));
+   cuda::sub(num, static_cast<float *>(backupBuff),
+             static_cast<float *>(accumulation),
+             static_cast<float *>(accumulation), ctx->c1);
}

// the topK value and index will be sent
- CUDA_CHECK(cudaMemcpyAsync((void *)(sparsSendBuff), (const void *)fusedIndex,
+ CUDA_CHECK(cudaMemcpyAsync(sparsSendBuff, (const void *)fusedIndex,
sizeof(int) * nnzMax, cudaMemcpyDeviceToDevice,
ctx->c1));
- CUDA_CHECK(cudaMemcpyAsync(
-     (void *)(sparsSendBuff + nnzMax), (const void *)fusedSendBuff,
-     sizeof(float) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c1));
+ CUDA_CHECK(
+     cudaMemcpyAsync((void *)(static_cast<float *>(sparsSendBuff) + nnzMax),
+                     (const void *)fusedSendBuff, sizeof(float) * nnzMax,
+                     cudaMemcpyDeviceToDevice, ctx->c1));

// wait for the memcpy to complete
CUDA_CHECK(cudaEventRecord(event, ctx->c1));
@@ -751,11 +791,13 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
// all-reduce process
for (int i = 0; i < world_size; i++) {
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)xInd, (const void *)(sparsRecvBuff + offset),
+       (void *)xInd,
+       (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
sizeof(int) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c2));
offset += nnzMax;
CUDA_CHECK(cudaMemcpyAsync(
-       (void *)xVal, (const void *)(sparsRecvBuff + offset),
+       (void *)xVal,
+       (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
sizeof(float) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c2));
offset += nnzMax;
CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzMax, &alpha, xVal, xInd,
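Note that the two sparse all-reduce paths stay fp32-only after this commit: cusparseSaxpyi and the thrust-based kernels operate on float, so the new CHECK_EQ(dataSize, sizeof(float)) guards fail fast when an fp16 tensor reaches them. A minimal sketch of the same fail-fast guard, using a plain assert in place of glog's CHECK_EQ:

#include <cassert>
#include <cstddef>
#include <cstdio>

// Reject non-fp32 payloads before any float-typed kernel runs,
// mirroring CHECK_EQ(dataSize, sizeof(float)) in the diff above.
void requireFp32(size_t dataSize) {
  if (dataSize != sizeof(float)) {
    std::fprintf(stderr, "sparse all-reduce currently supports fp32 only\n");
  }
  assert(dataSize == sizeof(float));
}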
