From c039762be4d1472897b7902a49e3e8e5e4e3d095 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 3 Oct 2024 13:35:32 -0700 Subject: [PATCH] Detect missing page indexes while reading Parquet metadata (#6507) * detect missing page indexes on read * lint * fix overlap test * add some clarifying comments per review suggestion --- parquet/src/arrow/mod.rs | 60 +++++++++++++++++++++++++++++ parquet/src/file/metadata/reader.rs | 34 +++++++++++++--- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index afe7ed1ebec9..737843326825 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -236,3 +236,63 @@ pub fn parquet_column<'a>( .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?; Some((parquet_idx, field)) } + +#[cfg(test)] +mod test { + use crate::arrow::ArrowWriter; + use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter}; + use crate::file::properties::{EnabledStatistics, WriterProperties}; + use arrow_array::{ArrayRef, Int32Array, RecordBatch}; + use bytes::Bytes; + use std::sync::Arc; + + #[test] + // Reproducer for https://github.com/apache/arrow-rs/issues/6464 + fn test_metadata_read_write_partial_offset() { + let parquet_bytes = create_parquet_file(); + + // read the metadata from the file WITHOUT the page index structures + let original_metadata = ParquetMetaDataReader::new() + .parse_and_finish(&parquet_bytes) + .unwrap(); + + // this should error because the page indexes are not present, but have offsets specified + let metadata_bytes = metadata_to_bytes(&original_metadata); + let err = ParquetMetaDataReader::new() + .with_page_indexes(true) // there are no page indexes in the metadata + .parse_and_finish(&metadata_bytes) + .err() + .unwrap(); + assert_eq!( + err.to_string(), + "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341" + ); + } + + /// Write a parquet filed into an in memory buffer + fn create_parquet_file() -> Bytes { + let mut buf = vec![]; + let data = vec![100, 200, 201, 300, 102, 33]; + let array: ArrayRef = Arc::new(Int32Array::from(data)); + let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap(); + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(); + + let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + drop(writer); + + Bytes::from(buf) + } + + /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetadataWriter + fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes { + let mut buf = vec![]; + ParquetMetaDataWriter::new(&mut buf, metadata) + .finish() + .unwrap(); + Bytes::from(buf) + } +} diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 88f53d02f3f4..ec4b97bb5727 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -58,6 +58,9 @@ pub struct ParquetMetaDataReader { column_index: bool, offset_index: bool, prefetch_hint: Option, + // Size of the serialized thrift metadata plus the 8 byte footer. Only set if + // `self.parse_metadata` is called. + metadata_size: Option, } impl ParquetMetaDataReader { @@ -195,7 +198,7 @@ impl ParquetMetaDataReader { /// let metadata = reader.finish().unwrap(); /// ``` pub fn try_parse_sized(&mut self, reader: &R, file_size: usize) -> Result<()> { - self.metadata = match Self::parse_metadata(reader) { + self.metadata = match self.parse_metadata(reader) { Ok(metadata) => Some(metadata), // FIXME: throughout this module ParquetError::IndexOutOfBound is used to indicate the // need for more data. This is not it's intended use. The plan is to add a NeedMoreData @@ -282,6 +285,19 @@ impl ParquetMetaDataReader { } } + // Perform extra sanity check to make sure `range` and the footer metadata don't + // overlap. + if let Some(metadata_size) = self.metadata_size { + let metadata_range = file_size.saturating_sub(metadata_size)..file_size; + if range.end > metadata_range.start { + return Err(eof_err!( + "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}", + range, + metadata_range + )); + } + } + let bytes_needed = range.end - range.start; let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?; let offset = range.start; @@ -455,8 +471,9 @@ impl ParquetMetaDataReader { range } - // one-shot parse of footer - fn parse_metadata(chunk_reader: &R) -> Result { + // One-shot parse of footer. + // Side effect: this will set `self.metadata_size` + fn parse_metadata(&mut self, chunk_reader: &R) -> Result { // check file is large enough to hold footer let file_size = chunk_reader.len(); if file_size < (FOOTER_SIZE as u64) { @@ -473,6 +490,7 @@ impl ParquetMetaDataReader { let metadata_len = Self::decode_footer(&footer)?; let footer_metadata_len = FOOTER_SIZE + metadata_len; + self.metadata_size = Some(footer_metadata_len); if footer_metadata_len > file_size as usize { return Err(ParquetError::IndexOutOfBound( @@ -654,14 +672,16 @@ mod tests { #[test] fn test_parse_metadata_size_smaller_than_footer() { let test_file = tempfile::tempfile().unwrap(); - let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err(); + let err = ParquetMetaDataReader::new() + .parse_metadata(&test_file) + .unwrap_err(); assert!(matches!(err, ParquetError::IndexOutOfBound(8, _))); } #[test] fn test_parse_metadata_corrupt_footer() { let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]); - let reader_result = ParquetMetaDataReader::parse_metadata(&data); + let reader_result = ParquetMetaDataReader::new().parse_metadata(&data); assert_eq!( reader_result.unwrap_err().to_string(), "Parquet error: Invalid Parquet file. Corrupt footer" @@ -671,7 +691,9 @@ mod tests { #[test] fn test_parse_metadata_invalid_start() { let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']); - let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err(); + let err = ParquetMetaDataReader::new() + .parse_metadata(&test_file) + .unwrap_err(); assert!(matches!(err, ParquetError::IndexOutOfBound(263, _))); }