From 9eb816acff765c78b70ab23c8ee642760cbdb752 Mon Sep 17 00:00:00 2001
From: MrCroxx
Date: Tue, 2 Apr 2024 16:55:13 +0800
Subject: [PATCH] perf(memory): use thread-local sequence-based memory
 eviction policy

Signed-off-by: MrCroxx
---
 Cargo.lock                                    |   2 +
 src/common/Cargo.toml                         |   2 +
 src/common/src/config.rs                      |  24 ++
 src/common/src/lib.rs                         |   2 +
 src/common/src/lru.rs                         | 347 ++++++++++++++++++
 src/common/src/sequence.rs                    |  72 ++++
 src/compute/src/memory/controller.rs          |  36 +-
 src/compute/src/memory/manager.rs             |  34 +-
 src/compute/src/server.rs                     |  14 +-
 src/config/example.toml                       |   3 +
 src/stream/src/cache/managed_lru.rs           | 325 +++++++---------
 .../src/executor/aggregation/distinct.rs      |   6 +-
 src/stream/src/executor/dedup/cache.rs        |   6 +-
 src/stream/src/executor/hash_agg.rs           |  12 +-
 src/stream/src/executor/join/hash_join.rs     |  12 +-
 src/stream/src/executor/lookup/cache.rs       |   6 +-
 src/stream/src/executor/mview/materialize.rs  |   6 +-
 src/stream/src/executor/over_window/eowc.rs   |   8 +-
 .../src/executor/over_window/general.rs       |  11 +-
 src/stream/src/executor/temporal_join.rs      |   8 +-
 src/stream/src/executor/top_n/group_top_n.rs  |   6 +-
 21 files changed, 696 insertions(+), 246 deletions(-)
 create mode 100644 src/common/src/lru.rs
 create mode 100644 src/common/src/sequence.rs

diff --git a/Cargo.lock b/Cargo.lock
index c66bb5a91db5..4312e822e9f9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9020,6 +9020,7 @@ dependencies = [
 name = "risingwave_common"
 version = "1.7.0-alpha"
 dependencies = [
+ "ahash 0.8.6",
  "anyhow",
  "arc-swap",
  "arrow-array 48.0.1",
@@ -9054,6 +9055,7 @@ dependencies = [
  "foyer",
  "futures",
  "governor",
+ "hashbrown 0.14.3",
  "hex",
  "http 0.2.9",
  "http-body",
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index 31ebcc59e93d..7c31dd3627b7 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
 normal = ["workspace-hack"]
 
 [dependencies]
+ahash = "0.8"
 anyhow = "1"
 arc-swap = "1"
 arrow-array = { workspace = true }
@@ -48,6 +49,7 @@ fixedbitset = "0.5"
 foyer = { workspace = true }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 governor = { version = "0.6", default-features = false, features = ["std"] }
+hashbrown = "0.14"
 hex = "0.4.3"
 http = "0.2"
 humantime = "2.1"
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index c8bbd591d715..a7d5122a1498 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -922,6 +922,15 @@ pub struct StreamingDeveloperConfig {
     #[serde(default = "default::developer::memory_controller_threshold_stable")]
     pub memory_controller_threshold_stable: f64,
 
+    #[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
+    pub memory_controller_eviction_factor_aggressive: f64,
+
+    #[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
+    pub memory_controller_eviction_factor_graceful: f64,
+
+    #[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
+    pub memory_controller_eviction_factor_stable: f64,
+
     #[serde(default = "default::developer::stream_enable_arrangement_backfill")]
     /// Enable arrangement backfill
     /// If true, the arrangement backfill will be disabled,
@@ -1566,12 +1575,27 @@ pub mod default {
         pub fn memory_controller_threshold_aggressive() -> f64 {
             0.9
         }
+
         pub fn memory_controller_threshold_graceful() -> f64 {
             0.8
         }
+
         pub fn memory_controller_threshold_stable() -> f64 {
             0.7
         }
+
+        pub fn memory_controller_eviction_factor_aggressive() -> f64 {
+            2.0
+        }
+
+        pub fn memory_controller_eviction_factor_graceful() -> f64 {
+            1.5
+        }
+
+        pub fn memory_controller_eviction_factor_stable() -> f64 {
+            1.0
+        }
+
         pub fn stream_enable_arrangement_backfill() -> bool {
             true
         }
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index d040245236e2..dbd9db3d6bd8 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -81,9 +81,11 @@ pub use risingwave_common_metrics::{
     register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
     register_guarded_int_gauge_vec_with_registry,
 };
+pub mod lru;
 pub mod opts;
 pub mod range;
 pub mod row;
+pub mod sequence;
 pub mod session_config;
 pub mod system_param;
 pub mod telemetry;
diff --git a/src/common/src/lru.rs b/src/common/src/lru.rs
new file mode 100644
index 000000000000..2d06984ab1a9
--- /dev/null
+++ b/src/common/src/lru.rs
@@ -0,0 +1,347 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::alloc::{Allocator, Global};
+use std::borrow::Borrow;
+use std::cell::RefCell;
+use std::hash::{BuildHasher, Hash};
+use std::mem::MaybeUninit;
+use std::ptr::NonNull;
+
+pub use ahash::RandomState;
+use hashbrown::hash_table::Entry;
+use hashbrown::HashTable;
+
+use crate::sequence::{Sequence, Sequencer};
+
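+// Thread-local sequencer: each thread draws `Sequencer::DEFAULT_STEP`-sized
+// ranges of sequence numbers from the shared `SEQUENCE_GLOBAL` counter, so
+// stamping cache entries normally stays off the shared atomic.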
+thread_local! {
+    pub static SEQUENCER: RefCell<Sequencer> = RefCell::new(Sequencer::new(Sequencer::DEFAULT_STEP, Sequencer::DEFAULT_LAG));
+}
+
+struct LruEntry<K, V>
+where
+    K: Hash + Eq,
+{
+    prev: Option<NonNull<LruEntry<K, V>>>,
+    next: Option<NonNull<LruEntry<K, V>>>,
+    key: MaybeUninit<K>,
+    value: MaybeUninit<V>,
+    hash: u64,
+    sequence: Sequence,
+}
+
+impl<K, V> LruEntry<K, V>
+where
+    K: Hash + Eq,
+{
+    fn key(&self) -> &K {
+        unsafe { self.key.assume_init_ref() }
+    }
+
+    fn value(&self) -> &V {
+        unsafe { self.value.assume_init_ref() }
+    }
+
+    fn value_mut(&mut self) -> &mut V {
+        unsafe { self.value.assume_init_mut() }
+    }
+}
+
+unsafe impl<K, V> Send for LruEntry<K, V> where K: Hash + Eq {}
+unsafe impl<K, V> Sync for LruEntry<K, V> where K: Hash + Eq {}
+
+pub struct LruCache<K, V, S = RandomState, A = Global>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
+{
+    map: HashTable<NonNull<LruEntry<K, V>>, A>,
+    /// dummy node of the lru linked list
+    dummy: Box<LruEntry<K, V>, A>,
+
+    alloc: A,
+    hash_builder: S,
+}
+
+unsafe impl<K, V, S, A> Send for LruCache<K, V, S, A>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
+{
+}
+unsafe impl<K, V, S, A> Sync for LruCache<K, V, S, A>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
+{
+}
+
+impl<K, V> LruCache<K, V>
+where
+    K: Hash + Eq,
+{
+    pub fn unbounded() -> Self {
+        Self::unbounded_with_hasher_in(RandomState::default(), Global)
+    }
+}
+
+impl<K, V, S, A> LruCache<K, V, S, A>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
+{
+    pub fn unbounded_with_hasher_in(hash_builder: S, alloc: A) -> Self {
+        let map = HashTable::new_in(alloc.clone());
+        let mut dummy = Box::new_in(
+            LruEntry {
+                prev: None,
+                next: None,
+                key: MaybeUninit::uninit(),
+                value: MaybeUninit::uninit(),
+                hash: 0,
+                sequence: Sequence::default(),
+            },
+            alloc.clone(),
+        );
+        let ptr = unsafe { NonNull::new_unchecked(dummy.as_mut() as *mut _) };
+        dummy.next = Some(ptr);
+        dummy.prev = Some(ptr);
+        Self {
+            map,
+            dummy,
+            alloc,
+            hash_builder,
+        }
+    }
+
+    pub fn put(&mut self, key: K, mut value: V) -> Option<V> {
+        unsafe {
+            let hash = self.hash_builder.hash_one(&key);
+
+            match self
+                .map
+                .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash)
+            {
+                Entry::Occupied(o) => {
+                    let mut ptr = *o.get();
+                    let entry = ptr.as_mut();
+                    std::mem::swap(&mut value, entry.value_mut());
+                    self.detach(ptr);
+                    self.attach(ptr);
+                    Some(value)
+                }
+                Entry::Vacant(v) => {
+                    let entry = Box::new_in(
+                        LruEntry {
+                            prev: None,
+                            next: None,
+                            key: MaybeUninit::new(key),
+                            value: MaybeUninit::new(value),
+                            hash,
+                            sequence: SEQUENCER.with(|s| s.borrow_mut().inc()),
+                        },
+                        self.alloc.clone(),
+                    );
+                    let ptr = NonNull::new_unchecked(Box::into_raw(entry));
+                    v.insert(ptr);
+                    self.attach(ptr);
+                    None
+                }
+            }
+        }
+    }
+
+    pub fn get<'a, Q>(&'a mut self, key: &Q) -> Option<&'a V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        unsafe {
+            let key = key.borrow();
+            let hash = self.hash_builder.hash_one(key);
+            if let Some(ptr) = self.map.find(hash, |p| p.as_ref().key().borrow() == key) {
+                let ptr = *ptr;
+                self.detach(ptr);
+                self.attach(ptr);
+                Some(ptr.as_ref().value())
+            } else {
+                None
+            }
+        }
+    }
+
+    pub fn get_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        unsafe {
+            let key = key.borrow();
+            let hash = self.hash_builder.hash_one(key);
+            if let Some(ptr) = self
+                .map
+                .find_mut(hash, |p| p.as_ref().key().borrow() == key)
+            {
+                let mut ptr = *ptr;
+                self.detach(ptr);
+                self.attach(ptr);
+                Some(ptr.as_mut().value_mut())
+            } else {
+                None
+            }
+        }
+    }
+    pub fn peek<'a, Q>(&'a self, key: &Q) -> Option<&'a V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        unsafe {
+            let key = key.borrow();
+            let hash = self.hash_builder.hash_one(key);
+            self.map
+                .find(hash, |p| p.as_ref().key().borrow() == key)
+                .map(|ptr| ptr.as_ref().value())
+        }
+    }
+
+    pub fn peek_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        unsafe {
+            let key = key.borrow();
+            let hash = self.hash_builder.hash_one(key);
+            self.map
+                .find(hash, |p| p.as_ref().key().borrow() == key)
+                .map(|ptr| ptr.clone().as_mut().value_mut())
+        }
+    }
+
+    pub fn contains<Q>(&self, key: &Q) -> bool
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized,
+    {
+        unsafe {
+            let key = key.borrow();
+            let hash = self.hash_builder.hash_one(key);
+            self.map
+                .find(hash, |p| p.as_ref().key().borrow() == key)
+                .is_some()
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.map.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Pop the first (least recently used) entry if its sequence is less than the given sequence.
+    pub fn pop_with_sequence(&mut self, sequence: Sequence) -> Option<(K, V, Sequence)> {
+        unsafe {
+            if self.is_empty() {
+                return None;
+            }
+
+            let ptr = self.dummy.next.unwrap_unchecked();
+            if ptr.as_ref().sequence >= sequence {
+                return None;
+            }
+
+            self.detach(ptr);
+
+            let entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone());
+
+            let key = entry.key.assume_init();
+            let value = entry.value.assume_init();
+            let sequence = entry.sequence;
+
+            let hash = self.hash_builder.hash_one(&key);
+
+            match self
+                .map
+                .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash)
+            {
+                Entry::Occupied(o) => {
+                    o.remove();
+                }
+                Entry::Vacant(_) => {}
+            }
+
+            Some((key, value, sequence))
+        }
+    }
+
+    pub fn clear(&mut self) {
+        unsafe {
+            while !self.is_empty() {
+                let ptr = self.dummy.next.unwrap_unchecked();
+                self.detach(ptr);
+                let entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone());
+                let key = entry.key.assume_init();
+                let hash = self.hash_builder.hash_one(&key);
+                match self
+                    .map
+                    .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash)
+                {
+                    Entry::Occupied(o) => o.remove(),
+                    Entry::Vacant(_) => unreachable!(),
+                };
+            }
+        }
+    }
+
+    fn detach(&mut self, mut ptr: NonNull<LruEntry<K, V>>) {
+        unsafe {
+            let entry = ptr.as_mut();
+
+            debug_assert!(entry.prev.is_some() && entry.next.is_some());
+
+            entry.prev.unwrap_unchecked().as_mut().next = entry.next;
+            entry.next.unwrap_unchecked().as_mut().prev = entry.prev;
+
+            entry.next = None;
+            entry.prev = None;
+        }
+    }
+
+    fn attach(&mut self, mut ptr: NonNull<LruEntry<K, V>>) {
+        unsafe {
+            let entry = ptr.as_mut();
+
+            debug_assert!(entry.prev.is_none() && entry.next.is_none());
+
+            entry.next = Some(NonNull::new_unchecked(self.dummy.as_mut() as *mut _));
+            entry.prev = self.dummy.prev;
+
+            self.dummy.prev.unwrap_unchecked().as_mut().next = Some(ptr);
+            self.dummy.prev = Some(ptr);
+
+            // Every (re-)attachment stamps the entry with a fresh sequence, so
+            // recently accessed entries sort after older ones for eviction.
+            entry.sequence = SEQUENCER.with(|s| s.borrow_mut().inc());
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
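+    use super::*;
+
+    // A minimal smoke-test sketch of the eviction contract: `get` re-stamps an
+    // entry with a fresh sequence, and `pop_with_sequence` pops entries whose
+    // sequence is strictly below the given watermark, oldest first.
+    #[test]
+    fn test_pop_with_sequence() {
+        let mut cache: LruCache<u32, u32> = LruCache::unbounded();
+        cache.put(1, 1);
+        cache.put(2, 2);
+        // Touch key 1 so that key 2 becomes the least recently used entry.
+        assert_eq!(cache.get(&1), Some(&1));
+
+        let watermark =
+            crate::sequence::SEQUENCE_GLOBAL.load(std::sync::atomic::Ordering::Relaxed);
+        let (key, value, sequence) = cache.pop_with_sequence(watermark).unwrap();
+        assert_eq!((key, value), (2, 2));
+        assert!(sequence < watermark);
+    }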
+}
diff --git a/src/common/src/sequence.rs b/src/common/src/sequence.rs
new file mode 100644
index 000000000000..48ff70ff6c4d
--- /dev/null
+++ b/src/common/src/sequence.rs
@@ -0,0 +1,72 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+pub type Sequence = u64;
+pub type AtomicSequence = AtomicU64;
+
+pub static SEQUENCE_GLOBAL: AtomicSequence = AtomicSequence::new(0);
+
+/// `Sequencer` issues monotonically increasing sequence numbers to a single
+/// thread. It allocates `step`-sized ranges from `SEQUENCE_GLOBAL` and only
+/// re-syncs early when the local counter falls more than `lag` behind the
+/// global one, keeping the hot path free of shared-atomic traffic.
+pub struct Sequencer {
+    local: Sequence,
+    target: Sequence,
+
+    step: Sequence,
+    lag: Sequence,
+}
+
+impl Sequencer {
+    pub const DEFAULT_LAG: Sequence = Self::DEFAULT_STEP * 16;
+    pub const DEFAULT_STEP: Sequence = 128;
+
+    pub const fn new(step: Sequence, lag: Sequence) -> Self {
+        Self {
+            local: 0,
+            target: 0,
+            step,
+            lag,
+        }
+    }
+
+    pub fn global(&self) -> Sequence {
+        SEQUENCE_GLOBAL.load(Ordering::Relaxed)
+    }
+
+    pub fn local(&self) -> Sequence {
+        self.local
+    }
+
+    pub fn inc(&mut self) -> Sequence {
+        self.try_alloc();
+        let res = self.local;
+        self.local += 1;
+        res
+    }
+
+    #[inline(always)]
+    fn try_alloc(&mut self) {
+        if self.local == self.target
+            || self.local + self.lag < SEQUENCE_GLOBAL.load(Ordering::Relaxed)
+        {
+            self.alloc()
+        }
+    }
+
+    #[inline(always)]
+    fn alloc(&mut self) {
+        self.local = SEQUENCE_GLOBAL.fetch_add(self.step, Ordering::Relaxed);
+        self.target = self.local + self.step;
+    }
+}
diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs
index 55d054724911..b5fa05162d9b 100644
--- a/src/compute/src/memory/controller.rs
+++ b/src/compute/src/memory/controller.rs
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
+use risingwave_common::sequence::{Sequence, SEQUENCE_GLOBAL};
 use risingwave_common::util::epoch::Epoch;
 use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
 use risingwave_stream::executor::monitor::StreamingMetrics;
@@ -75,8 +77,14 @@ pub struct LruWatermarkController {
     threshold_graceful: usize,
     threshold_aggressive: usize,
 
+    eviction_factor_stable: f64,
+    eviction_factor_graceful: f64,
+    eviction_factor_aggressive: f64,
+
     /// The state from previous tick
     state: State,
+
+    watermark_sequence: Sequence,
 }
 
 impl LruWatermarkController {
@@ -91,7 +99,11 @@ impl LruWatermarkController {
             threshold_stable,
             threshold_graceful,
             threshold_aggressive,
+            eviction_factor_stable: config.eviction_factor_stable,
+            eviction_factor_graceful: config.eviction_factor_graceful,
+            eviction_factor_aggressive: config.eviction_factor_aggressive,
             state: State::default(),
+            watermark_sequence: 0,
         }
     }
 }
@@ -131,7 +143,7 @@ fn jemalloc_memory_stats() -> MemoryStats {
 }
 
 impl LruWatermarkController {
-    pub fn tick(&mut self, interval_ms: u32) -> Epoch {
+    pub fn tick(&mut self, interval_ms: u32) -> Sequence {
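+        // The returned watermark is now a sequence, not an epoch: every cache
+        // entry stamped with a smaller sequence should be evicted by its
+        // owning `ManagedLruCache`.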
         // NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM
         let MemoryStats {
             allocated: jemalloc_allocated_bytes,
@@ -146,6 +158,26 @@ impl LruWatermarkController {
         let last_step = self.state.lru_watermark_step;
         let last_used_memory_bytes = self.state.used_memory_bytes;
 
+        // * aggressive ~ inf      : evict factor aggressive
+        // * graceful ~ aggressive : evict factor graceful
+        // * stable ~ graceful     : evict factor stable
+        // * 0 ~ stable            : no eviction
+        //
+        // Bytes above each threshold are weighted by that tier's factor; the
+        // weighted sum divided by current usage gives the fraction of the
+        // outstanding sequence range to advance the watermark by this tick.
+        let to_evict_bytes = cur_used_memory_bytes.saturating_sub(self.threshold_aggressive) as f64
+            * self.eviction_factor_aggressive
+            + cur_used_memory_bytes
+                .saturating_sub(self.threshold_graceful)
+                .min(self.threshold_aggressive - self.threshold_graceful) as f64
+                * self.eviction_factor_graceful
+            + cur_used_memory_bytes
+                .saturating_sub(self.threshold_stable)
+                .min(self.threshold_graceful - self.threshold_stable) as f64
+                * self.eviction_factor_stable;
+        let ratio = to_evict_bytes / cur_used_memory_bytes as f64;
+        let latest_sequence = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
+        let sequence_diff =
+            ((latest_sequence - self.watermark_sequence) as f64 * ratio) as Sequence;
+        self.watermark_sequence = latest_sequence.min(self.watermark_sequence + sequence_diff);
+
         // The watermark calculation works in the following way:
         //
         // 1. When the streaming memory usage is below the graceful threshold, we do not evict
@@ -224,6 +256,6 @@ impl LruWatermarkController {
             .set(jvm_allocated_bytes as i64);
         self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64);
 
-        Epoch::from_physical_time(watermark_time_ms)
+        self.watermark_sequence
     }
 }
diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs
index 5e8e1b5be54f..f322339691bf 100644
--- a/src/compute/src/memory/manager.rs
+++ b/src/compute/src/memory/manager.rs
@@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
+use risingwave_common::sequence::AtomicSequence;
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_stream::executor::monitor::StreamingMetrics;
@@ -29,13 +30,17 @@ pub struct MemoryManagerConfig {
     pub threshold_graceful: f64,
     pub threshold_stable: f64,
 
+    pub eviction_factor_stable: f64,
+    pub eviction_factor_graceful: f64,
+    pub eviction_factor_aggressive: f64,
+
     pub metrics: Arc<StreamingMetrics>,
 }
 
 /// Compute node uses [`MemoryManager`] to limit the memory usage.
 pub struct MemoryManager {
     /// All cached data before the watermark should be evicted.
-    watermark_epoch: Arc<AtomicU64>,
+    watermark_sequence: Arc<AtomicSequence>,
 
     metrics: Arc<StreamingMetrics>,
 
@@ -52,14 +57,14 @@ impl MemoryManager {
         tracing::info!("LRU watermark controller: {:?}", &controller);
 
         Arc::new(Self {
-            watermark_epoch: Arc::new(0.into()),
+            watermark_sequence: Arc::new(0.into()),
             metrics: config.metrics,
             controller,
         })
     }
 
-    pub fn get_watermark_epoch(&self) -> Arc<AtomicU64> {
-        self.watermark_epoch.clone()
+    pub fn get_watermark_sequence(&self) -> Arc<AtomicSequence> {
+        self.watermark_sequence.clone()
     }
 
     pub async fn run(
@@ -81,20 +86,21 @@ impl MemoryManager {
             // Wait for a while to check if need eviction.
             tokio::select! {
                 Ok(_) = system_params_change_rx.changed() => {
-                    let params = system_params_change_rx.borrow().load();
-                    let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS);
-                    if new_interval_ms != interval_ms {
-                        interval_ms = new_interval_ms;
-                        tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
-                        tracing::info!("updated MemoryManager interval to {}ms", interval_ms);
-                    }
+                    let params = system_params_change_rx.borrow().load();
+                    let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS);
+                    if new_interval_ms != interval_ms {
+                        interval_ms = new_interval_ms;
+                        tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
+                        tracing::info!("updated MemoryManager interval to {}ms", interval_ms);
+                    }
                 }
 
                 _ = tick_interval.tick() => {
-                    let new_watermark_epoch = self.controller.lock().unwrap().tick(interval_ms);
-                    self.watermark_epoch.store(new_watermark_epoch.0, Ordering::Relaxed);
+                    let new_watermark_sequence = self.controller.lock().unwrap().tick(interval_ms);
+
+                    self.watermark_sequence.store(new_watermark_sequence, Ordering::Relaxed);
 
-                    self.metrics.lru_runtime_loop_count.inc();
+                    self.metrics.lru_runtime_loop_count.inc();
                 }
             }
         }
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 7119df4926b9..5e75dfaab3ed 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -297,6 +297,18 @@ pub async fn compute_node_serve(
             .streaming
             .developer
             .memory_controller_threshold_stable,
+        eviction_factor_stable: config
+            .streaming
+            .developer
+            .memory_controller_eviction_factor_stable,
+        eviction_factor_graceful: config
+            .streaming
+            .developer
+            .memory_controller_eviction_factor_graceful,
+        eviction_factor_aggressive: config
+            .streaming
+            .developer
+            .memory_controller_eviction_factor_aggressive,
         metrics: streaming_metrics.clone(),
     });
 
@@ -369,7 +381,7 @@ pub async fn compute_node_serve(
         stream_env.clone(),
         streaming_metrics.clone(),
         await_tree_config.clone(),
-        memory_mgr.get_watermark_epoch(),
+        memory_mgr.get_watermark_sequence(),
     );
 
     let grpc_await_tree_reg = await_tree_config
diff --git a/src/config/example.toml b/src/config/example.toml
index cc12e3533079..fe6b51d70864 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -107,6 +107,9 @@ stream_hash_agg_max_dirty_groups_heap_size = 67108864
 stream_memory_controller_threshold_aggressive = 0.9
 stream_memory_controller_threshold_graceful = 0.8
 stream_memory_controller_threshold_stable = 0.7
+stream_memory_controller_eviction_factor_aggressive = 2.0
+stream_memory_controller_eviction_factor_graceful = 1.5
+stream_memory_controller_eviction_factor_stable = 1.0
 stream_enable_arrangement_backfill = true
 
 [storage]
diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs
index 42e158a80a78..7e0e388c93b6 100644
--- a/src/stream/src/cache/managed_lru.rs
+++ b/src/stream/src/cache/managed_lru.rs
@@ -14,15 +14,15 @@
 
 use std::alloc::{Allocator, Global};
 use std::borrow::Borrow;
-use std::cmp::min;
 use std::hash::{BuildHasher, Hash};
 use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-use lru::{DefaultHasher, LruCache};
+use risingwave_common::lru::{LruCache, RandomState};
 use risingwave_common::metrics::LabelGuardedIntGauge;
-use risingwave_common::util::epoch::Epoch;
+use risingwave_common::sequence::AtomicSequence;
 use risingwave_common_estimate_size::EstimateSize;
 
 use crate::common::metrics::MetricsInfo;
@@ -31,37 +31,39 @@ const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096;
 
 /// The managed cache is a lru cache that bounds the memory usage by epoch.
 /// Should be used with `MemoryManager`.
-pub struct ManagedLruCache<K, V, S = DefaultHasher, A = Global> {
+pub struct ManagedLruCache<K, V, S = RandomState, A = Global>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
+{
     inner: LruCache<K, V, S, A>,
-    /// The entry with epoch less than water should be evicted.
-    /// Should only be updated by the `MemoryManager`.
-    watermark_epoch: Arc<AtomicU64>,
-    /// The heap size of keys/values
-    kv_heap_size: usize,
-    /// The metrics of memory usage
-    memory_usage_metrics: LabelGuardedIntGauge<3>,
-    // The metrics of evicted watermark time
-    lru_evicted_watermark_time_ms: LabelGuardedIntGauge<3>,
+
+    /// The entry with sequence less than `watermark_sequence` should be evicted.
+    /// `watermark_sequence` should only be updated by `MemoryManager`.
+    watermark_sequence: Arc<AtomicSequence>,
+
     // Metrics info
     _metrics_info: MetricsInfo,
-    /// The size reported last time
-    last_reported_size_bytes: usize,
-}
 
-impl<K, V, S, A> Drop for ManagedLruCache<K, V, S, A> {
-    fn drop(&mut self) {
-        self.memory_usage_metrics.set(0.into());
-    }
+    reporter: HeapSizeReporter,
 }
 
-impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Allocator>
-    ManagedLruCache<K, V, S, A>
+impl<K, V, S, A> ManagedLruCache<K, V, S, A>
+where
+    K: Hash + Eq + EstimateSize,
+    V: EstimateSize,
+    S: BuildHasher + Send + Sync + 'static,
+    A: Clone + Allocator,
 {
-    pub fn new_inner(
-        inner: LruCache<K, V, S, A>,
-        watermark_epoch: Arc<AtomicU64>,
+    pub fn unbounded_with_hasher_in(
+        watermark_sequence: Arc<AtomicSequence>,
         metrics_info: MetricsInfo,
+        hash_builder: S,
+        alloc: A,
     ) -> Self {
+        let inner = LruCache::unbounded_with_hasher_in(hash_builder, alloc);
+
         let memory_usage_metrics = metrics_info
             .metrics
             .stream_memory_usage
@@ -72,80 +74,43 @@
-    pub fn current_epoch(&self) -> u64 {
-        self.inner.current_epoch()
-    }
-
-    /// An iterator visiting all values in most-recently used order. The iterator element type is
-    /// &V.
-    pub fn values(&self) -> impl Iterator<Item = &V> {
-        self.inner.iter().map(|(_k, v)| v)
-    }
 
     pub fn put(&mut self, k: K, v: V) -> Option<V> {
         let key_size = k.estimated_size();
-        self.kv_heap_size_inc(key_size + v.estimated_size());
+        self.reporter.inc(key_size + v.estimated_size());
 
         let old_val = self.inner.put(k, v);
         if let Some(old_val) = &old_val {
-            self.kv_heap_size_dec(key_size + old_val.estimated_size());
+            self.reporter.dec(key_size + old_val.estimated_size());
         }
         old_val
     }
 
+    // TODO(MrCroxx): REMOVE ME!!!
+    pub fn push(&mut self, k: K, v: V) -> Option<V> {
+        self.put(k, v)
+    }
+
     pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
         let v = self.inner.get_mut(k);
-        v.map(|inner| {
-            MutGuard::new(
-                inner,
-                &mut self.kv_heap_size,
-                &mut self.last_reported_size_bytes,
-                &mut self.memory_usage_metrics,
-            )
-        })
+        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
     }
 
     pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
@@ -166,25 +131,7 @@
     pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
         let v = self.inner.peek_mut(k);
-        v.map(|inner| {
-            MutGuard::new(
-                inner,
-                &mut self.kv_heap_size,
-                &mut self.last_reported_size_bytes,
-                &mut self.memory_usage_metrics,
-            )
-        })
-    }
-
-    pub fn push(&mut self, k: K, v: V) -> Option<(K, V)> {
-        self.kv_heap_size_inc(k.estimated_size() + v.estimated_size());
-
-        let old_kv = self.inner.push(k, v);
-
-        if let Some((old_key, old_val)) = &old_kv {
-            self.kv_heap_size_dec(old_key.estimated_size() + old_val.estimated_size());
-        }
-        old_kv
+        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
     }
 
     pub fn contains<Q>(&self, k: &Q) -> bool
@@ -200,129 +147,69 @@
     pub fn is_empty(&self) -> bool {
-        self.inner.len() == 0
+        self.inner.is_empty()
     }
 
     pub fn clear(&mut self) {
         self.inner.clear();
     }
 
-    fn kv_heap_size_inc(&mut self, size: usize) {
-        self.kv_heap_size = self.kv_heap_size.saturating_add(size);
-        self.report_memory_usage();
-    }
-
-    fn kv_heap_size_dec(&mut self, size: usize) {
-        self.kv_heap_size = self.kv_heap_size.saturating_sub(size);
-        self.report_memory_usage();
-    }
-
-    fn report_memory_usage(&mut self) -> bool {
-        if self.kv_heap_size.abs_diff(self.last_reported_size_bytes)
-            > REPORT_SIZE_EVERY_N_KB_CHANGE << 10
-        {
-            self.memory_usage_metrics.set(self.kv_heap_size as _);
-            self.last_reported_size_bytes = self.kv_heap_size;
-            true
-        } else {
-            false
-        }
-    }
-
-    fn report_evicted_watermark_time(&self, epoch: u64) {
-        self.lru_evicted_watermark_time_ms
-            .set(Epoch(epoch).physical_time() as _);
-    }
-
-    fn load_cur_epoch(&self) -> u64 {
-        self.watermark_epoch.load(Ordering::Relaxed)
-    }
-}
-
-pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
-    watermark_epoch: Arc<AtomicU64>,
-    metrics_info: MetricsInfo,
-) -> ManagedLruCache<K, V> {
-    ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info)
+    // TODO(MrCroxx): REMOVE ME!!!
+    pub fn update_epoch(&mut self, _: u64) {}
 }
 
-pub fn new_with_hasher_in<
+impl<K, V> ManagedLruCache<K, V>
+where
     K: Hash + Eq + EstimateSize,
     V: EstimateSize,
-    S: BuildHasher,
-    A: Clone + Allocator,
->(
-    watermark_epoch: Arc<AtomicU64>,
-    metrics_info: MetricsInfo,
-    hasher: S,
-    alloc: A,
-) -> ManagedLruCache<K, V, S, A> {
-    ManagedLruCache::new_inner(
-        LruCache::unbounded_with_hasher_in(hasher, alloc),
-        watermark_epoch,
-        metrics_info,
-    )
+{
+    pub fn unbounded(watermark_sequence: Arc<AtomicSequence>, metrics_info: MetricsInfo) -> Self {
+        Self::unbounded_with_hasher(watermark_sequence, metrics_info, RandomState::default())
+    }
 }
 
-pub fn new_with_hasher<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher>(
-    watermark_epoch: Arc<AtomicU64>,
-    metrics_info: MetricsInfo,
-    hasher: S,
-) -> ManagedLruCache<K, V, S> {
-    ManagedLruCache::new_inner(
-        LruCache::unbounded_with_hasher(hasher),
-        watermark_epoch,
-        metrics_info,
-    )
+impl<K, V, S> ManagedLruCache<K, V, S>
+where
+    K: Hash + Eq + EstimateSize,
+    V: EstimateSize,
+    S: BuildHasher + Send + Sync + 'static,
+{
+    pub fn unbounded_with_hasher(
+        watermark_sequence: Arc<AtomicSequence>,
+        metrics_info: MetricsInfo,
+        hash_builder: S,
+    ) -> Self {
+        Self::unbounded_with_hasher_in(watermark_sequence, metrics_info, hash_builder, Global)
+    }
 }
 
 pub struct MutGuard<'a, V: EstimateSize> {
     inner: &'a mut V,
-    // The size of the original value
-    original_val_size: usize,
-    // The total size of a collection
-    total_size: &'a mut usize,
-    last_reported_size_bytes: &'a mut usize,
-    memory_usage_metrics: &'a mut LabelGuardedIntGauge<3>,
+    reporter: &'a mut HeapSizeReporter,
+    old_value_size: usize,
 }
 
 impl<'a, V: EstimateSize> MutGuard<'a, V> {
-    pub fn new(
-        inner: &'a mut V,
-        total_size: &'a mut usize,
-        last_reported_size_bytes: &'a mut usize,
-        memory_usage_metrics: &'a mut LabelGuardedIntGauge<3>,
-    ) -> Self {
-        let original_val_size = inner.estimated_size();
+    fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
+        let old_value_size = inner.estimated_size();
         Self {
             inner,
-            original_val_size,
-            total_size,
-            last_reported_size_bytes,
-            memory_usage_metrics,
-        }
-    }
-
-    fn report_memory_usage(&mut self) -> bool {
-        if self.total_size.abs_diff(*self.last_reported_size_bytes)
-            > REPORT_SIZE_EVERY_N_KB_CHANGE << 10
-        {
-            self.memory_usage_metrics.set(*self.total_size as _);
-            *self.last_reported_size_bytes = *self.total_size;
-            true
-        } else {
-            false
+            reporter,
+            old_value_size,
         }
     }
 }
 
 impl<'a, V: EstimateSize> Drop for MutGuard<'a, V> {
     fn drop(&mut self) {
-        *self.total_size = self
-            .total_size
-            .saturating_sub(self.original_val_size)
-            .saturating_add(self.inner.estimated_size());
-        self.report_memory_usage();
+        let new_value_size = self.inner.estimated_size();
+        if new_value_size != self.old_value_size {
+            self.reporter.apply(|size| {
+                *size = size
+                    .saturating_sub(self.old_value_size)
+                    .saturating_add(new_value_size)
+            })
+        }
     }
 }
 
@@ -339,3 +226,57 @@ impl<'a, V: EstimateSize> DerefMut for MutGuard<'a, V> {
         self.inner
     }
 }
+
+/// Tracks a cache's estimated heap size and reports it to the memory-usage
+/// gauge once it has drifted by more than `REPORT_SIZE_EVERY_N_KB_CHANGE` KiB
+/// since the last report, keeping metric updates off the hot path.
+struct HeapSizeReporter {
+    metrics: LabelGuardedIntGauge<3>,
+    heap_size: usize,
+    last_reported: usize,
+}
+
+impl HeapSizeReporter {
+    fn new(
+        heap_size_metrics: LabelGuardedIntGauge<3>,
+        heap_size: usize,
+        last_reported: usize,
+    ) -> Self {
+        Self {
+            metrics: heap_size_metrics,
+            heap_size,
+            last_reported,
+        }
+    }
+
+    fn inc(&mut self, size: usize) {
+        self.heap_size = self.heap_size.saturating_add(size);
+        self.try_report();
+    }
+
+    fn dec(&mut self, size: usize) {
+        self.heap_size = self.heap_size.saturating_sub(size);
+        self.try_report();
+    }
+
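+    // Applies an arbitrary size adjustment (used by `MutGuard` on drop, where
+    // the value may have grown or shrunk in place), then re-reports if needed.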
+    fn apply<F>(&mut self, f: F)
+    where
+        F: FnOnce(&mut usize),
+    {
+        f(&mut self.heap_size);
+        self.try_report();
+    }
+
+    fn try_report(&mut self) -> bool {
+        if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
+            self.metrics.set(self.heap_size as _);
+            self.last_reported = self.heap_size;
+            true
+        } else {
+            false
+        }
+    }
+}
+
+impl Drop for HeapSizeReporter {
+    fn drop(&mut self) {
+        self.metrics.set(0);
+    }
+}
diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs
index 8bb298b47c99..205f9118e3a7 100644
--- a/src/stream/src/executor/aggregation/distinct.rs
+++ b/src/stream/src/executor/aggregation/distinct.rs
@@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_storage::StateStore;
 
 use super::{AggCall, GroupKey};
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::executor::{ActorContextRef, StreamExecutorResult};
@@ -40,9 +40,9 @@ struct ColumnDeduplicater<S: StateStore> {
 }
 
 impl<S: StateStore> ColumnDeduplicater<S> {
-    fn new(watermark_epoch: Arc<AtomicU64>, metrics_info: MetricsInfo) -> Self {
+    fn new(watermark_sequence: Arc<AtomicU64>, metrics_info: MetricsInfo) -> Self {
         Self {
-            cache: new_unbounded(watermark_epoch, metrics_info),
+            cache: ManagedLruCache::unbounded(watermark_sequence, metrics_info),
             _phantom: PhantomData,
         }
     }
diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs
index 5a9d876c356e..937fb05555fc 100644
--- a/src/stream/src/executor/dedup/cache.rs
+++ b/src/stream/src/executor/dedup/cache.rs
@@ -16,7 +16,7 @@ use std::hash::Hash;
 
 use risingwave_common_estimate_size::EstimateSize;
 
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::task::AtomicU64Ref;
 
@@ -27,8 +27,8 @@ pub struct DedupCache<K: Hash + Eq + EstimateSize> {
 }
 
 impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
-    pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
-        let cache = new_unbounded(watermark_epoch, metrics_info);
+    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
+        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
         Self { inner: cache }
     }
diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs
index 685b2e65e083..e2004b32f349 100644
--- a/src/stream/src/executor/hash_agg.rs
+++ b/src/stream/src/executor/hash_agg.rs
@@ -41,7 +41,7 @@ use super::sort_buffer::SortBuffer;
 use super::{
     expect_first_barrier, ActorContextRef, Executor, ExecutorInfo, StreamExecutorResult, Watermark,
 };
-use crate::cache::{cache_may_stale, new_with_hasher, ManagedLruCache};
+use crate::cache::{cache_may_stale, ManagedLruCache};
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::common::StreamChunkBuilder;
@@ -123,7 +123,7 @@ struct ExecutorInner<K: HashKey, S: StateStore> {
     distinct_dedup_tables: HashMap<usize, StateTable<S>>,
 
     /// Watermark epoch.
-    watermark_epoch: AtomicU64Ref,
+    watermark_sequence: AtomicU64Ref,
 
     /// State cache size for extreme agg.
     extreme_cache_size: usize,
 
@@ -229,7 +229,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
             storages: args.storages,
             intermediate_state_table: args.intermediate_state_table,
             distinct_dedup_tables: args.distinct_dedup_tables,
-            watermark_epoch: args.watermark_epoch,
+            watermark_sequence: args.watermark_epoch,
             extreme_cache_size: args.extreme_cache_size,
             chunk_size: args.extra.chunk_size,
             max_dirty_groups_heap_size: args.extra.max_dirty_groups_heap_size,
@@ -587,15 +587,15 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
 
         let mut vars = ExecutionVars {
             stats: ExecutionStats::new(),
-            agg_group_cache: new_with_hasher(
-                this.watermark_epoch.clone(),
+            agg_group_cache: ManagedLruCache::unbounded_with_hasher(
+                this.watermark_sequence.clone(),
                 agg_group_cache_metrics_info,
                 PrecomputedBuildHasher,
             ),
             dirty_groups: Default::default(),
             distinct_dedup: DistinctDeduplicater::new(
                 &this.agg_calls,
-                this.watermark_epoch.clone(),
+                this.watermark_sequence.clone(),
                 &this.distinct_dedup_tables,
                 this.actor_ctx.clone(),
             ),
diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs
index f7088181e3c3..304f35c02007 100644
--- a/src/stream/src/executor/join/hash_join.rs
+++ b/src/stream/src/executor/join/hash_join.rs
@@ -34,7 +34,7 @@ use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
 
 use super::row::{DegreeType, EncodedJoinRow};
-use crate::cache::{new_with_hasher_in, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::consistency::enable_strict_consistency;
@@ -233,7 +233,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
     /// Create a [`JoinHashMap`] with the given LRU capacity.
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        watermark_epoch: AtomicU64Ref,
+        watermark_sequence: AtomicU64Ref,
         join_key_data_types: Vec<DataType>,
         state_join_key_indices: Vec<usize>,
         state_all_data_types: Vec<DataType>,
@@ -287,8 +287,12 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
             &format!("hash join {}", side),
         );
 
-        let cache =
-            new_with_hasher_in(watermark_epoch, metrics_info, PrecomputedBuildHasher, alloc);
+        let cache = ManagedLruCache::unbounded_with_hasher_in(
+            watermark_sequence,
+            metrics_info,
+            PrecomputedBuildHasher,
+            alloc,
+        );
 
         Self {
             inner: cache,
diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs
index 6376c6804d75..9cefcf59e0ce 100644
--- a/src/stream/src/executor/lookup/cache.rs
+++ b/src/stream/src/executor/lookup/cache.rs
@@ -16,7 +16,7 @@ use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::row::{OwnedRow, Row, RowExt};
 use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec};
 
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::task::AtomicU64Ref;
 
@@ -79,8 +79,8 @@ impl LookupCache {
         self.data.clear();
     }
 
-    pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
-        let cache = new_unbounded(watermark_epoch, metrics_info);
+    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
+        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
         Self { data: cache }
     }
 }
diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs
index c7912e433489..c9ccb5d278f0 100644
--- a/src/stream/src/executor/mview/materialize.rs
+++ b/src/stream/src/executor/mview/materialize.rs
@@ -36,7 +36,7 @@ use risingwave_storage::mem_table::KeyOp;
 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::StateStore;
 
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTableInner;
 use crate::executor::error::StreamExecutorError;
@@ -449,9 +449,9 @@ pub struct MaterializeCache<SD> {
 
 type CacheValue = Option<CompactedRow>;
 
 impl<SD: ValueRowSerde> MaterializeCache<SD> {
-    pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
+    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
         let cache: ManagedLruCache<Vec<u8>, CacheValue> =
-            new_unbounded(watermark_epoch, metrics_info.clone());
+            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
         Self {
             data: cache,
             metrics_info,
diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs
index 16fe77cb64eb..a96cc6ae4699 100644
--- a/src/stream/src/executor/over_window/eowc.rs
+++ b/src/stream/src/executor/over_window/eowc.rs
@@ -35,7 +35,7 @@ use risingwave_expr::window_function::{
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
 
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::executor::{
@@ -109,7 +109,7 @@ struct ExecutorInner<S: StateStore> {
     order_key_index: usize, // no `OrderType` here, cuz we expect the input is ascending
     state_table: StateTable<S>,
     state_table_schema_len: usize,
-    watermark_epoch: AtomicU64Ref,
+    watermark_sequence: AtomicU64Ref,
 }
 
 struct ExecutionVars<S: StateStore> {
@@ -151,7 +151,7 @@ impl<S: StateStore> EowcOverWindowExecutor<S> {
                 order_key_index: args.order_key_index,
                 state_table: args.state_table,
                 state_table_schema_len: input_info.schema.len(),
-                watermark_epoch: args.watermark_epoch,
+                watermark_sequence: args.watermark_epoch,
             },
         }
     }
@@ -350,7 +350,7 @@ impl<S: StateStore> EowcOverWindowExecutor<S> {
         );
 
         let mut vars = ExecutionVars {
-            partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info),
+            partitions: ManagedLruCache::unbounded(this.watermark_sequence.clone(), metrics_info),
             _phantom: PhantomData::<S>,
         };
 
diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs
index a7245c57f368..bc245d5c7ed4 100644
--- a/src/stream/src/executor/over_window/general.rs
+++ b/src/stream/src/executor/over_window/general.rs
@@ -40,7 +40,7 @@ use super::over_partition::{
     new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
     PartitionDelta,
 };
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::common::StreamChunkBuilder;
@@ -76,7 +76,7 @@ struct ExecutorInner<S: StateStore> {
     state_key_to_table_sub_pk_proj: Vec<usize>,
     state_table: StateTable<S>,
 
-    watermark_epoch: AtomicU64Ref,
+    watermark_sequence: AtomicU64Ref,
     metrics: Arc<StreamingMetrics>,
 
     /// The maximum size of the chunk produced by executor at a time.
@@ -194,7 +194,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
                 input_schema_len: input_schema.len(),
                 state_key_to_table_sub_pk_proj,
                 state_table: args.state_table,
-                watermark_epoch: args.watermark_epoch,
+                watermark_sequence: args.watermark_epoch,
                 metrics: args.metrics,
                 chunk_size: args.chunk_size,
                 cache_policy,
@@ -610,7 +610,10 @@ impl<S: StateStore> OverWindowExecutor<S> {
         );
 
         let mut vars = ExecutionVars {
-            cached_partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info),
+            cached_partitions: ManagedLruCache::unbounded(
+                this.watermark_sequence.clone(),
+                metrics_info,
+            ),
             recently_accessed_ranges: Default::default(),
             stats: Default::default(),
             _phantom: PhantomData::<S>,
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 1d374189cb5e..6bb9efa03cff 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -43,7 +43,7 @@ use super::{
     Barrier, Execute, ExecutorInfo, Message, MessageStream, StreamExecutorError,
     StreamExecutorResult,
 };
-use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache};
+use crate::cache::{cache_may_stale, ManagedLruCache};
 use crate::common::metrics::MetricsInfo;
 use crate::executor::join::builder::JoinStreamChunkBuilder;
 use crate::executor::monitor::StreamingMetrics;
@@ -488,7 +488,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
         output_indices: Vec<usize>,
         table_output_indices: Vec<usize>,
         table_stream_key_indices: Vec<usize>,
-        watermark_epoch: AtomicU64Ref,
+        watermark_sequence: AtomicU64Ref,
         metrics: Arc<StreamingMetrics>,
         chunk_size: usize,
         join_key_data_types: Vec<DataType>,
@@ -502,8 +502,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
             "temporal join",
         );
 
-        let cache = new_with_hasher_in(
-            watermark_epoch,
+        let cache = ManagedLruCache::unbounded_with_hasher_in(
+            watermark_sequence,
             metrics_info,
             DefaultHasher::default(),
             alloc,
diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs
index 4f8e3da16599..255e76580cb5 100644
--- a/src/stream/src/executor/top_n/group_top_n.rs
+++ b/src/stream/src/executor/top_n/group_top_n.rs
@@ -28,7 +28,7 @@ use risingwave_storage::StateStore;
 use super::top_n_cache::TopNCacheTrait;
 use super::utils::*;
 use super::{ManagedTopNState, TopNCache};
-use crate::cache::{new_unbounded, ManagedLruCache};
+use crate::cache::ManagedLruCache;
 use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTable;
 use crate::error::StreamResult;
@@ -136,8 +136,8 @@ pub struct GroupTopNCache<K: HashKey, const WITH_TIES: bool> {
 }
 
 impl<K: HashKey, const WITH_TIES: bool> GroupTopNCache<K, WITH_TIES> {
-    pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
-        let cache = new_unbounded(watermark_epoch, metrics_info);
+    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
+        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
         Self { data: cache }
     }
 }
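
--
Usage sketch (reviewer's note, illustrative only; not part of the diff): with
this patch, every `put`/`get` stamps the entry with a thread-local sequence,
the memory controller periodically advances a global sequence watermark, and
each cache evicts the entries stamped below it. Assuming the
`risingwave_common::lru` API introduced above:

    use std::sync::atomic::Ordering;

    use risingwave_common::lru::LruCache;
    use risingwave_common::sequence::SEQUENCE_GLOBAL;

    let mut cache = LruCache::unbounded();
    cache.put("a", 1);
    cache.put("b", 2);
    let _ = cache.get(&"a"); // refreshes "a", so "b" is now the oldest entry

    // Pop everything stamped before the current watermark, oldest first.
    let watermark = SEQUENCE_GLOBAL.load(Ordering::Relaxed);
    while let Some((_key, _value, seq)) = cache.pop_with_sequence(watermark) {
        debug_assert!(seq < watermark);
    }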