Change the default dictionary policy in Parquet writer from `ALWAYS` to `ADAPTIVE` (#15570)

This PR changes the default dictionary policy in the Parquet writer from `ALWAYS` to `ADAPTIVE` and adds a `max_dictionary_size` argument that bounds how large a column-chunk dictionary may grow before `ADAPTIVE` gives up on dictionary encoding for that chunk. Under `ALWAYS`, dictionary encoding was kept even when it forced compression to be disabled, so writing Parquet files with `ZSTD` compression silently fell back to `UNCOMPRESSED`; the new default avoids that fallback and improves performance for several use cases.
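
As a quick sketch of the user-visible result (illustrative only; the file name and data are made up, and `1 << 20` just spells out the 1 MiB default):

```python
import cudf

# Highly repetitive column -- a good candidate for dictionary encoding.
df = cudf.DataFrame({"val": [1024] * 1024})

# Under the new ADAPTIVE default, dictionary encoding is used only while the
# column-chunk dictionary stays within max_dictionary_size; larger chunks drop
# dictionary encoding but remain ZSTD-compressed rather than silently becoming
# UNCOMPRESSED.
df.to_parquet("out.parquet", compression="ZSTD", max_dictionary_size=1 << 20)
```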

Partially closes #15501.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Bradley Dice (https://github.com/bdice)

URL: #15570
mhaseeb123 authored May 11, 2024
1 parent b5a9c4b commit ce1933f
Showing 6 changed files with 88 additions and 11 deletions.
8 changes: 4 additions & 4 deletions cpp/include/cudf/io/parquet.hpp
@@ -564,7 +564,7 @@ class parquet_writer_options {
// Maximum size of min or max values in column index
int32_t _column_index_truncate_length = default_column_index_truncate_length;
// When to use dictionary encoding for data
dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE;
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
@@ -1095,7 +1095,7 @@ class parquet_writer_options_builder {
* dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
* the disabling of compression for columns that would otherwise be compressed.
*
* The default value is dictionary_policy::ALWAYS.
* The default value is dictionary_policy::ADAPTIVE.
*
* @param val policy for dictionary use
* @return this for chaining
@@ -1258,7 +1258,7 @@ class chunked_parquet_writer_options {
// Maximum size of min or max values in column index
int32_t _column_index_truncate_length = default_column_index_truncate_length;
// When to use dictionary encoding for data
dictionary_policy _dictionary_policy = dictionary_policy::ALWAYS;
dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE;
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
@@ -1751,7 +1751,7 @@ class chunked_parquet_writer_options_builder {
* dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in
* the disabling of compression for columns that would otherwise be compressed.
*
* The default value is dictionary_policy::ALWAYS.
* The default value is dictionary_policy::ADAPTIVE.
*
* @param val policy for dictionary use
* @return this for chaining
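The doc comments above spell out the trade-off: `ALWAYS` keeps dictionary encoding even when it forces compression off, while `ADAPTIVE` backs off once the dictionary exceeds `max_dictionary_size`. Through the Python bindings below, `use_dictionary=True` now maps to `ADAPTIVE` (the boolean no longer reaches `ALWAYS`). A small sketch of how one might observe the chosen encoding, reading back the metadata with pyarrow as the new test does (buffer and data are illustrative):

```python
from io import BytesIO

import cudf
import pyarrow.parquet as pq

buf = BytesIO()
cudf.DataFrame({"val": [7] * 1000}).to_parquet(buf, use_dictionary=True)

# Column-chunk metadata records which encodings the writer chose; a small
# dictionary fits under the default 1 MiB cap, so ADAPTIVE keeps it.
encodings = pq.ParquetFile(buf).metadata.row_group(0).column(0).encodings
assert "PLAIN_DICTIONARY" in encodings
```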
14 changes: 12 additions & 2 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -74,6 +74,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
size_type get_row_group_size_rows() except +
size_t get_max_page_size_bytes() except +
size_type get_max_page_size_rows() except +
size_t get_max_dictionary_size() except +

void set_partitions(
vector[cudf_io_types.partition_info] partitions
@@ -103,8 +104,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void set_max_dictionary_size(size_t val) except +
void enable_write_v2_headers(bool val) except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +

@staticmethod
parquet_writer_options_builder builder(
@@ -155,6 +157,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
parquet_writer_options_builder& max_dictionary_size(
size_t val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
@@ -179,6 +184,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
size_type get_row_group_size_rows() except +
size_t get_max_page_size_bytes() except +
size_type get_max_page_size_rows() except +
size_t get_max_dictionary_size() except +

void set_metadata(
cudf_io_types.table_input_metadata m
@@ -202,8 +208,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void set_max_dictionary_size(size_t val) except +
void enable_write_v2_headers(bool val) except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +

@staticmethod
chunked_parquet_writer_options_builder builder(
@@ -245,6 +252,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
chunked_parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
chunked_parquet_writer_options_builder& max_dictionary_size(
size_t val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
28 changes: 25 additions & 3 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -399,6 +399,7 @@ def write_parquet(
object row_group_size_rows=None,
object max_page_size_bytes=None,
object max_page_size_rows=None,
object max_dictionary_size=None,
object partitions_info=None,
object force_nullable_schema=False,
header_version="1.0",
@@ -478,7 +479,7 @@
)

dict_policy = (
cudf_io_types.dictionary_policy.ALWAYS
cudf_io_types.dictionary_policy.ADAPTIVE
if use_dictionary
else cudf_io_types.dictionary_policy.NEVER
)
@@ -528,6 +529,8 @@
args.set_max_page_size_bytes(max_page_size_bytes)
if max_page_size_rows is not None:
args.set_max_page_size_rows(max_page_size_rows)
if max_dictionary_size is not None:
args.set_max_dictionary_size(max_dictionary_size)

with nogil:
out_metadata_c = move(parquet_writer(args))
@@ -571,7 +574,14 @@ cdef class ParquetWriter:
max_page_size_rows: int, default 20000
Maximum number of rows of each page of the output.
By default, 20000 will be used.
max_dictionary_size: int, default 1048576
Maximum size of the dictionary page for each output column chunk. Dictionary
encoding for column chunks that exceed this limit will be disabled.
By default, 1048576 (1MB) will be used.
use_dictionary : bool, default True
If ``True``, enable dictionary encoding for Parquet page data
subject to ``max_dictionary_size`` constraints.
If ``False``, disable dictionary encoding for Parquet page data.
See Also
--------
cudf.io.parquet.write_parquet
@@ -588,13 +598,17 @@ cdef class ParquetWriter:
cdef size_type row_group_size_rows
cdef size_t max_page_size_bytes
cdef size_type max_page_size_rows
cdef size_t max_dictionary_size
cdef cudf_io_types.dictionary_policy dict_policy

def __cinit__(self, object filepath_or_buffer, object index=None,
object compression="snappy", str statistics="ROWGROUP",
int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
int row_group_size_rows=1000000,
int max_page_size_bytes=524288,
int max_page_size_rows=20000):
int max_page_size_rows=20000,
int max_dictionary_size=1048576,
bool use_dictionary=True):
filepaths_or_buffers = (
list(filepath_or_buffer)
if is_list_like(filepath_or_buffer)
@@ -609,6 +623,12 @@
self.row_group_size_rows = row_group_size_rows
self.max_page_size_bytes = max_page_size_bytes
self.max_page_size_rows = max_page_size_rows
self.max_dictionary_size = max_dictionary_size
self.dict_policy = (
cudf_io_types.dictionary_policy.ADAPTIVE
if use_dictionary
else cudf_io_types.dictionary_policy.NEVER
)

def write_table(self, table, object partitions_info=None):
""" Writes a single table to the file """
@@ -726,8 +746,10 @@ cdef class ParquetWriter:
.row_group_size_rows(self.row_group_size_rows)
.max_page_size_bytes(self.max_page_size_bytes)
.max_page_size_rows(self.max_page_size_rows)
.max_dictionary_size(self.max_dictionary_size)
.build()
)
args.set_dictionary_policy(self.dict_policy)
self.writer.reset(new cpp_parquet_chunked_writer(args))
self.initialized = True

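The chunked `ParquetWriter` picks up the same two knobs via `__cinit__` above. A usage sketch mirroring the new test added below (the `cudf.io.parquet` import path is assumed from the test module; data is illustrative):

```python
from io import BytesIO

import cudf
from cudf.io.parquet import ParquetWriter  # assumed import path, as in the tests

buf = BytesIO()
df = cudf.DataFrame({"val": [1024] * 1024})

# use_dictionary=True selects ADAPTIVE; max_dictionary_size caps the per-chunk
# dictionary, and a cap of 0 disables dictionary encoding in practice.
writer = ParquetWriter(buf, use_dictionary=True, max_dictionary_size=1 << 20)
writer.write_table(df)
writer.close()
```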
4 changes: 4 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -63,6 +63,7 @@ def _write_parquet(
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
max_dictionary_size=None,
partitions_info=None,
storage_options=None,
force_nullable_schema=False,
@@ -96,6 +97,7 @@
"row_group_size_rows": row_group_size_rows,
"max_page_size_bytes": max_page_size_bytes,
"max_page_size_rows": max_page_size_rows,
"max_dictionary_size": max_dictionary_size,
"partitions_info": partitions_info,
"force_nullable_schema": force_nullable_schema,
"header_version": header_version,
@@ -898,6 +900,7 @@ def to_parquet(
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
max_dictionary_size=None,
storage_options=None,
return_metadata=False,
force_nullable_schema=False,
@@ -974,6 +977,7 @@
row_group_size_rows=row_group_size_rows,
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
max_dictionary_size=max_dictionary_size,
partitions_info=partition_info,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
37 changes: 37 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -1890,6 +1890,43 @@ def test_parquet_writer_max_page_size(tmpdir, max_page_size_kwargs):
assert s1 > s2


@pytest.mark.parametrize("use_dict", [False, True])
@pytest.mark.parametrize("max_dict_size", [0, 1048576])
def test_parquet_writer_dictionary_setting(use_dict, max_dict_size):
# Simple test for checking the validity of dictionary encoding setting
# and behavior of ParquetWriter in cudf.
# Write a table with repetitive data with varying dictionary settings.
# Make sure the written columns are dictionary-encoded accordingly.

# Table with repetitive data
table = cudf.DataFrame(
{
"int32": cudf.Series([1024] * 1024, dtype="int64"),
}
)

# Write to Parquet using ParquetWriter
buffer = BytesIO()
writer = ParquetWriter(
buffer,
use_dictionary=use_dict,
max_dictionary_size=max_dict_size,
)
writer.write_table(table)
writer.close()

# Read encodings from parquet file
got = pq.ParquetFile(buffer)
encodings = got.metadata.row_group(0).column(0).encodings

# Check for `PLAIN_DICTIONARY` encoding if dictionary encoding enabled
# and dictionary page limit > 0
if use_dict is True and max_dict_size > 0:
assert "PLAIN_DICTIONARY" in encodings
else:
assert "PLAIN_DICTIONARY" not in encodings


@pytest.mark.parametrize("filename", ["myfile.parquet", None])
@pytest.mark.parametrize("cols", [["b"], ["c", "b"]])
def test_parquet_partitioned(tmpdir_factory, cols, filename):
8 changes: 6 additions & 2 deletions python/cudf/cudf/utils/ioutils.py
@@ -278,6 +278,10 @@
max_page_size_rows: integer or None, default None
Maximum number of rows of each page of the output.
If None, 20000 will be used.
max_dictionary_size: integer or None, default None
Maximum size of the dictionary page for each output column chunk. Dictionary
encoding for column chunks that exceed this limit will be disabled.
If None, 1048576 (1MB) will be used.
storage_options : dict, optional, default None
Extra options that make sense for a particular storage connection,
e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value
@@ -292,8 +296,8 @@
``return_metadata=True`` instead of specifying ``metadata_file_path``
use_dictionary : bool, default True
When ``False``, prevents the use of dictionary encoding for Parquet page
data. When ``True``, dictionary encoding is preferred when not disabled due
to dictionary size constraints.
data. When ``True``, dictionary encoding is preferred subject to
``max_dictionary_size`` constraints.
header_version : {{'1.0', '2.0'}}, default "1.0"
Controls whether to use version 1.0 or version 2.0 page headers when
encoding. Version 1.0 is more portable, but version 2.0 enables the
