Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 26, 2024
1 parent 540da45 commit ef26b85
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.gluten.metrics

import org.apache.gluten.substrait.AggregationParams

import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkMetricsUtil
import org.apache.spark.util.TaskResources
Expand Down Expand Up @@ -83,9 +84,11 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
idx += 1
}
if (TaskResources.inSparkTask()) {
SparkMetricsUtil.incMemoryBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
aggMetrics.spilledInputBytes)
SparkMetricsUtil.incDiskBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incDiskBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
aggMetrics.spilledBytes)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.gluten.metrics

import org.apache.gluten.metrics.Metrics.SingleMetric
import org.apache.gluten.substrait.JoinParams

import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkMetricsUtil
import org.apache.spark.util.TaskResources
Expand Down Expand Up @@ -152,13 +153,17 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
idx += 1
}
if (TaskResources.inSparkTask()) {
SparkMetricsUtil.incMemoryBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
hashProbeMetrics.spilledInputBytes)
SparkMetricsUtil.incDiskBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incDiskBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
hashProbeMetrics.spilledBytes)
SparkMetricsUtil.incMemoryBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
hashBuildMetrics.spilledInputBytes)
SparkMetricsUtil.incDiskBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incDiskBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
hashBuildMetrics.spilledBytes)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd
metrics("spilledPartitions") += operatorMetrics.spilledPartitions
metrics("spilledFiles") += operatorMetrics.spilledFiles
if (TaskResources.inSparkTask()) {
SparkMetricsUtil.incMemoryBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
operatorMetrics.spilledInputBytes)
SparkMetricsUtil.incDiskBytesSpilled(TaskResources.getLocalTaskContext().taskMetrics(),
SparkMetricsUtil.incDiskBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
operatorMetrics.spilledBytes)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.utils

import org.apache.spark.executor.TaskMetrics

object SparkMetricsUtil {
def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incMemoryBytesSpilled(v)
def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incDiskBytesSpilled(v)
def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incMemoryBytesSpilled(v)
def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incDiskBytesSpilled(v)
}

0 comments on commit ef26b85

Please sign in to comment.