diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index 2bd8c21e7510..9a7a8b85a8fa 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -25,6 +25,7 @@ pub(crate) mod write_cache;
 use std::mem;
 use std::sync::Arc;
 
+use bytes::Bytes;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use moka::notification::RemovalCause;
@@ -155,6 +156,7 @@ impl CacheManager {
         }
     }
 
+    // TODO(yingwen): Rename pages to page.
     /// Gets pages for the row group.
     pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
         self.page_cache.as_ref().and_then(|page_cache| {
@@ -395,15 +397,31 @@ impl SstMetaKey {
 
 /// Cache key for pages of a SST row group.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct PageKey {
-    /// Region id of the SST file to cache.
-    pub region_id: RegionId,
-    /// Id of the SST file to cache.
-    pub file_id: FileId,
-    /// Index of the row group.
-    pub row_group_idx: usize,
-    /// Index of the column in the row group.
-    pub column_idx: usize,
+pub enum PageKey {
+    /// Cache key for a compressed byte range in a row group.
+    Compressed {
+        /// Region id of the SST file to cache.
+        region_id: RegionId,
+        /// Id of the SST file to cache.
+        file_id: FileId,
+        /// Index of the row group.
+        row_group_idx: usize,
+        /// Index of the column in the row group.
+        column_idx: usize,
+    },
+    /// Cache key for an uncompressed page.
+    Uncompressed {
+        /// Region id of the SST file to cache.
+        region_id: RegionId,
+        /// Id of the SST file to cache.
+        file_id: FileId,
+        /// Index of the row group.
+        row_group_idx: usize,
+        /// Index of the column in the row group.
+        column_idx: usize,
+        /// Index of the page.
+        page_idx: usize,
+    },
 }
 
 impl PageKey {
@@ -414,21 +432,41 @@ impl PageKey {
 }
 
 /// Cached row group pages for a column.
+#[derive(Default)]
 pub struct PageValue {
-    /// All pages of the column in the row group.
-    pub pages: Vec<Page>,
+    /// Compressed page of the column in the row group.
+    pub compressed: Bytes,
+    /// Uncompressed page of the column in the row group. It's always
+    /// `Some` if the key is for an uncompressed page, except in tests.
+    // We don't use an enum here to make it easier to mock the struct.
+    pub uncompressed: Option<Page>,
 }
 
 impl PageValue {
-    /// Creates a new page value.
-    pub fn new(pages: Vec<Page>) -> PageValue {
-        PageValue { pages }
+    /// Creates a new value from a compressed page.
+    pub fn new_compressed(bytes: Bytes) -> PageValue {
+        PageValue {
+            compressed: bytes,
+            uncompressed: None,
+        }
+    }
+
+    /// Creates a new value from an uncompressed page.
+    pub fn new_uncompressed(page: Page) -> PageValue {
+        PageValue {
+            compressed: Bytes::new(),
+            uncompressed: Some(page),
+        }
     }
 
     /// Returns memory used by the value (estimated).
     fn estimated_size(&self) -> usize {
-        // We only consider heap size of all pages.
-        self.pages.iter().map(|page| page.buffer().len()).sum()
+        mem::size_of::<Self>()
+            + self.compressed.len()
+            + self
+                .uncompressed
+                .as_ref()
+                .map_or(0, |page| page.buffer().len())
     }
 }
 
@@ -507,13 +545,14 @@ mod tests {
             .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
             .is_none());
 
-        let key = PageKey {
+        let key = PageKey::Uncompressed {
             region_id,
             file_id,
             row_group_idx: 0,
             column_idx: 0,
+            page_idx: 0,
         };
-        let pages = Arc::new(PageValue::new(Vec::new()));
+        let pages = Arc::new(PageValue::default());
         cache.put_pages(key.clone(), pages);
         assert!(cache.get_pages(&key).is_none());
 
@@ -562,14 +601,14 @@ mod tests {
         let cache = CacheManager::builder().page_cache_size(1000).build();
         let region_id = RegionId::new(1, 1);
         let file_id = FileId::random();
-        let key = PageKey {
+        let key = PageKey::Compressed {
             region_id,
             file_id,
             row_group_idx: 0,
             column_idx: 0,
         };
         assert!(cache.get_pages(&key).is_none());
-        let pages = Arc::new(PageValue::new(Vec::new()));
+        let pages = Arc::new(PageValue::default());
         cache.put_pages(key.clone(), pages);
         assert!(cache.get_pages(&key).is_some());
     }
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 34819c0c7155..adf508d44bd8 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -218,7 +218,7 @@ mod tests {
 
         // Cache 4 row groups.
         for i in 0..4 {
-            let page_key = PageKey {
+            let page_key = PageKey::Compressed {
                 region_id: metadata.region_id,
                 file_id: handle.file_id(),
                 row_group_idx: i,
@@ -226,7 +226,7 @@ mod tests {
             };
             assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
         }
-        let page_key = PageKey {
+        let page_key = PageKey::Compressed {
             region_id: metadata.region_id,
             file_id: handle.file_id(),
             row_group_idx: 5,
diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs
index 1416da448b5a..b4c95e5e8c3d 100644
--- a/src/mito2/src/sst/parquet/page_reader.rs
+++ b/src/mito2/src/sst/parquet/page_reader.rs
@@ -14,44 +14,115 @@
 
 //! Parquet page reader.
 
-use std::collections::VecDeque;
+use std::sync::Arc;
 
 use parquet::column::page::{Page, PageMetadata, PageReader};
 use parquet::errors::Result;
+use parquet::file::reader::SerializedPageReader;
+use store_api::storage::RegionId;
 
-/// A reader that reads from cached pages.
+use crate::cache::{CacheManagerRef, PageKey, PageValue};
+use crate::sst::file::FileId;
+use crate::sst::parquet::row_group::ColumnChunkData;
+
+/// A reader that reads pages from the cache.
 pub(crate) struct CachedPageReader {
-    /// Cached pages.
-    pages: VecDeque<Page>,
+    /// Page cache.
+    cache: CacheManagerRef,
+    /// Reader to fall back to. `None` indicates the reader is exhausted.
+    reader: Option<SerializedPageReader<ColumnChunkData>>,
+    region_id: RegionId,
+    file_id: FileId,
+    row_group_idx: usize,
+    column_idx: usize,
+    /// Current page index.
+    current_page_idx: usize,
 }
 
 impl CachedPageReader {
-    /// Returns a new reader from existing pages.
-    pub(crate) fn new(pages: &[Page]) -> Self {
+    /// Returns a new reader from a cache and a reader.
+    pub(crate) fn new(
+        cache: CacheManagerRef,
+        reader: SerializedPageReader<ColumnChunkData>,
+        region_id: RegionId,
+        file_id: FileId,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Self {
         Self {
-            pages: pages.iter().cloned().collect(),
+            cache,
+            reader: Some(reader),
+            region_id,
+            file_id,
+            row_group_idx,
+            column_idx,
+            current_page_idx: 0,
         }
     }
 }
 
 impl PageReader for CachedPageReader {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        Ok(self.pages.pop_front())
+        let Some(reader) = self.reader.as_mut() else {
+            // The reader is exhausted.
+ return Ok(None); + }; + + // Tries to get it from the cache first. + let key = PageKey::Uncompressed { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: self.column_idx, + page_idx: self.current_page_idx, + }; + if let Some(page) = self.cache.get_pages(&key) { + // Cache hit. + // Bumps the page index. + self.current_page_idx += 1; + // The reader skips this page. + reader.skip_next_page()?; + debug_assert!(page.uncompressed.is_some()); + return Ok(page.uncompressed.clone()); + } + + // Cache miss, load the page from the reader. + let Some(page) = reader.get_next_page()? else { + // The reader is exhausted. + self.reader = None; + return Ok(None); + }; + // Puts the page into the cache. + self.cache + .put_pages(key, Arc::new(PageValue::new_uncompressed(page.clone()))); + // Bumps the page index. + self.current_page_idx += 1; + + Ok(Some(page)) } fn peek_next_page(&mut self) -> Result> { - Ok(self.pages.front().map(page_to_page_meta)) + // The reader is exhausted. + let Some(reader) = self.reader.as_mut() else { + return Ok(None); + }; + // It only decodes the page header so we don't query the cache. + reader.peek_next_page() } fn skip_next_page(&mut self) -> Result<()> { + // The reader is exhausted. + let Some(reader) = self.reader.as_mut() else { + return Ok(()); + }; // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops // the dictionary page. So it always return the dictionary page as the first page. See: // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 // But the `GenericColumnReader` will read the dictionary page before skipping records so it won't skip dictionary page. // So we don't need to handle the dictionary page specifically in this method. // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 - self.pages.pop_front(); - Ok(()) + self.current_page_idx += 1; + reader.skip_next_page() } } @@ -62,31 +133,3 @@ impl Iterator for CachedPageReader { self.get_next_page().transpose() } } - -/// Get [PageMetadata] from `page`. -/// -/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481) -/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301). -fn page_to_page_meta(page: &Page) -> PageMetadata { - match page { - Page::DataPage { num_values, .. } => PageMetadata { - num_rows: None, - num_levels: Some(*num_values as usize), - is_dict: false, - }, - Page::DataPageV2 { - num_values, - num_rows, - .. - } => PageMetadata { - num_rows: Some(*num_rows as usize), - num_levels: Some(*num_values as usize), - is_dict: false, - }, - Page::DictionaryPage { .. } => PageMetadata { - num_rows: None, - num_levels: None, - is_dict: true, - }, - } -} diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 68a91e55fef4..596d6a9bff2c 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -40,16 +40,13 @@ use crate::sst::parquet::page_reader::CachedPageReader; pub struct InMemoryRowGroup<'a> { metadata: &'a RowGroupMetaData, page_locations: Option<&'a [Vec]>, + /// Compressed page of each column. 
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
     region_id: RegionId,
     file_id: FileId,
     row_group_idx: usize,
     cache_manager: Option<CacheManagerRef>,
-    /// Cached pages for each column.
-    ///
-    /// `column_cached_pages.len()` equals to `column_chunks.len()`.
-    column_cached_pages: Vec<Option<Arc<PageValue>>>,
     file_path: &'a str,
     /// Object store.
     object_store: ObjectStore,
@@ -86,7 +83,6 @@ impl<'a> InMemoryRowGroup<'a> {
             file_id,
             row_group_idx,
             cache_manager,
-            column_cached_pages: vec![None; metadata.columns().len()],
             file_path,
             object_store,
         }
@@ -159,18 +155,15 @@ impl<'a> InMemoryRowGroup<'a> {
             }
         } else {
             // Now we only use cache in dense chunk data.
-            self.fetch_pages_from_cache(projection);
+            self.fetch_compressed_pages_from_cache(projection);
 
             let fetch_ranges = self
                 .column_chunks
                 .iter()
-                .zip(&self.column_cached_pages)
                 .enumerate()
                 // Don't need to fetch column data if we already cache the column's pages.
-                .filter(|&(idx, (chunk, cached_pages))| {
-                    chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none()
-                })
-                .map(|(idx, (_chunk, _cached_pages))| {
+                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
+                .map(|(idx, _chunk)| {
                     let column = self.metadata.column(idx);
                     let (start, length) = column.byte_range();
                     start..(start + length)
@@ -184,17 +177,24 @@
 
             let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
 
-            for (idx, (chunk, cached_pages)) in self
-                .column_chunks
-                .iter_mut()
-                .zip(&self.column_cached_pages)
-                .enumerate()
-            {
-                if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() {
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
                     continue;
                 }
 
                 if let Some(data) = chunk_data.next() {
+                    // Put the page into the cache.
+                    if let Some(cache) = &self.cache_manager {
+                        let page_key = PageKey::Compressed {
+                            region_id: self.region_id,
+                            file_id: self.file_id,
+                            row_group_idx: self.row_group_idx,
+                            column_idx: idx,
+                        };
+                        cache
+                            .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
+                    }
+
                     *chunk = Some(Arc::new(ColumnChunkData::Dense {
                         offset: self.metadata.column(idx).byte_range().0 as usize,
                         data,
@@ -206,21 +206,28 @@ impl<'a> InMemoryRowGroup<'a> {
         Ok(())
     }
 
-    /// Fetches pages for columns if cache is enabled.
-    fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
+    /// Fetches compressed pages for columns if cache is enabled.
+    /// If a page is in the cache, sets the column chunk for the column.
+    fn fetch_compressed_pages_from_cache(&mut self, projection: &ProjectionMask) {
         self.column_chunks
-            .iter()
+            .iter_mut()
            .enumerate()
-            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
-            .for_each(|(idx, _chunk)| {
+            .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
+            .for_each(|(idx, chunk)| {
                 if let Some(cache) = &self.cache_manager {
-                    let page_key = PageKey {
+                    let page_key = PageKey::Compressed {
                         region_id: self.region_id,
                         file_id: self.file_id,
                         row_group_idx: self.row_group_idx,
                         column_idx: idx,
                     };
-                    self.column_cached_pages[idx] = cache.get_pages(&page_key);
+
+                    *chunk = cache.get_pages(&page_key).map(|page_value| {
+                        Arc::new(ColumnChunkData::Dense {
+                            offset: self.metadata.column(idx).byte_range().0 as usize,
+                            data: page_value.compressed.clone(),
+                        })
+                    });
                 }
             });
     }
@@ -259,12 +266,6 @@ impl<'a> InMemoryRowGroup<'a> {
 
     /// Creates a page reader to read column at `i`.
     fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
-        if let Some(cached_pages) = &self.column_cached_pages[i] {
-            // Already in cache.
-            return Ok(Box::new(CachedPageReader::new(&cached_pages.pages)));
-        }
-
-        // Cache miss.
         let page_reader = match &self.column_chunks[i] {
             None => {
                 return Err(ParquetError::General(format!(
@@ -283,22 +284,18 @@ impl<'a> InMemoryRowGroup<'a> {
         };
 
         let Some(cache) = &self.cache_manager else {
-            // Cache is disabled.
            return Ok(Box::new(page_reader));
         };
 
-        // We collect all pages and put them into the cache.
-        let pages = page_reader.collect::<Result<Vec<_>>>()?;
-        let page_value = Arc::new(PageValue::new(pages));
-        let page_key = PageKey {
-            region_id: self.region_id,
-            file_id: self.file_id,
-            row_group_idx: self.row_group_idx,
-            column_idx: i,
-        };
-        cache.put_pages(page_key, page_value.clone());
-
-        Ok(Box::new(CachedPageReader::new(&page_value.pages)))
+        let cached_reader = CachedPageReader::new(
+            cache.clone(),
+            page_reader,
+            self.region_id,
+            self.file_id,
+            self.row_group_idx,
+            i,
+        );
+        Ok(Box::new(cached_reader))
     }
 }
 
@@ -318,7 +315,7 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
 
 /// An in-memory column chunk
 #[derive(Clone)]
-enum ColumnChunkData {
+pub(crate) enum ColumnChunkData {
     /// Column chunk data representing only a subset of data pages
     Sparse {
         /// Length of the full column chunk
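
Note: the sketch below is not part of the patch. It is a minimal illustration, assuming the crate-internal context introduced above, of how the new `PageKey` variants and `PageValue` constructors are used with `CacheManager`; the function name is hypothetical and the cache size mirrors the unit test in cache.rs.

use std::sync::Arc;

use bytes::Bytes;
use store_api::storage::RegionId;

use crate::cache::{CacheManager, PageKey, PageValue};
use crate::sst::file::FileId;

// Hypothetical helper: caches the compressed bytes of a column chunk and reads them back.
fn compressed_page_cache_sketch(data: Bytes) {
    let cache = CacheManager::builder().page_cache_size(1000).build();
    let key = PageKey::Compressed {
        region_id: RegionId::new(1, 1),
        file_id: FileId::random(),
        row_group_idx: 0,
        column_idx: 0,
    };
    // Store the compressed column chunk under the `Compressed` key.
    cache.put_pages(key.clone(), Arc::new(PageValue::new_compressed(data)));
    // A later read of the same row group and column hits the cache.
    assert!(cache.get_pages(&key).is_some());
}

The `Uncompressed` variant is used the same way but carries an extra `page_idx`, so each decoded page of a column chunk gets its own entry (see `CachedPageReader::get_next_page`).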