diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index ddb7d0df9052..83cd61371d96 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -27,7 +27,6 @@ use crate::bloom_filter::Sbbf; use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; -use crate::file::page_index::index_reader; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::{ metadata::*, @@ -210,24 +209,19 @@ impl SerializedFileReader { } } + let mut metadata = metadata_builder.build(); + + // If page indexes are desired, build them with the filtered set of row groups if options.enable_page_index { - let mut columns_indexes = vec![]; - let mut offset_indexes = vec![]; - - for rg in metadata_builder.row_groups().iter() { - let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; - let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?; - columns_indexes.push(column_index); - offset_indexes.push(offset_index); - } - metadata_builder = metadata_builder - .set_column_index(Some(columns_indexes)) - .set_offset_index(Some(offset_indexes)); + let mut reader = + ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true); + reader.read_page_indexes(&chunk_reader)?; + metadata = reader.finish()?; } Ok(Self { chunk_reader: Arc::new(chunk_reader), - metadata: Arc::new(metadata_builder.build()), + metadata: Arc::new(metadata), props: Arc::new(options.props), }) } @@ -769,12 +763,15 @@ impl PageReader for SerializedPageReader { #[cfg(test)] mod tests { + use bytes::Buf; + + use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::format::BoundaryOrder; use crate::basic::{self, ColumnOrder}; use crate::column::reader::ColumnReader; use crate::data_type::private::ParquetValueType; - use crate::data_type::{AsBytes, FixedLenByteArrayType}; + use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type}; use crate::file::page_index::index::{Index, NativeIndex}; use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes}; use crate::file::writer::SerializedFileWriter; @@ -1198,50 +1195,62 @@ mod tests { #[test] fn test_file_reader_filter_row_groups_and_range() -> Result<()> { - let test_file = get_test_file("alltypes_plain.parquet"); + let test_file = get_test_file("alltypes_tiny_pages.parquet"); let origin_reader = SerializedFileReader::new(test_file)?; let metadata = origin_reader.metadata(); let mid = get_midpoint_offset(metadata.row_group(0)); // true, true predicate - let test_file = get_test_file("alltypes_plain.parquet"); + let test_file = get_test_file("alltypes_tiny_pages.parquet"); let read_options = ReadOptionsBuilder::new() + .with_page_index() .with_predicate(Box::new(|_, _| true)) .with_range(mid, mid + 1) .build(); let reader = SerializedFileReader::new_with_options(test_file, read_options)?; let metadata = reader.metadata(); assert_eq!(metadata.num_row_groups(), 1); + assert_eq!(metadata.column_index().unwrap().len(), 1); + assert_eq!(metadata.offset_index().unwrap().len(), 1); // true, false predicate - let test_file = get_test_file("alltypes_plain.parquet"); + let test_file = get_test_file("alltypes_tiny_pages.parquet"); let read_options = ReadOptionsBuilder::new() + .with_page_index() .with_predicate(Box::new(|_, _| true)) .with_range(0, mid) .build(); let reader = SerializedFileReader::new_with_options(test_file, read_options)?; let metadata = reader.metadata(); assert_eq!(metadata.num_row_groups(), 0); + assert_eq!(metadata.column_index().unwrap().len(), 0); + assert_eq!(metadata.offset_index().unwrap().len(), 0); // false, true predicate - let test_file = get_test_file("alltypes_plain.parquet"); + let test_file = get_test_file("alltypes_tiny_pages.parquet"); let read_options = ReadOptionsBuilder::new() + .with_page_index() .with_predicate(Box::new(|_, _| false)) .with_range(mid, mid + 1) .build(); let reader = SerializedFileReader::new_with_options(test_file, read_options)?; let metadata = reader.metadata(); assert_eq!(metadata.num_row_groups(), 0); + assert_eq!(metadata.column_index().unwrap().len(), 0); + assert_eq!(metadata.offset_index().unwrap().len(), 0); // false, false predicate - let test_file = get_test_file("alltypes_plain.parquet"); + let test_file = get_test_file("alltypes_tiny_pages.parquet"); let read_options = ReadOptionsBuilder::new() + .with_page_index() .with_predicate(Box::new(|_, _| false)) .with_range(0, mid) .build(); let reader = SerializedFileReader::new_with_options(test_file, read_options)?; let metadata = reader.metadata(); assert_eq!(metadata.num_row_groups(), 0); + assert_eq!(metadata.column_index().unwrap().len(), 0); + assert_eq!(metadata.offset_index().unwrap().len(), 0); Ok(()) } @@ -1804,4 +1813,129 @@ mod tests { start += 1; } } + + #[test] + fn test_filtered_rowgroup_metadata() { + let message_type = " + message test_schema { + REQUIRED INT32 a; + } + "; + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + ); + let mut file: File = tempfile::tempfile().unwrap(); + let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap(); + let data = [1, 2, 3, 4, 5]; + + // write 5 row groups + for idx in 0..5 { + let data_i: Vec = data.iter().map(|x| x * (idx + 1)).collect(); + let mut row_group_writer = file_writer.next_row_group().unwrap(); + if let Some(mut writer) = row_group_writer.next_column().unwrap() { + writer + .typed::() + .write_batch(data_i.as_slice(), None, None) + .unwrap(); + writer.close().unwrap(); + } + row_group_writer.close().unwrap(); + file_writer.flushed_row_groups(); + } + let file_metadata = file_writer.close().unwrap(); + + assert_eq!(file_metadata.num_rows, 25); + assert_eq!(file_metadata.row_groups.len(), 5); + + // read only the 3rd row group + let read_options = ReadOptionsBuilder::new() + .with_page_index() + .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2)) + .build(); + let reader = + SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options) + .unwrap(); + let metadata = reader.metadata(); + + // check we got the expected row group + assert_eq!(metadata.num_row_groups(), 1); + assert_eq!(metadata.row_group(0).ordinal(), Some(2)); + + // check we only got the relevant page indexes + assert!(metadata.column_index().is_some()); + assert!(metadata.offset_index().is_some()); + assert_eq!(metadata.column_index().unwrap().len(), 1); + assert_eq!(metadata.offset_index().unwrap().len(), 1); + let col_idx = metadata.column_index().unwrap(); + let off_idx = metadata.offset_index().unwrap(); + let col_stats = metadata.row_group(0).column(0).statistics().unwrap(); + let pg_idx = &col_idx[0][0]; + let off_idx_i = &off_idx[0][0]; + + // test that we got the index matching the row group + match pg_idx { + Index::INT32(int_idx) => { + let min = col_stats.min_bytes_opt().unwrap().get_i32_le(); + let max = col_stats.max_bytes_opt().unwrap().get_i32_le(); + assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref()); + assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref()); + } + _ => panic!("wrong stats type"), + } + + // check offset index matches too + assert_eq!( + off_idx_i.page_locations[0].offset, + metadata.row_group(0).column(0).data_page_offset() + ); + + // read non-contiguous row groups + let read_options = ReadOptionsBuilder::new() + .with_page_index() + .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1)) + .build(); + let reader = + SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options) + .unwrap(); + let metadata = reader.metadata(); + + // check we got the expected row groups + assert_eq!(metadata.num_row_groups(), 2); + assert_eq!(metadata.row_group(0).ordinal(), Some(1)); + assert_eq!(metadata.row_group(1).ordinal(), Some(3)); + + // check we only got the relevant page indexes + assert!(metadata.column_index().is_some()); + assert!(metadata.offset_index().is_some()); + assert_eq!(metadata.column_index().unwrap().len(), 2); + assert_eq!(metadata.offset_index().unwrap().len(), 2); + let col_idx = metadata.column_index().unwrap(); + let off_idx = metadata.offset_index().unwrap(); + + for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) { + let col_stats = metadata.row_group(i).column(0).statistics().unwrap(); + let pg_idx = &col_idx_i[0]; + let off_idx_i = &off_idx[i][0]; + + // test that we got the index matching the row group + match pg_idx { + Index::INT32(int_idx) => { + let min = col_stats.min_bytes_opt().unwrap().get_i32_le(); + let max = col_stats.max_bytes_opt().unwrap().get_i32_le(); + assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref()); + assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref()); + } + _ => panic!("wrong stats type"), + } + + // check offset index matches too + assert_eq!( + off_idx_i.page_locations[0].offset, + metadata.row_group(i).column(0).data_page_offset() + ); + } + } }