From f605a970926cfb11a6acbb25993cf9f4ea90353c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:05:11 +0800 Subject: [PATCH] feat(streaming): decouple memory manager tick interval with barrier (#19494) (#19502) Signed-off-by: MrCroxx Co-authored-by: Croxx --- src/common/src/config.rs | 11 ++++- src/compute/src/memory/manager.rs | 47 ++++++-------------- src/compute/src/server.rs | 12 +++-- src/config/example.toml | 1 + src/frontend/src/expr/type_inference/func.rs | 2 +- 5 files changed, 32 insertions(+), 41 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index be8c3dc46de9..8603ce7f9fd1 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1043,6 +1043,9 @@ pub struct StreamingDeveloperConfig { #[serde(default = "default::developer::memory_controller_eviction_factor_stable")] pub memory_controller_eviction_factor_stable: f64, + #[serde(default = "default::developer::memory_controller_update_interval_ms")] + pub memory_controller_update_interval_ms: usize, + #[serde(default = "default::developer::memory_controller_sequence_tls_step")] pub memory_controller_sequence_tls_step: u64, @@ -1146,11 +1149,11 @@ pub struct ObjectStoreConfig { #[serde(default)] pub s3: S3ObjectStoreConfig, - // TODO: the following field will be deprecated after opendal is stablized + // TODO: the following field will be deprecated after opendal is stabilized #[serde(default = "default::object_store_config::opendal_upload_concurrency")] pub opendal_upload_concurrency: usize, - // TODO: the following field will be deprecated after opendal is stablized + // TODO: the following field will be deprecated after opendal is stabilized #[serde(default)] pub opendal_writer_abort_on_err: bool, @@ -1973,6 +1976,10 @@ pub mod default { 1.0 } + pub fn memory_controller_update_interval_ms() -> usize { + 100 + } + pub fn memory_controller_sequence_tls_step() -> u64 { 128 } diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs index b90624193c70..235ab5802fbf 100644 --- a/src/compute/src/memory/manager.rs +++ b/src/compute/src/memory/manager.rs @@ -17,8 +17,6 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use risingwave_common::sequence::AtomicSequence; -use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_stream::executor::monitor::StreamingMetrics; use super::controller::LruWatermarkController; @@ -50,7 +48,7 @@ pub struct MemoryManager { impl MemoryManager { // Arbitrarily set a minimal barrier interval in case it is too small, // especially when it's 0. - const MIN_TICK_INTERVAL_MS: u32 = 10; + const MIN_INTERVAL: Duration = Duration::from_millis(10); pub fn new(config: MemoryManagerConfig) -> Arc { let controller = Mutex::new(LruWatermarkController::new(&config)); @@ -67,42 +65,23 @@ impl MemoryManager { self.watermark_sequence.clone() } - pub async fn run( - self: Arc, - initial_interval_ms: u32, - mut system_params_change_rx: tokio::sync::watch::Receiver, - ) { + pub async fn run(self: Arc, interval: Duration) { // Loop interval of running control policy - let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS); - tracing::info!( - "start running MemoryManager with interval {}ms", - interval_ms - ); + let interval = std::cmp::max(interval, Self::MIN_INTERVAL); + tracing::info!("start running MemoryManager with interval {interval:?}",); // Keep same interval with the barrier interval - let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); + let mut tick_interval = tokio::time::interval(interval); loop { - // Wait for a while to check if need eviction. - tokio::select! { - Ok(_) = system_params_change_rx.changed() => { - let params = system_params_change_rx.borrow().load(); - let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); - if new_interval_ms != interval_ms { - interval_ms = new_interval_ms; - tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - tracing::info!("updated MemoryManager interval to {}ms", interval_ms); - } - } - - _ = tick_interval.tick() => { - let new_watermark_sequence = self.controller.lock().unwrap().tick(); - - self.watermark_sequence.store(new_watermark_sequence, Ordering::Relaxed); - - self.metrics.lru_runtime_loop_count.inc(); - } - } + tick_interval.tick().await; + + let new_watermark_sequence = self.controller.lock().unwrap().tick(); + + self.watermark_sequence + .store(new_watermark_sequence, Ordering::Relaxed); + + self.metrics.lru_runtime_loop_count.inc(); } } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 909b4f96b7a1..af0d9575cacf 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -325,10 +325,14 @@ pub async fn compute_node_serve( }); // Run a background memory manager - tokio::spawn(memory_mgr.clone().run( - system_params.barrier_interval_ms(), - system_params_manager.watch_params(), - )); + tokio::spawn( + memory_mgr.clone().run(Duration::from_millis( + config + .streaming + .developer + .memory_controller_update_interval_ms as _, + )), + ); let heap_profiler = HeapProfiler::new( opts.total_memory_bytes, diff --git a/src/config/example.toml b/src/config/example.toml index a41ab0755d91..73f7c09fd3c7 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -130,6 +130,7 @@ stream_memory_controller_threshold_stable = 0.72 stream_memory_controller_eviction_factor_aggressive = 2.0 stream_memory_controller_eviction_factor_graceful = 1.5 stream_memory_controller_eviction_factor_stable = 1.0 +stream_memory_controller_update_interval_ms = 100 stream_memory_controller_sequence_tls_step = 128 stream_memory_controller_sequence_tls_lag = 32 stream_enable_arrangement_backfill = true diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 99392ad87b97..56998b5d7160 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -841,7 +841,7 @@ fn implicit_ok(source: &DataType, target: &SigDataType, eq_ok: bool) -> bool { /// Find the top `candidates` that match `inputs` on most non-null positions. This covers Rule 2, /// 4a, 4c and 4d in [`PostgreSQL`](https://www.postgresql.org/docs/current/typeconv-func.html). /// -/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all posistions +/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all positions /// is just a special case. /// * Rule 4d: Break ties by selecting those that accept preferred types at most positions. /// * Rule 4a: If the input cannot implicit cast to expected type at any position, this candidate is