diff --git a/proto/expr.proto b/proto/expr.proto index 9a6b444498097..b0083c0ba426d 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -442,8 +442,12 @@ message WindowFrame { Bound end = 3; Exclusion exclusion = 4; + // The data type of order column in `RANGE` frame. + optional data.DataType order_data_type = 10; + // The order type of order column in `RANGE` frame. + optional common.OrderType order_type = 15; // The data type of offset value in `RANGE` frame. - optional data.DataType offset_data_type = 10; + optional data.DataType offset_data_type = 20; } message WindowFunction { diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index af5d2f45211be..9d0ed78002c25 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -13,12 +13,16 @@ // limitations under the License. use std::fmt::Display; +use std::ops::Deref; +use std::sync::Arc; use anyhow::Context; +use educe::Educe; use enum_as_inner::EnumAsInner; use parse_display::Display; use risingwave_common::bail; use risingwave_common::types::{DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToText}; +use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; use risingwave_pb::expr::window_frame::{PbBound, PbExclusion}; use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction}; @@ -26,6 +30,9 @@ use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, Unbounded use super::WindowFuncKind; use crate::aggregate::AggArgs; +use crate::expr::{ + build_func, BoxedExpression, ExpressionBoxExt, InputRefExpression, LiteralExpression, +}; use crate::Result; #[derive(Debug, Clone)] @@ -97,10 +104,22 @@ impl Frame { FrameBounds::Rows(RowsFrameBounds { start, end }) } PbType::Range => { + let order_data_type = DataType::from(frame.get_order_data_type()?); + let order_type = OrderType::from_protobuf(frame.get_order_type()?); let offset_data_type = DataType::from(frame.get_offset_data_type()?); - let start = FrameBound::::from_protobuf(start, &offset_data_type)?; - let end = FrameBound::::from_protobuf(end, &offset_data_type)?; + let start = FrameBound::::from_protobuf( + start, + &order_data_type, + &offset_data_type, + )?; + let end = FrameBound::::from_protobuf( + end, + &order_data_type, + &offset_data_type, + )?; FrameBounds::Range(RangeFrameBounds { + order_data_type, + order_type, offset_data_type, start, end, @@ -120,9 +139,13 @@ impl Frame { start: Some(start.to_protobuf()), end: Some(end.to_protobuf()), exclusion, + order_data_type: None, + order_type: None, offset_data_type: None, }, FrameBounds::Range(RangeFrameBounds { + order_data_type, + order_type, offset_data_type, start, end, @@ -131,6 +154,8 @@ impl Frame { start: Some(start.to_protobuf()), end: Some(end.to_protobuf()), exclusion, + order_data_type: Some(order_data_type.to_protobuf()), + order_type: Some(order_type.to_protobuf()), offset_data_type: Some(offset_data_type.to_protobuf()), }, } @@ -191,9 +216,46 @@ impl FrameBoundsImpl for RowsFrameBounds { #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct RangeFrameBounds { + pub order_data_type: DataType, + pub order_type: OrderType, pub offset_data_type: DataType, - pub start: FrameBound, - pub end: FrameBound, + pub start: FrameBound, + pub end: FrameBound, +} + +/// The wrapper type for [`ScalarImpl`] range frame offset, containing +/// two expressions to help adding and subtracting the offset. +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct RangeFrameOffset { + /// The original offset value. + offset: ScalarImpl, + /// Built expression for `$0 + offset`. + #[expect(dead_code)] + #[educe(PartialEq(ignore), Hash(ignore))] + add_expr: Option>, + /// Built expression for `$0 - offset`. + #[expect(dead_code)] + #[educe(PartialEq(ignore), Hash(ignore))] + sub_expr: Option>, +} + +impl RangeFrameOffset { + pub fn new(offset: ScalarImpl) -> Self { + Self { + offset, + add_expr: None, + sub_expr: None, + } + } +} + +impl Deref for RangeFrameOffset { + type Target = ScalarImpl; + + fn deref(&self) -> &Self::Target { + &self.offset + } } impl Display for RangeFrameBounds { @@ -291,6 +353,16 @@ impl FrameBound { Ok(()) } + + pub fn map(self, f: impl Fn(T) -> U) -> FrameBound { + match self { + UnboundedPreceding => UnboundedPreceding, + Preceding(offset) => Preceding(f(offset)), + CurrentRow => CurrentRow, + Following(offset) => Following(f(offset)), + UnboundedFollowing => UnboundedFollowing, + } + } } impl FrameBound { @@ -353,29 +425,54 @@ impl FrameBound { } } -impl FrameBound { - pub fn from_protobuf(bound: &PbBound, offset_data_type: &DataType) -> Result { +impl FrameBound { + pub fn from_protobuf( + bound: &PbBound, + order_data_type: &DataType, + offset_data_type: &DataType, + ) -> Result { + use risingwave_pb::expr::expr_node::PbType as PbExprType; use risingwave_pb::expr::window_frame::bound::PbOffset; use risingwave_pb::expr::window_frame::PbBoundType; let offset = bound.get_offset()?; let bound = match offset { - PbOffset::Integer(_) => bail!("offset of `FrameBound` must be `Datum`"), + PbOffset::Integer(_) => bail!("offset of `RANGE` frame bound must be `Datum`"), PbOffset::Datum(offset) => match bound.get_type()? { PbBoundType::Unspecified => bail!("unspecified type of `FrameBound`"), PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, - PbBoundType::Preceding => Self::Preceding( - Datum::from_protobuf(offset, offset_data_type) - .context("offset `Datum` is not decodable")? - .context("offset of `FrameBound` must be non-NULL")?, - ), PbBoundType::CurrentRow => Self::CurrentRow, - PbBoundType::Following => Self::Following( - Datum::from_protobuf(offset, offset_data_type) - .context("offset `Datum` is not decodable")? - .context("offset of `FrameBound` must be non-NULL")?, - ), PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, + bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => { + let offset_value = Datum::from_protobuf(offset, offset_data_type) + .context("offset `Datum` is not decodable")? + .context("offset of `FrameBound` must be non-NULL")?; + let input_expr = InputRefExpression::new(order_data_type.clone(), 0); + let offset_expr = LiteralExpression::new( + offset_data_type.clone(), + Some(offset_value.clone()), + ); + let add_expr = build_func( + PbExprType::Add, + order_data_type.clone(), + vec![input_expr.clone().boxed(), offset_expr.clone().boxed()], + )?; + let sub_expr = build_func( + PbExprType::Subtract, + order_data_type.clone(), + vec![input_expr.boxed(), offset_expr.boxed()], + )?; + let offset = RangeFrameOffset { + offset: offset_value, + add_expr: Some(Arc::new(add_expr)), + sub_expr: Some(Arc::new(sub_expr)), + }; + if bound_type == PbBoundType::Preceding { + Self::Preceding(offset) + } else { + Self::Following(offset) + } + } }, }; Ok(bound) @@ -392,12 +489,12 @@ impl FrameBound { ), Self::Preceding(offset) => ( PbBoundType::Preceding, - PbOffset::Datum(Some(offset).to_protobuf()), + PbOffset::Datum(Some(offset.as_scalar_ref_impl()).to_protobuf()), ), Self::CurrentRow => (PbBoundType::CurrentRow, PbOffset::Datum(Default::default())), Self::Following(offset) => ( PbBoundType::Following, - PbOffset::Datum(Some(offset).to_protobuf()), + PbOffset::Datum(Some(offset.as_scalar_ref_impl()).to_protobuf()), ), Self::UnboundedFollowing => ( PbBoundType::UnboundedFollowing, @@ -411,7 +508,7 @@ impl FrameBound { } } -impl FrameBound { +impl FrameBound { fn for_display(&self) -> FrameBound { match self { UnboundedPreceding => UnboundedPreceding, diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index a7eb1075649b7..3364799017987 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -27,8 +27,8 @@ use risingwave_common::types::{data_types, DataType, ScalarImpl, Timestamptz}; use risingwave_common::{bail_not_implemented, current_cluster_version, no_function}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use risingwave_expr::window_function::{ - Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RowsFrameBounds, - WindowFuncKind, + Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset, + RowsFrameBounds, WindowFuncKind, }; use risingwave_sqlparser::ast::{ self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, Ident, SelectItem, SetExpr, @@ -680,7 +680,7 @@ impl Binder { FrameBounds::Rows(RowsFrameBounds { start, end }) } WindowFrameUnits::Range => { - let order_data_type = order_by + let order_by_expr = order_by .sort_exprs .iter() // for `RANGE` frame, there should be exactly one `ORDER BY` column @@ -690,14 +690,14 @@ impl Binder { "there should be exactly one ordering column for `RANGE` frame" .to_string(), ) - })? - .expr - .return_type(); + })?; + let order_data_type = order_by_expr.expr.return_type(); + let order_type = order_by_expr.order_type; - let offset_data_type = match order_data_type { + let offset_data_type = match &order_data_type { // for numeric ordering columns, `offset` should be the same type // NOTE: actually in PG it can be a larger type, but we don't support this here - t @ data_types::range_frame_numeric!() => t, + t @ data_types::range_frame_numeric!() => t.clone(), // for datetime ordering columns, `offset` should be interval data_types::range_frame_datetime!() => DataType::Interval, // other types are not supported @@ -719,9 +719,11 @@ impl Binder { &offset_data_type, )?; FrameBounds::Range(RangeFrameBounds { + order_data_type, + order_type, offset_data_type, - start, - end, + start: start.map(RangeFrameOffset::new), + end: end.map(RangeFrameOffset::new), }) } WindowFrameUnits::Groups => {