From e0d87ef1d6e6c008b4b1bec935687df604158784 Mon Sep 17 00:00:00 2001
From: Paul Mattione <pmattione@nvidia.com>
Date: Sun, 15 Sep 2024 12:58:07 -0400
Subject: [PATCH 1/3] Add ability to set parquet row group max #rows and #bytes
 in java

---
 .../ai/rapids/cudf/ParquetWriterOptions.java  | 24 +++++++
 java/src/main/java/ai/rapids/cudf/Table.java  | 68 +++++++++++--------
 java/src/main/native/src/TableJni.cpp         |  8 +++
 .../test/java/ai/rapids/cudf/TableTest.java   |  8 ++-
 4 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
index 7b58817550d..744a10c5ca9 100644
--- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
+++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
@@ -24,9 +24,13 @@
  */
 public final class ParquetWriterOptions extends CompressionMetadataWriterOptions {
   private final StatisticsFrequency statsGranularity;
+  private int rowGroupSizeRows;
+  private long rowGroupSizeBytes;
 
   private ParquetWriterOptions(Builder builder) {
     super(builder);
+    this.rowGroupSizeRows = builder.rowGroupSizeRows;
+    this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
     this.statsGranularity = builder.statsGranularity;
   }
 
@@ -51,18 +55,38 @@ public static Builder builder() {
     return new Builder();
   }
 
+  public int getRowGroupSizeRows() {
+    return rowGroupSizeRows;
+  }
+
+  public long getRowGroupSizeBytes() {
+    return rowGroupSizeBytes;
+  }
+
   public StatisticsFrequency getStatisticsFrequency() {
     return statsGranularity;
   }
 
   public static class Builder extends CompressionMetadataWriterOptions.Builder
         <Builder, ParquetWriterOptions> {
+    private int rowGroupSizeRows = 1000000; // Max of 1 million rows per row group
+    private long rowGroupSizeBytes = 128 * 1024 * 1024; // Max of 128 MB per row group
     private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP;
 
     public Builder() {
       super();
     }
 
+    public Builder withRowGroupSizeRows(int rowGroupSizeRows) {
+      this.rowGroupSizeRows = rowGroupSizeRows;
+      return this;
+    }
+
+    public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) {
+      this.rowGroupSizeBytes = rowGroupSizeBytes;
+      return this;
+    }
+
     public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) {
       this.statsGranularity = statsGranularity;
       return this;
diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java
index 36e342cae13..715ce6b1bac 100644
--- a/java/src/main/java/ai/rapids/cudf/Table.java
+++ b/java/src/main/java/ai/rapids/cudf/Table.java
@@ -315,20 +315,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames,
 
   /**
    * Setup everything to write parquet formatted data to a file.
-   * @param columnNames     names that correspond to the table columns
-   * @param numChildren     Children of the top level
-   * @param flatNumChildren flattened list of children per column
-   * @param nullable        true if the column can have nulls else false
-   * @param metadataKeys    Metadata key names to place in the Parquet file
-   * @param metadataValues  Metadata values corresponding to metadataKeys
-   * @param compression     native compression codec ID
-   * @param statsFreq       native statistics frequency ID
-   * @param isInt96         true if timestamp type is int96
-   * @param precisions      precision list containing all the precisions of the decimal types in
-   *                        the columns
-   * @param isMapValues     true if a column is a map
-   * @param isBinaryValues  true if a column is a binary
-   * @param filename        local output path
+   * @param columnNames       names that correspond to the table columns
+   * @param numChildren       Children of the top level
+   * @param flatNumChildren   flattened list of children per column
+   * @param nullable          true if the column can have nulls else false
+   * @param metadataKeys      Metadata key names to place in the Parquet file
+   * @param metadataValues    Metadata values corresponding to metadataKeys
+   * @param compression       native compression codec ID
+   * @param rowGroupSizeRows  max #rows in a row group
+   * @param rowGroupSizeBytes max #bytes in a row group
+   * @param statsFreq         native statistics frequency ID
+   * @param isInt96           true if timestamp type is int96
+   * @param precisions        precision list containing all the precisions of the decimal types in
+   *                          the columns
+   * @param isMapValues       true if a column is a map
+   * @param isBinaryValues    true if a column is a binary
+   * @param filename          local output path
    * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
    */
   private static native long writeParquetFileBegin(String[] columnNames,
@@ -338,6 +340,8 @@ private static native long writeParquetFileBegin(String[] columnNames,
                                                    String[] metadataKeys,
                                                    String[] metadataValues,
                                                    int compression,
+                                                   int rowGroupSizeRows,
+                                                   long rowGroupSizeBytes,
                                                    int statsFreq,
                                                    boolean[] isInt96,
                                                    int[] precisions,
@@ -349,20 +353,22 @@ private static native long writeParquetFileBegin(String[] columnNames,
 
   /**
    * Setup everything to write parquet formatted data to a buffer.
-   * @param columnNames     names that correspond to the table columns
-   * @param numChildren     Children of the top level
-   * @param flatNumChildren flattened list of children per column
-   * @param nullable        true if the column can have nulls else false
-   * @param metadataKeys    Metadata key names to place in the Parquet file
-   * @param metadataValues  Metadata values corresponding to metadataKeys
-   * @param compression     native compression codec ID
-   * @param statsFreq       native statistics frequency ID
-   * @param isInt96         true if timestamp type is int96
-   * @param precisions      precision list containing all the precisions of the decimal types in
-   *                        the columns
-   * @param isMapValues     true if a column is a map
-   * @param isBinaryValues  true if a column is a binary
-   * @param consumer        consumer of host buffers produced.
+   * @param columnNames       names that correspond to the table columns
+   * @param numChildren       Children of the top level
+   * @param flatNumChildren   flattened list of children per column
+   * @param nullable          true if the column can have nulls else false
+   * @param metadataKeys      Metadata key names to place in the Parquet file
+   * @param metadataValues    Metadata values corresponding to metadataKeys
+   * @param compression       native compression codec ID
+   * @param rowGroupSizeRows  max #rows in a row group
+   * @param rowGroupSizeBytes max #bytes in a row group
+   * @param statsFreq         native statistics frequency ID
+   * @param isInt96           true if timestamp type is int96
+   * @param precisions        precision list containing all the precisions of the decimal types in
+   *                          the columns
+   * @param isMapValues       true if a column is a map
+   * @param isBinaryValues    true if a column is a binary
+   * @param consumer          consumer of host buffers produced.
    * @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
    */
   private static native long writeParquetBufferBegin(String[] columnNames,
@@ -372,6 +378,8 @@ private static native long writeParquetBufferBegin(String[] columnNames,
                                                      String[] metadataKeys,
                                                      String[] metadataValues,
                                                      int compression,
+                                                     int rowGroupSizeRows,
+                                                     long rowGroupSizeBytes,
                                                      int statsFreq,
                                                      boolean[] isInt96,
                                                      int[] precisions,
@@ -1773,6 +1781,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
           options.getMetadataKeys(),
           options.getMetadataValues(),
           options.getCompressionType().nativeId,
+          options.getRowGroupSizeRows(),
+          options.getRowGroupSizeBytes(),
           options.getStatisticsFrequency().nativeId,
           options.getFlatIsTimeTypeInt96(),
           options.getFlatPrecision(),
@@ -1793,6 +1803,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
           options.getMetadataKeys(),
           options.getMetadataValues(),
           options.getCompressionType().nativeId,
+          options.getRowGroupSizeRows(),
+          options.getRowGroupSizeBytes(),
           options.getStatisticsFrequency().nativeId,
           options.getFlatIsTimeTypeInt96(),
           options.getFlatPrecision(),
diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp
index c749c8c84bf..4888caa3b93 100644
--- a/java/src/main/native/src/TableJni.cpp
+++ b/java/src/main/native/src/TableJni.cpp
@@ -2109,6 +2109,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
                                                   jobjectArray j_metadata_keys,
                                                   jobjectArray j_metadata_values,
                                                   jint j_compression,
+                                                  jint j_row_group_size_rows,
+                                                  jlong j_row_group_size_bytes,
                                                   jint j_stats_freq,
                                                   jbooleanArray j_isInt96,
                                                   jintArray j_precisions,
@@ -2164,6 +2166,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
       chunked_parquet_writer_options::builder(sink)
         .metadata(std::move(metadata))
         .compression(static_cast<compression_type>(j_compression))
+        .row_group_size_rows(j_row_group_size_rows)
+        .row_group_size_bytes(j_row_group_size_bytes)
         .stats_level(static_cast<statistics_freq>(j_stats_freq))
         .key_value_metadata({kv_metadata})
         .compression_statistics(stats)
@@ -2186,6 +2190,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
                                                 jobjectArray j_metadata_keys,
                                                 jobjectArray j_metadata_values,
                                                 jint j_compression,
+                                                jint j_row_group_size_rows,
+                                                jlong j_row_group_size_bytes,
                                                 jint j_stats_freq,
                                                 jbooleanArray j_isInt96,
                                                 jintArray j_precisions,
@@ -2239,6 +2245,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
       chunked_parquet_writer_options::builder(sink)
         .metadata(std::move(metadata))
         .compression(static_cast<compression_type>(j_compression))
+        .row_group_size_rows(j_row_group_size_rows)
+        .row_group_size_bytes(j_row_group_size_bytes)
         .stats_level(static_cast<statistics_freq>(j_stats_freq))
         .key_value_metadata({kv_metadata})
         .compression_statistics(stats)
diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index 050bcbb268f..1fd4e87b015 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -8921,7 +8921,11 @@ void testParquetWriteToBufferChunked() {
     columns.add(Columns.STRUCT.name);
     WriteUtils.buildWriterOptions(optBuilder, columns);
     ParquetWriterOptions options = optBuilder.build();
-    ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build();
+    ParquetWriterOptions optionsNoCompress =
+      optBuilder.withCompressionType(CompressionType.NONE)
+      .withRowGroupSizeRows(4)
+      .withRowGroupSizeBytes(64)
+      .build();
     try (Table table0 = getExpectedFileTable(columns);
          MyBufferConsumer consumer = new MyBufferConsumer()) {
       try (TableWriter writer = Table.writeParquetChunked(options, consumer)) {
@@ -9007,6 +9011,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
           .withDecimalColumn("_c7", 4)
           .withDecimalColumn("_c8", 6)
           .withCompressionType(CompressionType.NONE)
+          .withRowGroupSizeRows(4)
+          .withRowGroupSizeBytes(64)
           .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE)
           .build();
       try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {

From ec1a4829b7904ea90a9ba83b7e9ef875c7cf2e7a Mon Sep 17 00:00:00 2001
From: Paul Mattione <pmattione@nvidia.com>
Date: Sun, 15 Sep 2024 19:09:03 -0400
Subject: [PATCH 2/3] Increase limits

---
 java/src/test/java/ai/rapids/cudf/TableTest.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index 1fd4e87b015..ca7e93c630c 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -8923,8 +8923,8 @@ void testParquetWriteToBufferChunked() {
     ParquetWriterOptions options = optBuilder.build();
     ParquetWriterOptions optionsNoCompress =
       optBuilder.withCompressionType(CompressionType.NONE)
-      .withRowGroupSizeRows(4)
-      .withRowGroupSizeBytes(64)
+      .withRowGroupSizeRows(10000)
+      .withRowGroupSizeBytes(10000)
       .build();
     try (Table table0 = getExpectedFileTable(columns);
          MyBufferConsumer consumer = new MyBufferConsumer()) {
@@ -9011,8 +9011,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
           .withDecimalColumn("_c7", 4)
           .withDecimalColumn("_c8", 6)
           .withCompressionType(CompressionType.NONE)
-          .withRowGroupSizeRows(4)
-          .withRowGroupSizeBytes(64)
+          .withRowGroupSizeRows(10000)
+          .withRowGroupSizeBytes(10000)
           .withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE)
           .build();
       try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {

From 9b2c24cac77dac71c41e9cf3518bf912c68ce03a Mon Sep 17 00:00:00 2001
From: Paul Mattione <pmattione@nvidia.com>
Date: Mon, 16 Sep 2024 13:39:18 -0400
Subject: [PATCH 3/3] Update copyright year

---
 java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
index 744a10c5ca9..8c8180436e6 100644
--- a/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
+++ b/java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
@@ -1,6 +1,6 @@
 /*
  *
- *  Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ *  Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.