diff --git a/src/common/estimate_size/src/collections/btreemap.rs b/src/common/estimate_size/src/collections/btreemap.rs index 31b66cf95812e..f48a78715f692 100644 --- a/src/common/estimate_size/src/collections/btreemap.rs +++ b/src/common/estimate_size/src/collections/btreemap.rs @@ -18,6 +18,7 @@ use std::ops::{Bound, RangeInclusive}; use crate::{EstimateSize, KvSize}; +#[derive(Clone)] pub struct EstimatedBTreeMap { inner: BTreeMap, heap_size: KvSize, @@ -42,40 +43,81 @@ impl EstimatedBTreeMap { pub fn is_empty(&self) -> bool { self.inner.is_empty() } -} -impl Default for EstimatedBTreeMap { - fn default() -> Self { - Self::new() + pub fn iter(&self) -> impl Iterator { + self.inner.iter() + } + + pub fn range(&self, range: R) -> std::collections::btree_map::Range<'_, K, V> + where + K: Ord, + R: std::ops::RangeBounds, + { + self.inner.range(range) + } + + pub fn values(&self) -> impl Iterator { + self.inner.values() } } impl EstimatedBTreeMap where - K: EstimateSize + Ord, - V: EstimateSize, + K: Ord, { pub fn first_key_value(&self) -> Option<(&K, &V)> { self.inner.first_key_value() } + pub fn first_key(&self) -> Option<&K> { + self.first_key_value().map(|(k, _)| k) + } + + pub fn first_value(&self) -> Option<&V> { + self.first_key_value().map(|(_, v)| v) + } + pub fn last_key_value(&self) -> Option<(&K, &V)> { self.inner.last_key_value() } - pub fn insert(&mut self, key: K, value: V) { + pub fn last_key(&self) -> Option<&K> { + self.last_key_value().map(|(k, _)| k) + } + + pub fn last_value(&self) -> Option<&V> { + self.last_key_value().map(|(_, v)| v) + } +} + +impl Default for EstimatedBTreeMap { + fn default() -> Self { + Self::new() + } +} + +impl EstimatedBTreeMap +where + K: EstimateSize + Ord, + V: EstimateSize, +{ + pub fn insert(&mut self, key: K, value: V) -> Option { let key_size = self.heap_size.add_val(&key); self.heap_size.add_val(&value); - if let Some(old_value) = self.inner.insert(key, value) { + let old_value = self.inner.insert(key, value); + if let Some(old_value) = &old_value { self.heap_size.sub_size(key_size); - self.heap_size.sub_val(&old_value); + self.heap_size.sub_val(old_value); } + old_value } - pub fn remove(&mut self, key: &K) { - if let Some(value) = self.inner.remove(key) { - self.heap_size.sub(key, &value); + pub fn remove(&mut self, key: &K) -> Option { + let old_value = self.inner.remove(key); + if let Some(old_value) = &old_value { + self.heap_size.sub(key, old_value); } + old_value } pub fn clear(&mut self) { @@ -102,17 +144,6 @@ where }) } - pub fn iter(&self) -> impl Iterator { - self.inner.iter() - } - - pub fn range(&self, range: R) -> std::collections::btree_map::Range<'_, K, V> - where - R: std::ops::RangeBounds, - { - self.inner.range(range) - } - /// Retain the given range of entries in the map, removing others. pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap, BTreeMap) where diff --git a/src/stream/src/common/cache/mod.rs b/src/stream/src/common/cache/mod.rs deleted file mode 100644 index 27481541492e0..0000000000000 --- a/src/stream/src/common/cache/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod state_cache; -mod top_n_cache; - -pub use state_cache::*; -pub use top_n_cache::*; diff --git a/src/stream/src/common/cache/top_n_cache.rs b/src/stream/src/common/cache/top_n_cache.rs deleted file mode 100644 index e6e65bacea5fe..0000000000000 --- a/src/stream/src/common/cache/top_n_cache.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; - -use risingwave_common_estimate_size::{EstimateSize, KvSize}; - -/// Inner top-N cache structure for [`super::TopNStateCache`]. -#[derive(Clone)] -pub struct TopNCache { - /// The capacity of the cache. - capacity: usize, - /// Ordered cache entries. - entries: BTreeMap, - kv_heap_size: KvSize, -} - -impl EstimateSize for TopNCache { - fn estimated_heap_size(&self) -> usize { - // TODO: Add btreemap internal size. - // https://github.com/risingwavelabs/risingwave/issues/9713 - self.kv_heap_size.size() - } -} - -impl TopNCache { - /// Create a new cache with specified capacity and order requirements. - /// To create a cache with unlimited capacity, use `usize::MAX` for `capacity`. - pub fn new(capacity: usize) -> Self { - Self { - capacity, - entries: Default::default(), - kv_heap_size: KvSize::new(), - } - } - - /// Get the capacity of the cache. - pub fn capacity(&self) -> usize { - self.capacity - } - - /// Get the number of entries in the cache. - #[allow(dead_code)] - pub fn len(&self) -> usize { - self.entries.len() - } - - /// Check if the cache is empty. - pub fn is_empty(&self) -> bool { - self.entries.is_empty() - } - - /// Clear the cache. - pub fn clear(&mut self) { - self.entries.clear(); - self.kv_heap_size.set(0); - } - - /// Insert an entry into the cache. - pub fn insert(&mut self, key: K, value: V) -> Option { - let key_size = self.kv_heap_size.add_val(&key); - self.kv_heap_size.add_val(&value); - let old_val = self.entries.insert(key, value); - if let Some(old_val) = &old_val { - self.kv_heap_size.sub_size(key_size); - self.kv_heap_size.sub_val(old_val); - } - // evict if capacity is reached - while self.entries.len() > self.capacity { - if let Some((key, val)) = self.entries.pop_last() { - self.kv_heap_size.sub(&key, &val); - } - } - old_val - } - - /// Remove an entry from the cache. - pub fn remove(&mut self, key: &K) -> Option { - let old_val = self.entries.remove(key); - if let Some(val) = &old_val { - self.kv_heap_size.sub(key, val); - } - old_val - } - - /// Get the first (smallest) key-value pair in the cache. - pub fn first_key_value(&self) -> Option<(&K, &V)> { - self.entries.first_key_value() - } - - /// Get the first (smallest) key in the cache. - pub fn first_key(&self) -> Option<&K> { - self.first_key_value().map(|(k, _)| k) - } - - /// Get the last (largest) key-value pair in the cache. - pub fn last_key_value(&self) -> Option<(&K, &V)> { - self.entries.last_key_value() - } - - /// Get the last (largest) key in the cache. - pub fn last_key(&self) -> Option<&K> { - self.last_key_value().map(|(k, _)| k) - } - - /// Iterate over the values in the cache. - pub fn values(&self) -> impl Iterator { - self.entries.values() - } -} - -#[cfg(test)] -mod tests { - use itertools::Itertools; - - use super::*; - - #[test] - fn test_top_n_cache() { - let mut cache = TopNCache::new(3); - assert_eq!(cache.capacity(), 3); - assert_eq!(cache.len(), 0); - assert!(cache.is_empty()); - assert!(cache.first_key_value().is_none()); - assert!(cache.first_key().is_none()); - assert!(cache.last_key_value().is_none()); - assert!(cache.last_key().is_none()); - assert!(cache.values().collect_vec().is_empty()); - - let old_val = cache.insert(5, "hello".to_string()); - assert!(old_val.is_none()); - assert_eq!(cache.len(), 1); - assert!(!cache.is_empty()); - assert_eq!(cache.values().collect_vec(), vec!["hello"]); - - cache.insert(3, "world".to_string()); - cache.insert(1, "risingwave!".to_string()); - assert_eq!(cache.len(), 3); - assert_eq!( - cache.first_key_value(), - Some((&1, &"risingwave!".to_string())) - ); - assert_eq!(cache.first_key(), Some(&1)); - assert_eq!(cache.last_key_value(), Some((&5, &"hello".to_string()))); - assert_eq!(cache.last_key(), Some(&5)); - assert_eq!( - cache.values().collect_vec(), - vec!["risingwave!", "world", "hello"] - ); - - cache.insert(0, "foo".to_string()); - assert_eq!(cache.capacity(), 3); - assert_eq!(cache.len(), 3); - assert_eq!(cache.first_key(), Some(&0)); - assert_eq!(cache.last_key(), Some(&3)); - assert_eq!( - cache.values().collect_vec(), - vec!["foo", "risingwave!", "world"] - ); - - let old_val = cache.remove(&0); - assert_eq!(old_val, Some("foo".to_string())); - assert_eq!(cache.len(), 2); - assert_eq!(cache.first_key(), Some(&1)); - assert_eq!(cache.last_key(), Some(&3)); - cache.remove(&3); - assert_eq!(cache.len(), 1); - assert_eq!(cache.first_key(), Some(&1)); - assert_eq!(cache.last_key(), Some(&1)); - let old_val = cache.remove(&100); // can remove non-existing key - assert!(old_val.is_none()); - assert_eq!(cache.len(), 1); - - cache.clear(); - assert_eq!(cache.len(), 0); - assert_eq!(cache.capacity(), 3); - assert_eq!(cache.first_key(), None); - assert_eq!(cache.last_key(), None); - } -} diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs index 9d79cecd2ec53..207201c4ff348 100644 --- a/src/stream/src/common/mod.rs +++ b/src/stream/src/common/mod.rs @@ -14,10 +14,10 @@ pub use column_mapping::*; -pub mod cache; mod column_mapping; pub mod compact_chunk; pub mod log_store_impl; pub mod metrics; pub mod rate_limit; +pub mod state_cache; pub mod table; diff --git a/src/stream/src/common/cache/state_cache/mod.rs b/src/stream/src/common/state_cache/mod.rs similarity index 99% rename from src/stream/src/common/cache/state_cache/mod.rs rename to src/stream/src/common/state_cache/mod.rs index ae3b1ad3069e4..7d547c26b0d88 100644 --- a/src/stream/src/common/cache/state_cache/mod.rs +++ b/src/stream/src/common/state_cache/mod.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use ordered::*; use risingwave_common::array::Op; use risingwave_common_estimate_size::EstimateSize; -pub use top_n::*; mod ordered; mod top_n; +pub use ordered::*; +pub use top_n::*; + /// A common interface for state table cache. pub trait StateCache: EstimateSize { type Key: Ord + EstimateSize; diff --git a/src/stream/src/common/cache/state_cache/ordered.rs b/src/stream/src/common/state_cache/ordered.rs similarity index 69% rename from src/stream/src/common/cache/state_cache/ordered.rs rename to src/stream/src/common/state_cache/ordered.rs index 535dbcb74e9a0..8a47d1b7ecafe 100644 --- a/src/stream/src/common/cache/state_cache/ordered.rs +++ b/src/stream/src/common/state_cache/ordered.rs @@ -12,55 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use risingwave_common::array::Op; -use risingwave_common_estimate_size::{EstimateSize, KvSize}; +use risingwave_common_estimate_size::collections::EstimatedBTreeMap; +use risingwave_common_estimate_size::EstimateSize; use super::{StateCache, StateCacheFiller}; -/// An implementation of [`StateCache`] that uses a [`BTreeMap`] as the underlying cache, with no -/// capacity limit. +/// An implementation of [`StateCache`] that keeps all entries in an ordered in-memory map. +#[derive(Clone, EstimateSize)] pub struct OrderedStateCache { - cache: BTreeMap, + cache: EstimatedBTreeMap, synced: bool, - kv_heap_size: KvSize, -} - -impl EstimateSize for OrderedStateCache { - fn estimated_heap_size(&self) -> usize { - // TODO: Add btreemap internal size - // https://github.com/risingwavelabs/risingwave/issues/9713 - self.kv_heap_size.size() - } } impl OrderedStateCache { pub fn new() -> Self { Self { - cache: BTreeMap::new(), + cache: Default::default(), synced: false, - kv_heap_size: KvSize::new(), - } - } - - fn insert_cache(&mut self, key: K, value: V) -> Option { - let key_size = self.kv_heap_size.add_val(&key); - self.kv_heap_size.add_val(&value); - let old_val = self.cache.insert(key, value); - if let Some(old_val) = &old_val { - self.kv_heap_size.sub_size(key_size); - self.kv_heap_size.sub_val(old_val); - } - old_val - } - - fn delete_cache(&mut self, key: &K) -> Option { - let old_val = self.cache.remove(key); - if let Some(old_val) = &old_val { - self.kv_heap_size.sub(key, old_val); } - old_val } } @@ -87,7 +57,7 @@ impl StateCache for OrderedStateCache Option { if self.synced { - self.insert_cache(key, value) + self.cache.insert(key, value) } else { None } @@ -95,7 +65,7 @@ impl StateCache for OrderedStateCache Option { if self.synced { - self.delete_cache(key) + self.cache.remove(key) } else { None } diff --git a/src/stream/src/common/cache/state_cache/top_n.rs b/src/stream/src/common/state_cache/top_n.rs similarity index 87% rename from src/stream/src/common/cache/state_cache/top_n.rs rename to src/stream/src/common/state_cache/top_n.rs index 4d2423f6c1e10..fef7017d75129 100644 --- a/src/stream/src/common/cache/state_cache/top_n.rs +++ b/src/stream/src/common/state_cache/top_n.rs @@ -13,17 +13,17 @@ // limitations under the License. use risingwave_common::array::Op; +use risingwave_common_estimate_size::collections::EstimatedBTreeMap; use risingwave_common_estimate_size::EstimateSize; use super::{StateCache, StateCacheFiller}; -use crate::common::cache::TopNCache; -/// An implementation of [`StateCache`] that uses a [`TopNCache`] as the underlying cache, with -/// limited capacity. +/// An implementation of [`StateCache`] that keeps a limited number of entries in an ordered in-memory map. #[derive(Clone, EstimateSize)] pub struct TopNStateCache { table_row_count: Option, - cache: TopNCache, + cache: EstimatedBTreeMap, + capacity: usize, synced: bool, } @@ -31,7 +31,8 @@ impl TopNStateCache { pub fn new(capacity: usize) -> Self { Self { table_row_count: None, - cache: TopNCache::new(capacity), + cache: Default::default(), + capacity, synced: false, } } @@ -39,7 +40,8 @@ impl TopNStateCache { pub fn with_table_row_count(capacity: usize, table_row_count: usize) -> Self { Self { table_row_count: Some(table_row_count), - cache: TopNCache::new(capacity), + cache: Default::default(), + capacity, synced: false, } } @@ -65,7 +67,12 @@ impl TopNStateCache { || self.cache.is_empty() || &key <= self.cache.last_key().unwrap() { - self.cache.insert(key, value) + let old_v = self.cache.insert(key, value); + // evict if capacity is reached + while self.cache.len() > self.capacity { + self.cache.pop_last(); + } + old_v } else { None }; @@ -87,8 +94,8 @@ impl TopNStateCache { old_val } - pub fn capacity_inner(&self) -> usize { - self.cache.capacity() + pub fn capacity(&self) -> usize { + self.capacity } pub fn len(&self) -> usize { @@ -170,7 +177,7 @@ impl StateCacheFiller for &mut TopNState type Value = V; fn capacity(&self) -> Option { - Some(self.capacity_inner()) + Some(TopNStateCache::capacity(self)) } fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) { diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 1d1a5e2a6047f..e6fbacee009b3 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -64,7 +64,7 @@ use tracing::{trace, Instrument}; use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; -use crate::common::cache::{StateCache, StateCacheFiller}; +use crate::common::state_cache::{StateCache, StateCacheFiller}; use crate::common::table::state_table_cache::StateTableWatermarkCache; use crate::executor::{StreamExecutorError, StreamExecutorResult}; diff --git a/src/stream/src/common/table/state_table_cache.rs b/src/stream/src/common/table/state_table_cache.rs index 585b51eae0606..86c32b404cb7e 100644 --- a/src/stream/src/common/table/state_table_cache.rs +++ b/src/stream/src/common/table/state_table_cache.rs @@ -17,7 +17,7 @@ use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DefaultOrdered, ScalarRefImpl}; use risingwave_common_estimate_size::EstimateSize; -use crate::common::cache::{StateCache, TopNStateCache}; +use crate::common::state_cache::{StateCache, TopNStateCache}; /// The watermark cache key is just an `OwnedRow` wrapped in `DefaultOrdered`. /// This is because we want to use the `DefaultOrdered` implementation of `Ord`. @@ -125,7 +125,7 @@ impl StateTableWatermarkCache { } pub fn capacity(&self) -> usize { - self.inner.capacity_inner() + self.inner.capacity() } pub fn len(&self) -> usize { @@ -186,7 +186,7 @@ mod tests { use risingwave_common::types::{Scalar, Timestamptz}; use super::*; - use crate::common::cache::StateCacheFiller; + use crate::common::state_cache::StateCacheFiller; /// With capacity 3, test the following sequence of inserts: /// Insert diff --git a/src/stream/src/executor/aggregation/agg_state_cache.rs b/src/stream/src/executor/aggregation/agg_state_cache.rs index 71cd9a45e3f1b..8a202ed6dd1dd 100644 --- a/src/stream/src/executor/aggregation/agg_state_cache.rs +++ b/src/stream/src/executor/aggregation/agg_state_cache.rs @@ -23,7 +23,7 @@ use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; -use crate::common::cache::{StateCache, StateCacheFiller}; +use crate::common::state_cache::{StateCache, StateCacheFiller}; /// Cache key type. type CacheKey = MemcmpEncoded; diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index c8bae26f6e1b3..db3364574d5da 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -31,7 +31,7 @@ use risingwave_storage::StateStore; use super::agg_state_cache::{AggStateCache, GenericAggStateCache}; use super::GroupKey; -use crate::common::cache::{OrderedStateCache, TopNStateCache}; +use crate::common::state_cache::{OrderedStateCache, TopNStateCache}; use crate::common::table::state_table::StateTable; use crate::common::StateTableColumnMapping; use crate::executor::{PkIndices, StreamExecutorResult}; diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index fc4f921e7ccbd..c2dfcea076843 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -34,7 +34,7 @@ use risingwave_storage::table::KeyedRow; use risingwave_storage::StateStore; use super::{StreamExecutorError, StreamExecutorResult}; -use crate::common::cache::{StateCache, StateCacheFiller, TopNStateCache}; +use crate::common::state_cache::{StateCache, StateCacheFiller, TopNStateCache}; use crate::common::table::state_table::StateTable; type CacheKey = (