Add ability to set parquet row group max #rows and #bytes in java (#16805)

Adds the ability to set the maximum number of rows and the maximum number of bytes per row group when writing Parquet files from the Java API.
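
For context, a minimal usage sketch of the new builder options (not part of the commit; the column name, data, output path, and size limits are illustrative, and withNonNullableColumns is assumed from the existing column-declaration helpers):

    import ai.rapids.cudf.ColumnVector;
    import ai.rapids.cudf.ParquetWriterOptions;
    import ai.rapids.cudf.Table;
    import ai.rapids.cudf.TableWriter;

    import java.io.File;

    public class RowGroupSizeExample {
      public static void main(String[] args) {
        // Cap each row group at 10,000 rows or 10 MiB, whichever limit is hit first.
        ParquetWriterOptions options = ParquetWriterOptions.builder()
            .withNonNullableColumns("id")             // hypothetical single-column schema
            .withRowGroupSizeRows(10_000)
            .withRowGroupSizeBytes(10L * 1024 * 1024)
            .build();

        File out = new File("/tmp/example.parquet");  // placeholder output path
        try (ColumnVector id = ColumnVector.fromInts(1, 2, 3);
             Table table = new Table(id);
             TableWriter writer = Table.writeParquetChunked(options, out)) {
          writer.write(table);
        }
      }
    }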

Authors:
  - Paul Mattione (https://github.com/pmattione-nvidia)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Nghia Truong (https://github.com/ttnghia)

URL: #16805
pmattione-nvidia authored Sep 17, 2024
1 parent 86861e0 commit f8d5063
Showing 4 changed files with 80 additions and 30 deletions.
26 changes: 25 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,9 +24,13 @@
*/
public final class ParquetWriterOptions extends CompressionMetadataWriterOptions {
private final StatisticsFrequency statsGranularity;
private int rowGroupSizeRows;
private long rowGroupSizeBytes;

private ParquetWriterOptions(Builder builder) {
super(builder);
this.rowGroupSizeRows = builder.rowGroupSizeRows;
this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
this.statsGranularity = builder.statsGranularity;
}

@@ -51,18 +55,38 @@ public static Builder builder() {
return new Builder();
}

public int getRowGroupSizeRows() {
return rowGroupSizeRows;
}

public long getRowGroupSizeBytes() {
return rowGroupSizeBytes;
}

public StatisticsFrequency getStatisticsFrequency() {
return statsGranularity;
}

public static class Builder extends CompressionMetadataWriterOptions.Builder
<Builder, ParquetWriterOptions> {
private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group
private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group
private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP;

public Builder() {
super();
}

public Builder withRowGroupSizeRows(int rowGroupSizeRows) {
this.rowGroupSizeRows = rowGroupSizeRows;
return this;
}

public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) {
this.rowGroupSizeBytes = rowGroupSizeBytes;
return this;
}

public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) {
this.statsGranularity = statsGranularity;
return this;
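
A quick check of the defaults introduced above (a sketch, not part of the commit; assumes the existing withNonNullableColumns helper and assertions enabled with -ea):

    ParquetWriterOptions defaults = ParquetWriterOptions.builder()
        .withNonNullableColumns("id")
        .build();
    // Unset row group options fall back to the field initializers above.
    assert defaults.getRowGroupSizeRows() == 1_000_000;           // 1 million rows
    assert defaults.getRowGroupSizeBytes() == 128L * 1024 * 1024; // 128 MiB
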
68 changes: 40 additions & 28 deletions java/src/main/java/ai/rapids/cudf/Table.java
@@ -332,20 +332,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames,

/**
* Setup everything to write parquet formatted data to a file.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetFileBegin(String[] columnNames,
@@ -355,6 +357,8 @@ private static native long writeParquetFileBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
@@ -366,20 +370,22 @@ private static native long writeParquetFileBegin(String[] columnNames,

/**
* Setup everything to write parquet formatted data to a buffer.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetBufferBegin(String[] columnNames,
@@ -389,6 +395,8 @@ private static native long writeParquetBufferBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
@@ -1820,6 +1828,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
@@ -1840,6 +1850,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
8 changes: 8 additions & 0 deletions java/src/main/native/src/TableJni.cpp
@@ -2150,6 +2150,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
@@ -2205,6 +2207,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
@@ -2227,6 +2231,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
@@ -2280,6 +2286,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
8 changes: 7 additions & 1 deletion java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -9122,7 +9122,11 @@ void testParquetWriteToBufferChunked() {
columns.add(Columns.STRUCT.name);
WriteUtils.buildWriterOptions(optBuilder, columns);
ParquetWriterOptions options = optBuilder.build();
ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build();
ParquetWriterOptions optionsNoCompress =
optBuilder.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.build();
try (Table table0 = getExpectedFileTable(columns);
MyBufferConsumer consumer = new MyBufferConsumer()) {
try (TableWriter writer = Table.writeParquetChunked(options, consumer)) {
@@ -9208,6 +9212,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
.withDecimalColumn("_c7", 4)
.withDecimalColumn("_c8", 6)
.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE)
.build();
try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {
