From 15a9fec1795320e46e71949f68616fa0428c4e3e Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 12 Jul 2024 12:17:33 -0700 Subject: [PATCH] test: demonstrate the relationship btwn session configs and writer props --- datafusion/common/src/config.rs | 189 ++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6c7c95808cbc..99ae584c3256 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2042,4 +2042,193 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } + + #[cfg(feature = "parquet")] + mod test_conversion_from_session_to_writer_props { + use super::super::*; + use parquet::{basic::Compression, file::properties::EnabledStatistics}; + + const COL_NAME: &str = "configured"; + + /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config. + fn column_options_with_non_defaults( + src_col_defaults: &ParquetOptions, + ) -> ColumnOptions { + ColumnOptions { + compression: Some("zstd(22)".into()), + dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v), + statistics_enabled: Some("page".into()), + max_statistics_size: Some(72), + encoding: Some("RLE".into()), + bloom_filter_enabled: Some(true), + bloom_filter_fpp: Some(0.72), + bloom_filter_ndv: Some(72), + } + } + + fn parquet_options_with_non_defaults() -> ParquetOptions { + let defaults = ParquetOptions::default(); + let writer_version = if defaults.writer_version.eq("1.0") { + "2.0" + } else { + "1.0" + }; + + ParquetOptions { + data_pagesize_limit: 42, + write_batch_size: 42, + writer_version: writer_version.into(), + compression: Some("zstd(22)".into()), + dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), + dictionary_page_size_limit: 42, + statistics_enabled: Some("chunk".into()), + max_statistics_size: Some(42), + max_row_group_size: 42, + created_by: "wordy".into(), + column_index_truncate_length: Some(42), + data_page_row_count_limit: 42, + encoding: Some("BYTE_STREAM_SPLIT".into()), + bloom_filter_on_write: !defaults.bloom_filter_on_write, + bloom_filter_fpp: Some(0.42), + bloom_filter_ndv: Some(42), + + // not in WriterProperties, but itemizing here to not skip newly added props + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + } + } + + fn extract_column_options( + props: &WriterProperties, + col: ColumnPath, + ) -> ColumnOptions { + let bloom_filter_default_props = props.bloom_filter_properties(&col); + + ColumnOptions { + bloom_filter_enabled: Some(bloom_filter_default_props.is_some()), + encoding: props.encoding(&col).map(|s| s.to_string()), + dictionary_enabled: Some(props.dictionary_enabled(&col)), + compression: match props.compression(&col) { + Compression::ZSTD(lvl) => { + Some(format!("zstd({})", lvl.compression_level())) + } + _ => None, + }, + statistics_enabled: Some( + match props.statistics_enabled(&col) { + EnabledStatistics::None => "none", + EnabledStatistics::Chunk => "chunk", + EnabledStatistics::Page => "page", + } + .into(), + ), + bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp), + bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv), + max_statistics_size: Some(props.max_statistics_size(&col)), + } + } + + /// For testing only, take a single write's props and convert back into the session config. + /// (use identity to confirm correct.) + fn session_config_from_writer_props( + props: &WriterProperties, + ) -> TableParquetOptions { + let default_col = ColumnPath::from("col doesn't have specific config"); + let default_col_props = extract_column_options(props, default_col); + + let configured_col = ColumnPath::from(COL_NAME); + let configured_col_props = extract_column_options(props, configured_col); + + let key_value_metadata = props + .key_value_metadata() + .map(|pairs| { + HashMap::from_iter( + pairs + .iter() + .cloned() + .map(|KeyValue { key, value }| (key, value)), + ) + }) + .unwrap_or_default(); + + TableParquetOptions { + global: ParquetOptions { + // global options + data_pagesize_limit: props.dictionary_page_size_limit(), + write_batch_size: props.write_batch_size(), + writer_version: format!("{}.0", props.writer_version().as_num()), + dictionary_page_size_limit: props.dictionary_page_size_limit(), + max_row_group_size: props.max_row_group_size(), + created_by: props.created_by().to_string(), + column_index_truncate_length: props.column_index_truncate_length(), + data_page_row_count_limit: props.data_page_row_count_limit(), + + // global options which set the default column props + encoding: default_col_props.encoding, + compression: default_col_props.compression, + dictionary_enabled: default_col_props.dictionary_enabled, + statistics_enabled: default_col_props.statistics_enabled, + max_statistics_size: default_col_props.max_statistics_size, + bloom_filter_on_write: default_col_props + .bloom_filter_enabled + .unwrap_or_default(), + bloom_filter_fpp: default_col_props.bloom_filter_fpp, + bloom_filter_ndv: default_col_props.bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + }, + column_specific_options: HashMap::from([( + COL_NAME.into(), + configured_col_props, + )]), + key_value_metadata, + } + } + + #[test] + fn table_parquet_opts_to_writer_props() { + // ParquetOptions, all props set to non-default + let parquet_options = parquet_options_with_non_defaults(); + + // TableParquetOptions, using ParquetOptions for global settings + let key = "foo".to_string(); + let value = Some("bar".into()); + let table_parquet_opts = TableParquetOptions { + global: parquet_options.clone(), + column_specific_options: [( + COL_NAME.into(), + column_options_with_non_defaults(&parquet_options), + )] + .into(), + key_value_metadata: [(key.clone(), value.clone())].into(), + }; + + let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) + .unwrap() + .build(); + assert_eq!( + table_parquet_opts, + session_config_from_writer_props(&writer_props), + "the writer_props should have the same configuration as the session's TableParquetOptions", + ); + } + } }