From 2cc0c167d9725c1b1965980965d637928d887380 Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Thu, 26 Sep 2024 07:25:37 -0700
Subject: [PATCH] Deprecate methods from footer.rs in favor of `ParquetMetaDataReader` (#6451)

* deprecate methods from footer.rs

* remove mention of parse_metadata from docs for ParquetMetaData
---
 parquet/src/arrow/async_reader/metadata.rs |   9 +-
 parquet/src/arrow/async_reader/mod.rs      |  65 +++++--
 parquet/src/bin/parquet-concat.rs          |   3 +-
 parquet/src/bin/parquet-layout.rs          |   3 +-
 parquet/src/file/footer.rs                 | 199 +--------------------
 parquet/src/file/metadata/mod.rs           |   3 +-
 parquet/src/file/metadata/writer.rs        |   8 +-
 parquet/src/file/serialized_reader.rs      |   5 +-
 8 files changed, 70 insertions(+), 225 deletions(-)

diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index b4bf77f2608d..cd45d2abdbcd 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -17,8 +17,7 @@

 use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::FOOTER_SIZE;
@@ -76,7 +75,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

-        let length = decode_footer(&footer)?;
+        let length = ParquetMetaDataReader::decode_footer(&footer)?;

         if file_size < length + FOOTER_SIZE {
             return Err(ParquetError::EOF(format!(
@@ -90,13 +89,13 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
             let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
-            (decode_metadata(&meta)?, None)
+            (ParquetMetaDataReader::decode_metadata(&meta)?, None)
         } else {
             let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             (
-                decode_metadata(slice)?,
+                ParquetMetaDataReader::decode_metadata(slice)?,
                 Some((footer_start, suffix.slice(..metadata_start))),
             )
         };
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5695dbc10fe1..89e4d6adb552 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -104,8 +104,7 @@ use crate::bloom_filter::{
 };
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::file::FOOTER_SIZE;
@@ -186,14 +185,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
             let mut buf = [0_u8; FOOTER_SIZE];
             self.read_exact(&mut buf).await?;

-            let metadata_len = decode_footer(&buf)?;
+            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
             self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                 .await?;

             let mut buf = Vec::with_capacity(metadata_len);
             self.take(metadata_len as _).read_to_end(&mut buf).await?;

-            Ok(Arc::new(decode_metadata(&buf)?))
+            Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
         }
         .boxed()
     }
@@ -909,7 +908,7 @@ mod tests {
     };
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::ArrowWriter;
-    use crate::file::footer::parse_metadata;
+    use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::page_index::index_reader;
     use crate::file::properties::WriterProperties;
     use arrow::compute::kernels::cmp::eq;
@@ -952,7 +951,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1007,7 +1008,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1073,7 +1076,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1117,7 +1122,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1173,7 +1180,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1238,7 +1247,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);

         assert_eq!(metadata.num_row_groups(), 1);
@@ -1317,7 +1328,9 @@ mod tests {
         writer.close().unwrap();

         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();

         let test = TestReader {
@@ -1391,7 +1404,9 @@ mod tests {
         writer.close().unwrap();

         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();

         assert_eq!(metadata.num_row_groups(), 2);

@@ -1479,7 +1494,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
         let metadata = Arc::new(metadata);

@@ -1529,7 +1546,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();

         let offset_index =
             index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
@@ -1619,7 +1638,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let file_rows = metadata.file_metadata().num_rows() as usize;
         let metadata = Arc::new(metadata);

@@ -1764,7 +1785,9 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);
         let async_reader = TestReader {
             data: data.clone(),
@@ -1793,7 +1816,9 @@ mod tests {
     }

     async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let metadata = Arc::new(metadata);
         assert_eq!(metadata.num_row_groups(), 1);

@@ -1933,7 +1958,9 @@ mod tests {
         writer.close().unwrap();

         let data: Bytes = buf.into();
-        let metadata = parse_metadata(&data).unwrap();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();

         let test = TestReader {
diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index 9cbdf8e7b399..e8ce4ca1dbed 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -39,6 +39,7 @@
 use clap::Parser;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
+use parquet::file::metadata::ParquetMetaDataReader;
 use parquet::file::properties::WriterProperties;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
@@ -70,7 +71,7 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = parquet::file::footer::parse_metadata(&reader)?;
+                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/parquet/src/bin/parquet-layout.rs b/parquet/src/bin/parquet-layout.rs
index 79a0acb5f57c..46a231a7d02b 100644
--- a/parquet/src/bin/parquet-layout.rs
+++ b/parquet/src/bin/parquet-layout.rs
@@ -37,6 +37,7 @@
 use std::fs::File;
 use std::io::Read;

 use clap::Parser;
+use parquet::file::metadata::ParquetMetaDataReader;
 use serde::Serialize;
 use thrift::protocol::TCompactInputProtocol;
@@ -79,7 +80,7 @@ struct Page {
 }

 fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
-    let metadata = parquet::file::footer::parse_metadata(reader)?;
+    let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
     let schema = metadata.file_metadata().schema_descr();

     let row_groups = (0..metadata.num_row_groups())
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index 7a75576c3645..3dd698e3d443 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -15,17 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::{io::Read, sync::Arc};
-
-use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
-use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
-
-use crate::basic::ColumnOrder;
-
-use crate::errors::{ParquetError, Result};
-use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC};
-
-use crate::schema::types::{self, SchemaDescriptor};
+use crate::errors::Result;
+use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE};
 /// Reads the [ParquetMetaData] from the footer of the parquet file.
 ///
 /// # Errors
@@ -51,34 +42,9 @@ use crate::schema::types::{self, SchemaDescriptor};
 /// # See Also
 /// [`decode_metadata`] for decoding the metadata from the bytes.
 /// [`decode_footer`] for decoding the metadata length from the footer.
+#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
 pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
-    // check file is large enough to hold footer
-    let file_size = chunk_reader.len();
-    if file_size < (FOOTER_SIZE as u64) {
-        return Err(general_err!(
-            "Invalid Parquet file. Size is smaller than footer"
-        ));
-    }
-
-    let mut footer = [0_u8; 8];
-    chunk_reader
-        .get_read(file_size - 8)?
-        .read_exact(&mut footer)?;
-
-    let metadata_len = decode_footer(&footer)?;
-    let footer_metadata_len = FOOTER_SIZE + metadata_len;
-
-    if footer_metadata_len > file_size as usize {
-        return Err(general_err!(
-            "Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
-            metadata_len,
-            FOOTER_SIZE,
-            file_size
-        ));
-    }
-
-    let start = file_size - footer_metadata_len as u64;
-    decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
+    ParquetMetaDataReader::new().parse_and_finish(chunk_reader)
 }

 /// Decodes [`ParquetMetaData`] from the provided bytes.
@@ -88,28 +54,9 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDa
 /// by the [Parquet Spec].
 ///
 /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_metadata")]
 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
-    // TODO: row group filtering
-    let mut prot = TCompactSliceInputProtocol::new(buf);
-    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
-        .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?;
-    let schema = types::from_thrift(&t_file_metadata.schema)?;
-    let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-    let mut row_groups = Vec::new();
-    for rg in t_file_metadata.row_groups {
-        row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
-    }
-    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr);
-
-    let file_metadata = FileMetaData::new(
-        t_file_metadata.version,
-        t_file_metadata.num_rows,
-        t_file_metadata.created_by,
-        t_file_metadata.key_value_metadata,
-        schema_descr,
-        column_orders,
-    );
-    Ok(ParquetMetaData::new(file_metadata, row_groups))
+    ParquetMetaDataReader::decode_metadata(buf)
 }
 /// Decodes the Parquet footer returning the metadata length in bytes
 ///
@@ -123,137 +70,7 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
 /// | len | 'PAR1' |
 /// +-----+--------+
 /// ```
+#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_footer")]
 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-    // check this is indeed a parquet file
-    if slice[4..] != PARQUET_MAGIC {
-        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-    }
-
-    // get the metadata length from the footer
-    let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
-    // u32 won't be larger than usize in most cases
-    Ok(metadata_len as usize)
-}
-
-/// Parses column orders from Thrift definition.
-/// If no column orders are defined, returns `None`.
-fn parse_column_orders(
-    t_column_orders: Option<Vec<TColumnOrder>>,
-    schema_descr: &SchemaDescriptor,
-) -> Option<Vec<ColumnOrder>> {
-    match t_column_orders {
-        Some(orders) => {
-            // Should always be the case
-            assert_eq!(
-                orders.len(),
-                schema_descr.num_columns(),
-                "Column order length mismatch"
-            );
-            let mut res = Vec::new();
-            for (i, column) in schema_descr.columns().iter().enumerate() {
-                match orders[i] {
-                    TColumnOrder::TYPEORDER(_) => {
-                        let sort_order = ColumnOrder::get_sort_order(
-                            column.logical_type(),
-                            column.converted_type(),
-                            column.physical_type(),
-                        );
-                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
-                    }
-                }
-            }
-            Some(res)
-        }
-        None => None,
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use bytes::Bytes;
-
-    use crate::basic::SortOrder;
-    use crate::basic::Type;
-    use crate::format::TypeDefinedOrder;
-    use crate::schema::types::Type as SchemaType;
-
-    #[test]
-    fn test_parse_metadata_size_smaller_than_footer() {
-        let test_file = tempfile::tempfile().unwrap();
-        let reader_result = parse_metadata(&test_file);
-        assert_eq!(
-            reader_result.unwrap_err().to_string(),
-            "Parquet error: Invalid Parquet file. Size is smaller than footer"
-        );
-    }
-
-    #[test]
-    fn test_parse_metadata_corrupt_footer() {
-        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
-        let reader_result = parse_metadata(&data);
-        assert_eq!(
-            reader_result.unwrap_err().to_string(),
-            "Parquet error: Invalid Parquet file. Corrupt footer"
-        );
-    }
-
-    #[test]
-    fn test_parse_metadata_invalid_start() {
-        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
-        let reader_result = parse_metadata(&test_file);
-        assert_eq!(
-            reader_result.unwrap_err().to_string(),
-            "Parquet error: Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes"
-        );
-    }
-
-    #[test]
-    fn test_metadata_column_orders_parse() {
-        // Define simple schema, we do not need to provide logical types.
-        let fields = vec![
-            Arc::new(
-                SchemaType::primitive_type_builder("col1", Type::INT32)
-                    .build()
-                    .unwrap(),
-            ),
-            Arc::new(
-                SchemaType::primitive_type_builder("col2", Type::FLOAT)
-                    .build()
-                    .unwrap(),
-            ),
-        ];
-        let schema = SchemaType::group_type_builder("schema")
-            .with_fields(fields)
-            .build()
-            .unwrap();
-        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
-        let t_column_orders = Some(vec![
-            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
-            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
-        ]);
-
-        assert_eq!(
-            parse_column_orders(t_column_orders, &schema_descr),
-            Some(vec![
-                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
-                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
-            ])
-        );
-
-        // Test when no column orders are defined.
-        assert_eq!(parse_column_orders(None, &schema_descr), None);
-    }
-
-    #[test]
-    #[should_panic(expected = "Column order length mismatch")]
-    fn test_metadata_column_orders_len_mismatch() {
-        let schema = SchemaType::group_type_builder("schema").build().unwrap();
-        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
-
-        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
-
-        parse_column_orders(t_column_orders, &schema_descr);
-    }
+    ParquetMetaDataReader::decode_footer(slice)
 }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 75d90a1d8602..05bc261a6649 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -161,10 +161,9 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
 /// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
 ///
 /// This structure is read by the various readers in this crate or can be read
-/// directly from a file using the [`parse_metadata`] function.
+/// directly from a file using the [`ParquetMetaDataReader`] struct.
 ///
 /// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
-/// [`parse_metadata`]: crate::file::footer::parse_metadata
 #[derive(Debug, Clone, PartialEq)]
 pub struct ParquetMetaData {
     /// File level metadata
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 92ce60556c3e..db78606e42ea 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -384,9 +384,9 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
 mod tests {
     use std::sync::Arc;

-    use crate::file::footer::parse_metadata;
     use crate::file::metadata::{
-        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataWriter, RowGroupMetaData,
+        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
+        RowGroupMetaData,
     };
     use crate::file::properties::{EnabledStatistics, WriterProperties};
     use crate::file::reader::{FileReader, SerializedFileReader};
@@ -428,7 +428,9 @@ mod tests {

         let data = buf.into_inner().freeze();

-        let decoded_metadata = parse_metadata(&data).unwrap();
+        let decoded_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();

         assert!(!has_page_index(&metadata.metadata));
         assert_eq!(metadata.metadata, decoded_metadata);
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index b8ee4001a99c..6fb0f78c1613 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -30,7 +30,6 @@ use crate::errors::{ParquetError, Result};
 use crate::file::page_index::index_reader;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::{
-    footer,
     metadata::*,
     properties::{ReaderProperties, ReaderPropertiesPtr},
     reader::*,
@@ -180,7 +179,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Creates file reader from a Parquet file.
     /// Returns error if Parquet file does not exist or is corrupt.
     pub fn new(chunk_reader: R) -> Result<Self> {
-        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
         let props = Arc::new(ReaderProperties::builder().build());
         Ok(Self {
             chunk_reader: Arc::new(chunk_reader),
@@ -192,7 +191,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Creates file reader from a Parquet file with read options.
     /// Returns error if Parquet file does not exist or is corrupt.
     pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
-        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
         let mut predicates = options.predicates;
         let row_groups = metadata.row_groups().to_vec();
         let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
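
For callers migrating off the deprecated free functions, the change is mechanical: `ParquetMetaDataReader::new().parse_and_finish(&reader)` subsumes `parse_metadata`, while `decode_footer` and `decode_metadata` become associated functions on `ParquetMetaDataReader`, as this patch does throughout. A minimal sketch of the replacement call (the file path is hypothetical, and this assumes parquet 53.1.0 or later):

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

fn read_metadata() -> Result<ParquetMetaData> {
    // Hypothetical input file; any `ChunkReader` (e.g. `File` or `Bytes`) works
    let file = File::open("example.parquet")?;

    // Replaces the deprecated `parquet::file::footer::parse_metadata(&file)`:
    // reads the 8-byte footer, then decodes the Thrift-encoded file metadata
    ParquetMetaDataReader::new().parse_and_finish(&file)
}
```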