diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 968d8215ca4d..aa597c3be59e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1368,6 +1368,9 @@ pub struct TableParquetOptions {
     pub global: ParquetOptions,
     /// Column specific options. Default usage is parquet.XX::column.
     pub column_specific_options: HashMap<String, ColumnOptions>,
+    /// Additional metadata to be inserted into the key_value_metadata
+    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
+    pub key_value_metadata: HashMap<String, Option<String>>,
 }
 
 impl ConfigField for TableParquetOptions {
@@ -1381,6 +1384,20 @@ impl ConfigField for TableParquetOptions {
         // Determine the key if it's a global or column-specific setting
         if key.contains("::") {
             self.column_specific_options.set(key, value)
+        } else if key.eq("metadata") {
+            for maybe_pair in value.split('_') {
+                let (k, v) = match maybe_pair.split(':').collect::<Vec<_>>()[..] {
+                    [k, v] => (k.into(), Some(v.into())),
+                    [k] => (k.into(), None),
+                    _ => {
+                        return Err(DataFusionError::Configuration(format!(
+                            "Invalid metadata provided \"{maybe_pair}\""
+                        )))
+                    }
+                };
+                self.key_value_metadata.insert(k, v);
+            }
+            Ok(())
         } else {
             self.global.set(key, value)
         }
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index eb1ce1b364fd..a760619a7ba8 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
             123
         );
 
+        // properties which remain as default on WriterProperties
+        assert_eq!(properties.key_value_metadata(), None);
+        assert_eq!(properties.sorting_columns(), None);
+
         Ok(())
     }
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 28e73ba48f53..4958246f54b4 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@
 
 //! Options related to how parquet files should be written
 
-use crate::{config::TableParquetOptions, DataFusionError, Result};
+use crate::{
+    config::{ParquetOptions, TableParquetOptions},
+    DataFusionError, Result,
+};
 
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::{
+        metadata::KeyValue,
+        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    },
     schema::types::ColumnPath,
 };
 
@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
     type Error = DataFusionError;
 
     fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let parquet_session_options = &parquet_options.global;
-        let mut builder = WriterProperties::builder()
-            .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
-            .set_write_batch_size(parquet_session_options.write_batch_size)
-            .set_writer_version(parse_version_string(
-                &parquet_session_options.writer_version,
-            )?)
-            .set_dictionary_page_size_limit(
-                parquet_session_options.dictionary_page_size_limit,
-            )
-            .set_max_row_group_size(parquet_session_options.max_row_group_size)
-            .set_created_by(parquet_session_options.created_by.clone())
-            .set_column_index_truncate_length(
-                parquet_session_options.column_index_truncate_length,
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            dictionary_page_size_limit,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            bloom_filter_enabled,
+            encoding,
+            dictionary_enabled,
+            compression,
+            statistics_enabled,
+            max_statistics_size,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            // below is not part of ParquetWriterOptions
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,
+        } = &parquet_options.global;
+
+        let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
+            Some(
+                parquet_options
+                    .key_value_metadata
+                    .clone()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect::<Vec<_>>(),
             )
-            .set_data_page_row_count_limit(
-                parquet_session_options.data_page_row_count_limit,
-            )
-            .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+        } else {
+            None
+        };
 
-        if let Some(encoding) = &parquet_session_options.encoding {
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_key_value_metadata(key_value_metadata);
+
+        if let Some(encoding) = &encoding {
             builder = builder.set_encoding(parse_encoding_string(encoding)?);
         }
 
-        if let Some(enabled) = parquet_session_options.dictionary_enabled {
-            builder = builder.set_dictionary_enabled(enabled);
+        if let Some(enabled) = dictionary_enabled {
+            builder = builder.set_dictionary_enabled(*enabled);
         }
 
-        if let Some(compression) = &parquet_session_options.compression {
+        if let Some(compression) = &compression {
             builder = builder.set_compression(parse_compression_string(compression)?);
         }
 
-        if let Some(statistics) = &parquet_session_options.statistics_enabled {
+        if let Some(statistics) = &statistics_enabled {
             builder =
                 builder.set_statistics_enabled(parse_statistics_string(statistics)?);
         }
 
-        if let Some(size) = parquet_session_options.max_statistics_size {
-            builder = builder.set_max_statistics_size(size);
+        if let Some(size) = max_statistics_size {
+            builder = builder.set_max_statistics_size(*size);
         }
 
-        if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(fpp);
+        if let Some(fpp) = bloom_filter_fpp {
+            builder = builder.set_bloom_filter_fpp(*fpp);
         }
 
-        if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(ndv);
+        if let Some(ndv) = bloom_filter_ndv {
+            builder = builder.set_bloom_filter_ndv(*ndv);
         }
 
         for (column, options) in &parquet_options.column_specific_options {
@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
                 builder.set_column_max_statistics_size(path, max_statistics_size);
             }
         }
+
+        // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
         Ok(ParquetWriterOptions {
             writer_options: builder.build(),
         })
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index bcf4e8a2c8e4..a52ef99af044 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1865,7 +1865,13 @@ mod tests {
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
-            TableParquetOptions::default(),
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([(
+                    "my-data".to_string(),
+                    Some("stuff".to_string()),
+                )]),
+                ..Default::default()
+            },
         ));
 
         // create data
@@ -1899,7 +1905,10 @@ mod tests {
         let (
             path,
             FileMetaData {
-                num_rows, schema, ..
+                num_rows,
+                schema,
+                key_value_metadata,
+                ..
             },
         ) = written.take(1).next().unwrap();
         let path_parts = path.parts().collect::<Vec<_>>();
@@ -1915,6 +1924,13 @@ mod tests {
             "output file metadata should contain col b"
         );
 
+        let key_value_metadata = key_value_metadata.unwrap();
+        let my_metadata = key_value_metadata
+            .iter()
+            .filter(|kv| kv.key == "my-data")
+            .collect::<Vec<_>>();
+        assert_eq!(my_metadata.len(), 1);
+
         Ok(())
     }
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index aaca4dc48236..48975b833f26 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -974,6 +974,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
                 .unwrap()
                 .unwrap(),
             column_specific_options,
+            key_value_metadata: Default::default(),
         })
     }
 }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index fca892dfcdad..230f157026d4 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,38 @@ OPTIONS (
 'format.statistics_enabled::col2' none,
 'format.max_statistics_size' 123,
 'format.bloom_filter_fpp' 0.001,
-'format.bloom_filter_ndv' 100
+'format.bloom_filter_ndv' 100,
+'format.metadata' 'foo:bar baz'
 )
 ----
 2
 
+# valid vs invalid metadata
+
+statement ok
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.metadata' ''
+)
+
+statement error
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.metadata' 'foo:bar:extra'
+)
+
+statement error
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.wrong-metadata-key' 'foo:bar baz'
+)
+
 # validate multiple parquet file output with all options set
 statement ok
 CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';
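
For reviewers, here is a minimal end-to-end sketch of how the new option flows through the pieces this diff touches: the `'format.metadata'` string is parsed by `ConfigField::set` into `TableParquetOptions::key_value_metadata`, and the `TryFrom` conversion forwards it into the parquet `WriterProperties`. The `datafusion_common` module paths and the `writer_options()` accessor on `ParquetWriterOptions` reflect the existing crate layout as I read it and are assumptions, not part of this change.

```rust
use datafusion_common::config::{ConfigField, TableParquetOptions};
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;

fn main() -> datafusion_common::Result<()> {
    let mut opts = TableParquetOptions::default();

    // Pairs are '_'-separated; within a pair, key and value are ':'-separated,
    // matching the parsing added to `ConfigField::set` in config.rs above.
    opts.set("metadata", "foo:bar_baz")?;
    assert_eq!(
        opts.key_value_metadata.get("foo"),
        Some(&Some("bar".to_string()))
    );
    // A key without ':' is stored with no value.
    assert_eq!(opts.key_value_metadata.get("baz"), Some(&None));

    // A pair with more than one ':' is rejected, as the copy.slt case shows.
    assert!(opts.set("metadata", "foo:bar:extra").is_err());

    // The TryFrom conversion carries the map into WriterProperties as KeyValue entries.
    let writer_opts = ParquetWriterOptions::try_from(&opts)?;
    let kv = writer_opts.writer_options().key_value_metadata().unwrap();
    assert!(kv
        .iter()
        .any(|kv| kv.key == "foo" && kv.value.as_deref() == Some("bar")));
    Ok(())
}
```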
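And since the ParquetSink test above asserts on the returned `FileMetaData`, here is a sketch of checking the same thing on disk for a file produced by the `COPY ... 'format.metadata' ...` statements, using the parquet crate's reader API directly. The file path is hypothetical; substitute whichever file the COPY actually wrote.

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical path: any parquet file written by the COPY statements above.
    let file = File::open("test_files/scratch/copy/table_with_metadata/example.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // key_value_metadata() returns Option<&Vec<KeyValue>>; it is None when no
    // user-defined metadata was written (the pre-change behavior).
    if let Some(kv) = reader.metadata().file_metadata().key_value_metadata() {
        for entry in kv {
            println!("{} = {:?}", entry.key, entry.value);
        }
    }
    Ok(())
}
```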