diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java index 7b58817550d..8c8180436e6 100644 --- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,9 +24,13 @@ */ public final class ParquetWriterOptions extends CompressionMetadataWriterOptions { private final StatisticsFrequency statsGranularity; + private int rowGroupSizeRows; + private long rowGroupSizeBytes; private ParquetWriterOptions(Builder builder) { super(builder); + this.rowGroupSizeRows = builder.rowGroupSizeRows; + this.rowGroupSizeBytes = builder.rowGroupSizeBytes; this.statsGranularity = builder.statsGranularity; } @@ -51,18 +55,38 @@ public static Builder builder() { return new Builder(); } + public int getRowGroupSizeRows() { + return rowGroupSizeRows; + } + + public long getRowGroupSizeBytes() { + return rowGroupSizeBytes; + } + public StatisticsFrequency getStatisticsFrequency() { return statsGranularity; } public static class Builder extends CompressionMetadataWriterOptions.Builder { + private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group + private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP; public Builder() { super(); } + public Builder withRowGroupSizeRows(int rowGroupSizeRows) { + this.rowGroupSizeRows = rowGroupSizeRows; + return this; + } + + public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) { + this.rowGroupSizeBytes = rowGroupSizeBytes; + return this; + } + public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) { this.statsGranularity = statsGranularity; return this; diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index cbb126d7ee5..09da43374ae 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -332,20 +332,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames, /** * Setup everything to write parquet formatted data to a file. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param filename local output path + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param filename local output path * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetFileBegin(String[] columnNames, @@ -355,6 +357,8 @@ private static native long writeParquetFileBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -366,20 +370,22 @@ private static native long writeParquetFileBegin(String[] columnNames, /** * Setup everything to write parquet formatted data to a buffer. - * @param columnNames names that correspond to the table columns - * @param numChildren Children of the top level - * @param flatNumChildren flattened list of children per column - * @param nullable true if the column can have nulls else false - * @param metadataKeys Metadata key names to place in the Parquet file - * @param metadataValues Metadata values corresponding to metadataKeys - * @param compression native compression codec ID - * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 - * @param precisions precision list containing all the precisions of the decimal types in - * the columns - * @param isMapValues true if a column is a map - * @param isBinaryValues true if a column is a binary - * @param consumer consumer of host buffers produced. + * @param columnNames names that correspond to the table columns + * @param numChildren Children of the top level + * @param flatNumChildren flattened list of children per column + * @param nullable true if the column can have nulls else false + * @param metadataKeys Metadata key names to place in the Parquet file + * @param metadataValues Metadata values corresponding to metadataKeys + * @param compression native compression codec ID + * @param rowGroupSizeRows max #rows in a row group + * @param rowGroupSizeBytes max #bytes in a row group + * @param statsFreq native statistics frequency ID + * @param isInt96 true if timestamp type is int96 + * @param precisions precision list containing all the precisions of the decimal types in + * the columns + * @param isMapValues true if a column is a map + * @param isBinaryValues true if a column is a binary + * @param consumer consumer of host buffers produced. * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd. */ private static native long writeParquetBufferBegin(String[] columnNames, @@ -389,6 +395,8 @@ private static native long writeParquetBufferBegin(String[] columnNames, String[] metadataKeys, String[] metadataValues, int compression, + int rowGroupSizeRows, + long rowGroupSizeBytes, int statsFreq, boolean[] isInt96, int[] precisions, @@ -1820,6 +1828,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), @@ -1840,6 +1850,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons options.getMetadataKeys(), options.getMetadataValues(), options.getCompressionType().nativeId, + options.getRowGroupSizeRows(), + options.getRowGroupSizeBytes(), options.getStatisticsFrequency().nativeId, options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 40a111209b0..92e213bcb60 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -2150,6 +2150,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2205,6 +2207,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) @@ -2227,6 +2231,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, + jint j_row_group_size_rows, + jlong j_row_group_size_bytes, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, @@ -2280,6 +2286,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, chunked_parquet_writer_options::builder(sink) .metadata(std::move(metadata)) .compression(static_cast(j_compression)) + .row_group_size_rows(j_row_group_size_rows) + .row_group_size_bytes(j_row_group_size_bytes) .stats_level(static_cast(j_stats_freq)) .key_value_metadata({kv_metadata}) .compression_statistics(stats) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 56fe63598d9..830f2b33b32 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -9122,7 +9122,11 @@ void testParquetWriteToBufferChunked() { columns.add(Columns.STRUCT.name); WriteUtils.buildWriterOptions(optBuilder, columns); ParquetWriterOptions options = optBuilder.build(); - ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build(); + ParquetWriterOptions optionsNoCompress = + optBuilder.withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) + .build(); try (Table table0 = getExpectedFileTable(columns); MyBufferConsumer consumer = new MyBufferConsumer()) { try (TableWriter writer = Table.writeParquetChunked(options, consumer)) { @@ -9208,6 +9212,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { .withDecimalColumn("_c7", 4) .withDecimalColumn("_c8", 6) .withCompressionType(CompressionType.NONE) + .withRowGroupSizeRows(10000) + .withRowGroupSizeBytes(10000) .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE) .build(); try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {