
Commit

refactor: make more explicit the relationship btwn TableParquetOptions vs ParquetOptions vs WriterProperties
wiedld committed Jul 12, 2024
1 parent 4d04a6e commit 15fea6d
Showing 2 changed files with 174 additions and 140 deletions.
168 changes: 168 additions & 0 deletions datafusion/common/src/config.rs
@@ -22,6 +22,22 @@ use std::collections::{BTreeMap, HashMap};
 use std::fmt::{self, Display};
 use std::str::FromStr;
 
+#[cfg(feature = "parquet")]
+use crate::file_options::parquet_writer::{
+    parse_compression_string, parse_encoding_string, parse_statistics_string,
+    parse_version_string,
+};
+#[cfg(feature = "parquet")]
+use parquet::{
+    file::properties::{
+        WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP,
+        DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE,
+        DEFAULT_STATISTICS_ENABLED,
+    },
+    format::KeyValue,
+    schema::types::ColumnPath,
+};
+
 use crate::error::_config_err;
 use crate::parsers::CompressionTypeVariant;
 use crate::{DataFusionError, Result};
@@ -454,6 +470,80 @@ config_namespace! {
 }
 }
 
+#[cfg(feature = "parquet")]
+impl ParquetOptions {
+    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
+    ///
+    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
+    /// applied per column, a customization that is not applicable to [`ParquetOptions`].
+    pub fn writer_props_from_global_opts(&self) -> Result<WriterPropertiesBuilder> {
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            compression,
+            dictionary_enabled,
+            dictionary_page_size_limit,
+            statistics_enabled,
+            max_statistics_size,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            encoding,
+            bloom_filter_on_write,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+
+            // not in WriterProperties
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,
+            bloom_filter_on_read: _, // reads not used for writer props
+        } = self;
+
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_enabled(dictionary_enabled.unwrap_or(false))
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_statistics_enabled(
+                statistics_enabled
+                    .as_ref()
+                    .and_then(|s| parse_statistics_string(s).ok())
+                    .unwrap_or(DEFAULT_STATISTICS_ENABLED),
+            )
+            .set_max_statistics_size(
+                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
+            )
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_on_write)
+            .set_bloom_filter_fpp(bloom_filter_fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP))
+            .set_bloom_filter_ndv(bloom_filter_ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV));
+
+        // We do not have access to default ColumnProperties set in Arrow.
+        // Therefore, only overwrite if these settings exist.
+        if let Some(compression) = compression {
+            builder = builder.set_compression(parse_compression_string(compression)?);
+        }
+        if let Some(encoding) = encoding {
+            builder = builder.set_encoding(parse_encoding_string(encoding)?);
+        }
+
+        Ok(builder)
+    }
+}
+
 config_namespace! {
     /// Options related to aggregate execution
     ///
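For orientation, a minimal usage sketch (not part of this commit) of the division of labor this method makes explicit: the session-wide `ParquetOptions` yield a `WriterPropertiesBuilder`, and per-column customization then happens on the builder itself. The column name and compression codec below are illustrative assumptions, not values taken from this diff.

```rust
use datafusion_common::config::ParquetOptions;
use datafusion_common::Result;
use parquet::basic::Compression;
use parquet::schema::types::ColumnPath;

fn global_then_per_column(opts: &ParquetOptions) -> Result<()> {
    // Global session options become a builder...
    let builder = opts.writer_props_from_global_opts()?;
    // ...and per-column overrides (which ParquetOptions itself cannot
    // express) are applied on the builder before building the final
    // WriterProperties.
    let props = builder
        .set_column_compression(ColumnPath::from("col_a"), Compression::SNAPPY)
        .build();
    let _ = props; // hand off to a parquet writer
    Ok(())
}
```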
@@ -1421,6 +1511,84 @@ impl TableParquetOptions {
 }
 }
 
+#[cfg(feature = "parquet")]
+impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
+    type Error = DataFusionError;
+
+    /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
+    ///
+    /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
+    fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
+        // Table options include kv_metadata and col-specific options
+        let TableParquetOptions {
+            global,
+            column_specific_options,
+            key_value_metadata,
+        } = table_parquet_options;
+
+        let mut builder = global.writer_props_from_global_opts()?;
+
+        if !key_value_metadata.is_empty() {
+            builder = builder.set_key_value_metadata(Some(
+                key_value_metadata
+                    .to_owned()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect(),
+            ));
+        }
+
+        // Apply column-specific options:
+        for (column, options) in column_specific_options {
+            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
+
+            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
+                builder = builder
+                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
+            }
+
+            if let Some(encoding) = &options.encoding {
+                let parsed_encoding = parse_encoding_string(encoding)?;
+                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
+            }
+
+            if let Some(dictionary_enabled) = options.dictionary_enabled {
+                builder = builder
+                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
+            }
+
+            if let Some(compression) = &options.compression {
+                let parsed_compression = parse_compression_string(compression)?;
+                builder =
+                    builder.set_column_compression(path.clone(), parsed_compression);
+            }
+
+            if let Some(statistics_enabled) = &options.statistics_enabled {
+                let parsed_value = parse_statistics_string(statistics_enabled)?;
+                builder =
+                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
+            }
+
+            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
+                builder =
+                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
+            }
+
+            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
+                builder =
+                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
+            }
+
+            if let Some(max_statistics_size) = options.max_statistics_size {
+                builder =
+                    builder.set_column_max_statistics_size(path, max_statistics_size);
+            }
+        }
+
+        Ok(builder)
+    }
+}
+
 impl ConfigField for TableParquetOptions {
     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
         self.global.visit(v, key_prefix, description);
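A usage sketch (not from this commit) of the table-level conversion, which layers key-value metadata and per-column overrides on top of the global options. The metadata key and value are illustrative assumptions.

```rust
use datafusion_common::config::TableParquetOptions;
use datafusion_common::Result;
use parquet::file::properties::WriterPropertiesBuilder;

fn table_level_props(mut table_opts: TableParquetOptions) -> Result<()> {
    // Entries in key_value_metadata end up in the parquet file footer.
    table_opts
        .key_value_metadata
        .insert("origin".to_string(), Some("example".to_string()));
    // Unlike writer_props_from_global_opts alone, this conversion also
    // applies column_specific_options and the kv metadata set above.
    let props = WriterPropertiesBuilder::try_from(&table_opts)?.build();
    let _ = props;
    Ok(())
}
```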
146 changes: 6 additions & 140 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,18 +17,13 @@

 //! Options related to how parquet files should be written
-use crate::{
-    config::{ParquetOptions, TableParquetOptions},
-    DataFusionError, Result,
-};
+use crate::{config::TableParquetOptions, DataFusionError, Result};
 
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::{
-        metadata::KeyValue,
-        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::properties::{
+        EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
     },
-    schema::types::ColumnPath,
 };
 
 /// Options for writing parquet files
@@ -52,140 +52,11 @@ impl ParquetWriterOptions {
 impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
     type Error = DataFusionError;
 
-    fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let ParquetOptions {
-            data_pagesize_limit,
-            write_batch_size,
-            writer_version,
-            dictionary_page_size_limit,
-            max_row_group_size,
-            created_by,
-            column_index_truncate_length,
-            data_page_row_count_limit,
-            bloom_filter_on_write,
-            encoding,
-            dictionary_enabled,
-            compression,
-            statistics_enabled,
-            max_statistics_size,
-            bloom_filter_fpp,
-            bloom_filter_ndv,
-            // below is not part of ParquetWriterOptions
-            enable_page_index: _,
-            pruning: _,
-            skip_metadata: _,
-            metadata_size_hint: _,
-            pushdown_filters: _,
-            reorder_filters: _,
-            allow_single_file_parallelism: _,
-            maximum_parallel_row_group_writers: _,
-            maximum_buffered_record_batches_per_stream: _,
-            bloom_filter_on_read: _,
-        } = &parquet_options.global;
-
-        let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
-            Some(
-                parquet_options
-                    .key_value_metadata
-                    .clone()
-                    .drain()
-                    .map(|(key, value)| KeyValue { key, value })
-                    .collect::<Vec<_>>(),
-            )
-        } else {
-            None
-        };
-
-        let mut builder = WriterProperties::builder()
-            .set_data_page_size_limit(*data_pagesize_limit)
-            .set_write_batch_size(*write_batch_size)
-            .set_writer_version(parse_version_string(writer_version.as_str())?)
-            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
-            .set_max_row_group_size(*max_row_group_size)
-            .set_created_by(created_by.clone())
-            .set_column_index_truncate_length(*column_index_truncate_length)
-            .set_data_page_row_count_limit(*data_page_row_count_limit)
-            .set_bloom_filter_enabled(*bloom_filter_on_write)
-            .set_key_value_metadata(key_value_metadata);
-
-        if let Some(encoding) = &encoding {
-            builder = builder.set_encoding(parse_encoding_string(encoding)?);
-        }
-
-        if let Some(enabled) = dictionary_enabled {
-            builder = builder.set_dictionary_enabled(*enabled);
-        }
-
-        if let Some(compression) = &compression {
-            builder = builder.set_compression(parse_compression_string(compression)?);
-        }
-
-        if let Some(statistics) = &statistics_enabled {
-            builder =
-                builder.set_statistics_enabled(parse_statistics_string(statistics)?);
-        }
-
-        if let Some(size) = max_statistics_size {
-            builder = builder.set_max_statistics_size(*size);
-        }
-
-        if let Some(fpp) = bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(*fpp);
-        }
-
-        if let Some(ndv) = bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(*ndv);
-        }
-
-        for (column, options) in &parquet_options.column_specific_options {
-            let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());
-
-            if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
-                builder = builder
-                    .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
-            }
-
-            if let Some(encoding) = &options.encoding {
-                let parsed_encoding = parse_encoding_string(encoding)?;
-                builder = builder.set_column_encoding(path.clone(), parsed_encoding);
-            }
-
-            if let Some(dictionary_enabled) = options.dictionary_enabled {
-                builder = builder
-                    .set_column_dictionary_enabled(path.clone(), dictionary_enabled);
-            }
-
-            if let Some(compression) = &options.compression {
-                let parsed_compression = parse_compression_string(compression)?;
-                builder =
-                    builder.set_column_compression(path.clone(), parsed_compression);
-            }
-
-            if let Some(statistics_enabled) = &options.statistics_enabled {
-                let parsed_value = parse_statistics_string(statistics_enabled)?;
-                builder =
-                    builder.set_column_statistics_enabled(path.clone(), parsed_value);
-            }
-
-            if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
-                builder =
-                    builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
-            }
-
-            if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
-                builder =
-                    builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
-            }
-
-            if let Some(max_statistics_size) = options.max_statistics_size {
-                builder =
-                    builder.set_column_max_statistics_size(path, max_statistics_size);
-            }
-        }
-
+    fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
+        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
         Ok(ParquetWriterOptions {
-            writer_options: builder.build(),
+            writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
+                .build(),
        })
    }
 }
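After this change, ParquetWriterOptions is a thin wrapper whose TryFrom delegates to the WriterPropertiesBuilder conversion above. A sketch of that equivalence follows; it assumes the wrapper still exposes a writer_options() accessor returning &WriterProperties (not shown in this diff), and the compared field is illustrative.

```rust
use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::Result;
use parquet::file::properties::WriterPropertiesBuilder;

fn delegation(table_opts: &TableParquetOptions) -> Result<()> {
    // Both conversions now run through the same code path, so the
    // resulting writer properties should agree.
    let via_wrapper = ParquetWriterOptions::try_from(table_opts)?;
    let direct = WriterPropertiesBuilder::try_from(table_opts)?.build();
    assert_eq!(
        via_wrapper.writer_options().created_by(),
        direct.created_by()
    );
    Ok(())
}
```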