diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index a942481f7e4d..81ba0a66463e 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> { physical_type: meta.column_type(), }) } + + /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata. + /// Unlike page metadata, an offset can uniquely identify a page. + /// + /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice. + /// This function allows us to check if the next page is being cached or read previously. + #[cfg(test)] + fn peek_next_page_offset(&mut self) -> Result<Option<usize>> { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + return if let Some(header) = next_page_header.as_ref() { + if let Ok(_page_meta) = PageMetadata::try_from(&**header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut read = self.reader.get_read(*offset as u64)?; + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining_bytes -= header_len; + let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + *next_page_header = Some(Box::new(header)); + page_meta + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + ..
+ } => { + if let Some(page) = dictionary_page { + Ok(Some(page.offset as usize)) + } else if let Some(page) = page_locations.front() { + Ok(Some(page.offset as usize)) + } else { + Ok(None) + } + } + } + } } impl<R: ChunkReader> Iterator for SerializedPageReader<R> { @@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use bytes::Buf; use crate::file::properties::{EnabledStatistics, WriterProperties}; @@ -1107,6 +1166,89 @@ mod tests { assert_eq!(page_count, 2); } + fn get_serialized_page_reader<R: ChunkReader>( + file_reader: &SerializedFileReader<R>, + row_group: usize, + column: usize, + ) -> Result<SerializedPageReader<R>> { + let row_group = { + let row_group_metadata = file_reader.metadata.row_group(row_group); + let props = Arc::clone(&file_reader.props); + let f = Arc::clone(&file_reader.chunk_reader); + SerializedRowGroupReader::new( + f, + row_group_metadata, + file_reader + .metadata + .offset_index() + .map(|x| x[row_group].as_slice()), + props, + )? + }; + + let col = row_group.metadata.column(column); + + let page_locations = row_group + .offset_index + .map(|x| x[column].page_locations.clone()); + + let props = Arc::clone(&row_group.props); + SerializedPageReader::new_with_properties( + Arc::clone(&row_group.chunk_reader), + col, + row_group.metadata.num_rows() as usize, + page_locations, + props, + ) + } + + #[test] + fn test_peek_next_page_offset_matches_actual() -> Result<()> { + let test_file = get_test_file("alltypes_plain.parquet"); + let reader = SerializedFileReader::new(test_file)?; + + let mut offset_set = HashSet::new(); + let num_row_groups = reader.metadata.num_row_groups(); + for row_group in 0..num_row_groups { + let num_columns = reader.metadata.row_group(row_group).num_columns(); + for column in 0..num_columns { + let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?; + + while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() { + match &page_reader.state {
SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + .. + } => { + if let Some(page) = dictionary_page { + assert_eq!(page.offset as usize, page_offset); + } else if let Some(page) = page_locations.front() { + assert_eq!(page.offset as usize, page_offset); + } else { + unreachable!() + } + } + SerializedPageReaderState::Values { + offset, + next_page_header, + .. + } => { + assert!(next_page_header.is_some()); + assert_eq!(*offset, page_offset); + } + } + let page = page_reader.get_next_page()?; + assert!(page.is_some()); + let newly_inserted = offset_set.insert(page_offset); + assert!(newly_inserted); + } + } + } + + Ok(()) + } + #[test] fn test_page_iterator() { let file = get_test_file("alltypes_plain.parquet");