From d27d6c1eca115c60a6b31958e4a8ef6de5a5affe Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 27 Dec 2023 17:08:23 +0800 Subject: [PATCH] refactor(common): move some over window utils to `common` (#14204) Signed-off-by: Richard Chien --- Cargo.lock | 8 ++++++ Cargo.toml | 5 +++- .../estimate_size/collections/btreemap.rs} | 3 +- .../src/estimate_size/collections/mod.rs | 6 ++-- src/common/src/lib.rs | 1 + src/common/src/types/mod.rs | 2 ++ .../src/types}/sentinel.rs | 28 +++++++++---------- .../core/src/window_function/state/mod.rs | 8 +++++- .../core/src/window_function/state/rank.rs | 4 +-- src/stream/Cargo.toml | 1 + src/stream/src/executor/hash_agg.rs | 2 +- src/stream/src/executor/over_window/eowc.rs | 4 +-- .../src/executor/over_window/general.rs | 3 +- src/stream/src/executor/over_window/mod.rs | 3 -- .../executor/over_window/over_partition.rs | 11 ++++---- src/utils/delta_btree_map/Cargo.toml | 21 ++++++++++++++ .../delta_btree_map/src/lib.rs} | 11 ++++---- 17 files changed, 80 insertions(+), 41 deletions(-) rename src/{stream/src/executor/over_window/estimated_btree_map.rs => common/src/estimate_size/collections/btreemap.rs} (98%) rename src/{stream/src/executor/over_window => common/src/types}/sentinel.rs (73%) create mode 100644 src/utils/delta_btree_map/Cargo.toml rename src/{stream/src/executor/over_window/delta_btree_map.rs => utils/delta_btree_map/src/lib.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index b6b6ba743eed8..dd99f20c8e148 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,6 +2963,13 @@ dependencies = [ "uuid", ] +[[package]] +name = "delta_btree_map" +version = "1.5.0-alpha" +dependencies = [ + "enum-as-inner", +] + [[package]] name = "deltalake" version = "0.17.0" @@ -9384,6 +9391,7 @@ dependencies = [ "await-tree", "bytes", "criterion", + "delta_btree_map", "educe 0.5.7", "either", "enum-as-inner", diff --git a/Cargo.toml b/Cargo.toml index a7dcc57ff4f23..08f6d98ab78f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ members = [ "src/tests/simulation", "src/tests/sqlsmith", "src/tests/state_cleaning_test", + "src/utils/delta_btree_map", "src/utils/local_stats_alloc", "src/utils/pgwire", "src/utils/runtime", @@ -136,7 +137,9 @@ arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" } # Use a forked version which removes the dependencies on dynamo db to reduce # compile time and binary size. -deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = ["s3-no-concurrent-write"] } +deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = [ + "s3-no-concurrent-write", +] } parquet = "49" thiserror-ext = "0.0.10" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } diff --git a/src/stream/src/executor/over_window/estimated_btree_map.rs b/src/common/src/estimate_size/collections/btreemap.rs similarity index 98% rename from src/stream/src/executor/over_window/estimated_btree_map.rs rename to src/common/src/estimate_size/collections/btreemap.rs index 953fc66346db7..7efb1ca4201aa 100644 --- a/src/stream/src/executor/over_window/estimated_btree_map.rs +++ b/src/common/src/estimate_size/collections/btreemap.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; use std::ops::{Bound, RangeInclusive}; -use risingwave_common::estimate_size::{EstimateSize, KvSize}; +use crate::estimate_size::{EstimateSize, KvSize}; pub struct EstimatedBTreeMap { inner: BTreeMap, @@ -77,7 +77,6 @@ where } } - #[expect(dead_code)] pub fn clear(&mut self) { self.inner.clear(); self.heap_size.set(0); diff --git a/src/common/src/estimate_size/collections/mod.rs b/src/common/src/estimate_size/collections/mod.rs index c8b36d8352d6d..5909e43037389 100644 --- a/src/common/src/estimate_size/collections/mod.rs +++ b/src/common/src/estimate_size/collections/mod.rs @@ -20,9 +20,11 @@ mod heap; pub mod lru; pub use heap::*; pub mod vecdeque; -pub use vecdeque::EstimatedVecDeque as VecDeque; +pub use vecdeque::EstimatedVecDeque; pub mod hashmap; -pub use hashmap::EstimatedHashMap as HashMap; +pub use hashmap::EstimatedHashMap; +pub mod btreemap; +pub use btreemap::EstimatedBTreeMap; mod private { use super::*; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index f545287bd9cc9..f95b39d287fdb 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -43,6 +43,7 @@ #![feature(negative_impls)] #![feature(bound_map)] #![feature(array_methods)] +#![feature(btree_cursors)] #[cfg_attr(not(test), expect(unused_extern_crates))] extern crate self as risingwave_common; diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 7b6f393725e71..03b477fa09e57 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -60,6 +60,7 @@ mod ordered; mod ordered_float; mod postgres_type; mod scalar_impl; +mod sentinel; mod serial; mod struct_type; mod successor; @@ -81,6 +82,7 @@ pub use self::ops::{CheckedAdd, IsNegative}; pub use self::ordered::*; pub use self::ordered_float::{FloatExt, IntoOrdered}; pub use self::scalar_impl::*; +pub use self::sentinel::Sentinelled; pub use self::serial::Serial; pub use self::struct_type::StructType; pub use self::successor::Successor; diff --git a/src/stream/src/executor/over_window/sentinel.rs b/src/common/src/types/sentinel.rs similarity index 73% rename from src/stream/src/executor/over_window/sentinel.rs rename to src/common/src/types/sentinel.rs index fd7ac2cf9749f..c6372ac99e373 100644 --- a/src/stream/src/executor/over_window/sentinel.rs +++ b/src/common/src/types/sentinel.rs @@ -13,17 +13,23 @@ // limitations under the License. use enum_as_inner::EnumAsInner; -use risingwave_common::estimate_size::EstimateSize; -use risingwave_expr::window_function::StateKey; +use crate::estimate_size::EstimateSize; + +/// [`Sentinelled`] wraps type `T` to provide smallest (smaller than any normal `T` value) and largest +/// (larger than ant normal `T` value) sentinel value for `T`. +/// +/// Sentinel is a very common technique used to simplify tree/list/array algorithms. The main idea is to +/// insert sentinel node to the beginning or/and the end, so that algorithms don't need to handle complex +/// edge cases. #[derive(Debug, Clone, PartialEq, Eq, EnumAsInner)] -pub(super) enum KeyWithSentinel { +pub enum Sentinelled { Smallest, Normal(T), Largest, } -impl KeyWithSentinel { +impl Sentinelled { pub fn as_normal_expect(&self) -> &T { self.as_normal().expect("expect normal key") } @@ -33,7 +39,7 @@ impl KeyWithSentinel { } } -impl PartialOrd for KeyWithSentinel +impl PartialOrd for Sentinelled where T: Ord, { @@ -42,12 +48,12 @@ where } } -impl Ord for KeyWithSentinel +impl Ord for Sentinelled where T: Ord, { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - use KeyWithSentinel::*; + use Sentinelled::*; match (self, other) { (Smallest, Smallest) => std::cmp::Ordering::Equal, (Smallest, _) => std::cmp::Ordering::Less, @@ -60,7 +66,7 @@ where } } -impl EstimateSize for KeyWithSentinel { +impl EstimateSize for Sentinelled { fn estimated_heap_size(&self) -> usize { match self { Self::Smallest => 0, @@ -69,9 +75,3 @@ impl EstimateSize for KeyWithSentinel { } } } - -impl From for KeyWithSentinel { - fn from(key: StateKey) -> Self { - Self::Normal(key) - } -} diff --git a/src/expr/core/src/window_function/state/mod.rs b/src/expr/core/src/window_function/state/mod.rs index c6ff4467b4787..b56ecda36de5c 100644 --- a/src/expr/core/src/window_function/state/mod.rs +++ b/src/expr/core/src/window_function/state/mod.rs @@ -17,7 +17,7 @@ use std::collections::BTreeSet; use itertools::Itertools; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{Datum, DefaultOrdered}; +use risingwave_common::types::{Datum, DefaultOrdered, Sentinelled}; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use smallvec::SmallVec; @@ -36,6 +36,12 @@ pub struct StateKey { pub pk: DefaultOrdered, } +impl From for Sentinelled { + fn from(key: StateKey) -> Self { + Self::Normal(key) + } +} + #[derive(Debug)] pub struct StatePos<'a> { /// Only 2 cases in which the `key` is `None`: diff --git a/src/expr/core/src/window_function/state/rank.rs b/src/expr/core/src/window_function/state/rank.rs index 61ed4a3af8645..76547ce71db03 100644 --- a/src/expr/core/src/window_function/state/rank.rs +++ b/src/expr/core/src/window_function/state/rank.rs @@ -14,7 +14,7 @@ use std::marker::PhantomData; -use risingwave_common::estimate_size::collections::VecDeque; +use risingwave_common::estimate_size::collections::EstimatedVecDeque; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::Datum; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; @@ -111,7 +111,7 @@ pub struct RankState { /// First state key of the partition. first_key: Option, /// State keys that are waiting to be outputted. - buffer: VecDeque, + buffer: EstimatedVecDeque, /// Function-specific state. func_state: RF, _phantom: PhantomData, diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index aaba296832490..d3a035fa8b594 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -22,6 +22,7 @@ async-trait = "0.1" auto_enums = "0.8" await-tree = { workspace = true } bytes = "1" +delta_btree_map = { path = "../utils/delta_btree_map" } educe = "0.5" either = "1" enum-as-inner = "0.6" diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 1fdde9083e15a..541b971a23b25 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -22,7 +22,7 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::collections::hashmap::EstimatedHashMap; +use risingwave_common::estimate_size::collections::EstimatedHashMap; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; use risingwave_common::types::ScalarImpl; diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 4e0f087de26be..9eb35887df7eb 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{ArrayRef, Op, StreamChunk}; use risingwave_common::catalog::Schema; -use risingwave_common::estimate_size::collections::VecDeque; +use risingwave_common::estimate_size::collections::EstimatedVecDeque; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{ToDatumRef, ToOwnedDatum}; @@ -46,7 +46,7 @@ use crate::task::AtomicU64Ref; struct Partition { states: WindowStates, - curr_row_buffer: VecDeque, + curr_row_buffer: EstimatedVecDeque, } impl EstimateSize for Partition { diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 165da65dd8979..8297d51a525c4 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -17,6 +17,7 @@ use std::marker::PhantomData; use std::ops::RangeInclusive; use std::sync::Arc; +use delta_btree_map::{Change, PositionType}; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; @@ -33,7 +34,6 @@ use risingwave_expr::window_function::{ }; use risingwave_storage::StateStore; -use super::delta_btree_map::Change; use super::over_partition::{ new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache, PartitionDelta, @@ -42,7 +42,6 @@ use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::StreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; -use crate::executor::over_window::delta_btree_map::PositionType; use crate::executor::test_utils::prelude::StateTable; use crate::executor::{ expect_first_barrier, ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message, diff --git a/src/stream/src/executor/over_window/mod.rs b/src/stream/src/executor/over_window/mod.rs index 7a41bc87da060..8c16e89721d95 100644 --- a/src/stream/src/executor/over_window/mod.rs +++ b/src/stream/src/executor/over_window/mod.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod delta_btree_map; mod eowc; -mod estimated_btree_map; mod general; mod over_partition; -mod sentinel; pub use eowc::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; pub use general::{OverWindowExecutor, OverWindowExecutorArgs}; diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 1fd860d5d4c5f..287110ed354ad 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -19,23 +19,22 @@ use std::collections::{BTreeMap, VecDeque}; use std::marker::PhantomData; use std::ops::{Bound, RangeInclusive}; +use delta_btree_map::{Change, DeltaBTreeMap}; use futures_async_stream::for_await; use risingwave_common::array::stream_record::Record; +use risingwave_common::estimate_size::collections::EstimatedBTreeMap; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; +use risingwave_common::types::Sentinelled; use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; -use super::delta_btree_map::Change; -use super::estimated_btree_map::EstimatedBTreeMap; use super::general::RowConverter; -use super::sentinel::KeyWithSentinel; -use crate::executor::over_window::delta_btree_map::DeltaBTreeMap; use crate::executor::test_utils::prelude::StateTable; use crate::executor::StreamExecutorResult; -pub(super) type CacheKey = KeyWithSentinel; +pub(super) type CacheKey = Sentinelled; /// Range cache for one over window partition. /// The cache entries can be: @@ -1198,7 +1197,7 @@ mod find_affected_ranges_tests { #[test] fn test_empty_with_sentinels() { - let cache: BTreeMap, OwnedRow> = create_cache!(..., , ...); + let cache: BTreeMap, OwnedRow> = create_cache!(..., , ...); let delta = create_delta!((1, Insert), (2, Insert)); { diff --git a/src/utils/delta_btree_map/Cargo.toml b/src/utils/delta_btree_map/Cargo.toml new file mode 100644 index 0000000000000..8ae7245b2741d --- /dev/null +++ b/src/utils/delta_btree_map/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "delta_btree_map" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +enum-as-inner = "0.6" + +[lints] +workspace = true diff --git a/src/stream/src/executor/over_window/delta_btree_map.rs b/src/utils/delta_btree_map/src/lib.rs similarity index 99% rename from src/stream/src/executor/over_window/delta_btree_map.rs rename to src/utils/delta_btree_map/src/lib.rs index 4203eeb4958fc..a195d67802cb5 100644 --- a/src/stream/src/executor/over_window/delta_btree_map.rs +++ b/src/utils/delta_btree_map/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(btree_cursors)] + use std::cmp::Ordering; use std::collections::BTreeMap; use std::ops::Bound; @@ -21,13 +23,13 @@ use enum_as_inner::EnumAsInner; /// [`DeltaBTreeMap`] wraps two [`BTreeMap`] references respectively as snapshot and delta, /// providing cursor that can iterate over the updated version of the snapshot. #[derive(Debug, Clone, Copy)] -pub(super) struct DeltaBTreeMap<'a, K: Ord, V> { +pub struct DeltaBTreeMap<'a, K: Ord, V> { snapshot: &'a BTreeMap, delta: &'a BTreeMap>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)] -pub(super) enum Change { +pub enum Change { Insert(V), Delete, } @@ -144,7 +146,7 @@ impl<'a, K: Ord, V> DeltaBTreeMap<'a, K, V> { /// Cursor that can iterate back and forth over the updated version of the snapshot. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(super) struct CursorWithDelta<'a, K: Ord, V> { +pub struct CursorWithDelta<'a, K: Ord, V> { snapshot: &'a BTreeMap, delta: &'a BTreeMap>, curr_key_value: Option<(&'a K, &'a V)>, @@ -153,7 +155,7 @@ pub(super) struct CursorWithDelta<'a, K: Ord, V> { /// Type of cursor position. [`PositionType::Ghost`] is a special position between the first and /// the last item, where the key and value are `None`. #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)] -pub(super) enum PositionType { +pub enum PositionType { Ghost, Snapshot, DeltaUpdate, @@ -191,7 +193,6 @@ impl<'a, K: Ord, V> CursorWithDelta<'a, K, V> { } /// Get the value pointed by the cursor. - #[cfg_attr(not(test), expect(dead_code))] pub fn value(&self) -> Option<&'a V> { self.curr_key_value.map(|(_, v)| v) }