From a9852c65251b0517e342173d593fc20823c7ad23 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 11:06:20 +0800 Subject: [PATCH 01/14] feat: add page cache --- src/mito2/src/cache.rs | 113 +++++++++++++++++++++++++++++-- src/mito2/src/config.rs | 5 ++ src/mito2/src/read/projection.rs | 2 +- src/mito2/src/worker.rs | 2 + 4 files changed, 116 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index dbdbdc72faa8..764eb4c3ccea 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use datatypes::value::Value; use datatypes::vectors::VectorRef; use moka::sync::Cache; +use parquet::column::page::Page; use parquet::file::metadata::ParquetMetaData; use store_api::storage::RegionId; @@ -36,13 +37,19 @@ pub struct CacheManager { sst_meta_cache: Option, /// Cache for vectors. vector_cache: Option, + /// Cache for SST pages. + page_cache: Option, } pub type CacheManagerRef = Arc; impl CacheManager { /// Creates a new manager with specific cache size in bytes. - pub fn new(sst_meta_cache_size: u64, vector_cache_size: u64) -> CacheManager { + pub fn new( + sst_meta_cache_size: u64, + vector_cache_size: u64, + page_cache_size: u64, + ) -> CacheManager { let sst_meta_cache = if sst_meta_cache_size == 0 { None } else { @@ -67,10 +74,22 @@ impl CacheManager { .build(); Some(cache) }; + let page_cache = if page_cache_size == 0 { + None + } else { + let cache = Cache::builder() + .max_capacity(page_cache_size) + .weigher(|k: &PageKey, v: &Arc| { + (k.estimated_size() + v.estimated_size()) as u32 + }) + .build(); + Some(cache) + }; CacheManager { sst_meta_cache, vector_cache, + page_cache, } } @@ -117,6 +136,20 @@ impl CacheManager { cache.insert(key, vector); } } + + /// Gets pages for the row group. 
+ pub fn get_pages(&self, page_key: &PageKey) -> Option> { + self.page_cache + .as_ref() + .and_then(|page_cache| page_cache.get(page_key)) + } + + /// Puts pages of the row group into the cache. + pub fn put_pages(&self, page_key: PageKey, pages: Arc) { + if let Some(cache) = &self.page_cache { + cache.insert(page_key, pages); + } + } } /// Cache key (region id, file id) for SST meta. @@ -126,7 +159,46 @@ struct SstMetaKey(RegionId, FileId); impl SstMetaKey { /// Returns memory used by the key (estimated). fn estimated_size(&self) -> usize { - mem::size_of::() + mem::size_of::() + } +} + +/// Cache key for pages of a SST row group. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PageKey { + /// Region id of the SST file to cache. + pub region_id: RegionId, + /// Id of the SST file to cache. + pub file_id: FileId, + /// Index of the row group. + pub row_group_idx: usize, + /// Index of the column in the row group. + pub column_idx: usize, +} + +impl PageKey { + /// Returns memory used by the key (estimated). + fn estimated_size(&self) -> usize { + mem::size_of::() + } +} + +/// Cached row group pages for a column. +pub struct PageValue { + /// All pages of the column in the row group. + pages: Vec, +} + +impl PageValue { + /// Creates a new page value. + pub fn new(pages: Vec) -> PageValue { + PageValue { pages } + } + + /// Returns memory used by the value (estimated). + fn estimated_size(&self) -> usize { + // We only consider heap size of all pages. + self.pages.iter().map(|page| page.buffer().len()).sum() } } @@ -136,6 +208,8 @@ type SstMetaCache = Cache>; /// /// e.g. `"hello" => ["hello", "hello", "hello"]` type VectorCache = Cache; +/// Maps (region, file, row group, column) to [PageValue]. 
+type PageCache = Cache>; #[cfg(test)] mod tests { @@ -146,8 +220,10 @@ mod tests { #[test] fn test_disable_cache() { - let cache = CacheManager::new(0, 0); + let cache = CacheManager::new(0, 0, 0); assert!(cache.sst_meta_cache.is_none()); + assert!(cache.vector_cache.is_none()); + assert!(cache.page_cache.is_none()); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); @@ -159,11 +235,21 @@ mod tests { let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); cache.put_repeated_vector(value.clone(), vector.clone()); assert!(cache.get_repeated_vector(&value).is_none()); + + let key = PageKey { + region_id, + file_id, + row_group_idx: 0, + column_idx: 0, + }; + let pages = Arc::new(PageValue::new(Vec::new())); + cache.put_pages(key.clone(), pages); + assert!(cache.get_pages(&key).is_none()); } #[test] fn test_parquet_meta_cache() { - let cache = CacheManager::new(2000, 0); + let cache = CacheManager::new(2000, 0, 0); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); @@ -176,7 +262,7 @@ mod tests { #[test] fn test_repeated_vector_cache() { - let cache = CacheManager::new(0, 4096); + let cache = CacheManager::new(0, 4096, 0); let value = Value::Int64(10); assert!(cache.get_repeated_vector(&value).is_none()); let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); @@ -184,4 +270,21 @@ mod tests { let cached = cache.get_repeated_vector(&value).unwrap(); assert_eq!(vector, cached); } + + #[test] + fn test_page_cache() { + let cache = CacheManager::new(0, 0, 1000); + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + let key = PageKey { + region_id, + file_id, + row_group_idx: 0, + column_idx: 0, + }; + assert!(cache.get_pages(&key).is_none()); + let pages = Arc::new(PageValue::new(Vec::new())); + cache.put_pages(key.clone(), pages); + assert!(cache.get_pages(&key).is_some()); + } } diff --git 
a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 5350c2c8e1a0..18d17648966f 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -65,6 +65,10 @@ pub struct MitoConfig { pub sst_meta_cache_size: ReadableSize, /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. pub vector_cache_size: ReadableSize, + /// Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. + pub page_cache_size: ReadableSize, + + // Other configs: /// Buffer size for SST writing. pub sst_write_buffer_size: ReadableSize, } @@ -83,6 +87,7 @@ impl Default for MitoConfig { global_write_buffer_reject_size: ReadableSize::gb(2), sst_meta_cache_size: ReadableSize::mb(128), vector_cache_size: ReadableSize::mb(512), + page_cache_size: ReadableSize::gb(1), sst_write_buffer_size: ReadableSize::mb(8), } } diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index ba1f462dcd4d..c5e6feefcbe1 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -342,7 +342,7 @@ mod tests { assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!([3, 4], mapper.batch_fields()); - let cache = CacheManager::new(0, 1024); + let cache = CacheManager::new(0, 1024, 0); let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); let expect = "\ diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index bc2463f224fe..a69b79c63f1d 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -123,6 +123,7 @@ impl WorkerGroup { let cache_manager = Arc::new(CacheManager::new( config.sst_meta_cache_size.as_bytes(), config.vector_cache_size.as_bytes(), + config.page_cache_size.as_bytes(), )); let workers = (0..config.num_workers) @@ -221,6 +222,7 @@ impl WorkerGroup { let cache_manager = Arc::new(CacheManager::new( config.sst_meta_cache_size.as_bytes(), config.vector_cache_size.as_bytes(), + 
config.page_cache_size.as_bytes(), )); let workers = (0..config.num_workers) From 623aef6fc2fe98965c0e4e7f02ac999be171443e Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 11:15:55 +0800 Subject: [PATCH 02/14] docs: update mito config toml --- config/datanode.example.toml | 3 ++- config/standalone.example.toml | 30 ++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 7793e148ac4c..47107f4be8b6 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -105,10 +105,11 @@ global_write_buffer_reject_size = "2GB" sst_meta_cache_size = "128MB" # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. vector_cache_size = "512MB" +# Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. +page_cache_size = "1GB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" - # Log options # [logging] # Specify logs directory. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index e4bd41a616c4..c21311b85ce4 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -152,6 +152,36 @@ auto_flush_interval = "1h" # Global write buffer size for all regions. global_write_buffer_size = "1GB" +# Mito engine options +[[region_engine]] +[region_engine.mito] +# Number of region workers +num_workers = 8 +# Request channel size of each worker +worker_channel_size = 128 +# Max batch size for a worker to handle requests +worker_request_batch_size = 64 +# Number of meta action updated to trigger a new checkpoint for the manifest +manifest_checkpoint_distance = 10 +# Manifest compression type +manifest_compress_type = "Uncompressed" +# Max number of running background jobs +max_background_jobs = 4 +# Interval to auto flush a region if it has not flushed yet. 
+auto_flush_interval = "1h" +# Global write buffer size for all regions. +global_write_buffer_size = "1GB" +# Global write buffer size threshold to reject write requests (default 2G). +global_write_buffer_reject_size = "2GB" +# Cache size for SST metadata (default 128MB). Setting it to 0 to disable the cache. +sst_meta_cache_size = "128MB" +# Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. +vector_cache_size = "512MB" +# Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. +page_cache_size = "1GB" +# Buffer size for SST writing. +sst_write_buffer_size = "8MB" + # Log options # [logging] # Specify logs directory. From a586f76cfe8d27907617d278d4e55714e67371fe Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 20:33:05 +0800 Subject: [PATCH 03/14] feat: impl CachedPageReader --- src/mito2/src/cache.rs | 2 +- src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/sst/parquet/page_reader.rs | 139 +++++++++++++++++++++++ 3 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 src/mito2/src/sst/parquet/page_reader.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 764eb4c3ccea..9d838d4f1caf 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -186,7 +186,7 @@ impl PageKey { /// Cached row group pages for a column. pub struct PageValue { /// All pages of the column in the row group. - pages: Vec, + pub pages: Vec, } impl PageValue { diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 481f98f1af12..73604638c604 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -15,6 +15,7 @@ //! SST in parquet format. 
mod format; +mod page_reader; pub mod reader; pub mod row_group; mod stats; diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs new file mode 100644 index 000000000000..643dc350b88d --- /dev/null +++ b/src/mito2/src/sst/parquet/page_reader.rs @@ -0,0 +1,139 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Parquet page reader. + +use std::collections::VecDeque; +use std::sync::Arc; + +use parquet::column::page::{Page, PageMetadata, PageReader}; +use parquet::errors::Result; + +use crate::cache::{CacheManagerRef, PageKey, PageValue}; + +/// A reader that can cache pages from the underlying reader. +pub(crate) enum CachedPageReader { + /// Reads from the underlying reader. + Reader { + /// Inner page reader to get pages. + page_reader: T, + }, + /// Reads from cached pages. + Pages { + /// Cached pages. + pages: VecDeque, + }, +} + +impl CachedPageReader { + /// Returns a new reader that caches all pages for the `page_reader` if the + /// `cache_manager` is `Some`. + pub(crate) fn new( + page_reader: T, + cache_manager: Option, + page_key: PageKey, + ) -> Result { + let Some(cache) = &cache_manager else { + return Ok(CachedPageReader::Reader { page_reader }); + }; + + if let Some(page_value) = cache.get_pages(&page_key) { + // We already cached all pages. + return Ok(CachedPageReader::from_pages(&page_value.pages)); + } + + // Cache miss. We load pages from the reader. 
+ let pages = page_reader.collect::>>()?; + let page_value = Arc::new(PageValue::new(pages)); + // Puts into the cache. + cache.put_pages(page_key, page_value.clone()); + + Ok(CachedPageReader::from_pages(&page_value.pages)) + } + + /// Returns a new reader from existing pages. + fn from_pages(pages: &[Page]) -> Self { + Self::Pages { + pages: pages.iter().cloned().collect(), + } + } +} + +impl PageReader for CachedPageReader { + fn get_next_page(&mut self) -> Result> { + match self { + CachedPageReader::Reader { page_reader } => page_reader.get_next_page(), + CachedPageReader::Pages { pages } => Ok(pages.pop_front()), + } + } + + fn peek_next_page(&mut self) -> Result> { + match self { + CachedPageReader::Reader { page_reader } => page_reader.peek_next_page(), + CachedPageReader::Pages { pages } => Ok(pages.front().map(page_to_page_meta)), + } + } + + fn skip_next_page(&mut self) -> Result<()> { + match self { + CachedPageReader::Reader { page_reader } => page_reader.skip_next_page(), + CachedPageReader::Pages { pages } => { + // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops + // the dictionary page, which is always the first page. See: + // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 + // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the + // dictionary page is read first. + // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 + pages.pop_front(); + Ok(()) + } + } + } +} + +impl Iterator for CachedPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +/// Get [PageMetadata] from `page`. 
+/// +/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481) +/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301). +fn page_to_page_meta(page: &Page) -> PageMetadata { + match page { + Page::DataPage { num_values, .. } => PageMetadata { + num_rows: None, + num_levels: Some(*num_values as usize), + is_dict: false, + }, + Page::DataPageV2 { + num_values, + num_rows, + .. + } => PageMetadata { + num_rows: Some(*num_rows as usize), + num_levels: Some(*num_values as usize), + is_dict: false, + }, + Page::DictionaryPage { .. } => PageMetadata { + num_rows: None, + num_levels: None, + is_dict: true, + }, + } +} From a9e606aa50d61f7029b1d9a8962da5fe9a777ec7 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 21:23:59 +0800 Subject: [PATCH 04/14] feat: use cache reader to read row group --- src/mito2/src/sst/parquet/reader.rs | 14 +++++++++- src/mito2/src/sst/parquet/row_group.rs | 37 ++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index a56e140ec033..45f58a4c9616 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -183,6 +183,7 @@ impl ParquetReaderBuilder { file_reader: reader, projection: projection_mask, field_levels, + cache_manager: self.cache_manager.clone(), }; let metrics = Metrics { @@ -292,6 +293,8 @@ struct RowGroupReaderBuilder { projection: ProjectionMask, /// Field levels to read. field_levels: FieldLevels, + /// Cache. + cache_manager: Option, } impl RowGroupReaderBuilder { @@ -302,7 +305,13 @@ impl RowGroupReaderBuilder { /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. 
async fn build(&mut self, row_group_idx: usize) -> Result { - let mut row_group = InMemoryRowGroup::create(&self.parquet_meta, row_group_idx); + let mut row_group = InMemoryRowGroup::create( + self.file_handle.region_id(), + self.file_handle.file_id(), + &self.parquet_meta, + row_group_idx, + self.cache_manager.clone(), + ); // Fetches data into memory. row_group .fetch(&mut self.file_reader, &self.projection, None) @@ -334,6 +343,9 @@ pub struct ParquetReader { /// Not `None` if [ParquetReader::stream] is not `None`. read_format: ReadFormat, /// Builder to build row group readers. + /// + /// The builder contains the file handle so don't drop the builder while using + /// the [ParquetReader]. reader_builder: RowGroupReaderBuilder, /// Reader of current row group. current_reader: Option, diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 7be77d692fc5..0879c293fce0 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -26,6 +26,11 @@ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::reader::{ChunkReader, Length}; use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; +use store_api::storage::RegionId; + +use crate::cache::{CacheManagerRef, PageKey}; +use crate::sst::file::FileId; +use crate::sst::parquet::page_reader::CachedPageReader; /// An in-memory collection of column chunks pub struct InMemoryRowGroup<'a> { @@ -33,6 +38,10 @@ pub struct InMemoryRowGroup<'a> { page_locations: Option<&'a [Vec]>, column_chunks: Vec>>, row_count: usize, + region_id: RegionId, + file_id: FileId, + row_group_idx: usize, + cache_manager: Option, } impl<'a> InMemoryRowGroup<'a> { @@ -40,8 +49,17 @@ impl<'a> InMemoryRowGroup<'a> { /// /// # Panics /// Panics if the `row_group_idx` is invalid. 
- pub fn create(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self { + pub fn create( + region_id: RegionId, + file_id: FileId, + parquet_meta: &'a ParquetMetaData, + row_group_idx: usize, + cache_manager: Option, + ) -> Self { let metadata = parquet_meta.row_group(row_group_idx); + // `page_locations` is always `None` if we don't set + // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index) + // to `true`. let page_locations = parquet_meta .offset_index() .map(|x| x[row_group_idx].as_slice()); @@ -51,6 +69,10 @@ impl<'a> InMemoryRowGroup<'a> { row_count: metadata.num_rows() as usize, column_chunks: vec![None; metadata.columns().len()], page_locations, + region_id, + file_id, + row_group_idx, + cache_manager, } } @@ -158,11 +180,22 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> { ))), Some(data) => { let page_locations = self.page_locations.map(|index| index[i].clone()); - let page_reader: Box = Box::new(SerializedPageReader::new( + let page_reader = SerializedPageReader::new( data.clone(), self.metadata.column(i), self.row_count, page_locations, + )?; + let page_key = PageKey { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: i, + }; + let page_reader: Box = Box::new(CachedPageReader::new( + page_reader, + self.cache_manager.clone(), + page_key, )?); Ok(Box::new(ColumnChunkIterator { From 60726dfb84675d13ca653ec4a4b3513dd1b4c86e Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 23:18:09 +0800 Subject: [PATCH 05/14] feat: do not fetch data if we have pages in cache --- src/mito2/src/sst/parquet/page_reader.rs | 85 ++++------------ src/mito2/src/sst/parquet/row_group.rs | 120 +++++++++++++++++------ 2 files changed, 107 insertions(+), 98 deletions(-) diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs index 643dc350b88d..42950d841255 100644 --- 
a/src/mito2/src/sst/parquet/page_reader.rs +++ b/src/mito2/src/sst/parquet/page_reader.rs @@ -15,94 +15,47 @@ //! Parquet page reader. use std::collections::VecDeque; -use std::sync::Arc; use parquet::column::page::{Page, PageMetadata, PageReader}; use parquet::errors::Result; -use crate::cache::{CacheManagerRef, PageKey, PageValue}; - -/// A reader that can cache pages from the underlying reader. -pub(crate) enum CachedPageReader { - /// Reads from the underlying reader. - Reader { - /// Inner page reader to get pages. - page_reader: T, - }, - /// Reads from cached pages. - Pages { - /// Cached pages. - pages: VecDeque, - }, +/// A reader that reads from cached pages. +pub(crate) struct CachedPageReader { + /// Cached pages. + pages: VecDeque, } -impl CachedPageReader { - /// Returns a new reader that caches all pages for the `page_reader` if the - /// `cache_manager` is `Some`. - pub(crate) fn new( - page_reader: T, - cache_manager: Option, - page_key: PageKey, - ) -> Result { - let Some(cache) = &cache_manager else { - return Ok(CachedPageReader::Reader { page_reader }); - }; - - if let Some(page_value) = cache.get_pages(&page_key) { - // We already cached all pages. - return Ok(CachedPageReader::from_pages(&page_value.pages)); - } - - // Cache miss. We load pages from the reader. - let pages = page_reader.collect::>>()?; - let page_value = Arc::new(PageValue::new(pages)); - // Puts into the cache. - cache.put_pages(page_key, page_value.clone()); - - Ok(CachedPageReader::from_pages(&page_value.pages)) - } - +impl CachedPageReader { /// Returns a new reader from existing pages. 
- fn from_pages(pages: &[Page]) -> Self { - Self::Pages { + pub(crate) fn new(pages: &[Page]) -> Self { + Self { pages: pages.iter().cloned().collect(), } } } -impl PageReader for CachedPageReader { +impl PageReader for CachedPageReader { fn get_next_page(&mut self) -> Result> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.get_next_page(), - CachedPageReader::Pages { pages } => Ok(pages.pop_front()), - } + Ok(self.pages.pop_front()) } fn peek_next_page(&mut self) -> Result> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.peek_next_page(), - CachedPageReader::Pages { pages } => Ok(pages.front().map(page_to_page_meta)), - } + Ok(self.pages.front().map(page_to_page_meta)) } fn skip_next_page(&mut self) -> Result<()> { - match self { - CachedPageReader::Reader { page_reader } => page_reader.skip_next_page(), - CachedPageReader::Pages { pages } => { - // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops - // the dictionary page, which is always the first page. See: - // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 - // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the - // dictionary page is read first. - // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 - pages.pop_front(); - Ok(()) - } - } + // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops + // the dictionary page, which is always the first page. See: + // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 + // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the + // dictionary page is read first. 
+ // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 + self.pages.pop_front(); + Ok(()) } } -impl Iterator for CachedPageReader { +impl Iterator for CachedPageReader { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 0879c293fce0..ef2225f2a3a1 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -28,7 +28,7 @@ use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; use store_api::storage::RegionId; -use crate::cache::{CacheManagerRef, PageKey}; +use crate::cache::{CacheManagerRef, PageKey, PageValue}; use crate::sst::file::FileId; use crate::sst::parquet::page_reader::CachedPageReader; @@ -42,6 +42,10 @@ pub struct InMemoryRowGroup<'a> { file_id: FileId, row_group_idx: usize, cache_manager: Option, + /// Cached pages for each column. + /// + /// `column_cached_pages.len()` equals to `column_chunks.len()`. + column_cached_pages: Vec>>, } impl<'a> InMemoryRowGroup<'a> { @@ -73,6 +77,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id, row_group_idx, cache_manager, + column_cached_pages: vec![None; metadata.columns().len()], } } @@ -136,12 +141,19 @@ impl<'a> InMemoryRowGroup<'a> { } } } else { + // Now we only use cache in dense chunk data. + self.fetch_pages_from_cache(projection); + let fetch_ranges = self .column_chunks .iter() + .zip(&self.column_cached_pages) .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) - .map(|(idx, _chunk)| { + // Don't need to fetch column data if we already cache the column's pages. 
+ .filter(|&(idx, (chunk, cached_pages))| { + chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none() + }) + .map(|(idx, (_chunk, _cached_pages))| { let column = self.metadata.column(idx); let (start, length) = column.byte_range(); start as usize..(start + length) as usize @@ -150,8 +162,13 @@ impl<'a> InMemoryRowGroup<'a> { let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { + for (idx, (chunk, cached_pages)) in self + .column_chunks + .iter_mut() + .zip(&self.column_cached_pages) + .enumerate() + { + if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() { continue; } @@ -166,43 +183,82 @@ impl<'a> InMemoryRowGroup<'a> { Ok(()) } -} -impl<'a> RowGroups for InMemoryRowGroup<'a> { - fn num_rows(&self) -> usize { - self.row_count + /// Fetches pages for columns if cache is enabled. + fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) { + self.column_chunks + .iter() + .enumerate() + .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .for_each(|(idx, _chunk)| { + if let Some(cache) = &self.cache_manager { + let page_key = PageKey { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: idx, + }; + self.column_cached_pages[idx] = cache.get_pages(&page_key); + } + }); } - fn column_chunks(&self, i: usize) -> Result> { - match &self.column_chunks[i] { - None => Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))), + /// Creates a page reader to read column at `i`. + fn column_page_reader(&self, i: usize) -> Result> { + if let Some(cached_pages) = &self.column_cached_pages[i] { + // Already in cache. + return Ok(Box::new(CachedPageReader::new(&cached_pages.pages))); + } + + // Cache miss. 
+ let page_reader = match &self.column_chunks[i] { + None => { + return Err(ParquetError::General(format!( + "Invalid column index {i}, column was not fetched" + ))) + } Some(data) => { let page_locations = self.page_locations.map(|index| index[i].clone()); - let page_reader = SerializedPageReader::new( + SerializedPageReader::new( data.clone(), self.metadata.column(i), self.row_count, page_locations, - )?; - let page_key = PageKey { - region_id: self.region_id, - file_id: self.file_id, - row_group_idx: self.row_group_idx, - column_idx: i, - }; - let page_reader: Box = Box::new(CachedPageReader::new( - page_reader, - self.cache_manager.clone(), - page_key, - )?); - - Ok(Box::new(ColumnChunkIterator { - reader: Some(Ok(page_reader)), - })) + )? } - } + }; + + let Some(cache) = &self.cache_manager else { + // Cache is disabled. + return Ok(Box::new(page_reader)); + }; + + // We collect all pages and put them into the cache. + let pages = page_reader.collect::>>()?; + let page_value = Arc::new(PageValue::new(pages)); + let page_key = PageKey { + region_id: self.region_id, + file_id: self.file_id, + row_group_idx: self.row_group_idx, + column_idx: i, + }; + cache.put_pages(page_key, page_value.clone()); + + Ok(Box::new(CachedPageReader::new(&page_value.pages))) + } +} + +impl<'a> RowGroups for InMemoryRowGroup<'a> { + fn num_rows(&self) -> usize { + self.row_count + } + + fn column_chunks(&self, i: usize) -> Result> { + let page_reader = self.column_page_reader(i)?; + + Ok(Box::new(ColumnChunkIterator { + reader: Some(Ok(page_reader)), + })) } } From 830d264f9e284431e3345f27513ccfd0241ad1c6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 1 Nov 2023 23:34:02 +0800 Subject: [PATCH 06/14] chore: return if nothing to fetch --- src/mito2/src/sst/parquet/row_group.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index ef2225f2a3a1..827db8999ae8 100644 --- 
a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -144,7 +144,7 @@ impl<'a> InMemoryRowGroup<'a> { // Now we only use cache in dense chunk data. self.fetch_pages_from_cache(projection); - let fetch_ranges = self + let fetch_ranges: Vec<_> = self .column_chunks .iter() .zip(&self.column_cached_pages) @@ -160,6 +160,11 @@ impl<'a> InMemoryRowGroup<'a> { }) .collect(); + if fetch_ranges.is_empty() { + // Nothing to fetch. + return Ok(()); + } + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); for (idx, (chunk, cached_pages)) in self From 4efe77f020b205d3088e49196ce0b886afd9e886 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 2 Nov 2023 10:55:48 +0800 Subject: [PATCH 07/14] feat: enlarge page cache size --- config/datanode.example.toml | 4 ++-- config/standalone.example.toml | 4 ++-- src/mito2/src/config.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 47107f4be8b6..2a1c97b16f08 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -105,8 +105,8 @@ global_write_buffer_reject_size = "2GB" sst_meta_cache_size = "128MB" # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. vector_cache_size = "512MB" -# Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. -page_cache_size = "1GB" +# Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. +page_cache_size = "2GB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c21311b85ce4..e03014ca66db 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -177,8 +177,8 @@ global_write_buffer_reject_size = "2GB" sst_meta_cache_size = "128MB" # Cache size for vectors and arrow arrays (default 512MB). 
Setting it to 0 to disable the cache. vector_cache_size = "512MB" -# Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. -page_cache_size = "1GB" +# Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. +page_cache_size = "2GB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 18d17648966f..f58266fd0db1 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -65,7 +65,7 @@ pub struct MitoConfig { pub sst_meta_cache_size: ReadableSize, /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. pub vector_cache_size: ReadableSize, - /// Cache size for pages of SST row groups (default 1GB). Setting it to 0 to disable the cache. + /// Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. pub page_cache_size: ReadableSize, // Other configs: @@ -87,7 +87,7 @@ impl Default for MitoConfig { global_write_buffer_reject_size: ReadableSize::gb(2), sst_meta_cache_size: ReadableSize::mb(128), vector_cache_size: ReadableSize::mb(512), - page_cache_size: ReadableSize::gb(1), + page_cache_size: ReadableSize::gb(2), sst_write_buffer_size: ReadableSize::mb(8), } } From ff075892af566d8a339de157b60e53912312831e Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 2 Nov 2023 12:34:55 +0800 Subject: [PATCH 08/14] test: test write read parquet --- src/mito2/src/read.rs | 2 +- src/mito2/src/sst/parquet.rs | 77 +++++++++++++++++++ src/mito2/src/sst/parquet/writer.rs | 2 - src/mito2/src/test_util.rs | 26 +++++-- src/mito2/src/test_util/sst_util.rs | 112 ++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 src/mito2/src/test_util/sst_util.rs diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index ee2305fdd916..7c3fda279026 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -686,7 
+686,7 @@ mod tests { op_types: &[OpType], field: &[u64], ) -> Batch { - new_batch_builder(b"test", timestamps, sequences, op_types, field) + new_batch_builder(b"test", timestamps, sequences, op_types, 1, field) .build() .unwrap() } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 73604638c604..ad4caa7c4d38 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -60,3 +60,80 @@ pub struct SstInfo { /// Number of rows. pub num_rows: usize, } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::OpType; + use common_time::Timestamp; + + use super::*; + use crate::read::Batch; + use crate::sst::parquet::reader::ParquetReaderBuilder; + use crate::sst::parquet::writer::ParquetWriter; + use crate::test_util::sst_util::{ + new_primary_key, new_source, sst_file_handle, sst_region_metadata, + }; + use crate::test_util::{check_reader_result, new_batch_builder, TestEnv}; + + const FILE_DIR: &str = "/"; + + fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { + assert!(end > start); + let pk = new_primary_key(tags); + let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); + let sequences = vec![1000; end - start]; + let op_types = vec![OpType::Put; end - start]; + let field: Vec<_> = (start..end).map(|v| v as u64).collect(); + new_batch_builder(&pk, ×tamps, &sequences, &op_types, 2, &field) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_write_read() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. 
+ let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let info = writer.write_all(&write_opts).await.unwrap().unwrap(); + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(199) + ), + info.time_range + ); + + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 50), + new_batch_by_range(&["a", "d"], 50, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 150), + new_batch_by_range(&["b", "h"], 150, 200), + ], + ) + .await; + } +} diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 14d6a9da3e7a..b236ce1d3c4f 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -155,5 +155,3 @@ impl SourceStats { } } } - -// TODO(yingwen): Port tests. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 69bd22d26e1b..e602643ff5c5 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -17,6 +17,7 @@ pub mod memtable_util; pub mod meta_util; pub mod scheduler_util; +pub mod sst_util; pub mod version_util; use std::collections::HashMap; @@ -195,6 +196,12 @@ impl TestEnv { ) } + /// Only initializes the object store manager, returns the default object store. + pub fn init_object_store_manager(&mut self) -> ObjectStore { + self.object_store_manager = Some(Arc::new(self.create_object_store_manager())); + self.get_object_store().unwrap() + } + /// Creates a new [WorkerGroup] with specific config under this env. 
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; @@ -207,14 +214,19 @@ impl TestEnv { ) -> (RaftEngineLogStore, ObjectStoreManager) { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); - let data_path = data_home.join("data").as_path().display().to_string(); - let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await; + + let object_store_manager = self.create_object_store_manager(); + (log_store, object_store_manager) + } + + fn create_object_store_manager(&self) -> ObjectStoreManager { + let data_home = self.data_home.path(); + let data_path = data_home.join("data").as_path().display().to_string(); let mut builder = Fs::default(); builder.root(&data_path); let object_store = ObjectStore::new(builder).unwrap().finish(); - let object_store_manager = ObjectStoreManager::new("default", object_store); - (log_store, object_store_manager) + ObjectStoreManager::new("default", object_store) } /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata` @@ -414,6 +426,7 @@ pub fn new_batch_builder( timestamps: &[i64], sequences: &[u64], op_types: &[OpType], + field_column_id: ColumnId, field: &[u64], ) -> BatchBuilder { let mut builder = BatchBuilder::new(primary_key.to_vec()); @@ -431,13 +444,14 @@ pub fn new_batch_builder( ))) .unwrap() .push_field_array( - 1, + field_column_id, Arc::new(UInt64Array::from_iter_values(field.iter().copied())), ) .unwrap(); builder } +/// Returns a new [Batch] whose field has column id 1. 
pub fn new_batch( primary_key: &[u8], timestamps: &[i64], @@ -445,7 +459,7 @@ pub fn new_batch( op_types: &[OpType], field: &[u64], ) -> Batch { - new_batch_builder(primary_key, timestamps, sequences, op_types, field) + new_batch_builder(primary_key, timestamps, sequences, op_types, 1, field) .build() .unwrap() } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs new file mode 100644 index 000000000000..3638d119faa1 --- /dev/null +++ b/src/mito2/src/test_util/sst_util.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing SSTs. + +use api::v1::SemanticType; +use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use datatypes::value::ValueRef; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::storage::RegionId; + +use crate::read::{Batch, Source}; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::sst::file::{FileHandle, FileId, FileMeta}; +use crate::test_util::{new_noop_file_purger, VecBatchReader}; + +/// Test region id. +const REGION_ID: RegionId = RegionId::new(0, 0); + +/// Creates a new region metadata for testing SSTs. 
+/// +/// Schema: tag_0, tag_1, field_0, ts +pub fn sst_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(REGION_ID); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![0, 1]); + builder.build().unwrap() +} + +/// Encodes a primary key for specific tags. +pub fn new_primary_key(tags: &[&str]) -> Vec { + let fields = (0..tags.len()) + .map(|_| SortField::new(ConcreteDataType::string_datatype())) + .collect(); + let converter = McmpRowCodec::new(fields); + converter + .encode(tags.iter().map(|tag| ValueRef::String(tag))) + .unwrap() +} + +/// Creates a [Source] from `batches`. +pub fn new_source(batches: &[Batch]) -> Source { + let reader = VecBatchReader::new(batches); + Source::Reader(Box::new(reader)) +} + +/// Creates a new [FileHandle] for a SST. 
+pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + let file_purger = new_noop_file_purger(); + FileHandle::new( + FileMeta { + region_id: REGION_ID, + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(start_ms), + Timestamp::new_millisecond(end_ms), + ), + level: 0, + file_size: 0, + }, + file_purger, + ) +} From 7da8dbcc46adc4b95cd6ae10eb159615af4c52f6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 2 Nov 2023 12:40:56 +0800 Subject: [PATCH 09/14] test: test cache --- src/mito2/src/sst/parquet.rs | 63 ++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index ad4caa7c4d38..af3f8479f39c 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -69,6 +69,7 @@ mod tests { use common_time::Timestamp; use super::*; + use crate::cache::{CacheManager, PageKey}; use crate::read::Batch; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -109,8 +110,7 @@ mod tests { ..Default::default() }; - let mut writer = - ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let mut writer = ParquetWriter::new(file_path, metadata, source, object_store.clone()); let info = writer.write_all(&write_opts).await.unwrap().unwrap(); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); @@ -136,4 +136,63 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_read_with_cache() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. 
+ let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + // Prepare data. + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + writer.write_all(&write_opts).await.unwrap().unwrap(); + + let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024))); + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) + .cache(cache.clone()); + for _ in 0..3 { + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 50), + new_batch_by_range(&["a", "d"], 50, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 150), + new_batch_by_range(&["b", "h"], 150, 200), + ], + ) + .await; + } + + // Cache 4 row groups. + for i in 0..4 { + let page_key = PageKey { + region_id: metadata.region_id, + file_id: handle.file_id(), + row_group_idx: i, + column_idx: 0, + }; + assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some()); + } + let page_key = PageKey { + region_id: metadata.region_id, + file_id: handle.file_id(), + row_group_idx: 5, + column_idx: 0, + }; + assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + } } From 8a87f627adb55f446f664446d8c9d3718803776e Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 9 Nov 2023 12:24:09 +0800 Subject: [PATCH 10/14] docs: update comments --- src/mito2/src/sst/parquet/page_reader.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs index 42950d841255..1416da448b5a 100644 --- a/src/mito2/src/sst/parquet/page_reader.rs +++ b/src/mito2/src/sst/parquet/page_reader.rs @@ -45,10 +45,10 @@ impl PageReader for CachedPageReader { fn skip_next_page(&mut self) -> Result<()> { // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops - // the dictionary page, which is 
always the first page. See: + // the dictionary page. So it always returns the dictionary page as the first page. See: // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770 - // But the `GenericColumnReader` will read the dictionary page before skipping records so it ensures the - // dictionary page is read first. + // But the `GenericColumnReader` will read the dictionary page before skipping records so it won't skip the dictionary page. + // So we don't need to handle the dictionary page specifically in this method. // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331 self.pages.pop_front(); Ok(()) From 67cb8233afd1fefa1f4df1cf9b37a2eb26b4e606 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 9 Nov 2023 14:09:59 +0800 Subject: [PATCH 11/14] test: fix config api test --- tests-integration/tests/http.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests-integration/tests/http.rs index 6a54707a1ad3..f5479418fe56 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -743,6 +743,7 @@ global_write_buffer_size = "1GiB" global_write_buffer_reject_size = "2GiB" sst_meta_cache_size = "128MiB" vector_cache_size = "512MiB" +page_cache_size = "2GiB" sst_write_buffer_size = "8MiB" [[datanode.region_engine]] From 30610d01073970df6fd944540dd403205ff35b87 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 16 Nov 2023 16:34:13 +0800 Subject: [PATCH 12/14] feat: cache metrics --- src/mito2/src/cache.rs | 89 +++++++++++++++++++++++------ src/mito2/src/metrics.rs | 25 +++++++- src/mito2/src/sst/parquet/reader.rs | 2 +- 3 files changed, 95 insertions(+), 21 deletions(-) diff --git a/src/mito2/src/cache.rs index 9d838d4f1caf..529e5d3d4eee 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -29,8 +29,16 @@ use
parquet::file::metadata::ParquetMetaData; use store_api::storage::RegionId; use crate::cache::cache_size::parquet_meta_size; +use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; +// Metrics type key for sst meta. +const SST_META_TYPE: &str = "sst_meta"; +// Metrics type key for vector. +const VECTOR_TYPE: &str = "vector"; +// Metrics type key for pages. +const PAGE_TYPE: &str = "page"; + /// Manages cached data for the engine. pub struct CacheManager { /// Cache for SST metadata. @@ -55,9 +63,12 @@ impl CacheManager { } else { let cache = Cache::builder() .max_capacity(sst_meta_cache_size) - .weigher(|k: &SstMetaKey, v: &Arc| { - // We ignore the size of `Arc`. - (k.estimated_size() + parquet_meta_size(v)) as u32 + .weigher(meta_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = meta_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[SST_META_TYPE]) + .sub(size.into()); }) .build(); Some(cache) @@ -67,9 +78,12 @@ impl CacheManager { } else { let cache = Cache::builder() .max_capacity(vector_cache_size) - .weigher(|_k, v: &VectorRef| { - // We ignore the heap size of `Value`. 
- (mem::size_of::() + v.memory_size()) as u32 + .weigher(vector_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = vector_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[VECTOR_TYPE]) + .sub(size.into()); }) .build(); Some(cache) @@ -79,8 +93,10 @@ impl CacheManager { } else { let cache = Cache::builder() .max_capacity(page_cache_size) - .weigher(|k: &PageKey, v: &Arc| { - (k.estimated_size() + v.estimated_size()) as u32 + .weigher(page_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = page_cache_weight(&k, &v); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); }) .build(); Some(cache) @@ -99,9 +115,10 @@ impl CacheManager { region_id: RegionId, file_id: FileId, ) -> Option> { - self.sst_meta_cache - .as_ref() - .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id))) + self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| { + let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id)); + update_hit_miss(value, SST_META_TYPE) + }) } /// Puts [ParquetMetaData] into the cache. @@ -112,7 +129,11 @@ impl CacheManager { metadata: Arc, ) { if let Some(cache) = &self.sst_meta_cache { - cache.insert(SstMetaKey(region_id, file_id), metadata); + let key = SstMetaKey(region_id, file_id); + CACHE_BYTES + .with_label_values(&[SST_META_TYPE]) + .add(meta_cache_weight(&key, &metadata).into()); + cache.insert(key, metadata); } } @@ -125,33 +146,65 @@ impl CacheManager { /// Gets a vector with repeated value for specific `key`. pub fn get_repeated_vector(&self, key: &Value) -> Option { - self.vector_cache - .as_ref() - .and_then(|vector_cache| vector_cache.get(key)) + self.vector_cache.as_ref().and_then(|vector_cache| { + let value = vector_cache.get(key); + update_hit_miss(value, VECTOR_TYPE) + }) } /// Puts a vector with repeated value into the cache. 
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) { if let Some(cache) = &self.vector_cache { + CACHE_BYTES + .with_label_values(&[VECTOR_TYPE]) + .add(vector_cache_weight(&key, &vector).into()); cache.insert(key, vector); } } /// Gets pages for the row group. pub fn get_pages(&self, page_key: &PageKey) -> Option> { - self.page_cache - .as_ref() - .and_then(|page_cache| page_cache.get(page_key)) + self.page_cache.as_ref().and_then(|page_cache| { + let value = page_cache.get(page_key); + update_hit_miss(value, PAGE_TYPE) + }) } /// Puts pages of the row group into the cache. pub fn put_pages(&self, page_key: PageKey, pages: Arc) { if let Some(cache) = &self.page_cache { + CACHE_BYTES + .with_label_values(&[PAGE_TYPE]) + .add(page_cache_weight(&page_key, &pages).into()); cache.insert(page_key, pages); } } } +fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { + // We ignore the size of `Arc`. + (k.estimated_size() + parquet_meta_size(v)) as u32 +} + +fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 { + // We ignore the heap size of `Value`. + (mem::size_of::() + v.memory_size()) as u32 +} + +fn page_cache_weight(k: &PageKey, v: &Arc) -> u32 { + (k.estimated_size() + v.estimated_size()) as u32 +} + +/// Updates cache hit/miss metrics. +fn update_hit_miss(value: Option, cache_type: &str) -> Option { + if value.is_some() { + CACHE_HIT.with_label_values(&[cache_type]).inc(); + } else { + CACHE_MISS.with_label_values(&[cache_type]).inc(); + } + value +} + /// Cache key (region id, file id) for SST meta. #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct SstMetaKey(RegionId, FileId); diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 7b8aa475da93..d53cbd495dd5 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -37,8 +37,6 @@ lazy_static! { ) .unwrap(); - - // ------ Flush related metrics /// Counter of scheduled flush requests. /// Note that the flush scheduler may merge some flush requests. 
@@ -122,4 +120,27 @@ lazy_static! { pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap(); // ------- End of query metrics. + + // Cache related metrics. + /// Cache hit counter. + pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!( + "mito_cache_hit", + "mito cache hit", + &[TYPE_LABEL] + ) + .unwrap(); + /// Cache miss counter. + pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!( + "mito_cache_miss", + "mito cache miss", + &[TYPE_LABEL] + ) + .unwrap(); + /// Cache size in bytes. + pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!( + "mito_cache_bytes", + "mito cache bytes", + &[TYPE_LABEL] + ) + .unwrap(); } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 45f58a4c9616..49e38ddc4d3d 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -344,7 +344,7 @@ pub struct ParquetReader { read_format: ReadFormat, /// Builder to build row group readers. /// - /// The builder contains the file handle so don't drop the builder while using + /// The builder contains the file handle, so don't drop the builder while using /// the [ParquetReader]. reader_builder: RowGroupReaderBuilder, /// Reader of current row group. 
From 384903f35f8f70f9bcafc6204ba08199091996dd Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 16 Nov 2023 16:53:29 +0800 Subject: [PATCH 13/14] feat: change default page cache size --- config/datanode.example.toml | 4 ++-- config/standalone.example.toml | 4 ++-- src/mito2/src/config.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2a1c97b16f08..d646fb93737b 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -105,8 +105,8 @@ global_write_buffer_reject_size = "2GB" sst_meta_cache_size = "128MB" # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. vector_cache_size = "512MB" -# Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. -page_cache_size = "2GB" +# Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache. +page_cache_size = "512MB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index e03014ca66db..b355896130d8 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -177,8 +177,8 @@ global_write_buffer_reject_size = "2GB" sst_meta_cache_size = "128MB" # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. vector_cache_size = "512MB" -# Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. -page_cache_size = "2GB" +# Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache. +page_cache_size = "512MB" # Buffer size for SST writing. 
sst_write_buffer_size = "8MB" diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index f58266fd0db1..a51d469de3b5 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -65,7 +65,7 @@ pub struct MitoConfig { pub sst_meta_cache_size: ReadableSize, /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. pub vector_cache_size: ReadableSize, - /// Cache size for pages of SST row groups (default 2GB). Setting it to 0 to disable the cache. + /// Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache. pub page_cache_size: ReadableSize, // Other configs: @@ -87,7 +87,7 @@ impl Default for MitoConfig { global_write_buffer_reject_size: ReadableSize::gb(2), sst_meta_cache_size: ReadableSize::mb(128), vector_cache_size: ReadableSize::mb(512), - page_cache_size: ReadableSize::gb(2), + page_cache_size: ReadableSize::mb(512), sst_write_buffer_size: ReadableSize::mb(8), } } From 982b602bc01d45675034d8b4f268b566276a183e Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 16 Nov 2023 17:06:12 +0800 Subject: [PATCH 14/14] test: fix config api test --- tests-integration/tests/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f5479418fe56..80fef6fecc26 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -743,7 +743,7 @@ global_write_buffer_size = "1GiB" global_write_buffer_reject_size = "2GiB" sst_meta_cache_size = "128MiB" vector_cache_size = "512MiB" -page_cache_size = "2GiB" +page_cache_size = "512MiB" sst_write_buffer_size = "8MiB" [[datanode.region_engine]]