Skip to content

Commit

Permalink
refactor(state_cache): simplify StateCaches by reusing `EstimatedBT…
Browse files Browse the repository at this point in the history
…reeMap` (#17895)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 22, 2024
1 parent 88aed8f commit 70b60c1
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 291 deletions.
77 changes: 54 additions & 23 deletions src/common/estimate_size/src/collections/btreemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::{Bound, RangeInclusive};

use crate::{EstimateSize, KvSize};

#[derive(Clone)]
pub struct EstimatedBTreeMap<K, V> {
inner: BTreeMap<K, V>,
heap_size: KvSize,
Expand All @@ -42,40 +43,81 @@ impl<K, V> EstimatedBTreeMap<K, V> {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}

impl<K, V> Default for EstimatedBTreeMap<K, V> {
fn default() -> Self {
Self::new()
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.inner.iter()
}

pub fn range<R>(&self, range: R) -> std::collections::btree_map::Range<'_, K, V>
where
K: Ord,
R: std::ops::RangeBounds<K>,
{
self.inner.range(range)
}

pub fn values(&self) -> impl Iterator<Item = &V> {
self.inner.values()
}
}

impl<K, V> EstimatedBTreeMap<K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
K: Ord,
{
pub fn first_key_value(&self) -> Option<(&K, &V)> {
self.inner.first_key_value()
}

pub fn first_key(&self) -> Option<&K> {
self.first_key_value().map(|(k, _)| k)
}

pub fn first_value(&self) -> Option<&V> {
self.first_key_value().map(|(_, v)| v)
}

pub fn last_key_value(&self) -> Option<(&K, &V)> {
self.inner.last_key_value()
}

pub fn insert(&mut self, key: K, value: V) {
pub fn last_key(&self) -> Option<&K> {
self.last_key_value().map(|(k, _)| k)
}

pub fn last_value(&self) -> Option<&V> {
self.last_key_value().map(|(_, v)| v)
}
}

impl<K, V> Default for EstimatedBTreeMap<K, V> {
fn default() -> Self {
Self::new()
}
}

impl<K, V> EstimatedBTreeMap<K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
{
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
let key_size = self.heap_size.add_val(&key);
self.heap_size.add_val(&value);
if let Some(old_value) = self.inner.insert(key, value) {
let old_value = self.inner.insert(key, value);
if let Some(old_value) = &old_value {
self.heap_size.sub_size(key_size);
self.heap_size.sub_val(&old_value);
self.heap_size.sub_val(old_value);
}
old_value
}

pub fn remove(&mut self, key: &K) {
if let Some(value) = self.inner.remove(key) {
self.heap_size.sub(key, &value);
pub fn remove(&mut self, key: &K) -> Option<V> {
let old_value = self.inner.remove(key);
if let Some(old_value) = &old_value {
self.heap_size.sub(key, old_value);
}
old_value
}

pub fn clear(&mut self) {
Expand All @@ -102,17 +144,6 @@ where
})
}

pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.inner.iter()
}

pub fn range<R>(&self, range: R) -> std::collections::btree_map::Range<'_, K, V>
where
R: std::ops::RangeBounds<K>,
{
self.inner.range(range)
}

/// Retain the given range of entries in the map, removing others.
pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap<K, V>, BTreeMap<K, V>)
where
Expand Down
19 changes: 0 additions & 19 deletions src/stream/src/common/cache/mod.rs

This file was deleted.

191 changes: 0 additions & 191 deletions src/stream/src/common/cache/top_n_cache.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

pub use column_mapping::*;

pub mod cache;
mod column_mapping;
pub mod compact_chunk;
pub mod log_store_impl;
pub mod metrics;
pub mod rate_limit;
pub mod state_cache;
pub mod table;
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use ordered::*;
use risingwave_common::array::Op;
use risingwave_common_estimate_size::EstimateSize;
pub use top_n::*;

mod ordered;
mod top_n;

pub use ordered::*;
pub use top_n::*;

/// A common interface for state table cache.
pub trait StateCache: EstimateSize {
type Key: Ord + EstimateSize;
Expand Down
Loading

0 comments on commit 70b60c1

Please sign in to comment.