diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6d12b26402c36..ac18bcf7b98cb 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -45,7 +45,6 @@ use crate::physical_plan::{ use arrow::compute::sum; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -74,9 +73,9 @@ use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, }; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; -use parquet::format::FileMetaData; +use parquet::format::{FileMetaData, KeyValue}; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; @@ -788,7 +787,26 @@ impl DataSink for ParquetSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?; + let mut parquet_opts_builder = + WriterPropertiesBuilder::try_from(&self.parquet_options)?; + if self.parquet_options.global.include_arrow_metadata { + let kv_meta = self + .parquet_options + .key_value_metadata + .to_owned() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect::>(); + // let schema = self.config.output_schema(); + + // TODO: make this method public in parquet crate? + // https://github.com/apache/arrow-rs/blob/2908a80d9ca3e3fb0414e35b67856f1fb761304c/parquet/src/arrow/schema/mod.rs#L195 + // add_encoded_arrow_schema_to_metadata(schema, &mut kv_meta); + + parquet_opts_builder = + parquet_opts_builder.set_key_value_metadata(Some(kv_meta)); + }; + let parquet_props = parquet_opts_builder.build(); let object_store = context .runtime_env() @@ -832,7 +850,7 @@ impl DataSink for ParquetSink { .create_async_arrow_writer( &path, Arc::clone(&object_store), - parquet_props.writer_options().clone(), + parquet_props.clone(), ) .await?; let mut reservation = @@ -867,7 +885,7 @@ impl DataSink for ParquetSink { writer, rx, schema, - props.writer_options(), + &props, parallel_options_clone, pool, ) @@ -2389,6 +2407,49 @@ mod tests { Ok(()) } + /// TODO: update this test once parquet method is exposed. + #[tokio::test] + async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> { + // expected kv metadata with schema + let expected_kv_meta = vec![ + KeyValue { key: "ARROW:schema".to_string(), value: Some("/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA".to_string()) }, + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, + }, + ]; + + // single threaded write + let opts = ParquetOptions { + allow_single_file_parallelism: false, + include_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_kv_meta.clone()); + + // multithreaded write + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + include_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_kv_meta); + + Ok(()) + } + #[tokio::test] async fn parquet_sink_write_with_extension() -> Result<()> { let filename = "test_file.custom_ext";