Skip to content

Commit

Permalink
[Spark] Revert column mapping protocol fix (delta-io#3748)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

We revert of
delta-io@920f185
due to creating a backward/forward compatibility issue.

In the original PR we fixed an issue where when column mapping was the
only reader feature, it would not appear in the reader features set.
This is primarily a memory representation issue, but it turns out the
invalid protocol could also be serialized with a specific sequence of
events.

The protocol action, has a requirement at initialization time to ensure
that only protocols with version 3 have the reader features set. When we
fixed the column mapping bug, the requirement was expanded to also
include reader features with version 2. This can be problematic if a
table was created with an old Delta version which allowed to serialize
the invalid protocol, and then try to read the table with the latest
Delta version. The reverse is also problematic.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

Clean revert. Existing tests.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No.
  • Loading branch information
andreaschat-db authored Oct 2, 2024
1 parent 19c054b commit 3b15f0e
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ class DeltaLog private(
def assertTableFeaturesMatchMetadata(
targetProtocol: Protocol,
targetMetadata: Metadata): Unit = {
if (!targetProtocol.supportsTableFeatures) return
if (!targetProtocol.supportsReaderFeatures && !targetProtocol.supportsWriterFeatures) return

val protocolEnabledFeatures = targetProtocol.writerFeatureNames
.flatMap(TableFeature.featureNameToFeature)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ class Snapshot(
}
base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
if (protocol.supportsTableFeatures) {
if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
val features = protocol.readerAndWriterFeatureNames.map(name =>
s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore
*/
trait TableFeatureSupport { this: Protocol =>

/**
* Check if this protocol can support arbitrary reader features. If this returns false,
* then the table may still be able to support the "columnMapping" feature.
* See [[canSupportColumnMappingFeature]] below.
*/
/** Check if this protocol is capable of adding features into its `readerFeatures` field. */
def supportsReaderFeatures: Boolean =
TableFeatureProtocolUtils.supportsReaderFeatures(minReaderVersion)

/**
* Check if this protocol is in table feature representation and can support column mapping.
* Column mapping is the only legacy reader feature and requires special handling in some
* cases.
*/
def canSupportColumnMappingFeature: Boolean =
TableFeatureProtocolUtils.canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)

/** Check if this protocol is capable of adding features into its `writerFeatures` field. */
def supportsWriterFeatures: Boolean =
TableFeatureProtocolUtils.supportsWriterFeatures(minWriterVersion)

/**
* As soon as a protocol supports writer features it is considered a table features protocol.
* It is not possible to support reader features without supporting writer features.
*/
def supportsTableFeatures: Boolean = supportsWriterFeatures

/**
* Get a new Protocol object that has `feature` supported. Writer-only features will be added to
* `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and
Expand All @@ -78,7 +60,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def withFeature(feature: TableFeature): Protocol = {
def shouldAddRead: Boolean = {
if (feature == ColumnMappingTableFeature && canSupportColumnMappingFeature) return true
if (supportsReaderFeatures) return true
if (feature.minReaderVersion <= minReaderVersion) return false

throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
Expand Down Expand Up @@ -129,13 +111,25 @@ trait TableFeatureSupport { this: Protocol =>
* `writerFeatures` field.
*
* The method does not require the feature to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Should never be used directly. Always use withFeature(feature: TableFeature): Protocol.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[actions] def withFeature(
name: String,
addToReaderFeatures: Boolean,
addToWriterFeatures: Boolean): Protocol = {
if (addToReaderFeatures && !supportsReaderFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
name,
currentVersion = minReaderVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION)
}
if (addToWriterFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherWriterProtocolVersion(
name,
currentVersion = minWriterVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
}

val addedReaderFeatureOpt = if (addToReaderFeatures) Some(name) else None
val addedWriterFeatureOpt = if (addToWriterFeatures) Some(name) else None

Expand All @@ -149,23 +143,23 @@ trait TableFeatureSupport { this: Protocol =>
* `readerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[delta] def withReaderFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
}

/**
* Get a new Protocol object with additional feature descriptors added to the protocol's
* `writerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[delta] def withWriterFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
}

/**
Expand Down Expand Up @@ -209,16 +203,14 @@ trait TableFeatureSupport { this: Protocol =>
*/
@JsonIgnore
lazy val implicitlySupportedFeatures: Set[TableFeature] = {
if (supportsTableFeatures) {
// As soon as a protocol supports writer features, all features need to be explicitly defined.
// This includes legacy reader features (the only one is Column Mapping), even if the
// reader protocol is legacy and explicitly supports Column Mapping.
if (supportsReaderFeatures && supportsWriterFeatures) {
// this protocol uses both reader and writer features, no feature can be implicitly supported
Set()
} else {
TableFeature.allSupportedFeaturesMap.values
.filter(_.isLegacyFeature)
.filter(_.minReaderVersion <= this.minReaderVersion)
.filter(_.minWriterVersion <= this.minWriterVersion)
.filterNot(supportsReaderFeatures || this.minReaderVersion < _.minReaderVersion)
.filterNot(supportsWriterFeatures || this.minWriterVersion < _.minWriterVersion)
.toSet
}
}
Expand Down Expand Up @@ -279,11 +271,14 @@ trait TableFeatureSupport { this: Protocol =>
val protocols = this +: others
val mergedReaderVersion = protocols.map(_.minReaderVersion).max
val mergedWriterVersion = protocols.map(_.minWriterVersion).max
val mergedFeatures = protocols.flatMap(_.readerAndWriterFeatures)
val mergedReaderFeatures = protocols.flatMap(_.readerFeatureNames)
val mergedWriterFeatures = protocols.flatMap(_.writerFeatureNames)
val mergedImplicitFeatures = protocols.flatMap(_.implicitlySupportedFeatures)

val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion)
.withFeatures(mergedFeatures ++ mergedImplicitFeatures)
.withReaderFeatures(mergedReaderFeatures)
.withWriterFeatures(mergedWriterFeatures)
.withFeatures(mergedImplicitFeatures)

// The merged protocol is always normalized in order to represent the protocol
// with the weakest possible form. This enables backward compatibility.
Expand Down Expand Up @@ -353,7 +348,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def normalized: Protocol = {
// Normalization can only be applied to table feature protocols.
if (!supportsTableFeatures) return this
if (!supportsWriterFeatures) return this

val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
Expand All @@ -376,7 +371,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def denormalized: Protocol = {
// Denormalization can only be applied to legacy protocols.
if (supportsTableFeatures) return this
if (supportsWriterFeatures) return this

val (minReaderVersion, _) =
TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)
Expand Down Expand Up @@ -424,7 +419,7 @@ object TableFeatureProtocolUtils {
/** The string constant "supported" for uses in table properties. */
val FEATURE_PROP_SUPPORTED = "supported"

/** Min reader version that supports native reader features. */
/** Min reader version that supports reader features. */
val TABLE_FEATURES_MIN_READER_VERSION = 3

/** Min reader version that supports writer features. */
Expand All @@ -445,20 +440,8 @@ object TableFeatureProtocolUtils {
s"$DEFAULT_FEATURE_PROP_PREFIX$featureName"

/**
* Determine whether a [[Protocol]] with the given reader protocol version can support column
* mapping. All table feature protocols that can support column mapping are capable of adding
* the feature to the `readerFeatures` field. This includes legacy reader protocol version
* (2, 7).
*/
def canSupportColumnMappingFeature(readerVersion: Int, writerVersion: Int): Boolean = {
readerVersion >= ColumnMappingTableFeature.minReaderVersion &&
supportsWriterFeatures(writerVersion)
}

/**
* Determine whether a [[Protocol]] with the given reader protocol version supports
* native features. All protocols that can support native reader features are capable
* of adding the feature to the `readerFeatures` field.
* Determine whether a [[Protocol]] with the given reader protocol version is capable of adding
* features into its `readerFeatures` field.
*/
def supportsReaderFeatures(readerVersion: Int): Boolean = {
readerVersion >= TABLE_FEATURES_MIN_READER_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ case class Protocol private (
// Correctness check
// Reader and writer versions must match the status of reader and writer features
require(
(supportsReaderFeatures || canSupportColumnMappingFeature) == readerFeatures.isDefined,
supportsReaderFeatures == readerFeatures.isDefined,
"Mismatched minReaderVersion and readerFeatures.")
require(
supportsWriterFeatures == writerFeatures.isDefined,
"Mismatched minWriterVersion and writerFeatures.")

// When reader is on table features, writer must be on table features too.
// When reader is on table features, writer must be on table features too
if (supportsReaderFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureReadRequiresWriteException(
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
Expand All @@ -165,7 +165,7 @@ case class Protocol private (
*/
@JsonIgnore
lazy val simpleString: String = {
if (!supportsTableFeatures) {
if (!supportsReaderFeatures && !supportsWriterFeatures) {
s"$minReaderVersion,$minWriterVersion"
} else {
val readerFeaturesStr = readerFeatures
Expand Down Expand Up @@ -202,20 +202,18 @@ object Protocol {
def apply(
minReaderVersion: Int = Action.readerVersion,
minWriterVersion: Int = Action.writerVersion): Protocol = {
val shouldAddReaderFeatures = supportsReaderFeatures(minReaderVersion) ||
canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)
new Protocol(
minReaderVersion = minReaderVersion,
minWriterVersion = minWriterVersion,
readerFeatures = if (shouldAddReaderFeatures) Some(Set()) else None,
readerFeatures = if (supportsReaderFeatures(minReaderVersion)) Some(Set()) else None,
writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None)
}

/** Returns the required protocol for a given feature. Takes into account dependent features. */
def forTableFeature(tf: TableFeature): Protocol = {
// Every table feature is a writer feature.
val writerFeatures = tf.requiredFeatures + tf
val readerFeatures = writerFeatures.filter(_.isReaderWriterFeature)
val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature)
val writerFeaturesNames = writerFeatures.map(_.name)
val readerFeaturesNames = readerFeatures.map(_.name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":[],""" +
""""readerFeatures":["testLegacyReaderWriter"],""" +
""""writerFeatures":["testLegacyReaderWriter"]}}""")

testActionSerDe(
Expand All @@ -248,7 +248,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":["testReaderWriter"],""" +
""""readerFeatures":["testLegacyReaderWriter","testReaderWriter"],""" +
""""writerFeatures":["testLegacyReaderWriter","testReaderWriter"]}}""")

testActionSerDe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase
}
// upgrade to name mode
val protocol = deltaLog.snapshot.protocol
val (r, w) = if (protocol.supportsTableFeatures) {
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
(Protocol.MIN_WRITER_VERSION_PROP,
Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
if (snapshot.protocol.supportsTableFeatures) {
if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) {
props ++=
Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
spark, metadata, snapshot.protocol)
Expand Down
Loading

0 comments on commit 3b15f0e

Please sign in to comment.