use ParquetMetaDataReader to read page indexes (apache#6506)
add tests
etseidl authored Oct 7, 2024
1 parent a117eed commit 7b30881
Showing 1 changed file with 154 additions and 20 deletions.
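The change replaces the reader's hand-rolled page-index loop with ParquetMetaDataReader: the reader is seeded with metadata whose row groups may already have been filtered, then asked to fetch the column and offset indexes. A minimal sketch of that flow, using only the three ParquetMetaDataReader calls the diff introduces (the helper name load_page_indexes is hypothetical):

use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::reader::ChunkReader;

// Attach page indexes to already-parsed metadata, as the diff below does
// inside SerializedFileReader::new_with_options. The helper name is
// hypothetical; the ParquetMetaDataReader calls are taken from the commit.
fn load_page_indexes<R: ChunkReader>(
    chunk_reader: &R,
    metadata: ParquetMetaData,
) -> Result<ParquetMetaData> {
    let mut reader =
        ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
    reader.read_page_indexes(chunk_reader)?;
    // finish() hands the metadata back, now carrying the page indexes.
    reader.finish()
}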
174 changes: 154 additions & 20 deletions parquet/src/file/serialized_reader.rs
@@ -27,7 +27,6 @@ use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
metadata::*,
@@ -210,24 +209,19 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
}

let mut metadata = metadata_builder.build();

// If page indexes are desired, build them with the filtered set of row groups
if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in metadata_builder.row_groups().iter() {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
metadata_builder = metadata_builder
.set_column_index(Some(columns_indexes))
.set_offset_index(Some(offset_indexes));
let mut reader =
ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
reader.read_page_indexes(&chunk_reader)?;
metadata = reader.finish()?;
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata_builder.build()),
metadata: Arc::new(metadata),
props: Arc::new(options.props),
})
}
@@ -769,12 +763,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
use bytes::Buf;

use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::format::BoundaryOrder;

use crate::basic::{self, ColumnOrder};
use crate::column::reader::ColumnReader;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType};
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
use crate::file::page_index::index::{Index, NativeIndex};
use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
use crate::file::writer::SerializedFileWriter;
@@ -1198,50 +1195,62 @@ mod tests {

#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
let metadata = origin_reader.metadata();
let mid = get_midpoint_offset(metadata.row_group(0));

// true, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);

// true, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);

// false, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);

// false, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
Ok(())
}

@@ -1804,4 +1813,129 @@ mod tests {
start += 1;
}
}

#[test]
fn test_filtered_rowgroup_metadata() {
let message_type = "
message test_schema {
REQUIRED INT32 a;
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build(),
);
let mut file: File = tempfile::tempfile().unwrap();
let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
let data = [1, 2, 3, 4, 5];

// write 5 row groups
for idx in 0..5 {
let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
let mut row_group_writer = file_writer.next_row_group().unwrap();
if let Some(mut writer) = row_group_writer.next_column().unwrap() {
writer
.typed::<Int32Type>()
.write_batch(data_i.as_slice(), None, None)
.unwrap();
writer.close().unwrap();
}
row_group_writer.close().unwrap();
file_writer.flushed_row_groups();
}
let file_metadata = file_writer.close().unwrap();

assert_eq!(file_metadata.num_rows, 25);
assert_eq!(file_metadata.row_groups.len(), 5);

// read only the 3rd row group
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();

// check we got the expected row group
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.row_group(0).ordinal(), Some(2));

// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();
let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
let pg_idx = &col_idx[0][0];
let off_idx_i = &off_idx[0][0];

// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}

// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(0).column(0).data_page_offset()
);

// read non-contiguous row groups
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();

// check we got the expected row groups
assert_eq!(metadata.num_row_groups(), 2);
assert_eq!(metadata.row_group(0).ordinal(), Some(1));
assert_eq!(metadata.row_group(1).ordinal(), Some(3));

// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 2);
assert_eq!(metadata.offset_index().unwrap().len(), 2);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();

for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
let pg_idx = &col_idx_i[0];
let off_idx_i = &off_idx[i][0];

// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}

// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(i).column(0).data_page_offset()
);
}
}
}
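For readers new to the page-index API these tests exercise: column_index() returns one Index per column per row group, and typed per-page statistics are reached by matching on that enum, exactly as the assertions above do. A small sketch under the same assumptions (rg and col are hypothetical indices):

use parquet::file::metadata::ParquetMetaData;
use parquet::file::page_index::index::Index;

// Print per-page min/max for an INT32 column, mirroring the test's
// Index::INT32 match arm. Panics if page indexes were not requested.
fn print_int32_page_stats(metadata: &ParquetMetaData, rg: usize, col: usize) {
    let column_index = metadata.column_index().expect("page indexes not loaded");
    match &column_index[rg][col] {
        Index::INT32(idx) => {
            for page in &idx.indexes {
                println!("page min={:?} max={:?}", page.min(), page.max());
            }
        }
        _ => println!("column is not INT32"),
    }
}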
