From 570e5f39d7f3cfb0b55292093c052ca710331d85 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 19 Jan 2024 15:45:00 +0800 Subject: [PATCH] convert between rust type and pb Signed-off-by: Richard Chien --- proto/expr.proto | 7 +- src/expr/core/src/window_function/call.rs | 94 +++++++++++++++++++++-- src/frontend/src/binder/expr/function.rs | 10 ++- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/proto/expr.proto b/proto/expr.proto index 9c6dd8e59fbfc..c644a1005bd3a 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -410,9 +410,9 @@ message AggCall { message WindowFrame { enum Type { TYPE_UNSPECIFIED = 0; - // RANGE = 1; + TYPE_RANGE = 1; TYPE_ROWS = 2; - // GROUPS = 3; + // TYPE_GROUPS = 3; } enum BoundType { BOUND_TYPE_UNSPECIFIED = 0; @@ -441,6 +441,9 @@ message WindowFrame { Bound start = 2; Bound end = 3; Exclusion exclusion = 4; + + // The data type of offset value in `RANGE` frame. + optional data.DataType offset_data_type = 10; } message WindowFunction { diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index 558e7af1324cf..46ac09700487a 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -14,10 +14,12 @@ use std::fmt::Display; +use anyhow::Context; use enum_as_inner::EnumAsInner; use parse_display::Display; use risingwave_common::bail; -use risingwave_common::types::{DataType, IsNegative, ScalarImpl, ScalarRefImpl, ToText}; +use risingwave_common::types::{DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToText}; +use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; use risingwave_pb::expr::window_frame::{PbBound, PbExclusion}; use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction}; use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding}; @@ -85,13 +87,25 @@ impl Frame { impl Frame { pub fn from_protobuf(frame: &PbWindowFrame) -> Result { use risingwave_pb::expr::window_frame::PbType; + let start = frame.get_start()?; + let end = frame.get_end()?; let bounds = match frame.get_type()? { PbType::Unspecified => bail!("unspecified type of `WindowFrame`"), PbType::Rows => { - let start = FrameBound::from_protobuf(frame.get_start()?)?; - let end = FrameBound::from_protobuf(frame.get_end()?)?; + let start = FrameBound::::from_protobuf(start)?; + let end = FrameBound::::from_protobuf(end)?; FrameBounds::Rows(RowsFrameBounds { start, end }) } + PbType::Range => { + let offset_data_type = DataType::from(frame.get_offset_data_type()?); + let start = FrameBound::::from_protobuf(start, &offset_data_type)?; + let end = FrameBound::::from_protobuf(end, &offset_data_type)?; + FrameBounds::Range(RangeFrameBounds { + offset_data_type, + start, + end, + }) + } }; let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?; Ok(Self { bounds, exclusion }) @@ -106,10 +120,19 @@ impl Frame { start: Some(start.to_protobuf()), end: Some(end.to_protobuf()), exclusion, + offset_data_type: None, + }, + FrameBounds::Range(RangeFrameBounds { + offset_data_type, + start, + end, + }) => PbWindowFrame { + r#type: PbType::Range as _, + start: Some(start.to_protobuf()), + end: Some(end.to_protobuf()), + exclusion, + offset_data_type: Some(offset_data_type.to_protobuf()), }, - FrameBounds::Range(_) => { - todo!("`RANGE` frame should be temporarily banned in `LogicalOverWindow`") - } } } } @@ -180,6 +203,7 @@ pub enum FrameBound { #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct RangeFrameBounds { + pub offset_data_type: DataType, pub start: FrameBound, pub end: FrameBound, } @@ -329,6 +353,64 @@ impl FrameBound { } } +impl FrameBound { + pub fn from_protobuf(bound: &PbBound, offset_data_type: &DataType) -> Result { + use risingwave_pb::expr::window_frame::bound::PbOffset; + use risingwave_pb::expr::window_frame::PbBoundType; + + let offset = bound.get_offset()?; + let bound = match offset { + PbOffset::Integer(_) => bail!("offset of `FrameBound` must be `Datum`"), + PbOffset::Datum(offset) => match bound.get_type()? { + PbBoundType::Unspecified => bail!("unspecified type of `FrameBound`"), + PbBoundType::UnboundedPreceding => Self::UnboundedPreceding, + PbBoundType::Preceding => Self::Preceding( + Datum::from_protobuf(offset, offset_data_type) + .context("offset `Datum` is not decodable")? + .context("offset of `FrameBound` must be non-NULL")?, + ), + PbBoundType::CurrentRow => Self::CurrentRow, + PbBoundType::Following => Self::Following( + Datum::from_protobuf(offset, offset_data_type) + .context("offset `Datum` is not decodable")? + .context("offset of `FrameBound` must be non-NULL")?, + ), + PbBoundType::UnboundedFollowing => Self::UnboundedFollowing, + }, + }; + Ok(bound) + } + + pub fn to_protobuf(&self) -> PbBound { + use risingwave_pb::expr::window_frame::bound::PbOffset; + use risingwave_pb::expr::window_frame::PbBoundType; + + let (r#type, offset) = match self { + Self::UnboundedPreceding => ( + PbBoundType::UnboundedPreceding, + PbOffset::Datum(Default::default()), + ), + Self::Preceding(offset) => ( + PbBoundType::Preceding, + PbOffset::Datum(Some(offset).to_protobuf()), + ), + Self::CurrentRow => (PbBoundType::CurrentRow, PbOffset::Datum(Default::default())), + Self::Following(offset) => ( + PbBoundType::Following, + PbOffset::Datum(Some(offset).to_protobuf()), + ), + Self::UnboundedFollowing => ( + PbBoundType::UnboundedFollowing, + PbOffset::Datum(Default::default()), + ), + }; + PbBound { + r#type: r#type as _, + offset: Some(offset), + } + } +} + impl FrameBound { fn for_display(&self) -> FrameBound { match self { diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 137cb02a8745a..338f11454caaf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -716,9 +716,13 @@ impl Binder { let (start, end) = self.bind_window_frame_scalar_impl_bounds( frame.start_bound, frame.end_bound, - offset_data_type, + &offset_data_type, )?; - FrameBounds::Range(RangeFrameBounds { start, end }) + FrameBounds::Range(RangeFrameBounds { + start, + end, + offset_data_type, + }) } WindowFrameUnits::Groups => { bail_not_implemented!( @@ -778,7 +782,7 @@ impl Binder { &mut self, start: WindowFrameBound, end: Option, - offset_data_type: DataType, + offset_data_type: &DataType, ) -> Result<(FrameBound, FrameBound)> { let mut convert_bound = |bound| -> Result> { Ok(match bound {