Skip to content

Commit

Permalink
[Spark] Nicer error when failing to read table due to type change not…
Browse files Browse the repository at this point in the history
… support… (delta-io#3728)

## Description
Delta 3.2/3.3 only supports a limited subset of type changes that will
become available with Delta 4.0 / Spark 4.0.
This change improves the error returned when reading a table with an
unsupported type change, telling users to upgrade to Delta 4.0 in case the
type change will be supported in that version.

## How was this patch tested?
Added tests to cover the error path.

## Does this PR introduce _any_ user-facing changes?
Updates error thrown on unsupported type change when reading a table.
  • Loading branch information
johanl-db authored Oct 1, 2024
1 parent be191c5 commit 440cc3e
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 1 deletion.
8 changes: 8 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,14 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW" : {
"message" : [
"This table can't be read by this version of Delta because an unsupported type change was applied. Field <fieldPath> was changed from <fromType> to <toType>.",
"Please upgrade to Delta 4.0 or higher to read this table, or drop the Type Widening table feature using a client that supports reading this table:",
" ALTER TABLE tableName DROP FEATURE <typeWideningFeatureName>"
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA" : {
"message" : [
"Unable to operate on this table because an unsupported type change was applied. Field <fieldName> was changed from <fromType> to <toType>."
Expand Down
14 changes: 14 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,20 @@ trait DeltaErrorsBase
)
}

/**
 * Builds the error thrown when a table contains a type change that this (preview) version of
 * Delta cannot read but that a newer release could. The message tells the user to either
 * upgrade or drop the given type widening table feature.
 *
 * @param fieldPath path to the field whose type was changed, e.g. Seq("origin", "country").
 * @param fromType  original type of the field.
 * @param toType    type the field was changed to.
 * @param feature   the type widening table feature (preview or stable) present on the table;
 *                  its name is surfaced in the DROP FEATURE suggestion.
 */
def unsupportedTypeChangeInPreview(
    fieldPath: Seq[String],
    fromType: DataType,
    toType: DataType,
    feature: TypeWideningTableFeatureBase): Throwable = {
  // Parameters must line up with the placeholders declared in delta-error-classes.json:
  // <fieldPath>, <fromType>, <toType>, <typeWideningFeatureName>.
  val params = Array(
    SchemaUtils.prettyFieldName(fieldPath),
    fromType.sql,
    toType.sql,
    feature.name)
  new DeltaUnsupportedOperationException(
    errorClass = "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
    messageParameters = params)
}

def unsupportedTypeChangeInSchema(
fieldPath: Seq[String],
fromType: DataType,
Expand Down
29 changes: 29 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ object TypeWidening {
TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
if isTypeChangeSupported(from, to) =>
case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
if stableFeatureCanReadTypeChange(from, to) =>
val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
TypeWideningPreviewTableFeature
} else {
TypeWideningTableFeature
}
throw DeltaErrors.unsupportedTypeChangeInPreview(fieldPath, from, to, featureName)
case (fieldPath, invalidChange) =>
throw DeltaErrors.unsupportedTypeChangeInSchema(
fieldPath ++ invalidChange.fieldPath,
Expand All @@ -97,4 +105,25 @@ object TypeWidening {
)
}
}

/**
 * Returns whether the given type change can be read using the stable version of the type
 * widening feature. Used to produce a more helpful error during the preview phase when
 * upgrading to Delta 4.0 would make the table readable.
 */
private def stableFeatureCanReadTypeChange(fromType: AtomicType, toType: AtomicType): Boolean =
  (fromType, toType) match {
    // Identity "change": trivially readable.
    case (f, t) if f == t => true
    // Widening between integral types (byte/short/int/long), ordered by their storage size.
    case (f: IntegralType, t: IntegralType) => f.defaultSize <= t.defaultSize
    case (FloatType, DoubleType) => true
    case (DateType, TimestampNTZType) => true
    case (ByteType | ShortType | IntegerType, DoubleType) => true
    case (f: DecimalType, t: DecimalType) => t.isWiderThan(f)
    // Byte, Short, Integer are all stored as INT32 in parquet. The parquet readers support
    // converting INT32 to Decimal(10, 0) and wider.
    case (ByteType | ShortType | IntegerType, t: DecimalType) => t.isWiderThan(IntegerType)
    // The parquet readers support converting INT64 to Decimal(20, 0) and wider.
    case (LongType, t: DecimalType) => t.isWiderThan(LongType)
    case _ => false
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,24 @@ trait DeltaErrorsSuiteBase
|""".stripMargin
))
}
{
  // Verifies that unsupportedTypeChangeInPreview fills every placeholder of the
  // DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW error class: the nested field path is
  // pretty-printed with dots, types are rendered as SQL names, and the feature name
  // matches the preview table feature ("typeWidening-preview").
  checkError(
    exception = intercept[DeltaUnsupportedOperationException] {
      throw DeltaErrors.unsupportedTypeChangeInPreview(
        fieldPath = Seq("origin", "country"),
        fromType = IntegerType,
        toType = LongType,
        feature = TypeWideningPreviewTableFeature
      )
    },
    "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
    parameters = Map(
      "fieldPath" -> "origin.country",
      "fromType" -> "INT",
      "toType" -> "BIGINT",
      "typeWideningFeatureName" -> "typeWidening-preview"
    ))
}
{
val e = intercept[DeltaIllegalStateException] {
throw DeltaErrors.unsupportedTypeChangeInSchema(Seq("s", "a"), IntegerType, StringType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class TypeWideningTableFeatureSuite
with TypeWideningDropFeatureTestMixin
with TypeWideningTableFeatureTests

trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideningTestCases {
trait TypeWideningTableFeatureTests
extends RowTrackingTestUtils
with DeltaExcludedBySparkVersionTestMixinShims
with TypeWideningTestCases {
self: QueryTest
with TypeWideningTestMixin
with TypeWideningDropFeatureTestMixin =>
Expand Down Expand Up @@ -454,6 +457,43 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni
)
}

// End-to-end check of the error path: reading a table that records an int -> long type
// change (unsupported in Delta 3.2/3.3) must fail with the dedicated
// DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW error instead of a generic one.
testSparkLatestOnly(
  "helpful error when reading type changes not supported yet during preview") {
  sql(s"CREATE TABLE delta.`$tempDir` (a int) USING DELTA")
  // Field metadata in the shape TypeWideningMetadata uses to record a type change
  // (toType/fromType plus the table version at which it happened).
  val metadata = new MetadataBuilder()
    .putMetadataArray("delta.typeChanges", Array(
      new MetadataBuilder()
        .putString("toType", "long")
        .putString("fromType", "int")
        .putLong("tableVersion", 1)
        .build()
    )).build()

  // Delta 3.2/3.3 doesn't support changing type from int->long, we manually commit that type
  // change to simulate what Delta 4.0 could do.
  deltaLog.withNewTransaction { txn =>
    txn.commit(
      Seq(txn.snapshot.metadata.copy(
        schemaString = new StructType()
          .add("a", LongType, nullable = true, metadata).json
      )),
      ManualUpdate)
  }

  // Reading the table should now surface the preview-specific error with the
  // field path, SQL type names, and the preview feature name to drop.
  checkError(
    exception = intercept[DeltaUnsupportedOperationException] {
      readDeltaTable(tempPath).collect()
    },
    "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
    parameters = Map(
      "fieldPath" -> "a",
      "fromType" -> "INT",
      "toType" -> "BIGINT",
      "typeWideningFeatureName" -> "typeWidening-preview"
    )
  )
}

test("type widening rewrite metrics") {
sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA")
addSingleFile(Seq(1, 2, 3), ByteType)
Expand Down

0 comments on commit 440cc3e

Please sign in to comment.