From 657927054344ec047c2e2e9bfb125f2cd712b1bc Mon Sep 17 00:00:00 2001 From: Kaifei Yi Date: Fri, 13 Dec 2024 08:21:55 +0800 Subject: [PATCH] [GLUTEN-8187][VL] Support velox cache metrics (#8188) Velox backend supports cache functionality controlled by the config spark.gluten.sql.columnar.backend.velox.cacheEnabled. When the cache is enabled, we want to measure its effectiveness through a series of metrics. --- .../java/org/apache/gluten/metrics/Metrics.java | 12 ++++++++++++ .../apache/gluten/metrics/OperatorMetrics.java | 9 +++++++++ .../backendsapi/velox/VeloxMetricsApi.scala | 15 ++++++++++++--- .../gluten/metrics/BatchScanMetricsUpdater.scala | 3 +++ .../metrics/FileSourceScanMetricsUpdater.scala | 6 ++++++ .../metrics/HiveTableScanMetricsUpdater.scala | 6 ++++++ .../org/apache/gluten/metrics/MetricsUtil.scala | 9 +++++++++ .../gluten/execution/VeloxMetricsSuite.scala | 12 ++++++++++++ cpp/core/jni/JniWrapper.cc | 8 +++++++- cpp/core/utils/Metrics.h | 3 +++ cpp/velox/compute/WholeStageResultIterator.cc | 7 +++++++ 11 files changed, 86 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index b0fccff98180..fb9745ae9070 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -48,6 +48,9 @@ public class Metrics implements IMetrics { public long[] processedStrides; public long[] remainingFilterTime; public long[] ioWaitTime; + public long[] storageReadBytes; + public long[] localReadBytes; + public long[] ramReadBytes; public long[] preloadSplits; public long[] physicalWrittenBytes; @@ -88,6 +91,9 @@ public Metrics( long[] processedStrides, long[] remainingFilterTime, long[] ioWaitTime, + long[] storageReadBytes, + long[] localReadBytes, + long[] ramReadBytes, long[] preloadSplits, long[] physicalWrittenBytes, long[] writeIOTime, @@ 
-122,6 +128,9 @@ public Metrics( this.processedStrides = processedStrides; this.remainingFilterTime = remainingFilterTime; this.ioWaitTime = ioWaitTime; + this.storageReadBytes = storageReadBytes; + this.localReadBytes = localReadBytes; + this.ramReadBytes = ramReadBytes; this.preloadSplits = preloadSplits; this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime = writeIOTime; @@ -163,6 +172,9 @@ public OperatorMetrics getOperatorMetrics(int index) { processedStrides[index], remainingFilterTime[index], ioWaitTime[index], + storageReadBytes[index], + localReadBytes[index], + ramReadBytes[index], preloadSplits[index], physicalWrittenBytes[index], writeIOTime[index], diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 6e8fbb100ffa..36d827a288b6 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -46,6 +46,9 @@ public class OperatorMetrics implements IOperatorMetrics { public long processedStrides; public long remainingFilterTime; public long ioWaitTime; + public long storageReadBytes; + public long localReadBytes; + public long ramReadBytes; public long preloadSplits; public long physicalWrittenBytes; @@ -83,6 +86,9 @@ public OperatorMetrics( long processedStrides, long remainingFilterTime, long ioWaitTime, + long storageReadBytes, + long localReadBytes, + long ramReadBytes, long preloadSplits, long physicalWrittenBytes, long writeIOTime, @@ -116,6 +122,9 @@ public OperatorMetrics( this.processedStrides = processedStrides; this.remainingFilterTime = remainingFilterTime; this.ioWaitTime = ioWaitTime; + this.storageReadBytes = storageReadBytes; + this.localReadBytes = localReadBytes; + this.ramReadBytes = ramReadBytes; this.preloadSplits = preloadSplits; this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime 
= writeIOTime; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 934b680382ea..bf92dac2dda0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -100,7 +100,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( sparkContext, "remaining filter time"), - "ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time") + "ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time"), + "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), + "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"), + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") ) override def genBatchScanTransformerMetricsUpdater( @@ -138,7 +141,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( sparkContext, "remaining filter time"), - "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time") + "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"), + "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), + "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"), + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") ) override def genHiveTableScanTransformerMetricsUpdater( @@ -176,7 +182,10 @@ class VeloxMetricsApi extends MetricsApi with Logging { "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( sparkContext, "remaining filter time"), - "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time") + 
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"), + "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"), + "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"), + "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes") ) override def genFileSourceScanTransformerMetricsUpdater( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala index 16eabedfd60e..31f6b5a313e4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala @@ -50,6 +50,9 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri metrics("processedStrides") += operatorMetrics.processedStrides metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime metrics("ioWaitTime") += operatorMetrics.ioWaitTime + metrics("storageReadBytes") += operatorMetrics.storageReadBytes + metrics("localReadBytes") += operatorMetrics.localReadBytes + metrics("ramReadBytes") += operatorMetrics.ramReadBytes metrics("preloadSplits") += operatorMetrics.preloadSplits } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index f76d97d30e43..f8693f9246e3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -46,6 +46,9 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val processedStrides: SQLMetric = metrics("processedStrides") val remainingFilterTime: SQLMetric = 
metrics("remainingFilterTime") val ioWaitTime: SQLMetric = metrics("ioWaitTime") + val storageReadBytes: SQLMetric = metrics("storageReadBytes") + val localReadBytes: SQLMetric = metrics("localReadBytes") + val ramReadBytes: SQLMetric = metrics("ramReadBytes") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { inputMetrics.bridgeIncBytesRead(rawInputBytes.value) @@ -73,6 +76,9 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric processedStrides += operatorMetrics.processedStrides remainingFilterTime += operatorMetrics.remainingFilterTime ioWaitTime += operatorMetrics.ioWaitTime + storageReadBytes += operatorMetrics.storageReadBytes + localReadBytes += operatorMetrics.localReadBytes + ramReadBytes += operatorMetrics.ramReadBytes preloadSplits += operatorMetrics.preloadSplits } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala index 00ef8d2b6c77..f7e0dc180091 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala @@ -41,6 +41,9 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] val processedStrides: SQLMetric = metrics("processedStrides") val remainingFilterTime: SQLMetric = metrics("remainingFilterTime") val ioWaitTime: SQLMetric = metrics("ioWaitTime") + val storageReadBytes: SQLMetric = metrics("storageReadBytes") + val localReadBytes: SQLMetric = metrics("localReadBytes") + val ramReadBytes: SQLMetric = metrics("ramReadBytes") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { inputMetrics.bridgeIncBytesRead(rawInputBytes.value) @@ -68,6 +71,9 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] processedStrides += 
operatorMetrics.processedStrides remainingFilterTime += operatorMetrics.remainingFilterTime ioWaitTime += operatorMetrics.ioWaitTime + storageReadBytes += operatorMetrics.storageReadBytes + localReadBytes += operatorMetrics.localReadBytes + ramReadBytes += operatorMetrics.ramReadBytes preloadSplits += operatorMetrics.preloadSplits } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index b8ef1620f905..dc32fc5d9661 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -124,6 +124,9 @@ object MetricsUtil extends Logging { var processedStrides: Long = 0 var remainingFilterTime: Long = 0 var ioWaitTime: Long = 0 + var storageReadBytes: Long = 0 + var localReadBytes: Long = 0 + var ramReadBytes: Long = 0 var preloadSplits: Long = 0 var numWrittenFiles: Long = 0 @@ -151,6 +154,9 @@ object MetricsUtil extends Logging { processedStrides += metrics.processedStrides remainingFilterTime += metrics.remainingFilterTime ioWaitTime += metrics.ioWaitTime + storageReadBytes += metrics.storageReadBytes + localReadBytes += metrics.localReadBytes + ramReadBytes += metrics.ramReadBytes preloadSplits += metrics.preloadSplits numWrittenFiles += metrics.numWrittenFiles } @@ -185,6 +191,9 @@ object MetricsUtil extends Logging { processedStrides, remainingFilterTime, ioWaitTime, + storageReadBytes, + localReadBytes, + ramReadBytes, preloadSplits, physicalWrittenBytes, writeIOTime, diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index e41f00821c27..a7ad8514f93a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -263,4 +263,16 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } } + + test("Velox cache metrics") { + val df = spark.sql(s"SELECT * FROM metrics_t1") + val scans = collect(df.queryExecution.executedPlan) { + case scan: FileSourceScanExecTransformer => scan + } + df.collect() + assert(scans.length === 1) + val metrics = scans.head.metrics + assert(metrics("storageReadBytes").value > 0) + assert(metrics("ramReadBytes").value == 0) + } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 61cedf85763e..d8bf9229e46d 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -229,7 +229,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;"); metricsBuilderConstructor = getMethodIdOrError( - env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); + env, + metricsBuilderClass, + "<init>", + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -574,6 +577,9 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kProcessedStrides], longArray[Metrics::kRemainingFilterTime], longArray[Metrics::kIoWaitTime], + longArray[Metrics::kStorageReadBytes], + longArray[Metrics::kLocalReadBytes], + longArray[Metrics::kRamReadBytes], longArray[Metrics::kPreloadSplits], longArray[Metrics::kPhysicalWrittenBytes], longArray[Metrics::kWriteIOTime], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 8da13bc91635..761f5fbe7914 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -73,6 +73,9 @@ struct Metrics { kProcessedStrides, kRemainingFilterTime, 
kIoWaitTime, + kStorageReadBytes, + kLocalReadBytes, + kRamReadBytes, kPreloadSplits, // Write metrics. diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index d2f825dcfa8e..ba67c182555a 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -41,6 +41,9 @@ const std::string kSkippedStrides = "skippedStrides"; const std::string kProcessedStrides = "processedStrides"; const std::string kRemainingFilterTime = "totalRemainingFilterTime"; const std::string kIoWaitTime = "ioWaitWallNanos"; +const std::string kStorageReadBytes = "storageReadBytes"; +const std::string kLocalReadBytes = "localReadBytes"; +const std::string kRamReadBytes = "ramReadBytes"; const std::string kPreloadSplits = "readyPreloadedSplits"; const std::string kNumWrittenFiles = "numWrittenFiles"; const std::string kWriteIOTime = "writeIOTime"; @@ -422,6 +425,10 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kRemainingFilterTime)[metricIndex] = runtimeMetric("sum", second->customStats, kRemainingFilterTime); metrics_->get(Metrics::kIoWaitTime)[metricIndex] = runtimeMetric("sum", second->customStats, kIoWaitTime); + metrics_->get(Metrics::kStorageReadBytes)[metricIndex] = + runtimeMetric("sum", second->customStats, kStorageReadBytes); + metrics_->get(Metrics::kLocalReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kLocalReadBytes); + metrics_->get(Metrics::kRamReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kRamReadBytes); metrics_->get(Metrics::kPreloadSplits)[metricIndex] = runtimeMetric("sum", entry.second->customStats, kPreloadSplits); metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =