[ML-380] Optimized code and reduced spark job on running GPU. (#384)
* update spark to 3.3.3

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* remove oneccl communicator

* optimize code

* update optimize code

* format code style

---------

Signed-off-by: minmingzhu <minming.zhu@intel.com>
minmingzhu authored Oct 23, 2024
1 parent e44b052 commit a1a62a6
Showing 23 changed files with 170 additions and 235 deletions.
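The substance of the change, condensed: before this commit every GPU-enabled training entry point (Correlation, Random Forest, KMeans, Linear Regression, PCA, Summarizer) read the GPU index array, picked a queue, and built its own oneCCL KVS plus oneDAL SPMD communicator on every call; now OneCCL.cpp builds one communicator during `c_init` and the algorithms fetch it through `getDalComm()`. A sketch of the before/after at a call site (condensed from the hunks below, not verbatim source):

```cpp
// Before: repeated in every GPU branch, on every training call.
//   jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
//   auto queue = getAssignedGPU(device, gpuIndices);
//   ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
//   auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
//       queue, executorNum, rank, kvs);
//   ... run the oneDAL algorithm with comm ...
//   env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);

// After: one communicator, created in OneCCL c_init and fetched everywhere else.
auto comm = getDalComm();
// ... run the oneDAL algorithm with comm ...
```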
1 change: 0 additions & 1 deletion mllib-dal/src/main/native/Common.hpp
@@ -21,5 +21,4 @@
#endif

#include "GPU.h"
#include "Communicator.hpp"
#include "oneapi/dal/table/homogen.hpp"
18 changes: 4 additions & 14 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -225,23 +225,13 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
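Because the rendered hunk above interleaves removed and added lines without +/- markers, this is how the GPU case reads after the change (reconstructed from the hunk; blank lines and indentation approximate). The same simplification is applied to the DecisionForest, KMeans, PCA and Summarizer entry points below; LinearRegression gets the analogous treatment with a changed helper signature.

```cpp
#ifdef CPU_GPU_PROFILE
    case ComputeDevice::gpu: {
        logger::println(logger::INFO,
                        "oneDAL (native): use GPU kernels with rankid %d",
                        rank);
        auto comm = getDalComm();  // shared communicator built in OneCCL c_init
        doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
                                   resultObj);
        break;
    }
#endif
```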
19 changes: 4 additions & 15 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -313,22 +313,11 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
jobject hashmapObj = doRFClassifierOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount,
17 changes: 4 additions & 13 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -305,20 +305,11 @@ Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTra
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
jobject hashmapObj = doRFRegressorOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, treeCount,
2 changes: 1 addition & 1 deletion mllib-dal/src/main/native/GPU.cpp
@@ -10,7 +10,7 @@ typedef std::shared_ptr<sycl::queue> queuePtr;
static std::mutex g_mtx;
static std::vector<sycl::queue> g_queueVector;

static std::vector<sycl::device> get_gpus() {
std::vector<sycl::device> get_gpus() {
auto platforms = sycl::platform::get_platforms();
for (auto p : platforms) {
auto devices = p.get_devices(sycl::info::device_type::gpu);
3 changes: 2 additions & 1 deletion mllib-dal/src/main/native/GPU.h
@@ -3,10 +3,11 @@
#include "service.h"
#include <CL/cl.h>
#include <CL/sycl.hpp>
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>

sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);

std::vector<sycl::device> get_gpus();
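GPU.cpp and GPU.h carry a small enabling change: `get_gpus()` loses its `static` linkage and gains a declaration in the header, so the oneCCL init path can enumerate devices itself. A sketch of the call this makes possible, matching the OneCCL.cpp hunk further down:

```cpp
// Device selection at init time; simplified to "first GPU", as in the hunk below.
auto gpus = get_gpus();      // now externally visible through GPU.h
sycl::queue queue{gpus[0]};  // the queue the SPMD communicator is built on
```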
19 changes: 4 additions & 15 deletions mllib-dal/src/main/native/KMeansImpl.cpp
@@ -338,25 +338,14 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);
auto comm = getDalComm();
ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols,
pNumTabCenters, clusterNum, tolerance,
iterationNum, comm, resultObj);

env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
30 changes: 11 additions & 19 deletions mllib-dal/src/main/native/LinearRegressionImpl.cpp
@@ -214,19 +214,17 @@ ridge_regression_compute(size_t rankId, ccl::communicator &comm,
}

#ifdef CPU_GPU_PROFILE
static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, sycl::queue &queue,
jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel,
jlong labelCols, jboolean jfitIntercept,
jint executorNum, jobject resultObj) {
static jlong doLROneAPICompute(
JNIEnv *env, size_t rankId,
preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
jlong pNumTabFeature, jlong featureRows, jlong featureCols,
jlong pNumTabLabel, jlong labelCols, jboolean jfitIntercept,
jint executorNum, jobject resultObj) {
logger::println(logger::INFO,
"oneDAL (native): GPU compute start , rankid %d", rankId);
const bool isRoot = (rankId == ccl_root);
bool fitIntercept = bool(jfitIntercept);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rankId, kvs);
homogen_table xtrain = *reinterpret_cast<homogen_table *>(
createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols,
comm.get_queue())
@@ -279,19 +277,13 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra
jlong resultptr = 0L;
if (useGPU) {
#ifdef CPU_GPU_PROFILE
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
auto queue = getAssignedGPU(device, gpuIndices);

resultptr = doLROneAPICompute(env, rank, queue, feature, featureRows,
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);
auto comm = getDalComm();
resultptr = doLROneAPICompute(env, rank, comm, feature, featureRows,
featureCols, label, labelCols,
fitIntercept, executorNum, resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
#endif
} else {
ccl::communicator &cclComm = getComm();
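LinearRegressionImpl.cpp needs a slightly bigger edit than the other algorithms because its GPU helper owned the communicator construction: `doLROneAPICompute` used to take a `sycl::queue` and build a communicator internally; it now receives the shared SPMD communicator by value and recovers the queue with `comm.get_queue()` when building the homogen tables. The new helper signature, lifted out of the hunk above for readability:

```cpp
// As in the hunk above, reformatted as a declaration.
static jlong doLROneAPICompute(
    JNIEnv *env, size_t rankId,
    preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
    jlong pNumTabFeature, jlong featureRows, jlong featureCols,
    jlong pNumTabLabel, jlong labelCols, jboolean jfitIntercept,
    jint executorNum, jobject resultObj);
```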
64 changes: 55 additions & 9 deletions mllib-dal/src/main/native/OneCCL.cpp
@@ -29,6 +29,7 @@
#include <oneapi/ccl.hpp>

#include "CCLInitSingleton.hpp"
#include "GPU.h"
#include "Logger.h"
#include "OneCCL.h"
#include "com_intel_oap_mllib_OneCCL__.h"
@@ -43,9 +44,18 @@ static size_t rank_id = 0;
static std::vector<ccl::communicator> g_comms;
static std::vector<ccl::shared_ptr_class<ccl::kvs>> g_kvs;

ccl::communicator &getComm() { return g_comms[0]; }
ccl::shared_ptr_class<ccl::kvs> &getKvs() { return g_kvs[0]; }

ccl::communicator &getComm() { return g_comms[0]; }
#ifdef CPU_GPU_PROFILE
static std::vector<oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm>>
g_dal_comms;
oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm> &
getDalComm() {
return g_dal_comms[0];
}
#endif
JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port,
jobject param) {
@@ -55,26 +65,57 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
auto t1 = std::chrono::high_resolution_clock::now();

ccl::init();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): init took %f secs",
duration / 1000);
const char *str = env->GetStringUTFChars(ip_port, 0);
ccl::string ccl_ip_port(str);

#ifdef CPU_ONLY_PROFILE
auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port);

g_kvs.push_back(singletonCCLInit.kvs);

#ifdef CPU_ONLY_PROFILE
g_comms.push_back(
ccl::create_communicator(size, rank, singletonCCLInit.kvs));

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
rank_id = getComm().rank();
comm_size = getComm().size();

#endif

#ifdef CPU_GPU_PROFILE
t1 = std::chrono::high_resolution_clock::now();
auto kvs_attr = ccl::create_kvs_attr();

kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);

ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): init took %f secs",
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
auto gpus = get_gpus();
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm = oneapi::dal::preview::spmd::make_communicator<
oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO,
"OneCCL (native): create communicator took %f secs",
duration / 1000);
g_dal_comms.push_back(comm);
rank_id = getDalComm().get_rank();
comm_size = getDalComm().get_rank_count();
#endif

jclass cls = env->GetObjectClass(param);
jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J");
jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J");
@@ -89,8 +130,13 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) {
logger::printerrln(logger::INFO, "OneCCL (native): cleanup");
#ifdef CPU_ONLY_PROFILE
g_kvs.pop_back();
g_comms.pop_back();
#endif
#ifdef CPU_GPU_PROFILE
g_dal_comms.pop_back();
#endif
}

JNIEXPORT jboolean JNICALL
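The core of the change sits in `c_init`: on CPU_GPU_PROFILE builds it now creates the main KVS from the ip:port string, picks a GPU, builds the oneDAL SPMD communicator once, and caches it in `g_dal_comms` for `getDalComm()`; rank and size are then read back from that communicator, and `c_cleanup` pops it again. Because the hunk above interleaves old and new lines, here is the GPU path reassembled (timing/log statements omitted; ordering and blank lines approximate):

```cpp
#ifdef CPU_GPU_PROFILE
    // Build the main KVS directly from the ip:port string passed in from Scala.
    auto kvs_attr = ccl::create_kvs_attr();
    kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
    ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

    // One queue on the first GPU, one communicator per executor process.
    auto gpus = get_gpus();
    sycl::queue queue{gpus[0]};
    auto comm = oneapi::dal::preview::spmd::make_communicator<
        oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank, kvs);
    g_dal_comms.push_back(comm);

    rank_id = getDalComm().get_rank();
    comm_size = getDalComm().get_rank_count();
#endif
```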
12 changes: 11 additions & 1 deletion mllib-dal/src/main/native/OneCCL.h
@@ -17,8 +17,8 @@
#pragma once

#include <oneapi/ccl.hpp>

#include <vector>

using namespace std;

namespace ccl {
@@ -44,4 +44,14 @@ event CCL_API gather(const BufferType *sendbuf, int sendcount,

ccl::communicator &getComm();
ccl::shared_ptr_class<ccl::kvs> &getKvs();

#ifdef CPU_GPU_PROFILE
#ifndef ONEDAL_DATA_PARALLEL
#define ONEDAL_DATA_PARALLEL
#endif
#include "Communicator.hpp"
oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm> &
getDalComm();
#endif
extern const size_t ccl_root;
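OneCCL.h now owns the GPU-side plumbing: it defines ONEDAL_DATA_PARALLEL before pulling in Communicator.hpp (previously included via Common.hpp) and declares `getDalComm()`. A minimal, hypothetical consumer, just to show the intended usage (not a file in this commit):

```cpp
#include "OneCCL.h"  // on CPU_GPU_PROFILE builds this also brings in Communicator.hpp

// Hypothetical helper: every algorithm's GPU branch follows this shape.
void exampleGpuEntry() {
    auto comm = getDalComm();       // SPMD communicator cached by c_init
    auto queue = comm.get_queue();  // the SYCL queue chosen at init time
    (void)queue;                    // hand it to homogen_table builders, etc.
}
```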
18 changes: 4 additions & 14 deletions mllib-dal/src/main/native/PCAImpl.cpp
@@ -277,22 +277,12 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
doPCAOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
19 changes: 5 additions & 14 deletions mllib-dal/src/main/native/SummarizerImpl.cpp
@@ -295,22 +295,13 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
auto queue = getAssignedGPU(device, gpuIndices);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

auto comm = getDalComm();
doSummarizerOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif

Some generated files are not rendered by default.
