Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Add ParquetMetaDataReader #6392

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0c5087f
add ParquetMetaDataReader
etseidl Aug 30, 2024
d5b60ab
add todo
etseidl Sep 11, 2024
d462fda
add more todos
etseidl Sep 11, 2024
6b9dd1c
take a stab at reading metadata without file size provided
etseidl Sep 12, 2024
0a2c4b2
temporarily comment out MetadataLoader
etseidl Sep 13, 2024
58f2463
remove debug print
etseidl Sep 13, 2024
08b985a
clippy
etseidl Sep 13, 2024
96062e1
Merge remote-tracking branch 'origin/master' into metadata_reader
etseidl Sep 16, 2024
cdf6ac5
add more todos
etseidl Sep 16, 2024
25e23d7
uncomment MetadataLoader
etseidl Sep 16, 2024
03bc663
silence doc warnings
etseidl Sep 16, 2024
51a5a72
fix size check
etseidl Sep 17, 2024
8a3f496
add try_parse_range
etseidl Sep 17, 2024
f8450e2
start on documentation
etseidl Sep 17, 2024
180e3e6
make sure docs compile
etseidl Sep 17, 2024
9d1147d
attempt recovery in test
etseidl Sep 17, 2024
1a1d3aa
implement some suggestions from review
etseidl Sep 18, 2024
d450ab8
remove suffix reading for now
etseidl Sep 18, 2024
3c340b7
add new error types to aid recovery
etseidl Sep 18, 2024
0d13599
remove parquet_metadata_from_file and add ParquetMetaDataReader::parse
etseidl Sep 19, 2024
d300cf3
remove todo
etseidl Sep 19, 2024
4ee162f
point to with_prefetch_hint from try_load docstring
etseidl Sep 19, 2024
2d65c3f
refactor the retry logic
etseidl Sep 19, 2024
2a2cf81
Merge remote-tracking branch 'origin/master' into metadata_reader
etseidl Sep 19, 2024
faff575
add some more tests
etseidl Sep 19, 2024
c9e5ea6
add load() and bring over tests from async_reader/metadata.rs
etseidl Sep 20, 2024
4214909
only run new tests if async is enabled
etseidl Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 4 additions & 22 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer;
use crate::file::metadata::reader::parquet_metadata_from_file;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index_reader;
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

Expand Down Expand Up @@ -382,23 +381,7 @@ impl ArrowReaderMetadata {
/// `Self::metadata` is missing the page index, this function will attempt
/// to load the page index by making an object store request.
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
let mut metadata = footer::parse_metadata(reader)?;
if options.page_index {
let column_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_columns_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;
metadata.set_column_index(Some(column_index));

let offset_index = metadata
.row_groups()
.iter()
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;

metadata.set_offset_index(Some(offset_index))
}
let metadata = parquet_metadata_from_file(reader, options.page_index, options.page_index)?;
Self::try_new(Arc::new(metadata), options)
}

Expand Down Expand Up @@ -3496,9 +3479,8 @@ mod tests {
ArrowReaderOptions::new().with_page_index(true),
)
.unwrap();
// Although `Vec<Vec<PageLoacation>>` of each row group is empty,
// we should read the file successfully.
assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
// Absent page indexes should not be initialized, and file should still be readable.
assert!(builder.metadata().offset_index().is_none());
let reader = builder.build().unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 1);
Expand Down
51 changes: 27 additions & 24 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::FOOTER_SIZE;
Expand Down Expand Up @@ -76,27 +75,27 @@ impl<F: MetadataFetch> MetadataLoader<F> {
let mut footer = [0; FOOTER_SIZE];
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let length = decode_footer(&footer)?;
let length = ParquetMetaDataReader::decode_footer(&footer)?;

if file_size < length + FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
"file size of {} is less than footer + metadata {}",
file_size,
length + 8
length + FOOTER_SIZE
)));
}

// Did not fetch the entire file metadata in the initial read, need to make a second request
let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
let metadata_start = file_size - length - FOOTER_SIZE;
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
(decode_metadata(&meta)?, None)
(ParquetMetaDataReader::decode_metadata(&meta)?, None)
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
(
decode_metadata(slice)?,
ParquetMetaDataReader::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
)
};
Expand Down Expand Up @@ -237,8 +236,10 @@ where
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
// TODO(ets): should add option to read page index to this function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative perhaps would be to deprecate fetch_parquet_metadata entirely and suggest people use ParquetMetaDataReader which is more complete and full featured -- I think we could deprecate this function in a minor release (we can't remove it until a major release)

let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(prefetch);
reader.try_load(fetch, file_size).await?;
reader.finish()
}

#[cfg(test)]
Expand Down Expand Up @@ -332,41 +333,43 @@ mod tests {
};

let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
loader.load_page_index(true, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish();
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish();
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish();
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
loader.load_page_index(true, true).await.unwrap();
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650));
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let metadata = loader.finish();
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}
57 changes: 22 additions & 35 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ use crate::bloom_filter::{
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
Expand Down Expand Up @@ -179,21 +177,10 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

let metadata_len = decode_footer(&buf)?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

Ok(Arc::new(decode_metadata(&buf)?))
let mut reader = ParquetMetaDataReader::new();
reader.try_load_from_tail(self).await?;
Ok(Arc::new(reader.finish()?))
}
.boxed()
}
Expand All @@ -220,9 +207,9 @@ impl ArrowReaderMetadata {
&& metadata.offset_index().is_none()
{
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
let mut loader = MetadataLoader::new(input, m);
loader.load_page_index(true, true).await?;
metadata = Arc::new(loader.finish())
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
reader.load_page_index(input, None).await?;
metadata = Arc::new(reader.finish()?)
}
Self::try_new(metadata, options)
}
Expand Down Expand Up @@ -909,7 +896,7 @@ mod tests {
};
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::footer::parse_metadata;
use crate::file::metadata::reader::parquet_metadata_from_file;
use crate::file::page_index::index_reader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
Expand Down Expand Up @@ -952,7 +939,7 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1007,7 +994,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1073,7 +1060,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1117,7 +1104,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1173,7 +1160,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1238,7 +1225,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1317,7 +1304,7 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();

let test = TestReader {
Expand Down Expand Up @@ -1391,7 +1378,7 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();

assert_eq!(metadata.num_row_groups(), 2);

Expand Down Expand Up @@ -1479,7 +1466,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let metadata = Arc::new(metadata);

Expand Down Expand Up @@ -1529,7 +1516,7 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();

let offset_index =
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
Expand Down Expand Up @@ -1619,7 +1606,7 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let file_rows = metadata.file_metadata().num_rows() as usize;
let metadata = Arc::new(metadata);

Expand Down Expand Up @@ -1764,7 +1751,7 @@ mod tests {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);
let async_reader = TestReader {
data: data.clone(),
Expand Down Expand Up @@ -1793,7 +1780,7 @@ mod tests {
}

async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
Expand Down Expand Up @@ -1933,7 +1920,7 @@ mod tests {
writer.close().unwrap();

let data: Bytes = buf.into();
let metadata = parse_metadata(&data).unwrap();
let metadata = parquet_metadata_from_file(&data, false, false).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();

let test = TestReader {
Expand Down
18 changes: 8 additions & 10 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};

use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
///
Expand Down Expand Up @@ -124,15 +124,13 @@ impl AsyncFileReader for ParquetObjectReader {

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let preload_column_index = self.preload_column_index;
let preload_offset_index = self.preload_offset_index;
let mut reader = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);
let file_size = self.meta.size;
let prefetch = self.metadata_size_hint;
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
loader
.load_page_index(preload_column_index, preload_offset_index)
.await?;
Ok(Arc::new(loader.finish()))
reader.try_load(self, file_size).await?;
Ok(Arc::new(reader.finish()?))
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion parquet/src/bin/parquet-concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl Args {
.iter()
.map(|x| {
let reader = File::open(x)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
let metadata =
parquet::file::metadata::parquet_metadata_from_file(&reader, false, false)?;
Ok((reader, metadata))
})
.collect::<Result<Vec<_>>>()?;
Expand Down
Loading
Loading