From 0264a93d6128ee4424945c596b3884c1fb143b60 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 06:15:03 +0000 Subject: [PATCH] feat(streaming): decouple memory manager tick interval with barrier (#19494) (#19503) Signed-off-by: MrCroxx Co-authored-by: Croxx --- src/common/src/config.rs | 11 ++++- src/compute/src/memory/manager.rs | 47 ++++++-------------- src/compute/src/server.rs | 12 +++-- src/config/example.toml | 1 + src/frontend/src/expr/type_inference/func.rs | 2 +- 5 files changed, 32 insertions(+), 41 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 9c615416e78ee..d6a8c37f3bea5 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1013,6 +1013,9 @@ pub struct StreamingDeveloperConfig { #[serde(default = "default::developer::memory_controller_eviction_factor_stable")] pub memory_controller_eviction_factor_stable: f64, + #[serde(default = "default::developer::memory_controller_update_interval_ms")] + pub memory_controller_update_interval_ms: usize, + #[serde(default = "default::developer::memory_controller_sequence_tls_step")] pub memory_controller_sequence_tls_step: u64, @@ -1110,11 +1113,11 @@ pub struct ObjectStoreConfig { #[serde(default)] pub s3: S3ObjectStoreConfig, - // TODO: the following field will be deprecated after opendal is stablized + // TODO: the following field will be deprecated after opendal is stabilized #[serde(default = "default::object_store_config::opendal_upload_concurrency")] pub opendal_upload_concurrency: usize, - // TODO: the following field will be deprecated after opendal is stablized + // TODO: the following field will be deprecated after opendal is stabilized #[serde(default)] pub opendal_writer_abort_on_err: bool, @@ -1908,6 +1911,10 @@ pub mod default { 1.0 } + pub fn memory_controller_update_interval_ms() -> usize { + 100 + } + pub fn memory_controller_sequence_tls_step() -> u64 { 128 } diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs index b90624193c703..235ab5802fbfd 100644 --- a/src/compute/src/memory/manager.rs +++ b/src/compute/src/memory/manager.rs @@ -17,8 +17,6 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use risingwave_common::sequence::AtomicSequence; -use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_stream::executor::monitor::StreamingMetrics; use super::controller::LruWatermarkController; @@ -50,7 +48,7 @@ pub struct MemoryManager { impl MemoryManager { // Arbitrarily set a minimal barrier interval in case it is too small, // especially when it's 0. - const MIN_TICK_INTERVAL_MS: u32 = 10; + const MIN_INTERVAL: Duration = Duration::from_millis(10); pub fn new(config: MemoryManagerConfig) -> Arc { let controller = Mutex::new(LruWatermarkController::new(&config)); @@ -67,42 +65,23 @@ impl MemoryManager { self.watermark_sequence.clone() } - pub async fn run( - self: Arc, - initial_interval_ms: u32, - mut system_params_change_rx: tokio::sync::watch::Receiver, - ) { + pub async fn run(self: Arc, interval: Duration) { // Loop interval of running control policy - let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS); - tracing::info!( - "start running MemoryManager with interval {}ms", - interval_ms - ); + let interval = std::cmp::max(interval, Self::MIN_INTERVAL); + tracing::info!("start running MemoryManager with interval {interval:?}",); // Keep same interval with the barrier interval - let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); + let mut tick_interval = tokio::time::interval(interval); loop { - // Wait for a while to check if need eviction. - tokio::select! { - Ok(_) = system_params_change_rx.changed() => { - let params = system_params_change_rx.borrow().load(); - let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); - if new_interval_ms != interval_ms { - interval_ms = new_interval_ms; - tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - tracing::info!("updated MemoryManager interval to {}ms", interval_ms); - } - } - - _ = tick_interval.tick() => { - let new_watermark_sequence = self.controller.lock().unwrap().tick(); - - self.watermark_sequence.store(new_watermark_sequence, Ordering::Relaxed); - - self.metrics.lru_runtime_loop_count.inc(); - } - } + tick_interval.tick().await; + + let new_watermark_sequence = self.controller.lock().unwrap().tick(); + + self.watermark_sequence + .store(new_watermark_sequence, Ordering::Relaxed); + + self.metrics.lru_runtime_loop_count.inc(); } } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c4a3780d9146a..c80efa768a99f 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -328,10 +328,14 @@ pub async fn compute_node_serve( }); // Run a background memory manager - tokio::spawn(memory_mgr.clone().run( - system_params.barrier_interval_ms(), - system_params_manager.watch_params(), - )); + tokio::spawn( + memory_mgr.clone().run(Duration::from_millis( + config + .streaming + .developer + .memory_controller_update_interval_ms as _, + )), + ); let heap_profiler = HeapProfiler::new( opts.total_memory_bytes, diff --git a/src/config/example.toml b/src/config/example.toml index 85ac81bc5a364..4c82a83b63dc0 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -120,6 +120,7 @@ stream_memory_controller_threshold_stable = 0.72 stream_memory_controller_eviction_factor_aggressive = 2.0 stream_memory_controller_eviction_factor_graceful = 1.5 stream_memory_controller_eviction_factor_stable = 1.0 +stream_memory_controller_update_interval_ms = 100 stream_memory_controller_sequence_tls_step = 128 stream_memory_controller_sequence_tls_lag = 32 stream_enable_arrangement_backfill = true diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9ed7530499921..dad24762c6ef2 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -829,7 +829,7 @@ fn implicit_ok(source: &DataType, target: &SigDataType, eq_ok: bool) -> bool { /// Find the top `candidates` that match `inputs` on most non-null positions. This covers Rule 2, /// 4a, 4c and 4d in [`PostgreSQL`](https://www.postgresql.org/docs/current/typeconv-func.html). /// -/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all posistions +/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all positions /// is just a special case. /// * Rule 4d: Break ties by selecting those that accept preferred types at most positions. /// * Rule 4a: If the input cannot implicit cast to expected type at any position, this candidate is