Skip to content

Commit

Permalink
Reduce panics
Browse files Browse the repository at this point in the history
  • Loading branch information
jp0317 committed Nov 16, 2024
1 parent 5a86db3 commit 18c494b
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 47 deletions.
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InMemoryRowGroup {
let mut vs = std::mem::take(&mut self.column_chunks);
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range();
let (start, len) = meta.byte_range()?;
let data = reader
.get_bytes(start as usize..(start + len) as usize)
.await?;
Expand Down
25 changes: 14 additions & 11 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,11 +745,11 @@ impl<'a> InMemoryRowGroup<'a> {
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| {
.flat_map(|(idx, (_chunk, chunk_meta))| -> Result<Vec<Range<usize>>> {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
let (start, _len) = chunk_meta.byte_range()?;
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
Expand All @@ -760,8 +760,11 @@ impl<'a> InMemoryRowGroup<'a> {
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
Ok(ranges)
})
.collect::<Vec<_>>()
.into_iter()
.flat_map(|ranges| ranges)
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
Expand All @@ -779,25 +782,25 @@ impl<'a> InMemoryRowGroup<'a> {
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
length: self.metadata.column(idx).byte_range()?.1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
}
} else {
let fetch_ranges = self
let fetch_ranges: Result<Vec<Range<usize>>> = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
let (start, length) = column.byte_range()?;
Ok(start as usize..(start + length) as usize)
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut chunk_data = input.get_byte_ranges(fetch_ranges?).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
Expand All @@ -806,7 +809,7 @@ impl<'a> InMemoryRowGroup<'a> {

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range().0 as usize,
offset: self.metadata.column(idx).byte_range()?.0 as usize,
data,
}));
}
Expand Down Expand Up @@ -1008,8 +1011,8 @@ mod tests {
assert_eq!(async_batches, sync_batches);

let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range().unwrap();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range().unwrap();

assert_eq!(
&requests[..],
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Common Parquet errors and macros.
use core::num::TryFromIntError;
use std::error::Error;
use std::{cell, io, result, str};

Expand Down Expand Up @@ -76,6 +77,12 @@ impl Error for ParquetError {
}
}

impl From<TryFromIntError> for ParquetError {
fn from(e: TryFromIntError) -> ParquetError {
ParquetError::General(format!("Integer overflow: {e}"))
}
}

impl From<io::Error> for ParquetError {
fn from(e: io::Error) -> ParquetError {
ParquetError::External(Box::new(e))
Expand Down
11 changes: 5 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,17 +959,16 @@ impl ColumnChunkMetaData {
}

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
pub fn byte_range(&self) -> Result<(u64, u64)> {
let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
if col_start < 0 || col_len < 0 {
return Err(general_err!("column start and length should not be negative"));
}
Ok((col_start as u64, col_len as u64))
}

/// Returns statistics that are set for this column chunk,
Expand Down
25 changes: 12 additions & 13 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ impl ParquetMetaDataReader {
for rg in t_file_metadata.row_groups {
row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
}
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

let file_metadata = FileMetaData::new(
t_file_metadata.version,
Expand All @@ -635,15 +635,13 @@ impl ParquetMetaDataReader {
fn parse_column_orders(
t_column_orders: Option<Vec<TColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
) -> Result<Option<Vec<ColumnOrder>>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
if orders.len() != schema_descr.num_columns() {
return Err(general_err!("Column order length mismatch"));
};
let mut res = Vec::new();
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
Expand All @@ -657,9 +655,9 @@ impl ParquetMetaDataReader {
}
}
}
Some(res)
Ok(Some(res))
}
None => None,
None => Ok(None),
}
}
}
Expand Down Expand Up @@ -731,7 +729,7 @@ mod tests {
]);

assert_eq!(
ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr),
ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
Some(vec![
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
Expand All @@ -740,20 +738,21 @@ mod tests {

// Test when no column orders are defined.
assert_eq!(
ParquetMetaDataReader::parse_column_orders(None, &schema_descr),
ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
None
);
}

#[test]
#[should_panic(expected = "Column order length mismatch")]
fn test_metadata_column_orders_len_mismatch() {
let schema = SchemaType::group_type_builder("schema").build().unwrap();
let schema_descr = SchemaDescriptor::new(Arc::new(schema));

let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
assert!(res.is_err());
assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
}

#[test]
Expand Down
55 changes: 47 additions & 8 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ pub(crate) fn decode_page(
let is_sorted = dict_header.is_sorted.unwrap_or(false);
Page::DictionaryPage {
buf: buffer,
num_values: dict_header.num_values as u32,
num_values: dict_header.num_values.try_into()?,
encoding: Encoding::try_from(dict_header.encoding)?,
is_sorted,
}
Expand All @@ -446,7 +446,7 @@ pub(crate) fn decode_page(
.ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
Page::DataPage {
buf: buffer,
num_values: header.num_values as u32,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
Expand All @@ -460,12 +460,12 @@ pub(crate) fn decode_page(
let is_compressed = header.is_compressed.unwrap_or(true);
Page::DataPageV2 {
buf: buffer,
num_values: header.num_values as u32,
num_values: header.num_values.try_into()?,
encoding: Encoding::try_from(header.encoding)?,
num_nulls: header.num_nulls as u32,
num_rows: header.num_rows as u32,
def_levels_byte_len: header.definition_levels_byte_length as u32,
rep_levels_byte_len: header.repetition_levels_byte_length as u32,
num_nulls: header.num_nulls.try_into()?,
num_rows: header.num_rows.try_into()?,
def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
is_compressed,
statistics: statistics::from_thrift(physical_type, header.statistics)?,
}
Expand Down Expand Up @@ -535,7 +535,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
props: ReaderPropertiesPtr,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range();
let (start, len) = meta.byte_range()?;

let state = match page_locations {
Some(locations) => {
Expand Down Expand Up @@ -578,6 +578,27 @@ impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
}
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
if header_len > remaining_bytes {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}

fn verify_page_size(
compressed_size: i32,
uncompressed_size: i32,
remaining_bytes: usize,
) -> Result<()> {
// The page's compressed size should not exceed the remaining bytes that are
// available to read. The page's uncompressed size is the expected size
// after decompression, which can never be negative.
if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
return Err(eof_err!("Invalid page header"));
}
Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
loop {
Expand All @@ -596,10 +617,16 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining)?;
*offset += header_len;
*remaining -= header_len;
header
};
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining,
)?;
let data_len = header.compressed_page_size as usize;
*offset += data_len;
*remaining -= data_len;
Expand Down Expand Up @@ -683,6 +710,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Expand Down Expand Up @@ -733,12 +761,23 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
next_page_header,
} => {
if let Some(buffered_header) = next_page_header.take() {
verify_page_size(
buffered_header.compressed_page_size,
buffered_header.uncompressed_page_size,
*remaining_bytes,
)?;
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
*remaining_bytes -= buffered_header.compressed_page_size as usize;
} else {
let mut read = self.reader.get_read(*offset as u64)?;
let (header_len, header) = read_page_header_len(&mut read)?;
verify_page_header_len(header_len, *remaining_bytes)?;
verify_page_size(
header.compressed_page_size,
header.uncompressed_page_size,
*remaining_bytes,
)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
Expand Down
26 changes: 26 additions & 0 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,32 @@ pub fn from_thrift(
stats.max_value
};

fn check_len(min: &Option<Vec<u8>>, max: &Option<Vec<u8>>, len: usize) -> Result<()> {
if let Some(min) = min {
if min.len() < len {
return Err(ParquetError::General(format!(
"Insufficient bytes to parse min statistic",
)));
}
}
if let Some(max) = max {
if max.len() < len {
return Err(ParquetError::General(format!(
"Insufficient bytes to parse max statistic",
)));
}
}
Ok(())
}

match physical_type {
Type::BOOLEAN => check_len(&min, &max, 1),
Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
Type::INT96 => check_len(&min, &max, 12),
_ => Ok(())
}?;

// Values are encoded using PLAIN encoding definition, except that
// variable-length byte arrays do not include a length prefix.
//
Expand Down
6 changes: 6 additions & 0 deletions parquet/src/format.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,10 @@ pub fn from_thrift(elements: &[SchemaElement]) -> Result<TypePtr> {
));
}

if !schema_nodes[0].is_group() {
return Err(general_err!("Expected root node to be a group type"));
}

Ok(schema_nodes.remove(0))
}

Expand Down Expand Up @@ -1227,6 +1231,8 @@ fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize
if !is_root_node {
builder = builder.with_repetition(rep);
}
} else if !is_root_node {
return Err(general_err!("Repetition level must be defined for non-root types"));
}
Ok((next_index, Arc::new(builder.build().unwrap())))
}
Expand Down
Loading

0 comments on commit 18c494b

Please sign in to comment.