Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
minmingzhu committed Sep 10, 2024
1 parent 2404762 commit de61d25
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 129 deletions.
46 changes: 46 additions & 0 deletions mllib-dal/src/main/native/GPU.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,49 @@ sycl::queue getQueue(const ComputeDevice device) {
}
}
}


preview::spmd::communicator<preview::spmd::device_memory_access::usm> createDalCommunicator(const jint executorNum, const jint rank, const ccl::string ccl_ip_port){
auto gpus = get_gpus();

auto t1 = std::chrono::high_resolution_clock::now();

ccl::init();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();

logger::println(logger::INFO, "OneCCL singleton init took %f secs",
duration / 1000);
logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, OneCCL singleton init took %f secs.", rank, duration / 1000 );


t1 = std::chrono::high_resolution_clock::now();

auto kvs_attr = ccl::create_kvs_attr();

kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);

ccl::shared_ptr_class<ccl::kvs> kvs = ccl::create_main_kvs(kvs_attr);

t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs",
duration / 1000);
logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, OneCCL create communicator took %f secs.", rank, duration / 1000 );
sycl::queue queue{gpus[0]};
t1 = std::chrono::high_resolution_clock::now();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
t2 = std::chrono::high_resolution_clock::now();
duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, create communicator took %f secs.", rank, duration / 1000 );
return comm;
}

2 changes: 2 additions & 0 deletions mllib-dal/src/main/native/GPU.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <daal_sycl.h>
#include <jni.h>
#include <oneapi/ccl.hpp>
#include "Communicator.hpp"

sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices);

sycl::queue getQueue(const ComputeDevice device);
preview::spmd::communicator<preview::spmd::device_memory_access::usm> createDalCommunicator(jint executorNum, jint rank, ccl::string ccl_ip_port);
16 changes: 5 additions & 11 deletions mllib-dal/src/main/native/KMeansImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,25 +338,19 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe
}
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
int nGpu = env->GetArrayLength(gpuIdxArray);
logger::println(
logger::INFO,
"OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu,
rank);
"OneDAL (native): use GPU kernels with rankid %d", rank);

jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0);
const char *str = env->GetStringUTFChars(ip_port, nullptr);
ccl::string ccl_ip_port(str);
auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port);

auto queue = getAssignedGPU(device, gpuIndices);

ccl::shared_ptr_class<ccl::kvs> &kvs = getKvs();
auto comm =
preview::spmd::make_communicator<preview::spmd::backend::ccl>(
queue, executorNum, rank, kvs);
ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols,
pNumTabCenters, clusterNum, tolerance,
iterationNum, comm, resultObj);

env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0);
env->ReleaseStringUTFChars(ip_port, str);
break;
}
#endif
Expand Down
46 changes: 46 additions & 0 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.mllib

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

object CommonJob {

def setAffinityMask(data: RDD[_], useDevice: String): Unit = {
data.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()
}

def createCCLInit(data: RDD[_], executorNum: Int, kvsIPPort: String, useDevice: String): Unit = {
if (useDevice == "CPU") {
data.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.intel.oap.mllib.classification

import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.annotation.Since
import org.apache.spark.TaskContext
Expand Down Expand Up @@ -75,21 +75,8 @@ class RandomForestClassifierDALImpl(val uid: String,
rfcTimer.record("Data Convertion")
val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
CommonJob.setAffinityMask(labeledPointsTables, useDevice)
CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice)
rfcTimer.record("OneCCL Init")

val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.intel.oap.mllib.clustering

import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -51,21 +51,9 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Data Convertion")

val kvsIPPort = getOneCCLIPPort(coalescedTables)
coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()

CommonJob.setAffinityMask(coalescedTables, useDevice)
CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice)
kmeansTimer.record("OneCCL Init")

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
Expand Down Expand Up @@ -118,7 +106,9 @@ class KMeansDALImpl(var nClusters: Int,
} else {
Iterator.empty
}
OneCCL.cleanup()
if (useDevice == "CPU") {
OneCCL.cleanup()
}
ret
}.collect()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.intel.oap.mllib.feature
import java.nio.DoubleBuffer
import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable}
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Service, Utils}
import org.apache.spark.TaskContext
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -59,21 +59,8 @@ class PCADALImpl(val k: Int,
val kvsIPPort = getOneCCLIPPort(coalescedTables)
pcaTimer.record("Data Convertion")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
CommonJob.setAffinityMask(coalescedTables, useDevice)
CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice)
pcaTimer.record("OneCCL Init")

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.intel.oap.mllib.regression

import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.SparkException
import org.apache.spark.TaskContext
Expand Down Expand Up @@ -106,16 +106,9 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
}
lrTimer.record("Data Convertion")

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()
CommonJob.setAffinityMask(labeledPointsTables, useDevice)
CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice)
lrTimer.record("OneCCL Init")

val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
val (feature, label) = tables.next()
Expand All @@ -132,7 +125,6 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
(label.toString.toLong, 0L, 0L)
}

OneCCL.init(executorNum, rank, kvsIPPort)
val result = new LiRResult()

val gpuIndices = if (useDevice == "GPU") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression

import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult}
import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -69,21 +69,8 @@ class RandomForestRegressorDALImpl(val uid: String,

val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
CommonJob.setAffinityMask(labeledPointsTables, useDevice)
CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice)
rfrTimer.record("OneCCL Init")

val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.intel.oap.mllib.stat

import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -46,23 +46,10 @@ class CorrelationDALImpl(

val kvsIPPort = getOneCCLIPPort(coalescedTables)

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
CommonJob.setAffinityMask(coalescedTables, useDevice)
CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice)
corTimer.record("OneCCL Init")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.intel.oap.mllib.stat

import com.intel.oap.mllib.{OneCCL, OneDAL, Utils}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.Vector
Expand Down Expand Up @@ -47,23 +47,10 @@ class SummarizerDALImpl(val executorNum: Int,

val kvsIPPort = getOneCCLIPPort(data)

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
}.count()
CommonJob.setAffinityMask(coalescedTables, useDevice)
CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice)
sumTimer.record("OneCCL Init")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down
Loading

0 comments on commit de61d25

Please sign in to comment.