[Spark] Add CLONE support for clustered tables (delta-io#2802)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Adds support for the CLONE operation on clustered tables: the clustering metadata domain is now copied from the source snapshot to the cloned table.
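
For illustration, a minimal end-to-end sketch of what this enables (table names are hypothetical; the SQL mirrors what the new test exercises):

```scala
// Hypothetical tables; assumes a SparkSession `spark` with Delta enabled.
spark.sql("CREATE TABLE src (col1 INT, col2 INT) USING delta CLUSTER BY (col1, col2)")
spark.sql("CREATE TABLE dst SHALLOW CLONE src")
// With this change, `dst` inherits the clustering columns (col1, col2),
// because the clustering metadata domain is copied from the source snapshot.
```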

## How was this patch tested?
A test was added to `ClusteredTableDDLSuite`.

## Does this PR introduce _any_ user-facing changes?
No.
chirag-s-db authored Mar 29, 2024
1 parent 9028303 commit 0983543
Showing 4 changed files with 79 additions and 0 deletions.
DomainMetadataUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Action, DomainMetadata, Protocol}
import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
import org.apache.spark.sql.delta.metering.DeltaLogging

object DomainMetadataUtils extends DeltaLogging {
@@ -27,6 +28,10 @@ object DomainMetadataUtils extends DeltaLogging {
private val METADATA_DOMAIN_TO_COPY_FOR_RESTORE_TABLE =
METADATA_DOMAINS_TO_REMOVE_FOR_REPLACE_TABLE

// List of metadata domains that will be copied from the source table on a CLONE operation.
private val METADATA_DOMAIN_TO_COPY_FOR_CLONE_TABLE: Set[String] = Set(
ClusteringMetadataDomain.domainName)

/**
* Returns whether the protocol version supports the [[DomainMetadata]] action.
*/
@@ -96,4 +101,14 @@
METADATA_DOMAIN_TO_COPY_FOR_RESTORE_TABLE.contains(m.domain)
}
}

/**
* Generates the sequence of DomainMetadata actions to commit for the CLONE TABLE command.
*/
def handleDomainMetadataForCloneTable(
sourceDomainMetadatas: Seq[DomainMetadata]): Seq[DomainMetadata] = {
sourceDomainMetadatas.filter { m =>
METADATA_DOMAIN_TO_COPY_FOR_CLONE_TABLE.contains(m.domain)
}
}
}
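
To make the filtering concrete, here is a self-contained sketch of the behavior: `DomainMetadata` is simplified to a plain case class, and the `"delta.clustering"` domain name is an assumption standing in for `ClusteringMetadataDomain.domainName`.

```scala
// Simplified stand-in for org.apache.spark.sql.delta.actions.DomainMetadata.
case class DomainMetadata(domain: String, configuration: String, removed: Boolean)

// Assumed whitelist: only the clustering domain survives a CLONE.
val metadataDomainsToCopyForClone = Set("delta.clustering")

def handleDomainMetadataForCloneTable(
    sourceDomainMetadatas: Seq[DomainMetadata]): Seq[DomainMetadata] =
  sourceDomainMetadatas.filter(m => metadataDomainsToCopyForClone.contains(m.domain))

val source = Seq(
  DomainMetadata("delta.clustering", """{"clusteringColumns":[["col1"],["col2"]]}""", removed = false),
  DomainMetadata("some.other.domain", "{}", removed = false))

// Only the clustering domain is carried over to the clone.
assert(handleDomainMetadataForCloneTable(source).map(_.domain) == Seq("delta.clustering"))
```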
CloneTableBase.scala
@@ -22,6 +22,7 @@ import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -226,6 +227,13 @@ abstract class CloneTableBase(
// CLONE does not preserve Row IDs and Commit Versions
copiedFile.copy(baseRowId = None, defaultRowCommitVersion = None)
}
sourceTable.snapshot.foreach { sourceSnapshot =>
// Copy the domain metadata that should be preserved when cloning a table.
if (deltaOperation.name == DeltaOperations.OP_CLONE) {
actions ++=
DomainMetadataUtils.handleDomainMetadataForCloneTable(sourceSnapshot.domainMetadata)
}
}
val sourceName = sourceTable.name
// Override source table metadata with user-defined table properties
val context = Map[String, String]()
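
To observe the effect once a clone commits, the destination snapshot's domain metadata can be inspected directly; a sketch using the same internal APIs the new test uses (the table name `dst` is hypothetical):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog

// After CREATE TABLE dst SHALLOW CLONE src, the clustering domain copied from
// the source snapshot should appear among the clone's domain metadata.
val (_, cloneSnapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier("dst"))
cloneSnapshot.domainMetadata.foreach(dm => println(s"${dm.domain} -> ${dm.configuration}"))
```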
ClusteredTableTestUtils.scala
@@ -141,6 +141,9 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
}
case "CLONE" =>
// CLUSTER BY is not recorded in the operation parameters for CLONE, similar to PARTITION BY.
doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
case o if clusterBySupportedOperations.contains(o) =>
if (expectClustering) {
assertClusterByExists()
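
The `CLONE` case above asserts on the last commit's operation parameters. A minimal sketch of pulling those from table history via the public API, where the `"clusterBy"` key is an assumption standing in for `CLUSTERING_PARAMETER_KEY`:

```scala
import io.delta.tables.DeltaTable

// Read the most recent commit of a hypothetical cloned table `dst` and check
// that no clustering parameter was recorded for the CLONE operation.
val lastCommit = DeltaTable.forName(spark, "dst").history(1).head()
val params = lastCommit.getAs[Map[String, String]]("operationParameters")
assert(!params.contains("clusterBy")) // assumed value of CLUSTERING_PARAMETER_KEY
```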
ClusteredTableDDLSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, Del

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}

@@ -666,6 +667,58 @@ trait ClusteredTableDDLSuiteBase
}
}
}

test("validate CLONE on clustered table") {
import testImplicits._
val srcTable = "SrcTbl"
val dstTable1 = "DestTbl1"
val dstTable2 = "DestTbl2"
val dstTable3 = "DestTbl3"

withTable(srcTable, dstTable1, dstTable2, dstTable3) {
// Create the source table.
sql(s"CREATE TABLE $srcTable (col1 INT, col2 INT, col3 INT) " +
s"USING delta CLUSTER BY (col1, col2)")
val tableIdent = new TableIdentifier(srcTable)
(1 to 100).map(i => (i, i + 1000, i + 100)).toDF("col1", "col2", "col3")
.repartitionByRange(100, col("col1"))
.write.format("delta").mode("append").saveAsTable(srcTable)

// Force clustering on the source table.
val (_, srcSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdent)
val ingestionSize = srcSnapshot.allFiles.collect().map(_.size).sum
withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> (ingestionSize / 4).toString) {
runOptimize(srcTable) { res =>
assert(res.numFilesAdded === 4)
assert(res.numFilesRemoved === 100)
}
}

// Create destination table as a clone of the source table.
sql(s"CREATE TABLE $dstTable1 SHALLOW CLONE $srcTable")

// Validate the clustering columns and that they are present in the stats schema.
val (_, dstSnapshot1) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable1))
verifyClusteringColumns(TableIdentifier(dstTable1), "col1,col2")
ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot1, Seq("col1", "col2"))

// Change to CLUSTER BY NONE, then CLONE from an earlier version to validate that the
// clustering column information is maintained.
sql(s"ALTER TABLE $srcTable CLUSTER BY NONE")
sql(s"CREATE TABLE $dstTable2 SHALLOW CLONE $srcTable VERSION AS OF 2")
val (_, dstSnapshot2) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable2))
verifyClusteringColumns(TableIdentifier(dstTable2), "col1,col2")
ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot2, Seq("col1", "col2"))

// Validate CLONE after CLUSTER BY NONE
sql(s"CREATE TABLE $dstTable3 SHALLOW CLONE $srcTable")
val (_, dstSnapshot3) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(dstTable3))
verifyClusteringColumns(TableIdentifier(dstTable3), "")
ClusteredTableUtils.validateClusteringColumnsInStatsSchema(dstSnapshot3, Seq.empty)

}
}
}

trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase
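
Beyond the snapshot-level assertions in the test, the clustering columns of the source and its clones can also be compared with `DESCRIBE DETAIL`, whose output includes a `clusteringColumns` field in Delta releases that support clustered tables (a sketch, not part of this PR):

```scala
// Compare the clustering columns of the source table and the three clones
// created in the test above.
Seq("SrcTbl", "DestTbl1", "DestTbl2", "DestTbl3").foreach { t =>
  spark.sql(s"DESCRIBE DETAIL $t").select("clusteringColumns").show(truncate = false)
}
```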
