From 22e0b4432c9838f2536284015271d3de9a165135 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 21 Jun 2024 11:59:05 -0400
Subject: [PATCH] Revert "Write Bloom filters between row groups instead of the end (#5860)" (#5932)

This reverts commit 3930d5b056b8157e0b5c5eb01b1076e476acf99b.
---
 parquet/Cargo.toml                    |   8 --
 parquet/examples/write_parquet.rs     | 131 --------------------------
 parquet/src/arrow/arrow_writer/mod.rs |  28 +-----
 parquet/src/arrow/async_writer/mod.rs |   4 +-
 parquet/src/file/metadata.rs          |   5 -
 parquet/src/file/properties.rs        |  36 -------
 parquet/src/file/writer.rs            | 117 +++++++++--------------
 7 files changed, 52 insertions(+), 277 deletions(-)
 delete mode 100644 parquet/examples/write_parquet.rs

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index eec7faf09d06..775ac825a2e4 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -67,7 +67,6 @@ hashbrown = { version = "0.14", default-features = false }
 twox-hash = { version = "1.6", default-features = false }
 paste = { version = "1.0" }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
-sysinfo = { version = "0.30.12", optional = true, default-features = false }
 
 [dev-dependencies]
 base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -115,19 +114,12 @@ async = ["futures", "tokio"]
 object_store = ["dep:object_store", "async"]
 # Group Zstd dependencies
 zstd = ["dep:zstd", "zstd-sys"]
-# Display memory in example/write_parquet.rs
-sysinfo = ["dep:sysinfo"]
 
 [[example]]
 name = "read_parquet"
 required-features = ["arrow"]
 path = "./examples/read_parquet.rs"
 
-[[example]]
-name = "write_parquet"
-required-features = ["cli", "sysinfo"]
-path = "./examples/write_parquet.rs"
-
 [[example]]
 name = "async_read_parquet"
 required-features = ["arrow", "async"]
diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs
deleted file mode 100644
index d2ef550df840..000000000000
--- a/parquet/examples/write_parquet.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fs::File;
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-use arrow::array::{StructArray, UInt64Builder};
-use arrow::datatypes::DataType::UInt64;
-use arrow::datatypes::{Field, Schema};
-use clap::{Parser, ValueEnum};
-use parquet::arrow::ArrowWriter as ParquetWriter;
-use parquet::basic::Encoding;
-use parquet::errors::Result;
-use parquet::file::properties::{BloomFilterPosition, WriterProperties};
-use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
-
-#[derive(ValueEnum, Clone)]
-enum BloomFilterPositionArg {
-    End,
-    AfterRowGroup,
-}
-
-#[derive(Parser)]
-#[command(version)]
-/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
-struct Args {
-    #[arg(long, default_value_t = 1000)]
-    /// Number of batches to write
-    iterations: u64,
-
-    #[arg(long, default_value_t = 1000000)]
-    /// Number of rows in each batch
-    batch: u64,
-
-    #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
-    /// Where to write Bloom Filters
-    bloom_filter_position: BloomFilterPositionArg,
-
-    /// Path to the file to write
-    path: PathBuf,
-}
-
-fn now() -> String {
-    chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
-}
-
-fn mem(system: &mut System) -> String {
-    let pid = Pid::from(std::process::id() as usize);
-    system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
-    system
-        .process(pid)
-        .map(|proc| format!("{}MB", proc.memory() / 1_000_000))
-        .unwrap_or("N/A".to_string())
-}
-
-fn main() -> Result<()> {
-    let args = Args::parse();
-
-    let bloom_filter_position = match args.bloom_filter_position {
-        BloomFilterPositionArg::End => BloomFilterPosition::End,
-        BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
-    };
-
-    let properties = WriterProperties::builder()
-        .set_column_bloom_filter_enabled("id".into(), true)
-        .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
-        .set_bloom_filter_position(bloom_filter_position)
-        .build();
-    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
-    // Create parquet file that will be read.
-    let file = File::create(args.path).unwrap();
-    let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;
-
-    let mut system =
-        System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
-    eprintln!(
-        "{} Writing {} batches of {} rows. RSS = {}",
-        now(),
-        args.iterations,
-        args.batch,
-        mem(&mut system)
-    );
-
-    let mut array_builder = UInt64Builder::new();
-    let mut last_log = Instant::now();
-    for i in 0..args.iterations {
-        if Instant::now() - last_log > Duration::new(10, 0) {
-            last_log = Instant::now();
-            eprintln!(
-                "{} Iteration {}/{}. RSS = {}",
-                now(),
-                i + 1,
-                args.iterations,
-                mem(&mut system)
-            );
-        }
-        for j in 0..args.batch {
-            array_builder.append_value(i + j);
-        }
-        writer.write(
-            &StructArray::new(
-                schema.fields().clone(),
-                vec![Arc::new(array_builder.finish())],
-                None,
-            )
-            .into(),
-        )?;
-    }
-    writer.flush()?;
-    writer.close()?;
-
-    eprintln!("{} Done. RSS = {}", now(), mem(&mut system));
-
-    Ok(())
-}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 800751ff964b..0beb93f80a5f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
 };
 use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -199,7 +199,7 @@ impl ArrowWriter {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
         self.writer.flushed_row_groups()
     }
@@ -1053,9 +1053,7 @@ mod tests {
     use crate::file::metadata::ParquetMetaData;
     use crate::file::page_index::index::Index;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{
-        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
-    };
+    use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
     use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
@@ -1703,7 +1701,6 @@ mod tests {
         values: ArrayRef,
         schema: SchemaRef,
         bloom_filter: bool,
-        bloom_filter_position: BloomFilterPosition,
     }
 
     impl RoundTripOptions {
@@ -1714,7 +1711,6 @@ mod tests {
                 values,
                 schema: Arc::new(schema),
                 bloom_filter: false,
-                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
             }
         }
     }
@@ -1734,7 +1730,6 @@ mod tests {
             values,
             schema,
            bloom_filter,
-            bloom_filter_position,
         } = options;
 
         let encodings = match values.data_type() {
@@ -1775,7 +1770,6 @@ mod tests {
                     .set_dictionary_page_size_limit(dictionary_size.max(1))
                     .set_encoding(*encoding)
                     .set_bloom_filter_enabled(bloom_filter)
-                    .set_bloom_filter_position(bloom_filter_position)
                     .build();
 
                 files.push(roundtrip_opts(&expected_batch, props))
@@ -2133,22 +2127,6 @@ mod tests {
         values_required::(many_vecs_iter);
     }
 
-    #[test]
-    fn i32_column_bloom_filter_at_end() {
-        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
-        let mut options = RoundTripOptions::new(array, false);
-        options.bloom_filter = true;
-        options.bloom_filter_position = BloomFilterPosition::End;
-
-        let files = one_column_roundtrip_with_options(options);
-        check_bloom_filter(
-            files,
-            "col".to_string(),
-            (0..SMALL_SIZE as i32).collect(),
-            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
-        );
-    }
-
     #[test]
     fn i32_column_bloom_filter() {
         let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 28efbdc7c66e..0bedf1fcb731 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
     arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
-    file::{metadata::RowGroupMetaData, properties::WriterProperties},
+    file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
     format::{FileMetaData, KeyValue},
 };
 use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl AsyncArrowWriter {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
         self.sync_writer.flushed_row_groups()
     }
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 255fe1b7b253..fb8f798fd3ac 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -333,11 +333,6 @@ impl RowGroupMetaData {
         &self.columns
     }
 
-    /// Returns mutable slice of column chunk metadata.
-    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
-        &mut self.columns
-    }
-
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> i64 {
         self.num_rows
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 7fc73bd56fe2..87d84cef80aa 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -43,8 +43,6 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
 pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 /// Default value for [`WriterProperties::max_row_group_size`]
 pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
-/// Default value for [`WriterProperties::bloom_filter_position`]
-pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
 /// Default value for [`WriterProperties::created_by`]
 pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
 /// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -88,24 +86,6 @@ impl FromStr for WriterVersion {
     }
 }
 
-/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
-/// write Bloom filters
-///
-/// Basic constant, which is not part of the Thrift definition.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum BloomFilterPosition {
-    /// Write Bloom Filters of each row group right after the row group
-    ///
-    /// This saves memory by writing it as soon as it is computed, at the cost
-    /// of data locality for readers
-    AfterRowGroup,
-    /// Write Bloom Filters at the end of the file
-    ///
-    /// This allows better data locality for readers, at the cost of memory usage
-    /// for writers.
-    End,
-}
-
 /// Reference counted writer properties.
 pub type WriterPropertiesPtr = Arc;
@@ -150,7 +130,6 @@ pub struct WriterProperties {
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
-    bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
     pub(crate) key_value_metadata: Option>,
@@ -238,11 +217,6 @@ impl WriterProperties {
         self.max_row_group_size
     }
 
-    /// Returns maximum number of rows in a row group.
-    pub fn bloom_filter_position(&self) -> BloomFilterPosition {
-        self.bloom_filter_position
-    }
-
     /// Returns configured writer version.
     pub fn writer_version(&self) -> WriterVersion {
         self.writer_version
@@ -364,7 +338,6 @@ pub struct WriterPropertiesBuilder {
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
-    bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
     key_value_metadata: Option>,
@@ -384,7 +357,6 @@ impl WriterPropertiesBuilder {
             data_page_row_count_limit: usize::MAX,
             write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
             max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
-            bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
@@ -404,7 +376,6 @@ impl WriterPropertiesBuilder {
             data_page_row_count_limit: self.data_page_row_count_limit,
             write_batch_size: self.write_batch_size,
             max_row_group_size: self.max_row_group_size,
-            bloom_filter_position: self.bloom_filter_position,
             writer_version: self.writer_version,
             created_by: self.created_by,
             key_value_metadata: self.key_value_metadata,
@@ -516,12 +487,6 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`)
-    pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
-        self.bloom_filter_position = value;
-        self
-    }
-
     /// Sets "created by" property (defaults to `parquet-rs version `).
     pub fn set_created_by(mut self, value: String) -> Self {
         self.created_by = value;
         self
     }
@@ -1087,7 +1052,6 @@ mod tests {
         );
         assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
         assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
-        assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
         assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
         assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
         assert_eq!(props.key_value_metadata(), None);
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index eb633f31c477..7806384cdb52 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -34,9 +34,8 @@ use crate::column::{
 };
 use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
-use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
 use crate::file::reader::ChunkReader;
-use crate::file::{metadata::*, PARQUET_MAGIC};
+use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
 use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
 
 /// A wrapper around a [`Write`] that keeps track of the number
@@ -116,10 +115,9 @@ pub type OnCloseColumnChunk<'a> = Box Result<()
 /// - the row group metadata
 /// - the column index for each column chunk
 /// - the offset index for each column chunk
-pub type OnCloseRowGroup<'a, W> = Box<
+pub type OnCloseRowGroup<'a> = Box<
     dyn FnOnce(
-            &'a mut TrackedWrite,
-            RowGroupMetaData,
+            RowGroupMetaDataPtr,
             Vec>,
             Vec>,
             Vec>,
@@ -145,7 +143,7 @@ pub struct SerializedFileWriter {
     schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    row_groups: Vec,
+    row_groups: Vec,
     bloom_filters: Vec>>,
     column_indexes: Vec>>,
     offset_indexes: Vec>>,
@@ -199,29 +197,18 @@ impl SerializedFileWriter {
         self.row_group_index += 1;
 
-        let bloom_filter_position = self.properties().bloom_filter_position();
         let row_groups = &mut self.row_groups;
         let row_bloom_filters = &mut self.bloom_filters;
         let row_column_indexes = &mut self.column_indexes;
         let row_offset_indexes = &mut self.offset_indexes;
-        let on_close = move |buf,
-                             mut metadata,
-                             row_group_bloom_filter,
-                             row_group_column_index,
-                             row_group_offset_index| {
-            row_bloom_filters.push(row_group_bloom_filter);
-            row_column_indexes.push(row_group_column_index);
-            row_offset_indexes.push(row_group_offset_index);
-            // write bloom filters out immediately after the row group if requested
-            match bloom_filter_position {
-                BloomFilterPosition::AfterRowGroup => {
-                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
-                }
-                BloomFilterPosition::End => (),
+        let on_close =
+            |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| {
+                row_groups.push(metadata);
+                row_bloom_filters.push(row_group_bloom_filter);
+                row_column_indexes.push(row_group_column_index);
+                row_offset_indexes.push(row_group_offset_index);
+                Ok(())
             };
-            row_groups.push(metadata);
-            Ok(())
-        };
 
         let row_group_writer = SerializedRowGroupWriter::new(
             self.descr.clone(),
@@ -234,7 +221,7 @@ impl SerializedFileWriter {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
         &self.row_groups
     }
@@ -286,6 +273,34 @@ impl SerializedFileWriter {
         Ok(())
     }
 
+    /// Serialize all the bloom filter to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write bloom filter to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        bloom_filter.write(&mut self.buf)?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for bloom filter
+                        let column_chunk_meta = column_chunk
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata");
+                        column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
+                        column_chunk_meta.bloom_filter_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -316,11 +331,6 @@ impl SerializedFileWriter {
         self.finished = true;
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
-        // write out any remaining bloom filters after all row groups
-        for row_group in &mut self.row_groups {
-            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
-        }
-
         let mut row_groups = self
             .row_groups
             .as_slice()
             .iter()
             .map(|v| v.to_thrift())
             .collect::>();
 
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -432,40 +443,6 @@ impl SerializedFileWriter {
     }
 }
 
-/// Serialize all the bloom filters of the given row group to the given buffer,
-/// and returns the updated row group metadata.
-fn write_bloom_filters(
-    buf: &mut TrackedWrite,
-    bloom_filters: &mut [Vec>],
-    row_group: &mut RowGroupMetaData,
-) -> Result<()> {
-    // iter row group
-    // iter each column
-    // write bloom filter to the file
-
-    let row_group_idx: u16 = row_group
-        .ordinal()
-        .expect("Missing row group ordinal")
-        .try_into()
-        .expect("Negative row group ordinal");
-    let row_group_idx = row_group_idx as usize;
-    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
-        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
-            let start_offset = buf.bytes_written();
-            bloom_filter.write(&mut *buf)?;
-            let end_offset = buf.bytes_written();
-            // set offset and index for bloom filter
-            *column_chunk = column_chunk
-                .clone()
-                .into_builder()
-                .set_bloom_filter_offset(Some(start_offset as i64))
-                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
-                .build()?;
-        }
-    }
-    Ok(())
-}
-
 /// Parquet row group writer API.
 /// Provides methods to access column writers in an iterator-like fashion, order is
 /// guaranteed to match the order of schema leaves (column descriptors).
@@ -491,7 +468,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     offset_indexes: Vec>,
     row_group_index: i16,
     file_offset: i64,
-    on_close: Option>,
+    on_close: Option>,
 }
 
 impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
@@ -508,7 +485,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         properties: WriterPropertiesPtr,
         buf: &'a mut TrackedWrite,
         row_group_index: i16,
-        on_close: Option>,
+        on_close: Option>,
     ) -> Self {
         let num_columns = schema_descr.num_columns();
         let file_offset = buf.bytes_written() as i64;
@@ -692,12 +669,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             .set_file_offset(self.file_offset)
             .build()?;
 
-        self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
+        let metadata = Arc::new(row_group_metadata);
+        self.row_group_metadata = Some(metadata.clone());
 
         if let Some(on_close) = self.on_close.take() {
             on_close(
-                self.buf,
-                row_group_metadata,
+                metadata,
                 self.bloom_filters,
                 self.column_indexes,
                 self.offset_indexes,
@@ -1469,7 +1446,7 @@ mod tests {
         assert_eq!(flushed.len(), idx + 1);
         assert_eq!(Some(idx as i16), last_group.ordinal());
         assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
-        assert_eq!(&flushed[idx], last_group.as_ref());
+        assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
    }

    let file_metadata = file_writer.close().unwrap();
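
For context on what remains once this revert is applied: a minimal, illustrative sketch (not part of the patch itself) of how a caller still enables bloom filters after `BloomFilterPosition` and `set_bloom_filter_position` are removed, assuming the `parquet` crate with its `arrow` feature; the output path and the single `id` column are made up for the example. Per the reverted code's own comments, the filters are serialized while the file is being closed, after all row group data, rather than immediately after each row group.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;

fn main() -> Result<()> {
    // Bloom filters stay opt-in through the builder; the per-column variant
    // `set_column_bloom_filter_enabled` used by the deleted example also remains.
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build();

    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("id", ids)]).unwrap();

    let file = File::create("/tmp/bloom_example.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    // With the revert applied, bloom filters for every row group are written
    // here, during close(), near the end of the file.
    writer.close()?;
    Ok(())
}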