Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Oct 2, 2024
1 parent 19c054b commit b7a9493
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ class DeltaLog private(
def assertTableFeaturesMatchMetadata(
targetProtocol: Protocol,
targetMetadata: Metadata): Unit = {
if (!targetProtocol.supportsTableFeatures) return
if (!targetProtocol.supportsReaderFeatures && !targetProtocol.supportsWriterFeatures) return

val protocolEnabledFeatures = targetProtocol.writerFeatureNames
.flatMap(TableFeature.featureNameToFeature)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ class Snapshot(
}
base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
if (protocol.supportsTableFeatures) {
if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
val features = protocol.readerAndWriterFeatureNames.map(name =>
s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore
*/
trait TableFeatureSupport { this: Protocol =>

/**
* Check if this protocol can support arbitrary reader features. If this returns false,
* then the table may still be able to support the "columnMapping" feature.
* See [[canSupportColumnMappingFeature]] below.
*/
/** Check if this protocol is capable of adding features into its `readerFeatures` field. */
def supportsReaderFeatures: Boolean =
TableFeatureProtocolUtils.supportsReaderFeatures(minReaderVersion)

/**
* Check if this protocol is in table feature representation and can support column mapping.
* Column mapping is the only legacy reader feature and requires special handling in some
* cases.
*/
def canSupportColumnMappingFeature: Boolean =
TableFeatureProtocolUtils.canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)

/** Check if this protocol is capable of adding features into its `writerFeatures` field. */
def supportsWriterFeatures: Boolean =
TableFeatureProtocolUtils.supportsWriterFeatures(minWriterVersion)

/**
* As soon as a protocol supports writer features it is considered a table features protocol.
* It is not possible to support reader features without supporting writer features.
*/
def supportsTableFeatures: Boolean = supportsWriterFeatures

/**
* Get a new Protocol object that has `feature` supported. Writer-only features will be added to
* `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and
Expand All @@ -78,7 +60,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def withFeature(feature: TableFeature): Protocol = {
def shouldAddRead: Boolean = {
if (feature == ColumnMappingTableFeature && canSupportColumnMappingFeature) return true
if (supportsReaderFeatures) return true
if (feature.minReaderVersion <= minReaderVersion) return false

throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
Expand Down Expand Up @@ -129,13 +111,25 @@ trait TableFeatureSupport { this: Protocol =>
* `writerFeatures` field.
*
* The method does not require the feature to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Should never be used directly. Always use withFeature(feature: TableFeature): Protocol.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[actions] def withFeature(
name: String,
addToReaderFeatures: Boolean,
addToWriterFeatures: Boolean): Protocol = {
if (addToReaderFeatures && !supportsReaderFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
name,
currentVersion = minReaderVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION)
}
if (addToWriterFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureRequiresHigherWriterProtocolVersion(
name,
currentVersion = minWriterVersion,
requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
}

val addedReaderFeatureOpt = if (addToReaderFeatures) Some(name) else None
val addedWriterFeatureOpt = if (addToWriterFeatures) Some(name) else None

Expand All @@ -149,23 +143,23 @@ trait TableFeatureSupport { this: Protocol =>
* `readerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[delta] def withReaderFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
}

/**
* Get a new Protocol object with additional feature descriptors added to the protocol's
* `writerFeatures` field.
*
* The method does not require the features to be recognized by the client, therefore will not
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
* Intended only for testing. Use with caution.
* try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
*/
private[delta] def withWriterFeatures(names: Iterable[String]): Protocol = {
names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
names.foldLeft(this)(
_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
}

/**
Expand Down Expand Up @@ -209,16 +203,14 @@ trait TableFeatureSupport { this: Protocol =>
*/
@JsonIgnore
lazy val implicitlySupportedFeatures: Set[TableFeature] = {
if (supportsTableFeatures) {
// As soon as a protocol supports writer features, all features need to be explicitly defined.
// This includes legacy reader features (the only one is Column Mapping), even if the
// reader protocol is legacy and explicitly supports Column Mapping.
if (supportsReaderFeatures && supportsWriterFeatures) {
// this protocol uses both reader and writer features, no feature can be implicitly supported
Set()
} else {
TableFeature.allSupportedFeaturesMap.values
.filter(_.isLegacyFeature)
.filter(_.minReaderVersion <= this.minReaderVersion)
.filter(_.minWriterVersion <= this.minWriterVersion)
.filterNot(supportsReaderFeatures || this.minReaderVersion < _.minReaderVersion)
.filterNot(supportsWriterFeatures || this.minWriterVersion < _.minWriterVersion)
.toSet
}
}
Expand Down Expand Up @@ -279,11 +271,14 @@ trait TableFeatureSupport { this: Protocol =>
val protocols = this +: others
val mergedReaderVersion = protocols.map(_.minReaderVersion).max
val mergedWriterVersion = protocols.map(_.minWriterVersion).max
val mergedFeatures = protocols.flatMap(_.readerAndWriterFeatures)
val mergedReaderFeatures = protocols.flatMap(_.readerFeatureNames)
val mergedWriterFeatures = protocols.flatMap(_.writerFeatureNames)
val mergedImplicitFeatures = protocols.flatMap(_.implicitlySupportedFeatures)

val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion)
.withFeatures(mergedFeatures ++ mergedImplicitFeatures)
.withReaderFeatures(mergedReaderFeatures)
.withWriterFeatures(mergedWriterFeatures)
.withFeatures(mergedImplicitFeatures)

// The merged protocol is always normalized in order to represent the protocol
// with the weakest possible form. This enables backward compatibility.
Expand Down Expand Up @@ -353,7 +348,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def normalized: Protocol = {
// Normalization can only be applied to table feature protocols.
if (!supportsTableFeatures) return this
if (!supportsWriterFeatures) return this

val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
Expand All @@ -376,7 +371,7 @@ trait TableFeatureSupport { this: Protocol =>
*/
def denormalized: Protocol = {
// Denormalization can only be applied to legacy protocols.
if (supportsTableFeatures) return this
if (supportsWriterFeatures) return this

val (minReaderVersion, _) =
TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)
Expand Down Expand Up @@ -424,7 +419,7 @@ object TableFeatureProtocolUtils {
/** The string constant "supported" for uses in table properties. */
val FEATURE_PROP_SUPPORTED = "supported"

/** Min reader version that supports native reader features. */
/** Min reader version that supports reader features. */
val TABLE_FEATURES_MIN_READER_VERSION = 3

/** Min reader version that supports writer features. */
Expand All @@ -445,20 +440,8 @@ object TableFeatureProtocolUtils {
s"$DEFAULT_FEATURE_PROP_PREFIX$featureName"

/**
* Determine whether a [[Protocol]] with the given reader protocol version can support column
* mapping. All table feature protocols that can support column mapping are capable of adding
* the feature to the `readerFeatures` field. This includes legacy reader protocol version
* (2, 7).
*/
def canSupportColumnMappingFeature(readerVersion: Int, writerVersion: Int): Boolean = {
readerVersion >= ColumnMappingTableFeature.minReaderVersion &&
supportsWriterFeatures(writerVersion)
}

/**
* Determine whether a [[Protocol]] with the given reader protocol version supports
* native features. All protocols that can support native reader features are capable
* of adding the feature to the `readerFeatures` field.
* Determine whether a [[Protocol]] with the given reader protocol version is capable of adding
* features into its `readerFeatures` field.
*/
def supportsReaderFeatures(readerVersion: Int): Boolean = {
readerVersion >= TABLE_FEATURES_MIN_READER_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ case class Protocol private (
// Correctness check
// Reader and writer versions must match the status of reader and writer features
require(
(supportsReaderFeatures || canSupportColumnMappingFeature) == readerFeatures.isDefined,
supportsReaderFeatures == readerFeatures.isDefined,
"Mismatched minReaderVersion and readerFeatures.")
require(
supportsWriterFeatures == writerFeatures.isDefined,
"Mismatched minWriterVersion and writerFeatures.")

// When reader is on table features, writer must be on table features too.
// When reader is on table features, writer must be on table features too
if (supportsReaderFeatures && !supportsWriterFeatures) {
throw DeltaErrors.tableFeatureReadRequiresWriteException(
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
Expand All @@ -165,7 +165,7 @@ case class Protocol private (
*/
@JsonIgnore
lazy val simpleString: String = {
if (!supportsTableFeatures) {
if (!supportsReaderFeatures && !supportsWriterFeatures) {
s"$minReaderVersion,$minWriterVersion"
} else {
val readerFeaturesStr = readerFeatures
Expand Down Expand Up @@ -202,20 +202,18 @@ object Protocol {
def apply(
minReaderVersion: Int = Action.readerVersion,
minWriterVersion: Int = Action.writerVersion): Protocol = {
val shouldAddReaderFeatures = supportsReaderFeatures(minReaderVersion) ||
canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)
new Protocol(
minReaderVersion = minReaderVersion,
minWriterVersion = minWriterVersion,
readerFeatures = if (shouldAddReaderFeatures) Some(Set()) else None,
readerFeatures = if (supportsReaderFeatures(minReaderVersion)) Some(Set()) else None,
writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None)
}

/** Returns the required protocol for a given feature. Takes into account dependent features. */
def forTableFeature(tf: TableFeature): Protocol = {
// Every table feature is a writer feature.
val writerFeatures = tf.requiredFeatures + tf
val readerFeatures = writerFeatures.filter(_.isReaderWriterFeature)
val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature)
val writerFeaturesNames = writerFeatures.map(_.name)
val readerFeaturesNames = readerFeatures.map(_.name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":[],""" +
""""readerFeatures":["testLegacyReaderWriter"],""" +
""""writerFeatures":["testLegacyReaderWriter"]}}""")

testActionSerDe(
Expand All @@ -248,7 +248,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson =
s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
""""readerFeatures":["testReaderWriter"],""" +
""""readerFeatures":["testLegacyReaderWriter","testReaderWriter"],""" +
""""writerFeatures":["testLegacyReaderWriter","testReaderWriter"]}}""")

testActionSerDe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase
}
// upgrade to name mode
val protocol = deltaLog.snapshot.protocol
val (r, w) = if (protocol.supportsTableFeatures) {
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
(Protocol.MIN_WRITER_VERSION_PROP,
Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
if (snapshot.protocol.supportsTableFeatures) {
if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) {
props ++=
Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
spark, metadata, snapshot.protocol)
Expand Down
Loading

0 comments on commit b7a9493

Please sign in to comment.