-
Notifications
You must be signed in to change notification settings - Fork 824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: Add ParquetMetaDataReader #6392
Changes from 1 commit
0c5087f
d5b60ab
d462fda
6b9dd1c
0a2c4b2
58f2463
08b985a
96062e1
cdf6ac5
25e23d7
03bc663
51a5a72
8a3f496
f8450e2
180e3e6
9d1147d
1a1d3aa
d450ab8
3c340b7
0d13599
d300cf3
4ee162f
2d65c3f
2a2cf81
faff575
c9e5ea6
4214909
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,14 +153,30 @@ impl ParquetMetaDataReader { | |
} | ||
|
||
/// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are | ||
/// available. | ||
// TODO(ets): should this also use IndexOutOfBound when range doesn't include the whole footer? | ||
/// available. `file_range.end` must point to the end of the file. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was a little confused about how Is the idea that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's for the case of I also wanted different error behavior for the Perhaps we could instead just pass in the file size, while still mandating that if |
||
pub fn try_parse_range<R: ChunkReader>( | ||
&mut self, | ||
reader: &R, | ||
file_range: Range<usize>, | ||
) -> Result<()> { | ||
self.metadata = Some(Self::parse_metadata(reader)?); | ||
self.metadata = match Self::parse_metadata(reader) { | ||
Ok(metadata) => Some(metadata), | ||
Err(ParquetError::NeedMoreData(needed)) => { | ||
// If the provided range starts at 0, then presumably there is no more to read, | ||
// so return an EOF error. | ||
if file_range.start == 0 { | ||
return Err(eof_err!( | ||
"Parquet file too small. Size is {} but need {needed}", | ||
reader.len() | ||
)); | ||
} else { | ||
return Err(ParquetError::NeedLargerRange( | ||
file_range.end - needed..file_range.end, | ||
)); | ||
} | ||
} | ||
Err(e) => return Err(e), | ||
}; | ||
|
||
// we can return if page indexes aren't requested | ||
if !self.column_index && !self.offset_index { | ||
|
@@ -179,9 +195,8 @@ impl ParquetMetaDataReader { | |
|
||
// Check to see if needed range is within `file_range`. Checking `range.end` seems | ||
// redundant, but it guards against `range_for_page_index()` returning garbage. | ||
// TODO(ets): should probably add a new error type...IOOB is a little tortured | ||
if !(file_range.contains(&range.start) && file_range.contains(&range.end)) { | ||
return Err(ParquetError::IndexOutOfBound(range.start, range.end)); | ||
return Err(ParquetError::NeedLargerRange(range.start..file_range.end)); | ||
} | ||
|
||
let bytes_needed = range.end - range.start; | ||
|
@@ -321,9 +336,7 @@ impl ParquetMetaDataReader { | |
// check file is large enough to hold footer | ||
let file_size = chunk_reader.len(); | ||
if file_size < (FOOTER_SIZE as u64) { | ||
return Err(general_err!( | ||
"Invalid Parquet file. Size is smaller than footer" | ||
)); | ||
return Err(ParquetError::NeedMoreData(FOOTER_SIZE)); | ||
} | ||
|
||
let mut footer = [0_u8; 8]; | ||
|
@@ -335,12 +348,7 @@ impl ParquetMetaDataReader { | |
let footer_metadata_len = FOOTER_SIZE + metadata_len; | ||
|
||
if footer_metadata_len > file_size as usize { | ||
return Err(general_err!( | ||
"Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes", | ||
metadata_len, | ||
FOOTER_SIZE, | ||
file_size | ||
)); | ||
return Err(ParquetError::NeedMoreData(footer_metadata_len)); | ||
} | ||
|
||
let start = file_size - footer_metadata_len as u64; | ||
|
@@ -517,11 +525,8 @@ mod tests { | |
#[test] | ||
fn test_parse_metadata_size_smaller_than_footer() { | ||
let test_file = tempfile::tempfile().unwrap(); | ||
let reader_result = ParquetMetaDataReader::parse_metadata(&test_file); | ||
assert_eq!( | ||
reader_result.unwrap_err().to_string(), | ||
"Parquet error: Invalid Parquet file. Size is smaller than footer" | ||
); | ||
let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err(); | ||
assert!(matches!(err, ParquetError::NeedMoreData(8))); | ||
} | ||
|
||
#[test] | ||
|
@@ -537,11 +542,8 @@ mod tests { | |
#[test] | ||
fn test_parse_metadata_invalid_start() { | ||
let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']); | ||
let reader_result = ParquetMetaDataReader::parse_metadata(&test_file); | ||
assert_eq!( | ||
reader_result.unwrap_err().to_string(), | ||
"Parquet error: Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes" | ||
); | ||
let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err(); | ||
assert!(matches!(err, ParquetError::NeedMoreData(263))); | ||
} | ||
|
||
#[test] | ||
|
@@ -597,71 +599,89 @@ mod tests { | |
} | ||
|
||
#[test] | ||
fn test_simple() { | ||
fn test_try_parse() { | ||
let file = get_test_file("alltypes_tiny_pages.parquet"); | ||
let len = file.len() as usize; | ||
|
||
let mut reader = ParquetMetaDataReader::new().with_page_indexes(true); | ||
|
||
let bytes_for_range = |range: &Range<usize>| { | ||
file.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap() | ||
}; | ||
|
||
// read entire file | ||
let bytes = file.get_bytes(0, len).unwrap(); | ||
let bytes = bytes_for_range(&(0..len)); | ||
reader.try_parse(&bytes).unwrap(); | ||
let metadata = reader.finish().unwrap(); | ||
assert!(metadata.column_index.is_some()); | ||
assert!(metadata.offset_index.is_some()); | ||
|
||
// read more than enough of file | ||
let range = 320000..len; | ||
let bytes = file | ||
.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap(); | ||
let bytes = bytes_for_range(&range); | ||
reader.try_parse_range(&bytes, range).unwrap(); | ||
let metadata = reader.finish().unwrap(); | ||
assert!(metadata.column_index.is_some()); | ||
assert!(metadata.offset_index.is_some()); | ||
|
||
// exactly enough | ||
let range = 323583..len; | ||
let bytes = file | ||
.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap(); | ||
let bytes = bytes_for_range(&range); | ||
reader.try_parse_range(&bytes, range).unwrap(); | ||
let metadata = reader.finish().unwrap(); | ||
assert!(metadata.column_index.is_some()); | ||
assert!(metadata.offset_index.is_some()); | ||
|
||
// not enough for page index | ||
let range = 323584..len; | ||
let bytes = file | ||
.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap(); | ||
match reader.try_parse_range(&bytes, range) { | ||
Ok(_) => (), | ||
Err(err) => match err { | ||
// expected error, try again with provided bounds | ||
ParquetError::IndexOutOfBound(start, _) => { | ||
let range = start..len; | ||
let bytes = file | ||
.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap(); | ||
reader.try_parse_range(&bytes, range).unwrap(); | ||
let metadata = reader.finish().unwrap(); | ||
assert!(metadata.column_index.is_some()); | ||
assert!(metadata.offset_index.is_some()); | ||
} | ||
_ => panic!("unexpected error"), | ||
}, | ||
} | ||
let bytes = bytes_for_range(&range); | ||
// should fail | ||
match reader.try_parse_range(&bytes, range).unwrap_err() { | ||
// expected error, try again with provided bounds | ||
ParquetError::NeedLargerRange(needed) => { | ||
let bytes = file | ||
.get_bytes(needed.start as u64, needed.end - needed.start) | ||
.unwrap(); | ||
reader.try_parse_range(&bytes, needed).unwrap(); | ||
let metadata = reader.finish().unwrap(); | ||
assert!(metadata.column_index.is_some()); | ||
assert!(metadata.offset_index.is_some()); | ||
} | ||
_ => panic!("unexpected error"), | ||
}; | ||
|
||
// not enough for file metadata | ||
let mut reader = ParquetMetaDataReader::new(); | ||
let range = 452505..len; | ||
let bytes = file | ||
.get_bytes(range.start as u64, range.end - range.start) | ||
.unwrap(); | ||
let err = reader.try_parse_range(&bytes, range).unwrap_err(); | ||
let bytes = bytes_for_range(&range); | ||
// should fail | ||
match reader.try_parse_range(&bytes, range).unwrap_err() { | ||
// expected error, try again with provided bounds | ||
ParquetError::NeedLargerRange(needed) => { | ||
let bytes = file | ||
.get_bytes(needed.start as u64, needed.end - needed.start) | ||
.unwrap(); | ||
reader.try_parse_range(&bytes, needed).unwrap(); | ||
reader.finish().unwrap(); | ||
} | ||
_ => panic!("unexpected error"), | ||
}; | ||
|
||
// not enough for file metadata but use try_parse() | ||
let reader_result = reader.try_parse(&bytes).unwrap_err(); | ||
assert_eq!( | ||
err.to_string(), | ||
"Parquet error: Invalid Parquet file. Reported metadata length of 1721 + 8 byte footer, but file is only 1728 bytes" | ||
reader_result.to_string(), | ||
"EOF: Parquet file too small. Size is 1728 but need 1729" | ||
); | ||
|
||
// read head of file rather than tail | ||
let range = 0..1000; | ||
let bytes = bytes_for_range(&range); | ||
let reader_result = reader.try_parse_range(&bytes, range).unwrap_err(); | ||
assert_eq!( | ||
reader_result.to_string(), | ||
"Parquet error: Invalid Parquet file. Corrupt footer" | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nitpick is that these seems pretty similar . I wonder if it would make sense to combine them somehow 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't intending two, it just turned out that way. I could make the second
usize
optional with the understanding that a range is being requested.Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of
IndexOutOfBound
until it's open season on breaking changes.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.
Yes, unfortunately, it does make it a breaking change
https://github.com/apache/arrow-rs/blob/master/parquet/src/errors.rs#L29
We should probably mark the error type as "non exhaustive" which would make it a non breaking change in the future