Skip to content

Commit

Permalink
Add python tests for Parquet DELTA_BINARY_PACKED encoder (#14316)
Browse files Browse the repository at this point in the history
During the review of #14100 there was a suggestion to add a test of writing using cudf and then reading the resulting file back with pyarrow. This PR adds the necessary python bindings to perform this test.

NOTE: there is currently an issue with encoding 32-bit values where the deltas exceed 32-bits. parquet-mr and arrow truncate the deltas for the INT32 physical type and allow values to overflow, whereas cudf currently uses 64-bit deltas, which avoids the overflow, but can result in requiring 33-bits when encoding. The current cudf behavior is allowed by the specification (and in fact is readable by parquet-mr), but using the extra bit is not in the Parquet spirit of least output file size. This will be addressed in follow-on work.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - MithunR (https://github.com/mythrocks)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #14316
  • Loading branch information
etseidl authored Nov 8, 2023
1 parent a35f90c commit c4e6c09
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 28 deletions.
6 changes: 3 additions & 3 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ class writer_compression_statistics {
* @brief Control use of dictionary encoding for parquet writer
*/
enum dictionary_policy {
NEVER, ///< Never use dictionary encoding
ADAPTIVE, ///< Use dictionary when it will not impact compression
ALWAYS ///< Use dictionary reqardless of impact on compression
NEVER = 0, ///< Never use dictionary encoding
ADAPTIVE = 1, ///< Use dictionary when it will not impact compression
ALWAYS = 2 ///< Use dictionary regardless of impact on compression
};

/**
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_enc.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ inline __device__ void put_zz128(uint8_t*& p, zigzag128_t v)
// too much shared memory.
// The parquet spec requires block_size to be a multiple of 128, and values_per_mini_block
// to be a multiple of 32.
// TODO: if these are ever made configurable, be sure to fix the page size calculation in
// delta_data_len() (page_enc.cu).
constexpr int block_size = 128;
constexpr int num_mini_blocks = 4;
constexpr int values_per_mini_block = block_size / num_mini_blocks;
Expand Down
14 changes: 11 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,20 @@ __device__ size_t delta_data_len(Type physical_type, cudf::type_id type_id, uint

auto const vals_per_block = delta::block_size;
size_t const num_blocks = util::div_rounding_up_unsafe(num_values, vals_per_block);
// need max dtype_len + 1 bytes for min_delta
// need max dtype_len + 1 bytes for min_delta (because we only encode 7 bits per byte)
// one byte per mini block for the bitwidth
// and block_size * dtype_len bytes for the actual encoded data
auto const block_size = dtype_len + 1 + delta::num_mini_blocks + vals_per_block * dtype_len;
auto const mini_block_header_size = dtype_len + 1 + delta::num_mini_blocks;
// each encoded value can be at most sizeof(type) * 8 + 1 bits
auto const max_bits = dtype_len * 8 + 1;
// each data block will then be max_bits * values per block. vals_per_block is guaranteed to be
// divisible by 128 (via static assert on delta::block_size), but do safe division anyway.
auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8);
auto const block_size = mini_block_header_size + bytes_per_block;

// delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks,
// max 5 bytes for number of values, and max dtype_len + 1 for first value.
// TODO: if we ever allow configurable block sizes then this calculation will need to be
// modified.
auto const header_size = 2 + 1 + 5 + dtype_len + 1;

return header_size + num_blocks * block_size;
Expand Down Expand Up @@ -1279,6 +1286,7 @@ __device__ void finish_page_encode(state_buf* s,
uint8_t const* const base = s->page.page_data + s->page.max_hdr_size;
auto const actual_data_size = static_cast<uint32_t>(end_ptr - base);
if (actual_data_size > s->page.max_data_size) {
// FIXME(ets): this needs to do error propagation back to the host
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
Expand Down
16 changes: 16 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void enable_write_v2_headers(bool val) except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +

@staticmethod
parquet_writer_options_builder builder(
Expand Down Expand Up @@ -150,6 +152,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
parquet_writer_options_builder& dictionary_policy(
cudf_io_types.dictionary_policy val
) except +

parquet_writer_options build() except +

Expand Down Expand Up @@ -191,6 +199,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
void set_max_page_size_rows(size_type val) except +
void enable_write_v2_headers(bool val) except +
void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except +

@staticmethod
chunked_parquet_writer_options_builder builder(
Expand Down Expand Up @@ -232,6 +242,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
chunked_parquet_writer_options_builder& max_page_size_rows(
size_type val
) except +
parquet_writer_options_builder& write_v2_headers(
bool val
) except +
parquet_writer_options_builder& dictionary_policy(
cudf_io_types.dictionary_policy val
) except +

chunked_parquet_writer_options build() except +

Expand Down
5 changes: 5 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/types.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ cdef extern from "cudf/io/types.hpp" \
STATISTICS_PAGE = 2,
STATISTICS_COLUMN = 3,

ctypedef enum dictionary_policy:
NEVER = 0,
ADAPTIVE = 1,
ALWAYS = 2,

cdef cppclass column_name_info:
string name
vector[column_name_info] children
Expand Down
16 changes: 16 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ def write_parquet(
object max_page_size_rows=None,
object partitions_info=None,
object force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
):
"""
Cython function to call into libcudf API, see `write_parquet`.
Expand Down Expand Up @@ -383,6 +385,18 @@ def write_parquet(
tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
user_data.push_back(tmp_user_data)

if header_version not in ("1.0", "2.0"):
raise ValueError(
f"Invalid parquet header version: {header_version}. "
"Valid values are '1.0' and '2.0'"
)

dict_policy = (
cudf_io_types.dictionary_policy.ALWAYS
if use_dictionary
else cudf_io_types.dictionary_policy.NEVER
)

cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics)

Expand All @@ -399,6 +413,8 @@ def write_parquet(
.compression(comp_type)
.stats_level(stat_freq)
.int96_timestamps(_int96_timestamps)
.write_v2_headers(header_version == "2.0")
.dictionary_policy(dict_policy)
.utc_timestamps(False)
.build()
)
Expand Down
4 changes: 4 additions & 0 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6492,6 +6492,8 @@ def to_parquet(
max_page_size_rows=None,
storage_options=None,
return_metadata=False,
use_dictionary=True,
header_version="1.0",
*args,
**kwargs,
):
Expand All @@ -6516,6 +6518,8 @@ def to_parquet(
max_page_size_rows=max_page_size_rows,
storage_options=storage_options,
return_metadata=return_metadata,
use_dictionary=use_dictionary,
header_version=header_version,
*args,
**kwargs,
)
Expand Down
13 changes: 9 additions & 4 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def _write_parquet(
partitions_info=None,
storage_options=None,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
):
if is_list_like(paths) and len(paths) > 1:
if partitions_info is None:
Expand Down Expand Up @@ -96,6 +98,8 @@ def _write_parquet(
"max_page_size_rows": max_page_size_rows,
"partitions_info": partitions_info,
"force_nullable_schema": force_nullable_schema,
"header_version": header_version,
"use_dictionary": use_dictionary,
}
if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
with ExitStack() as stack:
Expand Down Expand Up @@ -204,7 +208,6 @@ def write_to_dataset(
fs.mkdirs(root_path, exist_ok=True)

if partition_cols is not None and len(partition_cols) > 0:

(
full_paths,
metadata_file_paths,
Expand Down Expand Up @@ -712,7 +715,6 @@ def _parquet_to_frame(
dataset_kwargs=None,
**kwargs,
):

# If this is not a partitioned read, only need
# one call to `_read_parquet`
if not partition_keys:
Expand Down Expand Up @@ -756,7 +758,7 @@ def _parquet_to_frame(
)
)
# Add partition columns to the last DataFrame
for (name, value) in part_key:
for name, value in part_key:
_len = len(dfs[-1])
if partition_categories and name in partition_categories:
# Build the categorical column from `codes`
Expand Down Expand Up @@ -869,6 +871,8 @@ def to_parquet(
storage_options=None,
return_metadata=False,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
*args,
**kwargs,
):
Expand Down Expand Up @@ -943,6 +947,8 @@ def to_parquet(
partitions_info=partition_info,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
)

else:
Expand Down Expand Up @@ -1045,7 +1051,6 @@ def _get_groups_and_offsets(
preserve_index=False,
**kwargs,
):

if not (set(df._data) - set(partition_cols)):
warnings.warn("No data left to save outside partition columns")

Expand Down
54 changes: 36 additions & 18 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1280,32 +1280,29 @@ def test_parquet_reader_v2(tmpdir, simple_pdf):
simple_pdf.to_parquet(pdf_fname, data_page_version="2.0")
assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)

cudf.from_pandas(simple_pdf).to_parquet(pdf_fname, header_version="2.0")
assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)


@pytest.mark.parametrize("nrows", [1, 100000])
@pytest.mark.parametrize("add_nulls", [True, False])
def test_delta_binary(nrows, add_nulls, tmpdir):
@pytest.mark.parametrize(
"dtype",
[
"int8",
"int16",
"int32",
"int64",
],
)
def test_delta_binary(nrows, add_nulls, dtype, tmpdir):
null_frequency = 0.25 if add_nulls else 0

# Create a pandas dataframe with random data of mixed types
arrow_table = dg.rand_dataframe(
dtypes_meta=[
{
"dtype": "int8",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int16",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int32",
"null_frequency": null_frequency,
"cardinality": nrows,
},
{
"dtype": "int64",
"dtype": dtype,
"null_frequency": null_frequency,
"cardinality": nrows,
},
Expand All @@ -1330,6 +1327,28 @@ def test_delta_binary(nrows, add_nulls, tmpdir):
pcdf = cudf.from_pandas(test_pdf)
assert_eq(cdf, pcdf)

# Write back out with cudf and make sure pyarrow can read it
cudf_fname = tmpdir.join("cudfv2.parquet")
pcdf.to_parquet(
cudf_fname,
compression=None,
header_version="2.0",
use_dictionary=False,
)

# FIXME(ets): should probably not use more bits than the data type
try:
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
except OSError as e:
if dtype == "int32" and nrows == 100000:
pytest.mark.xfail(
reason="arrow does not support 33-bit delta encoding"
)
else:
raise e
else:
assert_eq(cdf2, cdf)


@pytest.mark.parametrize(
"data",
Expand Down Expand Up @@ -1464,7 +1483,6 @@ def test_parquet_writer_int96_timestamps(tmpdir, pdf, gdf):


def test_multifile_parquet_folder(tmpdir):

test_pdf1 = make_pdf(nrows=10, nvalids=10 // 2)
test_pdf2 = make_pdf(nrows=20)
expect = pd.concat([test_pdf1, test_pdf2])
Expand Down
8 changes: 8 additions & 0 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@
include the file path metadata (relative to `root_path`).
To request metadata binary blob when using with ``partition_cols``, Pass
``return_metadata=True`` instead of specifying ``metadata_file_path``
use_dictionary : bool, default True
When ``False``, prevents the use of dictionary encoding for Parquet page
data. When ``True``, dictionary encoding is preferred when not disabled due
to dictionary size constraints.
header_version : {{'1.0', '2.0'}}, default "1.0"
Controls whether to use version 1.0 or version 2.0 page headers when
encoding. Version 1.0 is more portable, but version 2.0 enables the
use of newer encoding schemes.
force_nullable_schema : bool, default False.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
Expand Down

0 comments on commit c4e6c09

Please sign in to comment.