Skip to content

Commit

Permalink
refactor(common): generalize LookupEntryState to EstimatedHashSet (
Browse files Browse the repository at this point in the history
…#15843)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Mar 22, 2024
1 parent a34b4f8 commit ded3314
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 54 deletions.
73 changes: 73 additions & 0 deletions src/common/estimate_size/src/collections/hashset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::hash::Hash;

use super::EstimatedVec;
use crate::{EstimateSize, KvSize};

/// A [`HashSet`] wrapper that maintains a running estimate of the heap size
/// of its elements, so total memory usage can be reported cheaply via
/// [`EstimateSize`] without walking the whole set.
#[derive(Default)]
pub struct EstimatedHashSet<T: EstimateSize> {
    // The underlying set holding the actual values.
    inner: HashSet<T>,
    // Running estimate of the heap bytes occupied by the elements;
    // updated on every insert/remove.
    heap_size: KvSize,
}

impl<T: EstimateSize> EstimateSize for EstimatedHashSet<T> {
    fn estimated_heap_size(&self) -> usize {
        // Only the elements' heap usage is tracked; the `HashSet`'s own
        // table allocation is not yet accounted for.
        // TODO: Add hashset internal size.
        // https://github.com/risingwavelabs/risingwave/issues/9713
        self.heap_size.size()
    }
}

impl<T: EstimateSize> EstimatedHashSet<T>
where
T: Eq + Hash,
{
/// Insert into the cache.
pub fn insert(&mut self, value: T) -> bool {
let heap_size = self.heap_size.add_val(&value);
let inserted = self.inner.insert(value);
if inserted {
self.heap_size.set(heap_size);
}
inserted
}

/// Delete from the cache.
pub fn remove(&mut self, value: &T) -> bool {
let removed = self.inner.remove(value);
if removed {
self.heap_size.sub_val(value);
}
removed
}

/// Convert an [`EstimatedVec`] to a [`EstimatedHashSet`]. Do not need to recalculate the
/// heap size.
pub fn from_vec(v: EstimatedVec<T>) -> Self {
let heap_size = v.estimated_heap_size();
Self {
inner: HashSet::from_iter(v),
heap_size: KvSize::with_size(heap_size),
}
}
}

impl<T: EstimateSize> EstimatedHashSet<T> {
    /// Iterate over references to the elements, in arbitrary order
    /// (delegates to [`HashSet::iter`]).
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.inner.iter()
    }
}
2 changes: 2 additions & 0 deletions src/common/estimate_size/src/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub mod btreemap;
pub use btreemap::EstimatedBTreeMap;
pub mod vec;
pub use vec::EstimatedVec;
pub mod hashset;
pub use hashset::EstimatedHashSet;

mod private {
use super::*;
Expand Down
65 changes: 11 additions & 54 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec};

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;

pub type LookupEntryState = EstimatedHashSet<OwnedRow>;

/// A cache for lookup's arrangement side.
pub struct LookupCache {
data: ManagedLruCache<OwnedRow, LookupEntryState>,
Expand All @@ -36,7 +35,7 @@ impl LookupCache {

/// Update a key after lookup cache misses.
pub fn batch_update(&mut self, key: OwnedRow, value: EstimatedVec<OwnedRow>) {
self.data.push(key, LookupEntryState::new(value));
self.data.push(key, LookupEntryState::from_vec(value));
}

/// Apply a batch from the arrangement side
Expand All @@ -45,12 +44,17 @@ impl LookupCache {
let key = row.project(arrange_join_keys).into_owned_row();
if let Some(mut values) = self.data.get_mut(&key) {
// the item is in cache, update it
let row = row.into_owned_row();
match op {
Op::Insert | Op::UpdateInsert => {
values.insert(row.into_owned_row());
if !values.insert(row) {
panic!("inserting a duplicated value");
}
}
Op::Delete | Op::UpdateDelete => {
values.remove(&row.into_owned_row());
if !values.remove(&row) {
panic!("row {:?} should be in the cache", row);
}
}
}
}
Expand Down Expand Up @@ -80,50 +84,3 @@ impl LookupCache {
Self { data: cache }
}
}

#[derive(Default)]
pub struct LookupEntryState {
inner: HashSet<OwnedRow>,
kv_heap_size: KvSize,
}

impl EstimateSize for LookupEntryState {
fn estimated_heap_size(&self) -> usize {
// TODO: Add hashset internal size.
// https://github.com/risingwavelabs/risingwave/issues/9713
self.kv_heap_size.size()
}
}

impl LookupEntryState {
/// Insert into the cache.
fn insert(&mut self, value: OwnedRow) {
let kv_heap_size = self.kv_heap_size.add_val(&value);
if self.inner.insert(value) {
self.kv_heap_size.set(kv_heap_size);
} else {
panic!("inserting a duplicated value");
}
}

/// Delete from the cache.
fn remove(&mut self, value: &OwnedRow) {
if self.inner.remove(value) {
self.kv_heap_size.sub_val(value);
} else {
panic!("value {:?} should be in the cache", value);
}
}

fn new(value: EstimatedVec<OwnedRow>) -> Self {
let kv_heap_size = value.estimated_heap_size();
Self {
inner: HashSet::from_iter(value),
kv_heap_size: KvSize::with_size(kv_heap_size),
}
}

pub fn iter(&self) -> impl Iterator<Item = &OwnedRow> {
self.inner.iter()
}
}

0 comments on commit ded3314

Please sign in to comment.