Skip to content

Commit

Permalink
fix(11770): fix parallel ParquetSink to encode arrow schema into the file metadata, based on the ParquetOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Dec 21, 2024
1 parent da88cec commit 013b098
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::physical_plan::{

use arrow::compute::sum;
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
Expand Down Expand Up @@ -792,7 +791,16 @@ impl DataSink for ParquetSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
let parquet_props = if !self.parquet_options.global.skip_arrow_metadata {
let schema = self.config.output_schema();
self.parquet_options
.into_writer_properties_builder_with_arrow_schema(Some(schema))?
.build()
} else {
self.parquet_options
.into_writer_properties_builder()?
.build()
};

let object_store = context
.runtime_env()
Expand Down Expand Up @@ -836,7 +844,7 @@ impl DataSink for ParquetSink {
.create_async_arrow_writer(
&path,
Arc::clone(&object_store),
parquet_props.writer_options().clone(),
parquet_props.clone(),
)
.await?;
let mut reservation =
Expand Down Expand Up @@ -871,7 +879,7 @@ impl DataSink for ParquetSink {
writer,
rx,
schema,
props.writer_options(),
&props,
parallel_options_clone,
pool,
)
Expand Down Expand Up @@ -2438,6 +2446,32 @@ mod tests {
let (_, file_metadata) = get_written(parquet_sink)?;
assert_file_metadata(file_metadata, expected_with.clone());

// multithreaded (parallel) write with `skip_arrow_metadata: true`: the arrow schema is not inserted into the file metadata
let opts = ParquetOptions {
allow_single_file_parallelism: true,
maximum_parallel_row_group_writers: 2,
maximum_buffered_record_batches_per_stream: 2,
skip_arrow_metadata: true,
..Default::default()
};
let parquet_sink =
create_written_parquet_sink_using_config("file:///", opts).await?;
let (_, file_metadata) = get_written(parquet_sink)?;
assert_file_metadata(file_metadata, expected_without);

// multithreaded (parallel) write with `skip_arrow_metadata: false`: the arrow schema is inserted into the file metadata
let opts = ParquetOptions {
allow_single_file_parallelism: true,
maximum_parallel_row_group_writers: 2,
maximum_buffered_record_batches_per_stream: 2,
skip_arrow_metadata: false,
..Default::default()
};
let parquet_sink =
create_written_parquet_sink_using_config("file:///", opts).await?;
let (_, file_metadata) = get_written(parquet_sink)?;
assert_file_metadata(file_metadata, expected_with);

Ok(())
}

Expand Down

0 comments on commit 013b098

Please sign in to comment.