[ML-380] Optimized code and reduced Spark jobs when running on GPU. #384

Merged 40 commits on Oct 23, 2024

Changes from all commits

Commits (40)
be55d63  update spark to 3.3.3 (minmingzhu, Sep 18, 2023)
6d2b055  Merge branch 'oap-project:master' into master (minmingzhu, Sep 18, 2023)
833444e  Merge branch 'oap-project:master' into master (minmingzhu, Sep 19, 2023)
23f0491  Merge branch 'oap-project:master' into master (minmingzhu, Sep 19, 2023)
f42d3be  Merge branch 'oap-project:master' into master (minmingzhu, Sep 21, 2023)
db6e64c  Merge branch 'oap-project:master' into master (minmingzhu, Sep 26, 2023)
f3f58bb  Merge branch 'oap-project:master' into master (minmingzhu, Oct 9, 2023)
262a746  Merge branch 'oap-project:master' into master (minmingzhu, Oct 11, 2023)
7802b2d  Merge branch 'oap-project:master' into master (minmingzhu, Mar 5, 2024)
efb9097  Merge branch 'oap-project:master' into master (minmingzhu, Mar 25, 2024)
c3f570b  Merge branch 'oap-project:master' into master (minmingzhu, Mar 26, 2024)
3bee6bb  Merge branch 'oap-project:master' into master (minmingzhu, Jul 22, 2024)
5d08ac0  Merge branch 'oap-project:master' into master (minmingzhu, Jul 22, 2024)
49bdc75  Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
0a44fbe  Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
7ff407c  Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
c29e617  Merge branch 'oap-project:master' into master (minmingzhu, Oct 16, 2024)
93f3eaa  remove oneccl communicator (minmingzhu, Jul 25, 2024)
589c0c1  update (minmingzhu, Aug 15, 2024)
3a0ed9b  optimize code (minmingzhu, Aug 18, 2024)
23a81e5  update optimize code (minmingzhu, Aug 19, 2024)
462716e  update (minmingzhu, Sep 10, 2024)
1d19d96  format code style (minmingzhu, Sep 10, 2024)
f657977  update (minmingzhu, Sep 10, 2024)
1f657f9  update (minmingzhu, Sep 10, 2024)
a1eb9bc  format code style (minmingzhu, Sep 10, 2024)
f72beaf  update (minmingzhu, Sep 10, 2024)
eedd941  update (minmingzhu, Sep 10, 2024)
37f936a  update (minmingzhu, Oct 15, 2024)
5bd92e2  update (minmingzhu, Oct 17, 2024)
abc5cde  update (minmingzhu, Oct 18, 2024)
b429512  update (minmingzhu, Oct 18, 2024)
4487b66  update (minmingzhu, Oct 18, 2024)
f8c9a59  update (minmingzhu, Oct 18, 2024)
e98c2ef  update (minmingzhu, Oct 18, 2024)
f489126  update (minmingzhu, Oct 21, 2024)
66588a9  update (minmingzhu, Oct 21, 2024)
f2813e3  update (minmingzhu, Oct 21, 2024)
1c799ed  update (minmingzhu, Oct 21, 2024)
c0a2cf3  update (minmingzhu, Oct 23, 2024)
1 change: 0 additions & 1 deletion mllib-dal/src/main/native/Common.hpp
@@ -21,5 +21,4 @@
#endif

#include "GPU.h"
#include "Communicator.hpp"
#include "oneapi/dal/table/homogen.hpp"
18 changes: 4 additions & 14 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -225,23 +225,13 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
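All of the GPU branches in the algorithm implementations follow the same shape after this change: log the rank, fetch the communicator that OneCCL.cpp created once at init time, and hand it to the compute routine. The snippet below is a minimal sketch of that shape, not code from this diff; gpuBranchSketch and the commented-out compute call are hypothetical, and it assumes the project's Logger.h and OneCCL.h headers.

#include "Logger.h"
#include "OneCCL.h" // declares getDalComm() when CPU_GPU_PROFILE is defined

#ifdef CPU_GPU_PROFILE
// Hypothetical helper mirroring the new GPU branch: no per-call queue,
// no per-call kvs, no per-call communicator construction.
static void gpuBranchSketch(int rank) {
    logger::println(logger::INFO,
                    "oneDAL (native): use GPU kernels with rankid %d", rank);
    auto comm = getDalComm(); // communicator cached by OneCCL c_init
    // doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj);
    (void)comm;
}
#endif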
19 changes: 4 additions & 15 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -313,22 +313,11 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
jobject hashmapObj = doRFClassifierOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount,
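Besides dropping the duplicate ComputeDevice lookup inside the case, this branch (like the others) no longer reads gpuIdxArray at all, so the GetIntArrayElements/ReleaseIntArrayElements pair disappears. For reference, a minimal sketch of the JNI pattern the old branches had to carry; firstGpuIndexSketch is hypothetical and not part of this PR.

#include <jni.h>

// Hypothetical helper showing the pin/read/release cycle the removed code performed.
static int firstGpuIndexSketch(JNIEnv *env, jintArray gpuIdxArray) {
    jsize nGpu = env->GetArrayLength(gpuIdxArray);
    jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
    int first = (nGpu > 0) ? static_cast<int>(gpuIndices[0]) : 0;
    // The pinned buffer must be released on every path out of the branch.
    env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
    return first;
}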
17 changes: 4 additions & 13 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -305,20 +305,11 @@ Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTra
ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
jobject hashmapObj = doRFRegressorOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, treeCount,
2 changes: 1 addition & 1 deletion mllib-dal/src/main/native/GPU.cpp
@@ -10,7 +10,7 @@ typedef std::shared_ptr<sycl::queue> queuePtr;
static std::mutex g_mtx;
static std::vector<sycl::queue> g_queueVector;

static std::vector<sycl::device> get_gpus() {
std::vector<sycl::device> get_gpus() {
auto platforms = sycl::platform::get_platforms();
for (auto p : platforms) {
auto devices = p.get_devices(sycl::info::device_type::gpu);
3 changes: 2 additions & 1 deletion mllib-dal/src/main/native/GPU.h
@@ -3,10 +3,11 @@
#include "service.h"
#include <CL/cl.h>
#include <CL/sycl.hpp>
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>

sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);

std::vector<sycl::device> get_gpus();
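get_gpus() loses its static linkage in GPU.cpp and gains a declaration here so that OneCCL.cpp can enumerate devices when it builds the shared communicator. Below is a minimal usage sketch under the assumption that at least one SYCL GPU is visible; makeDefaultGpuQueueSketch is hypothetical.

#include <CL/sycl.hpp>
#include "GPU.h" // now exports std::vector<sycl::device> get_gpus();

// Pick the first enumerated GPU and build a queue on it, as OneCCL c_init now does.
static sycl::queue makeDefaultGpuQueueSketch() {
    auto gpus = get_gpus();
    return sycl::queue{gpus[0]};
}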
19 changes: 4 additions & 15 deletions mllib-dal/src/main/native/KMeansImpl.cpp
@@ -338,25 +338,14 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
logger::println(logger::INFO,
"OneDAL (native): use GPU kernels with rankid %d",
rank);
auto comm = getDalComm();
ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols,
pNumTabCenters, clusterNum, tolerance,
iterationNum, comm, resultObj);

env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
30 changes: 11 additions & 19 deletions mllib-dal/src/main/native/LinearRegressionImpl.cpp
@@ -214,19 +214,17 @@ ridge_regression_compute(size_t rankId, ccl::communicator &comm,
}

#ifdef CPU_GPU_PROFILE
static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, sycl::queue &queue,
jlong pNumTabFeature, jlong featureRows,
jlong featureCols, jlong pNumTabLabel,
jlong labelCols, jboolean jfitIntercept,
jint executorNum, jobject resultObj) {
static jlong doLROneAPICompute(
JNIEnv *env, size_t rankId,
preview::spmd::communicator<preview::spmd::device_memory_access::usm> comm,
jlong pNumTabFeature, jlong featureRows, jlong featureCols,
jlong pNumTabLabel, jlong labelCols, jboolean jfitIntercept,
jint executorNum, jobject resultObj) {
logger::println(logger::INFO,
"oneDAL (native): GPU compute start , rankid %d", rankId);
const bool isRoot = (rankId == ccl_root);
bool fitIntercept = bool(jfitIntercept);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rankId, kvs);
homogen_table xtrain = *reinterpret_cast<homogen_table *>(
createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols,
comm.get_queue())
@@ -279,19 +277,13 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra
jlong resultptr = 0L;
if (useGPU) {
#ifdef CPU_GPU_PROFILE
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
auto queue = getAssignedGPU(device, gpuIndices);

resultptr = doLROneAPICompute(env, rank, queue, feature, featureRows,
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);
auto comm = getDalComm();
resultptr = doLROneAPICompute(env, rank, comm, feature, featureRows,
featureCols, label, labelCols,
fitIntercept, executorNum, resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
#endif
} else {
ccl::communicator &cclComm = getComm();
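doLROneAPICompute now takes the oneDAL SPMD communicator instead of a sycl::queue, and the queue it needs for building the USM homogen tables is recovered from the communicator itself. A minimal sketch of that callee-side pattern; useCommQueueSketch is hypothetical and the actual compute call is elided.

#include "OneCCL.h" // getDalComm() and the SPMD communicator type in CPU_GPU_PROFILE builds

#ifdef CPU_GPU_PROFILE
// The communicator carries the SYCL queue, so callees no longer need a queue parameter.
static void useCommQueueSketch() {
    auto comm = getDalComm();
    auto queue = comm.get_queue(); // queue backing the USM tables and the oneDAL calls
    (void)queue;
}
#endif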
64 changes: 55 additions & 9 deletions mllib-dal/src/main/native/OneCCL.cpp
@@ -29,6 +29,7 @@
#include <oneapi/ccl.hpp>

#include "CCLInitSingleton.hpp"
#include "GPU.h"
#include "Logger.h"
#include "OneCCL.h"
#include "com_intel_oap_mllib_OneCCL__.h"
@@ -43,9 +44,18 @@ static size_t rank_id = 0;
static std::vector<ccl::communicator> g_comms;
static std::vector<ccl::shared_ptr_class<ccl::kvs>> g_kvs;

ccl::communicator &getComm() { return g_comms[0]; }
ccl::shared_ptr_class<ccl::kvs> &getKvs() { return g_kvs[0]; }

ccl::communicator &getComm() { return g_comms[0]; }
#ifdef CPU_GPU_PROFILE
static std::vector<oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm>>
g_dal_comms;
oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm> &
getDalComm() {
return g_dal_comms[0];
}
#endif
JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port,
jobject param) {
@@ -55,26 +65,57 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
auto t1 = std::chrono::high_resolution_clock::now();

ccl::init();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): init took %f secs",
duration / 1000);
const char *str = env->GetStringUTFChars(ip_port, 0);
ccl::string ccl_ip_port(str);

#ifdef CPU_ONLY_PROFILE
auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port);

g_kvs.push_back(singletonCCLInit.kvs);

#ifdef CPU_ONLY_PROFILE
g_comms.push_back(
ccl::create_communicator(size, rank, singletonCCLInit.kvs));

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
rank_id = getComm().rank();
comm_size = getComm().size();

#endif

#ifdef CPU_GPU_PROFILE
t1 = std::chrono::high_resolution_clock::now();
auto kvs_attr = ccl::create_kvs_attr();

kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);

ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): init took %f secs",
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
auto gpus = get_gpus();
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm = oneapi::dal::preview::spmd::make_communicator<
oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO,
"OneCCL (native): create communicator took %f secs",
duration / 1000);
g_dal_comms.push_back(comm);
rank_id = getDalComm().get_rank();
comm_size = getDalComm().get_rank_count();
#endif

jclass cls = env->GetObjectClass(param);
jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J");
jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J");
@@ -89,8 +130,13 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) {
logger::printerrln(logger::INFO, "OneCCL (native): cleanup");
#ifdef CPU_ONLY_PROFILE
g_kvs.pop_back();
g_comms.pop_back();
#endif
#ifdef CPU_GPU_PROFILE
g_dal_comms.pop_back();
#endif
}

JNIEXPORT jboolean JNICALL
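The GPU path of c_init is the core of this PR: build one main kvs from the ip:port string, take the first device from get_gpus(), create a single oneDAL SPMD communicator over the CCL backend, and cache it for every subsequent training call. The sketch below condenses that flow using only calls that appear in this diff; initDalCommSketch is hypothetical, and the JNI plumbing, timing, and the g_dal_comms cache are left out.

#include <oneapi/ccl.hpp>
#include "GPU.h"    // get_gpus()
#include "OneCCL.h" // pulls in Communicator.hpp for GPU builds

#ifdef CPU_GPU_PROFILE
// One kvs, one queue on the first GPU, one SPMD communicator shared by all algorithms.
static auto initDalCommSketch(int size, int rank, const ccl::string &ccl_ip_port) {
    ccl::init();
    auto kvs_attr = ccl::create_kvs_attr();
    kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
    ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

    sycl::queue queue{get_gpus()[0]};
    // The real code pushes this communicator into g_dal_comms and reports
    // get_rank()/get_rank_count() back to the JVM via the param object's
    // rankId/commSize fields.
    return oneapi::dal::preview::spmd::make_communicator<
        oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank, kvs);
}
#endif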
12 changes: 11 additions & 1 deletion mllib-dal/src/main/native/OneCCL.h
@@ -17,8 +17,8 @@
#pragma once

#include <oneapi/ccl.hpp>

#include <vector>

using namespace std;

namespace ccl {
@@ -44,4 +44,14 @@ event CCL_API gather(const BufferType *sendbuf, int sendcount,

ccl::communicator &getComm();
ccl::shared_ptr_class<ccl::kvs> &getKvs();

#ifdef CPU_GPU_PROFILE
#ifndef ONEDAL_DATA_PARALLEL
#define ONEDAL_DATA_PARALLEL
#endif
#include "Communicator.hpp"
oneapi::dal::preview::spmd::communicator<
oneapi::dal::preview::spmd::device_memory_access::usm> &
getDalComm();
#endif
extern const size_t ccl_root;
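With Communicator.hpp now included here (rather than via Common.hpp) and ONEDAL_DATA_PARALLEL defined before it, any translation unit that includes OneCCL.h in a CPU_GPU_PROFILE build can reach the cached communicator. A minimal consumer sketch; reportRankSketch is hypothetical.

#include "OneCCL.h"

#ifdef CPU_GPU_PROFILE
// Rank and size read from the shared communicator, matching what c_init reports to the JVM.
static void reportRankSketch() {
    auto &comm = getDalComm();
    size_t rank = comm.get_rank();
    size_t nRanks = comm.get_rank_count();
    (void)rank;
    (void)nRanks;
}
#endif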
18 changes: 4 additions & 14 deletions mllib-dal/src/main/native/PCAImpl.cpp
@@ -277,22 +277,12 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);

auto queue = getAssignedGPU(device, gpuIndices);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
auto comm = getDalComm();
doPCAOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
Expand Down
19 changes: 5 additions & 14 deletions mllib-dal/src/main/native/SummarizerImpl.cpp
@@ -295,22 +295,13 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
auto queue = getAssignedGPU(device, gpuIndices);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
logger::println(logger::INFO,
"oneDAL (native): use GPU kernels with rankid %d",
rank);

auto comm = getDalComm();
doSummarizerOneAPICompute(env, pNumTabData, numRows, numCols, comm,
resultObj);
env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
break;
}
#endif
Expand Down

Some generated files are not rendered by default.