From a1a62a66dcf80e594b24b0fc2fc69343af3b979b Mon Sep 17 00:00:00 2001 From: minmingzhu <45281494+minmingzhu@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:18:24 +0800 Subject: [PATCH] [ML-380] Optimized code and reduced spark job on running GPU. (#384) * update spark to 3.3.3 Signed-off-by: minmingzhu * remove oneccl communicator * update * optimize code * update optimize code * update * format code style * update * update * format code style * update * update * update * update * update * update * update * update * update * update * update * update * update * update --------- Signed-off-by: minmingzhu --- mllib-dal/src/main/native/Common.hpp | 1 - mllib-dal/src/main/native/CorrelationImpl.cpp | 18 ++---- .../native/DecisionForestClassifierImpl.cpp | 19 ++---- .../native/DecisionForestRegressorImpl.cpp | 17 ++--- mllib-dal/src/main/native/GPU.cpp | 2 +- mllib-dal/src/main/native/GPU.h | 3 +- mllib-dal/src/main/native/KMeansImpl.cpp | 19 ++---- .../src/main/native/LinearRegressionImpl.cpp | 30 ++++----- mllib-dal/src/main/native/OneCCL.cpp | 64 ++++++++++++++++--- mllib-dal/src/main/native/OneCCL.h | 12 +++- mllib-dal/src/main/native/PCAImpl.cpp | 18 ++---- mllib-dal/src/main/native/SummarizerImpl.cpp | 19 ++---- .../javah/com_intel_oap_mllib_OneCCL__.h | 2 +- mllib-dal/src/main/native/service.h | 16 ++--- .../scala/com/intel/oap/mllib/CommonJob.scala | 39 +++++++++++ .../RandomForestClassifierDALImpl.scala | 18 +----- .../oap/mllib/clustering/KMeansDALImpl.scala | 19 +----- .../intel/oap/mllib/feature/PCADALImpl.scala | 18 +----- .../regression/LinearRegressionDALImpl.scala | 15 +---- .../RandomForestRegressorDALImpl.scala | 18 +----- .../oap/mllib/stat/CorrelationDALImpl.scala | 18 +----- .../oap/mllib/stat/SummarizerDALImpl.scala | 18 +----- .../com/intel/oap/mllib/TestCommon.scala | 2 + 23 files changed, 170 insertions(+), 235 deletions(-) create mode 100644 mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala diff --git a/mllib-dal/src/main/native/Common.hpp b/mllib-dal/src/main/native/Common.hpp index 5ead8c8c1..f6606f225 100644 --- a/mllib-dal/src/main/native/Common.hpp +++ b/mllib-dal/src/main/native/Common.hpp @@ -21,5 +21,4 @@ #endif #include "GPU.h" -#include "Communicator.hpp" #include "oneapi/dal/table/homogen.hpp" diff --git a/mllib-dal/src/main/native/CorrelationImpl.cpp b/mllib-dal/src/main/native/CorrelationImpl.cpp index 5f4b47cfd..d868856e3 100644 --- a/mllib-dal/src/main/native/CorrelationImpl.cpp +++ b/mllib-dal/src/main/native/CorrelationImpl.cpp @@ -225,23 +225,13 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - auto queue = getAssignedGPU(device, gpuIndices); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + auto comm = getDalComm(); doCorrelationOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } #endif diff --git a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp index f383eb7b9..83ce18126 100644 --- a/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp @@ -313,22 +313,11 @@ Java_com_intel_oap_mllib_classification_RandomForestClassifierDALImpl_cRFClassif ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); - - auto queue = getAssignedGPU(device, gpuIndices); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + auto comm = getDalComm(); jobject hashmapObj = doRFClassifierOneAPICompute( env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, labelCols, executorNum, computeDeviceOrdinal, classCount, treeCount, diff --git a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp index ca1fb2aaa..ddd099e41 100644 --- a/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp +++ b/mllib-dal/src/main/native/DecisionForestRegressorImpl.cpp @@ -305,20 +305,11 @@ Java_com_intel_oap_mllib_regression_RandomForestRegressorDALImpl_cRFRegressorTra ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); switch (device) { case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - auto queue = getAssignedGPU(device, gpuIndices); + logger::println(logger::INFO, + "OneDAL (native): use GPU kernels with rankid %d", + rank); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + auto comm = getDalComm(); jobject hashmapObj = doRFRegressorOneAPICompute( env, pNumTabFeature, featureRows, featureCols, pNumTabLabel, labelCols, executorNum, computeDeviceOrdinal, treeCount, diff --git a/mllib-dal/src/main/native/GPU.cpp b/mllib-dal/src/main/native/GPU.cpp index 9dbba24f4..0c79a10ed 100644 --- a/mllib-dal/src/main/native/GPU.cpp +++ b/mllib-dal/src/main/native/GPU.cpp @@ -10,7 +10,7 @@ typedef std::shared_ptr queuePtr; static std::mutex g_mtx; static std::vector g_queueVector; -static std::vector get_gpus() { +std::vector get_gpus() { auto platforms = sycl::platform::get_platforms(); for (auto p : platforms) { auto devices = p.get_devices(sycl::info::device_type::gpu); diff --git a/mllib-dal/src/main/native/GPU.h b/mllib-dal/src/main/native/GPU.h index f8d7c25a9..9023cd1f2 100644 --- a/mllib-dal/src/main/native/GPU.h +++ b/mllib-dal/src/main/native/GPU.h @@ -3,10 +3,11 @@ #include "service.h" #include #include -#include #include #include sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices); sycl::queue getQueue(const ComputeDevice device); + +std::vector get_gpus(); diff --git a/mllib-dal/src/main/native/KMeansImpl.cpp b/mllib-dal/src/main/native/KMeansImpl.cpp index 9a3ce8550..722508185 100644 --- a/mllib-dal/src/main/native/KMeansImpl.cpp +++ b/mllib-dal/src/main/native/KMeansImpl.cpp @@ -338,25 +338,14 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - auto queue = getAssignedGPU(device, gpuIndices); - - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + logger::println(logger::INFO, + "OneDAL (native): use GPU kernels with rankid %d", + rank); + auto comm = getDalComm(); ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols, pNumTabCenters, clusterNum, tolerance, iterationNum, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } #endif diff --git a/mllib-dal/src/main/native/LinearRegressionImpl.cpp b/mllib-dal/src/main/native/LinearRegressionImpl.cpp index 21ab41546..3ed064317 100644 --- a/mllib-dal/src/main/native/LinearRegressionImpl.cpp +++ b/mllib-dal/src/main/native/LinearRegressionImpl.cpp @@ -214,19 +214,17 @@ ridge_regression_compute(size_t rankId, ccl::communicator &comm, } #ifdef CPU_GPU_PROFILE -static jlong doLROneAPICompute(JNIEnv *env, size_t rankId, sycl::queue &queue, - jlong pNumTabFeature, jlong featureRows, - jlong featureCols, jlong pNumTabLabel, - jlong labelCols, jboolean jfitIntercept, - jint executorNum, jobject resultObj) { +static jlong doLROneAPICompute( + JNIEnv *env, size_t rankId, + preview::spmd::communicator comm, + jlong pNumTabFeature, jlong featureRows, jlong featureCols, + jlong pNumTabLabel, jlong labelCols, jboolean jfitIntercept, + jint executorNum, jobject resultObj) { logger::println(logger::INFO, "oneDAL (native): GPU compute start , rankid %d", rankId); const bool isRoot = (rankId == ccl_root); bool fitIntercept = bool(jfitIntercept); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = preview::spmd::make_communicator( - queue, executorNum, rankId, kvs); homogen_table xtrain = *reinterpret_cast( createHomogenTableWithArrayPtr(pNumTabFeature, featureRows, featureCols, comm.get_queue()) @@ -279,19 +277,13 @@ Java_com_intel_oap_mllib_regression_LinearRegressionDALImpl_cLinearRegressionTra jlong resultptr = 0L; if (useGPU) { #ifdef CPU_GPU_PROFILE - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - auto queue = getAssignedGPU(device, gpuIndices); - - resultptr = doLROneAPICompute(env, rank, queue, feature, featureRows, + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); + auto comm = getDalComm(); + resultptr = doLROneAPICompute(env, rank, comm, feature, featureRows, featureCols, label, labelCols, fitIntercept, executorNum, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); #endif } else { ccl::communicator &cclComm = getComm(); diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index d539aac76..587556d63 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -29,6 +29,7 @@ #include #include "CCLInitSingleton.hpp" +#include "GPU.h" #include "Logger.h" #include "OneCCL.h" #include "com_intel_oap_mllib_OneCCL__.h" @@ -43,9 +44,18 @@ static size_t rank_id = 0; static std::vector g_comms; static std::vector> g_kvs; -ccl::communicator &getComm() { return g_comms[0]; } ccl::shared_ptr_class &getKvs() { return g_kvs[0]; } - +ccl::communicator &getComm() { return g_comms[0]; } +#ifdef CPU_GPU_PROFILE +static std::vector> + g_dal_comms; +oneapi::dal::preview::spmd::communicator< + oneapi::dal::preview::spmd::device_memory_access::usm> & +getDalComm() { + return g_dal_comms[0]; +} +#endif JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) { @@ -55,26 +65,57 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( auto t1 = std::chrono::high_resolution_clock::now(); ccl::init(); - + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, "OneCCL (native): init took %f secs", + duration / 1000); const char *str = env->GetStringUTFChars(ip_port, 0); ccl::string ccl_ip_port(str); +#ifdef CPU_ONLY_PROFILE auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port); g_kvs.push_back(singletonCCLInit.kvs); - -#ifdef CPU_ONLY_PROFILE g_comms.push_back( ccl::create_communicator(size, rank, singletonCCLInit.kvs)); - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = + rank_id = getComm().rank(); + comm_size = getComm().size(); + +#endif + +#ifdef CPU_GPU_PROFILE + t1 = std::chrono::high_resolution_clock::now(); + auto kvs_attr = ccl::create_kvs_attr(); + + kvs_attr.set(ccl_ip_port); + + ccl::shared_ptr_class kvs = ccl::create_main_kvs(kvs_attr); + + t2 = std::chrono::high_resolution_clock::now(); + duration = (float)std::chrono::duration_cast(t2 - t1) .count(); - logger::println(logger::INFO, "OneCCL (native): init took %f secs", + logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs", duration / 1000); + auto gpus = get_gpus(); + sycl::queue queue{gpus[0]}; + t1 = std::chrono::high_resolution_clock::now(); + auto comm = oneapi::dal::preview::spmd::make_communicator< + oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank, kvs); + t2 = std::chrono::high_resolution_clock::now(); + duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, + "OneCCL (native): create communicator took %f secs", + duration / 1000); + g_dal_comms.push_back(comm); + rank_id = getDalComm().get_rank(); + comm_size = getDalComm().get_rank_count(); #endif - jclass cls = env->GetObjectClass(param); jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J"); @@ -89,8 +130,13 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) { logger::printerrln(logger::INFO, "OneCCL (native): cleanup"); +#ifdef CPU_ONLY_PROFILE g_kvs.pop_back(); g_comms.pop_back(); +#endif +#ifdef CPU_GPU_PROFILE + g_dal_comms.pop_back(); +#endif } JNIEXPORT jboolean JNICALL diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h index 7fda0c639..5770d4db2 100644 --- a/mllib-dal/src/main/native/OneCCL.h +++ b/mllib-dal/src/main/native/OneCCL.h @@ -17,8 +17,8 @@ #pragma once #include - #include + using namespace std; namespace ccl { @@ -44,4 +44,14 @@ event CCL_API gather(const BufferType *sendbuf, int sendcount, ccl::communicator &getComm(); ccl::shared_ptr_class &getKvs(); + +#ifdef CPU_GPU_PROFILE +#ifndef ONEDAL_DATA_PARALLEL +#define ONEDAL_DATA_PARALLEL +#endif +#include "Communicator.hpp" +oneapi::dal::preview::spmd::communicator< + oneapi::dal::preview::spmd::device_memory_access::usm> & +getDalComm(); +#endif extern const size_t ccl_root; diff --git a/mllib-dal/src/main/native/PCAImpl.cpp b/mllib-dal/src/main/native/PCAImpl.cpp index 743d07fbf..58bade63c 100644 --- a/mllib-dal/src/main/native/PCAImpl.cpp +++ b/mllib-dal/src/main/native/PCAImpl.cpp @@ -277,22 +277,12 @@ Java_com_intel_oap_mllib_feature_PCADALImpl_cPCATrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - - auto queue = getAssignedGPU(device, gpuIndices); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + auto comm = getDalComm(); doPCAOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } #endif diff --git a/mllib-dal/src/main/native/SummarizerImpl.cpp b/mllib-dal/src/main/native/SummarizerImpl.cpp index 37ae93256..061eb587a 100644 --- a/mllib-dal/src/main/native/SummarizerImpl.cpp +++ b/mllib-dal/src/main/native/SummarizerImpl.cpp @@ -295,22 +295,13 @@ Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); - logger::println( - logger::INFO, - "oneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); - - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); - auto queue = getAssignedGPU(device, gpuIndices); - - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); + logger::println(logger::INFO, + "oneDAL (native): use GPU kernels with rankid %d", + rank); + + auto comm = getDalComm(); doSummarizerOneAPICompute(env, pNumTabData, numRows, numCols, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); break; } #endif diff --git a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h index 4bfa1d0c3..a89b7d214 100644 --- a/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h +++ b/mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneCCL__.h @@ -45,7 +45,7 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort * Signature: (IILjava/lang/String;Lcom/intel/oap/mllib/CCLParam;)I */ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init - (JNIEnv *, jobject, jint, jint, jstring, jstring, jobject); + (JNIEnv *, jobject, jint, jint, jstring, jobject); /* * Class: com_intel_oap_mllib_OneCCL__ diff --git a/mllib-dal/src/main/native/service.h b/mllib-dal/src/main/native/service.h index f208b8389..ed3478068 100644 --- a/mllib-dal/src/main/native/service.h +++ b/mllib-dal/src/main/native/service.h @@ -54,14 +54,9 @@ using namespace daal::data_management; #endif #include "oneapi/dal/table/homogen.hpp" -using namespace oneapi::dal; -using namespace oneapi::dal::detail; - +typedef std::vector ByteBuffer; typedef float GpuAlgorithmFPType; /* Algorithm floating-point type */ typedef double CpuAlgorithmFPType; /* Algorithm floating-point type */ -typedef std::vector ByteBuffer; -typedef std::shared_ptr CSRTablePtr; - enum class ComputeDevice { host, cpu, gpu, uninitialized }; const std::string ComputeDeviceString[] = {"HOST", "CPU", "GPU"}; @@ -70,20 +65,23 @@ void printNumericTable(const NumericTablePtr &dataTable, size_t nPrintedCols = 0, size_t interval = 10); size_t serializeDAALObject(SerializationIface *pData, ByteBuffer &buffer); SerializationIfacePtr deserializeDAALObject(daal::byte *buff, size_t length); -CSRNumericTable *createFloatSparseTable(const std::string &datasetFileName); ComputeDevice getComputeDeviceByOrdinal(size_t computeDeviceOrdinal); -void saveCSRTablePtrToVector(const CSRTablePtr &ptr); #ifdef CPU_GPU_PROFILE -#include "oneapi/dal/table/common.hpp" #include "oneapi/dal/table/row_accessor.hpp" +using namespace oneapi::dal; +using namespace oneapi::dal::detail; typedef std::shared_ptr HomogenTablePtr; +typedef std::shared_ptr CSRTablePtr; void saveHomogenTablePtrToVector(const HomogenTablePtr &ptr); HomogenTablePtr createHomogenTableWithArrayPtr(size_t pNumTabData, size_t numRows, size_t numClos, sycl::queue queue); +CSRNumericTable *createFloatSparseTable(const std::string &datasetFileName); +void saveCSRTablePtrToVector(const CSRTablePtr &ptr); + NumericTablePtr homegenToSyclHomogen(NumericTablePtr ntHomogen); inline void printHomegenTable(const oneapi::dal::table &table) { auto arr = oneapi::dal::row_accessor(table).pull(); diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala new file mode 100644 index 000000000..bd2a7be18 --- /dev/null +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.mllib + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD + +object CommonJob { + def initCCLAndSetAffinityMask(data: RDD[_], + executorNum: Int, + kvsIPPort: String, + useDevice: String): Unit = { + data.mapPartitionsWithIndex { (rank, table) => + OneCCL.init(executorNum, rank, kvsIPPort) + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + } +} diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index da0612b0e..c148b4715 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -16,7 +16,7 @@ package com.intel.oap.mllib.classification import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.annotation.Since import org.apache.spark.TaskContext @@ -75,21 +75,7 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfcTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 284be303a..1a5eb082e 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.clustering import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -51,21 +51,8 @@ class KMeansDALImpl(var nClusters: Int, kmeansTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + + CommonJob.initCCLAndSetAffinityMask(coalescedTables, executorNum, kvsIPPort, useDevice) kmeansTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 68e41d51b..26b418ae6 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.feature import java.nio.DoubleBuffer import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable} import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Service, Utils} import org.apache.spark.TaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging @@ -59,21 +59,7 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(coalescedTables, executorNum, kvsIPPort, useDevice) pcaTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index 4717c9bbe..d9b89f161 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.SparkException import org.apache.spark.TaskContext @@ -106,16 +106,8 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(labeledPointsTables, executorNum, kvsIPPort, useDevice) + lrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => val (feature, label) = tables.next() @@ -132,7 +124,6 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, (label.toString.toLong, 0L, 0L) } - OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 850a65c32..b602bd703 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult} -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -69,21 +69,7 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 4e2595f02..be58fda2b 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -46,23 +46,9 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(coalescedTables, executorNum, kvsIPPort, useDevice) corTimer.record("OneCCL Init") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index bcf0f951a..8a1f58273 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -16,7 +16,7 @@ package com.intel.oap.mllib.stat -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector @@ -47,23 +47,9 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.initCCLAndSetAffinityMask(coalescedTables, executorNum, kvsIPPort, useDevice) sumTimer.record("OneCCL Init") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setAffinityMask(gpuIndices(0).toString()) - Iterator.empty - }.count() - val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next() diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala index a56170cee..264abeca3 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala @@ -100,12 +100,14 @@ object TestCommon { val device = System.getProperty("computeDevice") var computeDevice: Common.ComputeDevice = Common.ComputeDevice.HOST if(device != null) { + // scalastyle:off caselocale device.toUpperCase match { case "HOST" => computeDevice = Common.ComputeDevice.HOST case "CPU" => computeDevice = Common.ComputeDevice.CPU case "GPU" => computeDevice = Common.ComputeDevice.GPU case _ => "Invalid Device" } + // scalastyle:on caselocale } System.out.println("getDevice : " + computeDevice) computeDevice