From 9394b35ed7bb57a343ab6038dc1c1345db104604 Mon Sep 17 00:00:00 2001 From: cashmand Date: Wed, 11 Dec 2024 08:12:27 -0800 Subject: [PATCH] [SPARK-48898][SQL] Set nullability correctly in the Variant schema ### What changes were proposed in this pull request? The `variantShreddingSchema` method converts a human-readable schema for Variant to one that's a valid shredding schema. According to the shredding schema in https://github.com/apache/parquet-format/pull/461, each shredded field in an object should be a required group - i.e. a non-nullable struct. This PR fixes the `variantShreddingSchema` to mark that struct as non-nullable. ### Why are the changes needed? If we use `variantShreddingSchema` to construct a schema for Parquet, the schema would be technically non-conformant with the spec by setting the group as optional. I don't think this should really matter to readers, but it would waste a bit of space in the Parquet file by adding an extra definition level. ### Does this PR introduce _any_ user-facing change? No, this code is not used yet. ### How was this patch tested? Added a test to do some minimal validation of the `variantShreddingSchema` function. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49118 from cashmand/SPARK-48898-nullability. 
Authored-by: cashmand Signed-off-by: Wenchen Fan --- .../parquet/SparkShreddingUtils.scala | 5 +++- .../sql/VariantWriteShreddingSuite.scala | 30 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala index 41244e20c369f..507633abfb285 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala @@ -81,8 +81,11 @@ case object SparkShreddingUtils { StructField(TypedValueFieldName, arrayShreddingSchema, nullable = true) ) case StructType(fields) => + // The field name level is always non-nullable: Variant null values are represented in the + // "value" column as "00", and missing values are represented by setting both "value" and + // "typed_value" to null. 
val objectShreddingSchema = StructType(fields.map(f => - f.copy(dataType = variantShreddingSchema(f.dataType, false)))) + f.copy(dataType = variantShreddingSchema(f.dataType, false), nullable = false))) Seq( StructField(VariantValueFieldName, BinaryType, nullable = true), StructField(TypedValueFieldName, objectShreddingSchema, nullable = true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala index a62c6e4462464..9022d8cfdca49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala @@ -67,6 +67,36 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper private val emptyMetadata: Array[Byte] = parseJson("null").getMetadata + test("variantShreddingSchema") { + // Validate the schema produced by SparkShreddingUtils.variantShreddingSchema for a few simple + // cases. + // metadata is always non-nullable. 
+ assert(SparkShreddingUtils.variantShreddingSchema(IntegerType) == + StructType(Seq( + StructField("metadata", BinaryType, nullable = false), + StructField("value", BinaryType, nullable = true), + StructField("typed_value", IntegerType, nullable = true)))) + + val fieldA = StructType(Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", TimestampNTZType, nullable = true))) + val arrayType = ArrayType(StructType(Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", StringType, nullable = true)))) + val fieldB = StructType(Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", arrayType, nullable = true))) + val objectType = StructType(Seq( + StructField("a", fieldA, nullable = false), + StructField("b", fieldB, nullable = false))) + val structSchema = DataType.fromDDL("a timestamp_ntz, b array<string>") + assert(SparkShreddingUtils.variantShreddingSchema(structSchema) == + StructType(Seq( + StructField("metadata", BinaryType, nullable = false), + StructField("value", BinaryType, nullable = true), + StructField("typed_value", objectType, nullable = true)))) + } + test("shredding as fixed numeric types") { /* Cast integer to any wider numeric type. */ testWithSchema("1", IntegerType, Row(emptyMetadata, null, 1))