Skip to content

Commit

Permalink
pass order data type and order type via message WindowFrame
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 23, 2024
1 parent 5a7858c commit ea66a9c
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 31 deletions.
6 changes: 5 additions & 1 deletion proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,12 @@ message WindowFrame {
Bound end = 3;
Exclusion exclusion = 4;

// The data type of order column in `RANGE` frame.
optional data.DataType order_data_type = 10;
// The order type of order column in `RANGE` frame.
optional common.OrderType order_type = 15;
// The data type of offset value in `RANGE` frame.
optional data.DataType offset_data_type = 10;
optional data.DataType offset_data_type = 20;
}

message WindowFunction {
Expand Down
137 changes: 117 additions & 20 deletions src/expr/core/src/window_function/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@
// limitations under the License.

use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Context;
use educe::Educe;
use enum_as_inner::EnumAsInner;
use parse_display::Display;
use risingwave_common::bail;
use risingwave_common::types::{DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToText};
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};

use super::WindowFuncKind;
use crate::aggregate::AggArgs;
use crate::expr::{
build_func, BoxedExpression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
};
use crate::Result;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -97,10 +104,22 @@ impl Frame {
FrameBounds::Rows(RowsFrameBounds { start, end })
}
PbType::Range => {
let order_data_type = DataType::from(frame.get_order_data_type()?);
let order_type = OrderType::from_protobuf(frame.get_order_type()?);
let offset_data_type = DataType::from(frame.get_offset_data_type()?);
let start = FrameBound::<ScalarImpl>::from_protobuf(start, &offset_data_type)?;
let end = FrameBound::<ScalarImpl>::from_protobuf(end, &offset_data_type)?;
let start = FrameBound::<RangeFrameOffset>::from_protobuf(
start,
&order_data_type,
&offset_data_type,
)?;
let end = FrameBound::<RangeFrameOffset>::from_protobuf(
end,
&order_data_type,
&offset_data_type,
)?;
FrameBounds::Range(RangeFrameBounds {
order_data_type,
order_type,
offset_data_type,
start,
end,
Expand All @@ -120,9 +139,13 @@ impl Frame {
start: Some(start.to_protobuf()),
end: Some(end.to_protobuf()),
exclusion,
order_data_type: None,
order_type: None,
offset_data_type: None,
},
FrameBounds::Range(RangeFrameBounds {
order_data_type,
order_type,
offset_data_type,
start,
end,
Expand All @@ -131,6 +154,8 @@ impl Frame {
start: Some(start.to_protobuf()),
end: Some(end.to_protobuf()),
exclusion,
order_data_type: Some(order_data_type.to_protobuf()),
order_type: Some(order_type.to_protobuf()),
offset_data_type: Some(offset_data_type.to_protobuf()),
},
}
Expand Down Expand Up @@ -191,9 +216,46 @@ impl FrameBoundsImpl for RowsFrameBounds {

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RangeFrameBounds {
pub order_data_type: DataType,
pub order_type: OrderType,
pub offset_data_type: DataType,
pub start: FrameBound<ScalarImpl>,
pub end: FrameBound<ScalarImpl>,
pub start: FrameBound<RangeFrameOffset>,
pub end: FrameBound<RangeFrameOffset>,
}

/// The wrapper type for [`ScalarImpl`] range frame offset, containing
/// two expressions to help adding and subtracting the offset.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct RangeFrameOffset {
/// The original offset value.
offset: ScalarImpl,
/// Built expression for `$0 + offset`.
#[expect(dead_code)]
#[educe(PartialEq(ignore), Hash(ignore))]
add_expr: Option<Arc<BoxedExpression>>,
/// Built expression for `$0 - offset`.
#[expect(dead_code)]
#[educe(PartialEq(ignore), Hash(ignore))]
sub_expr: Option<Arc<BoxedExpression>>,
}

impl RangeFrameOffset {
pub fn new(offset: ScalarImpl) -> Self {
Self {
offset,
add_expr: None,
sub_expr: None,
}
}
}

impl Deref for RangeFrameOffset {
type Target = ScalarImpl;

fn deref(&self) -> &Self::Target {
&self.offset
}
}

impl Display for RangeFrameBounds {
Expand Down Expand Up @@ -291,6 +353,16 @@ impl<T> FrameBound<T> {

Ok(())
}

pub fn map<U>(self, f: impl Fn(T) -> U) -> FrameBound<U> {
match self {
UnboundedPreceding => UnboundedPreceding,
Preceding(offset) => Preceding(f(offset)),
CurrentRow => CurrentRow,
Following(offset) => Following(f(offset)),
UnboundedFollowing => UnboundedFollowing,
}
}
}

impl FrameBound<usize> {
Expand Down Expand Up @@ -353,29 +425,54 @@ impl FrameBound<usize> {
}
}

impl FrameBound<ScalarImpl> {
pub fn from_protobuf(bound: &PbBound, offset_data_type: &DataType) -> Result<Self> {
impl FrameBound<RangeFrameOffset> {
pub fn from_protobuf(
bound: &PbBound,
order_data_type: &DataType,
offset_data_type: &DataType,
) -> Result<Self> {
use risingwave_pb::expr::expr_node::PbType as PbExprType;
use risingwave_pb::expr::window_frame::bound::PbOffset;
use risingwave_pb::expr::window_frame::PbBoundType;

let offset = bound.get_offset()?;
let bound = match offset {
PbOffset::Integer(_) => bail!("offset of `FrameBound<ScalarImpl>` must be `Datum`"),
PbOffset::Integer(_) => bail!("offset of `RANGE` frame bound must be `Datum`"),
PbOffset::Datum(offset) => match bound.get_type()? {
PbBoundType::Unspecified => bail!("unspecified type of `FrameBound<ScalarImpl>`"),
PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
PbBoundType::Preceding => Self::Preceding(
Datum::from_protobuf(offset, offset_data_type)
.context("offset `Datum` is not decodable")?
.context("offset of `FrameBound<ScalarImpl>` must be non-NULL")?,
),
PbBoundType::CurrentRow => Self::CurrentRow,
PbBoundType::Following => Self::Following(
Datum::from_protobuf(offset, offset_data_type)
.context("offset `Datum` is not decodable")?
.context("offset of `FrameBound<ScalarImpl>` must be non-NULL")?,
),
PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
bound_type @ (PbBoundType::Preceding | PbBoundType::Following) => {
let offset_value = Datum::from_protobuf(offset, offset_data_type)
.context("offset `Datum` is not decodable")?
.context("offset of `FrameBound<ScalarImpl>` must be non-NULL")?;
let input_expr = InputRefExpression::new(order_data_type.clone(), 0);
let offset_expr = LiteralExpression::new(
offset_data_type.clone(),
Some(offset_value.clone()),
);
let add_expr = build_func(
PbExprType::Add,
order_data_type.clone(),
vec![input_expr.clone().boxed(), offset_expr.clone().boxed()],
)?;
let sub_expr = build_func(
PbExprType::Subtract,
order_data_type.clone(),
vec![input_expr.boxed(), offset_expr.boxed()],
)?;
let offset = RangeFrameOffset {
offset: offset_value,
add_expr: Some(Arc::new(add_expr)),
sub_expr: Some(Arc::new(sub_expr)),
};
if bound_type == PbBoundType::Preceding {
Self::Preceding(offset)
} else {
Self::Following(offset)
}
}
},
};
Ok(bound)
Expand All @@ -392,12 +489,12 @@ impl FrameBound<ScalarImpl> {
),
Self::Preceding(offset) => (
PbBoundType::Preceding,
PbOffset::Datum(Some(offset).to_protobuf()),
PbOffset::Datum(Some(offset.as_scalar_ref_impl()).to_protobuf()),
),
Self::CurrentRow => (PbBoundType::CurrentRow, PbOffset::Datum(Default::default())),
Self::Following(offset) => (
PbBoundType::Following,
PbOffset::Datum(Some(offset).to_protobuf()),
PbOffset::Datum(Some(offset.as_scalar_ref_impl()).to_protobuf()),
),
Self::UnboundedFollowing => (
PbBoundType::UnboundedFollowing,
Expand All @@ -411,7 +508,7 @@ impl FrameBound<ScalarImpl> {
}
}

impl FrameBound<ScalarImpl> {
impl FrameBound<RangeFrameOffset> {
fn for_display(&self) -> FrameBound<String> {
match self {
UnboundedPreceding => UnboundedPreceding,
Expand Down
22 changes: 12 additions & 10 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use risingwave_common::types::{data_types, DataType, ScalarImpl, Timestamptz};
use risingwave_common::{bail_not_implemented, current_cluster_version, no_function};
use risingwave_expr::aggregate::{agg_kinds, AggKind};
use risingwave_expr::window_function::{
Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RowsFrameBounds,
WindowFuncKind,
Frame, FrameBound, FrameBounds, FrameExclusion, RangeFrameBounds, RangeFrameOffset,
RowsFrameBounds, WindowFuncKind,
};
use risingwave_sqlparser::ast::{
self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, Ident, SelectItem, SetExpr,
Expand Down Expand Up @@ -680,7 +680,7 @@ impl Binder {
FrameBounds::Rows(RowsFrameBounds { start, end })
}
WindowFrameUnits::Range => {
let order_data_type = order_by
let order_by_expr = order_by
.sort_exprs
.iter()
// for `RANGE` frame, there should be exactly one `ORDER BY` column
Expand All @@ -690,14 +690,14 @@ impl Binder {
"there should be exactly one ordering column for `RANGE` frame"
.to_string(),
)
})?
.expr
.return_type();
})?;
let order_data_type = order_by_expr.expr.return_type();
let order_type = order_by_expr.order_type;

let offset_data_type = match order_data_type {
let offset_data_type = match &order_data_type {
// for numeric ordering columns, `offset` should be the same type
// NOTE: actually in PG it can be a larger type, but we don't support this here
t @ data_types::range_frame_numeric!() => t,
t @ data_types::range_frame_numeric!() => t.clone(),
// for datetime ordering columns, `offset` should be interval
data_types::range_frame_datetime!() => DataType::Interval,
// other types are not supported
Expand All @@ -719,9 +719,11 @@ impl Binder {
&offset_data_type,
)?;
FrameBounds::Range(RangeFrameBounds {
order_data_type,
order_type,
offset_data_type,
start,
end,
start: start.map(RangeFrameOffset::new),
end: end.map(RangeFrameOffset::new),
})
}
WindowFrameUnits::Groups => {
Expand Down

0 comments on commit ea66a9c

Please sign in to comment.