From 7761c4ff18b19718d8dbde5e862505e8bbdef74e Mon Sep 17 00:00:00 2001
From: Croxx
Date: Fri, 13 Oct 2023 00:30:51 -0500
Subject: [PATCH] feat(storage): filecache lazy load, unit-level refill, reduce insert latency (#12714)

Signed-off-by: MrCroxx
---
 Cargo.lock                                  |  35 +-
 src/common/src/config.rs                    |  57 +-
 src/compute/src/memory_management/mod.rs    |  22 +-
 src/config/example.toml                     |  12 +-
 src/ctl/src/common/hummock_service.rs       |   1 +
 src/jni_core/src/hummock_iterator.rs        |   1 +
 src/storage/Cargo.toml                      |   2 +-
 src/storage/benches/bench_compactor.rs      |   1 +
 src/storage/benches/bench_multi_builder.rs  |   1 +
 .../hummock_test/src/bin/replay/main.rs     |   1 +
 .../src/hummock/event_handler/refiller.rs   | 178 +++-
 src/storage/src/hummock/file_cache/store.rs | 523 +++++++++---------
 .../src/hummock/iterator/test_utils.rs      |   1 +
 src/storage/src/hummock/mod.rs              |   2 +-
 src/storage/src/hummock/sstable_store.rs    |  92 +--
 .../src/hummock/store/hummock_storage.rs    |   2 +
 src/storage/src/opts.rs                     |  27 +-
 src/storage/src/store_impl.rs               |  54 +-
 .../src/delete_range_runner.rs              |   1 +
 19 files changed, 597 insertions(+), 416 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3cd5c9fe6be76..e3b68e7f34607 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1774,9 +1774,9 @@ dependencies = [
 [[package]]
 name = "cmsketch"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "467e460587e81453bf9aeb43cd534e9c5ad670042023bd6c3f377c23b76cc2f0"
+checksum = "93710598b87c37ea250ab17a36f9f79dbaf3bd20e55806cf09345103bc26d60e"
 dependencies = [
  "paste",
 ]
@@ -3013,7 +3013,7 @@ dependencies = [
 [[package]]
 name = "foyer"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=41b1d39#41b1d3934cc92976737a9296273b4c5bee6422a0"
+source = "git+https://github.com/mrcroxx/foyer?rev=438eec8#438eec87e90c7a80cb53a06b711c6ea1ad7a0f41"
 dependencies = [
  "foyer-common",
  "foyer-intrusive",
@@ -3024,21 +3024,21 @@
 [[package]]
 name = "foyer-common"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=41b1d39#41b1d3934cc92976737a9296273b4c5bee6422a0"
+source = "git+https://github.com/mrcroxx/foyer?rev=438eec8#438eec87e90c7a80cb53a06b711c6ea1ad7a0f41"
 dependencies = [
  "bytes",
  "foyer-workspace-hack",
+ "madsim-tokio",
  "parking_lot 0.12.1",
  "paste",
  "rand",
- "tokio",
  "tracing",
 ]

 [[package]]
 name = "foyer-intrusive"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=41b1d39#41b1d3934cc92976737a9296273b4c5bee6422a0"
+source = "git+https://github.com/mrcroxx/foyer?rev=438eec8#438eec87e90c7a80cb53a06b711c6ea1ad7a0f41"
 dependencies = [
  "bytes",
  "cmsketch",
@@ -3055,11 +3055,10 @@ dependencies = [
 [[package]]
 name = "foyer-storage"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=41b1d39#41b1d3934cc92976737a9296273b4c5bee6422a0"
+source = "git+https://github.com/mrcroxx/foyer?rev=438eec8#438eec87e90c7a80cb53a06b711c6ea1ad7a0f41"
 dependencies = [
  "anyhow",
  "async-channel",
- "async-trait",
  "bitflags 2.4.0",
  "bitmaps",
  "bytes",
@@ -3070,6 +3069,7 @@ dependencies = [
  "futures",
  "itertools 0.11.0",
  "libc",
+ "madsim-tokio",
  "memoffset",
  "nix 0.27.1",
  "parking_lot 0.12.1",
@@ -3077,7 +3077,6 @@ dependencies = [
  "prometheus",
  "rand",
  "thiserror",
- "tokio",
  "tracing",
  "twox-hash",
 ]

 [[package]]
 name = "foyer-workspace-hack"
 version = "0.1.0"
-source = "git+https://github.com/mrcroxx/foyer?rev=41b1d39#41b1d3934cc92976737a9296273b4c5bee6422a0"
+source = "git+https://github.com/mrcroxx/foyer?rev=438eec8#438eec87e90c7a80cb53a06b711c6ea1ad7a0f41"
 dependencies = [
  "crossbeam-utils",
  "either",
@@ -3096,7 +3095,6 @@ dependencies = [
  "hyper",
  "itertools 0.10.5",
  "libc",
- "lock_api",
  "memchr",
  "parking_lot 0.12.1",
  "parking_lot_core 0.9.8",
@@ -3104,8 +3102,8 @@ dependencies = [
  "quote",
  "rand",
  "regex",
- "regex-automata 0.3.8",
- "regex-syntax 0.7.5",
+ "regex-automata 0.4.1",
+ "regex-syntax 0.8.0",
  "syn 2.0.37",
  "tokio",
  "tracing",
@@ -6685,17 +6683,6 @@ dependencies = [
  "regex-syntax 0.6.29",
 ]

-[[package]]
-name = "regex-automata"
-version = "0.3.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
-dependencies = [
- "aho-corasick",
- "memchr",
- "regex-syntax 0.7.5",
-]
-
 [[package]]
 name = "regex-automata"
 version = "0.4.1"
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 785ad614bc257..346dc4d357c1d 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -586,15 +586,36 @@ pub struct StorageConfig {
 #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
 pub struct CacheRefillConfig {
+    /// SSTable levels to refill.
     #[serde(default = "default::cache_refill::data_refill_levels")]
     pub data_refill_levels: Vec<u32>,
+
+    /// Maximum timeout for cache refill when applying a version delta.
     #[serde(default = "default::cache_refill::timeout_ms")]
     pub timeout_ms: u64,
+
+    /// Maximum number of inflight data cache refill tasks.
     #[serde(default = "default::cache_refill::concurrency")]
     pub concurrency: usize,
+
+    /// Block count that a data cache refill request fetches.
+    #[serde(default = "default::cache_refill::unit")]
+    pub unit: usize,
+
+    /// Data cache refill unit admission ratio.
+    ///
+    /// Only units whose ratio of admitted blocks is above this threshold will be refilled.
+    #[serde(default = "default::cache_refill::threshold")]
+    pub threshold: f64,
+
+    /// Recent filter layer count.
+    #[serde(default = "default::cache_refill::recent_filter_layers")]
+    pub recent_filter_layers: usize,
+
+    /// Recent filter layer rotation interval.
+    #[serde(default = "default::cache_refill::recent_filter_rotate_interval_ms")]
+    pub recent_filter_rotate_interval_ms: usize,
+
     #[serde(default, flatten)]
     pub unrecognized: Unrecognized<Self>,
 }
@@ -637,8 +658,8 @@ pub struct FileCacheConfig {
     #[serde(default = "default::file_cache::lfu_tiny_lru_capacity_ratio")]
     pub lfu_tiny_lru_capacity_ratio: f64,

-    #[serde(default = "default::file_cache::rated_random_rate_mb")]
-    pub rated_random_rate_mb: usize,
+    #[serde(default = "default::file_cache::insert_rate_limit_mb")]
+    pub insert_rate_limit_mb: usize,

     #[serde(default = "default::file_cache::flush_rate_limit_mb")]
     pub flush_rate_limit_mb: usize,
@@ -646,6 +667,12 @@ pub struct FileCacheConfig {
     #[serde(default = "default::file_cache::reclaim_rate_limit_mb")]
     pub reclaim_rate_limit_mb: usize,

+    #[serde(default = "default::file_cache::allocation_bits")]
+    pub allocation_bits: usize,
+
+    #[serde(default = "default::file_cache::allocation_timeout_ms")]
+    pub allocation_timeout_ms: usize,
+
     #[serde(default, flatten)]
     pub unrecognized: Unrecognized<Self>,
 }
@@ -1132,7 +1159,7 @@ pub mod default {
             0.01
         }

-        pub fn rated_random_rate_mb() -> usize {
+        pub fn insert_rate_limit_mb() -> usize {
             0
         }

@@ -1143,6 +1170,14 @@ pub mod default {
         pub fn reclaim_rate_limit_mb() -> usize {
             0
         }
+
+        pub fn allocation_bits() -> usize {
+            0
+        }
+
+        pub fn allocation_timeout_ms() -> usize {
+            10
+        }
     }

     pub mod cache_refill {
@@ -1157,6 +1192,22 @@ pub mod default {
         pub fn concurrency() -> usize {
             10
         }
+
+        pub fn unit() -> usize {
+            64
+        }
+
+        pub fn threshold() -> f64 {
+            0.5
+        }
+
+        pub fn recent_filter_layers() -> usize {
+            6
+        }
+
+        pub fn recent_filter_rotate_interval_ms() -> usize {
+            10000
+        }
     }

     pub mod heap_profiling {
diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs
index 014de7144ef05..3ae47729b336a 100644
--- a/src/compute/src/memory_management/mod.rs
+++ b/src/compute/src/memory_management/mod.rs
@@ -152,20 +152,34 @@ pub fn storage_memory_config(
         .ceil() as usize)
             >> 20,
     );
+
+    // `foyer` uses a buffer pool to manage dirty buffers.
+    //
+    // buffer size = region size (the size of a single file with the fs device)
+    //
+    // writing buffers + flushing buffers + free buffers = buffer pool buffers
+    //
+    // To keep both flushers and allocators busy, the buffer count should be >= allocators + flushers.
+    //
+    // Adding more buffers prevents allocators from waiting for buffers to be freed by flushers.
+
     let data_file_cache_buffer_pool_capacity_mb = storage_config
         .data_file_cache
         .buffer_pool_size_mb
         .unwrap_or(
             storage_config.data_file_cache.file_capacity_mb
-                * storage_config.data_file_cache.flushers,
+                * (storage_config.data_file_cache.flushers
+                    + 2 * (1 << storage_config.data_file_cache.allocation_bits)),
         );
     let meta_file_cache_buffer_pool_capacity_mb = storage_config
         .meta_file_cache
         .buffer_pool_size_mb
         .unwrap_or(
             storage_config.meta_file_cache.file_capacity_mb
-                * storage_config.meta_file_cache.flushers,
+                * (storage_config.meta_file_cache.flushers
+                    + 2 * (1 << storage_config.meta_file_cache.allocation_bits)),
         );
+
     let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
         ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
     );
@@ -229,8 +243,8 @@ mod tests {
         assert_eq!(memory_config.block_cache_capacity_mb, 737);
         assert_eq!(memory_config.meta_cache_capacity_mb, 860);
         assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
-        assert_eq!(memory_config.data_file_cache_buffer_pool_capacity_mb, 256);
-        assert_eq!(memory_config.meta_file_cache_buffer_pool_capacity_mb, 256);
+        assert_eq!(memory_config.data_file_cache_buffer_pool_capacity_mb, 384);
+        assert_eq!(memory_config.meta_file_cache_buffer_pool_capacity_mb, 384);
         assert_eq!(memory_config.compactor_memory_limit_mb, 819);

         storage_config.block_cache_capacity_mb = Some(512);
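The new sizing rule can be checked against the updated test expectations: with the defaults from `src/config/example.toml` (`file_capacity_mb = 64`, `flushers = 4`, `allocation_bits = 0`), the pool comes out to 64 * (4 + 2 * 2^0) = 384 MiB, exactly what the assertions now expect, while the old rule (`file_capacity_mb * flushers`) gave the previous 256 MiB. A minimal sketch of the rule; the function name is mine, not from the patch:

```rust
/// Buffer pool capacity in MiB: one region-sized buffer per flusher, plus
/// 2 * 2^allocation_bits extra buffers so that allocators rarely have to
/// wait for flushers to free dirty buffers.
fn buffer_pool_capacity_mb(file_capacity_mb: usize, flushers: usize, allocation_bits: usize) -> usize {
    file_capacity_mb * (flushers + 2 * (1usize << allocation_bits))
}

fn main() {
    assert_eq!(buffer_pool_capacity_mb(64, 4, 0), 384); // new default
    assert_eq!(64 * 4, 256); // the old rule, matching the old assertions
}
```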
diff --git a/src/config/example.toml b/src/config/example.toml
index 3e407150eb8ab..141078ddf8a54 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -123,9 +123,11 @@ reclaimers = 4
 recover_concurrency = 8
 lfu_window_to_cache_size_ratio = 1
 lfu_tiny_lru_capacity_ratio = 0.01
-rated_random_rate_mb = 0
+insert_rate_limit_mb = 0
 flush_rate_limit_mb = 0
 reclaim_rate_limit_mb = 0
+allocation_bits = 0
+allocation_timeout_ms = 10

 [storage.meta_file_cache]
 dir = ""
@@ -138,14 +140,20 @@ reclaimers = 4
 recover_concurrency = 8
 lfu_window_to_cache_size_ratio = 1
 lfu_tiny_lru_capacity_ratio = 0.01
-rated_random_rate_mb = 0
+insert_rate_limit_mb = 0
 flush_rate_limit_mb = 0
 reclaim_rate_limit_mb = 0
+allocation_bits = 0
+allocation_timeout_ms = 10

 [storage.cache_refill]
 data_refill_levels = []
 timeout_ms = 6000
 concurrency = 10
+unit = 64
+threshold = 0.5
+recent_filter_layers = 6
+recent_filter_rotate_interval_ms = 10000

 [system]
 barrier_interval_ms = 1000
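The `[storage.cache_refill]` defaults above give the recent filter 6 layers rotated every 10000 ms, so an SST object id inserted into the filter stays visible for roughly `recent_filter_layers * recent_filter_rotate_interval_ms` = 60 s. The sketch below illustrates only the layered-rotation idea; it is not the `RecentFilter` implementation used by this patch, and every name in it is made up for illustration:

```rust
use std::collections::{HashSet, VecDeque};
use std::time::{Duration, Instant};

/// Keeps `layers` generations of keys; a key inserted now stays visible for
/// roughly `layers * interval` before its generation is dropped wholesale.
/// Assumes `layers >= 1`.
struct LayeredRecentFilter {
    layers: VecDeque<HashSet<u64>>,
    interval: Duration,
    last_rotation: Instant,
}

impl LayeredRecentFilter {
    fn new(layers: usize, interval: Duration) -> Self {
        Self {
            layers: (0..layers).map(|_| HashSet::new()).collect(),
            interval,
            last_rotation: Instant::now(),
        }
    }

    /// Rotates one generation once the interval has elapsed. (A production
    /// implementation would rotate as many generations as have elapsed.)
    fn rotate_if_stale(&mut self) {
        if self.last_rotation.elapsed() >= self.interval {
            self.layers.pop_back(); // evict the oldest generation as a whole
            self.layers.push_front(HashSet::new());
            self.last_rotation = Instant::now();
        }
    }

    fn insert(&mut self, key: u64) {
        self.rotate_if_stale();
        self.layers.front_mut().unwrap().insert(key);
    }

    fn contains(&mut self, key: u64) -> bool {
        self.rotate_if_stale();
        self.layers.iter().any(|layer| layer.contains(&key))
    }
}

fn main() {
    // Defaults from [storage.cache_refill]: 6 layers rotated every 10 s.
    let mut filter = LayeredRecentFilter::new(6, Duration::from_millis(10_000));
    filter.insert(42);
    assert!(filter.contains(42));
}
```

The layer count and rotation interval trade memory against recall: fewer, longer-lived layers keep ids visible longer but free memory in coarser steps.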
diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs
index fe3ffefbbdcc6..8b86892781d52 100644
--- a/src/ctl/src/common/hummock_service.rs
+++ b/src/ctl/src/common/hummock_service.rs
@@ -169,6 +169,7 @@ For `./risedev apply-compose-deploy` users,
             0,
             FileCache::none(),
             FileCache::none(),
+            None,
         )))
     }
 }
diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs
index 5917d08582998..a8a79f4b48c45 100644
--- a/src/jni_core/src/hummock_iterator.rs
+++ b/src/jni_core/src/hummock_iterator.rs
@@ -69,6 +69,7 @@ impl HummockJavaBindingIterator {
             0,
             FileCache::none(),
             FileCache::none(),
+            None,
         ));
         let reader =
             HummockVersionReader::new(sstable_store, Arc::new(HummockStateStoreMetrics::unused()));
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 9e778668f4385..49c1977c0f034 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -26,7 +26,7 @@ dyn-clone = "1.0.14"
 either = "1"
 enum-as-inner = "0.6"
 fail = "0.5"
-foyer = { git = "https://github.com/mrcroxx/foyer", rev = "41b1d39" }
+foyer = { git = "https://github.com/mrcroxx/foyer", rev = "438eec8" }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
 hex = "0.4"
diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs
index df455cf1000ee..41a3649adc5cf 100644
--- a/src/storage/benches/bench_compactor.rs
+++ b/src/storage/benches/bench_compactor.rs
@@ -56,6 +56,7 @@ pub fn mock_sstable_store() -> SstableStoreRef {
         0,
         FileCache::none(),
         FileCache::none(),
+        None,
     ))
 }
diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs
index a295864060866..9bf0e0a9546ec 100644
--- a/src/storage/benches/bench_multi_builder.rs
+++ b/src/storage/benches/bench_multi_builder.rs
@@ -144,6 +144,7 @@ fn bench_builder(
         0,
         FileCache::none(),
         FileCache::none(),
+        None,
     ));

     let mut group = c.benchmark_group("bench_multi_builder");
diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs
index 7a000c914e3a9..1e9c9591bc864 100644
--- a/src/storage/hummock_test/src/bin/replay/main.rs
+++ b/src/storage/hummock_test/src/bin/replay/main.rs
@@ -111,6 +111,7 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result
             0,
             FileCache::none(),
             FileCache::none(),
+            None,
         ));
diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs
@@ -42,6 +47,7 @@ pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
 pub struct CacheRefillMetrics {
     pub refill_duration: HistogramVec,
     pub refill_total: GenericCounterVec<AtomicU64>,
+    pub refill_bytes: GenericCounterVec<AtomicU64>,

     pub data_refill_success_duration: Histogram,
     pub meta_refill_success_duration: Histogram,
@@ -51,6 +57,9 @@ pub struct CacheRefillMetrics {
     pub data_refill_started_total: GenericCounter<AtomicU64>,
     pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

+    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
+    pub data_refill_success_bytes: GenericCounter<AtomicU64>,
+
     pub refill_queue_total: IntGauge,
 }
@@ -70,6 +79,13 @@ impl CacheRefillMetrics {
             registry,
         )
         .unwrap();
+        let refill_bytes = register_int_counter_vec_with_registry!(
+            "refill_bytes",
+            "refill bytes",
+            &["type", "op"],
+            registry,
+        )
+        .unwrap();

         let data_refill_success_duration = refill_duration
             .get_metric_with_label_values(&["data", "success"])
@@ -91,6 +107,13 @@ impl CacheRefillMetrics {
             .get_metric_with_label_values(&["meta", "attempts"])
             .unwrap();

+        let data_refill_ideal_bytes = refill_bytes
+            .get_metric_with_label_values(&["data", "ideal"])
+            .unwrap();
+        let data_refill_success_bytes = refill_bytes
+            .get_metric_with_label_values(&["data", "success"])
+            .unwrap();
+
         let refill_queue_total = register_int_gauge_with_registry!(
             "refill_queue_total",
             "refill queue total",
@@ -101,6 +124,7 @@ impl CacheRefillMetrics {
         Self {
             refill_duration,
             refill_total,
+            refill_bytes,

             data_refill_success_duration,
             meta_refill_success_duration,
@@ -108,6 +132,10 @@ impl CacheRefillMetrics {
             data_refill_attempts_total,
             data_refill_started_total,
             meta_refill_attempts_total,
+
+            data_refill_ideal_bytes,
+            data_refill_success_bytes,
+
             refill_queue_total,
         }
     }
@@ -115,9 +143,22 @@ impl CacheRefillMetrics {

 #[derive(Debug)]
 pub struct CacheRefillConfig {
+    /// Cache refill timeout.
     pub timeout: Duration,
+
+    /// Data file cache refill levels.
     pub data_refill_levels: HashSet<u32>,
+
+    /// Data file cache refill concurrency.
     pub concurrency: usize,
+
+    /// Data file cache refill unit (blocks).
+    pub unit: usize,
+
+    /// Data file cache refill unit threshold.
+    ///
+    /// Only units whose admit rate > threshold will be refilled.
+    pub threshold: f64,
 }

 struct Item {
@@ -265,10 +306,8 @@ impl CacheRefillTask {
         delta: &SstDeltaInfo,
         holders: Vec<TableHolder>,
     ) {
-        let now = Instant::now();
-
         // return if data file cache is disabled
-        let Some(filter) = context.sstable_store.data_file_cache_refill_filter() else {
+        let Some(filter) = context.sstable_store.data_recent_filter() else {
             return;
         };
@@ -294,29 +333,126 @@
         let mut tasks = vec![];
         for sst_info in &holders {
             let task = async move {
-                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();
+                if let Err(e) = Self::data_file_cache_refill_impl(context, sst_info.value()).await {
+                    tracing::warn!("data cache refill error: {:?}", e);
+                }
+            };
+            tasks.push(task);
+        }
+
+        join_all(tasks).await;
+    }
+
+    async fn data_file_cache_refill_impl(
+        context: &CacheRefillContext,
+        sst: &Sstable,
+    ) -> HummockResult<()> {
+        let sstable_store = &context.sstable_store;
+        let object_id = sst.id;
+        let unit = context.config.unit;
+        let threshold = context.config.threshold;
+
+        if let Some(filter) = sstable_store.data_recent_filter() {
+            filter.insert(object_id);
+        }
+
+        let mut tasks = vec![];
+
+        // unit-level refill:
+        //
+        // Although the file cache receives items block by block, fetching a larger range of data is still
+        // recommended to reduce S3 IOPS and amortize the per-request base latency.
+        //
+        // To decide which units to refill, we calculate the ratio of blocks in a unit that would be admitted
+        // by the file cache. If the ratio is higher than the threshold, we fetch the whole unit and refill it
+        // block by block.
+        for block_index_start in (0..sst.block_count()).step_by(unit) {
+            let block_index_end = std::cmp::min(block_index_start + unit, sst.block_count());
+
+            let (range_first, _) = sst.calculate_block_info(block_index_start);
+            let (range_last, _) = sst.calculate_block_info(block_index_end - 1);
+            let range = range_first.start..range_last.end;
+
+            GLOBAL_CACHE_REFILL_METRICS
+                .data_refill_ideal_bytes
+                .inc_by((range.end - range.start) as u64);
+
+            let mut writers = Vec::with_capacity(block_index_end - block_index_start);
+            let mut ranges = Vec::with_capacity(block_index_end - block_index_start);
+            let mut admits = 0;
+
+            for block_index in block_index_start..block_index_end {
+                let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
+                let key = SstableBlockIndex {
+                    sst_id: object_id,
+                    block_idx: block_index as u64,
+                };
+                let mut writer = sstable_store
+                    .data_file_cache()
+                    .writer(key, key.serialized_len() + uncompressed_capacity);
+
+                if writer.judge() {
+                    admits += 1;
+                }
+
+                writers.push(writer);
+                ranges.push(range);
+            }
+
+            if admits as f64 / writers.len() as f64 >= threshold {
+                let task = async move {
+                    GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();
-                let permit = context.concurrency.acquire().await.unwrap();
+                    let permit = context.concurrency.acquire().await.unwrap();
-                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();
+                    GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();
-                match context
-                    .sstable_store
-                    .fill_data_file_cache(sst_info.value())
-                    .await
-                {
-                    Ok(()) => GLOBAL_CACHE_REFILL_METRICS
+                    let timer = GLOBAL_CACHE_REFILL_METRICS
                         .data_refill_success_duration
-                        .observe(now.elapsed().as_secs_f64()),
-                    Err(e) => {
-                        tracing::warn!("data cache refill error: {:?}", e);
+                        .start_timer();
+
+                    let data = sstable_store
+                        .store()
+                        .read(&sstable_store.get_sst_data_path(object_id), range.clone())
+                        .await?;
+                    let mut futures = vec![];
+                    for (mut writer, r) in writers.into_iter().zip_eq_fast(ranges) {
+                        let offset = r.start - range.start;
+                        let len = r.end - r.start;
+                        let bytes = data.slice(offset..offset + len);
+
+                        let future = async move {
+                            let block = Block::decode(
+                                bytes,
+                                writer.weight() - writer.key().serialized_len(),
+                            )?;
+                            let block = Box::new(block);
+                            writer.force();
+                            let res = writer.finish(block).await.map_err(HummockError::file_cache);
+                            if matches!(res, Ok(true)) {
+                                GLOBAL_CACHE_REFILL_METRICS
+                                    .data_refill_success_bytes
+                                    .inc_by(len as u64);
+                            }
+                            res
+                        };
+                        futures.push(future);
+                    }
-                }
-                drop(permit);
-            };
-            tasks.push(task);
+                    try_join_all(futures)
+                        .await
+                        .map_err(HummockError::file_cache)?;
+
+                    drop(permit);
+                    drop(timer);
+
+                    Ok::<_, HummockError>(())
+                };
+                tasks.push(task);
+            }
         }

-        join_all(tasks).await;
+        try_join_all(tasks).await?;
+
+        Ok(())
     }
 }
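The refill logic above makes one admission decision per `unit` of blocks: it creates a cache writer per block, counts how many would be admitted (`writer.judge()`), and only when the admitted fraction reaches `threshold` does it issue a single object-store read covering the whole unit. Each selected block is then written with `writer.force()`, so the insert rate limit cannot drop blocks whose bytes were already fetched. A condensed sketch of just the ratio decision, with hypothetical `judges` values standing in for the per-block `writer.judge()` results (assumes a non-empty unit):

```rust
/// Returns true when the share of admitted blocks in a unit reaches the
/// threshold, i.e. the unit is worth a single large object-store read.
fn should_refill_unit(judges: &[bool], threshold: f64) -> bool {
    let admits = judges.iter().filter(|&&admitted| admitted).count();
    admits as f64 / judges.len() as f64 >= threshold
}

fn main() {
    // Defaults: unit = 64 blocks, threshold = 0.5, so a unit is refilled
    // when at least 32 of its 64 blocks pass admission.
    let half_admitted: Vec<bool> = (0..64).map(|i| i % 2 == 0).collect();
    assert!(should_refill_unit(&half_admitted, 0.5));
    assert!(!should_refill_unit(&[false; 64], 0.5));
}
```

Note how the metrics pair up: `data_refill_ideal_bytes` counts every unit's byte range before the decision, while `data_refill_success_bytes` counts only blocks whose writers finish successfully, so together they expose how selective the threshold is in practice.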
diff --git a/src/storage/src/hummock/file_cache/store.rs b/src/storage/src/hummock/file_cache/store.rs
index fd549cbc2a96c..9de54552ae077 100644
--- a/src/storage/src/hummock/file_cache/store.rs
+++ b/src/storage/src/hummock/file_cache/store.rs
@@ -20,29 +20,25 @@ use std::time::Duration;

 use bytes::{Buf, BufMut, Bytes};
 use foyer::common::code::{Key, Value};
-use foyer::storage::admission::rated_random::RatedRandomAdmissionPolicy;
+use foyer::intrusive::eviction::lfu::LfuConfig;
+use foyer::storage::admission::rated_ticket::RatedTicketAdmissionPolicy;
 use foyer::storage::admission::AdmissionPolicy;
-use foyer::storage::event::EventListener;
+use foyer::storage::device::fs::FsDeviceConfig;
 pub use foyer::storage::metrics::set_metrics_registry as set_foyer_metrics_registry;
-use foyer::storage::store::FetchValueFuture;
-use foyer::storage::LfuFsStoreConfig;
-use risingwave_common::util::runtime::BackgroundShutdownRuntime;
+use foyer::storage::reinsertion::ReinsertionPolicy;
+use foyer::storage::runtime::{
+    RuntimeConfig, RuntimeLazyStore, RuntimeLazyStoreConfig, RuntimeLazyStoreWriter,
+};
+use foyer::storage::storage::{Storage, StorageWriter};
+use foyer::storage::store::{LfuFsStoreConfig, NoneStore, NoneStoreWriter};
 use risingwave_hummock_sdk::HummockSstableObjectId;

 use crate::hummock::{Block, Sstable, SstableMeta};

-#[derive(thiserror::Error, Debug)]
-pub enum FileCacheError {
-    #[error("foyer error: {0}")]
-    Foyer(#[from] foyer::storage::error::Error),
-    #[error("other {0}")]
-    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
-}
-
-impl FileCacheError {
-    fn foyer(e: foyer::storage::error::Error) -> Self {
-        Self::Foyer(e)
-    }
+pub mod preclude {
+    pub use foyer::storage::storage::{
+        AsyncStorageExt, ForceStorageExt, Storage, StorageExt, StorageWriter,
+    };
 }

 pub type Result<T> = core::result::Result<T, FileCacheError>;

@@ -50,11 +46,11 @@ pub type Result<T> = core::result::Result<T, FileCacheError>;
 pub type EvictionConfig = foyer::intrusive::eviction::lfu::LfuConfig;
 pub type DeviceConfig = foyer::storage::device::fs::FsDeviceConfig;

-pub type FoyerStore<K, V> = foyer::storage::LfuFsStore<K, V>;
-pub type FoyerStoreResult<T> = foyer::storage::error::Result<T>;
-pub type FoyerStoreError = foyer::storage::error::Error;
+pub type FileCacheResult<T> = foyer::storage::error::Result<T>;
+pub type FileCacheError = foyer::storage::error::Error;

-pub struct FoyerStoreConfig<K, V>
+#[derive(Debug)]
+pub struct FileCacheConfig<K, V>
 where
     K: Key,
     V: Value,
 {
@@ -73,331 +69,320 @@ where
     pub recover_concurrency: usize,
     pub lfu_window_to_cache_size_ratio: usize,
     pub lfu_tiny_lru_capacity_ratio: f64,
-    pub rated_random_rate: usize,
-    pub event_listener: Vec<Arc<dyn EventListener<K = K, V = V>>>,
-    pub enable_filter: bool,
+    pub insert_rate_limit: usize,
+    pub allocator_bits: usize,
+    pub allocation_timeout: Duration,
+    pub admissions: Vec<Arc<dyn AdmissionPolicy<Key = K, Value = V>>>,
+    pub reinsertions: Vec<Arc<dyn ReinsertionPolicy<Key = K, Value = V>>>,
 }
-pub struct FoyerRuntimeConfig +impl Clone for FileCacheConfig where K: Key, V: Value, { - pub foyer_store_config: FoyerStoreConfig, - pub runtime_worker_threads: Option, + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + dir: self.dir.clone(), + capacity: self.capacity, + file_capacity: self.file_capacity, + buffer_pool_size: self.buffer_pool_size, + device_align: self.device_align, + device_io_size: self.device_io_size, + flushers: self.flushers, + flush_rate_limit: self.flush_rate_limit, + reclaimers: self.reclaimers, + reclaim_rate_limit: self.reclaim_rate_limit, + recover_concurrency: self.recover_concurrency, + lfu_window_to_cache_size_ratio: self.lfu_window_to_cache_size_ratio, + lfu_tiny_lru_capacity_ratio: self.lfu_tiny_lru_capacity_ratio, + insert_rate_limit: self.insert_rate_limit, + allocator_bits: self.allocator_bits, + allocation_timeout: self.allocation_timeout, + admissions: self.admissions.clone(), + reinsertions: self.reinsertions.clone(), + } + } } -#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash)] -pub struct SstableBlockIndex { - pub sst_id: HummockSstableObjectId, - pub block_idx: u64, +#[derive(Debug)] +pub enum FileCacheWriter +where + K: Key, + V: Value, +{ + Foyer { + writer: RuntimeLazyStoreWriter, + }, + None { + writer: NoneStoreWriter, + }, } -impl Key for SstableBlockIndex { - fn serialized_len(&self) -> usize { - 8 + 8 // sst_id (8B) + block_idx (8B) - } - - fn write(&self, mut buf: &mut [u8]) { - buf.put_u64(self.sst_id); - buf.put_u64(self.block_idx); +impl From> for FileCacheWriter +where + K: Key, + V: Value, +{ + fn from(writer: RuntimeLazyStoreWriter) -> Self { + Self::Foyer { writer } } +} - fn read(mut buf: &[u8]) -> Self { - let sst_id = buf.get_u64(); - let block_idx = buf.get_u64(); - Self { sst_id, block_idx } +impl From> for FileCacheWriter +where + K: Key, + V: Value, +{ + fn from(writer: NoneStoreWriter) -> Self { + Self::None { writer } } } -impl Value for Box { - fn serialized_len(&self) -> usize { - self.raw_data().len() - } +impl StorageWriter for FileCacheWriter +where + K: Key, + V: Value, +{ + type Key = K; + type Value = V; - fn write(&self, mut buf: &mut [u8]) { - buf.put_slice(self.raw_data()) + fn key(&self) -> &Self::Key { + match self { + FileCacheWriter::Foyer { writer } => writer.key(), + FileCacheWriter::None { writer } => writer.key(), + } } - fn read(buf: &[u8]) -> Self { - let data = Bytes::copy_from_slice(buf); - let block = Block::decode_from_raw(data); - Box::new(block) + fn weight(&self) -> usize { + match self { + FileCacheWriter::Foyer { writer } => writer.weight(), + FileCacheWriter::None { writer } => writer.weight(), + } } -} -impl Value for Box { - fn serialized_len(&self) -> usize { - 8 + self.meta.encoded_size() // id (8B) + meta size + fn judge(&mut self) -> bool { + match self { + FileCacheWriter::Foyer { writer } => writer.judge(), + FileCacheWriter::None { writer } => writer.judge(), + } } - fn write(&self, mut buf: &mut [u8]) { - buf.put_u64(self.id); - // TODO(MrCroxx): avoid buffer copy - let mut buffer = vec![]; - self.meta.encode_to(&mut buffer); - buf.put_slice(&buffer[..]) + fn force(&mut self) { + match self { + FileCacheWriter::Foyer { writer } => writer.force(), + FileCacheWriter::None { writer } => writer.force(), + } } - fn read(mut buf: &[u8]) -> Self { - let id = buf.get_u64(); - let meta = SstableMeta::decode(buf).unwrap(); - Box::new(Sstable::new(id, meta)) + async fn finish(self, value: Self::Value) -> FileCacheResult { + match self { + FileCacheWriter::Foyer 
{ writer } => writer.finish(value).await, + FileCacheWriter::None { writer } => writer.finish(value).await, + } } } -#[derive(Clone)] +#[derive(Debug)] pub enum FileCache where - K: Key + Copy, + K: Key, V: Value, { - None, - FoyerRuntime { - runtime: Arc, - store: Arc>, - enable_filter: bool, - }, + Foyer { store: RuntimeLazyStore }, + None { store: NoneStore }, +} + +impl Clone for FileCache +where + K: Key, + V: Value, +{ + fn clone(&self) -> Self { + match self { + Self::Foyer { store } => Self::Foyer { + store: store.clone(), + }, + Self::None { store } => Self::None { + store: store.clone(), + }, + } + } } impl FileCache where - K: Key + Copy, + K: Key, V: Value, { pub fn none() -> Self { - Self::None + Self::None { + store: NoneStore::default(), + } } +} - pub async fn foyer(config: FoyerRuntimeConfig) -> Result { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - if let Some(runtime_worker_threads) = config.runtime_worker_threads { - builder.worker_threads(runtime_worker_threads); +impl Storage for FileCache +where + K: Key, + V: Value, +{ + type Config = FileCacheConfig; + type Key = K; + type Value = V; + type Writer = FileCacheWriter; + + async fn open(config: Self::Config) -> FileCacheResult { + let mut admissions = config.admissions; + if config.insert_rate_limit > 0 { + admissions.push(Arc::new(RatedTicketAdmissionPolicy::new( + config.insert_rate_limit, + ))); } - let runtime = builder - .thread_name("risingwave-foyer-storage") - .enable_all() - .build() - .map_err(|e| FileCacheError::Other(e.into()))?; - - let enable_filter = config.foyer_store_config.enable_filter; - - let store = runtime - .spawn(async move { - let foyer_store_config = config.foyer_store_config; - - let file_capacity = foyer_store_config.file_capacity; - let capacity = foyer_store_config.capacity; - let capacity = capacity - (capacity % file_capacity); - - let mut admissions: Vec>> = vec![]; - if foyer_store_config.rated_random_rate > 0 { - let rr = RatedRandomAdmissionPolicy::new( - foyer_store_config.rated_random_rate, - Duration::from_millis(100), - ); - admissions.push(Arc::new(rr)); - } - - let c = LfuFsStoreConfig { - name: foyer_store_config.name, - eviction_config: EvictionConfig { - window_to_cache_size_ratio: foyer_store_config - .lfu_window_to_cache_size_ratio, - tiny_lru_capacity_ratio: foyer_store_config.lfu_tiny_lru_capacity_ratio, - }, - device_config: DeviceConfig { - dir: foyer_store_config.dir.clone(), - capacity, - file_capacity, - align: foyer_store_config.device_align, - io_size: foyer_store_config.device_io_size, - }, - admissions, - reinsertions: vec![], - buffer_pool_size: foyer_store_config.buffer_pool_size, - flushers: foyer_store_config.flushers, - flush_rate_limit: foyer_store_config.flush_rate_limit, - reclaimers: foyer_store_config.reclaimers, - reclaim_rate_limit: foyer_store_config.reclaim_rate_limit, - recover_concurrency: foyer_store_config.recover_concurrency, - event_listeners: foyer_store_config.event_listener, - clean_region_threshold: foyer_store_config.reclaimers - + foyer_store_config.reclaimers / 2, - }; - - FoyerStore::open(c).await.map_err(FileCacheError::foyer) - }) - .await - .unwrap()?; - - Ok(Self::FoyerRuntime { - runtime: Arc::new(runtime.into()), - store, - enable_filter, - }) - } - #[tracing::instrument(skip(self, value))] - pub async fn insert(&self, key: K, value: V) -> Result { - match self { - FileCache::None => Ok(false), - FileCache::FoyerRuntime { runtime, store, .. 
} => { - let store = store.clone(); - runtime - .spawn(async move { store.insert_if_not_exists(key, value).await }) - .await - .unwrap() - .map_err(FileCacheError::foyer) + let c = RuntimeLazyStoreConfig { + store: LfuFsStoreConfig { + name: config.name.clone(), + eviction_config: LfuConfig { + window_to_cache_size_ratio: config.lfu_window_to_cache_size_ratio, + tiny_lru_capacity_ratio: config.lfu_tiny_lru_capacity_ratio, + }, + device_config: FsDeviceConfig { + dir: config.dir, + capacity: config.capacity, + file_capacity: config.file_capacity, + align: config.device_align, + io_size: config.device_io_size, + }, + allocator_bits: config.allocator_bits, + admissions, + reinsertions: config.reinsertions, + buffer_pool_size: config.buffer_pool_size, + flushers: config.flushers, + flush_rate_limit: config.flush_rate_limit, + reclaimers: config.reclaimers, + reclaim_rate_limit: config.reclaim_rate_limit, + allocation_timeout: config.allocation_timeout, + clean_region_threshold: config.reclaimers + config.reclaimers / 2, + recover_concurrency: config.recover_concurrency, } - } + .into(), + runtime: RuntimeConfig { + worker_threads: None, + thread_name: Some(config.name), + }, + }; + let store = RuntimeLazyStore::open(c).await?; + Ok(Self::Foyer { store }) } - #[tracing::instrument(skip(self))] - pub fn insert_without_wait(&self, key: K, value: V) { + fn is_ready(&self) -> bool { match self { - FileCache::None => {} - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - runtime.spawn(async move { store.insert_if_not_exists(key, value).await }); - } + FileCache::Foyer { store } => store.is_ready(), + FileCache::None { store } => store.is_ready(), } } - #[tracing::instrument(skip(self, value))] - pub async fn insert_force(&self, key: K, value: V) -> Result { + async fn close(&self) -> FileCacheResult<()> { match self { - FileCache::None => Ok(false), - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - runtime - .spawn(async move { store.insert_force(key, value).await }) - .await - .unwrap() - .map_err(FileCacheError::foyer) - } + FileCache::Foyer { store } => store.close().await, + FileCache::None { store } => store.close().await, } } - /// only fetch value if judge pass - #[tracing::instrument(skip(self, fetch_value))] - pub async fn insert_with( - &self, - key: K, - fetch_value: F, - value_serialized_len: usize, - ) -> Result - where - F: FnOnce() -> FU, - FU: FetchValueFuture, - { + fn writer(&self, key: Self::Key, weight: usize) -> Self::Writer { match self { - FileCache::None => Ok(false), - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - let future = fetch_value(); - runtime - .spawn(async move { - store - .insert_if_not_exists_with_future( - key, - || future, - key.serialized_len() + value_serialized_len, - ) - .await - }) - .await - .unwrap() - .map_err(FileCacheError::foyer) - } + FileCache::Foyer { store } => store.writer(key, weight).into(), + FileCache::None { store } => store.writer(key, weight).into(), } } - #[tracing::instrument(skip(self))] - pub async fn remove(&self, key: &K) -> Result { + fn exists(&self, key: &Self::Key) -> FileCacheResult { match self { - FileCache::None => Ok(false), - FileCache::FoyerRuntime { runtime, store, .. 
} => { - let store = store.clone(); - let key = *key; - runtime - .spawn(async move { store.remove(&key).await }) - .await - .unwrap() - .map_err(FileCacheError::foyer) - } + FileCache::Foyer { store } => store.exists(key), + FileCache::None { store } => store.exists(key), } } - #[tracing::instrument(skip(self))] - pub fn remove_without_wait(&self, key: &K) { + async fn lookup(&self, key: &Self::Key) -> FileCacheResult> { match self { - FileCache::None => {} - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - let key = *key; - runtime.spawn(async move { store.remove(&key).await }); - } + FileCache::Foyer { store } => store.lookup(key).await, + FileCache::None { store } => store.lookup(key).await, } } - #[tracing::instrument(skip(self))] - pub async fn clear(&self) -> Result<()> { + fn remove(&self, key: &Self::Key) -> FileCacheResult { match self { - FileCache::None => Ok(()), - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - runtime - .spawn(async move { store.clear().await }) - .await - .unwrap() - .map_err(FileCacheError::foyer) - } + FileCache::Foyer { store } => store.remove(key), + FileCache::None { store } => store.remove(key), } } - #[tracing::instrument(skip(self))] - pub fn clear_without_wait(&self) { + fn clear(&self) -> FileCacheResult<()> { match self { - FileCache::None => {} - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - runtime.spawn(async move { store.clear().await }); - } + FileCache::Foyer { store } => store.clear(), + FileCache::None { store } => store.clear(), } } +} - #[tracing::instrument(skip(self))] - pub async fn lookup(&self, key: &K) -> Result> { - match self { - FileCache::None => Ok(None), - FileCache::FoyerRuntime { runtime, store, .. } => { - let store = store.clone(); - let key = *key; - runtime - .spawn(async move { store.lookup(&key).await }) - .await - .unwrap() - .map_err(FileCacheError::foyer) - } - } +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash)] +pub struct SstableBlockIndex { + pub sst_id: HummockSstableObjectId, + pub block_idx: u64, +} + +impl Key for SstableBlockIndex { + fn serialized_len(&self) -> usize { + 8 + 8 // sst_id (8B) + block_idx (8B) } - #[tracing::instrument(skip(self))] - pub async fn exists(&self, key: &K) -> Result { - match self { - FileCache::None => Ok(false), - FileCache::FoyerRuntime { store, .. } => { - store.exists(key).map_err(FileCacheError::foyer) - } - } + fn write(&self, mut buf: &mut [u8]) { + buf.put_u64(self.sst_id); + buf.put_u64(self.block_idx); } - pub fn is_filter_enabled(&self) -> bool { - match self { - FileCache::None => false, - FileCache::FoyerRuntime { enable_filter, .. 
} => *enable_filter, - } + fn read(mut buf: &[u8]) -> Self { + let sst_id = buf.get_u64(); + let block_idx = buf.get_u64(); + Self { sst_id, block_idx } + } +} + +impl Value for Box { + fn serialized_len(&self) -> usize { + self.raw_data().len() + } + + fn write(&self, mut buf: &mut [u8]) { + buf.put_slice(self.raw_data()) + } + + fn read(buf: &[u8]) -> Self { + let data = Bytes::copy_from_slice(buf); + let block = Block::decode_from_raw(data); + Box::new(block) + } +} + +impl Value for Box { + fn serialized_len(&self) -> usize { + 8 + self.meta.encoded_size() // id (8B) + meta size + } + + fn write(&self, mut buf: &mut [u8]) { + buf.put_u64(self.id); + // TODO(MrCroxx): avoid buffer copy + let mut buffer = vec![]; + self.meta.encode_to(&mut buffer); + buf.put_slice(&buffer[..]) + } + + fn read(mut buf: &[u8]) -> Self { + let id = buf.get_u64(); + let meta = SstableMeta::decode(buf).unwrap(); + Box::new(Sstable::new(id, meta)) } } diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index a11e46e879dda..4845d7b43a0e4 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -67,6 +67,7 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto 0, FileCache::none(), FileCache::none(), + None, )) } diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 5634fbd56086a..60553b5aa09a3 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -25,7 +25,7 @@ use risingwave_pb::hummock::SstableInfo; mod block_cache; pub use block_cache::*; -mod file_cache; +pub mod file_cache; pub use file_cache::*; pub mod sstable; diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 9d835409a6e28..73d6110cacd29 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -15,12 +15,10 @@ use std::clone::Clone; use std::future::Future; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; use await_tree::InstrumentAwait; use bytes::Bytes; use fail::fail_point; -use futures::future::try_join_all; use futures::{future, StreamExt}; use itertools::Itertools; use risingwave_common::cache::{CachePriority, LookupResponse, LruCacheEventListener}; @@ -41,6 +39,7 @@ use super::{ Block, BlockCache, BlockMeta, BlockResponse, FileCache, RecentFilter, Sstable, SstableBlockIndex, SstableMeta, SstableWriter, }; +use crate::hummock::file_cache::preclude::*; use crate::hummock::multi_builder::UploadJoinHandle; use crate::hummock::{ BlockHolder, CacheableEntry, HummockError, HummockResult, LruCache, MemoryLimiter, @@ -107,7 +106,7 @@ impl LruCacheEventListener for BlockCacheEventListener { sst_id: key.0, block_idx: key.1, }; - self.data_file_cache.insert_without_wait(key, value); + self.data_file_cache.insert_async(key, value); } } @@ -118,7 +117,7 @@ impl LruCacheEventListener for MetaCacheEventListener { type T = Box; fn on_release(&self, key: Self::K, value: Self::T) { - self.0.insert_without_wait(key, value); + self.0.insert_async(key, value); } } @@ -131,7 +130,7 @@ pub struct SstableStore { data_file_cache: FileCache>, meta_file_cache: FileCache>, - data_file_cache_refill_filter: Option>>, + recent_filter: Option>>, } impl SstableStore { @@ -143,6 +142,7 @@ impl SstableStore { high_priority_ratio: usize, data_file_cache: FileCache>, meta_file_cache: FileCache>, + recent_filter: Option>>, ) -> Self { // TODO: We should 
validate path early. Otherwise object store won't report invalid path // error until first write attempt. @@ -154,11 +154,6 @@ impl SstableStore { data_file_cache: data_file_cache.clone(), }); let meta_cache_listener = Arc::new(MetaCacheEventListener(meta_file_cache.clone())); - let data_file_cache_refill_filter = if data_file_cache.is_filter_enabled() { - Some(Arc::new(RecentFilter::new(6, Duration::from_secs(10)))) - } else { - None - }; Self { path, @@ -179,7 +174,7 @@ impl SstableStore { data_file_cache, meta_file_cache, - data_file_cache_refill_filter, + recent_filter, } } @@ -200,7 +195,7 @@ impl SstableStore { data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), - data_file_cache_refill_filter: None, + recent_filter: None, } } @@ -210,7 +205,9 @@ impl SstableStore { .delete(self.get_sst_data_path(object_id).as_str()) .await?; self.meta_cache.erase(object_id, &object_id); - self.meta_file_cache.remove_without_wait(&object_id); + self.meta_file_cache + .remove(&object_id) + .map_err(HummockError::file_cache)?; Ok(()) } @@ -230,7 +227,9 @@ impl SstableStore { // Delete from cache. for &object_id in object_id_list { self.meta_cache.erase(object_id, &object_id); - self.meta_file_cache.remove_without_wait(&object_id); + self.meta_file_cache + .remove(&object_id) + .map_err(HummockError::file_cache)?; } Ok(()) @@ -238,7 +237,9 @@ impl SstableStore { pub fn delete_cache(&self, object_id: HummockSstableObjectId) { self.meta_cache.erase(object_id, &object_id); - self.meta_file_cache.remove_without_wait(&object_id); + if let Err(e) = self.meta_file_cache.remove(&object_id) { + tracing::warn!("meta file cache remove error: {}", e); + } } async fn put_sst_data( @@ -304,7 +305,7 @@ impl SstableStore { policy }; - if let Some(filter) = self.data_file_cache_refill_filter.as_ref() { + if let Some(filter) = self.recent_filter.as_ref() { filter.insert(object_id); } @@ -317,7 +318,7 @@ impl SstableStore { )), CachePolicy::FillFileCache => { let block = fetch_block().await?; - self.data_file_cache.insert_without_wait( + self.data_file_cache.insert_async( SstableBlockIndex { sst_id: object_id, block_idx: block_index as u64, @@ -380,13 +381,17 @@ impl SstableStore { #[cfg(any(test, feature = "test"))] pub fn clear_block_cache(&self) { self.block_cache.clear(); - self.data_file_cache.clear_without_wait(); + if let Err(e) = self.data_file_cache.clear() { + tracing::warn!("data file cache clear error: {}", e); + } } #[cfg(any(test, feature = "test"))] pub fn clear_meta_cache(&self) { self.meta_cache.clear(); - self.meta_file_cache.clear_without_wait(); + if let Err(e) = self.meta_file_cache.clear() { + tracing::warn!("meta file cache clear error: {}", e); + } } /// Returns `table_holder` @@ -480,7 +485,7 @@ impl SstableStore { block_index: u64, block: Box, ) { - if let Some(filter) = self.data_file_cache_refill_filter.as_ref() { + if let Some(filter) = self.recent_filter.as_ref() { filter.insert(object_id); } self.block_cache @@ -515,54 +520,13 @@ impl SstableStore { )) } - pub fn data_file_cache_refill_filter( - &self, - ) -> Option<&Arc>> { - self.data_file_cache_refill_filter.as_ref() + pub fn data_recent_filter(&self) -> Option<&Arc>> { + self.recent_filter.as_ref() } pub fn data_file_cache(&self) -> &FileCache> { &self.data_file_cache } - - pub async fn fill_data_file_cache(&self, sst: &Sstable) -> HummockResult<()> { - let object_id = sst.id; - - if let Some(filter) = self.data_file_cache_refill_filter.as_ref() { - filter.insert(object_id); - } - - let data = self - .store - 
.read(&self.get_sst_data_path(object_id), ..) - .await?; - - let mut tasks = vec![]; - for block_index in 0..sst.block_count() { - let (range, uncompressed_capacity) = sst.calculate_block_info(block_index); - let bytes = data.slice(range); - let block = Block::decode(bytes, uncompressed_capacity)?; - let block = Box::new(block); - - let key = SstableBlockIndex { - sst_id: object_id, - block_idx: block_index as u64, - }; - - let cache = self.data_file_cache.clone(); - let task = async move { - cache - .insert_force(key, block) - .await - .map_err(HummockError::file_cache) - }; - tasks.push(task); - } - - try_join_all(tasks).await?; - - Ok(()) - } } pub type SstableStoreRef = Arc; @@ -742,7 +706,7 @@ impl SstableWriter for BatchUploadWriter { .await?; self.sstable_store.insert_meta_cache(self.object_id, meta); - if let Some(filter) = self.sstable_store.data_file_cache_refill_filter.as_ref() { + if let Some(filter) = self.sstable_store.recent_filter.as_ref() { filter.insert(self.object_id); } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 8ccf1af632066..5e51fa1170b12 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -183,6 +183,8 @@ impl HummockStorage { .copied() .collect(), concurrency: options.cache_refill_concurrency, + unit: options.cache_refill_unit, + threshold: options.cache_refill_threshold, }, ); diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index a26bd6b96467a..a3243cbc8c465 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -75,13 +75,19 @@ pub struct StorageOpts { pub data_file_cache_recover_concurrency: usize, pub data_file_cache_lfu_window_to_cache_size_ratio: usize, pub data_file_cache_lfu_tiny_lru_capacity_ratio: f64, - pub data_file_cache_rated_random_rate_mb: usize, + pub data_file_cache_insert_rate_limit_mb: usize, pub data_file_cache_flush_rate_limit_mb: usize, pub data_file_cache_reclaim_rate_limit_mb: usize, + pub data_file_cache_allocation_bits: usize, + pub data_file_cache_allocation_timeout_ms: usize, pub cache_refill_data_refill_levels: Vec, pub cache_refill_timeout_ms: u64, pub cache_refill_concurrency: usize, + pub cache_refill_recent_filter_layers: usize, + pub cache_refill_recent_filter_rotate_interval_ms: usize, + pub cache_refill_unit: usize, + pub cache_refill_threshold: f64, pub meta_file_cache_dir: String, pub meta_file_cache_capacity_mb: usize, @@ -94,9 +100,11 @@ pub struct StorageOpts { pub meta_file_cache_recover_concurrency: usize, pub meta_file_cache_lfu_window_to_cache_size_ratio: usize, pub meta_file_cache_lfu_tiny_lru_capacity_ratio: f64, - pub meta_file_cache_rated_random_rate_mb: usize, + pub meta_file_cache_insert_rate_limit_mb: usize, pub meta_file_cache_flush_rate_limit_mb: usize, pub meta_file_cache_reclaim_rate_limit_mb: usize, + pub meta_file_cache_allocation_bits: usize, + pub meta_file_cache_allocation_timeout_ms: usize, /// The storage url for storing backups. 
pub backup_storage_url: String, @@ -173,9 +181,11 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .data_file_cache .lfu_tiny_lru_capacity_ratio, - data_file_cache_rated_random_rate_mb: c.storage.data_file_cache.rated_random_rate_mb, + data_file_cache_insert_rate_limit_mb: c.storage.data_file_cache.insert_rate_limit_mb, data_file_cache_flush_rate_limit_mb: c.storage.data_file_cache.flush_rate_limit_mb, data_file_cache_reclaim_rate_limit_mb: c.storage.data_file_cache.reclaim_rate_limit_mb, + data_file_cache_allocation_bits: c.storage.data_file_cache.allocation_bits, + data_file_cache_allocation_timeout_ms: c.storage.data_file_cache.allocation_timeout_ms, meta_file_cache_dir: c.storage.meta_file_cache.dir.clone(), meta_file_cache_capacity_mb: c.storage.meta_file_cache.capacity_mb, meta_file_cache_file_capacity_mb: c.storage.meta_file_cache.file_capacity_mb, @@ -193,12 +203,21 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .storage .meta_file_cache .lfu_tiny_lru_capacity_ratio, - meta_file_cache_rated_random_rate_mb: c.storage.meta_file_cache.rated_random_rate_mb, + meta_file_cache_insert_rate_limit_mb: c.storage.meta_file_cache.insert_rate_limit_mb, meta_file_cache_flush_rate_limit_mb: c.storage.meta_file_cache.flush_rate_limit_mb, meta_file_cache_reclaim_rate_limit_mb: c.storage.meta_file_cache.reclaim_rate_limit_mb, + meta_file_cache_allocation_bits: c.storage.meta_file_cache.allocation_bits, + meta_file_cache_allocation_timeout_ms: c.storage.meta_file_cache.allocation_timeout_ms, cache_refill_data_refill_levels: c.storage.cache_refill.data_refill_levels.clone(), cache_refill_timeout_ms: c.storage.cache_refill.timeout_ms, cache_refill_concurrency: c.storage.cache_refill.concurrency, + cache_refill_recent_filter_layers: c.storage.cache_refill.recent_filter_layers, + cache_refill_recent_filter_rotate_interval_ms: c + .storage + .cache_refill + .recent_filter_rotate_interval_ms, + cache_refill_unit: c.storage.cache_refill.unit, + cache_refill_threshold: c.storage.cache_refill.threshold, max_preload_wait_time_mill: c.storage.max_preload_wait_time_mill, object_store_streaming_read_timeout_ms: c .storage diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index b2fc43dc40b06..8460b75ebc0dc 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -15,6 +15,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use enum_as_inner::EnumAsInner; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; @@ -23,10 +24,11 @@ use risingwave_object_store::object::parse_remote_object_store; use crate::error::StorageResult; use crate::filter_key_extractor::{RemoteTableAccessor, RpcFilterKeyExtractorManager}; +use crate::hummock::file_cache::preclude::*; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::{ - set_foyer_metrics_registry, FileCache, FoyerRuntimeConfig, FoyerStoreConfig, HummockError, - HummockStorage, SstableStore, + set_foyer_metrics_registry, FileCache, FileCacheConfig, HummockError, HummockStorage, + RecentFilter, SstableStore, }; use crate::memory::sled::SledStateStore; use crate::memory::MemoryStateStore; @@ -525,12 +527,12 @@ impl StateStoreImpl { ) -> StorageResult { set_foyer_metrics_registry(GLOBAL_METRICS_REGISTRY.clone()); - let data_file_cache = if opts.data_file_cache_dir.is_empty() { - FileCache::none() + let (data_file_cache, recent_filter) = if 
opts.data_file_cache_dir.is_empty() { + (FileCache::none(), None) } else { const MB: usize = 1024 * 1024; - let foyer_store_config = FoyerStoreConfig { + let config = FileCacheConfig { name: "data".to_string(), dir: PathBuf::from(opts.data_file_cache_dir.clone()), capacity: opts.data_file_cache_capacity_mb * MB, @@ -540,22 +542,27 @@ impl StateStoreImpl { device_io_size: opts.data_file_cache_device_io_size, lfu_window_to_cache_size_ratio: opts.data_file_cache_lfu_window_to_cache_size_ratio, lfu_tiny_lru_capacity_ratio: opts.data_file_cache_lfu_tiny_lru_capacity_ratio, - rated_random_rate: opts.data_file_cache_rated_random_rate_mb * MB, + insert_rate_limit: opts.data_file_cache_insert_rate_limit_mb * MB, flushers: opts.data_file_cache_flushers, reclaimers: opts.data_file_cache_reclaimers, flush_rate_limit: opts.data_file_cache_flush_rate_limit_mb * MB, reclaim_rate_limit: opts.data_file_cache_reclaim_rate_limit_mb * MB, recover_concurrency: opts.data_file_cache_recover_concurrency, - event_listener: vec![], - enable_filter: !opts.cache_refill_data_refill_levels.is_empty(), - }; - let config = FoyerRuntimeConfig { - foyer_store_config, - runtime_worker_threads: None, + allocator_bits: opts.data_file_cache_allocation_bits, + allocation_timeout: Duration::from_millis( + opts.data_file_cache_allocation_timeout_ms as u64, + ), + admissions: vec![], + reinsertions: vec![], }; - FileCache::foyer(config) + let cache = FileCache::open(config) .await - .map_err(HummockError::file_cache)? + .map_err(HummockError::file_cache)?; + let filter = Some(Arc::new(RecentFilter::new( + opts.cache_refill_recent_filter_layers, + Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64), + ))); + (cache, filter) }; let meta_file_cache = if opts.meta_file_cache_dir.is_empty() { @@ -563,7 +570,7 @@ impl StateStoreImpl { } else { const MB: usize = 1024 * 1024; - let foyer_store_config = FoyerStoreConfig { + let config = FileCacheConfig { name: "meta".to_string(), dir: PathBuf::from(opts.meta_file_cache_dir.clone()), capacity: opts.meta_file_cache_capacity_mb * MB, @@ -573,20 +580,20 @@ impl StateStoreImpl { device_io_size: opts.meta_file_cache_device_io_size, lfu_window_to_cache_size_ratio: opts.meta_file_cache_lfu_window_to_cache_size_ratio, lfu_tiny_lru_capacity_ratio: opts.meta_file_cache_lfu_tiny_lru_capacity_ratio, - rated_random_rate: opts.meta_file_cache_rated_random_rate_mb * MB, + insert_rate_limit: opts.meta_file_cache_insert_rate_limit_mb * MB, flushers: opts.meta_file_cache_flushers, reclaimers: opts.meta_file_cache_reclaimers, flush_rate_limit: opts.meta_file_cache_flush_rate_limit_mb * MB, reclaim_rate_limit: opts.meta_file_cache_reclaim_rate_limit_mb * MB, recover_concurrency: opts.meta_file_cache_recover_concurrency, - event_listener: vec![], - enable_filter: false, - }; - let config = FoyerRuntimeConfig { - foyer_store_config, - runtime_worker_threads: None, + allocator_bits: opts.meta_file_cache_allocation_bits, + allocation_timeout: Duration::from_millis( + opts.meta_file_cache_allocation_timeout_ms as u64, + ), + admissions: vec![], + reinsertions: vec![], }; - FileCache::foyer(config) + FileCache::open(config) .await .map_err(HummockError::file_cache)? 
}; @@ -614,6 +621,7 @@ impl StateStoreImpl { opts.high_priority_ratio, data_file_cache, meta_file_cache, + recent_filter, )); let notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 5963d7b33cdea..346cf2fe6acf8 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -211,6 +211,7 @@ async fn compaction_test( 0, FileCache::none(), FileCache::none(), + None, )); let store = HummockStorage::new(