[ML-388] Improved Communicator code, removed redundant code #390

Open

wants to merge 32 commits into base: master

Commits (32)
be55d63 update spark to 3.3.3 (minmingzhu, Sep 18, 2023)
6d2b055 Merge branch 'oap-project:master' into master (minmingzhu, Sep 18, 2023)
833444e Merge branch 'oap-project:master' into master (minmingzhu, Sep 19, 2023)
23f0491 Merge branch 'oap-project:master' into master (minmingzhu, Sep 19, 2023)
f42d3be Merge branch 'oap-project:master' into master (minmingzhu, Sep 21, 2023)
db6e64c Merge branch 'oap-project:master' into master (minmingzhu, Sep 26, 2023)
f3f58bb Merge branch 'oap-project:master' into master (minmingzhu, Oct 9, 2023)
262a746 Merge branch 'oap-project:master' into master (minmingzhu, Oct 11, 2023)
7802b2d Merge branch 'oap-project:master' into master (minmingzhu, Mar 5, 2024)
efb9097 Merge branch 'oap-project:master' into master (minmingzhu, Mar 25, 2024)
c3f570b Merge branch 'oap-project:master' into master (minmingzhu, Mar 26, 2024)
3bee6bb Merge branch 'oap-project:master' into master (minmingzhu, Jul 22, 2024)
5d08ac0 Merge branch 'oap-project:master' into master (minmingzhu, Jul 22, 2024)
49bdc75 Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
0a44fbe Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
7ff407c Merge branch 'oap-project:master' into master (minmingzhu, Sep 10, 2024)
e607f04 remove oneccl communicator (minmingzhu, Jul 25, 2024)
cff719a update (minmingzhu, Aug 15, 2024)
8b5d91d optimize code (minmingzhu, Aug 18, 2024)
6e1eb5b update optimize code (minmingzhu, Aug 19, 2024)
d0846f1 update (minmingzhu, Sep 10, 2024)
12df1cb format code style (minmingzhu, Sep 10, 2024)
ccfc4dc update (minmingzhu, Sep 10, 2024)
ea585f0 update (minmingzhu, Sep 10, 2024)
62c0cc5 format code style (minmingzhu, Sep 10, 2024)
154c2d7 update (minmingzhu, Sep 10, 2024)
2d59b86 update (minmingzhu, Sep 10, 2024)
9913f82 remove oneccl communicator (minmingzhu, Jul 25, 2024)
c97bf7a update (minmingzhu, Aug 15, 2024)
7c85bd9 set ZE_AFFINITY_MASK=rankId (minmingzhu, Aug 16, 2024)
43b9795 optimize code (minmingzhu, Aug 18, 2024)
6744a69 improve communicator code (minmingzhu, Aug 23, 2024)
41 changes: 21 additions & 20 deletions mllib-dal/src/main/native/CCLInitSingleton.hpp
@@ -1,18 +1,18 @@
/*******************************************************************************
* Copyright 2020 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

#pragma once
#include <iostream>
@@ -21,10 +21,10 @@
#include "Logger.h"

class CCLInitSingleton {
public:
ccl::shared_ptr_class<ccl::kvs> kvs;

-    static CCLInitSingleton& get(int size, int rank, ccl::string ccl_ip_port) {
+    static CCLInitSingleton &get(int size, int rank, ccl::string ccl_ip_port) {
static std::once_flag flag;
static CCLInitSingleton instance;
std::call_once(flag, [size, rank, ccl_ip_port] {
@@ -33,9 +33,8 @@ class CCLInitSingleton {
return instance;
}

-  private:
-    CCLInitSingleton() {
-    }
+  private:
+    CCLInitSingleton() {}

CCLInitSingleton(int size, int rank, ccl::string ccl_ip_port) {
auto t1 = std::chrono::high_resolution_clock::now();
@@ -49,7 +48,9 @@

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
-        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
+        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
+                                                                     t1)
+            .count();

logger::println(logger::INFO, "OneCCL singleton init took %f secs",
duration / 1000);
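A note on the pattern above: the function-local statics plus std::call_once ensure the oneCCL key-value store is created exactly once per process, even if several executor threads race into get(). A minimal, self-contained sketch of the same idiom, using only the standard library (class and member names here are hypothetical, not from this PR):

#include <iostream>
#include <mutex>

class InitOnce {
  public:
    static InitOnce &get(int size, int rank) {
        static std::once_flag flag;
        static InitOnce instance;
        // Only the first caller's arguments take effect; later calls
        // return the already-initialized instance, as in CCLInitSingleton.
        std::call_once(flag, [size, rank] { instance = InitOnce(size, rank); });
        return instance;
    }

    int size = 0;
    int rank = -1;

  private:
    InitOnce() {}
    InitOnce(int size, int rank) : size(size), rank(rank) {}
};

int main() {
    InitOnce &a = InitOnce::get(4, 0);
    InitOnce &b = InitOnce::get(8, 1); // no re-initialization happens here
    std::cout << a.rank << " " << b.rank << std::endl; // prints: 0 0
}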
1 change: 0 additions & 1 deletion mllib-dal/src/main/native/Common.hpp
@@ -21,5 +21,4 @@
#endif

#include "GPU.h"
#include "Communicator.hpp"
#include "oneapi/dal/table/homogen.hpp"
30 changes: 6 additions & 24 deletions mllib-dal/src/main/native/Communicator.hpp
@@ -21,47 +21,29 @@

#include "oneapi/ccl.hpp"
#include "oneapi/dal/detail/ccl/communicator.hpp"
#include "Singleton.hpp"

namespace de = oneapi::dal::detail;
namespace oneapi::dal::preview::spmd {

namespace backend {
struct ccl {};
} // namespace backend
-class ccl_info {
-    friend class de::singleton<ccl_info>;
-
-private:
-    ccl_info(int size, int rankId, ccl::shared_ptr_class<ccl::kvs> keyvs) {
-        rank = rankId;
-        rank_count = size;
-        kvs = keyvs;
-    }
-
-public:
-    ccl::shared_ptr_class<ccl::kvs> kvs;
-    int rank;
-    int rank_count;
-};

template <typename Backend>
communicator<device_memory_access::none> make_communicator(int size, int rank, const ccl::shared_ptr_class<ccl::kvs> kvs) {
-    auto& info = de::singleton<ccl_info>::get(size, rank, kvs);
    // integral cast
-    return oneapi::dal::detail::ccl_communicator<device_memory_access::none>{ info.kvs,
-                                                                              info.rank,
-                                                                              info.rank_count };
+    return oneapi::dal::detail::ccl_communicator<device_memory_access::none>{ kvs,
+                                                                              rank,
+                                                                              size };
}

template <typename Backend>
communicator<device_memory_access::usm> make_communicator(sycl::queue& queue, int size, int rank, const ccl::shared_ptr_class<ccl::kvs> kvs) {
-    auto& info = de::singleton<ccl_info>::get(size, rank, kvs);
    return oneapi::dal::detail::ccl_communicator<device_memory_access::usm>{
        queue,
-        info.kvs,
-        oneapi::dal::detail::integral_cast<std::int64_t>(info.rank),
-        oneapi::dal::detail::integral_cast<std::int64_t>(info.rank_count)
+        kvs,
+        oneapi::dal::detail::integral_cast<std::int64_t>(rank),
+        oneapi::dal::detail::integral_cast<std::int64_t>(size)
};
}

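With ccl_info gone, make_communicator is a plain forwarding wrapper: the caller already holds the kvs handle, rank, and size, so caching them in a process-wide singleton added state without adding information (and pinned whatever values the first caller happened to pass). A hedged usage sketch, assuming the oneCCL/oneDAL headers above and a live SYCL queue; this compiles only inside this project, and buildCommunicators is a hypothetical caller:

namespace spmd = oneapi::dal::preview::spmd;

void buildCommunicators(sycl::queue &queue, int size, int rank,
                        ccl::shared_ptr_class<ccl::kvs> kvs) {
    // Host-only communicator; collectives stay in host memory.
    auto host_comm =
        spmd::make_communicator<spmd::backend::ccl>(size, rank, kvs);
    // Device communicator; collectives may touch USM bound to `queue`.
    auto usm_comm =
        spmd::make_communicator<spmd::backend::ccl>(queue, size, rank, kvs);
}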
23 changes: 9 additions & 14 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
@@ -199,7 +199,8 @@ JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows,
jlong numCols, jint executorNum, jint executorCores,
-    jint computeDeviceOrdinal, jintArray gpuIdxArray, jobject resultObj) {
+    jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring ip_port,
+    jobject resultObj) {
logger::println(logger::INFO,
"oneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());
@@ -225,23 +226,17 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
-        int nGpu = env->GetArrayLength(gpuIdxArray);
-        logger::println(
-            logger::INFO,
-            "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
-            rank);
-
-        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+        logger::println(logger::INFO,
+                        "oneDAL (native): use GPU kernels with rankid %d",
+                        rank);

-        auto queue = getAssignedGPU(device, gpuIndices);
+        const char *str = env->GetStringUTFChars(ip_port, nullptr);
+        ccl::string ccl_ip_port(str);
+        auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);

-        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
-        auto comm =
-            preview::spmd::make_communicator<preview::spmd::backend::ccl>(
-                queue, executorNum, rank, kvs);
        doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm,
                                   resultObj);
-        env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
+        env->ReleaseStringUTFChars(ip_port, str);
break;
}
#endif
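Each converted entry point now does the same JNI string dance: borrow the UTF-8 bytes of ip_port, wrap them in a ccl::string, and release the buffer when done. A standalone sketch of that pattern (takeIpPort is a hypothetical helper, not in the PR; since ccl_ip_port is constructed by copying the bytes, releasing early as shown here is also safe):

#include <jni.h>
#include <string>

std::string takeIpPort(JNIEnv *env, jstring ip_port) {
    const char *str = env->GetStringUTFChars(ip_port, nullptr);
    if (str == nullptr) {
        return {}; // the JVM already has a pending OutOfMemoryError
    }
    std::string owned(str); // copy, so the JNI buffer can be released now
    env->ReleaseStringUTFChars(ip_port, str);
    return owned;
}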
23 changes: 8 additions & 15 deletions mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
@@ -306,36 +306,29 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif
jint treeCount, jint numFeaturesPerNode, jint minObservationsLeafNode,
jint minObservationsSplitNode, jdouble minWeightFractionLeafNode,
jdouble minImpurityDecreaseSplitNode, jint maxTreeDepth, jlong seed,
-    jint maxBins, jboolean bootstrap, jintArray gpuIdxArray,
+    jint maxBins, jboolean bootstrap, jintArray gpuIdxArray, jstring ip_port,
jobject resultObj) {
logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels");

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
-        int nGpu = env->GetArrayLength(gpuIdxArray);
-        logger::println(
-            logger::INFO,
-            "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
-            rank);
-
-        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
-
-        ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
+        logger::println(logger::INFO,
+                        "oneDAL (native): use GPU kernels with rankid %d",
+                        rank);

-        auto queue = getAssignedGPU(device, gpuIndices);
+        const char *str = env->GetStringUTFChars(ip_port, nullptr);
+        ccl::string ccl_ip_port(str);
+        auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);

-        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
-        auto comm =
-            preview::spmd::make_communicator<preview::spmd::backend::ccl>(
-                queue, executorNum, rank, kvs);
jobject hashmapObj = doRFClassifierOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount,
numFeaturesPerNode, minObservationsLeafNode,
minObservationsSplitNode, minWeightFractionLeafNode,
minImpurityDecreaseSplitNode, maxTreeDepth, seed, maxBins,
bootstrap, comm, resultObj);
+        env->ReleaseStringUTFChars(ip_port, str);
return hashmapObj;
}
default: {
21 changes: 8 additions & 13 deletions mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp
@@ -297,33 +297,28 @@ Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTra
jint executorNum, jint computeDeviceOrdinal, jint treeCount,
jint numFeaturesPerNode, jint minObservationsLeafNode, jint maxTreeDepth,
jlong seed, jint maxbins, jboolean bootstrap, jintArray gpuIdxArray,
-    jobject resultObj) {
+    jstring ip_port, jobject resultObj) {
logger::println(logger::INFO,
"OneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());

ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal);
switch (device) {
case ComputeDevice::gpu: {
-        int nGpu = env->GetArrayLength(gpuIdxArray);
-        logger::println(
-            logger::INFO,
-            "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
-            rank);
-
-        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+        logger::println(logger::INFO,
+                        "OneDAL (native): use GPU kernels with rankid %d",
+                        rank);

-        auto queue = getAssignedGPU(device, gpuIndices);
+        const char *str = env->GetStringUTFChars(ip_port, nullptr);
+        ccl::string ccl_ip_port(str);
+        auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);

-        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
-        auto comm =
-            preview::spmd::make_communicator<preview::spmd::backend::ccl>(
-                queue, executorNum, rank, kvs);
jobject hashmapObj = doRFRegressorOneAPICompute(
env, pNumTabFeature, featureRows, featureCols, pNumTabLabel,
labelCols, executorNum, computeDeviceOrdinal, treeCount,
numFeaturesPerNode, minObservationsLeafNode, maxTreeDepth, seed,
maxbins, bootstrap, comm, resultObj);
+        env->ReleaseStringUTFChars(ip_port, str);
return hashmapObj;
}
default: {
43 changes: 42 additions & 1 deletion mllib-dal/src/main/native/GPU.cpp
@@ -25,7 +25,6 @@ static std::vector<sycl::device> get_gpus() {
}

static int getLocalRank(ccl::communicator &comm, int size, int rank) {
-    const int MPI_MAX_PROCESSOR_NAME = 128;
/* Obtain local rank among nodes sharing the same host name */
char zero = static_cast<char>(0);
std::vector<char> name(MPI_MAX_PROCESSOR_NAME + 1, zero);
@@ -113,3 +112,45 @@ sycl::queue getQueue(const ComputeDevice device) {
}
}
}

+preview::spmd::communicator<preview::spmd::device_memory_access::usm>
+createDalCommunicator(const jint executorNum, const jint rank,
+                      const ccl::string ccl_ip_port) {
+    auto gpus = get_gpus();
+
+    auto t1 = std::chrono::high_resolution_clock::now();
+
+    ccl::init();
+
+    auto t2 = std::chrono::high_resolution_clock::now();
+    auto duration =
+        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
+            .count();
+
+    logger::println(logger::INFO, "OneCCL singleton init took %f secs",
+                    duration / 1000);
+
+    t1 = std::chrono::high_resolution_clock::now();
+
+    auto kvs_attr = ccl::create_kvs_attr();
+
+    kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
+
+    ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);
+
+    t2 = std::chrono::high_resolution_clock::now();
+    duration =
+        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
+            .count();
+    logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
+                    duration / 1000);
+    sycl::queue queue{gpus[0]};
+    t1 = std::chrono::high_resolution_clock::now();
+    auto comm = preview::spmd::make_communicator<preview::spmd::backend::ccl>(
+        queue, executorNum, rank, kvs);
+    t2 = std::chrono::high_resolution_clock::now();
+    duration =
+        (float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
+            .count();
+    return comm;
+}
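createDalCommunicator repeats the same stopwatch boilerplate three times, and the final duration is computed but never logged before return comm;. A small helper along these lines (a sketch suggested here, not part of this PR) would shrink each measurement to one line:

#include <chrono>
#include <utility>

// Run f() and return the elapsed wall-clock time in seconds.
template <typename F> float timedSecs(F &&f) {
    auto t1 = std::chrono::high_resolution_clock::now();
    std::forward<F>(f)();
    auto t2 = std::chrono::high_resolution_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
               .count() /
           1000.0f;
}

// Usage sketch:
//   float secs = timedSecs([] { ccl::init(); });
//   logger::println(logger::INFO, "OneCCL singleton init took %f secs", secs);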
5 changes: 4 additions & 1 deletion mllib-dal/src/main/native/GPU.h
@@ -3,10 +3,13 @@
#include "service.h"
#include <CL/cl.h>
#include <CL/sycl.hpp>
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>

#include "Communicator.hpp"

sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);
+preview::spmd::communicator<preview::spmd::device_memory_access::usm>
+createDalCommunicator(jint executorNum, jint rank, ccl::string ccl_ip_port);
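One commit in this PR (7c85bd9, "set ZE_AFFINITY_MASK=rankId") binds each rank to its own Level Zero device, which may be why createDalCommunicator can simply take gpus[0]; that hunk is not shown on this page. A sketch of the idea (bindRankToGpu is a hypothetical helper; assumes a POSIX environment):

#include <cstdlib>
#include <string>

// Restrict this process to the GPU matching its rank. Must run before the
// first SYCL/Level Zero device query in the process, or it has no effect.
void bindRankToGpu(int rankId) {
    setenv("ZE_AFFINITY_MASK", std::to_string(rankId).c_str(), /*overwrite=*/1);
}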
23 changes: 9 additions & 14 deletions mllib-dal/src/main/native/KMeansImpl.cpp
@@ -308,7 +308,8 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows,
jlong numCols, jlong pNumTabCenters, jint clusterNum, jdouble tolerance,
jint iterationNum, jint executorNum, jint executorCores,
-    jint computeDeviceOrdinal, jintArray gpuIdxArray, jobject resultObj) {
+    jint computeDeviceOrdinal, jintArray gpuIdxArray, jstring ip_port,
+    jobject resultObj) {
logger::println(logger::INFO,
"OneDAL (native): use DPC++ kernels; device %s",
ComputeDeviceString[computeDeviceOrdinal].c_str());
@@ -338,25 +339,19 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
-        int nGpu = env->GetArrayLength(gpuIdxArray);
-        logger::println(
-            logger::INFO,
-            "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
-            rank);
-
-        jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
+        logger::println(logger::INFO,
+                        "OneDAL (native): use GPU kernels with rankid %d",
+                        rank);

-        auto queue = getAssignedGPU(device, gpuIndices);
+        const char *str = env->GetStringUTFChars(ip_port, nullptr);
+        ccl::string ccl_ip_port(str);
+        auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);

-        ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
-        auto comm =
-            preview::spmd::make_communicator<preview::spmd::backend::ccl>(
-                queue, executorNum, rank, kvs);
        ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols,
                                    pNumTabCenters, clusterNum, tolerance,
                                    iterationNum, comm, resultObj);

-        env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
+        env->ReleaseStringUTFChars(ip_port, str);
break;
}
#endif