diff --git a/src/common/estimate_size/src/collections/btreemap.rs b/src/common/estimate_size/src/collections/btreemap.rs index 84ecd687d8df1..31b66cf95812e 100644 --- a/src/common/estimate_size/src/collections/btreemap.rs +++ b/src/common/estimate_size/src/collections/btreemap.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::fmt; use std::collections::BTreeMap; use std::ops::{Bound, RangeInclusive}; @@ -62,18 +63,18 @@ where self.inner.last_key_value() } - pub fn insert(&mut self, key: K, row: V) { + pub fn insert(&mut self, key: K, value: V) { let key_size = self.heap_size.add_val(&key); - self.heap_size.add_val(&row); - if let Some(old_row) = self.inner.insert(key, row) { + self.heap_size.add_val(&value); + if let Some(old_value) = self.inner.insert(key, value) { self.heap_size.sub_size(key_size); - self.heap_size.sub_val(&old_row); + self.heap_size.sub_val(&old_value); } } pub fn remove(&mut self, key: &K) { - if let Some(row) = self.inner.remove(key) { - self.heap_size.sub(key, &row); + if let Some(value) = self.inner.remove(key) { + self.heap_size.sub(key, &value); } } @@ -82,6 +83,36 @@ where self.heap_size.set(0); } + pub fn pop_first(&mut self) -> Option<(K, V)> { + let (key, value) = self.inner.pop_first()?; + self.heap_size.sub(&key, &value); + Some((key, value)) + } + + pub fn pop_last(&mut self) -> Option<(K, V)> { + let (key, value) = self.inner.pop_last()?; + self.heap_size.sub(&key, &value); + Some((key, value)) + } + + pub fn last_entry(&mut self) -> Option> { + self.inner.last_entry().map(|inner| OccupiedEntry { + inner, + heap_size: &mut self.heap_size, + }) + } + + pub fn iter(&self) -> impl Iterator { + self.inner.iter() + } + + pub fn range(&self, range: R) -> std::collections::btree_map::Range<'_, K, V> + where + R: std::ops::RangeBounds, + { + self.inner.range(range) + } + /// Retain the given range of entries in the map, removing others. pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap, BTreeMap) where @@ -114,6 +145,27 @@ where (left, right) } + + pub fn extract_if<'a, F>( + &'a mut self, + mut pred: F, + ) -> ExtractIf<'a, K, V, impl FnMut(&K, &mut V) -> bool> + where + F: 'a + FnMut(&K, &V) -> bool, + { + let pred_immut = move |key: &K, value: &mut V| pred(key, value); + ExtractIf { + inner: self.inner.extract_if(pred_immut), + heap_size: &mut self.heap_size, + } + } + + pub fn retain(&mut self, mut f: F) + where + F: FnMut(&K, &V) -> bool, + { + self.extract_if(|k, v| !f(k, v)).for_each(drop); + } } impl EstimateSize for EstimatedBTreeMap @@ -126,6 +178,64 @@ where } } +impl fmt::Debug for EstimatedBTreeMap +where + K: fmt::Debug, + V: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +pub struct OccupiedEntry<'a, K, V> { + inner: std::collections::btree_map::OccupiedEntry<'a, K, V>, + heap_size: &'a mut KvSize, +} + +impl<'a, K, V> OccupiedEntry<'a, K, V> +where + K: EstimateSize + Ord, + V: EstimateSize, +{ + pub fn key(&self) -> &K { + self.inner.key() + } + + pub fn remove_entry(self) -> (K, V) { + let (key, value) = self.inner.remove_entry(); + self.heap_size.sub(&key, &value); + (key, value) + } +} + +pub struct ExtractIf<'a, K, V, F> +where + F: FnMut(&K, &mut V) -> bool, +{ + inner: std::collections::btree_map::ExtractIf<'a, K, V, F>, + heap_size: &'a mut KvSize, +} + +impl<'a, K, V, F> Iterator for ExtractIf<'a, K, V, F> +where + K: EstimateSize, + V: EstimateSize, + F: FnMut(&K, &mut V) -> bool, +{ + type Item = (K, V); + + fn next(&mut self) -> Option { + let (key, value) = self.inner.next()?; + self.heap_size.sub(&key, &value); + Some((key, value)) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + #[cfg(test)] mod tests { use super::EstimatedBTreeMap; diff --git a/src/common/estimate_size/src/lib.rs b/src/common/estimate_size/src/lib.rs index 32e41fcf87fb2..3629bc5c535f2 100644 --- a/src/common/estimate_size/src/lib.rs +++ b/src/common/estimate_size/src/lib.rs @@ -14,6 +14,7 @@ #![feature(allocator_api)] #![feature(btree_cursors)] +#![feature(btree_extract_if)] pub mod collections; diff --git a/src/stream/src/executor/top_n/mod.rs b/src/stream/src/executor/top_n/mod.rs index b33f5f4883cac..2ad3c0ed7330e 100644 --- a/src/stream/src/executor/top_n/mod.rs +++ b/src/stream/src/executor/top_n/mod.rs @@ -19,9 +19,7 @@ use utils::*; mod top_n_cache; mod top_n_state; use top_n_cache::{TopNCache, TopNCacheTrait}; -mod topn_cache_state; use top_n_state::ManagedTopNState; -use topn_cache_state::CacheKey; // `TopN` variants mod group_top_n; diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 3ce58e4e6017d..b627328cebf3e 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -20,14 +20,18 @@ use itertools::Itertools; use risingwave_common::array::{Op, RowRef}; use risingwave_common::row::{CompactedRow, Row, RowDeserializer, RowExt}; use risingwave_common::types::DataType; +use risingwave_common_estimate_size::collections::EstimatedBTreeMap; use risingwave_common_estimate_size::EstimateSize; use risingwave_storage::StateStore; -use super::topn_cache_state::TopNCacheState; -use super::{CacheKey, GroupKey, ManagedTopNState}; +use super::{GroupKey, ManagedTopNState}; use crate::consistency::{consistency_error, enable_strict_consistency}; use crate::executor::error::StreamExecutorResult; +/// `CacheKey` is composed of `(order_by, remaining columns of pk)`. +pub type CacheKey = (Vec, Vec); +type Cache = EstimatedBTreeMap; + const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2; const TOPN_CACHE_MIN_CAPACITY: usize = 10; @@ -44,17 +48,17 @@ const TOPN_CACHE_MIN_CAPACITY: usize = 10; /// since they have different semantics. pub struct TopNCache { /// Rows in the range `[0, offset)` - pub low: TopNCacheState, + pub low: Cache, /// Rows in the range `[offset, offset+limit)` /// /// When `WITH_TIES` is true, it also stores ties for the last element, /// and thus the size can be larger than `limit`. - pub middle: TopNCacheState, + pub middle: Cache, /// Rows in the range `[offset+limit, offset+limit+high_capacity)` /// /// When `WITH_TIES` is true, it also stores ties for the last element, /// and thus the size can be larger than `high_capacity`. - pub high: TopNCacheState, + pub high: Cache, pub high_capacity: usize, pub offset: usize, /// Assumption: `limit != 0` @@ -89,7 +93,7 @@ impl Debug for TopNCache { fn format_cache( f: &mut std::fmt::Formatter<'_>, - cache: &TopNCacheState, + cache: &Cache, data_types: &[DataType], ) -> std::fmt::Result { if cache.is_empty() { @@ -167,9 +171,9 @@ impl TopNCache { assert!(offset == 0, "OFFSET is not supported with WITH TIES"); } Self { - low: TopNCacheState::new(), - middle: TopNCacheState::new(), - high: TopNCacheState::new(), + low: Cache::new(), + middle: Cache::new(), + high: Cache::new(), high_capacity: offset .checked_add(limit) .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR)) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 57f4017e51185..7cce8ecb4c6ac 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -23,7 +23,8 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; -use super::{serialize_pk_to_cache_key, CacheKey, CacheKeySerde, GroupKey, TopNCache}; +use super::top_n_cache::CacheKey; +use super::{serialize_pk_to_cache_key, CacheKeySerde, GroupKey, TopNCache}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorResult; diff --git a/src/stream/src/executor/top_n/topn_cache_state.rs b/src/stream/src/executor/top_n/topn_cache_state.rs deleted file mode 100644 index fa28369f2b0d6..0000000000000 --- a/src/stream/src/executor/top_n/topn_cache_state.rs +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::fmt; -use std::alloc::Global; -use std::collections::btree_map::{BTreeMap, ExtractIf, OccupiedEntry, Range}; -use std::ops::RangeBounds; - -use risingwave_common::row::CompactedRow; -use risingwave_common_estimate_size::{EstimateSize, KvSize}; - -/// `CacheKey` is composed of `(order_by, remaining columns of pk)`. -pub type CacheKey = (Vec, Vec); - -#[derive(Default)] -pub struct TopNCacheState { - /// The full copy of the state. - inner: BTreeMap, - kv_heap_size: KvSize, -} - -impl EstimateSize for TopNCacheState { - fn estimated_heap_size(&self) -> usize { - // TODO: Add btreemap internal size. - // https://github.com/risingwavelabs/risingwave/issues/9713 - self.kv_heap_size.size() - } -} - -impl TopNCacheState { - pub fn new() -> Self { - Self { - inner: BTreeMap::new(), - kv_heap_size: KvSize::new(), - } - } - - /// Insert into the cache. - pub fn insert(&mut self, key: CacheKey, value: CompactedRow) -> Option { - self.kv_heap_size.add(&key, &value); - self.inner.insert(key, value) - } - - /// Delete from the cache. - pub fn remove(&mut self, key: &CacheKey) { - if let Some(value) = self.inner.remove(key) { - self.kv_heap_size.sub(key, &value); - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn last_key_value(&self) -> Option<(&CacheKey, &CompactedRow)> { - self.inner.last_key_value() - } - - pub fn first_key_value(&self) -> Option<(&CacheKey, &CompactedRow)> { - self.inner.first_key_value() - } - - pub fn clear(&mut self) { - self.inner.clear() - } - - pub fn pop_first(&mut self) -> Option<(CacheKey, CompactedRow)> { - self.inner.pop_first().inspect(|(k, v)| { - self.kv_heap_size.sub(k, v); - }) - } - - pub fn pop_last(&mut self) -> Option<(CacheKey, CompactedRow)> { - self.inner.pop_last().inspect(|(k, v)| { - self.kv_heap_size.sub(k, v); - }) - } - - pub fn last_entry(&mut self) -> Option> { - self.inner - .last_entry() - .map(|entry| TopNCacheOccupiedEntry::new(entry, &mut self.kv_heap_size)) - } - - pub fn iter(&self) -> impl Iterator { - self.inner.iter() - } - - pub fn range(&self, range: R) -> Range<'_, CacheKey, CompactedRow> - where - R: RangeBounds, - { - self.inner.range(range) - } - - pub fn extract_if<'a, F1>( - &'a mut self, - mut pred: F1, - ) -> TopNExtractIf<'a, impl FnMut(&CacheKey, &mut CompactedRow) -> bool> - where - F1: 'a + FnMut(&CacheKey, &CompactedRow) -> bool, - { - let pred_immut = move |key: &CacheKey, value: &mut CompactedRow| pred(key, value); - TopNExtractIf { - inner: self.inner.extract_if(pred_immut), - kv_heap_size: &mut self.kv_heap_size, - } - } - - pub fn retain(&mut self, mut f: F) - where - F: FnMut(&CacheKey, &CompactedRow) -> bool, - { - self.extract_if(|k, v| !f(k, v)).for_each(drop); - } -} - -pub struct TopNCacheOccupiedEntry<'a> { - inner: OccupiedEntry<'a, CacheKey, CompactedRow>, - /// The total size of the `TopNCacheState` - kv_heap_size: &'a mut KvSize, -} - -impl<'a> TopNCacheOccupiedEntry<'a> { - pub fn new(entry: OccupiedEntry<'a, CacheKey, CompactedRow>, size: &'a mut KvSize) -> Self { - Self { - inner: entry, - kv_heap_size: size, - } - } - - pub fn remove_entry(self) -> (CacheKey, CompactedRow) { - let (k, v) = self.inner.remove_entry(); - self.kv_heap_size.add(&k, &v); - (k, v) - } - - pub fn key(&self) -> &CacheKey { - self.inner.key() - } -} - -impl fmt::Debug for TopNCacheState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.inner.fmt(f) - } -} - -pub struct TopNExtractIf<'a, F> -where - F: FnMut(&CacheKey, &mut CompactedRow) -> bool, -{ - inner: ExtractIf<'a, CacheKey, CompactedRow, F, Global>, - kv_heap_size: &'a mut KvSize, -} - -impl<'a, F> Iterator for TopNExtractIf<'a, F> -where - F: 'a + FnMut(&CacheKey, &mut CompactedRow) -> bool, -{ - type Item = (CacheKey, CompactedRow); - - fn next(&mut self) -> Option { - self.inner - .next() - .inspect(|(k, v)| self.kv_heap_size.sub(k, v)) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index 18409cf3d6db1..f3adebdc8000b 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -23,7 +23,7 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::ColumnOrder; -use super::CacheKey; +use super::top_n_cache::CacheKey; use crate::executor::prelude::*; pub trait TopNExecutorBase: Send + 'static {