diff --git a/Cargo.lock b/Cargo.lock index c8c9bf98d9412..c2817d749db5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8983,6 +8983,7 @@ dependencies = [ name = "risingwave_common" version = "1.7.0-alpha" dependencies = [ + "ahash 0.8.6", "anyhow", "arc-swap", "arrow-array 48.0.1", @@ -9017,6 +9018,7 @@ dependencies = [ "foyer", "futures", "governor", + "hashbrown 0.14.3", "hex", "http 0.2.9", "http-body", diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index e6769ac8dd513..5f87c2b69d3b0 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +ahash = "0.8" anyhow = "1" arc-swap = "1" arrow-array = { workspace = true } @@ -48,6 +49,7 @@ fixedbitset = "0.5" foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } governor = { version = "0.6", default-features = false, features = ["std"] } +hashbrown = "0.14" hex = "0.4.3" http = "0.2" humantime = "2.1" diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index d040245236e20..189cc250ba3a1 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -81,6 +81,7 @@ pub use risingwave_common_metrics::{ register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry, }; +pub mod lru; pub mod opts; pub mod range; pub mod row; diff --git a/src/common/src/lru.rs b/src/common/src/lru.rs new file mode 100644 index 0000000000000..fd45cca41db5d --- /dev/null +++ b/src/common/src/lru.rs @@ -0,0 +1,352 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::alloc::{Allocator, Global}; +use std::borrow::Borrow; +use std::hash::{BuildHasher, Hash}; +use std::mem::MaybeUninit; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +pub use ahash::RandomState; +use hashbrown::hash_table::Entry; +use hashbrown::HashTable; + +pub type Sequence = usize; +pub type AtomicSequence = AtomicUsize; + +struct LruEntry +where + K: Hash + Eq, +{ + prev: Option>>, + next: Option>>, + key: MaybeUninit, + value: MaybeUninit, + hash: u64, + sequence: Sequence, +} + +impl LruEntry +where + K: Hash + Eq, +{ + fn key(&self) -> &K { + unsafe { self.key.assume_init_ref() } + } + + fn value(&self) -> &V { + unsafe { self.value.assume_init_ref() } + } + + fn value_mut(&mut self) -> &mut V { + unsafe { self.value.assume_init_mut() } + } +} + +unsafe impl Send for LruEntry where K: Hash + Eq {} +unsafe impl Sync for LruEntry where K: Hash + Eq {} + +pub struct LruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ + map: HashTable>, A>, + /// dummy node of the lru linked list + dummy: Box, A>, + /// Global sequence. + sequence: Arc, + + alloc: A, + hash_builder: S, +} + +unsafe impl Send for LruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ +} +unsafe impl Sync for LruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ +} + +impl LruCache +where + K: Hash + Eq, +{ + pub fn unbounded(sequence: Arc) -> Self { + Self::unbounded_with_hasher_in(sequence, RandomState::default(), Global) + } +} + +impl LruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ + pub fn unbounded_with_hasher_in( + sequence: Arc, + hash_builder: S, + alloc: A, + ) -> Self { + let map = HashTable::new_in(alloc.clone()); + let mut dummy = Box::new_in( + LruEntry { + prev: None, + next: None, + key: MaybeUninit::uninit(), + value: MaybeUninit::uninit(), + hash: 0, + sequence: Sequence::default(), + }, + alloc.clone(), + ); + let ptr = unsafe { NonNull::new_unchecked(dummy.as_mut() as *mut _) }; + dummy.next = Some(ptr); + dummy.prev = Some(ptr); + Self { + map, + dummy, + sequence, + alloc, + hash_builder, + } + } + + pub fn put(&mut self, key: K, mut value: V) -> Option { + unsafe { + let hash = self.hash_builder.hash_one(&key); + + match self + .map + .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash) + { + Entry::Occupied(o) => { + let mut ptr = *o.get(); + let entry = ptr.as_mut(); + std::mem::swap(&mut value, entry.value_mut()); + self.detach(ptr); + self.attach(ptr); + Some(value) + } + Entry::Vacant(v) => { + let entry = Box::new_in( + LruEntry { + prev: None, + next: None, + key: MaybeUninit::new(key), + value: MaybeUninit::new(value), + hash, + sequence: self.sequence.fetch_add(1, Ordering::Relaxed), + }, + self.alloc.clone(), + ); + let ptr = NonNull::new_unchecked(Box::into_raw(entry)); + v.insert(ptr); + self.attach(ptr); + None + } + } + } + } + + pub fn get<'a, Q>(&'a mut self, key: &Q) -> Option<&'a V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + unsafe { + let key = key.borrow(); + let hash = self.hash_builder.hash_one(key); + if let Some(ptr) = self.map.find(hash, |p| p.as_ref().key().borrow() == key) { + let ptr = *ptr; + self.detach(ptr); + self.attach(ptr); + Some(ptr.as_ref().value()) + } else { + None + } + } + } + + pub fn get_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + unsafe { + let key = key.borrow(); + let hash = self.hash_builder.hash_one(key); + if let Some(ptr) = self + .map + .find_mut(hash, |p| p.as_ref().key().borrow() == key) + { + let mut ptr = *ptr; + self.detach(ptr); + self.attach(ptr); + Some(ptr.as_mut().value_mut()) + } else { + None + } + } + } + + pub fn peek<'a, Q>(&'a self, key: &Q) -> Option<&'a V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + unsafe { + let key = key.borrow(); + let hash = self.hash_builder.hash_one(key); + self.map + .find(hash, |p| p.as_ref().key().borrow() == key) + .map(|ptr| ptr.as_ref().value()) + } + } + + pub fn peek_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + unsafe { + let key = key.borrow(); + let hash = self.hash_builder.hash_one(key); + self.map + .find(hash, |p| p.as_ref().key().borrow() == key) + .map(|ptr| ptr.clone().as_mut().value_mut()) + } + } + + pub fn contains(&self, key: &Q) -> bool + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + unsafe { + let key = key.borrow(); + let hash = self.hash_builder.hash_one(key); + self.map + .find(hash, |p| p.as_ref().key().borrow() == key) + .is_some() + } + } + + pub fn len(&self) -> usize { + self.map.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Pop first entry if its sequence is less than the given sequence. + pub fn pop_with_sequence(&mut self, sequence: Sequence) -> Option<(K, V, Sequence)> { + unsafe { + if self.is_empty() { + return None; + } + + let ptr = self.dummy.next.unwrap_unchecked(); + if ptr.as_ref().sequence >= sequence { + return None; + } + + self.detach(ptr); + + let entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone()); + + let key = entry.key.assume_init(); + let value = entry.value.assume_init(); + let sequence = entry.sequence; + + let hash = self.hash_builder.hash_one(&key); + + match self + .map + .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash) + { + Entry::Occupied(o) => { + o.remove(); + } + Entry::Vacant(_) => {} + } + + Some((key, value, sequence)) + } + } + + pub fn clear(&mut self) { + unsafe { + while !self.is_empty() { + let ptr = self.dummy.next.unwrap_unchecked(); + self.detach(ptr); + let entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone()); + let key = entry.key.assume_init(); + let hash = self.hash_builder.hash_one(&key); + match self + .map + .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash) + { + Entry::Occupied(o) => o.remove(), + Entry::Vacant(_) => unreachable!(), + }; + } + } + } + + fn detach(&mut self, mut ptr: NonNull>) { + unsafe { + let entry = ptr.as_mut(); + + debug_assert!(entry.prev.is_some() && entry.next.is_some()); + + entry.prev.unwrap_unchecked().as_mut().next = entry.next; + entry.next.unwrap_unchecked().as_mut().prev = entry.prev; + + entry.next = None; + entry.prev = None; + } + } + + fn attach(&mut self, mut ptr: NonNull>) { + unsafe { + let entry = ptr.as_mut(); + + debug_assert!(entry.prev.is_none() && entry.next.is_none()); + + entry.next = Some(NonNull::new_unchecked(self.dummy.as_mut() as *mut _)); + entry.prev = self.dummy.prev; + + self.dummy.prev.unwrap_unchecked().as_mut().next = Some(ptr); + self.dummy.prev = Some(ptr); + + entry.sequence = self.sequence.fetch_add(1, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests {} diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs index 55d0547249117..f5632e101831a 100644 --- a/src/compute/src/memory/controller.rs +++ b/src/compute/src/memory/controller.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::Ordering; use std::sync::Arc; +use risingwave_common::lru::{AtomicSequence, Sequence}; use risingwave_common::util::epoch::Epoch; use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats; use risingwave_stream::executor::monitor::StreamingMetrics; @@ -77,6 +79,9 @@ pub struct LruWatermarkController { /// The state from previous tick state: State, + + latest_sequence: Arc, + evict_sequence: Sequence, } impl LruWatermarkController { @@ -92,6 +97,8 @@ impl LruWatermarkController { threshold_graceful, threshold_aggressive, state: State::default(), + latest_sequence: config.latest_sequence.clone(), + evict_sequence: 0, } } } @@ -131,7 +138,7 @@ fn jemalloc_memory_stats() -> MemoryStats { } impl LruWatermarkController { - pub fn tick(&mut self, interval_ms: u32) -> Epoch { + pub fn tick(&mut self, interval_ms: u32) -> (Epoch, Sequence) { // NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM let MemoryStats { allocated: jemalloc_allocated_bytes, @@ -146,6 +153,17 @@ impl LruWatermarkController { let last_step = self.state.lru_watermark_step; let last_used_memory_bytes = self.state.used_memory_bytes; + let target_memory_bytes = self.threshold_graceful; + if cur_used_memory_bytes > target_memory_bytes { + let ratio = + (cur_used_memory_bytes - target_memory_bytes) as f64 / cur_used_memory_bytes as f64; + let latest_sequence = self.latest_sequence.load(Ordering::Relaxed); + let sequence_diff = + ((latest_sequence - self.evict_sequence) as f64 * ratio) as Sequence; + self.evict_sequence = self.evict_sequence.max(latest_sequence - sequence_diff); + } + let sequence = self.evict_sequence; + // The watermark calculation works in the following way: // // 1. When the streaming memory usage is below the graceful threshold, we do not evict @@ -224,6 +242,6 @@ impl LruWatermarkController { .set(jvm_allocated_bytes as i64); self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64); - Epoch::from_physical_time(watermark_time_ms) + (Epoch::from_physical_time(watermark_time_ms), sequence) } } diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs index 5e8e1b5be54ff..4becde877dc6c 100644 --- a/src/compute/src/memory/manager.rs +++ b/src/compute/src/memory/manager.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use risingwave_common::lru::AtomicSequence; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_stream::executor::monitor::StreamingMetrics; @@ -30,6 +31,8 @@ pub struct MemoryManagerConfig { pub threshold_stable: f64, pub metrics: Arc, + pub latest_sequence: Arc, + pub evict_sequence: Arc, } /// Compute node uses [`MemoryManager`] to limit the memory usage. @@ -37,6 +40,8 @@ pub struct MemoryManager { /// All cached data before the watermark should be evicted. watermark_epoch: Arc, + evict_sequence: Arc, + metrics: Arc, controller: Mutex, @@ -53,6 +58,7 @@ impl MemoryManager { Arc::new(Self { watermark_epoch: Arc::new(0.into()), + evict_sequence: config.evict_sequence.clone(), metrics: config.metrics, controller, }) @@ -91,8 +97,10 @@ impl MemoryManager { } _ = tick_interval.tick() => { - let new_watermark_epoch = self.controller.lock().unwrap().tick(interval_ms); + let (new_watermark_epoch, new_watermark_sequence) = self.controller.lock().unwrap().tick(interval_ms); + self.watermark_epoch.store(new_watermark_epoch.0, Ordering::Relaxed); + self.evict_sequence.store(new_watermark_sequence, Ordering::Relaxed); self.metrics.lru_runtime_loop_count.inc(); } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c4ee1021a93c9..b78a68351c450 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -14,7 +14,7 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicU32; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::Duration; use risingwave_batch::monitor::{ @@ -26,6 +26,7 @@ use risingwave_common::config::{ load_config, AsyncStackTraceOption, MetricLevel, StorageMemoryConfig, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE, }; +use risingwave_common::lru::AtomicSequence; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::system_param::reader::SystemParamsRead; @@ -82,6 +83,11 @@ use crate::rpc::service::stream_service::StreamServiceImpl; use crate::telemetry::ComputeTelemetryCreator; use crate::ComputeNodeOpts; +pub static GLOBAL_LATEST_SEQUENCE: LazyLock> = + LazyLock::new(|| Arc::new(AtomicSequence::default())); +pub static GLOBAL_EVICT_SEQUENCE: LazyLock> = + LazyLock::new(|| Arc::new(AtomicSequence::default())); + /// Bootstraps the compute-node. pub async fn compute_node_serve( listen_addr: SocketAddr, @@ -298,6 +304,8 @@ pub async fn compute_node_serve( .developer .memory_controller_threshold_stable, metrics: streaming_metrics.clone(), + latest_sequence: GLOBAL_LATEST_SEQUENCE.clone(), + evict_sequence: GLOBAL_EVICT_SEQUENCE.clone(), }); // Run a background memory manager @@ -372,6 +380,8 @@ pub async fn compute_node_serve( streaming_metrics.clone(), await_tree_config.clone(), memory_mgr.get_watermark_epoch(), + GLOBAL_LATEST_SEQUENCE.clone(), + GLOBAL_EVICT_SEQUENCE.clone(), ); let grpc_await_tree_reg = await_tree_config diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs index 42e158a80a78e..d5a0cdd92fb0a 100644 --- a/src/stream/src/cache/managed_lru.rs +++ b/src/stream/src/cache/managed_lru.rs @@ -14,15 +14,14 @@ use std::alloc::{Allocator, Global}; use std::borrow::Borrow; -use std::cmp::min; use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use lru::{DefaultHasher, LruCache}; +use risingwave_common::lru::{AtomicSequence, LruCache, RandomState}; use risingwave_common::metrics::LabelGuardedIntGauge; -use risingwave_common::util::epoch::Epoch; +// use risingwave_common::util::epoch::Epoch; use risingwave_common_estimate_size::EstimateSize; use crate::common::metrics::MetricsInfo; @@ -31,37 +30,92 @@ const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096; /// The managed cache is a lru cache that bounds the memory usage by epoch. /// Should be used with `MemoryManager`. -pub struct ManagedLruCache { +pub struct ManagedLruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ inner: LruCache, /// The entry with epoch less than water should be evicted. /// Should only be updated by the `MemoryManager`. - watermark_epoch: Arc, + _watermark_epoch: Arc, + evict_sequence: Arc, /// The heap size of keys/values kv_heap_size: usize, /// The metrics of memory usage memory_usage_metrics: LabelGuardedIntGauge<3>, // The metrics of evicted watermark time - lru_evicted_watermark_time_ms: LabelGuardedIntGauge<3>, + _lru_evicted_watermark_time_ms: LabelGuardedIntGauge<3>, // Metrics info _metrics_info: MetricsInfo, /// The size reported last time last_reported_size_bytes: usize, } -impl Drop for ManagedLruCache { +impl Drop for ManagedLruCache +where + K: Hash + Eq, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, +{ fn drop(&mut self) { self.memory_usage_metrics.set(0.into()); } } -impl - ManagedLruCache +impl ManagedLruCache +where + K: Hash + Eq + EstimateSize, + V: EstimateSize, + S: BuildHasher + Send + Sync + 'static, + A: Clone + Allocator, { - pub fn new_inner( - inner: LruCache, + // pub fn new_inner( + // inner: LruCache, + // watermark_epoch: Arc, + // metrics_info: MetricsInfo, + // ) -> Self { + // let memory_usage_metrics = metrics_info + // .metrics + // .stream_memory_usage + // .with_guarded_label_values(&[ + // &metrics_info.table_id, + // &metrics_info.actor_id, + // &metrics_info.desc, + // ]); + // memory_usage_metrics.set(0.into()); + + // let lru_evicted_watermark_time_ms = metrics_info + // .metrics + // .lru_evicted_watermark_time_ms + // .with_guarded_label_values(&[ + // &metrics_info.table_id, + // &metrics_info.actor_id, + // &metrics_info.desc, + // ]); + + // Self { + // inner, + // watermark_epoch, + // kv_heap_size: 0, + // memory_usage_metrics, + // lru_evicted_watermark_time_ms, + // _metrics_info: metrics_info, + // last_reported_size_bytes: 0, + // } + // } + + pub fn unbounded_with_hasher_in( watermark_epoch: Arc, metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + hash_builder: S, + alloc: A, ) -> Self { + let inner = LruCache::unbounded_with_hasher_in(latest_sequence, hash_builder, alloc); + let memory_usage_metrics = metrics_info .metrics .stream_memory_usage @@ -83,10 +137,11 @@ impl u64 { - self.inner.current_epoch() - } - - /// An iterator visiting all values in most-recently used order. The iterator element type is - /// &V. - pub fn values(&self) -> impl Iterator { - self.inner.iter().map(|(_k, v)| v) - } + // /// Evict epochs lower than the watermark, except those entry which touched in this epoch + // pub fn evict_except_cur_epoch(&mut self) { + // let epoch = min(self.load_cur_epoch(), self.inner.current_epoch()); + // self.evict_by_epoch(epoch); + // } + + // /// Evict epochs lower than the watermark + // fn evict_by_epoch(&mut self, epoch: u64) { + // while let Some((key, value, _)) = self.inner.pop_lru_by_epoch(epoch) { + // let charge = key.estimated_size() + value.estimated_size(); + // self.kv_heap_size_dec(charge); + // } + // self.report_evicted_watermark_time(epoch); + // } + + // pub fn update_epoch(&mut self, epoch: u64) { + // self.inner.update_epoch(epoch); + // } + + // pub fn current_epoch(&mut self) -> u64 { + // self.inner.current_epoch() + // } + + // TODO(MrCroxx): remove me? + // /// An iterator visiting all values in most-recently used order. The iterator element type is + // /// &V. + // pub fn values(&self) -> impl Iterator { + // self.inner.iter().map(|(_k, v)| v) + // } pub fn put(&mut self, k: K, v: V) -> Option { let key_size = k.estimated_size(); @@ -176,16 +237,16 @@ impl Option<(K, V)> { - self.kv_heap_size_inc(k.estimated_size() + v.estimated_size()); + // pub fn push(&mut self, k: K, v: V) -> Option<(K, V)> { + // self.kv_heap_size_inc(k.estimated_size() + v.estimated_size()); - let old_kv = self.inner.push(k, v); + // let old_kv = self.inner.push(k, v); - if let Some((old_key, old_val)) = &old_kv { - self.kv_heap_size_dec(old_key.estimated_size() + old_val.estimated_size()); - } - old_kv - } + // if let Some((old_key, old_val)) = &old_kv { + // self.kv_heap_size_dec(old_key.estimated_size() + old_val.estimated_size()); + // } + // old_kv + // } pub fn contains(&self, k: &Q) -> bool where @@ -200,7 +261,7 @@ impl bool { - self.inner.len() == 0 + self.inner.is_empty() } pub fn clear(&mut self) { @@ -229,51 +290,59 @@ impl u64 { - self.watermark_epoch.load(Ordering::Relaxed) - } -} + // fn report_evicted_watermark_time(&self, epoch: u64) { + // self._lru_evicted_watermark_time_ms + // .set(Epoch(epoch).physical_time() as _); + // } -pub fn new_unbounded( - watermark_epoch: Arc, - metrics_info: MetricsInfo, -) -> ManagedLruCache { - ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info) + // fn load_cur_epoch(&self) -> u64 { + // self._watermark_epoch.load(Ordering::Relaxed) + // } } -pub fn new_with_hasher_in< +impl ManagedLruCache +where K: Hash + Eq + EstimateSize, V: EstimateSize, - S: BuildHasher, - A: Clone + Allocator, ->( - watermark_epoch: Arc, - metrics_info: MetricsInfo, - hasher: S, - alloc: A, -) -> ManagedLruCache { - ManagedLruCache::new_inner( - LruCache::unbounded_with_hasher_in(hasher, alloc), - watermark_epoch, - metrics_info, - ) +{ + pub fn unbounded( + watermark_epoch: Arc, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { + Self::unbounded_with_hasher( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + RandomState::default(), + ) + } } -pub fn new_with_hasher( - watermark_epoch: Arc, - metrics_info: MetricsInfo, - hasher: S, -) -> ManagedLruCache { - ManagedLruCache::new_inner( - LruCache::unbounded_with_hasher(hasher), - watermark_epoch, - metrics_info, - ) +impl ManagedLruCache +where + K: Hash + Eq + EstimateSize, + V: EstimateSize, + S: BuildHasher + Send + Sync + 'static, +{ + pub fn unbounded_with_hasher( + watermark_epoch: Arc, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + hash_builder: S, + ) -> Self { + Self::unbounded_with_hasher_in( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + hash_builder, + Global, + ) + } } pub struct MutGuard<'a, V: EstimateSize> { diff --git a/src/stream/src/executor/agg_common.rs b/src/stream/src/executor/agg_common.rs index 91c414877a76b..a66084f7fa73a 100644 --- a/src/stream/src/executor/agg_common.rs +++ b/src/stream/src/executor/agg_common.rs @@ -13,7 +13,9 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; +use risingwave_common::lru::AtomicSequence; use risingwave_expr::aggregate::AggCall; use risingwave_pb::stream_plan::PbAggNodeVersion; use risingwave_storage::StateStore; @@ -43,6 +45,8 @@ pub struct AggExecutorArgs { pub intermediate_state_table: StateTable, pub distinct_dedup_tables: HashMap>, pub watermark_epoch: AtomicU64Ref, + pub latest_sequence: Arc, + pub evict_sequence: Arc, // extra pub extra: E, diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs index 8bb298b47c99d..c1d8d3b9ed192 100644 --- a/src/stream/src/executor/aggregation/distinct.rs +++ b/src/stream/src/executor/aggregation/distinct.rs @@ -20,13 +20,14 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::array::{ArrayRef, Op}; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt}; use risingwave_common::types::{ScalarImpl, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_storage::StateStore; use super::{AggCall, GroupKey}; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::executor::{ActorContextRef, StreamExecutorResult}; @@ -40,9 +41,19 @@ struct ColumnDeduplicater { } impl ColumnDeduplicater { - fn new(watermark_epoch: Arc, metrics_info: MetricsInfo) -> Self { + fn new( + watermark_epoch: Arc, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { Self { - cache: new_unbounded(watermark_epoch, metrics_info), + cache: ManagedLruCache::unbounded( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ), _phantom: PhantomData, } } @@ -219,6 +230,8 @@ impl DistinctDeduplicater { pub fn new( agg_calls: &[AggCall], watermark_epoch: Arc, + latest_sequence: Arc, + evict_sequence: Arc, distinct_dedup_tables: &HashMap>, ctx: ActorContextRef, ) -> Self { @@ -238,7 +251,12 @@ impl DistinctDeduplicater { "distinct dedup", ); let call_indices: Box<[_]> = indices_and_calls.into_iter().map(|v| v.0).collect(); - let deduplicater = ColumnDeduplicater::new(watermark_epoch.clone(), metrics_info); + let deduplicater = ColumnDeduplicater::new( + watermark_epoch.clone(), + metrics_info, + latest_sequence.clone(), + evict_sequence.clone(), + ); (distinct_col, (call_indices, deduplicater)) }) .collect(); @@ -391,6 +409,8 @@ mod tests { let mut deduplicater = DistinctDeduplicater::new( &agg_calls, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), &dedup_tables, ActorContext::for_test(0), ); @@ -482,6 +502,8 @@ mod tests { let mut deduplicater = DistinctDeduplicater::new( &agg_calls, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), &dedup_tables, ActorContext::for_test(0), ); @@ -571,6 +593,8 @@ mod tests { let mut deduplicater = DistinctDeduplicater::new( &agg_calls, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), &dedup_tables, ActorContext::for_test(0), ); diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 7b171a1ac844d..82d13d35fc069 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -19,6 +19,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_storage::StateStore; @@ -45,12 +46,15 @@ pub struct AppendOnlyDedupExecutor { } impl AppendOnlyDedupExecutor { + #[expect(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, input: Executor, dedup_cols: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, metrics: Arc, ) -> Self { let metrics_info = @@ -60,7 +64,12 @@ impl AppendOnlyDedupExecutor { input: Some(input), dedup_cols, state_table, - cache: DedupCache::new(watermark_epoch, metrics_info), + cache: DedupCache::new( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ), } } @@ -240,6 +249,8 @@ mod tests { dedup_col_indices, state_table, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), Arc::new(StreamingMetrics::unused()), ) .boxed() diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs index 5a9d876c356e3..e2aabd8b623a9 100644 --- a/src/stream/src/executor/dedup/cache.rs +++ b/src/stream/src/executor/dedup/cache.rs @@ -13,10 +13,12 @@ // limitations under the License. use std::hash::Hash; +use std::sync::Arc; +use risingwave_common::lru::AtomicSequence; use risingwave_common_estimate_size::EstimateSize; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::task::AtomicU64Ref; @@ -27,8 +29,18 @@ pub struct DedupCache { } impl DedupCache { - pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { - let cache = new_unbounded(watermark_epoch, metrics_info); + pub fn new( + watermark_epoch: AtomicU64Ref, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { + let cache = ManagedLruCache::unbounded( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ); Self { inner: cache } } @@ -40,7 +52,7 @@ impl DedupCache { /// Insert a `key` into the cache without checking for duplication. pub fn insert(&mut self, key: K) { - self.inner.push(key, ()); + self.inner.put(key, ()); } /// Check whether the given key is in the cache. @@ -53,9 +65,9 @@ impl DedupCache { self.inner.evict() } - pub fn update_epoch(&mut self, epoch: u64) { + pub fn update_epoch(&mut self, _epoch: u64) { // Update the current epoch in `ManagedLruCache` - self.inner.update_epoch(epoch) + // self.inner.update_epoch(epoch) } /// Clear everything in the cache. @@ -69,12 +81,17 @@ mod tests { use std::sync::atomic::AtomicU64; use std::sync::Arc; - use super::DedupCache; + use super::*; use crate::common::metrics::MetricsInfo; #[test] fn test_dedup_cache() { - let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test()); + let mut cache = DedupCache::new( + Arc::new(AtomicU64::new(10000)), + MetricsInfo::for_test(), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), + ); cache.insert(10); assert!(cache.contains(&10)); diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 685b2e65e0831..6cb8bbdbf392b 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -23,6 +23,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::types::ScalarImpl; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; @@ -41,7 +42,7 @@ use super::sort_buffer::SortBuffer; use super::{ expect_first_barrier, ActorContextRef, Executor, ExecutorInfo, StreamExecutorResult, Watermark, }; -use crate::cache::{cache_may_stale, new_with_hasher, ManagedLruCache}; +use crate::cache::{cache_may_stale, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::common::StreamChunkBuilder; @@ -125,6 +126,9 @@ struct ExecutorInner { /// Watermark epoch. watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, + /// State cache size for extreme agg. extreme_cache_size: usize, @@ -230,6 +234,8 @@ impl HashAggExecutor { intermediate_state_table: args.intermediate_state_table, distinct_dedup_tables: args.distinct_dedup_tables, watermark_epoch: args.watermark_epoch, + latest_sequence: args.latest_sequence, + evict_sequence: args.evict_sequence, extreme_cache_size: args.extreme_cache_size, chunk_size: args.extra.chunk_size, max_dirty_groups_heap_size: args.extra.max_dirty_groups_heap_size, @@ -587,15 +593,19 @@ impl HashAggExecutor { let mut vars = ExecutionVars { stats: ExecutionStats::new(), - agg_group_cache: new_with_hasher( + agg_group_cache: ManagedLruCache::unbounded_with_hasher( this.watermark_epoch.clone(), agg_group_cache_metrics_info, + this.latest_sequence.clone(), + this.evict_sequence.clone(), PrecomputedBuildHasher, ), dirty_groups: Default::default(), distinct_dedup: DistinctDeduplicater::new( &this.agg_calls, this.watermark_epoch.clone(), + this.latest_sequence.clone(), + this.evict_sequence.clone(), &this.distinct_dedup_tables, this.actor_ctx.clone(), ), @@ -620,10 +630,10 @@ impl HashAggExecutor { this.all_state_tables_mut().for_each(|table| { table.init_epoch(barrier.epoch); }); - vars.agg_group_cache.update_epoch(barrier.epoch.curr); - vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { - cache.update_epoch(barrier.epoch.curr); - }); + // vars.agg_group_cache.update_epoch(barrier.epoch.curr); + // vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { + // cache.update_epoch(barrier.epoch.curr); + // }); yield Message::Barrier(barrier); @@ -696,10 +706,10 @@ impl HashAggExecutor { } // Update the current epoch. - vars.agg_group_cache.update_epoch(barrier.epoch.curr); - vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { - cache.update_epoch(barrier.epoch.curr); - }); + // vars.agg_group_cache.update_epoch(barrier.epoch.curr); + // vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { + // cache.update_epoch(barrier.epoch.curr); + // }); yield Message::Barrier(barrier); } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index b3f6b8102b4f4..3aa17f807ef26 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -23,6 +23,7 @@ use itertools::Itertools; use multimap::MultiMap; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::hash::{HashKey, NullBitmap}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, DefaultOrd, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; @@ -226,6 +227,8 @@ impl HashJoinExecutor, degree_state_table_r: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, is_append_only: bool, metrics: Arc, chunk_size: usize, @@ -391,6 +394,8 @@ impl HashJoinExecutor HashJoinExecutor JoinHashMap { /// Create a [`JoinHashMap`] with the given LRU capacity. #[allow(clippy::too_many_arguments)] pub fn new( + latest_sequence: Arc, + evict_sequence: Arc, watermark_epoch: AtomicU64Ref, join_key_data_types: Vec, state_join_key_indices: Vec, @@ -286,8 +289,14 @@ impl JoinHashMap { &format!("hash join {}", side), ); - let cache = - new_with_hasher_in(watermark_epoch, metrics_info, PrecomputedBuildHasher, alloc); + let cache = ManagedLruCache::unbounded_with_hasher_in( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + PrecomputedBuildHasher, + alloc, + ); Self { inner: cache, @@ -315,9 +324,9 @@ impl JoinHashMap { self.degree_state.table.init_epoch(epoch); } - pub fn update_epoch(&mut self, epoch: u64) { + pub fn update_epoch(&mut self, _epoch: u64) { // Update the current epoch in `ManagedLruCache` - self.inner.update_epoch(epoch) + // self.inner.update_epoch(epoch) } /// Update the vnode bitmap and manipulate the cache if necessary. diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs index 6376c6804d759..d1a4fe8c0ffbb 100644 --- a/src/stream/src/executor/lookup/cache.rs +++ b/src/stream/src/executor/lookup/cache.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec}; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::task::AtomicU64Ref; @@ -35,7 +38,7 @@ impl LookupCache { /// Update a key after lookup cache misses. pub fn batch_update(&mut self, key: OwnedRow, value: EstimatedVec) { - self.data.push(key, LookupEntryState::from_vec(value)); + self.data.put(key, LookupEntryState::from_vec(value)); } /// Apply a batch from the arrangement side @@ -70,8 +73,8 @@ impl LookupCache { } /// Update the current epoch. - pub fn update_epoch(&mut self, epoch: u64) { - self.data.update_epoch(epoch); + pub fn update_epoch(&mut self, _epoch: u64) { + // self.data.update_epoch(epoch); } /// Clear the cache. @@ -79,8 +82,18 @@ impl LookupCache { self.data.clear(); } - pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { - let cache = new_unbounded(watermark_epoch, metrics_info); + pub fn new( + watermark_epoch: AtomicU64Ref, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { + let cache = ManagedLruCache::unbounded( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ); Self { data: cache } } } diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs index 20873a8c7338f..dfecbfc797310 100644 --- a/src/stream/src/executor/lookup/impl_.rs +++ b/src/stream/src/executor/lookup/impl_.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::RowRef; use risingwave_common::catalog::{ColumnDesc, Schema}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; @@ -98,6 +101,10 @@ pub struct LookupExecutorParams { pub watermark_epoch: AtomicU64Ref, + pub latest_sequence: Arc, + + pub evict_sequence: Arc, + pub chunk_size: usize, } @@ -116,6 +123,8 @@ impl LookupExecutor { column_mapping, storage_table, watermark_epoch, + latest_sequence, + evict_sequence, chunk_size, } = params; @@ -218,7 +227,12 @@ impl LookupExecutor { }, column_mapping, key_indices_mapping, - lookup_cache: LookupCache::new(watermark_epoch, metrics_info), + lookup_cache: LookupCache::new( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ), chunk_size, } } diff --git a/src/stream/src/executor/lookup/tests.rs b/src/stream/src/executor/lookup/tests.rs index 17ff64cedb424..5e7a628cb97e9 100644 --- a/src/stream/src/executor/lookup/tests.rs +++ b/src/stream/src/executor/lookup/tests.rs @@ -21,6 +21,7 @@ use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ConflictBehavior, Field, Schema, TableId}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::types::DataType; use risingwave_common::util::epoch::test_epoch; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -222,6 +223,8 @@ async fn test_lookup_this_epoch() { vec![0, 1], ), watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), chunk_size: 1024, })); let mut lookup_executor = lookup_executor.execute(); @@ -296,6 +299,8 @@ async fn test_lookup_last_epoch() { vec![0, 1], ), watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), chunk_size: 1024, })); let mut lookup_executor = lookup_executor.execute(); diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 96fb1c2fc2fc9..ed0dcf02436c6 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -25,6 +25,7 @@ use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Schema, TableId}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{CompactedRow, RowDeserializer}; use risingwave_common::types::DataType; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -36,7 +37,7 @@ use risingwave_storage::mem_table::KeyOp; use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::StateStore; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTableInner; use crate::executor::error::StreamExecutorError; @@ -79,6 +80,8 @@ impl MaterializeExecutor { vnodes: Option>, table_catalog: &Table, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, conflict_behavior: ConflictBehavior, metrics: Arc, ) -> Self { @@ -103,7 +106,12 @@ impl MaterializeExecutor { state_table, arrange_key_indices, actor_context, - materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info), + materialize_cache: MaterializeCache::new( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ), conflict_behavior, } } @@ -223,7 +231,7 @@ impl MaterializeExecutor { self.materialize_cache.data.clear(); } } - self.materialize_cache.data.update_epoch(b.epoch.curr); + // self.materialize_cache.data.update_epoch(b.epoch.curr); Message::Barrier(b) } } @@ -296,7 +304,12 @@ impl MaterializeExecutor { state_table, arrange_key_indices: arrange_columns.clone(), actor_context: ActorContext::for_test(0), - materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()), + materialize_cache: MaterializeCache::new( + watermark_epoch, + MetricsInfo::for_test(), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), + ), conflict_behavior, } } @@ -448,9 +461,18 @@ pub struct MaterializeCache { type CacheValue = Option; impl MaterializeCache { - pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { - let cache: ManagedLruCache, CacheValue> = - new_unbounded(watermark_epoch, metrics_info.clone()); + pub fn new( + watermark_epoch: AtomicU64Ref, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { + let cache: ManagedLruCache, CacheValue> = ManagedLruCache::unbounded( + watermark_epoch, + metrics_info.clone(), + latest_sequence, + evict_sequence, + ); Self { data: cache, metrics_info, @@ -508,10 +530,10 @@ impl MaterializeCache { if update_cache { match conflict_behavior { ConflictBehavior::Overwrite => { - self.data.push(key, Some(CompactedRow { row: value })); + self.data.put(key, Some(CompactedRow { row: value })); } ConflictBehavior::IgnoreConflict => { - self.data.push(key, Some(CompactedRow { row: value })); + self.data.put(key, Some(CompactedRow { row: value })); } _ => unreachable!(), } @@ -542,7 +564,7 @@ impl MaterializeCache { }; if update_cache { - self.data.push(key, None); + self.data.put(key, None); } } } @@ -581,8 +603,8 @@ impl MaterializeCache { while let Some(result) = buffered.next().await { let (key, value) = result; match conflict_behavior { - ConflictBehavior::Overwrite => self.data.push(key, value?), - ConflictBehavior::IgnoreConflict => self.data.push(key, value?), + ConflictBehavior::Overwrite => self.data.put(key, value?), + ConflictBehavior::IgnoreConflict => self.data.put(key, value?), _ => unreachable!(), }; } diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 16fe77cb64ebc..44cfdac15e808 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; use std::ops::Bound; +use std::sync::Arc; use futures::StreamExt; use futures_async_stream::{for_await, try_stream}; @@ -21,6 +22,7 @@ use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{ArrayRef, Op, StreamChunk}; use risingwave_common::catalog::Schema; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; @@ -35,7 +37,7 @@ use risingwave_expr::window_function::{ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::executor::{ @@ -110,6 +112,8 @@ struct ExecutorInner { state_table: StateTable, state_table_schema_len: usize, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, } struct ExecutionVars { @@ -134,6 +138,8 @@ pub struct EowcOverWindowExecutorArgs { pub order_key_index: usize, pub state_table: StateTable, pub watermark_epoch: AtomicU64Ref, + pub latest_sequence: Arc, + pub evict_sequence: Arc, } impl EowcOverWindowExecutor { @@ -152,6 +158,8 @@ impl EowcOverWindowExecutor { state_table: args.state_table, state_table_schema_len: input_info.schema.len(), watermark_epoch: args.watermark_epoch, + latest_sequence: args.latest_sequence, + evict_sequence: args.evict_sequence, }, } } @@ -350,14 +358,19 @@ impl EowcOverWindowExecutor { ); let mut vars = ExecutionVars { - partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info), + partitions: ManagedLruCache::unbounded( + this.watermark_epoch.clone(), + metrics_info, + this.latest_sequence.clone(), + this.evict_sequence.clone(), + ), _phantom: PhantomData::, }; let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; this.state_table.init_epoch(barrier.epoch); - vars.partitions.update_epoch(barrier.epoch.curr); + // vars.partitions.update_epoch(barrier.epoch.curr); yield Message::Barrier(barrier); @@ -387,7 +400,7 @@ impl EowcOverWindowExecutor { } } - vars.partitions.update_epoch(barrier.epoch.curr); + // vars.partitions.update_epoch(barrier.epoch.curr); yield Message::Barrier(barrier); } diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index a7245c57f368c..9efd453559637 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -24,6 +24,7 @@ use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; use risingwave_common::types::{DataType, DefaultOrdered}; @@ -40,7 +41,7 @@ use super::over_partition::{ new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache, PartitionDelta, }; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::common::StreamChunkBuilder; @@ -77,6 +78,8 @@ struct ExecutorInner { state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, metrics: Arc, /// The maximum size of the chunk produced by executor at a time. @@ -145,6 +148,8 @@ pub struct OverWindowExecutorArgs { pub state_table: StateTable, pub watermark_epoch: AtomicU64Ref, + pub latest_sequence: Arc, + pub evict_sequence: Arc, pub metrics: Arc, pub chunk_size: usize, @@ -195,6 +200,8 @@ impl OverWindowExecutor { state_key_to_table_sub_pk_proj, state_table: args.state_table, watermark_epoch: args.watermark_epoch, + latest_sequence: args.latest_sequence, + evict_sequence: args.evict_sequence, metrics: args.metrics, chunk_size: args.chunk_size, cache_policy, @@ -610,7 +617,12 @@ impl OverWindowExecutor { ); let mut vars = ExecutionVars { - cached_partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info), + cached_partitions: ManagedLruCache::unbounded( + this.watermark_epoch.clone(), + metrics_info, + this.latest_sequence.clone(), + this.evict_sequence.clone(), + ), recently_accessed_ranges: Default::default(), stats: Default::default(), _phantom: PhantomData::, @@ -619,7 +631,7 @@ impl OverWindowExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; this.state_table.init_epoch(barrier.epoch); - vars.cached_partitions.update_epoch(barrier.epoch.curr); + // vars.cached_partitions.update_epoch(barrier.epoch.curr); yield Message::Barrier(barrier); @@ -688,7 +700,7 @@ impl OverWindowExecutor { } } - vars.cached_partitions.update_epoch(barrier.epoch.curr); + // vars.cached_partitions.update_epoch(barrier.epoch.curr); yield Message::Barrier(barrier); } diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index 890754f33462f..ee1383435afa3 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -16,6 +16,7 @@ use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; +use risingwave_common::lru::AtomicSequence; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; @@ -88,6 +89,9 @@ struct ExecutorInner { /// Watermark epoch. watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, + /// Extreme state cache size extreme_cache_size: usize, } @@ -135,6 +139,8 @@ impl SimpleAggExecutor { intermediate_state_table: args.intermediate_state_table, distinct_dedup_tables: args.distinct_dedup_tables, watermark_epoch: args.watermark_epoch, + latest_sequence: args.latest_sequence, + evict_sequence: args.evict_sequence, extreme_cache_size: args.extreme_cache_size, }, }) @@ -239,15 +245,17 @@ impl SimpleAggExecutor { table.init_epoch(barrier.epoch); }); - let mut distinct_dedup = DistinctDeduplicater::new( + let distinct_dedup = DistinctDeduplicater::new( &this.agg_calls, this.watermark_epoch.clone(), + this.latest_sequence.clone(), + this.evict_sequence.clone(), &this.distinct_dedup_tables, this.actor_ctx.clone(), ); - distinct_dedup.dedup_caches_mut().for_each(|cache| { - cache.update_epoch(barrier.epoch.curr); - }); + // distinct_dedup.dedup_caches_mut().for_each(|cache| { + // cache.update_epoch(barrier.epoch.curr); + // }); yield Message::Barrier(barrier); @@ -285,9 +293,9 @@ impl SimpleAggExecutor { { yield Message::Chunk(chunk); } - vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { - cache.update_epoch(barrier.epoch.curr); - }); + // vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { + // cache.update_epoch(barrier.epoch.curr); + // }); yield Message::Barrier(barrier); } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 1d374189cb5e0..bc914b68c8ca1 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -27,6 +27,7 @@ use lru::DefaultHasher; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::hash::{HashKey, NullBitmap}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; @@ -43,7 +44,7 @@ use super::{ Barrier, Execute, ExecutorInfo, Message, MessageStream, StreamExecutorError, StreamExecutorResult, }; -use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache}; +use crate::cache::{cache_may_stale, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::executor::join::builder::JoinStreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; @@ -489,6 +490,8 @@ impl TemporalJoinExecutor table_output_indices: Vec, table_stream_key_indices: Vec, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, metrics: Arc, chunk_size: usize, join_key_data_types: Vec, @@ -502,9 +505,11 @@ impl TemporalJoinExecutor "temporal join", ); - let cache = new_with_hasher_in( + let cache = ManagedLruCache::unbounded_with_hasher_in( watermark_epoch, metrics_info, + latest_sequence, + evict_sequence, DefaultHasher::default(), alloc, ); @@ -696,7 +701,7 @@ impl TemporalJoinExecutor self.right_table.cache.clear(); } } - self.right_table.cache.update_epoch(barrier.epoch.curr); + // self.right_table.cache.update_epoch(barrier.epoch.curr); self.right_table.update( updates, &self.right_join_keys, diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 785384307ecf4..159c7b58695db 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -275,6 +275,7 @@ pub mod agg_executor { use futures::future; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; use risingwave_common::hash::SerializedKey; + use risingwave_common::lru::AtomicSequence; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_expr::aggregate::{AggCall, AggKind}; @@ -491,6 +492,8 @@ pub mod agg_executor { intermediate_state_table, distinct_dedup_tables: Default::default(), watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), extra: HashAggExecutorExtraArgs { group_key_indices, @@ -558,6 +561,8 @@ pub mod agg_executor { intermediate_state_table, distinct_dedup_tables: Default::default(), watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), extra: SimpleAggExecutorExtraArgs {}, }) .unwrap(); diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 4f8e3da165994..b8d4565308643 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -19,6 +19,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::HashKey; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::RowExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; @@ -28,7 +29,7 @@ use risingwave_storage::StateStore; use super::top_n_cache::TopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache}; -use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::cache::ManagedLruCache; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; @@ -51,6 +52,8 @@ impl GroupTopNExecutor, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, ) -> StreamResult { Ok(TopNExecutorWrapper { input, @@ -63,6 +66,8 @@ impl GroupTopNExecutor InnerGroupTopNExecutor, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, ctx: ActorContextRef, ) -> StreamResult { let metrics_info = MetricsInfo::new( @@ -124,7 +131,12 @@ impl InnerGroupTopNExecutor { } impl GroupTopNCache { - pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { - let cache = new_unbounded(watermark_epoch, metrics_info); + pub fn new( + watermark_epoch: AtomicU64Ref, + metrics_info: MetricsInfo, + latest_sequence: Arc, + evict_sequence: Arc, + ) -> Self { + let cache = ManagedLruCache::unbounded( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ); Self { data: cache } } } @@ -195,7 +217,7 @@ where self.managed_state .init_topn_cache(Some(group_key), &mut topn_cache) .await?; - self.caches.push(group_cache_key.clone(), topn_cache); + self.caches.put(group_cache_key.clone(), topn_cache); } let mut cache = self.caches.get_mut(group_cache_key).unwrap(); @@ -238,8 +260,8 @@ where self.managed_state.try_flush().await } - fn update_epoch(&mut self, epoch: u64) { - self.caches.update_epoch(epoch); + fn update_epoch(&mut self, _epoch: u64) { + // self.caches.update_epoch(epoch); } fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { @@ -393,6 +415,8 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), ) .unwrap(); let mut top_n_executor = top_n_executor.boxed().execute(); @@ -489,6 +513,8 @@ mod tests { vec![1], state_table, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), ) .unwrap(); let mut top_n_executor = top_n_executor.boxed().execute(); @@ -578,6 +604,8 @@ mod tests { vec![1, 2], state_table, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), ) .unwrap(); let mut top_n_executor = top_n_executor.boxed().execute(); diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index 346d73fed0196..fb5b820bffd1d 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -18,6 +18,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::hash::HashKey; +use risingwave_common::lru::AtomicSequence; use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; @@ -55,6 +56,8 @@ impl group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, ) -> StreamResult { Ok(TopNExecutorWrapper { input, @@ -67,6 +70,8 @@ impl group_by, state_table, watermark_epoch, + latest_sequence, + evict_sequence, ctx, )?, }) @@ -111,6 +116,8 @@ impl group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, ctx: ActorContextRef, ) -> StreamResult { let metrics_info = MetricsInfo::new( @@ -130,7 +137,12 @@ impl managed_state, storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(), group_by, - caches: GroupTopNCache::new(watermark_epoch, metrics_info), + caches: GroupTopNCache::new( + watermark_epoch, + metrics_info, + latest_sequence, + evict_sequence, + ), cache_key_serde, ctx, }) @@ -178,7 +190,7 @@ where self.managed_state .init_topn_cache(Some(group_key), &mut topn_cache) .await?; - self.caches.push(group_cache_key.clone(), topn_cache); + self.caches.put(group_cache_key.clone(), topn_cache); } let mut cache = self.caches.get_mut(group_cache_key).unwrap(); @@ -219,8 +231,8 @@ where self.caches.evict() } - fn update_epoch(&mut self, epoch: u64) { - self.caches.update_epoch(epoch) + fn update_epoch(&mut self, _epoch: u64) { + // self.caches.update_epoch(epoch) } async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { diff --git a/src/stream/src/from_proto/append_only_dedup.rs b/src/stream/src/from_proto/append_only_dedup.rs index 020d13f3ff96b..a97a55e3d4715 100644 --- a/src/stream/src/from_proto/append_only_dedup.rs +++ b/src/stream/src/from_proto/append_only_dedup.rs @@ -43,6 +43,8 @@ impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder { params.info.pk_indices.clone(), /* TODO(rc): should change to use `dedup_column_indices`, but need to check backward compatibility */ state_table, params.watermark_epoch, + params.latest_sequence, + params.evict_sequence, params.executor_stats.clone(), ); Ok((params.info, exec).into()) diff --git a/src/stream/src/from_proto/eowc_over_window.rs b/src/stream/src/from_proto/eowc_over_window.rs index 4f6e873d7bcf3..125e4ca78c18c 100644 --- a/src/stream/src/from_proto/eowc_over_window.rs +++ b/src/stream/src/from_proto/eowc_over_window.rs @@ -65,6 +65,8 @@ impl ExecutorBuilder for EowcOverWindowExecutorBuilder { order_key_index, state_table, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, }); Ok((params.info, exec).into()) } diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 51512cb9d94d3..7597772334f30 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::stream_plan::GroupTopNNode; @@ -69,6 +70,8 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { group_by: Vec, state_table: StateTable, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, group_key_types: Vec, with_ties: bool, @@ -110,6 +115,8 @@ impl HashKeyDispatcher for GroupTopNExecutorDispatcherArgs { self.group_by, self.state_table, self.watermark_epoch, + self.latest_sequence, + self.evict_sequence, )? .boxed()) }; diff --git a/src/stream/src/from_proto/hash_agg.rs b/src/stream/src/from_proto/hash_agg.rs index 3331640519617..d36b0e329e6e6 100644 --- a/src/stream/src/from_proto/hash_agg.rs +++ b/src/stream/src/from_proto/hash_agg.rs @@ -109,6 +109,8 @@ impl ExecutorBuilder for HashAggExecutorBuilder { intermediate_state_table, distinct_dedup_tables, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, extra: HashAggExecutorExtraArgs { group_key_indices, chunk_size: params.env.config().developer.chunk_size, diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs index ad69fd1ecd5ac..bf6a6c2566e0d 100644 --- a/src/stream/src/from_proto/hash_join.rs +++ b/src/stream/src/from_proto/hash_join.rs @@ -16,6 +16,7 @@ use std::cmp::min; use std::sync::Arc; use risingwave_common::hash::{HashKey, HashKeyDispatcher}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::types::DataType; use risingwave_expr::expr::{ build_func_non_strict, build_non_strict_from_prost, InputRefExpression, NonStrictExpression, @@ -150,6 +151,8 @@ impl ExecutorBuilder for HashJoinExecutorBuilder { state_table_r, degree_state_table_r, lru_manager: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, is_append_only, metrics: params.executor_stats, join_type_proto: node.get_join_type()?, @@ -178,6 +181,8 @@ struct HashJoinExecutorDispatcherArgs { state_table_r: StateTable, degree_state_table_r: StateTable, lru_manager: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, is_append_only: bool, metrics: Arc, join_type_proto: JoinTypeProto, @@ -208,6 +213,8 @@ impl HashKeyDispatcher for HashJoinExecutorDispatcherArgs { self.state_table_r, self.degree_state_table_r, self.lru_manager, + self.latest_sequence, + self.evict_sequence, self.is_append_only, self.metrics, self.chunk_size, diff --git a/src/stream/src/from_proto/lookup.rs b/src/stream/src/from_proto/lookup.rs index dc7f7e3c49dfe..4e70539058427 100644 --- a/src/stream/src/from_proto/lookup.rs +++ b/src/stream/src/from_proto/lookup.rs @@ -82,6 +82,8 @@ impl ExecutorBuilder for LookupExecutorBuilder { column_mapping: lookup.column_mapping.iter().map(|x| *x as usize).collect(), storage_table, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, chunk_size: params.env.config().developer.chunk_size, }); Ok((params.info, exec).into()) diff --git a/src/stream/src/from_proto/mview.rs b/src/stream/src/from_proto/mview.rs index 24b0c9d534578..73561efc1df30 100644 --- a/src/stream/src/from_proto/mview.rs +++ b/src/stream/src/from_proto/mview.rs @@ -58,6 +58,8 @@ impl ExecutorBuilder for MaterializeExecutorBuilder { params.vnode_bitmap.map(Arc::new), table, params.watermark_epoch, + params.latest_sequence, + params.evict_sequence, conflict_behavior, params.executor_stats.clone(), ) @@ -111,6 +113,8 @@ impl ExecutorBuilder for ArrangeExecutorBuilder { vnodes, table, params.watermark_epoch, + params.latest_sequence, + params.evict_sequence, conflict_behavior, params.executor_stats.clone(), ) diff --git a/src/stream/src/from_proto/over_window.rs b/src/stream/src/from_proto/over_window.rs index f7ca73c183a81..a9ea9c25e5d0c 100644 --- a/src/stream/src/from_proto/over_window.rs +++ b/src/stream/src/from_proto/over_window.rs @@ -73,6 +73,8 @@ impl ExecutorBuilder for OverWindowExecutorBuilder { state_table, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, metrics: params.executor_stats, chunk_size: params.env.config().developer.chunk_size, diff --git a/src/stream/src/from_proto/simple_agg.rs b/src/stream/src/from_proto/simple_agg.rs index 16809edb8bcaf..97d383f15d9c0 100644 --- a/src/stream/src/from_proto/simple_agg.rs +++ b/src/stream/src/from_proto/simple_agg.rs @@ -70,6 +70,8 @@ impl ExecutorBuilder for SimpleAggExecutorBuilder { intermediate_state_table, distinct_dedup_tables, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, extra: SimpleAggExecutorExtraArgs {}, })?; diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 15badec97e5cc..085496d94abf7 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use risingwave_common::catalog::ColumnId; use risingwave_common::hash::{HashKey, HashKeyDispatcher}; +use risingwave_common::lru::AtomicSequence; use risingwave_common::types::DataType; use risingwave_expr::expr::{build_non_strict_from_prost, NonStrictExpression}; use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc}; @@ -113,6 +114,8 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { table_output_indices, table_stream_key_indices, watermark_epoch: params.watermark_epoch, + latest_sequence: params.latest_sequence, + evict_sequence: params.evict_sequence, chunk_size: params.env.config().developer.chunk_size, metrics: params.executor_stats, join_type_proto: node.get_join_type()?, @@ -137,6 +140,8 @@ struct TemporalJoinExecutorDispatcherArgs { table_output_indices: Vec, table_stream_key_indices: Vec, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, chunk_size: usize, metrics: Arc, join_type_proto: JoinTypeProto, @@ -168,6 +173,8 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs self.table_output_indices, self.table_stream_key_indices, self.watermark_epoch, + self.latest_sequence, + self.evict_sequence, self.metrics, self.chunk_size, self.join_key_data_types, diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 65fd141ac376c..166fe34a5ecdc 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -45,6 +45,7 @@ mod progress; mod tests; pub use progress::CreateMviewProgress; +use risingwave_common::lru::AtomicSequence; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; @@ -285,6 +286,8 @@ pub(crate) struct StreamActorManager { /// Watermark epoch number. pub(super) watermark_epoch: AtomicU64Ref, + pub(super) latest_sequence: Arc, + pub(super) evict_sequence: Arc, /// Manages the await-trees of all actors. pub(super) await_tree_reg: Option>>>, @@ -741,12 +744,15 @@ pub struct LocalBarrierManager { impl LocalBarrierWorker { /// Create a [`LocalBarrierWorker`] with managed mode. + #[expect(clippy::too_many_arguments)] pub fn spawn( env: StreamEnvironment, streaming_metrics: Arc, await_tree_reg: Option>>>, barrier_await_tree_reg: Option>>>, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, actor_op_rx: UnboundedReceiver, ) -> JoinHandle<()> { let runtime = { @@ -765,6 +771,8 @@ impl LocalBarrierWorker { env: env.clone(), streaming_metrics, watermark_epoch, + latest_sequence, + evict_sequence, await_tree_reg, runtime: runtime.into(), }); @@ -872,6 +880,8 @@ impl LocalBarrierManager { None, None, Arc::new(AtomicU64::new(0)), + Arc::new(AtomicSequence::new(0)), + Arc::new(AtomicSequence::new(0)), rx, ); EventSender(tx) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 4d4b510ee8ef1..19002c5c6d33b 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -30,6 +30,7 @@ use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::config::MetricLevel; +use risingwave_common::lru::AtomicSequence; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; @@ -136,6 +137,9 @@ pub struct ExecutorParams { /// `watermark_epoch` field in `MemoryManager` pub watermark_epoch: AtomicU64Ref, + pub latest_sequence: Arc, + pub evict_sequence: Arc, + pub shared_context: Arc, pub local_barrier_manager: LocalBarrierManager, @@ -162,6 +166,8 @@ impl LocalStreamManager { streaming_metrics: Arc, await_tree_config: Option, watermark_epoch: AtomicU64Ref, + latest_sequence: Arc, + evict_sequence: Arc, ) -> Self { let await_tree_reg = await_tree_config .clone() @@ -177,6 +183,8 @@ impl LocalStreamManager { await_tree_reg.clone(), barrier_await_tree_reg.clone(), watermark_epoch, + latest_sequence, + evict_sequence, actor_op_rx, ); Self { @@ -499,6 +507,8 @@ impl StreamActorManager { vnode_bitmap, eval_error_report, watermark_epoch: self.watermark_epoch.clone(), + latest_sequence: self.latest_sequence.clone(), + evict_sequence: self.evict_sequence.clone(), shared_context: shared_context.clone(), local_barrier_manager: shared_context.local_barrier_manager.clone(), create_actor_context: create_actor_context.clone(), diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index cc674e556ab5e..82d80a6109d3a 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::lru::AtomicSequence; use risingwave_expr::aggregate::{AggArgs, AggKind}; use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncCall, WindowFuncKind}; use risingwave_stream::executor::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; @@ -75,6 +76,8 @@ async fn create_executor( order_key_index, state_table, watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), }); (tx, executor.boxed().execute()) } diff --git a/src/stream/tests/integration_tests/over_window.rs b/src/stream/tests/integration_tests/over_window.rs index 0be8e1848e9cd..b33ced03e149a 100644 --- a/src/stream/tests/integration_tests/over_window.rs +++ b/src/stream/tests/integration_tests/over_window.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::lru::AtomicSequence; use risingwave_common::session_config::OverWindowCachePolicy; use risingwave_expr::aggregate::{AggArgs, AggKind}; use risingwave_expr::window_function::{ @@ -87,6 +88,8 @@ async fn create_executor( order_key_order_types, state_table, watermark_epoch: Arc::new(AtomicU64::new(0)), + latest_sequence: Arc::new(AtomicSequence::new(0)), + evict_sequence: Arc::new(AtomicSequence::new(0)), metrics: Arc::new(StreamingMetrics::unused()), chunk_size: 1024, cache_policy: OverWindowCachePolicy::Recent,