Skip to content

Commit

Permalink
refactor(common): move some over window utils to common (#14204)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Dec 27, 2023
1 parent ec7bf4b commit d27d6c1
Show file tree
Hide file tree
Showing 17 changed files with 80 additions and 41 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ members = [
"src/tests/simulation",
"src/tests/sqlsmith",
"src/tests/state_cleaning_test",
"src/utils/delta_btree_map",
"src/utils/local_stats_alloc",
"src/utils/pgwire",
"src/utils/runtime",
Expand Down Expand Up @@ -136,7 +137,9 @@ arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
# Use a forked version which removes the dependencies on dynamo db to reduce
# compile time and binary size.
deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = ["s3-no-concurrent-write"] }
deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = [
"s3-no-concurrent-write",
] }
parquet = "49"
thiserror-ext = "0.0.10"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::BTreeMap;
use std::ops::{Bound, RangeInclusive};

use risingwave_common::estimate_size::{EstimateSize, KvSize};
use crate::estimate_size::{EstimateSize, KvSize};

pub struct EstimatedBTreeMap<K, V> {
inner: BTreeMap<K, V>,
Expand Down Expand Up @@ -77,7 +77,6 @@ where
}
}

#[expect(dead_code)]
pub fn clear(&mut self) {
self.inner.clear();
self.heap_size.set(0);
Expand Down
6 changes: 4 additions & 2 deletions src/common/src/estimate_size/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ mod heap;
pub mod lru;
pub use heap::*;
pub mod vecdeque;
pub use vecdeque::EstimatedVecDeque as VecDeque;
pub use vecdeque::EstimatedVecDeque;
pub mod hashmap;
pub use hashmap::EstimatedHashMap as HashMap;
pub use hashmap::EstimatedHashMap;
pub mod btreemap;
pub use btreemap::EstimatedBTreeMap;

mod private {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#![feature(negative_impls)]
#![feature(bound_map)]
#![feature(array_methods)]
#![feature(btree_cursors)]

#[cfg_attr(not(test), expect(unused_extern_crates))]
extern crate self as risingwave_common;
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mod ordered;
mod ordered_float;
mod postgres_type;
mod scalar_impl;
mod sentinel;
mod serial;
mod struct_type;
mod successor;
Expand All @@ -81,6 +82,7 @@ pub use self::ops::{CheckedAdd, IsNegative};
pub use self::ordered::*;
pub use self::ordered_float::{FloatExt, IntoOrdered};
pub use self::scalar_impl::*;
pub use self::sentinel::Sentinelled;
pub use self::serial::Serial;
pub use self::struct_type::StructType;
pub use self::successor::Successor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@
// limitations under the License.

use enum_as_inner::EnumAsInner;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_expr::window_function::StateKey;

use crate::estimate_size::EstimateSize;

/// [`Sentinelled<T>`] wraps type `T` to provide smallest (smaller than any normal `T` value) and largest
/// (larger than any normal `T` value) sentinel value for `T`.
///
/// Sentinel is a very common technique used to simplify tree/list/array algorithms. The main idea is to
/// insert sentinel nodes at the beginning and/or the end, so that algorithms don't need to handle complex
/// edge cases.
#[derive(Debug, Clone, PartialEq, Eq, EnumAsInner)]
pub(super) enum KeyWithSentinel<T> {
pub enum Sentinelled<T> {
Smallest,
Normal(T),
Largest,
}

impl<T> KeyWithSentinel<T> {
impl<T> Sentinelled<T> {
/// Borrows the value wrapped by this entry.
///
/// # Panics
///
/// Panics with "expect normal key" when `as_normal()` yields `None`.
pub fn as_normal_expect(&self) -> &T {
    let normal = self.as_normal();
    normal.expect("expect normal key")
}
Expand All @@ -33,7 +39,7 @@ impl<T> KeyWithSentinel<T> {
}
}

impl<T> PartialOrd for KeyWithSentinel<T>
impl<T> PartialOrd for Sentinelled<T>
where
T: Ord,
{
Expand All @@ -42,12 +48,12 @@ where
}
}

impl<T> Ord for KeyWithSentinel<T>
impl<T> Ord for Sentinelled<T>
where
T: Ord,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use KeyWithSentinel::*;
use Sentinelled::*;
match (self, other) {
(Smallest, Smallest) => std::cmp::Ordering::Equal,
(Smallest, _) => std::cmp::Ordering::Less,
Expand All @@ -60,7 +66,7 @@ where
}
}

impl<T: EstimateSize> EstimateSize for KeyWithSentinel<T> {
impl<T: EstimateSize> EstimateSize for Sentinelled<T> {
fn estimated_heap_size(&self) -> usize {
match self {
Self::Smallest => 0,
Expand All @@ -69,9 +75,3 @@ impl<T: EstimateSize> EstimateSize for KeyWithSentinel<T> {
}
}
}

impl From<StateKey> for KeyWithSentinel<StateKey> {
fn from(key: StateKey) -> Self {
Self::Normal(key)
}
}
8 changes: 7 additions & 1 deletion src/expr/core/src/window_function/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::BTreeSet;
use itertools::Itertools;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Datum, DefaultOrdered};
use risingwave_common::types::{Datum, DefaultOrdered, Sentinelled};
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
use smallvec::SmallVec;

Expand All @@ -36,6 +36,12 @@ pub struct StateKey {
pub pk: DefaultOrdered<OwnedRow>,
}

impl From<StateKey> for Sentinelled<StateKey> {
fn from(key: StateKey) -> Self {
Self::Normal(key)
}
}

#[derive(Debug)]
pub struct StatePos<'a> {
/// Only 2 cases in which the `key` is `None`:
Expand Down
4 changes: 2 additions & 2 deletions src/expr/core/src/window_function/state/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::marker::PhantomData;

use risingwave_common::estimate_size::collections::VecDeque;
use risingwave_common::estimate_size::collections::EstimatedVecDeque;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::types::Datum;
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct RankState<RF: RankFuncCount> {
/// First state key of the partition.
first_key: Option<StateKey>,
/// State keys that are waiting to be outputted.
buffer: VecDeque<StateKey>,
buffer: EstimatedVecDeque<StateKey>,
/// Function-specific state.
func_state: RF,
_phantom: PhantomData<RF>,
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = "0.1"
auto_enums = "0.8"
await-tree = { workspace = true }
bytes = "1"
delta_btree_map = { path = "../utils/delta_btree_map" }
educe = "0.5"
either = "1"
enum-as-inner = "0.6"
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::collections::hashmap::EstimatedHashMap;
use risingwave_common::estimate_size::collections::EstimatedHashMap;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
use risingwave_common::types::ScalarImpl;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/over_window/eowc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayRef, Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::collections::VecDeque;
use risingwave_common::estimate_size::collections::EstimatedVecDeque;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{ToDatumRef, ToOwnedDatum};
Expand All @@ -46,7 +46,7 @@ use crate::task::AtomicU64Ref;

struct Partition {
states: WindowStates,
curr_row_buffer: VecDeque<OwnedRow>,
curr_row_buffer: EstimatedVecDeque<OwnedRow>,
}

impl EstimateSize for Partition {
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::sync::Arc;

use delta_btree_map::{Change, PositionType};
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -33,7 +34,6 @@ use risingwave_expr::window_function::{
};
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
use super::over_partition::{
new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
PartitionDelta,
Expand All @@ -42,7 +42,6 @@ use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::StreamChunkBuilder;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::over_window::delta_btree_map::PositionType;
use crate::executor::test_utils::prelude::StateTable;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message,
Expand Down
3 changes: 0 additions & 3 deletions src/stream/src/executor/over_window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod delta_btree_map;
mod eowc;
mod estimated_btree_map;
mod general;
mod over_partition;
mod sentinel;

pub use eowc::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs};
pub use general::{OverWindowExecutor, OverWindowExecutorArgs};
11 changes: 5 additions & 6 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ use std::collections::{BTreeMap, VecDeque};
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use delta_btree_map::{Change, DeltaBTreeMap};
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::estimate_size::collections::EstimatedBTreeMap;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::Sentinelled;
use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
use super::estimated_btree_map::EstimatedBTreeMap;
use super::general::RowConverter;
use super::sentinel::KeyWithSentinel;
use crate::executor::over_window::delta_btree_map::DeltaBTreeMap;
use crate::executor::test_utils::prelude::StateTable;
use crate::executor::StreamExecutorResult;

pub(super) type CacheKey = KeyWithSentinel<StateKey>;
pub(super) type CacheKey = Sentinelled<StateKey>;

/// Range cache for one over window partition.
/// The cache entries can be:
Expand Down Expand Up @@ -1198,7 +1197,7 @@ mod find_affected_ranges_tests {

#[test]
fn test_empty_with_sentinels() {
let cache: BTreeMap<KeyWithSentinel<StateKey>, OwnedRow> = create_cache!(..., , ...);
let cache: BTreeMap<Sentinelled<StateKey>, OwnedRow> = create_cache!(..., , ...);
let delta = create_delta!((1, Insert), (2, Insert));

{
Expand Down
21 changes: 21 additions & 0 deletions src/utils/delta_btree_map/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "delta_btree_map"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
enum-as-inner = "0.6"

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(btree_cursors)]

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Bound;
Expand All @@ -21,13 +23,13 @@ use enum_as_inner::EnumAsInner;
/// [`DeltaBTreeMap`] wraps two [`BTreeMap`] references respectively as snapshot and delta,
/// providing cursor that can iterate over the updated version of the snapshot.
#[derive(Debug, Clone, Copy)]
pub(super) struct DeltaBTreeMap<'a, K: Ord, V> {
pub struct DeltaBTreeMap<'a, K: Ord, V> {
snapshot: &'a BTreeMap<K, V>,
delta: &'a BTreeMap<K, Change<V>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)]
pub(super) enum Change<V> {
pub enum Change<V> {
Insert(V),
Delete,
}
Expand Down Expand Up @@ -144,7 +146,7 @@ impl<'a, K: Ord, V> DeltaBTreeMap<'a, K, V> {

/// Cursor that can iterate back and forth over the updated version of the snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct CursorWithDelta<'a, K: Ord, V> {
pub struct CursorWithDelta<'a, K: Ord, V> {
snapshot: &'a BTreeMap<K, V>,
delta: &'a BTreeMap<K, Change<V>>,
curr_key_value: Option<(&'a K, &'a V)>,
Expand All @@ -153,7 +155,7 @@ pub(super) struct CursorWithDelta<'a, K: Ord, V> {
/// Type of cursor position. [`PositionType::Ghost`] is a special position between the first and
/// the last item, where the key and value are `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)]
pub(super) enum PositionType {
pub enum PositionType {
Ghost,
Snapshot,
DeltaUpdate,
Expand Down Expand Up @@ -191,7 +193,6 @@ impl<'a, K: Ord, V> CursorWithDelta<'a, K, V> {
}

/// Get the value pointed by the cursor.
#[cfg_attr(not(test), expect(dead_code))]
pub fn value(&self) -> Option<&'a V> {
    // `None` when the cursor currently holds no key-value pair.
    let (_, val) = self.curr_key_value?;
    Some(val)
}
Expand Down

0 comments on commit d27d6c1

Please sign in to comment.