diff --git a/include/singa/io/communicator.h b/include/singa/io/communicator.h
index cb0fc7508e..8344873e96 100644
--- a/include/singa/io/communicator.h
+++ b/include/singa/io/communicator.h
@@ -106,8 +106,8 @@ class Communicator {
                           float sparsThreshold, bool topK, Context *ctx);
   void _sparsification(Tensor &t, Tensor *accumulation, float sparsThreshold,
                        bool topK, Context *ctx);
-  void valSparsAllReduce(size_t num, float *accumulation, Context *ctx);
-  void topKSparsAllReduce(size_t num, float *accumulation, Context *ctx);
+  void valSparsAllReduce(size_t num, void *accumulation, Context *ctx);
+  void topKSparsAllReduce(size_t num, void *accumulation, Context *ctx);
 
   // last group of synchronized memory blocks
   std::shared_ptr<Device> device_ = nullptr;
@@ -145,9 +145,9 @@ class Communicator {
   int *nnzGPU;
   int *nnzAllGPU;
   float threshold;
-  float *sparsSendBuff;
-  float *sparsRecvBuff;
-  float *backupBuff;
+  void *sparsSendBuff;
+  void *sparsRecvBuff;
+  void *backupBuff;
   int *fusedIndex;
 };
 }  // namespace singa
diff --git a/src/io/communicator.cc b/src/io/communicator.cc
index 2068e61f99..97ac18d40d 100644
--- a/src/io/communicator.cc
+++ b/src/io/communicator.cc
@@ -140,8 +140,8 @@ void Communicator::sparsInit() {
 
 void Communicator::allReduce(int size, void *sendbuff, void *recvbuff,
                              ncclDataType_t ncclType, Context *ctx) {
-  NCCLCHECK(ncclAllReduce((const void *)sendbuff, (void *)recvbuff, size,
-                          ncclType, ncclSum, comm, ctx->s));
+  NCCLCHECK(ncclAllReduce((const void *)sendbuff, recvbuff, size, ncclType,
+                          ncclSum, comm, ctx->s));
 }
 
 void Communicator::generateBlocks(Tensor &t) {
@@ -491,6 +491,14 @@ void Communicator::sparsification(Tensor &t, float sparsThreshold, bool topK) {
 
 void Communicator::_sparsification(Tensor &t, Tensor *accumulation,
                                    float sparsThreshold, bool topK,
                                    Context *ctx) {
+  if (t.data_type() == kFloat16) {
+    ncclType = ncclHalf;
+    dataSize = sizeof(__half);
+  } else {
+    ncclType = ncclFloat;
+    dataSize = sizeof(float);
+  }
+
   // threshold for sprasification
   threshold = sparsThreshold;
@@ -500,13 +508,13 @@ void Communicator::_sparsification(Tensor &t, Tensor *accumulation,
 
   // memory copy to fusedBuff
   CUDA_CHECK(cudaMemcpyAsync(
-      (void *)fusedSendBuff, (const void *)t.block()->mutable_data(),
+      fusedSendBuff, (const void *)t.block()->mutable_data(),
       t.Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c1));
 
-  float *accumPtr;
+  void *accumPtr;
 
   if (accumulation != NULL)
-    accumPtr = (float *)accumulation->block()->mutable_data();
+    accumPtr = accumulation->block()->mutable_data();
   else
     accumPtr = NULL;
 
@@ -516,9 +524,9 @@ void Communicator::_sparsification(Tensor &t, Tensor *accumulation,
     topKSparsAllReduce(t.Size(), accumPtr, ctx);
 
   // copy data back to tensor after allreduce
-  CUDA_CHECK(cudaMemcpyAsync(
-      (void *)t.block()->mutable_data(), (const void *)fusedRecvBuff,
-      t.Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c2));
+  CUDA_CHECK(cudaMemcpyAsync((void *)t.block()->mutable_data(),
+                             (const void *)fusedRecvBuff, t.Size() * dataSize,
+                             cudaMemcpyDeviceToDevice, ctx->c2));
 }
 
 void Communicator::fusedSparsification(vector<Tensor> &t, Tensor &accumulation,
@@ -551,6 +559,14 @@ void Communicator::fusedSparsification(vector<Tensor> &t, float sparsThreshold,
 
 void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
                                         float sparsThreshold, bool topK,
                                         Context *ctx) {
+  if (t[0].data_type() == kFloat16) {
+    ncclType = ncclHalf;
+    dataSize = sizeof(__half);
+  } else {
+    ncclType = ncclFloat;
+    dataSize = sizeof(float);
+  }
+
   // threshold for sprasification
   threshold = sparsThreshold;
@@ -562,17 +578,21 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
 
   // memory copy to fusedBuff
   for (size_t i = 0; i < t.size(); i++) {
+    if (t[0].data_type() == kFloat16) {
+      offsetPointer = (void *)(static_cast<__half *>(fusedSendBuff) + offset);
+    } else {
+      offsetPointer = (void *)(static_cast<float *>(fusedSendBuff) + offset);
+    }
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)(static_cast<float *>(fusedSendBuff) + offset),
-        (const void *)t[i].block()->mutable_data(), t[i].Size() * sizeof(float),
-        cudaMemcpyDeviceToDevice, ctx->c1));
+        offsetPointer, (const void *)t[i].block()->mutable_data(),
+        t[i].Size() * dataSize, cudaMemcpyDeviceToDevice, ctx->c1));
     offset += t[i].Size();
   }
 
-  float *accumPtr;
+  void *accumPtr;
 
   if (accumulation != NULL)
-    accumPtr = (float *)accumulation->block()->mutable_data();
+    accumPtr = accumulation->block()->mutable_data();
   else
     accumPtr = NULL;
 
@@ -584,24 +604,32 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
 
   // copy data back to tensors after allreduce
   offset = 0;
   for (size_t i = 0; i < t.size(); i++) {
+    if (t[0].data_type() == kFloat16) {
+      offsetPointer = (void *)(static_cast<__half *>(fusedRecvBuff) + offset);
+    } else {
+      offsetPointer = (void *)(static_cast<float *>(fusedRecvBuff) + offset);
+    }
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)t[i].block()->mutable_data(),
-        (const void *)(static_cast<float *>(fusedRecvBuff) + offset),
-        t[i].Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c2));
+        (void *)t[i].block()->mutable_data(), (const void *)(offsetPointer),
+        t[i].Size() * dataSize, cudaMemcpyDeviceToDevice, ctx->c2));
     offset += t[i].Size();
   }
 }
 
-void Communicator::valSparsAllReduce(size_t num, float *accumulation,
+void Communicator::valSparsAllReduce(size_t num, void *accumulation,
                                      Context *ctx) {
+  CHECK_EQ(dataSize, sizeof(float))
+      << "This function depends on thrust and support only fp32 currently";
+
   if (sparsInitialized == false) sparsInit();
   if (accumulation != NULL) {
     // add the previous accumulation
-    cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
+    cuda::add(num, static_cast<float *>(fusedSendBuff),
+              static_cast<float *>(accumulation),
              static_cast<float *>(fusedSendBuff), ctx->c1);
     // backup the fusedSendBuff
-    CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
+    CUDA_CHECK(cudaMemcpyAsync(backupBuff, (const void *)fusedSendBuff,
                                sizeof(float) * num, cudaMemcpyDeviceToDevice,
                                ctx->c1));
   }
@@ -612,8 +640,9 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,
 
   // output the gradient accumulation
   if (accumulation != NULL)
-    cuda::sub(num, backupBuff, static_cast<float *>(fusedSendBuff),
-              accumulation, ctx->c1);
+    cuda::sub(num, static_cast<float *>(backupBuff),
+              static_cast<float *>(fusedSendBuff),
+              static_cast<float *>(accumulation), ctx->c1);
 
   // produce the index of the sparse array
   cuda::sparsindex(num, static_cast<float *>(fusedSendBuff), fusedIndex,
@@ -642,19 +671,20 @@ void Communicator::valSparsAllReduce(size_t num, void *accumulation,
 
   // remove zero of values to become sprase array
   cuda::removezeroval(num, static_cast<float *>(fusedSendBuff), ctx->c1);
-  CUDA_CHECK(cudaMemcpyAsync((void *)(sparsSendBuff), (const void *)fusedIndex,
+  CUDA_CHECK(cudaMemcpyAsync(sparsSendBuff, (const void *)fusedIndex,
                              sizeof(int) * (*nnz), cudaMemcpyDeviceToDevice,
                              ctx->c1));
-  CUDA_CHECK(cudaMemcpyAsync(
-      (void *)(sparsSendBuff + (*nnz)), (const void *)fusedSendBuff,
-      sizeof(float) * (*nnz), cudaMemcpyDeviceToDevice, ctx->c1));
+  CUDA_CHECK(
+      cudaMemcpyAsync((void *)(static_cast<float *>(sparsSendBuff) + (*nnz)),
+                      (const void *)fusedSendBuff, sizeof(float) * (*nnz),
+                      cudaMemcpyDeviceToDevice, ctx->c1));
 
   // wait for the memcpy to complete
   CUDA_CHECK(cudaEventRecord(event, ctx->c1));
   CUDA_CHECK(cudaStreamWaitEvent(ctx->s, event, 0));
 
   // all-gather all the sparse gradients
-  NCCLCHECK(ncclAllGather((const void *)sparsSendBuff, (void *)sparsRecvBuff,
+  NCCLCHECK(ncclAllGather((const void *)sparsSendBuff, sparsRecvBuff,
                           2 * nnzMax, ncclFloat, comm, ctx->s));
 
   // wait for the all-gather to complete
@@ -673,11 +703,13 @@ void Communicator::valSparsAllReduce(size_t num, void *accumulation,
 
   for (int i = 0; i < world_size; i++) {
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)xInd, (const void *)(sparsRecvBuff + offset),
+        (void *)xInd,
+        (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
         sizeof(int) * nnzAll[i], cudaMemcpyDeviceToDevice, ctx->c2));
     offset += nnzAll[i];
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)xVal, (const void *)(sparsRecvBuff + offset),
+        (void *)xVal,
+        (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
         sizeof(float) * nnzAll[i], cudaMemcpyDeviceToDevice, ctx->c2));
     offset += (2 * nnzMax - nnzAll[i]);
     CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzAll[i], &alpha, xVal,
@@ -686,17 +718,21 @@ void Communicator::valSparsAllReduce(size_t num, void *accumulation,
   }
 }
 
-void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
+void Communicator::topKSparsAllReduce(size_t num, void *accumulation,
                                       Context *ctx) {
+  CHECK_EQ(dataSize, sizeof(float))
+      << "This function depends on thrust and support only fp32 currently";
+
   if (sparsInitialized == false) sparsInit();
 
   // use gradient accumulation
   if (accumulation != NULL) {
     // add the previous accumulation
-    cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
+    cuda::add(num, static_cast<float *>(fusedSendBuff),
+              static_cast<float *>(accumulation),
              static_cast<float *>(fusedSendBuff), ctx->c1);
     // backup the fusedSendBuff
-    CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
+    CUDA_CHECK(cudaMemcpyAsync(backupBuff, (const void *)fusedSendBuff,
                                sizeof(float) * num, cudaMemcpyDeviceToDevice,
                                ctx->c1));
   }
@@ -716,17 +752,21 @@ void Communicator::topKSparsAllReduce(size_t num, void *accumulation,
     CUSPARSE_CHECK(cusparseSetStream(cusparse_handle, ctx->c1));
     CUSPARSE_CHECK(cusparseSaxpyi(
         cusparse_handle, nnzMax, &alpha, static_cast<float *>(fusedSendBuff),
-        fusedIndex, accumulation, CUSPARSE_INDEX_BASE_ONE));
-    cuda::sub(num, backupBuff, accumulation, accumulation, ctx->c1);
+        fusedIndex, static_cast<float *>(accumulation),
+        CUSPARSE_INDEX_BASE_ONE));
+    cuda::sub(num, static_cast<float *>(backupBuff),
+              static_cast<float *>(accumulation),
+              static_cast<float *>(accumulation), ctx->c1);
   }
 
   // the topK value and index will be sent
-  CUDA_CHECK(cudaMemcpyAsync((void *)(sparsSendBuff), (const void *)fusedIndex,
+  CUDA_CHECK(cudaMemcpyAsync(sparsSendBuff, (const void *)fusedIndex,
                              sizeof(int) * nnzMax, cudaMemcpyDeviceToDevice,
                              ctx->c1));
-  CUDA_CHECK(cudaMemcpyAsync(
-      (void *)(sparsSendBuff + nnzMax), (const void *)fusedSendBuff,
-      sizeof(float) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c1));
+  CUDA_CHECK(
+      cudaMemcpyAsync((void *)(static_cast<float *>(sparsSendBuff) + nnzMax),
+                      (const void *)fusedSendBuff, sizeof(float) * nnzMax,
+                      cudaMemcpyDeviceToDevice, ctx->c1));
 
   // wait for the memcpy to complete
   CUDA_CHECK(cudaEventRecord(event, ctx->c1));
@@ -751,11 +791,13 @@ void Communicator::topKSparsAllReduce(size_t num, void *accumulation,
   // all-reduce process
   for (int i = 0; i < world_size; i++) {
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)xInd, (const void *)(sparsRecvBuff + offset),
+        (void *)xInd,
+        (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
        sizeof(int) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c2));
     offset += nnzMax;
     CUDA_CHECK(cudaMemcpyAsync(
-        (void *)xVal, (const void *)(sparsRecvBuff + offset),
+        (void *)xVal,
+        (const void *)(static_cast<float *>(sparsRecvBuff) + offset),
         sizeof(float) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c2));
     offset += nnzMax;
     CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzMax, &alpha, xVal, xInd,
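
Reviewer sketch, not part of the patch: the core of the fp16 support is the dispatch added at the top of _sparsification() and _fusedSparsification(), which picks the NCCL element type and byte width once, then sizes the device copies as count * dataSize. The DtypeInfo struct and pickDtype() helper below are hypothetical (SINGA keeps ncclType/dataSize as Communicator members), but ncclHalf/ncclFloat and __half are the real NCCL/CUDA identifiers the diff relies on.

// Illustrative only: in SINGA, ncclType and dataSize are Communicator members;
// a helper is used here just to keep the sketch self-contained.
#include <cuda_fp16.h>  // __half (2-byte half-precision type)
#include <nccl.h>       // ncclDataType_t, ncclHalf, ncclFloat
#include <cstddef>

struct DtypeInfo {
  ncclDataType_t ncclType;  // element type passed to ncclAllReduce/AllGather
  size_t dataSize;          // bytes per element, scales every cudaMemcpyAsync
};

inline DtypeInfo pickDtype(bool isHalf) {
  // Mirrors the patch: kFloat16 tensors travel as ncclHalf (2 bytes each);
  // everything else falls back to fp32.
  if (isHalf) return {ncclHalf, sizeof(__half)};
  return {ncclFloat, sizeof(float)};
}

// A device copy of n elements is then sized in bytes as n * info.dataSize:
//   cudaMemcpyAsync(dst, src, n * info.dataSize, cudaMemcpyDeviceToDevice, s);

The sparse paths are deliberately left out of this dispatch: valSparsAllReduce() and topKSparsAllReduce() still run thrust/cuSPARSE math on float buffers, which is exactly what the new CHECK_EQ(dataSize, sizeof(float)) guards assert.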
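A second consequence of turning the fused buffers into void * is that every element offset needs a cast matching the runtime element width, which is what the new offsetPointer branches compute. Below is a standalone host-side sketch of that arithmetic; half_t is a 2-byte stand-in for CUDA's __half so the example runs without a GPU, and elementPtr() is a hypothetical name for the inline logic in the diff.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

enum DataType { kFloat32Elem, kFloat16Elem };  // local stand-ins, not SINGA's enum

struct half_t { uint16_t bits; };  // same size as CUDA's __half

// Mirrors: offsetPointer = (void *)(static_cast<__half *>(fusedSendBuff) + offset);
// Pointer arithmetic on void * is ill-formed in C++, so the buffer is cast to
// the element type selected at runtime before the offset is applied.
void *elementPtr(void *fusedBuff, size_t offset, DataType dtype) {
  if (dtype == kFloat16Elem)
    return static_cast<void *>(static_cast<half_t *>(fusedBuff) + offset);
  return static_cast<void *>(static_cast<float *>(fusedBuff) + offset);
}

int main() {
  std::vector<unsigned char> buff(64);
  unsigned char *base = buff.data();
  // Element 4 begins 16 bytes in for fp32 but 8 bytes in for fp16, which is
  // why the cast must track the tensor's data type.
  std::cout << static_cast<unsigned char *>(elementPtr(base, 4, kFloat32Elem)) - base
            << "\n";  // prints 16
  std::cout << static_cast<unsigned char *>(elementPtr(base, 4, kFloat16Elem)) - base
            << "\n";  // prints 8
}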