diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 872c0e410408..481f98f1af12 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -16,6 +16,7 @@
 mod format;
 pub mod reader;
+pub mod row_group;
 mod stats;
 pub mod writer;
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
new file mode 100644
index 000000000000..a80f7c874253
--- /dev/null
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -0,0 +1,230 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
+
+use std::sync::Arc;
+
+use bytes::{Buf, Bytes};
+use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::ProjectionMask;
+use parquet::column::page::{PageIterator, PageReader};
+use parquet::errors::{ParquetError, Result};
+use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::reader::{ChunkReader, Length};
+use parquet::file::serialized_reader::SerializedPageReader;
+use parquet::format::PageLocation;
+
+/// An in-memory collection of column chunks
+pub struct InMemoryRowGroup<'a> {
+    metadata: &'a RowGroupMetaData,
+    page_locations: Option<&'a [Vec<PageLocation>]>,
+    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+    row_count: usize,
+}
+
+impl<'a> InMemoryRowGroup<'a> {
+    /// Fetches the necessary column data into memory
+    // TODO(yingwen): Fix clippy warnings.
+    #[allow(clippy::filter_map_bool_then)]
+    #[allow(clippy::useless_conversion)]
+    pub async fn fetch<T: AsyncFileReader + Send>(
+        &mut self,
+        input: &mut T,
+        projection: &ProjectionMask,
+        selection: Option<&RowSelection>,
+    ) -> Result<()> {
+        if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .zip(self.metadata.columns())
+                .enumerate()
+                .filter_map(|(idx, (chunk, chunk_meta))| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        // If the first page does not start at the beginning of the column,
+                        // then we need to also fetch a dictionary page.
+                        let mut ranges = vec![];
+                        let (start, _len) = chunk_meta.byte_range();
+                        match page_locations[idx].first() {
+                            Some(first) if first.offset as u64 != start => {
+                                ranges.push(start as usize..first.offset as usize);
+                            }
+                            _ => (),
+                        }
+
+                        ranges.extend(selection.scan_ranges(&page_locations[idx]));
+                        page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
+
+                        ranges
+                    })
+                })
+                .flatten()
+                .collect();
+
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut page_start_offsets = page_start_offsets.into_iter();
+
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    }))
+                }
+            }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
+
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    }));
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
+    fn num_rows(&self) -> usize {
+        self.row_count
+    }
+
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {i}, column was not fetched"
+            ))),
+            Some(data) => {
+                let page_locations = self.page_locations.map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
+                    data.clone(),
+                    self.metadata.column(i),
+                    self.row_count,
+                    page_locations,
+                )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
+    }
+}
+
+/// An in-memory column chunk
+#[derive(Clone)]
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        /// Set of data pages included in this sparse chunk. Each element is a tuple
+        /// of (page offset, page data)
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk and its offset
+    Dense { offset: usize, data: Bytes },
+}
+
+impl ColumnChunkData {
+    fn get(&self, start: u64) -> Result<Bytes> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.clone())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {start}"
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                Ok(data.slice(start..))
+            }
+        }
+    }
+}
+
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
+        }
+    }
+}
+
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        Ok(self.get(start)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        Ok(self.get(start)?.slice(..length))
+    }
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
+struct ColumnChunkIterator {
+    reader: Option<Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+    type Item = Result<Box<dyn PageReader>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.reader.take()
+    }
+}
+
+impl PageIterator for ColumnChunkIterator {}
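
For context on how this port is meant to be driven: a caller fetches only the projected column chunks (or, with a `RowSelection` plus offset index, only the needed pages) into memory, then hands the `RowGroups` implementation to the regular synchronous arrow decoder. The sketch below illustrates that flow; it is not part of this diff. The `read_row_group` helper and the struct-literal construction are hypothetical (the fields of `InMemoryRowGroup` are private, so such a helper would have to live inside `row_group.rs`), and it assumes a `parquet` crate version that exposes `parquet_to_arrow_field_levels` and `ParquetRecordBatchReader::try_new_with_row_groups`.

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper: reads one row group into memory and decodes it with
/// the sync reader. Assumed to live in `row_group.rs` so it can construct
/// `InMemoryRowGroup` directly.
async fn read_row_group<T: AsyncFileReader + Send>(
    input: &mut T,
    metadata: &ParquetMetaData,
    row_group_idx: usize,
    projection: ProjectionMask,
    batch_size: usize,
) -> Result<ParquetRecordBatchReader> {
    let row_group_meta = metadata.row_group(row_group_idx);
    // Page locations come from the offset index, if one was written.
    let page_locations = metadata
        .offset_index()
        .map(|index| index[row_group_idx].as_slice());

    let mut row_group = InMemoryRowGroup {
        metadata: row_group_meta,
        page_locations,
        column_chunks: vec![None; row_group_meta.columns().len()],
        row_count: row_group_meta.num_rows() as usize,
    };

    // Pull only the projected column chunks into memory
    // (no row selection here, so whole chunks are fetched).
    row_group.fetch(input, &projection, None).await?;

    // The `RowGroups` impl lets the synchronous decoder run over the
    // in-memory chunks.
    let levels = parquet_to_arrow_field_levels(
        metadata.file_metadata().schema_descr(),
        projection,
        None,
    )?;
    ParquetRecordBatchReader::try_new_with_row_groups(&levels, &row_group, batch_size, None)
}
```

The point of the port is exactly this seam: by materializing chunks behind the public `RowGroups` trait, the crate can control where the bytes come from (object store ranges, caches) while reusing the parquet crate's page decoding unchanged.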