diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DomainMetadataUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DomainMetadataUtils.scala
index af633012234..94df58755fb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DomainMetadataUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DomainMetadataUtils.scala
@@ -17,6 +17,7 @@ package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.actions.{Action, DomainMetadata, Protocol}
+import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
 import org.apache.spark.sql.delta.metering.DeltaLogging
 
 object DomainMetadataUtils extends DeltaLogging {
@@ -27,6 +28,10 @@ object DomainMetadataUtils extends DeltaLogging {
   private val METADATA_DOMAIN_TO_COPY_FOR_RESTORE_TABLE =
     METADATA_DOMAINS_TO_REMOVE_FOR_REPLACE_TABLE
 
+  // List of metadata domains that will be copied from the table on a CLONE operation.
+  private val METADATA_DOMAIN_TO_COPY_FOR_CLONE_TABLE: Set[String] = Set(
+    ClusteringMetadataDomain.domainName)
+
   /**
    * Returns whether the protocol version supports the [[DomainMetadata]] action.
    */
@@ -96,4 +101,14 @@ object DomainMetadataUtils extends DeltaLogging {
       METADATA_DOMAIN_TO_COPY_FOR_RESTORE_TABLE.contains(m.domain)
     }
   }
+
+  /**
+   * Generates sequence of DomainMetadata to commit for CLONE TABLE command.
+   */
+  def handleDomainMetadataForCloneTable(
+      sourceDomainMetadatas: Seq[DomainMetadata]): Seq[DomainMetadata] = {
+    sourceDomainMetadatas.filter { m =>
+      METADATA_DOMAIN_TO_COPY_FOR_CLONE_TABLE.contains(m.domain)
+    }
+  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
index 09c0e5a10d0..af4d66993cc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -226,6 +227,13 @@ abstract class CloneTableBase(
       // CLONE does not preserve Row IDs and Commit Versions
       copiedFile.copy(baseRowId = None, defaultRowCommitVersion = None)
     }
+    sourceTable.snapshot.foreach { sourceSnapshot =>
+      // Handle DomainMetadata for cloning a table.
+      if (deltaOperation.name == DeltaOperations.OP_CLONE) {
+        actions ++=
+          DomainMetadataUtils.handleDomainMetadataForCloneTable(sourceSnapshot.domainMetadata)
+      }
+    }
     val sourceName = sourceTable.name
     // Override source table metadata with user-defined table properties
     val context = Map[String, String]()
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
index e8feeafe5ba..18ebcba41fa 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
@@ -141,6 +141,9 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
           doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
           doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
         }
+      case "CLONE" =>
+        // CLUSTER BY is not in the operation parameters for CLONE - similar to PARTITION BY.
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
       case o if clusterBySupportedOperations.contains(o) =>
         if (expectClustering) {
           assertClusterByExists()
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
index fbbaf7578df..40b26c4ee87 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, Del
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}
 
@@ -666,6 +667,58 @@ trait ClusteredTableDDLSuiteBase
       }
     }
   }
+
+  test("validate CLONE on clustered table") {
+    import testImplicits._
+    val srcTable = "SrcTbl"
+    val dstTable1 = "DestTbl1"
+    val dstTable2 = "DestTbl2"
+    val dstTable3 = "DestTbl3"
+
+    withTable(srcTable, dstTable1, dstTable2, dstTable3) {
+      // Create the source table.
+      sql(s"CREATE TABLE $srcTable (col1 INT, col2 INT, col3 INT) " +
+        s"USING delta CLUSTER BY (col1, col2)")
+      val tableIdent = new TableIdentifier(srcTable)
+      (1 to 100).map(i => (i, i + 1000, i + 100)).toDF("col1", "col2", "col3")
+        .repartitionByRange(100, col("col1"))
+        .write.format("delta").mode("append").saveAsTable(srcTable)
+
+      // Force clustering on the source table.
+      val (_, srcSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdent)
+      val ingestionSize = srcSnapshot.allFiles.collect().map(_.size).sum
+      withSQLConf(
+        DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> (ingestionSize / 4).toString) {
+        runOptimize(srcTable) { res =>
+          assert(res.numFilesAdded === 4)
+          assert(res.numFilesRemoved === 100)
+        }
+      }
+
+      // Create destination table as a clone of the source table.
+      sql(s"CREATE TABLE $dstTable1 SHALLOW CLONE $srcTable")
+
+      // Validate the clustering columns and that the clustering columns are in the stats schema.
+      val (_, dstSnapshot1) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable1))
+      verifyClusteringColumns(TableIdentifier(dstTable1), "col1,col2")
+      ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot1, Seq("col1", "col2"))
+
+      // Change to CLUSTER BY NONE, then CLONE from an earlier version to validate that the
+      // clustering column information is maintained.
+      sql(s"ALTER TABLE $srcTable CLUSTER BY NONE")
+      sql(s"CREATE TABLE $dstTable2 SHALLOW CLONE $srcTable VERSION AS OF 2")
+      val (_, dstSnapshot2) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable2))
+      verifyClusteringColumns(TableIdentifier(dstTable2), "col1,col2")
+      ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot2, Seq("col1", "col2"))
+
+      // Validate CLONE after CLUSTER BY NONE.
+      sql(s"CREATE TABLE $dstTable3 SHALLOW CLONE $srcTable")
+      val (_, dstSnapshot3) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable3))
+      verifyClusteringColumns(TableIdentifier(dstTable3), "")
+      ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot3, Seq.empty)
+
+    }
+  }
 }
 
 trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase