Skip to content

Commit

Permalink
refactor(estimate_size, topn): merge TopNCacheState with `Estimated…
Browse files Browse the repository at this point in the history
…BTreeMap` (#17892)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 22, 2024
1 parent d365cf4 commit d8a3839
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 206 deletions.
122 changes: 116 additions & 6 deletions src/common/estimate_size/src/collections/btreemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt;
use std::collections::BTreeMap;
use std::ops::{Bound, RangeInclusive};

Expand Down Expand Up @@ -62,18 +63,18 @@ where
self.inner.last_key_value()
}

pub fn insert(&mut self, key: K, row: V) {
pub fn insert(&mut self, key: K, value: V) {
let key_size = self.heap_size.add_val(&key);
self.heap_size.add_val(&row);
if let Some(old_row) = self.inner.insert(key, row) {
self.heap_size.add_val(&value);
if let Some(old_value) = self.inner.insert(key, value) {
self.heap_size.sub_size(key_size);
self.heap_size.sub_val(&old_row);
self.heap_size.sub_val(&old_value);
}
}

pub fn remove(&mut self, key: &K) {
if let Some(row) = self.inner.remove(key) {
self.heap_size.sub(key, &row);
if let Some(value) = self.inner.remove(key) {
self.heap_size.sub(key, &value);
}
}

Expand All @@ -82,6 +83,36 @@ where
self.heap_size.set(0);
}

pub fn pop_first(&mut self) -> Option<(K, V)> {
let (key, value) = self.inner.pop_first()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

pub fn pop_last(&mut self) -> Option<(K, V)> {
let (key, value) = self.inner.pop_last()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

pub fn last_entry(&mut self) -> Option<OccupiedEntry<'_, K, V>> {
self.inner.last_entry().map(|inner| OccupiedEntry {
inner,
heap_size: &mut self.heap_size,
})
}

pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.inner.iter()
}

pub fn range<R>(&self, range: R) -> std::collections::btree_map::Range<'_, K, V>
where
R: std::ops::RangeBounds<K>,
{
self.inner.range(range)
}

/// Retain the given range of entries in the map, removing others.
pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap<K, V>, BTreeMap<K, V>)
where
Expand Down Expand Up @@ -114,6 +145,27 @@ where

(left, right)
}

pub fn extract_if<'a, F>(
&'a mut self,
mut pred: F,
) -> ExtractIf<'a, K, V, impl FnMut(&K, &mut V) -> bool>
where
F: 'a + FnMut(&K, &V) -> bool,
{
let pred_immut = move |key: &K, value: &mut V| pred(key, value);
ExtractIf {
inner: self.inner.extract_if(pred_immut),
heap_size: &mut self.heap_size,
}
}

pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&K, &V) -> bool,
{
self.extract_if(|k, v| !f(k, v)).for_each(drop);
}
}

impl<K, V> EstimateSize for EstimatedBTreeMap<K, V>
Expand All @@ -126,6 +178,64 @@ where
}
}

impl<K, V> fmt::Debug for EstimatedBTreeMap<K, V>
where
K: fmt::Debug,
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

pub struct OccupiedEntry<'a, K, V> {
inner: std::collections::btree_map::OccupiedEntry<'a, K, V>,
heap_size: &'a mut KvSize,
}

impl<'a, K, V> OccupiedEntry<'a, K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
{
pub fn key(&self) -> &K {
self.inner.key()
}

pub fn remove_entry(self) -> (K, V) {
let (key, value) = self.inner.remove_entry();
self.heap_size.sub(&key, &value);
(key, value)
}
}

pub struct ExtractIf<'a, K, V, F>
where
F: FnMut(&K, &mut V) -> bool,
{
inner: std::collections::btree_map::ExtractIf<'a, K, V, F>,
heap_size: &'a mut KvSize,
}

impl<'a, K, V, F> Iterator for ExtractIf<'a, K, V, F>
where
K: EstimateSize,
V: EstimateSize,
F: FnMut(&K, &mut V) -> bool,
{
type Item = (K, V);

fn next(&mut self) -> Option<Self::Item> {
let (key, value) = self.inner.next()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[cfg(test)]
mod tests {
use super::EstimatedBTreeMap;
Expand Down
1 change: 1 addition & 0 deletions src/common/estimate_size/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(allocator_api)]
#![feature(btree_cursors)]
#![feature(btree_extract_if)]

pub mod collections;

Expand Down
2 changes: 0 additions & 2 deletions src/stream/src/executor/top_n/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use utils::*;
mod top_n_cache;
mod top_n_state;
use top_n_cache::{TopNCache, TopNCacheTrait};
mod topn_cache_state;
use top_n_state::ManagedTopNState;
use topn_cache_state::CacheKey;

// `TopN` variants
mod group_top_n;
Expand Down
22 changes: 13 additions & 9 deletions src/stream/src/executor/top_n/top_n_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use itertools::Itertools;
use risingwave_common::array::{Op, RowRef};
use risingwave_common::row::{CompactedRow, Row, RowDeserializer, RowExt};
use risingwave_common::types::DataType;
use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_storage::StateStore;

use super::topn_cache_state::TopNCacheState;
use super::{CacheKey, GroupKey, ManagedTopNState};
use super::{GroupKey, ManagedTopNState};
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;

/// `CacheKey` is composed of `(order_by, remaining columns of pk)`.
pub type CacheKey = (Vec<u8>, Vec<u8>);
type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

Expand All @@ -44,17 +48,17 @@ const TOPN_CACHE_MIN_CAPACITY: usize = 10;
/// since they have different semantics.
pub struct TopNCache<const WITH_TIES: bool> {
/// Rows in the range `[0, offset)`
pub low: TopNCacheState,
pub low: Cache,
/// Rows in the range `[offset, offset+limit)`
///
/// When `WITH_TIES` is true, it also stores ties for the last element,
/// and thus the size can be larger than `limit`.
pub middle: TopNCacheState,
pub middle: Cache,
/// Rows in the range `[offset+limit, offset+limit+high_capacity)`
///
/// When `WITH_TIES` is true, it also stores ties for the last element,
/// and thus the size can be larger than `high_capacity`.
pub high: TopNCacheState,
pub high: Cache,
pub high_capacity: usize,
pub offset: usize,
/// Assumption: `limit != 0`
Expand Down Expand Up @@ -89,7 +93,7 @@ impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {

fn format_cache(
f: &mut std::fmt::Formatter<'_>,
cache: &TopNCacheState,
cache: &Cache,
data_types: &[DataType],
) -> std::fmt::Result {
if cache.is_empty() {
Expand Down Expand Up @@ -167,9 +171,9 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
assert!(offset == 0, "OFFSET is not supported with WITH TIES");
}
Self {
low: TopNCacheState::new(),
middle: TopNCacheState::new(),
high: TopNCacheState::new(),
low: Cache::new(),
middle: Cache::new(),
high: Cache::new(),
high_capacity: offset
.checked_add(limit)
.and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use super::{serialize_pk_to_cache_key, CacheKey, CacheKeySerde, GroupKey, TopNCache};
use super::top_n_cache::CacheKey;
use super::{serialize_pk_to_cache_key, CacheKeySerde, GroupKey, TopNCache};
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;

Expand Down
Loading

0 comments on commit d8a3839

Please sign in to comment.