From 647fef752f6100fa5bee07958deac4e6aa1b297e Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 26 Nov 2024 14:45:18 -0600
Subject: [PATCH 1/3] Add in tests for failing cases

Signed-off-by: Robert (Bobby) Evans
---
 .../src/main/python/orc_write_test.py         | 20 ++++++++++++++++++
 .../src/main/python/parquet_write_test.py     | 19 +++++++++++++++++
 .../spark/rapids/GpuParquetFileFormat.scala   |  3 ++-
 .../rapids/ParquetCachedBatchSerializer.scala |  5 +++--
 .../com/nvidia/spark/rapids/SchemaUtils.scala | 21 ++++++++++++-------
 .../sql/hive/rapids/GpuHiveFileFormat.scala   |  3 ++-
 .../spark/sql/rapids/GpuOrcFileFormat.scala   |  4 ++--
 7 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index ddb69524ac4..b7539870b05 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -102,6 +102,26 @@ def test_write_round_trip_corner(spark_tmp_path, orc_gen, orc_impl):
             data_path,
             conf={'spark.sql.orc.impl': orc_impl, 'spark.rapids.sql.format.orc.write.enabled': True})
 
+@pytest.mark.parametrize('gen', [ByteGen(nullable=False),
+                                 ShortGen(nullable=False),
+                                 IntegerGen(nullable=False),
+                                 LongGen(nullable=False),
+                                 FloatGen(nullable=False),
+                                 DoubleGen(nullable=False),
+                                 BooleanGen(nullable=False),
+                                 StringGen(nullable=False),
+                                 StructGen([('b', LongGen(nullable=False))], nullable=False)], ids=idfn)
+@pytest.mark.parametrize('orc_impl', ["native", "hive"])
+@allow_non_gpu(*non_utc_allow)
+def test_write_round_trip_nullable_struct(spark_tmp_path, gen, orc_impl):
+    gen_for_struct = StructGen([('c', gen)], nullable=True)
+    data_path = spark_tmp_path + '/ORC_DATA'
+    assert_gpu_and_cpu_writes_are_equal_collect(
+            lambda spark, path: unary_op_df(spark, gen_for_struct, num_slices=1).write.orc(path),
+            lambda spark, path: spark.read.orc(path),
+            data_path,
+            conf={'spark.sql.orc.impl': orc_impl, 'spark.rapids.sql.format.orc.write.enabled': True})
+
 orc_part_write_gens = [
     byte_gen, short_gen, int_gen, long_gen, boolean_gen,
     # Some file systems have issues with UTF8 strings so to help the test pass even there
diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py
index 775b4a9d1cb..a0445691232 100644
--- a/integration_tests/src/main/python/parquet_write_test.py
+++ b/integration_tests/src/main/python/parquet_write_test.py
@@ -105,6 +105,25 @@ def test_write_round_trip(spark_tmp_path, parquet_gens):
             data_path,
             conf=writer_confs)
 
+@pytest.mark.parametrize('gen', [ByteGen(nullable=False),
+                                 ShortGen(nullable=False),
+                                 IntegerGen(nullable=False),
+                                 LongGen(nullable=False),
+                                 FloatGen(nullable=False),
+                                 DoubleGen(nullable=False),
+                                 BooleanGen(nullable=False),
+                                 StringGen(nullable=False),
+                                 StructGen([('b', LongGen(nullable=False))], nullable=False)], ids=idfn)
+@allow_non_gpu(*non_utc_allow)
+def test_write_round_trip_nullable_struct(spark_tmp_path, gen):
+    gen_for_struct = StructGen([('c', gen)], nullable=True)
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    assert_gpu_and_cpu_writes_are_equal_collect(
+            lambda spark, path: unary_op_df(spark, gen_for_struct, num_slices=1).write.parquet(path),
+            lambda spark, path: spark.read.parquet(path),
+            data_path,
+            conf=writer_confs)
+
 all_nulls_string_gen = SetValuesGen(StringType(), [None])
 empty_or_null_string_gen = SetValuesGen(StringType(), [None, ""])
 all_empty_string_gen = SetValuesGen(StringType(), [""])
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index 2b5f246e56a..f96c7e9edc8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -390,7 +390,8 @@ class GpuParquetWriter(
     val builder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
         ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType,
-        parquetFieldIdEnabled)
+        parquetFieldIdEnabled,
+        nullable = false)
       .withMetadata(writeContext.getExtraMetaData)
       .withCompressionType(compressionType)
     Table.writeParquetChunked(builder.build(), this)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
index d88f21922ce..885c367170a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -430,7 +430,8 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
       schema: StructType): ParquetWriterOptions = {
     val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE
     SchemaUtils
-      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false,
+        nullable = false)
       .withCompressionType(compressionType)
       .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build()
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
index 22047f22e68..47409df4d2c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -247,19 +247,19 @@ object SchemaUtils {
 
     dataType match {
       case dt: DecimalType =>
-        if(parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
+        if (parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
           builder.withDecimalColumn(name, dt.precision, nullable, parquetFieldId.get)
         } else {
           builder.withDecimalColumn(name, dt.precision, nullable)
         }
       case TimestampType =>
-        if(parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
+        if (parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
           builder.withTimestampColumn(name, writeInt96, nullable, parquetFieldId.get)
         } else {
           builder.withTimestampColumn(name, writeInt96, nullable)
         }
       case s: StructType =>
-        val structB = if(parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
+        val structB = if (parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) {
           structBuilder(name, nullable, parquetFieldId.get)
         } else {
           structBuilder(name, nullable)
@@ -267,7 +267,9 @@ object SchemaUtils {
         builder.withStructColumn(writerOptionsFromSchema(
           structB,
           s,
-          writeInt96, parquetFieldIdWriteEnabled).build())
+          writeInt96,
+          parquetFieldIdWriteEnabled,
+          nullable = nullable).build())
       case a: ArrayType =>
         builder.withListColumn(
           writerOptionsFromField(
@@ -329,10 +331,13 @@ object SchemaUtils {
       builder: NestedBuilder[T, V],
       schema: StructType,
       writeInt96: Boolean = false,
-      parquetFieldIdEnabled: Boolean = false): T = {
+      parquetFieldIdEnabled: Boolean = false,
+      nullable: Boolean = true): T = {
     schema.foreach(field =>
-      writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96,
-        field.metadata, parquetFieldIdEnabled)
+      // CUDF has issues if the child of a struct is not-nullable, but the struct itself is
+      // So we have to work around it and tell CUDF what it expects.
+      writerOptionsFromField(builder, field.dataType, field.name, nullable || field.nullable,
+        writeInt96, field.metadata, parquetFieldIdWriteEnabled = parquetFieldIdEnabled)
     )
     builder.asInstanceOf[T]
   }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
index 3b5244e5c79..ce6d273b6c4 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
@@ -196,7 +196,8 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
     val optionsBuilder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
         writeInt96 = true, // Hive 1.2 write timestamp as INT96
-        parquetFieldIdEnabled = false)
+        parquetFieldIdEnabled = false,
+        nullable = false)
       .withCompressionType(compType)
     Table.writeParquetChunked(optionsBuilder.build(), this)
   }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
index d2f4380646c..3f4aaa0c7fa 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -200,7 +200,7 @@ class GpuOrcWriter(override val path: String,
 
   override val tableWriter: TableWriter = {
     val builder = SchemaUtils
-      .writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema)
+      .writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema, nullable = false)
       .withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf)))
     Table.writeORCChunked(builder.build(), this)
   }

From c08d9558c58eb17e555524d7fc8151a918c8a9a5 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Mon, 2 Dec 2024 14:47:35 -0600
Subject: [PATCH 2/3] Review comments

---
 .../com/nvidia/spark/rapids/GpuParquetFileFormat.scala   | 4 ++--
 .../spark/rapids/ParquetCachedBatchSerializer.scala      | 4 ++--
 .../main/scala/com/nvidia/spark/rapids/SchemaUtils.scala | 8 ++++----
 .../apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index f96c7e9edc8..52a8006a925 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -389,9 +389,9 @@ class GpuParquetWriter(
     val writeContext = new ParquetWriteSupport().init(conf)
     val builder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+        nullable = false,
         ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType,
-        parquetFieldIdEnabled,
-        nullable = false)
+        parquetFieldIdEnabled)
       .withMetadata(writeContext.getExtraMetaData)
       .withCompressionType(compressionType)
     Table.writeParquetChunked(builder.build(), this)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
index 885c367170a..861905f45f7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
@@ -430,8 +430,8 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
       schema: StructType): ParquetWriterOptions = {
     val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE
     SchemaUtils
-      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false,
-        nullable = false)
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, nullable = false,
+        writeInt96 = false)
       .withCompressionType(compressionType)
       .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build()
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
index 47409df4d2c..cc36fc7c848 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
@@ -267,9 +267,9 @@ object SchemaUtils {
         builder.withStructColumn(writerOptionsFromSchema(
           structB,
           s,
+          nullable = nullable,
           writeInt96,
-          parquetFieldIdWriteEnabled,
-          nullable = nullable).build())
+          parquetFieldIdWriteEnabled).build())
       case a: ArrayType =>
         builder.withListColumn(
           writerOptionsFromField(
@@ -330,9 +330,9 @@ object SchemaUtils {
   def writerOptionsFromSchema[T <: NestedBuilder[T, V], V <: ColumnWriterOptions](
       builder: NestedBuilder[T, V],
       schema: StructType,
+      nullable: Boolean,
       writeInt96: Boolean = false,
-      parquetFieldIdEnabled: Boolean = false,
-      nullable: Boolean = true): T = {
+      parquetFieldIdEnabled: Boolean = false): T = {
     schema.foreach(field =>
       // CUDF has issues if the child of a struct is not-nullable, but the struct itself is
       // So we have to work around it and tell CUDF what it expects.
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
index ce6d273b6c4..4f9f555edf0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
@@ -195,9 +195,9 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
   override protected val tableWriter: CudfTableWriter = {
     val optionsBuilder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+        nullable = false,
         writeInt96 = true, // Hive 1.2 write timestamp as INT96
-        parquetFieldIdEnabled = false,
-        nullable = false)
+        parquetFieldIdEnabled = false)
       .withCompressionType(compType)
     Table.writeParquetChunked(optionsBuilder.build(), this)
   }

From 5fda55a3510ae9c0a29359a12deff97f03e2c4f6 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Fri, 13 Dec 2024 09:28:09 -0600
Subject: [PATCH 3/3] Update to deal with bools off for ORC

---
 integration_tests/src/main/python/orc_write_test.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index 672d6c44e8a..103cae474a3 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -151,7 +151,11 @@ def test_write_round_trip_nullable_struct(spark_tmp_path, gen, orc_impl):
             lambda spark, path: unary_op_df(spark, gen_for_struct, num_slices=1).write.orc(path),
             lambda spark, path: spark.read.orc(path),
             data_path,
-            conf={'spark.sql.orc.impl': orc_impl, 'spark.rapids.sql.format.orc.write.enabled': True})
+            conf={'spark.sql.orc.impl': orc_impl,
+                  'spark.rapids.sql.format.orc.write.enabled': True,
+                  # https://github.com/NVIDIA/spark-rapids/issues/11736, so verify that we still do it correctly
+                  # once this is fixed
+                  'spark.rapids.sql.format.orc.write.boolType.enabled' : True})
 
 orc_part_write_gens = [
     # Add back boolean_gen when https://github.com/rapidsai/cudf/issues/6763 is fixed
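
Note (editorial, not part of the patches above): the core change is that
SchemaUtils.writerOptionsFromSchema now takes a top-level nullable flag and
computes each field's nullability as `nullable || field.nullable`, so the
children of a nullable struct are always described to cuDF as nullable even
when Spark marks them NOT NULL, while top-level columns keep their own
nullability because the writers pass nullable = false. The Scala snippet
below is only a minimal sketch of that rule; effectiveNullable is a
hypothetical helper and does not exist in the plugin.

    // Sketch of the nullability reported to cuDF after this change.
    def effectiveNullable(parentNullableStruct: Boolean, fieldNullable: Boolean): Boolean =
      parentNullableStruct || fieldNullable

    // A nullable struct with a NOT NULL child is declared to cuDF with a
    // nullable child, which is what test_write_round_trip_nullable_struct exercises.
    assert(effectiveNullable(parentNullableStruct = true, fieldNullable = false))
    // Top-level columns are unaffected: they keep field.nullable.
    assert(!effectiveNullable(parentNullableStruct = false, fieldNullable = false))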