From 9c509a1c06cf85b120533c53ca913581d5a6f36c Mon Sep 17 00:00:00 2001 From: minmingzhu Date: Sun, 18 Aug 2024 09:52:43 +0800 Subject: [PATCH] optimize code --- mllib-dal/src/main/native/GPU.cpp | 46 +++++++++++++++++++ mllib-dal/src/main/native/GPU.h | 2 + mllib-dal/src/main/native/KMeansImpl.cpp | 16 ++----- .../scala/com/intel/oap/mllib/CommonJob.scala | 46 +++++++++++++++++++ .../RandomForestClassifierDALImpl.scala | 19 ++------ .../oap/mllib/clustering/KMeansDALImpl.scala | 24 +++------- .../intel/oap/mllib/feature/PCADALImpl.scala | 19 ++------ .../regression/LinearRegressionDALImpl.scala | 16 ++----- .../RandomForestRegressorDALImpl.scala | 19 ++------ .../oap/mllib/stat/CorrelationDALImpl.scala | 19 ++------ .../oap/mllib/stat/SummarizerDALImpl.scala | 19 ++------ .../oap/mllib/ConvertHomogenTableSuite.scala | 6 +-- .../mllib/CorrelationHomogenTableSuite.scala | 2 +- .../mllib/SummarizerHomogenTableSuite.scala | 12 ++--- .../com/intel/oap/mllib/TestCommon.scala | 12 ++--- 15 files changed, 141 insertions(+), 136 deletions(-) create mode 100644 mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala diff --git a/mllib-dal/src/main/native/GPU.cpp b/mllib-dal/src/main/native/GPU.cpp index 9dbba24f4..5be0223a4 100644 --- a/mllib-dal/src/main/native/GPU.cpp +++ b/mllib-dal/src/main/native/GPU.cpp @@ -113,3 +113,49 @@ sycl::queue getQueue(const ComputeDevice device) { } } } + + +preview::spmd::communicator createDalCommunicator(const jint executorNum, const jint rank, const ccl::string ccl_ip_port){ + auto gpus = get_gpus(); + + auto t1 = std::chrono::high_resolution_clock::now(); + + ccl::init(); + + auto t2 = std::chrono::high_resolution_clock::now(); + auto duration = + (float)std::chrono::duration_cast(t2 - t1).count(); + + logger::println(logger::INFO, "OneCCL singleton init took %f secs", + duration / 1000); + logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, OneCCL singleton init took %f secs.", rank, duration / 1000 ); + + + t1 = std::chrono::high_resolution_clock::now(); + + auto kvs_attr = ccl::create_kvs_attr(); + + kvs_attr.set(ccl_ip_port); + + ccl::shared_ptr_class kvs = ccl::create_main_kvs(kvs_attr); + + t2 = std::chrono::high_resolution_clock::now(); + duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::println(logger::INFO, "OneCCL (native): create kvs took %f secs", + duration / 1000); + logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, OneCCL create communicator took %f secs.", rank, duration / 1000 ); + sycl::queue queue{gpus[0]}; + t1 = std::chrono::high_resolution_clock::now(); + auto comm = + preview::spmd::make_communicator( + queue, executorNum, rank, kvs); + t2 = std::chrono::high_resolution_clock::now(); + duration = + (float)std::chrono::duration_cast(t2 - t1) + .count(); + logger::Logger::getInstance(c_breakdown_name).printLogToFile("rankID was %d, create communicator took %f secs.", rank, duration / 1000 ); + return comm; +} + diff --git a/mllib-dal/src/main/native/GPU.h b/mllib-dal/src/main/native/GPU.h index f8d7c25a9..83b3272f0 100644 --- a/mllib-dal/src/main/native/GPU.h +++ b/mllib-dal/src/main/native/GPU.h @@ -6,7 +6,9 @@ #include #include #include +#include "Communicator.hpp" sycl::queue getAssignedGPU(const ComputeDevice device, jint *gpu_indices); sycl::queue getQueue(const ComputeDevice device); +preview::spmd::communicator createDalCommunicator(jint executorNum, jint rank, ccl::string ccl_ip_port); diff --git a/mllib-dal/src/main/native/KMeansImpl.cpp b/mllib-dal/src/main/native/KMeansImpl.cpp index f690c1c45..21ef5e218 100644 --- a/mllib-dal/src/main/native/KMeansImpl.cpp +++ b/mllib-dal/src/main/native/KMeansImpl.cpp @@ -338,25 +338,19 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansOneapiComputeWithInitCe } #ifdef CPU_GPU_PROFILE case ComputeDevice::gpu: { - int nGpu = env->GetArrayLength(gpuIdxArray); logger::println( logger::INFO, - "OneDAL (native): use GPU kernels with %d GPU(s) rankid %d", nGpu, - rank); + "OneDAL (native): use GPU kernels with rankid %d", rank); - jint *gpuIndices = env->GetIntArrayElements(gpuIdxArray, 0); + const char *str = env->GetStringUTFChars(ip_port, nullptr); + ccl::string ccl_ip_port(str); + auto comm = createDalCommunicator(executorNum, rank, ccl_ip_port); - auto queue = getAssignedGPU(device, gpuIndices); - - ccl::shared_ptr_class &kvs = getKvs(); - auto comm = - preview::spmd::make_communicator( - queue, executorNum, rank, kvs); ret = doKMeansOneAPICompute(env, pNumTabData, numRows, numCols, pNumTabCenters, clusterNum, tolerance, iterationNum, comm, resultObj); - env->ReleaseIntArrayElements(gpuIdxArray, gpuIndices, 0); + env->ReleaseStringUTFChars(ip_port, str); break; } #endif diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala new file mode 100644 index 000000000..e3e0aab58 --- /dev/null +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.mllib + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD + +object CommonJob { + + def setAffinityMask(data: RDD[_], useDevice: String): Unit = { + data.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) + Iterator.empty + }.count() + } + + def createCCLInit(data: RDD[_], executorNum: Int, kvsIPPort: String, useDevice: String): Unit = { + if (useDevice == "CPU") { + data.mapPartitionsWithIndex { (rank, table) => + OneCCL.init(executorNum, rank, kvsIPPort) + Iterator.empty + }.count() + } + } + +} diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index 70479c79a..6a2da4ac7 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -16,7 +16,7 @@ package com.intel.oap.mllib.classification import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.annotation.Since import org.apache.spark.TaskContext @@ -75,21 +75,8 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfcTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 14eb16800..61dd1ef80 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.clustering import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -51,21 +51,9 @@ class KMeansDALImpl(var nClusters: Int, kmeansTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) kmeansTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => @@ -118,7 +106,9 @@ class KMeansDALImpl(var nClusters: Int, } else { Iterator.empty } - OneCCL.cleanup() + if (useDevice == "CPU") { + OneCCL.cleanup() + } ret }.collect() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 7190ade3f..071117cc0 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -19,7 +19,7 @@ package com.intel.oap.mllib.feature import java.nio.DoubleBuffer import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable} import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Service, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Service, Utils} import org.apache.spark.TaskContext import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging @@ -59,21 +59,8 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) pcaTimer.record("OneCCL Init") val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index 40b2f4423..79243f988 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.SparkException import org.apache.spark.TaskContext @@ -106,16 +106,9 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) + lrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => val (feature, label) = tables.next() @@ -132,7 +125,6 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, (label.toString.toLong, 0L, 0L) } - OneCCL.init(executorNum, rank, kvsIPPort) val result = new LiRResult() val gpuIndices = if (useDevice == "GPU") { diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 77ea4c656..100be8823 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.regression import com.intel.oap.mllib.Utils.getOneCCLIPPort import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult} -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -69,21 +69,8 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) - labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - - labeledPointsTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(labeledPointsTables, useDevice) + CommonJob.createCCLInit(labeledPointsTables, executorNum, kvsIPPort, useDevice) rfrTimer.record("OneCCL Init") val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index fff2d4ac5..04a3760bb 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -17,7 +17,7 @@ package com.intel.oap.mllib.stat import com.intel.oap.mllib.Utils.getOneCCLIPPort -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import com.intel.oneapi.dal.table.Common import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -46,23 +46,10 @@ class CorrelationDALImpl( val kvsIPPort = getOneCCLIPPort(coalescedTables) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) corTimer.record("OneCCL Init") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index dcde7ef91..c8422b097 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -16,7 +16,7 @@ package com.intel.oap.mllib.stat -import com.intel.oap.mllib.{OneCCL, OneDAL, Utils} +import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.ml.linalg.Vector @@ -47,23 +47,10 @@ class SummarizerDALImpl(val executorNum: Int, val kvsIPPort = getOneCCLIPPort(data) - coalescedTables.mapPartitionsWithIndex { (rank, table) => - OneCCL.init(executorNum, rank, kvsIPPort) - Iterator.empty - }.count() + CommonJob.setAffinityMask(coalescedTables, useDevice) + CommonJob.createCCLInit(coalescedTables, executorNum, kvsIPPort, useDevice) sumTimer.record("OneCCL Init") - coalescedTables.mapPartitionsWithIndex { (rank, iter) => - val gpuIndices = if (useDevice == "GPU") { - val resources = TaskContext.get().resources() - resources("gpu").addresses.map(_.toInt) - } else { - null - } - OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString()) - Iterator.empty - }.count() - val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next() diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala index bbb6bbe7e..3246387b3 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/ConvertHomogenTableSuite.scala @@ -57,7 +57,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging { val metadata = table.getMetaData for (i <- 0 until 10) { assert(metadata.getDataType(i) == FLOAT64) - assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO) + assert(metadata.getFeatureType(i) == CommonJob.FeatureType.RATIO) } assertArrayEquals(table.getDoubleData, TestCommon.convertArray(data)) @@ -75,7 +75,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging { val metadata = table.getMetaData for (i <- 0 until 10) { assert(metadata.getDataType(i) == FLOAT64) - assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO) + assert(metadata.getFeatureType(i) == CommonJob.FeatureType.RATIO) } assertArrayEquals(table.getDoubleData, data) @@ -105,7 +105,7 @@ class ConvertHomogenTableSuite extends FunctionsSuite with Logging { val metadata = table.getMetaData for (i <- 0 until 10) { assert(metadata.getDataType(i) == FLOAT64) - assert(metadata.getFeatureType(i) == Common.FeatureType.RATIO) + assert(metadata.getFeatureType(i) == CommonJob.FeatureType.RATIO) } assertArrayEquals(table.getDoubleData, TestCommon.convertArray(data)) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala index 34361766d..98d37a338 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/CorrelationHomogenTableSuite.scala @@ -45,7 +45,7 @@ class CorrelationHomogenTableSuite extends FunctionsSuite with Logging { val correlationDAL = new CorrelationDALImpl(1, 1) val gpuIndices = Array(0) val result = new CorrelationResult() - correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result); + correlationDAL.cCorrelationTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, CommonJob.ComputeDevice.HOST.ordinal(), gpuIndices, result); val correlationMatrix = TestCommon.getMatrixFromTable(OneDAL.makeHomogenTable( result.getCorrelationNumericTable), TestCommon.getComputeDevice) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala index 712cccbfa..5917af2e1 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/SummarizerHomogenTableSuite.scala @@ -31,15 +31,15 @@ class SummarizerHomogenTableSuite extends FunctionsSuite with Logging{ val sourceData = TestCommon.readCSV("src/test/resources/data/covcormoments_dense.csv") - val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), Common.ComputeDevice.HOST); + val dataTable = new HomogenTable(sourceData.length, sourceData(0).length, TestCommon.convertArray(sourceData), CommonJob.ComputeDevice.HOST); val summarizerDAL = new SummarizerDALImpl(1, 1) val gpuIndices = Array(0) val result = new SummarizerResult() - summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, Common.ComputeDevice.HOST.ordinal(), gpuIndices, result) - val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMeanNumericTable), Common.ComputeDevice.HOST) - val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getVarianceNumericTable), Common.ComputeDevice.HOST) - val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMinimumNumericTable), Common.ComputeDevice.HOST) - val maximumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMaximumNumericTable), Common.ComputeDevice.HOST) + summarizerDAL.cSummarizerTrainDAL(dataTable.getcObejct(), sourceData.length, sourceData(0).length, 1, 1, CommonJob.ComputeDevice.HOST.ordinal(), gpuIndices, result) + val meanTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMeanNumericTable), CommonJob.ComputeDevice.HOST) + val varianceTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getVarianceNumericTable), CommonJob.ComputeDevice.HOST) + val minimumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMinimumNumericTable), CommonJob.ComputeDevice.HOST) + val maximumTable = OneDAL.homogenTable1xNToVector(OneDAL.makeHomogenTable(result.getMaximumNumericTable), CommonJob.ComputeDevice.HOST) assertArrayEquals(expectMean , meanTable.toArray, 0.000001) assertArrayEquals(expectVariance, varianceTable.toDense.values, 0.000001) diff --git a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala index 5a2ecef27..9ae20cec4 100644 --- a/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala +++ b/mllib-dal/src/test/scala/com/intel/oap/mllib/TestCommon.scala @@ -84,7 +84,7 @@ object TestCommon { arrayDouble } def getMatrixFromTable(table: HomogenTable, - device: Common.ComputeDevice): DenseMatrix = { + device: CommonJob.ComputeDevice): DenseMatrix = { val numRows = table.getRowCount.toInt val numCols = table.getColumnCount.toInt // returned DoubleBuffer is ByteByffer, need to copy as double array @@ -97,14 +97,14 @@ object TestCommon { matrix } - def getComputeDevice: Common.ComputeDevice = { + def getComputeDevice: CommonJob.ComputeDevice = { val device = System.getProperty("computeDevice") - var computeDevice: Common.ComputeDevice = Common.ComputeDevice.HOST + var computeDevice: CommonJob.ComputeDevice = CommonJob.ComputeDevice.HOST if(device != null) { device.toUpperCase match { - case "HOST" => computeDevice = Common.ComputeDevice.HOST - case "CPU" => computeDevice = Common.ComputeDevice.CPU - case "GPU" => computeDevice = Common.ComputeDevice.GPU + case "HOST" => computeDevice = CommonJob.ComputeDevice.HOST + case "CPU" => computeDevice = CommonJob.ComputeDevice.CPU + case "GPU" => computeDevice = CommonJob.ComputeDevice.GPU case _ => "Invalid Device" } }