Skip to content

Commit

Permalink
[SPARK-48898][SQL] Set nullability correctly in the Variant schema
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The variantShreddingSchema method converts a human-readable schema for Variant to one that's a valid shredding schema. According to the shredding schema in apache/parquet-format#461, each shredded field in an object should be a required group - i.e. a non-nullable struct. This PR fixes the variantShreddingSchema to mark that struct as non-nullable.

### Why are the changes needed?

If we use variantShreddingSchema to construct a schema for Parquet, the schema would be technically non-conformant with the spec by setting the group as optional. I don't think this should really matter to readers, but it would waste a bit of space in the Parquet file by adding an extra definition level.

### Does this PR introduce _any_ user-facing change?

No, this code is not used yet.

### How was this patch tested?

Added a test to do some minimal validation of the variantShreddingSchema function.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49151 from cashmand/SPARK-48898-nullability-again.

Authored-by: cashmand <david.cashman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
cashmand authored and ericm-db committed Dec 16, 2024
1 parent 4f31bda commit 511f808
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,21 @@ case object SparkShreddingUtils {
*/
def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = {
val fields = dataType match {
case ArrayType(elementType, containsNull) =>
case ArrayType(elementType, _) =>
// Always set containsNull to false. One of value or typed_value must always be set for
// array elements.
val arrayShreddingSchema =
ArrayType(variantShreddingSchema(elementType, false), containsNull)
ArrayType(variantShreddingSchema(elementType, false), containsNull = false)
Seq(
StructField(VariantValueFieldName, BinaryType, nullable = true),
StructField(TypedValueFieldName, arrayShreddingSchema, nullable = true)
)
case StructType(fields) =>
// The field name level is always non-nullable: Variant null values are represented in the
// "value" columna as "00", and missing values are represented by setting both "value" and
// "typed_value" to null.
val objectShreddingSchema = StructType(fields.map(f =>
f.copy(dataType = variantShreddingSchema(f.dataType, false))))
f.copy(dataType = variantShreddingSchema(f.dataType, false), nullable = false)))
Seq(
StructField(VariantValueFieldName, BinaryType, nullable = true),
StructField(TypedValueFieldName, objectShreddingSchema, nullable = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,16 @@ class VariantShreddingSuite extends QueryTest with SharedSparkSession with Parqu
Row(metadata(Nil), null, Array(Row(null, null))))
checkException(path, "v", "MALFORMED_VARIANT")
// Shredded field must not be null.
writeRows(path, writeSchema(StructType.fromDDL("a int")),
// Construct the schema manually, because SparkShreddingUtils.variantShreddingSchema will make
// `a` non-nullable, which would prevent us from writing the file.
val schema = StructType(Seq(StructField("v", StructType(Seq(
StructField("metadata", BinaryType),
StructField("value", BinaryType),
StructField("typed_value", StructType(Seq(
StructField("a", StructType(Seq(
StructField("value", BinaryType),
StructField("typed_value", BinaryType))))))))))))
writeRows(path, schema,
Row(metadata(Seq("a")), null, Row(null)))
checkException(path, "v", "MALFORMED_VARIANT")
// `value` must not contain any shredded field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,36 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper

private val emptyMetadata: Array[Byte] = parseJson("null").getMetadata

test("variantShreddingSchema") {
// Validate the schema produced by SparkShreddingUtils.variantShreddingSchema for a few simple
// cases.
// metadata is always non-nullable.
assert(SparkShreddingUtils.variantShreddingSchema(IntegerType) ==
StructType(Seq(
StructField("metadata", BinaryType, nullable = false),
StructField("value", BinaryType, nullable = true),
StructField("typed_value", IntegerType, nullable = true))))

val fieldA = StructType(Seq(
StructField("value", BinaryType, nullable = true),
StructField("typed_value", TimestampNTZType, nullable = true)))
val arrayType = ArrayType(StructType(Seq(
StructField("value", BinaryType, nullable = true),
StructField("typed_value", StringType, nullable = true))), containsNull = false)
val fieldB = StructType(Seq(
StructField("value", BinaryType, nullable = true),
StructField("typed_value", arrayType, nullable = true)))
val objectType = StructType(Seq(
StructField("a", fieldA, nullable = false),
StructField("b", fieldB, nullable = false)))
val structSchema = DataType.fromDDL("a timestamp_ntz, b array<string>")
assert(SparkShreddingUtils.variantShreddingSchema(structSchema) ==
StructType(Seq(
StructField("metadata", BinaryType, nullable = false),
StructField("value", BinaryType, nullable = true),
StructField("typed_value", objectType, nullable = true))))
}

test("shredding as fixed numeric types") {
/* Cast integer to any wider numeric type. */
testWithSchema("1", IntegerType, Row(emptyMetadata, null, 1))
Expand Down

0 comments on commit 511f808

Please sign in to comment.