feat: fine-grained page cache for row groups #4576

Closed
wants to merge 2 commits into from
79 changes: 59 additions & 20 deletions src/mito2/src/cache.rs
@@ -25,6 +25,7 @@ pub(crate) mod write_cache;
use std::mem;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::notification::RemovalCause;
@@ -155,6 +156,7 @@ impl CacheManager {
}
}

// TODO(yingwen): Rename pages to page.
/// Gets pages for the row group.
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
self.page_cache.as_ref().and_then(|page_cache| {
@@ -395,15 +397,31 @@ impl SstMetaKey {

/// Cache key for pages of a SST row group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
/// Region id of the SST file to cache.
pub region_id: RegionId,
/// Id of the SST file to cache.
pub file_id: FileId,
/// Index of the row group.
pub row_group_idx: usize,
/// Index of the column in the row group.
pub column_idx: usize,
pub enum PageKey {
/// Cache key for a compressed byte range in a row group.
Compressed {
/// Region id of the SST file to cache.
region_id: RegionId,
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Index of the column in the row group.
column_idx: usize,
},
/// Cache key for an uncompressed page.
Uncompressed {
/// Region id of the SST file to cache.
region_id: RegionId,
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Index of the column in the row group.
column_idx: usize,
/// Index of the page.
page_idx: usize,
},
}
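
The split above is what makes the cache fine-grained: a `Compressed` entry covers a whole column chunk of a row group, while an `Uncompressed` entry is keyed per decoded page via the extra `page_idx`. A minimal sketch (illustration only, not part of this diff) of building both kinds of key for the same column, reusing `RegionId::new` and `FileId::random` as the tests below do:

let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
// One entry per (row group, column): the raw compressed column chunk bytes.
let chunk_key = PageKey::Compressed {
    region_id,
    file_id,
    row_group_idx: 0,
    column_idx: 0,
};
// One entry per page inside that column chunk: a decoded `Page`.
let page_key = PageKey::Uncompressed {
    region_id,
    file_id,
    row_group_idx: 0,
    column_idx: 0,
    page_idx: 0,
};
// The derived `PartialEq`/`Hash` keep the two kinds of entries distinct.
assert_ne!(chunk_key, page_key);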

impl PageKey {
@@ -414,21 +432,41 @@ impl PageKey {
}

/// Cached row group pages for a column.
#[derive(Default)]
pub struct PageValue {
/// All pages of the column in the row group.
pub pages: Vec<Page>,
/// Compressed page of the column in the row group.
pub compressed: Bytes,
/// Uncompressed page of the column in the row group. It's always
/// `Some` if the key is for an uncompressed page, except in tests.
// We don't use enum here to make it easier to mock the struct.
pub uncompressed: Option<Page>,
}

impl PageValue {
/// Creates a new page value.
pub fn new(pages: Vec<Page>) -> PageValue {
PageValue { pages }
/// Creates a new value from a compressed page.
pub fn new_compressed(bytes: Bytes) -> PageValue {
PageValue {
compressed: bytes,
uncompressed: None,
}
}

/// Creates a new value from an uncompressed page.
pub fn new_uncompressed(page: Page) -> PageValue {
PageValue {
compressed: Bytes::new(),
uncompressed: Some(page),
}
}

/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all pages.
self.pages.iter().map(|page| page.buffer().len()).sum()
mem::size_of::<Self>()
+ self.compressed.len()
+ self
.uncompressed
.as_ref()
.map_or(0, |page| page.buffer().len())
}
}
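
A rough illustration of the accounting above (a sketch, not part of this diff; `page` stands for a `Page` decoded elsewhere, and `estimated_size` itself stays private to the cache):

// Compressed entry: `size_of::<PageValue>()` plus 4096 bytes of compressed data;
// `uncompressed` is `None` and contributes nothing.
let compressed = PageValue::new_compressed(Bytes::from(vec![0u8; 4096]));
// Uncompressed entry: the struct size plus the decoded page's buffer length;
// `compressed` is an empty `Bytes` and contributes nothing.
let uncompressed = PageValue::new_uncompressed(page);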

@@ -507,13 +545,14 @@ mod tests {
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());

let key = PageKey {
let key = PageKey::Uncompressed {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
page_idx: 0,
};
let pages = Arc::new(PageValue::new(Vec::new()));
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());

@@ -562,14 +601,14 @@ mod tests {
let cache = CacheManager::builder().page_cache_size(1000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey {
let key = PageKey::Compressed {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::new(Vec::new()));
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
}
4 changes: 2 additions & 2 deletions src/mito2/src/sst/parquet.rs
@@ -218,15 +218,15 @@ mod tests {

// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey {
let page_key = PageKey::Compressed {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: i,
column_idx: 0,
};
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
}
let page_key = PageKey {
let page_key = PageKey::Compressed {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: 5,
121 changes: 82 additions & 39 deletions src/mito2/src/sst/parquet/page_reader.rs
@@ -14,44 +14,115 @@

//! Parquet page reader.

use std::collections::VecDeque;
use std::sync::Arc;

use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;
use parquet::file::reader::SerializedPageReader;
use store_api::storage::RegionId;

/// A reader that reads from cached pages.
use crate::cache::{CacheManagerRef, PageKey, PageValue};
use crate::sst::file::FileId;
use crate::sst::parquet::row_group::ColumnChunkData;

/// A reader that reads pages from the cache.
pub(crate) struct CachedPageReader {
/// Cached pages.
pages: VecDeque<Page>,
/// Page cache.
cache: CacheManagerRef,
/// Reader to fall back to. `None` indicates the reader is exhausted.
reader: Option<SerializedPageReader<ColumnChunkData>>,
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
/// Current page index.
current_page_idx: usize,
}

impl CachedPageReader {
/// Returns a new reader from existing pages.
pub(crate) fn new(pages: &[Page]) -> Self {
/// Returns a new reader that caches pages from `reader` in `cache`.
pub(crate) fn new(
cache: CacheManagerRef,
reader: SerializedPageReader<ColumnChunkData>,
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> Self {
Self {
pages: pages.iter().cloned().collect(),
cache,
reader: Some(reader),
region_id,
file_id,
row_group_idx,
column_idx,
current_page_idx: 0,
}
}
}
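
The diff does not show where this constructor is called from; a hypothetical sketch (the helper name and the `Box<dyn PageReader>` return type are assumptions, not part of this change) of how a row group reader might pick between the cached reader and the plain `SerializedPageReader` depending on whether a cache is configured:

fn column_page_reader(
    cache: Option<CacheManagerRef>,
    inner: SerializedPageReader<ColumnChunkData>,
    region_id: RegionId,
    file_id: FileId,
    row_group_idx: usize,
    column_idx: usize,
) -> Box<dyn PageReader> {
    match cache {
        // With a cache, wrap the inner reader so decoded pages can be reused.
        Some(cache) => Box::new(CachedPageReader::new(
            cache, inner, region_id, file_id, row_group_idx, column_idx,
        )),
        // Without a cache, read pages directly from the serialized reader.
        None => Box::new(inner),
    }
}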

impl PageReader for CachedPageReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.pop_front())
let Some(reader) = self.reader.as_mut() else {
// The reader is exhausted.
return Ok(None);
};

// Tries to get it from the cache first.
let key = PageKey::Uncompressed {
region_id: self.region_id,
file_id: self.file_id,
row_group_idx: self.row_group_idx,
column_idx: self.column_idx,
page_idx: self.current_page_idx,
};
if let Some(page) = self.cache.get_pages(&key) {
// Cache hit.
// Bumps the page index.
self.current_page_idx += 1;
// The reader skips this page.
reader.skip_next_page()?;
debug_assert!(page.uncompressed.is_some());
return Ok(page.uncompressed.clone());
}

// Cache miss, load the page from the reader.
let Some(page) = reader.get_next_page()? else {
// The reader is exhausted.
self.reader = None;
return Ok(None);
};
// Puts the page into the cache.
self.cache
.put_pages(key, Arc::new(PageValue::new_uncompressed(page.clone())));
// Bumps the page index.
self.current_page_idx += 1;

Ok(Some(page))
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
Ok(self.pages.front().map(page_to_page_meta))
// The reader is exhausted.
let Some(reader) = self.reader.as_mut() else {
return Ok(None);
};
// It only decodes the page header so we don't query the cache.
reader.peek_next_page()
}

fn skip_next_page(&mut self) -> Result<()> {
// The reader is exhausted.
let Some(reader) = self.reader.as_mut() else {
return Ok(());
};
// When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops
// the dictionary page. So it always returns the dictionary page as the first page. See:
// https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
// But the `GenericColumnReader` will read the dictionary page before skipping records, so it won't skip the dictionary page.
// So we don't need to handle the dictionary page specifically in this method.
// https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
self.pages.pop_front();
Ok(())
self.current_page_idx += 1;
reader.skip_next_page()
}
}
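
Taken together, the first scan of a row group fills the per-page cache as a side effect of iteration, and a later reader built with the same `(region_id, file_id, row_group_idx, column_idx)` serves pages from the cache while only skipping them on its inner reader. Illustration only, assuming `reader` is a `CachedPageReader` built as above:

for page in reader {
    let page = page?; // `parquet::errors::Result<Page>` via the `Iterator` impl below.
    // Decode values from `page`; on a repeated scan this page came from the
    // cache and the inner `SerializedPageReader` merely skipped it.
}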

@@ -62,31 +133,3 @@ impl Iterator for CachedPageReader {
self.get_next_page().transpose()
}
}

/// Get [PageMetadata] from `page`.
///
/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481)
/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301).
fn page_to_page_meta(page: &Page) -> PageMetadata {
match page {
Page::DataPage { num_values, .. } => PageMetadata {
num_rows: None,
num_levels: Some(*num_values as usize),
is_dict: false,
},
Page::DataPageV2 {
num_values,
num_rows,
..
} => PageMetadata {
num_rows: Some(*num_rows as usize),
num_levels: Some(*num_values as usize),
is_dict: false,
},
Page::DictionaryPage { .. } => PageMetadata {
num_rows: None,
num_levels: None,
is_dict: true,
},
}
}