diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index fcc3842111d1..e6772244a045 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -388,6 +388,14 @@ pub enum Error { region_dir: String, location: Location, }, + + #[snafu(display("Failed to read arrow record batch from parquet file {}", path))] + ArrowReader { + path: String, + #[snafu(source)] + error: ArrowError, + location: Location, + }, } pub type Result = std::result::Result; @@ -458,6 +466,7 @@ impl ErrorExt for Error { RegionReadonly { .. } => StatusCode::RegionReadonly, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } => StatusCode::RegionNotFound, + ArrowReader { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 6c6415b6f2cd..67353c90dd61 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -173,6 +173,7 @@ impl ScanRegion { ); let predicate = Predicate::new(self.request.filters.clone()); + // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, None => ProjectionMapper::all(&self.version.metadata)?, diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 416a2f5d719a..a9fac535355b 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -26,7 +26,7 @@ //! //! We stores fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()). -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use api::v1::SemanticType; @@ -138,7 +138,10 @@ impl ReadFormat { } } - /// Gets the converted arrow schema. + /// Gets the arrow schema of the SST file. 
+ /// + /// This schema is computed from the region metadata but should be the same + /// as the arrow schema decoded from the file metadata. pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema } @@ -178,7 +181,7 @@ impl ReadFormat { pub(crate) fn convert_record_batch( &self, record_batch: &RecordBatch, - batches: &mut Vec, + batches: &mut VecDeque, ) -> Result<()> { debug_assert!(batches.is_empty()); @@ -249,7 +252,7 @@ impl ReadFormat { } let batch = builder.build()?; - batches.push(batch); + batches.push_back(batch); } Ok(()) @@ -768,7 +771,7 @@ mod tests { assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); - let mut batches = vec![]; + let mut batches = VecDeque::new(); read_format .convert_record_batch(&record_batch, &mut batches) .unwrap(); @@ -790,14 +793,14 @@ mod tests { ]; let arrow_schema = build_test_arrow_schema(); let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap(); - let mut batches = vec![]; + let mut batches = VecDeque::new(); read_format .convert_record_batch(&record_batch, &mut batches) .unwrap(); assert_eq!( vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)], - batches + batches.into_iter().collect::>(), ); } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 0e40a909a364..254ecab1c119 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,37 +14,34 @@ //! Parquet reader. 
-use std::collections::HashSet; -use std::ops::Range; +use std::collections::{HashSet, VecDeque}; use std::sync::Arc; -use async_compat::CompatExt; +use async_compat::{Compat, CompatExt}; use async_trait::async_trait; -use bytes::Bytes; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use futures::{FutureExt, TryStreamExt}; -use object_store::ObjectStore; +use object_store::{ObjectStore, Reader}; +use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::errors::ParquetError; +use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::ColumnId; use table::predicate::Predicate; use tokio::io::BufReader; use crate::cache::CacheManagerRef; use crate::error::{ - InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, + ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, + Result, }; use crate::read::{Batch, BatchReader}; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -114,46 +111,22 @@ impl ParquetReaderBuilder { /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. 
- pub async fn build(self) -> Result { + pub async fn build(&self) -> Result { let file_path = self.file_handle.file_path(&self.file_dir); - let (stream, read_format) = self.init_stream(&file_path).await?; - - Ok(ParquetReader { - file_path, - _file_handle: self.file_handle, - stream, - read_format, - batches: Vec::new(), - }) - } - - /// Initializes the parquet stream, also creates a [ReadFormat] to decode record batches. - async fn init_stream(&self, file_path: &str) -> Result<(BoxedRecordBatchStream, ReadFormat)> { - // Creates parquet stream builder. + // Now we create a reader to read the whole file. let reader = self .object_store - .reader(file_path) + .reader(&file_path) .await .context(OpenDalSnafu)? .compat(); - let reader = BufReader::new(reader); - let reader = AsyncFileReaderCache { - reader, - // TODO(yingwen): Sets the metadata when we implement row group level reader. - metadata: None, - cache: self.cache_manager.clone(), - region_id: self.file_handle.region_id(), - file_id: self.file_handle.file_id(), - }; - let mut builder = ParquetRecordBatchStreamBuilder::new(reader) - .await - .context(ReadParquetSnafu { path: file_path })? - .with_batch_size(DEFAULT_READ_BATCH_SIZE); - - // Decode region metadata. - let key_value_meta = builder.metadata().file_metadata().key_value_metadata(); - let region_meta = self.get_region_metadata(file_path, key_value_meta)?; - + let mut reader = BufReader::new(reader); + // Loads parquet metadata of the file. + let parquet_meta = self.read_parquet_metadata(&mut reader, &file_path).await?; + // Decodes region metadata. + let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); + let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; + // Computes column ids to read. 
let column_ids: HashSet<_> = self .projection .as_ref() .map(|p| p.iter().cloned().collect()) .unwrap_or_else(|| { self.metadata() .column_metadatas .iter() .map(|c| c.column_id) .collect() }); - let read_format = ReadFormat::new(Arc::new(region_meta)); - // The arrow schema converted from the region meta should be the same as parquet's. - // We only compare fields to avoid schema's metadata breaks the comparison. - ensure!( - read_format.arrow_schema().fields() == builder.schema().fields(), - InvalidParquetSnafu { - file: file_path, - reason: format!( - "schema mismatch, expect: {:?}, given: {:?}", - read_format.arrow_schema().fields(), - builder.schema().fields() - ) - } - ); - - // Prune row groups by metadata. - if let Some(predicate) = &self.predicate { - let stats = RowGroupPruningStats::new( - builder.metadata().row_groups(), - &read_format, - column_ids, - ); - let pruned_row_groups = predicate + // Prunes row groups by metadata. + let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate { + let stats = + RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids); + + predicate .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema()) .into_iter() .enumerate() .filter_map(|(idx, valid)| if valid { Some(idx) } else { None }) - .collect::>(); - builder = builder.with_row_groups(pruned_row_groups); - } + .collect() + } else { + (0..parquet_meta.num_row_groups()).collect() + }; - let parquet_schema_desc = builder.metadata().file_metadata().schema_descr(); - if let Some(column_ids) = self.projection.as_ref() { + // Computes the projection mask. + let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); + let projection_mask = if let Some(column_ids) = self.projection.as_ref() { let indices = read_format.projection_indices(column_ids.iter().copied()); - let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices); - builder = builder.with_projection(projection_mask); - } + // Now we assume we don't have nested schemas. 
+ ProjectionMask::roots(parquet_schema_desc, indices) + } else { + ProjectionMask::all() + }; - let stream = builder - .build() - .context(ReadParquetSnafu { path: file_path })?; + // Computes the field levels. + let hint = Some(read_format.arrow_schema().fields()); + let field_levels = + parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) + .context(ReadParquetSnafu { path: &file_path })?; - Ok((Box::pin(stream), read_format)) + let reader_builder = RowGroupReaderBuilder { + file_path, + parquet_meta, + file_reader: reader, + projection: projection_mask, + field_levels, + }; + + Ok(ParquetReader { + _file_handle: self.file_handle.clone(), + row_groups, + read_format, + reader_builder, + current_reader: None, + batches: VecDeque::new(), + }) } - /// Decode region metadata from key value. + /// Decodes region metadata from key value. fn get_region_metadata( - &self, file_path: &str, key_value_meta: Option<&Vec>, ) -> Result { @@ -239,49 +215,119 @@ impl ParquetReaderBuilder { RegionMetadata::from_json(json).context(InvalidMetadataSnafu) } + + /// Reads parquet metadata of specific file. + async fn read_parquet_metadata( + &self, + reader: &mut impl AsyncFileReader, + file_path: &str, + ) -> Result> { + // Tries to get from global cache. + if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| { + cache.get_parquet_meta_data(self.file_handle.region_id(), self.file_handle.file_id()) + }) { + return Ok(metadata); + } + + // Cache miss, get from the reader. + let metadata = reader + .get_metadata() + .await + .context(ReadParquetSnafu { path: file_path })?; + // Cache the metadata. + if let Some(cache) = &self.cache_manager { + cache.put_parquet_meta_data( + self.file_handle.region_id(), + self.file_handle.file_id(), + metadata.clone(), + ); + } + + Ok(metadata) + } } -type BoxedRecordBatchStream = BoxStream<'static, std::result::Result>; +/// Builder to build a [ParquetRecordBatchReader] for a row group. 
+struct RowGroupReaderBuilder { + /// Path of the file. + file_path: String, + /// Metadata of the parquet file. + parquet_meta: Arc, + /// Reader to get data. + file_reader: BufReader>, + /// Projection mask. + projection: ProjectionMask, + /// Field levels to read. + field_levels: FieldLevels, +} + +impl RowGroupReaderBuilder { + /// Path of the file to read. + fn file_path(&self) -> &str { + &self.file_path + } + + /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. + async fn build(&mut self, row_group_idx: usize) -> Result { + let mut row_group = InMemoryRowGroup::create(&self.parquet_meta, row_group_idx); + // Fetches data into memory. + row_group + .fetch(&mut self.file_reader, &self.projection, None) + .await + .context(ReadParquetSnafu { + path: &self.file_path, + })?; + + // Builds the parquet reader. + // Now the row selection is None. + ParquetRecordBatchReader::try_new_with_row_groups( + &self.field_levels, + &row_group, + DEFAULT_READ_BATCH_SIZE, + None, + ) + .context(ReadParquetSnafu { + path: &self.file_path, + }) + } +} /// Parquet batch reader to read our SST format. pub struct ParquetReader { - /// Path of the file. - file_path: String, /// SST file to read. /// /// Holds the file handle to avoid the file purge purge it. _file_handle: FileHandle, - /// Inner parquet record batch stream. - stream: BoxedRecordBatchStream, + /// Indices of row groups to read. + row_groups: VecDeque, /// Helper to read record batches. /// /// Not `None` if [ParquetReader::stream] is not `None`. read_format: ReadFormat, + /// Builder to build row group readers. + reader_builder: RowGroupReaderBuilder, + /// Reader of current row group. + current_reader: Option, /// Buffered batches to return. 
- batches: Vec, + batches: VecDeque, } #[async_trait] impl BatchReader for ParquetReader { async fn next_batch(&mut self) -> Result> { - if let Some(batch) = self.batches.pop() { + if let Some(batch) = self.batches.pop_front() { return Ok(Some(batch)); } // We need to fetch next record batch and convert it to batches. - let Some(record_batch) = self.stream.try_next().await.context(ReadParquetSnafu { - path: &self.file_path, - })? - else { + let Some(record_batch) = self.fetch_next_record_batch().await? else { return Ok(None); }; self.read_format .convert_record_batch(&record_batch, &mut self.batches)?; - // Reverse batches so we could pop it. - self.batches.reverse(); - Ok(self.batches.pop()) + Ok(self.batches.pop_front()) } } @@ -290,59 +336,43 @@ impl ParquetReader { pub fn metadata(&self) -> &RegionMetadataRef { self.read_format.metadata() } -} -/// Cache layer for parquet's [AsyncFileReader]. -struct AsyncFileReaderCache { - /// Underlying async file reader. - reader: T, - /// Parquet metadata cached locally. - metadata: Option>, - /// Global cache. - cache: Option, - /// Id of the region. - region_id: RegionId, - /// Id of the file to read. - file_id: FileId, -} - -impl AsyncFileReader for AsyncFileReaderCache { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.reader.get_bytes(range) - } - - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, Result, ParquetError>> { - self.reader.get_byte_ranges(ranges) - } - - fn get_metadata(&mut self) -> BoxFuture<'_, Result, ParquetError>> { - async { - // Tries to get from local cache. - if let Some(metadata) = &self.metadata { - return Ok(metadata.clone()); - } - - // Tries to get from global cache. - if let Some(metadata) = self - .cache - .as_ref() - .and_then(|cache| cache.get_parquet_meta_data(self.region_id, self.file_id)) + /// Tries to fetch next [RecordBatch] from the reader. + /// + /// If the reader is exhausted, reads next row group. 
+ async fn fetch_next_record_batch(&mut self) -> Result> { + if let Some(row_group_reader) = &mut self.current_reader { + if let Some(record_batch) = + row_group_reader + .next() + .transpose() + .context(ArrowReaderSnafu { + path: self.reader_builder.file_path(), + })? { - return Ok(metadata); - } - - // Cache miss. - let metadata = self.reader.get_metadata().await?; - // Cache the metadata. - if let Some(cache) = &self.cache { - cache.put_parquet_meta_data(self.region_id, self.file_id, metadata.clone()); + return Ok(Some(record_batch)); } + } - Ok(metadata) + // No more items in current row group, reads next row group. + while let Some(row_group_idx) = self.row_groups.pop_front() { + let mut row_group_reader = self.reader_builder.build(row_group_idx).await?; + let Some(record_batch) = + row_group_reader + .next() + .transpose() + .context(ArrowReaderSnafu { + path: self.reader_builder.file_path(), + })? + else { + continue; + }; + + // Sets current reader to this reader. + self.current_reader = Some(row_group_reader); + return Ok(Some(record_batch)); } - .boxed() + + Ok(None) } } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index a80f7c874253..7be77d692fc5 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::ProjectionMask; use parquet::column::page::{PageIterator, PageReader}; use parquet::errors::{ParquetError, Result}; -use parquet::file::metadata::RowGroupMetaData; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::reader::{ChunkReader, Length}; use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; @@ -36,10 +36,25 @@ pub struct InMemoryRowGroup<'a> { } impl<'a> InMemoryRowGroup<'a> { + /// Creates a new [InMemoryRowGroup] by `row_group_idx`. 
+ /// + /// # Panics + /// Panics if the `row_group_idx` is invalid. + pub fn create(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self { + let metadata = parquet_meta.row_group(row_group_idx); + let page_locations = parquet_meta + .offset_index() + .map(|x| x[row_group_idx].as_slice()); + + Self { + metadata, + row_count: metadata.num_rows() as usize, + column_chunks: vec![None; metadata.columns().len()], + page_locations, + } + } + /// Fetches the necessary column data into memory - // TODO(yingwen): Fix clippy warnings. - #[allow(clippy::filter_map_bool_then)] - #[allow(clippy::useless_conversion)] pub async fn fetch( &mut self, input: &mut T, @@ -56,26 +71,26 @@ impl<'a> InMemoryRowGroup<'a> { .iter() .zip(self.metadata.columns()) .enumerate() - .filter_map(|(idx, (chunk, chunk_meta))| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - // If the first page does not start at the beginning of the column, - // then we need to also fetch a dictionary page. - let mut ranges = vec![]; - let (start, _len) = chunk_meta.byte_range(); - match page_locations[idx].first() { - Some(first) if first.offset as u64 != start => { - ranges.push(start as usize..first.offset as usize); - } - _ => (), + .filter(|&(idx, (chunk, _chunk_meta))| { + chunk.is_none() && projection.leaf_included(idx) + }) + .flat_map(|(idx, (_chunk, chunk_meta))| { + // If the first page does not start at the beginning of the column, + // then we need to also fetch a dictionary page. 
+ let mut ranges = vec![]; + let (start, _len) = chunk_meta.byte_range(); + match page_locations[idx].first() { + Some(first) if first.offset as u64 != start => { + ranges.push(start as usize..first.offset as usize); } + _ => (), + } - ranges.extend(selection.scan_ranges(&page_locations[idx])); - page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges.extend(selection.scan_ranges(&page_locations[idx])); + page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); - ranges - }) + ranges }) - .flatten() .collect(); let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); @@ -94,7 +109,7 @@ impl<'a> InMemoryRowGroup<'a> { *chunk = Some(Arc::new(ColumnChunkData::Sparse { length: self.metadata.column(idx).byte_range().1 as usize, - data: offsets.into_iter().zip(chunks.into_iter()).collect(), + data: offsets.into_iter().zip(chunks).collect(), })) } } @@ -103,12 +118,11 @@ impl<'a> InMemoryRowGroup<'a> { .column_chunks .iter() .enumerate() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = self.metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize - }) + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .map(|(idx, _chunk)| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize }) .collect();