From 050e0188ada0f45601098bd3d8b6d699ed410a96 Mon Sep 17 00:00:00 2001
From: Jiaheng Tang
Date: Thu, 21 Mar 2024 14:01:22 -0700
Subject: [PATCH] [Spark] OPTIMIZE on clustered table with no clustering
 columns should run compaction (#2777)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, running OPTIMIZE on a clustered table without any clustering
columns (after ALTER TABLE CLUSTER BY NONE) fails with a long stack trace:

```
[info] org.apache.spark.SparkException: Exception thrown in awaitResult:
[info]   at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
[info]   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
[info]   at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:383)
[info]   at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$1(OptimizeTableCommand.scala:276)
...
[info] Cause: java.util.concurrent.ExecutionException: Boxed Error
[info]   at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
...
[info] Cause: java.lang.AssertionError: assertion failed: Cannot cluster by zero columns!
[info]   at scala.Predef$.assert(Predef.scala:223)
[info]   at org.apache.spark.sql.delta.skipping.MultiDimClustering$.cluster(MultiDimClustering.scala:51)
[info]   at org.apache.spark.sql.delta.commands.OptimizeExecutor.runOptimizeBinJob(OptimizeTableCommand.scala:427)
[info]   at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$6(OptimizeTableCommand.scala:277)
...
```

This change makes OPTIMIZE on a clustered table without any clustering
columns run regular compaction, which is the desired behavior.

## How was this patch tested?

This change adds a new test to verify the correct behavior; the test fails
without the fix.

## Does this PR introduce _any_ user-facing changes?

No.
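
## Repro sketch

For reference, a minimal scenario that hits the assertion before this fix
(the table name `t` and the sample data are illustrative; assumes a Spark
session with Delta Lake and clustered tables enabled):

```scala
// Hypothetical repro, not part of the patch: create a clustered table,
// drop its clustering columns, then run OPTIMIZE.
spark.sql("CREATE TABLE t (a INT, b STRING) USING delta CLUSTER BY (a, b)")
spark.range(100)
  .selectExpr("CAST(id AS INT) AS a", "CAST(id AS STRING) AS b")
  .write.format("delta").mode("append").saveAsTable("t")

spark.sql("ALTER TABLE t CLUSTER BY NONE")
// Before this fix: AssertionError("Cannot cluster by zero columns!").
// With this fix: runs regular compaction.
spark.sql("OPTIMIZE t")
```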
---
 .../delta/commands/OptimizeTableCommand.scala |  4 +-
 .../commands/OptimizeTableStrategy.scala      |  3 +-
 .../clustering/ClusteredTableDDLSuite.scala   | 45 +++++++++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index a38967ee0aa..51393508ff2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -239,7 +239,9 @@ class OptimizeExecutor(
 
   private val isClusteredTable = ClusteredTableUtils.isSupported(txn.snapshot.protocol)
 
-  private val isMultiDimClustering = isClusteredTable || zOrderByColumns.nonEmpty
+  private val isMultiDimClustering =
+    optimizeStrategy.isInstanceOf[ClusteringStrategy] ||
+    optimizeStrategy.isInstanceOf[ZOrderStrategy]
 
   private val clusteringColumns: Seq[String] = {
     if (zOrderByColumns.nonEmpty) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
index eb361dc41be..dc84e606440 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
@@ -118,8 +118,9 @@ object OptimizeTableStrategy {
 
   private def getMode(snapshot: Snapshot, zOrderBy: Seq[String]): OptimizeTableMode.Value = {
     val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
+    val hasClusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot).nonEmpty
     val isZOrderBy = zOrderBy.nonEmpty
-    if (isClusteredTable) {
+    if (isClusteredTable && hasClusteringColumns) {
       assert(!isZOrderBy)
       OptimizeTableMode.CLUSTERING
     } else if (isZOrderBy) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
index 01499677f93..fbbaf7578df 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
@@ -504,6 +504,8 @@ trait ClusteredTableDDLSuiteBase
   extends ClusteredTableCreateOrReplaceDDLSuite
   with DeltaSQLCommandTest {
 
+  import testImplicits._
+
   test("cluster by with more than 4 columns - alter table") {
     val testTable = "test_table"
     withClusteredTable(testTable, "a INT, b INT, c INT, d INT, e INT", "a") {
@@ -564,6 +566,49 @@
     }
   }
 
+  test("optimize clustered table and trigger regular compaction") {
+    withClusteredTable(testTable, "a INT, b STRING", "a, b") {
+      val tableIdentifier = TableIdentifier(testTable)
+      verifyClusteringColumns(tableIdentifier, "a, b")
+
+      (1 to 1000).map(i => (i, i.toString)).toDF("a", "b")
+        .write.mode("append").format("delta").saveAsTable(testTable)
+
+      val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTable))
+      val targetFileSize = (snapshot.sizeInBytes / 10).toString
+      withSQLConf(
+        DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> targetFileSize,
+        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> targetFileSize) {
+        runOptimize(testTable) { metrics =>
+          assert(metrics.numFilesAdded > 0)
+          assert(metrics.numFilesRemoved > 0)
+          assert(metrics.clusteringStats.nonEmpty)
+          assert(metrics.clusteringStats.get.numOutputZCubes == 1)
+        }
+      }
+
+      // ALTER TABLE CLUSTER BY NONE and then OPTIMIZE to trigger regular compaction.
+      sql(s"ALTER TABLE $testTable CLUSTER BY NONE")
+      verifyClusteringColumns(tableIdentifier, "")
+
+      (1001 to 2000).map(i => (i, i.toString)).toDF("a", "b")
+        .repartition(10).write.mode("append").format("delta").saveAsTable(testTable)
+      val newSnapshot = deltaLog.update()
+      val newTargetFileSize = (newSnapshot.sizeInBytes / 10).toString
+      withSQLConf(
+        DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> newTargetFileSize,
+        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> newTargetFileSize) {
+        runOptimize(testTable) { metrics =>
+          assert(metrics.numFilesAdded > 0)
+          assert(metrics.numFilesRemoved > 0)
+          // No clustering or zorder stats indicates regular compaction.
+          assert(metrics.clusteringStats.isEmpty)
+          assert(metrics.zOrderStats.isEmpty)
+        }
+      }
+    }
+  }
+
   test("optimize clustered table - error scenarios") {
     withClusteredTable(testTable, "a INT, b STRING", "a") {
       // Specify partition predicate.
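
A condensed sketch of the resulting mode selection (simplified from
`OptimizeTableStrategy.getMode` in the diff above; `chooseMode` and its string
results are illustrative stand-ins for the real `OptimizeTableMode` values):

```scala
// Simplified decision logic, illustrative only: a clustered table is
// clustered by OPTIMIZE only while it still has clustering columns, so a
// table after ALTER TABLE ... CLUSTER BY NONE falls through to compaction.
def chooseMode(
    isClusteredTable: Boolean,
    hasClusteringColumns: Boolean,
    zOrderBy: Seq[String]): String = {
  if (isClusteredTable && hasClusteringColumns) {
    assert(zOrderBy.isEmpty) // ZORDER BY is rejected on clustered tables
    "CLUSTERING"
  } else if (zOrderBy.nonEmpty) {
    "ZORDER"
  } else {
    "COMPACTION"
  }
}
```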