Add peek_next_page_offset to SerializedPageReader (#6945)
* add peek_next_page_offset

* Update parquet/src/file/serialized_reader.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
XiangpengHao and alamb authored Jan 9, 2025
1 parent d0260fc commit 88fb923
Showing 1 changed file with 142 additions and 0 deletions.
142 changes: 142 additions & 0 deletions parquet/src/file/serialized_reader.rs
@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
physical_type: meta.column_type(),
})
}

/// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
/// Unlike page metadata, an offset can uniquely identify a page.
///
    /// This is used when reading Parquet with a row filter, where we want to avoid decompressing a page twice.
    /// This function allows the caller to check whether the next page has already been cached or read.
#[cfg(test)]
fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.as_ref() {
if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
Ok(Some(*offset))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(Box::new(header));
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
Ok(Some(page.offset as usize))
} else if let Some(page) = page_locations.front() {
Ok(Some(page.offset as usize))
} else {
Ok(None)
}
}
}
}
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
@@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use bytes::Buf;

use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1107,6 +1166,89 @@ mod tests {
assert_eq!(page_count, 2);
}

fn get_serialized_page_reader<R: ChunkReader>(
file_reader: &SerializedFileReader<R>,
row_group: usize,
column: usize,
) -> Result<SerializedPageReader<R>> {
let row_group = {
let row_group_metadata = file_reader.metadata.row_group(row_group);
let props = Arc::clone(&file_reader.props);
let f = Arc::clone(&file_reader.chunk_reader);
SerializedRowGroupReader::new(
f,
row_group_metadata,
file_reader
.metadata
.offset_index()
.map(|x| x[row_group].as_slice()),
props,
)?
};

let col = row_group.metadata.column(column);

let page_locations = row_group
.offset_index
.map(|x| x[column].page_locations.clone());

let props = Arc::clone(&row_group.props);
SerializedPageReader::new_with_properties(
Arc::clone(&row_group.chunk_reader),
col,
row_group.metadata.num_rows() as usize,
page_locations,
props,
)
}

#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let reader = SerializedFileReader::new(test_file)?;

let mut offset_set = HashSet::new();
let num_row_groups = reader.metadata.num_row_groups();
for row_group in 0..num_row_groups {
let num_columns = reader.metadata.row_group(row_group).num_columns();
for column in 0..num_columns {
let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
match &page_reader.state {
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
..
} => {
if let Some(page) = dictionary_page {
assert_eq!(page.offset as usize, page_offset);
} else if let Some(page) = page_locations.front() {
assert_eq!(page.offset as usize, page_offset);
} else {
unreachable!()
}
}
SerializedPageReaderState::Values {
offset,
next_page_header,
..
} => {
assert!(next_page_header.is_some());
assert_eq!(*offset, page_offset);
}
}
let page = page_reader.get_next_page()?;
assert!(page.is_some());
let newly_inserted = offset_set.insert(page_offset);
assert!(newly_inserted);
}
}
}

Ok(())
}

#[test]
fn test_page_iterator() {
let file = get_test_file("alltypes_plain.parquet");
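To make the intended use concrete, here is a minimal sketch of the row-filter caching pattern the new doc comment describes. It is illustrative only: in this commit `peek_next_page_offset` is private and gated behind `#[cfg(test)]`, so the sketch assumes crate-internal code (or a follow-up that exposes the method), and the `next_page_reusing_cache` helper and its `cache` map are hypothetical names, not part of the crate.

use std::collections::HashMap;
use std::fs::File;

use parquet::column::page::{Page, PageReader};
use parquet::errors::Result;
use parquet::file::serialized_reader::SerializedPageReader;

// Hypothetical helper: reuse pages that an earlier row-filter pass has
// already decompressed, keyed by the offset peeked below.
fn next_page_reusing_cache(
    reader: &mut SerializedPageReader<File>,
    cache: &mut HashMap<usize, Page>,
) -> Result<Option<Page>> {
    // Peek the offset of the next page; per the doc comment, an offset
    // uniquely identifies a page within the column chunk.
    let Some(offset) = reader.peek_next_page_offset()? else {
        return Ok(None); // no pages left in this column chunk
    };
    if let Some(page) = cache.remove(&offset) {
        // The filter pass already decoded this page: skip the raw bytes
        // (`skip_next_page` is part of the existing `PageReader` trait)
        // and hand back the cached copy instead of decompressing again.
        reader.skip_next_page()?;
        return Ok(Some(page));
    }
    // Cache miss: decode the page normally.
    reader.get_next_page()
}

Keying the cache by offset rather than by page metadata is the point of the new method: two pages in a chunk can share identical metadata, but their offsets are always distinct.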
