
POC: Add ParquetMetaDataReader #6392

Status: Closed. Wants to merge 27 commits.

Commits:
0c5087f: add ParquetMetaDataReader (etseidl, Aug 30, 2024)
d5b60ab: add todo (etseidl, Sep 11, 2024)
d462fda: add more todos (etseidl, Sep 11, 2024)
6b9dd1c: take a stab at reading metadata without file size provided (etseidl, Sep 12, 2024)
0a2c4b2: temporarily comment out MetadataLoader (etseidl, Sep 13, 2024)
58f2463: remove debug print (etseidl, Sep 13, 2024)
08b985a: clippy (etseidl, Sep 13, 2024)
96062e1: Merge remote-tracking branch 'origin/master' into metadata_reader (etseidl, Sep 16, 2024)
cdf6ac5: add more todos (etseidl, Sep 16, 2024)
25e23d7: uncomment MetadataLoader (etseidl, Sep 16, 2024)
03bc663: silence doc warnings (etseidl, Sep 16, 2024)
51a5a72: fix size check (etseidl, Sep 17, 2024)
8a3f496: add try_parse_range (etseidl, Sep 17, 2024)
f8450e2: start on documentation (etseidl, Sep 17, 2024)
180e3e6: make sure docs compile (etseidl, Sep 17, 2024)
9d1147d: attempt recovery in test (etseidl, Sep 17, 2024)
1a1d3aa: implement some suggestions from review (etseidl, Sep 18, 2024)
d450ab8: remove suffix reading for now (etseidl, Sep 18, 2024)
3c340b7: add new error types to aid recovery (etseidl, Sep 18, 2024)
0d13599: remove parquet_metadata_from_file and add ParquetMetaDataReader::parse (etseidl, Sep 19, 2024)
d300cf3: remove todo (etseidl, Sep 19, 2024)
4ee162f: point to with_prefetch_hint from try_load docstring (etseidl, Sep 19, 2024)
2d65c3f: refactor the retry logic (etseidl, Sep 19, 2024)
2a2cf81: Merge remote-tracking branch 'origin/master' into metadata_reader (etseidl, Sep 19, 2024)
faff575: add some more tests (etseidl, Sep 19, 2024)
c9e5ea6: add load() and bring over tests from async_reader/metadata.rs (etseidl, Sep 20, 2024)
4214909: only run new tests if async is enabled (etseidl, Sep 20, 2024)
7 changes: 7 additions & 0 deletions parquet/src/errors.rs
@@ -18,6 +18,7 @@
//! Common Parquet errors and macros.

use std::error::Error;
use std::ops::Range;
use std::{cell, io, result, str};

#[cfg(feature = "arrow")]
@@ -45,6 +46,10 @@ pub enum ParquetError {
IndexOutOfBound(usize, usize),
/// An external error variant
External(Box<dyn Error + Send + Sync>),
/// Returned when a function needs more data to complete properly.
NeedMoreData(usize),
/// Returned when a function needs a larger range of data to complete properly.
NeedLargerRange(Range<usize>),
}

impl std::fmt::Display for ParquetError {
@@ -61,6 +66,8 @@ impl std::fmt::Display for ParquetError {
write!(fmt, "Index {index} out of bound: {bound}")
}
ParquetError::External(e) => write!(fmt, "External: {e}"),
ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
Contributor:
A nitpick is that these seem pretty similar. I wonder if it would make sense to combine them somehow 🤔

Contributor Author:
I wasn't intending to have two; it just turned out that way. I could make the second usize optional, with the understanding that a range is being requested.

Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.

Contributor:
> Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.

Yes, unfortunately, it does make it a breaking change

https://github.com/apache/arrow-rs/blob/master/parquet/src/errors.rs#L29

We should probably mark the error type as "non exhaustive" which would make it a non breaking change in the future
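The `#[non_exhaustive]` idea above can be sketched as follows. This is an illustrative stand-in enum (`MyParquetError` is not the real arrow-rs type): marking the enum non-exhaustive forces downstream crates to include a wildcard match arm, so adding variants like `NeedMoreData` later is no longer a semver-breaking change.

```rust
// Sketch: a non-exhaustive error enum so new variants can be added
// without breaking downstream matches. Stand-in type, not arrow-rs code.
#[non_exhaustive]
#[derive(Debug)]
pub enum MyParquetError {
    General(String),
    NeedMoreData(usize),
}

pub fn describe(err: &MyParquetError) -> String {
    match err {
        MyParquetError::General(msg) => format!("General: {msg}"),
        MyParquetError::NeedMoreData(n) => format!("NeedMoreData: {n}"),
        // Downstream crates must keep a wildcard arm, so future
        // variants won't break their builds.
        _ => "unknown error".to_string(),
    }
}

fn main() {
    println!("{}", describe(&MyParquetError::NeedMoreData(8)));
}
```

Within the defining crate the wildcard arm is merely unreachable; the attribute only constrains external crates, which is exactly the semver property being discussed.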

ParquetError::NeedLargerRange(range) => write!(fmt, "NeedLargerRange: {:?}", range),
}
}
}
136 changes: 78 additions & 58 deletions parquet/src/file/metadata/reader.rs
@@ -153,14 +153,30 @@ impl ParquetMetaDataReader {
}

/// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are
/// available.
// TODO(ets): should this also use IndexOutOfBound when range doesn't include the whole footer?
/// available. `file_range.end` must point to the end of the file.
Contributor:
I was a little confused about how file_range works in this case (given that it seems to me that ChunkReader would in theory allow reading arbitrary ranges)

Is the idea that try_parse_range limits the requests to the reader so they are only within file_range?

Contributor Author:
It's for the case of impl ChunkReader for Bytes. Say you've speculatively read the last 1000 bytes of a file into a buffer, and let's say that's actually sufficient for the page indexes. But the locations of the page indexes are absolute (in this case they're in the range 1100..1500), so you can't actually find the indexes in the buffer unless you know the file offset of the beginning of the buffer (trying to seek to 1100 in a 1000-byte buffer clearly won't work; you need to subtract the buffer's start offset of 1000 from the absolute offsets and read 100..500 from the buffer).

I also wanted different error behavior for the File vs Bytes use cases. If a File is passed, there's no sense in asking for a larger file; if the metadata can't be read there's either some I/O error going on or a corrupted file.

Perhaps we could instead just pass in the file size, while still mandating that if Bytes are passed they must include the footer. We could then infer the range as file_size-reader.len()..file_size, and then errors could simply return the number of bytes needed from the tail of the file. This would perhaps resolve the above concern about having two new and very similar error types.
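The offset translation described in that comment can be sketched as a small helper. `to_buffer_range` is a hypothetical name, not part of the PR: it converts an absolute file range to a buffer-relative range, or signals that a larger fetch is needed.

```rust
use std::ops::Range;

// Sketch: page index locations are absolute file offsets, so to read
// them from a suffix buffer covering `file_range` we subtract the
// buffer's start offset. Returns None when the caller must fetch more.
pub fn to_buffer_range(
    absolute: Range<usize>,
    file_range: Range<usize>,
) -> Option<Range<usize>> {
    if absolute.start >= file_range.start && absolute.end <= file_range.end {
        Some(absolute.start - file_range.start..absolute.end - file_range.start)
    } else {
        None // indexes fall outside the buffered suffix
    }
}

fn main() {
    // The example above: a buffer holding bytes 1000..2000 of the file,
    // with page indexes at absolute offsets 1100..1500.
    println!("{:?}", to_buffer_range(1100..1500, 1000..2000)); // Some(100..500)
}
```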

pub fn try_parse_range<R: ChunkReader>(
&mut self,
reader: &R,
file_range: Range<usize>,
) -> Result<()> {
self.metadata = Some(Self::parse_metadata(reader)?);
self.metadata = match Self::parse_metadata(reader) {
Ok(metadata) => Some(metadata),
Err(ParquetError::NeedMoreData(needed)) => {
// If the provided range starts at 0, then presumably there is no more to read,
// so return an EOF error.
if file_range.start == 0 {
return Err(eof_err!(
"Parquet file too small. Size is {} but need {needed}",
reader.len()
));
} else {
return Err(ParquetError::NeedLargerRange(
file_range.end - needed..file_range.end,
));
}
}
Err(e) => return Err(e),
};

// we can return if page indexes aren't requested
if !self.column_index && !self.offset_index {
Expand All @@ -179,9 +195,8 @@ impl ParquetMetaDataReader {

// Check to see if needed range is within `file_range`. Checking `range.end` seems
// redundant, but it guards against `range_for_page_index()` returning garbage.
// TODO(ets): should probably add a new error type...IOOB is a little tortured
if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
return Err(ParquetError::IndexOutOfBound(range.start, range.end));
return Err(ParquetError::NeedLargerRange(range.start..file_range.end));
}

let bytes_needed = range.end - range.start;
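A caller-side retry loop built on the new NeedLargerRange error might look like the sketch below. The error enum and `try_parse` are illustrative stand-ins for the ParquetMetaDataReader API: the caller speculatively fetches a small tail of the file and widens the range whenever the parser reports that the page indexes fall outside it.

```rust
use std::ops::Range;

// Stand-in error: the parser asks for a larger suffix of the file.
#[derive(Debug)]
enum Error {
    NeedLargerRange(Range<usize>),
}

// Pretend the page indexes live at absolute file offsets 1100..1500.
fn try_parse(file_range: Range<usize>) -> Result<(), Error> {
    let needed = 1100..1500;
    if file_range.start <= needed.start && needed.end <= file_range.end {
        Ok(())
    } else {
        // Ask for everything from the index start to the end of the file.
        Err(Error::NeedLargerRange(needed.start..file_range.end))
    }
}

fn parse_with_retries(mut range: Range<usize>) -> Range<usize> {
    loop {
        match try_parse(range.clone()) {
            Ok(()) => return range,
            Err(Error::NeedLargerRange(r)) => range = r, // widen and retry
        }
    }
}

fn main() {
    // Initial speculative read: the last 100 bytes of a 2000-byte file.
    let final_range = parse_with_retries(1900..2000);
    println!("parsed with range {final_range:?}"); // 1100..2000
}
```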
@@ -321,9 +336,7 @@ impl ParquetMetaDataReader {
// check file is large enough to hold footer
let file_size = chunk_reader.len();
if file_size < (FOOTER_SIZE as u64) {
return Err(general_err!(
"Invalid Parquet file. Size is smaller than footer"
));
return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
}

let mut footer = [0_u8; 8];
@@ -335,12 +348,7 @@
let footer_metadata_len = FOOTER_SIZE + metadata_len;

if footer_metadata_len > file_size as usize {
return Err(general_err!(
"Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
metadata_len,
FOOTER_SIZE,
file_size
));
return Err(ParquetError::NeedMoreData(footer_metadata_len));
}

let start = file_size - footer_metadata_len as u64;
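The checks above rely on the Parquet footer layout: the last 8 bytes of a file are a 4-byte little-endian metadata length followed by the magic bytes "PAR1". A minimal decoding sketch (not the PR's actual helper):

```rust
// Sketch of decoding the 8-byte Parquet footer: 4-byte little-endian
// metadata length, then the "PAR1" magic.
const FOOTER_SIZE: usize = 8;
const PARQUET_MAGIC: &[u8; 4] = b"PAR1";

fn decode_footer(footer: &[u8; FOOTER_SIZE]) -> Result<usize, String> {
    if footer[4..] != *PARQUET_MAGIC {
        return Err("Invalid Parquet file. Corrupt footer".to_string());
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(metadata_len as usize)
}

fn main() {
    // Same bytes as the test_parse_metadata_invalid_start fixture below:
    // a reported metadata length of 255 means the whole file must be at
    // least 255 + 8 = 263 bytes, hence NeedMoreData(263).
    let footer = [255, 0, 0, 0, b'P', b'A', b'R', b'1'];
    println!("{}", decode_footer(&footer).unwrap()); // 255
}
```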
@@ -517,11 +525,8 @@ mod tests {
#[test]
fn test_parse_metadata_size_smaller_than_footer() {
let test_file = tempfile::tempfile().unwrap();
let reader_result = ParquetMetaDataReader::parse_metadata(&test_file);
assert_eq!(
reader_result.unwrap_err().to_string(),
"Parquet error: Invalid Parquet file. Size is smaller than footer"
);
let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
assert!(matches!(err, ParquetError::NeedMoreData(8)));
}

#[test]
@@ -537,11 +542,8 @@
#[test]
fn test_parse_metadata_invalid_start() {
let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
let reader_result = ParquetMetaDataReader::parse_metadata(&test_file);
assert_eq!(
reader_result.unwrap_err().to_string(),
"Parquet error: Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes"
);
let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
assert!(matches!(err, ParquetError::NeedMoreData(263)));
}

#[test]
@@ -597,71 +599,89 @@
}

#[test]
fn test_simple() {
fn test_try_parse() {
let file = get_test_file("alltypes_tiny_pages.parquet");
let len = file.len() as usize;

let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

let bytes_for_range = |range: &Range<usize>| {
file.get_bytes(range.start as u64, range.end - range.start)
.unwrap()
};

// read entire file
let bytes = file.get_bytes(0, len).unwrap();
let bytes = bytes_for_range(&(0..len));
reader.try_parse(&bytes).unwrap();
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());

// read more than enough of file
let range = 320000..len;
let bytes = file
.get_bytes(range.start as u64, range.end - range.start)
.unwrap();
let bytes = bytes_for_range(&range);
reader.try_parse_range(&bytes, range).unwrap();
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());

// exactly enough
let range = 323583..len;
let bytes = file
.get_bytes(range.start as u64, range.end - range.start)
.unwrap();
let bytes = bytes_for_range(&range);
reader.try_parse_range(&bytes, range).unwrap();
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());

// not enough for page index
let range = 323584..len;
let bytes = file
.get_bytes(range.start as u64, range.end - range.start)
.unwrap();
match reader.try_parse_range(&bytes, range) {
Ok(_) => (),
Err(err) => match err {
// expected error, try again with provided bounds
ParquetError::IndexOutOfBound(start, _) => {
let range = start..len;
let bytes = file
.get_bytes(range.start as u64, range.end - range.start)
.unwrap();
reader.try_parse_range(&bytes, range).unwrap();
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());
}
_ => panic!("unexpected error"),
},
}
let bytes = bytes_for_range(&range);
// should fail
match reader.try_parse_range(&bytes, range).unwrap_err() {
// expected error, try again with provided bounds
ParquetError::NeedLargerRange(needed) => {
let bytes = file
.get_bytes(needed.start as u64, needed.end - needed.start)
.unwrap();
reader.try_parse_range(&bytes, needed).unwrap();
let metadata = reader.finish().unwrap();
assert!(metadata.column_index.is_some());
assert!(metadata.offset_index.is_some());
}
_ => panic!("unexpected error"),
};

// not enough for file metadata
let mut reader = ParquetMetaDataReader::new();
let range = 452505..len;
let bytes = file
.get_bytes(range.start as u64, range.end - range.start)
.unwrap();
let err = reader.try_parse_range(&bytes, range).unwrap_err();
let bytes = bytes_for_range(&range);
// should fail
match reader.try_parse_range(&bytes, range).unwrap_err() {
// expected error, try again with provided bounds
ParquetError::NeedLargerRange(needed) => {
let bytes = file
.get_bytes(needed.start as u64, needed.end - needed.start)
.unwrap();
reader.try_parse_range(&bytes, needed).unwrap();
reader.finish().unwrap();
}
_ => panic!("unexpected error"),
};

// not enough for file metadata but use try_parse()
let reader_result = reader.try_parse(&bytes).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: Invalid Parquet file. Reported metadata length of 1721 + 8 byte footer, but file is only 1728 bytes"
reader_result.to_string(),
"EOF: Parquet file too small. Size is 1728 but need 1729"
);

// read head of file rather than tail
let range = 0..1000;
let bytes = bytes_for_range(&range);
let reader_result = reader.try_parse_range(&bytes, range).unwrap_err();
assert_eq!(
reader_result.to_string(),
"Parquet error: Invalid Parquet file. Corrupt footer"
);
}
}