Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP(iox-10578): patched df upgrade 202-04-23 #15

Closed
wants to merge 9 commits into from
Closed
73 changes: 71 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,12 +1364,31 @@ impl TableOptions {

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagates to all columns.
pub global: ParquetOptions,
/// Column specific options. Default usage is parquet.XX::column.
pub column_specific_options: HashMap<String, ColumnOptions>,
/// Additional file-level metadata to include. Inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
///
/// Multiple entries are permitted
/// ```sql
/// OPTIONS (
/// 'format.metadata::key1' '',
/// 'format.metadata::key2' 'value',
/// 'format.metadata::key3' 'value has spaces',
/// 'format.metadata::key4' 'value has special chars :: :',
/// 'format.metadata::key_dupe' 'original will be overwritten',
/// 'format.metadata::key_dupe' 'final'
/// )
/// ```
pub key_value_metadata: HashMap<String, Option<String>>,
}

impl ConfigField for TableParquetOptions {
Expand All @@ -1380,8 +1399,24 @@ impl ConfigField for TableParquetOptions {
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Determine the key if it's a global or column-specific setting
if key.contains("::") {
// Determine if the key is a global, metadata, or column-specific setting
if key.starts_with("metadata::") {
let k =
match key.split("::").collect::<Vec<_>>()[..] {
[_meta] | [_meta, ""] => return Err(DataFusionError::Configuration(
"Invalid metadata key provided, missing key in metadata::<key>"
.to_string(),
)),
[_meta, k] => k.into(),
_ => {
return Err(DataFusionError::Configuration(format!(
"Invalid metadata key provided, found too many '::' in \"{key}\""
)))
}
};
self.key_value_metadata.insert(k, Some(value.into()));
Ok(())
} else if key.contains("::") {
self.column_specific_options.set(key, value)
} else {
self.global.set(key, value)
Expand Down Expand Up @@ -1773,4 +1808,38 @@ mod tests {
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();

let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);

// duplicate keys are overwritten
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ mod tests {
123
);

// properties which remain as default on WriterProperties
assert_eq!(properties.key_value_metadata(), None);
assert_eq!(properties.sorting_columns(), None);

Ok(())
}

Expand Down
104 changes: 73 additions & 31 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

//! Options related to how parquet files should be written

use crate::{config::TableParquetOptions, DataFusionError, Result};
use crate::{
config::{ParquetOptions, TableParquetOptions},
DataFusionError, Result,
};

use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
file::{
metadata::KeyValue,
properties::{EnabledStatistics, WriterProperties, WriterVersion},
},
schema::types::ColumnPath,
};

Expand All @@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
let parquet_session_options = &parquet_options.global;
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
.set_write_batch_size(parquet_session_options.write_batch_size)
.set_writer_version(parse_version_string(
&parquet_session_options.writer_version,
)?)
.set_dictionary_page_size_limit(
parquet_session_options.dictionary_page_size_limit,
)
.set_max_row_group_size(parquet_session_options.max_row_group_size)
.set_created_by(parquet_session_options.created_by.clone())
.set_column_index_truncate_length(
parquet_session_options.column_index_truncate_length,
let ParquetOptions {
data_pagesize_limit,
write_batch_size,
writer_version,
dictionary_page_size_limit,
max_row_group_size,
created_by,
column_index_truncate_length,
data_page_row_count_limit,
bloom_filter_enabled,
encoding,
dictionary_enabled,
compression,
statistics_enabled,
max_statistics_size,
bloom_filter_fpp,
bloom_filter_ndv,
// below is not part of ParquetWriterOptions
enable_page_index: _,
pruning: _,
skip_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
} = &parquet_options.global;

let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
Some(
parquet_options
.key_value_metadata
.clone()
.drain()
.map(|(key, value)| KeyValue { key, value })
.collect::<Vec<_>>(),
)
.set_data_page_row_count_limit(
parquet_session_options.data_page_row_count_limit,
)
.set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
} else {
None
};

if let Some(encoding) = &parquet_session_options.encoding {
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(*data_pagesize_limit)
.set_write_batch_size(*write_batch_size)
.set_writer_version(parse_version_string(writer_version.as_str())?)
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
.set_max_row_group_size(*max_row_group_size)
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_enabled)
.set_key_value_metadata(key_value_metadata);

if let Some(encoding) = &encoding {
builder = builder.set_encoding(parse_encoding_string(encoding)?);
}

if let Some(enabled) = parquet_session_options.dictionary_enabled {
builder = builder.set_dictionary_enabled(enabled);
if let Some(enabled) = dictionary_enabled {
builder = builder.set_dictionary_enabled(*enabled);
}

if let Some(compression) = &parquet_session_options.compression {
if let Some(compression) = &compression {
builder = builder.set_compression(parse_compression_string(compression)?);
}

if let Some(statistics) = &parquet_session_options.statistics_enabled {
if let Some(statistics) = &statistics_enabled {
builder =
builder.set_statistics_enabled(parse_statistics_string(statistics)?);
}

if let Some(size) = parquet_session_options.max_statistics_size {
builder = builder.set_max_statistics_size(size);
if let Some(size) = max_statistics_size {
builder = builder.set_max_statistics_size(*size);
}

if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
builder = builder.set_bloom_filter_fpp(fpp);
if let Some(fpp) = bloom_filter_fpp {
builder = builder.set_bloom_filter_fpp(*fpp);
}

if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
builder = builder.set_bloom_filter_ndv(ndv);
if let Some(ndv) = bloom_filter_ndv {
builder = builder.set_bloom_filter_ndv(*ndv);
}

for (column, options) in &parquet_options.column_specific_options {
Expand Down Expand Up @@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
builder.set_column_max_statistics_size(path, max_statistics_size);
}
}

// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
Ok(ParquetWriterOptions {
writer_options: builder.build(),
})
Expand Down
29 changes: 26 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ mod tests {
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -1865,7 +1865,13 @@ mod tests {
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions::default(),
TableParquetOptions {
key_value_metadata: std::collections::HashMap::from([
("my-data".to_string(), Some("stuff".to_string())),
("my-data-bool-key".to_string(), None),
]),
..Default::default()
},
));

// create data
Expand Down Expand Up @@ -1899,7 +1905,10 @@ mod tests {
let (
path,
FileMetaData {
num_rows, schema, ..
num_rows,
schema,
key_value_metadata,
..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
Expand All @@ -1915,6 +1924,20 @@ mod tests {
"output file metadata should contain col b"
);

let mut key_value_metadata = key_value_metadata.unwrap();
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
let expected_metadata = vec![
KeyValue {
key: "my-data".to_string(),
value: Some("stuff".to_string()),
},
KeyValue {
key: "my-data-bool-key".to_string(),
value: None,
},
];
assert_eq!(key_value_metadata, expected_metadata);

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: Default::default(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
expr,
substring_from,
substring_for,
special: false,
special: _,
} => self.sql_substring_to_expr(
expr,
substring_from,
Expand Down
Loading
Loading