Skip to content

Commit

Permalink
[Spark] Fix type widening with char/varchar columns (delta-io#3744)
Browse files Browse the repository at this point in the history
## Description
Using type widening on a table that contains a char/varchar column
causes the following reads to fail with
`DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA`:
```
CREATE TABLE t (a VARCHAR(10), b INT);
ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
ALTER TABLE t ALTER COLUMN b TYPE LONG;

SELECT * FROM t;
[DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA] Unable to operate on this table because an unsupported type change was applied. Field cut was changed from VARCHAR(10) to STRING`
```

Type changes are recorded in the table metadata and a check on read
ensures that all type changes are supported by the current
implementation as attempting to read data after an unsupported type
change could lead to incorrect results.
CHAR/VARCHAR columns are sometimes stripped down to STRING internally,
for that reason, ALTER TABLE incorrectly identify that column `a` type
changed to STRING and records it in the type widening metadata.

The read check in turn doesn't recognize that type change as one of the
supported widening type changes (which doesn't include changes to string
columns).

Fix:
1. Never record char/varchar/string type changes in the type widening
metadata
2. Never record unsupported type changes in the type widening metadata
and log an assertion instead.
3. Don't fail on char/varchar/string type changes in the type widening
metadata if such type change slips through 1. This will prevent failing
in case a non-compliant implementation still record a
char/varchar/string type change.
4. Provide a table property to bypass the check if a similar issue
happens again in the future.
  • Loading branch information
johanl-db authored Oct 2, 2024
1 parent 3b15f0e commit e70ca04
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAll
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
Expand Down Expand Up @@ -73,7 +74,10 @@ case class DeltaParquetFileFormat(
}
}

TypeWidening.assertTableReadable(protocol, metadata)
SparkSession.getActiveSession.ifDefined { session =>
TypeWidening.assertTableReadable(session.sessionState.conf, protocol, metadata)
}


val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
val referenceSchema: StructType = metadata.schema
Expand Down
13 changes: 11 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

object TypeWidening {
Expand Down Expand Up @@ -80,15 +82,22 @@ object TypeWidening {
* happen unless a non-compliant writer applied a type change that is not part of the feature
* specification.
*/
def assertTableReadable(protocol: Protocol, metadata: Metadata): Unit = {
if (!isSupported(protocol) ||
def assertTableReadable(conf: SQLConf, protocol: Protocol, metadata: Metadata): Unit = {
if (conf.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK) ||
!isSupported(protocol) ||
!TypeWideningMetadata.containsTypeWideningMetadata(metadata.schema)) {
return
}

TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
if isTypeChangeSupported(from, to) =>
// Char/Varchar/String type changes are allowed and independent from type widening.
// Implementations shouldn't record these type changes in the table metadata per the Delta
// spec, but in case that happen we really shouldn't block reading the table.
case (_, TypeChange(_,
_: StringType | CharType(_) | VarcharType(_),
_: StringType | CharType(_) | VarcharType(_), _)) =>
case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
if stableFeatureCanReadTypeChange(from, to) =>
val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,28 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
collectTypeChanges(from.elementType, to.elementType).map { typeChange =>
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
case (fromType: AtomicType, toType: AtomicType) if fromType != toType &&
TypeWidening.isTypeChangeSupported(fromType, toType) =>
Seq(TypeChange(
version = None,
fromType,
toType,
fieldPath = Seq.empty
))
case (_: AtomicType, _: AtomicType) => Seq.empty
// Char/Varchar/String type changes are expected and unrelated to type widening. We don't record
// them in the table schema metadata and don't log them as unexpected type changes either,
case (StringType | CharType(_) | VarcharType(_), StringType | CharType(_) | VarcharType(_)) =>
Seq.empty
case (_: AtomicType, _: AtomicType) =>
deltaAssert(fromType == toType,
name = "typeWidening.unexpectedTypeChange",
msg = s"Trying to apply an unsupported type change: $fromType to $toType",
data = Map(
"fromType" -> fromType.sql,
"toType" -> toType.sql
)
)
Seq.empty
// Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct
// fields instead to only collect type changes inside these fields.
case (_: StructType, _: StructType) => Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,10 @@ case class AlterTableChangeColumnDeltaCommand(
StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)

val newSchemaWithTypeWideningMetadata =
TypeWideningMetadata.addTypeWideningMetadata(txn, schema = newSchema, oldSchema = oldSchema)
TypeWideningMetadata.addTypeWideningMetadata(
txn,
schema = newSchema,
oldSchema = metadata.schema)

val newMetadata = metadata.copy(
schemaString = newSchemaWithTypeWideningMetadata.json,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,20 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

/**
* Internal config to bypass the check that ensures a table doesn't contain any unsupported type
* change when reading it. Meant as a mitigation in case the check incorrectly flags valid cases.
*/
val DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK =
buildConf("typeWidening.bypassUnsupportedTypeChangeCheck")
.internal()
.doc("""
| Disables check that ensures a table doesn't contain any unsupported type change when
| reading it.
|""".stripMargin)
.booleanConf
.createWithDefault(false)

val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR =
buildConf("isDeltaTable.throwOnError")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,41 @@ trait TypeWideningCompatibilityTests {
assert(latestVersion.schema("a").dataType === ShortType)
checkAnswer(latestVersion, Seq(Row(1), Row(2)))
}

test("compatibility with char/varchar columns") {
sql(s"CREATE TABLE delta.`$tempPath` (a byte, c char(3), v varchar(3)) USING DELTA")
append(Seq((1.toByte, "abc", "def")).toDF("a", "c", "v"))
checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def")))

sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE smallint")
append(Seq((2.toShort, "ghi", "jkl")).toDF("a", "c", "v"))
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
.add("c", StringType, nullable = true,
metadata = new MetadataBuilder()
.putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
.build()
)
.add("v", StringType, nullable = true,
metadata = new MetadataBuilder()
.putString("__CHAR_VARCHAR_TYPE_STRING", "varchar(3)")
.build()))
checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl")))

sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN c TYPE string")
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN v TYPE string")
append(Seq((3.toShort, "longer string 1", "longer string 2")).toDF("a", "c", "v"))
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
.add("c", StringType)
.add("v", StringType))
checkAnswer(readDeltaTable(tempPath),
Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl"), Row(3, "longer string 1", "longer string 2")))
}
}

/** Trait collecting tests covering type widening + column mapping. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,33 +292,26 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {

test("addTypeWideningMetadata/removeTypeWideningMetadata on top-level fields") {
val schemaWithoutMetadata =
StructType.fromDDL("i long, d decimal(15, 4), a array<double>, m map<short, int>")
StructType.fromDDL("i int, a array<int>, m map<short, int>")
val firstOldSchema =
StructType.fromDDL("i short, d decimal(6, 2), a array<byte>, m map<byte, int>")
StructType.fromDDL("i byte, a array<byte>, m map<byte, int>")
val secondOldSchema =
StructType.fromDDL("i int, d decimal(10, 4), a array<int>, m map<short, byte>")
StructType.fromDDL("i short, a array<short>, m map<short, byte>")

var schema =
TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)

assert(schema("i") === StructField("i", LongType,
assert(schema("i") === StructField("i", IntegerType,
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("short", "long")
typeChangeMetadata("byte", "integer")
)).build()
))

assert(schema("d") === StructField("d", DecimalType(15, 4),
assert(schema("a") === StructField("a", ArrayType(IntegerType),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("decimal(6,2)", "decimal(15,4)")
)).build()
))

assert(schema("a") === StructField("a", ArrayType(DoubleType),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("byte", "double", "element")
typeChangeMetadata("byte", "integer", "element")
)).build()
))

Expand All @@ -332,34 +325,25 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
schemaWithoutMetadata -> Seq(
Seq.empty -> schema("i"),
Seq.empty -> schema("d"),
Seq.empty -> schema("a"),
Seq.empty -> schema("m")
))
// Second type change on all fields.
schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)

assert(schema("i") === StructField("i", LongType,
assert(schema("i") === StructField("i", IntegerType,
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("short", "long"),
typeChangeMetadata("integer", "long")
typeChangeMetadata("byte", "integer"),
typeChangeMetadata("short", "integer")
)).build()
))

assert(schema("d") === StructField("d", DecimalType(15, 4),
assert(schema("a") === StructField("a", ArrayType(IntegerType),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("decimal(6,2)", "decimal(15,4)"),
typeChangeMetadata("decimal(10,4)", "decimal(15,4)")
)).build()
))

assert(schema("a") === StructField("a", ArrayType(DoubleType),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("byte", "double", "element"),
typeChangeMetadata("integer", "double", "element")
typeChangeMetadata("byte", "integer", "element"),
typeChangeMetadata("short", "integer", "element")
)).build()
))

Expand All @@ -374,44 +358,43 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
schemaWithoutMetadata -> Seq(
Seq.empty -> schema("i"),
Seq.empty -> schema("d"),
Seq.empty -> schema("a"),
Seq.empty -> schema("m")
))
}

test("addTypeWideningMetadata/removeTypeWideningMetadata on nested fields") {
val schemaWithoutMetadata = StructType.fromDDL(
"s struct<i: long, a: array<map<int, long>>, m: map<map<long, int>, array<long>>>")
"s struct<i: int, a: array<map<int, int>>, m: map<map<int, int>, array<int>>>")
val firstOldSchema = StructType.fromDDL(
"s struct<i: short, a: array<map<byte, long>>, m: map<map<int, int>, array<long>>>")
"s struct<i: byte, a: array<map<byte, int>>, m: map<map<short, int>, array<int>>>")
val secondOldSchema = StructType.fromDDL(
"s struct<i: int, a: array<map<int, int>>, m: map<map<long, int>, array<int>>>")
"s struct<i: short, a: array<map<int, short>>, m: map<map<int, int>, array<short>>>")

// First type change on all struct fields.
var schema =
TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)
var struct = schema("s").dataType.asInstanceOf[StructType]

assert(struct("i") === StructField("i", LongType,
assert(struct("i") === StructField("i", IntegerType,
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("short", "long")
typeChangeMetadata("byte", "integer")
)).build()
))

assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("byte", "integer", "element.key")
)).build()
))

assert(struct("m") === StructField("m",
MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("integer", "long", "key.key")
typeChangeMetadata("short", "integer", "key.key")
)).build()
))

Expand All @@ -426,28 +409,28 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)
struct = schema("s").dataType.asInstanceOf[StructType]

assert(struct("i") === StructField("i", LongType,
assert(struct("i") === StructField("i", IntegerType,
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("short", "long"),
typeChangeMetadata("integer", "long")
typeChangeMetadata("byte", "integer"),
typeChangeMetadata("short", "integer")
)).build()
))

assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("byte", "integer", "element.key"),
typeChangeMetadata("integer", "long", "element.value")
typeChangeMetadata("short", "integer", "element.value")
)).build()
))

assert(struct("m") === StructField("m",
MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("integer", "long", "key.key"),
typeChangeMetadata("integer", "long", "value.element")
typeChangeMetadata("short", "integer", "key.key"),
typeChangeMetadata("short", "integer", "value.element")
)).build()
))
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
Expand All @@ -459,18 +442,18 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
}

test("addTypeWideningMetadata/removeTypeWideningMetadata with added and removed fields") {
val newSchema = StructType.fromDDL("a int, b long, d int")
val oldSchema = StructType.fromDDL("a int, b int, c int")
val newSchema = StructType.fromDDL("a int, b int, d int")
val oldSchema = StructType.fromDDL("a int, b short, c int")

val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema)
assert(schema("a") === StructField("a", IntegerType))
assert(schema("d") === StructField("d", IntegerType))
assert(!schema.contains("c"))

assert(schema("b") === StructField("b", LongType,
assert(schema("b") === StructField("b", IntegerType,
metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
typeChangeMetadata("integer", "long")
typeChangeMetadata("short", "integer")
)).build()
))
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
Expand Down
Loading

0 comments on commit e70ca04

Please sign in to comment.