Skip to content

Commit

Permalink
Merge branch 'develop' into feat/impl_kafka_log_store
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 22, 2023
2 parents d19178d + 7d509e9 commit f8307a3
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
12 changes: 7 additions & 5 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_trait::async_trait;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, Reader};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
Expand Down Expand Up @@ -178,7 +178,7 @@ impl ParquetReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
parquet_meta,
file_reader: reader,
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_manager: self.cache_manager.clone(),
Expand Down Expand Up @@ -285,8 +285,8 @@ struct RowGroupReaderBuilder {
file_path: String,
/// Metadata of the parquet file.
parquet_meta: Arc<ParquetMetaData>,
/// Reader to get data.
file_reader: BufReader<Reader>,
/// Object store as an Operator.
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMask,
/// Field levels to read.
Expand All @@ -309,10 +309,12 @@ impl RowGroupReaderBuilder {
&self.parquet_meta,
row_group_idx,
self.cache_manager.clone(),
&self.file_path,
self.object_store.clone(),
);
// Fetches data into memory.
row_group
.fetch(&mut self.file_reader, &self.projection, None)
.fetch(&self.projection, None)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
Expand Down
108 changes: 102 additions & 6 deletions src/mito2/src/sst/parquet/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
Expand Down Expand Up @@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> {
///
/// `column_cached_pages.len()` equals to `column_chunks.len()`.
column_cached_pages: Vec<Option<Arc<PageValue>>>,
file_path: &'a str,
/// Object store.
object_store: ObjectStore,
}

impl<'a> InMemoryRowGroup<'a> {
Expand All @@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> {
parquet_meta: &'a ParquetMetaData,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
file_path: &'a str,
object_store: ObjectStore,
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
Expand All @@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> {
row_group_idx,
cache_manager,
column_cached_pages: vec![None; metadata.columns().len()],
file_path,
object_store,
}
}

/// Fetches the necessary column data into memory
pub async fn fetch<T: AsyncFileReader + Send>(
pub async fn fetch(
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
Expand All @@ -93,7 +100,7 @@ impl<'a> InMemoryRowGroup<'a> {
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];

let fetch_ranges = self
let fetch_ranges: Vec<_> = self
.column_chunks
.iter()
.zip(self.metadata.columns())
Expand All @@ -119,8 +126,11 @@ impl<'a> InMemoryRowGroup<'a> {
ranges
})
.collect();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
Expand Down Expand Up @@ -165,7 +175,10 @@ impl<'a> InMemoryRowGroup<'a> {
return Ok(());
}

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut chunk_data =
fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
.await?
.into_iter();

for (idx, (chunk, cached_pages)) in self
.column_chunks
Expand Down Expand Up @@ -336,3 +349,86 @@ impl Iterator for ColumnChunkIterator {
}

impl PageIterator for ColumnChunkIterator {}

/// Fetches data from object store.
/// If the object store supports blocking, use sequence blocking read.
/// Otherwise, use concurrent read.
async fn fetch_byte_ranges(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<usize>>,
) -> Result<Vec<Bytes>> {
let ranges: Vec<_> = ranges
.iter()
.map(|range| range.start as u64..range.end as u64)
.collect();
if object_store.info().full_capability().blocking {
fetch_ranges_seq(file_path, object_store, ranges).await
} else {
fetch_ranges_concurrent(file_path, object_store, ranges).await
}
}

/// Fetches data from object store sequentially
async fn fetch_ranges_seq(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
let block_object_store = object_store.blocking();
let file_path = file_path.to_string();

let f = move || -> Result<Vec<Bytes>> {
ranges
.into_iter()
.map(|range| {
let data = block_object_store
.read_with(&file_path)
.range(range)
.call()
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok::<_, ParquetError>(Bytes::from(data))
})
.collect::<Result<Vec<_>>>()
};

maybe_spawn_blocking(f).await
}

/// Fetches data from object store concurrently.
async fn fetch_ranges_concurrent(
file_path: &str,
object_store: ObjectStore,
ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>> {
// TODO(QuenKar): may merge small ranges to a bigger range to optimize.
let mut handles = Vec::with_capacity(ranges.len());
for range in ranges {
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read
.range(range.start..range.end)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok::<_, ParquetError>(Bytes::from(data))
});
}
let results = futures::future::try_join_all(handles).await?;
Ok(results)
}

// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
/// Takes a function and spawns it to a tokio blocking pool if available
pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::runtime::Handle::try_current() {
Ok(runtime) => runtime
.spawn_blocking(f)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?,
Err(_) => f(),
}
}

0 comments on commit f8307a3

Please sign in to comment.