From a5913a7b23c305a78de31a0afc89c1f97311827b Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Thu, 15 Aug 2024 15:03:38 +0800 Subject: [PATCH] update --- mllib-dal/src/main/native/OneCCL.cpp | 29 +++++++++---------- .../scala/com/intel/oap/mllib/OneCCL.scala | 7 ++--- .../RandomForestClassifierDALImpl.scala | 2 +- .../oap/mllib/clustering/KMeansDALImpl.scala | 2 +- .../intel/oap/mllib/feature/PCADALImpl.scala | 2 +- .../regression/LinearRegressionDALImpl.scala | 2 +- .../RandomForestRegressorDALImpl.scala | 2 +- .../oap/mllib/stat/CorrelationDALImpl.scala | 2 +- .../oap/mllib/stat/SummarizerDALImpl.scala | 2 +- 9 files changed, 23 insertions(+), 27 deletions(-) diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 2e7417d74..0443c3a05 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -47,7 +47,7 @@ ccl::communicator &getComm() { return g_comms[0]; } ccl::shared_ptr_class &getKvs() { return g_kvs[0]; } JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( - JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jint computeDeviceOrdinal, + JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) { logger::println(logger::INFO, "OneCCL (native): init"); @@ -64,21 +64,18 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port); g_kvs.push_back(singletonCCLInit.kvs); - ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); - switch (device) { - case ComputeDevice::host: - case ComputeDevice::cpu: { - g_comms.push_back( - ccl::create_communicator(size, rank, singletonCCLInit.kvs)); - - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - (float)std::chrono::duration_cast(t2 - t1) - .count(); - logger::println(logger::INFO, "OneCCL (native): init took %f secs", - duration / 1000); - break; - } + +#ifdef CPU_ONLY_PROFILE + g_comms.push_back( + ccl::create_communicator(size, rank, singletonCCLInit.kvs)); + + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, "OneCCL (native): init took %f secs", + duration / 1000); +#endif jclass cls = env->GetObjectClass(param); jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala index 70ddef079..48caebe1b 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala @@ -24,14 +24,14 @@ object OneCCL extends Logging { var cclParam = new CCLParam() - def init(executor_num: Int, rank: Int, ip_port: String, computeDeviceOrdinal: Int): Unit = { + def init(executor_num: Int, rank: Int, ip_port: String): Unit = { setExecutorEnv() logInfo(s"Initializing with IP_PORT: ${ip_port}") // cclParam is output from native code - c_init(executor_num, rank, ip_port, computeDeviceOrdinal, cclParam) + c_init(executor_num, rank, ip_port, cclParam) // executor number should equal to oneCCL world size assert(executor_num == cclParam.getCommSize, @@ -67,8 +67,7 @@ object OneCCL extends Logging { @native def c_getAvailPort(localIP: String): Int - @native private def c_init(size: Int, rank: Int, ip_port: String, - computeDeviceOrdinal: Int, param: CCLParam): Int + @native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam): Int @native private def c_cleanup(): Unit } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index d0cfa42e4..6aca49f14 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -76,7 +76,7 @@ class RandomForestClassifierDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() rfcTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index d8752fcd3..64b2f6c7f 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -53,7 +53,7 @@ class KMeansDALImpl(var nClusters: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() kmeansTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index b9df1f6c3..8eb9554a1 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -60,7 +60,7 @@ class PCADALImpl(val k: Int, pcaTimer.record("Data Convertion") coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() pcaTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index a0ed680d6..f95bc0846 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -121,7 +121,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, (label.toString.toLong, 0L, 0L) } - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 11e924cd6..018473a61 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -70,7 +70,7 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() rfrTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 73a172bb0..e521aefe7 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -47,7 +47,7 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() corTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 3828674f2..277039ab1 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -48,7 +48,7 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) + OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty }.count() sumTimer.record("OneCCL Init")