diff --git a/src/common/estimate_size/src/collections/hashset.rs b/src/common/estimate_size/src/collections/hashset.rs
new file mode 100644
index 0000000000000..ae55dd72d51d5
--- /dev/null
+++ b/src/common/estimate_size/src/collections/hashset.rs
@@ -0,0 +1,73 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+use std::hash::Hash;
+
+use super::EstimatedVec;
+use crate::{EstimateSize, KvSize};
+
+#[derive(Default)]
+pub struct EstimatedHashSet<T: EstimateSize> {
+    inner: HashSet<T>,
+    kv_heap_size: KvSize,
+}
+
+impl<T: EstimateSize> EstimateSize for EstimatedHashSet<T> {
+    fn estimated_heap_size(&self) -> usize {
+        // TODO: Add hashset internal size.
+        // https://github.com/risingwavelabs/risingwave/issues/9713
+        self.kv_heap_size.size()
+    }
+}
+
+impl<T: EstimateSize> EstimatedHashSet<T>
+where
+    T: Eq + Hash,
+{
+    /// Insert into the cache.
+    pub fn insert(&mut self, value: T) -> bool {
+        let kv_heap_size = self.kv_heap_size.add_val(&value);
+        let inserted = self.inner.insert(value);
+        if inserted {
+            self.kv_heap_size.set(kv_heap_size);
+        }
+        inserted
+    }
+
+    /// Delete from the cache.
+    pub fn remove(&mut self, value: &T) -> bool {
+        let removed = self.inner.remove(value);
+        if removed {
+            self.kv_heap_size.sub_val(value);
+        }
+        removed
+    }
+
+    /// Convert an [`EstimatedVec`] to a [`EstimatedHashSet`]. Do not need to recalculate the
+    /// heap size.
+    pub fn from_vec(v: EstimatedVec<T>) -> Self {
+        let kv_heap_size = v.estimated_heap_size();
+        Self {
+            inner: HashSet::from_iter(v),
+            kv_heap_size: KvSize::with_size(kv_heap_size),
+        }
+    }
+}
+
+impl<T: EstimateSize> EstimatedHashSet<T> {
+    pub fn iter(&self) -> impl Iterator<Item = &T> {
+        self.inner.iter()
+    }
+}
diff --git a/src/common/estimate_size/src/collections/mod.rs b/src/common/estimate_size/src/collections/mod.rs
index 3e8864f4549e1..f7cdff490a88d 100644
--- a/src/common/estimate_size/src/collections/mod.rs
+++ b/src/common/estimate_size/src/collections/mod.rs
@@ -25,6 +25,8 @@ pub mod btreemap;
 pub use btreemap::EstimatedBTreeMap;
 pub mod vec;
 pub use vec::EstimatedVec;
+pub mod hashset;
+pub use hashset::EstimatedHashSet;
 
 mod private {
     use super::*;
diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs
index 7e6544473366b..6376c6804d759 100644
--- a/src/stream/src/executor/lookup/cache.rs
+++ b/src/stream/src/executor/lookup/cache.rs
@@ -12,17 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
-
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::row::{OwnedRow, Row, RowExt};
-use risingwave_common_estimate_size::collections::EstimatedVec;
-use risingwave_common_estimate_size::{EstimateSize, KvSize};
+use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec};
 
 use crate::cache::{new_unbounded, ManagedLruCache};
 use crate::common::metrics::MetricsInfo;
 use crate::task::AtomicU64Ref;
 
+pub type LookupEntryState = EstimatedHashSet<OwnedRow>;
+
 /// A cache for lookup's arrangement side.
 pub struct LookupCache {
     data: ManagedLruCache<OwnedRow, LookupEntryState>,
@@ -36,7 +35,7 @@ impl LookupCache {
 
     /// Update a key after lookup cache misses.
     pub fn batch_update(&mut self, key: OwnedRow, value: EstimatedVec<OwnedRow>) {
-        self.data.push(key, LookupEntryState::new(value));
+        self.data.push(key, LookupEntryState::from_vec(value));
     }
 
     /// Apply a batch from the arrangement side
@@ -45,12 +44,17 @@ impl LookupCache {
             let key = row.project(arrange_join_keys).into_owned_row();
             if let Some(mut values) = self.data.get_mut(&key) {
                 // the item is in cache, update it
+                let row = row.into_owned_row();
                 match op {
                     Op::Insert | Op::UpdateInsert => {
-                        values.insert(row.into_owned_row());
+                        if !values.insert(row) {
+                            panic!("inserting a duplicated value");
+                        }
                     }
                     Op::Delete | Op::UpdateDelete => {
-                        values.remove(&row.into_owned_row());
+                        if !values.remove(&row) {
+                            panic!("row {:?} should be in the cache", row);
+                        }
                     }
                 }
             }
@@ -80,50 +84,3 @@ impl LookupCache {
         Self { data: cache }
     }
 }
-
-#[derive(Default)]
-pub struct LookupEntryState {
-    inner: HashSet<OwnedRow>,
-    kv_heap_size: KvSize,
-}
-
-impl EstimateSize for LookupEntryState {
-    fn estimated_heap_size(&self) -> usize {
-        // TODO: Add hashset internal size.
-        // https://github.com/risingwavelabs/risingwave/issues/9713
-        self.kv_heap_size.size()
-    }
-}
-
-impl LookupEntryState {
-    /// Insert into the cache.
-    fn insert(&mut self, value: OwnedRow) {
-        let kv_heap_size = self.kv_heap_size.add_val(&value);
-        if self.inner.insert(value) {
-            self.kv_heap_size.set(kv_heap_size);
-        } else {
-            panic!("inserting a duplicated value");
-        }
-    }
-
-    /// Delete from the cache.
-    fn remove(&mut self, value: &OwnedRow) {
-        if self.inner.remove(value) {
-            self.kv_heap_size.sub_val(value);
-        } else {
-            panic!("value {:?} should be in the cache", value);
-        }
-    }
-
-    fn new(value: EstimatedVec<OwnedRow>) -> Self {
-        let kv_heap_size = value.estimated_heap_size();
-        Self {
-            inner: HashSet::from_iter(value),
-            kv_heap_size: KvSize::with_size(kv_heap_size),
-        }
-    }
-
-    pub fn iter(&self) -> impl Iterator<Item = &OwnedRow> {
-        self.inner.iter()
-    }
-}