Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(common): move some over window utils to common #14204

Merged
merged 10 commits into from
Dec 27, 2023
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ members = [
"src/utils/sync-point",
"src/utils/variables",
"src/utils/with_options",
"src/utils/delta_btree_map",
"src/utils/workspace-config",
"src/workspace-hack",
]
Expand Down Expand Up @@ -136,7 +137,9 @@ arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
# Use a forked version which removes the dependencies on dynamo db to reduce
# compile time and binary size.
deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = ["s3-no-concurrent-write"] }
deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = [
"s3-no-concurrent-write",
] }
parquet = "49"
thiserror-ext = "0.0.10"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::BTreeMap;
use std::ops::{Bound, RangeInclusive};

use risingwave_common::estimate_size::{EstimateSize, KvSize};
use crate::estimate_size::{EstimateSize, KvSize};

pub struct EstimatedBTreeMap<K, V> {
inner: BTreeMap<K, V>,
Expand Down Expand Up @@ -77,7 +77,6 @@ where
}
}

#[expect(dead_code)]
pub fn clear(&mut self) {
self.inner.clear();
self.heap_size.set(0);
Expand Down
6 changes: 4 additions & 2 deletions src/common/src/estimate_size/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ mod heap;
pub mod lru;
pub use heap::*;
pub mod vecdeque;
pub use vecdeque::EstimatedVecDeque as VecDeque;
pub use vecdeque::EstimatedVecDeque;
pub mod hashmap;
pub use hashmap::EstimatedHashMap as HashMap;
pub use hashmap::EstimatedHashMap;
pub mod btreemap;
pub use btreemap::EstimatedBTreeMap;

mod private {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#![feature(negative_impls)]
#![feature(bound_map)]
#![feature(array_methods)]
#![feature(btree_cursors)]
stdrc marked this conversation as resolved.
Show resolved Hide resolved

#[cfg_attr(not(test), expect(unused_extern_crates))]
extern crate self as risingwave_common;
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mod ordered;
mod ordered_float;
mod postgres_type;
mod scalar_impl;
mod sentinel;
mod serial;
mod struct_type;
mod successor;
Expand All @@ -81,6 +82,7 @@ pub use self::ops::{CheckedAdd, IsNegative};
pub use self::ordered::*;
pub use self::ordered_float::{FloatExt, IntoOrdered};
pub use self::scalar_impl::*;
pub use self::sentinel::Sentinelled;
pub use self::serial::Serial;
pub use self::struct_type::StructType;
pub use self::successor::Successor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@
// limitations under the License.

use enum_as_inner::EnumAsInner;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_expr::window_function::StateKey;

use crate::estimate_size::EstimateSize;

/// [`Sentinelled<T>`] wraps type `T` to provide smallest (smaller than any normal `T` value) and largest
/// (larger than ant normal `T` value) sentinel value for `T`.
///
/// Sentinel is a very common technique used to simplify tree/list/array algorithms. The main idea is to
/// insert sentinel node to the beginning or/and the end, so that algorithms don't need to handle complex
/// edge cases.
#[derive(Debug, Clone, PartialEq, Eq, EnumAsInner)]
pub(super) enum KeyWithSentinel<T> {
pub enum Sentinelled<T> {
xxchan marked this conversation as resolved.
Show resolved Hide resolved
Smallest,
Normal(T),
Largest,
}

impl<T> KeyWithSentinel<T> {
impl<T> Sentinelled<T> {
pub fn as_normal_expect(&self) -> &T {
self.as_normal().expect("expect normal key")
}
Expand All @@ -33,7 +39,7 @@ impl<T> KeyWithSentinel<T> {
}
}

impl<T> PartialOrd for KeyWithSentinel<T>
impl<T> PartialOrd for Sentinelled<T>
where
T: Ord,
{
Expand All @@ -42,12 +48,12 @@ where
}
}

impl<T> Ord for KeyWithSentinel<T>
impl<T> Ord for Sentinelled<T>
where
T: Ord,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use KeyWithSentinel::*;
use Sentinelled::*;
match (self, other) {
(Smallest, Smallest) => std::cmp::Ordering::Equal,
(Smallest, _) => std::cmp::Ordering::Less,
Expand All @@ -60,7 +66,7 @@ where
}
}

impl<T: EstimateSize> EstimateSize for KeyWithSentinel<T> {
impl<T: EstimateSize> EstimateSize for Sentinelled<T> {
fn estimated_heap_size(&self) -> usize {
match self {
Self::Smallest => 0,
Expand All @@ -69,9 +75,3 @@ impl<T: EstimateSize> EstimateSize for KeyWithSentinel<T> {
}
}
}

impl From<StateKey> for KeyWithSentinel<StateKey> {
fn from(key: StateKey) -> Self {
Self::Normal(key)
}
}
8 changes: 7 additions & 1 deletion src/expr/core/src/window_function/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::BTreeSet;
use itertools::Itertools;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Datum, DefaultOrdered};
use risingwave_common::types::{Datum, DefaultOrdered, Sentinelled};
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
use smallvec::SmallVec;

Expand All @@ -36,6 +36,12 @@ pub struct StateKey {
pub pk: DefaultOrdered<OwnedRow>,
}

impl From<StateKey> for Sentinelled<StateKey> {
fn from(key: StateKey) -> Self {
Self::Normal(key)
}
}

#[derive(Debug)]
pub struct StatePos<'a> {
/// Only 2 cases in which the `key` is `None`:
Expand Down
4 changes: 2 additions & 2 deletions src/expr/core/src/window_function/state/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::marker::PhantomData;

use risingwave_common::estimate_size::collections::VecDeque;
use risingwave_common::estimate_size::collections::EstimatedVecDeque;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::types::Datum;
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct RankState<RF: RankFuncCount> {
/// First state key of the partition.
first_key: Option<StateKey>,
/// State keys that are waiting to be outputted.
buffer: VecDeque<StateKey>,
buffer: EstimatedVecDeque<StateKey>,
/// Function-specific state.
func_state: RF,
_phantom: PhantomData<RF>,
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = "0.1"
auto_enums = "0.8"
await-tree = { workspace = true }
bytes = "1"
delta_btree_map = { path = "../utils/delta_btree_map" }
educe = "0.5"
either = "1"
enum-as-inner = "0.6"
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::collections::hashmap::EstimatedHashMap;
use risingwave_common::estimate_size::collections::EstimatedHashMap;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
use risingwave_common::types::ScalarImpl;
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/over_window/eowc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayRef, Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::collections::VecDeque;
use risingwave_common::estimate_size::collections::EstimatedVecDeque;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{ToDatumRef, ToOwnedDatum};
Expand All @@ -46,7 +46,7 @@ use crate::task::AtomicU64Ref;

struct Partition {
states: WindowStates,
curr_row_buffer: VecDeque<OwnedRow>,
curr_row_buffer: EstimatedVecDeque<OwnedRow>,
}

impl EstimateSize for Partition {
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::sync::Arc;

use delta_btree_map::{Change, PositionType};
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -33,7 +34,6 @@ use risingwave_expr::window_function::{
};
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
use super::over_partition::{
new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
PartitionDelta,
Expand All @@ -42,7 +42,6 @@ use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::StreamChunkBuilder;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::over_window::delta_btree_map::PositionType;
use crate::executor::test_utils::prelude::StateTable;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message,
Expand Down
3 changes: 0 additions & 3 deletions src/stream/src/executor/over_window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod delta_btree_map;
mod eowc;
mod estimated_btree_map;
mod general;
mod over_partition;
mod sentinel;

pub use eowc::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs};
pub use general::{OverWindowExecutor, OverWindowExecutorArgs};
11 changes: 5 additions & 6 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ use std::collections::{BTreeMap, VecDeque};
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use delta_btree_map::{Change, DeltaBTreeMap};
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::estimate_size::collections::EstimatedBTreeMap;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::Sentinelled;
use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
use super::estimated_btree_map::EstimatedBTreeMap;
use super::general::RowConverter;
use super::sentinel::KeyWithSentinel;
use crate::executor::over_window::delta_btree_map::DeltaBTreeMap;
use crate::executor::test_utils::prelude::StateTable;
use crate::executor::StreamExecutorResult;

pub(super) type CacheKey = KeyWithSentinel<StateKey>;
pub(super) type CacheKey = Sentinelled<StateKey>;

/// Range cache for one over window partition.
/// The cache entries can be:
Expand Down Expand Up @@ -1198,7 +1197,7 @@ mod find_affected_ranges_tests {

#[test]
fn test_empty_with_sentinels() {
let cache: BTreeMap<KeyWithSentinel<StateKey>, OwnedRow> = create_cache!(..., , ...);
let cache: BTreeMap<Sentinelled<StateKey>, OwnedRow> = create_cache!(..., , ...);
let delta = create_delta!((1, Insert), (2, Insert));

{
Expand Down
21 changes: 21 additions & 0 deletions src/utils/delta_btree_map/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "delta_btree_map"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As our convention, the name shall be risingwave_delta_btree_map.

version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
enum-as-inner = "0.6"

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(btree_cursors)]
stdrc marked this conversation as resolved.
Show resolved Hide resolved

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Bound;
Expand All @@ -21,13 +23,13 @@ use enum_as_inner::EnumAsInner;
/// [`DeltaBTreeMap`] wraps two [`BTreeMap`] references respectively as snapshot and delta,
/// providing cursor that can iterate over the updated version of the snapshot.
#[derive(Debug, Clone, Copy)]
pub(super) struct DeltaBTreeMap<'a, K: Ord, V> {
pub struct DeltaBTreeMap<'a, K: Ord, V> {
snapshot: &'a BTreeMap<K, V>,
delta: &'a BTreeMap<K, Change<V>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)]
pub(super) enum Change<V> {
pub enum Change<V> {
Insert(V),
Delete,
}
Expand Down Expand Up @@ -144,7 +146,7 @@ impl<'a, K: Ord, V> DeltaBTreeMap<'a, K, V> {

/// Cursor that can iterate back and forth over the updated version of the snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct CursorWithDelta<'a, K: Ord, V> {
pub struct CursorWithDelta<'a, K: Ord, V> {
snapshot: &'a BTreeMap<K, V>,
delta: &'a BTreeMap<K, Change<V>>,
curr_key_value: Option<(&'a K, &'a V)>,
Expand All @@ -153,7 +155,7 @@ pub(super) struct CursorWithDelta<'a, K: Ord, V> {
/// Type of cursor position. [`PositionType::Ghost`] is a special position between the first and
/// the last item, where the key and value are `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumAsInner)]
pub(super) enum PositionType {
pub enum PositionType {
Ghost,
Snapshot,
DeltaUpdate,
Expand Down Expand Up @@ -191,7 +193,6 @@ impl<'a, K: Ord, V> CursorWithDelta<'a, K, V> {
}

/// Get the value pointed by the cursor.
#[cfg_attr(not(test), expect(dead_code))]
pub fn value(&self) -> Option<&'a V> {
self.curr_key_value.map(|(_, v)| v)
}
Expand Down
Loading