diff --git a/Cargo.lock b/Cargo.lock index bb2a5d84fd300..ffc5342c22d61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5358,7 +5358,7 @@ dependencies = [ [[package]] name = "foyer" version = "0.9.4" -source = "git+https://github.com/MrCroxx/foyer?rev=fe904bc#fe904bc120926c91d45c142c63288317ecb2bd3e" +source = "git+https://github.com/MrCroxx/foyer?rev=f85e50b#f85e50bfa0d37b4540d402273d817dc52fa9fda5" dependencies = [ "ahash 0.8.11", "anyhow", @@ -5373,7 +5373,7 @@ dependencies = [ [[package]] name = "foyer-common" version = "0.7.3" -source = "git+https://github.com/MrCroxx/foyer?rev=fe904bc#fe904bc120926c91d45c142c63288317ecb2bd3e" +source = "git+https://github.com/MrCroxx/foyer?rev=f85e50b#f85e50bfa0d37b4540d402273d817dc52fa9fda5" dependencies = [ "bytes", "cfg-if", @@ -5394,7 +5394,7 @@ dependencies = [ [[package]] name = "foyer-intrusive" version = "0.7.2" -source = "git+https://github.com/MrCroxx/foyer?rev=fe904bc#fe904bc120926c91d45c142c63288317ecb2bd3e" +source = "git+https://github.com/MrCroxx/foyer?rev=f85e50b#f85e50bfa0d37b4540d402273d817dc52fa9fda5" dependencies = [ "foyer-common", "itertools 0.13.0", @@ -5403,7 +5403,7 @@ dependencies = [ [[package]] name = "foyer-memory" version = "0.5.2" -source = "git+https://github.com/MrCroxx/foyer?rev=fe904bc#fe904bc120926c91d45c142c63288317ecb2bd3e" +source = "git+https://github.com/MrCroxx/foyer?rev=f85e50b#f85e50bfa0d37b4540d402273d817dc52fa9fda5" dependencies = [ "ahash 0.8.11", "bitflags 2.5.0", @@ -5424,7 +5424,7 @@ dependencies = [ [[package]] name = "foyer-storage" version = "0.8.5" -source = "git+https://github.com/MrCroxx/foyer?rev=fe904bc#fe904bc120926c91d45c142c63288317ecb2bd3e" +source = "git+https://github.com/MrCroxx/foyer?rev=f85e50b#f85e50bfa0d37b4540d402273d817dc52fa9fda5" dependencies = [ "ahash 0.8.11", "allocator-api2", diff --git a/Cargo.toml b/Cargo.toml index 16d11586fc531..5e2e24408a6df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -341,7 +341,7 @@ deno_websocket = { git = "https://github.com/bakjos/deno", rev = "787a232" } # patch to remove preserve_order from serde_json bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" } # temporarily patch until foyer release v0.10 -foyer = { git = "https://github.com/MrCroxx/foyer", rev = "fe904bc" } +foyer = { git = "https://github.com/MrCroxx/foyer", rev = "f85e50b" } [workspace.metadata.dylint] libraries = [{ path = "./lints" }] diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 21eb4a13e8c31..5f590a63016b5 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -53,6 +53,9 @@ pub mod write_limiter; pub mod recent_filter; pub use recent_filter::*; +pub mod tiered_cache_reconfigurer; +pub use tiered_cache_reconfigurer::*; + pub mod block_stream; pub use error::*; diff --git a/src/storage/src/hummock/tiered_cache_reconfigurer.rs b/src/storage/src/hummock/tiered_cache_reconfigurer.rs new file mode 100644 index 0000000000000..0c67d411cb308 --- /dev/null +++ b/src/storage/src/hummock/tiered_cache_reconfigurer.rs @@ -0,0 +1,127 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use arc_swap::Guard; +use foyer::HybridCache; +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader}; +use risingwave_hummock_sdk::HummockSstableObjectId; +use tokio::sync::watch; + +use super::{Block, Sstable, SstableBlockIndex}; + +/// Background runner to update tiered cache configuration via the system parameters. +pub struct TieredCacheReconfigurer { + meta_cache: HybridCache>, + block_cache: HybridCache>, +} + +impl TieredCacheReconfigurer { + pub async fn run(self, mut rx: watch::Receiver) { + loop { + if let Err(e) = rx.changed().await { + tracing::error!("Tiered cache reconfigurer exit with error: {}", e); + } + + let p = rx.borrow().load(); + if p.minitrace() { + self.meta_cache.enable_tracing(); + self.block_cache.enable_tracing(); + tracing::info!("Enable minitrace for tiered cache."); + } else { + self.meta_cache.disable_tracing(); + self.block_cache.disable_tracing(); + tracing::info!("Disable minitrace for tiered cache."); + } + let minitrace_tiered_cache_read_ms = p.minitrace_tiered_cache_read_ms(); + let minitrace_tiered_cache_write_ms = p.minitrace_tiered_cache_write_ms(); + + tracing::info!( + "Tiered cache trace record threshold: [read = {:?}] [write = {:?}]", + minitrace_tiered_cache_read_ms, + minitrace_tiered_cache_write_ms + ); + + let meta_cache_trace_config = self.meta_cache.trace_config(); + let block_cache_trace_config = self.meta_cache.trace_config(); + + meta_cache_trace_config + .record_hybrid_get_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + meta_cache_trace_config + .record_hybrid_obtain_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + meta_cache_trace_config + .record_hybrid_fetch_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + + meta_cache_trace_config + .record_hybrid_insert_threshold_us + .store( + minitrace_tiered_cache_write_ms as usize * 1000, + Ordering::Relaxed, + ); + meta_cache_trace_config + .record_hybrid_remove_threshold_us + .store( + minitrace_tiered_cache_write_ms as usize * 1000, + Ordering::Relaxed, + ); + + block_cache_trace_config + .record_hybrid_get_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + block_cache_trace_config + .record_hybrid_obtain_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + block_cache_trace_config + .record_hybrid_fetch_threshold_us + .store( + minitrace_tiered_cache_read_ms as usize * 1000, + Ordering::Relaxed, + ); + + block_cache_trace_config + .record_hybrid_insert_threshold_us + .store( + minitrace_tiered_cache_write_ms as usize * 1000, + Ordering::Relaxed, + ); + block_cache_trace_config + .record_hybrid_remove_threshold_us + .store( + minitrace_tiered_cache_write_ms as usize * 1000, + Ordering::Relaxed, + ); + } + } +}