From e0d87ef1d6e6c008b4b1bec935687df604158784 Mon Sep 17 00:00:00 2001 From: Paul Mattione <pmattione@nvidia.com> Date: Sun, 15 Sep 2024 12:58:07 -0400 Subject: [PATCH 1/3] Add ability to set parquet row group max #rows and #bytes in java --- .../ai/rapids/cudf/ParquetWriterOptions.java | 24 +++++++ java/src/main/java/ai/rapids/cudf/Table.java | 68 +++++++++++-------- java/src/main/native/src/TableJni.cpp | 8 +++ .../test/java/ai/rapids/cudf/TableTest.java | 8 ++- 4 files changed, 79 insertions(+), 29 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java index 7b58817550d..744a10c5ca9 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java @@ -24,9 +24,13 @@ */ public final class ParquetWriterOptions extends CompressionMetadataWriterOptions { private final StatisticsFrequency statsGranularity; + private int rowGroupSizeRows; + private long rowGroupSizeBytes; private ParquetWriterOptions(Builder builder) { super(builder); + this.rowGroupSizeRows = builder.rowGroupSizeRows; + this.rowGroupSizeBytes = builder.rowGroupSizeBytes; this.statsGranularity = builder.statsGranularity; } @@ -51,18 +55,38 @@ public static Builder builder() { return new Builder(); } + public int getRowGroupSizeRows() { + return rowGroupSizeRows; + } + + public long getRowGroupSizeBytes() { + return rowGroupSizeBytes; + } + public StatisticsFrequency getStatisticsFrequency() { return statsGranularity; } public static class Builder extends CompressionMetadataWriterOptions.Builder <Builder, ParquetWriterOptions> { + private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group + private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP; public Builder() { super(); } + public Builder withRowGroupSizeRows(int rowGroupSizeRows) { + this.rowGroupSizeRows = rowGroupSizeRows; + return this; + } + + public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) { + this.rowGroupSizeBytes = rowGroupSizeBytes; + return this; + } + public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) { this.statsGranularity = statsGranularity; return this; diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 36e342cae13..715ce6b1bac 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -315,20 +315,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames, /** * Setup everything to write parquet formatted data to a file. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param filename local output path + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param filename local output path * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetFileBegin(String[] columnNames, @@ -338,6 +340,8 @@ private static native long writeParquetFileBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -349,20 +353,22 @@ private static native long writeParquetFileBegin(String[] columnNames, /** * Setup everything to write parquet formatted data to a buffer. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param consumer consumer of host buffers produced. + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param consumer consumer of host buffers produced. * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetBufferBegin(String[] columnNames, @@ -372,6 +378,8 @@ private static native long writeParquetBufferBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -1773,6 +1781,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), @@ -1793,6 +1803,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index c749c8c84bf..4888caa3b93 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -2109,6 +2109,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2164,6 +2166,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast<compression_type>(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast<statistics_freq>(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) @@ -2186,6 +2190,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2239,6 +2245,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast<compression_type>(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast<statistics_freq>(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 050bcbb268f..1fd4e87b015 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -8921,7 +8921,11 @@ void testParquetWriteToBufferChunked() { columns.add(Columns.STRUCT.name); WriteUtils.buildWriterOptions(optBuilder, columns); ParquetWriterOptions options = optBuilder.build(); - ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build(); + ParquetWriterOptions optionsNoCompress = + optBuilder.withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(4) + .withRowGroupSizeBytes(64) + .build(); try (Table table0 = getExpectedFileTable(columns); MyBufferConsumer consumer = new MyBufferConsumer()) { try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { @@ -9007,6 +9011,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { .withDecimalColumn("_c7", 4) .withDecimalColumn("_c8", 6) .withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(4) + .withRowGroupSizeBytes(64) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { From ec1a4829b7904ea90a9ba83b7e9ef875c7cf2e7a Mon Sep 17 00:00:00 2001 From: Paul Mattione <pmattione@nvidia.com> Date: Sun, 15 Sep 2024 19:09:03 -0400 Subject: [PATCH 2/3] Increase limits --- java/src/test/java/ai/rapids/cudf/TableTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 1fd4e87b015..ca7e93c630c 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -8923,8 +8923,8 @@ void testParquetWriteToBufferChunked() { ParquetWriterOptions options = optBuilder.build(); ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE) - .withRowGroupSizeRows(4) - .withRowGroupSizeBytes(64) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) .build(); try (Table table0 = getExpectedFileTable(columns); MyBufferConsumer consumer = new MyBufferConsumer()) { @@ -9011,8 +9011,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { .withDecimalColumn("_c7", 4) .withDecimalColumn("_c8", 6) .withCompressionType(CompressionType.NONE) - .withRowGroupSizeRows(4) - .withRowGroupSizeBytes(64) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) { From 9b2c24cac77dac71c41e9cf3518bf912c68ce03a Mon Sep 17 00:00:00 2001 From: Paul Mattione <pmattione@nvidia.com> Date: Mon, 16 Sep 2024 13:39:18 -0400 Subject: [PATCH 3/3] Update copyright year --- java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java index 744a10c5ca9..8c8180436e6 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.