diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index a35c79a38e76..3d4d3c81e845 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -105,10 +105,11 @@ global_write_buffer_reject_size = "2GB"
 sst_meta_cache_size = "128MB"
 # Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
 vector_cache_size = "512MB"
+# Cache size for pages of SST row groups (default 512MB). Set it to 0 to disable the cache.
+page_cache_size = "512MB"
 # Buffer size for SST writing.
 sst_write_buffer_size = "8MB"
 
-
 # Log options, see `standalone.example.toml`
 # [logging]
 # dir = "/tmp/greptimedb/logs"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 623b5f5452e0..254b89d02f29 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -152,6 +152,36 @@ auto_flush_interval = "1h"
 # Global write buffer size for all regions.
 global_write_buffer_size = "1GB"
+# Mito engine options
+[[region_engine]]
+[region_engine.mito]
+# Number of region workers
+num_workers = 8
+# Request channel size of each worker
+worker_channel_size = 128
+# Max batch size for a worker to handle requests
+worker_request_batch_size = 64
+# Number of meta actions required to trigger a new checkpoint for the manifest
+manifest_checkpoint_distance = 10
+# Manifest compression type
+manifest_compress_type = "Uncompressed"
+# Max number of running background jobs
+max_background_jobs = 4
+# Interval to automatically flush a region if it has not been flushed yet.
+auto_flush_interval = "1h"
+# Global write buffer size for all regions.
+global_write_buffer_size = "1GB"
+# Global write buffer size threshold to reject write requests (default 2G).
+global_write_buffer_reject_size = "2GB"
+# Cache size for SST metadata (default 128MB). Set it to 0 to disable the cache.
+sst_meta_cache_size = "128MB"
+# Cache size for vectors and arrow arrays (default 512MB). Set it to 0 to disable the cache.
+vector_cache_size = "512MB"
+# Cache size for pages of SST row groups (default 512MB). Set it to 0 to disable the cache.
+page_cache_size = "512MB"
+# Buffer size for SST writing.
+sst_write_buffer_size = "8MB"
+
 
 # Log options
 # [logging]
 # Specify logs directory.
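The new `[region_engine.mito]` block is plain TOML, so the cache knobs can be exercised outside the engine too. A minimal sketch of picking up just the three cache sizes with `serde` and the `toml` crate — `MitoCacheOptions` and its string-typed fields are illustrative stand-ins, not the real `MitoConfig` definition:

```rust
use serde::Deserialize;

/// Illustrative stand-in for the cache options on `MitoConfig`.
#[derive(Debug, Deserialize)]
struct MitoCacheOptions {
    sst_meta_cache_size: String,
    vector_cache_size: String,
    page_cache_size: String,
}

fn main() {
    let options: MitoCacheOptions = toml::from_str(
        r#"
sst_meta_cache_size = "128MB"
vector_cache_size = "512MB"
page_cache_size = "512MB"
"#,
    )
    .unwrap();
    // In the engine these are `ReadableSize` values that reach
    // `CacheManager::new` as bytes (see `src/mito2/src/worker.rs` below).
    println!("{options:?}");
}
```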
diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index dbdbdc72faa8..529e5d3d4eee 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -24,33 +24,51 @@ use std::sync::Arc;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use moka::sync::Cache;
+use parquet::column::page::Page;
 use parquet::file::metadata::ParquetMetaData;
 use store_api::storage::RegionId;
 
 use crate::cache::cache_size::parquet_meta_size;
+use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
 use crate::sst::file::FileId;
 
+// Metrics type key for sst meta.
+const SST_META_TYPE: &str = "sst_meta";
+// Metrics type key for vector.
+const VECTOR_TYPE: &str = "vector";
+// Metrics type key for pages.
+const PAGE_TYPE: &str = "page";
+
 /// Manages cached data for the engine.
 pub struct CacheManager {
     /// Cache for SST metadata.
     sst_meta_cache: Option<SstMetaCache>,
     /// Cache for vectors.
     vector_cache: Option<VectorCache>,
+    /// Cache for SST pages.
+    page_cache: Option<PageCache>,
 }
 
 pub type CacheManagerRef = Arc<CacheManager>;
 
 impl CacheManager {
     /// Creates a new manager with specific cache size in bytes.
-    pub fn new(sst_meta_cache_size: u64, vector_cache_size: u64) -> CacheManager {
+    pub fn new(
+        sst_meta_cache_size: u64,
+        vector_cache_size: u64,
+        page_cache_size: u64,
+    ) -> CacheManager {
         let sst_meta_cache = if sst_meta_cache_size == 0 {
             None
         } else {
             let cache = Cache::builder()
                 .max_capacity(sst_meta_cache_size)
-                .weigher(|k: &SstMetaKey, v: &Arc<ParquetMetaData>| {
-                    // We ignore the size of `Arc`.
-                    (k.estimated_size() + parquet_meta_size(v)) as u32
+                .weigher(meta_cache_weight)
+                .eviction_listener(|k, v, _cause| {
+                    let size = meta_cache_weight(&k, &v);
+                    CACHE_BYTES
+                        .with_label_values(&[SST_META_TYPE])
+                        .sub(size.into());
                 })
                 .build();
             Some(cache)
@@ -60,9 +78,25 @@ impl CacheManager {
         } else {
             let cache = Cache::builder()
                 .max_capacity(vector_cache_size)
-                .weigher(|_k, v: &VectorRef| {
-                    // We ignore the heap size of `Value`.
-                    (mem::size_of::<Value>() + v.memory_size()) as u32
+                .weigher(vector_cache_weight)
+                .eviction_listener(|k, v, _cause| {
+                    let size = vector_cache_weight(&k, &v);
+                    CACHE_BYTES
+                        .with_label_values(&[VECTOR_TYPE])
+                        .sub(size.into());
+                })
+                .build();
+            Some(cache)
+        };
+        let page_cache = if page_cache_size == 0 {
+            None
+        } else {
+            let cache = Cache::builder()
+                .max_capacity(page_cache_size)
+                .weigher(page_cache_weight)
+                .eviction_listener(|k, v, _cause| {
+                    let size = page_cache_weight(&k, &v);
+                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
                 })
                 .build();
             Some(cache)
@@ -71,6 +105,7 @@ impl CacheManager {
         CacheManager {
             sst_meta_cache,
             vector_cache,
+            page_cache,
         }
     }
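Each cache above pairs a `weigher`, which charges an entry its estimated byte size against `max_capacity`, with an `eviction_listener` that subtracts exactly that weight from the `CACHE_BYTES` gauge. A self-contained sketch of the same accounting pattern, using a toy `String`/`Vec<u8>` cache and an `AtomicI64` standing in for the Prometheus gauge (names here are illustrative, not part of the PR):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

use moka::sync::Cache;

// Stand-in for the `CACHE_BYTES` gauge.
static CACHE_BYTES: AtomicI64 = AtomicI64::new(0);

fn weight(key: &String, value: &Vec<u8>) -> u32 {
    // Charge the key's and value's heap bytes against the cache capacity.
    (key.len() + value.len()) as u32
}

fn main() {
    let cache: Cache<String, Vec<u8>> = Cache::builder()
        // `max_capacity` is interpreted in weigher units (bytes here).
        .max_capacity(64)
        .weigher(weight)
        // moka hands the key back as `Arc<K>`; the listener must subtract
        // exactly what the insert added so the gauge stays consistent.
        .eviction_listener(|k, v, _cause| {
            CACHE_BYTES.fetch_sub(weight(&k, &v) as i64, Ordering::Relaxed);
        })
        .build();

    let (key, value) = ("a".to_string(), vec![0u8; 32]);
    CACHE_BYTES.fetch_add(weight(&key, &value) as i64, Ordering::Relaxed);
    cache.insert(key, value);

    // Inserting more than 64 weighed bytes evicts older entries, and the
    // listener keeps the gauge in sync with what moka actually holds.
    println!("bytes = {}", CACHE_BYTES.load(Ordering::Relaxed));
}
```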
@@ -80,9 +115,10 @@ impl CacheManager {
         region_id: RegionId,
         file_id: FileId,
     ) -> Option<Arc<ParquetMetaData>> {
-        self.sst_meta_cache
-            .as_ref()
-            .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id)))
+        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
+            let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
+            update_hit_miss(value, SST_META_TYPE)
+        })
     }
 
     /// Puts [ParquetMetaData] into the cache.
@@ -93,7 +129,11 @@ impl CacheManager {
         metadata: Arc<ParquetMetaData>,
     ) {
         if let Some(cache) = &self.sst_meta_cache {
-            cache.insert(SstMetaKey(region_id, file_id), metadata);
+            let key = SstMetaKey(region_id, file_id);
+            CACHE_BYTES
+                .with_label_values(&[SST_META_TYPE])
+                .add(meta_cache_weight(&key, &metadata).into());
+            cache.insert(key, metadata);
         }
     }
 
@@ -106,17 +146,63 @@ impl CacheManager {
 
     /// Gets a vector with repeated value for specific `key`.
     pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
-        self.vector_cache
-            .as_ref()
-            .and_then(|vector_cache| vector_cache.get(key))
+        self.vector_cache.as_ref().and_then(|vector_cache| {
+            let value = vector_cache.get(key);
+            update_hit_miss(value, VECTOR_TYPE)
+        })
     }
 
     /// Puts a vector with repeated value into the cache.
     pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
         if let Some(cache) = &self.vector_cache {
+            CACHE_BYTES
+                .with_label_values(&[VECTOR_TYPE])
+                .add(vector_cache_weight(&key, &vector).into());
             cache.insert(key, vector);
         }
     }
+
+    /// Gets pages for the row group.
+    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
+        self.page_cache.as_ref().and_then(|page_cache| {
+            let value = page_cache.get(page_key);
+            update_hit_miss(value, PAGE_TYPE)
+        })
+    }
+
+    /// Puts pages of the row group into the cache.
+    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
+        if let Some(cache) = &self.page_cache {
+            CACHE_BYTES
+                .with_label_values(&[PAGE_TYPE])
+                .add(page_cache_weight(&page_key, &pages).into());
+            cache.insert(page_key, pages);
+        }
+    }
+}
+
+fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
+    // We ignore the size of `Arc`.
+    (k.estimated_size() + parquet_meta_size(v)) as u32
+}
+
+fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 {
+    // We ignore the heap size of `Value`.
+    (mem::size_of::<Value>() + v.memory_size()) as u32
+}
+
+fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
+    (k.estimated_size() + v.estimated_size()) as u32
+}
+
+/// Updates cache hit/miss metrics.
+fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
+    if value.is_some() {
+        CACHE_HIT.with_label_values(&[cache_type]).inc();
+    } else {
+        CACHE_MISS.with_label_values(&[cache_type]).inc();
+    }
+    value
 }
 
 /// Cache key (region id, file id) for SST meta.
@@ -126,7 +212,46 @@ struct SstMetaKey(RegionId, FileId);
 impl SstMetaKey {
     /// Returns memory used by the key (estimated).
     fn estimated_size(&self) -> usize {
-        mem::size_of::<SstMetaKey>()
+        mem::size_of::<Self>()
     }
 }
+
+/// Cache key for pages of a SST row group.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageKey {
+    /// Region id of the SST file to cache.
+    pub region_id: RegionId,
+    /// Id of the SST file to cache.
+    pub file_id: FileId,
+    /// Index of the row group.
+    pub row_group_idx: usize,
+    /// Index of the column in the row group.
+    pub column_idx: usize,
+}
+
+impl PageKey {
+    /// Returns memory used by the key (estimated).
+    fn estimated_size(&self) -> usize {
+        mem::size_of::<Self>()
+    }
+}
+
+/// Cached row group pages for a column.
+pub struct PageValue {
+    /// All pages of the column in the row group.
+    pub pages: Vec<Page>,
+}
+
+impl PageValue {
+    /// Creates a new page value.
+    pub fn new(pages: Vec<Page>) -> PageValue {
+        PageValue { pages }
+    }
+
+    /// Returns memory used by the value (estimated).
+    fn estimated_size(&self) -> usize {
+        // We only consider heap size of all pages.
+        self.pages.iter().map(|page| page.buffer().len()).sum()
+    }
+}
 
@@ -136,6 +261,8 @@ type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
 ///
 /// e.g. `"hello" => ["hello", "hello", "hello"]`
 type VectorCache = Cache<Value, VectorRef>;
+/// Maps (region, file, row group, column) to [PageValue].
+type PageCache = Cache<PageKey, Arc<PageValue>>;
 
 #[cfg(test)]
 mod tests {
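Note the sizing convention: the keys own no heap data, so `mem::size_of` covers their whole footprint, while `PageValue` is charged only the page buffers' heap bytes. A quick illustration with a simplified stand-in for `PageKey` (field types approximated: `RegionId` packs into a `u64` and `FileId` wraps a 16-byte UUID in the real code):

```rust
use std::mem;

/// Simplified stand-in for `PageKey` (the real one uses `RegionId` and `FileId`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PageKey {
    region_id: u64,    // `RegionId` packs table id + region number into a u64.
    file_id: [u8; 16], // `FileId` wraps a UUID, 16 bytes inline.
    row_group_idx: usize,
    column_idx: usize,
}

fn main() {
    // No heap allocations behind the key, so its inline size is the whole
    // footprint: 8 + 16 + 8 + 8 = 40 bytes on a typical 64-bit target.
    println!("estimated_size = {}", mem::size_of::<PageKey>());
}
```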
@@ -146,8 +273,10 @@ mod tests {
 
     #[test]
     fn test_disable_cache() {
-        let cache = CacheManager::new(0, 0);
+        let cache = CacheManager::new(0, 0, 0);
         assert!(cache.sst_meta_cache.is_none());
+        assert!(cache.vector_cache.is_none());
+        assert!(cache.page_cache.is_none());
 
         let region_id = RegionId::new(1, 1);
         let file_id = FileId::random();
@@ -159,11 +288,21 @@ mod tests {
         let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
         cache.put_repeated_vector(value.clone(), vector.clone());
         assert!(cache.get_repeated_vector(&value).is_none());
+
+        let key = PageKey {
+            region_id,
+            file_id,
+            row_group_idx: 0,
+            column_idx: 0,
+        };
+        let pages = Arc::new(PageValue::new(Vec::new()));
+        cache.put_pages(key.clone(), pages);
+        assert!(cache.get_pages(&key).is_none());
     }
 
     #[test]
     fn test_parquet_meta_cache() {
-        let cache = CacheManager::new(2000, 0);
+        let cache = CacheManager::new(2000, 0, 0);
         let region_id = RegionId::new(1, 1);
         let file_id = FileId::random();
         assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
@@ -176,7 +315,7 @@ mod tests {
 
     #[test]
     fn test_repeated_vector_cache() {
-        let cache = CacheManager::new(0, 4096);
+        let cache = CacheManager::new(0, 4096, 0);
         let value = Value::Int64(10);
         assert!(cache.get_repeated_vector(&value).is_none());
         let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
@@ -184,4 +323,21 @@ mod tests {
         let cached = cache.get_repeated_vector(&value).unwrap();
         assert_eq!(vector, cached);
     }
+
+    #[test]
+    fn test_page_cache() {
+        let cache = CacheManager::new(0, 0, 1000);
+        let region_id = RegionId::new(1, 1);
+        let file_id = FileId::random();
+        let key = PageKey {
+            region_id,
+            file_id,
+            row_group_idx: 0,
+            column_idx: 0,
+        };
+        assert!(cache.get_pages(&key).is_none());
+        let pages = Arc::new(PageValue::new(Vec::new()));
+        cache.put_pages(key.clone(), pages);
+        assert!(cache.get_pages(&key).is_some());
+    }
 }
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index b4d77d145707..e6061ffa7a2e 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -63,6 +63,10 @@ pub struct MitoConfig {
     pub sst_meta_cache_size: ReadableSize,
     /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
     pub vector_cache_size: ReadableSize,
+    /// Cache size for pages of SST row groups (default 512MB). Set it to 0 to disable the cache.
+    pub page_cache_size: ReadableSize,
+
+    // Other configs:
     /// Buffer size for SST writing.
     pub sst_write_buffer_size: ReadableSize,
 }
@@ -81,6 +85,7 @@ impl Default for MitoConfig {
             global_write_buffer_reject_size: ReadableSize::gb(2),
             sst_meta_cache_size: ReadableSize::mb(128),
             vector_cache_size: ReadableSize::mb(512),
+            page_cache_size: ReadableSize::mb(512),
             sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
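The new default can be checked directly; `ReadableSize::mb(512)` and `as_bytes()` are the exact calls this diff uses in `config.rs` and `worker.rs`, though the import path below assumes GreptimeDB's `common-base` crate:

```rust
use common_base::readable_size::ReadableSize;

fn main() {
    // Default from `MitoConfig::default()` above.
    let page_cache_size = ReadableSize::mb(512);
    // `as_bytes()` is the conversion `WorkerGroup` applies before calling
    // `CacheManager::new` (see `src/mito2/src/worker.rs` below).
    assert_eq!(page_cache_size.as_bytes(), 512 * 1024 * 1024);
}
```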
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 7b8aa475da93..d53cbd495dd5 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -37,8 +37,6 @@ lazy_static! {
     )
     .unwrap();
 
-
-
     // ------ Flush related metrics
     /// Counter of scheduled flush requests.
     /// Note that the flush scheduler may merge some flush requests.
@@ -122,4 +120,27 @@ lazy_static! {
     pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
         register_int_counter_vec!("mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
     // ------- End of query metrics.
+
+    // Cache related metrics.
+    /// Cache hit counter.
+    pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!(
+        "mito_cache_hit",
+        "mito cache hit",
+        &[TYPE_LABEL]
+    )
+    .unwrap();
+    /// Cache miss counter.
+    pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!(
+        "mito_cache_miss",
+        "mito cache miss",
+        &[TYPE_LABEL]
+    )
+    .unwrap();
+    /// Cache size in bytes.
+    pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!(
+        "mito_cache_bytes",
+        "mito cache bytes",
+        &[TYPE_LABEL]
+    )
+    .unwrap();
 }
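These three families follow the same `lazy_static!` + `register_*_vec!` pattern as the rest of this file, keyed by a single `type` label so one family serves all caches. A standalone sketch with placeholder metric names:

```rust
use lazy_static::lazy_static;
use prometheus::{
    register_int_counter_vec, register_int_gauge_vec, IntCounterVec, IntGaugeVec,
};

const TYPE_LABEL: &str = "type";

lazy_static! {
    // Toy counterparts of CACHE_HIT and CACHE_BYTES.
    static ref DEMO_HIT: IntCounterVec =
        register_int_counter_vec!("demo_cache_hit", "demo cache hit", &[TYPE_LABEL]).unwrap();
    static ref DEMO_BYTES: IntGaugeVec =
        register_int_gauge_vec!("demo_cache_bytes", "demo cache bytes", &[TYPE_LABEL]).unwrap();
}

fn main() {
    // Each cache type gets its own label value ("sst_meta", "vector", "page"),
    // so one metric family covers every cache in the manager.
    DEMO_HIT.with_label_values(&["page"]).inc();
    DEMO_BYTES.with_label_values(&["page"]).add(4096);
    DEMO_BYTES.with_label_values(&["page"]).sub(4096);
}
```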
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index ee2305fdd916..7c3fda279026 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -686,7 +686,7 @@ mod tests {
         op_types: &[OpType],
         field: &[u64],
     ) -> Batch {
-        new_batch_builder(b"test", timestamps, sequences, op_types, field)
+        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
             .build()
             .unwrap()
     }
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index ba1f462dcd4d..c5e6feefcbe1 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -342,7 +342,7 @@ mod tests {
         assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
         assert_eq!([3, 4], mapper.batch_fields());
 
-        let cache = CacheManager::new(0, 1024);
+        let cache = CacheManager::new(0, 1024, 0);
         let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
         let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
         let expect = "\
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 481f98f1af12..af3f8479f39c 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -15,6 +15,7 @@
 //! SST in parquet format.
 
 mod format;
+mod page_reader;
 pub mod reader;
 pub mod row_group;
 mod stats;
@@ -59,3 +60,139 @@ pub struct SstInfo {
     /// Number of rows.
     pub num_rows: usize,
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use api::v1::OpType;
+    use common_time::Timestamp;
+
+    use super::*;
+    use crate::cache::{CacheManager, PageKey};
+    use crate::read::Batch;
+    use crate::sst::parquet::reader::ParquetReaderBuilder;
+    use crate::sst::parquet::writer::ParquetWriter;
+    use crate::test_util::sst_util::{
+        new_primary_key, new_source, sst_file_handle, sst_region_metadata,
+    };
+    use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
+
+    const FILE_DIR: &str = "/";
+
+    fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+        assert!(end > start);
+        let pk = new_primary_key(tags);
+        let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+        let sequences = vec![1000; end - start];
+        let op_types = vec![OpType::Put; end - start];
+        let field: Vec<_> = (start..end).map(|v| v as u64).collect();
+        new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_write_read() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+        let metadata = Arc::new(sst_region_metadata());
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+        // Use a small row group size for the test.
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let mut writer = ParquetWriter::new(file_path, metadata, source, object_store.clone());
+        let info = writer.write_all(&write_opts).await.unwrap().unwrap();
+        assert_eq!(200, info.num_rows);
+        assert!(info.file_size > 0);
+        assert_eq!(
+            (
+                Timestamp::new_millisecond(0),
+                Timestamp::new_millisecond(199)
+            ),
+            info.time_range
+        );
+
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_by_range(&["a", "d"], 0, 50),
+                new_batch_by_range(&["a", "d"], 50, 60),
+                new_batch_by_range(&["b", "f"], 0, 40),
+                new_batch_by_range(&["b", "h"], 100, 150),
+                new_batch_by_range(&["b", "h"], 150, 200),
+            ],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_read_with_cache() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+        let metadata = Arc::new(sst_region_metadata());
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+        // Use a small row group size for the test.
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+        // Prepare data.
+        let mut writer =
+            ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
+        writer.write_all(&write_opts).await.unwrap().unwrap();
+
+        let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024)));
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
+            .cache(cache.clone());
+        for _ in 0..3 {
+            let mut reader = builder.build().await.unwrap();
+            check_reader_result(
+                &mut reader,
+                &[
+                    new_batch_by_range(&["a", "d"], 0, 50),
+                    new_batch_by_range(&["a", "d"], 50, 60),
+                    new_batch_by_range(&["b", "f"], 0, 40),
+                    new_batch_by_range(&["b", "h"], 100, 150),
+                    new_batch_by_range(&["b", "h"], 150, 200),
+                ],
+            )
+            .await;
+        }
+
+        // Cache 4 row groups.
+        for i in 0..4 {
+            let page_key = PageKey {
+                region_id: metadata.region_id,
+                file_id: handle.file_id(),
+                row_group_idx: i,
+                column_idx: 0,
+            };
+            assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
+        }
+        let page_key = PageKey {
+            region_id: metadata.region_id,
+            file_id: handle.file_id(),
+            row_group_idx: 5,
+            column_idx: 0,
+        };
+        assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
+    }
+}
diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs
new file mode 100644
index 000000000000..1416da448b5a
--- /dev/null
+++ b/src/mito2/src/sst/parquet/page_reader.rs
@@ -0,0 +1,92 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet page reader.
+
+use std::collections::VecDeque;
+
+use parquet::column::page::{Page, PageMetadata, PageReader};
+use parquet::errors::Result;
+
+/// A reader that reads from cached pages.
+pub(crate) struct CachedPageReader {
+    /// Cached pages.
+    pages: VecDeque<Page>,
+}
+
+impl CachedPageReader {
+    /// Returns a new reader from existing pages.
+    pub(crate) fn new(pages: &[Page]) -> Self {
+        Self {
+            pages: pages.iter().cloned().collect(),
+        }
+    }
+}
+
+impl PageReader for CachedPageReader {
+    fn get_next_page(&mut self) -> Result<Option<Page>> {
+        Ok(self.pages.pop_front())
+    }
+
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
+        Ok(self.pages.front().map(page_to_page_meta))
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops
+        // the dictionary page. So it always returns the dictionary page as the first page. See:
+        // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
+        // But the `GenericColumnReader` will read the dictionary page before skipping records, so it won't skip the dictionary page.
+        // So we don't need to handle the dictionary page specifically in this method.
+        // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
+        self.pages.pop_front();
+        Ok(())
+    }
+}
+
+impl Iterator for CachedPageReader {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
+/// Gets [PageMetadata] from `page`.
+///
+/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481)
+/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301).
+fn page_to_page_meta(page: &Page) -> PageMetadata {
+    match page {
+        Page::DataPage { num_values, .. } => PageMetadata {
+            num_rows: None,
+            num_levels: Some(*num_values as usize),
+            is_dict: false,
+        },
+        Page::DataPageV2 {
+            num_values,
+            num_rows,
+            ..
+        } => PageMetadata {
+            num_rows: Some(*num_rows as usize),
+            num_levels: Some(*num_values as usize),
+            is_dict: false,
+        },
+        Page::DictionaryPage { .. } => PageMetadata {
+            num_rows: None,
+            num_levels: None,
+            is_dict: true,
+        },
+    }
+}
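`CachedPageReader` is essentially a cursor over an owned queue: `front()` answers `peek_next_page`, and `pop_front()` serves both `get_next_page` and `skip_next_page`. The same shape reduced to a toy generic reader, with no parquet types involved:

```rust
use std::collections::VecDeque;

/// Toy version of `CachedPageReader`: a reader over pre-collected items.
struct QueueReader<T> {
    items: VecDeque<T>,
}

impl<T: Clone> QueueReader<T> {
    fn new(items: &[T]) -> Self {
        Self {
            items: items.iter().cloned().collect(),
        }
    }

    /// Like `get_next_page`: hand out the front item.
    fn next_item(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Like `peek_next_page`: look without consuming.
    fn peek_item(&self) -> Option<&T> {
        self.items.front()
    }

    /// Like `skip_next_page`: drop the front item.
    fn skip_item(&mut self) {
        self.items.pop_front();
    }
}

fn main() {
    let mut reader = QueueReader::new(&[1, 2, 3]);
    assert_eq!(reader.peek_item(), Some(&1));
    reader.skip_item();
    assert_eq!(reader.next_item(), Some(2));
}
```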
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index a56e140ec033..49e38ddc4d3d 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -183,6 +183,7 @@ impl ParquetReaderBuilder {
             file_reader: reader,
             projection: projection_mask,
             field_levels,
+            cache_manager: self.cache_manager.clone(),
         };
 
         let metrics = Metrics {
@@ -292,6 +293,8 @@ struct RowGroupReaderBuilder {
     projection: ProjectionMask,
     /// Field levels to read.
     field_levels: FieldLevels,
+    /// Cache.
+    cache_manager: Option<CacheManagerRef>,
 }
 
 impl RowGroupReaderBuilder {
@@ -302,7 +305,13 @@ impl RowGroupReaderBuilder {
 
     /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
     async fn build(&mut self, row_group_idx: usize) -> Result<ParquetRecordBatchReader> {
-        let mut row_group = InMemoryRowGroup::create(&self.parquet_meta, row_group_idx);
+        let mut row_group = InMemoryRowGroup::create(
+            self.file_handle.region_id(),
+            self.file_handle.file_id(),
+            &self.parquet_meta,
+            row_group_idx,
+            self.cache_manager.clone(),
+        );
         // Fetches data into memory.
         row_group
             .fetch(&mut self.file_reader, &self.projection, None)
@@ -334,6 +343,9 @@ pub struct ParquetReader {
     /// Not `None` if [ParquetReader::stream] is not `None`.
     read_format: ReadFormat,
     /// Builder to build row group readers.
+    ///
+    /// The builder contains the file handle, so don't drop the builder while using
+    /// the [ParquetReader].
     reader_builder: RowGroupReaderBuilder,
     /// Reader of current row group.
     current_reader: Option<ParquetRecordBatchReader>,
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 7be77d692fc5..827db8999ae8 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -26,6 +26,11 @@ use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::reader::{ChunkReader, Length};
 use parquet::file::serialized_reader::SerializedPageReader;
 use parquet::format::PageLocation;
+use store_api::storage::RegionId;
+
+use crate::cache::{CacheManagerRef, PageKey, PageValue};
+use crate::sst::file::FileId;
+use crate::sst::parquet::page_reader::CachedPageReader;
 
 /// An in-memory collection of column chunks
 pub struct InMemoryRowGroup<'a> {
@@ -33,6 +38,14 @@ pub struct InMemoryRowGroup<'a> {
     page_locations: Option<&'a [Vec<PageLocation>]>,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
+    region_id: RegionId,
+    file_id: FileId,
+    row_group_idx: usize,
+    cache_manager: Option<CacheManagerRef>,
+    /// Cached pages for each column.
+    ///
+    /// `column_cached_pages.len()` equals to `column_chunks.len()`.
+    column_cached_pages: Vec<Option<Arc<PageValue>>>,
 }
 
 impl<'a> InMemoryRowGroup<'a> {
@@ -40,8 +53,17 @@ impl<'a> InMemoryRowGroup<'a> {
     ///
     /// # Panics
     /// Panics if the `row_group_idx` is invalid.
-    pub fn create(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
+    pub fn create(
+        region_id: RegionId,
+        file_id: FileId,
+        parquet_meta: &'a ParquetMetaData,
+        row_group_idx: usize,
+        cache_manager: Option<CacheManagerRef>,
+    ) -> Self {
         let metadata = parquet_meta.row_group(row_group_idx);
+        // `page_locations` is always `None` if we don't set
+        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
+        // to `true`.
         let page_locations = parquet_meta
             .offset_index()
             .map(|x| x[row_group_idx].as_slice());
@@ -51,6 +73,11 @@ impl<'a> InMemoryRowGroup<'a> {
             row_count: metadata.num_rows() as usize,
             column_chunks: vec![None; metadata.columns().len()],
             page_locations,
+            region_id,
+            file_id,
+            row_group_idx,
+            cache_manager,
+            column_cached_pages: vec![None; metadata.columns().len()],
         }
     }
 
@@ -114,22 +141,39 @@ impl<'a> InMemoryRowGroup<'a> {
                 }
             }
         } else {
-            let fetch_ranges = self
+            // Now we only use the cache for dense chunk data.
+            self.fetch_pages_from_cache(projection);
+
+            let fetch_ranges: Vec<_> = self
                 .column_chunks
                 .iter()
+                .zip(&self.column_cached_pages)
                 .enumerate()
-                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
-                .map(|(idx, _chunk)| {
+                // Don't need to fetch column data if we already cache the column's pages.
+                .filter(|&(idx, (chunk, cached_pages))| {
+                    chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none()
+                })
+                .map(|(idx, (_chunk, _cached_pages))| {
                     let column = self.metadata.column(idx);
                     let (start, length) = column.byte_range();
                     start as usize..(start + length) as usize
                })
                 .collect();
 
+            if fetch_ranges.is_empty() {
+                // Nothing to fetch.
+                return Ok(());
+            }
+
             let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-                if chunk.is_some() || !projection.leaf_included(idx) {
+            for (idx, (chunk, cached_pages)) in self
+                .column_chunks
+                .iter_mut()
+                .zip(&self.column_cached_pages)
+                .enumerate()
+            {
+                if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() {
                     continue;
                 }
@@ -144,32 +188,82 @@ impl<'a> InMemoryRowGroup<'a> {
 
         Ok(())
     }
 
+    /// Fetches pages for columns if the cache is enabled.
+    fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
+        self.column_chunks
+            .iter()
+            .enumerate()
+            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
+            .for_each(|(idx, _chunk)| {
+                if let Some(cache) = &self.cache_manager {
+                    let page_key = PageKey {
+                        region_id: self.region_id,
+                        file_id: self.file_id,
+                        row_group_idx: self.row_group_idx,
+                        column_idx: idx,
+                    };
+                    self.column_cached_pages[idx] = cache.get_pages(&page_key);
+                }
+            });
+    }
 
-impl<'a> RowGroups for InMemoryRowGroup<'a> {
-    fn num_rows(&self) -> usize {
-        self.row_count
-    }
-
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        match &self.column_chunks[i] {
-            None => Err(ParquetError::General(format!(
-                "Invalid column index {i}, column was not fetched"
-            ))),
+    /// Creates a page reader to read column at `i`.
+    fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
+        if let Some(cached_pages) = &self.column_cached_pages[i] {
+            // Already in cache.
+            return Ok(Box::new(CachedPageReader::new(&cached_pages.pages)));
+        }
+
+        // Cache miss.
+        let page_reader = match &self.column_chunks[i] {
+            None => {
+                return Err(ParquetError::General(format!(
+                    "Invalid column index {i}, column was not fetched"
+                )))
+            }
             Some(data) => {
                 let page_locations = self.page_locations.map(|index| index[i].clone());
-                let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
+                SerializedPageReader::new(
                     data.clone(),
                     self.metadata.column(i),
                     self.row_count,
                     page_locations,
-                )?);
-
-                Ok(Box::new(ColumnChunkIterator {
-                    reader: Some(Ok(page_reader)),
-                }))
+                )?
             }
-        }
+        };
+
+        let Some(cache) = &self.cache_manager else {
+            // Cache is disabled.
+            return Ok(Box::new(page_reader));
+        };
+
+        // We collect all pages and put them into the cache.
+        let pages = page_reader.collect::<Result<Vec<_>>>()?;
+        let page_value = Arc::new(PageValue::new(pages));
+        let page_key = PageKey {
+            region_id: self.region_id,
+            file_id: self.file_id,
+            row_group_idx: self.row_group_idx,
+            column_idx: i,
+        };
+        cache.put_pages(page_key, page_value.clone());
+
+        Ok(Box::new(CachedPageReader::new(&page_value.pages)))
+    }
+}
+
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
+    fn num_rows(&self) -> usize {
+        self.row_count
+    }
+
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        let page_reader = self.column_page_reader(i)?;
+
+        Ok(Box::new(ColumnChunkIterator {
+            reader: Some(Ok(page_reader)),
+        }))
     }
 }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index aa68a1e1c6cd..17d50dc3a3f1 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -163,5 +163,3 @@ impl SourceStats {
         }
     }
 }
-
-// TODO(yingwen): Port tests.
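`column_page_reader` above is a classic read-through cache: serve cached pages when present, otherwise build the `SerializedPageReader`, drain it once, publish the decoded pages, and read from the published copy so hit and miss paths return the same reader type. The same control flow in miniature, with a `HashMap` standing in for moka and strings for pages:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Toy read-through cache mirroring `InMemoryRowGroup::column_page_reader`.
struct ReadThrough {
    cache: Option<HashMap<u32, Arc<Vec<String>>>>,
}

impl ReadThrough {
    fn pages(&mut self, column: u32, decode: impl Fn() -> Vec<String>) -> Arc<Vec<String>> {
        if let Some(cache) = &self.cache {
            if let Some(hit) = cache.get(&column) {
                return hit.clone(); // Cache hit: no decoding.
            }
        }
        // Cache miss: decode all pages exactly once.
        let pages = Arc::new(decode());
        if let Some(cache) = &mut self.cache {
            // Publish, then serve from the shared copy (like CachedPageReader).
            cache.insert(column, pages.clone());
        }
        pages
    }
}

fn main() {
    let mut rt = ReadThrough {
        cache: Some(HashMap::new()),
    };
    let first = rt.pages(0, || vec!["page-0".into(), "page-1".into()]);
    let second = rt.pages(0, || unreachable!("served from cache"));
    assert!(Arc::ptr_eq(&first, &second));
}
```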
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 69bd22d26e1b..e602643ff5c5 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -17,6 +17,7 @@
 pub mod memtable_util;
 pub mod meta_util;
 pub mod scheduler_util;
+pub mod sst_util;
 pub mod version_util;
 
 use std::collections::HashMap;
@@ -195,6 +196,12 @@ impl TestEnv {
         )
     }
 
+    /// Only initializes the object store manager and returns the default object store.
+    pub fn init_object_store_manager(&mut self) -> ObjectStore {
+        self.object_store_manager = Some(Arc::new(self.create_object_store_manager()));
+        self.get_object_store().unwrap()
+    }
+
     /// Creates a new [WorkerGroup] with specific config under this env.
     pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
@@ -207,14 +214,19 @@ impl TestEnv {
     ) -> (RaftEngineLogStore, ObjectStoreManager) {
         let data_home = self.data_home.path();
         let wal_path = data_home.join("wal");
-        let data_path = data_home.join("data").as_path().display().to_string();
-
         let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
+
+        let object_store_manager = self.create_object_store_manager();
+        (log_store, object_store_manager)
+    }
+
+    fn create_object_store_manager(&self) -> ObjectStoreManager {
+        let data_home = self.data_home.path();
+        let data_path = data_home.join("data").as_path().display().to_string();
         let mut builder = Fs::default();
         builder.root(&data_path);
         let object_store = ObjectStore::new(builder).unwrap().finish();
-        let object_store_manager = ObjectStoreManager::new("default", object_store);
-        (log_store, object_store_manager)
+        ObjectStoreManager::new("default", object_store)
     }
 
     /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`
@@ -414,6 +426,7 @@ pub fn new_batch_builder(
     timestamps: &[i64],
     sequences: &[u64],
     op_types: &[OpType],
+    field_column_id: ColumnId,
     field: &[u64],
 ) -> BatchBuilder {
     let mut builder = BatchBuilder::new(primary_key.to_vec());
@@ -431,13 +444,14 @@ pub fn new_batch_builder(
         )))
         .unwrap()
         .push_field_array(
-            1,
+            field_column_id,
             Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
         )
         .unwrap();
     builder
 }
 
+/// Returns a new [Batch] whose field has column id 1.
 pub fn new_batch(
     primary_key: &[u8],
     timestamps: &[i64],
@@ -445,7 +459,7 @@ pub fn new_batch(
     op_types: &[OpType],
     field: &[u64],
 ) -> Batch {
-    new_batch_builder(primary_key, timestamps, sequences, op_types, field)
+    new_batch_builder(primary_key, timestamps, sequences, op_types, 1, field)
         .build()
         .unwrap()
 }
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
new file mode 100644
index 000000000000..3638d119faa1
--- /dev/null
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -0,0 +1,112 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utilities for testing SSTs.
+
+use api::v1::SemanticType;
+use common_time::Timestamp;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::ColumnSchema;
+use datatypes::value::ValueRef;
+use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::storage::RegionId;
+
+use crate::read::{Batch, Source};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::sst::file::{FileHandle, FileId, FileMeta};
+use crate::test_util::{new_noop_file_purger, VecBatchReader};
+
+/// Test region id.
+const REGION_ID: RegionId = RegionId::new(0, 0);
+
+/// Creates a new region metadata for testing SSTs.
+///
+/// Schema: tag_0, tag_1, field_0, ts
+pub fn sst_region_metadata() -> RegionMetadata {
+    let mut builder = RegionMetadataBuilder::new(REGION_ID);
+    builder
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_0".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 0,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_1".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 1,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "field_0".to_string(),
+                ConcreteDataType::uint64_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Field,
+            column_id: 2,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "ts".to_string(),
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+            semantic_type: SemanticType::Timestamp,
+            column_id: 3,
+        })
+        .primary_key(vec![0, 1]);
+    builder.build().unwrap()
+}
+
+/// Encodes a primary key for specific tags.
+pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
+    let fields = (0..tags.len())
+        .map(|_| SortField::new(ConcreteDataType::string_datatype()))
+        .collect();
+    let converter = McmpRowCodec::new(fields);
+    converter
+        .encode(tags.iter().map(|tag| ValueRef::String(tag)))
+        .unwrap()
+}
+
+/// Creates a [Source] from `batches`.
+pub fn new_source(batches: &[Batch]) -> Source {
+    let reader = VecBatchReader::new(batches);
+    Source::Reader(Box::new(reader))
+}
+
+/// Creates a new [FileHandle] for a SST.
+pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
+    let file_purger = new_noop_file_purger();
+    FileHandle::new(
+        FileMeta {
+            region_id: REGION_ID,
+            file_id: FileId::random(),
+            time_range: (
+                Timestamp::new_millisecond(start_ms),
+                Timestamp::new_millisecond(end_ms),
+            ),
+            level: 0,
+            file_size: 0,
+        },
+        file_purger,
+    )
+}
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index ebff32be789d..64f7472c3842 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -122,6 +122,7 @@ impl WorkerGroup {
         let cache_manager = Arc::new(CacheManager::new(
             config.sst_meta_cache_size.as_bytes(),
             config.vector_cache_size.as_bytes(),
+            config.page_cache_size.as_bytes(),
         ));
 
         let workers = (0..config.num_workers)
@@ -219,6 +220,7 @@ impl WorkerGroup {
         let cache_manager = Arc::new(CacheManager::new(
             config.sst_meta_cache_size.as_bytes(),
             config.vector_cache_size.as_bytes(),
+            config.page_cache_size.as_bytes(),
         ));
 
         let workers = (0..config.num_workers)
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c8f7ed871756..e5b398e2a673 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -743,6 +743,7 @@ global_write_buffer_size = "1GiB"
 global_write_buffer_reject_size = "2GiB"
 sst_meta_cache_size = "128MiB"
 vector_cache_size = "512MiB"
+page_cache_size = "512MiB"
 sst_write_buffer_size = "8MiB"
 
 [[datanode.region_engine]]
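End to end: the worker builds one shared `CacheManager` from `MitoConfig`, `ParquetReaderBuilder` threads it into each `InMemoryRowGroup`, and repeated scans of the same row groups are served from memory instead of object storage. A hedged in-crate sketch of the cache round trip, using only APIs added in this diff (it mirrors `test_read_with_cache` and `test_page_cache` above):

```rust
use std::sync::Arc;

use store_api::storage::RegionId;

use crate::cache::{CacheManager, PageKey, PageValue};
use crate::sst::file::FileId;

fn page_cache_round_trip() {
    // Meta and vector caches disabled; 64 MiB page cache, as in the test above.
    let cache = Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024));

    let key = PageKey {
        region_id: RegionId::new(1, 1),
        file_id: FileId::random(),
        row_group_idx: 0,
        column_idx: 0,
    };

    // First access misses and bumps `mito_cache_miss{type="page"}`.
    assert!(cache.get_pages(&key).is_none());

    // A reader that decoded this column's pages publishes them for later scans.
    cache.put_pages(key.clone(), Arc::new(PageValue::new(Vec::new())));

    // Subsequent accesses hit and bump `mito_cache_hit{type="page"}`.
    assert!(cache.get_pages(&key).is_some());
}
```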