From 15fea6d0777c98938c5aa436b3b22a76a36ef488 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 10 Jul 2024 15:40:41 -0700 Subject: [PATCH] refactor: make more explicit the relationship btwn TableParquetOptions vs ParquetOptions vs WriterProperties --- datafusion/common/src/config.rs | 168 ++++++++++++++++++ .../common/src/file_options/parquet_writer.rs | 146 +-------------- 2 files changed, 174 insertions(+), 140 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1d2a9589adfc..6c7c95808cbc 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,6 +22,22 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; use std::str::FromStr; +#[cfg(feature = "parquet")] +use crate::file_options::parquet_writer::{ + parse_compression_string, parse_encoding_string, parse_statistics_string, + parse_version_string, +}; +#[cfg(feature = "parquet")] +use parquet::{ + file::properties::{ + WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP, + DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE, + DEFAULT_STATISTICS_ENABLED, + }, + format::KeyValue, + schema::types::ColumnPath, +}; + use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::{DataFusionError, Result}; @@ -454,6 +470,80 @@ config_namespace! { } } +#[cfg(feature = "parquet")] +impl ParquetOptions { + /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options + /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + pub fn writer_props_from_global_opts(&self) -> Result { + let ParquetOptions { + data_pagesize_limit, + write_batch_size, + writer_version, + compression, + dictionary_enabled, + dictionary_page_size_limit, + statistics_enabled, + max_statistics_size, + max_row_group_size, + created_by, + column_index_truncate_length, + data_page_row_count_limit, + encoding, + bloom_filter_on_write, + bloom_filter_fpp, + bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: _, + pruning: _, + skip_metadata: _, + metadata_size_hint: _, + pushdown_filters: _, + reorder_filters: _, + allow_single_file_parallelism: _, + maximum_parallel_row_group_writers: _, + maximum_buffered_record_batches_per_stream: _, + bloom_filter_on_read: _, // reads not used for writer props + } = self; + + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(*data_pagesize_limit) + .set_write_batch_size(*write_batch_size) + .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_dictionary_enabled(dictionary_enabled.unwrap_or(false)) + .set_dictionary_page_size_limit(*dictionary_page_size_limit) + .set_statistics_enabled( + statistics_enabled + .as_ref() + .and_then(|s| parse_statistics_string(s).ok()) + .unwrap_or(DEFAULT_STATISTICS_ENABLED), + ) + .set_max_statistics_size( + max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE), + ) + .set_max_row_group_size(*max_row_group_size) + .set_created_by(created_by.clone()) + .set_column_index_truncate_length(*column_index_truncate_length) + .set_data_page_row_count_limit(*data_page_row_count_limit) + .set_bloom_filter_enabled(*bloom_filter_on_write) + .set_bloom_filter_fpp(bloom_filter_fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP)) + .set_bloom_filter_ndv(bloom_filter_ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV)); + + // We do not have access to default ColumnProperties set in Arrow. + // Therefore, only overwrite if these settings exist. + if let Some(compression) = compression { + builder = builder.set_compression(parse_compression_string(compression)?); + } + if let Some(encoding) = encoding { + builder = builder.set_encoding(parse_encoding_string(encoding)?); + } + + Ok(builder) + } +} + config_namespace! { /// Options related to aggregate execution /// @@ -1421,6 +1511,84 @@ impl TableParquetOptions { } } +#[cfg(feature = "parquet")] +impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { + type Error = DataFusionError; + + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + fn try_from(table_parquet_options: &TableParquetOptions) -> Result { + // Table options include kv_metadata and col-specific options + let TableParquetOptions { + global, + column_specific_options, + key_value_metadata, + } = table_parquet_options; + + let mut builder = global.writer_props_from_global_opts()?; + + if !key_value_metadata.is_empty() { + builder = builder.set_key_value_metadata(Some( + key_value_metadata + .to_owned() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect(), + )); + } + + // Apply column-specific options: + for (column, options) in column_specific_options { + let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); + + if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { + builder = builder + .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); + } + + if let Some(encoding) = &options.encoding { + let parsed_encoding = parse_encoding_string(encoding)?; + builder = builder.set_column_encoding(path.clone(), parsed_encoding); + } + + if let Some(dictionary_enabled) = options.dictionary_enabled { + builder = builder + .set_column_dictionary_enabled(path.clone(), dictionary_enabled); + } + + if let Some(compression) = &options.compression { + let parsed_compression = parse_compression_string(compression)?; + builder = + builder.set_column_compression(path.clone(), parsed_compression); + } + + if let Some(statistics_enabled) = &options.statistics_enabled { + let parsed_value = parse_statistics_string(statistics_enabled)?; + builder = + builder.set_column_statistics_enabled(path.clone(), parsed_value); + } + + if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { + builder = + builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); + } + + if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { + builder = + builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); + } + + if let Some(max_statistics_size) = options.max_statistics_size { + builder = + builder.set_column_max_statistics_size(path, max_statistics_size); + } + } + + Ok(builder) + } +} + impl ConfigField for TableParquetOptions { fn visit(&self, v: &mut V, key_prefix: &str, description: &'static str) { self.global.visit(v, key_prefix, description); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 009164a29e34..d07360148b76 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,18 +17,13 @@ //! Options related to how parquet files should be written -use crate::{ - config::{ParquetOptions, TableParquetOptions}, - DataFusionError, Result, -}; +use crate::{config::TableParquetOptions, DataFusionError, Result}; use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::{ - metadata::KeyValue, - properties::{EnabledStatistics, WriterProperties, WriterVersion}, + file::properties::{ + EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, }, - schema::types::ColumnPath, }; /// Options for writing parquet files @@ -52,140 +47,11 @@ impl ParquetWriterOptions { impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { type Error = DataFusionError; - fn try_from(parquet_options: &TableParquetOptions) -> Result { - let ParquetOptions { - data_pagesize_limit, - write_batch_size, - writer_version, - dictionary_page_size_limit, - max_row_group_size, - created_by, - column_index_truncate_length, - data_page_row_count_limit, - bloom_filter_on_write, - encoding, - dictionary_enabled, - compression, - statistics_enabled, - max_statistics_size, - bloom_filter_fpp, - bloom_filter_ndv, - // below is not part of ParquetWriterOptions - enable_page_index: _, - pruning: _, - skip_metadata: _, - metadata_size_hint: _, - pushdown_filters: _, - reorder_filters: _, - allow_single_file_parallelism: _, - maximum_parallel_row_group_writers: _, - maximum_buffered_record_batches_per_stream: _, - bloom_filter_on_read: _, - } = &parquet_options.global; - - let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() { - Some( - parquet_options - .key_value_metadata - .clone() - .drain() - .map(|(key, value)| KeyValue { key, value }) - .collect::>(), - ) - } else { - None - }; - - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(*data_pagesize_limit) - .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) - .set_dictionary_page_size_limit(*dictionary_page_size_limit) - .set_max_row_group_size(*max_row_group_size) - .set_created_by(created_by.clone()) - .set_column_index_truncate_length(*column_index_truncate_length) - .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_on_write) - .set_key_value_metadata(key_value_metadata); - - if let Some(encoding) = &encoding { - builder = builder.set_encoding(parse_encoding_string(encoding)?); - } - - if let Some(enabled) = dictionary_enabled { - builder = builder.set_dictionary_enabled(*enabled); - } - - if let Some(compression) = &compression { - builder = builder.set_compression(parse_compression_string(compression)?); - } - - if let Some(statistics) = &statistics_enabled { - builder = - builder.set_statistics_enabled(parse_statistics_string(statistics)?); - } - - if let Some(size) = max_statistics_size { - builder = builder.set_max_statistics_size(*size); - } - - if let Some(fpp) = bloom_filter_fpp { - builder = builder.set_bloom_filter_fpp(*fpp); - } - - if let Some(ndv) = bloom_filter_ndv { - builder = builder.set_bloom_filter_ndv(*ndv); - } - - for (column, options) in &parquet_options.column_specific_options { - let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); - - if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { - builder = builder - .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); - } - - if let Some(encoding) = &options.encoding { - let parsed_encoding = parse_encoding_string(encoding)?; - builder = builder.set_column_encoding(path.clone(), parsed_encoding); - } - - if let Some(dictionary_enabled) = options.dictionary_enabled { - builder = builder - .set_column_dictionary_enabled(path.clone(), dictionary_enabled); - } - - if let Some(compression) = &options.compression { - let parsed_compression = parse_compression_string(compression)?; - builder = - builder.set_column_compression(path.clone(), parsed_compression); - } - - if let Some(statistics_enabled) = &options.statistics_enabled { - let parsed_value = parse_statistics_string(statistics_enabled)?; - builder = - builder.set_column_statistics_enabled(path.clone(), parsed_value); - } - - if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { - builder = - builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); - } - - if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { - builder = - builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); - } - - if let Some(max_statistics_size) = options.max_statistics_size { - builder = - builder.set_column_max_statistics_size(path, max_statistics_size); - } - } - + fn try_from(parquet_table_options: &TableParquetOptions) -> Result { // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) Ok(ParquetWriterOptions { - writer_options: builder.build(), + writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)? + .build(), }) } }