From 8a056148650dbe7bd8202af33415da6ee1eecca9 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 31 May 2024 15:50:39 +0800 Subject: [PATCH 1/3] move window state impls to `expr_impl` Signed-off-by: Richard Chien --- Cargo.lock | 2 + .../{state/mod.rs => state.rs} | 29 +------------ src/expr/impl/Cargo.toml | 2 + src/expr/impl/src/lib.rs | 1 + .../src/window_function}/aggregate.rs | 16 +++---- .../src/window_function}/buffer.rs | 8 ++-- src/expr/impl/src/window_function/mod.rs | 42 +++++++++++++++++++ .../src/window_function}/range_utils.rs | 0 .../src/window_function}/rank.rs | 15 +++---- 9 files changed, 69 insertions(+), 46 deletions(-) rename src/expr/core/src/window_function/{state/mod.rs => state.rs} (81%) rename src/expr/{core/src/window_function/state => impl/src/window_function}/aggregate.rs (96%) rename src/expr/{core/src/window_function/state => impl/src/window_function}/buffer.rs (99%) create mode 100644 src/expr/impl/src/window_function/mod.rs rename src/expr/{core/src/window_function/state => impl/src/window_function}/range_utils.rs (100%) rename src/expr/{core/src/window_function/state => impl/src/window_function}/rank.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index b34a432d55762..13fe9b52c10bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10814,6 +10814,7 @@ dependencies = [ "chrono", "chrono-tz 0.9.0", "criterion", + "educe 0.5.7", "expect-test", "fancy-regex", "futures-async-stream", @@ -10841,6 +10842,7 @@ dependencies = [ "serde_json", "sha1", "sha2", + "smallvec", "sql-json-path", "thiserror", "thiserror-ext", diff --git a/src/expr/core/src/window_function/state/mod.rs b/src/expr/core/src/window_function/state.rs similarity index 81% rename from src/expr/core/src/window_function/state/mod.rs rename to src/expr/core/src/window_function/state.rs index 6a86667bfcafc..e25e450a117a0 100644 --- a/src/expr/core/src/window_function/state/mod.rs +++ b/src/expr/core/src/window_function/state.rs @@ -14,20 +14,13 @@ use std::collections::BTreeSet; -use itertools::Itertools; use risingwave_common::row::OwnedRow; use risingwave_common::types::{Datum, DefaultOrdered}; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; -use super::{WindowFuncCall, WindowFuncKind}; -use crate::{ExprError, Result}; - -mod aggregate; -mod buffer; -mod range_utils; -mod rank; +use crate::Result; /// Unique and ordered identifier for a row in internal states. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)] @@ -109,23 +102,3 @@ pub trait WindowState: EstimateSize { } pub type BoxedWindowState = Box; - -pub fn create_window_state(call: &WindowFuncCall) -> Result { - assert!(call.frame.bounds.validate().is_ok()); - - use WindowFuncKind::*; - Ok(match call.kind { - RowNumber => Box::new(rank::RankState::::new(call)), - Rank => Box::new(rank::RankState::::new(call)), - DenseRank => Box::new(rank::RankState::::new(call)), - Aggregate(_) => aggregate::new(call)?, - kind => { - return Err(ExprError::UnsupportedFunction(format!( - "{}({}) -> {}", - kind, - call.args.arg_types().iter().format(", "), - &call.return_type, - ))); - } - }) -} diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 292e631345fad..9e94e3395e761 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -40,6 +40,7 @@ chrono = { version = "0.4", default-features = false, features = [ "std", ] } chrono-tz = { version = "0.9", features = ["case-insensitive"] } +educe = "0.5" fancy-regex = "0.13" futures-async-stream = { workspace = true } futures-util = "0.3" @@ -65,6 +66,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" sha1 = "0.10" sha2 = "0.10" +smallvec = "1" sql-json-path = { version = "0.1", features = ["jsonbb"] } thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index 50012668b001e..56bdbe3b81100 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -37,6 +37,7 @@ mod aggregate; mod scalar; mod table_function; mod udf; +mod window_function; /// Enable functions in this crate. #[macro_export] diff --git a/src/expr/core/src/window_function/state/aggregate.rs b/src/expr/impl/src/window_function/aggregate.rs similarity index 96% rename from src/expr/core/src/window_function/state/aggregate.rs rename to src/expr/impl/src/window_function/aggregate.rs index 286dc1e17174c..7710a1b7adb2d 100644 --- a/src/expr/core/src/window_function/state/aggregate.rs +++ b/src/expr/impl/src/window_function/aggregate.rs @@ -20,16 +20,18 @@ use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::{bail, must_match}; use risingwave_common_estimate_size::{EstimateSize, KvSize}; +use risingwave_expr::aggregate::{ + AggCall, AggregateFunction, AggregateState as AggImplState, BoxedAggregateFunction, +}; +use risingwave_expr::sig::FUNCTION_REGISTRY; +use risingwave_expr::window_function::{ + BoxedWindowState, FrameBounds, StateEvictHint, StateKey, StatePos, WindowFuncCall, + WindowFuncKind, WindowState, +}; +use risingwave_expr::Result; use smallvec::SmallVec; use super::buffer::{RangeWindow, RowsWindow, WindowBuffer, WindowImpl}; -use super::{BoxedWindowState, StateEvictHint, StateKey, StatePos, WindowState}; -use crate::aggregate::{ - AggCall, AggregateFunction, AggregateState as AggImplState, BoxedAggregateFunction, -}; -use crate::sig::FUNCTION_REGISTRY; -use crate::window_function::{FrameBounds, WindowFuncCall, WindowFuncKind}; -use crate::Result; type StateValue = SmallVec<[Datum; 2]>; diff --git a/src/expr/core/src/window_function/state/buffer.rs b/src/expr/impl/src/window_function/buffer.rs similarity index 99% rename from src/expr/core/src/window_function/state/buffer.rs rename to src/expr/impl/src/window_function/buffer.rs index ba064042c0021..53c5a4de89ce4 100644 --- a/src/expr/core/src/window_function/state/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -19,11 +19,11 @@ use educe::Educe; use risingwave_common::array::Op; use risingwave_common::types::Sentinelled; use risingwave_common::util::memcmp_encoding; +use risingwave_expr::window_function::{ + FrameExclusion, RangeFrameBounds, RowsFrameBounds, StateKey, +}; -use super::range_utils::range_except; -use super::StateKey; -use crate::window_function::state::range_utils::range_diff; -use crate::window_function::{FrameExclusion, RangeFrameBounds, RowsFrameBounds}; +use super::range_utils::{range_diff, range_except}; /// A common sliding window buffer. pub(super) struct WindowBuffer { diff --git a/src/expr/impl/src/window_function/mod.rs b/src/expr/impl/src/window_function/mod.rs new file mode 100644 index 0000000000000..138f08e67b2b8 --- /dev/null +++ b/src/expr/impl/src/window_function/mod.rs @@ -0,0 +1,42 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_expr::window_function::{BoxedWindowState, WindowFuncCall, WindowFuncKind}; +use risingwave_expr::{ExprError, Result}; + +mod aggregate; +mod buffer; +mod range_utils; +mod rank; + +pub fn create_window_state(call: &WindowFuncCall) -> Result { + assert!(call.frame.bounds.validate().is_ok()); + + use WindowFuncKind::*; + Ok(match call.kind { + RowNumber => Box::new(rank::RankState::::new(call)), + Rank => Box::new(rank::RankState::::new(call)), + DenseRank => Box::new(rank::RankState::::new(call)), + Aggregate(_) => aggregate::new(call)?, + kind => { + return Err(ExprError::UnsupportedFunction(format!( + "{}({}) -> {}", + kind, + call.args.arg_types().iter().format(", "), + &call.return_type, + ))); + } + }) +} diff --git a/src/expr/core/src/window_function/state/range_utils.rs b/src/expr/impl/src/window_function/range_utils.rs similarity index 100% rename from src/expr/core/src/window_function/state/range_utils.rs rename to src/expr/impl/src/window_function/range_utils.rs diff --git a/src/expr/core/src/window_function/state/rank.rs b/src/expr/impl/src/window_function/rank.rs similarity index 97% rename from src/expr/core/src/window_function/state/rank.rs rename to src/expr/impl/src/window_function/rank.rs index 83907fadcc78b..2078ec2514a9e 100644 --- a/src/expr/core/src/window_function/state/rank.rs +++ b/src/expr/impl/src/window_function/rank.rs @@ -18,12 +18,13 @@ use risingwave_common::types::Datum; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common_estimate_size::collections::EstimatedVecDeque; use risingwave_common_estimate_size::EstimateSize; +use risingwave_expr::window_function::{ + StateEvictHint, StateKey, StatePos, WindowFuncCall, WindowState, +}; +use risingwave_expr::Result; use smallvec::SmallVec; use self::private::RankFuncCount; -use super::{StateEvictHint, StateKey, StatePos, WindowState}; -use crate::window_function::WindowFuncCall; -use crate::Result; mod private { use super::*; @@ -34,7 +35,7 @@ mod private { } #[derive(Default, EstimateSize)] -pub struct RowNumber { +pub(super) struct RowNumber { prev_rank: i64, } @@ -47,7 +48,7 @@ impl RankFuncCount for RowNumber { } #[derive(EstimateSize)] -pub struct Rank { +pub(super) struct Rank { prev_order_key: Option, prev_rank: i64, prev_pos_in_peer_group: i64, @@ -83,7 +84,7 @@ impl RankFuncCount for Rank { } #[derive(Default, EstimateSize)] -pub struct DenseRank { +pub(super) struct DenseRank { prev_order_key: Option, prev_rank: i64, } @@ -107,7 +108,7 @@ impl RankFuncCount for DenseRank { /// Generic state for rank window functions including `row_number`, `rank` and `dense_rank`. #[derive(EstimateSize)] -pub struct RankState { +pub(super) struct RankState { /// First state key of the partition. first_key: Option, /// State keys that are waiting to be outputted. From 065e2f74dec253bd8e837301d4d77e6cced9e232 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 31 May 2024 17:05:33 +0800 Subject: [PATCH 2/3] fix `create_window_state` Signed-off-by: Richard Chien --- src/expr/core/src/window_function/state.rs | 23 +++++++++++++++++++++- src/expr/impl/src/window_function/mod.rs | 7 +++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/expr/core/src/window_function/state.rs b/src/expr/core/src/window_function/state.rs index e25e450a117a0..ec5bfc70d8591 100644 --- a/src/expr/core/src/window_function/state.rs +++ b/src/expr/core/src/window_function/state.rs @@ -14,13 +14,15 @@ use std::collections::BTreeSet; +use itertools::Itertools; use risingwave_common::row::OwnedRow; use risingwave_common::types::{Datum, DefaultOrdered}; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common_estimate_size::EstimateSize; use smallvec::SmallVec; -use crate::Result; +use super::WindowFuncCall; +use crate::{ExprError, Result}; /// Unique and ordered identifier for a row in internal states. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)] @@ -102,3 +104,22 @@ pub trait WindowState: EstimateSize { } pub type BoxedWindowState = Box; + +#[linkme::distributed_slice] +pub static WINDOW_STATE_BUILDERS: [fn(&WindowFuncCall) -> Result]; + +pub fn create_window_state(call: &WindowFuncCall) -> Result { + // we expect only one builder function in `expr_impl/window_function/mod.rs` + let builder = WINDOW_STATE_BUILDERS.iter().next(); + builder.map_or_else( + || { + Err(ExprError::UnsupportedFunction(format!( + "{}({}) -> {}", + call.kind, + call.args.arg_types().iter().format(", "), + &call.return_type, + ))) + }, + |f| f(call), + ) +} diff --git a/src/expr/impl/src/window_function/mod.rs b/src/expr/impl/src/window_function/mod.rs index 138f08e67b2b8..1153a9111a1ea 100644 --- a/src/expr/impl/src/window_function/mod.rs +++ b/src/expr/impl/src/window_function/mod.rs @@ -13,7 +13,9 @@ // limitations under the License. use itertools::Itertools; -use risingwave_expr::window_function::{BoxedWindowState, WindowFuncCall, WindowFuncKind}; +use risingwave_expr::window_function::{ + BoxedWindowState, WindowFuncCall, WindowFuncKind, WINDOW_STATE_BUILDERS, +}; use risingwave_expr::{ExprError, Result}; mod aggregate; @@ -21,7 +23,8 @@ mod buffer; mod range_utils; mod rank; -pub fn create_window_state(call: &WindowFuncCall) -> Result { +#[linkme::distributed_slice(WINDOW_STATE_BUILDERS)] +fn create_window_state_impl(call: &WindowFuncCall) -> Result { assert!(call.frame.bounds.validate().is_ok()); use WindowFuncKind::*; From 6b2bb9f49af294ac0c7f8c06a3bed8b7d9558866 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 31 May 2024 17:05:39 +0800 Subject: [PATCH 3/3] fix ut Signed-off-by: Richard Chien --- src/expr/impl/src/window_function/buffer.rs | 6 +++--- src/expr/impl/src/window_function/rank.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/expr/impl/src/window_function/buffer.rs b/src/expr/impl/src/window_function/buffer.rs index 53c5a4de89ce4..1fd44151f89b1 100644 --- a/src/expr/impl/src/window_function/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -439,12 +439,12 @@ impl WindowImpl for RangeWindow { #[cfg(test)] mod tests { use itertools::Itertools; - - use super::*; - use crate::window_function::FrameBound::{ + use risingwave_expr::window_function::FrameBound::{ CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, }; + use super::*; + #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { let mut buffer = WindowBuffer::>::new( diff --git a/src/expr/impl/src/window_function/rank.rs b/src/expr/impl/src/window_function/rank.rs index 2078ec2514a9e..9b25a15c35590 100644 --- a/src/expr/impl/src/window_function/rank.rs +++ b/src/expr/impl/src/window_function/rank.rs @@ -177,10 +177,10 @@ mod tests { use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::memcmp_encoding; use risingwave_common::util::sort_util::OrderType; + use risingwave_expr::aggregate::AggArgs; + use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; use super::*; - use crate::aggregate::AggArgs; - use crate::window_function::{Frame, FrameBound, WindowFuncKind}; fn create_state_key(order: i64, pk: i64) -> StateKey { StateKey {