[ML-389] Created kvs by storing ports to files #391

Open
wants to merge 19 commits into master
34 changes: 13 additions & 21 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -197,19 +197,20 @@ static void doCorrelationOneAPICompute(

JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabData, jlong numRows, jlong numCols,
jint executorNum, jint executorCores, jint computeDeviceOrdinal,
jintArray gpuIdxArray, jobject resultObj) {
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows,
jlong numCols, jint executorNum, jint executorCores,
jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring store_path,
jobject resultObj) {
logger::println(logger::INFO,
"oneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());

ccl::communicator &cclComm = getComm();
int rankId = cclComm.rank();
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::host:
case ComputeDevice::cpu: {
ccl::communicator &cclComm = getComm();
int rankId = cclComm.rank();
NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
// Set number of threads for oneDAL to use for each rank
services::Environment::getInstance()->setNumberOfThreads(executorCores);
@@ -225,26 +226,17 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rankId);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

int size = cclComm.size();
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

auto queue =
getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
const char* path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, size, rankId, kvs);
doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
env->ReleaseStringUTFChars(store_path, path);
break;
}
#endif
35 changes: 12 additions & 23 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -300,46 +300,35 @@ static jobject doRFClassifierOneAPICompute(
*/
JNIEXPORT jobject JNICALL
Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassifierTrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum,
jint computeDeviceOrdinal, jint classCount, jint treeCount,
jint numFeaturesPerNode, jint minObservationsLeafNode,
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature,
jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols,
jint executorNum, jint computeDeviceOrdinal, jint classCount,
jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode,
jint minObservationsSplitNode, jdouble minWeightFractionLeafNode,
jdouble minImpurityDecreaseSplitNode, jint maxTreeDepth, jlong seed,
jint maxBins, jboolean bootstrap, jintArray gpuIdxArray,
jint maxBins, jboolean bootstrap, jintArray gpuIdxArray, jstring store_path,
jobject resultObj) {
logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels");

ccl::communicator &cclComm = getComm();
int rankId = cclComm.rank();
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rankId);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

int size = cclComm.size();
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

auto queue =
getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
const char* path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, size, rankId, kvs);
jobject hashmapObj = doRFClassifierOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount,
numFeaturesPerNode, minObservationsLeafNode,
minObservationsSplitNode, minWeightFractionLeafNode,
minImpurityDecreaseSplitNode, maxTreeDepth, seed, maxBins,
bootstrap, comm, resultObj);
env->ReleaseStringUTFChars(store_path, path);
return hashmapObj;
}
default: {
35 changes: 13 additions & 22 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -292,42 +292,33 @@ static jobject doRFRegressorOneAPICompute(

JNIEXPORT jobject JNICALL
Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel, jlong labelCols, jint executorNum,
jint computeDeviceOrdinal, jint treeCount, jint numFeaturesPerNode,
jint minObservationsLeafNode, jint maxTreeDepth, jlong seed, jint maxbins,
jboolean bootstrap, jintArray gpuIdxArray, jobject resultObj) {
JNIEnv *env, jobject obj, jint rank, jlong pNumTabFeature,
jlong featureRows, jlong featureCols, jlong pNumTabLabel, jlong labelCols,
jint executorNum, jint computeDeviceOrdinal, jint treeCount,
jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth,
jlong seed, jint maxbins, jboolean bootstrap, jintArray gpuIdxArray,
jstring store_path, jobject resultObj) {
logger::println(logger::INFO,
"OneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());

ccl::communicator &cclComm = getComm();
int rankId = cclComm.rank();
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rankId);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

int size = cclComm.size();
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);

auto queue =
getAssignedGPU(device, cclComm, size, rankId, gpuIndices, nGpu);
const char* path = env->GetStringUTFChars(store_path, nullptr);
ccl::string kvs_store_path(path);
auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, size, rankId, kvs);
jobject hashmapObj = doRFRegressorOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, treeCount,
numFeaturesPerNode, minObservationsLeafNode, maxTreeDepth, seed,
maxbins, bootstrap, comm, resultObj);
env->ReleaseStringUTFChars(store_path, path);
return hashmapObj;
}
default: {
114 changes: 98 additions & 16 deletions mllib-dal/src/main/native/GPU.cpp
@@ -4,11 +4,14 @@

#include "GPU.h"
#include "Logger.h"
#define STORE_TIMEOUT_SEC 120
#define KVS_CREATE_SUCCESS 0
#define KVS_CREATE_FAILURE -1

typedef std::shared_ptr<sycl::queue> queuePtr;

static std::mutex g_mtx;
static std::vector<sycl::queue> g_queueVector;
std::shared_ptr<file_store> store;

static std::vector<sycl::device> get_gpus() {
auto platforms = sycl::platform::get_platforms();
@@ -24,8 +27,56 @@ static std::vector<sycl::device> get_gpus() {
return {};
}

int create_kvs_by_store(std::shared_ptr<file_store> store, int rank,
ccl::shared_ptr_class<ccl::kvs> &kvs) {
logger::println(logger::INFO, "OneCCL (native): create_kvs_by_store ");
auto t1 = std::chrono::high_resolution_clock::now();
ccl::kvs::address_type main_addr;
auto start = std::chrono::system_clock::now();
if (rank == 0) {
kvs = ccl::create_main_kvs();
main_addr = kvs->get_address();
if (store->write((void *)main_addr.data(), main_addr.size()) < 0) {
logger::println(
logger::INFO,
"OneCCL (native): error occurred during write attempt");
kvs.reset();
return KVS_CREATE_FAILURE;
}
auto end = std::chrono::system_clock::now();
auto exec_time =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(end -
start)
.count();
logger::println(logger::INFO,
"OneCCL (native): write to store time %f secs",
exec_time / 1000);
} else {
if (store->read((void *)main_addr.data(), main_addr.size()) < 0) {
logger::println(
logger::INFO,
"OneCCL (native): error occurred during read attempt");
kvs.reset();
return KVS_CREATE_FAILURE;
}
auto end = std::chrono::system_clock::now();
auto exec_time =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(end -
start)
.count();
logger::println(logger::INFO,
"OneCCL (native): read from store time %f secs",
exec_time / 1000);
kvs = ccl::create_kvs(main_addr);
}
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO,
"OneCCL (native): create_kvs_by_store took %f secs",
duration / 1000);
return KVS_CREATE_SUCCESS;
}

static int getLocalRank(ccl::communicator &comm, int size, int rank) {
const int MPI_MAX_PROCESSOR_NAME = 128;
/* Obtain local rank among nodes sharing the same host name */
char zero = static_cast<char>(0);
std::vector<char> name(MPI_MAX_PROCESSOR_NAME + 1, zero);
@@ -66,8 +117,7 @@ static sycl::queue getSyclQueue(const sycl::device device) {
}
}

sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm,
int size, int rankId, jint *gpu_indices, int n_gpu) {
sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices) {
switch (device) {
case ComputeDevice::host:
case ComputeDevice::cpu: {
@@ -78,19 +128,8 @@ sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm,
}
case ComputeDevice::gpu: {
logger::println(logger::INFO, "selector GPU");
auto local_rank = getLocalRank(comm, size, rankId);
auto gpus = get_gpus();

logger::println(logger::INFO,
"rank: %d size: %d local_rank: %d n_gpu: %d", rankId,
size, local_rank, n_gpu);

auto gpu_selected = gpu_indices[local_rank % n_gpu];
logger::println(logger::INFO, "GPU selected for current rank: %d",
gpu_selected);

// In case gpu_selected index is larger than number of GPU SYCL devices
auto rank_gpu = gpus[gpu_selected % gpus.size()];
auto rank_gpu = gpus[0];
sycl::queue q{rank_gpu};
return q;
}
@@ -125,3 +164,46 @@ sycl::queue getQueue(const ComputeDevice device) {
}
}
}

preview::spmd::communicator<preview::spmd::device_memory_access::usm>
createDalCommunicator(const jint executorNum, const jint rank,
const ccl::string kvs_store_path) {
auto gpus = get_gpus();

auto t1 = std::chrono::high_resolution_clock::now();

ccl::init();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();

logger::println(logger::INFO, "OneCCL singleton init took %f secs",
duration / 1000);

t1 = std::chrono::high_resolution_clock::now();
ccl::shared_ptr_class<ccl::kvs> kvs;

store = std::make_shared<file_store>(
kvs_store_path, rank, std::chrono::seconds(STORE_TIMEOUT_SEC));
if (create_kvs_by_store(store, rank, kvs) != KVS_CREATE_SUCCESS) {
logger::println(logger::INFO, "can not create kvs by store");
throw std::runtime_error("Failed to create communicator");
}
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO,
"OneCCL (native): create communicator took %f secs",
duration / 1000);
return comm;
}
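
For reference, the new setup can be exercised outside of Spark by launching one process per rank that shares the same store path. The sketch below is hypothetical and not part of this PR; it assumes ccl::string is interchangeable with std::string and that the oneDAL preview SPMD communicator exposes get_rank() and get_rank_count(), as in recent oneDAL previews.

    // Hypothetical standalone driver (illustration only, not in this PR).
    // Rank 0 creates the main KVS and writes its address into the store file;
    // the other ranks poll the same file (bounded by the 120 s store timeout
    // in GPU.cpp) and then join the communicator.
    #include "GPU.h"

    #include <cstdlib>
    #include <iostream>

    int main(int argc, char **argv) {
        if (argc < 4) {
            std::cerr << "usage: kvs_driver <rank> <executorNum> <store_path>\n";
            return 1;
        }
        const jint rank = std::atoi(argv[1]);
        const jint executorNum = std::atoi(argv[2]);
        const ccl::string kvs_store_path = argv[3];

        auto comm = createDalCommunicator(executorNum, rank, kvs_store_path);
        std::cout << "rank " << comm.get_rank() << " of " << comm.get_rank_count()
                  << " joined the communicator" << std::endl;
        return 0;
    }
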
8 changes: 5 additions & 3 deletions mllib-dal/src/main/native/GPU.h
@@ -1,13 +1,15 @@
#pragma once

#include "Communicator.hpp"
#include "service.h"
#include "store.hpp"
#include <CL/cl.h>
#include <CL/sycl.hpp>
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>

sycl::queue getAssignedGPU(const ComputeDevice device, ccl::communicator &comm,
int size, int rankId, jint *gpu_indices, int n_gpu);
sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);
preview::spmd::communicator<preview::spmd::device_memory_access::usm>
createDalCommunicator(jint executorNum, jint rank, ccl::string kvs_store_path);
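
store.hpp itself is not among the files shown in this diff. The calls in GPU.cpp imply a file_store constructed from a path, a rank, and a timeout, with write/read methods that return a negative value on failure and with read blocking until rank 0 has published the KVS address. A minimal sketch under those assumptions (hypothetical; the PR's actual store.hpp may differ):

    // Hypothetical sketch of the file_store interface assumed by GPU.cpp.
    // Rank 0 writes the raw KVS address bytes to the shared file; other ranks
    // poll until the file holds at least `size` bytes or the timeout expires.
    #pragma once

    #include <chrono>
    #include <cstddef>
    #include <fstream>
    #include <string>
    #include <thread>

    class file_store {
      public:
        file_store(std::string path, int rank, std::chrono::seconds timeout)
            : path_(std::move(path)), rank_(rank), timeout_(timeout) {}

        // Returns 0 on success, -1 on failure (matches the "< 0" checks in GPU.cpp).
        int write(const void *data, std::size_t size) {
            std::ofstream out(path_, std::ios::binary | std::ios::trunc);
            if (!out)
                return -1;
            out.write(static_cast<const char *>(data),
                      static_cast<std::streamsize>(size));
            return out.good() ? 0 : -1;
        }

        int read(void *data, std::size_t size) {
            const auto deadline = std::chrono::steady_clock::now() + timeout_;
            while (std::chrono::steady_clock::now() < deadline) {
                std::ifstream in(path_, std::ios::binary | std::ios::ate);
                if (in) {
                    const auto have = static_cast<std::streamoff>(in.tellg());
                    if (have >= static_cast<std::streamoff>(size)) {
                        in.seekg(0, std::ios::beg);
                        in.read(static_cast<char *>(data),
                                static_cast<std::streamsize>(size));
                        return in.good() ? 0 : -1;
                    }
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
            return -1; // timed out waiting for rank 0 to publish the address
        }

      private:
        std::string path_;
        int rank_; // kept to mirror the constructor call in GPU.cpp
        std::chrono::seconds timeout_;
    };

Because only a size check gates the read, readers never pick up a partially written address; a real implementation would also want cleanup of the store file between runs.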