Skip to content

Commit

Permalink
feat: do not fetch data if we have pages in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Nov 3, 2023
1 parent 9fb7b91 commit 49ead77
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 98 deletions.
85 changes: 19 additions & 66 deletions src/mito2/src/sst/parquet/page_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,94 +15,47 @@
//! Parquet page reader.
use std::collections::VecDeque;
use std::sync::Arc;

use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

use crate::cache::{CacheManagerRef, PageKey, PageValue};

/// A reader that can cache pages from the underlying reader.
pub(crate) enum CachedPageReader<T> {
/// Reads from the underlying reader.
Reader {
/// Inner page reader to get pages.
page_reader: T,
},
/// Reads from cached pages.
Pages {
/// Cached pages.
pages: VecDeque<Page>,
},
/// A reader that reads from cached pages.
pub(crate) struct CachedPageReader {
/// Cached pages.
pages: VecDeque<Page>,
}

impl<T: PageReader> CachedPageReader<T> {
/// Returns a new reader that caches all pages for the `page_reader` if the
/// `cache_manager` is `Some`.
pub(crate) fn new(
page_reader: T,
cache_manager: Option<CacheManagerRef>,
page_key: PageKey,
) -> Result<Self> {
let Some(cache) = &cache_manager else {
return Ok(CachedPageReader::Reader { page_reader });
};

if let Some(page_value) = cache.get_pages(&page_key) {
// We already cached all pages.
return Ok(CachedPageReader::from_pages(&page_value.pages));
}

// Cache miss. We load pages from the reader.
let pages = page_reader.collect::<Result<Vec<_>>>()?;
let page_value = Arc::new(PageValue::new(pages));
// Puts into the cache.
cache.put_pages(page_key, page_value.clone());

Ok(CachedPageReader::from_pages(&page_value.pages))
}

impl CachedPageReader {
/// Returns a new reader from existing pages.
fn from_pages(pages: &[Page]) -> Self {
Self::Pages {
pub(crate) fn new(pages: &[Page]) -> Self {
Self {
pages: pages.iter().cloned().collect(),
}
}
}

impl<T: PageReader> PageReader for CachedPageReader<T> {
impl PageReader for CachedPageReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
match self {
CachedPageReader::Reader { page_reader } => page_reader.get_next_page(),
CachedPageReader::Pages { pages } => Ok(pages.pop_front()),
}
Ok(self.pages.pop_front())
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match self {
CachedPageReader::Reader { page_reader } => page_reader.peek_next_page(),
CachedPageReader::Pages { pages } => Ok(pages.front().map(page_to_page_meta)),
}
Ok(self.pages.front().map(page_to_page_meta))
}

fn skip_next_page(&mut self) -> Result<()> {
match self {
CachedPageReader::Reader { page_reader } => page_reader.skip_next_page(),
CachedPageReader::Pages { pages } => {
// When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops
// the dictionary page, which is always the first page. See:
// https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
// But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the
// dictionary page is read first.
// https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
pages.pop_front();
Ok(())
}
}
// When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops
// the dictionary page, which is always the first page. See:
// https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
// But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the
// dictionary page is read first.
// https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
self.pages.pop_front();
Ok(())
}
}

impl<T: PageReader> Iterator for CachedPageReader<T> {
impl Iterator for CachedPageReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
120 changes: 88 additions & 32 deletions src/mito2/src/sst/parquet/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;
use store_api::storage::RegionId;

use crate::cache::{CacheManagerRef, PageKey};
use crate::cache::{CacheManagerRef, PageKey, PageValue};
use crate::sst::file::FileId;
use crate::sst::parquet::page_reader::CachedPageReader;

Expand All @@ -42,6 +42,10 @@ pub struct InMemoryRowGroup<'a> {
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
/// Cached pages for each column.
///
/// `column_cached_pages.len()` equals to `column_chunks.len()`.
column_cached_pages: Vec<Option<Arc<PageValue>>>,
}

impl<'a> InMemoryRowGroup<'a> {
Expand Down Expand Up @@ -73,6 +77,7 @@ impl<'a> InMemoryRowGroup<'a> {
file_id,
row_group_idx,
cache_manager,
column_cached_pages: vec![None; metadata.columns().len()],
}
}

Expand Down Expand Up @@ -136,12 +141,19 @@ impl<'a> InMemoryRowGroup<'a> {
}
}
} else {
            // Currently the page cache is only used for dense chunk data.
self.fetch_pages_from_cache(projection);

let fetch_ranges = self
.column_chunks
.iter()
.zip(&self.column_cached_pages)
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
            // No need to fetch a column's data if its pages are already cached.
.filter(|&(idx, (chunk, cached_pages))| {
chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none()
})
.map(|(idx, (_chunk, _cached_pages))| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
Expand All @@ -150,8 +162,13 @@ impl<'a> InMemoryRowGroup<'a> {

let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
for (idx, (chunk, cached_pages)) in self
.column_chunks
.iter_mut()
.zip(&self.column_cached_pages)
.enumerate()
{
if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() {
continue;
}

Expand All @@ -166,43 +183,82 @@ impl<'a> InMemoryRowGroup<'a> {

Ok(())
}
}

impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
/// Fetches pages for columns if cache is enabled.
fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
self.column_chunks
.iter()
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.for_each(|(idx, _chunk)| {
if let Some(cache) = &self.cache_manager {
let page_key = PageKey {
region_id: self.region_id,
file_id: self.file_id,
row_group_idx: self.row_group_idx,
column_idx: idx,
};
self.column_cached_pages[idx] = cache.get_pages(&page_key);
}
});
}

fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
))),
/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
if let Some(cached_pages) = &self.column_cached_pages[i] {
// Already in cache.
return Ok(Box::new(CachedPageReader::new(&cached_pages.pages)));
}

// Cache miss.
let page_reader = match &self.column_chunks[i] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.page_locations.map(|index| index[i].clone());
let page_reader = SerializedPageReader::new(
SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?;
let page_key = PageKey {
region_id: self.region_id,
file_id: self.file_id,
row_group_idx: self.row_group_idx,
column_idx: i,
};
let page_reader: Box<dyn PageReader> = Box::new(CachedPageReader::new(
page_reader,
self.cache_manager.clone(),
page_key,
)?);

Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
}))
)?
}
}
};

let Some(cache) = &self.cache_manager else {
// Cache is disabled.
return Ok(Box::new(page_reader));
};

// We collect all pages and put them into the cache.
let pages = page_reader.collect::<Result<Vec<_>>>()?;
let page_value = Arc::new(PageValue::new(pages));
let page_key = PageKey {
region_id: self.region_id,
file_id: self.file_id,
row_group_idx: self.row_group_idx,
column_idx: i,
};
cache.put_pages(page_key, page_value.clone());

Ok(Box::new(CachedPageReader::new(&page_value.pages)))
}
}

impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
}

fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
let page_reader = self.column_page_reader(i)?;

Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
}))
}
}

Expand Down

0 comments on commit 49ead77

Please sign in to comment.