Change the Parquet writer's default_row_group_size_bytes from 128MB to inf #16750

Merged
5 changes: 3 additions & 2 deletions cpp/include/cudf/io/parquet.hpp
@@ -39,8 +39,9 @@ namespace io {
  * @file
  */
 
-constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024;  ///< 128MB per row group
-constexpr size_type default_row_group_size_rows = 1000000;  ///< 1 million rows per row group
+constexpr auto default_row_group_size_bytes =
+  std::numeric_limits<size_t>::max();  ///< Infinite bytes per row group
+constexpr size_type default_row_group_size_rows = 1'000'000;  ///< 1 million rows per row group
 constexpr size_t default_max_page_size_bytes = 512 * 1024;  ///< 512KB per page
 constexpr size_type default_max_page_size_rows = 20000;  ///< 20k rows per page
 constexpr int32_t default_column_index_truncate_length = 64;  ///< truncate to 64 bytes
9 changes: 7 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1819,8 +1819,13 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
 auto const table_size  = std::reduce(column_sizes.begin(), column_sizes.end());
 auto const avg_row_len = util::div_rounding_up_safe<size_t>(table_size, input.num_rows());
 if (avg_row_len > 0) {
-  auto const rg_frag_size = util::div_rounding_up_safe(max_row_group_size, avg_row_len);
-  max_page_fragment_size  = std::min<size_type>(rg_frag_size, max_page_fragment_size);
+  // Ensure `rg_frag_size` is not bigger than size_type::max for default max_row_group_size
+  // value (=uint64::max) to avoid a sign overflow when comparing
+  auto const rg_frag_size =
+    std::min<size_t>(std::numeric_limits<size_type>::max(),
+                     util::div_rounding_up_safe(max_row_group_size, avg_row_len));
+  // Safe comparison as rg_frag_size fits in size_type
+  max_page_fragment_size = std::min<size_type>(rg_frag_size, max_page_fragment_size);
 }
 
 // dividing page size by average row length will tend to overshoot the desired
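The clamp-then-compare logic above can be illustrated with a small Python sketch (plain ints stand in for the C++ fixed-width types; `avg_row_len` and the fragment cap are made-up values, not taken from this PR):

```python
SIZE_TYPE_MAX = 2**31 - 1   # libcudf's size_type is a 32-bit signed int
UINT64_MAX = 2**64 - 1      # the new default_row_group_size_bytes

max_row_group_size = UINT64_MAX  # default: effectively no byte limit
avg_row_len = 100                # hypothetical average row length, in bytes

# util::div_rounding_up_safe -> ceiling division
rg_frag_size = -(-max_row_group_size // avg_row_len)
assert rg_frag_size > SIZE_TYPE_MAX  # far outside size_type's range

# Clamp first, mirroring the std::min<size_t> above, so the later
# size_type comparison cannot overflow
rg_frag_size = min(SIZE_TYPE_MAX, rg_frag_size)

# Now safe: both operands fit in size_type
max_page_fragment_size = min(rg_frag_size, 5000)  # 5000: illustrative cap
print(max_page_fragment_size)  # -> 5000
```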
16 changes: 8 additions & 8 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -438,7 +438,7 @@ def write_parquet(
     object statistics="ROWGROUP",
     object metadata_file_path=None,
     object int96_timestamps=False,
-    object row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
+    object row_group_size_bytes=None,
     object row_group_size_rows=None,
     object max_page_size_bytes=None,
     object max_page_size_rows=None,
@@ -616,9 +616,9 @@ cdef class ParquetWriter:
         Name of the compression to use. Use ``None`` for no compression.
     statistics : {'ROWGROUP', 'PAGE', 'COLUMN', 'NONE'}, default 'ROWGROUP'
         Level at which column statistics should be included in file.
-    row_group_size_bytes: int, default 134217728
+    row_group_size_bytes: int, default ``uint64 max``
         Maximum size of each stripe of the output.
-        By default, 134217728 (128MB) will be used.
+        By default, a virtually infinite size equal to ``uint64 max`` will be used.
     row_group_size_rows: int, default 1000000
         Maximum number of rows of each stripe of the output.
         By default, 1000000 (10^6 rows) will be used.
@@ -661,11 +661,11 @@ cdef class ParquetWriter:

     def __cinit__(self, object filepath_or_buffer, object index=None,
                   object compression="snappy", str statistics="ROWGROUP",
-                  int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
-                  int row_group_size_rows=1000000,
-                  int max_page_size_bytes=524288,
-                  int max_page_size_rows=20000,
-                  int max_dictionary_size=1048576,
+                  size_t row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
+                  size_type row_group_size_rows=1000000,
+                  size_t max_page_size_bytes=524288,
+                  size_type max_page_size_rows=20000,
+                  size_t max_dictionary_size=1048576,
                   bool use_dictionary=True,
                   bool store_schema=False):
         filepaths_or_buffers = (
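For context, a hedged sketch of how the chunked `ParquetWriter` behaves after this change (the import path and row counts are illustrative, not taken from this PR):

```python
import cudf
from cudf.io.parquet import ParquetWriter  # import path may vary by version

df = cudf.DataFrame({"a": range(1_000_000)})

# With the new defaults, row groups are split only by row count
# (row_group_size_rows), no longer by a 128MB byte budget.
writer = ParquetWriter("out.parquet", row_group_size_rows=250_000)
writer.write_table(df)  # expected: 4 row groups of 250k rows each
writer.close()
```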
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/dataframe.py
@@ -6840,7 +6840,7 @@ def to_parquet(
statistics="ROWGROUP",
metadata_file_path=None,
int96_timestamps=False,
row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT,
row_group_size_bytes=None,
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
Expand Down
8 changes: 4 additions & 4 deletions python/cudf/cudf/io/parquet.py
@@ -64,7 +64,7 @@ def _write_parquet(
statistics="ROWGROUP",
metadata_file_path=None,
int96_timestamps=False,
row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT,
row_group_size_bytes=None,
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
@@ -149,7 +149,7 @@ def write_to_dataset(
     return_metadata=False,
     statistics="ROWGROUP",
     int96_timestamps=False,
-    row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT,
+    row_group_size_bytes=None,
     row_group_size_rows=None,
     max_page_size_bytes=None,
     max_page_size_rows=None,
@@ -205,7 +205,7 @@ def write_to_dataset(
         If ``False``, timestamps will not be altered.
     row_group_size_bytes: integer or None, default None
         Maximum size of each stripe of the output.
-        If None, 134217728 (128MB) will be used.
+        If None, no limit on row group stripe size will be used.
     row_group_size_rows: integer or None, default None
         Maximum number of rows of each stripe of the output.
         If None, 1000000 will be used.
@@ -980,7 +980,7 @@ def to_parquet(
statistics="ROWGROUP",
metadata_file_path=None,
int96_timestamps=False,
row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT,
row_group_size_bytes=None,
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
Expand Down
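A short usage sketch of the new Python-side default (file names and sizes are made up): omitting the argument, or passing None, now means no byte-based limit, while an explicit value still caps row-group size as before.

```python
import cudf

df = cudf.DataFrame({"x": range(2_000_000)})

# New default (None): no byte limit; row groups split only at the
# 1,000,000-row default of row_group_size_rows.
df.to_parquet("unlimited.parquet")

# An explicit byte budget still behaves as it did before this PR.
df.to_parquet("capped.parquet", row_group_size_bytes=32 * 1024 * 1024)
```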
12 changes: 4 additions & 8 deletions python/cudf/cudf/utils/ioutils.py
Review discussion:

Contributor: With the new value, does `row_group_size_bytes_val_in_mb` and its use still make sense? Also, should we change the Python default to None? In C++ it would be a breaking change, but Python does not have this limitation.

Member (PR author): None makes the most sense on the Python side. Let's wait for Spark to let us know if they want a specific value in bytes or if infinity (None) is good to go.
@@ -27,7 +27,7 @@
 fsspec_parquet = None
 
 _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
-_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024
+_ROW_GROUP_SIZE_BYTES_DEFAULT = np.iinfo(np.uint64).max
 
 _docstring_remote_sources = """
     - cuDF supports local and remote data stores. See configuration details for
@@ -275,10 +275,9 @@
     timestamp[us] to the int96 format, which is the number of Julian
     days and the number of nanoseconds since midnight of 1970-01-01.
     If ``False``, timestamps will not be altered.
-row_group_size_bytes: integer, default {row_group_size_bytes_val}
+row_group_size_bytes: integer, default None
     Maximum size of each stripe of the output.
-    If None, {row_group_size_bytes_val}
-    ({row_group_size_bytes_val_in_mb} MB) will be used.
+    If None, no limit on row group stripe size will be used.
 row_group_size_rows: integer or None, default None
     Maximum number of rows of each stripe of the output.
     If None, 1000000 will be used.
@@ -346,10 +345,7 @@
 See Also
 --------
 cudf.read_parquet
-""".format(
-    row_group_size_bytes_val=_ROW_GROUP_SIZE_BYTES_DEFAULT,
-    row_group_size_bytes_val_in_mb=_ROW_GROUP_SIZE_BYTES_DEFAULT / 1024 / 1024,
-)
+"""
 doc_to_parquet = docfmt_partial(docstring=_docstring_to_parquet)
 
 _docstring_merge_parquet_filemetadata = """
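The new sentinel can be checked interactively; the Python-side default now mirrors the C++ `std::numeric_limits<size_t>::max()`:

```python
import numpy as np

sentinel = np.iinfo(np.uint64).max
print(sentinel)               # 18446744073709551615
print(sentinel == 2**64 - 1)  # True

# Old default, for comparison: 128 MiB
print(128 * 1024 * 1024)      # 134217728
```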
7 changes: 2 additions & 5 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -23,7 +23,6 @@
 from cudf.io import write_to_dataset
 from cudf.io.parquet import _apply_post_filters, _normalize_filters
 from cudf.utils.dtypes import cudf_dtype_from_pa_type
-from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
 
 
 class CudfEngine(ArrowDatasetEngine):
@@ -341,9 +340,7 @@ def write_partition(
     return_metadata=return_metadata,
     statistics=kwargs.get("statistics", "ROWGROUP"),
     int96_timestamps=kwargs.get("int96_timestamps", False),
-    row_group_size_bytes=kwargs.get(
-        "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT
-    ),
+    row_group_size_bytes=kwargs.get("row_group_size_bytes", None),
     row_group_size_rows=kwargs.get("row_group_size_rows", None),
     max_page_size_bytes=kwargs.get("max_page_size_bytes", None),
     max_page_size_rows=kwargs.get("max_page_size_rows", None),
@@ -365,7 +362,7 @@ def write_partition(
     statistics=kwargs.get("statistics", "ROWGROUP"),
     int96_timestamps=kwargs.get("int96_timestamps", False),
     row_group_size_bytes=kwargs.get(
-        "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT
+        "row_group_size_bytes", None
     ),
     row_group_size_rows=kwargs.get(
         "row_group_size_rows", None
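End to end, the `None` default now flows from dask_cudf through cudf down to libcudf's effectively unlimited byte cap. A hedged sketch (paths and partition counts are illustrative):

```python
import cudf
import dask_cudf

ddf = dask_cudf.from_cudf(cudf.DataFrame({"x": range(1_000_000)}), npartitions=4)

# No row_group_size_bytes kwarg: each partition's writer receives None and
# falls through to the libcudf default (uint64 max, i.e. no byte limit).
ddf.to_parquet("dataset/")

# Callers who relied on the old 128MB cap can restore it explicitly.
ddf.to_parquet("dataset_capped/", row_group_size_bytes=128 * 1024 * 1024)
```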