Skip to content

Commit

Permalink
test: demonstrate the relationship btwn session configs and writer props
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jul 12, 2024
1 parent 15fea6d commit 15a9fec
Showing 1 changed file with 189 additions and 0 deletions.
189 changes: 189 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2042,4 +2042,193 @@ mod tests {
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}

#[cfg(feature = "parquet")]
mod test_conversion_from_session_to_writer_props {
use super::super::*;
use parquet::{basic::Compression, file::properties::EnabledStatistics};

const COL_NAME: &str = "configured";

/// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
fn column_options_with_non_defaults(
src_col_defaults: &ParquetOptions,
) -> ColumnOptions {
ColumnOptions {
compression: Some("zstd(22)".into()),
dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
statistics_enabled: Some("page".into()),
max_statistics_size: Some(72),
encoding: Some("RLE".into()),
bloom_filter_enabled: Some(true),
bloom_filter_fpp: Some(0.72),
bloom_filter_ndv: Some(72),
}
}

fn parquet_options_with_non_defaults() -> ParquetOptions {
let defaults = ParquetOptions::default();
let writer_version = if defaults.writer_version.eq("1.0") {
"2.0"
} else {
"1.0"
};

ParquetOptions {
data_pagesize_limit: 42,
write_batch_size: 42,
writer_version: writer_version.into(),
compression: Some("zstd(22)".into()),
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
dictionary_page_size_limit: 42,
statistics_enabled: Some("chunk".into()),
max_statistics_size: Some(42),
max_row_group_size: 42,
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
data_page_row_count_limit: 42,
encoding: Some("BYTE_STREAM_SPLIT".into()),
bloom_filter_on_write: !defaults.bloom_filter_on_write,
bloom_filter_fpp: Some(0.42),
bloom_filter_ndv: Some(42),

// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: Default::default(),
pruning: Default::default(),
skip_metadata: Default::default(),
metadata_size_hint: Default::default(),
pushdown_filters: Default::default(),
reorder_filters: Default::default(),
allow_single_file_parallelism: Default::default(),
maximum_parallel_row_group_writers: Default::default(),
maximum_buffered_record_batches_per_stream: Default::default(),
bloom_filter_on_read: Default::default(),
}
}

fn extract_column_options(
props: &WriterProperties,
col: ColumnPath,
) -> ColumnOptions {
let bloom_filter_default_props = props.bloom_filter_properties(&col);

ColumnOptions {
bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
encoding: props.encoding(&col).map(|s| s.to_string()),
dictionary_enabled: Some(props.dictionary_enabled(&col)),
compression: match props.compression(&col) {
Compression::ZSTD(lvl) => {
Some(format!("zstd({})", lvl.compression_level()))
}
_ => None,
},
statistics_enabled: Some(
match props.statistics_enabled(&col) {
EnabledStatistics::None => "none",
EnabledStatistics::Chunk => "chunk",
EnabledStatistics::Page => "page",
}
.into(),
),
bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
max_statistics_size: Some(props.max_statistics_size(&col)),
}
}

/// For testing only, take a single write's props and convert back into the session config.
/// (use identity to confirm correct.)
fn session_config_from_writer_props(
props: &WriterProperties,
) -> TableParquetOptions {
let default_col = ColumnPath::from("col doesn't have specific config");
let default_col_props = extract_column_options(props, default_col);

let configured_col = ColumnPath::from(COL_NAME);
let configured_col_props = extract_column_options(props, configured_col);

let key_value_metadata = props
.key_value_metadata()
.map(|pairs| {
HashMap::from_iter(
pairs
.iter()
.cloned()
.map(|KeyValue { key, value }| (key, value)),
)
})
.unwrap_or_default();

TableParquetOptions {
global: ParquetOptions {
// global options
data_pagesize_limit: props.dictionary_page_size_limit(),
write_batch_size: props.write_batch_size(),
writer_version: format!("{}.0", props.writer_version().as_num()),
dictionary_page_size_limit: props.dictionary_page_size_limit(),
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
column_index_truncate_length: props.column_index_truncate_length(),
data_page_row_count_limit: props.data_page_row_count_limit(),

// global options which set the default column props
encoding: default_col_props.encoding,
compression: default_col_props.compression,
dictionary_enabled: default_col_props.dictionary_enabled,
statistics_enabled: default_col_props.statistics_enabled,
max_statistics_size: default_col_props.max_statistics_size,
bloom_filter_on_write: default_col_props
.bloom_filter_enabled
.unwrap_or_default(),
bloom_filter_fpp: default_col_props.bloom_filter_fpp,
bloom_filter_ndv: default_col_props.bloom_filter_ndv,

// not in WriterProperties
enable_page_index: Default::default(),
pruning: Default::default(),
skip_metadata: Default::default(),
metadata_size_hint: Default::default(),
pushdown_filters: Default::default(),
reorder_filters: Default::default(),
allow_single_file_parallelism: Default::default(),
maximum_parallel_row_group_writers: Default::default(),
maximum_buffered_record_batches_per_stream: Default::default(),
bloom_filter_on_read: Default::default(),
},
column_specific_options: HashMap::from([(
COL_NAME.into(),
configured_col_props,
)]),
key_value_metadata,
}
}

#[test]
fn table_parquet_opts_to_writer_props() {
// ParquetOptions, all props set to non-default
let parquet_options = parquet_options_with_non_defaults();

// TableParquetOptions, using ParquetOptions for global settings
let key = "foo".to_string();
let value = Some("bar".into());
let table_parquet_opts = TableParquetOptions {
global: parquet_options.clone(),
column_specific_options: [(
COL_NAME.into(),
column_options_with_non_defaults(&parquet_options),
)]
.into(),
key_value_metadata: [(key.clone(), value.clone())].into(),
};

let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
.unwrap()
.build();
assert_eq!(
table_parquet_opts,
session_config_from_writer_props(&writer_props),
"the writer_props should have the same configuration as the session's TableParquetOptions",
);
}
}
}

0 comments on commit 15a9fec

Please sign in to comment.