Skip to content

Commit

Permalink
Fixed the behaviour of Alter Table set protocol versions
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Jun 27, 2024
1 parent 6903541 commit b50aa8f
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// 2 or 3 will add "invariants" and "appendOnly", filling in the gaps for writer
// protocol version 3, and then we can downgrade to version 3.
val proposedNewProtocol = protocolBeforeUpdate
.normalizeLegacyFeatures
.merge(newProtocolForLatestMetadata)

if (proposedNewProtocol != protocolBeforeUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ object TableFeature {
RowTrackingFeature,
InCommitTimestampTableFeature,
VariantTypeTableFeature,
TestRemovableWriterFeature,
CoordinatedCommitsTableFeature)
if (DeltaUtils.isTesting) {
features ++= Set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,19 +359,6 @@ trait TableFeatureSupport { this: Protocol =>
}
}

/**
* For table feature protocols, add the missing legacy features to match exactly the
* implicit features of the minimum required legacy protocol versions.
*/
def normalizeLegacyFeatures: Protocol = {
if (!supportsWriterFeatures) return this

// Prepare the protocol but filling the "gaps" with all missing legacy features.
val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
merge(Protocol(minReaderVersion, minWriterVersion))
}

/**
* Check if a `feature` is supported by this protocol. This means either (a) the protocol does
* not support table features and implicitly supports the feature, or (b) the protocol supports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3464,10 +3464,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
truncateHistory = truncateHistory)
}

def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (2, 6), (1, 7), (3, 7))
def invalidVersions = Seq((2, 2), (2, 3))
private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (1, 7), (3, 7))
private def invalidVersions = Seq((2, 2), (2, 3))
for ((readerVersion, writerVersion) <- validVersions ++ invalidVersions)
test(s"Legacy features are added when setting legacy versions: " +
test("Legacy features are added when setting legacy versions: " +
s"readerVersionToSet = $readerVersion, writerVersionToSet = $writerVersion") {
withTempDir { dir =>
val deltaLog = DeltaLog.forTable(spark, dir)
Expand All @@ -3491,7 +3491,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
}
}

test(s"Setting protocol versions in a legacy table should not add any features") {
test("Setting protocol versions in a legacy table should not add any features") {
withTempDir { dir =>
val deltaLog = DeltaLog.forTable(spark, dir)

Expand Down Expand Up @@ -3528,7 +3528,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
sql(
s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta
|TBLPROPERTIES (
|delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION,
|delta.minReaderVersion = ${Math.max(tableFeatureToAdd.minReaderVersion, 1)},
|delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION,
|delta.feature.${tableFeatureToAdd.name} = 'supported',
|delta.feature.${ChangeDataFeedTableFeature.name} = 'supported'
Expand All @@ -3550,23 +3550,13 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

if (!preemptiveVersionDowngrade) sql(downgradeProtocolVersionsSQL)

// - When setting the protocol versions on a protocol with legacy features only, we ignore
// the actual version set when it is lower than required from the already enabled legacy
// features. Instead, we always attempt to set the minimum required versions.
// - When setting the protocol versions on a protocol with table features we add the legacy
// features of the requested versions.
if (preemptiveVersionDowngrade) {
val expectedProtocol = if (downgradeVersionToSet < 4) {
Protocol(tableFeatureToAdd.minReaderVersion, 7).withFeature(ChangeDataFeedTableFeature)
.merge(Protocol(1, downgradeVersionToSet))
} else {
Protocol(1, downgradeVersionToSet)
}
assert(deltaLog.update().protocol === expectedProtocol)
val expectedProtocol = if (downgradeVersionToSet < 4) {
Protocol(tableFeatureToAdd.minReaderVersion, 7).withFeature(ChangeDataFeedTableFeature)
.merge(Protocol(1, downgradeVersionToSet))
} else {
val expectedWriterVersion = if (downgradeVersionToSet > 4) downgradeVersionToSet else 4
assert(deltaLog.update().protocol === Protocol(1, expectedWriterVersion))
Protocol(1, downgradeVersionToSet)
}
assert(deltaLog.update().protocol === expectedProtocol)
}
}

Expand Down Expand Up @@ -3604,6 +3594,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
tableFeatureToAdd.name,
truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark)

if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL)

val expectedProtocol = if (setLegacyVersions) {
Protocol(1, 4)
} else {
Expand Down Expand Up @@ -3636,7 +3628,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
assert(deltaLog.update().protocol === Protocol(1, 2))
}
}

private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
s"`${V2CheckpointTableFeature.name}`")
Expand Down

0 comments on commit b50aa8f

Please sign in to comment.