From c08d9558c58eb17e555524d7fc8151a918c8a9a5 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Mon, 2 Dec 2024 14:47:35 -0600
Subject: [PATCH] Review comments

---
 .../com/nvidia/spark/rapids/GpuParquetFileFormat.scala   | 4 ++--
 .../spark/rapids/ParquetCachedBatchSerializer.scala      | 4 ++--
 .../main/scala/com/nvidia/spark/rapids/SchemaUtils.scala | 8 ++++----
 .../apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index f96c7e9edc8..52a8006a925 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -389,9 +389,9 @@ class GpuParquetWriter(
     val writeContext = new ParquetWriteSupport().init(conf)
     val builder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+        nullable = false,
         ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType,
-        parquetFieldIdEnabled,
-        nullable = false)
+        parquetFieldIdEnabled)
       .withMetadata(writeContext.getExtraMetaData)
       .withCompressionType(compressionType)
     Table.writeParquetChunked(builder.build(), this)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
index 885c367170a..861905f45f7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ParquetCachedBatchSerializer.scala
@@ -430,8 +430,8 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
       schema: StructType): ParquetWriterOptions = {
     val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE
     SchemaUtils
-      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false,
-        nullable = false)
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, nullable = false,
+        writeInt96 = false)
       .withCompressionType(compressionType)
       .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build()
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
index 47409df4d2c..cc36fc7c848 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
@@ -267,9 +267,9 @@ object SchemaUtils {
         builder.withStructColumn(writerOptionsFromSchema(
           structB,
           s,
+          nullable = nullable,
           writeInt96,
-          parquetFieldIdWriteEnabled,
-          nullable = nullable).build())
+          parquetFieldIdWriteEnabled).build())
       case a: ArrayType =>
         builder.withListColumn(
           writerOptionsFromField(
@@ -330,9 +330,9 @@
   def writerOptionsFromSchema[T <: NestedBuilder[T, V], V <: ColumnWriterOptions](
       builder: NestedBuilder[T, V],
       schema: StructType,
+      nullable: Boolean,
       writeInt96: Boolean = false,
-      parquetFieldIdEnabled: Boolean = false,
-      nullable: Boolean = true): T = {
+      parquetFieldIdEnabled: Boolean = false): T = {
     schema.foreach(field =>
       // CUDF has issues if the child of a struct is not-nullable, but the struct itself is
       // So we have to work around it and tell CUDF what it expects.
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
index ce6d273b6c4..4f9f555edf0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala
@@ -195,9 +195,9 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
   override protected val tableWriter: CudfTableWriter = {
     val optionsBuilder = SchemaUtils
       .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+        nullable = false,
         writeInt96 = true, // Hive 1.2 write timestamp as INT96
-        parquetFieldIdEnabled = false,
-        nullable = false)
+        parquetFieldIdEnabled = false)
       .withCompressionType(compType)
     Table.writeParquetChunked(optionsBuilder.build(), this)
   }
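
Note (not part of the patch): after this change, `nullable` is a required third parameter of
`SchemaUtils.writerOptionsFromSchema`, placed ahead of the defaulted `writeInt96` and
`parquetFieldIdEnabled` flags. A minimal Scala sketch of a call site under the reordered
signature, using only names that appear in the hunks above (illustrative, not an additional
change in this patch):

    // Hypothetical caller; mirrors the new parameter order:
    //   writerOptionsFromSchema(builder, schema, nullable, writeInt96 = false, parquetFieldIdEnabled = false)
    val options = SchemaUtils
      .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
        nullable = false,              // now required, passed before the defaulted flags
        writeInt96 = true,             // optional, defaults to false
        parquetFieldIdEnabled = false) // optional, defaults to false
      .build()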