From e70ca04e2a49b2809756339d093c165d7ad1033d Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Wed, 2 Oct 2024 19:38:13 +0200
Subject: [PATCH] [Spark] Fix type widening with char/varchar columns (#3744)

## Description
Applying type widening on a table that contains a char/varchar column causes subsequent reads to fail with `DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA`:
```
CREATE TABLE t (a VARCHAR(10), b INT);
ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
ALTER TABLE t ALTER COLUMN b TYPE LONG;
SELECT * FROM t;

[DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA] Unable to operate on this table because an unsupported type change was applied. Field a was changed from VARCHAR(10) to STRING
```
Type changes are recorded in the table metadata, and a check on read ensures that all recorded type changes are supported by the current implementation, since attempting to read data after an unsupported type change could lead to incorrect results.

CHAR/VARCHAR columns are sometimes stripped down to STRING internally. As a result, ALTER TABLE incorrectly identifies that the type of column `a` changed to STRING and records that change in the type widening metadata. The read check in turn doesn't recognize it as one of the supported widening type changes, which don't include changes to string columns.

Fix:
1. Never record char/varchar/string type changes in the type widening metadata.
2. Never record unsupported type changes in the type widening metadata, and log an assertion instead.
3. Don't fail on char/varchar/string type changes found in the type widening metadata in case such a type change slips through (1). This prevents failing when a non-compliant implementation still records a char/varchar/string type change.
4. Provide an internal config to bypass the check in case a similar issue happens again in the future; see the sketch below.
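For reference, a minimal sketch of the escape hatch from (4). It assumes the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends to configuration keys:
```
-- Assumption: the key below is this patch's typeWidening.bypassUnsupportedTypeChangeCheck
-- with the usual spark.databricks.delta. prefix applied by DeltaSQLConf.
SET spark.databricks.delta.typeWidening.bypassUnsupportedTypeChangeCheck = true;
-- The read no longer fails on the spurious char/varchar/string type change.
SELECT * FROM t;
```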
---
 .../sql/delta/DeltaParquetFileFormat.scala    |  6 +-
 .../apache/spark/sql/delta/TypeWidening.scala | 13 ++-
 .../sql/delta/TypeWideningMetadata.scala      | 18 +++-
 .../commands/alterDeltaTableCommands.scala    |  5 +-
 .../sql/delta/sources/DeltaSQLConf.scala      | 14 ++++
 ...ypeWideningFeatureCompatibilitySuite.scala | 35 ++++++++
 .../TypeWideningMetadataSuite.scala           | 83 ++++++++-----------
 .../TypeWideningTableFeatureSuite.scala       | 73 ++++++++++++++++
 8 files changed, 191 insertions(+), 56 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
index 830ab623741..1ce8586ef09 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAll
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.util.ScalaExtensions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
@@ -73,7 +74,10 @@ case class DeltaParquetFileFormat(
     }
   }
 
-  TypeWidening.assertTableReadable(protocol, metadata)
+  SparkSession.getActiveSession.ifDefined { session =>
+    TypeWidening.assertTableReadable(session.sessionState.conf, protocol, metadata)
+  }
+
   val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
 
   val referenceSchema: StructType = metadata.schema

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
index 90081c559e8..8c7e39eee63 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -17,8 +17,10 @@
 package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 object TypeWidening {
@@ -80,8 +82,9 @@ object TypeWidening {
    * happen unless a non-compliant writer applied a type change that is not part of the feature
    * specification.
    */
-  def assertTableReadable(protocol: Protocol, metadata: Metadata): Unit = {
-    if (!isSupported(protocol) ||
+  def assertTableReadable(conf: SQLConf, protocol: Protocol, metadata: Metadata): Unit = {
+    if (conf.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK) ||
+        !isSupported(protocol) ||
         !TypeWideningMetadata.containsTypeWideningMetadata(metadata.schema)) {
       return
     }
@@ -89,6 +92,12 @@
     TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
       case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
         if isTypeChangeSupported(from, to) =>
+      // Char/Varchar/String type changes are allowed and independent from type widening.
+      // Implementations shouldn't record these type changes in the table metadata per the Delta
+      // spec, but in case that happens we really shouldn't block reading the table.
+      case (_, TypeChange(_,
+          _: StringType | CharType(_) | VarcharType(_),
+          _: StringType | CharType(_) | VarcharType(_), _)) =>
       case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
         if stableFeatureCanReadTypeChange(from, to) =>
         val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala
index 47c103a76a0..2c8646727bb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala
@@ -188,14 +188,28 @@
         collectTypeChanges(from.elementType, to.elementType).map { typeChange =>
           typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
         }
-      case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
+      case (fromType: AtomicType, toType: AtomicType) if fromType != toType &&
+          TypeWidening.isTypeChangeSupported(fromType, toType) =>
         Seq(TypeChange(
           version = None,
           fromType,
           toType,
           fieldPath = Seq.empty
         ))
-      case (_: AtomicType, _: AtomicType) => Seq.empty
+      // Char/Varchar/String type changes are expected and unrelated to type widening. We don't record
+      // them in the table schema metadata and don't log them as unexpected type changes either.
+      case (StringType | CharType(_) | VarcharType(_), StringType | CharType(_) | VarcharType(_)) =>
+        Seq.empty
+      case (fromType: AtomicType, toType: AtomicType) =>
+        deltaAssert(fromType == toType,
+          name = "typeWidening.unexpectedTypeChange",
+          msg = s"Trying to apply an unsupported type change: $fromType to $toType",
+          data = Map(
+            "fromType" -> fromType.sql,
+            "toType" -> toType.sql
+          )
+        )
+        Seq.empty
       // Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct
       // fields instead to only collect type changes inside these fields.
       case (_: StructType, _: StructType) => Seq.empty

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index ee671af7b0e..bd4e4b9a449 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -720,7 +720,10 @@ case class AlterTableChangeColumnDeltaCommand(
       StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)
 
     val newSchemaWithTypeWideningMetadata =
-      TypeWideningMetadata.addTypeWideningMetadata(txn, schema = newSchema, oldSchema = oldSchema)
+      TypeWideningMetadata.addTypeWideningMetadata(
+        txn,
+        schema = newSchema,
+        oldSchema = metadata.schema)
 
     val newMetadata = metadata.copy(
       schemaString = newSchemaWithTypeWideningMetadata.json,

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 41ba27ac219..ba460961e42 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1074,6 +1074,20 @@ trait DeltaSQLConfBase {
     .booleanConf
     .createWithDefault(true)
 
+  /**
+   * Internal config to bypass the check that ensures a table doesn't contain any unsupported type
+   * change when reading it.
+   * Meant as a mitigation in case the check incorrectly flags valid cases.
+   */
+  val DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK =
+    buildConf("typeWidening.bypassUnsupportedTypeChangeCheck")
+      .internal()
+      .doc("""
+        | Disables the check that ensures a table doesn't contain any unsupported type change
+        | when reading it.
+        |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR =
     buildConf("isDeltaTable.throwOnError")
       .internal()

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
index d524d2fcd62..f29565f1212 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
@@ -120,6 +120,41 @@ trait TypeWideningCompatibilityTests {
     assert(latestVersion.schema("a").dataType === ShortType)
     checkAnswer(latestVersion, Seq(Row(1), Row(2)))
   }
+
+  test("compatibility with char/varchar columns") {
+    sql(s"CREATE TABLE delta.`$tempPath` (a byte, c char(3), v varchar(3)) USING DELTA")
+    append(Seq((1.toByte, "abc", "def")).toDF("a", "c", "v"))
+    checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def")))
+
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE smallint")
+    append(Seq((2.toShort, "ghi", "jkl")).toDF("a", "c", "v"))
+    assert(readDeltaTable(tempPath).schema ===
+      new StructType()
+        .add("a", ShortType, nullable = true,
+          metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
+        .add("c", StringType, nullable = true,
+          metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
+            .build()
+        )
+        .add("v", StringType, nullable = true,
+          metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "varchar(3)")
+            .build()))
+    checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl")))
+
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN c TYPE string")
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN v TYPE string")
+    append(Seq((3.toShort, "longer string 1", "longer string 2")).toDF("a", "c", "v"))
+    assert(readDeltaTable(tempPath).schema ===
+      new StructType()
+        .add("a", ShortType, nullable = true,
+          metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
+        .add("c", StringType)
+        .add("v", StringType))
+    checkAnswer(readDeltaTable(tempPath),
+      Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl"), Row(3, "longer string 1", "longer string 2")))
+  }
 }
 
 /** Trait collecting tests covering type widening + column mapping. */
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
index ef317b7f261..af96dccf410 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
@@ -292,33 +292,26 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
   test("addTypeWideningMetadata/removeTypeWideningMetadata on top-level fields") {
     val schemaWithoutMetadata =
-      StructType.fromDDL("i long, d decimal(15, 4), a array<double>, m map<int, int>")
+      StructType.fromDDL("i int, a array<int>, m map<int, int>")
     val firstOldSchema =
-      StructType.fromDDL("i short, d decimal(6, 2), a array<byte>, m map<byte, short>")
+      StructType.fromDDL("i byte, a array<byte>, m map<byte, short>")
     val secondOldSchema =
-      StructType.fromDDL("i int, d decimal(10, 4), a array<int>, m map<short, int>")
+      StructType.fromDDL("i short, a array<short>, m map<short, int>")
 
     var schema =
       TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)
-    assert(schema("i") === StructField("i", LongType,
+    assert(schema("i") === StructField("i", IntegerType,
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("short", "long")
+          typeChangeMetadata("byte", "integer")
         )).build()
     ))
 
-    assert(schema("d") === StructField("d", DecimalType(15, 4),
+    assert(schema("a") === StructField("a", ArrayType(IntegerType),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("decimal(6,2)", "decimal(15,4)")
-        )).build()
-    ))
-
-    assert(schema("a") === StructField("a", ArrayType(DoubleType),
-      metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("byte", "double", "element")
+          typeChangeMetadata("byte", "integer", "element")
         )).build()
     ))
 
@@ -332,34 +325,25 @@
     assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
       schemaWithoutMetadata -> Seq(
         Seq.empty -> schema("i"),
-        Seq.empty -> schema("d"),
         Seq.empty -> schema("a"),
         Seq.empty -> schema("m")
       ))
 
     // Second type change on all fields.
     schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)
-    assert(schema("i") === StructField("i", LongType,
+    assert(schema("i") === StructField("i", IntegerType,
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("short", "long"),
-          typeChangeMetadata("integer", "long")
+          typeChangeMetadata("byte", "integer"),
+          typeChangeMetadata("short", "integer")
         )).build()
     ))
 
-    assert(schema("d") === StructField("d", DecimalType(15, 4),
+    assert(schema("a") === StructField("a", ArrayType(IntegerType),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("decimal(6,2)", "decimal(15,4)"),
-          typeChangeMetadata("decimal(10,4)", "decimal(15,4)")
-        )).build()
-    ))
-
-    assert(schema("a") === StructField("a", ArrayType(DoubleType),
-      metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("byte", "double", "element"),
-          typeChangeMetadata("integer", "double", "element")
+          typeChangeMetadata("byte", "integer", "element"),
+          typeChangeMetadata("short", "integer", "element")
         )).build()
     ))
 
@@ -374,7 +358,6 @@
     assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
       schemaWithoutMetadata -> Seq(
         Seq.empty -> schema("i"),
-        Seq.empty -> schema("d"),
         Seq.empty -> schema("a"),
         Seq.empty -> schema("m")
       ))
   }
@@ -382,25 +365,25 @@
   test("addTypeWideningMetadata/removeTypeWideningMetadata on nested fields") {
     val schemaWithoutMetadata = StructType.fromDDL(
-      "s struct<i: long, a: array<map<int, long>>, m: map<map<long, int>, array<long>>>")
+      "s struct<i: int, a: array<map<int, int>>, m: map<map<int, int>, array<int>>>")
     val firstOldSchema = StructType.fromDDL(
-      "s struct<i: short, a: array<map<byte, long>>, m: map<map<int, int>, array<long>>>")
+      "s struct<i: byte, a: array<map<byte, int>>, m: map<map<short, int>, array<int>>>")
     val secondOldSchema = StructType.fromDDL(
-      "s struct<i: int, a: array<map<int, int>>, m: map<map<int, int>, array<int>>>")
+      "s struct<i: short, a: array<map<int, short>>, m: map<map<short, int>, array<short>>>")
 
     // First type change on all struct fields.
     var schema =
      TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)
     var struct = schema("s").dataType.asInstanceOf[StructType]
-    assert(struct("i") === StructField("i", LongType,
+    assert(struct("i") === StructField("i", IntegerType,
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("short", "long")
+          typeChangeMetadata("byte", "integer")
         )).build()
     ))
 
-    assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
+    assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
           typeChangeMetadata("byte", "integer", "element.key")
         )).build()
     ))
 
     assert(struct("m") === StructField("m",
-      MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
+      MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("integer", "long", "key.key")
+          typeChangeMetadata("short", "integer", "key.key")
         )).build()
     ))
 
@@ -426,28 +409,28 @@
     schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)
     struct = schema("s").dataType.asInstanceOf[StructType]
 
-    assert(struct("i") === StructField("i", LongType,
+    assert(struct("i") === StructField("i", IntegerType,
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("short", "long"),
-          typeChangeMetadata("integer", "long")
+          typeChangeMetadata("byte", "integer"),
+          typeChangeMetadata("short", "integer")
         )).build()
     ))
 
-    assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
+    assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
           typeChangeMetadata("byte", "integer", "element.key"),
-          typeChangeMetadata("integer", "long", "element.value")
+          typeChangeMetadata("short", "integer", "element.value")
         )).build()
     ))
 
     assert(struct("m") === StructField("m",
-      MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
+      MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("integer", "long", "key.key"),
-          typeChangeMetadata("integer", "long", "value.element")
+          typeChangeMetadata("short", "integer", "key.key"),
+          typeChangeMetadata("short", "integer", "value.element")
         )).build()
     ))
     assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
@@ -459,18 +442,18 @@
   }
 
   test("addTypeWideningMetadata/removeTypeWideningMetadata with added and removed fields") {
-    val newSchema = StructType.fromDDL("a int, b long, d int")
-    val oldSchema = StructType.fromDDL("a int, b int, c int")
+    val newSchema = StructType.fromDDL("a int, b int, d int")
+    val oldSchema = StructType.fromDDL("a int, b short, c int")
     val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema)
     assert(schema("a") === StructField("a", IntegerType))
     assert(schema("d") === StructField("d", IntegerType))
     assert(!schema.contains("c"))
 
-    assert(schema("b") === StructField("b", LongType,
+    assert(schema("b") ===
+      StructField("b", IntegerType,
       metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
-          typeChangeMetadata("integer", "long")
+          typeChangeMetadata("short", "integer")
         )).build()
     ))
     assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
index 294dcd37332..e29d4c061cf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
@@ -455,6 +455,79 @@ trait TypeWideningTableFeatureTests
         "toType" -> "STRING"
       )
     )
+
+    // Validate that the internal config can be used to bypass the check if needed.
+    withSQLConf(
+      DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK.key -> "true") {
+      readDeltaTable(tempPath).collect()
+    }
   }
+
+  test("unsupported type changes in nested structs") {
+    sql(s"CREATE TABLE delta.`$tempDir` (s struct<a: int>) USING DELTA")
+    deltaLog.withNewTransaction { txn =>
+      txn.commit(
+        Seq(txn.snapshot.metadata.copy(
+          schemaString = new StructType()
+            .add("s", new StructType()
+              .add("a", BooleanType, nullable = true,
+                metadata = typeWideningMetadata(version = 1, IntegerType, BooleanType)))
+            .json
+        )),
+        ManualUpdate)
+    }
+
+    checkError(
+      intercept[DeltaIllegalStateException] {
+        readDeltaTable(tempPath).collect()
+      },
+      "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA",
+      parameters = Map(
+        "fieldName" -> "s.a",
+        "fromType" -> "INT",
+        "toType" -> "BOOLEAN"
+      )
+    )
+  }
+
+  test("char/varchar/string type changes don't trigger the unsupported type change check") {
+    sql(
+      s"""
+         |CREATE TABLE delta.`$tempDir` (
+         |  a string, b string, c char(4), d char(4), e varchar(4), f varchar(4), s struct<x: string>
+         |) USING DELTA
+         |""".stripMargin)
+
+    // Add type change metadata for all string<->char<->varchar type changes and ensure the table
+    // can still be read.
+    // Note: compliant delta implementations shouldn't actually record these type changes in the
+    // table schema metadata. This test ensures that if a non-compliant implementation still does,
+    // we don't unnecessarily block reads.
+    deltaLog.withNewTransaction { txn =>
+      txn.commit(
+        Seq(txn.snapshot.metadata.copy(
+          schemaString = new StructType()
+            .add("a", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, StringType, CharType(4)))
+            .add("b", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, StringType, VarcharType(4)))
+            .add("c", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, CharType(4), StringType))
+            .add("d", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, CharType(4), VarcharType(4)))
+            .add("e", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, VarcharType(4), StringType))
+            .add("f", StringType, nullable = true,
+              metadata = typeWideningMetadata(version = 1, VarcharType(4), CharType(4)))
+            .add("s", new StructType()
+              .add("x", StringType, nullable = true,
+                metadata = typeWideningMetadata(version = 1, StringType, CharType(4)))
+            )
+            .json
+        )),
+        ManualUpdate)
+    }
+    readDeltaTable(tempPath).collect()
+  }
 
   testSparkLatestOnly(