diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs index 643dc350b88d..42950d841255 100644 --- a/src/mito2/src/sst/parquet/page_reader.rs +++ b/src/mito2/src/sst/parquet/page_reader.rs @@ -15,94 +15,47 @@ //! Parquet page reader. use std::collections::VecDeque; -use std::sync::Arc; use parquet::column::page::{Page, PageMetadata, PageReader}; use parquet::errors::Result; -use crate::cache::{CacheManagerRef, PageKey, PageValue}; - -/// A reader that can cache pages from the underlying reader. -pub(crate) enum CachedPageReader { - /// Reads from the underlying reader. - Reader { - /// Inner page reader to get pages. - page_reader: T, - }, - /// Reads from cached pages. - Pages { - /// Cached pages. - pages: VecDeque, - }, +/// A reader that reads from cached pages. +pub(crate) struct CachedPageReader { + /// Cached pages. + pages: VecDeque, } -impl CachedPageReader { - /// Returns a new reader that caches all pages for the `page_reader` if the - /// `cache_manager` is `Some`. - pub(crate) fn new( - page_reader: T, - cache_manager: Option, - page_key: PageKey, - ) -> Result { - let Some(cache) = &cache_manager else { - return Ok(CachedPageReader::Reader { page_reader }); - }; - - if let Some(page_value) = cache.get_pages(&page_key) { - // We already cached all pages. - return Ok(CachedPageReader::from_pages(&page_value.pages)); - } - - // Cache miss. We load pages from the reader. - let pages = page_reader.collect::>>()?; - let page_value = Arc::new(PageValue::new(pages)); - // Puts into the cache. - cache.put_pages(page_key, page_value.clone()); - - Ok(CachedPageReader::from_pages(&page_value.pages)) - } - +impl CachedPageReader { /// Returns a new reader from existing pages. - fn from_pages(pages: &[Page]) -> Self { - Self::Pages { + pub(crate) fn new(pages: &[Page]) -> Self { + Self { pages: pages.iter().cloned().collect(), } } } -impl PageReader for CachedPageReader { +impl PageReader for CachedPageReader { fn get_next_page(&mut self) -> Result> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.get_next_page(), - CachedPageReader::Pages { pages } => Ok(pages.pop_front()), - } + Ok(self.pages.pop_front()) } fn peek_next_page(&mut self) -> Result> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.peek_next_page(), - CachedPageReader::Pages { pages } => Ok(pages.front().map(page_to_page_meta)), - } + Ok(self.pages.front().map(page_to_page_meta)) } fn skip_next_page(&mut self) -> Result<()> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.skip_next_page(), - CachedPageReader::Pages { pages } => { - // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops - // the dictionary page, which is always the first page. See: - // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 - // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the - // dictionary page is read first. - // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 - pages.pop_front(); - Ok(()) - } - } + // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops + // the dictionary page, which is always the first page. See: + // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 + // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the + // dictionary page is read first. + // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 + self.pages.pop_front(); + Ok(()) } } -impl Iterator for CachedPageReader { +impl Iterator for CachedPageReader { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 0879c293fce0..ef2225f2a3a1 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -28,7 +28,7 @@ use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; use store_api::storage::RegionId; -use crate::cache::{CacheManagerRef, PageKey}; +use crate::cache::{CacheManagerRef, PageKey, PageValue}; use crate::sst::file::FileId; use crate::sst::parquet::page_reader::CachedPageReader; @@ -42,6 +42,10 @@ pub struct InMemoryRowGroup<'a> { file_id: FileId, row_group_idx: usize, cache_manager: Option, + /// Cached pages for each column. + /// + /// `column_cached_pages.len()` equals to `column_chunks.len()`. + column_cached_pages: Vec>>, } impl<'a> InMemoryRowGroup<'a> { @@ -73,6 +77,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id, row_group_idx, cache_manager, + column_cached_pages: vec![None; metadata.columns().len()], } } @@ -136,12 +141,19 @@ impl<'a> InMemoryRowGroup<'a> { } } } else { + // Now we only use cache in dense chunk data. + self.fetch_pages_from_cache(projection); + let fetch_ranges = self .column_chunks .iter() + .zip(&self.column_cached_pages) .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { + // Don't need to fetch column data if we already cache the column's pages. + .filter(|&(idx, (chunk, cached_pages))| { + chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none() + }) + .map(|(idx, (_chunk, _cached_pages))| { let column = self.metadata.column(idx); let (start, length) = column.byte_range(); start as usize..(start + length) as usize @@ -150,8 +162,13 @@ impl<'a> InMemoryRowGroup<'a> { let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { + for (idx, (chunk, cached_pages)) in self + .column_chunks + .iter_mut() + .zip(&self.column_cached_pages) + .enumerate() + { + if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() { continue; } @@ -166,43 +183,82 @@ impl<'a> InMemoryRowGroup<'a> { Ok(()) } -} -impl<'a> RowGroups for InMemoryRowGroup<'a> { - fn num_rows(&self) -> usize { - self.row_count + /// Fetches pages for columns if cache is enabled. + fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) { + self.column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .for_each(|(idx, _chunk)| { + if let Some(cache) = &self.cache_manager { + let page_key = PageKey { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: idx, + }; + self.column_cached_pages[idx] = cache.get_pages(&page_key); + } + }); } - fn column_chunks(&self, i: usize) -> Result> { - match &self.column_chunks[i] { - None => Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))), + /// Creates a page reader to read column at `i`. + fn column_page_reader(&self, i: usize) -> Result> { + if let Some(cached_pages) = &self.column_cached_pages[i] { + // Already in cache. + return Ok(Box::new(CachedPageReader::new(&cached_pages.pages))); + } + + // Cache miss. + let page_reader = match &self.column_chunks[i] { + None => { + return Err(ParquetError::General(format!( + "Invalid column index {i}, column was not fetched" + ))) + } Some(data) => { let page_locations = self.page_locations.map(|index| index[i].clone()); - let page_reader = SerializedPageReader::new( + SerializedPageReader::new( data.clone(), self.metadata.column(i), self.row_count, page_locations, - )?; - let page_key = PageKey { - region_id: self.region_id, - file_id: self.file_id, - row_group_idx: self.row_group_idx, - column_idx: i, - }; - let page_reader: Box = Box::new(CachedPageReader::new( - page_reader, - self.cache_manager.clone(), - page_key, - )?); - - Ok(Box::new(ColumnChunkIterator { - reader: Some(Ok(page_reader)), - })) + )? } - } + }; + + let Some(cache) = &self.cache_manager else { + // Cache is disabled. + return Ok(Box::new(page_reader)); + }; + + // We collect all pages and put them into the cache. + let pages = page_reader.collect::>>()?; + let page_value = Arc::new(PageValue::new(pages)); + let page_key = PageKey { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: i, + }; + cache.put_pages(page_key, page_value.clone()); + + Ok(Box::new(CachedPageReader::new(&page_value.pages))) + } +} + +impl<'a> RowGroups for InMemoryRowGroup<'a> { + fn num_rows(&self) -> usize { + self.row_count + } + + fn column_chunks(&self, i: usize) -> Result> { + let page_reader = self.column_page_reader(i)?; + + Ok(Box::new(ColumnChunkIterator { + reader: Some(Ok(page_reader)), + })) } }