Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to set parquet row group max #rows and #bytes in java #16805

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
*/
public final class ParquetWriterOptions extends CompressionMetadataWriterOptions {
private final StatisticsFrequency statsGranularity;
private int rowGroupSizeRows;
pmattione-nvidia marked this conversation as resolved.
Show resolved Hide resolved
private long rowGroupSizeBytes;

private ParquetWriterOptions(Builder builder) {
super(builder);
this.rowGroupSizeRows = builder.rowGroupSizeRows;
this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
this.statsGranularity = builder.statsGranularity;
}

Expand All @@ -51,18 +55,38 @@ public static Builder builder() {
return new Builder();
}

public int getRowGroupSizeRows() {
return rowGroupSizeRows;
}

public long getRowGroupSizeBytes() {
return rowGroupSizeBytes;
}

public StatisticsFrequency getStatisticsFrequency() {
return statsGranularity;
}

public static class Builder extends CompressionMetadataWriterOptions.Builder
<Builder, ParquetWriterOptions> {
private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group
private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group
private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP;

public Builder() {
super();
}

public Builder withRowGroupSizeRows(int rowGroupSizeRows) {
this.rowGroupSizeRows = rowGroupSizeRows;
return this;
}

public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) {
this.rowGroupSizeBytes = rowGroupSizeBytes;
return this;
}

public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) {
this.statsGranularity = statsGranularity;
return this;
Expand Down
68 changes: 40 additions & 28 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,20 +315,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames,

/**
* Setup everything to write parquet formatted data to a file.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetFileBegin(String[] columnNames,
Expand All @@ -338,6 +340,8 @@ private static native long writeParquetFileBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
Expand All @@ -349,20 +353,22 @@ private static native long writeParquetFileBegin(String[] columnNames,

/**
* Setup everything to write parquet formatted data to a buffer.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetBufferBegin(String[] columnNames,
Expand All @@ -372,6 +378,8 @@ private static native long writeParquetBufferBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
Expand Down Expand Up @@ -1773,6 +1781,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
Expand All @@ -1793,6 +1803,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
Expand Down
8 changes: 8 additions & 0 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
Expand Down Expand Up @@ -2164,6 +2166,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
Expand All @@ -2186,6 +2190,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
Expand Down Expand Up @@ -2239,6 +2245,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
Expand Down
8 changes: 7 additions & 1 deletion java/src/test/java/ai/rapids/cudf/TableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8921,7 +8921,11 @@ void testParquetWriteToBufferChunked() {
columns.add(Columns.STRUCT.name);
WriteUtils.buildWriterOptions(optBuilder, columns);
ParquetWriterOptions options = optBuilder.build();
ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build();
ParquetWriterOptions optionsNoCompress =
optBuilder.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.build();
try (Table table0 = getExpectedFileTable(columns);
MyBufferConsumer consumer = new MyBufferConsumer()) {
try (TableWriter writer = Table.writeParquetChunked(options, consumer)) {
Expand Down Expand Up @@ -9007,6 +9011,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
.withDecimalColumn("_c7", 4)
.withDecimalColumn("_c8", 6)
.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE)
.build();
try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {
Expand Down
Loading