diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh
index 4736f4aa53a8..e62bcedcd580 100755
--- a/ci/scripts/run-e2e-test.sh
+++ b/ci/scripts/run-e2e-test.sh
@@ -128,7 +128,6 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
 pkill java
 
 echo "--- e2e, $mode, embedded udf"
-python3 -m pip install --break-system-packages flask waitress
 sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt'
 sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt'
 sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt'
diff --git a/src/common/estimate_size/src/collections/btreemap.rs b/src/common/estimate_size/src/collections/btreemap.rs
index 84ecd687d8df..f48a78715f69 100644
--- a/src/common/estimate_size/src/collections/btreemap.rs
+++ b/src/common/estimate_size/src/collections/btreemap.rs
@@ -12,11 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::fmt;
 use std::collections::BTreeMap;
 use std::ops::{Bound, RangeInclusive};
 
 use crate::{EstimateSize, KvSize};
 
+#[derive(Clone)]
 pub struct EstimatedBTreeMap<K, V> {
     inner: BTreeMap<K, V>,
     heap_size: KvSize,
@@ -41,40 +43,81 @@ impl<K, V> EstimatedBTreeMap<K, V> {
     pub fn is_empty(&self) -> bool {
         self.inner.is_empty()
     }
-}
 
-impl<K, V> Default for EstimatedBTreeMap<K, V> {
-    fn default() -> Self {
-        Self::new()
+    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
+        self.inner.iter()
+    }
+
+    pub fn range<R>(&self, range: R) -> std::collections::btree_map::Range<'_, K, V>
+    where
+        K: Ord,
+        R: std::ops::RangeBounds<K>,
+    {
+        self.inner.range(range)
+    }
+
+    pub fn values(&self) -> impl Iterator<Item = &V> {
+        self.inner.values()
     }
 }
 
 impl<K, V> EstimatedBTreeMap<K, V>
 where
-    K: EstimateSize + Ord,
-    V: EstimateSize,
+    K: Ord,
 {
     pub fn first_key_value(&self) -> Option<(&K, &V)> {
         self.inner.first_key_value()
     }
 
+    pub fn first_key(&self) -> Option<&K> {
+        self.first_key_value().map(|(k, _)| k)
+    }
+
+    pub fn first_value(&self) -> Option<&V> {
+        self.first_key_value().map(|(_, v)| v)
+    }
+
     pub fn last_key_value(&self) -> Option<(&K, &V)> {
         self.inner.last_key_value()
     }
 
-    pub fn insert(&mut self, key: K, row: V) {
+    pub fn last_key(&self) -> Option<&K> {
+        self.last_key_value().map(|(k, _)| k)
+    }
+
+    pub fn last_value(&self) -> Option<&V> {
+        self.last_key_value().map(|(_, v)| v)
+    }
+}
+
+impl<K, V> Default for EstimatedBTreeMap<K, V> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<K, V> EstimatedBTreeMap<K, V>
+where
+    K: EstimateSize + Ord,
+    V: EstimateSize,
+{
+    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
         let key_size = self.heap_size.add_val(&key);
-        self.heap_size.add_val(&row);
-        if let Some(old_row) = self.inner.insert(key, row) {
+        self.heap_size.add_val(&value);
+        let old_value = self.inner.insert(key, value);
+        if let Some(old_value) = &old_value {
             self.heap_size.sub_size(key_size);
-            self.heap_size.sub_val(&old_row);
+            self.heap_size.sub_val(old_value);
         }
+        old_value
     }
 
-    pub fn remove(&mut self, key: &K) {
-        if let Some(row) = self.inner.remove(key) {
-            self.heap_size.sub(key, &row);
+    pub fn remove(&mut self, key: &K) -> Option<V> {
+        let old_value = self.inner.remove(key);
+        if let Some(old_value) = &old_value {
+            self.heap_size.sub(key, old_value);
         }
+        old_value
     }
 
     pub fn clear(&mut self) {
@@ -82,6 +125,25 @@ where
         self.heap_size.set(0);
     }
 
+    pub fn pop_first(&mut self) -> Option<(K, V)> {
+        let (key, value) = self.inner.pop_first()?;
+        self.heap_size.sub(&key, &value);
+        Some((key, value))
+    }
+
+    pub fn pop_last(&mut self) -> Option<(K, V)> {
+        let (key, value) = self.inner.pop_last()?;
+        self.heap_size.sub(&key, &value);
+        Some((key, value))
+    }
+
+    pub fn last_entry(&mut self) -> Option<OccupiedEntry<'_, K, V>> {
+        self.inner.last_entry().map(|inner| OccupiedEntry {
+            inner,
+            heap_size: &mut self.heap_size,
+        })
+    }
+
     /// Retain the given range of entries in the map, removing others.
     pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap<K, V>, BTreeMap<K, V>)
     where
@@ -114,6 +176,27 @@ where
 
         (left, right)
     }
+
+    pub fn extract_if<'a, F>(
+        &'a mut self,
+        mut pred: F,
+    ) -> ExtractIf<'a, K, V, impl FnMut(&K, &mut V) -> bool>
+    where
+        F: 'a + FnMut(&K, &V) -> bool,
+    {
+        let pred_immut = move |key: &K, value: &mut V| pred(key, value);
+        ExtractIf {
+            inner: self.inner.extract_if(pred_immut),
+            heap_size: &mut self.heap_size,
+        }
+    }
+
+    pub fn retain<F>(&mut self, mut f: F)
+    where
+        F: FnMut(&K, &V) -> bool,
+    {
+        self.extract_if(|k, v| !f(k, v)).for_each(drop);
+    }
 }
 
 impl<K, V> EstimateSize for EstimatedBTreeMap<K, V>
 where
@@ -126,6 +209,64 @@ where
     }
 }
 
+impl<K, V> fmt::Debug for EstimatedBTreeMap<K, V>
+where
+    K: fmt::Debug,
+    V: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.inner.fmt(f)
+    }
+}
+
+pub struct OccupiedEntry<'a, K, V> {
+    inner: std::collections::btree_map::OccupiedEntry<'a, K, V>,
+    heap_size: &'a mut KvSize,
+}
+
+impl<'a, K, V> OccupiedEntry<'a, K, V>
+where
+    K: EstimateSize + Ord,
+    V: EstimateSize,
+{
+    pub fn key(&self) -> &K {
+        self.inner.key()
+    }
+
+    pub fn remove_entry(self) -> (K, V) {
+        let (key, value) = self.inner.remove_entry();
+        self.heap_size.sub(&key, &value);
+        (key, value)
+    }
+}
+
+pub struct ExtractIf<'a, K, V, F>
+where
+    F: FnMut(&K, &mut V) -> bool,
+{
+    inner: std::collections::btree_map::ExtractIf<'a, K, V, F>,
+    heap_size: &'a mut KvSize,
+}
+
+impl<'a, K, V, F> Iterator for ExtractIf<'a, K, V, F>
+where
+    K: EstimateSize,
+    V: EstimateSize,
+    F: FnMut(&K, &mut V) -> bool,
+{
+    type Item = (K, V);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (key, value) = self.inner.next()?;
+        self.heap_size.sub(&key, &value);
+        Some((key, value))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::EstimatedBTreeMap;
diff --git a/src/common/estimate_size/src/lib.rs b/src/common/estimate_size/src/lib.rs
index 32e41fcf87fb..3629bc5c535f 100644
--- a/src/common/estimate_size/src/lib.rs
+++ b/src/common/estimate_size/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![feature(allocator_api)]
 #![feature(btree_cursors)]
+#![feature(btree_extract_if)]
 
 pub mod collections;
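A note for readers on the `KvSize` bookkeeping above: the invariant is that `heap_size` always equals the summed estimated heap size of all live entries, so `insert` must compensate when it replaces an existing key, and every removal path (`remove`, `pop_first`/`pop_last`, `OccupiedEntry::remove_entry`, `ExtractIf::next`) goes through a single subtraction. A minimal self-contained sketch of the same pattern — not the crate's API; it substitutes plain byte lengths for `EstimateSize`/`KvSize`:

```rust
use std::collections::BTreeMap;

/// Toy stand-in for `EstimatedBTreeMap`, tracking the total heap size of
/// entries, assuming a `String`'s heap size is simply its byte length.
struct SizedMap {
    inner: BTreeMap<String, String>,
    heap_size: usize,
}

impl SizedMap {
    /// Mirrors `EstimatedBTreeMap::insert`: add the new entry's size first,
    /// then subtract the double-counted key and the replaced value if the
    /// key already existed.
    fn insert(&mut self, key: String, value: String) -> Option<String> {
        let key_size = key.len();
        self.heap_size += key_size + value.len();
        let old = self.inner.insert(key, value);
        if let Some(old) = &old {
            self.heap_size -= key_size + old.len();
        }
        old
    }

    /// Mirrors `EstimatedBTreeMap::remove`: subtract only when present.
    fn remove(&mut self, key: &str) -> Option<String> {
        let old = self.inner.remove(key);
        if let Some(old) = &old {
            self.heap_size -= key.len() + old.len();
        }
        old
    }
}

fn main() {
    let mut map = SizedMap { inner: BTreeMap::new(), heap_size: 0 };
    map.insert("k1".into(), "value".into()); // counts 2 + 5 bytes
    map.insert("k1".into(), "v".into()); // replacement: net size is 2 + 1
    assert_eq!(map.heap_size, 3);
    map.remove("k1");
    assert_eq!(map.heap_size, 0);
}
```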
diff --git a/src/stream/src/common/cache/mod.rs b/src/stream/src/common/cache/mod.rs
deleted file mode 100644
index 27481541492e..000000000000
--- a/src/stream/src/common/cache/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-mod state_cache;
-mod top_n_cache;
-
-pub use state_cache::*;
-pub use top_n_cache::*;
diff --git a/src/stream/src/common/cache/top_n_cache.rs b/src/stream/src/common/cache/top_n_cache.rs
deleted file mode 100644
index e6e65bacea5f..000000000000
--- a/src/stream/src/common/cache/top_n_cache.rs
+++ /dev/null
@@ -1,191 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::BTreeMap;
-
-use risingwave_common_estimate_size::{EstimateSize, KvSize};
-
-/// Inner top-N cache structure for [`super::TopNStateCache`].
-#[derive(Clone)]
-pub struct TopNCache<K: Ord + EstimateSize, V: EstimateSize> {
-    /// The capacity of the cache.
-    capacity: usize,
-    /// Ordered cache entries.
-    entries: BTreeMap<K, V>,
-    kv_heap_size: KvSize,
-}
-
-impl<K: Ord + EstimateSize, V: EstimateSize> EstimateSize for TopNCache<K, V> {
-    fn estimated_heap_size(&self) -> usize {
-        // TODO: Add btreemap internal size.
-        // https://github.com/risingwavelabs/risingwave/issues/9713
-        self.kv_heap_size.size()
-    }
-}
-
-impl<K: Ord + EstimateSize, V: EstimateSize> TopNCache<K, V> {
-    /// Create a new cache with specified capacity and order requirements.
-    /// To create a cache with unlimited capacity, use `usize::MAX` for `capacity`.
-    pub fn new(capacity: usize) -> Self {
-        Self {
-            capacity,
-            entries: Default::default(),
-            kv_heap_size: KvSize::new(),
-        }
-    }
-
-    /// Get the capacity of the cache.
-    pub fn capacity(&self) -> usize {
-        self.capacity
-    }
-
-    /// Get the number of entries in the cache.
-    #[allow(dead_code)]
-    pub fn len(&self) -> usize {
-        self.entries.len()
-    }
-
-    /// Check if the cache is empty.
-    pub fn is_empty(&self) -> bool {
-        self.entries.is_empty()
-    }
-
-    /// Clear the cache.
-    pub fn clear(&mut self) {
-        self.entries.clear();
-        self.kv_heap_size.set(0);
-    }
-
-    /// Insert an entry into the cache.
-    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
-        let key_size = self.kv_heap_size.add_val(&key);
-        self.kv_heap_size.add_val(&value);
-        let old_val = self.entries.insert(key, value);
-        if let Some(old_val) = &old_val {
-            self.kv_heap_size.sub_size(key_size);
-            self.kv_heap_size.sub_val(old_val);
-        }
-        // evict if capacity is reached
-        while self.entries.len() > self.capacity {
-            if let Some((key, val)) = self.entries.pop_last() {
-                self.kv_heap_size.sub(&key, &val);
-            }
-        }
-        old_val
-    }
-
-    /// Remove an entry from the cache.
-    pub fn remove(&mut self, key: &K) -> Option<V> {
-        let old_val = self.entries.remove(key);
-        if let Some(val) = &old_val {
-            self.kv_heap_size.sub(key, val);
-        }
-        old_val
-    }
-
-    /// Get the first (smallest) key-value pair in the cache.
-    pub fn first_key_value(&self) -> Option<(&K, &V)> {
-        self.entries.first_key_value()
-    }
-
-    /// Get the first (smallest) key in the cache.
-    pub fn first_key(&self) -> Option<&K> {
-        self.first_key_value().map(|(k, _)| k)
-    }
-
-    /// Get the last (largest) key-value pair in the cache.
-    pub fn last_key_value(&self) -> Option<(&K, &V)> {
-        self.entries.last_key_value()
-    }
-
-    /// Get the last (largest) key in the cache.
-    pub fn last_key(&self) -> Option<&K> {
-        self.last_key_value().map(|(k, _)| k)
-    }
-
-    /// Iterate over the values in the cache.
-    pub fn values(&self) -> impl Iterator<Item = &V> {
-        self.entries.values()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use itertools::Itertools;
-
-    use super::*;
-
-    #[test]
-    fn test_top_n_cache() {
-        let mut cache = TopNCache::new(3);
-        assert_eq!(cache.capacity(), 3);
-        assert_eq!(cache.len(), 0);
-        assert!(cache.is_empty());
-        assert!(cache.first_key_value().is_none());
-        assert!(cache.first_key().is_none());
-        assert!(cache.last_key_value().is_none());
-        assert!(cache.last_key().is_none());
-        assert!(cache.values().collect_vec().is_empty());
-
-        let old_val = cache.insert(5, "hello".to_string());
-        assert!(old_val.is_none());
-        assert_eq!(cache.len(), 1);
-        assert!(!cache.is_empty());
-        assert_eq!(cache.values().collect_vec(), vec!["hello"]);
-
-        cache.insert(3, "world".to_string());
-        cache.insert(1, "risingwave!".to_string());
-        assert_eq!(cache.len(), 3);
-        assert_eq!(
-            cache.first_key_value(),
-            Some((&1, &"risingwave!".to_string()))
-        );
-        assert_eq!(cache.first_key(), Some(&1));
-        assert_eq!(cache.last_key_value(), Some((&5, &"hello".to_string())));
-        assert_eq!(cache.last_key(), Some(&5));
-        assert_eq!(
-            cache.values().collect_vec(),
-            vec!["risingwave!", "world", "hello"]
-        );
-
-        cache.insert(0, "foo".to_string());
-        assert_eq!(cache.capacity(), 3);
-        assert_eq!(cache.len(), 3);
-        assert_eq!(cache.first_key(), Some(&0));
-        assert_eq!(cache.last_key(), Some(&3));
-        assert_eq!(
-            cache.values().collect_vec(),
-            vec!["foo", "risingwave!", "world"]
-        );
-
-        let old_val = cache.remove(&0);
-        assert_eq!(old_val, Some("foo".to_string()));
-        assert_eq!(cache.len(), 2);
-        assert_eq!(cache.first_key(), Some(&1));
-        assert_eq!(cache.last_key(), Some(&3));
-        cache.remove(&3);
-        assert_eq!(cache.len(), 1);
-        assert_eq!(cache.first_key(), Some(&1));
-        assert_eq!(cache.last_key(), Some(&1));
-        let old_val = cache.remove(&100); // can remove non-existing key
-        assert!(old_val.is_none());
-        assert_eq!(cache.len(), 1);
-
-        cache.clear();
-        assert_eq!(cache.len(), 0);
-        assert_eq!(cache.capacity(), 3);
-        assert_eq!(cache.first_key(), None);
-        assert_eq!(cache.last_key(), None);
-    }
-}
diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs
index 9d79cecd2ec5..207201c4ff34 100644
--- a/src/stream/src/common/mod.rs
+++ b/src/stream/src/common/mod.rs
@@ -14,10 +14,10 @@
 
 pub use column_mapping::*;
 
-pub mod cache;
 mod column_mapping;
 pub mod compact_chunk;
 pub mod log_store_impl;
 pub mod metrics;
 pub mod rate_limit;
+pub mod state_cache;
 pub mod table;
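The deleted `TopNCache` enforced its capacity by popping the largest keys after each insert; the same rule reappears below in `TopNStateCache::insert`, now layered on `EstimatedBTreeMap`. A minimal sketch of that eviction rule on a plain `BTreeMap` (illustrative only, not the executor's types):

```rust
use std::collections::BTreeMap;

/// Insert, then evict the largest keys until the map is back within
/// `capacity` — mirroring the `while len > capacity { pop_last() }` loop.
fn insert_bounded(
    map: &mut BTreeMap<i32, String>,
    capacity: usize,
    key: i32,
    value: String,
) -> Option<String> {
    let old = map.insert(key, value);
    while map.len() > capacity {
        map.pop_last(); // drop the largest key, keeping the N smallest
    }
    old
}

fn main() {
    let mut m = BTreeMap::new();
    for k in [5, 3, 1, 0] {
        insert_bounded(&mut m, 3, k, format!("row-{k}"));
    }
    // only the three smallest keys survive
    assert_eq!(m.keys().copied().collect::<Vec<_>>(), vec![0, 1, 3]);
}
```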
diff --git a/src/stream/src/common/cache/state_cache/mod.rs b/src/stream/src/common/state_cache/mod.rs
similarity index 99%
rename from src/stream/src/common/cache/state_cache/mod.rs
rename to src/stream/src/common/state_cache/mod.rs
index ae3b1ad3069e..7d547c26b0d8 100644
--- a/src/stream/src/common/cache/state_cache/mod.rs
+++ b/src/stream/src/common/state_cache/mod.rs
@@ -12,14 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub use ordered::*;
 use risingwave_common::array::Op;
 use risingwave_common_estimate_size::EstimateSize;
-pub use top_n::*;
 
 mod ordered;
 mod top_n;
 
+pub use ordered::*;
+pub use top_n::*;
+
 /// A common interface for state table cache.
 pub trait StateCache: EstimateSize {
     type Key: Ord + EstimateSize;
diff --git a/src/stream/src/common/cache/state_cache/ordered.rs b/src/stream/src/common/state_cache/ordered.rs
similarity index 69%
rename from src/stream/src/common/cache/state_cache/ordered.rs
rename to src/stream/src/common/state_cache/ordered.rs
index 535dbcb74e9a..8a47d1b7ecaf 100644
--- a/src/stream/src/common/cache/state_cache/ordered.rs
+++ b/src/stream/src/common/state_cache/ordered.rs
@@ -12,55 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
-
 use risingwave_common::array::Op;
-use risingwave_common_estimate_size::{EstimateSize, KvSize};
+use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
+use risingwave_common_estimate_size::EstimateSize;
 
 use super::{StateCache, StateCacheFiller};
 
-/// An implementation of [`StateCache`] that uses a [`BTreeMap`] as the underlying cache, with no
-/// capacity limit.
+/// An implementation of [`StateCache`] that keeps all entries in an ordered in-memory map.
+#[derive(Clone, EstimateSize)]
 pub struct OrderedStateCache<K: Ord + EstimateSize, V: EstimateSize> {
-    cache: BTreeMap<K, V>,
+    cache: EstimatedBTreeMap<K, V>,
     synced: bool,
-    kv_heap_size: KvSize,
-}
-
-impl<K: Ord + EstimateSize, V: EstimateSize> EstimateSize for OrderedStateCache<K, V> {
-    fn estimated_heap_size(&self) -> usize {
-        // TODO: Add btreemap internal size
-        // https://github.com/risingwavelabs/risingwave/issues/9713
-        self.kv_heap_size.size()
-    }
 }
 
 impl<K: Ord + EstimateSize, V: EstimateSize> OrderedStateCache<K, V> {
     pub fn new() -> Self {
         Self {
-            cache: BTreeMap::new(),
+            cache: Default::default(),
             synced: false,
-            kv_heap_size: KvSize::new(),
-        }
-    }
-
-    fn insert_cache(&mut self, key: K, value: V) -> Option<V> {
-        let key_size = self.kv_heap_size.add_val(&key);
-        self.kv_heap_size.add_val(&value);
-        let old_val = self.cache.insert(key, value);
-        if let Some(old_val) = &old_val {
-            self.kv_heap_size.sub_size(key_size);
-            self.kv_heap_size.sub_val(old_val);
-        }
-        old_val
-    }
-
-    fn delete_cache(&mut self, key: &K) -> Option<V> {
-        let old_val = self.cache.remove(key);
-        if let Some(old_val) = &old_val {
-            self.kv_heap_size.sub(key, old_val);
         }
-        old_val
     }
 }
 
@@ -87,7 +57,7 @@ impl<K: Ord + EstimateSize, V: EstimateSize> StateCache for OrderedStateCache<K, V>
     fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
         if self.synced {
-            self.insert_cache(key, value)
+            self.cache.insert(key, value)
         } else {
             None
         }
@@ -95,7 +65,7 @@ impl<K: Ord + EstimateSize, V: EstimateSize> StateCache for OrderedStateCache<K, V>
     fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
         if self.synced {
-            self.delete_cache(key)
+            self.cache.remove(key)
         } else {
             None
         }
diff --git a/src/stream/src/common/cache/state_cache/top_n.rs b/src/stream/src/common/state_cache/top_n.rs
similarity index 87%
rename from src/stream/src/common/cache/state_cache/top_n.rs
rename to src/stream/src/common/state_cache/top_n.rs
index 4d2423f6c1e1..fef7017d7512 100644
--- a/src/stream/src/common/cache/state_cache/top_n.rs
+++ b/src/stream/src/common/state_cache/top_n.rs
@@ -13,17 +13,17 @@
 // limitations under the License.
 
 use risingwave_common::array::Op;
+use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
 use risingwave_common_estimate_size::EstimateSize;
 
 use super::{StateCache, StateCacheFiller};
-use crate::common::cache::TopNCache;
 
-/// An implementation of [`StateCache`] that uses a [`TopNCache`] as the underlying cache, with
-/// limited capacity.
+/// An implementation of [`StateCache`] that keeps a limited number of entries in an ordered
+/// in-memory map.
 #[derive(Clone, EstimateSize)]
 pub struct TopNStateCache<K: Ord + EstimateSize, V: EstimateSize> {
     table_row_count: Option<usize>,
-    cache: TopNCache<K, V>,
+    cache: EstimatedBTreeMap<K, V>,
+    capacity: usize,
     synced: bool,
 }
 
@@ -31,7 +33,8 @@ impl<K: Ord + EstimateSize, V: EstimateSize> TopNStateCache<K, V> {
     pub fn new(capacity: usize) -> Self {
         Self {
             table_row_count: None,
-            cache: TopNCache::new(capacity),
+            cache: Default::default(),
+            capacity,
             synced: false,
         }
     }
@@ -39,7 +40,8 @@ impl<K: Ord + EstimateSize, V: EstimateSize> TopNStateCache<K, V> {
     pub fn with_table_row_count(capacity: usize, table_row_count: usize) -> Self {
         Self {
             table_row_count: Some(table_row_count),
-            cache: TopNCache::new(capacity),
+            cache: Default::default(),
+            capacity,
             synced: false,
         }
     }
@@ -65,7 +67,12 @@ impl<K: Ord + EstimateSize, V: EstimateSize> TopNStateCache<K, V> {
             || self.cache.is_empty()
             || &key <= self.cache.last_key().unwrap()
         {
-            self.cache.insert(key, value)
+            let old_v = self.cache.insert(key, value);
+            // evict if capacity is reached
+            while self.cache.len() > self.capacity {
+                self.cache.pop_last();
+            }
+            old_v
         } else {
             None
         };
@@ -87,8 +94,8 @@ impl<K: Ord + EstimateSize, V: EstimateSize> TopNStateCache<K, V> {
         old_val
     }
 
-    pub fn capacity_inner(&self) -> usize {
-        self.cache.capacity()
+    pub fn capacity(&self) -> usize {
+        self.capacity
     }
 
     pub fn len(&self) -> usize {
@@ -170,7 +177,7 @@ impl<K: Ord + EstimateSize, V: EstimateSize> StateCacheFiller for &mut TopNStateCache<K, V>
     type Value = V;
 
     fn capacity(&self) -> Option<usize> {
-        Some(self.capacity_inner())
+        Some(TopNStateCache::capacity(self))
     }
 
     fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) {
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 1d1a5e2a6047..e6fbacee009b 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -64,7 +64,7 @@ use tracing::{trace, Instrument};
 
 use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
 use crate::cache::cache_may_stale;
-use crate::common::cache::{StateCache, StateCacheFiller};
+use crate::common::state_cache::{StateCache, StateCacheFiller};
 use crate::common::table::state_table_cache::StateTableWatermarkCache;
 use crate::executor::{StreamExecutorError, StreamExecutorResult};
 
diff --git a/src/stream/src/common/table/state_table_cache.rs b/src/stream/src/common/table/state_table_cache.rs
index 585b51eae060..86c32b404cb7 100644
--- a/src/stream/src/common/table/state_table_cache.rs
+++ b/src/stream/src/common/table/state_table_cache.rs
@@ -17,7 +17,7 @@
 use risingwave_common::row::{OwnedRow, Row, RowExt};
 use risingwave_common::types::{DefaultOrdered, ScalarRefImpl};
 use risingwave_common_estimate_size::EstimateSize;
 
-use crate::common::cache::{StateCache, TopNStateCache};
+use crate::common::state_cache::{StateCache, TopNStateCache};
 
 /// The watermark cache key is just an `OwnedRow` wrapped in `DefaultOrdered`.
 /// This is because we want to use the `DefaultOrdered` implementation of `Ord`.
@@ -125,7 +125,7 @@ impl StateTableWatermarkCache { } pub fn capacity(&self) -> usize { - self.inner.capacity_inner() + self.inner.capacity() } pub fn len(&self) -> usize { @@ -186,7 +186,7 @@ mod tests { use risingwave_common::types::{Scalar, Timestamptz}; use super::*; - use crate::common::cache::StateCacheFiller; + use crate::common::state_cache::StateCacheFiller; /// With capacity 3, test the following sequence of inserts: /// Insert diff --git a/src/stream/src/executor/aggregation/agg_state_cache.rs b/src/stream/src/executor/aggregation/agg_state_cache.rs index 71cd9a45e3f1..8a202ed6dd1d 100644 --- a/src/stream/src/executor/aggregation/agg_state_cache.rs +++ b/src/stream/src/executor/aggregation/agg_state_cache.rs @@ -23,7 +23,7 @@ use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; -use crate::common::cache::{StateCache, StateCacheFiller}; +use crate::common::state_cache::{StateCache, StateCacheFiller}; /// Cache key type. type CacheKey = MemcmpEncoded; diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index c8bae26f6e1b..db3364574d5d 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -31,7 +31,7 @@ use risingwave_storage::StateStore; use super::agg_state_cache::{AggStateCache, GenericAggStateCache}; use super::GroupKey; -use crate::common::cache::{OrderedStateCache, TopNStateCache}; +use crate::common::state_cache::{OrderedStateCache, TopNStateCache}; use crate::common::table::state_table::StateTable; use crate::common::StateTableColumnMapping; use crate::executor::{PkIndices, StreamExecutorResult}; diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index fc4f921e7ccb..c2dfcea07684 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -34,7 +34,7 @@ use risingwave_storage::table::KeyedRow; use risingwave_storage::StateStore; use super::{StreamExecutorError, StreamExecutorResult}; -use crate::common::cache::{StateCache, StateCacheFiller, TopNStateCache}; +use crate::common::state_cache::{StateCache, StateCacheFiller, TopNStateCache}; use crate::common::table::state_table::StateTable; type CacheKey = ( diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 9457e2f7729c..c061efd36b75 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap;
 use std::ops::{Deref, DerefMut};
 
 use risingwave_common::array::Op;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::hash::HashKey;
-use risingwave_common::row::RowExt;
+use risingwave_common::row::{RowDeserializer, RowExt};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_common::util::sort_util::ColumnOrder;
@@ -157,14 +158,18 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
 where
     TopNCache<WITH_TIES>: TopNCacheTrait,
 {
-    async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
-        let mut res_ops = Vec::with_capacity(self.limit);
-        let mut res_rows = Vec::with_capacity(self.limit);
+    async fn apply_chunk(
+        &mut self,
+        chunk: StreamChunk,
+    ) -> StreamExecutorResult<Option<StreamChunk>> {
         let keys = K::build_many(&self.group_by, chunk.data_chunk());
+        let mut stagings = HashMap::new(); // K -> `TopNStaging`
+
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
             let Some((op, row_ref)) = r else {
                 continue;
             };
+
             // The pk without group by
             let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
             let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
@@ -184,12 +189,13 @@ where
             }
 
             let mut cache = self.caches.get_mut(group_cache_key).unwrap();
+            let staging = stagings.entry(group_cache_key.clone()).or_default();
 
             // apply the chunk to state table
             match op {
                 Op::Insert | Op::UpdateInsert => {
                     self.managed_state.insert(row_ref);
-                    cache.insert(cache_key, row_ref, &mut res_ops, &mut res_rows);
+                    cache.insert(cache_key, row_ref, staging);
                 }
 
                 Op::Delete | Op::UpdateDelete => {
@@ -200,17 +206,27 @@ where
                             &mut self.managed_state,
                             cache_key,
                             row_ref,
-                            &mut res_ops,
-                            &mut res_rows,
+                            staging,
                         )
                         .await?;
                 }
             }
         }
+
         self.metrics
             .group_top_n_cached_entry_count
             .set(self.caches.len() as i64);
-        generate_output(res_rows, res_ops, &self.schema)
+
+        let data_types = self.schema.data_types();
+        let deserializer = RowDeserializer::new(data_types.clone());
+        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
+        for staging in stagings.into_values() {
+            for res in staging.into_deserialized_changes(&deserializer) {
+                let (op, row) = res?;
+                let _none = chunk_builder.append_row(op, row);
+            }
+        }
+        Ok(chunk_builder.take())
     }
 
     async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
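The key change in `apply_chunk` above: instead of pushing `Op`s and rows into two flat vectors while scanning the chunk, the executor buffers changes per group in a `HashMap` of `TopNStaging`s and drains them into a single `StreamChunkBuilder` at the end. A toy sketch of that collect-then-drain shape, with hypothetical stand-in types (`Staging` is not the real `TopNStaging`):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `TopNStaging`: the buffered changes of one group.
#[derive(Default)]
struct Staging {
    changes: Vec<(char, i64)>, // ('+' for insert, '-' for delete)
}

fn main() {
    // one staging per group key, created lazily, like `stagings.entry(..).or_default()`
    let mut stagings: HashMap<&str, Staging> = HashMap::new();
    for (group, op, row) in [("a", '+', 1), ("b", '+', 2), ("a", '-', 1)] {
        stagings.entry(group).or_default().changes.push((op, row));
    }

    // once the whole input chunk is applied, drain every staging into one
    // output batch (the real code appends to a `StreamChunkBuilder`)
    let mut output = Vec::new();
    for staging in stagings.into_values() {
        output.extend(staging.changes);
    }
    assert_eq!(output.len(), 3);
}
```

Grouping first also lets a staging compact opposing changes on the same key before anything is emitted, which is what the new `test_compact_changes` case below exercises.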
@@ -252,7 +268,6 @@ where
 mod tests {
     use std::sync::atomic::AtomicU64;
 
-    use assert_matches::assert_matches;
     use risingwave_common::array::stream_chunk::StreamChunkTestExt;
     use risingwave_common::catalog::Field;
     use risingwave_common::hash::SerializedKey;
@@ -262,7 +277,7 @@ mod tests {
 
     use super::*;
     use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
-    use crate::executor::test_utils::MockSource;
+    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
 
     fn create_schema() -> Schema {
         Schema {
@@ -359,7 +374,7 @@ mod tests {
         )
         .await;
         let schema = source.schema().clone();
-        let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
+        let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
             source,
             ActorContext::for_test(0),
             schema,
@@ -371,14 +386,13 @@ mod tests {
             Arc::new(AtomicU64::new(0)),
         )
         .unwrap();
-        let mut top_n_executor = top_n_executor.boxed().execute();
+        let mut top_n = top_n.boxed().execute();
 
         // consume the init barrier
-        top_n_executor.next().await.unwrap().unwrap();
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 10 9 1
                 + 8 8 2
                 + 8 1 3
                 + 9 1 1
                 + 10 1 1
                 ",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 10 9 1
                 - 8 8 2
                 - 10 1 1
                 + 8 1 3
                 ",
-            ),
+            )
+            .sort_rows(),
        );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 7 8 2
                 - 8 1 3
                 - 9 1 1
                 ",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 5 1 1
                 + 2 1 1
                 ",
-            ),
+            )
+            .sort_rows(),
         );
     }
 
@@ -455,7 +461,7 @@ mod tests {
         )
         .await;
         let schema = source.schema().clone();
-        let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
+        let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
             source,
             ActorContext::for_test(0),
             schema,
@@ -467,66 +473,57 @@ mod tests {
             Arc::new(AtomicU64::new(0)),
         )
         .unwrap();
-        let mut top_n_executor = top_n_executor.boxed().execute();
+        let mut top_n = top_n.boxed().execute();
 
         // consume the init barrier
-        top_n_executor.next().await.unwrap().unwrap();
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 8 8 2
                 + 10 1 1
                 + 8 1 3
                 ",
-            ),
+            )
+            .sort_rows(),
        );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 8 8 2
                 - 10 1 1
                 ",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 8 1 3",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 5 1 1
                 + 3 1 2
                 ",
-            ),
+            )
+            .sort_rows(),
         );
     }
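Because the final chunk is now assembled from a `HashMap` of per-group stagings, the row order across groups is not deterministic, which is why every expectation above and below compares through `sort_rows()`. A tiny sketch of such an order-insensitive comparison on plain tuples (illustrative; not the real `StreamChunkTestExt`):

```rust
/// Sort rows before comparing, so the assertion checks only the multiset of
/// changes, not the emission order.
fn sort_rows(mut rows: Vec<(char, i64, i64)>) -> Vec<(char, i64, i64)> {
    rows.sort_unstable();
    rows
}

fn main() {
    let expected = vec![('+', 10, 9), ('+', 8, 8)];
    let actual = vec![('+', 8, 8), ('+', 10, 9)]; // same changes, other order
    assert_eq!(sort_rows(actual), sort_rows(expected));
}
```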
@@ -544,7 +541,7 @@ mod tests {
         )
         .await;
         let schema = source.schema().clone();
-        let top_n_executor = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
+        let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
             source,
             ActorContext::for_test(0),
             schema,
@@ -556,14 +553,13 @@ mod tests {
             Arc::new(AtomicU64::new(0)),
         )
         .unwrap();
-        let mut top_n_executor = top_n_executor.boxed().execute();
+        let mut top_n = top_n.boxed().execute();
 
         // consume the init barrier
-        top_n_executor.next().await.unwrap().unwrap();
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 10 9 1
                 + 8 8 2
@@ -571,56 +567,148 @@ mod tests {
                 + 9 1 1
                 + 10 1 1
                 + 8 1 3",
-            ),
+            )
+            .sort_rows(),
        );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 10 9 1
                 - 8 8 2
                 - 10 1 1",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 - 7 8 2
                 - 8 1 3
                 - 9 1 1",
-            ),
+            )
+            .sort_rows(),
         );
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            res.as_chunk().unwrap(),
-            &StreamChunk::from_pretty(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
                 " I I I
                 + 5 1 1
                 + 2 1 1
                 + 3 1 2
                 + 4 1 3",
-            ),
+            )
+            .sort_rows(),
         );
     }
+
+    #[tokio::test]
+    async fn test_compact_changes() {
+        let schema = create_schema();
+        let source = MockSource::with_messages(vec![
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
+            Message::Chunk(StreamChunk::from_pretty(
+                " I I I
+                + 0 0 9
+                + 0 0 8
+                + 0 0 7
+                + 0 0 6
+                + 0 1 15
+                + 0 1 14",
+            )),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
+            Message::Chunk(StreamChunk::from_pretty(
+                " I I I
+                - 0 0 6
+                - 0 0 8
+                + 0 0 4
+                + 0 0 3
+                + 0 1 12
+                + 0 2 26
+                - 0 1 12
+                + 0 1 11",
+            )),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
+            Message::Chunk(StreamChunk::from_pretty(
+                " I I I
+                + 0 0 11", // this should result in no chunk output
+            )),
+            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
+        ])
+        .into_executor(schema.clone(), vec![2]);
+
+        let state_table = create_in_memory_state_table(
+            &schema.data_types(),
+            &[
+                OrderType::ascending(),
+                OrderType::ascending(),
+                OrderType::ascending(),
+            ],
+            &[0, 1, 2], // table pk = group key (0, 1) + order key (2) + additional pk (empty)
+        )
+        .await;
+
+        let top_n = GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
+            source,
+            ActorContext::for_test(0),
+            schema,
+            vec![
+                ColumnOrder::new(0, OrderType::ascending()),
+                ColumnOrder::new(1, OrderType::ascending()),
+                ColumnOrder::new(2, OrderType::ascending()),
+            ],
+            (0, 2), // (offset, limit)
+            vec![ColumnOrder::new(2, OrderType::ascending())],
+            vec![0, 1],
+            state_table,
+            Arc::new(AtomicU64::new(0)),
+        )
+        .unwrap();
+        let mut top_n = top_n.boxed().execute();
+
+        // initial barrier
+        top_n.expect_barrier().await;
+
+        assert_eq!(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
+                " I I I
+                + 0 0 7
+                + 0 0 6
+                + 0 1 15
+                + 0 1 14",
+            )
+            .sort_rows(),
+        );
+
+        top_n.expect_barrier().await;
+
+        assert_eq!(
+            top_n.expect_chunk().await.sort_rows(),
+            StreamChunk::from_pretty(
+                " I I I
+                - 0 0 6
+                - 0 0 7
+                + 0 0 4
+                + 0 0 3
+                - 0 1 15
+                + 0 1 11
+                + 0 2 26",
+            )
+            .sort_rows(),
+        );
+        top_n.expect_barrier().await;
+
+        // no output chunk for the last input chunk
+        top_n.expect_barrier().await;
+    }
 }
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index b8e409448662..f6af3b667492 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use risingwave_common::array::Op;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::hash::HashKey;
@@ -137,17 +139,20 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
 where
     TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
 {
-    async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
-        let mut res_ops = Vec::with_capacity(self.limit);
-        let mut res_rows = Vec::with_capacity(self.limit);
+    async fn apply_chunk(
+        &mut self,
+        chunk: StreamChunk,
+    ) -> StreamExecutorResult<Option<StreamChunk>> {
         let keys = K::build_many(&self.group_by, chunk.data_chunk());
+        let mut stagings = HashMap::new(); // K -> `TopNStaging`
 
         let data_types = self.schema.data_types();
-        let row_deserializer = RowDeserializer::new(data_types.clone());
+        let deserializer = RowDeserializer::new(data_types.clone());
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
             let Some((op, row_ref)) = r else {
                 continue;
             };
+
             // The pk without group by
             let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
             let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
@@ -164,22 +169,33 @@ where
                     .await?;
                 self.caches.push(group_cache_key.clone(), topn_cache);
             }
+
             let mut cache = self.caches.get_mut(group_cache_key).unwrap();
+            let staging = stagings.entry(group_cache_key.clone()).or_default();
 
             debug_assert_eq!(op, Op::Insert);
             cache.insert(
                 cache_key,
                 row_ref,
-                &mut res_ops,
-                &mut res_rows,
+                staging,
                 &mut self.managed_state,
-                &row_deserializer,
+                &deserializer,
             )?;
         }
+
         self.metrics
             .group_top_n_cached_entry_count
             .set(self.caches.len() as i64);
-        generate_output(res_rows, res_ops, &self.schema)
+
+        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
+        for staging in stagings.into_values() {
+            for res in staging.into_deserialized_changes(&deserializer) {
+                let (op, row) = res?;
+                let _none = chunk_builder.append_row(op, row);
+            }
+        }
+
+        Ok(chunk_builder.take())
     }
 
     async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
diff --git a/src/stream/src/executor/top_n/mod.rs b/src/stream/src/executor/top_n/mod.rs
index b33f5f4883ca..2ad3c0ed7330 100644
--- a/src/stream/src/executor/top_n/mod.rs
+++ b/src/stream/src/executor/top_n/mod.rs
@@ -19,9 +19,7 @@ use utils::*;
 mod top_n_cache;
 mod top_n_state;
 use top_n_cache::{TopNCache, TopNCacheTrait};
-mod topn_cache_state;
 use top_n_state::ManagedTopNState;
-use topn_cache_state::CacheKey;
 
 // `TopN` variants
 mod group_top_n;
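Both group variants now return `StreamExecutorResult<Option<StreamChunk>>`: an input chunk whose staged changes cancel out yields `None` rather than an empty chunk. A small sketch of that contract, assuming a builder whose `take()` returns `None` when nothing was appended (as `StreamChunkBuilder::take` does):

```rust
/// Minimal stand-in for a chunk builder whose `take` yields `None` when empty.
struct Builder(Vec<i64>);

impl Builder {
    fn append(&mut self, row: i64) {
        self.0.push(row);
    }

    fn take(self) -> Option<Vec<i64>> {
        if self.0.is_empty() { None } else { Some(self.0) }
    }
}

fn apply_chunk(rows: &[i64]) -> Option<Vec<i64>> {
    let mut builder = Builder(Vec::new());
    for &row in rows.iter().filter(|&&r| r > 0) {
        builder.append(row);
    }
    builder.take() // `None` tells the caller to emit no chunk downstream
}

fn main() {
    assert_eq!(apply_chunk(&[1, -2, 3]), Some(vec![1, 3]));
    assert_eq!(apply_chunk(&[-1]), None); // the "no chunk output" case above
}
```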
diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs
index 1b45c82c9c83..be7610175645 100644
--- a/src/stream/src/executor/top_n/top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -17,7 +17,7 @@ use risingwave_common::row::{RowDeserializer, RowExt};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::sort_util::ColumnOrder;
 
-use super::top_n_cache::AppendOnlyTopNCacheTrait;
+use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging};
 use super::utils::*;
 use super::{ManagedTopNState, TopNCache};
 use crate::executor::prelude::*;
@@ -104,11 +104,13 @@ impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
 where
     TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
 {
-    async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult<StreamChunk> {
-        let mut res_ops = Vec::with_capacity(self.cache.limit);
-        let mut res_rows = Vec::with_capacity(self.cache.limit);
+    async fn apply_chunk(
+        &mut self,
+        chunk: StreamChunk,
+    ) -> StreamExecutorResult<Option<StreamChunk>> {
+        let mut staging = TopNStaging::new();
         let data_types = self.schema.data_types();
-        let row_deserializer = RowDeserializer::new(data_types);
+        let deserializer = RowDeserializer::new(data_types.clone());
         // apply the chunk to state table
         for (op, row_ref) in chunk.rows() {
             debug_assert_eq!(op, Op::Insert);
@@ -117,14 +119,21 @@ where
             self.cache.insert(
                 cache_key,
                 row_ref,
-                &mut res_ops,
-                &mut res_rows,
+                &mut staging,
                 &mut self.managed_state,
-                &row_deserializer,
+                &deserializer,
             )?;
         }
 
-        generate_output(res_rows, res_ops, &self.schema)
+        if staging.is_empty() {
+            return Ok(None);
+        }
+        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
+        for res in staging.into_deserialized_changes(&deserializer) {
+            let (op, row) = res?;
+            let _none = chunk_builder.append_row(op, row);
+        }
+        Ok(chunk_builder.take())
     }
 
     async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
@@ -151,8 +160,6 @@ where
 
 #[cfg(test)]
 mod tests {
-    use assert_matches::assert_matches;
-    use futures::StreamExt;
     use risingwave_common::array::stream_chunk::StreamChunkTestExt;
     use risingwave_common::array::StreamChunk;
     use risingwave_common::catalog::{Field, Schema};
@@ -162,7 +169,7 @@ mod tests {
 
     use super::AppendOnlyTopNExecutor;
     use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
-    use crate::executor::test_utils::MockSource;
+    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
     use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, PkIndices};
 
     fn create_stream_chunks() -> Vec<StreamChunk> {
@@ -241,7 +248,7 @@ mod tests {
         .await;
 
         let schema = source.schema().clone();
-        let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new(
+        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
             source,
             ActorContext::for_test(0),
             schema,
@@ -251,54 +258,43 @@ mod tests {
             state_table,
         )
         .unwrap();
-        let mut top_n_executor = top_n_executor.boxed().execute();
+        let mut top_n = top_n.boxed().execute();
 
         // consume the init epoch
-        top_n_executor.next().await.unwrap().unwrap();
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 + 1 0
                 + 2 1
                 + 3 2
-                + 10 3
                 + 9 4
-                - 10 3
                 + 8 5"
             )
+            .sort_rows(),
         );
         // We added (1, 2, 3, 10, 9, 8).
         // Now (1, 2, 3, 8, 9)
 
         // Barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 - 9 4
-                + 7 6
                 - 8 5
                 + 3 7
-                - 7 6
                 + 1 8"
            )
+            .sort_rows(),
        );
         // We added (7, 3, 1, 9).
         // Now (1, 1, 2, 3, 3)
 
         // Barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 - 3 7
@@ -306,6 +302,7 @@ mod tests {
                 - 3 2
                 + 1 13"
             )
+            .sort_rows(),
         );
         // We added (1, 1, 2, 3).
         // Now (1, 1, 1, 1, 2)
@@ -322,7 +319,7 @@ mod tests {
         .await;
 
         let schema = source.schema().clone();
-        let top_n_executor = AppendOnlyTopNExecutor::<_, false>::new(
+        let top_n = AppendOnlyTopNExecutor::<_, false>::new(
             source,
             ActorContext::for_test(0),
             schema,
@@ -332,30 +329,26 @@ mod tests {
             state_table,
         )
         .unwrap();
-        let mut top_n_executor = top_n_executor.boxed().execute();
+        let mut top_n = top_n.boxed().execute();
 
         // consume the init epoch
-        top_n_executor.next().await.unwrap().unwrap();
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 + 10 3
                 + 9 4
                 + 8 5"
             )
+            .sort_rows(),
        );
         // We added (1, 2, 3, 10, 9, 8).
         // Now (1, 2, 3) -> (8, 9, 10)
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 + 7 6
-                + 8 5
-                - 9 4
                 + 3 2"
             )
+            .sort_rows(),
         );
         // We added (7, 3, 1, 9).
         // Now (1, 1, 2) -> (3, 3, 7, 8)
 
         // barrier
-        assert_matches!(
-            top_n_executor.next().await.unwrap().unwrap(),
-            Message::Barrier(_)
-        );
-        let res = top_n_executor.next().await.unwrap().unwrap();
+        top_n.expect_barrier().await;
         assert_eq!(
-            *res.as_chunk().unwrap(),
+            top_n.expect_chunk().await.sort_rows(),
             StreamChunk::from_pretty(
                 " I I
                 - 8 5
@@ -384,6 +374,7 @@ mod tests {
                 - 3 7
                 + 2 14"
             )
+            .sort_rows(),
         );
         // We added (1, 1, 2, 3).
         // Now (1, 1, 1) -> (1, 2, 2, 3)
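The new `CacheKey` alias in `top_n_cache.rs` below relies on lexicographic tuple ordering: entries sort by the memcmp-encoded order key first, and the remaining pk bytes only break ties. This is also why the `WITH_TIES` code can collect all ties of the last row with a `range((sort_key, vec![])..)` scan. A toy illustration (the real keys come from memcmp encoding, not this one-byte scheme):

```rust
/// `CacheKey` = (memcmp-encoded order key, memcmp-encoded remaining pk).
type CacheKey = (Vec<u8>, Vec<u8>);

fn key(order_by: u8, pk: u8) -> CacheKey {
    (vec![order_by], vec![pk])
}

fn main() {
    let mut keys = vec![key(2, 0), key(1, 9), key(1, 3)];
    keys.sort();
    // the order key dominates; pk 3 < 9 breaks the tie between the two 1s
    assert_eq!(keys, vec![key(1, 3), key(1, 9), key(2, 0)]);

    // all ties of order key 1 form one contiguous range starting at (1, "")
    let ties = keys.iter().filter(|k| k.0 == vec![1u8]).count();
    assert_eq!(ties, 2);
}
```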
diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs
index 3ce58e4e6017..c7536bf98bbb 100644
--- a/src/stream/src/executor/top_n/top_n_cache.rs
+++ b/src/stream/src/executor/top_n/top_n_cache.rs
@@ -13,21 +13,26 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
+use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::future::Future;
 
 use itertools::Itertools;
 use risingwave_common::array::{Op, RowRef};
-use risingwave_common::row::{CompactedRow, Row, RowDeserializer, RowExt};
+use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
 use risingwave_common::types::DataType;
+use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_storage::StateStore;
 
-use super::topn_cache_state::TopNCacheState;
-use super::{CacheKey, GroupKey, ManagedTopNState};
+use super::{GroupKey, ManagedTopNState};
 use crate::consistency::{consistency_error, enable_strict_consistency};
 use crate::executor::error::StreamExecutorResult;
 
+/// `CacheKey` is composed of `(order_by, remaining columns of pk)`.
+pub type CacheKey = (Vec<u8>, Vec<u8>);
+pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;
+
 const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
 const TOPN_CACHE_MIN_CAPACITY: usize = 10;
 
@@ -43,19 +48,32 @@ const TOPN_CACHE_MIN_CAPACITY: usize = 10;
 /// `OFFSET m FETCH FIRST n ROWS WITH TIES` and `m <= RANK() <= n` are not supported now,
 /// since they have different semantics.
 pub struct TopNCache<const WITH_TIES: bool> {
-    /// Rows in the range `[0, offset)`
-    pub low: TopNCacheState,
-    /// Rows in the range `[offset, offset+limit)`
+    /// Rows in the range `[0, offset)`. Should always be synced with the state table.
+    pub low: Option<Cache>,
+
+    /// Rows in the range `[offset, offset+limit)`. Should always be synced with the state table.
     ///
     /// When `WITH_TIES` is true, it also stores ties for the last element,
     /// and thus the size can be larger than `limit`.
-    pub middle: TopNCacheState,
-    /// Rows in the range `[offset+limit, offset+limit+high_capacity)`
+    pub middle: Cache,
+
+    /// Cache of the beginning rows in the range `[offset+limit, ...)`.
     ///
-    /// When `WITH_TIES` is true, it also stores ties for the last element,
-    /// and thus the size can be larger than `high_capacity`.
-    pub high: TopNCacheState,
-    pub high_capacity: usize,
+    /// This is very similar to [`TopNStateCache`], which only caches the top-N rows in the table
+    /// and only accepts new records that are less than the largest in the cache.
+    ///
+    /// When `WITH_TIES` is true, it guarantees that the ties of the last element are in the cache,
+    /// and thus the size can be larger than `high_cache_capacity`.
+    ///
+    /// When the cache becomes empty, if the `table_row_count` is not matched, we need to view the
+    /// cache as unsynced and refill it from the state table.
+    ///
+    /// TODO(rc): later we should reuse [`TopNStateCache`] here.
+    ///
+    /// [`TopNStateCache`]: crate::common::state_cache::TopNStateCache
+    pub high: Cache,
+    pub high_cache_capacity: usize,
+
     pub offset: usize,
     /// Assumption: `limit != 0`
     pub limit: usize,
@@ -83,13 +101,13 @@ impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "TopNCache {{\n  offset: {}, limit: {}, high_capacity: {},\n",
-            self.offset, self.limit, self.high_capacity
+            "TopNCache {{\n  offset: {}, limit: {}, high_cache_capacity: {},\n",
+            self.offset, self.limit, self.high_cache_capacity
         )?;
 
         fn format_cache(
             f: &mut std::fmt::Formatter<'_>,
-            cache: &TopNCacheState,
+            cache: &Cache,
             data_types: &[DataType],
         ) -> std::fmt::Result {
             if cache.is_empty() {
@@ -109,7 +127,11 @@ impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
         }
 
         writeln!(f, "  low:")?;
-        format_cache(f, &self.low, &self.data_types)?;
+        if let Some(low) = &self.low {
+            format_cache(f, low, &self.data_types)?;
+        } else {
+            writeln!(f, "    <none>")?;
+        }
         writeln!(f, "\n  middle:")?;
         format_cache(f, &self.middle, &self.data_types)?;
         writeln!(f, "\n  high:")?;
@@ -128,53 +150,45 @@ impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
 pub trait TopNCacheTrait {
     /// Insert input row to corresponding cache range according to its order key.
     ///
-    /// Changes in `self.middle` is recorded to `res_ops` and `res_rows`, which will be
-    /// used to generate messages to be sent to downstream operators.
-    #[allow(clippy::too_many_arguments)]
-    fn insert(
-        &mut self,
-        cache_key: CacheKey,
-        row: impl Row + Send,
-        res_ops: &mut Vec<Op>,
-        res_rows: &mut Vec<CompactedRow>,
-    );
+    /// Changes in `self.middle` are recorded to `staging`, which will be used to generate
+    /// messages to be sent to downstream operators.
+    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging);
 
     /// Delete input row from the cache.
     ///
-    /// Changes in `self.middle` is recorded to `res_ops` and `res_rows`, which will be
-    /// used to generate messages to be sent to downstream operators.
+    /// Changes in `self.middle` are recorded to `staging`, which will be used to generate
+    /// messages to be sent to downstream operators.
     ///
-    /// Because we may need to add data from the state table to `self.high` during the delete
+    /// Because we may need to refill data from the state table to `self.high` during the delete
     /// operation, we need to pass in `group_key`, `epoch` and `managed_state` to do a prefix
     /// scan of the state table.
-    #[allow(clippy::too_many_arguments)]
     fn delete<S: StateStore>(
         &mut self,
         group_key: Option<impl GroupKey>,
         managed_state: &mut ManagedTopNState<S>,
         cache_key: CacheKey,
         row: impl Row + Send,
-        res_ops: &mut Vec<Op>,
-        res_rows: &mut Vec<CompactedRow>,
+        staging: &mut TopNStaging,
     ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
 }
 
 impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
     /// `data_types` -- Data types for the full row.
     pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
-        assert!(limit != 0);
+        assert!(limit > 0);
         if WITH_TIES {
             // It's trickier to support.
             // Also `OFFSET WITH TIES` has different semantic with `a < RANK() < b`
             assert!(offset == 0, "OFFSET is not supported with WITH TIES");
         }
+        let high_cache_capacity = offset
+            .checked_add(limit)
+            .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
+            .unwrap_or(usize::MAX)
+            .max(TOPN_CACHE_MIN_CAPACITY);
         Self {
-            low: TopNCacheState::new(),
-            middle: TopNCacheState::new(),
-            high: TopNCacheState::new(),
-            high_capacity: offset
-                .checked_add(limit)
-                .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
-                .unwrap_or(usize::MAX)
-                .max(TOPN_CACHE_MIN_CAPACITY),
+            low: if offset > 0 { Some(Cache::new()) } else { None },
+            middle: Cache::new(),
+            high: Cache::new(),
+            high_cache_capacity,
             offset,
             limit,
             table_row_count: None,
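As an aside for reviewers, the capacity rule in `TopNCache::new` above, pulled out as a standalone function: the high cache holds up to twice the window size, never less than the minimum, and saturates instead of overflowing.

```rust
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

/// Same arithmetic as in `TopNCache::new`.
fn high_cache_capacity(offset: usize, limit: usize) -> usize {
    offset
        .checked_add(limit)
        .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
        .unwrap_or(usize::MAX)
        .max(TOPN_CACHE_MIN_CAPACITY)
}

fn main() {
    assert_eq!(high_cache_capacity(0, 3), 10); // clamped up to the minimum
    assert_eq!(high_cache_capacity(10, 100), 220); // (10 + 100) * 2
    assert_eq!(high_cache_capacity(usize::MAX, 1), usize::MAX); // saturates
}
```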
@@ -185,37 +199,35 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
     /// Clear the cache. After this, the cache must be `init` again before use.
     #[allow(dead_code)]
     pub fn clear(&mut self) {
-        self.low.clear();
+        self.low.as_mut().map(Cache::clear);
         self.middle.clear();
         self.high.clear();
     }
 
     /// Get total count of entries in the cache.
     pub fn len(&self) -> usize {
-        self.low.len() + self.middle.len() + self.high.len()
+        self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
     }
 
     pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
         self.table_row_count = Some(table_row_count)
     }
 
-    fn table_row_count_matched(&self) -> bool {
-        self.table_row_count
-            .map(|n| n == self.len())
-            .unwrap_or(false)
-    }
-
-    pub fn is_low_cache_full(&self) -> bool {
-        assert!(self.low.len() <= self.offset);
-        let full = self.low.len() == self.offset;
-        if !full {
-            assert!(self.middle.is_empty());
-            assert!(self.high.is_empty());
+    pub fn low_is_full(&self) -> bool {
+        if let Some(low) = &self.low {
+            assert!(low.len() <= self.offset);
+            let full = low.len() == self.offset;
+            if !full {
+                assert!(self.middle.is_empty());
+                assert!(self.high.is_empty());
+            }
+            full
+        } else {
+            true
         }
-        full
     }
 
-    pub fn is_middle_cache_full(&self) -> bool {
+    pub fn middle_is_full(&self) -> bool {
         // For WITH_TIES, the middle cache can exceed the capacity.
         if !WITH_TIES {
             assert!(
@@ -225,7 +237,7 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         }
         let full = self.middle.len() >= self.limit;
         if full {
-            assert!(self.is_low_cache_full());
+            assert!(self.low_is_full());
         } else {
             assert!(
                 self.high.is_empty(),
@@ -235,121 +247,123 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         full
     }
 
-    pub fn is_high_cache_full(&self) -> bool {
+    pub fn high_is_full(&self) -> bool {
         // For WITH_TIES, the high cache can exceed the capacity.
         if !WITH_TIES {
-            assert!(self.high.len() <= self.high_capacity);
+            assert!(self.high.len() <= self.high_cache_capacity);
         }
-        self.high.len() >= self.high_capacity
-    }
-
-    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
-        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
-        middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k))
+        self.high.len() >= self.high_cache_capacity
     }
 
-    /// Use this method instead of `self.high.insert` directly when possible.
-    ///
-    /// It only inserts into high cache if the key is smaller than the largest key in the high
-    /// cache. Otherwise, we simply ignore the row. We will wait until the high cache becomes
-    /// empty and fill it at that time.
-    fn insert_high_cache(&mut self, cache_key: CacheKey, row: CompactedRow, is_from_middle: bool) {
-        if !self.is_high_cache_full() {
-            if is_from_middle {
-                self.high.insert(cache_key, row);
-                return;
-            }
-            // For direct insert, we need to check if the key is smaller than the largest key
-            if let Some(high_last) = self.high.last_key_value()
-                && cache_key <= *high_last.0
-            {
-                debug_assert!(cache_key != *high_last.0, "cache_key should be unique");
-                self.high.insert(cache_key, row);
-            }
+    fn high_is_synced(&self) -> bool {
+        if !self.high.is_empty() {
+            true
         } else {
-            let high_last = self.high.last_entry().unwrap();
-            if cache_key <= *high_last.key() {
-                debug_assert!(cache_key != *high_last.key(), "cache_key should be unique");
-                high_last.remove_entry();
-                self.high.insert(cache_key, row);
-            }
+            // check if table row count matches
+            self.table_row_count
+                .map(|n| n == self.len())
+                .unwrap_or(false)
         }
     }
+
+    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
+        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
+        middle_last_key.or_else(|| {
+            self.low
+                .as_ref()
+                .and_then(Cache::last_key_value)
+                .map(|(k, _)| k)
+        })
+    }
 }
 
 impl TopNCacheTrait for TopNCache<false> {
-    fn insert(
-        &mut self,
-        cache_key: CacheKey,
-        row: impl Row + Send,
-        res_ops: &mut Vec<Op>,
-        res_rows: &mut Vec<CompactedRow>,
-    ) {
+    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
         if let Some(row_count) = self.table_row_count.as_mut() {
             *row_count += 1;
         }
 
-        if !self.is_low_cache_full() {
-            self.low.insert(cache_key, (&row).into());
-            return;
+        let mut to_insert = (cache_key, (&row).into());
+        let mut is_last_of_lower_cache = false; // for saving one key comparison
+
+        let low_is_full = self.low_is_full();
+        if let Some(low) = &mut self.low {
+            // try insert into low cache
+
+            if !low_is_full {
+                low.insert(to_insert.0, to_insert.1);
+                return;
+            }
+
+            // low cache is full
+            let low_last = low.last_entry().unwrap();
+            if &to_insert.0 < low_last.key() {
+                // make space for the new entry
+                let low_last = low_last.remove_entry();
+                low.insert(to_insert.0, to_insert.1);
+                to_insert = low_last; // move the last entry to the middle cache
+                is_last_of_lower_cache = true;
+            }
         }
-        let elem_to_compare_with_middle = if let Some(low_last) = self.low.last_entry()
-            && cache_key <= *low_last.key()
-        {
-            // Take the last element of `cache.low` and insert input row to it.
-            let low_last = low_last.remove_entry();
-            self.low.insert(cache_key, (&row).into());
-            low_last
-        } else {
-            (cache_key, (&row).into())
-        };
 
-        if !self.is_middle_cache_full() {
-            self.middle.insert(
-                elem_to_compare_with_middle.0,
-                elem_to_compare_with_middle.1.clone(),
-            );
-            res_ops.push(Op::Insert);
-            res_rows.push(elem_to_compare_with_middle.1);
+        // try insert into middle cache
+
+        if !self.middle_is_full() {
+            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
+            staging.insert(to_insert.0, to_insert.1);
             return;
         }
 
-        let mut is_from_middle = false;
-        let elem_to_compare_with_high = {
-            let middle_last = self.middle.last_entry().unwrap();
-            if elem_to_compare_with_middle.0 <= *middle_last.key() {
-                // If the row in the range of [offset, offset+limit), the largest row in
-                // `cache.middle` needs to be moved to `cache.high`
-                let res = middle_last.remove_entry();
-                res_ops.push(Op::Delete);
-                res_rows.push(res.1.clone());
-                res_ops.push(Op::Insert);
-                res_rows.push(elem_to_compare_with_middle.1.clone());
-                self.middle
-                    .insert(elem_to_compare_with_middle.0, elem_to_compare_with_middle.1);
-                is_from_middle = true;
-                res
-            } else {
-                elem_to_compare_with_middle
-            }
-        };
+        // middle cache is full
+        let middle_last = self.middle.last_entry().unwrap();
+        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
+            // make space for the new entry
+            let middle_last = middle_last.remove_entry();
+            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
+
+            staging.delete(middle_last.0.clone(), middle_last.1.clone());
+            staging.insert(to_insert.0, to_insert.1);
+
+            to_insert = middle_last; // move the last entry to the high cache
+            is_last_of_lower_cache = true;
+        }
+
+        // try insert into high cache
+
+        // The logic is a bit different from the other two caches, because the high cache is not
+        // guaranteed to be fully synced with the "high part" of the table.
+
+        if is_last_of_lower_cache || self.high_is_synced() {
+            // For `is_last_of_lower_cache`, an obvious observation is that the key to insert is
+            // always smaller than any key in the high part of the table.
+ + if self.high.is_empty() { + // if high cache is empty, we can insert directly + self.high.insert(to_insert.0, to_insert.1); + return; } - }; - self.insert_high_cache( - elem_to_compare_with_high.0, - elem_to_compare_with_high.1, - is_from_middle, - ); + let high_is_full = self.high_is_full(); + let high_last = self.high.last_entry().unwrap(); + + if is_last_of_lower_cache || &to_insert.0 < high_last.key() { + // we can only insert if the key is smaller than the largest key in the high cache + if high_is_full { + // make space for the new entry + high_last.remove_entry(); + } + self.high.insert(to_insert.0, to_insert.1); + } + } } - #[allow(clippy::too_many_arguments)] async fn delete( &mut self, group_key: Option, managed_state: &mut ManagedTopNState, cache_key: CacheKey, row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, ) -> StreamExecutorResult<()> { if !enable_strict_consistency() && self.table_row_count == Some(0) { // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we @@ -361,67 +375,92 @@ impl TopNCacheTrait for TopNCache { *row_count -= 1; } - if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 { - // The row is in high + if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 { + // the row is in high self.high.remove(&cache_key); - } else if self.is_low_cache_full() - && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0) + } else if self.low_is_full() + && self + .low + .as_ref() + .map(|low| &cache_key > low.last_key_value().unwrap().0) + .unwrap_or( + true, // if low is None, `cache_key` should be in middle + ) { - // The row is in mid - self.middle.remove(&cache_key); - res_ops.push(Op::Delete); - res_rows.push((&row).into()); + // the row is in middle + let removed = self.middle.remove(&cache_key); + staging.delete(cache_key.clone(), (&row).into()); + + if removed.is_none() { + // the middle cache should always be synced, if the key is not found, then it also doesn't + // exist in the state table + consistency_error!( + ?group_key, + ?cache_key, + "cache key not found in middle cache" + ); + return Ok(()); + } - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // refill the high cache if it's not synced + if !self.high_is_synced() { + self.high.clear(); managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring one element, if any, from high cache to middle cache + // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); } assert!(self.high.is_empty() || self.middle.len() == self.limit); } else { - // The row is in low - self.low.remove(&cache_key); + // the row is in low + let low = self.low.as_mut().unwrap(); + let removed = low.remove(&cache_key); + + if removed.is_none() { + // the low cache should always be synced, if the key is not found, then it also doesn't + // exist in the state table + consistency_error!(?group_key, ?cache_key, "cache key not found in low cache"); + return Ok(()); + } - // Bring one element, if any, from middle 
cache to low cache + // bring one element, if any, from middle cache to low cache if !self.middle.is_empty() { let middle_first = self.middle.pop_first().unwrap(); - res_ops.push(Op::Delete); - res_rows.push(middle_first.1.clone()); - self.low.insert(middle_first.0, middle_first.1); + staging.delete(middle_first.0.clone(), middle_first.1.clone()); + low.insert(middle_first.0, middle_first.1); - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // fill the high cache if it's not synced + if !self.high_is_synced() { + self.high.clear(); managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring one element, if any, from high cache to middle cache + // bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); } } } @@ -431,44 +470,37 @@ impl TopNCacheTrait for TopNCache { } impl TopNCacheTrait for TopNCache { - fn insert( - &mut self, - cache_key: CacheKey, - row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, - ) { + fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) { if let Some(row_count) = self.table_row_count.as_mut() { *row_count += 1; } assert!( - self.low.is_empty(), - "Offset is not supported yet for WITH TIES, so low cache should be empty" + self.low.is_none(), + "Offset is not supported yet for WITH TIES, so low cache should be None" ); - let elem_to_compare_with_middle = (cache_key, row); + let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into()); - if !self.is_middle_cache_full() { - self.middle.insert( - elem_to_compare_with_middle.0.clone(), - (&elem_to_compare_with_middle.1).into(), - ); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); + // try insert into middle cache + + if !self.middle_is_full() { + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0.clone(), to_insert.1); return; } - let sort_key = &elem_to_compare_with_middle.0 .0; - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = &middle_last.0 .0.clone(); + // middle cache is full + + let to_insert_sort_key = &(to_insert.0).0; + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); - match sort_key.cmp(middle_last_order_by) { + match to_insert_sort_key.cmp(&middle_last_sort_key) { Ordering::Less => { - // The row is in middle. - let num_ties = self + // the row is in middle + let n_ties_of_last = self .middle - .range((middle_last_order_by.clone(), vec![])..) + .range((middle_last_sort_key.clone(), vec![])..) .count(); // We evict the last row and its ties only if the number of remaining rows still is // still larger than limit, i.e., there are limit-1 other rows. @@ -477,59 +509,74 @@ impl TopNCacheTrait for TopNCache { // insert 0 -> [0,1,1,1,1] // insert 0 -> [0,0,1,1,1,1] // insert 0 -> [0,0,0] - if self.middle.len() - num_ties + 1 >= self.limit { + if self.middle.len() + 1 - n_ties_of_last >= self.limit { + // Middle will be full without the last element and its ties after insertion. + // Let's move the last element and its ties to high cache first. 
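[Editor's note] To make the eviction condition above concrete, here is a standalone check using the numbers from the `[0,1,1,1,1]` example in the comment; the bytes-based keys are hypothetical but mirror the `(sort key, pk)` shape of `CacheKey`:

    use std::collections::BTreeMap;

    // CacheKey-like: (serialized sort key, remaining pk), both plain bytes here.
    type Key = (Vec<u8>, Vec<u8>);

    // True when the last sort key and all its ties can be evicted from
    // `middle` while still leaving at least `limit` rows after the insertion.
    fn should_evict_last_ties(middle: &BTreeMap<Key, ()>, limit: usize) -> bool {
        let (last_key, _) = middle.last_key_value().unwrap();
        let last_sort_key = last_key.0.clone();
        // all entries sharing the last sort key, regardless of their pk suffix
        let n_ties_of_last = middle.range((last_sort_key, vec![])..).count();
        middle.len() + 1 - n_ties_of_last >= limit
    }

    fn main() {
        let mut middle = BTreeMap::new();
        for (sort, pk) in [(0u8, 0u8), (1, 1), (1, 2), (1, 3), (1, 4)] {
            middle.insert((vec![sort], vec![pk]), ());
        }
        // [0,1,1,1,1], limit 3, inserting another 0: 5 + 1 - 4 = 2 < 3, keep ties
        assert!(!should_evict_last_ties(&middle, 3));
        middle.insert((vec![0], vec![5]), ());
        // [0,0,1,1,1,1], inserting a third 0: 6 + 1 - 4 = 3 >= 3, evict the 1s
        assert!(should_evict_last_ties(&middle, 3));
    }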
while let Some(middle_last) = self.middle.last_entry() - && middle_last.key().0 == middle_last_order_by.clone() + && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + staging.delete(middle_last.0.clone(), middle_last.1.clone()); + // we can blindly move entries from middle cache to high cache no matter high cache is synced or not self.high.insert(middle_last.0, middle_last.1); } } - if self.high.len() >= self.high_capacity { + if self.high.len() > self.high_cache_capacity { + // evict some entries from high cache if it exceeds the capacity let high_last = self.high.pop_last().unwrap(); - let high_last_order_by = high_last.0 .0; - self.high.retain(|k, _| k.0 != high_last_order_by); + let high_last_sort_key = (high_last.0).0; + // Remove all ties of the last element in high cache, for the sake of simplicity. + // This may cause repeatedly refill the high cache if number of ties is large. + self.high.retain(|k, _| k.0 != high_last_sort_key); } - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); } Ordering::Equal => { - // The row is in middle and is a tie with the last row. - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + // the row is in middle and is a tie of the last row + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); } Ordering::Greater => { - // The row is in high. - let elem_to_compare_with_high = elem_to_compare_with_middle; - self.insert_high_cache( - elem_to_compare_with_high.0, - elem_to_compare_with_high.1.into(), - false, - ); + // the row is in high + + if self.high_is_synced() { + // only insert into high cache if it is synced + + if self.high.is_empty() { + // if high cache is empty, we can insert directly + self.high.insert(to_insert.0, to_insert.1); + return; + } + + if to_insert_sort_key <= &self.high.last_key().unwrap().0 { + // We can only insert if the key is <= the largest key in the high cache. + // Note that we have all ties of the last element in the high cache, so we can + // safely compare only the sort key. + self.high.insert(to_insert.0, to_insert.1); + } + + if self.high.len() > self.high_cache_capacity { + // evict some entries from high cache if it exceeds the capacity + let high_last = self.high.pop_last().unwrap(); + let high_last_sort_key = (high_last.0).0; + // Remove all ties of the last element in high cache, for the sake of simplicity. + // This may cause repeatedly refill the high cache if number of ties is large. 
+ self.high.retain(|k, _| k.0 != high_last_sort_key); + } + } } } } - #[allow(clippy::too_many_arguments)] async fn delete( &mut self, group_key: Option, managed_state: &mut ManagedTopNState, cache_key: CacheKey, row: impl Row + Send, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, ) -> StreamExecutorResult<()> { if !enable_strict_consistency() && self.table_row_count == Some(0) { // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we @@ -540,56 +587,61 @@ impl TopNCacheTrait for TopNCache { *row_count -= 1; } - // Since low cache is always empty for WITH_TIES, this unwrap is safe. - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = middle_last.0 .0.clone(); + assert!( + self.low.is_none(), + "Offset is not supported yet for WITH TIES, so low cache should be None" + ); + + if self.middle.is_empty() { + consistency_error!( + ?group_key, + ?cache_key, + "middle cache is empty, but we receive a DELETE operation" + ); + staging.delete(cache_key, (&row).into()); + return Ok(()); + } + + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); - let sort_key = cache_key.0.clone(); - if sort_key > middle_last_order_by { - // The row is in high. + let to_delete_sort_key = cache_key.0.clone(); + if to_delete_sort_key > middle_last_sort_key { + // the row is in high self.high.remove(&cache_key); } else { - // The row is in middle + // the row is in middle self.middle.remove(&cache_key); - res_ops.push(Op::Delete); - res_rows.push((&row).into()); + staging.delete(cache_key.clone(), (&row).into()); if self.middle.len() >= self.limit { - // This can happen when there are ties. + // this can happen when there are ties return Ok(()); } - // Try to fill the high cache if it is empty - if self.high.is_empty() && !self.table_row_count_matched() { + // refill the high cache if it's not synced + if !self.high_is_synced() { managed_state .fill_high_cache( group_key, self, self.last_cache_key_before_high().cloned(), - self.high_capacity, + self.high_cache_capacity, ) .await?; } - // Bring elements with the same sort key, if any, from high cache to middle cache. + // bring the first element and its ties, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); - let high_first_order_by = high_first.0 .0.clone(); - assert!(high_first_order_by > middle_last_order_by); - - res_ops.push(Op::Insert); - res_rows.push(high_first.1.clone()); - self.middle.insert(high_first.0, high_first.1); - - // We need to trigger insert for all rows with prefix `high_first_order_by` - // in high cache. 
- for (ordered_pk_row, row) in self.high.extract_if(|k, _| k.0 == high_first_order_by) - { - if ordered_pk_row.0 != high_first_order_by { - break; - } - res_ops.push(Op::Insert); - res_rows.push(row.clone()); - self.middle.insert(ordered_pk_row, row); + let high_first_sort_key = (high_first.0).0.clone(); + assert!(high_first_sort_key > middle_last_sort_key); + + self.middle + .insert(high_first.0.clone(), high_first.1.clone()); + staging.insert(high_first.0, high_first.1); + + for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) { + self.middle.insert(cache_key.clone(), row.clone()); + staging.insert(cache_key, row); } } } @@ -611,8 +663,7 @@ pub trait AppendOnlyTopNCacheTrait { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()>; @@ -623,56 +674,54 @@ impl AppendOnlyTopNCacheTrait for TopNCache { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { - if self.is_middle_cache_full() && &cache_key >= self.middle.last_key_value().unwrap().0 { + if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() { return Ok(()); } managed_state.insert(row_ref); - // Then insert input row to corresponding cache range according to its order key - if !self.is_low_cache_full() { - self.low.insert(cache_key, row_ref.into()); - return Ok(()); + // insert input row into corresponding cache according to its sort key + let mut to_insert = (cache_key, row_ref.into()); + + let low_is_full = self.low_is_full(); + if let Some(low) = &mut self.low { + // try insert into low cache + + if !low_is_full { + low.insert(to_insert.0, to_insert.1); + return Ok(()); + } + + // low cache is full + let low_last = low.last_entry().unwrap(); + if &to_insert.0 < low_last.key() { + // make space for the new entry + let low_last = low_last.remove_entry(); + low.insert(to_insert.0, to_insert.1); + to_insert = low_last; // move the last entry to the middle cache + } } - let elem_to_insert_into_middle = if let Some(low_last) = self.low.last_entry() - && &cache_key <= low_last.key() - { - // Take the last element of `cache.low` and insert input row to it. - let low_last = low_last.remove_entry(); - self.low.insert(cache_key, row_ref.into()); - low_last - } else { - (cache_key, row_ref.into()) - }; + // try insert into middle cache - if !self.is_middle_cache_full() { - self.middle.insert( - elem_to_insert_into_middle.0, - elem_to_insert_into_middle.1.clone(), - ); - res_ops.push(Op::Insert); - res_rows.push(elem_to_insert_into_middle.1); + if !self.middle_is_full() { + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); return Ok(()); } // The row must be in the range of [offset, offset+limit). // the largest row in `cache.middle` needs to be removed. 
let middle_last = self.middle.pop_last().unwrap(); - debug_assert!(elem_to_insert_into_middle.0 < middle_last.0); - - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + debug_assert!(to_insert.0 < middle_last.0); managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); + staging.delete(middle_last.0, middle_last.1); - res_ops.push(Op::Insert); - res_rows.push(elem_to_insert_into_middle.1.clone()); - self.middle - .insert(elem_to_insert_into_middle.0, elem_to_insert_into_middle.1); + self.middle.insert(to_insert.0.clone(), to_insert.1.clone()); + staging.insert(to_insert.0, to_insert.1); // Unlike normal topN, append only topN does not use the high part of the cache. @@ -685,37 +734,38 @@ impl AppendOnlyTopNCacheTrait for TopNCache { &mut self, cache_key: CacheKey, row_ref: RowRef<'_>, - res_ops: &mut Vec, - res_rows: &mut Vec, + staging: &mut TopNStaging, managed_state: &mut ManagedTopNState, row_deserializer: &RowDeserializer, ) -> StreamExecutorResult<()> { assert!( - self.low.is_empty(), + self.low.is_none(), "Offset is not supported yet for WITH TIES, so low cache should be empty" ); - let elem_to_compare_with_middle = (cache_key, row_ref); - - if !self.is_middle_cache_full() { - let row: CompactedRow = elem_to_compare_with_middle.1.into(); - managed_state.insert(elem_to_compare_with_middle.1); - self.middle - .insert(elem_to_compare_with_middle.0.clone(), row.clone()); - res_ops.push(Op::Insert); - res_rows.push(row); + + let to_insert = (cache_key, row_ref); + + // try insert into middle cache + + if !self.middle_is_full() { + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); return Ok(()); } - let sort_key = &elem_to_compare_with_middle.0 .0; - let middle_last = self.middle.last_key_value().unwrap(); - let middle_last_order_by = &middle_last.0 .0.clone(); + // middle cache is full - match sort_key.cmp(middle_last_order_by) { + let to_insert_sort_key = &(to_insert.0).0; + let middle_last_sort_key = self.middle.last_key().unwrap().0.clone(); + + match to_insert_sort_key.cmp(&middle_last_sort_key) { Ordering::Less => { - // The row is in middle. - let num_ties = self + // the row is in middle + let n_ties_of_last = self .middle - .range((middle_last_order_by.clone(), vec![])..) + .range((middle_last_sort_key.clone(), vec![])..) .count(); // We evict the last row and its ties only if the number of remaining rows is // still larger than limit, i.e., there are limit-1 other rows. 
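[Editor's note] Setting the ties handling aside for a moment, the append-only flow above reduces to a few lines, since no deletes ever arrive and there is no high cache to maintain. A hedged sketch under illustrative types (no offset/low tier, and not the real `ManagedTopNState` API):

    use std::collections::BTreeMap;

    enum Change<K, V> {
        Insert(K, V),
        Delete(K, V),
    }

    // Append-only top-N: once `cache` holds `limit` entries, a new row only
    // matters if it sorts before the current largest one.
    fn append_only_insert<K: Ord + Clone, V: Clone>(
        cache: &mut BTreeMap<K, V>,
        limit: usize,
        key: K,
        value: V,
        out: &mut Vec<Change<K, V>>,
    ) {
        if cache.len() >= limit {
            if &key >= cache.last_key_value().unwrap().0 {
                return; // not in the top `limit`; ignore
            }
            // make room; in the real executor the evicted row is also
            // deleted from the state table via `managed_state`
            let (k, v) = cache.pop_last().unwrap();
            out.push(Change::Delete(k, v));
        }
        out.push(Change::Insert(key.clone(), value.clone()));
        cache.insert(key, value);
    }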
@@ -724,41 +774,125 @@ impl AppendOnlyTopNCacheTrait for TopNCache { // insert 0 -> [0,1,1,1,1] // insert 0 -> [0,0,1,1,1,1] // insert 0 -> [0,0,0] - if self.middle.len() - num_ties + 1 >= self.limit { + if self.middle.len() + 1 - n_ties_of_last >= self.limit { + // middle will be full without the last element and its ties after insertion while let Some(middle_last) = self.middle.last_entry() - && &middle_last.key().0 == middle_last_order_by + && middle_last.key().0 == middle_last_sort_key { let middle_last = middle_last.remove_entry(); - res_ops.push(Op::Delete); - res_rows.push(middle_last.1.clone()); + // we don't need to maintain the high part so just delete it from state table managed_state .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?); + staging.delete(middle_last.0, middle_last.1); } } - managed_state.insert(elem_to_compare_with_middle.1); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); } Ordering::Equal => { - // The row is in middle and is a tie with the last row. - managed_state.insert(elem_to_compare_with_middle.1); - res_ops.push(Op::Insert); - res_rows.push((&elem_to_compare_with_middle.1).into()); - self.middle.insert( - elem_to_compare_with_middle.0, - (&elem_to_compare_with_middle.1).into(), - ); + // the row is in middle and is a tie of the last row + managed_state.insert(to_insert.1); + let row: CompactedRow = to_insert.1.into(); + self.middle.insert(to_insert.0.clone(), row.clone()); + staging.insert(to_insert.0, row); } Ordering::Greater => { - // The row is in high. Do nothing. + // the row is in high, do nothing } } Ok(()) } } + +/// Used to build diff between before and after applying an input chunk, for `TopNCache` (of one group). +/// It should be maintained when an entry is inserted or deleted from the `middle` cache. +#[derive(Debug, Default)] +pub struct TopNStaging { + to_delete: BTreeMap, + to_insert: BTreeMap, + to_update: BTreeMap, +} + +impl TopNStaging { + pub fn new() -> Self { + Self::default() + } + + /// Insert a row into the staging changes. This method must be called when a row is + /// added to the `middle` cache. + fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) { + if let Some(old_row) = self.to_delete.remove(&cache_key) { + if old_row != row { + self.to_update.insert(cache_key, (old_row, row)); + } + } else { + self.to_insert.insert(cache_key, row); + } + } + + /// Delete a row from the staging changes. This method must be called when a row is + /// removed from the `middle` cache. + fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) { + if self.to_insert.remove(&cache_key).is_some() { + // do nothing more + } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) { + self.to_delete.insert(cache_key, old_row); + } else { + self.to_delete.insert(cache_key, row); + } + } + + /// Get the count of effective changes in the staging. + pub fn len(&self) -> usize { + self.to_delete.len() + self.to_insert.len() + self.to_update.len() + } + + /// Check if the staging is empty. + pub fn is_empty(&self) -> bool { + self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty() + } + + /// Iterate over the changes in the staging. 
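[Editor's note] It helps to see the collapsing rules of `TopNStaging` in isolation before looking at how changes are emitted. A standalone reimplementation for illustration, with `u64` and `String` standing in for `CacheKey` and `CompactedRow`:

    use std::collections::BTreeMap;

    #[derive(Default)]
    struct Staging {
        to_delete: BTreeMap<u64, String>,
        to_insert: BTreeMap<u64, String>,
        to_update: BTreeMap<u64, (String, String)>,
    }

    impl Staging {
        fn insert(&mut self, key: u64, row: String) {
            if let Some(old) = self.to_delete.remove(&key) {
                // delete + insert of the same key collapses into an update;
                // identical old/new rows cancel out entirely
                if old != row {
                    self.to_update.insert(key, (old, row));
                }
            } else {
                self.to_insert.insert(key, row);
            }
        }

        fn delete(&mut self, key: u64, row: String) {
            if self.to_insert.remove(&key).is_some() {
                // insert + delete of the same key cancels out
            } else if let Some((old, _)) = self.to_update.remove(&key) {
                self.to_delete.insert(key, old); // keep the pre-chunk row
            } else {
                self.to_delete.insert(key, row);
            }
        }
    }

    fn main() {
        let mut staging = Staging::default();
        staging.delete(9, "row v1".into());
        staging.insert(9, "row v1".into());
        assert!(staging.to_delete.is_empty() && staging.to_insert.is_empty());
    }

With these rules, a row evicted from `middle` and re-admitted while processing the same chunk produces no output at all, a deduplication the old `res_ops`/`res_rows` vectors could not perform; the `+ 9 4` / `- 9 4` pairs that vanish from the expected test chunks further down are exactly this effect.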
+ pub fn into_changes(self) -> impl Iterator { + #[cfg(debug_assertions)] + { + let keys = self + .to_delete + .keys() + .chain(self.to_insert.keys()) + .chain(self.to_update.keys()) + .unique() + .count(); + assert_eq!( + keys, + self.to_delete.len() + self.to_insert.len() + self.to_update.len(), + "should not have duplicate keys with different operations", + ); + } + + // We expect one `CacheKey` to appear at most once in the staging, and, the order of + // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps. + // Although the output order is not important, we still ensure that `Delete`s are emitted + // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint. + self.to_update + .into_values() + .flat_map(|(old_row, new_row)| { + [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)] + }) + .chain(self.to_delete.into_values().map(|row| (Op::Delete, row))) + .chain(self.to_insert.into_values().map(|row| (Op::Insert, row))) + } + + /// Iterate over the changes in the staging, and deserialize the rows. + pub fn into_deserialized_changes( + self, + deserializer: &RowDeserializer, + ) -> impl Iterator> + '_ { + self.into_changes() + .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?))) + } +} diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 63b9ca94961f..7dedb31e0f33 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -13,10 +13,11 @@ // limitations under the License. use risingwave_common::array::Op; -use risingwave_common::row::RowExt; +use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::ColumnOrder; +use super::top_n_cache::TopNStaging; use super::utils::*; use super::{ManagedTopNState, TopNCache, TopNCacheTrait}; use crate::executor::prelude::*; @@ -68,7 +69,7 @@ impl TopNExecutor { let mut inner = InnerTopNExecutor::new(schema, storage_key, offset_and_limit, order_by, state_table)?; - inner.cache.high_capacity = 2; + inner.cache.high_cache_capacity = 2; Ok(TopNExecutorWrapper { input, ctx, inner }) } @@ -126,9 +127,11 @@ impl TopNExecutorBase for InnerTopNExecuto where TopNCache: TopNCacheTrait, { - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult { - let mut res_ops = Vec::with_capacity(self.cache.limit); - let mut res_rows = Vec::with_capacity(self.cache.limit); + async fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> StreamExecutorResult> { + let mut staging = TopNStaging::new(); // apply the chunk to state table for (op, row_ref) in chunk.rows() { @@ -138,8 +141,7 @@ where Op::Insert | Op::UpdateInsert => { // First insert input row to state store self.managed_state.insert(row_ref); - self.cache - .insert(cache_key, row_ref, &mut res_ops, &mut res_rows) + self.cache.insert(cache_key, row_ref, &mut staging) } Op::Delete | Op::UpdateDelete => { @@ -151,14 +153,24 @@ where &mut self.managed_state, cache_key, row_ref, - &mut res_ops, - &mut res_rows, + &mut staging, ) .await? 
} } } - generate_output(res_rows, res_ops, &self.schema) + + let data_types = self.schema.data_types(); + let deserializer = RowDeserializer::new(data_types.clone()); + if staging.is_empty() { + return Ok(None); + } + let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len())); + for res in staging.into_deserialized_changes(&deserializer) { + let (op, row) = res?; + let _none = chunk_builder.append_row(op, row); + } + Ok(chunk_builder.take()) } async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { @@ -184,7 +196,6 @@ where #[cfg(test)] mod tests { - use assert_matches::assert_matches; use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -200,6 +211,7 @@ mod tests { use risingwave_common::util::epoch::test_epoch; use super::*; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_stream_chunks() -> Vec { let chunk1 = StreamChunk::from_pretty( @@ -287,7 +299,7 @@ mod tests { .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -297,49 +309,38 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // Barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); // (8, 9, 10, 11, 12, 13, 14) assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 8 5 @@ -347,29 +348,24 @@ mod tests { + 13 11 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; // (10, 12, 13, 14) - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 - 9 4 - 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -382,7 +378,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -392,76 +388,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = 
top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 5 7 @@ -469,13 +447,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } // Should have the same result as above, since there are no duplicate sort keys. @@ -489,7 +465,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, true>::new( + let top_n = TopNExecutor::<_, true>::new( source, ActorContext::for_test(0), schema, @@ -499,76 +475,58 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2 - + 10 3 - - 10 3 - + 9 4 - - 9 4 + 8 5" ) + .sort_rows(), ); // now () -> (1, 2, 3, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - - 8 5 + 7 6 - 3 2 - + 8 5 - 1 0 - + 9 4 - - 9 4 + 5 7 - 2 1 + 9 4" ) + .sort_rows(), ); // (5, 7, 8, 9) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 9 4 + 6 9" ) + .sort_rows(), ); // (5, 6, 7, 8) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), 
StreamChunk::from_pretty( " I I - 5 7 @@ -576,13 +534,11 @@ mod tests { - 6 9 + 10 3" ) + .sort_rows(), ); // (7, 8, 9, 10) // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -595,7 +551,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -605,60 +561,46 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 10 3 + 9 4 + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 7 6 - - 7 6 - - 8 5 - + 8 5 - 8 5 + 11 8" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 8 5" + + 8 5" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 8 5 @@ -668,12 +610,10 @@ mod tests { - 11 8 + 14 12" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -684,6 +624,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source_new() -> Executor { let mut chunks = vec![ StreamChunk::from_pretty( @@ -812,7 +753,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -822,55 +763,42 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); - - let res = top_n_executor.next().await.unwrap().unwrap(); - assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + 
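[Editor's note] The switch from asserting exact chunks to comparing with `sort_rows()` reflects the new output contract: `TopNStaging::into_changes` groups changes by operation rather than preserving arrival order, so only the multiset of (op, row) pairs is stable across runs. Conceptually the normalization amounts to the following (a hypothetical helper, not the actual `StreamChunkTestExt::sort_rows` implementation):

    // Normalize (op, row) pairs so two chunks compare equal regardless of
    // the order in which changes were emitted.
    fn sort_rows(mut rows: Vec<(char, Vec<i64>)>) -> Vec<(char, Vec<i64>)> {
        rows.sort_by(|a, b| a.1.cmp(&b.1).then(a.0.cmp(&b.0)));
        rows
    }

    fn main() {
        let got = vec![('-', vec![8, 5]), ('+', vec![11, 8])];
        let expected = vec![('+', vec![11, 8]), ('-', vec![8, 5])];
        assert_eq!(sort_rows(got), sort_rows(expected));
    }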
top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } #[tokio::test] @@ -890,7 +818,7 @@ mod tests { .await; let source = create_source_new_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::<_, false>::new( + let top_n = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -900,33 +828,22 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); - // should be empty assert_eq!( - *res.as_chunk().unwrap(), - StreamChunk::from_pretty(" I I I I") - ); - - let res = top_n_executor.next().await.unwrap().unwrap(); - assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 5 1 4 1002 - " + + 5 1 4 1002" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[ @@ -944,7 +861,7 @@ mod tests { // recovery let source = create_source_new_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::<_, false>::new( + let top_n_after_recovery = TopNExecutor::<_, false>::new( source, ActorContext::for_test(0), schema, @@ -954,41 +871,33 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - + 1 9 1 1003 - + 9 8 1 1004 - - 9 8 1 1004 - + 1 1 4 1001", - ), + + 1 9 1 1003 + + 1 1 4 1001", + ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I I I - - 5 1 4 1002 - + 1 0 2 1006", + - 5 1 4 1002 + + 1 0 2 1006", ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } @@ -999,6 +908,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store; + use crate::executor::test_utils::StreamExecutorTestExt; fn create_source() -> Executor { let mut chunks = vec![ @@ -1070,7 +980,7 @@ mod tests { ) .await; let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( 
source, ActorContext::for_test(0), schema, @@ -1080,64 +990,56 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } fn create_source_before_recovery() -> Executor { @@ -1149,8 +1051,7 @@ mod tests { + 3 2 + 10 3 + 9 4 - + 8 5 - ", + + 8 5", ), StreamChunk::from_pretty( " I I @@ -1214,7 +1115,7 @@ mod tests { .await; let source = create_source_before_recovery(); let schema = source.schema().clone(); - let top_n_executor = TopNExecutor::new_with_ties_for_test( + let top_n = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1224,41 +1125,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor.boxed().execute(); + let mut top_n = top_n.boxed().execute(); // consume the init barrier - top_n_executor.next().await.unwrap().unwrap(); - let res = top_n_executor.next().await.unwrap().unwrap(); + top_n.expect_barrier().await; assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I + 1 0 + 2 1 + 3 2" ) + .sort_rows(), ); - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - + 3 6 - + 3 7 - - 3 7 - - 3 6 - 3 2 + 1 8 + 2 9" ) + .sort_rows(), ); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; let state_table = create_in_memory_state_table_from_state_store( &[DataType::Int64, DataType::Int64], @@ -1271,7 +1165,7 @@ mod tests { // recovery let source = create_source_after_recovery(); let schema = source.schema().clone(); - let top_n_executor_after_recovery = TopNExecutor::new_with_ties_for_test( + let top_n_after_recovery = TopNExecutor::new_with_ties_for_test( source, ActorContext::for_test(0), schema, @@ -1281,42 +1175,34 @@ mod tests { state_table, ) .unwrap(); - let mut top_n_executor = top_n_executor_after_recovery.boxed().execute(); + let mut top_n = top_n_after_recovery.boxed().execute(); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; - let res = top_n_executor.next().await.unwrap().unwrap(); 
assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 0" ) + .sort_rows(), ); // High cache has only 2 capacity, but we need to trigger 3 inserts here! - let res = top_n_executor.next().await.unwrap().unwrap(); assert_eq!( - *res.as_chunk().unwrap(), + top_n.expect_chunk().await.sort_rows(), StreamChunk::from_pretty( " I I - 1 8 + 3 2 + 3 6 - + 3 7 - " + + 3 7" ) + .sort_rows(), ); - println!("hello"); // barrier - assert_matches!( - top_n_executor.next().await.unwrap().unwrap(), - Message::Barrier(_) - ); + top_n.expect_barrier().await; } } } diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 57f4017e5118..e75f40bad81c 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -23,9 +23,11 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; -use super::{serialize_pk_to_cache_key, CacheKey, CacheKeySerde, GroupKey, TopNCache}; +use super::top_n_cache::CacheKey; +use super::{serialize_pk_to_cache_key, CacheKeySerde, GroupKey, TopNCache}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorResult; +use crate::executor::top_n::top_n_cache::Cache; /// * For TopN, the storage key is: `[ order_by + remaining columns of pk ]` /// * For group TopN, the storage key is: `[ group_key + order_by + remaining columns of pk ]` @@ -143,8 +145,10 @@ impl ManagedTopNState { start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { - let cache = &mut topn_cache.high; + let high_cache = &mut topn_cache.high; + assert!(high_cache.is_empty()); + // TODO(rc): iterate from `start_key` let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table @@ -171,13 +175,13 @@ impl ManagedTopNState { { continue; } - cache.insert(topn_row.cache_key, (&topn_row.row).into()); - if cache.len() == cache_size_limit { + high_cache.insert(topn_row.cache_key, (&topn_row.row).into()); + if high_cache.len() == cache_size_limit { break; } } - if WITH_TIES && topn_cache.is_high_cache_full() { + if WITH_TIES && topn_cache.high_is_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -206,9 +210,10 @@ impl ManagedTopNState { group_key: Option, topn_cache: &mut TopNCache, ) -> StreamExecutorResult<()> { - assert!(topn_cache.low.is_empty()); + assert!(topn_cache.low.as_ref().map(Cache::is_empty).unwrap_or(true)); assert!(topn_cache.middle.is_empty()); assert!(topn_cache.high.is_empty()); + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table @@ -225,14 +230,12 @@ impl ManagedTopNState { let mut group_row_count = 0; - if topn_cache.offset > 0 { + if let Some(low) = &mut topn_cache.low { while let Some(item) = state_table_iter.next().await { group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); - topn_cache - .low - .insert(topn_row.cache_key, (&topn_row.row).into()); - if topn_cache.low.len() == topn_cache.offset { + low.insert(topn_row.cache_key, (&topn_row.row).into()); + if low.len() == topn_cache.offset { break; } } @@ -249,7 +252,7 @@ impl ManagedTopNState { break; } } - if WITH_TIES && topn_cache.is_middle_cache_full() { + if WITH_TIES && 
topn_cache.middle_is_full() { let middle_last_sort_key = topn_cache.middle.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -268,10 +271,10 @@ impl ManagedTopNState { } assert!( - topn_cache.high_capacity > 0, + topn_cache.high_cache_capacity > 0, "topn cache high_capacity should always > 0" ); - while !topn_cache.is_high_cache_full() + while !topn_cache.high_is_full() && let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -280,7 +283,7 @@ impl ManagedTopNState { .high .insert(topn_row.cache_key, (&topn_row.row).into()); } - if WITH_TIES && topn_cache.is_high_cache_full() { + if WITH_TIES && topn_cache.high_is_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { group_row_count += 1; @@ -324,7 +327,7 @@ mod tests { use super::*; use crate::executor::test_utils::top_n_executor::create_in_memory_state_table; - use crate::executor::top_n::top_n_cache::TopNCacheTrait; + use crate::executor::top_n::top_n_cache::{TopNCacheTrait, TopNStaging}; use crate::executor::top_n::{create_cache_key_serde, NO_GROUP_KEY}; use crate::row_nonnull; @@ -487,15 +490,14 @@ mod tests { let row1_bytes = serialize_pk_to_cache_key(row1.clone(), &cache_key_serde); let mut cache = TopNCache::::new(0, 1, data_types); - cache.insert(row1_bytes.clone(), row1.clone(), &mut vec![], &mut vec![]); + cache.insert(row1_bytes.clone(), row1.clone(), &mut TopNStaging::new()); cache .delete( NO_GROUP_KEY, &mut managed_state, row1_bytes, row1, - &mut vec![], - &mut vec![], + &mut TopNStaging::new(), ) .await .unwrap(); diff --git a/src/stream/src/executor/top_n/topn_cache_state.rs b/src/stream/src/executor/top_n/topn_cache_state.rs deleted file mode 100644 index fa28369f2b0d..000000000000 --- a/src/stream/src/executor/top_n/topn_cache_state.rs +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::fmt; -use std::alloc::Global; -use std::collections::btree_map::{BTreeMap, ExtractIf, OccupiedEntry, Range}; -use std::ops::RangeBounds; - -use risingwave_common::row::CompactedRow; -use risingwave_common_estimate_size::{EstimateSize, KvSize}; - -/// `CacheKey` is composed of `(order_by, remaining columns of pk)`. -pub type CacheKey = (Vec, Vec); - -#[derive(Default)] -pub struct TopNCacheState { - /// The full copy of the state. - inner: BTreeMap, - kv_heap_size: KvSize, -} - -impl EstimateSize for TopNCacheState { - fn estimated_heap_size(&self) -> usize { - // TODO: Add btreemap internal size. - // https://github.com/risingwavelabs/risingwave/issues/9713 - self.kv_heap_size.size() - } -} - -impl TopNCacheState { - pub fn new() -> Self { - Self { - inner: BTreeMap::new(), - kv_heap_size: KvSize::new(), - } - } - - /// Insert into the cache. 
- pub fn insert(&mut self, key: CacheKey, value: CompactedRow) -> Option { - self.kv_heap_size.add(&key, &value); - self.inner.insert(key, value) - } - - /// Delete from the cache. - pub fn remove(&mut self, key: &CacheKey) { - if let Some(value) = self.inner.remove(key) { - self.kv_heap_size.sub(key, &value); - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn last_key_value(&self) -> Option<(&CacheKey, &CompactedRow)> { - self.inner.last_key_value() - } - - pub fn first_key_value(&self) -> Option<(&CacheKey, &CompactedRow)> { - self.inner.first_key_value() - } - - pub fn clear(&mut self) { - self.inner.clear() - } - - pub fn pop_first(&mut self) -> Option<(CacheKey, CompactedRow)> { - self.inner.pop_first().inspect(|(k, v)| { - self.kv_heap_size.sub(k, v); - }) - } - - pub fn pop_last(&mut self) -> Option<(CacheKey, CompactedRow)> { - self.inner.pop_last().inspect(|(k, v)| { - self.kv_heap_size.sub(k, v); - }) - } - - pub fn last_entry(&mut self) -> Option> { - self.inner - .last_entry() - .map(|entry| TopNCacheOccupiedEntry::new(entry, &mut self.kv_heap_size)) - } - - pub fn iter(&self) -> impl Iterator { - self.inner.iter() - } - - pub fn range(&self, range: R) -> Range<'_, CacheKey, CompactedRow> - where - R: RangeBounds, - { - self.inner.range(range) - } - - pub fn extract_if<'a, F1>( - &'a mut self, - mut pred: F1, - ) -> TopNExtractIf<'a, impl FnMut(&CacheKey, &mut CompactedRow) -> bool> - where - F1: 'a + FnMut(&CacheKey, &CompactedRow) -> bool, - { - let pred_immut = move |key: &CacheKey, value: &mut CompactedRow| pred(key, value); - TopNExtractIf { - inner: self.inner.extract_if(pred_immut), - kv_heap_size: &mut self.kv_heap_size, - } - } - - pub fn retain(&mut self, mut f: F) - where - F: FnMut(&CacheKey, &CompactedRow) -> bool, - { - self.extract_if(|k, v| !f(k, v)).for_each(drop); - } -} - -pub struct TopNCacheOccupiedEntry<'a> { - inner: OccupiedEntry<'a, CacheKey, CompactedRow>, - /// The total size of the `TopNCacheState` - kv_heap_size: &'a mut KvSize, -} - -impl<'a> TopNCacheOccupiedEntry<'a> { - pub fn new(entry: OccupiedEntry<'a, CacheKey, CompactedRow>, size: &'a mut KvSize) -> Self { - Self { - inner: entry, - kv_heap_size: size, - } - } - - pub fn remove_entry(self) -> (CacheKey, CompactedRow) { - let (k, v) = self.inner.remove_entry(); - self.kv_heap_size.add(&k, &v); - (k, v) - } - - pub fn key(&self) -> &CacheKey { - self.inner.key() - } -} - -impl fmt::Debug for TopNCacheState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.inner.fmt(f) - } -} - -pub struct TopNExtractIf<'a, F> -where - F: FnMut(&CacheKey, &mut CompactedRow) -> bool, -{ - inner: ExtractIf<'a, CacheKey, CompactedRow, F, Global>, - kv_heap_size: &'a mut KvSize, -} - -impl<'a, F> Iterator for TopNExtractIf<'a, F> -where - F: 'a + FnMut(&CacheKey, &mut CompactedRow) -> bool, -{ - type Item = (CacheKey, CompactedRow); - - fn next(&mut self) -> Option { - self.inner - .next() - .inspect(|(k, v)| self.kv_heap_size.sub(k, v)) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index 18409cf3d6db..930ed93d71f6 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -14,24 +14,22 @@ use std::future::Future; -use itertools::Itertools; -use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use 
risingwave_common::row::{CompactedRow, RowDeserializer}; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::ColumnOrder; -use super::CacheKey; +use super::top_n_cache::CacheKey; use crate::executor::prelude::*; pub trait TopNExecutorBase: Send + 'static { /// Apply the chunk to the dirty state and get the diffs. + /// TODO(rc): There can be a 2 times amplification in terms of the chunk size, so we may need to + /// allow `apply_chunk` return a stream of chunks. Motivation is not quite strong though. fn apply_chunk( &mut self, chunk: StreamChunk, - ) -> impl Future> + Send; + ) -> impl Future>> + Send; /// Flush the buffered chunk to the storage backend. fn flush_data( @@ -102,7 +100,9 @@ where } } Message::Chunk(chunk) => { - yield Message::Chunk(self.inner.apply_chunk(chunk).await?); + if let Some(output_chunk) = self.inner.apply_chunk(chunk).await? { + yield Message::Chunk(output_chunk); + } self.inner.try_flush_data().await?; } Message::Barrier(barrier) => { @@ -120,33 +120,6 @@ where } } -pub fn generate_output( - new_rows: Vec, - new_ops: Vec, - schema: &Schema, -) -> StreamExecutorResult { - if !new_rows.is_empty() { - let mut data_chunk_builder = DataChunkBuilder::new(schema.data_types(), new_rows.len() + 1); - let row_deserializer = RowDeserializer::new(schema.data_types()); - for compacted_row in new_rows { - let res = data_chunk_builder - .append_one_row(row_deserializer.deserialize(compacted_row.row.as_ref())?); - debug_assert!(res.is_none()); - } - // since `new_rows` is not empty, we unwrap directly - let new_data_chunk = data_chunk_builder.consume_all().unwrap(); - let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec()); - Ok(new_stream_chunk) - } else { - let columns = schema - .create_array_builders(0) - .into_iter() - .map(|x| x.finish().into()) - .collect_vec(); - Ok(StreamChunk::new(vec![], columns)) - } -} - /// For a given pk (Row), it can be split into `order_key` and `additional_pk` according to /// `order_by_len`, and the two split parts are serialized separately. pub fn serialize_pk_to_cache_key(pk: impl Row, cache_key_serde: &CacheKeySerde) -> CacheKey {