[Spark] Extend Delta feature drop safety check to unbackfilled deltas (delta-io#3028)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

When dropping a feature, DeltaLog now checks both backfilled and
unbackfilled deltas for any traces of the feature before confirming it's
safe to drop.

However, feature dropping currently performs a checkpoint before scanning the
history for feature traces, so in practice no unbackfilled deltas remain at
that point.

## How was this patch tested?

Unit tests (a new test in `DeltaTableFeatureSuite`).

## Does this PR introduce _any_ user-facing changes?

No
sumeet-db authored May 2, 2024
1 parent 122288f commit e7fa94d
Showing 2 changed files with 50 additions and 2 deletions.
```diff
@@ -235,8 +235,9 @@ sealed trait RemovableFeature { self: TableFeature =>
 
     // Check if commits between 0 version and toVersion contain any traces of the feature.
     val allHistoricalDeltaFiles = deltaLog
-      .listFrom(0L)
-      .takeWhile(file => FileNames.getFileVersionOpt(file.getPath).forall(_ <= toVersion))
+      .getChangeLogFiles(0)
+      .takeWhile { case (version, _) => version <= toVersion }
+      .map { case (_, file) => file }
       .filter(FileNames.isDeltaFile)
       .toSeq
     DeltaLogFileIndex(DeltaLogFileIndex.COMMIT_FILE_FORMAT, allHistoricalDeltaFiles)
```
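
A note on the listing change: `listFrom` only surfaces files physically present in `_delta_log`, and the old `takeWhile` had to parse the version back out of each file path via `FileNames.getFileVersionOpt`. `getChangeLogFiles(0)` instead yields `(version, FileStatus)` pairs that also cover unbackfilled commits, so the scan can bound on the version directly and managed-commit deltas no longer escape the check. The accompanying changes in `DeltaTableFeatureSuite` follow: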
```diff
@@ -22,6 +22,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils._
+import org.apache.spark.sql.delta.managedcommit.{CommitOwnerProvider, InMemoryCommitOwnerBuilder}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -490,6 +491,52 @@ class DeltaTableFeatureSuite
     }
   }
 
+  test("drop table feature works with managed commits") {
+    val table = "tbl"
+    withTable(table) {
+      spark.range(0).write.format("delta").saveAsTable(table)
+      val log = DeltaLog.forTable(spark, TableIdentifier(table))
+      val featureName = TestRemovableReaderWriterFeature.name
+      assert(!log.update().protocol.readerAndWriterFeatureNames.contains(featureName))
+
+      // Add managed commit table feature to the table
+      CommitOwnerProvider.registerBuilder(InMemoryCommitOwnerBuilder(batchSize = 100))
+      val tblProperties1 = Seq(s"'${DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.key}' = 'in-memory'")
+      sql(buildTablePropertyModifyingCommand(
+        "ALTER", targetTableName = table, sourceTableName = table, tblProperties1))
+
+      // Add TestRemovableReaderWriterFeature to the table in unbackfilled delta files
+      val tblProperties2 = Seq(s"'$FEATURE_PROP_PREFIX$featureName' = 'supported', " +
+        s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION, " +
+        s"'${TestRemovableReaderWriterFeature.TABLE_PROP_KEY}' = 'true'")
+      sql(buildTablePropertyModifyingCommand(
+        "ALTER", targetTableName = table, sourceTableName = table, tblProperties2))
+      assert(log.update().protocol.readerAndWriterFeatureNames.contains(featureName))
+
+      // Disable feature on the latest snapshot
+      val tblProperties3 = Seq(s"'${TestRemovableReaderWriterFeature.TABLE_PROP_KEY}' = 'false'")
+      sql(buildTablePropertyModifyingCommand(
+        "ALTER", targetTableName = table, sourceTableName = table, tblProperties3))
+
+      val tableFeature =
+        TableFeature.featureNameToFeature(featureName).get.asInstanceOf[RemovableFeature]
+      assert(tableFeature.historyContainsFeature(spark, log.update()))
+
+      // Dropping feature should fail because the feature still has traces in deltas.
+      val e = intercept[DeltaTableFeatureException] {
+        sql(s"ALTER TABLE $table DROP FEATURE $featureName")
+      }
+      assert(e.getMessage.contains("DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST"), e)
+
+      // Add in a checkpoint and clean up older logs containing feature traces
+      log.startTransaction().commitManually()
+      log.checkpoint()
+      log.cleanUpExpiredLogs(log.update(), deltaRetentionMillisOpt = Some(-1000000000000L))
+      sql(s"ALTER TABLE $table DROP FEATURE $featureName")
+      assert(!log.update().protocol.readerAndWriterFeatureNames.contains(featureName))
+    }
+  }
+
   private def buildTablePropertyModifyingCommand(
       commandName: String,
       targetTableName: String,
```
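
One detail worth calling out in the test's final block: the extra `commitManually()` plus `checkpoint()` produces a checkpoint at the latest version, and the large negative `deltaRetentionMillisOpt` makes every pre-checkpoint delta count as expired during `cleanUpExpiredLogs`. That removes the feature's historical traces, so the second `DROP FEATURE` succeeds.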
