From 03cb6bcbdcc873832ffc5a11a64b37de8255e237 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 21 May 2024 16:20:51 +0800 Subject: [PATCH 1/3] rename `build_exprs` to `prepare` Signed-off-by: Richard Chien --- src/expr/core/src/window_function/call.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index f5714c64345a1..94af805b7f907 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -317,11 +317,7 @@ impl RangeFrameOffset { } } - fn build_exprs( - &mut self, - order_data_type: &DataType, - offset_data_type: &DataType, - ) -> Result<()> { + fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> { use risingwave_pb::expr::expr_node::PbType as PbExprType; let input_expr = InputRefExpression::new(order_data_type.clone(), 0); @@ -340,15 +336,14 @@ impl RangeFrameOffset { Ok(()) } + #[cfg(test)] pub fn new_for_test( offset: ScalarImpl, order_data_type: &DataType, offset_data_type: &DataType, ) -> Self { let mut offset = Self::new(offset); - offset - .build_exprs(order_data_type, offset_data_type) - .unwrap(); + offset.prepare(order_data_type, offset_data_type).unwrap(); offset } } @@ -650,7 +645,7 @@ impl RangeFrameBound { .context("offset `Datum` is not decodable")? .context("offset of `RangeFrameBound` must be non-NULL")?; let mut offset = RangeFrameOffset::new(offset_value); - offset.build_exprs(order_data_type, offset_data_type)?; + offset.prepare(order_data_type, offset_data_type)?; if bound_type == PbBoundType::Preceding { Self::Preceding(offset) } else { From f5b8fe58df8b9969c46dc0b00f2b9dc321aa0ec1 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 30 May 2024 16:52:11 +0800 Subject: [PATCH 2/3] move `RowsFrameBounds` and `RangeFrameBounds` to separated modules Signed-off-by: Richard Chien --- src/expr/core/src/window_function/call.rs | 598 +-------------------- src/expr/core/src/window_function/mod.rs | 4 + src/expr/core/src/window_function/range.rs | 401 ++++++++++++++ src/expr/core/src/window_function/rows.rs | 234 ++++++++ 4 files changed, 644 insertions(+), 593 deletions(-) create mode 100644 src/expr/core/src/window_function/range.rs create mode 100644 src/expr/core/src/window_function/rows.rs diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index 94af805b7f907..bf3178907ab7b 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -13,34 +13,17 @@ // limitations under the License. use std::fmt::Display; -use std::ops::Deref; -use std::sync::Arc; -use anyhow::Context; -use educe::Educe; use enum_as_inner::EnumAsInner; -use futures_util::FutureExt; use parse_display::Display; -use risingwave_common::row::OwnedRow; -use risingwave_common::types::{ - DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, Sentinelled, ToOwnedDatum, ToText, -}; -use risingwave_common::util::sort_util::{Direction, OrderType}; -use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; +use risingwave_common::types::DataType; use risingwave_common::{bail, must_match}; -use risingwave_pb::expr::window_frame::{ - PbBound, PbBoundType, PbBounds, PbExclusion, PbRangeFrameBound, PbRangeFrameBounds, - PbRowsFrameBound, PbRowsFrameBounds, -}; +use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion}; use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction}; use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding}; -use super::WindowFuncKind; +use super::{RangeFrameBounds, RowsFrameBound, RowsFrameBounds, WindowFuncKind}; use crate::aggregate::AggArgs; -use crate::expr::{ - build_func, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, - LiteralExpression, -}; use crate::Result; #[derive(Debug, Clone)] @@ -185,319 +168,6 @@ pub trait FrameBoundsImpl { fn validate(&self) -> Result<()>; } -#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)] -#[display("ROWS BETWEEN {start} AND {end}")] -pub struct RowsFrameBounds { - pub start: RowsFrameBound, - pub end: RowsFrameBound, -} - -impl RowsFrameBounds { - fn from_protobuf(bounds: &PbRowsFrameBounds) -> Result { - let start = FrameBound::::from_protobuf(bounds.get_start()?)?; - let end = FrameBound::::from_protobuf(bounds.get_end()?)?; - Ok(Self { start, end }) - } - - fn to_protobuf(&self) -> PbRowsFrameBounds { - PbRowsFrameBounds { - start: Some(self.start.to_protobuf()), - end: Some(self.end.to_protobuf()), - } - } -} - -impl RowsFrameBounds { - /// Check if the `ROWS` frame is canonical. - /// - /// A canonical `ROWS` frame is defined as: - /// - /// - Its bounds are valid (see [`Self::validate`]). - /// - It contains the current row. - pub fn is_canonical(&self) -> bool { - self.validate().is_ok() && { - let start = self.start.to_offset(); - let end = self.end.to_offset(); - start.unwrap_or(0) <= 0 && end.unwrap_or(0) >= 0 - } - } - - /// Get the number of preceding rows. - pub fn n_preceding_rows(&self) -> Option { - match (&self.start, &self.end) { - (UnboundedPreceding, _) => None, - (Preceding(n1), Preceding(n2)) => Some(*n1.max(n2)), - (Preceding(n), _) => Some(*n), - (CurrentRow | Following(_) | UnboundedFollowing, _) => Some(0), - } - } - - /// Get the number of following rows. - pub fn n_following_rows(&self) -> Option { - match (&self.start, &self.end) { - (_, UnboundedFollowing) => None, - (Following(n1), Following(n2)) => Some(*n1.max(n2)), - (_, Following(n)) => Some(*n), - (_, CurrentRow | Preceding(_) | UnboundedPreceding) => Some(0), - } - } -} - -impl FrameBoundsImpl for RowsFrameBounds { - fn validate(&self) -> Result<()> { - FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(())) - } -} - -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct RangeFrameBounds { - pub order_data_type: DataType, - pub order_type: OrderType, - pub offset_data_type: DataType, - pub start: RangeFrameBound, - pub end: RangeFrameBound, -} - -impl RangeFrameBounds { - fn from_protobuf(bounds: &PbRangeFrameBounds) -> Result { - let order_data_type = DataType::from(bounds.get_order_data_type()?); - let order_type = OrderType::from_protobuf(bounds.get_order_type()?); - let offset_data_type = DataType::from(bounds.get_offset_data_type()?); - let start = FrameBound::::from_protobuf( - bounds.get_start()?, - &order_data_type, - &offset_data_type, - )?; - let end = FrameBound::::from_protobuf( - bounds.get_end()?, - &order_data_type, - &offset_data_type, - )?; - Ok(Self { - order_data_type, - order_type, - offset_data_type, - start, - end, - }) - } - - fn to_protobuf(&self) -> PbRangeFrameBounds { - PbRangeFrameBounds { - start: Some(self.start.to_protobuf()), - end: Some(self.end.to_protobuf()), - order_data_type: Some(self.order_data_type.to_protobuf()), - order_type: Some(self.order_type.to_protobuf()), - offset_data_type: Some(self.offset_data_type.to_protobuf()), - } - } -} - -/// The wrapper type for [`ScalarImpl`] range frame offset, containing -/// two expressions to help adding and subtracting the offset. -#[derive(Debug, Clone, Educe)] -#[educe(PartialEq, Eq, Hash)] -pub struct RangeFrameOffset { - /// The original offset value. - offset: ScalarImpl, - /// Built expression for `$0 + offset`. - #[educe(PartialEq(ignore), Hash(ignore))] - add_expr: Option>, - /// Built expression for `$0 - offset`. - #[educe(PartialEq(ignore), Hash(ignore))] - sub_expr: Option>, -} - -impl RangeFrameOffset { - pub fn new(offset: ScalarImpl) -> Self { - Self { - offset, - add_expr: None, - sub_expr: None, - } - } - - fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> { - use risingwave_pb::expr::expr_node::PbType as PbExprType; - - let input_expr = InputRefExpression::new(order_data_type.clone(), 0); - let offset_expr = - LiteralExpression::new(offset_data_type.clone(), Some(self.offset.clone())); - self.add_expr = Some(Arc::new(build_func( - PbExprType::Add, - order_data_type.clone(), - vec![input_expr.clone().boxed(), offset_expr.clone().boxed()], - )?)); - self.sub_expr = Some(Arc::new(build_func( - PbExprType::Subtract, - order_data_type.clone(), - vec![input_expr.boxed(), offset_expr.boxed()], - )?)); - Ok(()) - } - - #[cfg(test)] - pub fn new_for_test( - offset: ScalarImpl, - order_data_type: &DataType, - offset_data_type: &DataType, - ) -> Self { - let mut offset = Self::new(offset); - offset.prepare(order_data_type, offset_data_type).unwrap(); - offset - } -} - -impl Deref for RangeFrameOffset { - type Target = ScalarImpl; - - fn deref(&self) -> &Self::Target { - &self.offset - } -} - -impl Display for RangeFrameBounds { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "RANGE BETWEEN {} AND {}", - self.start.for_display(), - self.end.for_display() - )?; - Ok(()) - } -} - -impl FrameBoundsImpl for RangeFrameBounds { - fn validate(&self) -> Result<()> { - fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> { - if val.is_negative() { - bail!( - "frame bound offset should be non-negative, but {} is given", - val - ); - } - Ok(()) - } - - FrameBound::validate_bounds(&self.start, &self.end, |offset| { - match offset.as_scalar_ref_impl() { - // TODO(rc): use decl macro? - ScalarRefImpl::Int16(val) => validate_non_negative(val)?, - ScalarRefImpl::Int32(val) => validate_non_negative(val)?, - ScalarRefImpl::Int64(val) => validate_non_negative(val)?, - ScalarRefImpl::Float32(val) => validate_non_negative(val)?, - ScalarRefImpl::Float64(val) => validate_non_negative(val)?, - ScalarRefImpl::Decimal(val) => validate_non_negative(val)?, - ScalarRefImpl::Interval(val) => { - if !val.is_never_negative() { - bail!( - "for frame bound offset of type `interval`, each field should be non-negative, but {} is given", - val - ); - } - if matches!(self.order_data_type, DataType::Timestamptz) { - // for `timestamptz`, we only support offset without `month` and `day` fields - if val.months() != 0 || val.days() != 0 { - bail!( - "for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`", - ); - } - } - }, - _ => unreachable!("other order column data types are not supported and should be banned in frontend"), - } - Ok(()) - }) - } -} - -impl RangeFrameBounds { - /// Get the frame start for a given order column value. - /// - /// ## Examples - /// - /// For the following frames: - /// - /// ```sql - /// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - /// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - /// ``` - /// - /// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is - /// represented by [`Sentinelled::Smallest`]. - /// - /// For the following frame: - /// - /// ```sql - /// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW - /// ``` - /// - /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`. - /// - /// For the following frame: - /// - /// ```sql - /// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW - /// ``` - /// - /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`. - pub fn frame_start_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { - self.start.for_calc().bound_of(order_value, self.order_type) - } - - /// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with - /// everything on the other direction. - pub fn frame_end_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { - self.end.for_calc().bound_of(order_value, self.order_type) - } - - /// Get the order value of the CURRENT ROW of the first frame that includes the given order value. - /// - /// ## Examples - /// - /// For the following frames: - /// - /// ```sql - /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING - /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING - /// ``` - /// - /// For any given order value, the first CURRENT ROW is always the first-most row, which is - /// represented by [`Sentinelled::Smallest`]. - /// - /// For the following frame: - /// - /// ```sql - /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING - /// ``` - /// - /// For a given order value `100`, the first CURRENT ROW should have order value `90`. - /// - /// For the following frame: - /// - /// ```sql - /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING - /// ``` - /// - /// For a given order value `100`, the first CURRENT ROW should have order value `110`. - pub fn first_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { - self.end - .for_calc() - .reverse() - .bound_of(order_value, self.order_type) - } - - /// Get the order value of the CURRENT ROW of the last frame that includes the given order value. - /// It's very similar to `first_curr_of`, just with everything on the other direction. - pub fn last_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { - self.start - .for_calc() - .reverse() - .bound_of(order_value, self.order_type) - } -} - #[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)] #[display(style = "TITLE CASE")] pub enum FrameBound { @@ -510,9 +180,6 @@ pub enum FrameBound { UnboundedFollowing, } -pub type RowsFrameBound = FrameBound; -pub type RangeFrameBound = FrameBound; - impl FrameBound { fn offset_value(&self) -> Option<&T> { match self { @@ -521,7 +188,7 @@ impl FrameBound { } } - fn validate_bounds( + pub(super) fn validate_bounds( start: &Self, end: &Self, offset_checker: impl Fn(&T) -> Result<()>, @@ -564,7 +231,7 @@ impl FrameBound where T: Copy, { - fn reverse(self) -> FrameBound { + pub(super) fn reverse(self) -> FrameBound { match self { UnboundedPreceding => UnboundedFollowing, Preceding(offset) => Following(offset), @@ -575,172 +242,6 @@ where } } -impl RowsFrameBound { - fn from_protobuf_legacy(bound: &PbBound) -> Result { - use risingwave_pb::expr::window_frame::bound::PbOffset; - - let offset = bound.get_offset()?; - let bound = match offset { - PbOffset::Integer(offset) => Self::from_protobuf(&PbRowsFrameBound { - r#type: bound.get_type()? as _, - offset: Some(*offset), - })?, - PbOffset::Datum(_) => bail!("offset of `RowsFrameBound` must be `Integer`"), - }; - Ok(bound) - } - - fn from_protobuf(bound: &PbRowsFrameBound) -> Result { - let bound = match bound.get_type()? { - PbBoundType::Unspecified => bail!("unspecified type of `RowsFrameBound`"), - PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, - PbBoundType::Preceding => Self::Preceding(*bound.get_offset()? as usize), - PbBoundType::CurrentRow => Self::CurrentRow, - PbBoundType::Following => Self::Following(*bound.get_offset()? as usize), - PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, - }; - Ok(bound) - } - - fn to_protobuf(&self) -> PbRowsFrameBound { - let (r#type, offset) = match self { - Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None), - Self::Preceding(offset) => (PbBoundType::Preceding, Some(*offset as _)), - Self::CurrentRow => (PbBoundType::CurrentRow, None), - Self::Following(offset) => (PbBoundType::Following, Some(*offset as _)), - Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None), - }; - PbRowsFrameBound { - r#type: r#type as _, - offset, - } - } -} - -impl RowsFrameBound { - /// Convert the bound to sized offset from current row. `None` if the bound is unbounded. - pub fn to_offset(&self) -> Option { - match self { - UnboundedPreceding | UnboundedFollowing => None, - CurrentRow => Some(0), - Preceding(n) => Some(-(*n as isize)), - Following(n) => Some(*n as isize), - } - } -} - -impl RangeFrameBound { - fn from_protobuf( - bound: &PbRangeFrameBound, - order_data_type: &DataType, - offset_data_type: &DataType, - ) -> Result { - let bound = match bound.get_type()? { - PbBoundType::Unspecified => bail!("unspecified type of `RangeFrameBound`"), - PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, - PbBoundType::CurrentRow => Self::CurrentRow, - PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, - bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => { - let offset_value = Datum::from_protobuf(bound.get_offset()?, offset_data_type) - .context("offset `Datum` is not decodable")? - .context("offset of `RangeFrameBound` must be non-NULL")?; - let mut offset = RangeFrameOffset::new(offset_value); - offset.prepare(order_data_type, offset_data_type)?; - if bound_type == PbBoundType::Preceding { - Self::Preceding(offset) - } else { - Self::Following(offset) - } - } - }; - Ok(bound) - } - - fn to_protobuf(&self) -> PbRangeFrameBound { - let (r#type, offset) = match self { - Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None), - Self::Preceding(offset) => ( - PbBoundType::Preceding, - Some(Some(offset.as_scalar_ref_impl()).to_protobuf()), - ), - Self::CurrentRow => (PbBoundType::CurrentRow, None), - Self::Following(offset) => ( - PbBoundType::Following, - Some(Some(offset.as_scalar_ref_impl()).to_protobuf()), - ), - Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None), - }; - PbRangeFrameBound { - r#type: r#type as _, - offset, - } - } -} - -impl RangeFrameBound { - fn for_display(&self) -> FrameBound { - match self { - UnboundedPreceding => UnboundedPreceding, - Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()), - CurrentRow => CurrentRow, - Following(offset) => Following(offset.as_scalar_ref_impl().to_text()), - UnboundedFollowing => UnboundedFollowing, - } - } - - fn for_calc(&self) -> FrameBound> { - match self { - UnboundedPreceding => UnboundedPreceding, - Preceding(offset) => Preceding(RangeFrameOffsetRef { - add_expr: offset.add_expr.as_ref().unwrap().as_ref(), - sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(), - }), - CurrentRow => CurrentRow, - Following(offset) => Following(RangeFrameOffsetRef { - add_expr: offset.add_expr.as_ref().unwrap().as_ref(), - sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(), - }), - UnboundedFollowing => UnboundedFollowing, - } - } -} - -#[derive(Debug, Educe)] -#[educe(Clone, Copy)] -pub struct RangeFrameOffsetRef<'a> { - /// Built expression for `$0 + offset`. - add_expr: &'a dyn Expression, - /// Built expression for `$0 - offset`. - sub_expr: &'a dyn Expression, -} - -impl FrameBound> { - fn bound_of(self, order_value: impl ToOwnedDatum, order_type: OrderType) -> Sentinelled { - let expr = match (self, order_type.direction()) { - (UnboundedPreceding, _) => return Sentinelled::Smallest, - (UnboundedFollowing, _) => return Sentinelled::Largest, - (CurrentRow, _) => return Sentinelled::Normal(order_value.to_owned_datum()), - (Preceding(offset), Direction::Ascending) - | (Following(offset), Direction::Descending) => { - // should SUBTRACT the offset - offset.sub_expr - } - (Following(offset), Direction::Ascending) - | (Preceding(offset), Direction::Descending) => { - // should ADD the offset - offset.add_expr - } - }; - let row = OwnedRow::new(vec![order_value.to_owned_datum()]); - Sentinelled::Normal( - expr.eval_row(&row) - .now_or_never() - .expect("frame bound calculation should finish immediately") - .expect("just simple calculation, should succeed"), // TODO(rc): handle overflow - ) - } -} - #[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)] #[display("EXCLUDE {}", style = "TITLE CASE")] pub enum FrameExclusion { @@ -768,92 +269,3 @@ impl FrameExclusion { } } } - -#[cfg(test)] -mod tests { - - use super::*; - - #[test] - fn test_rows_frame_bounds() { - let bounds = RowsFrameBounds { - start: Preceding(1), - end: CurrentRow, - }; - assert!(bounds.validate().is_ok()); - assert!(bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), Some(-1)); - assert_eq!(bounds.end.to_offset(), Some(0)); - assert_eq!(bounds.n_preceding_rows(), Some(1)); - assert_eq!(bounds.n_following_rows(), Some(0)); - - let bounds = RowsFrameBounds { - start: CurrentRow, - end: Following(1), - }; - assert!(bounds.validate().is_ok()); - assert!(bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), Some(0)); - assert_eq!(bounds.end.to_offset(), Some(1)); - assert_eq!(bounds.n_preceding_rows(), Some(0)); - assert_eq!(bounds.n_following_rows(), Some(1)); - - let bounds = RowsFrameBounds { - start: UnboundedPreceding, - end: Following(10), - }; - assert!(bounds.validate().is_ok()); - assert!(bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), None); - assert_eq!(bounds.end.to_offset(), Some(10)); - assert_eq!(bounds.n_preceding_rows(), None); - assert_eq!(bounds.n_following_rows(), Some(10)); - - let bounds = RowsFrameBounds { - start: Preceding(10), - end: UnboundedFollowing, - }; - assert!(bounds.validate().is_ok()); - assert!(bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), Some(-10)); - assert_eq!(bounds.end.to_offset(), None); - assert_eq!(bounds.n_preceding_rows(), Some(10)); - assert_eq!(bounds.n_following_rows(), None); - - let bounds = RowsFrameBounds { - start: Preceding(1), - end: Preceding(10), - }; - assert!(bounds.validate().is_ok()); - assert!(!bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), Some(-1)); - assert_eq!(bounds.end.to_offset(), Some(-10)); - assert_eq!(bounds.n_preceding_rows(), Some(10)); - assert_eq!(bounds.n_following_rows(), Some(0)); - - let bounds = RowsFrameBounds { - start: Following(10), - end: Following(1), - }; - assert!(bounds.validate().is_ok()); - assert!(!bounds.is_canonical()); - assert_eq!(bounds.start.to_offset(), Some(10)); - assert_eq!(bounds.end.to_offset(), Some(1)); - assert_eq!(bounds.n_preceding_rows(), Some(0)); - assert_eq!(bounds.n_following_rows(), Some(10)); - - let bounds = RowsFrameBounds { - start: UnboundedFollowing, - end: Following(10), - }; - assert!(bounds.validate().is_err()); - assert!(!bounds.is_canonical()); - - let bounds = RowsFrameBounds { - start: Preceding(10), - end: UnboundedPreceding, - }; - assert!(bounds.validate().is_err()); - assert!(!bounds.is_canonical()); - } -} diff --git a/src/expr/core/src/window_function/mod.rs b/src/expr/core/src/window_function/mod.rs index dd735d086fb12..5f747d2296729 100644 --- a/src/expr/core/src/window_function/mod.rs +++ b/src/expr/core/src/window_function/mod.rs @@ -17,6 +17,10 @@ pub use kind::*; mod call; pub use call::*; +mod rows; +pub use rows::*; +mod range; +pub use range::*; mod state; pub use state::*; diff --git a/src/expr/core/src/window_function/range.rs b/src/expr/core/src/window_function/range.rs new file mode 100644 index 0000000000000..ebc2f5ef2d4b4 --- /dev/null +++ b/src/expr/core/src/window_function/range.rs @@ -0,0 +1,401 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::ops::Deref; +use std::sync::Arc; + +use anyhow::Context; +use educe::Educe; +use futures_util::FutureExt; +use risingwave_common::bail; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{ + DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, Sentinelled, ToOwnedDatum, ToText, +}; +use risingwave_common::util::sort_util::{Direction, OrderType}; +use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; +use risingwave_pb::expr::window_frame::{PbBoundType, PbRangeFrameBound, PbRangeFrameBounds}; + +use super::FrameBound::{ + self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, +}; +use super::FrameBoundsImpl; +use crate::expr::{ + build_func, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, + LiteralExpression, +}; +use crate::Result; + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct RangeFrameBounds { + pub order_data_type: DataType, + pub order_type: OrderType, + pub offset_data_type: DataType, + pub start: RangeFrameBound, + pub end: RangeFrameBound, +} + +impl RangeFrameBounds { + pub(super) fn from_protobuf(bounds: &PbRangeFrameBounds) -> Result { + let order_data_type = DataType::from(bounds.get_order_data_type()?); + let order_type = OrderType::from_protobuf(bounds.get_order_type()?); + let offset_data_type = DataType::from(bounds.get_offset_data_type()?); + let start = FrameBound::::from_protobuf( + bounds.get_start()?, + &order_data_type, + &offset_data_type, + )?; + let end = FrameBound::::from_protobuf( + bounds.get_end()?, + &order_data_type, + &offset_data_type, + )?; + Ok(Self { + order_data_type, + order_type, + offset_data_type, + start, + end, + }) + } + + pub(super) fn to_protobuf(&self) -> PbRangeFrameBounds { + PbRangeFrameBounds { + start: Some(self.start.to_protobuf()), + end: Some(self.end.to_protobuf()), + order_data_type: Some(self.order_data_type.to_protobuf()), + order_type: Some(self.order_type.to_protobuf()), + offset_data_type: Some(self.offset_data_type.to_protobuf()), + } + } +} + +impl Display for RangeFrameBounds { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RANGE BETWEEN {} AND {}", + self.start.for_display(), + self.end.for_display() + )?; + Ok(()) + } +} + +impl FrameBoundsImpl for RangeFrameBounds { + fn validate(&self) -> Result<()> { + fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> { + if val.is_negative() { + bail!( + "frame bound offset should be non-negative, but {} is given", + val + ); + } + Ok(()) + } + + FrameBound::validate_bounds(&self.start, &self.end, |offset| { + match offset.as_scalar_ref_impl() { + // TODO(rc): use decl macro? + ScalarRefImpl::Int16(val) => validate_non_negative(val)?, + ScalarRefImpl::Int32(val) => validate_non_negative(val)?, + ScalarRefImpl::Int64(val) => validate_non_negative(val)?, + ScalarRefImpl::Float32(val) => validate_non_negative(val)?, + ScalarRefImpl::Float64(val) => validate_non_negative(val)?, + ScalarRefImpl::Decimal(val) => validate_non_negative(val)?, + ScalarRefImpl::Interval(val) => { + if !val.is_never_negative() { + bail!( + "for frame bound offset of type `interval`, each field should be non-negative, but {} is given", + val + ); + } + if matches!(self.order_data_type, DataType::Timestamptz) { + // for `timestamptz`, we only support offset without `month` and `day` fields + if val.months() != 0 || val.days() != 0 { + bail!( + "for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`", + ); + } + } + }, + _ => unreachable!("other order column data types are not supported and should be banned in frontend"), + } + Ok(()) + }) + } +} + +impl RangeFrameBounds { + /// Get the frame start for a given order column value. + /// + /// ## Examples + /// + /// For the following frames: + /// + /// ```sql + /// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + /// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + /// ``` + /// + /// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is + /// represented by [`Sentinelled::Smallest`]. + /// + /// For the following frame: + /// + /// ```sql + /// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW + /// ``` + /// + /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`. + /// + /// For the following frame: + /// + /// ```sql + /// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW + /// ``` + /// + /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`. + pub fn frame_start_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { + self.start.for_calc().bound_of(order_value, self.order_type) + } + + /// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with + /// everything on the other direction. + pub fn frame_end_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { + self.end.for_calc().bound_of(order_value, self.order_type) + } + + /// Get the order value of the CURRENT ROW of the first frame that includes the given order value. + /// + /// ## Examples + /// + /// For the following frames: + /// + /// ```sql + /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + /// ``` + /// + /// For any given order value, the first CURRENT ROW is always the first-most row, which is + /// represented by [`Sentinelled::Smallest`]. + /// + /// For the following frame: + /// + /// ```sql + /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING + /// ``` + /// + /// For a given order value `100`, the first CURRENT ROW should have order value `90`. + /// + /// For the following frame: + /// + /// ```sql + /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING + /// ``` + /// + /// For a given order value `100`, the first CURRENT ROW should have order value `110`. + pub fn first_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { + self.end + .for_calc() + .reverse() + .bound_of(order_value, self.order_type) + } + + /// Get the order value of the CURRENT ROW of the last frame that includes the given order value. + /// It's very similar to `first_curr_of`, just with everything on the other direction. + pub fn last_curr_of(&self, order_value: impl ToOwnedDatum) -> Sentinelled { + self.start + .for_calc() + .reverse() + .bound_of(order_value, self.order_type) + } +} + +pub type RangeFrameBound = FrameBound; + +impl RangeFrameBound { + fn from_protobuf( + bound: &PbRangeFrameBound, + order_data_type: &DataType, + offset_data_type: &DataType, + ) -> Result { + let bound = match bound.get_type()? { + PbBoundType::Unspecified => bail!("unspecified type of `RangeFrameBound`"), + PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, + PbBoundType::CurrentRow => Self::CurrentRow, + PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, + bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => { + let offset_value = Datum::from_protobuf(bound.get_offset()?, offset_data_type) + .context("offset `Datum` is not decodable")? + .context("offset of `RangeFrameBound` must be non-NULL")?; + let mut offset = RangeFrameOffset::new(offset_value); + offset.prepare(order_data_type, offset_data_type)?; + if bound_type == PbBoundType::Preceding { + Self::Preceding(offset) + } else { + Self::Following(offset) + } + } + }; + Ok(bound) + } + + fn to_protobuf(&self) -> PbRangeFrameBound { + let (r#type, offset) = match self { + Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None), + Self::Preceding(offset) => ( + PbBoundType::Preceding, + Some(Some(offset.as_scalar_ref_impl()).to_protobuf()), + ), + Self::CurrentRow => (PbBoundType::CurrentRow, None), + Self::Following(offset) => ( + PbBoundType::Following, + Some(Some(offset.as_scalar_ref_impl()).to_protobuf()), + ), + Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None), + }; + PbRangeFrameBound { + r#type: r#type as _, + offset, + } + } +} + +impl RangeFrameBound { + fn for_display(&self) -> FrameBound { + match self { + UnboundedPreceding => UnboundedPreceding, + Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()), + CurrentRow => CurrentRow, + Following(offset) => Following(offset.as_scalar_ref_impl().to_text()), + UnboundedFollowing => UnboundedFollowing, + } + } + + fn for_calc(&self) -> FrameBound> { + match self { + UnboundedPreceding => UnboundedPreceding, + Preceding(offset) => Preceding(RangeFrameOffsetRef { + add_expr: offset.add_expr.as_ref().unwrap().as_ref(), + sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(), + }), + CurrentRow => CurrentRow, + Following(offset) => Following(RangeFrameOffsetRef { + add_expr: offset.add_expr.as_ref().unwrap().as_ref(), + sub_expr: offset.sub_expr.as_ref().unwrap().as_ref(), + }), + UnboundedFollowing => UnboundedFollowing, + } + } +} + +/// The wrapper type for [`ScalarImpl`] range frame offset, containing +/// two expressions to help adding and subtracting the offset. +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct RangeFrameOffset { + /// The original offset value. + offset: ScalarImpl, + /// Built expression for `$0 + offset`. + #[educe(PartialEq(ignore), Hash(ignore))] + add_expr: Option>, + /// Built expression for `$0 - offset`. + #[educe(PartialEq(ignore), Hash(ignore))] + sub_expr: Option>, +} + +impl RangeFrameOffset { + pub fn new(offset: ScalarImpl) -> Self { + Self { + offset, + add_expr: None, + sub_expr: None, + } + } + + fn prepare(&mut self, order_data_type: &DataType, offset_data_type: &DataType) -> Result<()> { + use risingwave_pb::expr::expr_node::PbType as PbExprType; + + let input_expr = InputRefExpression::new(order_data_type.clone(), 0); + let offset_expr = + LiteralExpression::new(offset_data_type.clone(), Some(self.offset.clone())); + self.add_expr = Some(Arc::new(build_func( + PbExprType::Add, + order_data_type.clone(), + vec![input_expr.clone().boxed(), offset_expr.clone().boxed()], + )?)); + self.sub_expr = Some(Arc::new(build_func( + PbExprType::Subtract, + order_data_type.clone(), + vec![input_expr.boxed(), offset_expr.boxed()], + )?)); + Ok(()) + } + + pub fn new_for_test( + offset: ScalarImpl, + order_data_type: &DataType, + offset_data_type: &DataType, + ) -> Self { + let mut offset = Self::new(offset); + offset.prepare(order_data_type, offset_data_type).unwrap(); + offset + } +} + +impl Deref for RangeFrameOffset { + type Target = ScalarImpl; + + fn deref(&self) -> &Self::Target { + &self.offset + } +} + +#[derive(Debug, Educe)] +#[educe(Clone, Copy)] +pub struct RangeFrameOffsetRef<'a> { + /// Built expression for `$0 + offset`. + add_expr: &'a dyn Expression, + /// Built expression for `$0 - offset`. + sub_expr: &'a dyn Expression, +} + +impl FrameBound> { + fn bound_of(self, order_value: impl ToOwnedDatum, order_type: OrderType) -> Sentinelled { + let expr = match (self, order_type.direction()) { + (UnboundedPreceding, _) => return Sentinelled::Smallest, + (UnboundedFollowing, _) => return Sentinelled::Largest, + (CurrentRow, _) => return Sentinelled::Normal(order_value.to_owned_datum()), + (Preceding(offset), Direction::Ascending) + | (Following(offset), Direction::Descending) => { + // should SUBTRACT the offset + offset.sub_expr + } + (Following(offset), Direction::Ascending) + | (Preceding(offset), Direction::Descending) => { + // should ADD the offset + offset.add_expr + } + }; + let row = OwnedRow::new(vec![order_value.to_owned_datum()]); + Sentinelled::Normal( + expr.eval_row(&row) + .now_or_never() + .expect("frame bound calculation should finish immediately") + .expect("just simple calculation, should succeed"), // TODO(rc): handle overflow + ) + } +} diff --git a/src/expr/core/src/window_function/rows.rs b/src/expr/core/src/window_function/rows.rs new file mode 100644 index 0000000000000..61eda77b65ef5 --- /dev/null +++ b/src/expr/core/src/window_function/rows.rs @@ -0,0 +1,234 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use parse_display::Display; +use risingwave_common::bail; +use risingwave_pb::expr::window_frame::{ + PbBound, PbBoundType, PbRowsFrameBound, PbRowsFrameBounds, +}; + +use super::FrameBound::{ + self, CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, +}; +use super::FrameBoundsImpl; +use crate::Result; + +#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)] +#[display("ROWS BETWEEN {start} AND {end}")] +pub struct RowsFrameBounds { + pub start: RowsFrameBound, + pub end: RowsFrameBound, +} + +impl RowsFrameBounds { + pub(super) fn from_protobuf(bounds: &PbRowsFrameBounds) -> Result { + let start = FrameBound::::from_protobuf(bounds.get_start()?)?; + let end = FrameBound::::from_protobuf(bounds.get_end()?)?; + Ok(Self { start, end }) + } + + pub(super) fn to_protobuf(&self) -> PbRowsFrameBounds { + PbRowsFrameBounds { + start: Some(self.start.to_protobuf()), + end: Some(self.end.to_protobuf()), + } + } +} + +impl RowsFrameBounds { + /// Check if the `ROWS` frame is canonical. + /// + /// A canonical `ROWS` frame is defined as: + /// + /// - Its bounds are valid (see [`Self::validate`]). + /// - It contains the current row. + pub fn is_canonical(&self) -> bool { + self.validate().is_ok() && { + let start = self.start.to_offset(); + let end = self.end.to_offset(); + start.unwrap_or(0) <= 0 && end.unwrap_or(0) >= 0 + } + } + + /// Get the number of preceding rows. + pub fn n_preceding_rows(&self) -> Option { + match (&self.start, &self.end) { + (UnboundedPreceding, _) => None, + (Preceding(n1), Preceding(n2)) => Some(*n1.max(n2)), + (Preceding(n), _) => Some(*n), + (CurrentRow | Following(_) | UnboundedFollowing, _) => Some(0), + } + } + + /// Get the number of following rows. + pub fn n_following_rows(&self) -> Option { + match (&self.start, &self.end) { + (_, UnboundedFollowing) => None, + (Following(n1), Following(n2)) => Some(*n1.max(n2)), + (_, Following(n)) => Some(*n), + (_, CurrentRow | Preceding(_) | UnboundedPreceding) => Some(0), + } + } +} + +impl FrameBoundsImpl for RowsFrameBounds { + fn validate(&self) -> Result<()> { + FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(())) + } +} + +pub type RowsFrameBound = FrameBound; + +impl RowsFrameBound { + pub(super) fn from_protobuf_legacy(bound: &PbBound) -> Result { + use risingwave_pb::expr::window_frame::bound::PbOffset; + + let offset = bound.get_offset()?; + let bound = match offset { + PbOffset::Integer(offset) => Self::from_protobuf(&PbRowsFrameBound { + r#type: bound.get_type()? as _, + offset: Some(*offset), + })?, + PbOffset::Datum(_) => bail!("offset of `RowsFrameBound` must be `Integer`"), + }; + Ok(bound) + } + + fn from_protobuf(bound: &PbRowsFrameBound) -> Result { + let bound = match bound.get_type()? { + PbBoundType::Unspecified => bail!("unspecified type of `RowsFrameBound`"), + PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, + PbBoundType::Preceding => Self::Preceding(*bound.get_offset()? as usize), + PbBoundType::CurrentRow => Self::CurrentRow, + PbBoundType::Following => Self::Following(*bound.get_offset()? as usize), + PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, + }; + Ok(bound) + } + + fn to_protobuf(&self) -> PbRowsFrameBound { + let (r#type, offset) = match self { + Self::UnboundedPreceding => (PbBoundType::UnboundedPreceding, None), + Self::Preceding(offset) => (PbBoundType::Preceding, Some(*offset as _)), + Self::CurrentRow => (PbBoundType::CurrentRow, None), + Self::Following(offset) => (PbBoundType::Following, Some(*offset as _)), + Self::UnboundedFollowing => (PbBoundType::UnboundedFollowing, None), + }; + PbRowsFrameBound { + r#type: r#type as _, + offset, + } + } +} + +impl RowsFrameBound { + /// Convert the bound to sized offset from current row. `None` if the bound is unbounded. + pub fn to_offset(&self) -> Option { + match self { + UnboundedPreceding | UnboundedFollowing => None, + CurrentRow => Some(0), + Preceding(n) => Some(-(*n as isize)), + Following(n) => Some(*n as isize), + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_rows_frame_bounds() { + let bounds = RowsFrameBounds { + start: Preceding(1), + end: CurrentRow, + }; + assert!(bounds.validate().is_ok()); + assert!(bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), Some(-1)); + assert_eq!(bounds.end.to_offset(), Some(0)); + assert_eq!(bounds.n_preceding_rows(), Some(1)); + assert_eq!(bounds.n_following_rows(), Some(0)); + + let bounds = RowsFrameBounds { + start: CurrentRow, + end: Following(1), + }; + assert!(bounds.validate().is_ok()); + assert!(bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), Some(0)); + assert_eq!(bounds.end.to_offset(), Some(1)); + assert_eq!(bounds.n_preceding_rows(), Some(0)); + assert_eq!(bounds.n_following_rows(), Some(1)); + + let bounds = RowsFrameBounds { + start: UnboundedPreceding, + end: Following(10), + }; + assert!(bounds.validate().is_ok()); + assert!(bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), None); + assert_eq!(bounds.end.to_offset(), Some(10)); + assert_eq!(bounds.n_preceding_rows(), None); + assert_eq!(bounds.n_following_rows(), Some(10)); + + let bounds = RowsFrameBounds { + start: Preceding(10), + end: UnboundedFollowing, + }; + assert!(bounds.validate().is_ok()); + assert!(bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), Some(-10)); + assert_eq!(bounds.end.to_offset(), None); + assert_eq!(bounds.n_preceding_rows(), Some(10)); + assert_eq!(bounds.n_following_rows(), None); + + let bounds = RowsFrameBounds { + start: Preceding(1), + end: Preceding(10), + }; + assert!(bounds.validate().is_ok()); + assert!(!bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), Some(-1)); + assert_eq!(bounds.end.to_offset(), Some(-10)); + assert_eq!(bounds.n_preceding_rows(), Some(10)); + assert_eq!(bounds.n_following_rows(), Some(0)); + + let bounds = RowsFrameBounds { + start: Following(10), + end: Following(1), + }; + assert!(bounds.validate().is_ok()); + assert!(!bounds.is_canonical()); + assert_eq!(bounds.start.to_offset(), Some(10)); + assert_eq!(bounds.end.to_offset(), Some(1)); + assert_eq!(bounds.n_preceding_rows(), Some(0)); + assert_eq!(bounds.n_following_rows(), Some(10)); + + let bounds = RowsFrameBounds { + start: UnboundedFollowing, + end: Following(10), + }; + assert!(bounds.validate().is_err()); + assert!(!bounds.is_canonical()); + + let bounds = RowsFrameBounds { + start: Preceding(10), + end: UnboundedPreceding, + }; + assert!(bounds.validate().is_err()); + assert!(!bounds.is_canonical()); + } +} From 152ba9eee2a9c1f854d5c6287ea1634860fabf6b Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 30 May 2024 17:06:36 +0800 Subject: [PATCH 3/3] remove a `pub` Signed-off-by: Richard Chien --- src/expr/core/src/window_function/range.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/core/src/window_function/range.rs b/src/expr/core/src/window_function/range.rs index ebc2f5ef2d4b4..922e247dd8369 100644 --- a/src/expr/core/src/window_function/range.rs +++ b/src/expr/core/src/window_function/range.rs @@ -366,7 +366,7 @@ impl Deref for RangeFrameOffset { #[derive(Debug, Educe)] #[educe(Clone, Copy)] -pub struct RangeFrameOffsetRef<'a> { +struct RangeFrameOffsetRef<'a> { /// Built expression for `$0 + offset`. add_expr: &'a dyn Expression, /// Built expression for `$0 - offset`.