diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
index 48caebe1b..c89c9ffd2 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
@@ -42,12 +42,8 @@ object OneCCL extends Logging {
   }
 
   // Run on Executor
-  def setExecutorEnv(): Unit = {
-    setEnv("CCL_ATL_TRANSPORT", "ofi")
-    // Set CCL_ROOT to workaround CCL_ROOT env read bug, should remove when upstream fix this
-    setEnv("CCL_ROOT", "/opt/intel/oneapi/ccl/latest")
-    // Uncomment this if you whant to debug oneCCL
-    // setEnv("CCL_LOG_LEVEL", "debug")
+  def setExecutorEnv(key: String, value: String): Unit = {
+    setEnv(key, value)
   }
 
   // Run on Executor
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
index 6aca49f14..70479c79a 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala
@@ -75,6 +75,17 @@ class RandomForestClassifierDALImpl(val uid: String,
     rfcTimer.record("Data Convertion")
     val kvsIPPort = getOneCCLIPPort(labeledPointsTables)
 
+    labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
       OneCCL.init(executorNum, rank, kvsIPPort)
       Iterator.empty
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
index 64b2f6c7f..14eb16800 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala
@@ -51,6 +51,16 @@ class KMeansDALImpl(var nClusters: Int,
     kmeansTimer.record("Data Convertion")
 
     val kvsIPPort = getOneCCLIPPort(coalescedTables)
+    coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
 
     coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       OneCCL.init(executorNum, rank, kvsIPPort)
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
index 8eb9554a1..7190ade3f 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala
@@ -59,6 +59,17 @@ class PCADALImpl(val k: Int,
     val kvsIPPort = getOneCCLIPPort(coalescedTables)
     pcaTimer.record("Data Convertion")
 
+    coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       OneCCL.init(executorNum, rank, kvsIPPort)
       Iterator.empty
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
index f95bc0846..40b2f4423 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
@@ -106,6 +106,17 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
     }
     lrTimer.record("Data Convertion")
 
+    labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
       val (feature, label) = tables.next()
       val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) =
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala
index 018473a61..77ea4c656 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala
@@ -69,6 +69,17 @@ class RandomForestRegressorDALImpl(val uid: String,
 
     val kvsIPPort = getOneCCLIPPort(labeledPointsTables)
 
+    labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
       OneCCL.init(executorNum, rank, kvsIPPort)
       Iterator.empty
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
index e521aefe7..fff2d4ac5 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala
@@ -52,6 +52,17 @@ class CorrelationDALImpl(
     }.count()
     corTimer.record("OneCCL Init")
 
+    coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
       val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
         iter.next()
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
index 277039ab1..dcde7ef91 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala
@@ -53,6 +53,17 @@ class SummarizerDALImpl(val executorNum: Int,
     }.count()
     sumTimer.record("OneCCL Init")
 
+    coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
+      // Look up GPU addresses only for GPU runs; CPU runs have nothing to export.
+      if (useDevice == "GPU") {
+        val resources = TaskContext.get().resources()
+        val gpuIndices = resources("gpu").addresses.map(_.toInt)
+        // Export the first GPU address assigned to this task as ZE_AFFINITY_MASK.
+        OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
+      }
+      Iterator.empty
+    }.count()
+
     val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
       val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
         iter.next()