diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index d539aac76..2e7417d74 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -47,7 +47,7 @@ ccl::communicator &getComm() { return g_comms[0]; } ccl::shared_ptr_class &getKvs() { return g_kvs[0]; } JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( - JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, + JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jint computeDeviceOrdinal, jobject param) { logger::println(logger::INFO, "OneCCL (native): init"); @@ -58,29 +58,34 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init( const char *str = env->GetStringUTFChars(ip_port, 0); ccl::string ccl_ip_port(str); + const char *device = env->GetStringUTFChars(use_device, 0); + ccl::string ccl_ip_port(str); auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port); g_kvs.push_back(singletonCCLInit.kvs); - -#ifdef CPU_ONLY_PROFILE - g_comms.push_back( - ccl::create_communicator(size, rank, singletonCCLInit.kvs)); - - auto t2 = std::chrono::high_resolution_clock::now(); - auto duration = - (float)std::chrono::duration_cast(t2 - t1) - .count(); - logger::println(logger::INFO, "OneCCL (native): init took %f secs", - duration / 1000); -#endif + ComputeDevice device = getComputeDeviceByOrdinal(computeDeviceOrdinal); + switch (device) { + case ComputeDevice::host: + case ComputeDevice::cpu: { + g_comms.push_back( + ccl::create_communicator(size, rank, singletonCCLInit.kvs)); + + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, "OneCCL (native): init took %f secs", + duration / 1000); + break; + } jclass cls = env->GetObjectClass(param); jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J"); jfieldID fid_rank_id = env->GetFieldID(cls, "rankId", "J"); - env->SetLongField(param, fid_comm_size, comm_size); - env->SetLongField(param, fid_rank_id, rank_id); + env->SetLongField(param, size, comm_size); + env->SetLongField(param, rank, rank_id); env->ReleaseStringUTFChars(ip_port, str); return 1; diff --git a/mllib-dal/src/main/native/SummarizerImpl.cpp b/mllib-dal/src/main/native/SummarizerImpl.cpp index 37ae93256..bea17dadd 100644 --- a/mllib-dal/src/main/native/SummarizerImpl.cpp +++ b/mllib-dal/src/main/native/SummarizerImpl.cpp @@ -268,9 +268,15 @@ static void doSummarizerOneAPICompute( JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_stat_SummarizerDALImpl_cSummarizerTrainDAL( +<<<<<<< HEAD JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, jlong numCols, jint executorNum, jint executorCores, jint computeDeviceOrdinal, jintArray gpuIdxArray, jobject resultObj) { +======= + JNIEnv *env, jobject obj, jint rank, jlong pNumTabData, jlong numRows, jlong numCols, + jint executorNum, jint executorCores, jint computeDeviceOrdinal, + jintArray gpuIdxArray, jobject resultObj) { +>>>>>>> remove oneccl communicator logger::println(logger::INFO, "oneDAL (native): use DPC++ kernels; device %s", ComputeDeviceString[computeDeviceOrdinal].c_str()); diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala index 48caebe1b..70ddef079 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala @@ -24,14 +24,14 @@ object OneCCL extends Logging { var cclParam = new CCLParam() - def init(executor_num: Int, rank: Int, ip_port: String): Unit = { + def init(executor_num: Int, rank: Int, ip_port: String, computeDeviceOrdinal: Int): Unit = { setExecutorEnv() logInfo(s"Initializing with IP_PORT: ${ip_port}") // cclParam is output from native code - c_init(executor_num, rank, ip_port, cclParam) + c_init(executor_num, rank, ip_port, computeDeviceOrdinal, cclParam) // executor number should equal to oneCCL world size assert(executor_num == cclParam.getCommSize, @@ -67,7 +67,8 @@ object OneCCL extends Logging { @native def c_getAvailPort(localIP: String): Int - @native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam): Int + @native private def c_init(size: Int, rank: Int, ip_port: String, + computeDeviceOrdinal: Int, param: CCLParam): Int @native private def c_cleanup(): Unit } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index 6aca49f14..d0cfa42e4 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -76,7 +76,7 @@ class RandomForestClassifierDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() rfcTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 64b2f6c7f..d8752fcd3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -53,7 +53,7 @@ class KMeansDALImpl(var nClusters: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() kmeansTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 8eb9554a1..b9df1f6c3 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -60,7 +60,7 @@ class PCADALImpl(val k: Int, pcaTimer.record("Data Convertion") coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() pcaTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index f95bc0846..a0ed680d6 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -121,7 +121,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, (label.toString.toLong, 0L, 0L) } - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 018473a61..11e924cd6 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -70,7 +70,7 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() rfrTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index e521aefe7..73a172bb0 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -47,7 +47,7 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() corTimer.record("OneCCL Init") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 277039ab1..3828674f2 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -48,7 +48,7 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) + OneCCL.init(executorNum, rank, kvsIPPort, computeDevice.ordinal()) Iterator.empty }.count() sumTimer.record("OneCCL Init")