diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 45e36786d41d..0882ef82c7e3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -22,7 +22,7 @@ use async_trait::async_trait;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
 use datatypes::arrow::record_batch::RecordBatch;
-use object_store::{ObjectStore, Reader};
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -178,7 +178,7 @@ impl ParquetReaderBuilder {
             file_handle: self.file_handle.clone(),
             file_path,
             parquet_meta,
-            file_reader: reader,
+            object_store: self.object_store.clone(),
             projection: projection_mask,
             field_levels,
             cache_manager: self.cache_manager.clone(),
@@ -285,8 +285,8 @@ struct RowGroupReaderBuilder {
     file_path: String,
     /// Metadata of the parquet file.
     parquet_meta: Arc<ParquetMetaData>,
-    /// Reader to get data.
-    file_reader: BufReader<Reader>,
+    /// Object store as an Operator.
+    object_store: ObjectStore,
     /// Projection mask.
     projection: ProjectionMask,
     /// Field levels to read.
@@ -309,10 +309,12 @@ impl RowGroupReaderBuilder {
             &self.parquet_meta,
             row_group_idx,
             self.cache_manager.clone(),
+            &self.file_path,
+            self.object_store.clone(),
         );
         // Fetches data into memory.
         row_group
-            .fetch(&mut self.file_reader, &self.projection, None)
+            .fetch(&self.projection, None)
             .await
             .context(ReadParquetSnafu {
                 path: &self.file_path,
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 827db8999ae8..b24413e43f69 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -14,11 +14,12 @@
 //! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
 
+use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::{Buf, Bytes};
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ProjectionMask;
 use parquet::column::page::{PageIterator, PageReader};
 use parquet::errors::{ParquetError, Result};
@@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> {
     ///
     /// `column_cached_pages.len()` equals to `column_chunks.len()`.
     column_cached_pages: Vec<Option<Arc<PageValue>>>,
+    file_path: &'a str,
+    /// Object store.
+    object_store: ObjectStore,
 }
 
 impl<'a> InMemoryRowGroup<'a> {
@@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> {
         parquet_meta: &'a ParquetMetaData,
         row_group_idx: usize,
         cache_manager: Option<CacheManagerRef>,
+        file_path: &'a str,
+        object_store: ObjectStore,
     ) -> Self {
         let metadata = parquet_meta.row_group(row_group_idx);
         // `page_locations` is always `None` if we don't set
@@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> {
             row_group_idx,
             cache_manager,
             column_cached_pages: vec![None; metadata.columns().len()],
+            file_path,
+            object_store,
         }
     }
 
     /// Fetches the necessary column data into memory
-    pub async fn fetch<T: AsyncFileReader + Send>(
+    pub async fn fetch(
         &mut self,
-        input: &mut T,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
@@ -93,7 +100,7 @@
             // `RowSelection`
             let mut page_start_offsets: Vec<Vec<usize>> = vec![];
 
-            let fetch_ranges = self
+            let fetch_ranges: Vec<_> = self
                 .column_chunks
                 .iter()
                 .zip(self.metadata.columns())
@@ -119,8 +126,11 @@
                     ranges
                 })
                 .collect();
+            let mut chunk_data =
+                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
+                    .await?
+                    .into_iter();
 
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
             let mut page_start_offsets = page_start_offsets.into_iter();
 
             for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
@@ -165,7 +175,10 @@
                 return Ok(());
             }
 
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data =
+                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
+                    .await?
+                    .into_iter();
 
             for (idx, (chunk, cached_pages)) in self
                 .column_chunks
@@ -336,3 +349,86 @@ impl Iterator for ColumnChunkIterator {
 }
 
 impl PageIterator for ColumnChunkIterator {}
+
+/// Fetches data from object store.
+/// If the object store supports blocking, use sequence blocking read.
+/// Otherwise, use concurrent read.
+async fn fetch_byte_ranges(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: Vec<Range<usize>>,
+) -> Result<Vec<Bytes>> {
+    let ranges: Vec<_> = ranges
+        .iter()
+        .map(|range| range.start as u64..range.end as u64)
+        .collect();
+    if object_store.info().full_capability().blocking {
+        fetch_ranges_seq(file_path, object_store, ranges).await
+    } else {
+        fetch_ranges_concurrent(file_path, object_store, ranges).await
+    }
+}
+
+/// Fetches data from object store sequentially
+async fn fetch_ranges_seq(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: Vec<Range<u64>>,
+) -> Result<Vec<Bytes>> {
+    let block_object_store = object_store.blocking();
+    let file_path = file_path.to_string();
+
+    let f = move || -> Result<Vec<Bytes>> {
+        ranges
+            .into_iter()
+            .map(|range| {
+                let data = block_object_store
+                    .read_with(&file_path)
+                    .range(range)
+                    .call()
+                    .map_err(|e| ParquetError::External(Box::new(e)))?;
+                Ok::<_, ParquetError>(Bytes::from(data))
+            })
+            .collect::<Result<Vec<_>>>()
+    };
+
+    maybe_spawn_blocking(f).await
+}
+
+/// Fetches data from object store concurrently.
+async fn fetch_ranges_concurrent(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: Vec<Range<u64>>,
+) -> Result<Vec<Bytes>> {
+    // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
+    let mut handles = Vec::with_capacity(ranges.len());
+    for range in ranges {
+        let future_read = object_store.read_with(file_path);
+        handles.push(async move {
+            let data = future_read
+                .range(range.start..range.end)
+                .await
+                .map_err(|e| ParquetError::External(Box::new(e)))?;
+            Ok::<_, ParquetError>(Bytes::from(data))
+        });
+    }
+    let results = futures::future::try_join_all(handles).await?;
+    Ok(results)
+}
+
+// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
+/// Takes a function and spawns it to a tokio blocking pool if available
+pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    match tokio::runtime::Handle::try_current() {
+        Ok(runtime) => runtime
+            .spawn_blocking(f)
+            .await
+            .map_err(|e| ParquetError::External(Box::new(e)))?,
+        Err(_) => f(),
+    }
+}
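
To make the new read path concrete, the sketch below reproduces the dispatch that `fetch_byte_ranges` performs, in isolation: sequential reads on tokio's blocking pool when the backend reports blocking support, concurrent async reads otherwise. It is not GreptimeDB code: `read_range`, `read_range_blocking`, `fetch`, and the `supports_blocking` flag are hypothetical stand-ins for the opendal `Operator` calls and the `info().full_capability().blocking` check, and it assumes `tokio` (with the `macros` and `rt-multi-thread` features) and `futures` as dependencies.

```rust
use std::io;
use std::ops::Range;

// Hypothetical stand-in for one ranged object-store read; the real helper
// calls opendal's `read_with(path).range(..)` instead.
async fn read_range(range: Range<u64>) -> io::Result<Vec<u8>> {
    Ok(vec![0u8; (range.end - range.start) as usize])
}

// Blocking counterpart, standing in for `BlockingOperator::read_with(..).call()`.
fn read_range_blocking(range: Range<u64>) -> io::Result<Vec<u8>> {
    Ok(vec![0u8; (range.end - range.start) as usize])
}

/// Same shape as the patch's `maybe_spawn_blocking`: run the closure on
/// tokio's blocking pool when a runtime is available, inline otherwise.
async fn maybe_spawn_blocking<F, T>(f: F) -> io::Result<T>
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle
            .spawn_blocking(f)
            .await
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?,
        Err(_) => f(),
    }
}

/// Mirrors the `fetch_byte_ranges` dispatch: sequential reads inside one
/// blocking task when the backend supports blocking I/O, concurrent async
/// reads otherwise.
async fn fetch(ranges: Vec<Range<u64>>, supports_blocking: bool) -> io::Result<Vec<Vec<u8>>> {
    if supports_blocking {
        // One closure reads every range, so only one blocking task is spawned.
        maybe_spawn_blocking(move || ranges.into_iter().map(read_range_blocking).collect()).await
    } else {
        // Fan out one future per range; results come back in input order.
        futures::future::try_join_all(ranges.into_iter().map(read_range)).await
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let chunks = fetch(vec![0..128, 4096..8192], true).await?;
    assert_eq!(chunks[1].len(), 4096);
    Ok(())
}
```

Note the design choice the patch makes in `fetch_ranges_seq`: the whole batch of ranges is read inside one closure handed to `maybe_spawn_blocking`, so a row group costs a single blocking-pool task rather than one per range, while the concurrent branch relies on `try_join_all` to preserve input order and fail fast on the first read error.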