From 0a8d0d35ffb2a8972209a0489f157c6da3a67f0c Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 24 Sep 2023 16:34:09 +0800 Subject: [PATCH 01/15] feat: add cache manager --- src/mito2/src/cache.rs | 34 +++++++++++++++++++++++++++++++ src/mito2/src/config.rs | 5 +++++ src/mito2/src/engine.rs | 5 ++++- src/mito2/src/lib.rs | 1 + src/mito2/src/read/scan_region.rs | 5 +++++ src/mito2/src/read/seq_scan.rs | 18 ++++++++++++---- src/mito2/src/worker.rs | 26 ++++++++++++++++++++--- 7 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 src/mito2/src/cache.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs new file mode 100644 index 000000000000..70b07ee91883 --- /dev/null +++ b/src/mito2/src/cache.rs @@ -0,0 +1,34 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache for the engine. + +use std::sync::Arc; + +/// Manages cached data for the engine. +pub struct CacheManager {} + +pub type CacheManagerRef = Arc; + +impl CacheManager { + /// Creates a new manager with specific cache capacity in bytes. + /// Returns `None` if `capacity` is 0. + pub fn new(capacity: usize) -> Option { + if capacity == 0 { + None + } else { + Some(CacheManager {}) + } + } +} diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 3ca21a1aec91..1c9fdda73682 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -56,6 +56,10 @@ pub struct MitoConfig { pub global_write_buffer_size: ReadableSize, /// Global write buffer size threshold to reject write requests (default 2G). pub global_write_buffer_reject_size: ReadableSize, + + // Cache configs: + /// Total size for cache (default 512MB). Setting it to 0 to disable cache. + pub cache_size: ReadableSize, } impl Default for MitoConfig { @@ -70,6 +74,7 @@ impl Default for MitoConfig { auto_flush_interval: Duration::from_secs(30 * 60), global_write_buffer_size: ReadableSize::gb(1), global_write_buffer_reject_size: ReadableSize::gb(2), + cache_size: ReadableSize::mb(512), } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 7cd1079d2d12..0674e2688ae3 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -144,7 +144,10 @@ impl EngineInner { .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; let version = region.version(); - let scan_region = ScanRegion::new(version, region.access_layer.clone(), request); + // Get cache. 
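+ // The manager is owned by the worker group, so scans of all regions in
+ // this engine instance share the same cache.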
+ let cache_manager = self.workers.cache_manager(); + let scan_region = + ScanRegion::new(version, region.access_layer.clone(), request, cache_manager); scan_region.scanner() } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 71d67983f091..dd404f2013b6 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -22,6 +22,7 @@ pub mod test_util; mod access_layer; +mod cache; mod compaction; pub mod config; pub mod engine; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index be8b318d83ff..57eb3cb5c550 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -22,6 +22,7 @@ use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::error::{BuildPredicateSnafu, Result}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; @@ -113,6 +114,8 @@ pub(crate) struct ScanRegion { access_layer: AccessLayerRef, /// Scan request. request: ScanRequest, + /// Cache. + cache_manager: Option, } impl ScanRegion { @@ -121,11 +124,13 @@ impl ScanRegion { version: VersionRef, access_layer: AccessLayerRef, request: ScanRequest, + cache_manager: Option, ) -> ScanRegion { ScanRegion { version, access_layer, request, + cache_manager, } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 148cb3777142..85945b9c1277 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -25,6 +25,7 @@ use snafu::ResultExt; use table::predicate::Predicate; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::error::Result; use crate::memtable::MemtableRef; use crate::read::compat::{self, CompatReader}; @@ -49,6 +50,8 @@ pub struct SeqScan { memtables: Vec, /// Handles to SST files to scan. files: Vec, + /// Cache. + cache_manager: Option, } impl SeqScan { @@ -62,37 +65,44 @@ impl SeqScan { predicate: None, memtables: Vec::new(), files: Vec::new(), + cache_manager, } } - /// Set time range filter for time index. + /// Sets time range filter for time index. #[must_use] pub(crate) fn with_time_range(mut self, time_range: Option) -> Self { self.time_range = time_range; self } - /// Set predicate to push down. + /// Sets predicate to push down. #[must_use] pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } - /// Set memtables to read. + /// Sets memtables to read. #[must_use] pub(crate) fn with_memtables(mut self, memtables: Vec) -> Self { self.memtables = memtables; self } - /// Set files to read. + /// Sets files to read. #[must_use] pub(crate) fn with_files(mut self, files: Vec) -> Self { self.files = files; self } + /// Sets cache for this query. + pub(crate) fn with_cache(mut self, cache: Option) -> Self { + self.cache_manager = cache; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. 
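
// A minimal sketch (not part of the patch) of how the builder methods above
// compose once the scan path threads the cache through; the variable setup
// here is hypothetical, only the method names come from this diff:
//
// let seq_scan = SeqScan::new(access_layer, mapper)
//     .with_time_range(Some(time_range))
//     .with_predicate(Some(predicate))
//     .with_memtables(memtables)
//     .with_files(files)
//     .with_cache(cache_manager); // cache_manager: Option<CacheManagerRef>
// let stream = seq_scan.build_stream().await?;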
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2f72724fd8a1..283382cda3c2 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -39,6 +39,7 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; +use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; @@ -95,8 +96,12 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping"; /// Chan1 --> WorkerThread1 /// ``` pub(crate) struct WorkerGroup { + /// Workers of the group. workers: Vec, + /// Global background job scheduelr. scheduler: SchedulerRef, + /// Cache. + cache_manager: Option, } impl WorkerGroup { @@ -114,6 +119,7 @@ impl WorkerGroup { config.global_write_buffer_size.as_bytes() as usize, )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + let cache_manager = CacheManager::new(config.cache_size.as_bytes() as usize).map(Arc::new); let workers = (0..config.num_workers) .map(|id| { @@ -130,7 +136,11 @@ impl WorkerGroup { }) .collect(); - WorkerGroup { workers, scheduler } + WorkerGroup { + workers, + scheduler, + cache_manager, + } } /// Stops the worker group. @@ -166,6 +176,11 @@ impl WorkerGroup { self.worker(region_id).get_region(region_id) } + /// Returns cache of the group. + pub(crate) fn cache_manager(&self) -> Option { + self.cache_manager.clone() + } + /// Get worker for specific `region_id`. fn worker(&self, region_id: RegionId) -> &RegionWorker { let mut hasher = DefaultHasher::new(); @@ -193,6 +208,7 @@ impl WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + let cache_manager = CacheManager::new(config.cache_size.as_bytes() as usize).map(Arc::new); let workers = (0..config.num_workers) .map(|id| { @@ -209,7 +225,11 @@ impl WorkerGroup { }) .collect(); - WorkerGroup { workers, scheduler } + WorkerGroup { + workers, + scheduler, + cache_manager, + } } } @@ -376,7 +396,7 @@ impl StalledRequests { /// Background worker loop to handle requests. struct RegionWorkerLoop { - // Id of the worker. + /// Id of the worker. id: WorkerId, /// Engine config. 
config: Arc, From 18aa884da366243272060501befa12ba71b69d0d Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 24 Sep 2023 16:39:28 +0800 Subject: [PATCH 02/15] feat: add cache to reader builder --- src/mito2/src/read/seq_scan.rs | 3 ++- src/mito2/src/sst/parquet/reader.rs | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 85945b9c1277..3a3a6a1eaad8 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -65,7 +65,7 @@ impl SeqScan { predicate: None, memtables: Vec::new(), files: Vec::new(), - cache_manager, + cache_manager: None, } } @@ -139,6 +139,7 @@ impl SeqScan { .predicate(self.predicate.clone()) .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) + .cache(self.cache_manager.clone()) .build() .await?; if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6280f83eef10..3a78cb9a447d 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -32,6 +32,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use tokio::io::BufReader; +use crate::cache::CacheManagerRef; use crate::error::{ InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; @@ -55,6 +56,8 @@ pub struct ParquetReaderBuilder { /// `None` reads all columns. Due to schema change, the projection /// can contain columns not in the parquet file. projection: Option>, + /// Manager that caches SST data. + cache_manager: Option, } impl ParquetReaderBuilder { @@ -71,6 +74,7 @@ impl ParquetReaderBuilder { predicate: None, time_range: None, projection: None, + cache_manager: None, } } @@ -94,6 +98,12 @@ impl ParquetReaderBuilder { self } + /// Attaches the cache to the builder. + pub fn cache(mut self, cache: Option) -> ParquetReaderBuilder { + self.cache_manager = cache; + self + } + /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. From 7e27b2a945cd685dbfe95464ce86abd2b408ba65 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 24 Sep 2023 16:52:42 +0800 Subject: [PATCH 03/15] feat: add AsyncFileReaderCache --- src/mito2/src/sst/parquet/reader.rs | 37 +++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 3a78cb9a447d..2ddbc3803a71 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,17 +14,22 @@ //! Parquet reader. +use std::ops::Range; use std::sync::Arc; use async_compat::CompatExt; use async_trait::async_trait; +use bytes::Bytes; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; +use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::TryStreamExt; use object_store::ObjectStore; +use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::errors::ParquetError; +use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; @@ -129,8 +134,12 @@ impl ParquetReaderBuilder { .await .context(OpenDalSnafu)? 
.compat(); - let buf_reader = BufReader::new(reader); - let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader) + let reader = BufReader::new(reader); + let reader = AsyncFileReaderCache { + reader, + cache: self.cache_manager.clone(), + }; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader) .await .context(ReadParquetSnafu { path: file_path })?; @@ -259,3 +268,27 @@ impl ParquetReader { self.read_format.metadata() } } + +/// Cache layer for parquet's [AsyncFileReader]. +struct AsyncFileReaderCache { + reader: T, + cache: Option, +} + +impl AsyncFileReader for AsyncFileReaderCache { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.reader.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, Result, ParquetError>> { + self.reader.get_byte_ranges(ranges) + } + + fn get_metadata(&mut self) -> BoxFuture<'_, Result, ParquetError>> { + // TODO(yingwen): cache metadata. + self.reader.get_metadata() + } +} From 26dca2fc39a4c82a4c40a05cee66c5ef46f692f1 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 24 Sep 2023 17:50:21 +0800 Subject: [PATCH 04/15] feat: Impl AsyncFileReaderCache --- src/mito2/src/cache.rs | 25 ++++++++++++++++ src/mito2/src/sst/file.rs | 6 ++++ src/mito2/src/sst/parquet/reader.rs | 44 +++++++++++++++++++++++++---- 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 70b07ee91883..32783a22ab2b 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -16,6 +16,11 @@ use std::sync::Arc; +use parquet::file::metadata::ParquetMetaData; +use store_api::storage::RegionId; + +use crate::sst::file::FileId; + /// Manages cached data for the engine. pub struct CacheManager {} @@ -31,4 +36,24 @@ impl CacheManager { Some(CacheManager {}) } } + + /// Gets cached [ParquetMetaData]. + pub fn get_parquet_meta_data( + &self, + _region_id: RegionId, + _file_id: FileId, + ) -> Option> { + // TODO(yingwen): Implements it. + None + } + + /// Puts [ParquetMetaData] into the cache. + pub fn put_parquet_meta_data( + &self, + _region_id: RegionId, + _file_id: FileId, + _metadata: Arc, + ) { + // TODO(yingwen): Implements it. + } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index ecf961854b7b..a16987690d09 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -118,6 +118,12 @@ impl FileHandle { inner: Arc::new(FileHandleInner::new(meta, file_purger)), } } + + /// Returns the region id of the file. + pub fn region_id(&self) -> RegionId { + self.inner.meta.region_id + } + /// Returns the file id. 
pub fn file_id(&self) -> FileId { self.inner.meta.file_id diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 2ddbc3803a71..003630845efd 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -24,7 +24,7 @@ use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use object_store::ObjectStore; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; @@ -33,7 +33,7 @@ use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use store_api::storage::ColumnId; +use store_api::storage::{ColumnId, RegionId}; use table::predicate::Predicate; use tokio::io::BufReader; @@ -42,7 +42,7 @@ use crate::error::{ InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; use crate::read::{Batch, BatchReader}; -use crate::sst::file::FileHandle; +use crate::sst::file::{FileHandle, FileId}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::PARQUET_METADATA_KEY; @@ -137,7 +137,10 @@ impl ParquetReaderBuilder { let reader = BufReader::new(reader); let reader = AsyncFileReaderCache { reader, + metadata: None, cache: self.cache_manager.clone(), + region_id: self.file_handle.region_id(), + file_id: self.file_handle.file_id(), }; let mut builder = ParquetRecordBatchStreamBuilder::new(reader) .await @@ -271,8 +274,16 @@ impl ParquetReader { /// Cache layer for parquet's [AsyncFileReader]. struct AsyncFileReaderCache { + /// Underlying async file reader. reader: T, + /// Parquet metadata cached locally. + metadata: Option>, + /// Global cache. cache: Option, + /// Id of the region. + region_id: RegionId, + /// Id of the file to read. + file_id: FileId, } impl AsyncFileReader for AsyncFileReaderCache { @@ -288,7 +299,30 @@ impl AsyncFileReader for AsyncFileReaderCache { } fn get_metadata(&mut self) -> BoxFuture<'_, Result, ParquetError>> { - // TODO(yingwen): cache metadata. - self.reader.get_metadata() + async { + // Tries to get from local cache. + if let Some(metadata) = &self.metadata { + return Ok(metadata.clone()); + } + + // Tries to get from global cache. + if let Some(metadata) = self + .cache + .as_ref() + .and_then(|cache| cache.get_parquet_meta_data(self.region_id, self.file_id)) + { + return Ok(metadata); + } + + // Cache miss. + let metadata = self.reader.get_metadata().await?; + // Cache the metadata. 
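+ // Subsequent readers of the same (region, file) pair can then skip
+ // re-reading the footer from object storage.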
+ if let Some(cache) = &self.cache { + cache.put_parquet_meta_data(self.region_id, self.file_id, metadata.clone()); + } + + Ok(metadata) + } + .boxed() } } From 5ee49bd89d7ef47a0b42017275797f6afa12b827 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 24 Sep 2023 19:16:47 +0800 Subject: [PATCH 05/15] chore: move moka dep to workspace --- Cargo.lock | 34 ++++------------------------------ Cargo.toml | 1 + src/catalog/Cargo.toml | 2 +- src/client/Cargo.toml | 2 +- src/frontend/Cargo.toml | 2 +- src/mito2/Cargo.toml | 1 + src/partition/Cargo.toml | 2 +- 7 files changed, 10 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1eaf5b4eaf36..e179470ad696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1266,7 +1266,7 @@ dependencies = [ "log-store", "meta-client", "metrics", - "moka 0.11.3", + "moka", "object-store", "parking_lot 0.12.1", "partition", @@ -1544,7 +1544,7 @@ dependencies = [ "derive_builder 0.12.0", "enum_dispatch", "futures-util", - "moka 0.9.9", + "moka", "parking_lot 0.12.1", "prost", "rand", @@ -3347,7 +3347,7 @@ dependencies = [ "meta-client", "meta-srv", "metrics", - "moka 0.9.9", + "moka", "object-store", "openmetrics-parser", "opentelemetry-proto", @@ -5569,32 +5569,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "moka" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b28455ac4363046076054a7e9cfbd7f168019c29dba32a625f59fc0aeffaaea4" -dependencies = [ - "async-io", - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "futures-util", - "num_cpus", - "once_cell", - "parking_lot 0.12.1", - "quanta 0.11.1", - "rustc_version", - "scheduled-thread-pool", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid", -] - [[package]] name = "moka" version = "0.11.3" @@ -6511,7 +6485,7 @@ dependencies = [ "datafusion-expr", "datatypes", "meta-client", - "moka 0.9.9", + "moka", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index 426b07d468b2..d1d56b343b6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" +moka = { version = "0.11" } once_cell = "1.18" opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } parquet = "43.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index b9ff8c19ae97..c8c2c2828d1a 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -30,7 +30,7 @@ futures-util.workspace = true lazy_static.workspace = true meta-client = { workspace = true } metrics.workspace = true -moka = { version = "0.11", features = ["future"] } +moka = { workspace = true, features = ["future"] } parking_lot = "0.12" partition.workspace = true regex.workspace = true diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index dde80a01943b..0b4f656fbc13 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -26,7 +26,7 @@ datatypes = { workspace = true } derive_builder.workspace = true enum_dispatch = "0.3" futures-util.workspace = true -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } parking_lot = "0.12" prost.workspace = true rand.workspace = true diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 285ad121ba63..bc6e48d91155 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -50,7 +50,7 @@ meta-client = { workspace = true } raft-engine = { workspace = 
true } # Although it is not used, please do not delete it. metrics.workspace = true -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } object-store = { workspace = true } openmetrics-parser = "0.4" opentelemetry-proto.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0a1ff27f9e4c..2d884ae11e55 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -39,6 +39,7 @@ humantime-serde = { workspace = true } lazy_static = "1.4" memcomparable = "0.2" metrics.workspace = true +moka.workspace = true object-store = { workspace = true } parquet = { workspace = true, features = ["async"] } paste.workspace = true diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index a36962289d85..a6223aded686 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -19,7 +19,7 @@ datafusion-expr.workspace = true datafusion.workspace = true datatypes = { workspace = true } meta-client = { workspace = true } -moka = { version = "0.9", features = ["future"] } +moka = { workspace = true, features = ["future"] } serde.workspace = true serde_json = "1.0" snafu.workspace = true From 81c3447a1881c8e385f58ba1430608a6f76dc6ef Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 15:59:44 +0800 Subject: [PATCH 06/15] feat: add moka cache to the manager --- Cargo.lock | 1 + src/mito2/src/cache.rs | 22 +++++++++++++++++++--- src/mito2/src/sst/parquet/reader.rs | 1 + src/mito2/src/worker.rs | 4 ++-- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e179470ad696..3581dd777ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5551,6 +5551,7 @@ dependencies = [ "log-store", "memcomparable", "metrics", + "moka", "object-store", "parquet", "paste", diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 32783a22ab2b..004e6d0b866a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -16,24 +16,32 @@ use std::sync::Arc; +use moka::sync::Cache; use parquet::file::metadata::ParquetMetaData; use store_api::storage::RegionId; use crate::sst::file::FileId; /// Manages cached data for the engine. -pub struct CacheManager {} +pub struct CacheManager { + cache: Cache, +} pub type CacheManagerRef = Arc; impl CacheManager { /// Creates a new manager with specific cache capacity in bytes. /// Returns `None` if `capacity` is 0. - pub fn new(capacity: usize) -> Option { + pub fn new(capacity: u64) -> Option { if capacity == 0 { None } else { - Some(CacheManager {}) + let cache = Cache::builder() + .max_capacity(capacity) + .build(); + Some(CacheManager { + cache, + }) } } @@ -57,3 +65,11 @@ impl CacheManager { // TODO(yingwen): Implements it. } } + +/// Cached value. +/// It can hold different kinds of data. +#[derive(Clone)] +enum CacheValue { + /// Parquet meta data. + ParquetMeta(Arc), +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 003630845efd..a91d781dad92 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -137,6 +137,7 @@ impl ParquetReaderBuilder { let reader = BufReader::new(reader); let reader = AsyncFileReaderCache { reader, + // TODO(yingwen): Sets the metadata when we implement row group level reader. 
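+ // The local field stays `None` for now, so `get_metadata` falls through
+ // to the global cache below.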
metadata: None, cache: self.cache_manager.clone(), region_id: self.file_handle.region_id(), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 283382cda3c2..a5d9232ee022 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -119,7 +119,7 @@ impl WorkerGroup { config.global_write_buffer_size.as_bytes() as usize, )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = CacheManager::new(config.cache_size.as_bytes() as usize).map(Arc::new); + let cache_manager = CacheManager::new(config.cache_size.as_bytes()).map(Arc::new); let workers = (0..config.num_workers) .map(|id| { @@ -208,7 +208,7 @@ impl WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = CacheManager::new(config.cache_size.as_bytes() as usize).map(Arc::new); + let cache_manager = CacheManager::new(config.cache_size.as_bytes()).map(Arc::new); let workers = (0..config.num_workers) .map(|id| { From 4c26056913684e2c6784a69183d5aed7ba19c03f Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 16:04:24 +0800 Subject: [PATCH 07/15] feat: implement parquet meta cache --- src/mito2/src/cache.rs | 67 ++++++++++++++--- src/mito2/src/cache/cache_size.rs | 120 ++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 12 deletions(-) create mode 100644 src/mito2/src/cache/cache_size.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 004e6d0b866a..93ebcf56d4e8 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -14,17 +14,21 @@ //! Cache for the engine. +mod cache_size; + +use std::mem; use std::sync::Arc; use moka::sync::Cache; use parquet::file::metadata::ParquetMetaData; use store_api::storage::RegionId; +use crate::cache::cache_size::parquet_meta_size; use crate::sst::file::FileId; /// Manages cached data for the engine. pub struct CacheManager { - cache: Cache, + cache: Cache, } pub type CacheManagerRef = Arc; @@ -38,31 +42,53 @@ impl CacheManager { } else { let cache = Cache::builder() .max_capacity(capacity) + .weigher(|k: &CacheKey, v: &CacheValue| { + (k.estimated_size() + v.estimated_size()) as u32 + }) .build(); - Some(CacheManager { - cache, - }) + Some(CacheManager { cache }) } } /// Gets cached [ParquetMetaData]. pub fn get_parquet_meta_data( &self, - _region_id: RegionId, - _file_id: FileId, + region_id: RegionId, + file_id: FileId, ) -> Option> { - // TODO(yingwen): Implements it. - None + self.cache + .get(&CacheKey::ParquetMeta(region_id, file_id)) + .map(|v| { + // Safety: key and value have the same type. + v.into_parquet_meta().unwrap() + }) } /// Puts [ParquetMetaData] into the cache. pub fn put_parquet_meta_data( &self, - _region_id: RegionId, - _file_id: FileId, - _metadata: Arc, + region_id: RegionId, + file_id: FileId, + metadata: Arc, ) { - // TODO(yingwen): Implements it. + self.cache.insert( + CacheKey::ParquetMeta(region_id, file_id), + CacheValue::ParquetMeta(metadata), + ); + } +} + +/// Cache key. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum CacheKey { + /// Parquet meta data. + ParquetMeta(RegionId, FileId), +} + +impl CacheKey { + /// Returns memory used by the key (estimated). + fn estimated_size(&self) -> usize { + mem::size_of::() } } @@ -73,3 +99,20 @@ enum CacheValue { /// Parquet meta data. ParquetMeta(Arc), } + +impl CacheValue { + /// Returns memory used by the value (estimated). 
+ fn estimated_size(&self) -> usize { + let inner_size = match self { + CacheValue::ParquetMeta(meta) => parquet_meta_size(&meta), + }; + inner_size + mem::size_of::() + } + + /// Convert to parquet meta. + fn into_parquet_meta(self) -> Option> { + match self { + CacheValue::ParquetMeta(meta) => Some(meta), + } + } +} diff --git a/src/mito2/src/cache/cache_size.rs b/src/mito2/src/cache/cache_size.rs new file mode 100644 index 000000000000..155c1cb68a4a --- /dev/null +++ b/src/mito2/src/cache/cache_size.rs @@ -0,0 +1,120 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache size of different cache value. + +use std::mem; + +use parquet::file::metadata::{ + ColumnChunkMetaData, FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, + RowGroupMetaData, +}; +use parquet::file::page_index::index::Index; +use parquet::format::{ColumnOrder, KeyValue, PageLocation}; +use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type}; + +/// Returns estimated size of [ParquetMetaData]. +pub fn parquet_meta_size(meta: &ParquetMetaData) -> usize { + // struct size + let mut size = mem::size_of::(); + // file_metadata + size += file_meta_heap_size(meta.file_metadata()); + // row_groups + size += meta + .row_groups() + .iter() + .map(|row_group| row_group_meta_heap_size(row_group)) + .sum::(); + // column_index + size += meta + .column_index() + .map(|index| parquet_column_index_heap_size(index)) + .unwrap_or(0); + // offset_index + size += meta + .offset_index() + .map(|index| parquet_offset_index_heap_size(index)) + .unwrap_or(0); + + size +} + +/// Returns estimated size of [FileMetaData] allocated from heap. +fn file_meta_heap_size(meta: &FileMetaData) -> usize { + // created_by + let mut size = meta.created_by().map(|s| s.len()).unwrap_or(0); + // key_value_metadata + size += meta + .key_value_metadata() + .map(|kvs| { + kvs.iter() + .map(|kv| { + kv.key.len() + + kv.value.as_ref().map(|v| v.len()).unwrap_or(0) + + mem::size_of::() + }) + .sum() + }) + .unwrap_or(0); + // schema_descr (It's a ptr so we also add size of SchemaDescriptor). + size += mem::size_of::(); + size += schema_descr_heap_size(meta.schema_descr()); + // column_orders + size += meta + .column_orders() + .map(|orders| orders.len() * mem::size_of::()) + .unwrap_or(0); + + size +} + +/// Returns estimated size of [SchemaDescriptor] allocated from heap. +fn schema_descr_heap_size(descr: &SchemaDescriptor) -> usize { + // schema + let mut size = mem::size_of::(); + // leaves + size += descr + .columns() + .iter() + .map(|descr| mem::size_of::() + column_descr_heap_size(&descr)) + .sum::(); + // leaf_to_base + size += descr.num_columns() * mem::size_of::(); + + size +} + +/// Returns estimated size of [ColumnDescriptor] allocated from heap. +fn column_descr_heap_size(descr: &ColumnDescriptor) -> usize { + descr.path().parts().iter().map(|s| s.len()).sum() +} + +/// Returns estimated size of [ColumnDescriptor] allocated from heap. 
+fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize { + meta.columns().len() * mem::size_of::() +} + +/// Returns estimated size of [ParquetColumnIndex] allocated from heap. +fn parquet_column_index_heap_size(column_index: &ParquetColumnIndex) -> usize { + column_index.iter().map(|index| index.len()).sum::() * mem::size_of::() +} + +/// Returns estimated size of [ParquetOffsetIndex] allocated from heap. +fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize { + offset_index + .iter() + .map(|index| index.iter().map(|index| index.len()).sum::()) + .sum::() + * mem::size_of::() +} From 5000b41285ca7dcd22812fec56bb0464a18ba14d Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 16:55:17 +0800 Subject: [PATCH 08/15] test: test cache manager --- src/mito2/src/cache.rs | 24 +++++++++++++++++ src/mito2/src/cache/cache_size.rs | 13 +++++++++ src/mito2/src/cache/test_util.rs | 44 +++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 src/mito2/src/cache/test_util.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 93ebcf56d4e8..5960f80210be 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -15,6 +15,8 @@ //! Cache for the engine. mod cache_size; +#[cfg(test)] +pub(crate) mod test_util; use std::mem; use std::sync::Arc; @@ -116,3 +118,25 @@ impl CacheValue { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::cache::test_util::parquet_meta; + + #[test] + fn test_capacity_zero() { + assert!(CacheManager::new(0).is_none()); + } + + #[test] + fn test_parquet_meta_cache() { + let cache = CacheManager::new(2000).unwrap(); + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); + let metadata = parquet_meta(); + cache.put_parquet_meta_data(region_id, file_id, metadata); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_some()); + } +} diff --git a/src/mito2/src/cache/cache_size.rs b/src/mito2/src/cache/cache_size.rs index 155c1cb68a4a..0dbd1ef5524c 100644 --- a/src/mito2/src/cache/cache_size.rs +++ b/src/mito2/src/cache/cache_size.rs @@ -118,3 +118,16 @@ fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize { .sum::() * mem::size_of::() } + +#[cfg(test)] +mod tests { + use super::*; + use crate::cache::test_util::parquet_meta; + + #[test] + fn test_parquet_meta_size() { + let metadata = parquet_meta(); + + assert_eq!(948, parquet_meta_size(&metadata)); + } +} diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs new file mode 100644 index 000000000000..deb4ba23cd7b --- /dev/null +++ b/src/mito2/src/cache/test_util.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing cache. 
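+//! Writes a tiny parquet file into an in-memory buffer so tests can obtain
+//! real [ParquetMetaData] without touching object storage.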
+ +use std::sync::Arc; + +use bytes::Bytes; +use datatypes::arrow::array::{ArrayRef, Int64Array}; +use datatypes::arrow::record_batch::RecordBatch; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use parquet::arrow::ArrowWriter; +use parquet::file::metadata::ParquetMetaData; + +/// Returns a parquet meta data. +pub(crate) fn parquet_meta() -> Arc { + let file_data = parquet_file_data(); + let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(file_data)).unwrap(); + builder.metadata().clone() +} + +/// Write a test parquet file to a buffer +fn parquet_file_data() -> Vec { + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap(); + writer.write(&to_write).unwrap(); + writer.close().unwrap(); + + buffer +} From 88e8f37766f552dcafbd683ec1e529b73aee9ea5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 17:06:25 +0800 Subject: [PATCH 09/15] feat: consider vec size --- src/mito2/src/cache/cache_size.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/cache/cache_size.rs b/src/mito2/src/cache/cache_size.rs index 0dbd1ef5524c..1620f8203ba1 100644 --- a/src/mito2/src/cache/cache_size.rs +++ b/src/mito2/src/cache/cache_size.rs @@ -107,16 +107,26 @@ fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize { /// Returns estimated size of [ParquetColumnIndex] allocated from heap. fn parquet_column_index_heap_size(column_index: &ParquetColumnIndex) -> usize { - column_index.iter().map(|index| index.len()).sum::() * mem::size_of::() + column_index + .iter() + .map(|row_group| row_group.len() * mem::size_of::() + mem::size_of_val(row_group)) + .sum() } /// Returns estimated size of [ParquetOffsetIndex] allocated from heap. fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize { offset_index .iter() - .map(|index| index.iter().map(|index| index.len()).sum::()) - .sum::() - * mem::size_of::() + .map(|row_group| { + row_group + .iter() + .map(|column| { + column.len() * mem::size_of::() + mem::size_of_val(column) + }) + .sum::() + + mem::size_of_val(row_group) + }) + .sum() } #[cfg(test)] From 27dc591e1b57c3f94ee33064d514f2f0e412f1cd Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 20:27:25 +0800 Subject: [PATCH 10/15] style: fix clippy --- src/mito2/src/cache.rs | 2 +- src/mito2/src/cache/cache_size.rs | 13 ++++++------- src/mito2/src/read/scan_region.rs | 3 ++- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 5960f80210be..364b1466039a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -106,7 +106,7 @@ impl CacheValue { /// Returns memory used by the value (estimated). 
fn estimated_size(&self) -> usize { let inner_size = match self { - CacheValue::ParquetMeta(meta) => parquet_meta_size(&meta), + CacheValue::ParquetMeta(meta) => parquet_meta_size(meta), }; inner_size + mem::size_of::() } diff --git a/src/mito2/src/cache/cache_size.rs b/src/mito2/src/cache/cache_size.rs index 1620f8203ba1..8ecd2d5e99c6 100644 --- a/src/mito2/src/cache/cache_size.rs +++ b/src/mito2/src/cache/cache_size.rs @@ -17,8 +17,7 @@ use std::mem; use parquet::file::metadata::{ - ColumnChunkMetaData, FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, - RowGroupMetaData, + FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData, }; use parquet::file::page_index::index::Index; use parquet::format::{ColumnOrder, KeyValue, PageLocation}; @@ -34,17 +33,17 @@ pub fn parquet_meta_size(meta: &ParquetMetaData) -> usize { size += meta .row_groups() .iter() - .map(|row_group| row_group_meta_heap_size(row_group)) + .map(row_group_meta_heap_size) .sum::(); // column_index size += meta .column_index() - .map(|index| parquet_column_index_heap_size(index)) + .map(parquet_column_index_heap_size) .unwrap_or(0); // offset_index size += meta .offset_index() - .map(|index| parquet_offset_index_heap_size(index)) + .map(parquet_offset_index_heap_size) .unwrap_or(0); size @@ -87,7 +86,7 @@ fn schema_descr_heap_size(descr: &SchemaDescriptor) -> usize { size += descr .columns() .iter() - .map(|descr| mem::size_of::() + column_descr_heap_size(&descr)) + .map(|descr| mem::size_of::() + column_descr_heap_size(descr)) .sum::(); // leaf_to_base size += descr.num_columns() * mem::size_of::(); @@ -102,7 +101,7 @@ fn column_descr_heap_size(descr: &ColumnDescriptor) -> usize { /// Returns estimated size of [ColumnDescriptor] allocated from heap. fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize { - meta.columns().len() * mem::size_of::() + mem::size_of_val(meta.columns()) } /// Returns estimated size of [ParquetColumnIndex] allocated from heap. 
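
// A minimal sketch (not from the patch) of how `mem::size_of_val` behaves in
// the estimators above, assuming a 64-bit target:
//
// use std::mem;
//
// let v: Vec<u64> = vec![1, 2, 3];
// // Via a slice, size_of_val measures the pointed-to payload:
// assert_eq!(mem::size_of_val(v.as_slice()), 3 * mem::size_of::<u64>());
// // Via a &Vec, it measures only the Vec header (ptr/len/cap):
// assert_eq!(mem::size_of_val(&v), 3 * mem::size_of::<usize>());
// // So estimating a whole Vec combines header and payload, as above:
// let total = mem::size_of_val(&v) + v.len() * mem::size_of::<u64>();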
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 57eb3cb5c550..083238dcd06d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -186,7 +186,8 @@ impl ScanRegion { .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) .with_memtables(memtables) - .with_files(files); + .with_files(files) + .with_cache(self.cache_manager); Ok(seq_scan) } From f7eda5fb84e51cceb443edb83547fd83f168601c Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 25 Sep 2023 20:34:42 +0800 Subject: [PATCH 11/15] test: fix config api test --- tests-integration/tests/http.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0317482da7f4..a7f2ab8f3f88 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -662,6 +662,7 @@ max_background_jobs = 4 auto_flush_interval = "30m" global_write_buffer_size = "1GiB" global_write_buffer_reject_size = "2GiB" +cache_size = "512MiB" [[region_engine]] From ab447766e4d768a8c31dbb902c43eaec82d0716e Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 26 Sep 2023 15:11:02 +0800 Subject: [PATCH 12/15] feat: divide cache --- src/mito2/src/cache.rs | 80 ++++++++++++++--------------------------- src/mito2/src/config.rs | 6 ++-- src/mito2/src/engine.rs | 8 +++-- src/mito2/src/worker.rs | 8 ++--- 4 files changed, 40 insertions(+), 62 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 364b1466039a..78d99561de9a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -30,26 +30,29 @@ use crate::sst::file::FileId; /// Manages cached data for the engine. pub struct CacheManager { - cache: Cache, + /// Cache for SST metadata. + sst_meta_cache: Option, } pub type CacheManagerRef = Arc; impl CacheManager { - /// Creates a new manager with specific cache capacity in bytes. - /// Returns `None` if `capacity` is 0. - pub fn new(capacity: u64) -> Option { - if capacity == 0 { + /// Creates a new manager with specific cache size in bytes. + pub fn new(sst_meta_cache_size: u64) -> CacheManager { + let sst_meta_cache = if sst_meta_cache_size == 0 { None } else { let cache = Cache::builder() - .max_capacity(capacity) - .weigher(|k: &CacheKey, v: &CacheValue| { - (k.estimated_size() + v.estimated_size()) as u32 + .max_capacity(sst_meta_cache_size) + .weigher(|k: &SstMetaKey, v: &Arc| { + // We ignore the size of `Arc`. + (k.estimated_size() + parquet_meta_size(v)) as u32 }) .build(); - Some(CacheManager { cache }) - } + Some(cache) + }; + + CacheManager { sst_meta_cache } } /// Gets cached [ParquetMetaData]. @@ -58,12 +61,9 @@ impl CacheManager { region_id: RegionId, file_id: FileId, ) -> Option> { - self.cache - .get(&CacheKey::ParquetMeta(region_id, file_id)) - .map(|v| { - // Safety: key and value have the same type. - v.into_parquet_meta().unwrap() - }) + self.sst_meta_cache + .as_ref() + .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id))) } /// Puts [ParquetMetaData] into the cache. @@ -73,51 +73,24 @@ impl CacheManager { file_id: FileId, metadata: Arc, ) { - self.cache.insert( - CacheKey::ParquetMeta(region_id, file_id), - CacheValue::ParquetMeta(metadata), - ); + if let Some(cache) = &self.sst_meta_cache { + cache.insert(SstMetaKey(region_id, file_id), metadata); + } } } -/// Cache key. +/// Cache key for SST meta. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -enum CacheKey { - /// Parquet meta data. 
- ParquetMeta(RegionId, FileId), -} +struct SstMetaKey(RegionId, FileId); -impl CacheKey { +impl SstMetaKey { /// Returns memory used by the key (estimated). fn estimated_size(&self) -> usize { - mem::size_of::() + mem::size_of::() } } -/// Cached value. -/// It can hold different kinds of data. -#[derive(Clone)] -enum CacheValue { - /// Parquet meta data. - ParquetMeta(Arc), -} - -impl CacheValue { - /// Returns memory used by the value (estimated). - fn estimated_size(&self) -> usize { - let inner_size = match self { - CacheValue::ParquetMeta(meta) => parquet_meta_size(meta), - }; - inner_size + mem::size_of::() - } - - /// Convert to parquet meta. - fn into_parquet_meta(self) -> Option> { - match self { - CacheValue::ParquetMeta(meta) => Some(meta), - } - } -} +type SstMetaCache = Cache>; #[cfg(test)] mod tests { @@ -126,12 +99,13 @@ mod tests { #[test] fn test_capacity_zero() { - assert!(CacheManager::new(0).is_none()); + let cache = CacheManager::new(0); + assert!(cache.sst_meta_cache.is_none()); } #[test] fn test_parquet_meta_cache() { - let cache = CacheManager::new(2000).unwrap(); + let cache = CacheManager::new(2000); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 1c9fdda73682..f9e444cf38d3 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -58,8 +58,8 @@ pub struct MitoConfig { pub global_write_buffer_reject_size: ReadableSize, // Cache configs: - /// Total size for cache (default 512MB). Setting it to 0 to disable cache. - pub cache_size: ReadableSize, + /// Cache size for SST metadata (default 128MB). Setting it to 0 to disable cache. + pub sst_meta_cache_size: ReadableSize, } impl Default for MitoConfig { @@ -74,7 +74,7 @@ impl Default for MitoConfig { auto_flush_interval: Duration::from_secs(30 * 60), global_write_buffer_size: ReadableSize::gb(1), global_write_buffer_reject_size: ReadableSize::gb(2), - cache_size: ReadableSize::mb(512), + sst_meta_cache_size: ReadableSize::mb(128), } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 0674e2688ae3..987ecbf7e593 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -146,8 +146,12 @@ impl EngineInner { let version = region.version(); // Get cache. let cache_manager = self.workers.cache_manager(); - let scan_region = - ScanRegion::new(version, region.access_layer.clone(), request, cache_manager); + let scan_region = ScanRegion::new( + version, + region.access_layer.clone(), + request, + Some(cache_manager), + ); scan_region.scanner() } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a5d9232ee022..108e461a9918 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -101,7 +101,7 @@ pub(crate) struct WorkerGroup { /// Global background job scheduelr. scheduler: SchedulerRef, /// Cache. - cache_manager: Option, + cache_manager: CacheManagerRef, } impl WorkerGroup { @@ -119,7 +119,7 @@ impl WorkerGroup { config.global_write_buffer_size.as_bytes() as usize, )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = CacheManager::new(config.cache_size.as_bytes()).map(Arc::new); + let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes())); let workers = (0..config.num_workers) .map(|id| { @@ -177,7 +177,7 @@ impl WorkerGroup { } /// Returns cache of the group. 
- pub(crate) fn cache_manager(&self) -> Option { + pub(crate) fn cache_manager(&self) -> CacheManagerRef { self.cache_manager.clone() } @@ -208,7 +208,7 @@ impl WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = CacheManager::new(config.cache_size.as_bytes()).map(Arc::new); + let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes())); let workers = (0..config.num_workers) .map(|id| { From 343dc7930927cbfd9a5d19f9a395f1090b4dd74a Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 26 Sep 2023 15:13:52 +0800 Subject: [PATCH 13/15] test: test disabling meta cache --- src/mito2/src/cache.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 78d99561de9a..01b2b3958f4f 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -98,9 +98,15 @@ mod tests { use crate::cache::test_util::parquet_meta; #[test] - fn test_capacity_zero() { + fn test_disable_meta_cache() { let cache = CacheManager::new(0); assert!(cache.sst_meta_cache.is_none()); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + let metadata = parquet_meta(); + cache.put_parquet_meta_data(region_id, file_id, metadata); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); } #[test] From 85f22ac85cea17c9f3f9483e757e6d1cc2c358ee Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 26 Sep 2023 15:58:14 +0800 Subject: [PATCH 14/15] test: fix config api test --- tests-integration/tests/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a7f2ab8f3f88..2d9ddc6289f4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -662,7 +662,7 @@ max_background_jobs = 4 auto_flush_interval = "30m" global_write_buffer_size = "1GiB" global_write_buffer_reject_size = "2GiB" -cache_size = "512MiB" +sst_meta_cache_size = "128MiB" [[region_engine]] From 6ba4559d5fe4ad4f4423f857d07373bc5440dca2 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 26 Sep 2023 16:00:53 +0800 Subject: [PATCH 15/15] feat: remove meta cache if file is purged --- src/mito2/src/cache.rs | 9 +++++++++ src/mito2/src/region/opener.rs | 16 +++++++++++++++- src/mito2/src/sst/file_purger.rs | 19 ++++++++++++++++--- src/mito2/src/worker.rs | 6 ++++++ src/mito2/src/worker/handle_create.rs | 1 + src/mito2/src/worker/handle_open.rs | 1 + 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 01b2b3958f4f..41ddd17af18a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -77,6 +77,13 @@ impl CacheManager { cache.insert(SstMetaKey(region_id, file_id), metadata); } } + + /// Removes [ParquetMetaData] from the cache. + pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) { + if let Some(cache) = &self.sst_meta_cache { + cache.remove(&SstMetaKey(region_id, file_id)); + } + } } /// Cache key for SST meta. 
@@ -118,5 +125,7 @@ mod tests { let metadata = parquet_meta(); cache.put_parquet_meta_data(region_id, file_id, metadata); assert!(cache.get_parquet_meta_data(region_id, file_id).is_some()); + cache.remove_parquet_meta_data(region_id, file_id); + assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 3683d6fc2f73..6a8a3a805f7c 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -29,6 +29,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; +use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -51,6 +52,7 @@ pub(crate) struct RegionOpener { region_dir: String, scheduler: SchedulerRef, options: HashMap, + cache_manager: Option, } impl RegionOpener { @@ -70,6 +72,7 @@ impl RegionOpener { region_dir: normalize_dir(region_dir), scheduler, options: HashMap::new(), + cache_manager: None, } } @@ -85,6 +88,12 @@ impl RegionOpener { self } + /// Sets the cache manager for the region. + pub(crate) fn cache(mut self, cache_manager: Option) -> Self { + self.cache_manager = cache_manager; + self + } + /// Writes region manifest and creates a new region if it does not exist. /// Opens the region if it already exists. /// @@ -145,7 +154,11 @@ impl RegionOpener { version_control, access_layer: access_layer.clone(), manifest_manager, - file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)), + file_purger: Arc::new(LocalFilePurger::new( + self.scheduler, + access_layer, + self.cache_manager, + )), last_flush_millis: AtomicI64::new(current_time_millis()), // Region is writable after it is created. writable: AtomicBool::new(true), @@ -205,6 +218,7 @@ impl RegionOpener { let file_purger = Arc::new(LocalFilePurger::new( self.scheduler.clone(), access_layer.clone(), + self.cache_manager.clone(), )); let mutable = self.memtable_builder.build(&metadata); let options = RegionOptions::try_from(&self.options)?; diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index ace502f4a659..15c3df6cc703 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -19,6 +19,7 @@ use common_telemetry::{error, info}; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; +use crate::cache::CacheManagerRef; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::FileId; @@ -39,10 +40,11 @@ pub trait FilePurger: Send + Sync + fmt::Debug { pub type FilePurgerRef = Arc; +/// Purger that purges file for current region. pub struct LocalFilePurger { scheduler: SchedulerRef, - sst_layer: AccessLayerRef, + cache_manager: Option, } impl fmt::Debug for LocalFilePurger { @@ -54,10 +56,16 @@ impl fmt::Debug for LocalFilePurger { } impl LocalFilePurger { - pub fn new(scheduler: SchedulerRef, sst_layer: AccessLayerRef) -> Self { + /// Creates a new purger. + pub fn new( + scheduler: SchedulerRef, + sst_layer: AccessLayerRef, + cache_manager: Option, + ) -> Self { Self { scheduler, sst_layer, + cache_manager, } } } @@ -68,6 +76,11 @@ impl FilePurger for LocalFilePurger { let region_id = request.region_id; let sst_layer = self.sst_layer.clone(); + // Remove meta of the file from cache. 
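+ // Evict before scheduling the async delete so the cache does not keep
+ // serving metadata for a purged file.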
+ if let Some(cache) = &self.cache_manager { + cache.remove_parquet_meta_data(region_id, file_id); + } + if let Err(e) = self.scheduler.schedule(Box::pin(async move { if let Err(e) = sst_layer.delete_sst(file_id).await { error!(e; "Failed to delete SST file, file: {}, region: {}", @@ -113,7 +126,7 @@ mod tests { let scheduler = Arc::new(LocalScheduler::new(3)); let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); - let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer)); + let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); { let handle = FileHandle::new( diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 108e461a9918..4b119c69959f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -131,6 +131,7 @@ impl WorkerGroup { write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::default(), + cache_manager: cache_manager.clone(), } .start() }) @@ -220,6 +221,7 @@ impl WorkerGroup { write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::new(listener.clone()), + cache_manager: cache_manager.clone(), } .start() }) @@ -246,6 +248,7 @@ struct WorkerStarter { write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, listener: WorkerListener, + cache_manager: CacheManagerRef, } impl WorkerStarter { @@ -274,6 +277,7 @@ impl WorkerStarter { compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()), stalled_requests: StalledRequests::default(), listener: self.listener, + cache_manager: self.cache_manager, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -428,6 +432,8 @@ struct RegionWorkerLoop { stalled_requests: StalledRequests, /// Event listener for tests. listener: WorkerListener, + /// Cache. + cache_manager: CacheManagerRef, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 848082956894..e9ace31044a5 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -65,6 +65,7 @@ impl RegionWorkerLoop { ) .metadata(metadata) .options(request.options) + .cache(Some(self.cache_manager.clone())) .create_or_open(&self.config, &self.wal) .await?; diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index ab6967c0d1b6..cfeb43d84d98 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -63,6 +63,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .options(request.options) + .cache(Some(self.cache_manager.clone())) .open(&self.config, &self.wal) .await?;
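
// A minimal end-to-end sketch of the API this series lands, mirroring the
// unit tests above (`parquet_meta()` is the test helper from
// src/mito2/src/cache/test_util.rs):
//
// let cache = CacheManager::new(ReadableSize::mb(128).as_bytes());
// let region_id = RegionId::new(1, 1);
// let file_id = FileId::random();
//
// cache.put_parquet_meta_data(region_id, file_id, parquet_meta());
// assert!(cache.get_parquet_meta_data(region_id, file_id).is_some());
//
// // Purging the file evicts its cached metadata as well:
// cache.remove_parquet_meta_data(region_id, file_id);
// assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
//
// // A capacity of 0 disables the SST meta cache, so gets always miss:
// let disabled = CacheManager::new(0);
// disabled.put_parquet_meta_data(region_id, file_id, parquet_meta());
// assert!(disabled.get_parquet_meta_data(region_id, file_id).is_none());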