feat(mito): Ports InMemoryRowGroup from parquet crate (#2633)
* feat: ports InMemoryRowGroup from parquet

* chore: pub InMemoryRowGroup

* style: allow some clippy lints
evenyag authored Oct 23, 2023
1 parent 4d47865 commit 82dbc3e
Showing 2 changed files with 231 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/mito2/src/sst/parquet.rs
@@ -16,6 +16,7 @@
mod format;
pub mod reader;
pub mod row_group;
mod stats;
pub mod writer;

230 changes: 230 additions & 0 deletions src/mito2/src/sst/parquet/row_group.rs
@@ -0,0 +1,230 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).

use std::sync::Arc;

use bytes::{Buf, Bytes};
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;

/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    /// Metadata of the row group to read.
    metadata: &'a RowGroupMetaData,
    /// Page locations from the file's offset index, one `Vec<PageLocation>` per
    /// leaf column, if an offset index is available.
    page_locations: Option<&'a [Vec<PageLocation>]>,
    /// Data of each column chunk, `None` until fetched; one entry per leaf column.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group.
    row_count: usize,
}

impl<'a> InMemoryRowGroup<'a> {
/// Fetches the necessary column data into memory
// TODO(yingwen): Fix clippy warnings.
#[allow(clippy::filter_map_bool_then)]
#[allow(clippy::useless_conversion)]
pub async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
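            // `page_start_offsets` records, for each fetched column, the start
            // offset of every fetched range; it is consumed below to assemble
            // the `ColumnChunkData::Sparse` chunks.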
let mut page_start_offsets: Vec<Vec<usize>> = vec![];

let fetch_ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.enumerate()
.filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}

ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
})
})
.flatten()
.collect();

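            // `get_byte_ranges` returns one buffer per requested range, in the
            // same order as `fetch_ranges`, so the two iterators below stay
            // aligned and the `unwrap` on `chunk_data.next()` cannot fail.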
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}

if let Some(offsets) = page_start_offsets.next() {
let mut chunks = Vec::with_capacity(offsets.len());
for _ in 0..offsets.len() {
chunks.push(chunk_data.next().unwrap());
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
}
} else {
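            // Without both a `RowSelection` and an `OffsetIndex`, fetch each
            // projected column chunk in full.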
let fetch_ranges = self
.column_chunks
.iter()
.enumerate()
.filter_map(|(idx, chunk)| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
})
.collect();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range().0 as usize,
data,
}));
}
}
}

Ok(())
}
}

impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
}

fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
let page_locations = self.page_locations.map(|index| index[i].clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?);

Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
}))
}
}
}
}
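// Note: `RowGroups` is the interface the parquet crate's arrow readers use to
// obtain pages when decoding a row group into `RecordBatch`es.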

/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
/// Column chunk data representing only a subset of data pages
Sparse {
/// Length of the full column chunk
length: usize,
/// Set of data pages included in this sparse chunk. Each element is a tuple
/// of (page offset, page data)
data: Vec<(usize, Bytes)>,
},
/// Full column chunk and its offset
Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
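    /// Returns the bytes of this chunk beginning at file offset `start`.
    ///
    /// For `Sparse` chunks, `start` must exactly match the start offset of a
    /// fetched page; for `Dense` chunks, any offset within the chunk is valid.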
fn get(&self, start: u64) -> Result<Bytes> {
match &self {
ColumnChunkData::Sparse { data, .. } => data
.binary_search_by_key(&start, |(offset, _)| *offset as u64)
.map(|idx| data[idx].1.clone())
.map_err(|_| {
ParquetError::General(format!(
"Invalid offset in sparse column chunk data: {start}"
))
}),
ColumnChunkData::Dense { offset, data } => {
let start = start as usize - *offset;
Ok(data.slice(start..))
}
}
}
}

impl Length for ColumnChunkData {
fn len(&self) -> u64 {
match &self {
ColumnChunkData::Sparse { length, .. } => *length as u64,
ColumnChunkData::Dense { data, .. } => data.len() as u64,
}
}
}

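// `SerializedPageReader` pulls page bytes through this `ChunkReader` impl. The
// sparse variant works because, when page locations are supplied, the reader
// requests reads starting exactly at the recorded page offsets.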
impl ChunkReader for ColumnChunkData {
type T = bytes::buf::Reader<Bytes>;

fn get_read(&self, start: u64) -> Result<Self::T> {
Ok(self.get(start)?.reader())
}

fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
Ok(self.get(start)?.slice(..length))
}
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
type Item = Result<Box<dyn PageReader>>;

fn next(&mut self) -> Option<Self::Item> {
self.reader.take()
}
}

impl PageIterator for ColumnChunkIterator {}

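For orientation, here is a minimal usage sketch of the ported type: fetch the projected column chunks, then read pages back through the `RowGroups` trait. It assumes an already-constructed `InMemoryRowGroup` (this commit adds no constructor) plus this module's imports; the helper function itself is hypothetical, not part of the commit.

// Hypothetical helper, for illustration only; assumes this module's imports.
async fn read_pages_of_first_column<T: AsyncFileReader + Send>(
    input: &mut T,
    mut row_group: InMemoryRowGroup<'_>,
    projection: &ProjectionMask,
) -> Result<()> {
    // No `RowSelection`, so the dense path fetches projected chunks in full.
    row_group.fetch(input, projection, None).await?;

    // `RowGroups::column_chunks` hands back a `PageIterator` for one leaf column.
    let mut pages = row_group.column_chunks(0)?;
    if let Some(page_reader) = pages.next() {
        let mut page_reader = page_reader?;
        // Drain the pages of the fetched chunk.
        while let Some(page) = page_reader.get_next_page()? {
            let _num_values = page.num_values();
        }
    }
    Ok(())
}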