diff --git a/Cargo.lock b/Cargo.lock index 1eaf5b4eaf36..3581dd777ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1266,7 +1266,7 @@ dependencies = [ "log-store", "meta-client", "metrics", - "moka 0.11.3", + "moka", "object-store", "parking_lot 0.12.1", "partition", @@ -1544,7 +1544,7 @@ dependencies = [ "derive_builder 0.12.0", "enum_dispatch", "futures-util", - "moka 0.9.9", + "moka", "parking_lot 0.12.1", "prost", "rand", @@ -3347,7 +3347,7 @@ dependencies = [ "meta-client", "meta-srv", "metrics", - "moka 0.9.9", + "moka", "object-store", "openmetrics-parser", "opentelemetry-proto", @@ -5551,6 +5551,7 @@ dependencies = [ "log-store", "memcomparable", "metrics", + "moka", "object-store", "parquet", "paste", @@ -5569,32 +5570,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "moka" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b28455ac4363046076054a7e9cfbd7f168019c29dba32a625f59fc0aeffaaea4" -dependencies = [ - "async-io", - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "futures-util", - "num_cpus", - "once_cell", - "parking_lot 0.12.1", - "quanta 0.11.1", - "rustc_version", - "scheduled-thread-pool", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid", -] - [[package]] name = "moka" version = "0.11.3" @@ -6511,7 +6486,7 @@ dependencies = [ "datafusion-expr", "datatypes", "meta-client", - "moka 0.9.9", + "moka", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index 426b07d468b2..d1d56b343b6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" +moka = { version = "0.11" } once_cell = "1.18" opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } parquet = "43.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index b9ff8c19ae97..c8c2c2828d1a 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -30,7 +30,7 @@ futures-util.workspace = true lazy_static.workspace = true meta-client = { workspace = true } metrics.workspace = true -moka = { version = "0.11", features = ["future"] } +moka = { workspace = true, features = ["future"] } parking_lot = "0.12" partition.workspace = true regex.workspace = true diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index dde80a01943b..0b4f656fbc13 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -26,7 +26,7 @@ datatypes = { workspace = true } derive_builder.workspace = true enum_dispatch = "0.3" futures-util.workspace = true -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } parking_lot = "0.12" prost.workspace = true rand.workspace = true diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 285ad121ba63..bc6e48d91155 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -50,7 +50,7 @@ meta-client = { workspace = true } raft-engine = { workspace = true } # Although it is not used, please do not delete it. 
metrics.workspace = true -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } object-store = { workspace = true } openmetrics-parser = "0.4" opentelemetry-proto.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0a1ff27f9e4c..2d884ae11e55 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -39,6 +39,7 @@ humantime-serde = { workspace = true } lazy_static = "1.4" memcomparable = "0.2" metrics.workspace = true +moka.workspace = true object-store = { workspace = true } parquet = { workspace = true, features = ["async"] } paste.workspace = true diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs new file mode 100644 index 000000000000..41ddd17af18a --- /dev/null +++ b/src/mito2/src/cache.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache for the engine. + +mod cache_size; +#[cfg(test)] +pub(crate) mod test_util; + +use std::mem; +use std::sync::Arc; + +use moka::sync::Cache; +use parquet::file::metadata::ParquetMetaData; +use store_api::storage::RegionId; + +use crate::cache::cache_size::parquet_meta_size; +use crate::sst::file::FileId; + +/// Manages cached data for the engine. +pub struct CacheManager { + /// Cache for SST metadata. + sst_meta_cache: Option, +} + +pub type CacheManagerRef = Arc; + +impl CacheManager { + /// Creates a new manager with specific cache size in bytes. + pub fn new(sst_meta_cache_size: u64) -> CacheManager { + let sst_meta_cache = if sst_meta_cache_size == 0 { + None + } else { + let cache = Cache::builder() + .max_capacity(sst_meta_cache_size) + .weigher(|k: &SstMetaKey, v: &Arc| { + // We ignore the size of `Arc`. + (k.estimated_size() + parquet_meta_size(v)) as u32 + }) + .build(); + Some(cache) + }; + + CacheManager { sst_meta_cache } + } + + /// Gets cached [ParquetMetaData]. + pub fn get_parquet_meta_data( + &self, + region_id: RegionId, + file_id: FileId, + ) -> Option> { + self.sst_meta_cache + .as_ref() + .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id))) + } + + /// Puts [ParquetMetaData] into the cache. + pub fn put_parquet_meta_data( + &self, + region_id: RegionId, + file_id: FileId, + metadata: Arc, + ) { + if let Some(cache) = &self.sst_meta_cache { + cache.insert(SstMetaKey(region_id, file_id), metadata); + } + } + + /// Removes [ParquetMetaData] from the cache. + pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) { + if let Some(cache) = &self.sst_meta_cache { + cache.remove(&SstMetaKey(region_id, file_id)); + } + } +} + +/// Cache key for SST meta. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct SstMetaKey(RegionId, FileId); + +impl SstMetaKey { + /// Returns memory used by the key (estimated). 
+ fn estimated_size(&self) -> usize { + mem::size_of::() + } +} + +type SstMetaCache = Cache>; + +#[cfg(test)] +mod tests { + use super::*; + use crate::cache::test_util::parquet_meta; + + #[test] + fn test_disable_meta_cache() { + let cache = CacheManager::new(0); + assert!(cache.sst_meta_cache.is_none()); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + let metadata = parquet_meta(); + cache.put_parquet_meta_data(region_id, file_id, metadata); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); + } + + #[test] + fn test_parquet_meta_cache() { + let cache = CacheManager::new(2000); + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); + let metadata = parquet_meta(); + cache.put_parquet_meta_data(region_id, file_id, metadata); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_some()); + cache.remove_parquet_meta_data(region_id, file_id); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); + } +} diff --git a/src/mito2/src/cache/cache_size.rs b/src/mito2/src/cache/cache_size.rs new file mode 100644 index 000000000000..8ecd2d5e99c6 --- /dev/null +++ b/src/mito2/src/cache/cache_size.rs @@ -0,0 +1,142 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache size of different cache value. + +use std::mem; + +use parquet::file::metadata::{ + FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData, +}; +use parquet::file::page_index::index::Index; +use parquet::format::{ColumnOrder, KeyValue, PageLocation}; +use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type}; + +/// Returns estimated size of [ParquetMetaData]. +pub fn parquet_meta_size(meta: &ParquetMetaData) -> usize { + // struct size + let mut size = mem::size_of::(); + // file_metadata + size += file_meta_heap_size(meta.file_metadata()); + // row_groups + size += meta + .row_groups() + .iter() + .map(row_group_meta_heap_size) + .sum::(); + // column_index + size += meta + .column_index() + .map(parquet_column_index_heap_size) + .unwrap_or(0); + // offset_index + size += meta + .offset_index() + .map(parquet_offset_index_heap_size) + .unwrap_or(0); + + size +} + +/// Returns estimated size of [FileMetaData] allocated from heap. +fn file_meta_heap_size(meta: &FileMetaData) -> usize { + // created_by + let mut size = meta.created_by().map(|s| s.len()).unwrap_or(0); + // key_value_metadata + size += meta + .key_value_metadata() + .map(|kvs| { + kvs.iter() + .map(|kv| { + kv.key.len() + + kv.value.as_ref().map(|v| v.len()).unwrap_or(0) + + mem::size_of::() + }) + .sum() + }) + .unwrap_or(0); + // schema_descr (It's a ptr so we also add size of SchemaDescriptor). 
+ size += mem::size_of::(); + size += schema_descr_heap_size(meta.schema_descr()); + // column_orders + size += meta + .column_orders() + .map(|orders| orders.len() * mem::size_of::()) + .unwrap_or(0); + + size +} + +/// Returns estimated size of [SchemaDescriptor] allocated from heap. +fn schema_descr_heap_size(descr: &SchemaDescriptor) -> usize { + // schema + let mut size = mem::size_of::(); + // leaves + size += descr + .columns() + .iter() + .map(|descr| mem::size_of::() + column_descr_heap_size(descr)) + .sum::(); + // leaf_to_base + size += descr.num_columns() * mem::size_of::(); + + size +} + +/// Returns estimated size of [ColumnDescriptor] allocated from heap. +fn column_descr_heap_size(descr: &ColumnDescriptor) -> usize { + descr.path().parts().iter().map(|s| s.len()).sum() +} + +/// Returns estimated size of [ColumnDescriptor] allocated from heap. +fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize { + mem::size_of_val(meta.columns()) +} + +/// Returns estimated size of [ParquetColumnIndex] allocated from heap. +fn parquet_column_index_heap_size(column_index: &ParquetColumnIndex) -> usize { + column_index + .iter() + .map(|row_group| row_group.len() * mem::size_of::() + mem::size_of_val(row_group)) + .sum() +} + +/// Returns estimated size of [ParquetOffsetIndex] allocated from heap. +fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize { + offset_index + .iter() + .map(|row_group| { + row_group + .iter() + .map(|column| { + column.len() * mem::size_of::() + mem::size_of_val(column) + }) + .sum::() + + mem::size_of_val(row_group) + }) + .sum() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cache::test_util::parquet_meta; + + #[test] + fn test_parquet_meta_size() { + let metadata = parquet_meta(); + + assert_eq!(948, parquet_meta_size(&metadata)); + } +} diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs new file mode 100644 index 000000000000..deb4ba23cd7b --- /dev/null +++ b/src/mito2/src/cache/test_util.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing cache. + +use std::sync::Arc; + +use bytes::Bytes; +use datatypes::arrow::array::{ArrayRef, Int64Array}; +use datatypes::arrow::record_batch::RecordBatch; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use parquet::arrow::ArrowWriter; +use parquet::file::metadata::ParquetMetaData; + +/// Returns a parquet meta data. 
+pub(crate) fn parquet_meta() -> Arc { + let file_data = parquet_file_data(); + let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap(); + builder.metadata().clone() +} + +/// Write a test parquet file to a buffer +fn parquet_file_data() -> Vec { + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap(); + writer.write(&to_write).unwrap(); + writer.close().unwrap(); + + buffer +} diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 3ca21a1aec91..f9e444cf38d3 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -56,6 +56,10 @@ pub struct MitoConfig { pub global_write_buffer_size: ReadableSize, /// Global write buffer size threshold to reject write requests (default 2G). pub global_write_buffer_reject_size: ReadableSize, + + // Cache configs: + /// Cache size for SST metadata (default 128MB). Setting it to 0 to disable cache. + pub sst_meta_cache_size: ReadableSize, } impl Default for MitoConfig { @@ -70,6 +74,7 @@ impl Default for MitoConfig { auto_flush_interval: Duration::from_secs(30 * 60), global_write_buffer_size: ReadableSize::gb(1), global_write_buffer_reject_size: ReadableSize::gb(2), + sst_meta_cache_size: ReadableSize::mb(128), } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 7cd1079d2d12..987ecbf7e593 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -144,7 +144,14 @@ impl EngineInner { .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; let version = region.version(); - let scan_region = ScanRegion::new(version, region.access_layer.clone(), request); + // Get cache. + let cache_manager = self.workers.cache_manager(); + let scan_region = ScanRegion::new( + version, + region.access_layer.clone(), + request, + Some(cache_manager), + ); scan_region.scanner() } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 71d67983f091..dd404f2013b6 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -22,6 +22,7 @@ pub mod test_util; mod access_layer; +mod cache; mod compaction; pub mod config; pub mod engine; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index be8b318d83ff..083238dcd06d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -22,6 +22,7 @@ use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::error::{BuildPredicateSnafu, Result}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; @@ -113,6 +114,8 @@ pub(crate) struct ScanRegion { access_layer: AccessLayerRef, /// Scan request. request: ScanRequest, + /// Cache. 
+ cache_manager: Option, } impl ScanRegion { @@ -121,11 +124,13 @@ impl ScanRegion { version: VersionRef, access_layer: AccessLayerRef, request: ScanRequest, + cache_manager: Option, ) -> ScanRegion { ScanRegion { version, access_layer, request, + cache_manager, } } @@ -181,7 +186,8 @@ impl ScanRegion { .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) .with_memtables(memtables) - .with_files(files); + .with_files(files) + .with_cache(self.cache_manager); Ok(seq_scan) } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 148cb3777142..3a3a6a1eaad8 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -25,6 +25,7 @@ use snafu::ResultExt; use table::predicate::Predicate; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::error::Result; use crate::memtable::MemtableRef; use crate::read::compat::{self, CompatReader}; @@ -49,6 +50,8 @@ pub struct SeqScan { memtables: Vec, /// Handles to SST files to scan. files: Vec, + /// Cache. + cache_manager: Option, } impl SeqScan { @@ -62,37 +65,44 @@ impl SeqScan { predicate: None, memtables: Vec::new(), files: Vec::new(), + cache_manager: None, } } - /// Set time range filter for time index. + /// Sets time range filter for time index. #[must_use] pub(crate) fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } - /// Set predicate to push down. + /// Sets predicate to push down. #[must_use] pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } - /// Set memtables to read. + /// Sets memtables to read. #[must_use] pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } - /// Set files to read. + /// Sets files to read. #[must_use] pub(crate) fn with_files(mut self, files: Vec) -> Self { self.files = files; self } + /// Sets cache for this query. + pub(crate) fn with_cache(mut self, cache: Option) -> Self { + self.cache_manager = cache; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. @@ -129,6 +139,7 @@ impl SeqScan { .predicate(self.predicate.clone()) .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) + .cache(self.cache_manager.clone()) .build() .await?; if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 3683d6fc2f73..6a8a3a805f7c 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -29,6 +29,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; +use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -51,6 +52,7 @@ pub(crate) struct RegionOpener { region_dir: String, scheduler: SchedulerRef, options: HashMap, + cache_manager: Option, } impl RegionOpener { @@ -70,6 +72,7 @@ impl RegionOpener { region_dir: normalize_dir(region_dir), scheduler, options: HashMap::new(), + cache_manager: None, } } @@ -85,6 +88,12 @@ impl RegionOpener { self } + /// Sets the cache manager for the region. 
+ pub(crate) fn cache(mut self, cache_manager: Option) -> Self { + self.cache_manager = cache_manager; + self + } + /// Writes region manifest and creates a new region if it does not exist. /// Opens the region if it already exists. /// @@ -145,7 +154,11 @@ impl RegionOpener { version_control, access_layer: access_layer.clone(), manifest_manager, - file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)), + file_purger: Arc::new(LocalFilePurger::new( + self.scheduler, + access_layer, + self.cache_manager, + )), last_flush_millis: AtomicI64::new(current_time_millis()), // Region is writable after it is created. writable: AtomicBool::new(true), @@ -205,6 +218,7 @@ impl RegionOpener { let file_purger = Arc::new(LocalFilePurger::new( self.scheduler.clone(), access_layer.clone(), + self.cache_manager.clone(), )); let mutable = self.memtable_builder.build(&metadata); let options = RegionOptions::try_from(&self.options)?; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index ecf961854b7b..a16987690d09 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -118,6 +118,12 @@ impl FileHandle { inner: Arc::new(FileHandleInner::new(meta, file_purger)), } } + + /// Returns the region id of the file. + pub fn region_id(&self) -> RegionId { + self.inner.meta.region_id + } + /// Returns the file id. pub fn file_id(&self) -> FileId { self.inner.meta.file_id diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index ace502f4a659..15c3df6cc703 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -19,6 +19,7 @@ use common_telemetry::{error, info}; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::FileId; @@ -39,10 +40,11 @@ pub trait FilePurger: Send + Sync + fmt::Debug { pub type FilePurgerRef = Arc; +/// Purger that purges file for current region. pub struct LocalFilePurger { scheduler: SchedulerRef, - sst_layer: AccessLayerRef, + cache_manager: Option, } impl fmt::Debug for LocalFilePurger { @@ -54,10 +56,16 @@ impl fmt::Debug for LocalFilePurger { } impl LocalFilePurger { - pub fn new(scheduler: SchedulerRef, sst_layer: AccessLayerRef) -> Self { + /// Creates a new purger. + pub fn new( + scheduler: SchedulerRef, + sst_layer: AccessLayerRef, + cache_manager: Option, + ) -> Self { Self { scheduler, sst_layer, + cache_manager, } } } @@ -68,6 +76,11 @@ impl FilePurger for LocalFilePurger { let region_id = request.region_id; let sst_layer = self.sst_layer.clone(); + // Remove meta of the file from cache. + if let Some(cache) = &self.cache_manager { + cache.remove_parquet_meta_data(region_id, file_id); + } + if let Err(e) = self.scheduler.schedule(Box::pin(async move { if let Err(e) = sst_layer.delete_sst(file_id).await { error!(e; "Failed to delete SST file, file: {}, region: {}", @@ -113,7 +126,7 @@ mod tests { let scheduler = Arc::new(LocalScheduler::new(3)); let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); - let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer)); + let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); { let handle = FileHandle::new( diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6280f83eef10..a91d781dad92 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,29 +14,35 @@ //! Parquet reader. 
+use std::ops::Range; use std::sync::Arc; use async_compat::CompatExt; use async_trait::async_trait; +use bytes::Bytes; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; +use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use object_store::ObjectStore; +use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::errors::ParquetError; +use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, RegionId}; use table::predicate::Predicate; use tokio::io::BufReader; +use crate::cache::CacheManagerRef; use crate::error::{ InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; use crate::read::{Batch, BatchReader}; -use crate::sst::file::FileHandle; +use crate::sst::file::{FileHandle, FileId}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::PARQUET_METADATA_KEY; @@ -55,6 +61,8 @@ pub struct ParquetReaderBuilder { /// `None` reads all columns. Due to schema change, the projection /// can contain columns not in the parquet file. projection: Option>, + /// Manager that caches SST data. + cache_manager: Option, } impl ParquetReaderBuilder { @@ -71,6 +79,7 @@ impl ParquetReaderBuilder { predicate: None, time_range: None, projection: None, + cache_manager: None, } } @@ -94,6 +103,12 @@ impl ParquetReaderBuilder { self } + /// Attaches the cache to the builder. + pub fn cache(mut self, cache: Option) -> ParquetReaderBuilder { + self.cache_manager = cache; + self + } + /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. @@ -119,8 +134,16 @@ impl ParquetReaderBuilder { .await .context(OpenDalSnafu)? .compat(); - let buf_reader = BufReader::new(reader); - let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader) + let reader = BufReader::new(reader); + let reader = AsyncFileReaderCache { + reader, + // TODO(yingwen): Sets the metadata when we implement row group level reader. + metadata: None, + cache: self.cache_manager.clone(), + region_id: self.file_handle.region_id(), + file_id: self.file_handle.file_id(), + }; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader) .await .context(ReadParquetSnafu { path: file_path })?; @@ -249,3 +272,58 @@ impl ParquetReader { self.read_format.metadata() } } + +/// Cache layer for parquet's [AsyncFileReader]. +struct AsyncFileReaderCache { + /// Underlying async file reader. + reader: T, + /// Parquet metadata cached locally. + metadata: Option>, + /// Global cache. + cache: Option, + /// Id of the region. + region_id: RegionId, + /// Id of the file to read. + file_id: FileId, +} + +impl AsyncFileReader for AsyncFileReaderCache { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.reader.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, Result, ParquetError>> { + self.reader.get_byte_ranges(ranges) + } + + fn get_metadata(&mut self) -> BoxFuture<'_, Result, ParquetError>> { + async { + // Tries to get from local cache. + if let Some(metadata) = &self.metadata { + return Ok(metadata.clone()); + } + + // Tries to get from global cache. 
+ if let Some(metadata) = self + .cache + .as_ref() + .and_then(|cache| cache.get_parquet_meta_data(self.region_id, self.file_id)) + { + return Ok(metadata); + } + + // Cache miss. + let metadata = self.reader.get_metadata().await?; + // Cache the metadata. + if let Some(cache) = &self.cache { + cache.put_parquet_meta_data(self.region_id, self.file_id, metadata.clone()); + } + + Ok(metadata) + } + .boxed() + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2f72724fd8a1..4b119c69959f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -39,6 +39,7 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; +use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; @@ -95,8 +96,12 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping"; /// Chan1 --> WorkerThread1 /// ``` pub(crate) struct WorkerGroup { + /// Workers of the group. workers: Vec, + /// Global background job scheduelr. scheduler: SchedulerRef, + /// Cache. + cache_manager: CacheManagerRef, } impl WorkerGroup { @@ -114,6 +119,7 @@ impl WorkerGroup { config.global_write_buffer_size.as_bytes() as usize, )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes())); let workers = (0..config.num_workers) .map(|id| { @@ -125,12 +131,17 @@ impl WorkerGroup { write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::default(), + cache_manager: cache_manager.clone(), } .start() }) .collect(); - WorkerGroup { workers, scheduler } + WorkerGroup { + workers, + scheduler, + cache_manager, + } } /// Stops the worker group. @@ -166,6 +177,11 @@ impl WorkerGroup { self.worker(region_id).get_region(region_id) } + /// Returns cache of the group. + pub(crate) fn cache_manager(&self) -> CacheManagerRef { + self.cache_manager.clone() + } + /// Get worker for specific `region_id`. fn worker(&self, region_id: RegionId) -> &RegionWorker { let mut hasher = DefaultHasher::new(); @@ -193,6 +209,7 @@ impl WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes())); let workers = (0..config.num_workers) .map(|id| { @@ -204,12 +221,17 @@ impl WorkerGroup { write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::new(listener.clone()), + cache_manager: cache_manager.clone(), } .start() }) .collect(); - WorkerGroup { workers, scheduler } + WorkerGroup { + workers, + scheduler, + cache_manager, + } } } @@ -226,6 +248,7 @@ struct WorkerStarter { write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, listener: WorkerListener, + cache_manager: CacheManagerRef, } impl WorkerStarter { @@ -254,6 +277,7 @@ impl WorkerStarter { compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()), stalled_requests: StalledRequests::default(), listener: self.listener, + cache_manager: self.cache_manager, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -376,7 +400,7 @@ impl StalledRequests { /// Background worker loop to handle requests. struct RegionWorkerLoop { - // Id of the worker. 
+ /// Id of the worker. id: WorkerId, /// Engine config. config: Arc, @@ -408,6 +432,8 @@ struct RegionWorkerLoop { stalled_requests: StalledRequests, /// Event listener for tests. listener: WorkerListener, + /// Cache. + cache_manager: CacheManagerRef, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 848082956894..e9ace31044a5 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -65,6 +65,7 @@ impl RegionWorkerLoop { ) .metadata(metadata) .options(request.options) + .cache(Some(self.cache_manager.clone())) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index ab6967c0d1b6..cfeb43d84d98 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -63,6 +63,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .options(request.options) + .cache(Some(self.cache_manager.clone())) .open(&self.config, &self.wal) .await?; diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index a36962289d85..a6223aded686 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -19,7 +19,7 @@ datafusion-expr.workspace = true datafusion.workspace = true datatypes = { workspace = true } meta-client = { workspace = true } -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } serde.workspace = true serde_json = "1.0" snafu.workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0317482da7f4..2d9ddc6289f4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -662,6 +662,7 @@ max_background_jobs = 4 auto_flush_interval = "30m" global_write_buffer_size = "1GiB" global_write_buffer_reject_size = "2GiB" +sst_meta_cache_size = "128MiB" [[region_engine]]
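
Notes below are illustrative sketches, not part of the patch. First, the size-bounded cache: `CacheManager` builds a `moka` cache whose `weigher` returns an approximate entry size in bytes, so `max_capacity` acts as a memory budget rather than an entry count, and a configured size of 0 disables caching entirely. A minimal stand-alone sketch of that construction, assuming moka 0.11 with its default sync cache and plain integers in place of `RegionId`/`FileId`:

```rust
use std::mem;
use std::sync::Arc;

use moka::sync::Cache;

/// Hypothetical stand-in for the (RegionId, FileId) cache key.
#[derive(Clone, Hash, PartialEq, Eq)]
struct MetaKey(u64, u128);

/// Builds a cache bounded by total weight (bytes), or `None` when disabled.
fn build_meta_cache(capacity_bytes: u64) -> Option<Cache<MetaKey, Arc<Vec<u8>>>> {
    if capacity_bytes == 0 {
        return None;
    }
    let cache = Cache::builder()
        .max_capacity(capacity_bytes)
        // The weigher and `max_capacity` must use the same unit; here both are bytes.
        .weigher(|k: &MetaKey, v: &Arc<Vec<u8>>| (mem::size_of_val(k) + v.len()) as u32)
        .build();
    Some(cache)
}

fn main() {
    let cache = build_meta_cache(64 * 1024 * 1024).expect("cache enabled");
    let key = MetaKey(1, 42);
    cache.insert(key.clone(), Arc::new(vec![0u8; 1024]));
    assert!(cache.get(&key).is_some());
    assert!(build_meta_cache(0).is_none()); // size 0 disables the cache
}
```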
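
The weigher needs per-entry sizes, which is what `cache_size.rs` estimates: start from `mem::size_of` for the owning struct, then add the heap memory reachable from it (strings, vectors, nested descriptors). A simplified sketch of that pattern on made-up metadata types (not the parquet ones):

```rust
use std::mem;

struct ColumnMeta {
    name: String,
    page_offsets: Vec<u64>,
}

struct FileMeta {
    created_by: Option<String>,
    columns: Vec<ColumnMeta>,
}

/// Rough bytes used by `meta`: the struct itself plus the heap data it owns.
fn file_meta_size(meta: &FileMeta) -> usize {
    let mut size = mem::size_of::<FileMeta>();
    // Heap part of `created_by`.
    size += meta.created_by.as_ref().map(|s| s.len()).unwrap_or(0);
    // The columns vector's buffer ...
    size += meta.columns.len() * mem::size_of::<ColumnMeta>();
    // ... plus each column's own heap allocations.
    size += meta
        .columns
        .iter()
        .map(|c| c.name.len() + c.page_offsets.len() * mem::size_of::<u64>())
        .sum::<usize>();
    size
}
```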
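
One detail worth calling out: `row_group_meta_heap_size` applies `mem::size_of_val` to the column-chunk slice, which for a slice expands to `len * size_of::<element>()`, so the whole vector is accounted for in a single call. A tiny illustration:

```rust
use std::mem;

fn slice_size<T>(items: &[T]) -> usize {
    // For a slice, `size_of_val` is `items.len() * mem::size_of::<T>()`.
    mem::size_of_val(items)
}

fn main() {
    let v: Vec<u64> = vec![1, 2, 3, 4];
    assert_eq!(slice_size(&v), 4 * mem::size_of::<u64>());
}
```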
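
On the read path, `AsyncFileReaderCache` wraps the underlying reader and resolves `get_metadata` in three steps: a per-reader memoized copy, then the shared cache, then the file itself, publishing the result on a miss. A simplified synchronous sketch of that lookup order, using invented `MetaSource`/`CachedSource` names rather than the parquet `AsyncFileReader` trait (the patch leaves the per-reader copy unset for now, per its TODO, but the order is the same):

```rust
use std::sync::Arc;

use moka::sync::Cache;

/// Hypothetical source of metadata, standing in for the real file reader.
trait MetaSource {
    fn load_metadata(&mut self) -> Arc<Vec<u8>>;
}

struct CachedSource<S> {
    inner: S,
    /// Copy memoized for this reader only.
    local: Option<Arc<Vec<u8>>>,
    /// Cache shared across readers; `None` when caching is disabled.
    shared: Option<Cache<u64, Arc<Vec<u8>>>>,
    key: u64,
}

impl<S: MetaSource> CachedSource<S> {
    fn metadata(&mut self) -> Arc<Vec<u8>> {
        // 1. Per-reader copy.
        if let Some(m) = &self.local {
            return m.clone();
        }
        // 2. Shared cache.
        if let Some(m) = self.shared.as_ref().and_then(|c| c.get(&self.key)) {
            self.local = Some(m.clone());
            return m;
        }
        // 3. Miss: read from the source, then publish to the shared cache.
        let m = self.inner.load_metadata();
        if let Some(cache) = &self.shared {
            cache.insert(self.key, m.clone());
        }
        self.local = Some(m.clone());
        m
    }
}
```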
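
Finally, consistency on deletion: `LocalFilePurger` drops the cached footer before scheduling the SST deletion, so a purged file cannot keep serving metadata out of the cache. A minimal sketch of that ordering, with integer stand-ins for the ids and moka's `invalidate` used for removal:

```rust
use std::sync::Arc;

use moka::sync::Cache;

type MetaCache = Cache<(u64, u128), Arc<Vec<u8>>>;

struct Purger {
    /// `None` when the metadata cache is disabled.
    cache: Option<MetaCache>,
}

impl Purger {
    fn purge(&self, region_id: u64, file_id: u128) {
        // Invalidate the cached entry first so later reads miss instead of
        // returning metadata for a file that is about to be deleted.
        if let Some(cache) = &self.cache {
            cache.invalidate(&(region_id, file_id));
        }
        // ... then schedule the actual file deletion (elided).
    }
}
```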