Skip to content

Commit

Permalink
rename VecWithKvSize to EstimatedVec and impl EstimateSize for it
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Mar 21, 2024
1 parent a25bc40 commit 2c18ce9
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/common/estimate_size/src/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use hashmap::EstimatedHashMap;
pub mod btreemap;
pub use btreemap::EstimatedBTreeMap;
pub mod vec;
pub use vec::VecWithKvSize;
pub use vec::EstimatedVec;

mod private {
use super::*;
Expand Down
18 changes: 10 additions & 8 deletions src/common/estimate_size/src/collections/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
use crate::EstimateSize;

#[derive(Clone)]
pub struct VecWithKvSize<T: EstimateSize> {
pub struct EstimatedVec<T: EstimateSize> {
inner: Vec<T>,
kv_heap_size: usize,
}

impl<T: EstimateSize> Default for VecWithKvSize<T> {
impl<T: EstimateSize> Default for EstimatedVec<T> {
fn default() -> Self {
Self {
inner: vec![],
Expand All @@ -29,13 +29,15 @@ impl<T: EstimateSize> Default for VecWithKvSize<T> {
}
}

impl<T: EstimateSize> VecWithKvSize<T> {
pub fn new() -> Self {
Default::default()
impl<T: EstimateSize> EstimateSize for EstimatedVec<T> {
fn estimated_heap_size(&self) -> usize {
self.kv_heap_size
}
}

pub fn get_kv_size(&self) -> usize {
self.kv_heap_size
impl<T: EstimateSize> EstimatedVec<T> {
pub fn new() -> Self {
Default::default()
}

pub fn push(&mut self, value: T) {
Expand All @@ -54,7 +56,7 @@ impl<T: EstimateSize> VecWithKvSize<T> {
}
}

impl<T: EstimateSize> IntoIterator for VecWithKvSize<T> {
impl<T: EstimateSize> IntoIterator for EstimatedVec<T> {
type IntoIter = std::vec::IntoIter<Self::Item>;
type Item = T;

Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashSet;

use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common_estimate_size::collections::VecWithKvSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_estimate_size::{EstimateSize, KvSize};

use crate::cache::{new_unbounded, ManagedLruCache};
Expand All @@ -35,7 +35,7 @@ impl LookupCache {
}

/// Update a key after lookup cache misses.
pub fn batch_update(&mut self, key: OwnedRow, value: VecWithKvSize<OwnedRow>) {
pub fn batch_update(&mut self, key: OwnedRow, value: EstimatedVec<OwnedRow>) {
self.data.push(key, LookupEntryState::new(value));
}

Expand Down Expand Up @@ -115,8 +115,8 @@ impl LookupEntryState {
}
}

fn new(value: VecWithKvSize<OwnedRow>) -> Self {
let kv_heap_size = value.get_kv_size();
fn new(value: EstimatedVec<OwnedRow>) -> Self {
let kv_heap_size = value.estimated_heap_size();
Self {
inner: HashSet::from_iter(value),
kv_heap_size: KvSize::with_size(kv_heap_size),
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/lookup/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::collections::VecWithKvSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
Expand Down Expand Up @@ -370,7 +370,7 @@ impl<S: StateStore> LookupExecutor<S> {

tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);

let mut all_rows = VecWithKvSize::new();
let mut all_rows = EstimatedVec::new();
// Drop the stream.
{
let all_data_iter = match self.arrangement.use_current_epoch {
Expand Down

0 comments on commit 2c18ce9

Please sign in to comment.