Skip to content

Commit

Permalink
Remove some changes per review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jp0317 committed Nov 20, 2024
1 parent a4f8286 commit a88bc81
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 39 deletions.
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl InMemoryRowGroup {
let mut vs = std::mem::take(&mut self.column_chunks);
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range()?;
let (start, len) = meta.byte_range();
let data = reader
.get_bytes(start as usize..(start + len) as usize)
.await?;
Expand Down
25 changes: 11 additions & 14 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,11 +745,11 @@ impl<'a> InMemoryRowGroup<'a> {
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
})
.flat_map(|(idx, (_chunk, chunk_meta))| -> Result<Vec<Range<usize>>> {
.flat_map(|(idx, (_chunk, chunk_meta))| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range()?;
let (start, _len) = chunk_meta.byte_range();
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
Expand All @@ -760,11 +760,8 @@ impl<'a> InMemoryRowGroup<'a> {
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

Ok(ranges)
ranges
})
.collect::<Vec<_>>()
.into_iter()
.flatten()
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
Expand All @@ -782,25 +779,25 @@ impl<'a> InMemoryRowGroup<'a> {
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range()?.1 as usize,
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
}
} else {
let fetch_ranges: Result<Vec<Range<usize>>> = self
let fetch_ranges = self
.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range()?;
Ok(start as usize..(start + length) as usize)
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges?).await?.into_iter();
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
Expand All @@ -809,7 +806,7 @@ impl<'a> InMemoryRowGroup<'a> {

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range()?.0 as usize,
offset: self.metadata.column(idx).byte_range().0 as usize,
data,
}));
}
Expand Down Expand Up @@ -1011,8 +1008,8 @@ mod tests {
assert_eq!(async_batches, sync_batches);

let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range().unwrap();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

assert_eq!(
&requests[..],
Expand Down
13 changes: 6 additions & 7 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,18 +959,17 @@ impl ColumnChunkMetaData {
}

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> Result<(u64, u64)> {
pub fn byte_range(&self) -> (u64, u64) {
let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
if col_start < 0 || col_len < 0 {
return Err(general_err!(
"column start and length should not be negative"
));
}
Ok((col_start as u64, col_len as u64))
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
}

/// Returns statistics that are set for this column chunk,
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
props: ReaderPropertiesPtr,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range()?;
let (start, len) = meta.byte_range();

let state = match page_locations {
Some(locations) => {
Expand Down
4 changes: 0 additions & 4 deletions parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,10 +1247,6 @@ fn from_thrift_helper(elements: &[SchemaElement], index: usize) -> Result<(usize
if !is_root_node {
builder = builder.with_repetition(rep);
}
} else if !is_root_node {
return Err(general_err!(
"Repetition level must be defined for non-root types"
));
}
Ok((next_index, Arc::new(builder.build().unwrap())))
}
Expand Down
14 changes: 2 additions & 12 deletions parquet/src/thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,7 @@ impl<'a> TCompactSliceInputProtocol<'a> {
let mut shift = 0;
loop {
let byte = self.read_byte()?;
let val = (byte & 0x7F) as u64;
let val = val.checked_shl(shift).map_or_else(
|| {
Err(thrift::Error::Protocol(thrift::ProtocolError {
kind: thrift::ProtocolErrorKind::InvalidData,
message: format!("cannot left-shift {} by {} bits", val, shift),
}))
},
Ok,
)?;
in_progress |= val;
in_progress |= ((byte & 0x7F) as u64) << shift;
shift += 7;
if byte & 0x80 == 0 {
return Ok(in_progress);
Expand Down Expand Up @@ -117,7 +107,7 @@ macro_rules! thrift_unimplemented {

impl TInputProtocol for TCompactSliceInputProtocol<'_> {
fn read_message_begin(&mut self) -> thrift::Result<TMessageIdentifier> {
thrift_unimplemented!()
unimplemented!()
}

fn read_message_end(&mut self) -> thrift::Result<()> {
Expand Down

0 comments on commit a88bc81

Please sign in to comment.